from __future__ import annotations
import binascii
import re
import struct
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Union
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql import functions
from sqlalchemy.sql.functions import FunctionElement
from sqlalchemy.types import to_instance
from geoalchemy2.exc import ArgumentError
# Alias for the exception ``binascii`` raises on malformed hexadecimal input.
# ``RasterElement.__init__`` catches it to tell hex strings from raw binary.
BinasciiError = binascii.Error
# Names that ``_SpatialElement.__getattr__`` accepts as SQL function names (the
# lookup uses the lower-cased attribute name). The set starts out empty here;
# presumably it is filled in by whichever module registers the spatial
# functions -- TODO confirm.
function_registry: Set[str] = set()
class _SpatialElement:
    """The base class for public spatial elements.

    Subclasses are expected to provide a ``desc`` string property (used by
    ``__str__``, ``__eq__``, ``__hash__`` and pickling) and to override
    ``_data_from_desc`` so that ``data`` can be rebuilt from that string.

    Args:
        data: The first argument passed to the constructor is the data wrapped
            by the ``_SpatialElement`` object being constructed.
        srid: An integer representing the spatial reference system. E.g. ``4326``.
            Default value is ``-1``, which means no/unknown reference system.
        extended: A boolean indicating whether the extended format (EWKT or EWKB)
            is used. Default is ``None``.
    """

    # Define __slots__ to restrict attributes in this class.
    # This is done intentionally to improve performance by preventing
    # the creation of a dynamic __dict__ for each instance.
    __slots__ = ("srid", "data", "extended")

    def __init__(self, data, srid: int = -1, extended: Optional[bool] = None) -> None:
        self.srid = srid
        self.data = data
        self.extended = extended

    def __str__(self) -> str:
        """Return the element's description string (``self.desc``)."""
        return self.desc

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} at 0x{id(self):x}; {self}>"  # pragma: no cover

    def __eq__(self, other) -> bool:
        """Compare on ``extended``, ``srid`` and ``desc``.

        Objects lacking one of these attributes compare unequal.
        """
        try:
            return (
                self.extended == other.extended
                and self.srid == other.srid
                and self.desc == other.desc
            )
        except AttributeError:
            return False

    def __ne__(self, other) -> bool:
        return not self.__eq__(other)

    def __hash__(self):
        # Built from the same three values that ``__eq__`` compares.
        return hash((self.desc, self.srid, self.extended))

    def __getattr__(self, name):
        """Resolve registered SQL function names into bound function calls.

        Raises:
            AttributeError: if the lower-cased ``name`` is not in
                ``function_registry``.
        """
        #
        # This is how things like lake.geom.ST_Buffer(2) creates
        # SQL expressions of this form:
        #
        # ST_Buffer(ST_GeomFromWKB(:ST_GeomFromWKB_1), :param_1)
        #

        # Raise an AttributeError when the attribute name is not a registered
        # function name. This is to be nice with other libraries that use
        # some ducktyping (e.g. hasattr(element, "copy")) to determine
        # the type of the element. The message names the class and the
        # attribute so that a mistyped function name is easy to spot.
        if name.lower() not in function_registry:
            raise AttributeError(f"{type(self).__name__!r} object has no attribute {name!r}")

        # We create our own _FunctionGenerator here, and use it in place of
        # SQLAlchemy's "func" object. This is to be able to "bind" the
        # function to the SQL expression.
        func_ = functions._FunctionGenerator(expr=self)
        return getattr(func_, name)

    def __getstate__(self) -> Dict[str, Any]:
        """Return the picklable state; ``data`` is stored as ``str(self)``."""
        state = {
            "srid": self.srid,
            "data": str(self),
            "extended": self.extended,
        }
        return state

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Restore the state, rebuilding ``data`` with ``_data_from_desc``."""
        self.srid = state["srid"]
        self.extended = state["extended"]
        self.data = self._data_from_desc(state["data"])

    @staticmethod
    def _data_from_desc(desc):
        """Convert a description string back into ``data`` (subclass hook)."""
        raise NotImplementedError()  # pragma: no cover
class WKTElement(_SpatialElement):
    """Instances of this class wrap a WKT or EWKT value.

    Usage examples::

        wkt_element_1 = WKTElement('POINT(5 45)')
        wkt_element_2 = WKTElement('POINT(5 45)', srid=4326)
        wkt_element_3 = WKTElement('SRID=4326;POINT(5 45)', extended=True)

    Note::

        This class uses ``__slots__`` to restrict its attributes and improve memory efficiency by
        preventing the creation of a dynamic ``__dict__`` for each instance.
        If you require dynamic attributes or support for weak references, use the
        ``DynamicWKTElement`` subclass, which provides these capabilities.
    """

    __slots__ = ()

    # Group 3 is the WKT body without its optional ``SRID=<n>;`` prefix.
    # DOTALL lets ``.`` match newlines so that a WKT string spanning several
    # lines is captured in full instead of being cut at the first line break.
    _REMOVE_SRID = re.compile(r"(SRID=([0-9]+); ?)?(.*)", re.DOTALL)
    SPLIT_WKT_PATTERN = re.compile(r"((SRID=\d+) *; *)?([\w ]+) *(\([-\d\. ,\(\)]+\))")

    geom_from: str = "ST_GeomFromText"
    geom_from_extended_version: str = "ST_GeomFromEWKT"

    def __init__(self, data: str, srid: int = -1, extended: Optional[bool] = None) -> None:
        """Wrap ``data``, detecting the extended format and SRID when needed.

        Args:
            data: The WKT or EWKT string.
            srid: The SRID; when ``-1`` and the value is extended, it is read
                from the part of ``data`` before the ``;``.
            extended: Whether ``data`` is EWKT. When ``None`` it is deduced
                from ``data`` starting with ``SRID=``.

        Raises:
            ArgumentError: if the SRID has to be read from ``data`` but
                ``data`` is not made of exactly two ``;``-separated parts, or
                the SRID part is not an integer.
        """
        if extended is None:
            extended = data.startswith("SRID=")
        if extended and srid == -1:
            # read srid from EWKT
            data_s = data.split(";")
            if len(data_s) != 2:
                raise ArgumentError(f"invalid EWKT string {data}")
            header = data_s[0]
            try:
                # Skip the five characters of the "SRID=" prefix.
                srid = int(header[5:])
            except ValueError as err:
                raise ArgumentError(f"invalid EWKT string {data}") from err
        super().__init__(data, srid, extended)

    @property
    def desc(self) -> str:
        """This element's description string."""
        return self.data

    @staticmethod
    def _data_from_desc(desc):
        # The description string of a WKT element is its data.
        return desc

    def as_wkt(self) -> WKTElement:
        """Return a new non-extended element.

        For an extended element the ``SRID=<n>;`` prefix is removed from the
        data and the ``srid`` attribute is kept. Otherwise a new element with
        the same data, SRID and ``extended`` value is returned.
        """
        if self.extended:
            srid_match = self._REMOVE_SRID.match(self.data)
            assert srid_match is not None
            return WKTElement(srid_match.group(3), self.srid, extended=False)
        return WKTElement(self.data, self.srid, self.extended)

    def as_ewkt(self) -> WKTElement:
        """Return a new extended element when the SRID is known.

        A non-extended element whose SRID is not ``-1`` gets its data prefixed
        with ``SRID=<srid>;``. In every other case a new element with the same
        data, SRID and ``extended`` value is returned.
        """
        if not self.extended and self.srid != -1:
            data = f"SRID={self.srid};" + self.data
            return WKTElement(data, extended=True)
        return WKTElement(self.data, self.srid, self.extended)
