Fixed #27131 -- Passed proper string type to SMTP connection login

Passing an Unicode string on Python 2 was crashing the connection.
Thanks slavugan@gmail.com for the report, and Tim Graham for the review.
This commit is contained in:
Claude Paroz 2016-08-29 14:37:03 +02:00
parent 190d2ff4a7
commit fe252c0a5a
2 changed files with 54 additions and 5 deletions

View File

@ -7,6 +7,7 @@ from django.conf import settings
from django.core.mail.backends.base import BaseEmailBackend from django.core.mail.backends.base import BaseEmailBackend
from django.core.mail.message import sanitize_address from django.core.mail.message import sanitize_address
from django.core.mail.utils import DNS_NAME from django.core.mail.utils import DNS_NAME
from django.utils.encoding import force_str
class EmailBackend(BaseEmailBackend): class EmailBackend(BaseEmailBackend):
@ -34,6 +35,10 @@ class EmailBackend(BaseEmailBackend):
self.connection = None self.connection = None
self._lock = threading.RLock() self._lock = threading.RLock()
@property
def connection_class(self):
return smtplib.SMTP_SSL if self.use_ssl else smtplib.SMTP
def open(self): def open(self):
""" """
Ensures we have a connection to the email server. Returns whether or Ensures we have a connection to the email server. Returns whether or
@ -43,7 +48,6 @@ class EmailBackend(BaseEmailBackend):
# Nothing to do if the connection is already open. # Nothing to do if the connection is already open.
return False return False
connection_class = smtplib.SMTP_SSL if self.use_ssl else smtplib.SMTP
# If local_hostname is not specified, socket.getfqdn() gets used. # If local_hostname is not specified, socket.getfqdn() gets used.
# For performance, we use the cached FQDN for local_hostname. # For performance, we use the cached FQDN for local_hostname.
connection_params = {'local_hostname': DNS_NAME.get_fqdn()} connection_params = {'local_hostname': DNS_NAME.get_fqdn()}
@ -55,7 +59,7 @@ class EmailBackend(BaseEmailBackend):
'certfile': self.ssl_certfile, 'certfile': self.ssl_certfile,
}) })
try: try:
self.connection = connection_class(self.host, self.port, **connection_params) self.connection = self.connection_class(self.host, self.port, **connection_params)
# TLS/SSL are mutually exclusive, so only attempt TLS over # TLS/SSL are mutually exclusive, so only attempt TLS over
# non-secure connections. # non-secure connections.
@ -64,7 +68,7 @@ class EmailBackend(BaseEmailBackend):
self.connection.starttls(keyfile=self.ssl_keyfile, certfile=self.ssl_certfile) self.connection.starttls(keyfile=self.ssl_keyfile, certfile=self.ssl_certfile)
self.connection.ehlo() self.connection.ehlo()
if self.username and self.password: if self.username and self.password:
self.connection.login(self.username, self.password) self.connection.login(force_str(self.username), force_str(self.password))
return True return True
except smtplib.SMTPException: except smtplib.SMTPException:
if not self.fail_silently: if not self.fail_silently:

View File

@ -2,6 +2,7 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import asyncore import asyncore
import base64
import mimetypes import mimetypes
import os import os
import shutil import shutil
@ -11,7 +12,7 @@ import tempfile
import threading import threading
from email.header import Header from email.header import Header
from email.mime.text import MIMEText from email.mime.text import MIMEText
from smtplib import SMTP, SMTPException from smtplib import SMTP, SMTPAuthenticationError, SMTPException
from ssl import SSLError from ssl import SSLError
from django.core import mail from django.core import mail
@ -1115,12 +1116,21 @@ class FakeSMTPChannel(smtpd.SMTPChannel):
def collect_incoming_data(self, data): def collect_incoming_data(self, data):
try: try:
super(FakeSMTPChannel, self).collect_incoming_data(data) smtpd.SMTPChannel.collect_incoming_data(self, data)
except UnicodeDecodeError: except UnicodeDecodeError:
# ignore decode error in SSL/TLS connection tests as we only care # ignore decode error in SSL/TLS connection tests as we only care
# whether the connection attempt was made # whether the connection attempt was made
pass pass
def smtp_AUTH(self, arg):
if arg == 'CRAM-MD5':
# This is only the first part of the login process. But it's enough
# for our tests.
challenge = base64.b64encode(b'somerandomstring13579')
self.push(str('334 %s' % challenge.decode()))
else:
self.push(str('502 Error: login "%s" not implemented' % arg))
class FakeSMTPServer(smtpd.SMTPServer, threading.Thread): class FakeSMTPServer(smtpd.SMTPServer, threading.Thread):
""" """
@ -1140,6 +1150,15 @@ class FakeSMTPServer(smtpd.SMTPServer, threading.Thread):
self.active_lock = threading.Lock() self.active_lock = threading.Lock()
self.sink_lock = threading.Lock() self.sink_lock = threading.Lock()
if not PY3:
def handle_accept(self):
# copy of Python 2.7 smtpd.SMTPServer.handle_accept with hardcoded
# SMTPChannel replaced by self.channel_class
pair = self.accept()
if pair is not None:
conn, addr = pair
self.channel_class(self, conn, addr)
def process_message(self, peer, mailfrom, rcpttos, data): def process_message(self, peer, mailfrom, rcpttos, data):
if PY3: if PY3:
data = data.encode('utf-8') data = data.encode('utf-8')
@ -1187,6 +1206,20 @@ class FakeSMTPServer(smtpd.SMTPServer, threading.Thread):
self.join() self.join()
class FakeAUTHSMTPConnection(SMTP):
"""
A SMTP connection pretending support for the AUTH command. It does not, but
at least this can allow testing the first part of the AUTH process.
"""
def ehlo(self, name=''):
response = SMTP.ehlo(self, name=name)
self.esmtp_features.update({
'auth': 'CRAM-MD5 PLAIN LOGIN',
})
return response
class SMTPBackendTestsBase(SimpleTestCase): class SMTPBackendTestsBase(SimpleTestCase):
@classmethod @classmethod
@ -1270,6 +1303,18 @@ class SMTPBackendTests(BaseEmailBackendTests, SMTPBackendTestsBase):
backend.close() backend.close()
self.assertTrue(opened) self.assertTrue(opened)
def test_server_login(self):
"""
Even if the Python SMTP server doesn't support authentication, the
login process starts and the appropriate exception is raised.
"""
class CustomEmailBackend(smtp.EmailBackend):
connection_class = FakeAUTHSMTPConnection
backend = CustomEmailBackend(username='username', password='password')
with self.assertRaises(SMTPAuthenticationError):
backend.open()
@override_settings(EMAIL_USE_TLS=True) @override_settings(EMAIL_USE_TLS=True)
def test_email_tls_use_settings(self): def test_email_tls_use_settings(self):
backend = smtp.EmailBackend() backend = smtp.EmailBackend()