Refs #2333 - Added test framework. This includes doctest and unittest finders, Django-specific doctest and unittest wrappers, and a pseudo-client that can be used to stimulate and test views.
git-svn-id: http://code.djangoproject.com/svn/django/trunk@3658 bcc190cf-cafb-0310-a4f2-bffc1f526a37
This commit is contained in:
parent
1a1fb70c9f
commit
7dce86ce02
|
@ -0,0 +1,208 @@
|
|||
from cStringIO import StringIO
|
||||
from django.contrib.admin.views.decorators import LOGIN_FORM_KEY, _encode_post_data
|
||||
from django.core.handlers.base import BaseHandler
|
||||
from django.core.handlers.wsgi import WSGIRequest
|
||||
from django.dispatch import dispatcher
|
||||
from django.http import urlencode, SimpleCookie
|
||||
from django.template import signals
|
||||
from django.utils.functional import curry
|
||||
|
||||
class ClientHandler(BaseHandler):
|
||||
"""
|
||||
A HTTP Handler that can be used for testing purposes.
|
||||
Uses the WSGI interface to compose requests, but returns
|
||||
the raw HttpResponse object
|
||||
"""
|
||||
def __call__(self, environ):
|
||||
from django.conf import settings
|
||||
from django.core import signals
|
||||
|
||||
# Set up middleware if needed. We couldn't do this earlier, because
|
||||
# settings weren't available.
|
||||
if self._request_middleware is None:
|
||||
self.load_middleware()
|
||||
|
||||
dispatcher.send(signal=signals.request_started)
|
||||
try:
|
||||
request = WSGIRequest(environ)
|
||||
response = self.get_response(request.path, request)
|
||||
|
||||
# Apply response middleware
|
||||
for middleware_method in self._response_middleware:
|
||||
response = middleware_method(request, response)
|
||||
|
||||
finally:
|
||||
dispatcher.send(signal=signals.request_finished)
|
||||
|
||||
return response
|
||||
|
||||
def store_rendered_templates(store, signal, sender, template, context):
|
||||
"A utility function for storing templates and contexts that are rendered"
|
||||
store.setdefault('template',[]).append(template)
|
||||
store.setdefault('context',[]).append(context)
|
||||
|
||||
def encode_multipart(boundary, data):
|
||||
"""
|
||||
A simple method for encoding multipart POST data from a dictionary of
|
||||
form values.
|
||||
|
||||
The key will be used as the form data name; the value will be transmitted
|
||||
as content. If the value is a file, the contents of the file will be sent
|
||||
as an application/octet-stream; otherwise, str(value) will be sent.
|
||||
"""
|
||||
lines = []
|
||||
for (key, value) in data.items():
|
||||
if isinstance(value, file):
|
||||
lines.extend([
|
||||
'--' + boundary,
|
||||
'Content-Disposition: form-data; name="%s"' % key,
|
||||
'',
|
||||
'--' + boundary,
|
||||
'Content-Disposition: form-data; name="%s_file"; filename="%s"' % (key, value.name),
|
||||
'Content-Type: application/octet-stream',
|
||||
'',
|
||||
value.read()
|
||||
])
|
||||
else:
|
||||
lines.extend([
|
||||
'--' + boundary,
|
||||
'Content-Disposition: form-data; name="%s"' % key,
|
||||
'',
|
||||
str(value)
|
||||
])
|
||||
|
||||
lines.extend([
|
||||
'--' + boundary + '--',
|
||||
'',
|
||||
])
|
||||
return '\r\n'.join(lines)
|
||||
|
||||
class Client:
|
||||
"""
|
||||
A class that can act as a client for testing purposes.
|
||||
|
||||
It allows the user to compose GET and POST requests, and
|
||||
obtain the response that the server gave to those requests.
|
||||
The server Response objects are annotated with the details
|
||||
of the contexts and templates that were rendered during the
|
||||
process of serving the request.
|
||||
|
||||
Client objects are stateful - they will retain cookie (and
|
||||
thus session) details for the lifetime of the Client instance.
|
||||
|
||||
This is not intended as a replacement for Twill/Selenium or
|
||||
the like - it is here to allow testing against the
|
||||
contexts and templates produced by a view, rather than the
|
||||
HTML rendered to the end-user.
|
||||
"""
|
||||
def __init__(self, **defaults):
|
||||
self.handler = TestHandler()
|
||||
self.defaults = defaults
|
||||
self.cookie = SimpleCookie()
|
||||
|
||||
def request(self, **request):
|
||||
"""
|
||||
The master request method. Composes the environment dictionary
|
||||
and passes to the handler, returning the result of the handler.
|
||||
Assumes defaults for the query environment, which can be overridden
|
||||
using the arguments to the request.
|
||||
"""
|
||||
|
||||
environ = {
|
||||
'HTTP_COOKIE': self.cookie,
|
||||
'PATH_INFO': '/',
|
||||
'QUERY_STRING': '',
|
||||
'REQUEST_METHOD': 'GET',
|
||||
'SCRIPT_NAME': None,
|
||||
'SERVER_NAME': 'testserver',
|
||||
'SERVER_PORT': 80,
|
||||
'SERVER_PROTOCOL': 'HTTP/1.1',
|
||||
}
|
||||
environ.update(self.defaults)
|
||||
environ.update(request)
|
||||
|
||||
# Curry a data dictionary into an instance of
|
||||
# the template renderer callback function
|
||||
data = {}
|
||||
on_template_render = curry(store_rendered_templates, data)
|
||||
dispatcher.connect(on_template_render, signal=signals.template_rendered)
|
||||
|
||||
response = self.handler(environ)
|
||||
|
||||
# Add any rendered template detail to the response
|
||||
# If there was only one template rendered (the most likely case),
|
||||
# flatten the list to a single element
|
||||
for detail in ('template', 'context'):
|
||||
if data.get(detail):
|
||||
if len(data[detail]) == 1:
|
||||
setattr(response, detail, data[detail][0]);
|
||||
else:
|
||||
setattr(response, detail, data[detail])
|
||||
else:
|
||||
setattr(response, detail, None)
|
||||
|
||||
if response.cookies:
|
||||
self.cookie.update(response.cookies)
|
||||
|
||||
return response
|
||||
|
||||
def get(self, path, data={}, **extra):
|
||||
"Request a response from the server using GET."
|
||||
r = {
|
||||
'CONTENT_LENGTH': None,
|
||||
'CONTENT_TYPE': 'text/html; charset=utf-8',
|
||||
'PATH_INFO': path,
|
||||
'QUERY_STRING': urlencode(data),
|
||||
'REQUEST_METHOD': 'GET',
|
||||
}
|
||||
r.update(extra)
|
||||
|
||||
return self.request(**r)
|
||||
|
||||
def post(self, path, data={}, **extra):
|
||||
"Request a response from the server using POST."
|
||||
|
||||
BOUNDARY = 'BoUnDaRyStRiNg'
|
||||
|
||||
encoded = encode_multipart(BOUNDARY, data)
|
||||
stream = StringIO(encoded)
|
||||
r = {
|
||||
'CONTENT_LENGTH': len(encoded),
|
||||
'CONTENT_TYPE': 'multipart/form-data; boundary=%s' % BOUNDARY,
|
||||
'PATH_INFO': path,
|
||||
'REQUEST_METHOD': 'POST',
|
||||
'wsgi.input': stream,
|
||||
}
|
||||
r.update(extra)
|
||||
|
||||
return self.request(**r)
|
||||
|
||||
def login(self, path, username, password, **extra):
|
||||
"""
|
||||
A specialized sequence of GET and POST to log into a view that
|
||||
is protected by @login_required or a similar access decorator.
|
||||
|
||||
path should be the URL of the login page, or of any page that
|
||||
is login protected.
|
||||
|
||||
Returns True if login was successful; False if otherwise.
|
||||
"""
|
||||
# First, GET the login page.
|
||||
# This is required to establish the session.
|
||||
response = self.get(path)
|
||||
if response.status_code != 200:
|
||||
return False
|
||||
|
||||
# Set up the block of form data required by the login page.
|
||||
form_data = {
|
||||
'username': username,
|
||||
'password': password,
|
||||
'this_is_the_login_form': 1,
|
||||
'post_data': _encode_post_data({LOGIN_FORM_KEY: 1})
|
||||
}
|
||||
response = self.post(path, data=form_data, **extra)
|
||||
|
||||
# login page should give response 200 (if you requested the login
|
||||
# page specifically), or 302 (if you requested a login
|
||||
# protected page, to which the login can redirect).
|
||||
return response.status_code in (200,302)
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,67 @@
|
|||
import unittest, doctest
|
||||
from django.conf import settings
|
||||
from django.core import management
|
||||
from django.test.utils import create_test_db, destroy_test_db
|
||||
from django.test.testcases import OutputChecker, DocTestRunner
|
||||
|
||||
# The module name for tests outside models.py
|
||||
TEST_MODULE = 'tests'
|
||||
|
||||
doctestOutputChecker = OutputChecker()
|
||||
|
||||
def build_suite(app_module):
|
||||
"Create a complete Django test suite for the provided application module"
|
||||
suite = unittest.TestSuite()
|
||||
|
||||
# Load unit and doctests in the models.py file
|
||||
suite.addTest(unittest.defaultTestLoader.loadTestsFromModule(app_module))
|
||||
try:
|
||||
suite.addTest(doctest.DocTestSuite(app_module,
|
||||
checker=doctestOutputChecker,
|
||||
runner=DocTestRunner))
|
||||
except ValueError:
|
||||
# No doc tests in models.py
|
||||
pass
|
||||
|
||||
# Check to see if a separate 'tests' module exists parallel to the
|
||||
# models module
|
||||
try:
|
||||
app_path = app_module.__name__.split('.')[:-1]
|
||||
test_module = __import__('.'.join(app_path + [TEST_MODULE]), [], [], TEST_MODULE)
|
||||
|
||||
suite.addTest(unittest.defaultTestLoader.loadTestsFromModule(test_module))
|
||||
try:
|
||||
suite.addTest(doctest.DocTestSuite(test_module,
|
||||
checker=doctestOutputChecker,
|
||||
runner=DocTestRunner))
|
||||
except ValueError:
|
||||
# No doc tests in tests.py
|
||||
pass
|
||||
except ImportError:
|
||||
# No tests.py file for application
|
||||
pass
|
||||
|
||||
return suite
|
||||
|
||||
def run_tests(module_list, verbosity=1, extra_tests=[]):
|
||||
"""
|
||||
Run the unit tests for all the modules in the provided list.
|
||||
This testrunner will search each of the modules in the provided list,
|
||||
looking for doctests and unittests in models.py or tests.py within
|
||||
the module. A list of 'extra' tests may also be provided; these tests
|
||||
will be added to the test suite.
|
||||
"""
|
||||
|
||||
settings.DEBUG = False
|
||||
suite = unittest.TestSuite()
|
||||
|
||||
for module in module_list:
|
||||
suite.addTest(build_suite(module))
|
||||
|
||||
for test in extra_tests:
|
||||
suite.addTest(test)
|
||||
|
||||
old_name = create_test_db(verbosity)
|
||||
management.syncdb(verbosity, interactive=False)
|
||||
unittest.TextTestRunner(verbosity=verbosity).run(suite)
|
||||
destroy_test_db(old_name, verbosity)
|
|
@ -0,0 +1,30 @@
|
|||
import re, doctest, unittest
|
||||
from django.db import transaction
|
||||
|
||||
normalize_long_ints = lambda s: re.sub(r'(?<![\w])(\d+)L(?![\w])', '\\1', s)
|
||||
|
||||
class OutputChecker(doctest.OutputChecker):
|
||||
def check_output(self, want, got, optionflags):
|
||||
ok = doctest.OutputChecker.check_output(self, want, got, optionflags)
|
||||
|
||||
# Doctest does an exact string comparison of output, which means long
|
||||
# integers aren't equal to normal integers ("22L" vs. "22"). The
|
||||
# following code normalizes long integers so that they equal normal
|
||||
# integers.
|
||||
if not ok:
|
||||
return normalize_long_ints(want) == normalize_long_ints(got)
|
||||
return ok
|
||||
|
||||
class DocTestRunner(doctest.DocTestRunner):
|
||||
def __init__(self, *args, **kwargs):
|
||||
doctest.DocTestRunner.__init__(self, *args, **kwargs)
|
||||
self.optionflags = doctest.ELLIPSIS
|
||||
|
||||
def report_unexpected_exception(self, out, test, example, exc_info):
|
||||
doctest.DocTestRunner.report_unexpected_exception(self,out,test,example,exc_info)
|
||||
|
||||
# Rollback, in case of database errors. Otherwise they'd have
|
||||
# side effects on other tests.
|
||||
from django.db import transaction
|
||||
transaction.rollback_unless_managed()
|
||||
|
|
@ -0,0 +1,78 @@
|
|||
import sys, time
|
||||
from django.conf import settings
|
||||
from django.db import connection, transaction
|
||||
|
||||
# The prefix to put on the default database name when creating
|
||||
# the test database.
|
||||
TEST_DATABASE_PREFIX = 'test_'
|
||||
|
||||
def _set_autocommit(connection):
|
||||
"Make sure a connection is in autocommit mode."
|
||||
if hasattr(connection.connection, "autocommit"):
|
||||
connection.connection.autocommit(True)
|
||||
elif hasattr(connection.connection, "set_isolation_level"):
|
||||
connection.connection.set_isolation_level(0)
|
||||
|
||||
def create_test_db(verbosity=1, autoclobber=False):
|
||||
if verbosity >= 1:
|
||||
print "Creating test database..."
|
||||
# If we're using SQLite, it's more convenient to test against an
|
||||
# in-memory database.
|
||||
if settings.DATABASE_ENGINE == "sqlite3":
|
||||
TEST_DATABASE_NAME = ":memory:"
|
||||
else:
|
||||
TEST_DATABASE_NAME = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
|
||||
|
||||
# Create the test database and connect to it. We need to autocommit
|
||||
# if the database supports it because PostgreSQL doesn't allow
|
||||
# CREATE/DROP DATABASE statements within transactions.
|
||||
cursor = connection.cursor()
|
||||
_set_autocommit(connection)
|
||||
try:
|
||||
cursor.execute("CREATE DATABASE %s" % TEST_DATABASE_NAME)
|
||||
except Exception, e:
|
||||
sys.stderr.write("Got an error creating the test database: %s\n" % e)
|
||||
if not autoclobber:
|
||||
confirm = raw_input("It appears the test database, %s, already exists. Type 'yes' to delete it, or 'no' to cancel: " % TEST_DATABASE_NAME)
|
||||
if autoclobber or confirm == 'yes':
|
||||
try:
|
||||
if verbosity >= 1:
|
||||
print "Destroying old test database..."
|
||||
cursor.execute("DROP DATABASE %s" % TEST_DATABASE_NAME)
|
||||
if verbosity >= 1:
|
||||
print "Creating test database..."
|
||||
cursor.execute("CREATE DATABASE %s" % TEST_DATABASE_NAME)
|
||||
except Exception, e:
|
||||
sys.stderr.write("Got an error recreating the test database: %s\n" % e)
|
||||
sys.exit(2)
|
||||
else:
|
||||
print "Tests cancelled."
|
||||
sys.exit(1)
|
||||
|
||||
connection.close()
|
||||
old_database_name = settings.DATABASE_NAME
|
||||
settings.DATABASE_NAME = TEST_DATABASE_NAME
|
||||
|
||||
# Get a cursor (even though we don't need one yet). This has
|
||||
# the side effect of initializing the test database.
|
||||
cursor = connection.cursor()
|
||||
|
||||
return old_database_name
|
||||
|
||||
def destroy_test_db(old_database_name, verbosity=1):
|
||||
# Unless we're using SQLite, remove the test database to clean up after
|
||||
# ourselves. Connect to the previous database (not the test database)
|
||||
# to do so, because it's not allowed to delete a database while being
|
||||
# connected to it.
|
||||
if verbosity >= 1:
|
||||
print "Destroying test database..."
|
||||
if settings.DATABASE_ENGINE != "sqlite3":
|
||||
connection.close()
|
||||
TEST_DATABASE_NAME = settings.DATABASE_NAME
|
||||
settings.DATABASE_NAME = old_database_name
|
||||
cursor = connection.cursor()
|
||||
_set_autocommit(connection)
|
||||
time.sleep(1) # To avoid "database is being accessed by other users" errors.
|
||||
cursor.execute("DROP DATABASE %s" % TEST_DATABASE_NAME)
|
||||
connection.close()
|
||||
|
Loading…
Reference in New Issue