Compare two time series (simulation results)

Suggestion : 1

What you could do is perform a point-by-point check of whether the two functions are close (within some tolerance) and allow at most num_exceptions points to be off.

import numpy as np

def is_close_except(arr1, arr2, num_exceptions=0.01, **kwargs):
    # if float, calculate as a percentage of the number of points
    if isinstance(num_exceptions, float):
        num_exceptions = int(len(arr1) * num_exceptions)
    num = len(arr1) - np.sum(np.isclose(arr1, arr2, **kwargs))
    return num <= num_exceptions

By contrast, the standard L^2 norm discretization would lead to something like this integrated (and normalized) metric:

import numpy as np

def is_close_l2(arr1, arr2, **kwargs):
    norm1 = np.sum(arr1 ** 2)
    norm2 = np.sum(arr2 ** 2)
    norm = np.sum((arr1 - arr2) ** 2)
    return np.isclose(2 * norm / (norm1 + norm2), 0.0, **kwargs)

As a test, you could run:

import numpy as np
import numpy.random

np.random.seed(0)

num = 1000
snr = 100
n_peaks = 5
x = np.linspace(-10, 10, num)
# generate ground truth
y = np.sin(x)
# distributed noise
y2 = y + np.random.random(num) / snr
# distributed noise + peaks
y3 = y + np.random.random(num) / snr
peak_positions = [np.random.randint(num) for _ in range(n_peaks)]
for i in peak_positions:
    y3[i] += np.random.random() * snr

# for distributed noise, both work with a 1/snr tolerance
is_close_l2(y, y2, atol=1/snr)
# output: True
is_close_except(y, y2, atol=1/snr)
# output: True

# for peak noise, since n_peaks < num_exceptions, this works
is_close_except(y, y3, atol=1/snr)
# output: True
# and if you allow 0 exceptions, then it fails, as expected
is_close_except(y, y3, num_exceptions=0, atol=1/snr)
# output: False

# for peak noise, this fails because the contribution from the peaks
# in the integral is much larger than the contribution from the rest
is_close_l2(y, y3, atol=1/snr)
# output: False

Assuming you already have your list of results loaded in the form we discussed in the comments:

from random import randint

l1 = [(i, randint(0, 99)) for i in range(10)]
l2 = [(i, randint(0, 99)) for i in range(10)]
# I generate some random lists, e.g.:
# [(0, 46), (1, 33), (2, 85), (3, 63), (4, 63), (5, 76), (6, 85), (7, 83), (8, 25), (9, 72)]
# where the first element is the time and the second a value
print(l1)
# Then I just evaluate for each time step the difference between the values
differences = [abs(x[0][1] - x[1][1]) for x in zip(l1, l2)]
print(differences)
# And I can just print the maximum difference and its index:
print(max(differences))
print(differences.index(max(differences)))
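
If you want a pass/fail verdict rather than just the largest deviation, you can count how many time steps exceed a chosen tolerance, in the same spirit as is_close_except above. A minimal sketch, reusing the differences list just computed, with a hypothetical tolerance and exception budget:

tol = 10             # hypothetical amplitude tolerance for these 0-99 valued series
num_exceptions = 2   # hypothetical number of points allowed to be off
outliers = [i for i, d in enumerate(differences) if d > tol]
print(outliers)                          # time steps where the series disagree by more than tol
print(len(outliers) <= num_exceptions)   # overall pass/fail verdict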

You could also try the following:

    from tslearn.metrics import dtw
    print(dtw(arr1, arr2) * 100 / <lengthOfArray>)
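
To make this concrete, here is a minimal sketch (assuming tslearn is installed) that reuses the noisy sine series from the first answer and normalizes the DTW distance by the series length, as the snippet above suggests:

import numpy as np
from tslearn.metrics import dtw

num, snr = 1000, 100
x = np.linspace(-10, 10, num)
arr1 = np.sin(x)                            # reference series
arr2 = arr1 + np.random.random(num) / snr   # reference plus distributed noise

score = dtw(arr1, arr2) * 100 / len(arr1)   # length-normalized DTW distance
print(score)                                # values near 0 indicate the series are close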

I'm sure there are more elegant ways to do this, but a very crudely coded brute force example would be something like the following using pandas:

import pandas as pd

data = pd.DataFrame()
data['benchmark'] = [0.1, 0.2, 0.3]   # or whatever you pull from your expected value data set
data['under_test'] = [0.2, 0.3, 0.1]  # or whatever you pull from your simulation results data set

sample_rate = 20  # or whatever the data sample rate is
st = int(0.05 * sample_rate)  # shift tolerance adjusted to time series sample rate
# best to make it an integer so we can use standard
# series shift functions and whatnot

at = 0.05  # amplitude tolerance

bounding = pd.DataFrame()
# if we didn't care about time shifts, the following two would be sufficient
# (i.e. if the data didn't have severe discontinuities between samples)
bounding['top'] = data['benchmark'] + at
bounding['bottom'] = data['benchmark'] - at

# if you want to be able to tolerate large discontinuities, the bounds can be
# widened along the time axis to accommodate for large jumps
bounding['bottomleft'] = data['benchmark'].shift(-st) - at
bounding['topleft'] = data['benchmark'].shift(-st) + at
bounding['topright'] = data['benchmark'].shift(st) + at
bounding['bottomright'] = data['benchmark'].shift(st) - at

# minimums and maximums give us a rough (but hopefully good enough) envelope
# these can be plotted as a parametric replacement of the 'pink tube' of line width
data['min'] = bounding.min(axis=1)
data['max'] = bounding.max(axis=1)

# see if the test data falls inside the envelope
data['pass/fail'] = data['under_test'].between(data['min'], data['max'])

# you now have a machine-readable column of booleans
# indicating which data points are outside the envelope
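
As a small follow-up (a sketch using the data frame built above), the boolean column can be reduced to an overall verdict or to the positions of the offending samples:

print(data['pass/fail'].all())                  # True only if every sample stays inside the envelope
print(data.index[~data['pass/fail']].tolist())  # row positions of the samples that fall outside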

Suggestion : 2

To access the run data to compare, use the Simulink.sdi.getAllRunIDs function to get the run IDs that correspond to the last two simulation runs. You can then use the Simulink.sdi.compareRuns function to compare the runs, using the alignment criteria you specified. The comparison algorithm converts the signal data to the double data type and synchronizes the signal data before computing the difference signal, and it uses a small time tolerance to account for the effect of differences in the solver step size on the transition of the square wave input. The representation of a multidimensional signal in the Simulation Data Inspector as a single signal with nonscalar sample values does not change.

Simulink.sdi.load('AircraftExample.mldatx');
runIDs = Simulink.sdi.getAllRunIDs;
runID1 = runIDs(end - 1);
runID2 = runIDs(end);
runResult = Simulink.sdi.compareRuns(runID1, runID2, 'reltol', 0.2, 'timetol', 0.5);
runResult.Summary
ans = struct with fields:
       OutOfTolerance: 0
      WithinTolerance: 3
            Unaligned: 0
        UnitsMismatch: 0
                Empty: 0
             Canceled: 0
          EmptySynced: 0
     DataTypeMismatch: 0
         TimeMismatch: 0
    StartStopMismatch: 0
          Unsupported: 0
saveResult(runResult, 'InputFilterComparison');

Suggestion : 3

Up to this point, we have discussed where to find time series data and how to process it. Now we will look at how to create time series data via simulation. Our discussion proceeds in three parts. First, we compare simulations of time series data to other kinds of data simulations, noting what new areas of particular concern emerge when we have to account for time passing. Second, we look at a few code-based simulations. Third, we discuss some general trends in the simulation of time series.

This is not a classic time series simulation, so it may feel quite a bit more like an exercise in generating tabular data. It absolutely is that as well, but we did have to be time series–aware. Such a simulation could be exceptionally complicated; for demonstration purposes, we accept that we will build a simpler world than what we imagine to truly be the case (“All models are wrong…”). We start by trying to understand what a Python generator is.

We start by defining the membership universe—that is, how many members we have and when each joined the organization. We also pair each member with a member status:

## python
>>> import numpy as np
>>> import pandas as pd

>>> ## membership status
>>> years = ['2014', '2015', '2016', '2017', '2018']
>>> memberStatus = ['bronze', 'silver', 'gold', 'inactive']

>>> memberYears = np.random.choice(years, 1000,
>>>                                p = [0.1, 0.1, 0.15, 0.30, 0.35])
>>> memberStats = np.random.choice(memberStatus, 1000,
>>>                                p = [0.5, 0.3, 0.1, 0.1])

>>> yearJoined = pd.DataFrame({'yearJoined': memberYears,
>>>                            'memberStats': memberStats})

We can imagine ways to make this more complex and nuanced depending on anecdotal observations from veterans or novel hypotheses we have about unobservable processes affecting the data:

## python
>>> NUM_EMAILS_SENT_WEEKLY = 3

>>> ## we define several functions for different patterns
>>> def never_opens(period_rng):
>>>     return []

>>> def constant_open_rate(period_rng):
>>>     n, p = NUM_EMAILS_SENT_WEEKLY, np.random.uniform(0, 1)
>>>     num_opened = np.random.binomial(n, p, len(period_rng))
>>>     return num_opened

>>> def increasing_open_rate(period_rng):
>>>     return open_rate_with_factor_change(period_rng,
>>>                                         np.random.uniform(1.01, 1.30))

>>> def decreasing_open_rate(period_rng):
>>>     return open_rate_with_factor_change(period_rng,
>>>                                         np.random.uniform(0.5, 0.99))

>>> def open_rate_with_factor_change(period_rng, fac):
>>>     if len(period_rng) < 1:
>>>         return []
>>>     times = np.random.randint(0, len(period_rng),
>>>                               int(0.1 * len(period_rng)))
>>>     num_opened = np.zeros(len(period_rng))
>>>     for prd in range(0, len(period_rng), 2):
>>>         try:
>>>             n, p = NUM_EMAILS_SENT_WEEKLY, np.random.uniform(0, 1)
>>>             num_opened[prd:(prd + 2)] = np.random.binomial(n, p, 2)
>>>             p = max(min(1, p * fac), 0)
>>>         except:
>>>             num_opened[prd] = np.random.binomial(n, p, 1)
>>>     for t in range(len(times)):
>>>         num_opened[times[t]] = 0
>>>     return num_opened
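
As a quick usage sketch (not from the book), each of these behavior functions maps a weekly period range to an array of simulated email-open counts, one entry per week:

import numpy as np
import pandas as pd

rng = pd.period_range('2018-01-01', '2018-06-01', freq='W')
print(never_opens(rng))           # always an empty list
print(constant_open_rate(rng))    # e.g. array([2, 1, 3, ...]), one count per week
print(increasing_open_rate(rng))  # counts that tend to drift upward, with ~10% of weeks zeroed out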

We also need to come up with a system to model donation behavior. We don’t want to be totally naive, or our simulation will not give us insights into what we should expect. That is, we want to build into the model our current hypotheses about member behavior and then test whether the simulations based on those hypotheses match what we see in our real data. Here, we make donation behavior loosely but not deterministically related to the number of emails a member has opened:

## python
>>> ## donation behavior
>>> def produce_donations(period_rng, member_behavior, num_emails,
>>>                       use_id, member_join_year):
>>>     donation_amounts = np.array([0, 25, 50, 75, 100, 250, 500,
>>>                                  1000, 1500, 2000])
>>>     member_has = np.random.choice(donation_amounts)
>>>     email_fraction = num_emails / (NUM_EMAILS_SENT_WEEKLY * len(period_rng))
>>>     member_gives = member_has * email_fraction
>>>     member_gives_idx = np.where(member_gives
>>>                                 >= donation_amounts)[0][-1]
>>>     member_gives_idx = max(min(member_gives_idx,
>>>                                len(donation_amounts) - 2),
>>>                            1)
>>>     num_times_gave = np.random.poisson(2) * (2018 - member_join_year)
>>>     times = np.random.randint(0, len(period_rng), num_times_gave)
>>>     dons = pd.DataFrame({'member'   : [],
>>>                          'amount'   : [],
>>>                          'timestamp': []})

>>>     for n in range(num_times_gave):
>>>         donation = donation_amounts[member_gives_idx
>>>                                     + np.random.binomial(1, .3)]
>>>         ts = str(period_rng[times[n]].start_time
>>>                  + random_weekly_time_delta())
>>>         dons = dons.append(pd.DataFrame(
>>>                    {'member'   : [use_id],
>>>                     'amount'   : [donation],
>>>                     'timestamp': [ts]}))

>>>     if dons.shape[0] > 0:
>>>         dons = dons[dons.amount != 0]
>>>         ## we don't report zero donation events as this would not
>>>         ## be recorded in a real world database

>>>     return dons
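
One caveat: produce_donations calls random_weekly_time_delta(), a helper the book defines elsewhere that is not part of this excerpt. To run the excerpt as-is, a hypothetical stand-in (an assumption, not the book's definition) only needs to return a random offset within one week:

import numpy as np
import pandas as pd

# hypothetical stand-in for the helper referenced above (its real definition is
# not included in this excerpt): a random offset of up to one week, so donation
# timestamps do not all land exactly on a period start
def random_weekly_time_delta():
    return pd.Timedelta(days=int(np.random.randint(0, 7)),
                        seconds=int(np.random.randint(0, 24 * 60 * 60)))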

Finally, we put all the components just developed together to simulate a certain number of members and associated events in a way that ensures that events happen only once a member has joined and that a member’s email events have some relation (but not an unrealistically small relation) to their donation events:

## python
>>> behaviors = [never_opens,
>>>              constant_open_rate,
>>>              increasing_open_rate,
>>>              decreasing_open_rate]
>>> member_behaviors = np.random.choice(behaviors, 1000,
>>>                                     p = [0.2, 0.5, 0.1, 0.2])

>>> rng = pd.period_range('2015-02-14', '2018-06-01', freq = 'W')
>>> emails = pd.DataFrame({'member'      : [],
>>>                        'week'        : [],
>>>                        'emailsOpened': []})
>>> donations = pd.DataFrame({'member'   : [],
>>>                           'amount'   : [],
>>>                           'timestamp': []})

>>> for idx in range(yearJoined.shape[0]):
>>>     ## randomly generate the date when a member would have joined
>>>     join_date = pd.Timestamp(yearJoined.iloc[idx].yearJoined) + \
>>>                 pd.Timedelta(str(np.random.randint(0, 365)) + ' days')
>>>     join_date = min(join_date, pd.Timestamp('2018-06-01'))

>>>     ## member should not have action timestamps before joining
>>>     member_rng = rng[rng > join_date]

>>>     if len(member_rng) < 1:
>>>         continue

>>>     info = member_behaviors[idx](member_rng)
>>>     if len(info) == len(member_rng):
>>>         emails = emails.append(pd.DataFrame(
>>>             {'member'      : [idx] * len(info),
>>>              'week'        : [str(r.start_time) for r in member_rng],
>>>              'emailsOpened': info}))
>>>         donations = donations.append(
>>>             produce_donations(member_rng, member_behaviors[idx],
>>>                               sum(info), idx, join_date.year))

We then look at the temporal behavior of the donations to get a sense of how we might try this for further analysis or forecasting. We plot the total sum of donations we received for each month of the data set (see Figure 4-1):

## python
>>> ## df is the donations data frame built above (e.g. df = donations)
>>> df.set_index(pd.to_datetime(df.timestamp), inplace = True)
>>> df.sort_index(inplace = True)
>>> df.groupby(pd.Grouper(freq = 'M')).amount.sum().plot()

Let’s first consider a method I wrote to retrieve a taxi identification number:

## python
>>> import numpy as np

>>> def taxi_id_number(num_taxis):
>>>     arr = np.arange(num_taxis)
>>>     np.random.shuffle(arr)
>>>     for i in range(num_taxis):
>>>         yield arr[i]

For those who are not familiar with generators, here is the preceding code in action:

## python
>>> ids = taxi_id_number(10)
>>> print(next(ids))
>>> print(next(ids))
>>> print(next(ids))

which might print out:

7
2
5

Now we create a more complex generator that will use the preceding generators to establish individual taxi parameters as well as create individual taxi timelines:

## python
>>> def taxi_process(taxi_id_generator, shift_info_generator):
>>>     taxi_id = next(taxi_id_generator)
>>>     shift_start, shift_end, shift_mean_trips = next(shift_info_generator)
>>>     actual_trips = round(np.random.normal(loc = shift_mean_trips,
>>>                                           scale = 2))
>>>     average_trip_time = 6.5 / shift_mean_trips * 60
>>>     # convert mean trip time to minutes
>>>     between_events_time = 1.0 / (shift_mean_trips - 1) * 60
>>>     # this is an efficient city where cabs are seldom unused
>>>     time = shift_start
>>>     yield TimePoint(taxi_id, 'start shift', time)
>>>     deltaT = np.random.poisson(between_events_time) / 60
>>>     time += deltaT
>>>     for i in range(actual_trips):
>>>         yield TimePoint(taxi_id, 'pick up    ', time)
>>>         deltaT = np.random.poisson(average_trip_time) / 60
>>>         time += deltaT
>>>         yield TimePoint(taxi_id, 'drop off   ', time)
>>>         deltaT = np.random.poisson(between_events_time) / 60
>>>         time += deltaT
>>>     deltaT = np.random.poisson(between_events_time) / 60
>>>     time += deltaT
>>>     yield TimePoint(taxi_id, 'end shift  ', time)
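
Note that taxi_process also expects a shift_info_generator, which is not included in this excerpt; it should yield tuples of (shift start, shift end, mean trips per shift). A minimal hypothetical stand-in for experimenting with the code could look like this:

import numpy as np

# hypothetical stand-in (not the book's version): endlessly yields a shift start
# hour, a shift end hour, and the mean number of trips for that shift
def simple_shift_info():
    while True:
        start = float(np.random.choice([0, 8, 16]))   # three shifts per day
        yield (start, start + 7.5, np.random.choice([8, 10, 12]))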

The taxi generator yields TimePoints, which are defined as follows:

## python
>>> from dataclasses import dataclass

>>> @dataclass
>>> class TimePoint:
>>>     taxi_id: int
>>>     name: str
>>>     time: float

>>>     def __lt__(self, other):
>>>         return self.time < other.time
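
The __lt__ method is what lets TimePoints be ordered by time, so events coming from many independent taxi generators can be merged into a single chronological stream, for example with a heap-based priority queue. A small illustrative sketch (the specific events are made up):

import heapq

events = [TimePoint(0, 'start shift', 3.0),
          TimePoint(1, 'start shift', 1.5),
          TimePoint(0, 'pick up    ', 3.4)]
heapq.heapify(events)           # __lt__ makes TimePoint heap-orderable
print(heapq.heappop(events))    # TimePoint(taxi_id=1, name='start shift', time=1.5)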

First we configure our system, as follows:

## python
>>> ### CONFIGURATION
>>> ## physical layout
>>> N = 5 # width of lattice
>>> M = 5 # height of lattice
>>> ## temperature settings
>>> temperature = 0.5
>>> BETA = 1 / temperature

Then we have some utility methods, such as random initialization of our starting block:

>>> def initRandState(N, M):
>>>     block = np.random.choice([-1, 1], size = (N, M))
>>>     return block

We also calculate the energy for a given center state alignment relative to its neighbors:

## python
>>> def costForCenterState(state, i, j, n, m):
>>>     centerS = state[i, j]
>>>     neighbors = [((i + 1) % n, j), ((i - 1) % n, j),
>>>                  (i, (j + 1) % m), (i, (j - 1) % m)]
>>>     ## notice the % n because we impose periodic boundary cond
>>>     ## ignore this if it doesn't make sense - it's merely a
>>>     ## physical constraint on the system saying 2D system is like
>>>     ## the surface of a donut
>>>     interactionE = [state[x, y] * centerS for (x, y) in neighbors]
>>>     return np.sum(interactionE)

Here’s where we introduce the MCMC steps discussed earlier:

## python
>>> def mcmcAdjust(state):
>>>     n = state.shape[0]
>>>     m = state.shape[1]
>>>     x, y = np.random.randint(0, n), np.random.randint(0, m)
>>>     centerS = state[x, y]
>>>     cost = costForCenterState(state, x, y, n, m)
>>>     if cost < 0:
>>>         centerS *= -1
>>>     elif np.random.random() < np.exp(-cost * BETA):
>>>         centerS *= -1
>>>     state[x, y] = centerS
>>>     return state

Now to actually run a simulation, we need some recordkeeping as well as repeated calls to the MCMC adjustment:

## python
>>> def runState(state, n_steps, snapsteps = None):
>>>     if snapsteps is None:
>>>         snapsteps = np.linspace(0, n_steps,
>>>                                 num = round(n_steps / (M * N * 100)),
>>>                                 dtype = np.int32)
>>>     saved_states = []
>>>     sp = 0
>>>     magnet_hist = []
>>>     for i in range(n_steps):
>>>         state = mcmcAdjust(state)
>>>         magnet_hist.append(magnetizationForState(state))
>>>         if sp < len(snapsteps) and i == snapsteps[sp]:
>>>             saved_states.append(np.copy(state))
>>>             sp += 1
>>>     return state, saved_states, magnet_hist
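
Finally, runState calls magnetizationForState, which is also not defined in this excerpt; with a hypothetical definition (the average spin of the lattice, a common choice) you can wire the pieces above together roughly as follows:

import numpy as np

# hypothetical helper (not shown in the excerpt): average spin of the lattice
def magnetizationForState(state):
    return np.mean(state)

# a sketch of an end-to-end run using the configuration and functions above
np.random.seed(0)
state = initRandState(N, M)
final_state, saved_states, magnet_hist = runState(state, 1000)
print(magnet_hist[:5])   # magnetization after each of the first few MCMC steps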