""" Test the entire flow under a happy path scenario """ import os import random import string import time import boto3 import pytest import requests from fixtures import iam_auth, get_order, get_product # pylint: disable=import-error from helpers import get_parameter # pylint: disable=import-error,no-name-in-module @pytest.fixture def products_table(): # TODO: replace data manipulation with backoffice API calls table_name = get_parameter("/ecommerce/{Environment}/products/table/name") return boto3.resource("dynamodb").Table(table_name) # pylint: disable=no-member @pytest.fixture def delivery_pricing_api_url(): return get_parameter("/ecommerce/{Environment}/delivery-pricing/api/url") @pytest.fixture def frontend_api_url(): return get_parameter("/ecommerce/{Environment}/frontend-api/api/url") @pytest.fixture def payment_3p_api_url(): return get_parameter("/ecommerce/{Environment}/payment-3p/api/url") @pytest.fixture(scope="module") def user_pool_id(): return get_parameter("/ecommerce/{Environment}/users/user-pool/id") @pytest.fixture(scope="module") def password(): """ Generate a unique password for the user """ return "".join( random.choices(string.ascii_uppercase, k=10) + random.choices(string.ascii_lowercase, k=10) + random.choices(string.digits, k=5) + random.choices(string.punctuation, k=3) ) @pytest.fixture(scope="module") def email(): """ Generate a unique email address for the user """ return "".join(random.choices(string.ascii_lowercase, k=20))+"@example.local" @pytest.fixture(scope="module") def client_id(user_pool_id): """ Return a user pool client """ cognito = boto3.client("cognito-idp") # Create a Cognito User Pool Client response = cognito.create_user_pool_client( UserPoolId=user_pool_id, ClientName="ecommerce-{}-frontend-api-test".format(os.environ["ECOM_ENVIRONMENT"]), GenerateSecret=False, ExplicitAuthFlows=["ADMIN_NO_SRP_AUTH"] ) # Return the client ID client_id = response["UserPoolClient"]["ClientId"] yield client_id # Delete the client cognito.delete_user_pool_client( UserPoolId=user_pool_id, ClientId=client_id ) @pytest.fixture(scope="module") def user_id(user_pool_id, email, password): """ User ID generated by Cognito """ cognito = boto3.client("cognito-idp") # Create a Cognito user response = cognito.admin_create_user( UserPoolId=user_pool_id, Username=email, UserAttributes=[{ "Name": "email", "Value": email }], MessageAction="SUPPRESS" ) user_id = response["User"]["Username"] cognito.admin_set_user_password( UserPoolId=user_pool_id, Username=user_id, Password=password, Permanent=True ) cognito.admin_add_user_to_group( UserPoolId=user_pool_id, Username=user_id, GroupName="admin" ) # Return the user ID yield user_id # Delete the user cognito.admin_delete_user( UserPoolId=user_pool_id, Username=user_id ) @pytest.fixture(scope="module") def jwt_token(user_pool_id, user_id, client_id, email, password): """ Returns a JWT token for API Gateway """ cognito = boto3.client("cognito-idp") response = cognito.admin_initiate_auth( UserPoolId=user_pool_id, ClientId=client_id, AuthFlow="ADMIN_NO_SRP_AUTH", AuthParameters={ "USERNAME": email, "PASSWORD": password } ) return response["AuthenticationResult"]["IdToken"] def test_happy_path(get_order, products_table, jwt_token, frontend_api_url, payment_3p_api_url): """ Test an order journey with a happy path """ # Generate an order request order = get_order() order_request = { "products": order["products"], "address": order["address"] } del order # Seed the products table # TODO: use backoffice API instead with products_table.batch_writer() as batch: for product in order_request["products"]: batch.put_item(Item=product) # Get the delivery price query = """ query($input: DeliveryPricingInput!) { getDeliveryPricing(input: $input) { pricing } } """ variables = { "input": { "products": order_request["products"], "address": order_request["address"] } } res = requests.post( frontend_api_url, headers={"Authorization": jwt_token}, json={ "query": query, "variables": variables } ) body = res.json() print("GET DELIVERY PRICING", body) assert "pricing" in body["data"]["getDeliveryPricing"] order_request["deliveryPrice"] = body["data"]["getDeliveryPricing"]["pricing"] # Get the paymentToken total = order_request["deliveryPrice"] + sum([p["price"]*p.get("quantity", 1) for p in order_request["products"]]) res = requests.post(payment_3p_api_url+"/preauth", json={ "cardNumber": "1234567890123456", "amount": total }) order_request["paymentToken"] = res.json()["paymentToken"] # Create the order query = """ mutation ($order: CreateOrderRequest!) { createOrder(order: $order) { success message errors order { orderId } } } """ variables = { "order": order_request } res = requests.post( frontend_api_url, headers={"Authorization": jwt_token}, json={ "query": query, "variables": variables } ) body = res.json() print("CREATE ORDER", body) assert body["data"]["createOrder"]["success"] == True order_id = body["data"]["createOrder"]["order"]["orderId"] # Wait time.sleep(5) # Retrieve the packaging request query = """ query ($input: PackagingInput!) { getPackagingRequest(input: $input) { orderId status products { productId quantity } } } """ variables = { "input": { "orderId": order_id } } res = requests.post( frontend_api_url, headers={"Authorization": jwt_token}, json={ "query": query, "variables": variables } ) body = res.json() print("GET PACKAGE REQUEST", body) pr = body["data"]["getPackagingRequest"] packaging_products = {p["productId"]: p for p in pr["products"]} for product in order_request["products"]: assert product["productId"] in packaging_products.keys() assert packaging_products[product["productId"]]["quantity"] == product.get("quantity", 1) # Start working on the packaging request query = """ mutation ($input: PackagingInput!) { startPackaging(input: $input) { success } } """ variables = { "input": { "orderId": order_id } } res = requests.post( frontend_api_url, headers={"Authorization": jwt_token}, json={ "query": query, "variables": variables } ) body = res.json() print("START PACKAGING", body) assert body["data"]["startPackaging"]["success"] == True # Complete the packaging request query = """ mutation ($input: PackagingInput!) { completePackaging(input: $input) { success } } """ variables = { "input": { "orderId": order_id } } res = requests.post( frontend_api_url, headers={"Authorization": jwt_token}, json={ "query": query, "variables": variables } ) body = res.json() print("COMPLETE PACKAGING", body) assert body["data"]["completePackaging"]["success"] == True # Wait time.sleep(10) # Check the order query = """ query($orderId: ID!) { getOrder(orderId: $orderId) { orderId status } } """ variables = { "orderId": order_id } res = requests.post( frontend_api_url, headers={"Authorization": jwt_token}, json={ "query": query, "variables": variables } ) body = res.json() print("CHECK ORDER", body) assert body["data"]["getOrder"]["status"] == "PACKAGED" # Retrieve the delivery request query = """ query($input: DeliveryInput!) { getDelivery(input: $input) { orderId address { name companyName streetAddress postCode city state country phoneNumber } } } """ variables = { "input": { "orderId": order_id } } res = requests.post( frontend_api_url, headers={"Authorization": jwt_token}, json={ "query": query, "variables": variables } ) body = res.json() print("GET DELIVERY", body) assert body["data"]["getDelivery"]["orderId"] == order_id assert body["data"]["getDelivery"]["address"] == order_request["address"] # Start delivery query = """ mutation ($input: DeliveryInput!) { startDelivery(input: $input) { success } } """ variables = { "input": { "orderId": order_id } } res = requests.post( frontend_api_url, headers={"Authorization": jwt_token}, json={ "query": query, "variables": variables } ) body = res.json() print("START DELIVERY", body) assert body["data"]["startDelivery"]["success"] == True # Complete delivery query = """ mutation ($input: DeliveryInput!) { completeDelivery(input: $input) { success } } """ variables = { "input": { "orderId": order_id } } res = requests.post( frontend_api_url, headers={"Authorization": jwt_token}, json={ "query": query, "variables": variables } ) body = res.json() print("COMPLETE DELIVERY", body) assert body["data"]["completeDelivery"]["success"] == True # Wait time.sleep(5) # Check the order query = """ query($orderId: ID!) { getOrder(orderId: $orderId) { orderId status } } """ variables = { "orderId": order_id } res = requests.post( frontend_api_url, headers={"Authorization": jwt_token}, json={ "query": query, "variables": variables } ) body = res.json() print("CHECK ORDER", body) assert body["data"]["getOrder"]["status"] == "FULFILLED" # Remove products from the products table # TODO: use backoffice API instead with products_table.batch_writer() as batch: for product in order_request["products"]: batch.delete_item(Key={"productId": product["productId"]})