# Copyright (C) 2020 NumS Development Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Tuple, Dict, Union
import numpy as np
from nums.core.array import utils as array_utils
from nums.core.array.blockarray import BlockArray, Block
from nums.core.array.random import NumsRandomState
from nums.core.compute.compute_manager import ComputeManager
from nums.core.grid.grid import ArrayGrid
from nums.core.storage.storage import StoredArray, StoredArrayS3
from nums.core.systems.filesystem import FileSystem
# pylint: disable = too-many-lines
class ArrayApplication:
    """Application-level array API: creation, I/O, and element-wise and reduction
    operations over BlockArrays, dispatched through a ComputeManager."""
def __init__(self, cm: ComputeManager, fs: FileSystem):
self.cm: ComputeManager = cm
self._fs: FileSystem = fs
self._array_grids: Dict[str, ArrayGrid] = {}
self.random = self.random_state()
self.one_half = self.scalar(0.5)
self.two = self.scalar(2.0)
self.one = self.scalar(1.0)
self.zero = self.scalar(0.0)
    def compute_block_shape(
self,
shape: tuple,
dtype: Union[type, np.dtype],
cluster_shape=None,
num_cores=None,
):
return self.cm.compute_block_shape(shape, dtype, cluster_shape, num_cores)
    def get_block_shape(self, shape, dtype):
return self.cm.get_block_shape(shape, dtype)
def _get_array_grid(self, filename: str, stored_array_cls) -> ArrayGrid:
if filename not in self._array_grids:
store_inst: StoredArray = stored_array_cls(filename)
self._array_grids[filename] = store_inst.get_grid()
return self._array_grids[filename]
######################################
# Filesystem API
######################################
    def write_fs(self, ba: BlockArray, filename: str) -> BlockArray:
res = self._write(ba, filename, self._fs.write_block_fs)
self._fs.write_meta_fs(ba, filename)
return res
    def read_fs(self, filename: str) -> BlockArray:
return self._fs.read_array_fs(filename)
    def delete_fs(self, filename: str) -> bool:
results = []
for device_id in self.cm.devices():
result = self._fs.delete_file_fs(
filename, syskwargs={"device_id": device_id}
)
results.append(result)
results = self.cm.get(results)
# Were files deleted anywhere?
return np.any(results)
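    # Usage sketch (illustrative, not part of the API): a write/read/delete
    # round trip through the distributed filesystem, assuming `app` is an
    # ArrayApplication, `x` is a BlockArray, and "/tmp/x" is a writable path.
    #
    #   app.write_fs(x, "/tmp/x")   # persist blocks and metadata
    #   y = app.read_fs("/tmp/x")   # reconstruct the BlockArray
    #   app.delete_fs("/tmp/x")     # True if any block files were deleted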
    def write_s3(self, ba: BlockArray, filename: str):
grid_entry = tuple(np.zeros_like(ba.shape, dtype=int))
result = self._fs.write_meta_s3(
filename,
grid_meta=ba.grid.to_meta(),
syskwargs={"grid_entry": grid_entry, "grid_shape": ba.grid.grid_shape},
)
assert "ETag" in self.cm.get(result).item(), "Metadata write failed."
return self._write(ba, filename, self._fs.write_block_s3)
def _write(self, ba: BlockArray, filename, remote_func):
grid = ba.grid
result_grid = ArrayGrid(
grid.grid_shape,
            tuple(np.ones_like(grid.shape, dtype=int)),
dtype=dict.__name__,
)
rarr = BlockArray(result_grid, self.cm)
for grid_entry in grid.get_entry_iterator():
rarr.blocks[grid_entry].oid = remote_func(
ba.blocks[grid_entry].oid,
filename,
grid_entry,
grid.to_meta(),
syskwargs={"grid_entry": grid_entry, "grid_shape": grid.grid_shape},
)
return rarr
    def read_s3(self, filename: str):
store_cls, remote_func = StoredArrayS3, self._fs.read_block_s3
grid = self._get_array_grid(filename, store_cls)
grid_meta = grid.to_meta()
grid_entry_iterator = grid.get_entry_iterator()
rarr = BlockArray(grid, self.cm)
for grid_entry in grid_entry_iterator:
rarr.blocks[grid_entry].oid = remote_func(
filename,
grid_entry,
grid_meta,
syskwargs={"grid_entry": grid_entry, "grid_shape": grid.grid_shape},
)
return rarr
    def delete_s3(self, filename: str) -> bool:
        grid = self._get_array_grid(filename, StoredArrayS3)
        grid_entry = tuple(np.zeros_like(grid.shape, dtype=int))
result = self._fs.delete_meta_s3(
filename,
syskwargs={"grid_entry": grid_entry, "grid_shape": grid.grid_shape},
)
deleted_key = self.cm.get(result).item()["Deleted"][0]["Key"]
assert deleted_key == StoredArrayS3(filename, grid).get_meta_key()
results: BlockArray = self._delete(
filename, StoredArrayS3, self._fs.delete_block_s3
)
# Test whether data has been deleted from S3.
data_deleted = True
delete_result_arr = self.get(results)
stored_array = StoredArrayS3(filename, results.grid)
for grid_entry in results.grid.get_entry_iterator():
deleted_key = delete_result_arr[grid_entry]["Deleted"][0]["Key"]
entry_deleted = deleted_key == stored_array.get_key(grid_entry)
data_deleted = data_deleted and entry_deleted
return data_deleted
def _delete(self, filename, store_cls, remote_func):
grid = self._get_array_grid(filename, store_cls)
result_grid = ArrayGrid(
grid.grid_shape,
            tuple(np.ones_like(grid.shape, dtype=int)),
dtype=dict.__name__,
)
rarr = BlockArray(result_grid, self.cm)
for grid_entry in grid.get_entry_iterator():
rarr.blocks[grid_entry].oid = remote_func(
filename,
grid_entry,
grid.to_meta(),
syskwargs={"grid_entry": grid_entry, "grid_shape": grid.grid_shape},
)
return rarr
    def read_csv(
self, filename, dtype=float, delimiter=",", has_header=False, num_workers=None
):
if num_workers is None:
num_workers = self.cm.num_cores_total()
arrays: list = self._fs.read_csv(
filename, dtype, delimiter, has_header, num_workers
)
shape = np.zeros(len(arrays[0].shape), dtype=int)
for array in arrays:
shape += np.array(array.shape, dtype=int)
shape = tuple(shape)
block_shape = self.cm.get_block_shape(shape, dtype)
result = self.concatenate(arrays, axis=0, axis_block_size=block_shape[0])
# Release references immediately, in case we need to do another reshape.
del arrays
if result.block_shape[1] != block_shape[1]:
result = result.reshape(block_shape=block_shape)
return result
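    # Usage sketch (illustrative): load a headered CSV into a BlockArray,
    # assuming `app` is an ArrayApplication and "data.csv" is a hypothetical file.
    #
    #   X = app.read_csv("data.csv", dtype=float, delimiter=",", has_header=True)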
    def loadtxt(
self,
fname,
dtype=float,
comments="# ",
delimiter=" ",
converters=None,
skiprows=0,
usecols=None,
unpack=False,
ndmin=0,
encoding="bytes",
max_rows=None,
num_workers=None,
) -> BlockArray:
if num_workers is None:
num_workers = self.cm.num_cores_total()
return self._fs.loadtxt(
fname,
dtype=dtype,
comments=comments,
delimiter=delimiter,
converters=converters,
skiprows=skiprows,
usecols=usecols,
unpack=unpack,
ndmin=ndmin,
encoding=encoding,
max_rows=max_rows,
num_workers=num_workers,
)
######################################
# Array Operations API
######################################
    def scalar(self, value):
return BlockArray.from_scalar(value, self.cm)
    def array(self, array: Union[np.ndarray, List[float]], block_shape: tuple = None):
if not isinstance(array, np.ndarray):
if array_utils.is_array_like(array):
array = np.array(array)
else:
raise ValueError(
"Unable to instantiate array from type %s" % type(array)
)
assert len(array.shape) == len(block_shape)
return BlockArray.from_np(
array, block_shape=block_shape, copy=False, cm=self.cm
)
    def zeros(self, shape: tuple, block_shape: tuple, dtype: np.dtype = None):
        return self._new_array("zeros", shape, block_shape, dtype)
    def ones(self, shape: tuple, block_shape: tuple, dtype: np.dtype = None):
        return self._new_array("ones", shape, block_shape, dtype)
    def empty(self, shape: tuple, block_shape: tuple, dtype: np.dtype = None):
        return self._new_array("empty", shape, block_shape, dtype)
def _new_array(
self, op_name: str, shape: tuple, block_shape: tuple, dtype: np.dtype = None
):
assert len(shape) == len(block_shape)
if dtype is None:
dtype = np.float64
grid = ArrayGrid(shape, block_shape, dtype.__name__)
grid_meta = grid.to_meta()
rarr = BlockArray(grid, self.cm)
for grid_entry in grid.get_entry_iterator():
rarr.blocks[grid_entry].oid = self.cm.new_block(
op_name,
grid_entry,
grid_meta,
syskwargs={"grid_entry": grid_entry, "grid_shape": grid.grid_shape},
)
return rarr
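    # Usage sketch (illustrative, assuming `app` is an ArrayApplication):
    # a 4x4 array partitioned into four 2x2 blocks.
    #
    #   Z = app.zeros((4, 4), block_shape=(2, 2), dtype=np.float64)
    #   O = app.ones((4, 4), block_shape=(2, 2))  # dtype defaults to np.float64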
    def concatenate(self, arrays: List, axis: int, axis_block_size: int = None):
num_arrs = len(arrays)
assert num_arrs > 0
first_arr: BlockArray = arrays[0]
if num_arrs == 1:
return first_arr
num_axes = len(first_arr.shape)
# Check assumptions and define result shapes and block shapes.
for i in range(num_arrs):
curr_ba: BlockArray = arrays[i]
assert num_axes == len(curr_ba.shape), "Unequal num axes."
            assert curr_ba.dtype == first_arr.dtype, "Incompatible dtypes %s, %s" % (
                curr_ba.dtype,
                first_arr.dtype,
            )
for curr_axis in range(num_axes):
first_block_size = first_arr.block_shape[curr_axis]
block_size = curr_ba.block_shape[curr_axis]
if first_block_size == block_size:
continue
elif axis == curr_axis:
                    assert (
                        axis_block_size is not None
                    ), "axis_block_size is required when block shapes are not equal."
else:
raise ValueError(
"Other axis shapes and block shapes must be equal."
)
# Compute result shapes.
result_shape = []
result_block_shape = []
for curr_axis in range(num_axes):
if curr_axis == axis:
if axis_block_size is None:
# They are all equal.
axis_block_size = first_arr.block_shape[curr_axis]
result_block_size = axis_block_size
result_size = 0
for i in range(num_arrs):
curr_ba: BlockArray = arrays[i]
size = curr_ba.shape[curr_axis]
result_size += size
else:
result_size = first_arr.shape[curr_axis]
result_block_size = first_arr.block_shape[curr_axis]
result_shape.append(result_size)
result_block_shape.append(result_block_size)
result_shape, result_block_shape = tuple(result_shape), tuple(
result_block_shape
)
result_ba = self.empty(result_shape, result_block_shape, first_arr.dtype)
# Write result blocks.
# TODO (hme): This can be optimized by updating blocks directly.
pos = 0
for arr in arrays:
delta = arr.shape[axis]
axis_slice = slice(pos, pos + delta)
result_selector = tuple(
[slice(None, None) for _ in range(axis)] + [axis_slice, ...]
)
result_ba[result_selector] = arr
pos += delta
return result_ba
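    # Usage sketch (illustrative): concatenating along axis 0 requires an
    # explicit axis_block_size only when the operands' block shapes differ
    # on that axis. Assumes `app` is an ArrayApplication.
    #
    #   a = app.ones((4, 2), block_shape=(2, 2))
    #   b = app.ones((6, 2), block_shape=(3, 2))
    #   c = app.concatenate([a, b], axis=0, axis_block_size=5)  # shape (10, 2)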
    def eye(self, shape: tuple, block_shape: tuple, dtype: np.dtype = None):
assert len(shape) == len(block_shape) == 2
if dtype is None:
dtype = np.float64
grid = ArrayGrid(shape, block_shape, dtype.__name__)
grid_meta = grid.to_meta()
rarr = BlockArray(grid, self.cm)
for grid_entry in grid.get_entry_iterator():
syskwargs = {"grid_entry": grid_entry, "grid_shape": grid.grid_shape}
if np.all(np.diff(grid_entry) == 0):
# This is a diagonal block.
rarr.blocks[grid_entry].oid = self.cm.new_block(
"eye", grid_entry, grid_meta, syskwargs=syskwargs
)
else:
rarr.blocks[grid_entry].oid = self.cm.new_block(
"zeros", grid_entry, grid_meta, syskwargs=syskwargs
)
return rarr
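    # Usage sketch (illustrative, assuming `app` is an ArrayApplication):
    # only blocks on the grid diagonal carry identity data; all other blocks
    # are created as zeros.
    #
    #   I = app.eye((4, 4), block_shape=(2, 2))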
    def diag(self, X: BlockArray) -> BlockArray:
def find_diag_output_blocks(X: BlockArray, total_elements: int):
# The i,j entry corresponding to a block in X_blocks.
block_i, block_j = 0, 0
# The i,j entry within the current block.
element_i, element_j = 0, 0
            # Keep track of the number of elements found so far.
count = 0
# Start at block 0,0.
block = X.blocks[(0, 0)]
# Each element contains block indices, diag offset,
# and the total elements required from the block.
diag_meta = []
while count < total_elements:
if element_i > block.shape[0] - 1:
block_i = block_i + 1
element_i = 0
if element_j > block.shape[1] - 1:
block_j = block_j + 1
element_j = 0
block = X.blocks[(block_i, block_j)]
block_rows, block_cols = block.shape[0], block.shape[1]
offset = -element_i if element_i > element_j else element_j
total_elements_block = (
min(block_rows - 1 - element_i, block_cols - 1 - element_j) + 1
)
diag_meta.append(((block_i, block_j), offset, total_elements_block))
count, element_i = (
count + total_elements_block,
element_i + total_elements_block,
)
element_j = element_j + total_elements_block
return diag_meta
if len(X.shape) == 1:
shape = X.shape[0], X.shape[0]
block_shape = X.block_shape[0], X.block_shape[0]
grid = ArrayGrid(shape, block_shape, X.dtype.__name__)
grid_meta = grid.to_meta()
rarr = BlockArray(grid, self.cm)
for grid_entry in grid.get_entry_iterator():
syskwargs = {"grid_entry": grid_entry, "grid_shape": grid.grid_shape}
if np.all(np.diff(grid_entry) == 0):
# This is a diagonal block.
rarr.blocks[grid_entry].oid = self.cm.diag(
X.blocks[grid_entry[0]].oid, 0, syskwargs=syskwargs
)
else:
rarr.blocks[grid_entry].oid = self.cm.new_block(
"zeros", grid_entry, grid_meta, syskwargs=syskwargs
)
elif len(X.shape) == 2:
out_shape = (min(X.shape),)
out_block_shape = (min(X.block_shape),)
# Obtain the block indices which contain the diagonal of the matrix.
diag_meta = find_diag_output_blocks(X, out_shape[0])
output_block_arrays = []
out_grid_shape = (len(diag_meta),)
count = 0
# Obtain the diagonals.
for block_indices, offset, total_elements in diag_meta:
syskwargs = {"grid_entry": (count,), "grid_shape": out_grid_shape}
result_block_shape = (total_elements,)
block_grid = ArrayGrid(
result_block_shape,
result_block_shape,
X.blocks[block_indices].dtype.__name__,
)
block_array = BlockArray(block_grid, self.cm)
block_array.blocks[0].oid = self.cm.diag(
X.blocks[block_indices].oid, offset, syskwargs=syskwargs
)
output_block_arrays.append(block_array)
count += 1
if len(output_block_arrays) > 1:
# If there are multiple blocks, concatenate them.
return self.concatenate(
output_block_arrays, axis=0, axis_block_size=out_block_shape[0]
)
return output_block_arrays[0]
else:
raise ValueError("X must have 1 or 2 axes.")
return rarr
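    # Usage sketch (illustrative, assuming `app` is an ArrayApplication):
    # like np.diag, the behavior depends on the input's dimensionality.
    #
    #   v = app.array(np.arange(3), block_shape=(3,))
    #   M = app.diag(v)  # 3x3 matrix with v on the diagonal
    #   d = app.diag(M)  # recovers the 1D diagonal of M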
    def arange(self, start_in, shape, block_shape, step=1, dtype=None) -> BlockArray:
assert step == 1
if dtype is None:
dtype = np.__getattribute__(
str(np.result_type(start_in, shape[0] + start_in))
)
# Generate ranges per block.
grid = ArrayGrid(shape, block_shape, dtype.__name__)
rarr = BlockArray(grid, self.cm)
for _, grid_entry in enumerate(grid.get_entry_iterator()):
syskwargs = {"grid_entry": grid_entry, "grid_shape": grid.grid_shape}
start = start_in + block_shape[0] * grid_entry[0]
entry_shape = grid.get_block_shape(grid_entry)
stop = start + entry_shape[0]
rarr.blocks[grid_entry].oid = self.cm.arange(
start, stop, step, dtype, syskwargs=syskwargs
)
return rarr
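    # Usage sketch (illustrative): each block generates its own contiguous
    # range, offset by the block's position along axis 0. Assumes `app` is
    # an ArrayApplication.
    #
    #   r = app.arange(0, shape=(10,), block_shape=(5,))  # 0..9 in two blocks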
    def linspace(self, start, stop, shape, block_shape, endpoint, retstep, dtype, axis):
assert axis == 0
assert endpoint is True
assert retstep is False
step_size = (stop - start) / (shape[0] - 1)
result = self.arange(0, shape, block_shape)
result = start + result * step_size
if dtype is not None and dtype != result.dtype:
result = result.astype(dtype)
return result
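    # Usage sketch (illustrative): linspace is derived from arange, so only
    # endpoint=True, retstep=False, and axis=0 are supported. Assumes `app`
    # is an ArrayApplication.
    #
    #   x = app.linspace(0.0, 1.0, shape=(5,), block_shape=(5,),
    #                    endpoint=True, retstep=False, dtype=None, axis=0)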
    def log(self, X: BlockArray):
        return X.ufunc("log")
    def exp(self, X: BlockArray):
        return X.ufunc("exp")
    def abs(self, X: BlockArray):
        return X.ufunc("abs")
    def min(self, X: BlockArray, axis=None, keepdims=False):
        return self.reduce("min", X, axis, keepdims)
    def max(self, X: BlockArray, axis=None, keepdims=False):
        return self.reduce("max", X, axis, keepdims)
    def sum(self, X: BlockArray, axis=None, keepdims=False, dtype=None):
        return self.reduce("sum", X, axis, keepdims, dtype)
    def reduce(
self, op_name: str, X: BlockArray, axis=None, keepdims=False, dtype=None
):
res = X.reduce_axis(op_name, axis, keepdims=keepdims)
if dtype is not None:
res = res.astype(dtype)
return res
    def mean(self, X: BlockArray, axis=None, keepdims=False, dtype=None):
        if X.dtype not in (float, np.float32, np.float64):
            X = X.astype(np.float64)
        num_summed = np.prod(X.shape) if axis is None else X.shape[axis]
res = self.sum(X, axis=axis, keepdims=keepdims) / num_summed
if dtype is not None:
res = res.astype(dtype)
return res
    def var(self, X: BlockArray, axis=None, ddof=0, keepdims=False, dtype=None):
        mean = self.mean(X, axis=axis, keepdims=True)
        ss = self.sum((X - mean) ** self.two, axis=axis, keepdims=keepdims)
        num_summed = (np.prod(X.shape) if axis is None else X.shape[axis]) - ddof
res = ss / num_summed
if dtype is not None:
res = res.astype(dtype)
return res
    def std(self, X: BlockArray, axis=None, ddof=0, keepdims=False, dtype=None):
res = self.sqrt(self.var(X, axis, ddof, keepdims))
if dtype is not None:
res = res.astype(dtype)
return res
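    # Usage sketch (illustrative, assuming `app` is an ArrayApplication and
    # `X` is a BlockArray): var/std follow the NumPy ddof convention,
    # dividing by N - ddof.
    #
    #   m = app.mean(X, axis=0)
    #   s = app.std(X, axis=0, ddof=1)  # sample standard deviation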
    def cov(self, X: BlockArray, rowvar=True, bias=False, dtype=None):
n = X.shape[1] if rowvar else X.shape[0]
const = n if bias else n - 1
Xc = X - self.mean(X, axis=(1 if rowvar else 0), keepdims=True)
r = (
Xc @ Xc.transpose(defer=True) if rowvar else Xc.transpose(defer=True) @ Xc
) / const
if dtype is not None:
r = r.astype(dtype)
return r
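    # Usage sketch (illustrative): with rowvar=True (the default), each row
    # of `X` is a variable and each column an observation, matching np.cov.
    # Assumes `app` is an ArrayApplication and `X` is a 2D BlockArray.
    #
    #   C = app.cov(X)                # (num_rows, num_rows) covariance matrix
    #   C = app.cov(X, rowvar=False)  # columns as variables instead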
    def argop(self, op_name: str, arr: BlockArray, axis=None):
if len(arr.shape) > 1:
raise NotImplementedError(
"%s currently supports one-dimensional arrays." % op_name
)
if axis is None:
axis = 0
assert axis == 0
grid = ArrayGrid(shape=(), block_shape=(), dtype=np.int64.__name__)
result = BlockArray(grid, self.cm)
reduction_result = None, None
for grid_entry in arr.grid.get_entry_iterator():
block_slice: slice = arr.grid.get_slice(grid_entry)[0]
block: Block = arr.blocks[grid_entry]
syskwargs = {
"grid_entry": grid_entry,
"grid_shape": arr.grid.grid_shape,
"options": {"num_returns": 2},
}
reduction_result = self.cm.arg_op(
op_name, block.oid, block_slice, *reduction_result, syskwargs=syskwargs
)
argoptima, _ = reduction_result
result.blocks[()].oid = argoptima
return result
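    # Usage sketch (illustrative; the op name "argmax" is an assumption, not
    # confirmed here): argop threads a running (index, value) pair through the
    # blocks of a 1D array. Assumes `app` is an ArrayApplication and `x` is a
    # 1D BlockArray.
    #
    #   i = app.argop("argmax", x)  # scalar BlockArray holding the index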
    def sqrt(self, X: BlockArray) -> BlockArray:
if X.dtype not in (float, np.float32, np.float64):
X = X.astype(np.float64)
return X.ufunc("sqrt")
    def norm(self, X: BlockArray, order=2) -> BlockArray:
assert len(X.shape) == 1, "Only vector norms are supported."
assert order in (1, 2), "Only order 1 and 2 norms supported."
if order == 2:
return self.sqrt(X.transpose(defer=True) @ X)
else:
return self.sum(self.abs(X))
    def xlogy(self, x: BlockArray, y: BlockArray) -> BlockArray:
        if x.dtype not in (float, np.float32, np.float64):
            x = x.astype(np.float64)
        if y.dtype not in (float, np.float32, np.float64):
            y = y.astype(np.float64)
return self.map_bop("xlogy", x, y)
    def where(self, condition: BlockArray, x: BlockArray = None, y: BlockArray = None):
result_oids = []
shape_oids = []
num_axes = max(1, len(condition.shape))
# Stronger constraint than necessary, but no reason for anything stronger.
if x is not None or y is not None:
assert x is not None and y is not None
assert condition.shape == x.shape == y.shape
assert condition.block_shape == x.block_shape == y.block_shape
assert x.dtype == y.dtype
result = BlockArray(x.grid.copy(), self.cm)
for grid_entry in condition.grid.get_entry_iterator():
cond_oid = condition.blocks[grid_entry].oid
x_oid = x.blocks[grid_entry].oid
y_oid = y.blocks[grid_entry].oid
r_oid = self.cm.where(
cond_oid,
x_oid,
y_oid,
None,
syskwargs={
"grid_entry": grid_entry,
"grid_shape": condition.grid.grid_shape,
"options": {"num_returns": 1},
},
)
result.blocks[grid_entry].oid = r_oid
return result
else:
for grid_entry in condition.grid.get_entry_iterator():
block: Block = condition.blocks[grid_entry]
block_slice_tuples = condition.grid.get_slice_tuples(grid_entry)
roids = self.cm.where(
block.oid,
None,
None,
block_slice_tuples,
syskwargs={
"grid_entry": grid_entry,
"grid_shape": condition.grid.grid_shape,
"options": {"num_returns": num_axes + 1},
},
)
block_oids, shape_oid = roids[:-1], roids[-1]
shape_oids.append(shape_oid)
result_oids.append(block_oids)
shapes = self.cm.get(shape_oids)
result_shape = (np.sum(shapes),)
if result_shape == (0,):
return (self.array(np.array([], dtype=np.int64), block_shape=(0,)),)
            # Empty shapes are skipped when assembling the per-axis results below.
result_block_shape = self.cm.compute_block_shape(result_shape, np.int64)
result_arrays = []
for axis in range(num_axes):
block_arrays = []
for i in range(len(result_oids)):
if shapes[i] == (0,):
continue
block_arrays.append(
BlockArray.from_oid(
result_oids[i][axis], shapes[i], np.int64, self.cm
)
)
if len(block_arrays) == 1:
axis_result = block_arrays[0]
else:
axis_result = self.concatenate(
block_arrays, 0, result_block_shape[0]
)
result_arrays.append(axis_result)
return tuple(result_arrays)
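    # Usage sketch (illustrative, assuming `app` is an ArrayApplication):
    # with x and y given, where() is an element-wise select; with only a
    # condition, it returns a tuple of index arrays, one per axis.
    #
    #   z = app.where(cond, x, y)  # requires matching shapes and block shapes
    #   (idx,) = app.where(x > y)  # 1D condition yields a 1-tuple of indices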
    def quantile(
        self, arr: BlockArray, q: float, interpolation="linear", method="tdigest"
    ) -> BlockArray:
        """Compute the q-th quantile of the array elements.

        Args:
            arr: BlockArray.
            q: Quantile to compute, which must be between 0 and 1 inclusive.
            interpolation: Interpolation method to use when the desired quantile lies
                between two data points i < j. See
                https://numpy.org/doc/1.20/reference/generated/numpy.quantile.html and
                https://docs.dask.org/en/latest/_modules/dask/array/percentile.html.

        Returns:
            The q-th quantile of the array elements.
        """
# pylint: disable = import-outside-toplevel, unused-import
try:
import crick
except Exception as e:
            raise Exception(
                "Unable to import crick. Install crick with command "
                "'pip install cython; pip install crick'"
            ) from e
if arr.ndim != 1:
raise NotImplementedError("Only 1D 'arr' is currently supported.")
if q < 0.0 or q > 1.0:
raise ValueError("Quantiles must be in the range [0, 1]")
assert interpolation == "linear"
assert method == "tdigest"
arr_oids = arr.flattened_oids()
num_arrs = len(arr_oids)
q = [q]
t_oids = []
for i, arr_oid in enumerate(arr_oids):
syskwargs = {
"grid_entry": (i,),
"grid_shape": (num_arrs,),
"options": {"num_returns": 1},
}
t_oids.append(self.cm.tdigest_chunk(arr_oid, syskwargs=syskwargs))
p_oid = self.cm.percentiles_from_tdigest(q, *t_oids, syskwargs=syskwargs)
return BlockArray.from_oid(p_oid, (1,), np.float64, self.cm)
    def percentile(
        self,
        arr: BlockArray,
        q: float,
        interpolation: str = "linear",
        method: str = "tdigest",
    ) -> BlockArray:
        """Compute the q-th percentile of the array elements.

        Args:
            arr: BlockArray.
            q: Percentile to compute, which must be between 0 and 100 inclusive.
            interpolation: Interpolation method to use when the desired percentile lies
                between two data points i < j. See
                https://numpy.org/doc/1.20/reference/generated/numpy.percentile.html.

        Returns:
            The q-th percentile of the array elements.
        """
if arr.ndim != 1:
raise NotImplementedError("Only 1D 'arr' is currently supported.")
if not 0 <= q <= 100:
raise ValueError("Percentiles must be in the range [0, 100]")
q = q / 100
return self.quantile(arr, q, interpolation=interpolation, method=method)
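    # Usage sketch (illustrative): both estimators are t-digest based, so
    # results are approximate for multi-block arrays. Assumes `app` is an
    # ArrayApplication and `x` is a 1D BlockArray.
    #
    #   med = app.quantile(x, 0.5)   # approximate median
    #   p90 = app.percentile(x, 90)  # equivalent to app.quantile(x, 0.9)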
def _quickselect(self, arr_oids: List[object], kth: int):
"""Find the `kth` largest element in a 1D BlockArray in O(n) time.
Used as a subprocedure in `median` and `top_k`.
Args:
arr_oids: Flattened list of oids for array objects.
kth: Index of the element (in ascending order) to select.
Returns:
Value of the `kth` largest element.
"""
num_arrs = len(arr_oids)
# Compute weighted median of medians (wmm).
m_oids, s_oids = [], []
for i, arr_oid in enumerate(arr_oids):
syskwargs = {
"grid_entry": (i,),
"grid_shape": (num_arrs,),
"options": {"num_returns": 1},
}
m_oids.append(self.cm.select_median(arr_oid, syskwargs=syskwargs))
s_oids.append(self.cm.size(arr_oid, syskwargs=syskwargs))
ms_oids = m_oids + s_oids
device_0 = self.cm.devices()[0]
wmm_oid = self.cm.system.call("weighted_median", ms_oids, {}, device_0, {})
total_size = sum(self.cm.get(s_oids))
if kth < 0:
kth += total_size
if kth < 0 or total_size <= kth:
raise IndexError("'kth' must be a valid index for 'arr'.")
# Compute "greater than" partition using wmm as pivot, recurse if kth is in range.
gr_size_oids, gr_oids = [], []
for i, arr_oid in enumerate(arr_oids):
gr_size_oid, gr_oid = self.cm.pivot_partition(
arr_oid,
wmm_oid,
"gt",
syskwargs={
"grid_entry": (i,),
"grid_shape": (num_arrs,),
"options": {"num_returns": 2},
},
)
gr_size_oids.append(gr_size_oid)
gr_oids.append(gr_oid)
gr_size = sum(self.cm.get(gr_size_oids))
if kth < gr_size:
del arr_oids
return self._quickselect(gr_oids, kth)
# Compute "less than" partition using wmm as pivot, recurse if kth is in range.
ls_size_oids, ls_oids = [], []
for i, arr_oid in enumerate(arr_oids):
ls_size_oid, ls_oid = self.cm.pivot_partition(
arr_oid,
wmm_oid,
"lt",
syskwargs={
"grid_entry": (i,),
"grid_shape": (num_arrs,),
"options": {"num_returns": 2},
},
)
ls_size_oids.append(ls_size_oid)
ls_oids.append(ls_oid)
ls_size = sum(self.cm.get(ls_size_oids))
if kth >= total_size - ls_size:
del arr_oids
return self._quickselect(ls_oids, kth - (total_size - ls_size))
# The kth value is wmm.
return wmm_oid
    def top_k(
self, arr: BlockArray, k: int, largest=True
) -> Tuple[BlockArray, BlockArray]:
"""Find the `k` largest or smallest elements of a BlockArray.
If there are multiple kth elements that are equal in value, then no guarantees are made as
to which ones are included in the top k.
Args:
arr: A BlockArray.
k: Number of top elements to return.
largest: Whether to return largest or smallest elements.
Returns:
A tuple containing two BlockArrays, (`values`, `indices`).
values: Values of the top k elements, unsorted.
indices: Indices of the top k elements, ordered by their corresponding values.
"""
if arr.ndim != 1:
raise NotImplementedError("Only 1D 'arr' is currently supported.")
if k <= 0 or arr.size < k:
raise IndexError("'k' must be at least 1 and at most the size of 'arr'.")
arr_oids = arr.flattened_oids()
if largest:
k_oid = self._quickselect(arr_oids, k - 1)
k_val = BlockArray.from_oid(k_oid, (1,), arr.dtype, self.cm)
ie_indices = self.where(arr > k_val[0])[0]
else:
k_oid = self._quickselect(arr_oids, -k)
k_val = BlockArray.from_oid(k_oid, (1,), arr.dtype, self.cm)
ie_indices = self.where(arr < k_val[0])[0]
eq_indices = self.where(arr == k_val[0])[0]
eq_indices_pad = eq_indices[: k - ie_indices.size]
axis_block_size = self.compute_block_shape((k,), int)[0]
indices = self.concatenate([ie_indices, eq_indices_pad], 0, axis_block_size)
return arr[indices], indices
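    # Usage sketch (illustrative, assuming `app` is an ArrayApplication and
    # `x` is a 1D BlockArray with at least 3 elements):
    #
    #   values, indices = app.top_k(x, k=3)                 # 3 largest
    #   values, indices = app.top_k(x, k=3, largest=False)  # 3 smallest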
    def map_uop(
self,
op_name: str,
arr: BlockArray,
out: BlockArray = None,
where=True,
args=None,
kwargs=None,
) -> BlockArray:
"""A map, for unary operators, that applies to every entry of an array.
Args:
op_name: An element-wise unary operator.
arr: A BlockArray.
out: A BlockArray to which the result is written.
where: An indicator specifying the indices to which op is applied.
args: Args provided to op.
kwargs: Keyword args provided to op.
Returns:
A BlockArray.
"""
if where is not True:
raise NotImplementedError("'where' argument is not yet supported.")
args = () if args is None else args
kwargs = {} if kwargs is None else kwargs
shape = arr.shape
block_shape = arr.block_shape
dtype = array_utils.get_uop_output_type(op_name, arr.dtype)
assert len(shape) == len(block_shape)
if out is None:
grid = ArrayGrid(shape, block_shape, dtype.__name__)
rarr = BlockArray(grid, self.cm)
else:
rarr = out
grid = rarr.grid
assert rarr.shape == arr.shape and rarr.block_shape == arr.block_shape
for grid_entry in grid.get_entry_iterator():
# TODO(hme): Faster to create ndarray first,
# and instantiate block array on return
# to avoid instantiating blocks on BlockArray initialization.
rarr.blocks[grid_entry] = arr.blocks[grid_entry].uop_map(
op_name, args=args, kwargs=kwargs
)
return rarr
    def matmul(self, arr_1: BlockArray, arr_2: BlockArray) -> BlockArray:
        return arr_1 @ arr_2
    def tensordot(
        self, arr_1: BlockArray, arr_2: BlockArray, axes: int = 2
    ) -> BlockArray:
        return arr_1.tensordot(arr_2, axes)
    def map_bop(
self,
op_name: str,
arr_1: BlockArray,
arr_2: BlockArray,
out: BlockArray = None,
where=True,
args=None,
kwargs=None,
) -> BlockArray:
# TODO (hme): Move this into BlockArray, and invoke on operator implementations.
"""A map for binary operators that applies element-wise to every entry of the input arrays.
Args:
op_name: An element-wise binary operator.
arr_1: A BlockArray.
arr_2: A BlockArray.
out: A BlockArray to which the result is written.
where: An indicator specifying the indices to which op is applied.
args: Args provided to op.
kwargs: Keyword args provided to op.
Returns:
A BlockArray.
"""
if where is not True:
raise NotImplementedError("'where' argument is not yet supported.")
if args is not None:
raise NotImplementedError("'args' is not yet supported.")
if not (kwargs is None or len(kwargs) == 0):
raise NotImplementedError("'kwargs' is not yet supported.")
if not array_utils.broadcastable(
arr_1.shape, arr_2.shape, arr_1.block_shape, arr_2.block_shape
):
raise ValueError("Operands cannot be broadcasted.")
try:
ufunc = np.__getattribute__(op_name)
if (
op_name.endswith("max")
or op_name == "maximum"
or op_name.endswith("min")
or op_name == "minimum"
or op_name.startswith("logical")
):
rarr = self._broadcast_bop(op_name, arr_1, arr_2)
else:
result_blocks: np.ndarray = ufunc(arr_1.blocks, arr_2.blocks)
rarr = BlockArray.from_blocks(
result_blocks, result_shape=None, cm=self.cm
)
except Exception as _:
rarr = self._broadcast_bop(op_name, arr_1, arr_2)
if out is not None:
assert out.grid.grid_shape == rarr.grid.grid_shape
assert out.shape == rarr.shape
assert out.block_shape == rarr.block_shape
out.blocks[:] = rarr.blocks[:]
rarr = out
return rarr
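    # Usage sketch (illustrative): map_bop first tries to apply the NumPy
    # ufunc named by op_name directly to the block grids, falling back to an
    # explicit broadcast when that fails or for min/max/logical ops. Assumes
    # `app` is an ArrayApplication.
    #
    #   z = app.map_bop("add", x, y)      # element-wise x + y
    #   z = app.map_bop("maximum", x, y)  # routed through _broadcast_bop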
def _broadcast_bop(self, op_name, arr_1, arr_2) -> BlockArray:
"""We want to avoid invoking this op whenever possible; NumPy's imp is faster.
Args:
op_name: Name of binary operation.
arr_1: A BlockArray.
arr_2: A BlockArray.
Returns:
A BlockArray.
"""
if arr_1.shape != arr_2.shape:
output_grid_shape = array_utils.broadcast_shape(
arr_1.grid.grid_shape, arr_2.grid.grid_shape
)
arr_1 = arr_1.broadcast_to(output_grid_shape)
arr_2 = arr_2.broadcast_to(output_grid_shape)
dtype = array_utils.get_bop_output_type(op_name, arr_1.dtype, arr_2.dtype)
grid = ArrayGrid(arr_1.shape, arr_1.block_shape, dtype.__name__)
rarr = BlockArray(grid, self.cm)
for grid_entry in rarr.grid.get_entry_iterator():
block_1: Block = arr_1.blocks[grid_entry]
block_2: Block = arr_2.blocks[grid_entry]
rarr.blocks[grid_entry] = block_1.bop(op_name, block_2, {})
return rarr
    def get(self, *arrs):
if len(arrs) == 1:
if isinstance(arrs[0], BlockArray):
return arrs[0].get()
else:
return arrs[0]
else:
r = []
for item in arrs:
if isinstance(item, BlockArray):
r.append(item.get())
else:
r.append(item)
return r
    def array_compare(self, func_name, a: BlockArray, b: BlockArray, *args):
assert a.shape == b.shape and a.block_shape == b.block_shape
bool_list = []
grid_shape = a.grid.grid_shape
for grid_entry in a.grid.get_entry_iterator():
a_block, b_block = a.blocks[grid_entry].oid, b.blocks[grid_entry].oid
bool_list.append(
self.cm.array_compare(
func_name,
a_block,
b_block,
args,
syskwargs={"grid_entry": grid_entry, "grid_shape": grid_shape},
)
)
oid = self.cm.logical_and(
*bool_list, syskwargs={"grid_entry": (0, 0), "grid_shape": (1, 1)}
)
        return BlockArray.from_oid(oid, (), bool, self.cm)
    def array_equal(self, a: BlockArray, b: BlockArray):
        return self.array_compare("array_equal", a, b)
    def array_equiv(self, a: BlockArray, b: BlockArray):
        return self.array_compare("array_equiv", a, b)
    def allclose(self, a: BlockArray, b: BlockArray, rtol=1.0e-5, atol=1.0e-8):
        return self.array_compare("allclose", a, b, rtol, atol)
    def vec_from_oids(self, oids, shape, block_shape, dtype):
arr = BlockArray(
ArrayGrid(shape=shape, block_shape=shape, dtype=dtype.__name__), self.cm
)
# Make sure resulting grid shape is a vector (1 dimensional).
assert np.sum(arr.grid.grid_shape) == (
max(arr.grid.grid_shape) + len(arr.grid.grid_shape) - 1
)
for i, grid_entry in enumerate(arr.grid.get_entry_iterator()):
arr.blocks[grid_entry].oid = oids[i]
if block_shape != shape:
return arr.reshape(block_shape=block_shape)
return arr
    def random_state(self, seed=None):
        return NumsRandomState(self.cm, seed)
    def isnan(self, X: BlockArray):
        return self.map_uop("isnan", X)
    def nanmean(self, a: BlockArray, axis=None, keepdims=False, dtype=None):
if not array_utils.is_float(a):
a = a.astype(np.float64)
num_summed = self.sum(
~a.ufunc("isnan"), axis=axis, dtype=a.dtype, keepdims=keepdims
)
if num_summed.ndim == 0 and num_summed == 0:
return self.scalar(np.nan)
if num_summed.ndim > 0:
num_summed = self.where(
num_summed == 0,
self.empty(num_summed.shape, num_summed.block_shape) * np.nan,
num_summed,
)
res = (
self.reduce("nansum", a, axis=axis, dtype=dtype, keepdims=keepdims)
/ num_summed
)
if dtype is not None:
res = res.astype(dtype)
return res
    def nanvar(self, a: BlockArray, axis=None, ddof=0, keepdims=False, dtype=None):
mean = self.nanmean(a, axis=axis, keepdims=True)
ss = self.reduce(
"nansum", (a - mean) ** self.two, axis=axis, dtype=dtype, keepdims=keepdims
)
num_summed = (
self.sum(~a.ufunc("isnan"), axis=axis, dtype=a.dtype, keepdims=keepdims)
- ddof
)
res = ss / num_summed
if dtype is not None:
res = res.astype(dtype)
return res
    def nanstd(self, a: BlockArray, axis=None, ddof=0, keepdims=False, dtype=None):
res = self.sqrt(self.nanvar(a, axis, ddof, keepdims))
if dtype is not None:
res = res.astype(dtype)
return res
    def atleast_1d(self, *arys):
res = []
for ary in arys:
ary = BlockArray.to_block_array(ary, self.cm)
if ary.ndim == 0:
result = ary.reshape(1)
else:
result = ary
res.append(result)
if len(res) == 1:
return res[0]
else:
return res
    def atleast_2d(self, *arys):
res = []
for ary in arys:
ary = BlockArray.to_block_array(ary, self.cm)
if ary.ndim == 0:
result = ary.reshape(1, 1)
# TODO (MWE): Implement this using newaxis when supported
# This is because ary.base needs to stay consistent,
# which reshape will not accomplish
elif ary.ndim == 1:
result = ary.reshape(1, ary.shape[0])
else:
result = ary
res.append(result)
if len(res) == 1:
return res[0]
else:
return res
    def atleast_3d(self, *arys):
res = []
for ary in arys:
ary = BlockArray.to_block_array(ary, self.cm)
if ary.ndim == 0:
result = ary.reshape(1, 1, 1)
# TODO (MWE): Implement this using newaxis when supported
# This is because ary.base needs to stay consistent,
# which reshape will not accomplish
elif ary.ndim == 1:
result = ary.reshape(1, ary.shape[0], 1)
elif ary.ndim == 2:
result = ary.reshape(ary.shape[0], ary.shape[1], 1)
else:
result = ary
res.append(result)
if len(res) == 1:
return res[0]
else:
return res
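    # Usage sketch (illustrative, assuming `app` is an ArrayApplication and
    # `v` is a 1D BlockArray of shape (n,)): promotion mirrors NumPy.
    #
    #   app.atleast_1d(v).shape  # (n,)
    #   app.atleast_2d(v).shape  # (1, n)
    #   app.atleast_3d(v).shape  # (1, n, 1)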
    def _check_or_convert_stack_array(self, arrs):
        if not isinstance(arrs, list):
            arrs = [arrs]
        return arrs
    def hstack(self, tup, axis_block_size=None):
        arrs = self._check_or_convert_stack_array(self.atleast_1d(*tup))
# As a special case, dimension 0 of 1-dimensional arrays is "horizontal"
if arrs and arrs[0].ndim == 1:
return self.concatenate(arrs, 0, axis_block_size=axis_block_size)
else:
return self.concatenate(arrs, 1, axis_block_size=axis_block_size)
    def vstack(self, tup, axis_block_size=None):
        arrs = self._check_or_convert_stack_array(self.atleast_2d(*tup))
        return self.concatenate(arrs, 0, axis_block_size=axis_block_size)
    def dstack(self, tup, axis_block_size=None):
        arrs = self._check_or_convert_stack_array(self.atleast_3d(*tup))
        return self.concatenate(arrs, 2, axis_block_size=axis_block_size)
    def row_stack(self, tup, axis_block_size=None):
        return self.vstack(tup, axis_block_size=axis_block_size)
    def column_stack(self, tup, axis_block_size=None):
# Based on numpy source.
arrays = []
for obj in tup:
arr = BlockArray.to_block_array(obj, self.cm)
if arr.ndim < 2:
arrays.append(self.atleast_2d(arr).T)
else:
arrays.append(self.atleast_2d(arr))
return self.concatenate(arrays, 1, axis_block_size=axis_block_size)
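    # Usage sketch (illustrative, assuming `app` is an ArrayApplication and
    # `u`, `v` are 1D BlockArrays of equal length n):
    #
    #   app.vstack([u, v]).shape        # (2, n)
    #   app.hstack([u, v]).shape        # (2 * n,)
    #   app.column_stack([u, v]).shape  # (n, 2)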