Skip to content
Snippets Groups Projects
Commit 7ad9279b authored by Roelofsen, Hans's avatar Roelofsen, Hans
Browse files

Opgaande Elementen DSM DTM tool

parent 5a68c310
No related branches found
No related tags found
No related merge requests found
import os
import numpy as np
import rasterio as rio
try:
from filter_powerlines import filter_opgaande_elementen_under_powerlines
from read_write_windows import get_rw_windows
from ahn_read import get_opgaande_elementen, verify_matching_rasters
from hoogtes_dbf import build_heights_table, heights_table_to_file
from flt_header import replace_hdr
except ModuleNotFoundError:
from sample.geslotenheid.ahn_opgaande_elemenden.ahn_read import get_opgaande_elementen, verify_matching_rasters
from sample.geslotenheid.ahn_opgaande_elemenden.read_write_windows import get_rw_windows
from sample.geslotenheid.ahn_opgaande_elemenden.hoogtes_dbf import build_heights_table, heights_table_to_file
def build_opgaande_elementenkaart(**kwargs):
"""
Wrapper function to build opgaande elementenkaart from AHN DSM-DTM
Parameters
----------
kwargs
Returns
-------
"""
dsm_source = kwargs['dsm_src']
dtm_source = kwargs['dtm_src']
dsm = rio.open(dsm_source)
dtm = rio.open(dtm_source)
verify_matching_rasters(dsm, dtm)
windows = get_rw_windows(target_profile=dsm.profile,
sample=kwargs['sample'],
window_width=kwargs['w_width'],
window_height=kwargs['w_height'])
output_profile = dsm.profile
output_profile.update(dtype=np.uint16, nodata=0)
output_profile.update(driver={'tif': "GTiff", 'flt': 'EHdr'}[kwargs['of']])
extension = kwargs['of']
u_values = set()
with rio.open(os.path.join(kwargs['out_dir'], f'{kwargs["out_name"]}.{extension}'), 'w', **output_profile) as dest:
dest.update_tags(**kwargs)
for i, w in enumerate(windows, start=1):
print(f'processing window {i}/{len(windows)}', end="\r")
opg = get_opgaande_elementen(
dsm=dsm,
dtm=dtm,
read_window=w.rio_window,
minimum_difference=kwargs['min_diff'],
uniform_height=kwargs['u_height'],
maaiveld_value=kwargs['mv_value'],
)
dest.write(opg, 1, window=w.rio_window)
u_values.update(np.unique(opg[opg != kwargs['mv_value']]))
heights_table_to_file(os.path.join(kwargs['out_dir'],
f'{kwargs["out_name"]}_heights.dbf'),
build_heights_table(list(u_values)))
if kwargs['of'] == 'flt':
replace_hdr(dir=kwargs['out_dir'],
hdrname=kwargs['out_name'],
output_profile=output_profile)
if __name__ == '__main__':
import sys
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('dsm_src', type=str, help='path to AHN DSM.')
parser.add_argument('dtm_src', type=str, help='path to AHN DTM')
parser.add_argument('out_name', type=str, help='output filename.')
parser.add_argument('out_dir', type=str, help='output directory.')
parser.add_argument('--sample', type=int, help='n sample blocks. Default 0.', default=0)
parser.add_argument('--min_diff', type=int, help='minimum DSM-DTM difference. Default 2.', default=2)
parser.add_argument('--mv_value', type=int, help='Value for maaiveld. Default 0.', default=0)
parser.add_argument('--w_width', type=int, help='Window width in pxls. Default 1000.', default=1000)
parser.add_argument('--w_height', type=int, help='Window height in pxls. Default 500.', default=500)
parser.add_argument('--of', choices=['flt', 'tif'], help='Output format. Default GeoTiff', default='tif')
parser.add_argument('--u_height', type=int, default=0, help='Uniform height for all opgaande elementen.')
parser.add_argument('--filter_powerlines', action='store_true', help='Majority filter below powerlines.')
args = parser.parse_args()
try:
build_opgaande_elementenkaart(**vars(args))
if args.filter_powerlines:
filter_opgaande_elementen_under_powerlines(**vars(args))
except (ValueError, AssertionError) as e:
print(e)
sys.exit(0)
import os
import affine
import rasterio as rio
import numpy as np
from typing import Tuple
import skimage.filters.rank
from skimage.morphology import disk
from rasterio.windows import Window
from rasterio.coords import BoundingBox
from rasterio.fill import fillnodata
def verify_matching_rasters(
raster_a: rio.DatasetReader, raster_b: rio.DatasetReader
) -> Tuple[bool, AssertionError]:
"""
Verify that two geospatial rasters have identical bounds and origin
:param raster_a:
:param raster_b:
:return:
"""
origin = (raster_a.transform.c, raster_a.transform.f) == (
raster_b.transform.c,
raster_b.transform.f,
)
resolution = raster_a.transform.a == raster_b.transform.a
if origin and resolution:
return True
else:
raise ValueError(f"Rasters do not match")
def snap_bounds_outwards(bbox: BoundingBox, interval: int = 1000) -> BoundingBox:
"""
Snap bounds from a GeoDataFrame to nearest 1000m
Snap upwards for right and top
Snap downwards for left and bottom
:param interval:
:return:
"""
return BoundingBox(
left=(bbox.left // interval) * interval,
bottom=(bbox.bottom // interval) * interval,
right=((bbox.right // interval) * interval) + interval,
top=((bbox.top // interval) * interval) + interval,
)
def bbox_to_window(
bbox: BoundingBox, transform: affine.Affine
) -> Tuple[Window, affine.Affine]:
"""
Transform bounds in projected coordinates to a rio Window
:param bbox:
:param transform:
:return:
"""
w = rio.windows.from_bounds(
left=bbox.left,
bottom=bbox.bottom,
right=bbox.right,
top=bbox.top,
transform=transform,
)
t = affine.Affine(
a=transform.a,
b=transform.b,
c=bbox.left,
d=transform.d,
e=transform.e,
f=bbox.top,
)
return w, t
def fill_array(
arr: np.array, nodata, max_search_distance: int = 50, si: int = 2
) -> np.array:
"""
Interpolate NoData values in array
"""
return fillnodata(
image=arr,
mask=np.where(arr == nodata, 0, 1),
max_search_distance=max_search_distance,
smoothing_iterations=si,
)
def smooth_array(
arr: np.array, disk_radius: int = 1
) -> np.array:
"""
Majority filter on binary array
"""
return skimage.filters.rank.majority(
arr, footprint=disk(disk_radius)
)
def verify_dsm_exceeds_dtm(dsm_array: np.ma.array,
dsm_nodata,
dtm_array: np.ma.array,
dtm_nodata) -> bool:
"""
Verify that DSM values are always equal to or greater than DTM. Ignore NoData values
Parameters
----------
dsm_array
dtm_array
Returns
-------
"""
any_nodata = np.array((dsm_array == dsm_nodata) |
(dtm_array == dtm_nodata))
dsm_below_dtm = np.where((dsm_array < dtm_array) &
(any_nodata == False), True, False)
return False if np.any(dsm_below_dtm) else True
def cap_dsm_to_dtm(dsm_array, dsm_nodata,
dtm_array, dtm_nodata) -> np.array:
"""
Set DSM equal to DTM where DSM < DTM
Parameters
----------
dsm_array
dsm_nodata
dtm_array
dtm_nodata
Returns
-------
"""
any_nodata = np.array((dsm_array == dsm_nodata) |
(dtm_array == dtm_nodata))
return np.where((dsm_array < dtm_array) &
(any_nodata == False), dtm_array, dsm_array)
def get_opgaande_elementen(
dsm: rio.DatasetReader,
dtm: rio.DatasetReader,
read_window: rio.windows.Window,
minimum_difference: int = 3,
maaiveld_value: int = 0,
uniform_height: int = 0,
) -> np.array:
"""
Read DSM and DTM for a certain bounding box. Return array with DSM-DTM differences
where difference exceeds treshold cast to integer type
Parameters
----------
dsm_source
dtm_source
read_window
minimum_difference
maaiveld_value
uniform_height
Returns
-------
Details
-------
DSM | DTM | Difference | Opgaand Element (minimum_differnce = 3)
10 08 02 False
10 10 0 False
10 06 04 True
NoData NoData NoData False
10 NoData NoData True
-05 -08 03 True
01 -02 03 True
NoData 10 NoData False
"""
# dtm_array = dtm.read(1)#, window=read_window)
dtm_array = fill_array(dtm.read(1, window=read_window), nodata=dtm.nodata)
dsm_array = dsm.read(1, window=read_window)
if np.all(dsm_array == dsm.nodata):
# print(' All nodata, returning dummy array')
return np.multiply(np.ones(shape=(read_window.height, read_window.width)),
maaiveld_value)
verify = verify_dsm_exceeds_dtm(dsm_array=dsm_array, dsm_nodata=dsm.nodata,
dtm_array=dtm_array, dtm_nodata=dtm.nodata)
if not verify:
dsm_array = cap_dsm_to_dtm(dsm_array, dsm.nodata,
dtm_array, dtm.nodata)
# NoData mask
any_nodata = np.array((dsm_array == dsm.nodata) |
(dtm_array == dtm.nodata))
# Generate array of opgaande elementen
difference_array = np.subtract(
np.where(any_nodata, 0, dsm_array),
np.where(any_nodata, 0, dtm_array)
)
assert np.all(difference_array >= 0), f'Not all >= 0'
# Threshold differences
opgaande_elementen = np.where(difference_array >= minimum_difference,
uniform_height if uniform_height > 0 else difference_array,
maaiveld_value)
# If uniform height, also include pixles where DTM == NoData and DSM != NoData
if uniform_height:
opgaande_elementen = np.where((dtm_array == dtm.nodata) & (dsm_array != dsm.nodata),
uniform_height, opgaande_elementen)
# Cast difference array to integer
return np.round(opgaande_elementen).astype(np.uint16)
# aoi = gp.read_file(r'c:\apps\temp_geodata\scratch.gpkg',
# layer='test_aoi')
# dsm_source = r'c:\apps\temp_geodata\AHN3\AHN3-DSM-5m.tif'
# dtm_source = r'c:\apps\temp_geodata\AHN3\AHN3-DTM-5m.tif'
# og, values = get_opgaande_elementen(dsm_source, dtm_source, aoi)
# bbox_snap = snap_bounds_outwards(aoi)
# prof = rio.default_gtiff_profile
# prof.update(width=og.shape[1], height=og.shape[0],
# transform=affine.Affine(a=5, b=0, c=bbox_snap.left,
# d=0, e=-5, f=bbox_snap.top),
# count=1)
# with rio.open(r'c:\apps\temp_geodata\AHN3\sample\aoi01_test.tif', 'w', **prof) as dest:
# dest.write(og, 1)
import os
import numpy as np
from rasterio.features import rasterize
import rasterio as rio
import geopandas as gp
from skimage.filters.rank import majority
from skimage.morphology import disk
POWERLINES_SOURCE = r'c:\Users\roelo008\OneDrive - Wageningen University & Research\b_geodata\RIVM\vw_nl_netlijnen_2023_v4_5_december\vw_nl_netlijnen_2023_v4_5_december.shp'
TEMPORARY_FILENAME = r'powerlines_temporary'
def powerlines_to_raster(profile: dict):
"""
Read, buffer and rasterize powerlines, write to raster using raster profile
Parameters
----------
profile
Returns
-------
"""
powerlines = gp.read_file(POWERLINES_SOURCE)
powerlines = powerlines.set_geometry(powerlines.buffer(25)).assign(powerline=1)
print('Rasterizing powerlines.')
array = rasterize(
shapes=[(x.geometry, getattr(x, 'powerline')) for i, x in powerlines.iterrows()],
merge_alg=rio.enums.MergeAlg.replace,
transform=profile["transform"],
out_shape=(profile["height"], profile["width"]),
fill=profile["nodata"],
).astype(np.uint8)
return array
def majority_filter(powerline_array: np.array, **kwargs) -> np.array:
"""
Read opgaande elementen raster from file
Parameters
----------
kwargs
Returns
-------
"""
print('Majority filter.')
opgaande_elementen = rio.open(os.path.join(kwargs['out_dir'], f'{kwargs["out_name"]}.{kwargs["of"]}')).read(1)
filter_array = np.where((powerline_array == 1) & (opgaande_elementen > kwargs['mv_value']), 1, 0)
filtered_array = majority(filter_array, footprint=disk(2))
# Return where original array was > maaiveld AND where filtering creates 0
return np.where((filtered_array == 0) & (opgaande_elementen > kwargs['mv_value']) & (powerline_array == 1),
kwargs['mv_value'],
opgaande_elementen)
def filter_opgaande_elementen_under_powerlines(**kwargs):
"""
Read opgaande elementen array. Apply majority filter on areas below powerlines.
Parameters
----------
kwargs
Returns
-------
"""
profile = rio.open(os.path.join(kwargs['out_dir'], f'{kwargs["out_name"]}.{kwargs["of"]}')).profile
with rio.open(os.path.join(kwargs['out_dir'], f'{kwargs["out_name"]}_filtered.{kwargs["of"]}'),
'w', **profile) as dest:
dest.write(majority_filter(powerlines_to_raster(profile), **kwargs), 1)
print(r'Done \O/')
import os
def replace_hdr(dir: str, hdrname: str, output_profile: dict):
"""
replace a header file with header format compatible with ViewScape
:param dir: destination directory
:param f: header filename
:param output_profile:
:return:
"""
x_lower_left, y_lower_left = output_profile['transform'] * (0, output_profile['height'])
hdr = f"ncols {output_profile['width']}\n" \
f"nrows {output_profile['height']}\n" \
f"xllcorner {x_lower_left}\n" \
f"yllcorner {y_lower_left}\n" \
f"cellsize {output_profile['transform'].a}\n" \
f"NODATA_value {output_profile['nodata']}\n" \
"byteorder LSBFIRST"
with open(os.path.join(dir, f'{hdrname}_viewscape.hdr'), 'w') as f:
f.write(hdr)
\ No newline at end of file
import os
import geopandas as gp
import pandas as pd
COLUMN_NAMES = ["ID", "HEIGHT", "USE"]
def build_heights_table(height_values: list) -> gp.GeoDataFrame:
"""
Build GeoDataFrame with column names and None Geometry, with ID and HEIGHT value for each value in height_values
"""
return gp.GeoDataFrame(
pd.concat(
[
pd.DataFrame(data=[[0, 0, "maaiveld"]], columns=COLUMN_NAMES),
pd.DataFrame(
data={
COLUMN_NAMES[0]: height_values,
COLUMN_NAMES[1]: height_values,
COLUMN_NAMES[2]: "ahn opgaand element",
}
).sort_values(by=COLUMN_NAMES[0]),
],
ignore_index=True,
).assign(geometry=None)
)
def heights_table_to_file(destination: str, height_table: gp.GeoDataFrame) -> None:
"""
Write heights table to file as *.dbf
"""
height_table.to_file(f'{os.path.splitext(destination)[0]}.shp')
# Remove redundant Shapefile components
for f in os.listdir(os.path.dirname(destination)):
if f.endswith(
tuple(
[
f"{os.path.splitext(os.path.basename(destination))[0]}{ext}"
for ext in [
".shp",
".shx",
".prj",
".sbn",
".sbx",
".cpg",
"shp.xml",
]
]
)
):
os.remove(os.path.join(os.path.dirname(destination), f))
import affine
import numpy as np
import random
from rasterio.windows import Window
class ReadWriteWindow:
"""
A single read/write window and its associated unique combinations
"""
def __init__(
self,
row_start: int,
row_stop: int,
column_start: int,
column_stop: int,
nr: int,
top_left_x: int,
top_left_y: int,
resolution: int,
):
self.rio_window = Window.from_slices(
(row_start, row_stop), (column_start, column_stop)
)
self.transform = affine.Affine(
a=resolution, b=0, c=top_left_x, d=0, e=resolution * -1, f=top_left_y
)
def get_rw_windows(
target_profile: dict,
window_height: int = 2000,
window_width: int = 2000,
sample: int = 0,
) -> list:
""" """
target_width, target_height = (target_profile['width'], target_profile['height'])
assert np.divmod(target_height, window_height)[1] == 0, f"window height {window_height} not conmensurable with raster height {target_height}"
assert np.divmod(target_width, window_width)[1] == 0, f"window height {window_width} not conmensurable with raster width {target_width}"
row_starts = range(0, target_profile["height"], window_height)
column_starts = range(0, target_profile["width"], window_width)
windows = []
n = 0
n_samples = sample
for row_start in {True: random.sample(row_starts, n_samples), False: row_starts}[sample > 0]:
for column_start in {
True: random.sample(column_starts, n_samples),
False: column_starts,
}[sample > 0]:
windows.append(
ReadWriteWindow(
row_start=row_start,
row_stop=row_start + window_height,
column_start=column_start,
column_stop=column_start + window_width,
nr=n,
top_left_x=(
target_profile["transform"] * (column_start, row_start)
)[0],
top_left_y=(
target_profile["transform"] * (column_start, row_start)
)[1],
resolution=getattr(
target_profile["transform"],
"a",
),
)
)
n += 1
print(f"Created {n} windows of size {window_height}X{window_width}")
return windows
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment