Note
Click here to download the full example code
Decipher RasterΒΆ
The RasterElement objects store the Raster data in WKB form. When using rasters it is usually better to convert them into TIFF, PNG, JPEG or whatever. Nevertheless, it is possible to decipher the WKB to get a 2D list of values. This example uses SQLAlchemy ORM queries.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 | import binascii import struct import pytest from sqlalchemy import Column from sqlalchemy import Integer from sqlalchemy import MetaData from sqlalchemy.ext.declarative import declarative_base from geoalchemy2 import Raster from geoalchemy2 import WKTElement # Tests imports from tests import test_only_with_dialects metadata = MetaData() Base = declarative_base(metadata=metadata) class Ocean(Base): __tablename__ = 'ocean' id = Column(Integer, primary_key=True) rast = Column(Raster) def __init__(self, rast): self.rast = rast def _format_e(endianness, struct_format): return _ENDIANNESS[endianness] + struct_format def wkbHeader(raw): # Function to decipher the WKB header # See http://trac.osgeo.org/postgis/browser/trunk/raster/doc/RFC2-WellKnownBinaryFormat header = {} header['endianness'] = struct.unpack('b', raw[0:1])[0] e = header['endianness'] header['version'] = struct.unpack(_format_e(e, 'H'), raw[1:3])[0] header['nbands'] = struct.unpack(_format_e(e, 'H'), raw[3:5])[0] header['scaleX'] = struct.unpack(_format_e(e, 'd'), raw[5:13])[0] header['scaleY'] = struct.unpack(_format_e(e, 'd'), raw[13:21])[0] header['ipX'] = struct.unpack(_format_e(e, 'd'), raw[21:29])[0] header['ipY'] = struct.unpack(_format_e(e, 'd'), raw[29:37])[0] header['skewX'] = struct.unpack(_format_e(e, 'd'), raw[37:45])[0] header['skewY'] = struct.unpack(_format_e(e, 'd'), raw[45:53])[0] header['srid'] = struct.unpack(_format_e(e, 'i'), raw[53:57])[0] header['width'] = struct.unpack(_format_e(e, 'H'), raw[57:59])[0] header['height'] = struct.unpack(_format_e(e, 'H'), raw[59:61])[0] return header def read_band(data, offset, pixtype, height, width, endianness=1): ptype, _, psize = _PTYPE[pixtype] pix_data = data[offset + 1: offset + 1 + width * height * psize] band = [ [ struct.unpack(_format_e(endianness, ptype), pix_data[ (i * width + j) * psize: (i * width + j + 1) * psize ])[0] for j in range(width) ] for i in range(height) ] return band def read_band_numpy(data, offset, pixtype, height, width, endianness=1): import numpy as np # noqa _, dtype, psize = _PTYPE[pixtype] dt = np.dtype(dtype) dt = dt.newbyteorder(_ENDIANNESS[endianness]) band = np.frombuffer(data, dtype=dtype, count=height * width, offset=offset + 1) band = (np.reshape(band, ((height, width)))) return band _PTYPE = { 0: ['?', '?', 1], 1: ['B', 'B', 1], 2: ['B', 'B', 1], 3: ['b', 'b', 1], 4: ['B', 'B', 1], 5: ['h', 'i2', 2], 6: ['H', 'u2', 2], 7: ['i', 'i4', 4], 8: ['I', 'u4', 4], 10: ['f', 'f4', 4], 11: ['d', 'f8', 8], } _ENDIANNESS = { 0: '>', 1: '<', } def wkbImage(raster_data, use_numpy=False): """Function to decipher the WKB raster data""" # Get binary data raw = binascii.unhexlify(raster_data) # Read header h = wkbHeader(bytes(raw)) e = h["endianness"] img = [] # array to store image bands offset = 61 # header raw length in bytes band_size = h['width'] * h['height'] # number of pixels in each band for i in range(h['nbands']): # Determine pixtype for this band pixtype = struct.unpack(_format_e(e, 'b'), raw[offset: offset + 1])[0] - 64 # Read data with either pure Python or Numpy if use_numpy: band = read_band_numpy( raw, offset, pixtype, h['height'], h['width']) else: band = read_band( raw, offset, pixtype, h['height'], h['width']) # Store the result img.append(band) offset = offset + 2 + band_size return img @test_only_with_dialects("postgresql") class TestDecipherRaster(): @pytest.mark.parametrize("pixel_type", [ '1BB', '2BUI', '4BUI', '8BSI', '8BUI', '16BSI', '16BUI', '32BSI', '32BUI', '32BF', '64BF' ]) def test_decipher_raster(self, pixel_type, session, conn): """Create a raster and decipher it""" metadata.drop_all(conn, checkfirst=True) metadata.create_all(conn) # Create a new raster polygon = WKTElement('POLYGON((0 0,1 1,0 1,0 0))', srid=4326) o = Ocean(polygon.ST_AsRaster(5, 6, pixel_type)) session.add(o) session.flush() # Decipher data from each raster image = wkbImage(o.rast.data) # Define expected result expected = [ [0, 1, 1, 1, 1], [1, 1, 1, 1, 1], [0, 1, 1, 1, 0], [0, 1, 1, 0, 0], [0, 1, 0, 0, 0], [0, 0, 0, 0, 0] ] # Check results band = image[0] assert band == expected |