Source code for geoalchemy2.admin.dialects.mariadb

"""This module defines specific functions for MariaDB dialect."""

from sqlalchemy.ext.compiler import compiles

from geoalchemy2 import functions
from geoalchemy2.admin.dialects.common import compile_bin_literal
from geoalchemy2.admin.dialects.common import unwrap_wkb_constructor_clauses
from geoalchemy2.admin.dialects.mysql import _coerce_ewkb_bind_clause_to_wkb
from geoalchemy2.admin.dialects.mysql import _coerce_ewkb_clause_to_wkb
from geoalchemy2.admin.dialects.mysql import _coerce_known_ewkb_clause_to_wkb
from geoalchemy2.admin.dialects.mysql import _compile_srid_arg
from geoalchemy2.admin.dialects.mysql import _dynamic_ewkb_default_srid
from geoalchemy2.admin.dialects.mysql import _dynamic_ewkb_split_disabled
from geoalchemy2.admin.dialects.mysql import _dynamic_inferred_srid_bind_clauses
from geoalchemy2.admin.dialects.mysql import _ewkb_processor_fixed_srid
from geoalchemy2.admin.dialects.mysql import _get_mysql_dynamic_ewkb_bind_clauses
from geoalchemy2.admin.dialects.mysql import _has_effective_srid_argument
from geoalchemy2.admin.dialects.mysql import _is_bindparam_clause
from geoalchemy2.admin.dialects.mysql import _runtime_ewkb_processor_context
from geoalchemy2.admin.dialects.mysql import after_create  # noqa
from geoalchemy2.admin.dialects.mysql import after_drop  # noqa
from geoalchemy2.admin.dialects.mysql import before_create  # noqa
from geoalchemy2.admin.dialects.mysql import before_drop  # noqa
from geoalchemy2.admin.dialects.mysql import before_execute  # noqa
from geoalchemy2.admin.dialects.mysql import reflect_geometry_column  # noqa
from geoalchemy2.elements import WKBElement
from geoalchemy2.elements import WKTElement


def _cast(param):
    if isinstance(param, memoryview):
        param = param.tobytes()
    if isinstance(param, bytearray):
        param = bytes(param)
    if isinstance(param, bytes):
        param = WKBElement(param)
    if isinstance(param, WKBElement):
        param = param.as_wkb().desc
    return param


