Note
Go to the end to download the full example code
Decipher RasterΒΆ
The RasterElement objects store the Raster data in WKB form. When using rasters it is usually better to convert them into TIFF, PNG, JPEG or whatever. Nevertheless, it is possible to decipher the WKB to get a 2D list of values. This example uses SQLAlchemy ORM queries.
10 import binascii
11 import struct
12
13 import pytest
14 from sqlalchemy import Column
15 from sqlalchemy import Integer
16 from sqlalchemy import MetaData
17 from sqlalchemy.orm import declarative_base
18
19 from geoalchemy2 import Raster
20 from geoalchemy2 import WKTElement
21
22 # Tests imports
23 from tests import test_only_with_dialects
24
25 metadata = MetaData()
26 Base = declarative_base(metadata=metadata)
27
28
29 class Ocean(Base): # type: ignore
30 __tablename__ = "ocean"
31 id = Column(Integer, primary_key=True)
32 rast = Column(Raster)
33
34 def __init__(self, rast):
35 self.rast = rast
36
37
38 def _format_e(endianness, struct_format):
39 return _ENDIANNESS[endianness] + struct_format
40
41
42 def wkbHeader(raw):
43 # Function to decipher the WKB header
44 # See http://trac.osgeo.org/postgis/browser/trunk/raster/doc/RFC2-WellKnownBinaryFormat
45
46 header = {}
47
48 header["endianness"] = struct.unpack("b", raw[0:1])[0]
49
50 e = header["endianness"]
51 header["version"] = struct.unpack(_format_e(e, "H"), raw[1:3])[0]
52 header["nbands"] = struct.unpack(_format_e(e, "H"), raw[3:5])[0]
53 header["scaleX"] = struct.unpack(_format_e(e, "d"), raw[5:13])[0]
54 header["scaleY"] = struct.unpack(_format_e(e, "d"), raw[13:21])[0]
55 header["ipX"] = struct.unpack(_format_e(e, "d"), raw[21:29])[0]
56 header["ipY"] = struct.unpack(_format_e(e, "d"), raw[29:37])[0]
57 header["skewX"] = struct.unpack(_format_e(e, "d"), raw[37:45])[0]
58 header["skewY"] = struct.unpack(_format_e(e, "d"), raw[45:53])[0]
59 header["srid"] = struct.unpack(_format_e(e, "i"), raw[53:57])[0]
60 header["width"] = struct.unpack(_format_e(e, "H"), raw[57:59])[0]
61 header["height"] = struct.unpack(_format_e(e, "H"), raw[59:61])[0]
62
63 return header
64
65
66 def read_band(data, offset, pixtype, height, width, endianness=1):
67 ptype, _, psize = _PTYPE[pixtype]
68 pix_data = data[offset + 1 : offset + 1 + width * height * psize]
69 band = [
70 [
71 struct.unpack(
72 _format_e(endianness, ptype),
73 pix_data[(i * width + j) * psize : (i * width + j + 1) * psize],
74 )[0]
75 for j in range(width)
76 ]
77 for i in range(height)
78 ]
79 return band
80
81
82 def read_band_numpy(data, offset, pixtype, height, width, endianness=1):
83 import numpy as np # noqa
84
85 _, dtype, psize = _PTYPE[pixtype]
86 dt = np.dtype(dtype)
87 dt = dt.newbyteorder(_ENDIANNESS[endianness])
88 band = np.frombuffer(data, dtype=dtype, count=height * width, offset=offset + 1)
89 band = np.reshape(band, ((height, width)))
90 return band
91
92
93 _PTYPE = {
94 0: ["?", "?", 1],
95 1: ["B", "B", 1],
96 2: ["B", "B", 1],
97 3: ["b", "b", 1],
98 4: ["B", "B", 1],
99 5: ["h", "i2", 2],
100 6: ["H", "u2", 2],
101 7: ["i", "i4", 4],
102 8: ["I", "u4", 4],
103 10: ["f", "f4", 4],
104 11: ["d", "f8", 8],
105 }
106
107 _ENDIANNESS = {
108 0: ">",
109 1: "<",
110 }
111
112
113 def wkbImage(raster_data, use_numpy=False):
114 """Function to decipher the WKB raster data"""
115
116 # Get binary data
117 raw = binascii.unhexlify(raster_data)
118
119 # Read header
120 h = wkbHeader(bytes(raw))
121 e = h["endianness"]
122
123 img = [] # array to store image bands
124 offset = 61 # header raw length in bytes
125 band_size = h["width"] * h["height"] # number of pixels in each band
126
127 for i in range(h["nbands"]):
128 # Determine pixtype for this band
129 pixtype = struct.unpack(_format_e(e, "b"), raw[offset : offset + 1])[0] - 64
130
131 # Read data with either pure Python or Numpy
132 if use_numpy:
133 band = read_band_numpy(raw, offset, pixtype, h["height"], h["width"])
134 else:
135 band = read_band(raw, offset, pixtype, h["height"], h["width"])
136
137 # Store the result
138 img.append(band)
139 offset = offset + 2 + band_size
140
141 return img
142
143
144 @test_only_with_dialects("postgresql")
145 class TestDecipherRaster:
146 @pytest.mark.parametrize(
147 "pixel_type",
148 [
149 "1BB",
150 "2BUI",
151 "4BUI",
152 "8BSI",
153 "8BUI",
154 "16BSI",
155 "16BUI",
156 "32BSI",
157 "32BUI",
158 "32BF",
159 "64BF",
160 ],
161 )
162 def test_decipher_raster(self, pixel_type, session, conn):
163 """Create a raster and decipher it"""
164 metadata.drop_all(conn, checkfirst=True)
165 metadata.create_all(conn)
166
167 # Create a new raster
168 polygon = WKTElement("POLYGON((0 0,1 1,0 1,0 0))", srid=4326)
169 o = Ocean(polygon.ST_AsRaster(5, 6, pixel_type))
170 session.add(o)
171 session.flush()
172
173 # Decipher data from each raster
174 image = wkbImage(o.rast.data)
175
176 # Define expected result
177 expected = [
178 [0, 1, 1, 1, 1],
179 [1, 1, 1, 1, 1],
180 [0, 1, 1, 1, 0],
181 [0, 1, 1, 0, 0],
182 [0, 1, 0, 0, 0],
183 [0, 0, 0, 0, 0],
184 ]
185
186 # Check results
187 band = image[0]
188 assert band == expected