diff --git a/docs/internals/contributing/writing-code/unit-tests.txt b/docs/internals/contributing/writing-code/unit-tests.txt index 9bba72c451..087f64205b 100644 --- a/docs/internals/contributing/writing-code/unit-tests.txt +++ b/docs/internals/contributing/writing-code/unit-tests.txt @@ -273,6 +273,7 @@ Running all the tests If you want to run the full suite of tests, you'll need to install a number of dependencies: +* aiosmtpd_ * argon2-cffi_ 19.1.0+ * asgiref_ 3.3.2+ (required) * bcrypt_ @@ -322,6 +323,7 @@ associated tests will be skipped. To run some of the autoreload tests, you'll need to install the Watchman_ service. +.. _aiosmtpd: https://pypi.org/project/aiosmtpd/ .. _argon2-cffi: https://pypi.org/project/argon2-cffi/ .. _asgiref: https://pypi.org/project/asgiref/ .. _bcrypt: https://pypi.org/project/bcrypt/ diff --git a/tests/mail/tests.py b/tests/mail/tests.py index b9b3e452d5..9584b450b3 100644 --- a/tests/mail/tests.py +++ b/tests/mail/tests.py @@ -1,11 +1,9 @@ -import asyncore import mimetypes import os import shutil -import smtpd +import socket import sys import tempfile -import threading from email import charset, message_from_binary_file, message_from_bytes from email.header import Header from email.mime.text import MIMEText @@ -14,7 +12,7 @@ from io import StringIO from pathlib import Path from smtplib import SMTP, SMTPException from ssl import SSLError -from unittest import mock +from unittest import mock, skipUnless from django.core import mail from django.core.mail import ( @@ -27,6 +25,12 @@ from django.test import SimpleTestCase, override_settings from django.test.utils import requires_tz_support from django.utils.translation import gettext_lazy +try: + from aiosmtpd.controller import Controller + HAS_AIOSMTPD = True +except ImportError: + HAS_AIOSMTPD = False + class HeadersCheckMixin: @@ -1336,109 +1340,78 @@ class ConsoleBackendTests(BaseEmailBackendTests, SimpleTestCase): self.assertIn(b'\nDate: ', message) -class FakeSMTPChannel(smtpd.SMTPChannel): - - def collect_incoming_data(self, data): - try: - smtpd.SMTPChannel.collect_incoming_data(self, data) - except UnicodeDecodeError: - # Ignore decode error in SSL/TLS connection tests as the test only - # cares whether the connection attempt was made. - pass - - -class FakeSMTPServer(smtpd.SMTPServer, threading.Thread): - """ - Asyncore SMTP server wrapped into a thread. Based on DummyFTPServer from: - http://svn.python.org/view/python/branches/py3k/Lib/test/test_ftplib.py?revision=86061&view=markup - """ - channel_class = FakeSMTPChannel - +class SMTPHandler: def __init__(self, *args, **kwargs): - threading.Thread.__init__(self) - smtpd.SMTPServer.__init__(self, *args, decode_data=True, **kwargs) - self._sink = [] - self.active = False - self.active_lock = threading.Lock() - self.sink_lock = threading.Lock() + self.mailbox = [] - def process_message(self, peer, mailfrom, rcpttos, data): - data = data.encode() - m = message_from_bytes(data) - maddr = parseaddr(m.get('from'))[1] + async def handle_DATA(self, server, session, envelope): + data = envelope.content + mail_from = envelope.mail_from - if mailfrom != maddr: - # According to the spec, mailfrom does not necessarily match the + message = message_from_bytes(data.rstrip()) + message_addr = parseaddr(message.get('from'))[1] + if mail_from != message_addr: + # According to the spec, mail_from does not necessarily match the # From header - this is the case where the local part isn't # encoded, so try to correct that. - lp, domain = mailfrom.split('@', 1) + lp, domain = mail_from.split('@', 1) lp = Header(lp, 'utf-8').encode() - mailfrom = '@'.join([lp, domain]) + mail_from = '@'.join([lp, domain]) - if mailfrom != maddr: - return "553 '%s' != '%s'" % (mailfrom, maddr) - with self.sink_lock: - self._sink.append(m) + if mail_from != message_addr: + return f"553 '{mail_from}' != '{message_addr}'" + self.mailbox.append(message) + return '250 OK' - def get_sink(self): - with self.sink_lock: - return self._sink[:] - - def flush_sink(self): - with self.sink_lock: - self._sink[:] = [] - - def start(self): - assert not self.active - self.__flag = threading.Event() - threading.Thread.start(self) - self.__flag.wait() - - def run(self): - self.active = True - self.__flag.set() - while self.active and asyncore.socket_map: - with self.active_lock: - asyncore.loop(timeout=0.1, count=1) - asyncore.close_all() - - def stop(self): - if self.active: - self.active = False - self.join() + def flush_mailbox(self): + self.mailbox[:] = [] +@skipUnless(HAS_AIOSMTPD, 'No aiosmtpd library detected.') class SMTPBackendTestsBase(SimpleTestCase): @classmethod def setUpClass(cls): super().setUpClass() - cls.server = FakeSMTPServer(('127.0.0.1', 0), None) + # Find a free port. + with socket.socket() as s: + s.bind(('127.0.0.1', 0)) + port = s.getsockname()[1] + cls.smtp_handler = SMTPHandler() + cls.smtp_controller = Controller( + cls.smtp_handler, hostname='127.0.0.1', port=port, + ) cls._settings_override = override_settings( - EMAIL_HOST="127.0.0.1", - EMAIL_PORT=cls.server.socket.getsockname()[1]) + EMAIL_HOST=cls.smtp_controller.hostname, + EMAIL_PORT=cls.smtp_controller.port, + ) cls._settings_override.enable() cls.addClassCleanup(cls._settings_override.disable) - cls.server.start() - cls.addClassCleanup(cls.server.stop) + cls.smtp_controller.start() + cls.addClassCleanup(cls.stop_smtp) + + @classmethod + def stop_smtp(cls): + cls.smtp_controller.stop() +@skipUnless(HAS_AIOSMTPD, 'No aiosmtpd library detected.') class SMTPBackendTests(BaseEmailBackendTests, SMTPBackendTestsBase): email_backend = 'django.core.mail.backends.smtp.EmailBackend' def setUp(self): super().setUp() - self.server.flush_sink() + self.smtp_handler.flush_mailbox() def tearDown(self): - self.server.flush_sink() + self.smtp_handler.flush_mailbox() super().tearDown() def flush_mailbox(self): - self.server.flush_sink() + self.smtp_handler.flush_mailbox() def get_mailbox_content(self): - return self.server.get_sink() + return self.smtp_handler.mailbox @override_settings( EMAIL_HOST_USER="not empty username", @@ -1657,17 +1630,18 @@ class SMTPBackendTests(BaseEmailBackendTests, SMTPBackendTestsBase): self.assertEqual(sent, 0) +@skipUnless(HAS_AIOSMTPD, 'No aiosmtpd library detected.') class SMTPBackendStoppedServerTests(SMTPBackendTestsBase): - """ - These tests require a separate class, because the FakeSMTPServer is shut - down in setUpClass(), and it cannot be restarted ("RuntimeError: threads - can only be started once"). - """ @classmethod def setUpClass(cls): super().setUpClass() cls.backend = smtp.EmailBackend(username='', password='') - cls.server.stop() + cls.smtp_controller.stop() + + @classmethod + def stop_smtp(cls): + # SMTP controller is stopped in setUpClass(). + pass def test_server_stopped(self): """ diff --git a/tests/requirements/py3.txt b/tests/requirements/py3.txt index 893e47a914..3b73b17b24 100644 --- a/tests/requirements/py3.txt +++ b/tests/requirements/py3.txt @@ -1,3 +1,4 @@ +aiosmtpd asgiref >= 3.3.2 argon2-cffi >= 16.1.0 backports.zoneinfo; python_version < '3.9'