class DynamicWKTElement(WKTElement):
    """A ``WKTElement`` variant whose instances accept extra attributes.

    Declaring ``__dict__`` and ``__weakref__`` as slots gives each instance a
    dynamic attribute dictionary and weak-reference support again, which the
    slotted parent class leaves out.
    """

    __slots__ = ("__dict__", "__weakref__")
class WKBElement(_SpatialElement):
    """Instances of this class wrap a WKB or EWKB value.

    Geometry values read from the database are converted to instances of this
    type. In most cases you won't need to create ``WKBElement`` instances
    yourself.

    If ``extended`` is ``True`` and ``srid`` is ``-1`` at construction time
    then the SRID will be read from the EWKB data.

    Note: you can create ``WKBElement`` objects from Shapely geometries
    using the :func:`geoalchemy2.shape.from_shape` function.

    Note::

        This class uses ``__slots__`` to restrict its attributes and improve memory efficiency by
        preventing the creation of a dynamic ``__dict__`` for each instance.
        If you require dynamic attributes or support for weak references, use the
        ``DynamicWKBElement`` subclass, which provides these capabilities.
    """

    __slots__ = ()

    geom_from: str = "ST_GeomFromWKB"
    geom_from_extended_version: str = "ST_GeomFromEWKB"

    # Bit of the 32-bit geometry type word that flags an embedded SRID.
    _SRID_FLAG = 0x20000000
    # 32-bit mask that sets the SRID bit to 0 and keeps all other bits.
    _SRID_CLEAR_MASK = 0xDFFFFFFF

    @staticmethod
    def _parse_header(data, size):
        """Decode the byte order and geometry type from the start of ``data``.

        Args:
            data: The WKB/EWKB value, either a hexadecimal string or a
                bytes-like object.
            size: Number of raw header bytes to read: ``5`` covers the byte
                order and the geometry type, ``9`` also covers the SRID.

        Returns:
            A ``(header, byte_order, byte_order_marker, wkb_type_int)`` tuple
            where ``header`` holds the raw header bytes, ``byte_order`` is the
            first byte, ``byte_order_marker`` is the matching ``struct``
            format for an unsigned 32-bit integer and ``wkb_type_int`` is the
            geometry type (``0`` when fewer than four type bytes exist).
        """
        if isinstance(data, str):
            # SpatiaLite case
            # assume that the string is an hex value (two characters per byte)
            header = binascii.unhexlify(data[: 2 * size])
        else:
            header = data[:size]
        byte_order, wkb_type = header[0], header[1:5]
        byte_order_marker = "<I" if byte_order else ">I"
        wkb_type_int = (
            int(struct.unpack(byte_order_marker, wkb_type)[0]) if len(wkb_type) == 4 else 0
        )
        return header, byte_order, byte_order_marker, wkb_type_int

    def __init__(
        self, data: str | bytes | memoryview, srid: int = -1, extended: Optional[bool] = None
    ) -> None:
        """Wrap ``data``, detecting the extended format and SRID when needed.

        Args:
            data: The WKB or EWKB value (hexadecimal string or bytes-like).
            srid: The SRID; when ``-1`` and the value is extended, it is read
                from the header of ``data``.
            extended: Whether ``data`` is EWKB. When ``None`` it is deduced
                from the SRID bit of the geometry type.
        """
        if srid == -1 or extended is None or extended:
            # read srid from the EWKB
            #
            # WKB struct {
            #    byte    byteOrder;
            #    uint32  wkbType;
            #    uint32  SRID;
            #    struct  geometry;
            # }
            # byteOrder enum {
            #     WKB_XDR = 0,  // Most Significant Byte First
            #     WKB_NDR = 1,  // Least Significant Byte First
            # }
            # See https://trac.osgeo.org/postgis/browser/branches/3.0/doc/ZMSgeoms.txt
            # for more details about WKB/EWKB specifications.
            header, _, byte_order_marker, wkb_type_int = self._parse_header(data, 9)
            if extended is None:
                # Check SRID bit; a missing type (0) yields False as well.
                extended = bool(wkb_type_int & self._SRID_FLAG)
            if extended and srid == -1:
                srid = int(struct.unpack(byte_order_marker, header[5:])[0])
        super().__init__(data, srid, extended)

    @staticmethod
    def _wkb_to_hex(data: Union[str, bytes, memoryview]) -> str:
        """Convert WKB to hex string."""
        if isinstance(data, str):
            # SpatiaLite case
            return data.lower()
        return str(binascii.hexlify(data), encoding="utf-8").lower()

    @property
    def desc(self) -> str:
        """This element's description string."""
        return self._wkb_to_hex(self.data)

    @staticmethod
    def _data_from_desc(desc) -> bytes:
        """Turn a hexadecimal description string back into binary data."""
        desc = desc.encode(encoding="utf-8")
        return binascii.unhexlify(desc)

    def as_wkb(self) -> WKBElement:
        """Return a new element without the embedded SRID.

        For an extended element the SRID bit of the geometry type is cleared
        and the four SRID bytes are dropped from the data; the ``srid``
        attribute is kept. Otherwise a new element wrapping the same data and
        SRID is returned.
        """
        if not self.extended:
            return WKBElement(self.data, self.srid)

        _, byte_order, byte_order_marker, wkb_type_int = self._parse_header(self.data, 5)
        wkb_type_int &= self._SRID_CLEAR_MASK

        data: str | memoryview
        if isinstance(self.data, str):
            wkb_type_hex = binascii.hexlify(
                wkb_type_int.to_bytes(4, "little" if byte_order else "big")
            )
            # Characters 10-17 hold the SRID and are skipped.
            data = self.data[:2] + wkb_type_hex.decode("ascii") + self.data[18:]
        else:
            buffer = bytearray()
            buffer.extend(self.data[:1])
            buffer.extend(struct.pack(byte_order_marker, wkb_type_int))
            # Bytes 5-8 hold the SRID and are skipped.
            buffer.extend(self.data[9:])
            data = memoryview(buffer)
        return WKBElement(data, self.srid, extended=False)

    def as_ewkb(self) -> WKBElement:
        """Return a new element with the SRID embedded when it is known.

        For a non-extended element whose SRID is not ``-1`` the SRID bit of
        the geometry type is set and the SRID is inserted after the type.
        Otherwise a new element wrapping the same data and SRID is returned.
        """
        if self.extended or self.srid == -1:
            return WKBElement(self.data, self.srid)

        _, byte_order, byte_order_marker, wkb_type_int = self._parse_header(self.data, 5)
        wkb_type_int |= self._SRID_FLAG

        data: str | memoryview
        if isinstance(self.data, str):
            endianness = "little" if byte_order else "big"
            wkb_type_hex = binascii.hexlify(wkb_type_int.to_bytes(4, endianness))
            wkb_srid_hex = binascii.hexlify(self.srid.to_bytes(4, endianness))
            data = (
                self.data[:2]
                + wkb_type_hex.decode("ascii")
                + wkb_srid_hex.decode("ascii")
                + self.data[10:]
            )
        else:
            buffer = bytearray()
            buffer.extend(self.data[:1])
            buffer.extend(struct.pack(byte_order_marker, wkb_type_int))
            buffer.extend(struct.pack(byte_order_marker, self.srid))
            buffer.extend(self.data[5:])
            data = memoryview(buffer)
        return WKBElement(data, self.srid, extended=True)
