Note
Go to the end to download the full example code.
Decipher RasterΒΆ
The RasterElement objects store the Raster data in WKB form. When using rasters it is usually better to convert them into TIFF, PNG, JPEG or whatever. Nevertheless, it is possible to decipher the WKB to get a 2D list of values. This example uses SQLAlchemy ORM queries.
11 import binascii
12 import struct
13
14 import pytest
15 from sqlalchemy import Column
16 from sqlalchemy import Integer
17 from sqlalchemy import MetaData
18 from sqlalchemy.orm import declarative_base
19
20 from geoalchemy2 import Raster
21 from geoalchemy2 import WKTElement
22
23 # Tests imports
24 from tests import test_only_with_dialects
25
26 metadata = MetaData()
27 Base = declarative_base(metadata=metadata)
28
29
30 class Ocean(Base): # type: ignore
31 __tablename__ = "ocean"
32 id = Column(Integer, primary_key=True)
33 rast = Column(Raster)
34
35 def __init__(self, rast):
36 self.rast = rast
37
38
39 def _format_e(endianness, struct_format):
40 return _ENDIANNESS[endianness] + struct_format
41
42
43 def wkbHeader(raw):
44 # Function to decipher the WKB header
45 # See http://trac.osgeo.org/postgis/browser/trunk/raster/doc/RFC2-WellKnownBinaryFormat
46
47 header = {}
48
49 header["endianness"] = struct.unpack("b", raw[0:1])[0]
50
51 e = header["endianness"]
52 header["version"] = struct.unpack(_format_e(e, "H"), raw[1:3])[0]
53 header["nbands"] = struct.unpack(_format_e(e, "H"), raw[3:5])[0]
54 header["scaleX"] = struct.unpack(_format_e(e, "d"), raw[5:13])[0]
55 header["scaleY"] = struct.unpack(_format_e(e, "d"), raw[13:21])[0]
56 header["ipX"] = struct.unpack(_format_e(e, "d"), raw[21:29])[0]
57 header["ipY"] = struct.unpack(_format_e(e, "d"), raw[29:37])[0]
58 header["skewX"] = struct.unpack(_format_e(e, "d"), raw[37:45])[0]
59 header["skewY"] = struct.unpack(_format_e(e, "d"), raw[45:53])[0]
60 header["srid"] = struct.unpack(_format_e(e, "i"), raw[53:57])[0]
61 header["width"] = struct.unpack(_format_e(e, "H"), raw[57:59])[0]
62 header["height"] = struct.unpack(_format_e(e, "H"), raw[59:61])[0]
63
64 return header
65
66
67 def read_band(data, offset, pixtype, height, width, endianness=1):
68 ptype, _, psize = _PTYPE[pixtype]
69 pix_data = data[offset + 1 : offset + 1 + width * height * psize]
70 band = [
71 [
72 struct.unpack(
73 _format_e(endianness, ptype),
74 pix_data[(i * width + j) * psize : (i * width + j + 1) * psize],
75 )[0]
76 for j in range(width)
77 ]
78 for i in range(height)
79 ]
80 return band
81
82
83 def read_band_numpy(data, offset, pixtype, height, width, endianness=1):
84 import numpy as np # noqa
85
86 _, dtype, psize = _PTYPE[pixtype]
87 dt = np.dtype(dtype)
88 dt = dt.newbyteorder(_ENDIANNESS[endianness])
89 band = np.frombuffer(data, dtype=dtype, count=height * width, offset=offset + 1)
90 band = np.reshape(band, ((height, width)))
91 return band
92
93
94 _PTYPE = {
95 0: ["?", "?", 1],
96 1: ["B", "B", 1],
97 2: ["B", "B", 1],
98 3: ["b", "b", 1],
99 4: ["B", "B", 1],
100 5: ["h", "i2", 2],
101 6: ["H", "u2", 2],
102 7: ["i", "i4", 4],
103 8: ["I", "u4", 4],
104 10: ["f", "f4", 4],
105 11: ["d", "f8", 8],
106 }
107
108 _ENDIANNESS = {
109 0: ">",
110 1: "<",
111 }
112
113
114 def wkbImage(raster_data, use_numpy=False):
115 """Function to decipher the WKB raster data"""
116
117 # Get binary data
118 raw = binascii.unhexlify(raster_data)
119
120 # Read header
121 h = wkbHeader(bytes(raw))
122 e = h["endianness"]
123
124 img = [] # array to store image bands
125 offset = 61 # header raw length in bytes
126 band_size = h["width"] * h["height"] # number of pixels in each band
127
128 for i in range(h["nbands"]):
129 # Determine pixtype for this band
130 pixtype = struct.unpack(_format_e(e, "b"), raw[offset : offset + 1])[0] - 64
131
132 # Read data with either pure Python or Numpy
133 if use_numpy:
134 band = read_band_numpy(raw, offset, pixtype, h["height"], h["width"])
135 else:
136 band = read_band(raw, offset, pixtype, h["height"], h["width"])
137
138 # Store the result
139 img.append(band)
140 offset = offset + 2 + band_size
141
142 return img
143
144
145 @test_only_with_dialects("postgresql")
146 class TestDecipherRaster:
147 @pytest.mark.parametrize(
148 "pixel_type",
149 [
150 "1BB",
151 "2BUI",
152 "4BUI",
153 "8BSI",
154 "8BUI",
155 "16BSI",
156 "16BUI",
157 "32BSI",
158 "32BUI",
159 "32BF",
160 "64BF",
161 ],
162 )
163 def test_decipher_raster(self, pixel_type, session, conn):
164 """Create a raster and decipher it"""
165 metadata.drop_all(conn, checkfirst=True)
166 metadata.create_all(conn)
167
168 # Create a new raster
169 polygon = WKTElement("POLYGON((0 0,1 1,0 1,0 0))", srid=4326)
170 o = Ocean(polygon.ST_AsRaster(5, 6, pixel_type))
171 session.add(o)
172 session.flush()
173
174 # Decipher data from each raster
175 image = wkbImage(o.rast.data)
176
177 # Define expected result
178 expected = [
179 [0, 1, 1, 1, 1],
180 [1, 1, 1, 1, 1],
181 [0, 1, 1, 1, 0],
182 [0, 1, 1, 0, 0],
183 [0, 1, 0, 0, 0],
184 [0, 0, 0, 0, 0],
185 ]
186
187 # Check results
188 band = image[0]
189 assert band == expected