import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import time
import pg8000
import boto3
import re
from decimal import Decimal

def runQuery(query,tableName,job_configs):
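    """Run a query on Redshift via UNLOAD and return the result as a Spark DataFrame.

    The results are unloaded to a unique prefix under the job's temporary S3 directory,
    and the result schema is obtained by issuing a zero-row version of the same query
    through the spark-redshift connector.
    """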
    # Parse the host, port, and database name out of the Redshift JDBC URL.
    hostname=re.search(r'jdbc:redshift://(.+?):.*',job_configs['jdbcURL']).group(1)
    port=re.search(r'jdbc:redshift://.*:(.+?)/.*',job_configs['jdbcURL']).group(1)
    db=re.search(r'jdbc:redshift://.*/(.+?)\?.*',job_configs['jdbcURL']).group(1)
    conn=pg8000.connect(user=job_configs['user'], host=hostname, unix_sock=None, port=int(port), database=db, password=job_configs['password'], ssl=False, timeout=None, application_name=None)
    # UNLOAD the query results to a unique temporary S3 prefix using the job's temporary credentials.
    unloadPath="{}/unload_{}_{}/".format(job_configs['TempDir'],tableName,time.time())
    unloadQuery="unload ('{}') to '{}' credentials 'aws_access_key_id={};aws_secret_access_key={};token={}'  delimiter '\001'  escape allowoverwrite".format(query.replace("'","\\'"),unloadPath,job_configs['aws_access_key_id'],job_configs['aws_secret_access_key'],job_configs['aws_session_token'])
    cursor = conn.cursor()
    cursor.execute(unloadQuery)
    cursor.close()
    conn.close()
    # Issue a zero-row version of the same query through the spark-redshift
    # connector purely to obtain the result schema.
    zeroRowQuery=buildZeroRowQuery(query)
    spark=job_configs['spark_session']
    schemaDF=spark.read.format("com.databricks.spark.redshift") \
        .option("url",job_configs['jdbcURL']) \
        .option("query",zeroRowQuery) \
        .option("tempdir", job_configs['TempDir']) \
        .option("temporary_aws_access_key_id", job_configs['aws_access_key_id']) \
        .option("temporary_aws_secret_access_key", job_configs['aws_secret_access_key']) \
        .option("temporary_aws_session_token", job_configs['aws_session_token']).load()
    schemaDF.printSchema()
    # Read the unloaded delimited files back using the schema obtained above.
    resultDF=spark.read.schema(schemaDF.schema).csv(unloadPath,sep="\001")
    return resultDF

def getMaxValue(resultDF, columnName,job_configs):
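    """Return the maximum value of columnName in resultDF."""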
    # collect()[0][0] extracts the scalar max from the single-row, single-column aggregate result.
    maxValue=resultDF.agg({columnName: "max"}).collect()[0][0]
    return maxValue

def updateLastProcessedTSValue(tableName,tsValue,job_configs):
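    """Persist the latest processed timestamp for tableName in the DynamoDB tracking table."""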
    dynamodb = boto3.resource('dynamodb',region_name=job_configs['REGION'])
    table = dynamodb.Table('rs_querylogs_last_processed_ts')
    response_update = table.update_item(
        Key={
            'table_name': tableName
        },
        UpdateExpression="set last_processed_ts = :tsValue, last_update_timestamp = :curTime",
        ExpressionAttributeValues={
            ':tsValue': str(tsValue),
            ':curTime': Decimal(str(round(time.time()*1000)))
        },
        ReturnValues="UPDATED_NEW"
    )
    return

def getLastProcessedTSValue(trackingEntry,job_configs):
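    """Return the last processed timestamp for trackingEntry from DynamoDB.

    If no entry exists yet, seed the tracking table with an initial timestamp and return it.
    """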
    dynamodb = boto3.resource('dynamodb',region_name=job_configs['REGION'])
    table = dynamodb.Table('rs_querylogs_last_processed_ts')
    response = table.get_item(
        Key={
            'table_name': trackingEntry
        }
    )
    if 'Item' not in response:
        initialTSValue='2017-01-01 01:00:00.0000'
        table.put_item(Item={'table_name':trackingEntry,'last_processed_ts':initialTSValue})
        return initialTSValue
    lastProcessedTSValue = response['Item']['last_processed_ts']
    return lastProcessedTSValue

def saveToS3(dataframe,s3Prefix,tableName,partitionColumns,job_configs):
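    """Append the DataFrame as Parquet to s3Prefix/tableName, partitioned by partitionColumns."""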
    dataframe.write.partitionBy(partitionColumns).mode("append").save(path="{}/{}/".format(s3Prefix, tableName), format="parquet")
    return

def buildZeroRowQuery(query):
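    """Return the query with a 'limit 0' clause appended or substituted so it returns no rows."""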

    # Append or rewrite the LIMIT clause so the query returns no rows; only the
    # result schema is needed. Match case-insensitively rather than lowercasing
    # the whole query, which would also alter string literals inside it.
    if re.search(r'limit \d+$', query, re.IGNORECASE) is None:
        query=query + " limit 0"
    else:
        query=re.sub(r"limit \d+$", "limit 0", query, flags=re.IGNORECASE)
    return query

def getJobConfigurations(clusterId,job_configs):
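    """Read the per-cluster Redshift credentials and the global S3 prefix from SSM Parameter Store."""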
    client = boto3.client('ssm',region_name=job_configs['REGION'])
    configs={}
    configs["user"]=client.get_parameter(Name='redshift_query_logs.'+clusterId+".user")["Parameter"]["Value"]
    configs["password"]=client.get_parameter(Name='redshift_query_logs.'+clusterId+".password",WithDecryption=True)["Parameter"]["Value"]
    configs["s3_prefix"]=client.get_parameter(Name='redshift_query_logs.global.s3_prefix')["Parameter"]["Value"]
    return configs