from unittest import mock from django.contrib.postgres.indexes import ( BloomIndex, BrinIndex, BTreeIndex, GinIndex, GistIndex, HashIndex, OpClass, SpGistIndex, ) from django.db import NotSupportedError, connection from django.db.models import CharField, F, Index, Q from django.db.models.functions import Cast, Collate, Length, Lower from django.test import skipUnlessDBFeature from django.test.utils import modify_settings, register_lookup from . import PostgreSQLSimpleTestCase, PostgreSQLTestCase from .fields import SearchVector, SearchVectorField from .models import CharFieldModel, IntegerArrayModel, Scene, TextFieldModel class IndexTestMixin: def test_name_auto_generation(self): index = self.index_class(fields=['field']) index.set_name_with_model(CharFieldModel) self.assertRegex(index.name, r'postgres_te_field_[0-9a-f]{6}_%s' % self.index_class.suffix) def test_deconstruction_no_customization(self): index = self.index_class(fields=['title'], name='test_title_%s' % self.index_class.suffix) path, args, kwargs = index.deconstruct() self.assertEqual(path, 'django.contrib.postgres.indexes.%s' % self.index_class.__name__) self.assertEqual(args, ()) self.assertEqual(kwargs, {'fields': ['title'], 'name': 'test_title_%s' % self.index_class.suffix}) def test_deconstruction_with_expressions_no_customization(self): name = f'test_title_{self.index_class.suffix}' index = self.index_class(Lower('title'), name=name) path, args, kwargs = index.deconstruct() self.assertEqual( path, f'django.contrib.postgres.indexes.{self.index_class.__name__}', ) self.assertEqual(args, (Lower('title'),)) self.assertEqual(kwargs, {'name': name}) class BloomIndexTests(IndexTestMixin, PostgreSQLSimpleTestCase): index_class = BloomIndex def test_suffix(self): self.assertEqual(BloomIndex.suffix, 'bloom') def test_deconstruction(self): index = BloomIndex(fields=['title'], name='test_bloom', length=80, columns=[4]) path, args, kwargs = index.deconstruct() self.assertEqual(path, 'django.contrib.postgres.indexes.BloomIndex') self.assertEqual(args, ()) self.assertEqual(kwargs, { 'fields': ['title'], 'name': 'test_bloom', 'length': 80, 'columns': [4], }) def test_invalid_fields(self): msg = 'Bloom indexes support a maximum of 32 fields.' with self.assertRaisesMessage(ValueError, msg): BloomIndex(fields=['title'] * 33, name='test_bloom') def test_invalid_columns(self): msg = 'BloomIndex.columns must be a list or tuple.' with self.assertRaisesMessage(ValueError, msg): BloomIndex(fields=['title'], name='test_bloom', columns='x') msg = 'BloomIndex.columns cannot have more values than fields.' with self.assertRaisesMessage(ValueError, msg): BloomIndex(fields=['title'], name='test_bloom', columns=[4, 3]) def test_invalid_columns_value(self): msg = 'BloomIndex.columns must contain integers from 1 to 4095.' for length in (0, 4096): with self.subTest(length), self.assertRaisesMessage(ValueError, msg): BloomIndex(fields=['title'], name='test_bloom', columns=[length]) def test_invalid_length(self): msg = 'BloomIndex.length must be None or an integer from 1 to 4096.' for length in (0, 4097): with self.subTest(length), self.assertRaisesMessage(ValueError, msg): BloomIndex(fields=['title'], name='test_bloom', length=length) class BrinIndexTests(IndexTestMixin, PostgreSQLSimpleTestCase): index_class = BrinIndex def test_suffix(self): self.assertEqual(BrinIndex.suffix, 'brin') def test_deconstruction(self): index = BrinIndex(fields=['title'], name='test_title_brin', autosummarize=True, pages_per_range=16) path, args, kwargs = index.deconstruct() self.assertEqual(path, 'django.contrib.postgres.indexes.BrinIndex') self.assertEqual(args, ()) self.assertEqual(kwargs, { 'fields': ['title'], 'name': 'test_title_brin', 'autosummarize': True, 'pages_per_range': 16, }) def test_invalid_pages_per_range(self): with self.assertRaisesMessage(ValueError, 'pages_per_range must be None or a positive integer'): BrinIndex(fields=['title'], name='test_title_brin', pages_per_range=0) class BTreeIndexTests(IndexTestMixin, PostgreSQLSimpleTestCase): index_class = BTreeIndex def test_suffix(self): self.assertEqual(BTreeIndex.suffix, 'btree') def test_deconstruction(self): index = BTreeIndex(fields=['title'], name='test_title_btree', fillfactor=80) path, args, kwargs = index.deconstruct() self.assertEqual(path, 'django.contrib.postgres.indexes.BTreeIndex') self.assertEqual(args, ()) self.assertEqual(kwargs, {'fields': ['title'], 'name': 'test_title_btree', 'fillfactor': 80}) class GinIndexTests(IndexTestMixin, PostgreSQLSimpleTestCase): index_class = GinIndex def test_suffix(self): self.assertEqual(GinIndex.suffix, 'gin') def test_deconstruction(self): index = GinIndex( fields=['title'], name='test_title_gin', fastupdate=True, gin_pending_list_limit=128, ) path, args, kwargs = index.deconstruct() self.assertEqual(path, 'django.contrib.postgres.indexes.GinIndex') self.assertEqual(args, ()) self.assertEqual(kwargs, { 'fields': ['title'], 'name': 'test_title_gin', 'fastupdate': True, 'gin_pending_list_limit': 128, }) class GistIndexTests(IndexTestMixin, PostgreSQLSimpleTestCase): index_class = GistIndex def test_suffix(self): self.assertEqual(GistIndex.suffix, 'gist') def test_deconstruction(self): index = GistIndex(fields=['title'], name='test_title_gist', buffering=False, fillfactor=80) path, args, kwargs = index.deconstruct() self.assertEqual(path, 'django.contrib.postgres.indexes.GistIndex') self.assertEqual(args, ()) self.assertEqual(kwargs, { 'fields': ['title'], 'name': 'test_title_gist', 'buffering': False, 'fillfactor': 80, }) class HashIndexTests(IndexTestMixin, PostgreSQLSimpleTestCase): index_class = HashIndex def test_suffix(self): self.assertEqual(HashIndex.suffix, 'hash') def test_deconstruction(self): index = HashIndex(fields=['title'], name='test_title_hash', fillfactor=80) path, args, kwargs = index.deconstruct() self.assertEqual(path, 'django.contrib.postgres.indexes.HashIndex') self.assertEqual(args, ()) self.assertEqual(kwargs, {'fields': ['title'], 'name': 'test_title_hash', 'fillfactor': 80}) class SpGistIndexTests(IndexTestMixin, PostgreSQLSimpleTestCase): index_class = SpGistIndex def test_suffix(self): self.assertEqual(SpGistIndex.suffix, 'spgist') def test_deconstruction(self): index = SpGistIndex(fields=['title'], name='test_title_spgist', fillfactor=80) path, args, kwargs = index.deconstruct() self.assertEqual(path, 'django.contrib.postgres.indexes.SpGistIndex') self.assertEqual(args, ()) self.assertEqual(kwargs, {'fields': ['title'], 'name': 'test_title_spgist', 'fillfactor': 80}) @modify_settings(INSTALLED_APPS={'append': 'django.contrib.postgres'}) class SchemaTests(PostgreSQLTestCase): get_opclass_query = ''' SELECT opcname, c.relname FROM pg_opclass AS oc JOIN pg_index as i on oc.oid = ANY(i.indclass) JOIN pg_class as c on c.oid = i.indexrelid WHERE c.relname = %s ''' def get_constraints(self, table): """ Get the indexes on the table using a new cursor. """ with connection.cursor() as cursor: return connection.introspection.get_constraints(cursor, table) def test_gin_index(self): # Ensure the table is there and doesn't have an index. self.assertNotIn('field', self.get_constraints(IntegerArrayModel._meta.db_table)) # Add the index index_name = 'integer_array_model_field_gin' index = GinIndex(fields=['field'], name=index_name) with connection.schema_editor() as editor: editor.add_index(IntegerArrayModel, index) constraints = self.get_constraints(IntegerArrayModel._meta.db_table) # Check gin index was added self.assertEqual(constraints[index_name]['type'], GinIndex.suffix) # Drop the index with connection.schema_editor() as editor: editor.remove_index(IntegerArrayModel, index) self.assertNotIn(index_name, self.get_constraints(IntegerArrayModel._meta.db_table)) def test_gin_fastupdate(self): index_name = 'integer_array_gin_fastupdate' index = GinIndex(fields=['field'], name=index_name, fastupdate=False) with connection.schema_editor() as editor: editor.add_index(IntegerArrayModel, index) constraints = self.get_constraints(IntegerArrayModel._meta.db_table) self.assertEqual(constraints[index_name]['type'], 'gin') self.assertEqual(constraints[index_name]['options'], ['fastupdate=off']) with connection.schema_editor() as editor: editor.remove_index(IntegerArrayModel, index) self.assertNotIn(index_name, self.get_constraints(IntegerArrayModel._meta.db_table)) def test_partial_gin_index(self): with register_lookup(CharField, Length): index_name = 'char_field_gin_partial_idx' index = GinIndex(fields=['field'], name=index_name, condition=Q(field__length=40)) with connection.schema_editor() as editor: editor.add_index(CharFieldModel, index) constraints = self.get_constraints(CharFieldModel._meta.db_table) self.assertEqual(constraints[index_name]['type'], 'gin') with connection.schema_editor() as editor: editor.remove_index(CharFieldModel, index) self.assertNotIn(index_name, self.get_constraints(CharFieldModel._meta.db_table)) def test_partial_gin_index_with_tablespace(self): with register_lookup(CharField, Length): index_name = 'char_field_gin_partial_idx' index = GinIndex( fields=['field'], name=index_name, condition=Q(field__length=40), db_tablespace='pg_default', ) with connection.schema_editor() as editor: editor.add_index(CharFieldModel, index) self.assertIn('TABLESPACE "pg_default" ', str(index.create_sql(CharFieldModel, editor))) constraints = self.get_constraints(CharFieldModel._meta.db_table) self.assertEqual(constraints[index_name]['type'], 'gin') with connection.schema_editor() as editor: editor.remove_index(CharFieldModel, index) self.assertNotIn(index_name, self.get_constraints(CharFieldModel._meta.db_table)) def test_gin_parameters(self): index_name = 'integer_array_gin_params' index = GinIndex(fields=['field'], name=index_name, fastupdate=True, gin_pending_list_limit=64) with connection.schema_editor() as editor: editor.add_index(IntegerArrayModel, index) constraints = self.get_constraints(IntegerArrayModel._meta.db_table) self.assertEqual(constraints[index_name]['type'], 'gin') self.assertEqual(constraints[index_name]['options'], ['gin_pending_list_limit=64', 'fastupdate=on']) with connection.schema_editor() as editor: editor.remove_index(IntegerArrayModel, index) self.assertNotIn(index_name, self.get_constraints(IntegerArrayModel._meta.db_table)) def test_trigram_op_class_gin_index(self): index_name = 'trigram_op_class_gin' index = GinIndex(OpClass(F('scene'), name='gin_trgm_ops'), name=index_name) with connection.schema_editor() as editor: editor.add_index(Scene, index) with editor.connection.cursor() as cursor: cursor.execute(self.get_opclass_query, [index_name]) self.assertCountEqual(cursor.fetchall(), [('gin_trgm_ops', index_name)]) constraints = self.get_constraints(Scene._meta.db_table) self.assertIn(index_name, constraints) self.assertIn(constraints[index_name]['type'], GinIndex.suffix) with connection.schema_editor() as editor: editor.remove_index(Scene, index) self.assertNotIn(index_name, self.get_constraints(Scene._meta.db_table)) def test_cast_search_vector_gin_index(self): index_name = 'cast_search_vector_gin' index = GinIndex(Cast('field', SearchVectorField()), name=index_name) with connection.schema_editor() as editor: editor.add_index(TextFieldModel, index) sql = index.create_sql(TextFieldModel, editor) table = TextFieldModel._meta.db_table constraints = self.get_constraints(table) self.assertIn(index_name, constraints) self.assertIn(constraints[index_name]['type'], GinIndex.suffix) self.assertIs(sql.references_column(table, 'field'), True) self.assertIn(' AS tsvector', str(sql)) with connection.schema_editor() as editor: editor.remove_index(TextFieldModel, index) self.assertNotIn(index_name, self.get_constraints(table)) def test_bloom_index(self): index_name = 'char_field_model_field_bloom' index = BloomIndex(fields=['field'], name=index_name) with connection.schema_editor() as editor: editor.add_index(CharFieldModel, index) constraints = self.get_constraints(CharFieldModel._meta.db_table) self.assertEqual(constraints[index_name]['type'], BloomIndex.suffix) with connection.schema_editor() as editor: editor.remove_index(CharFieldModel, index) self.assertNotIn(index_name, self.get_constraints(CharFieldModel._meta.db_table)) def test_bloom_parameters(self): index_name = 'char_field_model_field_bloom_params' index = BloomIndex(fields=['field'], name=index_name, length=512, columns=[3]) with connection.schema_editor() as editor: editor.add_index(CharFieldModel, index) constraints = self.get_constraints(CharFieldModel._meta.db_table) self.assertEqual(constraints[index_name]['type'], BloomIndex.suffix) self.assertEqual(constraints[index_name]['options'], ['length=512', 'col1=3']) with connection.schema_editor() as editor: editor.remove_index(CharFieldModel, index) self.assertNotIn(index_name, self.get_constraints(CharFieldModel._meta.db_table)) def test_brin_index(self): index_name = 'char_field_model_field_brin' index = BrinIndex(fields=['field'], name=index_name, pages_per_range=4) with connection.schema_editor() as editor: editor.add_index(CharFieldModel, index) constraints = self.get_constraints(CharFieldModel._meta.db_table) self.assertEqual(constraints[index_name]['type'], BrinIndex.suffix) self.assertEqual(constraints[index_name]['options'], ['pages_per_range=4']) with connection.schema_editor() as editor: editor.remove_index(CharFieldModel, index) self.assertNotIn(index_name, self.get_constraints(CharFieldModel._meta.db_table)) def test_brin_parameters(self): index_name = 'char_field_brin_params' index = BrinIndex(fields=['field'], name=index_name, autosummarize=True) with connection.schema_editor() as editor: editor.add_index(CharFieldModel, index) constraints = self.get_constraints(CharFieldModel._meta.db_table) self.assertEqual(constraints[index_name]['type'], BrinIndex.suffix) self.assertEqual(constraints[index_name]['options'], ['autosummarize=on']) with connection.schema_editor() as editor: editor.remove_index(CharFieldModel, index) self.assertNotIn(index_name, self.get_constraints(CharFieldModel._meta.db_table)) def test_btree_index(self): # Ensure the table is there and doesn't have an index. self.assertNotIn('field', self.get_constraints(CharFieldModel._meta.db_table)) # Add the index. index_name = 'char_field_model_field_btree' index = BTreeIndex(fields=['field'], name=index_name) with connection.schema_editor() as editor: editor.add_index(CharFieldModel, index) constraints = self.get_constraints(CharFieldModel._meta.db_table) # The index was added. self.assertEqual(constraints[index_name]['type'], BTreeIndex.suffix) # Drop the index. with connection.schema_editor() as editor: editor.remove_index(CharFieldModel, index) self.assertNotIn(index_name, self.get_constraints(CharFieldModel._meta.db_table)) def test_btree_parameters(self): index_name = 'integer_array_btree_fillfactor' index = BTreeIndex(fields=['field'], name=index_name, fillfactor=80) with connection.schema_editor() as editor: editor.add_index(CharFieldModel, index) constraints = self.get_constraints(CharFieldModel._meta.db_table) self.assertEqual(constraints[index_name]['type'], BTreeIndex.suffix) self.assertEqual(constraints[index_name]['options'], ['fillfactor=80']) with connection.schema_editor() as editor: editor.remove_index(CharFieldModel, index) self.assertNotIn(index_name, self.get_constraints(CharFieldModel._meta.db_table)) def test_gist_index(self): # Ensure the table is there and doesn't have an index. self.assertNotIn('field', self.get_constraints(CharFieldModel._meta.db_table)) # Add the index. index_name = 'char_field_model_field_gist' index = GistIndex(fields=['field'], name=index_name) with connection.schema_editor() as editor: editor.add_index(CharFieldModel, index) constraints = self.get_constraints(CharFieldModel._meta.db_table) # The index was added. self.assertEqual(constraints[index_name]['type'], GistIndex.suffix) # Drop the index. with connection.schema_editor() as editor: editor.remove_index(CharFieldModel, index) self.assertNotIn(index_name, self.get_constraints(CharFieldModel._meta.db_table)) def test_gist_parameters(self): index_name = 'integer_array_gist_buffering' index = GistIndex(fields=['field'], name=index_name, buffering=True, fillfactor=80) with connection.schema_editor() as editor: editor.add_index(CharFieldModel, index) constraints = self.get_constraints(CharFieldModel._meta.db_table) self.assertEqual(constraints[index_name]['type'], GistIndex.suffix) self.assertEqual(constraints[index_name]['options'], ['buffering=on', 'fillfactor=80']) with connection.schema_editor() as editor: editor.remove_index(CharFieldModel, index) self.assertNotIn(index_name, self.get_constraints(CharFieldModel._meta.db_table)) @skipUnlessDBFeature('supports_covering_gist_indexes') def test_gist_include(self): index_name = 'scene_gist_include_setting' index = GistIndex(name=index_name, fields=['scene'], include=['setting']) with connection.schema_editor() as editor: editor.add_index(Scene, index) constraints = self.get_constraints(Scene._meta.db_table) self.assertIn(index_name, constraints) self.assertEqual(constraints[index_name]['type'], GistIndex.suffix) self.assertEqual(constraints[index_name]['columns'], ['scene', 'setting']) with connection.schema_editor() as editor: editor.remove_index(Scene, index) self.assertNotIn(index_name, self.get_constraints(Scene._meta.db_table)) def test_gist_include_not_supported(self): index_name = 'gist_include_exception' index = GistIndex(fields=['scene'], name=index_name, include=['setting']) msg = 'Covering GiST indexes requires PostgreSQL 12+.' with self.assertRaisesMessage(NotSupportedError, msg): with mock.patch( 'django.db.backends.postgresql.features.DatabaseFeatures.supports_covering_gist_indexes', False, ): with connection.schema_editor() as editor: editor.add_index(Scene, index) self.assertNotIn(index_name, self.get_constraints(Scene._meta.db_table)) def test_tsvector_op_class_gist_index(self): index_name = 'tsvector_op_class_gist' index = GistIndex( OpClass( SearchVector('scene', 'setting', config='english'), name='tsvector_ops', ), name=index_name, ) with connection.schema_editor() as editor: editor.add_index(Scene, index) sql = index.create_sql(Scene, editor) table = Scene._meta.db_table constraints = self.get_constraints(table) self.assertIn(index_name, constraints) self.assertIn(constraints[index_name]['type'], GistIndex.suffix) self.assertIs(sql.references_column(table, 'scene'), True) self.assertIs(sql.references_column(table, 'setting'), True) with connection.schema_editor() as editor: editor.remove_index(Scene, index) self.assertNotIn(index_name, self.get_constraints(table)) def test_hash_index(self): # Ensure the table is there and doesn't have an index. self.assertNotIn('field', self.get_constraints(CharFieldModel._meta.db_table)) # Add the index. index_name = 'char_field_model_field_hash' index = HashIndex(fields=['field'], name=index_name) with connection.schema_editor() as editor: editor.add_index(CharFieldModel, index) constraints = self.get_constraints(CharFieldModel._meta.db_table) # The index was added. self.assertEqual(constraints[index_name]['type'], HashIndex.suffix) # Drop the index. with connection.schema_editor() as editor: editor.remove_index(CharFieldModel, index) self.assertNotIn(index_name, self.get_constraints(CharFieldModel._meta.db_table)) def test_hash_parameters(self): index_name = 'integer_array_hash_fillfactor' index = HashIndex(fields=['field'], name=index_name, fillfactor=80) with connection.schema_editor() as editor: editor.add_index(CharFieldModel, index) constraints = self.get_constraints(CharFieldModel._meta.db_table) self.assertEqual(constraints[index_name]['type'], HashIndex.suffix) self.assertEqual(constraints[index_name]['options'], ['fillfactor=80']) with connection.schema_editor() as editor: editor.remove_index(CharFieldModel, index) self.assertNotIn(index_name, self.get_constraints(CharFieldModel._meta.db_table)) def test_spgist_index(self): # Ensure the table is there and doesn't have an index. self.assertNotIn('field', self.get_constraints(CharFieldModel._meta.db_table)) # Add the index. index_name = 'char_field_model_field_spgist' index = SpGistIndex(fields=['field'], name=index_name) with connection.schema_editor() as editor: editor.add_index(CharFieldModel, index) constraints = self.get_constraints(CharFieldModel._meta.db_table) # The index was added. self.assertEqual(constraints[index_name]['type'], SpGistIndex.suffix) # Drop the index. with connection.schema_editor() as editor: editor.remove_index(CharFieldModel, index) self.assertNotIn(index_name, self.get_constraints(CharFieldModel._meta.db_table)) def test_spgist_parameters(self): index_name = 'integer_array_spgist_fillfactor' index = SpGistIndex(fields=['field'], name=index_name, fillfactor=80) with connection.schema_editor() as editor: editor.add_index(CharFieldModel, index) constraints = self.get_constraints(CharFieldModel._meta.db_table) self.assertEqual(constraints[index_name]['type'], SpGistIndex.suffix) self.assertEqual(constraints[index_name]['options'], ['fillfactor=80']) with connection.schema_editor() as editor: editor.remove_index(CharFieldModel, index) self.assertNotIn(index_name, self.get_constraints(CharFieldModel._meta.db_table)) def test_op_class(self): index_name = 'test_op_class' index = Index( OpClass(Lower('field'), name='text_pattern_ops'), name=index_name, ) with connection.schema_editor() as editor: editor.add_index(TextFieldModel, index) with editor.connection.cursor() as cursor: cursor.execute(self.get_opclass_query, [index_name]) self.assertCountEqual(cursor.fetchall(), [('text_pattern_ops', index_name)]) def test_op_class_descending_collation(self): collation = connection.features.test_collations.get('non_default') if not collation: self.skipTest( 'This backend does not support case-insensitive collations.' ) index_name = 'test_op_class_descending_collation' index = Index( Collate( OpClass(Lower('field'), name='text_pattern_ops').desc(nulls_last=True), collation=collation, ), name=index_name, ) with connection.schema_editor() as editor: editor.add_index(TextFieldModel, index) self.assertIn( 'COLLATE %s' % editor.quote_name(collation), str(index.create_sql(TextFieldModel, editor)), ) with editor.connection.cursor() as cursor: cursor.execute(self.get_opclass_query, [index_name]) self.assertCountEqual(cursor.fetchall(), [('text_pattern_ops', index_name)]) table = TextFieldModel._meta.db_table constraints = self.get_constraints(table) self.assertIn(index_name, constraints) self.assertEqual(constraints[index_name]['orders'], ['DESC']) with connection.schema_editor() as editor: editor.remove_index(TextFieldModel, index) self.assertNotIn(index_name, self.get_constraints(table)) def test_op_class_descending_partial(self): index_name = 'test_op_class_descending_partial' index = Index( OpClass(Lower('field'), name='text_pattern_ops').desc(), name=index_name, condition=Q(field__contains='China'), ) with connection.schema_editor() as editor: editor.add_index(TextFieldModel, index) with editor.connection.cursor() as cursor: cursor.execute(self.get_opclass_query, [index_name]) self.assertCountEqual(cursor.fetchall(), [('text_pattern_ops', index_name)]) constraints = self.get_constraints(TextFieldModel._meta.db_table) self.assertIn(index_name, constraints) self.assertEqual(constraints[index_name]['orders'], ['DESC']) def test_op_class_descending_partial_tablespace(self): index_name = 'test_op_class_descending_partial_tablespace' index = Index( OpClass(Lower('field').desc(), name='text_pattern_ops'), name=index_name, condition=Q(field__contains='China'), db_tablespace='pg_default', ) with connection.schema_editor() as editor: editor.add_index(TextFieldModel, index) self.assertIn( 'TABLESPACE "pg_default" ', str(index.create_sql(TextFieldModel, editor)) ) with editor.connection.cursor() as cursor: cursor.execute(self.get_opclass_query, [index_name]) self.assertCountEqual(cursor.fetchall(), [('text_pattern_ops', index_name)]) constraints = self.get_constraints(TextFieldModel._meta.db_table) self.assertIn(index_name, constraints) self.assertEqual(constraints[index_name]['orders'], ['DESC'])