#!/bin/bash # # kfeed - Kafka-related commands for publishing a data feed # # kfeed options # [-c|--create-topic topic] # [-l|--list-topics] # [-u|--update-advertised-listeners] # [-a|--allow user|cert.pem producer|consumer|read|write|describe|all topic] # [-r|--remove user|cert.pem producer|consumer|read|write|describe|all topic] # [-d|--delete-topic topic] # function usage() { echo "kfeed [-c|--create-topic topic] " echo " [-l|--list-topics] " echo " [-u|--update-advertised-listeners] " echo " [-a|--allow user|cert.pem producer|consumer|read|write|describe|all topic] " echo " [-r|--remove user|cert.pem producer|consumer|read|write|describe|all topic] " echo " [-d|--delete-topic topic] " } SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) if [[ $# -lt 1 ]]; then usage exit 1 fi if [ "$TLSBROKERS" = "" ]; then echo "TLSBROKERS env vasriable should be set to your MSK cluster broker string" exit 1 fi if [ "$ZKNODES" = "" ]; then echo "ZKNODES env vasriable should be set to your MSK cluster text Zookeeper string" exit 1 fi if [[ ! -f $HOME/kafka/bin/kafka-topics.sh ]]; then echo "Kafka tools are not installed." exit 1 fi function list_topics() { echo "Topics" echo "------" $HOME/kafka/bin/kafka-topics.sh --list --zookeeper $ZKNODES echo " " echo "ACLs" echo "------" $HOME/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=$ZKNODES --list } function create_topic() { $HOME/kafka/bin/kafka-topics.sh --create --topic $topic --zookeeper $ZKNODES --partitions 1 --replication-factor 2 # Give brokers permissions to read from the topic all_brokers=`echo $TLSBROKERS | cut -d":" -f 1 | sed -e 's/b-[1-9]/*/'` $HOME/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=$ZKNODES --add --allow-principal "User:CN=$all_brokers" --operation Read --group=* --topic $topic } function delete_topic() { $HOME/kafka/bin/kafka-topics.sh --delete --topic $topic --zookeeper $ZKNODES $HOME/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=$ZKNODES --topic $topic --remove } function allow_user() { op_command="--operation " case "$operation" in r|read) op="Read" ;; w|write) op="Write" ;; d|describe) op="DescribeConfigs" ;; a|all) op="All" ;; p|producer) op="" op_command="--producer" ;; c|consumer) op="" op_command="--consumer " ;; *) echo "Operation should be consumer, producer, read, write or all" exit 1 ;; esac if [ "$user" = "" ]; then echo "user name is missing" exit 1 fi if [ "$topic" = "" ]; then echo "topic is missing" exit 1 fi if [ -f "$user" ] && [ `file $user | cut -d' ' -f 2` = "PEM" ] then # Find the Owner fields in the certificate file owner=`keytool -printcert -file $user | grep Owner | head -n 1 | cut -d':' -f 2 | tr -d ' '` user=$owner fi echo "Allow $user $op $topic" group="" if [ "$op" = "Read" ] || [ "$op_command" = "--consumer " ] || [ "$op" = "All" ]; then group="--group=*" fi $HOME/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=$ZKNODES --add --allow-principal "User:$user" \ $op_command $op $group --topic $topic } function remove_user() { op_command="--operation " case "$operation" in r|read) op="Read" ;; w|write) op="Write" ;; a|all) op="All" ;; p|producer) op="" op_command="--producer" ;; c|consumer) op="" op_command="--consumer " ;; *) echo "Operation should be consumer, producer, read, write or all" exit 1 ;; esac if [ "$user" = "" ]; then echo "user name is missing" exit 1 fi if [ "$topic" = "" ]; then echo "topic is missing" exit 1 fi if [ -f "$user" ] && [ `file $user | cut -d' ' -f 2` = "PEM" ] then # Find the Owner fields in the certificate file owner=`keytool -printcert -file $user | grep Owner | head -n 1 | cut -d':' -f 2 | tr -d ' '` user=$owner fi echo "Remove $user $op $topic" group="" if [ "$op" = "Read" ] || [ "$op_command" = "--consumer " ]; then group="--group=*" fi $HOME/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=$ZKNODES --remove --allow-principal "User:$user" \ $op_command $op $group --topic $topic } VALID_ARGS=$(getopt -o a:r:hlc:d:u --long allow:,remove:,help,list-topics,create-topic:,delete-topic:,update-advertised-listeners -- "$@") eval set -- "$VALID_ARGS" while [ : ]; do case "$1" in -l|--list-topics) list_topics exit 0 ;; -c|--create-topic) topic=$2 create_topic exit 0 ;; -d|--delete-topic) topic=$2 delete_topic exit 0 ;; -a|--allow) user=$2 shift operation=$3 topic=$4 allow_user exit 0 ;; -r|--remove) user=$2 shift operation=$3 topic=$4 remove_user exit 0 ;; -h|--help) usage exit 0 ;; -u|--update-advertised-listeners) $HOME/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=$ZKNODES --add --allow-principal "User:*" --cluster # python ${SCRIPT_DIR}/update-adv-listeners.py python3.8 ${SCRIPT_DIR}/update_cluster.py $HOME/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=$ZKNODES --remove --allow-principal "User:*" --cluster exit 0 ;; *) usage exit 1 ;; esac done