"""Source code for nums.sklearn."""

import warnings
from typing import Optional, Union

import numpy as np

from nums.core.array.blockarray import BlockArray
from nums.core.array.random import NumsRandomState
from nums.core.application_manager import (
    instance,
    call_on_create,
    ArrayApplication,
)


# pylint: disable = import-outside-toplevel


def _register_train_test_split(app: ArrayApplication):
    """Register a remote ``train_test_split`` kernel on the app's compute manager.

    The registered wrapper runs sklearn's ``train_test_split`` on raw blocks,
    deriving a ``RandomState`` seed from NumS block RNG parameters when given.
    """
    from sklearn import model_selection
    from numpy.random import Generator, RandomState

    from nums.core.compute.numpy_compute import block_rng

    def train_test_split_wrapper(
        *arrays,
        rng_params,
        test_size=None,
        train_size=None,
        shuffle=True,
        stratify=None
    ):
        if rng_params is not None:
            generator: Generator = block_rng(*rng_params)
            # TODO:  Because train_test_split doesn't support Generator,
            #  we need a way to obtain RandomState from Generator instance.
            #  Need to ensure this provides same guarantees on collisions that Generator provides.
            derived_seed: int = generator.integers(0, np.iinfo(np.uint32).max)
            random_state = RandomState(seed=derived_seed)
        else:
            random_state = None
        return model_selection.train_test_split(
            *arrays,
            test_size=test_size,
            train_size=train_size,
            random_state=random_state,
            shuffle=shuffle,
            stratify=stratify
        )

    app.cm.register("train_test_split", train_test_split_wrapper, {})


call_on_create(_register_train_test_split)


def _check_array(array, strict=False):
    """Coerce ``array`` into a single-block BlockArray.

    With ``strict=True``, raise instead of converting: TypeError if the input
    is not a BlockArray, ValueError if it spans more than one block. In
    non-strict mode, very large conversions fail (>100GB) or warn (>10GB).
    """
    if not isinstance(array, BlockArray):
        if strict:
            raise TypeError("Input array is not a BlockArray.")
        # Wrap the raw array as a single-block BlockArray.
        array = instance().array(array, block_shape=array.shape)
    if not array.is_single_block():
        if strict:
            raise ValueError("Input array is not a single block.")
        size_gb = array.nbytes / 10 ** 9
        if size_gb > 100.0:
            raise MemoryError(
                "Operating on an array of size %sGB is not supported." % size_gb
            )
        if size_gb > 10.0:
            # Conversion of a 10GB+ array is allowed but likely slow.
            warnings.warn(
                "Attempting to convert an array "
                "of size %sGB to a single block." % size_gb
            )
        array = array.to_single_block()
    return array


def train_test_split(
    *arrays,
    test_size: Union[int, float] = None,
    train_size: Union[int, float] = None,
    random_state: Optional[Union[NumsRandomState, int]] = None,
    shuffle: bool = True,
    stratify=None
):
    """Split each input array into train/test subsets, like sklearn's
    ``train_test_split``, executed remotely on single-block arrays.

    Returns a list of BlockArrays: train/test pairs for each input array.
    """
    # pylint: disable = protected-access
    updated_arrays = [_check_array(array) for array in arrays]
    syskwargs = {
        "options": {"num_returns": 2 * len(updated_arrays)},
        "grid_entry": (0,),
        "grid_shape": (1,),
    }
    if random_state is None:
        rng_params = None
    else:
        if isinstance(random_state, int):
            # An int is interpreted as a seed.
            random_state: NumsRandomState = instance().random_state(random_state)
        rng_params = random_state._rng.new_block_rng_params()
    array_oids = [array.flattened_oids()[0] for array in updated_arrays]
    result_oids = instance().cm.call(
        "train_test_split",
        *array_oids,
        rng_params=rng_params,
        test_size=test_size,
        train_size=train_size,
        shuffle=shuffle,
        stratify=stratify,
        syskwargs=syskwargs
    )
    # Optimize by computing this directly.
    shape_dtype_oids = [
        instance().cm.shape_dtype(
            r_oid, syskwargs={"grid_entry": (0,), "grid_shape": (1,)}
        )
        for r_oid in result_oids
    ]
    shape_dtypes = instance().cm.get(shape_dtype_oids)
    return [
        BlockArray.from_oid(r_oid, shape=shape, dtype=dtype, cm=instance().cm)
        for r_oid, (shape, dtype) in zip(result_oids, shape_dtypes)
    ]
