diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index 72e53728..b1d7b4a4 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -409,11 +409,9 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool $cancellation = $deferredCancellation->getCancellation(); $body = new ResponseBodyStream(new ReadableIterableStream($iterator), $deferredCancellation); - $cancellationId = $cancellation->subscribe(function (CancelledException $exception) use ($streamId): void { - if (isset($this->streams[$streamId])) { - $this->releaseStream($streamId, $exception, false); - } - }); + $cancellationId = $cancellation->subscribe( + fn (CancelledException $exception) => $this->releaseStream($streamId, $exception, false), + ); $trailers ->finally(static fn () => $cancellation->unsubscribe($cancellationId)) @@ -678,13 +676,9 @@ static function () { } EventLoop::queue(function () use ($pushId, $deferredCancellation, $stream, $cancellation): void { - $cancellationId = $cancellation->subscribe(function (CancelledException $exception) use ( - $pushId - ): void { - if (isset($this->streams[$pushId])) { - $this->releaseStream($pushId, $exception, false); - } - }); + $cancellationId = $cancellation->subscribe( + fn (CancelledException $exception) => $this->releaseStream($pushId, $exception, false), + ); $onPush = $stream->request->getPushHandler(); @@ -750,9 +744,7 @@ public function handleStreamException(Http2StreamException $exception): void $exception = new SocketException($exception->getMessage(), $code, $exception); - if (isset($this->streams[$id])) { - $this->releaseStream($id, $exception, $code === Http2Parser::REFUSED_STREAM); - } + $this->releaseStream($id, $exception, $code === Http2Parser::REFUSED_STREAM); } public function handleConnectionException(Http2ConnectionException $exception): void @@ -986,11 +978,9 @@ public function request(Request $request, Cancellation $cancellation, Stream $st $this->initialWindowSize, ); - $cancellationId = $cancellation->subscribe(function (CancelledException $exception) use ($streamId): void { - if (isset($this->streams[$streamId])) { - $this->releaseStream($streamId, $exception, false); - } - }); + $cancellationId = $cancellation->subscribe( + fn (CancelledException $exception) => $this->releaseStream($streamId, $exception, false), + ); \assert($http2stream->trailers !== null); $http2stream->trailers->getFuture() @@ -1070,9 +1060,7 @@ public function request(Request $request, Cancellation $cancellation, Stream $st $http2stream->requestBodyCompletion->error($exception); } - if (isset($this->streams[$streamId])) { - $this->releaseStream($streamId, $exception, false); - } + $this->releaseStream($streamId, $exception, false); throw $exception; } @@ -1328,9 +1316,10 @@ private function writeBufferedData(Http2Stream $stream): Future private function releaseStream(int $streamId, ?\Throwable $exception, bool $unprocessed): void { - \assert(isset($this->streams[$streamId])); - - $stream = $this->streams[$streamId]; + $stream = $this->streams[$streamId] ?? null; + if (!$stream) { + return; + } unset($this->streams[$streamId]);