336 lines
12 KiB
Python
336 lines
12 KiB
Python
import urllib
|
|
import sys
|
|
import os
|
|
try:
|
|
from cStringIO import StringIO
|
|
except ImportError:
|
|
from StringIO import StringIO
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth import authenticate, login
|
|
from django.core.handlers.base import BaseHandler
|
|
from django.core.handlers.wsgi import WSGIRequest
|
|
from django.core.signals import got_request_exception
|
|
from django.http import SimpleCookie, HttpRequest
|
|
from django.template import TemplateDoesNotExist
|
|
from django.test import signals
|
|
from django.utils.functional import curry
|
|
from django.utils.encoding import smart_str
|
|
from django.utils.http import urlencode
|
|
from django.utils.itercompat import is_iterable
|
|
|
|
# Fixed delimiter string placed between the parts of a multipart body.
BOUNDARY = 'BoUnDaRyStRiNg'
# Content type header value that advertises the boundary defined above.
MULTIPART_CONTENT = 'multipart/form-data; boundary=' + BOUNDARY
|
|
|
|
|
|
class FakePayload(object):
    """
    A wrapper around StringIO that restricts what can be read since data from
    the network can't be seeked and cannot be read outside of its content
    length. This makes sure that views can't do anything under the test client
    that wouldn't work in Real Life.
    """
    def __init__(self, content):
        self.__content = StringIO(content)
        # Number of bytes that may still be read from the payload.
        self.__len = len(content)

    def read(self, num_bytes=None):
        """
        Returns the next ``num_bytes`` bytes of the payload. When
        ``num_bytes`` is omitted, everything that has not been read yet is
        returned (an empty string if nothing is left).

        Raises AssertionError if more bytes are requested than remain.
        """
        if num_bytes is None:
            # Default to whatever remains. Falling back to 1 here would make
            # a plain read() on an empty or exhausted payload trip the
            # assertion below instead of returning an empty string.
            num_bytes = self.__len
        assert self.__len >= num_bytes, "Cannot read more than the available bytes from the HTTP incoming data."
        content = self.__content.read(num_bytes)
        self.__len -= num_bytes
        return content
|
|
|
|
|
|
class ClientHandler(BaseHandler):
    """
    An HTTP handler intended for testing. Requests are composed through the
    WSGI interface, but the raw HttpResponse object is what gets returned.
    """
    def __call__(self, environ):
        """
        Builds a WSGIRequest from ``environ``, runs it through the handler
        and the response middleware, and returns the resulting response.
        """
        from django.conf import settings
        # Aliased so it is not confused with the module-level test signals.
        from django.core import signals as core_signals

        # Middleware is loaded on first use, because settings were not
        # available any earlier than this.
        if self._request_middleware is None:
            self.load_middleware()

        handler_class = self.__class__
        core_signals.request_started.send(sender=handler_class)
        try:
            request = WSGIRequest(environ)
            response = self.get_response(request)

            # Run the response through each response middleware in turn.
            for apply_middleware in self._response_middleware:
                response = apply_middleware(request, response)
            response = self.apply_response_fixes(request, response)
        finally:
            # Sent whether or not the request was processed successfully.
            core_signals.request_finished.send(sender=handler_class)

        return response
|
|
|
|
def store_rendered_templates(store, signal, sender, template, context, **kwargs):
    """
    Records a rendered template and its context in ``store``.

    The template is appended to the list under the 'template' key and the
    context to the list under the 'context' key; each list is created on
    first use.
    """
    for slot, rendered in (('template', template), ('context', context)):
        store.setdefault(slot, []).append(rendered)
|
|
|
|
def encode_multipart(boundary, data):
    """
    Builds a multipart POST body from a dictionary of form values.

    Every key is used as a form field name and its value is sent as the
    field content. File values are handed to encode_file(), which sends
    them as application/octet-stream; any other value is converted with
    smart_str() using settings.DEFAULT_CHARSET. A non-string iterable value
    produces one part per item, all under the same field name.

    Returns the parts joined with CRLF, terminated by the closing boundary.
    """
    def to_str(s):
        return smart_str(s, settings.DEFAULT_CHARSET)

    def is_file(thing):
        # Not by any means perfect, but good enough for our purposes.
        return hasattr(thing, "read") and callable(thing.read)

    lines = []

    # HTTP field names can be duplicated, so a value may be a single form
    # value or file, or a *list* of them. Single values are wrapped in a
    # one-element list so both cases share the loop below. The file check
    # comes first so that a file object is never iterated.
    for key, value in data.items():
        if is_file(value) or isinstance(value, basestring) or not is_iterable(value):
            parts = [value]
        else:
            parts = value

        for part in parts:
            if is_file(part):
                lines.extend(encode_file(boundary, key, part))
            else:
                lines.extend([
                    '--' + boundary,
                    'Content-Disposition: form-data; name="%s"' % to_str(key),
                    '',
                    to_str(part)
                ])

    # Closing boundary; the empty entry yields a trailing CRLF after it.
    lines.extend([
        '--' + boundary + '--',
        '',
    ])
    return '\r\n'.join(lines)
