Source code for glotaran.utils.io

"""Glotaran IO utility module."""
from __future__ import annotations

import html
import inspect
import os
from collections.abc import Mapping
from collections.abc import MutableMapping
from collections.abc import Sequence
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any

import xarray as xr

from glotaran.plugin_system.data_io_registration import load_dataset
from glotaran.typing.types import DatasetMappable

if TYPE_CHECKING:
    from typing import Iterator

    import pandas as pd

    from glotaran.project.result import Result
    from glotaran.typing.types import StrOrPath


def _load_datasets(dataset_mappable: DatasetMappable, index: int = 1) -> dict[str, xr.Dataset]:
    """Build a plain dataset dictionary from any supported ``DatasetMappable``.

    This is the shared implementation behind ``load_datasets`` and is also
    called recursively for the elements of sequences and mappings.

    Parameters
    ----------
    dataset_mappable : DatasetMappable
        Instance of ``DatasetMappable`` that can be used to create a dataset mapping.
    index : int
        Number used to build the fallback ``source_path`` (``dataset_<index>.nc``)
        and therefore the key of an in-memory dataset lacking that attribute.
        , by default 1

    Returns
    -------
    dict[str, xr.Dataset]
        Mapping of datasets to initialize :class:`DatasetMapping`.

    Raises
    ------
    TypeError
        If the type of ``dataset_mappable`` is not explicitly supported.
    """
    # A single file path: the key is the file name without its suffix.
    if isinstance(dataset_mappable, (str, Path)):
        loaded = load_dataset(dataset_mappable)
        return {Path(dataset_mappable).stem: loaded}

    # A single in-memory xarray object.
    if isinstance(dataset_mappable, (xr.Dataset, xr.DataArray)):
        if isinstance(dataset_mappable, xr.DataArray):
            dataset = dataset_mappable.to_dataset(name="data")
        else:
            dataset = dataset_mappable
        # Only fill in ``source_path`` when the dataset does not carry one yet.
        dataset.attrs.setdefault("source_path", f"dataset_{index}.nc")
        return {Path(dataset.source_path).stem: dataset}

    # A sequence: every element is loaded on its own with its 1-based position
    # as ``index``; only the first entry of each recursive result is used.
    if isinstance(dataset_mappable, Sequence):
        from_sequence = {}
        for position, element in enumerate(dataset_mappable, start=1):
            name, loaded = next(iter(_load_datasets(element, index=position).items()))
            from_sequence[name] = loaded
        return from_sequence

    # A mapping: the caller's keys are kept, the derived keys are discarded.
    if isinstance(dataset_mappable, Mapping):
        from_mapping = {}
        for name, element in dataset_mappable.items():
            from_mapping[name] = next(iter(_load_datasets(element).values()))
        return from_mapping

    raise TypeError(
        f"Type '{type(dataset_mappable).__name__}' for 'dataset_mappable' of value "
        f"'{dataset_mappable}' is not supported."
        f"\nSupported types are:\n {DatasetMappable}."
    )


class DatasetMapping(MutableMapping):
    """Wrapper class for a mapping of datasets which can be used for a ``file_loadable_field``."""

    def __init__(self, init_map: Mapping[str, xr.Dataset] | None = None) -> None:
        """Initialize an instance of :class:`DatasetMapping`.

        Parameters
        ----------
        init_map : Mapping[str, xr.Dataset] | None
            Mapping to initially populate the instance., by default None
        """
        super().__init__()
        self.__data_dict: dict[str, xr.Dataset] = {}
        if init_map is not None:
            # Go through ``__setitem__`` so every dataset gets a ``source_path``.
            for key, dataset in init_map.items():
                self[key] = dataset

    @classmethod
    def loader(cls: type[DatasetMapping], dataset_mappable: DatasetMappable) -> DatasetMapping:
        """Loader function utilized by ``file_loadable_field``.

        Parameters
        ----------
        dataset_mappable : DatasetMappable
            Mapping of datasets to initialize :class:`DatasetMapping`.

        Returns
        -------
        DatasetMapping
            Populated instance of :class:`DatasetMapping`.
        """
        return cls(_load_datasets(dataset_mappable))

    @property
    def source_path(self) -> dict[str, str]:
        """Map the ``source_path`` attribute of each dataset to a standalone mapping.

        Note
        ----
        When the ``source_path`` attribute of the dataset gets updated
        (e.g. by calling ``save_dataset`` with the default ``update_source_path=True``)
        this value will be updated as well.

        Returns
        -------
        dict[str, str]
            Mapping of the dataset source paths.
        """
        # Built on every access, so it always reflects the current dataset attributes.
        return {key: val.source_path for key, val in self.__data_dict.items()}

    def __getitem__(self, key: str) -> xr.Dataset:
        """Implement retrieving an element by its key."""
        return self.__data_dict[key]

    def __setitem__(self, key: str, value: xr.Dataset) -> None:
        """Implement setting an elements value.

        If ``value`` has no ``source_path`` attribute yet, it is set to ``<key>.nc``
        (this modifies the attributes of the passed dataset).
        """
        if "source_path" not in value.attrs:
            value.attrs["source_path"] = f"{key}.nc"
        self.__data_dict[key] = value

    def __iter__(self) -> Iterator[str]:
        """Implement looping over an instance."""
        yield from self.__data_dict

    def __delitem__(self, key: str) -> None:
        """Implement deleting an item."""
        del self.__data_dict[key]

    def __len__(self) -> int:
        """Implement calling ``len`` on an instance."""
        return len(self.__data_dict)

    def __repr__(self) -> str:
        """Implement calling ``repr`` on an instance."""
        items = [f"{dataset_name!r}: <xarray.Dataset>" for dataset_name in self]
        return f"{{{', '.join(items)}}}"

    def _repr_html_(self) -> str:
        """Return a html representation str.

        Special method used by ``ipython`` to render html.

        Returns
        -------
        str
            DatasetMapping as html string.
        """
        # The dataset name is free text, so it is escaped before being placed in
        # the markup; the dataset's own html representation is inserted as is.
        items = [
            f"<details><summary>{html.escape(str(dataset_name))}</summary>"
            f"{dataset._repr_html_()}</details>\n"
            for dataset_name, dataset in self.items()
        ]
        return f"<pre>{html.escape(repr(self))}</pre>\n{''.join(items)}"
