/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Text;
using Amazon.Runtime;
using Amazon.Runtime.SharedInterfaces;
using Amazon.SimpleNotificationService.Model;
using Amazon.Auth.AccessControlPolicy;
using Amazon.Auth.AccessControlPolicy.ActionIdentifiers;
using System.Globalization;
namespace Amazon.SimpleNotificationService
{
public partial class AmazonSimpleNotificationServiceClient
{
/// <summary>
/// Subscribes an existing Amazon SQS queue to an existing Amazon SNS topic asynchronously.
/// <para>
/// The policy applied to the SQS queue is similar to this:
/// <code>
/// {
///     "Version" : "2008-10-17",
///     "Statement" : [{
///         "Sid" : "topic-subscription-arn:aws:sns:us-west-2:599109622955:myTopic",
///         "Effect" : "Allow",
///         "Principal" : "*",
///         "Action" : ["sqs:SendMessage"],
///         "Resource":["arn:aws:sqs:us-west-2:599109622955:myQueue"],
///         "Condition" : {
///             "ArnLike":{
///                 "aws:SourceArn":["arn:aws:sns:us-west-2:599109622955:myTopic"]
///             }
///         }
///     }]
/// }
/// </code>
/// </para>
/// <para>
/// There might be a small time period immediately after
/// subscribing the SQS queue to the SNS topic and updating the SQS queue's
/// policy, where messages are not able to be delivered to the queue. After a
/// moment, the new queue policy will propagate and the queue will be able to
/// receive messages. This delay only occurs immediately after initially
/// subscribing the queue.
/// </para>
/// </summary>
/// <param name="topicArn">The topic to subscribe to.</param>
/// <param name="sqsClient">The SQS client used to get attributes and set the policy on the SQS queue.</param>
/// <param name="sqsQueueUrl">The queue to add a subscription to.</param>
/// <returns>The subscription ARN as returned by Amazon SNS when the queue is
/// successfully subscribed to the topic.</returns>
public async Task<string> SubscribeQueueAsync(string topicArn, ICoreAmazonSQS sqsClient, string sqsQueueUrl)
{
    // Delegate to the multi-topic overload with a single-element list; the
    // resulting map therefore holds exactly one subscription ARN.
    var topics = await SubscribeQueueToTopicsAsync(new List<string> { topicArn }, sqsClient, sqsQueueUrl).ConfigureAwait(false);
    return topics.Values.First();
}
/// <summary>
/// Subscribes an existing Amazon SQS queue to existing Amazon SNS topics asynchronously.
/// <para>
/// The policy applied to the SQS queue is similar to this:
/// <code>
/// {
///     "Version" : "2008-10-17",
///     "Statement" : [{
///         "Sid" : "topic-subscription-arn:aws:sns:us-west-2:599109622955:myTopic",
///         "Effect" : "Allow",
///         "Principal" : "*",
///         "Action" : ["sqs:SendMessage"],
///         "Resource":["arn:aws:sqs:us-west-2:599109622955:myQueue"],
///         "Condition" : {
///             "ArnLike":{
///                 "aws:SourceArn":["arn:aws:sns:us-west-2:599109622955:myTopic"]
///             }
///         }
///     }]
/// }
/// </code>
/// </para>
/// <para>
/// There might be a small time period immediately after
/// subscribing the SQS queue to the SNS topic and updating the SQS queue's
/// policy, where messages are not able to be delivered to the queue. After a
/// moment, the new queue policy will propagate and the queue will be able to
/// receive messages. This delay only occurs immediately after initially
/// subscribing the queue.
/// </para>
/// <para>
/// A topic ARN that appears more than once in <paramref name="topicArns"/> is
/// subscribed only once.
/// </para>
/// </summary>
/// <param name="topicArns">The topics to subscribe to.</param>
/// <param name="sqsClient">The SQS client used to get attributes and set the policy on the SQS queue.</param>
/// <param name="sqsQueueUrl">The queue to add a subscription to.</param>
/// <returns>The mapping of topic ARNs to subscription ARNs as returned by Amazon SNS when the queue is
/// successfully subscribed to each topic.</returns>
public async Task<IDictionary<string, string>> SubscribeQueueToTopicsAsync(IList<string> topicArns, ICoreAmazonSQS sqsClient, string sqsQueueUrl)
{
    // Get the queue's existing policy and ARN
    var queueAttributes = await sqsClient.GetAttributesAsync(sqsQueueUrl).ConfigureAwait(false);
    string sqsQueueArn = queueAttributes["QueueArn"];

    // Start from the queue's current policy so its existing statements are kept;
    // a missing or empty "Policy" attribute yields a fresh, empty policy.
    string policyStr;
    queueAttributes.TryGetValue("Policy", out policyStr);
    Policy policy = string.IsNullOrEmpty(policyStr) ? new Policy() : Policy.FromJson(policyStr);

    var subscriptionArns = new Dictionary<string, string>();
    foreach (var topicArn in topicArns)
    {
        // Skip repeated topic ARNs: the topic is already subscribed and recorded,
        // and adding the same key again would throw.
        if (subscriptionArns.ContainsKey(topicArn))
        {
            continue;
        }

        if (!HasSQSPermission(policy, topicArn, sqsQueueArn))
        {
            AddSQSPermission(policy, topicArn, sqsQueueArn);
        }

        SubscribeResponse response = await this.SubscribeAsync(new SubscribeRequest
        {
            TopicArn = topicArn,
            Protocol = "sqs",
            Endpoint = sqsQueueArn,
        }).ConfigureAwait(false);

        subscriptionArns.Add(topicArn, response.SubscriptionArn);
    }

    // Write the accumulated policy back to the queue in a single call once
    // every topic has been processed.
    var setAttributes = new Dictionary<string, string> { { "Policy", policy.ToJson() } };
    await sqsClient.SetAttributesAsync(sqsQueueUrl, setAttributes).ConfigureAwait(false);

    return subscriptionArns;
}
/// <summary>
/// Finds an existing Amazon SNS topic by iterating all SNS topics until a match is found asynchronously.
/// <para>
/// The ListTopics method is used to fetch pages of SNS topics, one page at a time, until an SNS topic
/// with a TopicArn that matches is found or there are no more pages.
/// </para>
/// </summary>
/// <param name="topicName">The name of the topic to find.</param>
/// <returns>A Task containing the matched SNS topic, or null when no topic matches.</returns>
public async Task<Topic> FindTopicAsync(string topicName)
{
    var nextToken = string.Empty;
    do
    {
        var response = await this.ListTopicsAsync(new ListTopicsRequest { NextToken = nextToken }).ConfigureAwait(false);

        // Stop at the first topic on this page whose ARN matches the requested name.
        var matchedTopic = response.Topics.FirstOrDefault(x => TopicNameMatcher(x.TopicArn, topicName));
        if (matchedTopic != null)
        {
            return matchedTopic;
        }

        // An empty or null token means the listing is exhausted.
        nextToken = response.NextToken;
    } while (!string.IsNullOrEmpty(nextToken));

    return null;
}
/// <summary>
/// Utility method that updates the policy of a topic so that the given
/// S3 bucket is allowed to publish events to it. The topic's policy is only
/// written back when it does not already contain the required statement.
/// </summary>
/// <param name="topicArn">The topic that will have its policy updated.</param>
/// <param name="bucket">The bucket that will be given access to publish from.</param>
/// <returns>A Task that completes once the policy has been checked and, if needed, updated.</returns>
public async Task AuthorizeS3ToPublishAsync(string topicArn, string bucket)
{
    // Fetch the topic's current attributes.
    var getRequest = new GetTopicAttributesRequest
    {
        TopicArn = topicArn
    };
    var getResponse = await this.GetTopicAttributesAsync(getRequest).ConfigureAwait(false);

    // The helper yields the policy to work on and the statement to add.
    // NOTE(review): presumably the policy is parsed from the topic attributes - confirm against the helper.
    Policy topicPolicy;
    Statement s3Statement;
    GetNewPolicyAndStatementForTopicAttributes(getResponse.Attributes, topicArn, bucket, out topicPolicy, out s3Statement);

    // Nothing to do when the policy already contains the statement.
    if (topicPolicy.CheckIfStatementExists(s3Statement))
    {
        return;
    }

    topicPolicy.Statements.Add(s3Statement);
    var serializedPolicy = topicPolicy.ToJson();

    var setRequest = new SetTopicAttributesRequest
    {
        TopicArn = topicArn,
        AttributeName = "Policy",
        AttributeValue = serializedPolicy
    };
    await this.SetTopicAttributesAsync(setRequest).ConfigureAwait(false);
}
}
}