Fixed #32172 -- Adapted signals to allow async handlers.

co-authored-by: kozzztik <kozzztik@mail.ru>
co-authored-by: Carlton Gibson <carlton.gibson@noumenal.es>
This commit is contained in:
Jon Janzen 2020-11-07 13:19:20 +03:00 committed by Mariusz Felisiak
parent 9a07999aef
commit e83a88566a
9 changed files with 370 additions and 43 deletions

View File

@ -170,9 +170,7 @@ class ASGIHandler(base.BaseHandler):
return
# Request is complete and can be served.
set_script_prefix(self.get_script_prefix(scope))
await sync_to_async(signals.request_started.send, thread_sensitive=True)(
sender=self.__class__, scope=scope
)
await signals.request_started.asend(sender=self.__class__, scope=scope)
# Get the request and check for basic issues.
request, error_response = self.create_request(scope, body_file)
if request is None:

View File

@ -1,7 +1,10 @@
import asyncio
import logging
import threading
import weakref
from asgiref.sync import async_to_sync, iscoroutinefunction, sync_to_async
from django.utils.inspect import func_accepts_kwargs
logger = logging.getLogger("django.dispatch")
@ -52,7 +55,8 @@ class Signal:
receiver
A function or an instance method which is to receive signals.
Receivers must be hashable objects.
Receivers must be hashable objects. Receivers can be
asynchronous.
If weak is True, then receiver must be weak referenceable.
@ -94,6 +98,8 @@ class Signal:
else:
lookup_key = (_make_id(receiver), _make_id(sender))
is_async = iscoroutinefunction(receiver)
if weak:
ref = weakref.ref
receiver_object = receiver
@ -106,8 +112,8 @@ class Signal:
with self.lock:
self._clear_dead_receivers()
if not any(r_key == lookup_key for r_key, _ in self.receivers):
self.receivers.append((lookup_key, receiver))
if not any(r_key == lookup_key for r_key, _, _ in self.receivers):
self.receivers.append((lookup_key, receiver, is_async))
self.sender_receivers_cache.clear()
def disconnect(self, receiver=None, sender=None, dispatch_uid=None):
@ -138,7 +144,7 @@ class Signal:
with self.lock:
self._clear_dead_receivers()
for index in range(len(self.receivers)):
(r_key, _) = self.receivers[index]
r_key, *_ = self.receivers[index]
if r_key == lookup_key:
disconnected = True
del self.receivers[index]
@ -147,7 +153,8 @@ class Signal:
return disconnected
def has_listeners(self, sender=None):
return bool(self._live_receivers(sender))
sync_receivers, async_receivers = self._live_receivers(sender)
return bool(sync_receivers) or bool(async_receivers)
def send(self, sender, **named):
"""
@ -157,6 +164,10 @@ class Signal:
terminating the dispatch loop. So it's possible that all receivers
won't be called if an error is raised.
If any receivers are asynchronous, they are called after all the
synchronous receivers via a single call to async_to_sync(). They are
also executed concurrently with asyncio.gather().
Arguments:
sender
@ -172,16 +183,97 @@ class Signal:
or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
):
return []
responses = []
sync_receivers, async_receivers = self._live_receivers(sender)
for receiver in sync_receivers:
response = receiver(signal=self, sender=sender, **named)
responses.append((receiver, response))
if async_receivers:
return [
(receiver, receiver(signal=self, sender=sender, **named))
for receiver in self._live_receivers(sender)
]
async def asend():
async_responses = await asyncio.gather(
*(
receiver(signal=self, sender=sender, **named)
for receiver in async_receivers
)
)
return zip(async_receivers, async_responses)
responses.extend(async_to_sync(asend)())
return responses
async def asend(self, sender, **named):
"""
Send signal from sender to all connected receivers in async mode.
All sync receivers will be wrapped by sync_to_async()
If any receiver raises an error, the error propagates back through
send, terminating the dispatch loop. So it's possible that all
receivers won't be called if an error is raised.
If any receivers are synchronous, they are grouped and called behind a
sync_to_async() adaption before executing any asynchronous receivers.
If any receivers are asynchronous, they are grouped and executed
concurrently with asyncio.gather().
Arguments:
sender
The sender of the signal. Either a specific object or None.
named
Named arguments which will be passed to receivers.
Return a list of tuple pairs [(receiver, response), ...].
"""
if (
not self.receivers
or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
):
return []
sync_receivers, async_receivers = self._live_receivers(sender)
if sync_receivers:
@sync_to_async
def sync_send():
responses = []
for receiver in sync_receivers:
response = receiver(signal=self, sender=sender, **named)
responses.append((receiver, response))
return responses
else:
sync_send = list
responses, async_responses = await asyncio.gather(
sync_send(),
asyncio.gather(
*(
receiver(signal=self, sender=sender, **named)
for receiver in async_receivers
)
),
)
responses.extend(zip(async_receivers, async_responses))
return responses
def _log_robust_failure(self, receiver, err):
logger.error(
"Error calling %s in Signal.send_robust() (%s)",
receiver.__qualname__,
err,
exc_info=err,
)
def send_robust(self, sender, **named):
"""
Send signal from sender to all connected receivers catching errors.
If any receivers are asynchronous, they are called after all the
synchronous receivers via a single call to async_to_sync(). They are
also executed concurrently with asyncio.gather().
Arguments:
sender
@ -206,19 +298,105 @@ class Signal:
# Call each receiver with whatever arguments it can accept.
# Return a list of tuple pairs [(receiver, response), ... ].
responses = []
for receiver in self._live_receivers(sender):
sync_receivers, async_receivers = self._live_receivers(sender)
for receiver in sync_receivers:
try:
response = receiver(signal=self, sender=sender, **named)
except Exception as err:
logger.error(
"Error calling %s in Signal.send_robust() (%s)",
receiver.__qualname__,
err,
exc_info=err,
)
self._log_robust_failure(receiver, err)
responses.append((receiver, err))
else:
responses.append((receiver, response))
if async_receivers:
async def asend_and_wrap_exception(receiver):
try:
response = await receiver(signal=self, sender=sender, **named)
except Exception as err:
self._log_robust_failure(receiver, err)
return err
return response
async def asend():
async_responses = await asyncio.gather(
*(
asend_and_wrap_exception(receiver)
for receiver in async_receivers
)
)
return zip(async_receivers, async_responses)
responses.extend(async_to_sync(asend)())
return responses
async def asend_robust(self, sender, **named):
"""
Send signal from sender to all connected receivers catching errors.
If any receivers are synchronous, they are grouped and called behind a
sync_to_async() adaption before executing any asynchronous receivers.
If any receivers are asynchronous, they are grouped and executed
concurrently with asyncio.gather.
Arguments:
sender
The sender of the signal. Can be any Python object (normally one
registered with a connect if you actually want something to
occur).
named
Named arguments which will be passed to receivers.
Return a list of tuple pairs [(receiver, response), ... ].
If any receiver raises an error (specifically any subclass of
Exception), return the error instance as the result for that receiver.
"""
if (
not self.receivers
or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
):
return []
# Call each receiver with whatever arguments it can accept.
# Return a list of tuple pairs [(receiver, response), ... ].
sync_receivers, async_receivers = self._live_receivers(sender)
if sync_receivers:
@sync_to_async
def sync_send():
responses = []
for receiver in sync_receivers:
try:
response = receiver(signal=self, sender=sender, **named)
except Exception as err:
self._log_robust_failure(receiver, err)
responses.append((receiver, err))
else:
responses.append((receiver, response))
return responses
else:
sync_send = list
async def asend_and_wrap_exception(receiver):
try:
response = await receiver(signal=self, sender=sender, **named)
except Exception as err:
self._log_robust_failure(receiver, err)
return err
return response
responses, async_responses = await asyncio.gather(
sync_send(),
asyncio.gather(
*(asend_and_wrap_exception(receiver) for receiver in async_receivers),
),
)
responses.extend(zip(async_receivers, async_responses))
return responses
def _clear_dead_receivers(self):
@ -244,31 +422,38 @@ class Signal:
# We could end up here with NO_RECEIVERS even if we do check this case in
# .send() prior to calling _live_receivers() due to concurrent .send() call.
if receivers is NO_RECEIVERS:
return []
return [], []
if receivers is None:
with self.lock:
self._clear_dead_receivers()
senderkey = _make_id(sender)
receivers = []
for (receiverkey, r_senderkey), receiver in self.receivers:
for (_receiverkey, r_senderkey), receiver, is_async in self.receivers:
if r_senderkey == NONE_ID or r_senderkey == senderkey:
receivers.append(receiver)
receivers.append((receiver, is_async))
if self.use_caching:
if not receivers:
self.sender_receivers_cache[sender] = NO_RECEIVERS
else:
# Note, we must cache the weakref versions.
self.sender_receivers_cache[sender] = receivers
non_weak_receivers = []
for receiver in receivers:
non_weak_sync_receivers = []
non_weak_async_receivers = []
for receiver, is_async in receivers:
if isinstance(receiver, weakref.ReferenceType):
# Dereference the weak reference.
receiver = receiver()
if receiver is not None:
non_weak_receivers.append(receiver)
if is_async:
non_weak_async_receivers.append(receiver)
else:
non_weak_sync_receivers.append(receiver)
else:
non_weak_receivers.append(receiver)
return non_weak_receivers
if is_async:
non_weak_async_receivers.append(receiver)
else:
non_weak_sync_receivers.append(receiver)
return non_weak_sync_receivers, non_weak_async_receivers
def _remove_receiver(self, receiver=None):
# Mark that the self.receivers list has dead weakrefs. If so, we will

