diff --git a/src/Connection/ConnectionLimitingPool.php b/src/Connection/ConnectionLimitingPool.php index 9de3acf0..6beb0f06 100644 --- a/src/Connection/ConnectionLimitingPool.php +++ b/src/Connection/ConnectionLimitingPool.php @@ -26,6 +26,9 @@ final class ConnectionLimitingPool implements ConnectionPool /** @var Promise[][] */ private $connections = []; + /** @var int[] */ + private $activeRequestCounts = []; + /** @var Deferred[][] */ private $waiting = []; @@ -112,6 +115,9 @@ public function getStream(Request $request, CancellationToken $cancellation): Pr /** @var Stream $stream */ [$connection, $stream] = yield from $this->getStreamFor($uri, $request, $cancellation); + $connectionId = \spl_object_id($connection); + $this->activeRequestCounts[$connectionId] = ($this->activeRequestCounts[$connectionId] ?? 0) + 1; + return HttpStream::fromStream( $stream, coroutine(function (Request $request, CancellationToken $cancellationToken) use ( @@ -170,6 +176,11 @@ private function getStreamFor(string $uri, Request $request, CancellationToken $ $stream = yield $this->getStreamFromConnection($connection, $request); if ($stream === null) { + if (!$this->isAdditionalConnectionAllowed($uri) && $this->isConnectionIdle($connection)) { + $connection->close(); + break; + } + continue; // No stream available for the given request. } @@ -281,6 +292,11 @@ private function isAdditionalConnectionAllowed(string $uri): bool private function onReadyConnection(Connection $connection, string $uri): void { + $connectionId = \spl_object_id($connection); + if (isset($this->activeRequestCounts[$connectionId])) { + $this->activeRequestCounts[$connectionId]--; + } + if (empty($this->waiting[$uri])) { return; } @@ -290,6 +306,18 @@ private function onReadyConnection(Connection $connection, string $uri): void $deferred->resolve($connection); } + private function isConnectionIdle(Connection $connection): bool + { + $connectionId = \spl_object_id($connection); + + \assert( + !isset($this->activeRequestCounts[$connectionId]) + || $this->activeRequestCounts[$connectionId] >= 0 + ); + + return ($this->activeRequestCounts[$connectionId] ?? 0) === 0; + } + private function removeWaiting(string $uri, int $deferredId): void { unset($this->waiting[$uri][$deferredId]); @@ -300,7 +328,7 @@ private function removeWaiting(string $uri, int $deferredId): void private function dropConnection(string $uri, int $connectionId): void { - unset($this->connections[$uri][$connectionId]); + unset($this->connections[$uri][$connectionId], $this->activeRequestCounts[$connectionId]); if (empty($this->connections[$uri])) { unset($this->connections[$uri], $this->waitForPriorConnection[$uri]); diff --git a/src/Connection/Http1Connection.php b/src/Connection/Http1Connection.php index 75994ada..fa1f312c 100644 --- a/src/Connection/Http1Connection.php +++ b/src/Connection/Http1Connection.php @@ -71,7 +71,7 @@ final class Http1Connection implements Connection private $timeoutGracePeriod; /** @var int */ - private $estimatedClose; + private $lastUsedAt; /** @var bool */ private $explicitTimeout = false; @@ -92,7 +92,7 @@ public function __construct(EncryptableSocket $socket, int $timeoutGracePeriod = $this->remoteAddress = $socket->getRemoteAddress(); $this->tlsInfo = $socket->getTlsInfo(); $this->timeoutGracePeriod = $timeoutGracePeriod; - $this->estimatedClose = getCurrentTime() + self::MAX_KEEP_ALIVE_TIMEOUT * 1000; + $this->lastUsedAt = getCurrentTime(); } public function __destruct() @@ -157,7 +157,7 @@ public function getStream(Request $request): Promise private function free(): Promise { $this->socket = null; - $this->estimatedClose = 0; + $this->lastUsedAt = 0; if ($this->timeoutWatcher !== null) { Loop::cancel($this->timeoutWatcher); @@ -177,12 +177,10 @@ private function free(): Promise private function hasStreamFor(Request $request): bool { - $connectionUnlikelyToClose = $this->explicitTimeout && $this->getRemainingTime() > $this->timeoutGracePeriod; - return !$this->busy && $this->socket && !$this->socket->isClosed() - && ($connectionUnlikelyToClose || $request->isIdempotent()); + && ($this->getRemainingTime() > 0 || $request->isIdempotent()); } /** @inheritdoc */ @@ -406,7 +404,6 @@ private function readResponse( if ($timeout > 0 && $parser->getState() !== Http1Parser::BODY_IDENTITY_EOF) { $this->timeoutWatcher = Loop::delay($timeout * 1000, [$this, 'close']); Loop::unreference($this->timeoutWatcher); - $this->estimatedClose = getCurrentTime() + $timeout * 1000; } else { $this->close(); } @@ -482,7 +479,8 @@ private function handleUpgradeResponse(Request $request, Response $response, str */ private function getRemainingTime(): int { - return \max(0, $this->estimatedClose - getCurrentTime()); + $timestamp = $this->lastUsedAt + $this->explicitTimeout ? $this->priorTimeout * 1000 : $this->timeoutGracePeriod; + return \max(0, $timestamp - getCurrentTime()); } private function withCancellation(Promise $promise, CancellationToken $cancellationToken): Promise