Source code for nums.core.compute.compute_manager

# Copyright (C) 2020 NumS Development Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import inspect
from types import FunctionType
from typing import Any, Union, List
import warnings

import numpy as np

from nums.core.array import utils as array_utils
from nums.core.compute.compute_interface import ComputeInterface, RNGInterface
from nums.core.grid.grid import DeviceGrid, DeviceID
from nums.core.systems import utils as systems_utils
from nums.core.systems.system_interface import SystemInterface


class ComputeManager(ComputeInterface):
    """
    Abstraction to support multiple systems; namely simultaneous support for
    CPU and GPU system implementations.
    """

    # pylint: disable=abstract-method,useless-super-delegation

    instance = None

    @classmethod
    def create(cls, system: SystemInterface, compute_module, device_grid: DeviceGrid):
        if cls.instance is not None:
            raise Exception("A ComputeManager instance already exists.")
        cls.instance: ComputeManager = ComputeManager(
            system, compute_module, device_grid
        )
        return cls.instance

    @classmethod
    def destroy(cls):
        cls.instance = None

    def __init__(
        self, system: SystemInterface, compute_module, device_grid: DeviceGrid
    ):
        self.system: SystemInterface = system
        self.device_grid: DeviceGrid = device_grid
        self.rng_cls = None
        self.methods: dict = {}
        self._block_shape_map = {}
        self.init_compute(compute_module)

    def init_compute(self, compute_module):
        compute_imp = compute_module.ComputeCls
        # Check that all of the kernel interface is implemented.
        systems_utils.check_implementation(ComputeInterface, compute_imp)
        if getattr(compute_module, "RNG", None) is None:
            raise Exception(
                "No random number generator implemented "
                "for compute module %s" % str(compute_module)
            )
        self.rng_cls = compute_module.RNG
        # Collect implemented module functions.
        module_functions = systems_utils.extract_functions(compute_imp)
        # Collect function signatures.
        function_signatures: dict = {}
        required_methods = inspect.getmembers(
            ComputeInterface(), predicate=inspect.ismethod
        )
        for name, func in required_methods:
            function_signatures[name] = func
        for name, func in module_functions.items():
            func_sig = function_signatures[name]
            try:
                remote_params = func_sig.remote_params
            except Exception as _:
                remote_params = {}
            self.register(name, func, remote_params)
        # Add functions as methods of this class.
        for name, _ in module_functions.items():
            self.methods[name] = self.get_callable(name)
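
    # Note: remote_params, when present, are attributes attached to the
    # ComputeInterface method objects and are forwarded to the backend by
    # register() below; kernels without them fall back to an empty dict.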

    def get_rng(self, seed) -> RNGInterface:
        return self.rng_cls(seed)

    def get_callable(self, name: str):
        def new_func(*args, **kwargs):
            return self.call(name, *args, **kwargs)

        return new_func

    def __getattribute__(self, name: str):
        methods = object.__getattribute__(self, "methods")
        if name in methods:
            return methods[name]
        return object.__getattribute__(self, name)

    ####################
    # System Interface
    ####################

    def put(self, value: Any, **kwargs):
        assert "syskwargs" in kwargs
        kwargs = kwargs.copy()
        syskwargs = kwargs["syskwargs"]
        del kwargs["syskwargs"]
        assert "options" not in syskwargs
        device_id, options = self._process_syskwargs(syskwargs)
        assert len(options) == 0
        return self.system.put(value, device_id)

    def get(self, object_ids: Union[Any, List]):
        return self.system.get(object_ids)

    def remote(self, function: FunctionType, remote_params: dict):
        return self.system.remote(function, remote_params)

    def devices(self):
        return self.system.devices()

    def register(self, name: str, func: callable, remote_params: dict = None):
        self.system.register(name, func, remote_params)

    def _process_syskwargs(self, syskwargs):
        if "grid_entry" in syskwargs:
            assert "grid_shape" in syskwargs
            assert "device_id" not in syskwargs
            grid_entry = syskwargs["grid_entry"]
            grid_shape = syskwargs["grid_shape"]
            device_id: DeviceID = self.device_grid.get_device_id(
                grid_entry, grid_shape
            )
        elif "device_id" in syskwargs:
            assert "grid_entry" not in syskwargs and "grid_shape" not in syskwargs
            device_id: DeviceID = syskwargs["device_id"]
        else:
            raise Exception(
                "All calls require either device_id, or grid_entry and grid_shape."
            )
        if "options" in syskwargs:
            options = syskwargs["options"]
        else:
            options = {}
        return device_id, options
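
    # The two accepted forms of syskwargs, as a sketch (values illustrative only):
    #
    #   {"grid_entry": (0, 1), "grid_shape": (2, 2)}  # placement resolved by DeviceGrid
    #   {"device_id": device_id}                      # explicit device placement
    #
    # Either form may additionally carry an "options" dict, which is passed
    # through to the backend.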

    def call(self, name: str, *args, **kwargs):
        assert "syskwargs" in kwargs
        kwargs = kwargs.copy()
        syskwargs = kwargs["syskwargs"]
        del kwargs["syskwargs"]
        device_id, options = self._process_syskwargs(syskwargs)
        return self.system.call(name, args, kwargs, device_id, options)
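
    # Because __getattribute__ above resolves registered kernel names first,
    # kernels are normally invoked as methods. A sketch (arguments illustrative),
    # where cm is a ComputeManager instance and "touch" stands in for any
    # registered kernel name:
    #
    #   cm.touch(block_oid, syskwargs={"grid_entry": (0, 0), "grid_shape": (1, 1)})
    #
    # Such a call routes through call(), with placement taken from syskwargs.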

    def num_cores_total(self):
        return self.system.num_cores_total()

    def register_actor(self, name: str, cls: type):
        return self.system.register_actor(name, cls)

    def make_actor(self, name: str, *args, device_id: DeviceID = None, **kwargs):
        return self.system.make_actor(name, *args, device_id=device_id, **kwargs)

    def call_actor_method(self, actor, method: str, *args, **kwargs):
        return self.system.call_actor_method(actor, method, *args, **kwargs)

    #########################
    # Block Shape Management
    #########################

    @staticmethod
    def compute_block_shape_static(
        shape: tuple, dtype: Union[type, np.dtype], cluster_shape: tuple, num_cores: int
    ):
        # TODO (hme): This should also compute parameters for DeviceGrid.
        if array_utils.is_float(dtype, type_test=True):
            dtype = np.finfo(dtype).dtype
        elif array_utils.is_int(dtype, type_test=True) or array_utils.is_uint(
            dtype, type_test=True
        ):
            dtype = np.iinfo(dtype).dtype
        elif array_utils.is_complex(dtype, type_test=True):
            dtype = np.dtype(dtype)
        elif dtype in (bool, np.bool_):
            dtype = np.dtype(np.bool_)
        else:
            raise ValueError("dtype %s not supported" % str(dtype))

        nbytes = dtype.alignment
        size = np.prod(shape) * nbytes
        # If the object is less than 100 megabytes, there's not much value in
        # constructing a block tensor.
        if size < 10 ** 8:
            block_shape = shape
            return block_shape

        if len(shape) < len(cluster_shape):
            cluster_shape = cluster_shape[: len(shape)]
        elif len(shape) > len(cluster_shape):
            cluster_shape = list(cluster_shape)
            for axis in range(len(shape)):
                if axis >= len(cluster_shape):
                    cluster_shape.append(1)
            cluster_shape = tuple(cluster_shape)

        shape_np = np.array(shape, dtype=int)
        # Softmax on cluster shape gives strong preference to larger dimensions.
        cluster_weights = np.exp(np.array(cluster_shape)) / np.sum(
            np.exp(cluster_shape)
        )
        shape_fracs = np.array(shape) / np.sum(shape)
        # cluster_weights weight the proportion of cores available along each axis,
        # and shape_fracs is the proportion of data along each axis.
        weighted_shape_fracs = cluster_weights * shape_fracs
        weighted_shape_fracs = weighted_shape_fracs / np.sum(weighted_shape_fracs)
        # Compute the dimensions of the grid shape so that the number of blocks
        # is close to the number of cores.
        grid_shape_frac = num_cores ** weighted_shape_fracs
        grid_shape = np.floor(grid_shape_frac)
        # Put remainder on largest axis.
        remaining = np.sum(grid_shape_frac - grid_shape)
        grid_shape[np.argmax(shape)] += remaining
        grid_shape = np.ceil(grid_shape).astype(int)
        # Use the ceiling of the fractional block shape, so that the resulting
        # grid shape is at most the grid shape computed above.
        block_shape = tuple((shape_np + grid_shape - 1) // grid_shape)
        return block_shape
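
    # Worked example (a sketch with assumed inputs; values approximate): for
    # shape=(10**5, 10**4) and dtype=np.float64 (8-byte alignment), size is
    # 8e9 bytes, well above the 1e8 threshold, so the array is blocked. With
    # cluster_shape=(1, 1) and num_cores=64, the softmax weights are uniform,
    # weighted_shape_fracs is about (0.91, 0.09), grid_shape_frac is about
    # (43.9, 1.5), and after the remainder adjustment grid_shape is (45, 1),
    # giving a block shape of roughly (2223, 10000).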

    def compute_block_shape(
        self,
        shape: tuple,
        dtype: Union[type, np.dtype],
        cluster_shape=None,
        num_cores=None,
    ):
        if num_cores is None:
            num_cores = self.num_cores_total()
        if cluster_shape is None:
            cluster_shape = self.device_grid.grid_shape
        return ComputeManager.compute_block_shape_static(
            shape, dtype, cluster_shape, num_cores
        )

    def update_block_shape_map(self, shape_dim, block_shape_dim):
        if shape_dim in self._block_shape_map:
            if self._block_shape_map[shape_dim] != block_shape_dim:
                warnings.warn(
                    "Block size differs for dimensions of size %s; "
                    "this may cause some operations to be slower." % shape_dim
                )
        self._block_shape_map[shape_dim] = block_shape_dim

    def get_block_shape(self, shape, dtype):
        # Simple way to ensure shape compatibility for basic linear algebra
        # operations.
        block_shape = self.compute_block_shape(shape, dtype)
        final_block_shape = []
        for axis in range(len(shape)):
            shape_dim = shape[axis]
            block_shape_dim = block_shape[axis]
            if shape_dim not in self._block_shape_map:
                self.update_block_shape_map(shape_dim, block_shape_dim)
            final_block_shape.append(self._block_shape_map[shape_dim])
        return tuple(final_block_shape)


def instance() -> ComputeManager:
    return ComputeManager.instance
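
# A minimal usage sketch (illustrative, not part of the module): system,
# compute_module, and device_grid are assumed to be constructed elsewhere
# during application setup.
#
#   cm = ComputeManager.create(system, compute_module, device_grid)
#   rng = cm.get_rng(seed=1337)
#   block_shape = cm.get_block_shape((10**5, 10**4), np.float64)
#   ComputeManager.destroy()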