Fixed #28897 -- Fixed QuerySet.update() on querysets ordered by annotations.

This commit is contained in:
David Wobrock 2022-06-17 09:20:27 +02:00 committed by Mariusz Felisiak
parent f4680a112d
commit 3ef37a5245
3 changed files with 38 additions and 8 deletions

View File

@ -113,6 +113,7 @@ class DatabaseFeatures(BaseDatabaseFeatures):
"related fields.": { "related fields.": {
"update.tests.AdvancedTests." "update.tests.AdvancedTests."
"test_update_ordered_by_inline_m2m_annotation", "test_update_ordered_by_inline_m2m_annotation",
"update.tests.AdvancedTests.test_update_ordered_by_m2m_annotation",
}, },
} }
if "ONLY_FULL_GROUP_BY" in self.connection.sql_mode: if "ONLY_FULL_GROUP_BY" in self.connection.sql_mode:

View File

@ -1169,6 +1169,20 @@ class QuerySet:
self._for_write = True self._for_write = True
query = self.query.chain(sql.UpdateQuery) query = self.query.chain(sql.UpdateQuery)
query.add_update_values(kwargs) query.add_update_values(kwargs)
# Inline annotations in order_by(), if possible.
new_order_by = []
for col in query.order_by:
if annotation := query.annotations.get(col):
if getattr(annotation, "contains_aggregate", False):
raise exceptions.FieldError(
f"Cannot update when ordering by an aggregate: {annotation}"
)
new_order_by.append(annotation)
else:
new_order_by.append(col)
query.order_by = tuple(new_order_by)
# Clear any annotations so that they won't be present in subqueries. # Clear any annotations so that they won't be present in subqueries.
query.annotations = {} query.annotations = {}
with transaction.mark_for_rollback_on_error(using=self.db): with transaction.mark_for_rollback_on_error(using=self.db):

View File

@ -225,6 +225,16 @@ class AdvancedTests(TestCase):
new_name=annotation, new_name=annotation,
).update(name=F("new_name")) ).update(name=F("new_name"))
def test_update_ordered_by_m2m_aggregation_annotation(self):
msg = (
"Cannot update when ordering by an aggregate: "
"Count(Col(update_bar_m2m_foo, update.Bar_m2m_foo.foo))"
)
with self.assertRaisesMessage(FieldError, msg):
Bar.objects.annotate(m2m_count=Count("m2m_foo")).order_by(
"m2m_count"
).update(x=2)
def test_update_ordered_by_inline_m2m_annotation(self): def test_update_ordered_by_inline_m2m_annotation(self):
foo = Foo.objects.create(target="test") foo = Foo.objects.create(target="test")
Bar.objects.create(foo=foo) Bar.objects.create(foo=foo)
@ -232,6 +242,13 @@ class AdvancedTests(TestCase):
Bar.objects.order_by(Abs("m2m_foo")).update(x=2) Bar.objects.order_by(Abs("m2m_foo")).update(x=2)
self.assertEqual(Bar.objects.get().x, 2) self.assertEqual(Bar.objects.get().x, 2)
def test_update_ordered_by_m2m_annotation(self):
foo = Foo.objects.create(target="test")
Bar.objects.create(foo=foo)
Bar.objects.annotate(abs_id=Abs("m2m_foo")).order_by("abs_id").update(x=3)
self.assertEqual(Bar.objects.get().x, 3)
@unittest.skipUnless( @unittest.skipUnless(
connection.vendor == "mysql", connection.vendor == "mysql",
@ -259,14 +276,12 @@ class MySQLUpdateOrderByTest(TestCase):
self.assertEqual(updated, 2) self.assertEqual(updated, 2)
def test_order_by_update_on_unique_constraint_annotation(self): def test_order_by_update_on_unique_constraint_annotation(self):
# Ordering by annotations is omitted because they cannot be resolved in updated = (
# .update(). UniqueNumber.objects.annotate(number_inverse=F("number").desc())
with self.assertRaises(IntegrityError): .order_by("number_inverse")
UniqueNumber.objects.annotate(number_inverse=F("number").desc(),).order_by( .update(number=F("number") + 1)
"number_inverse" )
).update( self.assertEqual(updated, 2)
number=F("number") + 1,
)
def test_order_by_update_on_parent_unique_constraint(self): def test_order_by_update_on_parent_unique_constraint(self):
# Ordering by inherited fields is omitted because joined fields cannot # Ordering by inherited fields is omitted because joined fields cannot