View File

@ -219,9 +219,7 @@ class AsyncClientHandler(BaseHandler):
body_file = FakePayload("")
request_started.disconnect(close_old_connections)
await sync_to_async(request_started.send, thread_sensitive=False)(
sender=self.__class__, scope=scope
)
await request_started.asend(sender=self.__class__, scope=scope)
request_started.connect(close_old_connections)
# Wrap FakePayload body_file to allow large read() in test environment.
request = ASGIRequest(scope, LimitedStream(body_file, len(body_file)))

View File

@ -183,7 +183,7 @@ def complex_setting_changed(*, enter, setting, **kwargs):
# this stacklevel shows the line containing the override_settings call.
warnings.warn(
f"Overriding setting {setting} can lead to unexpected behavior.",
stacklevel=6,
stacklevel=5,
)

View File

@ -218,7 +218,9 @@ Serialization
Signals
~~~~~~~
* ...
* The new :meth:`.Signal.asend` and :meth:`.Signal.asend_robust` methods allow
asynchronous signal dispatch. Signal receivers may be synchronous or
asynchronous, and will be automatically adapted to the correct calling style.
Templates
~~~~~~~~~

View File

@ -108,6 +108,8 @@ synchronous function and call it using :func:`sync_to_async`.
Asynchronous model and related manager interfaces were added.
.. _async_performance:
Performance
-----------

