From 64b3c413da011f55469165256261f406a277e822 Mon Sep 17 00:00:00 2001 From: David Sanders Date: Tue, 18 Oct 2022 23:13:18 +1100 Subject: [PATCH] Fixed #34103 -- Fixed logging SQL queries with duplicate parameters on Oracle. --- django/db/backends/oracle/operations.py | 16 ++++++++-------- tests/backends/tests.py | 17 +++++++++++++++++ 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/django/db/backends/oracle/operations.py b/django/db/backends/oracle/operations.py index 78f998183ea..d34ca23bae3 100644 --- a/django/db/backends/oracle/operations.py +++ b/django/db/backends/oracle/operations.py @@ -323,16 +323,16 @@ END; # Unlike Psycopg's `query` and MySQLdb`'s `_executed`, cx_Oracle's # `statement` doesn't contain the query parameters. Substitute # parameters manually. - if isinstance(params, (tuple, list)): - for i, param in enumerate(reversed(params), start=1): - param_num = len(params) - i - statement = statement.replace( - ":arg%d" % param_num, force_str(param, errors="replace") - ) - elif isinstance(params, dict): + if params: + if isinstance(params, (tuple, list)): + params = { + f":arg{i}": param for i, param in enumerate(dict.fromkeys(params)) + } + elif isinstance(params, dict): + params = {f":{key}": val for (key, val) in params.items()} for key in sorted(params, key=len, reverse=True): statement = statement.replace( - ":%s" % key, force_str(params[key], errors="replace") + key, force_str(params[key], errors="replace") ) return statement diff --git a/tests/backends/tests.py b/tests/backends/tests.py index c6980058ca4..c3cfa61fdb1 100644 --- a/tests/backends/tests.py +++ b/tests/backends/tests.py @@ -142,6 +142,23 @@ class LastExecutedQueryTest(TestCase): sql % params, ) + def test_last_executed_query_with_duplicate_params(self): + square_opts = Square._meta + table = connection.introspection.identifier_converter(square_opts.db_table) + id_column = connection.ops.quote_name(square_opts.get_field("id").column) + root_column = connection.ops.quote_name(square_opts.get_field("root").column) + sql = f"UPDATE {table} SET {root_column} = %s + %s WHERE {id_column} = %s" + with connection.cursor() as cursor: + params = [42, 42, 1] + cursor.execute(sql, params) + last_executed_query = connection.ops.last_executed_query( + cursor, sql, params + ) + self.assertEqual( + last_executed_query, + f"UPDATE {table} SET {root_column} = 42 + 42 WHERE {id_column} = 1", + ) + class ParameterHandlingTest(TestCase): def test_bad_parameter_count(self):