# -*- coding: utf-8 -*-
"""
Sql4spark: a PySpark module to execute SQL on Spark and merge data with the output.
"""
from __future__ import print_function
#from pyspark.sql.functions import date_format, max as max_, lit, from_json, udf, col, decode, concat, regexp_replace, current_timestamp
from pyspark.sql.functions import substring, concat, split, size, lit, sha2, udf
from pyspark.sql.types import StringType, StructType, StructField, Row
from pyspark.sql.utils import AnalysisException
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from random import random
from operator import add
import sys
import re
import hashlib
import argparse
import json
import logging

from datamask_pyutil.utility import setup_logging
import datamask_pyutil.conf_process as conf_process
from datamask_pyutil import __version__

__author__ = "Diogo Kato"
__copyright__ = "Diogo Kato"
__license__ = "mit"

_logger = logging.getLogger(__name__)


class DatamaskPysparkError(Exception):
    pass


def parse_args(args):
    """Parse command line parameters

    Args:
      args ([str]): command line parameters as list of strings

    Returns:
      :obj:`argparse.Namespace`: command line parameters namespace
    """
    parser = argparse.ArgumentParser(
        description="Sql4spark is a pyspark module to execute SQL on spark and merge data with output")
    parser.add_argument(
        "--part-list",
        dest="partList",
        required=False,
        help="Partitions list, comma ',' delimited",
        metavar="STRING")
    parser.add_argument(
        "--config-path",
        required=True,
        dest="configPath",
        help="Config path",
        metavar="STRING")
    parser.add_argument(
        "--is-glue-job",
        required=False,
        default=False,
        dest="isGlueJob",
        help="Is a Glue job?",
        action="store_true")
    parser.add_argument(
        "--disable-glue-catalog",
        required=False,
        default=False,
        dest="disableGlueCatalog",
        help="Disable the Glue catalog",
        action="store_true")
    parser.add_argument(
        "--job-name",
        required=True,
        dest="jobName",
        help="Job name in the config json",
        metavar="STRING")
    parser.add_argument(
        "--version",
        action="version",
        version="datamask-pyutil {ver}".format(ver=__version__))
    parser.add_argument(
        "-v",
        "--verbose",
        dest="loglevel",
        help="Set loglevel to INFO",
        action="store_const",
        const=logging.INFO)
    parser.add_argument(
        "-vv",
        "--very-verbose",
        dest="loglevel",
        help="Set loglevel to DEBUG",
        action="store_const",
        const=logging.DEBUG)
    return parser.parse_known_args(args)


def exit_error(spark, message, code=1):
    # Log and raise; `code` defaults to 1 so existing two-argument call sites keep working.
    _logger.error(message)
    raise DatamaskPysparkError(message)


def main(argsv):
    """Main entry point allowing external calls

    Args:
      argsv ([str]): command line parameter list
    """
    args, unknown_args = parse_args(argsv)
    setup_logging(args.loglevel)
    _logger.info("###################################")
    _logger.info("###################################")
    _logger.info("Unknown parameters")
    _logger.info("{}".format(unknown_args))
    _logger.info("###################################")
    _logger.info("###################################")
    _logger.debug("Starting datamask-pyutil-pyspark")

    if args.isGlueJob:
        # Running inside AWS Glue: reuse the Glue-provided Spark session and job context.
        from awsglue.utils import getResolvedOptions
        from awsglue.context import GlueContext
        from awsglue.job import Job

        args_glue = getResolvedOptions(argsv, ['JOB_NAME'])
        sc = SparkContext()
        glueContext = GlueContext(sc)
        spark = glueContext.spark_session
        job = Job(glueContext)
        job.init(args_glue['JOB_NAME'], args_glue)
    else:
        # Standalone Spark: optionally point the Hive metastore at the AWS Glue Data Catalog.
        if args.disableGlueCatalog:
            spark = SparkSession.builder \
                .appName("datamask-pyutil-{}".format(args.jobName)) \
                .enableHiveSupport() \
                .getOrCreate()
        else:
            spark = SparkSession.builder \
                .appName("datamask-pyutil-{}".format(args.jobName)) \
                .config("hive.metastore.client.factory.class",
                        "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
                .enableHiveSupport() \
                .getOrCreate()

    _logger.info("Initialize datamask-pyutil")
    _logger.info("###################################")
    _logger.info("")
    _logger.info("spark: {}".format(spark))
    _logger.info("partList: {}".format(args.partList))
    _logger.info("configPath: {}".format(args.configPath))
    _logger.info("jobName: {}".format(args.jobName))
    _logger.info("isGlueJob: {}".format(args.isGlueJob))
    _logger.info("disableGlueCatalog: {}".format(args.disableGlueCatalog))
    _logger.info("")
    _logger.info("###################################")

    _logger.info("Getting Json Conf")
    cf = conf_process.DatamaskConfProcess(args.configPath, args.jobName)
    if not cf.is_job_active():
        exit_error(spark, "Jobname[{}] is not active".format(args.jobName))

    # Optional comma-delimited partition list, e.g. from --part-list.
    part_vet = []
    if args.partList and args.partList != '':
        part_vet = args.partList.split(",")

    cf.process_spark(part_vet, spark)

    spark.stop()
    _logger.info("Script ends here")


def run():
    """Entry point for console_scripts
    """
    main(sys.argv)


if __name__ == "__main__":
    run()