Refs #30581 -- Allowed sql.Query to be used without model.

This commit is contained in:
Gagaro 2022-01-31 15:51:38 +01:00 committed by Mariusz Felisiak
parent 970f5bf503
commit bf524d229f
4 changed files with 90 additions and 16 deletions

View File

@ -921,12 +921,15 @@ class RawSQL(Expression):
self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False
):
# Resolve parents fields used in raw SQL.
for parent in query.model._meta.get_parent_list():
for parent_field in parent._meta.local_fields:
_, column_name = parent_field.get_attname_column()
if column_name.lower() in self.sql.lower():
query.resolve_ref(parent_field.name, allow_joins, reuse, summarize)
break
if query.model:
for parent in query.model._meta.get_parent_list():
for parent_field in parent._meta.local_fields:
_, column_name = parent_field.get_attname_column()
if column_name.lower() in self.sql.lower():
query.resolve_ref(
parent_field.name, allow_joins, reuse, summarize
)
break
return super().resolve_expression(
query, allow_joins, reuse, summarize, for_save
)

View File

@ -310,8 +310,8 @@ class SQLCompiler:
ordering = self.query.order_by
elif self.query.order_by:
ordering = self.query.order_by
elif self.query.get_meta().ordering:
ordering = self.query.get_meta().ordering
elif (meta := self.query.get_meta()) and meta.ordering:
ordering = meta.ordering
self._meta_ordering = ordering
else:
ordering = []
@ -645,7 +645,11 @@ class SQLCompiler:
params.extend(s_params)
out_cols.append(s_sql)
result += [", ".join(out_cols), "FROM", *from_]
result += [", ".join(out_cols)]
if from_:
result += ["FROM", *from_]
elif self.connection.features.bare_select_suffix:
result += [self.connection.features.bare_select_suffix]
params.extend(f_params)
if self.query.select_for_update and features.has_select_for_update:
@ -796,7 +800,8 @@ class SQLCompiler:
"""
result = []
if opts is None:
opts = self.query.get_meta()
if (opts := self.query.get_meta()) is None:
return result
only_load = self.deferred_to_columns()
start_alias = start_alias or self.query.get_initial_alias()
# The 'seen_models' is used to optimize checking the needed parent

View File

@ -46,6 +46,8 @@ __all__ = ["Query", "RawQuery"]
def get_field_names_from_opts(opts):
if opts is None:
return set()
return set(
chain.from_iterable(
(f.name, f.attname) if f.concrete else (f.name,) for f in opts.get_fields()
@ -301,7 +303,8 @@ class Query(BaseExpression):
processing. Normally, this is self.model._meta, but it can be changed
by subclasses.
"""
return self.model._meta
if self.model:
return self.model._meta
def clone(self):
"""
@ -994,8 +997,10 @@ class Query(BaseExpression):
if self.alias_map:
alias = self.base_table
self.ref_alias(alias)
else:
elif self.model:
alias = self.join(self.base_table_class(self.get_meta().db_table, None))
else:
alias = None
return alias
def count_active_tables(self):
@ -1619,6 +1624,8 @@ class Query(BaseExpression):
field = None
filtered_relation = None
try:
if opts is None:
raise FieldDoesNotExist
field = opts.get_field(name)
except FieldDoesNotExist:
if name in self.annotation_select:
@ -1673,7 +1680,7 @@ class Query(BaseExpression):
# Check if we need any joins for concrete inheritance cases (the
# field lives in parent, but we are currently in one of its
# children)
if model is not opts.model:
if opts is not None and model is not opts.model:
path_to_parent = opts.get_path_to_parent(model)
if path_to_parent:
path.extend(path_to_parent)

View File

@ -1,14 +1,23 @@
from datetime import datetime
from django.core.exceptions import FieldError
from django.db import DEFAULT_DB_ALIAS, connection
from django.db.models import BooleanField, CharField, F, Q
from django.db.models.expressions import Col, Func
from django.db.models.expressions import (
Col,
Exists,
ExpressionWrapper,
Func,
RawSQL,
Value,
)
from django.db.models.fields.related_lookups import RelatedIsNull
from django.db.models.functions import Lower
from django.db.models.lookups import Exact, GreaterThan, IsNull, LessThan
from django.db.models.sql.query import JoinPromoter, Query
from django.db.models.sql.constants import SINGLE
from django.db.models.sql.query import JoinPromoter, Query, get_field_names_from_opts
from django.db.models.sql.where import OR
from django.test import SimpleTestCase
from django.test import SimpleTestCase, TestCase, skipUnlessDBFeature
from django.test.utils import register_lookup
from .models import Author, Item, ObjectC, Ranking
@ -152,6 +161,56 @@ class TestQuery(SimpleTestCase):
query.build_where(Func(output_field=CharField()))
class TestQueryNoModel(TestCase):
def test_rawsql_annotation(self):
query = Query(None)
sql = "%s IS NULL"
# Wrap with a CASE WHEN expression if a database backend (e.g. Oracle)
# doesn't support boolean expression in SELECT list.
if not connection.features.supports_boolean_expr_in_select_clause:
sql = f"CASE WHEN {sql} THEN 1 ELSE 0 END"
query.add_annotation(RawSQL(sql, (None,), BooleanField()), "_check")
result = query.get_compiler(using=DEFAULT_DB_ALIAS).execute_sql(SINGLE)
self.assertEqual(result[0], 1)
def test_subquery_annotation(self):
query = Query(None)
query.add_annotation(Exists(Item.objects.all()), "_check")
result = query.get_compiler(using=DEFAULT_DB_ALIAS).execute_sql(SINGLE)
self.assertEqual(result[0], 0)
@skipUnlessDBFeature("supports_boolean_expr_in_select_clause")
def test_q_annotation(self):
query = Query(None)
check = ExpressionWrapper(
Q(RawSQL("%s IS NULL", (None,), BooleanField()))
| Q(Exists(Item.objects.all())),
BooleanField(),
)
query.add_annotation(check, "_check")
result = query.get_compiler(using=DEFAULT_DB_ALIAS).execute_sql(SINGLE)
self.assertEqual(result[0], 1)
def test_names_to_path_field(self):
query = Query(None)
query.add_annotation(Value(True), "value")
path, final_field, targets, names = query.names_to_path(["value"], opts=None)
self.assertEqual(path, [])
self.assertIsInstance(final_field, BooleanField)
self.assertEqual(len(targets), 1)
self.assertIsInstance(targets[0], BooleanField)
self.assertEqual(names, [])
def test_names_to_path_field_error(self):
query = Query(None)
msg = "Cannot resolve keyword 'nonexistent' into field."
with self.assertRaisesMessage(FieldError, msg):
query.names_to_path(["nonexistent"], opts=None)
def test_get_field_names_from_opts(self):
self.assertEqual(get_field_names_from_opts(None), set())
class JoinPromoterTest(SimpleTestCase):
def test_repr(self):
self.assertEqual(