"""
Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License"). You may not
use this file except in compliance with the License. A copy of the License is
located at

    http://aws.amazon.com/apache2.0

or in the "license" file accompanying this file. This file is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied. See the License for the specific language governing permissions
and limitations under the License.
"""

import argparse
import datetime
import json
import random
import time
import uuid

import boto3
import numpy as np

import AmazonAED

# Default AED parameters. Each one can be overridden on the command line
# (see get_args); these values are the argparse defaults.
AED_STARTUP = 5
AED_ALPHA_LEVEL = 0.01
AED_CHANGE_DIFFERENCE_WINDOW = 5
AED_PRACTICAL_SIGNIFICANCE_THRESHOLD = 0

# This script generates a synthetic time series with clear level shifts for a
# single device, streams every point to Kinesis, and runs AED locally over the
# same data for demonstration purposes.


def get_args():
    """
    Parse command line arguments.

    Returns:
        argparse.Namespace: the parsed arguments and configuration settings,
        plus a ``prog`` attribute holding the program name.
    """
    # First, operations parameters
    parser = argparse.ArgumentParser(
        description='Generate a time series of events, with some clear shifts. '
                    'Generates data from a single named device.')
    parser.add_argument("-r", "--region", required=False, type=str,
                        default="us-west-2",
                        help="Region the Kinesis event stream is in")
    parser.add_argument("-s", "--stream", required=False, type=str,
                        default="aed_events_data_stream",
                        help="Kinesis stream name")
    parser.add_argument("-v", "--verbose", required=False, type=str,
                        choices=["False", "True"], default="False",
                        help="[False|True] - Print all the data items")
    parser.add_argument("-t", "--time", required=False, type=int, default=60,
                        help="Minimum time that the data creation should run for (in seconds)")
    # Random default so concurrent runs without -d are unlikely to collide.
    deviceId = 'device' + str(random.randint(1, 100))
    parser.add_argument("-d", "--deviceid", required=False, type=str,
                        default=deviceId,
                        help="Device identifier for this stream")

    # AED specific parameters; defaults come from the module constants so
    # they are defined in exactly one place.
    parser.add_argument("-a", "--alpha", required=False, type=float,
                        default=AED_ALPHA_LEVEL, help="AED alpha level")
    parser.add_argument("-w", "--window", required=False, type=int,
                        default=AED_CHANGE_DIFFERENCE_WINDOW,
                        help="AED change difference window")
    parser.add_argument("-x", "--startup", required=False, type=int,
                        default=AED_STARTUP, help="AED startup")
    parser.add_argument("-p", "--practicalthreshold", required=False,
                        type=float, default=AED_PRACTICAL_SIGNIFICANCE_THRESHOLD,
                        help="AED practical significance threshold")

    args = parser.parse_args()
    args.prog = parser.prog
    return args


def send_data_to_kinesis(device, data, region, streamName, client=None):
    """Send a single data point to a Kinesis stream as a JSON record.

    The record is a JSON object with keys ``unixtime`` (``time.time()``),
    ``timestamp`` (string form of ``datetime.datetime.now()``), ``data`` and
    ``deviceid``.

    Args:
        device (str): device identifier written to ``deviceid``.
        data: the value to send; must be JSON-serializable.
        region (str): AWS region of the stream; only used to build a client
            when ``client`` is None.
        streamName (str): name of the Kinesis stream.
        client: optional boto3 Kinesis client. Passing one in lets callers
            reuse a single client instead of constructing one per record.
    """
    event = {
        'unixtime': time.time(),
        'timestamp': str(datetime.datetime.now()),
        'data': data,
        'deviceid': device,
    }
    if client is None:
        client = boto3.client('kinesis', region_name=region)
    client.put_record(
        StreamName=streamName,
        Data=json.dumps(event).encode(),
        # A fresh random key per record; no per-device key is requested.
        PartitionKey=str(uuid.uuid4()),
    )


def _generate_batch(means, lengths, scale):
    """Return a (sum(lengths), 1) array made of consecutive stages.

    Stage i holds ``lengths[i]`` draws from a normal distribution with mean
    ``means[i]`` and standard deviation ``scale``.
    """
    return np.vstack([
        mean + np.random.normal(scale=scale, size=(length, 1))
        for mean, length in zip(means, lengths)
    ])


def _print_change_points(change_points, detection_points, magnitudes, p_values):
    """Print a header and one formatted row per detected change point."""
    print('Time_changed', 'Time_detected', 'Magnitude', 'p_value')
    for ch, det, mag, p in zip(change_points, detection_points, magnitudes, p_values):
        print("{0:5d} {1:5d} {2:10.3f} {3:10.3e}".format(ch, det, mag, p))


