Source code for weitsicht.transform.coordinates_transformer
# -----------------------------------------------------------------------
# Copyright 2026 Martin Wieser
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------
"""Coordinate transformation helpers based on :mod:`pyproj`."""
from __future__ import annotations
import logging
from functools import lru_cache
from typing import overload
import numpy as np
from pyproj import CRS, Transformer
from pyproj.exceptions import CRSError, ProjError
from pyproj.transformer import TransformerGroup
from shapely import errors as shapely_errors
from shapely import geometry
from weitsicht import cfg
from weitsicht.exceptions import CoordinateTransformationError
from weitsicht.utils import ArrayNx2, ArrayNx3, Vector2D, Vector3D
__all__ = ["CoordinateTransformer"]
logger = logging.getLogger(__name__)
# The doc states that cached Versions can be used to speed up performance significantly
TransformerFromCRS = lru_cache(Transformer.from_crs)
TransformerFromPipeline = lru_cache(Transformer.from_pipeline)
def get_transformation_transformergroup(crs_source: CRS, crs_target: CRS) -> TransformerGroup: # pragma: no cover
"""Create a :class:`pyproj.transformer.TransformerGroup` between two CRSs.
Note: the pyproj documentation states that the returned objects are not thread-safe.
:param crs_source: Source CRS.
:type crs_source: CRS
:param crs_target: Target CRS.
:type crs_target: CRS
:return: Transformer group for ``crs_source`` → ``crs_target``.
:rtype: TransformerGroup
:raises CoordinateTransformationError: If no suitable transformation can be established.
"""
try:
transformer = TransformerGroup(
crs_source,
crs_target,
always_xy=True,
allow_ballpark=cfg._ballpark_transformation,
)
if cfg._only_best_transformation and not transformer.best_available:
raise CoordinateTransformationError(
"Best Transformation is not available but only best transformations are allowed"
)
return transformer
# self.transformer = Transformer.from_crs(self._crs, crs_target, always_xy=True,
# only_best=cfg._only_best_transformation,
# allow_ballpark=cfg._ballpark_transformation)
except ProjError as e:
raise CoordinateTransformationError("Transformation could not be established.") from e
[docs]
class CoordinateTransformer:
"""Class to handle transformations of coordinates.
Basically a little pyproj wrapper with basic functions needed in the package.
"""
@overload
def __init__(self, transformer: Transformer): ...
@overload
def __init__(self, transformer: list[Transformer]): ...
[docs]
def __init__(self, transformer: Transformer | list[Transformer]):
"""Create a transformer wrapper.
:param transformer: A single pyproj transformer or a chain of transformers.
:type transformer: Transformer | list[Transformer]
"""
if isinstance(transformer, Transformer):
self._transformers = [transformer]
else:
self._transformers: list[Transformer] = transformer
@property
def source_crs(self) -> CRS | None:
"""Return the source CRS of the transformer chain."""
return self._transformers[0].source_crs
@property
def target_crs(self) -> CRS | None:
"""Return the target CRS of the transformer chain."""
return self._transformers[-1].target_crs
[docs]
@classmethod
def from_pipeline(cls, proj_pipeline: str):
"""Create a transformer from a PROJ pipeline definition.
:param proj_pipeline: PROJ pipeline string.
:type proj_pipeline: str
:return: Transformer instance.
:rtype: CoordinateTransformer
:raises CoordinateTransformationError: If the pipeline cannot be established.
"""
try:
transformer = TransformerFromPipeline(proj_pipeline)
except (CRSError, ProjError) as e:
raise CoordinateTransformationError("Establishing PROJ PIPELINE not working") from e
_obj = cls(transformer=[transformer])
return _obj
[docs]
@classmethod
def from_crs(cls, crs_s: CRS | None, crs_t: CRS | None) -> CoordinateTransformer | None:
"""Create a coordinate transformer from source CRS to target CRS.
If both CRSs are ``None`` or equal, this returns ``None`` (no transformation required).
:param crs_s: Source CRS, or ``None``.
:type crs_s: CRS | None
:param crs_t: Target CRS, or ``None``.
:type crs_t: CRS | None
:return: Coordinate transformer or ``None`` if no transformation is required.
:rtype: CoordinateTransformer | None
:raises ValueError: If only one CRS is ``None``.
:raises CoordinateTransformationError: If establishing a transformation fails.
"""
# First we check if we anyhow need any transformation
# If both CRS are None or both are equal we will not do any transformation
if crs_s is None and crs_t is None:
return None
if (crs_s is None) != (crs_t is None):
raise ValueError("Either both CRS have to be stated or for both None")
assert crs_s is not None
assert crs_t is not None
if crs_s.equals(crs_t):
return None
transformer: list[Transformer] = []
geod_s = crs_s.geodetic_crs.to_3d()
geod_t = crs_t.geodetic_crs.to_3d()
if geod_s.equals(geod_t):
transformer.append(
TransformerFromCRS(
crs_s,
crs_t,
always_xy=True,
allow_ballpark=cfg._ballpark_transformation,
only_best=cfg._only_best_transformation,
)
)
else:
try:
transformer.append(
TransformerFromCRS(
crs_s,
geod_s,
always_xy=True,
allow_ballpark=cfg._ballpark_transformation,
only_best=cfg._only_best_transformation,
)
)
transformer.append(
TransformerFromCRS(
geod_s,
geod_t,
always_xy=True,
allow_ballpark=cfg._ballpark_transformation,
only_best=cfg._only_best_transformation,
)
)
transformer.append(
TransformerFromCRS(
geod_t,
crs_t,
always_xy=True,
allow_ballpark=cfg._ballpark_transformation,
only_best=cfg._only_best_transformation,
)
)
except (CRSError, ProjError) as e:
raise CoordinateTransformationError("Establishing coordinate transformation for mapping failed") from e
_obj = cls(transformer=transformer)
return _obj
[docs]
@staticmethod
def is_3d(coo: ArrayNx2 | ArrayNx3):
"""Return whether the coordinate array has a Z component.
:param coo: Coordinate array.
:type coo: ArrayNx2 | ArrayNx3
:return: ``True`` if the input has shape N×3.
:rtype: bool
"""
if coo.shape[1] == 3:
return True
return False
[docs]
def transform(self, coordinates: ArrayNx3 | ArrayNx2, direction: str = "forward") -> ArrayNx3 | ArrayNx2:
"""Transform coordinates using the configured transformer chain.
:param coordinates: Coordinates to transform as N×2 (2D) or N×3 (3D).
:type coordinates: ArrayNx3 | ArrayNx2
:param direction: Transform direction (``forward``, ``inverse``, ``identity``), defaults to ``forward``.
:type direction: str
:return: Transformed coordinates with the same dimensionality as input.
:rtype: ArrayNx3 | ArrayNx2
:raises CoordinateTransformationError: If input dimensions are invalid or a transformation fails.
"""
_coordinates = coordinates.copy()
if _coordinates.ndim == 1:
_coordinates = np.array([_coordinates])
if _coordinates.shape[1] < 2:
raise CoordinateTransformationError("Wrong input dimension")
# Iterate over the transformer list
# if direction is inverse we need to start from end for list
if direction == "inverse":
transformers_order = list(reversed(self._transformers))
else:
transformers_order = self._transformers
for trafo in transformers_order:
try:
# Other ways I called transform
# res = trafo.transform(*_coordinates.T, errcheck=True)
# p_crs = np.array(res).T
# fx, fy, fz = transformer_srs_4979.transform(ray_start[:, 0], ray_start[:, 1],
# ray_start[:, 2], errcheck=True)
if self.is_3d(_coordinates):
res = trafo.transform(
_coordinates[:, 0],
_coordinates[:, 1],
_coordinates[:, 2],
errcheck=True,
direction=direction,
)
_coordinates = np.array(res).T
else:
res = trafo.transform(
_coordinates[:, 0],
_coordinates[:, 1],
errcheck=True,
direction=direction,
)
_coordinates = np.array(res).T
if np.any(_coordinates == np.inf):
raise CoordinateTransformationError("Coordinate Transformation failed")
except (CRSError, ProjError) as e:
raise CoordinateTransformationError("Error while transforming coordinates") from e
return _coordinates
[docs]
def transform_vector(self, coordinates_start: ArrayNx3, vector: ArrayNx3, direction: str = "forward") -> ArrayNx3:
"""Transform direction vectors using the configured transformer chain.
Vectors can not be transformed directly through most CRS transformations, therefore this
helper transforms two points per vector and derives the resulting direction:
``start`` and ``start + unit(vector) * 10`` (i.e. 10 units along the provided vector direction).
:param coordinates_start: Start coordinates (N×3).
:type coordinates_start: ArrayNx3
:param vector: Direction vectors in the source CRS (N×3 or single 3-vector).
:type vector: ArrayNx3
:param direction: Transform direction (``forward``, ``inverse``, ``identity``), defaults to ``forward``.
:type direction: str
:return: Unit direction vectors in the target CRS (N×3).
:rtype: ArrayNx3
:raises ValueError: If input shapes are incompatible or vectors have zero length.
:raises CoordinateTransformationError: If transforming coordinates fails.
"""
start = np.asarray(coordinates_start, dtype=float)
if start.ndim == 1:
start = np.array([start], dtype=float)
if start.ndim != 2 or start.shape[1] < 3:
raise ValueError("coordinates_start must be array-like with shape (N, 3)")
start = start[:, :3]
vec = np.asarray(vector, dtype=float)
if vec.ndim == 1:
vec = np.array([vec], dtype=float)
if vec.ndim != 2 or vec.shape[1] < 3:
raise ValueError("vector must be array-like with shape (N, 3) or (3,)")
vec = vec[:, :3]
if vec.shape[0] == 1 and start.shape[0] > 1:
vec = np.repeat(vec, start.shape[0], axis=0)
if vec.shape[0] != start.shape[0]:
raise ValueError("coordinates_start and vector must have the same length")
vec_norm = np.linalg.norm(vec, axis=1)
if np.any(vec_norm == 0):
raise ValueError("vector contains zero-length vectors")
vec_unit = vec / vec_norm[:, None]
end = start + vec_unit * 10.0
# Transform both start/end in one go for performance.
stacked = np.vstack((start, end))
stacked_t = self.transform(stacked, direction=direction)
stacked_t = np.asarray(stacked_t, dtype=float)
start_t = stacked_t[: start.shape[0], :]
end_t = stacked_t[start.shape[0] :, :]
vec_t = end_t - start_t
vec_t_norm = np.linalg.norm(vec_t, axis=1)
if np.any(vec_t_norm == 0):
raise ValueError("Transformed vectors contain zero-length vectors")
return vec_t / vec_t_norm[:, None]
[docs]
def transform_rotation_matrix(
self,
coordinates_start: ArrayNx3,
rotation_matrix: np.ndarray,
direction: str = "forward",
orthonormalize: bool = True,
) -> np.ndarray:
"""Transform rotation matrices between CRSs by transforming their basis axes.
The rotation matrix convention in ``weitsicht`` is::
v_world = R @ v_local
i.e. the *columns* of ``R`` are the local basis axes expressed in the world CRS.
This helper transforms those three axes using :meth:`transform_vector` evaluated at
``coordinates_start`` and re-assembles a rotation matrix in the target CRS.
Since many CRS transformations are not strictly rigid, the transformed axes may not be
perfectly orthogonal. If ``orthonormalize`` is ``True``, the result is projected onto
SO(3) via an SVD-based orthonormalization, enforcing ``det(R)=+1``.
:param coordinates_start: Start coordinates (N×3) where the rotation is defined.
:type coordinates_start: ArrayNx3
:param rotation_matrix: Rotation matrix (3×3) or stacked (N×3×3).
:type rotation_matrix: numpy.ndarray
:param direction: Transform direction (``forward``, ``inverse``), defaults to ``forward``.
:type direction: str
:param orthonormalize: Re-orthonormalize the transformed axes, defaults to ``True``.
:type orthonormalize: bool
:return: Transformed rotation matrix (3×3) or stacked (N×3×3).
:rtype: numpy.ndarray
:raises ValueError: If input shapes are incompatible.
"""
start = np.asarray(coordinates_start, dtype=float)
start_was_vector = start.ndim == 1
if start_was_vector:
start = np.array([start], dtype=float)
if start.ndim != 2 or start.shape[1] < 3:
raise ValueError("coordinates_start must be array-like with shape (N, 3)")
start = start[:, :3]
rot = np.asarray(rotation_matrix, dtype=float)
rot_was_matrix = rot.ndim == 2
if rot_was_matrix:
if rot.shape != (3, 3):
raise ValueError("rotation_matrix must have shape (3, 3)")
rot = np.array([rot], dtype=float)
if rot.ndim != 3 or rot.shape[1:] != (3, 3):
raise ValueError("rotation_matrix must have shape (3, 3) or (N, 3, 3)")
# Broadcast a single start/rotation to match the other input.
if start.shape[0] == 1 and rot.shape[0] > 1:
start = np.repeat(start, rot.shape[0], axis=0)
if rot.shape[0] == 1 and start.shape[0] > 1:
rot = np.repeat(rot, start.shape[0], axis=0)
if start.shape[0] != rot.shape[0]:
raise ValueError("coordinates_start and rotation_matrix must have the same length")
n = start.shape[0]
# Transform the three basis axes (columns) in one call.
axes_src = rot.transpose(0, 2, 1).reshape(n * 3, 3)
start_rep = np.repeat(start, 3, axis=0)
axes_t = self.transform_vector(start_rep, axes_src, direction=direction).reshape(n, 3, 3)
rot_t = axes_t.transpose(0, 2, 1)
if orthonormalize:
u, _, vt = np.linalg.svd(rot_t)
rot_t = u @ vt
det = np.linalg.det(rot_t)
reflect = det < 0
if np.any(reflect):
u[reflect, :, -1] *= -1.0
rot_t[reflect] = u[reflect] @ vt[reflect]
if start_was_vector and rot_was_matrix:
return rot_t[0]
return rot_t
class Geometries: # pragma: no cover
pass
def __init__(self, points: ArrayNx3 | Vector3D | Vector2D | ArrayNx2):
# points: ArrayNx3 | Vector3D | Vector2D | ArrayNx2):
self._coordinates = points.copy()
# The class uses internal only array with ndim 2. So point np.array([0,0,1]) will be np.array([[0,0,1]])
if self._coordinates.ndim == 1:
self._coordinates = np.array([self._coordinates])
@property
def is_point(self) -> bool:
if self._coordinates.shape[0] == 1:
return True
return False
@property
def is_3d(self):
if self._coordinates.shape[1] == 3:
return True
return False
def geojson(self, geom_type: str) -> dict:
"""Convert the stored coordinates into a GeoJSON mapping.
:param geom_type: Geometry type (``Point``, ``LineString``, ``Polygon``).
:type geom_type: str
:return: GeoJSON mapping dict.
:rtype: dict
:raises CoordinateTransformationError: If geometry type is unsupported or coordinates are invalid.
"""
try:
if geom_type == "Point":
geojson = geometry.Point(self.coordinates)
elif geom_type == "LineString":
geojson = geometry.LineString(self._coordinates)
elif geom_type == "Polygon":
geojson = geometry.Polygon(self._coordinates)
else:
raise CoordinateTransformationError(r"Geometry is not supported. Only 'Point', 'LineString', 'Polygon'")
except (shapely_errors.GEOSException, ValueError) as err:
# Will also raise if multi coordinates and point type is used
raise CoordinateTransformationError("GeoJSON can not be established from given coordinates") from err
geojson = geometry.mapping(geojson)
return geojson
@property
def coordinates(self) -> ArrayNx3 | ArrayNx2:
return self._coordinates
@coordinates.setter
def coordinates(self, new_coordinate: ArrayNx3 | ArrayNx2):
points = new_coordinate.copy()
# The class uses internal only array with ndim 2. So point np.array([0,0,1]) will be np.array([[0,0,1]])
if points.ndim == 1:
new_coordinate = np.array([new_coordinate])
self._coordinates = new_coordinate