from __future__ import annotations

import numpy as np

from pandas._libs import lib
from pandas._libs.tslibs import (
    get_unit_from_dtype,
    is_supported_unit,
)
from pandas._typing import (
    AxisInt,
    Dtype,
    NpDtype,
    Scalar,
    npt,
)
from pandas.compat.numpy import function as nv

from pandas.core.dtypes.astype import astype_array
from pandas.core.dtypes.cast import construct_1d_object_array_from_listlike
from pandas.core.dtypes.common import (
    is_dtype_equal,
    pandas_dtype,
)
from pandas.core.dtypes.dtypes import PandasDtype
from pandas.core.dtypes.missing import isna

from pandas.core import (
    arraylike,
    nanops,
    ops,
)
from pandas.core.arraylike import OpsMixin
from pandas.core.arrays._mixins import NDArrayBackedExtensionArray
from pandas.core.construction import ensure_wrapped_if_datetimelike
from pandas.core.strings.object_array import ObjectStringArrayMixin


class PandasArray(
    OpsMixin,
    NDArrayBackedExtensionArray,
    ObjectStringArrayMixin,
):
    """
    A pandas ExtensionArray for NumPy data.

    This is mostly for internal compatibility, and is not especially
    useful on its own.

    Parameters
    ----------
    values : ndarray
        The NumPy ndarray to wrap. Must be 1-dimensional.
    copy : bool, default False
        Whether to copy `values`.

    Attributes
    ----------
    None

    Methods
    -------
    None
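
    Examples
    --------
    A minimal construction sketch; this assumes the class is exposed as
    ``pd.arrays.PandasArray`` and the repr shown is illustrative:

    >>> import numpy as np
    >>> import pandas as pd
    >>> arr = pd.arrays.PandasArray(np.array([1, 2, 3]))
    >>> arr
    <PandasArray>
    [1, 2, 3]
    Length: 3, dtype: int64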
    """

    # If you're wondering why pd.Series(cls) doesn't put the array in an
    # ExtensionBlock, search for `ABCPandasArray`. We check for
    # that _typ to ensure that users don't unnecessarily use EAs inside
    # pandas internals, which turns off things like block consolidation.
    _typ = "npy_extension"
    __array_priority__ = 1000
    _ndarray: np.ndarray
    _dtype: PandasDtype
    _internal_fill_value = np.nan

    # ------------------------------------------------------------------------
    # Constructors

    def __init__(self, values: np.ndarray | PandasArray, copy: bool = False) -> None:
        if isinstance(values, type(self)):
            values = values._ndarray
        if not isinstance(values, np.ndarray):
            raise ValueError(
                f"'values' must be a NumPy array, not {type(values).__name__}"
            )

        if values.ndim == 0:
            # Technically we support 2, but do not advertise that fact.
            raise ValueError("PandasArray must be 1-dimensional.")

        if copy:
            values = values.copy()

        dtype = PandasDtype(values.dtype)
        super().__init__(values, dtype)
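
    # _from_sequence is the ExtensionArray alternate constructor used by
    # sequence-construction paths such as pd.array; dtype here may be a
    # PandasDtype, a NumPy dtype, or None.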
    @classmethod
    def _from_sequence(
        cls, scalars, *, dtype: Dtype | None = None, copy: bool = False
    ) -> PandasArray:
        if isinstance(dtype, PandasDtype):
            dtype = dtype._dtype

        # error: Argument "dtype" to "asarray" has incompatible type
        # "Union[ExtensionDtype, str, dtype[Any], dtype[floating[_64Bit]], Type[object],
        # None]"; expected "Union[dtype[Any], None, type, _SupportsDType, str,
        # Union[Tuple[Any, int], Tuple[Any, Union[int, Sequence[int]]], List[Any],
        # _DTypeDict, Tuple[Any, Any]]]"
        result = np.asarray(scalars, dtype=dtype)  # type: ignore[arg-type]
        if (
            result.ndim > 1
            and not hasattr(scalars, "dtype")
            and (dtype is None or dtype == object)
        ):
            # e.g. list-of-tuples
            result = construct_1d_object_array_from_listlike(scalars)

        if copy and result is scalars:
            result = result.copy()
        return cls(result)

    def _from_backing_data(self, arr: np.ndarray) -> PandasArray:
        return type(self)(arr)

    # ------------------------------------------------------------------------
    # Data

    @property
    def dtype(self) -> PandasDtype:
        return self._dtype

    # ------------------------------------------------------------------------
    # NumPy Array Interface

    def __array__(self, dtype: NpDtype | None = None) -> np.ndarray:
        return np.asarray(self._ndarray, dtype=dtype)
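
    # __array_ufunc__ implements NumPy's universal-function protocol so that
    # ufuncs applied to a PandasArray operate on the wrapped ndarray and
    # re-box array results; see the inline comments below for the details.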
    def __array_ufunc__(self, ufunc: np.ufunc, method: str, *inputs, **kwargs):
        # Lightly modified version of
        # https://numpy.org/doc/stable/reference/generated/numpy.lib.mixins.NDArrayOperatorsMixin.html
        # The primary modification is not boxing scalar return values
        # in PandasArray, since pandas' ExtensionArrays are 1-d.
        out = kwargs.get("out", ())

        result = ops.maybe_dispatch_ufunc_to_dunder_op(
            self, ufunc, method, *inputs, **kwargs
        )
        if result is not NotImplemented:
            return result

        if "out" in kwargs:
            # e.g. test_ufunc_unary
            return arraylike.dispatch_ufunc_with_out(
                self, ufunc, method, *inputs, **kwargs
            )

        if method == "reduce":
            result = arraylike.dispatch_reduction_ufunc(
                self, ufunc, method, *inputs, **kwargs
            )
            if result is not NotImplemented:
                # e.g. tests.series.test_ufunc.TestNumpyReductions
                return result

        # Defer to the implementation of the ufunc on unwrapped values.
        inputs = tuple(x._ndarray if isinstance(x, PandasArray) else x for x in inputs)
        if out:
            kwargs["out"] = tuple(
                x._ndarray if isinstance(x, PandasArray) else x for x in out
            )
        result = getattr(ufunc, method)(*inputs, **kwargs)

        if ufunc.nout > 1:
            # multiple return values; re-box array-like results
            return tuple(type(self)(x) for x in result)
        elif method == "at":
            # no return value
            return None
        elif method == "reduce":
            if isinstance(result, np.ndarray):
                # e.g. test_np_reduce_2d
                return type(self)(result)

            # e.g. test_np_max_nested_tuples
            return result
        else:
            # one return value; re-box array-like results
            return type(self)(result)

    # ------------------------------------------------------------------------
    # Pandas ExtensionArray Interface

    def astype(self, dtype, copy: bool = True):
        dtype = pandas_dtype(dtype)

        if is_dtype_equal(dtype, self.dtype):
            if copy:
                return self.copy()
            return self

        result = astype_array(self._ndarray, dtype=dtype, copy=copy)
        return result

    def isna(self) -> np.ndarray:
        return isna(self._ndarray)

    def _validate_scalar(self, fill_value):
        if fill_value is None:
            # Primarily for subclasses
            fill_value = self.dtype.na_value
        return fill_value
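
    # factorize() uses the second return value as the NA sentinel; integer,
    # unsigned and boolean ndarrays cannot hold NaN, so those dtypes return
    # None to indicate that no sentinel is needed.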
    def _values_for_factorize(self) -> tuple[np.ndarray, float | None]:
        if self.dtype.kind in ["i", "u", "b"]:
            fv = None
        else:
            fv = np.nan
        return self._ndarray, fv

    # ------------------------------------------------------------------------
    # Reductions
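
    # Each reduction below follows the same pattern: nv.validate_* rejects
    # NumPy-compatibility keywords we accept in the signature but do not
    # support (out, keepdims, ...), the computation is delegated to
    # pandas.core.nanops, and _wrap_reduction_result returns a scalar for
    # axis=None and re-wraps array results otherwise.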
    def any(
        self,
        *,
        axis: AxisInt | None = None,
        out=None,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        nv.validate_any((), {"out": out, "keepdims": keepdims})
        result = nanops.nanany(self._ndarray, axis=axis, skipna=skipna)
        return self._wrap_reduction_result(axis, result)

    def all(
        self,
        *,
        axis: AxisInt | None = None,
        out=None,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        nv.validate_all((), {"out": out, "keepdims": keepdims})
        result = nanops.nanall(self._ndarray, axis=axis, skipna=skipna)
        return self._wrap_reduction_result(axis, result)

    def min(
        self, *, axis: AxisInt | None = None, skipna: bool = True, **kwargs
    ) -> Scalar:
        nv.validate_min((), kwargs)
        result = nanops.nanmin(
            values=self._ndarray, axis=axis, mask=self.isna(), skipna=skipna
        )
        return self._wrap_reduction_result(axis, result)

    def max(
        self, *, axis: AxisInt | None = None, skipna: bool = True, **kwargs
    ) -> Scalar:
        nv.validate_max((), kwargs)
        result = nanops.nanmax(
            values=self._ndarray, axis=axis, mask=self.isna(), skipna=skipna
        )
        return self._wrap_reduction_result(axis, result)

    def sum(
        self,
        *,
        axis: AxisInt | None = None,
        skipna: bool = True,
        min_count: int = 0,
        **kwargs,
    ) -> Scalar:
        nv.validate_sum((), kwargs)
        result = nanops.nansum(
            self._ndarray, axis=axis, skipna=skipna, min_count=min_count
        )
        return self._wrap_reduction_result(axis, result)

    def prod(
        self,
        *,
        axis: AxisInt | None = None,
        skipna: bool = True,
        min_count: int = 0,
        **kwargs,
    ) -> Scalar:
        nv.validate_prod((), kwargs)
        result = nanops.nanprod(
            self._ndarray, axis=axis, skipna=skipna, min_count=min_count
        )
        return self._wrap_reduction_result(axis, result)

    def mean(
        self,
        *,
        axis: AxisInt | None = None,
        dtype: NpDtype | None = None,
        out=None,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        nv.validate_mean((), {"dtype": dtype, "out": out, "keepdims": keepdims})
        result = nanops.nanmean(self._ndarray, axis=axis, skipna=skipna)
        return self._wrap_reduction_result(axis, result)

    def median(
        self,
        *,
        axis: AxisInt | None = None,
        out=None,
        overwrite_input: bool = False,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        nv.validate_median(
            (), {"out": out, "overwrite_input": overwrite_input, "keepdims": keepdims}
        )
        result = nanops.nanmedian(self._ndarray, axis=axis, skipna=skipna)
        return self._wrap_reduction_result(axis, result)

    def std(
        self,
        *,
        axis: AxisInt | None = None,
        dtype: NpDtype | None = None,
        out=None,
        ddof: int = 1,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        nv.validate_stat_ddof_func(
            (), {"dtype": dtype, "out": out, "keepdims": keepdims}, fname="std"
        )
        result = nanops.nanstd(self._ndarray, axis=axis, skipna=skipna, ddof=ddof)
        return self._wrap_reduction_result(axis, result)

    def var(
        self,
        *,
        axis: AxisInt | None = None,
        dtype: NpDtype | None = None,
        out=None,
        ddof: int = 1,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        nv.validate_stat_ddof_func(
            (), {"dtype": dtype, "out": out, "keepdims": keepdims}, fname="var"
        )
        result = nanops.nanvar(self._ndarray, axis=axis, skipna=skipna, ddof=ddof)
        return self._wrap_reduction_result(axis, result)

    def sem(
        self,
        *,
        axis: AxisInt | None = None,
        dtype: NpDtype | None = None,
        out=None,
        ddof: int = 1,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        nv.validate_stat_ddof_func(
            (), {"dtype": dtype, "out": out, "keepdims": keepdims}, fname="sem"
        )
        result = nanops.nansem(self._ndarray, axis=axis, skipna=skipna, ddof=ddof)
        return self._wrap_reduction_result(axis, result)

    def kurt(
        self,
        *,
        axis: AxisInt | None = None,
        dtype: NpDtype | None = None,
        out=None,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        nv.validate_stat_ddof_func(
            (), {"dtype": dtype, "out": out, "keepdims": keepdims}, fname="kurt"
        )
        result = nanops.nankurt(self._ndarray, axis=axis, skipna=skipna)
        return self._wrap_reduction_result(axis, result)

    def skew(
        self,
        *,
        axis: AxisInt | None = None,
        dtype: NpDtype | None = None,
        out=None,
        keepdims: bool = False,
        skipna: bool = True,
    ):
        nv.validate_stat_ddof_func(
            (), {"dtype": dtype, "out": out, "keepdims": keepdims}, fname="skew"
        )
        result = nanops.nanskew(self._ndarray, axis=axis, skipna=skipna)
        return self._wrap_reduction_result(axis, result)

    # ------------------------------------------------------------------------
    # Additional Methods
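
    # to_numpy converts to a plain ndarray, optionally substituting ``na_value``
    # at missing positions first; the copy/na_value handling below avoids
    # copying the backing array more than once.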
    def to_numpy(
        self,
        dtype: npt.DTypeLike | None = None,
        copy: bool = False,
        na_value: object = lib.no_default,
    ) -> np.ndarray:
        mask = self.isna()
        if na_value is not lib.no_default and mask.any():
            result = self._ndarray.copy()
            result[mask] = na_value
        else:
            result = self._ndarray

        result = np.asarray(result, dtype=dtype)

        if copy and result is self._ndarray:
            result = result.copy()

        return result

    # ------------------------------------------------------------------------
    # Ops

    def __invert__(self) -> PandasArray:
        return type(self)(~self._ndarray)

    def __neg__(self) -> PandasArray:
        return type(self)(-self._ndarray)

    def __pos__(self) -> PandasArray:
        return type(self)(+self._ndarray)

    def __abs__(self) -> PandasArray:
        return type(self)(abs(self._ndarray))
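
    # Comparison and arithmetic ops share one implementation: the other operand
    # is unwrapped/prepared, the op is dispatched through ops.get_array_op, and
    # ndarray results are re-wrapped (see the _arith_method alias below).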
    def _cmp_method(self, other, op):
        if isinstance(other, PandasArray):
            other = other._ndarray

        other = ops.maybe_prepare_scalar_for_op(other, (len(self),))
        pd_op = ops.get_array_op(op)
        other = ensure_wrapped_if_datetimelike(other)
        with np.errstate(all="ignore"):
            result = pd_op(self._ndarray, other)

        if op is divmod or op is ops.rdivmod:
            a, b = result
            if isinstance(a, np.ndarray):
                # for e.g. op vs TimedeltaArray, we may already
                # have an ExtensionArray, in which case we do not wrap
                return self._wrap_ndarray_result(a), self._wrap_ndarray_result(b)
            return a, b

        if isinstance(result, np.ndarray):
            # for e.g. multiplication vs TimedeltaArray, we may already
            # have an ExtensionArray, in which case we do not wrap
            return self._wrap_ndarray_result(result)
        return result
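
    # Arithmetic ops reuse the comparison implementation above; both unwrap,
    # dispatch, and re-wrap results in exactly the same way.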
    _arith_method = _cmp_method

    def _wrap_ndarray_result(self, result: np.ndarray):
        # If we have timedelta64[ns] result, return a TimedeltaArray instead
        # of a PandasArray
        if result.dtype.kind == "m" and is_supported_unit(
            get_unit_from_dtype(result.dtype)
        ):
            from pandas.core.arrays import TimedeltaArray

            return TimedeltaArray._simple_new(result, dtype=result.dtype)
        return type(self)(result)

    # ------------------------------------------------------------------------
    # String methods interface
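    # Sentinel used by ObjectStringArrayMixin when string methods hit missing
    # values in an object-dtype array.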
    _str_na_value = np.nan