diff --git a/src/tophu/io.py b/src/tophu/io.py index a63cc00..6f35242 100644 --- a/src/tophu/io.py +++ b/src/tophu/io.py @@ -1,9 +1,11 @@ +from __future__ import annotations + import mmap import os import textwrap from dataclasses import dataclass from pathlib import Path -from typing import Optional, Protocol, Tuple, Union, overload, runtime_checkable +from typing import Protocol, overload, runtime_checkable import h5py import numpy as np @@ -39,13 +41,13 @@ class DatasetReader(Protocol): dtype: np.dtype """numpy.dtype : Data-type of the array's elements.""" - shape: Tuple[int, ...] + shape: tuple[int, ...] """tuple of int : Tuple of array dimensions.""" ndim: int """int : Number of array dimensions.""" - def __getitem__(self, key: Tuple[slice, ...], /) -> ArrayLike: + def __getitem__(self, key: tuple[slice, ...], /) -> ArrayLike: """Read a block of data.""" ... @@ -68,18 +70,18 @@ class DatasetWriter(Protocol): dtype: np.dtype """numpy.dtype : Data-type of the array's elements.""" - shape: Tuple[int, ...] + shape: tuple[int, ...] """tuple of int : Tuple of array dimensions.""" ndim: int """int : Number of array dimensions.""" - def __setitem__(self, key: Tuple[slice, ...], value: np.ndarray, /) -> None: + def __setitem__(self, key: tuple[slice, ...], value: np.ndarray, /) -> None: """Write a block of data.""" ... -def _create_or_extend_file(filepath: Union[str, os.PathLike], size: int) -> None: +def _create_or_extend_file(filepath: str | os.PathLike, size: int) -> None: """ Create a file with the specified size or extend an existing file to the same size. @@ -126,13 +128,13 @@ class BinaryFile(DatasetReader, DatasetWriter): filepath: Path """pathlib.Path : The file path.""" - shape: Tuple[int, ...] + shape: tuple[int, ...] dtype: np.dtype def __init__( self, - filepath: Union[str, os.PathLike], - shape: Tuple[int, ...], + filepath: str | os.PathLike, + shape: tuple[int, ...], dtype: DTypeLike, ): """ @@ -175,7 +177,7 @@ def ndim(self) -> int: # type: ignore[override] def __array__(self) -> np.ndarray: return self[:,] - def __getitem__(self, key: Tuple[slice, ...], /) -> np.ndarray: + def __getitem__(self, key: tuple[slice, ...], /) -> np.ndarray: with self.filepath.open("rb") as f: # Memory-map the entire file. with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm: @@ -187,7 +189,7 @@ def __getitem__(self, key: Tuple[slice, ...], /) -> np.ndarray: del arr return data - def __setitem__(self, key: Tuple[slice, ...], value: np.ndarray, /) -> None: + def __setitem__(self, key: tuple[slice, ...], value: np.ndarray, /) -> None: with self.filepath.open("r+b") as f: # Memory-map the entire file. with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_WRITE) as mm: @@ -224,16 +226,16 @@ class HDF5Dataset(DatasetReader, DatasetWriter): datapath: str """str : The path to the dataset within the file.""" - chunks: Optional[Tuple[int, ...]] + chunks: tuple[int, ...] | None """ tuple of int : Tuple giving the chunk shape, or None if chunked storage is not used. """ - shape: Tuple[int, ...] + shape: tuple[int, ...] dtype: np.dtype @overload - def __init__(self, filepath: Union[str, os.PathLike], datapath: str): # noqa: D418 + def __init__(self, filepath: str | os.PathLike, datapath: str): # noqa: D418 """ Construct a new `HDF5Dataset` object from an existing dataset. @@ -249,9 +251,9 @@ def __init__(self, filepath: Union[str, os.PathLike], datapath: str): # noqa: D @overload def __init__( self, - filepath: Union[str, os.PathLike], + filepath: str | os.PathLike, datapath: str, - shape: Tuple[int, ...], + shape: tuple[int, ...], dtype: DTypeLike, **kwargs, ): # noqa: D418 @@ -340,19 +342,19 @@ def ndim(self) -> int: # type: ignore[override] def __array__(self) -> np.ndarray: return self[:,] - def __getitem__(self, key: Tuple[slice, ...], /) -> np.ndarray: + def __getitem__(self, key: tuple[slice, ...], /) -> np.ndarray: with h5py.File(self.filepath, "r") as f: dataset = f[self.datapath] return dataset[key] - def __setitem__(self, key: Tuple[slice, ...], value: np.ndarray, /) -> None: + def __setitem__(self, key: tuple[slice, ...], value: np.ndarray, /) -> None: with h5py.File(self.filepath, "r+") as f: dataset = f[self.datapath] dataset[key] = value def _check_contains_single_band( - dataset: Union[rasterio.io.DatasetReader, rasterio.io.DatasetWriter] + dataset: rasterio.io.DatasetReader | rasterio.io.DatasetWriter, ) -> None: """ Validate that the supplied dataset contains a single raster band. @@ -374,7 +376,7 @@ def _check_contains_single_band( def _check_valid_band( - dataset: Union[rasterio.io.DatasetReader, rasterio.io.DatasetWriter], + dataset: rasterio.io.DatasetReader | rasterio.io.DatasetWriter, band: int, ) -> None: """ @@ -438,7 +440,7 @@ class RasterBand(DatasetReader, DatasetWriter): coordinate reference system. """ - shape: Tuple[int, int] + shape: tuple[int, int] dtype: np.dtype # TODO: `chunks` & `nodata` attributes @@ -446,10 +448,10 @@ class RasterBand(DatasetReader, DatasetWriter): @overload def __init__( self, - filepath: Union[str, os.PathLike], + filepath: str | os.PathLike, *, - band: Optional[int] = None, - driver: Optional[str] = None, + band: int | None = None, + driver: str | None = None, ): # noqa: D418 """ Construct a new `RasterBand` object. @@ -470,14 +472,14 @@ def __init__( @overload def __init__( self, - filepath: Union[str, os.PathLike], + filepath: str | os.PathLike, width: int, height: int, dtype: DTypeLike, *, - driver: Optional[str] = None, - crs: Optional[Union[str, dict, rasterio.crs.CRS]] = None, - transform: Optional[rasterio.transform.Affine] = None, + driver: str | None = None, + crs: str | dict | rasterio.crs.CRS | None = None, + transform: rasterio.transform.Affine | None = None, ): # noqa: D418 """ Construct a new `RasterBand` object. @@ -595,7 +597,7 @@ def ndim(self) -> int: # type: ignore[override] def __array__(self) -> np.ndarray: return self[:, :] - def __getitem__(self, key: Tuple[slice, ...], /) -> np.ndarray: + def __getitem__(self, key: tuple[slice, ...], /) -> np.ndarray: with rasterio.io.DatasetReader( self.filepath, driver=self.driver, @@ -607,7 +609,7 @@ def __getitem__(self, key: Tuple[slice, ...], /) -> np.ndarray: ) return dataset.read(self.band, window=window) - def __setitem__(self, key: Tuple[slice, ...], value: np.ndarray, /) -> None: + def __setitem__(self, key: tuple[slice, ...], value: np.ndarray, /) -> None: with rasterio.io.DatasetWriter( self.filepath, "r+", diff --git a/src/tophu/multilook.py b/src/tophu/multilook.py index d812caa..0adc2be 100644 --- a/src/tophu/multilook.py +++ b/src/tophu/multilook.py @@ -1,5 +1,8 @@ +from __future__ import annotations + import warnings -from typing import Iterable, SupportsInt, Tuple, Union, cast +from collections.abc import Iterable +from typing import Tuple, cast import dask.array as da import numpy as np @@ -11,10 +14,7 @@ ] -IntOrInts = Union[SupportsInt, Iterable[SupportsInt]] - - -def multilook(arr: da.Array, nlooks: IntOrInts) -> da.Array: +def multilook(arr: da.Array, nlooks: int | Iterable[int]) -> da.Array: """ Multilook an array by simple averaging. diff --git a/src/tophu/multiscale.py b/src/tophu/multiscale.py index 38627c7..a876c8c 100644 --- a/src/tophu/multiscale.py +++ b/src/tophu/multiscale.py @@ -1,5 +1,6 @@ +from __future__ import annotations + import warnings -from typing import Optional, Tuple import dask.array as da import numpy as np @@ -20,7 +21,7 @@ def lowpass_filter_and_multilook( arr: da.Array, - downsample_factor: Tuple[int, int], + downsample_factor: tuple[int, int], *, shape_factor: float = 1.5, overhang: float = 0.5, @@ -204,14 +205,14 @@ def coarse_unwrap( coherence: da.Array, nlooks: float, unwrap: UnwrapCallback, - downsample_factor: Tuple[int, int], + downsample_factor: tuple[int, int], *, do_lowpass_filter: bool = True, shape_factor: float = 1.5, overhang: float = 0.5, ripple: float = 0.01, attenuation: float = 40.0, -) -> Tuple[da.Array, da.Array]: +) -> tuple[da.Array, da.Array]: """ Estimate coarse unwrapped phase by unwrapping a downsampled interferogram. @@ -429,14 +430,14 @@ def _multiscale_unwrap( coherence: da.Array, nlooks: float, unwrap: UnwrapCallback, - downsample_factor: Tuple[int, int], + downsample_factor: tuple[int, int], *, do_lowpass_filter: bool = True, shape_factor: float = 1.5, overhang: float = 0.5, ripple: float = 0.01, attenuation: float = 40.0, -) -> Tuple[da.Array, da.Array]: +) -> tuple[da.Array, da.Array]: """ Perform 2-D phase unwrapping using a multi-resolution approach. @@ -557,10 +558,10 @@ def _multiscale_unwrap( def get_tile_dims( - shape: Tuple[int, ...], - ntiles: Tuple[int, ...], - snap_to: Optional[Tuple[int, ...]] = None, -) -> Tuple[int, ...]: + shape: tuple[int, ...], + ntiles: tuple[int, ...], + snap_to: tuple[int, ...] | None = None, +) -> tuple[int, ...]: """ Get tile dimensions of an array partitioned into tiles. @@ -622,8 +623,8 @@ def multiscale_unwrap( coherence: DatasetReader, nlooks: float, unwrap: UnwrapCallback, - downsample_factor: Tuple[int, int], - ntiles: Tuple[int, int], + downsample_factor: tuple[int, int], + ntiles: tuple[int, int], *, do_lowpass_filter: bool = True, shape_factor: float = 1.5, diff --git a/src/tophu/tile.py b/src/tophu/tile.py index 4de4dee..881a5d4 100644 --- a/src/tophu/tile.py +++ b/src/tophu/tile.py @@ -1,5 +1,7 @@ +from __future__ import annotations + import itertools -from typing import Iterable, Iterator, Optional, SupportsInt, Tuple, Union +from collections.abc import Iterable, Iterator import numpy as np @@ -10,10 +12,6 @@ ] -IntOrInts = Union[SupportsInt, Iterable[SupportsInt]] -NDSlice = Tuple[slice, ...] - - class TiledPartition: """ A partitioning of an N-dimensional array into tiles. @@ -31,10 +29,10 @@ class TiledPartition: def __init__( self, - shape: IntOrInts, - ntiles: IntOrInts, - overlap: Optional[IntOrInts] = None, - snap_to: Optional[IntOrInts] = None, + shape: int | Iterable[int], + ntiles: int | Iterable[int], + overlap: int | Iterable[int] | None = None, + snap_to: int | Iterable[int] | None = None, ): """ Construct a new `TiledPartition` object. @@ -111,12 +109,12 @@ def __init__( self._tiledims = tiledims @property - def ntiles(self) -> Tuple[int, ...]: + def ntiles(self) -> tuple[int, ...]: """tuple of int : Number of tiles along each array axis.""" return self._ntiles @property - def tiledims(self) -> Tuple[int, ...]: + def tiledims(self) -> tuple[int, ...]: """ tuple of int : Shape of a typical tile. The last tile along each axis may be smaller. @@ -124,7 +122,7 @@ def tiledims(self) -> Tuple[int, ...]: return tuple(self._tiledims) @property - def strides(self) -> Tuple[int, ...]: + def strides(self) -> tuple[int, ...]: """ tuple of int : Step size between the start of adjacent tiles along each axis. @@ -132,11 +130,11 @@ def strides(self) -> Tuple[int, ...]: return tuple(self._strides) @property - def overlap(self) -> Tuple[int, ...]: + def overlap(self) -> tuple[int, ...]: """tuple of int : Overlap between adjacent tiles along each axis.""" return tuple(self._tiledims - self._strides) - def __getitem__(self, index: IntOrInts) -> NDSlice: + def __getitem__(self, index: int | Iterable[int]) -> tuple[slice, ...]: """ Access a tile. @@ -171,7 +169,7 @@ def wrap_index(i: int, n: int) -> int: return tuple([slice(a, b) for (a, b) in zip(start, stop)]) - def __iter__(self) -> Iterator[NDSlice]: + def __iter__(self) -> Iterator[tuple[slice, ...]]: """ Iterate over tiles in arbitrary order. diff --git a/src/tophu/unwrap.py b/src/tophu/unwrap.py index 4b39b23..37cf5bf 100644 --- a/src/tophu/unwrap.py +++ b/src/tophu/unwrap.py @@ -1,9 +1,11 @@ +from __future__ import annotations + import dataclasses import warnings from os import PathLike from pathlib import Path from tempfile import TemporaryDirectory -from typing import Literal, Optional, Protocol, Tuple, runtime_checkable +from typing import Literal, Protocol, runtime_checkable import isce3 import numpy as np @@ -34,7 +36,7 @@ def __call__( igram: NDArray[np.complexfloating], corrcoef: NDArray[np.floating], nlooks: float, - ) -> Tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: + ) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: """ Perform two-dimensional phase unwrapping. @@ -69,7 +71,7 @@ class SnaphuUnwrap(UnwrapCallback): cost: str """str : Statistical cost mode.""" - cost_params: Optional[isce3.unwrap.snaphu.CostParams] + cost_params: isce3.unwrap.snaphu.CostParams | None """ isce3.unwrap.snaphu.CostParams or None : Configuration parameters for the specified cost mode. @@ -81,7 +83,7 @@ class SnaphuUnwrap(UnwrapCallback): def __init__( self, cost: Literal["topo", "defo", "smooth", "p-norm"] = "smooth", - cost_params: Optional[isce3.unwrap.snaphu.CostParams] = None, + cost_params: isce3.unwrap.snaphu.CostParams | None = None, init_method: Literal["mst", "mcf"] = "mcf", ): """ @@ -110,7 +112,7 @@ def __call__( igram: NDArray[np.complexfloating], corrcoef: NDArray[np.floating], nlooks: float, - ) -> Tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: + ) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: # Convert input arrays to GDAL rasters with the expected datatypes. igram_data = np.asanyarray(igram, dtype=np.complex64) igram = isce3.io.gdal.Raster(igram_data) @@ -394,7 +396,7 @@ def __call__( igram: NDArray[np.complexfloating], corrcoef: NDArray[np.floating], nlooks: float, - ) -> Tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: + ) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: # Configure ICU to unwrap the interferogram as a single tile (no bootstrapping). icu = isce3.unwrap.ICU( buffer_lines=len(igram), @@ -488,7 +490,7 @@ def __call__( igram: NDArray[np.complexfloating], corrcoef: NDArray[np.floating], nlooks: float, - ) -> Tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: + ) -> tuple[NDArray[np.floating], NDArray[np.unsignedinteger]]: # Configure ICU to unwrap the interferogram as a single tile (no bootstrapping). phass = isce3.unwrap.Phass( correlation_threshold=self.coherence_thresh, diff --git a/src/tophu/upsample.py b/src/tophu/upsample.py index 0547825..8e68e6e 100644 --- a/src/tophu/upsample.py +++ b/src/tophu/upsample.py @@ -1,5 +1,7 @@ +from __future__ import annotations + import itertools -from typing import Iterable, SupportsInt, Tuple, Union +from collections.abc import Iterable import dask.array as da import numpy as np @@ -14,10 +16,7 @@ ] -IntOrInts = Union[SupportsInt, Iterable[SupportsInt]] - - -def as_tuple_of_int(ints: IntOrInts) -> Tuple[int, ...]: +def as_tuple_of_int(ints: int | Iterable[int]) -> tuple[int, ...]: """ Convert the input to a tuple of ints. @@ -38,8 +37,8 @@ def as_tuple_of_int(ints: IntOrInts) -> Tuple[int, ...]: def validate_upsample_output_shape( - in_shape: Tuple[int, ...], - out_shape: Tuple[int, ...], + in_shape: tuple[int, ...], + out_shape: tuple[int, ...], ) -> None: """ Check that `out_shape` is a valid upsampled output shape. @@ -70,7 +69,7 @@ def validate_upsample_output_shape( raise ValueError("output shape must be >= input data shape") -def upsample_fft(data: ArrayLike, out_shape: IntOrInts) -> NDArray: +def upsample_fft(data: ArrayLike, out_shape: int | Iterable[int]) -> NDArray: """ Upsample using a Fast Fourier Transform (FFT)-based interpolation method. @@ -183,7 +182,7 @@ def negfreqbins(n: int) -> slice: def pad_to_shape( arr: da.Array, - out_shape: Tuple[int, ...], + out_shape: tuple[int, ...], mode: str = "constant", ) -> da.Array: """ @@ -223,7 +222,7 @@ def pad_to_shape( return da.pad(arr, list(zip(zeros, padding)), mode=mode) -def upsample_nearest(data: da.Array, out_shape: IntOrInts) -> da.Array: +def upsample_nearest(data: da.Array, out_shape: int | Iterable[int]) -> da.Array: """ Upsample an array using nearest neighbor interpolation. diff --git a/src/tophu/util.py b/src/tophu/util.py index 2f84ba8..ed4582e 100644 --- a/src/tophu/util.py +++ b/src/tophu/util.py @@ -1,4 +1,6 @@ -from typing import Iterable, SupportsInt, Tuple, Union +from __future__ import annotations + +from collections.abc import Iterable import dask.array as da import numpy as np @@ -15,10 +17,7 @@ ] -IntOrInts = Union[SupportsInt, Iterable[SupportsInt]] - - -def as_tuple_of_int(ints: IntOrInts) -> Tuple[int, ...]: +def as_tuple_of_int(ints: int | Iterable[int]) -> tuple[int, ...]: """ Convert the input to a tuple of ints. @@ -85,7 +84,7 @@ def iseven(n: int) -> bool: return n % 2 == 0 -def map_blocks(func, *args, **kwargs) -> Union[da.Array, Tuple[da.Array, ...]]: +def map_blocks(func, *args, **kwargs) -> da.Array | tuple[da.Array, ...]: """ Map a function across all blocks of a dask array. diff --git a/test/simulate.py b/test/simulate.py index 3033954..ce5bc64 100644 --- a/test/simulate.py +++ b/test/simulate.py @@ -1,4 +1,4 @@ -from typing import Optional +from __future__ import annotations import numpy as np from numpy.typing import ArrayLike, NDArray @@ -13,7 +13,7 @@ def simulate_phase_noise( corrcoef: ArrayLike, nlooks: float, *, - seed: Optional[int] = None, + seed: int | None = None, ) -> NDArray: r"""Simulate multilooked interferogram phase noise samples. @@ -60,7 +60,7 @@ def simulate_terrain( *, scale: float = 1000.0, smoothness: float = 0.8, - seed: Optional[int] = None, + seed: int | None = None, ) -> NDArray: r"""Simulate topography using the Diamond-Square algorithm. diff --git a/test/test_io.py b/test/test_io.py index 0c0fb77..4d74f5c 100644 --- a/test/test_io.py +++ b/test/test_io.py @@ -1,8 +1,9 @@ +from __future__ import annotations + import os import tempfile -from collections.abc import Callable +from collections.abc import Callable, Iterator from pathlib import Path -from typing import Iterator, Tuple, Union import h5py import numpy as np @@ -13,12 +14,12 @@ import tophu -def filesize(filepath: Union[str, os.PathLike]) -> int: +def filesize(filepath: str | os.PathLike) -> int: """Get file size in bytes.""" return Path(filepath).stat().st_size -def valid_gdal_dtypes() -> Tuple[np.dtype, ...]: +def valid_gdal_dtypes() -> tuple[np.dtype, ...]: return ( np.uint8, np.int8, @@ -36,9 +37,9 @@ def valid_gdal_dtypes() -> Tuple[np.dtype, ...]: def create_raster_dataset( - filepath: Union[str, os.PathLike], + filepath: str | os.PathLike, driver: str, - shape: Tuple[int, int], + shape: tuple[int, int], count: int, dtype: DTypeLike, ) -> None: diff --git a/test/test_multiscale.py b/test/test_multiscale.py index 9a9b798..1c2d3cc 100644 --- a/test/test_multiscale.py +++ b/test/test_multiscale.py @@ -1,4 +1,4 @@ -from typing import List, Tuple +from __future__ import annotations import numpy as np import pytest @@ -10,7 +10,7 @@ from .simulate import simulate_phase_noise, simulate_terrain -UNWRAP_FUNCS: List[UnwrapCallback] = [ +UNWRAP_FUNCS: list[UnwrapCallback] = [ # tophu.ICUUnwrap(), tophu.PhassUnwrap(), tophu.SnaphuUnwrap(), @@ -20,7 +20,7 @@ def dummy_inputs_and_outputs( length: int = 128, width: int = 128, -) -> Tuple[ +) -> tuple[ NDArray[np.floating], NDArray[np.unsignedinteger], NDArray[np.complexfloating], @@ -157,7 +157,7 @@ def test_multiscale_unwrap_phase( @pytest.mark.parametrize("unwrap", UNWRAP_FUNCS) def test_multiscale_unwrap_phase_conncomps( self, - downsample_factor: Tuple[int, int], + downsample_factor: tuple[int, int], unwrap: UnwrapCallback, ): length, width = 512, 512 @@ -255,7 +255,7 @@ def test_multiscale_unwrap_phase_conncomps( assert jaccard_similarity(valid_mask, expected_mask) > 0.975 @pytest.mark.parametrize("downsample_factor", [(1, 1), (1, 4), (5, 1)]) - def test_multiscale_unwrap_single_look(self, downsample_factor: Tuple[int, int]): + def test_multiscale_unwrap_single_look(self, downsample_factor: tuple[int, int]): length, width = map(lambda d: 256 * d, downsample_factor) # Radar sensor/geometry parameters. diff --git a/test/test_util.py b/test/test_util.py index 112417e..8ca543d 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -1,7 +1,8 @@ +from __future__ import annotations + import multiprocessing import threading from multiprocessing.pool import ThreadPool -from typing import Tuple import dask import dask.array as da @@ -114,8 +115,8 @@ def test_iseven(): def random_integer_array( low: int = 0, high: int = 100, - shape: Tuple[int, ...] = (1024, 1024), - chunks: Tuple[int, ...] = (128, 128), + shape: tuple[int, ...] = (1024, 1024), + chunks: tuple[int, ...] = (128, 128), ) -> da.Array: return da.random.randint(low, high, size=shape, chunks=chunks)