Cleanup, bug fixes, and new tests for the GEOS I/O objects and `GEOSBase` object.

git-svn-id: http://code.djangoproject.com/svn/django/trunk@10177 bcc190cf-cafb-0310-a4f2-bffc1f526a37
This commit is contained in:
Justin Bronn 2009-03-28 16:36:48 +00:00
parent faf1609851
commit f0b7cc4a23
6 changed files with 203 additions and 35 deletions

View File

@ -41,9 +41,7 @@ class GEOSBase(object):
def _set_ptr(self, ptr): def _set_ptr(self, ptr):
# Only allow the pointer to be set with pointers of the # Only allow the pointer to be set with pointers of the
# compatible type or None (NULL). # compatible type or None (NULL).
if isinstance(ptr, int): if isinstance(ptr, (self.ptr_type, NoneType)):
self._ptr = self.ptr_type(ptr)
elif isinstance(ptr, (self.ptr_type, NoneType)):
self._ptr = ptr self._ptr = ptr
else: else:
raise TypeError('Incompatible pointer type') raise TypeError('Incompatible pointer type')

View File

@ -20,7 +20,6 @@ from django.contrib.gis.geos.mutable_list import ListMixin
# prototypes module -- which handles all interaction with # prototypes module -- which handles all interaction with
# the underlying GEOS library. # the underlying GEOS library.
from django.contrib.gis.geos import prototypes as capi from django.contrib.gis.geos import prototypes as capi
from django.contrib.gis.geos import io
# Regular expression for recognizing HEXEWKB and WKT. A prophylactic measure # Regular expression for recognizing HEXEWKB and WKT. A prophylactic measure
# to prevent potentially malicious input from reaching the underlying C # to prevent potentially malicious input from reaching the underlying C
@ -62,13 +61,13 @@ class GEOSGeometry(GEOSBase, ListMixin):
if wkt_m: if wkt_m:
# Handling WKT input. # Handling WKT input.
if wkt_m.group('srid'): srid = int(wkt_m.group('srid')) if wkt_m.group('srid'): srid = int(wkt_m.group('srid'))
g = io.wkt_r.read(wkt_m.group('wkt')) g = wkt_r.read(wkt_m.group('wkt'))
elif hex_regex.match(geo_input): elif hex_regex.match(geo_input):
# Handling HEXEWKB input. # Handling HEXEWKB input.
g = io.wkb_r.read(geo_input) g = wkb_r.read(geo_input)
elif gdal.GEOJSON and gdal.geometries.json_regex.match(geo_input): elif gdal.GEOJSON and gdal.geometries.json_regex.match(geo_input):
# Handling GeoJSON input. # Handling GeoJSON input.
g = io.wkb_r.read(gdal.OGRGeometry(geo_input).wkb) g = wkb_r.read(gdal.OGRGeometry(geo_input).wkb)
else: else:
raise ValueError('String or unicode input unrecognized as WKT EWKT, and HEXEWKB.') raise ValueError('String or unicode input unrecognized as WKT EWKT, and HEXEWKB.')
elif isinstance(geo_input, GEOM_PTR): elif isinstance(geo_input, GEOM_PTR):
@ -76,7 +75,7 @@ class GEOSGeometry(GEOSBase, ListMixin):
g = geo_input g = geo_input
elif isinstance(geo_input, buffer): elif isinstance(geo_input, buffer):
# When the input is a buffer (WKB). # When the input is a buffer (WKB).
g = io.wkb_r.read(geo_input) g = wkb_r.read(geo_input)
elif isinstance(geo_input, GEOSGeometry): elif isinstance(geo_input, GEOSGeometry):
g = capi.geom_clone(geo_input.ptr) g = capi.geom_clone(geo_input.ptr)
else: else:
@ -366,7 +365,7 @@ class GEOSGeometry(GEOSBase, ListMixin):
@property @property
def wkt(self): def wkt(self):
"Returns the WKT (Well-Known Text) of the Geometry." "Returns the WKT (Well-Known Text) of the Geometry."
return io.wkt_w.write(self.ptr) return wkt_w.write(self)
@property @property
def hex(self): def hex(self):
@ -377,7 +376,7 @@ class GEOSGeometry(GEOSBase, ListMixin):
""" """
# A possible faster, all-python, implementation: # A possible faster, all-python, implementation:
# str(self.wkb).encode('hex') # str(self.wkb).encode('hex')
return io.wkb_w.write_hex(self.ptr) return wkb_w.write_hex(self)
@property @property
def json(self): def json(self):
@ -394,7 +393,7 @@ class GEOSGeometry(GEOSBase, ListMixin):
@property @property
def wkb(self): def wkb(self):
"Returns the WKB of the Geometry as a buffer." "Returns the WKB of the Geometry as a buffer."
return io.wkb_w.write(self.ptr) return wkb_w.write(self)
@property @property
def kml(self): def kml(self):
@ -456,7 +455,7 @@ class GEOSGeometry(GEOSBase, ListMixin):
g = gdal.OGRGeometry(self.wkb, srid) g = gdal.OGRGeometry(self.wkb, srid)
g.transform(ct) g.transform(ct)
# Getting a new GEOS pointer # Getting a new GEOS pointer
ptr = io.wkb_r.read(g.wkb) ptr = wkb_r.read(g.wkb)
if clone: if clone:
# User wants a cloned transformed geometry returned. # User wants a cloned transformed geometry returned.
return GEOSGeometry(ptr, srid=g.srid) return GEOSGeometry(ptr, srid=g.srid)
@ -618,6 +617,9 @@ GEOS_CLASSES = {0 : Point,
7 : GeometryCollection, 7 : GeometryCollection,
} }
# Similarly, import the GEOS I/O instances here to avoid conflicts.
from django.contrib.gis.geos.io import wkt_r, wkt_w, wkb_r, wkb_w
# If supported, import the PreparedGeometry class. # If supported, import the PreparedGeometry class.
if GEOS_PREPARE: if GEOS_PREPARE:
from django.contrib.gis.geos.prepared import PreparedGeometry from django.contrib.gis.geos.prepared import PreparedGeometry