[docs] def before_cursor_execute(conn, cursor, statement, parameters, context, executemany, convert=True): # noqa: D417 """Event handler to cast the parameters properly. Args: convert (bool): Trigger the conversion. """ if convert: if isinstance(parameters, (tuple, list)): parameters = tuple(_cast(x) for x in parameters) elif isinstance(parameters, dict): for k in parameters: parameters[k] = _cast(parameters[k]) return statement, parameters
_MARIADB_FUNCTIONS = { "ST_AsEWKB": "ST_AsBinary", } def _compiles_mariadb(cls, fn): def _compile_mariadb(element, compiler, **kw): return f"{fn}({compiler.process(element.clauses, **kw)})" compiles(getattr(functions, cls), "mariadb")(_compile_mariadb)
[docs] def register_mariadb_mapping(mapping): """Register compilation mappings for the given functions. Args: mapping: Should have the following form:: { "function_name_1": "mariadb_function_name_1", "function_name_2": "mariadb_function_name_2", ... } """ for cls, fn in mapping.items(): _compiles_mariadb(cls, fn)
register_mariadb_mapping(_MARIADB_FUNCTIONS) def _compile_GeomFromText_MariaDB(element, compiler, **kw): identifier = "ST_GeomFromText" compiled = compiler.process(element.clauses, **kw) try: clauses = list(element.clauses) data_element = WKTElement(clauses[0].value) srid = data_element.srid if srid <= 0: srid = element.type.srid except Exception: srid = element.type.srid res = f"{identifier}({compiled}, {srid})" if srid > 0 else f"{identifier}({compiled})" return res def _compile_GeomFromWKB_MariaDB(element, compiler, *, coerce_ewkb=False, **kw): identifier = "ST_GeomFromWKB" # Store the SRID clauses = list(element.clauses) if kw.get("literal_binds", False): clauses, _ = unwrap_wkb_constructor_clauses(clauses) inferred_srid = None dynamic_srid_clause = None if coerce_ewkb: original_wkb_clause = clauses[0] split_disabled = _dynamic_ewkb_split_disabled(compiler) or kw.get("literal_binds", False) should_process_at_runtime = ( _is_bindparam_clause(original_wkb_clause) and not kw.get("literal_binds", False) and not split_disabled and ( getattr(original_wkb_clause, "value", None) is None or getattr(original_wkb_clause, "callable", None) is not None ) ) if should_process_at_runtime: has_srid_argument = _has_effective_srid_argument(clauses) dynamic_default_srid = _dynamic_ewkb_default_srid(element, clauses) if dynamic_default_srid is not None: wkb_value_clause, dynamic_srid_clause = _get_mysql_dynamic_ewkb_bind_clauses( original_wkb_clause, compiler, default_srid=dynamic_default_srid, as_hex=True, ) else: fixed_srid = None if not has_srid_argument: fixed_srid = _ewkb_processor_fixed_srid(element, clauses) processor_context = _runtime_ewkb_processor_context( compiler, original_wkb_clause, fixed_srid=fixed_srid, has_srid_argument=has_srid_argument, ) wkb_value_clause = _coerce_ewkb_bind_clause_to_wkb( original_wkb_clause, as_hex=True, fixed_srid=fixed_srid, has_srid_argument=has_srid_argument, processor_context=processor_context, ) else: _, inferred_srid = _coerce_ewkb_clause_to_wkb(original_wkb_clause, as_hex=True) dynamic_wkb_clause, dynamic_srid_clause = _dynamic_inferred_srid_bind_clauses( element, compiler, original_wkb_clause, clauses, inferred_srid=inferred_srid, as_hex=True, split_disabled=split_disabled, ) if dynamic_wkb_clause is not None: wkb_value_clause = dynamic_wkb_clause else: wkb_value_clause, inferred_srid = _coerce_known_ewkb_clause_to_wkb( element, compiler, original_wkb_clause, clauses, as_hex=True, preserve_user_bind=not kw.get("literal_binds", False), ) else: wkb_value_clause = clauses[0] wkb_clause = wkb_value_clause if kw.get("literal_binds", False): wkb_clause = compile_bin_literal(wkb_value_clause) prefix = "unhex(" suffix = ")" compiled = compiler.process(wkb_clause, **kw) compiled_srid = _compile_srid_arg( element, clauses, inferred_srid, compiler, srid_clause=dynamic_srid_clause, **kw, ) if compiled_srid is not None: return f"{identifier}({prefix}{compiled}{suffix}, {compiled_srid})" else: return f"{identifier}({prefix}{compiled}{suffix})" @compiles(functions.ST_GeomFromText, "mariadb") # type: ignore def _MariaDB_ST_GeomFromText(element, compiler, **kw): return _compile_GeomFromText_MariaDB(element, compiler, **kw) @compiles(functions.ST_GeomFromEWKT, "mariadb") # type: ignore def _MariaDB_ST_GeomFromEWKT(element, compiler, **kw): return _compile_GeomFromText_MariaDB(element, compiler, **kw) @compiles(functions.ST_GeomFromWKB, "mariadb") # type: ignore def _MariaDB_ST_GeomFromWKB(element, compiler, **kw): return _compile_GeomFromWKB_MariaDB(element, compiler, **kw) @compiles(functions.ST_GeomFromEWKB, "mariadb") # type: ignore def _MariaDB_ST_GeomFromEWKB(element, compiler, **kw): return _compile_GeomFromWKB_MariaDB(element, compiler, coerce_ewkb=True, **kw)