Doing multiple things at once and time deltas

In the first tutorial, we applied a single aggregation function with a single lookperiod to a single value at a time. In practice, you’ll likely want to create features for the same value at multiple lookbehind intervals and with multiple aggregation functions. Additionally, some common features, such as the current age, require slightly different handling than what we’ve covered so far.

Luckily, timeseriesflattener makes all of this extremely simple.

This tutorial covers:

  • Multiple aggregations, lookdistances, and features at once.

  • Creating features based on a timedelta.

Multiple aggregation functions and lookperiods

To apply multiple aggregation functions across all combinations of lookperiods, you just have to supply more aggregators and lookperiods to the list! Let’s see an example. We start off by loading the prediction times and a dataframe containing the values we wish to aggregate.

Note: The interface for creating multiple features at once works in exactly the same way for StaticSpec as for PredictorSpec. We will use PredictorSpec for illustration.

# Load the prediction times
from __future__ import annotations

from timeseriesflattener.testing.load_synth_data import load_synth_prediction_times

df_prediction_times = load_synth_prediction_times()
# load values for a temporal predictor
from timeseriesflattener.testing.load_synth_data import load_synth_predictor_float

df_synth_predictors = load_synth_predictor_float()
df_synth_predictors.head()
shape: (5, 3)
entity_id  timestamp            value
i64        datetime[μs]         f64
9476       1969-03-05 08:08:00  0.816995
4631       1967-04-10 22:48:00  4.818074
3890       1969-12-15 14:07:00  2.503789
1098       1965-11-19 03:53:00  3.515041
1626       1966-05-03 14:07:00  4.353115

Note that df_synth_predictors is not sorted by entity_id, and that each id can have multiple values.
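You can verify both claims with a couple of polars calls. This is a quick sketch using the dataframe loaded above, not part of the library’s API:

import polars as pl

# entity_id is not monotonically increasing, i.e. the frame is unsorted
print(df_synth_predictors["entity_id"].is_sorted())  # False

# ...and ids repeat, i.e. there are multiple values per entity
print(df_synth_predictors["entity_id"].value_counts().head())

timeseriesflattener does not require the input to be sorted, so no preprocessing is needed here.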

Time to make a spec.

import datetime as dt

import numpy as np
from timeseriesflattener import PredictorSpec, ValueFrame
from timeseriesflattener.aggregators import LatestAggregator, MeanAggregator


# helper function to create tuples of timedelta interval
def make_timedelta_interval(start_days: int, end_days: int) -> tuple[dt.timedelta, dt.timedelta]:
    return (dt.timedelta(days=start_days), dt.timedelta(days=end_days))


predictor_spec = PredictorSpec(
    value_frame=ValueFrame(
        init_df=df_synth_predictors,
        entity_id_col_name="entity_id",
        value_timestamp_col_name="timestamp",
    ),
    aggregators=[MeanAggregator(), LatestAggregator(timestamp_col_name="timestamp")],
    lookbehind_distances=[
        make_timedelta_interval(0, 30),
        make_timedelta_interval(30, 365),
        make_timedelta_interval(365, 730),
    ],
    fallback=np.nan,
)

Let’s break it down. We supply two aggregators, MeanAggregator and LatestAggregator, which will be applied across all lookbehind distances, which we specified as 3 intervals: 0-30 days, 30-365 days, and 365-730 days. We therefore expect to make n_aggregators * n_lookbehind_distances = 2 * 3 = 6 features from this single column.
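As a quick sanity check, we can enumerate the feature columns we expect. The naming pattern below is inferred from the output shown later in this tutorial; it is an illustration, not an official API:

# hypothetical reconstruction of the output column names, assuming the pattern
# pred_{value}_within_{start}_to_{end}_days_{aggregator}_fallback_{fallback}
aggregator_names = ["mean", "latest"]
lookbehinds = [(0, 30), (30, 365), (365, 730)]

expected_columns = [
    f"pred_value_within_{start}_to_{end}_days_{agg}_fallback_nan"
    for start, end in lookbehinds
    for agg in aggregator_names
]
assert len(expected_columns) == 2 * 3  # 6 features

Let’s flatten the data and see!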

from timeseriesflattener import Flattener, PredictionTimeFrame

flattener = Flattener(
    predictiontime_frame=PredictionTimeFrame(
        init_df=df_prediction_times, entity_id_col_name="entity_id", timestamp_col_name="timestamp"
    )
)

df = flattener.aggregate_timeseries(specs=[predictor_spec]).df
Processing spec: ['value']

df.head()
shape: (5, 9)
entity_id  timestamp            prediction_time_uuid              pred_value_within_0_to_30_days_mean_fallback_nan  pred_value_within_0_to_30_days_latest_fallback_nan  pred_value_within_30_to_365_days_mean_fallback_nan  pred_value_within_30_to_365_days_latest_fallback_nan  pred_value_within_365_to_730_days_mean_fallback_nan  pred_value_within_365_to_730_days_latest_fallback_nan
i64        datetime[μs]         str                               f64  f64  f64  f64  f64  f64
9852       1965-01-02 09:35:00  "9852-1965-01-02 09:35:00.00000…  NaN  NaN  NaN  NaN  NaN  NaN
1467       1965-01-02 10:05:00  "1467-1965-01-02 10:05:00.00000…  NaN  NaN  NaN  NaN  NaN  NaN
1125       1965-01-02 12:55:00  "1125-1965-01-02 12:55:00.00000…  NaN  NaN  NaN  NaN  NaN  NaN
649        1965-01-02 14:01:00  "649-1965-01-02 14:01:00.000000"  NaN  NaN  NaN  NaN  NaN  NaN
2070       1965-01-03 08:01:00  "2070-1965-01-03 08:01:00.00000…  NaN  NaN  NaN  NaN  NaN  NaN

And that’s what we get: six feature columns, as expected. Note that all of them are NaN in the first rows: these prediction times fall at the very start of the synthetic dataset, so no predictor values exist within their lookbehind windows, and the fallback is used.
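To see non-fallback values, we can filter for rows where the aggregation succeeded. A small sketch, using a column name taken from the output above:

import polars as pl

# keep only prediction times whose 0-30 day lookbehind window actually
# contained at least one value
df.filter(pl.col("pred_value_within_0_to_30_days_mean_fallback_nan").is_not_nan()).head()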

Multiple values from the same dataframe

Sometimes, you might have values measured at the same time which you want to aggregate in the same manner. In timeseriesflattener this is handled by simply having multiple columns in the dataframe you pass to ValueFrame. Let’s see an example.

import polars as pl

# add a new column to df_synth_predictors to simulate a new predictor measured at the same time
df_synth_predictors = df_synth_predictors.with_columns(
    new_predictor=pl.Series(np.random.rand(df_synth_predictors.shape[0]))
)
df_synth_predictors.head()
shape: (5, 4)
entity_id  timestamp            value     new_predictor
i64        datetime[μs]         f64       f64
9476       1969-03-05 08:08:00  0.816995  0.515646
4631       1967-04-10 22:48:00  4.818074  0.804902
3890       1969-12-15 14:07:00  2.503789  0.372697
1098       1965-11-19 03:53:00  3.515041  0.460792
1626       1966-05-03 14:07:00  4.353115  0.873847

We make a PredictorSpec similar to the one above. Let’s try some new aggregators.

from timeseriesflattener.aggregators import MinAggregator, SlopeAggregator

# create a new predictor spec
predictor_spec = PredictorSpec(
    value_frame=ValueFrame(
        init_df=df_synth_predictors,
        entity_id_col_name="entity_id",
        value_timestamp_col_name="timestamp",
    ),
    aggregators=[MinAggregator(), SlopeAggregator(timestamp_col_name="timestamp")],
    lookbehind_distances=[
        make_timedelta_interval(0, 30),
        make_timedelta_interval(30, 365),
        make_timedelta_interval(365, 730),
    ],
    fallback=np.nan,
)

Now, all aggregators will be applied to each predictor column for each lookbehind distance. Therefore, we expect to make n_predictors * n_aggregators * n_lookbehind_distances = 2 * 2 * 3 = 12 features with this spec.
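Under the same assumed naming pattern as before, the twelve expected columns can be sketched like this:

# sketch of the 12 expected feature columns; the naming pattern is inferred
# from earlier output, not an official API
for value_col in ["value", "new_predictor"]:
    for start, end in [(0, 30), (30, 365), (365, 730)]:
        for agg in ["min", "slope"]:
            print(f"pred_{value_col}_within_{start}_to_{end}_days_{agg}_fallback_nan")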

df = flattener.aggregate_timeseries(specs=[predictor_spec]).df

df.head()
Processing spec: ['value', 'new_predictor']


TimeDelta features

An example of a commonly used feature that requires slightly different handling than what we’ve seen so far is age. Calculating the age at each prediction time requires a time delta between the prediction time and the entity’s date of birth. To do this, we can use the TimeDeltaSpec. First, we load a dataframe containing the date of birth for each entity in our dataset.

from timeseriesflattener.testing.load_synth_data import load_synth_birthdays

df_birthdays = load_synth_birthdays()
df_birthdays.head()
shape: (5, 2)
entity_id  birthday
i64        datetime[μs]
9045       1932-10-24 03:16:00
5532       1920-12-09 09:41:00
2242       1917-03-20 17:00:00
789        1930-02-15 06:51:00
9715       1926-08-18 08:35:00

df_birthdays is a dataframe containing a single value for each entity_id, which is their date of birth. Time to make a spec.

from timeseriesflattener import TimeDeltaSpec, TimestampValueFrame

age_spec = TimeDeltaSpec(
    init_frame=TimestampValueFrame(
        init_df=df_birthdays, entity_id_col_name="entity_id", value_timestamp_col_name="birthday"
    ),
    fallback=np.nan,
    output_name="age",
    time_format="years",  # can be ["seconds", "minutes", "hours", "days", "years"]
)

To make the TimeDeltaSpec, we define a TimestampValueFrame, specifying the column containing the entity ids and the column containing the timestamps. fallback sets the value for entities without an entry in the TimestampValueFrame, output_name determines the name of the output column, and time_format specifies which format the output should take.
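Conceptually, the age feature is just the elapsed time from the date of birth to each prediction time, converted to the requested unit. A minimal sketch of the computation for a single entity, assuming years are approximated as whole days divided by 365.25 (the library’s exact conversion may differ):

import datetime as dt

prediction_time = dt.datetime(1968, 5, 9, 21, 24)  # an example prediction time
birthday = dt.datetime(1929, 3, 15)  # hypothetical date of birth

# elapsed whole days, converted to approximate years
age_years = (prediction_time - birthday).days / 365.25
print(round(age_years, 2))

Time to make features!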

df = flattener.aggregate_timeseries(specs=[age_spec]).df

df.head()
Processing spec: ['age']


shape: (5, 4)
entity_id  timestamp            prediction_time_uuid  pred_age_years_fallback_nan
i64        datetime[μs]         str                   f64
9903       1968-05-09 21:24:00  "9903-1968-05-0…      39.154004
7465       1966-05-24 01:23:00  "7465-1966-05-2…      47.874059
6447       1967-09-25 18:08:00  "6447-1967-09-2…      28.52293
2121       1966-05-05 20:52:00  "2121-1966-05-0…      56.347707
4927       1968-06-30 12:13:00  "4927-1968-06-3…      44.70089

Let’s see the values for a single entity to make sure the age changes with the timestamp of the prediction time.

import polars as pl

df.filter(pl.col("entity_id") == 9903)
shape: (2, 4)
entity_id  timestamp            prediction_time_uuid  pred_age_years_fallback_nan
i64        datetime[μs]         str                   f64
9903       1968-05-09 21:24:00  "9903-1968-05-0…      39.154004
9903       1965-11-14 00:33:00  "9903-1965-11-1…      36.670773
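As a final check, the gap between the two ages should match the gap between the two prediction times. A sketch, again assuming years are approximated as whole days divided by 365.25:

rows = df.filter(pl.col("entity_id") == 9903).sort("timestamp")

# difference between the two computed ages, in years
age_gap = rows["pred_age_years_fallback_nan"][1] - rows["pred_age_years_fallback_nan"][0]
# difference between the two prediction times, converted to approximate years
time_gap_years = (rows["timestamp"][1] - rows["timestamp"][0]).days / 365.25

print(age_gap, time_gap_years)

Both gaps come out to roughly 2.48 years, matching the values in the table above.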