AWSTemplateFormatVersion: '2010-09-09'

# Deploys an Athena workgroup with prepared statements / named queries over a
# Glue table, plus an HTTP API (API Gateway v2) whose routes invoke Lambda
# functions that run or list those statements. Requests are authorized by a
# Lambda authorizer that checks the 'Authorization' header against a passphrase.

Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: Required Parameters
        Parameters:
          - S3QueryResultsBucketName
          - APIPassphrase

Parameters:
  S3QueryResultsBucketName:
    Type: String
    Description: Amazon S3 bucket name for storing Amazon Athena query results. The bucket must be in the same region.
    # Quoted so YAML never reinterprets the regex. The character class is
    # [a-z0-9-]; the previous "\\-" form also admitted a literal backslash.
    AllowedPattern: '(?=^.{3,63}$)(^[a-z0-9]+[a-z0-9-]+[a-z0-9]+$)'
  APIPassphrase:
    Type: String
    Description: Passphrase that will need to be passed as 'Authorization' request header
    # An empty passphrase could never be matched by a request header.
    MinLength: 1
    NoEcho: True

Resources:
  # ---------------------------------------------------------------------------
  # Athena workgroup and Glue catalog objects
  # ---------------------------------------------------------------------------
  AthenaWorkGroup:
    Type: AWS::Athena::WorkGroup
    Properties:
      # NOTE: this name is also hard-coded inside the inline Lambda code below.
      Name: ParameterizedStatementsWG
      State: ENABLED
      RecursiveDeleteOption: true
      WorkGroupConfiguration:
        EnforceWorkGroupConfiguration: True
        EngineVersion:
          SelectedEngineVersion: AUTO
        ResultConfiguration:
          EncryptionConfiguration:
            EncryptionOption: SSE_S3
          OutputLocation: !Sub s3://${S3QueryResultsBucketName}/results/

  GlueDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        # NOTE: this name is also hard-coded inside the inline Lambda code below.
        Name: athena_prepared_statements

  GlueTableAmazonReviews:
    Type: AWS::Glue::Table
    Properties:
      DatabaseName: !Ref GlueDatabase
      CatalogId: !Ref AWS::AccountId
      TableInput:
        Name: amazon_reviews_parquet
        Parameters:
          EXTERNAL: 'TRUE'
        PartitionKeys:
          - Name: product_category
            Type: string
        StorageDescriptor:
          BucketColumns: []
          Columns:
            - Name: marketplace
              Type: string
            - Name: customer_id
              Type: string
            - Name: review_id
              Type: string
            - Name: product_id
              Type: string
            - Name: product_parent
              Type: string
            - Name: product_title
              Type: string
            - Name: star_rating
              Type: int
            - Name: helpful_votes
              Type: int
            - Name: total_votes
              Type: int
            - Name: vine
              Type: string
            - Name: verified_purchase
              Type: string
            - Name: review_headline
              Type: string
            - Name: review_body
              Type: string
            - Name: review_date
              Type: bigint
            - Name: year
              Type: int
          Compressed: false
          InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
          Location: s3://amazon-reviews-pds/parquet/
          NumberOfBuckets: -1
          OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
          Parameters: {}
          SerdeInfo:
            Parameters:
              serialization.format: '1'
            SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
          SkewedInfo:
            SkewedColumnNames: []
            SkewedColumnValueLocationMaps: {}
            SkewedColumnValues: []
          SortColumns: []
          StoredAsSubDirectories: false
        TableType: EXTERNAL_TABLE

  # ---------------------------------------------------------------------------
  # Prepared statements and their named-query twins (same SQL, same workgroup)
  # ---------------------------------------------------------------------------
  ProductReviewsPrepStatement:
    Type: AWS::Athena::PreparedStatement
    Properties:
      Description: "Amazon Athena prepared statement which retrieves a specific Amazon.com product id's reviews which have the given star rating and are marked as helpful more than a given number of times (e.g. product_id = 'BT00DDVMVQ', star_rating = 4, helpful_votes > 10)."
      StatementName: product_helpful_reviews
      WorkGroup: !Ref AthenaWorkGroup
      QueryStatement: >
        SELECT product_id, product_title, star_rating, helpful_votes, review_headline, review_body
        FROM amazon_reviews_parquet
        WHERE product_id = ? AND star_rating = ? AND helpful_votes > ?
        ORDER BY helpful_votes DESC
        LIMIT 10;

  ProductReviewsNamedQuery:
    Type: AWS::Athena::NamedQuery
    Properties:
      Description: "Amazon Athena SQL statement which retrieves a specific Amazon.com product id's reviews which have the given star rating and are marked as helpful more than a given number of times (e.g. product_id = 'BT00DDVMVQ', star_rating = 4, helpful_votes > 10)."
      Database: !Ref GlueDatabase
      Name: product_helpful_reviews_nq
      WorkGroup: !Ref AthenaWorkGroup
      QueryString: >
        SELECT product_id, product_title, star_rating, helpful_votes, review_headline, review_body
        FROM amazon_reviews_parquet
        WHERE product_id = ? AND star_rating = ? AND helpful_votes > ?
        ORDER BY helpful_votes DESC
        LIMIT 10;

  CategoryReviewCountPrepStatement:
    Type: AWS::Athena::PreparedStatement
    Properties:
      Description: "Amazon Athena prepared statement which retrieves the count of reviews for a given Amazon.com product category (e.g. product_category: 'Video_Games', 'Books')."
      StatementName: category_review_count
      WorkGroup: !Ref AthenaWorkGroup
      QueryStatement: >
        SELECT marketplace, count(*) as reviews
        FROM "amazon_reviews_parquet"
        WHERE product_category = ?
        GROUP BY marketplace

  CategoryReviewCountNamedQuery:
    Type: AWS::Athena::NamedQuery
    Properties:
      Description: "Amazon Athena SQL statement which retrieves the count of reviews for a given Amazon.com product category (e.g. product_category: 'Video_Games', 'Books')."
      Database: !Ref GlueDatabase
      Name: category_review_count_nq
      WorkGroup: !Ref AthenaWorkGroup
      QueryString: >
        SELECT marketplace, count(*) as reviews
        FROM "amazon_reviews_parquet"
        WHERE product_category = ?
        GROUP BY marketplace

  TopRegionalProductsPrepStatement:
    Type: AWS::Athena::PreparedStatement
    Properties:
      Description: "Amazon Athena prepared statement which retrieves Amazon.com products based on average star rating in a given country code (e.g. marketplace: 'US', 'UK', 'JP')."
      StatementName: top_regional_products
      WorkGroup: !Ref AthenaWorkGroup
      QueryStatement: >
        SELECT marketplace, product_id, product_title, count(*) as reviews, avg(star_rating) as avg_rating
        FROM "amazon_reviews_parquet"
        GROUP BY marketplace, product_id, product_title
        HAVING marketplace = ?
        ORDER BY avg_rating desc, reviews desc
        LIMIT 10

  TopRegionalProductsNamedQuery:
    Type: AWS::Athena::NamedQuery
    Properties:
      Description: "Amazon Athena SQL statement which retrieves Amazon.com products based on average star rating in a given country code (e.g. marketplace: 'US', 'UK', 'JP')."
      Database: !Ref GlueDatabase
      Name: top_regional_products_nq
      WorkGroup: !Ref AthenaWorkGroup
      QueryString: >
        SELECT marketplace, product_id, product_title, count(*) as reviews, avg(star_rating) as avg_rating
        FROM "amazon_reviews_parquet"
        GROUP BY marketplace, product_id, product_title
        HAVING marketplace = ?
        ORDER BY avg_rating desc, reviews desc
        LIMIT 10

  # ---------------------------------------------------------------------------
  # IAM role shared by every Lambda function that talks to Athena
  # ---------------------------------------------------------------------------
  IAMRoleLambdaAthena:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub LambdaAthenaExecutionRole-${AWS::StackName}
      Description: Lambda IAM service role for running Athena queries in ParameterizedStatementsWG workgroup
      Path: /service-role/
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Action: sts:AssumeRole
            Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: !Sub 'IAMPolicyLambdaAthena-${AWS::StackName}'
          PolicyDocument:
            # Quoted so the policy version stays a string rather than a YAML date.
            Version: '2012-10-17'
            Statement:
              # Athena access is limited to the workgroup created by this stack.
              - Effect: Allow
                Action:
                  - athena:StartQueryExecution
                  - athena:GetQueryExecution
                  - athena:GetPreparedStatement
                  - athena:GetNamedQuery
                  - athena:GetQueryResults
                  - athena:ListNamedQueries
                  - athena:ListPreparedStatements
                Resource: !Sub arn:aws:athena:*:*:workgroup/${AthenaWorkGroup}
              - Effect: Allow
                Action:
                  - glue:GetTable
                  - glue:GetPartition
                  - glue:GetPartitions
                  - glue:GetDatabase
                  - glue:UpdateTable
                  - glue:UpdatePartitions
                  - glue:UpdateDatabase
                  - glue:BatchCreatePartition
                Resource:
                  - arn:aws:glue:*:*:catalog
                  - arn:aws:glue:*:*:database/default
                  - arn:aws:glue:*:*:database/athena_prepared_statements
                  - arn:aws:glue:*:*:table/default/amazon_reviews_parquet
                  - arn:aws:glue:*:*:table/athena_prepared_statements/amazon_reviews_parquet
              # Read access to the source data set.
              - Effect: Allow
                Action:
                  - s3:ListBucket
                  - s3:GetObject
                  - s3:DescribeJob
                Resource:
                  - arn:aws:s3:::amazon-reviews-pds
                  - arn:aws:s3:::amazon-reviews-pds/parquet/*
              # Read/write access to the query result locations.
              - Effect: Allow
                Action:
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:PutObject
                  - s3:DescribeJob
                Resource:
                  - !Sub arn:aws:s3:::${S3QueryResultsBucketName}
                  - !Sub arn:aws:s3:::${S3QueryResultsBucketName}/*
                  - !Sub arn:aws:s3:::athena-results-${AWS::AccountId}-${AWS::Region}
                  - !Sub arn:aws:s3:::athena-results-${AWS::AccountId}-${AWS::Region}/*

  # ---------------------------------------------------------------------------
  # Custom resource: runs MSCK REPAIR TABLE once the table exists
  # ---------------------------------------------------------------------------
  LambdaTableRepair:
    Type: AWS::Lambda::Function
    Properties:
      Description: Lambda function to repair table
      FunctionName: !Sub LambdaRepairFunction-${AWS::StackName}
      Handler: index.lambda_handler
      MemorySize: 128
      Role: !GetAtt IAMRoleLambdaAthena.Arn
      Runtime: python3.9
      Timeout: 180
      Code:
        ZipFile: |
          """
          Disclaimer: The sample code is provided to you as AWS Content under the
          AWS Customer Agreement, or the relevant written agreement between you and
          AWS (whichever applies). You are responsible for testing, securing, and
          optimizing the AWS Content, such as sample code, as appropriate for
          production grade use based on your specific quality control practices and
          standards. You should not use this AWS Content in your production
          accounts, or on production or other critical data. Deploying AWS Content
          may incur AWS charges for creating or using AWS chargeable resources, such
          as running Amazon EC2 instances or using Amazon S3 storage
          """
          import boto3
          import cfnresponse
          import time

          athena = boto3.client('athena')


          def query_state(query_id):
              """Return the current Athena state string for the given query."""
              return athena.get_query_execution(
                  QueryExecutionId=query_id
              )['QueryExecution']['Status']['State']


          def lambda_handler(event, context):
              """Custom resource handler: start MSCK REPAIR TABLE and report back.

              A response is ALWAYS sent to CloudFormation; without one the stack
              operation would hang until CloudFormation gives up waiting.
              """
              # --- Publish event details to CloudWatch for troubleshooting
              print(event)
              data = {}
              try:
                  # Nothing to undo on deletion, so acknowledge and stop.
                  if event.get('RequestType') == 'Delete':
                      cfnresponse.send(event, context, cfnresponse.SUCCESS, data)
                      return

                  response = athena.start_query_execution(
                      QueryString='MSCK REPAIR TABLE amazon_reviews_parquet',
                      WorkGroup='ParameterizedStatementsWG',
                      QueryExecutionContext={
                          'Database': 'athena_prepared_statements'
                      }
                  )
                  query_id = response['QueryExecutionId']
                  data['QueryExecutionId'] = query_id

                  # For simplicity of this example the function waits for the
                  # query, but stops polling early enough to still send the
                  # response before the Lambda timeout.
                  state = query_state(query_id)
                  while (state in ('QUEUED', 'RUNNING')
                         and context.get_remaining_time_in_millis() > 15000):
                      time.sleep(5)
                      state = query_state(query_id)
                  print(f'Repair query {query_id} stopped polling in state {state}')

                  # Best effort, as before: the final query state does not fail
                  # the stack; the execution id is returned for inspection.
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, data)
              except Exception as error:
                  print(error)
                  cfnresponse.send(event, context, cfnresponse.FAILED, data)

  ExecuteRepair:
    Type: AWS::CloudFormation::CustomResource
    # The handler refers to the workgroup and table by hard-coded name, so
    # CloudFormation cannot infer these dependencies on its own.
    DependsOn:
      - AthenaWorkGroup
      - GlueTableAmazonReviews
    Properties:
      ServiceToken: !GetAtt LambdaTableRepair.Arn

  # ---------------------------------------------------------------------------
  # Lambda functions behind the API
  # ---------------------------------------------------------------------------
  IAMRoleLambdaAuthorizer:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub LambdaAuthorizerExecutionRole-${AWS::StackName}
      Description: Lambda IAM service role for the API Gateway Lambda authorizer function
      Path: /service-role/
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Action: sts:AssumeRole
            Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

  LambdaAthena:
    Type: AWS::Lambda::Function
    Properties:
      Description: Lambda function to call Athena
      FunctionName: !Sub LambdaAthenaFunction-${AWS::StackName}
      Handler: index.lambda_handler
      MemorySize: 128
      Role: !GetAtt IAMRoleLambdaAthena.Arn
      Runtime: python3.9
      Timeout: 30
      Layers:
        - !Sub arn:aws:lambda:${AWS::Region}:280475519630:layer:boto3-1_24:1
      Code:
        ZipFile: |
          """
          Disclaimer: The sample code is provided to you as AWS Content under the
          AWS Customer Agreement, or the relevant written agreement between you and
          AWS (whichever applies). You are responsible for testing, securing, and
          optimizing the AWS Content, such as sample code, as appropriate for
          production grade use based on your specific quality control practices and
          standards. You should not use this AWS Content in your production
          accounts, or on production or other critical data. Deploying AWS Content
          may incur AWS charges for creating or using AWS chargeable resources, such
          as running Amazon EC2 instances or using Amazon S3 storage
          """
          import json
          import boto3
          import time

          athena = boto3.client('athena')


          def bad_request(message):
              """Build a 400 response for a malformed client request."""
              return {'statusCode': 400, 'body': message}


          def lambda_handler(event, context):
              """Run the statement from the request body and return its results.

              Body keys: 'statement' (required), 'type' ('prepared' runs
              EXECUTE <statement>), 'parameters' (list), 'workgroup'.
              """
              # --- Publish event details to CloudWatch for troubleshooting,
              # minus the Authorization header, which carries the passphrase
              headers = event.get('headers') or {}
              safe = {k: v for k, v in headers.items()
                      if k.lower() != 'authorization'}
              print("Event:")
              print(json.dumps({**event, 'headers': safe},
                               sort_keys=True, default=str))

              if 'body' not in event:
                  return bad_request('Request is missing body')
              try:
                  body = json.loads(event['body'])
              except (TypeError, ValueError):
                  return bad_request('Request body is not valid JSON')
              if not isinstance(body, dict) or not body.get('statement'):
                  return bad_request('Request body needs a "statement"')

              statement = body['statement']
              prepared = body.get('type') == 'prepared'
              request = {
                  'QueryString': f'EXECUTE {statement}' if prepared else statement,
                  'WorkGroup': body.get('workgroup', 'ParameterizedStatementsWG'),
                  'QueryExecutionContext': {
                      'Database': 'athena_prepared_statements'
                  }
              }
              # An empty ExecutionParameters list is rejected, so only send the
              # key when the caller supplied parameters.
              if body.get('parameters'):
                  request['ExecutionParameters'] = body['parameters']

              try:
                  response = athena.start_query_execution(**request)
              except Exception as error:
                  print(error)
                  return {'statusCode': 500, 'body': str(error)}

              query_id = response['QueryExecutionId']
              execution = athena.get_query_execution(QueryExecutionId=query_id)
              state = execution['QueryExecution']['Status']['State']

              # For simplicity of this example the function waits for the query
              # instead of handing the execution id back to the requestor. It
              # stops polling shortly before the Lambda timeout so the caller
              # still gets the execution details.
              while (state in ('QUEUED', 'RUNNING')
                     and context.get_remaining_time_in_millis() > 3000):
                  time.sleep(1)
                  execution = athena.get_query_execution(QueryExecutionId=query_id)
                  state = execution['QueryExecution']['Status']['State']

              if state != 'SUCCEEDED':
                  return json.dumps(execution, indent=4, sort_keys=True, default=str)
              return athena.get_query_results(QueryExecutionId=query_id)

  LambdaAuthorizer:
    Type: AWS::Lambda::Function
    Properties:
      Description: API Gateway Lambda authorizer function
      FunctionName: !Sub LambdaAuthorizerFunction-${AWS::StackName}
      Handler: index.lambda_handler
      MemorySize: 128
      Role: !GetAtt IAMRoleLambdaAuthorizer.Arn
      Runtime: python3.9
      Timeout: 30
      Environment:
        Variables:
          PASSPHRASE: !Ref APIPassphrase
      Code:
        ZipFile: |
          """
          Disclaimer: The sample code is provided to you as AWS Content under the
          AWS Customer Agreement, or the relevant written agreement between you and
          AWS (whichever applies). You are responsible for testing, securing, and
          optimizing the AWS Content, such as sample code, as appropriate for
          production grade use based on your specific quality control practices and
          standards. You should not use this AWS Content in your production
          accounts, or on production or other critical data. Deploying AWS Content
          may incur AWS charges for creating or using AWS chargeable resources, such
          as running Amazon EC2 instances or using Amazon S3 storage
          """
          import hmac
          import os


          def lambda_handler(event, context):
              """Authorize the request when its 'authorization' header equals
              the PASSPHRASE environment variable."""
              # The full event is deliberately NOT logged: its headers contain
              # the passphrase being checked.
              print(f"Authorizing request for route: {event.get('routeKey')}")

              # --- Authorization check
              passphrase = os.getenv('PASSPHRASE') or ''
              supplied = (event.get('headers') or {}).get('authorization') or ''
              # Constant-time comparison on bytes; an unset passphrase never
              # authorizes anything.
              auth = bool(passphrase) and hmac.compare_digest(
                  supplied.encode('utf-8'), passphrase.encode('utf-8')
              )
              return {
                  "isAuthorized": auth
              }

  # ---------------------------------------------------------------------------
  # HTTP API: stage, authorizer, and the POST / route that runs statements
  # ---------------------------------------------------------------------------
  APIGatewayAthena:
    Type: AWS::ApiGatewayV2::Api
    Properties:
      Name: !Sub 'AthenaAPI-${AWS::StackName}'
      ProtocolType: HTTP
      CorsConfiguration:
        AllowOrigins:
          - '*'
        AllowMethods:
          - '*'
        AllowHeaders:
          - '*'

  APIGWStage:
    Type: AWS::ApiGatewayV2::Stage
    Properties:
      StageName: $default
      ApiId: !Ref APIGatewayAthena
      AutoDeploy: True

  APIGWAuthorizer:
    Type: AWS::ApiGatewayV2::Authorizer
    Properties:
      Name: !Sub 'LambdaAPIAuthorizer-${AWS::StackName}'
      ApiId: !Ref APIGatewayAthena
      AuthorizerType: REQUEST
      # Quoted so the version stays the string '2.0' rather than a YAML float.
      AuthorizerPayloadFormatVersion: '2.0'
      AuthorizerUri: !Sub 'arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${LambdaAuthorizer.Arn}/invocations'
      EnableSimpleResponses: True
      IdentitySource:
        - $request.header.Authorization

  APILambdaAthenaIntegration:
    Type: AWS::ApiGatewayV2::Integration
    Properties:
      ApiId: !Ref APIGatewayAthena
      Description: Lambda Integration
      IntegrationMethod: POST
      IntegrationType: AWS_PROXY
      IntegrationUri: !GetAtt LambdaAthena.Arn
      PayloadFormatVersion: '2.0'

  APIGWRoute:
    Type: AWS::ApiGatewayV2::Route
    Properties:
      RouteKey: POST /
      ApiId: !Ref APIGatewayAthena
      AuthorizationType: CUSTOM
      AuthorizerId: !Ref APIGWAuthorizer
      Target: !Join
        - /
        - - integrations
          - !Ref APILambdaAthenaIntegration

  AthenaLambdaAPIGWResourcePermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref LambdaAthena
      Principal: apigateway.amazonaws.com
      SourceArn: !Sub arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${APIGatewayAthena}/*/*

  LambdaAuthAPIGWResourcePermissions:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref LambdaAuthorizer
      Principal: apigateway.amazonaws.com
      SourceArn: !Sub arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${APIGatewayAthena}/*/*

  # ---------------------------------------------------------------------------
  # Listing endpoints: GET /prepared-statements and GET /named-queries
  # ---------------------------------------------------------------------------
  LambdaGetPrepStatements:
    Type: AWS::Lambda::Function
    Properties:
      Description: Lambda function to list prepared statements
      FunctionName: !Sub GetPrepStatements-${AWS::StackName}
      Handler: index.lambda_handler
      MemorySize: 128
      Role: !GetAtt IAMRoleLambdaAthena.Arn
      Runtime: python3.9
      Timeout: 30
      Code:
        ZipFile: |
          """
          Disclaimer: The sample code is provided to you as AWS Content under the
          AWS Customer Agreement, or the relevant written agreement between you and
          AWS (whichever applies). You are responsible for testing, securing, and
          optimizing the AWS Content, such as sample code, as appropriate for
          production grade use based on your specific quality control practices and
          standards. You should not use this AWS Content in your production
          accounts, or on production or other critical data. Deploying AWS Content
          may incur AWS charges for creating or using AWS chargeable resources, such
          as running Amazon EC2 instances or using Amazon S3 storage
          """
          import boto3
          from collections import defaultdict

          athena = boto3.client("athena")


          def lambda_handler(event, context):
              """Return {workgroup: [{name, description, query}, ...]} for every
              prepared statement in the workgroup."""
              # Log the event without the Authorization header (the passphrase).
              headers = event.get('headers') or {}
              safe = {k: v for k, v in headers.items()
                      if k.lower() != 'authorization'}
              print("Event:")
              print({**event, 'headers': safe})

              workgroup = "ParameterizedStatementsWG"
              prep_statements = defaultdict(list)

              # Follow NextToken so statements beyond the first page are listed.
              request = {'WorkGroup': workgroup}
              while True:
                  page = athena.list_prepared_statements(**request)
                  for statement in page['PreparedStatements']:
                      query_details = athena.get_prepared_statement(
                          StatementName=statement['StatementName'],
                          WorkGroup=workgroup
                      )['PreparedStatement']
                      prep_statements[workgroup].append({
                          'name': query_details['StatementName'],
                          # Description is optional on a prepared statement.
                          'description': query_details.get('Description'),
                          'query': query_details['QueryStatement']
                      })
                  if not page.get('NextToken'):
                      break
                  request['NextToken'] = page['NextToken']

              return prep_statements

  LambdaGetNamedQueries:
    Type: AWS::Lambda::Function
    Properties:
      Description: Lambda function to list named queries
      FunctionName: !Sub GetNamedQueries-${AWS::StackName}
      Handler: index.lambda_handler
      MemorySize: 128
      Role: !GetAtt IAMRoleLambdaAthena.Arn
      Runtime: python3.9
      Timeout: 30
      Code:
        ZipFile: |
          """
          Disclaimer: The sample code is provided to you as AWS Content under the
          AWS Customer Agreement, or the relevant written agreement between you and
          AWS (whichever applies). You are responsible for testing, securing, and
          optimizing the AWS Content, such as sample code, as appropriate for
          production grade use based on your specific quality control practices and
          standards. You should not use this AWS Content in your production
          accounts, or on production or other critical data. Deploying AWS Content
          may incur AWS charges for creating or using AWS chargeable resources, such
          as running Amazon EC2 instances or using Amazon S3 storage
          """
          import boto3
          from collections import defaultdict

          athena = boto3.client("athena")


          def lambda_handler(event, context):
              """Return {workgroup: [{name, description, query}, ...]} for every
              named query in the workgroup."""
              # Log the event without the Authorization header (the passphrase).
              headers = event.get('headers') or {}
              safe = {k: v for k, v in headers.items()
                      if k.lower() != 'authorization'}
              print("Event:")
              print({**event, 'headers': safe})

              workgroup = "ParameterizedStatementsWG"
              named_queries = defaultdict(list)

              # Follow NextToken so queries beyond the first page are listed.
              request = {'WorkGroup': workgroup}
              while True:
                  page = athena.list_named_queries(**request)
                  for query_id in page['NamedQueryIds']:
                      query_details = athena.get_named_query(
                          NamedQueryId=query_id
                      )['NamedQuery']
                      named_queries[workgroup].append({
                          'name': query_details['Name'],
                          # Description is optional on a named query.
                          'description': query_details.get('Description'),
                          'query': query_details['QueryString']
                      })
                  if not page.get('NextToken'):
                      break
                  request['NextToken'] = page['NextToken']

              return named_queries

  APILambdaGetPrepStatementsIntegration:
    Type: AWS::ApiGatewayV2::Integration
    Properties:
      ApiId: !Ref APIGatewayAthena
      Description: Lambda Integration
      IntegrationMethod: GET
      IntegrationType: AWS_PROXY
      IntegrationUri: !GetAtt LambdaGetPrepStatements.Arn
      PayloadFormatVersion: '2.0'

  APILambdaGetNamedQueriesIntegration:
    Type: AWS::ApiGatewayV2::Integration
    Properties:
      ApiId: !Ref APIGatewayAthena
      Description: Lambda Integration
      IntegrationMethod: GET
      IntegrationType: AWS_PROXY
      IntegrationUri: !GetAtt LambdaGetNamedQueries.Arn
      PayloadFormatVersion: '2.0'

  APIPrepStatementsRoute:
    Type: AWS::ApiGatewayV2::Route
    Properties:
      RouteKey: GET /prepared-statements
      ApiId: !Ref APIGatewayAthena
      AuthorizationType: CUSTOM
      AuthorizerId: !Ref APIGWAuthorizer
      Target: !Join
        - /
        - - integrations
          - !Ref APILambdaGetPrepStatementsIntegration

  APINamedQueriesRoute:
    Type: AWS::ApiGatewayV2::Route
    Properties:
      RouteKey: GET /named-queries
      ApiId: !Ref APIGatewayAthena
      AuthorizationType: CUSTOM
      AuthorizerId: !Ref APIGWAuthorizer
      Target: !Join
        - /
        - - integrations
          - !Ref APILambdaGetNamedQueriesIntegration

  LambdaGetPrepStatementsAPIGWPermissions:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref LambdaGetPrepStatements
      Principal: apigateway.amazonaws.com
      SourceArn: !Sub arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${APIGatewayAthena}/*/*

  LambdaGetNamedQueriesAPIGWPermissions:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref LambdaGetNamedQueries
      Principal: apigateway.amazonaws.com
      SourceArn: !Sub arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${APIGatewayAthena}/*/*

Outputs:
  APIEndpoint:
    Value: !Sub ${APIGatewayAthena.ApiEndpoint}
  # Execution id returned by the table-repair custom resource.
  RepairQueryId:
    Value: !GetAtt
      - ExecuteRepair
      - QueryExecutionId