|
|
|
|
def encode_file(boundary, key, file):
    """
    Encodes a single file-like object as one part of a multipart body.

    ``boundary`` is the multipart boundary string, ``key`` the form field
    name and ``file`` an object with a read() method. The base name of
    ``file.name`` is sent as the filename; objects without a ``name``
    attribute are sent with an empty filename.

    Returns the list of lines making up the part (boundary, headers, blank
    line, file contents), ready to be joined with CRLF by the caller.
    """
    to_str = lambda s: smart_str(s, settings.DEFAULT_CHARSET)
    # Anything with a callable read() is treated as a file by the caller,
    # and such objects do not necessarily carry a name attribute.
    filename = os.path.basename(getattr(file, 'name', ''))
    return [
        '--' + boundary,
        'Content-Disposition: form-data; name="%s"; filename="%s"' \
            % (to_str(key), to_str(filename)),
        'Content-Type: application/octet-stream',
        '',
        file.read()
    ]
|
|
|
|
class Client:
|
|
"""
|
|
A class that can act as a client for testing purposes.
|
|
|
|
It allows the user to compose GET and POST requests, and
|
|
obtain the response that the server gave to those requests.
|
|
The server Response objects are annotated with the details
|
|
of the contexts and templates that were rendered during the
|
|
process of serving the request.
|
|
|
|
Client objects are stateful - they will retain cookie (and
|
|
thus session) details for the lifetime of the Client instance.
|
|
|
|
This is not intended as a replacement for Twill/Selenium or
|
|
the like - it is here to allow testing against the
|
|
contexts and templates produced by a view, rather than the
|
|
HTML rendered to the end-user.
|
|
"""
|
|
def __init__(self, **defaults):
|
|
self.handler = ClientHandler()
|
|
self.defaults = defaults
|
|
self.cookies = SimpleCookie()
|
|
self.exc_info = None
|
|
|
|
def store_exc_info(self, **kwargs):
|
|
"""
|
|
Stores exceptions when they are generated by a view.
|
|
"""
|
|
self.exc_info = sys.exc_info()
|
|
|
|
def _session(self):
|
|
"""
|
|
Obtains the current session variables.
|
|
"""
|
|
if 'django.contrib.sessions' in settings.INSTALLED_APPS:
|
|
engine = __import__(settings.SESSION_ENGINE, {}, {}, [''])
|
|
cookie = self.cookies.get(settings.SESSION_COOKIE_NAME, None)
|
|
if cookie:
|
|
return engine.SessionStore(cookie.value)
|
|
return {}
|
|
session = property(_session)
|
|
|
|
def request(self, **request):
|
|
"""
|
|
The master request method. Composes the environment dictionary
|
|
and passes to the handler, returning the result of the handler.
|
|
Assumes defaults for the query environment, which can be overridden
|
|
using the arguments to the request.
|
|
"""
|
|
environ = {
|
|
'HTTP_COOKIE': self.cookies,
|
|
'PATH_INFO': '/',
|
|
'QUERY_STRING': '',
|
|
'REQUEST_METHOD': 'GET',
|
|
'SCRIPT_NAME': '',
|
|
'SERVER_NAME': 'testserver',
|
|
'SERVER_PORT': 80,
|
|
'SERVER_PROTOCOL': 'HTTP/1.1',
|
|
}
|
|
environ.update(self.defaults)
|
|
environ.update(request)
|
|
|
|
# Curry a data dictionary into an instance of the template renderer
|
|
# callback function.
|
|
data = {}
|
|
on_template_render = curry(store_rendered_templates, data)
|
|
signals.template_rendered.connect(on_template_render)
|
|
|
|
# Capture exceptions created by the handler.
|
|
got_request_exception.connect(self.store_exc_info)
|
|
|
|
try:
|
|
response = self.handler(environ)
|
|
except TemplateDoesNotExist, e:
|
|
# If the view raises an exception, Django will attempt to show
|
|
# the 500.html template. If that template is not available,
|
|
# we should ignore the error in favor of re-raising the
|
|
# underlying exception that caused the 500 error. Any other
|
|
# template found to be missing during view error handling
|
|
# should be reported as-is.
|
|
if e.args != ('500.html',):
|
|
raise
|
|
|
|
# Look for a signalled exception, clear the current context
|
|
# exception data, then re-raise the signalled exception.
|
|
# Also make sure that the signalled exception is cleared from
|
|
# the local cache!
|
|
if self.exc_info:
|
|
exc_info = self.exc_info
|
|
self.exc_info = None
|
|
raise exc_info[1], None, exc_info[2]
|
|
|
|
# Save the client and request that stimulated the response.
|
|
response.client = self
|
|
response.request = request
|
|
|
|
# Add any rendered template detail to the response.
|
|
# If there was only one template rendered (the most likely case),
|
|
# flatten the list to a single element.
|
|
for detail in ('template', 'context'):
|
|
if data.get(detail):
|
|
if len(data[detail]) == 1:
|
|
setattr(response, detail, data[detail][0]);
|
|
else:
|
|
setattr(response, detail, data[detail])
|
|
else:
|
|
setattr(response, detail, None)
|
|
|
|
# Update persistent cookie data.
|
|
if response.cookies:
|
|
self.cookies.update(response.cookies)
|
|
|
|
return response
|
|
|
|
def get(self, path, data={}, **extra):
|
|
"""
|
|
Requests a response from the server using GET.
|
|
"""
|
|
r = {
|
|
'CONTENT_LENGTH': None,
|
|
'CONTENT_TYPE': 'text/html; charset=utf-8',
|
|
'PATH_INFO': urllib.unquote(path),
|
|
'QUERY_STRING': urlencode(data, doseq=True),
|
|
'REQUEST_METHOD': 'GET',
|
|
}
|
|
r.update(extra)
|
|
|
|
return self.request(**r)
|
|
|
|
def post(self, path, data={}, content_type=MULTIPART_CONTENT, **extra):
|
|
"""
|
|
Requests a response from the server using POST.
|
|
"""
|
|
if content_type is MULTIPART_CONTENT:
|
|
post_data = encode_multipart(BOUNDARY, data)
|
|
else:
|
|
post_data = data
|
|
|
|
r = {
|
|
'CONTENT_LENGTH': len(post_data),
|
|
'CONTENT_TYPE': content_type,
|
|
'PATH_INFO': urllib.unquote(path),
|
|
'REQUEST_METHOD': 'POST',
|
|
'wsgi.input': FakePayload(post_data),
|
|
}
|
|
r.update(extra)
|
|
|
|
return self.request(**r)
|
|
|
|
def login(self, **credentials):
|
|
"""
|
|
Sets the Client to appear as if it has successfully logged into a site.
|
|
|
|
Returns True if login is possible; False if the provided credentials
|
|
are incorrect, or the user is inactive, or if the sessions framework is
|
|
not available.
|
|
"""
|
|
user = authenticate(**credentials)
|
|
if user and user.is_active \
|
|
and 'django.contrib.sessions' in settings.INSTALLED_APPS:
|
|
engine = __import__(settings.SESSION_ENGINE, {}, {}, [''])
|
|
|
|
# Create a fake request to store login details.
|
|
request = HttpRequest()
|
|
if self.session:
|
|
request.session = self.session
|
|
else:
|
|
request.session = engine.SessionStore()
|
|
login(request, user)
|
|
|
|
# Set the cookie to represent the session.
|
|
session_cookie = settings.SESSION_COOKIE_NAME
|
|
self.cookies[session_cookie] = request.session.session_key
|
|
cookie_data = {
|
|
'max-age': None,
|
|
'path': '/',
|
|
'domain': settings.SESSION_COOKIE_DOMAIN,
|
|
'secure': settings.SESSION_COOKIE_SECURE or None,
|
|
'expires': None,
|
|
}
|
|
self.cookies[session_cookie].update(cookie_data)
|
|
|
|
# Save the session values.
|
|
request.session.save()
|
|
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def logout(self):
|
|
"""
|
|
Removes the authenticated user's cookies.
|
|
|
|
Causes the authenticated user to be logged out.
|
|
"""
|
|
session = __import__(settings.SESSION_ENGINE, {}, {}, ['']).SessionStore()
|
|
session.delete(session_key=self.cookies[settings.SESSION_COOKIE_NAME].value)
|
|
self.cookies = SimpleCookie()
|