"""
|
Numba 1D min/max kernels that can be shared by
|
* Dataframe / Series
|
* groupby
|
* rolling / expanding
|
|
Mirrors pandas/_libs/window/aggregation.pyx
|
"""
|
from __future__ import annotations
|
|
import numba
|
import numpy as np
|
|
|
@numba.jit(nopython=True, nogil=True, parallel=False)
|
def sliding_min_max(
|
values: np.ndarray,
|
start: np.ndarray,
|
end: np.ndarray,
|
min_periods: int,
|
is_max: bool,
|
) -> np.ndarray:
|
N = len(start)
|
nobs = 0
|
output = np.empty(N, dtype=np.float64)
|
# Use deque once numba supports it
|
# https://github.com/numba/numba/issues/7417
|
Q: list = []
|
W: list = []
|
for i in range(N):
|
curr_win_size = end[i] - start[i]
|
if i == 0:
|
st = start[i]
|
else:
|
st = end[i - 1]
|
|
for k in range(st, end[i]):
|
ai = values[k]
|
if not np.isnan(ai):
|
nobs += 1
|
elif is_max:
|
ai = -np.inf
|
else:
|
ai = np.inf
|
# Discard previous entries if we find new min or max
|
if is_max:
|
while Q and ((ai >= values[Q[-1]]) or values[Q[-1]] != values[Q[-1]]):
|
Q.pop()
|
else:
|
while Q and ((ai <= values[Q[-1]]) or values[Q[-1]] != values[Q[-1]]):
|
Q.pop()
|
Q.append(k)
|
W.append(k)
|
|
# Discard entries outside and left of current window
|
while Q and Q[0] <= start[i] - 1:
|
Q.pop(0)
|
while W and W[0] <= start[i] - 1:
|
if not np.isnan(values[W[0]]):
|
nobs -= 1
|
W.pop(0)
|
|
# Save output based on index in input value array
|
if Q and curr_win_size > 0 and nobs >= min_periods:
|
output[i] = values[Q[0]]
|
else:
|
output[i] = np.nan
|
|
return output
|