def _plot(whole_time_series, change_points, detection_points, period_resets,
          baseline, device_id):
    """Plot the series with change (blue ^), detection (red v) and batch
    boundary (cyan o) markers, all drawn at height ``baseline``."""
    # Imported lazily so matplotlib is only needed for the final plot.
    import matplotlib.pyplot as mp

    mp.ylabel("Data value")
    mp.xlabel("Generated sequence, " + device_id)
    mp.scatter(change_points, [baseline] * len(change_points), color='b', marker='^')
    mp.scatter(detection_points, [baseline] * len(detection_points), color='r', marker='v')
    mp.scatter(period_resets, [baseline] * len(period_resets), color='c', marker='o')
    mp.plot(whole_time_series, color='k')
    mp.show()


def main():
    """Generate batches until the requested run time has elapsed, stream each
    batch to Kinesis, run AED locally on it, then summarise and plot."""
    args = get_args()
    region = args.region
    stream_name = args.stream
    device_id = args.deviceid

    start_time = time.perf_counter()
    desired_end_time = start_time + args.time
    print(f'Generating data for device {device_id}, desired run time >= {args.time} seconds')
    print("verbose = " + str(args.verbose))

    aed_startup = args.startup
    aed_alpha_level = args.alpha
    aed_change_difference_window = args.window
    aed_practical_significance_threshold = args.practicalthreshold
    print('AED parameters: startup: {} alpha level: {} change difference window: {} practical_significance_threshold: {}'.format(
        aed_startup, aed_alpha_level, aed_change_difference_window,
        aed_practical_significance_threshold))

    # NOTE(review): one AmazonAED instance is reused for every batch; this
    # assumes streamingMPP replaces (not appends to) change_points etc. on
    # each call — confirm against AmazonAED.
    aed = AmazonAED.AmazonAED()
    # One client for the whole run instead of one per record.
    kinesis = boto3.client('kinesis', region_name=region)

    means = [30, 33, 55, 45, 37, 62]   # mean value of each of the 6 stages
    times = [100, 40, 30, 80, 50, 30]  # length (number of points) of each stage
    scale = 1.5   # standard deviation of the noise; grows every batch
    itrtn = 0.9   # multiplier applied to scale each batch (incremented first)

    batches = []
    data_points_so_far = 0
    change_points = []
    detection_points = []
    magnitudes = []
    p_values = []
    period_resets = []

    # Loop till we're past time
    while time.perf_counter() < desired_end_time:
        itrtn += 0.1
        scale = scale * itrtn
        # Fresh batch every iteration, so earlier points are not re-sent to
        # Kinesis or re-analysed, and the offsets below stay correct.
        time_series = _generate_batch(means, times, scale)
        print(f'Number of data points so far {data_points_so_far + len(time_series)}')
        if args.verbose == 'True':
            for i, row in enumerate(time_series):
                print(f'time_series[{i}] = {row[0]}')

        # Send generated data to Kinesis
        for row in time_series:
            send_data_to_kinesis(str(device_id), row[0], region, stream_name,
                                 client=kinesis)

        # Run AED locally over the generated batch, for demonstration purposes
        aed.streamingMPP(time_series,
                         startup=aed_startup,
                         alpha_level=aed_alpha_level,
                         change_difference_window=aed_change_difference_window,
                         confidence_intervals=None,
                         practical_significance_threshold=aed_practical_significance_threshold)
        print('Change points found in this set were:')
        _print_change_points(aed.change_points, aed.detection_points,
                             aed.magnitudes, aed.p_value_change_points)

        # Collect all the change point data for later, shifted from
        # batch-relative to whole-run positions.
        change_points.extend(pnt + data_points_so_far for pnt in aed.change_points)
        detection_points.extend(pnt + data_points_so_far for pnt in aed.detection_points)
        magnitudes.extend(aed.magnitudes)
        p_values.extend(aed.p_value_change_points)

        end_time = time.perf_counter()
        print(f"Now running for {end_time - start_time:0.4f} seconds")

        batches.append(time_series)
        data_points_so_far += len(time_series)
        period_resets.append(data_points_so_far)

    whole_time_series = np.vstack(batches) if batches else np.array([])

    print('\nChange points found were:')
    _print_change_points(change_points, detection_points, magnitudes, p_values)

    # Show plot of the time series; markers sit at the first stage's mean.
    _plot(whole_time_series, change_points, detection_points, period_resets,
          means[0], device_id)


if __name__ == "__main__":
    main()