From 00a1d4d042a7afd139316982c9b57e87d26a894f Mon Sep 17 00:00:00 2001 From: Andreas Pelme Date: Tue, 30 Jun 2015 18:18:56 +0200 Subject: [PATCH] Fixed #21803 -- Added support for post-commit callbacks Made it possible to register and run callbacks after a database transaction is committed with the `transaction.on_commit()` function. This patch is heavily based on Carl Meyers django-transaction-hooks . Thanks to Aymeric Augustin, Carl Meyer, and Tim Graham for review and feedback. --- django/db/backends/base/base.py | 68 ++++++++- django/db/transaction.py | 22 ++- docs/releases/1.9.txt | 13 ++ docs/topics/db/transactions.txt | 144 ++++++++++++++++++ tests/transaction_hooks/__init__.py | 0 tests/transaction_hooks/models.py | 10 ++ tests/transaction_hooks/tests.py | 220 ++++++++++++++++++++++++++++ 7 files changed, 462 insertions(+), 15 deletions(-) create mode 100644 tests/transaction_hooks/__init__.py create mode 100644 tests/transaction_hooks/models.py create mode 100644 tests/transaction_hooks/tests.py diff --git a/django/db/backends/base/base.py b/django/db/backends/base/base.py index 2390cd7940..96ca0ccfde 100644 --- a/django/db/backends/base/base.py +++ b/django/db/backends/base/base.py @@ -78,6 +78,15 @@ class BaseDatabaseWrapper(object): self.allow_thread_sharing = allow_thread_sharing self._thread_ident = thread.get_ident() + # A list of no-argument functions to run when the transaction commits. + # Each entry is an (sids, func) tuple, where sids is a set of the + # active savepoint IDs when this function was registered. + self.run_on_commit = [] + + # Should we run the on-commit hooks the next time set_autocommit(True) + # is called? + self.run_commit_hooks_on_set_autocommit_on = False + @cached_property def timezone(self): """ @@ -163,6 +172,8 @@ class BaseDatabaseWrapper(object): self.init_connection_state() connection_created.send(sender=self.__class__, connection=self) + self.run_on_commit = [] + def check_settings(self): if self.settings_dict['TIME_ZONE'] is not None: if not settings.USE_TZ: @@ -230,6 +241,7 @@ class BaseDatabaseWrapper(object): self._commit() # A successful commit means that the database connection works. self.errors_occurred = False + self.run_commit_hooks_on_set_autocommit_on = True def rollback(self): """ @@ -241,11 +253,15 @@ class BaseDatabaseWrapper(object): # A successful rollback means that the database connection works. self.errors_occurred = False + self.run_on_commit = [] + def close(self): """ Closes the connection to the database. """ self.validate_thread_sharing() + self.run_on_commit = [] + # Don't call validate_no_atomic_block() to avoid making it difficult # to get rid of a connection in an invalid state. The next connect() # will reset the transaction state anyway. @@ -310,6 +326,11 @@ class BaseDatabaseWrapper(object): self.validate_thread_sharing() self._savepoint_rollback(sid) + # Remove any callbacks registered while this savepoint was active. + self.run_on_commit = [ + (sids, func) for (sids, func) in self.run_on_commit if sid not in sids + ] + def savepoint_commit(self, sid): """ Releases a savepoint. Does nothing if savepoints are not supported. @@ -343,15 +364,38 @@ class BaseDatabaseWrapper(object): self.ensure_connection() return self.autocommit - def set_autocommit(self, autocommit): + def set_autocommit(self, autocommit, force_begin_transaction_with_broken_autocommit=False): """ Enable or disable autocommit. + + The usual way to start a transaction is to turn autocommit off. + SQLite does not properly start a transaction when disabling + autocommit. To avoid this buggy behavior and to actually enter a new + transaction, an explcit BEGIN is required. Using + force_begin_transaction_with_broken_autocommit=True will issue an + explicit BEGIN with SQLite. This option will be ignored for other + backends. """ self.validate_no_atomic_block() self.ensure_connection() - self._set_autocommit(autocommit) + + start_transaction_under_autocommit = ( + force_begin_transaction_with_broken_autocommit + and not autocommit + and self.features.autocommits_when_autocommit_is_off + ) + + if start_transaction_under_autocommit: + self._start_transaction_under_autocommit() + else: + self._set_autocommit(autocommit) + self.autocommit = autocommit + if autocommit and self.run_commit_hooks_on_set_autocommit_on: + self.run_and_clear_commit_hooks() + self.run_commit_hooks_on_set_autocommit_on = False + def get_rollback(self): """ Get the "needs rollback" flag -- for *advanced use* only. @@ -558,3 +602,23 @@ class BaseDatabaseWrapper(object): raise NotImplementedError( 'The SchemaEditorClass attribute of this database wrapper is still None') return self.SchemaEditorClass(self, *args, **kwargs) + + def on_commit(self, func): + if self.in_atomic_block: + # Transaction in progress; save for execution on commit. + self.run_on_commit.append((set(self.savepoint_ids), func)) + elif not self.get_autocommit(): + raise TransactionManagementError('on_commit() cannot be used in manual transaction management') + else: + # No transaction in progress and in autocommit mode; execute + # immediately. + func() + + def run_and_clear_commit_hooks(self): + self.validate_no_atomic_block() + try: + while self.run_on_commit: + sids, func = self.run_on_commit.pop(0) + func() + finally: + self.run_on_commit = [] diff --git a/django/db/transaction.py b/django/db/transaction.py index d1388675d5..6c174bf2d3 100644 --- a/django/db/transaction.py +++ b/django/db/transaction.py @@ -103,6 +103,14 @@ def set_rollback(rollback, using=None): return get_connection(using).set_rollback(rollback) +def on_commit(func, using=None): + """ + Register `func` to be called when the current transaction is committed. + If the current transaction is rolled back, `func` will not be called. + """ + get_connection(using).on_commit(func) + + ################################# # Decorators / context managers # ################################# @@ -180,17 +188,7 @@ class Atomic(ContextDecorator): else: connection.savepoint_ids.append(None) else: - # We aren't in a transaction yet; create one. - # The usual way to start a transaction is to turn autocommit off. - # However, some database adapters (namely sqlite3) don't handle - # transactions and savepoints properly when autocommit is off. - # In such cases, start an explicit transaction instead, which has - # the side-effect of disabling autocommit. - if connection.features.autocommits_when_autocommit_is_off: - connection._start_transaction_under_autocommit() - connection.autocommit = False - else: - connection.set_autocommit(False) + connection.set_autocommit(False, force_begin_transaction_with_broken_autocommit=True) connection.in_atomic_block = True def __exit__(self, exc_type, exc_value, traceback): @@ -272,8 +270,6 @@ class Atomic(ContextDecorator): if not connection.in_atomic_block: if connection.closed_in_transaction: connection.connection = None - elif connection.features.autocommits_when_autocommit_is_off: - connection.autocommit = True else: connection.set_autocommit(True) # Outermost block exit when autocommit was disabled. diff --git a/docs/releases/1.9.txt b/docs/releases/1.9.txt index a4cbd7e46d..2716932d7b 100644 --- a/docs/releases/1.9.txt +++ b/docs/releases/1.9.txt @@ -25,6 +25,19 @@ Python 3.2 and 3.3, and added support for Python 3.5. What's new in Django 1.9 ======================== +Performing actions after a transaction commit +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The new :func:`~django.db.transaction.on_commit` hook allows performing actions +after a database transaction is successfully committed. This is useful for +tasks such as sending notification emails, creating queued tasks, or +invalidating caches. + +This functionality from the `django-transaction-hooks`_ package has been +integrated into Django. + +.. _django-transaction-hooks: https://pypi.python.org/pypi/django-transaction-hooks + Password validation ~~~~~~~~~~~~~~~~~~~ diff --git a/docs/topics/db/transactions.txt b/docs/topics/db/transactions.txt index 6b54510a47..b507b3ff67 100644 --- a/docs/topics/db/transactions.txt +++ b/docs/topics/db/transactions.txt @@ -252,6 +252,150 @@ by Django or by third-party libraries. Thus, this is best used in situations where you want to run your own transaction-controlling middleware or do something really strange. +Performing actions after commit +=============================== + +.. versionadded:: 1.9 + +Sometimes you need to perform an action related to the current database +transaction, but only if the transaction successfully commits. Examples might +include a `Celery`_ task, an email notification, or a cache invalidation. + +.. _Celery: http://www.celeryproject.org/ + +Django provides the :func:`on_commit` function to register callback functions +that should be executed after a transaction is successfully committed: + +.. function:: on_commit(func, using=None) + +Pass any function (that takes no arguments) to :func:`on_commit`:: + + from django.db import transaction + + def do_something(): + pass # send a mail, invalidate a cache, fire off a Celery task, etc. + + transaction.on_commit(do_something) + +You can also wrap your function in a lambda:: + + transaction.on_commit(lambda: some_celery_task.delay('arg1')) + +The function you pass in will be called immediately after a hypothetical +database write made where ``on_commit()`` is called would be successfully +committed. + +If you call ``on_commit()`` while there isn't an active transaction, the +callback will be executed immediately. + +If that hypothetical database write is instead rolled back (typically when an +unhandled exception is raised in an :func:`atomic` block), your function will +be discarded and never called. + +Savepoints +---------- + +Savepoints (i.e. nested :func:`atomic` blocks) are handled correctly. That is, +an :func:`on_commit` callable registered after a savepoint (in a nested +:func:`atomic` block) will be called after the outer transaction is committed, +but not if a rollback to that savepoint or any previous savepoint occurred +during the transaction:: + + with transaction.atomic(): # Outer atomic, start a new transaction + transaction.on_commit(foo) + + with transaction.atomic(): # Inner atomic block, create a savepoint + transaction.on_commit(bar) + + # foo() and then bar() will be called when leaving the outermost block + +On the other hand, when a savepoint is rolled back (due to an exception being +raised), the inner callable will not be called:: + + with transaction.atomic(): # Outer atomic, start a new transaction + transaction.on_commit(foo) + + try: + with transaction.atomic(): # Inner atomic block, create a savepoint + transaction.on_commit(bar) + raise SomeError() # Raising an exception - abort the savepoint + except SomeError: + pass + + # foo() will be called, but not bar() + +Order of execution +------------------ + +On-commit functions for a given transaction are executed in the order they were +registered. + +Exception handling +------------------ + +If one on-commit function within a given transaction raises an uncaught +exception, no later registered functions in that same transaction will run. +This is, of course, the same behavior as if you'd executed the functions +sequentially yourself without :func:`on_commit`. + +Timing of execution +------------------- + +Your callbacks are executed *after* a successful commit, so a failure in a +callback will not cause the transaction to roll back. They are executed +conditionally upon the success of the transaction, but they are not *part* of +the transaction. For the intended use cases (mail notifications, Celery tasks, +etc.), this should be fine. If it's not (if your follow-up action is so +critical that its failure should mean the failure of the transaction itself), +then you don't want to use the :func:`on_commit` hook. Instead, you may want +`two-phase commit`_ such as the `psycopg Two-Phase Commit protocol support`_ +and the `optional Two-Phase Commit Extensions in the Python DB-API +specification`_. + +Callbacks are not run until autocommit is restored on the connection following +the commit (because otherwise any queries done in a callback would open an +implicit transaction, preventing the connection from going back into autocommit +mode). + +When in autocommit mode and outside of an :func:`atomic` block, the function +will run immediately, not on commit. + +On-commit functions only work with :ref:`autocommit mode ` +and the :func:`atomic` (or :setting:`ATOMIC_REQUESTS +`) transaction API. Calling :func:`on_commit` when +autocommit is disabled and you are not within an atomic block will result in an +error. + +.. _two-phase commit: http://en.wikipedia.org/wiki/Two-phase_commit_protocol +.. _psycopg Two-Phase Commit protocol support: http://initd.org/psycopg/docs/usage.html#tpc +.. _optional Two-Phase Commit Extensions in the Python DB-API specification: https://www.python.org/dev/peps/pep-0249/#optional-two-phase-commit-extensions + +Use in tests +------------ + +Django's :class:`~django.test.TestCase` class wraps each test in a transaction +and rolls back that transaction after each test, in order to provide test +isolation. This means that no transaction is ever actually committed, thus your +:func:`on_commit` callbacks will never be run. If you need to test the results +of an :func:`on_commit` callback, use a +:class:`~django.test.TransactionTestCase` instead. + +Why no rollback hook? +--------------------- + +A rollback hook is harder to implement robustly than a commit hook, since a +variety of things can cause an implicit rollback. + +For instance, if your database connection is dropped because your process was +killed without a chance to shut down gracefully, your rollback hook will never +run. + +The solution is simple: instead of doing something during the atomic block +(transaction) and then undoing it if the transaction fails, use +:func:`on_commit` to delay doing it in the first place until after the +transaction succeeds. It’s a lot easier to undo something you never did in the +first place! + Low-level APIs ============== diff --git a/tests/transaction_hooks/__init__.py b/tests/transaction_hooks/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/transaction_hooks/models.py b/tests/transaction_hooks/models.py new file mode 100644 index 0000000000..cd2f22b514 --- /dev/null +++ b/tests/transaction_hooks/models.py @@ -0,0 +1,10 @@ +from django.db import models +from django.utils.encoding import python_2_unicode_compatible + + +@python_2_unicode_compatible +class Thing(models.Model): + num = models.IntegerField() + + def __str__(self): + return "Thing %d" % self.num diff --git a/tests/transaction_hooks/tests.py b/tests/transaction_hooks/tests.py new file mode 100644 index 0000000000..ebf07bc656 --- /dev/null +++ b/tests/transaction_hooks/tests.py @@ -0,0 +1,220 @@ +from django.db import connection, transaction +from django.test import TransactionTestCase, skipUnlessDBFeature + +from .models import Thing + + +class ForcedError(Exception): + pass + + +class TestConnectionOnCommit(TransactionTestCase): + """ + Tests for transaction.on_commit(). + + Creation/checking of database objects in parallel with callback tracking is + to verify that the behavior of the two match in all tested cases. + """ + available_apps = ['transaction_hooks'] + + def setUp(self): + self.notified = [] + + def notify(self, id_): + if id_ == 'error': + raise ForcedError() + self.notified.append(id_) + + def do(self, num): + """Create a Thing instance and notify about it.""" + Thing.objects.create(num=num) + transaction.on_commit(lambda: self.notify(num)) + + def assertDone(self, nums): + self.assertNotified(nums) + self.assertEqual(sorted(t.num for t in Thing.objects.all()), sorted(nums)) + + def assertNotified(self, nums): + self.assertEqual(self.notified, nums) + + def test_executes_immediately_if_no_transaction(self): + self.do(1) + self.assertDone([1]) + + def test_delays_execution_until_after_transaction_commit(self): + with transaction.atomic(): + self.do(1) + self.assertNotified([]) + self.assertDone([1]) + + def test_does_not_execute_if_transaction_rolled_back(self): + try: + with transaction.atomic(): + self.do(1) + raise ForcedError() + except ForcedError: + pass + + self.assertDone([]) + + def test_executes_only_after_final_transaction_committed(self): + with transaction.atomic(): + with transaction.atomic(): + self.do(1) + self.assertNotified([]) + self.assertNotified([]) + self.assertDone([1]) + + def test_discards_hooks_from_rolled_back_savepoint(self): + with transaction.atomic(): + # one successful savepoint + with transaction.atomic(): + self.do(1) + # one failed savepoint + try: + with transaction.atomic(): + self.do(2) + raise ForcedError() + except ForcedError: + pass + # another successful savepoint + with transaction.atomic(): + self.do(3) + + # only hooks registered during successful savepoints execute + self.assertDone([1, 3]) + + def test_no_hooks_run_from_failed_transaction(self): + """If outer transaction fails, no hooks from within it run.""" + try: + with transaction.atomic(): + with transaction.atomic(): + self.do(1) + raise ForcedError() + except ForcedError: + pass + + self.assertDone([]) + + def test_inner_savepoint_rolled_back_with_outer(self): + with transaction.atomic(): + try: + with transaction.atomic(): + with transaction.atomic(): + self.do(1) + raise ForcedError() + except ForcedError: + pass + self.do(2) + + self.assertDone([2]) + + def test_no_savepoints_atomic_merged_with_outer(self): + with transaction.atomic(): + with transaction.atomic(): + self.do(1) + try: + with transaction.atomic(savepoint=False): + raise ForcedError() + except ForcedError: + pass + + self.assertDone([]) + + def test_inner_savepoint_does_not_affect_outer(self): + with transaction.atomic(): + with transaction.atomic(): + self.do(1) + try: + with transaction.atomic(): + raise ForcedError() + except ForcedError: + pass + + self.assertDone([1]) + + def test_runs_hooks_in_order_registered(self): + with transaction.atomic(): + self.do(1) + with transaction.atomic(): + self.do(2) + self.do(3) + + self.assertDone([1, 2, 3]) + + def test_hooks_cleared_after_successful_commit(self): + with transaction.atomic(): + self.do(1) + with transaction.atomic(): + self.do(2) + + self.assertDone([1, 2]) # not [1, 1, 2] + + def test_hooks_cleared_after_rollback(self): + try: + with transaction.atomic(): + self.do(1) + raise ForcedError() + except ForcedError: + pass + + with transaction.atomic(): + self.do(2) + + self.assertDone([2]) + + @skipUnlessDBFeature('test_db_allows_multiple_connections') + def test_hooks_cleared_on_reconnect(self): + with transaction.atomic(): + self.do(1) + connection.close() + + connection.connect() + + with transaction.atomic(): + self.do(2) + + self.assertDone([2]) + + def test_error_in_hook_doesnt_prevent_clearing_hooks(self): + try: + with transaction.atomic(): + transaction.on_commit(lambda: self.notify('error')) + except ForcedError: + pass + + with transaction.atomic(): + self.do(1) + + self.assertDone([1]) + + def test_db_query_in_hook(self): + with transaction.atomic(): + Thing.objects.create(num=1) + transaction.on_commit( + lambda: [self.notify(t.num) for t in Thing.objects.all()] + ) + + self.assertDone([1]) + + def test_transaction_in_hook(self): + def on_commit(): + with transaction.atomic(): + t = Thing.objects.create(num=1) + self.notify(t.num) + + with transaction.atomic(): + transaction.on_commit(on_commit) + + self.assertDone([1]) + + def test_raises_exception_non_autocommit_mode(self): + def should_never_be_called(): + raise AssertionError('this function should never be called') + + try: + connection.set_autocommit(False) + with self.assertRaises(transaction.TransactionManagementError): + transaction.on_commit(should_never_be_called) + finally: + connection.set_autocommit(True)