/* * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using System.Text; using Amazon.Runtime; using Amazon.Runtime.SharedInterfaces; using Amazon.SimpleNotificationService.Model; using Amazon.Auth.AccessControlPolicy; using Amazon.Auth.AccessControlPolicy.ActionIdentifiers; using System.Globalization; namespace Amazon.SimpleNotificationService { public partial class AmazonSimpleNotificationServiceClient { /// /// Subscribes an existing Amazon SQS queue to an existing Amazon SNS topic asynchronously. /// /// The policy applied to the SQS queue is similar to this: /// /// { /// "Version" : "2008-10-17", /// "Statement" : [{ /// "Sid" : "topic-subscription-arn:aws:sns:us-west-2:599109622955:myTopic", /// "Effect" : "Allow", /// "Principal" : "*", /// "Action" : ["sqs:SendMessage"], /// "Resource":["arn:aws:sqs:us-west-2:599109622955:myQueue"], /// "Condition" : { /// "ArnLike":{ /// "aws:SourceArn":["arn:aws:sns:us-west-2:599109622955:myTopic"] /// } /// } /// }] /// } /// /// /// /// There might be a small time period immediately after /// subscribing the SQS queue to the SNS topic and updating the SQS queue's /// policy, where messages are not able to be delivered to the queue. After a /// moment, the new queue policy will propagate and the queue will be able to /// receive messages. This delay only occurs immediately after initially /// subscribing the queue. /// /// /// The topic to subscribe to /// The SQS client used to get attributes and set the policy on the SQS queue. /// The queue to add a subscription to. /// The subscription ARN as returned by Amazon SNS when the queue is /// successfully subscribed to the topic. public async Task SubscribeQueueAsync(string topicArn, ICoreAmazonSQS sqsClient, string sqsQueueUrl) { var topics = await SubscribeQueueToTopicsAsync(new List() { topicArn }, sqsClient, sqsQueueUrl).ConfigureAwait(false); return topics.Values.First(); } /// /// Subscribes an existing Amazon SQS queue to existing Amazon SNS topics asynchronously. /// /// The policy applied to the SQS queue is similar to this: /// /// { /// "Version" : "2008-10-17", /// "Statement" : [{ /// "Sid" : "topic-subscription-arn:aws:sns:us-west-2:599109622955:myTopic", /// "Effect" : "Allow", /// "Principal" : "*", /// "Action" : ["sqs:SendMessage"], /// "Resource":["arn:aws:sqs:us-west-2:599109622955:myQueue"], /// "Condition" : { /// "ArnLike":{ /// "aws:SourceArn":["arn:aws:sns:us-west-2:599109622955:myTopic"] /// } /// } /// }] /// } /// /// /// /// There might be a small time period immediately after /// subscribing the SQS queue to the SNS topic and updating the SQS queue's /// policy, where messages are not able to be delivered to the queue. After a /// moment, the new queue policy will propagate and the queue will be able to /// receive messages. This delay only occurs immediately after initially /// subscribing the queue. /// /// /// The topics to subscribe to /// The SQS client used to get attributes and set the policy on the SQS queue. /// The queue to add a subscription to. /// The mapping of topic ARNs to subscription ARNs as returned by Amazon SNS when the queue is /// successfully subscribed to each topic. public async Task> SubscribeQueueToTopicsAsync(IList topicArns, ICoreAmazonSQS sqsClient, string sqsQueueUrl) { // Get the queue's existing policy and ARN var queueAttributes = await sqsClient.GetAttributesAsync(sqsQueueUrl).ConfigureAwait(false); string sqsQueueArn = queueAttributes["QueueArn"]; Policy policy; string policyStr = null; if(queueAttributes.ContainsKey("Policy")) policyStr = queueAttributes["Policy"]; if (string.IsNullOrEmpty(policyStr)) policy = new Policy(); else policy = Policy.FromJson(policyStr); var subscriptionArns = new Dictionary(); foreach (var topicArn in topicArns) { if (!HasSQSPermission(policy, topicArn, sqsQueueArn)) AddSQSPermission(policy, topicArn, sqsQueueArn); SubscribeResponse response = await this.SubscribeAsync(new SubscribeRequest { TopicArn = topicArn, Protocol = "sqs", Endpoint = sqsQueueArn, }).ConfigureAwait(false); subscriptionArns.Add(topicArn, response.SubscriptionArn); } var setAttributes = new Dictionary { { "Policy", policy.ToJson() } }; await sqsClient.SetAttributesAsync(sqsQueueUrl, setAttributes).ConfigureAwait(false); return subscriptionArns; } /// /// Finds an existing Amazon SNS topic by iterating all SNS topics until a match is found asynchronously. /// /// The ListTopics method is used to fetch upto 100 SNS topics at a time until a SNS topic /// with an TopicArn that matches is found. /// /// /// The name of the topic find /// A Task containing the matched SNS topic. public async Task FindTopicAsync(string topicName) { var nextToken = string.Empty; do { var response = await this.ListTopicsAsync(new ListTopicsRequest { NextToken = nextToken }).ConfigureAwait(false); var matchedTopic = response.Topics.FirstOrDefault(x => TopicNameMatcher(x.TopicArn, topicName)); if (matchedTopic != null) { return matchedTopic; } nextToken = response.NextToken; } while (!string.IsNullOrEmpty(nextToken)); return null; } /// /// This is a utility method which updates the policy of a topic to allow the /// S3 bucket to publish events to it. /// /// The topic that will have its policy updated. /// The bucket that will be given access to publish from. /// /// A Task public async Task AuthorizeS3ToPublishAsync(string topicArn, string bucket) { var attributes = (await this.GetTopicAttributesAsync(new GetTopicAttributesRequest { TopicArn = topicArn }).ConfigureAwait(false)).Attributes; Policy policy; Statement newStatement; GetNewPolicyAndStatementForTopicAttributes(attributes, topicArn, bucket, out policy, out newStatement); if (!policy.CheckIfStatementExists(newStatement)) { policy.Statements.Add(newStatement); var policyString = policy.ToJson(); await this.SetTopicAttributesAsync(new SetTopicAttributesRequest { TopicArn = topicArn, AttributeName = "Policy", AttributeValue = policyString }).ConfigureAwait(false); } } } }