Fixed #21803 -- Added support for post-commit callbacks
Made it possible to register and run callbacks after a database transaction is committed with the `transaction.on_commit()` function. This patch is heavily based on Carl Meyers django-transaction-hooks <https://django-transaction-hooks.readthedocs.org/>. Thanks to Aymeric Augustin, Carl Meyer, and Tim Graham for review and feedback.
This commit is contained in:
parent
9f0d67137c
commit
00a1d4d042
|
@ -78,6 +78,15 @@ class BaseDatabaseWrapper(object):
|
|||
self.allow_thread_sharing = allow_thread_sharing
|
||||
self._thread_ident = thread.get_ident()
|
||||
|
||||
# A list of no-argument functions to run when the transaction commits.
|
||||
# Each entry is an (sids, func) tuple, where sids is a set of the
|
||||
# active savepoint IDs when this function was registered.
|
||||
self.run_on_commit = []
|
||||
|
||||
# Should we run the on-commit hooks the next time set_autocommit(True)
|
||||
# is called?
|
||||
self.run_commit_hooks_on_set_autocommit_on = False
|
||||
|
||||
@cached_property
|
||||
def timezone(self):
|
||||
"""
|
||||
|
@ -163,6 +172,8 @@ class BaseDatabaseWrapper(object):
|
|||
self.init_connection_state()
|
||||
connection_created.send(sender=self.__class__, connection=self)
|
||||
|
||||
self.run_on_commit = []
|
||||
|
||||
def check_settings(self):
|
||||
if self.settings_dict['TIME_ZONE'] is not None:
|
||||
if not settings.USE_TZ:
|
||||
|
@ -230,6 +241,7 @@ class BaseDatabaseWrapper(object):
|
|||
self._commit()
|
||||
# A successful commit means that the database connection works.
|
||||
self.errors_occurred = False
|
||||
self.run_commit_hooks_on_set_autocommit_on = True
|
||||
|
||||
def rollback(self):
|
||||
"""
|
||||
|
@ -241,11 +253,15 @@ class BaseDatabaseWrapper(object):
|
|||
# A successful rollback means that the database connection works.
|
||||
self.errors_occurred = False
|
||||
|
||||
self.run_on_commit = []
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
Closes the connection to the database.
|
||||
"""
|
||||
self.validate_thread_sharing()
|
||||
self.run_on_commit = []
|
||||
|
||||
# Don't call validate_no_atomic_block() to avoid making it difficult
|
||||
# to get rid of a connection in an invalid state. The next connect()
|
||||
# will reset the transaction state anyway.
|
||||
|
@ -310,6 +326,11 @@ class BaseDatabaseWrapper(object):
|
|||
self.validate_thread_sharing()
|
||||
self._savepoint_rollback(sid)
|
||||
|
||||
# Remove any callbacks registered while this savepoint was active.
|
||||
self.run_on_commit = [
|
||||
(sids, func) for (sids, func) in self.run_on_commit if sid not in sids
|
||||
]
|
||||
|
||||
def savepoint_commit(self, sid):
|
||||
"""
|
||||
Releases a savepoint. Does nothing if savepoints are not supported.
|
||||
|
@ -343,15 +364,38 @@ class BaseDatabaseWrapper(object):
|
|||
self.ensure_connection()
|
||||
return self.autocommit
|
||||
|
||||
def set_autocommit(self, autocommit):
|
||||
def set_autocommit(self, autocommit, force_begin_transaction_with_broken_autocommit=False):
|
||||
"""
|
||||
Enable or disable autocommit.
|
||||
|
||||
The usual way to start a transaction is to turn autocommit off.
|
||||
SQLite does not properly start a transaction when disabling
|
||||
autocommit. To avoid this buggy behavior and to actually enter a new
|
||||
transaction, an explcit BEGIN is required. Using
|
||||
force_begin_transaction_with_broken_autocommit=True will issue an
|
||||
explicit BEGIN with SQLite. This option will be ignored for other
|
||||
backends.
|
||||
"""
|
||||
self.validate_no_atomic_block()
|
||||
self.ensure_connection()
|
||||
self._set_autocommit(autocommit)
|
||||
|
||||
start_transaction_under_autocommit = (
|
||||
force_begin_transaction_with_broken_autocommit
|
||||
and not autocommit
|
||||
and self.features.autocommits_when_autocommit_is_off
|
||||
)
|
||||
|
||||
if start_transaction_under_autocommit:
|
||||
self._start_transaction_under_autocommit()
|
||||
else:
|
||||
self._set_autocommit(autocommit)
|
||||
|
||||
self.autocommit = autocommit
|
||||
|
||||
if autocommit and self.run_commit_hooks_on_set_autocommit_on:
|
||||
self.run_and_clear_commit_hooks()
|
||||
self.run_commit_hooks_on_set_autocommit_on = False
|
||||
|
||||
def get_rollback(self):
|
||||
"""
|
||||
Get the "needs rollback" flag -- for *advanced use* only.
|
||||
|
@ -558,3 +602,23 @@ class BaseDatabaseWrapper(object):
|
|||
raise NotImplementedError(
|
||||
'The SchemaEditorClass attribute of this database wrapper is still None')
|
||||
return self.SchemaEditorClass(self, *args, **kwargs)
|
||||
|
||||
def on_commit(self, func):
|
||||
if self.in_atomic_block:
|
||||
# Transaction in progress; save for execution on commit.
|
||||
self.run_on_commit.append((set(self.savepoint_ids), func))
|
||||
elif not self.get_autocommit():
|
||||
raise TransactionManagementError('on_commit() cannot be used in manual transaction management')
|
||||
else:
|
||||
# No transaction in progress and in autocommit mode; execute
|
||||
# immediately.
|
||||
func()
|
||||
|
||||
def run_and_clear_commit_hooks(self):
|
||||
self.validate_no_atomic_block()
|
||||
try:
|
||||
while self.run_on_commit:
|
||||
sids, func = self.run_on_commit.pop(0)
|
||||
func()
|
||||
finally:
|
||||
self.run_on_commit = []
|
||||
|
|
|
@ -103,6 +103,14 @@ def set_rollback(rollback, using=None):
|
|||
return get_connection(using).set_rollback(rollback)
|
||||
|
||||
|
||||
def on_commit(func, using=None):
|
||||
"""
|
||||
Register `func` to be called when the current transaction is committed.
|
||||
If the current transaction is rolled back, `func` will not be called.
|
||||
"""
|
||||
get_connection(using).on_commit(func)
|
||||
|
||||
|
||||
#################################
|
||||
# Decorators / context managers #
|
||||
#################################
|
||||
|
@ -180,17 +188,7 @@ class Atomic(ContextDecorator):
|
|||
else:
|
||||
connection.savepoint_ids.append(None)
|
||||
else:
|
||||
# We aren't in a transaction yet; create one.
|
||||
# The usual way to start a transaction is to turn autocommit off.
|
||||
# However, some database adapters (namely sqlite3) don't handle
|
||||
# transactions and savepoints properly when autocommit is off.
|
||||
# In such cases, start an explicit transaction instead, which has
|
||||
# the side-effect of disabling autocommit.
|
||||
if connection.features.autocommits_when_autocommit_is_off:
|
||||
connection._start_transaction_under_autocommit()
|
||||
connection.autocommit = False
|
||||
else:
|
||||
connection.set_autocommit(False)
|
||||
connection.set_autocommit(False, force_begin_transaction_with_broken_autocommit=True)
|
||||
connection.in_atomic_block = True
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
|
@ -272,8 +270,6 @@ class Atomic(ContextDecorator):
|
|||
if not connection.in_atomic_block:
|
||||
if connection.closed_in_transaction:
|
||||
connection.connection = None
|
||||
elif connection.features.autocommits_when_autocommit_is_off:
|
||||
connection.autocommit = True
|
||||
else:
|
||||
connection.set_autocommit(True)
|
||||
# Outermost block exit when autocommit was disabled.
|
||||
|
|
|
@ -25,6 +25,19 @@ Python 3.2 and 3.3, and added support for Python 3.5.
|
|||
What's new in Django 1.9
|
||||
========================
|
||||
|
||||
Performing actions after a transaction commit
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The new :func:`~django.db.transaction.on_commit` hook allows performing actions
|
||||
after a database transaction is successfully committed. This is useful for
|
||||
tasks such as sending notification emails, creating queued tasks, or
|
||||
invalidating caches.
|
||||
|
||||
This functionality from the `django-transaction-hooks`_ package has been
|
||||
integrated into Django.
|
||||
|
||||
.. _django-transaction-hooks: https://pypi.python.org/pypi/django-transaction-hooks
|
||||
|
||||
Password validation
|
||||
~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
|
|
|
@ -252,6 +252,150 @@ by Django or by third-party libraries. Thus, this is best used in situations
|
|||
where you want to run your own transaction-controlling middleware or do
|
||||
something really strange.
|
||||
|
||||
Performing actions after commit
|
||||
===============================
|
||||
|
||||
.. versionadded:: 1.9
|
||||
|
||||
Sometimes you need to perform an action related to the current database
|
||||
transaction, but only if the transaction successfully commits. Examples might
|
||||
include a `Celery`_ task, an email notification, or a cache invalidation.
|
||||
|
||||
.. _Celery: http://www.celeryproject.org/
|
||||
|
||||
Django provides the :func:`on_commit` function to register callback functions
|
||||
that should be executed after a transaction is successfully committed:
|
||||
|
||||
.. function:: on_commit(func, using=None)
|
||||
|
||||
Pass any function (that takes no arguments) to :func:`on_commit`::
|
||||
|
||||
from django.db import transaction
|
||||
|
||||
def do_something():
|
||||
pass # send a mail, invalidate a cache, fire off a Celery task, etc.
|
||||
|
||||
transaction.on_commit(do_something)
|
||||
|
||||
You can also wrap your function in a lambda::
|
||||
|
||||
transaction.on_commit(lambda: some_celery_task.delay('arg1'))
|
||||
|
||||
The function you pass in will be called immediately after a hypothetical
|
||||
database write made where ``on_commit()`` is called would be successfully
|
||||
committed.
|
||||
|
||||
If you call ``on_commit()`` while there isn't an active transaction, the
|
||||
callback will be executed immediately.
|
||||
|
||||
If that hypothetical database write is instead rolled back (typically when an
|
||||
unhandled exception is raised in an :func:`atomic` block), your function will
|
||||
be discarded and never called.
|
||||
|
||||
Savepoints
|
||||
----------
|
||||
|
||||
Savepoints (i.e. nested :func:`atomic` blocks) are handled correctly. That is,
|
||||
an :func:`on_commit` callable registered after a savepoint (in a nested
|
||||
:func:`atomic` block) will be called after the outer transaction is committed,
|
||||
but not if a rollback to that savepoint or any previous savepoint occurred
|
||||
during the transaction::
|
||||
|
||||
with transaction.atomic(): # Outer atomic, start a new transaction
|
||||
transaction.on_commit(foo)
|
||||
|
||||
with transaction.atomic(): # Inner atomic block, create a savepoint
|
||||
transaction.on_commit(bar)
|
||||
|
||||
# foo() and then bar() will be called when leaving the outermost block
|
||||
|
||||
On the other hand, when a savepoint is rolled back (due to an exception being
|
||||
raised), the inner callable will not be called::
|
||||
|
||||
with transaction.atomic(): # Outer atomic, start a new transaction
|
||||
transaction.on_commit(foo)
|
||||
|
||||
try:
|
||||
with transaction.atomic(): # Inner atomic block, create a savepoint
|
||||
transaction.on_commit(bar)
|
||||
raise SomeError() # Raising an exception - abort the savepoint
|
||||
except SomeError:
|
||||
pass
|
||||
|
||||
# foo() will be called, but not bar()
|
||||
|
||||
Order of execution
|
||||
------------------
|
||||
|
||||
On-commit functions for a given transaction are executed in the order they were
|
||||
registered.
|
||||
|
||||
Exception handling
|
||||
------------------
|
||||
|
||||
If one on-commit function within a given transaction raises an uncaught
|
||||
exception, no later registered functions in that same transaction will run.
|
||||
This is, of course, the same behavior as if you'd executed the functions
|
||||
sequentially yourself without :func:`on_commit`.
|
||||
|
||||
Timing of execution
|
||||
-------------------
|
||||
|
||||
Your callbacks are executed *after* a successful commit, so a failure in a
|
||||
callback will not cause the transaction to roll back. They are executed
|
||||
conditionally upon the success of the transaction, but they are not *part* of
|
||||
the transaction. For the intended use cases (mail notifications, Celery tasks,
|
||||
etc.), this should be fine. If it's not (if your follow-up action is so
|
||||
critical that its failure should mean the failure of the transaction itself),
|
||||
then you don't want to use the :func:`on_commit` hook. Instead, you may want
|
||||
`two-phase commit`_ such as the `psycopg Two-Phase Commit protocol support`_
|
||||
and the `optional Two-Phase Commit Extensions in the Python DB-API
|
||||
specification`_.
|
||||
|
||||
Callbacks are not run until autocommit is restored on the connection following
|
||||
the commit (because otherwise any queries done in a callback would open an
|
||||
implicit transaction, preventing the connection from going back into autocommit
|
||||
mode).
|
||||
|
||||
When in autocommit mode and outside of an :func:`atomic` block, the function
|
||||
will run immediately, not on commit.
|
||||
|
||||
On-commit functions only work with :ref:`autocommit mode <managing-autocommit>`
|
||||
and the :func:`atomic` (or :setting:`ATOMIC_REQUESTS
|
||||
<DATABASE-ATOMIC_REQUESTS>`) transaction API. Calling :func:`on_commit` when
|
||||
autocommit is disabled and you are not within an atomic block will result in an
|
||||
error.
|
||||
|
||||
.. _two-phase commit: http://en.wikipedia.org/wiki/Two-phase_commit_protocol
|
||||
.. _psycopg Two-Phase Commit protocol support: http://initd.org/psycopg/docs/usage.html#tpc
|
||||
.. _optional Two-Phase Commit Extensions in the Python DB-API specification: https://www.python.org/dev/peps/pep-0249/#optional-two-phase-commit-extensions
|
||||
|
||||
Use in tests
|
||||
------------
|
||||
|
||||
Django's :class:`~django.test.TestCase` class wraps each test in a transaction
|
||||
and rolls back that transaction after each test, in order to provide test
|
||||
isolation. This means that no transaction is ever actually committed, thus your
|
||||
:func:`on_commit` callbacks will never be run. If you need to test the results
|
||||
of an :func:`on_commit` callback, use a
|
||||
:class:`~django.test.TransactionTestCase` instead.
|
||||
|
||||
Why no rollback hook?
|
||||
---------------------
|
||||
|
||||
A rollback hook is harder to implement robustly than a commit hook, since a
|
||||
variety of things can cause an implicit rollback.
|
||||
|
||||
For instance, if your database connection is dropped because your process was
|
||||
killed without a chance to shut down gracefully, your rollback hook will never
|
||||
run.
|
||||
|
||||
The solution is simple: instead of doing something during the atomic block
|
||||
(transaction) and then undoing it if the transaction fails, use
|
||||
:func:`on_commit` to delay doing it in the first place until after the
|
||||
transaction succeeds. It’s a lot easier to undo something you never did in the
|
||||
first place!
|
||||
|
||||
Low-level APIs
|
||||
==============
|
||||
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
from django.db import models
|
||||
from django.utils.encoding import python_2_unicode_compatible
|
||||
|
||||
|
||||
@python_2_unicode_compatible
|
||||
class Thing(models.Model):
|
||||
num = models.IntegerField()
|
||||
|
||||
def __str__(self):
|
||||
return "Thing %d" % self.num
|
|
@ -0,0 +1,220 @@
|
|||
from django.db import connection, transaction
|
||||
from django.test import TransactionTestCase, skipUnlessDBFeature
|
||||
|
||||
from .models import Thing
|
||||
|
||||
|
||||
class ForcedError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class TestConnectionOnCommit(TransactionTestCase):
|
||||
"""
|
||||
Tests for transaction.on_commit().
|
||||
|
||||
Creation/checking of database objects in parallel with callback tracking is
|
||||
to verify that the behavior of the two match in all tested cases.
|
||||
"""
|
||||
available_apps = ['transaction_hooks']
|
||||
|
||||
def setUp(self):
|
||||
self.notified = []
|
||||
|
||||
def notify(self, id_):
|
||||
if id_ == 'error':
|
||||
raise ForcedError()
|
||||
self.notified.append(id_)
|
||||
|
||||
def do(self, num):
|
||||
"""Create a Thing instance and notify about it."""
|
||||
Thing.objects.create(num=num)
|
||||
transaction.on_commit(lambda: self.notify(num))
|
||||
|
||||
def assertDone(self, nums):
|
||||
self.assertNotified(nums)
|
||||
self.assertEqual(sorted(t.num for t in Thing.objects.all()), sorted(nums))
|
||||
|
||||
def assertNotified(self, nums):
|
||||
self.assertEqual(self.notified, nums)
|
||||
|
||||
def test_executes_immediately_if_no_transaction(self):
|
||||
self.do(1)
|
||||
self.assertDone([1])
|
||||
|
||||
def test_delays_execution_until_after_transaction_commit(self):
|
||||
with transaction.atomic():
|
||||
self.do(1)
|
||||
self.assertNotified([])
|
||||
self.assertDone([1])
|
||||
|
||||
def test_does_not_execute_if_transaction_rolled_back(self):
|
||||
try:
|
||||
with transaction.atomic():
|
||||
self.do(1)
|
||||
raise ForcedError()
|
||||
except ForcedError:
|
||||
pass
|
||||
|
||||
self.assertDone([])
|
||||
|
||||
def test_executes_only_after_final_transaction_committed(self):
|
||||
with transaction.atomic():
|
||||
with transaction.atomic():
|
||||
self.do(1)
|
||||
self.assertNotified([])
|
||||
self.assertNotified([])
|
||||
self.assertDone([1])
|
||||
|
||||
def test_discards_hooks_from_rolled_back_savepoint(self):
|
||||
with transaction.atomic():
|
||||
# one successful savepoint
|
||||
with transaction.atomic():
|
||||
self.do(1)
|
||||
# one failed savepoint
|
||||
try:
|
||||
with transaction.atomic():
|
||||
self.do(2)
|
||||
raise ForcedError()
|
||||
except ForcedError:
|
||||
pass
|
||||
# another successful savepoint
|
||||
with transaction.atomic():
|
||||
self.do(3)
|
||||
|
||||
# only hooks registered during successful savepoints execute
|
||||
self.assertDone([1, 3])
|
||||
|
||||
def test_no_hooks_run_from_failed_transaction(self):
|
||||
"""If outer transaction fails, no hooks from within it run."""
|
||||
try:
|
||||
with transaction.atomic():
|
||||
with transaction.atomic():
|
||||
self.do(1)
|
||||
raise ForcedError()
|
||||
except ForcedError:
|
||||
pass
|
||||
|
||||
self.assertDone([])
|
||||
|
||||
def test_inner_savepoint_rolled_back_with_outer(self):
|
||||
with transaction.atomic():
|
||||
try:
|
||||
with transaction.atomic():
|
||||
with transaction.atomic():
|
||||
self.do(1)
|
||||
raise ForcedError()
|
||||
except ForcedError:
|
||||
pass
|
||||
self.do(2)
|
||||
|
||||
self.assertDone([2])
|
||||
|
||||
def test_no_savepoints_atomic_merged_with_outer(self):
|
||||
with transaction.atomic():
|
||||
with transaction.atomic():
|
||||
self.do(1)
|
||||
try:
|
||||
with transaction.atomic(savepoint=False):
|
||||
raise ForcedError()
|
||||
except ForcedError:
|
||||
pass
|
||||
|
||||
self.assertDone([])
|
||||
|
||||
def test_inner_savepoint_does_not_affect_outer(self):
|
||||
with transaction.atomic():
|
||||
with transaction.atomic():
|
||||
self.do(1)
|
||||
try:
|
||||
with transaction.atomic():
|
||||
raise ForcedError()
|
||||
except ForcedError:
|
||||
pass
|
||||
|
||||
self.assertDone([1])
|
||||
|
||||
def test_runs_hooks_in_order_registered(self):
|
||||
with transaction.atomic():
|
||||
self.do(1)
|
||||
with transaction.atomic():
|
||||
self.do(2)
|
||||
self.do(3)
|
||||
|
||||
self.assertDone([1, 2, 3])
|
||||
|
||||
def test_hooks_cleared_after_successful_commit(self):
|
||||
with transaction.atomic():
|
||||
self.do(1)
|
||||
with transaction.atomic():
|
||||
self.do(2)
|
||||
|
||||
self.assertDone([1, 2]) # not [1, 1, 2]
|
||||
|
||||
def test_hooks_cleared_after_rollback(self):
|
||||
try:
|
||||
with transaction.atomic():
|
||||
self.do(1)
|
||||
raise ForcedError()
|
||||
except ForcedError:
|
||||
pass
|
||||
|
||||
with transaction.atomic():
|
||||
self.do(2)
|
||||
|
||||
self.assertDone([2])
|
||||
|
||||
@skipUnlessDBFeature('test_db_allows_multiple_connections')
|
||||
def test_hooks_cleared_on_reconnect(self):
|
||||
with transaction.atomic():
|
||||
self.do(1)
|
||||
connection.close()
|
||||
|
||||
connection.connect()
|
||||
|
||||
with transaction.atomic():
|
||||
self.do(2)
|
||||
|
||||
self.assertDone([2])
|
||||
|
||||
def test_error_in_hook_doesnt_prevent_clearing_hooks(self):
|
||||
try:
|
||||
with transaction.atomic():
|
||||
transaction.on_commit(lambda: self.notify('error'))
|
||||
except ForcedError:
|
||||
pass
|
||||
|
||||
with transaction.atomic():
|
||||
self.do(1)
|
||||
|
||||
self.assertDone([1])
|
||||
|
||||
def test_db_query_in_hook(self):
|
||||
with transaction.atomic():
|
||||
Thing.objects.create(num=1)
|
||||
transaction.on_commit(
|
||||
lambda: [self.notify(t.num) for t in Thing.objects.all()]
|
||||
)
|
||||
|
||||
self.assertDone([1])
|
||||
|
||||
def test_transaction_in_hook(self):
|
||||
def on_commit():
|
||||
with transaction.atomic():
|
||||
t = Thing.objects.create(num=1)
|
||||
self.notify(t.num)
|
||||
|
||||
with transaction.atomic():
|
||||
transaction.on_commit(on_commit)
|
||||
|
||||
self.assertDone([1])
|
||||
|
||||
def test_raises_exception_non_autocommit_mode(self):
|
||||
def should_never_be_called():
|
||||
raise AssertionError('this function should never be called')
|
||||
|
||||
try:
|
||||
connection.set_autocommit(False)
|
||||
with self.assertRaises(transaction.TransactionManagementError):
|
||||
transaction.on_commit(should_never_be_called)
|
||||
finally:
|
||||
connection.set_autocommit(True)
|
Loading…
Reference in New Issue