# Source code for geoalchemy2.admin.dialects.geopackage

"""This module defines specific functions for GeoPackage dialect.

See GeoPackage specifications here: http://www.geopackage.org/spec/
"""

import re

from sqlalchemy import text
from sqlalchemy.dialects import registry
from sqlalchemy.dialects.sqlite.pysqlite import SQLiteDialect_pysqlite
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql import func
from sqlalchemy.sql import select

from geoalchemy2 import functions
from geoalchemy2.admin.dialects.common import _check_spatial_type
from geoalchemy2.admin.dialects.common import _format_select_args
from geoalchemy2.admin.dialects.common import _spatial_idx_name
from geoalchemy2.admin.dialects.common import setup_create_drop
from geoalchemy2.admin.dialects.sqlite import _SQLITE_FUNCTIONS
from geoalchemy2.admin.dialects.sqlite import get_col_dim
from geoalchemy2.admin.dialects.sqlite import load_spatialite_driver
from geoalchemy2.types import Geography
from geoalchemy2.types import Geometry
from geoalchemy2.types import _DummyGeometry


class GeoPackageDialect(SQLiteDialect_pysqlite):
    """Define a specific dialect for GeoPackage."""

    name = "geopackage"
    driver = "gpkg"

    supports_statement_cache = True
    """Enable caching for GeoPackage dialect."""
# Register the dialect class of this module in SQLAlchemy's dialect registry under the name "gpkg".
registry.register("gpkg", "geoalchemy2.admin.dialects.geopackage", "GeoPackageDialect")
def load_geopackage_driver(dbapi_conn, *args):
    """Load SpatiaLite extension in GeoPackage connection and set VirtualGpkg and Amphibious modes.

    .. Warning::
        The path to the SpatiaLite module should be set in the `SPATIALITE_LIBRARY_PATH`
        environment variable.

    Args:
        dbapi_conn: The DBAPI connection.
        *args: Extra positional arguments, forwarded unchanged to
            :func:`geoalchemy2.admin.dialects.sqlite.load_spatialite_driver`.
    """
    load_spatialite_driver(dbapi_conn, *args)

    # These statements set the VirtualGpkg and Amphibious modes; they are issued only once
    # the driver has been loaded on the connection.
    dbapi_conn.execute("SELECT AutoGpkgStart();")
    dbapi_conn.execute("SELECT EnableGpkgAmphibiousMode();")
def init_geopackage(dbapi_conn, *args):
    """Initialize GeoPackage tables.

    The base tables are only created when ``CheckGeoPackageMetaData()`` reports a falsy
    value for the connection, so calling this function on an already initialized database
    does not create them again.

    Args:
        dbapi_conn: The DBAPI connection.
        *args: Ignored; accepted so the function can be used with callers that pass extra
            positional arguments.

    .. Warning::
        No EPSG SRID is loaded in the `gpkg_spatial_ref_sys` table after initialization but
        it is possible to load other EPSG SRIDs afterwards using the
        `gpkgInsertEpsgSRID(srid)`.
        Nevertheless, SRIDs of newly created tables are automatically added.
    """
    is_initialized = dbapi_conn.execute("SELECT CheckGeoPackageMetaData();").fetchone()[0]
    if not is_initialized:
        # This only works on the main database
        dbapi_conn.execute("SELECT gpkgCreateBaseTables();")
def load_spatialite_gpkg(*args, **kwargs):
    """Load SpatiaLite extension in GeoPackage and initialize internal tables.

    See :func:`geoalchemy2.admin.dialects.geopackage.load_geopackage_driver` and
    :func:`geoalchemy2.admin.dialects.geopackage.init_geopackage` functions for details about
    arguments.

    Args:
        *args: Positional arguments passed to both functions (the DBAPI connection first).
        **kwargs: Keyword arguments, passed only to
            :func:`geoalchemy2.admin.dialects.geopackage.init_geopackage`.
    """
    # The driver must be loaded before the GeoPackage tables can be initialized.
    load_geopackage_driver(*args)
    init_geopackage(*args, **kwargs)
