# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this
# software and associated documentation files (the "Software"), to deal in the Software
# without restriction, including without limitation the rights to use, copy, modify,
# merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

import sys

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

# Resolve the job name passed in by AWS Glue and initialize the job context
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Step 0 - Read the source data from the Glue Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "[DATABASE_NAME]", table_name = "[TABLE_NAME]", transformation_ctx = "datasource0")

# Step 1 - Apply the column mapping
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [[COLUMN_MAPPINGS]], transformation_ctx = "applymapping1")

# Step 2 - Convert to a DataFrame and repartition to a single partition so the output is written as one file
repartition2 = applymapping1.toDF().repartition(1)

# Step 3 - Convert back to a DynamicFrame
convert3 = DynamicFrame.fromDF(repartition2, glueContext, "nested")

# Step 4 - Write the output data to S3 as CSV
datasink4 = glueContext.write_dynamic_frame.from_options(frame = convert3, connection_type = "s3", connection_options = {"path": "[OUTPUT_PATH]"}, format = "csv", transformation_ctx = "datasink4")

# Step 5 - Commit the job
job.commit()
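
# ---------------------------------------------------------------------------
# Illustrative sketch only: the [COLUMN_MAPPINGS] placeholder above expects a
# list of (source_column, source_type, target_column, target_type) tuples, as
# used by ApplyMapping.apply. The column names and types below are hypothetical
# examples, not part of this template, and should be replaced with the columns
# of your own catalog table.
#
# example_mappings = [
#     ("id", "long", "id", "long"),
#     ("name", "string", "name", "string"),
#     ("created_at", "string", "created_at", "timestamp"),
# ]
# ---------------------------------------------------------------------------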