View File

@ -6,11 +6,12 @@ reader and writer classes.
from ctypes import byref, c_size_t from ctypes import byref, c_size_t
from django.contrib.gis.geos.base import GEOSBase from django.contrib.gis.geos.base import GEOSBase
from django.contrib.gis.geos.error import GEOSException from django.contrib.gis.geos.error import GEOSException
from django.contrib.gis.geos.geometry import GEOSGeometry
from django.contrib.gis.geos.libgeos import GEOM_PTR from django.contrib.gis.geos.libgeos import GEOM_PTR
from django.contrib.gis.geos.prototypes import io as capi from django.contrib.gis.geos.prototypes import io as capi
class IOBase(GEOSBase): class IOBase(GEOSBase):
"Base class for IO objects that that have `destroy` method." "Base class for GEOS I/O objects."
def __init__(self): def __init__(self):
# Getting the pointer with the constructor. # Getting the pointer with the constructor.
self.ptr = self.constructor() self.ptr = self.constructor()
@ -19,36 +20,43 @@ class IOBase(GEOSBase):
# Cleaning up with the appropriate destructor. # Cleaning up with the appropriate destructor.
if self._ptr: self.destructor(self._ptr) if self._ptr: self.destructor(self._ptr)
def _get_geom_ptr(self, geom):
if hasattr(geom, 'ptr'): geom = geom.ptr
if not isinstance(geom, GEOM_PTR): raise TypeError
return geom
### WKT Reading and Writing objects ### ### WKT Reading and Writing objects ###
class WKTReader(IOBase):
# Non-public class for internal use because its `read` method returns
# _pointers_ instead of a GEOSGeometry object.
class _WKTReader(IOBase):
constructor = capi.wkt_reader_create constructor = capi.wkt_reader_create
destructor = capi.wkt_reader_destroy destructor = capi.wkt_reader_destroy
ptr_type = capi.WKT_READ_PTR ptr_type = capi.WKT_READ_PTR
def read(self, wkt, ptr=False): def read(self, wkt):
if not isinstance(wkt, basestring): raise TypeError if not isinstance(wkt, basestring): raise TypeError
return capi.wkt_reader_read(self.ptr, wkt) return capi.wkt_reader_read(self.ptr, wkt)
class WKTReader(_WKTReader):
def read(self, wkt):
"Returns a GEOSGeometry for the given WKT string."
return GEOSGeometry(super(WKTReader, self).read(wkt))
class WKTWriter(IOBase): class WKTWriter(IOBase):
constructor = capi.wkt_writer_create constructor = capi.wkt_writer_create
destructor = capi.wkt_writer_destroy destructor = capi.wkt_writer_destroy
ptr_type = capi.WKT_WRITE_PTR ptr_type = capi.WKT_WRITE_PTR
def write(self, geom): def write(self, geom):
return capi.wkt_writer_write(self.ptr, self._get_geom_ptr(geom)) "Returns the WKT representation of the given geometry."
return capi.wkt_writer_write(self.ptr, geom.ptr)
### WKB Reading and Writing objects ### ### WKB Reading and Writing objects ###
class WKBReader(IOBase):
# Non-public class for the same reason as _WKTReader above.
class _WKBReader(IOBase):
constructor = capi.wkb_reader_create constructor = capi.wkb_reader_create
destructor = capi.wkb_reader_destroy destructor = capi.wkb_reader_destroy
ptr_type = capi.WKB_READ_PTR ptr_type = capi.WKB_READ_PTR
def read(self, wkb): def read(self, wkb):
"Returns a _pointer_ to C GEOS Geometry object from the given WKB."
if isinstance(wkb, buffer): if isinstance(wkb, buffer):
wkb_s = str(wkb) wkb_s = str(wkb)
return capi.wkb_reader_read(self.ptr, wkb_s, len(wkb_s)) return capi.wkb_reader_read(self.ptr, wkb_s, len(wkb_s))
@ -57,16 +65,23 @@ class WKBReader(IOBase):
else: else:
raise TypeError raise TypeError
class WKBReader(_WKBReader):
def read(self, wkb):
"Returns a GEOSGeometry for the given WKB buffer."
return GEOSGeometry(super(WKBReader, self).read(wkb))
class WKBWriter(IOBase): class WKBWriter(IOBase):
constructor = capi.wkb_writer_create constructor = capi.wkb_writer_create
destructor = capi.wkb_writer_destroy destructor = capi.wkb_writer_destroy
ptr_type = capi.WKB_READ_PTR ptr_type = capi.WKB_WRITE_PTR
def write(self, geom): def write(self, geom):
return buffer(capi.wkb_writer_write(self.ptr, self._get_geom_ptr(geom), byref(c_size_t()))) "Returns the WKB representation of the given geometry."
return buffer(capi.wkb_writer_write(self.ptr, geom.ptr, byref(c_size_t())))
def write_hex(self, geom): def write_hex(self, geom):
return capi.wkb_writer_write_hex(self.ptr, self._get_geom_ptr(geom), byref(c_size_t())) "Returns the HEXEWKB representation of the given geometry."
return capi.wkb_writer_write_hex(self.ptr, geom.ptr, byref(c_size_t()))
### WKBWriter Properties ### ### WKBWriter Properties ###
@ -102,8 +117,7 @@ class WKBWriter(IOBase):
srid = property(_get_include_srid, _set_include_srid) srid = property(_get_include_srid, _set_include_srid)
# Instances of the WKT and WKB reader/writer objects. # Instances of the WKT and WKB reader/writer objects.
wkt_r = WKTReader() wkt_r = _WKTReader()
wkt_w = WKTWriter() wkt_w = WKTWriter()
wkb_r = WKBReader() wkb_r = _WKBReader()
wkb_w = WKBWriter() wkb_w = WKBWriter()

