"""Glotaran IO utility module."""
from __future__ import annotations
import html
import inspect
import os
from collections.abc import Mapping
from collections.abc import MutableMapping
from collections.abc import Sequence
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
import xarray as xr
from glotaran.plugin_system.data_io_registration import load_dataset
from glotaran.typing.types import DatasetMappable
if TYPE_CHECKING:
from typing import Iterator
import pandas as pd
from glotaran.project.result import Result
from glotaran.typing.types import StrOrPath
def _load_datasets(dataset_mappable: DatasetMappable, index: int = 1) -> dict[str, xr.Dataset]:
    """Build a plain ``dict`` of datasets from any supported ``DatasetMappable`` value.

    This is the implementation behind ``load_datasets`` and is also used internally.

    Depending on the type of ``dataset_mappable`` the result is created as follows:

    * ``str`` / ``Path``: The file is loaded with ``load_dataset`` and stored under
      the stem of the path.
    * ``xr.Dataset`` / ``xr.DataArray``: A data array is converted to a dataset with
      the data variable ``data``. If the ``source_path`` attribute is missing it is
      set **on the passed dataset** to ``dataset_{index}.nc``. The key is the stem
      of ``source_path``.
    * ``Sequence``: Each element is resolved recursively, using its 1-based position
      as ``index``. Only the first entry of each recursive result is used.
    * ``Mapping``: The keys are kept and each value is resolved recursively.
      Only the first dataset of each recursive result is used.

    Parameters
    ----------
    dataset_mappable : DatasetMappable
        Instance of ``DatasetMappable`` that can be used to create a dataset mapping.
    index : int
        Index used to create key and ``source_path`` if not present, by default 1

    Returns
    -------
    dict[str, xr.Dataset]
        Mapping of datasets to initialize :class:`DatasetMapping`.

    Raises
    ------
    TypeError
        If the type of ``dataset_mappable`` is not explicitly supported.
    """
    # Single file path: load it and use the file stem as key.
    if isinstance(dataset_mappable, (str, Path)):
        return {Path(dataset_mappable).stem: load_dataset(dataset_mappable)}

    # Single in-memory dataset: make sure it is a Dataset carrying a ``source_path``.
    if isinstance(dataset_mappable, (xr.Dataset, xr.DataArray)):
        dataset = dataset_mappable
        if isinstance(dataset, xr.DataArray):
            dataset = dataset.to_dataset(name="data")
        if "source_path" not in dataset.attrs:
            dataset.attrs["source_path"] = f"dataset_{index}.nc"
        return {Path(dataset.source_path).stem: dataset}

    loaded: dict[str, xr.Dataset] = {}

    # Sequence of dataset-likes: keys are derived from the elements themselves.
    if isinstance(dataset_mappable, Sequence):
        for position, item in enumerate(dataset_mappable, start=1):
            item_key, item_dataset = next(iter(_load_datasets(item, index=position).items()))
            loaded[item_key] = item_dataset
        return loaded

    # Mapping of dataset-likes: the user provided keys win over derived ones.
    if isinstance(dataset_mappable, Mapping):
        for user_key, item in dataset_mappable.items():
            loaded[user_key] = next(iter(_load_datasets(item).values()))
        return loaded

    raise TypeError(
        f"Type '{type(dataset_mappable).__name__}' for 'dataset_mappable' of value "
        f"'{dataset_mappable}' is not supported."
        f"\nSupported types are:\n {DatasetMappable}."
    )
