from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType,StringType
from pyspark.sql.functions import *
from pyspark import SparkContext
import sys

print (sys.argv)
s3srcpath = sys.argv[1]
s3datapath = sys.argv[2]

s3tgtpath = s3srcpath + "curatedzone"

def main():

        sc =SparkContext()
        sc.setLocalProperty("callSite.short", "AWS EMR on EKS Sample Job")
        spark = SparkSession(sc)
        schema = StructType([
        StructField("id", StringType(), True),
        StructField("year_date", StringType(), True),
        StructField("element", StringType(), True),
        StructField("data_value", StringType(), True),
        StructField("m_flag", StringType(), True),
        StructField("q_flag", StringType(), True),
        StructField("s_flag", StringType(), True),
        StructField("obs_time", StringType(), True)
        ])
        dfreadings = spark.read.csv(s3srcpath + f"{s3datapath}/weather_station_data.csv.gz", sep=',',schema =schema)



        dfstations = spark.read.csv(s3srcpath + f"{s3datapath}/station_lookup.txt")
        dfstationssplit = dfstations.select(
        dfstations._c0.substr(1,11).alias('id'),
        dfstations._c0.substr(1,2).alias('countrycode'),
        dfstations._c0.substr(14,9).alias('lat'),
        dfstations._c0.substr(22,9).alias('long'),
        dfstations._c0.substr(39,2).alias('state')
        )

        dfstationssplit.createOrReplaceTempView("stations_sparkvw")
        dfreadings.createOrReplaceTempView("readings_sparkvw")

        dfprocessed = spark.sql("select a.* ,b.countrycode , b.state from readings_sparkvw a, stations_sparkvw b where a.id = b.id and substring(year_date,1,4) ='2011' and element ='PRCP' and b.countrycode ='US'  ")


        dfprocessed.write.mode("overwrite").format('parquet').save(s3tgtpath)



if __name__ == "__main__":
    main()