import asyncio import sys import threading from pathlib import Path from asgiref.testing import ApplicationCommunicator from django.contrib.staticfiles.handlers import ASGIStaticFilesHandler from django.core.asgi import get_asgi_application from django.core.signals import request_finished, request_started from django.db import close_old_connections from django.test import ( AsyncRequestFactory, SimpleTestCase, modify_settings, override_settings, ) from django.utils.http import http_date from .urls import sync_waiter, test_filename TEST_STATIC_ROOT = Path(__file__).parent / "project" / "static" @override_settings(ROOT_URLCONF="asgi.urls") class ASGITest(SimpleTestCase): async_request_factory = AsyncRequestFactory() def setUp(self): request_started.disconnect(close_old_connections) def tearDown(self): request_started.connect(close_old_connections) async def test_get_asgi_application(self): """ get_asgi_application() returns a functioning ASGI callable. """ application = get_asgi_application() # Construct HTTP request. scope = self.async_request_factory._base_scope(path="/") communicator = ApplicationCommunicator(application, scope) await communicator.send_input({"type": "http.request"}) # Read the response. response_start = await communicator.receive_output() self.assertEqual(response_start["type"], "http.response.start") self.assertEqual(response_start["status"], 200) self.assertEqual( set(response_start["headers"]), { (b"Content-Length", b"12"), (b"Content-Type", b"text/html; charset=utf-8"), }, ) response_body = await communicator.receive_output() self.assertEqual(response_body["type"], "http.response.body") self.assertEqual(response_body["body"], b"Hello World!") async def test_file_response(self): """ Makes sure that FileResponse works over ASGI. """ application = get_asgi_application() # Construct HTTP request. scope = self.async_request_factory._base_scope(path="/file/") communicator = ApplicationCommunicator(application, scope) await communicator.send_input({"type": "http.request"}) # Get the file content. with open(test_filename, "rb") as test_file: test_file_contents = test_file.read() # Read the response. response_start = await communicator.receive_output() self.assertEqual(response_start["type"], "http.response.start") self.assertEqual(response_start["status"], 200) headers = response_start["headers"] self.assertEqual(len(headers), 3) expected_headers = { b"Content-Length": str(len(test_file_contents)).encode("ascii"), b"Content-Type": b"text/x-python", b"Content-Disposition": b'inline; filename="urls.py"', } for key, value in headers: try: self.assertEqual(value, expected_headers[key]) except AssertionError: # Windows registry may not be configured with correct # mimetypes. if sys.platform == "win32" and key == b"Content-Type": self.assertEqual(value, b"text/plain") else: raise response_body = await communicator.receive_output() self.assertEqual(response_body["type"], "http.response.body") self.assertEqual(response_body["body"], test_file_contents) # Allow response.close() to finish. await communicator.wait() @modify_settings(INSTALLED_APPS={"append": "django.contrib.staticfiles"}) @override_settings( STATIC_URL="static/", STATIC_ROOT=TEST_STATIC_ROOT, STATICFILES_DIRS=[TEST_STATIC_ROOT], STATICFILES_FINDERS=[ "django.contrib.staticfiles.finders.FileSystemFinder", ], ) async def test_static_file_response(self): application = ASGIStaticFilesHandler(get_asgi_application()) # Construct HTTP request. scope = self.async_request_factory._base_scope(path="/static/file.txt") communicator = ApplicationCommunicator(application, scope) await communicator.send_input({"type": "http.request"}) # Get the file content. file_path = TEST_STATIC_ROOT / "file.txt" with open(file_path, "rb") as test_file: test_file_contents = test_file.read() # Read the response. stat = file_path.stat() response_start = await communicator.receive_output() self.assertEqual(response_start["type"], "http.response.start") self.assertEqual(response_start["status"], 200) self.assertEqual( set(response_start["headers"]), { (b"Content-Length", str(len(test_file_contents)).encode("ascii")), (b"Content-Type", b"text/plain"), (b"Content-Disposition", b'inline; filename="file.txt"'), (b"Last-Modified", http_date(stat.st_mtime).encode("ascii")), }, ) response_body = await communicator.receive_output() self.assertEqual(response_body["type"], "http.response.body") self.assertEqual(response_body["body"], test_file_contents) # Allow response.close() to finish. await communicator.wait() async def test_headers(self): application = get_asgi_application() communicator = ApplicationCommunicator( application, self.async_request_factory._base_scope( path="/meta/", headers=[ [b"content-type", b"text/plain; charset=utf-8"], [b"content-length", b"77"], [b"referer", b"Scotland"], [b"referer", b"Wales"], ], ), ) await communicator.send_input({"type": "http.request"}) response_start = await communicator.receive_output() self.assertEqual(response_start["type"], "http.response.start") self.assertEqual(response_start["status"], 200) self.assertEqual( set(response_start["headers"]), { (b"Content-Length", b"19"), (b"Content-Type", b"text/plain; charset=utf-8"), }, ) response_body = await communicator.receive_output() self.assertEqual(response_body["type"], "http.response.body") self.assertEqual(response_body["body"], b"From Scotland,Wales") async def test_post_body(self): application = get_asgi_application() scope = self.async_request_factory._base_scope(method="POST", path="/post/") communicator = ApplicationCommunicator(application, scope) await communicator.send_input({"type": "http.request", "body": b"Echo!"}) response_start = await communicator.receive_output() self.assertEqual(response_start["type"], "http.response.start") self.assertEqual(response_start["status"], 200) response_body = await communicator.receive_output() self.assertEqual(response_body["type"], "http.response.body") self.assertEqual(response_body["body"], b"Echo!") async def test_get_query_string(self): application = get_asgi_application() for query_string in (b"name=Andrew", "name=Andrew"): with self.subTest(query_string=query_string): scope = self.async_request_factory._base_scope( path="/", query_string=query_string, ) communicator = ApplicationCommunicator(application, scope) await communicator.send_input({"type": "http.request"}) response_start = await communicator.receive_output() self.assertEqual(response_start["type"], "http.response.start") self.assertEqual(response_start["status"], 200) response_body = await communicator.receive_output() self.assertEqual(response_body["type"], "http.response.body") self.assertEqual(response_body["body"], b"Hello Andrew!") async def test_disconnect(self): application = get_asgi_application() scope = self.async_request_factory._base_scope(path="/") communicator = ApplicationCommunicator(application, scope) await communicator.send_input({"type": "http.disconnect"}) with self.assertRaises(asyncio.TimeoutError): await communicator.receive_output() async def test_wrong_connection_type(self): application = get_asgi_application() scope = self.async_request_factory._base_scope(path="/", type="other") communicator = ApplicationCommunicator(application, scope) await communicator.send_input({"type": "http.request"}) msg = "Django can only handle ASGI/HTTP connections, not other." with self.assertRaisesMessage(ValueError, msg): await communicator.receive_output() async def test_non_unicode_query_string(self): application = get_asgi_application() scope = self.async_request_factory._base_scope(path="/", query_string=b"\xff") communicator = ApplicationCommunicator(application, scope) await communicator.send_input({"type": "http.request"}) response_start = await communicator.receive_output() self.assertEqual(response_start["type"], "http.response.start") self.assertEqual(response_start["status"], 400) response_body = await communicator.receive_output() self.assertEqual(response_body["type"], "http.response.body") self.assertEqual(response_body["body"], b"") async def test_request_lifecycle_signals_dispatched_with_thread_sensitive(self): class SignalHandler: """Track threads handler is dispatched on.""" threads = [] def __call__(self, **kwargs): self.threads.append(threading.current_thread()) signal_handler = SignalHandler() request_started.connect(signal_handler) request_finished.connect(signal_handler) # Perform a basic request. application = get_asgi_application() scope = self.async_request_factory._base_scope(path="/") communicator = ApplicationCommunicator(application, scope) await communicator.send_input({"type": "http.request"}) response_start = await communicator.receive_output() self.assertEqual(response_start["type"], "http.response.start") self.assertEqual(response_start["status"], 200) response_body = await communicator.receive_output() self.assertEqual(response_body["type"], "http.response.body") self.assertEqual(response_body["body"], b"Hello World!") # Give response.close() time to finish. await communicator.wait() # AsyncToSync should have executed the signals in the same thread. request_started_thread, request_finished_thread = signal_handler.threads self.assertEqual(request_started_thread, request_finished_thread) request_started.disconnect(signal_handler) request_finished.disconnect(signal_handler) async def test_concurrent_async_uses_multiple_thread_pools(self): sync_waiter.active_threads.clear() # Send 2 requests concurrently application = get_asgi_application() scope = self.async_request_factory._base_scope(path="/wait/") communicators = [] for _ in range(2): communicators.append(ApplicationCommunicator(application, scope)) await communicators[-1].send_input({"type": "http.request"}) # Each request must complete with a status code of 200 # If requests aren't scheduled concurrently, the barrier in the # sync_wait view will time out, resulting in a 500 status code. for communicator in communicators: response_start = await communicator.receive_output() self.assertEqual(response_start["type"], "http.response.start") self.assertEqual(response_start["status"], 200) response_body = await communicator.receive_output() self.assertEqual(response_body["type"], "http.response.body") self.assertEqual(response_body["body"], b"Hello World!") # Give response.close() time to finish. await communicator.wait() # The requests should have scheduled on different threads. Note # active_threads is a set (a thread can only appear once), therefore # length is a sufficient check. self.assertEqual(len(sync_waiter.active_threads), 2) sync_waiter.active_threads.clear()