Decipher RasterΒΆ

The RasterElement objects store the Raster data in WKB form. When using rasters it is usually better to convert them into TIFF, PNG, JPEG or whatever. Nevertheless, it is possible to decipher the WKB to get a 2D list of values. This example uses SQLAlchemy ORM queries.

 11 import binascii
 12 import struct
 13
 14 import pytest
 15 from sqlalchemy import Column
 16 from sqlalchemy import Integer
 17 from sqlalchemy import MetaData
 18 from sqlalchemy.orm import declarative_base
 19
 20 from geoalchemy2 import Raster
 21 from geoalchemy2 import WKTElement
 22
 23 # Tests imports
 24 from tests import test_only_with_dialects
 25
 26 metadata = MetaData()
 27 Base = declarative_base(metadata=metadata)
 28
 29
 30 class Ocean(Base):  # type: ignore
 31     __tablename__ = "ocean"
 32     id = Column(Integer, primary_key=True)
 33     rast = Column(Raster)
 34
 35     def __init__(self, rast):
 36         self.rast = rast
 37
 38
 39 def _format_e(endianness, struct_format):
 40     return _ENDIANNESS[endianness] + struct_format
 41
 42
 43 def wkbHeader(raw):
 44     # Function to decipher the WKB header
 45     # See http://trac.osgeo.org/postgis/browser/trunk/raster/doc/RFC2-WellKnownBinaryFormat
 46
 47     header = {}
 48
 49     header["endianness"] = struct.unpack("b", raw[0:1])[0]
 50
 51     e = header["endianness"]
 52     header["version"] = struct.unpack(_format_e(e, "H"), raw[1:3])[0]
 53     header["nbands"] = struct.unpack(_format_e(e, "H"), raw[3:5])[0]
 54     header["scaleX"] = struct.unpack(_format_e(e, "d"), raw[5:13])[0]
 55     header["scaleY"] = struct.unpack(_format_e(e, "d"), raw[13:21])[0]
 56     header["ipX"] = struct.unpack(_format_e(e, "d"), raw[21:29])[0]
 57     header["ipY"] = struct.unpack(_format_e(e, "d"), raw[29:37])[0]
 58     header["skewX"] = struct.unpack(_format_e(e, "d"), raw[37:45])[0]
 59     header["skewY"] = struct.unpack(_format_e(e, "d"), raw[45:53])[0]
 60     header["srid"] = struct.unpack(_format_e(e, "i"), raw[53:57])[0]
 61     header["width"] = struct.unpack(_format_e(e, "H"), raw[57:59])[0]
 62     header["height"] = struct.unpack(_format_e(e, "H"), raw[59:61])[0]
 63
 64     return header
 65
 66
 67 def read_band(data, offset, pixtype, height, width, endianness=1):
 68     ptype, _, psize = _PTYPE[pixtype]
 69     pix_data = data[offset + 1 : offset + 1 + width * height * psize]
 70     band = [
 71         [
 72             struct.unpack(
 73                 _format_e(endianness, ptype),
 74                 pix_data[(i * width + j) * psize : (i * width + j + 1) * psize],
 75             )[0]
 76             for j in range(width)
 77         ]
 78         for i in range(height)
 79     ]
 80     return band
 81
 82
 83 def read_band_numpy(data, offset, pixtype, height, width, endianness=1):
 84     import numpy as np  # noqa
 85
 86     _, dtype, psize = _PTYPE[pixtype]
 87     dt = np.dtype(dtype)
 88     dt = dt.newbyteorder(_ENDIANNESS[endianness])
 89     band = np.frombuffer(data, dtype=dtype, count=height * width, offset=offset + 1)
 90     band = np.reshape(band, ((height, width)))
 91     return band
 92
 93
 94 _PTYPE = {
 95     0: ["?", "?", 1],
 96     1: ["B", "B", 1],
 97     2: ["B", "B", 1],
 98     3: ["b", "b", 1],
 99     4: ["B", "B", 1],
100     5: ["h", "i2", 2],
101     6: ["H", "u2", 2],
102     7: ["i", "i4", 4],
103     8: ["I", "u4", 4],
104     10: ["f", "f4", 4],
105     11: ["d", "f8", 8],
106 }
107
108 _ENDIANNESS = {
109     0: ">",
110     1: "<",
111 }
112
113
114 def wkbImage(raster_data, use_numpy=False):
115     """Function to decipher the WKB raster data"""
116
117     # Get binary data
118     raw = binascii.unhexlify(raster_data)
119
120     # Read header
121     h = wkbHeader(bytes(raw))
122     e = h["endianness"]
123
124     img = []  # array to store image bands
125     offset = 61  # header raw length in bytes
126     band_size = h["width"] * h["height"]  # number of pixels in each band
127
128     for i in range(h["nbands"]):
129         # Determine pixtype for this band
130         pixtype = struct.unpack(_format_e(e, "b"), raw[offset : offset + 1])[0] - 64
131
132         # Read data with either pure Python or Numpy
133         if use_numpy:
134             band = read_band_numpy(raw, offset, pixtype, h["height"], h["width"])
135         else:
136             band = read_band(raw, offset, pixtype, h["height"], h["width"])
137
138         # Store the result
139         img.append(band)
140         offset = offset + 2 + band_size
141
142     return img
143
144
145 @test_only_with_dialects("postgresql")
146 class TestDecipherRaster:
147     @pytest.mark.parametrize(
148         "pixel_type",
149         [
150             "1BB",
151             "2BUI",
152             "4BUI",
153             "8BSI",
154             "8BUI",
155             "16BSI",
156             "16BUI",
157             "32BSI",
158             "32BUI",
159             "32BF",
160             "64BF",
161         ],
162     )
163     def test_decipher_raster(self, pixel_type, session, conn):
164         """Create a raster and decipher it"""
165         metadata.drop_all(conn, checkfirst=True)
166         metadata.create_all(conn)
167
168         # Create a new raster
169         polygon = WKTElement("POLYGON((0 0,1 1,0 1,0 0))", srid=4326)
170         o = Ocean(polygon.ST_AsRaster(5, 6, pixel_type))
171         session.add(o)
172         session.flush()
173
174         # Decipher data from each raster
175         image = wkbImage(o.rast.data)
176
177         # Define expected result
178         expected = [
179             [0, 1, 1, 1, 1],
180             [1, 1, 1, 1, 1],
181             [0, 1, 1, 1, 0],
182             [0, 1, 1, 0, 0],
183             [0, 1, 0, 0, 0],
184             [0, 0, 0, 0, 0],
185         ]
186
187         # Check results
188         band = image[0]
189         assert band == expected

Gallery generated by Sphinx-Gallery