Source code for topopy.TopologicalObject

import sys
import time
import warnings

import numpy as np
import sklearn.preprocessing

import nglpy as ngl
# import nglpy_cuda as ngl


[docs]class TopologicalObject(object): """ A base class for housing common interactions between Morse and Morse-Smale complexes, and Contour and Merge Trees Parameters ---------- graph : nglpy.Graph A graph object used for determining neighborhoods in gradient estimation gradient : str An optional string specifying the type of gradient estimator to use. Currently the only available option is 'steepest'. normalization : str An optional string specifying whether the inputs/output should be scaled before computing. Currently, two modes are supported 'zscore' and 'feature'. 'zscore' will ensure the data has a mean of zero and a standard deviation of 1 by subtracting the mean and dividing by the variance. 'feature' scales the data into the unit hypercube. aggregator : str An optional string that specifies what type of aggregation to do when duplicates are found in the domain space. Default value is None meaning the code will error if duplicates are identified. debug : bool An optional boolean flag for whether debugging output should be enabled. short_circuit : bool An optional boolean flag for whether the contour tree should be short circuited. Enabling this will speed up the processing by bypassing the fully augmented search and only focusing on partially augmented split and join trees """ precision = 16
[docs] @staticmethod def aggregate_duplicates(X, Y, aggregator="mean", precision=precision): """ A function that will attempt to collapse duplicates in domain space, X, by aggregating values over the range space, Y. Parameters ---------- X : np.ndarray An m-by-n array of values specifying m n-dimensional samples Y : np.array A m vector of values specifying the output responses corresponding to the m samples specified by X aggregator : str An optional string or callable object that specifies what type of aggregation to do when duplicates are found in the domain space. Default value is mean meaning the code will calculate the mean range value over each of the unique, duplicated samples. precision : int An optional positive integer specifying how many digits numbers should be rounded to in order to determine if they are unique or not. Returns ------- tuple(np.ndarray, np.array) A tuple where the first value is an m'-by-n array specifying the unique domain samples and the second value is an m' vector specifying the associated range values. m' <= m. """ if callable(aggregator): pass elif "min" in aggregator.lower(): aggregator = np.min elif "max" in aggregator.lower(): aggregator = np.max elif "median" in aggregator.lower(): aggregator = np.median elif aggregator.lower() in ["average", "mean"]: aggregator = np.mean elif "first" in aggregator.lower(): def aggregator(x): return x[0] elif "last" in aggregator.lower(): def aggregator(x): return x[-1] else: warnings.warn( 'Aggregator "{}" not understood. Skipping sample ' "aggregation.".format(aggregator) ) return X, Y is_y_multivariate = Y.ndim > 1 X_rounded = X.round(decimals=precision) unique_xs = np.unique(X_rounded, axis=0) old_size = len(X_rounded) new_size = len(unique_xs) if old_size == new_size: return X, Y if not is_y_multivariate: Y = np.atleast_2d(Y).T reduced_y = np.empty((new_size, Y.shape[1])) warnings.warn( "Domain space duplicates caused a data reduction. " + "Original size: {} vs. New size: {}".format(old_size, new_size) ) for col in range(Y.shape[1]): for i, distinct_row in enumerate(unique_xs): filtered_rows = np.all(X_rounded == distinct_row, axis=1) reduced_y[i, col] = aggregator(Y[filtered_rows, col]) if not is_y_multivariate: reduced_y = reduced_y.flatten() return unique_xs, reduced_y
def __init__( self, graph=None, gradient="steepest", normalization=None, aggregator=None, debug=False, ): super(TopologicalObject, self).__init__() self.reset() if graph is None: graph = ngl.EmptyRegionGraph() self.graph = graph self.gradient = gradient self.normalization = normalization self.debug = debug self.aggregator = aggregator
[docs] def reset(self): """ Empties all internal storage containers Returns ------- None """ self.X = [] self.Y = [] self.w = [] self.Xnorm = []
def __set_data(self, X, Y, w=None): """ Internally assigns the input data and normalizes it according to the user's specifications @ In, X, an m-by-n array of values specifying m n-dimensional samples @ In, Y, a m vector of values specifying the output responses corresponding to the m samples specified by X @ In, w, an optional m vector of values specifying the weights associated to each of the m samples used. Default of None means all points will be equally weighted """ self.X = X self.Y = Y self.check_duplicates() if w is not None: self.w = np.array(w) else: self.w = np.ones(len(Y)) * 1.0 / float(len(Y)) if self.normalization == "feature": # This doesn't work with one-dimensional arrays on older # versions of sklearn min_max_scaler = sklearn.preprocessing.MinMaxScaler() self.Xnorm = min_max_scaler.fit_transform(np.atleast_2d(self.X)) elif self.normalization == "zscore": self.Xnorm = sklearn.preprocessing.scale( self.X, axis=0, with_mean=True, with_std=True, copy=True ) else: self.Xnorm = np.array(self.X)
[docs] def build(self, X, Y, w=None): """ Assigns data to this object and builds the requested topological structure Uses an internal graph given in the constructor to build a topological object on the passed in data. Weights are currently ignored. Parameters ---------- X : np.ndarray An m-by-n array of values specifying m n-dimensional samples Y : np.array An m vector of values specifying the output responses corresponding to the m samples specified by X w : np.array An optional m vector of values specifying the weights associated to each of the m samples used. Default of None means all points will be equally weighted Returns ------- None """ self.reset() if X is None or Y is None: return self.__set_data(X, Y, w) if self.debug: sys.stdout.write("Graph Preparation: ") start = time.perf_counter() self.graph.build(self.Xnorm) if self.debug: end = time.perf_counter() sys.stdout.write("%f s\n" % (end - start))
[docs] def load_data_and_build(self, filename, delimiter=","): """ Convenience function for directly working with a data file. This opens a file and reads the data into an array, sets the data as an nparray and list of dimnames Parameters ---------- filename : str string representing the data file Returns ------- None """ data = np.genfromtxt( filename, dtype=float, delimiter=delimiter, names=True ) data = data.view(np.float64).reshape(data.shape + (-1,)) X = data[:, 0:-1] Y = data[:, -1] self.build(X=X, Y=Y)
[docs] def get_normed_x(self, rows=None, cols=None): """ Returns the normalized input data requested by the user. Parameters ---------- rows : list of int A list of non-negative integers specifying the row indices to return cols : list of int A list of non-negative integers specifying the column indices to return Returns ------- np.ndarray A matrix of floating point values specifying the normalized data values used in internal computations filtered by the three input parameters. """ if rows is None: rows = list(range(0, self.get_sample_size())) if cols is None: cols = list(range(0, self.get_dimensionality())) if not hasattr(rows, "__iter__"): rows = [rows] rows = sorted(list(set(rows))) retValue = self.Xnorm[rows, :] return retValue[:, cols]
[docs] def get_x(self, rows=None, cols=None): """ Returns the input data requested by the user Parameters ---------- rows : list of int A list of non-negative integers specifying the row indices to return cols : list of int A list of non-negative integers specifying the column indices to return Returns ------- np.ndarray A matrix of floating point values specifying the input data values filtered by the two input parameters. """ if rows is None: rows = list(range(0, self.get_sample_size())) if cols is None: cols = list(range(0, self.get_dimensionality())) if not hasattr(rows, "__iter__"): rows = [rows] rows = sorted(list(set(rows))) retValue = self.X[rows, :] if len(rows) == 0: return [] return retValue[:, cols]
[docs] def get_y(self, indices=None): """ Returns the output data requested by the user Parameters ---------- indices : list of int A list of non-negative integers specifying the row indices to return Returns ------- np.array An array of floating point values specifying the output data values filtered by the indices input parameter. """ if indices is None: indices = list(range(0, self.get_sample_size())) else: if not hasattr(indices, "__iter__"): indices = [indices] indices = sorted(list(set(indices))) if len(indices) == 0: return [] return self.Y[indices]
[docs] def get_weights(self, indices=None): """ Returns the weights requested by the user Parameters ---------- indices : list of int A list of non-negative integers specifying the row indices to return Returns ------- np.array An array of floating point values specifying the weights associated to the input data rows filtered by the indices input parameter. """ if indices is None: indices = list(range(0, self.get_sample_size())) else: indices = sorted(list(set(indices))) if len(indices) == 0: return [] return self.w[indices]
[docs] def get_sample_size(self): """ Returns the number of samples in the input data Returns ------- int Integer specifying the number of samples. """ return len(self.Y)
[docs] def get_dimensionality(self): """ Returns the dimensionality of the input space of the input data Returns ------- int Integer specifying the dimensionality of the input samples. """ return self.X.shape[1]
[docs] def get_neighbors(self, idx): """ Returns a list of neighbors for the specified index Parameters ---------- idx : int An integer specifying the query point Returns ------- list of int Integer list of neighbors indices """ return self.graph.neighbors(int(idx))
[docs] def check_duplicates(self): """ Function to test whether duplicates exist in the input or output space. First, if an aggregator function has been specified, the domain space duplicates will be consolidated using the function to generate a new range value for that shared point. Otherwise, it will raise a ValueError. The function will raise a warning if duplicates exist in the output space Returns ------- None """ if self.aggregator is not None: X, Y = TopologicalObject.aggregate_duplicates( self.X, self.Y, self.aggregator ) self.X = X self.Y = Y temp_x = self.X.round(decimals=TopologicalObject.precision) unique_xs = len(np.unique(temp_x, axis=0)) # unique_ys = len(np.unique(self.Y, axis=0)) # if len(self.Y) != unique_ys: # warnings.warn('Range space has duplicates. Simulation of ' # 'simplicity may help, but artificial noise may ' # 'occur in flat regions of the domain. Sample size:' # '{} vs. Unique Records: {}'.format(len(self.Y), # unique_ys)) if len(self.X) != unique_xs: raise ValueError( "Domain space has duplicates. Try using an " "aggregator function to consolidate duplicates " "into a single sample with one range value. " "e.g., " + self.__class__.__name__ + "(aggregator='max'). " "\n\tNumber of " "Records: {}\n\tNumber of Unique Records: {}\n".format( len(self.X), unique_xs ) )