Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch rows for consumption #180

Merged
merged 1 commit into from
Sep 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions Sources/PostgresNIO/Connection/PostgresConnection+Database.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,25 @@ extension PostgresConnection: PostgresDatabase {

switch command {
case .query(let query, let binds, let onMetadata, let onRow):
resultFuture = self.underlying.query(query, binds, logger: logger).flatMap { stream in
let fields = stream.rowDescription.map { column in
PostgresMessage.RowDescription.Field(
name: column.name,
tableOID: UInt32(column.tableOID),
columnAttributeNumber: column.columnAttributeNumber,
dataType: PostgresDataType(UInt32(column.dataType.rawValue)),
dataTypeSize: column.dataTypeSize,
dataTypeModifier: column.dataTypeModifier,
formatCode: .init(psqlFormatCode: column.format)
)
}

let lookupTable = PostgresRow.LookupTable(rowDescription: .init(fields: fields), resultFormat: [.binary])
return stream.iterateRowsWithoutBackpressureOption(lookupTable: lookupTable, onRow: onRow).map { _ in
onMetadata(PostgresQueryMetadata(string: stream.commandTag)!)
}
}
case .queryAll(let query, let binds, let onResult):
resultFuture = self.underlying.query(query, binds, logger: logger).flatMap { rows in
let fields = rows.rowDescription.map { column in
PostgresMessage.RowDescription.Field(
Expand All @@ -29,10 +48,18 @@ extension PostgresConnection: PostgresDatabase {
}

let lookupTable = PostgresRow.LookupTable(rowDescription: .init(fields: fields), resultFormat: [.binary])
return rows.iterateRowsWithoutBackpressureOption(lookupTable: lookupTable, onRow: onRow).map { _ in
onMetadata(PostgresQueryMetadata(string: rows.commandTag)!)
return rows.all().map { allrows in
let r = allrows.map { psqlRow -> PostgresRow in
let columns = psqlRow.data.columns.map {
PostgresMessage.DataRow.Column(value: $0)
}
return PostgresRow(dataRow: .init(columns: columns), lookupTable: lookupTable)
}

onResult(.init(metadata: PostgresQueryMetadata(string: rows.commandTag)!, rows: r))
}
}

case .prepareQuery(let request):
resultFuture = self.underlying.prepareStatement(request.query, with: request.name, logger: self.logger).map {
request.prepared = PreparedQuery(underlying: $0, database: self)
Expand Down Expand Up @@ -62,6 +89,9 @@ internal enum PostgresCommands: PostgresRequest {
binds: [PostgresData],
onMetadata: (PostgresQueryMetadata) -> () = { _ in },
onRow: (PostgresRow) throws -> ())
case queryAll(query: String,
binds: [PostgresData],
onResult: (PostgresQueryResult) -> ())
case prepareQuery(request: PrepareQueryRequest)
case executePreparedStatement(query: PreparedQuery, binds: [PostgresData], onRow: (PostgresRow) throws -> ())

Expand All @@ -82,18 +112,12 @@ extension PSQLRowStream {

func iterateRowsWithoutBackpressureOption(lookupTable: PostgresRow.LookupTable, onRow: @escaping (PostgresRow) throws -> ()) -> EventLoopFuture<Void> {
self.onRow { psqlRow in
let columns = psqlRow.data.columns.map { bytes in
PostgresMessage.DataRow.Column(value: bytes)
let columns = psqlRow.data.columns.map {
PostgresMessage.DataRow.Column(value: $0)
}

let row = PostgresRow(dataRow: .init(columns: columns), lookupTable: lookupTable)

do {
try onRow(row)
return self.eventLoop.makeSucceededFuture(Void())
} catch {
return self.eventLoop.makeFailedFuture(error)
}
try onRow(row)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,9 @@ struct ConnectionStateMachine {

// --- streaming actions
// actions if query has requested next row but we are waiting for backend
case forwardRow(PSQLBackendMessage.DataRow, to: EventLoopPromise<StateMachineStreamNextResult>)
case forwardCommandComplete(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String, to: EventLoopPromise<StateMachineStreamNextResult>)
case forwardStreamError(PSQLError, to: EventLoopPromise<StateMachineStreamNextResult>, cleanupContext: CleanUpContext?)
// actions if query has not asked for next row but are pushing the final bytes to it
case forwardStreamErrorToCurrentQuery(PSQLError, read: Bool, cleanupContext: CleanUpContext?)
case forwardStreamCompletedToCurrentQuery(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String, read: Bool)
case forwardRows(CircularBuffer<PSQLBackendMessage.DataRow>)
case forwardStreamComplete(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String)
case forwardStreamError(PSQLError, read: Bool, cleanupContext: CleanUpContext?)

// Prepare statement actions
case sendParseDescribeSync(name: String, query: String)
Expand Down Expand Up @@ -172,8 +169,10 @@ struct ConnectionStateMachine {
switch self.state {
case .initialized:
preconditionFailure("How can a connection be closed, if it was never connected.")

case .closed:
preconditionFailure("How can a connection be closed, if it is already closed.")

case .authenticated,
.sslRequestSent,
.sslNegotiated,
Expand All @@ -185,10 +184,12 @@ struct ConnectionStateMachine {
.prepareStatement,
.closeCommand:
return self.errorHappened(.uncleanShutdown)

case .error, .closing:
self.state = .closed
self.quiescingState = .notQuiescing
return .fireChannelInactive

case .modifying:
preconditionFailure("Invalid state")
}
Expand All @@ -199,36 +200,102 @@ struct ConnectionStateMachine {
case .sslRequestSent:
self.state = .sslNegotiated
return .establishSSLConnection
default:

case .initialized,
.sslNegotiated,
.sslHandlerAdded,
.waitingToStartAuthentication,
.authenticating,
.authenticated,
.readyForQuery,
.extendedQuery,
.prepareStatement,
.closeCommand,
.error,
.closing,
.closed:
return self.closeConnectionAndCleanup(.unexpectedBackendMessage(.sslSupported))

case .modifying:
preconditionFailure("Invalid state: \(self.state)")
}
}

mutating func sslUnsupportedReceived() -> ConnectionAction {
switch self.state {
case .sslRequestSent:
return self.closeConnectionAndCleanup(.sslUnsupported)
default:

case .initialized,
.sslNegotiated,
.sslHandlerAdded,
.waitingToStartAuthentication,
.authenticating,
.authenticated,
.readyForQuery,
.extendedQuery,
.prepareStatement,
.closeCommand,
.error,
.closing,
.closed:
return self.closeConnectionAndCleanup(.unexpectedBackendMessage(.sslSupported))

case .modifying:
preconditionFailure("Invalid state: \(self.state)")
}
}

mutating func sslHandlerAdded() -> ConnectionAction {
guard case .sslNegotiated = self.state else {
preconditionFailure("Can only add a ssl handler after negotiation")
switch self.state {
case .initialized,
.sslRequestSent,
.sslHandlerAdded,
.waitingToStartAuthentication,
.authenticating,
.authenticated,
.readyForQuery,
.extendedQuery,
.prepareStatement,
.closeCommand,
.error,
.closing,
.closed:
preconditionFailure("Can only add a ssl handler after negotiation: \(self.state)")

case .sslNegotiated:
self.state = .sslHandlerAdded
return .wait

case .modifying:
preconditionFailure("Invalid state: \(self.state)")
}

self.state = .sslHandlerAdded
return .wait
}

mutating func sslEstablished() -> ConnectionAction {
guard case .sslHandlerAdded = self.state else {
preconditionFailure("Can only establish a ssl connection after adding a ssl handler")
switch self.state {
case .initialized,
.sslRequestSent,
.sslNegotiated,
.waitingToStartAuthentication,
.authenticating,
.authenticated,
.readyForQuery,
.extendedQuery,
.prepareStatement,
.closeCommand,
.error,
.closing,
.closed:
preconditionFailure("Can only establish a ssl connection after adding a ssl handler: \(self.state)")

case .sslHandlerAdded:
self.state = .waitingToStartAuthentication
return .provideAuthenticationContext

case .modifying:
preconditionFailure("Invalid state: \(self.state)")
}

self.state = .waitingToStartAuthentication
return .provideAuthenticationContext
}

mutating func authenticationMessageReceived(_ message: PSQLBackendMessage.Authentication) -> ConnectionAction {
Expand Down Expand Up @@ -518,6 +585,35 @@ struct ConnectionStateMachine {
}
}

mutating func channelReadComplete() -> ConnectionAction {
switch self.state {
case .initialized,
.sslRequestSent,
.sslNegotiated,
.sslHandlerAdded,
.waitingToStartAuthentication,
.authenticating,
.authenticated,
.readyForQuery,
.prepareStatement,
.closeCommand,
.error,
.closing,
.closed:
return .wait

case .extendedQuery(var extendedQuery, let connectionContext):
return self.avoidingStateMachineCoW { machine in
let action = extendedQuery.channelReadComplete()
machine.state = .extendedQuery(extendedQuery, connectionContext)
return machine.modify(with: action)
}

case .modifying:
preconditionFailure("Invalid state")
}
}

mutating func readEventCaught() -> ConnectionAction {
switch self.state {
case .initialized:
Expand Down Expand Up @@ -562,7 +658,6 @@ struct ConnectionStateMachine {
preconditionFailure("How can we receive a read, if the connection is closed")
case .modifying:
preconditionFailure("Invalid state")

}
}

Expand Down Expand Up @@ -714,13 +809,13 @@ struct ConnectionStateMachine {
preconditionFailure("Unimplemented")
}

mutating func consumeNextQueryRow(promise: EventLoopPromise<StateMachineStreamNextResult>) -> ConnectionAction {
mutating func requestQueryRows() -> ConnectionAction {
guard case .extendedQuery(var queryState, let connectionContext) = self.state, !queryState.isComplete else {
preconditionFailure("Tried to consume next row, without active query")
}

return self.avoidingStateMachineCoW { machine -> ConnectionAction in
let action = queryState.consumeNextRow(promise: promise)
let action = queryState.requestQueryRows()
machine.state = .extendedQuery(queryState, connectionContext)
return machine.modify(with: action)
}
Expand Down Expand Up @@ -783,18 +878,15 @@ struct ConnectionStateMachine {
.sendBindExecuteSync,
.succeedQuery,
.succeedQueryNoRowsComming,
.forwardRow,
.forwardCommandComplete,
.forwardStreamCompletedToCurrentQuery,
.forwardRows,
.forwardStreamComplete,
.wait,
.read:
preconditionFailure("Expecting only failure actions if an error happened")
case .failQuery(let queryContext, with: let error):
return .failQuery(queryContext, with: error, cleanupContext: cleanupContext)
case .forwardStreamError(let error, to: let promise):
return .forwardStreamError(error, to: promise, cleanupContext: cleanupContext)
case .forwardStreamErrorToCurrentQuery(let error, read: let read):
return .forwardStreamErrorToCurrentQuery(error, read: read, cleanupContext: cleanupContext)
case .forwardStreamError(let error, let read):
return .forwardStreamError(error, read: read, cleanupContext: cleanupContext)
}
case .prepareStatement(var prepareStateMachine, _):
let cleanupContext = self.setErrorAndCreateCleanupContext(error)
Expand Down Expand Up @@ -1025,18 +1117,13 @@ extension ConnectionStateMachine {
return .succeedQuery(requestContext, columns: columns)
case .succeedQueryNoRowsComming(let requestContext, let commandTag):
return .succeedQueryNoRowsComming(requestContext, commandTag: commandTag)
case .forwardRow(let data, to: let promise):
return .forwardRow(data, to: promise)
case .forwardCommandComplete(let buffer, let commandTag, to: let promise):
return .forwardCommandComplete(buffer, commandTag: commandTag, to: promise)
case .forwardStreamError(let error, to: let promise):
let cleanupContext = self.setErrorAndCreateCleanupContextIfNeeded(error)
return .forwardStreamError(error, to: promise, cleanupContext: cleanupContext)
case .forwardStreamErrorToCurrentQuery(let error, let read):
case .forwardRows(let buffer):
return .forwardRows(buffer)
case .forwardStreamComplete(let buffer, let commandTag):
return .forwardStreamComplete(buffer, commandTag: commandTag)
case .forwardStreamError(let error, let read):
let cleanupContext = self.setErrorAndCreateCleanupContextIfNeeded(error)
return .forwardStreamErrorToCurrentQuery(error, read: read, cleanupContext: cleanupContext)
case .forwardStreamCompletedToCurrentQuery(let buffer, let commandTag, let read):
return .forwardStreamCompletedToCurrentQuery(buffer, commandTag: commandTag, read: read)
return .forwardStreamError(error, read: read, cleanupContext: cleanupContext)
case .read:
return .read
case .wait:
Expand Down Expand Up @@ -1104,14 +1191,6 @@ extension ConnectionStateMachine {
}
}

enum StateMachineStreamNextResult {
/// the next row
case row(PSQLBackendMessage.DataRow)

/// the query has completed, all remaining rows and the command completion tag
case complete(CircularBuffer<PSQLBackendMessage.DataRow>, commandTag: String)
}

struct SendPrepareStatement {
let name: String
let query: String
Expand Down
Loading