def load_datasets(dataset_mappable: DatasetMappable) -> DatasetMapping:
    """Load multiple datasets into a mapping (convenience function).

    This is used for ``file_loadable_field`` of a dataset mapping e.g.
    in :class:`Scheme`

    Parameters
    ----------
    dataset_mappable : DatasetMappable
        Single dataset/file path to a dataset or sequence or mapping of it.

    Returns
    -------
    DatasetMapping
        Mapping of dataset with string keys, where datasets are ensured to have
        the ``source_path`` attr.
    """
    # Pure delegation: all the work happens in the class level loader.
    dataset_mapping = DatasetMapping.loader(dataset_mappable)
    return dataset_mapping
def relative_posix_path(source_path: StrOrPath, base_path: StrOrPath | None = None) -> str:
    """Ensure that ``source_path`` is a posix path, relative to ``base_path`` if defined.

    On Windows if ``source_path`` and ``base_path`` are on different drives,
    it will return the absolute posix path to the file.

    Parameters
    ----------
    source_path : StrOrPath
        Path which should be converted to a relative posix path.
    base_path : StrOrPath, optional
        Base path the resulting path string should be relative to., by default None

    Returns
    -------
    str
        ``source_path`` as posix path relative to ``base_path`` if defined.
    """
    posix_source = Path(source_path).as_posix()

    # Without a base path, or for an already relative source, nothing is rewritten.
    if base_path is None or not os.path.isabs(posix_source):
        return Path(posix_source).as_posix()

    try:
        relative_source = os.path.relpath(posix_source, Path(base_path).as_posix())
    except ValueError:
        # No relative path could be computed (see the Windows note above), so
        # the unchanged posix path is returned instead.
        return Path(posix_source).as_posix()
    return Path(relative_source).as_posix()
def safe_dataframe_fillna(df: pd.DataFrame, column_name: str, fill_value: Any) -> None:
    """Fill NaN values with ``fill_value`` if the column exists or do nothing.

    The DataFrame is modified in place; nothing is returned.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame from which specific column values will be replaced
    column_name : str
        Name of column of ``df`` to fill NaNs
    fill_value : Any
        Value to fill NaNs with
    """
    if column_name in df.columns:
        # Assign the filled column back instead of calling ``fillna(inplace=True)``
        # on ``df[column_name]``: the chained inplace call works on a temporary
        # object and does not update ``df`` when pandas Copy-on-Write is active.
        df[column_name] = df[column_name].fillna(fill_value)
def safe_dataframe_replace(
    df: pd.DataFrame, column_name: str, to_be_replaced_values: Any, replace_value: Any
) -> None:
    """Replace column values with ``replace_value`` if the column exists or do nothing.

    If ``to_be_replaced_values`` is not list or tuple format,
    convert into list with same ``to_be_replaced_values`` as element.
    The DataFrame is modified in place; nothing is returned.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame from which specific column values will be replaced
    column_name : str
        Name of column of ``df`` to replace values for
    to_be_replaced_values : Any
        Values to be replaced
    replace_value : Any
        Value to replace ``to_be_replaced_values`` with
    """
    if not isinstance(to_be_replaced_values, (list, tuple)):
        to_be_replaced_values = [to_be_replaced_values]
    if column_name in df.columns:
        # Assign the result back instead of calling ``replace(inplace=True)`` on
        # ``df[column_name]``: the chained inplace call works on a temporary
        # object and does not update ``df`` when pandas Copy-on-Write is active.
        df[column_name] = df[column_name].replace(to_be_replaced_values, replace_value)
