"""
Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License"). You may not
use this file except in compliance with the License. A copy of the License is
located at http://aws.amazon.com/apache2.0 or in the "license" file
accompanying this file. This file is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
the License for the specific language governing permissions and limitations
under the License.
"""

# Synthetic event generator: builds a noisy, piecewise-constant time series
# with clear level shifts for a single device and sends every point to an
# Amazon Kinesis data stream, one record per point.

import argparse
import datetime
import functools
import json
import random
import time
import uuid

import numpy as np

# Mean value of each stage within one generated period.
DEFAULT_MEANS = (30, 33, 55, 45, 37, 62)
# Number of data points in each stage (paired element-wise with DEFAULT_MEANS).
DEFAULT_TIMES = (100, 40, 30, 80, 50, 30)
# Standard deviation of the Gaussian noise before the first period's growth.
DEFAULT_SCALE = 1.5


def get_args():
    """
    Parse command line arguments and populate args object.
    The args object is passed to functions as argument

    Returns:
        object (ArgumentParser): arguments and configuration settings
    """
    # First, operations parameters
    parser = argparse.ArgumentParser(
        description='Generate a time series of events, with some clear shifts. '
                    'Generates data from a single named device.')
    parser.add_argument("-r", "--region", required=False, type=str,
                        default="us-west-2",
                        help="Region the Kinesis event stream is in")
    parser.add_argument("-s", "--stream", required=False, type=str,
                        default="aed_events_data_stream",
                        help="Kinesis stream name")
    parser.add_argument("-v", "--verbose", required=False, type=str,
                        choices=["False", "True"], default="False",
                        help="[False|True] - Print all the data items")
    parser.add_argument("-t", "--time", required=False, type=int, default=60,
                        help="Minimum time that the data creation should run for (in seconds)")
    # A random device name is chosen when none is given on the command line.
    default_device_id = 'device' + str(random.randint(1, 100))
    parser.add_argument("-d", "--deviceid", required=False, type=str,
                        default=default_device_id,
                        help="Device identifier for this stream")

    args = parser.parse_args()
    args.prog = parser.prog
    return args


@functools.lru_cache(maxsize=None)
def _get_kinesis_client(region):
    """Return a Kinesis client for *region*, created once and then reused.

    Building a boto3 client is expensive, so it must not happen per record.
    The cache is keyed only by region, so it stays tiny.
    """
    # Imported lazily so the data generator can be imported (and tested)
    # without the AWS SDK installed.
    import boto3

    return boto3.client('kinesis', region_name=region)


def send_data_to_kinesis(device, data, region, streamName):
    """Send a single data point for *device* to a Kinesis stream as JSON.

    Args:
        device: Device identifier, stored in the record's ``deviceid`` field.
        data: The value stored in the ``data`` field; must be JSON-serializable.
        region: AWS region hosting the stream.
        streamName: Name of the Kinesis data stream.
    """
    event = {
        'unixtime': time.time(),
        'timestamp': str(datetime.datetime.now()),
        'data': data,
        'deviceid': device,
    }
    byte_data = json.dumps(event).encode('utf-8')
    _get_kinesis_client(region).put_record(
        StreamName=streamName,
        Data=byte_data,
        # Fresh random key per record, so records are not pinned to one key.
        PartitionKey=str(uuid.uuid4()),
    )


def generate_period(means=DEFAULT_MEANS, times=DEFAULT_TIMES, scale=DEFAULT_SCALE):
    """Generate one period of piecewise-constant data with Gaussian noise.

    Stage ``k`` consists of ``times[k]`` samples drawn around ``means[k]``
    with standard deviation *scale*.

    Returns:
        numpy.ndarray of shape ``(sum(times), 1)``; ``(0, 1)`` when there
        are no stages.
    """
    stages = [
        mean + np.random.normal(scale=scale, size=(length, 1))
        for mean, length in zip(means, times)
    ]
    if not stages:
        return np.empty((0, 1))
    return np.vstack(stages)


def run(device_id, region, stream_name, run_seconds, verbose=False,
        send=send_data_to_kinesis, means=DEFAULT_MEANS, times=DEFAULT_TIMES,
        scale=DEFAULT_SCALE):
    """Generate periods of data and send each point until *run_seconds* pass.

    A new period is started only while the elapsed time is below
    *run_seconds*; each period is sent in full, so the total run time is at
    least *run_seconds* (zero periods when *run_seconds* <= 0). Every period
    widens the noise: period ``k`` (0-based) multiplies the running scale by
    ``1.0 + 0.1 * k``.

    Args:
        device_id: Identifier passed (as ``str``) to *send* for every point.
        region: AWS region forwarded to *send*.
        stream_name: Stream name forwarded to *send*.
        run_seconds: Minimum run time in seconds.
        verbose: When true, print every generated value.
        send: Callable ``send(device, value, region, stream_name)`` used to
            emit each point; defaults to :func:`send_data_to_kinesis`.
        means, times, scale: Shape of each period; see :func:`generate_period`.

    Returns:
        ``(whole_time_series, period_resets)``: all generated points as an
        ``(N, 1)`` array, and the cumulative point count at the end of each
        period.
    """
    start_time = time.perf_counter()
    desired_end_time = start_time + run_seconds

    growth = 0.9  # bumped by 0.1 before use, so the first period uses 1.0
    periods = []  # per-period arrays, concatenated once at the end
    period_resets = []
    data_points_so_far = 0

    while time.perf_counter() < desired_end_time:
        growth += 0.1
        scale = scale * growth
        # A fresh array per period: only the new points are sent below.
        time_series = generate_period(means, times, scale)
        print(f'Number of data points so far {data_points_so_far + len(time_series)}')

        if verbose:
            for offset, row in enumerate(time_series):
                print(f'time_series[{data_points_so_far + offset}] = {row[0]}')

        for row in time_series:
            send(str(device_id), float(row[0]), region, stream_name)

        print(f"Now running for {time.perf_counter() - start_time:0.4f} seconds")

        # Keep track of the input data, Just In Case
        periods.append(time_series)
        data_points_so_far += len(time_series)
        period_resets.append(data_points_so_far)

    whole_time_series = np.vstack(periods) if periods else np.empty((0, 1))
    return whole_time_series, period_resets


def main():
    """Command-line entry point: parse arguments and stream data to Kinesis."""
    args = get_args()
    print(f'Generating data for device {args.deviceid}, desired run time >= {args.time} seconds')
    print("verbose = " + str(args.verbose))
    run(args.deviceid, args.region, args.stream, args.time,
        verbose=args.verbose == 'True')


if __name__ == "__main__":
    main()