Fixed #34171 -- Fixed QuerySet.bulk_create() on fields with db_column in unique_fields/update_fields.

Bug in 0f6946495a.

Thanks Joshua Brooks for the report.
This commit is contained in:
DevilsAutumn 2022-11-22 15:04:55 +05:30 committed by Mariusz Felisiak
parent 7d5329852f
commit 4035bab56f
5 changed files with 46 additions and 7 deletions

View File

@ -720,7 +720,6 @@ class QuerySet(AltersData):
"Unique fields that can trigger the upsert must be provided." "Unique fields that can trigger the upsert must be provided."
) )
# Updating primary keys and non-concrete fields is forbidden. # Updating primary keys and non-concrete fields is forbidden.
update_fields = [self.model._meta.get_field(name) for name in update_fields]
if any(not f.concrete or f.many_to_many for f in update_fields): if any(not f.concrete or f.many_to_many for f in update_fields):
raise ValueError( raise ValueError(
"bulk_create() can only be used with concrete fields in " "bulk_create() can only be used with concrete fields in "
@ -732,9 +731,6 @@ class QuerySet(AltersData):
"update_fields." "update_fields."
) )
if unique_fields: if unique_fields:
unique_fields = [
self.model._meta.get_field(name) for name in unique_fields
]
if any(not f.concrete or f.many_to_many for f in unique_fields): if any(not f.concrete or f.many_to_many for f in unique_fields):
raise ValueError( raise ValueError(
"bulk_create() can only be used with concrete fields " "bulk_create() can only be used with concrete fields "
@ -786,8 +782,11 @@ class QuerySet(AltersData):
if unique_fields: if unique_fields:
# Primary key is allowed in unique_fields. # Primary key is allowed in unique_fields.
unique_fields = [ unique_fields = [
opts.pk.name if name == "pk" else name for name in unique_fields self.model._meta.get_field(opts.pk.name if name == "pk" else name)
for name in unique_fields
] ]
if update_fields:
update_fields = [self.model._meta.get_field(name) for name in update_fields]
on_conflict = self._check_bulk_create_options( on_conflict = self._check_bulk_create_options(
ignore_conflicts, ignore_conflicts,
update_conflicts, update_conflicts,

View File

@ -1725,8 +1725,8 @@ class SQLInsertCompiler(SQLCompiler):
on_conflict_suffix_sql = self.connection.ops.on_conflict_suffix_sql( on_conflict_suffix_sql = self.connection.ops.on_conflict_suffix_sql(
fields, fields,
self.query.on_conflict, self.query.on_conflict,
self.query.update_fields, (f.column for f in self.query.update_fields),
self.query.unique_fields, (f.column for f in self.query.unique_fields),
) )
if ( if (
self.returning_fields self.returning_fields

View File

@ -23,3 +23,6 @@ Bugfixes
* Fixed a bug in Django 4.1 that caused a crash of ``QuerySet.bulk_create()`` * Fixed a bug in Django 4.1 that caused a crash of ``QuerySet.bulk_create()``
with ``"pk"`` in ``unique_fields`` (:ticket:`34177`). with ``"pk"`` in ``unique_fields`` (:ticket:`34177`).
* Fixed a bug in Django 4.1 that caused a crash of ``QuerySet.bulk_create()``
on fields with ``db_column`` (:ticket:`34171`).

View File

@ -69,6 +69,11 @@ class TwoFields(models.Model):
name = models.CharField(max_length=15, null=True) name = models.CharField(max_length=15, null=True)
class FieldsWithDbColumns(models.Model):
rank = models.IntegerField(unique=True, db_column="rAnK")
name = models.CharField(max_length=15, null=True, db_column="oTheRNaMe")
class UpsertConflict(models.Model): class UpsertConflict(models.Model):
number = models.IntegerField(unique=True) number = models.IntegerField(unique=True)
rank = models.IntegerField() rank = models.IntegerField()

View File

@ -21,6 +21,7 @@ from django.test import (
from .models import ( from .models import (
BigAutoFieldModel, BigAutoFieldModel,
Country, Country,
FieldsWithDbColumns,
NoFields, NoFields,
NullableFields, NullableFields,
Pizzeria, Pizzeria,
@ -772,3 +773,34 @@ class BulkCreateTests(TestCase):
@skipIfDBFeature("supports_update_conflicts_with_target") @skipIfDBFeature("supports_update_conflicts_with_target")
def test_update_conflicts_no_unique_fields(self): def test_update_conflicts_no_unique_fields(self):
self._test_update_conflicts([]) self._test_update_conflicts([])
@skipUnlessDBFeature(
"supports_update_conflicts", "supports_update_conflicts_with_target"
)
def test_update_conflicts_unique_fields_update_fields_db_column(self):
FieldsWithDbColumns.objects.bulk_create(
[
FieldsWithDbColumns(rank=1, name="a"),
FieldsWithDbColumns(rank=2, name="b"),
]
)
self.assertEqual(FieldsWithDbColumns.objects.count(), 2)
conflicting_objects = [
FieldsWithDbColumns(rank=1, name="c"),
FieldsWithDbColumns(rank=2, name="d"),
]
FieldsWithDbColumns.objects.bulk_create(
conflicting_objects,
update_conflicts=True,
unique_fields=["rank"],
update_fields=["name"],
)
self.assertEqual(FieldsWithDbColumns.objects.count(), 2)
self.assertCountEqual(
FieldsWithDbColumns.objects.values("rank", "name"),
[
{"rank": 1, "name": "c"},
{"rank": 2, "name": "d"},
],
)