from datetime import date from decimal import Decimal from django.core.exceptions import FieldDoesNotExist from django.db.models.query import RawQuerySet from django.test import TestCase, skipUnlessDBFeature from .models import ( Author, Book, BookFkAsPk, Coffee, FriendlyAuthor, MixedCaseIDColumn, Reviewer, ) class RawQueryTests(TestCase): @classmethod def setUpTestData(cls): cls.a1 = Author.objects.create(first_name='Joe', last_name='Smith', dob=date(1950, 9, 20)) cls.a2 = Author.objects.create(first_name='Jill', last_name='Doe', dob=date(1920, 4, 2)) cls.a3 = Author.objects.create(first_name='Bob', last_name='Smith', dob=date(1986, 1, 25)) cls.a4 = Author.objects.create(first_name='Bill', last_name='Jones', dob=date(1932, 5, 10)) cls.b1 = Book.objects.create( title='The awesome book', author=cls.a1, paperback=False, opening_line='It was a bright cold day in April and the clocks were striking thirteen.', ) cls.b2 = Book.objects.create( title='The horrible book', author=cls.a1, paperback=True, opening_line=( 'On an evening in the latter part of May a middle-aged man ' 'was walking homeward from Shaston to the village of Marlott, ' 'in the adjoining Vale of Blakemore, or Blackmoor.' ), ) cls.b3 = Book.objects.create( title='Another awesome book', author=cls.a1, paperback=False, opening_line='A squat gray building of only thirty-four stories.', ) cls.b4 = Book.objects.create( title='Some other book', author=cls.a3, paperback=True, opening_line='It was the day my grandmother exploded.', ) cls.c1 = Coffee.objects.create(brand='dunkin doughnuts') cls.c2 = Coffee.objects.create(brand='starbucks') cls.r1 = Reviewer.objects.create() cls.r2 = Reviewer.objects.create() cls.r1.reviewed.add(cls.b2, cls.b3, cls.b4) def assertSuccessfulRawQuery(self, model, query, expected_results, expected_annotations=(), params=[], translations=None): """ Execute the passed query against the passed model and check the output """ results = list(model.objects.raw(query, params=params, translations=translations)) self.assertProcessed(model, results, expected_results, expected_annotations) self.assertAnnotations(results, expected_annotations) def assertProcessed(self, model, results, orig, expected_annotations=()): """ Compare the results of a raw query against expected results """ self.assertEqual(len(results), len(orig)) for index, item in enumerate(results): orig_item = orig[index] for annotation in expected_annotations: setattr(orig_item, *annotation) for field in model._meta.fields: # All values on the model are equal self.assertEqual( getattr(item, field.attname), getattr(orig_item, field.attname) ) # This includes checking that they are the same type self.assertEqual( type(getattr(item, field.attname)), type(getattr(orig_item, field.attname)) ) def assertNoAnnotations(self, results): """ The results of a raw query contain no annotations """ self.assertAnnotations(results, ()) def assertAnnotations(self, results, expected_annotations): """ The passed raw query results contain the expected annotations """ if expected_annotations: for index, result in enumerate(results): annotation, value = expected_annotations[index] self.assertTrue(hasattr(result, annotation)) self.assertEqual(getattr(result, annotation), value) def test_rawqueryset_repr(self): queryset = RawQuerySet(raw_query='SELECT * FROM raw_query_author') self.assertEqual(repr(queryset), '') self.assertEqual(repr(queryset.query), '') def test_simple_raw_query(self): """ Basic test of raw query with a simple database query """ query = "SELECT * FROM raw_query_author" authors = Author.objects.all() self.assertSuccessfulRawQuery(Author, query, authors) def test_raw_query_lazy(self): """ Raw queries are lazy: they aren't actually executed until they're iterated over. """ q = Author.objects.raw('SELECT * FROM raw_query_author') self.assertIsNone(q.query.cursor) list(q) self.assertIsNotNone(q.query.cursor) def test_FK_raw_query(self): """ Test of a simple raw query against a model containing a foreign key """ query = "SELECT * FROM raw_query_book" books = Book.objects.all() self.assertSuccessfulRawQuery(Book, query, books) def test_db_column_handler(self): """ Test of a simple raw query against a model containing a field with db_column defined. """ query = "SELECT * FROM raw_query_coffee" coffees = Coffee.objects.all() self.assertSuccessfulRawQuery(Coffee, query, coffees) def test_pk_with_mixed_case_db_column(self): """ A raw query with a model that has a pk db_column with mixed case. """ query = "SELECT * FROM raw_query_mixedcaseidcolumn" queryset = MixedCaseIDColumn.objects.all() self.assertSuccessfulRawQuery(MixedCaseIDColumn, query, queryset) def test_order_handler(self): """ Test of raw raw query's tolerance for columns being returned in any order """ selects = ( ('dob, last_name, first_name, id'), ('last_name, dob, first_name, id'), ('first_name, last_name, dob, id'), ) for select in selects: query = "SELECT %s FROM raw_query_author" % select authors = Author.objects.all() self.assertSuccessfulRawQuery(Author, query, authors) def test_translations(self): """ Test of raw query's optional ability to translate unexpected result column names to specific model fields """ query = "SELECT first_name AS first, last_name AS last, dob, id FROM raw_query_author" translations = {'first': 'first_name', 'last': 'last_name'} authors = Author.objects.all() self.assertSuccessfulRawQuery(Author, query, authors, translations=translations) def test_params(self): """ Test passing optional query parameters """ query = "SELECT * FROM raw_query_author WHERE first_name = %s" author = Author.objects.all()[2] params = [author.first_name] qset = Author.objects.raw(query, params=params) results = list(qset) self.assertProcessed(Author, results, [author]) self.assertNoAnnotations(results) self.assertEqual(len(results), 1) self.assertIsInstance(repr(qset), str) def test_params_none(self): query = "SELECT * FROM raw_query_author WHERE first_name like 'J%'" qset = Author.objects.raw(query, params=None) self.assertEqual(len(qset), 2) def test_escaped_percent(self): query = "SELECT * FROM raw_query_author WHERE first_name like 'J%%'" qset = Author.objects.raw(query) self.assertEqual(len(qset), 2) @skipUnlessDBFeature('supports_paramstyle_pyformat') def test_pyformat_params(self): """ Test passing optional query parameters """ query = "SELECT * FROM raw_query_author WHERE first_name = %(first)s" author = Author.objects.all()[2] params = {'first': author.first_name} qset = Author.objects.raw(query, params=params) results = list(qset) self.assertProcessed(Author, results, [author]) self.assertNoAnnotations(results) self.assertEqual(len(results), 1) self.assertIsInstance(repr(qset), str) def test_query_representation(self): """ Test representation of raw query with parameters """ query = "SELECT * FROM raw_query_author WHERE last_name = %(last)s" qset = Author.objects.raw(query, {'last': 'foo'}) self.assertEqual(repr(qset), "") self.assertEqual(repr(qset.query), "") query = "SELECT * FROM raw_query_author WHERE last_name = %s" qset = Author.objects.raw(query, {'foo'}) self.assertEqual(repr(qset), "") self.assertEqual(repr(qset.query), "") def test_many_to_many(self): """ Test of a simple raw query against a model containing a m2m field """ query = "SELECT * FROM raw_query_reviewer" reviewers = Reviewer.objects.all() self.assertSuccessfulRawQuery(Reviewer, query, reviewers) def test_extra_conversions(self): """Extra translations are ignored.""" query = "SELECT * FROM raw_query_author" translations = {'something': 'else'} authors = Author.objects.all() self.assertSuccessfulRawQuery(Author, query, authors, translations=translations) def test_missing_fields(self): query = "SELECT id, first_name, dob FROM raw_query_author" for author in Author.objects.raw(query): self.assertIsNotNone(author.first_name) # last_name isn't given, but it will be retrieved on demand self.assertIsNotNone(author.last_name) def test_missing_fields_without_PK(self): query = "SELECT first_name, dob FROM raw_query_author" msg = 'Raw query must include the primary key' with self.assertRaisesMessage(FieldDoesNotExist, msg): list(Author.objects.raw(query)) def test_annotations(self): query = ( "SELECT a.*, count(b.id) as book_count " "FROM raw_query_author a " "LEFT JOIN raw_query_book b ON a.id = b.author_id " "GROUP BY a.id, a.first_name, a.last_name, a.dob ORDER BY a.id" ) expected_annotations = ( ('book_count', 3), ('book_count', 0), ('book_count', 1), ('book_count', 0), ) authors = Author.objects.all() self.assertSuccessfulRawQuery(Author, query, authors, expected_annotations) def test_white_space_query(self): query = " SELECT * FROM raw_query_author" authors = Author.objects.all() self.assertSuccessfulRawQuery(Author, query, authors) def test_multiple_iterations(self): query = "SELECT * FROM raw_query_author" normal_authors = Author.objects.all() raw_authors = Author.objects.raw(query) # First Iteration first_iterations = 0 for index, raw_author in enumerate(raw_authors): self.assertEqual(normal_authors[index], raw_author) first_iterations += 1 # Second Iteration second_iterations = 0 for index, raw_author in enumerate(raw_authors): self.assertEqual(normal_authors[index], raw_author) second_iterations += 1 self.assertEqual(first_iterations, second_iterations) def test_get_item(self): # Indexing on RawQuerySets query = "SELECT * FROM raw_query_author ORDER BY id ASC" third_author = Author.objects.raw(query)[2] self.assertEqual(third_author.first_name, 'Bob') first_two = Author.objects.raw(query)[0:2] self.assertEqual(len(first_two), 2) with self.assertRaises(TypeError): Author.objects.raw(query)['test'] def test_inheritance(self): f = FriendlyAuthor.objects.create(first_name="Wesley", last_name="Chun", dob=date(1962, 10, 28)) query = "SELECT * FROM raw_query_friendlyauthor" self.assertEqual( [o.pk for o in FriendlyAuthor.objects.raw(query)], [f.pk] ) def test_query_count(self): self.assertNumQueries(1, list, Author.objects.raw("SELECT * FROM raw_query_author")) def test_subquery_in_raw_sql(self): list(Book.objects.raw('SELECT id FROM (SELECT * FROM raw_query_book WHERE paperback IS NOT NULL) sq')) def test_db_column_name_is_used_in_raw_query(self): """ Regression test that ensures the `column` attribute on the field is used to generate the list of fields included in the query, as opposed to the `attname`. This is important when the primary key is a ForeignKey field because `attname` and `column` are not necessarily the same. """ b = BookFkAsPk.objects.create(book=self.b1) self.assertEqual(list(BookFkAsPk.objects.raw('SELECT not_the_default FROM raw_query_bookfkaspk')), [b]) def test_decimal_parameter(self): c = Coffee.objects.create(brand='starbucks', price=20.5) qs = Coffee.objects.raw("SELECT * FROM raw_query_coffee WHERE price >= %s", params=[Decimal(20)]) self.assertEqual(list(qs), [c]) def test_result_caching(self): with self.assertNumQueries(1): books = Book.objects.raw('SELECT * FROM raw_query_book') list(books) list(books) def test_iterator(self): with self.assertNumQueries(2): books = Book.objects.raw('SELECT * FROM raw_query_book') list(books.iterator()) list(books.iterator()) def test_bool(self): self.assertIs(bool(Book.objects.raw('SELECT * FROM raw_query_book')), True) self.assertIs(bool(Book.objects.raw('SELECT * FROM raw_query_book WHERE id = 0')), False) def test_len(self): self.assertEqual(len(Book.objects.raw('SELECT * FROM raw_query_book')), 4) self.assertEqual(len(Book.objects.raw('SELECT * FROM raw_query_book WHERE id = 0')), 0)