# Copyright (c) 2021, GeoVista Contributors.
#
# This file is part of GeoVista and is distributed under the 3-Clause BSD license.
# See the LICENSE file in the package root directory for licensing details.
"""Support to find points or cells within a mesh using various techniques.
Notes
-----
.. versionadded:: 0.1.0
"""
from __future__ import annotations
from collections.abc import Iterable
from enum import Enum
from typing import TYPE_CHECKING
import lazy_loader as lazy
from .common import MixinStrEnum, to_cartesian
from .crs import WGS84, from_wkt
from .transform import transform_points
if TYPE_CHECKING:
import numpy as np
from numpy.typing import ArrayLike
import pyvista as pv
# Type aliases
NearestNeighbours = tuple[ArrayLike, ArrayLike]
"""Type alias for a tuple of nearest neighbour distances and indices.""" ""
# type aliases
CellIDs = list[int]
CellIDLike = int | CellIDs
# lazy import third-party dependencies
np = lazy.load("numpy")
__all__ = [
"KDTREE_EPSILON",
"KDTREE_K",
"KDTREE_LEAF_SIZE",
"KDTREE_PREFERENCE",
"KDTree",
"NearestNeighbours",
"SearchPreference",
"find_cell_neighbours",
"find_nearest_cell",
]
KDTREE_EPSILON: float = 0.0
"""The default kd-tree nearest neighbour epsilon."""
KDTREE_K: int = 1
"""The default kd-tree number of nearest neighbours."""
KDTREE_LEAF_SIZE: int = 16
"""The default kd-tree leaf-size."""
KDTREE_PREFERENCE: str = "point"
"""The default search preference."""
# TODO @bjlittle: Use StrEnum and auto when minimum supported python version is 3.11.
[docs]
class SearchPreference(MixinStrEnum, Enum):
"""Enumeration of mesh geometry search preferences.
Notes
-----
.. versionadded:: 0.3.0
"""
CENTER = "center"
POINT = "point"
[docs]
class KDTree: # numpydoc ignore=PR01
"""Construct a kd-tree for fast nearest neighbour search of a mesh.
For further details, see https://github.com/storpipfugl/pykdtree.
Notes
-----
.. versionadded:: 0.3.0
"""
def __init__(
self,
mesh: pv.PolyData,
leaf_size: int | None = None,
preference: str | SearchPreference | None = None,
) -> None:
"""Construct kd-tree for nearest neighbour search of mesh points/cell centers.
Note that, the cell centers will be calculated from the mesh geometry and do not
require to be provided.
The geolocated mesh data points are converted to cartesian coordinates on a S2
sphere, as the kd-tree implementation uses Euclidean distance as the metric for
nearest neighbours.
Nearest neighbour queries are optionally multi-threaded using OpenMP. For
further details see https://github.com/storpipfugl/pykdtree.
Parameters
----------
mesh : PolyData
The mesh used to construct the kd-tree.
leaf_size : int, optional
The number of data points per tree leaf. Used to control the memory overhead
of the kd-tree. Increasing the leaf size will reduce the memory overhead and
construction time, but increase the query time. Defaults to
:data:`KDTREE_LEAF_SIZE`.
preference : str or SearchPreference, optional
Construct the kd-tree from the `mesh` points ``point`` or cell centers
``center``. Also see :class:`SearchPreference`. Defaults to
:data:`KDTREE_PREFERENCE`.
Notes
-----
.. versionadded:: 0.3.0
"""
from pykdtree.kdtree import KDTree as pyKDTree
if leaf_size is None:
leaf_size = KDTREE_LEAF_SIZE
leaf_size = int(leaf_size)
if preference is None:
preference = KDTREE_PREFERENCE
if not SearchPreference.valid(preference):
options = " or ".join(f"{item!r}" for item in SearchPreference.values())
emsg = f"Expected a preference of {options}, got '{preference}'."
raise ValueError(emsg)
self._preference = SearchPreference(preference)
xyz = (
mesh.points
if self._preference == SearchPreference.POINT
else mesh.cell_centers().points
)
crs = from_wkt(mesh)
if crs is None:
crs = WGS84
if crs != WGS84:
transformed = transform_points(
src_crs=crs, tgt_crs=WGS84, xs=xyz[:, 0], ys=xyz[:, 1]
)
# TODO @bjlittle: Clarify zlevel preservation for non-WGS84 point-clouds.
xyz = to_cartesian(transformed[:, 0], transformed[:, 1])
self._n_points = xyz.shape[0]
self._mesh_type = mesh.__class__.__name__
self._kdtree = pyKDTree(xyz, leafsize=leaf_size)
def __repr__(self) -> str:
"""Serialize kd-tree representation.
Returns
-------
str
Notes
-----
.. versionadded:: 0.3.0
"""
klass = f"{self.__class__.__name__}"
mesh = f"<{self._mesh_type} N POINTS: {self._n_points}>"
leaf_size = f"leaf_size={self.leaf_size}"
preference = f"preference='{self.preference}'"
return f"{klass}({mesh}, {leaf_size}, {preference})"
@property
def leaf_size(self) -> int:
"""The number of data points per tree leaf.
Used to control the memory overhead of the kd-tree. Increasing the leaf size
will reduce the memory overhead and construction time, but increase the query
time.
Returns
-------
int
The leaf size i.e., the number of data points per leaf, for the tree
creation.
Notes
-----
.. versionadded:: 0.3.0
"""
return self._kdtree.leafsize
@property
def n_points(self) -> int:
"""Number of mesh points registered with the kd-tree.
Returns
-------
int
The number of mesh points in the kd-tree.
Notes
-----
.. versionadded:: 0.3.0
"""
return self._n_points
@property
def points(self) -> np.ndarray:
"""The cartesian data points registered with the kd-tree.
Returns
-------
np.ndarray
The cartesian data points in the kd-tree.
Notes
-----
.. versionadded:: 0.3.0
"""
return self._kdtree.data.reshape(-1, 3).copy()
@property
def preference(self) -> SearchPreference:
"""The target mesh geometry to search.
Focus either on the mesh points or the mesh cell centers.
Returns
-------
SearchPreference
The preference of mesh geometry to search.
Notes
-----
.. versionadded:: 0.3.0
"""
return self._preference
[docs]
def query(
self,
lons: float | ArrayLike,
lats: float | ArrayLike,
k: int | None = None,
epsilon: float | None = None,
distance_upper_bound: float | None = None,
radius: float | None = None,
zlevel: float | ArrayLike | None = None,
zscale: float | None = None,
) -> NearestNeighbours:
"""Query the kd-tree for `k` nearest neighbours per point-of-interest.
Parameters
----------
lons : float or ArrayLike
One or more longitude values for the query points-of-interest.
lats : float or ArrayLike
One or more latitude values for the query points-of-interest.
k : int, optional
The number of nearest neighbours to find per point-of-interest. Defaults
to :data:`KDTREE_K`.
epsilon : non-negative float, optional
Return approximate nearest neighbours; the k-th returned value
is guaranteed to be no further than (1 + epsilon) times the distance
to the real k-th nearest neighbour. Defaults to :data:`KDTREE_EPSILON`.
distance_upper_bound : non-negative float, optional
Return only neighbors within this distance. This is used to prune tree
searches.
radius : float, optional
The radius of the sphere. Defaults to :data:`geovista.common.RADIUS`.
zlevel : float or ArrayLike, default=0.0
The z-axis level. Used in combination with the `zscale` to offset the
`radius` by a proportional amount i.e., ``radius * zlevel * zscale``.
If `zlevel` is not a scalar, then its shape must match or broadcast
with the shape of `lons` and `lats`.
zscale : float, optional
The proportional multiplier for z-axis `zlevel`. Defaults to
:data:`geovista.common.ZLEVEL_SCALE`.
Returns
-------
NearestNeighbours
The Euclidean distance and index of the k nearest neighbours for each
query point-of-interest.
Notes
-----
.. versionadded:: 0.3.0
"""
if k is None:
k = KDTREE_K
if epsilon is None:
epsilon = KDTREE_EPSILON
xyz = to_cartesian(lons, lats, radius=radius, zlevel=zlevel, zscale=zscale)
return self._kdtree.query(
xyz, k=k, eps=epsilon, distance_upper_bound=distance_upper_bound
)
[docs]
def find_cell_neighbours(mesh: pv.PolyData, cid: CellIDLike) -> CellIDs:
"""Find all the cells neighbouring the given `cid` cell/s of the `mesh`.
A cell is deemed to neighbour a `cid` cell if it shares at least one
vertex.
Parameters
----------
mesh : PolyData
The mesh defining the points cells.
cid : int or list of int
The offset of the cell/s in the `mesh` that is/are the focus of the
neighbourhood.
Returns
-------
list of int
The sorted list of neighbouring cell-ids.
Notes
-----
.. versionadded:: 0.1.0
"""
if not isinstance(cid, Iterable):
cid = [cid]
pids = []
for idx in cid:
# NOTE: pyvista 0.38.0: cell_point_ids(idx) -> get_cell(idx).point_ids
pids.extend(mesh.get_cell(idx).point_ids)
# determine the unique points
pids = np.unique(list(pids))
# get the cell-ids of cells containing at least one common point
result = set(mesh.extract_points(pids)["vtkOriginalCellIds"])
# remove the original cell/s
result -= set(cid)
return sorted(result)
[docs]
def find_nearest_cell(
mesh: pv.PolyData,
x: float,
y: float,
z: float | None = 0,
single: bool | None = False,
) -> CellIDLike:
"""Find the cell in the `mesh` that is closest to the point-of-interest (POI).
Assumes that the POI is in the canonical units of the `gvCRS`
associated with the `mesh`, otherwise assumes geographic longitude
and latitude.
If the POI is coincident with a vertex of the `mesh`, then the
``cellID`` of each cell face which shares that vertex is returned.
Parameters
----------
mesh : PolyData
The mesh defining the points, cells and CRS.
x : float
The POI x-coordinate. Defaults to ``longitude`` if no `mesh` CRS is
available.
y : float
The POI y-coordinate. Defaults to ``latitude`` if no `mesh` CRS is
available.
z : float, optional
The POI z-coordinate, if applicable. Defaults to zero.
single : bool, default=False
Enforce expectation of only one nearest ``cellID`` result. Otherwise,
a sorted list of ``cellIDs`` are returned.
Returns
-------
int or list of int
The cellID of the closest mesh cell, or the cellIDs that share the
coincident point-of-interest as a node.
Notes
-----
.. versionadded:: 0.1.0
"""
crs = from_wkt(mesh)
poi = to_cartesian(x, y)[0] if crs in [WGS84, None] else (x, y, z)
cid = mesh.find_closest_cell(poi)
# NOTE: pyvista 0.38.0: cell_point_ids(cid) -> get_cell(cid).point_ids
pids = np.asanyarray(mesh.get_cell(cid).point_ids)
points = mesh.points[pids]
mask = np.all(np.isclose(points, poi), axis=1)
poi_is_vertex = np.any(mask)
if poi_is_vertex:
pid = pids[mask][0]
result = sorted(mesh.extract_points(pid)["vtkOriginalCellIds"])
else:
result = [cid]
if single:
if (count := len(result)) > 1:
emsg = f"Expected to find 1 cell but found {count}, got CellIDs {result}."
raise ValueError(emsg)
(result,) = result
return result