PostgresNIO/ListenStateMachine.swift:182: Fatal error: Invalid state: initialized #458

Open
CrownedPhoenix opened this issue Feb 21, 2024 · 12 comments
Labels
bug Something isn't working

Comments

@CrownedPhoenix

CrownedPhoenix commented Feb 21, 2024

Description

ListenStateMachine reaches an invalid state during concurrent operation of addListener on a pool that is shutting down.
I'm not sure if this behavior is expected given the obvious concurrent execution described in the reproducer below.

At any rate, a fatalError here seems out of place unless there's something I'm misunderstanding about the ListenStateMachine.

To Reproduce

Simply run the following:

import NIO
import AsyncKit
import PostgresNIO
import PostgresKit

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)

let configuration = SQLPostgresConfiguration(
    hostname: "localhost",
    port: 5432,
    username: "postgres",
    password: "postgres",
    database: "test",
    tls: .disable
)
for _ in 0..<1000 {
    let pool = EventLoopGroupConnectionPool(
        source: PostgresConnectionSource(sqlConfiguration: configuration),
        on:  group
    )

    Task {
        for _ in 0..<100 {
            _ = pool.withConnection({ conn in
                let channel = "default"
                _ = conn.addListener(channel: channel) { _, notifications in
                    assert(notifications.channel == channel, "Notification channel mismatch")
                }

                return group.future()
            })
        }
    }

    let promise: EventLoopPromise<Void> = group.next().makePromise()

    pool.shutdownGracefully({ _ in
        promise.succeed(())

    })
    try promise.futureResult.wait()
}

with the following dependencies:

        .package(url: "https://github.com/apple/swift-nio.git", from: "2.54.0"),
        .package(url: "https://github.com/vapor/async-kit.git", exact: "1.17.0"),
        .package(url: "https://github.com/vapor/postgres-nio.git", exact: "1.20.0"),
        .package(url: "https://github.com/vapor/postgres-kit.git", exact: "2.11.3"),

and the following for the database setup:

docker run \
    --detach \
    --name timescaledb \
    --publish 5432:5432 \
    --env POSTGRES_HOST_AUTH_METHOD=trust \
    --env TIMESCALEDB_TELEMETRY=off \
    timescale/timescaledb:latest-pg16;

psql -h localhost -U postgres -w -c 'create database test'

the result will be:

PostgresNIO/ListenStateMachine.swift:182: Fatal error: Invalid state: initialized

Expected behavior

I would expect that once the EventLoopGroupConnectionPool gracefully shuts down, all outstanding listeners created with addListener are cleaned up or closed, since the connections they are listening on have been shut down.

Environment

  • Xcode 15.0 w/ default swift toolchain
  • swift package init from the terminal
  • Vapor Framework version: N/A
  • Vapor Toolbox version: N/A
  • OS version: Sonoma 14.1.1
@CrownedPhoenix CrownedPhoenix added the bug Something isn't working label Feb 21, 2024
@fabianfett
Collaborator

fabianfett commented Feb 21, 2024

@CrownedPhoenix Thank you for the bug report. In order to prioritize this, what do you want to achieve? The code above looks very much like it is geared towards reproducing the issue.

To achieve your use case, can you use the new async listen API, perhaps in combination with the new PostgresClient? Those APIs should be much more stable as well.

The code should look like this:

@_spi(ConnectionPool) import PostgresNIO
import Logging

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
enum App {
    static func main() async throws {
        let logger = Logger(label: "test")
        let configuration = PostgresClient.Configuration(
            host: "localhost",
            username: "postgres",
            password: "postgres",
            database: "test",
            tls: .disable
        )
        let client = PostgresClient(configuration: configuration, backgroundLogger: logger)

        await withTaskGroup(of: Void.self) { taskGroup in
            taskGroup.addTask {
                await client.run()
            }

            do {
                try await client.withConnection { connection in
                    for try await message in try await connection.listen("default") {
                        // you will stay forever in this loop, until you break out of it!
                        logger.info("Message received", metadata: ["message": "\(message)"])
                    }
                }
            } catch {
                // Interpolate the error so its description is logged, not the literal text.
                logger.error("Error happened", metadata: ["error": "\(String(reflecting: error))"])
            }

            taskGroup.cancelAll() // shutdown postgres client when done
        }
    }
}

If you have any questions please feel free to reach out.

@CrownedPhoenix
Author

Yeah - the reproducer I provided was mainly to simulate an error that I'm running into in practice in my test setup.
I think it is concurrency related based on the reproducer and all - but I haven't been able to decide if it's something I need to change in my code or if it's a PGNIO thing.

It just depends on whether the fatalError described above is expected behavior or if it is a state that shouldn't even be reachable under any scenario.

All said, I'll give the code changes you suggest here a try with the reproducer and then see if replicating them in my own test cases will resolve the issue for me.

@fabianfett
Collaborator

> It just depends on whether the fatalError described above is expected behavior or if it is a state that shouldn't even be reachable under any scenario.

Generally, I thought the state should not be reachable. For this reason, I consider this a PostgresNIO issue, which is why I haven't closed it. However, to be honest, I never imagined that PostgresNIO would be used the way you are using it.

There is lots to unpack in the code you provided, and certain issues are foreseeable. This is why I provided the code above: to show how I recommend using PostgresNIO's listening support.

@CrownedPhoenix
Author

Totally understand that - if I can find a solution that doesn't require you to change anything that will be great for me as well.
I'm going to see if I can refine the reproducer a little more.

In the meantime, do you have any idea why PostgresClient wouldn't be able to be found? I see the @available but I'm on Sonoma so I wouldn't expect that to be an issue.
I'm getting "Cannot find 'PostgresClient' in scope" while other types from PostgresNIO are found just fine.
I get the same issue building on linux.
I'm just adding it straight to the code in the reproducer so no other dependencies or environment things have changed.

@CrownedPhoenix
Author

Oh, I need to do the @_spi thing, I bet. Sorry I missed that.

@fabianfett
Collaborator

Yeah. We intend to drop the requirement to use @_spi very soon.

@fabianfett
Collaborator

> Yeah. We intend to drop the requirement to use @_spi very soon.

#460

@CrownedPhoenix
Author

CrownedPhoenix commented Feb 22, 2024

Alright, it looks like using PostgresClient resolves this issue.
However, it's not something I'm going to be able to use in the near term because of the sweeping scope of the change that would be required to swap it out for EventLoopGroupConnectionPool. The main reason is that I can't get an SQLDatabase from PostgresClient synchronously like I can from EventLoopGroupConnectionPool with:

extension EventLoopGroupConnectionPool where Source == PostgresConnectionSource {
    public func database(logger: Logger) -> any PostgresDatabase {
        _EventLoopGroupConnectionPoolPostgresDatabase(pool: self, logger: logger)
    }
}

The analogous code with PostgresClient being:

try await client.withConnection({ $0.sql() })

Because of this, there are a bunch of places where I depend on synchronous behavior that I'd have to change to async. That is doable, but unfortunately large in scale in my case.

I am interested in pursuing the use of PostgresClient though. So in the meantime I'm willing to consider this issue closed and leave a ticket in my backlog for swapping over to use PostgresClient.

@NeedleInAJayStack
Contributor

NeedleInAJayStack commented Mar 18, 2024

I've been diving into this issue a bit recently as well, and I think the root of the issue is that @CrownedPhoenix is using let channel = "default", which is a Postgres reserved keyword. I've ensured LISTEN quoting in this MR: #466
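
To illustrate the quoting point, here is a minimal sketch of how a channel name can be turned into a quoted identifier before it is placed in a LISTEN statement. The helper names `quotedIdentifier` and `listenStatement` are hypothetical and this is not the code from #466; it only relies on the standard Postgres rule that a quoted identifier is wrapped in double quotes, with any embedded double quote doubled.

```swift
/// Hypothetical helper: wraps a name in double quotes so that Postgres
/// treats it as an identifier even if it collides with a reserved keyword.
func quotedIdentifier(_ name: String) -> String {
    var result = "\""
    for character in name {
        if character == "\"" {
            // An embedded double quote is escaped by doubling it.
            result += "\"\""
        } else {
            result.append(character)
        }
    }
    result += "\""
    return result
}

/// Hypothetical helper: builds the LISTEN statement for a channel.
func listenStatement(channel: String) -> String {
    "LISTEN " + quotedIdentifier(channel)
}
```

With this, `listenStatement(channel: "default")` produces `LISTEN "default"`, which Postgres parses as a channel name rather than as the keyword DEFAULT.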

@CrownedPhoenix
Author

@NeedleInAJayStack makes an important point. At any rate, I would expect different error behavior if PGNIO wasn't able to fulfill the listen request.

@fabianfett
Collaborator

@CrownedPhoenix This is why I removed the claim that #466 fixes this issue. But I now have an easy way to replicate it :)

@NeedleInAJayStack
Contributor

Oh good call - thanks for un-fixing. I totally agree that we ought to better protect against the fatalError case too.
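
As a rough illustration of what "protecting against the fatalError case" could look like in general, the sketch below uses a simplified, hypothetical state machine (`ListenerStateSketch`) that returns an action describing an unexpected transition instead of trapping. It is not PostgresNIO's ListenStateMachine and makes no claim about the actual cause of the crash at line 182; the states and actions are assumptions chosen for the example.

```swift
/// Hypothetical, simplified listener state machine. It is NOT the real
/// ListenStateMachine; states and actions are invented for illustration.
struct ListenerStateSketch {
    enum State: Equatable {
        case initialized   // listener registered, LISTEN not yet confirmed
        case listening     // LISTEN confirmed by the server
        case closed        // connection is gone
    }

    enum Action: Equatable {
        case none
        case notifyListenersOfStop
        case reportUnexpectedState(String)
    }

    private(set) var state: State = .initialized

    /// Called when the server confirms the LISTEN request.
    mutating func startedListening() -> Action {
        switch state {
        case .initialized:
            state = .listening
            return .none
        case .listening:
            return .none
        case .closed:
            // Surface the problem to the caller instead of crashing.
            return .reportUnexpectedState("startedListening while closed")
        }
    }

    /// Called when the underlying connection shuts down.
    mutating func connectionClosed() -> Action {
        switch state {
        case .initialized, .listening:
            // Shutdown may race with listener setup, so both states are valid here.
            state = .closed
            return .notifyListenersOfStop
        case .closed:
            return .none
        }
    }
}
```

The design choice is that every transition has a defined result for every state, so a shutdown racing with listener registration ends in an error the caller can handle rather than a process crash.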
