# BasicTick: End of Day (EOD) Processing
This notebook an example of an end of day (EOD) process that adds the contents of an RDB as a changeset to an HDB.

Instead of moving between python and q code this notebook makes use of PyKX to do all its work with the RDB, HDB, and Gateway for end of day processing.

**RDB: Save Day's Data**
1. Save table and sym locally    
2. Savedown: add changeset to database    

**HDB: Update**
1. Update the Cluster's Database to New Changeset ID

**Gateway: Re-Connect**
1. Update the Database Connections

In [1]:
# scratch location on RDB
scratch_path = "/opt/kx/app/scratch"

# clean rdb?
clear_rdb = True

In [2]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import os
import boto3
import json
import datetime

import pykx as kx

from managed_kx import *
from env_kdb_1 import *

from basictick_setup import *

In [3]:
# triggers credential get
session=None

try:
    # aws: use ada for credentials
    subprocess.call(["which", "ada"])
    os.system(f"ada credentials update --account={ACCOUNT_ID} --provider=isengard --role=Admin --once")
except: 
    None

if AWS_ACCESS_KEY_ID is None:
    print("Using Defaults ...")
    # create AWS session: using access variables
    session = boto3.Session()
else:
    print("Using variables ...")
    session = boto3.Session(
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        aws_session_token=AWS_SESSION_TOKEN
    )

# create finspace client
client = session.client(service_name='finspace', endpoint_url=ENDPOINT_URL)

Using variables ...


# Current State of HDB

In [4]:
# Query the HDB
hdb = get_pykx_connection(client, 
                          environmentId=ENV_ID, clusterName=HDB_CLUSTER_NAME, 
                          userName=KDB_USERNAME, boto_session=session)

# Dates and Counts
before_update_pdf = hdb("select counts:count i by date from example").pd()

# Number of Rows
before_rows = hdb("count example").py()

# RDB: Save Day's Data
1. Save table and sym locally   
2. Savedown: add changeset to database

In [5]:
# Connect to the RDB
rdb = get_pykx_connection(client, 
                          environmentId=ENV_ID, clusterName=RDB_CLUSTER_NAME, 
                          userName=KDB_USERNAME, boto_session=session)
# Dates and Counts
rdb_sample_pdf = rdb("select [-5] from example").pd()
rdb_rows = rdb("count example").py()

display(rdb_sample_pdf)
print(f"Rows: {rdb_rows:,}")

Unnamed: 0,sym,time,number
0,aoe,2023-07-26 19:23:44.675384594,50
1,igl,2023-07-26 19:23:44.675384594,79
2,nfp,2023-07-26 19:23:44.675384594,58
3,oop,2023-07-26 19:23:44.685372093,7
4,fac,2023-07-26 19:23:44.685372093,69


Rows: 3,258,921


## Step 1: Save table and sym locally

In [6]:
# date directory
today = datetime.date.today()

date_dir = today.strftime("%Y.%m.%d")

print( f"Saving to: {date_dir}" )
rdb( f".Q.dpfts[`:{scratch_path};{date_dir};`sym;`example;`sym]" )
print( f"Saved to: {scratch_path}" )


Saving to: 2023.07.26
Saved to: /opt/kx/app/scratch


## Step 2: Savedown: add changeset to database

a. Table of changes for the changset  
b. Create Changeset  
c. Wait for Changeset to be added

In [7]:
# pandas table that catalogs the changeset's contents
cr = [
    {'input_path': f'{scratch_path}/{date_dir}', 'database_path': f'/{date_dir}/', 'change_type':'PUT'},
    {'input_path': f'{scratch_path}/sym', 'database_path': f'/', 'change_type':'PUT'},
]

display(cr)

# send to rdb
rdb['c_r'] = cr

[{'input_path': '/opt/kx/app/scratch/2023.07.26',
  'database_path': '/2023.07.26/',
  'change_type': 'PUT'},
 {'input_path': '/opt/kx/app/scratch/sym',
  'database_path': '/',
  'change_type': 'PUT'}]

In [8]:
print(f"Creating changeset for: {DB_NAME}")

res = rdb(f".aws.create_changeset[\"{DB_NAME}\";c_r]")

CHANGESET_ID = str(res.get("id"))
print(f"ChangesetID: {CHANGESET_ID}")

Creating changeset for: basictickdb
ChangesetID: OMTJ1CPJNAvq0RQF3o7xFQ


