From 8f0f73c7b8b110489a1a127cc47e3cabb0eea646 Mon Sep 17 00:00:00 2001 From: Jacob Kaplan-Moss Date: Wed, 20 Apr 2011 20:42:07 +0000 Subject: [PATCH] Fixed #2705: added a `select_for_update()` clause to querysets. A number of people worked on this patch over the years -- Hawkeye, Colin Grady, KBS, sakyamuni, anih, jdemoor, and Issak Kelly. Thanks to them all, and apologies if I missed anyone. Special thanks to Dan Fairs for picking it up again at the end and seeing this through to commit. git-svn-id: http://code.djangoproject.com/svn/django/trunk@16058 bcc190cf-cafb-0310-a4f2-bffc1f526a37 --- AUTHORS | 1 + django/db/backends/__init__.py | 11 + django/db/backends/mysql/base.py | 2 + django/db/backends/oracle/base.py | 2 + .../db/backends/postgresql_psycopg2/base.py | 3 + django/db/models/manager.py | 3 + django/db/models/query.py | 13 + django/db/models/sql/compiler.py | 15 + django/db/models/sql/query.py | 5 + docs/ref/databases.txt | 13 + docs/ref/models/querysets.txt | 40 +++ .../modeltests/select_for_update/__init__.py | 1 + tests/modeltests/select_for_update/models.py | 4 + tests/modeltests/select_for_update/tests.py | 262 ++++++++++++++++++ 14 files changed, 375 insertions(+) create mode 100644 tests/modeltests/select_for_update/__init__.py create mode 100644 tests/modeltests/select_for_update/models.py create mode 100644 tests/modeltests/select_for_update/tests.py diff --git a/AUTHORS b/AUTHORS index a19e49b10cb..a6e4aa2eedb 100644 --- a/AUTHORS +++ b/AUTHORS @@ -168,6 +168,7 @@ answer newbie questions, and generally made Django that much better: eriks@win.tue.nl Tomáš Ehrlich Dirk Eschler + Dan Fairs Marc Fargas Szilveszter Farkas Grigory Fateyev diff --git a/django/db/backends/__init__.py b/django/db/backends/__init__.py index 4b845d51357..b64fb01b039 100644 --- a/django/db/backends/__init__.py +++ b/django/db/backends/__init__.py @@ -279,6 +279,8 @@ class BaseDatabaseFeatures(object): # integer primary keys. related_fields_match_type = False allow_sliced_subqueries = True + has_select_for_update = False + has_select_for_update_nowait = False # Does the default test database allow multiple connections? # Usually an indication that the test database is in-memory @@ -476,6 +478,15 @@ class BaseDatabaseOperations(object): """ return [] + def for_update_sql(self, nowait=False): + """ + Returns the FOR UPDATE SQL clause to lock rows for an update operation. + """ + if nowait: + return 'FOR UPDATE NOWAIT' + else: + return 'FOR UPDATE' + def fulltext_search_sql(self, field_name): """ Returns the SQL WHERE clause to use in order to perform a full-text diff --git a/django/db/backends/mysql/base.py b/django/db/backends/mysql/base.py index 630b9805c3d..16067c68071 100644 --- a/django/db/backends/mysql/base.py +++ b/django/db/backends/mysql/base.py @@ -124,6 +124,8 @@ class DatabaseFeatures(BaseDatabaseFeatures): allows_group_by_pk = True related_fields_match_type = True allow_sliced_subqueries = False + has_select_for_update = True + has_select_for_update_nowait = False supports_forward_references = False supports_long_model_names = False supports_microsecond_precision = False diff --git a/django/db/backends/oracle/base.py b/django/db/backends/oracle/base.py index dc70a1018cb..bcb906762b0 100644 --- a/django/db/backends/oracle/base.py +++ b/django/db/backends/oracle/base.py @@ -70,6 +70,8 @@ class DatabaseFeatures(BaseDatabaseFeatures): needs_datetime_string_cast = False interprets_empty_strings_as_nulls = True uses_savepoints = True + has_select_for_update = True + has_select_for_update_nowait = True can_return_id_from_insert = True allow_sliced_subqueries = False supports_subqueries_in_group_by = False diff --git a/django/db/backends/postgresql_psycopg2/base.py b/django/db/backends/postgresql_psycopg2/base.py index e0990a1089a..67e28774fe5 100644 --- a/django/db/backends/postgresql_psycopg2/base.py +++ b/django/db/backends/postgresql_psycopg2/base.py @@ -70,6 +70,9 @@ class DatabaseFeatures(BaseDatabaseFeatures): requires_rollback_on_dirty_transaction = True has_real_datatype = True can_defer_constraint_checks = True + has_select_for_update = True + has_select_for_update_nowait = True + class DatabaseWrapper(BaseDatabaseWrapper): vendor = 'postgresql' diff --git a/django/db/models/manager.py b/django/db/models/manager.py index 9528d668121..4fa4c4a52df 100644 --- a/django/db/models/manager.py +++ b/django/db/models/manager.py @@ -164,6 +164,9 @@ class Manager(object): def order_by(self, *args, **kwargs): return self.get_query_set().order_by(*args, **kwargs) + def select_for_update(self, *args, **kwargs): + return self.get_query_set().select_for_update(*args, **kwargs) + def select_related(self, *args, **kwargs): return self.get_query_set().select_related(*args, **kwargs) diff --git a/django/db/models/query.py b/django/db/models/query.py index fbd391971f0..8e33765fdfc 100644 --- a/django/db/models/query.py +++ b/django/db/models/query.py @@ -435,6 +435,7 @@ class QuerySet(object): del_query._for_write = True # Disable non-supported fields. + del_query.query.select_for_update = False del_query.query.select_related = False del_query.query.clear_ordering() @@ -583,6 +584,18 @@ class QuerySet(object): else: return self._filter_or_exclude(None, **filter_obj) + def select_for_update(self, **kwargs): + """ + Returns a new QuerySet instance that will select objects with a + FOR UPDATE lock. + """ + # Default to false for nowait + nowait = kwargs.pop('nowait', False) + obj = self._clone() + obj.query.select_for_update = True + obj.query.select_for_update_nowait = nowait + return obj + def select_related(self, *fields, **kwargs): """ Returns a new QuerySet instance that will select related objects. diff --git a/django/db/models/sql/compiler.py b/django/db/models/sql/compiler.py index d425c8b32b5..ace9cf4c883 100644 --- a/django/db/models/sql/compiler.py +++ b/django/db/models/sql/compiler.py @@ -1,11 +1,13 @@ from django.core.exceptions import FieldError from django.db import connections +from django.db import transaction from django.db.backends.util import truncate_name from django.db.models.sql.constants import * from django.db.models.sql.datastructures import EmptyResultSet from django.db.models.sql.expressions import SQLEvaluator from django.db.models.sql.query import get_proxied_model, get_order_dir, \ select_related_descend, Query +from django.db.utils import DatabaseError class SQLCompiler(object): def __init__(self, query, connection, using): @@ -117,6 +119,14 @@ class SQLCompiler(object): result.append('LIMIT %d' % val) result.append('OFFSET %d' % self.query.low_mark) + if self.query.select_for_update and self.connection.features.has_select_for_update: + # If we've been asked for a NOWAIT query but the backend does not support it, + # raise a DatabaseError otherwise we could get an unexpected deadlock. + nowait = self.query.select_for_update_nowait + if nowait and not self.connection.features.has_select_for_update_nowait: + raise DatabaseError('NOWAIT is not supported on this database backend.') + result.append(self.connection.ops.for_update_sql(nowait=nowait)) + return ' '.join(result), tuple(params) def as_nested_sql(self): @@ -677,6 +687,11 @@ class SQLCompiler(object): resolve_columns = hasattr(self, 'resolve_columns') fields = None has_aggregate_select = bool(self.query.aggregate_select) + # Set transaction dirty if we're using SELECT FOR UPDATE to ensure + # a subsequent commit/rollback is executed, so any database locks + # are released. + if self.query.select_for_update and transaction.is_managed(self.using): + transaction.set_dirty(self.using) for rows in self.execute_sql(MULTI): for row in rows: if resolve_columns: diff --git a/django/db/models/sql/query.py b/django/db/models/sql/query.py index 59f7bcbf14e..f41272cf0fa 100644 --- a/django/db/models/sql/query.py +++ b/django/db/models/sql/query.py @@ -125,6 +125,8 @@ class Query(object): self.order_by = [] self.low_mark, self.high_mark = 0, None # Used for offset/limit self.distinct = False + self.select_for_update = False + self.select_for_update_nowait = False self.select_related = False self.related_select_cols = [] @@ -254,6 +256,8 @@ class Query(object): obj.order_by = self.order_by[:] obj.low_mark, obj.high_mark = self.low_mark, self.high_mark obj.distinct = self.distinct + obj.select_for_update = self.select_for_update + obj.select_for_update_nowait = self.select_for_update_nowait obj.select_related = self.select_related obj.related_select_cols = [] obj.aggregates = copy.deepcopy(self.aggregates, memo=memo) @@ -360,6 +364,7 @@ class Query(object): query.clear_ordering(True) query.clear_limits() + query.select_for_update = False query.select_related = False query.related_select_cols = [] query.related_select_fields = [] diff --git a/docs/ref/databases.txt b/docs/ref/databases.txt index 1715afda6ba..fb97c9f0456 100644 --- a/docs/ref/databases.txt +++ b/docs/ref/databases.txt @@ -359,6 +359,13 @@ store a timezone-aware ``time`` or ``datetime`` to a :class:`~django.db.models.TimeField` or :class:`~django.db.models.DateTimeField` respectively, a ``ValueError`` is raised rather than truncating data. +Row locking with ``QuerySet.select_for_update()`` +------------------------------------------------- + +MySQL does not support the ``NOWAIT`` option to the ``SELECT ... FOR UPDATE`` +statement. If ``select_for_update()`` is used with ``nowait=True`` then a +``DatabaseError`` will be raised. + .. _sqlite-notes: SQLite notes @@ -493,6 +500,12 @@ If you're getting this error, you can solve it by: This will simply make SQLite wait a bit longer before throwing "database is locked" errors; it won't really do anything to solve them. +``QuerySet.select_for_update()`` not supported +---------------------------------------------- + +SQLite does not support the ``SELECT ... FOR UPDATE`` syntax. Calling it will +have no effect. + .. _oracle-notes: Oracle notes diff --git a/docs/ref/models/querysets.txt b/docs/ref/models/querysets.txt index badcf38f530..f05368bc0c5 100644 --- a/docs/ref/models/querysets.txt +++ b/docs/ref/models/querysets.txt @@ -966,6 +966,46 @@ For example:: # queries the database with the 'backup' alias >>> Entry.objects.using('backup') +select_for_update +~~~~~~~~~~~~~~~~~ + +.. method:: select_for_update(nowait=False) + +.. versionadded:: 1.4 + +Returns a queryset that will lock rows until the end of the transaction, +generating a ``SELECT ... FOR UPDATE`` SQL statement on supported databases. + +For example:: + + entries = Entry.objects.select_for_update().filter(author=request.user) + +All matched entries will be locked until the end of the transaction block, +meaning that other transactions will be prevented from changing or acquiring +locks on them. + +Usually, if another transaction has already acquired a lock on one of the +selected rows, the query will block until the lock is released. If this is +not the behaviour you want, call ``select_for_update(nowait=True)``. This will +make the call non-blocking. If a conflicting lock is already acquired by +another transaction, ``django.db.utils.DatabaseError`` will be raised when +the queryset is evaluated. + +Note that using ``select_for_update`` will cause the current transaction to be +set dirty, if under transaction management. This is to ensure that Django issues +a ``COMMIT`` or ``ROLLBACK``, releasing any locks held by the ``SELECT FOR +UPDATE``. + +Currently, the ``postgresql_psycopg2``, ``oracle``, and ``mysql`` +database backends support ``select_for_update()``. However, MySQL has no +support for the ``nowait`` argument. + +Passing ``nowait=True`` to ``select_for_update`` using database backends that +do not support ``nowait``, such as MySQL, will cause a ``DatabaseError`` to be +raised. This is in order to prevent code unexpectedly blocking. + +Using ``select_for_update`` on backends which do not support +``SELECT ... FOR UPDATE`` (such as SQLite) will have no effect. Methods that do not return QuerySets ------------------------------------ diff --git a/tests/modeltests/select_for_update/__init__.py b/tests/modeltests/select_for_update/__init__.py new file mode 100644 index 00000000000..792d6005489 --- /dev/null +++ b/tests/modeltests/select_for_update/__init__.py @@ -0,0 +1 @@ +# diff --git a/tests/modeltests/select_for_update/models.py b/tests/modeltests/select_for_update/models.py new file mode 100644 index 00000000000..a124d1f077a --- /dev/null +++ b/tests/modeltests/select_for_update/models.py @@ -0,0 +1,4 @@ +from django.db import models + +class Person(models.Model): + name = models.CharField(max_length=30) diff --git a/tests/modeltests/select_for_update/tests.py b/tests/modeltests/select_for_update/tests.py new file mode 100644 index 00000000000..514b9e46fd8 --- /dev/null +++ b/tests/modeltests/select_for_update/tests.py @@ -0,0 +1,262 @@ +import sys +import time +import unittest +from django.conf import settings +from django.db import transaction, connection +from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError +from django.test import (TransactionTestCase, skipIfDBFeature, + skipUnlessDBFeature) +from django.utils.functional import wraps +from django.utils import unittest + +from models import Person + +try: + import threading + def requires_threading(func): + return func +except ImportError: + # Note we can't use dummy_threading here, as our tests will actually + # block. We just have to skip the test completely. + def requires_threading(func): + @wraps(func) + def wrapped(*args, **kw): + raise unittest.SkipTest('threading required') + +class SelectForUpdateTests(TransactionTestCase): + + def setUp(self): + transaction.enter_transaction_management(True) + transaction.managed(True) + self.person = Person.objects.create(name='Reinhardt') + + # We have to commit here so that code in run_select_for_update can + # see this data. + transaction.commit() + + # We need another database connection to test that one connection + # issuing a SELECT ... FOR UPDATE will block. + new_connections = ConnectionHandler(settings.DATABASES) + self.new_connection = new_connections[DEFAULT_DB_ALIAS] + + # We need to set settings.DEBUG to True so we can capture + # the output SQL to examine. + self._old_debug = settings.DEBUG + settings.DEBUG = True + + def tearDown(self): + try: + # We don't really care if this fails - some of the tests will set + # this in the course of their run. + transaction.managed(False) + transaction.leave_transaction_management() + except transaction.TransactionManagementError: + pass + self.new_connection.close() + settings.DEBUG = self._old_debug + try: + self.end_blocking_transaction() + except (DatabaseError, AttributeError): + pass + + def start_blocking_transaction(self): + # Start a blocking transaction. At some point, + # end_blocking_transaction() should be called. + self.cursor = self.new_connection.cursor() + sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % { + 'db_table': Person._meta.db_table, + 'for_update': self.new_connection.ops.for_update_sql(), + } + self.cursor.execute(sql, ()) + result = self.cursor.fetchone() + + def end_blocking_transaction(self): + # Roll back the blocking transaction. + self.new_connection._rollback() + + def has_for_update_sql(self, tested_connection, nowait=False): + # Examine the SQL that was executed to determine whether it + # contains the 'SELECT..FOR UPDATE' stanza. + for_update_sql = tested_connection.ops.for_update_sql(nowait) + sql = tested_connection.queries[-1]['sql'] + return bool(sql.find(for_update_sql) > -1) + + def check_exc(self, exc): + self.failUnless(isinstance(exc, DatabaseError)) + + @skipUnlessDBFeature('has_select_for_update') + def test_for_update_sql_generated(self): + """ + Test that the backend's FOR UPDATE variant appears in + generated SQL when select_for_update is invoked. + """ + list(Person.objects.all().select_for_update()) + self.assertTrue(self.has_for_update_sql(connection)) + + @skipUnlessDBFeature('has_select_for_update_nowait') + def test_for_update_sql_generated_nowait(self): + """ + Test that the backend's FOR UPDATE NOWAIT variant appears in + generated SQL when select_for_update is invoked. + """ + list(Person.objects.all().select_for_update(nowait=True)) + self.assertTrue(self.has_for_update_sql(connection, nowait=True)) + + # In Python 2.6 beta and some final releases, exceptions raised in __len__ + # are swallowed (Python issue 1242657), so these cases return an empty + # list, rather than raising an exception. Not a lot we can do about that, + # unfortunately, due to the way Python handles list() calls internally. + # Thus, we skip this test for Python 2.6. + @requires_threading + @skipUnlessDBFeature('has_select_for_update_nowait') + @unittest.skipIf(sys.version_info[:2] == (2, 6), "Python version is 2.6") + def test_nowait_raises_error_on_block(self): + """ + If nowait is specified, we expect an error to be raised rather + than blocking. + """ + self.start_blocking_transaction() + status = [] + thread = threading.Thread( + target=self.run_select_for_update, + args=(status,), + kwargs={'nowait': True}, + ) + + thread.start() + time.sleep(1) + thread.join() + self.end_blocking_transaction() + self.check_exc(status[-1]) + + @skipIfDBFeature('has_select_for_update_nowait') + @skipUnlessDBFeature('has_select_for_update') + def test_unsupported_nowait_raises_error(self): + """ + If a SELECT...FOR UPDATE NOWAIT is run on a database backend + that supports FOR UPDATE but not NOWAIT, then we should find + that a DatabaseError is raised. + """ + self.assertRaises( + DatabaseError, + list, + Person.objects.all().select_for_update(nowait=True) + ) + + def run_select_for_update(self, status, nowait=False): + """ + Utility method that runs a SELECT FOR UPDATE against all + Person instances. After the select_for_update, it attempts + to update the name of the only record, save, and commit. + + In general, this will be run in a separate thread. + """ + status.append('started') + try: + # We need to enter transaction management again, as this is done on + # per-thread basis + transaction.enter_transaction_management(True) + transaction.managed(True) + people = list( + Person.objects.all().select_for_update(nowait=nowait) + ) + people[0].name = 'Fred' + people[0].save() + transaction.commit() + except DatabaseError, e: + status.append(e) + except Exception, e: + raise + + @requires_threading + @skipUnlessDBFeature('has_select_for_update') + @skipUnlessDBFeature('supports_transactions') + def test_block(self): + """ + Check that a thread running a select_for_update that + accesses rows being touched by a similar operation + on another connection blocks correctly. + """ + # First, let's start the transaction in our thread. + self.start_blocking_transaction() + + # Now, try it again using the ORM's select_for_update + # facility. Do this in a separate thread. + status = [] + thread = threading.Thread( + target=self.run_select_for_update, args=(status,) + ) + + # The thread should immediately block, but we'll sleep + # for a bit to make sure. + thread.start() + sanity_count = 0 + while len(status) != 1 and sanity_count < 10: + sanity_count += 1 + time.sleep(1) + if sanity_count >= 10: + raise ValueError, 'Thread did not run and block' + + # Check the person hasn't been updated. Since this isn't + # using FOR UPDATE, it won't block. + p = Person.objects.get(pk=self.person.pk) + self.assertEqual('Reinhardt', p.name) + + # When we end our blocking transaction, our thread should + # be able to continue. + self.end_blocking_transaction() + thread.join(5.0) + + # Check the thread has finished. Assuming it has, we should + # find that it has updated the person's name. + self.failIf(thread.isAlive()) + p = Person.objects.get(pk=self.person.pk) + self.assertEqual('Fred', p.name) + + @requires_threading + @skipUnlessDBFeature('has_select_for_update') + def test_raw_lock_not_available(self): + """ + Check that running a raw query which can't obtain a FOR UPDATE lock + raises the correct exception + """ + self.start_blocking_transaction() + def raw(status): + try: + list( + Person.objects.raw( + 'SELECT * FROM %s %s' % ( + Person._meta.db_table, + connection.ops.for_update_sql(nowait=True) + ) + ) + ) + except DatabaseError, e: + status.append(e) + status = [] + thread = threading.Thread(target=raw, kwargs={'status': status}) + thread.start() + time.sleep(1) + thread.join() + self.end_blocking_transaction() + self.check_exc(status[-1]) + + @skipUnlessDBFeature('has_select_for_update') + def test_transaction_dirty_managed(self): + """ Check that a select_for_update sets the transaction to be + dirty when executed under txn management. Setting the txn dirty + means that it will be either committed or rolled back by Django, + which will release any locks held by the SELECT FOR UPDATE. + """ + people = list(Person.objects.select_for_update()) + self.assertTrue(transaction.is_dirty()) + + @skipUnlessDBFeature('has_select_for_update') + def test_transaction_not_dirty_unmanaged(self): + """ If we're not under txn management, the txn will never be + marked as dirty. + """ + transaction.managed(False) + transaction.leave_transaction_management() + people = list(Person.objects.select_for_update()) + self.assertFalse(transaction.is_dirty())