Fixed #26500 -- Added SKIP LOCKED support to select_for_update().
Thanks Tim for the review.
This commit is contained in:
parent
46509cf13d
commit
b8e6e1b43b
|
@ -36,6 +36,7 @@ class BaseDatabaseFeatures(object):
|
||||||
allow_sliced_subqueries = True
|
allow_sliced_subqueries = True
|
||||||
has_select_for_update = False
|
has_select_for_update = False
|
||||||
has_select_for_update_nowait = False
|
has_select_for_update_nowait = False
|
||||||
|
has_select_for_update_skip_locked = False
|
||||||
|
|
||||||
supports_select_related = True
|
supports_select_related = True
|
||||||
|
|
||||||
|
|
|
@ -177,12 +177,14 @@ class BaseDatabaseOperations(object):
|
||||||
"""
|
"""
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def for_update_sql(self, nowait=False):
|
def for_update_sql(self, nowait=False, skip_locked=False):
|
||||||
"""
|
"""
|
||||||
Returns the FOR UPDATE SQL clause to lock rows for an update operation.
|
Returns the FOR UPDATE SQL clause to lock rows for an update operation.
|
||||||
"""
|
"""
|
||||||
if nowait:
|
if nowait:
|
||||||
return 'FOR UPDATE NOWAIT'
|
return 'FOR UPDATE NOWAIT'
|
||||||
|
elif skip_locked:
|
||||||
|
return 'FOR UPDATE SKIP LOCKED'
|
||||||
else:
|
else:
|
||||||
return 'FOR UPDATE'
|
return 'FOR UPDATE'
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ class DatabaseFeatures(BaseDatabaseFeatures):
|
||||||
uses_savepoints = True
|
uses_savepoints = True
|
||||||
has_select_for_update = True
|
has_select_for_update = True
|
||||||
has_select_for_update_nowait = True
|
has_select_for_update_nowait = True
|
||||||
|
has_select_for_update_skip_locked = True
|
||||||
can_return_id_from_insert = True
|
can_return_id_from_insert = True
|
||||||
allow_sliced_subqueries = False
|
allow_sliced_subqueries = False
|
||||||
supports_subqueries_in_group_by = False
|
supports_subqueries_in_group_by = False
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
from django.db.backends.base.features import BaseDatabaseFeatures
|
from django.db.backends.base.features import BaseDatabaseFeatures
|
||||||
from django.db.utils import InterfaceError
|
from django.db.utils import InterfaceError
|
||||||
|
from django.utils.functional import cached_property
|
||||||
|
|
||||||
|
|
||||||
class DatabaseFeatures(BaseDatabaseFeatures):
|
class DatabaseFeatures(BaseDatabaseFeatures):
|
||||||
|
@ -31,3 +32,7 @@ class DatabaseFeatures(BaseDatabaseFeatures):
|
||||||
greatest_least_ignores_nulls = True
|
greatest_least_ignores_nulls = True
|
||||||
can_clone_databases = True
|
can_clone_databases = True
|
||||||
supports_temporal_subtraction = True
|
supports_temporal_subtraction = True
|
||||||
|
|
||||||
|
@cached_property
|
||||||
|
def has_select_for_update_skip_locked(self):
|
||||||
|
return self.connection.pg_version >= 90500
|
||||||
|
|
|
@ -835,15 +835,18 @@ class QuerySet(object):
|
||||||
else:
|
else:
|
||||||
return self._filter_or_exclude(None, **filter_obj)
|
return self._filter_or_exclude(None, **filter_obj)
|
||||||
|
|
||||||
def select_for_update(self, nowait=False):
|
def select_for_update(self, nowait=False, skip_locked=False):
|
||||||
"""
|
"""
|
||||||
Returns a new QuerySet instance that will select objects with a
|
Returns a new QuerySet instance that will select objects with a
|
||||||
FOR UPDATE lock.
|
FOR UPDATE lock.
|
||||||
"""
|
"""
|
||||||
|
if nowait and skip_locked:
|
||||||
|
raise ValueError('The nowait option cannot be used with skip_locked.')
|
||||||
obj = self._clone()
|
obj = self._clone()
|
||||||
obj._for_write = True
|
obj._for_write = True
|
||||||
obj.query.select_for_update = True
|
obj.query.select_for_update = True
|
||||||
obj.query.select_for_update_nowait = nowait
|
obj.query.select_for_update_nowait = nowait
|
||||||
|
obj.query.select_for_update_skip_locked = skip_locked
|
||||||
return obj
|
return obj
|
||||||
|
|
||||||
def select_related(self, *fields):
|
def select_related(self, *fields):
|
||||||
|
|
|
@ -445,13 +445,16 @@ class SQLCompiler(object):
|
||||||
"select_for_update cannot be used outside of a transaction."
|
"select_for_update cannot be used outside of a transaction."
|
||||||
)
|
)
|
||||||
|
|
||||||
# If we've been asked for a NOWAIT query but the backend does
|
|
||||||
# not support it, raise a DatabaseError otherwise we could get
|
|
||||||
# an unexpected deadlock.
|
|
||||||
nowait = self.query.select_for_update_nowait
|
nowait = self.query.select_for_update_nowait
|
||||||
|
skip_locked = self.query.select_for_update_skip_locked
|
||||||
|
# If we've been asked for a NOWAIT/SKIP LOCKED query but the
|
||||||
|
# backend does not support it, raise a DatabaseError otherwise
|
||||||
|
# we could get an unexpected deadlock.
|
||||||
if nowait and not self.connection.features.has_select_for_update_nowait:
|
if nowait and not self.connection.features.has_select_for_update_nowait:
|
||||||
raise DatabaseError('NOWAIT is not supported on this database backend.')
|
raise DatabaseError('NOWAIT is not supported on this database backend.')
|
||||||
result.append(self.connection.ops.for_update_sql(nowait=nowait))
|
elif skip_locked and not self.connection.features.has_select_for_update_skip_locked:
|
||||||
|
raise DatabaseError('SKIP LOCKED is not supported on this database backend.')
|
||||||
|
result.append(self.connection.ops.for_update_sql(nowait=nowait, skip_locked=skip_locked))
|
||||||
|
|
||||||
return ' '.join(result), tuple(params)
|
return ' '.join(result), tuple(params)
|
||||||
finally:
|
finally:
|
||||||
|
|
|
@ -167,6 +167,7 @@ class Query(object):
|
||||||
self.distinct_fields = []
|
self.distinct_fields = []
|
||||||
self.select_for_update = False
|
self.select_for_update = False
|
||||||
self.select_for_update_nowait = False
|
self.select_for_update_nowait = False
|
||||||
|
self.select_for_update_skip_locked = False
|
||||||
|
|
||||||
self.select_related = False
|
self.select_related = False
|
||||||
# Arbitrary limit for select_related to prevents infinite recursion.
|
# Arbitrary limit for select_related to prevents infinite recursion.
|
||||||
|
@ -286,6 +287,7 @@ class Query(object):
|
||||||
obj.distinct_fields = self.distinct_fields[:]
|
obj.distinct_fields = self.distinct_fields[:]
|
||||||
obj.select_for_update = self.select_for_update
|
obj.select_for_update = self.select_for_update
|
||||||
obj.select_for_update_nowait = self.select_for_update_nowait
|
obj.select_for_update_nowait = self.select_for_update_nowait
|
||||||
|
obj.select_for_update_skip_locked = self.select_for_update_skip_locked
|
||||||
obj.select_related = self.select_related
|
obj.select_related = self.select_related
|
||||||
obj.values_select = self.values_select[:]
|
obj.values_select = self.values_select[:]
|
||||||
obj._annotations = self._annotations.copy() if self._annotations is not None else None
|
obj._annotations = self._annotations.copy() if self._annotations is not None else None
|
||||||
|
|
|
@ -569,9 +569,9 @@ both MySQL and Django will attempt to convert the values from UTC to local time.
|
||||||
Row locking with ``QuerySet.select_for_update()``
|
Row locking with ``QuerySet.select_for_update()``
|
||||||
-------------------------------------------------
|
-------------------------------------------------
|
||||||
|
|
||||||
MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE``
|
MySQL does not support the ``NOWAIT`` and ``SKIP LOCKED`` options to the
|
||||||
statement. If ``select_for_update()`` is used with ``nowait=True`` then a
|
``SELECT ... FOR UPDATE`` statement. If ``select_for_update()`` is used with
|
||||||
``DatabaseError`` will be raised.
|
``nowait=True`` or ``skip_locked=True`` then a ``DatabaseError`` will be raised.
|
||||||
|
|
||||||
Automatic typecasting can cause unexpected results
|
Automatic typecasting can cause unexpected results
|
||||||
--------------------------------------------------
|
--------------------------------------------------
|
||||||
|
|
|
@ -1528,7 +1528,7 @@ For example::
|
||||||
``select_for_update()``
|
``select_for_update()``
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
.. method:: select_for_update(nowait=False)
|
.. method:: select_for_update(nowait=False, skip_locked=False)
|
||||||
|
|
||||||
Returns a queryset that will lock rows until the end of the transaction,
|
Returns a queryset that will lock rows until the end of the transaction,
|
||||||
generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases.
|
generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases.
|
||||||
|
@ -1546,16 +1546,19 @@ selected rows, the query will block until the lock is released. If this is
|
||||||
not the behavior you want, call ``select_for_update(nowait=True)``. This will
|
not the behavior you want, call ``select_for_update(nowait=True)``. This will
|
||||||
make the call non-blocking. If a conflicting lock is already acquired by
|
make the call non-blocking. If a conflicting lock is already acquired by
|
||||||
another transaction, :exc:`~django.db.DatabaseError` will be raised when the
|
another transaction, :exc:`~django.db.DatabaseError` will be raised when the
|
||||||
queryset is evaluated.
|
queryset is evaluated. You can also ignore locked rows by using
|
||||||
|
``select_for_update(skip_locked=True)`` instead. The ``nowait`` and
|
||||||
|
``skip_locked`` are mutually exclusive and attempts to call
|
||||||
|
``select_for_update()`` with both options enabled will result in a
|
||||||
|
:exc:`ValueError`.
|
||||||
|
|
||||||
Currently, the ``postgresql``, ``oracle``, and ``mysql`` database
|
Currently, the ``postgresql``, ``oracle``, and ``mysql`` database
|
||||||
backends support ``select_for_update()``. However, MySQL has no support for the
|
backends support ``select_for_update()``. However, MySQL doesn't support the
|
||||||
``nowait`` argument. Obviously, users of external third-party backends should
|
``nowait`` and ``skip_locked`` arguments.
|
||||||
check with their backend's documentation for specifics in those cases.
|
|
||||||
|
|
||||||
Passing ``nowait=True`` to ``select_for_update()`` using database backends that
|
Passing ``nowait=True`` or ``skip_locked=True`` to ``select_for_update()``
|
||||||
do not support ``nowait``, such as MySQL, will cause a
|
using database backends that do not support these options, such as MySQL, will
|
||||||
:exc:`~django.db.DatabaseError` to be raised. This is in order to prevent code
|
cause a :exc:`~django.db.DatabaseError` to be raised. This prevents code from
|
||||||
unexpectedly blocking.
|
unexpectedly blocking.
|
||||||
|
|
||||||
Evaluating a queryset with ``select_for_update()`` in autocommit mode on
|
Evaluating a queryset with ``select_for_update()`` in autocommit mode on
|
||||||
|
@ -1580,6 +1583,10 @@ raised if ``select_for_update()`` is used in autocommit mode.
|
||||||
``select_for_update()`` you should use
|
``select_for_update()`` you should use
|
||||||
:class:`~django.test.TransactionTestCase`.
|
:class:`~django.test.TransactionTestCase`.
|
||||||
|
|
||||||
|
.. versionchanged:: 1.11
|
||||||
|
|
||||||
|
The ``skip_locked`` argument was added.
|
||||||
|
|
||||||
``raw()``
|
``raw()``
|
||||||
~~~~~~~~~
|
~~~~~~~~~
|
||||||
|
|
||||||
|
|
|
@ -150,7 +150,9 @@ CSRF
|
||||||
Database backends
|
Database backends
|
||||||
~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
* ...
|
* Added the ``skip_locked`` argument to :meth:`.QuerySet.select_for_update()`
|
||||||
|
on PostgreSQL 9.5+ and Oracle to execute queries with
|
||||||
|
``FOR UPDATE SKIP LOCKED``.
|
||||||
|
|
||||||
Email
|
Email
|
||||||
~~~~~
|
~~~~~
|
||||||
|
@ -297,6 +299,9 @@ Database backend API
|
||||||
support the :lookup:`time` lookup. It accepts a ``field_name`` and ``tzname``
|
support the :lookup:`time` lookup. It accepts a ``field_name`` and ``tzname``
|
||||||
arguments and returns the SQL necessary to cast a datetime value to time value.
|
arguments and returns the SQL necessary to cast a datetime value to time value.
|
||||||
|
|
||||||
|
* To enable ``FOR UPDATE SKIP LOCKED`` support, set
|
||||||
|
``DatabaseFeatures.has_select_for_update_skip_locked = True``.
|
||||||
|
|
||||||
Dropped support for PostgreSQL 9.2 and PostGIS 2.0
|
Dropped support for PostgreSQL 9.2 and PostGIS 2.0
|
||||||
--------------------------------------------------
|
--------------------------------------------------
|
||||||
|
|
||||||
|
|
|
@ -52,10 +52,10 @@ class SelectForUpdateTests(TransactionTestCase):
|
||||||
self.new_connection.rollback()
|
self.new_connection.rollback()
|
||||||
self.new_connection.set_autocommit(True)
|
self.new_connection.set_autocommit(True)
|
||||||
|
|
||||||
def has_for_update_sql(self, queries, nowait=False):
|
def has_for_update_sql(self, queries, **kwargs):
|
||||||
# Examine the SQL that was executed to determine whether it
|
# Examine the SQL that was executed to determine whether it
|
||||||
# contains the 'SELECT..FOR UPDATE' stanza.
|
# contains the 'SELECT..FOR UPDATE' stanza.
|
||||||
for_update_sql = connection.ops.for_update_sql(nowait)
|
for_update_sql = connection.ops.for_update_sql(**kwargs)
|
||||||
return any(for_update_sql in query['sql'] for query in queries)
|
return any(for_update_sql in query['sql'] for query in queries)
|
||||||
|
|
||||||
@skipUnlessDBFeature('has_select_for_update')
|
@skipUnlessDBFeature('has_select_for_update')
|
||||||
|
@ -78,6 +78,16 @@ class SelectForUpdateTests(TransactionTestCase):
|
||||||
list(Person.objects.all().select_for_update(nowait=True))
|
list(Person.objects.all().select_for_update(nowait=True))
|
||||||
self.assertTrue(self.has_for_update_sql(ctx.captured_queries, nowait=True))
|
self.assertTrue(self.has_for_update_sql(ctx.captured_queries, nowait=True))
|
||||||
|
|
||||||
|
@skipUnlessDBFeature('has_select_for_update_skip_locked')
|
||||||
|
def test_for_update_sql_generated_skip_locked(self):
|
||||||
|
"""
|
||||||
|
Test that the backend's FOR UPDATE SKIP LOCKED variant appears in
|
||||||
|
generated SQL when select_for_update is invoked.
|
||||||
|
"""
|
||||||
|
with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
|
||||||
|
list(Person.objects.all().select_for_update(skip_locked=True))
|
||||||
|
self.assertTrue(self.has_for_update_sql(ctx.captured_queries, skip_locked=True))
|
||||||
|
|
||||||
@skipUnlessDBFeature('has_select_for_update_nowait')
|
@skipUnlessDBFeature('has_select_for_update_nowait')
|
||||||
def test_nowait_raises_error_on_block(self):
|
def test_nowait_raises_error_on_block(self):
|
||||||
"""
|
"""
|
||||||
|
@ -99,6 +109,25 @@ class SelectForUpdateTests(TransactionTestCase):
|
||||||
self.end_blocking_transaction()
|
self.end_blocking_transaction()
|
||||||
self.assertIsInstance(status[-1], DatabaseError)
|
self.assertIsInstance(status[-1], DatabaseError)
|
||||||
|
|
||||||
|
@skipUnlessDBFeature('has_select_for_update_skip_locked')
|
||||||
|
def test_skip_locked_skips_locked_rows(self):
|
||||||
|
"""
|
||||||
|
If skip_locked is specified, the locked row is skipped resulting in
|
||||||
|
Person.DoesNotExist.
|
||||||
|
"""
|
||||||
|
self.start_blocking_transaction()
|
||||||
|
status = []
|
||||||
|
thread = threading.Thread(
|
||||||
|
target=self.run_select_for_update,
|
||||||
|
args=(status,),
|
||||||
|
kwargs={'skip_locked': True},
|
||||||
|
)
|
||||||
|
thread.start()
|
||||||
|
time.sleep(1)
|
||||||
|
thread.join()
|
||||||
|
self.end_blocking_transaction()
|
||||||
|
self.assertIsInstance(status[-1], Person.DoesNotExist)
|
||||||
|
|
||||||
@skipIfDBFeature('has_select_for_update_nowait')
|
@skipIfDBFeature('has_select_for_update_nowait')
|
||||||
@skipUnlessDBFeature('has_select_for_update')
|
@skipUnlessDBFeature('has_select_for_update')
|
||||||
def test_unsupported_nowait_raises_error(self):
|
def test_unsupported_nowait_raises_error(self):
|
||||||
|
@ -110,6 +139,17 @@ class SelectForUpdateTests(TransactionTestCase):
|
||||||
with self.assertRaises(DatabaseError):
|
with self.assertRaises(DatabaseError):
|
||||||
list(Person.objects.all().select_for_update(nowait=True))
|
list(Person.objects.all().select_for_update(nowait=True))
|
||||||
|
|
||||||
|
@skipIfDBFeature('has_select_for_update_skip_locked')
|
||||||
|
@skipUnlessDBFeature('has_select_for_update')
|
||||||
|
def test_unsupported_skip_locked_raises_error(self):
|
||||||
|
"""
|
||||||
|
DatabaseError is raised if a SELECT...FOR UPDATE SKIP LOCKED is run on
|
||||||
|
a database backend that supports FOR UPDATE but not SKIP LOCKED.
|
||||||
|
"""
|
||||||
|
with self.assertRaisesMessage(DatabaseError, 'SKIP LOCKED is not supported on this database backend.'):
|
||||||
|
with transaction.atomic():
|
||||||
|
Person.objects.select_for_update(skip_locked=True).get()
|
||||||
|
|
||||||
@skipUnlessDBFeature('has_select_for_update')
|
@skipUnlessDBFeature('has_select_for_update')
|
||||||
def test_for_update_requires_transaction(self):
|
def test_for_update_requires_transaction(self):
|
||||||
"""
|
"""
|
||||||
|
@ -130,7 +170,7 @@ class SelectForUpdateTests(TransactionTestCase):
|
||||||
with self.assertRaises(transaction.TransactionManagementError):
|
with self.assertRaises(transaction.TransactionManagementError):
|
||||||
list(people)
|
list(people)
|
||||||
|
|
||||||
def run_select_for_update(self, status, nowait=False):
|
def run_select_for_update(self, status, **kwargs):
|
||||||
"""
|
"""
|
||||||
Utility method that runs a SELECT FOR UPDATE against all
|
Utility method that runs a SELECT FOR UPDATE against all
|
||||||
Person instances. After the select_for_update, it attempts
|
Person instances. After the select_for_update, it attempts
|
||||||
|
@ -143,12 +183,10 @@ class SelectForUpdateTests(TransactionTestCase):
|
||||||
# We need to enter transaction management again, as this is done on
|
# We need to enter transaction management again, as this is done on
|
||||||
# per-thread basis
|
# per-thread basis
|
||||||
with transaction.atomic():
|
with transaction.atomic():
|
||||||
people = list(
|
person = Person.objects.select_for_update(**kwargs).get()
|
||||||
Person.objects.all().select_for_update(nowait=nowait)
|
person.name = 'Fred'
|
||||||
)
|
person.save()
|
||||||
people[0].name = 'Fred'
|
except (DatabaseError, Person.DoesNotExist) as e:
|
||||||
people[0].save()
|
|
||||||
except DatabaseError as e:
|
|
||||||
status.append(e)
|
status.append(e)
|
||||||
finally:
|
finally:
|
||||||
# This method is run in a separate thread. It uses its own
|
# This method is run in a separate thread. It uses its own
|
||||||
|
@ -248,3 +286,7 @@ class SelectForUpdateTests(TransactionTestCase):
|
||||||
with transaction.atomic():
|
with transaction.atomic():
|
||||||
person = Person.objects.select_for_update().get(name='Reinhardt')
|
person = Person.objects.select_for_update().get(name='Reinhardt')
|
||||||
self.assertEqual(person.name, 'Reinhardt')
|
self.assertEqual(person.name, 'Reinhardt')
|
||||||
|
|
||||||
|
def test_nowait_and_skip_locked(self):
|
||||||
|
with self.assertRaisesMessage(ValueError, 'The nowait option cannot be used with skip_locked.'):
|
||||||
|
Person.objects.select_for_update(nowait=True, skip_locked=True)
|
||||||
|
|
Loading…
Reference in New Issue