class DatasetMapping(MutableMapping):
    """Wrapper class for a mapping of datasets which can be used for a ``file_loadable_field``.

    Every dataset stored in an instance is ensured to have the ``source_path``
    attribute, see :meth:`__setitem__`.
    """

    def __init__(self, init_map: Mapping[str, xr.Dataset] | None = None) -> None:
        """Initialize an instance of :class:`DatasetMapping`.

        Parameters
        ----------
        init_map : Mapping[str, xr.Dataset] | None
            Mapping to initially populate the instance, by default None
        """
        super().__init__()
        self.__data_dict: dict[str, xr.Dataset] = {}
        if init_map is not None:
            # Go through ``__setitem__`` so each dataset gets its ``source_path`` attribute.
            for key, dataset in init_map.items():
                self[key] = dataset

    @classmethod
    def loader(cls: type[DatasetMapping], dataset_mappable: DatasetMappable) -> DatasetMapping:
        """Loader function utilized by ``file_loadable_field``.

        Parameters
        ----------
        dataset_mappable : DatasetMappable
            Mapping of datasets to initialize :class:`DatasetMapping`.

        Returns
        -------
        DatasetMapping
            Populated instance of :class:`DatasetMapping`.
        """
        return cls(_load_datasets(dataset_mappable))

    @property
    def source_path(self) -> dict[str, str]:
        """Map the ``source_path`` attribute of each dataset to a standalone mapping.

        Note
        ----
        The mapping is created anew on each access from the datasets current
        ``source_path``. Thus when the ``source_path`` attribute of a dataset gets updated
        (e.g. by calling ``save_dataset`` with the default ``update_source_path=True``)
        this value will be updated as well.

        Returns
        -------
        dict[str, str]
            Mapping of the dataset source paths.
        """
        return {key: val.source_path for key, val in self.__data_dict.items()}

    def __getitem__(self, key: str) -> xr.Dataset:
        """Implement retrieving an element by its key."""
        return self.__data_dict[key]

    def __setitem__(self, key: str, value: xr.Dataset) -> None:
        """Implement setting an elements value.

        If ``value`` has no ``source_path`` attribute, it is set to ``{key}.nc``
        on the passed dataset itself.
        """
        if "source_path" not in value.attrs:
            value.attrs["source_path"] = f"{key}.nc"
        self.__data_dict[key] = value

    def __iter__(self) -> Iterator[str]:
        """Implement looping over an instance."""
        return iter(self.__data_dict)

    def __delitem__(self, key: str) -> None:
        """Implement deleting an item."""
        del self.__data_dict[key]

    def __len__(self) -> int:
        """Implement calling ``len`` on an instance."""
        return len(self.__data_dict)

    def __repr__(self) -> str:
        """Implement calling ``repr`` on an instance."""
        items = [f"{dataset_name!r}: <xarray.Dataset>" for dataset_name in self]
        return f"{{{', '.join(items)}}}"

    def _repr_html_(self) -> str:
        """Return a html representation str.

        Special method used by ``ipython`` to render html.

        Returns
        -------
        str
            DatasetMapping as html string.
        """
        # The dataset name is user provided text, so it needs to be escaped just like
        # the ``repr`` below. The html of the dataset itself is inserted as is.
        items = [
            f"<details><summary>{html.escape(str(dataset_name))}</summary>"
            f"{dataset._repr_html_()}</details>\n"
            for dataset_name, dataset in self.items()
        ]
        return f"<pre>{html.escape(repr(self))}</pre>\n{''.join(items)}"
def load_datasets(dataset_mappable: DatasetMappable) -> DatasetMapping:
    """Load multiple datasets into a mapping (convenience function).

    This is used for ``file_loadable_field`` of a dataset mapping e.g.
    in :class:`Scheme`. It simply delegates to :meth:`DatasetMapping.loader`.

    Parameters
    ----------
    dataset_mappable : DatasetMappable
        Single dataset/file path to a dataset or sequence or mapping of it.

    Returns
    -------
    DatasetMapping
        Mapping of datasets with string keys, where datasets are ensured to have
        the ``source_path`` attr.
    """
    dataset_mapping = DatasetMapping.loader(dataset_mappable)
    return dataset_mapping
