From 78b5f40a60d9e095abb2b0aabd8c062b171fb9ab Mon Sep 17 00:00:00 2001 From: Miha Zupan Date: Fri, 22 Mar 2024 20:06:47 +0100 Subject: [PATCH] Improve the throughput of SocketsHttpHandler's HTTP/1.1 connection pool (#99364) * Improve the throughput of SocketsHttpHandler's HTTP/1.1 connection pool * More comments * Switch to ConcurrentStack --- .../SocketsHttpHandler/HttpConnectionPool.cs | 469 ++++++++++++------ 1 file changed, 308 insertions(+), 161 deletions(-) diff --git a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs index 11e220b9a9e8b..552661e68bed5 100644 --- a/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs +++ b/src/libraries/System.Net.Http/src/System/Net/Http/SocketsHttpHandler/HttpConnectionPool.cs @@ -1,6 +1,8 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Buffers; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; @@ -8,7 +10,6 @@ using System.IO; using System.Net.Http.Headers; using System.Net.Http.HPack; -using System.Net.Http.Metrics; using System.Net.Http.QPack; using System.Net.Quic; using System.Net.Security; @@ -69,8 +70,10 @@ internal sealed class HttpConnectionPool : IDisposable // HTTP/1.1 connection pool - /// List of available HTTP/1.1 connections stored in the pool. - private readonly List _availableHttp11Connections = new List(); + /// Stack of currently available HTTP/1.1 connections stored in the pool. + private readonly ConcurrentStack _http11Connections = new(); + /// Controls whether we can use a fast path when returning connections to the pool and skip calling into . + private bool _http11RequestQueueIsEmptyAndNotDisposed; /// The maximum number of HTTP/1.1 connections allowed to be associated with the pool. private readonly int _maxHttp11Connections; /// The number of HTTP/1.1 connections associated with the pool, including in use, available, and pending. @@ -401,12 +404,12 @@ private object SyncObj { get { - Debug.Assert(!Monitor.IsEntered(_availableHttp11Connections)); - return _availableHttp11Connections; + Debug.Assert(!Monitor.IsEntered(_http11Connections)); + return _http11Connections; } } - public bool HasSyncObjLock => Monitor.IsEntered(_availableHttp11Connections); + public bool HasSyncObjLock => Monitor.IsEntered(_http11Connections); // Overview of connection management (mostly HTTP version independent): // @@ -516,7 +519,7 @@ private async Task AddHttp11ConnectionAsync(RequestQueue.QueueIt if (connection is not null) { // Add the established connection to the pool. - ReturnHttp11Connection(connection, isNewConnection: true, queueItem.Waiter); + AddNewHttp11Connection(connection, queueItem.Waiter); } else { @@ -532,14 +535,14 @@ private void CheckForHttp11ConnectionInjection() _http11RequestQueue.PruneCompletedRequestsFromHeadOfQueue(this); // Determine if we can and should add a new connection to the pool. - bool willInject = _availableHttp11Connections.Count == 0 && // No available connections - _http11RequestQueue.Count > _pendingHttp11ConnectionCount && // More requests queued than pending connections - _associatedHttp11ConnectionCount < _maxHttp11Connections && // Under the connection limit - _http11RequestQueue.RequestsWithoutAConnectionAttempt > 0; // There are requests we haven't issued a connection attempt for + bool willInject = + _http11RequestQueue.Count > _pendingHttp11ConnectionCount && // More requests queued than pending connections + _associatedHttp11ConnectionCount < _maxHttp11Connections && // Under the connection limit + _http11RequestQueue.RequestsWithoutAConnectionAttempt > 0; // There are requests we haven't issued a connection attempt for if (NetEventSource.Log.IsEnabled()) { - Trace($"Available HTTP/1.1 connections: {_availableHttp11Connections.Count}, Requests in the queue: {_http11RequestQueue.Count}, " + + Trace($"Available HTTP/1.1 connections: {_http11Connections.Count}, Requests in the queue: {_http11RequestQueue.Count}, " + $"Requests without a connection attempt: {_http11RequestQueue.RequestsWithoutAConnectionAttempt}, " + $"Pending HTTP/1.1 connections: {_pendingHttp11ConnectionCount}, Total associated HTTP/1.1 connections: {_associatedHttp11ConnectionCount}, " + $"Max HTTP/1.1 connection limit: {_maxHttp11Connections}, " + @@ -556,36 +559,114 @@ private void CheckForHttp11ConnectionInjection() } } - private bool TryGetPooledHttp11Connection(HttpRequestMessage request, bool async, [NotNullWhen(true)] out HttpConnection? connection, [NotNullWhen(false)] out HttpConnectionWaiter? waiter) + /// + /// This method is called: + ///
- When returning a connection and observing that the request queue is not empty ( is ). + ///
- After adding a request to the queue if we fail to obtain a connection from . + ///
- After scavenging or disposing the pool to ensure that any pending requests are handled or connections disposed. + /// The method will attempt to match one request from the to an available connection. + /// The can either be provided as an argument (when returning a connection to the pool), or one will be rented from . + /// As we'll only process a single request, we are expecting the method to be called every time a request is enqueued, and every time a connection is returned while the request queue is not empty. + /// If the becomes empty, this method will reset the flag back to , + /// such that returning connections will use the fast path again and skip calling into this method. + /// Notably, this method will not be called on the fast path as long as we have enough connections to handle all new requests. + ///
+ /// The connection to use for a pending request, or return to the pool. + private void ProcessHttp11RequestQueue(HttpConnection? connection) { + // Loop in case the request we try to signal was already cancelled or handled by a different connection. while (true) { + HttpConnectionWaiter? waiter = null; + lock (SyncObj) { - _usedSinceLastCleanup = true; +#if DEBUG + // Other threads may still interact with the connections stack. Read the count once to keep the assert message accurate. + int connectionCount = _http11Connections.Count; + Debug.Assert(_associatedHttp11ConnectionCount >= connectionCount + _pendingHttp11ConnectionCount, + $"Expected {_associatedHttp11ConnectionCount} >= {connectionCount} + {_pendingHttp11ConnectionCount}"); +#endif + Debug.Assert(_associatedHttp11ConnectionCount <= _maxHttp11Connections, + $"Expected {_associatedHttp11ConnectionCount} <= {_maxHttp11Connections}"); + Debug.Assert(_associatedHttp11ConnectionCount >= _pendingHttp11ConnectionCount, + $"Expected {_associatedHttp11ConnectionCount} >= {_pendingHttp11ConnectionCount}"); - int availableConnectionCount = _availableHttp11Connections.Count; - if (availableConnectionCount > 0) + if (_http11RequestQueue.Count != 0) { - // We have a connection that we can attempt to use. - // Validate it below outside the lock, to avoid doing expensive operations while holding the lock. - connection = _availableHttp11Connections[availableConnectionCount - 1]; - _availableHttp11Connections.RemoveAt(availableConnectionCount - 1); + if (connection is not null || _http11Connections.TryPop(out connection)) + { + // TryDequeueWaiter will prune completed requests from the head of the queue, + // so it's possible for it to return false even though we checked that Count != 0. + bool success = _http11RequestQueue.TryDequeueWaiter(this, out waiter); + Debug.Assert(success == waiter is not null); + } } - else + + // Update the empty queue flag now. + // If the request queue is now empty, returning connections will use the fast path and skip calling into this method. + _http11RequestQueueIsEmptyAndNotDisposed = _http11RequestQueue.Count == 0 && !_disposed; + + if (waiter is null) { - // No available connections. Add to the request queue. - waiter = _http11RequestQueue.EnqueueRequest(request); + // We didn't find a waiter to signal, or there were no connections available. - CheckForHttp11ConnectionInjection(); + if (connection is not null) + { + // A connection was provided to this method, or we rented one from the pool. + // Return it back to the pool since we're not going to use it yet. + + // We're returning it while holding the lock to avoid a scenario where + // - thread A sees no requests are waiting in the queue (current thread) + // - thread B adds a request to the queue, and sees no connections are available + // - thread A returns the connection to the pool + // We'd have both a connection and a request waiting in the pool, but nothing to pair the two. + + // The main scenario where we'll reach this branch is when we enqueue a request to the queue + // and set the _http11RequestQueueIsEmptyAndNotDisposed flag to false, followed by multiple + // returning connections observing the flag and calling into this method before we clear the flag. + // This should be a relatively rare case, so the added contention should be minimal. + _http11Connections.Push(connection); + } + else + { + CheckForHttp11ConnectionInjection(); + } - // There were no available idle connections. This request has been added to the request queue. - if (NetEventSource.Log.IsEnabled()) Trace($"No available HTTP/1.1 connections; request queued."); - connection = null; - return false; + break; } } + Debug.Assert(connection is not null); + + if (TrySignalWaiter(waiter, connection)) + { + // Success. Note that we did not call connection.PrepareForReuse + // before signaling the waiter. This is intentional, as the fact that + // this method was called indicates that the connection is either new, + // or was just returned to the pool and is still in a good state. + return; + } + + // The request was already cancelled or handled by a different connection. + // Loop again to try to find another request to signal, or return the connection. + } + + if (_disposed) + { + // The pool is being disposed and there are no more requests to handle. + // Clean up any idle connections still waiting in the pool. + while (_http11Connections.TryPop(out connection)) + { + connection.Dispose(); + } + } + } + + private bool TryGetPooledHttp11Connection(HttpRequestMessage request, bool async, [NotNullWhen(true)] out HttpConnection? connection, [NotNullWhen(false)] out HttpConnectionWaiter? waiter) + { + while (_http11Connections.TryPop(out connection)) + { if (CheckExpirationOnGet(connection)) { if (NetEventSource.Log.IsEnabled()) connection.Trace("Found expired HTTP/1.1 connection in pool."); @@ -604,6 +685,27 @@ private bool TryGetPooledHttp11Connection(HttpRequestMessage request, bool async waiter = null; return true; } + + // Slow path - no available connection found. + // Push the request onto the request queue and check if we should inject a new connection. + + waiter = new HttpConnectionWaiter(); + + // Technically this block under the lock could be a part of ProcessHttp11RequestQueue to avoid taking the lock twice. + // It is kept separate to simplify that method (avoid extra arguments that are only relevant for this caller). + lock (SyncObj) + { + _http11RequestQueue.EnqueueRequest(request, waiter); + + // Disable the fast path and force connections returned to the pool to check the request queue first. + _http11RequestQueueIsEmptyAndNotDisposed = false; + } + + // Other threads may have added a connection to the pool before we were able to + // add the request to the queue, so we must check for an available connection again. + + ProcessHttp11RequestQueue(null); + return false; } private async Task HandleHttp11Downgrade(HttpRequestMessage request, Stream stream, TransportContext? transportContext, IPEndPoint? remoteEndPoint, CancellationToken cancellationToken) @@ -677,7 +779,7 @@ private async Task HandleHttp11Downgrade(HttpRequestMessage request, Stream stre return; } - ReturnHttp11Connection(http11Connection, isNewConnection: true); + AddNewHttp11Connection(http11Connection, initialRequestWaiter: null); } private async Task AddHttp2ConnectionAsync(RequestQueue.QueueItem queueItem) @@ -772,7 +874,7 @@ private void CheckForHttp2ConnectionInjection() if (NetEventSource.Log.IsEnabled()) { Trace($"Available HTTP/2.0 connections: {availableHttp2ConnectionCount}, " + - $"Pending HTTP/2.0 connection: {_pendingHttp2Connection}" + + $"Pending HTTP/2.0 connection: {_pendingHttp2Connection}, " + $"Requests in the queue: {_http2RequestQueue.Count}, " + $"Requests without a connection attempt: {_http2RequestQueue.RequestsWithoutAConnectionAttempt}, " + $"Total associated HTTP/2.0 connections: {_associatedHttp2ConnectionCount}, " + @@ -798,8 +900,6 @@ private bool TryGetPooledHttp2Connection(HttpRequestMessage request, [NotNullWhe { lock (SyncObj) { - _usedSinceLastCleanup = true; - if (!_http2Enabled) { waiter = null; @@ -888,7 +988,6 @@ private async ValueTask GetHttp3ConnectionAsync(HttpRequestMess { // Connection exists and it is still good to use. if (NetEventSource.Log.IsEnabled()) Trace("Using existing HTTP3 connection."); - _usedSinceLastCleanup = true; return http3Connection; } } @@ -1024,6 +1123,8 @@ private void ProcessAltSvc(HttpResponseMessage response) public async ValueTask SendWithVersionDetectionAndRetryAsync(HttpRequestMessage request, bool async, bool doRequestAuth, CancellationToken cancellationToken) { + _usedSinceLastCleanup = true; + // Loop on connection failures (or other problems like version downgrade) and retry if possible. int retryCount = 0; while (true) @@ -1848,7 +1949,7 @@ public void InvalidateHttp11Connection(HttpConnection connection, bool disposing lock (SyncObj) { Debug.Assert(_associatedHttp11ConnectionCount > 0); - Debug.Assert(!disposing || !_availableHttp11Connections.Contains(connection)); + Debug.Assert(!disposing || Array.IndexOf(_http11Connections.ToArray(), connection) < 0); _associatedHttp11ConnectionCount--; @@ -1901,99 +2002,107 @@ private bool CheckExpirationOnReturn(HttpConnectionBase connection) return false; } - public void RecycleHttp11Connection(HttpConnection connection) => ReturnHttp11Connection(connection, false); + public void RecycleHttp11Connection(HttpConnection connection) + { + if (CheckExpirationOnReturn(connection)) + { + if (NetEventSource.Log.IsEnabled()) connection.Trace("Disposing HTTP/1.1 connection when returning to pool. Connection lifetime expired."); + connection.Dispose(); + return; + } - private void ReturnHttp11Connection(HttpConnection connection, bool isNewConnection, HttpConnectionWaiter? initialRequestWaiter = null) + ReturnHttp11Connection(connection); + } + + private void AddNewHttp11Connection(HttpConnection connection, HttpConnectionWaiter? initialRequestWaiter) { - if (NetEventSource.Log.IsEnabled()) connection.Trace($"{nameof(isNewConnection)}={isNewConnection}"); + if (NetEventSource.Log.IsEnabled()) Trace(""); - Debug.Assert(isNewConnection || initialRequestWaiter is null, "Shouldn't have a request unless the connection is new"); + lock (SyncObj) + { + Debug.Assert(_pendingHttp11ConnectionCount > 0); + _pendingHttp11ConnectionCount--; - if (!isNewConnection && CheckExpirationOnReturn(connection)) + if (initialRequestWaiter is not null) + { + // If we're about to signal the initial waiter, that request must be removed from the queue if it was at the head to avoid rooting it forever. + // Normally, TryDequeueWaiter would handle the removal. TryDequeueSpecificWaiter matches this behavior for the initial request case. + // We don't care if this fails; that means the request was previously canceled, handled by a different connection, or not at the head of the queue. + _http11RequestQueue.TryDequeueSpecificWaiter(initialRequestWaiter); + + // There's no need for us to hold the lock while signaling the waiter. + } + } + + if (initialRequestWaiter is not null && + TrySignalWaiter(initialRequestWaiter, connection)) { - if (NetEventSource.Log.IsEnabled()) connection.Trace("Disposing HTTP/1.1 connection return to pool. Connection lifetime expired."); - connection.Dispose(); return; } - // Loop in case we get a request that has already been canceled or handled by a different connection. - while (true) + ReturnHttp11Connection(connection); + } + + private void ReturnHttp11Connection(HttpConnection connection) + { + connection.MarkConnectionAsIdle(); + + // The fast path when there are enough connections and no pending requests + // is that we'll see _http11RequestQueueIsEmptyAndNotDisposed being true both + // times, and all we'll have to do as part of returning the connection is + // a Push call on the concurrent stack. + + if (Volatile.Read(ref _http11RequestQueueIsEmptyAndNotDisposed)) { - HttpConnectionWaiter? waiter = null; - bool added = false; - lock (SyncObj) - { - Debug.Assert(!_availableHttp11Connections.Contains(connection), $"Connection already in available list"); - Debug.Assert(_associatedHttp11ConnectionCount > _availableHttp11Connections.Count, - $"Expected _associatedHttp11ConnectionCount={_associatedHttp11ConnectionCount} > _availableHttp11Connections.Count={_availableHttp11Connections.Count}"); - Debug.Assert(_associatedHttp11ConnectionCount <= _maxHttp11Connections, - $"Expected _associatedHttp11ConnectionCount={_associatedHttp11ConnectionCount} <= _maxHttp11Connections={_maxHttp11Connections}"); + _http11Connections.Push(connection); - if (isNewConnection) - { - Debug.Assert(_pendingHttp11ConnectionCount > 0); - _pendingHttp11ConnectionCount--; - isNewConnection = false; - } + // When we add a connection to the pool, we must ensure that there are + // either no pending requests waiting, or that _something_ will pair those + // requests with the connection we just added. - if (initialRequestWaiter is not null) - { - // Try to handle the request that we initiated the connection for first - waiter = initialRequestWaiter; - initialRequestWaiter = null; - - // If this method found a request to service, that request must be removed from the queue if it was at the head to avoid rooting it forever. - // Normally, TryDequeueWaiter would handle the removal. TryDequeueSpecificWaiter matches this behavior for the initial request case. - // We don't care if this fails; that means the request was previously canceled, handled by a different connection, or not at the head of the queue. - _http11RequestQueue.TryDequeueSpecificWaiter(waiter); - } - else if (_http11RequestQueue.TryDequeueWaiter(this, out waiter)) - { - Debug.Assert(_availableHttp11Connections.Count == 0, $"With {_availableHttp11Connections.Count} available HTTP/1.1 connections, we shouldn't have a waiter."); - } - else if (!_disposed) - { - // Add connection to the pool. - added = true; - connection.MarkConnectionAsIdle(); - _availableHttp11Connections.Add(connection); - } + // When adding a request to the queue, we'll first check if there's + // an available connection waiting in the pool that we could use. + // If there isn't, we'll set the _http11RequestQueueIsEmptyAndNotDisposed + // flag and check for available connections again. - // If the pool has been disposed of, we will dispose the connection below outside the lock. - // We do this after processing the queue above so that any queued requests will be handled by existing connections if possible. - } + // To avoid a race where we add the connection after a request was enqueued, + // we'll check the flag again and try to process one request from the queue. - if (waiter is not null) + if (!Volatile.Read(ref _http11RequestQueueIsEmptyAndNotDisposed)) { - Debug.Assert(!added); - if (waiter.TrySetResult(connection)) - { - if (NetEventSource.Log.IsEnabled()) connection.Trace("Dequeued waiting HTTP/1.1 request."); - return; - } - else - { - if (NetEventSource.Log.IsEnabled()) - { - Trace(waiter.Task.IsCanceled - ? "Discarding canceled HTTP/1.1 request from queue." - : "Discarding signaled HTTP/1.1 request waiter from queue."); - } - // Loop and process the queue again - } - } - else if (added) - { - if (NetEventSource.Log.IsEnabled()) connection.Trace("Put HTTP/1.1 connection in pool."); - return; + ProcessHttp11RequestQueue(null); } - else + } + else + { + // ProcessHttp11RequestQueue is responsible for handing the connection to a pending request, + // or to return it back to the pool if there aren't any. + + // We hand over the connection directly instead of pushing it on the stack first to ensure + // that pending requests are processed in a fair (FIFO) order. + ProcessHttp11RequestQueue(connection); + } + } + + private bool TrySignalWaiter(HttpConnectionWaiter waiter, T connection) + where T : HttpConnectionBase? + { + Debug.Assert(connection is not null); + + if (waiter.TrySetResult(connection)) + { + if (NetEventSource.Log.IsEnabled()) connection.Trace("Dequeued waiting request."); + return true; + } + else + { + if (NetEventSource.Log.IsEnabled()) { - Debug.Assert(_disposed); - if (NetEventSource.Log.IsEnabled()) connection.Trace("Disposing HTTP/1.1 connection returned to pool. Pool was disposed."); - connection.Dispose(); - return; + Trace(waiter.Task.IsCanceled + ? "Discarding canceled request from queue." + : "Discarding signaled request waiter from queue."); } + return false; } } @@ -2071,21 +2180,13 @@ private void ReturnHttp2Connection(Http2Connection connection, bool isNewConnect if (waiter is not null) { Debug.Assert(!added); - if (waiter.TrySetResult(connection)) + + if (TrySignalWaiter(waiter, connection)) { - if (NetEventSource.Log.IsEnabled()) connection.Trace("Dequeued waiting HTTP2 request."); break; } - else - { - if (NetEventSource.Log.IsEnabled()) - { - Trace(waiter.Task.IsCanceled - ? "Discarding canceled HTTP/2 request from queue." - : "Discarding signaled HTTP/2 request waiter from queue."); - } - // Loop and process the queue again - } + + // Loop and process the queue again } else { @@ -2195,55 +2296,53 @@ public void Dispose() lock (SyncObj) { - if (!_disposed) + if (_disposed) { - if (NetEventSource.Log.IsEnabled()) Trace("Disposing pool."); - - _disposed = true; - - toDispose = new List(_availableHttp11Connections.Count + (_availableHttp2Connections?.Count ?? 0)); - toDispose.AddRange(_availableHttp11Connections); - if (_availableHttp2Connections is not null) - { - toDispose.AddRange(_availableHttp2Connections); - } + return; + } - // Note: Http11 connections will decrement the _associatedHttp11ConnectionCount when disposed. - // Http2 connections will not, hence the difference in handing _associatedHttp2ConnectionCount. + _disposed = true; + _http11RequestQueueIsEmptyAndNotDisposed = false; - Debug.Assert(_associatedHttp11ConnectionCount >= _availableHttp11Connections.Count, - $"Expected {nameof(_associatedHttp11ConnectionCount)}={_associatedHttp11ConnectionCount} >= {nameof(_availableHttp11Connections)}.Count={_availableHttp11Connections.Count}"); - _availableHttp11Connections.Clear(); + if (NetEventSource.Log.IsEnabled()) Trace("Disposing the pool."); - Debug.Assert(_associatedHttp2ConnectionCount >= (_availableHttp2Connections?.Count ?? 0)); - _associatedHttp2ConnectionCount -= (_availableHttp2Connections?.Count ?? 0); - _availableHttp2Connections?.Clear(); + if (_availableHttp2Connections is not null) + { + toDispose = [.. _availableHttp2Connections]; + _associatedHttp2ConnectionCount -= _availableHttp2Connections.Count; + _availableHttp2Connections.Clear(); + } - if (_http3Connection is not null) - { - toDispose.Add(_http3Connection); - _http3Connection = null; - } + if (_http3Connection is not null) + { + toDispose ??= new(); + toDispose.Add(_http3Connection); + _http3Connection = null; + } - if (_authorityExpireTimer != null) - { - _authorityExpireTimer.Dispose(); - _authorityExpireTimer = null; - } + if (_authorityExpireTimer != null) + { + _authorityExpireTimer.Dispose(); + _authorityExpireTimer = null; + } - if (_altSvcBlocklistTimerCancellation != null) - { - _altSvcBlocklistTimerCancellation.Cancel(); - _altSvcBlocklistTimerCancellation.Dispose(); - _altSvcBlocklistTimerCancellation = null; - } + if (_altSvcBlocklistTimerCancellation != null) + { + _altSvcBlocklistTimerCancellation.Cancel(); + _altSvcBlocklistTimerCancellation.Dispose(); + _altSvcBlocklistTimerCancellation = null; } - Debug.Assert(_availableHttp11Connections.Count == 0, $"Expected {nameof(_availableHttp11Connections)}.{nameof(_availableHttp11Connections.Count)} == 0"); Debug.Assert((_availableHttp2Connections?.Count ?? 0) == 0, $"Expected {nameof(_availableHttp2Connections)}.{nameof(_availableHttp2Connections.Count)} == 0"); } - // Dispose outside the lock to avoid lock re-entrancy issues. + // Dispose connections outside the lock to avoid lock re-entrancy issues. + + // This will trigger the disposal of Http11 connections. + // Note: Http11 connections will decrement the _associatedHttp11ConnectionCount when disposed. + // Http2 connections will not, hence the difference in handing _associatedHttp2ConnectionCount. + ProcessHttp11RequestQueue(null); + toDispose?.ForEach(c => c.Dispose()); } @@ -2279,7 +2378,8 @@ public bool CleanCacheAndDisposeIfUnused() // will be purged next time around. _usedSinceLastCleanup = false; - ScavengeConnectionList(_availableHttp11Connections, ref toDispose, nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout); + ScavengeConnectionStack(this, _http11Connections, ref toDispose, nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout); + if (_availableHttp2Connections is not null) { int removed = ScavengeConnectionList(_availableHttp2Connections, ref toDispose, nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout); @@ -2301,6 +2401,47 @@ public bool CleanCacheAndDisposeIfUnused() // Pool is active. Should not be removed. return false; + static void ScavengeConnectionStack(HttpConnectionPool pool, ConcurrentStack connections, ref List? toDispose, long nowTicks, TimeSpan pooledConnectionLifetime, TimeSpan pooledConnectionIdleTimeout) + { + // We can't simply enumerate the connections stack as other threads may still be adding and removing entries. + // If we want to check the state of a connection, we must take it from the stack first to ensure we own it. + + // We're about to starve the connection pool of all available connections for a moment. + // We must be holding the lock while doing so to ensure that any new requests that + // come in during this time will be blocked waiting in ProcessHttp11RequestQueue. + // If this were not the case, requests would repeatedly call into CheckForHttp11ConnectionInjection + // and trigger new connection attempts, even if we have enough connections in our copy. + Debug.Assert(pool.HasSyncObjLock); + Debug.Assert(connections.Count <= pool._associatedHttp11ConnectionCount); + + HttpConnection[] stackCopy = ArrayPool.Shared.Rent(pool._associatedHttp11ConnectionCount); + int usableConnections = 0; + + while (connections.TryPop(out HttpConnection? connection)) + { + if (IsUsableConnection(connection, nowTicks, pooledConnectionLifetime, pooledConnectionIdleTimeout)) + { + stackCopy[usableConnections++] = connection; + } + else + { + toDispose ??= new List(); + toDispose.Add(connection); + } + } + + if (usableConnections > 0) + { + // Add them back in reverse to maintain the LIFO order. + Span usable = stackCopy.AsSpan(0, usableConnections); + usable.Reverse(); + connections.PushRange(stackCopy, 0, usableConnections); + usable.Clear(); + } + + ArrayPool.Shared.Return(stackCopy); + } + static int ScavengeConnectionList(List list, ref List? toDispose, long nowTicks, TimeSpan pooledConnectionLifetime, TimeSpan pooledConnectionIdleTimeout) where T : HttpConnectionBase { @@ -2542,10 +2683,16 @@ private void Grow() public HttpConnectionWaiter EnqueueRequest(HttpRequestMessage request) { var waiter = new HttpConnectionWaiter(); - Enqueue(new QueueItem { Request = request, Waiter = waiter }); + EnqueueRequest(request, waiter); return waiter; } + + public void EnqueueRequest(HttpRequestMessage request, HttpConnectionWaiter waiter) + { + Enqueue(new QueueItem { Request = request, Waiter = waiter }); + } + public void PruneCompletedRequestsFromHeadOfQueue(HttpConnectionPool pool) { while (TryPeek(out QueueItem queueItem) && queueItem.Waiter.Task.IsCompleted)