django1/django/core/handlers/wsgi.py

226 lines
7.9 KiB
Python
Raw Normal View History

import cgi
import codecs
import re
from io import BytesIO
from django import http
from django.conf import settings
from django.core import signals
from django.core.handlers import base
from django.urls import set_script_prefix
from django.utils.encoding import force_text, repercent_broken_unicode
from django.utils.functional import cached_property
_slashes_re = re.compile(br'/+')
class LimitedStream:
'''
LimitedStream wraps another stream in order to not allow reading from it
past specified amount of bytes.
'''
def __init__(self, stream, limit, buf_size=64 * 1024 * 1024):
self.stream = stream
self.remaining = limit
self.buffer = b''
self.buf_size = buf_size
def _read_limited(self, size=None):
if size is None or size > self.remaining:
size = self.remaining
if size == 0:
return b''
result = self.stream.read(size)
self.remaining -= len(result)
return result
def read(self, size=None):
if size is None:
result = self.buffer + self._read_limited()
self.buffer = b''
elif size < len(self.buffer):
result = self.buffer[:size]
self.buffer = self.buffer[size:]
2013-11-03 05:02:56 +08:00
else: # size >= len(self.buffer)
result = self.buffer + self._read_limited(size - len(self.buffer))
self.buffer = b''
return result
def readline(self, size=None):
while b'\n' not in self.buffer and \
(size is None or len(self.buffer) < size):
if size:
# since size is not None here, len(self.buffer) < size
chunk = self._read_limited(size - len(self.buffer))
else:
chunk = self._read_limited()
if not chunk:
break
self.buffer += chunk
sio = BytesIO(self.buffer)
if size:
line = sio.readline(size)
else:
line = sio.readline()
self.buffer = sio.read()
return line
class WSGIRequest(http.HttpRequest):
def __init__(self, environ):
script_name = get_script_name(environ)
path_info = get_path_info(environ)
if not path_info:
# Sometimes PATH_INFO exists, but is empty (e.g. accessing
# the SCRIPT_NAME URL without a trailing slash). We really need to
# operate as if they'd requested '/'. Not amazingly nice to force
# the path like this, but should be harmless.
path_info = '/'
self.environ = environ
self.path_info = path_info
# be careful to only replace the first slash in the path because of
# http://test/something and http://test//something being different as
# stated in http://www.ietf.org/rfc/rfc2396.txt
self.path = '%s/%s' % (script_name.rstrip('/'),
path_info.replace('/', '', 1))
self.META = environ
self.META['PATH_INFO'] = path_info
self.META['SCRIPT_NAME'] = script_name
self.method = environ['REQUEST_METHOD'].upper()
self.content_type, self.content_params = cgi.parse_header(environ.get('CONTENT_TYPE', ''))
if 'charset' in self.content_params:
try:
codecs.lookup(self.content_params['charset'])
except LookupError:
pass
else:
self.encoding = self.content_params['charset']
self._post_parse_error = False
try:
2013-09-07 23:25:16 +08:00
content_length = int(environ.get('CONTENT_LENGTH'))
except (ValueError, TypeError):
content_length = 0
self._stream = LimitedStream(self.environ['wsgi.input'], content_length)
self._read_started = False
self.resolver_match = None
def _get_scheme(self):
return self.environ.get('wsgi.url_scheme')
@cached_property
def GET(self):
# The WSGI spec says 'QUERY_STRING' may be absent.
raw_query_string = get_bytes_from_wsgi(self.environ, 'QUERY_STRING', '')
return http.QueryDict(raw_query_string, encoding=self._encoding)
def _get_post(self):
if not hasattr(self, '_post'):
self._load_post_and_files()
return self._post
def _set_post(self, post):
self._post = post
@cached_property
def COOKIES(self):
raw_cookie = get_str_from_wsgi(self.environ, 'HTTP_COOKIE', '')
return http.parse_cookie(raw_cookie)
@property
def FILES(self):
if not hasattr(self, '_files'):
self._load_post_and_files()
return self._files
POST = property(_get_post, _set_post)
class WSGIHandler(base.BaseHandler):
request_class = WSGIRequest
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.load_middleware()
def __call__(self, environ, start_response):
set_script_prefix(get_script_name(environ))
signals.request_started.send(sender=self.__class__, environ=environ)
request = self.request_class(environ)
response = self.get_response(request)
response._handler_class = self.__class__
status = '%d %s' % (response.status_code, response.reason_phrase)
response_headers = list(response.items())
for c in response.cookies.values():
response_headers.append(('Set-Cookie', c.output(header='')))
start_response(status, response_headers)
if getattr(response, 'file_to_stream', None) is not None and environ.get('wsgi.file_wrapper'):
response = environ['wsgi.file_wrapper'](response.file_to_stream)
return response
def get_path_info(environ):
"""
Return the HTTP request's PATH_INFO as a string.
"""
path_info = get_bytes_from_wsgi(environ, 'PATH_INFO', '/')
return repercent_broken_unicode(path_info).decode()
def get_script_name(environ):
"""
Returns the equivalent of the HTTP request's SCRIPT_NAME environment
variable. If Apache mod_rewrite has been used, returns what would have been
the script name prior to any rewriting (so it's the script name as seen
from the client's perspective), unless the FORCE_SCRIPT_NAME setting is
set (to anything).
"""
if settings.FORCE_SCRIPT_NAME is not None:
return force_text(settings.FORCE_SCRIPT_NAME)
# If Apache's mod_rewrite had a whack at the URL, Apache set either
# SCRIPT_URL or REDIRECT_URL to the full resource URL before applying any
# rewrites. Unfortunately not every Web server (lighttpd!) passes this
# information through all the time, so FORCE_SCRIPT_NAME, above, is still
# needed.
script_url = get_bytes_from_wsgi(environ, 'SCRIPT_URL', '')
if not script_url:
script_url = get_bytes_from_wsgi(environ, 'REDIRECT_URL', '')
if script_url:
if b'//' in script_url:
# mod_wsgi squashes multiple successive slashes in PATH_INFO,
# do the same with script_url before manipulating paths (#17133).
script_url = _slashes_re.sub(b'/', script_url)
path_info = get_bytes_from_wsgi(environ, 'PATH_INFO', '')
script_name = script_url[:-len(path_info)] if path_info else script_url
else:
script_name = get_bytes_from_wsgi(environ, 'SCRIPT_NAME', '')
return script_name.decode()
def get_bytes_from_wsgi(environ, key, default):
"""
Get a value from the WSGI environ dictionary as bytes.
key and default should be strings.
"""
value = environ.get(key, default)
# Non-ASCII values in the WSGI environ are arbitrarily decoded with
# ISO-8859-1. This is wrong for Django websites where UTF-8 is the default.
# Re-encode to recover the original bytestring.
return value.encode('iso-8859-1')
def get_str_from_wsgi(environ, key, default):
"""
Get a value from the WSGI environ dictionary as str.
key and default should be str objects.
"""
value = get_bytes_from_wsgi(environ, key, default)
return value.decode(errors='replace')