B `)@sddlmZddlmZddlmZddlmZddlm Z ddl m Z ddl m Z mZdd lmZdd lmZdd lmZGd d d e jZdS))aws_ec2)aws_s3)aws_s3_deployment)aws_kms)core)listdir) emr_profileemr_code)TaskInstanceGroupConfiguration)emr_launch_function)InstanceMarketTypec seZdZdZejeedfdd ZddZ eee eegeeeee egd dd Z d d Z d d Z d ddZddZeejdddZeedddZeejdddZeedddZeedddZeedddZZS)!EMRClusterDefinitionz? Stack to define a Standard EMR Cluster Configuration: )scopeidconfigc stj||f|x |D]\}}t|||qWtj|ddd}|jdtjjd|jdtjj d|j tj j dj d}|j|_tj|d d |jtjjd } tj|d d |jtjjd } tj|dd|jtjjd |_tj|dd|jtjjd |_|j|| | d|_| |_|j|j|jj|jjgdtj|d| dtj dgd} g} x*t!dD]} | "d| j#d| qbW|j$||j%|j&|j'|j(|j)|j*|j+|j,|j-|j.| d |_/|j0|j1|j2dd|_3|4dS)NZMyEMRVpc)max_azsz s3-gateway)servicezdynamo-gateway) subnet_typerZemr_logsz emr-logs-) bucket_name encryptionZ emr_artifactszemr-artifacts-Z emr_outputzsynchronized-rosbag-topics-Zemr_output_sceneszdetected-scenes-)vpc log_bucketartifact_bucket)input_data_bucket_arnsoutput_data_bucket_arnsbootstrap_actionsz,infrastructure/emr_launch/bootstrap_actions/)rZdestination_bucketZdestination_key_prefixsourceszs3://z/bootstrap_actions/) subnetmaster_instance_typecore_instance_typecore_instance_count release_label applications configurationcore_instance_markettask_instance_typetask_instance_markettask_instance_countbootstrap_action_script_pathsT)rcluster_configurationdefault_fail_if_cluster_running)5super__init__itemssetattrec2Vpcadd_gateway_endpointGatewayVpcEndpointAwsServiceS3DYNAMODBselect_subnets SubnetTypePRIVATEsubnets CLUSTER_NAME _cluster_names3BucketaccountBucketEncryption S3_MANAGEDsynchronized_bucket scenes_bucketinit_emr_profile _emr_profilerauthorize_buckets INPUT_BUCKETS bucket_arns3dZBucketDeploymentSourceassetrappendremr_resource_configMASTER_INSTANCE_TYPECORE_INSTANCE_TYPECORE_INSTANCE_COUNT RELEASE_LABEL APPLICATIONS CONFIGURATIONCORE_INSTANCE_MARKETTASK_INSTANCE_TYPETASK_INSTANCE_MARKETTASK_INSTANCE_COUNT_cluster_configurationlaunch_function_configrr+_launch_functionoutputs)selfrrrkwargskvrrrrZbootstrap_coderf) __class__i/Users/soucyk/AmazonCode/Rosbag-topic-extraction-pipeline/infrastructure/emr_launch/cluster_definition.pyr.s            zEMRClusterDefinition.__init__cCs(tj|d|jdtj|d|jddS)za Extend the values here to add additional outputs of the module :return: ZLaunchFunctionARN)valueZInstanceRoleNameN)r CfnOutputlaunch_function_arninstance_role_name)r\rbrbrcr[s zEMRClusterDefinition.outputs) r r!r"r#r$r%r&r'r(r)r*c Cs|} |dkrtj}n|dkr$tj}nt|d| dkrBtj}n| dkrRtj}nt| d| rg}xNt| D]<\}}d|kst|dtjd||d}||qrWnd}t |d|j d |||||| ||| || |d S) NSPOT ON_DEMANDz% must be one of 'SPOT' or 'ON_DEMAND'zs3://z2 must be a full s3 path like s3://bucket/script.shZ bootstrap_)namepathZClusterResourceConfigurationz -resources) Zconfiguration_namerr r!r"r#configurationsr$r&r'r(r)r) r rhri Exception enumerateAssertionErrorr ZEMRBootstrapActionrLr r<)r\rr r!r"r#r$r%r&r'r(r)r*Zdefault_configurationsZ core_marketZ task_marketridxZ script_pathZbootstrap_actionrbrbrcrMsN    z(EMRClusterDefinition.emr_resource_configcCs|j|||ggd}|S)N)r logs_bucketartifacts_bucketinput_kms_keysoutput_kms_keys)security_profile_config)r\rrrprofilerbrbrcrDsz%EMRClusterDefinition.init_emr_profilecstfddt|D}fddt|D}x |D]\}}j|q6Wx |D]\}}j|qXWdS)Ncs*i|]"\}}tjjd||d|qS)z In-bucket)rH)r=r>from_bucket_arn).0ib_name)r\rbrc sz:EMRClusterDefinition.authorize_buckets..cs*i|]"\}}tjjd||d|qS)z Out-bucket)rH)r=r>rw)rxryrz)r\rbrcr{s)rnr/rEZauthorize_input_bucketZauthorize_output_bucket)r\rrZinput_data_bucketsZoutput_data_bucketsrzbucketrb)r\rcrFs    z&EMRClusterDefinition.authorize_bucketsNc Cs|jd}tj||||||d}|r`x:t|D].\}} tjj||d|| d} || q.W|rx:t|D].\}} tjj||d|| d} || qnW|j j |j j t jd|S)Nz -security) profile_namerrqrrZ _input_key_)rkey_arnZ _output_key_i$)r<r EMRProfilernkmsKey from_key_arnZauthorize_input_keyZauthorize_output_keysecurity_groupsZ service_groupadd_ingress_ruleZ master_groupr1Porttcp) r\rrqrrrsrtr}rvryr^kms_keyrbrbrcrus* z,EMRClusterDefinition.security_profile_configcCs6tj||j|jd|||j||jdtjdddgd S)Nz launch-fndefaultGroupZAWSDemo)keyrd) namespaceZlaunch_function_namerr+ cluster_namer,Z allowed_cluster_config_overridesZ cluster_tags)r EMRLaunchFunctionr<Zoverride_interfacesrTag)r\rr+r,rbrbrcrYsz+EMRClusterDefinition.launch_function_config)returncCs|jS)N)rE)r\rbrbrcr.sz EMRClusterDefinition.emr_profilecCs|jS)N)rX)r\rbrbrcr+2sz*EMRClusterDefinition.cluster_configurationcCs|jS)N)rZ)r\rbrbrclaunch_function6sz$EMRClusterDefinition.launch_functioncCs |jjjS)N)rZ state_machinestate_machine_arn)r\rbrbrcrf:sz(EMRClusterDefinition.launch_function_arncCs |jjjjS)N)rE_roles instance_role role_name)r\rbrbrcrg>sz'EMRClusterDefinition.instance_role_namecCs |jjjjS)N)rErrrole_arn)r\rbrbrcinstance_role_arnBsz&EMRClusterDefinition.instance_role_arn)NN)__name__ __module__ __qualname____doc__r Constructstrdictr.r[intrMrDrFrurYpropertyrrr r+r rrrfrgr __classcell__rbrb)rarcrs>k8  !rN)aws_cdkrr1rr=rrIrrrosrZ(aws_emr_launch.constructs.emr_constructsrr Zinstance_group_configr Z(aws_emr_launch.constructs.step_functionsr Z>aws_emr_launch.constructs.emr_constructs.cluster_configurationr Stackrrbrbrbrcs