From 0fb104dda287431f5ab74532e45e8471e22b58c8 Mon Sep 17 00:00:00 2001 From: Simon Charette Date: Sun, 18 Feb 2024 22:55:54 -0500 Subject: [PATCH] Refs #35234 -- Moved constraint system checks to Check/UniqueConstraint methods. --- django/db/models/base.py | 209 +------------------------------- django/db/models/constraints.py | 180 ++++++++++++++++++++++++++- 2 files changed, 179 insertions(+), 210 deletions(-) diff --git a/django/db/models/base.py b/django/db/models/base.py index 9dda7cbff9b..ce1c7d1046d 100644 --- a/django/db/models/base.py +++ b/django/db/models/base.py @@ -28,9 +28,7 @@ from django.db import ( ) from django.db.models import NOT_PROVIDED, ExpressionWrapper, IntegerField, Max, Value from django.db.models.constants import LOOKUP_SEP -from django.db.models.constraints import CheckConstraint, UniqueConstraint from django.db.models.deletion import CASCADE, Collector -from django.db.models.expressions import RawSQL from django.db.models.fields.related import ( ForeignObjectRel, OneToOneField, @@ -2390,213 +2388,8 @@ class Model(AltersData, metaclass=ModelBase): if not router.allow_migrate_model(db, cls): continue connection = connections[db] - if not ( - connection.features.supports_table_check_constraints - or "supports_table_check_constraints" in cls._meta.required_db_features - ) and any( - isinstance(constraint, CheckConstraint) - for constraint in cls._meta.constraints - ): - errors.append( - checks.Warning( - "%s does not support check constraints." - % connection.display_name, - hint=( - "A constraint won't be created. Silence this " - "warning if you don't care about it." - ), - obj=cls, - id="models.W027", - ) - ) - if not ( - connection.features.supports_partial_indexes - or "supports_partial_indexes" in cls._meta.required_db_features - ) and any( - isinstance(constraint, UniqueConstraint) - and constraint.condition is not None - for constraint in cls._meta.constraints - ): - errors.append( - checks.Warning( - "%s does not support unique constraints with " - "conditions." % connection.display_name, - hint=( - "A constraint won't be created. Silence this " - "warning if you don't care about it." - ), - obj=cls, - id="models.W036", - ) - ) - if not ( - connection.features.supports_deferrable_unique_constraints - or "supports_deferrable_unique_constraints" - in cls._meta.required_db_features - ) and any( - isinstance(constraint, UniqueConstraint) - and constraint.deferrable is not None - for constraint in cls._meta.constraints - ): - errors.append( - checks.Warning( - "%s does not support deferrable unique constraints." - % connection.display_name, - hint=( - "A constraint won't be created. Silence this " - "warning if you don't care about it." - ), - obj=cls, - id="models.W038", - ) - ) - if not ( - connection.features.supports_covering_indexes - or "supports_covering_indexes" in cls._meta.required_db_features - ) and any( - isinstance(constraint, UniqueConstraint) and constraint.include - for constraint in cls._meta.constraints - ): - errors.append( - checks.Warning( - "%s does not support unique constraints with non-key " - "columns." % connection.display_name, - hint=( - "A constraint won't be created. Silence this " - "warning if you don't care about it." - ), - obj=cls, - id="models.W039", - ) - ) - if not ( - connection.features.supports_expression_indexes - or "supports_expression_indexes" in cls._meta.required_db_features - ) and any( - isinstance(constraint, UniqueConstraint) - and constraint.contains_expressions - for constraint in cls._meta.constraints - ): - errors.append( - checks.Warning( - "%s does not support unique constraints on " - "expressions." % connection.display_name, - hint=( - "A constraint won't be created. Silence this " - "warning if you don't care about it." - ), - obj=cls, - id="models.W044", - ) - ) - if not ( - connection.features.supports_nulls_distinct_unique_constraints - or ( - "supports_nulls_distinct_unique_constraints" - in cls._meta.required_db_features - ) - ) and any( - isinstance(constraint, UniqueConstraint) - and constraint.nulls_distinct is not None - for constraint in cls._meta.constraints - ): - errors.append( - checks.Warning( - "%s does not support unique constraints with " - "nulls distinct." % connection.display_name, - hint=( - "A constraint won't be created. Silence this " - "warning if you don't care about it." - ), - obj=cls, - id="models.W047", - ) - ) - fields = set( - chain.from_iterable( - (*constraint.fields, *constraint.include) - for constraint in cls._meta.constraints - if isinstance(constraint, UniqueConstraint) - ) - ) - references = set() for constraint in cls._meta.constraints: - if isinstance(constraint, UniqueConstraint): - if ( - connection.features.supports_partial_indexes - or "supports_partial_indexes" - not in cls._meta.required_db_features - ) and isinstance(constraint.condition, Q): - references.update( - cls._get_expr_references(constraint.condition) - ) - if ( - connection.features.supports_expression_indexes - or "supports_expression_indexes" - not in cls._meta.required_db_features - ) and constraint.contains_expressions: - for expression in constraint.expressions: - references.update(cls._get_expr_references(expression)) - elif isinstance(constraint, CheckConstraint): - if ( - connection.features.supports_table_check_constraints - or "supports_table_check_constraints" - not in cls._meta.required_db_features - ): - if isinstance(constraint.check, Q): - references.update( - cls._get_expr_references(constraint.check) - ) - if any( - isinstance(expr, RawSQL) - for expr in constraint.check.flatten() - ): - errors.append( - checks.Warning( - f"Check constraint {constraint.name!r} contains " - f"RawSQL() expression and won't be validated " - f"during the model full_clean().", - hint=( - "Silence this warning if you don't care about " - "it." - ), - obj=cls, - id="models.W045", - ), - ) - for field_name, *lookups in references: - # pk is an alias that won't be found by opts.get_field. - if field_name != "pk": - fields.add(field_name) - if not lookups: - # If it has no lookups it cannot result in a JOIN. - continue - try: - if field_name == "pk": - field = cls._meta.pk - else: - field = cls._meta.get_field(field_name) - if not field.is_relation or field.many_to_many or field.one_to_many: - continue - except FieldDoesNotExist: - continue - # JOIN must happen at the first lookup. - first_lookup = lookups[0] - if ( - hasattr(field, "get_transform") - and hasattr(field, "get_lookup") - and field.get_transform(first_lookup) is None - and field.get_lookup(first_lookup) is None - ): - errors.append( - checks.Error( - "'constraints' refers to the joined field '%s'." - % LOOKUP_SEP.join([field_name] + lookups), - obj=cls, - id="models.E041", - ) - ) - errors.extend(cls._check_local_fields(fields, "constraints")) + errors.extend(constraint._check(cls, connection)) return errors diff --git a/django/db/models/constraints.py b/django/db/models/constraints.py index 56d547e6b08..6c521700d2d 100644 --- a/django/db/models/constraints.py +++ b/django/db/models/constraints.py @@ -2,9 +2,11 @@ import warnings from enum import Enum from types import NoneType -from django.core.exceptions import FieldError, ValidationError +from django.core import checks +from django.core.exceptions import FieldDoesNotExist, FieldError, ValidationError from django.db import connections -from django.db.models.expressions import Exists, ExpressionList, F, OrderBy +from django.db.models.constants import LOOKUP_SEP +from django.db.models.expressions import Exists, ExpressionList, F, OrderBy, RawSQL from django.db.models.indexes import IndexExpression from django.db.models.lookups import Exact from django.db.models.query_utils import Q @@ -72,6 +74,47 @@ class BaseConstraint: def get_violation_error_message(self): return self.violation_error_message % {"name": self.name} + def _check(self, model, connection): + return [] + + def _check_references(self, model, references): + errors = [] + fields = set() + for field_name, *lookups in references: + # pk is an alias that won't be found by opts.get_field. + if field_name != "pk": + fields.add(field_name) + if not lookups: + # If it has no lookups it cannot result in a JOIN. + continue + try: + if field_name == "pk": + field = model._meta.pk + else: + field = model._meta.get_field(field_name) + if not field.is_relation or field.many_to_many or field.one_to_many: + continue + except FieldDoesNotExist: + continue + # JOIN must happen at the first lookup. + first_lookup = lookups[0] + if ( + hasattr(field, "get_transform") + and hasattr(field, "get_lookup") + and field.get_transform(first_lookup) is None + and field.get_lookup(first_lookup) is None + ): + errors.append( + checks.Error( + "'constraints' refers to the joined field '%s'." + % LOOKUP_SEP.join([field_name] + lookups), + obj=model, + id="models.E041", + ) + ) + errors.extend(model._check_local_fields(fields, "constraints")) + return errors + def deconstruct(self): path = "%s.%s" % (self.__class__.__module__, self.__class__.__name__) path = path.replace("django.db.models.constraints", "django.db.models") @@ -105,6 +148,41 @@ class CheckConstraint(BaseConstraint): violation_error_message=violation_error_message, ) + def _check(self, model, connection): + errors = [] + if not ( + connection.features.supports_table_check_constraints + or "supports_table_check_constraints" in model._meta.required_db_features + ): + errors.append( + checks.Warning( + f"{connection.display_name} does not support check constraints.", + hint=( + "A constraint won't be created. Silence this warning if you " + "don't care about it." + ), + obj=model, + id="models.W027", + ) + ) + else: + references = set() + check = self.check + if isinstance(check, Q): + references.update(model._get_expr_references(check)) + if any(isinstance(expr, RawSQL) for expr in check.flatten()): + errors.append( + checks.Warning( + f"Check constraint {self.name!r} contains RawSQL() expression " + "and won't be validated during the model full_clean().", + hint="Silence this warning if you don't care about it.", + obj=model, + id="models.W045", + ), + ) + errors.extend(self._check_references(model, references)) + return errors + def _get_check_sql(self, model, schema_editor): query = Query(model=model, alias_cols=False) where = query.build_where(self.check) @@ -251,6 +329,104 @@ class UniqueConstraint(BaseConstraint): def contains_expressions(self): return bool(self.expressions) + def _check(self, model, connection): + errors = model._check_local_fields({*self.fields, *self.include}, "constraints") + required_db_features = model._meta.required_db_features + if self.condition is not None and not ( + connection.features.supports_partial_indexes + or "supports_partial_indexes" in required_db_features + ): + errors.append( + checks.Warning( + f"{connection.display_name} does not support unique constraints " + "with conditions.", + hint=( + "A constraint won't be created. Silence this warning if you " + "don't care about it." + ), + obj=model, + id="models.W036", + ) + ) + if self.deferrable is not None and not ( + connection.features.supports_deferrable_unique_constraints + or "supports_deferrable_unique_constraints" in required_db_features + ): + errors.append( + checks.Warning( + f"{connection.display_name} does not support deferrable unique " + "constraints.", + hint=( + "A constraint won't be created. Silence this warning if you " + "don't care about it." + ), + obj=model, + id="models.W038", + ) + ) + if self.include and not ( + connection.features.supports_covering_indexes + or "supports_covering_indexes" in required_db_features + ): + errors.append( + checks.Warning( + f"{connection.display_name} does not support unique constraints " + "with non-key columns.", + hint=( + "A constraint won't be created. Silence this warning if you " + "don't care about it." + ), + obj=model, + id="models.W039", + ) + ) + if self.contains_expressions and not ( + connection.features.supports_expression_indexes + or "supports_expression_indexes" in required_db_features + ): + errors.append( + checks.Warning( + f"{connection.display_name} does not support unique constraints on " + "expressions.", + hint=( + "A constraint won't be created. Silence this warning if you " + "don't care about it." + ), + obj=model, + id="models.W044", + ) + ) + if self.nulls_distinct is not None and not ( + connection.features.supports_nulls_distinct_unique_constraints + or "supports_nulls_distinct_unique_constraints" in required_db_features + ): + errors.append( + checks.Warning( + f"{connection.display_name} does not support unique constraints " + "with nulls distinct.", + hint=( + "A constraint won't be created. Silence this warning if you " + "don't care about it." + ), + obj=model, + id="models.W047", + ) + ) + references = set() + if ( + connection.features.supports_partial_indexes + or "supports_partial_indexes" not in required_db_features + ) and isinstance(self.condition, Q): + references.update(model._get_expr_references(self.condition)) + if self.contains_expressions and ( + connection.features.supports_expression_indexes + or "supports_expression_indexes" not in required_db_features + ): + for expression in self.expressions: + references.update(model._get_expr_references(expression)) + errors.extend(self._check_references(model, references)) + return errors + def _get_condition_sql(self, model, schema_editor): if self.condition is None: return None