Decipher RasterΒΆ

The RasterElement objects store the Raster data in WKB form. When using rasters it is usually better to convert them into TIFF, PNG, JPEG or whatever. Nevertheless, it is possible to decipher the WKB to get a 2D list of values. This example uses SQLAlchemy ORM queries.

 10 import binascii
 11 import struct
 12
 13 import pytest
 14 from sqlalchemy import Column
 15 from sqlalchemy import Integer
 16 from sqlalchemy import MetaData
 17 from sqlalchemy.orm import declarative_base
 18
 19 from geoalchemy2 import Raster
 20 from geoalchemy2 import WKTElement
 21
 22 # Tests imports
 23 from tests import test_only_with_dialects
 24
 25 metadata = MetaData()
 26 Base = declarative_base(metadata=metadata)
 27
 28
 29 class Ocean(Base):  # type: ignore
 30     __tablename__ = "ocean"
 31     id = Column(Integer, primary_key=True)
 32     rast = Column(Raster)
 33
 34     def __init__(self, rast):
 35         self.rast = rast
 36
 37
 38 def _format_e(endianness, struct_format):
 39     return _ENDIANNESS[endianness] + struct_format
 40
 41
 42 def wkbHeader(raw):
 43     # Function to decipher the WKB header
 44     # See http://trac.osgeo.org/postgis/browser/trunk/raster/doc/RFC2-WellKnownBinaryFormat
 45
 46     header = {}
 47
 48     header["endianness"] = struct.unpack("b", raw[0:1])[0]
 49
 50     e = header["endianness"]
 51     header["version"] = struct.unpack(_format_e(e, "H"), raw[1:3])[0]
 52     header["nbands"] = struct.unpack(_format_e(e, "H"), raw[3:5])[0]
 53     header["scaleX"] = struct.unpack(_format_e(e, "d"), raw[5:13])[0]
 54     header["scaleY"] = struct.unpack(_format_e(e, "d"), raw[13:21])[0]
 55     header["ipX"] = struct.unpack(_format_e(e, "d"), raw[21:29])[0]
 56     header["ipY"] = struct.unpack(_format_e(e, "d"), raw[29:37])[0]
 57     header["skewX"] = struct.unpack(_format_e(e, "d"), raw[37:45])[0]
 58     header["skewY"] = struct.unpack(_format_e(e, "d"), raw[45:53])[0]
 59     header["srid"] = struct.unpack(_format_e(e, "i"), raw[53:57])[0]
 60     header["width"] = struct.unpack(_format_e(e, "H"), raw[57:59])[0]
 61     header["height"] = struct.unpack(_format_e(e, "H"), raw[59:61])[0]
 62
 63     return header
 64
 65
 66 def read_band(data, offset, pixtype, height, width, endianness=1):
 67     ptype, _, psize = _PTYPE[pixtype]
 68     pix_data = data[offset + 1 : offset + 1 + width * height * psize]
 69     band = [
 70         [
 71             struct.unpack(
 72                 _format_e(endianness, ptype),
 73                 pix_data[(i * width + j) * psize : (i * width + j + 1) * psize],
 74             )[0]
 75             for j in range(width)
 76         ]
 77         for i in range(height)
 78     ]
 79     return band
 80
 81
 82 def read_band_numpy(data, offset, pixtype, height, width, endianness=1):
 83     import numpy as np  # noqa
 84
 85     _, dtype, psize = _PTYPE[pixtype]
 86     dt = np.dtype(dtype)
 87     dt = dt.newbyteorder(_ENDIANNESS[endianness])
 88     band = np.frombuffer(data, dtype=dtype, count=height * width, offset=offset + 1)
 89     band = np.reshape(band, ((height, width)))
 90     return band
 91
 92
 93 _PTYPE = {
 94     0: ["?", "?", 1],
 95     1: ["B", "B", 1],
 96     2: ["B", "B", 1],
 97     3: ["b", "b", 1],
 98     4: ["B", "B", 1],
 99     5: ["h", "i2", 2],
100     6: ["H", "u2", 2],
101     7: ["i", "i4", 4],
102     8: ["I", "u4", 4],
103     10: ["f", "f4", 4],
104     11: ["d", "f8", 8],
105 }
106
107 _ENDIANNESS = {
108     0: ">",
109     1: "<",
110 }
111
112
113 def wkbImage(raster_data, use_numpy=False):
114     """Function to decipher the WKB raster data"""
115
116     # Get binary data
117     raw = binascii.unhexlify(raster_data)
118
119     # Read header
120     h = wkbHeader(bytes(raw))
121     e = h["endianness"]
122
123     img = []  # array to store image bands
124     offset = 61  # header raw length in bytes
125     band_size = h["width"] * h["height"]  # number of pixels in each band
126
127     for i in range(h["nbands"]):
128         # Determine pixtype for this band
129         pixtype = struct.unpack(_format_e(e, "b"), raw[offset : offset + 1])[0] - 64
130
131         # Read data with either pure Python or Numpy
132         if use_numpy:
133             band = read_band_numpy(raw, offset, pixtype, h["height"], h["width"])
134         else:
135             band = read_band(raw, offset, pixtype, h["height"], h["width"])
136
137         # Store the result
138         img.append(band)
139         offset = offset + 2 + band_size
140
141     return img
142
143
144 @test_only_with_dialects("postgresql")
145 class TestDecipherRaster:
146     @pytest.mark.parametrize(
147         "pixel_type",
148         [
149             "1BB",
150             "2BUI",
151             "4BUI",
152             "8BSI",
153             "8BUI",
154             "16BSI",
155             "16BUI",
156             "32BSI",
157             "32BUI",
158             "32BF",
159             "64BF",
160         ],
161     )
162     def test_decipher_raster(self, pixel_type, session, conn):
163         """Create a raster and decipher it"""
164         metadata.drop_all(conn, checkfirst=True)
165         metadata.create_all(conn)
166
167         # Create a new raster
168         polygon = WKTElement("POLYGON((0 0,1 1,0 1,0 0))", srid=4326)
169         o = Ocean(polygon.ST_AsRaster(5, 6, pixel_type))
170         session.add(o)
171         session.flush()
172
173         # Decipher data from each raster
174         image = wkbImage(o.rast.data)
175
176         # Define expected result
177         expected = [
178             [0, 1, 1, 1, 1],
179             [1, 1, 1, 1, 1],
180             [0, 1, 1, 1, 0],
181             [0, 1, 1, 0, 0],
182             [0, 1, 0, 0, 0],
183             [0, 0, 0, 0, 0],
184         ]
185
186         # Check results
187         band = image[0]
188         assert band == expected

Gallery generated by Sphinx-Gallery