""" Define extension dtypes. """ from __future__ import annotations import re from typing import ( TYPE_CHECKING, Any, MutableMapping, cast, ) import numpy as np import pytz from pandas._libs import missing as libmissing from pandas._libs.interval import Interval from pandas._libs.properties import cache_readonly from pandas._libs.tslibs import ( BaseOffset, NaT, NaTType, Period, Timestamp, timezones, to_offset, tz_compare, ) from pandas._libs.tslibs.dtypes import ( PeriodDtypeBase, abbrev_to_npy_unit, ) from pandas._typing import ( Dtype, DtypeObj, Ordered, npt, type_t, ) from pandas.core.dtypes.base import ( ExtensionDtype, register_extension_dtype, ) from pandas.core.dtypes.generic import ( ABCCategoricalIndex, ABCIndex, ) from pandas.core.dtypes.inference import ( is_bool, is_list_like, ) if TYPE_CHECKING: from datetime import tzinfo import pyarrow from pandas import ( Categorical, Index, ) from pandas.core.arrays import ( BaseMaskedArray, DatetimeArray, IntervalArray, PandasArray, PeriodArray, ) str_type = str class PandasExtensionDtype(ExtensionDtype): """ A np.dtype duck-typed class, suitable for holding a custom dtype. THIS IS NOT A REAL NUMPY DTYPE """ type: Any kind: Any # The Any type annotations above are here only because mypy seems to have a # problem dealing with multiple inheritance from PandasExtensionDtype # and ExtensionDtype's @properties in the subclasses below. The kind and # type variables in those subclasses are explicitly typed below. subdtype = None str: str_type num = 100 shape: tuple[int, ...] = () itemsize = 8 base: DtypeObj | None = None isbuiltin = 0 isnative = 0 _cache_dtypes: dict[str_type, PandasExtensionDtype] = {} def __repr__(self) -> str_type: """ Return a string representation for a particular object. """ return str(self) def __hash__(self) -> int: raise NotImplementedError("sub-classes should implement an __hash__ method") def __getstate__(self) -> dict[str_type, Any]: # pickle support; we don't want to pickle the cache return {k: getattr(self, k, None) for k in self._metadata} @classmethod def reset_cache(cls) -> None: """clear the cache""" cls._cache_dtypes = {} class CategoricalDtypeType(type): """ the type of CategoricalDtype, this metaclass determines subclass ability """ @register_extension_dtype class CategoricalDtype(PandasExtensionDtype, ExtensionDtype): """ Type for categorical data with the categories and orderedness. Parameters ---------- categories : sequence, optional Must be unique, and must not contain any nulls. The categories are stored in an Index, and if an index is provided the dtype of that index will be used. ordered : bool or None, default False Whether or not this categorical is treated as a ordered categorical. None can be used to maintain the ordered value of existing categoricals when used in operations that combine categoricals, e.g. astype, and will resolve to False if there is no existing ordered to maintain. Attributes ---------- categories ordered Methods ------- None See Also -------- Categorical : Represent a categorical variable in classic R / S-plus fashion. Notes ----- This class is useful for specifying the type of a ``Categorical`` independent of the values. See :ref:`categorical.categoricaldtype` for more. Examples -------- >>> t = pd.CategoricalDtype(categories=['b', 'a'], ordered=True) >>> pd.Series(['a', 'b', 'a', 'c'], dtype=t) 0 a 1 b 2 a 3 NaN dtype: category Categories (2, object): ['b' < 'a'] An empty CategoricalDtype with a specific dtype can be created by providing an empty index. As follows, >>> pd.CategoricalDtype(pd.DatetimeIndex([])).categories.dtype dtype(' None: self._finalize(categories, ordered, fastpath=False) @classmethod def _from_fastpath( cls, categories=None, ordered: bool | None = None ) -> CategoricalDtype: self = cls.__new__(cls) self._finalize(categories, ordered, fastpath=True) return self @classmethod def _from_categorical_dtype( cls, dtype: CategoricalDtype, categories=None, ordered: Ordered = None ) -> CategoricalDtype: if categories is ordered is None: return dtype if categories is None: categories = dtype.categories if ordered is None: ordered = dtype.ordered return cls(categories, ordered) @classmethod def _from_values_or_dtype( cls, values=None, categories=None, ordered: bool | None = None, dtype: Dtype | None = None, ) -> CategoricalDtype: """ Construct dtype from the input parameters used in :class:`Categorical`. This constructor method specifically does not do the factorization step, if that is needed to find the categories. This constructor may therefore return ``CategoricalDtype(categories=None, ordered=None)``, which may not be useful. Additional steps may therefore have to be taken to create the final dtype. The return dtype is specified from the inputs in this prioritized order: 1. if dtype is a CategoricalDtype, return dtype 2. if dtype is the string 'category', create a CategoricalDtype from the supplied categories and ordered parameters, and return that. 3. if values is a categorical, use value.dtype, but override it with categories and ordered if either/both of those are not None. 4. if dtype is None and values is not a categorical, construct the dtype from categories and ordered, even if either of those is None. Parameters ---------- values : list-like, optional The list-like must be 1-dimensional. categories : list-like, optional Categories for the CategoricalDtype. ordered : bool, optional Designating if the categories are ordered. dtype : CategoricalDtype or the string "category", optional If ``CategoricalDtype``, cannot be used together with `categories` or `ordered`. Returns ------- CategoricalDtype Examples -------- >>> pd.CategoricalDtype._from_values_or_dtype() CategoricalDtype(categories=None, ordered=None) >>> pd.CategoricalDtype._from_values_or_dtype( ... categories=['a', 'b'], ordered=True ... ) CategoricalDtype(categories=['a', 'b'], ordered=True) >>> dtype1 = pd.CategoricalDtype(['a', 'b'], ordered=True) >>> dtype2 = pd.CategoricalDtype(['x', 'y'], ordered=False) >>> c = pd.Categorical([0, 1], dtype=dtype1, fastpath=True) >>> pd.CategoricalDtype._from_values_or_dtype( ... c, ['x', 'y'], ordered=True, dtype=dtype2 ... ) Traceback (most recent call last): ... ValueError: Cannot specify `categories` or `ordered` together with `dtype`. The supplied dtype takes precedence over values' dtype: >>> pd.CategoricalDtype._from_values_or_dtype(c, dtype=dtype2) CategoricalDtype(categories=['x', 'y'], ordered=False) """ if dtype is not None: # The dtype argument takes precedence over values.dtype (if any) if isinstance(dtype, str): if dtype == "category": if ordered is None and cls.is_dtype(values): # GH#49309 preserve orderedness ordered = values.dtype.ordered dtype = CategoricalDtype(categories, ordered) else: raise ValueError(f"Unknown dtype {repr(dtype)}") elif categories is not None or ordered is not None: raise ValueError( "Cannot specify `categories` or `ordered` together with `dtype`." ) elif not isinstance(dtype, CategoricalDtype): raise ValueError(f"Cannot not construct CategoricalDtype from {dtype}") elif cls.is_dtype(values): # If no "dtype" was passed, use the one from "values", but honor # the "ordered" and "categories" arguments dtype = values.dtype._from_categorical_dtype( values.dtype, categories, ordered ) else: # If dtype=None and values is not categorical, create a new dtype. # Note: This could potentially have categories=None and # ordered=None. dtype = CategoricalDtype(categories, ordered) return cast(CategoricalDtype, dtype) @classmethod def construct_from_string(cls, string: str_type) -> CategoricalDtype: """ Construct a CategoricalDtype from a string. Parameters ---------- string : str Must be the string "category" in order to be successfully constructed. Returns ------- CategoricalDtype Instance of the dtype. Raises ------ TypeError If a CategoricalDtype cannot be constructed from the input. """ if not isinstance(string, str): raise TypeError( f"'construct_from_string' expects a string, got {type(string)}" ) if string != cls.name: raise TypeError(f"Cannot construct a 'CategoricalDtype' from '{string}'") # need ordered=None to ensure that operations specifying dtype="category" don't # override the ordered value for existing categoricals return cls(ordered=None) def _finalize(self, categories, ordered: Ordered, fastpath: bool = False) -> None: if ordered is not None: self.validate_ordered(ordered) if categories is not None: categories = self.validate_categories(categories, fastpath=fastpath) self._categories = categories self._ordered = ordered def __setstate__(self, state: MutableMapping[str_type, Any]) -> None: # for pickle compat. __get_state__ is defined in the # PandasExtensionDtype superclass and uses the public properties to # pickle -> need to set the settable private ones here (see GH26067) self._categories = state.pop("categories", None) self._ordered = state.pop("ordered", False) def __hash__(self) -> int: # _hash_categories returns a uint64, so use the negative # space for when we have unknown categories to avoid a conflict if self.categories is None: if self.ordered: return -1 else: return -2 # We *do* want to include the real self.ordered here return int(self._hash_categories) def __eq__(self, other: Any) -> bool: """ Rules for CDT equality: 1) Any CDT is equal to the string 'category' 2) Any CDT is equal to itself 3) Any CDT is equal to a CDT with categories=None regardless of ordered 4) A CDT with ordered=True is only equal to another CDT with ordered=True and identical categories in the same order 5) A CDT with ordered={False, None} is only equal to another CDT with ordered={False, None} and identical categories, but same order is not required. There is no distinction between False/None. 6) Any other comparison returns False """ if isinstance(other, str): return other == self.name elif other is self: return True elif not (hasattr(other, "ordered") and hasattr(other, "categories")): return False elif self.categories is None or other.categories is None: # For non-fully-initialized dtypes, these are only equal to # - the string "category" (handled above) # - other CategoricalDtype with categories=None return self.categories is other.categories elif self.ordered or other.ordered: # At least one has ordered=True; equal if both have ordered=True # and the same values for categories in the same order. return (self.ordered == other.ordered) and self.categories.equals( other.categories ) else: # Neither has ordered=True; equal if both have the same categories, # but same order is not necessary. There is no distinction between # ordered=False and ordered=None: CDT(., False) and CDT(., None) # will be equal if they have the same categories. left = self.categories right = other.categories # GH#36280 the ordering of checks here is for performance if not left.dtype == right.dtype: return False if len(left) != len(right): return False if self.categories.equals(other.categories): # Check and see if they happen to be identical categories return True if left.dtype != object: # Faster than calculating hash indexer = left.get_indexer(right) # Because left and right have the same length and are unique, # `indexer` not having any -1s implies that there is a # bijection between `left` and `right`. return (indexer != -1).all() # With object-dtype we need a comparison that identifies # e.g. int(2) as distinct from float(2) return hash(self) == hash(other) def __repr__(self) -> str_type: if self.categories is None: data = "None" else: data = self.categories._format_data(name=type(self).__name__) if data is None: # self.categories is RangeIndex data = str(self.categories._range) data = data.rstrip(", ") return f"CategoricalDtype(categories={data}, ordered={self.ordered})" @cache_readonly def _hash_categories(self) -> int: from pandas.core.util.hashing import ( combine_hash_arrays, hash_array, hash_tuples, ) categories = self.categories ordered = self.ordered if len(categories) and isinstance(categories[0], tuple): # assumes if any individual category is a tuple, then all our. ATM # I don't really want to support just some of the categories being # tuples. cat_list = list(categories) # breaks if a np.array of categories cat_array = hash_tuples(cat_list) else: if categories.dtype == "O" and len({type(x) for x in categories}) != 1: # TODO: hash_array doesn't handle mixed types. It casts # everything to a str first, which means we treat # {'1', '2'} the same as {'1', 2} # find a better solution hashed = hash((tuple(categories), ordered)) return hashed if DatetimeTZDtype.is_dtype(categories.dtype): # Avoid future warning. categories = categories.view("datetime64[ns]") cat_array = hash_array(np.asarray(categories), categorize=False) if ordered: cat_array = np.vstack( [cat_array, np.arange(len(cat_array), dtype=cat_array.dtype)] ) else: cat_array = np.array([cat_array]) combined_hashed = combine_hash_arrays(iter(cat_array), num_items=len(cat_array)) return np.bitwise_xor.reduce(combined_hashed) @classmethod def construct_array_type(cls) -> type_t[Categorical]: """ Return the array type associated with this dtype. Returns ------- type """ from pandas import Categorical return Categorical @staticmethod def validate_ordered(ordered: Ordered) -> None: """ Validates that we have a valid ordered parameter. If it is not a boolean, a TypeError will be raised. Parameters ---------- ordered : object The parameter to be verified. Raises ------ TypeError If 'ordered' is not a boolean. """ if not is_bool(ordered): raise TypeError("'ordered' must either be 'True' or 'False'") @staticmethod def validate_categories(categories, fastpath: bool = False) -> Index: """ Validates that we have good categories Parameters ---------- categories : array-like fastpath : bool Whether to skip nan and uniqueness checks Returns ------- categories : Index """ from pandas.core.indexes.base import Index if not fastpath and not is_list_like(categories): raise TypeError( f"Parameter 'categories' must be list-like, was {repr(categories)}" ) if not isinstance(categories, ABCIndex): categories = Index._with_infer(categories, tupleize_cols=False) if not fastpath: if categories.hasnans: raise ValueError("Categorical categories cannot be null") if not categories.is_unique: raise ValueError("Categorical categories must be unique") if isinstance(categories, ABCCategoricalIndex): categories = categories.categories return categories def update_dtype(self, dtype: str_type | CategoricalDtype) -> CategoricalDtype: """ Returns a CategoricalDtype with categories and ordered taken from dtype if specified, otherwise falling back to self if unspecified Parameters ---------- dtype : CategoricalDtype Returns ------- new_dtype : CategoricalDtype """ if isinstance(dtype, str) and dtype == "category": # dtype='category' should not change anything return self elif not self.is_dtype(dtype): raise ValueError( f"a CategoricalDtype must be passed to perform an update, " f"got {repr(dtype)}" ) else: # from here on, dtype is a CategoricalDtype dtype = cast(CategoricalDtype, dtype) # update categories/ordered unless they've been explicitly passed as None new_categories = ( dtype.categories if dtype.categories is not None else self.categories ) new_ordered = dtype.ordered if dtype.ordered is not None else self.ordered return CategoricalDtype(new_categories, new_ordered) @property def categories(self) -> Index: """ An ``Index`` containing the unique categories allowed. """ return self._categories @property def ordered(self) -> Ordered: """ Whether the categories have an ordered relationship. """ return self._ordered @property def _is_boolean(self) -> bool: from pandas.core.dtypes.common import is_bool_dtype return is_bool_dtype(self.categories) def _get_common_dtype(self, dtypes: list[DtypeObj]) -> DtypeObj | None: from pandas.core.arrays.sparse import SparseDtype # check if we have all categorical dtype with identical categories if all(isinstance(x, CategoricalDtype) for x in dtypes): first = dtypes[0] if all(first == other for other in dtypes[1:]): return first # special case non-initialized categorical # TODO we should figure out the expected return value in general non_init_cats = [ isinstance(x, CategoricalDtype) and x.categories is None for x in dtypes ] if all(non_init_cats): return self elif any(non_init_cats): return None # categorical is aware of Sparse -> extract sparse subdtypes dtypes = [x.subtype if isinstance(x, SparseDtype) else x for x in dtypes] # extract the categories' dtype non_cat_dtypes = [ x.categories.dtype if isinstance(x, CategoricalDtype) else x for x in dtypes ] # TODO should categorical always give an answer? from pandas.core.dtypes.cast import find_common_type return find_common_type(non_cat_dtypes) @register_extension_dtype class DatetimeTZDtype(PandasExtensionDtype): """ An ExtensionDtype for timezone-aware datetime data. **This is not an actual numpy dtype**, but a duck type. Parameters ---------- unit : str, default "ns" The precision of the datetime data. Currently limited to ``"ns"``. tz : str, int, or datetime.tzinfo The timezone. Attributes ---------- unit tz Methods ------- None Raises ------ pytz.UnknownTimeZoneError When the requested timezone cannot be found. Examples -------- >>> pd.DatetimeTZDtype(tz='UTC') datetime64[ns, UTC] >>> pd.DatetimeTZDtype(tz='dateutil/US/Central') datetime64[ns, tzfile('/usr/share/zoneinfo/US/Central')] """ type: type[Timestamp] = Timestamp kind: str_type = "M" num = 101 base = np.dtype("M8[ns]") # TODO: depend on reso? _metadata = ("unit", "tz") _match = re.compile(r"(datetime64|M8)\[(?P.+), (?P.+)\]") _cache_dtypes: dict[str_type, PandasExtensionDtype] = {} @property def na_value(self) -> NaTType: return NaT # error: Signature of "str" incompatible with supertype "PandasExtensionDtype" @cache_readonly def str(self) -> str: # type: ignore[override] return f"|M8[{self.unit}]" def __init__(self, unit: str_type | DatetimeTZDtype = "ns", tz=None) -> None: if isinstance(unit, DatetimeTZDtype): # error: "str" has no attribute "tz" unit, tz = unit.unit, unit.tz # type: ignore[attr-defined] if unit != "ns": if isinstance(unit, str) and tz is None: # maybe a string like datetime64[ns, tz], which we support for # now. result = type(self).construct_from_string(unit) unit = result.unit tz = result.tz msg = ( f"Passing a dtype alias like 'datetime64[ns, {tz}]' " "to DatetimeTZDtype is no longer supported. Use " "'DatetimeTZDtype.construct_from_string()' instead." ) raise ValueError(msg) if unit not in ["s", "ms", "us", "ns"]: raise ValueError("DatetimeTZDtype only supports s, ms, us, ns units") if tz: tz = timezones.maybe_get_tz(tz) tz = timezones.tz_standardize(tz) elif tz is not None: raise pytz.UnknownTimeZoneError(tz) if tz is None: raise TypeError("A 'tz' is required.") self._unit = unit self._tz = tz @cache_readonly def _creso(self) -> int: """ The NPY_DATETIMEUNIT corresponding to this dtype's resolution. """ return abbrev_to_npy_unit(self.unit) @property def unit(self) -> str_type: """ The precision of the datetime data. """ return self._unit @property def tz(self) -> tzinfo: """ The timezone. """ return self._tz @classmethod def construct_array_type(cls) -> type_t[DatetimeArray]: """ Return the array type associated with this dtype. Returns ------- type """ from pandas.core.arrays import DatetimeArray return DatetimeArray @classmethod def construct_from_string(cls, string: str_type) -> DatetimeTZDtype: """ Construct a DatetimeTZDtype from a string. Parameters ---------- string : str The string alias for this DatetimeTZDtype. Should be formatted like ``datetime64[ns, ]``, where ```` is the timezone name. Examples -------- >>> DatetimeTZDtype.construct_from_string('datetime64[ns, UTC]') datetime64[ns, UTC] """ if not isinstance(string, str): raise TypeError( f"'construct_from_string' expects a string, got {type(string)}" ) msg = f"Cannot construct a 'DatetimeTZDtype' from '{string}'" match = cls._match.match(string) if match: d = match.groupdict() try: return cls(unit=d["unit"], tz=d["tz"]) except (KeyError, TypeError, ValueError) as err: # KeyError if maybe_get_tz tries and fails to get a # pytz timezone (actually pytz.UnknownTimeZoneError). # TypeError if we pass a nonsense tz; # ValueError if we pass a unit other than "ns" raise TypeError(msg) from err raise TypeError(msg) def __str__(self) -> str_type: return f"datetime64[{self.unit}, {self.tz}]" @property def name(self) -> str_type: """A string representation of the dtype.""" return str(self) def __hash__(self) -> int: # make myself hashable # TODO: update this. return hash(str(self)) def __eq__(self, other: Any) -> bool: if isinstance(other, str): if other.startswith("M8["): other = f"datetime64[{other[3:]}" return other == self.name return ( isinstance(other, DatetimeTZDtype) and self.unit == other.unit and tz_compare(self.tz, other.tz) ) def __setstate__(self, state) -> None: # for pickle compat. __get_state__ is defined in the # PandasExtensionDtype superclass and uses the public properties to # pickle -> need to set the settable private ones here (see GH26067) self._tz = state["tz"] self._unit = state["unit"] @register_extension_dtype class PeriodDtype(PeriodDtypeBase, PandasExtensionDtype): """ An ExtensionDtype for Period data. **This is not an actual numpy dtype**, but a duck type. Parameters ---------- freq : str or DateOffset The frequency of this PeriodDtype. Attributes ---------- freq Methods ------- None Examples -------- >>> pd.PeriodDtype(freq='D') period[D] >>> pd.PeriodDtype(freq=pd.offsets.MonthEnd()) period[M] """ type: type[Period] = Period kind: str_type = "O" str = "|O08" base = np.dtype("O") num = 102 _metadata = ("freq",) _match = re.compile(r"(P|p)eriod\[(?P.+)\]") _cache_dtypes: dict[str_type, PandasExtensionDtype] = {} def __new__(cls, freq=None): """ Parameters ---------- freq : frequency """ if isinstance(freq, PeriodDtype): return freq elif freq is None: # empty constructor for pickle compat # -10_000 corresponds to PeriodDtypeCode.UNDEFINED u = PeriodDtypeBase.__new__(cls, -10_000) u._freq = None return u if not isinstance(freq, BaseOffset): freq = cls._parse_dtype_strict(freq) try: return cls._cache_dtypes[freq.freqstr] except KeyError: dtype_code = freq._period_dtype_code u = PeriodDtypeBase.__new__(cls, dtype_code) u._freq = freq cls._cache_dtypes[freq.freqstr] = u return u def __reduce__(self): return type(self), (self.freq,) @property def freq(self): """ The frequency object of this PeriodDtype. """ return self._freq @classmethod def _parse_dtype_strict(cls, freq: str_type) -> BaseOffset: if isinstance(freq, str): # note: freq is already of type str! if freq.startswith("period[") or freq.startswith("Period["): m = cls._match.search(freq) if m is not None: freq = m.group("freq") freq_offset = to_offset(freq) if freq_offset is not None: return freq_offset raise ValueError("could not construct PeriodDtype") @classmethod def construct_from_string(cls, string: str_type) -> PeriodDtype: """ Strict construction from a string, raise a TypeError if not possible """ if ( isinstance(string, str) and (string.startswith("period[") or string.startswith("Period[")) or isinstance(string, BaseOffset) ): # do not parse string like U as period[U] # avoid tuple to be regarded as freq try: return cls(freq=string) except ValueError: pass if isinstance(string, str): msg = f"Cannot construct a 'PeriodDtype' from '{string}'" else: msg = f"'construct_from_string' expects a string, got {type(string)}" raise TypeError(msg) def __str__(self) -> str_type: return self.name @property def name(self) -> str_type: return f"period[{self.freq.freqstr}]" @property def na_value(self) -> NaTType: return NaT def __hash__(self) -> int: # make myself hashable return hash(str(self)) def __eq__(self, other: Any) -> bool: if isinstance(other, str): return other in [self.name, self.name.title()] elif isinstance(other, PeriodDtype): # For freqs that can be held by a PeriodDtype, this check is # equivalent to (and much faster than) self.freq == other.freq sfreq = self.freq ofreq = other.freq return ( sfreq.n == ofreq.n and sfreq._period_dtype_code == ofreq._period_dtype_code ) return False def __ne__(self, other: Any) -> bool: return not self.__eq__(other) def __setstate__(self, state) -> None: # for pickle compat. __getstate__ is defined in the # PandasExtensionDtype superclass and uses the public properties to # pickle -> need to set the settable private ones here (see GH26067) self._freq = state["freq"] @classmethod def is_dtype(cls, dtype: object) -> bool: """ Return a boolean if we if the passed type is an actual dtype that we can match (via string or type) """ if isinstance(dtype, str): # PeriodDtype can be instantiated from freq string like "U", # but doesn't regard freq str like "U" as dtype. if dtype.startswith("period[") or dtype.startswith("Period["): try: return cls._parse_dtype_strict(dtype) is not None except ValueError: return False else: return False return super().is_dtype(dtype) @classmethod def construct_array_type(cls) -> type_t[PeriodArray]: """ Return the array type associated with this dtype. Returns ------- type """ from pandas.core.arrays import PeriodArray return PeriodArray def __from_arrow__( self, array: pyarrow.Array | pyarrow.ChunkedArray ) -> PeriodArray: """ Construct PeriodArray from pyarrow Array/ChunkedArray. """ import pyarrow from pandas.core.arrays import PeriodArray from pandas.core.arrays.arrow._arrow_utils import ( pyarrow_array_to_numpy_and_mask, ) if isinstance(array, pyarrow.Array): chunks = [array] else: chunks = array.chunks results = [] for arr in chunks: data, mask = pyarrow_array_to_numpy_and_mask(arr, dtype=np.dtype(np.int64)) parr = PeriodArray(data.copy(), freq=self.freq, copy=False) # error: Invalid index type "ndarray[Any, dtype[bool_]]" for "PeriodArray"; # expected type "Union[int, Sequence[int], Sequence[bool], slice]" parr[~mask] = NaT # type: ignore[index] results.append(parr) if not results: return PeriodArray(np.array([], dtype="int64"), freq=self.freq, copy=False) return PeriodArray._concat_same_type(results) @register_extension_dtype class IntervalDtype(PandasExtensionDtype): """ An ExtensionDtype for Interval data. **This is not an actual numpy dtype**, but a duck type. Parameters ---------- subtype : str, np.dtype The dtype of the Interval bounds. Attributes ---------- subtype Methods ------- None Examples -------- >>> pd.IntervalDtype(subtype='int64', closed='both') interval[int64, both] """ name = "interval" kind: str_type = "O" str = "|O08" base = np.dtype("O") num = 103 _metadata = ( "subtype", "closed", ) _match = re.compile( r"(I|i)nterval\[(?P[^,]+(\[.+\])?)" r"(, (?P(right|left|both|neither)))?\]" ) _cache_dtypes: dict[str_type, PandasExtensionDtype] = {} def __new__(cls, subtype=None, closed: str_type | None = None): from pandas.core.dtypes.common import ( is_string_dtype, pandas_dtype, ) if closed is not None and closed not in {"right", "left", "both", "neither"}: raise ValueError("closed must be one of 'right', 'left', 'both', 'neither'") if isinstance(subtype, IntervalDtype): if closed is not None and closed != subtype.closed: raise ValueError( "dtype.closed and 'closed' do not match. " "Try IntervalDtype(dtype.subtype, closed) instead." ) return subtype elif subtype is None: # we are called as an empty constructor # generally for pickle compat u = object.__new__(cls) u._subtype = None u._closed = closed return u elif isinstance(subtype, str) and subtype.lower() == "interval": subtype = None else: if isinstance(subtype, str): m = cls._match.search(subtype) if m is not None: gd = m.groupdict() subtype = gd["subtype"] if gd.get("closed", None) is not None: if closed is not None: if closed != gd["closed"]: raise ValueError( "'closed' keyword does not match value " "specified in dtype string" ) closed = gd["closed"] try: subtype = pandas_dtype(subtype) except TypeError as err: raise TypeError("could not construct IntervalDtype") from err if CategoricalDtype.is_dtype(subtype) or is_string_dtype(subtype): # GH 19016 msg = ( "category, object, and string subtypes are not supported " "for IntervalDtype" ) raise TypeError(msg) key = f"{subtype}{closed}" try: return cls._cache_dtypes[key] except KeyError: u = object.__new__(cls) u._subtype = subtype u._closed = closed cls._cache_dtypes[key] = u return u @cache_readonly def _can_hold_na(self) -> bool: subtype = self._subtype if subtype is None: # partially-initialized raise NotImplementedError( "_can_hold_na is not defined for partially-initialized IntervalDtype" ) if subtype.kind in ["i", "u"]: return False return True @property def closed(self): return self._closed @property def subtype(self): """ The dtype of the Interval bounds. """ return self._subtype @classmethod def construct_array_type(cls) -> type[IntervalArray]: """ Return the array type associated with this dtype. Returns ------- type """ from pandas.core.arrays import IntervalArray return IntervalArray @classmethod def construct_from_string(cls, string: str_type) -> IntervalDtype: """ attempt to construct this type from a string, raise a TypeError if its not possible """ if not isinstance(string, str): raise TypeError( f"'construct_from_string' expects a string, got {type(string)}" ) if string.lower() == "interval" or cls._match.search(string) is not None: return cls(string) msg = ( f"Cannot construct a 'IntervalDtype' from '{string}'.\n\n" "Incorrectly formatted string passed to constructor. " "Valid formats include Interval or Interval[dtype] " "where dtype is numeric, datetime, or timedelta" ) raise TypeError(msg) @property def type(self) -> type[Interval]: return Interval def __str__(self) -> str_type: if self.subtype is None: return "interval" if self.closed is None: # Only partially initialized GH#38394 return f"interval[{self.subtype}]" return f"interval[{self.subtype}, {self.closed}]" def __hash__(self) -> int: # make myself hashable return hash(str(self)) def __eq__(self, other: Any) -> bool: if isinstance(other, str): return other.lower() in (self.name.lower(), str(self).lower()) elif not isinstance(other, IntervalDtype): return False elif self.subtype is None or other.subtype is None: # None should match any subtype return True elif self.closed != other.closed: return False else: from pandas.core.dtypes.common import is_dtype_equal return is_dtype_equal(self.subtype, other.subtype) def __setstate__(self, state) -> None: # for pickle compat. __get_state__ is defined in the # PandasExtensionDtype superclass and uses the public properties to # pickle -> need to set the settable private ones here (see GH26067) self._subtype = state["subtype"] # backward-compat older pickles won't have "closed" key self._closed = state.pop("closed", None) @classmethod def is_dtype(cls, dtype: object) -> bool: """ Return a boolean if we if the passed type is an actual dtype that we can match (via string or type) """ if isinstance(dtype, str): if dtype.lower().startswith("interval"): try: return cls.construct_from_string(dtype) is not None except (ValueError, TypeError): return False else: return False return super().is_dtype(dtype) def __from_arrow__( self, array: pyarrow.Array | pyarrow.ChunkedArray ) -> IntervalArray: """ Construct IntervalArray from pyarrow Array/ChunkedArray. """ import pyarrow from pandas.core.arrays import IntervalArray if isinstance(array, pyarrow.Array): chunks = [array] else: chunks = array.chunks results = [] for arr in chunks: if isinstance(arr, pyarrow.ExtensionArray): arr = arr.storage left = np.asarray(arr.field("left"), dtype=self.subtype) right = np.asarray(arr.field("right"), dtype=self.subtype) iarr = IntervalArray.from_arrays(left, right, closed=self.closed) results.append(iarr) if not results: return IntervalArray.from_arrays( np.array([], dtype=self.subtype), np.array([], dtype=self.subtype), closed=self.closed, ) return IntervalArray._concat_same_type(results) def _get_common_dtype(self, dtypes: list[DtypeObj]) -> DtypeObj | None: if not all(isinstance(x, IntervalDtype) for x in dtypes): return None closed = cast("IntervalDtype", dtypes[0]).closed if not all(cast("IntervalDtype", x).closed == closed for x in dtypes): return np.dtype(object) from pandas.core.dtypes.cast import find_common_type common = find_common_type([cast("IntervalDtype", x).subtype for x in dtypes]) if common == object: return np.dtype(object) return IntervalDtype(common, closed=closed) class PandasDtype(ExtensionDtype): """ A Pandas ExtensionDtype for NumPy dtypes. This is mostly for internal compatibility, and is not especially useful on its own. Parameters ---------- dtype : object Object to be converted to a NumPy data type object. See Also -------- numpy.dtype """ _metadata = ("_dtype",) def __init__(self, dtype: npt.DTypeLike | PandasDtype | None) -> None: if isinstance(dtype, PandasDtype): # make constructor univalent dtype = dtype.numpy_dtype self._dtype = np.dtype(dtype) def __repr__(self) -> str: return f"PandasDtype({repr(self.name)})" @property def numpy_dtype(self) -> np.dtype: """ The NumPy dtype this PandasDtype wraps. """ return self._dtype @property def name(self) -> str: """ A bit-width name for this data-type. """ return self._dtype.name @property def type(self) -> type[np.generic]: """ The type object used to instantiate a scalar of this NumPy data-type. """ return self._dtype.type @property def _is_numeric(self) -> bool: # exclude object, str, unicode, void. return self.kind in set("biufc") @property def _is_boolean(self) -> bool: return self.kind == "b" @classmethod def construct_from_string(cls, string: str) -> PandasDtype: try: dtype = np.dtype(string) except TypeError as err: if not isinstance(string, str): msg = f"'construct_from_string' expects a string, got {type(string)}" else: msg = f"Cannot construct a 'PandasDtype' from '{string}'" raise TypeError(msg) from err return cls(dtype) @classmethod def construct_array_type(cls) -> type_t[PandasArray]: """ Return the array type associated with this dtype. Returns ------- type """ from pandas.core.arrays import PandasArray return PandasArray @property def kind(self) -> str: """ A character code (one of 'biufcmMOSUV') identifying the general kind of data. """ return self._dtype.kind @property def itemsize(self) -> int: """ The element size of this data-type object. """ return self._dtype.itemsize class BaseMaskedDtype(ExtensionDtype): """ Base class for dtypes for BaseMaskedArray subclasses. """ name: str base = None type: type @property def na_value(self) -> libmissing.NAType: return libmissing.NA @cache_readonly def numpy_dtype(self) -> np.dtype: """Return an instance of our numpy dtype""" return np.dtype(self.type) @cache_readonly def kind(self) -> str: return self.numpy_dtype.kind @cache_readonly def itemsize(self) -> int: """Return the number of bytes in this dtype""" return self.numpy_dtype.itemsize @classmethod def construct_array_type(cls) -> type_t[BaseMaskedArray]: """ Return the array type associated with this dtype. Returns ------- type """ raise NotImplementedError @classmethod def from_numpy_dtype(cls, dtype: np.dtype) -> BaseMaskedDtype: """ Construct the MaskedDtype corresponding to the given numpy dtype. """ if dtype.kind == "b": from pandas.core.arrays.boolean import BooleanDtype return BooleanDtype() elif dtype.kind in ["i", "u"]: from pandas.core.arrays.integer import INT_STR_TO_DTYPE return INT_STR_TO_DTYPE[dtype.name] elif dtype.kind == "f": from pandas.core.arrays.floating import FLOAT_STR_TO_DTYPE return FLOAT_STR_TO_DTYPE[dtype.name] else: raise NotImplementedError(dtype) def _get_common_dtype(self, dtypes: list[DtypeObj]) -> DtypeObj | None: # We unwrap any masked dtypes, find the common dtype we would use # for that, then re-mask the result. from pandas.core.dtypes.cast import find_common_type new_dtype = find_common_type( [ dtype.numpy_dtype if isinstance(dtype, BaseMaskedDtype) else dtype for dtype in dtypes ] ) if not isinstance(new_dtype, np.dtype): # If we ever support e.g. Masked[DatetimeArray] then this will change return None try: return type(self).from_numpy_dtype(new_dtype) except (KeyError, NotImplementedError): return None