"""
|
Arithmetic operations for PandasObjects
|
|
This is not a public API.
|
"""
|
from __future__ import annotations
|
|
import operator
|
from typing import (
|
TYPE_CHECKING,
|
cast,
|
)
|
|
import numpy as np
|
|
from pandas._libs.ops_dispatch import maybe_dispatch_ufunc_to_dunder_op
|
from pandas._typing import (
|
Axis,
|
AxisInt,
|
Level,
|
)
|
from pandas.util._decorators import Appender
|
|
from pandas.core.dtypes.common import (
|
is_array_like,
|
is_list_like,
|
)
|
from pandas.core.dtypes.generic import (
|
ABCDataFrame,
|
ABCSeries,
|
)
|
from pandas.core.dtypes.missing import isna
|
|
from pandas.core import (
|
algorithms,
|
roperator,
|
)
|
from pandas.core.ops.array_ops import (
|
arithmetic_op,
|
comp_method_OBJECT_ARRAY,
|
comparison_op,
|
get_array_op,
|
logical_op,
|
maybe_prepare_scalar_for_op,
|
)
|
from pandas.core.ops.common import (
|
get_op_result_name,
|
unpack_zerodim_and_defer,
|
)
|
from pandas.core.ops.docstrings import (
|
_flex_comp_doc_FRAME,
|
_op_descriptions,
|
make_flex_doc,
|
)
|
from pandas.core.ops.invalid import invalid_comparison
|
from pandas.core.ops.mask_ops import (
|
kleene_and,
|
kleene_or,
|
kleene_xor,
|
)
|
from pandas.core.ops.methods import add_flex_arithmetic_methods
|
from pandas.core.roperator import (
|
radd,
|
rand_,
|
rdiv,
|
rdivmod,
|
rfloordiv,
|
rmod,
|
rmul,
|
ror_,
|
rpow,
|
rsub,
|
rtruediv,
|
rxor,
|
)
|
|
if TYPE_CHECKING:
|
from pandas import (
|
DataFrame,
|
Series,
|
)
|
|
# -----------------------------------------------------------------------------
|
# constants
|
ARITHMETIC_BINOPS: set[str] = {
|
"add",
|
"sub",
|
"mul",
|
"pow",
|
"mod",
|
"floordiv",
|
"truediv",
|
"divmod",
|
"radd",
|
"rsub",
|
"rmul",
|
"rpow",
|
"rmod",
|
"rfloordiv",
|
"rtruediv",
|
"rdivmod",
|
}
|
|
|
COMPARISON_BINOPS: set[str] = {"eq", "ne", "lt", "gt", "le", "ge"}
|
|
|
# -----------------------------------------------------------------------------
|
# Masking NA values and fallbacks for operations numpy does not support
|
|
|
def fill_binop(left, right, fill_value):
|
"""
|
If a non-None fill_value is given, replace null entries in left and right
|
with this value, but only in positions where _one_ of left/right is null,
|
not both.
|
|
Parameters
|
----------
|
left : array-like
|
right : array-like
|
fill_value : object
|
|
Returns
|
-------
|
left : array-like
|
right : array-like
|
|
Notes
|
-----
|
Makes copies if fill_value is not None and NAs are present.
|
"""
|
if fill_value is not None:
|
left_mask = isna(left)
|
right_mask = isna(right)
|
|
# one but not both
|
mask = left_mask ^ right_mask
|
|
if left_mask.any():
|
# Avoid making a copy if we can
|
left = left.copy()
|
left[left_mask & mask] = fill_value
|
|
if right_mask.any():
|
# Avoid making a copy if we can
|
right = right.copy()
|
right[right_mask & mask] = fill_value
|
|
return left, right
|
|
|
# -----------------------------------------------------------------------------
|
# Series
|
|
|
def align_method_SERIES(left: Series, right, align_asobject: bool = False):
|
"""align lhs and rhs Series"""
|
# ToDo: Different from align_method_FRAME, list, tuple and ndarray
|
# are not coerced here
|
# because Series has inconsistencies described in #13637
|
|
if isinstance(right, ABCSeries):
|
# avoid repeated alignment
|
if not left.index.equals(right.index):
|
if align_asobject:
|
# to keep original value's dtype for bool ops
|
left = left.astype(object)
|
right = right.astype(object)
|
|
left, right = left.align(right, copy=False)
|
|
return left, right
|
|
|
def flex_method_SERIES(op):
|
name = op.__name__.strip("_")
|
doc = make_flex_doc(name, "series")
|
|
@Appender(doc)
|
def flex_wrapper(self, other, level=None, fill_value=None, axis: Axis = 0):
|
# validate axis
|
if axis is not None:
|
self._get_axis_number(axis)
|
|
res_name = get_op_result_name(self, other)
|
|
if isinstance(other, ABCSeries):
|
return self._binop(other, op, level=level, fill_value=fill_value)
|
elif isinstance(other, (np.ndarray, list, tuple)):
|
if len(other) != len(self):
|
raise ValueError("Lengths must be equal")
|
other = self._constructor(other, self.index)
|
result = self._binop(other, op, level=level, fill_value=fill_value)
|
result.name = res_name
|
return result
|
else:
|
if fill_value is not None:
|
self = self.fillna(fill_value)
|
|
return op(self, other)
|
|
flex_wrapper.__name__ = name
|
return flex_wrapper
|
|
|
# -----------------------------------------------------------------------------
|
# DataFrame
|
|
|
def align_method_FRAME(
|
left, right, axis, flex: bool | None = False, level: Level = None
|
):
|
"""
|
Convert rhs to meet lhs dims if input is list, tuple or np.ndarray.
|
|
Parameters
|
----------
|
left : DataFrame
|
right : Any
|
axis : int, str, or None
|
flex : bool or None, default False
|
Whether this is a flex op, in which case we reindex.
|
None indicates not to check for alignment.
|
level : int or level name, default None
|
|
Returns
|
-------
|
left : DataFrame
|
right : Any
|
"""
|
|
def to_series(right):
|
msg = "Unable to coerce to Series, length must be {req_len}: given {given_len}"
|
|
# pass dtype to avoid doing inference, which would break consistency
|
# with Index/Series ops
|
dtype = None
|
if getattr(right, "dtype", None) == object:
|
# can't pass right.dtype unconditionally as that would break on e.g.
|
# datetime64[h] ndarray
|
dtype = object
|
|
if axis is not None and left._get_axis_name(axis) == "index":
|
if len(left.index) != len(right):
|
raise ValueError(
|
msg.format(req_len=len(left.index), given_len=len(right))
|
)
|
right = left._constructor_sliced(right, index=left.index, dtype=dtype)
|
else:
|
if len(left.columns) != len(right):
|
raise ValueError(
|
msg.format(req_len=len(left.columns), given_len=len(right))
|
)
|
right = left._constructor_sliced(right, index=left.columns, dtype=dtype)
|
return right
|
|
if isinstance(right, np.ndarray):
|
if right.ndim == 1:
|
right = to_series(right)
|
|
elif right.ndim == 2:
|
# We need to pass dtype=right.dtype to retain object dtype
|
# otherwise we lose consistency with Index and array ops
|
dtype = None
|
if getattr(right, "dtype", None) == object:
|
# can't pass right.dtype unconditionally as that would break on e.g.
|
# datetime64[h] ndarray
|
dtype = object
|
|
if right.shape == left.shape:
|
right = left._constructor(
|
right, index=left.index, columns=left.columns, dtype=dtype
|
)
|
|
elif right.shape[0] == left.shape[0] and right.shape[1] == 1:
|
# Broadcast across columns
|
right = np.broadcast_to(right, left.shape)
|
right = left._constructor(
|
right, index=left.index, columns=left.columns, dtype=dtype
|
)
|
|
elif right.shape[1] == left.shape[1] and right.shape[0] == 1:
|
# Broadcast along rows
|
right = to_series(right[0, :])
|
|
else:
|
raise ValueError(
|
"Unable to coerce to DataFrame, shape "
|
f"must be {left.shape}: given {right.shape}"
|
)
|
|
elif right.ndim > 2:
|
raise ValueError(
|
"Unable to coerce to Series/DataFrame, "
|
f"dimension must be <= 2: {right.shape}"
|
)
|
|
elif is_list_like(right) and not isinstance(right, (ABCSeries, ABCDataFrame)):
|
# GH 36702. Raise when attempting arithmetic with list of array-like.
|
if any(is_array_like(el) for el in right):
|
raise ValueError(
|
f"Unable to coerce list of {type(right[0])} to Series/DataFrame"
|
)
|
# GH17901
|
right = to_series(right)
|
|
if flex is not None and isinstance(right, ABCDataFrame):
|
if not left._indexed_same(right):
|
if flex:
|
left, right = left.align(right, join="outer", level=level, copy=False)
|
else:
|
raise ValueError(
|
"Can only compare identically-labeled (both index and columns) "
|
"DataFrame objects"
|
)
|
elif isinstance(right, ABCSeries):
|
# axis=1 is default for DataFrame-with-Series op
|
axis = left._get_axis_number(axis) if axis is not None else 1
|
|
if not flex:
|
if not left.axes[axis].equals(right.index):
|
raise ValueError(
|
"Operands are not aligned. Do "
|
"`left, right = left.align(right, axis=1, copy=False)` "
|
"before operating."
|
)
|
|
left, right = left.align(
|
right, join="outer", axis=axis, level=level, copy=False
|
)
|
right = _maybe_align_series_as_frame(left, right, axis)
|
|
return left, right
|
|
|
def should_reindex_frame_op(
|
left: DataFrame, right, op, axis: int, fill_value, level
|
) -> bool:
|
"""
|
Check if this is an operation between DataFrames that will need to reindex.
|
"""
|
assert isinstance(left, ABCDataFrame)
|
|
if op is operator.pow or op is roperator.rpow:
|
# GH#32685 pow has special semantics for operating with null values
|
return False
|
|
if not isinstance(right, ABCDataFrame):
|
return False
|
|
if fill_value is None and level is None and axis == 1:
|
# TODO: any other cases we should handle here?
|
|
# Intersection is always unique so we have to check the unique columns
|
left_uniques = left.columns.unique()
|
right_uniques = right.columns.unique()
|
cols = left_uniques.intersection(right_uniques)
|
if len(cols) and not (
|
len(cols) == len(left_uniques) and len(cols) == len(right_uniques)
|
):
|
# TODO: is there a shortcut available when len(cols) == 0?
|
return True
|
|
return False
|
|
|
def frame_arith_method_with_reindex(left: DataFrame, right: DataFrame, op) -> DataFrame:
|
"""
|
For DataFrame-with-DataFrame operations that require reindexing,
|
operate only on shared columns, then reindex.
|
|
Parameters
|
----------
|
left : DataFrame
|
right : DataFrame
|
op : binary operator
|
|
Returns
|
-------
|
DataFrame
|
"""
|
# GH#31623, only operate on shared columns
|
cols, lcols, rcols = left.columns.join(
|
right.columns, how="inner", level=None, return_indexers=True
|
)
|
|
new_left = left.iloc[:, lcols]
|
new_right = right.iloc[:, rcols]
|
result = op(new_left, new_right)
|
|
# Do the join on the columns instead of using align_method_FRAME
|
# to avoid constructing two potentially large/sparse DataFrames
|
join_columns, _, _ = left.columns.join(
|
right.columns, how="outer", level=None, return_indexers=True
|
)
|
|
if result.columns.has_duplicates:
|
# Avoid reindexing with a duplicate axis.
|
# https://github.com/pandas-dev/pandas/issues/35194
|
indexer, _ = result.columns.get_indexer_non_unique(join_columns)
|
indexer = algorithms.unique1d(indexer)
|
result = result._reindex_with_indexers(
|
{1: [join_columns, indexer]}, allow_dups=True
|
)
|
else:
|
result = result.reindex(join_columns, axis=1)
|
|
return result
|
|
|
def _maybe_align_series_as_frame(frame: DataFrame, series: Series, axis: AxisInt):
|
"""
|
If the Series operand is not EA-dtype, we can broadcast to 2D and operate
|
blockwise.
|
"""
|
rvalues = series._values
|
if not isinstance(rvalues, np.ndarray):
|
# TODO(EA2D): no need to special-case with 2D EAs
|
if rvalues.dtype in ("datetime64[ns]", "timedelta64[ns]"):
|
# We can losslessly+cheaply cast to ndarray
|
rvalues = np.asarray(rvalues)
|
else:
|
return series
|
|
if axis == 0:
|
rvalues = rvalues.reshape(-1, 1)
|
else:
|
rvalues = rvalues.reshape(1, -1)
|
|
rvalues = np.broadcast_to(rvalues, frame.shape)
|
# pass dtype to avoid doing inference
|
return type(frame)(
|
rvalues, index=frame.index, columns=frame.columns, dtype=rvalues.dtype
|
)
|
|
|
def flex_arith_method_FRAME(op):
|
op_name = op.__name__.strip("_")
|
|
na_op = get_array_op(op)
|
doc = make_flex_doc(op_name, "dataframe")
|
|
@Appender(doc)
|
def f(self, other, axis: Axis = "columns", level=None, fill_value=None):
|
axis = self._get_axis_number(axis) if axis is not None else 1
|
axis = cast(int, axis)
|
|
if should_reindex_frame_op(self, other, op, axis, fill_value, level):
|
return frame_arith_method_with_reindex(self, other, op)
|
|
if isinstance(other, ABCSeries) and fill_value is not None:
|
# TODO: We could allow this in cases where we end up going
|
# through the DataFrame path
|
raise NotImplementedError(f"fill_value {fill_value} not supported.")
|
|
other = maybe_prepare_scalar_for_op(other, self.shape)
|
self, other = align_method_FRAME(self, other, axis, flex=True, level=level)
|
|
if isinstance(other, ABCDataFrame):
|
# Another DataFrame
|
new_data = self._combine_frame(other, na_op, fill_value)
|
|
elif isinstance(other, ABCSeries):
|
new_data = self._dispatch_frame_op(other, op, axis=axis)
|
else:
|
# in this case we always have `np.ndim(other) == 0`
|
if fill_value is not None:
|
self = self.fillna(fill_value)
|
|
new_data = self._dispatch_frame_op(other, op)
|
|
return self._construct_result(new_data)
|
|
f.__name__ = op_name
|
|
return f
|
|
|
def flex_comp_method_FRAME(op):
|
op_name = op.__name__.strip("_")
|
|
doc = _flex_comp_doc_FRAME.format(
|
op_name=op_name, desc=_op_descriptions[op_name]["desc"]
|
)
|
|
@Appender(doc)
|
def f(self, other, axis: Axis = "columns", level=None):
|
axis = self._get_axis_number(axis) if axis is not None else 1
|
|
self, other = align_method_FRAME(self, other, axis, flex=True, level=level)
|
|
new_data = self._dispatch_frame_op(other, op, axis=axis)
|
return self._construct_result(new_data)
|
|
f.__name__ = op_name
|
|
return f
|
|
|
__all__ = [
|
"add_flex_arithmetic_methods",
|
"align_method_FRAME",
|
"align_method_SERIES",
|
"ARITHMETIC_BINOPS",
|
"arithmetic_op",
|
"COMPARISON_BINOPS",
|
"comparison_op",
|
"comp_method_OBJECT_ARRAY",
|
"fill_binop",
|
"flex_arith_method_FRAME",
|
"flex_comp_method_FRAME",
|
"flex_method_SERIES",
|
"frame_arith_method_with_reindex",
|
"invalid_comparison",
|
"kleene_and",
|
"kleene_or",
|
"kleene_xor",
|
"logical_op",
|
"maybe_dispatch_ufunc_to_dunder_op",
|
"radd",
|
"rand_",
|
"rdiv",
|
"rdivmod",
|
"rfloordiv",
|
"rmod",
|
"rmul",
|
"ror_",
|
"rpow",
|
"rsub",
|
"rtruediv",
|
"rxor",
|
"should_reindex_frame_op",
|
"unpack_zerodim_and_defer",
|
]
|