mirror of https://github.com/django/django.git
231 lines
8.5 KiB
Python
231 lines
8.5 KiB
Python
from io import BytesIO
|
|
from socketserver import ThreadingMixIn
|
|
|
|
from django.core.handlers.wsgi import WSGIRequest
|
|
from django.core.servers.basehttp import WSGIRequestHandler, WSGIServer
|
|
from django.test import SimpleTestCase
|
|
from django.test.client import RequestFactory
|
|
from django.test.utils import captured_stderr
|
|
|
|
|
|
class Stub(ThreadingMixIn):
|
|
def __init__(self, **kwargs):
|
|
self.__dict__.update(kwargs)
|
|
|
|
def sendall(self, data):
|
|
self.makefile("wb").write(data)
|
|
|
|
|
|
class UnclosableBytesIO(BytesIO):
|
|
def close(self):
|
|
# WSGIRequestHandler closes the output file; we need to make this a
|
|
# no-op so we can still read its contents.
|
|
pass
|
|
|
|
|
|
class WSGIRequestHandlerTestCase(SimpleTestCase):
|
|
request_factory = RequestFactory()
|
|
|
|
def test_log_message(self):
|
|
request = WSGIRequest(self.request_factory.get("/").environ)
|
|
request.makefile = lambda *args, **kwargs: BytesIO()
|
|
handler = WSGIRequestHandler(request, "192.168.0.2", None)
|
|
level_status_codes = {
|
|
"info": [200, 301, 304],
|
|
"warning": [400, 403, 404],
|
|
"error": [500, 503],
|
|
}
|
|
for level, status_codes in level_status_codes.items():
|
|
for status_code in status_codes:
|
|
# The correct level gets the message.
|
|
with self.assertLogs("django.server", level.upper()) as cm:
|
|
handler.log_message("GET %s %s", "A", str(status_code))
|
|
self.assertIn("GET A %d" % status_code, cm.output[0])
|
|
# Incorrect levels don't have any messages.
|
|
for wrong_level in level_status_codes:
|
|
if wrong_level != level:
|
|
with self.assertLogs("django.server", "INFO") as cm:
|
|
handler.log_message("GET %s %s", "A", str(status_code))
|
|
self.assertNotEqual(
|
|
cm.records[0].levelname, wrong_level.upper()
|
|
)
|
|
|
|
def test_https(self):
|
|
request = WSGIRequest(self.request_factory.get("/").environ)
|
|
request.makefile = lambda *args, **kwargs: BytesIO()
|
|
|
|
handler = WSGIRequestHandler(request, "192.168.0.2", None)
|
|
|
|
with self.assertLogs("django.server", "ERROR") as cm:
|
|
handler.log_message("GET %s %s", "\x16\x03", "4")
|
|
self.assertEqual(
|
|
"You're accessing the development server over HTTPS, "
|
|
"but it only supports HTTP.",
|
|
cm.records[0].getMessage(),
|
|
)
|
|
|
|
def test_strips_underscore_headers(self):
|
|
"""WSGIRequestHandler ignores headers containing underscores.
|
|
|
|
This follows the lead of nginx and Apache 2.4, and is to avoid
|
|
ambiguity between dashes and underscores in mapping to WSGI environ,
|
|
which can have security implications.
|
|
"""
|
|
|
|
def test_app(environ, start_response):
|
|
"""A WSGI app that just reflects its HTTP environ."""
|
|
start_response("200 OK", [])
|
|
http_environ_items = sorted(
|
|
"%s:%s" % (k, v) for k, v in environ.items() if k.startswith("HTTP_")
|
|
)
|
|
yield (",".join(http_environ_items)).encode()
|
|
|
|
rfile = BytesIO()
|
|
rfile.write(b"GET / HTTP/1.0\r\n")
|
|
rfile.write(b"Some-Header: good\r\n")
|
|
rfile.write(b"Some_Header: bad\r\n")
|
|
rfile.write(b"Other_Header: bad\r\n")
|
|
rfile.seek(0)
|
|
|
|
wfile = UnclosableBytesIO()
|
|
|
|
def makefile(mode, *a, **kw):
|
|
if mode == "rb":
|
|
return rfile
|
|
elif mode == "wb":
|
|
return wfile
|
|
|
|
request = Stub(makefile=makefile)
|
|
server = Stub(base_environ={}, get_app=lambda: test_app)
|
|
|
|
# Prevent logging from appearing in test output.
|
|
with self.assertLogs("django.server", "INFO"):
|
|
# instantiating a handler runs the request as side effect
|
|
WSGIRequestHandler(request, "192.168.0.2", server)
|
|
|
|
wfile.seek(0)
|
|
body = list(wfile.readlines())[-1]
|
|
|
|
self.assertEqual(body, b"HTTP_SOME_HEADER:good")
|
|
|
|
def test_no_body_returned_for_head_requests(self):
|
|
hello_world_body = b"<!DOCTYPE html><html><body>Hello World</body></html>"
|
|
content_length = len(hello_world_body)
|
|
|
|
def test_app(environ, start_response):
|
|
"""A WSGI app that returns a hello world."""
|
|
start_response("200 OK", [])
|
|
return [hello_world_body]
|
|
|
|
rfile = BytesIO(b"GET / HTTP/1.0\r\n")
|
|
rfile.seek(0)
|
|
|
|
wfile = UnclosableBytesIO()
|
|
|
|
def makefile(mode, *a, **kw):
|
|
if mode == "rb":
|
|
return rfile
|
|
elif mode == "wb":
|
|
return wfile
|
|
|
|
request = Stub(makefile=makefile)
|
|
server = Stub(base_environ={}, get_app=lambda: test_app)
|
|
|
|
# Prevent logging from appearing in test output.
|
|
with self.assertLogs("django.server", "INFO"):
|
|
# Instantiating a handler runs the request as side effect.
|
|
WSGIRequestHandler(request, "192.168.0.2", server)
|
|
|
|
wfile.seek(0)
|
|
lines = list(wfile.readlines())
|
|
body = lines[-1]
|
|
# The body is returned in a GET response.
|
|
self.assertEqual(body, hello_world_body)
|
|
self.assertIn(f"Content-Length: {content_length}\r\n".encode(), lines)
|
|
self.assertNotIn(b"Connection: close\r\n", lines)
|
|
|
|
rfile = BytesIO(b"HEAD / HTTP/1.0\r\n")
|
|
rfile.seek(0)
|
|
wfile = UnclosableBytesIO()
|
|
|
|
with self.assertLogs("django.server", "INFO"):
|
|
WSGIRequestHandler(request, "192.168.0.2", server)
|
|
|
|
wfile.seek(0)
|
|
lines = list(wfile.readlines())
|
|
body = lines[-1]
|
|
# The body is not returned in a HEAD response.
|
|
self.assertEqual(body, b"\r\n")
|
|
self.assertIs(
|
|
any([line.startswith(b"Content-Length:") for line in lines]), False
|
|
)
|
|
self.assertNotIn(b"Connection: close\r\n", lines)
|
|
|
|
def test_non_zero_content_length_set_head_request(self):
|
|
hello_world_body = b"<!DOCTYPE html><html><body>Hello World</body></html>"
|
|
content_length = len(hello_world_body)
|
|
|
|
def test_app(environ, start_response):
|
|
"""
|
|
A WSGI app that returns a hello world with non-zero Content-Length.
|
|
"""
|
|
start_response("200 OK", [("Content-length", str(content_length))])
|
|
return [hello_world_body]
|
|
|
|
rfile = BytesIO(b"HEAD / HTTP/1.0\r\n")
|
|
rfile.seek(0)
|
|
|
|
wfile = UnclosableBytesIO()
|
|
|
|
def makefile(mode, *a, **kw):
|
|
if mode == "rb":
|
|
return rfile
|
|
elif mode == "wb":
|
|
return wfile
|
|
|
|
request = Stub(makefile=makefile)
|
|
server = Stub(base_environ={}, get_app=lambda: test_app)
|
|
|
|
# Prevent logging from appearing in test output.
|
|
with self.assertLogs("django.server", "INFO"):
|
|
# Instantiating a handler runs the request as side effect.
|
|
WSGIRequestHandler(request, "192.168.0.2", server)
|
|
|
|
wfile.seek(0)
|
|
lines = list(wfile.readlines())
|
|
body = lines[-1]
|
|
# The body is not returned in a HEAD response.
|
|
self.assertEqual(body, b"\r\n")
|
|
# Non-zero Content-Length is not removed.
|
|
self.assertEqual(lines[-2], f"Content-length: {content_length}\r\n".encode())
|
|
self.assertNotIn(b"Connection: close\r\n", lines)
|
|
|
|
|
|
class WSGIServerTestCase(SimpleTestCase):
|
|
request_factory = RequestFactory()
|
|
|
|
def test_broken_pipe_errors(self):
|
|
"""WSGIServer handles broken pipe errors."""
|
|
request = WSGIRequest(self.request_factory.get("/").environ)
|
|
client_address = ("192.168.2.0", 8080)
|
|
msg = f"- Broken pipe from {client_address}"
|
|
tests = [
|
|
BrokenPipeError,
|
|
ConnectionAbortedError,
|
|
ConnectionResetError,
|
|
]
|
|
for exception in tests:
|
|
with self.subTest(exception=exception):
|
|
try:
|
|
server = WSGIServer(("localhost", 0), WSGIRequestHandler)
|
|
try:
|
|
raise exception()
|
|
except Exception:
|
|
with captured_stderr() as err:
|
|
with self.assertLogs("django.server", "INFO") as cm:
|
|
server.handle_error(request, client_address)
|
|
self.assertEqual(err.getvalue(), "")
|
|
self.assertEqual(cm.records[0].getMessage(), msg)
|
|
finally:
|
|
server.server_close()
|