/*
* All or portions of this file Copyright (c) Amazon.com, Inc. or its affiliates or
* its licensors.
*
* For complete copyright and license terms please see the LICENSE at the root of this
* distribution (the "License"). All use of this software is governed by the License,
* or, if provided, by the license below or the license accompanying this file. Do not
* remove or modify any license notices. This file is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
*/
// Original file Copyright Crytek GMBH or its affiliates, used under license.
// This is free and unencumbered software released into the public domain.
// Anyone is free to copy, modify, publish, use, compile, sell, or
// distribute this software, either in source code form or as a compiled
// binary, for any purpose, commercial or non-commercial, and by any
// means.
// In jurisdictions that recognize copyright laws, the author or authors
// of this software dedicate any and all copyright interest in the
// software to the public domain. We make this dedication for the benefit
// of the public at large and to the detriment of our heirs and
// successors. We intend this dedication to be an overt act of
// relinquishment in perpetuity of all present and future rights to this
// software under copyright law.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
// IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
// OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
// ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
// OTHER DEALINGS IN THE SOFTWARE.
// For more information, please refer to <http://unlicense.org/>
// Implementation of Dmitry Vyukov's MPMC algorithm
// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
#ifndef __MPMC_BOUNDED_QUEUE_INCLUDED__
#define __MPMC_BOUNDED_QUEUE_INCLUDED__
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <type_traits>
namespace concqueue
{
/// Bounded multi-producer/multi-consumer lock-free queue (Dmitry Vyukov's
/// algorithm). Capacity must be a power of two; each slot carries a sequence
/// number that doubles as a ticket, letting producers and consumers claim
/// slots with a single CAS and publish them with a release store.
template<typename T>
class mpmc_bounded_queue_t
{
public:
    /// @param size capacity of the ring buffer; MUST be a power of two
    ///        (enforced by assert) so that `seq & _mask` maps a monotonically
    ///        increasing sequence onto an array index.
    mpmc_bounded_queue_t(size_t size)
        : _size(size)
        , _mask(size - 1)
        , _buffer(reinterpret_cast<node_t*>(new aligned_node_t[_size]))
        , _head_seq(0)
        , _tail_seq(0)
    {
        // make sure it's a power of 2: (_size & -_size) == _size only holds
        // for exact powers of two (and rejects zero explicitly)
        assert((_size != 0) && ((_size & (~_size + 1)) == _size));
        // populate the sequence initial values: slot i starts "empty" for
        // ticket i, so the first _size enqueues each find their slot free
        for (size_t i = 0; i < _size; ++i)
        {
            _buffer[i].seq.store(i, std::memory_order_relaxed);
        }
    }

    ~mpmc_bounded_queue_t()
    {
        // the storage was allocated as aligned_node_t[], so it must be
        // deleted through that type (deleting through node_t* would be UB)
        delete[] reinterpret_cast<aligned_node_t*>(_buffer);
    }

    /// Attempt to append @p data. Returns false if the queue is full.
    /// Safe to call concurrently from any number of threads.
    bool enqueue(const T& data)
    {
        // _head_seq only wraps at MAX(_head_seq) instead we use a mask to convert the sequence to an array index
        // this is why the ring buffer must be a size which is a power of 2. this also allows the sequence to double as a ticket/lock.
        size_t head_seq = _head_seq.load(std::memory_order_relaxed);
        for (;; )
        {
            node_t* node = &_buffer[head_seq & _mask];
            size_t node_seq = node->seq.load(std::memory_order_acquire);
            intptr_t dif = (intptr_t)node_seq - (intptr_t)head_seq;
            // if seq and head_seq are the same then it means this slot is empty
            if (dif == 0)
            {
                // claim our spot by moving head
                // if head isn't the same as we last checked then that means someone beat us to the punch
                // weak compare is faster, but can return spurious results
                // which in this instance is OK, because it's in the loop
                if (_head_seq.compare_exchange_weak(head_seq, head_seq + 1, std::memory_order_relaxed))
                {
                    // set the data
                    node->data = data;
                    // increment the sequence so that the tail knows it's accessible
                    node->seq.store(head_seq + 1, std::memory_order_release);
                    return true;
                }
                // CAS failure updated head_seq in place; retry with the new value
            }
            else if (dif < 0)
            {
                // if seq is less than head seq then it means this slot is full and therefore the buffer is full
                return false;
            }
            else
            {
                // under normal circumstances this branch should never be taken
                head_seq = _head_seq.load(std::memory_order_relaxed);
            }
        }
        // never taken
        return false;
    }

    /// Attempt to pop the oldest element into @p data. Returns false if the
    /// queue is empty. Safe to call concurrently from any number of threads.
    bool dequeue(T& data)
    {
        size_t tail_seq = _tail_seq.load(std::memory_order_relaxed);
        for (;; )
        {
            node_t* node = &_buffer[tail_seq & _mask];
            size_t node_seq = node->seq.load(std::memory_order_acquire);
            // a published slot holds seq == ticket + 1 (set by enqueue)
            intptr_t dif = (intptr_t)node_seq - (intptr_t)(tail_seq + 1);
            // if seq is exactly tail_seq + 1 then this slot holds published data
            if (dif == 0)
            {
                // claim our spot by moving tail
                // if tail isn't the same as we last checked then that means someone beat us to the punch
                // weak compare is faster, but can return spurious results
                // which in this instance is OK, because it's in the loop
                if (_tail_seq.compare_exchange_weak(tail_seq, tail_seq + 1, std::memory_order_relaxed))
                {
                    // set the output
                    data = node->data;
                    // set the sequence to what the head sequence should be next time around
                    node->seq.store(tail_seq + _mask + 1, std::memory_order_release);
                    return true;
                }
                // CAS failure updated tail_seq in place; retry with the new value
            }
            else if (dif < 0)
            {
                // if seq is behind the expected ticket the slot has no published data: the queue is empty
                return false;
            }
            else
            {
                // under normal circumstances this branch should never be taken
                tail_seq = _tail_seq.load(std::memory_order_relaxed);
            }
        }
        // never taken
        return false;
    }

private:
    // one ring-buffer slot: payload plus its ticket/sequence counter
    struct node_t
    {
        T data;
        std::atomic<size_t> seq;
    };

    // raw storage with node_t's size/alignment; allocated uninitialized so
    // slots are only ever written through enqueue (T must tolerate this —
    // the original implementation assumes trivially constructible payloads)
    typedef typename std::aligned_storage<sizeof(node_t), std::alignment_of<node_t>::value>::type aligned_node_t;
    typedef char cache_line_pad_t[64]; // it's either 32 or 64 so 64 is good enough

    // pads keep _head_seq and _tail_seq on separate cache lines to avoid
    // false sharing between producers and consumers
    cache_line_pad_t _pad0;
    const size_t _size;
    const size_t _mask;
    node_t* const _buffer;
    cache_line_pad_t _pad1;
    std::atomic<size_t> _head_seq;
    cache_line_pad_t _pad2;
    std::atomic<size_t> _tail_seq;
    cache_line_pad_t _pad3;

    // non-copyable: const members and owned buffer make copying meaningless
    mpmc_bounded_queue_t(const mpmc_bounded_queue_t&) = delete;
    mpmc_bounded_queue_t& operator=(const mpmc_bounded_queue_t&) = delete;
};
}
#endif