#!/bin/bash # # modify below params before you go ################################################# REGION='your-aws-region' BUCKET='your-ouput-parquet-bucket' TOPPATH='your-local-bin-dir' # # script tunables # every 60s start one new thread max_threads=1 # max threads each time interval=60 # loop check interval 60s ################################################## # get system CPUs cpus=`nproc --all` function run { export S3LOGS_STAGGING_PARTITION_TZIF=UTC+8 export S3LOGS_TRANSFORM_JOB_INTERVAL=300 export S3LOGS_TRANSFORM_AGGREGATE_SECOND=300 export S3LOGS_TRANSFORM_OUTPUT_PREFIX_FMT=year=%Y/month=%m/day=%d export S3LOGS_TRANSFORM_OUTPUT_TARGET_PREFIX=s3logs dt=`date '+%F %H:%M:%S'` echo "$dt - run with s3logs transform with $1 threads" $TOPPATH/s3logs transform -r $REGION -b $BUCKET -t $1 >> $TOPPATH/transform.log 2>&1 & } function calc_running { total=0 pids=`pgrep -f ".*s3logs transform.*"` for pid in $pids; do thr=`ps --no-headers -L $pid | wc -l` # minus 1 for main thread let total=total+thr-1 done if [ $total -lt 0 ]; then return 0 else return $total fi } function main_loop { echo "$cpus of CPUs available in this system, max threads for transformer $max_threads" while true; do available=0 calc_running running=$? let available=$cpus-$running if [ $available -gt 0 ]; then if [ $available -ge $max_threads ]; then run $max_threads else run $available fi else dt=`date '+%F %H:%M:%S'` echo "$dt - no cpu core available, skip this cycle" fi sleep $interval done } main_loop