/** * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ #pragma once #include #include #include #include #include namespace Aws { namespace Queues { static const char* MEM_TAG = "Aws::Queues::Queue"; /** * Simple queue class. Allows standard queue operations top, delete, and push. Also has higher level, asynchronous interface * with callbacks. */ template class Queue { typedef std::function MessageReceivedEventHandler; typedef std::function MessageDeleteFailedEventHandler; typedef std::function MessageDeleteSuccessEventHandler; typedef std::function MessageSendFailedEventHandler; typedef std::function MessageSendSuccessEventHandler; public: /** * You are responsible for calling StartPolling() if you intend to use the asynchronous pattern. * * the value of pollingFrequency is how long to wait between queue polls. If the queue poll exceeds this limit then the next poll will start immediately * upon completion of the existing poll, this value is useful only if you intend to use this instance for the asynchronous polling model. */ Queue(unsigned pollingFrequency) : m_continue(true), m_pollingFrequencyMs(pollingFrequency), m_pollingThread(nullptr) { } virtual ~Queue() { StopPolling(); } virtual MESSAGE_TYPE Top() const = 0; virtual void Delete(const MESSAGE_TYPE&) = 0; virtual void Push(const MESSAGE_TYPE&) = 0; /** * Starts a polling thread in the background. You will need to register OnMessageReceived * to receive the messages. This method can be called after StopPolling to resume polling after * being paused. */ void StartPolling() { if(!m_pollingThread) { m_continue = true; m_pollingThread = Aws::MakeUnique(MEM_TAG, &Queue::Main, this); } } /** * Stops the polling thread. Messages in transit will be handled before termination of the thread. * Will be called by the destructor so only call this if you want control over when the thread exits. * This method blocks waiting on the polling thread to stop. After being called, the StartPolling() method * can be called, and the thread will resume. */ void StopPolling() { m_continue = false; if(m_pollingThread) { m_pollingThread->join(); m_pollingThread = nullptr; } } inline void SetMessageReceivedEventHandler(const MessageReceivedEventHandler& messageHandler) { m_messageReceivedHandler = messageHandler; } inline void SetMessageDeleteFailedEventHandler(const MessageDeleteFailedEventHandler& messageHandler) { m_messageDeleteFailedHandler = messageHandler; } inline void SetMessageDeleteSuccessEventHandler(const MessageDeleteSuccessEventHandler& messageHandler) { m_messageDeleteSuccessHandler = messageHandler; } inline void SetMessageSendFailedEventHandler(const MessageSendFailedEventHandler& messageHandler) { m_messageSendFailedHandler = messageHandler; } inline void SetMessageSendSuccessEventHandler(const MessageSendSuccessEventHandler& messageHandler) { m_messageSendSuccessHandler = messageHandler; } inline void SetMessageReceivedEventHandler(MessageReceivedEventHandler&& messageHandler) { m_messageReceivedHandler = messageHandler; } inline void SetMessageDeleteFailedEventHandler(MessageDeleteFailedEventHandler&& messageHandler) { m_messageDeleteFailedHandler = messageHandler; } inline void SetMessageDeleteSuccessEventHandler(MessageDeleteSuccessEventHandler&& messageHandler) { m_messageDeleteSuccessHandler = messageHandler; } inline void SetMessageSendFailedEventHandler(MessageSendFailedEventHandler&& messageHandler) { m_messageSendFailedHandler = messageHandler; } inline void SetMessageSendSuccessEventHandler(MessageSendSuccessEventHandler&& messageHandler) { m_messageSendSuccessHandler = messageHandler; } inline const MessageReceivedEventHandler& GetMessageReceivedEventHandler() const { return m_messageReceivedHandler; } inline const MessageDeleteFailedEventHandler& GetMessageDeleteFailedEventHandler() const { return m_messageDeleteFailedHandler; } inline const MessageDeleteSuccessEventHandler& GetMessageDeleteSuccessEventHandler() const { return m_messageDeleteSuccessHandler; } inline const MessageSendFailedEventHandler& GetMessageSendFailedEventHandler() const { return m_messageSendFailedHandler; } inline const MessageSendSuccessEventHandler& GetMessageSendSuccessEventHandler() const { return m_messageSendSuccessHandler; } protected: std::atomic m_continue; private: void Main() { while(m_continue) { auto start = std::chrono::system_clock::now(); MESSAGE_TYPE topMessage = Top(); bool deleteMessage = false; auto& receivedHandler = GetMessageReceivedEventHandler(); if (receivedHandler) { receivedHandler(this, topMessage, deleteMessage); } if (deleteMessage) { Delete(topMessage); } if(m_continue) { auto stop = std::chrono::system_clock::now(); auto timeTaken = std::chrono::duration_cast(stop - start); if (m_pollingFrequencyMs >= timeTaken.count()) { std::this_thread::sleep_for(std::chrono::milliseconds(m_pollingFrequencyMs - timeTaken.count())); } } } } unsigned m_pollingFrequencyMs; Aws::UniquePtr m_pollingThread; // Handlers MessageReceivedEventHandler m_messageReceivedHandler; MessageDeleteFailedEventHandler m_messageDeleteFailedHandler; MessageDeleteSuccessEventHandler m_messageDeleteSuccessHandler; MessageSendFailedEventHandler m_messageSendFailedHandler; MessageSendSuccessEventHandler m_messageSendSuccessHandler; }; } }