Major refactoring of django.core.management -- it's now a package rather than a 1730-line single module. All django-admin/manage.py commands are now stored in separate modules. This is backwards-incompatible for people who used django.core.management functions directly

git-svn-id: http://code.djangoproject.com/svn/django/trunk@5898 bcc190cf-cafb-0310-a4f2-bffc1f526a37
This commit is contained in:
Adrian Holovaty 2007-08-16 06:06:55 +00:00
parent 7f06e44f99
commit 01adbb55e6
39 changed files with 1972 additions and 1771 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,180 @@
import django
from optparse import OptionParser
import os
import sys
import textwrap
# For backwards compatibility: get_version() used to be in this module.
get_version = django.get_version
def load_command_class(name):
"""
Given a command name, returns the Command class instance. Raises
ImportError if it doesn't exist.
"""
# Let the ImportError propogate.
return getattr(__import__('django.core.management.commands.%s' % name, {}, {}, ['Command']), 'Command')()
def call_command(name, *args, **options):
"""
Calls the given command, with the given options and args/kwargs.
This is the primary API you should use for calling specific commands.
Some examples:
call_command('syncdb')
call_command('shell', plain=True)
call_command('sqlall', 'myapp')
"""
klass = load_command_class(name)
return klass.execute(*args, **options)
class ManagementUtility(object):
"""
Encapsulates the logic of the django-admin.py and manage.py utilities.
A ManagementUtility has a number of commands, which can be manipulated
by editing the self.commands dictionary.
"""
def __init__(self):
self.commands = self.default_commands()
def default_commands(self):
"""
Returns a dictionary of instances of all available Command classes.
This works by looking for and loading all Python modules in the
django.core.management.commands package.
The dictionary is in the format {name: command_instance}.
"""
command_dir = os.path.join(__path__[0], 'commands')
names = [f[:-3] for f in os.listdir(command_dir) if not f.startswith('_') and f.endswith('.py')]
return dict([(name, load_command_class(name)) for name in names])
def usage(self):
"""
Returns a usage string, for use with optparse.
The string doesn't include the options (e.g., "--verbose"), because
optparse puts those in automatically.
"""
usage = ["%prog command [options]\nactions:"]
commands = self.commands.items()
commands.sort()
for name, cmd in commands:
usage.append(' %s %s' % (name, cmd.args))
usage.extend(textwrap.wrap(cmd.help, initial_indent=' ', subsequent_indent=' '))
usage.append('')
return '\n'.join(usage[:-1]) # Cut off the last list element, an empty space.
def execute(self, argv=None):
"""
Parses the given argv from the command line, determines which command
to run and runs the command.
"""
if argv is None:
argv = sys.argv
# Create the parser object and parse the command-line args.
# TODO: Ideally each Command class would register its own options for
# add_option(), but we'd need to figure out how to allow for multiple
# Commands using the same options. The optparse library gets in the way
# by checking for conflicts:
# http://docs.python.org/lib/optparse-conflicts-between-options.html
parser = OptionParser(usage=self.usage(), version=get_version())
parser.add_option('--settings',
help='The Python path to a settings module, e.g. "myproject.settings.main". If this isn\'t provided, the DJANGO_SETTINGS_MODULE environment variable will be used.')
parser.add_option('--pythonpath',
help='A directory to add to the Python path, e.g. "/home/djangoprojects/myproject".')
parser.add_option('--plain', action='store_true', dest='plain',
help='When using "shell": Tells Django to use plain Python, not IPython.')
parser.add_option('--noinput', action='store_false', dest='interactive', default=True,
help='Tells Django to NOT prompt the user for input of any kind.')
parser.add_option('--noreload', action='store_false', dest='use_reloader', default=True,
help='When using "runserver": Tells Django to NOT use the auto-reloader.')
parser.add_option('--format', default='json', dest='format',
help='Specifies the output serialization format for fixtures')
parser.add_option('--indent', default=None, dest='indent',
type='int', help='Specifies the indent level to use when pretty-printing output')
parser.add_option('--verbosity', action='store', dest='verbosity', default='1',
type='choice', choices=['0', '1', '2'],
help='Verbosity level; 0=minimal output, 1=normal output, 2=all output')
parser.add_option('--adminmedia', dest='admin_media_path', default='',
help='When using "runserver": Specifies the directory from which to serve admin media.')
options, args = parser.parse_args(argv[1:])
# If the 'settings' or 'pythonpath' options were submitted, activate those.
if options.settings:
os.environ['DJANGO_SETTINGS_MODULE'] = options.settings
if options.pythonpath:
sys.path.insert(0, options.pythonpath)
# Run the appropriate command.
try:
command_name = args[0]
except IndexError:
sys.stderr.write("Type '%s --help' for usage.\n" % os.path.basename(argv[0]))
sys.exit(1)
try:
command = self.commands[command_name]
except KeyError:
sys.stderr.write("Unknown command: %r\nType '%s --help' for usage.\n" % (command_name, os.path.basename(argv[0])))
sys.exit(1)
command.execute(*args[1:], **options.__dict__)
class ProjectManagementUtility(ManagementUtility):
"""
A ManagementUtility that is specific to a particular Django project.
As such, its commands are slightly different than those of its parent
class.
In practice, this class represents manage.py, whereas ManagementUtility
represents django-admin.py.
"""
def __init__(self, project_directory):
super(ProjectManagementUtility, self).__init__()
# Remove the "startproject" command from self.commands, because
# that's a django-admin.py command, not a manage.py command.
del self.commands['startproject']
# Override the startapp command so that it always uses the
# project_directory, not the current working directory (which is default).
from django.core.management.commands.startapp import ProjectCommand
self.commands['startapp'] = ProjectCommand(project_directory)
def setup_environ(settings_mod):
"""
Configure the runtime environment. This can also be used by external
scripts wanting to set up a similar environment to manage.py.
"""
# Add this project to sys.path so that it's importable in the conventional
# way. For example, if this file (manage.py) lives in a directory
# "myproject", this code would add "/path/to/myproject" to sys.path.
project_directory, settings_filename = os.path.split(settings_mod.__file__)
project_name = os.path.basename(project_directory)
settings_name = os.path.splitext(settings_filename)[0]
sys.path.append(os.path.join(project_directory, '..'))
project_module = __import__(project_name, {}, {}, [''])
sys.path.pop()
# Set DJANGO_SETTINGS_MODULE appropriately.
os.environ['DJANGO_SETTINGS_MODULE'] = '%s.%s' % (project_name, settings_name)
return project_directory
def execute_from_command_line(argv=None):
"""
A simple method that runs a ManagementUtility.
"""
utility = ManagementUtility()
utility.execute(argv)
def execute_manager(settings_mod, argv=None):
"""
Like execute_from_command_line(), but for use by manage.py, a
project-specific django-admin.py utility.
"""
project_directory = setup_environ(settings_mod)
utility = ProjectManagementUtility(project_directory)
utility.execute(argv)

View File

@ -0,0 +1,131 @@
from django.core.exceptions import ImproperlyConfigured
from django.core.management.color import color_style
import sys
class CommandError(Exception):
pass
class BaseCommand(object):
# Metadata about this command.
help = ''
args = ''
# Configuration shortcuts that alter various logic.
can_import_settings = True
requires_model_validation = True
output_transaction = False # Whether to wrap the output in a "BEGIN; COMMIT;"
def __init__(self):
self.style = color_style()
def execute(self, *args, **options):
# Switch to English, because django-admin.py creates database content
# like permissions, and those shouldn't contain any translations.
# But only do this if we can assume we have a working settings file,
# because django.utils.translation requires settings.
if self.can_import_settings:
from django.utils import translation
translation.activate('en-us')
try:
if self.requires_model_validation:
self.validate()
output = self.handle(*args, **options)
if output:
if self.output_transaction:
# This needs to be imported here, because it relies on settings.
from django.db import backend
if backend.get_start_transaction_sql():
print self.style.SQL_KEYWORD(backend.get_start_transaction_sql())
print output
if self.output_transaction:
print self.style.SQL_KEYWORD("COMMIT;")
except CommandError, e:
sys.stderr.write(self.style.ERROR(str('Error: %s\n' % e)))
sys.exit(1)
def validate(self, app=None):
"""
Validates the given app, raising CommandError for any errors.
If app is None, then this will validate all installed apps.
"""
from django.core.management.validation import get_validation_errors
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
s = StringIO()
num_errors = get_validation_errors(s, app)
if num_errors:
s.seek(0)
error_text = s.read()
raise CommandError("One or more models did not validate:\n%s" % error_text)
def handle(self, *args, **options):
raise NotImplementedError()
class AppCommand(BaseCommand):
args = '[appname ...]'
def handle(self, *app_labels, **options):
from django.db import models
if not app_labels:
raise CommandError('Enter at least one appname.')
try:
app_list = [models.get_app(app_label) for app_label in app_labels]
except (ImproperlyConfigured, ImportError), e:
raise CommandError("%s. Are you sure your INSTALLED_APPS setting is correct?" % e)
output = []
for app in app_list:
app_output = self.handle_app(app, **options)
if app_output:
output.append(app_output)
return '\n'.join(output)
def handle_app(self, app, **options):
raise NotImplementedError()
class CopyFilesCommand(BaseCommand):
requires_model_validation = False
def copy_helper(self, app_or_project, name, directory, other_name=''):
import django
import os
import re
import shutil
other = {'project': 'app', 'app': 'project'}[app_or_project]
if not re.search(r'^\w+$', name): # If it's not a valid directory name.
raise CommandError("%r is not a valid %s name. Please use only numbers, letters and underscores." % (name, app_or_project))
top_dir = os.path.join(directory, name)
try:
os.mkdir(top_dir)
except OSError, e:
raise CommandError(e)
# Determine where the app or project templates are. Use
# django.__path__[0] because we don't know into which directory
# django has been installed.
template_dir = os.path.join(django.__path__[0], 'conf', '%s_template' % app_or_project)
for d, subdirs, files in os.walk(template_dir):
relative_dir = d[len(template_dir)+1:].replace('%s_name' % app_or_project, name)
if relative_dir:
os.mkdir(os.path.join(top_dir, relative_dir))
for i, subdir in enumerate(subdirs):
if subdir.startswith('.'):
del subdirs[i]
for f in files:
if f.endswith('.pyc'):
continue
path_old = os.path.join(d, f)
path_new = os.path.join(top_dir, relative_dir, f.replace('%s_name' % app_or_project, name))
fp_old = open(path_old, 'r')
fp_new = open(path_new, 'w')
fp_new.write(fp_old.read().replace('{{ %s_name }}' % app_or_project, name).replace('{{ %s_name }}' % other, other_name))
fp_old.close()
fp_new.close()
try:
shutil.copymode(path_old, path_new)
except OSError:
sys.stderr.write(self.style.NOTICE("Notice: Couldn't set permission bits on %s. You're probably using an uncommon filesystem setup. No problem.\n" % path_new))

View File

