Skip to content

Commit

Permalink
Always check stream was not previously released
Browse files Browse the repository at this point in the history
Closes #361.
  • Loading branch information
trowski committed May 10, 2024
1 parent c980055 commit dd0b75e
Showing 1 changed file with 15 additions and 26 deletions.
41 changes: 15 additions & 26 deletions src/Connection/Internal/Http2ConnectionProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -409,11 +409,9 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool
$cancellation = $deferredCancellation->getCancellation();
$body = new ResponseBodyStream(new ReadableIterableStream($iterator), $deferredCancellation);

$cancellationId = $cancellation->subscribe(function (CancelledException $exception) use ($streamId): void {
if (isset($this->streams[$streamId])) {
$this->releaseStream($streamId, $exception, false);
}
});
$cancellationId = $cancellation->subscribe(
fn (CancelledException $exception) => $this->releaseStream($streamId, $exception, false),
);

$trailers
->finally(static fn () => $cancellation->unsubscribe($cancellationId))
Expand Down Expand Up @@ -678,13 +676,9 @@ static function () {
}

EventLoop::queue(function () use ($pushId, $deferredCancellation, $stream, $cancellation): void {
$cancellationId = $cancellation->subscribe(function (CancelledException $exception) use (
$pushId
): void {
if (isset($this->streams[$pushId])) {
$this->releaseStream($pushId, $exception, false);
}
});
$cancellationId = $cancellation->subscribe(
fn (CancelledException $exception) => $this->releaseStream($pushId, $exception, false),
);

$onPush = $stream->request->getPushHandler();

Expand Down Expand Up @@ -750,9 +744,7 @@ public function handleStreamException(Http2StreamException $exception): void

$exception = new SocketException($exception->getMessage(), $code, $exception);

if (isset($this->streams[$id])) {
$this->releaseStream($id, $exception, $code === Http2Parser::REFUSED_STREAM);
}
$this->releaseStream($id, $exception, $code === Http2Parser::REFUSED_STREAM);
}

public function handleConnectionException(Http2ConnectionException $exception): void
Expand Down Expand Up @@ -986,11 +978,9 @@ public function request(Request $request, Cancellation $cancellation, Stream $st
$this->initialWindowSize,
);

$cancellationId = $cancellation->subscribe(function (CancelledException $exception) use ($streamId): void {
if (isset($this->streams[$streamId])) {
$this->releaseStream($streamId, $exception, false);
}
});
$cancellationId = $cancellation->subscribe(
fn (CancelledException $exception) => $this->releaseStream($streamId, $exception, false),
);

\assert($http2stream->trailers !== null);
$http2stream->trailers->getFuture()
Expand Down Expand Up @@ -1070,9 +1060,7 @@ public function request(Request $request, Cancellation $cancellation, Stream $st
$http2stream->requestBodyCompletion->error($exception);
}

if (isset($this->streams[$streamId])) {
$this->releaseStream($streamId, $exception, false);
}
$this->releaseStream($streamId, $exception, false);

throw $exception;
}
Expand Down Expand Up @@ -1328,9 +1316,10 @@ private function writeBufferedData(Http2Stream $stream): Future

private function releaseStream(int $streamId, ?\Throwable $exception, bool $unprocessed): void
{
\assert(isset($this->streams[$streamId]));

$stream = $this->streams[$streamId];
$stream = $this->streams[$streamId] ?? null;
if (!$stream) {
return;
}

unset($this->streams[$streamId]);

Expand Down

0 comments on commit dd0b75e

Please sign in to comment.