{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import datetime\n", "import json\n", "from dateutil.tz import tzlocal\n", "\n", "import get_test_details\n", "import query_experiment_details\n", "import aggregate_statistics\n", "import plot\n", "\n", "stepfunctions = boto3.client('stepfunctions')\n", "cloudwatch_logs = boto3.client('logs')\n", "cloudformation = boto3.client('cloudformation')\n", "cloudwatch = boto3.client('cloudwatch')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Sample test specification. Submit is as the input of the StepFunction workflow that ends with `-main` to start a test.\n", "```\n", "{\n", " \"test_specification\": {\n", " \"parameters\": {\n", " \"cluster_throughput_mb_per_sec\": [ 8, 16, 24, 32, 40, 44, 48, 52, 56 ],\n", " \"num_producers\": [ 6 ],\n", " \"consumer_groups\": [ { \"num_groups\": 2, \"size\": 6 }\n", " ],\n", " \"client_props\": [\n", " {\n", " \"producer\": \"acks=all linger.ms=5 batch.size=65536 buffer.memory=2147483648 security.protocol=PLAINTEXT\",\n", " \"consumer\": \"security.protocol=PLAINTEXT\"\n", " }\n", " ],\n", " \"num_partitions\": [ 36 ],\n", " \"record_size_byte\": [ 1024 ],\n", " \"replication_factor\": [ 3 ],\n", " \"duration_sec\": [ 3600 ]\n", " },\n", " \"skip_remaining_throughput\": {\n", " \"less-than\": [\n", " \"sent_div_requested_mb_per_sec\",\n", " 0.995\n", " ]\n", " },\n", " \"depletion_configuration\": {\n", " \"upper_threshold\": {\n", " \"mb_per_sec\": 200\n", " },\n", " \"approximate_timeout_hours\": 0.5\n", " }\n", " }\n", "}\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "test_params = []" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "test_params.extend([\n", " {'execution_arn': '' }\n", "])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_details = get_test_details.get_test_details(test_params, stepfunctions, cloudformation)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "(producer_stats, consumer_stats) = query_experiment_details.query_cw_logs(test_details, cloudwatch_logs)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "partitions = {\n", " 'ignore_keys': [ 'topic_id', 'cluster_name', 'test_id', 'client_props.consumer', 'cluster_id', 'duration_sec', 'throughput_series_id', 'brokers_type_numeric', ],\n", " 'title_keys': [ 'kafka_version', 'broker_storage', 'provisioned_throughput', 'in_cluster_encryption', 'producer.security.protocol', ],\n", " 'row_keys': [ 'num_producers', 'consumer_groups.num_groups', ],\n", " 'column_keys': [ 'producer.acks', 'producer.batch.size', 'num_partitions', ],\n", " 'metric_color_keys': [ 'brokers_type_numeric', 'brokers', ],\n", "}\n", "\n", "filter_fn = lambda x: True\n", "filter_agg_fn = lambda x: True\n", "\n", "filtered_producer_stats = list(filter(filter_fn, producer_stats))\n", "filtered_consumer_stats = filter(filter_fn, consumer_stats)\n", "\n", "(producer_aggregated_stats, consumer_aggregated_stats, combined_stats) = aggregate_statistics.aggregate_cw_logs(filtered_producer_stats, filtered_consumer_stats, partitions)\n", "filtered_producer_aggregated_stats = list(filter(filter_agg_fn, producer_aggregated_stats))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plot.plot_measurements(filtered_producer_aggregated_stats, ['latency_ms_p50_mean', 'latency_ms_p99_mean', ], 'producer put latency (ms)', **partitions, )\n", "plot.plot_measurements(filtered_producer_aggregated_stats, ['latency_ms_p50_stdev', 'latency_ms_p99_stdev', ], 'producer put latency stdev (ms)', **partitions, )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plot.plot_measurements(producer_aggregated_stats, ['sent_div_requested_mb_per_sec'], 'sent / requested throughput (mb/sec)', **partitions, ymin=0.990, ymax=1.01)\n", "plot.plot_measurements(producer_aggregated_stats, ['actual_duration_div_requested_duration_sec_max'], 'actual test duration/requested test duration (ratio)', **partitions)\n", "plot.plot_measurements(producer_aggregated_stats, ['num_tests'], 'number of tests (count)', **partitions)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.2" } }, "nbformat": 4, "nbformat_minor": 4 }