From 76aecfbc4b49f5ab0613cccff1df6fab03253fab Mon Sep 17 00:00:00 2001 From: Claude Paroz Date: Sat, 23 Mar 2013 16:09:56 +0100 Subject: [PATCH] Fixed #9055 -- Standardized behaviour of parameter escaping in db cursors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, depending on the database backend or the cursor type, you'd need to double the percent signs in the query before passing it to cursor.execute. Now cursor.execute consistently need percent doubling whenever params argument is not None (placeholder substitution will happen). Thanks Thomas Güttler for the report and Walter Doekes for his work on the patch. --- django/db/backends/__init__.py | 2 ++ django/db/backends/mysql/base.py | 1 + django/db/backends/oracle/base.py | 13 +++++++------ django/db/backends/sqlite3/base.py | 4 +++- django/db/backends/util.py | 5 ++++- docs/topics/db/sql.txt | 6 ++++++ tests/backends/tests.py | 24 ++++++++++++++++++++---- 7 files changed, 43 insertions(+), 12 deletions(-) diff --git a/django/db/backends/__init__.py b/django/db/backends/__init__.py index b713b4c97a..9acef4ad19 100644 --- a/django/db/backends/__init__.py +++ b/django/db/backends/__init__.py @@ -815,6 +815,8 @@ class BaseDatabaseOperations(object): to_unicode = lambda s: force_text(s, strings_only=True, errors='replace') if isinstance(params, (list, tuple)): u_params = tuple(to_unicode(val) for val in params) + elif params is None: + u_params = () else: u_params = dict((to_unicode(k), to_unicode(v)) for k, v in params.items()) diff --git a/django/db/backends/mysql/base.py b/django/db/backends/mysql/base.py index f7d07cf4b7..57b6f82270 100644 --- a/django/db/backends/mysql/base.py +++ b/django/db/backends/mysql/base.py @@ -115,6 +115,7 @@ class CursorWrapper(object): def execute(self, query, args=None): try: + # args is None means no string interpolation return self.cursor.execute(query, args) except Database.OperationalError as e: # Map some error codes to IntegrityError, since they seem to be diff --git a/django/db/backends/oracle/base.py b/django/db/backends/oracle/base.py index 6309088c4c..9e69743d33 100644 --- a/django/db/backends/oracle/base.py +++ b/django/db/backends/oracle/base.py @@ -757,18 +757,19 @@ class FormatStylePlaceholderCursor(object): return [p.force_bytes for p in params] def execute(self, query, params=None): - if params is None: - params = [] - else: - params = self._format_params(params) - args = [(':arg%d' % i) for i in range(len(params))] # cx_Oracle wants no trailing ';' for SQL statements. For PL/SQL, it # it does want a trailing ';' but not a trailing '/'. However, these # characters must be included in the original query in case the query # is being passed to SQL*Plus. if query.endswith(';') or query.endswith('/'): query = query[:-1] - query = convert_unicode(query % tuple(args), self.charset) + if params is None: + params = [] + query = convert_unicode(query, self.charset) + else: + params = self._format_params(params) + args = [(':arg%d' % i) for i in range(len(params))] + query = convert_unicode(query % tuple(args), self.charset) self._guess_input_sizes([params]) try: return self.cursor.execute(query, self._param_generator(params)) diff --git a/django/db/backends/sqlite3/base.py b/django/db/backends/sqlite3/base.py index f70c3872a8..ead325a33b 100644 --- a/django/db/backends/sqlite3/base.py +++ b/django/db/backends/sqlite3/base.py @@ -433,7 +433,9 @@ class SQLiteCursorWrapper(Database.Cursor): This fixes it -- but note that if you want to use a literal "%s" in a query, you'll need to use "%%s". """ - def execute(self, query, params=()): + def execute(self, query, params=None): + if params is None: + return Database.Cursor.execute(self, query) query = self.convert_query(query) return Database.Cursor.execute(self, query, params) diff --git a/django/db/backends/util.py b/django/db/backends/util.py index 5eb6626fc7..b7fb48dca5 100644 --- a/django/db/backends/util.py +++ b/django/db/backends/util.py @@ -35,11 +35,14 @@ class CursorWrapper(object): class CursorDebugWrapper(CursorWrapper): - def execute(self, sql, params=()): + def execute(self, sql, params=None): self.db.set_dirty() start = time() try: with self.db.wrap_database_errors(): + if params is None: + # params default might be backend specific + return self.cursor.execute(sql) return self.cursor.execute(sql, params) finally: stop = time() diff --git a/docs/topics/db/sql.txt b/docs/topics/db/sql.txt index b52e6e795f..34cfa382d3 100644 --- a/docs/topics/db/sql.txt +++ b/docs/topics/db/sql.txt @@ -227,6 +227,12 @@ For example:: were committed to the database. Since Django now defaults to database-level autocommit, this isn't necessary any longer. +Note that if you want to include literal percent signs in the query, you have to +double them in the case you are passing parameters:: + + cursor.execute("SELECT foo FROM bar WHERE baz = '30%'") + cursor.execute("SELECT foo FROM bar WHERE baz = '30%%' and id = %s", [self.id]) + If you are using :doc:`more than one database `, you can use ``django.db.connections`` to obtain the connection (and cursor) for a specific database. ``django.db.connections`` is a dictionary-like diff --git a/tests/backends/tests.py b/tests/backends/tests.py index cec2267450..9a5f6c008d 100644 --- a/tests/backends/tests.py +++ b/tests/backends/tests.py @@ -361,18 +361,34 @@ class ConnectionCreatedSignalTest(TransactionTestCase): class EscapingChecks(TestCase): + """ + All tests in this test case are also run with settings.DEBUG=True in + EscapingChecksDebug test case, to also test CursorDebugWrapper. + """ + def test_paramless_no_escaping(self): + cursor = connection.cursor() + cursor.execute("SELECT '%s'") + self.assertEqual(cursor.fetchall()[0][0], '%s') + + def test_parameter_escaping(self): + cursor = connection.cursor() + cursor.execute("SELECT '%%', %s", ('%d',)) + self.assertEqual(cursor.fetchall()[0], ('%', '%d')) @unittest.skipUnless(connection.vendor == 'sqlite', "This is a sqlite-specific issue") - def test_parameter_escaping(self): + def test_sqlite_parameter_escaping(self): #13648: '%s' escaping support for sqlite3 cursor = connection.cursor() - response = cursor.execute( - "select strftime('%%s', date('now'))").fetchall()[0][0] - self.assertNotEqual(response, None) + cursor.execute("select strftime('%s', date('now'))") + response = cursor.fetchall()[0][0] # response should be an non-zero integer self.assertTrue(int(response)) +@override_settings(DEBUG=True) +class EscapingChecksDebug(EscapingChecks): + pass + class SqlliteAggregationTests(TestCase): """