From 027bb45d494800b0735015603b52ec4bb05b641b Mon Sep 17 00:00:00 2001
From: Anton Vladyka <anvlad@utu.fi>
Date: Tue, 19 Dec 2023 14:29:52 +0200
Subject: [PATCH] files renamed

---
 eca.py       | 656 ++++++++++++++++++++++++---------------------------
 eca_numpy.py | 383 ++++++++++++++++++++++++++++++
 ecat.py      | 355 ----------------------------
 3 files changed, 697 insertions(+), 697 deletions(-)
 create mode 100644 eca_numpy.py
 delete mode 100644 ecat.py

diff --git a/eca.py b/eca.py
index 0fe0cff..8fca043 100644
--- a/eca.py
+++ b/eca.py
@@ -1,383 +1,355 @@
-"""
-This class implements Emulator-based component analysis (ECA), as described in
-J. Niskanen, A. Vladyka, J. Niemi, and C. J. Sahle, "Emulator-based decomposition for structural sensitivity
-of core-level spectra," Royal Society Open Science, vol. 9, 2022, doi: 10.1098/rsos.220093
-Flat implementation is available as Supplementary to this paper.
+import torch
+from torch import nn
+import torch.optim as optim
+from torch.utils.data import TensorDataset, DataLoader
+from tqdm import trange
 
-The key idea is for a given predictor X -> Y (originally, structure-to-spectrum), we find the dominant directions
-(a basis set V) in the X space, such that the predictions Y_pred for the points fromX, being projected onto V basis,
-cover the majority of the known variance of Y. In the original work, we deduced the structural features of molecular
-system which describe the spectral variations.
-"""
-import numpy as np
-import pickle  # for saving|loading the model
-from scipy import optimize
-from random import shuffle
-from sklearn.model_selection import train_test_split  # for validation fit
-
-
-class ECA:
-    """
-    Usage
-    -----
-    from eca import ECA
-    eca = ECA(model.predict_t)
-    v, yvar, xvar = eca.fit(data_x, data_y, n_comp=2, tol=5E-6, runs=1, validate=0.2,
-                            partial={'batch_size':80, 'tol': 2E-6, 'shuffle': True, 'runs': 1})
-
-    Jargon
-    ------
-    x : data_x [N x L]
-    y : data_y [N x M]
-    v : ECA components up to rank K, K x L
-    t : ECA scores, N x K
-    w : projections, N x L, w = t @ v
+class ECA(nn.Module):
+    """ECA approach implemented in pytorch
+    Requires a torch-based emulator to predict X -> Y
+    Example:
+    # test_x_t and test_y_t are tensors (test X and Y data, respectively)
+    # convert numpy arrays to torch.tensors via e.g.
+    test_x_t, test_y_t = map(lambda x: torch.tensor(x, dtype=torch.float32), [test_x, test_y])
+    eca = ECA(model)  # initialize with a trained predictor (for pytorch the model itself is a predictor,
+    #                   i.e., Y = model(X) <=> Y = model.forward(X))
+    options = {
+        'tol' : 1E-4,
+        'epochs': 10000,
+        'lr' : 1E-3,
+        'batch_size': 200,
+        'seed': 123,
+    }
+    v, y_var, x_var = eca.fit(test_x_t, test_y_t, n_comp=3, options=options)
+    # check orthonormality
+    # must be the identity matrix
+    print(v @ v.T)
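+    # the fitted eca can also be used in the opposite direction (a sketch,
+    # assuming the fit above has finished): for given spectra y, the closest
+    # t-scores and their X-space images are estimated via
+    t_scores = eca.inverse(test_y_t)
+    x_approx = eca.reconstruct(test_y_t)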
     """
-    DEFAULT_CONSTRAINT_TOLERANCE = 1E-10
-    DEFAULT_MAXITER = 10**6
-    DEFAULT_RUNS = 5
-    DEFAULT_BATCH_SIZE = 50
-    DEFAULT_TOLERANCE_PARTIAL = 1E-5
+    # default options
+    DEFAULT_LR = 1E-3
+    DEFAULT_TOL = 1E-4
+    DEFAULT_EPOCHS = 10000
+    DEFAULT_BETAS = (0.9, 0.999)
+    DEFAULT_BATCH_SIZE = 200
+    DEFAULT_LR_INV = 5E-2
+    DEFAULT_TOL_INV = 1E-4
+    DEFAULT_EPOCHS_INV = 1000
 
-    def __init__(self, predict):
+    def __init__(self, predict: nn.Module):
         """
         Parameters
-        -------------
-        predict: func
-            predictor data_x -> data_y
+        -----------
+        predict: nn.Module or func X -> Y
+            Predictor, torch-based
         """
+        super().__init__()
         self.predict = predict
-        self._comp = []
-        self._x_var = []
-        self._y_var = []
-        self.n = 0
+        self.V = None  # calculated components
+        self._v = None  # currently evaluated component
+        self.u = None
+        self.y_var: list = []  # covered variance for Y
+        self.x_var: list = []  # covered variance for X
+        self.training_losses: list = []
+        self.seed = None
 
-    @property
-    def vectors(self):
-        """Returns ECA component vectors
-        """
-        return np.array(self._comp)
+    def set_seed(self, seed: int) -> None:
+        """ Set seed for torch.manual_seed
+        Affects the initial guess of the vector (via xavier_normal_) and the batches while training """
+        self.seed = seed
 
-    @staticmethod
-    def missing_variance(known: np.array, predicted: np.array):
-        """Metric to define the spectral coverage
-        """
-        known_mean = np.outer(np.ones(known.shape[0]), np.mean(known, axis=0))
-        return (np.trace(np.matmul((known - predicted).T, (known - predicted))) /
-                np.trace(np.matmul((known - known_mean).T, known - known_mean)))
-
-    def project_x(self, x):
-        """ Returns projection of the data_x on the given vector during fit
-        """
-        y = np.array(x)[None, :]
-        return (self.data_x @ y.T) @ y / (y ** 2).sum()
+    @property
+    def n_calc(self) -> int:
+        """Number of already calculated components"""
+        return self.V.size()[0] if self.V is not None else None
 
-    def projection(self, x=None):
-        """ Returns full projection of the data_x on the given vector AND already
-        evaluated ECA components during fit
+    def _init(self, n: int, start: int = 0) -> None:
         """
-        if x is None:
-            return self.projected_x
-        return self.project_x(x) + self.projected_x
-
-    def projected_missing_variance(self, x=None):
-        """ Returns missing variance between prediction of the projected points and known"""
-        projected = self.projection(x)
-        predicted = self.predict(projected)
-        return self.missing_variance(self.data_y, predicted)
-
-    def x0_init(self):
-        """ Initialization for the first ECA component search.
-        Initializes as a random unity vector where the variance is the smallest
+        Initializes the vectors as well as resets them after restart of the fit
         """
-        if len(self._comp) == 1:
-            x0 = None
-            var = 100
-            for k in range(100):  # initial guess
-                ini = np.random.randn(self.n)
-                ini /= np.sqrt((ini**2).sum())
-                proj = self.projected_missing_variance(ini)
-                if proj < var:
-                    var = proj
-                    x0 = ini
-        else:
-            x0 = np.random.randn(self.n)
-        x0 = x0 / np.sqrt((x0 ** 2).sum())
-        return x0
+        self.n = n  # dimensionality of input data
+        if self.V is None:
+            self.V = torch.empty(0, self.n, dtype=torch.float32)  # already calculated EC components
+        self._v = torch.empty(1, self.n, dtype=torch.float32, requires_grad=True)  # currently evaluated component
+        self.u = torch.empty(1, self.n, dtype=torch.float32)  # to keep current vector
+        assert start >= 0, 'Starting component number must be nonnegative'
+        self.V = self.V[:start, ]
+        self.x_var = self.x_var[:start]
+        self.y_var = self.y_var[:start]
+        nn.init.xavier_normal_(self._v)
 
-    def transform(self, x):
-        """ Calculates projection of given X vector(s) onto ECA space: x -> t
-        """
-        if x.ndim == 1:
-            x = x[None, :]
-        return x @ self.vectors.T
+    @staticmethod
+    def r2loss(predicted: torch.Tensor, known: torch.Tensor) -> torch.Tensor:
+        """Calculates the generalized missing variance between known and predicted values.
+        Covered variance = 1 - missing variance"""
+        known_mean = torch.outer(torch.ones(known.shape[0]), known.mean(axis=0))
+        return torch.trace((known - predicted).T @ (known - predicted)) / torch.trace(
+            (known - known_mean).T @ (known - known_mean))
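+    # r2loss is the missing-variance ratio
+    #   tr[(Y - Y_pred)^T (Y - Y_pred)] / tr[(Y - Y_mean)^T (Y - Y_mean)],
+    # i.e. 1 - R^2: it is 0 for a perfect prediction and 1 when the prediction
+    # is no better than the column-wise mean of Y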
 
-    def expand(self, t):
-        """ Converts vectors from ECA space to X space: t -> w
-        """
-        return t @ self.vectors
+    @property
+    def v(self) -> torch.Tensor:
+        """Returns the currently evaluated normalized vector"""
+        return self._v / torch.linalg.vector_norm(self._v)
 
-    def project(self, x):
-        """ Projects given X vector(s) onto ECA space: x -> (t) -> w
-        """
-        return self.expand(self.transform(x))
+    @property
+    def vectors(self) -> torch.Tensor:
+        return self.V
+
+    def __add(self) -> None:
+        """After successful training, the found component is added to the calculated ones, and the search vector is reset"""
+        tmp = self.v.detach()
+        self.V = torch.vstack((self.V, tmp))
+        # reset the weights
+        nn.init.xavier_normal_(self._v)
+        # remove the projection on the known components
+        with torch.no_grad():
+            vx = self._v
+            self._v -= self.project(vx)
 
-    def predict_t(self, t):
-        """
-        t -> (w) -> y
-        """
-        t_ = np.array(t)
-        if t_.ndim == 1:
-            t_ = t_[None, :]
-        return self.predict(self.expand(t_))
+    def transform(self, x: torch.Tensor, n_comp: int = None) -> torch.Tensor:
+        """Calculates t-scores (i.e. projections of X onto the ECA basis space)"""
+        if n_comp is None:
+            return x @ self.V.T
+        return x @ self.V[:n_comp, ].T
 
-    def inverse_transform(self, y, **kwargs):
-        """ For the given Y vector, calculates the corresponding point in ECA space: y -> t
+    def expand(self, t: torch.Tensor, n_comp: int = None) -> torch.Tensor:
+        """Expands the t-scores to X space: t -> x'
+        Parameters
+        -------------
+        t: torch.tensor
+            projections in the v-space
+        n_comp: int
+            Uses only the first n_comp components of the basis space
         """
-        def mse(x, z):
-            x_ = np.array(x)[None, :]
-            y_ = self.predict_t(x_)
-            return ((y_ - z) ** 2).sum()
+        if n_comp is None:
+            return t @ self.V
+        return t @ self.V[:n_comp, :]
 
-        success = False
-        run = 0
-        runs = kwargs.get('runs') or 5  # Number of attempts to calculate the inverse transformation
-        err = 0
-        mse_threshold = 0.1  # threshold to define a `good' fit
-        kwargs_ = {k: v for k, v in kwargs.items() if k not in ['runs']}
-        while not (success and err < mse_threshold) and run < runs:  # some threshold
-            x0 = np.zeros(len(self._comp))
-            opt = optimize.minimize(mse, x0=x0, args=(y,), **kwargs_)
-            success = opt['success']
-            # print(f'Attempt {run}: Success:', success, mse(r.x, y))
-            run += 1
-            err = mse(opt.x, y)
-        return opt.x, {'success': success, 'runs': run, 'mse': err}
+    def project(self, x: torch.Tensor, n_comp: int = None) -> torch.Tensor:
+        """Projects x onto the ECA basis space"""
+        return self.expand(self.transform(x, n_comp), n_comp)
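+    # transform/expand/project fit together as (a sketch):
+    #   t = self.transform(x)   # t-scores,        [N x n_comp]
+    #   w = self.expand(t)      # back in X space, [N x L]
+    # so self.project(x) is equivalent to self.expand(self.transform(x))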
 
-    def reconstruct(self, y, **kwargs):
-        """ For the given Y vector, estimates a point in ECA space for which the prediction accuracy is the best
-        y -> t -> w
-        """
-        t, res = self.inverse_transform(y, **kwargs)
-        return self.expand(t), res
+    def forward(self, x: torch.Tensor) -> torch.Tensor:
+        """Used in the fit only. Projects X onto the V space and predicts Y"""
+        v = self.v
+        proj = (x @ v.T) @ v
+        if self.n_calc > 0:
+            # if some components V are already calculated, projection = sum of projections on V and v
+            return self.predict(self.project(x) + proj)
+        else:
+            return self.predict(proj)
 
-    def _fit_init(self, data_x, data_y):
-        """ Loads training data. Implemented to avoid multiple calls during validation fit
+    def fit(self, data_x: torch.Tensor, data_y: torch.Tensor, n_comp: int = 3,
+            options: dict = None, verbose: bool = False, keep: int = False) -> (torch.Tensor, list, list):
         """
-        assert data_x.shape[0] == data_y.shape[0], 'X and Y data must have the same number of samples'
-        self.data_x = data_x
-        self.data_y = data_y
-        self.projected_x = np.zeros_like(data_x)
-        self.n = data_x.shape[1]
-        return None
-
-    def _fit(self, n_comp=3, start=0, partial=None, **kwargs):
-        """ Evaluates n_comp ECA components such that the covered variance of the projected data is maximized.
-        (x, y) -> v
         Parameters
-        ----------
-        n_comp: int, default 3
-            Number of EC components to calculate
-        start: int, default 0
-            Index of first component to calculate, allows to refit not from the beginning
-        partial: None or dict
-            If None, standard fit is performed
-            If dict, initial guess for the first component pre-evaluates using partial fitting
+        -------------
+        data_x: torch.Tensor
+        data_y: torch.Tensor
+        n_comp: int
+            number of EC components to calculate
+        options: dict
+            'lr': float, default 0.001
+                initial learning rate
+            'betas': tuple(float, float), default (0.9, 0.999)
+                beta parameters for Adam optimizer, see torch.optim.adam
+            'epochs': int, default 10000
+                Maximum number of epochs for training
+            'batch_size': int, default 200
+                Batch size for training
+            'seed': int, default None
+                Seed for random number generator
+        verbose: bool, default False
+            If True, uses tqdm.trange to indicate progress of fit
+        keep: int or bool
+            Allows continuing the fit starting from a specific component
+            if False or 0: restarts fit from the beginning
+            if True: starts from already calculated components
+            if N, int: starts from the given number of components
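+            e.g. (a sketch): after eca.fit(x, y, n_comp=2), calling
+            eca.fit(x, y, n_comp=3, keep=True) trains only the third
+            component and keeps the two already calculated ones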
         Returns
+        ---------
+            self.V (the calculated components), self.y_var, self.x_var (covered variances)
-        -------
-        self.vectors: list(np.array)
-            ECA components list of numpy arrays
-        self._y_var: list
-            Covered Y variance
-        self._x_var: list
-            Covered X variance
-        """
-        if start > len(self._comp):
-            start = len(self._comp)
-        self._comp = self._comp[:start]
-        self._x_var = self._x_var[:start]
-        self._y_var = self._y_var[:start]
-        self.projected_x[:] = 0
-        runs = kwargs.get('runs') or self.DEFAULT_RUNS
-        kwargs_ = {k: v for k, v in kwargs.items() if k not in ['runs']}
-        for comp in self._comp:
-            self.projected_x += self.project_x(comp)
-        for step in range(start, n_comp):
-            print(f'Running component #{step}')
-            if step == 0:
-                constrain = None
-            else:
-                constrain = optimize.LinearConstraint(self._comp,
-                                                      -self.DEFAULT_CONSTRAINT_TOLERANCE,
-                                                      self.DEFAULT_CONSTRAINT_TOLERANCE)
-            success = False
-            run = 0
-            while not success and run < runs:
-                if step == 0 and partial is not None:
-                    x0 = self.init_partial(partial)
-                else:
-                    x0 = self.x0_init()
-                opt = optimize.minimize(self.projected_missing_variance, x0=x0, constraints=constrain,
-                                        options={'maxiter': self.DEFAULT_MAXITER}, **kwargs_)
-                success = opt['success']
-                print(f'Attempt {run}: Success:', success)
-                run += 1
-            ec = opt['x'] / np.sqrt((opt['x']**2).sum())
-            self._comp.append(ec)
-            self.projected_x += self.project_x(ec)
-            self._y_var.append(1 - self.projected_missing_variance())
-            self._x_var.append(1 - self.missing_variance(self.data_x, self.projected_x))
-            print(self._y_var[-1], self._x_var[-1])
-        return self.vectors, self._y_var, self._x_var
-
-    def init_partial(self, partial=None):
-        """Performs initial guess for the first ECA component by partial fit
-        Parameters
-        ---------
-        partial: dict
-            'batch_size': int,
-            'shuffle': bool,
-            'tol': float,
-            'runs': int,
-        Returns
-        -------
-        np.array
-        """
-        runs = partial.get('runs') or self.DEFAULT_RUNS
-        batch_size = partial.get('batch_size') or self.DEFAULT_BATCH_SIZE
-        tol = partial.get('tol') or self.DEFAULT_TOLERANCE_PARTIAL
-        shuffle_ = True if partial.get('shuffle') is None else partial.get('shuffle')  # None -> T, F -> F, T -> T
-
-        def projected_partial_variance(x, y, w):
-            """ Returns missing partial variance between prediction of the projected points and known"""
-            p = y.copy()
-            p[w] = x
-            projected = self.projection(p)
-            predicted = self.predict(projected)
-            return self.missing_variance(self.data_y, predicted)
+        """
+        n = data_x.shape[1]
+        options = options or {}
+        seed = options.get('seed') or self.seed
+        if isinstance(seed, int):
+            torch.manual_seed(seed)
+        start = 0
+        if keep:
+            n_comp_calculated = self.n_calc
+            if isinstance(keep, bool):
+                start = n_comp_calculated if keep else 0
+            elif isinstance(keep, int):
+                if keep > n_comp_calculated:
+                    print(f'Warning: Starting from {n_comp_calculated} components instead of {keep}')
+                    start = n_comp_calculated
+                elif keep < 0:
+                    start = n_comp_calculated + keep
+                    if start < 0:
+                        start = 0
+                else:
+                    start = keep
+            else:
+                raise TypeError('keep must be an integer or True/False')
+        self._init(n, start)  # initialize V and v tensors
+        lr = options.get('lr') or self.DEFAULT_LR
+        betas = options.get('betas') or self.DEFAULT_BETAS
+        epochs = options.get('epochs') or self.DEFAULT_EPOCHS
+        batch_size = options.get('batch_size') or self.DEFAULT_BATCH_SIZE
+        tol = options.get('tol') or self.DEFAULT_TOL
+        optimizer = optim.Adam([self._v], lr=lr, betas=betas)
+        loss_fn = self.r2loss
+        train_dataset = TensorDataset(data_x, data_y)
+        for comp in range(start, n_comp):
+            print(f'Start component #{comp + 1}, start={start}')
+            if verbose:
+                epochs_ = trange(epochs)
+            else:
+                epochs_ = range(epochs)
+            training_loss = []
+            for epoch in epochs_:
+                train_loader = DataLoader(train_dataset, batch_size)
+                batch_loss = []
+                for x_batch, y_batch in train_loader:
+                    # Generate predictions
+                    pred = self.forward(x_batch)
+                    loss = loss_fn(pred, y_batch)
+                    optimizer.zero_grad()
+                    batch_loss.append(loss.item())
+                    loss.backward()
+                    optimizer.step()
+                    if comp > 0:
+                        # to ensure orthogonality of the components, remove from the current vector
+                        # its projection onto the already known components
+                        with torch.no_grad():
+                            v = self._v
+                            self._v.data -= self.project(v)
-        x0 = self.x0_init()
-        run = 0
-        while run < runs:
-            batches = range(0, self.n, batch_size)
-            idx = list(range(self.n))
-            if shuffle_:
-                shuffle(idx)
-            for batch in batches:
-                sub = idx[batch:batch + batch_size]
-                print(sub)
-                opt = optimize.minimize(projected_partial_variance, np.zeros(len(sub)), args=(x0, sub),
-                                        options={'maxiter': 1000000}, tol=tol)
-                success = opt['success']
-                x0[sub] = opt.x
-            print(f'Ini run #{run}: Success: {success} Covered variance: {1 - self.projected_missing_variance(x0)}')
-            run += 1
-        return x0 / np.sqrt((x0 ** 2).sum())
 
+                training_loss.append(sum(batch_loss) / len(batch_loss))
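+                # early stopping (sketch of the logic below): after 50 epochs,
+                # compare the mean of the five previous epoch losses with the
+                # current one and stop once the improvement drops below tol;
+                # if the loss grew, roll back to the previously kept vector u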
+                if len(training_loss) > 50:
+                    # take mean of the last 5 training losses
+                    dl = sum(training_loss[-6:-1]) / 5 - training_loss[-1]
+                    if dl < tol:
+                        print(f'Accuracy change level of {dl:2.4g} has been reached after epoch #{epoch+1}')
+                        if dl < 0:
+                            self._v.data = self.u.data  # returns previous vector
+                        break
+                self.u.data = self._v.data  # keep track of the current vector
+            else:
+                print(f'Max number of epochs ({epoch+1}) has been reached. Accuracy change = {dl:2.4g}')
+            self.training_losses.append(training_loss)
+            self.__add()  # save the found component and reinitialize v
+            pred_y = self.predict(self.project(data_x))
+            y_loss = loss_fn(pred_y, data_y).item()
+            self.y_var.append(1 - y_loss)
+            print(f'Covered variance for component {comp+1}:', 1-y_loss)
+            x_loss = loss_fn(self.project(data_x), data_x).item()
+            self.x_var.append(1 - x_loss)
+        return self.V, self.y_var, self.x_var
 
-    def _fit_validate(self, data_x, data_y, n_comp=3, start=0, validate=0.2, partial=None, **kwargs):
-        """ Performs fit with validation after each step
+    def _inverse(self, y: torch.Tensor, n_comp: int = None, options: dict = None) -> (torch.Tensor, torch.Tensor):
+        """This method returns an approximation of the point in t-space such that the prediction from this point
+        is closest to the given y.
+        Note: potentially, this can be done for all Y values simultaneously but in practice convergence is bad.
         Parameters
         -----------
-        data_x: np.array
-        data_y: np.array
+        y: torch.tensor
+            given Y vector(-s) to calculate inverse
         n_comp: int
-            Number of components to calculate
-        start: int
-            First component to calculate, deault 0
-        validate: float (0,..., 1) or int
-            Part of the data used for the validation of the results
-            If float, denoted fraction of the data for validation
-            If int, denoted number of samples for validation
-            Cf. validation for `sklearn.model_selection.train_test_split'
-        """
-        self.validation = []
-        test_x, val_x, test_y, val_y = train_test_split(data_x, data_y, test_size=validate, random_state=1)
-        self._fit_init(test_x, test_y)
-        for i in range(start, n_comp):
-            self._fit(n_comp=i + 1, start=i, partial=partial, **kwargs)
-            self.validation.append((1 - self.missing_variance(test_y, self.predict_t(self.transform(test_x))),
-                                    1 - self.missing_variance(val_y, self.predict_t(self.transform(val_x)))))
-            print(i, self.validation[-1])
-        return self.vectors, self._y_var, self._x_var
+            if set, uses n_comp to calculate inverse transformations
+        options: dict
+            'optimizer': str
+                optimizer for inverse transform ('adam' is the only one implemented)
+            'tol_inv' or 'tol': float, default 1E-4
+                tolerance level to reach upon approximation
+            'epochs_inv' or 'epochs': int, default 1000
+                number of epochs to run
+            'lr_inv' or 'lr' : float, default 0.05
+                learning rate
+        """
+        options = options or {}
+        lr = options.get('lr_inv') or options.get('lr') or self.DEFAULT_LR_INV
+        epochs = options.get('epochs_inv') or options.get('epochs') or self.DEFAULT_EPOCHS_INV
+        betas = options.get('betas') or self.DEFAULT_BETAS
+        tol = options.get('tol_inv') or options.get('tol') or self.DEFAULT_TOL_INV
+        if n_comp is None:
+            n_comp = self.n_calc
+        assert 0 < n_comp <= self.n_calc, 'n_comp must be positive and not greater than number of calculated ' \
+                                          'components'
+        t = torch.empty(y.size()[0], n_comp, dtype=torch.float32, requires_grad=True)
+        optimizer = optim.Adam([t], lr=lr, betas=betas)
+        nn.init.xavier_normal_(t)
+        err = None
+        loss_fn = nn.MSELoss()
+        training_loss = []
+        for epoch in range(epochs):
+            proj = self.expand(t, n_comp)  # <- t @ V[:n_comp]
+            pred = self.predict(proj)
+            loss = loss_fn(pred, y)
+            optimizer.zero_grad()
+            loss.backward()
+            optimizer.step()
+            err = loss.item()
+            training_loss.append(err)
+            if epoch > 100:
+                dl = sum(training_loss[-6:-1]) / 5 - err
+                if 0 < dl < tol:
+                    break
+        return t.detach(), err
 
-    @property
-    def validation_scores(self):
-        if self.validation:
-            return [v[1] for v in self.validation]
-        return None
-
-    @property
-    def fit_scores(self):
-        return self._y_var
+    def inverse(self, y: torch.Tensor, options: dict = None, n_comp: int = None,
+                verbose: bool = False, return_error: bool = False) -> (torch.Tensor, ):
+        """This method returns an approximation of the point in t-space such that the prediction from this point
+        is closest to the given y.
- Defines the part of the data to as validation set (must be < 1) - start: int, default 0 - Index of first component to calculate, allows to refit not from the very beginning - partial: None or dict - If None, standard fit is performed - If dict, initial guess for the first component pre-evaluates but partial fitting - Returns - ------- - self.vectors: list(np.array) - ECA components list of numpy arrays - self._y_var: list - Covered Y variance - self._x_var: list - Covered X variance + 'optimizer': str + optimizer for inverse transform, 'adam' (default) or 'sgd' + 'tol_inv' or 'tol': float, default 1E-4 + tolerance level to reach upon approximation + 'epochs_inv' or 'epochs': int, default 1000 + number of epochs to run + 'lr_inv' or 'lr' : float, default 0.05 + learning rate """ options = options or {} - validate = options.get('validate') - start = options.get('start') or kwargs.get('start') or 0 - partial = options.get('partial') or None - kwargs_ = kwargs - for k, v in options.items(): - if k not in ['start', 'validate', 'partial']: - kwargs_[k] = v + lr = options.get('lr_inv') or options.get('lr') or self.DEFAULT_LR_INV + epochs = options.get('epochs_inv') or options.get('epochs') or self.DEFAULT_EPOCHS_INV + betas = options.get('betas') or self.DEFAULT_BETAS + tol = options.get('tol_inv') or options.get('tol') or self.DEFAULT_TOL_INV + if n_comp is None: + n_comp = self.n_calc + assert 0 < n_comp <= self.n_calc, 'n_comp must be positive and not greater than number of calculated ' \ + 'components ' + t = torch.empty(y.size()[0], n_comp, dtype=torch.float32, requires_grad=True) + optimizer = optim.Adam([t], lr=lr, betas=betas) + nn.init.xavier_normal_(t) + err = None + loss_fn = nn.MSELoss() + training_loss = [] + for epoch in range(epochs): + proj = self.expand(t, n_comp) # <- t @ V[:n_comp] + pred = self.predict(proj) + loss = loss_fn(pred, y) + optimizer.zero_grad() + loss.backward() + optimizer.step() + err = loss.item() + training_loss.append(err) + if epoch > 100: + dl = sum(training_loss[-6:-1]) / 5 - err + if 0 < dl < tol: + break + return t.detach(), err - if validate is None: - self._fit_init(data_x, data_y) - return self._fit(n_comp=n_comp, start=start, partial=partial, **kwargs_) + def inverse(self, y: torch.Tensor, options: dict = None, n_comp: int = None, + verbose: bool = False, return_error: bool = False) -> (torch.Tensor, ): + """This method returns an approximation of the point in t-space such that the prediction from this point + is closest to the given y. 
+ Parameters + ----------- + y: torch.tensor + given Y vector(-s) to calculate inverse + n_comp: int + if set, uses n_comp to calculate inverse transformations + options: dict + tol_inv or tol: float, default 1E-4 + tolerance level to reach upon approximation + epochs_inv or epochs: int, default 1000 + number of epochs to run + lr_inv or lr: float, default 0.05 + learning rate for optimizer + return_error: bool, default False + return MSE errors + verbose: bool, default False + to output progressbar, uses tqdm.trange + """ + if y.ndim == 1: + y = y[None, :] + n = y.shape[0] + if n_comp: + assert 0 < n_comp <= self.n_calc, 'n_comp must be positive and not greater than' \ + 'number of calculated components' + t_scores = torch.zeros((n, n_comp)) else: - return self._fit_validate(data_x, data_y, n_comp=n_comp, validate=validate, start=start, partial=partial, - **kwargs_) + t_scores = torch.zeros((n, self.n_calc)) + err = torch.zeros(n) + rng = trange(n) if verbose else range(n) + for idx in rng: + t_scores[idx, :], err[idx] = self._inverse(y[[idx]], n_comp=n_comp, options=options) + if return_error: + return t_scores, err + return t_scores - def load(self, filename): - """Load vectors and predictor from the file - Parameters - ----------- - filename: str - filename which must contain - - an emulator - - calculated ECA components + def reconstruct(self, y: torch.Tensor, options: dict = None, n_comp: int = None, + verbose: bool = False, return_error: bool = False) -> (torch.Tensor, ): + """ + See help(ECA.inverse) for parameters """ - with open(filename, 'rb') as f: - tmp = pickle.load(f) - self.predict = tmp['predict'] - self._comp = tmp['comp'] - self.n = len(self._comp[0]) + if return_error: + t_scores, err = self.inverse(y, options=options, n_comp=n_comp, verbose=verbose, return_error=True) + return self.expand(t_scores, n_comp=n_comp), err + else: + t_scores = self.inverse(y, options=options, n_comp=n_comp, verbose=verbose, return_error=False) + return self.expand(t_scores, n_comp=n_comp) - def save(self, filename): - """Save vectors and predictor to the file - Parameters - ----------- - filename: str + def test(self, x: torch.Tensor, y: torch.Tensor, n_comp: int = None) -> float: + """ Calculates covered variance for the given x, y, i.e. 1 - R2(predicted from projected, known) """ - tmp = {'predict': self.predict, - 'comp': self._comp} - with open(filename, 'wb+') as f: - pickle.dump(tmp, f) + y_predicted = self.predict(self.project(x, n_comp)) + return 1 - self.r2loss(y_predicted, y).item() diff --git a/eca_numpy.py b/eca_numpy.py new file mode 100644 index 0000000..0fe0cff --- /dev/null +++ b/eca_numpy.py @@ -0,0 +1,383 @@ +""" +This class implements Emulator-based component analysis (ECA), as described in +J. Niskanen, A. Vladyka, J. Niemi, and C. J. Sahle, "Emulator-based decomposition for structural sensitivity +of core-level spectra," Royal Society Open Science, vol. 9, 2022, doi: 10.1098/rsos.220093 +Flat implementation is available as Supplementary to this paper. + +The key idea is for a given predictor X -> Y (originally, structure-to-spectrum), we find the dominant directions +(a basis set V) in the X space, such that the predictions Y_pred for the points fromX, being projected onto V basis, +cover the majority of the known variance of Y. In the original work, we deduced the structural features of molecular +system which describe the spectral variations. 
+""" + +import numpy as np +import pickle # for saving|loading the model +from scipy import optimize +from random import shuffle +from sklearn.model_selection import train_test_split # for validation fit + + +class ECA: + """ + Usage + ----- + from eca import ECA + eca = ECA(model.predict_t) + v, yvar, xvar = eca.fit(data_x, data_y, n_comp=2, tol=5E-6, runs=1, validate=0.2, + partial={'batch_size':80, 'tol': 2E-6, 'shuffle': True, 'runs': 1}) + + Jargon + ------ + x : data_x [N x L] + y : data_y [N x M] + v : ECA components up to rank K, K x L + t : ECA scores, N x K + w : projections, N x L, w = t @ v + """ + DEFAULT_CONSTRAINT_TOLERANCE = 1E-10 + DEFAULT_MAXITER = 10**6 + DEFAULT_RUNS = 5 + DEFAULT_BATCH_SIZE = 50 + DEFAULT_TOLERANCE_PARTIAL = 1E-5 + + def __init__(self, predict): + """ + Parameters + ------------- + predict: func + predictor data_x -> data_y + """ + self.predict = predict + self._comp = [] + self._x_var = [] + self._y_var = [] + self.n = 0 + + @property + def vectors(self): + """Returns ECA component vectors + """ + return np.array(self._comp) + + @staticmethod + def missing_variance(known: np.array, predicted: np.array): + """Metric to define the spectral coverage + """ + known_mean = np.outer(np.ones(known.shape[0]), np.mean(known, axis=0)) + return (np.trace(np.matmul((known - predicted).T, (known - predicted))) / + np.trace(np.matmul((known - known_mean).T, known - known_mean))) + + def project_x(self, x): + """ Returns projection of the data_x on the given vector during fit + """ + y = np.array(x)[None, :] + return (self.data_x @ y.T) @ y / (y ** 2).sum() + + def projection(self, x=None): + """ Returns full projection of the data_x on the given vector AND already + evaluated ECA components during fit + """ + if x is None: + return self.projected_x + return self.project_x(x) + self.projected_x + + def projected_missing_variance(self, x=None): + """ Returns missing variance between prediction of the projected points and known""" + projected = self.projection(x) + predicted = self.predict(projected) + return self.missing_variance(self.data_y, predicted) + + def x0_init(self): + """ Initialization for the first ECA component search. 
+
+    def x0_init(self):
+        """ Initialization for the first ECA component search.
+        Initialized as the random unit vector for which the missing variance is the smallest
+        """
+        if len(self._comp) == 1:
+            x0 = None
+            var = 100
+            for k in range(100):  # initial guess
+                ini = np.random.randn(self.n)
+                ini /= np.sqrt((ini**2).sum())
+                proj = self.projected_missing_variance(ini)
+                if proj < var:
+                    var = proj
+                    x0 = ini
+        else:
+            x0 = np.random.randn(self.n)
+        x0 = x0 / np.sqrt((x0 ** 2).sum())
+        return x0
+
+    def transform(self, x):
+        """ Calculates projection of given X vector(s) onto ECA space: x -> t
+        """
+        if x.ndim == 1:
+            x = x[None, :]
+        return x @ self.vectors.T
+
+    def expand(self, t):
+        """ Converts vectors from ECA space to X space: t -> w
+        """
+        return t @ self.vectors
+
+    def project(self, x):
+        """ Projects given X vector(s) onto ECA space: x -> (t) -> w
+        """
+        return self.expand(self.transform(x))
+
+    def predict_t(self, t):
+        """
+        t -> (w) -> y
+        """
+        t_ = np.array(t)
+        if t_.ndim == 1:
+            t_ = t_[None, :]
+        return self.predict(self.expand(t_))
+
+    def inverse_transform(self, y, **kwargs):
+        """ For the given Y vector, calculates the corresponding point in ECA space: y -> t
+        """
+        def mse(x, z):
+            x_ = np.array(x)[None, :]
+            y_ = self.predict_t(x_)
+            return ((y_ - z) ** 2).sum()
+
+        success = False
+        run = 0
+        runs = kwargs.get('runs') or 5  # Number of attempts to calculate the inverse transformation
+        err = 0
+        mse_threshold = 0.1  # threshold to define a `good' fit
+        kwargs_ = {k: v for k, v in kwargs.items() if k not in ['runs']}
+        while not (success and err < mse_threshold) and run < runs:  # some threshold
+            x0 = np.zeros(len(self._comp))
+            opt = optimize.minimize(mse, x0=x0, args=(y,), **kwargs_)
+            success = opt['success']
+            # print(f'Attempt {run}: Success:', success, mse(opt.x, y))
+            run += 1
+            err = mse(opt.x, y)
+        return opt.x, {'success': success, 'runs': run, 'mse': err}
+
+    def reconstruct(self, y, **kwargs):
+        """ For the given Y vector, estimates a point in ECA space for which the prediction accuracy is the best
+        y -> t -> w
+        """
+        t, res = self.inverse_transform(y, **kwargs)
+        return self.expand(t), res
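+    # e.g. (a sketch, for an already fitted eca): given a measured spectrum y,
+    #   t, info = eca.inverse_transform(y)  # y -> t, info: {'success', 'runs', 'mse'}
+    #   w, info = eca.reconstruct(y)        # y -> t -> w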
+
+    def _fit_init(self, data_x, data_y):
+        """ Loads training data. Implemented to avoid multiple calls during validation fit
+        """
+        assert data_x.shape[0] == data_y.shape[0], 'X and Y data must have the same number of samples'
+        self.data_x = data_x
+        self.data_y = data_y
+        self.projected_x = np.zeros_like(data_x)
+        self.n = data_x.shape[1]
+        return None
+
+    def _fit(self, n_comp=3, start=0, partial=None, **kwargs):
+        """ Evaluates n_comp ECA components such that the covered variance of the projected data is maximized.
+        (x, y) -> v
+        Parameters
+        ----------
+        n_comp: int, default 3
+            Number of EC components to calculate
+        start: int, default 0
+            Index of the first component to calculate, allows refitting not from the beginning
+        partial: None or dict
+            If None, the standard fit is performed
+            If dict, the initial guess for the first component is pre-evaluated using partial fitting
+        Returns
+        -------
+        self.vectors: list(np.array)
+            ECA components, list of numpy arrays
+        self._y_var: list
+            Covered Y variance
+        self._x_var: list
+            Covered X variance
+        """
+        if start > len(self._comp):
+            start = len(self._comp)
+        self._comp = self._comp[:start]
+        self._x_var = self._x_var[:start]
+        self._y_var = self._y_var[:start]
+        self.projected_x[:] = 0
+        runs = kwargs.get('runs') or self.DEFAULT_RUNS
+        kwargs_ = {k: v for k, v in kwargs.items() if k not in ['runs']}
+        for comp in self._comp:
+            self.projected_x += self.project_x(comp)
+        for step in range(start, n_comp):
+            print(f'Running component #{step}')
+            if step == 0:
+                constrain = None
+            else:
+                constrain = optimize.LinearConstraint(self._comp,
+                                                      -self.DEFAULT_CONSTRAINT_TOLERANCE,
+                                                      self.DEFAULT_CONSTRAINT_TOLERANCE)
+            success = False
+            run = 0
+            while not success and run < runs:
+                if step == 0 and partial is not None:
+                    x0 = self.init_partial(partial)
+                else:
+                    x0 = self.x0_init()
+                opt = optimize.minimize(self.projected_missing_variance, x0=x0, constraints=constrain,
+                                        options={'maxiter': self.DEFAULT_MAXITER}, **kwargs_)
+                success = opt['success']
+                print(f'Attempt {run}: Success:', success)
+                run += 1
+            ec = opt['x'] / np.sqrt((opt['x']**2).sum())
+            self._comp.append(ec)
+            self.projected_x += self.project_x(ec)
+            self._y_var.append(1 - self.projected_missing_variance())
+            self._x_var.append(1 - self.missing_variance(self.data_x, self.projected_x))
+            print(self._y_var[-1], self._x_var[-1])
+        return self.vectors, self._y_var, self._x_var
+
+    def init_partial(self, partial=None):
+        """Performs the initial guess for the first ECA component by partial fit
+        Parameters
+        ---------
+        partial: dict
+            'batch_size': int,
+            'shuffle': bool,
+            'tol': float,
+            'runs': int,
+        Returns
+        -------
+        np.array
+        """
+        runs = partial.get('runs') or self.DEFAULT_RUNS
+        batch_size = partial.get('batch_size') or self.DEFAULT_BATCH_SIZE
+        tol = partial.get('tol') or self.DEFAULT_TOLERANCE_PARTIAL
+        shuffle_ = True if partial.get('shuffle') is None else partial.get('shuffle')  # None -> T, F -> F, T -> T
+
+        def projected_partial_variance(x, y, w):
+            """ Returns missing partial variance between prediction of the projected points and known"""
+            p = y.copy()
+            p[w] = x
+            projected = self.projection(p)
+            predicted = self.predict(projected)
+            return self.missing_variance(self.data_y, predicted)
+
+        x0 = self.x0_init()
+        run = 0
+        while run < runs:
+            batches = range(0, self.n, batch_size)
+            idx = list(range(self.n))
+            if shuffle_:
+                shuffle(idx)
+            for batch in batches:
+                sub = idx[batch:batch + batch_size]
+                print(sub)
+                opt = optimize.minimize(projected_partial_variance, np.zeros(len(sub)), args=(x0, sub),
+                                        options={'maxiter': 1000000}, tol=tol)
+                success = opt['success']
+                x0[sub] = opt.x
+            print(f'Ini run #{run}: Success: {success} Covered variance: {1 - self.projected_missing_variance(x0)}')
+            run += 1
+        return x0 / np.sqrt((x0 ** 2).sum())
+
+    def _fit_validate(self, data_x, data_y, n_comp=3, start=0, validate=0.2, partial=None, **kwargs):
+        """ Performs fit with validation after each step
+        Parameters
+        -----------
+        data_x: np.array
+        data_y: np.array
+        n_comp: int
+            Number of components to calculate
+        start: int
+            First component to calculate, default 0
+        validate: float (0,..., 1) or int
+            Part of the data used for the validation of the results
+            If float, denotes the fraction of the data for validation
+            If int, denotes the number of samples for validation
+            Cf. validation for `sklearn.model_selection.train_test_split`
+        """
+        self.validation = []
+        test_x, val_x, test_y, val_y = train_test_split(data_x, data_y, test_size=validate, random_state=1)
+        self._fit_init(test_x, test_y)
+        for i in range(start, n_comp):
+            self._fit(n_comp=i + 1, start=i, partial=partial, **kwargs)
+            self.validation.append((1 - self.missing_variance(test_y, self.predict_t(self.transform(test_x))),
+                                    1 - self.missing_variance(val_y, self.predict_t(self.transform(val_x)))))
+            print(i, self.validation[-1])
+        return self.vectors, self._y_var, self._x_var
+
+    @property
+    def validation_scores(self):
+        if self.validation:
+            return [v[1] for v in self.validation]
+        return None
+
+    @property
+    def fit_scores(self):
+        return self._y_var
+
+    def fit(self, data_x, data_y, n_comp=3, options=None, **kwargs):
+        """ Evaluates n_comp ECA components such that the covered variance of the projected data is maximized.
+        (x, y) -> v
+        Parameters
+        ----------
+        data_x: np.array
+            input X data
+        data_y: np.array
+            input Y data
+        n_comp: int, default 3
+            Number of EC components to calculate
+        options: dict
+            validate: float, default 0.2, or None
+                If given, performs fit with validation after each component calculation.
+                Defines the part of the data to use as the validation set (must be < 1)
+            start: int, default 0
+                Index of the first component to calculate, allows refitting not from the very beginning
+            partial: None or dict
+                If None, the standard fit is performed
+                If dict, the initial guess for the first component is pre-evaluated by partial fitting
+        Returns
+        -------
+        self.vectors: list(np.array)
+            ECA components, list of numpy arrays
+        self._y_var: list
+            Covered Y variance
+        self._x_var: list
+            Covered X variance
+        """
+        options = options or {}
+        validate = options.get('validate')
+        start = options.get('start') or kwargs.get('start') or 0
+        partial = options.get('partial') or None
+        kwargs_ = kwargs
+        for k, v in options.items():
+            if k not in ['start', 'validate', 'partial']:
+                kwargs_[k] = v
+
+        if validate is None:
+            self._fit_init(data_x, data_y)
+            return self._fit(n_comp=n_comp, start=start, partial=partial, **kwargs_)
+        else:
+            return self._fit_validate(data_x, data_y, n_comp=n_comp, validate=validate, start=start, partial=partial,
+                                      **kwargs_)
+
+    def load(self, filename):
+        """Load vectors and predictor from the file
+        Parameters
+        -----------
+        filename: str
+            filename which must contain
+            - an emulator
+            - calculated ECA components
+        """
+        with open(filename, 'rb') as f:
+            tmp = pickle.load(f)
+        self.predict = tmp['predict']
+        self._comp = tmp['comp']
+        self.n = len(self._comp[0])
+
+    def save(self, filename):
+        """Save vectors and predictor to the file
+        Parameters
+        -----------
+        filename: str
+        """
+        tmp = {'predict': self.predict,
+               'comp': self._comp}
+        with open(filename, 'wb+') as f:
+            pickle.dump(tmp, f)
diff --git a/ecat.py b/ecat.py
deleted file mode 100644
index 8fca043..0000000
--- a/ecat.py
+++ /dev/null
@@ -1,355 +0,0 @@
-import torch
-from torch import nn
-import torch.optim as optim
-from torch.utils.data import TensorDataset, DataLoader
-from tqdm import trange
-
-
-class ECA(nn.Module):
-    """ECA approach implemented in pytorch
-    Requires torch-based emulator to predict X -> Y
-    Example:
-    # test_x_t and test_y_t are tensors (test X and Y data, respectively)
-    # convert numpy array to torch.tensors via e.g.
-    test_x_t, test_y_t = map(lambda x: torch.tensor(x, dtype=torch.float32), [test_x, test_y])
-    eca = ECA(model)  # initialize with a trained predictor (for pytorch the model itself is a predictor,
-    i.e., Y = model(X) <=> Y = model.forward(X))
-    options = {
-        'tol' : 1E-4,
-        'epochs': 10000,
-        'lr' : 1E-3,
-        'batch_size': 200,
-        'seed': 123,
-    }
-    v, y_var, x_var = eca.fit(test_x_t, test_y_t, n_comp=3, options=options)
-    # check orthonormality
-    # must be unity matrix
-    print(v @ v.T)
-    """
-    # default options
-    DEFAULT_LR = 1E-3
-    DEFAULT_TOL = 1E-4
-    DEFAULT_EPOCHS = 10000
-    DEFAULT_BETAS = (0.9, 0.999)
-    DEFAULT_BATCH_SIZE = 200
-    DEFAULT_LR_INV = 5E-2
-    DEFAULT_TOL_INV = 1E-4
-    DEFAULT_EPOCHS_INV = 1000
-
-    def __init__(self, predict: nn.Module):
-        """
-        Parameters
-        -----------
-        predict: nn.Module or func X -> Y
-            Predictor, torch-based
-        """
-        super().__init__()
-        self.predict = predict
-        self.V = None  # calculated components
-        self._v = None  # currently evaluated component
-        self.u = None
-        self.y_var: list = []  # covered variance for Y
-        self.x_var: list = []  # covered variance for X
-        self.training_losses: list = []
-        self.seed = None
-
-    def set_seed(self, seed: int) -> None:
-        """ Set seed for torch.manual_seed
-        Affects initial guess of the vector (via xavier_init_) and batches while training """
-        self.seed = seed
-
-    @property
-    def n_calc(self) -> int:
-        """Number of already calculated components"""
-        return self.V.size()[0] if self.V is not None else None
-
-    def _init(self, n: int, start: int = 0) -> None:
-        """
-        Initializes the vectors as well as resets them after restart of the fit
-        """
-        self.n = n  # dimensionality of input data
-        if self.V is None:
-            self.V = torch.empty(0, self.n, dtype=torch.float32)  # already calculated EC components
-        self._v = torch.empty(1, self.n, dtype=torch.float32, requires_grad=True)  # currently evaluated component
-        self.u = torch.empty(1, self.n, dtype=torch.float32)  # to keep current vector
-        assert start >= 0, 'Starting component number must be nonnegative'
-        self.V = self.V[:start, ]
-        self.x_var = self.x_var[:start]
-        self.y_var = self.y_var[:start]
-        nn.init.xavier_normal_(self._v)
-
-    @staticmethod
-    def r2loss(predicted: torch.Tensor, known: torch.Tensor) -> torch.Tensor:
-        """Calculated generalized missing variance between known and predicted values.
-        Covered variance = 1 - missing variance"""
-        known_mean = torch.outer(torch.ones(known.shape[0]), known.mean(axis=0))
-        return torch.trace((known - predicted).T @ (known - predicted)) / torch.trace(
-            (known - known_mean).T @ (known - known_mean))
-
-    @property
-    def v(self) -> torch.Tensor:
-        """Returns currently evaluated normalized vector"""
-        return self._v / torch.linalg.vector_norm(self._v)
-
-    @property
-    def vectors(self) -> torch.Tensor:
-        return self.V
-
-    def __add(self) -> None:
-        """After successful training, found component added to the calculated ones, and reset"""
-        tmp = self.v.detach()
-        self.V = torch.vstack((self.V, tmp))
-        # reset the weights
-        nn.init.xavier_normal_(self._v)
-        # remove the projection on the known components
-        with torch.no_grad():
-            vx = self._v
-            self._v -= self.project(vx)
-
-    def transform(self, x: torch.Tensor, n_comp: int = None) -> torch.Tensor:
-        """Calculates t-scores (i.e. projection values of X onto ECA basis space"""
-        if n_comp is None:
-            return x @ self.V.T
-        return x @ self.V[:n_comp, ].T
-
-    def expand(self, t: torch.Tensor, n_comp: int = None) -> torch.Tensor:
-        """Expands the t-scores to X space: t -> x'
-        Parameters
-        -------------
-        t: torch.tensor
-            projections in the v-space
-        n_comp: int
-            Calculates only $comp components of basis space
-        """
-        if n_comp is None:
-            return t @ self.V
-        return t @ self.V[:n_comp, :]
-
-    def project(self, x: torch.Tensor, n_comp: int = None) -> torch.Tensor:
-        """Projects x onto ECA basis space"""
-        return self.expand(self.transform(x, n_comp), n_comp)
-
-    def forward(self, x: torch.Tensor) -> torch.Tensor:
-        """Used in the fit only. Projects X onto V space and predicts Y"""
-        v = self.v
-        proj = (x @ v.T) @ v
-        if self.n_calc > 0:
-            # if some components V already calculated, projection = sum of projections on V and v
-            return self.predict(self.project(x) + proj)
-        else:
-            return self.predict(proj)
-
-    def fit(self, data_x: torch.Tensor, data_y: torch.Tensor, n_comp: int = 3,
-            options: dict = None, verbose: bool = False, keep: int = False) -> (torch.Tensor, list, list):
-        """
-        Parameters
-        -------------
-        data_x: torch.Tensor
-        data_y: torch.Tensor
-        n_comp: int
-            number of EC components to calculate
-        options: dict
-            'lr': float, default 0.001
-                initial learning rate
-            'betas': tuple(float, float), default (0.9, 0.999)
-                beta parameters for Adam optimizer, see torch.optim.adam
-            'epochs': int, default 10000
-                Maximum number of epochs for training
-            'batch_size': int, default 200
-                Batch size for training
-            'seed': int, default None
-                Seed for random number generator
-        verbose: bool, default False
-            If True, uses tqdm.trange to indicate progress of fit
-        keep: int or bool
-            Allow to continue the fit starting from specific component
-            if False or 0: restarts fit from the beginning
-            if True: starts from already calculated components
-            if N, int: start from given number of components
-        Returns
-        ---------
-        """
-        n = data_x.shape[1]
-        options = options or {}
-        seed = options.get('seed') or self.seed
-        if isinstance(seed, int):
-            torch.manual_seed(seed)
-        start = 0
-        if keep:
-            n_comp_calculated = self.n_calc
-            if isinstance(keep, bool):
-                start = n_comp_calculated if keep else 0
-            elif isinstance(keep, int):
-                if keep > n_comp_calculated:
-                    print(f'Warning: Starting from {n_comp_calculated} components instead of {keep}')
-                    start = n_comp_calculated
-                elif keep < 0:
-                    start = n_comp_calculated + keep
-                    if start < 0:
-                        start = 0
-                else:
-                    start = keep
-            else:
-                raise TypeError('continue_ must be integer number or True/False')
-        self._init(n, start)  # initialize V and v tensors
-        lr = options.get('lr') or self.DEFAULT_LR
-        betas = options.get('betas') or self.DEFAULT_BETAS
-        epochs = options.get('epochs') or self.DEFAULT_EPOCHS
-        batch_size = options.get('batch_size') or self.DEFAULT_BATCH_SIZE
-        tol = options.get('tol') or self.DEFAULT_TOL
-        optimizer = optim.Adam([self._v], lr=lr, betas=betas)
-        loss_fn = self.r2loss
-        train_dataset = TensorDataset(data_x, data_y)
-        for comp in range(start, n_comp):
-            print(f'Start component #{comp + 1}, start={start}')
-            if verbose:
-                epochs_ = trange(epochs)
-            else:
-                epochs_ = range(epochs)
-            training_loss = []
-            for epoch in epochs_:
-                train_loader = DataLoader(train_dataset, batch_size)
-                batch_loss = []
-                for x_batch, y_batch in train_loader:
-                    # Generate predictions
-                    pred = self.forward(x_batch)
-                    loss = loss_fn(pred, y_batch)
-                    optimizer.zero_grad()
-                    batch_loss.append(loss.item())
-                    loss.backward()
-                    optimizer.step()
-                    if comp > 0:
-                        # to ensure search of the orthogonal components, remove from the gradient its projection
-                        # on the known components
-                        with torch.no_grad():
-                            v = self._v
-                            self._v.data -= self.project(v)
-
-                training_loss.append(sum(batch_loss) / len(batch_loss))
-                if len(training_loss) > 50:
-                    # take mean of the last 5 training losses
-                    dl = sum(training_loss[-6:-1]) / 5 - training_loss[-1]
-                    if dl < tol:
-                        print(f'Accuracy change level of {dl:2.4g} has been reached after epoch #{epoch+1}')
-                        if dl < 0:
-                            self._v.data = self.u.data  # returns previous vector
-                        break
-                self.u.data = self._v.data  # keep track of the current vector
-            else:
-                print(f'Max number of epochs ({epoch+1}) has been reached. Accuracy change = {dl:2.4g}')
-            self.training_losses.append(training_loss)
-            self.__add()  # save found component and reinitialize v
-            pred_y = self.predict(self.project(data_x))
-            y_loss = loss_fn(pred_y, data_y).item()
-            self.y_var.append(1 - y_loss)
-            print(f'Covered variance for component {comp+1}:', 1-y_loss)
-            x_loss = loss_fn(self.project(data_x), data_x).item()
-            self.x_var.append(1 - x_loss)
-        return self.V, self.y_var, self.x_var
-
-    def _inverse(self, y: torch.Tensor, n_comp: int = None, options: dict = None) -> (torch.Tensor, torch.Tensor):
-        """This method returns an approximation of the point in t-space such that the prediction from this point
-        is closest to the given y.
-        Note: potentially, this can be done for all Y values simultaneously but in practice convergence is bad.
-        Parameters
-        -----------
-        y: torch.tensor
-            given Y vector(-s) to calculate inverse
-        n_comp: int
-            if set, uses n_comp to calculate inverse transformations
-        options: dict
-            'optimizer': str
-                optimizer for inverse transform, 'adam' (default) or 'sgd'
-            'tol_inv' or 'tol': float, default 1E-4
-                tolerance level to reach upon approximation
-            'epochs_inv' or 'epochs': int, default 1000
-                number of epochs to run
-            'lr_inv' or 'lr' : float, default 0.05
-                learning rate
-        """
-        options = options or {}
-        lr = options.get('lr_inv') or options.get('lr') or self.DEFAULT_LR_INV
-        epochs = options.get('epochs_inv') or options.get('epochs') or self.DEFAULT_EPOCHS_INV
-        betas = options.get('betas') or self.DEFAULT_BETAS
-        tol = options.get('tol_inv') or options.get('tol') or self.DEFAULT_TOL_INV
-        if n_comp is None:
-            n_comp = self.n_calc
-        assert 0 < n_comp <= self.n_calc, 'n_comp must be positive and not greater than number of calculated ' \
-                                          'components '
-        t = torch.empty(y.size()[0], n_comp, dtype=torch.float32, requires_grad=True)
-        optimizer = optim.Adam([t], lr=lr, betas=betas)
-        nn.init.xavier_normal_(t)
-        err = None
-        loss_fn = nn.MSELoss()
-        training_loss = []
-        for epoch in range(epochs):
-            proj = self.expand(t, n_comp)  # <- t @ V[:n_comp]
-            pred = self.predict(proj)
-            loss = loss_fn(pred, y)
-            optimizer.zero_grad()
-            loss.backward()
-            optimizer.step()
-            err = loss.item()
-            training_loss.append(err)
-            if epoch > 100:
-                dl = sum(training_loss[-6:-1]) / 5 - err
-                if 0 < dl < tol:
-                    break
-        return t.detach(), err
-
-    def inverse(self, y: torch.Tensor, options: dict = None, n_comp: int = None,
-                verbose: bool = False, return_error: bool = False) -> (torch.Tensor, ):
-        """This method returns an approximation of the point in t-space such that the prediction from this point
-        is closest to the given y.
-        Parameters
-        -----------
-        y: torch.tensor
-            given Y vector(-s) to calculate inverse
-        n_comp: int
-            if set, uses n_comp to calculate inverse transformations
-        options: dict
-            tol_inv or tol: float, default 1E-4
-                tolerance level to reach upon approximation
-            epochs_inv or epochs: int, default 1000
-                number of epochs to run
-            lr_inv or lr: float, default 0.05
-                learning rate for optimizer
-        return_error: bool, default False
-            return MSE errors
-        verbose: bool, default False
-            to output progressbar, uses tqdm.trange
-        """
-        if y.ndim == 1:
-            y = y[None, :]
-        n = y.shape[0]
-        if n_comp:
-            assert 0 < n_comp <= self.n_calc, 'n_comp must be positive and not greater than' \
-                                              'number of calculated components'
-            t_scores = torch.zeros((n, n_comp))
-        else:
-            t_scores = torch.zeros((n, self.n_calc))
-        err = torch.zeros(n)
-        rng = trange(n) if verbose else range(n)
-        for idx in rng:
-            t_scores[idx, :], err[idx] = self._inverse(y[[idx]], n_comp=n_comp, options=options)
-        if return_error:
-            return t_scores, err
-        return t_scores
-
-    def reconstruct(self, y: torch.Tensor, options: dict = None, n_comp: int = None,
-                    verbose: bool = False, return_error: bool = False) -> (torch.Tensor, ):
-        """
-        See help(ECA.inverse) for parameters
-        """
-        if return_error:
-            t_scores, err = self.inverse(y, options=options, n_comp=n_comp, verbose=verbose, return_error=True)
-            return self.expand(t_scores, n_comp=n_comp), err
-        else:
-            t_scores = self.inverse(y, options=options, n_comp=n_comp, verbose=verbose, return_error=False)
-            return self.expand(t_scores, n_comp=n_comp)
-
-    def test(self, x: torch.Tensor, y: torch.Tensor, n_comp: int = None) -> float:
-        """ Calculates covered variance for the given x, y, i.e. 1 - R2(predicted from projected, known)
-        """
-        y_predicted = self.predict(self.project(x, n_comp))
-        return 1 - self.r2loss(y_predicted, y).item()
-- 
GitLab