581 lines
24 KiB
Python
581 lines
24 KiB
Python
import os
|
|
import re
|
|
from io import StringIO
|
|
from unittest import mock, skipUnless
|
|
|
|
from django.core.management import call_command
|
|
from django.db import connection
|
|
from django.db.backends.base.introspection import TableInfo
|
|
from django.test import TestCase, TransactionTestCase, skipUnlessDBFeature
|
|
|
|
from .models import PeopleMoreData, test_collation
|
|
|
|
|
|
def inspectdb_tables_only(table_name):
|
|
"""
|
|
Limit introspection to tables created for models of this app.
|
|
Some databases such as Oracle are extremely slow at introspection.
|
|
"""
|
|
return table_name.startswith("inspectdb_")
|
|
|
|
|
|
def inspectdb_views_only(table_name):
|
|
return table_name.startswith("inspectdb_") and table_name.endswith(
|
|
("_materialized", "_view")
|
|
)
|
|
|
|
|
|
def special_table_only(table_name):
|
|
return table_name.startswith("inspectdb_special")
|
|
|
|
|
|
class InspectDBTestCase(TestCase):
|
|
unique_re = re.compile(r".*unique_together = \((.+),\).*")
|
|
|
|
def test_stealth_table_name_filter_option(self):
|
|
out = StringIO()
|
|
call_command("inspectdb", table_name_filter=inspectdb_tables_only, stdout=out)
|
|
error_message = (
|
|
"inspectdb has examined a table that should have been filtered out."
|
|
)
|
|
# contrib.contenttypes is one of the apps always installed when running
|
|
# the Django test suite, check that one of its tables hasn't been
|
|
# inspected
|
|
self.assertNotIn(
|
|
"class DjangoContentType(models.Model):", out.getvalue(), msg=error_message
|
|
)
|
|
|
|
def test_table_option(self):
|
|
"""
|
|
inspectdb can inspect a subset of tables by passing the table names as
|
|
arguments.
|
|
"""
|
|
out = StringIO()
|
|
call_command("inspectdb", "inspectdb_people", stdout=out)
|
|
output = out.getvalue()
|
|
self.assertIn("class InspectdbPeople(models.Model):", output)
|
|
self.assertNotIn("InspectdbPeopledata", output)
|
|
|
|
def make_field_type_asserter(self):
|
|
"""
|
|
Call inspectdb and return a function to validate a field type in its
|
|
output.
|
|
"""
|
|
out = StringIO()
|
|
call_command("inspectdb", "inspectdb_columntypes", stdout=out)
|
|
output = out.getvalue()
|
|
|
|
def assertFieldType(name, definition):
|
|
out_def = re.search(r"^\s*%s = (models.*)$" % name, output, re.MULTILINE)[1]
|
|
self.assertEqual(definition, out_def)
|
|
|
|
return assertFieldType
|
|
|
|
def test_field_types(self):
|
|
"""Test introspection of various Django field types"""
|
|
assertFieldType = self.make_field_type_asserter()
|
|
introspected_field_types = connection.features.introspected_field_types
|
|
char_field_type = introspected_field_types["CharField"]
|
|
# Inspecting Oracle DB doesn't produce correct results (#19884):
|
|
# - it reports fields as blank=True when they aren't.
|
|
if (
|
|
not connection.features.interprets_empty_strings_as_nulls
|
|
and char_field_type == "CharField"
|
|
):
|
|
assertFieldType("char_field", "models.CharField(max_length=10)")
|
|
assertFieldType(
|
|
"null_char_field",
|
|
"models.CharField(max_length=10, blank=True, null=True)",
|
|
)
|
|
assertFieldType("email_field", "models.CharField(max_length=254)")
|
|
assertFieldType("file_field", "models.CharField(max_length=100)")
|
|
assertFieldType("file_path_field", "models.CharField(max_length=100)")
|
|
assertFieldType("slug_field", "models.CharField(max_length=50)")
|
|
assertFieldType("text_field", "models.TextField()")
|
|
assertFieldType("url_field", "models.CharField(max_length=200)")
|
|
if char_field_type == "TextField":
|
|
assertFieldType("char_field", "models.TextField()")
|
|
assertFieldType(
|
|
"null_char_field", "models.TextField(blank=True, null=True)"
|
|
)
|
|
assertFieldType("email_field", "models.TextField()")
|
|
assertFieldType("file_field", "models.TextField()")
|
|
assertFieldType("file_path_field", "models.TextField()")
|
|
assertFieldType("slug_field", "models.TextField()")
|
|
assertFieldType("text_field", "models.TextField()")
|
|
assertFieldType("url_field", "models.TextField()")
|
|
assertFieldType("date_field", "models.DateField()")
|
|
assertFieldType("date_time_field", "models.DateTimeField()")
|
|
if introspected_field_types["GenericIPAddressField"] == "GenericIPAddressField":
|
|
assertFieldType("gen_ip_address_field", "models.GenericIPAddressField()")
|
|
elif not connection.features.interprets_empty_strings_as_nulls:
|
|
assertFieldType("gen_ip_address_field", "models.CharField(max_length=39)")
|
|
assertFieldType(
|
|
"time_field", "models.%s()" % introspected_field_types["TimeField"]
|
|
)
|
|
if connection.features.has_native_uuid_field:
|
|
assertFieldType("uuid_field", "models.UUIDField()")
|
|
elif not connection.features.interprets_empty_strings_as_nulls:
|
|
assertFieldType("uuid_field", "models.CharField(max_length=32)")
|
|
|
|
@skipUnlessDBFeature("can_introspect_json_field", "supports_json_field")
|
|
def test_json_field(self):
|
|
out = StringIO()
|
|
call_command("inspectdb", "inspectdb_jsonfieldcolumntype", stdout=out)
|
|
output = out.getvalue()
|
|
if not connection.features.interprets_empty_strings_as_nulls:
|
|
self.assertIn("json_field = models.JSONField()", output)
|
|
self.assertIn(
|
|
"null_json_field = models.JSONField(blank=True, null=True)", output
|
|
)
|
|
|
|
@skipUnlessDBFeature("supports_collation_on_charfield")
|
|
@skipUnless(test_collation, "Language collations are not supported.")
|
|
def test_char_field_db_collation(self):
|
|
out = StringIO()
|
|
call_command("inspectdb", "inspectdb_charfielddbcollation", stdout=out)
|
|
output = out.getvalue()
|
|
if not connection.features.interprets_empty_strings_as_nulls:
|
|
self.assertIn(
|
|
"char_field = models.CharField(max_length=10, "
|
|
"db_collation='%s')" % test_collation,
|
|
output,
|
|
)
|
|
else:
|
|
self.assertIn(
|
|
"char_field = models.CharField(max_length=10, "
|
|
"db_collation='%s', blank=True, null=True)" % test_collation,
|
|
output,
|
|
)
|
|
|
|
@skipUnlessDBFeature("supports_collation_on_textfield")
|
|
@skipUnless(test_collation, "Language collations are not supported.")
|
|
def test_text_field_db_collation(self):
|
|
out = StringIO()
|
|
call_command("inspectdb", "inspectdb_textfielddbcollation", stdout=out)
|
|
output = out.getvalue()
|
|
if not connection.features.interprets_empty_strings_as_nulls:
|
|
self.assertIn(
|
|
"text_field = models.TextField(db_collation='%s')" % test_collation,
|
|
output,
|
|
)
|
|
else:
|
|
self.assertIn(
|
|
"text_field = models.TextField(db_collation='%s, blank=True, "
|
|
"null=True)" % test_collation,
|
|
output,
|
|
)
|
|
|
|
def test_number_field_types(self):
|
|
"""Test introspection of various Django field types"""
|
|
assertFieldType = self.make_field_type_asserter()
|
|
introspected_field_types = connection.features.introspected_field_types
|
|
|
|
auto_field_type = connection.features.introspected_field_types["AutoField"]
|
|
if auto_field_type != "AutoField":
|
|
assertFieldType(
|
|
"id", "models.%s(primary_key=True) # AutoField?" % auto_field_type
|
|
)
|
|
|
|
assertFieldType(
|
|
"big_int_field", "models.%s()" % introspected_field_types["BigIntegerField"]
|
|
)
|
|
|
|
bool_field_type = introspected_field_types["BooleanField"]
|
|
assertFieldType("bool_field", "models.{}()".format(bool_field_type))
|
|
assertFieldType(
|
|
"null_bool_field",
|
|
"models.{}(blank=True, null=True)".format(bool_field_type),
|
|
)
|
|
|
|
if connection.vendor != "sqlite":
|
|
assertFieldType(
|
|
"decimal_field", "models.DecimalField(max_digits=6, decimal_places=1)"
|
|
)
|
|
else: # Guessed arguments on SQLite, see #5014
|
|
assertFieldType(
|
|
"decimal_field",
|
|
"models.DecimalField(max_digits=10, decimal_places=5) "
|
|
"# max_digits and decimal_places have been guessed, "
|
|
"as this database handles decimal fields as float",
|
|
)
|
|
|
|
assertFieldType("float_field", "models.FloatField()")
|
|
assertFieldType(
|
|
"int_field", "models.%s()" % introspected_field_types["IntegerField"]
|
|
)
|
|
assertFieldType(
|
|
"pos_int_field",
|
|
"models.%s()" % introspected_field_types["PositiveIntegerField"],
|
|
)
|
|
assertFieldType(
|
|
"pos_big_int_field",
|
|
"models.%s()" % introspected_field_types["PositiveBigIntegerField"],
|
|
)
|
|
assertFieldType(
|
|
"pos_small_int_field",
|
|
"models.%s()" % introspected_field_types["PositiveSmallIntegerField"],
|
|
)
|
|
assertFieldType(
|
|
"small_int_field",
|
|
"models.%s()" % introspected_field_types["SmallIntegerField"],
|
|
)
|
|
|
|
@skipUnlessDBFeature("can_introspect_foreign_keys")
|
|
def test_attribute_name_not_python_keyword(self):
|
|
out = StringIO()
|
|
call_command("inspectdb", table_name_filter=inspectdb_tables_only, stdout=out)
|
|
output = out.getvalue()
|
|
error_message = (
|
|
"inspectdb generated an attribute name which is a Python keyword"
|
|
)
|
|
# Recursive foreign keys should be set to 'self'
|
|
self.assertIn("parent = models.ForeignKey('self', models.DO_NOTHING)", output)
|
|
self.assertNotIn(
|
|
"from = models.ForeignKey(InspectdbPeople, models.DO_NOTHING)",
|
|
output,
|
|
msg=error_message,
|
|
)
|
|
# As InspectdbPeople model is defined after InspectdbMessage, it should
|
|
# be quoted.
|
|
self.assertIn(
|
|
"from_field = models.ForeignKey('InspectdbPeople', models.DO_NOTHING, "
|
|
"db_column='from_id')",
|
|
output,
|
|
)
|
|
self.assertIn(
|
|
"people_pk = models.OneToOneField(InspectdbPeople, models.DO_NOTHING, "
|
|
"primary_key=True)",
|
|
output,
|
|
)
|
|
self.assertIn(
|
|
"people_unique = models.OneToOneField(InspectdbPeople, models.DO_NOTHING)",
|
|
output,
|
|
)
|
|
|
|
@skipUnlessDBFeature("can_introspect_foreign_keys")
|
|
def test_foreign_key_to_field(self):
|
|
out = StringIO()
|
|
call_command("inspectdb", "inspectdb_foreignkeytofield", stdout=out)
|
|
self.assertIn(
|
|
"to_field_fk = models.ForeignKey('InspectdbPeoplemoredata', "
|
|
"models.DO_NOTHING, to_field='people_unique_id')",
|
|
out.getvalue(),
|
|
)
|
|
|
|
def test_digits_column_name_introspection(self):
|
|
"""Introspection of column names consist/start with digits (#16536/#17676)"""
|
|
char_field_type = connection.features.introspected_field_types["CharField"]
|
|
out = StringIO()
|
|
call_command("inspectdb", "inspectdb_digitsincolumnname", stdout=out)
|
|
output = out.getvalue()
|
|
error_message = "inspectdb generated a model field name which is a number"
|
|
self.assertNotIn(
|
|
" 123 = models.%s" % char_field_type, output, msg=error_message
|
|
)
|
|
self.assertIn("number_123 = models.%s" % char_field_type, output)
|
|
|
|
error_message = (
|
|
"inspectdb generated a model field name which starts with a digit"
|
|
)
|
|
self.assertNotIn(
|
|
" 4extra = models.%s" % char_field_type, output, msg=error_message
|
|
)
|
|
self.assertIn("number_4extra = models.%s" % char_field_type, output)
|
|
|
|
self.assertNotIn(
|
|
" 45extra = models.%s" % char_field_type, output, msg=error_message
|
|
)
|
|
self.assertIn("number_45extra = models.%s" % char_field_type, output)
|
|
|
|
def test_special_column_name_introspection(self):
|
|
"""
|
|
Introspection of column names containing special characters,
|
|
unsuitable for Python identifiers
|
|
"""
|
|
out = StringIO()
|
|
call_command("inspectdb", table_name_filter=special_table_only, stdout=out)
|
|
output = out.getvalue()
|
|
base_name = connection.introspection.identifier_converter("Field")
|
|
integer_field_type = connection.features.introspected_field_types[
|
|
"IntegerField"
|
|
]
|
|
self.assertIn("field = models.%s()" % integer_field_type, output)
|
|
self.assertIn(
|
|
"field_field = models.%s(db_column='%s_')"
|
|
% (integer_field_type, base_name),
|
|
output,
|
|
)
|
|
self.assertIn(
|
|
"field_field_0 = models.%s(db_column='%s__')"
|
|
% (integer_field_type, base_name),
|
|
output,
|
|
)
|
|
self.assertIn(
|
|
"field_field_1 = models.%s(db_column='__field')" % integer_field_type,
|
|
output,
|
|
)
|
|
self.assertIn(
|
|
"prc_x = models.{}(db_column='prc(%) x')".format(integer_field_type), output
|
|
)
|
|
self.assertIn("tamaño = models.%s()" % integer_field_type, output)
|
|
|
|
def test_table_name_introspection(self):
|
|
"""
|
|
Introspection of table names containing special characters,
|
|
unsuitable for Python identifiers
|
|
"""
|
|
out = StringIO()
|
|
call_command("inspectdb", table_name_filter=special_table_only, stdout=out)
|
|
output = out.getvalue()
|
|
self.assertIn("class InspectdbSpecialTableName(models.Model):", output)
|
|
|
|
def test_managed_models(self):
|
|
"""
|
|
By default the command generates models with `Meta.managed = False`.
|
|
"""
|
|
out = StringIO()
|
|
call_command("inspectdb", "inspectdb_columntypes", stdout=out)
|
|
output = out.getvalue()
|
|
self.longMessage = False
|
|
self.assertIn(
|
|
" managed = False",
|
|
output,
|
|
msg="inspectdb should generate unmanaged models.",
|
|
)
|
|
|
|
def test_unique_together_meta(self):
|
|
out = StringIO()
|
|
call_command("inspectdb", "inspectdb_uniquetogether", stdout=out)
|
|
output = out.getvalue()
|
|
self.assertIn(" unique_together = (('", output)
|
|
unique_together_match = self.unique_re.findall(output)
|
|
# There should be one unique_together tuple.
|
|
self.assertEqual(len(unique_together_match), 1)
|
|
fields = unique_together_match[0]
|
|
# Fields with db_column = field name.
|
|
self.assertIn("('field1', 'field2')", fields)
|
|
# Fields from columns whose names are Python keywords.
|
|
self.assertIn("('field1', 'field2')", fields)
|
|
# Fields whose names normalize to the same Python field name and hence
|
|
# are given an integer suffix.
|
|
self.assertIn("('non_unique_column', 'non_unique_column_0')", fields)
|
|
|
|
@skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
|
|
def test_unsupported_unique_together(self):
|
|
"""Unsupported index types (COALESCE here) are skipped."""
|
|
with connection.cursor() as c:
|
|
c.execute(
|
|
"CREATE UNIQUE INDEX Findex ON %s "
|
|
"(id, people_unique_id, COALESCE(message_id, -1))"
|
|
% PeopleMoreData._meta.db_table
|
|
)
|
|
try:
|
|
out = StringIO()
|
|
call_command(
|
|
"inspectdb",
|
|
table_name_filter=lambda tn: tn.startswith(
|
|
PeopleMoreData._meta.db_table
|
|
),
|
|
stdout=out,
|
|
)
|
|
output = out.getvalue()
|
|
self.assertIn("# A unique constraint could not be introspected.", output)
|
|
self.assertEqual(
|
|
self.unique_re.findall(output), ["('id', 'people_unique')"]
|
|
)
|
|
finally:
|
|
with connection.cursor() as c:
|
|
c.execute("DROP INDEX Findex")
|
|
|
|
@skipUnless(
|
|
connection.vendor == "sqlite",
|
|
"Only patched sqlite's DatabaseIntrospection.data_types_reverse for this test",
|
|
)
|
|
def test_custom_fields(self):
|
|
"""
|
|
Introspection of columns with a custom field (#21090)
|
|
"""
|
|
out = StringIO()
|
|
with mock.patch(
|
|
"django.db.connection.introspection.data_types_reverse."
|
|
"base_data_types_reverse",
|
|
{
|
|
"text": "myfields.TextField",
|
|
"bigint": "BigIntegerField",
|
|
},
|
|
):
|
|
call_command("inspectdb", "inspectdb_columntypes", stdout=out)
|
|
output = out.getvalue()
|
|
self.assertIn("text_field = myfields.TextField()", output)
|
|
self.assertIn("big_int_field = models.BigIntegerField()", output)
|
|
|
|
def test_introspection_errors(self):
|
|
"""
|
|
Introspection errors should not crash the command, and the error should
|
|
be visible in the output.
|
|
"""
|
|
out = StringIO()
|
|
with mock.patch(
|
|
"django.db.connection.introspection.get_table_list",
|
|
return_value=[TableInfo(name="nonexistent", type="t")],
|
|
):
|
|
call_command("inspectdb", stdout=out)
|
|
output = out.getvalue()
|
|
self.assertIn("# Unable to inspect table 'nonexistent'", output)
|
|
# The error message depends on the backend
|
|
self.assertIn("# The error was:", output)
|
|
|
|
|
|
class InspectDBTransactionalTests(TransactionTestCase):
|
|
available_apps = ["inspectdb"]
|
|
|
|
def test_include_views(self):
|
|
"""inspectdb --include-views creates models for database views."""
|
|
with connection.cursor() as cursor:
|
|
cursor.execute(
|
|
"CREATE VIEW inspectdb_people_view AS "
|
|
"SELECT id, name FROM inspectdb_people"
|
|
)
|
|
out = StringIO()
|
|
view_model = "class InspectdbPeopleView(models.Model):"
|
|
view_managed = "managed = False # Created from a view."
|
|
try:
|
|
call_command(
|
|
"inspectdb",
|
|
table_name_filter=inspectdb_views_only,
|
|
stdout=out,
|
|
)
|
|
no_views_output = out.getvalue()
|
|
self.assertNotIn(view_model, no_views_output)
|
|
self.assertNotIn(view_managed, no_views_output)
|
|
call_command(
|
|
"inspectdb",
|
|
table_name_filter=inspectdb_views_only,
|
|
include_views=True,
|
|
stdout=out,
|
|
)
|
|
with_views_output = out.getvalue()
|
|
self.assertIn(view_model, with_views_output)
|
|
self.assertIn(view_managed, with_views_output)
|
|
finally:
|
|
with connection.cursor() as cursor:
|
|
cursor.execute("DROP VIEW inspectdb_people_view")
|
|
|
|
@skipUnlessDBFeature("can_introspect_materialized_views")
|
|
def test_include_materialized_views(self):
|
|
"""inspectdb --include-views creates models for materialized views."""
|
|
with connection.cursor() as cursor:
|
|
cursor.execute(
|
|
"CREATE MATERIALIZED VIEW inspectdb_people_materialized AS "
|
|
"SELECT id, name FROM inspectdb_people"
|
|
)
|
|
out = StringIO()
|
|
view_model = "class InspectdbPeopleMaterialized(models.Model):"
|
|
view_managed = "managed = False # Created from a view."
|
|
try:
|
|
call_command(
|
|
"inspectdb",
|
|
table_name_filter=inspectdb_views_only,
|
|
stdout=out,
|
|
)
|
|
no_views_output = out.getvalue()
|
|
self.assertNotIn(view_model, no_views_output)
|
|
self.assertNotIn(view_managed, no_views_output)
|
|
call_command(
|
|
"inspectdb",
|
|
table_name_filter=inspectdb_views_only,
|
|
include_views=True,
|
|
stdout=out,
|
|
)
|
|
with_views_output = out.getvalue()
|
|
self.assertIn(view_model, with_views_output)
|
|
self.assertIn(view_managed, with_views_output)
|
|
finally:
|
|
with connection.cursor() as cursor:
|
|
cursor.execute("DROP MATERIALIZED VIEW inspectdb_people_materialized")
|
|
|
|
@skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
|
|
def test_include_partitions(self):
|
|
"""inspectdb --include-partitions creates models for partitions."""
|
|
with connection.cursor() as cursor:
|
|
cursor.execute(
|
|
"""\
|
|
CREATE TABLE inspectdb_partition_parent (name text not null)
|
|
PARTITION BY LIST (left(upper(name), 1))
|
|
"""
|
|
)
|
|
cursor.execute(
|
|
"""\
|
|
CREATE TABLE inspectdb_partition_child
|
|
PARTITION OF inspectdb_partition_parent
|
|
FOR VALUES IN ('A', 'B', 'C')
|
|
"""
|
|
)
|
|
out = StringIO()
|
|
partition_model_parent = "class InspectdbPartitionParent(models.Model):"
|
|
partition_model_child = "class InspectdbPartitionChild(models.Model):"
|
|
partition_managed = "managed = False # Created from a partition."
|
|
try:
|
|
call_command(
|
|
"inspectdb", table_name_filter=inspectdb_tables_only, stdout=out
|
|
)
|
|
no_partitions_output = out.getvalue()
|
|
self.assertIn(partition_model_parent, no_partitions_output)
|
|
self.assertNotIn(partition_model_child, no_partitions_output)
|
|
self.assertNotIn(partition_managed, no_partitions_output)
|
|
call_command(
|
|
"inspectdb",
|
|
table_name_filter=inspectdb_tables_only,
|
|
include_partitions=True,
|
|
stdout=out,
|
|
)
|
|
with_partitions_output = out.getvalue()
|
|
self.assertIn(partition_model_parent, with_partitions_output)
|
|
self.assertIn(partition_model_child, with_partitions_output)
|
|
self.assertIn(partition_managed, with_partitions_output)
|
|
finally:
|
|
with connection.cursor() as cursor:
|
|
cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_child")
|
|
cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_parent")
|
|
|
|
@skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
|
|
def test_foreign_data_wrapper(self):
|
|
with connection.cursor() as cursor:
|
|
cursor.execute("CREATE EXTENSION IF NOT EXISTS file_fdw")
|
|
cursor.execute(
|
|
"CREATE SERVER inspectdb_server FOREIGN DATA WRAPPER file_fdw"
|
|
)
|
|
cursor.execute(
|
|
"""\
|
|
CREATE FOREIGN TABLE inspectdb_iris_foreign_table (
|
|
petal_length real,
|
|
petal_width real,
|
|
sepal_length real,
|
|
sepal_width real
|
|
) SERVER inspectdb_server OPTIONS (
|
|
filename %s
|
|
)
|
|
""",
|
|
[os.devnull],
|
|
)
|
|
out = StringIO()
|
|
foreign_table_model = "class InspectdbIrisForeignTable(models.Model):"
|
|
foreign_table_managed = "managed = False"
|
|
try:
|
|
call_command(
|
|
"inspectdb",
|
|
table_name_filter=inspectdb_tables_only,
|
|
stdout=out,
|
|
)
|
|
output = out.getvalue()
|
|
self.assertIn(foreign_table_model, output)
|
|
self.assertIn(foreign_table_managed, output)
|
|
finally:
|
|
with connection.cursor() as cursor:
|
|
cursor.execute(
|
|
"DROP FOREIGN TABLE IF EXISTS inspectdb_iris_foreign_table"
|
|
)
|
|
cursor.execute("DROP SERVER IF EXISTS inspectdb_server")
|
|
cursor.execute("DROP EXTENSION IF EXISTS file_fdw")
|