Refs #35234 -- Moved constraint system checks to Check/UniqueConstraint methods.

This commit is contained in:
Simon Charette 2024-02-18 22:55:54 -05:00 committed by Mariusz Felisiak
parent a738281265
commit 0fb104dda2
2 changed files with 179 additions and 210 deletions

View File

@ -28,9 +28,7 @@ from django.db import (
)
from django.db.models import NOT_PROVIDED, ExpressionWrapper, IntegerField, Max, Value
from django.db.models.constants import LOOKUP_SEP
from django.db.models.constraints import CheckConstraint, UniqueConstraint
from django.db.models.deletion import CASCADE, Collector
from django.db.models.expressions import RawSQL
from django.db.models.fields.related import (
ForeignObjectRel,
OneToOneField,
@ -2390,213 +2388,8 @@ class Model(AltersData, metaclass=ModelBase):
if not router.allow_migrate_model(db, cls):
continue
connection = connections[db]
if not (
connection.features.supports_table_check_constraints
or "supports_table_check_constraints" in cls._meta.required_db_features
) and any(
isinstance(constraint, CheckConstraint)
for constraint in cls._meta.constraints
):
errors.append(
checks.Warning(
"%s does not support check constraints."
% connection.display_name,
hint=(
"A constraint won't be created. Silence this "
"warning if you don't care about it."
),
obj=cls,
id="models.W027",
)
)
if not (
connection.features.supports_partial_indexes
or "supports_partial_indexes" in cls._meta.required_db_features
) and any(
isinstance(constraint, UniqueConstraint)
and constraint.condition is not None
for constraint in cls._meta.constraints
):
errors.append(
checks.Warning(
"%s does not support unique constraints with "
"conditions." % connection.display_name,
hint=(
"A constraint won't be created. Silence this "
"warning if you don't care about it."
),
obj=cls,
id="models.W036",
)
)
if not (
connection.features.supports_deferrable_unique_constraints
or "supports_deferrable_unique_constraints"
in cls._meta.required_db_features
) and any(
isinstance(constraint, UniqueConstraint)
and constraint.deferrable is not None
for constraint in cls._meta.constraints
):
errors.append(
checks.Warning(
"%s does not support deferrable unique constraints."
% connection.display_name,
hint=(
"A constraint won't be created. Silence this "
"warning if you don't care about it."
),
obj=cls,
id="models.W038",
)
)
if not (
connection.features.supports_covering_indexes
or "supports_covering_indexes" in cls._meta.required_db_features
) and any(
isinstance(constraint, UniqueConstraint) and constraint.include
for constraint in cls._meta.constraints
):
errors.append(
checks.Warning(
"%s does not support unique constraints with non-key "
"columns." % connection.display_name,
hint=(
"A constraint won't be created. Silence this "
"warning if you don't care about it."
),
obj=cls,
id="models.W039",
)
)
if not (
connection.features.supports_expression_indexes
or "supports_expression_indexes" in cls._meta.required_db_features
) and any(
isinstance(constraint, UniqueConstraint)
and constraint.contains_expressions
for constraint in cls._meta.constraints
):
errors.append(
checks.Warning(
"%s does not support unique constraints on "
"expressions." % connection.display_name,
hint=(
"A constraint won't be created. Silence this "
"warning if you don't care about it."
),
obj=cls,
id="models.W044",
)
)
if not (
connection.features.supports_nulls_distinct_unique_constraints
or (
"supports_nulls_distinct_unique_constraints"
in cls._meta.required_db_features
)
) and any(
isinstance(constraint, UniqueConstraint)
and constraint.nulls_distinct is not None
for constraint in cls._meta.constraints
):
errors.append(
checks.Warning(
"%s does not support unique constraints with "
"nulls distinct." % connection.display_name,
hint=(
"A constraint won't be created. Silence this "
"warning if you don't care about it."
),
obj=cls,
id="models.W047",
)
)
fields = set(
chain.from_iterable(
(*constraint.fields, *constraint.include)
for constraint in cls._meta.constraints
if isinstance(constraint, UniqueConstraint)
)
)
references = set()
for constraint in cls._meta.constraints:
if isinstance(constraint, UniqueConstraint):
if (
connection.features.supports_partial_indexes
or "supports_partial_indexes"
not in cls._meta.required_db_features
) and isinstance(constraint.condition, Q):
references.update(
cls._get_expr_references(constraint.condition)
)
if (
connection.features.supports_expression_indexes
or "supports_expression_indexes"
not in cls._meta.required_db_features
) and constraint.contains_expressions:
for expression in constraint.expressions:
references.update(cls._get_expr_references(expression))
elif isinstance(constraint, CheckConstraint):
if (
connection.features.supports_table_check_constraints
or "supports_table_check_constraints"
not in cls._meta.required_db_features
):
if isinstance(constraint.check, Q):
references.update(
cls._get_expr_references(constraint.check)
)
if any(
isinstance(expr, RawSQL)
for expr in constraint.check.flatten()
):
errors.append(
checks.Warning(
f"Check constraint {constraint.name!r} contains "
f"RawSQL() expression and won't be validated "
f"during the model full_clean().",
hint=(
"Silence this warning if you don't care about "
"it."
),
obj=cls,
id="models.W045",
),
)
for field_name, *lookups in references:
# pk is an alias that won't be found by opts.get_field.
if field_name != "pk":
fields.add(field_name)
if not lookups:
# If it has no lookups it cannot result in a JOIN.
continue
try:
if field_name == "pk":
field = cls._meta.pk
else:
field = cls._meta.get_field(field_name)
if not field.is_relation or field.many_to_many or field.one_to_many:
continue
except FieldDoesNotExist:
continue
# JOIN must happen at the first lookup.
first_lookup = lookups[0]
if (
hasattr(field, "get_transform")
and hasattr(field, "get_lookup")
and field.get_transform(first_lookup) is None
and field.get_lookup(first_lookup) is None
):
errors.append(
checks.Error(
"'constraints' refers to the joined field '%s'."
% LOOKUP_SEP.join([field_name] + lookups),
obj=cls,
id="models.E041",
)
)
errors.extend(cls._check_local_fields(fields, "constraints"))
errors.extend(constraint._check(cls, connection))
return errors

View File

@ -2,9 +2,11 @@ import warnings
from enum import Enum
from types import NoneType
from django.core.exceptions import FieldError, ValidationError
from django.core import checks
from django.core.exceptions import FieldDoesNotExist, FieldError, ValidationError
from django.db import connections
from django.db.models.expressions import Exists, ExpressionList, F, OrderBy
from django.db.models.constants import LOOKUP_SEP
from django.db.models.expressions import Exists, ExpressionList, F, OrderBy, RawSQL
from django.db.models.indexes import IndexExpression
from django.db.models.lookups import Exact
from django.db.models.query_utils import Q
@ -72,6 +74,47 @@ class BaseConstraint:
def get_violation_error_message(self):
return self.violation_error_message % {"name": self.name}
def _check(self, model, connection):
return []
def _check_references(self, model, references):
errors = []
fields = set()
for field_name, *lookups in references:
# pk is an alias that won't be found by opts.get_field.
if field_name != "pk":
fields.add(field_name)
if not lookups:
# If it has no lookups it cannot result in a JOIN.
continue
try:
if field_name == "pk":
field = model._meta.pk
else:
field = model._meta.get_field(field_name)
if not field.is_relation or field.many_to_many or field.one_to_many:
continue
except FieldDoesNotExist:
continue
# JOIN must happen at the first lookup.
first_lookup = lookups[0]
if (
hasattr(field, "get_transform")
and hasattr(field, "get_lookup")
and field.get_transform(first_lookup) is None
and field.get_lookup(first_lookup) is None
):
errors.append(
checks.Error(
"'constraints' refers to the joined field '%s'."
% LOOKUP_SEP.join([field_name] + lookups),
obj=model,
id="models.E041",
)
)
errors.extend(model._check_local_fields(fields, "constraints"))
return errors
def deconstruct(self):
path = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
path = path.replace("django.db.models.constraints", "django.db.models")
@ -105,6 +148,41 @@ class CheckConstraint(BaseConstraint):
violation_error_message=violation_error_message,
)
def _check(self, model, connection):
errors = []
if not (
connection.features.supports_table_check_constraints
or "supports_table_check_constraints" in model._meta.required_db_features
):
errors.append(
checks.Warning(
f"{connection.display_name} does not support check constraints.",
hint=(
"A constraint won't be created. Silence this warning if you "
"don't care about it."
),
obj=model,
id="models.W027",
)
)
else:
references = set()
check = self.check
if isinstance(check, Q):
references.update(model._get_expr_references(check))
if any(isinstance(expr, RawSQL) for expr in check.flatten()):
errors.append(
checks.Warning(
f"Check constraint {self.name!r} contains RawSQL() expression "
"and won't be validated during the model full_clean().",
hint="Silence this warning if you don't care about it.",
obj=model,
id="models.W045",
),
)
errors.extend(self._check_references(model, references))
return errors
def _get_check_sql(self, model, schema_editor):
query = Query(model=model, alias_cols=False)
where = query.build_where(self.check)
@ -251,6 +329,104 @@ class UniqueConstraint(BaseConstraint):
def contains_expressions(self):
return bool(self.expressions)
def _check(self, model, connection):
errors = model._check_local_fields({*self.fields, *self.include}, "constraints")
required_db_features = model._meta.required_db_features
if self.condition is not None and not (
connection.features.supports_partial_indexes
or "supports_partial_indexes" in required_db_features
):
errors.append(
checks.Warning(
f"{connection.display_name} does not support unique constraints "
"with conditions.",
hint=(
"A constraint won't be created. Silence this warning if you "
"don't care about it."
),
obj=model,
id="models.W036",
)
)
if self.deferrable is not None and not (
connection.features.supports_deferrable_unique_constraints
or "supports_deferrable_unique_constraints" in required_db_features
):
errors.append(
checks.Warning(
f"{connection.display_name} does not support deferrable unique "
"constraints.",
hint=(
"A constraint won't be created. Silence this warning if you "
"don't care about it."
),
obj=model,
id="models.W038",
)
)
if self.include and not (
connection.features.supports_covering_indexes
or "supports_covering_indexes" in required_db_features
):
errors.append(
checks.Warning(
f"{connection.display_name} does not support unique constraints "
"with non-key columns.",
hint=(
"A constraint won't be created. Silence this warning if you "
"don't care about it."
),
obj=model,
id="models.W039",
)
)
if self.contains_expressions and not (
connection.features.supports_expression_indexes
or "supports_expression_indexes" in required_db_features
):
errors.append(
checks.Warning(
f"{connection.display_name} does not support unique constraints on "
"expressions.",
hint=(
"A constraint won't be created. Silence this warning if you "
"don't care about it."
),
obj=model,
id="models.W044",
)
)
if self.nulls_distinct is not None and not (
connection.features.supports_nulls_distinct_unique_constraints
or "supports_nulls_distinct_unique_constraints" in required_db_features
):
errors.append(
checks.Warning(
f"{connection.display_name} does not support unique constraints "
"with nulls distinct.",
hint=(
"A constraint won't be created. Silence this warning if you "
"don't care about it."
),
obj=model,
id="models.W047",
)
)
references = set()
if (
connection.features.supports_partial_indexes
or "supports_partial_indexes" not in required_db_features
) and isinstance(self.condition, Q):
references.update(model._get_expr_references(self.condition))
if self.contains_expressions and (
connection.features.supports_expression_indexes
or "supports_expression_indexes" not in required_db_features
):
for expression in self.expressions:
references.update(model._get_expr_references(expression))
errors.extend(self._check_references(model, references))
return errors
def _get_condition_sql(self, model, schema_editor):
if self.condition is None:
return None