# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
#
# MIE operator monitor: polls an AWS Elemental MediaConvert job that a prior
# operator started, and maps the job status onto the MIE workflow status.

import os
import json

import boto3
# Alias the botocore module so the module-level name `config` can keep its
# historical meaning (the built Config object) without shadowing the import.
from botocore import config as botocore_config
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
from MediaInsightsEngineLambdaHelper import MediaInsightsOperationHelper
from MediaInsightsEngineLambdaHelper import MasExecutionError

patch_all()

region = os.environ["AWS_REGION"]
mie_config = json.loads(os.environ['botoConfig'])
config = botocore_config.Config(**mie_config)
mediaconvert = boto3.client("mediaconvert", config=config, region_name=region)

# Lazily-created client bound to the account-specific MediaConvert endpoint;
# cached across warm Lambda invocations.
media_convert_client = None

# MediaConvert job statuses that mean "still running". 'SUBMITTED' and
# 'PROGRESSING' are the documented pre-completion statuses; 'IN_PROGRESS' is
# kept from the original code for safety even though it is not a documented
# GetJob status value.
_EXECUTING_STATUSES = ('SUBMITTED', 'IN_PROGRESS', 'PROGRESSING')


def get_mediaconvert_client():
    """Return a cached MediaConvert client for the MEDIACONVERT_ENDPOINT URL.

    MediaConvert requires an account-specific endpoint; the generic
    module-level client cannot be used for job operations.
    """
    global media_convert_client
    if media_convert_client is None:
        mediaconvert_endpoint = os.environ["MEDIACONVERT_ENDPOINT"]
        media_convert_client = boto3.client(
            "mediaconvert",
            region_name=region,
            endpoint_url=mediaconvert_endpoint)
    return media_convert_client


def lambda_handler(event, _context):
    """Check a MediaConvert job's status and update the MIE workflow.

    Expects `MediaconvertJobId` and `MediaconvertInputFile` in the operator
    metadata. Returns the operator output object with workflow status
    "Executing" (job still running) or "Complete" (job finished, with the
    produced audio file registered as a media object). Raises
    MasExecutionError on missing metadata, a failed GetJob call, or an
    unexpected job status.
    """
    print("We got the following event:\n", event)
    operator_object = MediaInsightsOperationHelper(event)
    try:
        job_id = operator_object.metadata["MediaconvertJobId"]
        workflow_id = operator_object.workflow_execution_id
        input_file = operator_object.metadata["MediaconvertInputFile"]
    except KeyError as e:
        operator_object.update_workflow_status("Error")
        operator_object.add_workflow_metadata(
            MediaconvertError="Missing a required metadata key {e}".format(e=e))
        raise MasExecutionError(operator_object.return_output_object())

    try:
        asset_id = operator_object.asset_id
    except KeyError:
        print("No asset_id in this workflow")
        asset_id = ''

    customer_mediaconvert = get_mediaconvert_client()
    try:
        response = customer_mediaconvert.get_job(Id=job_id)
    except Exception as e:
        print("Exception:\n", e)
        operator_object.update_workflow_status("Error")
        # str(e): the raw exception object is not JSON-serializable when the
        # workflow metadata is persisted downstream.
        operator_object.add_workflow_metadata(MediaconvertError=str(e),
                                              MediaconvertJobId=job_id)
        raise MasExecutionError(operator_object.return_output_object())

    job = response["Job"]
    status = job["Status"]

    if status in _EXECUTING_STATUSES:
        operator_object.update_workflow_status("Executing")
        operator_object.add_workflow_metadata(MediaconvertJobId=job_id,
                                              MediaconvertInputFile=input_file,
                                              AssetId=asset_id,
                                              WorkflowExecutionId=workflow_id)
        return operator_object.return_output_object()

    if status == 'COMPLETE':
        # TODO: Store job details as metadata in dataplane
        # TODO: Get output uri from dataplane
        output_group = job["Settings"]["OutputGroups"][0]
        output_uri = output_group["OutputGroupSettings"]["FileGroupSettings"][
            "Destination"]
        extension = output_group["Outputs"][0]["Extension"]
        modifier = output_group["Outputs"][0]["NameModifier"]
        # s3://bucket/prefix/... -> bucket is the 3rd '/'-delimited token,
        # the folder is everything between bucket and the final path segment.
        bucket = output_uri.split("/")[2]
        folder = "/".join(output_uri.split("/")[3:-1])
        # Output object name = input basename + NameModifier + extension.
        file_name = os.path.splitext(input_file)[0].split("/")[-1]
        key = folder + "/" + file_name + modifier + "." + extension
        operator_object.add_media_object("Audio", bucket, key)
        operator_object.add_workflow_metadata(MediaconvertJobId=job_id)
        operator_object.update_workflow_status("Complete")
        return operator_object.return_output_object()

    # Any other status (ERROR, CANCELED, or unknown) fails the workflow.
    operator_object.update_workflow_status("Error")
    operator_object.add_workflow_metadata(
        MediaconvertError="Unhandled exception, unable to get status from mediaconvert: {response}".format(
            response=response),
        MediaconvertJobId=job_id)
    raise MasExecutionError(operator_object.return_output_object())