def _get_spatialite_attrs(bind, table_name, col_name):
    """Read the attributes of a spatial column from the GeoPackage metadata tables.

    Args:
        bind: The connection used to execute the query.
        table_name: The name of the table (compared case-insensitively).
        col_name: The name of the column.

    Returns:
        ``None`` if the column has no entry in `gpkg_geometry_columns`, otherwise the tuple
        ``(geometry_type, coord_dimension, srid, has_index)`` where ``coord_dimension`` is
        one of ``"XY"``, ``"XYZ"``, ``"XYM"`` or ``"XYZM"``.
    """
    attrs = bind.execute(
        text(
            """SELECT
                A.geometry_type_name,
                A.srs_id,
                A.z,
                A.m,
                IFNULL(B.has_index, 0) AS has_index
            FROM gpkg_geometry_columns
            AS A
            LEFT JOIN (
                SELECT table_name, column_name, COUNT(*) AS has_index
                FROM gpkg_extensions
                WHERE LOWER(table_name) = LOWER(:table_name)
                    AND column_name = :column_name
                    AND extension_name = 'gpkg_rtree_index'
            ) AS B
            ON LOWER(A.table_name) = LOWER(B.table_name)
                AND A.column_name = B.column_name
            WHERE LOWER(A.table_name) = LOWER(:table_name)
                AND A.column_name = :column_name;
            """
        ).bindparams(table_name=table_name, column_name=col_name)
    ).fetchone()
    if attrs is None:
        # If the column is not registered as a spatial column we ignore it
        return None
    geometry_type, srid, has_z, has_m, has_index = attrs

    # Build the dimension label from the z and m flags of the metadata row.
    coord_dimension = "XY"
    if has_z:
        coord_dimension += "Z"
    if has_m:
        coord_dimension += "M"
    col_attributes = geometry_type, coord_dimension, srid, has_index
    return col_attributes
def _setup_dummy_type(table, gis_cols):
    """Setup dummy type for new Geometry columns so they can be updated later.

    Args:
        table: The table being created; its column collection is reset from
            ``table.info["_saved_columns"]``.
        gis_cols: The geometry columns whose type is temporarily replaced. The real type is
            kept in the ``_actual_type`` attribute of each column.
    """
    for col in gis_cols:
        # Add dummy columns with GEOMETRY type
        # (any trailing Z/M dimension suffix is stripped from the geometry type name)
        type_str = re.fullmatch(r"(.+?)[ZMzm]*", col.type.geometry_type).group(1)
        col._actual_type = col.type
        col.type = _DummyGeometry(geometry_type=type_str)
    table.columns = table.info["_saved_columns"]
def create_spatial_index(bind, table, col):
    """Create spatial index on the given column.

    Args:
        bind: The connection used to execute the statement.
        table: The table owning the column (only its ``name`` is used).
        col: The column to index (only its ``name`` is used).
    """
    # The index is created by selecting the gpkgAddSpatialIndex() SQL function.
    stmt = select(*_format_select_args(func.gpkgAddSpatialIndex(table.name, col.name)))
    stmt = stmt.execution_options(autocommit=True)
    bind.execute(stmt)
def disable_spatial_index(bind, table, col):
    """Disable spatial indexes if present.

    Drops the ``rtree_<table>_<column>`` table and its ``_node``, ``_parent`` and ``_rowid``
    companions when they exist, then removes the matching `gpkg_rtree_index` row from
    `gpkg_extensions`.

    Args:
        bind: The connection used to execute the statements.
        table: The table owning the column (only its ``name`` is used).
        col: The indexed column (only its ``name`` is used).
    """
    for i in ["", "_node", "_parent", "_rowid"]:
        # The table name is an identifier, so it is formatted into the statement
        # instead of being passed as a bound parameter.
        bind.execute(
            text(
                "DROP TABLE IF EXISTS rtree_{}_{}{};".format(
                    table.name,
                    col.name,
                    i,
                )
            )
        )
    bind.execute(
        text(
            """DELETE FROM gpkg_extensions
            WHERE LOWER(table_name) = LOWER(:table_name)
                AND column_name = :column_name
                AND extension_name = 'gpkg_rtree_index';"""
        ).bindparams(
            table_name=table.name,
            column_name=col.name,
        )
    )
