Fixed #9055 -- Standardized behaviour of parameter escaping in db cursors

Previously, depending on the database backend or the cursor type,
you'd need to double the percent signs in the query before passing
it to cursor.execute. Now cursor.execute consistently need percent
doubling whenever params argument is not None (placeholder substitution
will happen).
Thanks Thomas Güttler for the report and Walter Doekes for his work
on the patch.
This commit is contained in:
Claude Paroz 2013-03-23 16:09:56 +01:00
parent e7514e4978
commit 76aecfbc4b
7 changed files with 43 additions and 12 deletions

View File

@ -815,6 +815,8 @@ class BaseDatabaseOperations(object):
to_unicode = lambda s: force_text(s, strings_only=True, errors='replace') to_unicode = lambda s: force_text(s, strings_only=True, errors='replace')
if isinstance(params, (list, tuple)): if isinstance(params, (list, tuple)):
u_params = tuple(to_unicode(val) for val in params) u_params = tuple(to_unicode(val) for val in params)
elif params is None:
u_params = ()
else: else:
u_params = dict((to_unicode(k), to_unicode(v)) for k, v in params.items()) u_params = dict((to_unicode(k), to_unicode(v)) for k, v in params.items())

View File

@ -115,6 +115,7 @@ class CursorWrapper(object):
def execute(self, query, args=None): def execute(self, query, args=None):
try: try:
# args is None means no string interpolation
return self.cursor.execute(query, args) return self.cursor.execute(query, args)
except Database.OperationalError as e: except Database.OperationalError as e:
# Map some error codes to IntegrityError, since they seem to be # Map some error codes to IntegrityError, since they seem to be

View File

@ -757,17 +757,18 @@ class FormatStylePlaceholderCursor(object):
return [p.force_bytes for p in params] return [p.force_bytes for p in params]
def execute(self, query, params=None): def execute(self, query, params=None):
if params is None:
params = []
else:
params = self._format_params(params)
args = [(':arg%d' % i) for i in range(len(params))]
# cx_Oracle wants no trailing ';' for SQL statements. For PL/SQL, it # cx_Oracle wants no trailing ';' for SQL statements. For PL/SQL, it
# it does want a trailing ';' but not a trailing '/'. However, these # it does want a trailing ';' but not a trailing '/'. However, these
# characters must be included in the original query in case the query # characters must be included in the original query in case the query
# is being passed to SQL*Plus. # is being passed to SQL*Plus.
if query.endswith(';') or query.endswith('/'): if query.endswith(';') or query.endswith('/'):
query = query[:-1] query = query[:-1]
if params is None:
params = []
query = convert_unicode(query, self.charset)
else:
params = self._format_params(params)
args = [(':arg%d' % i) for i in range(len(params))]
query = convert_unicode(query % tuple(args), self.charset) query = convert_unicode(query % tuple(args), self.charset)
self._guess_input_sizes([params]) self._guess_input_sizes([params])
try: try:

View File

@ -433,7 +433,9 @@ class SQLiteCursorWrapper(Database.Cursor):
This fixes it -- but note that if you want to use a literal "%s" in a query, This fixes it -- but note that if you want to use a literal "%s" in a query,
you'll need to use "%%s". you'll need to use "%%s".
""" """
def execute(self, query, params=()): def execute(self, query, params=None):
if params is None:
return Database.Cursor.execute(self, query)
query = self.convert_query(query) query = self.convert_query(query)
return Database.Cursor.execute(self, query, params) return Database.Cursor.execute(self, query, params)

View File

@ -35,11 +35,14 @@ class CursorWrapper(object):
class CursorDebugWrapper(CursorWrapper): class CursorDebugWrapper(CursorWrapper):
def execute(self, sql, params=()): def execute(self, sql, params=None):
self.db.set_dirty() self.db.set_dirty()
start = time() start = time()
try: try:
with self.db.wrap_database_errors(): with self.db.wrap_database_errors():
if params is None:
# params default might be backend specific
return self.cursor.execute(sql)
return self.cursor.execute(sql, params) return self.cursor.execute(sql, params)
finally: finally:
stop = time() stop = time()

View File

@ -227,6 +227,12 @@ For example::
were committed to the database. Since Django now defaults to database-level were committed to the database. Since Django now defaults to database-level
autocommit, this isn't necessary any longer. autocommit, this isn't necessary any longer.
Note that if you want to include literal percent signs in the query, you have to
double them in the case you are passing parameters::
cursor.execute("SELECT foo FROM bar WHERE baz = '30%'")
cursor.execute("SELECT foo FROM bar WHERE baz = '30%%' and id = %s", [self.id])
If you are using :doc:`more than one database </topics/db/multi-db>`, you can If you are using :doc:`more than one database </topics/db/multi-db>`, you can
use ``django.db.connections`` to obtain the connection (and cursor) for a use ``django.db.connections`` to obtain the connection (and cursor) for a
specific database. ``django.db.connections`` is a dictionary-like specific database. ``django.db.connections`` is a dictionary-like

View File

@ -361,18 +361,34 @@ class ConnectionCreatedSignalTest(TransactionTestCase):
class EscapingChecks(TestCase): class EscapingChecks(TestCase):
"""
All tests in this test case are also run with settings.DEBUG=True in
EscapingChecksDebug test case, to also test CursorDebugWrapper.
"""
def test_paramless_no_escaping(self):
cursor = connection.cursor()
cursor.execute("SELECT '%s'")
self.assertEqual(cursor.fetchall()[0][0], '%s')
def test_parameter_escaping(self):
cursor = connection.cursor()
cursor.execute("SELECT '%%', %s", ('%d',))
self.assertEqual(cursor.fetchall()[0], ('%', '%d'))
@unittest.skipUnless(connection.vendor == 'sqlite', @unittest.skipUnless(connection.vendor == 'sqlite',
"This is a sqlite-specific issue") "This is a sqlite-specific issue")
def test_parameter_escaping(self): def test_sqlite_parameter_escaping(self):
#13648: '%s' escaping support for sqlite3 #13648: '%s' escaping support for sqlite3
cursor = connection.cursor() cursor = connection.cursor()
response = cursor.execute( cursor.execute("select strftime('%s', date('now'))")
"select strftime('%%s', date('now'))").fetchall()[0][0] response = cursor.fetchall()[0][0]
self.assertNotEqual(response, None)
# response should be an non-zero integer # response should be an non-zero integer
self.assertTrue(int(response)) self.assertTrue(int(response))
@override_settings(DEBUG=True)
class EscapingChecksDebug(EscapingChecks):
pass
class SqlliteAggregationTests(TestCase): class SqlliteAggregationTests(TestCase):
""" """