@ -0,0 +1,28 @@
"""
Sets up the terminal color scheme.
"""
from django.utils import termcolors
import sys
def color_style():
"Returns a Style object with the Django color scheme."
if sys.platform == 'win32' or sys.platform == 'Pocket PC' or not sys.stdout.isatty():
return no_style()
class dummy: pass
style = dummy()
style.ERROR = termcolors.make_style(fg='red', opts=('bold',))
style.ERROR_OUTPUT = termcolors.make_style(fg='red', opts=('bold',))
style.NOTICE = termcolors.make_style(fg='red')
style.SQL_FIELD = termcolors.make_style(fg='green', opts=('bold',))
style.SQL_COLTYPE = termcolors.make_style(fg='green')
style.SQL_KEYWORD = termcolors.make_style(fg='yellow')
style.SQL_TABLE = termcolors.make_style(opts=('bold',))
return style
def no_style():
"Returns a Style object that has no colors."
class dummy:
def __getattr__(self, attr):
return lambda x: x
return dummy()

View File

@ -0,0 +1,33 @@
from django.core.management.base import AppCommand
from django.utils.text import capfirst
MODULE_TEMPLATE = ''' {%% if perms.%(app)s.%(addperm)s or perms.%(app)s.%(changeperm)s %%}
<tr>
<th>{%% if perms.%(app)s.%(changeperm)s %%}<a href="%(app)s/%(mod)s/">{%% endif %%}%(name)s{%% if perms.%(app)s.%(changeperm)s %%}</a>{%% endif %%}</th>
<td class="x50">{%% if perms.%(app)s.%(addperm)s %%}<a href="%(app)s/%(mod)s/add/" class="addlink">{%% endif %%}Add{%% if perms.%(app)s.%(addperm)s %%}</a>{%% endif %%}</td>
<td class="x75">{%% if perms.%(app)s.%(changeperm)s %%}<a href="%(app)s/%(mod)s/" class="changelink">{%% endif %%}Change{%% if perms.%(app)s.%(changeperm)s %%}</a>{%% endif %%}</td>
</tr>
{%% endif %%}'''
class Command(AppCommand):
help = 'Prints the admin-index template snippet for the given app name(s).'
def handle_app(self, app, **options):
from django.db.models import get_models
output = []
app_models = get_models(app)
app_label = app_models[0]._meta.app_label
output.append('{%% if perms.%s %%}' % app_label)
output.append('<div class="module"><h2>%s</h2><table>' % app_label.title())
for model in app_models:
if model._meta.admin:
output.append(MODULE_TEMPLATE % {
'app': app_label,
'mod': model._meta.module_name,
'name': capfirst(model._meta.verbose_name_plural),
'addperm': model._meta.get_add_permission(),
'changeperm': model._meta.get_change_permission(),
})
output.append('</table></div>')
output.append('{% endif %}')
return '\n'.join(output)

View File

@ -0,0 +1,40 @@
from django.core.management.base import BaseCommand
class Command(BaseCommand):
help = "Creates the table needed to use the SQL cache backend."
args = "[tablename]"
requires_model_validation = False
def handle(self, tablename, **options):
from django.db import backend, connection, transaction, models
fields = (
# "key" is a reserved word in MySQL, so use "cache_key" instead.
models.CharField(name='cache_key', max_length=255, unique=True, primary_key=True),
models.TextField(name='value'),
models.DateTimeField(name='expires', db_index=True),
)
table_output = []
index_output = []
for f in fields:
field_output = [backend.quote_name(f.name), f.db_type()]
field_output.append("%sNULL" % (not f.null and "NOT " or ""))
if f.unique:
field_output.append("UNIQUE")
if f.primary_key:
field_output.append("PRIMARY KEY")
if f.db_index:
unique = f.unique and "UNIQUE " or ""
index_output.append("CREATE %sINDEX %s_%s ON %s (%s);" % \
(unique, tablename, f.name, backend.quote_name(tablename),
backend.quote_name(f.name)))
table_output.append(" ".join(field_output))
full_statement = ["CREATE TABLE %s (" % backend.quote_name(tablename)]
for i, line in enumerate(table_output):
full_statement.append(' %s%s' % (line, i < len(table_output)-1 and ',' or ''))
full_statement.append(');')
curs = connection.cursor()
curs.execute("\n".join(full_statement))
for statement in index_output:
curs.execute(statement)
transaction.commit_unless_managed()

View File

@ -0,0 +1,10 @@
from django.core.management.base import BaseCommand
class Command(BaseCommand):
help = "Runs the command-line client for the current DATABASE_ENGINE."
requires_model_validation = False
def handle(self, **options):
from django.db import runshell
runshell()

View File

@ -0,0 +1,32 @@
from django.core.management.base import BaseCommand
def module_to_dict(module, omittable=lambda k: k.startswith('_')):
"Converts a module namespace to a Python dictionary. Used by get_settings_diff."
return dict([(k, repr(v)) for k, v in module.__dict__.items() if not omittable(k)])
class Command(BaseCommand):
help = """Displays differences between the current settings.py and Django's
default settings. Settings that don't appear in the defaults are
followed by "###"."""
requires_model_validation = False
def handle(self, **options):
# Inspired by Postfix's "postconf -n".
from django.conf import settings, global_settings
# Because settings are imported lazily, we need to explicitly load them.
settings._import_settings()
user_settings = module_to_dict(settings._target)
default_settings = module_to_dict(global_settings)
output = []
keys = user_settings.keys()
keys.sort()
for key in keys:
if key not in default_settings:
output.append("%s = %s ###" % (key, user_settings[key]))
elif user_settings[key] != default_settings[key]:
output.append("%s = %s" % (key, user_settings[key]))
print '\n'.join(output)

View File

@ -0,0 +1,33 @@
from django.core.management.base import BaseCommand, CommandError
class Command(BaseCommand):
help = 'Output the contents of the database as a fixture of the given format.'
args = '[--format] [--indent] [appname ...]'
def handle(self, *app_labels, **options):
from django.db.models import get_app, get_apps, get_models
from django.core import serializers
format = options.get('format', 'json')
indent = options.get('indent', None)
if len(app_labels) == 0:
app_list = get_apps()
else:
app_list = [get_app(app_label) for app_label in app_labels]
# Check that the serialization format exists; this is a shortcut to
# avoid collating all the objects and _then_ failing.
try:
serializers.get_serializer(format)
except KeyError:
raise CommandError("Unknown serialization format: %s" % format)
objects = []
for app in app_list:
for model in get_models(app):
objects.extend(model.objects.all())
try:
return serializers.serialize(format, objects, indent=indent)
except Exception, e:
raise CommandError("Unable to serialize database: %s" % e)

View File

@ -0,0 +1,64 @@
from django.core.management.base import BaseCommand, CommandError
from django.core.management.color import no_style
class Command(BaseCommand):
help = "Executes ``sqlflush`` on the current database."
args = '[--verbosity] [--noinput]'
def handle(self, **options):
from django.conf import settings
from django.db import connection, transaction, models
from django.dispatch import dispatcher
from django.core.management.sql import sql_flush, emit_post_sync_signal
verbosity = int(options.get('verbosity', 1))
interactive = options.get('interactive')
self.style = no_style()
# Import the 'management' module within each installed app, to register
# dispatcher events.
for app_name in settings.INSTALLED_APPS:
try:
__import__(app_name + '.management', {}, {}, [''])
except ImportError:
pass
sql_list = sql_flush(self.style)
if interactive:
confirm = raw_input("""You have requested a flush of the database.
This will IRREVERSIBLY DESTROY all data currently in the %r database,
and return each table to the state it was in after syncdb.
Are you sure you want to do this?
Type 'yes' to continue, or 'no' to cancel: """ % settings.DATABASE_NAME)
else:
confirm = 'yes'
if confirm == 'yes':
try:
cursor = connection.cursor()
for sql in sql_list:
cursor.execute(sql)
except Exception, e:
transaction.rollback_unless_managed()
raise CommandError("""Database %s couldn't be flushed. Possible reasons:
* The database isn't running or isn't configured correctly.
* At least one of the expected database tables doesn't exist.
* The SQL was invalid.
Hint: Look at the output of 'django-admin.py sqlflush'. That's the SQL this command wasn't able to run.
The full error: %s""" % (settings.DATABASE_NAME, e))
transaction.commit_unless_managed()
# Emit the post sync signal. This allows individual
# applications to respond as if the database had been
# sync'd from scratch.
emit_post_sync_signal(models.get_models(), verbosity, interactive)
# Reinstall the initial_data fixture.
from django.core.management import call_command
call_command('loaddata', 'initial_data', **options)
else:
print "Flush cancelled."

View File

@ -0,0 +1,120 @@
from django.core.management.base import BaseCommand, CommandError
class Command(BaseCommand):
help = "Introspects the database tables in the given database and outputs a Django model module."
requires_model_validation = False
def handle(self, **options):
try:
for line in self.handle_inspection():
print line
except NotImplementedError:
raise CommandError("Database inspection isn't supported for the currently selected database backend.")
def handle_inspection(self):
from django.db import connection, get_introspection_module
import keyword
introspection_module = get_introspection_module()
table2model = lambda table_name: table_name.title().replace('_', '')
cursor = connection.cursor()
yield "# This is an auto-generated Django model module."
yield "# You'll have to do the following manually to clean this up:"
yield "# * Rearrange models' order"
yield "# * Make sure each model has one field with primary_key=True"
yield "# Feel free to rename the models, but don't rename db_table values or field names."
yield "#"
yield "# Also note: You'll have to insert the output of 'django-admin.py sqlcustom [appname]'"
yield "# into your database."
yield ''
yield 'from django.db import models'
yield ''
for table_name in introspection_module.get_table_list(cursor):
yield 'class %s(models.Model):' % table2model(table_name)
try:
relations = introspection_module.get_relations(cursor, table_name)
except NotImplementedError:
relations = {}
try:
indexes = introspection_module.get_indexes(cursor, table_name)
except NotImplementedError:
indexes = {}
for i, row in enumerate(introspection_module.get_table_description(cursor, table_name)):
att_name = row[0].lower()
comment_notes = [] # Holds Field notes, to be displayed in a Python comment.
extra_params = {} # Holds Field parameters such as 'db_column'.
if ' ' in att_name:
extra_params['db_column'] = att_name
att_name = att_name.replace(' ', '')
comment_notes.append('Field renamed to remove spaces.')
if keyword.iskeyword(att_name):
extra_params['db_column'] = att_name
att_name += '_field'
comment_notes.append('Field renamed because it was a Python reserved word.')
if i in relations:
rel_to = relations[i][1] == table_name and "'self'" or table2model(relations[i][1])
field_type = 'ForeignKey(%s' % rel_to
if att_name.endswith('_id'):
att_name = att_name[:-3]
else:
extra_params['db_column'] = att_name
else:
try:
field_type = introspection_module.DATA_TYPES_REVERSE[row[1]]
except KeyError:
field_type = 'TextField'
comment_notes.append('This field type is a guess.')
# This is a hook for DATA_TYPES_REVERSE to return a tuple of
# (field_type, extra_params_dict).
if type(field_type) is tuple:
field_type, new_params = field_type
extra_params.update(new_params)
# Add max_length for all CharFields.
if field_type == 'CharField' and row[3]:
extra_params['max_length'] = row[3]
if field_type == 'DecimalField':
extra_params['max_digits'] = row[4]
extra_params['decimal_places'] = row[5]
# Add primary_key and unique, if necessary.
column_name = extra_params.get('db_column', att_name)
if column_name in indexes:
if indexes[column_name]['primary_key']:
extra_params['primary_key'] = True
elif indexes[column_name]['unique']:
extra_params['unique'] = True
field_type += '('
# Don't output 'id = meta.AutoField(primary_key=True)', because
# that's assumed if it doesn't exist.
if att_name == 'id' and field_type == 'AutoField(' and extra_params == {'primary_key': True}:
continue
# Add 'null' and 'blank', if the 'null_ok' flag was present in the
# table description.
if row[6]: # If it's NULL...
extra_params['blank'] = True
if not field_type in ('TextField(', 'CharField('):
extra_params['null'] = True
field_desc = '%s = models.%s' % (att_name, field_type)
if extra_params:
if not field_desc.endswith('('):
field_desc += ', '
field_desc += ', '.join(['%s=%r' % (k, v) for k, v in extra_params.items()])
field_desc += ')'
if comment_notes:
field_desc += ' # ' + ' '.join(comment_notes)
yield ' %s' % field_desc
yield ' class Meta:'
yield ' db_table = %r' % table_name
yield ''

