Fixed #34177 -- Fixed QuerySet.bulk_create() crash on "pk" in unique_fields.

Bug in 0f6946495a.
This commit is contained in:
Mariusz Felisiak 2022-11-22 14:26:23 +01:00 committed by GitHub
parent 744a1af7f9
commit 7d5329852f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 43 additions and 5 deletions

View File

@ -732,11 +732,8 @@ class QuerySet(AltersData):
"update_fields."
)
if unique_fields:
# Primary key is allowed in unique_fields.
unique_fields = [
self.model._meta.get_field(name)
for name in unique_fields
if name != "pk"
self.model._meta.get_field(name) for name in unique_fields
]
if any(not f.concrete or f.many_to_many for f in unique_fields):
raise ValueError(
@ -785,6 +782,12 @@ class QuerySet(AltersData):
raise ValueError("Can't bulk create a multi-table inherited model")
if not objs:
return objs
opts = self.model._meta
if unique_fields:
# Primary key is allowed in unique_fields.
unique_fields = [
opts.pk.name if name == "pk" else name for name in unique_fields
]
on_conflict = self._check_bulk_create_options(
ignore_conflicts,
update_conflicts,
@ -792,7 +795,6 @@ class QuerySet(AltersData):
unique_fields,
)
self._for_write = True
opts = self.model._meta
fields = opts.concrete_fields
objs = list(objs)
self._prepare_for_bulk_create(objs)

View File

@ -20,3 +20,6 @@ Bugfixes
* Fixed a bug in Django 4.1 that caused a crash of ``acreate()``,
``aget_or_create()``, and ``aupdate_or_create()`` asynchronous methods for
related managers (:ticket:`34139`).
* Fixed a bug in Django 4.1 that caused a crash of ``QuerySet.bulk_create()``
with ``"pk"`` in ``unique_fields`` (:ticket:`34177`).

View File

@ -595,6 +595,39 @@ class BulkCreateTests(TestCase):
def test_update_conflicts_two_fields_unique_fields_second(self):
self._test_update_conflicts_two_fields(["f2"])
@skipUnlessDBFeature(
"supports_update_conflicts", "supports_update_conflicts_with_target"
)
def test_update_conflicts_unique_fields_pk(self):
TwoFields.objects.bulk_create(
[
TwoFields(f1=1, f2=1, name="a"),
TwoFields(f1=2, f2=2, name="b"),
]
)
self.assertEqual(TwoFields.objects.count(), 2)
obj1 = TwoFields.objects.get(f1=1)
obj2 = TwoFields.objects.get(f1=2)
conflicting_objects = [
TwoFields(pk=obj1.pk, f1=3, f2=3, name="c"),
TwoFields(pk=obj2.pk, f1=4, f2=4, name="d"),
]
TwoFields.objects.bulk_create(
conflicting_objects,
update_conflicts=True,
unique_fields=["pk"],
update_fields=["name"],
)
self.assertEqual(TwoFields.objects.count(), 2)
self.assertCountEqual(
TwoFields.objects.values("f1", "f2", "name"),
[
{"f1": 1, "f2": 1, "name": "c"},
{"f1": 2, "f2": 2, "name": "d"},
],
)
@skipUnlessDBFeature(
"supports_update_conflicts", "supports_update_conflicts_with_target"
)