= Analyse dataset using Spark Data Type Validation Framework output
:toc:

== Overview

This section describes how the output of the Spark Data Type Validation Framework can be used for data analysis with AWS Glue and Amazon Athena. The framework library JARs are attached to a Glue job that reads the input dataset and the expected schema, and stores the output as a Glue Catalog table whose data resides in an S3 bucket. The next section provides a set of Athena queries on the Glue Catalog table that demonstrate various data analysis mechanisms using the error information provided by the framework. The same functionality can be achieved by using the framework library JAR files with an Amazon EMR cluster or any other Apache Spark processing system.

== How to set up the Glue Job

Create a Glue job using Glue Version 2.0 with the ETL language set to Scala. Upload the JAR files listed below to an S3 bucket and pass them via --extra-jars in the job configuration. The Glue job configuration should look similar to the screenshot below.

image:media/glue_job_conf.png[image,width=1000,height=400]

The JAR files can be built as per the instructions in the README.md. Below is the list of required JARs.

Framework JAR:

* spark-datatype-com-awsproserve-validation_2.11-1.0.jar

Dependency Lib JARs:

* play-json_2.11-2.7.4.jar
* play-functional_2.11-2.7.4.jar

The sample data is available at link:data/delimited_file.txt[delimited_file.txt].

=== Glue Job Code

|===
a|
[source,scala]
----
import com.amazonaws.services.glue.GlueContext
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import com.awsproserve.validation.Delimited

object SparkDataTypeValidationExample {

  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkSession = getSparkSession
    val file = "s3:///data/"
    // Validate the delimited input against the expected schema; the returned
    // DataFrame carries the error columns produced by the framework.
    val df = Delimited.validate(spark = spark, jsonMultilineString = getJsonTestString, file = file)
    // Persist the result as a Glue Catalog table backed by S3.
    df.write.mode("overwrite").option("path", "s3:///output_data/raw_table/").saveAsTable("raw_table")
  }

  def getJsonTestString: String = {
    """
      \|{
      \|  "schemaVersion": "0.0.1",
      \|  "delimiter": ",",
      \|  "fileHeader": true,
      \|  "quoteChar": "\"",
      \|  "escapeChar": "\\",
      \|  "lineSeparator": "\n",
      \|  "dataStructure": [
      \|    {
      \|      "fieldName": "int1",
      \|      "dataType": "int",
      \|      "nullable": true
      \|    },
      \|    {
      \|      "fieldName": "int2",
      \|      "dataType": "int",
      \|      "nullable": false
      \|    },
      \|    {
      \|      "fieldName": "string1",
      \|      "dataType": "string",
      \|      "nullable": true
      \|    },
      \|    {
      \|      "fieldName": "string2",
      \|      "dataType": "string",
      \|      "nullable": false
      \|    },
      \|    {
      \|      "fieldName": "decimal1",
      \|      "dataType": "decimal",
      \|      "dataFormat": "5,2",
      \|      "nullable": true
      \|    },
      \|    {
      \|      "fieldName": "decimal2",
      \|      "dataType": "decimal",
      \|      "dataFormat": "5,2",
      \|      "nullable": false
      \|    },
      \|    {
      \|      "fieldName": "date1",
      \|      "dataType": "date",
      \|      "nullable": true,
      \|      "dataFormat": "yyyy-MM-dd"
      \|    },
      \|    {
      \|      "fieldName": "date2",
      \|      "dataType": "date",
      \|      "nullable": false,
      \|      "dataFormat": "yyyy-MM-dd"
      \|    },
      \|    {
      \|      "fieldName": "timestamp1",
      \|      "dataType": "timestamp",
      \|      "nullable": true,
      \|      "dataFormat": "yyyy/MM/dd'T'HH:mm:ss'Z'"
      \|    },
      \|    {
      \|      "fieldName": "timestamp2",
      \|      "dataType": "timestamp",
      \|      "nullable": false,
      \|      "dataFormat": "yyyy/MM/dd'T'HH:mm:ss'Z'"
      \|    }
      \|  ]
      \|}
      \|""".stripMargin
  }

  def getSparkSession: SparkSession = {
    // On Glue, the SparkSession is obtained through the GlueContext.
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession
    sparkSession
  }
}
----
|===
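The job above writes every record, valid or not, to a single table so the errors can be analysed in Athena. If you also want the Glue job itself to separate clean records from rejected ones, a helper along the following lines can be added to the job. This is a minimal sketch, assuming the error_flag column produced by the framework (0 meaning no error, as used in the queries below); the function name and output paths are illustrative.

|===
a|
[source,scala]
----
import org.apache.spark.sql.DataFrame

// Illustrative helper (not part of the framework): split the validated output
// into clean and rejected records using the error_flag column (0 = no error).
// Bucket names are omitted here, as in the job code above.
def writeCleanAndErrorRecords(df: DataFrame): Unit = {
  val cleanDf = df.filter("error_flag = 0")
  val errorDf = df.filter("error_flag <> 0")

  cleanDf.write.mode("overwrite").parquet("s3:///output_data/clean/")
  errorDf.write.mode("overwrite").parquet("s3:///output_data/errors/")
}
----
|===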
== Data Analysis Queries

=== Query 1: Get All Rows

[cols="",]
|===
a|
[source,sql]
----
SELECT * FROM "default"."raw_table"
----
|===

image:media/query1_result.png[image,width=1000,height=400]

=== Query 2: Get Error Rows Summary

[cols="",]
|===
a|
[source,sql]
----
SELECT error_flag,
       CASE
           WHEN error_flag = 0 THEN 'No Error'
           WHEN error_flag = 1 THEN 'Data Type Errors'
           WHEN error_flag = 2 THEN 'Num of Columns Less Than expected'
           WHEN error_flag = 3 THEN 'Num of Columns More Than expected'
           ELSE 'UNKNOWN'
       END AS error_description,
       count(*) AS NUM_OF_ROWS
FROM "default"."raw_table"
GROUP BY 1, 2
----
|===

image:media/query2_result.png[image,width=1000,height=400]

=== Query 3: Get all rows where DATE and TIMESTAMP columns have invalid values

[cols="",]
|===
a|
[source,sql]
----
WITH base AS (
    SELECT *
    FROM "default"."raw_table"
    CROSS JOIN UNNEST(error_message) AS T(exp_error_message)
)
SELECT element_at(exp_error_message, 'value') AS invalid_date,
       element_at(exp_error_message, 'dataFormat') AS date_timestamp_format
FROM base
WHERE error_flag = 1
  AND element_at(exp_error_message, 'dataType') IN ('DATE', 'TIMESTAMP')
----
|===

image:media/query3_result.png[image,width=1000,height=400]

=== Query 4: Get all rows where the number of columns does not match the expected number of columns

[cols="",]
|===
a|
[source,sql]
----
WITH base AS (
    SELECT *
    FROM "default"."raw_table"
    CROSS JOIN UNNEST(error_message) AS T(exp_error_message)
)
SELECT element_at(exp_error_message, 'expectedNumberOfColumns') AS expectedNumberOfColumns,
       element_at(exp_error_message, 'actualNumberOfColumns') AS actualNumberOfColumns,
       element_at(exp_error_message, 'record') AS invalidRecord
FROM base
WHERE error_flag IN (2, 3)
----
|===

image:media/query4_result.png[image,width=1000,height=400]

=== Query 5: Get all rows where NOT NULL columns have NULL values

[cols="",]
|===
a|
[source,sql]
----
WITH base AS (
    SELECT *
    FROM "default"."raw_table"
    CROSS JOIN UNNEST(error_message) AS T(exp_error_message)
)
SELECT exp_error_message, *
FROM base
WHERE error_flag = 1
  AND element_at(exp_error_message, 'nullability') = 'false'
  AND (element_at(exp_error_message, 'value') IS NULL
       OR element_at(exp_error_message, 'value') IN ('', 'null'))
----
|===

image:media/query5_result.png[image,width=1000,height=400]
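The same analysis can be performed without Athena, for example on an Amazon EMR cluster or any other Spark environment, by reading raw_table back into Spark and using the DataFrame API. The sketch below mirrors Query 3 and assumes that error_message is an array of string-to-string maps, as implied by the UNNEST and element_at usage in the queries above; the function name is illustrative.

|===
a|
[source,scala]
----
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, element_at, explode}

// Illustrative DataFrame-API equivalent of Query 3: list invalid DATE and
// TIMESTAMP values by exploding the error_message array of maps.
def invalidDateAndTimestampValues(spark: SparkSession): DataFrame = {
  spark.table("raw_table")
    .withColumn("exp_error_message", explode(col("error_message")))
    .filter(col("error_flag") === 1 &&
      element_at(col("exp_error_message"), "dataType").isin("DATE", "TIMESTAMP"))
    .select(
      element_at(col("exp_error_message"), "value").as("invalid_date"),
      element_at(col("exp_error_message"), "dataFormat").as("date_timestamp_format"))
}
----
|===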