View File

@ -0,0 +1,123 @@
from django.core.management.base import BaseCommand
from django.core.management.color import no_style
import sys
import os
try:
set
except NameError:
from sets import Set as set # Python 2.3 fallback
class Command(BaseCommand):
help = 'Installs the named fixture(s) in the database.'
args = "[--verbosity] fixture, fixture, ..."
def handle(self, *fixture_labels, **options):
from django.db.models import get_apps
from django.core import serializers
from django.db import connection, transaction, backend
from django.conf import settings
self.style = no_style()
verbosity = options.get('verbosity', 1)
# Keep a count of the installed objects and fixtures
count = [0, 0]
models = set()
humanize = lambda dirname: dirname and "'%s'" % dirname or 'absolute path'
# Get a cursor (even though we don't need one yet). This has
# the side effect of initializing the test database (if
# it isn't already initialized).
cursor = connection.cursor()
# Start transaction management. All fixtures are installed in a
# single transaction to ensure that all references are resolved.
transaction.commit_unless_managed()
transaction.enter_transaction_management()
transaction.managed(True)
app_fixtures = [os.path.join(os.path.dirname(app.__file__), 'fixtures') for app in get_apps()]
for fixture_label in fixture_labels:
parts = fixture_label.split('.')
if len(parts) == 1:
fixture_name = fixture_label
formats = serializers.get_serializer_formats()
else:
fixture_name, format = '.'.join(parts[:-1]), parts[-1]
if format in serializers.get_serializer_formats():
formats = [format]
else:
formats = []
if verbosity > 0:
if formats:
print "Loading '%s' fixtures..." % fixture_name
else:
print "Skipping fixture '%s': %s is not a known serialization format" % (fixture_name, format)
for fixture_dir in app_fixtures + list(settings.FIXTURE_DIRS) + ['']:
if verbosity > 1:
print "Checking %s for fixtures..." % humanize(fixture_dir)
label_found = False
for format in formats:
serializer = serializers.get_serializer(format)
if verbosity > 1:
print "Trying %s for %s fixture '%s'..." % \
(humanize(fixture_dir), format, fixture_name)
try:
full_path = os.path.join(fixture_dir, '.'.join([fixture_name, format]))
fixture = open(full_path, 'r')
if label_found:
fixture.close()
print self.style.ERROR("Multiple fixtures named '%s' in %s. Aborting." %
(fixture_name, humanize(fixture_dir)))
transaction.rollback()
transaction.leave_transaction_management()
return
else:
count[1] += 1
if verbosity > 0:
print "Installing %s fixture '%s' from %s." % \
(format, fixture_name, humanize(fixture_dir))
try:
objects = serializers.deserialize(format, fixture)
for obj in objects:
count[0] += 1
models.add(obj.object.__class__)
obj.save()
label_found = True
except Exception, e:
fixture.close()
sys.stderr.write(
self.style.ERROR("Problem installing fixture '%s': %s\n" %
(full_path, str(e))))
transaction.rollback()
transaction.leave_transaction_management()
return
fixture.close()
except:
if verbosity > 1:
print "No %s fixture '%s' in %s." % \
(format, fixture_name, humanize(fixture_dir))
if count[0] > 0:
sequence_sql = backend.get_sql_sequence_reset(self.style, models)
if sequence_sql:
if verbosity > 1:
print "Resetting sequences"
for line in sequence_sql:
cursor.execute(line)
transaction.commit()
transaction.leave_transaction_management()
if count[0] == 0:
if verbosity > 0:
print "No fixtures found."
else:
if verbosity > 0:
print "Installed %d object(s) from %d fixture(s)" % tuple(count)

View File

@ -0,0 +1,47 @@
from django.core.management.base import AppCommand, CommandError
from django.core.management.color import no_style
class Command(AppCommand):
help = "Executes ``sqlreset`` for the given app(s) in the current database."
args = '[--noinput] [appname ...]'
output_transaction = True
def handle_app(self, app, **options):
from django.db import connection, transaction
from django.conf import settings
from django.core.management.sql import sql_reset
app_name = app.__name__.split('.')[-2]
self.style = no_style()
sql_list = sql_reset(app, self.style)
if options.get('interactive'):
confirm = raw_input("""
You have requested a database reset.
This will IRREVERSIBLY DESTROY any data for
the "%s" application in the database "%s".
Are you sure you want to do this?
Type 'yes' to continue, or 'no' to cancel: """ % (app_name, settings.DATABASE_NAME))
else:
confirm = 'yes'
if confirm == 'yes':
try:
cursor = connection.cursor()
for sql in sql_list:
cursor.execute(sql)
except Exception, e:
transaction.rollback_unless_managed()
raise CommandError("""Error: %s couldn't be reset. Possible reasons:
* The database isn't running or isn't configured correctly.
* At least one of the database tables doesn't exist.
* The SQL was invalid.
Hint: Look at the output of 'django-admin.py sqlreset %s'. That's the SQL this command wasn't able to run.
The full error: %s""" % (app_name, app_name, e))
transaction.commit_unless_managed()
else:
print "Reset cancelled."

View File

@ -0,0 +1,16 @@
from django.core.management.base import BaseCommand
class Command(BaseCommand):
help = "Runs this project as a FastCGI application. Requires flup."
args = '[various KEY=val options, use `runfcgi help` for help]'
def handle(self, *args, **options):
from django.conf import settings
from django.utils import translation
# Activate the current language, because it won't get activated later.
try:
translation.activate(settings.LANGUAGE_CODE)
except AttributeError:
pass
from django.core.servers.fastcgi import runfastcgi
runfastcgi(args)

View File

@ -0,0 +1,65 @@
from django.core.management.base import BaseCommand, CommandError
import os
import sys
class Command(BaseCommand):
help = "Starts a lightweight Web server for development."
args = '[--noreload] [--adminmedia=ADMIN_MEDIA_PATH] [optional port number, or ipaddr:port]'
# Validation is called explicitly each time the server is reloaded.
requires_model_validation = False
def handle(self, addrport='', **options):
import django
from django.core.servers.basehttp import run, AdminMediaHandler, WSGIServerException
from django.core.handlers.wsgi import WSGIHandler
if not addrport:
addr = ''
port = '8000'
else:
try:
addr, port = addrport.split(':')
except ValueError:
addr, port = '', addrport
if not addr:
addr = '127.0.0.1'
if not port.isdigit():
raise CommandError("%r is not a valid port number." % port)
use_reloader = options.get('use_reloader', True)
admin_media_dir = options.get('admin_media_dir', '')
quit_command = (sys.platform == 'win32') and 'CTRL-BREAK' or 'CONTROL-C'
def inner_run():
from django.conf import settings
print "Validating models..."
self.validate()
print "\nDjango version %s, using settings %r" % (django.get_version(), settings.SETTINGS_MODULE)
print "Development server is running at http://%s:%s/" % (addr, port)
print "Quit the server with %s." % quit_command
try:
path = admin_media_dir or django.__path__[0] + '/contrib/admin/media'
handler = AdminMediaHandler(WSGIHandler(), path)
run(addr, int(port), handler)
except WSGIServerException, e:
# Use helpful error messages instead of ugly tracebacks.
ERRORS = {
13: "You don't have permission to access that port.",
98: "That port is already in use.",
99: "That IP address can't be assigned-to.",
}
try:
error_text = ERRORS[e.args[0].args[0]]
except (AttributeError, KeyError):
error_text = str(e)
sys.stderr.write(self.style.ERROR("Error: %s" % error_text) + '\n')
# Need to use an OS exit because sys.exit doesn't work in a thread
os._exit(1)
except KeyboardInterrupt:
sys.exit(0)
if use_reloader:
from django.utils import autoreload
autoreload.main(inner_run)
else:
inner_run()

View File

@ -0,0 +1,42 @@
from django.core.management.base import BaseCommand
class Command(BaseCommand):
help = "Runs a Python interactive interpreter. Tries to use IPython, if it's available."
args = '[--plain]'
requires_model_validation = False
def handle(self, **options):
# XXX: (Temporary) workaround for ticket #1796: force early loading of all
# models from installed apps.
from django.db.models.loading import get_models
loaded_models = get_models()
use_plain = options.get('plain', False)
try:
if use_plain:
# Don't bother loading IPython, because the user wants plain Python.
raise ImportError
import IPython
# Explicitly pass an empty list as arguments, because otherwise IPython
# would use sys.argv from this script.
shell = IPython.Shell.IPShell(argv=[])
shell.mainloop()
except ImportError:
import code
# Set up a dictionary to serve as the environment for the shell, so
# that tab completion works on objects that are imported at runtime.
# See ticket 5082.
imported_objects = {}
try: # Try activating rlcompleter, because it's handy.
import readline
except ImportError:
pass
else:
# We don't have to wrap the following import in a 'try', because
# we already know 'readline' was imported successfully.
import rlcompleter
readline.set_completer(rlcompleter.Completer(imported_objects).complete)
readline.parse_and_bind("tab:complete")
code.interact(local=imported_objects)

View File

@ -0,0 +1,10 @@
from django.core.management.base import AppCommand
class Command(AppCommand):
help = "Prints the CREATE TABLE SQL statements for the given app name(s)."
output_transaction = True
def handle_app(self, app, **options):
from django.core.management.sql import sql_create
return '\n'.join(sql_create(app, self.style))

View File

@ -0,0 +1,10 @@
from django.core.management.base import AppCommand
class Command(AppCommand):
help = "Prints the CREATE TABLE, initial-data and CREATE INDEX SQL statements for the given model module name(s)."
output_transaction = True
def handle_app(self, app, **options):
from django.core.management.sql import sql_all
return '\n'.join(sql_all(app, self.style))

View File

@ -0,0 +1,10 @@
from django.core.management.base import AppCommand
class Command(AppCommand):
help = "Prints the DROP TABLE SQL statements for the given app name(s)."
output_transaction = True
def handle_app(self, app, **options):
from django.core.management.sql import sql_delete
return '\n'.join(sql_delete(app, self.style))

View File