View File

@ -2,10 +2,11 @@
GEOS Testing module. GEOS Testing module.
""" """
from unittest import TestSuite, TextTestRunner from unittest import TestSuite, TextTestRunner
import test_geos, test_geos_mutation, test_mutable_list import test_geos, test_io, test_geos_mutation, test_mutable_list
test_suites = [ test_suites = [
test_geos.suite(), test_geos.suite(),
test_io.suite(),
test_geos_mutation.suite(), test_geos_mutation.suite(),
test_mutable_list.suite(), test_mutable_list.suite(),
] ]

View File

@ -1,7 +1,6 @@
import random, unittest, sys import ctypes, random, unittest, sys
from ctypes import ArgumentError
from django.contrib.gis.geos import * from django.contrib.gis.geos import *
from django.contrib.gis.geos.base import gdal, numpy from django.contrib.gis.geos.base import gdal, numpy, GEOSBase
from django.contrib.gis.tests.geometries import * from django.contrib.gis.tests.geometries import *
class GEOSTest(unittest.TestCase): class GEOSTest(unittest.TestCase):
@ -18,6 +17,48 @@ class GEOSTest(unittest.TestCase):
else: else:
return None return None
def test00_base(self):
"Tests out the GEOSBase class."
# Testing out GEOSBase class, which provides a `ptr` property
# that abstracts out access to underlying C pointers.
class FakeGeom1(GEOSBase):
pass
# This one only accepts pointers to floats
c_float_p = ctypes.POINTER(ctypes.c_float)
class FakeGeom2(GEOSBase):
ptr_type = c_float_p
# Default ptr_type is `c_void_p`.
fg1 = FakeGeom1()
# Default ptr_type is C float pointer
fg2 = FakeGeom2()
# These assignments are OK -- None is allowed because
# it's equivalent to the NULL pointer.
fg1.ptr = ctypes.c_void_p()
fg1.ptr = None
fg2.ptr = c_float_p(ctypes.c_float(5.23))
fg2.ptr = None
# Because pointers have been set to NULL, an exception should be
# raised when we try to access it. Raising an exception is
# preferrable to a segmentation fault that commonly occurs when
# a C method is given a NULL memory reference.
for fg in (fg1, fg2):
# Equivalent to `fg.ptr`
self.assertRaises(GEOSException, fg._get_ptr)
# Anything that is either not None or the acceptable pointer type will
# result in a TypeError when trying to assign it to the `ptr` property.
# Thus, memmory addresses (integers) and pointers of the incorrect type
# (in `bad_ptrs`) will not be allowed.
bad_ptrs = (5, ctypes.c_char_p('foobar'))
for bad_ptr in bad_ptrs:
# Equivalent to `fg.ptr = bad_ptr`
self.assertRaises(TypeError, fg1._set_ptr, bad_ptr)
self.assertRaises(TypeError, fg2._set_ptr, bad_ptr)
def test01a_wkt(self): def test01a_wkt(self):
"Testing WKT output." "Testing WKT output."
for g in wkt_out: for g in wkt_out:
@ -494,7 +535,7 @@ class GEOSTest(unittest.TestCase):
exp_buf = fromstr(g_tup[1].wkt) exp_buf = fromstr(g_tup[1].wkt)
# Can't use a floating-point for the number of quadsegs. # Can't use a floating-point for the number of quadsegs.
self.assertRaises(ArgumentError, g.buffer, g_tup[2], float(g_tup[3])) self.assertRaises(ctypes.ArgumentError, g.buffer, g_tup[2], float(g_tup[3]))
# Constructing our buffer # Constructing our buffer
buf = g.buffer(g_tup[2], g_tup[3]) buf = g.buffer(g_tup[2], g_tup[3])
@ -518,7 +559,7 @@ class GEOSTest(unittest.TestCase):
self.assertEqual(4326, pnt.srid) self.assertEqual(4326, pnt.srid)
pnt.srid = 3084 pnt.srid = 3084
self.assertEqual(3084, pnt.srid) self.assertEqual(3084, pnt.srid)
self.assertRaises(ArgumentError, pnt.set_srid, '4326') self.assertRaises(ctypes.ArgumentError, pnt.set_srid, '4326')
# Testing SRID keyword on fromstr(), and on Polygon rings. # Testing SRID keyword on fromstr(), and on Polygon rings.
poly = fromstr(polygons[1].wkt, srid=4269) poly = fromstr(polygons[1].wkt, srid=4269)

