Source code for geoalchemy2.elements

from __future__ import annotations

import binascii
import re
import struct
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Union

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql import functions
from sqlalchemy.sql.functions import FunctionElement
from sqlalchemy.types import to_instance

from geoalchemy2.exc import ArgumentError

BinasciiError = binascii.Error

function_registry: Set[str] = set()


[docs] class _SpatialElement: """The base class for public spatial elements. Args: data: The first argument passed to the constructor is the data wrapped by the ``_SpatialElement`` object being constructed. srid: An integer representing the spatial reference system. E.g. ``4326``. Default value is ``-1``, which means no/unknown reference system. extended: A boolean indicating whether the extended format (EWKT or EWKB) is used. Default is ``None``. """ def __init__(self, data, srid: int = -1, extended: Optional[bool] = None) -> None: self.srid = srid self.data = data self.extended = extended def __str__(self) -> str: return self.desc def __repr__(self) -> str: return "<%s at 0x%x; %s>" % ( self.__class__.__name__, id(self), self, ) # pragma: no cover def __eq__(self, other) -> bool: try: return ( self.extended == other.extended and self.srid == other.srid and self.desc == other.desc ) except AttributeError: return False def __ne__(self, other) -> bool: return not self.__eq__(other) def __hash__(self): return hash((self.desc, self.srid, self.extended)) def __getattr__(self, name): # # This is how things like lake.geom.ST_Buffer(2) creates # SQL expressions of this form: # # ST_Buffer(ST_GeomFromWKB(:ST_GeomFromWKB_1), :param_1) # # Raise an AttributeError when the attribute name doesn't start # with st_. This is to be nice with other libraries that use # some ducktyping (e.g. hasattr(element, "copy")) to determine # the type of the element. if name.lower() not in function_registry: raise AttributeError # We create our own _FunctionGenerator here, and use it in place of # SQLAlchemy's "func" object. This is to be able to "bind" the # function to the SQL expression. See also GenericFunction above. func_ = functions._FunctionGenerator(expr=self) return getattr(func_, name) def __getstate__(self) -> Dict[str, Any]: state = { "srid": self.srid, "data": str(self), "extended": self.extended, } return state def __setstate__(self, state: Dict[str, Any]) -> None: self.srid = state["srid"] self.extended = state["extended"] self.data = self._data_from_desc(state["data"]) @staticmethod def _data_from_desc(desc): raise NotImplementedError() # pragma: no cover
[docs] class WKTElement(_SpatialElement): """Instances of this class wrap a WKT or EWKT value. Usage examples:: wkt_element_1 = WKTElement('POINT(5 45)') wkt_element_2 = WKTElement('POINT(5 45)', srid=4326) wkt_element_3 = WKTElement('SRID=4326;POINT(5 45)', extended=True) """ _REMOVE_SRID = re.compile("(SRID=([0-9]+); ?)?(.*)") geom_from: str = "ST_GeomFromText" geom_from_extended_version: str = "ST_GeomFromEWKT" def __init__(self, data: str, srid: int = -1, extended: Optional[bool] = None) -> None: if extended is None: extended = data.startswith("SRID=") if extended and srid == -1: # read srid from EWKT data_s = data.split(";") if len(data_s) != 2: raise ArgumentError("invalid EWKT string {}".format(data)) header = data_s[0] try: srid = int(header[5:]) except ValueError: raise ArgumentError("invalid EWKT string {}".format(data)) _SpatialElement.__init__(self, data, srid, extended) @property def desc(self) -> str: """This element's description string.""" return self.data @staticmethod def _data_from_desc(desc): return desc
[docs] def as_wkt(self) -> WKTElement: if self.extended: srid_match = self._REMOVE_SRID.match(self.data) assert srid_match is not None return WKTElement(srid_match.group(3), self.srid, extended=False) return WKTElement(self.data, self.srid, self.extended)
[docs] def as_ewkt(self) -> WKTElement: if not self.extended and self.srid != -1: data = f"SRID={self.srid};" + self.data return WKTElement(data, extended=True) return WKTElement(self.data, self.srid, self.extended)
[docs] class WKBElement(_SpatialElement): """Instances of this class wrap a WKB or EWKB value. Geometry values read from the database are converted to instances of this type. In most cases you won't need to create ``WKBElement`` instances yourself. If ``extended`` is ``True`` and ``srid`` is ``-1`` at construction time then the SRID will be read from the EWKB data. Note: you can create ``WKBElement`` objects from Shapely geometries using the :func:`geoalchemy2.shape.from_shape` function. """ geom_from: str = "ST_GeomFromWKB" geom_from_extended_version: str = "ST_GeomFromEWKB" def __init__( self, data: Union[str, bytes, memoryview], srid: int = -1, extended: Optional[bool] = None ) -> None: if srid == -1 or extended is None or extended: # read srid from the EWKB # # WKB struct { # byte byteOrder; # uint32 wkbType; # uint32 SRID; # struct geometry; # } # byteOrder enum { # WKB_XDR = 0, // Most Significant Byte First # WKB_NDR = 1, // Least Significant Byte First # } # See https://trac.osgeo.org/postgis/browser/branches/3.0/doc/ZMSgeoms.txt # for more details about WKB/EWKB specifications. if isinstance(data, str): # SpatiaLite case # assume that the string is an hex value header = binascii.unhexlify(data[:18]) else: header = data[:9] byte_order, wkb_type, wkb_srid = header[0], header[1:5], header[5:] byte_order_marker = "<I" if byte_order else ">I" wkb_type_int = ( int(struct.unpack(byte_order_marker, wkb_type)[0]) if len(wkb_type) == 4 else 0 ) if extended is None: if not wkb_type_int: extended = False else: extended = extended or bool(wkb_type_int & 536870912) # Check SRID bit if extended and srid == -1: wkb_srid = struct.unpack(byte_order_marker, wkb_srid)[0] srid = int(wkb_srid) _SpatialElement.__init__(self, data, srid, extended) @property def desc(self) -> str: """This element's description string.""" if isinstance(self.data, str): # SpatiaLite case return self.data.lower() desc = str(binascii.hexlify(self.data), encoding="utf-8").lower() return desc @staticmethod def _data_from_desc(desc) -> bytes: desc = desc.encode(encoding="utf-8") return binascii.unhexlify(desc)
[docs] def as_wkb(self) -> WKBElement: if self.extended: if isinstance(self.data, str): # SpatiaLite case # assume that the string is an hex value is_hex = True header = binascii.unhexlify(self.data[:10]) byte_order, wkb_type = header[0], header[1:5] else: is_hex = False byte_order, wkb_type = self.data[0], self.data[1:5] byte_order_marker = "<I" if byte_order else ">I" wkb_type_int = ( int(struct.unpack(byte_order_marker, wkb_type)[0]) if len(wkb_type) == 4 else 0 ) wkb_type_int &= 3758096383 # Set SRID bit to 0 and keep all other bits if is_hex: wkb_type_hex = binascii.hexlify( wkb_type_int.to_bytes(4, "little" if byte_order else "big") ) data = self.data[:2] + wkb_type_hex.decode("ascii") + self.data[18:] else: buffer = bytearray() buffer.extend(self.data[:1]) buffer.extend(struct.pack(byte_order_marker, wkb_type_int)) buffer.extend(self.data[9:]) data = memoryview(buffer) return WKBElement(data, self.srid, extended=False) return WKBElement(self.data, self.srid)
[docs] def as_ewkb(self) -> WKBElement: if not self.extended and self.srid != -1: if isinstance(self.data, str): # SpatiaLite case # assume that the string is an hex value header = binascii.unhexlify(self.data[:10]) byte_order, wkb_type = header[0], header[1:5] else: byte_order, wkb_type = self.data[0], self.data[1:5] byte_order_marker = "<I" if byte_order else ">I" wkb_type_int = int( struct.unpack(byte_order_marker, wkb_type)[0] if len(wkb_type) == 4 else 0 ) wkb_type_int |= 536870912 # Set SRID bit to 1 and keep all other bits data: Union[str, memoryview] if isinstance(self.data, str): wkb_type_hex = binascii.hexlify( wkb_type_int.to_bytes(4, "little" if byte_order else "big") ) wkb_srid_hex = binascii.hexlify( self.srid.to_bytes(4, "little" if byte_order else "big") ) data = ( self.data[:2] + wkb_type_hex.decode("ascii") + wkb_srid_hex.decode("ascii") + self.data[10:] ) else: buffer = bytearray() buffer.extend(self.data[:1]) buffer.extend(struct.pack(byte_order_marker, wkb_type_int)) buffer.extend(struct.pack(byte_order_marker, self.srid)) buffer.extend(self.data[5:]) data = memoryview(buffer) return WKBElement(data, self.srid, extended=True) return WKBElement(self.data, self.srid)
[docs] class RasterElement(_SpatialElement): """Instances of this class wrap a ``raster`` value. Raster values read from the database are converted to instances of this type. In most cases you won't need to create ``RasterElement`` instances yourself. """ geom_from_extended_version: str = "raster" def __init__(self, data: Union[str, bytes, memoryview]) -> None: # read srid from the WKB (binary or hexadecimal format) # The WKB structure is documented in the file # raster/doc/RFC2-WellKnownBinaryFormat of the PostGIS sources. bin_data: Union[str, bytes, memoryview] try: bin_data = binascii.unhexlify(data[:114]) except BinasciiError: bin_data = data data = str(binascii.hexlify(data).decode(encoding="utf-8")) # type: ignore byte_order = bin_data[0] srid = bin_data[53:57] srid = struct.unpack("<I" if byte_order else ">I", srid)[0] # type: ignore _SpatialElement.__init__(self, data, int(srid), True) @property def desc(self) -> str: """This element's description string.""" return self.data @staticmethod def _data_from_desc(desc): return desc
[docs] class CompositeElement(FunctionElement): """Instances of this class wrap a Postgres composite type.""" inherit_cache: bool = False """The cache is disabled for this class.""" def __init__(self, base, field, type_) -> None: self.name = field self.type = to_instance(type_) super(CompositeElement, self).__init__(base)
@compiles(CompositeElement) def _compile_pgelem(expr, compiler, **kw) -> str: return "(%s).%s" % (compiler.process(expr.clauses, **kw), expr.name) __all__: List[str] = [ "_SpatialElement", "CompositeElement", "RasterElement", "WKBElement", "WKTElement", ] def __dir__() -> List[str]: return __all__