@ -0,0 +1,10 @@
from django.core.management.base import AppCommand
class Command(AppCommand):
help = "Prints the custom table modifying SQL statements for the given app name(s)."
output_transaction = True
def handle_app(self, app, **options):
from django.core.management.sql import sql_custom
return '\n'.join(sql_custom(app))

View File

@ -0,0 +1,10 @@
from django.core.management.base import BaseCommand
class Command(BaseCommand):
help = "Returns a list of the SQL statements required to return all tables in the database to the state they were in just after they were installed."
output_transaction = True
def handle(self, **options):
from django.core.management.sql import sql_flush
return '\n'.join(sql_flush(self.style))

View File

@ -0,0 +1,10 @@
from django.core.management.base import AppCommand
class Command(AppCommand):
help = "Prints the CREATE INDEX SQL statements for the given model module name(s)."
output_transaction = True
def handle_app(self, app, **options):
from django.core.management.sql import sql_indexes
return '\n'.join(sql_indexes(app, self.style))

View File

@ -0,0 +1,7 @@
from django.core.management.base import AppCommand, CommandError
class Command(AppCommand):
help = "RENAMED: see 'sqlcustom'"
def handle(self, *apps, **options):
raise CommandError("This command has been renamed. Use the 'sqlcustom' command instead.")

View File

@ -0,0 +1,10 @@
from django.core.management.base import AppCommand
class Command(AppCommand):
help = "Prints the DROP TABLE SQL, then the CREATE TABLE SQL, for the given app name(s)."
output_transaction = True
def handle_app(self, app, **options):
from django.core.management.sql import sql_reset
return '\n'.join(sql_reset(app, self.style))

View File

@ -0,0 +1,9 @@
from django.core.management.base import AppCommand
class Command(AppCommand):
help = 'Prints the SQL statements for resetting sequences for the given app name(s).'
output_transaction = True
def handle_app(self, app, **options):
from django.db import backend, models
return '\n'.join(backend.get_sql_sequence_reset(self.style, models.get_models(app)))

View File

@ -0,0 +1,33 @@
from django.core.management.base import CopyFilesCommand, CommandError
import os
class Command(CopyFilesCommand):
help = "Creates a Django app directory structure for the given app name in the current directory."
args = "[appname]"
requires_model_validation = False
# Can't import settings during this command, because they haven't
# necessarily been created.
can_import_settings = False
def handle(self, app_name, directory=None, **options):
if directory is None:
directory = os.getcwd()
# Determine the project_name a bit naively -- by looking at the name of
# the parent directory.
project_dir = os.path.normpath(os.path.join(directory, '..'))
parent_dir = os.path.basename(project_dir)
project_name = os.path.basename(directory)
if app_name == project_name:
raise CommandError("You cannot create an app with the same name (%r) as your project." % app_name)
self.copy_helper('app', app_name, directory, parent_dir)
class ProjectCommand(Command):
help = "Creates a Django app directory structure for the given app name in this project's directory."
def __init__(self, project_directory):
super(ProjectCommand, self).__init__()
self.project_directory = project_directory
def handle(self, app_name):
super(ProjectCommand, self).handle(app_name, self.project_directory)

View File

@ -0,0 +1,39 @@
from django.core.management.base import CopyFilesCommand, CommandError
import os
import re
from random import choice
INVALID_PROJECT_NAMES = ('django', 'site', 'test')
class Command(CopyFilesCommand):
help = "Creates a Django project directory structure for the given project name in the current directory."
args = "[projectname]"
requires_model_validation = False
# Can't import settings during this command, because they haven't
# necessarily been created.
can_import_settings = False
def handle(self, project_name, **options):
# Determine the project_name a bit naively -- by looking at the name of
# the parent directory.
directory = os.getcwd()
if project_name in INVALID_PROJECT_NAMES:
raise CommandError("%r conflicts with the name of an existing Python module and cannot be used as a project name. Please try another name." % project_name)
self.copy_helper('project', project_name, directory)
# Create a random SECRET_KEY hash, and put it in the main settings.
main_settings_file = os.path.join(directory, project_name, 'settings.py')
settings_contents = open(main_settings_file, 'r').read()
# If settings.py was copied from a read-only source, make it writeable.
if not os.access(main_settings_file, os.W_OK):
os.chmod(main_settings_file, 0600)
fp = open(main_settings_file, 'w')
secret_key = ''.join([choice('abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*(-_=+)') for i in range(50)])
settings_contents = re.sub(r"(?<=SECRET_KEY = ')'", secret_key + "'", settings_contents)
fp.write(settings_contents)
fp.close()

View File

@ -0,0 +1,129 @@
from django.core.management.base import BaseCommand
from django.core.management.color import no_style
try:
set
except NameError:
from sets import Set as set # Python 2.3 fallback
class Command(BaseCommand):
help = "Create the database tables for all apps in INSTALLED_APPS whose tables haven't already been created."
args = '[--verbosity] [--noinput]'
def handle(self, **options):
from django.db import backend, connection, transaction, models
from django.conf import settings
from django.core.management.sql import table_list, installed_models, sql_model_create, sql_for_pending_references, many_to_many_sql_for_model, custom_sql_for_model, sql_indexes_for_model, emit_post_sync_signal
verbosity = int(options.get('verbosity', 1))
interactive = options.get('interactive')
self.style = no_style()
# Import the 'management' module within each installed app, to register
# dispatcher events.
for app_name in settings.INSTALLED_APPS:
try:
__import__(app_name + '.management', {}, {}, [''])
except ImportError:
pass
cursor = connection.cursor()
# Get a list of all existing database tables,
# so we know what needs to be added.
table_list = table_list()
if backend.uses_case_insensitive_names:
table_name_converter = str.upper
else:
table_name_converter = lambda x: x
# Get a list of already installed *models* so that references work right.
seen_models = installed_models(table_list)
created_models = set()
pending_references = {}
# Create the tables for each model
for app in models.get_apps():
app_name = app.__name__.split('.')[-2]
model_list = models.get_models(app)
for model in model_list:
# Create the model's database table, if it doesn't already exist.
if verbosity >= 2:
print "Processing %s.%s model" % (app_name, model._meta.object_name)
if table_name_converter(model._meta.db_table) in table_list:
continue
sql, references = sql_model_create(model, self.style, seen_models)
seen_models.add(model)
created_models.add(model)
for refto, refs in references.items():
pending_references.setdefault(refto, []).extend(refs)
sql.extend(sql_for_pending_references(model, self.style, pending_references))
if verbosity >= 1:
print "Creating table %s" % model._meta.db_table
for statement in sql:
cursor.execute(statement)
table_list.append(table_name_converter(model._meta.db_table))
# Create the m2m tables. This must be done after all tables have been created
# to ensure that all referred tables will exist.
for app in models.get_apps():
app_name = app.__name__.split('.')[-2]
model_list = models.get_models(app)
for model in model_list:
if model in created_models:
sql = many_to_many_sql_for_model(model, self.style)
if sql:
if verbosity >= 2:
print "Creating many-to-many tables for %s.%s model" % (app_name, model._meta.object_name)
for statement in sql:
cursor.execute(statement)
transaction.commit_unless_managed()
# Send the post_syncdb signal, so individual apps can do whatever they need
# to do at this point.
emit_post_sync_signal(created_models, verbosity, interactive)
# Install custom SQL for the app (but only if this
# is a model we've just created)
for app in models.get_apps():
app_name = app.__name__.split('.')[-2]
for model in models.get_models(app):
if model in created_models:
custom_sql = custom_sql_for_model(model)
if custom_sql:
if verbosity >= 1:
print "Installing custom SQL for %s.%s model" % (app_name, model._meta.object_name)
try:
for sql in custom_sql:
cursor.execute(sql)
except Exception, e:
sys.stderr.write("Failed to install custom SQL for %s.%s model: %s" % \
(app_name, model._meta.object_name, e))
transaction.rollback_unless_managed()
else:
transaction.commit_unless_managed()
# Install SQL indicies for all newly created models
for app in models.get_apps():
app_name = app.__name__.split('.')[-2]
for model in models.get_models(app):
if model in created_models:
index_sql = sql_indexes_for_model(model, self.style)
if index_sql:
if verbosity >= 1:
print "Installing index for %s.%s model" % (app_name, model._meta.object_name)
try:
for sql in index_sql:
cursor.execute(sql)
except Exception, e:
sys.stderr.write("Failed to install index for %s.%s model: %s" % \
(app_name, model._meta.object_name, e))
transaction.rollback_unless_managed()
else:
transaction.commit_unless_managed()
# Install the 'initialdata' fixture, using format discovery
from django.core.management import call_command
call_command('loaddata', 'initial_data', **options)

View File

@ -0,0 +1,27 @@
from django.core.management.base import BaseCommand
class Command(BaseCommand):
help = 'Runs the test suite for the specified applications, or the entire site if no apps are specified.'
args = '[--verbosity] [--noinput] [appname ...]'
requires_model_validation = False
def handle(self, *test_labels, **options):
from django.conf import settings
from django.db.models import get_app, get_apps
verbosity = options.get('verbosity', 1)
interactive = options.get('interactive', True)
test_path = settings.TEST_RUNNER.split('.')
# Allow for Python 2.5 relative paths
if len(test_path) > 1:
test_module_name = '.'.join(test_path[:-1])
else:
test_module_name = '.'
test_module = __import__(test_module_name, {}, {}, test_path[-1])
test_runner = getattr(test_module, test_path[-1])
failures = test_runner(test_labels, verbosity=verbosity, interactive=interactive)
if failures:
sys.exit(failures)

View File

@ -0,0 +1,9 @@
from django.core.management.base import BaseCommand
class Command(BaseCommand):
help = "Validates all installed models."
requires_model_validation = False
def handle(self, **options):
self.validate()

View File

