# BSD 3-Clause License.
#
# Copyright (c) 2019-2024 Robert A. Milton. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
""" **Functionality for sampling and Design of Experiments (DOE)** """
from __future__ import annotations
import numpy as np
from romcomma.base.definitions import *
import scipy.stats
from romcomma.data.storage import Frame, Repository, Fold
from romcomma.user import functions
import shutil
import sys
[docs]
def permute_axes(new_order: Sequence | None) -> NP.Matrix | None:
""" Provide a rotation matrix which reorders axes. Most use cases are to re-order input axes according to GSA.
Args:
new_order: A Tuple or List containing a permutation of ``[0,...,M-1]``, for passing to ``np.transpose``.
Returns: A rotation matrix which will reorder the axes to new_order. Returns ``None`` if ``new_order is None``.
"""
return None if new_order is None else np.eye(len(new_order))[new_order, :]
[docs]
class DOE:
""" Sampling methods for inputs."""
Method: Type = Callable[[int, int, Any], NP.Matrix] #: Function signature of a DOE method.
[docs]
@staticmethod
def latin_hypercube(N: int, M: int, is_centered: bool = True, **kwargs):
""" Latin Hypercube DOE.
Args:
N: The number of samples (rows).
M: The of input dimensions (columns).
is_centered: Boolean ordinate whether to centre each sample in its Latin Hypercube cell.
Default is False, which locates the sample randomly within its cell.
kwargs: Passed directly to
`scipy.stats.qmc.LatinHypercube <https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.qmc.LatinHypercube.html>`_.
Returns: An (N,M) matrix of N samples of dimension M.
"""
return scipy.stats.qmc.LatinHypercube(M, scramble=not is_centered).random(N)
[docs]
@staticmethod
def full_factorial(N: int, M: int):
""" Full factorial DOE.
Args:
N: The number of samples (rows).
M: The of input dimensions (columns).
Returns: An (N,M) matrix of N samples of dimension M.
"""
NM = N // M
N1 = N - M * NM
return np.concatenate([1/(2 * N1) + np.linspace(0, 1, N1, False),] +
(M-1) * [1/(2 * NM) + np.linspace(0, 1, NM, False), ], axis=1)
[docs]
@staticmethod
def space_filling_test(X: NP.Matrix, o: int) -> Dict[str, float]:
""" Test whether ``X`` is a space-filling design matrix, by finding the distance to the nearest point in ``X`` for ``o`` test points.
Args:
X: An (N,M) design matrix.
o: The number of test points used to assess whether ``X`` is a space-filling design.
Returns: A dict of six measures: The theoretical hard upper bound, expected upper bound and expected lower bound for a perfectly space-filling
design matrix, followed by the max, mean and SD of the distance-to-nearest-in-X over the o test points.
"""
N, M = X.shape
test = DOE.latin_hypercube(o, M)
distance = test[:, np.newaxis, :] - X[np.newaxis, :, :]
distance = np.sqrt(np.amin(np.einsum('iIM, iIM -> iI', distance, distance), axis=1))
cell_diag = np.power(N, -1/M) * np.sqrt(M)
return {'perfect hard upper bound': cell_diag, 'perfect expected upper bound': cell_diag / np.sqrt(6), 'perfect expected lower bound': cell_diag / 3,
'max': np.amax(distance, axis=0), 'mean': np.mean(distance), 'SD': np.std(distance)}
[docs]
class GaussianNoise:
""" Sample multivariate, zero-mean Gaussian noise. """
[docs]
class Variance:
""" An artificially generated (co)variance matrix for GaussianNoise, with a useful labelling scheme."""
@property
def matrix(self) -> NP.Matrix:
""" Variance as an (L,L) covariance matrix, suitable for constructing GaussianNoise."""
return self._matrix
@property
def meta(self) -> Dict[str, Any]:
""" Meta data for providing to ``data.storage``. This matches the initials in ``self.__format__()``."""
return {'generator': 'determined' if self.is_determined else 'undetermined',
'is_covariant': 'covariance' if self.is_covariant else 'variance', 'magnitude': self.magnitude}
def __call__(self) -> NP.Matrix:
"""
Returns: Variance as an (L,L) covariance matrix, suitable for constructing GaussianNoise.
The constructor generates the matrix (perhaps stochastically), so repeated calls to any methods produce identical Variance.
"""
return self.matrix
def __format__(self, format_spec: Any) -> str:
""" The label for this Variance, to help name samples informatively. The description is ``d.`` (determined) or ``u.`` (undetermined),
followed by ``v.`` (diagonal variance) or ``c.`` (non-diagonal covariance), followed by ``100 * self.magnitude:.2f``."""
return f'{"d." if self.is_determined else "u."}{"c." if self.is_covariant else "v."}{100 * self.magnitude:.2f}'
def __init__(self, L: int, magnitude: float, is_covariant: bool = False, is_determined: bool = True):
""" Instantiate an (L,L) GaussianNoise (co)variance matrix.
Args:
L: Output dimensionality.
magnitude: The StdDev of noise.
is_covariant: True to create a diagonal variance matrix.
is_determined: False to create a random symmetric matrix.
"""
self.magnitude, self.is_covariant, self.is_determined = magnitude, is_covariant, is_determined
if self.is_determined:
self._matrix = 2 * np.random.random_sample((L, L)) - np.ones((L, L))
self._matrix = np.matmul(self._matrix, self._matrix.transpose())
self._matrix /= np.trace(self._matrix) / L
else:
self._matrix = np.array([[(-1)**(i-j)/(1.0 + abs(i-j)) for i in range(L)] for j in range(L)])
if not self.is_covariant:
self._matrix = np.diag(np.diag(self._matrix))
self._matrix *= self.magnitude ** 2
@property
def variance(self) -> NP.Matrix:
return self._variance
def __call__(self, repo: Repository | None = None) -> NP.Matrix:
""" Generate N samples of L-dimensional Gaussian noise, sampled from :math:`N[0,self.variance]`.
The constructor generates the sample, so repeated calls to any method always refer to the same GaussianNoise.
Args:
repo: An optional Repository which will have GaussianNoise added to Y in data.csv.
Returns: An (N,L) noise matrix, where (L,L) is the shape of `self._variance`.
"""
if repo is not None:
repo.data.df.iloc[:, :] = np.concatenate((repo.X, repo.Y + self._rvs), axis=1)
repo.data.write()
return self._rvs
[docs]
def __init__(self, N: int, variance: NP.MatrixLike):
""" Generate N samples of L-dimensional Gaussian noise, sampled from :math:`\mathsf{N}[0,variance]`.
Args:
N: Number of samples (rows).
variance: (L,L) covariance matrix for homoskedastic noise.
"""
self._variance = np.atleast_2d(variance)
if len(self._variance.shape) == 2 and self._variance.shape[0] == 1:
self._variance = np.diagflat(self._variance)
elif self._variance.shape[0] != self._variance.shape[1] or len(self._variance.shape) > 2:
raise IndexError(f'variance.shape = {self._variance.shape} should be (L,) or (L,L).')
self._rvs = scipy.stats.multivariate_normal.rvs(mean=None, cov=self._variance, size=N)
self._rvs.shape = (N, self._variance.shape[1])
[docs]
class Function:
""" Sample a ``user.function.Vector``."""
@property
def repo(self) -> Repository:
""" The Repository containing the Function sample."""
return self._repo
[docs]
def collection(self, sub_folder: Union[Path, str]) -> Dict[str, Any]:
""" Construct a Dict for user.results.Collect, with appropriate ``extra_columns``.
Args:
folder: The folder under ``self.repo.folder`` housing the csvs to collect.
Returns: The Dict for ``self.repo``.
"""
return {'folder': self._repo.folder / sub_folder, 'N': self._N, 'noise': self._noise_variance.magnitude}
[docs]
def un_rotate_folds(self) -> Function:
""" Create an un-rotated Fold in the Repository, with index ``K+1``."""
shutil.copytree(self._repo.fold_folder(self._repo.K), self._repo.fold_folder(self._repo.K + 1))
fold = Fold(self._repo, self._repo.K + 1)
fold.X_rotation = np.transpose(fold.X_rotation)
Frame(fold.test_csv, fold.normalization.undo_from(fold.test_data.df))
fold = Fold(self._repo, self._repo.K)
Frame(self._repo.folder / 'undo_from.csv', fold.normalization.undo_from(fold.test_data.df))
return self
def _construct(self, folder: Path | str, X: NP.Matrix, function_vector: functions.Vector, noise: NP.Matrix, origin_meta: Dict[str, Any]) -> Repository:
""" Construct Repository housing the sample design matrix ``(X, f(X) + noise)``.
Args:
folder: The Repository folder.
X: An (N,M) design matrix of inputs.
function_vector: An (L,) function.Vector.
noise: An (N,L) design matrix of noise.
origin_meta: A Dict of meta specifying the origin of the sample.
Returns: The ``(X, f(X) + noise)`` sample design matrix Repository, before folding or rotating.
"""
Y = function_vector(X)
std = np.reshape(np.std(Y, axis=0), (1, -1))
Y += std * noise
columns = [('X', f'X.{i:d}') for i in range(X.shape[1])] + [('Y', f'Y.{i:d}') for i in range(Y.shape[1])]
df = pd.DataFrame(np.concatenate((X, Y), axis=1), columns=pd.MultiIndex.from_tuples(columns), dtype=float)
return Repository.from_df(folder=folder, df=df, meta={'origin': origin_meta})
[docs]
def __init__(self, root: Path | str, doe: DOE.Method, function_vector: functions.Vector, N: int, M: int, noise_variance: GaussianNoise.Variance,
ext: str | None = None, overwrite_existing: bool = False, **kwargs: Any):
""" Construct a Repository by sampling a function over a DOE.
Args:
root: The folder under which the Repository will sit.
doe: An experimental design for the sample inputs.
function_vector: A vector function.
N: The number of samples (rows) in the sample.
M: The input dimensionality (columns).
noise_magnitude: The (L,L) homoskedastic ``GaussianNoise.Variance``.
ext: Unless None, the repo name is suffixed by ``.[ext]``.
overwrite_existing: Whether to overwrite an existing Repository.
**kwargs: Options passed straight to doe.
"""
self._N, self._noise_variance = N, noise_variance
folder = Path(root) / f'{function_vector.name}.M.{M:d}.{self._noise_variance}.N.{N:d}{"" if ext is None else "." + ext}'
if folder.is_dir() and not overwrite_existing:
self._repo = Repository(folder)
else:
self._repo = self._construct(folder=folder, X=doe(N, M, **kwargs), function_vector=function_vector,
noise=GaussianNoise(N, self._noise_variance())(repo=None),
origin_meta={'DOE': doe.__name__, 'function_vector': function_vector.meta, 'noise': self._noise_variance.meta})
Frame(folder / 'likelihood.variance.csv', pd.DataFrame(self._noise_variance()))
if __name__ == '__main__':
if len(sys.argv) < 4:
print('Please provide at least 3 arguments: The folder to write to, the number of input dimensions (columns) M, and at least one value for the number '
'of samples (rows) N.')
else:
root = Path(sys.argv[1])
M = int(sys.argv[2])
for N in sys.argv[3:]:
N = int(N)
pd.DataFrame(DOE.latin_hypercube(N, M)).to_csv(root / f'lhs.{N}.csv')