""" -*- coding: utf-8 -*- # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # Permission is hereby granted, free of charge, to any person obtaining a copy of # this software and associated documentation files (the "Software"), to deal in # the Software without restriction, including without limitation the rights to # use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of # the Software, and to permit persons to whom the Software is furnished to do so. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS # FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR # COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER # IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ import boto3 from airflow import DAG from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator from airflow.utils.dates import days_ago # Read paramters ECS_PRIVATE_SUBNETS = os.environ.get('AIRFLOW__CDK__SUBNET_ID') CLUSTER_NAME = os.environ.get('AIRFLOW__CDK__CLUSTER_NAME') TASK_DEFINITION = os.environ.get('AIRFLOW__CDK__STACK_NAME') LOG_GROUP = "/aws/mwaaa/blueprint" LOG_PREFIX = "spark" ECS_SECURITY_GROUP = str(SSM_CLIENT.get_parameter(Name='/ecs/mwaa/security-group', \ WithDecryption=True)['Parameter']['Value']) # Run Spark Processing task with DAG(dag_id="run_ecs_task_dag", schedule_interval=None, \ catchup=False, start_date=days_ago(1)) as dag: SPARK_PROCESSING_TASK = EcsRunTaskOperator( task_id="SPARK_PROCESSING_TASK", task_definition=TASK_DEFINITION, cluster=CLUSTER_NAME, launch_type='FARGATE', aws_conn_id="aws_ecs", network_configuration={ "awsvpcConfiguration": { "securityGroups": [ECS_SECURITY_GROUP], "subnets": list(ECS_PRIVATE_SUBNETS.split(",")), }, }, overrides={ "containerOverrides": [ ], }, awslogs_group=LOG_GROUP, awslogs_stream_prefix=LOG_PREFIX, wait_for_completion=True)