@ -0,0 +1,420 @@
from django.core.management.base import CommandError
import os
import re
try:
set
except NameError:
from sets import Set as set # Python 2.3 fallback
def table_list():
"Returns a list of all table names that exist in the database."
from django.db import connection, get_introspection_module
cursor = connection.cursor()
return get_introspection_module().get_table_list(cursor)
def installed_models(table_list):
"Returns a set of all models that are installed, given a list of existing table names."
from django.db import backend, models
all_models = []
for app in models.get_apps():
for model in models.get_models(app):
all_models.append(model)
if backend.uses_case_insensitive_names:
converter = lambda x: x.upper()
else:
converter = lambda x: x
return set([m for m in all_models if converter(m._meta.db_table) in map(converter, table_list)])
def sequence_list():
"Returns a list of information about all DB sequences for all models in all apps."
from django.db import models
apps = models.get_apps()
sequence_list = []
for app in apps:
for model in models.get_models(app):
for f in model._meta.fields:
if isinstance(f, models.AutoField):
sequence_list.append({'table': model._meta.db_table, 'column': f.column})
break # Only one AutoField is allowed per model, so don't bother continuing.
for f in model._meta.many_to_many:
sequence_list.append({'table': f.m2m_db_table(), 'column': None})
return sequence_list
def sql_create(app, style):
"Returns a list of the CREATE TABLE SQL statements for the given app."
from django.db import models
from django.conf import settings
if settings.DATABASE_ENGINE == 'dummy':
# This must be the "dummy" database backend, which means the user
# hasn't set DATABASE_ENGINE.
raise CommandError("Django doesn't know which syntax to use for your SQL statements,\n" +
"because you haven't specified the DATABASE_ENGINE setting.\n" +
"Edit your settings file and change DATABASE_ENGINE to something like 'postgresql' or 'mysql'.")
# Get installed models, so we generate REFERENCES right.
# We trim models from the current app so that the sqlreset command does not
# generate invalid SQL (leaving models out of known_models is harmless, so
# we can be conservative).
app_models = models.get_models(app)
final_output = []
known_models = set([model for model in installed_models(table_list()) if model not in app_models])
pending_references = {}
for model in app_models:
output, references = sql_model_create(model, style, known_models)
final_output.extend(output)
for refto, refs in references.items():
pending_references.setdefault(refto, []).extend(refs)
final_output.extend(sql_for_pending_references(model, style, pending_references))
# Keep track of the fact that we've created the table for this model.
known_models.add(model)
# Create the many-to-many join tables.
for model in app_models:
final_output.extend(many_to_many_sql_for_model(model, style))
# Handle references to tables that are from other apps
# but don't exist physically.
not_installed_models = set(pending_references.keys())
if not_installed_models:
alter_sql = []
for model in not_installed_models:
alter_sql.extend(['-- ' + sql for sql in
sql_for_pending_references(model, style, pending_references)])
if alter_sql:
final_output.append('-- The following references should be added but depend on non-existent tables:')
final_output.extend(alter_sql)
return final_output
def sql_delete(app, style):
"Returns a list of the DROP TABLE SQL statements for the given app."
from django.db import backend, connection, models, get_introspection_module
from django.db.backends.util import truncate_name
introspection = get_introspection_module()
# This should work even if a connection isn't available
try:
cursor = connection.cursor()
except:
cursor = None
# Figure out which tables already exist
if cursor:
table_names = introspection.get_table_list(cursor)
else:
table_names = []
if backend.uses_case_insensitive_names:
table_name_converter = str.upper
else:
table_name_converter = lambda x: x
output = []
# Output DROP TABLE statements for standard application tables.
to_delete = set()
references_to_delete = {}
app_models = models.get_models(app)
for model in app_models:
if cursor and table_name_converter(model._meta.db_table) in table_names:
# The table exists, so it needs to be dropped
opts = model._meta
for f in opts.fields:
if f.rel and f.rel.to not in to_delete:
references_to_delete.setdefault(f.rel.to, []).append( (model, f) )
to_delete.add(model)
for model in app_models:
if cursor and table_name_converter(model._meta.db_table) in table_names:
# Drop the table now
output.append('%s %s;' % (style.SQL_KEYWORD('DROP TABLE'),
style.SQL_TABLE(backend.quote_name(model._meta.db_table))))
if backend.supports_constraints and model in references_to_delete:
for rel_class, f in references_to_delete[model]:
table = rel_class._meta.db_table
col = f.column
r_table = model._meta.db_table
r_col = model._meta.get_field(f.rel.field_name).column
r_name = '%s_refs_%s_%x' % (col, r_col, abs(hash((table, r_table))))
output.append('%s %s %s %s;' % \
(style.SQL_KEYWORD('ALTER TABLE'),
style.SQL_TABLE(backend.quote_name(table)),
style.SQL_KEYWORD(backend.get_drop_foreignkey_sql()),
style.SQL_FIELD(truncate_name(r_name, backend.get_max_name_length()))))
del references_to_delete[model]
if model._meta.has_auto_field and hasattr(backend, 'get_drop_sequence'):
output.append(backend.get_drop_sequence(model._meta.db_table))
# Output DROP TABLE statements for many-to-many tables.
for model in app_models:
opts = model._meta
for f in opts.many_to_many:
if cursor and table_name_converter(f.m2m_db_table()) in table_names:
output.append("%s %s;" % (style.SQL_KEYWORD('DROP TABLE'),
style.SQL_TABLE(backend.quote_name(f.m2m_db_table()))))
if hasattr(backend, 'get_drop_sequence'):
output.append(backend.get_drop_sequence("%s_%s" % (model._meta.db_table, f.column)))
app_label = app_models[0]._meta.app_label
# Close database connection explicitly, in case this output is being piped
# directly into a database client, to avoid locking issues.
if cursor:
cursor.close()
connection.close()
return output[::-1] # Reverse it, to deal with table dependencies.
def sql_reset(app, style):
"Returns a list of the DROP TABLE SQL, then the CREATE TABLE SQL, for the given module."
return sql_delete(app, style) + sql_all(app, style)
def sql_flush(style):
"Returns a list of the SQL statements used to flush the database"
from django.db import backend
statements = backend.get_sql_flush(style, table_list(), sequence_list())
return statements
def sql_custom(app):
"Returns a list of the custom table modifying SQL statements for the given app."
from django.db.models import get_models
output = []
app_models = get_models(app)
app_dir = os.path.normpath(os.path.join(os.path.dirname(app.__file__), 'sql'))
for model in app_models:
output.extend(custom_sql_for_model(model))
return output
def sql_indexes(app, style):
"Returns a list of the CREATE INDEX SQL statements for all models in the given app."
from django.db import models
output = []
for model in models.get_models(app):
output.extend(sql_indexes_for_model(model, style))
return output
def sql_all(app, style):
"Returns a list of CREATE TABLE SQL, initial-data inserts, and CREATE INDEX SQL for the given module."
return sql_create(app, style) + sql_custom(app) + sql_indexes(app, style)
def sql_model_create(model, style, known_models=set()):
"""
Returns the SQL required to create a single model, as a tuple of:
(list_of_sql, pending_references_dict)
"""
from django.db import backend, models
opts = model._meta
final_output = []
table_output = []
pending_references = {}
for f in opts.fields:
col_type = f.db_type()
tablespace = f.db_tablespace or opts.db_tablespace
if col_type is None:
# Skip ManyToManyFields, because they're not represented as
# database columns in this table.
continue
# Make the definition (e.g. 'foo VARCHAR(30)') for this field.
field_output = [style.SQL_FIELD(backend.quote_name(f.column)),
style.SQL_COLTYPE(col_type)]
field_output.append(style.SQL_KEYWORD('%sNULL' % (not f.null and 'NOT ' or '')))
if f.unique and (not f.primary_key or backend.allows_unique_and_pk):
field_output.append(style.SQL_KEYWORD('UNIQUE'))
if f.primary_key:
field_output.append(style.SQL_KEYWORD('PRIMARY KEY'))
if tablespace and backend.supports_tablespaces and (f.unique or f.primary_key) and backend.autoindexes_primary_keys:
# We must specify the index tablespace inline, because we
# won't be generating a CREATE INDEX statement for this field.
field_output.append(backend.get_tablespace_sql(tablespace, inline=True))
if f.rel:
if f.rel.to in known_models:
field_output.append(style.SQL_KEYWORD('REFERENCES') + ' ' + \
style.SQL_TABLE(backend.quote_name(f.rel.to._meta.db_table)) + ' (' + \
style.SQL_FIELD(backend.quote_name(f.rel.to._meta.get_field(f.rel.field_name).column)) + ')' +
backend.get_deferrable_sql()
)
else:
# We haven't yet created the table to which this field
# is related, so save it for later.
pr = pending_references.setdefault(f.rel.to, []).append((model, f))
table_output.append(' '.join(field_output))
if opts.order_with_respect_to:
table_output.append(style.SQL_FIELD(backend.quote_name('_order')) + ' ' + \
style.SQL_COLTYPE(models.IntegerField().db_type()) + ' ' + \
style.SQL_KEYWORD('NULL'))
for field_constraints in opts.unique_together:
table_output.append(style.SQL_KEYWORD('UNIQUE') + ' (%s)' % \
", ".join([backend.quote_name(style.SQL_FIELD(opts.get_field(f).column)) for f in field_constraints]))
full_statement = [style.SQL_KEYWORD('CREATE TABLE') + ' ' + style.SQL_TABLE(backend.quote_name(opts.db_table)) + ' (']
for i, line in enumerate(table_output): # Combine and add commas.
full_statement.append(' %s%s' % (line, i < len(table_output)-1 and ',' or ''))
full_statement.append(')')
if opts.db_tablespace and backend.supports_tablespaces:
full_statement.append(backend.get_tablespace_sql(opts.db_tablespace))
full_statement.append(';')
final_output.append('\n'.join(full_statement))
if opts.has_auto_field and hasattr(backend, 'get_autoinc_sql'):
# Add any extra SQL needed to support auto-incrementing primary keys
autoinc_sql = backend.get_autoinc_sql(opts.db_table)
if autoinc_sql:
for stmt in autoinc_sql:
final_output.append(stmt)
return final_output, pending_references
def sql_for_pending_references(model, style, pending_references):
"""
Returns any ALTER TABLE statements to add constraints after the fact.
"""
from django.db import backend
from django.db.backends.util import truncate_name
final_output = []
if backend.supports_constraints:
opts = model._meta
if model in pending_references:
for rel_class, f in pending_references[model]:
rel_opts = rel_class._meta
r_table = rel_opts.db_table
r_col = f.column
table = opts.db_table
col = opts.get_field(f.rel.field_name).column
# For MySQL, r_name must be unique in the first 64 characters.
# So we are careful with character usage here.
r_name = '%s_refs_%s_%x' % (r_col, col, abs(hash((r_table, table))))
final_output.append(style.SQL_KEYWORD('ALTER TABLE') + ' %s ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s (%s)%s;' % \
(backend.quote_name(r_table), truncate_name(r_name, backend.get_max_name_length()),
backend.quote_name(r_col), backend.quote_name(table), backend.quote_name(col),
backend.get_deferrable_sql()))
del pending_references[model]
return final_output
def many_to_many_sql_for_model(model, style):
from django.db import backend, models
from django.contrib.contenttypes import generic
opts = model._meta
final_output = []
for f in opts.many_to_many:
if not isinstance(f.rel, generic.GenericRel):
tablespace = f.db_tablespace or opts.db_tablespace
if tablespace and backend.supports_tablespaces and backend.autoindexes_primary_keys:
tablespace_sql = ' ' + backend.get_tablespace_sql(tablespace, inline=True)
else:
tablespace_sql = ''
table_output = [style.SQL_KEYWORD('CREATE TABLE') + ' ' + \
style.SQL_TABLE(backend.quote_name(f.m2m_db_table())) + ' (']
table_output.append(' %s %s %s%s,' % \
(style.SQL_FIELD(backend.quote_name('id')),
style.SQL_COLTYPE(models.AutoField(primary_key=True).db_type()),
style.SQL_KEYWORD('NOT NULL PRIMARY KEY'),
tablespace_sql))
table_output.append(' %s %s %s %s (%s)%s,' % \
(style.SQL_FIELD(backend.quote_name(f.m2m_column_name())),
style.SQL_COLTYPE(models.ForeignKey(model).db_type()),
style.SQL_KEYWORD('NOT NULL REFERENCES'),
style.SQL_TABLE(backend.quote_name(opts.db_table)),
style.SQL_FIELD(backend.quote_name(opts.pk.column)),
backend.get_deferrable_sql()))
table_output.append(' %s %s %s %s (%s)%s,' % \
(style.SQL_FIELD(backend.quote_name(f.m2m_reverse_name())),
style.SQL_COLTYPE(models.ForeignKey(f.rel.to).db_type()),
style.SQL_KEYWORD('NOT NULL REFERENCES'),
style.SQL_TABLE(backend.quote_name(f.rel.to._meta.db_table)),
style.SQL_FIELD(backend.quote_name(f.rel.to._meta.pk.column)),
backend.get_deferrable_sql()))
table_output.append(' %s (%s, %s)%s' % \
(style.SQL_KEYWORD('UNIQUE'),
style.SQL_FIELD(backend.quote_name(f.m2m_column_name())),
style.SQL_FIELD(backend.quote_name(f.m2m_reverse_name())),
tablespace_sql))
table_output.append(')')
if opts.db_tablespace and backend.supports_tablespaces:
# f.db_tablespace is only for indices, so ignore its value here.
table_output.append(backend.get_tablespace_sql(opts.db_tablespace))
table_output.append(';')
final_output.append('\n'.join(table_output))
# Add any extra SQL needed to support auto-incrementing PKs
autoinc_sql = backend.get_autoinc_sql(f.m2m_db_table())
if autoinc_sql:
for stmt in autoinc_sql:
final_output.append(stmt)
return final_output
def custom_sql_for_model(model):
from django.db import models
from django.conf import settings
opts = model._meta
app_dir = os.path.normpath(os.path.join(os.path.dirname(models.get_app(model._meta.app_label).__file__), 'sql'))
output = []
# Some backends can't execute more than one SQL statement at a time,
# so split into separate statements.
statements = re.compile(r";[ \t]*$", re.M)
# Find custom SQL, if it's available.
sql_files = [os.path.join(app_dir, "%s.%s.sql" % (opts.object_name.lower(), settings.DATABASE_ENGINE)),
os.path.join(app_dir, "%s.sql" % opts.object_name.lower())]
for sql_file in sql_files:
if os.path.exists(sql_file):
fp = open(sql_file, 'U')
for statement in statements.split(fp.read().decode(settings.FILE_CHARSET)):
# Remove any comments from the file
statement = re.sub(ur"--.*[\n\Z]", "", statement)
if statement.strip():
output.append(statement + u";")
fp.close()
return output
def sql_indexes_for_model(model, style):
"Returns the CREATE INDEX SQL statements for a single model"
from django.db import backend
output = []
for f in model._meta.fields:
if f.db_index and not ((f.primary_key or f.unique) and backend.autoindexes_primary_keys):
unique = f.unique and 'UNIQUE ' or ''
tablespace = f.db_tablespace or model._meta.db_tablespace
if tablespace and backend.supports_tablespaces:
tablespace_sql = ' ' + backend.get_tablespace_sql(tablespace)
else:
tablespace_sql = ''
output.append(
style.SQL_KEYWORD('CREATE %sINDEX' % unique) + ' ' + \
style.SQL_TABLE(backend.quote_name('%s_%s' % (model._meta.db_table, f.column))) + ' ' + \
style.SQL_KEYWORD('ON') + ' ' + \
style.SQL_TABLE(backend.quote_name(model._meta.db_table)) + ' ' + \
"(%s)" % style.SQL_FIELD(backend.quote_name(f.column)) + \
"%s;" % tablespace_sql
)
return output
def emit_post_sync_signal(created_models, verbosity, interactive):
from django.db import models
from django.dispatch import dispatcher
# Emit the post_sync signal for every application.
for app in models.get_apps():
app_name = app.__name__.split('.')[-2]
if verbosity >= 2:
print "Running post-sync handlers for application", app_name
dispatcher.send(signal=models.signals.post_syncdb, sender=app,
app=app, created_models=created_models,
verbosity=verbosity, interactive=interactive)

