AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Twitter Streams Listener.

Globals:
  Function:
    Timeout: 60
    MemorySize: 128
    Runtime: python3.8

Parameters:
  InstanceType:
    Description: Twitter Listener Instance Type
    Type: String
    Default: t3.medium
    AllowedValues:
      - t3.small
      - t3.medium
    ConstraintDescription: Required instance type for Twitter Listener.
  LatestAmiId:
    Type: 'AWS::SSM::Parameter::Value<AWS::EC2::Image::Id>'
    Default: '/aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2'
    ConstraintDescription: AMI ID for Stream listener.
  KeyName:
    Description: Select an existing EC2 KeyPair to enable SSH access to the instance
    Type: 'AWS::EC2::KeyPair::KeyName'

Resources:
  # Execution role shared by the StreamProcessor and TwitterReplier functions.
  LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: ConnectAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - connect:StartTaskContact
                  - connect:StopContact
                Resource:
                  - '*'
        - PolicyName: StreamAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'kinesis:GetRecords'
                  - 'kinesis:GetShardIterator'
                  - 'kinesis:DescribeStream'
                  - 'kinesis:ListShards'
                  - 'kinesis:ListStreams'
                Resource:
                  - !GetAtt ConnectTwitterStream.Arn
        - PolicyName: ConfigurationAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'secretsmanager:GetSecretValue'
                Resource:
                  - !Ref ConnectTwitterConfig
        - PolicyName: AnalysisTools
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - comprehend:DetectSentiment
                  - comprehend:DetectDominantLanguage
                Resource:
                  - '*'

  # Role assumed by the EC2 instance that runs the stream listener.
  TwitterListenerRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - ec2.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore
        - arn:aws:iam::aws:policy/CloudWatchAgentServerPolicy
      Policies:
        - PolicyName: ConfigurationAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'secretsmanager:GetSecretValue'
                Resource:
                  - !Ref ConnectTwitterConfig
        - PolicyName: Analysis
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - comprehend:DetectSentiment
                  - comprehend:DetectDominantLanguage
                Resource:
                  - '*'
        - PolicyName: KinesisAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:PutRecord
                Resource:
                  - '*'

  TwitterListenerProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      Roles:
        - !Ref TwitterListenerRole

  # Secret holding the Amazon Connect and Twitter settings.
  # Replace the placeholder values after the stack is deployed.
  ConnectTwitterConfig:
    Type: AWS::SecretsManager::Secret
    Properties:
      Name: !Join ['-', ['ConnectTwitter', !Select [4, !Split ['-', !Select [2, !Split ['/', !Ref AWS::StackId]]]]]]
      SecretString: !Sub '{"CONNECT_INSTANCE_ID":"Replace with instance ID data","CONTACT_FLOW_ID":"Replace with contact flow ID data","TWITTER_ACCESS_TOKEN":"Replace with Twitter Access Token","TWITTER_ACCESS_TOKEN_SECRET":"Replace with Twitter Access Token Secret","TWITTER_CONSUMER_KEY":"Replace with Twitter Consumer Key","TWITTER_CONSUMER_SECRET":"Replace with Twitter Consumer Secret","KINESIS_STREAM":"${ConnectTwitterStream}","TWITTER_TOPIC":"Replace with Twitter hashtag of interest","TWITTER_BEARER_TOKEN":"Replace with Twitter Bearer Token"}'
      Tags:
        - Key: AppName
          Value: ConnectTwitterConfig

  ConnectTwitterStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
      Tags:
        - Key: AppName
          Value: ConnectTwitterStream

  StreamProcessor:
    Type: AWS::Serverless::Function
    Properties:
      Role: !GetAtt LambdaRole.Arn
      CodeUri: Stream-Processor/
      Handler: lambda_function.queueTweets
      Environment:
        Variables:
          CONNECT_TWITTER_CONFIG: !Ref ConnectTwitterConfig
      Layers:
        - !Ref TweepyLayer

  StreamProcessorMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      Enabled: True
      EventSourceArn: !GetAtt ConnectTwitterStream.Arn
      FunctionName: !Ref StreamProcessor
      StartingPosition: 'LATEST'

  TweepyLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      ContentUri: twitter-layer/
      CompatibleRuntimes:
        - python3.6
        - python3.7
        - python3.8
        - python3.9
    Metadata:
      BuildMethod: python3.8

  TwitterReplier:
    Type: AWS::Serverless::Function
    Properties:
      Role: !GetAtt LambdaRole.Arn
      CodeUri: Twitter-Replier/
      Handler: lambda_function.lambda_handler
      Environment:
        Variables:
          CONNECT_TWITTER_CONFIG: !Ref ConnectTwitterConfig
      Layers:
        - !Ref TweepyLayer

  # EC2 instance whose UserData writes /usr/local/twitter-streamer.py line by line
  # and registers it as a systemd service.
  TwitterListener:
    Type: 'AWS::EC2::Instance'
    Properties:
      ImageId: !Ref LatestAmiId
      InstanceType: !Ref InstanceType
      IamInstanceProfile: !Ref TwitterListenerProfile
      KeyName: !Ref KeyName
      Tags:
        - Key: Name
          Value: !Join
            - '-'
            - - !Ref 'AWS::StackName'
              - TwitterListener
      UserData: !Base64
        'Fn::Join':
          - ''
          - - |
              #!/bin/bash
            - |
              yum update -y
            - |
              pip3 install tweepy
            - |
              pip3 install boto3
            - |
              cd /usr/local
            - |
              echo "import tweepy" >> twitter-streamer.py
            - |
              echo "import boto3" >> twitter-streamer.py
            - |
              echo "import json" >> twitter-streamer.py
            - |
              echo "import time" >> twitter-streamer.py
            - |
              echo "from botocore.exceptions import ClientError" >> twitter-streamer.py
            - !Sub |
              echo "REGION="\""${AWS::Region}"\""" >> twitter-streamer.py
            - |
              echo " " >> twitter-streamer.py
            - |
              echo "client = boto3.client(service_name='secretsmanager',region_name=REGION)" >> twitter-streamer.py
            - |
              echo "logGroup='/aws/ec2/twitter-listener'" >> twitter-streamer.py
            - |
              echo "try:" >> twitter-streamer.py
            - !Sub |
              echo "    get_secret_value_response = client.get_secret_value(SecretId="\""${ConnectTwitterConfig}"\"")" >> twitter-streamer.py
            - |
              echo " " >> twitter-streamer.py
            - |
              echo "except ClientError as e:" >> twitter-streamer.py
            - |
              echo "    raise e" >> twitter-streamer.py
            - |
              echo "else:" >> twitter-streamer.py
            - |
              echo "    STREAM_CONFIG = json.loads(get_secret_value_response['SecretString'])" >> twitter-streamer.py
            - |
              echo "    BEARER_TOKEN = STREAM_CONFIG['TWITTER_BEARER_TOKEN']" >> twitter-streamer.py
            - |
              echo "    topic = STREAM_CONFIG['TWITTER_TOPIC']" >> twitter-streamer.py
            - |
              echo "    kinesis_stream = STREAM_CONFIG['KINESIS_STREAM']" >> twitter-streamer.py
            - |
              echo "    kinesis_client = boto3.client('kinesis',region_name=REGION)" >> twitter-streamer.py
            - |
              echo "    cw_logs = boto3.client('logs',region_name=REGION)" >> twitter-streamer.py
            - |
              echo "    try:" >> twitter-streamer.py
            - |
              echo "        cw_logs.create_log_group(logGroupName=logGroup)" >> twitter-streamer.py
            - |
              echo "    except ClientError as e:" >> twitter-streamer.py
            - |
              echo "        print('Error creating CW Log group')" >> twitter-streamer.py
            - |
              echo "        print(e)" >> twitter-streamer.py
            - |
              echo "    logStream = 'Stream'+str(int(time.time()*1000))" >> twitter-streamer.py
            - |
              echo "    try:" >> twitter-streamer.py
            - |
              echo "        print(cw_logs.create_log_stream(logGroupName=logGroup,logStreamName=logStream))" >> twitter-streamer.py
            - |
              echo "    except ClientError as e:" >> twitter-streamer.py
            - |
              echo "        print('Error creating CW Log stream')" >> twitter-streamer.py
            - |
              echo "    else:" >> twitter-streamer.py
            - |
              echo "        cw_response = cw_logs.put_log_events(logGroupName=logGroup,logStreamName=logStream,logEvents=[{'timestamp': int(time.time()*1000),'message': 'Starting'}])" >> twitter-streamer.py
            - |
              echo "    class TwitListener(tweepy.StreamingClient):" >> twitter-streamer.py
            - |
              echo "        def on_data(self, data):" >> twitter-streamer.py
            - |
              echo "            jsonData = json.loads(data)" >> twitter-streamer.py
            - |
              echo "            parsedData={'tweet_id': jsonData['data']['id'],'text': jsonData['data']['text'],'lang':jsonData['data']['lang'],'user_id':jsonData['data']['author_id']}" >> twitter-streamer.py
            - |
              echo "            logs=cw_logs.describe_log_streams(logGroupName=logGroup,orderBy='LastEventTime',descending=True,limit=1)" >> twitter-streamer.py
            - |
              echo "            logsToken = logs['logStreams'][0]['uploadSequenceToken']" >> twitter-streamer.py
            - |
              echo "            cw_response = cw_logs.put_log_events(logGroupName=logGroup,logStreamName=logStream,logEvents=[{'timestamp': int(time.time()*1000),'message': json.dumps(parsedData)}],sequenceToken=logsToken)" >> twitter-streamer.py
            - |
              echo "            kin_response=kinesis_client.put_record(StreamName=kinesis_stream,PartitionKey=topic,Data = json.dumps(parsedData))" >> twitter-streamer.py
            - |
              echo "            cw_logs.put_log_events(logGroupName=logGroup,logStreamName=logStream,logEvents=[{'timestamp': int(time.time()*1000),'message': json.dumps(kin_response)}],sequenceToken=logsToken)" >> twitter-streamer.py
            - |
              echo "            time.sleep(10)" >> twitter-streamer.py
            - |
              echo "            return True" >> twitter-streamer.py
            - |
              echo "        def on_error(self, status):" >> twitter-streamer.py
            - |
              echo "            print('Error received:' + str(status))" >> twitter-streamer.py
            - |
              echo "            logs=cw_logs.describe_log_streams(logGroupName=logGroup,orderBy='LastEventTime',descending=True,limit=1)" >> twitter-streamer.py
            - |
              echo "            logsToken = logs['logStreams'][0]['uploadSequenceToken']" >> twitter-streamer.py
            - |
              echo "            cw_response = cw_logs.put_log_events(logGroupName=logGroup,logStreamName=logStream,logEvents=[{'timestamp': int(time.time()*1000),'message': str(status)}],sequenceToken=logsToken)" >> twitter-streamer.py
            - |
              echo "            return True" >> twitter-streamer.py
            - |
              echo "        def on_limit(self, track):" >> twitter-streamer.py
            - |
              echo "            logs=cw_logs.describe_log_streams(logGroupName=logGroup,orderBy='LastEventTime',descending=True,limit=1)" >> twitter-streamer.py
            - |
              echo "            logsToken = logs['logStreams'][0]['uploadSequenceToken']" >> twitter-streamer.py
            - |
              echo "            cw_response = cw_logs.put_log_events(logGroupName=logGroup,logStreamName=logStream,logEvents=[{'timestamp': int(time.time()*1000),'message': str(track)}],sequenceToken=logsToken)" >> twitter-streamer.py
            - |
              echo "            time.sleep(60)" >> twitter-streamer.py
            - |
              echo "            return True" >> twitter-streamer.py
            - |
              echo "        def on_disconnect_message(self, message):" >> twitter-streamer.py
            - |
              echo "            logs=cw_logs.describe_log_streams(logGroupName=logGroup,orderBy='LastEventTime',descending=True,limit=1)" >> twitter-streamer.py
            - |
              echo "            logsToken = logs['logStreams'][0]['uploadSequenceToken']" >> twitter-streamer.py
            - |
              echo "            cw_response = cw_logs.put_log_events(logGroupName=logGroup,logStreamName=logStream,logEvents=[{'timestamp': int(time.time()*1000),'message': str(message)}],sequenceToken=logsToken)" >> twitter-streamer.py
            - |
              echo "            return True" >> twitter-streamer.py
            - |
              echo "    feedStream = TwitListener(bearer_token=BEARER_TOKEN)" >> twitter-streamer.py
            - |
              echo "    feedStream.add_rules(tweepy.StreamRule(topic))" >> twitter-streamer.py
            - |
              echo "    feedStream.filter(tweet_fields=['attachments','author_id','created_at','entities','geo','id','in_reply_to_user_id','lang','possibly_sensitive','source','text'])" >> twitter-streamer.py
            - |
              echo "[Unit]" > /lib/systemd/system/twitter-streamer.service
            - |
              echo "Description=Twitter Streams Listener" >> /lib/systemd/system/twitter-streamer.service
            - |
              echo "After=multi-user.target" >> /lib/systemd/system/twitter-streamer.service
            - |
              echo " " >> /lib/systemd/system/twitter-streamer.service
            - |
              echo "[Service]" >> /lib/systemd/system/twitter-streamer.service
            - |
              echo "Type=idle" >> /lib/systemd/system/twitter-streamer.service
            - |
              echo "ExecStart=/usr/bin/python3 /usr/local/twitter-streamer.py" >> /lib/systemd/system/twitter-streamer.service
            - |
              echo "Restart=on-failure" >> /lib/systemd/system/twitter-streamer.service
            - |
              echo " " >> /lib/systemd/system/twitter-streamer.service
            - |
              echo "[Install]" >> /lib/systemd/system/twitter-streamer.service
            - |
              echo "WantedBy=multi-user.target" >> /lib/systemd/system/twitter-streamer.service
            - |
              sudo chmod 644 /lib/systemd/system/twitter-streamer.service
            - |
              sudo systemctl daemon-reload
            - |
              sudo systemctl enable twitter-streamer.service