/* SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ /* * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. * * Licensed to Elasticsearch B.V. under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch B.V. licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ // --------------------------------------------------------------------- // Copyright (c) 2015-2016 Microsoft // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
// IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
// ---------------------------------------------------------------------

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading;

namespace OpenSearch.Net
{
    /// <summary>
    /// Manages pools of RecyclableMemoryStream objects.
    /// </summary>
    /// <remarks>
    /// There are two pools managed in here. The small pool contains same-sized buffers that are handed to streams
    /// as they write more data.
    /// For scenarios that need to call GetBuffer(), the large pool contains buffers of various sizes, all
    /// multiples/exponentials of LargeBufferMultiple (1 MB by default). They are split by size to avoid overly-wasteful buffer
    /// usage. There should be far fewer 8 MB buffers than 1 MB buffers, for example.
    /// </remarks>
    internal partial class RecyclableMemoryStreamManager
    {
        /// <summary>
        /// Generic delegate for handling events without any arguments.
        /// </summary>
        public delegate void EventHandler();

        /// <summary>
        /// Delegate for handling large buffer discard reports.
        /// </summary>
        /// <param name="reason">Reason the buffer was discarded.</param>
        public delegate void LargeBufferDiscardedEventHandler(Events.MemoryStreamDiscardReason reason);

        /// <summary>
        /// Delegate for handling reports of stream size when streams are allocated
        /// </summary>
        /// <param name="bytes">Bytes allocated.</param>
        public delegate void StreamLengthReportHandler(long bytes);

        /// <summary>
        /// Delegate for handling reporting of memory use statistics.
        /// </summary>
        /// <param name="smallPoolInUseBytes">Bytes currently in use in the small pool.</param>
        /// <param name="smallPoolFreeBytes">Bytes currently free in the small pool.</param>
        /// <param name="largePoolInUseBytes">Bytes currently in use in the large pool.</param>
        /// <param name="largePoolFreeBytes">Bytes currently free in the large pool.</param>
        public delegate void UsageReportEventHandler(
            long smallPoolInUseBytes,
            long smallPoolFreeBytes,
            long largePoolInUseBytes,
            long largePoolFreeBytes
        );

        /// <summary>Default size in bytes of a small-pool block (128 KB).</summary>
        public const int DefaultBlockSize = 128 * 1024;

        /// <summary>Default granularity in bytes of large-pool buffers (1 MB).</summary>
        public const int DefaultLargeBufferMultiple = 1024 * 1024;

        /// <summary>Default size in bytes of the largest buffer that is still pooled (128 MB).</summary>
        public const int DefaultMaximumBufferSize = 128 * 1024 * 1024;

        // Free bytes per large pool; same indexing as _largePools.
        private readonly long[] _largeBufferFreeSize;

        // In-use bytes per large pool, plus one trailing slot for buffers too large to be pooled.
        private readonly long[] _largeBufferInUseSize;

        /// <summary>
        /// pools[0] = 1x largeBufferMultiple buffers
        /// pools[1] = 2x largeBufferMultiple buffers
        /// pools[2] = 3x(multiple)/4x(exponential) largeBufferMultiple buffers
        /// etc., up to maximumBufferSize
        /// </summary>
        private readonly ConcurrentStack<byte[]>[] _largePools;

        private readonly ConcurrentStack<byte[]> _smallPool;

        private long _smallPoolFreeSize;
        private long _smallPoolInUseSize;

        /// <summary>
        /// Initializes the memory manager with the default block/buffer specifications.
        /// </summary>
        public RecyclableMemoryStreamManager()
            : this(DefaultBlockSize, DefaultLargeBufferMultiple, DefaultMaximumBufferSize, false) { }

        /// <summary>
        /// Initializes the memory manager with the given block requiredSize.
        /// </summary>
        /// <param name="blockSize">Size of each block that is pooled. Must be greater than 0.</param>
        /// <param name="largeBufferMultiple">Each large buffer will be a multiple of this value.</param>
        /// <param name="maximumBufferSize">Buffers larger than this are not pooled</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// blockSize is not a positive number, or largeBufferMultiple is not a
        /// positive number, or maximumBufferSize is less than blockSize.
        /// </exception>
        /// <exception cref="ArgumentException">maximumBufferSize is not a multiple of largeBufferMultiple</exception>
        public RecyclableMemoryStreamManager(int blockSize, int largeBufferMultiple, int maximumBufferSize)
            : this(blockSize, largeBufferMultiple, maximumBufferSize, false) { }

        /// <summary>
        /// Initializes the memory manager with the given block requiredSize.
        /// </summary>
        /// <param name="blockSize">Size of each block that is pooled. Must be greater than 0.</param>
        /// <param name="largeBufferMultiple">Each large buffer will be a multiple/exponential of this value.</param>
        /// <param name="maximumBufferSize">Buffers larger than this are not pooled</param>
        /// <param name="useExponentialLargeBuffer">Switch to exponential large buffer allocation strategy</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// blockSize is not a positive number, or largeBufferMultiple is not a
        /// positive number, or maximumBufferSize is less than blockSize.
        /// </exception>
        /// <exception cref="ArgumentException">maximumBufferSize is not a multiple/exponential of largeBufferMultiple</exception>
        public RecyclableMemoryStreamManager(int blockSize, int largeBufferMultiple, int maximumBufferSize, bool useExponentialLargeBuffer)
        {
            if (blockSize <= 0)
                throw new ArgumentOutOfRangeException(nameof(blockSize), blockSize, "blockSize must be a positive number");

            if (largeBufferMultiple <= 0)
                throw new ArgumentOutOfRangeException(nameof(largeBufferMultiple), largeBufferMultiple,
                    "largeBufferMultiple must be a positive number");

            if (maximumBufferSize < blockSize)
                throw new ArgumentOutOfRangeException(nameof(maximumBufferSize), maximumBufferSize,
                    "maximumBufferSize must be at least blockSize");

            BlockSize = blockSize;
            LargeBufferMultiple = largeBufferMultiple;
            MaximumBufferSize = maximumBufferSize;
            UseExponentialLargeBuffer = useExponentialLargeBuffer;

            if (!IsLargeBufferSize(maximumBufferSize))
            {
                var relation = UseExponentialLargeBuffer ? "an exponential" : "a multiple";
                throw new ArgumentException($"maximumBufferSize is not {relation} of largeBufferMultiple", nameof(maximumBufferSize));
            }

            _smallPool = new ConcurrentStack<byte[]>();

            int numLargePools;
            if (useExponentialLargeBuffer)
            {
                // Count the doublings needed to get from largeBufferMultiple to maximumBufferSize.
                // Done with integers (rather than Math.Log) so the pool count never depends on
                // floating-point rounding; the validation above guarantees the loop lands exactly
                // on maximumBufferSize.
                numLargePools = 1;
                for (long size = largeBufferMultiple; size < maximumBufferSize; size <<= 1)
                    ++numLargePools;
            }
            else
                numLargePools = maximumBufferSize / largeBufferMultiple;

            // +1 to store size of bytes in use that are too large to be pooled
            _largeBufferInUseSize = new long[numLargePools + 1];
            _largeBufferFreeSize = new long[numLargePools];

            _largePools = new ConcurrentStack<byte[]>[numLargePools];
            for (var i = 0; i < _largePools.Length; ++i)
                _largePools[i] = new ConcurrentStack<byte[]>();

            EventsWriter.MemoryStreamManagerInitialized(blockSize, largeBufferMultiple, maximumBufferSize);
        }

        /// <summary>
        /// Whether dirty buffers can be immediately returned to the buffer pool. E.g. when GetBuffer() is called on
        /// a stream and creates a single large buffer, if this setting is enabled, the other blocks will be returned
        /// to the buffer pool immediately.
        /// Note when enabling this setting that the user is responsible for ensuring that any buffer previously
        /// retrieved from a stream which is subsequently modified is not used after modification (as it may no longer
        /// be valid).
        /// </summary>
        public bool AggressiveBufferReturn { get; set; }

        /// <summary>
        /// The size of each block. It must be set at creation and cannot be changed.
        /// </summary>
        public int BlockSize { get; }

        /// <summary>
        /// Whether to save callstacks for stream allocations. This can help in debugging.
        /// It should NEVER be turned on generally in production.
        /// </summary>
        public bool GenerateCallStacks { get; set; }

        /// <summary>
        /// All buffers are multiples/exponentials of this number. It must be set at creation and cannot be changed.
        /// </summary>
        public int LargeBufferMultiple { get; }

        /// <summary>
        /// How many buffers are in the large pool
        /// </summary>
        public long LargeBuffersFree
        {
            get
            {
                long free = 0;
                foreach (var pool in _largePools)
                    free += pool.Count;
                return free;
            }
        }

        /// <summary>
        /// Number of bytes in large pool not currently in use
        /// </summary>
        public long LargePoolFreeSize
        {
            get
            {
                long sum = 0;
                foreach (var freeSize in _largeBufferFreeSize)
                    sum += freeSize;
                return sum;
            }
        }

        /// <summary>
        /// Number of bytes currently in use by streams from the large pool
        /// </summary>
        public long LargePoolInUseSize
        {
            get
            {
                long sum = 0;
                foreach (var inUseSize in _largeBufferInUseSize)
                    sum += inUseSize;
                return sum;
            }
        }

        /// <summary>
        /// Gets the maximum buffer size.
        /// </summary>
        /// <remarks>
        /// Any buffer that is returned to the pool that is larger than this will be
        /// discarded and garbage collected.
        /// </remarks>
        public int MaximumBufferSize { get; }

        /// <summary>
        /// How many bytes of large free buffers to allow before we start dropping
        /// those returned to us.
        /// </summary>
        /// <remarks>A value of 0 disables the limit. The limit is checked against each size-specific pool individually.</remarks>
        public long MaximumFreeLargePoolBytes { get; set; }

        /// <summary>
        /// How many bytes of small free blocks to allow before we start dropping
        /// those returned to us.
        /// </summary>
        /// <remarks>A value of 0 disables the limit.</remarks>
        public long MaximumFreeSmallPoolBytes { get; set; }

        /// <summary>
        /// Maximum stream capacity in bytes. Attempts to set a larger capacity will
        /// result in an exception.
        /// </summary>
        /// <remarks>A value of 0 indicates no limit.</remarks>
        public long MaximumStreamCapacity { get; set; }

        /// <summary>
        /// How many blocks are in the small pool
        /// </summary>
        public long SmallBlocksFree => _smallPool.Count;

        /// <summary>
        /// Number of bytes in small pool not currently in use
        /// </summary>
        public long SmallPoolFreeSize => _smallPoolFreeSize;

        /// <summary>
        /// Number of bytes currently in use by stream from the small pool
        /// </summary>
        public long SmallPoolInUseSize => _smallPoolInUseSize;

        /// <summary>
        /// Use exponential large buffer allocation strategy. It must be set at creation and cannot be changed.
        /// </summary>
        public bool UseExponentialLargeBuffer { get; }

        /// <summary>
        /// Use multiple large buffer allocation strategy. It must be set at creation and cannot be changed.
        /// </summary>
        public bool UseMultipleLargeBuffer => !UseExponentialLargeBuffer;

        /// <summary>
        /// Removes and returns a single block from the pool.
        /// </summary>
        /// <returns>A byte[] array</returns>
        internal byte[] GetBlock()
        {
            if (!_smallPool.TryPop(out var block))
            {
                // We'll add this back to the pool when the stream is disposed
                // (unless our free pool is too large)
                block = new byte[BlockSize];
                EventsWriter.MemoryStreamNewBlockCreated(_smallPoolInUseSize);
                ReportBlockCreated();
            }
            else
                Interlocked.Add(ref _smallPoolFreeSize, -BlockSize);

            Interlocked.Add(ref _smallPoolInUseSize, BlockSize);
            return block;
        }

        /// <summary>
        /// Returns a buffer of arbitrary size from the large buffer pool. This buffer
        /// will be at least the requiredSize and always be a multiple/exponential of largeBufferMultiple.
        /// </summary>
        /// <param name="requiredSize">The minimum length of the buffer</param>
        /// <param name="tag">The tag of the stream returning this buffer, for logging if necessary.</param>
        /// <returns>A buffer of at least the required size.</returns>
        /// <exception cref="OverflowException">The rounded-up size does not fit in an <see cref="int"/>.</exception>
        internal byte[] GetLargeBuffer(int requiredSize, string tag)
        {
            requiredSize = RoundToLargeBufferSize(requiredSize);

            var poolIndex = GetPoolIndex(requiredSize);

            byte[] buffer;
            if (poolIndex < _largePools.Length)
            {
                if (!_largePools[poolIndex].TryPop(out buffer))
                {
                    buffer = new byte[requiredSize];

                    EventsWriter.MemoryStreamNewLargeBufferCreated(requiredSize, LargePoolInUseSize);
                    ReportLargeBufferCreated();
                }
                else
                    Interlocked.Add(ref _largeBufferFreeSize[poolIndex], -buffer.Length);
            }
            else
            {
                // Buffer is too large to pool. They get a new buffer.

                // We still want to track the size, though, and we've reserved a slot
                // in the end of the inuse array for nonpooled bytes in use.
                poolIndex = _largeBufferInUseSize.Length - 1;

                // We still want to round up to reduce heap fragmentation.
                buffer = new byte[requiredSize];
                string callStack = null;
                if (GenerateCallStacks)
                    // Grab the stack -- we want to know who requires such large buffers
                    callStack = Environment.StackTrace;
                EventsWriter.MemoryStreamNonPooledLargeBufferCreated(requiredSize, tag, callStack);
                ReportLargeBufferCreated();
            }

            Interlocked.Add(ref _largeBufferInUseSize[poolIndex], buffer.Length);

            return buffer;
        }

        /// <summary>
        /// Rounds <paramref name="requiredSize"/> up to the next valid large buffer size, computed in 64-bit
        /// arithmetic so that neither strategy can overflow (or loop forever) for sizes close to
        /// <see cref="int.MaxValue"/>.
        /// </summary>
        private long RoundToLargeBufferSizeUnchecked(int requiredSize)
        {
            if (UseExponentialLargeBuffer)
            {
                long size = LargeBufferMultiple;
                while (size < requiredSize)
                    size <<= 1;
                return size;
            }

            return ((long)requiredSize + LargeBufferMultiple - 1) / LargeBufferMultiple * LargeBufferMultiple;
        }

        /// <summary>
        /// Rounds <paramref name="requiredSize"/> up to the next multiple/exponential of
        /// <see cref="LargeBufferMultiple"/>.
        /// </summary>
        /// <exception cref="OverflowException">The rounded size does not fit in an <see cref="int"/>.</exception>
        private int RoundToLargeBufferSize(int requiredSize) => checked((int)RoundToLargeBufferSizeUnchecked(requiredSize));

        /// <summary>
        /// Whether <paramref name="value"/> is a non-zero size this manager could have handed out as a large buffer.
        /// </summary>
        private bool IsLargeBufferSize(int value) =>
            value != 0
            && (UseExponentialLargeBuffer
                ? value == RoundToLargeBufferSizeUnchecked(value)
                : value % LargeBufferMultiple == 0);

        /// <summary>
        /// Maps an already-rounded large buffer length to the index of the pool holding buffers of that size.
        /// The result may be past the end of the pool array when the length exceeds the largest pooled size.
        /// </summary>
        private int GetPoolIndex(int length)
        {
            if (UseExponentialLargeBuffer)
            {
                var index = 0;
                // Shift a long so the comparison cannot wrap around for lengths near int.MaxValue.
                while (((long)LargeBufferMultiple << index) < length)
                    ++index;
                return index;
            }

            return length / LargeBufferMultiple - 1;
        }

        /// <summary>
        /// Returns the buffer to the large pool
        /// </summary>
        /// <param name="buffer">The buffer to return.</param>
        /// <param name="tag">The tag of the stream returning this buffer, for logging if necessary.</param>
        /// <exception cref="ArgumentNullException">buffer is null</exception>
        /// <exception cref="ArgumentException">
        /// buffer.Length is not a multiple/exponential of LargeBufferMultiple (it did not
        /// originate from this pool)
        /// </exception>
        internal void ReturnLargeBuffer(byte[] buffer, string tag)
        {
            if (buffer == null)
                throw new ArgumentNullException(nameof(buffer));

            if (!IsLargeBufferSize(buffer.Length))
            {
                var relation = UseExponentialLargeBuffer ? "an exponential" : "a multiple";
                throw new ArgumentException(
                    $"buffer did not originate from this memory manager. The size is not {relation} of {LargeBufferMultiple}",
                    nameof(buffer));
            }

            var poolIndex = GetPoolIndex(buffer.Length);

            if (poolIndex < _largePools.Length)
            {
                // Computed as long: count * length can exceed int.MaxValue well before it exceeds the limit.
                var freeBytesAfterReturn = ((long)_largePools[poolIndex].Count + 1) * buffer.Length;
                if (freeBytesAfterReturn <= MaximumFreeLargePoolBytes || MaximumFreeLargePoolBytes == 0)
                {
                    _largePools[poolIndex].Push(buffer);
                    Interlocked.Add(ref _largeBufferFreeSize[poolIndex], buffer.Length);
                }
                else
                {
                    EventsWriter.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Large, tag,
                        Events.MemoryStreamDiscardReason.EnoughFree);
                    ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason.EnoughFree);
                }
            }
            else
            {
                // This is a non-poolable buffer, but we still want to track its size for inuse
                // analysis. We have space in the inuse array for this.
                poolIndex = _largeBufferInUseSize.Length - 1;

                EventsWriter.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Large, tag,
                    Events.MemoryStreamDiscardReason.TooLarge);
                ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason.TooLarge);
            }

            Interlocked.Add(ref _largeBufferInUseSize[poolIndex], -buffer.Length);

            ReportUsageReport(_smallPoolInUseSize, _smallPoolFreeSize, LargePoolInUseSize,
                LargePoolFreeSize);
        }

        /// <summary>
        /// Returns the blocks to the pool
        /// </summary>
        /// <param name="blocks">Collection of blocks to return to the pool</param>
        /// <param name="tag">The tag of the stream returning these blocks, for logging if necessary.</param>
        /// <exception cref="ArgumentNullException">blocks is null</exception>
        /// <exception cref="ArgumentException">blocks contains buffers that are the wrong size (or null) for this memory manager</exception>
        internal void ReturnBlocks(ICollection<byte[]> blocks, string tag)
        {
            if (blocks == null)
                throw new ArgumentNullException(nameof(blocks));

            // Validate before touching any counter so a rejected call leaves the statistics untouched.
            foreach (var block in blocks)
            {
                if (block == null || block.Length != BlockSize)
                    throw new ArgumentException("blocks contains buffers that are not BlockSize in length", nameof(blocks));
            }

            // long arithmetic: Count * BlockSize can exceed int.MaxValue for large streams.
            var bytesToReturn = (long)blocks.Count * BlockSize;
            Interlocked.Add(ref _smallPoolInUseSize, -bytesToReturn);

            foreach (var block in blocks)
            {
                if (MaximumFreeSmallPoolBytes == 0 || SmallPoolFreeSize < MaximumFreeSmallPoolBytes)
                {
                    Interlocked.Add(ref _smallPoolFreeSize, BlockSize);
                    _smallPool.Push(block);
                }
                else
                {
                    // The pool is full: the remaining blocks are dropped and a single discard is reported.
                    EventsWriter.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Small, tag,
                        Events.MemoryStreamDiscardReason.EnoughFree);
                    ReportBlockDiscarded();
                    break;
                }
            }

            ReportUsageReport(_smallPoolInUseSize, _smallPoolFreeSize, LargePoolInUseSize,
                LargePoolFreeSize);
        }

        /// <summary>Raises <see cref="BlockCreated"/> if it has subscribers.</summary>
        internal void ReportBlockCreated() => BlockCreated?.Invoke();

        /// <summary>Raises <see cref="BlockDiscarded"/> if it has subscribers.</summary>
        internal void ReportBlockDiscarded() => BlockDiscarded?.Invoke();

        /// <summary>Raises <see cref="LargeBufferCreated"/> if it has subscribers.</summary>
        internal void ReportLargeBufferCreated() => LargeBufferCreated?.Invoke();

        /// <summary>Raises <see cref="LargeBufferDiscarded"/> with the given reason if it has subscribers.</summary>
        internal void ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason reason) => LargeBufferDiscarded?.Invoke(reason);

        /// <summary>Raises <see cref="StreamCreated"/> if it has subscribers.</summary>
        internal void ReportStreamCreated() => StreamCreated?.Invoke();

        /// <summary>Raises <see cref="StreamDisposed"/> if it has subscribers.</summary>
        internal void ReportStreamDisposed() => StreamDisposed?.Invoke();

        /// <summary>Raises <see cref="StreamFinalized"/> if it has subscribers.</summary>
        internal void ReportStreamFinalized() => StreamFinalized?.Invoke();

        /// <summary>Raises <see cref="StreamLength"/> with the given byte count if it has subscribers.</summary>
        internal void ReportStreamLength(long bytes) => StreamLength?.Invoke(bytes);

        /// <summary>Raises <see cref="StreamConvertedToArray"/> if it has subscribers.</summary>
        internal void ReportStreamToArray() => StreamConvertedToArray?.Invoke();

        /// <summary>Raises <see cref="UsageReport"/> with the given pool statistics if it has subscribers.</summary>
        internal void ReportUsageReport(
            long smallPoolInUseBytes,
            long smallPoolFreeBytes,
            long largePoolInUseBytes,
            long largePoolFreeBytes
        ) =>
            UsageReport?.Invoke(smallPoolInUseBytes, smallPoolFreeBytes, largePoolInUseBytes, largePoolFreeBytes);

        /// <summary>
        /// Retrieve a new MemoryStream object with no tag and a default initial capacity.
        /// </summary>
        /// <returns>A MemoryStream.</returns>
        public MemoryStream GetStream() => new RecyclableMemoryStream(this);

        /// <summary>
        /// A <see cref="MemoryStream"/> over caller-supplied bytes that keeps a reference to the manager that created it.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the manager reference is stored but never used in the code visible here, so no
        /// StreamCreated/StreamDisposed reports are raised for these streams - confirm whether that is intentional.
        /// </remarks>
        private class ReportingMemoryStream : MemoryStream
        {
            private readonly RecyclableMemoryStreamManager _instance;

            public ReportingMemoryStream(byte[] bytes, RecyclableMemoryStreamManager instance) : base(bytes) => _instance = instance;
        }

        /// <summary>
        /// Shortcut to create a stream that directly wraps bytes.
        /// Note this does NOT use the pooled memory streams as the bytes have already been allocated
        /// </summary>
        /// <param name="bytes">The already-allocated bytes to wrap.</param>
        /// <returns>A MemoryStream.</returns>
        public MemoryStream GetStream(byte[] bytes) => new ReportingMemoryStream(bytes, this);

        /// <summary>
        /// Retrieve a new MemoryStream object with no tag and a default initial capacity.
        /// </summary>
        /// <param name="id">A unique identifier which can be used to trace usages of the stream.</param>
        /// <returns>A MemoryStream.</returns>
        public MemoryStream GetStream(Guid id) => new RecyclableMemoryStream(this, id);

        /// <summary>
        /// Retrieve a new MemoryStream object with the given tag and a default initial capacity.
        /// </summary>
        /// <param name="tag">A tag which can be used to track the source of the stream.</param>
        /// <returns>A MemoryStream.</returns>
        public MemoryStream GetStream(string tag) => new RecyclableMemoryStream(this, tag);

        /// <summary>
        /// Retrieve a new MemoryStream object with the given tag and a default initial capacity.
        /// </summary>
        /// <param name="id">A unique identifier which can be used to trace usages of the stream.</param>
        /// <param name="tag">A tag which can be used to track the source of the stream.</param>
        /// <returns>A MemoryStream.</returns>
        public MemoryStream GetStream(Guid id, string tag) => new RecyclableMemoryStream(this, id, tag);

        /// <summary>
        /// Retrieve a new MemoryStream object with the given tag and at least the given capacity.
        /// </summary>
        /// <param name="tag">A tag which can be used to track the source of the stream.</param>
        /// <param name="requiredSize">The minimum desired capacity for the stream.</param>
        /// <returns>A MemoryStream.</returns>
        public MemoryStream GetStream(string tag, int requiredSize) => new RecyclableMemoryStream(this, tag, requiredSize);

        /// <summary>
        /// Retrieve a new MemoryStream object with the given tag and at least the given capacity.
        /// </summary>
        /// <param name="id">A unique identifier which can be used to trace usages of the stream.</param>
        /// <param name="tag">A tag which can be used to track the source of the stream.</param>
        /// <param name="requiredSize">The minimum desired capacity for the stream.</param>
        /// <returns>A MemoryStream.</returns>
        public MemoryStream GetStream(Guid id, string tag, int requiredSize) => new RecyclableMemoryStream(this, id, tag, requiredSize);

        /// <summary>
        /// Retrieve a new MemoryStream object with the given tag and at least the given capacity, possibly using
        /// a single contiguous underlying buffer.
        /// </summary>
        /// <remarks>
        /// Retrieving a MemoryStream which provides a single contiguous buffer can be useful in situations
        /// where the initial size is known and it is desirable to avoid copying data between the smaller underlying
        /// buffers to a single large one. This is most helpful when you know that you will always call GetBuffer
        /// on the underlying stream.
        /// </remarks>
        /// <param name="id">A unique identifier which can be used to trace usages of the stream.</param>
        /// <param name="tag">A tag which can be used to track the source of the stream.</param>
        /// <param name="requiredSize">The minimum desired capacity for the stream.</param>
        /// <param name="asContiguousBuffer">Whether to attempt to use a single contiguous buffer.</param>
        /// <returns>A MemoryStream.</returns>
        public MemoryStream GetStream(Guid id, string tag, int requiredSize, bool asContiguousBuffer)
        {
            // A request that fits in one block is already contiguous, so the large pool is not needed.
            if (!asContiguousBuffer || requiredSize <= BlockSize)
                return GetStream(id, tag, requiredSize);

            return new RecyclableMemoryStream(this, id, tag, requiredSize, GetLargeBuffer(requiredSize, tag));
        }

        /// <summary>
        /// Retrieve a new MemoryStream object with the given tag and at least the given capacity, possibly using
        /// a single contiguous underlying buffer.
        /// </summary>
        /// <remarks>
        /// Retrieving a MemoryStream which provides a single contiguous buffer can be useful in situations
        /// where the initial size is known and it is desirable to avoid copying data between the smaller underlying
        /// buffers to a single large one. This is most helpful when you know that you will always call GetBuffer
        /// on the underlying stream.
        /// </remarks>
        /// <param name="tag">A tag which can be used to track the source of the stream.</param>
        /// <param name="requiredSize">The minimum desired capacity for the stream.</param>
        /// <param name="asContiguousBuffer">Whether to attempt to use a single contiguous buffer.</param>
        /// <returns>A MemoryStream.</returns>
        public MemoryStream GetStream(string tag, int requiredSize, bool asContiguousBuffer) =>
            GetStream(Guid.NewGuid(), tag, requiredSize, asContiguousBuffer);

        /// <summary>
        /// Retrieve a new MemoryStream object with the given tag and with contents copied from the provided
        /// buffer. The provided buffer is not wrapped or used after construction.
        /// </summary>
        /// <remarks>The new stream's position is set to the beginning of the stream when returned.</remarks>
        /// <param name="id">A unique identifier which can be used to trace usages of the stream.</param>
        /// <param name="tag">A tag which can be used to track the source of the stream.</param>
        /// <param name="buffer">The byte buffer to copy data from.</param>
        /// <param name="offset">The offset from the start of the buffer to copy from.</param>
        /// <param name="count">The number of bytes to copy from the buffer.</param>
        /// <returns>A MemoryStream.</returns>
        public MemoryStream GetStream(Guid id, string tag, byte[] buffer, int offset, int count)
        {
            RecyclableMemoryStream stream = null;
            try
            {
                stream = new RecyclableMemoryStream(this, id, tag, count);
                stream.Write(buffer, offset, count);
                stream.Position = 0;
                return stream;
            }
            catch
            {
                // Dispose the half-initialised stream before propagating the failure.
                stream?.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Retrieve a new MemoryStream object with the given tag and with contents copied from the provided
        /// buffer. The provided buffer is not wrapped or used after construction.
        /// </summary>
        /// <remarks>The new stream's position is set to the beginning of the stream when returned.</remarks>
        /// <param name="tag">A tag which can be used to track the source of the stream.</param>
        /// <param name="buffer">The byte buffer to copy data from.</param>
        /// <param name="offset">The offset from the start of the buffer to copy from.</param>
        /// <param name="count">The number of bytes to copy from the buffer.</param>
        /// <returns>A MemoryStream.</returns>
        public MemoryStream GetStream(string tag, byte[] buffer, int offset, int count) =>
            GetStream(Guid.NewGuid(), tag, buffer, offset, count);

        /// <summary>
        /// Triggered when a new block is created.
        /// </summary>
        public event EventHandler BlockCreated;

        /// <summary>
        /// Triggered when a block is discarded.
        /// </summary>
        public event EventHandler BlockDiscarded;

        /// <summary>
        /// Triggered when a new large buffer is created.
        /// </summary>
        public event EventHandler LargeBufferCreated;

        /// <summary>
        /// Triggered when a new stream is created.
        /// </summary>
        public event EventHandler StreamCreated;

        /// <summary>
        /// Triggered when a stream is disposed.
        /// </summary>
        public event EventHandler StreamDisposed;

        /// <summary>
        /// Triggered when a stream is finalized.
        /// </summary>
        public event EventHandler StreamFinalized;

        /// <summary>
        /// Triggered to report the length of a stream, in bytes.
        /// </summary>
        public event StreamLengthReportHandler StreamLength;

        /// <summary>
        /// Triggered when a user converts a stream to array.
        /// </summary>
        public event EventHandler StreamConvertedToArray;

        /// <summary>
        /// Triggered when a large buffer is discarded, along with the reason for the discard.
        /// </summary>
        public event LargeBufferDiscardedEventHandler LargeBufferDiscarded;

        /// <summary>
        /// Triggered to report usage statistics whenever blocks or a large buffer are returned to the manager.
        /// </summary>
        public event UsageReportEventHandler UsageReport;
    }
}