def relative_posix_path(source_path: StrOrPath, base_path: StrOrPath | None = None) -> str:
    """Ensure that ``source_path`` is a posix path, relative to ``base_path`` if defined.

    The path is only made relative if ``base_path`` is given and ``source_path``
    is an absolute path, relative paths are returned as posix path unchanged.

    On Windows if ``source_path`` and ``base_path`` are on different drives, it will return
    the absolute posix path to the file.

    Parameters
    ----------
    source_path : StrOrPath
        Path which should be converted to a relative posix path.
    base_path : StrOrPath, optional
        Base path the resulting path string should be relative to, by default None

    Returns
    -------
    str
        ``source_path`` as posix path relative to ``base_path`` if defined.
    """
    posix_source = Path(source_path).as_posix()

    # Nothing to relate to, or the path already is relative.
    if base_path is None or not os.path.isabs(posix_source):
        return Path(posix_source).as_posix()

    try:
        relative_source = os.path.relpath(posix_source, Path(base_path).as_posix())
    except ValueError:
        # Deliberate best effort: ``os.path.relpath`` raises ``ValueError`` on Windows
        # for paths on different drives, in which case the absolute path is kept.
        return Path(posix_source).as_posix()
    return Path(relative_source).as_posix()
def safe_dataframe_fillna(df: pd.DataFrame, column_name: str, fill_value: Any) -> None:
    """Fill NaN values with ``fill_value`` if the column exists or do nothing.

    The DataFrame ``df`` is modified in place.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame from which specific column values will be replaced
    column_name : str
        Name of column of ``df`` to fill NaNs
    fill_value : Any
        Value to fill NaNs with
    """
    if column_name in df.columns:
        # Assign the result back instead of using ``df[column_name].fillna(..., inplace=True)``.
        # The latter is a chained assignment on a temporary Series, which is deprecated
        # and does not update ``df`` when pandas Copy-on-Write is active.
        df[column_name] = df[column_name].fillna(fill_value)
def safe_dataframe_replace(
    df: pd.DataFrame, column_name: str, to_be_replaced_values: Any, replace_value: Any
) -> None:
    """Replace column values with ``replace_value`` if the column exists or do nothing.

    If ``to_be_replaced_values`` is not list or tuple format,
    convert into list with same ``to_be_replaced_values`` as element.

    The DataFrame ``df`` is modified in place.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame from which specific column values will be replaced
    column_name : str
        Name of column of ``df`` to replace values for
    to_be_replaced_values : Any
        Values to be replaced
    replace_value : Any
        Value to replace ``to_be_replaced_values`` with
    """
    if not isinstance(to_be_replaced_values, (list, tuple)):
        to_be_replaced_values = [to_be_replaced_values]
    if column_name in df.columns:
        # Assign the result back instead of using ``df[column_name].replace(..., inplace=True)``.
        # The latter is a chained assignment on a temporary Series, which is deprecated
        # and does not update ``df`` when pandas Copy-on-Write is active.
        df[column_name] = df[column_name].replace(to_be_replaced_values, replace_value)
def get_script_dir(*, nesting: int = 0) -> Path:
    """Get the parent folder a script is executed in.

    This is a helper function for cross compatibility with jupyter notebooks.
    In notebooks the global ``__file__`` variable isn't set, thus we need different
    means to get the folder a script is defined in, which doesn't change with the
    current working directory the ``python interpreter`` was called from.

    The folder is derived from the ``__file__`` global of the calling frame. If that
    global isn't set, the resolved current working directory is returned instead.

    Parameters
    ----------
    nesting : int
        Number to go up in the call stack to get to the initially calling function.
        This is only needed for library code and not for user code,
        by default 0 (direct call)

    Returns
    -------
    Path
        Path to the folder the script resides in.
    """
    # ``inspect.stack()[0]`` is this function itself, so the direct caller is at index 1.
    caller_frame = inspect.stack()[nesting + 1].frame
    script_file = caller_frame.f_globals.get("__file__", ".")
    resolved_path = Path(script_file).resolve()
    if script_file == ".":
        # Fallback value: ``"."`` already is a folder, so there is no file name to strip.
        return resolved_path
    return resolved_path.parent
