Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Query Hangs if Connection is Closed #487

Merged
merged 25 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
9206b25
Add a test to reproduce the problem
MahdiBM Jun 19, 2024
5ff285b
fail query promise when channel write fails
MahdiBM Jun 19, 2024
3e49ef1
make sure the test succeeds
MahdiBM Jun 19, 2024
c2192a7
do the same for prepared statements query
MahdiBM Jun 19, 2024
a288cc2
minor refinements
MahdiBM Jun 19, 2024
d98944a
more clear query
MahdiBM Jun 19, 2024
00c11b6
more writePromise s catching channel failures
MahdiBM Jun 19, 2024
2ed7946
one more place
MahdiBM Jun 19, 2024
e92b2f0
move query into the loop
MahdiBM Jun 19, 2024
e127327
use a private function to simplify cascading channel failures
MahdiBM Jun 20, 2024
9ff54d9
better `write` function accepting `HandlerTask`, not a generic
MahdiBM Jun 20, 2024
fe8f9fe
better test, move the test to `PostgresConnectionTests`
MahdiBM Jun 20, 2024
2a43b88
move/rename test
MahdiBM Jun 20, 2024
e41c922
[build fix] forgot to change the passed parameter
MahdiBM Jun 20, 2024
607f26e
add a test for listen
MahdiBM Jun 20, 2024
f00099b
add another test for mid-way listens + fix the code
MahdiBM Jun 20, 2024
c43e781
spaces
MahdiBM Jun 20, 2024
8fe12f5
fix for resuming continuation multiple times
MahdiBM Jun 20, 2024
843d6a0
use backward-compatible `Task.sleep()`
MahdiBM Jun 20, 2024
3e56e49
add a listen test that does hang
MahdiBM Jun 22, 2024
552c913
aesthetics
MahdiBM Jun 22, 2024
c97e127
fix the hanging test: was a testing issue, not code
MahdiBM Jun 22, 2024
138aea3
add test for prepareStatement
MahdiBM Jun 22, 2024
22db6e4
add execute function tests
MahdiBM Jun 22, 2024
ceaa688
minor
MahdiBM Jun 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 37 additions & 10 deletions Sources/PostgresNIO/Connection/PostgresConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ public final class PostgresConnection: @unchecked Sendable {
promise: promise
)

self.channel.write(HandlerTask.extendedQuery(context), promise: nil)
let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(HandlerTask.extendedQuery(context), promise: writePromise)
writePromise.futureResult.cascadeFailure(to: promise)
MahdiBM marked this conversation as resolved.
Show resolved Hide resolved

return promise.futureResult
}
Expand All @@ -239,7 +241,10 @@ public final class PostgresConnection: @unchecked Sendable {
promise: promise
)

self.channel.write(HandlerTask.extendedQuery(context), promise: nil)
let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(HandlerTask.extendedQuery(context), promise: writePromise)
writePromise.futureResult.cascadeFailure(to: promise)
MahdiBM marked this conversation as resolved.
Show resolved Hide resolved

return promise.futureResult.map { rowDescription in
PSQLPreparedStatement(name: name, query: query, connection: self, rowDescription: rowDescription)
}
Expand All @@ -255,15 +260,21 @@ public final class PostgresConnection: @unchecked Sendable {
logger: logger,
promise: promise)

self.channel.write(HandlerTask.extendedQuery(context), promise: nil)
let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(HandlerTask.extendedQuery(context), promise: writePromise)
writePromise.futureResult.cascadeFailure(to: promise)

return promise.futureResult
}

func close(_ target: CloseTarget, logger: Logger) -> EventLoopFuture<Void> {
let promise = self.channel.eventLoop.makePromise(of: Void.self)
let context = CloseCommandContext(target: target, logger: logger, promise: promise)

self.channel.write(HandlerTask.closeCommand(context), promise: nil)
let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(HandlerTask.closeCommand(context), promise: writePromise)
writePromise.futureResult.cascadeFailure(to: promise)

return promise.futureResult
}

Expand Down Expand Up @@ -426,7 +437,9 @@ extension PostgresConnection {
promise: promise
)

self.channel.write(HandlerTask.extendedQuery(context), promise: nil)
let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(HandlerTask.extendedQuery(context), promise: writePromise)
writePromise.futureResult.cascadeFailure(to: promise)

do {
return try await promise.futureResult.map({ $0.asyncSequence() }).get()
Expand Down Expand Up @@ -455,7 +468,11 @@ extension PostgresConnection {

let task = HandlerTask.startListening(listener)

self.channel.write(task, promise: nil)
let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(task, promise: writePromise)
writePromise.futureResult.whenFailure { error in
continuation.resume(throwing: error)
}
}
} onCancel: {
let task = HandlerTask.cancelListening(channel, id)
Expand All @@ -480,7 +497,11 @@ extension PostgresConnection {
logger: logger,
promise: promise
))
self.channel.write(task, promise: nil)

let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(task, promise: writePromise)
writePromise.futureResult.cascadeFailure(to: promise)

do {
return try await promise.futureResult
.map { $0.asyncSequence() }
Expand Down Expand Up @@ -515,7 +536,11 @@ extension PostgresConnection {
logger: logger,
promise: promise
))
self.channel.write(task, promise: nil)

let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(task, promise: writePromise)
writePromise.futureResult.cascadeFailure(to: promise)

do {
return try await promise.futureResult
.map { $0.commandTag }
Expand Down Expand Up @@ -674,7 +699,7 @@ internal enum PostgresCommands: PostgresRequest {

/// Context for receiving NotificationResponse messages on a connection, used for PostgreSQL's `LISTEN`/`NOTIFY` support.
public final class PostgresListenContext: Sendable {
private let promise: EventLoopPromise<Void>
let promise: EventLoopPromise<Void>

var future: EventLoopFuture<Void> {
self.promise.futureResult
Expand Down Expand Up @@ -714,7 +739,9 @@ extension PostgresConnection {
)

let task = HandlerTask.startListening(listener)
self.channel.write(task, promise: nil)
let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(task, promise: writePromise)
writePromise.futureResult.cascadeFailure(to: listenContext.promise)

listenContext.future.whenComplete { _ in
let task = HandlerTask.cancelListening(channel, id)
Expand Down
18 changes: 18 additions & 0 deletions Tests/IntegrationTests/PSQLIntegrationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -360,4 +360,22 @@ final class IntegrationTests: XCTestCase {
}
}

func testConnectionClosureMidQueryDoesNotHang() async throws {
MahdiBM marked this conversation as resolved.
Show resolved Hide resolved
_ = await withThrowingTaskGroup(of: Void.self) { taskGroup in
for _ in (0 ..< 1_000) {
MahdiBM marked this conversation as resolved.
Show resolved Hide resolved
taskGroup.addTask {
let conn = try await PostgresConnection.test(
on: NIOSingletons.posixEventLoopGroup.next()
MahdiBM marked this conversation as resolved.
Show resolved Hide resolved
).get()

async let close: () = conn.closeGracefully()
async let query = conn.query("SELECT 1", logger: .psqlTest)
MahdiBM marked this conversation as resolved.
Show resolved Hide resolved

_ = try await (close, query)
}
}

// Ignore failures
}
}
}
Loading