{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "The configuration parameters below tune the Spark session for this data set. Run this cell first: applying it refreshes the kernel." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "execution": { "iopub.execute_input": "2023-05-31T12:12:45.165641Z", "iopub.status.busy": "2023-05-31T12:12:45.164895Z", "iopub.status.idle": "2023-05-31T12:13:04.693194Z", "shell.execute_reply": "2023-05-31T12:13:04.693062Z", "shell.execute_reply.started": "2023-05-31 12:12:45.174688+00:00" }, "tags": [] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[I 2023-05-31 12:12:45,174.174 configure_magic] Magic cell payload received: {\"conf\": {\"spark.pyspark.python\": \"python3\", \"spark.pyspark.virtualenv.enabled\": \"true\", \"spark.kubernetes.executor.node.selector.node-lifecycle\": \"spot\", \"spark.pyspark.virtualenv.type\": \"native\", \"spark.pyspark.virtualenv.bin.path\": \"/usr/bin/virtualenv\", \"spark.sql.files.ignoreCorruptFiles\": \"true\", \"spark.dynamicAllocation.executorIdleTimeout\": \"18000\", \"spark.driver.memory\": \"32g\", \"spark.driver.cores\": \"32\", \"spark.driver.maxResultSize\": \"24g\", \"spark.executor.memory\": \"32g\", \"spark.network.timeout\": \"300\", \"spark.executor.cores\": \"6\", \"spark.yarn.executor.Overhead\": \"12g\", \"spark.dynamicAllocation.maxExecutors\": \"500\", \"livy.server.session.timeout\": \"24h\", \"spark.sql.shuffle.partitions\": \"15000\"}, \"proxyUser\": \"assumed-role_fdp_blitvin-Isengard\"}\n", "\n", "[I 2023-05-31 12:12:45,174.174 configure_magic] Sending request to update kernel. Please wait while the kernel will be refreshed.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "The kernel is successfully refreshed." 
] } ], "source": [ "%%configure -f\n", "{ \"conf\":{\n", " \"spark.pyspark.python\": \"python3\"\n", " ,\"spark.pyspark.virtualenv.enabled\": \"true\"\n", " ,\"spark.kubernetes.executor.node.selector.node-lifecycle\":\"spot\"\n", " ,\"spark.pyspark.virtualenv.type\":\"native\"\n", " ,\"spark.pyspark.virtualenv.bin.path\":\"/usr/bin/virtualenv\"\n", " ,\"spark.sql.files.ignoreCorruptFiles\":\"true\"\n", " ,\"spark.dynamicAllocation.executorIdleTimeout\":\"18000\"\n", " ,\"spark.driver.memory\":\"32g\"\n", " ,\"spark.driver.cores\":\"32\"\n", " ,\"spark.driver.maxResultSize\":\"24g\"\n", " ,\"spark.executor.memory\":\"32g\"\n", " ,\"spark.network.timeout\":\"300\"\n", " ,\"spark.executor.cores\":\"6\"\n", " ,\"spark.yarn.executor.Overhead\":\"12g\"\n", " ,\"spark.dynamicAllocation.maxExecutors\":\"500\"\n", " ,\"livy.server.session.timeout\":\"24h\"\n", " ,\"spark.sql.shuffle.partitions\":\"15000\"\n", " }\n", "} " ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "execution": { "iopub.execute_input": "2023-05-31T12:14:57.493175Z", "iopub.status.busy": "2023-05-31T12:14:57.492575Z", "iopub.status.idle": "2023-05-31T12:14:57.498422Z", "shell.execute_reply": "2023-05-31T12:14:57.497211Z", "shell.execute_reply.started": "2023-05-31T12:14:57.493139Z" }, "tags": [] }, "outputs": [], "source": [ "# ,\"spark.sql.adaptive.enabled\":\"true\"\n", "# ,\"spark.sql.adaptive.coalescePartitions.enabled\":\"true\"\n", "# ,\"spark.sql.adaptive.coalescePartitions.initialPartitionNum\":\"1000\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "https://towardsdatascience.com/basics-of-apache-spark-configuration-settings-ca4faff40d45\n", "https://luminousmen.com/post/spark-tips-partition-tuning\n", "https://sparkbyexamples.com/pyspark/pyspark-repartition-vs-partitionby/" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "execution": { "iopub.execute_input": "2023-05-31T12:14:58.566941Z", "iopub.status.busy": "2023-05-31T12:14:58.566364Z", 
"iopub.status.idle": "2023-05-31T12:15:00.902252Z", "shell.execute_reply": "2023-05-31T12:15:00.901081Z", "shell.execute_reply.started": "2023-05-31T12:14:58.566906Z" }, "tags": [] }, "outputs": [], "source": [ "import pandas as pd\n", "import pyarrow\n", "import s3fs\n", "import fsspec\n", "import matplotlib\n", "import matplotlib.pyplot as plt\n", "import pyspark.sql.functions as py_f\n", "from pyspark.sql.window import Window" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "execution": { "iopub.execute_input": "2023-05-31T12:15:00.904718Z", "iopub.status.busy": "2023-05-31T12:15:00.904173Z", "iopub.status.idle": "2023-05-31T12:15:01.069293Z", "shell.execute_reply": "2023-05-31T12:15:01.067974Z", "shell.execute_reply.started": "2023-05-31T12:15:00.904682Z" }, "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "3.3.0-amzn-1 \n", "\n", "\n", "('spark.kubernetes.executor.pod.allowlistFile', '/etc/spark/conf/executor-pod-template-pod-allowlist.txt')\n", "('spark.eventLog.enabled', 'true')\n", "('spark.eventLog.dir', 'file:///var/log/spark/apps')\n", "('spark.kubernetes.memoryOverheadFactor', '0.4')\n", "('spark.kubernetes.executor.podTemplateContainerName', 'spark-kubernetes-executor')\n", "('spark.dynamicAllocation.maxExecutors', '500')\n", "('spark.kubernetes.driverEnv.HTTP2_DISABLE', 'true')\n", "('spark.sql.parquet.output.committer.class', 'com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter')\n", "('spark.driver.cores', '32')\n", "('spark.blacklist.decommissioning.timeout', '1h')\n", "('spark.kubernetes.driver.node.selector.node-lifecycle', 'on-demand')\n", "('spark.kubernetes.driver.label.kernel_id', 'fe646132-3324-45e3-a0b1-486d99cb4b6a')\n", "('spark.hadoop.dynamodb.customAWSCredentialsProvider', 'com.amazonaws.auth.WebIdentityTokenCredentialsProvider')\n", "('spark.kubernetes.driver.container.allowlistFile', '/etc/spark/conf/driver-pod-template-container-allowlist.txt')\n", 
"('spark.sql.emr.internal.extensions', 'com.amazonaws.emr.spark.EmrSparkSessionExtensions')\n", "('spark.dynamicAllocation.executorAllocationRatio', '1')\n", "('spark.kubernetes.driver.podTemplateContainerName', 'spark-kubernetes-driver')\n", "('spark.kubernetes.namespace', 'adxuseast1emr')\n", "('spark.history.fs.logDirectory', 'file:///var/log/spark/apps')\n", "('spark.kubernetes.executor.selector.node.role', 'notebook')\n", "('spark.kubernetes.executor.node.selector.spark-role', 'executor')\n", "('spark.yarn.heterogeneousExecutors.enabled', 'false')\n", "('spark.driver.extraLibraryPath', '/etc/hadoop/conf:/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native')\n", "('spark.pyspark.python', 'python3')\n", "('spark.kubernetes.container.image.pullPolicy', 'Always')\n", "('spark.kubernetes.submitInDriver', 'true')\n", "('spark.kubernetes.executor.podNamePrefix', 'kfe646132-3324-45e3-a0b1-486d99cb4b6a-660ebf8871b9dedf')\n", "('spark.kubernetes.driver.node.selector.spark-role', 'driver')\n", "('spark.driver.defaultJavaOptions', \"-XX:OnOutOfMemoryError='kill -9 %p' -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70\")\n", "('spark.executor.defaultJavaOptions', \"-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70 -XX:OnOutOfMemoryError='kill -9 %p'\")\n", "('spark.executor.extraClassPath', 
'/etc/hadoop/conf:/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar:/docker/usr/lib/hadoop-lzo/lib/*:/docker/usr/lib/hadoop/hadoop-aws.jar:/docker/usr/share/aws/aws-java-sdk/*:/docker/usr/share/aws/emr/emrfs/conf:/docker/usr/share/aws/emr/emrfs/lib/*:/docker/usr/share/aws/emr/emrfs/auxlib/*:/docker/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/docker/usr/share/aws/emr/security/conf:/docker/usr/share/aws/emr/security/lib/*:/docker/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/docker/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/docker/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/docker/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar')\n", "('spark.driver.extraJavaOptions', \"-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED 
-XX:OnOutOfMemoryError='kill -9 %p' -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70\")\n", "('spark.driver.bindAddress', '10.0.119.242')\n", "('spark.app.startTime', '1685535185487')\n", "('spark.executor.id', 'driver')\n", "('spark.kubernetes.driver.podTemplateValidation.enabled', 'true')\n", "('spark.hadoop.hive.metastore.client.factory.class', 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory')\n", "('spark.kubernetes.allocation.batch.size', '5')\n", "('spark.driver.port', '7078')\n", "('spark.executor.memory', '32g')\n", "('spark.executorEnv.LOG_CONTEXT_WITH_PATH_SEPARATOR', '')\n", "('spark.decommissioning.timeout.threshold', '20')\n", "('spark.sql.catalogImplementation', 'hive')\n", "('spark.stage.attempt.ignoreOnDecommissionFetchFailure', 'true')\n", "('spark.kubernetes.driver.container.image', '614393260192.dkr.ecr.us-east-1.amazonaws.com/cdk-hnb659fds-container-assets-614393260192-us-east-1:eaafe31dc2e70d39e0b2871e0e6af73ceb184a3978386368425caf36de957b6a')\n", "('spark.pyspark.virtualenv.enabled', 'true')\n", "('spark.kubernetes.driver.pod.allowlistFile', '/etc/spark/conf/driver-pod-template-pod-allowlist.txt')\n", "('spark.kubernetes.driver.label.component', 'kernel')\n", "('spark.dynamicAllocation.shuffleTracking.timeout', '300s')\n", "('spark.dynamicAllocation.executorIdleTimeout', '18000')\n", "('spark.sql.files.ignoreCorruptFiles', 'true')\n", "('spark.kubernetes.executor.label.app', 'enterprise-gateway')\n", "('spark.app.name', 'kfe646132-3324-45e3-a0b1-486d99cb4b6a')\n", "('spark.pyspark.virtualenv.bin.path', '/usr/bin/virtualenv')\n", "('spark.kubernetes.client.dependency.propagation', 'false')\n", "('spark.yarn.executor.Overhead', '12g')\n", "('spark.hadoop.fs.s3.getObject.initialSocketTimeoutMilliseconds', '2000')\n", "('spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem', '2')\n", "('spark.authenticate', 'true')\n", "('spark.shuffle.service.enabled', 'false')\n", 
"('spark.kubernetes.executor.podTemplateFile', '/opt/spark/pod-template/pod-spec-template.yml')\n", "('spark.kubernetes.driver.request.cores', '0.5')\n", "('spark.driver.memory', '32g')\n", "('spark.kubernetes.authenticate.driver.serviceAccountName', 'emr-containers-sa-spark-jeg-kernel-614393260192-1290uyahhbialm60icp0xrrxth1kcn9j6a76dmo6zat8qk68y99ok5m2qt')\n", "('spark.driver.host', 'spark-40308c8871b9b8db-driver-svc.adxuseast1emr.svc')\n", "('spark.pyspark.virtualenv.type', 'native')\n", "('spark.kubernetes.executor.podTemplateValidation.enabled', 'true')\n", "('spark.kubernetes.pyspark.pythonVersion', '3')\n", "('spark.hadoop.fs.defaultFS', 'file:///')\n", "('spark.app.submitTime', '1685535182271')\n", "('spark.kubernetes.driver.pod.name', 'kfe646132-3324-45e3-a0b1-486d99cb4b6a-4f55c78871b9aee2-driver')\n", "('spark.kubernetes.executor.node.selector.node-lifecycle', 'spot')\n", "('spark.serializer.objectStreamReset', '100')\n", "('spark.app.id', 'spark-41c56dec16b348eab9dd44ea60a8981a')\n", "('spark.kubernetes.submission.waitAppCompletion', 'false')\n", "('spark.kubernetes.executor.label.emr-containers.amazonaws.com/kernel-type', 'PySpark')\n", "('spark.submit.deployMode', 'client')\n", "('spark.master', 'k8s://https://172.20.0.1:443')\n", "('spark.kubernetes.executor.container.allowlistFile', '/etc/spark/conf/executor-pod-template-container-allowlist.txt')\n", "('spark.sql.parquet.fs.optimized.committer.optimization-enabled', 'true')\n", "('spark.dynamicAllocation.shuffleTracking.enabled', 'true')\n", "('spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored.emr_internal_use_only.EmrFileSystem', 'true')\n", "('spark.kubernetes.executor.label.component', 'worker')\n", "('spark.driver.maxResultSize', '24g')\n", "('spark.network.timeout', '300')\n", "('spark.history.ui.port', '18080')\n", "('spark.driver.blockManager.port', '7079')\n", "('spark.hadoop.fs.s3.customAWSCredentialsProvider', 'com.amazonaws.auth.WebIdentityTokenCredentialsProvider')\n", 
"('spark.kubernetes.driver.selector.node.role', 'notebook')\n", "('spark.kubernetes.executor.container.image', '614393260192.dkr.ecr.us-east-1.amazonaws.com/cdk-hnb659fds-container-assets-614393260192-us-east-1:eaafe31dc2e70d39e0b2871e0e6af73ceb184a3978386368425caf36de957b6a')\n", "('spark.executor.extraJavaOptions', \"-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70 -XX:OnOutOfMemoryError='kill -9 %p'\")\n", "('spark.driver.extraClassPath', 
'/etc/hadoop/conf:/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar:/docker/usr/lib/hadoop-lzo/lib/*:/docker/usr/lib/hadoop/hadoop-aws.jar:/docker/usr/share/aws/aws-java-sdk/*:/docker/usr/share/aws/emr/emrfs/conf:/docker/usr/share/aws/emr/emrfs/lib/*:/docker/usr/share/aws/emr/emrfs/auxlib/*:/docker/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/docker/usr/share/aws/emr/security/conf:/docker/usr/share/aws/emr/security/lib/*:/docker/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/docker/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/docker/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/docker/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar')\n", "('spark.resourceManager.cleanupExpiredHost', 'true')\n", "('spark.kubernetes.executor.label.kernel_id', 'fe646132-3324-45e3-a0b1-486d99cb4b6a')\n", "('spark.kubernetes.driver.label.app', 'enterprise-gateway')\n", "('spark.files.fetchFailure.unRegisterOutputOnHost', 'true')\n", "('spark.kubernetes.driver.label.emr-containers.amazonaws.com/kernel-type', 'PySpark')\n", "('spark.executor.extraLibraryPath', '/etc/hadoop/conf:/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native')\n", "('spark.eventLog.logBlockUpdates.enabled', 'true')\n", "('spark.kubernetes.resource.type', 'python')\n", "('spark.kubernetes.authenticate.executor.serviceAccountName', 
'emr-containers-sa-spark-jeg-kernel-614393260192-1290uyahhbialm60icp0xrrxth1kcn9j6a76dmo6zat8qk68y99ok5m2qt')\n", "('spark.executor.cores', '6')\n", "('spark.rdd.compress', 'True')\n", "('spark.sql.shuffle.partitions', '15000')\n", "('spark.dynamicAllocation.minExecutors', '0')\n", "('spark.submit.pyFiles', '')\n", "('spark.dynamicAllocation.enabled', 'true')\n", "('spark.kubernetes.executor.request.cores', '3.5')\n", "('spark.blacklist.decommissioning.enabled', 'true')\n" ] } ], "source": [ "print(spark.version,\"\\n\\n\")\n", "configurations = spark.sparkContext.getConf().getAll()\n", "for conf in configurations:\n", " print(conf)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We analyzed the data published over the SIP feeds (CTA: CQS/CTS and UTP: UQDF/UTDF) and the depth-of-book feeds of the 16 lit venues under Regulation NMS to assess the impact of the proposed reforms on market quality. \n", "\n", "Definitions \n", "Midpoint = (bid + ask) / 2 \n", "Spread = ask - bid \n", "Quoted spread = (ask - bid) / midpoint \n", "\n", "Results: \n", "1)\tNumber of current odd-lot trades, and total number of trades, in each price bucket. \n", " a)\tGraphical view of the concentration \n", " b)\tLook for a rationale and see if it is ... (TODO: unfinished note) \n", "2)\tAverage round-lot and odd-lot quoted spreads in each price bucket \n", " a)\tMatrix \n", " b)\tHeat map \n", " c)\tBy hour of the day? \n", "3)\tEffect on market data: \n", " a)\tAnticipated increase in market-data (MD) volumes: message counts, etc.\n", " b)\tNumber of direct-feed updates where the top of book is an odd lot\n", "4)\tCase study around the AMZN stock split: \n", " a)\tRound-lot spreads for AMZN per exchange while it was high-priced, before the split\n", " b)\tOdd-lot spreads for AMZN per exchange after the split. 
\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "\n" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "execution": { "iopub.execute_input": "2023-05-31T12:15:09.747449Z", "iopub.status.busy": "2023-05-31T12:15:09.746925Z", "iopub.status.idle": "2023-05-31T12:27:11.842970Z", "shell.execute_reply": "2023-05-31T12:27:11.841813Z", "shell.execute_reply.started": "2023-05-31T12:15:09.747414Z" }, "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "s3://maystreetdata/feeds_norm/partition_scheme_experiments_7/mstnorm_parquet_0_5_0\n", "mt_oddlot_prepped\n", "mt_roundlot_bbo_prepped\n", "mt_roundlot_nbbo_prepped\n", "mt_oddlot\n", "mt_roundlot_bbo\n", "mt_roundlot_nbbo\n" ] } ], "source": [ "class MtRoundLot():\n", " def __init__(self,part_experiment_id, is_debug,debug_symbol='AMZN'):\n", " self.s3_dir_root=\"s3://maystreetdata/feeds_norm/mstnorm_parquet_0_5_0\"\n", " self.s3_dir_root_prepped=\"s3://maystreetdata/feeds_norm/partition_scheme_experiments_7/mstnorm_parquet_0_5_0/\"\n", " self.s3_dir_partition_experiments =f\"s3://maystreetdata/feeds_norm/partition_scheme_experiments_{part_experiment_id}/mstnorm_parquet_0_5_0\"\n", " self.s3_dir_root_results =f\"s3://maystreetdata/analysis/\"\n", " self.tables={\"mt_roundlot_bbo\":{\"tables\":[f\"{self.s3_dir_root}/mt=bbo_quote/\"]},\n", " \"mt_roundlot_nbbo\":{\"tables\":[f\"{self.s3_dir_root}/mt=nbbo_quote/\"]},\n", " \"mt_oddlot\":{\"tables\":[f\"{self.s3_dir_root}/mt=aggregated_price_update/\"]},\n", " \"mt_trade\":{\"tables\":[f\"{self.s3_dir_root}/mt=trade/\"]}\n", " }\n", " self.tables_prepped={\"mt_roundlot_bbo\":{\"tables\":[f\"{self.s3_dir_root_prepped}/mt_roundlot_bbo.parquet\"]},\n", " \"mt_roundlot_nbbo\":{\"tables\":[f\"{self.s3_dir_root_prepped}/mt_roundlot_nbbo.parquet\"]},\n", " \"mt_oddlot\":{\"tables\":[f\"{self.s3_dir_root_prepped}/mt_oddlot.parquet\"]}\n", " }\n", " self.raw_df ={}\n", " 
self.raw_df_prepped ={}\n", " self.stats_df ={}\n", " self.data_validation ={}\n", " self.column_map={}\n", " self.joined_df={}\n", " self.column_map['mt_oddlot']={ \n", " 'ask':'AskPrice_1'\n", " ,'bid':'BidPrice_1'\n", " ,'timestamp':'LastExchangeTimestamp'\n", " ,'seq_number':'LastSequenceNumber'\n", " ,'BidQuantity':'BidQuantity_1'\n", " ,'AskQuantity':'AskQuantity_1'\n", " ,'partition_by':['Product','FeedType',\"Feed\",\"dt\",'f','MarketParticipant','is_trading_hours','hour_est']\n", " }\n", " self.column_map['mt_roundlot_bbo']={\n", " 'ask':'AskPrice'\n", " ,'bid':'BidPrice'\n", " ,'timestamp':'ExchangeTimestamp'\n", " ,'seq_number':'SequenceNumber'\n", " ,'BidQuantity':'BidQuantity'\n", " ,'AskQuantity':'AskQuantity'\n", " ,'partition_by':['Product','FeedType',\"Feed\",\"dt\",'f','is_trading_hours','hour_est']\n", " }\n", " self.column_map['mt_roundlot_nbbo']={\n", " 'ask':'AskPrice'\n", " ,'bid':'BidPrice'\n", " ,'timestamp':'ExchangeTimestamp'\n", " ,'seq_number':'SequenceNumber'\n", " ,'BidQuantity':'BidQuantity'\n", " ,'AskQuantity':'AskQuantity'\n", " ,'partition_by':['Product','FeedType',\"Feed\",\"dt\",'f','is_trading_hours','hour_est']\n", " }\n", " self.round_factor=0.333333333333\n", " self.is_debug=is_debug\n", " self.debug_symbol=debug_symbol\n", " print(self.s3_dir_partition_experiments)\n", " def set_data_prepped(self,data_label):\n", " col_map=self.column_map.get(data_label)\n", " data_files = self.tables_prepped.get(data_label).get('tables')\n", " data_df=None\n", " for one_file in data_files:\n", " if self.is_debug:\n", " one_data_df=spark.read.parquet(one_file).where(f\"Product=='{self.debug_symbol}'\")\n", " else:\n", " one_data_df=spark.read.parquet(one_file)\n", " if data_df is None:\n", " data_df=one_data_df\n", " else:\n", " data_df=data_df.union(one_data_df)\n", " self.raw_df_prepped[f\"{data_label}\"]=data_df\n", " def set_data(self,data_label, is_raw=False):\n", " col_map=self.column_map.get(data_label)\n", " data_files = 
self.tables.get(data_label).get('tables')\n", " data_df=None\n", " for one_file in data_files:\n", " feed_filters = self.tables.get(data_label).get('feeds',None)\n", " path_parts= one_file.split(\"/\")\n", " feed_type=path_parts[len(path_parts)-2:len(path_parts)-1][0]\n", " if feed_filters is not None:\n", " filter_string='\"'+'\",\"'.join(feed_filters)+'\"'\n", " one_data_df = spark.read.parquet(one_file).filter(f'Feed in ({filter_string})') \n", " else:\n", " one_data_df = spark.read.parquet(one_file)\n", " if self.is_debug:\n", " one_data_df=one_data_df.where(f\"Product=='{self.debug_symbol}'\")\n", " if 'f' not in one_data_df.columns:\n", " one_data_df = one_data_df.withColumn('f', py_f.col(\"Feed\"))\n", " if data_df is None:\n", " data_df=one_data_df\n", " else:\n", " data_df=data_df.union(one_data_df)\n", " data_df = data_df.withColumn('FeedType', py_f.lit(feed_type))\\\n", " .select('FeedType','Feed','f','Product',col_map['bid'],col_map['ask'],col_map['timestamp'],col_map['BidQuantity'],col_map['AskQuantity'])\\\n", " .groupBy('FeedType','Feed','f','Product',col_map['timestamp']).agg(\n", " py_f.round(py_f.max(col_map['bid']),3).alias(f'best_bid_{data_label}')\n", " ,py_f.round(py_f.min(col_map['ask']),3).alias(f'best_ask_{data_label}')\n", " ,py_f.round(py_f.max(col_map['BidQuantity']),3).alias(f'bid_quantity_{data_label}')\n", " ,py_f.round(py_f.max(col_map['AskQuantity']),3).alias(f'ask_quantity_{data_label}')\n", " ).withColumnRenamed('FeedType', f'FeedType_{data_label}')\\\n", " .withColumnRenamed('Feed', f'Feed_{data_label}')\\\n", " .withColumnRenamed(col_map['timestamp'], f'exchange_timestamp_{data_label}')\\\n", " .withColumnRenamed('f', f'f_{data_label}')\\\n", " .withColumn(f\"mid_{data_label}\",(py_f.col(f'best_ask_{data_label}')+py_f.col(f'best_bid_{data_label}'))/py_f.lit(2))\\\n", " .withColumn(f\"bid_ask_{data_label}\",(py_f.col(f'best_ask_{data_label}')-py_f.col(f'best_bid_{data_label}'))/py_f.col(f\"mid_{data_label}\")) \\\n", " 
.withColumn(f'timestamp_ts_utc_{data_label}',py_f.from_unixtime(py_f.col(f'exchange_timestamp_{data_label}')/1000/1000/1000))\\\n", " .withColumn(f'timestamp_ts_est_{data_label}',py_f.from_utc_timestamp((py_f.from_unixtime(py_f.col(f'exchange_timestamp_{data_label}')/1000/1000/1000)),'America/New_York'))\\\n", " .withColumn(f'time_ts_est_{data_label}',py_f.date_format(f'timestamp_ts_est_{data_label}', 'HH:mm:ss'))\\\n", " .withColumn(f'hour_est_{data_label}',py_f.date_format(f'timestamp_ts_est_{data_label}', 'HH'))\\\n", " .withColumn(f'date_est_{data_label}',py_f.date_format(f'timestamp_ts_est_{data_label}', 'yyyy-MM-dd'))\\\n", " .withColumn(f'is_trading_hours', ((py_f.col(f'timestamp_ts_est_{data_label}')>=py_f.lit('09:30:00'))&(py_f.col(f'timestamp_ts_est_{data_label}')<=py_f.lit('15:59:00'))))\n", " \n", " part_by = [f'FeedType_{data_label}',\"Product\"]\n", " self.raw_df[f\"{data_label}\"]=data_df.repartition(15000,*part_by)\n", " def dv_universe(self):\n", " dv_key='universe_check'\n", " self.data_validation[dv_key]={}\n", " for one_key in self.raw_df.keys():\n", " one_df = self.raw_df.get(one_key)\n", " col_name = f\"{one_key}_ticker_count\"\n", " curr_count = one_df.agg(py_f.countDistinct(\"Product\").alias(col_name)).collect()\n", " curr_count = [i.__getitem__(col_name) for i in curr_count][0]\n", " self.data_validation[dv_key][one_key]=curr_count\n", " \n", " def dv_ts_unique(self):\n", " dv_key='ts_unique_check'\n", " self.data_validation[dv_key]={}\n", " for one_key in self.raw_df.keys():\n", " one_df = self.raw_df.get(one_key)\n", " col_map=self.column_map.get(one_key)\n", " ts_field=col_map.get('timestamp')\n", " seq_field=col_map.get('seq_number')\n", " count_alias,countDistinct_alias = f'count_{ts_field}',f'countDistinct_{ts_field}'\n", " uniq_ts_check = one_df.groupBy(col_map.get('partition_by')).agg(\n", " py_f.count(ts_field).alias(count_alias),py_f.countDistinct(ts_field,seq_field).alias(countDistinct_alias)\n", " 
).where(f'{count_alias}>{countDistinct_alias}').count()\n", " self.data_validation[dv_key][one_key]=uniq_ts_check\n", " def set_volume_ptile(self):\n", " by_prod_feed=self.raw_df_prepped[\"mt_roundlot_nbbo\"].groupBy('Product').count().orderBy('Product')\n", " by_prod_feed=by_prod_feed.select(\"Product\",'count', \n", " py_f.round((py_f.floor(py_f.percent_rank().over( Window.partitionBy().orderBy(by_prod_feed['count']))/py_f.lit(self.round_factor))*py_f.lit(self.round_factor)),1).alias(\"update_count_pctrank\"))\n", " by_prod_feed=by_prod_feed.withColumn('volume_level',py_f.when(py_f.col('update_count_pctrank')==0.0,'low')\\\n", " .otherwise(py_f.when(py_f.col('update_count_pctrank')==0.3,'moderate').otherwise('high'))).cache()\n", " #by_prod_feed.groupBy('update_count_pctrank').count()\n", " self.volume_rank_df = by_prod_feed\n", "\n", " def set_common_universe(self):\n", " bbo_nbbo_cols = ['Product','Feed','dt','f','bidask_spread_timew_avg','data_count','is_trading_hours','hour_est']\n", " df1=self.stats_df['mt_roundlot_bbo_stats_agg'].select(bbo_nbbo_cols)\\\n", " .withColumnRenamed('bidask_spread_timew_avg',f'bidask_spread_timew_avg_bbo_roundlot').withColumnRenamed('data_count',f'data_count_bbo_roundlot')\n", " df2=self.stats_df['mt_roundlot_nbbo_stats_agg'].select(bbo_nbbo_cols)\\\n", " .withColumnRenamed('bidask_spread_timew_avg',f'bidask_spread_timew_avg_nbbo_roundlot').withColumnRenamed('data_count',f'data_count_nbbo_roundlot')\n", " temp_df = df1.join(df2\n", " ,(df1.Product==df2.Product)\n", " & (df1.Feed==df2.Feed)\n", " & (df1.dt==df2.dt)\n", " & (df1.f==df2.f)\n", " & (df1.is_trading_hours==df2.is_trading_hours)\n", " & (df1.hour_est==df2.hour_est)\n", " ).drop(df2.Product).drop(df2.Feed).drop(df2.dt).drop(df2.f).drop(df2.is_trading_hours).drop(df2.hour_est)\n", "\n", " odd_lot_cols = ['Product','dt','bidask_spread_timew_avg','FeedType','Feed','f','data_count','is_trading_hours','hour_est']\n", " df3 = 
self.stats_df['mt_oddlot_stats_agg'].select(odd_lot_cols)\\\n", " .withColumnRenamed('bidask_spread_timew_avg',f'bidask_spread_timew_avg_oddlot')\\\n", " .withColumnRenamed('data_count',f'data_count_oddlot')\\\n", " .withColumnRenamed('Feed',f'Feed_oddlot')\\\n", " .withColumnRenamed('f',f'f_oddlot')\n", " final_df=temp_df.join(df3\n", " ,(temp_df.Product==df3.Product)\n", " & (temp_df.dt==df3.dt)\n", " & (temp_df.is_trading_hours==df3.is_trading_hours)\n", " & (temp_df.hour_est==df3.hour_est)\n", " ).drop(df3.Product).drop(df3.dt).drop(temp_df.is_trading_hours).drop(temp_df.hour_est)\n", " volume_rank_df = self.volume_rank_df.select('Product','update_count_pctrank')\n", " final_df=final_df.join(volume_rank_df\n", " ,(volume_rank_df.Product==final_df.Product)).drop(volume_rank_df.Product)\n", " self.stats_df['all_by_symbol_feed_date']=final_df.cache()\n", " \n", " def calc_trade_stats(self):\n", " trades_df= spark.read.parquet('s3://maystreetdata/feeds_norm/mstnorm_parquet_0_5_0/mt=trade/')\n", " trades_df=trades_df.withColumn('is_odd_lot',py_f.when(py_f.col(\"Quantity\")=py_f.lit('09:30:00')) & (py_f.col('time_est')<=py_f.lit('15:59:00'))))\\\n", " .withColumn('avg_price', (py_f.col(f'best_bid_{dl1}')\n", " +py_f.col(f'best_ask_{dl1}')\n", " +py_f.col(f'best_bid_{dl2}')\n", " +py_f.col(f'best_ask_{dl2}'))/4)\\\n", " .withColumn('price_bucket', py_f.when((py_f.col('avg_price')>=0) & (py_f.col('avg_price')<250),'00000_00250').otherwise(\\\n", " py_f.when((py_f.col('avg_price')>=250) & (py_f.col('avg_price')<1000),'00250_01000').otherwise(\\\n", " py_f.when((py_f.col('avg_price')>=1000) & (py_f.col('avg_price')<10000),'01000_10000').otherwise('10000_99999'))\n", " ))\n", " \n", " prev_window = Window.partitionBy(*[part_cols]).orderBy(py_f.col('exchange_timestamp'))\n", " joined_df = joined_df.withColumn(\"prev_exchange_timestamp\", py_f.lag(py_f.col('exchange_timestamp')).over(prev_window))\n", " joined_df = 
joined_df.withColumn(\"diff_exchange_timestamp\",joined_df.exchange_timestamp-joined_df.prev_exchange_timestamp)\n", " joined_df = joined_df.withColumn(f\"bidask_timeweight_{dl1}\",joined_df[f'bid_ask_{dl1}']*joined_df.diff_exchange_timestamp) \n", " joined_df = joined_df.withColumn(f\"bidask_timeweight_{dl2}\",joined_df[f'bid_ask_{dl2}']*joined_df.diff_exchange_timestamp) \n", " #df_vol_rank=self.volume_rank_df.drop('update_count_pctrank').drop('count')\n", " #joined_df = df_vol_rank.join(py_f.broadcast(joined_df),(df_vol_rank.Product==joined_df.Product)).drop(df_vol_rank.Product)\n", " self.stats_df[f'joined_df_{dl1}_{dl2}']=joined_df\n", " joined_df_stats_by_symbol=joined_df.groupBy('Product',f'Feed_{dl2}',f'f_{dl2}','is_trading_hours','hour_est','price_bucket').agg(\n", " py_f.sum(py_f.col('diff_exchange_timestamp')).alias('diff_exchange_timestamp_sum')\n", " ,py_f.sum(py_f.col(f'bidask_timeweight_{dl1}')).alias(f'bidask_timeweight_{dl1}_sum')\n", " ,py_f.sum(py_f.col(f'bidask_timeweight_{dl2}')).alias(f'bidask_timeweight_{dl2}_sum')\n", " ).withColumn(f'bid_ask_tw_{dl1}',py_f.col(f'bidask_timeweight_{dl1}_sum')/py_f.col('diff_exchange_timestamp_sum'))\\\n", " .withColumn(f'bid_ask_tw_{dl2}',py_f.col(f'bidask_timeweight_{dl2}_sum')/py_f.col('diff_exchange_timestamp_sum'))\\\n", " .orderBy('Product',f'Feed_{dl2}',f'f_{dl2}','is_trading_hours').cache()\n", " self.stats_df[f'joined_df_stats_by_symbol_{dl1}_{dl2}']=joined_df_stats_by_symbol\n", " \n", " joined_df_stats_by_trading_hour=joined_df_stats_by_symbol.groupBy(f'Feed_{dl2}',f'f_{dl2}','is_trading_hours','hour_est')\\\n", " .agg(\n", " py_f.mean(py_f.col(f'bid_ask_tw_{dl1}'))\n", " ,py_f.mean(py_f.col(f'bid_ask_tw_{dl2}'))\n", " ,py_f.count(py_f.col(f'bid_ask_tw_{dl1}'))\n", " ,py_f.count(py_f.col(f'bid_ask_tw_{dl2}'))\n", " )\n", " self.stats_df[f'joined_df_stats_by_trading_hour_{dl1}_{dl2}']=joined_df_stats_by_trading_hour\n", " \n", " 
self.stats_df[f'2_spread_by_price_bucket_{dl1}_{dl2}']=self.stats_df[f'joined_df_stats_by_symbol_{dl1}_{dl2}']\\\n", " .where(\"is_trading_hours==True\").groupBy('price_bucket')\\\n", " .agg(py_f.mean(f'bid_ask_tw_{dl1}')\\\n", " .alias(f'spread_mean_{dl1}'),py_f.mean(f'bid_ask_tw_{dl2}').alias(f'spread_mean_{dl2}'))\n", " \n",
    "    def _with_odd_lot_flag(self, roundlot_bbo, max_odd_lot_qty=100):\n",
    "        \"\"\"Return roundlot_bbo with an 'is_odd_lot' column set to 'odd lot' or 'round lot'.\n",
    "\n",
    "        A row is 'odd lot' when either its bid or its ask quantity is <= max_odd_lot_qty.\n",
    "        NOTE(review): with the default of 100, a side quoting exactly 100 counts as 'odd lot';\n",
    "        confirm this threshold is intended.\n",
    "        \"\"\"\n",
    "        bid_qty = py_f.col('bid_quantity_mt_roundlot_bbo')\n",
    "        ask_qty = py_f.col('ask_quantity_mt_roundlot_bbo')\n",
    "        return roundlot_bbo.withColumn(\n",
    "            'is_odd_lot',\n",
    "            py_f.when((bid_qty <= max_odd_lot_qty) | (ask_qty <= max_odd_lot_qty), 'odd lot').otherwise('round lot'))\n",
    "\n",
    "    def add_price_bucket_bbo(self):\n",
    "        \"\"\"Add 'avg_price' (mid of best bid/ask) and 'price_bucket' to raw_df['mt_roundlot_bbo'].\n",
    "\n",
    "        Buckets: [0, 250) -> '00000_00250', [250, 1000) -> '00250_01000',\n",
    "        [1000, 10000) -> '01000_10000', >= 10000 -> '10000_99999'.\n",
    "        Rows whose mid price is null or negative get a null bucket instead of\n",
    "        silently falling into '10000_99999'.\n",
    "        \"\"\"\n",
    "        avg_price = py_f.col('avg_price')\n",
    "        roundlot_bbo = self.raw_df.get('mt_roundlot_bbo')\\\n",
    "            .withColumn('avg_price', (py_f.col('best_bid_mt_roundlot_bbo') + py_f.col('best_ask_mt_roundlot_bbo')) / 2)\\\n",
    "            .withColumn('price_bucket',\n",
    "                        py_f.when((avg_price >= 0) & (avg_price < 250), '00000_00250')\n",
    "                        .when((avg_price >= 250) & (avg_price < 1000), '00250_01000')\n",
    "                        .when((avg_price >= 1000) & (avg_price < 10000), '01000_10000')\n",
    "                        .when(avg_price >= 10000, '10000_99999'))\n",
    "        self.raw_df['mt_roundlot_bbo'] = roundlot_bbo\n",
    "\n",
    "    def calc_AMZN_before_after_split(self):\n",
    "        \"\"\"Daily odd-lot vs. round-lot BBO message counts for AMZN around its June 2022 split.\n",
    "\n",
    "        Stores a pandas pivot (index date_est_mt_roundlot_bbo; columns 'odd lot', 'round lot',\n",
    "        'total', 'oddlot_pct') in stats_df['3A_AMZN_split'].\n",
    "        \"\"\"\n",
    "        start_date = '2022-06-01'\n",
    "        end_date = '2022-06-08'  # window brackets the 2022-06-06 split date\n",
    "        amzn_bbo = self.raw_df.get('mt_roundlot_bbo')\\\n",
    "            .where(\"Product=='AMZN'\")\\\n",
    "            .where(f\"date_est_mt_roundlot_bbo >= '{start_date}'\")\\\n",
    "            .where(f\"date_est_mt_roundlot_bbo <= '{end_date}'\")\n",
    "        amzn_bbo = self._with_odd_lot_flag(amzn_bbo)\n",
    "        roundlot_stats_pd = amzn_bbo.groupBy('price_bucket', 'is_odd_lot', 'date_est_mt_roundlot_bbo')\\\n",
    "            .agg(py_f.count('exchange_timestamp_mt_roundlot_bbo').alias(\"message_count\")).toPandas()\n",
    "        # A day's rows can fall into more than one price bucket, so per-day counts must be\n",
    "        # summed; pivot_table's default aggfunc='mean' would average them across buckets.\n",
    "        roundlot_stats_pivot = pd.pivot_table(roundlot_stats_pd, values=\"message_count\", columns=[\"is_odd_lot\"],\n",
    "                                              index=[\"date_est_mt_roundlot_bbo\"], aggfunc=\"sum\")\\\n",
    "            .reindex(columns=['odd lot', 'round lot'])\n",
    "        roundlot_stats_pivot['total'] = roundlot_stats_pivot.sum(axis=1)\n",
    "        roundlot_stats_pivot['oddlot_pct'] = roundlot_stats_pivot['odd lot'] / roundlot_stats_pivot['total']\n",
    "        self.stats_df[\"3A_AMZN_split\"] = roundlot_stats_pivot\n",
    "\n",
    "    def calc_oddlot_percent_per_price_bucket(self):\n",
    "        \"\"\"Share of mt_roundlot_bbo messages flagged 'odd lot', per price bucket.\n",
    "\n",
    "        Groups on 'price_bucket', which add_price_bucket_bbo() adds. Stores a pandas pivot\n",
    "        (index price_bucket; columns 'odd lot', 'round lot', 'total', 'odd lot pct') in\n",
    "        stats_df['3_pct_trading_oddlot_per_price_bucket'].\n",
    "        \"\"\"\n",
    "        roundlot_bbo = self._with_odd_lot_flag(self.raw_df.get('mt_roundlot_bbo'))\n",
    "        roundlot_stats_pd = roundlot_bbo.groupBy('price_bucket', 'is_odd_lot')\\\n",
    "            .agg(py_f.count('is_odd_lot').alias('message_count')).toPandas()\n",
    "        odd_vs_roundlot_pct = pd.pivot_table(roundlot_stats_pd, values=\"message_count\", index=\"price_bucket\",\n",
    "                                             columns=\"is_odd_lot\", aggfunc=\"sum\")\\\n",
    "            .reindex(columns=['odd lot', 'round lot'])\n",
    "        odd_vs_roundlot_pct['total'] = odd_vs_roundlot_pct.sum(axis=1)\n",
    "        odd_vs_roundlot_pct['odd lot pct'] = odd_vs_roundlot_pct['odd lot'] / odd_vs_roundlot_pct['total']\n",
    "        self.stats_df[\"3_pct_trading_oddlot_per_price_bucket\"] = odd_vs_roundlot_pct\n",
    "\n",
    "    def calc_timew_spread(self, data_label):\n",
    "        \"\"\"Time-weighted relative bid/ask spread for the dataset `data_label`.\n",
    "\n",
    "        bid_ask = (ask - bid) / bid is weighted by how long each update stayed in force, i.e.\n",
    "        the gap to the next update of the same partition (col_map['partition_by']) ordered by\n",
    "        timestamp, sequence number, then widest spread first. Stores the per-row frame, the\n",
    "        per-partition aggregate and the overall mean/median in stats_df.\n",
    "        \"\"\"\n",
    "        col_map = self.column_map.get(data_label)\n",
    "        ts_col = py_f.col(col_map.get('timestamp'))\n",
    "        # assumes the timestamp column is epoch nanoseconds (divided by 1e9 here) -- TODO confirm\n",
    "        ts_seconds = ts_col / 1000 / 1000 / 1000\n",
    "        # NOTE(review): from_unixtime renders in the Spark session time zone, so treating its\n",
    "        # output as UTC below is only right when spark.sql.session.timeZone is UTC -- confirm.\n",
    "        # NOTE(review): '<= 15:59:00' excludes 15:59:01-15:59:59; confirm vs. a '< 16:00:00' close.\n",
    "        l_df = self.raw_df.get(data_label)\\\n",
    "            .withColumn('timestamp_ts_utc', py_f.from_unixtime(ts_seconds))\\\n",
    "            .withColumn('timestamp_ts_est', py_f.from_utc_timestamp(py_f.from_unixtime(ts_seconds), 'America/New_York'))\\\n",
    "            .withColumn('time_est', py_f.date_format('timestamp_ts_est', 'HH:mm:ss'))\\\n",
    "            .withColumn('hour_est', py_f.date_format('timestamp_ts_est', 'HH'))\\\n",
    "            .withColumn('is_trading_hours', (py_f.col('time_est') >= py_f.lit('09:30:00')) & (py_f.col('time_est') <= py_f.lit('15:59:00')))\n",
    "        l_df = l_df.withColumn(\"bid_ask\", (py_f.col(col_map.get('ask')) - py_f.col(col_map.get('bid'))) / py_f.col(col_map.get('bid')))\n",
    "        prev_window = Window.partitionBy(*col_map.get('partition_by'))\\\n",
    "            .orderBy(ts_col, py_f.col(col_map.get('seq_number')), l_df.bid_ask.desc())\n",
    "        l_df = l_df.withColumn(\"next_LastReceiptTimestamp\", py_f.lead(ts_col).over(prev_window))\n",
    "        # Time this update stayed in force: next update minus this one. The window sorts by\n",
    "        # timestamp ascending, so the duration is >= 0 and time_sum is a real total duration.\n",
    "        l_df = l_df.withColumn(\"diff_LastReceiptTimestamp\", l_df.next_LastReceiptTimestamp - ts_col)\n",
    "        l_df = l_df.withColumn(\"bidask_timeweight\", l_df.bid_ask * l_df.diff_LastReceiptTimestamp)\n",
    "        bid_ask_agg = l_df.where('diff_LastReceiptTimestamp is not null and bid_ask<100')\\\n",
    "            .groupby(*col_map.get('partition_by'))\\\n",
    "            .agg(py_f.sum('diff_LastReceiptTimestamp').alias('time_sum'),\n",
    "                 py_f.sum('bidask_timeweight').alias('bidask_timeweight_sum'),\n",
    "                 py_f.count(py_f.lit(1)).alias('data_count'))\n",
    "        bid_ask_agg = bid_ask_agg.withColumn(\"bidask_spread_timew_avg\", bid_ask_agg.bidask_timeweight_sum / bid_ask_agg.time_sum)\n",
    "        self.stats_df[f\"{data_label}_stats_intermediate\"] = l_df\n",
    "        self.stats_df[f\"{data_label}_stats_agg\"] = bid_ask_agg\n",
    "        self.stats_df[f\"{data_label}_stats_agg_final\"] = bid_ask_agg.agg(\n",
    "            py_f.mean(bid_ask_agg.bidask_spread_timew_avg).alias('bidask_mean_timew'),\n",
    "            py_f.expr('percentile(bidask_spread_timew_avg, array(0.5))').alias('bidask_median_timew'),\n",
    "            py_f.sum(bid_ask_agg.data_count).alias('data_count'))\n",
    "\n",
    "\n",
    "def main(exp_label, is_debug, debug_symbol='AMZN'):\n",
    "    \"\"\"Build every statistic used in the findings below and return the populated MtRoundLot.\"\"\"\n",
    "    mt_roundlot = MtRoundLot(exp_label, is_debug, debug_symbol)\n",
    "    data_labels = (\"mt_oddlot\", \"mt_roundlot_bbo\", \"mt_roundlot_nbbo\")\n",
    "    for data_label in data_labels:\n",
    "        print(f'{data_label}_prepped')\n",
    "        mt_roundlot.set_data_prepped(data_label)\n",
    "    for data_label in data_labels:\n",
    "        print(data_label)\n",
    "        mt_roundlot.set_data(data_label)\n",
    "    roundlot_labels = data_labels[1:]\n",
    "    for roundlot_label in roundlot_labels:\n",
    "        mt_roundlot.join_dfs('mt_oddlot', roundlot_label)\n",
    "    for roundlot_label in roundlot_labels:\n",
    "        mt_roundlot.calc_timew_spread_paired(\"mt_oddlot\", roundlot_label)\n",
    "    # the odd-lot statistics below group by the price_bucket column this adds\n",
    "    mt_roundlot.add_price_bucket_bbo()\n",
    "    mt_roundlot.calc_oddlot_percent_per_price_bucket()\n",
    "    mt_roundlot.calc_AMZN_before_after_split()\n",
    "    mt_roundlot.calc_trade_stats()\n",
    "    return mt_roundlot\n",
    "\n",
    "\n",
    "mt_roundlot = main(7, True)"
   ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's inspect the data first" ] }, { "cell_type": "code", "execution_count": 55, "metadata": { "execution": { "iopub.execute_input": "2023-05-31T14:31:51.892397Z", "iopub.status.busy": "2023-05-31T14:31:51.891638Z", "iopub.status.idle": "2023-05-31T14:31:51.897451Z", "shell.execute_reply": "2023-05-31T14:31:51.896412Z", "shell.execute_reply.started": "2023-05-31T14:31:51.892360Z" }, "tags": [] }, "outputs": [], "source": [ "captions=[\"1.Odd-lot trading in 2022 continues to be a major component of US trading\"\n", " ,\"2.Odd-lot vs. Round-lot quoted spread\"\n", " ,\"3.Higher share prices increase odd-lot trading\"\n", " ,\"3a.AMZN pre/post split on June-6-2022\"\n", " ]" ] }, { "cell_type": "code", "execution_count": 24, "metadata": { "execution": { "iopub.execute_input": "2023-05-31T12:46:35.567663Z", "iopub.status.busy": "2023-05-31T12:46:35.566909Z", "iopub.status.idle": "2023-05-31T12:46:35.578798Z", "shell.execute_reply": "2023-05-31T12:46:35.577688Z", "shell.execute_reply.started": "2023-05-31T12:46:35.567626Z" }, "tags": [] }, "outputs": [ { "data": { "text/html": [ "\n", "\n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n", "
Experimentation findings: 1.Odd-lot trading in 2022 continues to be a major component of US trading
 Oddlot shares trading(%)Oddlot $ volume(%)Oddlot trade count(%)
label   
all US equity securities 8% 17% 57%
all US equity on-exchange 4% 9% 53%
all US equity off-exchange (dark pool) 89% 78% 41%
\n" ], "text/plain": [ "" ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [
    "# Finding 1: odd-lot share of shares traded, dollar volume and trade count (values are already in percent).\n",
    "(\n",
    "    mt_roundlot.stats_df[\"1_display_all\"]\n",
    "    .style\n",
    "    .set_caption(f\"Experimentation findings: {captions[0]}\")\n",
    "    .format({column: \"{:20,.0f}%\"\n",
    "             for column in (\"Oddlot shares trading(%)\", \"Oddlot $ volume(%)\", \"Oddlot trade count(%)\")})\n",
    "    .set_table_styles([{'selector': 'caption', 'props': [('color', 'blue'), ('font-size', '14px')]}])\n",
    ")"
   ] }, { "cell_type": "code", "execution_count": 35, "metadata": { "execution": { "iopub.execute_input": "2023-05-31T12:59:09.479812Z", "iopub.status.busy": "2023-05-31T12:59:09.479089Z", "iopub.status.idle": "2023-05-31T12:59:09.493795Z", "shell.execute_reply": "2023-05-31T12:59:09.492717Z", "shell.execute_reply.started": "2023-05-31T12:59:09.479776Z" }, "tags": [] }, "outputs": [ { "data": { "text/html": [ "\n", "\n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n", "
Experimentation findings: 2.Odd-lot vs. Round-lot quoted spread
price_bucketspread_mean_mt_oddlotspread_mean_mt_roundlot_bboprice_improvement
00000_002500.53%1.43%168.06%
00250_01000386.34%445.19%15.23%
01000_100000.90%3.01%235.73%
10000_999991.48%0.00%-99.99%
\n" ], "text/plain": [ "" ] }, "execution_count": 35, "metadata": {}, "output_type": "execute_result" } ], "source": [
    "# Finding 2: mean time-weighted odd-lot vs. round-lot BBO spread per price bucket (trading hours only).\n",
    "# price_improvement = round-lot spread / odd-lot spread - 1, i.e. how much wider the round-lot spread is.\n",
    "# The frame is loaded here so the cell does not depend on leftover kernel state.\n",
    "oddlot_bbo = (\n",
    "    mt_roundlot.stats_df['2_spread_by_price_bucket_mt_oddlot_mt_roundlot_bbo']\n",
    "    .toPandas()\n",
    "    .sort_values(\"price_bucket\")\n",
    "    .assign(price_improvement=lambda spreads: spreads[\"spread_mean_mt_roundlot_bbo\"] / spreads[\"spread_mean_mt_oddlot\"] - 1)\n",
    ")\n",
    "oddlot_bbo.style.set_caption(f\"Experimentation findings: {captions[1]}\").format(\n",
    "    {\"spread_mean_mt_oddlot\": \"{:,.2%}\",\n",
    "     \"spread_mean_mt_roundlot_bbo\": \"{:,.2%}\",\n",
    "     \"price_improvement\": \"{:,.2%}\"\n",
    "     }).set_table_styles([{\n",
    "    'selector': 'caption',\n",
    "    'props': [\n",
    "        ('color', 'blue'),\n",
    "        ('font-size', '14px')\n",
    "    ]\n",
    "}]).hide_index()"
   ] }, { "cell_type": "code", "execution_count": 52, "metadata": { "execution": { "iopub.execute_input": "2023-05-31T14:27:38.151127Z", "iopub.status.busy": "2023-05-31T14:27:38.150468Z", "iopub.status.idle": "2023-05-31T14:27:38.323265Z", "shell.execute_reply": "2023-05-31T14:27:38.322218Z", "shell.execute_reply.started": "2023-05-31T14:27:38.151091Z" }, "tags": [] }, "outputs": [ { "data": { "text/html": [ "\n", "\n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", "
Experimentation findings: 3.Higher share prices increase odd-lot trading
is_odd_lotodd lotround lottotalodd lot pct
price_bucket    
00000_0025048,908,368.0012,000,912.0060,909,280.0080.30%
00250_01000695.00nan695.00100.00%
01000_100004,083,944.0099,250.004,183,194.0097.63%
10000_9999995,030.0078,764.00173,794.0054.68%
\n" ], "text/plain": [ "" ] }, "execution_count": 52, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [
    "# Finding 3: share of BBO messages that are odd lot, per price bucket.\n",
    "oddlot_price_per_bucket = mt_roundlot.stats_df['3_pct_trading_oddlot_per_price_bucket'].sort_index()\n",
    "\n",
    "oddlot_price_per_bucket['odd lot pct'].plot.bar(xlabel='price bucket', ylabel='odd lot share', title=captions[2])\n",
    "# message counts are whole numbers, so they are shown without decimals\n",
    "oddlot_price_per_bucket.style.set_caption(f\"Experimentation findings: {captions[2]}\").format(\n",
    "    {\"round lot\": \"{:,.0f}\",\n",
    "     \"odd lot\": \"{:,.0f}\",\n",
    "     \"total\": \"{:,.0f}\",\n",
    "     \"odd lot pct\": \"{:,.2%}\"\n",
    "     }).set_table_styles([{\n",
    "    'selector': 'caption',\n",
    "    'props': [\n",
    "        ('color', 'blue'),\n",
    "        ('font-size', '14px')\n",
    "    ]\n",
    "}])"
   ] }, { "cell_type": "code", "execution_count": 72, "metadata": { "execution": { "iopub.execute_input": "2023-05-31T14:46:57.420669Z", "iopub.status.busy": "2023-05-31T14:46:57.420056Z", "iopub.status.idle": "2023-05-31T14:46:57.672465Z", "shell.execute_reply": "2023-05-31T14:46:57.671353Z", "shell.execute_reply.started": "2023-05-31T14:46:57.420633Z" }, "tags": [] }, "outputs": [ { "data": { "text/html": [ "\n", "\n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", " \n",
    " \n", " \n", " \n", " \n", "
Experimentation findings: 3a.AMZN pre/post split on June-6-2022
is_odd_lotodd lotround lottotaloddlot_pctroundlot_pct
date_est_mt_roundlot_bbo     
2022-06-01795,461.0027,818.50823,279.5096.62%3.38%
2022-06-02710,242.5025,398.50735,641.0096.55%3.45%
2022-06-03575,635.5017,762.00593,397.5097.01%2.99%
2022-06-06771,919.00357,590.001,129,509.0068.34%31.66%
2022-06-07819,418.50210,796.001,030,214.5079.54%20.46%
2022-06-08718,777.00174,090.00892,867.0080.50%19.50%
\n" ], "text/plain": [ "" ] }, "execution_count": 72, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [
    "# Finding 3a: daily odd-lot vs. round-lot share of AMZN BBO messages around the 2022-06-06 split.\n",
    "# assign() returns a new frame, so the table stored in mt_roundlot.stats_df is left unmodified.\n",
    "AMZN_pre_post_split = mt_roundlot.stats_df['3A_AMZN_split'].assign(\n",
    "    roundlot_pct=lambda split_stats: 1 - split_stats[\"oddlot_pct\"])\n",
    "(AMZN_pre_post_split[['oddlot_pct', 'roundlot_pct']] * 100).plot(\n",
    "    kind=\"bar\", ylabel='Percent(%)', stacked=True, figsize=(7, 3), title=captions[3])\n",
    "# message counts are whole numbers, so they are shown without decimals\n",
    "AMZN_pre_post_split.style.set_caption(f\"Experimentation findings: {captions[3]}\").format(\n",
    "    {\"round lot\": \"{:,.0f}\",\n",
    "     \"odd lot\": \"{:,.0f}\",\n",
    "     \"total\": \"{:,.0f}\",\n",
    "     \"oddlot_pct\": \"{:,.2%}\",\n",
    "     \"roundlot_pct\": \"{:,.2%}\"\n",
    "     }).set_table_styles([{\n",
    "    'selector': 'caption',\n",
    "    'props': [\n",
    "        ('color', 'blue'),\n",
    "        ('font-size', '14px')\n",
    "    ]\n",
    "}])"
   ] } ], "metadata": { "kernelspec": { "display_name": "PySpark (Kubernetes)", "language": "python", "name": "spark_python_kubernetes" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.16" } }, "nbformat": 4, "nbformat_minor": 4 }