View File

@ -96,6 +96,21 @@ This would be wrong -- in fact, Django will throw an error if you do so. That's
because at any point arguments could get added to the signal and your receiver
must be able to handle those new arguments.
Receivers may also be asynchronous functions, with the same signature but
declared using ``async def``::
async def my_callback(sender, **kwargs):
await asyncio.sleep(5)
print("Request finished!")
Signals can be sent either synchronously or asynchronously, and receivers will
automatically be adapted to the correct call-style. See :ref:`sending signals
<sending-signals>` for more information.
.. versionchanged:: 5.0
Support for asynchronous receivers was added.
.. _connecting-receiver-functions:
Connecting receiver functions
@ -248,18 +263,26 @@ For example::
This declares a ``pizza_done`` signal.
.. _sending-signals:
Sending signals
---------------
There are two ways to send signals in Django.
There are two ways to send signals synchronously in Django.
.. method:: Signal.send(sender, **kwargs)
.. method:: Signal.send_robust(sender, **kwargs)
To send a signal, call either :meth:`Signal.send` (all built-in signals use
this) or :meth:`Signal.send_robust`. You must provide the ``sender`` argument
(which is a class most of the time) and may provide as many other keyword
arguments as you like.
Signals may also be sent asynchronously.
.. method:: Signal.asend(sender, **kwargs)
.. method:: Signal.asend_robust(sender, **kwargs)
To send a signal, call either :meth:`Signal.send`, :meth:`Signal.send_robust`,
:meth:`await Signal.asend()<Signal.asend>`, or
:meth:`await Signal.asend_robust() <Signal.asend_robust>`. You must provide the
``sender`` argument (which is a class most of the time) and may provide as many
other keyword arguments as you like.
For example, here's how sending our ``pizza_done`` signal might look::
@ -270,9 +293,8 @@ For example, here's how sending our ``pizza_done`` signal might look::
pizza_done.send(sender=self.__class__, toppings=toppings, size=size)
...
Both ``send()`` and ``send_robust()`` return a list of tuple pairs
``[(receiver, response), ... ]``, representing the list of called receiver
functions and their response values.
All four methods return a list of tuple pairs ``[(receiver, response), ...]``,
representing the list of called receiver functions and their response values.
``send()`` differs from ``send_robust()`` in how exceptions raised by receiver
functions are handled. ``send()`` does *not* catch any exceptions raised by
@ -286,6 +308,33 @@ error instance is returned in the tuple pair for the receiver that raised the er
The tracebacks are present on the ``__traceback__`` attribute of the errors
returned when calling ``send_robust()``.
``asend()`` is similar as ``send()``, but it is coroutine that must be
awaited::
async def asend_pizza(self, toppings, size):
await pizza_done.asend(sender=self.__class__, toppings=toppings, size=size)
...
Whether synchronous or asynchronous, receivers will be correctly adapted to
whether ``send()`` or ``asend()`` is used. Synchronous receivers will be
called using :func:`~.sync_to_async` when invoked via ``asend()``. Asynchronous
receivers will be called using :func:`~.async_to_sync` when invoked via
``sync()``. Similar to the :ref:`case for middleware <async_performance>`,
there is a small performance cost to adapting receivers in this way. Note that
in order to reduce the number of sync/async calling-style switches within a
``send()`` or ``asend()`` call, the receivers are grouped by whether or not
they are async before being called. This means that an asynchronous receiver
registered before a synchronous receiver may be executed after the synchronous
receiver. In addition, async receivers are executed concurrently using
``asyncio.gather()``.
All built-in signals, except those in the async request-response cycle, are
dispatched using :meth:`Signal.send`.
.. versionchanged:: 5.0
Support for asynchronous signals was added.
Disconnecting signals
=====================

