from __future__ import annotations

from functools import wraps
from typing import (
    TYPE_CHECKING,
    Any,
    Literal,
    Sequence,
    TypeVar,
    cast,
    overload,
)

import numpy as np

from pandas._libs import lib
from pandas._libs.arrays import NDArrayBacked
from pandas._typing import (
    ArrayLike,
    AxisInt,
    Dtype,
    F,
    PositionalIndexer2D,
    PositionalIndexerTuple,
    ScalarIndexer,
    SequenceIndexer,
    Shape,
    TakeIndexer,
    npt,
    type_t,
)
from pandas.errors import AbstractMethodError
from pandas.util._decorators import doc
from pandas.util._validators import (
    validate_bool_kwarg,
    validate_fillna_kwargs,
    validate_insert_loc,
)

from pandas.core.dtypes.common import (
    is_dtype_equal,
    pandas_dtype,
)
from pandas.core.dtypes.dtypes import (
    DatetimeTZDtype,
    ExtensionDtype,
    PeriodDtype,
)
from pandas.core.dtypes.missing import array_equivalent

from pandas.core import missing
from pandas.core.algorithms import (
    take,
    unique,
    value_counts,
)
from pandas.core.array_algos.quantile import quantile_with_mask
from pandas.core.array_algos.transforms import shift
from pandas.core.arrays.base import ExtensionArray
from pandas.core.construction import extract_array
from pandas.core.indexers import check_array_indexer
from pandas.core.sorting import nargminmax

NDArrayBackedExtensionArrayT = TypeVar(
    "NDArrayBackedExtensionArrayT", bound="NDArrayBackedExtensionArray"
)

if TYPE_CHECKING:
    from pandas._typing import (
        NumpySorter,
        NumpyValueArrayLike,
    )

    from pandas import Series


def ravel_compat(meth: F) -> F:
    """
    Decorator to ravel a 2D array before passing it to a cython operation,
    then reshape the result to our own shape.
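
    Notes
    -----
    Usage sketch (the decorated method name below is hypothetical, shown for
    illustration only)::

        @ravel_compat
        def _grouped_op(self, *args, **kwargs):
            # implementation only needs to handle 1D input; 2D input is
            # raveled before the call and reshaped afterwards
            ...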
    """

    @wraps(meth)
    def method(self, *args, **kwargs):
        if self.ndim == 1:
            return meth(self, *args, **kwargs)

        flags = self._ndarray.flags
        flat = self.ravel("K")
        result = meth(flat, *args, **kwargs)
        order = "F" if flags.f_contiguous else "C"
        return result.reshape(self.shape, order=order)

    return cast(F, method)


class NDArrayBackedExtensionArray(NDArrayBacked, ExtensionArray):
    """
    ExtensionArray that is backed by a single NumPy ndarray.
    """

    _ndarray: np.ndarray

    # scalar used to denote NA value inside our self._ndarray, e.g. -1
    # for Categorical, iNaT for Period. Outside of object dtype,
    # self.isna() should be exactly locations in self._ndarray with
    # _internal_fill_value.
    _internal_fill_value: Any

    def _box_func(self, x):
        """
        Wrap numpy type in our dtype.type if necessary.
        """
        return x

    def _validate_scalar(self, value):
        # used by NDArrayBackedExtensionIndex.insert
        raise AbstractMethodError(self)

    # ------------------------------------------------------------------------

    def view(self, dtype: Dtype | None = None) -> ArrayLike:
        # We handle datetime64, datetime64tz, timedelta64, and period
        # dtypes here. Everything else we pass through to the underlying
        # ndarray.
        if dtype is None or dtype is self.dtype:
            return self._from_backing_data(self._ndarray)

        if isinstance(dtype, type):
            # we sometimes pass non-dtype objects, e.g. np.ndarray;
            # pass those through to the underlying ndarray
            return self._ndarray.view(dtype)

        dtype = pandas_dtype(dtype)
        arr = self._ndarray

        if isinstance(dtype, (PeriodDtype, DatetimeTZDtype)):
            cls = dtype.construct_array_type()
            return cls(arr.view("i8"), dtype=dtype)
        elif dtype == "M8[ns]":
            from pandas.core.arrays import DatetimeArray

            return DatetimeArray(arr.view("i8"), dtype=dtype)
        elif dtype == "m8[ns]":
            from pandas.core.arrays import TimedeltaArray

            return TimedeltaArray(arr.view("i8"), dtype=dtype)

        # error: Argument "dtype" to "view" of "_ArrayOrScalarCommon" has incompatible
        # type "Union[ExtensionDtype, dtype[Any]]"; expected "Union[dtype[Any], None,
        # type, _SupportsDType, str, Union[Tuple[Any, int], Tuple[Any, Union[int,
        # Sequence[int]]], List[Any], _DTypeDict, Tuple[Any, Any]]]"
        return arr.view(dtype=dtype)  # type: ignore[arg-type]
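
    # Rough behavior sketch (illustrative only; values and reprs are examples,
    # not tested output), using a concrete backed array such as DatetimeArray:
    #
    #     dta = pd.array(["2016-01-01"], dtype="datetime64[ns]")
    #     dta.view("i8")       # plain int64 ndarray of epoch nanoseconds
    #     dta.view("m8[ns]")   # TimedeltaArray viewing the same buffer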

    def take(
        self: NDArrayBackedExtensionArrayT,
        indices: TakeIndexer,
        *,
        allow_fill: bool = False,
        fill_value: Any = None,
        axis: AxisInt = 0,
    ) -> NDArrayBackedExtensionArrayT:
        if allow_fill:
            fill_value = self._validate_scalar(fill_value)

        new_data = take(
            self._ndarray,
            indices,
            allow_fill=allow_fill,
            fill_value=fill_value,
            axis=axis,
        )
        return self._from_backing_data(new_data)
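
    # Illustrative sketch (values are examples only): with allow_fill=True,
    # -1 entries in ``indices`` are filled with ``fill_value`` after it has
    # been validated against this array's dtype.
    #
    #     cat = pd.Categorical(["a", "b"])
    #     cat.take([0, -1], allow_fill=True)                   # -> ['a', NaN]
    #     cat.take([0, -1], allow_fill=True, fill_value="b")   # -> ['a', 'b']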

    # ------------------------------------------------------------------------

    def equals(self, other) -> bool:
        if type(self) is not type(other):
            return False
        if not is_dtype_equal(self.dtype, other.dtype):
            return False
        return bool(array_equivalent(self._ndarray, other._ndarray))

    @classmethod
    def _from_factorized(cls, values, original):
        assert values.dtype == original._ndarray.dtype
        return original._from_backing_data(values)

    def _values_for_argsort(self) -> np.ndarray:
        return self._ndarray

    def _values_for_factorize(self):
        return self._ndarray, self._internal_fill_value

    # Signature of "argmin" incompatible with supertype "ExtensionArray"
    def argmin(self, axis: AxisInt = 0, skipna: bool = True):  # type: ignore[override]
        # override base class by adding axis keyword
        validate_bool_kwarg(skipna, "skipna")
        if not skipna and self._hasna:
            raise NotImplementedError
        return nargminmax(self, "argmin", axis=axis)

    # Signature of "argmax" incompatible with supertype "ExtensionArray"
    def argmax(self, axis: AxisInt = 0, skipna: bool = True):  # type: ignore[override]
        # override base class by adding axis keyword
        validate_bool_kwarg(skipna, "skipna")
        if not skipna and self._hasna:
            raise NotImplementedError
        return nargminmax(self, "argmax", axis=axis)

    def unique(self: NDArrayBackedExtensionArrayT) -> NDArrayBackedExtensionArrayT:
        new_data = unique(self._ndarray)
        return self._from_backing_data(new_data)

    @classmethod
    @doc(ExtensionArray._concat_same_type)
    def _concat_same_type(
        cls: type[NDArrayBackedExtensionArrayT],
        to_concat: Sequence[NDArrayBackedExtensionArrayT],
        axis: AxisInt = 0,
    ) -> NDArrayBackedExtensionArrayT:
        dtypes = {str(x.dtype) for x in to_concat}
        if len(dtypes) != 1:
            raise ValueError("to_concat must have the same dtype (tz)", dtypes)

        new_values = [x._ndarray for x in to_concat]
        new_arr = np.concatenate(new_values, axis=axis)
        return to_concat[0]._from_backing_data(new_arr)

    @doc(ExtensionArray.searchsorted)
    def searchsorted(
        self,
        value: NumpyValueArrayLike | ExtensionArray,
        side: Literal["left", "right"] = "left",
        sorter: NumpySorter = None,
    ) -> npt.NDArray[np.intp] | np.intp:
        npvalue = self._validate_setitem_value(value)
        return self._ndarray.searchsorted(npvalue, side=side, sorter=sorter)

    @doc(ExtensionArray.shift)
    def shift(self, periods: int = 1, fill_value=None, axis: AxisInt = 0):
        fill_value = self._validate_scalar(fill_value)
        new_values = shift(self._ndarray, periods, axis, fill_value)

        return self._from_backing_data(new_values)

    def __setitem__(self, key, value) -> None:
        key = check_array_indexer(self, key)
        value = self._validate_setitem_value(value)
        self._ndarray[key] = value

    def _validate_setitem_value(self, value):
        return value

    @overload
    def __getitem__(self, key: ScalarIndexer) -> Any:
        ...

    @overload
    def __getitem__(
        self: NDArrayBackedExtensionArrayT,
        key: SequenceIndexer | PositionalIndexerTuple,
    ) -> NDArrayBackedExtensionArrayT:
        ...

    def __getitem__(
        self: NDArrayBackedExtensionArrayT,
        key: PositionalIndexer2D,
    ) -> NDArrayBackedExtensionArrayT | Any:
        if lib.is_integer(key):
            # fast-path
            result = self._ndarray[key]
            if self.ndim == 1:
                return self._box_func(result)
            return self._from_backing_data(result)

        # error: Incompatible types in assignment (expression has type "ExtensionArray",
        # variable has type "Union[int, slice, ndarray]")
        key = extract_array(key, extract_numpy=True)  # type: ignore[assignment]
        key = check_array_indexer(self, key)
        result = self._ndarray[key]
        if lib.is_scalar(result):
            return self._box_func(result)

        result = self._from_backing_data(result)
        return result
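
    # Behavior sketch (illustrative; reprs are indicative, not tested output):
    #
    #     dta = pd.array(["2021-01-01", "2021-01-02"], dtype="datetime64[ns]")
    #     dta[0]    # boxed scalar, e.g. Timestamp("2021-01-01 00:00:00")
    #     dta[:1]   # length-1 array of the same type as ``dta``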

    def _fill_mask_inplace(
        self, method: str, limit, mask: npt.NDArray[np.bool_]
    ) -> None:
        # (for now) when self.ndim == 2, we assume axis=0
        func = missing.get_fill_func(method, ndim=self.ndim)
        func(self._ndarray.T, limit=limit, mask=mask.T)

    @doc(ExtensionArray.fillna)
    def fillna(
        self: NDArrayBackedExtensionArrayT, value=None, method=None, limit=None
    ) -> NDArrayBackedExtensionArrayT:
        value, method = validate_fillna_kwargs(
            value, method, validate_scalar_dict_value=False
        )

        mask = self.isna()
        # error: Argument 2 to "check_value_size" has incompatible type
        # "ExtensionArray"; expected "ndarray"
        value = missing.check_value_size(
            value, mask, len(self)  # type: ignore[arg-type]
        )

        if mask.any():
            if method is not None:
                # TODO: check value is None
                # (for now) when self.ndim == 2, we assume axis=0
                func = missing.get_fill_func(method, ndim=self.ndim)
                npvalues = self._ndarray.T.copy()
                func(npvalues, limit=limit, mask=mask.T)
                npvalues = npvalues.T

                # TODO: PandasArray didn't use to copy, need tests for this
                new_values = self._from_backing_data(npvalues)
            else:
                # fill with value
                new_values = self.copy()
                new_values[mask] = value
        else:
            # We validate the fill_value even if there is nothing to fill
            if value is not None:
                self._validate_setitem_value(value)

            new_values = self.copy()
        return new_values
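
    # Usage sketch (illustrative; exact reprs and availability of the
    # ``method`` keyword depend on the pandas version and concrete subclass):
    #
    #     dta = pd.array([pd.Timestamp("2021-01-01"), pd.NaT])
    #     dta.fillna(pd.Timestamp("2021-01-02"))   # fill with a value
    #     dta.fillna(method="pad")                 # propagate last valid value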

    # ------------------------------------------------------------------------
    # Reductions

    def _wrap_reduction_result(self, axis: AxisInt | None, result):
        if axis is None or self.ndim == 1:
            return self._box_func(result)
        return self._from_backing_data(result)

    # ------------------------------------------------------------------------
    # __array_function__ methods

    def _putmask(self, mask: npt.NDArray[np.bool_], value) -> None:
        """
        Analogue to np.putmask(self, mask, value)

        Parameters
        ----------
        mask : np.ndarray[bool]
        value : scalar or listlike

        Raises
        ------
        TypeError
            If value cannot be cast to self.dtype.
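
        Examples
        --------
        Rough sketch using a concrete subclass; values are examples only.
        The array is modified in place.

        >>> import numpy as np
        >>> import pandas as pd
        >>> cat = pd.Categorical(["a", "b", "a"])
        >>> cat._putmask(np.array([True, False, False]), "b")
        >>> list(cat)
        ['b', 'b', 'a']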
        """
        value = self._validate_setitem_value(value)

        np.putmask(self._ndarray, mask, value)

    def _where(
        self: NDArrayBackedExtensionArrayT, mask: npt.NDArray[np.bool_], value
    ) -> NDArrayBackedExtensionArrayT:
        """
        Analogue to np.where(mask, self, value)

        Parameters
        ----------
        mask : np.ndarray[bool]
        value : scalar or listlike

        Raises
        ------
        TypeError
            If value cannot be cast to self.dtype.
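
        Examples
        --------
        Where ``mask`` is True the original values are kept; elsewhere
        ``value`` is used. Illustrative sketch with a concrete subclass:

        >>> import numpy as np
        >>> import pandas as pd
        >>> cat = pd.Categorical(["a", "b", "a"])
        >>> list(cat._where(np.array([True, False, True]), "a"))
        ['a', 'a', 'a']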
        """
        value = self._validate_setitem_value(value)

        res_values = np.where(mask, self._ndarray, value)
        return self._from_backing_data(res_values)

    # ------------------------------------------------------------------------
    # Index compat methods

    def insert(
        self: NDArrayBackedExtensionArrayT, loc: int, item
    ) -> NDArrayBackedExtensionArrayT:
        """
        Make new ExtensionArray inserting new item at location. Follows
        Python list.append semantics for negative values.

        Parameters
        ----------
        loc : int
        item : object

        Returns
        -------
        type(self)
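
        Examples
        --------
        Illustrative sketch using a concrete subclass (values are examples):

        >>> import pandas as pd
        >>> cat = pd.Categorical(["a", "c"])
        >>> list(cat.insert(1, "a"))
        ['a', 'a', 'c']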
        """
        loc = validate_insert_loc(loc, len(self))

        code = self._validate_scalar(item)

        new_vals = np.concatenate(
            (
                self._ndarray[:loc],
                np.asarray([code], dtype=self._ndarray.dtype),
                self._ndarray[loc:],
            )
        )
        return self._from_backing_data(new_vals)

    # ------------------------------------------------------------------------
    # Additional array methods
    # These are not part of the EA API, but we implement them because
    # pandas assumes they're there.

    def value_counts(self, dropna: bool = True) -> Series:
        """
        Return a Series containing counts of unique values.

        Parameters
        ----------
        dropna : bool, default True
            Don't include counts of NA values.

        Returns
        -------
        Series
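
        Examples
        --------
        Illustrative sketch with a concrete subclass (output indicative only):

        >>> import pandas as pd
        >>> dta = pd.array(
        ...     ["2021-01-01", "2021-01-01", pd.NaT], dtype="datetime64[ns]"
        ... )
        >>> dta.value_counts().to_dict()
        {Timestamp('2021-01-01 00:00:00'): 2}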
        """
        if self.ndim != 1:
            raise NotImplementedError

        from pandas import (
            Index,
            Series,
        )

        if dropna:
            # error: Unsupported operand type for ~ ("ExtensionArray")
            values = self[~self.isna()]._ndarray  # type: ignore[operator]
        else:
            values = self._ndarray

        result = value_counts(values, sort=False, dropna=dropna)

        index_arr = self._from_backing_data(np.asarray(result.index._data))
        index = Index(index_arr, name=result.index.name)
        return Series(result._values, index=index, name=result.name, copy=False)

    def _quantile(
        self: NDArrayBackedExtensionArrayT,
        qs: npt.NDArray[np.float64],
        interpolation: str,
    ) -> NDArrayBackedExtensionArrayT:
        # TODO: disable for Categorical if not ordered?

        mask = np.asarray(self.isna())
        arr = self._ndarray
        fill_value = self._internal_fill_value

        res_values = quantile_with_mask(arr, mask, fill_value, qs, interpolation)

        res_values = self._cast_quantile_result(res_values)
        return self._from_backing_data(res_values)

    # TODO: see if we can share this with other dispatch-wrapping methods
    def _cast_quantile_result(self, res_values: np.ndarray) -> np.ndarray:
        """
        Cast the result of quantile_with_mask to an appropriate dtype
        to pass to _from_backing_data in _quantile.
        """
        return res_values

    # ------------------------------------------------------------------------
    # numpy-like methods

    @classmethod
    def _empty(
        cls: type_t[NDArrayBackedExtensionArrayT], shape: Shape, dtype: ExtensionDtype
    ) -> NDArrayBackedExtensionArrayT:
        """
        Analogous to np.empty(shape, dtype=dtype)

        Parameters
        ----------
        shape : tuple[int]
        dtype : ExtensionDtype
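
        Examples
        --------
        Illustrative sketch (the contents of the result are uninitialized,
        so only the shape and dtype are meaningful):

        >>> import pandas as pd
        >>> arr = pd.arrays.PeriodArray._empty((3,), dtype=pd.PeriodDtype("D"))
        >>> arr.shape
        (3,)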
        """
        # The base implementation uses a naive approach to find the dtype
        # for the backing ndarray
        arr = cls._from_sequence([], dtype=dtype)
        backing = np.empty(shape, dtype=arr._ndarray.dtype)
        return arr._from_backing_data(backing)