Fixed #12612 -- Corrected handling of parameter formatting in SQLite backend so that executemany raises exceptions when bad parameter counts are provided. Thanks to Niels <niels@pythonheads.nl> for the report, and Gabriel Hurley for the help narrowing down the problem.

git-svn-id: http://code.djangoproject.com/svn/django/trunk@12836 bcc190cf-cafb-0310-a4f2-bffc1f526a37
This commit is contained in:
Russell Keith-Magee 2010-03-23 13:51:11 +00:00
parent ed36a5f2ce
commit d18501b0ed
2 changed files with 20 additions and 7 deletions

View File

@ -7,6 +7,7 @@ Python 2.5 and later can use a pysqlite2 module or the sqlite3 module in the
standard library. standard library.
""" """
import re
import sys import sys
from django.db import utils from django.db import utils
@ -185,6 +186,8 @@ class DatabaseWrapper(BaseDatabaseWrapper):
if self.settings_dict['NAME'] != ":memory:": if self.settings_dict['NAME'] != ":memory:":
BaseDatabaseWrapper.close(self) BaseDatabaseWrapper.close(self)
FORMAT_QMARK_REGEX = re.compile(r'(?![^%])%s')
class SQLiteCursorWrapper(Database.Cursor): class SQLiteCursorWrapper(Database.Cursor):
""" """
Django uses "format" style placeholders, but pysqlite2 uses "qmark" style. Django uses "format" style placeholders, but pysqlite2 uses "qmark" style.
@ -192,8 +195,8 @@ class SQLiteCursorWrapper(Database.Cursor):
you'll need to use "%%s". you'll need to use "%%s".
""" """
def execute(self, query, params=()): def execute(self, query, params=()):
query = self.convert_query(query)
try: try:
query = self.convert_query(query, len(params))
return Database.Cursor.execute(self, query, params) return Database.Cursor.execute(self, query, params)
except Database.IntegrityError, e: except Database.IntegrityError, e:
raise utils.IntegrityError, utils.IntegrityError(*tuple(e)), sys.exc_info()[2] raise utils.IntegrityError, utils.IntegrityError(*tuple(e)), sys.exc_info()[2]
@ -201,19 +204,16 @@ class SQLiteCursorWrapper(Database.Cursor):
raise utils.DatabaseError, utils.DatabaseError(*tuple(e)), sys.exc_info()[2] raise utils.DatabaseError, utils.DatabaseError(*tuple(e)), sys.exc_info()[2]
def executemany(self, query, param_list): def executemany(self, query, param_list):
query = self.convert_query(query)
try: try:
query = self.convert_query(query, len(param_list[0]))
return Database.Cursor.executemany(self, query, param_list) return Database.Cursor.executemany(self, query, param_list)
except (IndexError,TypeError):
# No parameter list provided
return None
except Database.IntegrityError, e: except Database.IntegrityError, e:
raise utils.IntegrityError, utils.IntegrityError(*tuple(e)), sys.exc_info()[2] raise utils.IntegrityError, utils.IntegrityError(*tuple(e)), sys.exc_info()[2]
except Database.DatabaseError, e: except Database.DatabaseError, e:
raise utils.DatabaseError, utils.DatabaseError(*tuple(e)), sys.exc_info()[2] raise utils.DatabaseError, utils.DatabaseError(*tuple(e)), sys.exc_info()[2]
def convert_query(self, query, num_params): def convert_query(self, query):
return query % tuple("?" * num_params) return FORMAT_QMARK_REGEX.sub('?', query).replace('%%','%')
def _sqlite_extract(lookup_type, dt): def _sqlite_extract(lookup_type, dt):
if dt is None: if dt is None:

View File

@ -66,6 +66,19 @@ class DateQuotingTest(TestCase):
classes = models.SchoolClass.objects.filter(last_updated__day=20) classes = models.SchoolClass.objects.filter(last_updated__day=20)
self.assertEqual(len(classes), 1) self.assertEqual(len(classes), 1)
class ParameterHandlingTest(TestCase):
def test_bad_parameter_count(self):
"An executemany call with too many/not enough parameters will raise an exception (Refs #12612)"
cursor = connection.cursor()
query = ('INSERT INTO %s (%s, %s) VALUES (%%s, %%s)' % (
connection.introspection.table_name_converter('backends_square'),
connection.ops.quote_name('root'),
connection.ops.quote_name('square')
))
self.assertRaises(Exception, cursor.executemany, query, [(1,2,3),])
self.assertRaises(Exception, cursor.executemany, query, [(1,),])
def connection_created_test(sender, **kwargs): def connection_created_test(sender, **kwargs):
print 'connection_created signal' print 'connection_created signal'