View File

@ -31,14 +31,14 @@ class PostgresConfigTests(TestCase):
from django.contrib.postgres.signals import register_type_handlers
self.assertNotIn(
register_type_handlers, connection_created._live_receivers(None)
register_type_handlers, connection_created._live_receivers(None)[0]
)
with modify_settings(INSTALLED_APPS={"append": "django.contrib.postgres"}):
self.assertIn(
register_type_handlers, connection_created._live_receivers(None)
register_type_handlers, connection_created._live_receivers(None)[0]
)
self.assertNotIn(
register_type_handlers, connection_created._live_receivers(None)
register_type_handlers, connection_created._live_receivers(None)[0]
)
def test_register_serializer_for_migrations(self):

View File

@ -1,5 +1,7 @@
import asyncio
from unittest import mock
from django import dispatch
from django.apps.registry import Apps
from django.db import models
from django.db.models import signals
@ -530,3 +532,94 @@ class LazyModelRefTests(BaseSignalSetup, SimpleTestCase):
apps2 = Apps()
signals.post_init.connect(self.receiver, sender=Book, apps=apps2)
self.assertEqual(list(apps2._pending_operations), [])
class SyncHandler:
param = 0
def __call__(self, **kwargs):
self.param += 1
return self.param
class AsyncHandler:
_is_coroutine = asyncio.coroutines._is_coroutine
param = 0
async def __call__(self, **kwargs):
self.param += 1
return self.param
class AsyncReceiversTests(SimpleTestCase):
async def test_asend(self):
sync_handler = SyncHandler()
async_handler = AsyncHandler()
signal = dispatch.Signal()
signal.connect(sync_handler)
signal.connect(async_handler)
result = await signal.asend(self.__class__)
self.assertEqual(result, [(sync_handler, 1), (async_handler, 1)])
def test_send(self):
sync_handler = SyncHandler()
async_handler = AsyncHandler()
signal = dispatch.Signal()
signal.connect(sync_handler)
signal.connect(async_handler)
result = signal.send(self.__class__)
self.assertEqual(result, [(sync_handler, 1), (async_handler, 1)])
def test_send_robust(self):
class ReceiverException(Exception):
pass
receiver_exception = ReceiverException()
async def failing_async_handler(**kwargs):
raise receiver_exception
sync_handler = SyncHandler()
async_handler = AsyncHandler()
signal = dispatch.Signal()
signal.connect(failing_async_handler)
signal.connect(async_handler)
signal.connect(sync_handler)
result = signal.send_robust(self.__class__)
# The ordering here is different than the order that signals were
# connected in.
self.assertEqual(
result,
[
(sync_handler, 1),
(failing_async_handler, receiver_exception),
(async_handler, 1),
],
)
async def test_asend_robust(self):
class ReceiverException(Exception):
pass
receiver_exception = ReceiverException()
async def failing_async_handler(**kwargs):
raise receiver_exception
sync_handler = SyncHandler()
async_handler = AsyncHandler()
signal = dispatch.Signal()
signal.connect(failing_async_handler)
signal.connect(async_handler)
signal.connect(sync_handler)
result = await signal.asend_robust(self.__class__)
# The ordering here is different than the order that signals were
# connected in.
self.assertEqual(
result,
[
(sync_handler, 1),
(failing_async_handler, receiver_exception),
(async_handler, 1),
],
)