/* SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ /* * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. * * Licensed to Elasticsearch B.V. under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch B.V. licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ // The MIT License (MIT) // // Copyright (c) 2015-2016 Microsoft // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. // // https://github.com/Microsoft/Microsoft.IO.RecyclableMemoryStream/blob/master/src/RecyclableMemoryStream.cs // MIT license: https://github.com/Microsoft/Microsoft.IO.RecyclableMemoryStream/blob/master/LICENSE using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.IO; using System.Threading; namespace OpenSearch.Net { /// /// MemoryStream implementation that deals with pooling and managing memory streams which use potentially large /// buffers. /// /// /// This class works in tandem with the RecyclableMemoryStreamManager to supply MemoryStream /// objects to callers, while avoiding these specific problems: /// 1. LOH allocations - since all large buffers are pooled, they will never incur a Gen2 GC /// 2. Memory waste - A standard memory stream doubles its size when it runs out of room. This /// leads to continual memory growth as each stream approaches the maximum allowed size. /// 3. Memory copying - Each time a MemoryStream grows, all the bytes are copied into new buffers. /// This implementation only copies the bytes when GetBuffer is called. /// 4. Memory fragmentation - By using homogeneous buffer sizes, it ensures that blocks of memory /// can be easily reused. /// The stream is implemented on top of a series of uniformly-sized blocks. As the stream's length grows, /// additional blocks are retrieved from the memory manager. It is these blocks that are pooled, not the stream /// object itself. /// The biggest wrinkle in this implementation is when GetBuffer() is called. This requires a single /// contiguous buffer. If only a single block is in use, then that block is returned. If multiple blocks /// are in use, we retrieve a larger buffer from the memory manager. These large buffers are also pooled, /// split by size--they are multiples/exponentials of a chunk size (1 MB by default). /// Once a large buffer is assigned to the stream the blocks are NEVER again used for this stream. All operations take /// place on the /// large buffer. The large buffer can be replaced by a larger buffer from the pool as needed. All blocks and large buffers /// are maintained in the stream until the stream is disposed (unless AggressiveBufferReturn is enabled in the stream /// manager). /// internal sealed class RecyclableMemoryStream : MemoryStream { private const long MaxStreamLength = int.MaxValue; private static readonly byte[] EmptyArray = new byte[0]; /// /// All of these blocks must be the same size /// private readonly List _blocks = new List(1); /// /// This buffer exists so that WriteByte can forward all of its calls to Write /// without creating a new byte[] buffer on every call. /// private readonly byte[] _byteBuffer = new byte[1]; private readonly Guid _id; private readonly RecyclableMemoryStreamManager _memoryManager; private readonly string _tag; /// /// This list is used to store buffers once they're replaced by something larger. /// This is for the cases where you have users of this class that may hold onto the buffers longer /// than they should and you want to prevent race conditions which could corrupt the data. /// private List _dirtyBuffers; // long to allow Interlocked.Read (for .NET Standard 1.4 compat) private long _disposedState; /// /// This is only set by GetBuffer() if the necessary buffer is larger than a single block size, or on /// construction if the caller immediately requests a single large buffer. /// /// /// If this field is non-null, it contains the concatenation of the bytes found in the individual /// blocks. Once it is created, this (or a larger) largeBuffer will be used for the life of the stream. /// private byte[] _largeBuffer; /// /// Unique identifier for this stream across it's entire lifetime /// /// Object has been disposed internal Guid Id { get { CheckDisposed(); return _id; } } /// /// A temporary identifier for the current usage of this stream. /// /// Object has been disposed internal string Tag { get { CheckDisposed(); return _tag; } } /// /// Gets the memory manager being used by this stream. /// /// Object has been disposed internal RecyclableMemoryStreamManager MemoryManager { get { CheckDisposed(); return _memoryManager; } } /// /// Callstack of the constructor. It is only set if MemoryManager.GenerateCallStacks is true, /// which should only be in debugging situations. /// internal string AllocationStack { get; } /// /// Callstack of the Dispose call. It is only set if MemoryManager.GenerateCallStacks is true, /// which should only be in debugging situations. /// internal string DisposeStack { get; private set; } #region Constructors /// /// Allocate a new RecyclableMemoryStream object. /// /// The memory manager public RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager) : this(memoryManager, Guid.NewGuid(), null, 0, null) { } /// /// Allocate a new RecyclableMemoryStream object. /// /// The memory manager /// A unique identifier which can be used to trace usages of the stream. public RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, Guid id) : this(memoryManager, id, null, 0, null) { } /// /// Allocate a new RecyclableMemoryStream object /// /// The memory manager /// A string identifying this stream for logging and debugging purposes public RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, string tag) : this(memoryManager, Guid.NewGuid(), tag, 0, null) { } /// /// Allocate a new RecyclableMemoryStream object /// /// The memory manager /// A unique identifier which can be used to trace usages of the stream. /// A string identifying this stream for logging and debugging purposes public RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, Guid id, string tag) : this(memoryManager, id, tag, 0, null) { } /// /// Allocate a new RecyclableMemoryStream object /// /// The memory manager /// A string identifying this stream for logging and debugging purposes /// The initial requested size to prevent future allocations public RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, string tag, int requestedSize) : this(memoryManager, Guid.NewGuid(), tag, requestedSize, null) { } /// /// Allocate a new RecyclableMemoryStream object /// /// The memory manager /// A unique identifier which can be used to trace usages of the stream. /// A string identifying this stream for logging and debugging purposes /// The initial requested size to prevent future allocations public RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, Guid id, string tag, int requestedSize) : this(memoryManager, id, tag, requestedSize, null) { } /// /// Allocate a new RecyclableMemoryStream object /// /// The memory manager /// A unique identifier which can be used to trace usages of the stream. /// A string identifying this stream for logging and debugging purposes /// The initial requested size to prevent future allocations /// /// An initial buffer to use. This buffer will be owned by the stream and returned to the /// memory manager upon Dispose. /// internal RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, Guid id, string tag, int requestedSize, byte[] initialLargeBuffer ) : base(EmptyArray) { _memoryManager = memoryManager; _id = id; _tag = tag; if (requestedSize < memoryManager.BlockSize) requestedSize = memoryManager.BlockSize; if (initialLargeBuffer == null) EnsureCapacity(requestedSize); else _largeBuffer = initialLargeBuffer; if (_memoryManager.GenerateCallStacks) AllocationStack = Environment.StackTrace; RecyclableMemoryStreamManager.EventsWriter.MemoryStreamCreated(_id, _tag, requestedSize); _memoryManager.ReportStreamCreated(); } #endregion #region Dispose and Finalize ~RecyclableMemoryStream() => Dispose(false); /// /// Returns the memory used by this stream back to the pool. /// /// Whether we're disposing (true), or being called by the finalizer (false) [SuppressMessage("Microsoft.Usage", "CA1816:CallGCSuppressFinalizeCorrectly", Justification = "We have different disposal semantics, so SuppressFinalize is in a different spot.")] protected override void Dispose(bool disposing) { if (Interlocked.CompareExchange(ref _disposedState, 1, 0) != 0) { string doubleDisposeStack = null; if (_memoryManager.GenerateCallStacks) doubleDisposeStack = Environment.StackTrace; RecyclableMemoryStreamManager.EventsWriter.MemoryStreamDoubleDispose(_id, _tag, AllocationStack, DisposeStack, doubleDisposeStack); return; } RecyclableMemoryStreamManager.EventsWriter.MemoryStreamDisposed(_id, _tag); if (_memoryManager.GenerateCallStacks) DisposeStack = Environment.StackTrace; if (disposing) { _memoryManager.ReportStreamDisposed(); GC.SuppressFinalize(this); } else { // We're being finalized. RecyclableMemoryStreamManager.EventsWriter.MemoryStreamFinalized(_id, _tag, AllocationStack); #if !NETSTANDARD1_4 if (AppDomain.CurrentDomain.IsFinalizingForUnload()) { // If we're being finalized because of a shutdown, don't go any further. // We have no idea what's already been cleaned up. Triggering events may cause // a crash. base.Dispose(false); return; } #endif _memoryManager.ReportStreamFinalized(); } _memoryManager.ReportStreamLength(_length); if (_largeBuffer != null) _memoryManager.ReturnLargeBuffer(_largeBuffer, _tag); if (_dirtyBuffers != null) foreach (var buffer in _dirtyBuffers) _memoryManager.ReturnLargeBuffer(buffer, _tag); _memoryManager.ReturnBlocks(_blocks, _tag); _blocks.Clear(); base.Dispose(disposing); } /// /// Equivalent to Dispose /// #if NETSTANDARD1_4 public void Close() #else public override void Close() #endif { Dispose(true); } #endregion #region MemoryStream overrides /// /// Gets or sets the capacity /// /// /// Capacity is always in multiples of the memory manager's block size, unless /// the large buffer is in use. Capacity never decreases during a stream's lifetime. /// Explicitly setting the capacity to a lower value than the current value will have no effect. /// This is because the buffers are all pooled by chunks and there's little reason to /// allow stream truncation. /// /// Object has been disposed public override int Capacity { get { CheckDisposed(); if (_largeBuffer != null) return _largeBuffer.Length; var size = (long)_blocks.Count * _memoryManager.BlockSize; return (int)Math.Min(int.MaxValue, size); } set { CheckDisposed(); EnsureCapacity(value); } } private int _length; /// /// Gets the number of bytes written to this stream. /// /// Object has been disposed public override long Length { get { CheckDisposed(); return _length; } } private int _position; /// /// Gets the current position in the stream /// /// Object has been disposed public override long Position { get { CheckDisposed(); return _position; } set { CheckDisposed(); if (value < 0) throw new ArgumentOutOfRangeException("value", "value must be non-negative"); if (value > MaxStreamLength) throw new ArgumentOutOfRangeException("value", "value cannot be more than " + MaxStreamLength); _position = (int)value; } } /// /// Whether the stream can currently read /// public override bool CanRead => !Disposed; /// /// Whether the stream can currently seek /// public override bool CanSeek => !Disposed; /// /// Always false /// public override bool CanTimeout => false; /// /// Whether the stream can currently write /// public override bool CanWrite => !Disposed; /// /// Returns a single buffer containing the contents of the stream. /// The buffer may be longer than the stream length. /// /// A byte[] buffer /// /// IMPORTANT: Doing a Write() after calling GetBuffer() invalidates the buffer. The old buffer is held onto /// until Dispose is called, but the next time GetBuffer() is called, a new buffer from the pool will be required. /// /// Object has been disposed #if NETSTANDARD1_4 public byte[] GetBuffer() #else public override byte[] GetBuffer() #endif { CheckDisposed(); if (_largeBuffer != null) return _largeBuffer; if (_blocks.Count == 1) return _blocks[0]; // Buffer needs to reflect the capacity, not the length, because // it's possible that people will manipulate the buffer directly // and set the length afterward. Capacity sets the expectation // for the size of the buffer. var newBuffer = _memoryManager.GetLargeBuffer(Capacity, _tag); // InternalRead will check for existence of largeBuffer, so make sure we // don't set it until after we've copied the data. InternalRead(newBuffer, 0, _length, 0); _largeBuffer = newBuffer; if (_blocks.Count > 0 && _memoryManager.AggressiveBufferReturn) { _memoryManager.ReturnBlocks(_blocks, _tag); _blocks.Clear(); } return _largeBuffer; } /// /// Returns an ArraySegment that wraps a single buffer containing the contents of the stream. /// /// An ArraySegment containing a reference to the underlying bytes. /// Always returns true. /// /// GetBuffer has no failure modes (it always returns something, even if it's an empty buffer), therefore this method /// always returns a valid ArraySegment to the same buffer returned by GetBuffer. /// #if NET40 || NET45 public bool TryGetBuffer(out ArraySegment buffer) #else public override bool TryGetBuffer(out ArraySegment buffer) #endif { CheckDisposed(); buffer = new ArraySegment(GetBuffer(), 0, (int)Length); // GetBuffer has no failure modes, so this should always succeed return true; } /// /// Returns a new array with a copy of the buffer's contents. You should almost certainly be using GetBuffer combined with /// the Length to /// access the bytes in this stream. Calling ToArray will destroy the benefits of pooled buffers, but it is included /// for the sake of completeness. /// /// Object has been disposed #pragma warning disable CS0809 [Obsolete("This method has degraded performance vs. GetBuffer and should be avoided.")] public override byte[] ToArray() { CheckDisposed(); var newBuffer = new byte[Length]; InternalRead(newBuffer, 0, _length, 0); var stack = _memoryManager.GenerateCallStacks ? Environment.StackTrace : null; RecyclableMemoryStreamManager.EventsWriter.MemoryStreamToArray(_id, _tag, stack, 0); _memoryManager.ReportStreamToArray(); return newBuffer; } #pragma warning restore CS0809 /// /// Reads from the current position into the provided buffer /// /// Destination buffer /// Offset into buffer at which to start placing the read bytes. /// Number of bytes to read. /// The number of bytes read /// buffer is null /// offset or count is less than 0 /// offset subtracted from the buffer length is less than count /// Object has been disposed public override int Read(byte[] buffer, int offset, int count) => SafeRead(buffer, offset, count, ref _position); /// /// Reads from the specified position into the provided buffer /// /// Destination buffer /// Offset into buffer at which to start placing the read bytes. /// Number of bytes to read. /// Position in the stream to start reading from /// The number of bytes read /// buffer is null /// offset or count is less than 0 /// offset subtracted from the buffer length is less than count /// Object has been disposed public int SafeRead(byte[] buffer, int offset, int count, ref int streamPosition) { CheckDisposed(); if (buffer == null) throw new ArgumentNullException(nameof(buffer)); if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset), "offset cannot be negative"); if (count < 0) throw new ArgumentOutOfRangeException(nameof(count), "count cannot be negative"); if (offset + count > buffer.Length) throw new ArgumentException("buffer length must be at least offset + count"); var amountRead = InternalRead(buffer, offset, count, streamPosition); streamPosition += amountRead; return amountRead; } #if NETCOREAPP2_1 || NETSTANDARD2_1 /// /// Reads from the current position into the provided buffer /// /// Destination buffer /// The number of bytes read /// Object has been disposed public override int Read(Span buffer) => SafeRead(buffer, ref _position); /// /// Reads from the specified position into the provided buffer /// /// Destination buffer /// Position in the stream to start reading from /// The number of bytes read /// Object has been disposed public int SafeRead(Span buffer, ref int streamPosition) { CheckDisposed(); var amountRead = InternalRead(buffer, streamPosition); streamPosition += amountRead; return amountRead; } #endif /// /// Writes the buffer to the stream /// /// Source buffer /// Start position /// Number of bytes to write /// buffer is null /// offset or count is negative /// buffer.Length - offset is not less than count /// Object has been disposed public override void Write(byte[] buffer, int offset, int count) { CheckDisposed(); if (buffer == null) throw new ArgumentNullException(nameof(buffer)); if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset), offset, "Offset must be in the range of 0 - buffer.Length-1"); if (count < 0) throw new ArgumentOutOfRangeException(nameof(count), count, "count must be non-negative"); if (count + offset > buffer.Length) throw new ArgumentException("count must be greater than buffer.Length - offset"); var blockSize = _memoryManager.BlockSize; var end = (long)_position + count; // Check for overflow if (end > MaxStreamLength) throw new IOException("Maximum capacity exceeded"); EnsureCapacity((int)end); if (_largeBuffer == null) { var bytesRemaining = count; var bytesWritten = 0; var blockAndOffset = GetBlockAndRelativeOffset(_position); while (bytesRemaining > 0) { var currentBlock = _blocks[blockAndOffset.Block]; var remainingInBlock = blockSize - blockAndOffset.Offset; var amountToWriteInBlock = Math.Min(remainingInBlock, bytesRemaining); Buffer.BlockCopy(buffer, offset + bytesWritten, currentBlock, blockAndOffset.Offset, amountToWriteInBlock); bytesRemaining -= amountToWriteInBlock; bytesWritten += amountToWriteInBlock; ++blockAndOffset.Block; blockAndOffset.Offset = 0; } } else Buffer.BlockCopy(buffer, offset, _largeBuffer, _position, count); _position = (int)end; _length = Math.Max(_position, _length); } #if NETCOREAPP2_1 || NETSTANDARD2_1 /// /// Writes the buffer to the stream /// /// Source buffer /// buffer is null /// Object has been disposed public override void Write(ReadOnlySpan source) { CheckDisposed(); var blockSize = _memoryManager.BlockSize; var end = (long)_position + source.Length; // Check for overflow if (end > MaxStreamLength) throw new IOException("Maximum capacity exceeded"); EnsureCapacity((int)end); if (_largeBuffer == null) { var blockAndOffset = GetBlockAndRelativeOffset(_position); while (source.Length > 0) { var currentBlock = _blocks[blockAndOffset.Block]; var remainingInBlock = blockSize - blockAndOffset.Offset; var amountToWriteInBlock = Math.Min(remainingInBlock, source.Length); source.Slice(0, amountToWriteInBlock) .CopyTo(currentBlock.AsSpan(blockAndOffset.Offset)); source = source.Slice(amountToWriteInBlock); ++blockAndOffset.Block; blockAndOffset.Offset = 0; } } else source.CopyTo(_largeBuffer.AsSpan(_position)); _position = (int)end; _length = Math.Max(_position, _length); } #endif /// /// Returns a useful string for debugging. This should not normally be called in actual production code. /// public override string ToString() => $"Id = {Id}, Tag = {Tag}, Length = {Length:N0} bytes"; /// /// Writes a single byte to the current position in the stream. /// /// byte value to write /// Object has been disposed public override void WriteByte(byte value) { CheckDisposed(); _byteBuffer[0] = value; Write(_byteBuffer, 0, 1); } /// /// Reads a single byte from the current position in the stream. /// /// The byte at the current position, or -1 if the position is at the end of the stream. /// Object has been disposed public override int ReadByte() => SafeReadByte(ref _position); /// /// Reads a single byte from the specified position in the stream. /// /// The position in the stream to read from /// The byte at the current position, or -1 if the position is at the end of the stream. /// Object has been disposed public int SafeReadByte(ref int streamPosition) { CheckDisposed(); if (streamPosition == _length) return -1; byte value; if (_largeBuffer == null) { var blockAndOffset = GetBlockAndRelativeOffset(streamPosition); value = _blocks[blockAndOffset.Block][blockAndOffset.Offset]; } else value = _largeBuffer[streamPosition]; streamPosition++; return value; } /// /// Sets the length of the stream /// /// value is negative or larger than MaxStreamLength /// Object has been disposed public override void SetLength(long value) { CheckDisposed(); if (value < 0 || value > MaxStreamLength) throw new ArgumentOutOfRangeException(nameof(value), "value must be non-negative and at most " + MaxStreamLength); EnsureCapacity((int)value); _length = (int)value; if (_position > value) _position = (int)value; } /// /// Sets the position to the offset from the seek location /// /// How many bytes to move /// From where /// The new position /// Object has been disposed /// offset is larger than MaxStreamLength /// Invalid seek origin /// Attempt to set negative position public override long Seek(long offset, SeekOrigin loc) { CheckDisposed(); if (offset > MaxStreamLength) throw new ArgumentOutOfRangeException(nameof(offset), "offset cannot be larger than " + MaxStreamLength); int newPosition; switch (loc) { case SeekOrigin.Begin: newPosition = (int)offset; break; case SeekOrigin.Current: newPosition = (int)offset + _position; break; case SeekOrigin.End: newPosition = (int)offset + _length; break; default: throw new ArgumentException("Invalid seek origin", nameof(loc)); } if (newPosition < 0) throw new IOException("Seek before beginning"); _position = newPosition; return _position; } /// /// Synchronously writes this stream's bytes to the parameter stream. /// /// Destination stream /// Important: This does a synchronous write, which may not be desired in some situations public override void WriteTo(Stream stream) { CheckDisposed(); if (stream == null) throw new ArgumentNullException(nameof(stream)); if (_largeBuffer == null) { var currentBlock = 0; var bytesRemaining = _length; while (bytesRemaining > 0) { var amountToCopy = Math.Min(_blocks[currentBlock].Length, bytesRemaining); stream.Write(_blocks[currentBlock], 0, amountToCopy); bytesRemaining -= amountToCopy; ++currentBlock; } } else stream.Write(_largeBuffer, 0, _length); } #endregion #region Helper Methods private bool Disposed => Interlocked.Read(ref _disposedState) != 0; private void CheckDisposed() { if (Disposed) throw new ObjectDisposedException($"The stream with Id {_id} and Tag {_tag} is disposed."); } private int InternalRead(byte[] buffer, int offset, int count, int fromPosition) { if (_length - fromPosition <= 0) return 0; int amountToCopy; if (_largeBuffer == null) { var blockAndOffset = GetBlockAndRelativeOffset(fromPosition); var bytesWritten = 0; var bytesRemaining = Math.Min(count, _length - fromPosition); while (bytesRemaining > 0) { amountToCopy = Math.Min(_blocks[blockAndOffset.Block].Length - blockAndOffset.Offset, bytesRemaining); Buffer.BlockCopy(_blocks[blockAndOffset.Block], blockAndOffset.Offset, buffer, bytesWritten + offset, amountToCopy); bytesWritten += amountToCopy; bytesRemaining -= amountToCopy; ++blockAndOffset.Block; blockAndOffset.Offset = 0; } return bytesWritten; } amountToCopy = Math.Min(count, _length - fromPosition); Buffer.BlockCopy(_largeBuffer, fromPosition, buffer, offset, amountToCopy); return amountToCopy; } #if NETCOREAPP2_1 || NETSTANDARD2_1 private int InternalRead(Span buffer, int fromPosition) { if (_length - fromPosition <= 0) return 0; int amountToCopy; if (_largeBuffer == null) { var blockAndOffset = GetBlockAndRelativeOffset(fromPosition); var bytesWritten = 0; var bytesRemaining = Math.Min(buffer.Length, _length - fromPosition); while (bytesRemaining > 0) { amountToCopy = Math.Min(_blocks[blockAndOffset.Block].Length - blockAndOffset.Offset, bytesRemaining); _blocks[blockAndOffset.Block].AsSpan(blockAndOffset.Offset, amountToCopy) .CopyTo(buffer.Slice(bytesWritten)); bytesWritten += amountToCopy; bytesRemaining -= amountToCopy; ++blockAndOffset.Block; blockAndOffset.Offset = 0; } return bytesWritten; } amountToCopy = Math.Min(buffer.Length, _length - fromPosition); _largeBuffer.AsSpan(fromPosition, amountToCopy).CopyTo(buffer); return amountToCopy; } #endif private struct BlockAndOffset { public int Block; public int Offset; public BlockAndOffset(int block, int offset) { Block = block; Offset = offset; } } private BlockAndOffset GetBlockAndRelativeOffset(int offset) { var blockSize = _memoryManager.BlockSize; return new BlockAndOffset(offset / blockSize, offset % blockSize); } private void EnsureCapacity(int newCapacity) { if (newCapacity > _memoryManager.MaximumStreamCapacity && _memoryManager.MaximumStreamCapacity > 0) { RecyclableMemoryStreamManager.EventsWriter.MemoryStreamOverCapacity(newCapacity, _memoryManager .MaximumStreamCapacity, _tag, AllocationStack); throw new InvalidOperationException("Requested capacity is too large: " + newCapacity + ". Limit is " + _memoryManager.MaximumStreamCapacity); } if (_largeBuffer != null) { if (newCapacity > _largeBuffer.Length) { var newBuffer = _memoryManager.GetLargeBuffer(newCapacity, _tag); InternalRead(newBuffer, 0, _length, 0); ReleaseLargeBuffer(); _largeBuffer = newBuffer; } } else while (Capacity < newCapacity) _blocks.Add(_memoryManager.GetBlock()); } /// /// Release the large buffer (either stores it for eventual release or returns it immediately). /// private void ReleaseLargeBuffer() { if (_memoryManager.AggressiveBufferReturn) _memoryManager.ReturnLargeBuffer(_largeBuffer, _tag); else { if (_dirtyBuffers == null) // We most likely will only ever need space for one _dirtyBuffers = new List(1); _dirtyBuffers.Add(_largeBuffer); } _largeBuffer = null; } #endregion } }