class DynamicWKBElement(WKBElement):
    """A ``WKBElement`` variant whose instances accept extra attributes.

    Declaring ``__dict__`` and ``__weakref__`` as slots gives each instance a
    dynamic attribute dictionary and weak-reference support again, which the
    slotted parent class leaves out.
    """

    __slots__ = ("__dict__", "__weakref__")
class RasterElement(_SpatialElement):
    """Instances of this class wrap a ``raster`` value.

    Raster values read from the database are converted to instances of this type. In
    most cases you won't need to create ``RasterElement`` instances yourself.

    Note::

        This class uses ``__slots__`` to restrict its attributes and improve memory efficiency by
        preventing the creation of a dynamic ``__dict__`` for each instance.
        If you require dynamic attributes or support for weak references, use the
        ``DynamicRasterElement`` subclass, which provides these capabilities.
    """

    __slots__ = ()

    geom_from_extended_version: str = "raster"

    def __init__(self, data: Union[str, bytes, memoryview]) -> None:
        """Wrap ``data`` and read the SRID from its header.

        The SRID is read from the WKB (binary or hexadecimal format). The WKB
        structure is documented in the file
        raster/doc/RFC2-WellKnownBinaryFormat of the PostGIS sources.

        Args:
            data: The raster value. Input whose first 114 items decode as
                hexadecimal is kept as is; any other input is treated as
                binary and stored as its hexadecimal string.
        """
        header: Union[str, bytes, memoryview]
        try:
            # 114 hexadecimal characters are 57 bytes, which reaches the end
            # of the SRID field read below.
            header = binascii.unhexlify(data[:114])
        except BinasciiError:
            # Not hexadecimal: the input already is the binary form.
            header = data
            data = str(binascii.hexlify(data).decode(encoding="utf-8"))  # type: ignore
        # A non-zero first byte selects little-endian decoding.
        srid_format = "<I" if header[0] else ">I"
        (raw_srid,) = struct.unpack(srid_format, header[53:57])  # type: ignore
        _SpatialElement.__init__(self, data, int(raw_srid), True)

    @property
    def desc(self) -> str:
        """This element's description string."""
        return self.data

    @staticmethod
    def _data_from_desc(desc):
        # The description string of a raster element is its data.
        return desc
