From fff4ff0913a2396cfae61c701f2b59ffe5e27b49 Mon Sep 17 00:00:00 2001 From: Radek Zikmund <32671551+rzikm@users.noreply.github.com> Date: Tue, 24 May 2022 15:50:23 +0200 Subject: [PATCH] Merge several version of MsQuicStream SendAsync code (#68772) * Merge several version of MsQuicStream SendAsync code * Use shared MsQuicBuffers struct * Minor changes * Fixes after rebase * Code review feedback * Use static lambdas instead of adapter structs * Minor change * fixup! Minor change --- .../MsQuic/Internal/MsQuicBuffers.cs | 84 +++++- .../Implementations/MsQuic/MsQuicStream.cs | 264 ++++-------------- 2 files changed, 124 insertions(+), 224 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Internal/MsQuicBuffers.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Internal/MsQuicBuffers.cs index e98e9777058fc..cc0c9c177739c 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Internal/MsQuicBuffers.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Internal/MsQuicBuffers.cs @@ -39,8 +39,29 @@ private void FreeNativeMemory() NativeMemory.Free(buffers); } + private void Reserve(int count) + { + if (_handles.Length < count) + { + _handles = new MemoryHandle[count]; + FreeNativeMemory(); + _buffers = (QUIC_BUFFER*)NativeMemory.Alloc((nuint)count, (nuint)sizeof(QUIC_BUFFER)); + } + + _count = count; + } + + private void SetBuffer(int index, ReadOnlyMemory buffer) + { + MemoryHandle handle = buffer.Pin(); + + _handles[index] = handle; + _buffers[index].Buffer = (byte*)handle.Pointer; + _buffers[index].Length = (uint)buffer.Length; + } + /// - /// The method initializes QUIC_BUFFER* with data from inputs, converted via toBuffer. + /// Initializes QUIC_BUFFER* with data from inputs, converted via toBuffer. /// Note that the struct either needs to be freshly created via new or previously cleaned up with Reset. /// /// Inputs to get their byte array, pin them and pepare them to be passed to MsQuic as QUIC_BUFFER*. @@ -48,26 +69,61 @@ private void FreeNativeMemory() /// The type of the inputs. public void Initialize(IList inputs, Func> toBuffer) { - if (_handles.Length < inputs.Count) + Reserve(inputs.Count); + + for (int i = 0; i < inputs.Count; ++i) + { + ReadOnlyMemory buffer = toBuffer(inputs[i]); + SetBuffer(i, buffer); + } + } + + /// + /// Initializes QUIC_BUFFER* with the provided buffer. + /// Note that the struct either needs to be freshly created via new or previously cleaned up with Reset. + /// + /// Buffer to be passed to MsQuic as QUIC_BUFFER*. + public void Initialize(ReadOnlyMemory buffer) + { + Reserve(1); + SetBuffer(0, buffer); + } + + /// + /// Initializes QUIC_BUFFER* with the provided buffers. + /// Note that the struct either needs to be freshly created via new or previously cleaned up with Reset. + /// + /// Buffers to be passed to MsQuic as QUIC_BUFFER*. + public void Initialize(ReadOnlySequence buffers) + { + int count = 0; + foreach (ReadOnlyMemory _ in buffers) { - _handles = new MemoryHandle[inputs.Count]; + ++count; } - if (_count < inputs.Count) + + Reserve(count); + int i = 0; + foreach (ReadOnlyMemory buffer in buffers) { - FreeNativeMemory(); - _buffers = (QUIC_BUFFER*)NativeMemory.Alloc((nuint)sizeof(QUIC_BUFFER), (nuint)inputs.Count); + SetBuffer(i++, buffer); } + } - _count = inputs.Count; + /// + /// Initializes QUIC_BUFFER* with the provided buffers. + /// Note that the struct either needs to be freshly created via new or previously cleaned up with Reset. + /// + /// Buffers to be passed to MsQuic as QUIC_BUFFER*. + public void Initialize(ReadOnlyMemory> buffers) + { + int count = buffers.Length; + Reserve(count); - for (int i = 0; i < inputs.Count; ++i) + ReadOnlySpan> span = buffers.Span; + for (int i = 0; i < span.Length; i++) { - ReadOnlyMemory buffer = toBuffer(inputs[i]); - MemoryHandle handle = buffer.Pin(); - - _handles[i] = handle; - _buffers[i].Buffer = (byte*)handle.Pointer; - _buffers[i].Length = (uint)buffer.Length; + SetBuffer(i, span[i]); } } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index 622df81e6aa98..d7367cb4b7509 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -57,11 +57,7 @@ private sealed class State public SendState SendState; public long SendErrorCode = -1; - // Buffers to hold during a call to send. - public MemoryHandle[] BufferArrays = new MemoryHandle[1]; - public IntPtr SendQuicBuffers; - public int SendBufferMaxCount; - public int SendBufferCount; + public MsQuicBuffers SendBuffers; // Resettable completions to be used for multiple calls to send. public readonly ResettableCompletionSource SendResettableCompletionSource = new ResettableCompletionSource(); @@ -84,6 +80,11 @@ private sealed class State // Set once stream have been shutdown. public readonly TaskCompletionSource ShutdownCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + public State() + { + SendBuffers = new MsQuicBuffers(); + } + public void Cleanup() { if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"{Handle} releasing handles."); @@ -91,8 +92,7 @@ public void Cleanup() ShutdownState = ShutdownState.Finished; CleanupSendState(this); Handle?.Dispose(); - Marshal.FreeHGlobal(SendQuicBuffers); - SendQuicBuffers = IntPtr.Zero; + SendBuffers.Dispose(); if (StateGCHandle.IsAllocated) StateGCHandle.Free(); ConnectionState?.RemoveStream(null); } @@ -272,15 +272,9 @@ internal override ValueTask WriteAsync(ReadOnlySequence buffers, Cancellat return WriteAsync(buffers, endStream: false, cancellationToken); } - internal override async ValueTask WriteAsync(ReadOnlySequence buffers, bool endStream, CancellationToken cancellationToken = default) + internal override ValueTask WriteAsync(ReadOnlySequence buffers, bool endStream, CancellationToken cancellationToken = default) { - ThrowIfDisposed(); - - using CancellationTokenRegistration registration = SetupWriteStartState(buffers.IsEmpty, cancellationToken); - - await SendReadOnlySequenceAsync(buffers, endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE).ConfigureAwait(false); - - CleanupWriteCompletedState(); + return WriteAsync(static (state, buffers) => state.SendBuffers.Initialize(buffers), buffers, buffers.IsEmpty, endStream, cancellationToken); } internal override ValueTask WriteAsync(ReadOnlyMemory> buffers, CancellationToken cancellationToken = default) @@ -288,26 +282,62 @@ internal override ValueTask WriteAsync(ReadOnlyMemory> buff return WriteAsync(buffers, endStream: false, cancellationToken); } - internal override async ValueTask WriteAsync(ReadOnlyMemory> buffers, bool endStream, CancellationToken cancellationToken = default) + internal override ValueTask WriteAsync(ReadOnlyMemory> buffers, bool endStream, CancellationToken cancellationToken = default) + { + return WriteAsync(static (state, buffers) => state.SendBuffers.Initialize(buffers), buffers, buffers.IsEmpty, endStream, cancellationToken); + } + + internal override ValueTask WriteAsync(ReadOnlyMemory buffer, bool endStream, CancellationToken cancellationToken = default) + { + return WriteAsync(static (state, buffer) => state.SendBuffers.Initialize(buffer), buffer, buffer.IsEmpty, endStream, cancellationToken); + } + + private async ValueTask WriteAsync(Action stateSetup, TBuffer buffer, bool isEmpty, bool endStream, CancellationToken cancellationToken) { ThrowIfDisposed(); - using CancellationTokenRegistration registration = SetupWriteStartState(buffers.IsEmpty, cancellationToken); + using CancellationTokenRegistration registration = SetupWriteStartState(isEmpty, cancellationToken); - await SendReadOnlyMemoryListAsync(buffers, endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE).ConfigureAwait(false); + await WriteAsyncCore(stateSetup, buffer, isEmpty, endStream).ConfigureAwait(false); CleanupWriteCompletedState(); } - internal override async ValueTask WriteAsync(ReadOnlyMemory buffer, bool endStream, CancellationToken cancellationToken = default) + private unsafe ValueTask WriteAsyncCore(Action stateSetup, TBuffer buffer, bool isEmpty, bool endStream) { - ThrowIfDisposed(); + if (isEmpty) + { + if (endStream) + { + // Start graceful shutdown sequence if passed in the fin flag and there is an empty buffer. + StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, errorCode: 0); + } + return default; + } - using CancellationTokenRegistration registration = SetupWriteStartState(buffer.IsEmpty, cancellationToken); + stateSetup(_state, buffer); - await SendReadOnlyMemoryAsync(buffer, endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE).ConfigureAwait(false); + Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)"); + int status = MsQuicApi.Api.ApiTable->StreamSend( + _state.Handle.QuicHandle, + _state.SendBuffers.Buffers, + (uint)_state.SendBuffers.Count, + endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE, + (void*)IntPtr.Zero); - CleanupWriteCompletedState(); + if (StatusFailed(status)) + { + CleanupWriteFailedState(); + CleanupSendState(_state); + + if (status == QUIC_STATUS_ABORTED) + { + throw ThrowHelper.GetConnectionAbortedException(_state.ConnectionState.AbortErrorCode); + } + ThrowIfFailure(status, "Could not send data to peer."); + } + + return _state.SendResettableCompletionSource.GetTypelessValueTask(); } private CancellationTokenRegistration SetupWriteStartState(bool emptyBuffer, CancellationToken cancellationToken) @@ -1389,194 +1419,8 @@ private static void CleanupSendState(State state) lock (state) { Debug.Assert(state.SendState != SendState.Pending); - Debug.Assert(state.SendBufferCount <= state.BufferArrays.Length); - - for (int i = 0; i < state.SendBufferCount; i++) - { - state.BufferArrays[i].Dispose(); - } - } - } - - // TODO prevent overlapping sends or consider supporting it. - private unsafe ValueTask SendReadOnlyMemoryAsync( - ReadOnlyMemory buffer, - QUIC_SEND_FLAGS flags) - { - if (buffer.IsEmpty) - { - if ((flags & QUIC_SEND_FLAGS.FIN) == QUIC_SEND_FLAGS.FIN) - { - // Start graceful shutdown sequence if passed in the fin flag and there is an empty buffer. - StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, errorCode: 0); - } - return default; - } - - MemoryHandle handle = buffer.Pin(); - if (_state.SendQuicBuffers == IntPtr.Zero) - { - _state.SendQuicBuffers = Marshal.AllocHGlobal(sizeof(QUIC_BUFFER)); - _state.SendBufferMaxCount = 1; - } - - QUIC_BUFFER* quicBuffers = (QUIC_BUFFER*)_state.SendQuicBuffers; - quicBuffers->Length = (uint)buffer.Length; - quicBuffers->Buffer = (byte*)handle.Pointer; - - _state.BufferArrays[0] = handle; - _state.SendBufferCount = 1; - - Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)"); - int status = MsQuicApi.Api.ApiTable->StreamSend( - _state.Handle.QuicHandle, - quicBuffers, - 1, - flags, - (void*)IntPtr.Zero); - - if (!StatusSucceeded(status)) - { - CleanupWriteFailedState(); - CleanupSendState(_state); - - if (status == QUIC_STATUS_ABORTED) - { - throw ThrowHelper.GetConnectionAbortedException(_state.ConnectionState.AbortErrorCode); - } - ThrowIfFailure(status, "Could not send data to peer"); - } - - return _state.SendResettableCompletionSource.GetTypelessValueTask(); - } - - private unsafe ValueTask SendReadOnlySequenceAsync( - ReadOnlySequence buffers, - QUIC_SEND_FLAGS flags) - { - if (buffers.IsEmpty) - { - if ((flags & QUIC_SEND_FLAGS.FIN) == QUIC_SEND_FLAGS.FIN) - { - // Start graceful shutdown sequence if passed in the fin flag and there is an empty buffer. - StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, errorCode: 0); - } - return default; - } - - int count = 0; - - foreach (ReadOnlyMemory buffer in buffers) - { - ++count; - } - - if (_state.SendBufferMaxCount < count) - { - Marshal.FreeHGlobal(_state.SendQuicBuffers); - _state.SendQuicBuffers = IntPtr.Zero; - _state.SendQuicBuffers = Marshal.AllocHGlobal(sizeof(QUIC_BUFFER) * count); - _state.SendBufferMaxCount = count; - _state.BufferArrays = new MemoryHandle[count]; + state.SendBuffers.Reset(); } - - _state.SendBufferCount = count; - count = 0; - - QUIC_BUFFER* quicBuffers = (QUIC_BUFFER*)_state.SendQuicBuffers; - foreach (ReadOnlyMemory buffer in buffers) - { - MemoryHandle handle = buffer.Pin(); - quicBuffers[count].Length = (uint)buffer.Length; - quicBuffers[count].Buffer = (byte*)handle.Pointer; - _state.BufferArrays[count] = handle; - ++count; - } - - Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)"); - int status = MsQuicApi.Api.ApiTable->StreamSend( - _state.Handle.QuicHandle, - quicBuffers, - (uint)count, - flags, - (void*)IntPtr.Zero); - - if (!StatusSucceeded(status)) - { - CleanupWriteFailedState(); - CleanupSendState(_state); - - if (status == QUIC_STATUS_ABORTED) - { - throw ThrowHelper.GetConnectionAbortedException(_state.ConnectionState.AbortErrorCode); - } - ThrowIfFailure(status, "Could not send data to peer"); - } - - return _state.SendResettableCompletionSource.GetTypelessValueTask(); - } - - private unsafe ValueTask SendReadOnlyMemoryListAsync( - ReadOnlyMemory> buffers, - QUIC_SEND_FLAGS flags) - { - if (buffers.IsEmpty) - { - if ((flags & QUIC_SEND_FLAGS.FIN) == QUIC_SEND_FLAGS.FIN) - { - // Start graceful shutdown sequence if passed in the fin flag and there is an empty buffer. - StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, errorCode: 0); - } - return default; - } - - ReadOnlyMemory[] array = buffers.ToArray(); - - uint length = (uint)array.Length; - - if (_state.SendBufferMaxCount < array.Length) - { - Marshal.FreeHGlobal(_state.SendQuicBuffers); - _state.SendQuicBuffers = IntPtr.Zero; - _state.SendQuicBuffers = Marshal.AllocHGlobal(sizeof(QUIC_BUFFER) * array.Length); - _state.SendBufferMaxCount = array.Length; - _state.BufferArrays = new MemoryHandle[array.Length]; - } - - _state.SendBufferCount = array.Length; - QUIC_BUFFER* quicBuffers = (QUIC_BUFFER*)_state.SendQuicBuffers; - for (int i = 0; i < length; i++) - { - ReadOnlyMemory buffer = array[i]; - MemoryHandle handle = buffer.Pin(); - - quicBuffers[i].Length = (uint)buffer.Length; - quicBuffers[i].Buffer = (byte*)handle.Pointer; - - _state.BufferArrays[i] = handle; - } - - Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)"); - int status = MsQuicApi.Api.ApiTable->StreamSend( - _state.Handle.QuicHandle, - quicBuffers, - length, - flags, - (void*)IntPtr.Zero); - - if (!StatusSucceeded(status)) - { - CleanupWriteFailedState(); - CleanupSendState(_state); - - if (status == QUIC_STATUS_ABORTED) - { - throw ThrowHelper.GetConnectionAbortedException(_state.ConnectionState.AbortErrorCode); - } - ThrowIfFailure(status, "Could not send data to peer"); - } - - return _state.SendResettableCompletionSource.GetTypelessValueTask(); } private unsafe void ReceiveComplete(int bufferLength)