zmc
2023-10-12 ed135d79df12a2466b52dae1a82326941211dcc9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# cython: boundscheck=False, wraparound=False, cdivision=True
 
import numpy as np
 
from numpy cimport (
    int64_t,
    ndarray,
)
 
# Cython routines for window indexers
 
 
def calculate_variable_window_bounds(
    int64_t num_values,
    int64_t window_size,
    object min_periods,  # unused but here to match get_window_bounds signature
    bint center,
    str closed,
    const int64_t[:] index
):
    """
    Calculate window boundaries for rolling windows from a time offset.
 
    Parameters
    ----------
    num_values : int64
        total number of values
 
    window_size : int64
        window size calculated from the offset
 
    min_periods : object
        ignored, exists for compatibility
 
    center : bint
        center the rolling window on the current observation
 
    closed : str
        string of side of the window that should be closed
 
    index : ndarray[int64]
        time series index to roll over
 
    Returns
    -------
    (ndarray[int64], ndarray[int64])
    """
    cdef:
        bint left_closed = False
        bint right_closed = False
        ndarray[int64_t, ndim=1] start, end
        int64_t start_bound, end_bound, index_growth_sign = 1
        Py_ssize_t i, j
 
    if num_values <= 0:
        return np.empty(0, dtype="int64"), np.empty(0, dtype="int64")
 
    # default is 'right'
    if closed is None:
        closed = "right"
 
    if closed in ["right", "both"]:
        right_closed = True
 
    if closed in ["left", "both"]:
        left_closed = True
 
    # GH 43997:
    # If the forward and the backward facing windows
    # would result in a fraction of 1/2 a nanosecond
    # we need to make both interval ends inclusive.
    if center and window_size % 2 == 1:
        right_closed = True
        left_closed = True
 
    if index[num_values - 1] < index[0]:
        index_growth_sign = -1
 
    start = np.empty(num_values, dtype="int64")
    start.fill(-1)
    end = np.empty(num_values, dtype="int64")
    end.fill(-1)
 
    start[0] = 0
 
    # right endpoint is closed
    if right_closed:
        end[0] = 1
    # right endpoint is open
    else:
        end[0] = 0
    if center:
        end_bound = index[0] + index_growth_sign * window_size / 2
        for j in range(0, num_values):
            if (index[j] - end_bound) * index_growth_sign < 0:
                end[0] = j + 1
            elif (index[j] - end_bound) * index_growth_sign == 0 and right_closed:
                end[0] = j + 1
            elif (index[j] - end_bound) * index_growth_sign >= 0:
                end[0] = j
                break
 
    with nogil:
 
        # start is start of slice interval (including)
        # end is end of slice interval (not including)
        for i in range(1, num_values):
            if center:
                end_bound = index[i] + index_growth_sign * window_size / 2
                start_bound = index[i] - index_growth_sign * window_size / 2
            else:
                end_bound = index[i]
                start_bound = index[i] - index_growth_sign * window_size
 
            # left endpoint is closed
            if left_closed:
                start_bound -= 1 * index_growth_sign
 
            # advance the start bound until we are
            # within the constraint
            start[i] = i
            for j in range(start[i - 1], i):
                if (index[j] - start_bound) * index_growth_sign > 0:
                    start[i] = j
                    break
 
            # for centered window advance the end bound until we are
            # outside the constraint
            if center:
                for j in range(end[i - 1], num_values + 1):
                    if j == num_values:
                        end[i] = j
                    elif ((index[j] - end_bound) * index_growth_sign == 0 and
                          right_closed):
                        end[i] = j + 1
                    elif (index[j] - end_bound) * index_growth_sign >= 0:
                        end[i] = j
                        break
            # end bound is previous end
            # or current index
            elif (index[end[i - 1]] - end_bound) * index_growth_sign <= 0:
                end[i] = i + 1
            else:
                end[i] = end[i - 1]
 
            # right endpoint is open
            if not right_closed and not center:
                end[i] -= 1
    return start, end