Refs #32682 -- Fixed QuerySet.delete() crash on querysets with self-referential subqueries on MySQL.

This commit is contained in:
Mariusz Felisiak 2021-04-27 09:53:27 +02:00
parent 158eca4f93
commit 4074f38e1d
3 changed files with 34 additions and 1 deletions

View File

@ -1452,6 +1452,24 @@ class SQLDeleteCompiler(SQLCompiler):
self.query.get_initial_alias()
return sum(self.query.alias_refcount[t] > 0 for t in self.query.alias_map) == 1
@classmethod
def _expr_refs_base_model(cls, expr, base_model):
if isinstance(expr, Query):
return expr.model == base_model
if not hasattr(expr, 'get_source_expressions'):
return False
return any(
cls._expr_refs_base_model(source_expr, base_model)
for source_expr in expr.get_source_expressions()
)
@cached_property
def contains_self_reference_subquery(self):
return any(
self._expr_refs_base_model(expr, self.query.model)
for expr in chain(self.query.annotations.values(), self.query.where.children)
)
def _as_sql(self, query):
result = [
'DELETE FROM %s' % self.quote_name_unless_alias(query.base_table)
@ -1466,7 +1484,7 @@ class SQLDeleteCompiler(SQLCompiler):
Create the SQL for this query. Return the SQL string and list of
parameters.
"""
if self.single_alias:
if self.single_alias and not self.contains_self_reference_subquery:
return self._as_sql(self.query)
innerq = self.query.clone()
innerq.__class__ = Query

View File

@ -24,6 +24,7 @@ class Person(models.Model):
class Book(models.Model):
pagecount = models.IntegerField()
owner = models.ForeignKey('Child', models.CASCADE, null=True)
class Toy(models.Model):

View File

@ -1,6 +1,7 @@
import datetime
from django.db import connection, models, transaction
from django.db.models import Exists, OuterRef
from django.test import (
SimpleTestCase, TestCase, TransactionTestCase, skipUnlessDBFeature,
)
@ -355,6 +356,19 @@ class DeleteTests(TestCase):
self.assertEqual(researcher2.primary_contact, contact2)
self.assertIsNone(researcher2.secondary_contact)
def test_self_reference_with_through_m2m_at_second_level(self):
toy = Toy.objects.create(name='Paints')
child = Child.objects.create(name='Juan')
Book.objects.create(pagecount=500, owner=child)
PlayedWith.objects.create(child=child, toy=toy, date=datetime.date.today())
Book.objects.filter(Exists(
Book.objects.filter(
pk=OuterRef('pk'),
owner__toys=toy.pk,
),
)).delete()
self.assertIs(Book.objects.exists(), False)
class DeleteDistinct(SimpleTestCase):
def test_disallowed_delete_distinct(self):