def build_sklearn_actor(cls: type):
    """Build and return a NumS proxy class for the sklearn estimator ``cls``.

    The proxy forwards fit/fit_transform/predict/score to an actor that holds
    the real sklearn instance, converting BlockArray arguments to block oids
    and wrapping results back into BlockArrays.
    """
    from sklearn.base import ClassifierMixin, RegressorMixin

    name = cls.__name__
    # Classifiers predict integer labels; regressors predict floats.
    if issubclass(cls, ClassifierMixin):
        predict_dtype = int
    elif issubclass(cls, RegressorMixin):
        predict_dtype = float
    else:
        predict_dtype = None

    # NOTE:
    # A possibly cleaner way of building actor classes is to check all the superclasses, then
    # procedurally add the methods inherited from those classes. Many superclasses can be found in
    # sklearn/base.py, and some in other subpackages. For example,
    # TransformerMixin: add fit_transform(), transform() (fit_transform calls transform)
    # ClusterMixin: add fit_predict()
    # LinearClassifierMixin: add decision_function()
    # BaseEstimator: add get_params(), set_params()

    class ModelActor:
        # Holds the real sklearn estimator; runs remotely as an actor.
        def __init__(self, *args, **kwargs):
            self.instance = cls(*args, **kwargs)

        def fit(self, X, y):
            self.instance = self.instance.fit(X, y)

        def fit_transform(self, X, y=None):
            return self.instance.fit_transform(X, y)

        def predict(self, X):
            return self.instance.predict(X)

        def score(self, X, y, sample_weight=None):
            return np.array(self.instance.score(X, y, sample_weight))

    class NumsModel:
        # Client-side proxy; talks to a ModelActor via the compute manager.
        def __init__(self, *args, **kwargs):
            device_id = None
            if self.__class__ in _place_on_node_0:
                device_id = instance().cm.devices()[0]
            self.actor = instance().cm.make_actor(
                name, *args, device_id=device_id, **kwargs
            )

        # TODO: (all functions) test inputs are single block, if not warn about performance
        def fit(self, X: BlockArray, y: BlockArray):
            _check_array(X, True)
            _check_array(y, True)
            instance().cm.call_actor_method(
                self.actor, "fit", X.flattened_oids()[0], y.flattened_oids()[0]
            )

        def fit_transform(self, X: BlockArray, y: BlockArray = None):
            _check_array(X, True)
            if y is not None:
                _check_array(y, True)
                y = y.flattened_oids()[0]
            r_oid = instance().cm.call_actor_method(
                self.actor, "fit_transform", X.flattened_oids()[0], y
            )
            return BlockArray.from_oid(
                r_oid, shape=X.shape, dtype=float, cm=instance().cm
            )

        def predict(self, X: BlockArray):
            _check_array(X, True)
            r_oid = instance().cm.call_actor_method(
                self.actor, "predict", X.flattened_oids()[0]
            )
            return BlockArray.from_oid(
                r_oid, shape=(X.shape[0],), dtype=predict_dtype, cm=instance().cm
            )

        def score(self, X: BlockArray, y: BlockArray, sample_weight: BlockArray = None):
            _check_array(X, True)
            _check_array(y, True)
            if sample_weight is not None:
                _check_array(sample_weight, True)
                sample_weight = sample_weight.flattened_oids()[0]
            r_oid = instance().cm.call_actor_method(
                self.actor,
                "score",
                X.flattened_oids()[0],
                y.flattened_oids()[0],
                sample_weight,
            )
            return BlockArray.from_oid(r_oid, shape=(), dtype=float, cm=instance().cm)

    # Give the generated classes names derived from the wrapped estimator.
    ModelActor.__name__ = cls.__name__ + "Actor"
    NumsModel.__name__ = "Nums" + cls.__name__
    call_on_create(lambda app: app.cm.register_actor(name, ModelActor))
    return NumsModel
