94 lines
3.9 KiB
Python
94 lines
3.9 KiB
Python
from django.db.models.constants import LOOKUP_SEP
|
|
from django.db.models.fields import FieldDoesNotExist
|
|
from django.db.models.sql.expressions import SQLEvaluator
|
|
from django.db.models.sql.where import Constraint, WhereNode
|
|
from django.contrib.gis.db.models.fields import GeometryField
|
|
|
|
|
|
class GeoConstraint(Constraint):
    """
    Constraint variant whose `process` is adapted for building
    geographic SQL.

    An instance is created from an already-built constraint and copies
    that constraint's `alias`, `col` and `field` attributes.
    """

    def __init__(self, init_constraint):
        # Copy the identifying attributes over from the wrapped constraint,
        # one at a time and in a fixed order.
        for attr_name in ('alias', 'col', 'field'):
            setattr(self, attr_name, getattr(init_constraint, attr_name))

    def process(self, lookup_type, value, connection):
        """
        Return `((alias, col, db_type), params)` for the given lookup.

        When `value` is an SQLEvaluator (an F expression), the field it
        refers to must resolve to a geographic field; that field's `srid`
        is copied onto `value` before the lookup parameters are prepared.
        Raises ValueError when no geographic field can be resolved.
        """
        if isinstance(value, SQLEvaluator):
            # Resolve the F expression's destination field and give the
            # evaluator the same `srid` as that destination.
            target_field = GeoWhereNode._check_geo_field(
                value.opts, value.expression.name)
            if not target_field:
                raise ValueError('No geographic field found in expression.')
            value.srid = target_field.srid

        column_type = self.field.db_type(connection=connection)
        lookup_params = self.field.get_db_prep_lookup(
            lookup_type, value, connection=connection)
        return (self.alias, self.col, column_type), lookup_params
|
|
|
|
|
|
class GeoWhereNode(WhereNode):
    """
    Used to represent the SQL where-clause for spatial databases --
    these are tied to the GeoQuery class that created it.
    """

    def _prepare_data(self, data):
        """
        Swap a constraint on a geometry field for a GeoConstraint, then
        delegate to the parent implementation.

        `data` is only inspected when it is a list or tuple, in which case
        it is unpacked as `(obj, lookup_type, value)`; anything else is
        passed through untouched.
        """
        if isinstance(data, (list, tuple)):
            obj, lookup_type, value = data
            is_geo_constraint = (isinstance(obj, Constraint) and
                                 isinstance(obj.field, GeometryField))
            if is_geo_constraint:
                data = (GeoConstraint(obj), lookup_type, value)
        return super(GeoWhereNode, self)._prepare_data(data)

    def make_atom(self, child, qn, connection):
        """
        Build the SQL fragment and parameters for one where-clause child.

        Children whose left-hand side is a GeoConstraint are rendered via
        `connection.ops.spatial_lookup_sql`; everything else is handled by
        the parent class. Returns a `(sql, params)` pair.
        """
        lvalue, lookup_type, value_annot, params_or_value = child
        if not isinstance(lvalue, GeoConstraint):
            return super(GeoWhereNode, self).make_atom(child, qn, connection)

        data, params = lvalue.process(lookup_type, params_or_value, connection)
        spatial_sql, spatial_params = connection.ops.spatial_lookup_sql(
            data, lookup_type, params_or_value, lvalue.field, qn)
        # Parameters from the spatial SQL come first, followed by the
        # parameters prepared by the constraint itself.
        return spatial_sql, spatial_params + params

    @classmethod
    def _check_geo_field(cls, opts, lookup):
        """
        Utility for checking the given lookup with the given model options.
        The lookup is a string either specifying the geographic field, e.g.
        'point', 'the_geom', or a related lookup on a geographic field like
        'address__point'.

        If a GeometryField exists according to the given lookup on the model
        options, it will be returned. Otherwise returns False (also when a
        field name cannot be found, or a relation cannot be followed).
        """
        # Split outside the `try` so that a non-string lookup still raises
        # instead of being silently reported as "no geographic field".
        field_names = lookup.split(LOOKUP_SEP)

        try:
            geo_fld = opts.get_field(field_names[0])
            # Any remaining names mean the lookup spans relationships
            # (e.g. 'address__point'): keep hopping to the related model's
            # options and fetching the next field until none are left.
            for related_name in field_names[1:]:
                opts = geo_fld.rel.to._meta
                geo_fld = opts.get_field(related_name)
        except (FieldDoesNotExist, AttributeError):
            return False

        # Finally, make sure we got a geographic field and return it.
        if isinstance(geo_fld, GeometryField):
            return geo_fld
        return False
|