def make_path_absolute_if_relative(path: Path) -> Path:
    """Get a path as absolute if relative.

    Absolute paths are returned unchanged. Relative paths are appended to the folder
    returned by ``get_script_dir(nesting=2)``.

    Parameters
    ----------
    path : Path
        The path to make absolute.

    Returns
    -------
    Path
        Either the original path or the path as absolute relative to the script directory.
    """
    if path.is_absolute():
        return path
    # ``nesting=2`` skips this helper and the library function calling it, so that the
    # folder is presumably the one of the user script (depends on the call depth).
    return get_script_dir(nesting=2) / path
def create_clp_guide_dataset(
    result: Result | xr.Dataset, clp_label: str, dataset_name: str | None = None
) -> xr.Dataset:
    """Create dataset for clp guidance.

    Parameters
    ----------
    result : Result | xr.Dataset
        Optimization result object or dataset, created with pyglotaran>=0.6.0.
    clp_label : str
        Label of the clp to guide.
    dataset_name : str | None
        Name of dataset to extract the guide from. This is only used if ``result`` is
        not a dataset itself. Defaults to None.

    Returns
    -------
    xr.Dataset
        Dataset containing the clp guide as data variable ``data``, with the
        ``clp_label`` dimension replaced by the model dimension, which only
        contains its first coordinate value.

    Raises
    ------
    ValueError
        If result is an instance of ``Result`` and ``dataset_name`` is ``None`` or not in result.
    ValueError
        If ``clp_label`` is not in result.
    ValueError
        The result dataset was created with pyglotaran<0.6.0.

    Examples
    --------
    Extracting the clp guide from an optimization result object.

    .. code-block:: python

        from glotaran.io import save_dataset
        from glotaran.utils.io import create_clp_guide_dataset

        clp_guide = create_clp_guide_dataset(result, "species_1", "dataset_1")
        save_dataset(clp_guide, "clp_guide__result_dataset_1__species_1.nc")

    Extracting the clp guide from a result dataset loaded from file.

    .. code-block:: python

        from glotaran.io import load_dataset
        from glotaran.io import save_dataset
        from glotaran.utils.io import create_clp_guide_dataset

        result_dataset = load_dataset("result_dataset_1.nc")
        clp_guide = create_clp_guide_dataset(result_dataset, "species_1")
        save_dataset(clp_guide, "clp_guide__result_dataset_1__species_1.nc")
    """
    if isinstance(result, xr.Dataset):
        dataset = result
    elif dataset_name is None or dataset_name not in result.data:
        raise ValueError(
            f"Unknown dataset {dataset_name!r}. "
            f"Known datasets are:\n {list(result.data.keys())}"
        )
    else:
        dataset = result.data[dataset_name]

    if clp_label not in dataset.clp_label:
        raise ValueError(
            f"Unknown clp_label {clp_label!r}. "
            f"Known clp_labels are:\n {list(dataset.clp_label.values)}"
        )
    if "model_dimension" not in dataset.attrs:
        raise ValueError(
            "Result dataset is missing attribute 'model_dimension', "
            "which means that it was created with pyglotaran<0.6.0. "
            "Please recreate the result with the latest version of pyglotaran."
        )

    model_dimension = dataset.model_dimension
    # Select with a list to keep ``clp_label`` as a (length one) dimension.
    clp_values = dataset.clp.sel(clp_label=[clp_label])
    # The first dimension which isn't the model dimension holds the guide values.
    value_dimension = next(dim for dim in clp_values.dims if dim != model_dimension)

    # NOTE(review): the transpose assumes ``clp_values`` is 2D with ``value_dimension``
    # first, so that the data matches the coordinate order below -- confirm.
    return xr.DataArray(
        clp_values.values.T,
        coords={
            model_dimension: [dataset.coords[model_dimension][0].item()],
            value_dimension: clp_values.coords[value_dimension].values,
        },
    ).to_dataset(name="data")