import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql.functions import *
from awsglue.job import Job

## @params: [JOB_NAME, GLUE_DB_NAME, GLUE_PLAYER_LOG_TABLE, PLAYER_LOG_BUCKET_PROCESSED_PATH]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'GLUE_DB_NAME', 'GLUE_PLAYER_LOG_TABLE', 'PLAYER_LOG_BUCKET_PROCESSED_PATH'])

# Create the Spark and Glue contexts and initialize the job
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("Glue Database:", args['GLUE_DB_NAME'])

# Create a DynamicFrame from the 'player_logs' table in the Glue Data Catalog
player_logs = glueContext.create_dynamic_frame.from_catalog(
    database=args['GLUE_DB_NAME'],
    table_name=args['GLUE_PLAYER_LOG_TABLE'])
player_logs.printSchema()

# Print out information about this data
print("Count:", player_logs.count())

# Keep only the fields we need, and resolve 'at' and 'duration' (which the
# crawler may have typed ambiguously) to doubles
player_logs = player_logs.select_fields(
    ['MetricType', 'TimeStamp', 'at', 'duration', 'year', 'month', 'day',
     'video_id', 'user_id']
).resolveChoice(specs=[('at', 'cast:double'), ('duration', 'cast:double')])
player_logs.toDF().show()

# Keep only PAUSE events, which mark where a viewer stopped watching
stop_events = Filter.apply(frame=player_logs, f=lambda x: x['MetricType'] in ['PAUSE'])
stop_events_df = stop_events.toDF()

# Percentage of the video watched before the pause
stop_events_df = stop_events_df.withColumn(
    'watch_percent',
    (round(stop_events_df['at'], 0) / stop_events_df['duration']) * 100)

# Derive a yyyyMMdd partition key from TimeStamp (epoch milliseconds)
stop_events_df = stop_events_df.withColumn(
    'partition_key',
    date_format(from_unixtime(stop_events_df['TimeStamp'] / 1000), 'yyyyMMdd'))
stop_events_df.show()

# Repartition on the partition key so each date lands in a single output file,
# then append the results to S3 as date-partitioned Parquet
stop_events_df.repartition('partition_key') \
    .write.mode('append') \
    .parquet(args['PLAYER_LOG_BUCKET_PROCESSED_PATH'], partitionBy=['partition_key'])

job.commit()
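
# --- Illustration only; not part of the job ---
# A minimal sketch of how a downstream reader could load the partitioned
# output written above. The '20240101' partition value is a hypothetical
# example, not something produced by this script; uncomment to try it while
# the Spark session is still alive.
#
# processed_df = glueContext.spark_session.read.parquet(args['PLAYER_LOG_BUCKET_PROCESSED_PATH'])
# processed_df.filter(processed_df['partition_key'] == '20240101').show()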