From 11997218eeed02c8be7a81b87db321464839a2cf Mon Sep 17 00:00:00 2001 From: Russell Keith-Magee Date: Sat, 15 Jan 2011 05:55:24 +0000 Subject: [PATCH] =?UTF-8?q?Fixed=20#15042=20--=20Ensured=20that=20email=20?= =?UTF-8?q?addresses=20without=20a=20domain=20can=20still=20be=20mail=20re?= =?UTF-8?q?cipients.=20Patch=20also=20improves=20the=20IDN=20handling=20in?= =?UTF-8?q?troduced=20by=20r15006,=20and=20refactors=20the=20test=20suite?= =?UTF-8?q?=20to=20ensure=20even=20feature=20coverage.=20Thanks=20to=20net?= =?UTF-8?q?147=20for=20the=20report,=20and=20to=20=C5=81ukasz=20Rekucki=20?= =?UTF-8?q?for=20the=20awesome=20patch.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: http://code.djangoproject.com/svn/django/trunk@15211 bcc190cf-cafb-0310-a4f2-bffc1f526a37 --- django/core/mail/backends/smtp.py | 13 +- django/core/mail/message.py | 65 ++- tests/regressiontests/mail/tests.py | 632 ++++++++++++++++++---------- 3 files changed, 468 insertions(+), 242 deletions(-) diff --git a/django/core/mail/backends/smtp.py b/django/core/mail/backends/smtp.py index 3b2962f7d8a..bb184ab3125 100644 --- a/django/core/mail/backends/smtp.py +++ b/django/core/mail/backends/smtp.py @@ -1,5 +1,4 @@ """SMTP email backend class.""" - import smtplib import socket import threading @@ -7,6 +6,8 @@ import threading from django.conf import settings from django.core.mail.backends.base import BaseEmailBackend from django.core.mail.utils import DNS_NAME +from django.core.mail.message import sanitize_address + class EmailBackend(BaseEmailBackend): """ @@ -91,17 +92,13 @@ class EmailBackend(BaseEmailBackend): self._lock.release() return num_sent - def _sanitize(self, email): - name, domain = email.split('@', 1) - email = '@'.join([name, domain.encode('idna')]) - return email - def _send(self, email_message): """A helper method that does the actual sending.""" if not email_message.recipients(): return False - from_email = self._sanitize(email_message.from_email) - recipients = map(self._sanitize, email_message.recipients()) + from_email = sanitize_address(email_message.from_email, email_message.encoding) + recipients = [sanitize_address(addr, email_message.encoding) + for addr in email_message.recipients()] try: self.connection.sendmail(from_email, recipients, email_message.message().as_string()) diff --git a/django/core/mail/message.py b/django/core/mail/message.py index 2311102fd08..96ff689fd43 100644 --- a/django/core/mail/message.py +++ b/django/core/mail/message.py @@ -12,6 +12,7 @@ from email.Utils import formatdate, getaddresses, formataddr from django.conf import settings from django.core.mail.utils import DNS_NAME from django.utils.encoding import smart_str, force_unicode +from email.Utils import parseaddr # Don't BASE64-encode UTF-8 messages so that we avoid unwanted attention from # some spam filters. @@ -54,6 +55,22 @@ def make_msgid(idstring=None): return msgid +# Header names that contain structured address data (RFC #5322) +ADDRESS_HEADERS = set([ + 'from', + 'sender', + 'reply-to', + 'to', + 'cc', + 'bcc', + 'resent-from', + 'resent-sender', + 'resent-to', + 'resent-cc', + 'resent-bcc', +]) + + def forbid_multi_line_headers(name, val, encoding): """Forbids multi-line headers, to prevent header injection.""" encoding = encoding or settings.DEFAULT_CHARSET @@ -63,43 +80,57 @@ def forbid_multi_line_headers(name, val, encoding): try: val = val.encode('ascii') except UnicodeEncodeError: - if name.lower() in ('to', 'from', 'cc'): - result = [] - for nm, addr in getaddresses((val,)): - nm = str(Header(nm.encode(encoding), encoding)) - try: - addr = addr.encode('ascii') - except UnicodeEncodeError: # IDN - addr = str(Header(addr.encode(encoding), encoding)) - result.append(formataddr((nm, addr))) - val = ', '.join(result) + if name.lower() in ADDRESS_HEADERS: + val = ', '.join(sanitize_address(addr, encoding) + for addr in getaddresses((val,))) else: - val = Header(val.encode(encoding), encoding) + val = str(Header(val, encoding)) else: if name.lower() == 'subject': val = Header(val) return name, val + +def sanitize_address(addr, encoding): + if isinstance(addr, basestring): + addr = parseaddr(force_unicode(addr)) + nm, addr = addr + nm = str(Header(nm, encoding)) + try: + addr = addr.encode('ascii') + except UnicodeEncodeError: # IDN + if u'@' in addr: + localpart, domain = addr.split(u'@', 1) + localpart = str(Header(localpart, encoding)) + domain = domain.encode('idna') + addr = '@'.join([localpart, domain]) + else: + addr = str(Header(addr, encoding)) + return formataddr((nm, addr)) + + class SafeMIMEText(MIMEText): - + def __init__(self, text, subtype, charset): self.encoding = charset MIMEText.__init__(self, text, subtype, charset) - - def __setitem__(self, name, val): + + def __setitem__(self, name, val): name, val = forbid_multi_line_headers(name, val, self.encoding) MIMEText.__setitem__(self, name, val) + class SafeMIMEMultipart(MIMEMultipart): - + def __init__(self, _subtype='mixed', boundary=None, _subparts=None, encoding=None, **_params): self.encoding = encoding MIMEMultipart.__init__(self, _subtype, boundary, _subparts, **_params) - + def __setitem__(self, name, val): name, val = forbid_multi_line_headers(name, val, self.encoding) MIMEMultipart.__setitem__(self, name, val) + class EmailMessage(object): """ A container for email information. @@ -274,7 +305,7 @@ class EmailMultiAlternatives(EmailMessage): conversions. """ super(EmailMultiAlternatives, self).__init__(subject, body, from_email, to, bcc, connection, attachments, headers, cc) - self.alternatives=alternatives or [] + self.alternatives = alternatives or [] def attach_alternative(self, content, mimetype): """Attach an alternative content representation.""" diff --git a/tests/regressiontests/mail/tests.py b/tests/regressiontests/mail/tests.py index a6cd60e2ac1..8bdc562c818 100644 --- a/tests/regressiontests/mail/tests.py +++ b/tests/regressiontests/mail/tests.py @@ -1,21 +1,62 @@ # coding: utf-8 +import asyncore import email import os import shutil +import smtpd import sys -import tempfile from StringIO import StringIO +import tempfile +import threading + from django.conf import settings from django.core import mail from django.core.mail import EmailMessage, mail_admins, mail_managers, EmailMultiAlternatives from django.core.mail import send_mail, send_mass_mail -from django.core.mail.backends.base import BaseEmailBackend from django.core.mail.backends import console, dummy, locmem, filebased, smtp from django.core.mail.message import BadHeaderError from django.test import TestCase from django.utils.translation import ugettext_lazy +from django.utils.functional import wraps + + +def alter_django_settings(**kwargs): + oldvalues = {} + nonexistant = [] + for setting, newvalue in kwargs.iteritems(): + try: + oldvalues[setting] = getattr(settings, setting) + except AttributeError: + nonexistant.append(setting) + setattr(settings, setting, newvalue) + return oldvalues, nonexistant + + +def restore_django_settings(state): + oldvalues, nonexistant = state + for setting, oldvalue in oldvalues.iteritems(): + setattr(settings, setting, oldvalue) + for setting in nonexistant: + delattr(settings, setting) + + +def with_django_settings(**kwargs): + def decorator(test): + @wraps(test) + def decorated_test(self): + state = alter_django_settings(**kwargs) + try: + return test(self) + finally: + restore_django_settings(state) + return decorated_test + return decorator + class MailTests(TestCase): + """ + Non-backend specific tests. + """ def test_ascii(self): email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com']) @@ -26,7 +67,7 @@ class MailTests(TestCase): self.assertEqual(message['To'], 'to@example.com') def test_multiple_recipients(self): - email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com','other@example.com']) + email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com']) message = email.message() self.assertEqual(message['Subject'].encode(), 'Subject') self.assertEqual(message.get_payload(), 'Content') @@ -40,14 +81,6 @@ class MailTests(TestCase): self.assertEqual(message['Cc'], 'cc@example.com') self.assertEqual(email.recipients(), ['to@example.com', 'cc@example.com']) - # Verify headers - old_stdout = sys.stdout - sys.stdout = StringIO() - connection = console.EmailBackend() - connection.send_messages([email]) - self.assertTrue(sys.stdout.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nCc: cc@example.com\nDate: ')) - sys.stdout = old_stdout - # Test multiple CC with multiple To email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com'], cc=['cc@example.com', 'cc.other@example.com']) message = email.message() @@ -83,33 +116,6 @@ class MailTests(TestCase): email = EmailMessage('subject', 'content', 'from@example.com', ['to@example.com'], headers=headers) self.assertEqual(email.message().as_string(), 'Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: subject\nFrom: from@example.com\nTo: to@example.com\ndate: Fri, 09 Nov 2001 01:08:47 -0000\nMessage-ID: foo\n\ncontent') - def test_empty_admins(self): - """ - Test that mail_admins/mail_managers doesn't connect to the mail server - if there are no recipients (#9383) - """ - old_admins = settings.ADMINS - old_managers = settings.MANAGERS - - settings.ADMINS = settings.MANAGERS = [('nobody','nobody@example.com')] - mail.outbox = [] - mail_admins('hi', 'there') - self.assertEqual(len(mail.outbox), 1) - mail.outbox = [] - mail_managers('hi', 'there') - self.assertEqual(len(mail.outbox), 1) - - settings.ADMINS = settings.MANAGERS = [] - mail.outbox = [] - mail_admins('hi', 'there') - self.assertEqual(len(mail.outbox), 0) - mail.outbox = [] - mail_managers('hi', 'there') - self.assertEqual(len(mail.outbox), 0) - - settings.ADMINS = old_admins - settings.MANAGERS = old_managers - def test_from_header(self): """ Make sure we can manually set the From header (#9214) @@ -129,17 +135,26 @@ class MailTests(TestCase): message = email.message() self.assertEqual(message['From'], 'from@example.com') - def test_unicode_header(self): + def test_unicode_address_header(self): """ Regression for #11144 - When a to/from/cc header contains unicode, make sure the email addresses are parsed correctly (especially with regards to commas) """ - email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Firstname Sürname" ','other@example.com']) + email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Firstname Sürname" ', 'other@example.com']) self.assertEqual(email.message()['To'], '=?utf-8?q?Firstname_S=C3=BCrname?= , other@example.com') - email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Sürname, Firstname" ','other@example.com']) + email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Sürname, Firstname" ', 'other@example.com']) self.assertEqual(email.message()['To'], '=?utf-8?q?S=C3=BCrname=2C_Firstname?= , other@example.com') + def test_unicode_headers(self): + email = EmailMessage(u"Gżegżółka", "Content", "from@example.com", ["to@example.com"], + headers={"Sender": '"Firstname Sürname" ', + "Comments": 'My Sürname is non-ASCII'}) + message = email.message() + self.assertEqual(message['Subject'], '=?utf-8?b?R8W8ZWfFvMOzxYJrYQ==?=') + self.assertEqual(message['Sender'], '=?utf-8?q?Firstname_S=C3=BCrname?= ') + self.assertEqual(message['Comments'], '=?utf-8?q?My_S=C3=BCrname_is_non-ASCII?=') + def test_safe_mime_multipart(self): """ Make sure headers can be set with a different encoding than utf-8 in @@ -193,26 +208,7 @@ class MailTests(TestCase): self.assertEqual(payload[0].get_content_type(), 'multipart/alternative') self.assertEqual(payload[1].get_content_type(), 'application/pdf') - def test_arbitrary_stream(self): - """ - Test that the console backend can be pointed at an arbitrary stream. - """ - s = StringIO() - connection = mail.get_connection('django.core.mail.backends.console.EmailBackend', stream=s) - send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection) - self.assertTrue(s.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: ')) - - def test_stdout(self): - """Make sure that the console backend writes to stdout by default""" - old_stdout = sys.stdout - sys.stdout = StringIO() - connection = console.EmailBackend() - email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) - connection.send_messages([email]) - self.assertTrue(sys.stdout.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: ')) - sys.stdout = old_stdout - - def test_dummy(self): + def test_dummy_backend(self): """ Make sure that dummy backends returns correct number of sent messages """ @@ -220,52 +216,6 @@ class MailTests(TestCase): email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) self.assertEqual(connection.send_messages([email, email, email]), 3) - def test_locmem(self): - """ - Make sure that the locmen backend populates the outbox. - """ - mail.outbox = [] - connection = locmem.EmailBackend() - email1 = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) - email2 = EmailMessage('Subject 2', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) - connection.send_messages([email1, email2]) - self.assertEqual(len(mail.outbox), 2) - self.assertEqual(mail.outbox[0].subject, 'Subject') - self.assertEqual(mail.outbox[1].subject, 'Subject 2') - - # Make sure that multiple locmem connections share mail.outbox - mail.outbox = [] - connection2 = locmem.EmailBackend() - email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) - connection.send_messages([email]) - connection2.send_messages([email]) - self.assertEqual(len(mail.outbox), 2) - - def test_file_backend(self): - tmp_dir = tempfile.mkdtemp() - connection = filebased.EmailBackend(file_path=tmp_dir) - email1 = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) - connection.send_messages([email1]) - self.assertEqual(len(os.listdir(tmp_dir)), 1) - message = email.message_from_file(open(os.path.join(tmp_dir, os.listdir(tmp_dir)[0]))) - self.assertEqual(message.get_content_type(), 'text/plain') - self.assertEqual(message.get('subject'), 'Subject') - self.assertEqual(message.get('from'), 'from@example.com') - self.assertEqual(message.get('to'), 'to@example.com') - connection2 = filebased.EmailBackend(file_path=tmp_dir) - connection2.send_messages([email1]) - self.assertEqual(len(os.listdir(tmp_dir)), 2) - connection.send_messages([email1]) - self.assertEqual(len(os.listdir(tmp_dir)), 2) - email1.connection = filebased.EmailBackend(file_path=tmp_dir) - connection_created = connection.open() - email1.send() - self.assertEqual(len(os.listdir(tmp_dir)), 3) - email1.send() - self.assertEqual(len(os.listdir(tmp_dir)), 3) - connection.close() - shutil.rmtree(tmp_dir) - def test_arbitrary_keyword(self): """ Make sure that get_connection() accepts arbitrary keyword that might be @@ -289,144 +239,392 @@ class MailTests(TestCase): self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.dummy.EmailBackend'), dummy.EmailBackend)) self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.console.EmailBackend'), console.EmailBackend)) tmp_dir = tempfile.mkdtemp() - self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.filebased.EmailBackend', file_path=tmp_dir), filebased.EmailBackend)) - shutil.rmtree(tmp_dir) + try: + self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.filebased.EmailBackend', file_path=tmp_dir), filebased.EmailBackend)) + finally: + shutil.rmtree(tmp_dir) self.assertTrue(isinstance(mail.get_connection(), locmem.EmailBackend)) + @with_django_settings( + EMAIL_BACKEND='django.core.mail.backends.locmem.EmailBackend', + ADMINS=[('nobody', 'nobody@example.com')], + MANAGERS=[('nobody', 'nobody@example.com')]) def test_connection_arg(self): """Test connection argument to send_mail(), et. al.""" - connection = mail.get_connection('django.core.mail.backends.locmem.EmailBackend') - mail.outbox = [] + + # Send using non-default connection + connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection) - self.assertEqual(len(mail.outbox), 1) - message = mail.outbox[0] - self.assertEqual(message.subject, 'Subject') - self.assertEqual(message.from_email, 'from@example.com') - self.assertEqual(message.to, ['to@example.com']) + self.assertEqual(mail.outbox, []) + self.assertEqual(len(connection.test_outbox), 1) + self.assertEqual(connection.test_outbox[0].subject, 'Subject') - mail.outbox = [] + connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') send_mass_mail([ ('Subject1', 'Content1', 'from1@example.com', ['to1@example.com']), - ('Subject2', 'Content2', 'from2@example.com', ['to2@example.com']) + ('Subject2', 'Content2', 'from2@example.com', ['to2@example.com']), ], connection=connection) - self.assertEqual(len(mail.outbox), 2) - message = mail.outbox[0] - self.assertEqual(message.subject, 'Subject1') - self.assertEqual(message.from_email, 'from1@example.com') - self.assertEqual(message.to, ['to1@example.com']) - message = mail.outbox[1] - self.assertEqual(message.subject, 'Subject2') - self.assertEqual(message.from_email, 'from2@example.com') - self.assertEqual(message.to, ['to2@example.com']) + self.assertEqual(mail.outbox, []) + self.assertEqual(len(connection.test_outbox), 2) + self.assertEqual(connection.test_outbox[0].subject, 'Subject1') + self.assertEqual(connection.test_outbox[1].subject, 'Subject2') - old_admins = settings.ADMINS - old_managers = settings.MANAGERS - settings.ADMINS = settings.MANAGERS = [('nobody','nobody@example.com')] + connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') + mail_admins('Admin message', 'Content', connection=connection) + self.assertEqual(mail.outbox, []) + self.assertEqual(len(connection.test_outbox), 1) + self.assertEqual(connection.test_outbox[0].subject, '[Django] Admin message') - mail.outbox = [] - mail_admins('Subject', 'Content', connection=connection) - self.assertEqual(len(mail.outbox), 1) - message = mail.outbox[0] - self.assertEqual(message.subject, '[Django] Subject') - self.assertEqual(message.from_email, 'root@localhost') - self.assertEqual(message.to, ['nobody@example.com']) + connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') + mail_managers('Manager message', 'Content', connection=connection) + self.assertEqual(mail.outbox, []) + self.assertEqual(len(connection.test_outbox), 1) + self.assertEqual(connection.test_outbox[0].subject, '[Django] Manager message') - mail.outbox = [] - mail_managers('Subject', 'Content', connection=connection) - self.assertEqual(len(mail.outbox), 1) - message = mail.outbox[0] - self.assertEqual(message.subject, '[Django] Subject') - self.assertEqual(message.from_email, 'root@localhost') - self.assertEqual(message.to, ['nobody@example.com']) - settings.ADMINS = old_admins - settings.MANAGERS = old_managers +class BaseEmailBackendTests(object): + email_backend = None - def test_mail_prefix(self): - """Test prefix argument in manager/admin mail.""" - # Regression for #13494. - old_admins = settings.ADMINS - old_managers = settings.MANAGERS - settings.ADMINS = settings.MANAGERS = [('nobody','nobody@example.com')] + def setUp(self): + self.__settings_state = alter_django_settings(EMAIL_BACKEND=self.email_backend) - mail_managers(ugettext_lazy('Subject'), 'Content') - self.assertEqual(len(mail.outbox), 1) - message = mail.outbox[0] - self.assertEqual(message.subject, '[Django] Subject') + def tearDown(self): + restore_django_settings(self.__settings_state) - mail.outbox = [] - mail_admins(ugettext_lazy('Subject'), 'Content') - self.assertEqual(len(mail.outbox), 1) - message = mail.outbox[0] - self.assertEqual(message.subject, '[Django] Subject') + def assertStartsWith(self, first, second): + if not first.startswith(second): + self.longMessage = True + self.assertEqual(first[:len(second)], second, "First string doesn't start with the second.") - settings.ADMINS = old_admins - settings.MANAGERS = old_managers + def get_mailbox_content(self): + raise NotImplementedError - def test_html_mail_admins(self): - """Test html_message argument to mail_admins and mail_managers""" - old_admins = settings.ADMINS - settings.ADMINS = [('nobody','nobody@example.com')] + def flush_mailbox(self): + raise NotImplementedError - mail.outbox = [] - mail_admins('Subject', 'Content', html_message='HTML Content') - self.assertEqual(len(mail.outbox), 1) - message = mail.outbox[0] - self.assertEqual(message.subject, '[Django] Subject') - self.assertEqual(message.body, 'Content') - self.assertEqual(message.alternatives, [('HTML Content', 'text/html')]) + def get_the_message(self): + mailbox = self.get_mailbox_content() + self.assertEqual(len(mailbox), 1, + "Expected exactly one message, got %d.\n%r" % (len(mailbox), [ + m.as_string() for m in mailbox])) + return mailbox[0] - settings.ADMINS = old_admins + def test_send(self): + email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com']) + num_sent = mail.get_connection().send_messages([email]) + self.assertEqual(num_sent, 1) + message = self.get_the_message() + self.assertEqual(message["subject"], "Subject") + self.assertEqual(message.get_payload(), "Content") + self.assertEqual(message["from"], "from@example.com") + self.assertEqual(message.get_all("to"), ["to@example.com"]) + def test_send_many(self): + email1 = EmailMessage('Subject', 'Content1', 'from@example.com', ['to@example.com']) + email2 = EmailMessage('Subject', 'Content2', 'from@example.com', ['to@example.com']) + num_sent = mail.get_connection().send_messages([email1, email2]) + self.assertEqual(num_sent, 2) + messages = self.get_mailbox_content() + self.assertEquals(len(messages), 2) + self.assertEqual(messages[0].get_payload(), "Content1") + self.assertEqual(messages[1].get_payload(), "Content2") + + def test_send_verbose_name(self): + email = EmailMessage("Subject", "Content", '"Firstname Sürname" ', + ["to@example.com"]) + email.send() + message = self.get_the_message() + self.assertEqual(message["subject"], "Subject") + self.assertEqual(message.get_payload(), "Content") + self.assertEqual(message["from"], "=?utf-8?q?Firstname_S=C3=BCrname?= ") + + @with_django_settings(MANAGERS=[('nobody', 'nobody@example.com')]) def test_html_mail_managers(self): - """Test html_message argument to mail_admins and mail_managers""" - old_managers = settings.MANAGERS - settings.MANAGERS = [('nobody','nobody@example.com')] - - mail.outbox = [] + """Test html_message argument to mail_managers""" mail_managers('Subject', 'Content', html_message='HTML Content') - self.assertEqual(len(mail.outbox), 1) - message = mail.outbox[0] - self.assertEqual(message.subject, '[Django] Subject') - self.assertEqual(message.body, 'Content') - self.assertEqual(message.alternatives, [('HTML Content', 'text/html')]) + message = self.get_the_message() - settings.MANAGERS = old_managers + self.assertEqual(message.get('subject'), '[Django] Subject') + self.assertEqual(message.get_all('to'), ['nobody@example.com']) + self.assertTrue(message.is_multipart()) + self.assertEqual(len(message.get_payload()), 2) + self.assertEqual(message.get_payload(0).get_payload(), 'Content') + self.assertEqual(message.get_payload(0).get_content_type(), 'text/plain') + self.assertEqual(message.get_payload(1).get_payload(), 'HTML Content') + self.assertEqual(message.get_payload(1).get_content_type(), 'text/html') - def test_idn_validation(self): - """Test internationalized email adresses""" - # Regression for #14301. + @with_django_settings(ADMINS=[('nobody', 'nobody@example.com')]) + def test_html_mail_admins(self): + """Test html_message argument to mail_admins """ + mail_admins('Subject', 'Content', html_message='HTML Content') + message = self.get_the_message() + + self.assertEqual(message.get('subject'), '[Django] Subject') + self.assertEqual(message.get_all('to'), ['nobody@example.com']) + self.assertTrue(message.is_multipart()) + self.assertEqual(len(message.get_payload()), 2) + self.assertEqual(message.get_payload(0).get_payload(), 'Content') + self.assertEqual(message.get_payload(0).get_content_type(), 'text/plain') + self.assertEqual(message.get_payload(1).get_payload(), 'HTML Content') + self.assertEqual(message.get_payload(1).get_content_type(), 'text/html') + + @with_django_settings(ADMINS=[('nobody', 'nobody+admin@example.com')], + MANAGERS=[('nobody', 'nobody+manager@example.com')]) + def test_manager_and_admin_mail_prefix(self): + """ + String prefix + lazy translated subject = bad output + Regression for #13494 + """ + mail_managers(ugettext_lazy('Subject'), 'Content') + message = self.get_the_message() + self.assertEqual(message.get('subject'), '[Django] Subject') + + self.flush_mailbox() + mail_admins(ugettext_lazy('Subject'), 'Content') + message = self.get_the_message() + self.assertEqual(message.get('subject'), '[Django] Subject') + + @with_django_settings(ADMINS=(), MANAGERS=()) + def test_empty_admins(self): + """ + Test that mail_admins/mail_managers doesn't connect to the mail server + if there are no recipients (#9383) + """ + mail_admins('hi', 'there') + self.assertEqual(self.get_mailbox_content(), []) + mail_managers('hi', 'there') + self.assertEqual(self.get_mailbox_content(), []) + + def test_message_cc_header(self): + """ + Regression test for #7722 + """ + email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'], cc=['cc@example.com']) + mail.get_connection().send_messages([email]) + message = self.get_the_message() + self.assertStartsWith(message.as_string(), 'Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nCc: cc@example.com\nDate: ') + + def test_idn_send(self): + """ + Regression test for #14301 + """ + self.assertTrue(send_mail('Subject', 'Content', 'from@öäü.com', [u'to@öäü.com'])) + message = self.get_the_message() + self.assertEqual(message.get('subject'), 'Subject') + self.assertEqual(message.get('from'), 'from@xn--4ca9at.com') + self.assertEqual(message.get('to'), 'to@xn--4ca9at.com') + + self.flush_mailbox() + m = EmailMessage('Subject', 'Content', 'from@öäü.com', + [u'to@öäü.com'], cc=[u'cc@öäü.com']) + m.send() + message = self.get_the_message() + self.assertEqual(message.get('subject'), 'Subject') + self.assertEqual(message.get('from'), 'from@xn--4ca9at.com') + self.assertEqual(message.get('to'), 'to@xn--4ca9at.com') + self.assertEqual(message.get('cc'), 'cc@xn--4ca9at.com') + + def test_recipient_without_domain(self): + """ + Regression test for #15042 + """ + self.assertTrue(send_mail("Subject", "Content", "tester", ["django"])) + message = self.get_the_message() + self.assertEqual(message.get('subject'), 'Subject') + self.assertEqual(message.get('from'), "tester") + self.assertEqual(message.get('to'), "django") + + +class LocmemBackendTests(BaseEmailBackendTests, TestCase): + email_backend = 'django.core.mail.backends.locmem.EmailBackend' + + def get_mailbox_content(self): + return [m.message() for m in mail.outbox] + + def flush_mailbox(self): mail.outbox = [] - from_email = u'fröm@öäü.com' - to_email = u'tö@öäü.com' - connection = mail.get_connection('django.core.mail.backends.locmem.EmailBackend') - send_mail('Subject', 'Content', from_email, [to_email], connection=connection) - self.assertEqual(len(mail.outbox), 1) - message = mail.outbox[0] - self.assertEqual(message.subject, 'Subject') - self.assertEqual(message.from_email, from_email) - self.assertEqual(message.to, [to_email]) - self.assertTrue(message.message().as_string().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: =?utf-8?b?ZnLDtm1Aw7bDpMO8LmNvbQ==?=\nTo: =?utf-8?b?dMO2QMO2w6TDvC5jb20=?=')) - def test_idn_smtp_send(self): - import smtplib - smtplib.SMTP = MockSMTP - from_email = u'fröm@öäü.com' - to_email = u'tö@öäü.com' - connection = mail.get_connection('django.core.mail.backends.smtp.EmailBackend') - self.assertTrue(send_mail('Subject', 'Content', from_email, [to_email], connection=connection)) + def tearDown(self): + super(LocmemBackendTests, self).tearDown() + mail.outbox = [] -class MockSMTP(object): - def __init__(self, host='', port=0, local_hostname=None, - timeout=1): - pass + def test_locmem_shared_messages(self): + """ + Make sure that the locmen backend populates the outbox. + """ + connection = locmem.EmailBackend() + connection2 = locmem.EmailBackend() + email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) + connection.send_messages([email]) + connection2.send_messages([email]) + self.assertEqual(len(mail.outbox), 2) - def sendmail(self, from_addr, to_addrs, msg, mail_options=[], - rcpt_options=[]): - for addr in to_addrs: - str(addr.split('@', 1)[-1]) - return {} - def quit(self): - return 0 +class FileBackendTests(BaseEmailBackendTests, TestCase): + email_backend = 'django.core.mail.backends.filebased.EmailBackend' + + def setUp(self): + super(FileBackendTests, self).setUp() + self.tmp_dir = tempfile.mkdtemp() + self.__settings_state = alter_django_settings(EMAIL_FILE_PATH=self.tmp_dir) + + def tearDown(self): + restore_django_settings(self.__settings_state) + shutil.rmtree(self.tmp_dir) + super(FileBackendTests, self).tearDown() + + def flush_mailbox(self): + for filename in os.listdir(self.tmp_dir): + os.unlink(os.path.join(self.tmp_dir, filename)) + + def get_mailbox_content(self): + messages = [] + for filename in os.listdir(self.tmp_dir): + session = open(os.path.join(self.tmp_dir, filename)).read().split('\n' + ('-' * 79) + '\n') + messages.extend(email.message_from_string(m) for m in session if m) + return messages + + def test_file_sessions(self): + """Make sure opening a connection creates a new file""" + msg = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) + connection = mail.get_connection() + connection.send_messages([msg]) + + self.assertEqual(len(os.listdir(self.tmp_dir)), 1) + message = email.message_from_file(open(os.path.join(self.tmp_dir, os.listdir(self.tmp_dir)[0]))) + self.assertEqual(message.get_content_type(), 'text/plain') + self.assertEqual(message.get('subject'), 'Subject') + self.assertEqual(message.get('from'), 'from@example.com') + self.assertEqual(message.get('to'), 'to@example.com') + + connection2 = mail.get_connection() + connection2.send_messages([msg]) + self.assertEqual(len(os.listdir(self.tmp_dir)), 2) + + connection.send_messages([msg]) + self.assertEqual(len(os.listdir(self.tmp_dir)), 2) + + msg.connection = mail.get_connection() + self.assertTrue(connection.open()) + msg.send() + self.assertEqual(len(os.listdir(self.tmp_dir)), 3) + msg.send() + self.assertEqual(len(os.listdir(self.tmp_dir)), 3) + + +class ConsoleBackendTests(BaseEmailBackendTests, TestCase): + email_backend = 'django.core.mail.backends.console.EmailBackend' + + def setUp(self): + super(ConsoleBackendTests, self).setUp() + self.__stdout = sys.stdout + self.stream = sys.stdout = StringIO() + + def tearDown(self): + del self.stream + sys.stdout = self.__stdout + del self.__stdout + super(ConsoleBackendTests, self).tearDown() + + def flush_mailbox(self): + self.stream = sys.stdout = StringIO() + + def get_mailbox_content(self): + messages = self.stream.getvalue().split('\n' + ('-' * 79) + '\n') + return [email.message_from_string(m) for m in messages if m] + + def test_console_stream_kwarg(self): + """ + Test that the console backend can be pointed at an arbitrary stream. + """ + s = StringIO() + connection = mail.get_connection('django.core.mail.backends.console.EmailBackend', stream=s) + send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection) + self.assertTrue(s.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: ')) + + +class FakeSMTPServer(smtpd.SMTPServer, threading.Thread): + """ + Asyncore SMTP server wrapped into a thread. Based on DummyFTPServer from: + http://svn.python.org/view/python/branches/py3k/Lib/test/test_ftplib.py?revision=86061&view=markup + """ + + def __init__(self, *args, **kwargs): + threading.Thread.__init__(self) + smtpd.SMTPServer.__init__(self, *args, **kwargs) + self._sink = [] + self.active = False + self.active_lock = threading.Lock() + self.sink_lock = threading.Lock() + + def process_message(self, peer, mailfrom, rcpttos, data): + m = email.message_from_string(data) + maddr = email.Utils.parseaddr(m.get('from'))[1] + if mailfrom != maddr: + return "553 '%s' != '%s'" % (mailfrom, maddr) + self.sink_lock.acquire() + self._sink.append(m) + self.sink_lock.release() + + def get_sink(self): + self.sink_lock.acquire() + try: + return self._sink[:] + finally: + self.sink_lock.release() + + def flush_sink(self): + self.sink_lock.acquire() + self._sink[:] = [] + self.sink_lock.release() + + def start(self): + assert not self.active + self.__flag = threading.Event() + threading.Thread.start(self) + self.__flag.wait() + + def run(self): + self.active = True + self.__flag.set() + while self.active and asyncore.socket_map: + self.active_lock.acquire() + asyncore.loop(timeout=0.1, count=1) + self.active_lock.release() + asyncore.close_all() + + def stop(self): + assert self.active + self.active = False + self.join() + + +class SMTPBackendTests(BaseEmailBackendTests, TestCase): + email_backend = 'django.core.mail.backends.smtp.EmailBackend' + + @classmethod + def setUpClass(cls): + cls.server = FakeSMTPServer(('127.0.0.1', 0), None) + cls.settings = alter_django_settings( + EMAIL_HOST="127.0.0.1", + EMAIL_PORT=cls.server.socket.getsockname()[1]) + cls.server.start() + + @classmethod + def tearDownClass(cls): + cls.server.stop() + + def setUp(self): + super(SMTPBackendTests, self).setUp() + self.server.flush_sink() + + def tearDown(self): + self.server.flush_sink() + super(SMTPBackendTests, self).tearDown() + + def flush_mailbox(self): + self.server.flush_sink() + + def get_mailbox_content(self): + return self.server.get_sink()