class DynamicRasterElement(RasterElement):
    """A ``RasterElement`` variant whose instances accept extra attributes.

    Declaring ``__dict__`` and ``__weakref__`` as slots gives each instance a
    dynamic attribute dictionary and weak-reference support again, which the
    slotted parent class leaves out.
    """

    __slots__ = ("__dict__", "__weakref__")
class CompositeElement(FunctionElement):
    """Instances of this class wrap a Postgres composite type.

    Args:
        base: The expression handed over to ``FunctionElement.__init__``.
        field: The field name, stored as ``name``.
        type_: The field type (class or instance), stored as ``type`` after
            being passed through ``to_instance``.
    """

    __slots__ = ("name", "type")

    inherit_cache: bool = False
    """The cache is disabled for this class."""

    def __init__(self, base, field, type_) -> None:
        self.name = field
        field_type = to_instance(type_)
        self.type = field_type
        super().__init__(base)
@compiles(CompositeElement)
def _compile_pgelem(expr, compiler, **kw) -> str:
    """Render a ``CompositeElement`` as ``(<compiled clauses>).<name>``."""
    base_sql = compiler.process(expr.clauses, **kw)
    return f"({base_sql!s}).{expr.name!s}"
# Names exported by ``from <module> import *``; the module-level ``__dir__``
# defined below returns this same list.
__all__: List[str] = [
"_SpatialElement",
"CompositeElement",
"RasterElement",
"WKBElement",
"WKTElement",
"DynamicRasterElement",
"DynamicWKBElement",
"DynamicWKTElement",
]
def __dir__() -> List[str]:
    """Module-level ``__dir__`` hook returning the names listed in ``__all__``."""
    exported_names = __all__
    return exported_names