"""Reduced Basis module."""
from anytree import Node
import numpy as np
from . import integrals
# import logging
# logger = logging.getLogger("arby.basis")
[docs]class ReducedBasis:
"""Class for building a reduced basis (RB) using the RB greedy algorithm.
The reduced basis is built from the training data set with a user
specified tolerance by linear combinations of its elements. The
reduced basis can be also constructed with a domain decomposition
of the parameter space, building local basis in each subspace.
Parameters
----------
index_seed_global_rb : int, optional
The seed for construct the reduced basis, by default 0.
lmax : int, optional
The maximum number domain partitions performed.
nmax : int, optional
The maximum number of basis functions to be used, by default np.inf.
greedy_tol : float, optional
The greedy tolerance, by default 1e-12.
normalize : bool, optional
Indicates if the training set should be normalized, by default False.
integration_rule : str, optional
The integration rule to be used, by default "riemann".
Attributes
----------
tree : Node
The tree data structure for the reduced basis.
"""
def __init__(
self,
index_seed_global_rb=0,
lmax=0,
nmax=np.inf,
greedy_tol=1e-12,
normalize=False,
integration_rule="riemann",
) -> None:
"""summary.
Parameters
----------
index_seed_global_rb : int, optional
Index of the training_set element that is going to be the seed
of the greedy algorithm, by default 0
lmax : int, optional
Maximum depth of the tree built by the partitioning of the
parameter space, by default 0
nmax : _type_, optional
Maximum dimension of the reduced basis, by default np.inf
greedy_tol : _type_, optional
Precision to reach by the greedy algorithm, by default 1e-12
normalize : bool, optional
Normalize the training set, by default False
integration_rule : str, optional
By default "riemann"
"""
# the default seed is the first of the array "parameters"
self.index_seed_global_rb = index_seed_global_rb
self.lmax = lmax
self.nmax = nmax
self.greedy_tol = greedy_tol
self.normalize = normalize
self.integration_rule = integration_rule
self.__first_iteration = True
self._trained = False
assert self.nmax > 0 and self.lmax >= 0
# comenzamos la implementacion de reduced_basis
# la idea es acoplar esto al método fit de ReducedModel.
[docs] def fit(
self,
training_set,
parameters,
physical_points,
) -> None:
"""Build a reduced basis from training data.
This function implements the Reduced Basis (RB) greedy algorithm to
build an orthonormalized reduced basis out from training data. The
basis is built to reproduce the training functions with the user
specified tolerance by linear combinations
of its elements.
Parameters
----------
training_set : numpy.ndarray
Training set functions.
parameters : numpy.ndarray
Associated parameters to the training set functions.
physical_points : numpy.ndarray
Physical points for quadrature rules.
"""
self._fit(
training_set,
parameters,
physical_points,
index_seed=self.index_seed_global_rb,
parent=None,
node_idx=0,
deep=0,
)
self._trained = True
def _fit(
self,
training_set,
parameters,
physical_points,
# Los siguientes son parametros internos para la función self.fit().
index_seed,
parent,
node_idx,
deep,
) -> None:
_validate_parameters(parameters)
_validate_physical_points(physical_points)
_validate_training_set(training_set)
if self.__first_iteration is True:
assert parent is None and node_idx == 0 and deep == 0
self.__first_iteration = False
# Create a node for the tree.
# If the tree does not exists, create it.
if parent is not None:
node = Node(
name=parent.name + (node_idx,),
parent=parent,
train_parameters=parameters,
)
else:
self.tree = Node(name=(node_idx,), train_parameters=parameters)
node = self.tree
integration = integrals.Integration(
physical_points, rule=self.integration_rule
)
# useful constants
ntrain = training_set.shape[0]
nsamples = training_set.shape[1]
max_rank = min(ntrain, nsamples)
# validate inputs
if nsamples != np.size(integration.weights_):
raise ValueError(
"Number of samples is inconsistent with quadrature rule."
)
if np.allclose(np.abs(training_set), 0, atol=1e-30):
raise ValueError("Null training set!")
# ====== Seed the greedy algorithm and allocate memory ======
# memory allocation
greedy_errors = np.empty(max_rank, dtype=np.float64)
proj_matrix = np.empty((max_rank, ntrain), dtype=training_set.dtype)
basis_data = np.empty((max_rank, nsamples), dtype=training_set.dtype)
norms = integration.norm(training_set)
if self.normalize:
# normalize training set
training_set = np.array(
[
h if np.allclose(h, 0, atol=1e-15) else h / norms[i]
for i, h in enumerate(training_set)
]
)
# choose seed
next_index = index_seed
seed = training_set[next_index]
aux = 0
while aux < ntrain - 1:
if np.allclose(np.abs(seed), 0):
if next_index < ntrain - 1:
next_index += 1
else:
next_index = 0
seed = training_set[next_index]
aux += 1
else:
break
greedy_indices = [next_index]
basis_data[0] = training_set[next_index]
proj_matrix[0] = integration.dot(basis_data[0], training_set)
sq_errors = _sq_errs_rel
errs = sq_errors(np.ones(ntrain), proj_matrix[0])
else:
# choose seed
next_index = index_seed # old version: np.argmax(norms)
greedy_indices = [next_index]
basis_data[0] = training_set[next_index] / norms[next_index]
proj_matrix[0] = integration.dot(basis_data[0], training_set)
sq_errors = _sq_errs_abs
errs, diff_training = sq_errors(
proj_matrix[0], basis_data[0], integration.dot, training_set
)
next_index = np.argmax(errs)
greedy_errors[0] = errs[next_index]
sigma = greedy_errors[0]
# ====== Start greedy loop ======
# logger.debug("\n Step", "\t", "Error")
nn = 0
# print(nn, sigma, next_index)
while sigma > self.greedy_tol and self.nmax > nn + 1:
if next_index in greedy_indices:
break
nn += 1
greedy_indices.append(next_index)
basis_data[nn], _ = _gs_one_element(
training_set[greedy_indices[nn]],
basis_data[:nn],
integration,
)
proj_matrix[nn] = integration.dot(basis_data[nn], training_set)
if self.normalize:
errs = sq_errors(errs, proj_matrix[nn])
else:
errs, diff_training = sq_errors(
proj_matrix[nn],
basis_data[nn],
integration.dot,
diff_training,
)
next_index = np.argmax(errs)
greedy_errors[nn] = errs[next_index]
sigma = errs[next_index]
# print(nn, sigma, next_index)
# logger.debug(nn, "\t", sigma)
# Prune excess allocated entries
greedy_errors, proj_matrix = _prune(greedy_errors, proj_matrix, nn + 1)
if self.normalize:
# restore proj matrix
proj_matrix = norms * proj_matrix
# a datos de acá se los puede guardar solo cuando el nodo
# es una hoja del árbol.
node.basis = basis_data[: nn + 1]
node.indices = greedy_indices
node.idx_anchor_0 = node.indices[0]
if len(node.indices) > 1:
node.idx_anchor_1 = node.indices[1]
node.errors = greedy_errors
node.projection_matrix = proj_matrix.T
node.integration = integration
node.training_set = training_set
node.train_parameters = parameters
if (
deep < self.lmax
and self.greedy_tol < node.errors[-1]
and len(node.indices) > 1
):
idxs_subspace0, idxs_subspace1 = self.partition(
parameters, node.idx_anchor_0, node.idx_anchor_1
)
# agrego estos atributos para usarlos al testear
node.idxs_subspace0 = idxs_subspace0
node.idxs_subspace1 = idxs_subspace1
self._fit(
training_set[idxs_subspace0],
parameters[idxs_subspace0],
physical_points,
parent=node,
node_idx=0,
deep=deep + 1,
index_seed=0,
)
self._fit(
training_set[idxs_subspace1],
parameters[idxs_subspace1],
physical_points,
parent=node,
node_idx=1,
deep=deep + 1,
index_seed=0,
)
@property
def is_trained(self):
"""Return True only if the instance is trained, False otherwise.
Returns
-------
Bool
"""
return self._trained
[docs] def search_leaf(self, parameters, node):
"""Search Leaf.
This function finds the leaf node in a tree by recursively
evaluating each node using the select_child_node function.
If a node is a leaf, the node is returned, otherwise the
search continues on the selected child node.
Parameters
----------
parameters : np.ndarray
Set of parameters to search in the tree.
Returns
-------
node : np.ndarray
Set of nodes where the parameters are located in the tree.
"""
# parameters: conjunto de parametros que se quiere buscar sus
# respectivas bases reducidas.
if not node.is_leaf:
child = select_child_node(parameters, node)
return self.search_leaf(parameters, child)
else:
return node
[docs] def partition(self, parameters, idx_anchor_0, idx_anchor_1):
"""Partition the parameter space.
This code partitions a list of parameters into two subspaces
based on their distances to two anchors. The function calculates
the distances and returns two arrays of indices representing
the parameters in each subspace. For equal distances the function
make a random decision.
Parameters
----------
parameters : np.ndarray
array of parameter from the domain of problem
idx_anchor_0 : float, integer
first greedy parameter of the space to divide
idx_anchor_1 : float, integer
second greedy parameter of the space to divide
Returns
-------
np.ndarray
indices of parameter that correspond to each subspace
References
----------
[CerinoAndTiglio2023] An automated parameter domain decomposition
approach for gravitational wave surrogates using hp-greedy
refinement. Cerino, F. and Tiglio M. arXiv:2212.08554 (2023)
"""
anchor_0 = parameters[idx_anchor_0]
anchor_1 = parameters[idx_anchor_1]
assert not np.array_equal(anchor_0, anchor_1)
seed = 12345
rng = np.random.default_rng(seed)
# caso de arby con ts normalizado:
# la semilla es el primer elemento,
# por lo tanto, si quiero que los anchors vayan primero:
# #idxs_subspace0 = []
# #idxs_subspace1 = []
idxs_subspace_0 = [idx_anchor_0]
idxs_subspace_1 = [idx_anchor_1]
# y usar a continuación del for --> if idx != idx_anchor0
# and idx != idx_anchor1:
# sirve para el caso de usar normalize = True en
# reduced_basis()
# da error en splines porque scipy
# pide los parametros de forma ordenada
for idx, parameter in enumerate(parameters):
if idx != idx_anchor_0 and idx != idx_anchor_1:
dist_anchor_0 = np.linalg.norm(anchor_0 - parameter) # 2-norm
dist_anchor_1 = np.linalg.norm(anchor_1 - parameter)
if dist_anchor_0 < dist_anchor_1:
idxs_subspace_0.append(idx)
elif dist_anchor_0 > dist_anchor_1:
idxs_subspace_1.append(idx)
else:
# para distancias iguales se realiza
# una elección aleatoria.
# tener en cuenta que se puede agregar el
# parametro a ambos subespacios!
if rng.integers(2):
idxs_subspace_0.append(idx)
else:
idxs_subspace_1.append(idx)
return np.array(idxs_subspace_0), np.array(idxs_subspace_1)
def _prune(greedy_errors, proj_matrix, num):
"""Prune arrays to have size num."""
return greedy_errors[:num], proj_matrix[:num]
def _sq_errs_rel(errs, proj_vector):
"""Square of projection errors from precomputed projection coefficients.
This function takes advantage of an orthonormalized basis and a normalized
training set to compute fewer floating-point operations than in the
non-normalized case.
Parameters
----------
errs : numpy.array
Projection errors.
proj_vector : numpy.ndarray
Stores the projection coefficients of the training set onto the actual
basis element.
Returns
-------
proj_errors : numpy.ndarray
Squared projection errors.
"""
return np.subtract(errs, np.abs(proj_vector) ** 2)
def _sq_errs_abs(proj_vector, basis_element, dot_product, diff_training):
"""Square of projection errors from precomputed projection coefficients.
Since the training set is not a-priori normalized, this function computes
errors computing the squared norm of the difference between training set
and the approximation. This method trades accuracy by memory.
Parameters
----------
proj_vector : numpy.ndarray
Stores projection coefficients of training functions onto the actual
basis.
basis_element : numpy.ndarray
Actual basis element.
dot_product : arby.Integration.dot
Inherited dot product.
diff_training : numpy.ndarray
Difference between training set and projected set aiming to be
actualized.
Returns
-------
proj_errors : numpy.ndarray
Squared projection errors.
diff_training : numpy.ndarray
Actualized difference training set and projected set.
"""
diff_training = np.subtract(
diff_training, np.tensordot(proj_vector, basis_element, axes=0)
)
return np.real(dot_product(diff_training, diff_training)), diff_training
def _gs_one_element(h, basis, integration, max_iter=3):
"""Orthonormalize a function against an orthonormal basis."""
norm = integration.norm(h)
e = h / norm
for _ in range(max_iter):
for b in basis:
e -= b * integration.dot(b, e)
new_norm = integration.norm(e)
if new_norm / norm > 0.5:
break
norm = new_norm
else:
raise StopIteration("Max number of iterations reached ({max_iter}).")
return e / new_norm, new_norm
seed1 = 12345
rng1 = np.random.default_rng(seed1)
[docs]def select_child_node(parameter, node):
"""Select child node.
The function selects a child node from a binary tree structure,
given a parameter. The function calculates the distances
between the parameter and two anchors, stored in the node,
and returns the child node with the closest anchor. If the
distances are equal, a random choice is made.
Parameters
----------
parameter : np.ndarray
parameter to evaluate by the sustitute model in a subspace
node : np.ndarray
node where the parameter is located in the subspace
"""
# node : se da la raiz del arbol binario para realizar la evaluación.
# parameter : parámetro a evaluar por el modelo sustituto de un subespacio.
anchor_0 = node.train_parameters[node.idx_anchor_0]
anchor_1 = node.train_parameters[node.idx_anchor_1]
dist_anchor_0 = np.linalg.norm(anchor_0 - parameter) # 2-norm used.
dist_anchor_1 = np.linalg.norm(anchor_1 - parameter) # 2-norm used.
if dist_anchor_0 < dist_anchor_1:
if node.children[0].name[-1] == 0:
child = node.children[0]
else:
child = node.children[1]
elif dist_anchor_0 > dist_anchor_1:
# child = node.children[1]
if node.children[0].name[-1] == 1:
child = node.children[0]
else:
child = node.children[1]
else:
# para distancias iguales se realiza una elección aleatoria.
if rng1.integers(2):
child = node.children[0]
else:
child = node.children[1]
return child
[docs]def normalize_set(array, domain, rule="riemann"):
"""Normalize set.
Normalize a set of functions or arrays.
Parameters
----------
array : np.ndarray
Set of functions to normalize.
domain : np.ndarray
Physical domain of the set of functions.
rule : str, optional
Integration rule used to calculate the norm (default="riemann"),
by default "riemann"
Returns
-------
np.ndarray
Normalized set of functions.
"""
integration = integrals.Integration(domain, rule)
norms = integration.norm(array)
return np.array(
[
h if np.allclose(h, 0, atol=1e-15) else h / norms[i]
for i, h in enumerate(array)
]
)
[docs]def error(h1, h2, domain, rule="riemann"):
"""Error function.
The error is computed in the L2 norm (continuous case) or the 2-norm
(discrete case), that is, ||h1 - h2||^2.
Parameters
----------
h1 : np.ndarray
h2 : np.ndarray
domain : np.ndarray
rule : integration rule used to compute the error (default="riemann")
"""
integration = integrals.Integration(domain, rule)
diff = h1 - h2
return np.real(integration.dot(diff, diff))
def _validate_parameters(input_value):
"""Validate parameters.
This function validate the parameters
"""
if not isinstance(input_value, np.ndarray):
raise TypeError(
"The input parameters must be a numpy array."
"Got instead type {}".format(type(input_value))
)
def _validate_training_set(input_value):
"""Validate training set.
This function validate the parameters
"""
if not isinstance(input_value, np.ndarray):
raise TypeError(
"The input training_set must be a numpy array."
"Got instead type {}".format(type(input_value))
)
def _validate_physical_points(input_value):
"""Validate physical points.
This function validate the parameters
"""
if not isinstance(input_value, np.ndarray):
raise TypeError(
"The input physical_points must be a numpy array."
"Got instead type {}".format(type(input_value))
)