def get_script_dir(*, nesting: int = 0) -> Path:
    """Get the parent folder a script is executed in.

    This is a helper function for cross compatibility with jupyter notebooks.
    In notebooks the global ``__file__`` variable isn't set, thus we need different
    means to get the folder a script is defined in, which doesn't change with the
    current working directory the ``python interpreter`` was called from.

    Parameters
    ----------
    nesting : int
        Number to go up in the call stack to get to the initially calling function.
        This is only needed for library code and not for user code.
        , by default 0 (direct call)

    Returns
    -------
    Path
        Path to the folder the script resides in.
    """
    # Entry 0 of the stack is this function itself, so the direct caller is entry 1.
    caller_info = inspect.stack()[nesting + 1]
    caller_globals = caller_info.frame.f_globals
    # "." is the stand-in used when the calling module defines no ``__file__``.
    script_file = caller_globals.get("__file__", ".")
    resolved = Path(script_file).resolve()
    if script_file == ".":
        # The stand-in already denotes a folder, so it is returned as resolved.
        return resolved
    return resolved.parent
def make_path_absolute_if_relative(path: Path) -> Path:
    """Get a path as absolute if relative.

    Parameters
    ----------
    path : Path
        The path to make absolute.

    Returns
    -------
    Path
        Either the original path or the path as absolute relative to the script directory.
    """
    if path.is_absolute():
        return path
    # ``nesting=2`` makes ``get_script_dir`` look two frames above this function
    # in the call stack when determining the script directory.
    return get_script_dir(nesting=2) / path
def create_clp_guide_dataset(
    result: Result | xr.Dataset, clp_label: str, dataset_name: str | None = None
) -> xr.Dataset:
    """Create dataset for clp guidance.

    Parameters
    ----------
    result: Result | xr.Dataset
        Optimization result object or dataset, created with pyglotaran>=0.6.0.
    clp_label : str
        Label of the clp to guide.
    dataset_name : str | None
        Name of dataset to extract the guide from. Defaults to None.

    Returns
    -------
    xr.Dataset
        Dataset with a single ``data`` variable containing the clp guide, with
        ``clp_label`` dimension replaced by the model dimensions first value.

    Raises
    ------
    ValueError
        If result is an instance of ``Result`` and ``dataset_name`` is ``None`` or not in result.
    ValueError
        If ``clp_label`` is not in result.
    ValueError
        The result dataset was created with pyglotaran<0.6.0.

    Examples
    --------
    Extracting the clp guide from an optimization result object.

    .. code-block:: python

        from glotaran.io import save_dataset
        from glotaran.utils.io import create_clp_guide_dataset

        clp_guide = create_clp_guide_dataset(result, "species_1", "dataset_1")
        save_dataset(clp_guide, "clp_guide__result_dataset_1__species_1.nc")

    Extracting the clp guide from a result dataset loaded from file.

    .. code-block:: python

        from glotaran.io import load_dataset
        from glotaran.io import save_dataset
        from glotaran.utils.io import create_clp_guide_dataset

        result_dataset = load_dataset("result_dataset_1.nc")
        clp_guide = create_clp_guide_dataset(result_dataset, "species_1")
        save_dataset(clp_guide, "clp_guide__result_dataset_1__species_1.nc")
    """
    # A dataset is used directly; anything else is treated as a result object
    # from which the dataset is looked up by name.
    if isinstance(result, xr.Dataset):
        dataset = result
    elif dataset_name is None or dataset_name not in result.data:
        raise ValueError(
            f"Unknown dataset {dataset_name!r}. "
            f"Known datasets are:\n {list(result.data.keys())}"
        )
    else:
        dataset = result.data[dataset_name]

    if clp_label not in dataset.clp_label:
        raise ValueError(
            f"Unknown clp_label {clp_label!r}. "
            f"Known clp_labels are:\n {list(dataset.clp_label.values)}"
        )
    if "model_dimension" not in dataset.attrs:
        raise ValueError(
            "Result dataset is missing attribute 'model_dimension', "
            "which means that it was created with pyglotaran<0.6.0. "
            "Please recreate the result with the latest version of pyglotaran."
        )

    # Selecting with a list keeps ``clp_label`` as a dimension of the selection.
    clp_values = dataset.clp.sel(clp_label=[clp_label])
    # First dimension of the selection that is not the model dimension.
    value_dimension = next(filter(lambda x: x != dataset.model_dimension, clp_values.dims))
    return xr.DataArray(
        clp_values.values.T,
        coords={
            # Only the first coordinate value of the model dimension is kept.
            dataset.model_dimension: [dataset.coords[dataset.model_dimension][0].item()],
            value_dimension: clp_values.coords[value_dimension].values,
        },
    ).to_dataset(name="data")