View File

@ -0,0 +1,112 @@
import binascii, ctypes, unittest
from django.contrib.gis.geos import GEOSGeometry, WKTReader, WKTWriter, WKBReader, WKBWriter, geos_version_info
class GEOSIOTest(unittest.TestCase):
def test01_wktreader(self):
# Creating a WKTReader instance
wkt_r = WKTReader()
wkt = 'POINT (5 23)'
# read() should return a GEOSGeometry
ref = GEOSGeometry(wkt)
g1 = wkt_r.read(wkt)
g2 = wkt_r.read(unicode(wkt))
for geom in (g1, g2):
self.assertEqual(ref, geom)
# Should only accept basestring objects.
self.assertRaises(TypeError, wkt_r.read, 1)
self.assertRaises(TypeError, wkt_r.read, buffer('foo'))
def test02_wktwriter(self):
# Creating a WKTWriter instance, testing its ptr property.
wkt_w = WKTWriter()
self.assertRaises(TypeError, wkt_w._set_ptr, WKTReader.ptr_type())
ref = GEOSGeometry('POINT (5 23)')
ref_wkt = 'POINT (5.0000000000000000 23.0000000000000000)'
self.assertEqual(ref_wkt, wkt_w.write(ref))
def test03_wkbreader(self):
# Creating a WKBReader instance
wkb_r = WKBReader()
hex = '000000000140140000000000004037000000000000'
wkb = buffer(binascii.a2b_hex(hex))
ref = GEOSGeometry(hex)
# read() should return a GEOSGeometry on either a hex string or
# a WKB buffer.
g1 = wkb_r.read(wkb)
g2 = wkb_r.read(hex)
for geom in (g1, g2):
self.assertEqual(ref, geom)
bad_input = (1, 5.23, None, False)
for bad_wkb in bad_input:
self.assertRaises(TypeError, wkb_r.read, bad_wkb)
def test04_wkbwriter(self):
wkb_w = WKBWriter()
# Representations of 'POINT (5 23)' in hex -- one normal and
# the other with the byte order changed.
g = GEOSGeometry('POINT (5 23)')
hex1 = '010100000000000000000014400000000000003740'
wkb1 = buffer(binascii.a2b_hex(hex1))
hex2 = '000000000140140000000000004037000000000000'
wkb2 = buffer(binascii.a2b_hex(hex2))
self.assertEqual(hex1, wkb_w.write_hex(g))
self.assertEqual(wkb1, wkb_w.write(g))
# Ensuring bad byteorders are not accepted.
for bad_byteorder in (-1, 2, 523, 'foo', None):
# Equivalent of `wkb_w.byteorder = bad_byteorder`
self.assertRaises(ValueError, wkb_w._set_byteorder, bad_byteorder)
# Setting the byteorder to 0 (for Big Endian)
wkb_w.byteorder = 0
self.assertEqual(hex2, wkb_w.write_hex(g))
self.assertEqual(wkb2, wkb_w.write(g))
# Back to Little Endian
wkb_w.byteorder = 1
# Now, trying out the 3D and SRID flags.
g = GEOSGeometry('POINT (5 23 17)')
g.srid = 4326
hex3d = '0101000080000000000000144000000000000037400000000000003140'
wkb3d = buffer(binascii.a2b_hex(hex3d))
hex3d_srid = '01010000A0E6100000000000000000144000000000000037400000000000003140'
wkb3d_srid = buffer(binascii.a2b_hex(hex3d_srid))
# Ensuring bad output dimensions are not accepted
for bad_outdim in (-1, 0, 1, 4, 423, 'foo', None):
# Equivalent of `wkb_w.outdim = bad_outdim`
self.assertRaises(ValueError, wkb_w._set_outdim, bad_outdim)
# These tests will fail on 3.0.0 because of a bug that was fixed in 3.1:
# http://trac.osgeo.org/geos/ticket/216
if not geos_version_info()['version'] == '3.0.0':
# Now setting the output dimensions to be 3
wkb_w.outdim = 3
self.assertEqual(hex3d, wkb_w.write_hex(g))
self.assertEqual(wkb3d, wkb_w.write(g))
# Telling the WKBWriter to inlcude the srid in the representation.
wkb_w.srid = True
self.assertEqual(hex3d_srid, wkb_w.write_hex(g))
self.assertEqual(wkb3d_srid, wkb_w.write(g))
def suite():
s = unittest.TestSuite()
s.addTest(unittest.makeSuite(GEOSIOTest))
return s
def run(verbosity=2):
unittest.TextTestRunner(verbosity=verbosity).run(suite())