diff --git a/src/libraries/Common/src/System/Threading/Tasks/TaskCompletionSourceWithCancellation.cs b/src/libraries/Common/src/System/Threading/Tasks/TaskCompletionSourceWithCancellation.cs index 5fd1509e7bd89..8964fdba9a720 100644 --- a/src/libraries/Common/src/System/Threading/Tasks/TaskCompletionSourceWithCancellation.cs +++ b/src/libraries/Common/src/System/Threading/Tasks/TaskCompletionSourceWithCancellation.cs @@ -21,5 +21,20 @@ public async ValueTask WaitWithCancellationAsync(CancellationToken cancellati return await Task.ConfigureAwait(false); } } + + public T WaitWithCancellation(CancellationToken cancellationToken) + { + using (cancellationToken.UnsafeRegister(static (s, cancellationToken) => ((TaskCompletionSourceWithCancellation)s!).TrySetCanceled(cancellationToken), this)) + { + return Task.GetAwaiter().GetResult(); + } + } + + public ValueTask WaitWithCancellationAsync(bool async, CancellationToken cancellationToken) + { + return async ? + WaitWithCancellationAsync(cancellationToken) : + new ValueTask(WaitWithCancellation(cancellationToken)); + } } } diff --git a/src/libraries/Common/tests/System/Diagnostics/Tracing/ConsoleEventListener.cs b/src/libraries/Common/tests/System/Diagnostics/Tracing/ConsoleEventListener.cs index 457b8e1660b2b..98b51d70539f2 100644 --- a/src/libraries/Common/tests/System/Diagnostics/Tracing/ConsoleEventListener.cs +++ b/src/libraries/Common/tests/System/Diagnostics/Tracing/ConsoleEventListener.cs @@ -32,7 +32,7 @@ protected override void OnEventWritten(EventWrittenEventArgs eventData) { lock (Console.Out) { - string text = $"[{eventData.EventSource.Name}-{eventData.EventId}]{(eventData.Payload != null ? $" ({string.Join(", ", eventData.Payload)})." : "")}"; + string text = $"[{eventData.EventSource.Name}-{eventData.EventName}]{(eventData.Payload != null ? $" ({string.Join(", ", eventData.Payload)})." : "")}"; if (_eventFilter != null && text.Contains(_eventFilter)) { ConsoleColor origForeground = Console.ForegroundColor; diff --git a/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.Cookies.cs b/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.Cookies.cs index dbb1382c9aaf5..09b58739d5b5d 100644 --- a/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.Cookies.cs +++ b/src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.Cookies.cs @@ -315,7 +315,7 @@ await LoopbackServerFactory.CreateClientAndServerAsync(async url => using (HttpClient client = CreateHttpClient(handler)) { client.DefaultRequestHeaders.ConnectionClose = true; // to avoid issues with connection pooling - await client.GetAsync(url1); + await client.GetAsync(url1); } }, async server => diff --git a/src/libraries/System.Net.Http/src/Resources/Strings.resx b/src/libraries/System.Net.Http/src/Resources/Strings.resx index 03081881762ef..501ca3c8f9740 100644 --- a/src/libraries/System.Net.Http/src/Resources/Strings.resx +++ b/src/libraries/System.Net.Http/src/Resources/Strings.resx @@ -540,6 +540,9 @@ The request was canceled due to the configured HttpClient.Timeout of {0} seconds elapsing. + + A connection could not be established within the configured ConnectTimeout. + Connection aborted by peer ({0}). diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/AuthenticationHelper.NtAuth.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/AuthenticationHelper.NtAuth.cs index e9468042f64e1..8ee9130938172 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/AuthenticationHelper.NtAuth.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/AuthenticationHelper.NtAuth.cs @@ -63,8 +63,12 @@ private static async Task SendWithNtAuthAsync(HttpRequestMe if (response.Headers.ConnectionClose.GetValueOrDefault()) { // Server is closing the connection and asking us to authenticate on a new connection. + + // First, detach the current connection from the pool. This means it will no longer count against the connection limit. + // Instead, it will be replaced by the new connection below. + connection.DetachFromPool(); + connection = await connectionPool.CreateHttp11ConnectionAsync(request, async, cancellationToken).ConfigureAwait(false); - connectionPool.IncrementConnectionCount(); connection!.Acquire(); isNewConnection = true; needDrain = false; diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs index af13208467364..26058a454a9e1 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http2Connection.cs @@ -1,7 +1,6 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using System; using System.Buffers.Binary; using System.Collections.Generic; using System.Diagnostics; @@ -15,11 +14,10 @@ using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; -using Microsoft.Extensions.Internal; namespace System.Net.Http { - internal sealed partial class Http2Connection : HttpConnectionBase, IDisposable + internal sealed partial class Http2Connection : HttpConnectionBase { private readonly HttpConnectionPool _pool; private readonly Stream _stream; @@ -37,36 +35,41 @@ internal sealed partial class Http2Connection : HttpConnectionBase, IDisposable private readonly Dictionary _httpStreams; private readonly CreditManager _connectionWindow; - private readonly CreditManager _concurrentStreams; private RttEstimator _rttEstimator; private int _nextStream; private bool _expectingSettingsAck; private int _initialServerStreamWindowSize; - private int _maxConcurrentStreams; private int _pendingWindowUpdate; private long _idleSinceTickCount; + private uint _maxConcurrentStreams; + private uint _streamsInUse; + private TaskCompletionSource? _availableStreamsWaiter; + private readonly Channel _writeChannel; private bool _lastPendingWriterShouldFlush; - // This means that the pool has disposed us, but there may still be - // requests in flight that will continue to be processed. - private bool _disposed; + // This flag indicates that the connection is shutting down and cannot accept new requests, because of one of the following conditions: + // (1) We received a GOAWAY frame from the server + // (2) We have exhaustead StreamIds (i.e. _nextStream == MaxStreamId) + // (3) A connection-level error occurred, in which case _abortException below is set. + private bool _shutdown; + private TaskCompletionSource? _shutdownWaiter; - // This will be set when: - // (1) We receive GOAWAY -- will be set to the value sent in the GOAWAY frame - // (2) A connection IO error occurs -- will be set to int.MaxValue - // (meaning we must assume all streams have been processed by the server) - private int _lastStreamId = -1; + // If this is set, the connection is aborting due to an IO failure (IOException) or a protocol violation (Http2ProtocolException). + // _shutdown above is true, and requests in flight have been (or are being) failed. + private Exception? _abortException; + + // This means that the user (i.e. the connection pool) has disposed us and will not submit further requests. + // Requests currently in flight will continue to be processed. + // When all requests have completed, the connection will be torn down. + private bool _disposed; private const int TelemetryStatus_Opened = 1; private const int TelemetryStatus_Closed = 2; private int _markedByTelemetryStatus; - // This will be set when a connection IO error occurs - private Exception? _abortException; - private const int MaxStreamId = int.MaxValue; // Temporary workaround for request burst handling on connection start. @@ -125,6 +128,7 @@ public Http2Connection(HttpConnectionPool pool, Stream stream) { _pool = pool; _stream = stream; + _incomingBuffer = new ArrayBuffer(InitialConnectionBufferSize); _outgoingBuffer = new ArrayBuffer(InitialConnectionBufferSize); @@ -133,7 +137,6 @@ public Http2Connection(HttpConnectionPool pool, Stream stream) _httpStreams = new Dictionary(); _connectionWindow = new CreditManager(this, nameof(_connectionWindow), DefaultInitialWindowSize); - _concurrentStreams = new CreditManager(this, nameof(_concurrentStreams), InitialMaxConcurrentStreams); _rttEstimator = RttEstimator.Create(); @@ -143,10 +146,11 @@ public Http2Connection(HttpConnectionPool pool, Stream stream) _initialServerStreamWindowSize = DefaultInitialWindowSize; _maxConcurrentStreams = InitialMaxConcurrentStreams; + _streamsInUse = 0; + _pendingWindowUpdate = 0; _idleSinceTickCount = Environment.TickCount64; - _keepAlivePingDelay = TimeSpanToMs(_pool.Settings._keepAlivePingDelay); _keepAlivePingTimeout = TimeSpanToMs(_pool.Settings._keepAlivePingTimeout); _nextPingRequestTimestamp = Environment.TickCount64 + _keepAlivePingDelay; @@ -170,8 +174,6 @@ static long TimeSpanToMs(TimeSpan value) { private object SyncObject => _httpStreams; - public bool CanAddNewStream => _concurrentStreams.IsCreditAvailable; - public async ValueTask SetupAsync() { try @@ -222,6 +224,158 @@ public async ValueTask SetupAsync() _ = ProcessOutgoingFramesAsync(); } + // This will complete when the connection begins to shut down and cannot be used anymore, or if it is disposed. + public ValueTask WaitForShutdownAsync() + { + lock (SyncObject) + { + if (_disposed) + { + Debug.Fail("As currently used, we don't expect to call this after disposing and we don't handle the ODE"); + throw new ObjectDisposedException(nameof(Http2Connection)); + } + + if (_shutdown) + { + Debug.Assert(_shutdownWaiter is null); + return default; + } + + _shutdownWaiter ??= new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + return new ValueTask(_shutdownWaiter.Task); + } + } + + private void Shutdown() + { + if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(_shutdown)}={_shutdown}, {nameof(_abortException)}={_abortException}"); + + Debug.Assert(Monitor.IsEntered(SyncObject)); + + SignalAvailableStreamsWaiter(false); + SignalShutdownWaiter(); + + // Note _shutdown could already be set, but that's fine. + _shutdown = true; + } + + private void SignalShutdownWaiter() + { + if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(_shutdownWaiter)}?={_shutdownWaiter is not null}"); + + Debug.Assert(Monitor.IsEntered(SyncObject)); + + if (_shutdownWaiter is not null) + { + Debug.Assert(!_disposed); + Debug.Assert(!_shutdown); + _shutdownWaiter.SetResult(); + _shutdownWaiter = null; + } + } + + public bool TryReserveStream() + { + lock (SyncObject) + { + if (_disposed) + { + Debug.Fail("As currently used, we don't expect to call this after disposing and we don't handle the ODE"); + throw new ObjectDisposedException(nameof(Http2Connection)); + } + + if (_shutdown) + { + return false; + } + + if (_streamsInUse < _maxConcurrentStreams) + { + _streamsInUse++; + return true; + } + } + + return false; + } + + // Can be called by the HttpConnectionPool after TryReserveStream if the stream doesn't end up being used. + // Otherwise, will be called when the request is complete and stream is closed. + public void ReleaseStream() + { + lock (SyncObject) + { + if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(_streamsInUse)}={_streamsInUse}"); + + Debug.Assert(_availableStreamsWaiter is null || _streamsInUse >= _maxConcurrentStreams); + + _streamsInUse--; + + Debug.Assert(_streamsInUse >= _httpStreams.Count); + + if (_streamsInUse < _maxConcurrentStreams) + { + SignalAvailableStreamsWaiter(true); + } + + if (_streamsInUse == 0) + { + _idleSinceTickCount = Environment.TickCount64; + + if (_disposed) + { + FinalTeardown(); + } + } + } + } + + // Returns true to indicate at least one stream is available + // Returns false to indicate that the connection is shutting down and cannot be used anymore + public ValueTask WaitForAvailableStreamsAsync() + { + lock (SyncObject) + { + if (_disposed) + { + Debug.Fail("As currently used, we don't expect to call this after disposing and we don't handle the ODE"); + throw new ObjectDisposedException(nameof(Http2Connection)); + } + + Debug.Assert(_availableStreamsWaiter is null, "As used currently, shouldn't already have a waiter"); + + if (_shutdown) + { + return ValueTask.FromResult(false); + } + + if (_streamsInUse < _maxConcurrentStreams) + { + return ValueTask.FromResult(true); + } + + // Need to wait for streams to become available. + _availableStreamsWaiter = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + return new ValueTask(_availableStreamsWaiter.Task); + } + } + + private void SignalAvailableStreamsWaiter(bool result) + { + if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(result)}={result}, {nameof(_availableStreamsWaiter)}?={_availableStreamsWaiter is not null}"); + + Debug.Assert(Monitor.IsEntered(SyncObject)); + + if (_availableStreamsWaiter is not null) + { + Debug.Assert(!_disposed); + Debug.Assert(!_shutdown); + _availableStreamsWaiter.SetResult(result); + _availableStreamsWaiter = null; + } + } + private async Task FlushOutgoingBytesAsync() { if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(_outgoingBuffer.ActiveLength)}={_outgoingBuffer.ActiveLength}"); @@ -305,7 +459,6 @@ void ThrowPrematureEOF(int requiredBytes) => void ThrowMissingFrame() => throw new IOException(SR.net_http_invalid_response_missing_frame); - } private async Task ProcessIncomingFramesAsync() @@ -695,16 +848,18 @@ private void ProcessSettingsFrame(FrameHeader frameHeader, bool initialFrame = f private void ChangeMaxConcurrentStreams(uint newValue) { - if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(newValue)}={newValue}"); + lock (SyncObject) + { + if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(newValue)}={newValue}, {nameof(_streamsInUse)}={_streamsInUse}, {nameof(_availableStreamsWaiter)}?={_availableStreamsWaiter is not null}"); - // The value is provided as a uint. - // Limit this to int.MaxValue since the CreditManager implementation only supports singed values. - // In practice, we should never reach this value. - int effectiveValue = newValue > (uint)int.MaxValue ? int.MaxValue : (int)newValue; - int delta = effectiveValue - _maxConcurrentStreams; - _maxConcurrentStreams = effectiveValue; + Debug.Assert(_availableStreamsWaiter is null || _streamsInUse >= _maxConcurrentStreams); - _concurrentStreams.AdjustCredit(delta); + _maxConcurrentStreams = newValue; + if (_streamsInUse < _maxConcurrentStreams) + { + SignalAvailableStreamsWaiter(true); + } + } } private void ChangeInitialWindowSize(int newSize) @@ -854,20 +1009,45 @@ private void ProcessGoAwayFrame(FrameHeader frameHeader) ThrowProtocolError(Http2ProtocolErrorCode.FrameSizeError); } - // GoAway frames always apply to the whole connection, never to a single stream. - // According to RFC 7540 section 6.8, this should be a connection error. if (frameHeader.StreamId != 0) { ThrowProtocolError(); } - int lastValidStream = (int)(BinaryPrimitives.ReadUInt32BigEndian(_incomingBuffer.ActiveSpan) & 0x7FFFFFFF); - var errorCode = (Http2ProtocolErrorCode)BinaryPrimitives.ReadInt32BigEndian(_incomingBuffer.ActiveSpan.Slice(sizeof(int))); - if (NetEventSource.Log.IsEnabled()) Trace(frameHeader.StreamId, $"{nameof(lastValidStream)}={lastValidStream}, {nameof(errorCode)}={errorCode}"); - - StartTerminatingConnection(lastValidStream, new Http2ConnectionException(errorCode)); + int lastStreamId = (int)(BinaryPrimitives.ReadUInt32BigEndian(_incomingBuffer.ActiveSpan) & 0x7FFFFFFF); + Http2ProtocolErrorCode errorCode = (Http2ProtocolErrorCode)BinaryPrimitives.ReadInt32BigEndian(_incomingBuffer.ActiveSpan.Slice(sizeof(int))); + if (NetEventSource.Log.IsEnabled()) Trace(frameHeader.StreamId, $"{nameof(lastStreamId)}={lastStreamId}, {nameof(errorCode)}={errorCode}"); _incomingBuffer.Discard(frameHeader.PayloadLength); + + Debug.Assert(lastStreamId >= 0); + Exception resetException = new Http2ConnectionException(errorCode); + + // There is no point sending more PING frames for RTT estimation: + _rttEstimator.OnGoAwayReceived(); + + List streamsToAbort = new List(); + lock (SyncObject) + { + Shutdown(); + + foreach (KeyValuePair kvp in _httpStreams) + { + int streamId = kvp.Key; + Debug.Assert(streamId == kvp.Value.StreamId); + + if (streamId > lastStreamId) + { + streamsToAbort.Add(kvp.Value); + } + } + } + + // Avoid calling OnReset under the lock, as it may cause the Http2Stream to call back in to RemoveStream + foreach (Http2Stream s in streamsToAbort) + { + s.OnReset(resetException, canRetry: true); + } } internal Task FlushAsync(CancellationToken cancellationToken) => @@ -924,8 +1104,16 @@ private Task PerformWriteAsync(int writeBytes, T state, Func, if (!_writeChannel.Writer.TryWrite(writeEntry)) { - Debug.Assert(_abortException is not null); - return Task.FromException(GetRequestAbortedException(_abortException)); + if (_abortException is not null) + { + return Task.FromException(GetRequestAbortedException(_abortException)); + } + + // We must be trying to send something asynchronously (like RST_STREAM or a PING or a SETTINGS ACK) and it has raced with the connection tear down. + // As such, it should not matter that we were not able to actually send the frame. + // But just in case, throw ObjectDisposedException. Asynchronous callers will ignore the failure. + Debug.Assert(_disposed && _streamsInUse == 0); + return Task.FromException(new ObjectDisposedException(nameof(Http2Connection))); } return writeEntry.Task; @@ -994,9 +1182,6 @@ private async Task ProcessOutgoingFramesAsync() await FlushOutgoingBytesAsync().ConfigureAwait(false); } } - - // Connection should be aborting at this point. - Debug.Assert(_abortException is not null); } catch (Exception e) { @@ -1297,83 +1482,44 @@ private void WriteHeaders(HttpRequestMessage request, ref ArrayBuffer headerBuff } } - [DoesNotReturn] - private void ThrowShutdownException() - { - Debug.Assert(Monitor.IsEntered(SyncObject)); - - if (_abortException != null) - { - // We had an IO failure on the connection. Don't retry in this case. - throw new HttpRequestException(SR.net_http_client_execution_error, _abortException); - } - - // Connection is being gracefully shutdown. Allow the request to be retried. - Exception innerException; - if (_lastStreamId != -1) - { - // We must have received a GOAWAY. - innerException = new IOException(SR.net_http_server_shutdown); - } - else - { - // We must either be disposed or out of stream IDs. - Debug.Assert(_disposed || _nextStream == MaxStreamId); - - innerException = new ObjectDisposedException(nameof(Http2Connection)); - } - - ThrowRetry(SR.net_http_client_execution_error, innerException); - } - - private async ValueTask SendHeadersAsync(HttpRequestMessage request, CancellationToken cancellationToken, bool mustFlush) + private void AddStream(Http2Stream http2Stream) { - // Enforce MAX_CONCURRENT_STREAMS setting value. We do this before anything else, e.g. renting buffers to serialize headers, - // in order to avoid consuming resources in potentially many requests waiting for access. - try + lock (SyncObject) { - if (!_concurrentStreams.TryRequestCreditNoWait(1)) + if (_nextStream == MaxStreamId) { - if (_pool.EnableMultipleHttp2Connections) - { - throw new HttpRequestException(null, null, RequestRetryType.RetryOnStreamLimitReached); - } + // We have exhausted StreamIds. Shut down the connection. + Shutdown(); + } - if (HttpTelemetry.Log.IsEnabled()) - { - // Only log Http20RequestLeftQueue if we spent time waiting on the queue - ValueStopwatch stopwatch = ValueStopwatch.StartNew(); - await _concurrentStreams.RequestCreditAsync(1, cancellationToken).ConfigureAwait(false); - HttpTelemetry.Log.Http20RequestLeftQueue(stopwatch.GetElapsedTime().TotalMilliseconds); - } - else - { - await _concurrentStreams.RequestCreditAsync(1, cancellationToken).ConfigureAwait(false); - } + if (_shutdown) + { + // The connection has shut down. Throw a retryable exception so that this request will be handled on another connection. + ThrowRetry(SR.net_http_server_shutdown); } - } - catch (ObjectDisposedException) - { - // We have race condition between shutting down and initiating new requests. - // When we are shutting down the connection (e.g. due to receiving GOAWAY, etc) - // we will wait until the stream count goes to 0, and then we will close the connetion - // and perform clean up, including disposing _concurrentStreams. - // So if we get ObjectDisposedException here, we must have shut down the connection. - // Throw a retryable request exception if this is not result of some other error. - // This will cause retry logic to kick in and perform another connection attempt. - // The user should never see this exception. See similar handling below. - // Throw a retryable request exception if this is not result of some other error. - // This will cause retry logic to kick in and perform another connection attempt. - // The user should never see this exception. See also below. - lock (SyncObject) + + if (_streamsInUse > _maxConcurrentStreams) { - Debug.Assert(_disposed || _lastStreamId != -1); - Debug.Assert(_httpStreams.Count == 0); - ThrowShutdownException(); - throw; // unreachable + // The server must have sent a downward adjustment to SETTINGS_MAX_CONCURRENT_STREAMS, so our previous stream reservation is no longer valid. + // We might want a better exception message here, but in general the user shouldn't see this anyway since it will be retried. + ThrowRetry(SR.net_http_request_aborted); } + + // Now that we're holding the lock, configure the stream. The lock must be held while + // assigning the stream ID to ensure only one stream gets an ID, and it must be held + // across setting the initial window size (available credit) and storing the stream into + // collection such that window size updates are able to atomically affect all known streams. + http2Stream.Initialize(_nextStream, _initialServerStreamWindowSize); + + // Client-initiated streams are always odd-numbered, so increase by 2. + _nextStream += 2; + + _httpStreams.Add(http2Stream.StreamId, http2Stream); } + } + private async ValueTask SendHeadersAsync(HttpRequestMessage request, CancellationToken cancellationToken, bool mustFlush) + { ArrayBuffer headerBuffer = default; try { @@ -1402,32 +1548,7 @@ await PerformWriteAsync(totalSize, (thisRef: this, http2Stream, headerBytes, end { if (NetEventSource.Log.IsEnabled()) s.thisRef.Trace(s.http2Stream.StreamId, $"Started writing. Total header bytes={s.headerBytes.Length}"); - // Allocate the next available stream ID. Note that if we fail before sending the headers, - // we'll just skip this stream ID, which is fine. - lock (s.thisRef.SyncObject) - { - if (s.thisRef._nextStream == MaxStreamId || s.thisRef._disposed || s.thisRef._lastStreamId != -1) - { - // We ran out of stream IDs or we raced between acquiring the connection from the pool and shutting down. - // Throw a retryable request exception. This will cause retry logic to kick in - // and perform another connection attempt. The user should never see this exception. - s.thisRef.ThrowShutdownException(); - } - - // Now that we're holding the lock, configure the stream. The lock must be held while - // assigning the stream ID to ensure only one stream gets an ID, and it must be held - // across setting the initial window size (available credit) and storing the stream into - // collection such that window size updates are able to atomically affect all known streams. - s.http2Stream.Initialize(s.thisRef._nextStream, s.thisRef._initialServerStreamWindowSize); - - // Client-initiated streams are always odd-numbered, so increase by 2. - s.thisRef._nextStream += 2; - - // We're about to flush the HEADERS frame, so add the stream to the dictionary now. - // The lifetime of the stream is now controlled by the stream itself and the connection. - // This can fail if the connection is shutting down, in which case we will cancel sending this frame. - s.thisRef._httpStreams.Add(s.http2Stream.StreamId, s.http2Stream); - } + s.thisRef.AddStream(s.http2Stream); Span span = writeBuffer.Span; @@ -1466,7 +1587,7 @@ await PerformWriteAsync(totalSize, (thisRef: this, http2Stream, headerBytes, end } catch { - _concurrentStreams.AdjustCredit(1); + ReleaseStream(); throw; } finally @@ -1572,187 +1693,63 @@ private void ExtendWindow(int amount) LogExceptions(SendWindowUpdateAsync(0, windowUpdateSize)); } - /// Abort all streams and cause further processing to fail. - /// Exception causing Abort to be called. - private void Abort(Exception abortException) + public override long GetIdleTicks(long nowTicks) { - // The connection has failed, e.g. failed IO or a connection-level frame error. - bool alreadyAborting = false; lock (SyncObject) { - if (_abortException is null) - { - _abortException = abortException; - } - else - { - alreadyAborting = true; - } - } - - if (alreadyAborting) - { - if (NetEventSource.Log.IsEnabled()) Trace($"Abort called while already aborting. {nameof(abortException)}=={abortException}"); - - return; - } - - if (NetEventSource.Log.IsEnabled()) Trace($"Abort initiated. {nameof(abortException)}=={abortException}"); - - _writeChannel.Writer.Complete(); - - AbortStreams(_abortException); - } - - /// Gets whether the connection exceeded any of the connection limits. - /// The current tick count. Passed in to amortize the cost of calling Environment.TickCount64. - /// How long a connection can be open to be considered reusable. - /// How long a connection can have been idle in the pool to be considered reusable. - /// - /// true if we believe the connection is expired; otherwise, false. There is an inherent race condition here, - /// in that the server could terminate the connection or otherwise make it unusable immediately after we check it, - /// but there's not much difference between that and starting to use the connection and then having the server - /// terminate it, which would be considered a failure, so this race condition is largely benign and inherent to - /// the nature of connection pooling. - /// - public bool IsExpired(long nowTicks, - TimeSpan connectionLifetime, - TimeSpan connectionIdleTimeout) - { - if (_disposed) - { - return true; - } - - // Check idle timeout when there are not pending requests for a while. - if ((connectionIdleTimeout != Timeout.InfiniteTimeSpan) && - (_httpStreams.Count == 0) && - ((nowTicks - _idleSinceTickCount) > connectionIdleTimeout.TotalMilliseconds)) - { - if (NetEventSource.Log.IsEnabled()) Trace($"HTTP/2 connection no longer usable. Idle {TimeSpan.FromMilliseconds(nowTicks - _idleSinceTickCount)} > {connectionIdleTimeout}."); - - return true; - } - - if (LifetimeExpired(nowTicks, connectionLifetime)) - { - if (NetEventSource.Log.IsEnabled()) Trace($"HTTP/2 connection no longer usable. Lifetime {TimeSpan.FromMilliseconds(nowTicks - CreationTickCount)} > {connectionLifetime}."); - - return true; + return _streamsInUse == 0 ? _idleSinceTickCount - nowTicks : 0; } - - return false; } - private void AbortStreams(Exception abortException) + /// Abort all streams and cause further processing to fail. + /// Exception causing Abort to be called. + private void Abort(Exception abortException) { - if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(abortException)}={abortException}"); - - // Invalidate outside of lock to avoid race with HttpPool Dispose() - // We should not try to grab pool lock while holding connection lock as on disposing pool, - // we could hold pool lock while trying to grab connection lock in Dispose(). - _pool.InvalidateHttp2Connection(this); + if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(abortException)}=={abortException}"); + // The connection has failed, e.g. failed IO or a connection-level protocol error. List streamsToAbort = new List(); - lock (SyncObject) { - // Set _lastStreamId to int.MaxValue to indicate that we are shutting down - // and we must assume all active streams have been processed by the server - _lastStreamId = int.MaxValue; - - foreach (KeyValuePair kvp in _httpStreams) + if (_abortException is not null) { - int streamId = kvp.Key; - Debug.Assert(streamId == kvp.Value.StreamId); - - streamsToAbort.Add(kvp.Value); + if (NetEventSource.Log.IsEnabled()) Trace($"Abort called while already aborting. {nameof(abortException)}={abortException}"); + return; } - CheckForShutdown(); - } - - // Avoid calling OnAbort under the lock, as it may cause the Http2Stream - // to call back in to RemoveStream - foreach (Http2Stream s in streamsToAbort) - { - s.OnReset(abortException); - } - } - - private void StartTerminatingConnection(int lastValidStream, Exception abortException) - { - Debug.Assert(lastValidStream >= 0); - - // Invalidate outside of lock to avoid race with HttpPool Dispose() - // We should not try to grab pool lock while holding connection lock as on disposing pool, - // we could hold pool lock while trying to grab connection lock in Dispose(). - _pool.InvalidateHttp2Connection(this); - - // There is no point sending more PING frames for RTT estimation: - _rttEstimator.OnGoAwayReceived(); - - List streamsToAbort = new List(); - - lock (SyncObject) - { - if (_lastStreamId == -1) - { - _lastStreamId = lastValidStream; - } - else - { - // We have already received GOAWAY before - // In this case the smaller valid stream is used - _lastStreamId = Math.Min(_lastStreamId, lastValidStream); - } + _abortException = abortException; - if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(lastValidStream)}={lastValidStream}, {nameof(_lastStreamId)}={_lastStreamId}"); + Shutdown(); foreach (KeyValuePair kvp in _httpStreams) { int streamId = kvp.Key; Debug.Assert(streamId == kvp.Value.StreamId); - if (streamId > _lastStreamId) - { - streamsToAbort.Add(kvp.Value); - } - else - { - if (NetEventSource.Log.IsEnabled()) Trace($"Found {nameof(streamId)} {streamId} <= {_lastStreamId}."); - } + streamsToAbort.Add(kvp.Value); } - - CheckForShutdown(); } - // Avoid calling OnAbort under the lock, as it may cause the Http2Stream - // to call back in to RemoveStream + // Avoid calling OnReset under the lock, as it may cause the Http2Stream to call back in to RemoveStream foreach (Http2Stream s in streamsToAbort) { - s.OnReset(abortException, canRetry: true); + s.OnReset(_abortException); } } - private void CheckForShutdown() + private void FinalTeardown() { - Debug.Assert(_disposed || _lastStreamId != -1); - Debug.Assert(Monitor.IsEntered(SyncObject)); + if (NetEventSource.Log.IsEnabled()) Trace(""); - // Check if dictionary has become empty - if (_httpStreams.Count != 0) - { - return; - } + Debug.Assert(_disposed); + Debug.Assert(_streamsInUse == 0); GC.SuppressFinalize(this); - // Do shutdown. _stream.Dispose(); _connectionWindow.Dispose(); - _concurrentStreams.Dispose(); + _writeChannel.Writer.Complete(); if (HttpTelemetry.Log.IsEnabled()) { @@ -1763,15 +1760,23 @@ private void CheckForShutdown() } } - public void Dispose() + public override void Dispose() { lock (SyncObject) { + if (NetEventSource.Log.IsEnabled()) Trace($"{nameof(_disposed)}={_disposed}, {nameof(_streamsInUse)}={_streamsInUse}"); + if (!_disposed) { + SignalAvailableStreamsWaiter(false); + SignalShutdownWaiter(); + _disposed = true; - CheckForShutdown(); + if (_streamsInUse == 0) + { + FinalTeardown(); + } } } } @@ -1885,7 +1890,7 @@ private enum SettingId : ushort // Note that this is safe to be called concurrently by multiple threads. - public sealed override async Task SendAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken) + public async Task SendAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken) { Debug.Assert(async); if (NetEventSource.Log.IsEnabled()) Trace($"{request}"); @@ -1969,7 +1974,6 @@ e is Http2ProtocolException || private void RemoveStream(Http2Stream http2Stream) { if (NetEventSource.Log.IsEnabled()) Trace(http2Stream.StreamId, ""); - Debug.Assert(http2Stream != null); lock (SyncObject) { @@ -1978,20 +1982,9 @@ private void RemoveStream(Http2Stream http2Stream) Debug.Fail($"Stream {http2Stream.StreamId} not found in dictionary during RemoveStream???"); return; } - - if (_httpStreams.Count == 0) - { - // If this was last pending request, get timestamp so we can monitor idle time. - _idleSinceTickCount = Environment.TickCount64; - - if (_disposed || _lastStreamId != -1) - { - CheckForShutdown(); - } - } } - _concurrentStreams.AdjustCredit(1); + ReleaseStream(); } private void RefreshPingTimestamp() @@ -2023,7 +2016,10 @@ private void VerifyKeepAlive() { lock (SyncObject) { - if (_httpStreams.Count == 0) return; + if (_streamsInUse == 0) + { + return; + } } } @@ -2067,7 +2063,7 @@ internal void Trace(int streamId, string message, [CallerMemberName] string? mem message); // message [DoesNotReturn] - private static void ThrowRetry(string message, Exception innerException) => + private static void ThrowRetry(string message, Exception? innerException = null) => throw new HttpRequestException(message, innerException, allowRetry: RequestRetryType.RetryOnConnectionFailure); private static Exception GetRequestAbortedException(Exception? innerException = null) => diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs index 4667b7bec32f4..d3e6a90c4305b 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/Http3Connection.cs @@ -18,7 +18,7 @@ namespace System.Net.Http [SupportedOSPlatform("windows")] [SupportedOSPlatform("linux")] [SupportedOSPlatform("macos")] - internal sealed class Http3Connection : HttpConnectionBase, IDisposable + internal sealed class Http3Connection : HttpConnectionBase { // TODO: once HTTP/3 is standardized, create APIs for this. public static readonly SslApplicationProtocol Http3ApplicationProtocol29 = new SslApplicationProtocol("h3-29"); @@ -92,7 +92,7 @@ public Http3Connection(HttpConnectionPool pool, HttpAuthority? origin, HttpAutho /// /// Starts shutting down the . Final cleanup will happen when there are no more active requests. /// - public void Dispose() + public override void Dispose() { lock (SyncObj) { @@ -155,7 +155,7 @@ private void CheckForShutdown() } } - public override async Task SendAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken) + public async Task SendAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken) { Debug.Assert(async); @@ -329,6 +329,8 @@ public void RemoveStream(QuicStream stream) } } + public override long GetIdleTicks(long nowTicks) => throw new NotImplementedException("We aren't scavenging HTTP3 connections yet"); + public override void Trace(string message, [CallerMemberName] string? memberName = null) => Trace(0, message, memberName); diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs index 00e3f3b6fe322..7ef8ca09b06e0 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnection.cs @@ -17,7 +17,7 @@ namespace System.Net.Http { - internal sealed partial class HttpConnection : HttpConnectionBase, IDisposable + internal sealed partial class HttpConnection : HttpConnectionBase { /// Default size of the read buffer used for the connection. private const int InitialReadBufferSize = @@ -61,7 +61,9 @@ internal sealed partial class HttpConnection : HttpConnectionBase, IDisposable private int _readOffset; private int _readLength; + private long _idleSinceTickCount; private bool _inUse; + private bool _detachedFromPool; private bool _canRetry; private bool _startedSendingRequestBody; private bool _connectionClose; // Connection: close was seen on last response @@ -90,6 +92,8 @@ public HttpConnection( _weakThisRef = new WeakReference(this); + _idleSinceTickCount = Environment.TickCount64; + if (HttpTelemetry.Log.IsEnabled()) { HttpTelemetry.Log.Http11ConnectionEstablished(); @@ -101,7 +105,7 @@ public HttpConnection( ~HttpConnection() => Dispose(disposing: false); - public void Dispose() => Dispose(disposing: true); + public override void Dispose() => Dispose(disposing: true); private void Dispose(bool disposing) { @@ -118,7 +122,10 @@ private void Dispose(bool disposing) HttpTelemetry.Log.Http11ConnectionClosed(); } - _pool.DecrementConnectionCount(); + if (!_detachedFromPool) + { + _pool.InvalidateHttp11Connection(this, disposing); + } if (disposing) { @@ -187,7 +194,7 @@ public bool PrepareForReuse(bool async) /// Check whether a currently idle connection is still usable, or should be scavenged. /// True if connection can be used, false if it is invalid due to receiving EOF or unexpected data. - public bool CheckUsabilityOnScavenge() + public override bool CheckUsabilityOnScavenge() { // We may already have a read-ahead task if we did a previous scavenge and haven't used the connection since. if (_readAheadTask is null) @@ -231,6 +238,8 @@ async ValueTask ReadAheadWithZeroByteReadAsync() return null; } + public override long GetIdleTicks(long nowTicks) => nowTicks - _idleSinceTickCount; + public TransportContext? TransportContext => _transportContext; public HttpConnectionKind Kind => _pool.Kind; @@ -709,14 +718,24 @@ public async Task SendAsyncCore(HttpRequestMessage request, // Successful response to CONNECT does not have body. // What ever comes next should be opaque. responseStream = new RawConnectionStream(this); + // Don't put connection back to the pool if we upgraded to tunnel. // We cannot use it for normal HTTP requests any more. _connectionClose = true; + _pool.InvalidateHttp11Connection(this); + _detachedFromPool = true; } else if (response.StatusCode == HttpStatusCode.SwitchingProtocols) { responseStream = new RawConnectionStream(this); + + // Don't put connection back to the pool if we switched protocols. + // We cannot use it for normal HTTP requests any more. + _connectionClose = true; + + _pool.InvalidateHttp11Connection(this); + _detachedFromPool = true; } else if (response.Content.Headers.ContentLength != null) { @@ -802,7 +821,7 @@ public async Task SendAsyncCore(HttpRequestMessage request, } } - public sealed override Task SendAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken) => + public Task SendAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken) => SendAsyncCore(request, async, cancellationToken); private bool MapSendException(Exception exception, CancellationToken cancellationToken, out Exception mappedException) @@ -1932,6 +1951,17 @@ internal void Release() } } + /// + /// Detach the connection from the pool, so it is no longer counted against the connection limit. + /// This is used when we are creating a replacement connection for NT auth challenges. + /// + internal void DetachFromPool() + { + Debug.Assert(_inUse); + + _detachedFromPool = true; + } + private void CompleteResponse() { Debug.Assert(_currentRequest != null, "Expected the connection to be associated with a request."); @@ -2015,8 +2045,12 @@ private void ReturnConnectionToPool() } else { + Debug.Assert(!_detachedFromPool, "Should not be detached from pool unless _connectionClose is true"); + + _idleSinceTickCount = Environment.TickCount64; + // Put connection back in the pool. - _pool.ReturnConnection(this); + _pool.ReturnHttp11Connection(this); } } diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionBase.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionBase.cs index d0e82bc6d46b6..ac3c852dc844d 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionBase.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionBase.cs @@ -13,7 +13,7 @@ namespace System.Net.Http { - internal abstract class HttpConnectionBase : IHttpTrace + internal abstract class HttpConnectionBase : IDisposable, IHttpTrace { /// Cached string for the last Date header received on this connection. private string? _lastDateHeaderValue; @@ -39,7 +39,6 @@ static string GetOrAddCachedValue([NotNull] ref string? cache, HeaderDescriptor } } - public abstract Task SendAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken); public abstract void Trace(string message, [CallerMemberName] string? memberName = null); protected void TraceConnection(Stream stream) @@ -60,14 +59,15 @@ protected void TraceConnection(Stream stream) } } - public long CreationTickCount { get; } = Environment.TickCount64; + private readonly long _creationTickCount = Environment.TickCount64; - // Check if lifetime expired on connection. - public bool LifetimeExpired(long nowTicks, TimeSpan lifetime) - { - return lifetime != Timeout.InfiniteTimeSpan && - (lifetime == TimeSpan.Zero || (nowTicks - CreationTickCount) > lifetime.TotalMilliseconds); - } + public long GetLifetimeTicks(long nowTicks) => nowTicks - _creationTickCount; + + public abstract long GetIdleTicks(long nowTicks); + + /// Check whether a connection is still usable, or should be scavenged. + /// True if connection can be used. + public virtual bool CheckUsabilityOnScavenge() => true; internal static bool IsDigit(byte c) => (uint)(c - '0') <= '9' - '0'; @@ -126,5 +126,7 @@ static void LogFaulted(HttpConnectionBase connection, Task task) if (NetEventSource.Log.IsEnabled()) connection.Trace($"Exception from asynchronous processing: {e}"); } } + + public abstract void Dispose(); } } diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs index 480724dd8eb5d..c624cbd677e4f 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs @@ -13,7 +13,7 @@ using System.Net.Security; using System.Net.Sockets; using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; +using System.Runtime.ExceptionServices; using System.Runtime.Versioning; using System.Security.Authentication; using System.Text; @@ -64,15 +64,31 @@ internal sealed class HttpConnectionPool : IDisposable /// The time, in milliseconds, that an authority should remain in . private const int AltSvcBlocklistTimeoutInMilliseconds = 10 * 60 * 1000; - /// List of idle connections stored in the pool. - private readonly List _idleConnections = new List(); - /// The maximum number of connections allowed to be associated with the pool. - private readonly int _maxConnections; + // HTTP/1.1 connection pool + + /// List of available HTTP/1.1 connections stored in the pool. + private readonly List _availableHttp11Connections = new List(); + /// The maximum number of HTTP/1.1 connections allowed to be associated with the pool. + private readonly int _maxHttp11Connections; + /// The number of HTTP/1.1 connections associated with the pool, including in use, available, and pending. + private int _associatedHttp11ConnectionCount; + /// The number of HTTP/1.1 connections that are in the process of being established. + private int _pendingHttp11ConnectionCount; + /// Queue of requests waiting for an HTTP/1.1 connection. + private RequestQueue _http11RequestQueue; + + // HTTP/2 connection pool + + /// List of available HTTP/2 connections stored in the pool. + private List? _availableHttp2Connections; + /// The number of HTTP/2 connections associated with the pool, including in use, available, and pending. + private int _associatedHttp2ConnectionCount; + /// Indicates whether an HTTP/2 connection is in the process of being established. + private bool _pendingHttp2Connection; + /// Queue of requests waiting for an HTTP/2 connection. + private RequestQueue _http2RequestQueue; private bool _http2Enabled; - // This array must be treated as immutable. It can only be replaced with a new value in AddHttp2Connection method. - private volatile Http2Connection[]? _http2Connections; - private SemaphoreSlim? _http2ConnectionCreateLock; private byte[]? _http2AltSvcOriginUri; internal readonly byte[]? _http2EncodedAuthorityHostHeader; private readonly bool _http3Enabled; @@ -88,11 +104,6 @@ internal sealed class HttpConnectionPool : IDisposable private readonly SslClientAuthenticationOptions? _sslOptionsHttp2Only; private readonly SslClientAuthenticationOptions? _sslOptionsHttp3; - /// Queue of waiters waiting for a connection. Created on demand. - private Queue>? _waiters; - - /// The number of connections associated with the pool. Some of these may be in , others may be in use. - private int _associatedConnectionCount; /// Whether the pool has been used since the last time a cleanup occurred. private bool _usedSinceLastCleanup = true; /// Whether the pool has been disposed. @@ -113,7 +124,7 @@ public HttpConnectionPool(HttpConnectionPoolManager poolManager, HttpConnectionK _poolManager = poolManager; _kind = kind; _proxyUri = proxyUri; - _maxConnections = Settings._maxConnectionsPerServer; + _maxHttp11Connections = Settings._maxConnectionsPerServer; if (host != null) { @@ -183,7 +194,7 @@ public HttpConnectionPool(HttpConnectionPoolManager poolManager, HttpConnectionK // Don't enforce the max connections limit on proxy tunnels; this would mean that connections to different origin servers // would compete for the same limited number of connections. // We will still enforce this limit on the user of the tunnel (i.e. ProxyTunnel or SslProxyTunnel). - _maxConnections = int.MaxValue; + _maxHttp11Connections = int.MaxValue; _http2Enabled = false; _http3Enabled = false; @@ -364,10 +375,54 @@ public byte[] Http2AltSvcOriginUri } } - public bool EnableMultipleHttp2Connections => _poolManager.Settings.EnableMultipleHttp2Connections; + private bool EnableMultipleHttp2Connections => _poolManager.Settings.EnableMultipleHttp2Connections; /// Object used to synchronize access to state in the pool. - private object SyncObj => _idleConnections; + private object SyncObj + { + get + { + Debug.Assert(!Monitor.IsEntered(_availableHttp11Connections)); + return _availableHttp11Connections; + } + } + + private bool HasSyncObjLock => Monitor.IsEntered(_availableHttp11Connections); + + // Overview of connection management (mostly HTTP version independent): + // + // Each version of HTTP (1.1, 2, 3) has its own connection pool, and each of these work in a similar manner, + // allowing for differences between the versions (most notably, HTTP/1.1 is not multiplexed.) + // + // When a request is submitted for a particular version (e.g. HTTP/1.1), we first look in the pool for available connections. + // An "available" connection is one that is (hopefully) usable for a new request. + // For HTTP/1.1, this is just an idle connection. + // For HTTP2/3, this is a connection that (hopefully) has available streams to use for new requests. + // If we find an available connection, we will attempt to validate it and then use it. + // We check the lifetime of the connection and discard it if the lifetime is exceeded. + // We check that the connection has not shut down; if so we discard it. + // For HTTP2/3, we reserve a stream on the connection. If this fails, we cannot use the connection right now. + // If validation fails, we will attempt to find a different available connection. + // + // Once we have found a usable connection, we use it to process the request. + // For HTTP/1.1, a connection can handle only a single request at a time, thus it is immediately removed from the list of available connections. + // For HTTP2/3, a connection is only removed from the available list when it has no more available streams. + // In either case, the connection still counts against the total associated connection count for the pool. + // + // If we cannot find a usable available connection, then the request is added the to the request queue for the appropriate version. + // + // Whenever a request is queued, or an existing connection shuts down, we will check to see if we should inject a new connection. + // Injection policy depends on both user settings and some simple heuristics. + // See comments on the relevant routines for details on connection injection policy. + // + // When a new connection is successfully created, or an existing unavailable connection becomes available again, + // we will attempt to use this connection to handle any queued requests (subject to lifetime restrictions on existing connections). + // This may result in the connection becoming unavailable again, because it cannot handle any more requests at the moment. + // If not, we will return the connection to the pool as an available connection for use by new requests. + // + // When a connection shuts down, either gracefully (e.g. GOAWAY) or abortively (e.g. IOException), + // we will remove it from the list of available connections, if it is present there. + // If not, then it must be unavailable at the moment; we will detect this and ensure it is not added back to the available pool. private static HttpRequestException GetVersionException(HttpRequestMessage request, int desiredVersion) { @@ -376,288 +431,369 @@ private static HttpRequestException GetVersionException(HttpRequestMessage reque return new HttpRequestException(SR.Format(SR.net_http_requested_version_cannot_establish, request.Version, request.VersionPolicy, desiredVersion)); } - private ValueTask GetOrReserveHttp11ConnectionAsync(bool async, CancellationToken cancellationToken) + private bool CheckExpirationOnGet(HttpConnectionBase connection) { - long nowTicks = Environment.TickCount64; + TimeSpan pooledConnectionLifetime = _poolManager.Settings._pooledConnectionLifetime; + if (pooledConnectionLifetime != Timeout.InfiniteTimeSpan) + { + return connection.GetLifetimeTicks(Environment.TickCount64) > pooledConnectionLifetime.TotalMilliseconds; + } + return false; + } + + private static Exception CreateConnectTimeoutException(OperationCanceledException oce) + { + // The pattern for request timeouts (on HttpClient) is to throw an OCE with an inner exception of TimeoutException. + // Do the same for ConnectTimeout-based timeouts. + TimeoutException te = new TimeoutException(SR.net_http_connect_timedout, oce.InnerException); + Exception newException = CancellationHelper.CreateOperationCanceledException(te, oce.CancellationToken); + ExceptionDispatchInfo.SetCurrentStackTrace(newException); + return newException; + } + + private async Task AddHttp11ConnectionAsync(HttpRequestMessage request) + { + if (NetEventSource.Log.IsEnabled()) Trace("Creating new HTTP/1.1 connection for pool."); + + HttpConnection connection; + using (CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource()) + { + try + { + connection = await CreateHttp11ConnectionAsync(request, false, cts.Token).ConfigureAwait(false); + } + catch (OperationCanceledException oce) when (oce.CancellationToken == cts.Token) + { + HandleHttp11ConnectionFailure(CreateConnectTimeoutException(oce)); + return; + } + catch (Exception e) + { + HandleHttp11ConnectionFailure(e); + return; + } + } + + // Add the established connection to the pool. + ReturnHttp11Connection(connection, isNewConnection: true); + } + + private void CheckForHttp11ConnectionInjection() + { + Debug.Assert(HasSyncObjLock); + + if (!_http11RequestQueue.TryPeekNextRequest(out HttpRequestMessage? request)) + { + return; + } + + // Determine if we can and should add a new connection to the pool. + if (_availableHttp11Connections.Count == 0 && // No available connections + _http11RequestQueue.Count > _pendingHttp11ConnectionCount && // More requests queued than pending connections + _associatedHttp11ConnectionCount < _maxHttp11Connections) // Under the connection limit + { + _associatedHttp11ConnectionCount++; + _pendingHttp11ConnectionCount++; + + Task.Run(() => AddHttp11ConnectionAsync(request)); + } + } + + private async ValueTask GetHttp11ConnectionAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken) + { // Look for a usable idle connection. - TaskCompletionSourceWithCancellation waiter; + TaskCompletionSourceWithCancellation waiter; while (true) { - HttpConnection connection; + HttpConnection? connection = null; lock (SyncObj) { - int idleConnectionCount = _idleConnections.Count; - if (idleConnectionCount > 0) + _usedSinceLastCleanup = true; + + int availableConnectionCount = _availableHttp11Connections.Count; + if (availableConnectionCount > 0) { - // We have an idle connection that we can attempt to use. + // We have a connection that we can attempt to use. // Validate it below outside the lock, to avoid doing expensive operations while holding the lock. - connection = _idleConnections[idleConnectionCount - 1]._connection; - _idleConnections.RemoveAt(idleConnectionCount - 1); + connection = _availableHttp11Connections[availableConnectionCount - 1]; + _availableHttp11Connections.RemoveAt(availableConnectionCount - 1); } else { - // No available idle connections. - if (_associatedConnectionCount < _maxConnections) - { - // We are under the connection limit, so just increment the count and return null - // to indicate to the caller that they should create a new connection. - IncrementConnectionCountNoLock(); - return new ValueTask((HttpConnection?)null); - } - else - { - // We've reached the connection limit and need to wait for an existing connection - // to become available, or to be closed so that we can create a new connection. - // Enqueue a waiter that will be signalled when this happens. - // Break out of the loop and then do the actual wait below. - waiter = EnqueueWaiter(); - break; - } + // No available connections. Add to the request queue. + waiter = _http11RequestQueue.EnqueueRequest(request); + + CheckForHttp11ConnectionInjection(); - // Note that we don't check for _disposed. We may end up disposing the - // created connection when it's returned, but we don't want to block use - // of the pool if it's already been disposed, as there's a race condition - // between getting a pool and someone disposing of it, and we don't want - // to complicate the logic about trying to get a different pool when the - // retrieved one has been disposed of. In the future we could alternatively - // try returning such connections to whatever pool is currently considered - // current for that endpoint, if there is one. + // Break out of the loop and continue processing below. + break; } } - if (connection.LifetimeExpired(nowTicks, _poolManager.Settings._pooledConnectionLifetime)) + if (CheckExpirationOnGet(connection)) { - if (NetEventSource.Log.IsEnabled()) connection.Trace("Found expired connection in pool."); + if (NetEventSource.Log.IsEnabled()) connection.Trace("Found expired HTTP/1.1 connection in pool."); connection.Dispose(); continue; } if (!connection.PrepareForReuse(async)) { - if (NetEventSource.Log.IsEnabled()) connection.Trace("Found invalid connection in pool."); + if (NetEventSource.Log.IsEnabled()) connection.Trace("Found invalid HTTP/1.1 connection in pool."); connection.Dispose(); continue; } - if (NetEventSource.Log.IsEnabled()) connection.Trace("Found usable connection in pool."); - return new ValueTask(connection); + if (NetEventSource.Log.IsEnabled()) connection.Trace("Found usable HTTP/1.1 connection in pool."); + return connection; } - // We are at the connection limit. Wait for an available connection or connection count. - if (NetEventSource.Log.IsEnabled()) Trace($"{(async ? "As" : "S")}ynchronous request. Connection limit reached, waiting for available connection."); + // There were no available idle connections. This request has been added to the request queue. + if (NetEventSource.Log.IsEnabled()) Trace($"No available HTTP/1.1 connections; request queued."); - if (HttpTelemetry.Log.IsEnabled()) + ValueStopwatch stopwatch = ValueStopwatch.StartNew(); + try { - return WaitOnWaiterWithTelemetryAsync(waiter, async, cancellationToken); + return await waiter.WaitWithCancellationAsync(async, cancellationToken).ConfigureAwait(false); } - else + finally { - return waiter.WaitWithCancellationAsync(cancellationToken); + if (HttpTelemetry.Log.IsEnabled()) + { + HttpTelemetry.Log.Http11RequestLeftQueue(stopwatch.GetElapsedTime().TotalMilliseconds); + } } + } + + private async Task HandleHttp11Downgrade(HttpRequestMessage request, Socket? socket, Stream stream, TransportContext? transportContext, CancellationToken cancellationToken) + { + if (NetEventSource.Log.IsEnabled()) Trace("Server does not support HTTP2; disabling HTTP2 use and proceeding with HTTP/1.1 connection"); - static async ValueTask WaitOnWaiterWithTelemetryAsync(TaskCompletionSourceWithCancellation waiter, bool async, CancellationToken cancellationToken) + bool canUse = true; + lock (SyncObj) { - ValueStopwatch stopwatch = ValueStopwatch.StartNew(); + Debug.Assert(_pendingHttp2Connection); + Debug.Assert(_associatedHttp2ConnectionCount > 0); - HttpConnection? connection = await waiter.WaitWithCancellationAsync(cancellationToken).ConfigureAwait(false); + // Server does not support HTTP2. Disable further HTTP2 attempts. + _http2Enabled = false; + _associatedHttp2ConnectionCount--; + _pendingHttp2Connection = false; - HttpTelemetry.Log.Http11RequestLeftQueue(stopwatch.GetElapsedTime().TotalMilliseconds); - return connection; - } - } + // Signal to any queued HTTP2 requests that they must downgrade. + while (_http2RequestQueue.TryDequeueNextRequest(null)) + ; - // Returns null if HTTP2 cannot be used - private async ValueTask GetHttp2ConnectionAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken) - { - Debug.Assert(_kind == HttpConnectionKind.Https || _kind == HttpConnectionKind.SslProxyTunnel || _kind == HttpConnectionKind.Http || _kind == HttpConnectionKind.SocksTunnel || _kind == HttpConnectionKind.SslSocksTunnel); - - // See if we have an HTTP2 connection - Http2Connection? http2Connection = GetExistingHttp2Connection(); + if (_associatedHttp11ConnectionCount < _maxHttp11Connections) + { + _associatedHttp11ConnectionCount++; + _pendingHttp11ConnectionCount++; + } + else + { + // We are already at the limit for HTTP/1.1 connections, so do not proceed with this connection. + canUse = false; + } + } - if (http2Connection != null) + if (!canUse) { - // Connection exists and it is still good to use. - if (NetEventSource.Log.IsEnabled()) Trace("Using existing HTTP2 connection."); - _usedSinceLastCleanup = true; - return http2Connection; + if (NetEventSource.Log.IsEnabled()) Trace("Discarding downgraded HTTP/1.1 connection because HTTP/1.1 connection limit is exceeded"); + stream.Dispose(); } - // Ensure that the connection creation semaphore is created - if (_http2ConnectionCreateLock == null) + HttpConnection http11Connection; + try { - lock (SyncObj) - { - if (_http2ConnectionCreateLock == null) - { - _http2ConnectionCreateLock = new SemaphoreSlim(1); - } - } + // Note, the same CancellationToken from the original HTTP2 connection establishment still applies here. + http11Connection = await ConstructHttp11ConnectionAsync(false, socket, stream, transportContext, request, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException oce) when (oce.CancellationToken == cancellationToken) + { + HandleHttp11ConnectionFailure(CreateConnectTimeoutException(oce)); + return; + } + catch (Exception e) + { + HandleHttp11ConnectionFailure(e); + return; } - // Try to establish an HTTP2 connection - Socket? socket = null; - Stream? stream = null; - SslStream? sslStream = null; - TransportContext? transportContext = null; + ReturnHttp11Connection(http11Connection, isNewConnection: true); + } - // Serialize creation attempt - await _http2ConnectionCreateLock.WaitAsync(cancellationToken).ConfigureAwait(false); - try + private async Task AddHttp2ConnectionAsync(HttpRequestMessage request) + { + if (NetEventSource.Log.IsEnabled()) Trace("Creating new HTTP/2 connection for pool."); + + Http2Connection connection; + using (CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource()) { - http2Connection = GetExistingHttp2Connection(); - if (http2Connection != null) + try { - return http2Connection; - } + (Socket? socket, Stream stream, TransportContext? transportContext) = await ConnectAsync(request, false, cts.Token).ConfigureAwait(false); - // Recheck if HTTP2 has been disabled by a previous attempt. - if (_http2Enabled) - { - if (NetEventSource.Log.IsEnabled()) + if (IsSecure) { - Trace("Attempting new HTTP2 connection."); - } - - (socket, stream, transportContext) = - await ConnectAsync(request, async, cancellationToken).ConfigureAwait(false); + SslStream sslStream = (SslStream)stream; - Debug.Assert(stream != null); - - sslStream = stream as SslStream; + if (sslStream.NegotiatedApplicationProtocol == SslApplicationProtocol.Http2) + { + // The server accepted our request for HTTP2. - if (!IsSecure) - { - http2Connection = await ConstructHttp2ConnectionAsync(stream, request, cancellationToken).ConfigureAwait(false); + if (sslStream.SslProtocol < SslProtocols.Tls12) + { + stream.Dispose(); + throw new HttpRequestException(SR.Format(SR.net_ssl_http2_requires_tls12, sslStream.SslProtocol)); + } - if (NetEventSource.Log.IsEnabled()) + connection = await ConstructHttp2ConnectionAsync(stream, request, cts.Token).ConfigureAwait(false); + } + else { - Trace("New unencrypted HTTP2 connection established."); + // We established an SSL connection, but the server denied our request for HTTP2. + await HandleHttp11Downgrade(request, socket, stream, transportContext, cts.Token).ConfigureAwait(false); + return; } - - return http2Connection; } + else + { + connection = await ConstructHttp2ConnectionAsync(stream, request, cts.Token).ConfigureAwait(false); + } + } + catch (OperationCanceledException oce) when (oce.CancellationToken == cts.Token) + { + HandleHttp2ConnectionFailure(CreateConnectTimeoutException(oce)); + return; + } + catch (Exception e) + { + HandleHttp2ConnectionFailure(e); + return; + } + } - Debug.Assert(sslStream != null); + // Register for shutdown notification. + // Do this before we return the connection to the pool, because that may result in it being disposed. + ValueTask shutdownTask = connection.WaitForShutdownAsync(); - if (sslStream.NegotiatedApplicationProtocol == SslApplicationProtocol.Http2) - { - // The server accepted our request for HTTP2. + // Add the new connection to the pool. + ReturnHttp2Connection(connection, isNewConnection: true); - if (sslStream.SslProtocol < SslProtocols.Tls12) - { - sslStream.Dispose(); - throw new HttpRequestException(SR.Format(SR.net_ssl_http2_requires_tls12, sslStream.SslProtocol)); - } + // Wait for connection shutdown. + await shutdownTask.ConfigureAwait(false); - http2Connection = await ConstructHttp2ConnectionAsync(stream, request, cancellationToken).ConfigureAwait(false); + InvalidateHttp2Connection(connection); + } - if (NetEventSource.Log.IsEnabled()) - { - Trace("New HTTP2 connection established."); - } + private void CheckForHttp2ConnectionInjection() + { + Debug.Assert(HasSyncObjLock); - return http2Connection; - } - } - } - finally + if (!_http2RequestQueue.TryPeekNextRequest(out HttpRequestMessage? request)) { - _http2ConnectionCreateLock.Release(); + return; } - if (sslStream != null) + // Determine if we can and should add a new connection to the pool. + if ((_availableHttp2Connections?.Count ?? 0) == 0 && // No available connections + !_pendingHttp2Connection && // Only allow one pending HTTP2 connection at a time + (_associatedHttp2ConnectionCount == 0 || EnableMultipleHttp2Connections)) // We allow multiple connections, or don't have a connection currently { - // We established an SSL connection, but the server denied our request for HTTP2. - // Try to establish a 1.1 connection instead and add it to the pool. + _associatedHttp2ConnectionCount++; + _pendingHttp2Connection = true; - if (NetEventSource.Log.IsEnabled()) - { - Trace("Server does not support HTTP2; disabling HTTP2 use and proceeding with HTTP/1.1 connection"); - } + Task.Run(() => AddHttp2ConnectionAsync(request)); + } + } - bool canUse = true; + private async ValueTask GetHttp2ConnectionAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken) + { + Debug.Assert(_kind == HttpConnectionKind.Https || _kind == HttpConnectionKind.SslProxyTunnel || _kind == HttpConnectionKind.Http || _kind == HttpConnectionKind.SocksTunnel || _kind == HttpConnectionKind.SslSocksTunnel); + + // Look for a usable connection. + TaskCompletionSourceWithCancellation waiter; + while (true) + { + Http2Connection connection; lock (SyncObj) { - // Server does not support HTTP2. Disable further HTTP2 attempts. - _http2Enabled = false; + _usedSinceLastCleanup = true; - if (_associatedConnectionCount < _maxConnections) + if (!_http2Enabled) { - IncrementConnectionCountNoLock(); + return null; + } + + int availableConnectionCount = _availableHttp2Connections?.Count ?? 0; + if (availableConnectionCount > 0) + { + // We have a connection that we can attempt to use. + // Validate it below outside the lock, to avoid doing expensive operations while holding the lock. + connection = _availableHttp2Connections![availableConnectionCount - 1]; } else { - // We are already at the limit for HTTP/1.1 connections, so do not proceed with this connection. - canUse = false; + // No available connections. Add to the request queue. + waiter = _http2RequestQueue.EnqueueRequest(request); + + CheckForHttp2ConnectionInjection(); + + // Break out of the loop and continue processing below. + break; } } - if (canUse) + if (CheckExpirationOnGet(connection)) { - HttpConnection http11Connection = await ConstructHttp11ConnectionAsync(async, socket, stream!, transportContext, request, cancellationToken).ConfigureAwait(false); - ReturnConnection(http11Connection); + if (NetEventSource.Log.IsEnabled()) connection.Trace("Found expired HTTP/2 connection in pool."); + + InvalidateHttp2Connection(connection); + continue; } - else + + if (!connection.TryReserveStream()) { - if (NetEventSource.Log.IsEnabled()) + if (NetEventSource.Log.IsEnabled()) connection.Trace("Found HTTP/2 connection in pool without available streams."); + + bool found = false; + lock (SyncObj) { - Trace("Discarding downgraded HTTP/1.1 connection because connection limit is exceeded"); + int index = _availableHttp2Connections.IndexOf(connection); + if (index != -1) + { + found = true; + _availableHttp2Connections.RemoveAt(index); + } } - stream!.Dispose(); + // If we didn't find the connection, then someone beat us to removing it (or it shut down) + if (found) + { + DisableHttp2Connection(connection); + } + continue; } - } - - // We are unable to use HTTP2. - return null; - } - - private Http2Connection? GetExistingHttp2Connection() - { - Http2Connection[]? localConnections = _http2Connections; - if (localConnections == null) - { - return null; + if (NetEventSource.Log.IsEnabled()) connection.Trace("Found usable HTTP/2 connection in pool."); + return connection; } - for (int i = 0; i < localConnections.Length; i++) - { - Http2Connection http2Connection = localConnections[i]; + // There were no available connections. This request has been added to the request queue. + if (NetEventSource.Log.IsEnabled()) Trace($"No available HTTP/2 connections; request queued."); - TimeSpan pooledConnectionLifetime = _poolManager.Settings._pooledConnectionLifetime; - if (http2Connection.LifetimeExpired(Environment.TickCount64, pooledConnectionLifetime)) - { - // Connection expired. - if (NetEventSource.Log.IsEnabled()) http2Connection.Trace("Found expired HTTP2 connection."); - http2Connection.Dispose(); - InvalidateHttp2Connection(http2Connection); - } - else if (!EnableMultipleHttp2Connections || http2Connection.CanAddNewStream) - { - return http2Connection; - } + ValueStopwatch stopwatch = ValueStopwatch.StartNew(); + try + { + return await waiter.WaitWithCancellationAsync(async, cancellationToken).ConfigureAwait(false); } - - return null; - } - - private void AddHttp2Connection(Http2Connection newConnection) - { - lock (SyncObj) + finally { - Http2Connection[]? localHttp2Connections = _http2Connections; - int newCollectionSize = localHttp2Connections == null ? 1 : localHttp2Connections.Length + 1; - Http2Connection[] newHttp2Connections = new Http2Connection[newCollectionSize]; - newHttp2Connections[0] = newConnection; - - if (localHttp2Connections != null) + if (HttpTelemetry.Log.IsEnabled()) { - Array.Copy(localHttp2Connections, 0, newHttp2Connections, 1, localHttp2Connections.Length); + HttpTelemetry.Log.Http20RequestLeftQueue(stopwatch.GetElapsedTime().TotalMilliseconds); } - - _http2Connections = newHttp2Connections; } } @@ -673,8 +809,7 @@ private async ValueTask GetHttp3ConnectionAsync(HttpRequestMess if (http3Connection != null) { - TimeSpan pooledConnectionLifetime = _poolManager.Settings._pooledConnectionLifetime; - if (http3Connection.LifetimeExpired(Environment.TickCount64, pooledConnectionLifetime) || http3Connection.Authority != authority) + if (CheckExpirationOnGet(http3Connection) || http3Connection.Authority != authority) { // Connection expired. if (NetEventSource.Log.IsEnabled()) http3Connection.Trace("Found expired HTTP3 connection."); @@ -831,21 +966,7 @@ private async ValueTask GetHttp3ConnectionAsync(HttpRequestMess private async ValueTask SendUsingHttp11Async(HttpRequestMessage request, bool async, bool doRequestAuth, CancellationToken cancellationToken) { - HttpConnection? connection = await GetOrReserveHttp11ConnectionAsync(async, cancellationToken).ConfigureAwait(false); - if (connection is null) - { - if (NetEventSource.Log.IsEnabled()) Trace("Creating new HTTP/1.1 connection for pool."); - - try - { - connection = await CreateHttp11ConnectionAsync(request, async, cancellationToken).ConfigureAwait(false); - } - catch - { - DecrementConnectionCount(); - throw; - } - } + HttpConnection connection = await GetHttp11ConnectionAsync(request, async, cancellationToken).ConfigureAwait(false); // In case we are doing Windows (i.e. connection-based) auth, we need to ensure that we hold on to this specific connection while auth is underway. connection.Acquire(); @@ -1235,68 +1356,53 @@ public ValueTask SendAsync(HttpRequestMessage request, bool return SendWithProxyAuthAsync(request, async, doRequestAuth, cancellationToken); } + private CancellationTokenSource GetConnectTimeoutCancellationTokenSource() => new CancellationTokenSource(Settings._connectTimeout); + private async ValueTask<(Socket?, Stream, TransportContext?)> ConnectAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken) { - // If a non-infinite connect timeout has been set, create and use a new CancellationToken that will be canceled - // when either the original token is canceled or a connect timeout occurs. - CancellationTokenSource? cancellationWithConnectTimeout = null; - if (Settings._connectTimeout != Timeout.InfiniteTimeSpan) - { - cancellationWithConnectTimeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - cancellationWithConnectTimeout.CancelAfter(Settings._connectTimeout); - cancellationToken = cancellationWithConnectTimeout.Token; - } - - try + Stream? stream = null; + Socket? socket = null; + switch (_kind) { - Stream? stream = null; - Socket? socket = null; - switch (_kind) - { - case HttpConnectionKind.Http: - case HttpConnectionKind.Https: - case HttpConnectionKind.ProxyConnect: - Debug.Assert(_originAuthority != null); - (socket, stream) = await ConnectToTcpHostAsync(_originAuthority.IdnHost, _originAuthority.Port, request, async, cancellationToken).ConfigureAwait(false); - break; - - case HttpConnectionKind.Proxy: - (socket, stream) = await ConnectToTcpHostAsync(_proxyUri!.IdnHost, _proxyUri.Port, request, async, cancellationToken).ConfigureAwait(false); - break; - - case HttpConnectionKind.ProxyTunnel: - case HttpConnectionKind.SslProxyTunnel: - stream = await EstablishProxyTunnelAsync(async, request.HasHeaders ? request.Headers : null, cancellationToken).ConfigureAwait(false); - break; + case HttpConnectionKind.Http: + case HttpConnectionKind.Https: + case HttpConnectionKind.ProxyConnect: + Debug.Assert(_originAuthority != null); + (socket, stream) = await ConnectToTcpHostAsync(_originAuthority.IdnHost, _originAuthority.Port, request, async, cancellationToken).ConfigureAwait(false); + break; - case HttpConnectionKind.SocksTunnel: - case HttpConnectionKind.SslSocksTunnel: - (socket, stream) = await EstablishSocksTunnel(request, async, cancellationToken).ConfigureAwait(false); - break; - } + case HttpConnectionKind.Proxy: + (socket, stream) = await ConnectToTcpHostAsync(_proxyUri!.IdnHost, _proxyUri.Port, request, async, cancellationToken).ConfigureAwait(false); + break; - Debug.Assert(stream != null); - if (socket is null && stream is NetworkStream ns) - { - // We weren't handed a socket directly. But if we're able to extract one, do so. - // Most likely case here is a ConnectCallback was used and returned a NetworkStream. - socket = ns.Socket; - } + case HttpConnectionKind.ProxyTunnel: + case HttpConnectionKind.SslProxyTunnel: + stream = await EstablishProxyTunnelAsync(async, request.HasHeaders ? request.Headers : null, cancellationToken).ConfigureAwait(false); + break; - TransportContext? transportContext = null; - if (IsSecure) - { - SslStream sslStream = await ConnectHelper.EstablishSslConnectionAsync(GetSslOptionsForRequest(request), request, async, stream, cancellationToken).ConfigureAwait(false); - transportContext = sslStream.TransportContext; - stream = sslStream; - } + case HttpConnectionKind.SocksTunnel: + case HttpConnectionKind.SslSocksTunnel: + (socket, stream) = await EstablishSocksTunnel(request, async, cancellationToken).ConfigureAwait(false); + break; + } - return (socket, stream, transportContext); + Debug.Assert(stream != null); + if (socket is null && stream is NetworkStream ns) + { + // We weren't handed a socket directly. But if we're able to extract one, do so. + // Most likely case here is a ConnectCallback was used and returned a NetworkStream. + socket = ns.Socket; } - finally + + TransportContext? transportContext = null; + if (IsSecure) { - cancellationWithConnectTimeout?.Dispose(); + SslStream sslStream = await ConnectHelper.EstablishSslConnectionAsync(GetSslOptionsForRequest(request), request, async, stream, cancellationToken).ConfigureAwait(false); + transportContext = sslStream.TransportContext; + stream = sslStream; } + + return (socket, stream, transportContext); } private async ValueTask<(Socket?, Stream)> ConnectToTcpHostAsync(string host, int port, HttpRequestMessage initialRequest, bool async, CancellationToken cancellationToken) @@ -1357,8 +1463,7 @@ public ValueTask SendAsync(HttpRequestMessage request, bool internal async ValueTask CreateHttp11ConnectionAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken) { - (Socket? socket, Stream? stream, TransportContext? transportContext) = - await ConnectAsync(request, async, cancellationToken).ConfigureAwait(false); + (Socket? socket, Stream stream, TransportContext? transportContext) = await ConnectAsync(request, async, cancellationToken).ConfigureAwait(false); return await ConstructHttp11ConnectionAsync(async, socket, stream!, transportContext, request, cancellationToken).ConfigureAwait(false); } @@ -1403,6 +1508,11 @@ private async ValueTask ApplyPlaintextFilterAsync(bool async, Stream str newStream = await streamTask.ConfigureAwait(false); } + catch (OperationCanceledException oce) when (oce.CancellationToken == cancellationToken) + { + stream.Dispose(); + throw; + } catch (Exception e) { stream.Dispose(); @@ -1444,8 +1554,6 @@ private async ValueTask ConstructHttp2ConnectionAsync(Stream st throw new HttpRequestException(SR.net_http_client_execution_error, e); } - AddHttp2Connection(http2Connection); - return http2Connection; } @@ -1501,133 +1609,136 @@ private async ValueTask EstablishProxyTunnelAsync(bool async, HttpReques return (socket, stream); } - /// Enqueues a waiter to the waiters list. - private TaskCompletionSourceWithCancellation EnqueueWaiter() + private void HandleHttp11ConnectionFailure(Exception e) { - Debug.Assert(Monitor.IsEntered(SyncObj)); - Debug.Assert(Settings._maxConnectionsPerServer != int.MaxValue); - Debug.Assert(_idleConnections.Count == 0, $"With {_idleConnections.Count} idle connections, we shouldn't have a waiter."); + if (NetEventSource.Log.IsEnabled()) Trace("HTTP/1.1 connection failed"); - if (_waiters == null) + lock (SyncObj) { - _waiters = new Queue>(); - } + Debug.Assert(_associatedHttp11ConnectionCount > 0); + Debug.Assert(_pendingHttp11ConnectionCount > 0); - var waiter = new TaskCompletionSourceWithCancellation(); - _waiters.Enqueue(waiter); - return waiter; - } - - private void IncrementConnectionCountNoLock() - { - Debug.Assert(Monitor.IsEntered(SyncObj), $"Expected to be holding {nameof(SyncObj)}"); + _associatedHttp11ConnectionCount--; + _pendingHttp11ConnectionCount--; - if (NetEventSource.Log.IsEnabled()) Trace(null); - _usedSinceLastCleanup = true; + // Fail the next queued request (if any) with this error. + _http11RequestQueue.TryFailNextRequest(e); - Debug.Assert( - _associatedConnectionCount >= 0 && _associatedConnectionCount < _maxConnections, - $"Expected 0 <= {_associatedConnectionCount} < {_maxConnections}"); - _associatedConnectionCount++; + CheckForHttp11ConnectionInjection(); + } } - internal void IncrementConnectionCount() + private void HandleHttp2ConnectionFailure(Exception e) { + if (NetEventSource.Log.IsEnabled()) Trace("HTTP2 connection failed"); + lock (SyncObj) { - IncrementConnectionCountNoLock(); - } - } + Debug.Assert(_associatedHttp2ConnectionCount > 0); + Debug.Assert(_pendingHttp2Connection); - private bool TransferConnection(HttpConnection? connection) - { - Debug.Assert(Monitor.IsEntered(SyncObj)); + _associatedHttp2ConnectionCount--; + _pendingHttp2Connection = false; - if (_waiters == null) - { - return false; - } + // Fail the next queued request (if any) with this error. + _http2RequestQueue.TryFailNextRequest(e); - Debug.Assert(_maxConnections != int.MaxValue, "_waiters queue is allocated but no connection limit is set??"); + CheckForHttp2ConnectionInjection(); + } + } - while (_waiters.TryDequeue(out TaskCompletionSourceWithCancellation? waiter)) + /// + /// Called when an HttpConnection from this pool is no longer usable. + /// Note, this is always called from HttpConnection.Dispose, which is a bit different than how HTTP2 works. + /// + public void InvalidateHttp11Connection(HttpConnection connection, bool disposing = true) + { + lock (SyncObj) { - Debug.Assert(_idleConnections.Count == 0, $"With {_idleConnections.Count} idle connections, we shouldn't have a waiter."); + Debug.Assert(_associatedHttp11ConnectionCount > 0); + Debug.Assert(!disposing || !_availableHttp11Connections.Contains(connection)); - // Try to complete the task. If it's been cancelled already, this will fail. - if (waiter.TrySetResult(connection)) - { - return true; - } + _associatedHttp11ConnectionCount--; - // Couldn't transfer to that waiter because it was cancelled. Try again. - Debug.Assert(waiter.Task.IsCanceled); + CheckForHttp11ConnectionInjection(); } - - return false; } /// - /// Decrements the number of connections associated with the pool. - /// If there are waiters on the pool due to having reached the maximum, - /// this will instead try to transfer the count to one of them. + /// Called when an Http2Connection from this pool is no longer usable. /// - public void DecrementConnectionCount() + public void InvalidateHttp2Connection(Http2Connection connection) { - if (NetEventSource.Log.IsEnabled()) Trace(null); + if (NetEventSource.Log.IsEnabled()) connection.Trace(""); + + bool found = false; lock (SyncObj) { - Debug.Assert(_associatedConnectionCount > 0 && _associatedConnectionCount <= _maxConnections, - $"Expected 0 < {_associatedConnectionCount} <= {_maxConnections}"); - - // Mark the pool as not being stale. - _usedSinceLastCleanup = true; - - if (TransferConnection(null)) + if (_availableHttp2Connections is not null) { - if (NetEventSource.Log.IsEnabled()) Trace("Transferred connection count to waiter."); - return; + Debug.Assert(_associatedHttp2ConnectionCount >= _availableHttp2Connections.Count); + + int index = _availableHttp2Connections.IndexOf(connection); + if (index != -1) + { + found = true; + _availableHttp2Connections.RemoveAt(index); + _associatedHttp2ConnectionCount--; + } } - // There are no waiters to which the count should logically be transferred, - // so simply decrement the count. - _associatedConnectionCount--; + CheckForHttp2ConnectionInjection(); + } + + // If we found the connection in the available list, then dispose it now. + // Otherwise, when we try to put it back in the pool, we will see it is shut down and dispose it (and adjust connection counts). + if (found) + { + connection.Dispose(); + } + } + + private bool CheckExpirationOnReturn(HttpConnectionBase connection) + { + TimeSpan lifetime = _poolManager.Settings._pooledConnectionLifetime; + if (lifetime != Timeout.InfiniteTimeSpan) + { + return lifetime == TimeSpan.Zero || connection.GetLifetimeTicks(Environment.TickCount64) > lifetime.TotalMilliseconds; } + + return false; } - /// Returns the connection to the pool for subsequent reuse. - /// The connection to return. - public void ReturnConnection(HttpConnection connection) + public void ReturnHttp11Connection(HttpConnection connection, bool isNewConnection = false) { - if (connection.LifetimeExpired(Environment.TickCount64, _poolManager.Settings._pooledConnectionLifetime)) + if (NetEventSource.Log.IsEnabled()) connection.Trace($"{nameof(isNewConnection)}={isNewConnection}"); + + if (!isNewConnection && CheckExpirationOnReturn(connection)) { - if (NetEventSource.Log.IsEnabled()) connection.Trace("Disposing connection return to pool. Connection lifetime expired."); + if (NetEventSource.Log.IsEnabled()) connection.Trace("Disposing HTTP/1.1 connection return to pool. Connection lifetime expired."); connection.Dispose(); return; } - List list = _idleConnections; lock (SyncObj) { - Debug.Assert(list.Count <= _maxConnections, $"Expected {list.Count} <= {_maxConnections}"); - - // Mark the pool as still being active. - _usedSinceLastCleanup = true; + Debug.Assert(!_availableHttp11Connections.Contains(connection)); - // If there's someone waiting for a connection and this one's still valid, simply transfer this one to them rather than pooling it. - // Note that while we checked connection lifetime above, we don't check idle timeout, as even if idle timeout - // is zero, we consider a connection that's just handed from one use to another to never actually be idle. - if (TransferConnection(connection)) + if (isNewConnection) { - if (NetEventSource.Log.IsEnabled()) connection.Trace("Transferred connection to waiter."); - return; + Debug.Assert(_pendingHttp11ConnectionCount > 0); + _pendingHttp11ConnectionCount--; } - if (_poolManager.Settings._pooledConnectionIdleTimeout == TimeSpan.Zero) + if (_http11RequestQueue.TryDequeueNextRequest(connection)) { - if (NetEventSource.Log.IsEnabled()) connection.Trace("Disposing connection returned to pool. Zero idle timeout."); + Debug.Assert(_availableHttp11Connections.Count == 0, $"With {_availableHttp11Connections.Count} available HTTP/1.1 connections, we shouldn't have a waiter."); + + if (NetEventSource.Log.IsEnabled()) connection.Trace("Dequeued waiting HTTP/1.1 request."); + return; } - else if (_disposed) + + if (_disposed) { // If the pool has been disposed of, dispose the connection being returned, // as the pool is being deactivated. We do this after the above in order to @@ -1637,55 +1748,150 @@ public void ReturnConnection(HttpConnection connection) } else { - // Pool the connection by adding it to the list. - list.Add(new CachedConnection(connection)); - if (NetEventSource.Log.IsEnabled()) connection.Trace("Stored connection in pool."); + // Add connection to the pool. + _availableHttp11Connections.Add(connection); + Debug.Assert(_availableHttp11Connections.Count <= _maxHttp11Connections, $"Expected {_availableHttp11Connections.Count} <= {_maxHttp11Connections}"); + if (NetEventSource.Log.IsEnabled()) connection.Trace("Put connection in pool."); return; } } + // We determined that the connection is no longer usable. connection.Dispose(); } - public void InvalidateHttp2Connection(Http2Connection connection) + public void ReturnHttp2Connection(Http2Connection connection, bool isNewConnection) { + if (NetEventSource.Log.IsEnabled()) connection.Trace($"{nameof(isNewConnection)}={isNewConnection}"); + + if (!isNewConnection && CheckExpirationOnReturn(connection)) + { + lock (SyncObj) + { + Debug.Assert(_availableHttp2Connections is null || !_availableHttp2Connections.Contains(connection)); + Debug.Assert(_associatedHttp2ConnectionCount > (_availableHttp2Connections?.Count ?? 0)); + _associatedHttp2ConnectionCount--; + } + + if (NetEventSource.Log.IsEnabled()) connection.Trace("Disposing HTTP/2 connection return to pool. Connection lifetime expired."); + connection.Dispose(); + return; + } + + bool usable = true; + bool poolDisposed = false; lock (SyncObj) { - Http2Connection[]? localHttp2Connections = _http2Connections; + Debug.Assert(_availableHttp2Connections is null || !_availableHttp2Connections.Contains(connection)); + Debug.Assert(_associatedHttp2ConnectionCount > (_availableHttp2Connections?.Count ?? 0)); - if (localHttp2Connections == null) + if (isNewConnection) { - return; + Debug.Assert(_pendingHttp2Connection); + _pendingHttp2Connection = false; } - if (localHttp2Connections.Length == 1) + while (!_http2RequestQueue.IsEmpty) { - // Fast shortcut for the most common case. - if (localHttp2Connections[0] == connection) + Debug.Assert((_availableHttp2Connections?.Count ?? 0) == 0, $"With {_availableHttp11Connections.Count} available HTTP2 connections, we shouldn't have a waiter."); + + if (!connection.TryReserveStream()) { - _http2Connections = null; + usable = false; + if (isNewConnection) + { + // The new connection could not handle even one request, either because it shut down before we could use it for any requests, + // or because it immediately set the max concurrent streams limit to 0. + // We don't want to get stuck in a loop where we keep trying to create new connections for the same request. + // Fail the next request, if any. + HttpRequestException hre = new HttpRequestException(SR.net_http_http2_connection_not_established); + ExceptionDispatchInfo.SetCurrentStackTrace(hre); + _http2RequestQueue.TryFailNextRequest(hre); + } + break; } - return; - } - int invalidatedIndex = Array.IndexOf(localHttp2Connections, connection); - if (invalidatedIndex >= 0) - { - Http2Connection[] newHttp2Connections = new Http2Connection[localHttp2Connections.Length - 1]; + isNewConnection = false; - if (invalidatedIndex > 0) + if (!_http2RequestQueue.TryDequeueNextRequest(connection)) { - Array.Copy(localHttp2Connections, newHttp2Connections, invalidatedIndex); + connection.ReleaseStream(); + break; } - if (invalidatedIndex < localHttp2Connections.Length - 1) + if (NetEventSource.Log.IsEnabled()) connection.Trace("Dequeued waiting HTTP/2 request."); + } + + if (_disposed) + { + // If the pool has been disposed of, we want to dispose the connection being returned, as the pool is being deactivated. + // We do this after the above in order to satisfy any requests that were queued before the pool was disposed of. + Debug.Assert(_associatedHttp2ConnectionCount > (_availableHttp2Connections?.Count ?? 0)); + _associatedHttp2ConnectionCount--; + poolDisposed = true; + } + else if (usable) + { + if (_availableHttp2Connections is null) { - Array.Copy(localHttp2Connections, invalidatedIndex + 1, newHttp2Connections, invalidatedIndex, newHttp2Connections.Length - invalidatedIndex); + _availableHttp2Connections = new List(); } - _http2Connections = newHttp2Connections; + // Add connection to the pool. + _availableHttp2Connections.Add(connection); + if (NetEventSource.Log.IsEnabled()) connection.Trace("Put HTTP/2 connection in pool."); + return; } } + + if (poolDisposed) + { + if (NetEventSource.Log.IsEnabled()) connection.Trace("Disposing HTTP/2 connection returned to pool. Pool was disposed."); + connection.Dispose(); + return; + } + + Debug.Assert(!usable); + + // We need to wait until the connection is usable again. + DisableHttp2Connection(connection); + } + + /// + /// Disable usage of the specified connection because it cannot handle any more streams at the moment. + /// We will register to be notified when it can handle more streams (or becomes permanently unusable). + /// + private void DisableHttp2Connection(Http2Connection connection) + { + if (NetEventSource.Log.IsEnabled()) connection.Trace(""); + + Task.Run(async () => + { + bool usable = await connection.WaitForAvailableStreamsAsync().ConfigureAwait(false); + + if (NetEventSource.Log.IsEnabled()) connection.Trace($"WaitForAvailableStreamsAsync completed, {nameof(usable)}={usable}"); + + if (usable) + { + ReturnHttp2Connection(connection, isNewConnection: false); + } + else + { + // Connection has shut down. + lock (SyncObj) + { + Debug.Assert(_availableHttp2Connections is null || !_availableHttp2Connections.Contains(connection)); + Debug.Assert(_associatedHttp2ConnectionCount > 0); + + _associatedHttp2ConnectionCount--; + + CheckForHttp2ConnectionInjection(); + } + + if (NetEventSource.Log.IsEnabled()) connection.Trace("HTTP2 connection no longer usable"); + connection.Dispose(); + } + }); } public void InvalidateHttp3Connection(Http3Connection connection) @@ -1705,25 +1911,34 @@ public void InvalidateHttp3Connection(Http3Connection connection) /// public void Dispose() { - List list = _idleConnections; + List? toDispose = null; + lock (SyncObj) { if (!_disposed) { if (NetEventSource.Log.IsEnabled()) Trace("Disposing pool."); + _disposed = true; - list.ForEach(c => c._connection.Dispose()); - list.Clear(); - if (_http2Connections != null) + toDispose = new List(_availableHttp11Connections.Count + (_availableHttp2Connections?.Count ?? 0)); + toDispose.AddRange(_availableHttp11Connections); + if (_availableHttp2Connections is not null) { - for (int i = 0; i < _http2Connections.Length; i++) - { - _http2Connections[i].Dispose(); - } - _http2Connections = null; + toDispose.AddRange(_availableHttp2Connections); } + // Note: Http11 connections will decrement the _associatedHttp11ConnectionCount when disposed. + // Http2 connections will not, hence the difference in handing _associatedHttp2ConnectionCount. + + Debug.Assert(_associatedHttp11ConnectionCount >= _availableHttp11Connections.Count, + $"Expected {nameof(_associatedHttp11ConnectionCount)}={_associatedHttp11ConnectionCount} >= {nameof(_availableHttp11Connections)}.Count={_availableHttp11Connections.Count}"); + _availableHttp11Connections.Clear(); + + Debug.Assert(_associatedHttp2ConnectionCount >= (_availableHttp2Connections?.Count ?? 0)); + _associatedHttp2ConnectionCount -= (_availableHttp2Connections?.Count ?? 0); + _availableHttp2Connections?.Clear(); + if (_authorityExpireTimer != null) { _authorityExpireTimer.Dispose(); @@ -1737,8 +1952,13 @@ public void Dispose() _altSvcBlocklistTimerCancellation = null; } } - Debug.Assert(list.Count == 0, $"Expected {nameof(list)}.{nameof(list.Count)} == 0"); + + Debug.Assert(_availableHttp11Connections.Count == 0, $"Expected {nameof(_availableHttp11Connections)}.{nameof(_availableHttp11Connections.Count)} == 0"); + Debug.Assert((_availableHttp2Connections?.Count ?? 0) == 0, $"Expected {nameof(_availableHttp2Connections)}.{nameof(_availableHttp2Connections.Count)} == 0"); } + + // Dispose outside the lock to avoid lock re-entrancy issues. + toDispose?.ForEach(c => c.Dispose()); } /// @@ -1753,79 +1973,60 @@ public bool CleanCacheAndDisposeIfUnused() TimeSpan pooledConnectionLifetime = _poolManager.Settings._pooledConnectionLifetime; TimeSpan pooledConnectionIdleTimeout = _poolManager.Settings._pooledConnectionIdleTimeout; - List list = _idleConnections; - List? toDispose = null; - bool tookLock = false; + List? toDispose = null; - try + lock (SyncObj) { - if (NetEventSource.Log.IsEnabled()) Trace("Cleaning pool."); - Monitor.Enter(SyncObj, ref tookLock); + // If there are now no connections associated with this pool, we can dispose of it. We + // avoid aggressively cleaning up pools that have recently been used but currently aren't; + // if a pool was used since the last time we cleaned up, give it another chance. New pools + // start out saying they've recently been used, to give them a bit of breathing room and time + // for the initial collection to be added to it. + if (!_usedSinceLastCleanup && _associatedHttp11ConnectionCount == 0 && _associatedHttp2ConnectionCount == 0) + { + _disposed = true; + return true; // Pool is disposed of. It should be removed. + } + + // Reset the cleanup flag. Any pools that are empty and not used since the last cleanup + // will be purged next time around. + _usedSinceLastCleanup = false; - // Get the current time. This is compared against each connection's last returned - // time to determine whether a connection is too old and should be closed. long nowTicks = Environment.TickCount64; - // Copy the reference to a local variable to simplify the removal logic below. - Http2Connection[]? localHttp2Connections = _http2Connections; - if (localHttp2Connections != null) + ScavengeConnectionList(_availableHttp11Connections, ref toDispose, nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout); + if (_availableHttp2Connections is not null) { - Http2Connection[]? newHttp2Connections = null; - int newIndex = 0; - for (int i = 0; i < localHttp2Connections.Length; i++) - { - Http2Connection http2Connection = localHttp2Connections[i]; - if (http2Connection.IsExpired(nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout)) - { - http2Connection.Dispose(); + int removed = ScavengeConnectionList(_availableHttp2Connections, ref toDispose, nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout); + _associatedHttp2ConnectionCount -= removed; - if (newHttp2Connections == null) - { - newHttp2Connections = new Http2Connection[localHttp2Connections.Length]; - if (i > 0) - { - // Copy valid connections residing at the beggining of the current collection. - Array.Copy(localHttp2Connections, newHttp2Connections, i); - newIndex = i; - } - } - } - else if (newHttp2Connections != null) - { - newHttp2Connections[newIndex] = localHttp2Connections[i]; - newIndex++; - } - } - - if (newHttp2Connections != null) - { - //Some connections have been removed, so _http2Connections must be replaced. - if (newIndex > 0) - { - Array.Resize(ref newHttp2Connections, newIndex); - _http2Connections = newHttp2Connections; - } - else - { - // All connections expired. - _http2Connections = null; - } - } + // Note: Http11 connections will decrement the _associatedHttp11ConnectionCount when disposed. + // Http2 connections will not, hence the difference in handing _associatedHttp2ConnectionCount. } + } + + // Dispose the stale connections outside the pool lock, to avoid holding the lock too long. + toDispose?.ForEach(c => c.Dispose()); + + // Pool is active. Should not be removed. + return false; - // Find the first item which needs to be removed. + static int ScavengeConnectionList(List list, ref List? toDispose, long nowTicks, TimeSpan pooledConnectionLifetime, TimeSpan pooledConnectionIdleTimeout) + where T : HttpConnectionBase + { int freeIndex = 0; - while (freeIndex < list.Count && list[freeIndex].IsUsable(nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout)) + while (freeIndex < list.Count && IsUsableConnection(list[freeIndex], nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout)) { freeIndex++; } // If freeIndex == list.Count, nothing needs to be removed. // But if it's < list.Count, at least one connection needs to be purged. + int removed = 0; if (freeIndex < list.Count) { // We know the connection at freeIndex is unusable, so dispose of it. - toDispose = new List { list[freeIndex]._connection }; + toDispose ??= new List { list[freeIndex] }; // Find the first item after the one to be removed that should be kept. int current = freeIndex + 1; @@ -1833,9 +2034,9 @@ public bool CleanCacheAndDisposeIfUnused() { // Look for the first item to be kept. Along the way, any // that shouldn't be kept are disposed of. - while (current < list.Count && !list[current].IsUsable(nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout)) + while (current < list.Count && !IsUsableConnection(list[current], nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout)) { - toDispose.Add(list[current]._connection); + toDispose.Add(list[current]); current++; } @@ -1851,38 +2052,45 @@ public bool CleanCacheAndDisposeIfUnused() // At this point, good connections have been moved below freeIndex, and garbage connections have // been added to the dispose list, so clear the end of the list past freeIndex. - list.RemoveRange(freeIndex, list.Count - freeIndex); - - // If there are now no connections associated with this pool, we can dispose of it. We - // avoid aggressively cleaning up pools that have recently been used but currently aren't; - // if a pool was used since the last time we cleaned up, give it another chance. New pools - // start out saying they've recently been used, to give them a bit of breathing room and time - // for the initial collection to be added to it. - if (_associatedConnectionCount == 0 && !_usedSinceLastCleanup && _http2Connections == null) - { - Debug.Assert(list.Count == 0, $"Expected {nameof(list)}.{nameof(list.Count)} == 0"); - _disposed = true; - return true; // Pool is disposed of. It should be removed. - } + removed = list.Count - freeIndex; + list.RemoveRange(freeIndex, removed); } - // Reset the cleanup flag. Any pools that are empty and not used since the last cleanup - // will be purged next time around. - _usedSinceLastCleanup = false; + return removed; } - finally + + static bool IsUsableConnection(HttpConnectionBase connection, long nowTicks, TimeSpan pooledConnectionLifetime, TimeSpan pooledConnectionIdleTimeout) { - if (tookLock) + // Validate that the connection hasn't been idle in the pool for longer than is allowed. + if (pooledConnectionIdleTimeout != Timeout.InfiniteTimeSpan) { - Monitor.Exit(SyncObj); + long idleTicks = connection.GetIdleTicks(nowTicks); + if (idleTicks > pooledConnectionIdleTimeout.TotalMilliseconds) + { + if (NetEventSource.Log.IsEnabled()) connection.Trace($"Scavenging connection. Idle {TimeSpan.FromMilliseconds(idleTicks)} > {pooledConnectionIdleTimeout}."); + return false; + } } - // Dispose the stale connections outside the pool lock. - toDispose?.ForEach(c => c.Dispose()); - } + // Validate that the connection lifetime has not been exceeded. + if (pooledConnectionLifetime != Timeout.InfiniteTimeSpan) + { + long lifetimeTicks = connection.GetLifetimeTicks(nowTicks); + if (lifetimeTicks > pooledConnectionLifetime.TotalMilliseconds) + { + if (NetEventSource.Log.IsEnabled()) connection.Trace($"Scavenging connection. Lifetime {TimeSpan.FromMilliseconds(lifetimeTicks)} > {pooledConnectionLifetime}."); + return false; + } + } - // Pool is active. Should not be removed. - return false; + if (!connection.CheckUsabilityOnScavenge()) + { + if (NetEventSource.Log.IsEnabled()) connection.Trace($"Scavenging connection. Unexpected data or EOF received."); + return false; + } + + return true; + } } /// Gets whether we're running on Windows 7 or Windows 2008 R2. @@ -1900,8 +2108,13 @@ private static bool GetIsWindows7Or2008R2() internal void HeartBeat() { - Http2Connection[]? localHttp2Connections = _http2Connections; - if (localHttp2Connections != null) + Http2Connection[]? localHttp2Connections; + lock (SyncObj) + { + localHttp2Connections = _availableHttp2Connections?.ToArray(); + } + + if (localHttp2Connections is not null) { foreach (Http2Connection http2Connection in localHttp2Connections) { @@ -1910,7 +2123,6 @@ internal void HeartBeat() } } - // For diagnostic purposes public override string ToString() => $"{nameof(HttpConnectionPool)} " + @@ -1930,67 +2142,89 @@ private void Trace(string? message, [CallerMemberName] string? memberName = null memberName, // method name message); // message - /// A cached idle connection and metadata about it. - [StructLayout(LayoutKind.Auto)] - private readonly struct CachedConnection : IEquatable + private struct RequestQueue { - /// The cached connection. - internal readonly HttpConnection _connection; - /// The last tick count at which the connection was used. - internal readonly long _returnedTickCount; - - /// Initializes the cached connection and its associated metadata. - /// The connection. - public CachedConnection(HttpConnection connection) - { - Debug.Assert(connection != null); - _connection = connection; - _returnedTickCount = Environment.TickCount64; - } - - /// Gets whether the connection is currently usable. - /// The current tick count. Passed in to amortize the cost of calling Environment.TickCount. - /// How long a connection can be open to be considered reusable. - /// How long a connection can have been idle in the pool to be considered reusable. - /// - /// true if we believe the connection can be reused; otherwise, false. There is an inherent race condition here, - /// in that the server could terminate the connection or otherwise make it unusable immediately after we check it, - /// but there's not much difference between that and starting to use the connection and then having the server - /// terminate it, which would be considered a failure, so this race condition is largely benign and inherent to - /// the nature of connection pooling. - /// - public bool IsUsable( - long nowTicks, - TimeSpan pooledConnectionLifetime, - TimeSpan pooledConnectionIdleTimeout) + private struct QueueItem { - // Validate that the connection hasn't been idle in the pool for longer than is allowed. - if ((pooledConnectionIdleTimeout != Timeout.InfiniteTimeSpan) && - ((nowTicks - _returnedTickCount) > pooledConnectionIdleTimeout.TotalMilliseconds)) + public HttpRequestMessage Request; + public TaskCompletionSourceWithCancellation Waiter; + } + + private Queue? _queue; + + public TaskCompletionSourceWithCancellation EnqueueRequest(HttpRequestMessage request) + { + if (_queue is null) { - if (NetEventSource.Log.IsEnabled()) _connection.Trace($"Scavenging connection. Idle {TimeSpan.FromMilliseconds((nowTicks - _returnedTickCount))} > {pooledConnectionIdleTimeout}."); - return false; + _queue = new Queue(); } - // Validate that the connection lifetime has not been exceeded. - if (_connection.LifetimeExpired(nowTicks, pooledConnectionLifetime)) + TaskCompletionSourceWithCancellation waiter = new TaskCompletionSourceWithCancellation(); + _queue.Enqueue(new QueueItem { Request = request, Waiter = waiter }); + return waiter; + } + + public bool TryFailNextRequest(Exception e) + { + Debug.Assert(e is HttpRequestException or OperationCanceledException, "Unexpected exception type for connection failure"); + + if (_queue is not null) { - if (NetEventSource.Log.IsEnabled()) _connection.Trace($"Scavenging connection. Lifetime {TimeSpan.FromMilliseconds((nowTicks - _connection.CreationTickCount))} > {pooledConnectionLifetime}."); - return false; + // Fail the next queued request (if any) with this error. + while (_queue.TryDequeue(out QueueItem item)) + { + // Try to complete the waiter task. If it's been cancelled already, this will fail. + if (item.Waiter.TrySetException(e)) + { + return true; + } + + // Couldn't transfer to that waiter because it was cancelled. Try again. + Debug.Assert(item.Waiter.Task.IsCanceled); + } } - if (!_connection.CheckUsabilityOnScavenge()) + return false; + } + + public bool TryDequeueNextRequest(T connection) + { + if (_queue is not null) { - if (NetEventSource.Log.IsEnabled()) _connection.Trace($"Scavenging connection. Unexpected data or EOF received."); - return false; + while (_queue.TryDequeue(out QueueItem item)) + { + // Try to complete the task. If it's been cancelled already, this will return false. + if (item.Waiter.TrySetResult(connection)) + { + return true; + } + + // Couldn't transfer to that waiter because it was cancelled. Try again. + Debug.Assert(item.Waiter.Task.IsCanceled); + } } - return true; + return false; + } + + public bool TryPeekNextRequest([NotNullWhen(true)] out HttpRequestMessage? request) + { + if (_queue is not null) + { + if (_queue.TryPeek(out QueueItem item)) + { + request = item.Request; + return true; + } + } + + request = null; + return false; } - public bool Equals(CachedConnection other) => ReferenceEquals(other._connection, _connection); - public override bool Equals([NotNullWhen(true)] object? obj) => obj is CachedConnection && Equals((CachedConnection)obj); - public override int GetHashCode() => _connection?.GetHashCode() ?? 0; + public bool IsEmpty => Count == 0; + + public int Count => (_queue?.Count ?? 0); } } } diff --git a/src/libraries/System.Net.Http/tests/FunctionalTests/HttpClientHandlerTest.Http2.cs b/src/libraries/System.Net.Http/tests/FunctionalTests/HttpClientHandlerTest.Http2.cs index 39e125abe1ce1..c9c5db60d0f00 100644 --- a/src/libraries/System.Net.Http/tests/FunctionalTests/HttpClientHandlerTest.Http2.cs +++ b/src/libraries/System.Net.Http/tests/FunctionalTests/HttpClientHandlerTest.Http2.cs @@ -129,7 +129,7 @@ public async Task Http2_DataSentBeforeServerPreface_ProtocolError() DataFrame invalidFrame = new DataFrame(new byte[10], FrameFlags.Padded, 10, 1); await connection.WriteFrameAsync(invalidFrame); - await AssertProtocolErrorAsync(sendTask, ProtocolErrors.PROTOCOL_ERROR); + await Assert.ThrowsAsync(() => sendTask); } } @@ -244,10 +244,10 @@ public async Task Http2_ServerSendsValidSettingsValues_Success() } [ConditionalTheory(nameof(SupportsAlpn))] - [InlineData(SettingId.MaxFrameSize, 16383, ProtocolErrors.PROTOCOL_ERROR)] - [InlineData(SettingId.MaxFrameSize, 162777216, ProtocolErrors.PROTOCOL_ERROR)] - [InlineData(SettingId.InitialWindowSize, 0x80000000, ProtocolErrors.FLOW_CONTROL_ERROR)] - public async Task Http2_ServerSendsInvalidSettingsValue_Error(SettingId settingId, uint value, ProtocolErrors expectedError) + [InlineData(SettingId.MaxFrameSize, 16383)] + [InlineData(SettingId.MaxFrameSize, 162777216)] + [InlineData(SettingId.InitialWindowSize, 0x80000000)] + public async Task Http2_ServerSendsInvalidSettingsValue_Error(SettingId settingId, uint value) { using (Http2LoopbackServer server = Http2LoopbackServer.CreateServer()) using (HttpClient client = CreateHttpClient()) @@ -257,7 +257,7 @@ public async Task Http2_ServerSendsInvalidSettingsValue_Error(SettingId settingI // Send invalid initial SETTINGS value Http2LoopbackConnection connection = await server.EstablishConnectionAsync(new SettingsEntry { SettingId = settingId, Value = value }); - await AssertProtocolErrorAsync(sendTask, expectedError); + await Assert.ThrowsAsync(() => sendTask); connection.Dispose(); } @@ -3383,16 +3383,15 @@ await server.AcceptConnectionAsync(async connection => options.ServerCertificate = Net.Test.Common.Configuration.Certificates.GetServerCertificate(); options.ApplicationProtocols = new List() { SslApplicationProtocol.Http2 }; options.ApplicationProtocols.Add(SslApplicationProtocol.Http2); + // Negotiate TLS. await sslStream.AuthenticateAsServerAsync(options, CancellationToken.None).ConfigureAwait(false); + // Send back HTTP/1.1 response await sslStream.WriteAsync(Encoding.ASCII.GetBytes("HTTP/1.1 400 Unrecognized request\r\n\r\n"), CancellationToken.None); }); - Exception e = await Assert.ThrowsAsync(() => requestTask); - Assert.NotNull(e.InnerException); - Assert.False(e.InnerException is ObjectDisposedException); }); } } diff --git a/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.Cancellation.cs b/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.Cancellation.cs index 47716f3c2c987..3f7f506aaf355 100644 --- a/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.Cancellation.cs +++ b/src/libraries/System.Net.Http/tests/FunctionalTests/SocketsHttpHandlerTest.Cancellation.cs @@ -18,10 +18,11 @@ protected SocketsHttpHandler_Cancellation_Test(ITestOutputHelper output) : base( private async Task ValidateConnectTimeout(HttpMessageInvoker invoker, Uri uri, int minElapsed, int maxElapsed) { var sw = Stopwatch.StartNew(); - await Assert.ThrowsAnyAsync(() => + var oce = await Assert.ThrowsAnyAsync(() => invoker.SendAsync(TestAsync, new HttpRequestMessage(HttpMethod.Get, uri) { Version = UseVersion }, default)); sw.Stop(); + Assert.IsType(oce.InnerException); Assert.InRange(sw.ElapsedMilliseconds, minElapsed, maxElapsed); } @@ -51,8 +52,10 @@ await LoopbackServerFactory.CreateClientAndServerAsync(async uri => } [OuterLoop] - [Fact] - public async Task ConnectTimeout_ConnectCallbackTimesOut_Throws() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task ConnectTimeout_ConnectCallbackTimesOut_Throws(bool useSsl) { if (UseVersion == HttpVersion.Version30) { @@ -71,7 +74,49 @@ await LoopbackServerFactory.CreateClientAndServerAsync(async uri => await ValidateConnectTimeout(invoker, uri, 500, 85_000); } - }, server => Task.CompletedTask); // doesn't actually connect to server + }, server => Task.CompletedTask, // doesn't actually connect to server + options: new GenericLoopbackOptions() { UseSsl = useSsl }); + } + + [OuterLoop] + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task ConnectTimeout_PlaintextStreamFilterTimesOut_Throws(bool useSsl) + { + if (UseVersion == HttpVersion.Version30) + { + // HTTP3 does not support PlaintextStreamFilter + return; + } + + var releaseServer = new TaskCompletionSource(); + await LoopbackServerFactory.CreateClientAndServerAsync(async uri => + { + using (var handler = CreateHttpClientHandler()) + using (var invoker = new HttpMessageInvoker(handler)) + { + handler.ServerCertificateCustomValidationCallback = TestHelper.AllowAllCertificates; + var socketsHandler = GetUnderlyingSocketsHttpHandler(handler); + socketsHandler.ConnectTimeout = TimeSpan.FromSeconds(1); + socketsHandler.PlaintextStreamFilter = async (context, token) => { await Task.Delay(-1, token); return null; }; + + await ValidateConnectTimeout(invoker, uri, 500, 85_000); + + releaseServer.SetResult(); + } + }, + async server => + { + try + { + await server.AcceptConnectionAsync(async c => + { + await releaseServer.Task; + }); + } + catch (Exception) { } // Eat exception from client timing out + }, options: new GenericLoopbackOptions() { UseSsl = useSsl }); } [OuterLoop("Incurs significant delay")] diff --git a/src/libraries/System.Net.Http/tests/FunctionalTests/TelemetryTest.cs b/src/libraries/System.Net.Http/tests/FunctionalTests/TelemetryTest.cs index dc4f2ef45ab08..660a93df25be0 100644 --- a/src/libraries/System.Net.Http/tests/FunctionalTests/TelemetryTest.cs +++ b/src/libraries/System.Net.Http/tests/FunctionalTests/TelemetryTest.cs @@ -552,10 +552,6 @@ private static void ValidateEventCounters(ConcurrentQueue<(EventWrittenEventArgs Assert.Contains(http11requestQueueDurations, d => d > 0); Assert.All(http11requestQueueDurations, d => Assert.True(d >= 0)); } - else - { - Assert.All(http11requestQueueDurations, d => Assert.True(d == 0)); - } Assert.True(eventCounters.TryGetValue("http20-requests-queue-duration", out double[] http20requestQueueDurations)); Assert.Equal(0, http20requestQueueDurations[^1]); @@ -564,10 +560,6 @@ private static void ValidateEventCounters(ConcurrentQueue<(EventWrittenEventArgs Assert.Contains(http20requestQueueDurations, d => d > 0); Assert.All(http20requestQueueDurations, d => Assert.True(d >= 0)); } - else - { - Assert.All(http20requestQueueDurations, d => Assert.True(d == 0)); - } } [OuterLoop] @@ -655,12 +647,19 @@ await GetFactoryForVersion(version).CreateClientAndServerAsync( ValidateConnectionEstablishedClosed(events, version); - (EventWrittenEventArgs requestLeftQueue, Guid requestLeftQueueId) = Assert.Single(events, e => e.Event.EventName == "RequestLeftQueue"); - Assert.Equal(3, requestLeftQueue.Payload.Count); - Assert.True((double)requestLeftQueue.Payload.Count > 0); // timeSpentOnQueue - Assert.Equal(version.Major, (byte)requestLeftQueue.Payload[1]); - Assert.Equal(version.Minor, (byte)requestLeftQueue.Payload[2]); + var requestLeftEvents = events.Where(e => e.Event.EventName == "RequestLeftQueue"); + Assert.Equal(2, requestLeftEvents.Count()); + + foreach (var (e, _) in requestLeftEvents) + { + Assert.Equal(3, e.Payload.Count); + Assert.True((double)e.Payload[0] > 0); // timeSpentOnQueue + Assert.Equal(version.Major, (byte)e.Payload[1]); + Assert.Equal(version.Minor, (byte)e.Payload[2]); + + } + Guid requestLeftQueueId = requestLeftEvents.Last().ActivityId; Assert.Equal(requestLeftQueueId, events.Where(e => e.Event.EventName == "RequestStart").Last().ActivityId); ValidateRequestResponseStartStopEvents(events, requestContentLength: null, responseContentLength: 0, count: 3);