Fixed #2705: added a `select_for_update()` clause to querysets.
A number of people worked on this patch over the years -- Hawkeye, Colin Grady, KBS, sakyamuni, anih, jdemoor, and Issak Kelly. Thanks to them all, and apologies if I missed anyone. Special thanks to Dan Fairs for picking it up again at the end and seeing this through to commit. git-svn-id: http://code.djangoproject.com/svn/django/trunk@16058 bcc190cf-cafb-0310-a4f2-bffc1f526a37
This commit is contained in:
parent
99c1794427
commit
8f0f73c7b8
1
AUTHORS
1
AUTHORS
|
@ -168,6 +168,7 @@ answer newbie questions, and generally made Django that much better:
|
||||||
eriks@win.tue.nl
|
eriks@win.tue.nl
|
||||||
Tomáš Ehrlich <tomas.ehrlich@gmail.com>
|
Tomáš Ehrlich <tomas.ehrlich@gmail.com>
|
||||||
Dirk Eschler <dirk.eschler@gmx.net>
|
Dirk Eschler <dirk.eschler@gmx.net>
|
||||||
|
Dan Fairs <dan@fezconsulting.com>
|
||||||
Marc Fargas <telenieko@telenieko.com>
|
Marc Fargas <telenieko@telenieko.com>
|
||||||
Szilveszter Farkas <szilveszter.farkas@gmail.com>
|
Szilveszter Farkas <szilveszter.farkas@gmail.com>
|
||||||
Grigory Fateyev <greg@dial.com.ru>
|
Grigory Fateyev <greg@dial.com.ru>
|
||||||
|
|
|
@ -279,6 +279,8 @@ class BaseDatabaseFeatures(object):
|
||||||
# integer primary keys.
|
# integer primary keys.
|
||||||
related_fields_match_type = False
|
related_fields_match_type = False
|
||||||
allow_sliced_subqueries = True
|
allow_sliced_subqueries = True
|
||||||
|
has_select_for_update = False
|
||||||
|
has_select_for_update_nowait = False
|
||||||
|
|
||||||
# Does the default test database allow multiple connections?
|
# Does the default test database allow multiple connections?
|
||||||
# Usually an indication that the test database is in-memory
|
# Usually an indication that the test database is in-memory
|
||||||
|
@ -476,6 +478,15 @@ class BaseDatabaseOperations(object):
|
||||||
"""
|
"""
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
def for_update_sql(self, nowait=False):
|
||||||
|
"""
|
||||||
|
Returns the FOR UPDATE SQL clause to lock rows for an update operation.
|
||||||
|
"""
|
||||||
|
if nowait:
|
||||||
|
return 'FOR UPDATE NOWAIT'
|
||||||
|
else:
|
||||||
|
return 'FOR UPDATE'
|
||||||
|
|
||||||
def fulltext_search_sql(self, field_name):
|
def fulltext_search_sql(self, field_name):
|
||||||
"""
|
"""
|
||||||
Returns the SQL WHERE clause to use in order to perform a full-text
|
Returns the SQL WHERE clause to use in order to perform a full-text
|
||||||
|
|
|
@ -124,6 +124,8 @@ class DatabaseFeatures(BaseDatabaseFeatures):
|
||||||
allows_group_by_pk = True
|
allows_group_by_pk = True
|
||||||
related_fields_match_type = True
|
related_fields_match_type = True
|
||||||
allow_sliced_subqueries = False
|
allow_sliced_subqueries = False
|
||||||
|
has_select_for_update = True
|
||||||
|
has_select_for_update_nowait = False
|
||||||
supports_forward_references = False
|
supports_forward_references = False
|
||||||
supports_long_model_names = False
|
supports_long_model_names = False
|
||||||
supports_microsecond_precision = False
|
supports_microsecond_precision = False
|
||||||
|
|
|
@ -70,6 +70,8 @@ class DatabaseFeatures(BaseDatabaseFeatures):
|
||||||
needs_datetime_string_cast = False
|
needs_datetime_string_cast = False
|
||||||
interprets_empty_strings_as_nulls = True
|
interprets_empty_strings_as_nulls = True
|
||||||
uses_savepoints = True
|
uses_savepoints = True
|
||||||
|
has_select_for_update = True
|
||||||
|
has_select_for_update_nowait = True
|
||||||
can_return_id_from_insert = True
|
can_return_id_from_insert = True
|
||||||
allow_sliced_subqueries = False
|
allow_sliced_subqueries = False
|
||||||
supports_subqueries_in_group_by = False
|
supports_subqueries_in_group_by = False
|
||||||
|
|
|
@ -70,6 +70,9 @@ class DatabaseFeatures(BaseDatabaseFeatures):
|
||||||
requires_rollback_on_dirty_transaction = True
|
requires_rollback_on_dirty_transaction = True
|
||||||
has_real_datatype = True
|
has_real_datatype = True
|
||||||
can_defer_constraint_checks = True
|
can_defer_constraint_checks = True
|
||||||
|
has_select_for_update = True
|
||||||
|
has_select_for_update_nowait = True
|
||||||
|
|
||||||
|
|
||||||
class DatabaseWrapper(BaseDatabaseWrapper):
|
class DatabaseWrapper(BaseDatabaseWrapper):
|
||||||
vendor = 'postgresql'
|
vendor = 'postgresql'
|
||||||
|
|
|
@ -164,6 +164,9 @@ class Manager(object):
|
||||||
def order_by(self, *args, **kwargs):
|
def order_by(self, *args, **kwargs):
|
||||||
return self.get_query_set().order_by(*args, **kwargs)
|
return self.get_query_set().order_by(*args, **kwargs)
|
||||||
|
|
||||||
|
def select_for_update(self, *args, **kwargs):
|
||||||
|
return self.get_query_set().select_for_update(*args, **kwargs)
|
||||||
|
|
||||||
def select_related(self, *args, **kwargs):
|
def select_related(self, *args, **kwargs):
|
||||||
return self.get_query_set().select_related(*args, **kwargs)
|
return self.get_query_set().select_related(*args, **kwargs)
|
||||||
|
|
||||||
|
|
|
@ -435,6 +435,7 @@ class QuerySet(object):
|
||||||
del_query._for_write = True
|
del_query._for_write = True
|
||||||
|
|
||||||
# Disable non-supported fields.
|
# Disable non-supported fields.
|
||||||
|
del_query.query.select_for_update = False
|
||||||
del_query.query.select_related = False
|
del_query.query.select_related = False
|
||||||
del_query.query.clear_ordering()
|
del_query.query.clear_ordering()
|
||||||
|
|
||||||
|
@ -583,6 +584,18 @@ class QuerySet(object):
|
||||||
else:
|
else:
|
||||||
return self._filter_or_exclude(None, **filter_obj)
|
return self._filter_or_exclude(None, **filter_obj)
|
||||||
|
|
||||||
|
def select_for_update(self, **kwargs):
|
||||||
|
"""
|
||||||
|
Returns a new QuerySet instance that will select objects with a
|
||||||
|
FOR UPDATE lock.
|
||||||
|
"""
|
||||||
|
# Default to false for nowait
|
||||||
|
nowait = kwargs.pop('nowait', False)
|
||||||
|
obj = self._clone()
|
||||||
|
obj.query.select_for_update = True
|
||||||
|
obj.query.select_for_update_nowait = nowait
|
||||||
|
return obj
|
||||||
|
|
||||||
def select_related(self, *fields, **kwargs):
|
def select_related(self, *fields, **kwargs):
|
||||||
"""
|
"""
|
||||||
Returns a new QuerySet instance that will select related objects.
|
Returns a new QuerySet instance that will select related objects.
|
||||||
|
|
|
@ -1,11 +1,13 @@
|
||||||
from django.core.exceptions import FieldError
|
from django.core.exceptions import FieldError
|
||||||
from django.db import connections
|
from django.db import connections
|
||||||
|
from django.db import transaction
|
||||||
from django.db.backends.util import truncate_name
|
from django.db.backends.util import truncate_name
|
||||||
from django.db.models.sql.constants import *
|
from django.db.models.sql.constants import *
|
||||||
from django.db.models.sql.datastructures import EmptyResultSet
|
from django.db.models.sql.datastructures import EmptyResultSet
|
||||||
from django.db.models.sql.expressions import SQLEvaluator
|
from django.db.models.sql.expressions import SQLEvaluator
|
||||||
from django.db.models.sql.query import get_proxied_model, get_order_dir, \
|
from django.db.models.sql.query import get_proxied_model, get_order_dir, \
|
||||||
select_related_descend, Query
|
select_related_descend, Query
|
||||||
|
from django.db.utils import DatabaseError
|
||||||
|
|
||||||
class SQLCompiler(object):
|
class SQLCompiler(object):
|
||||||
def __init__(self, query, connection, using):
|
def __init__(self, query, connection, using):
|
||||||
|
@ -117,6 +119,14 @@ class SQLCompiler(object):
|
||||||
result.append('LIMIT %d' % val)
|
result.append('LIMIT %d' % val)
|
||||||
result.append('OFFSET %d' % self.query.low_mark)
|
result.append('OFFSET %d' % self.query.low_mark)
|
||||||
|
|
||||||
|
if self.query.select_for_update and self.connection.features.has_select_for_update:
|
||||||
|
# If we've been asked for a NOWAIT query but the backend does not support it,
|
||||||
|
# raise a DatabaseError otherwise we could get an unexpected deadlock.
|
||||||
|
nowait = self.query.select_for_update_nowait
|
||||||
|
if nowait and not self.connection.features.has_select_for_update_nowait:
|
||||||
|
raise DatabaseError('NOWAIT is not supported on this database backend.')
|
||||||
|
result.append(self.connection.ops.for_update_sql(nowait=nowait))
|
||||||
|
|
||||||
return ' '.join(result), tuple(params)
|
return ' '.join(result), tuple(params)
|
||||||
|
|
||||||
def as_nested_sql(self):
|
def as_nested_sql(self):
|
||||||
|
@ -677,6 +687,11 @@ class SQLCompiler(object):
|
||||||
resolve_columns = hasattr(self, 'resolve_columns')
|
resolve_columns = hasattr(self, 'resolve_columns')
|
||||||
fields = None
|
fields = None
|
||||||
has_aggregate_select = bool(self.query.aggregate_select)
|
has_aggregate_select = bool(self.query.aggregate_select)
|
||||||
|
# Set transaction dirty if we're using SELECT FOR UPDATE to ensure
|
||||||
|
# a subsequent commit/rollback is executed, so any database locks
|
||||||
|
# are released.
|
||||||
|
if self.query.select_for_update and transaction.is_managed(self.using):
|
||||||
|
transaction.set_dirty(self.using)
|
||||||
for rows in self.execute_sql(MULTI):
|
for rows in self.execute_sql(MULTI):
|
||||||
for row in rows:
|
for row in rows:
|
||||||
if resolve_columns:
|
if resolve_columns:
|
||||||
|
|
|
@ -125,6 +125,8 @@ class Query(object):
|
||||||
self.order_by = []
|
self.order_by = []
|
||||||
self.low_mark, self.high_mark = 0, None # Used for offset/limit
|
self.low_mark, self.high_mark = 0, None # Used for offset/limit
|
||||||
self.distinct = False
|
self.distinct = False
|
||||||
|
self.select_for_update = False
|
||||||
|
self.select_for_update_nowait = False
|
||||||
self.select_related = False
|
self.select_related = False
|
||||||
self.related_select_cols = []
|
self.related_select_cols = []
|
||||||
|
|
||||||
|
@ -254,6 +256,8 @@ class Query(object):
|
||||||
obj.order_by = self.order_by[:]
|
obj.order_by = self.order_by[:]
|
||||||
obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
|
obj.low_mark, obj.high_mark = self.low_mark, self.high_mark
|
||||||
obj.distinct = self.distinct
|
obj.distinct = self.distinct
|
||||||
|
obj.select_for_update = self.select_for_update
|
||||||
|
obj.select_for_update_nowait = self.select_for_update_nowait
|
||||||
obj.select_related = self.select_related
|
obj.select_related = self.select_related
|
||||||
obj.related_select_cols = []
|
obj.related_select_cols = []
|
||||||
obj.aggregates = copy.deepcopy(self.aggregates, memo=memo)
|
obj.aggregates = copy.deepcopy(self.aggregates, memo=memo)
|
||||||
|
@ -360,6 +364,7 @@ class Query(object):
|
||||||
|
|
||||||
query.clear_ordering(True)
|
query.clear_ordering(True)
|
||||||
query.clear_limits()
|
query.clear_limits()
|
||||||
|
query.select_for_update = False
|
||||||
query.select_related = False
|
query.select_related = False
|
||||||
query.related_select_cols = []
|
query.related_select_cols = []
|
||||||
query.related_select_fields = []
|
query.related_select_fields = []
|
||||||
|
|
|
@ -359,6 +359,13 @@ store a timezone-aware ``time`` or ``datetime`` to a
|
||||||
:class:`~django.db.models.TimeField` or :class:`~django.db.models.DateTimeField`
|
:class:`~django.db.models.TimeField` or :class:`~django.db.models.DateTimeField`
|
||||||
respectively, a ``ValueError`` is raised rather than truncating data.
|
respectively, a ``ValueError`` is raised rather than truncating data.
|
||||||
|
|
||||||
|
Row locking with ``QuerySet.select_for_update()``
|
||||||
|
-------------------------------------------------
|
||||||
|
|
||||||
|
MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE``
|
||||||
|
statement. If ``select_for_update()`` is used with ``nowait=True`` then a
|
||||||
|
``DatabaseError`` will be raised.
|
||||||
|
|
||||||
.. _sqlite-notes:
|
.. _sqlite-notes:
|
||||||
|
|
||||||
SQLite notes
|
SQLite notes
|
||||||
|
@ -493,6 +500,12 @@ If you're getting this error, you can solve it by:
|
||||||
This will simply make SQLite wait a bit longer before throwing "database
|
This will simply make SQLite wait a bit longer before throwing "database
|
||||||
is locked" errors; it won't really do anything to solve them.
|
is locked" errors; it won't really do anything to solve them.
|
||||||
|
|
||||||
|
``QuerySet.select_for_update()`` not supported
|
||||||
|
----------------------------------------------
|
||||||
|
|
||||||
|
SQLite does not support the ``SELECT ... FOR UPDATE`` syntax. Calling it will
|
||||||
|
have no effect.
|
||||||
|
|
||||||
.. _oracle-notes:
|
.. _oracle-notes:
|
||||||
|
|
||||||
Oracle notes
|
Oracle notes
|
||||||
|
|
|
@ -966,6 +966,46 @@ For example::
|
||||||
# queries the database with the 'backup' alias
|
# queries the database with the 'backup' alias
|
||||||
>>> Entry.objects.using('backup')
|
>>> Entry.objects.using('backup')
|
||||||
|
|
||||||
|
select_for_update
|
||||||
|
~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
.. method:: select_for_update(nowait=False)
|
||||||
|
|
||||||
|
.. versionadded:: 1.4
|
||||||
|
|
||||||
|
Returns a queryset that will lock rows until the end of the transaction,
|
||||||
|
generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases.
|
||||||
|
|
||||||
|
For example::
|
||||||
|
|
||||||
|
entries = Entry.objects.select_for_update().filter(author=request.user)
|
||||||
|
|
||||||
|
All matched entries will be locked until the end of the transaction block,
|
||||||
|
meaning that other transactions will be prevented from changing or acquiring
|
||||||
|
locks on them.
|
||||||
|
|
||||||
|
Usually, if another transaction has already acquired a lock on one of the
|
||||||
|
selected rows, the query will block until the lock is released. If this is
|
||||||
|
not the behaviour you want, call ``select_for_update(nowait=True)``. This will
|
||||||
|
make the call non-blocking. If a conflicting lock is already acquired by
|
||||||
|
another transaction, ``django.db.utils.DatabaseError`` will be raised when
|
||||||
|
the queryset is evaluated.
|
||||||
|
|
||||||
|
Note that using ``select_for_update`` will cause the current transaction to be
|
||||||
|
set dirty, if under transaction management. This is to ensure that Django issues
|
||||||
|
a ``COMMIT`` or ``ROLLBACK``, releasing any locks held by the ``SELECT FOR
|
||||||
|
UPDATE``.
|
||||||
|
|
||||||
|
Currently, the ``postgresql_psycopg2``, ``oracle``, and ``mysql``
|
||||||
|
database backends support ``select_for_update()``. However, MySQL has no
|
||||||
|
support for the ``nowait`` argument.
|
||||||
|
|
||||||
|
Passing ``nowait=True`` to ``select_for_update`` using database backends that
|
||||||
|
do not support ``nowait``, such as MySQL, will cause a ``DatabaseError`` to be
|
||||||
|
raised. This is in order to prevent code unexpectedly blocking.
|
||||||
|
|
||||||
|
Using ``select_for_update`` on backends which do not support
|
||||||
|
``SELECT ... FOR UPDATE`` (such as SQLite) will have no effect.
|
||||||
|
|
||||||
Methods that do not return QuerySets
|
Methods that do not return QuerySets
|
||||||
------------------------------------
|
------------------------------------
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
#
|
|
@ -0,0 +1,4 @@
|
||||||
|
from django.db import models
|
||||||
|
|
||||||
|
class Person(models.Model):
|
||||||
|
name = models.CharField(max_length=30)
|
|
@ -0,0 +1,262 @@
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import unittest
|
||||||
|
from django.conf import settings
|
||||||
|
from django.db import transaction, connection
|
||||||
|
from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError
|
||||||
|
from django.test import (TransactionTestCase, skipIfDBFeature,
|
||||||
|
skipUnlessDBFeature)
|
||||||
|
from django.utils.functional import wraps
|
||||||
|
from django.utils import unittest
|
||||||
|
|
||||||
|
from models import Person
|
||||||
|
|
||||||
|
try:
|
||||||
|
import threading
|
||||||
|
def requires_threading(func):
|
||||||
|
return func
|
||||||
|
except ImportError:
|
||||||
|
# Note we can't use dummy_threading here, as our tests will actually
|
||||||
|
# block. We just have to skip the test completely.
|
||||||
|
def requires_threading(func):
|
||||||
|
@wraps(func)
|
||||||
|
def wrapped(*args, **kw):
|
||||||
|
raise unittest.SkipTest('threading required')
|
||||||
|
|
||||||
|
class SelectForUpdateTests(TransactionTestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
transaction.enter_transaction_management(True)
|
||||||
|
transaction.managed(True)
|
||||||
|
self.person = Person.objects.create(name='Reinhardt')
|
||||||
|
|
||||||
|
# We have to commit here so that code in run_select_for_update can
|
||||||
|
# see this data.
|
||||||
|
transaction.commit()
|
||||||
|
|
||||||
|
# We need another database connection to test that one connection
|
||||||
|
# issuing a SELECT ... FOR UPDATE will block.
|
||||||
|
new_connections = ConnectionHandler(settings.DATABASES)
|
||||||
|
self.new_connection = new_connections[DEFAULT_DB_ALIAS]
|
||||||
|
|
||||||
|
# We need to set settings.DEBUG to True so we can capture
|
||||||
|
# the output SQL to examine.
|
||||||
|
self._old_debug = settings.DEBUG
|
||||||
|
settings.DEBUG = True
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
try:
|
||||||
|
# We don't really care if this fails - some of the tests will set
|
||||||
|
# this in the course of their run.
|
||||||
|
transaction.managed(False)
|
||||||
|
transaction.leave_transaction_management()
|
||||||
|
except transaction.TransactionManagementError:
|
||||||
|
pass
|
||||||
|
self.new_connection.close()
|
||||||
|
settings.DEBUG = self._old_debug
|
||||||
|
try:
|
||||||
|
self.end_blocking_transaction()
|
||||||
|
except (DatabaseError, AttributeError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def start_blocking_transaction(self):
|
||||||
|
# Start a blocking transaction. At some point,
|
||||||
|
# end_blocking_transaction() should be called.
|
||||||
|
self.cursor = self.new_connection.cursor()
|
||||||
|
sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
|
||||||
|
'db_table': Person._meta.db_table,
|
||||||
|
'for_update': self.new_connection.ops.for_update_sql(),
|
||||||
|
}
|
||||||
|
self.cursor.execute(sql, ())
|
||||||
|
result = self.cursor.fetchone()
|
||||||
|
|
||||||
|
def end_blocking_transaction(self):
|
||||||
|
# Roll back the blocking transaction.
|
||||||
|
self.new_connection._rollback()
|
||||||
|
|
||||||
|
def has_for_update_sql(self, tested_connection, nowait=False):
|
||||||
|
# Examine the SQL that was executed to determine whether it
|
||||||
|
# contains the 'SELECT..FOR UPDATE' stanza.
|
||||||
|
for_update_sql = tested_connection.ops.for_update_sql(nowait)
|
||||||
|
sql = tested_connection.queries[-1]['sql']
|
||||||
|
return bool(sql.find(for_update_sql) > -1)
|
||||||
|
|
||||||
|
def check_exc(self, exc):
|
||||||
|
self.failUnless(isinstance(exc, DatabaseError))
|
||||||
|
|
||||||
|
@skipUnlessDBFeature('has_select_for_update')
|
||||||
|
def test_for_update_sql_generated(self):
|
||||||
|
"""
|
||||||
|
Test that the backend's FOR UPDATE variant appears in
|
||||||
|
generated SQL when select_for_update is invoked.
|
||||||
|
"""
|
||||||
|
list(Person.objects.all().select_for_update())
|
||||||
|
self.assertTrue(self.has_for_update_sql(connection))
|
||||||
|
|
||||||
|
@skipUnlessDBFeature('has_select_for_update_nowait')
|
||||||
|
def test_for_update_sql_generated_nowait(self):
|
||||||
|
"""
|
||||||
|
Test that the backend's FOR UPDATE NOWAIT variant appears in
|
||||||
|
generated SQL when select_for_update is invoked.
|
||||||
|
"""
|
||||||
|
list(Person.objects.all().select_for_update(nowait=True))
|
||||||
|
self.assertTrue(self.has_for_update_sql(connection, nowait=True))
|
||||||
|
|
||||||
|
# In Python 2.6 beta and some final releases, exceptions raised in __len__
|
||||||
|
# are swallowed (Python issue 1242657), so these cases return an empty
|
||||||
|
# list, rather than raising an exception. Not a lot we can do about that,
|
||||||
|
# unfortunately, due to the way Python handles list() calls internally.
|
||||||
|
# Thus, we skip this test for Python 2.6.
|
||||||
|
@requires_threading
|
||||||
|
@skipUnlessDBFeature('has_select_for_update_nowait')
|
||||||
|
@unittest.skipIf(sys.version_info[:2] == (2, 6), "Python version is 2.6")
|
||||||
|
def test_nowait_raises_error_on_block(self):
|
||||||
|
"""
|
||||||
|
If nowait is specified, we expect an error to be raised rather
|
||||||
|
than blocking.
|
||||||
|
"""
|
||||||
|
self.start_blocking_transaction()
|
||||||
|
status = []
|
||||||
|
thread = threading.Thread(
|
||||||
|
target=self.run_select_for_update,
|
||||||
|
args=(status,),
|
||||||
|
kwargs={'nowait': True},
|
||||||
|
)
|
||||||
|
|
||||||
|
thread.start()
|
||||||
|
time.sleep(1)
|
||||||
|
thread.join()
|
||||||
|
self.end_blocking_transaction()
|
||||||
|
self.check_exc(status[-1])
|
||||||
|
|
||||||
|
@skipIfDBFeature('has_select_for_update_nowait')
|
||||||
|
@skipUnlessDBFeature('has_select_for_update')
|
||||||
|
def test_unsupported_nowait_raises_error(self):
|
||||||
|
"""
|
||||||
|
If a SELECT...FOR UPDATE NOWAIT is run on a database backend
|
||||||
|
that supports FOR UPDATE but not NOWAIT, then we should find
|
||||||
|
that a DatabaseError is raised.
|
||||||
|
"""
|
||||||
|
self.assertRaises(
|
||||||
|
DatabaseError,
|
||||||
|
list,
|
||||||
|
Person.objects.all().select_for_update(nowait=True)
|
||||||
|
)
|
||||||
|
|
||||||
|
def run_select_for_update(self, status, nowait=False):
|
||||||
|
"""
|
||||||
|
Utility method that runs a SELECT FOR UPDATE against all
|
||||||
|
Person instances. After the select_for_update, it attempts
|
||||||
|
to update the name of the only record, save, and commit.
|
||||||
|
|
||||||
|
In general, this will be run in a separate thread.
|
||||||
|
"""
|
||||||
|
status.append('started')
|
||||||
|
try:
|
||||||
|
# We need to enter transaction management again, as this is done on
|
||||||
|
# per-thread basis
|
||||||
|
transaction.enter_transaction_management(True)
|
||||||
|
transaction.managed(True)
|
||||||
|
people = list(
|
||||||
|
Person.objects.all().select_for_update(nowait=nowait)
|
||||||
|
)
|
||||||
|
people[0].name = 'Fred'
|
||||||
|
people[0].save()
|
||||||
|
transaction.commit()
|
||||||
|
except DatabaseError, e:
|
||||||
|
status.append(e)
|
||||||
|
except Exception, e:
|
||||||
|
raise
|
||||||
|
|
||||||
|
@requires_threading
|
||||||
|
@skipUnlessDBFeature('has_select_for_update')
|
||||||
|
@skipUnlessDBFeature('supports_transactions')
|
||||||
|
def test_block(self):
|
||||||
|
"""
|
||||||
|
Check that a thread running a select_for_update that
|
||||||
|
accesses rows being touched by a similar operation
|
||||||
|
on another connection blocks correctly.
|
||||||
|
"""
|
||||||
|
# First, let's start the transaction in our thread.
|
||||||
|
self.start_blocking_transaction()
|
||||||
|
|
||||||
|
# Now, try it again using the ORM's select_for_update
|
||||||
|
# facility. Do this in a separate thread.
|
||||||
|
status = []
|
||||||
|
thread = threading.Thread(
|
||||||
|
target=self.run_select_for_update, args=(status,)
|
||||||
|
)
|
||||||
|
|
||||||
|
# The thread should immediately block, but we'll sleep
|
||||||
|
# for a bit to make sure.
|
||||||
|
thread.start()
|
||||||
|
sanity_count = 0
|
||||||
|
while len(status) != 1 and sanity_count < 10:
|
||||||
|
sanity_count += 1
|
||||||
|
time.sleep(1)
|
||||||
|
if sanity_count >= 10:
|
||||||
|
raise ValueError, 'Thread did not run and block'
|
||||||
|
|
||||||
|
# Check the person hasn't been updated. Since this isn't
|
||||||
|
# using FOR UPDATE, it won't block.
|
||||||
|
p = Person.objects.get(pk=self.person.pk)
|
||||||
|
self.assertEqual('Reinhardt', p.name)
|
||||||
|
|
||||||
|
# When we end our blocking transaction, our thread should
|
||||||
|
# be able to continue.
|
||||||
|
self.end_blocking_transaction()
|
||||||
|
thread.join(5.0)
|
||||||
|
|
||||||
|
# Check the thread has finished. Assuming it has, we should
|
||||||
|
# find that it has updated the person's name.
|
||||||
|
self.failIf(thread.isAlive())
|
||||||
|
p = Person.objects.get(pk=self.person.pk)
|
||||||
|
self.assertEqual('Fred', p.name)
|
||||||
|
|
||||||
|
@requires_threading
|
||||||
|
@skipUnlessDBFeature('has_select_for_update')
|
||||||
|
def test_raw_lock_not_available(self):
|
||||||
|
"""
|
||||||
|
Check that running a raw query which can't obtain a FOR UPDATE lock
|
||||||
|
raises the correct exception
|
||||||
|
"""
|
||||||
|
self.start_blocking_transaction()
|
||||||
|
def raw(status):
|
||||||
|
try:
|
||||||
|
list(
|
||||||
|
Person.objects.raw(
|
||||||
|
'SELECT * FROM %s %s' % (
|
||||||
|
Person._meta.db_table,
|
||||||
|
connection.ops.for_update_sql(nowait=True)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
except DatabaseError, e:
|
||||||
|
status.append(e)
|
||||||
|
status = []
|
||||||
|
thread = threading.Thread(target=raw, kwargs={'status': status})
|
||||||
|
thread.start()
|
||||||
|
time.sleep(1)
|
||||||
|
thread.join()
|
||||||
|
self.end_blocking_transaction()
|
||||||
|
self.check_exc(status[-1])
|
||||||
|
|
||||||
|
@skipUnlessDBFeature('has_select_for_update')
|
||||||
|
def test_transaction_dirty_managed(self):
|
||||||
|
""" Check that a select_for_update sets the transaction to be
|
||||||
|
dirty when executed under txn management. Setting the txn dirty
|
||||||
|
means that it will be either committed or rolled back by Django,
|
||||||
|
which will release any locks held by the SELECT FOR UPDATE.
|
||||||
|
"""
|
||||||
|
people = list(Person.objects.select_for_update())
|
||||||
|
self.assertTrue(transaction.is_dirty())
|
||||||
|
|
||||||
|
@skipUnlessDBFeature('has_select_for_update')
|
||||||
|
def test_transaction_not_dirty_unmanaged(self):
|
||||||
|
""" If we're not under txn management, the txn will never be
|
||||||
|
marked as dirty.
|
||||||
|
"""
|
||||||
|
transaction.managed(False)
|
||||||
|
transaction.leave_transaction_management()
|
||||||
|
people = list(Person.objects.select_for_update())
|
||||||
|
self.assertFalse(transaction.is_dirty())
|
Loading…
Reference in New Issue