# -----------------------------------------------------------------------
# Copyright 2026 Martin Wieser
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------
"""Mapping backend for a horizontal plane at a fixed altitude."""
from __future__ import annotations
import logging
from collections.abc import Mapping
from typing import Any
import numpy as np
import pyproj.exceptions
from pyproj import CRS
from weitsicht.exceptions import CRSInputError
from weitsicht.geometry.intersection_plane import intersection_plane_mat_operation
from weitsicht.mapping.base_class import MappingBase, MappingType
from weitsicht.transform.coordinates_transformer import CoordinateTransformer
from weitsicht.utils import (
ArrayNx2,
ArrayNx3,
Issue,
MappingResult,
MappingResultSuccess,
ResultFailure,
to_array_nx3,
)
__all__ = ["MappingHorizontalPlane"]
logger = logging.getLogger(__name__)
[docs]
class MappingHorizontalPlane(MappingBase):
"""Mapping backend for a horizontal plane at a fixed altitude.
The plane is defined by ``plane_altitude`` (Z coordinate in mapper CRS) and a fixed normal (0, 0, 1).
If a CRS transformation is performed, the plane altitude is transformed appropriately via the provided
``crs_s``/``transformer`` inputs.
"""
[docs]
def __init__(self, plane_altitude: float | int = 0.0, crs: CRS | None = None):
"""Create a horizontal plane mapper.
:param plane_altitude: Altitude of the horizontal plane (Z value in mapper CRS), defaults to ``0.0``.
:type plane_altitude: float | int
:param crs: CRS of the plane, defaults to ``None``.
:type crs: CRS | None
"""
super().__init__()
self.plane_altitude = float(plane_altitude)
self.plane_normal = np.array([0, 0, 1])
self._crs = crs
# TODO check if crs is not None that the CRS provides a vertical reference
[docs]
@classmethod
def from_dict(cls, mapper_dict: Mapping[str, Any]) -> MappingHorizontalPlane:
"""Create a :class:`MappingHorizontalPlane` instance from a configuration dictionary.
Required keys:
- ``type``: must be ``horizontalPlane``
Optional keys:
- ``plane_altitude``: plane altitude (float), defaults to ``0.0``
- ``crs``: CRS WKT string, defaults to ``None``
:param mapper_dict: Mapper configuration dictionary (typically created via ``mapper.param_dict``).
:type mapper_dict: Mapping[str, Any]
:return: Instantiated mapper.
:rtype: MappingHorizontalPlane
:raises KeyError: If the dictionary key ``type`` is missing.
:raises ValueError: If the mapper type is unsupported.
:raises CRSInputError: If the CRS WKT string is invalid.
"""
if mapper_dict.get("type", None) is None:
raise KeyError("Dictionary key 'type' is missing")
if mapper_dict.get("type", None) != MappingType.HorizontalPlane.fullname:
raise ValueError("Mapper dictionary type is not " + MappingType.HorizontalPlane.fullname)
plane_altitude = mapper_dict.get("plane_altitude", 0.0)
crs_text = mapper_dict.get("crs", None)
crs = None
if crs_text is not None:
try:
crs = CRS(crs_text)
except pyproj.exceptions.CRSError as err:
raise CRSInputError("No valid CRS WKT string supported") from err
mapping = cls(plane_altitude=plane_altitude, crs=crs)
return mapping
@property
def type(self):
"""Return the mapper type."""
return MappingType.HorizontalPlane
@property
def param_dict(self):
"""Return the mapper configuration dictionary.
:return: Mapper configuration dictionary.
:rtype: dict[str, Any]
"""
crs_text = None
if self._crs is not None:
crs_text = self.crs_wkt
return {
"type": self.type.fullname,
"plane_altitude": self.plane_altitude,
"crs": crs_text,
}
[docs]
def map_coordinates_from_rays(
self,
ray_vectors_crs_s: ArrayNx3,
ray_start_crs_s: ArrayNx3,
crs_s: CRS | None = None,
transformer: CoordinateTransformer | None = None,
) -> MappingResult:
"""Map coordinates from rays to a horizontal plane.
:param ray_vectors_crs_s: Ray direction vectors (N×3).
:type ray_vectors_crs_s: ArrayNx3
:param ray_start_crs_s: Ray start points (N×3), same shape as ``ray_vectors_crs_s``.
:type ray_start_crs_s: ArrayNx3
:param crs_s: CRS of the input rays, defaults to None.
:type crs_s: CRS | None
:param transformer: Coordinate transformer to mapper CRS, defaults to None.
:type transformer: CoordinateTransformer | None
:return: Mapping result containing intersection coordinates and a validity mask.
:rtype: MappingResult
:raises ValueError: If input arrays have incompatible shapes.
:raises CRSInputError: If both ``crs_s`` and ``transformer`` are provided.
:raises CoordinateTransformationError: If coordinate transformation fails.
"""
# A Array(3,) (Vector3D) should also work ray_vectors_crs_s and ray_start_crs_s
ray_pos = to_array_nx3(ray_start_crs_s)
ray_vec = to_array_nx3(ray_vectors_crs_s)
if ray_pos.shape != ray_vec.shape:
raise ValueError("Ray and Pos Vector not the same size")
# transform the given coordinates to the mappers crs
# replace the z coordinate of the coordinates in mapper crs with the mappers height
# transform back to the source CRS which now provides mapper height in source crs
# Therefore the mapper's height is correct transformed to the image crs at the image's prj center
if crs_s is not None and transformer is not None:
raise CRSInputError("Either crs or transformation can be used or both None")
coo_trafo = (
transformer if transformer is not None else CoordinateTransformer.from_crs(crs_s=crs_s, crs_t=self._crs)
)
# We will do this iterative, so if in the coordinate transformation raster are used
# We will iterate the plane height to that one from the correct intersection point
# Thus also valid mask should be taken into account, otherwise coordinate operation are wrong
# If both coo_trafo is None we will skip iteration
prev_intersect_points = np.ones(ray_pos.shape, dtype=float)
diff_greater_1mm = np.ones(ray_pos.shape, dtype=float)
iteration: int = 0
valid_mask = np.ones((ray_pos.shape[0],), dtype=bool)
plane_point_source_crs = np.ones(ray_pos.shape, dtype=float)
intersect_points = np.empty(ray_pos.shape, dtype=float)
intersect_points.fill(np.nan)
# initial height will be taken from ray start position
if coo_trafo is not None:
plane_point_mapper_crs = coo_trafo.transform(ray_pos)
else:
plane_point_mapper_crs = ray_pos * 1.0
plane_point_mapper_crs[:, 2] = self.plane_altitude
while np.any(diff_greater_1mm > 0.001) and iteration < 20:
index_valid_mask = np.flatnonzero(valid_mask)
# Transform back to source crs. So we have the horizontal planes Altitude in the correct system
if coo_trafo is not None:
plane_point_source_crs[index_valid_mask, :] = coo_trafo.transform(
plane_point_mapper_crs[index_valid_mask, :], direction="inverse"
)
else:
plane_point_source_crs[index_valid_mask, :] = plane_point_mapper_crs[index_valid_mask, :] * 1.0
# _intersect_points, _valid_mask are corresponding to size of true items in valid_mask
# its very unlikely that validity changes between iterations
# only if the plane point would then be exactly the ray pos it would be invalid
# so at least always grapping validity we can reduce risk here
# Testing that case would be lot of work, creating a pipeline which transform exactly like this.
_intersect_points, _valid_mask = intersection_plane_mat_operation(
ray_vec[index_valid_mask, :],
ray_pos[index_valid_mask, :],
plane_point=plane_point_source_crs[index_valid_mask, :],
plane_normal=self.plane_normal,
)
_index_false = np.flatnonzero(~_valid_mask)
_index_correct = np.flatnonzero(_valid_mask)
valid_mask[index_valid_mask[_index_false]] = False
intersect_points[index_valid_mask[_index_correct], :] = _intersect_points[_index_correct, :]
if coo_trafo is None:
break
if not np.any(valid_mask):
break
diff_greater_1mm = np.abs(intersect_points[valid_mask, :] - prev_intersect_points[valid_mask, :])
prev_intersect_points = intersect_points * 1.0
iteration += 1
# Now the new plane point is the previous intersection point
if coo_trafo is not None:
plane_point_mapper_crs[valid_mask, :] = coo_trafo.transform(intersect_points[valid_mask, :])
plane_point_mapper_crs[valid_mask, 2] = self.plane_altitude
else:
plane_point_mapper_crs[valid_mask, :] = ray_pos[valid_mask, :] * 1.0
issue = set()
if not np.all(valid_mask):
issue = {Issue.NO_INTERSECTION}
if not np.any(valid_mask):
return ResultFailure(
ok=False,
error="None of the rays intersects",
issues={Issue.NO_INTERSECTION},
)
normals_mapper_crs = np.full(plane_point_mapper_crs.shape, np.nan, dtype=float)
normals_mapper_crs[valid_mask, :] = np.array([0, 0, 1.0])
normals = np.full(plane_point_mapper_crs.shape, np.nan, dtype=float)
if coo_trafo is not None:
normals[valid_mask, :] = coo_trafo.transform_vector(
plane_point_mapper_crs[valid_mask, :],
normals_mapper_crs[valid_mask, :],
direction="inverse",
)
else:
normals = normals_mapper_crs
return MappingResultSuccess(
ok=True,
coordinates=intersect_points,
mask=valid_mask,
normals=normals,
crs=crs_s,
issues=issue,
)
[docs]
def map_heights_from_coordinates(
self,
coordinates_crs_s: ArrayNx3 | ArrayNx2,
crs_s: CRS | None = None,
transformer: CoordinateTransformer | None = None,
) -> MappingResult:
"""Assign the plane altitude as height for given coordinates.
:param coordinates_crs_s: Coordinates to sample (N×2 or N×3).
:type coordinates_crs_s: ArrayNx3 | ArrayNx2
:param crs_s: CRS of the input coordinates, defaults to None.
:type crs_s: CRS | None
:param transformer: Coordinate transformer to mapper CRS, defaults to None.
:type transformer: CoordinateTransformer | None
:return: Mapping result containing coordinates with plane altitude and a validity mask.
:rtype: MappingResult
:raises CRSInputError: If both ``crs_s`` and ``transformer`` are provided.
:raises CoordinateTransformationError: If coordinate transformation fails.
"""
# Prepare input if something else than ArrayNx3 (e.g. List, ArrayN)
coordinates = to_array_nx3(coordinates_crs_s, 0.0)
if crs_s is not None and transformer is not None:
raise CRSInputError("Either crs or transformation can be used or both None")
coo_trafo = (
transformer if transformer is not None else CoordinateTransformer.from_crs(crs_s=crs_s, crs_t=self._crs)
)
if coo_trafo is not None:
coo_mapper_crs = coo_trafo.transform(coordinates)
else:
coo_mapper_crs = coordinates * 1.0
# Now change the altitude to the plane height
coo_mapper_crs[:, 2] = self.plane_altitude
# Transform back to source crs. So we have the horizontal planes Altitude in the correct system
# Here no iteration is needed.
# Or at least if we not assume that the transformation with a new height will not change the 2d coordinates
if coo_trafo is not None:
coo_source_crs = coo_trafo.transform(coo_mapper_crs, direction="inverse")
else:
coo_source_crs = coo_mapper_crs * 1.0
# In that case all are valid because its simple assigned its heights
# Any coordinate error would have already raised an error before
mask = np.ones((coo_source_crs.shape[0]), dtype=bool)
normals_mapper_crs = np.full(coo_source_crs.shape, np.array([0, 0, 1]), dtype=float)
if coo_trafo is not None:
normals = coo_trafo.transform_vector(coo_mapper_crs, normals_mapper_crs, direction="inverse")
else:
normals = normals_mapper_crs
return MappingResultSuccess(ok=True, coordinates=coo_source_crs, mask=mask, normals=normals, crs=crs_s)