Support 'pyformat' style parameters in raw queries, Refs #10070
Add support for Oracle, fix an issue with the repr of RawQuerySet, add tests and documentations. Also added a 'supports_paramstyle_pyformat' database feature, True by default, False for SQLite. Thanks Donald Stufft for review of documentation.
This commit is contained in:
parent
7c0b72a826
commit
d097417025
|
@ -613,6 +613,11 @@ class BaseDatabaseFeatures(object):
|
|||
# when autocommit is disabled? http://bugs.python.org/issue8145#msg109965
|
||||
autocommits_when_autocommit_is_off = False
|
||||
|
||||
# Does the backend support 'pyformat' style ("... %(name)s ...", {'name': value})
|
||||
# parameter passing? Note this can be provided by the backend even if not
|
||||
# supported by the Python driver
|
||||
supports_paramstyle_pyformat = True
|
||||
|
||||
def __init__(self, connection):
|
||||
self.connection = connection
|
||||
|
||||
|
|
|
@ -757,20 +757,37 @@ class FormatStylePlaceholderCursor(object):
|
|||
self.cursor.arraysize = 100
|
||||
|
||||
def _format_params(self, params):
|
||||
return tuple([OracleParam(p, self, True) for p in params])
|
||||
try:
|
||||
return dict((k,OracleParam(v, self, True)) for k,v in params.items())
|
||||
except AttributeError:
|
||||
return tuple([OracleParam(p, self, True) for p in params])
|
||||
|
||||
def _guess_input_sizes(self, params_list):
|
||||
sizes = [None] * len(params_list[0])
|
||||
for params in params_list:
|
||||
for i, value in enumerate(params):
|
||||
if value.input_size:
|
||||
sizes[i] = value.input_size
|
||||
self.setinputsizes(*sizes)
|
||||
# Try dict handling; if that fails, treat as sequence
|
||||
if hasattr(params_list[0], 'keys'):
|
||||
sizes = {}
|
||||
for params in params_list:
|
||||
for k, value in params.items():
|
||||
if value.input_size:
|
||||
sizes[k] = value.input_size
|
||||
self.setinputsizes(**sizes)
|
||||
else:
|
||||
# It's not a list of dicts; it's a list of sequences
|
||||
sizes = [None] * len(params_list[0])
|
||||
for params in params_list:
|
||||
for i, value in enumerate(params):
|
||||
if value.input_size:
|
||||
sizes[i] = value.input_size
|
||||
self.setinputsizes(*sizes)
|
||||
|
||||
def _param_generator(self, params):
|
||||
return [p.force_bytes for p in params]
|
||||
# Try dict handling; if that fails, treat as sequence
|
||||
if hasattr(params, 'items'):
|
||||
return dict((k, v.force_bytes) for k,v in params.items())
|
||||
else:
|
||||
return [p.force_bytes for p in params]
|
||||
|
||||
def execute(self, query, params=None):
|
||||
def _fix_for_params(self, query, params):
|
||||
# cx_Oracle wants no trailing ';' for SQL statements. For PL/SQL, it
|
||||
# it does want a trailing ';' but not a trailing '/'. However, these
|
||||
# characters must be included in the original query in case the query
|
||||
|
@ -780,10 +797,18 @@ class FormatStylePlaceholderCursor(object):
|
|||
if params is None:
|
||||
params = []
|
||||
query = convert_unicode(query, self.charset)
|
||||
elif hasattr(params, 'keys'):
|
||||
# Handle params as dict
|
||||
args = dict((k, ":%s"%k) for k in params.keys())
|
||||
query = convert_unicode(query % args, self.charset)
|
||||
else:
|
||||
params = self._format_params(params)
|
||||
# Handle params as sequence
|
||||
args = [(':arg%d' % i) for i in range(len(params))]
|
||||
query = convert_unicode(query % tuple(args), self.charset)
|
||||
return query, self._format_params(params)
|
||||
|
||||
def execute(self, query, params=None):
|
||||
query, params = self._fix_for_params(query, params)
|
||||
self._guess_input_sizes([params])
|
||||
try:
|
||||
return self.cursor.execute(query, self._param_generator(params))
|
||||
|
@ -794,22 +819,15 @@ class FormatStylePlaceholderCursor(object):
|
|||
raise
|
||||
|
||||
def executemany(self, query, params=None):
|
||||
# cx_Oracle doesn't support iterators, convert them to lists
|
||||
if params is not None and not isinstance(params, (list, tuple)):
|
||||
params = list(params)
|
||||
try:
|
||||
args = [(':arg%d' % i) for i in range(len(params[0]))]
|
||||
except (IndexError, TypeError):
|
||||
if not params:
|
||||
# No params given, nothing to do
|
||||
return None
|
||||
# cx_Oracle wants no trailing ';' for SQL statements. For PL/SQL, it
|
||||
# it does want a trailing ';' but not a trailing '/'. However, these
|
||||
# characters must be included in the original query in case the query
|
||||
# is being passed to SQL*Plus.
|
||||
if query.endswith(';') or query.endswith('/'):
|
||||
query = query[:-1]
|
||||
query = convert_unicode(query % tuple(args), self.charset)
|
||||
formatted = [self._format_params(i) for i in params]
|
||||
# uniform treatment for sequences and iterables
|
||||
params_iter = iter(params)
|
||||
query, firstparams = self._fix_for_params(query, next(params_iter))
|
||||
# we build a list of formatted params; as we're going to traverse it
|
||||
# more than once, we can't make it lazy by using a generator
|
||||
formatted = [firstparams]+[self._format_params(p) for p in params_iter]
|
||||
self._guess_input_sizes(formatted)
|
||||
try:
|
||||
return self.cursor.executemany(query,
|
||||
|
|
|
@ -101,6 +101,7 @@ class DatabaseFeatures(BaseDatabaseFeatures):
|
|||
has_bulk_insert = True
|
||||
can_combine_inserts_with_and_without_auto_increment_pk = False
|
||||
autocommits_when_autocommit_is_off = True
|
||||
supports_paramstyle_pyformat = False
|
||||
|
||||
@cached_property
|
||||
def uses_savepoints(self):
|
||||
|
|
|
@ -1445,7 +1445,10 @@ class RawQuerySet(object):
|
|||
yield instance
|
||||
|
||||
def __repr__(self):
|
||||
return "<RawQuerySet: %r>" % (self.raw_query % tuple(self.params))
|
||||
text = self.raw_query
|
||||
if self.params:
|
||||
text = text % (self.params if hasattr(self.params, 'keys') else tuple(self.params))
|
||||
return "<RawQuerySet: %r>" % text
|
||||
|
||||
def __getitem__(self, k):
|
||||
return list(self)[k]
|
||||
|
|
|
@ -623,6 +623,14 @@ If you're getting this error, you can solve it by:
|
|||
SQLite does not support the ``SELECT ... FOR UPDATE`` syntax. Calling it will
|
||||
have no effect.
|
||||
|
||||
"pyformat" parameter style in raw queries not supported
|
||||
-------------------------------------------------------
|
||||
|
||||
For most backends, raw queries (``Manager.raw()`` or ``cursor.execute()``)
|
||||
can use the "pyformat" parameter style, where placeholders in the query
|
||||
are given as ``'%(name)s'`` and the parameters are passed as a dictionary
|
||||
rather than a list. SQLite does not support this.
|
||||
|
||||
.. _sqlite-connection-queries:
|
||||
|
||||
Parameters not quoted in ``connection.queries``
|
||||
|
|
|
@ -337,6 +337,12 @@ Minor features
|
|||
default) to allow customizing the :attr:`~django.forms.Form.prefix` of the
|
||||
form.
|
||||
|
||||
* Raw queries (``Manager.raw()`` or ``cursor.execute()``) can now use the
|
||||
"pyformat" parameter style, where placeholders in the query are given as
|
||||
``'%(name)s'`` and the parameters are passed as a dictionary rather than
|
||||
a list (except on SQLite). This has long been possible (but not officially
|
||||
supported) on MySQL and PostgreSQL, and is now also available on Oracle.
|
||||
|
||||
Backwards incompatible changes in 1.6
|
||||
=====================================
|
||||
|
||||
|
|
|
@ -166,9 +166,17 @@ argument to ``raw()``::
|
|||
>>> lname = 'Doe'
|
||||
>>> Person.objects.raw('SELECT * FROM myapp_person WHERE last_name = %s', [lname])
|
||||
|
||||
``params`` is a list of parameters. You'll use ``%s`` placeholders in the
|
||||
query string (regardless of your database engine); they'll be replaced with
|
||||
parameters from the ``params`` list.
|
||||
``params`` is a list or dictionary of parameters. You'll use ``%s``
|
||||
placeholders in the query string for a list, or ``%(key)s``
|
||||
placeholders for a dictionary (where ``key`` is replaced by a
|
||||
dictionary key, of course), regardless of your database engine. Such
|
||||
placeholders will be replaced with parameters from the ``params``
|
||||
argument.
|
||||
|
||||
.. note:: Dictionary params not supported with SQLite
|
||||
|
||||
Dictionary params are not supported with the SQLite backend; with
|
||||
this backend, you must pass parameters as a list.
|
||||
|
||||
.. warning::
|
||||
|
||||
|
@ -181,14 +189,21 @@ parameters from the ``params`` list.
|
|||
|
||||
**Don't.**
|
||||
|
||||
Using the ``params`` list completely protects you from `SQL injection
|
||||
Using the ``params`` argument completely protects you from `SQL injection
|
||||
attacks`__, a common exploit where attackers inject arbitrary SQL into
|
||||
your database. If you use string interpolation, sooner or later you'll
|
||||
fall victim to SQL injection. As long as you remember to always use the
|
||||
``params`` list you'll be protected.
|
||||
``params`` argument you'll be protected.
|
||||
|
||||
__ http://en.wikipedia.org/wiki/SQL_injection
|
||||
|
||||
.. versionchanged:: 1.6
|
||||
|
||||
In Django 1.5 and earlier, you could pass parameters as dictionaries
|
||||
when using PostgreSQL or MySQL, although this wasn't documented. Now
|
||||
you can also do this whem using Oracle, and it is officially supported.
|
||||
|
||||
|
||||
.. _executing-custom-sql:
|
||||
|
||||
Executing custom SQL directly
|
||||
|
|
|
@ -456,13 +456,24 @@ class SqliteChecks(TestCase):
|
|||
class BackendTestCase(TestCase):
|
||||
|
||||
def create_squares_with_executemany(self, args):
|
||||
self.create_squares(args, 'format', True)
|
||||
|
||||
def create_squares(self, args, paramstyle, multiple):
|
||||
cursor = connection.cursor()
|
||||
opts = models.Square._meta
|
||||
tbl = connection.introspection.table_name_converter(opts.db_table)
|
||||
f1 = connection.ops.quote_name(opts.get_field('root').column)
|
||||
f2 = connection.ops.quote_name(opts.get_field('square').column)
|
||||
query = 'INSERT INTO %s (%s, %s) VALUES (%%s, %%s)' % (tbl, f1, f2)
|
||||
cursor.executemany(query, args)
|
||||
if paramstyle=='format':
|
||||
query = 'INSERT INTO %s (%s, %s) VALUES (%%s, %%s)' % (tbl, f1, f2)
|
||||
elif paramstyle=='pyformat':
|
||||
query = 'INSERT INTO %s (%s, %s) VALUES (%%(root)s, %%(square)s)' % (tbl, f1, f2)
|
||||
else:
|
||||
raise ValueError("unsupported paramstyle in test")
|
||||
if multiple:
|
||||
cursor.executemany(query, args)
|
||||
else:
|
||||
cursor.execute(query, args)
|
||||
|
||||
def test_cursor_executemany(self):
|
||||
#4896: Test cursor.executemany
|
||||
|
@ -491,6 +502,35 @@ class BackendTestCase(TestCase):
|
|||
self.create_squares_with_executemany(args)
|
||||
self.assertEqual(models.Square.objects.count(), 9)
|
||||
|
||||
@skipUnlessDBFeature('supports_paramstyle_pyformat')
|
||||
def test_cursor_execute_with_pyformat(self):
|
||||
#10070: Support pyformat style passing of paramters
|
||||
args = {'root': 3, 'square': 9}
|
||||
self.create_squares(args, 'pyformat', multiple=False)
|
||||
self.assertEqual(models.Square.objects.count(), 1)
|
||||
|
||||
@skipUnlessDBFeature('supports_paramstyle_pyformat')
|
||||
def test_cursor_executemany_with_pyformat(self):
|
||||
#10070: Support pyformat style passing of paramters
|
||||
args = [{'root': i, 'square': i**2} for i in range(-5, 6)]
|
||||
self.create_squares(args, 'pyformat', multiple=True)
|
||||
self.assertEqual(models.Square.objects.count(), 11)
|
||||
for i in range(-5, 6):
|
||||
square = models.Square.objects.get(root=i)
|
||||
self.assertEqual(square.square, i**2)
|
||||
|
||||
@skipUnlessDBFeature('supports_paramstyle_pyformat')
|
||||
def test_cursor_executemany_with_pyformat_iterator(self):
|
||||
args = iter({'root': i, 'square': i**2} for i in range(-3, 2))
|
||||
self.create_squares(args, 'pyformat', multiple=True)
|
||||
self.assertEqual(models.Square.objects.count(), 5)
|
||||
|
||||
args = iter({'root': i, 'square': i**2} for i in range(3, 7))
|
||||
with override_settings(DEBUG=True):
|
||||
# same test for DebugCursorWrapper
|
||||
self.create_squares(args, 'pyformat', multiple=True)
|
||||
self.assertEqual(models.Square.objects.count(), 9)
|
||||
|
||||
def test_unicode_fetches(self):
|
||||
#6254: fetchone, fetchmany, fetchall return strings as unicode objects
|
||||
qn = connection.ops.quote_name
|
||||
|
|
|
@ -3,7 +3,7 @@ from __future__ import absolute_import
|
|||
from datetime import date
|
||||
|
||||
from django.db.models.query_utils import InvalidQuery
|
||||
from django.test import TestCase
|
||||
from django.test import TestCase, skipUnlessDBFeature
|
||||
|
||||
from .models import Author, Book, Coffee, Reviewer, FriendlyAuthor
|
||||
|
||||
|
@ -123,10 +123,27 @@ class RawQueryTests(TestCase):
|
|||
query = "SELECT * FROM raw_query_author WHERE first_name = %s"
|
||||
author = Author.objects.all()[2]
|
||||
params = [author.first_name]
|
||||
results = list(Author.objects.raw(query, params=params))
|
||||
qset = Author.objects.raw(query, params=params)
|
||||
results = list(qset)
|
||||
self.assertProcessed(Author, results, [author])
|
||||
self.assertNoAnnotations(results)
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertIsInstance(repr(qset), str)
|
||||
|
||||
@skipUnlessDBFeature('supports_paramstyle_pyformat')
|
||||
def testPyformatParams(self):
|
||||
"""
|
||||
Test passing optional query parameters
|
||||
"""
|
||||
query = "SELECT * FROM raw_query_author WHERE first_name = %(first)s"
|
||||
author = Author.objects.all()[2]
|
||||
params = {'first': author.first_name}
|
||||
qset = Author.objects.raw(query, params=params)
|
||||
results = list(qset)
|
||||
self.assertProcessed(Author, results, [author])
|
||||
self.assertNoAnnotations(results)
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertIsInstance(repr(qset), str)
|
||||
|
||||
def testManyToMany(self):
|
||||
"""
|
||||
|
|
Loading…
Reference in New Issue