Fixed #15042 -- Ensured that email addresses without a domain can still be mail recipients. Patch also improves the IDN handling introduced by r15006, and refactors the test suite to ensure even feature coverage. Thanks to net147 for the report, and to Łukasz Rekucki for the awesome patch.

git-svn-id: http://code.djangoproject.com/svn/django/trunk@15211 bcc190cf-cafb-0310-a4f2-bffc1f526a37
This commit is contained in:
Russell Keith-Magee 2011-01-15 05:55:24 +00:00
parent 0d70d29227
commit 11997218ee
3 changed files with 468 additions and 242 deletions

View File

@ -1,5 +1,4 @@
"""SMTP email backend class.""" """SMTP email backend class."""
import smtplib import smtplib
import socket import socket
import threading import threading
@ -7,6 +6,8 @@ import threading
from django.conf import settings from django.conf import settings
from django.core.mail.backends.base import BaseEmailBackend from django.core.mail.backends.base import BaseEmailBackend
from django.core.mail.utils import DNS_NAME from django.core.mail.utils import DNS_NAME
from django.core.mail.message import sanitize_address
class EmailBackend(BaseEmailBackend): class EmailBackend(BaseEmailBackend):
""" """
@ -91,17 +92,13 @@ class EmailBackend(BaseEmailBackend):
self._lock.release() self._lock.release()
return num_sent return num_sent
def _sanitize(self, email):
name, domain = email.split('@', 1)
email = '@'.join([name, domain.encode('idna')])
return email
def _send(self, email_message): def _send(self, email_message):
"""A helper method that does the actual sending.""" """A helper method that does the actual sending."""
if not email_message.recipients(): if not email_message.recipients():
return False return False
from_email = self._sanitize(email_message.from_email) from_email = sanitize_address(email_message.from_email, email_message.encoding)
recipients = map(self._sanitize, email_message.recipients()) recipients = [sanitize_address(addr, email_message.encoding)
for addr in email_message.recipients()]
try: try:
self.connection.sendmail(from_email, recipients, self.connection.sendmail(from_email, recipients,
email_message.message().as_string()) email_message.message().as_string())

View File

@ -12,6 +12,7 @@ from email.Utils import formatdate, getaddresses, formataddr
from django.conf import settings from django.conf import settings
from django.core.mail.utils import DNS_NAME from django.core.mail.utils import DNS_NAME
from django.utils.encoding import smart_str, force_unicode from django.utils.encoding import smart_str, force_unicode
from email.Utils import parseaddr
# Don't BASE64-encode UTF-8 messages so that we avoid unwanted attention from # Don't BASE64-encode UTF-8 messages so that we avoid unwanted attention from
# some spam filters. # some spam filters.
@ -54,6 +55,22 @@ def make_msgid(idstring=None):
return msgid return msgid
# Header names that contain structured address data (RFC #5322)
ADDRESS_HEADERS = set([
'from',
'sender',
'reply-to',
'to',
'cc',
'bcc',
'resent-from',
'resent-sender',
'resent-to',
'resent-cc',
'resent-bcc',
])
def forbid_multi_line_headers(name, val, encoding): def forbid_multi_line_headers(name, val, encoding):
"""Forbids multi-line headers, to prevent header injection.""" """Forbids multi-line headers, to prevent header injection."""
encoding = encoding or settings.DEFAULT_CHARSET encoding = encoding or settings.DEFAULT_CHARSET
@ -63,23 +80,35 @@ def forbid_multi_line_headers(name, val, encoding):
try: try:
val = val.encode('ascii') val = val.encode('ascii')
except UnicodeEncodeError: except UnicodeEncodeError:
if name.lower() in ('to', 'from', 'cc'): if name.lower() in ADDRESS_HEADERS:
result = [] val = ', '.join(sanitize_address(addr, encoding)
for nm, addr in getaddresses((val,)): for addr in getaddresses((val,)))
nm = str(Header(nm.encode(encoding), encoding))
try:
addr = addr.encode('ascii')
except UnicodeEncodeError: # IDN
addr = str(Header(addr.encode(encoding), encoding))
result.append(formataddr((nm, addr)))
val = ', '.join(result)
else: else:
val = Header(val.encode(encoding), encoding) val = str(Header(val, encoding))
else: else:
if name.lower() == 'subject': if name.lower() == 'subject':
val = Header(val) val = Header(val)
return name, val return name, val
def sanitize_address(addr, encoding):
if isinstance(addr, basestring):
addr = parseaddr(force_unicode(addr))
nm, addr = addr
nm = str(Header(nm, encoding))
try:
addr = addr.encode('ascii')
except UnicodeEncodeError: # IDN
if u'@' in addr:
localpart, domain = addr.split(u'@', 1)
localpart = str(Header(localpart, encoding))
domain = domain.encode('idna')
addr = '@'.join([localpart, domain])
else:
addr = str(Header(addr, encoding))
return formataddr((nm, addr))
class SafeMIMEText(MIMEText): class SafeMIMEText(MIMEText):
def __init__(self, text, subtype, charset): def __init__(self, text, subtype, charset):
@ -90,6 +119,7 @@ class SafeMIMEText(MIMEText):
name, val = forbid_multi_line_headers(name, val, self.encoding) name, val = forbid_multi_line_headers(name, val, self.encoding)
MIMEText.__setitem__(self, name, val) MIMEText.__setitem__(self, name, val)
class SafeMIMEMultipart(MIMEMultipart): class SafeMIMEMultipart(MIMEMultipart):
def __init__(self, _subtype='mixed', boundary=None, _subparts=None, encoding=None, **_params): def __init__(self, _subtype='mixed', boundary=None, _subparts=None, encoding=None, **_params):
@ -100,6 +130,7 @@ class SafeMIMEMultipart(MIMEMultipart):
name, val = forbid_multi_line_headers(name, val, self.encoding) name, val = forbid_multi_line_headers(name, val, self.encoding)
MIMEMultipart.__setitem__(self, name, val) MIMEMultipart.__setitem__(self, name, val)
class EmailMessage(object): class EmailMessage(object):
""" """
A container for email information. A container for email information.
@ -274,7 +305,7 @@ class EmailMultiAlternatives(EmailMessage):
conversions. conversions.
""" """
super(EmailMultiAlternatives, self).__init__(subject, body, from_email, to, bcc, connection, attachments, headers, cc) super(EmailMultiAlternatives, self).__init__(subject, body, from_email, to, bcc, connection, attachments, headers, cc)
self.alternatives=alternatives or [] self.alternatives = alternatives or []
def attach_alternative(self, content, mimetype): def attach_alternative(self, content, mimetype):
"""Attach an alternative content representation.""" """Attach an alternative content representation."""

