Source code for glotaran.analysis.problem

import collections
import itertools
from typing import Any
from typing import Callable
from typing import Deque
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Tuple
from typing import Union

import numpy as np
import xarray as xr

from glotaran.analysis.nnls import residual_nnls
from glotaran.analysis.variable_projection import residual_variable_projection
from glotaran.model import DatasetDescriptor
from glotaran.model import Model
from glotaran.parameter import ParameterGroup

from .scheme import Scheme


class ParameterError(ValueError):
    def __init__(self):
        super().__init__("Parameter not initialized")


class ProblemDescriptor(NamedTuple):
    dataset: DatasetDescriptor
    data: xr.DataArray
    model_axis: np.ndarray
    global_axis: np.ndarray
    weight: xr.DataArray


class GroupedProblemDescriptor(NamedTuple):
    label: str
    index: Any
    axis: np.ndarray


class GroupedProblem(NamedTuple):
    data: np.ndarray
    weight: np.ndarray

    has_scaling: bool
    """Indicates if at least one dataset in the group needs scaling."""

    group: str
    """The concatenated labels of the involved datasets."""

    data_sizes: List[int]
    """Holds the sizes of the concatenated datasets."""

    # annotated as a list to match how it is constructed in _init_grouped_bag
    descriptor: List[GroupedProblemDescriptor]


UngroupedBag = Dict[str, ProblemDescriptor]
GroupedBag = Deque[GroupedProblem]


class LabelAndMatrix(NamedTuple):
    clp_label: List[str]
    matrix: np.ndarray


class Problem:
    """A Problem class."""

    def __init__(self, scheme: Scheme):
        """Initializes the Problem class from a scheme (:class:`glotaran.analysis.scheme.Scheme`)

        Args:
            scheme (Scheme): An instance of :class:`glotaran.analysis.scheme.Scheme`
                which defines your model, parameters, and data
        """

        self._scheme = scheme

        self._model = scheme.model
        self._global_dimension = scheme.model.global_dimension
        self._model_dimension = scheme.model.model_dimension

        self._data = scheme.data

        self._index_dependent = scheme.model.index_dependent()
        self._grouped = scheme.model.grouped()
        self._bag = None
        self._groups = None

        self._residual_function = (
            residual_nnls if scheme.non_negative_least_squares else residual_variable_projection
        )
        self._parameters = None
        self._filled_dataset_descriptors = None

        self.parameters = scheme.parameters.copy()
        self._parameter_history = []

        # all of the above are always not None

        self._clp_labels = None
        self._matrices = None
        self._reduced_clp_labels = None
        self._reduced_matrices = None
        self._reduced_clps = None
        self._clps = None
        self._weighted_residuals = None
        self._residuals = None
        self._additional_penalty = None
        self._full_axis = None
        self._full_penalty = None

    @property
    def scheme(self) -> Scheme:
        """Property providing access to the used scheme

        Returns:
            Scheme: An instance of :class:`glotaran.analysis.scheme.Scheme`
                Provides access to data, model, parameters and optimization arguments.
        """
        return self._scheme

    @property
    def model(self) -> Model:
        """Property providing access to the used model

        The model is a subclass of :class:`glotaran.model.Model` decorated with the `@model`
        decorator :class:`glotaran.model.model_decorator.model`
        For an example implementation see e.g. :class:`glotaran.builtin.models.kinetic_spectrum`

        Returns:
            Model: A subclass of :class:`glotaran.model.Model`
                The model must be decorated with the `@model` decorator
                :class:`glotaran.model.model_decorator.model`
        """
        return self._model

    @property
    def data(self) -> Dict[str, xr.Dataset]:
        return self._data

    @property
    def parameters(self) -> ParameterGroup:
        return self._parameters

    @parameters.setter
    def parameters(self, parameters: ParameterGroup):
        self._parameters = parameters
        self.reset()

    @property
    def parameter_history(self) -> List[ParameterGroup]:
        return self._parameter_history

    @property
    def grouped(self) -> bool:
        return self._grouped

    @property
    def index_dependent(self) -> bool:
        return self._index_dependent

    @property
    def filled_dataset_descriptors(self) -> Dict[str, DatasetDescriptor]:
        return self._filled_dataset_descriptors

    @property
    def bag(self) -> Union[UngroupedBag, GroupedBag]:
        if not self._bag:
            self._init_bag()
        return self._bag

    @property
    def groups(self) -> Dict[str, List[str]]:
        if not self._groups and self._grouped:
            self._init_bag()
        return self._groups

    @property
    def clp_labels(
        self,
    ) -> Dict[str, Union[List[str], List[List[str]]]]:
        if self._clp_labels is None:
            self.calculate_matrices()
        return self._clp_labels

    @property
    def matrices(
        self,
    ) -> Dict[str, Union[np.ndarray, List[np.ndarray]]]:
        if self._matrices is None:
            self.calculate_matrices()
        return self._matrices

    @property
    def reduced_clp_labels(
        self,
    ) -> Dict[str, Union[List[str], List[List[str]]]]:
        if self._reduced_clp_labels is None:
            self.calculate_matrices()
        return self._reduced_clp_labels

    @property
    def reduced_matrices(
        self,
    ) -> Union[Dict[str, np.ndarray], Dict[str, List[np.ndarray]], List[np.ndarray]]:
        if self._reduced_matrices is None:
            self.calculate_matrices()
        return self._reduced_matrices

    @property
    def reduced_clps(
        self,
    ) -> Dict[str, List[np.ndarray]]:
        if self._reduced_clps is None:
            self.calculate_residual()
        return self._reduced_clps

    @property
    def clps(
        self,
    ) -> Dict[str, List[np.ndarray]]:
        if self._clps is None:
            self.calculate_residual()
        return self._clps

    @property
    def weighted_residuals(
        self,
    ) -> Dict[str, List[np.ndarray]]:
        if self._weighted_residuals is None:
            self.calculate_residual()
        return self._weighted_residuals

    @property
    def residuals(
        self,
    ) -> Dict[str, List[np.ndarray]]:
        if self._residuals is None:
            self.calculate_residual()
        return self._residuals

    @property
    def additional_penalty(
        self,
    ) -> Dict[str, List[float]]:
        if self._additional_penalty is None:
            self.calculate_additional_penalty()
        return self._additional_penalty

    @property
    def full_penalty(self) -> np.ndarray:
        if self._full_penalty is None:
            residuals = self.weighted_residuals
            additional_penalty = self.additional_penalty
            if not self.grouped:
                residuals = [np.concatenate(residuals[label]) for label in residuals.keys()]

            self._full_penalty = (
                np.concatenate((np.concatenate(residuals), additional_penalty))
                if additional_penalty is not None
                else np.concatenate(residuals)
            )
        return self._full_penalty

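    # Illustrative usage sketch (an addition, not part of the original module):
    # the result properties above are lazy, so a single attribute access drives
    # the whole pipeline. Assuming a prepared `scheme` and a hypothetical
    # `new_parameters` ParameterGroup:
    #
    #     problem = Problem(scheme)
    #     penalty = problem.full_penalty   # triggers calculate_matrices() and
    #                                      # calculate_residual() on first access
    #     problem.parameters = new_parameters  # setter calls reset(), so ...
    #     penalty = problem.full_penalty       # ... this access recomputes everything
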
    def save_parameters_for_history(self):
        self._parameter_history.append(self._parameters)

    def reset(self):
        """Resets all results and `DatasetDescriptors`. Use after updating parameters."""
        self._filled_dataset_descriptors = {
            label: descriptor.fill(self._model, self._parameters)
            for label, descriptor in self._model.dataset.items()
        }
        self._reset_results()

    def _reset_results(self):
        self._clp_labels = None
        self._matrices = None
        self._reduced_clp_labels = None
        self._reduced_matrices = None
        self._reduced_clps = None
        self._clps = None
        self._weighted_residuals = None
        self._residuals = None
        self._additional_penalty = None
        self._full_penalty = None

    def _init_bag(self):
        if self._grouped:
            self._init_grouped_bag()
        else:
            self._init_ungrouped_bag()

    def _init_ungrouped_bag(self):
        self._bag = {}
        for label in self._scheme.model.dataset:
            dataset = self._scheme.data[label]
            data = dataset.data
            weight = dataset.weight if "weight" in dataset else None
            if weight is not None:
                data = data * weight
                dataset["weighted_data"] = data
            self._bag[label] = ProblemDescriptor(
                self._scheme.model.dataset[label],
                data,
                dataset.coords[self._model_dimension].values,
                dataset.coords[self._global_dimension].values,
                weight,
            )

    def _init_grouped_bag(self):
        datasets = None
        for label in self._model.dataset:
            dataset = self._data[label]
            if "weight" in dataset:
                weight = dataset.weight
                data = dataset.data * weight
                dataset["weighted_data"] = data
            else:
                weight = xr.DataArray(np.ones_like(dataset.data), coords=dataset.data.coords)
                data = dataset.data
            global_axis = dataset.coords[self._global_dimension].values
            model_axis = dataset.coords[self._model_dimension].values
            has_scaling = self._model.dataset[label].scale is not None
            if self._bag is None:
                self._bag = collections.deque(
                    GroupedProblem(
                        data=data.isel({self._global_dimension: i}).values,
                        weight=weight.isel({self._global_dimension: i}).values,
                        has_scaling=has_scaling,
                        group=label,
                        data_sizes=[data.isel({self._global_dimension: i}).values.size],
                        descriptor=[GroupedProblemDescriptor(label, value, model_axis)],
                    )
                    for i, value in enumerate(global_axis)
                )
                datasets = collections.deque([label] for _, _ in enumerate(global_axis))
                self._full_axis = collections.deque(global_axis)
            else:
                self._append_to_grouped_bag(
                    label, datasets, global_axis, model_axis, data, weight, has_scaling
                )
        self._full_axis = np.asarray(self._full_axis)
        self._groups = {"".join(d): d for d in datasets}

    def _append_to_grouped_bag(
        self,
        label: str,
        datasets: Deque[List[str]],
        global_axis: np.ndarray,
        model_axis: np.ndarray,
        data: xr.DataArray,
        weight: xr.DataArray,
        has_scaling: bool,
    ):
        i1, i2 = _find_overlap(self._full_axis, global_axis, atol=self._scheme.group_tolerance)

        for i, j in enumerate(i1):
            datasets[j].append(label)
            data_stripe = data.isel({self._global_dimension: i2[i]}).values
            self._bag[j] = GroupedProblem(
                data=np.concatenate(
                    [
                        self._bag[j].data,
                        data_stripe,
                    ]
                ),
                weight=np.concatenate(
                    [
                        self._bag[j].weight,
                        weight.isel({self._global_dimension: i2[i]}).values,
                    ]
                ),
                has_scaling=has_scaling or self._bag[j].has_scaling,
                group=self._bag[j].group + label,
                data_sizes=self._bag[j].data_sizes + [data_stripe.size],
                descriptor=self._bag[j].descriptor
                + [GroupedProblemDescriptor(label, global_axis[i2[i]], model_axis)],
            )

        # Add non-overlapping regions
        begin_overlap = i2[0] if len(i2) != 0 else 0
        end_overlap = i2[-1] + 1 if len(i2) != 0 else 0
        for i in itertools.chain(range(begin_overlap), range(end_overlap, len(global_axis))):
            data_stripe = data.isel({self._global_dimension: i}).values
            problem = GroupedProblem(
                data=data_stripe,
                weight=weight.isel({self._global_dimension: i}).values,
                has_scaling=has_scaling,
                group=label,
                data_sizes=[data_stripe.size],
                descriptor=[GroupedProblemDescriptor(label, global_axis[i], model_axis)],
            )
            if i < end_overlap:
                datasets.appendleft([label])
                self._full_axis.appendleft(global_axis[i])
                self._bag.appendleft(problem)
            else:
                datasets.append([label])
                self._full_axis.append(global_axis[i])
                self._bag.append(problem)

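    # Grouping sketch (an illustration, not part of the original module): two
    # datasets whose global axes overlap within `scheme.group_tolerance` end up
    # sharing GroupedProblems. E.g. for hypothetical labels "dataset1" with
    # global axis [1.0, 2.0] and "dataset2" with [2.0, 3.0] at tolerance 0.1,
    # the bag holds three entries (for axis values 1.0, 2.0, 3.0) with groups
    # "dataset1", "dataset1dataset2" and "dataset2", and
    #
    #     self._groups == {"dataset1": ["dataset1"],
    #                      "dataset1dataset2": ["dataset1", "dataset2"],
    #                      "dataset2": ["dataset2"]}
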
    def calculate_matrices(self):
        if self._index_dependent:
            if self._grouped:
                self.calculate_index_dependent_grouped_matrices()
            else:
                self.calculate_index_dependent_ungrouped_matrices()
        else:
            if self._grouped:
                self.calculate_index_independent_grouped_matrices()
            else:
                self.calculate_index_independent_ungrouped_matrices()

    def calculate_index_dependent_grouped_matrices(
        self,
    ) -> Tuple[
        Dict[str, List[List[str]]],
        Dict[str, List[np.ndarray]],
        List[List[str]],
        List[np.ndarray],
    ]:
        if self._parameters is None:
            raise ParameterError

        def calculate_group(
            group: GroupedProblem, descriptors: Dict[str, DatasetDescriptor]
        ) -> Tuple[List[Tuple[LabelAndMatrix, str]], float]:
            result = [
                (
                    _calculate_matrix(
                        self._model.matrix,
                        descriptors[problem.label],
                        problem.axis,
                        {},
                        index=problem.index,
                    ),
                    problem.label,
                )
                for problem in group.descriptor
            ]
            return result, group.descriptor[0].index

        def reduce_and_combine_matrices(
            results: Tuple[List[Tuple[LabelAndMatrix, str]], float],
        ) -> LabelAndMatrix:
            index_results, index = results
            constraint_labels_and_matrices = list(
                map(
                    lambda result: _reduce_matrix(
                        self._model, result[1], self.parameters, result[0], index
                    ),
                    index_results,
                )
            )
            clp, matrix = _combine_matrices(constraint_labels_and_matrices)
            return LabelAndMatrix(clp, matrix)

        results = list(
            map(lambda group: calculate_group(group, self._filled_dataset_descriptors), self._bag)
        )

        clp_labels = list(map(lambda result: [r[0].clp_label for r in result[0]], results))
        matrices = list(map(lambda result: [r[0].matrix for r in result[0]], results))

        self._clp_labels = {}
        self._matrices = {}

        for i, grouped_problem in enumerate(self._bag):
            for j, descriptor in enumerate(grouped_problem.descriptor):
                if descriptor.label not in self._clp_labels:
                    self._clp_labels[descriptor.label] = []
                    self._matrices[descriptor.label] = []
                self._clp_labels[descriptor.label].append(clp_labels[i][j])
                self._matrices[descriptor.label].append(matrices[i][j])

        reduced_results = list(map(reduce_and_combine_matrices, results))
        self._reduced_clp_labels = list(map(lambda result: result.clp_label, reduced_results))
        self._reduced_matrices = list(map(lambda result: result.matrix, reduced_results))
        return self._clp_labels, self._matrices, self._reduced_clp_labels, self._reduced_matrices

    def calculate_index_dependent_ungrouped_matrices(
        self,
    ) -> Tuple[
        Dict[str, List[List[str]]],
        Dict[str, List[np.ndarray]],
        Dict[str, List[str]],
        Dict[str, List[np.ndarray]],
    ]:
        if self._parameters is None:
            raise ParameterError

        self._clp_labels = {}
        self._matrices = {}
        self._reduced_clp_labels = {}
        self._reduced_matrices = {}

        for label, problem in self.bag.items():
            self._clp_labels[label] = []
            self._matrices[label] = []
            self._reduced_clp_labels[label] = []
            self._reduced_matrices[label] = []
            descriptor = self._filled_dataset_descriptors[label]

            for index in problem.global_axis:
                result = _calculate_matrix(
                    self._model.matrix,
                    descriptor,
                    problem.model_axis,
                    {},
                    index=index,
                )
                self._clp_labels[label].append(result.clp_label)
                self._matrices[label].append(result.matrix)
                reduced_labels_and_matrix = _reduce_matrix(
                    self._model, label, self._parameters, result, index
                )
                self._reduced_clp_labels[label].append(reduced_labels_and_matrix.clp_label)
                self._reduced_matrices[label].append(reduced_labels_and_matrix.matrix)

        return self._clp_labels, self._matrices, self._reduced_clp_labels, self._reduced_matrices

    def calculate_index_independent_grouped_matrices(
        self,
    ) -> Tuple[Dict[str, List[str]], Dict[str, np.ndarray], Dict[str, LabelAndMatrix]]:
        # We just need to create groups from the ungrouped matrices
        self.calculate_index_independent_ungrouped_matrices()
        for group_label, group in self._groups.items():
            if group_label not in self._matrices:
                reduced_labels_and_matrix = _combine_matrices(
                    [
                        LabelAndMatrix(
                            self._reduced_clp_labels[label], self._reduced_matrices[label]
                        )
                        for label in group
                    ]
                )
                self._reduced_clp_labels[group_label] = reduced_labels_and_matrix.clp_label
                self._reduced_matrices[group_label] = reduced_labels_and_matrix.matrix

        return self._clp_labels, self._matrices, self._reduced_clp_labels, self._reduced_matrices

    def calculate_index_independent_ungrouped_matrices(
        self,
    ) -> Tuple[
        Dict[str, List[str]],
        Dict[str, np.ndarray],
        Dict[str, List[str]],
        Dict[str, np.ndarray],
    ]:
        if self._parameters is None:
            raise ParameterError

        self._clp_labels = {}
        self._matrices = {}
        self._reduced_clp_labels = {}
        self._reduced_matrices = {}

        for label, descriptor in self._filled_dataset_descriptors.items():
            axis = self._data[label].coords[self._model_dimension].values
            result = _calculate_matrix(
                self._model.matrix,
                descriptor,
                axis,
                {},
            )

            self._clp_labels[label] = result.clp_label
            self._matrices[label] = result.matrix
            reduced_result = _reduce_matrix(self._model, label, self._parameters, result, None)
            self._reduced_clp_labels[label] = reduced_result.clp_label
            self._reduced_matrices[label] = reduced_result.matrix

        return self._clp_labels, self._matrices, self._reduced_clp_labels, self._reduced_matrices

    def calculate_residual(self):
        if self._index_dependent:
            if self._grouped:
                self.calculate_index_dependent_grouped_residual()
            else:
                self.calculate_index_dependent_ungrouped_residual()
        else:
            if self._grouped:
                self.calculate_index_independent_grouped_residual()
            else:
                self.calculate_index_independent_ungrouped_residual()

    def calculate_index_dependent_grouped_residual(
        self,
    ) -> Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray], List[np.ndarray]]:
        def residual_function(
            problem: GroupedProblem, matrix: np.ndarray
        ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:

            matrix = matrix.copy()
            for i in range(matrix.shape[1]):
                matrix[:, i] *= problem.weight
            data = problem.data
            if problem.has_scaling:
                for i, descriptor in enumerate(problem.descriptor):
                    label = descriptor.label
                    if self.filled_dataset_descriptors[label] is not None:
                        start = sum(problem.data_sizes[0:i])
                        end = start + problem.data_sizes[i]
                        matrix[start:end, :] *= self.filled_dataset_descriptors[label].scale

            clp, residual = self._residual_function(matrix, data)
            return clp, residual, residual / problem.weight

        results = list(map(residual_function, self.bag, self.reduced_matrices))

        self._weighted_residuals = list(map(lambda result: result[1], results))
        self._residuals = list(map(lambda result: result[2], results))

        reduced_clps = list(map(lambda result: result[0], results))
        self._ungroup_clps(reduced_clps)

        return self._reduced_clps, self._clps, self._weighted_residuals, self._residuals

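    # Weighting sketch (an illustration, not part of the original module): the
    # data in a GroupedProblem is already multiplied by its weight w (see
    # _init_grouped_bag), so scaling each matrix column by the same weight
    # solves the weighted least squares problem
    #
    #     min_x || w * d - (w * A) x ||^2
    #
    # and dividing the weighted residual by w recovers the residual on the
    # original data scale, which is what residual_function above returns.
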
    def calculate_index_dependent_ungrouped_residual(
        self,
    ) -> Tuple[
        Dict[str, List[np.ndarray]],
        Dict[str, List[np.ndarray]],
        Dict[str, List[np.ndarray]],
        Dict[str, List[np.ndarray]],
    ]:

        self._reduced_clps = {}
        self._weighted_residuals = {}
        self._residuals = {}

        for label, problem in self.bag.items():
            self._reduced_clps[label] = []
            self._residuals[label] = []
            self._weighted_residuals[label] = []
            data = problem.data

            for i in range(len(problem.global_axis)):
                matrix_at_index = self.reduced_matrices[label][i]

                if problem.dataset.scale is not None:
                    matrix_at_index *= self.filled_dataset_descriptors[label].scale
                if problem.weight is not None:
                    matrix_at_index = matrix_at_index.copy()
                    for j in range(matrix_at_index.shape[1]):
                        matrix_at_index[:, j] *= problem.weight.isel({self._global_dimension: i})
                clp, residual = self._residual_function(
                    matrix_at_index, data.isel({self._global_dimension: i}).values
                )

                self._reduced_clps[label].append(clp)
                self._weighted_residuals[label].append(residual)
                if problem.weight is not None:
                    self._residuals[label].append(
                        residual / problem.weight.isel({self._global_dimension: i})
                    )
                else:
                    self._residuals[label].append(residual)

        self._clps = (
            self.model.retrieve_clp_function(
                self.parameters,
                self.clp_labels,
                self.reduced_clp_labels,
                self.reduced_clps,
                self.data,
            )
            if callable(self.model.retrieve_clp_function)
            else self.reduced_clps
        )

        return self._reduced_clps, self._clps, self._weighted_residuals, self._residuals

    def calculate_index_independent_grouped_residual(
        self,
    ) -> Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray], List[np.ndarray]]:
        def residual_function(problem: GroupedProblem):
            matrix = self.reduced_matrices[problem.group].copy()
            for i in range(matrix.shape[1]):
                matrix[:, i] *= problem.weight
            data = problem.data
            if problem.has_scaling:
                for i, descriptor in enumerate(problem.descriptor):
                    label = descriptor.label
                    if self.filled_dataset_descriptors[label] is not None:
                        start = sum(problem.data_sizes[0:i])
                        end = start + problem.data_sizes[i]
                        matrix[start:end, :] *= self.filled_dataset_descriptors[label].scale
            clp, residual = self._residual_function(matrix, data)
            return clp, residual, residual / problem.weight

        results = list(map(residual_function, self.bag))

        self._weighted_residuals = list(map(lambda result: result[1], results))
        self._residuals = list(map(lambda result: result[2], results))

        reduced_clps = list(map(lambda result: result[0], results))
        self._ungroup_clps(reduced_clps)

        return self._reduced_clps, self._clps, self._weighted_residuals, self._residuals

    def calculate_index_independent_ungrouped_residual(
        self,
    ) -> Tuple[
        Dict[str, List[np.ndarray]],
        Dict[str, List[np.ndarray]],
        Dict[str, List[np.ndarray]],
        Dict[str, List[np.ndarray]],
    ]:

        self._clps = {}
        self._reduced_clps = {}
        self._weighted_residuals = {}
        self._residuals = {}
        for label, problem in self.bag.items():

            self._clps[label] = []
            self._reduced_clps[label] = []
            self._weighted_residuals[label] = []
            self._residuals[label] = []
            data = problem.data

            for i in range(len(problem.global_axis)):
                matrix = self.reduced_matrices[label].copy()  # TODO: .copy() or not
                if problem.dataset.scale is not None:
                    matrix *= self.filled_dataset_descriptors[label].scale

                if problem.weight is not None:
                    for j in range(matrix.shape[1]):
                        matrix[:, j] *= problem.weight.isel({self._global_dimension: i}).values

                clp, residual = self._residual_function(
                    matrix, data.isel({self._global_dimension: i}).values
                )
                self._reduced_clps[label].append(clp)
                self._weighted_residuals[label].append(residual)
                if problem.weight is not None:
                    self._residuals[label].append(
                        residual / problem.weight.isel({self._global_dimension: i})
                    )
                else:
                    self._residuals[label].append(residual)

        self._clps = (
            self.model.retrieve_clp_function(
                self.parameters,
                self.clp_labels,
                self.reduced_clp_labels,
                self.reduced_clps,
                self.data,
            )
            if callable(self.model.retrieve_clp_function)
            else self._reduced_clps
        )

        return self._reduced_clps, self._clps, self._weighted_residuals, self._residuals

    def _ungroup_clps(self, reduced_clps: np.ndarray):
        reduced_clp_labels = self.reduced_clp_labels
        self._reduced_clp_labels = {}
        self._reduced_clps = {}
        for label, clp_labels in self.clp_labels.items():

            # find offset in the full axis
            offset = _find_closest_index(
                self.data[label].coords[self._global_dimension][0].values, self._full_axis
            )

            self._reduced_clp_labels[label] = []
            self._reduced_clps[label] = []
            for i in range(self.data[label].coords[self._global_dimension].size):
                group_label = self.bag[i].group
                dataset_clp_labels = clp_labels[i] if self._index_dependent else clp_labels
                index_clp_labels = (
                    reduced_clp_labels[i + offset]
                    if self._index_dependent
                    else reduced_clp_labels[group_label]
                )
                self._reduced_clp_labels[label].append(
                    [
                        clp_label
                        for clp_label in dataset_clp_labels
                        if clp_label in index_clp_labels
                    ]
                )

                mask = [
                    clp_label in self._reduced_clp_labels[label][i]
                    for clp_label in index_clp_labels
                ]
                self._reduced_clps[label].append(reduced_clps[i + offset][mask])
        self._clps = (
            self.model.retrieve_clp_function(
                self.parameters,
                self.clp_labels,
                self.reduced_clp_labels,
                self.reduced_clps,
                self.data,
            )
            if callable(self.model.retrieve_clp_function)
            else self._reduced_clps
        )

    def calculate_additional_penalty(self) -> Union[np.ndarray, Dict[str, np.ndarray]]:
        """Calculates additional penalties by calling the model.additional_penalty function."""
        if (
            callable(self.model.has_additional_penalty_function)
            and self.model.has_additional_penalty_function()
        ):
            self._additional_penalty = self.model.additional_penalty_function(
                self.parameters,
                self.clp_labels,
                self.clps,
                self.matrices,
                self.data,
                self._scheme.group_tolerance,
            )
        else:
            self._additional_penalty = None
        return self._additional_penalty

    def create_result_data(
        self, copy: bool = True, history_index: int = None
    ) -> Dict[str, xr.Dataset]:

        if history_index is not None and history_index != -1:
            self.parameters = self.parameter_history[history_index]
        result_data = {label: self._create_result_dataset(label, copy=copy) for label in self.data}

        if callable(self.model.finalize_data):
            self.model.finalize_data(self, result_data)

        return result_data

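    # Illustrative sketch (an addition, not part of the original module): after
    # optimization, result datasets are keyed by dataset label ("dataset1" below
    # is a hypothetical label):
    #
    #     result_data = problem.create_result_data()
    #     dataset = result_data["dataset1"]
    #     dataset.fitted_data                       # data - residual
    #     dataset.attrs["root_mean_square_error"]   # set by _create_result_dataset
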
    def _create_result_dataset(self, label: str, copy: bool = True) -> xr.Dataset:
        dataset = self.data[label]

        if copy:
            dataset = dataset.copy()

        if self.grouped:
            if self.index_dependent:
                dataset = self._create_index_dependent_grouped_result_dataset(label, dataset)
            else:
                dataset = self._create_index_independent_grouped_result_dataset(label, dataset)
        else:
            if self.index_dependent:
                dataset = self._create_index_dependent_ungrouped_result_dataset(label, dataset)
            else:
                dataset = self._create_index_independent_ungrouped_result_dataset(label, dataset)

        self._create_svd("weighted_residual", dataset)
        self._create_svd("residual", dataset)

        # Calculate RMS
        size = dataset.residual.shape[0] * dataset.residual.shape[1]
        dataset.attrs["root_mean_square_error"] = np.sqrt(
            (dataset.residual ** 2).sum() / size
        ).values

        size = dataset.weighted_residual.shape[0] * dataset.weighted_residual.shape[1]
        dataset.attrs["weighted_root_mean_square_error"] = np.sqrt(
            (dataset.weighted_residual ** 2).sum() / size
        ).values

        # reconstruct fitted data
        dataset["fitted_data"] = dataset.data - dataset.residual
        return dataset

    def _create_index_dependent_grouped_result_dataset(
        self, label: str, dataset: xr.Dataset
    ) -> xr.Dataset:

        for index, grouped_problem in enumerate(self.bag):

            if label in grouped_problem.group:
                group_index = [
                    descriptor.label for descriptor in grouped_problem.descriptor
                ].index(label)
                global_index = grouped_problem.descriptor[group_index].index

                self._add_grouped_residual_to_dataset(
                    dataset, grouped_problem, index, group_index, global_index
                )

        # we assume that the labels are the same, this might not be true in
        # future models
        dataset.coords["clp_label"] = self.clp_labels[label][0]

        dataset["matrix"] = (
            (
                (self._global_dimension),
                (self._model_dimension),
                ("clp_label"),
            ),
            self.matrices[label],
        )
        dataset["clp"] = (
            (
                (self._global_dimension),
                ("clp_label"),
            ),
            self.clps[label],
        )

        return dataset

    def _create_index_independent_grouped_result_dataset(
        self, label: str, dataset: xr.Dataset
    ) -> xr.Dataset:

        self._add_index_independent_matrix_to_dataset(label, dataset)

        for index, grouped_problem in enumerate(self.bag):

            if label in grouped_problem.group:
                group_index = [
                    descriptor.label for descriptor in grouped_problem.descriptor
                ].index(label)
                global_index = grouped_problem.descriptor[group_index].index

                self._add_grouped_residual_to_dataset(
                    dataset, grouped_problem, index, group_index, global_index
                )

        dataset["clp"] = (
            (
                (self._global_dimension),
                ("clp_label"),
            ),
            self.clps[label],
        )

        return dataset

    def _create_index_dependent_ungrouped_result_dataset(
        self, label: str, dataset: xr.Dataset
    ) -> xr.Dataset:

        self._add_index_dependent_ungrouped_matrix_to_dataset(label, dataset)
        self._add_ungrouped_residual_and_full_clp_to_dataset(label, dataset)

        return dataset

    def _create_index_independent_ungrouped_result_dataset(
        self, label: str, dataset: xr.Dataset
    ) -> xr.Dataset:

        self._add_index_independent_matrix_to_dataset(label, dataset)
        self._add_ungrouped_residual_and_full_clp_to_dataset(label, dataset)

        return dataset

    def _add_index_dependent_ungrouped_matrix_to_dataset(self, label: str, dataset: xr.Dataset):
        # we assume that the labels are the same, this might not be true in
        # future models
        dataset.coords["clp_label"] = self.clp_labels[label][0]

        dataset["matrix"] = (
            (
                (self._global_dimension),
                (self._model_dimension),
                ("clp_label"),
            ),
            np.asarray(self.matrices[label]),
        )

    def _add_index_independent_matrix_to_dataset(self, label: str, dataset: xr.Dataset):
        dataset.coords["clp_label"] = self.clp_labels[label]
        dataset["matrix"] = (
            (
                (self._model_dimension),
                ("clp_label"),
            ),
            np.asarray(self.matrices[label]),
        )

    def _add_grouped_residual_to_dataset(
        self,
        dataset: xr.Dataset,
        grouped_problem: GroupedProblem,
        index: int,
        group_index: int,
        global_index: int,
    ):
        if "residual" not in dataset:
            dim1 = dataset.coords[self._model_dimension].size
            dim2 = dataset.coords[self._global_dimension].size
            dataset["weighted_residual"] = (
                (self._model_dimension, self._global_dimension),
                np.zeros((dim1, dim2), dtype=np.float64),
            )
            dataset["residual"] = (
                (self._model_dimension, self._global_dimension),
                np.zeros((dim1, dim2), dtype=np.float64),
            )

        start = sum(
            self.data[grouped_problem.descriptor[i].label].coords[self._model_dimension].size
            for i in range(group_index)
        )

        end = start + dataset.coords[self._model_dimension].size
        dataset.weighted_residual.loc[
            {self._global_dimension: global_index}
        ] = self.weighted_residuals[index][start:end]
        dataset.residual.loc[{self._global_dimension: global_index}] = self.residuals[index][
            start:end
        ]

    def _add_ungrouped_residual_and_full_clp_to_dataset(self, label: str, dataset: xr.Dataset):
        dataset["clp"] = (
            (
                (self._global_dimension),
                ("clp_label"),
            ),
            np.asarray(self.clps[label]),
        )
        dataset["weighted_residual"] = (
            (
                (self._model_dimension),
                (self._global_dimension),
            ),
            np.transpose(np.asarray(self.weighted_residuals[label])),
        )
        dataset["residual"] = (
            (
                (self._model_dimension),
                (self._global_dimension),
            ),
            np.transpose(np.asarray(self.residuals[label])),
        )

    def _create_svd(self, name: str, dataset: xr.Dataset):
        l, v, r = np.linalg.svd(dataset[name], full_matrices=False)

        dataset[f"{name}_left_singular_vectors"] = (
            (self._model_dimension, "left_singular_value_index"),
            l,
        )

        dataset[f"{name}_right_singular_vectors"] = (
            ("right_singular_value_index", self._global_dimension),
            r,
        )

        dataset[f"{name}_singular_values"] = (("singular_value_index"), v)


def _find_overlap(a, b, rtol=1e-05, atol=1e-08):
    ovr_a = []
    ovr_b = []
    start_b = 0
    for i, ai in enumerate(a):
        for j, bj in itertools.islice(enumerate(b), start_b, None):
            if np.isclose(ai, bj, rtol=rtol, atol=atol, equal_nan=False):
                ovr_a.append(i)
                ovr_b.append(j)
            elif bj > ai:  # (more than tolerance)
                break  # all the rest will be farther away
            else:  # bj < ai (more than tolerance)
                start_b += 1  # ignore further tests of this item
    return (ovr_a, ovr_b)


def _calculate_matrix(
    matrix_function: Callable,
    dataset_descriptor: DatasetDescriptor,
    axis: np.ndarray,
    extra: Dict,
    index: float = None,
) -> LabelAndMatrix:
    args = {
        "dataset_descriptor": dataset_descriptor,
        "axis": axis,
    }
    # iterate key/value pairs; iterating the bare dict would yield only keys
    for k, v in extra.items():
        args[k] = v
    if index is not None:
        args["index"] = index
    clp_label, matrix = matrix_function(**args)
    return LabelAndMatrix(clp_label, matrix)


def _reduce_matrix(
    model: Model,
    label: str,
    parameters: ParameterGroup,
    result: LabelAndMatrix,
    index: float,
) -> LabelAndMatrix:
    clp_labels = result.clp_label.copy()
    if callable(model.has_matrix_constraints_function) and model.has_matrix_constraints_function():
        clp_label, matrix = model.constrain_matrix_function(
            label, parameters, clp_labels, result.matrix, index
        )
        return LabelAndMatrix(clp_label, matrix)
    return LabelAndMatrix(clp_labels, result.matrix)


def _combine_matrices(labels_and_matrices: List[LabelAndMatrix]) -> LabelAndMatrix:
    masks = []
    full_clp_labels = None
    sizes = []
    for label_and_matrix in labels_and_matrices:
        (clp_label, matrix) = label_and_matrix
        sizes.append(matrix.shape[0])
        if full_clp_labels is None:
            # copy so that appending labels below does not mutate the caller's list
            full_clp_labels = clp_label.copy()
            masks.append([i for i, _ in enumerate(clp_label)])
        else:
            mask = []
            for c in clp_label:
                if c not in full_clp_labels:
                    full_clp_labels.append(c)
                mask.append(full_clp_labels.index(c))
            masks.append(mask)
    dim1 = np.sum(sizes)
    dim2 = len(full_clp_labels)
    full_matrix = np.zeros((dim1, dim2), dtype=np.float64)
    start = 0
    for i, m in enumerate(labels_and_matrices):
        end = start + sizes[i]
        full_matrix[start:end, masks[i]] = m[1]
        start = end
    return LabelAndMatrix(full_clp_labels, full_matrix)


def _find_closest_index(index: float, axis: np.ndarray):
    return np.abs(axis - index).argmin()
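

# Minimal self-test sketch for the module-level helpers (an illustrative
# addition, assuming only numpy and the definitions above):
if __name__ == "__main__":
    # _find_overlap pairs up indices whose axis values agree within tolerance.
    assert _find_overlap(np.array([1.0, 2.0, 3.0]), np.array([2.0, 3.0, 4.0]), atol=0.1) == (
        [1, 2],
        [0, 1],
    )

    # _combine_matrices stacks matrices vertically and aligns shared clp labels
    # into a common column space, zero-filling the blocks a dataset lacks.
    combined = _combine_matrices(
        [
            LabelAndMatrix(["s1", "s2"], np.ones((2, 2))),
            LabelAndMatrix(["s2", "s3"], np.ones((3, 2))),
        ]
    )
    assert combined.clp_label == ["s1", "s2", "s3"]
    assert combined.matrix.shape == (5, 3)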