/* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// ---------------------------------------------------------------------
// Copyright (c) 2015-2016 Microsoft
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
// ---------------------------------------------------------------------
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading;
namespace OpenSearch.Net
{
/// <summary>
/// Manages pools of RecyclableMemoryStream objects.
/// </summary>
/// <remarks>
/// There are two pools managed in here. The small pool contains same-sized buffers that are handed to streams
/// as they write more data.
/// For scenarios that need to call GetBuffer(), the large pool contains buffers of various sizes, all
/// multiples/exponentials of LargeBufferMultiple (1 MB by default). They are split by size to avoid overly-wasteful buffer
/// usage. There should be far fewer 8 MB buffers than 1 MB buffers, for example.
/// </remarks>
internal partial class RecyclableMemoryStreamManager
{
/// <summary>
/// Generic delegate for handling events without any arguments.
/// </summary>
public delegate void EventHandler();
/// <summary>
/// Delegate for handling large buffer discard reports.
/// </summary>
/// <param name="reason">Reason the buffer was discarded.</param>
public delegate void LargeBufferDiscardedEventHandler(Events.MemoryStreamDiscardReason reason);
/// <summary>
/// Delegate for handling reports of stream size when streams are allocated
/// </summary>
/// <param name="bytes">Bytes allocated.</param>
public delegate void StreamLengthReportHandler(long bytes);
/// <summary>
/// Delegate for handling reporting of memory use statistics.
/// </summary>
/// <param name="smallPoolInUseBytes">Bytes currently in use in the small pool.</param>
/// <param name="smallPoolFreeBytes">Bytes currently free in the small pool.</param>
/// <param name="largePoolInUseBytes">Bytes currently in use in the large pool.</param>
/// <param name="largePoolFreeBytes">Bytes currently free in the large pool.</param>
public delegate void UsageReportEventHandler(
long smallPoolInUseBytes, long smallPoolFreeBytes, long largePoolInUseBytes, long largePoolFreeBytes
);
/// <summary>
/// Block size (128 KB) used by the parameterless constructor.
/// </summary>
public const int DefaultBlockSize = 128 * 1024;
/// <summary>
/// Large buffer multiple (1 MB) used by the parameterless constructor.
/// </summary>
public const int DefaultLargeBufferMultiple = 1024 * 1024;
/// <summary>
/// Maximum pooled buffer size (128 MB) used by the parameterless constructor.
/// </summary>
public const int DefaultMaximumBufferSize = 128 * 1024 * 1024;
/// <summary>
/// Free bytes held by each large pool; indexed the same way as <see cref="_largePools"/>.
/// </summary>
private readonly long[] _largeBufferFreeSize;
/// <summary>
/// Bytes handed out from each large pool. Holds one extra trailing slot that tracks
/// buffers which are too large to be pooled.
/// </summary>
private readonly long[] _largeBufferInUseSize;
/// <summary>
/// pools[0] = 1x largeBufferMultiple buffers
/// pools[1] = 2x largeBufferMultiple buffers
/// pools[2] = 3x(multiple)/4x(exponential) largeBufferMultiple buffers
/// etc., up to maximumBufferSize
/// </summary>
private readonly ConcurrentStack<byte[]>[] _largePools;
/// <summary>
/// Pool of free blocks, each <see cref="BlockSize"/> bytes long.
/// </summary>
private readonly ConcurrentStack<byte[]> _smallPool;
private long _smallPoolFreeSize;
private long _smallPoolInUseSize;
/// <summary>
/// Initializes the memory manager with the default block/buffer specifications.
/// </summary>
public RecyclableMemoryStreamManager()
    : this(DefaultBlockSize, DefaultLargeBufferMultiple, DefaultMaximumBufferSize, false) { }
/// <summary>
/// Initializes the memory manager with the given block requiredSize.
/// </summary>
/// <param name="blockSize">Size of each block that is pooled. Must be > 0.</param>
/// <param name="largeBufferMultiple">Each large buffer will be a multiple of this value.</param>
/// <param name="maximumBufferSize">Buffers larger than this are not pooled</param>
/// <exception cref="ArgumentOutOfRangeException">
/// blockSize is not a positive number, or largeBufferMultiple is not a
/// positive number, or maximumBufferSize is less than blockSize.
/// </exception>
/// <exception cref="ArgumentException">maximumBufferSize is not a multiple of largeBufferMultiple</exception>
public RecyclableMemoryStreamManager(int blockSize, int largeBufferMultiple, int maximumBufferSize)
    : this(blockSize, largeBufferMultiple, maximumBufferSize, false) { }
/// <summary>
/// Initializes the memory manager with the given block requiredSize.
/// </summary>
/// <param name="blockSize">Size of each block that is pooled. Must be > 0.</param>
/// <param name="largeBufferMultiple">Each large buffer will be a multiple/exponential of this value.</param>
/// <param name="maximumBufferSize">Buffers larger than this are not pooled</param>
/// <param name="useExponentialLargeBuffer">Switch to exponential large buffer allocation strategy</param>
/// <exception cref="ArgumentOutOfRangeException">
/// blockSize is not a positive number, or largeBufferMultiple is not a
/// positive number, or maximumBufferSize is less than blockSize.
/// </exception>
/// <exception cref="ArgumentException">maximumBufferSize is not a multiple/exponential of largeBufferMultiple</exception>
public RecyclableMemoryStreamManager(int blockSize, int largeBufferMultiple, int maximumBufferSize, bool useExponentialLargeBuffer)
{
    if (blockSize <= 0)
        throw new ArgumentOutOfRangeException(nameof(blockSize), blockSize, "blockSize must be a positive number");
    if (largeBufferMultiple <= 0)
        throw new ArgumentOutOfRangeException(nameof(largeBufferMultiple), largeBufferMultiple,
            "largeBufferMultiple must be a positive number");
    if (maximumBufferSize < blockSize)
        throw new ArgumentOutOfRangeException(nameof(maximumBufferSize), maximumBufferSize,
            "maximumBufferSize must be at least blockSize");

    // The size helpers below read these properties, so they must be assigned before validation.
    BlockSize = blockSize;
    LargeBufferMultiple = largeBufferMultiple;
    MaximumBufferSize = maximumBufferSize;
    UseExponentialLargeBuffer = useExponentialLargeBuffer;

    if (!IsLargeBufferSize(maximumBufferSize))
        throw new ArgumentException(
            $"maximumBufferSize is not {(UseExponentialLargeBuffer ? "an exponential" : "a multiple")} of largeBufferMultiple",
            nameof(maximumBufferSize));

    _smallPool = new ConcurrentStack<byte[]>();

    // maximumBufferSize has just been validated as a legal large buffer size, so the index of
    // its pool plus one is the pool count in both strategies. Using the same integer helper
    // that routes buffers at runtime avoids truncating a floating-point logarithm.
    var numLargePools = GetPoolIndex(maximumBufferSize) + 1;

    // +1 to store size of bytes in use that are too large to be pooled
    _largeBufferInUseSize = new long[numLargePools + 1];
    _largeBufferFreeSize = new long[numLargePools];
    _largePools = new ConcurrentStack<byte[]>[numLargePools];
    for (var i = 0; i < _largePools.Length; ++i) _largePools[i] = new ConcurrentStack<byte[]>();

    EventsWriter.MemoryStreamManagerInitialized(blockSize, largeBufferMultiple, maximumBufferSize);
}
/// <summary>
/// Whether dirty buffers can be immediately returned to the buffer pool. E.g. when GetBuffer() is called on
/// a stream and creates a single large buffer, if this setting is enabled, the other blocks will be returned
/// to the buffer pool immediately.
/// Note when enabling this setting that the user is responsible for ensuring that any buffer previously
/// retrieved from a stream which is subsequently modified is not used after modification (as it may no longer
/// be valid).
/// </summary>
public bool AggressiveBufferReturn { get; set; }
/// <summary>
/// The size of each block. It must be set at creation and cannot be changed.
/// </summary>
public int BlockSize { get; }
/// <summary>
/// Whether to save callstacks for stream allocations. This can help in debugging.
/// It should NEVER be turned on generally in production.
/// </summary>
public bool GenerateCallStacks { get; set; }
/// <summary>
/// All buffers are multiples/exponentials of this number. It must be set at creation and cannot be changed.
/// </summary>
public int LargeBufferMultiple { get; }
/// <summary>
/// Total number of free buffers across all of the large pools.
/// </summary>
public long LargeBuffersFree
{
    get
    {
        long total = 0;
        for (var i = 0; i < _largePools.Length; i++)
            total += _largePools[i].Count;
        return total;
    }
}
/// <summary>
/// Total number of bytes held by the large pools that are not currently in use.
/// </summary>
public long LargePoolFreeSize
{
    get
    {
        long total = 0;
        for (var i = 0; i < _largeBufferFreeSize.Length; i++)
            total += _largeBufferFreeSize[i];
        return total;
    }
}
/// <summary>
/// Total number of large-buffer bytes currently handed out to streams, including the
/// slot that tracks buffers too large to be pooled.
/// </summary>
public long LargePoolInUseSize
{
    get
    {
        long total = 0;
        for (var i = 0; i < _largeBufferInUseSize.Length; i++)
            total += _largeBufferInUseSize[i];
        return total;
    }
}
/// <summary>
/// Gets the maximum buffer size.
/// </summary>
/// <remarks>
/// Any buffer that is returned to the pool that is larger than this will be
/// discarded and garbage collected.
/// </remarks>
public int MaximumBufferSize { get; }
/// <summary>
/// How many bytes of large free buffers to allow before we start dropping
/// those returned to us.
/// </summary>
/// <remarks>
/// A value of 0 indicates no limit. The limit is checked against the pool the returned
/// buffer belongs to, not against the sum of all large pools.
/// </remarks>
public long MaximumFreeLargePoolBytes { get; set; }
/// <summary>
/// How many bytes of small free blocks to allow before we start dropping
/// those returned to us.
/// </summary>
/// <remarks>A value of 0 indicates no limit.</remarks>
public long MaximumFreeSmallPoolBytes { get; set; }
/// <summary>
/// Maximum stream capacity in bytes. Attempts to set a larger capacity will
/// result in an exception.
/// </summary>
/// <remarks>A value of 0 indicates no limit.</remarks>
public long MaximumStreamCapacity { get; set; }
/// <summary>
/// How many blocks are in the small pool
/// </summary>
public long SmallBlocksFree => _smallPool.Count;
/// <summary>
/// Number of bytes in small pool not currently in use
/// </summary>
public long SmallPoolFreeSize => _smallPoolFreeSize;
/// <summary>
/// Number of bytes currently in use by stream from the small pool
/// </summary>
public long SmallPoolInUseSize => _smallPoolInUseSize;
/// <summary>
/// Use exponential large buffer allocation strategy. It must be set at creation and cannot be changed.
/// </summary>
public bool UseExponentialLargeBuffer { get; }
/// <summary>
/// Use multiple large buffer allocation strategy. It must be set at creation and cannot be changed.
/// Always the negation of <see cref="UseExponentialLargeBuffer"/>.
/// </summary>
public bool UseMultipleLargeBuffer => !UseExponentialLargeBuffer;
/// <summary>
/// Takes one block out of the small pool, allocating a fresh one when the pool is empty.
/// </summary>
/// <returns>A byte[] array of <see cref="BlockSize"/> bytes.</returns>
internal byte[] GetBlock()
{
    byte[] pooled;
    if (_smallPool.TryPop(out pooled))
    {
        // Reused an existing block, so it no longer counts as free.
        Interlocked.Add(ref _smallPoolFreeSize, -BlockSize);
    }
    else
    {
        // Nothing pooled: allocate. The block joins the pool when its stream is
        // disposed (unless the free pool is already too large).
        pooled = new byte[BlockSize];
        EventsWriter.MemoryStreamNewBlockCreated(_smallPoolInUseSize);
        ReportBlockCreated();
    }

    Interlocked.Add(ref _smallPoolInUseSize, BlockSize);
    return pooled;
}
/// <summary>
/// Hands out a buffer from the large buffer pool. The buffer is at least
/// <paramref name="requiredSize"/> bytes long and its length is always a
/// multiple/exponential of largeBufferMultiple.
/// </summary>
/// <param name="requiredSize">The minimum length of the buffer</param>
/// <param name="tag">The tag of the stream requesting this buffer, for logging if necessary.</param>
/// <returns>A buffer of at least the required size.</returns>
internal byte[] GetLargeBuffer(int requiredSize, string tag)
{
    requiredSize = RoundToLargeBufferSize(requiredSize);
    var slot = GetPoolIndex(requiredSize);
    byte[] result;

    if (slot >= _largePools.Length)
    {
        // Too large to pool: always a fresh allocation. Its size is still tracked in the
        // extra trailing slot of the in-use array, and the rounded size is kept to
        // reduce heap fragmentation.
        slot = _largeBufferInUseSize.Length - 1;
        result = new byte[requiredSize];

        // Capture the stack only on request -- it shows who needs such large buffers.
        var callStack = GenerateCallStacks ? Environment.StackTrace : null;
        EventsWriter.MemoryStreamNonPooledLargeBufferCreated(requiredSize, tag, callStack);
        ReportLargeBufferCreated();
    }
    else if (_largePools[slot].TryPop(out result))
    {
        // Reused a pooled buffer, so it no longer counts as free.
        Interlocked.Add(ref _largeBufferFreeSize[slot], -result.Length);
    }
    else
    {
        result = new byte[requiredSize];
        EventsWriter.MemoryStreamNewLargeBufferCreated(requiredSize, LargePoolInUseSize);
        ReportLargeBufferCreated();
    }

    Interlocked.Add(ref _largeBufferInUseSize[slot], result.Length);
    return result;
}
/// <summary>
/// Rounds <paramref name="requiredSize"/> up to the next legal large buffer size: the next
/// power-of-two multiple of <see cref="LargeBufferMultiple"/> in exponential mode, otherwise
/// the next plain multiple of it.
/// </summary>
/// <param name="requiredSize">The minimum number of bytes needed.</param>
/// <returns>The rounded size.</returns>
/// <exception cref="ArgumentOutOfRangeException">
/// The rounded size does not fit in an <see cref="int"/>.
/// </exception>
private int RoundToLargeBufferSize(int requiredSize)
{
    // All arithmetic is done in 64 bits. In 32 bits the exponential loop could wrap to a
    // non-positive product and never terminate, and the multiple formula could wrap to a
    // negative size.
    long rounded;
    if (UseExponentialLargeBuffer)
    {
        rounded = LargeBufferMultiple;
        while (rounded < requiredSize) rounded <<= 1;
    }
    else
        rounded = ((long)requiredSize + LargeBufferMultiple - 1) / LargeBufferMultiple * LargeBufferMultiple;

    if (rounded > int.MaxValue)
        throw new ArgumentOutOfRangeException(nameof(requiredSize), requiredSize,
            "requiredSize cannot be rounded up to a large buffer size without exceeding the maximum array length");

    return (int)rounded;
}
/// <summary>
/// Whether <paramref name="value"/> is a legal large buffer size: a non-zero multiple of
/// <see cref="LargeBufferMultiple"/>, or in exponential mode exactly
/// <see cref="LargeBufferMultiple"/> times a power of two.
/// </summary>
/// <param name="value">The size in bytes to check.</param>
/// <returns><c>true</c> when the size could have been produced by this manager.</returns>
private bool IsLargeBufferSize(int value)
{
    if (value == 0) return false;
    if (!UseExponentialLargeBuffer) return value % LargeBufferMultiple == 0;

    // Double in 64 bits so the candidate cannot wrap around for values close to
    // int.MaxValue; such values are simply reported as not being a large buffer size.
    long candidate = LargeBufferMultiple;
    while (candidate < value) candidate <<= 1;
    return candidate == value;
}
/// <summary>
/// Maps a large buffer length to the index of the pool that holds buffers of that size.
/// </summary>
/// <param name="length">The buffer length in bytes.</param>
/// <returns>The pool index; it may lie past the last pool for buffers too large to pool.</returns>
private int GetPoolIndex(int length)
{
    // Multiple strategy: pool k holds buffers of (k + 1) * LargeBufferMultiple bytes.
    if (!UseExponentialLargeBuffer)
        return length / LargeBufferMultiple - 1;

    // Exponential strategy: pool k holds buffers of LargeBufferMultiple * 2^k bytes.
    int index;
    for (index = 0; LargeBufferMultiple << index < length; index++) { }
    return index;
}
/// <summary>
/// Returns the buffer to the large pool
/// </summary>
/// <param name="buffer">The buffer to return.</param>
/// <param name="tag">The tag of the stream returning this buffer, for logging if necessary.</param>
/// <exception cref="ArgumentNullException">buffer is null</exception>
/// <exception cref="ArgumentException">
/// buffer.Length is not a multiple/exponential of LargeBufferMultiple (it did not
/// originate from this pool)
/// </exception>
internal void ReturnLargeBuffer(byte[] buffer, string tag)
{
    if (buffer == null) throw new ArgumentNullException(nameof(buffer));
    if (!IsLargeBufferSize(buffer.Length))
        throw new ArgumentException(
            string.Format("buffer did not originate from this memory manager. The size is not {0} of ",
                UseExponentialLargeBuffer ? "an exponential" : "a multiple") +
            LargeBufferMultiple);

    var poolIndex = GetPoolIndex(buffer.Length);
    if (poolIndex < _largePools.Length)
    {
        // Computed in 64 bits: with buffers of up to MaximumBufferSize bytes the product
        // exceeds int.MaxValue after only a few pooled buffers, and a wrapped negative
        // value would wrongly pass the limit check below.
        var freeBytesAfterReturn = ((long)_largePools[poolIndex].Count + 1) * buffer.Length;

        // A limit of 0 means the pool may grow without bound.
        if (MaximumFreeLargePoolBytes == 0 || freeBytesAfterReturn <= MaximumFreeLargePoolBytes)
        {
            _largePools[poolIndex].Push(buffer);
            Interlocked.Add(ref _largeBufferFreeSize[poolIndex], buffer.Length);
        }
        else
        {
            EventsWriter.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Large, tag,
                Events.MemoryStreamDiscardReason.EnoughFree);
            ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason.EnoughFree);
        }
    }
    else
    {
        // This is a non-poolable buffer, but we still want to track its size for inuse
        // analysis. We have space in the inuse array for this.
        poolIndex = _largeBufferInUseSize.Length - 1;
        EventsWriter.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Large, tag,
            Events.MemoryStreamDiscardReason.TooLarge);
        ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason.TooLarge);
    }

    // Pooled or discarded, the buffer is no longer in use by a stream.
    Interlocked.Add(ref _largeBufferInUseSize[poolIndex], -buffer.Length);
    ReportUsageReport(_smallPoolInUseSize, _smallPoolFreeSize, LargePoolInUseSize,
        LargePoolFreeSize);
}
/// <summary>
/// Returns the blocks to the pool
/// </summary>
/// <param name="blocks">Collection of blocks to return to the pool</param>
/// <param name="tag">The tag of the stream returning these blocks, for logging if necessary.</param>
/// <exception cref="ArgumentNullException">blocks is null</exception>
/// <exception cref="ArgumentException">blocks contains buffers that are the wrong size (or null) for this memory manager</exception>
internal void ReturnBlocks(ICollection<byte[]> blocks, string tag)
{
    if (blocks == null) throw new ArgumentNullException(nameof(blocks));

    // 64-bit product: a stream larger than 2 GB holds enough blocks for
    // Count * BlockSize to overflow an int and corrupt the in-use counter.
    var bytesToReturn = (long)blocks.Count * BlockSize;
    Interlocked.Add(ref _smallPoolInUseSize, -bytesToReturn);

    // Validate every block before pooling any, so a bad collection never ends up
    // partially pooled.
    foreach (var block in blocks)
    {
        if (block == null || block.Length != BlockSize)
            throw new ArgumentException("blocks contains buffers that are not BlockSize in length");
    }

    foreach (var block in blocks)
    {
        // A limit of 0 means the small pool may grow without bound.
        if (MaximumFreeSmallPoolBytes == 0 || SmallPoolFreeSize < MaximumFreeSmallPoolBytes)
        {
            Interlocked.Add(ref _smallPoolFreeSize, BlockSize);
            _smallPool.Push(block);
        }
        else
        {
            // The pool is full: this block and all remaining ones are dropped, with a
            // single discard notification.
            EventsWriter.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Small, tag,
                Events.MemoryStreamDiscardReason.EnoughFree);
            ReportBlockDiscarded();
            break;
        }
    }

    ReportUsageReport(_smallPoolInUseSize, _smallPoolFreeSize, LargePoolInUseSize,
        LargePoolFreeSize);
}
// Event raisers. Each one invokes the matching public event when it has subscribers
// and does nothing otherwise.

/// <summary>Raises <see cref="BlockCreated"/>.</summary>
internal void ReportBlockCreated() => BlockCreated?.Invoke();
/// <summary>Raises <see cref="BlockDiscarded"/>.</summary>
internal void ReportBlockDiscarded() => BlockDiscarded?.Invoke();
/// <summary>Raises <see cref="LargeBufferCreated"/>.</summary>
internal void ReportLargeBufferCreated() => LargeBufferCreated?.Invoke();
/// <summary>Raises <see cref="LargeBufferDiscarded"/>.</summary>
/// <param name="reason">Reason the buffer was discarded.</param>
internal void ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason reason) => LargeBufferDiscarded?.Invoke(reason);
/// <summary>Raises <see cref="StreamCreated"/>.</summary>
internal void ReportStreamCreated() => StreamCreated?.Invoke();
/// <summary>Raises <see cref="StreamDisposed"/>.</summary>
internal void ReportStreamDisposed() => StreamDisposed?.Invoke();
/// <summary>Raises <see cref="StreamFinalized"/>.</summary>
internal void ReportStreamFinalized() => StreamFinalized?.Invoke();
/// <summary>Raises <see cref="StreamLength"/>.</summary>
/// <param name="bytes">The stream length in bytes to report.</param>
internal void ReportStreamLength(long bytes) => StreamLength?.Invoke(bytes);
/// <summary>Raises <see cref="StreamConvertedToArray"/>.</summary>
internal void ReportStreamToArray() => StreamConvertedToArray?.Invoke();
/// <summary>Raises <see cref="UsageReport"/> with the given pool statistics.</summary>
/// <param name="smallPoolInUseBytes">Bytes currently in use in the small pool.</param>
/// <param name="smallPoolFreeBytes">Bytes currently free in the small pool.</param>
/// <param name="largePoolInUseBytes">Bytes currently in use in the large pool.</param>
/// <param name="largePoolFreeBytes">Bytes currently free in the large pool.</param>
internal void ReportUsageReport(
long smallPoolInUseBytes, long smallPoolFreeBytes, long largePoolInUseBytes, long largePoolFreeBytes
) =>
UsageReport?.Invoke(smallPoolInUseBytes, smallPoolFreeBytes, largePoolInUseBytes, largePoolFreeBytes);
/// <summary>
/// Retrieve a new MemoryStream object with no tag and a default initial capacity.
/// </summary>
/// <returns>A MemoryStream.</returns>
public MemoryStream GetStream() => new RecyclableMemoryStream(this);
/// <summary>
/// A non-pooled <see cref="MemoryStream"/> over caller-supplied bytes that reports its
/// creation and disposal to the owning manager.
/// </summary>
private class ReportingMemoryStream : MemoryStream
{
    private readonly RecyclableMemoryStreamManager _instance;

    // 0 until the first Dispose(true); keeps repeated Dispose calls from reporting twice.
    private int _disposed;

    /// <summary>
    /// Wraps <paramref name="bytes"/> and reports the new stream to <paramref name="instance"/>.
    /// </summary>
    /// <param name="bytes">The already-allocated bytes to expose as a stream.</param>
    /// <param name="instance">The manager whose stream events are raised; may be null.</param>
    public ReportingMemoryStream(byte[] bytes, RecyclableMemoryStreamManager instance) : base(bytes)
    {
        _instance = instance;
        _instance?.ReportStreamCreated();
    }

    /// <summary>
    /// Reports the disposal to the manager once, then releases the base stream.
    /// </summary>
    /// <param name="disposing">True when called from Dispose, false from a finalizer.</param>
    protected override void Dispose(bool disposing)
    {
        if (disposing && Interlocked.Exchange(ref _disposed, 1) == 0)
            _instance?.ReportStreamDisposed();
        base.Dispose(disposing);
    }
}
/// <summary>
/// Shortcut to create a stream that directly wraps bytes in a ReportingMemoryStream tied to this manager.
/// Note this does NOT use the pooled memory streams as the bytes have already been allocated
/// </summary>
/// <param name="bytes">The already-allocated bytes to wrap.</param>
/// <returns>A MemoryStream over <paramref name="bytes"/>.</returns>
public MemoryStream GetStream(byte[] bytes) => new ReportingMemoryStream(bytes, this);
/// <summary>
/// Retrieve a new MemoryStream object with no tag and a default initial capacity.
/// </summary>
/// <param name="id">A unique identifier which can be used to trace usages of the stream.</param>
/// <returns>A MemoryStream.</returns>
public MemoryStream GetStream(Guid id) => new RecyclableMemoryStream(this, id);
/// <summary>
/// Retrieve a new MemoryStream object with the given tag and a default initial capacity.
/// </summary>
/// <param name="tag">A tag which can be used to track the source of the stream.</param>
/// <returns>A MemoryStream.</returns>
public MemoryStream GetStream(string tag) => new RecyclableMemoryStream(this, tag);
/// <summary>
/// Retrieve a new MemoryStream object with the given tag and a default initial capacity.
/// </summary>
/// <param name="id">A unique identifier which can be used to trace usages of the stream.</param>
/// <param name="tag">A tag which can be used to track the source of the stream.</param>
/// <returns>A MemoryStream.</returns>
public MemoryStream GetStream(Guid id, string tag) => new RecyclableMemoryStream(this, id, tag);
/// <summary>
/// Retrieve a new MemoryStream object with the given tag and at least the given capacity.
/// </summary>
/// <param name="tag">A tag which can be used to track the source of the stream.</param>
/// <param name="requiredSize">The minimum desired capacity for the stream.</param>
/// <returns>A MemoryStream.</returns>
public MemoryStream GetStream(string tag, int requiredSize) => new RecyclableMemoryStream(this, tag, requiredSize);
/// <summary>
/// Retrieve a new MemoryStream object with the given tag and at least the given capacity.
/// </summary>
/// <param name="id">A unique identifier which can be used to trace usages of the stream.</param>
/// <param name="tag">A tag which can be used to track the source of the stream.</param>
/// <param name="requiredSize">The minimum desired capacity for the stream.</param>
/// <returns>A MemoryStream.</returns>
public MemoryStream GetStream(Guid id, string tag, int requiredSize) => new RecyclableMemoryStream(this, id, tag, requiredSize);
/// <summary>
/// Retrieve a new MemoryStream object with the given tag and at least the given capacity, possibly using
/// a single contiguous underlying buffer.
/// </summary>
/// <remarks>
/// A stream backed by a single contiguous buffer is useful when the initial size is known up front
/// and copying data from the smaller underlying buffers into one large buffer should be avoided.
/// This is most helpful when GetBuffer will always be called on the returned stream.
/// </remarks>
/// <param name="id">A unique identifier which can be used to trace usages of the stream.</param>
/// <param name="tag">A tag which can be used to track the source of the stream.</param>
/// <param name="requiredSize">The minimum desired capacity for the stream.</param>
/// <param name="asContiguousBuffer">Whether to attempt to use a single contiguous buffer.</param>
/// <returns>A MemoryStream.</returns>
public MemoryStream GetStream(Guid id, string tag, int requiredSize, bool asContiguousBuffer)
{
    // A contiguous buffer only pays off when the request does not already fit in one block.
    if (asContiguousBuffer && requiredSize > BlockSize)
    {
        var contiguous = GetLargeBuffer(requiredSize, tag);
        return new RecyclableMemoryStream(this, id, tag, requiredSize, contiguous);
    }

    return GetStream(id, tag, requiredSize);
}
/// <summary>
/// Retrieve a new MemoryStream object with the given tag and at least the given capacity, possibly using
/// a single contiguous underlying buffer. The stream is given a newly generated identifier.
/// </summary>
/// <remarks>
/// A stream backed by a single contiguous buffer is useful when the initial size is known up front
/// and copying data from the smaller underlying buffers into one large buffer should be avoided.
/// This is most helpful when GetBuffer will always be called on the returned stream.
/// </remarks>
/// <param name="tag">A tag which can be used to track the source of the stream.</param>
/// <param name="requiredSize">The minimum desired capacity for the stream.</param>
/// <param name="asContiguousBuffer">Whether to attempt to use a single contiguous buffer.</param>
/// <returns>A MemoryStream.</returns>
public MemoryStream GetStream(string tag, int requiredSize, bool asContiguousBuffer)
{
    var id = Guid.NewGuid();
    return GetStream(id, tag, requiredSize, asContiguousBuffer);
}
/// <summary>
/// Retrieve a new MemoryStream object with the given tag and with contents copied from the provided
/// buffer. The provided buffer is not wrapped or used after construction.
/// </summary>
/// <remarks>The new stream's position is set to the beginning of the stream when returned.</remarks>
/// <param name="id">A unique identifier which can be used to trace usages of the stream.</param>
/// <param name="tag">A tag which can be used to track the source of the stream.</param>
/// <param name="buffer">The byte buffer to copy data from.</param>
/// <param name="offset">The offset from the start of the buffer to copy from.</param>
/// <param name="count">The number of bytes to copy from the buffer.</param>
/// <returns>A MemoryStream.</returns>
public MemoryStream GetStream(Guid id, string tag, byte[] buffer, int offset, int count)
{
    RecyclableMemoryStream copy = null;
    try
    {
        copy = new RecyclableMemoryStream(this, id, tag, count);
        copy.Write(buffer, offset, count);

        // Rewind so the caller reads the copied bytes from the start.
        copy.Position = 0;
        return copy;
    }
    catch
    {
        // Dispose the half-built stream before letting the failure propagate.
        if (copy != null) copy.Dispose();
        throw;
    }
}
/// <summary>
/// Retrieve a new MemoryStream object with the given tag and with contents copied from the provided
/// buffer. The provided buffer is not wrapped or used after construction. The stream is given a
/// newly generated identifier.
/// </summary>
/// <remarks>The new stream's position is set to the beginning of the stream when returned.</remarks>
/// <param name="tag">A tag which can be used to track the source of the stream.</param>
/// <param name="buffer">The byte buffer to copy data from.</param>
/// <param name="offset">The offset from the start of the buffer to copy from.</param>
/// <param name="count">The number of bytes to copy from the buffer.</param>
/// <returns>A MemoryStream.</returns>
public MemoryStream GetStream(string tag, byte[] buffer, int offset, int count)
{
    var id = Guid.NewGuid();
    return GetStream(id, tag, buffer, offset, count);
}
/// <summary>
/// Triggered when a new block is created.
/// </summary>
public event EventHandler BlockCreated;
/// <summary>
/// Triggered when a block is discarded instead of being returned to the small pool.
/// </summary>
public event EventHandler BlockDiscarded;
/// <summary>
/// Triggered when a new large buffer is created.
/// </summary>
public event EventHandler LargeBufferCreated;
/// <summary>
/// Triggered when a new stream is created.
/// </summary>
public event EventHandler StreamCreated;
/// <summary>
/// Triggered when a stream is disposed.
/// </summary>
public event EventHandler StreamDisposed;
/// <summary>
/// Triggered when a stream is finalized.
/// </summary>
public event EventHandler StreamFinalized;
/// <summary>
/// Triggered to report the length of a stream, in bytes.
/// </summary>
public event StreamLengthReportHandler StreamLength;
/// <summary>
/// Triggered when a user converts a stream to array.
/// </summary>
public event EventHandler StreamConvertedToArray;
/// <summary>
/// Triggered when a large buffer is discarded, along with the reason for the discard.
/// </summary>
public event LargeBufferDiscardedEventHandler LargeBufferDiscarded;
/// <summary>
/// Triggered to report usage statistics whenever blocks or a large buffer are returned to this manager.
/// </summary>
public event UsageReportEventHandler UsageReport;
}
}