View File

@ -1,21 +1,62 @@
# coding: utf-8 # coding: utf-8
import asyncore
import email import email
import os import os
import shutil import shutil
import smtpd
import sys import sys
import tempfile
from StringIO import StringIO from StringIO import StringIO
import tempfile
import threading
from django.conf import settings from django.conf import settings
from django.core import mail from django.core import mail
from django.core.mail import EmailMessage, mail_admins, mail_managers, EmailMultiAlternatives from django.core.mail import EmailMessage, mail_admins, mail_managers, EmailMultiAlternatives
from django.core.mail import send_mail, send_mass_mail from django.core.mail import send_mail, send_mass_mail
from django.core.mail.backends.base import BaseEmailBackend
from django.core.mail.backends import console, dummy, locmem, filebased, smtp from django.core.mail.backends import console, dummy, locmem, filebased, smtp
from django.core.mail.message import BadHeaderError from django.core.mail.message import BadHeaderError
from django.test import TestCase from django.test import TestCase
from django.utils.translation import ugettext_lazy from django.utils.translation import ugettext_lazy
from django.utils.functional import wraps
def alter_django_settings(**kwargs):
oldvalues = {}
nonexistant = []
for setting, newvalue in kwargs.iteritems():
try:
oldvalues[setting] = getattr(settings, setting)
except AttributeError:
nonexistant.append(setting)
setattr(settings, setting, newvalue)
return oldvalues, nonexistant
def restore_django_settings(state):
oldvalues, nonexistant = state
for setting, oldvalue in oldvalues.iteritems():
setattr(settings, setting, oldvalue)
for setting in nonexistant:
delattr(settings, setting)
def with_django_settings(**kwargs):
def decorator(test):
@wraps(test)
def decorated_test(self):
state = alter_django_settings(**kwargs)
try:
return test(self)
finally:
restore_django_settings(state)
return decorated_test
return decorator
class MailTests(TestCase): class MailTests(TestCase):
"""
Non-backend specific tests.
"""
def test_ascii(self): def test_ascii(self):
email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com']) email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'])
@ -26,7 +67,7 @@ class MailTests(TestCase):
self.assertEqual(message['To'], 'to@example.com') self.assertEqual(message['To'], 'to@example.com')
def test_multiple_recipients(self): def test_multiple_recipients(self):
email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com','other@example.com']) email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com'])
message = email.message() message = email.message()
self.assertEqual(message['Subject'].encode(), 'Subject') self.assertEqual(message['Subject'].encode(), 'Subject')
self.assertEqual(message.get_payload(), 'Content') self.assertEqual(message.get_payload(), 'Content')
@ -40,14 +81,6 @@ class MailTests(TestCase):
self.assertEqual(message['Cc'], 'cc@example.com') self.assertEqual(message['Cc'], 'cc@example.com')
self.assertEqual(email.recipients(), ['to@example.com', 'cc@example.com']) self.assertEqual(email.recipients(), ['to@example.com', 'cc@example.com'])
# Verify headers
old_stdout = sys.stdout
sys.stdout = StringIO()
connection = console.EmailBackend()
connection.send_messages([email])
self.assertTrue(sys.stdout.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nCc: cc@example.com\nDate: '))
sys.stdout = old_stdout
# Test multiple CC with multiple To # Test multiple CC with multiple To
email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com'], cc=['cc@example.com', 'cc.other@example.com']) email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com'], cc=['cc@example.com', 'cc.other@example.com'])
message = email.message() message = email.message()
@ -83,33 +116,6 @@ class MailTests(TestCase):
email = EmailMessage('subject', 'content', 'from@example.com', ['to@example.com'], headers=headers) email = EmailMessage('subject', 'content', 'from@example.com', ['to@example.com'], headers=headers)
self.assertEqual(email.message().as_string(), 'Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: subject\nFrom: from@example.com\nTo: to@example.com\ndate: Fri, 09 Nov 2001 01:08:47 -0000\nMessage-ID: foo\n\ncontent') self.assertEqual(email.message().as_string(), 'Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: subject\nFrom: from@example.com\nTo: to@example.com\ndate: Fri, 09 Nov 2001 01:08:47 -0000\nMessage-ID: foo\n\ncontent')
def test_empty_admins(self):
"""
Test that mail_admins/mail_managers doesn't connect to the mail server
if there are no recipients (#9383)
"""
old_admins = settings.ADMINS
old_managers = settings.MANAGERS
settings.ADMINS = settings.MANAGERS = [('nobody','nobody@example.com')]
mail.outbox = []
mail_admins('hi', 'there')
self.assertEqual(len(mail.outbox), 1)
mail.outbox = []
mail_managers('hi', 'there')
self.assertEqual(len(mail.outbox), 1)
settings.ADMINS = settings.MANAGERS = []
mail.outbox = []
mail_admins('hi', 'there')
self.assertEqual(len(mail.outbox), 0)
mail.outbox = []
mail_managers('hi', 'there')
self.assertEqual(len(mail.outbox), 0)
settings.ADMINS = old_admins
settings.MANAGERS = old_managers
def test_from_header(self): def test_from_header(self):
""" """
Make sure we can manually set the From header (#9214) Make sure we can manually set the From header (#9214)
@ -129,17 +135,26 @@ class MailTests(TestCase):
message = email.message() message = email.message()
self.assertEqual(message['From'], 'from@example.com') self.assertEqual(message['From'], 'from@example.com')
def test_unicode_header(self): def test_unicode_address_header(self):
""" """
Regression for #11144 - When a to/from/cc header contains unicode, Regression for #11144 - When a to/from/cc header contains unicode,
make sure the email addresses are parsed correctly (especially with make sure the email addresses are parsed correctly (especially with
regards to commas) regards to commas)
""" """
email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Firstname Sürname" <to@example.com>','other@example.com']) email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Firstname Sürname" <to@example.com>', 'other@example.com'])
self.assertEqual(email.message()['To'], '=?utf-8?q?Firstname_S=C3=BCrname?= <to@example.com>, other@example.com') self.assertEqual(email.message()['To'], '=?utf-8?q?Firstname_S=C3=BCrname?= <to@example.com>, other@example.com')
email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Sürname, Firstname" <to@example.com>','other@example.com']) email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Sürname, Firstname" <to@example.com>', 'other@example.com'])
self.assertEqual(email.message()['To'], '=?utf-8?q?S=C3=BCrname=2C_Firstname?= <to@example.com>, other@example.com') self.assertEqual(email.message()['To'], '=?utf-8?q?S=C3=BCrname=2C_Firstname?= <to@example.com>, other@example.com')
def test_unicode_headers(self):
email = EmailMessage(u"Gżegżółka", "Content", "from@example.com", ["to@example.com"],
headers={"Sender": '"Firstname Sürname" <sender@example.com>',
"Comments": 'My Sürname is non-ASCII'})
message = email.message()
self.assertEqual(message['Subject'], '=?utf-8?b?R8W8ZWfFvMOzxYJrYQ==?=')
self.assertEqual(message['Sender'], '=?utf-8?q?Firstname_S=C3=BCrname?= <sender@example.com>')
self.assertEqual(message['Comments'], '=?utf-8?q?My_S=C3=BCrname_is_non-ASCII?=')
def test_safe_mime_multipart(self): def test_safe_mime_multipart(self):
""" """
Make sure headers can be set with a different encoding than utf-8 in Make sure headers can be set with a different encoding than utf-8 in
@ -193,26 +208,7 @@ class MailTests(TestCase):
self.assertEqual(payload[0].get_content_type(), 'multipart/alternative') self.assertEqual(payload[0].get_content_type(), 'multipart/alternative')
self.assertEqual(payload[1].get_content_type(), 'application/pdf') self.assertEqual(payload[1].get_content_type(), 'application/pdf')
def test_arbitrary_stream(self): def test_dummy_backend(self):
"""
Test that the console backend can be pointed at an arbitrary stream.
"""
s = StringIO()
connection = mail.get_connection('django.core.mail.backends.console.EmailBackend', stream=s)
send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection)
self.assertTrue(s.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: '))
def test_stdout(self):
"""Make sure that the console backend writes to stdout by default"""
old_stdout = sys.stdout
sys.stdout = StringIO()
connection = console.EmailBackend()
email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
connection.send_messages([email])
self.assertTrue(sys.stdout.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: '))
sys.stdout = old_stdout
def test_dummy(self):
""" """
Make sure that dummy backends returns correct number of sent messages Make sure that dummy backends returns correct number of sent messages
""" """
@ -220,52 +216,6 @@ class MailTests(TestCase):
email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
self.assertEqual(connection.send_messages([email, email, email]), 3) self.assertEqual(connection.send_messages([email, email, email]), 3)
def test_locmem(self):
"""
Make sure that the locmen backend populates the outbox.
"""
mail.outbox = []
connection = locmem.EmailBackend()
email1 = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
email2 = EmailMessage('Subject 2', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
connection.send_messages([email1, email2])
self.assertEqual(len(mail.outbox), 2)
self.assertEqual(mail.outbox[0].subject, 'Subject')
self.assertEqual(mail.outbox[1].subject, 'Subject 2')
# Make sure that multiple locmem connections share mail.outbox
mail.outbox = []
connection2 = locmem.EmailBackend()
email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
connection.send_messages([email])
connection2.send_messages([email])
self.assertEqual(len(mail.outbox), 2)
def test_file_backend(self):
tmp_dir = tempfile.mkdtemp()
connection = filebased.EmailBackend(file_path=tmp_dir)
email1 = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
connection.send_messages([email1])
self.assertEqual(len(os.listdir(tmp_dir)), 1)
message = email.message_from_file(open(os.path.join(tmp_dir, os.listdir(tmp_dir)[0])))
self.assertEqual(message.get_content_type(), 'text/plain')
self.assertEqual(message.get('subject'), 'Subject')
self.assertEqual(message.get('from'), 'from@example.com')
self.assertEqual(message.get('to'), 'to@example.com')
connection2 = filebased.EmailBackend(file_path=tmp_dir)
connection2.send_messages([email1])
self.assertEqual(len(os.listdir(tmp_dir)), 2)
connection.send_messages([email1])
self.assertEqual(len(os.listdir(tmp_dir)), 2)
email1.connection = filebased.EmailBackend(file_path=tmp_dir)
connection_created = connection.open()
email1.send()
self.assertEqual(len(os.listdir(tmp_dir)), 3)
email1.send()
self.assertEqual(len(os.listdir(tmp_dir)), 3)
connection.close()
shutil.rmtree(tmp_dir)
def test_arbitrary_keyword(self): def test_arbitrary_keyword(self):
""" """
Make sure that get_connection() accepts arbitrary keyword that might be Make sure that get_connection() accepts arbitrary keyword that might be
@ -289,144 +239,392 @@ class MailTests(TestCase):
self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.dummy.EmailBackend'), dummy.EmailBackend)) self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.dummy.EmailBackend'), dummy.EmailBackend))
self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.console.EmailBackend'), console.EmailBackend)) self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.console.EmailBackend'), console.EmailBackend))
tmp_dir = tempfile.mkdtemp() tmp_dir = tempfile.mkdtemp()
try:
self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.filebased.EmailBackend', file_path=tmp_dir), filebased.EmailBackend)) self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.filebased.EmailBackend', file_path=tmp_dir), filebased.EmailBackend))
finally:
shutil.rmtree(tmp_dir) shutil.rmtree(tmp_dir)
self.assertTrue(isinstance(mail.get_connection(), locmem.EmailBackend)) self.assertTrue(isinstance(mail.get_connection(), locmem.EmailBackend))
@with_django_settings(
EMAIL_BACKEND='django.core.mail.backends.locmem.EmailBackend',
ADMINS=[('nobody', 'nobody@example.com')],
MANAGERS=[('nobody', 'nobody@example.com')])
def test_connection_arg(self): def test_connection_arg(self):
"""Test connection argument to send_mail(), et. al.""" """Test connection argument to send_mail(), et. al."""
connection = mail.get_connection('django.core.mail.backends.locmem.EmailBackend')
mail.outbox = [] mail.outbox = []
# Send using non-default connection
connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend')
send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection) send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection)
self.assertEqual(len(mail.outbox), 1) self.assertEqual(mail.outbox, [])
message = mail.outbox[0] self.assertEqual(len(connection.test_outbox), 1)
self.assertEqual(message.subject, 'Subject') self.assertEqual(connection.test_outbox[0].subject, 'Subject')
self.assertEqual(message.from_email, 'from@example.com')
self.assertEqual(message.to, ['to@example.com'])
mail.outbox = [] connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend')
send_mass_mail([ send_mass_mail([
('Subject1', 'Content1', 'from1@example.com', ['to1@example.com']), ('Subject1', 'Content1', 'from1@example.com', ['to1@example.com']),
('Subject2', 'Content2', 'from2@example.com', ['to2@example.com']) ('Subject2', 'Content2', 'from2@example.com', ['to2@example.com']),
], connection=connection) ], connection=connection)
self.assertEqual(len(mail.outbox), 2) self.assertEqual(mail.outbox, [])
message = mail.outbox[0] self.assertEqual(len(connection.test_outbox), 2)
self.assertEqual(message.subject, 'Subject1') self.assertEqual(connection.test_outbox[0].subject, 'Subject1')
self.assertEqual(message.from_email, 'from1@example.com') self.assertEqual(connection.test_outbox[1].subject, 'Subject2')
self.assertEqual(message.to, ['to1@example.com'])
message = mail.outbox[1]
self.assertEqual(message.subject, 'Subject2')
self.assertEqual(message.from_email, 'from2@example.com')
self.assertEqual(message.to, ['to2@example.com'])
old_admins = settings.ADMINS connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend')
old_managers = settings.MANAGERS mail_admins('Admin message', 'Content', connection=connection)
settings.ADMINS = settings.MANAGERS = [('nobody','nobody@example.com')] self.assertEqual(mail.outbox, [])
self.assertEqual(len(connection.test_outbox), 1)
self.assertEqual(connection.test_outbox[0].subject, '[Django] Admin message')
mail.outbox = [] connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend')
mail_admins('Subject', 'Content', connection=connection) mail_managers('Manager message', 'Content', connection=connection)
self.assertEqual(len(mail.outbox), 1) self.assertEqual(mail.outbox, [])
message = mail.outbox[0] self.assertEqual(len(connection.test_outbox), 1)
self.assertEqual(message.subject, '[Django] Subject') self.assertEqual(connection.test_outbox[0].subject, '[Django] Manager message')
self.assertEqual(message.from_email, 'root@localhost')
self.assertEqual(message.to, ['nobody@example.com'])
mail.outbox = []
mail_managers('Subject', 'Content', connection=connection)
self.assertEqual(len(mail.outbox), 1)
message = mail.outbox[0]
self.assertEqual(message.subject, '[Django] Subject')
self.assertEqual(message.from_email, 'root@localhost')
self.assertEqual(message.to, ['nobody@example.com'])
settings.ADMINS = old_admins class BaseEmailBackendTests(object):
settings.MANAGERS = old_managers email_backend = None
def test_mail_prefix(self): def setUp(self):
"""Test prefix argument in manager/admin mail.""" self.__settings_state = alter_django_settings(EMAIL_BACKEND=self.email_backend)
# Regression for #13494.
old_admins = settings.ADMINS
old_managers = settings.MANAGERS
settings.ADMINS = settings.MANAGERS = [('nobody','nobody@example.com')]
mail_managers(ugettext_lazy('Subject'), 'Content') def tearDown(self):
self.assertEqual(len(mail.outbox), 1) restore_django_settings(self.__settings_state)
message = mail.outbox[0]
self.assertEqual(message.subject, '[Django] Subject')
mail.outbox = [] def assertStartsWith(self, first, second):
mail_admins(ugettext_lazy('Subject'), 'Content') if not first.startswith(second):
self.assertEqual(len(mail.outbox), 1) self.longMessage = True
message = mail.outbox[0] self.assertEqual(first[:len(second)], second, "First string doesn't start with the second.")
self.assertEqual(message.subject, '[Django] Subject')
settings.ADMINS = old_admins def get_mailbox_content(self):
settings.MANAGERS = old_managers raise NotImplementedError
def test_html_mail_admins(self): def flush_mailbox(self):
"""Test html_message argument to mail_admins and mail_managers""" raise NotImplementedError
old_admins = settings.ADMINS
settings.ADMINS = [('nobody','nobody@example.com')]
mail.outbox = [] def get_the_message(self):
mail_admins('Subject', 'Content', html_message='HTML Content') mailbox = self.get_mailbox_content()
self.assertEqual(len(mail.outbox), 1) self.assertEqual(len(mailbox), 1,
message = mail.outbox[0] "Expected exactly one message, got %d.\n%r" % (len(mailbox), [
self.assertEqual(message.subject, '[Django] Subject') m.as_string() for m in mailbox]))
self.assertEqual(message.body, 'Content') return mailbox[0]
self.assertEqual(message.alternatives, [('HTML Content', 'text/html')])
settings.ADMINS = old_admins def test_send(self):
email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'])
num_sent = mail.get_connection().send_messages([email])
self.assertEqual(num_sent, 1)
message = self.get_the_message()
self.assertEqual(message["subject"], "Subject")
self.assertEqual(message.get_payload(), "Content")
self.assertEqual(message["from"], "from@example.com")
self.assertEqual(message.get_all("to"), ["to@example.com"])
def test_send_many(self):
email1 = EmailMessage('Subject', 'Content1', 'from@example.com', ['to@example.com'])
email2 = EmailMessage('Subject', 'Content2', 'from@example.com', ['to@example.com'])
num_sent = mail.get_connection().send_messages([email1, email2])
self.assertEqual(num_sent, 2)
messages = self.get_mailbox_content()
self.assertEquals(len(messages), 2)
self.assertEqual(messages[0].get_payload(), "Content1")
self.assertEqual(messages[1].get_payload(), "Content2")
def test_send_verbose_name(self):
email = EmailMessage("Subject", "Content", '"Firstname Sürname" <from@example.com>',
["to@example.com"])
email.send()
message = self.get_the_message()
self.assertEqual(message["subject"], "Subject")
self.assertEqual(message.get_payload(), "Content")
self.assertEqual(message["from"], "=?utf-8?q?Firstname_S=C3=BCrname?= <from@example.com>")
@with_django_settings(MANAGERS=[('nobody', 'nobody@example.com')])
def test_html_mail_managers(self): def test_html_mail_managers(self):
"""Test html_message argument to mail_admins and mail_managers""" """Test html_message argument to mail_managers"""
old_managers = settings.MANAGERS
settings.MANAGERS = [('nobody','nobody@example.com')]
mail.outbox = []
mail_managers('Subject', 'Content', html_message='HTML Content') mail_managers('Subject', 'Content', html_message='HTML Content')
self.assertEqual(len(mail.outbox), 1) message = self.get_the_message()
message = mail.outbox[0]
self.assertEqual(message.subject, '[Django] Subject')
self.assertEqual(message.body, 'Content')
self.assertEqual(message.alternatives, [('HTML Content', 'text/html')])
settings.MANAGERS = old_managers self.assertEqual(message.get('subject'), '[Django] Subject')
self.assertEqual(message.get_all('to'), ['nobody@example.com'])
self.assertTrue(message.is_multipart())
self.assertEqual(len(message.get_payload()), 2)
self.assertEqual(message.get_payload(0).get_payload(), 'Content')
self.assertEqual(message.get_payload(0).get_content_type(), 'text/plain')
self.assertEqual(message.get_payload(1).get_payload(), 'HTML Content')
self.assertEqual(message.get_payload(1).get_content_type(), 'text/html')
def test_idn_validation(self): @with_django_settings(ADMINS=[('nobody', 'nobody@example.com')])
"""Test internationalized email adresses""" def test_html_mail_admins(self):
# Regression for #14301. """Test html_message argument to mail_admins """
mail_admins('Subject', 'Content', html_message='HTML Content')
message = self.get_the_message()
self.assertEqual(message.get('subject'), '[Django] Subject')
self.assertEqual(message.get_all('to'), ['nobody@example.com'])
self.assertTrue(message.is_multipart())
self.assertEqual(len(message.get_payload()), 2)
self.assertEqual(message.get_payload(0).get_payload(), 'Content')
self.assertEqual(message.get_payload(0).get_content_type(), 'text/plain')
self.assertEqual(message.get_payload(1).get_payload(), 'HTML Content')
self.assertEqual(message.get_payload(1).get_content_type(), 'text/html')
@with_django_settings(ADMINS=[('nobody', 'nobody+admin@example.com')],
MANAGERS=[('nobody', 'nobody+manager@example.com')])
def test_manager_and_admin_mail_prefix(self):
"""
String prefix + lazy translated subject = bad output
Regression for #13494
"""
mail_managers(ugettext_lazy('Subject'), 'Content')
message = self.get_the_message()
self.assertEqual(message.get('subject'), '[Django] Subject')
self.flush_mailbox()
mail_admins(ugettext_lazy('Subject'), 'Content')
message = self.get_the_message()
self.assertEqual(message.get('subject'), '[Django] Subject')
@with_django_settings(ADMINS=(), MANAGERS=())
def test_empty_admins(self):
"""
Test that mail_admins/mail_managers doesn't connect to the mail server
if there are no recipients (#9383)
"""
mail_admins('hi', 'there')
self.assertEqual(self.get_mailbox_content(), [])
mail_managers('hi', 'there')
self.assertEqual(self.get_mailbox_content(), [])
def test_message_cc_header(self):
"""
Regression test for #7722
"""
email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'], cc=['cc@example.com'])
mail.get_connection().send_messages([email])
message = self.get_the_message()
self.assertStartsWith(message.as_string(), 'Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nCc: cc@example.com\nDate: ')
def test_idn_send(self):
"""
Regression test for #14301
"""
self.assertTrue(send_mail('Subject', 'Content', 'from@öäü.com', [u'to@öäü.com']))
message = self.get_the_message()
self.assertEqual(message.get('subject'), 'Subject')
self.assertEqual(message.get('from'), 'from@xn--4ca9at.com')
self.assertEqual(message.get('to'), 'to@xn--4ca9at.com')
self.flush_mailbox()
m = EmailMessage('Subject', 'Content', 'from@öäü.com',
[u'to@öäü.com'], cc=[u'cc@öäü.com'])
m.send()
message = self.get_the_message()
self.assertEqual(message.get('subject'), 'Subject')
self.assertEqual(message.get('from'), 'from@xn--4ca9at.com')
self.assertEqual(message.get('to'), 'to@xn--4ca9at.com')
self.assertEqual(message.get('cc'), 'cc@xn--4ca9at.com')
def test_recipient_without_domain(self):
"""
Regression test for #15042
"""
self.assertTrue(send_mail("Subject", "Content", "tester", ["django"]))
message = self.get_the_message()
self.assertEqual(message.get('subject'), 'Subject')
self.assertEqual(message.get('from'), "tester")
self.assertEqual(message.get('to'), "django")
class LocmemBackendTests(BaseEmailBackendTests, TestCase):
email_backend = 'django.core.mail.backends.locmem.EmailBackend'
def get_mailbox_content(self):
return [m.message() for m in mail.outbox]
def flush_mailbox(self):
mail.outbox = [] mail.outbox = []
from_email = u'fröm@öäü.com'
to_email = u'tö@öäü.com'
connection = mail.get_connection('django.core.mail.backends.locmem.EmailBackend')
send_mail('Subject', 'Content', from_email, [to_email], connection=connection)
self.assertEqual(len(mail.outbox), 1)
message = mail.outbox[0]
self.assertEqual(message.subject, 'Subject')
self.assertEqual(message.from_email, from_email)
self.assertEqual(message.to, [to_email])
self.assertTrue(message.message().as_string().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: =?utf-8?b?ZnLDtm1Aw7bDpMO8LmNvbQ==?=\nTo: =?utf-8?b?dMO2QMO2w6TDvC5jb20=?='))
def test_idn_smtp_send(self): def tearDown(self):
import smtplib super(LocmemBackendTests, self).tearDown()
smtplib.SMTP = MockSMTP mail.outbox = []
from_email = u'fröm@öäü.com'
to_email = u'tö@öäü.com'
connection = mail.get_connection('django.core.mail.backends.smtp.EmailBackend')
self.assertTrue(send_mail('Subject', 'Content', from_email, [to_email], connection=connection))
class MockSMTP(object): def test_locmem_shared_messages(self):
def __init__(self, host='', port=0, local_hostname=None, """
timeout=1): Make sure that the locmen backend populates the outbox.
pass """
connection = locmem.EmailBackend()
connection2 = locmem.EmailBackend()
email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
connection.send_messages([email])
connection2.send_messages([email])
self.assertEqual(len(mail.outbox), 2)
def sendmail(self, from_addr, to_addrs, msg, mail_options=[],
rcpt_options=[]):
for addr in to_addrs:
str(addr.split('@', 1)[-1])
return {}
def quit(self): class FileBackendTests(BaseEmailBackendTests, TestCase):
return 0 email_backend = 'django.core.mail.backends.filebased.EmailBackend'
def setUp(self):
super(FileBackendTests, self).setUp()
self.tmp_dir = tempfile.mkdtemp()
self.__settings_state = alter_django_settings(EMAIL_FILE_PATH=self.tmp_dir)
def tearDown(self):
restore_django_settings(self.__settings_state)
shutil.rmtree(self.tmp_dir)
super(FileBackendTests, self).tearDown()
def flush_mailbox(self):
for filename in os.listdir(self.tmp_dir):
os.unlink(os.path.join(self.tmp_dir, filename))
def get_mailbox_content(self):
messages = []
for filename in os.listdir(self.tmp_dir):
session = open(os.path.join(self.tmp_dir, filename)).read().split('\n' + ('-' * 79) + '\n')
messages.extend(email.message_from_string(m) for m in session if m)
return messages
def test_file_sessions(self):
"""Make sure opening a connection creates a new file"""
msg = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
connection = mail.get_connection()
connection.send_messages([msg])
self.assertEqual(len(os.listdir(self.tmp_dir)), 1)
message = email.message_from_file(open(os.path.join(self.tmp_dir, os.listdir(self.tmp_dir)[0])))
self.assertEqual(message.get_content_type(), 'text/plain')
self.assertEqual(message.get('subject'), 'Subject')
self.assertEqual(message.get('from'), 'from@example.com')
self.assertEqual(message.get('to'), 'to@example.com')
connection2 = mail.get_connection()
connection2.send_messages([msg])
self.assertEqual(len(os.listdir(self.tmp_dir)), 2)
connection.send_messages([msg])
self.assertEqual(len(os.listdir(self.tmp_dir)), 2)
msg.connection = mail.get_connection()
self.assertTrue(connection.open())
msg.send()
self.assertEqual(len(os.listdir(self.tmp_dir)), 3)
msg.send()
self.assertEqual(len(os.listdir(self.tmp_dir)), 3)
class ConsoleBackendTests(BaseEmailBackendTests, TestCase):
email_backend = 'django.core.mail.backends.console.EmailBackend'
def setUp(self):
super(ConsoleBackendTests, self).setUp()
self.__stdout = sys.stdout
self.stream = sys.stdout = StringIO()
def tearDown(self):
del self.stream
sys.stdout = self.__stdout
del self.__stdout
super(ConsoleBackendTests, self).tearDown()
def flush_mailbox(self):
self.stream = sys.stdout = StringIO()
def get_mailbox_content(self):
messages = self.stream.getvalue().split('\n' + ('-' * 79) + '\n')
return [email.message_from_string(m) for m in messages if m]
def test_console_stream_kwarg(self):
"""
Test that the console backend can be pointed at an arbitrary stream.
"""
s = StringIO()
connection = mail.get_connection('django.core.mail.backends.console.EmailBackend', stream=s)
send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection)
self.assertTrue(s.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: '))
class FakeSMTPServer(smtpd.SMTPServer, threading.Thread):
"""
Asyncore SMTP server wrapped into a thread. Based on DummyFTPServer from:
http://svn.python.org/view/python/branches/py3k/Lib/test/test_ftplib.py?revision=86061&view=markup
"""
def __init__(self, *args, **kwargs):
threading.Thread.__init__(self)
smtpd.SMTPServer.__init__(self, *args, **kwargs)
self._sink = []
self.active = False
self.active_lock = threading.Lock()
self.sink_lock = threading.Lock()
def process_message(self, peer, mailfrom, rcpttos, data):
m = email.message_from_string(data)
maddr = email.Utils.parseaddr(m.get('from'))[1]
if mailfrom != maddr:
return "553 '%s' != '%s'" % (mailfrom, maddr)
self.sink_lock.acquire()
self._sink.append(m)
self.sink_lock.release()
def get_sink(self):
self.sink_lock.acquire()
try:
return self._sink[:]
finally:
self.sink_lock.release()
def flush_sink(self):
self.sink_lock.acquire()
self._sink[:] = []
self.sink_lock.release()
def start(self):
assert not self.active
self.__flag = threading.Event()
threading.Thread.start(self)
self.__flag.wait()
def run(self):
self.active = True
self.__flag.set()
while self.active and asyncore.socket_map:
self.active_lock.acquire()
asyncore.loop(timeout=0.1, count=1)
self.active_lock.release()
asyncore.close_all()
def stop(self):
assert self.active
self.active = False
self.join()
class SMTPBackendTests(BaseEmailBackendTests, TestCase):
email_backend = 'django.core.mail.backends.smtp.EmailBackend'
@classmethod
def setUpClass(cls):
cls.server = FakeSMTPServer(('127.0.0.1', 0), None)
cls.settings = alter_django_settings(
EMAIL_HOST="127.0.0.1",
EMAIL_PORT=cls.server.socket.getsockname()[1])
cls.server.start()
@classmethod
def tearDownClass(cls):
cls.server.stop()
def setUp(self):
super(SMTPBackendTests, self).setUp()
self.server.flush_sink()
def tearDown(self):
self.server.flush_sink()
super(SMTPBackendTests, self).tearDown()
def flush_mailbox(self):
self.server.flush_sink()
def get_mailbox_content(self):
return self.server.get_sink()