In [9]:
# wait for ingestion
wait_for_changeset_status(client, environmentId=ENV_ID, databaseName=DB_NAME, changesetId=CHANGESET_ID, show_wait=True)
print("** Done **")

Status is IN_PROGRESS, total wait 0:00:00, waiting 10 sec ...
Status is IN_PROGRESS, total wait 0:00:10, waiting 10 sec ...
Status is IN_PROGRESS, total wait 0:00:20, waiting 10 sec ...
Status is IN_PROGRESS, total wait 0:00:30, waiting 10 sec ...
Status is IN_PROGRESS, total wait 0:00:40, waiting 10 sec ...
** Done **


### Optional: Clean up RDB
Optionally clean up by deleting files created and clear the example table.

In [10]:
# clear the RDB
if clear_rdb:
    print(f"Cleaning: {scratch_path}")

    rdb(f"system \" rm -rf {scratch_path}/*\"")

    # remove tables
    rdb("delete from `example")
    rdb("delete c_r from `.")

Cleaning: /opt/kx/app/scratch


# HDB: Update
Update the cluster's database to new changeset.


In [11]:
DB_CONFIG=[{'databaseName': DB_NAME,
   'cacheConfigurations': [{'cacheType': 'CACHE_1000', 'dbPaths': ['/']}],
   'changesetId': CHANGESET_ID}]

client.update_kx_cluster_databases(environmentId=ENV_ID, clusterName=HDB_CLUSTER_NAME, databases=DB_CONFIG)

wait_for_cluster_status(client, environmentId=ENV_ID, clusterName=HDB_CLUSTER_NAME)
print("** Done **")

** Done **


# Gateway: Re-Connect
Using PyKX, connect to the Gateway cluster and have it re-connect to its Databases. Connection to the HDB would have been lost during the HDB update process.


In [12]:
# Connect to the Gateway with PyKX
gw = get_pykx_connection(client, 
                          environmentId=ENV_ID, clusterName=GW_CLUSTER_NAME, 
                          userName=KDB_USERNAME, boto_session=session)

# reinit the gateway, will re-connect to databases
gw("reinit[hdb_name; rdb_name]")

# State of connected processes
display( gw("select process, handle, connected, address from .conn.procs").pd() )

