# Copyright Amazon.com, Inc. and its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT
#
# Licensed under the MIT License. See the LICENSE accompanying this file
# for the specific language governing permissions and limitations under
# the License.

import os
import json

from datalake_library.commons import init_logger
from datalake_library.configuration.resource_configs import SQSConfiguration, StateMachineConfiguration
from datalake_library.interfaces.states_interface import StatesInterface
from datalake_library.interfaces.sqs_interface import SQSInterface

logger = init_logger(__name__)


def _team_name():
    """Return the team name configured in the process environment.

    ``TEAM`` is read first. The misspelled ``TEAN`` key that this module
    historically read is kept as a fallback so that environments configured
    against the old spelling keep working.

    Raises:
        KeyError: if neither ``TEAM`` nor ``TEAN`` is set.
    """
    try:
        return os.environ['TEAM']
    except KeyError:
        return os.environ['TEAN']


def lambda_handler(event, context):
    """Redrive messages from the post-stage DLQ into the post-stage state machine.

    Messages are fetched from the queue named by
    ``SQSConfiguration.get_post_stage_dlq_name``. For each one, the body is
    handed to ``StatesInterface().run_state_machine`` together with
    ``StateMachineConfiguration.get_post_stage_state_machine_arn``, and the
    message is deleted afterwards.

    Args:
        event: Invocation payload; must contain the key ``'dataset'``.
        context: Lambda context object (unused).

    Returns:
        None, both when the queue yields no messages and after all received
        messages have been processed.

    Raises:
        Exception: any error raised while reading configuration, receiving,
            starting the state machine or deleting is logged with its
            traceback and re-raised unchanged.
    """
    try:
        team = _team_name()
        pipeline = os.environ['PIPELINE']
        dataset = event['dataset']

        state_config = StateMachineConfiguration(team, pipeline)
        sqs_config = SQSConfiguration(team, pipeline, dataset)
        dlq_name = sqs_config.get_post_stage_dlq_name
        dlq_interface = SQSInterface(dlq_name)

        # NOTE(review): the argument 1 is presumably the maximum number of
        # messages to fetch per call -- confirm against SQSInterface.
        messages = dlq_interface.receive_messages(1)
        if not messages:
            logger.info('No messages found in {}'.format(dlq_name))
            return

        logger.info('Received {} messages'.format(len(messages)))
        for message in messages:
            logger.info('Starting State Machine Execution')
            body = message.body
            # Compute the payload for every message. Previously a non-string
            # body left the payload unbound (first message) or silently
            # re-sent the previous message's payload (later messages).
            payload = json.loads(body) if isinstance(body, str) else body
            StatesInterface().run_state_machine(
                state_config.get_post_stage_state_machine_arn, payload)
            # Delete only after the execution was started, so a failure above
            # leaves the message undeleted.
            message.delete()
            logger.info('Delete message succeeded')
    except Exception:
        logger.error("Fatal error", exc_info=True)
        # Bare raise re-raises the active exception with its traceback intact.
        raise