# BSD 2-CLAUSE LICENSE
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
# Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
# Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
# #ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# original author: Albert Chen, Sayan Patra
"""Plotting functions in plotly."""
import warnings
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.colors import DEFAULT_PLOTLY_COLORS
from greykite.common import constants as cst
from greykite.common.features.timeseries_features import build_time_features_df
from greykite.common.logging import LoggingLevelEnum
from greykite.common.logging import log_message
from greykite.common.python_utils import update_dictionary
from greykite.common.viz.colors_utils import get_color_palette
[docs]def plot_multivariate(
df,
x_col,
y_col_style_dict="plotly",
default_color="rgba(0, 145, 202, 1.0)",
xlabel=None,
ylabel=cst.VALUE_COL,
title=None,
showlegend=True):
"""Plots one or more lines against the same x-axis values.
Parameters
----------
df : `pandas.DataFrame`
Data frame with ``x_col`` and columns named by the keys in ``y_col_style_dict``.
x_col: `str`
Which column to plot on the x-axis.
y_col_style_dict: `dict` [`str`, `dict` or None] or "plotly" or "auto" or "auto-fill", default "plotly"
The column(s) to plot on the y-axis, and how to style them.
If a dictionary:
- key : `str`
column name in ``df``
- value : `dict` or None
Optional styling options, passed as kwargs to `go.Scatter`.
If None, uses the default: line labeled by the column name.
See reference page for `plotly.graph_objects.Scatter` for options
(e.g. color, mode, width/size, opacity).
https://plotly.com/python/reference/#scatter.
If a string, plots all columns in ``df`` besides ``x_col`` against ``x_col``:
- "plotly": plot lines with default plotly styling
- "auto": plot lines with color ``default_color``, sorted by value (ascending)
- "auto-fill": plot lines with color ``default_color``, sorted by value (ascending), and fills between lines
default_color: `str`, default "rgba(0, 145, 202, 1.0)" (blue)
Default line color when ``y_col_style_dict`` is one of "auto", "auto-fill".
xlabel : `str` or None, default None
x-axis label. If None, default is ``x_col``.
ylabel : `str` or None, default ``VALUE_COL``
y-axis label
title : `str` or None, default None
Plot title. If None, default is based on axis labels.
showlegend : `bool`, default True
Whether to show the legend.
Returns
-------
fig : `plotly.graph_objects.Figure`
Interactive plotly graph of one or more columns
in ``df`` against ``x_col``.
See `~greykite.common.viz.timeseries_plotting.plot_forecast_vs_actual`
return value for how to plot the figure and add customization.
"""
if xlabel is None:
xlabel = x_col
if title is None and ylabel is not None:
title = f"{ylabel} vs {xlabel}"
auto_style = {"line": {"color": default_color}}
if y_col_style_dict == "plotly":
# Uses plotly default style
y_col_style_dict = {col: None for col in df.columns if col != x_col}
elif y_col_style_dict in ["auto", "auto-fill"]:
# Columns ordered from low to high
means = df.drop(columns=x_col).mean()
column_order = list(means.sort_values().index)
if y_col_style_dict == "auto":
# Lines with color `default_color`
y_col_style_dict = {col: auto_style for col in column_order}
elif y_col_style_dict == "auto-fill":
# Lines with color `default_color`, with fill between lines
y_col_style_dict = {column_order[0]: auto_style}
y_col_style_dict.update({
col: {
"line": {"color": default_color},
"fill": "tonexty"
} for col in column_order[1:]
})
data = []
default_style = dict(mode="lines")
for column, style_dict in y_col_style_dict.items():
# By default, column name in ``df`` is used to label the line
default_col_style = update_dictionary(default_style, overwrite_dict={"name": column})
# User can overwrite any of the default values, or remove them by setting key value to None
style_dict = update_dictionary(default_col_style, overwrite_dict=style_dict)
line = go.Scatter(
x=df[x_col],
y=df[column],
**style_dict)
data.append(line)
layout = go.Layout(
xaxis=dict(title=xlabel),
yaxis=dict(title=ylabel),
title=title,
title_x=0.5,
showlegend=showlegend,
legend={'traceorder': 'reversed'} # Matches the order of ``y_col_style_dict`` (bottom to top)
)
fig = go.Figure(data=data, layout=layout)
return fig
def plot_multivariate_grouped(
df,
x_col,
y_col_style_dict,
grouping_x_col,
grouping_x_col_values,
grouping_y_col_style_dict,
colors=DEFAULT_PLOTLY_COLORS,
xlabel=None,
ylabel=cst.VALUE_COL,
title=None,
showlegend=True):
"""Plots multiple lines against the same x-axis values. The lines can
partially share the x-axis values.
See parameter descriptions for a running example.
Parameters
----------
df : `pandas.DataFrame`
Data frame with ``x_col`` and columns named by the keys in ``y_col_style_dict``,
``grouping_x_col``, ``grouping_y_col_style_dict``.
For example::
df = pd.DataFrame({
time: [dt(2018, 1, 1),
dt(2018, 1, 2),
dt(2018, 1, 3)],
"y1": [8.5, 2.0, 3.0],
"y2": [1.4, 2.1, 3.4],
"y3": [4.2, 3.1, 3.0],
"y4": [0, 1, 2],
"y5": [10, 9, 8],
"group": [1, 2, 1],
})
This will be our running example.
x_col: `str`
Which column to plot on the x-axis.
"time" in our example.
y_col_style_dict: `dict` [`str`, `dict` or None]
The column(s) to plot on the y-axis, and how to style them.
These columns are plotted against the complete x-axis.
- key : `str`
column name in ``df``
- value : `dict` or None
Optional styling options, passed as kwargs to `go.Scatter`.
If None, uses the default: line labeled by the column name.
If line color is not given, it is added according to ``colors``.
See reference page for `plotly.graph_objects.Scatter` for options
(e.g. color, mode, width/size, opacity).
https://plotly.com/python/reference/#scatter.
For example::
y_col_style_dict={
"y1": {
"name": "y1_name",
"legendgroup": "one",
"mode": "markers",
"line": None # Remove line params since we use mode="markers"
},
"y2": None,
}
The function will add a line color to "y1" and "y2" based on the ``colors`` parameter.
It will also add a name to "y2", since none was given. The "name" of "y1" will be preserved.
The output ``fig`` will have one line each for each of "y1" and "y2", each plot against
the entire "time" column.
grouping_x_col: `str`
Which column to use to group columns in ``grouping_y_col_style_dict``.
"group" in our example.
grouping_x_col_values: `list` [`int`] or None
Which values to use for grouping. If None, uses all the unique values in
``df`` [``grouping_x_col``].
In our example, specifying ``grouping_x_col_values == [1, 2]`` would plot
separate lines corresponding to ``group==1`` and ``group==2``.
grouping_y_col_style_dict: `dict` [`str`, `dict` or None]
The column(s) to plot on the y-axis, and how to style them.
These columns are plotted against partial x-axis.
For each ``grouping_x_col_values`` an element in this dictionary produces
one line.
- key : `str`
column name in ``df``
- value : `dict` or None
Optional styling options, passed as kwargs to `go.Scatter`.
If None, uses the default: line labeled by the ``grouping_x_col_values``,
``grouping_x_col`` and column name.
If a name is given, it is augmented with the ``grouping_x_col_values``.
If line color is not given, it is added according to ``colors``.
All the lines sharing same ``grouping_x_col_values`` have the same color.
See reference page for `plotly.graph_objects.Scatter` for options
(e.g. color, mode, width/size, opacity).
https://plotly.com/python/reference/#scatter.
For example::
grouping_y_col_style_dict={
"y3": {
"line": {
"color": "blue"
}
},
"y4": {
"name": "y4_name",
"line": {
"width": 2,
"dash": "dot"
}
},
"y5": None,
}
The function will add a line color to "y4" and "y5" based on the ``colors`` parameter.
The line color of "y3" will be "blue" as specified. We also preserve the given line
properties of "y4".
` The function adds a name to "y3" and "y5", since none was given. The given "name" of "y4"
will be augmented with ``grouping_x_col_values``.
Each element of ``grouping_y_col_style_dict`` gets one line for each ``grouping_x_col_values``.
In our example, there will be 2 lines corresponding to "y3", named "1_y3" and "2_y3".
"1_y3" is plotted against "time = [dt(2018, 1, 1), dt(2018, 1, 3)]", corresponding to ``group==1``.
"2_y3" is plotted against "time = [dt(2018, 1, 2)", corresponding to ``group==2``.
colors: [`str`, `list` [`str`]], default ``DEFAULT_PLOTLY_COLORS``
Which colors to use to build a color palette for plotting.
This can be a list of RGB colors or a `str` from ``PLOTLY_SCALES``.
Required number of colors equals sum of the length of ``y_col_style_dict``
and length of ``grouping_x_col_values``.
See `~greykite.common.viz.colors_utils.get_color_palette` for details.
xlabel : `str` or None, default None
x-axis label. If None, default is ``x_col``.
ylabel : `str` or None, default ``VALUE_COL``
y-axis label
title : `str` or None, default None
Plot title. If None, default is based on axis labels.
showlegend : `bool`, default True
Whether to show the legend.
Returns
-------
fig : `plotly.graph_objects.Figure`
Interactive plotly graph of one or more columns
in ``df`` against ``x_col``.
See `~greykite.common.viz.timeseries_plotting.plot_forecast_vs_actual`
return value for how to plot the figure and add customization.
"""
available_grouping_x_col_values = np.unique(df[grouping_x_col])
if grouping_x_col_values is None:
grouping_x_col_values = available_grouping_x_col_values
else:
missing_grouping_x_col_values = set(grouping_x_col_values) - set(available_grouping_x_col_values)
if len(missing_grouping_x_col_values) > 0:
raise ValueError(f"Following 'grouping_x_col_values' are missing in '{grouping_x_col}' column: "
f"{missing_grouping_x_col_values}")
# Chooses the color palette
n_color = len(y_col_style_dict) + len(grouping_x_col_values)
color_palette = get_color_palette(num=n_color, colors=colors)
# Updates colors for y_col_style_dict if it is not specified
for color_num, (column, style_dict) in enumerate(y_col_style_dict.items()):
if style_dict is None:
style_dict = {}
default_color = {"color": color_palette[color_num]}
style_dict["line"] = update_dictionary(default_color, overwrite_dict=style_dict.get("line"))
y_col_style_dict[column] = style_dict
# Standardizes dataset for the next figure
df_standardized = df.copy().drop_duplicates(subset=[x_col]).sort_values(by=x_col)
# This figure plots the whole xaxis vs yaxis values
fig = plot_multivariate(
df=df_standardized,
x_col=x_col,
y_col_style_dict=y_col_style_dict,
xlabel=xlabel,
ylabel=ylabel,
title=title,
showlegend=showlegend)
data = fig.data
layout = fig.layout
# These figures plot the sliced xaxis vs yaxis values
for color_num, grouping_x_col_value in enumerate(grouping_x_col_values, len(y_col_style_dict)):
default_color = {"color": color_palette[color_num]}
sliced_y_col_style_dict = grouping_y_col_style_dict.copy()
for column, style_dict in sliced_y_col_style_dict.items():
# Updates colors if it is not specified
if style_dict is None:
style_dict = {}
line_dict = update_dictionary(default_color, overwrite_dict=style_dict.get("line"))
# Augments names with grouping_x_col_value
name = style_dict.get("name")
if name is None:
updated_name = f"{grouping_x_col_value}_{grouping_x_col}_{column}"
else:
updated_name = f"{grouping_x_col_value}_{name}"
overwrite_dict = {
"name": updated_name,
"line": line_dict
}
style_dict = update_dictionary(style_dict, overwrite_dict=overwrite_dict)
sliced_y_col_style_dict[column] = style_dict
df_sliced = df[df[grouping_x_col] == grouping_x_col_value]
fig = plot_multivariate(
df=df_sliced,
x_col=x_col,
y_col_style_dict=sliced_y_col_style_dict)
data = data + fig.data
fig = go.Figure(data=data, layout=layout)
return fig
[docs]def plot_univariate(
df,
x_col,
y_col,
xlabel=None,
ylabel=None,
title=None,
color="rgb(32, 149, 212)", # light blue
showlegend=True):
"""Simple plot of univariate timeseries.
Parameters
----------
df : `pandas.DataFrame`
Data frame with ``x_col`` and ``y_col``
x_col: `str`
x-axis column name, usually the time column
y_col: `str`
y-axis column name, the value the plot
xlabel : `str` or None, default None
x-axis label
ylabel : `str` or None, default None
y-axis label
title : `str` or None, default None
Plot title. If None, default is based on axis labels.
color : `str`, default "rgb(32, 149, 212)" (light blue)
Line color
showlegend : `bool`, default True
Whether to show the legend
Returns
-------
fig : `plotly.graph_objects.Figure`
Interactive plotly graph of the value against time.
See `~greykite.common.viz.timeseries_plotting.plot_forecast_vs_actual`
return value for how to plot the figure and add customization.
See Also
--------
`~greykite.common.viz.timeseries_plotting.plot_multivariate`
Provides more styling options. Also consider using plotly's `go.Scatter` and `go.Layout` directly.
"""
# sets default x and y-axis names based on column names
if xlabel is None:
xlabel = x_col
if ylabel is None:
ylabel = y_col
y_col_style_dict = {
y_col: dict(
name=y_col,
mode="lines",
line=dict(
color=color
),
opacity=0.8
)
}
return plot_multivariate(
df,
x_col,
y_col_style_dict,
xlabel=xlabel,
ylabel=ylabel,
title=title,
showlegend=showlegend,
)
[docs]def plot_forecast_vs_actual(
df,
time_col=cst.TIME_COL,
actual_col=cst.ACTUAL_COL,
predicted_col=cst.PREDICTED_COL,
predicted_lower_col=cst.PREDICTED_LOWER_COL,
predicted_upper_col=cst.PREDICTED_UPPER_COL,
xlabel=cst.TIME_COL,
ylabel=cst.VALUE_COL,
train_end_date=None,
title=None,
showlegend=True,
actual_mode="lines+markers",
actual_points_color="rgba(250, 43, 20, 0.7)", # red
actual_points_size=2.0,
actual_color_opacity=1.0,
forecast_curve_color="rgba(0, 90, 181, 0.7)", # blue
forecast_curve_dash="solid",
ci_band_color="rgba(0, 90, 181, 0.15)", # light blue
ci_boundary_curve_color="rgba(0, 90, 181, 0.5)", # light blue
ci_boundary_curve_width=0.0, # no line
vertical_line_color="rgba(100, 100, 100, 0.9)", # black color with opacity of 0.9
vertical_line_width=1.0):
"""Plots forecast with prediction intervals, against actuals
Adapted from plotly user guide:
https://plot.ly/python/v3/continuous-error-bars/#basic-continuous-error-bars
Parameters
----------
df : `pandas.DataFrame`
Timestamp, predicted, and actual values
time_col : `str`, default `~greykite.common.constants.TIME_COL`
Column in df with timestamp (x-axis)
actual_col : `str`, default `~greykite.common.constants.ACTUAL_COL`
Column in df with actual values
predicted_col : `str`, default `~greykite.common.constants.PREDICTED_COL`
Column in df with predicted values
predicted_lower_col : `str` or None, default `~greykite.common.constants.PREDICTED_LOWER_COL`
Column in df with predicted lower bound
predicted_upper_col : `str` or None, default `~greykite.common.constants.PREDICTED_UPPER_COL`
Column in df with predicted upper bound
xlabel : `str`, default `~greykite.common.constants.TIME_COL`
x-axis label.
ylabel : `str`, default `~greykite.common.constants.VALUE_COL`
y-axis label.
train_end_date : `datetime.datetime` or None, default None
Train end date.
Must be a value in ``df[time_col]``.
title : `str` or None, default None
Plot title.
showlegend : `bool`, default True
Whether to show a plot legend.
actual_mode : `str`, default "lines+markers"
How to show the actuals.
Options: ``markers``, ``lines``, ``lines+markers``
actual_points_color : `str`, default "rgba(99, 114, 218, 1.0)"
Color of actual line/marker.
actual_points_size : `float`, default 2.0
Size of actual markers.
Only used if "markers" is in ``actual_mode``.
actual_color_opacity : `float` or None, default 1.0
Opacity of actual values points.
forecast_curve_color : `str`, default "rgba(0, 145, 202, 1.0)"
Color of forecasted values.
forecast_curve_dash : `str`, default "solid"
'dash' property of forecast ``scatter.line``.
One of: ``['solid', 'dot', 'dash', 'longdash', 'dashdot', 'longdashdot']``
or a string containing a dash length list in pixels or percentages
(e.g. ``'5px 10px 2px 2px'``, ``'5, 10, 2, 2'``, ``'10% 20% 40%'``)
ci_band_color : `str`, default "rgba(0, 145, 202, 0.15)"
Fill color of the prediction bands.
ci_boundary_curve_color : `str`, default "rgba(0, 145, 202, 0.15)"
Color of the prediction upper/lower lines.
ci_boundary_curve_width : `float`, default 0.0
Width of the prediction upper/lower lines.
default 0.0 (hidden)
vertical_line_color : `str`, default "rgba(100, 100, 100, 0.9)"
Color of the vertical line indicating train end date.
Default is black with opacity of 0.9.
vertical_line_width : `float`, default 1.0
width of the vertical line indicating train end date
Returns
-------
fig : `plotly.graph_objects.Figure`
Plotly figure of forecast against actuals, with prediction
intervals if available.
Can show, convert to HTML, update::
# show figure
fig.show()
# get HTML string, write to file
fig.to_html(include_plotlyjs=False, full_html=True)
fig.write_html("figure.html", include_plotlyjs=False, full_html=True)
# customize layout (https://plot.ly/python/v3/user-guide/)
update_layout = dict(
yaxis=dict(title="new ylabel"),
title_text="new title",
title_x=0.5,
title_font_size=30)
fig.update_layout(update_layout)
"""
if title is None:
title = "Forecast vs Actual"
if train_end_date is not None and not all(pd.Series(train_end_date).isin(df[time_col])):
raise Exception(
f"train_end_date {train_end_date} is not found in df['{time_col}']")
fill_dict = {
"mode": "lines",
"fillcolor": ci_band_color,
"fill": "tonexty"
}
data = []
if predicted_lower_col is not None:
lower_bound = go.Scatter(
name="Lower Bound",
x=df[time_col],
y=df[predicted_lower_col],
mode="lines",
line=dict(
width=ci_boundary_curve_width,
color=ci_boundary_curve_color),
legendgroup="interval" # show/hide with the upper bound
)
data.append(lower_bound)
# plotly fills between current and previous element in `data`.
# Only fill if lower bound exists.
forecast_fill_dict = fill_dict if predicted_lower_col is not None else {}
if predicted_upper_col is not None:
upper_bound = go.Scatter(
name="Upper Bound",
x=df[time_col],
y=df[predicted_upper_col],
line=dict(
width=ci_boundary_curve_width,
color=ci_boundary_curve_color),
legendgroup="interval", # show/hide with the lower bound
**forecast_fill_dict)
data.append(upper_bound)
# If `predicted_lower_col` and `predicted_upper_col`, then the full range
# has been filled in. If only one of them, then fill in between that line
# and forecast.
actual_params = {}
if "lines" in actual_mode:
actual_params.update(line=dict(color=actual_points_color))
if "markers" in actual_mode:
actual_params.update(marker=dict(color=actual_points_color, size=actual_points_size))
actual = go.Scatter(
name="Actual",
x=df[time_col],
y=df[actual_col],
mode=actual_mode,
opacity=actual_color_opacity,
**actual_params
)
data.append(actual)
forecast_fill_dict = fill_dict if (predicted_lower_col is None) != (predicted_upper_col is None) else {}
forecast = go.Scatter(
name="Forecast",
x=df[time_col],
y=df[predicted_col],
line=dict(
color=forecast_curve_color,
dash=forecast_curve_dash),
**forecast_fill_dict)
data.append(forecast)
layout = go.Layout(
xaxis=dict(title=xlabel),
yaxis=dict(title=ylabel),
title=title,
title_x=0.5,
showlegend=showlegend,
# legend order from top to bottom: Actual, Forecast, Upper Bound, Lower Bound
legend={'traceorder': 'reversed'}
)
fig = go.Figure(data=data, layout=layout)
fig.update()
# adds a vertical line to separate training and testing phases
if train_end_date is not None:
new_layout = dict(
# add vertical line
shapes=[dict(
type="line",
xref="x",
yref="paper", # y-reference is assigned to the plot paper [0,1]
x0=train_end_date,
y0=0,
x1=train_end_date,
y1=1,
line=dict(
color=vertical_line_color,
width=vertical_line_width)
)],
# add text annotation
annotations=[dict(
xref="x",
x=train_end_date,
yref="paper",
y=.97,
text="Train End Date",
showarrow=True,
arrowhead=0,
ax=-60,
ay=0
)]
)
fig.update_layout(new_layout)
return fig
def split_range_into_groups(
n,
group_size,
which_group_complete="last"):
"""Partitions `n` elements into adjacent groups,
each with `group_size` elements
Group number starts from 0 and increments upward
Can be used to generate groups for sliding window aggregation.
:param n: int
number of elemnts to split into groups
:param group_size: int
number of elements per group
:param which_group_complete: str
If n % group_size > 0, one group will have fewer than `group_size` elements
if "first", the first group is full if possible, and last group may be incomplete
if "last", (default) the last group is full if possible,
and first group may be incomplete
:return: np.array of length n
values correspond to the element's group number
Examples:
>>> split_range_into_groups(10, 1, "last")
array([0., 1., 2., 3., 4., 5., 6., 7., 8., 9.])
>>> split_range_into_groups(10, 2, "last")
array([0., 0., 1., 1., 2., 2., 3., 3., 4., 4.])
>>> split_range_into_groups(10, 3, "last")
array([0., 1., 1., 1., 2., 2., 2., 3., 3., 3.])
>>> split_range_into_groups(10, 4, "last")
array([0., 0., 1., 1., 1., 1., 2., 2., 2., 2.])
>>> split_range_into_groups(10, 4, "first")
array([0., 0., 0., 0., 1., 1., 1., 1., 2., 2.])
>>> split_range_into_groups(10, 5, "last")
array([0., 0., 0., 0., 0., 1., 1., 1., 1., 1.])
>>> split_range_into_groups(10, 6, "last")
array([0., 0., 0., 0., 1., 1., 1., 1., 1., 1.])
>>> split_range_into_groups(10, 10, "last")
array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
>>> split_range_into_groups(10, 12, "last")
array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
"""
if which_group_complete.lower() == "first":
offset = 0
else:
offset = group_size - n % group_size
offset = offset % group_size # sets offset to 0 if n % group_size == 0
return np.floor(np.arange(offset, n + offset) / group_size)
def add_groupby_column(
df,
time_col,
groupby_time_feature=None,
groupby_sliding_window_size=None,
groupby_custom_column=None):
"""Extracts a column to group by from ``df``.
Exactly one of ``groupby_time_feature``, ``groupby_sliding_window_size``,
`groupby_custom_column` must be provided.
Parameters
----------
df : 'pandas.DataFrame`
Contains the univariate time series / forecast
time_col : `str`
The name of the time column of the univariate time series / forecast
groupby_time_feature : `str` or None, optional
If provided, groups by a column generated by
`~greykite.common.features.timeseries_features.build_time_features_df`.
See that function for valid values.
groupby_sliding_window_size : `int` or None, optional
If provided, sequentially partitions data into groups of size
``groupby_sliding_window_size``.
groupby_custom_column : `pandas.Series` or None, optional
If provided, groups by this column value.
Should be same length as the ``df``.
Returns
-------
result : `dict`
Dictionary with two items:
* ``"df"`` : `pandas.DataFrame`
``df`` with a grouping column added.
The column can be used to group rows together.
* ``"groupby_col"`` : `str`
The name of the groupby column added to ``df``.
The column name depends on the grouping method:
- ``groupby_time_feature`` for ``groupby_time_feature``
- ``{cst.TIME_COL}_downsample`` for ``groupby_sliding_window_size``
- ``groupby_custom_column.name`` for ``groupby_custom_column``.
"""
# Resets index to support indexing in groupby_sliding_window_size
df = df.copy()
dt = pd.Series(df[time_col].values)
# Determines the groups
is_groupby_time_feature = 1 if groupby_time_feature is not None else 0
is_groupby_sliding_window_size = 1 if groupby_sliding_window_size is not None else 0
is_groupby_custom_column = 1 if groupby_custom_column is not None else 0
if is_groupby_time_feature + is_groupby_sliding_window_size + is_groupby_custom_column != 1:
raise ValueError(
"Exactly one of (groupby_time_feature, groupby_rolling_window_size, groupby_custom_column)"
"must be specified")
groups = None
if is_groupby_time_feature == 1:
# Group by a value derived from the time column
time_features = build_time_features_df(dt, conti_year_origin=min(dt).year)
groups = time_features[groupby_time_feature]
groups.name = groupby_time_feature
elif is_groupby_sliding_window_size == 1:
# Group by sliding window for evaluation over time
index_dates = split_range_into_groups(
n=df.shape[0],
group_size=groupby_sliding_window_size,
which_group_complete="last") # ensures the last group is complete (first group may be partial)
groups = dt[index_dates * groupby_sliding_window_size] # uses first date in each group as grouping value
groups.name = f"{time_col}_downsample"
elif is_groupby_custom_column == 1:
# Group by custom column
groups = groupby_custom_column
groups_col_name = groups.name if groups.name is not None else "groups"
df[groups_col_name] = groups.values
if df.index.name in df.columns:
# Removes ambiguity in case the index name is the same as the newly added column,
# (or an existing column).
df.index.name = None
return {
"df": df,
"groupby_col": groups_col_name
}
def grouping_evaluation(
df,
groupby_col,
grouping_func,
grouping_func_name):
"""Groups ``df`` and evaluates a function on each group.
The function takes a `pandas.DataFrame` and returns a scalar.
Parameters
----------
df : `pandas.DataFrame`
Input data. For example, univariate time series, or forecast result.
Contains ``groupby_col`` and columns to apply ``grouping_func`` on.
groupby_col : `str`
Column name in ``df`` to group by.
grouping_func : `callable`
Function that is applied to each group via `pandas.groupBy.apply`.
Signature (grp: `pandas.DataFrame`) -> aggregated value: `float`.
grouping_func_name : `str`
What to call the output column generated by ``grouping_func``.
Returns
-------
grouped_df : `pandas.DataFrame`
Dataframe with ``grouping_func`` evaluated on each level of ``df[groupby_col]``.
Contains two columns:
- ``groupby_col``: The groupby value
- ``grouping_func_name``: The output of ``grouping_func`` on the group
"""
grouped_df = (df
.groupby(groupby_col)
.apply(grouping_func)
.reset_index()
.rename({0: grouping_func_name}, axis=1))
return grouped_df
def flexible_grouping_evaluation(
df,
map_func_dict=None,
groupby_col=None,
agg_kwargs=None,
extend_col_names=True,
unpack_list=True,
list_names_dict=None):
"""Flexible aggregation. Generates additional columns for evaluation via
``map_func_dict``, groups by ``groupby_col``, then aggregates according
to ``agg_kwargs``.
This function calls `pandas.DataFrame.apply` and
`pandas.core.groupby.DataFrameGroupBy.agg` internally.
Parameters
----------
df : `pandas.DataFrame`
DataFrame to transform / aggregate
map_func_dict : `dict` [`str`, `callable`] or None, default None
Row-wise transformation functions to create new columns.
If None, no new columns are added.
key: new column name
value: row-wise function to apply to ``df`` to generate the column value.
Signature (row: `pandas.DataFrame`) -> transformed value: `float`.
For example::
map_func_dict = {
"residual": lambda row: row["predicted"] - row["actual"],
"squared_error": lambda row: (row["predicted"] - row["actual"])**2
}
groupby_col : `str` or None, default None
Which column to group by.
Can be in ``df`` or generated by ``map_func_dict``.
If None, no grouping or aggregation is done.
agg_kwargs : `dict` or None, default None
Passed as keyword args to `pandas.core.groupby.DataFrameGroupBy.aggregate` after creating
new columns and grouping by ``groupby_col``. Must be provided if ``groupby_col is not None``.
To fully customize output column names, pass a dictionary as shown below.
For example::
# Example 1, named aggregation to explicitly name output columns.
# Assume ``df`` contains ``abs_percent_err``, ``abs_err`` columns.
# Output columns are "MedAPE", "MAPE", "MAE", etc. in a single level index.
from functools import partial
agg_kwargs = {
# output column name: (column to aggregate, aggregation function)
"MedAPE": pd.NamedAgg(column="abs_percent_err", aggfunc=np.nanmedian),
"MAPE": pd.NamedAgg(column="abs_percent_err", aggfunc=np.nanmean),
"MAE": pd.NamedAgg(column="abs_err", aggfunc=np.nanmean),
"q95_abs_err": pd.NamedAgg(column="abs_err", aggfunc=partial(np.nanquantile, q=0.95)),
"q05_abs_err": pd.NamedAgg(column="abs_err", aggfunc=partial(np.nanquantile, q=0.05)),
}
# Example 2, multi-level aggregation using `func` parameter
# to `pandas.core.groupby.DataFrameGroupBy.aggregate`.
# Assume ``df`` contains ``y1``, ``y2`` columns.
agg_kwargs = {
"func": {
"y1": [np.nanmedian, np.nanmean],
"y2": [np.nanmedian, np.nanmax],
}
}
# `extend_col_names` controls the output column names
extend_col_names = True # output columns are "y1_nanmean", "y1_nanmedian", "y2_nanmean", "y2_nanmax"
extend_col_names = False # output columns are "nanmean", "nanmedian", "nanmean", "nanmax"
extend_col_names : `bool` or None, default True
How to flatten index after aggregation.
In some cases, the column index after aggregation is a multi-index.
This parameter controls how to flatten an index with 2 levels to 1 level.
- If None, the index is not flattened.
- If True, column name is a composite: ``{index0}_{index1}``
Use this option if index1 is not unique.
- If False, column name is simply ``{index1}``
Ignored if the ColumnIndex after aggregation has only one level (e.g.
if named aggregation is used in ``agg_kwargs``).
unpack_list : `bool`, default True
Whether to unpack (flatten) columns that contain list/tuple after aggregation,
to create one column per element of the list/tuple.
If True, ``list_names_dict`` can be used to rename the unpacked columns.
list_names_dict : `dict` [`str`, `list` [`str`]] or None, default None
If ``unpack_list`` is True, this dictionary can optionally be
used to rename the unpacked columns.
- Key = column name after aggregation, before upacking.
E.g. ``{index0}_{index1}`` or ``{index1}`` depending on ``extend_col_names``.
- Value = list of names to use for the unpacked columns. Length must match
the length of the lists contained in the column.
If a particular list/tuple column is not found in this dictionary, appends
0, 1, 2, ..., n-1 to the original column name, where n = list length.
For example, if the column contains a tuple of length 4 corresponding to
quantiles 0.1, 0.25, 0.75, 0.9, then the following would be appropriate::
aggfunc = lambda grp: partial(np.nanquantile, q=[0.1, 0.25, 0.75, 0.9])(grp).tolist()
agg_kwargs = {
"value_Q": pd.NamedAgg(column="value", aggfunc=aggfunc)
}
list_names_dict = {
# the key is the name of the unpacked column
"value_Q" : ["Q0.10", "Q0.25", "Q0.75", "Q0.90"]
}
# Output columns are "Q0.10", "Q0.25", "Q0.75", "Q0.90"
# In this example, if list_names_dict=None, the default output column names
# would be: "value_Q0", "value_Q1", "value_Q2", "value_Q3"
Returns
-------
df_transformed : `pandas.DataFrame`
df after transformation and optional aggregation.
If ``groupby_col`` is None, returns ``df`` with additional columns as the keys in ``map_func_dict``.
Otherwise, ``df`` is grouped by ``groupby_col`` and this becomes the index. Columns
are determined by ``agg_kwargs`` and ``extend_col_names``.
"""
if groupby_col and not agg_kwargs:
raise ValueError("Must specify `agg_kwargs` if grouping is requested via `groupby_col`.")
if agg_kwargs and not groupby_col:
log_message(f"`agg_kwargs` is ignored because `groupby_col` is None. "
f"Specify `groupby_col` to allow aggregation.", LoggingLevelEnum.WARNING)
df = df.copy()
if map_func_dict is not None:
for col_name, func in map_func_dict.items():
df[col_name] = df.apply(func, axis=1)
if groupby_col is not None:
groups = df.groupby(groupby_col)
with warnings.catch_warnings():
# Ignores pandas FutureWarning. Use NamedAgg in pandas 0.25.+
warnings.filterwarnings(
"ignore",
message="using a dict with renaming is deprecated",
category=FutureWarning)
df_transformed = groups.agg(**agg_kwargs)
if extend_col_names is not None and df_transformed.columns.nlevels > 1:
# Flattens multi-level column index
if extend_col_names:
# By concatenating names
df_transformed.columns = ["_".join(col).strip("_") for col in df_transformed.columns]
else:
# By using level 1 names
df_transformed.columns = list(df_transformed.columns.get_level_values(1))
if np.any(df_transformed.columns.duplicated()):
warnings.warn("Column names are not unique. Use `extend_col_names=True` "
"to uniquely identify every column.")
else:
# No grouping is requested
df_transformed = df
if unpack_list and df_transformed.shape[0] > 0:
# Identifies the columns that contain list elements
which_list_cols = df_transformed.iloc[0].apply(lambda x: isinstance(x, (list, tuple)))
list_cols = list(which_list_cols[which_list_cols].index)
for col in list_cols:
if isinstance(df_transformed[col], pd.DataFrame):
warnings.warn(f"Skipping list unpacking for `{col}`. There are multiple columns "
f"with this name. Make sure column names are unique to enable unpacking.")
continue
# Unpacks the column, creating one column for each list entry
list_df = pd.DataFrame(df_transformed[col].to_list())
n_cols = list_df.shape[1]
# Adds column names
if list_names_dict is not None and col in list_names_dict:
found_length = len(list_names_dict[col])
if found_length != n_cols:
raise ValueError(
f"list_names_dict['{col}'] has length {found_length}, "
f"but there are {n_cols} columns to name. Example row(s):\n"
f"{list_df.head(2)}")
list_df.columns = [f"{list_names_dict.get(col)[i]}" for i in range(n_cols)]
else:
list_df.columns = [f"{col}{i}" for i in range(n_cols)]
# replaces original column with new ones
list_df.index = df_transformed.index
del df_transformed[col]
df_transformed = pd.concat([df_transformed, list_df], axis=1)
if list_names_dict:
unused_names = sorted(list(set(list_names_dict.keys()) - set(list_cols)))
if len(unused_names) > 0:
warnings.warn("These names from `list_names_dict` are not used, because the "
"column (key) is not found in the dataframe after aggregation:\n"
f"{unused_names}.\nAvailable columns are:\n"
f"{list_cols}.")
return df_transformed