Unnamed: 0,process,handle,connected,address
0,rdb,21,True,:ip-192-168-7-230.ec2.internal:5000:GATEWAY_basictickdb:Host=ip-192-168-7-230.ec2.internal&Port=5000&User=GATEWAY_basictickdb&Action=finspace%3AConnectKxCluster&X-Amz-Security-Token=IQoJb3JpZ2luX2VjELT%2F%2F%2F%2F%2F%2F%2F%2F%2F%2FwEaCXVzLWVhc3QtMSJGMEQCICoYJWSJDTdFTxl7%2FxTem%2FjbeZWBzVNZ2nX%2Fjy3jhXWxAiA%2FCya%2BSKa4uMPqWQGRQz%2F9esmOpoROQcid4PznrD0sTir3AghNEAAaDDgyOTg0NTk5ODg4OSIMOOM%2FRAfKEm0ivdJcKtQCkKEzUkVaZ%2BAUhv3wueF6fVku8TEU%2BQBSOoI9kxkSqn07rR8iwMHTgtFbwljpClitcOo%2BT4y7nlVR0cEgPpN8RE%2FUddnpiRRP227%2BvtKUw%2B9Nu4hi9dh046qRMYyQciN%2F234Ve3q7xbucSU0ZX3byCRN3KOfvZfSWjcyNsiteQm%2F57zdXqIKf6ou9jbyw96PBmxp5PtS5MV5Zjo42%2FWNR8QiYeTuRUYbQP8zU38tInY7F3gLUV4M8RTqD34iETO0CF2pXKQ0jXigHPx%2BKGmSAxvRaNTBffnc7MiouBvhywdZS8%2FopWyy8Z2j4vTAk77NlLuT2QaKHDZ1L%2FB2QrjfpxR0Gb6mSPHJoF8cy3f%2F%2Bvki2sLrYJTZLXr6sxH6QJg%2FIKUQzt%2FcuseWFX0hKMUfnxayFviRJtOTwOcXzfXeF%2F02jGpUOfAaUCUB5ktql3JXdxpkOKjCz7IWmBjrAAQu3c8A%2BlKR7KnODdxnQD8ABH9qnClcj%2BBlU7ugIkJwh7O0qwXtZSSUpUL%2BsxiAvyXq79%2BT77Ycx7cxRemmJw7QVM8lqkspWm9bcMNkELspZiIQdflUmXnKvqWXgM%2Fdrjt0YDwcR0u7k1Ht4sB1QPL%2Fmx6lPEqAIEYfQCQu2RuairfTDjGN42WOsZ97hGjtya2M%2F7KUsRZDzsXlXTBE9A4cQ2KVJGu40ZtzFocp8947jUsH9UgvqrrTJjitudfjPJA%3D%3D&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20230726T193827Z&X-Amz-SignedHeaders=host&X-Amz-Expires=900&X-Amz-Credential=ASIA4CNVNBUU52PW3OOT%2F20230726%2Fus-east-1%2Ffinspace-apricot%2Faws4_request&X-Amz-Signature=467499e2a5fb509565b6c7467fce2f31bdf05c074999b4debc4af2e06cd24309
1,hdb,22,True,:ip-192-168-0-64.ec2.internal:5000:GATEWAY_basictickdb:Host=ip-192-168-0-64.ec2.internal&Port=5000&User=GATEWAY_basictickdb&Action=finspace%3AConnectKxCluster&X-Amz-Security-Token=IQoJb3JpZ2luX2VjELT%2F%2F%2F%2F%2F%2F%2F%2F%2F%2FwEaCXVzLWVhc3QtMSJHMEUCIAeD0ZNIa4IlUo6JTW5rQHMckntQOd8JPGCY1O%2F8nNZ%2FAiEAyE9OBQcX3750NPcRAwmQ%2BY9Jyh81r3c4P1qNWOPjAYwq9wIITRAAGgw4Mjk4NDU5OTg4ODkiDEbg8fzsF40QnAOxOyrUAr38G%2BOA6j3aPYGNUzX7NcEQ6XVJQoVU%2B0Q%2FUkT%2B3bPH%2Fa%2F1QiTfqu7fzmPoUMzaK9upuErbKT3Zz%2FaRZuDPdunUUxFLXQTDvzLkjIgJJxSgQH5KJNtrSG2ibfTXKPivYUbcEmb1awIsD%2FhUTLwE2WPBSileku13X7MbFPbDSWiz3%2FiTjqszL4Bdy7gi0rkCqM5188aipGieWZRdnDWWjTV%2BlTxNFN%2FgKk%2BuZeIiahBtXau38lHKXj22lx8aeLwEC%2BNx7IF0vAtUOzbtZFbsYHKQjzPZ6g1R13PDo9GEhkMUsoolO51ET2v1CwTIAoX4yonOEbVuS%2Ful4tvWlqxVFkts8Qg1jdK9LKcZxMd%2BeMuCpCia%2BdT%2BuGdk3r3sc0BuC0xnF5zgOxexq6xW1A6hc1Tm%2FrhUJzppqORWNGily7R8qITlvPGh3AZoLh%2BJ%2BuCtK39JvhkwtOyFpgY6vwEDo4VFyhjCVrdngBLlP6X1S59cwPb0AXC4g7YhpqNugeEZg2hNLyqrSFxMlnDam5EMfust9x%2F1bFI%2Ft2NeixHQmlSY6bXtiJXUmPY%2Bm%2BC14uveeWySpp2ZFeYSUi1yl1o78B7B7S8KB%2BUu6KNu%2FQ80Xr%2BjYXgHuLSu%2BoJ6pO6YpF6UEja5IXMneKzmTFZkFqm%2FxExfZoufrwkW%2BY%2Bbr1lZRtKOl%2FXeGStGhCPCkXf0dTEFzL1f07zVna6GwhkJ8w%3D%3D&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20230726T193828Z&X-Amz-SignedHeaders=host&X-Amz-Expires=900&X-Amz-Credential=ASIA4CNVNBUUSMD4MC4A%2F20230726%2Fus-east-1%2Ffinspace-apricot%2Faws4_request&X-Amz-Signature=a13aa88ade2cee1b5f71bb4da94d8d1b017601adca89e548c86291dfdea3187d
2,hdb,23,True,:ip-192-168-5-75.ec2.internal:5000:GATEWAY_basictickdb:Host=ip-192-168-5-75.ec2.internal&Port=5000&User=GATEWAY_basictickdb&Action=finspace%3AConnectKxCluster&X-Amz-Security-Token=IQoJb3JpZ2luX2VjELT%2F%2F%2F%2F%2F%2F%2F%2F%2F%2FwEaCXVzLWVhc3QtMSJIMEYCIQDUrl9KuaLg7uUKtcEpFdf770YBI024i6NQppRuzmKbsgIhAPHt%2Fpdq7bdVXKZ3epOwaBUq9wZSWx31yx3VQhwDr65PKvcCCE0QABoMODI5ODQ1OTk4ODg5Igzh5fk6lck86lrg4TYq1ALag7fx1Kqn06m%2BzYNhEs64LnecDDSxnOmaCqq%2Fr%2BbtKUikWIty2rawg4YXsz6u3gq5sb9aZ7Ynq4GhNy1qrWasiQHSq2khGlQvcSlNiNemUXjGehj1ZVsIZo7QvWmt1YdRLrY%2F8c4GMO8OslrovONLHSnggrk7iFf6lJJhdqsNy8OmF6mMK0g6YkLx3DTJfC6P3%2FiZkqrBR%2FTBlalnD9kckrUvXBC5tRoR6WSvtba2esLfGa9r907Da47Gsu0t2vZtLhm8nIqckV8AJILdEzsw72UbAXBw8cax41zNosqTsXilo93atcnDsF30jor6DwR0MWls0OpkRZ196snJX5TG7ii55ZiydC2oZx%2BfiOfCKqXWzf27ZozRSWsUiVYmhg6U4FflS2BPdbcGQtYt9k%2B66ZwR9N6Y1sET5OzVYDBQ40TNY4sCUOfVFA%2FKny1uB4209V9bMLTshaYGOr4BsdHZLS90vtoseTV1jd2Mtcaxa6qXvySJrJEdFBOSXM7H4S8fpLkKLI8FoZViU8IlGcLNzIQ%2FzLaVn799yKOAy0kvtYp1xZzGw9CBwkdkjclWsLubq%2FSRJdUKDz2RAvNShWGSOgmbVkJFvDYS9oP07YCET8X52vGUOUTjjkAQqa8VkIMqch4qW2GHQXCDuYC3I%2BhgJwBDxaDyQ%2Bs4kYc5VhvnHPNtp3HQli5xi8hwqGaW%2B3aV3JAB1Bb56AAzFQ%3D%3D&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20230726T193828Z&X-Amz-SignedHeaders=host&X-Amz-Expires=900&X-Amz-Credential=ASIA4CNVNBUU2ELVJGLT%2F20230726%2Fus-east-1%2Ffinspace-apricot%2Faws4_request&X-Amz-Signature=4478101265a2e2ee9c97c6658eb9c33f5944b7d7ec7d825b5aa64afe051ae709


