# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You # may not use this file except in compliance with the License. A copy of # the License is located at # # http://aws.amazon.com/apache2.0/ # # or in the "license" file accompanying this file. This file is # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. from __future__ import annotations from collections import OrderedDict from dataclasses import dataclass from decimal import Decimal from enum import Enum from numbers import Number from typing import Iterator, List, Union @dataclass class TimeSeriesItem: time: Number value: Number class StitchBoundaryCondition(str, Enum): MEAN = "mean" LEFT = "left" RIGHT = "right" class TimeSeries: def __init__(self): self._series = OrderedDict() self._sorted = True self._largest_time = -1 def put( self, time: Number, value: Number, ) -> TimeSeries: """Puts a value to the time series at the given time. A value passed to an existing time will overwrite the current value. Args: time (Number): The time of the value. value (Number): The value to add to the time series. Returns: TimeSeries: returns self (to allow for chaining). """ if time in self._series: self._series[time] = TimeSeriesItem(time, value) elif time > self._largest_time: self._series[time] = TimeSeriesItem(time, value) self._largest_time = time else: self._series[time] = TimeSeriesItem(time, value) self._sorted = False return self def times(self) -> List[Number]: """Returns the times in the time series. Returns: List[Number]: The times in the time series. """ self._ensure_sorted() return [item.time for item in self._series.values()] def values(self) -> List[Number]: """Returns the values in the time series. Returns: List[Number]: The values in the time series. """ self._ensure_sorted() return [item.value for item in self._series.values()] def __iter__(self) -> Iterator: self._ensure_sorted() return self._series.values().__iter__() def __len__(self): return self._series.values().__len__() def _ensure_sorted(self) -> None: if not self._sorted: self._series = OrderedDict(sorted(self._series.items())) self._sorted = True @staticmethod def from_lists(times: List[float], values: List[float]) -> TimeSeries: """Create a time series from the list of time and value points Args: times (List[float]): list of time points values (List[float]): list of value points Returns: TimeSeries: time series constructed from lists """ if len(times) != len(values): raise ValueError( f"The lengths of the times({len(times)})\ and values({len(values)}) lists are not equal." ) ts = TimeSeries() for t, v in zip(times, values): ts.put(t, v) return ts @staticmethod def constant_like(times: Union[List[float], TimeSeries], constant: float = 0.0) -> TimeSeries: """Obtain a constant time series given the list of time points and the constant values Args: times (Union[List[float], TimeSeries]): list of time points constant (float): constant value Returns: TimeSeries: A constant time series """ ts = TimeSeries() for t in times: ts.put(t, constant) return ts def concatenate(self, other: TimeSeries) -> TimeSeries: """Concatenates two time series ino to a single time series. The time points in the final time series are obtained by concatenating two lists of time points from the first and the second time series. Similarly, the values in the final time series is a concatenated list of the values in the first and the second time series. Args: other (TimeSeries): The second time series to be concatenated Notes: Keeps the time points in both time series unchanged. Assumes that the time points in the first TimeSeries are at earler times then the time points in the second TimeSeries. Returns: TimeSeries: The concatenated time series. Example: :: time_series_1 = TimeSeries.from_lists(times=[0, 0.1], values=[1, 2]) time_series_2 = TimeSeries.from_lists(times=[0.2, 0.3], values=[4, 5]) concat_ts = time_series_1.concatenate(time_series_2) Result: concat_ts.times() = [0, 0.1, 0.2, 0.3] concat_ts.values() = [1, 2, 4, 5] """ not_empty_ts = len(other.times()) * len(self.times()) != 0 if not_empty_ts and min(other.times()) <= max(self.times()): raise ValueError( "The time points in the first TimeSeries must be strictly smaller \ then the time points in the second TimeSeries." ) new_time_series = TimeSeries() new_times = self.times() + other.times() new_values = self.values() + other.values() for t, v in zip(new_times, new_values): new_time_series.put(t, v) return new_time_series def stitch( self, other: TimeSeries, boundary: StitchBoundaryCondition = StitchBoundaryCondition.MEAN ) -> TimeSeries: """Stitch two time series to a single time series. The time points of the second time series are shifted such that the first time point of the second series coincides with the last time point of the first series. The boundary point value is handled according to StitchBoundaryCondition argument value. Args: other (TimeSeries): The second time series to be stitched with. boundary (StitchBoundaryCondition): {"mean", "left", "right"}. Boundary point handler. Possible options are - "mean" - take the average of the boundary value points of the first and the second time series. - "left" - use the last value from the left time series as the boundary point. - "right" - use the first value from the right time series as the boundary point. Returns: TimeSeries: The stitched time series. Example (StitchBoundaryCondition.MEAN): :: time_series_1 = TimeSeries.from_lists(times=[0, 0.1], values=[1, 2]) time_series_2 = TimeSeries.from_lists(times=[0.2, 0.4], values=[4, 5]) stitch_ts = time_series_1.stitch(time_series_2, boundary=StitchBoundaryCondition.MEAN) Result: stitch_ts.times() = [0, 0.1, 0.3] stitch_ts.values() = [1, 3, 5] Example (StitchBoundaryCondition.LEFT): :: stitch_ts = time_series_1.stitch(time_series_2, boundary=StitchBoundaryCondition.LEFT) Result: stitch_ts.times() = [0, 0.1, 0.3] stitch_ts.values() = [1, 2, 5] Example (StitchBoundaryCondition.RIGHT): :: stitch_ts = time_series_1.stitch(time_series_2, boundary=StitchBoundaryCondition.RIGHT) Result: stitch_ts.times() = [0, 0.1, 0.3] stitch_ts.values() = [1, 4, 5] """ if len(self.times()) == 0: return TimeSeries.from_lists(times=other.times(), values=other.values()) if len(other.times()) == 0: return TimeSeries.from_lists(times=self.times(), values=self.values()) new_time_series = TimeSeries() left_t, right_t = self.times()[-1], other.times()[0] other_times = [t - right_t + left_t for t in other.times()] new_times = self.times() + other_times[1:] left, right = self.values()[-1], other.values()[0] if boundary == StitchBoundaryCondition.MEAN: bndry_val = 0.5 * sum([left, right]) elif boundary == StitchBoundaryCondition.LEFT: bndry_val = left elif boundary == StitchBoundaryCondition.RIGHT: bndry_val = right else: raise ValueError( f"Boundary handler value {boundary} is not allowed. \ Possible options are: 'mean', 'left', 'right'." ) new_values = self.values()[:-1] + [bndry_val] + other.values()[1:] for t, v in zip(new_times, new_values): new_time_series.put(t, v) return new_time_series def discretize(self, time_resolution: Decimal, value_resolution: Decimal) -> TimeSeries: """Creates a discretized version of the time series, rounding all times and values to the closest multiple of the corresponding resolution. Args: time_resolution (Decimal): Time resolution value_resolution (Decimal): Value resolution Returns: TimeSeries: A new discretized time series. """ discretized_ts = TimeSeries() for item in self: discretized_ts.put( time=round(Decimal(item.time) / time_resolution) * time_resolution, value=round(Decimal(item.value) / value_resolution) * value_resolution, ) return discretized_ts @staticmethod def periodic_signal(times: List[float], values: List[float], num_repeat: int = 1) -> TimeSeries: """Create a periodic time series by repeating the same block multiple times. Args: times (List[float]): List of time points in a single block values (List[float]): Values for the time series in a single block num_repeat (int): Number of block repeatitions Returns: TimeSeries: A new periodic time series. """ if not (values[0] == values[-1]): raise ValueError("The first and last values must coinscide to guarantee periodicity") new_time_series = TimeSeries() repeating_block = TimeSeries.from_lists(times=times, values=values) for index in range(num_repeat): new_time_series = new_time_series.stitch(repeating_block) return new_time_series # TODO: Verify if this belongs here. def _all_close(first: TimeSeries, second: TimeSeries, tolerance: Number = 1e-7) -> bool: """ Returns True if the times and values in two time series are all within (less than) a given tolerance range. The values in the TimeSeries must be numbers that can be subtracted from each-other, support getting the absolute value, and can be compared against the tolerance. Args: first (TimeSeries): A time series. second (TimeSeries): A time series. tolerance (Number): The tolerance value. Returns: bool: True if the times and values in two time series are all within (less than) a given tolerance range. If the time series are not the same size, this function will return False. """ if len(first) != len(second): return False if len(first) == 0: return True first_times = first.times() second_times = second.times() first_values = first.values() second_values = second.values() for index in range(len(first)): if abs(first_times[index] - second_times[index]) >= tolerance: return False if abs(first_values[index] - second_values[index]) >= tolerance: return False return True