Fixed #33815 -- Fixed last_executed_query() on Oracle when parameter names overlap.

This commit is contained in:
Mariusz Felisiak 2022-07-05 05:53:49 +02:00 committed by GitHub
parent d12d7c4c42
commit 249ecc437f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 4 deletions

View File

@ -309,14 +309,15 @@ END;
# `statement` doesn't contain the query parameters. Substitute # `statement` doesn't contain the query parameters. Substitute
# parameters manually. # parameters manually.
if isinstance(params, (tuple, list)): if isinstance(params, (tuple, list)):
for i, param in enumerate(params): for i, param in enumerate(reversed(params), start=1):
param_num = len(params) - i
statement = statement.replace( statement = statement.replace(
":arg%d" % i, force_str(param, errors="replace") ":arg%d" % param_num, force_str(param, errors="replace")
) )
elif isinstance(params, dict): elif isinstance(params, dict):
for key, param in params.items(): for key in sorted(params, key=len, reverse=True):
statement = statement.replace( statement = statement.replace(
":%s" % key, force_str(param, errors="replace") ":%s" % key, force_str(params[key], errors="replace")
) )
return statement return statement

View File

@ -101,6 +101,7 @@ class LastExecutedQueryTest(TestCase):
pk=1, pk=1,
reporter__pk=9, reporter__pk=9,
).exclude(reporter__pk__in=[2, 1]), ).exclude(reporter__pk__in=[2, 1]),
Article.objects.filter(pk__in=list(range(20, 31))),
): ):
sql, params = qs.query.sql_with_params() sql, params = qs.query.sql_with_params()
with qs.query.get_compiler(DEFAULT_DB_ALIAS).execute_sql(CURSOR) as cursor: with qs.query.get_compiler(DEFAULT_DB_ALIAS).execute_sql(CURSOR) as cursor:
@ -125,6 +126,22 @@ class LastExecutedQueryTest(TestCase):
sql % params, sql % params,
) )
@skipUnlessDBFeature("supports_paramstyle_pyformat")
def test_last_executed_query_dict_overlap_keys(self):
square_opts = Square._meta
sql = "INSERT INTO %s (%s, %s) VALUES (%%(root)s, %%(root2)s)" % (
connection.introspection.identifier_converter(square_opts.db_table),
connection.ops.quote_name(square_opts.get_field("root").column),
connection.ops.quote_name(square_opts.get_field("square").column),
)
with connection.cursor() as cursor:
params = {"root": 2, "root2": 4}
cursor.execute(sql, params)
self.assertEqual(
cursor.db.ops.last_executed_query(cursor, sql, params),
sql % params,
)
class ParameterHandlingTest(TestCase): class ParameterHandlingTest(TestCase):
def test_bad_parameter_count(self): def test_bad_parameter_count(self):