# Integrate Modern Data Architectures with Generative AI and interact using prompts for querying SQL databases & APIs

This notebook demonstrates how **_large language models, such as Flan T5 XL, accessible via SageMaker JumpStart_** interact with AWS databases, data stores, and third-party data warehousing solutions like Snowflake. We showcase this interaction by 1) generating and running SQL queries, and 2) making requests to API endpoints. We achieve this with the LangChain framework, which allows the language model to interact with its environment and connect to other sources of data. The LangChain framework is built on the following principles: calling out to a language model, being data-aware, and being agentic. This notebook focuses on establishing connections to various data sources, consolidating their metadata, and using LLMs and LangChain to return fact-based data points in response to user queries.


<img src='images/img-genai-sql-langchain.png' width="800" height="600">


Step 1. Connect to the various channels through which LLMs can talk to your data. These channels include:

- Redshift Serverless - to connect to the 'tickit' datastore (ticket is spelled tickit in the sample data store) to retrieve information about ticket sales.
- Aurora MySQL Serverless - to connect to a datastore that hosts information about employees.
- S3/Athena - to connect to the SageMaker offline feature store with claims information.
- Snowflake - to connect to stock-related data residing in the finance schema of the third-party software.
- APIs - to connect to the Open-Meteo weather API (in this example we use LangChain's sample Open-Meteo API documentation) to retrieve weather information.

Step 2. Dynamically generate prompt templates by populating them with table metadata from the Glue Data Catalog (GDC) as context. The GDC was populated by running a crawler on the databases. Refer to the information here to create and run a Glue crawler. For the API, a line item was added to the GDC data extract.

Step 3. Define functions to 1/ determine the best data channel to answer the user query, and 2/ generate a response to the user query.

Step 4. Pass the user query to the LLM through LangChain to determine the data channel. After determining the data channel, run the LangChain SQL Database chain to convert text to SQL and run the query against the source data channel.

Finally, display the results.


### Pre-requisites:
1. Use kernel Base Python 3.0.
2. Deploy resources using the CloudFormation template mda-llm-cfn.yml.

[OPTIONAL] - If you need to add any of the sources below, uncomment the code in the relevant sections.

1. Set up an [Aurora MySQL Serverless database](https://aws.amazon.com/getting-started/hands-on/building-serverless-applications-with-amazon-aurora-serverless/?ref=gsrchandson). Load a sample dataset for the Human Resources department. Use this notebook to load the data into Aurora MySQL.
2. Set up [Redshift Serverless](https://catalog.workshops.aws/redshift-immersion/en-US/lab1). Load sample data for Sales & Marketing, for example the 'sample data dev' database for the 'tickit' dataset available in the Redshift examples.
3. Set up an external database. In this case, we use a Snowflake account populated with stocks data. Use this notebook to load the data into Snowflake.
4. Add or modify the [Glue Crawler](https://catalog.us-east-1.prod.workshops.aws/workshops/71b5bdcf-7eb1-4549-b851-66adc860cd04/en-US/2-studio/1-crawler) on all the databases mentioned above.

**Note - This notebook was tested on kernel - conda_python3 in Region us-east-1**

In [None]:
%%writefile requirements.txt
sqlalchemy==1.4.47
snowflake-sqlalchemy
langchain==0.0.166
sqlalchemy-aurora-data-api
PyAthena[SQLAlchemy]==2.25.2
redshift-connector==2.0.910
sqlalchemy-redshift==0.8.14

In [None]:
!pip install -r requirements.txt

In [None]:
import json
import boto3

import sqlalchemy
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

from langchain.docstore.document import Document
from langchain import PromptTemplate,SagemakerEndpoint,SQLDatabase, SQLDatabaseChain, LLMChain
from langchain.llms.sagemaker_endpoint import LLMContentHandler
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts.prompt import PromptTemplate
from langchain.chains import SQLDatabaseSequentialChain

from langchain.chains.api.prompt import API_RESPONSE_PROMPT
from langchain.chains import APIChain

from langchain.chains.api import open_meteo_docs

from typing import Dict

In [None]:
#define content handler class for the Flan T5 XL model
class ContentHandler(LLMContentHandler):
    content_type = "application/json"
    accepts = "application/json"

    def transform_input(self, prompt: str, model_kwargs: Dict) -> bytes:
        # the JumpStart Flan T5 endpoint expects the prompt under "text_inputs";
        # model_kwargs (e.g. temperature) are passed through as generation parameters
        payload = {"text_inputs": prompt, **model_kwargs}
        return json.dumps(payload).encode("utf-8")

    def transform_output(self, output) -> str:
        # the endpoint returns {"generated_texts": [...]}; print it for debugging, then join into one string
        response_json = json.loads(output.read().decode("utf-8")).get('generated_texts')
        print("response", response_json)
        return "".join(response_json)

content_handler = ContentHandler()
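The content handler only translates between LangChain's prompt string and the JSON the endpoint speaks. The sketch below reproduces that round trip locally, with `io.BytesIO` standing in for the endpoint's streaming response body, so you can check the payload shape without calling SageMaker. The helper names are hypothetical, and passing `max_length` next to `text_inputs` assumes your Flan T5 JumpStart endpoint accepts that generation parameter; check the model's documentation.

```python
import io
import json
from typing import Dict, List


def encode_flan_t5_request(prompt: str, model_kwargs: Dict) -> bytes:
    """Build the JSON request body: the prompt under "text_inputs" plus any generation parameters."""
    payload = {"text_inputs": prompt, **model_kwargs}
    return json.dumps(payload).encode("utf-8")


def decode_flan_t5_response(body) -> str:
    """Read a file-like response body and join the strings in "generated_texts"."""
    generated: List[str] = json.loads(body.read().decode("utf-8")).get("generated_texts", [])
    return "".join(generated)


# Local round trip: io.BytesIO mimics the streaming body returned by the endpoint
request = encode_flan_t5_request("How many covid cases are there?", {"max_length": 50})
fake_body = io.BytesIO(json.dumps({"generated_texts": ["SELECT count(*) FROM cfn_covid_dataset"]}).encode("utf-8"))
print(json.loads(request))
print(decode_flan_t5_response(fake_body))
```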

The COVID-19 dataset used here is stored in a publicly accessible S3 bucket. You can explore it with the following command:

`!aws s3 ls s3://covid19-lake/ --no-sign-request`

### Read parameters from the CloudFormation stack
Some of the resources needed for this notebook, such as the LLM endpoint, the AWS Glue database, and the Glue crawler, are created through a CloudFormation template. The next block of code extracts the outputs and parameters of the stack created from that template.

*The stack name here should match the name you used when creating the CloudFormation stack.*
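Because the rest of the notebook depends on a handful of stack outputs and parameters, it can help to fail early with a clear message when one is missing. The sketch below flattens a response shaped like boto3's `describe_stacks` output and checks the required keys. The function name and the placeholder values are hypothetical; the key names (`LLMEndpointName`, `Region`, `CFNCrawlerName`) are the ones this notebook reads.

```python
from typing import Dict, Iterable, Tuple


def extract_stack_values(describe_response: Dict, required_outputs: Iterable[str] = (),
                         required_params: Iterable[str] = ()) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Flatten a describe_stacks response into (outputs, params) dicts and check required keys."""
    stack = describe_response["Stacks"][0]
    outputs = {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}
    params = {p["ParameterKey"]: p["ParameterValue"] for p in stack.get("Parameters", [])}
    missing = [k for k in required_outputs if k not in outputs] + [k for k in required_params if k not in params]
    if missing:
        raise KeyError(f"Stack is missing expected keys: {missing}")
    return outputs, params


# Hand-written response shaped like describe_stacks output (values are placeholders)
sample = {"Stacks": [{
    "Outputs": [{"OutputKey": "LLMEndpointName", "OutputValue": "my-flan-t5-endpoint"},
                {"OutputKey": "Region", "OutputValue": "us-east-1"}],
    "Parameters": [{"ParameterKey": "CFNCrawlerName", "ParameterValue": "my-crawler"}],
}]}
outputs, params = extract_stack_values(sample, ["LLMEndpointName", "Region"], ["CFNCrawlerName"])
print(outputs["LLMEndpointName"], params["CFNCrawlerName"])
```

With a live stack you would pass `boto3.client('cloudformation').describe_stacks(StackName=CFN_STACK_NAME)` instead of `sample`.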

In [None]:
# if you used a different name when creating the CloudFormation stack, change this to match it
CFN_STACK_NAME = "cfn-genai-mda"

In [None]:
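# list_stacks is paginated and also returns deleted stacks, so check every page and ignore DELETE_COMPLETE entries
pages = boto3.client('cloudformation').get_paginator('list_stacks').paginate()
stack_found = any(s['StackName'] == CFN_STACK_NAME and s['StackStatus'] != 'DELETE_COMPLETE' for p in pages for s in p['StackSummaries'])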

In [None]:
from typing import Dict

def get_cfn_outputs(stackname: str) -> Dict[str, str]:
    # return the stack outputs as an {OutputKey: OutputValue} dict
    cfn = boto3.client('cloudformation')
    outputs = {}
    for output in cfn.describe_stacks(StackName=stackname)['Stacks'][0]['Outputs']:
        outputs[output['OutputKey']] = output['OutputValue']
    return outputs

def get_cfn_parameters(stackname: str) -> Dict[str, str]:
    # return the stack parameters as a {ParameterKey: ParameterValue} dict
    cfn = boto3.client('cloudformation')
    params = {}
    for param in cfn.describe_stacks(StackName=stackname)['Stacks'][0]['Parameters']:
        params[param['ParameterKey']] = param['ParameterValue']
    return params

if stack_found:
    outputs = get_cfn_outputs(CFN_STACK_NAME)
    params = get_cfn_parameters(CFN_STACK_NAME)
    LLMEndpointName = outputs['LLMEndpointName']
    glue_crawler_name = params['CFNCrawlerName']
    glue_database_name = params['CFNDatabaseName']
    glue_databucket_name = params['DataBucketName']
    region = outputs['Region']
    print(f"cfn outputs={outputs}\nparams={params}")
else:
    print("Stack not found. Recheck your CloudFormation stack name")

### Copy the sample dataset to your S3 bucket

In [None]:
!aws s3 cp --recursive s3://covid19-lake/rearc-covid-19-testing-data/json/states_daily/ s3://{glue_databucket_name}/covid-dataset/

### Run the crawler

In [None]:
%%writefile python_glueworkshop.py
import argparse
import time

import boto3

argParser = argparse.ArgumentParser()
argParser.add_argument("-c", "--glue_crawler_name", required=True, help="Name of the Glue crawler to start")
args = argParser.parse_args()
crawler_name = args.glue_crawler_name
print(crawler_name)

# Create a single Glue client and reuse it
client = boto3.client('glue')

def get_crawler_status(crawler_name):
    # Return the crawler state: READY, RUNNING or STOPPING
    response = client.get_crawler(Name=crawler_name)
    return response['Crawler']['State']

# Start the crawler, then poll until it returns to READY
try:
    client.start_crawler(Name=crawler_name)
    print("Successfully started crawler. The crawler may take 2-5 mins to detect the schema.")

    while True:
        status = get_crawler_status(crawler_name)
        print(f"Crawler '{crawler_name}' status: {status}")

        # READY means the crawler has finished its run and is idle again
        if status == 'READY':
            break

        time.sleep(10)  # Wait 10 seconds before checking the status again

except Exception as e:
    print(f"Error while running crawler '{crawler_name}': {e}. Check the logs for details.")


Execute the Python script, passing the Glue crawler name from the CloudFormation stack output.

In [None]:
!python python_glueworkshop.py -c {glue_crawler_name}

Before proceeding to the next step, check the status of the crawler. It should change from RUNNING to READY. 
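The script above polls until the crawler reports `READY`, but it waits forever if the crawler never gets there. A minimal sketch of the same polling pattern with a timeout is shown below. The function name and the 900-second default are assumptions, time is counted in polling intervals rather than wall-clock time, and `sleep` is injectable so the loop can be exercised without AWS.

```python
import time
from typing import Callable


def wait_for_state(get_state: Callable[[], str], target: str = "READY",
                   interval: float = 10.0, timeout: float = 900.0,
                   sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_state() until it returns target; raise TimeoutError after roughly `timeout` seconds."""
    waited = 0.0
    while True:
        state = get_state()
        print(f"status: {state}")
        if state == target:
            return state
        if waited >= timeout:
            raise TimeoutError(f"state was still {state!r} after {waited:.0f}s")
        sleep(interval)
        waited += interval


# Simulated crawler run; against AWS you could pass
# lambda: boto3.client('glue').get_crawler(Name=crawler_name)['Crawler']['State']
states = iter(["RUNNING", "RUNNING", "STOPPING", "READY"])
final = wait_for_state(lambda: next(states), sleep=lambda s: None)
```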

### Step 1 - Connect to databases using SQLAlchemy

Under the hood, LangChain uses SQLAlchemy to connect to SQL databases. The SQLDatabaseChain can therefore be used with any SQL dialect supported by SQLAlchemy, such as MS SQL, MySQL, MariaDB, PostgreSQL, Oracle SQL, and SQLite. Refer to the SQLAlchemy documentation for the requirements for connecting to your database.
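Each source is reached through a SQLAlchemy URL whose prefix selects the dialect and driver. The sketch below builds the two URL shapes used in the connection cell (`awsathena+rest` for PyAthena, `redshift+redshift_connector` for sqlalchemy-redshift) as plain strings. URL-encoding with `quote_plus` follows the SQLAlchemy advice for passwords containing characters such as `@` or `/`, and also keeps the S3 staging path intact inside the query string. The helper names, bucket, host, and password are placeholders.

```python
from urllib.parse import quote_plus


def athena_url(region: str, schema: str, s3_staging_dir: str, work_group: str = "primary") -> str:
    """SQLAlchemy URL for PyAthena's REST dialect; query-string values are URL-encoded."""
    return (f"awsathena+rest://@athena.{region}.amazonaws.com:443/{schema}"
            f"?s3_staging_dir={quote_plus(s3_staging_dir)}&work_group={quote_plus(work_group)}")


def redshift_url(user: str, password: str, host: str, port: int, database: str) -> str:
    """SQLAlchemy URL for sqlalchemy-redshift with redshift_connector; credentials are URL-encoded."""
    return f"redshift+redshift_connector://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{database}"


print(athena_url("us-east-1", "cfn_covid_lake", "s3://my-bucket/athenaresults/"))
print(redshift_url("admin", "p@ss/word", "example.host", 5439, "dev"))
```

The resulting string is what you would hand to `create_engine(...)` before wrapping the engine in `SQLDatabase`.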


**Important**: The code below establishes the connections to the data sources and to the large language model. The solution only works for data sources whose connection is defined in the cell below; refer to the Pre-requisites section. If your use case requires data from Aurora MySQL alone, comment out the other data sources and update the Aurora MySQL cluster details and variables accordingly.

In [None]:
#define connections

#LLM 
llm=SagemakerEndpoint(
        endpoint_name=LLMEndpointName, 
        region_name=region,
        model_kwargs={"temperature":1e-10},
        content_handler=content_handler
      )

#S3
# connect to s3 using athena
## athena variables
from urllib.parse import quote_plus
connathena=f"athena.{region}.amazonaws.com" 
portathena='443' #Update, if port is different
schemaathena=glue_database_name #from cfn params
s3stagingathena=f's3://{glue_databucket_name}/athenaresults/' #from cfn params
wkgrpathena='primary' #Update, if workgroup is different
# tablesathena=['dataset'] #[<table name>]
##  Create the athena connection string
##  s3stagingathena already ends in '/', so no extra slash is appended; it is URL-encoded because it sits in the query string
connection_string = f"awsathena+rest://@{connathena}:{portathena}/{schemaathena}?s3_staging_dir={quote_plus(s3stagingathena)}&work_group={wkgrpathena}"
##  Create the athena SQLAlchemy engine
engine_athena = create_engine(connection_string, echo=False)
dbathena = SQLDatabase(engine_athena)
# dbathena = SQLDatabase(engine_athena, include_tables=tablesathena)

# collect credentials from Secrets Manager
#Refer here on how to use AWS Secrets Manager - https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html
# client = boto3.client('secretsmanager')

# #SNOWFLAKE
# # connect to snowflake database
# ## snowflake variables
# sf_account_id = <your snowflake account id>
# sf_secret_id =<your snowflake credentials secret id>
# dwh = <your dwh>
# db = <your database>
# schema = <your database schema>
# table = <table name>
# ## snowflake get credentials from secrets manager
# response = client.get_secret_value(SecretId=sf_secret_id)
# secrets_credentials = json.loads(response['SecretString'])
# sf_password = secrets_credentials['password']
# sf_username = secrets_credentials['username']
# ##  Create the snowflake connection string
# connection_string = f"snowflake://{sf_username}:{sf_password}@{sf_account_id}/{db}/{schema}?warehouse={dwh}"
# ##  Create the snowflake  SQLAlchemy engine
# engine_snowflake = create_engine(connection_string, echo=False)
# dbsnowflake = SQLDatabase(engine_snowflake)

# #AURORA MYSQL
# ##connect to aurora mysql
# ##aurora mysql cluster details/variables
# cluster_arn = <your cluster arn>
# secret_arn =<your cluster secret arn>
# rdsdb=<your database>
# rdsdb_tbl = [<table name>]
# ##  Create the aurora connection string
# connection_string = f"mysql+auroradataapi://:@/{rdsdb}"
# ##  Create the aurora  SQLAlchemy engine
# engine_rds = create_engine(connection_string, echo=False,connect_args=dict(aurora_cluster_arn=cluster_arn, secret_arn=secret_arn))
# dbrds = SQLDatabase(engine_rds, include_tables=rdsdb_tbl)

# #REDSHIFT
# # connect to redshift database
# ## redshift variables
# rs_secret_id = <redshift secret id>
# rs_endpoint=<redshift endpoint>
# rs_port=<redshift port>
# rs_db=<redshift database>
# rs_schema=<redshift database schema>
# ## redshift get credentials from secrets manager
# response = client.get_secret_value(SecretId=rs_secret_id)
# secrets_credentials = json.loads(response['SecretString'])
# rs_password = secrets_credentials['password']
# rs_username = secrets_credentials['username']
# ##  Create the redshift connection string
# connection_string = f"redshift+redshift_connector://{rs_username}:{rs_password}@{rs_endpoint}:{rs_port}/{rs_db}"
# engine_redshift = create_engine(connection_string, echo=False)
# dbredshift = SQLDatabase(engine_redshift)

#Glue Data Catalog
##Provide the list of all databases where the table metadata resides after the Glue crawler successfully crawls the tables
# gdc = ['redshift-sagemaker-sample-data-dev', 'snowflake','rds-aurora-mysql-employees','sagemaker_featurestore'] # mentioned a few examples here
gdc = [schemaathena] 


### Step 2 - Generate Dynamic Prompt Templates
Build a consolidated view of the Glue Data Catalog by combining the metadata stored for all the databases in pipe-delimited format.
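The consolidated view is one `classification|database|table|column` line per column. The sketch below applies that formatting to hand-written entries shaped like a Glue `get_tables` `TableList`, so the transformation can be checked without AWS. The function name is hypothetical, and the second table, its location, and its `mysql` classification are illustrative values rather than crawler output from this notebook.

```python
from typing import Dict, Iterable, List


def catalog_lines(tables: Iterable[Dict]) -> List[str]:
    """Turn Glue get_tables TableList entries into classification|database|table|column lines."""
    lines = []
    for table in tables:
        descriptor = table.get("StorageDescriptor", {})
        # S3-backed tables are labelled 's3'; other sources use the crawler's classification parameter
        if descriptor.get("Location", "").startswith("s3"):
            classification = "s3"
        else:
            classification = table.get("Parameters", {}).get("classification", "unknown")
        for column in descriptor.get("Columns", []):
            lines.append(f"{classification}|{table['DatabaseName']}|{table['Name']}|{column['Name']}")
    return lines


# Hand-written entries shaped like a get_tables response (names and locations are illustrative)
sample_tables = [
    {"DatabaseName": "cfn_covid_lake", "Name": "cfn_covid_dataset",
     "StorageDescriptor": {"Location": "s3://my-bucket/covid-dataset/",
                           "Columns": [{"Name": "state"}, {"Name": "positive"}]}},
    {"DatabaseName": "rds-aurora-mysql-employees", "Name": "employees",
     "Parameters": {"classification": "mysql"},
     "StorageDescriptor": {"Location": "employees.employees", "Columns": [{"Name": "hire_date"}]}},
]
# The API entry is appended by hand, as the notebook does
catalog = "\n".join(catalog_lines(sample_tables) + ["api|meteo|weather|weather"])
print(catalog)
```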

In [12]:
#Generate dynamic prompts from the Glue Data Catalog
#harvest AWS crawler metadata

def parse_catalog():
    # return the catalog metadata as pipe-delimited lines: classification|database|table|column
    columns_str = ''

    #define glue client
    glue_client = boto3.client('glue')
    # get_tables is paginated, so use a paginator to read every table in each database
    paginator = glue_client.get_paginator('get_tables')

    for db in gdc:
        for page in paginator.paginate(DatabaseName=db):
            for table in page['TableList']:
                #classification in the response for s3 and other databases is different. Set classification based on the table location
                if table['StorageDescriptor']['Location'].startswith('s3'):
                    classification = 's3'
                else:
                    classification = table.get('Parameters', {}).get('classification', 'unknown')
                for column in table['StorageDescriptor']['Columns']:
                    dbname, tblname, colname = table['DatabaseName'], table['Name'], column['Name']
                    columns_str = columns_str + f'\n{classification}|{dbname}|{tblname}|{colname}'
    #API
    ## Append the metadata of the API to the unified Glue Data Catalog
    columns_str = columns_str + '\n' + 'api|meteo|weather|weather'
    return columns_str

glue_catalog = parse_catalog()

#display the last few lines from the catalog
print('\n'.join(glue_catalog.splitlines()[-10:]))


s3|cfn_covid_lake|cfn_covid_dataset|totaltestresults
s3|cfn_covid_lake|cfn_covid_dataset|fips
s3|cfn_covid_lake|cfn_covid_dataset|deathincrease
s3|cfn_covid_lake|cfn_covid_dataset|hospitalizedincrease
s3|cfn_covid_lake|cfn_covid_dataset|negativeincrease
s3|cfn_covid_lake|cfn_covid_dataset|positiveincrease
s3|cfn_covid_lake|cfn_covid_dataset|totaltestresultsincrease
s3|cfn_covid_lake|cfn_covid_dataset|negative
s3|cfn_covid_lake|cfn_covid_dataset|pending
api|meteo|weather|weather
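The catalog text printed above becomes part of the channel-selection prompt. In `identify_channel` it is concatenated into the template, so only `{query}` remains a template variable. The sketch below shows that assembly with plain `str.format`, standing in for LangChain's `PromptTemplate` (which also uses f-string-style placeholders by default). Escaping braces in the catalog is an addition not present in the notebook: it keeps a stray `{` or `}` in a table or column name from being read as a placeholder. The function name is hypothetical.

```python
def build_channel_prompt(query: str, catalog: str) -> str:
    """Assemble the channel-selection prompt: instructions, the user question, then the catalog lines."""
    # Double any braces in the catalog so str.format treats them as literal text
    safe_catalog = catalog.replace("{", "{{").replace("}", "}}")
    template = (
        "From the table below, find the database (in column database) which will contain "
        "the data (in corresponding column_names) to answer the question\n{query}\n"
        + safe_catalog
        + "\nGive your answer as database == \nAlso, give your answer as database.table == "
    )
    return template.format(query=query)


print(build_channel_prompt("How many covid cases are there?",
                           "s3|cfn_covid_lake|cfn_covid_dataset|positive\napi|meteo|weather|weather"))
```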


### Step 3 - Define functions to 1/ determine the best data channel to answer the user query, and 2/ generate a response to the user query

In this code sample, we use the Flan T5 XL model, deployed as a SageMaker endpoint through SageMaker JumpStart, to generate inferences. You can use other SageMaker JumpStart models to achieve the same.
Guidance on how to use JumpStart models is available in the notebook mda_with_llm_langchain_smjumpstart_flant5xl.
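`identify_channel` decides where to send the question by looking for keywords in the model's free-text answer, checking them in a fixed order so the first match wins. The sketch below isolates that routing as a pure function that can be tested without an endpoint. The keywords and their order mirror the notebook; the rule table, the function name, and the lower-casing of the answer are assumptions added for illustration. Because matching is by substring, order matters: a short keyword such as `s3` should come after longer, more specific ones.

```python
from typing import Sequence, Tuple

# Ordered (keyword, channel, source) rules mirroring identify_channel; the first match wins.
ROUTES: Sequence[Tuple[str, str, str]] = (
    ("snowflake", "db", "snowflake"),
    ("redshift", "db", "redshift"),
    ("s3", "db", "athena"),
    ("rdsmysql", "db", "rds"),
    ("api", "api", "weather api"),
)


def route(generated_text: str) -> Tuple[str, str]:
    """Map the LLM's channel answer to (channel, source) or raise if no rule matches."""
    text = generated_text.lower()
    for keyword, channel, source in ROUTES:
        if keyword in text:
            return channel, source
    raise ValueError("User question cannot be answered by any of the channels in the catalog")


print(route("s3|cfn_covid_lake|cfn_covid"))
print(route("api|meteo|weather|weather"))
```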

In [None]:
#Function 1 'Infer Channel'
#define a function that infers the channel/database/table and sets the database for querying
def identify_channel(query):
    #Prompt 1 'Infer Channel'
    ##set prompt template. It instructs the LLM on how to evaluate and respond to the user query. It is called dynamic because the Glue Data Catalog is generated first and then appended to the prompt.
    prompt_template = """
     From the table below, find the database (in column database) which will contain the data (in corresponding column_names) to answer the question 
     {query} \n
     """+glue_catalog +""" 
     Give your answer as database == 
     Also, give your answer as database.table == 
     """
    ##define prompt 1
    PROMPT_channel = PromptTemplate(template=prompt_template, input_variables=["query"])

    # define llm chain
    llm_chain = LLMChain(prompt=PROMPT_channel, llm=llm)
    #run the query and save to generated texts
    generated_texts = llm_chain.run(query)
    print(generated_texts)

    # db stays None for the API channel, which does not use a SQL database
    db = None
    #set the best channel from where the query can be answered
    if 'snowflake' in generated_texts:
        channel = 'db'
        db = dbsnowflake
        print("SET database to snowflake")
    elif 'redshift' in generated_texts:
        channel = 'db'
        db = dbredshift
        print("SET database to redshift")
    elif 's3' in generated_texts:
        channel = 'db'
        db = dbathena
        print("SET database to athena")
    elif 'rdsmysql' in generated_texts:
        channel = 'db'
        db = dbrds
        print("SET database to rds")
    elif 'api' in generated_texts:
        channel = 'api'
        print("SET database to weather api")
    else:
        raise Exception("User question cannot be answered by any of the channels mentioned in the catalog")
    print("Step complete. Channel is: ", channel)

    return channel, db

#Function 2 'Run Query'
#define a function that runs the user query against the channel returned by identify_channel
def run_query(query):

    channel, db = identify_channel(query) #call the identify channel function first

    ##Prompt 2 'Run Query'
    #after determining the data channel, run the Langchain SQL Database chain to convert 'text to sql' and run the query against the source data channel. 
    #provide rules for running the SQL queries in default template--> table info.

    _DEFAULT_TEMPLATE = """Given an input question, first create a syntactically correct {dialect} query to run, then look at the results of the query and return the answer.

    Only use the following tables:

    {table_info}
    if someone asks for covid data, then use the table cfn_covid_lake.cfn_covid_dataset.

    Question: {input}"""

    PROMPT_sql = PromptTemplate(
        input_variables=["input", "table_info", "dialect"], template=_DEFAULT_TEMPLATE
    )

    
    if channel=='db':
        db_chain = SQLDatabaseChain.from_llm(llm, db, prompt=PROMPT_sql, verbose=False, return_intermediate_steps=False)
        response=db_chain.run(query)
    elif channel=='api':
        chain_api = APIChain.from_llm_and_api_docs(llm, open_meteo_docs.OPEN_METEO_DOCS, verbose=True)
        response=chain_api.run(query)
    else: raise Exception("Unlisted channel. Check your unified catalog")
    return response
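For the `db` channel, the text-to-SQL step amounts to two model calls: one writes SQL for the question, the query is executed, and a second call turns the result into an answer. The sketch below is a simplified illustration of that loop, not LangChain's implementation. It uses the standard-library `sqlite3` module in place of Athena and a stub function in place of the SageMaker endpoint; the table `cases`, its rows, and the helper names are hypothetical.

```python
import sqlite3
from typing import Callable


def text_to_sql_answer(question: str, llm: Callable[[str], str], conn: sqlite3.Connection,
                       table_info: str, dialect: str = "sqlite") -> str:
    """Two LLM calls: one writes SQL for the question, the second turns the SQL result into an answer."""
    prompt = (f"Given an input question, first create a syntactically correct {dialect} query to run, "
              f"then look at the results of the query and return the answer.\n"
              f"Only use the following tables:\n{table_info}\n\nQuestion: {question}\nSQLQuery:")
    sql = llm(prompt).strip()
    result = conn.execute(sql).fetchall()
    return llm(f"{prompt} {sql}\nSQLResult: {result}\nAnswer:").strip()


def stub_llm(prompt: str) -> str:
    """Stand-in for the model: returns fixed SQL, then echoes the SQL result as the answer."""
    if prompt.endswith("SQLQuery:"):
        return "SELECT count(*) FROM cases"
    return prompt.rsplit("SQLResult: ", 1)[1].split("\n", 1)[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cases (state TEXT, positive INTEGER)")
conn.executemany("INSERT INTO cases VALUES (?, ?)", [("NY", 10), ("CA", 20), ("TX", 5)])
answer = text_to_sql_answer("How many rows are there?", stub_llm, conn,
                            "CREATE TABLE cases (state TEXT, positive INTEGER)")
print(answer)
```

The raw `[103900]`-style answer in the sample output below is consistent with a model that, like this stub, simply echoes the SQL result.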



### Step 4 - Run the run_query function, which calls the LangChain SQL Database chain to convert text to SQL and runs the query against the source data channel

Some samples are provided below for test runs. Uncomment the query to run.

In [14]:
# Enter the query
## Few queries to try out - 
#athena - Healthcare - Covid dataset
# query = """How many covid hospitalizations were reported in NY in June of 2021?"""  
# query = """Which States reported the least and maximum deaths?""" 
query="How many covid cases are there?"

#snowflake - Finance and Investments
# query = """Which stock performed the best and the worst in May of 2013?"""
# query = """What is the average volume of stocks traded in July of 2013?"""

#rds - Human Resources
# query = """Name all employees with birth date this month""" 
# query = """Combien d'employés sont des femmes? """ #Ask question in French - How many employees are women?
# query = """How many employees were hired before 1990?"""  

#athena - Legal - SageMaker offline featurestore
# query = """How many frauds happened in the year 2023 ?"""  
# query = """How many policies were claimed this year ?""" 

#redshift - Sales & Marketing
# query = """How many tickit sales are there""" 
# query = "what was the total commission for the tickit sales in the year 2008?" 

#api - product - weather
# query = """What is the weather like right now in New York City in degrees Fahrenheit?"""

#Response from Langchain
response =  run_query(query)
print("----------------------------------------------------------------------")
print(f'SQL and response from user query {query}  \n  {response}')

response ['s3|cfn_covid_lake|cfn_covid']
s3|cfn_covid_lake|cfn_covid
SET database to athena
Step complete. Channel is:  db
response ['SELECT count(*) FROM cfn_covid_dataset W']
response ['[103900]']
----------------------------------------------------------------------
SQL and response from user query How many covid cases are there?  
  [103900]


### Clean-up
After you run the modern data architecture with Generative AI, make sure to clean up any resources that won’t be used. Shut down and delete the databases used (Amazon Redshift, Amazon RDS, Snowflake). In addition, delete the data in Amazon S3 and stop any SageMaker Studio notebook instances so that you don't incur further charges. If you used SageMaker JumpStart to deploy the large language model as a SageMaker real-time endpoint, delete the endpoint through either the SageMaker console or Studio.

To completely remove all the provisioned resources, go to CloudFormation and delete the stack.
