Source code for greykite.common.features.timeseries_features

# BSD 2-CLAUSE LICENSE

# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:

# Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
# Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
# #ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# original author: Reza Hosseini, Albert Chen, Kaixu Yang, Sayan Patra
"""Functions to generate derived time features useful
in forecasting, such as growth, seasonality, holidays.
"""

import math
from datetime import datetime

import numpy as np
import pandas as pd
from holidays_ext import get_holidays as get_hdays
from scipy.special import expit

from greykite.common import constants as cst


[docs]def convert_date_to_continuous_time(dt): """Converts date to continuous time. Each year is one unit. Parameters ---------- dt : datetime object the date to convert Returns ------- conti_date : `float` the date represented in years """ year_length = datetime(dt.year, 12, 31).timetuple().tm_yday tt = dt.timetuple() return (dt.year + (tt.tm_yday - 1 + dt.hour / 24 + dt.minute / (24 * 60) + dt.second / (24 * 3600)) / float(year_length))
def get_default_origin_for_time_vars(df, time_col): """Sets default value for origin_for_time_vars Parameters ---------- df : `pandas.DataFrame` Training data. A data frame which includes the timestamp and value columns time_col : `str` The column name in `df` representing time for the time series data. Returns ------- dt_continuous_time : `float` The time origin used to create continuous variables for time """ date = pd.to_datetime(df[time_col].iloc[0]) return convert_date_to_continuous_time(date)
[docs]def build_time_features_df(dt, conti_year_origin): """This function gets a datetime-like vector and creates new columns containing temporal features useful for time series analysis and forecasting e.g. year, week of year, etc. Parameters ---------- dt : array-like (1-dimensional) A vector of datetime-like values conti_year_origin : float The origin used for creating continuous time. Returns ------- time_features_df : `pandas.DataFrame` Dataframe with the following time features. * "datetime": `datetime.datetime` object, a combination of date and a time * "date": `datetime.date` object, date with the format (year, month, day) * "year": integer, year of the date e.g. 2018 * "year_length": integer, number of days in the year e.g. 365 or 366 * "quarter": integer, quarter of the date, 1, 2, 3, 4 * "quarter_start": `pandas.DatetimeIndex`, date of beginning of the current quarter * "quarter_length": integer, number of days in the quarter, 90/91 for Q1, 91 for Q2, 92 for Q3 and Q4 * "month": integer, month of the year, January=1, February=2, ..., December=12 * "month_length": integer, number of days in the month, 28/ 29/ 30/ 31 * "woy": integer, ISO 8601 week of the year where a week starts from Monday, 1, 2, ..., 53 * "doy": integer, ordinal day of the year, 1, 2, ..., year_length * "doq": integer, ordinal day of the quarter, 1, 2, ..., quarter_length * "dom": integer, ordinal day of the month, 1, 2, ..., month_length * "dow": integer, day of the week, Monday=1, Tuesday=2, ..., Sunday=7 * "str_dow": string, day of the week as a string e.g. "1-Mon", "2-Tue", ..., "7-Sun" * "str_doy": string, day of the year e.g. "2020-03-20" for March 20, 2020 * "hour": integer, discrete hours of the datetime, 0, 1, ..., 23 * "minute": integer, minutes of the datetime, 0, 1, ..., 59 * "second": integer, seconds of the datetime, 0, 1, ..., 3599 * "year_month": string, (year, month) e.g. "2020-03" for March 2020 * "year_woy": string, (year, week of year) e.g. "2020_42" for 42nd week of 2020 * "month_dom": string, (month, day of month) e.g. "02/20" for February 20th * "year_woy_dow": string, (year, week of year, day of week) e.g. "2020_03_6" for Saturday of 3rd week in 2020 * "woy_dow": string, (week of year, day of week) e.g. "03_6" for Saturday of 3rd week * "dow_hr": string, (day of week, hour) e.g. "4_09" for 9am on Thursday * "dow_hr_min": string, (day of week, hour, minute) e.g. "4_09_10" for 9:10am on Thursday * "tod": float, time of day, continuous, 0.0 to 24.0 * "tow": float, time of week, continuous, 0.0 to 7.0 * "tom": float, standardized time of month, continuous, 0.0 to 1.0 * "toq": float, time of quarter, continuous, 0.0 to 1.0 * "toy": float, standardized time of year, continuous, 0.0 to 1.0 * "conti_year": float, year in continuous time, eg 2018.5 means middle of the year 2018 * "is_weekend": boolean, weekend indicator, True for weekend, else False * "dow_grouped": string, Monday-Thursday=1234-MTuWTh, Friday=5-Fri, Saturday=6-Sat, Sunday=7-Sun * "ct1": float, linear growth based on conti_year_origin, -infinity to infinity * "ct2": float, signed quadratic growth, -infinity to infinity * "ct3": float, signed cubic growth, -infinity to infinity * "ct_sqrt": float, signed square root growth, -infinity to infinity * "ct_root3": float, signed cubic root growth, -infinity to infinity """ dt = pd.DatetimeIndex(dt) if len(dt) == 0: raise ValueError("Length of dt cannot be zero.") # basic time features date = dt.date year = dt.year year_length = (365.0 + dt.is_leap_year) quarter = dt.quarter month = dt.month month_length = dt.days_in_month # finds first day of quarter quarter_start = pd.DatetimeIndex( dt.year.map(str) + "-" + (3 * quarter - 2).map(int).map(str) + "-01") next_quarter_start = dt + pd.tseries.offsets.QuarterBegin(startingMonth=1) quarter_length = (next_quarter_start - quarter_start).days # finds offset from first day of quarter (rounds down to nearest day) doq = ((dt - quarter_start) / pd.to_timedelta("1D") + 1).astype(int) # week of year, "woy", follows ISO 8601: # - Week 01 is the week with the year's first Thursday in it. # - A week begins with Monday and ends with Sunday. # So the week number of the week that overlaps both years, is 1, 52, or 53, # depending on whether it has more days in the previous year or new year. # - e.g. Jan 1st, 2018 is Monday. woy of first 8 days = [1, 1, 1, 1, 1, 1, 1, 2] # - e.g. Jan 1st, 2019 is Tuesday. woy of first 8 days = [1, 1, 1, 1, 1, 1, 2, 2] # - e.g. Jan 1st, 2020 is Wednesday. woy of first 8 days = [1, 1, 1, 1, 1, 2, 2, 2] # - e.g. Jan 1st, 2015 is Thursday. woy of first 8 days = [1, 1, 1, 1, 2, 2, 2, 2] # - e.g. Jan 1st, 2021 is Friday. woy of first 8 days = [53, 53, 53, 1, 1, 1, 1, 1] # - e.g. Jan 1st, 2022 is Saturday. woy of first 8 days = [52, 52, 1, 1, 1, 1, 1, 1] # - e.g. Jan 1st, 2023 is Sunday. woy of first 8 days = [52, 1, 1, 1, 1, 1, 1, 1] woy = dt.strftime("%V").astype(int) doy = dt.dayofyear dom = dt.day dow = dt.strftime("%u").astype(int) str_dow = dt.strftime("%u-%a") # e.g. 1-Mon, 2-Tue, ..., 7-Sun hour = dt.hour minute = dt.minute second = dt.second # grouped time feature str_doy = dt.strftime("%Y-%m-%d") # e.g. 2020-03-20 for March 20, 2020 year_month = dt.strftime("%Y-%m") # e.g. 2020-03 for March 2020 month_dom = dt.strftime("%m/%d") # e.g. 02/20 for February 20th year_woy = dt.strftime("%Y_%V") # e.g. 2020_42 for 42nd week of 2020 year_woy_dow = dt.strftime("%Y_%V_%u") # e.g. 2020_03_6 for Saturday of 3rd week in 2020 woy_dow = dt.strftime("%W_%u") # e.g. 03_6 for Saturday of 3rd week dow_hr = dt.strftime("%u_%H") # e.g. 4_09 for 9am on Thursday dow_hr_min = dt.strftime("%u_%H_%M") # e.g. 4_09_10 for 9:10am on Thursday # derived time features tod = hour + (minute / 60.0) + (second / 3600.0) tow = dow - 1 + (tod / 24.0) tom = (dom - 1 + (tod / 24.0)) / month_length toq = (doq - 1 + (tod / 24.0)) / quarter_length # time of year, continuous, 0.0 to 1.0. e.g. Jan 1, 12 am = 0/365, Jan 2, 12 am = 1/365, ... # To handle leap years, Feb 28 = 58/365 - 59/365, Feb 29 = 59/365, Mar 1 = 59/365 - 60/365 # offset term is nonzero only in leap years # doy_offset reduces doy by 1 from from Mar 1st (doy > 60) doy_offset = (year_length == 366) * 1.0 * (doy > 60) # tod_offset sets tod to 0 on Feb 29th (doy == 60) tod_offset = 1 - (year_length == 366) * 1.0 * (doy == 60) toy = (doy - 1 - doy_offset + (tod / 24.0) * tod_offset) / 365.0 # year of date in continuous time, eg 2018.5 means middle of year 2018 # this is useful for modeling features that do not care about leap year e.g. environmental variables conti_year = year + (doy - 1 + (tod / 24.0)) / year_length is_weekend = pd.Series(dow).apply(lambda x: x in [6, 7]).values # weekend indicator # categorical var with levels (Mon-Thu, Fri, Sat, Sun), could help when training data are sparse. dow_grouped = pd.Series(str_dow).apply(lambda x: "1234-MTuWTh" if (x in ["1-Mon", "2-Tue", "3-Wed", "4-Thu"]) else x).values # growth terms ct1 = conti_year - conti_year_origin ct2 = signed_pow(ct1, 2) ct3 = signed_pow(ct1, 3) ct_sqrt = signed_pow(ct1, 1/2) ct_root3 = signed_pow(ct1, 1/3) # All keys must be added to constants. features_dict = { "datetime": dt, "date": date, "year": year, "year_length": year_length, "quarter": quarter, "quarter_start": quarter_start, "quarter_length": quarter_length, "month": month, "month_length": month_length, "woy": woy, "doy": doy, "doq": doq, "dom": dom, "dow": dow, "str_dow": str_dow, "str_doy": str_doy, "hour": hour, "minute": minute, "second": second, "year_month": year_month, "year_woy": year_woy, "month_dom": month_dom, "year_woy_dow": year_woy_dow, "woy_dow": woy_dow, "dow_hr": dow_hr, "dow_hr_min": dow_hr_min, "tod": tod, "tow": tow, "tom": tom, "toq": toq, "toy": toy, "conti_year": conti_year, "is_weekend": is_weekend, "dow_grouped": dow_grouped, "ct1": ct1, "ct2": ct2, "ct3": ct3, "ct_sqrt": ct_sqrt, "ct_root3": ct_root3, } df = pd.DataFrame(features_dict) return df
def add_time_features_df(df, time_col, conti_year_origin): """Adds a time feature data frame to a data frame :param df: the input data frame :param time_col: the name of the time column of interest :param conti_year_origin: the origin of time for the continuous time variable :return: the same data frame (df) augmented with new columns """ df = df.reset_index(drop=True) time_df = build_time_features_df( dt=df[time_col], conti_year_origin=conti_year_origin) time_df = time_df.reset_index(drop=True) return pd.concat([df, time_df], axis=1)
[docs]def get_holidays(countries, year_start, year_end): """This function extracts a holiday data frame for the period of interest [year_start to year_end] for the given countries. This is done using the holidays libraries in pypi:holidays-ext Parameters ---------- countries : `list` [`str`] countries for which we need holidays year_start : `int` first year of interest, inclusive year_end : `int` last year of interest, inclusive Returns ------- holiday_df_dict : `dict` [`str`, `pandas.DataFrame`] - key: country name - value: data frame with holidays for that country Each data frame has two columns: EVENT_DF_DATE_COL, EVENT_DF_LABEL_COL """ country_holiday_dict = {} year_list = list(range(year_start, year_end + 1)) country_holidays = get_hdays.get_holiday( country_list=countries, years=year_list ) for country, holidays in country_holidays.items(): country_df = pd.DataFrame({ cst.EVENT_DF_DATE_COL: list(holidays.keys()), cst.EVENT_DF_LABEL_COL: list(holidays.values())}) country_df[cst.EVENT_DF_DATE_COL] = pd.to_datetime(country_df[cst.EVENT_DF_DATE_COL]) country_holiday_dict[country] = country_df return country_holiday_dict
[docs]def get_available_holiday_lookup_countries(countries=None): """Returns list of available countries for modeling holidays :param countries: List[str] only look for available countries in this set :return: List[str] list of available countries for modeling holidays """ return get_hdays.get_available_holiday_lookup_countries( countries=countries )
def get_available_holidays_in_countries( countries, year_start, year_end): """Returns a dictionary mapping each country to its holidays between the years specified. :param countries: List[str] countries for which we need holidays :param year_start: int first year of interest :param year_end: int last year of interest :return: Dict[str, List[str]] key: country name value: list of holidays in that country between [year_start, year_end] """ return get_hdays.get_available_holidays_in_countries( countries=countries, year_start=year_start, year_end=year_end )
[docs]def get_available_holidays_across_countries( countries, year_start, year_end): """Returns a list of holidays that occur any of the countries between the years specified. :param countries: List[str] countries for which we need holidays :param year_start: int first year of interest :param year_end: int last year of interest :return: List[str] names of holidays in any of the countries between [year_start, year_end] """ return get_hdays.get_available_holidays_across_countries( countries=countries, year_start=year_start, year_end=year_end )
[docs]def add_daily_events( df, event_df_dict, date_col=cst.EVENT_DF_DATE_COL, regular_day_label=cst.EVENT_DEFAULT): """For each key of event_df_dict, it adds a new column to a data frame (df) with a date column (date_col). Each new column will represent the events given for that key. Notes ----- As a side effect, the columns in ``event_df_dict`` are renamed. Parameters ---------- df : `pandas.DataFrame` The data frame which has a date column. event_df_dict : `dict` [`str`, `pandas.DataFrame`] A dictionary of data frames, each representing events data for the corresponding key. Values are DataFrames with two columns: - The first column contains the date. Must be at the same frequency as ``df[date_col]`` for proper join. Must be in a format recognized by `pandas.to_datetime`. - The second column contains the event label for each date date_col : `str` Column name in ``df`` that contains the dates for joining against the events in ``event_df_dict``. regular_day_label : `str` The label used for regular days which are not "events". Returns ------- df_daily_events : `pandas.DataFrame` An augmented data frame version of df with new label columns -- one for each key of ``event_df_dict``. """ df[date_col] = pd.to_datetime(df[date_col]) for label, event_df in event_df_dict.items(): event_df = event_df.copy() new_col = f"{cst.EVENT_PREFIX}_{label}" event_df.columns = [date_col, new_col] event_df[date_col] = pd.to_datetime(event_df[date_col]) df = df.merge(event_df, on=date_col, how="left") df[new_col] = df[new_col].fillna(regular_day_label) return df
def add_event_window( df, time_col, label_col, time_delta="1D", pre_num=1, post_num=1, events_name=""): """For a data frame of events with a time_col and label_col it adds shifted events prior and after the given events For example if the event data frame includes the row '2019-12-25, Christmas' the function will produce dataframes with the events: '2019-12-24, Christmas' and '2019-12-26, Christmas' if pre_num and post_num are 1 or more. :param df: pd.DataFrame the events data frame with two columns 'time_col' and 'label_col' :param time_col: str The column with the timestamp of the events. This can be daily but does not have to :param label_col: str the column with labels for the events :param time_delta: str the amount of the shift for each unit specified by a string e.g. "1D" stands for one day delta :param pre_num: int the number of events to be added prior to the given event for each event in df :param post_num: int the number of events to be added after to the given event for each event in df :param events_name: str for each shift, we generate a new data frame and those data frames will be stored in a dictionary with appropriate keys. Each key starts with "events_name" and follow up with: "_minus_1", "_minus_2", "_plus_1", "_plus_2", ... depending on pre_num and post_num :return: dict[key: pd.Dataframe] A dictionary of dataframes for each needed shift. For example if pre_num=2 and post_num=3. 2 + 3 = 5 data frames will be stored in the return dictionary. """ df_dict = {} pd_time_delta = pd.to_timedelta(time_delta) for num in range(pre_num): df0 = pd.DataFrame() df0[time_col] = df[time_col] - (num + 1) * pd_time_delta df0[label_col] = df[label_col] df_dict[events_name + "_minus_" + f"{(num + 1):.0f}"] = df0 for num in range(post_num): df0 = pd.DataFrame() df0[time_col] = df[time_col] + (num + 1) * pd_time_delta df0[label_col] = df[label_col] df_dict[events_name + "_plus_" + f"{(num + 1):.0f}"] = df0 return df_dict def get_evenly_spaced_changepoints_values( df, continuous_time_col="ct1", n_changepoints=2): """Partitions interval into n_changepoints + 1 segments, placing a changepoint at left endpoint of each segment. The left most segment doesn't get a changepoint. Changepoints should be determined from training data. :param df: pd.DataFrame training dataset. contains continuous_time_col :param continuous_time_col: str name of continuous time column (e.g. conti_year, ct1) :param n_changepoints: int number of changepoints requested :return: np.array values of df[continuous_time_col] at the changepoints """ if not n_changepoints > 0: raise ValueError("n_changepoints must be > 0") n = df.shape[0] n_steps = n_changepoints + 1 step_size = n / n_steps indices = np.floor(np.arange(start=1, stop=n_steps) * step_size) return df[continuous_time_col][indices].values def get_evenly_spaced_changepoints_dates( df, time_col, n_changepoints): """Partitions interval into n_changepoints + 1 segments, placing a changepoint at left endpoint of each segment. The left most segment doesn't get a changepoint. Changepoints should be determined from training data. :param df: pd.DataFrame training dataset. contains continuous_time_col :param time_col: str name of time column :param n_changepoints: int number of changepoints requested :return: pd.Series values of df[time_col] at the changepoints """ if not n_changepoints >= 0: raise ValueError("n_changepoints must be >= 0") changepoint_indices = np.floor(np.arange(start=1, stop=n_changepoints + 1) * (df.shape[0] / (n_changepoints + 1))) changepoint_indices = df.index[np.concatenate([[0], changepoint_indices.astype(int)])] return df.loc[changepoint_indices, time_col] def get_custom_changepoints_values( df, changepoint_dates, time_col=cst.TIME_COL, continuous_time_col="ct1"): """Returns the values of continuous_time_col at the requested changepoint_dates. :param df: pd.DataFrame training dataset. contains continuous_time_col and time_col :param changepoint_dates: Iterable[Union[int, float, str, datetime]] Changepoint dates, interpreted by pd.to_datetime. Changepoints are set at the closest time on or after these dates in the dataset :param time_col: str The column name in `df` representing time for the time series data The time column can be anything that can be parsed by pandas DatetimeIndex :param continuous_time_col: str name of continuous time column (e.g. conti_year, ct1) :return: np.array values of df[continuous_time_col] at the changepoints """ ts = pd.to_datetime(df[time_col]) changepoint_dates = pd.to_datetime(changepoint_dates) # maps each changepoint to first date >= changepoint in the dataframe # if there is no such date, the changepoint is dropped (it would not be useful anyway) changepoint_ts = [ts[ts >= date].min() for date in changepoint_dates if any(ts >= date)] indices = ts.isin(changepoint_ts) changepoints = df[indices][continuous_time_col].values if changepoints.shape[0] == 0: changepoints = None return changepoints def get_changepoint_string(changepoint_dates): """Gets proper formatted strings for changepoint dates. The default format is "_%Y_%m_%d_%H". When necessary, it appends "_%M" or "_%M_%S". Parameters ---------- changepoint_dates : `list` List of changepoint dates, parsable by `pandas.to_datetime`. Returns ------- date_strings : `list[`str`]` List of string formatted changepoint dates. """ changepoint_dates = list(pd.to_datetime(changepoint_dates)) time_format = "_%Y_%m_%d_%H" if any([stamp.second != 0 for stamp in changepoint_dates]): time_format += "_%M_%S" elif any([stamp.minute != 0 for stamp in changepoint_dates]): time_format += "_%M" date_strings = [date.strftime(time_format) for date in changepoint_dates] return date_strings def get_changepoint_features( df, changepoint_values, continuous_time_col="ct1", growth_func=None, changepoint_dates=None): """Returns features for growth terms with continuous time origins at the changepoint_values (locations) specified Generates a time series feature for each changepoint: Let t = continuous_time value, c = changepoint value Then the changepoint feature value at time point t is `growth_func(t - c) * I(t >= c)`, where I is the indicator function This represents growth as a function of time, where the time origin is the changepoint In the typical case where growth_func(0) = 0 (has origin at 0), the total effect of the changepoints is continuous in time. If `growth_func` is the identity function, and `continuous_time` represents the year in continuous time, these terms form the basis for a continuous, piecewise linear curve to the growth trend. Fitting these terms with linear model, the coefficents represent slope change at each changepoint Intended usage ---------- To make predictions (on test set) Allow growth term as a function of time to change at these points. Parameters ---------- :param df: pd.Dataframe The dataset to make predictions. Contains column continuous_time_col. :param changepoint_values: array-like List of changepoint values (on same scale as df[continuous_time_col]). Should be determined from training data :param continuous_time_col: Optional[str] Name of continuous time column in df growth_func is applied to this column to generate growth term If None, uses "ct1", linear growth :param growth_func: Optional[callable] Growth function for defining changepoints (scalar -> scalar). If None, uses identity function to use continuous_time_col directly as growth term :param changepoint_dates: Optional[list] List of change point dates, parsable by `pandas.to_datetime`. :return: pd.DataFrame, shape (df.shape[0], len(changepoints)) Changepoint features, 0-indexed """ if continuous_time_col is None: continuous_time_col = "ct1" if growth_func is None: def growth_func(x): return x if changepoint_dates is not None: time_postfixes = get_changepoint_string(changepoint_dates) else: time_postfixes = [""] * len(changepoint_values) changepoint_df = pd.DataFrame() for i, changepoint in enumerate(changepoint_values): time_feature = np.array(df[continuous_time_col]) - changepoint # shifted time column (t - c_i) growth_term = np.array([growth_func(max(x, 0)) for x in time_feature]) # growth as a function of time time_feature_ind = time_feature >= 0 # Indicator(t >= c_i), lets changepoint take effect starting at c_i new_col = growth_term * time_feature_ind new_changepoint = pd.Series(new_col, name=f"{cst.CHANGEPOINT_COL_PREFIX}{i}{time_postfixes[i]}") changepoint_df = pd.concat([changepoint_df, new_changepoint], axis=1) return changepoint_df def get_changepoint_values_from_config( changepoints_dict, time_features_df, time_col=cst.TIME_COL): """Applies the changepoint method specified in `changepoints_dict` to return the changepoint values :param changepoints_dict: Optional[Dict[str, any]] Specifies the changepoint configuration. "method": str The method to locate changepoints. Valid options: "uniform". Places n_changepoints evenly spaced changepoints to allow growth to change. "custom". Places changepoints at the specified dates. Additional keys to provide parameters for each particular method are described below. "continuous_time_col": Optional[str] Column to apply `growth_func` to, to generate changepoint features Typically, this should match the growth term in the model "growth_func": Optional[func] Growth function (scalar -> scalar). Changepoint features are created by applying `growth_func` to "continuous_time_col" with offsets. If None, uses identity function to use `continuous_time_col` directly as growth term If changepoints_dict["method"] == "uniform", this other key is required: "n_changepoints": int number of changepoints to evenly space across training period If changepoints_dict["method"] == "custom", this other key is required: "dates": Iterable[Union[int, float, str, datetime]] Changepoint dates. Must be parsable by pd.to_datetime. Changepoints are set at the closest time on or after these dates in the dataset. :param time_features_df: pd.Dataframe training dataset. contains column "continuous_time_col" :param time_col: str The column name in `time_features_df` representing time for the time series data The time column can be anything that can be parsed by pandas DatetimeIndex Used only in the "custom" method. :return: np.array values of df[continuous_time_col] at the changepoints """ changepoint_values = None if changepoints_dict is not None: valid_changepoint_methods = ["uniform", "custom"] changepoint_method = changepoints_dict.get("method") continuous_time_col = changepoints_dict.get("continuous_time_col") if changepoint_method is None: raise Exception("changepoint method must be specified") if changepoint_method not in valid_changepoint_methods: raise NotImplementedError( f"changepoint method {changepoint_method} not recognized. " f"Must be one of {valid_changepoint_methods}") if changepoint_method == "uniform": if changepoints_dict["n_changepoints"] > 0: params = {"continuous_time_col": continuous_time_col} if continuous_time_col is not None else {} changepoint_values = get_evenly_spaced_changepoints_values( df=time_features_df, n_changepoints=changepoints_dict["n_changepoints"], **params) elif changepoint_method == "custom": params = {} if time_col is not None: params["time_col"] = time_col if continuous_time_col is not None: params["continuous_time_col"] = continuous_time_col changepoint_values = get_custom_changepoints_values( df=time_features_df, changepoint_dates=changepoints_dict["dates"], **params) return changepoint_values def get_changepoint_features_and_values_from_config( df, time_col, changepoints_dict=None, origin_for_time_vars=None): """Extracts changepoints from changepoint configuration and input data :param df: pd.DataFrame Training data. A data frame which includes the timestamp and value columns :param time_col: str The column name in `df` representing time for the time series data The time column can be anything that can be parsed by pandas DatetimeIndex :param changepoints_dict: Optional[Dict[str, any]] Specifies the changepoint configuration. "method": str The method to locate changepoints. Valid options: "uniform". Places n_changepoints evenly spaced changepoints to allow growth to change. "custom". Places changepoints at the specified dates. Additional keys to provide parameters for each particular method are described below. "continuous_time_col": Optional[str] Column to apply `growth_func` to, to generate changepoint features Typically, this should match the growth term in the model "growth_func": Optional[func] Growth function (scalar -> scalar). Changepoint features are created by applying `growth_func` to "continuous_time_col" with offsets. If None, uses identity function to use `continuous_time_col` directly as growth term If changepoints_dict["method"] == "uniform", this other key is required: "n_changepoints": int number of changepoints to evenly space across training period If changepoints_dict["method"] == "custom", this other key is required: "dates": Iterable[Union[int, float, str, datetime]] Changepoint dates. Must be parsable by pd.to_datetime. Changepoints are set at the closest time on or after these dates in the dataset. :param origin_for_time_vars: Optional[float] The time origin used to create continuous variables for time :return: Dict[str, any] Dictionary with the requested changepoints and associated information changepoint_df: pd.DataFrame, shape (df.shape[0], len(changepoints)) Changepoint features for modeling the training data changepoint_values: array-like List of changepoint values (on same scale as df[continuous_time_col]) Can be used to generate changepoints for prediction. continuous_time_col: Optional[str] Name of continuous time column in df growth_func is applied to this column to generate growth term. If None, uses "ct1", linear growth Can be used to generate changepoints for prediction. growth_func: Optional[callable] Growth function for defining changepoints (scalar -> scalar). If None, uses identity function to use continuous_time_col directly as growth term. Can be used to generate changepoints for prediction. changepoint_cols: List[str] Names of the changepoint columns for modeling """ # extracts changepoint values if changepoints_dict is None: changepoint_values = None continuous_time_col = None growth_func = None else: if origin_for_time_vars is None: origin_for_time_vars = get_default_origin_for_time_vars(df, time_col) time_features_df = build_time_features_df( df[time_col], conti_year_origin=origin_for_time_vars) changepoint_values = get_changepoint_values_from_config( changepoints_dict=changepoints_dict, time_features_df=time_features_df, time_col="datetime") # datetime column generated by `build_time_features_df` continuous_time_col = changepoints_dict.get("continuous_time_col") growth_func = changepoints_dict.get("growth_func") # extracts changepoint column names if changepoint_values is None: changepoint_df = None changepoint_cols = [] else: if changepoints_dict is None: changepoint_dates = None elif changepoints_dict["method"] == "custom": changepoint_dates = list(pd.to_datetime(changepoints_dict["dates"])) elif changepoints_dict["method"] == "uniform": changepoint_dates = get_evenly_spaced_changepoints_dates( df=df, time_col=time_col, n_changepoints=changepoints_dict["n_changepoints"] ).tolist()[1:] # the changepoint features does not include the growth term else: changepoint_dates = None changepoint_df = get_changepoint_features( df=time_features_df, changepoint_values=changepoint_values, continuous_time_col=continuous_time_col, growth_func=growth_func, changepoint_dates=changepoint_dates) changepoint_cols = list(changepoint_df.columns) return { "changepoint_df": changepoint_df, "changepoint_values": changepoint_values, "continuous_time_col": continuous_time_col, "growth_func": growth_func, "changepoint_cols": changepoint_cols } def get_changepoint_dates_from_changepoints_dict( changepoints_dict, df=None, time_col=None): """Gets the changepoint dates from ``changepoints_dict`` Parameters ---------- changepoints_dict : `dict` or `None` The ``changepoints_dict`` which is compatible with `~greykite.algo.forecast.silverkite.forecast_silverkite.SilverkiteForecast.forecast` df : `pandas.DataFrame` or `None`, default `None` The data df to put changepoints on. time_col : `str` or `None`, default `None` The column name of time column in ``df``. Returns ------- changepoint_dates : `list` List of changepoint dates. """ if (changepoints_dict is None or "method" not in changepoints_dict.keys() or changepoints_dict["method"] not in ["auto", "uniform", "custom"]): return None method = changepoints_dict["method"] if method == "custom": # changepoints_dict["dates"] is `Iterable`, converts to list changepoint_dates = list(changepoints_dict["dates"]) elif method == "uniform": if df is None or time_col is None: raise ValueError("When the method of ``changepoints_dict`` is 'uniform', ``df`` and " "``time_col`` must be provided.") changepoint_dates = get_evenly_spaced_changepoints_dates( df=df, time_col=time_col, n_changepoints=changepoints_dict["n_changepoints"] ) # the output is `pandas.Series`, converts to list changepoint_dates = changepoint_dates.tolist()[1:] else: raise ValueError("The method of ``changepoints_dict`` can not be 'auto'. " "Please specify or detect change points first.") return changepoint_dates
[docs]def add_event_window_multi( event_df_dict, time_col, label_col, time_delta="1D", pre_num=1, post_num=1, pre_post_num_dict=None): """For a given dictionary of events data frames with a time_col and label_col it adds shifted events prior and after the given events For example if the event data frame includes the row '2019-12-25, Christmas' as a row the function will produce dataframes with the events '2019-12-24, Christmas' and '2019-12-26, Christmas' if pre_num and post_num are 1 or more. Parameters ---------- event_df_dict: `dict` [`str`, `pandas.DataFrame`] A dictionary of events data frames with each having two columns: ``time_col`` and ``label_col``. time_col: `str` The column with the timestamp of the events. This can be daily but does not have to be. label_col : `str` The column with labels for the events. time_delta : `str`, default "1D" The amount of the shift for each unit specified by a string e.g. '1D' stands for one day delta pre_num : `int`, default 1 The number of events to be added prior to the given event for each event in df. post_num: `int`, default 1 The number of events to be added after to the given event for each event in df. pre_post_num_dict : `dict` [`str`, (`int`, `int`)] or None, default None Optionally override ``pre_num`` and ``post_num`` for each key in ``event_df_dict``. For example, if ``event_df_dict`` has keys "US" and "India", this parameter can be set to ``pre_post_num_dict = {"US": [1, 3], "India": [1, 2]}``, denoting that the "US" ``pre_num`` is 1 and ``post_num`` is 3, and "India" ``pre_num`` is 1 and ``post_num`` is 2. Keys not specified by ``pre_post_num_dict`` use the default given by ``pre_num`` and ``post_num``. Returns ------- df : `dict` [`str`, `pandas.DataFrame`] A dictionary of dataframes for each needed shift. For example if pre_num=2 and post_num=3. 2 + 3 = 5 data frames will be stored in the return dictionary. """ if pre_post_num_dict is None: pre_post_num_dict = {} shifted_df_dict = {} for event_df_key, event_df in event_df_dict.items(): if event_df_key in pre_post_num_dict.keys(): pre_num0 = pre_post_num_dict[event_df_key][0] post_num0 = pre_post_num_dict[event_df_key][1] else: pre_num0 = pre_num post_num0 = post_num df_dict0 = add_event_window( df=event_df, time_col=time_col, label_col=label_col, time_delta=time_delta, pre_num=pre_num0, post_num=post_num0, events_name=event_df_key) shifted_df_dict.update(df_dict0) return shifted_df_dict
def get_fourier_col_name(k, col_name, function_name="sin", seas_name=None): """Returns column name corresponding to a particular fourier term, as returned by fourier_series_fcn :param k: int fourier term :param col_name: str column in the dataframe used to generate fourier series :param function_name: str sin or cos :param seas_name: strcols_interact appended to new column names added for fourier terms :return: str column name in DataFrame returned by fourier_series_fcn """ # patsy doesn't allow "." in formula term. Replace "." with "_" rather than quoting "Q()" all fourier terms name = f"{function_name}{k:.0f}_{col_name}" if seas_name is not None: name = f"{name}_{seas_name}" return name def fourier_series_fcn(col_name, period=1.0, order=1, seas_name=None): """Generates a function which creates fourier series matrix for a column of an input df :param col_name: str is the column name in the dataframe which is to be used for generating fourier series. It needs to be a continuous variable. :param period: float the period of the fourier series :param order: int the order of the fourier series :param seas_name: Optional[str] appended to new column names added for fourier terms. Useful to distinguish multiple fourier series on same col_name with different periods. :return: callable a function which can be applied to any data.frame df with a column name being equal to col_name """ def fs_func(df): out_df = pd.DataFrame() out_cols = [] if col_name not in df.columns: raise ValueError("The data frame does not have the column: " + col_name) x = df[col_name] x = np.array(x) for i in range(order): k = i + 1 sin_col_name = get_fourier_col_name( k, col_name, function_name="sin", seas_name=seas_name) cos_col_name = get_fourier_col_name( k, col_name, function_name="cos", seas_name=seas_name) out_cols.append(sin_col_name) out_cols.append(cos_col_name) omega = 2 * math.pi / period u = omega * k * x out_df[sin_col_name] = np.sin(u) out_df[cos_col_name] = np.cos(u) return {"df": out_df, "cols": out_cols} return fs_func def fourier_series_multi_fcn( col_names, periods=None, orders=None, seas_names=None): """Generates a func which adds multiple fourier series with multiple periods. Parameters ---------- col_names : `list` [`str`] the column names which are to be used to generate Fourier series. Each column can have its own period and order. periods: `list` [`float`] or None the periods corresponding to each column given in col_names orders : `list` [`int`] or None the orders for each of the Fourier series seas_names : `list` [`str`] or None Appended to the Fourier series name. If not provided (None) col_names will be used directly. """ k = len(col_names) if periods is None: periods = [1.0] * k if orders is None: orders = [1] * k if len(periods) != len(orders): raise ValueError("periods and orders must have the same length.") def fs_multi_func(df): out_df = None out_cols = [] for i in range(k): col_name = col_names[i] period = periods[i] order = orders[i] seas_name = None if seas_names is not None: seas_name = seas_names[i] func0 = fourier_series_fcn( col_name=col_name, period=period, order=order, seas_name=seas_name) res = func0(df) fs_df = res["df"] fs_cols = res["cols"] out_df = pd.concat([out_df, fs_df], axis=1) out_cols = out_cols + fs_cols return {"df": out_df, "cols": out_cols} return fs_multi_func def signed_pow(x, y): """ Takes the absolute value of x and raises it to power of y. Then it multiplies the result by sign of x. This guarantees this function is non-decreasing. This is useful in many contexts e.g. statistical modeling. :param x: the base number which can be any real number :param y: the power which can be any real number :return: returns abs(x) to power of y multiplied by sign of x """ return np.sign(x) * np.power(np.abs(x), y) def signed_pow_fcn(y): return lambda x: signed_pow(x, y) signed_sqrt = signed_pow_fcn(1 / 2) signed_sq = signed_pow_fcn(2) def logistic(x, growth_rate=1.0, capacity=1.0, floor=0.0, inflection_point=0.0): """Evaluates the logistic function at x with the specified growth rate, capacity, floor, and inflection point. :param x: value to evaluate the logistic function :type x: float :param growth_rate: growth rate :type growth_rate: float :param capacity: max value (carrying capacity) :type capacity: float :param floor: min value (lower bound) :type floor: float :param inflection_point: the t value of the inflection point :type inflection_point: float :return: value of the logistic function at t :rtype: float """ return floor + capacity * expit(growth_rate * (x - inflection_point)) def get_logistic_func(growth_rate=1.0, capacity=1.0, floor=0.0, inflection_point=0.0): """Returns a function that evaluates the logistic function at t with the specified growth rate, capacity, floor, and inflection point. f(x) = floor + capacity / (1 + exp(-growth_rate * (x - inflection_point))) :param growth_rate: growth rate :type growth_rate: float :param capacity: max value (carrying capacity) :type capacity: float :param floor: min value (lower bound) :type floor: float :param inflection_point: the t value of the inflection point :type inflection_point: float :return: the logistic function with specified parameters :rtype: callable """ return lambda t: logistic(t, growth_rate, capacity, floor, inflection_point)