def build_supervised_actors():
    """Build NumS actor proxies for the supported supervised sklearn models.

    Returns an iterable of proxy classes, in the same order as the tuple
    unpacking at module level below.
    """
    from sklearn.neural_network import MLPClassifier, MLPRegressor
    from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
    from sklearn.svm import SVC, SVR
    from sklearn.gaussian_process import (
        GaussianProcessClassifier,
        GaussianProcessRegressor,
    )
    from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
    from sklearn.ensemble import (
        RandomForestClassifier,
        AdaBoostClassifier,
        GradientBoostingClassifier,
        RandomForestRegressor,
        AdaBoostRegressor,
        GradientBoostingRegressor,
    )
    from sklearn.naive_bayes import GaussianNB, BernoulliNB
    from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
    from sklearn.linear_model import (
        LogisticRegression,
        LinearRegression,
        Ridge,
        Lasso,
        ElasticNet,
    )

    skl_models = [
        MLPClassifier,
        MLPRegressor,
        KNeighborsClassifier,
        KNeighborsRegressor,
        SVC,
        SVR,
        GaussianProcessClassifier,
        GaussianProcessRegressor,
        DecisionTreeClassifier,
        DecisionTreeRegressor,
        RandomForestClassifier,
        RandomForestRegressor,
        AdaBoostClassifier,
        AdaBoostRegressor,
        GradientBoostingClassifier,
        GradientBoostingRegressor,
        GaussianNB,
        BernoulliNB,
        QuadraticDiscriminantAnalysis,
        LogisticRegression,
        LinearRegression,
        Ridge,
        Lasso,
        ElasticNet,
    ]
    return map(build_sklearn_actor, skl_models)
# Bind the generated proxy classes to module-level names matching sklearn's.
(
    MLPClassifier,
    MLPRegressor,
    KNeighborsClassifier,
    KNeighborsRegressor,
    SVC,
    SVR,
    GaussianProcessClassifier,
    GaussianProcessRegressor,
    DecisionTreeClassifier,
    DecisionTreeRegressor,
    RandomForestClassifier,
    RandomForestRegressor,
    AdaBoostClassifier,
    AdaBoostRegressor,
    GradientBoostingClassifier,
    GradientBoostingRegressor,
    GaussianNB,
    BernoulliNB,
    QuadraticDiscriminantAnalysis,
    LogisticRegression,
    LinearRegression,
    Ridge,
    Lasso,
    ElasticNet,
) = build_supervised_actors()
def build_preprocessors():
    """Build NumS actor proxies for the supported sklearn preprocessors."""
    from sklearn.preprocessing import StandardScaler, RobustScaler

    sklearn_clses = [StandardScaler, RobustScaler]
    return (build_sklearn_actor(sklearn_cls) for sklearn_cls in sklearn_clses)
(StandardScaler, RobustScaler) = build_preprocessors()

# Classes whose actors are placed on device 0 (checked in NumsModel.__init__).
_place_on_node_0 = (StandardScaler, RobustScaler)
def expose_sklearn_objects():
    """Re-export sklearn objects that are used directly, without actor wrapping."""
    from sklearn.gaussian_process.kernels import RBF

    return (RBF,)
(RBF,) = expose_sklearn_objects()


if __name__ == "__main__":
    pass