View File

@ -0,0 +1,221 @@
import sys
from django.core.management.color import color_style
class ModelErrorCollection:
def __init__(self, outfile=sys.stdout):
self.errors = []
self.outfile = outfile
self.style = color_style()
def add(self, context, error):
self.errors.append((context, error))
self.outfile.write(self.style.ERROR("%s: %s\n" % (context, error)))
def get_validation_errors(outfile, app=None):
"""
Validates all models that are part of the specified app. If no app name is provided,
validates all models of all installed apps. Writes errors, if any, to outfile.
Returns number of errors.
"""
from django.conf import settings
from django.db import models, connection
from django.db.models.loading import get_app_errors
from django.db.models.fields.related import RelatedObject
e = ModelErrorCollection(outfile)
for (app_name, error) in get_app_errors().items():
e.add(app_name, error)
for cls in models.get_models(app):
opts = cls._meta
# Do field-specific validation.
for f in opts.fields:
if f.name == 'id' and not f.primary_key and opts.pk.name == 'id':
e.add(opts, '"%s": You can\'t use "id" as a field name, because each model automatically gets an "id" field if none of the fields have primary_key=True. You need to either remove/rename your "id" field or add primary_key=True to a field.' % f.name)
if isinstance(f, models.CharField) and f.max_length in (None, 0):
e.add(opts, '"%s": CharFields require a "max_length" attribute.' % f.name)
if isinstance(f, models.DecimalField):
if f.decimal_places is None:
e.add(opts, '"%s": DecimalFields require a "decimal_places" attribute.' % f.name)
if f.max_digits is None:
e.add(opts, '"%s": DecimalFields require a "max_digits" attribute.' % f.name)
if isinstance(f, models.FileField) and not f.upload_to:
e.add(opts, '"%s": FileFields require an "upload_to" attribute.' % f.name)
if isinstance(f, models.ImageField):
try:
from PIL import Image
except ImportError:
e.add(opts, '"%s": To use ImageFields, you need to install the Python Imaging Library. Get it at http://www.pythonware.com/products/pil/ .' % f.name)
if f.prepopulate_from is not None and type(f.prepopulate_from) not in (list, tuple):
e.add(opts, '"%s": prepopulate_from should be a list or tuple.' % f.name)
if f.choices:
if not hasattr(f.choices, '__iter__'):
e.add(opts, '"%s": "choices" should be iterable (e.g., a tuple or list).' % f.name)
else:
for c in f.choices:
if not type(c) in (tuple, list) or len(c) != 2:
e.add(opts, '"%s": "choices" should be a sequence of two-tuples.' % f.name)
if f.db_index not in (None, True, False):
e.add(opts, '"%s": "db_index" should be either None, True or False.' % f.name)
# Check that max_length <= 255 if using older MySQL versions.
if settings.DATABASE_ENGINE == 'mysql':
db_version = connection.get_server_version()
if db_version < (5, 0, 3) and isinstance(f, (models.CharField, models.CommaSeparatedIntegerField, models.SlugField)) and f.max_length > 255:
e.add(opts, '"%s": %s cannot have a "max_length" greater than 255 when you are using a version of MySQL prior to 5.0.3 (you are using %s).' % (f.name, f.__class__.__name__, '.'.join([str(n) for n in db_version[:3]])))
# Check to see if the related field will clash with any
# existing fields, m2m fields, m2m related objects or related objects
if f.rel:
rel_opts = f.rel.to._meta
if f.rel.to not in models.get_models():
e.add(opts, "'%s' has relation with model %s, which has not been installed" % (f.name, rel_opts.object_name))
rel_name = RelatedObject(f.rel.to, cls, f).get_accessor_name()
rel_query_name = f.related_query_name()
for r in rel_opts.fields:
if r.name == rel_name:
e.add(opts, "Accessor for field '%s' clashes with field '%s.%s'. Add a related_name argument to the definition for '%s'." % (f.name, rel_opts.object_name, r.name, f.name))
if r.name == rel_query_name:
e.add(opts, "Reverse query name for field '%s' clashes with field '%s.%s'. Add a related_name argument to the definition for '%s'." % (f.name, rel_opts.object_name, r.name, f.name))
for r in rel_opts.many_to_many:
if r.name == rel_name:
e.add(opts, "Accessor for field '%s' clashes with m2m field '%s.%s'. Add a related_name argument to the definition for '%s'." % (f.name, rel_opts.object_name, r.name, f.name))
if r.name == rel_query_name:
e.add(opts, "Reverse query name for field '%s' clashes with m2m field '%s.%s'. Add a related_name argument to the definition for '%s'." % (f.name, rel_opts.object_name, r.name, f.name))
for r in rel_opts.get_all_related_many_to_many_objects():
if r.get_accessor_name() == rel_name:
e.add(opts, "Accessor for field '%s' clashes with related m2m field '%s.%s'. Add a related_name argument to the definition for '%s'." % (f.name, rel_opts.object_name, r.get_accessor_name(), f.name))
if r.get_accessor_name() == rel_query_name:
e.add(opts, "Reverse query name for field '%s' clashes with related m2m field '%s.%s'. Add a related_name argument to the definition for '%s'." % (f.name, rel_opts.object_name, r.get_accessor_name(), f.name))
for r in rel_opts.get_all_related_objects():
if r.field is not f:
if r.get_accessor_name() == rel_name:
e.add(opts, "Accessor for field '%s' clashes with related field '%s.%s'. Add a related_name argument to the definition for '%s'." % (f.name, rel_opts.object_name, r.get_accessor_name(), f.name))
if r.get_accessor_name() == rel_query_name:
e.add(opts, "Reverse query name for field '%s' clashes with related field '%s.%s'. Add a related_name argument to the definition for '%s'." % (f.name, rel_opts.object_name, r.get_accessor_name(), f.name))
for i, f in enumerate(opts.many_to_many):
# Check to see if the related m2m field will clash with any
# existing fields, m2m fields, m2m related objects or related objects
rel_opts = f.rel.to._meta
if f.rel.to not in models.get_models():
e.add(opts, "'%s' has m2m relation with model %s, which has not been installed" % (f.name, rel_opts.object_name))
rel_name = RelatedObject(f.rel.to, cls, f).get_accessor_name()
rel_query_name = f.related_query_name()
# If rel_name is none, there is no reverse accessor.
# (This only occurs for symmetrical m2m relations to self).
# If this is the case, there are no clashes to check for this field, as
# there are no reverse descriptors for this field.
if rel_name is not None:
for r in rel_opts.fields:
if r.name == rel_name:
e.add(opts, "Accessor for m2m field '%s' clashes with field '%s.%s'. Add a related_name argument to the definition for '%s'." % (f.name, rel_opts.object_name, r.name, f.name))
if r.name == rel_query_name:
e.add(opts, "Reverse query name for m2m field '%s' clashes with field '%s.%s'. Add a related_name argument to the definition for '%s'." % (f.name, rel_opts.object_name, r.name, f.name))
for r in rel_opts.many_to_many:
if r.name == rel_name:
e.add(opts, "Accessor for m2m field '%s' clashes with m2m field '%s.%s'. Add a related_name argument to the definition for '%s'." % (f.name, rel_opts.object_name, r.name, f.name))
if r.name == rel_query_name:
e.add(opts, "Reverse query name for m2m field '%s' clashes with m2m field '%s.%s'. Add a related_name argument to the definition for '%s'." % (f.name, rel_opts.object_name, r.name, f.name))
for r in rel_opts.get_all_related_many_to_many_objects():
if r.field is not f:
if r.get_accessor_name() == rel_name:
e.add(opts, "Accessor for m2m field '%s' clashes with related m2m field '%s.%s'. Add a related_name argument to the definition for '%s'." % (f.name, rel_opts.object_name, r.get_accessor_name(), f.name))
if r.get_accessor_name() == rel_query_name:
e.add(opts, "Reverse query name for m2m field '%s' clashes with related m2m field '%s.%s'. Add a related_name argument to the definition for '%s'." % (f.name, rel_opts.object_name, r.get_accessor_name(), f.name))
for r in rel_opts.get_all_related_objects():
if r.get_accessor_name() == rel_name:
e.add(opts, "Accessor for m2m field '%s' clashes with related field '%s.%s'. Add a related_name argument to the definition for '%s'." % (f.name, rel_opts.object_name, r.get_accessor_name(), f.name))
if r.get_accessor_name() == rel_query_name:
e.add(opts, "Reverse query name for m2m field '%s' clashes with related field '%s.%s'. Add a related_name argument to the definition for '%s'." % (f.name, rel_opts.object_name, r.get_accessor_name(), f.name))
# Check admin attribute.
if opts.admin is not None:
if not isinstance(opts.admin, models.AdminOptions):
e.add(opts, '"admin" attribute, if given, must be set to a models.AdminOptions() instance.')
else:
# list_display
if not isinstance(opts.admin.list_display, (list, tuple)):
e.add(opts, '"admin.list_display", if given, must be set to a list or tuple.')
else:
for fn in opts.admin.list_display:
try:
f = opts.get_field(fn)
except models.FieldDoesNotExist:
if not hasattr(cls, fn):
e.add(opts, '"admin.list_display" refers to %r, which isn\'t an attribute, method or property.' % fn)
else:
if isinstance(f, models.ManyToManyField):
e.add(opts, '"admin.list_display" doesn\'t support ManyToManyFields (%r).' % fn)
# list_display_links
if opts.admin.list_display_links and not opts.admin.list_display:
e.add(opts, '"admin.list_display" must be defined for "admin.list_display_links" to be used.')
if not isinstance(opts.admin.list_display_links, (list, tuple)):
e.add(opts, '"admin.list_display_links", if given, must be set to a list or tuple.')
else:
for fn in opts.admin.list_display_links:
try:
f = opts.get_field(fn)
except models.FieldDoesNotExist:
if not hasattr(cls, fn):
e.add(opts, '"admin.list_display_links" refers to %r, which isn\'t an attribute, method or property.' % fn)
if fn not in opts.admin.list_display:
e.add(opts, '"admin.list_display_links" refers to %r, which is not defined in "admin.list_display".' % fn)
# list_filter
if not isinstance(opts.admin.list_filter, (list, tuple)):
e.add(opts, '"admin.list_filter", if given, must be set to a list or tuple.')
else:
for fn in opts.admin.list_filter:
try:
f = opts.get_field(fn)
except models.FieldDoesNotExist:
e.add(opts, '"admin.list_filter" refers to %r, which isn\'t a field.' % fn)
# date_hierarchy
if opts.admin.date_hierarchy:
try:
f = opts.get_field(opts.admin.date_hierarchy)
except models.FieldDoesNotExist:
e.add(opts, '"admin.date_hierarchy" refers to %r, which isn\'t a field.' % opts.admin.date_hierarchy)
# Check ordering attribute.
if opts.ordering:
for field_name in opts.ordering:
if field_name == '?': continue
if field_name.startswith('-'):
field_name = field_name[1:]
if opts.order_with_respect_to and field_name == '_order':
continue
if '.' in field_name: continue # Skip ordering in the format 'table.field'.
try:
opts.get_field(field_name, many_to_many=False)
except models.FieldDoesNotExist:
e.add(opts, '"ordering" refers to "%s", a field that doesn\'t exist.' % field_name)
# Check core=True, if needed.
for related in opts.get_followed_related_objects():
if not related.edit_inline:
continue
try:
for f in related.opts.fields:
if f.core:
raise StopIteration
e.add(related.opts, "At least one field in %s should have core=True, because it's being edited inline by %s.%s." % (related.opts.object_name, opts.module_name, opts.object_name))
except StopIteration:
pass
# Check unique_together.
for ut in opts.unique_together:
for field_name in ut:
try:
f = opts.get_field(field_name, many_to_many=True)
except models.FieldDoesNotExist:
e.add(opts, '"unique_together" refers to %s, a field that doesn\'t exist. Check your syntax.' % field_name)
else:
if isinstance(f.rel, models.ManyToManyRel):
e.add(opts, '"unique_together" refers to %s. ManyToManyFields are not supported in unique_together.' % f.name)
return len(e.errors)

