import boto3 import uuid import json import os import subprocess import re import random from time import sleep ############################################## # Cleanup Certificates and Things and created certificate and private key file def delete_thing_with_certi(thingName, certiId, certiArn): client.detach_thing_principal( thingName = thingName, principal = certiArn) client.update_certificate( certificateId =certiId, newStatus ='INACTIVE') client.delete_certificate(certificateId = certiId, forceDelete = True) client.delete_thing(thingName = thingName) os.remove(os.environ["DA_CERTI"]) os.remove(os.environ["DA_KEY"]) # Export the testing log and upload it to S3 bucket def process_logs(log_group, log_stream, thing_name): logs_client = boto3.client('logs') response = logs_client.get_log_events( logGroupName=log_group, logStreamName=log_stream ) log_file = "DA_Log_Javascript_" + thing_name + ".log" f = open(log_file, 'w') for event in response["events"]: f.write(event['message']) f.close() try: secrets_client = boto3.client( "secretsmanager", region_name=os.environ["AWS_DEFAULT_REGION"]) s3_bucket_name = secrets_client.get_secret_value(SecretId="ci/DeviceAdvisor/s3bucket")["SecretString"] s3.Bucket(s3_bucket_name).upload_file(log_file, log_file) print("[Device Advisor] Device Advisor Log file uploaded to "+ log_file) except Exception: print ("[Device Advisor] Error: could not store log in S3 bucket!") os.remove(log_file) print("[Device Advisor] Device Advisor Log file uploaded to "+ log_file) # Sleep for a random time between base and bax def sleep_with_backoff(base, max): sleep(random.randint(base,max)) ############################################## # Initialize variables # create aws clients try: client = boto3.client('iot') dataClient = boto3.client('iot-data') deviceAdvisor = boto3.client('iotdeviceadvisor') s3 = boto3.resource('s3') except Exception: print ("[Device Advisor] Error: could not create boto3 clients.") exit(-1) # const BACKOFF_BASE = 5 BACKOFF_MAX = 10 # 60 minutes divided by the maximum back-off = longest time a DA run can last with this script. MAXIMUM_CYCLE_COUNT = (3600 / BACKOFF_MAX) # Did Device Advisor fail a test? If so, this should be true did_at_least_one_test_fail = False # load test config f = open('deviceadvisor/script/DATestConfig.json') DATestConfig = json.load(f) f.close() # create an temporary certificate/key file path certificate_path = os.path.join(os.getcwd(), 'certificate.pem.crt') key_path = os.path.join(os.getcwd(), 'private.pem.key') # load environment variables requried for testing shadowProperty = os.environ['DA_SHADOW_PROPERTY'] shadowDefault = os.environ['DA_SHADOW_VALUE_DEFAULT'] ############################################## # install iot-device-sdk-v2 subprocess.run("npm install --unsafe-perm", shell = True) # test result test_result = {} ############################################## # Run device advisor for test_name in DATestConfig['tests']: ############################################## # create a test thing thing_name = "DATest_" + str(uuid.uuid4()) try: # create_thing_response: # { # 'thingName': 'string', # 'thingArn': 'string', # 'thingId': 'string' # } print("[Device Advisor] Info: Started to create thing...") create_thing_response = client.create_thing( thingName=thing_name ) os.environ["DA_THING_NAME"] = thing_name except Exception: print("[Device Advisor] Error: Failed to create thing: " + thing_name) exit(-1) ############################################## # create certificate and keys used for testing try: print("[Device Advisor]Info: Started to create certificate...") # create_cert_response: # { # 'certificateArn': 'string', # 'certificateId': 'string', # 'certificatePem': 'string', # 'keyPair': # { # 'PublicKey': 'string', # 'PrivateKey': 'string' # } # } create_cert_response = client.create_keys_and_certificate( setAsActive=True ) # write certificate to file f = open(certificate_path, "w") f.write(create_cert_response['certificatePem']) f.close() # write private key to file f = open(key_path, "w") f.write(create_cert_response['keyPair']['PrivateKey']) f.close() # setup environment variable os.environ["DA_CERTI"] = certificate_path os.environ["DA_KEY"] = key_path except Exception: try: client.delete_thing(thingName = thing_name) except Exception: print("[Device Advisor] Error: Could not delete thing.") print("[Device Advisor] Error: Failed to create certificate.") exit(-1) certificate_arn = create_cert_response['certificateArn'] certificate_id = create_cert_response['certificateId'] ############################################## # attach policy to certificate try: secrets_client = boto3.client( "secretsmanager", region_name=os.environ["AWS_DEFAULT_REGION"]) policy_name = secrets_client.get_secret_value(SecretId="ci/DeviceAdvisor/policy_name")["SecretString"] client.attach_policy ( policyName= policy_name, target = certificate_arn ) except Exception as ex: print (ex) delete_thing_with_certi(thing_name, certificate_id, certificate_arn ) print("[Device Advisor] Error: Failed to attach policy.") exit(-1) ############################################## # attach certification to thing try: print("[Device Advisor] Info: Attach certificate to test thing...") # attache the certificate to thing client.attach_thing_principal( thingName = thing_name, principal = create_cert_response['certificateArn'] ) except: delete_thing_with_certi(thing_name, certificate_id ,certificate_arn ) print("[Device Advisor] Error: Failed to attach certificate.") exit(-1) try: ###################################### # set default shadow, for shadow update, if the # shadow does not exists, update will fail print("[Device Advisor] Info: About to update shadow.") payload_shadow = json.dumps( { "state": { "desired": { shadowProperty: shadowDefault }, "reported": { shadowProperty: shadowDefault } } }) shadow_response = dataClient.update_thing_shadow( thingName = thing_name, payload = payload_shadow) get_shadow_response = dataClient.get_thing_shadow(thingName = thing_name) # make sure shadow is created before we go to next step print("[Device Advisor] Info: About to wait for shadow update.") while(get_shadow_response is None): get_shadow_response = dataClient.get_thing_shadow(thingName = thing_name) # start device advisor test # test_start_response # { # 'suiteRunId': 'string', # 'suiteRunArn': 'string', # 'createdAt': datetime(2015, 1, 1) # } print("[Device Advisor] Info: Start device advisor test: " + test_name) sleep_with_backoff(BACKOFF_BASE, BACKOFF_MAX) test_start_response = deviceAdvisor.start_suite_run( suiteDefinitionId=DATestConfig['test_suite_ids'][test_name], suiteRunConfiguration={ 'primaryDevice': { 'thingArn': create_thing_response['thingArn'], }, 'parallelRun': True }) # get DA endpoint print("[Device Advisor] Info: Getting Device Advisor endpoint.") endpoint_response = deviceAdvisor.get_endpoint( thingArn = create_thing_response['thingArn'] ) os.environ['DA_ENDPOINT'] = endpoint_response['endpoint'] working_dir = os.getcwd() exe_path = os.path.join("deviceadvisor/tests",DATestConfig['test_exe_path'][test_name]) os.chdir(exe_path) subprocess.run("npm install --unsafe-perm", shell = True) cycle_number = 0 while True: cycle_number += 1 if (cycle_number >= MAXIMUM_CYCLE_COUNT): print(f"[Device Advisor] Error: {cycle_number} of cycles lasting {BACKOFF_BASE} to {BACKOFF_MAX} seconds have passed.") raise Exception(f"ERROR - {cycle_number} of cycles lasting {BACKOFF_BASE} to {BACKOFF_MAX} seconds have passed.") # Add backoff to avoid TooManyRequestsException sleep_with_backoff(BACKOFF_BASE, BACKOFF_MAX) print ("[Device Advisor] Info: About to get Device Advisor suite run.") test_result_responds = deviceAdvisor.get_suite_run( suiteDefinitionId=DATestConfig['test_suite_ids'][test_name], suiteRunId=test_start_response['suiteRunId'] ) # If the status is PENDING or the responds does not loaded, the test suite is still loading if (test_result_responds['status'] == 'PENDING' or len(test_result_responds['testResult']['groups']) == 0 or # test group has not been loaded len(test_result_responds['testResult']['groups'][0]['tests']) == 0 or #test case has not been loaded test_result_responds['testResult']['groups'][0]['tests'][0]['status'] == 'PENDING'): continue # Start to run the test sample after the status turns into RUNNING elif (test_result_responds['status'] == 'RUNNING' and test_result_responds['testResult']['groups'][0]['tests'][0]['status'] == 'RUNNING'): print ("[Device Advisor] Info: About to get start Device Advisor companion test application.") file_path = 'dist/'+ DATestConfig['test_exe_path'][test_name] + '/index.js' subprocess.run('node ' + file_path, shell = True) # If the test finalizing then store the test result elif (test_result_responds['status'] != 'RUNNING'): test_result[test_name] = test_result_responds['status'] # If the test failed, upload the logs to S3 before clean up if(test_result[test_name] != "PASS"): print ("[Device Advisor] Info: About to upload log to S3.") log_url = test_result_responds['testResult']['groups'][0]['tests'][0]['logUrl'] group_string = re.search('group=(.*);', log_url) log_group = group_string.group(1) stream_string = re.search('stream=(.*)', log_url) log_stream = stream_string.group(1) process_logs(log_group, log_stream, thing_name) delete_thing_with_certi(thing_name, certificate_id ,certificate_arn ) break os.chdir(working_dir) except Exception: delete_thing_with_certi(thing_name, certificate_id ,certificate_arn ) print("[Device Advisor]Error: Failed to test: "+ test_name) did_at_least_one_test_fail = True sleep_with_backoff(BACKOFF_BASE, BACKOFF_MAX) ############################################## # print result and cleanup things print(test_result) failed = False for test in test_result: if(test_result[test] != "PASS" and test_result[test] != "PASS_WITH_WARNINGS"): print("[Device Advisor]Error: Test \"" + test + "\" Failed with status:" + test_result[test]) failed = True if failed: # if the test failed, we dont clean the Thing so that we can track the error exit(-1) if (did_at_least_one_test_fail == True): print("[Device Advisor] At least one test failed!") exit(-1) exit(0)