Doing multiple things at once and time deltas
In the first tutorial, we applied 1 aggregation function with 1 lookperiod to 1 value at a time. In reality, you’d likely want to create features for the same value at multiple lookbehind intervals and with multiple aggregation functions. Additionally, some common features, such as the current age, require slightly different handling than what we’ve covered so far.
Luckily, timeseriesflattener makes all of this simple.
This tutorial covers:
Multiple aggregations, lookbehind distances, and features at once.
Creating features based on a time delta.
Multiple aggregation functions and lookperiods
To apply multiple aggregation functions across all combinations of lookperiods, you simply add more aggregators and lookbehind distances to the respective lists. Let's see an example. We start by loading the prediction times and a dataframe containing the values we wish to aggregate.
Note: The interface for creating multiple features at once works in exactly the same way for StaticSpec as for PredictorSpec. We use PredictorSpec for illustration.
# Load the prediction times
from __future__ import annotations
from timeseriesflattener.testing.load_synth_data import load_synth_prediction_times
df_prediction_times = load_synth_prediction_times()
# load values for a temporal predictor
from timeseriesflattener.testing.load_synth_data import load_synth_predictor_float
df_synth_predictors = load_synth_predictor_float()
df_synth_predictors.head()
entity_id | timestamp | value |
---|---|---|
i64 | datetime[μs] | f64 |
9476 | 1969-03-05 08:08:00 | 0.816995 |
4631 | 1967-04-10 22:48:00 | 4.818074 |
3890 | 1969-12-15 14:07:00 | 2.503789 |
1098 | 1965-11-19 03:53:00 | 3.515041 |
1626 | 1966-05-03 14:07:00 | 4.353115 |
Note that df_synth_predictors is not sorted by entity_id, and that there are multiple values per id.
Time to make a spec.
import datetime as dt
import numpy as np
from timeseriesflattener import PredictorSpec, ValueFrame
from timeseriesflattener.aggregators import LatestAggregator, MeanAggregator
# Helper function that builds a (start, end) tuple of timedeltas from a number of days
def make_timedelta_interval(start_days: int, end_days: int) -> tuple[dt.timedelta, dt.timedelta]:
    return (dt.timedelta(days=start_days), dt.timedelta(days=end_days))
predictor_spec = PredictorSpec(
    value_frame=ValueFrame(
        init_df=df_synth_predictors,
        entity_id_col_name="entity_id",
        value_timestamp_col_name="timestamp",
    ),
    # 2 aggregators x 3 lookbehind intervals = 6 features
    aggregators=[MeanAggregator(), LatestAggregator(timestamp_col_name="timestamp")],
    lookbehind_distances=[
        make_timedelta_interval(0, 30),
        make_timedelta_interval(30, 365),
        make_timedelta_interval(365, 730),
    ],
    fallback=np.nan,
)
Let's break it down. We supply two aggregators, MeanAggregator and LatestAggregator, which are applied across all lookbehind distances. We specified three lookbehind intervals: 0-30 days, 30-365 days, and 365-730 days. We therefore expect n_aggregators * n_lookbehind_distances = 2 * 3 = 6 features from this single column. Let's flatten the data and see!
from timeseriesflattener import Flattener, PredictionTimeFrame
flattener = Flattener(
    predictiontime_frame=PredictionTimeFrame(
        init_df=df_prediction_times, entity_id_col_name="entity_id", timestamp_col_name="timestamp"
    )
)
df = flattener.aggregate_timeseries(specs=[predictor_spec]).df
Processing spec: ['value']
df.head()
entity_id | timestamp | prediction_time_uuid | pred_value_within_0_to_30_days_mean_fallback_nan | pred_value_within_0_to_30_days_latest_fallback_nan | pred_value_within_30_to_365_days_mean_fallback_nan | pred_value_within_30_to_365_days_latest_fallback_nan | pred_value_within_365_to_730_days_mean_fallback_nan | pred_value_within_365_to_730_days_latest_fallback_nan |
---|---|---|---|---|---|---|---|---|
i64 | datetime[μs] | str | f64 | f64 | f64 | f64 | f64 | f64 |
9852 | 1965-01-02 09:35:00 | "9852-1965-01-02 09:35:00.00000… | NaN | NaN | NaN | NaN | NaN | NaN |
1467 | 1965-01-02 10:05:00 | "1467-1965-01-02 10:05:00.00000… | NaN | NaN | NaN | NaN | NaN | NaN |
1125 | 1965-01-02 12:55:00 | "1125-1965-01-02 12:55:00.00000… | NaN | NaN | NaN | NaN | NaN | NaN |
649 | 1965-01-02 14:01:00 | "649-1965-01-02 14:01:00.000000" | NaN | NaN | NaN | NaN | NaN | NaN |
2070 | 1965-01-03 08:01:00 | "2070-1965-01-03 08:01:00.00000… | NaN | NaN | NaN | NaN | NaN | NaN |
And that's what we get: six feature columns, one for each combination of aggregator and lookbehind distance.
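To see what these six columns hold, here is a minimal pure-Python sketch of the computation for a single prediction time. It is an illustration, not timeseriesflattener's implementation: the toy measurements, the helper names (mean_aggregator, latest_aggregator, flatten_one) and the window rule start <= distance < end are assumptions; only the column-name pattern is copied from the output above.

```python
import datetime as dt
import math
from statistics import mean

# Hypothetical toy data: (timestamp, value) measurements for ONE entity
measurements = [
    (dt.datetime(2020, 6, 1), 3.0),
    (dt.datetime(2020, 12, 10), 4.0),
    (dt.datetime(2020, 12, 20), 5.0),
]
prediction_time = dt.datetime(2021, 1, 1)

# Same three lookbehind intervals as in the spec above, in days
lookbehind_days = [(0, 30), (30, 365), (365, 730)]


def mean_aggregator(points):
    # Mean of all values in the window
    return mean(value for _, value in points)


def latest_aggregator(points):
    # Value with the most recent timestamp in the window
    return max(points, key=lambda point: point[0])[1]


aggregators = {"mean": mean_aggregator, "latest": latest_aggregator}


def flatten_one(prediction_time, measurements, col="value", fallback=math.nan):
    features = {}
    for start, end in lookbehind_days:
        # Assumption: a value falls in the window if start <= distance < end
        in_window = [
            (ts, value)
            for ts, value in measurements
            if dt.timedelta(days=start) <= prediction_time - ts < dt.timedelta(days=end)
        ]
        for agg_name, aggregate in aggregators.items():
            # Column-name pattern copied from the output table above
            name = f"pred_{col}_within_{start}_to_{end}_days_{agg_name}_fallback_{fallback}"
            # Empty windows receive the fallback value
            features[name] = aggregate(in_window) if in_window else fallback
    return features


features = flatten_one(prediction_time, measurements)
for name, value in features.items():
    print(name, value)
```

Every aggregator is applied to every interval's slice of the entity's history, which is why the column count is the product of the two list lengths. In this toy example the 365-730 day window is empty, so both of its columns get the fallback.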
Multiple values from the same dataframe
Sometimes, you might have values measured at the same time which you want to aggregate in the same manner. In timeseriesflattener, this is handled by simply having multiple columns in the dataframe you pass to ValueFrame. Let's see an example.
import polars as pl
# add a new column to df_synth_predictors to simulate a new predictor measured at the same time
df_synth_predictors = df_synth_predictors.with_columns(
    new_predictor=pl.Series(np.random.rand(df_synth_predictors.shape[0]))
)
df_synth_predictors.head()
entity_id | timestamp | value | new_predictor |
---|---|---|---|
i64 | datetime[μs] | f64 | f64 |
9476 | 1969-03-05 08:08:00 | 0.816995 | 0.432937 |
4631 | 1967-04-10 22:48:00 | 4.818074 | 0.592283 |
3890 | 1969-12-15 14:07:00 | 2.503789 | 0.515706 |
1098 | 1965-11-19 03:53:00 | 3.515041 | 0.94408 |
1626 | 1966-05-03 14:07:00 | 4.353115 | 0.658508 |
We make a PredictorSpec similar to the one above, this time with some new aggregators.
from timeseriesflattener.aggregators import MinAggregator, SlopeAggregator
# create a new predictor spec
predictor_spec = PredictorSpec(
    value_frame=ValueFrame(
        init_df=df_synth_predictors,
        entity_id_col_name="entity_id",
        value_timestamp_col_name="timestamp",
    ),
    aggregators=[MinAggregator(), SlopeAggregator(timestamp_col_name="timestamp")],
    lookbehind_distances=[
        make_timedelta_interval(0, 30),
        make_timedelta_interval(30, 365),
        make_timedelta_interval(365, 730),
    ],
    fallback=np.nan,
)
Now, all aggregators will be applied to each predictor column for each lookbehind distance. Therefore, we expect n_predictors * n_aggregators * n_lookbehind_distances = 2 * 2 * 3 = 12 features from this spec.
df = flattener.aggregate_timeseries(specs=[predictor_spec]).df
df.head()
Processing spec: ['value', 'new_predictor']
Running this cell in the rendered version of this tutorial failed with:
TypeError: contains() takes 1 positional argument but 2 were given
The traceback ends in timeseriesflattener's _aggregate_masked_frame (processors/temporal.py), at cs.contains(*masked_frame.value_col_names).fill_null(fallback). With two value columns, both names are passed as separate positional arguments to the polars selector cs.contains, which in the installed polars version accepts only one. This is likely a version mismatch between polars and timeseriesflattener rather than a problem with the spec itself; creating one PredictorSpec per value column should avoid the multi-argument call.
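The count of 12 comes from taking every combination of value column, lookbehind distance and aggregator. A minimal sketch that enumerates the expected column names with itertools.product; the suffixes min and slope for MinAggregator and SlopeAggregator, and the column order, are assumptions extrapolated from the naming pattern in the first example's output.

```python
from itertools import product

# Value columns and lookbehind intervals (in days) from the spec above
value_cols = ["value", "new_predictor"]
lookbehind_days = [(0, 30), (30, 365), (365, 730)]
# Assumption: MinAggregator and SlopeAggregator appear as "min" and "slope" in column names
aggregator_names = ["min", "slope"]

# One column per (value column, interval, aggregator) combination
expected_columns = [
    f"pred_{col}_within_{start}_to_{end}_days_{agg}_fallback_nan"
    for col, (start, end), agg in product(value_cols, lookbehind_days, aggregator_names)
]

n_expected = len(value_cols) * len(aggregator_names) * len(lookbehind_days)
print(n_expected)
for name in expected_columns:
    print(name)
```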
TimeDelta features
Age is a commonly used feature that requires slightly different handling than what we've seen so far. Calculating age at the prediction time means computing the time delta between the prediction time and each entity's date of birth. For this, we use the TimeDeltaSpec. First, we load a dataframe containing the date of birth for each entity in our dataset.
from timeseriesflattener.testing.load_synth_data import load_synth_birthdays
df_birthdays = load_synth_birthdays()
df_birthdays.head()
entity_id | birthday |
---|---|
i64 | datetime[μs] |
9045 | 1932-10-24 03:16:00 |
5532 | 1920-12-09 09:41:00 |
2242 | 1917-03-20 17:00:00 |
789 | 1930-02-15 06:51:00 |
9715 | 1926-08-18 08:35:00 |
df_birthdays contains a single value for each entity_id: their date of birth. Time to make a spec.
from timeseriesflattener import TimeDeltaSpec, TimestampValueFrame
age_spec = TimeDeltaSpec(
    init_frame=TimestampValueFrame(
        init_df=df_birthdays, entity_id_col_name="entity_id", value_timestamp_col_name="birthday"
    ),
    fallback=np.nan,
    output_name="age",
    time_format="years",  # one of "seconds", "minutes", "hours", "days", "years"
)
To make the TimeDeltaSpec, we define a TimestampValueFrame where we specify the column containing the entity id and the column containing the timestamps. fallback sets the value for entities without an entry in the TimestampValueFrame, output_name sets the base name of the output column (here, age becomes pred_age_years_fallback_nan), and time_format specifies the unit of the output. Time to make features!
df = flattener.aggregate_timeseries(specs=[age_spec]).df
df.head()
Processing spec: ['age']
entity_id | timestamp | prediction_time_uuid | pred_age_years_fallback_nan |
---|---|---|---|
i64 | datetime[μs] | str | f64 |
9903 | 1968-05-09 21:24:00 | "9903-1968-05-0… | 39.154004 |
7465 | 1966-05-24 01:23:00 | "7465-1966-05-2… | 47.874059 |
6447 | 1967-09-25 18:08:00 | "6447-1967-09-2… | 28.52293 |
2121 | 1966-05-05 20:52:00 | "2121-1966-05-0… | 56.347707 |
4927 | 1968-06-30 12:13:00 | "4927-1968-06-3… | 44.70089 |
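A minimal pure-Python sketch of what a TimeDeltaSpec computes for one entity and prediction time: look up the entity's timestamp, subtract it from the prediction time, convert the difference to the requested time_format, and return the fallback when the entity has no entry. The dictionary standing in for df_birthdays and the helper names are hypothetical, and the year conversion (whole elapsed days divided by 365.25) is an assumption chosen to be consistent with the entity 9903 rows below, not a documented rule.

```python
import datetime as dt
import math

# Hypothetical lookup table standing in for df_birthdays: entity_id -> birthday
birthdays = {
    1: dt.datetime(1930, 1, 1),
    2: dt.datetime(1925, 6, 15, 12, 0),
}

SECONDS_PER_UNIT = {"seconds": 1, "minutes": 60, "hours": 3600, "days": 86400}


def to_time_format(delta: dt.timedelta, time_format: str) -> float:
    if time_format == "years":
        # Assumption: whole elapsed days divided by 365.25
        return delta.days / 365.25
    return delta.total_seconds() / SECONDS_PER_UNIT[time_format]


def time_delta_feature(entity_id, prediction_time, time_format="years", fallback=math.nan):
    birthday = birthdays.get(entity_id)
    if birthday is None:
        # Entity has no row in the timestamp frame, so use the fallback
        return fallback
    return to_time_format(prediction_time - birthday, time_format)


prediction_time = dt.datetime(1968, 5, 9, 21, 24)
print(time_delta_feature(1, prediction_time))
print(time_delta_feature(2, prediction_time, "days"))
print(time_delta_feature(3, prediction_time))  # entity 3 has no birthday entry
```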
Let's look at the values for a single entity to check that the age changes with the timestamp of the prediction time.
import polars as pl
df.filter(pl.col("entity_id") == 9903)
entity_id | timestamp | prediction_time_uuid | pred_age_years_fallback_nan |
---|---|---|---|
i64 | datetime[μs] | str | f64 |
9903 | 1968-05-09 21:24:00 | "9903-1968-05-0… | 39.154004 |
9903 | 1965-11-14 00:33:00 | "9903-1965-11-1… | 36.670773 |
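As an arithmetic check on this table: the two prediction times for entity 9903 are 907 whole days apart, and 907 / 365.25 ≈ 2.483231, which matches the difference between the two ages. Both ages also correspond almost exactly to a whole number of days at 365.25 days per year. This suggests, without proving it, that ages in years are derived from whole elapsed days.

```python
import datetime as dt

# Values copied from the table above for entity_id 9903
later = dt.datetime(1968, 5, 9, 21, 24)
earlier = dt.datetime(1965, 11, 14, 0, 33)
age_later, age_earlier = 39.154004, 36.670773

# timedelta.days drops the leftover hours and minutes
whole_days_apart = (later - earlier).days
print(whole_days_apart, whole_days_apart / 365.25, age_later - age_earlier)

# Convert each age back to days at 365.25 days per year
print(age_later * 365.25, age_earlier * 365.25)
```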