View File

@ -1,7 +1,8 @@
import re, unittest import re, unittest
from urlparse import urlparse from urlparse import urlparse
from django.db import transaction from django.db import transaction
from django.core import management, mail from django.core import mail
from django.core.management import call_command
from django.db.models import get_apps from django.db.models import get_apps
from django.test import _doctest as doctest from django.test import _doctest as doctest
from django.test.client import Client from django.test.client import Client
@ -42,9 +43,11 @@ class TestCase(unittest.TestCase):
* Clearing the mail test outbox. * Clearing the mail test outbox.
""" """
management.flush(verbosity=0, interactive=False) call_command('flush', verbosity=0, interactive=False)
if hasattr(self, 'fixtures'): if hasattr(self, 'fixtures'):
management.load_data(self.fixtures, verbosity=0) # We have to use this slightly awkward syntax due to the fact
# that we're using *args and **kwargs together.
call_command('loaddata', *self.fixtures, **{'verbosity': 0})
mail.outbox = [] mail.outbox = []
def __call__(self, result=None): def __call__(self, result=None):

View File

@ -1,8 +1,8 @@
import sys, time import sys, time
from django.conf import settings from django.conf import settings
from django.db import connection, backend, get_creation_module from django.db import connection, backend, get_creation_module
from django.core import management, mail from django.core import mail
from django.core import management, mail from django.core.management import call_command
from django.dispatch import dispatcher from django.dispatch import dispatcher
from django.test import signals from django.test import signals
from django.template import Template from django.template import Template
@ -18,12 +18,12 @@ def instrumented_test_render(self, context):
""" """
dispatcher.send(signal=signals.template_rendered, sender=self, template=self, context=context) dispatcher.send(signal=signals.template_rendered, sender=self, template=self, context=context)
return self.nodelist.render(context) return self.nodelist.render(context)
class TestSMTPConnection(object): class TestSMTPConnection(object):
"""A substitute SMTP connection for use during test sessions. """A substitute SMTP connection for use during test sessions.
The test connection stores email messages in a dummy outbox, The test connection stores email messages in a dummy outbox,
rather than sending them out on the wire. rather than sending them out on the wire.
""" """
def __init__(*args, **kwargs): def __init__(*args, **kwargs):
pass pass
@ -39,34 +39,34 @@ class TestSMTPConnection(object):
def setup_test_environment(): def setup_test_environment():
"""Perform any global pre-test setup. This involves: """Perform any global pre-test setup. This involves:
- Installing the instrumented test renderer - Installing the instrumented test renderer
- Diverting the email sending functions to a test buffer - Diverting the email sending functions to a test buffer
""" """
Template.original_render = Template.render Template.original_render = Template.render
Template.render = instrumented_test_render Template.render = instrumented_test_render
mail.original_SMTPConnection = mail.SMTPConnection mail.original_SMTPConnection = mail.SMTPConnection
mail.SMTPConnection = TestSMTPConnection mail.SMTPConnection = TestSMTPConnection
mail.outbox = [] mail.outbox = []
def teardown_test_environment(): def teardown_test_environment():
"""Perform any global post-test teardown. This involves: """Perform any global post-test teardown. This involves:
- Restoring the original test renderer - Restoring the original test renderer
- Restoring the email sending functions - Restoring the email sending functions
""" """
Template.render = Template.original_render Template.render = Template.original_render
del Template.original_render del Template.original_render
mail.SMTPConnection = mail.original_SMTPConnection mail.SMTPConnection = mail.original_SMTPConnection
del mail.original_SMTPConnection del mail.original_SMTPConnection
del mail.outbox del mail.outbox
def _set_autocommit(connection): def _set_autocommit(connection):
"Make sure a connection is in autocommit mode." "Make sure a connection is in autocommit mode."
if hasattr(connection.connection, "autocommit"): if hasattr(connection.connection, "autocommit"):
@ -94,7 +94,7 @@ def create_test_db(verbosity=1, autoclobber=False):
if hasattr(creation_module, "create_test_db"): if hasattr(creation_module, "create_test_db"):
creation_module.create_test_db(settings, connection, backend, verbosity, autoclobber) creation_module.create_test_db(settings, connection, backend, verbosity, autoclobber)
return return
if verbosity >= 1: if verbosity >= 1:
print "Creating test database..." print "Creating test database..."
# If we're using SQLite, it's more convenient to test against an # If we're using SQLite, it's more convenient to test against an
@ -112,22 +112,22 @@ def create_test_db(verbosity=1, autoclobber=False):
TEST_DATABASE_NAME = settings.TEST_DATABASE_NAME TEST_DATABASE_NAME = settings.TEST_DATABASE_NAME
else: else:
TEST_DATABASE_NAME = TEST_DATABASE_PREFIX + settings.DATABASE_NAME TEST_DATABASE_NAME = TEST_DATABASE_PREFIX + settings.DATABASE_NAME
# Create the test database and connect to it. We need to autocommit # Create the test database and connect to it. We need to autocommit
# if the database supports it because PostgreSQL doesn't allow # if the database supports it because PostgreSQL doesn't allow
# CREATE/DROP DATABASE statements within transactions. # CREATE/DROP DATABASE statements within transactions.
cursor = connection.cursor() cursor = connection.cursor()
_set_autocommit(connection) _set_autocommit(connection)
try: try:
cursor.execute("CREATE DATABASE %s %s" % (backend.quote_name(TEST_DATABASE_NAME), suffix)) cursor.execute("CREATE DATABASE %s %s" % (backend.quote_name(TEST_DATABASE_NAME), suffix))
except Exception, e: except Exception, e:
sys.stderr.write("Got an error creating the test database: %s\n" % e) sys.stderr.write("Got an error creating the test database: %s\n" % e)
if not autoclobber: if not autoclobber:
confirm = raw_input("Type 'yes' if you would like to try deleting the test database '%s', or 'no' to cancel: " % TEST_DATABASE_NAME) confirm = raw_input("Type 'yes' if you would like to try deleting the test database '%s', or 'no' to cancel: " % TEST_DATABASE_NAME)
if autoclobber or confirm == 'yes': if autoclobber or confirm == 'yes':
try: try:
if verbosity >= 1: if verbosity >= 1:
print "Destroying old test database..." print "Destroying old test database..."
cursor.execute("DROP DATABASE %s" % backend.quote_name(TEST_DATABASE_NAME)) cursor.execute("DROP DATABASE %s" % backend.quote_name(TEST_DATABASE_NAME))
if verbosity >= 1: if verbosity >= 1:
print "Creating test database..." print "Creating test database..."
@ -138,15 +138,15 @@ def create_test_db(verbosity=1, autoclobber=False):
else: else:
print "Tests cancelled." print "Tests cancelled."
sys.exit(1) sys.exit(1)
connection.close() connection.close()
settings.DATABASE_NAME = TEST_DATABASE_NAME settings.DATABASE_NAME = TEST_DATABASE_NAME
management.syncdb(verbosity, interactive=False) call_command('syncdb', verbosity=verbosity, interactive=False)
if settings.CACHE_BACKEND.startswith('db://'): if settings.CACHE_BACKEND.startswith('db://'):
cache_name = settings.CACHE_BACKEND[len('db://'):] cache_name = settings.CACHE_BACKEND[len('db://'):]
management.createcachetable(cache_name) call_command('createcachetable', cache_name)
# Get a cursor (even though we don't need one yet). This has # Get a cursor (even though we don't need one yet). This has
# the side effect of initializing the test database. # the side effect of initializing the test database.
@ -158,7 +158,7 @@ def destroy_test_db(old_database_name, verbosity=1):
if hasattr(creation_module, "destroy_test_db"): if hasattr(creation_module, "destroy_test_db"):
creation_module.destroy_test_db(settings, connection, backend, old_database_name, verbosity) creation_module.destroy_test_db(settings, connection, backend, old_database_name, verbosity)
return return
# Unless we're using SQLite, remove the test database to clean up after # Unless we're using SQLite, remove the test database to clean up after
# ourselves. Connect to the previous database (not the test database) # ourselves. Connect to the previous database (not the test database)
# to do so, because it's not allowed to delete a database while being # to do so, because it's not allowed to delete a database while being

