import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import time
import pg8000
import boto3
import re
from decimal import Decimal


def runQuery(query, tableName, job_configs):
    """Run a query against Redshift via UNLOAD and return the result as a Spark DataFrame."""
    # Parse host, port, and database out of the JDBC URL,
    # e.g. jdbc:redshift://host:5439/dbname?user=...
    hostname = re.search(r'jdbc:redshift://(.+?):.*', job_configs['jdbcURL']).group(1)
    port = re.search(r'jdbc:redshift://.*:(.+?)/.*', job_configs['jdbcURL']).group(1)
    db = re.search(r'jdbc:redshift://.*/(.+?)\?.*', job_configs['jdbcURL']).group(1)

    conn = pg8000.connect(user=job_configs['user'], host=hostname, unix_sock=None,
                          port=int(port), database=db, password=job_configs['password'],
                          ssl=False, timeout=None, application_name=None)

    # UNLOAD the query result to a temporary S3 location, using SOH (\001) as the delimiter.
    unloadPath = "{}/unload_{}_{}/".format(job_configs['TempDir'], tableName, time.time())
    unloadQuery = ("unload ('{}') to '{}' "
                   "credentials 'aws_access_key_id={};aws_secret_access_key={};token={}' "
                   "delimiter '\001' escape allowoverwrite").format(
        query.replace("'", "\\'"), unloadPath,
        job_configs['aws_access_key_id'], job_configs['aws_secret_access_key'],
        job_configs['aws_session_token'])
    cursor = conn.cursor()
    cursor.execute(unloadQuery)

    # Fetch only the schema (LIMIT 0) through the spark-redshift connector,
    # then apply that schema when reading the unloaded files from S3.
    zeroRowQuery = buildZeroRowQuery(query)
    spark = job_configs['spark_session']
    schemaDF = spark.read.format("com.databricks.spark.redshift") \
        .option("url", job_configs['jdbcURL']) \
        .option("query", zeroRowQuery) \
        .option("tempdir", job_configs['TempDir']) \
        .option("temporary_aws_access_key_id", job_configs['aws_access_key_id']) \
        .option("temporary_aws_secret_access_key", job_configs['aws_secret_access_key']) \
        .option("temporary_aws_session_token", job_configs['aws_session_token']) \
        .load()
    schemaDF.printSchema()

    resultDF = spark.read.schema(schemaDF.schema).csv(unloadPath, sep="\001", inferSchema=True)
    return resultDF


def getMaxValue(resultDF, columnName, job_configs):
    """Return the aggregation Row holding the maximum value of columnName in resultDF."""
    maxValue = resultDF.agg({columnName: "max"}).collect()[0]
    return maxValue


def updateLastProcessedTSValue(tableName, tsValue, job_configs):
    """Record the most recently processed timestamp for tableName in DynamoDB."""
    dynamodb = boto3.resource('dynamodb', region_name=job_configs['REGION'])
    table = dynamodb.Table('rs_querylogs_last_processed_ts')
    response_update = table.update_item(
        Key={
            'table_name': tableName
        },
        UpdateExpression="set last_processed_ts = :tsValue, last_update_timestmap = :curTime",
        ExpressionAttributeValues={
            ':tsValue': str(tsValue),
            ':curTime': Decimal(str(round(time.time() * 1000)))
        },
        ReturnValues="UPDATED_NEW"
    )
    return


def getLastProcessedTSValue(trackingEntry, job_configs):
    """Fetch the last processed timestamp for a table; seed a default entry if none exists."""
    dynamodb = boto3.resource('dynamodb', region_name=job_configs['REGION'])
    table = dynamodb.Table('rs_querylogs_last_processed_ts')
    response = table.get_item(
        Key={
            'table_name': trackingEntry
        }
    )
    if 'Item' not in response:
        # First run for this table: store and return a default starting timestamp.
        initialTSValue = '2017-01-01 01:00:00.0000'
        table.put_item(Item={'table_name': trackingEntry, 'last_processed_ts': initialTSValue})
        return initialTSValue
    lastProcessedTSValue = response['Item']['last_processed_ts']
    return lastProcessedTSValue


def saveToS3(dataframe, s3Prefix, tableName, partitionColumns, job_configs):
    """Append the DataFrame to S3 as partitioned Parquet under s3Prefix/tableName/."""
    dataframe.write.partitionBy(partitionColumns).mode("append").save(
        path="{}/{}/".format(s3Prefix, tableName), format="parquet")
    return


def buildZeroRowQuery(query):
    """Rewrite the query to return no rows (LIMIT 0); used only to obtain the schema."""
    query = query.lower()
    limitClause = re.search(r'limit \d+$', query)
    if limitClause is None:
        query = query + " limit 0"
    else:
        query = re.sub(r"limit \d+$", "limit 0", query)
    return query


def getJobConfigurations(clusterId, job_configs):
    """Read per-cluster Redshift credentials and the global S3 prefix from SSM Parameter Store."""
    client = boto3.client('ssm', region_name=job_configs['REGION'])
    configs = {}
    configs["user"] = client.get_parameter(
        Name='redshift_query_logs.' + clusterId + '.user')["Parameter"]["Value"]
    configs["password"] = client.get_parameter(
        Name='redshift_query_logs.' + clusterId + '.password',
        WithDecryption=True)["Parameter"]["Value"]
    configs["s3_prefix"] = client.get_parameter(
        Name='redshift_query_logs.global.s3_prefix')["Parameter"]["Value"]
    return configs