# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import random
import time
from datetime import datetime, timedelta, timezone

import boto3
from ratelimiter import RateLimiter

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

cloudformation = boto3.client('cloudformation')
iotsitewise = boto3.client('iotsitewise')

# Each BatchPutAssetPropertyValue request carries one entry of ten one-second values,
# and requests are rate limited to one per second (see send_batch_put_asset_property_value).
MAX_ENTRIES_IN_BATCH = 1
NUM_VALUES_PER_ENTRY = 10
SECONDS_PER_MINUTE = 60
FIRST_INVOCATION_DURATION_TO_UPLOAD_DATA_FOR = timedelta(minutes=15)
STANDARD_INVOCATION_DURATION_TO_UPLOAD_DATA_FOR = timedelta(minutes=1)
MAX_REQUESTS_PER_PERIOD = 1
PERIOD_LENGTH_IN_SECONDS = 1
CLOUDFORMATION_STACK = 'AWS::CloudFormation::Stack'
COMPLETE = 'CREATE_COMPLETE'


def should_delete_stack(stack_creation_time, demo_days):
    now = datetime.now(timezone.utc)
    deletion_time = stack_creation_time + timedelta(days=demo_days)
    return now >= deletion_time


def delete_stack_if_stack_expired(stack_id, demo_days):
    stack_creation_time = cloudformation.describe_stacks(StackName=stack_id)['Stacks'][0]['CreationTime']
    if should_delete_stack(stack_creation_time, demo_days):
        logger.info('Deleting the stack because the demo duration is exceeded.')
        cloudformation.delete_stack(StackName=stack_id)
        return True
    return False


def get_stack_completion_time(stack_id):
    # Page through the stack events until the stack's own CREATE_COMPLETE event is found.
    # Note: the CloudFormation API uses 'NextToken' (capitalized) for pagination.
    describe_stack_events_response = cloudformation.describe_stack_events(StackName=stack_id)
    while 'NextToken' in describe_stack_events_response:
        for stack_event in describe_stack_events_response['StackEvents']:
            if stack_event['ResourceType'] == CLOUDFORMATION_STACK and stack_event['ResourceStatus'] == COMPLETE:
                return stack_event['Timestamp']
        time.sleep(3)
        describe_stack_events_response = cloudformation.describe_stack_events(
            StackName=stack_id, NextToken=describe_stack_events_response['NextToken'])
    for stack_event in describe_stack_events_response['StackEvents']:
        if stack_event['ResourceType'] == CLOUDFORMATION_STACK and stack_event['ResourceStatus'] == COMPLETE:
            return stack_event['Timestamp']
    return False


# Always treated as a standard invocation, as the TQV/s limit is reduced to 10 - https://t.corp.amazon.com/V292080605
def is_first_invocation(now, stack_completion_time):
    return False


def get_base_offset(first_invocation, now, stack_completion_time):
    # Offset (in minutes) into the sample data, so each invocation continues
    # where the previous one left off.
    time_diff = now.replace(second=0) - stack_completion_time.replace(second=0)
    return 0 if first_invocation else int(
        (FIRST_INVOCATION_DURATION_TO_UPLOAD_DATA_FOR.total_seconds()
         - timedelta(minutes=1).total_seconds()
         + time_diff.total_seconds()) / SECONDS_PER_MINUTE)


def read_data(file_path):
    with open(file_path) as json_file:
        return json.load(json_file)


def put_all_data(asset_id, start_time, base_offset, num_entries_needed, properties_to_put_data):
    try:
        desc_asset_response = iotsitewise.describe_asset(assetId=asset_id)
        asset_data_file_path = 'data/' + desc_asset_response['assetName'] + '.txt'
        try:
            asset_data = read_data(asset_data_file_path)
        except Exception as e:
            logger.error(e)
            # Without the sample data there is nothing to upload, so bail out here
            # rather than fail later with an undefined asset_data.
            return False
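        # Pause briefly after the asset lookup, then upload values for each requested
        # property, one rate-limited batch at a time.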
        time.sleep(3)
        for asset_property in desc_asset_response['assetProperties']:
            if asset_property['name'] not in properties_to_put_data:
                continue
            asset_property_values = asset_data[asset_property['name']]
            for batch in get_batches(asset_id, asset_property, asset_property_values,
                                     start_time, base_offset, num_entries_needed):
                try:
                    send_batch_put_asset_property_value(batch)
                except Exception as e:
                    logger.error(e)
        return True
    except Exception as e:
        logger.error(e)
        return False


def get_batches(asset_id, asset_property, asset_property_values, start_time, base_offset, num_entries_needed):
    # Yield ceil(num_entries_needed / MAX_ENTRIES_IN_BATCH) batches.
    num_batches = int(num_entries_needed // MAX_ENTRIES_IN_BATCH)
    if num_entries_needed % MAX_ENTRIES_IN_BATCH != 0:
        num_batches += 1
    for batch_number in range(num_batches):
        yield get_batch(asset_id, asset_property, asset_property_values,
                        start_time, base_offset, num_entries_needed, batch_number)


def get_batch(asset_id, asset_property, asset_property_values, start_time, base_offset, num_entries_needed, batch_number):
    batch = []
    for entry_id in range(MAX_ENTRIES_IN_BATCH):
        seconds_since_start_for_current_entry = ((batch_number * MAX_ENTRIES_IN_BATCH * NUM_VALUES_PER_ENTRY)
                                                 + (entry_id * NUM_VALUES_PER_ENTRY))
        property_value_range = get_property_value_range(asset_property_values, base_offset,
                                                        seconds_since_start_for_current_entry)
        property_values = get_property_values(property_value_range, start_time,
                                              seconds_since_start_for_current_entry)
        entry = {'entryId': str(entry_id),
                 'assetId': asset_id,
                 'propertyId': asset_property['id'],
                 'propertyValues': property_values}
        batch.append(entry)
        num_entries_created = (batch_number * MAX_ENTRIES_IN_BATCH) + len(batch)
        if num_entries_created == num_entries_needed:
            break
    return batch


def get_property_value_range(asset_property_values, base_offset, seconds_since_start_for_current_entry):
    # Pick the pair of adjacent per-minute sample values bounding the current minute,
    # wrapping around to the start of the sample data when the end is reached.
    minutes_since_start = seconds_since_start_for_current_entry // SECONDS_PER_MINUTE
    range_minimum_index = (base_offset + minutes_since_start) % len(asset_property_values)
    range_maximum_index = (range_minimum_index + 1) % len(asset_property_values)
    return [asset_property_values[range_minimum_index], asset_property_values[range_maximum_index]]


def get_property_values(property_value_range, start_time, seconds_since_start_for_current_entry):
    # Generate one value per second, drawn uniformly at random from the current range.
    property_values = []
    start_time_timestamp = int(start_time.timestamp())
    for value_second in range(NUM_VALUES_PER_ENTRY):
        current_timestamp = start_time_timestamp + seconds_since_start_for_current_entry + value_second
        value = get_value_in_range(property_value_range)
        property_value = {'value': {'doubleValue': value},
                          'timestamp': {'timeInSeconds': current_timestamp, 'offsetInNanos': 0},
                          'quality': 'GOOD'}
        property_values.append(property_value)
    return property_values


def get_value_in_range(property_value_range):
    return random.uniform(property_value_range[0], property_value_range[1])


@RateLimiter(max_calls=MAX_REQUESTS_PER_PERIOD, period=PERIOD_LENGTH_IN_SECONDS)
def send_batch_put_asset_property_value(entries):
    return iotsitewise.batch_put_asset_property_value(entries=entries)


def handler(event, context):
    stack_id = event['stackId']
    demo_days = int(event['demoDurationDays'])
    if delete_stack_if_stack_expired(stack_id, demo_days):
        # The demo has expired and stack deletion has started; skip uploading data.
        return
    stack_completion_time = get_stack_completion_time(stack_id)
    now = datetime.now(timezone.utc)
    if not stack_completion_time:
        # Occasionally the Lambda is invoked before the CREATE_COMPLETE event for the
        # stack appears in the describe-stack-events response; fall back to the current time.
        stack_completion_time = now
    asset_id = event['turbineAssetId']
    properties_string = event['PropertyGroup']
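    # 'PropertyGroup' is assumed to be a space-delimited string of asset property
    # names, e.g. 'RotationsPerMinute Torque' (illustrative names, not from the source).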
    properties_to_put_data = properties_string.split(' ')
    first_invocation = is_first_invocation(now, stack_completion_time)
    duration_to_upload_data_for = (FIRST_INVOCATION_DURATION_TO_UPLOAD_DATA_FOR if first_invocation
                                   else STANDARD_INVOCATION_DURATION_TO_UPLOAD_DATA_FOR)
    start_time = now - duration_to_upload_data_for
    base_offset = get_base_offset(first_invocation, now, stack_completion_time)
    num_entries_needed = int(duration_to_upload_data_for.total_seconds() // NUM_VALUES_PER_ENTRY)
    logger.info(f'Time since stack completion: {(now - stack_completion_time).total_seconds()} seconds')
    put_all_data(asset_id, start_time, base_offset, num_entries_needed, properties_to_put_data)
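
# A minimal local smoke-test sketch, not part of the original source: it assumes AWS
# credentials and region are configured, that a deployed demo stack and turbine asset
# exist, and that the event fields mirror what the scheduled invocation sends. All
# values below are hypothetical placeholders.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    sample_event = {
        'stackId': 'SiteWiseDemoStack',                # hypothetical stack name or ARN
        'demoDurationDays': '7',                       # handler parses this with int()
        'turbineAssetId': 'replace-with-asset-id',     # hypothetical placeholder
        'PropertyGroup': 'RotationsPerMinute Torque',  # hypothetical space-delimited names
    }
    handler(sample_event, None)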