View File

@ -26,54 +26,54 @@ __test__ = {'API_TESTS': """
# Reset the database representation of this app. # Reset the database representation of this app.
# This will return the database to a clean initial state. # This will return the database to a clean initial state.
>>> management.flush(verbosity=0, interactive=False) >>> management.call_command('flush', verbosity=0, interactive=False)
# Syncdb introduces 1 initial data object from initial_data.json. # Syncdb introduces 1 initial data object from initial_data.json.
>>> Article.objects.all() >>> Article.objects.all()
[<Article: Python program becomes self aware>] [<Article: Python program becomes self aware>]
# Load fixture 1. Single JSON file, with two objects. # Load fixture 1. Single JSON file, with two objects.
>>> management.load_data(['fixture1.json'], verbosity=0) >>> management.call_command('loaddata', 'fixture1.json', verbosity=0)
>>> Article.objects.all() >>> Article.objects.all()
[<Article: Time to reform copyright>, <Article: Poker has no place on ESPN>, <Article: Python program becomes self aware>] [<Article: Time to reform copyright>, <Article: Poker has no place on ESPN>, <Article: Python program becomes self aware>]
# Load fixture 2. JSON file imported by default. Overwrites some existing objects # Load fixture 2. JSON file imported by default. Overwrites some existing objects
>>> management.load_data(['fixture2.json'], verbosity=0) >>> management.call_command('loaddata', 'fixture2.json', verbosity=0)
>>> Article.objects.all() >>> Article.objects.all()
[<Article: Django conquers world!>, <Article: Copyright is fine the way it is>, <Article: Poker has no place on ESPN>, <Article: Python program becomes self aware>] [<Article: Django conquers world!>, <Article: Copyright is fine the way it is>, <Article: Poker has no place on ESPN>, <Article: Python program becomes self aware>]
# Load fixture 3, XML format. # Load fixture 3, XML format.
>>> management.load_data(['fixture3.xml'], verbosity=0) >>> management.call_command('loaddata', 'fixture3.xml', verbosity=0)
>>> Article.objects.all() >>> Article.objects.all()
[<Article: XML identified as leading cause of cancer>, <Article: Django conquers world!>, <Article: Copyright is fine the way it is>, <Article: Poker on TV is great!>, <Article: Python program becomes self aware>] [<Article: XML identified as leading cause of cancer>, <Article: Django conquers world!>, <Article: Copyright is fine the way it is>, <Article: Poker on TV is great!>, <Article: Python program becomes self aware>]
# Load a fixture that doesn't exist # Load a fixture that doesn't exist
>>> management.load_data(['unknown.json'], verbosity=0) >>> management.call_command('loaddata', 'unknown.json', verbosity=0)
# object list is unaffected # object list is unaffected
>>> Article.objects.all() >>> Article.objects.all()
[<Article: XML identified as leading cause of cancer>, <Article: Django conquers world!>, <Article: Copyright is fine the way it is>, <Article: Poker on TV is great!>, <Article: Python program becomes self aware>] [<Article: XML identified as leading cause of cancer>, <Article: Django conquers world!>, <Article: Copyright is fine the way it is>, <Article: Poker on TV is great!>, <Article: Python program becomes self aware>]
# Reset the database representation of this app. This will delete all data. # Reset the database representation of this app. This will delete all data.
>>> management.flush(verbosity=0, interactive=False) >>> management.call_command('flush', verbosity=0, interactive=False)
>>> Article.objects.all() >>> Article.objects.all()
[<Article: Python program becomes self aware>] [<Article: Python program becomes self aware>]
# Load fixture 1 again, using format discovery # Load fixture 1 again, using format discovery
>>> management.load_data(['fixture1'], verbosity=0) >>> management.call_command('loaddata', 'fixture1', verbosity=0)
>>> Article.objects.all() >>> Article.objects.all()
[<Article: Time to reform copyright>, <Article: Poker has no place on ESPN>, <Article: Python program becomes self aware>] [<Article: Time to reform copyright>, <Article: Poker has no place on ESPN>, <Article: Python program becomes self aware>]
# Try to load fixture 2 using format discovery; this will fail # Try to load fixture 2 using format discovery; this will fail
# because there are two fixture2's in the fixtures directory # because there are two fixture2's in the fixtures directory
>>> management.load_data(['fixture2'], verbosity=0) # doctest: +ELLIPSIS >>> management.call_command('loaddata', 'fixture2', verbosity=0) # doctest: +ELLIPSIS
Multiple fixtures named 'fixture2' in '...fixtures'. Aborting. Multiple fixtures named 'fixture2' in '...fixtures'. Aborting.
>>> Article.objects.all() >>> Article.objects.all()
[<Article: Time to reform copyright>, <Article: Poker has no place on ESPN>, <Article: Python program becomes self aware>] [<Article: Time to reform copyright>, <Article: Poker has no place on ESPN>, <Article: Python program becomes self aware>]
# Dump the current contents of the database as a JSON fixture # Dump the current contents of the database as a JSON fixture
>>> print management.dump_data(['fixtures'], format='json') >>> print management.call_command('dumpdata', 'fixtures', format='json')
[{"pk": "3", "model": "fixtures.article", "fields": {"headline": "Time to reform copyright", "pub_date": "2006-06-16 13:00:00"}}, {"pk": "2", "model": "fixtures.article", "fields": {"headline": "Poker has no place on ESPN", "pub_date": "2006-06-16 12:00:00"}}, {"pk": "1", "model": "fixtures.article", "fields": {"headline": "Python program becomes self aware", "pub_date": "2006-06-16 11:00:00"}}] [{"pk": "3", "model": "fixtures.article", "fields": {"headline": "Time to reform copyright", "pub_date": "2006-06-16 13:00:00"}}, {"pk": "2", "model": "fixtures.article", "fields": {"headline": "Poker has no place on ESPN", "pub_date": "2006-06-16 12:00:00"}}, {"pk": "1", "model": "fixtures.article", "fields": {"headline": "Python program becomes self aware", "pub_date": "2006-06-16 11:00:00"}}]
"""} """}

View File

@ -26,7 +26,7 @@ __test__ = {'API_TESTS':"""
>>> from django.core import management >>> from django.core import management
# Load a fixture that uses PK=1 # Load a fixture that uses PK=1
>>> management.load_data(['sequence'], verbosity=0) >>> management.call_command('loaddata', 'sequence', verbosity=0)
# Create a new animal. Without a sequence reset, this new object # Create a new animal. Without a sequence reset, this new object
# will take a PK of 1 (on Postgres), and the save will fail. # will take a PK of 1 (on Postgres), and the save will fail.
@ -39,7 +39,7 @@ __test__ = {'API_TESTS':"""
# doesn't affect parsing of None values. # doesn't affect parsing of None values.
# Load a pretty-printed XML fixture with Nulls. # Load a pretty-printed XML fixture with Nulls.
>>> management.load_data(['pretty.xml'], verbosity=0) >>> management.call_command('loaddata', 'pretty.xml', verbosity=0)
>>> Stuff.objects.all() >>> Stuff.objects.all()
[<Stuff: None is owned by None>] [<Stuff: None is owned by None>]

View File

@ -273,7 +273,7 @@ class SerializerTests(unittest.TestCase):
def serializerTest(format, self): def serializerTest(format, self):
# Clear the database first # Clear the database first
management.flush(verbosity=0, interactive=False) management.call_command('flush', verbosity=0, interactive=False)
# Create all the objects defined in the test data # Create all the objects defined in the test data
objects = [] objects = []
@ -291,7 +291,7 @@ def serializerTest(format, self):
serialized_data = serializers.serialize(format, objects, indent=2) serialized_data = serializers.serialize(format, objects, indent=2)
# Flush the database and recreate from the serialized data # Flush the database and recreate from the serialized data
management.flush(verbosity=0, interactive=False) management.call_command('flush', verbosity=0, interactive=False)
transaction.enter_transaction_management() transaction.enter_transaction_management()
transaction.managed(True) transaction.managed(True)
for obj in serializers.deserialize(format, serialized_data): for obj in serializers.deserialize(format, serialized_data):
@ -306,7 +306,7 @@ def serializerTest(format, self):
def fieldsTest(format, self): def fieldsTest(format, self):
# Clear the database first # Clear the database first
management.flush(verbosity=0, interactive=False) management.call_command('flush', verbosity=0, interactive=False)
obj = ComplexModel(field1='first',field2='second',field3='third') obj = ComplexModel(field1='first',field2='second',field3='third')
obj.save(raw=True) obj.save(raw=True)
@ -322,7 +322,7 @@ def fieldsTest(format, self):
def streamTest(format, self): def streamTest(format, self):
# Clear the database first # Clear the database first
management.flush(verbosity=0, interactive=False) management.call_command('flush', verbosity=0, interactive=False)
obj = ComplexModel(field1='first',field2='second',field3='third') obj = ComplexModel(field1='first',field2='second',field3='third')
obj.save(raw=True) obj.save(raw=True)

View File

@ -51,7 +51,7 @@ class InvalidModelTestCase(unittest.TestCase):
self.model_label = model_label self.model_label = model_label
def runTest(self): def runTest(self):
from django.core import management from django.core.management.validation import get_validation_errors
from django.db.models.loading import load_app from django.db.models.loading import load_app
from cStringIO import StringIO from cStringIO import StringIO
@ -61,7 +61,7 @@ class InvalidModelTestCase(unittest.TestCase):
self.fail('Unable to load invalid model module') self.fail('Unable to load invalid model module')
s = StringIO() s = StringIO()
count = management.get_validation_errors(s, module) count = get_validation_errors(s, module)
s.seek(0) s.seek(0)
error_log = s.read() error_log = s.read()
actual = error_log.split('\n') actual = error_log.split('\n')