# HDB: Before and After
Dates and counts of the HDB before update and after.

In [13]:
# Query the HDB for after state
hdb = get_pykx_connection(client, 
                          environmentId=ENV_ID, clusterName=HDB_CLUSTER_NAME, 
                          userName=KDB_USERNAME, boto_session=session)

# Latest Dates and Counts
after_update_pdf = hdb("select counts:count i by date from example").pd()
after_rows = hdb("count example").py()

### Before

In [14]:
display(before_update_pdf)

# Number of Rows
print(f"Rows: {before_rows:,}")

Unnamed: 0_level_0,counts
date,Unnamed: 1_level_1
2023-04-14,1000000
2023-04-15,1000000
2023-04-16,1000000
2023-04-17,1000000
2023-04-18,1000000
2023-04-19,1000000
2023-04-20,1000000
2023-04-21,1000000
2023-04-22,1000000
2023-04-23,1000000


Rows: 43,061,601


### After

In [15]:
display(after_update_pdf)

# Number of Rows
print(f"Rows: {after_rows:,}")

Unnamed: 0_level_0,counts
date,Unnamed: 1_level_1
2023-04-14,1000000
2023-04-15,1000000
2023-04-16,1000000
2023-04-17,1000000
2023-04-18,1000000
2023-04-19,1000000
2023-04-20,1000000
2023-04-21,1000000
2023-04-22,1000000
2023-04-23,1000000


Rows: 46,320,522


In [16]:
print( f"Last Run: {datetime.datetime.now()}" )

Last Run: 2023-07-26 19:38:29.470993
