Skip to content

Commit

Permalink
Add inactivity timeout (#263)
Browse files Browse the repository at this point in the history
This provides a separate timeout while waiting for the response or streaming the body. If no data is received for the response within the given number of milliseconds, the request fails similarly to the transfer timeout.
  • Loading branch information
trowski committed Apr 13, 2020
1 parent f7f6dec commit c977818
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 32 deletions.
69 changes: 46 additions & 23 deletions src/Connection/Http1Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
use Amp\Socket\TlsInfo;
use Amp\Success;
use Amp\TimeoutCancellationToken;
use Amp\TimeoutException as PromiseTimeoutException;
use function Amp\asyncCall;
use function Amp\call;
use function Amp\getCurrentTime;
Expand Down Expand Up @@ -157,6 +158,7 @@ public function getStream(Request $request): Promise
private function free(): Promise
{
$this->socket = null;

$this->lastUsedAt = 0;

if ($this->timeoutWatcher !== null) {
Expand Down Expand Up @@ -279,15 +281,20 @@ private function readResponse(
$parser = new Http1Parser($request, $bodyCallback, $trailersCallback);

$start = getCurrentTime();
$timeout = $request->getInactivityTimeout();

try {
while (null !== $chunk = yield $this->socket->read()) {
parseChunk:
$response = $parser->parse($chunk);
while (null !== $chunk = yield $timeout > 0
? Promise\timeout($this->socket->read(), $timeout)
: $this->socket->read()
) {
parseChunk: $response = $parser->parse($chunk);
if ($response === null) {
continue;
}

$this->lastUsedAt = getCurrentTime();

$status = $response->getStatus();

if ($status === Http\Status::SWITCHING_PROTOCOLS) {
Expand Down Expand Up @@ -362,6 +369,7 @@ private function readResponse(
$readingCancellation,
$bodyCancellationToken,
$stream,
$timeout,
&$backpressure,
&$trailers
) {
Expand All @@ -374,24 +382,32 @@ private function readResponse(
// to resolve promise with headers.
$chunk = null;

/** @psalm-suppress PossiblyNullReference */
do {
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$parser->parse($chunk);
/** @noinspection NotOptimalIfConditionsInspection */
if ($parser->isComplete()) {
break;
}

if (!$backpressure instanceof Success) {
yield $this->withCancellation($backpressure, $bodyCancellationToken);
}

/** @psalm-suppress TypeDoesNotContainNull */
if ($this->socket === null) {
throw new SocketException('Socket closed prior to response completion');
}
} while (null !== $chunk = yield $this->socket->read());
try {
/** @psalm-suppress PossiblyNullReference */
do {
/** @noinspection CallableParameterUseCaseInTypeContextInspection */
$parser->parse($chunk);
/** @noinspection NotOptimalIfConditionsInspection */
if ($parser->isComplete()) {
break;
}

if (!$backpressure instanceof Success) {
yield $this->withCancellation($backpressure, $bodyCancellationToken);
}

/** @psalm-suppress TypeDoesNotContainNull */
if ($this->socket === null) {
throw new SocketException('Socket closed prior to response completion');
}
} while (null !== $chunk = yield $timeout > 0
? Promise\timeout($this->socket->read(), $timeout)
: $this->socket->read()
);
} catch (PromiseTimeoutException $e) {
$this->close();
throw new TimeoutException('Inactivity timeout exceeded, more than ' . $timeout . ' ms elapsed from last data received', 0, $e);
}

$originalCancellation->throwIfRequested();

Expand Down Expand Up @@ -453,8 +469,15 @@ private function readResponse(
\strlen($parser->getBuffer()),
getCurrentTime() - $start
));
} catch (StreamException $e) {
throw new SocketException('Receiving the response headers failed: ' . $e->getMessage());
} catch (HttpException $e) {
$this->close();
throw $e;
} catch (PromiseTimeoutException $e) {
$this->close();
throw new TimeoutException('Inactivity timeout exceeded, more than ' . $timeout . ' ms elapsed from last data received', 0, $e);
} catch (\Throwable $e) {
$this->close();
throw new SocketException('Receiving the response headers failed: ' . $e->getMessage(), 0, $e);
}
}

Expand Down
1 change: 0 additions & 1 deletion src/Connection/Http2Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use Amp\Http\Client\Internal\ForbidCloning;
use Amp\Http\Client\Internal\ForbidSerialization;
use Amp\Http\Client\Request;
use Amp\Http\Http2\Http2Processor;
use Amp\Promise;
use Amp\Socket\EncryptableSocket;
use Amp\Socket\SocketAddress;
Expand Down
42 changes: 39 additions & 3 deletions src/Connection/Internal/Http2ConnectionProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool
}

$stream = $this->streams[$streamId];
$stream->resetInactivityWatcher();

if ($stream->trailers) {
if ($stream->expectedLength && $stream->received !== $stream->expectedLength) {
Expand Down Expand Up @@ -513,8 +514,6 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool

$this->releaseStream($streamId, $exception);
});

unset($bodyCancellation, $cancellationToken); // Remove reference to cancellation token.
}

public function handlePushPromise(int $parentId, int $streamId, array $pseudo, array $headers): void
Expand Down Expand Up @@ -582,8 +581,8 @@ public function handlePushPromise(int $parentId, int $streamId, array $pseudo, a
return;
}

/** @var Http2Stream $parentStream */
$parentStream = $this->streams[$parentId];
$parentStream->resetInactivityWatcher();

if (\strcasecmp($host, $parentStream->request->getUri()->getHost()) !== 0) {
$this->handleStreamException(new Http2StreamException(
Expand Down Expand Up @@ -627,6 +626,8 @@ public function handlePushPromise(int $parentId, int $streamId, array $pseudo, a
$request->setPushHandler($parentStream->request->getPushHandler());
$request->setHeaderSizeLimit($parentStream->request->getHeaderSizeLimit());
$request->setBodySizeLimit($parentStream->request->getBodySizeLimit());
$request->setInactivityTimeout($parentStream->request->getInactivityTimeout());
$request->setTransferTimeout($parentStream->request->getTransferTimeout());

$stream = new Http2Stream(
$streamId,
Expand All @@ -642,6 +643,7 @@ static function () {
),
$parentStream->cancellationToken,
$parentStream->originalCancellation,
$this->createStreamInactivityWatcher($streamId, $request->getInactivityTimeout()),
self::DEFAULT_WINDOW_SIZE,
0
);
Expand Down Expand Up @@ -768,6 +770,7 @@ public function handleData(int $streamId, string $data): void
}

$stream = $this->streams[$streamId];
$stream->resetInactivityWatcher();

if (!$stream->body) {
$this->handleStreamException(new Http2StreamException(
Expand Down Expand Up @@ -959,6 +962,7 @@ public function request(Request $request, CancellationToken $cancellationToken,
$stream,
$cancellationToken,
$originalCancellation,
$this->createStreamInactivityWatcher($streamId, $request->getInactivityTimeout()),
self::DEFAULT_WINDOW_SIZE,
$this->initialWindowSize
);
Expand Down Expand Up @@ -1292,6 +1296,7 @@ private function writeBufferedData(Http2Stream $stream): Promise
}

$stream->requestBodyBuffer = "";
$stream->resetInactivityWatcher();

return $promise;
}
Expand Down Expand Up @@ -1327,6 +1332,7 @@ private function writeBufferedData(Http2Stream $stream): Promise
);

$stream->requestBodyBuffer = \substr($data, $windowSize);
$stream->resetInactivityWatcher();

return $promise;
}
Expand Down Expand Up @@ -1609,4 +1615,34 @@ private function increaseStreamWindow(Http2Stream $stream): void
);
}
}

private function createStreamInactivityWatcher(int $streamId, int $timeout): ?string
{
if ($timeout <= 0) {
return null;
}

$watcher = Loop::delay($timeout, function () use ($streamId, $timeout): void {
if (!isset($this->streams[$streamId])) {
return;
}

$this->writeFrame(
Http2Parser::RST_STREAM,
Http2Parser::NO_FLAG,
$streamId,
\pack("N", Http2Parser::CANCEL)
);

$this->releaseStream(
$streamId,
new TimeoutException('Inactivity timeout exceeded, more than '
. $timeout . ' ms elapsed from last data received')
);
});

Loop::unreference($watcher);

return $watcher;
}
}
23 changes: 23 additions & 0 deletions src/Connection/Internal/Http2Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Amp\Http\Client\Internal\ForbidSerialization;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
use Amp\Loop;
use Amp\Promise;
use Amp\Struct;

Expand Down Expand Up @@ -87,12 +88,16 @@ final class Http2Stream
/** @var Deferred|null */
public $windowSizeIncrease;

/** @var string|null */
private $watcher;

public function __construct(
int $id,
Request $request,
Stream $stream,
CancellationToken $cancellationToken,
CancellationToken $originalCancellation,
?string $watcher,
int $serverSize,
int $clientSize
) {
Expand All @@ -101,9 +106,27 @@ public function __construct(
$this->stream = $stream;
$this->cancellationToken = $cancellationToken;
$this->originalCancellation = $originalCancellation;
$this->watcher = $watcher;
$this->serverWindow = $serverSize;
$this->clientWindow = $clientSize;
$this->pendingResponse = new Deferred;
$this->requestBodyCompletion = new Deferred;
}

public function __destruct()
{
if ($this->watcher !== null) {
Loop::cancel($this->watcher);
}
}

public function resetInactivityWatcher(): void
{
if ($this->watcher === null) {
return;
}

Loop::disable($this->watcher);
Loop::enable($this->watcher);
}
}
16 changes: 16 additions & 0 deletions src/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ private static function clone($value)
/** @var int */
private $transferTimeout = 10000;

/** @var int */
private $inactivityTimeout = 10000;

/** @var int */
private $bodySizeLimit = self::DEFAULT_BODY_SIZE_LIMIT;

Expand Down Expand Up @@ -391,6 +394,19 @@ public function setTransferTimeout(int $transferTimeout): void
$this->transferTimeout = $transferTimeout;
}

/**
* @return int Timeout in milliseconds since the last data was received before the request fails due to inactivity.
*/
public function getInactivityTimeout(): int
{
return $this->inactivityTimeout;
}

public function setInactivityTimeout(int $inactivityTimeout): void
{
$this->inactivityTimeout = $inactivityTimeout;
}

public function getHeaderSizeLimit(): int
{
return $this->headerSizeLimit;
Expand Down
4 changes: 1 addition & 3 deletions test/ClientHttpBinIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ public function testCloseAfterConnect(): \Generator
{
$this->givenRawServerResponse("");

$this->client = $this->builder->retry(0)->build();

$this->expectException(SocketException::class);
$this->expectExceptionMessageMatches("(Receiving the response headers for '.*' failed, because the socket to '.*' @ '.*' closed early)");

Expand Down Expand Up @@ -707,7 +705,7 @@ protected function setUp(): void
{
parent::setUp();

$this->builder = new HttpClientBuilder;
$this->builder = (new HttpClientBuilder)->retry(0);
$this->client = $this->builder->build();

if ($this->socket) {
Expand Down
Loading

0 comments on commit c977818

Please sign in to comment.