From c9778180525d3847bdb51cf9fa9d9fca573d77f8 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Mon, 13 Apr 2020 16:16:18 -0500 Subject: [PATCH] Add inactivity timeout (#263) This provides a separate timeout while waiting for the response or streaming the body. If no data is received for the response within the given number of milliseconds, the request fails similarly to the transfer timeout. --- src/Connection/Http1Connection.php | 69 ++++++++++++------ src/Connection/Http2Connection.php | 1 - .../Internal/Http2ConnectionProcessor.php | 42 ++++++++++- src/Connection/Internal/Http2Stream.php | 23 ++++++ src/Request.php | 16 +++++ test/ClientHttpBinIntegrationTest.php | 4 +- test/Connection/Http1ConnectionTest.php | 70 +++++++++++++++++++ test/Connection/Http2ConnectionTest.php | 55 ++++++++++++++- 8 files changed, 248 insertions(+), 32 deletions(-) diff --git a/src/Connection/Http1Connection.php b/src/Connection/Http1Connection.php index 1944a887..9b839e7f 100644 --- a/src/Connection/Http1Connection.php +++ b/src/Connection/Http1Connection.php @@ -32,6 +32,7 @@ use Amp\Socket\TlsInfo; use Amp\Success; use Amp\TimeoutCancellationToken; +use Amp\TimeoutException as PromiseTimeoutException; use function Amp\asyncCall; use function Amp\call; use function Amp\getCurrentTime; @@ -157,6 +158,7 @@ public function getStream(Request $request): Promise private function free(): Promise { $this->socket = null; + $this->lastUsedAt = 0; if ($this->timeoutWatcher !== null) { @@ -279,15 +281,20 @@ private function readResponse( $parser = new Http1Parser($request, $bodyCallback, $trailersCallback); $start = getCurrentTime(); + $timeout = $request->getInactivityTimeout(); try { - while (null !== $chunk = yield $this->socket->read()) { - parseChunk: - $response = $parser->parse($chunk); + while (null !== $chunk = yield $timeout > 0 + ? Promise\timeout($this->socket->read(), $timeout) + : $this->socket->read() + ) { + parseChunk: $response = $parser->parse($chunk); if ($response === null) { continue; } + $this->lastUsedAt = getCurrentTime(); + $status = $response->getStatus(); if ($status === Http\Status::SWITCHING_PROTOCOLS) { @@ -362,6 +369,7 @@ private function readResponse( $readingCancellation, $bodyCancellationToken, $stream, + $timeout, &$backpressure, &$trailers ) { @@ -374,24 +382,32 @@ private function readResponse( // to resolve promise with headers. $chunk = null; - /** @psalm-suppress PossiblyNullReference */ - do { - /** @noinspection CallableParameterUseCaseInTypeContextInspection */ - $parser->parse($chunk); - /** @noinspection NotOptimalIfConditionsInspection */ - if ($parser->isComplete()) { - break; - } - - if (!$backpressure instanceof Success) { - yield $this->withCancellation($backpressure, $bodyCancellationToken); - } - - /** @psalm-suppress TypeDoesNotContainNull */ - if ($this->socket === null) { - throw new SocketException('Socket closed prior to response completion'); - } - } while (null !== $chunk = yield $this->socket->read()); + try { + /** @psalm-suppress PossiblyNullReference */ + do { + /** @noinspection CallableParameterUseCaseInTypeContextInspection */ + $parser->parse($chunk); + /** @noinspection NotOptimalIfConditionsInspection */ + if ($parser->isComplete()) { + break; + } + + if (!$backpressure instanceof Success) { + yield $this->withCancellation($backpressure, $bodyCancellationToken); + } + + /** @psalm-suppress TypeDoesNotContainNull */ + if ($this->socket === null) { + throw new SocketException('Socket closed prior to response completion'); + } + } while (null !== $chunk = yield $timeout > 0 + ? Promise\timeout($this->socket->read(), $timeout) + : $this->socket->read() + ); + } catch (PromiseTimeoutException $e) { + $this->close(); + throw new TimeoutException('Inactivity timeout exceeded, more than ' . $timeout . ' ms elapsed from last data received', 0, $e); + } $originalCancellation->throwIfRequested(); @@ -453,8 +469,15 @@ private function readResponse( \strlen($parser->getBuffer()), getCurrentTime() - $start )); - } catch (StreamException $e) { - throw new SocketException('Receiving the response headers failed: ' . $e->getMessage()); + } catch (HttpException $e) { + $this->close(); + throw $e; + } catch (PromiseTimeoutException $e) { + $this->close(); + throw new TimeoutException('Inactivity timeout exceeded, more than ' . $timeout . ' ms elapsed from last data received', 0, $e); + } catch (\Throwable $e) { + $this->close(); + throw new SocketException('Receiving the response headers failed: ' . $e->getMessage(), 0, $e); } } diff --git a/src/Connection/Http2Connection.php b/src/Connection/Http2Connection.php index f2322e69..062ccd6f 100644 --- a/src/Connection/Http2Connection.php +++ b/src/Connection/Http2Connection.php @@ -7,7 +7,6 @@ use Amp\Http\Client\Internal\ForbidCloning; use Amp\Http\Client\Internal\ForbidSerialization; use Amp\Http\Client\Request; -use Amp\Http\Http2\Http2Processor; use Amp\Promise; use Amp\Socket\EncryptableSocket; use Amp\Socket\SocketAddress; diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index 49525706..c86a8547 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -276,6 +276,7 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool } $stream = $this->streams[$streamId]; + $stream->resetInactivityWatcher(); if ($stream->trailers) { if ($stream->expectedLength && $stream->received !== $stream->expectedLength) { @@ -513,8 +514,6 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool $this->releaseStream($streamId, $exception); }); - - unset($bodyCancellation, $cancellationToken); // Remove reference to cancellation token. } public function handlePushPromise(int $parentId, int $streamId, array $pseudo, array $headers): void @@ -582,8 +581,8 @@ public function handlePushPromise(int $parentId, int $streamId, array $pseudo, a return; } - /** @var Http2Stream $parentStream */ $parentStream = $this->streams[$parentId]; + $parentStream->resetInactivityWatcher(); if (\strcasecmp($host, $parentStream->request->getUri()->getHost()) !== 0) { $this->handleStreamException(new Http2StreamException( @@ -627,6 +626,8 @@ public function handlePushPromise(int $parentId, int $streamId, array $pseudo, a $request->setPushHandler($parentStream->request->getPushHandler()); $request->setHeaderSizeLimit($parentStream->request->getHeaderSizeLimit()); $request->setBodySizeLimit($parentStream->request->getBodySizeLimit()); + $request->setInactivityTimeout($parentStream->request->getInactivityTimeout()); + $request->setTransferTimeout($parentStream->request->getTransferTimeout()); $stream = new Http2Stream( $streamId, @@ -642,6 +643,7 @@ static function () { ), $parentStream->cancellationToken, $parentStream->originalCancellation, + $this->createStreamInactivityWatcher($streamId, $request->getInactivityTimeout()), self::DEFAULT_WINDOW_SIZE, 0 ); @@ -768,6 +770,7 @@ public function handleData(int $streamId, string $data): void } $stream = $this->streams[$streamId]; + $stream->resetInactivityWatcher(); if (!$stream->body) { $this->handleStreamException(new Http2StreamException( @@ -959,6 +962,7 @@ public function request(Request $request, CancellationToken $cancellationToken, $stream, $cancellationToken, $originalCancellation, + $this->createStreamInactivityWatcher($streamId, $request->getInactivityTimeout()), self::DEFAULT_WINDOW_SIZE, $this->initialWindowSize ); @@ -1292,6 +1296,7 @@ private function writeBufferedData(Http2Stream $stream): Promise } $stream->requestBodyBuffer = ""; + $stream->resetInactivityWatcher(); return $promise; } @@ -1327,6 +1332,7 @@ private function writeBufferedData(Http2Stream $stream): Promise ); $stream->requestBodyBuffer = \substr($data, $windowSize); + $stream->resetInactivityWatcher(); return $promise; } @@ -1609,4 +1615,34 @@ private function increaseStreamWindow(Http2Stream $stream): void ); } } + + private function createStreamInactivityWatcher(int $streamId, int $timeout): ?string + { + if ($timeout <= 0) { + return null; + } + + $watcher = Loop::delay($timeout, function () use ($streamId, $timeout): void { + if (!isset($this->streams[$streamId])) { + return; + } + + $this->writeFrame( + Http2Parser::RST_STREAM, + Http2Parser::NO_FLAG, + $streamId, + \pack("N", Http2Parser::CANCEL) + ); + + $this->releaseStream( + $streamId, + new TimeoutException('Inactivity timeout exceeded, more than ' + . $timeout . ' ms elapsed from last data received') + ); + }); + + Loop::unreference($watcher); + + return $watcher; + } } diff --git a/src/Connection/Internal/Http2Stream.php b/src/Connection/Internal/Http2Stream.php index 1b398a46..0a1e8deb 100644 --- a/src/Connection/Internal/Http2Stream.php +++ b/src/Connection/Internal/Http2Stream.php @@ -10,6 +10,7 @@ use Amp\Http\Client\Internal\ForbidSerialization; use Amp\Http\Client\Request; use Amp\Http\Client\Response; +use Amp\Loop; use Amp\Promise; use Amp\Struct; @@ -87,12 +88,16 @@ final class Http2Stream /** @var Deferred|null */ public $windowSizeIncrease; + /** @var string|null */ + private $watcher; + public function __construct( int $id, Request $request, Stream $stream, CancellationToken $cancellationToken, CancellationToken $originalCancellation, + ?string $watcher, int $serverSize, int $clientSize ) { @@ -101,9 +106,27 @@ public function __construct( $this->stream = $stream; $this->cancellationToken = $cancellationToken; $this->originalCancellation = $originalCancellation; + $this->watcher = $watcher; $this->serverWindow = $serverSize; $this->clientWindow = $clientSize; $this->pendingResponse = new Deferred; $this->requestBodyCompletion = new Deferred; } + + public function __destruct() + { + if ($this->watcher !== null) { + Loop::cancel($this->watcher); + } + } + + public function resetInactivityWatcher(): void + { + if ($this->watcher === null) { + return; + } + + Loop::disable($this->watcher); + Loop::enable($this->watcher); + } } diff --git a/src/Request.php b/src/Request.php index ecab9627..f164b031 100644 --- a/src/Request.php +++ b/src/Request.php @@ -60,6 +60,9 @@ private static function clone($value) /** @var int */ private $transferTimeout = 10000; + /** @var int */ + private $inactivityTimeout = 10000; + /** @var int */ private $bodySizeLimit = self::DEFAULT_BODY_SIZE_LIMIT; @@ -391,6 +394,19 @@ public function setTransferTimeout(int $transferTimeout): void $this->transferTimeout = $transferTimeout; } + /** + * @return int Timeout in milliseconds since the last data was received before the request fails due to inactivity. + */ + public function getInactivityTimeout(): int + { + return $this->inactivityTimeout; + } + + public function setInactivityTimeout(int $inactivityTimeout): void + { + $this->inactivityTimeout = $inactivityTimeout; + } + public function getHeaderSizeLimit(): int { return $this->headerSizeLimit; diff --git a/test/ClientHttpBinIntegrationTest.php b/test/ClientHttpBinIntegrationTest.php index 7bacf6ed..ed4064d1 100644 --- a/test/ClientHttpBinIntegrationTest.php +++ b/test/ClientHttpBinIntegrationTest.php @@ -55,8 +55,6 @@ public function testCloseAfterConnect(): \Generator { $this->givenRawServerResponse(""); - $this->client = $this->builder->retry(0)->build(); - $this->expectException(SocketException::class); $this->expectExceptionMessageMatches("(Receiving the response headers for '.*' failed, because the socket to '.*' @ '.*' closed early)"); @@ -707,7 +705,7 @@ protected function setUp(): void { parent::setUp(); - $this->builder = new HttpClientBuilder; + $this->builder = (new HttpClientBuilder)->retry(0); $this->client = $this->builder->build(); if ($this->socket) { diff --git a/test/Connection/Http1ConnectionTest.php b/test/Connection/Http1ConnectionTest.php index 729c7ef8..6278dd9c 100644 --- a/test/Connection/Http1ConnectionTest.php +++ b/test/Connection/Http1ConnectionTest.php @@ -7,7 +7,9 @@ use Amp\Http\Client\Request; use Amp\Http\Client\RequestBody; use Amp\Http\Client\Response; +use Amp\Http\Client\TimeoutException; use Amp\Iterator; +use Amp\Loop; use Amp\NullCancellationToken; use Amp\PHPUnit\AsyncTestCase; use Amp\Promise; @@ -124,6 +126,74 @@ public function testUpgrade(): \Generator $this->assertSame([], (yield $response->getTrailers())->getHeaders()); } + public function testTransferTimeout(): \Generator + { + $this->setMinimumRuntime(500); + $this->setTimeout(600); + + [$server, $client] = Socket\createPair(); + + $connection = new Http1Connection($client); + + $request = new Request('http://localhost'); + $request->setTransferTimeout(500); + + /** @var Stream $stream */ + $stream = yield $connection->getStream($request); + + $server->write("HTTP/1.1 200 Continue\r\nConnection: keep-alive\r\nContent-Length: 8\r\n\r\ntest"); + + /** @var Response $response */ + $response = yield $stream->request($request, new NullCancellationToken); + + $this->assertSame(200, $response->getStatus()); + + try { + yield $response->getBody()->buffer(); + $this->fail("The request should have timed out"); + } catch (TimeoutException $exception) { + $this->assertStringContainsString('transfer timeout', $exception->getMessage()); + } + } + + public function testInactivityTimeout(): \Generator + { + $this->setMinimumRuntime(500); + $this->setTimeout(1000); + + [$server, $client] = Socket\createPair(); + + $connection = new Http1Connection($client); + + $request = new Request('http://localhost'); + $request->setInactivityTimeout(500); + + /** @var Stream $stream */ + $stream = yield $connection->getStream($request); + + $server->write("HTTP/1.1 200 Continue\r\nConnection: keep-alive\r\nContent-Length: 8\r\n\r\n"); + + Loop::unreference(Loop::delay(400, function () use ($server) { + $server->write("test"); // Still missing 4 bytes from the body + })); + + Loop::unreference(Loop::delay(1000, function () use ($server) { + $server->write("test"); // Request should timeout before this is called + })); + + /** @var Response $response */ + $response = yield $stream->request($request, new NullCancellationToken); + + $this->assertSame(200, $response->getStatus()); + + try { + yield $response->getBody()->buffer(); + $this->fail("The request should have timed out"); + } catch (TimeoutException $exception) { + $this->assertStringContainsString('Inactivity timeout', $exception->getMessage()); + } + } + private function createSlowBody() { return new class implements RequestBody { diff --git a/test/Connection/Http2ConnectionTest.php b/test/Connection/Http2ConnectionTest.php index 62fa6f6b..d23acf42 100644 --- a/test/Connection/Http2ConnectionTest.php +++ b/test/Connection/Http2ConnectionTest.php @@ -213,7 +213,7 @@ public function testCancellingWhileStreamingBody(): \Generator $this->assertSame(200, $response->getStatus()); try { - $this->assertSame('test', yield $response->getBody()->buffer()); + yield $response->getBody()->buffer(); $this->fail("The request body should have been cancelled"); } catch (CancelledException $exception) { $buffer = yield $server->read(); @@ -264,7 +264,7 @@ public function testTimeoutWhileStreamingBody(): \Generator $this->assertSame(200, $response->getStatus()); try { - $this->assertSame('test', yield $response->getBody()->buffer()); + yield $response->getBody()->buffer(); $this->fail("The request body should have been cancelled"); } catch (TimeoutException $exception) { $buffer = yield $server->read(); @@ -346,4 +346,55 @@ public function testCancellingPushPromiseBody(): \Generator $this->assertStringEndsWith($expected, $buffer); } } + + public function testInactivityWhileStreamingBody(): \Generator + { + $hpack = new HPack; + + [$server, $client] = Socket\createPair(); + + $connection = new Http2Connection($client); + + $server->write(self::packFrame('', Http2Parser::SETTINGS, 0, 0)); + + yield $connection->initialize(); + + $request = new Request('http://localhost/'); + $request->setInactivityTimeout(500); + + /** @var Stream $stream */ + $stream = yield $connection->getStream($request); + + asyncCall(static function () use ($server, $hpack) { + yield delay(100); + + $server->write(self::packFrame($hpack->encode([ + [":status", Status::OK], + ["content-length", "8"], + ["date", formatDateHeader()], + ]), Http2Parser::HEADERS, Http2Parser::END_HEADERS, 1)); + + yield delay(100); + + $server->write(self::packFrame('test', Http2Parser::DATA, Http2Parser::NO_FLAG, 1)); + + yield delay(1000); + + $server->write(self::packFrame('test', Http2Parser::DATA, Http2Parser::END_STREAM, 1)); + }); + + /** @var Response $response */ + $response = yield $stream->request($request, new NullCancellationToken); + + $this->assertSame(200, $response->getStatus()); + + try { + yield $response->getBody()->buffer(); + $this->fail("The request body should have been cancelled"); + } catch (TimeoutException $exception) { + $buffer = yield $server->read(); + $expected = self::packFrame(\pack("N", Http2Parser::CANCEL), Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, 1); + $this->assertStringEndsWith($expected, $buffer); + } + } }