# Source code for romcomma.gpr.models

#  BSD 3-Clause License.
# 
#  Copyright (c) 2019-2024 Robert A. Milton. All rights reserved.
# 
#  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
# 
#  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
# 
#  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
# 
#  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this
#     software without specific prior written permission.
# 
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
#  THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
#  CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
#  PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
#  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
#  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

""" Contains the MOGP class implementing Gaussian Process Regression. """

from __future__ import annotations

import numpy as np
import pandas as pd

from romcomma.base.definitions import *
from romcomma.data.storage import Fold, Frame
from romcomma.base.classes import Data, Model
from romcomma.gpr.kernels import Kernel


class Likelihood(Model):

    class Data(Data):
        """ The Data set of a MOGP."""

        @classmethod
        @property
        def NamedTuple(cls) -> Type[NamedTuple]:
            """ The NamedTuple underpinning this Data set."""

            class Values(NamedTuple):
                """ The data set of a MOGP.

                Attributes:
                    variance (NP.Matrix): An (L,L), (1,L) or (1,1) noise variance matrix. (1,L) represents an (L,L) diagonal matrix.
                    log_marginal (NP.Matrix): A numpy [[float]] used to record the log marginal likelihood. This is an output parameter, not input.
                """
                variance: NP.Matrix = np.atleast_2d(0.02)
                log_marginal: NP.Matrix = np.atleast_2d(1.0)

            return Values

    @classmethod
    @property
    def META(cls) -> Dict[str, Any]:
        return {'variance': True, 'covariance': True}

    @classmethod
    @property
    def VARIANCE_FLOOR(cls) -> float:
        return 1.0001E-6

    @property
    def is_covariant(self) -> bool:
        return self._data.frames.variance.df.shape[0] > 1

    def calibrate(self, **kwargs) -> Dict[str, Any]:
        """ Merely sets the trainable data."""
        meta = self.META | kwargs
        if self.is_covariant:
            gf.set_trainable(self._parent._implementation[0].likelihood.variance._cholesky_diagonal, meta['variance'])
            gf.set_trainable(self._parent._implementation[0].likelihood.variance._cholesky_lower_triangle, meta['covariance'])
        else:
            for implementation in self._parent.implementation:
                gf.set_trainable(implementation.likelihood.variance, meta['variance'])
        return meta

    def __init__(self, parent: GPR, read_data: bool = False, **kwargs: NP.Matrix):
        super().__init__(parent.folder / 'likelihood', read_data, **kwargs)
        self._parent = parent
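
# Illustrative sketch (not part of the library): the ``variance`` attribute documented above accepts a
# (1,1), (1,L) or (L,L) matrix, where (1,L) stands for an (L,L) diagonal. Assuming L=3 outputs, the three
# admissible forms might be built as follows (the values are hypothetical):
#
#     iso_noise = np.atleast_2d(0.02)                        # (1,1): one noise variance shared by all outputs
#     diag_noise = np.array([[0.02, 0.05, 0.01]])            # (1,L): per-output variances, i.e. an (L,L) diagonal
#     full_noise = np.array([[0.020, 0.001, 0.000],
#                            [0.001, 0.050, 0.002],
#                            [0.000, 0.002, 0.010]])         # (L,L): full noise covariance between outputs
#
# Any of these could be passed as ``likelihood_variance`` when constructing a MOGP below.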
# noinspection PyPep8Naming
class GPR(Model):
    """ Interface to a Gaussian Process."""

    class Data(Data):
        """ The Data set of a MOGP."""

        @classmethod
        @property
        def NamedTuple(cls) -> Type[NamedTuple]:
            """ The NamedTuple underpinning this Data set."""

            class Values(NamedTuple):
                """ The data set of a MOGP.

                Attributes:
                    kernel (NP.Matrix): A numpy [[str]] identifying the type of Kernel, as returned by gp.kernel.TypeIdentifier().
                        This is never set externally. The kernel parameter, when provided, must be a ``[[Kernel.Data]]`` set storing the
                        desired kernel data. The kernel is constructed by inferring its type from the type of Kernel.Data.
                """
                kernel: NP.Matrix = np.atleast_2d(None)

            return Values
    @classmethod
    @property
    @abstractmethod
    def META(cls) -> Dict[str, Any]:
        """ Hyper-parameter optimizer meta."""

    @classmethod
    @property
    def KERNEL_FOLDER_NAME(cls) -> str:
        """ The name of the folder where kernel data are stored."""
        return "kernel"

    @property
    def fold(self) -> Fold:
        """ The parent fold."""
        return self._fold

    @property
    def test_csv(self) -> Path:
        return self._folder / 'test.csv'

    @property
    def test_summary_csv(self) -> Path:
        return self._folder / "test_summary.csv"

    @property
    def kernel(self) -> Kernel:
        return self._kernel

    @property
    def likelihood(self) -> Likelihood:
        return self._likelihood

    @property
    @abstractmethod
    def implementation(self) -> Tuple[Any, ...]:
        """ The implementation of this MOGP in GPFlow.
        If ``noise_variance.shape == (1,L)`` an L-tuple of kernels is returned.
        If ``noise_variance.shape == (L,L)`` a 1-tuple of multi-output kernels is returned.
        """

    @property
    def L(self) -> int:
        """ The output (Y) dimensionality."""
        return self._L

    @property
    def M(self) -> int:
        """ The input (X) dimensionality."""
        return self._M

    @property
    def N(self) -> int:
        """ The number of training samples."""
        return self._N

    @property
    @abstractmethod
    def X(self) -> Any:
        """ The implementation training inputs."""

    @property
    @abstractmethod
    def Y(self) -> Any:
        """ The implementation training outputs."""

    @property
    @abstractmethod
    def K_cho(self) -> Union[NP.Matrix, TF.Tensor]:
        """ The Cholesky decomposition of the LNxLN noisy kernel(X, X) + likelihood.variance.
        Shape is (LN, LN) if self.kernel.is_covariant, else (L,N,N)."""

    @property
    @abstractmethod
    def K_inv_Y(self) -> Union[NP.Matrix, TF.Tensor]:
        """ The LN-Vector which, pre-multiplied by the LoxLN kernel k(x, X), gives the Lo-Vector predictive mean f(x).
        Shape is (L,1,N).

        Returns: ChoSolve(self.K_cho, self.Y)
        """

    @abstractmethod
    def calibrate(self, **kwargs) -> Dict[str, Any]:
        raise NotImplementedError
    @abstractmethod
    def predict(self, x: NP.Matrix, y_instead_of_f: bool = True) -> Tuple[NP.Matrix, NP.Matrix]:
        """ Predicts the response to input x.

        Args:
            x: An (o, M) design Matrix of inputs.
            y_instead_of_f: True to include noise in the variance of the result.
        Returns: The distribution of y or f, as a pair (mean (o, L) Matrix, std (o, L) Matrix).
        """
    def predict_df(self, x: NP.Matrix, y_instead_of_f: bool = True, is_normalized: bool = True) -> pd.DataFrame:
        """ Predicts the response to input x.

        Args:
            x: An (o, M) design Matrix of inputs.
            y_instead_of_f: True to include noise in the variance of the result.
            is_normalized: Whether the results are normalized or not.
        Returns: The distribution of y or f, as a dataframe with M+L+L columns of the form (X, Mean, Predictive Std).
        """
        X_heading, Y_heading = self._fold.meta['data']['X_heading'], self._fold.meta['data']['Y_heading']
        prediction = self.predict(x, y_instead_of_f)
        result = pd.DataFrame(np.concatenate([x, prediction[0]], axis=1), columns=self._fold.test_data.df.columns)
        predictive_std = result.loc[:, [Y_heading]].copy()
        predictive_std.iloc[:] = prediction[1]
        if not is_normalized:
            result = self._fold.normalization.undo_from(result)
            predictive_std = self._fold.normalization.unscale_Y(predictive_std)
        result = result.rename(columns={Y_heading: 'Mean'}, level=0)
        predictive_std = predictive_std.rename(columns={Y_heading: 'SD'}, level=0)
        result = result.join([predictive_std])
        return result
    @abstractmethod
    def predict_gradient(self, x: NP.Matrix, y_instead_of_f: bool = True) -> Tuple[TF.Tensor, TF.Tensor]:
        """ Predicts the gradient GP dy/dx (or df/dx) where ``self`` is the GP for y(x).

        Args:
            x: An (o, M) design Matrix of inputs.
            y_instead_of_f: True to include noise in the variance of the result.
        Returns: The distribution of dy/dx or df/dx, as a pair (mean (o, L, M), cov (o, L, M, O, l, m)) if ``self.likelihood.is_covariant``,
            else (mean (o, L, M), cov (o, O, L, M)).
        """
    def test(self) -> Frame:
        """ Tests the MOGP on the test data in self._fold.test_data. Test results comprise three values for each output at each sample:
        the mean prediction, the std error of prediction, and the Z score of prediction (i.e. the error of prediction scaled by the
        std error of prediction).

        Returns: The test_data results as a Frame backed by MOGP.test_csv.
        """
        result = Frame(self.test_csv, self._fold.test_data.df)
        Y_heading = self._fold.meta['data']['Y_heading']
        prediction = self.predict(self._fold.test_x.values)
        predictive_mean = result.df.loc[:, [Y_heading]].copy().rename(columns={Y_heading: 'Mean'}, level=0)
        predictive_mean.iloc[:] = prediction[0]
        predictive_std = result.df.loc[:, [Y_heading]].copy().rename(columns={Y_heading: 'SD'}, level=0)
        predictive_std.iloc[:] = prediction[1]
        # Z score = (observed - predicted mean) / predictive SD, computed per output per sample.
        predictive_score = result.df.loc[:, [Y_heading]].copy().rename(columns={Y_heading: 'Z Score'}, level=0)
        predictive_score.iloc[:] -= predictive_mean.to_numpy(dtype=float, copy=False)
        abs_err = result.df.loc[:, [Y_heading]].copy().rename(columns={Y_heading: 'Abs Error'}, level=0)
        abs_err.iloc[:] -= predictive_mean.to_numpy(dtype=float, copy=False)
        abs_err = abs(abs_err)
        rmse = abs_err.iloc[:].copy().rename(columns={'Abs Error': 'RMSE'}, level=0)
        predictive_score.iloc[:] /= predictive_std.to_numpy(dtype=float, copy=False)
        result.df = result.df.join([predictive_mean, predictive_std, abs_err, predictive_score])
        result.write()
        # Summary statistics: MSE averaged over samples, R^2 = 1 - MSE (outputs are assumed normalized), RMSE = sqrt(MSE).
        rmse = rmse**2
        rmse = rmse.sum(axis=0)/rmse.count(axis=0)
        r2 = 1 - rmse
        rmse = rmse**(1/2)
        rmse = rmse if isinstance(rmse, pd.DataFrame) else pd.DataFrame(rmse).transpose()
        r2 = r2 if isinstance(r2, pd.DataFrame) else pd.DataFrame(r2).transpose()
        r2 = r2.rename(columns={'RMSE': 'R^2'}, level=0)
        predictive_std = predictive_std.sum(axis=0)/predictive_std.count(axis=0)
        predictive_std = predictive_std if isinstance(predictive_std, pd.DataFrame) else pd.DataFrame(predictive_std).transpose()
        # 95% CI is reported as twice the mean predictive SD; outliers are the fraction of samples with |Z| > 2.
        ci = predictive_std.iloc[:].copy().rename(columns={'SD': '95% CI'}, level=0)
        ci = ci * 2
        outliers = predictive_score[predictive_score**2 > 4].count(axis=0)/predictive_score.count(axis=0)
        outliers = outliers if isinstance(outliers, pd.DataFrame) else pd.DataFrame(outliers).transpose()
        outliers = outliers.rename(columns={'Z Score': 'outliers'})
        summary = rmse.join([r2, predictive_std, ci, outliers])
        summary = Frame(self.test_summary_csv, summary)
        return result
    def broadcast_parameters(self, is_covariant: bool, is_isotropic: bool) -> GPR:
        """ Broadcast the data of the MOGP (including kernels) to higher dimensions.
        Shrinkage raises errors, unchanged dimensions silently do nothing.

        Args:
            is_covariant: Whether the outputs will be treated as dependent.
            is_isotropic: Whether to restrict the kernel to be isotropic.
        Returns: ``self``, for chaining calls.
        """
        target_shape = (self._L, self._L) if is_covariant else (1, self._L)
        self._likelihood.data.frames.variance.broadcast_value(target_shape=target_shape, is_diagonal=True)
        self._kernel.broadcast_parameters(variance_shape=target_shape, M=1 if is_isotropic else self._M)
        self._implementation = None
        self._implementation = self.implementation
        return self
    def __init__(self, name: str, fold: Fold, is_read: bool | None, is_covariant: bool, is_isotropic: bool,
                 kernel_parameters: Kernel.Data | None = None, likelihood_variance: NP.Matrix | None = None):
        """ Sets up data and checks dimensions.

        Args:
            name: The name of this MOGP.
            fold: The Fold housing this MOGP.
            is_read: If True, MOGP.kernel.data and MOGP.data are read from ``fold.folder/name``, otherwise defaults are used.
            is_covariant: Whether the outputs will be treated as dependent.
            is_isotropic: Whether to restrict the kernel to be isotropic.
            kernel_parameters: A Kernel.Data to use for MOGP.kernel.data. If not None, this replaces the kernel specified by file/defaults.
                If None, the kernel is read from file, or set to the default Kernel.Data(), according to ``is_read``.
            likelihood_variance: The likelihood variance to use instead of file or default.
        Raises:
            IndexError: If a parameter is mis-shaped.
        """
        self._fold = fold
        self._X, self._Y = self._fold.X.to_numpy(dtype=FLOAT(), copy=True), self._fold.Y.to_numpy(dtype=FLOAT(), copy=True)
        self._N, self._M, self._L = self._fold.N, self._fold.M, self._fold.L
        super().__init__(self._fold.folder / name, is_read)
        self._likelihood = Likelihood(self, is_read) if likelihood_variance is None else Likelihood(self, is_read, variance=likelihood_variance)
        if is_read and kernel_parameters is None:
            KernelType = Kernel.TypeFromIdentifier(self.data.frames.kernel.np[0, 0])
            self._kernel = KernelType(self._folder / self.KERNEL_FOLDER_NAME, is_read)
        else:
            if kernel_parameters is None:
                kernel_parameters = Kernel.Data(self._folder / self.KERNEL_FOLDER_NAME)
            KernelType = Kernel.TypeFromParameters(kernel_parameters)
            self._kernel = KernelType(self._folder / self.KERNEL_FOLDER_NAME, is_read, **kernel_parameters.asdict())
            self._data.replace(kernel=np.atleast_2d(KernelType.TYPE_IDENTIFIER))
        self.broadcast_parameters(is_covariant, is_isotropic)
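
# Illustrative sketch (not part of the library): for a single output, the K_cho / K_inv_Y conventions
# documented above reduce to the textbook GP predictive mean. Assuming a plain numpy kernel function ``k``
# (a hypothetical stand-in for the GPFlow kernels used below), training data (X, Y) and noise variance ``s2``:
#
#     import scipy.linalg as la
#     K_cho = np.linalg.cholesky(k(X, X) + s2 * np.eye(X.shape[0]))    # Cholesky of the noisy kernel, cf. K_cho
#     K_inv_Y = la.cho_solve((K_cho, True), Y)                         # ChoSolve(K_cho, Y), cf. K_inv_Y
#     mean = k(x, X) @ K_inv_Y                                         # predictive mean f(x) = k(x, X) K_inv_Y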
# noinspection PyPep8Naming
class MOGP(GPR):
    """ Implementation of a Gaussian Process."""

    @classmethod
    @property
    def META(cls) -> Dict[str, Any]:
        return {'maxiter': 5000, 'gtol': 1E-16}

    @property
    def implementation(self) -> Tuple[Any, ...]:
        if self._implementation is None:
            if self._likelihood.is_covariant:
                self._implementation = tuple(mf.models.MOGPR(data=(self._X, self._Y), kernel=kernel, mean_function=None,
                                                             noise_variance=self._likelihood._data.frames.variance.np)
                                             for kernel in self._kernel.implementation)
            else:
                self._implementation = tuple(gf.models.GPR(data=(self._X, self._Y[:, [l]]), kernel=kernel, mean_function=None,
                                                           noise_variance=max(self._likelihood._data.frames.variance.np[0, l],
                                                                              self._likelihood.VARIANCE_FLOOR))
                                             for l, kernel in enumerate(self._kernel.implementation))
        return self._implementation
    def calibrate(self, method: str = 'L-BFGS-B', **kwargs) -> Dict[str, Any]:
        """ Optimize the MOGP hyper-data.

        Args:
            method: The optimization algorithm (see https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html).
            kwargs: A Dict of implementation-dependent optimizer meta, following the format of GPR.META.
                Options for the kernel should be passed as kernel={see kernel.META for format}.
                Options for the likelihood should be passed as likelihood={see likelihood.META for format}.
        """
        meta = self.read_meta() if self._meta_json.exists() else self.META
        kernel_options = self._kernel.calibrate(**(meta.pop('kernel', {}) | kwargs.pop('kernel', {})))
        likelihood_options = self._likelihood.calibrate(**(meta.pop('likelihood', {}) | kwargs.pop('likelihood', {})))
        meta.update(kwargs)
        meta.pop('result', None)
        opt = gf.optimizers.Scipy()
        meta.update({'result': str(tuple(opt.minimize(closure=gp.training_loss, variables=gp.trainable_variables, method=method, options=meta)
                                         for gp in self._implementation)),
                     'kernel': kernel_options, 'likelihood': likelihood_options})
        self.write_meta(meta)
        if self._likelihood.is_covariant:
            self._likelihood.parameters = self.likelihood.data.replace(variance=self._implementation[0].likelihood.variance.value.numpy(),
                                                                       log_marginal=self._implementation[0].log_marginal_likelihood().numpy())
            self._kernel.parameters = self.kernel.data.replace(variance=self._implementation[0].kernel.variance.value.numpy(),
                                                               lengthscales=tf.squeeze(self._implementation[0].kernel.lengthscales))
        else:
            self._likelihood.parameters = self._likelihood.data.replace(variance=tuple(gp.likelihood.variance.numpy() for gp in self._implementation),
                                                                        log_marginal=tuple(gp.log_marginal_likelihood() for gp in self._implementation))
            self._kernel.parameters = self._kernel.data.replace(variance=tuple(gp.kernel.variance.numpy() for gp in self._implementation),
                                                                lengthscales=tuple(gp.kernel.lengthscales.numpy() for gp in self._implementation))
        return meta
    def predict(self, X: NP.Matrix, y_instead_of_f: bool = True) -> Tuple[NP.Matrix, NP.Matrix]:
        X = X.astype(dtype=FLOAT())
        if self._likelihood.is_covariant:
            gp = self.implementation[0]
            results = gp.predict_y(X) if y_instead_of_f else gp.predict_f(X)
        else:
            results = tuple(gp.predict_y(X) if y_instead_of_f else gp.predict_f(X) for gp in self._implementation)
            results = tuple(np.transpose(result) for result in zip(*results))
            results = tuple(results[i][0] for i in range(len(results)))
        return np.atleast_2d(results[0]), np.atleast_2d(np.sqrt(results[1]))
    def predict_gradient(self, x: NP.Matrix, y_instead_of_f: bool = True) -> Tuple[TF.Tensor, TF.Tensor]:
        x = tf.Variable(x.astype(dtype=FLOAT()))
        Lambda = tf.broadcast_to(1.0 / tf.constant(self.kernel.data.frames.lengthscales.np, dtype=FLOAT()), [x.shape[0], self.L, self.M])
        with tf.GradientTape() as tape:

            @tf.function
            def _KXx(x: tf.Variable) -> TF.Tensor:
                if self._likelihood.is_covariant:
                    return tf.reshape(self._implementation[0].kernel(self.X, x), [self._L, self._N, self._L, x.shape[0]])
                else:
                    return tf.stack([gp.kernel(self.X, x) for gp in self._implementation], axis=0)

            KXx = _KXx(x)
        dxKXx = tape.jacobian(KXx, x)
        if self._likelihood.is_covariant:
            dxKXx = tf.einsum('LNlooM -> LNloM', dxKXx)
            mean = tf.einsum('LNloM, LiN -> olM', dxKXx, self.K_inv_Y)
            dxKXx = tf.reshape(dxKXx, [self._L * self._N, self._L * x.shape[0] * self._M])
            var = tf.reshape(tf.linalg.triangular_solve(self.K_cho, dxKXx, lower=True), [self._L, self._N, self._L, x.shape[0], self._M])
            var = -tf.einsum('LNlOM, LNlom -> OLolMm', var, var)
            ddxxkxx = tf.einsum('OLM, olM, LOlo -> OLolM', Lambda, Lambda,
                                tf.reshape(self._implementation[0].kernel(x), [self._L, x.shape[0], self._L, x.shape[0]]))
        else:
            dxKXx = tf.einsum('LNooM -> LNoM', dxKXx)
            mean = tf.einsum('lNoM, liN -> olM', dxKXx, self.K_inv_Y)
            dxKXx = tf.reshape(dxKXx, [self._L, self._N, x.shape[0] * self._M])
            var = tf.reshape(tf.linalg.triangular_solve(self.K_cho, dxKXx, lower=True), [self._L, self._N, x.shape[0], self._M])
            var = -tf.einsum('LNOM, LNom -> OoLMm', var, var)
            ddxxkxx = tf.einsum('OLM, oLM, LOo -> OoLM', Lambda, Lambda, tf.stack([gp.kernel(x) for gp in self._implementation], axis=0))
        var = tf.linalg.set_diag(var, tf.linalg.diag_part(var) + ddxxkxx)
        return mean, var
    @property
    def X(self) -> TF.Matrix:
        """ The implementation training inputs as an (N,M) design matrix."""
        return self._implementation[0].data[0]

    @property
    def Y(self) -> TF.Matrix:
        """ The implementation training outputs as an (N,L) design matrix."""
        return self._implementation[0].data[1] if self._likelihood.is_covariant else tf.concat([gp.data[1] for gp in self._implementation], axis=1)

    @property
    def K_cho(self) -> TF.Tensor:
        if self._likelihood.is_covariant:
            gp = self._implementation[0]
            result = gp.likelihood.add_to(gp.KXX)
        else:
            result = []
            for gp in self._implementation:
                K = gp.kernel(self.X)
                K_diag = tf.linalg.diag_part(K)
                result.append(tf.linalg.set_diag(K, K_diag + tf.fill(tf.shape(K_diag), gp.likelihood.variance)))
            result = tf.stack(result)
        return tf.linalg.cholesky(result)

    @property
    def K_inv_Y(self) -> TF.Tensor:
        Y = tf.reshape(tf.transpose(self.Y), [-1, 1]) if self._likelihood.is_covariant else tf.transpose(self.Y)[..., tf.newaxis]
        return tf.reshape(tf.linalg.cholesky_solve(self.K_cho, Y), [self._L, 1, self._N])
    def check_K_inv_Y(self, x: NP.Matrix) -> NP.Matrix:
        """ FOR TESTING PURPOSES ONLY. Should return a zero vector (to within numerical error tolerance).

        Args:
            x: An (o, M) matrix of inputs.
        Returns: Should return zeros((L,)) (to within numerical error tolerance).
        """
        predicted = self.predict(x)[0]
        o = predicted.shape[0]
        if self._likelihood.is_covariant:
            kernel = tf.reshape(self._implementation[0].kernel(x, self.X), [self._L, o, self._L, self._N])
            ein = 'loLN, LiN -> ol'
        else:
            kernel = tf.stack([gp.kernel(x, self.X) for gp in self._implementation], axis=0)
            ein = 'loN, liN -> ol'
        result = tf.einsum(ein, kernel, self.K_inv_Y)
        result -= predicted
        return tf.sqrt(tf.reduce_sum(result * result, axis=0)/o)
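
# Illustrative usage sketch (not part of the library). Assuming a Fold already exists on disk (construction via
# romcomma.data.storage is not shown), a MOGP might be built, calibrated and tested roughly as follows; the model
# name 'gpr' and the keyword values are hypothetical:
#
#     fold = ...                                            # a romcomma.data.storage.Fold
#     gp = MOGP('gpr', fold, is_read=False, is_covariant=False, is_isotropic=False)
#     gp.calibrate()                                        # optimize kernel and likelihood hyper-parameters
#     gp.test()                                             # writes test.csv and test_summary.csv in the model folder
#     mean_and_sd = gp.predict_df(fold.test_x.values)       # (X, Mean, SD) DataFrame of predictions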