In [None]:
import boto3
import datetime
import json
from dateutil.tz import tzlocal

import get_test_details
import query_experiment_details
import aggregate_statistics
import plot

# AWS service clients shared by the cells below. Step Functions and
# CloudFormation are handed to get_test_details, CloudWatch Logs to
# query_cw_logs; the CloudWatch client is created here but not used in the
# cells visible in this notebook.
stepfunctions = boto3.client(service_name='stepfunctions')
cloudwatch_logs = boto3.client(service_name='logs')
cloudformation = boto3.client(service_name='cloudformation')
cloudwatch = boto3.client(service_name='cloudwatch')

Sample test specification. Submit it as the input of the Step Functions workflow whose name ends with `-main` to start a test.
```
{
 "test_specification": {
 "parameters": {
 "cluster_throughput_mb_per_sec": [ 8, 16, 24, 32, 40, 44, 48, 52, 56 ],
 "num_producers": [ 6 ],
 "consumer_groups": [ { "num_groups": 2, "size": 6 }
 ],
 "client_props": [
 {
 "producer": "acks=all linger.ms=5 batch.size=65536 buffer.memory=2147483648 security.protocol=PLAINTEXT",
 "consumer": "security.protocol=PLAINTEXT"
 }
 ],
 "num_partitions": [ 36 ],
 "record_size_byte": [ 1024 ],
 "replication_factor": [ 3 ],
 "duration_sec": [ 3600 ]
 },
 "skip_remaining_throughput": {
 "less-than": [
 "sent_div_requested_mb_per_sec",
 0.995
 ]
 },
 "depletion_configuration": {
 "upper_threshold": {
 "mb_per_sec": 200
 },
 "approximate_timeout_hours": 0.5
 }
 }
}
```

In [None]:
# Collects one dict per test execution to analyze. It starts empty and is
# extended in place by the following cell(s), then passed to
# get_test_details.get_test_details.
test_params = []

In [None]:
# Register the executions to analyze. The empty string is a placeholder --
# presumably it must be replaced with the ARN of the Step Functions execution
# that ran the test (TODO confirm against get_test_details).
# NOTE: this mutates test_params in place, so re-running this cell appends the
# same entries again; re-run the `test_params = []` cell first to start over.
test_params += [
    {'execution_arn': ''},
]

In [None]:
# Resolve the details of every registered execution, using the Step Functions
# and CloudFormation clients created at the top of the notebook.
test_details = get_test_details.get_test_details(
    test_params,
    stepfunctions,
    cloudformation,
)

In [None]:
# Query CloudWatch Logs for the tests described by test_details; the call
# returns a pair that is unpacked into producer-side and consumer-side stats.
producer_stats, consumer_stats = query_experiment_details.query_cw_logs(
    test_details,
    cloudwatch_logs,
)

In [None]:
# Keys that control how the test results are grouped and laid out. The dict is
# passed as a whole to aggregate_statistics.aggregate_cw_logs below and is
# unpacked as keyword arguments (**partitions) into plot.plot_measurements in
# the plotting cells.
partitions = {
    'ignore_keys': [ 'topic_id', 'cluster_name', 'test_id', 'client_props.consumer', 'cluster_id', 'duration_sec', 'throughput_series_id', 'brokers_type_numeric', ],
    'title_keys': [ 'kafka_version', 'broker_storage', 'provisioned_throughput', 'in_cluster_encryption', 'producer.security.protocol', ],
    'row_keys': [ 'num_producers', 'consumer_groups.num_groups', ],
    'column_keys': [ 'producer.acks', 'producer.batch.size', 'num_partitions', ],
    'metric_color_keys': [ 'brokers_type_numeric', 'brokers', ],
}

# Predicates for narrowing down what is aggregated and plotted; the defaults
# keep everything. filter_fn is applied to each raw producer/consumer stat,
# filter_agg_fn to each aggregated producer stat.
filter_fn = lambda x: True
filter_agg_fn = lambda x: True

# Materialize BOTH filtered sequences as lists. A bare filter() object is a
# one-shot iterator: it would appear empty on a second pass (e.g. if the
# aggregation iterates it more than once, or if it is inspected in a later
# cell), so the consumer side is wrapped in list() just like the producer side.
filtered_producer_stats = list(filter(filter_fn, producer_stats))
filtered_consumer_stats = list(filter(filter_fn, consumer_stats))

(producer_aggregated_stats, consumer_aggregated_stats, combined_stats) = aggregate_statistics.aggregate_cw_logs(filtered_producer_stats, filtered_consumer_stats, partitions)
filtered_producer_aggregated_stats = list(filter(filter_agg_fn, producer_aggregated_stats))

In [None]:
# Producer put-latency plots from the filtered aggregated producer stats:
# first the *_mean series, then the matching *_stdev series. The grouping and
# layout keys come from the partitions dict, unpacked as keyword arguments.
plot.plot_measurements(
    filtered_producer_aggregated_stats,
    ['latency_ms_p50_mean', 'latency_ms_p99_mean'],
    'producer put latency (ms)',
    **partitions,
)
plot.plot_measurements(
    filtered_producer_aggregated_stats,
    ['latency_ms_p50_stdev', 'latency_ms_p99_stdev'],
    'producer put latency stdev (ms)',
    **partitions,
)

In [None]:
# Sanity-check plots: sent vs. requested throughput (y-axis limited to
# 0.990-1.01), actual vs. requested test duration, and the number of tests.
# NOTE(review): these use the unfiltered producer_aggregated_stats, whereas the
# latency plots use filtered_producer_aggregated_stats -- presumably on purpose
# so that every test shows up here; confirm.
plot.plot_measurements(
    producer_aggregated_stats,
    ['sent_div_requested_mb_per_sec'],
    'sent / requested throughput (mb/sec)',
    **partitions,
    ymin=0.990,
    ymax=1.01,
)
plot.plot_measurements(
    producer_aggregated_stats,
    ['actual_duration_div_requested_duration_sec_max'],
    'actual test duration/requested test duration (ratio)',
    **partitions,
)
plot.plot_measurements(
    producer_aggregated_stats,
    ['num_tests'],
    'number of tests (count)',
    **partitions,
)