|
|
""" |
|
|
Comprehensive frequency management module for time series forecasting. |
|
|
|
|
|
This module centralizes all frequency-related functionality including: |
|
|
- Frequency enum with helper methods |
|
|
- Frequency parsing and validation |
|
|
- Pandas frequency string conversion |
|
|
- Safety checks for date ranges |
|
|
- Frequency selection utilities |
|
|
- All frequency constants and mappings |
|
|
""" |
|
|
|
|
|
import logging
import re
from enum import Enum
from typing import Dict, Optional, Tuple

import numpy as np
import pandas as pd
from numpy.random import Generator

from src.data.constants import BASE_END_DATE, BASE_START_DATE, MAX_YEARS
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class Frequency(Enum):
    """
    Time-series frequency identifiers with pandas-oriented helpers.

    Each member's value is a pandas-style frequency code. Helper methods
    cover pandas string conversion, overflow-safety limits, seasonality,
    and GIFT-eval metadata lookups.
    """

    A = "A"
    Q = "Q"
    M = "M"
    W = "W"
    D = "D"
    H = "h"
    S = "s"
    T1 = "1min"
    T5 = "5min"
    T10 = "10min"
    T15 = "15min"
    T30 = "30min"

    def to_pandas_freq(self, for_date_range: bool = True) -> str:
        """
        Convert to a pandas frequency string.

        Args:
            for_date_range: When True, return codes suited to
                ``pd.date_range()`` ("ME"/"YE"/"QE" period-end anchors);
                when False, return codes suited to ``pd.PeriodIndex()``
                ("M"/"Y"/"Q").

        Returns:
            Pandas frequency string.
        """
        # Month / year / quarter need different codes depending on the
        # target pandas API; everything else uses the mapping table.
        special = (
            {Frequency.M: "ME", Frequency.A: "YE", Frequency.Q: "QE"}
            if for_date_range
            else {Frequency.M: "M", Frequency.A: "Y", Frequency.Q: "Q"}
        )
        if self in special:
            return special[self]

        base, prefix, _ = FREQUENCY_MAPPING[self]
        return f"{prefix}{base}" if prefix else base

    def to_pandas_offset(self) -> str:
        """Return the pandas offset alias used for time-delta arithmetic."""
        return FREQUENCY_TO_OFFSET[self]

    def get_days_per_period(self) -> float:
        """Return the approximate number of days covered by one period."""
        return FREQUENCY_MAPPING[self][2]

    def get_max_safe_length(self) -> int:
        """Return the longest sequence length that avoids timestamp overflow."""
        # Defensive default: treat unknown members as unbounded.
        return ALL_FREQUENCY_MAX_LENGTHS.get(self, float("inf"))

    def is_high_frequency(self) -> bool:
        """Return True for minute- or second-level frequencies."""
        return self in (
            Frequency.S,
            Frequency.T1,
            Frequency.T5,
            Frequency.T10,
            Frequency.T15,
            Frequency.T30,
        )

    def is_low_frequency(self) -> bool:
        """Return True for annual, quarterly, or monthly frequencies."""
        return self in (Frequency.A, Frequency.Q, Frequency.M)

    def get_seasonality(self) -> int:
        """Return the typical seasonal period length for this frequency."""
        # Kept local: a dict at class level would become an enum member.
        periods_per_cycle = {
            Frequency.S: 3600,
            Frequency.T1: 60,
            Frequency.T5: 12,
            Frequency.T10: 6,
            Frequency.T15: 4,
            Frequency.T30: 2,
            Frequency.H: 24,
            Frequency.D: 7,
            Frequency.W: 52,
            Frequency.M: 12,
            Frequency.Q: 4,
            Frequency.A: 1,
        }
        return periods_per_cycle.get(self, 1)

    def get_gift_eval_weight(self) -> float:
        """Return this frequency's weight in the GIFT eval distribution."""
        return GIFT_EVAL_FREQUENCY_WEIGHTS.get(self, 0.1)

    def get_length_range(self) -> Tuple[int, int, int, int]:
        """Return (min_length, max_length, optimal_start, optimal_end)."""
        return GIFT_EVAL_LENGTH_RANGES.get(self, (50, 1000, 100, 500))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Per-frequency (base pandas code, multiplier prefix, approximate days per
# period). The prefix is non-empty only for minute frequencies (e.g. "5" +
# "min" -> "5min"). Day counts use averages: 365.25-day years, 30.4375-day
# months, 91.3125-day quarters.
FREQUENCY_MAPPING: Dict[Frequency, Tuple[str, str, float]] = {
    Frequency.A: (
        "YE",
        "",
        365.25,
    ),
    Frequency.Q: ("Q", "", 91.3125),
    Frequency.M: ("M", "", 30.4375),
    Frequency.W: ("W", "", 7),
    Frequency.D: ("D", "", 1),
    Frequency.H: ("h", "", 1 / 24),
    Frequency.S: ("s", "", 1 / 86400),
    Frequency.T1: ("min", "1", 1 / 1440),
    Frequency.T5: ("min", "5", 1 / 288),
    Frequency.T10: ("min", "10", 1 / 144),
    Frequency.T15: ("min", "15", 1 / 96),
    Frequency.T30: ("min", "30", 1 / 48),
}
|
|
|
|
|
|
|
|
# Pandas offset aliases used by Frequency.to_pandas_offset() for time-delta
# calculations. "AS"/"QS"/"MS" anchor to period starts.
# NOTE(review): "AS", "T", "H" and "S" are legacy pandas aliases deprecated in
# pandas >= 2.2 — confirm the pinned pandas version still accepts them.
FREQUENCY_TO_OFFSET: Dict[Frequency, str] = {
    Frequency.A: "AS",
    Frequency.Q: "QS",
    Frequency.M: "MS",
    Frequency.W: "W",
    Frequency.D: "D",
    Frequency.H: "H",
    Frequency.T1: "1T",
    Frequency.T5: "5T",
    Frequency.T10: "10T",
    Frequency.T15: "15T",
    Frequency.T30: "30T",
    Frequency.S: "S",
}
|
|
|
|
|
|
|
|
# Longest safe series per low/medium frequency: MAX_YEARS expressed in
# periods (52.1775 weeks / 365.2425 days per average Gregorian year).
SHORT_FREQUENCY_MAX_LENGTHS = {
    Frequency.A: MAX_YEARS,
    Frequency.Q: MAX_YEARS * 4,
    Frequency.M: MAX_YEARS * 12,
    Frequency.W: int(MAX_YEARS * 52.1775),
    Frequency.D: int(MAX_YEARS * 365.2425),
}

# Longest safe series per sub-daily frequency: MAX_YEARS expressed in hours,
# seconds, and N-minute buckets respectively.
HIGH_FREQUENCY_MAX_LENGTHS = {
    Frequency.H: int(MAX_YEARS * 365.2425 * 24),
    Frequency.S: int(MAX_YEARS * 365.2425 * 24 * 60 * 60),
    Frequency.T1: int(MAX_YEARS * 365.2425 * 24 * 60),
    Frequency.T5: int(MAX_YEARS * 365.2425 * 24 * 12),
    Frequency.T10: int(MAX_YEARS * 365.2425 * 24 * 6),
    Frequency.T15: int(MAX_YEARS * 365.2425 * 24 * 4),
    Frequency.T30: int(MAX_YEARS * 365.2425 * 24 * 2),
}

# Combined lookup covering every Frequency member; consumed by
# Frequency.get_max_safe_length().
ALL_FREQUENCY_MAX_LENGTHS = {
    **SHORT_FREQUENCY_MAX_LENGTHS,
    **HIGH_FREQUENCY_MAX_LENGTHS,
}
|
|
|
|
|
|
|
|
# Sampling weight per frequency, reflecting the GIFT eval dataset's frequency
# distribution; consumed by Frequency.get_gift_eval_weight(), which defaults
# missing members to 0.1.
GIFT_EVAL_FREQUENCY_WEIGHTS: Dict[Frequency, float] = {
    Frequency.H: 25.0,
    Frequency.D: 23.4,
    Frequency.W: 12.9,
    Frequency.T15: 9.7,
    Frequency.T5: 9.7,
    Frequency.M: 7.3,
    Frequency.T10: 4.8,
    Frequency.S: 4.8,
    Frequency.T1: 1.6,
    Frequency.Q: 0.8,
    Frequency.A: 0.8,
}
|
|
|
|
|
|
|
|
|
|
|
# Per-frequency (min_length, max_length, optimal_start, optimal_end) tuples
# derived from GIFT eval series-length patterns; consumed by
# Frequency.get_length_range() and used to filter/boost candidates in
# select_safe_random_frequency().
GIFT_EVAL_LENGTH_RANGES: Dict[Frequency, Tuple[int, int, int, int]] = {

    # Low frequencies: short series.
    Frequency.A: (25, 100, 30, 70),
    Frequency.Q: (25, 150, 50, 120),
    Frequency.M: (40, 1000, 100, 600),
    Frequency.W: (50, 3500, 100, 1500),

    # Daily / hourly: medium-to-long series.
    Frequency.D: (150, 25000, 300, 7000),
    Frequency.H: (600, 35000, 700, 17000),

    # Sub-hourly and second-level: longest series.
    Frequency.T1: (200, 2500, 1200, 1800),
    Frequency.S: (7500, 9500, 7900, 9000),
    Frequency.T15: (1000, 140000, 50000, 130000),
    Frequency.T5: (200, 105000, 20000, 95000),
    Frequency.T10: (40000, 55000, 47000, 52000),
    Frequency.T30: (100, 50000, 10000, 40000),
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_frequency(freq_str: str) -> Frequency:
    """
    Parse frequency string to Frequency enum, robust to variations.

    Handles various frequency string formats:
    - Standard: "A", "Q", "M", "W", "D", "H", "S"
    - Pandas-style: "A-DEC", "W-SUN", "QE-MAR"
    - Minutes: "5T", "10min", "1T" (an unsupported minute multiplier, e.g.
      "7T", logs a warning and falls back to 1min)
    - Case variations: "a", "h", "D"

    Args:
        freq_str: The frequency string to parse (e.g., "5T", "W-SUN", "M")

    Returns:
        Corresponding Frequency enum member

    Raises:
        ValueError: If the frequency string is not supported
    """
    # Handle minute frequencies first: pandas normalization would discard the
    # multiplier-to-member mapping (e.g. "5T" -> Frequency.T5).
    minute_match = re.match(r"^(\d*)T$", freq_str, re.IGNORECASE) or re.match(
        r"^(\d*)min$", freq_str, re.IGNORECASE
    )
    if minute_match:
        multiplier = int(minute_match.group(1)) if minute_match.group(1) else 1
        enum_key = f"T{multiplier}"
        try:
            return Frequency[enum_key]
        except KeyError:
            # Best-effort fallback rather than failing on uncommon multipliers.
            logger.warning(
                "Unsupported minute frequency '%s' (multiplier: %d). "
                "Falling back to '1min' (%s).",
                freq_str,
                multiplier,
                Frequency.T1.value,
            )
            return Frequency.T1

    # Let pandas canonicalize aliases and anchored forms (e.g. "W-SUN").
    try:
        offset = pd.tseries.frequencies.to_offset(freq_str)
        standardized_freq = offset.name
    except Exception:
        # Unknown to pandas: fall through with the raw string.
        standardized_freq = freq_str

    # Strip the anchor suffix ("W-SUN" -> "W") and normalize case.
    base_freq = standardized_freq.split("-")[0].upper()

    freq_map = {
        "A": Frequency.A,
        "Y": Frequency.A,
        "YE": Frequency.A,
        "Q": Frequency.Q,
        "QE": Frequency.Q,
        "M": Frequency.M,
        "ME": Frequency.M,
        "W": Frequency.W,
        "D": Frequency.D,
        "H": Frequency.H,
        "S": Frequency.S,
    }

    if base_freq in freq_map:
        return freq_map[base_freq]

    # BUGFIX: previously raised NotImplementedError, contradicting the
    # documented contract (callers catch ValueError per the docstring).
    raise ValueError(f"Frequency '{standardized_freq}' is not supported.")
|
|
|
|
|
|
|
|
def validate_frequency_safety(
    start_date: np.datetime64, total_length: int, frequency: Frequency
) -> bool:
    """
    Check whether a start date / length / frequency combination is safe for
    pandas datetime operations.

    Verifies that ``pd.date_range(start=start_date, periods=total_length,
    freq=freq_str)`` will not raise an OutOfBoundsDatetime error, given
    pandas' datetime bounds (1677-09-21 to 2262-04-11) and realistic
    per-frequency limits.

    Args:
        start_date: The proposed start date for the time series
        total_length: Total length of the time series
        frequency: The frequency of the time series

    Returns:
        True if the combination is safe, False otherwise
    """
    try:
        pandas_freq = frequency.to_pandas_freq(for_date_range=True)
        start_ts = pd.Timestamp(start_date)

        # The start itself must be representable by pandas.
        if not (pd.Timestamp.min <= start_ts <= pd.Timestamp.max):
            return False

        # Hard per-frequency overflow limit.
        if total_length > frequency.get_max_safe_length():
            return False

        # Practical caps for low frequencies (centuries of annual/quarterly/
        # monthly data are unrealistic and overflow-prone).
        practical_caps = {
            Frequency.A: 500,
            Frequency.Q: 2000,
            Frequency.M: 6000,
        }
        if frequency.is_low_frequency() and total_length > practical_caps[frequency]:
            return False

        # Rough end-date estimate from the average period length.
        estimated_days = total_length * frequency.get_days_per_period()
        if frequency in [Frequency.A, Frequency.Q]:
            estimated_days *= 1.1  # margin for calendar irregularities

        projected_end = start_ts + pd.Timedelta(days=estimated_days)
        if not (pd.Timestamp.min <= projected_end <= pd.Timestamp.max):
            return False

        # Authoritative check: let pandas build the actual index.
        pd.date_range(start=start_ts, periods=total_length, freq=pandas_freq)
        return True

    except (pd.errors.OutOfBoundsDatetime, OverflowError, ValueError):
        return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def select_safe_random_frequency(total_length: int, rng: Generator) -> Frequency:
    """
    Pick a random frequency suitable for a series of ``total_length``,
    following GIFT eval dataset frequency and length distributions.

    Selection proceeds in three steps:
    1. Keep only frequencies whose safety limit and GIFT-eval length range
       admit ``total_length``.
    2. Score each survivor by its GIFT-eval base weight, strongly boosted when
       ``total_length`` lies in the frequency's optimal range, otherwise
       damped in proportion to the distance from that range.
    3. Draw one frequency with probability proportional to its score; if no
       frequency survives the filters, fall back to a length-based priority
       list.

    Args:
        total_length: The total length of the time series (history + future)
        rng: A numpy random number generator instance

    Returns:
        A randomly selected frequency that matches GIFT eval patterns
    """
    candidates = []
    candidate_scores = []

    for candidate in Frequency:
        # Hard safety limit (timestamp overflow).
        if total_length > candidate.get_max_safe_length():
            continue

        min_len, max_len, opt_lo, opt_hi = candidate.get_length_range()

        # GIFT-eval plausibility limits.
        if not (min_len <= total_length <= max_len):
            continue

        candidates.append(candidate)

        base_weight = candidate.get_gift_eval_weight()

        if opt_lo <= total_length <= opt_hi:
            # Strong boost inside the optimal range.
            length_boost = 5.0
        else:
            # Relative distance to the optimal range, scaled over the room
            # left between the optimal bound and the hard limit.
            if total_length < opt_lo:
                distance = (opt_lo - total_length) / (opt_lo - min_len)
            else:
                distance = (total_length - opt_hi) / (max_len - opt_hi)
            # Damps from 1.5 (adjacent to optimal) down to 0.3 (at the limit).
            length_boost = 0.3 + 1.2 * (1.0 - distance)

        candidate_scores.append(base_weight * length_boost)

    if not candidates:
        # Nothing passed the filters: walk a sensible fallback order for this
        # length and return the first safe option.
        if total_length <= 100:
            fallback_order = [
                Frequency.A,
                Frequency.Q,
                Frequency.M,
                Frequency.W,
                Frequency.D,
            ]
        elif total_length <= 1000:
            fallback_order = [Frequency.D, Frequency.W, Frequency.H, Frequency.M]
        else:
            fallback_order = [Frequency.H, Frequency.D, Frequency.T15, Frequency.T5]

        for fallback in fallback_order:
            if total_length <= fallback.get_max_safe_length():
                return fallback

        # Last resort: daily is the safest general-purpose choice.
        return Frequency.D

    if len(candidates) == 1:
        return candidates[0]

    # Weighted draw over the surviving frequencies.
    weights = np.array(candidate_scores)
    return rng.choice(candidates, p=weights / weights.sum())
|
|
|
|
|
|
|
|
def select_safe_start_date(
    total_length: int,
    frequency: Frequency,
    rng: Optional[Generator] = None,
    max_retries: int = 10,
) -> np.datetime64:
    """
    Select a safe start date that ensures the entire time series (history +
    future) will not exceed pandas' datetime bounds.

    Args:
        total_length: Total length of the time series (history + future)
        frequency: Time series frequency
        rng: Random number generator instance; if None, a fresh generator is
            created per call
        max_retries: Maximum number of sampling attempts

    Returns:
        A start date for which validate_frequency_safety() passed, or
        BASE_START_DATE as a last-resort fallback when no sampled candidate
        validated within max_retries

    Raises:
        ValueError: If the required time span exceeds the available date
            window between BASE_START_DATE and BASE_END_DATE
    """
    # BUGFIX: the previous default `rng=np.random.default_rng()` was evaluated
    # once at import time, silently sharing one generator (and its state)
    # across every caller that omitted `rng`.
    if rng is None:
        rng = np.random.default_rng()

    days_per_period = frequency.get_days_per_period()

    # Approximate calendar span of the whole series.
    total_days = total_length * days_per_period

    # Latest start that still leaves room for the full series before BASE_END_DATE.
    latest_safe_start = BASE_END_DATE - np.timedelta64(int(total_days), "D")
    earliest_safe_start = BASE_START_DATE

    # The series cannot fit in the configured window at all.
    if latest_safe_start < earliest_safe_start:
        available_days = (
            (BASE_END_DATE - BASE_START_DATE).astype("timedelta64[D]").astype(int)
        )
        available_years = available_days / 365.25
        required_years = total_days / 365.25
        raise ValueError(
            f"Required time span ({required_years:.1f} years, {total_days:.0f} days) "
            f"exceeds available date window ({available_years:.1f} years, {available_days} days). "
            f"Reduce total_length ({total_length}) or extend the date window."
        )

    # Sample uniformly over the safe window at nanosecond resolution.
    earliest_ns = earliest_safe_start.astype("datetime64[ns]").astype(np.int64)
    latest_ns = latest_safe_start.astype("datetime64[ns]").astype(np.int64)

    for _ in range(max_retries):
        random_ns = rng.integers(earliest_ns, latest_ns + 1)
        start_date = np.datetime64(int(random_ns), "ns")

        # Confirm with the full pandas-level validation.
        if validate_frequency_safety(start_date, total_length, frequency):
            return start_date

    # Conservative fallback: the window start is the least overflow-prone
    # choice. Previously this silent fallback contradicted the docstring.
    logger.warning(
        "No validated start date after %d retries for frequency %s "
        "(total_length=%d); falling back to BASE_START_DATE.",
        max_retries,
        frequency,
        total_length,
    )
    return BASE_START_DATE
|
|
|