def reflect_geometry_column(inspector, table, column_info):
    """Reflect a column of type Geometry with GeoPackage dialect.

    The type object stored in ``column_info["type"]`` is updated in place; nothing is
    returned. Columns whose type is not a ``Geometry``, or that have no entry in the
    GeoPackage metadata, are left untouched.

    Args:
        inspector: The inspector; its ``bind`` is used to query the metadata tables.
        table: The reflected table (only its ``name`` is used).
        column_info: The reflection dictionary of the column (``"type"`` and ``"name"``
            entries are read).
    """
    # Get geometry type, SRID and spatial index from the SpatiaLite metadata
    if not isinstance(column_info.get("type"), Geometry):
        return
    col_attributes = _get_spatialite_attrs(inspector.bind, table.name, column_info["name"])
    if col_attributes is not None:
        geometry_type, coord_dimension, srid, spatial_index = col_attributes

        # Convert the dimension label into a number of dimensions; an unknown label is kept
        # unchanged.
        coord_dimension = {
            "XY": 2,
            "XYZ": 3,
            "XYM": 3,
            "XYZM": 4,
        }.get(coord_dimension, coord_dimension)

        # Set attributes
        column_info["type"].geometry_type = geometry_type
        column_info["type"].dimension = coord_dimension
        column_info["type"].srid = srid
        column_info["type"].spatial_index = bool(spatial_index)

        # Spatial indexes are not automatically reflected with GeoPackage dialect
        column_info["type"]._spatial_index_reflected = False
def before_create(table, bind, **kw):
    """Handle spatial indexes during the before_create event.

    Args:
        table: The table that is about to be created.
        bind: The connection used to execute the statements.
        **kw: Extra keyword arguments; unused.

    Raises:
        ValueError: If the table has more than one geometry column.
    """
    dialect, gis_cols, regular_cols = setup_create_drop(table, bind)

    # Remove the spatial indexes from the table metadata because they should not be
    # created during the table.create() step since the associated columns do not exist
    # at this time.
    table.info["_after_create_indexes"] = []
    current_indexes = set(table.indexes)
    for idx in current_indexes:
        for col in table.info["_saved_columns"]:
            if _check_spatial_type(col.type, Geometry, dialect) and col in idx.columns.values():
                table.indexes.remove(idx)
                # Indexes that are not the automatic spatial index of the column are kept
                # aside so they can be created after the table.
                if idx.name != _spatial_idx_name(table.name, col.name) or not getattr(
                    col.type, "spatial_index", False
                ):
                    table.info["_after_create_indexes"].append(idx)
                # The index is handled: stop here so an index covering several geometry
                # columns is not removed (and deferred) a second time.
                break

    if len(gis_cols) > 1:
        raise ValueError("Only one geometry column is allowed for a table stored in a GeoPackage.")
    elif len(gis_cols) == 1:
        col = gis_cols[0]
        srid = col.type.srid

        if col.type.geometry_type is None:
            col.type.geometry_type = "GEOMETRY"

        # Add the SRID of the table in 'gpkg_spatial_ref_sys' if this table exists
        if not bind.execute(
            text("SELECT COUNT(*) FROM gpkg_spatial_ref_sys WHERE srs_id = :srid;").bindparams(
                srid=srid
            )
        ).scalar():
            bind.execute(text("SELECT gpkgInsertEpsgSRID(:srid)").bindparams(srid=srid))
    _setup_dummy_type(table, gis_cols)
def after_create(table, bind, **kw):
    """Handle spatial indexes during the after_create event.

    Registers the geometry columns of the table in the GeoPackage metadata tables, creates
    the requested spatial indexes, then creates the indexes stored in
    ``table.info["_after_create_indexes"]``.

    Args:
        table: The table that has just been created.
        bind: The connection used to execute the statements.
        **kw: Extra keyword arguments; unused.
    """
    dialect = bind.dialect

    for col in table.columns:
        # Add the managed Geometry columns with gpkgAddGeometryColumn()
        if _check_spatial_type(col.type, Geometry, dialect):
            # Restore the real type that was saved in the `_actual_type` attribute
            col.type = col._actual_type
            del col._actual_type
            dimension = get_col_dim(col)
            has_z = "Z" in dimension
            has_m = "M" in dimension

            # The empty value is written as '' (standard SQL string literal): a
            # double-quoted "" is only accepted as a string by SQLite as a legacy quirk.
            bind.execute(
                text(
                    """INSERT INTO gpkg_contents
                    VALUES (
                        :table_name,
                        'features',
                        :table_name,
                        '',
                        strftime('%Y-%m-%dT%H:%M:%fZ', CURRENT_TIMESTAMP),
                        NULL,
                        NULL,
                        NULL,
                        NULL,
                        :srid
                    );"""
                ).bindparams(
                    table_name=table.name,
                    srid=col.type.srid,
                )
            )
            bind.execute(
                text(
                    """INSERT INTO gpkg_geometry_columns
                    VALUES (:table_name, :column_name, :geometry_type, :srid, :has_z, :has_m);"""
                ).bindparams(
                    table_name=table.name,
                    column_name=col.name,
                    geometry_type=col.type.geometry_type,
                    srid=col.type.srid,
                    has_z=has_z,
                    has_m=has_m,
                )
            )
            stmt = select(*_format_select_args(func.gpkgAddGeometryTriggers(table.name, col.name)))
            stmt = stmt.execution_options(autocommit=True)
            bind.execute(stmt)

    for col in table.columns:
        # Add spatial indexes for the Geometry and Geography columns
        # TODO: Check that the Geography type makes sense here
        if (
            _check_spatial_type(col.type, (Geometry, Geography), dialect)
            and col.type.spatial_index is True
        ):
            create_spatial_index(bind, table, col)

    # Put back and create the indexes that were set aside for after the table creation
    for idx in table.info.pop("_after_create_indexes"):
        table.indexes.add(idx)
        idx.create(bind=bind)
