diff --git a/django/core/handlers/asgi.py b/django/core/handlers/asgi.py index b5372a1d493..f0125e7321e 100644 --- a/django/core/handlers/asgi.py +++ b/django/core/handlers/asgi.py @@ -19,6 +19,7 @@ from django.http import ( parse_cookie, ) from django.urls import set_script_prefix +from django.utils.asyncio import aclosing from django.utils.functional import cached_property logger = logging.getLogger("django.request") @@ -263,19 +264,22 @@ class ASGIHandler(base.BaseHandler): ) # Streaming responses need to be pinned to their iterator. if response.streaming: - # Access `__iter__` and not `streaming_content` directly in case - # it has been overridden in a subclass. - for part in response: - for chunk, _ in self.chunk_bytes(part): - await send( - { - "type": "http.response.body", - "body": chunk, - # Ignore "more" as there may be more parts; instead, - # use an empty final closing message with False. - "more_body": True, - } - ) + # - Consume via `__aiter__` and not `streaming_content` directly, to + # allow mapping of a sync iterator. + # - Use aclosing() when consuming aiter. + # See https://github.com/python/cpython/commit/6e8dcda + async with aclosing(response.__aiter__()) as content: + async for part in content: + for chunk, _ in self.chunk_bytes(part): + await send( + { + "type": "http.response.body", + "body": chunk, + # Ignore "more" as there may be more parts; instead, + # use an empty final closing message with False. + "more_body": True, + } + ) # Final closing message. await send({"type": "http.response.body"}) # Other responses just need chunking. 
diff --git a/django/http/response.py b/django/http/response.py index 3c281f3dd02..465a8553dc6 100644 --- a/django/http/response.py +++ b/django/http/response.py @@ -6,10 +6,13 @@ import os import re import sys import time +import warnings from email.header import Header from http.client import responses from urllib.parse import urlparse +from asgiref.sync import async_to_sync, sync_to_async + from django.conf import settings from django.core import signals, signing from django.core.exceptions import DisallowedRedirect @@ -476,7 +479,18 @@ class StreamingHttpResponse(HttpResponseBase): @property def streaming_content(self): - return map(self.make_bytes, self._iterator) + if self.is_async: + # pull to lexical scope to capture fixed reference in case + # streaming_content is set again later. + _iterator = self._iterator + + async def awrapper(): + async for part in _iterator: + yield self.make_bytes(part) + + return awrapper() + else: + return map(self.make_bytes, self._iterator) @streaming_content.setter def streaming_content(self, value): @@ -484,12 +498,48 @@ class StreamingHttpResponse(HttpResponseBase): def _set_streaming_content(self, value): # Ensure we can never iterate on "value" more than once. - self._iterator = iter(value) + try: + self._iterator = iter(value) + self.is_async = False + except TypeError: + self._iterator = value.__aiter__() + self.is_async = True if hasattr(value, "close"): self._resource_closers.append(value.close) def __iter__(self): - return self.streaming_content + try: + return iter(self.streaming_content) + except TypeError: + warnings.warn( + "StreamingHttpResponse must consume asynchronous iterators in order to " + "serve them synchronously. Use a synchronous iterator instead.", + Warning, + ) + + # async iterator. Consume in async_to_sync and map back. 
+ async def to_list(_iterator): + as_list = [] + async for chunk in _iterator: + as_list.append(chunk) + return as_list + + return map(self.make_bytes, iter(async_to_sync(to_list)(self._iterator))) + + async def __aiter__(self): + try: + async for part in self.streaming_content: + yield part + except TypeError: + warnings.warn( + "StreamingHttpResponse must consume synchronous iterators in order to " + "serve them asynchronously. Use an asynchronous iterator instead.", + Warning, + ) + # sync iterator. Consume via sync_to_async and yield via async + # generator. + for part in await sync_to_async(list)(self.streaming_content): + yield part def getvalue(self): return b"".join(self.streaming_content) diff --git a/django/middleware/gzip.py b/django/middleware/gzip.py index d91246c007a..45be6ccb437 100644 --- a/django/middleware/gzip.py +++ b/django/middleware/gzip.py @@ -31,12 +31,26 @@ class GZipMiddleware(MiddlewareMixin): return response if response.streaming: + if response.is_async: + # pull to lexical scope to capture fixed reference in case + # streaming_content is set again later. + orignal_iterator = response.streaming_content + + async def gzip_wrapper(): + async for chunk in orignal_iterator: + yield compress_string( + chunk, + max_random_bytes=self.max_random_bytes, + ) + + response.streaming_content = gzip_wrapper() + else: + response.streaming_content = compress_sequence( + response.streaming_content, + max_random_bytes=self.max_random_bytes, + ) # Delete the `Content-Length` header for streaming content, because # we won't know the compressed size until we stream it. - response.streaming_content = compress_sequence( - response.streaming_content, - max_random_bytes=self.max_random_bytes, - ) del response.headers["Content-Length"] else: # Return the compressed content only if it's actually shorter. 
diff --git a/django/utils/asyncio.py b/django/utils/asyncio.py index 1e79f90c2c1..eea2df48e27 100644 --- a/django/utils/asyncio.py +++ b/django/utils/asyncio.py @@ -37,3 +37,28 @@ def async_unsafe(message): return decorator(func) else: return decorator + + +try: + from contextlib import aclosing +except ImportError: + # TODO: Remove when dropping support for PY39. + from contextlib import AbstractAsyncContextManager + + # Backport of contextlib.aclosing() from Python 3.10. Copyright (C) Python + # Software Foundation (see LICENSE.python). + class aclosing(AbstractAsyncContextManager): + """ + Async context manager for safely finalizing an asynchronously + cleaned-up resource such as an async generator, calling its + ``aclose()`` method. + """ + + def __init__(self, thing): + self.thing = thing + + async def __aenter__(self): + return self.thing + + async def __aexit__(self, *exc_info): + await self.thing.aclose() diff --git a/docs/ref/request-response.txt b/docs/ref/request-response.txt index 34a31c49362..ebcd9ee523d 100644 --- a/docs/ref/request-response.txt +++ b/docs/ref/request-response.txt @@ -1116,43 +1116,76 @@ parameter to the constructor method:: .. class:: StreamingHttpResponse The :class:`StreamingHttpResponse` class is used to stream a response from -Django to the browser. You might want to do this if generating the response -takes too long or uses too much memory. For instance, it's useful for -:ref:`generating large CSV files `. +Django to the browser. -.. admonition:: Performance considerations +.. admonition:: Advanced usage - Django is designed for short-lived requests. Streaming responses will tie - a worker process for the entire duration of the response. This may result - in poor performance. + :class:`StreamingHttpResponse` is somewhat advanced, in that it is + important to know whether you'll be serving your application synchronously + under WSGI or asynchronously under ASGI, and adjust your usage + appropriately. 
- Generally speaking, you should perform expensive tasks outside of the - request-response cycle, rather than resorting to a streamed response. + Please read these notes with care. + +An example usage of :class:`StreamingHttpResponse` under WSGI is streaming +content when generating the response would take too long or use too much +memory. For instance, it's useful for :ref:`generating large CSV files +<streaming-csv-files>`. + +There are performance considerations when doing this, though. Django, under +WSGI, is designed for short-lived requests. Streaming responses will tie a +worker process for the entire duration of the response. This may result in poor +performance. + +Generally speaking, you would perform expensive tasks outside of the +request-response cycle, rather than resorting to a streamed response. + +When serving under ASGI, however, a :class:`StreamingHttpResponse` need not +stop other requests from being served whilst waiting for I/O. This opens up +the possibility of long-lived requests for streaming content and implementing +patterns such as long-polling and server-sent events. + +Note that even under ASGI, :class:`StreamingHttpResponse` should only be used in +situations where it is absolutely required that the whole content isn't +iterated before transferring the data to the client. Because the content can't +be accessed, many middleware can't function normally. For example the ``ETag`` +and ``Content-Length`` headers can't be generated for streaming responses. The :class:`StreamingHttpResponse` is not a subclass of :class:`HttpResponse`, because it features a slightly different API. However, it is almost identical, with the following notable differences: -* It should be given an iterator that yields bytestrings as content. +* It should be given an iterator that yields bytestrings as content. When + serving under WSGI, this should be a sync iterator. When serving under ASGI, + this should be an async iterator.
* You cannot access its content, except by iterating the response object - itself. This should only occur when the response is returned to the client. + itself. This should only occur when the response is returned to the client: + you should not iterate the response yourself. + + Under WSGI the response will be iterated synchronously. Under ASGI the + response will be iterated asynchronously. (This is why the iterator type must + match the protocol you're using.) + + To avoid a crash, an incorrect iterator type will be mapped to the correct + type during iteration, and a warning will be raised, but in order to do this + the iterator must be fully-consumed, which defeats the purpose of using a + :class:`StreamingHttpResponse` at all. * It has no ``content`` attribute. Instead, it has a - :attr:`~StreamingHttpResponse.streaming_content` attribute. + :attr:`~StreamingHttpResponse.streaming_content` attribute. This can be used + in middleware to wrap the response iterable, but should not be consumed. * You cannot use the file-like object ``tell()`` or ``write()`` methods. Doing so will raise an exception. -:class:`StreamingHttpResponse` should only be used in situations where it is -absolutely required that the whole content isn't iterated before transferring -the data to the client. Because the content can't be accessed, many -middleware can't function normally. For example the ``ETag`` and -``Content-Length`` headers can't be generated for streaming responses. - The :class:`HttpResponseBase` base class is common between :class:`HttpResponse` and :class:`StreamingHttpResponse`. +.. versionchanged:: 4.2 + + Support for asynchronous iteration was added. + Attributes ---------- @@ -1181,6 +1214,16 @@ Attributes This is always ``True``. +.. attribute:: StreamingHttpResponse.is_async + + .. versionadded:: 4.2 + + Boolean indicating whether :attr:`StreamingHttpResponse.streaming_content` + is an asynchronous iterator or not. 
+ This is useful for middleware needing to wrap + :attr:`StreamingHttpResponse.streaming_content`. + ``FileResponse`` objects ======================== @@ -1213,6 +1256,15 @@ a file open in binary mode like so:: The file will be closed automatically, so don't open it with a context manager. +.. admonition:: Use under ASGI + + Python's file API is synchronous. This means that the file must be fully + consumed in order to be served under ASGI. + + In order to stream a file asynchronously you need to use a third-party + package that provides an asynchronous file API, such as `aiofiles + <https://github.com/Tinche/aiofiles>`_. + Methods ------- diff --git a/docs/releases/4.2.txt b/docs/releases/4.2.txt index 7979e2359b1..9710e889ca5 100644 --- a/docs/releases/4.2.txt +++ b/docs/releases/4.2.txt @@ -286,7 +286,8 @@ Models Requests and Responses ~~~~~~~~~~~~~~~~~~~~~~ -* ... +* :class:`~django.http.StreamingHttpResponse` now supports async iterators + when Django is served via ASGI. Security ~~~~~~~~ diff --git a/docs/topics/http/middleware.txt b/docs/topics/http/middleware.txt index e1a3e95ebc6..f0db49abe54 100644 --- a/docs/topics/http/middleware.txt +++ b/docs/topics/http/middleware.txt @@ -267,6 +267,16 @@ must test for streaming responses and adjust their behavior accordingly:: for chunk in content: yield alter_content(chunk) +:class:`~django.http.StreamingHttpResponse` allows both synchronous and +asynchronous iterators. The wrapping function must match. Check +:attr:`StreamingHttpResponse.is_async +<django.http.StreamingHttpResponse.is_async>` if your middleware needs to +support both types of iterator. + +.. versionchanged:: 4.2 + + Support for streaming responses with asynchronous iterators was added.
+ Exception handling ================== diff --git a/tests/asgi/tests.py b/tests/asgi/tests.py index 4e51c2d9fec..61d040b45b5 100644 --- a/tests/asgi/tests.py +++ b/tests/asgi/tests.py @@ -12,6 +12,7 @@ from django.db import close_old_connections from django.test import ( AsyncRequestFactory, SimpleTestCase, + ignore_warnings, modify_settings, override_settings, ) @@ -58,6 +59,13 @@ class ASGITest(SimpleTestCase): # Allow response.close() to finish. await communicator.wait() + # Python's file API is not async compatible. A third-party library such + # as https://github.com/Tinche/aiofiles allows passing the file to + # FileResponse as an async iterator. With a sync iterator + # StreamingHttpResponse triggers a warning when iterating the file. + # assertWarnsMessage is not async compatible, so ignore_warnings for the + # test. + @ignore_warnings(module="django.http.response") async def test_file_response(self): """ Makes sure that FileResponse works over ASGI. @@ -91,6 +99,8 @@ class ASGITest(SimpleTestCase): self.assertEqual(value, b"text/plain") else: raise + + # Warning ignored here. response_body = await communicator.receive_output() self.assertEqual(response_body["type"], "http.response.body") self.assertEqual(response_body["body"], test_file_contents) @@ -106,6 +116,7 @@ class ASGITest(SimpleTestCase): "django.contrib.staticfiles.finders.FileSystemFinder", ], ) + @ignore_warnings(module="django.http.response") async def test_static_file_response(self): application = ASGIStaticFilesHandler(get_asgi_application()) # Construct HTTP request.
diff --git a/tests/httpwrappers/tests.py b/tests/httpwrappers/tests.py index e1920e2edae..fa2c8fd5d26 100644 --- a/tests/httpwrappers/tests.py +++ b/tests/httpwrappers/tests.py @@ -720,6 +720,42 @@ class StreamingHttpResponseTests(SimpleTestCase): '', ) + async def test_async_streaming_response(self): + async def async_iter(): + yield b"hello" + yield b"world" + + r = StreamingHttpResponse(async_iter()) + + chunks = [] + async for chunk in r: + chunks.append(chunk) + self.assertEqual(chunks, [b"hello", b"world"]) + + def test_async_streaming_response_warning(self): + async def async_iter(): + yield b"hello" + yield b"world" + + r = StreamingHttpResponse(async_iter()) + + msg = ( + "StreamingHttpResponse must consume asynchronous iterators in order to " + "serve them synchronously. Use a synchronous iterator instead." + ) + with self.assertWarnsMessage(Warning, msg): + self.assertEqual(list(r), [b"hello", b"world"]) + + async def test_sync_streaming_response_warning(self): + r = StreamingHttpResponse(iter(["hello", "world"])) + + msg = ( + "StreamingHttpResponse must consume synchronous iterators in order to " + "serve them asynchronously. Use an asynchronous iterator instead." + ) + with self.assertWarnsMessage(Warning, msg): + self.assertEqual(b"hello", await r.__aiter__().__anext__()) + class FileCloseTests(SimpleTestCase): def setUp(self): diff --git a/tests/middleware/tests.py b/tests/middleware/tests.py index 1b8efe1a3e6..e29d32ad749 100644 --- a/tests/middleware/tests.py +++ b/tests/middleware/tests.py @@ -899,6 +899,28 @@ class GZipMiddlewareTest(SimpleTestCase): self.assertEqual(r.get("Content-Encoding"), "gzip") self.assertFalse(r.has_header("Content-Length")) + async def test_compress_async_streaming_response(self): + """ + Compression is performed on responses with async streaming content. 
+ """ + + async def get_stream_response(request): + async def iterator(): + for chunk in self.sequence: + yield chunk + + resp = StreamingHttpResponse(iterator()) + resp["Content-Type"] = "text/html; charset=UTF-8" + return resp + + r = await GZipMiddleware(get_stream_response)(self.req) + self.assertEqual( + self.decompress(b"".join([chunk async for chunk in r])), + b"".join(self.sequence), + ) + self.assertEqual(r.get("Content-Encoding"), "gzip") + self.assertFalse(r.has_header("Content-Length")) + def test_compress_streaming_response_unicode(self): """ Compression is performed on responses with streaming Unicode content.