def before_drop(table, bind, **kw):
    """Handle spatial indexes during the before_drop event.

    Args:
        table: The table that is about to be dropped.
        bind: The connection used to execute the statements.
        **kw: Extra keyword arguments; unused.
    """
    dialect, gis_cols, regular_cols = setup_create_drop(table, bind)

    for col in gis_cols:
        # Disable spatial indexes if present
        # TODO: This is useless but if we remove it then the disable_spatial_index should be
        # tested separately
        disable_spatial_index(bind, table, col)

        # Remove metadata from internal tables
        # (this is equivalent to DiscardGeometryColumn but for GeoPackage)
        bind.execute(
            text(
                """DELETE FROM gpkg_extensions
                WHERE LOWER(table_name) = LOWER(:table_name)
                    AND column_name = :column_name;"""
            ).bindparams(
                table_name=table.name,
                column_name=col.name,
            )
        )
        bind.execute(
            text(
                """DELETE FROM gpkg_geometry_columns
                WHERE LOWER(table_name) = LOWER(:table_name)
                    AND column_name = :column_name;"""
            ).bindparams(
                table_name=table.name,
                column_name=col.name,
            )
        )
        # NOTE(review): the nesting of this statement could not be recovered from the
        # flattened source; it is assumed to run once per geometry column - confirm.
        bind.execute(
            text(
                """DELETE FROM gpkg_contents
                WHERE LOWER(table_name) = LOWER(:table_name);"""
            ).bindparams(table_name=table.name)
        )
def after_drop(table, bind, **kw):
    """Handle spatial indexes during the after_drop event.

    Restores the column collection of the table from the ``"_saved_columns"`` entry of
    ``table.info`` and removes that entry.

    Args:
        table: The table that has just been dropped.
        bind: The connection; unused.
        **kw: Extra keyword arguments; unused.
    """
    table.columns = table.info.pop("_saved_columns")
def _compiles_gpkg(cls, fn):
    """Register the GeoPackage compilation of one function.

    Args:
        cls: The name of the attribute of :mod:`geoalchemy2.functions` to compile.
        fn: The function name emitted in the SQL for the "geopackage" dialect.
    """

    def _compile_gpkg(element, compiler, **kw):
        # Emit `fn(<compiled clauses>)` in place of the original function name
        return f"{fn}({compiler.process(element.clauses, **kw)})"

    compiles(getattr(functions, cls), "geopackage")(_compile_gpkg)
def register_gpkg_mapping(mapping):
    """Register compilation mappings for the given functions.

    Args:
        mapping: Should have the following form::

                {
                    "function_name_1": "gpkg_function_name_1",
                    "function_name_2": "gpkg_function_name_2",
                    ...
                }
    """
    for cls, fn in mapping.items():
        _compiles_gpkg(cls, fn)
# Register the GeoPackage compilation for every entry of the SQLite function mapping.
register_gpkg_mapping(_SQLITE_FUNCTIONS)
def create_spatial_ref_sys_view(bind):
    """Create the `spatial_ref_sys` view from the `gpkg_spatial_ref_sys` table.

    .. Note::

        This is usually only needed to use the `ST_Transform` function on GeoPackage data
        because this function, when used with SpatiaLite, requires the `spatial_ref_sys` table.

    Args:
        bind: The connection used to execute the statement.
    """
    bind.execute(
        text(
            """CREATE VIEW spatial_ref_sys AS
            SELECT
                srs_id AS srid,
                organization AS auth_name,
                organization_coordsys_id AS auth_srid,
                definition AS srtext
            FROM gpkg_spatial_ref_sys;"""
        )
    )