diff --git a/Sources/ConnectionPoolModule/Max2Sequence.swift b/Sources/ConnectionPoolModule/Max2Sequence.swift index 0feccd68..9b7d972b 100644 --- a/Sources/ConnectionPoolModule/Max2Sequence.swift +++ b/Sources/ConnectionPoolModule/Max2Sequence.swift @@ -95,8 +95,7 @@ extension Max2Sequence: ExpressibleByArrayLiteral { init(arrayLiteral elements: Element...) { precondition(elements.count <= 2) var iterator = elements.makeIterator() - self.first = iterator.next() - self.second = iterator.next() + self.init(iterator.next(), iterator.next()) } } diff --git a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift new file mode 100644 index 00000000..8ec99c7d --- /dev/null +++ b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift @@ -0,0 +1,640 @@ +import Atomics + +@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) +extension PoolStateMachine { + + @usableFromInline + struct LeaseResult { + @usableFromInline + var connection: Connection + @usableFromInline + var timersToCancel: Max2Sequence + @usableFromInline + var wasIdle: Bool + @usableFromInline + var use: ConnectionGroup.ConnectionUse + + @inlinable + init( + connection: Connection, + timersToCancel: Max2Sequence, + wasIdle: Bool, + use: ConnectionGroup.ConnectionUse + ) { + self.connection = connection + self.timersToCancel = timersToCancel + self.wasIdle = wasIdle + self.use = use + } + } + + @usableFromInline + struct ConnectionGroup: Sendable { + @usableFromInline + struct Stats: Hashable, Sendable { + @usableFromInline var connecting: UInt16 = 0 + @usableFromInline var backingOff: UInt16 = 0 + @usableFromInline var idle: UInt16 = 0 + @usableFromInline var leased: UInt16 = 0 + @usableFromInline var runningKeepAlive: UInt16 = 0 + @usableFromInline var closing: UInt16 = 0 + + @usableFromInline var availableStreams: UInt16 = 0 + @usableFromInline var leasedStreams: UInt16 = 0 + + @usableFromInline var soonAvailable: UInt16 { + self.connecting + self.backingOff + self.runningKeepAlive + } + + @usableFromInline var active: UInt16 { + self.idle + self.leased + self.connecting + self.backingOff + } + } + + /// The minimum number of connections + @usableFromInline + let minimumConcurrentConnections: Int + + /// The maximum number of preserved connections + @usableFromInline + let maximumConcurrentConnectionSoftLimit: Int + + /// The absolute maximum number of connections + @usableFromInline + let maximumConcurrentConnectionHardLimit: Int + + @usableFromInline + let keepAlive: Bool + + @usableFromInline + let keepAliveReducesAvailableStreams: Bool + + /// A connectionID generator. + @usableFromInline + let generator: ConnectionIDGenerator + + /// The connections states + @usableFromInline + private(set) var connections: [ConnectionState] + + @usableFromInline + private(set) var stats = Stats() + + @inlinable + init( + generator: ConnectionIDGenerator, + minimumConcurrentConnections: Int, + maximumConcurrentConnectionSoftLimit: Int, + maximumConcurrentConnectionHardLimit: Int, + keepAlive: Bool, + keepAliveReducesAvailableStreams: Bool + ) { + self.generator = generator + self.connections = [] + self.minimumConcurrentConnections = minimumConcurrentConnections + self.maximumConcurrentConnectionSoftLimit = maximumConcurrentConnectionSoftLimit + self.maximumConcurrentConnectionHardLimit = maximumConcurrentConnectionHardLimit + self.keepAlive = keepAlive + self.keepAliveReducesAvailableStreams = keepAliveReducesAvailableStreams + } + + var isEmpty: Bool { + self.connections.isEmpty + } + + @usableFromInline + var canGrow: Bool { + self.stats.active < self.maximumConcurrentConnectionHardLimit + } + + @usableFromInline + var soonAvailableConnections: UInt16 { + self.stats.soonAvailable + } + + // MARK: - Mutations - + + /// A connection's use. Is it persisted or an overflow connection? + @usableFromInline + enum ConnectionUse: Equatable { + case persisted + case demand + case overflow + } + + /// Information around an idle connection. + @usableFromInline + struct AvailableConnectionContext { + /// The connection's use. Either general purpose or for requests with `EventLoop` + /// requirements. + @usableFromInline + var use: ConnectionUse + + @usableFromInline + var info: ConnectionAvailableInfo + } + + /// Information around the failed/closed connection. + @usableFromInline + struct FailedConnectionContext { + /// Connections that are currently starting + @usableFromInline + var connectionsStarting: Int + + @inlinable + init(connectionsStarting: Int) { + self.connectionsStarting = connectionsStarting + } + } + + mutating func refillConnections() -> [ConnectionRequest] { + let existingConnections = self.stats.active + let missingConnection = self.minimumConcurrentConnections - Int(existingConnections) + guard missingConnection > 0 else { + return [] + } + + var requests = [ConnectionRequest]() + requests.reserveCapacity(missingConnection) + + for _ in 0.. ConnectionRequest? { + precondition(self.minimumConcurrentConnections <= self.stats.active) + guard self.maximumConcurrentConnectionSoftLimit > self.stats.active else { + return nil + } + return self.createNewConnection() + } + + @inlinable + mutating func createNewOverflowConnectionIfPossible() -> ConnectionRequest? { + precondition(self.maximumConcurrentConnectionSoftLimit <= self.stats.active) + guard self.maximumConcurrentConnectionHardLimit > self.stats.active else { + return nil + } + return self.createNewConnection() + } + + @inlinable + /*private*/ mutating func createNewConnection() -> ConnectionRequest { + precondition(self.canGrow) + self.stats.connecting += 1 + let connectionID = self.generator.next() + let connection = ConnectionState(id: connectionID) + self.connections.append(connection) + return ConnectionRequest(connectionID: connectionID) + } + + /// A new ``Connection`` was established. + /// + /// This will put the connection into the idle state. + /// + /// - Parameter connection: The new established connection. + /// - Returns: An index and an IdleConnectionContext to determine the next action for the now idle connection. + /// Call ``parkConnection(at:)``, ``leaseConnection(at:)`` or ``closeConnection(at:)`` + /// with the supplied index after this. + @inlinable + mutating func newConnectionEstablished(_ connection: Connection, maxStreams: UInt16) -> (Int, AvailableConnectionContext) { + guard let index = self.connections.firstIndex(where: { $0.id == connection.id }) else { + preconditionFailure("There is a new connection that we didn't request!") + } + self.stats.connecting -= 1 + self.stats.idle += 1 + self.stats.availableStreams += maxStreams + let connectionInfo = self.connections[index].connected(connection, maxStreams: maxStreams) + // TODO: If this is an overflow connection, but we are currently also creating a + // persisted connection, we might want to swap those. + let context = self.makeAvailableConnectionContextForConnection(at: index, info: connectionInfo) + return (index, context) + } + + @inlinable + mutating func backoffNextConnectionAttempt(_ connectionID: Connection.ID) -> ConnectionTimer { + guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else { + preconditionFailure("We tried to create a new connection that we know nothing about?") + } + + self.stats.connecting -= 1 + self.stats.backingOff += 1 + + return self.connections[index].failedToConnect() + } + + @usableFromInline + enum BackoffDoneAction { + case createConnection(ConnectionRequest, TimerCancellationToken?) + case cancelTimers(Max2Sequence) + } + + @inlinable + mutating func backoffDone(_ connectionID: Connection.ID, retry: Bool) -> BackoffDoneAction { + guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else { + preconditionFailure("We tried to create a new connection that we know nothing about?") + } + + self.stats.backingOff -= 1 + + if retry || self.stats.active < self.minimumConcurrentConnections { + self.stats.connecting += 1 + let backoffTimerCancellation = self.connections[index].retryConnect() + return .createConnection(.init(connectionID: connectionID), backoffTimerCancellation) + } + + let backoffTimerCancellation = self.connections[index].destroyBackingOffConnection() + var timerCancellations = Max2Sequence(backoffTimerCancellation) + + if let timerCancellationToken = self.swapForDeletion(index: index) { + timerCancellations.append(timerCancellationToken) + } + return .cancelTimers(timerCancellations) + } + + @inlinable + mutating func timerScheduled( + _ timer: ConnectionTimer, + cancelContinuation: TimerCancellationToken + ) -> TimerCancellationToken? { + guard let index = self.connections.firstIndex(where: { $0.id == timer.connectionID }) else { + return cancelContinuation + } + + return self.connections[index].timerScheduled(timer, cancelContinuation: cancelContinuation) + } + + // MARK: Leasing and releasing + + /// Lease a connection, if an idle connection is available. + /// + /// - Returns: A connection to execute a request on. + @inlinable + mutating func leaseConnection() -> LeaseResult? { + if self.stats.availableStreams == 0 { + return nil + } + + guard let index = self.findAvailableConnection() else { + preconditionFailure("Stats and actual count are of.") + } + + return self.leaseConnection(at: index, streams: 1) + } + + @usableFromInline + enum LeasedConnectionOrStartingCount { + case leasedConnection(LeaseResult) + case startingCount(UInt16) + } + + @inlinable + mutating func leaseConnectionOrSoonAvailableConnectionCount() -> LeasedConnectionOrStartingCount { + if let result = self.leaseConnection() { + return .leasedConnection(result) + } + return .startingCount(self.stats.soonAvailable) + } + + @inlinable + mutating func leaseConnection(at index: Int, streams: UInt16) -> LeaseResult { + let leaseResult = self.connections[index].lease(streams: streams) + let use = self.getConnectionUse(index: index) + + if leaseResult.wasIdle { + self.stats.idle -= 1 + self.stats.leased += 1 + } + self.stats.leasedStreams += streams + self.stats.availableStreams -= streams + return LeaseResult( + connection: leaseResult.connection, + timersToCancel: leaseResult.timersToCancel, + wasIdle: leaseResult.wasIdle, + use: use + ) + } + + @inlinable + mutating func parkConnection(at index: Int) -> Max2Sequence { + let scheduleIdleTimeoutTimer: Bool + switch index { + case 0.. (Int, AvailableConnectionContext) { + guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else { + preconditionFailure("A connection that we don't know was released? Something is very wrong...") + } + + let connectionInfo = self.connections[index].release(streams: streams) + self.stats.availableStreams += streams + self.stats.leasedStreams -= streams + switch connectionInfo { + case .idle: + self.stats.idle += 1 + self.stats.leased -= 1 + case .leased: + break + } + + let context = self.makeAvailableConnectionContextForConnection(at: index, info: connectionInfo) + return (index, context) + } + + @inlinable + mutating func keepAliveIfIdle(_ connectionID: Connection.ID) -> KeepAliveAction? { + guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else { + // because of a race this connection (connection close runs against trigger of ping pong) + // was already removed from the state machine. + return nil + } + + guard let action = self.connections[index].runKeepAliveIfIdle(reducesAvailableStreams: self.keepAliveReducesAvailableStreams) else { + return nil + } + + self.stats.runningKeepAlive += 1 + if self.keepAliveReducesAvailableStreams { + self.stats.availableStreams -= 1 + } + + return action + } + + @inlinable + mutating func keepAliveSucceeded(_ connectionID: Connection.ID) -> (Int, AvailableConnectionContext)? { + guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else { + preconditionFailure("A connection that we don't know was released? Something is very wrong...") + } + + guard let connectionInfo = self.connections[index].keepAliveSucceeded() else { + // if we don't get connection info here this means, that the connection already was + // transitioned to closing. when we did this we already decremented the + // runningKeepAlive timer. + return nil + } + + self.stats.runningKeepAlive -= 1 + if self.keepAliveReducesAvailableStreams { + self.stats.availableStreams += 1 + } + + let context = self.makeAvailableConnectionContextForConnection(at: index, info: connectionInfo) + return (index, context) + } + + // MARK: Connection close/removal + + @usableFromInline + struct CloseAction { + @usableFromInline + private(set) var connection: Connection + + @usableFromInline + private(set) var timersToCancel: Max2Sequence + + @inlinable + init(connection: Connection, timersToCancel: Max2Sequence) { + self.connection = connection + self.timersToCancel = timersToCancel + } + } + + /// Closes the connection at the given index. + @inlinable + mutating func closeConnectionIfIdle(at index: Int) -> CloseAction { + guard let closeAction = self.connections[index].closeIfIdle() else { + preconditionFailure("Invalid state: \(self)") + } + + self.stats.idle -= 1 + self.stats.closing += 1 + +// if idleState.runningKeepAlive { +// self.stats.runningKeepAlive -= 1 +// if self.keepAliveReducesAvailableStreams { +// self.stats.availableStreams += 1 +// } +// } + + self.stats.availableStreams -= closeAction.maxStreams + + return CloseAction( + connection: closeAction.connection!, + timersToCancel: closeAction.cancelTimers + ) + } + + @inlinable + mutating func closeConnectionIfIdle(_ connectionID: Connection.ID) -> CloseAction? { + guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else { + // because of a race this connection (connection close runs against trigger of timeout) + // was already removed from the state machine. + return nil + } + + if index < self.minimumConcurrentConnections { + // because of a race a connection might receive a idle timeout after it was moved into + // the persisted connections. If a connection is now persisted, we now need to ignore + // the trigger + return nil + } + + return self.closeConnectionIfIdle(at: index) + } + + /// Connection closed. Call this method, if a connection is closed. + /// + /// This will put the position into the closed state. + /// + /// - Parameter connectionID: The failed connection's id. + /// - Returns: An optional index and an IdleConnectionContext to determine the next action for the closed connection. + /// You must call ``removeConnection(at:)`` or ``replaceConnection(at:)`` with the + /// supplied index after this. If nil is returned the connection was closed by the state machine and was + /// therefore already removed. + @inlinable + mutating func connectionClosed(_ connectionID: Connection.ID) -> FailedConnectionContext? { + guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else { + return nil + } + + let closedAction = self.connections[index].closed() + + if closedAction.wasRunningKeepAlive { + self.stats.runningKeepAlive -= 1 + } + self.stats.leasedStreams -= closedAction.usedStreams + self.stats.availableStreams -= closedAction.maxStreams - closedAction.usedStreams + + switch closedAction.previousConnectionState { + case .idle: + self.stats.idle -= 1 + + case .leased: + self.stats.leased -= 1 + + case .closing: + self.stats.closing -= 1 + } + + let lastIndex = self.connections.index(before: self.connections.endIndex) + + if index == lastIndex { + self.connections.remove(at: index) + } else { + self.connections.swapAt(index, lastIndex) + self.connections.remove(at: lastIndex) + } + + return FailedConnectionContext(connectionsStarting: 0) + } + + // MARK: Shutdown + + mutating func triggerForceShutdown(_ cleanup: inout ConnectionAction.Shutdown) { + for var connectionState in self.connections { + guard let closeAction = connectionState.close() else { + continue + } + + if let connection = closeAction.connection { + cleanup.connections.append(connection) + } + cleanup.timersToCancel.append(contentsOf: closeAction.cancelTimers) + } + + self.connections = [] + } + + // MARK: - Private functions - + + @usableFromInline + /*private*/ func getConnectionUse(index: Int) -> ConnectionUse { + switch index { + case 0.. AvailableConnectionContext { + precondition(self.connections[index].isAvailable) + let use = self.getConnectionUse(index: index) + return AvailableConnectionContext(use: use, info: info) + } + + @inlinable + /*private*/ func findAvailableConnection() -> Int? { + return self.connections.firstIndex(where: { $0.isAvailable }) + } + + @inlinable + /*private*/ mutating func swapForDeletion(index indexToDelete: Int) -> TimerCancellationToken? { + let maybeLastConnectedIndex = self.connections.lastIndex(where: { $0.isConnected }) + + if maybeLastConnectedIndex == nil || maybeLastConnectedIndex! < indexToDelete { + self.removeO1(indexToDelete) + return nil + } + + // if maybeLastConnectedIndex == nil, we return early in the above if case. + let lastConnectedIndex = maybeLastConnectedIndex! + + switch indexToDelete { + case 0.. State.Timer { - defer { self.nextTimerID += 1 } - return State.Timer(id: self.nextTimerID) - } - /// The connection failed to start @inlinable mutating func failedToConnect() -> ConnectionTimer { switch self.state { case .starting: - let backoffTimerState = self.nextTimer() + let backoffTimerState = self._nextTimer() self.state = .backingOff(backoffTimerState) return ConnectionTimer(timerID: backoffTimerState.timerID, connectionID: self.id, usecase: .backoff) @@ -311,6 +295,17 @@ extension PoolStateMachine { } } + @inlinable + mutating func destroyBackingOffConnection() -> TimerCancellationToken? { + switch self.state { + case .backingOff(let timer): + self.state = .closed + return timer.cancellationContinuation + case .starting, .idle, .leased, .closing, .closed: + preconditionFailure("Invalid state: \(self.state)") + } + } + @usableFromInline struct LeaseAction { @usableFromInline @@ -468,78 +463,211 @@ extension PoolStateMachine { } } + @inlinable + mutating func cancelIdleTimer() -> TimerCancellationToken? { + switch self.state { + case .starting, .backingOff, .leased, .closing, .closed: + return nil + + case .idle(let connection, let maxStreams, let keepAlive, let idleTimer): + self.state = .idle(connection, maxStreams: maxStreams, keepAlive: keepAlive, idleTimer: nil) + return idleTimer?.cancellationContinuation + } + } + @usableFromInline struct CloseAction { + @usableFromInline - var connection: Connection + enum PreviousConnectionState { + case idle + case leased + case closing + case backingOff + } + + @usableFromInline + var connection: Connection? + @usableFromInline + var previousConnectionState: PreviousConnectionState @usableFromInline var cancelTimers: Max2Sequence @usableFromInline + var usedStreams: UInt16 + @usableFromInline var maxStreams: UInt16 @inlinable - init(connection: Connection, cancelTimers: Max2Sequence, maxStreams: UInt16) { + init( + connection: Connection?, + previousConnectionState: PreviousConnectionState, + cancelTimers: Max2Sequence, + usedStreams: UInt16, + maxStreams: UInt16 + ) { self.connection = connection + self.previousConnectionState = previousConnectionState self.cancelTimers = cancelTimers + self.usedStreams = usedStreams self.maxStreams = maxStreams } } @inlinable - mutating func close() -> CloseAction { + mutating func closeIfIdle() -> CloseAction? { switch self.state { case .idle(let connection, let maxStreams, var keepAlive, let idleTimerState): self.state = .closing(connection) return CloseAction( connection: connection, + previousConnectionState: .idle, cancelTimers: Max2Sequence( keepAlive.cancelTimerIfScheduled(), idleTimerState?.cancellationContinuation ), + usedStreams: keepAlive.usedStreams, maxStreams: maxStreams ) - case .backingOff, .starting, .leased, .closing, .closed: + case .leased, .closed: + return nil + + case .backingOff, .starting, .closing: preconditionFailure("Invalid state: \(self.state)") } } @inlinable - mutating func closeIfIdle() -> CloseAction? { + mutating func close() -> CloseAction? { switch self.state { - case .idle: - return self.close() - case .leased, .closed: + case .starting: + // If we are currently starting, there is nothing we can do about it right now. + // Only once the connection has come up, or failed, we can actually act. return nil - case .backingOff, .starting, .closing: - preconditionFailure("Invalid state: \(self.state)") + + case .closing, .closed: + // If we are already closing, we can't do anything else. + return nil + + case .idle(let connection, let maxStreams, var keepAlive, let idleTimerState): + self.state = .closing(connection) + return CloseAction( + connection: connection, + previousConnectionState: .idle, + cancelTimers: Max2Sequence( + keepAlive.cancelTimerIfScheduled(), + idleTimerState?.cancellationContinuation + ), + usedStreams: keepAlive.usedStreams, + maxStreams: maxStreams + ) + + case .leased(let connection, usedStreams: let usedStreams, maxStreams: let maxStreams, var keepAlive): + self.state = .closing(connection) + return CloseAction( + connection: connection, + previousConnectionState: .leased, + cancelTimers: Max2Sequence( + keepAlive.cancelTimerIfScheduled() + ), + usedStreams: keepAlive.usedStreams + usedStreams, + maxStreams: maxStreams + ) + + case .backingOff(let timer): + self.state = .closed + return CloseAction( + connection: nil, + previousConnectionState: .backingOff, + cancelTimers: Max2Sequence(timer.cancellationContinuation), + usedStreams: 0, + maxStreams: 0 + ) } } @usableFromInline - struct ShutdownAction { + struct ClosedAction { + @usableFromInline - var connection: Connection? + enum PreviousConnectionState { + case idle + case leased + case closing + } + @usableFromInline - var timersToCancel: Max2Sequence + var previousConnectionState: PreviousConnectionState + @usableFromInline + var cancelTimers: Max2Sequence @usableFromInline var maxStreams: UInt16 @usableFromInline var usedStreams: UInt16 + @usableFromInline + var wasRunningKeepAlive: Bool @inlinable init( - connection: Connection? = nil, - timersToCancel: Max2Sequence = .init(), - maxStreams: UInt16 = 0, - usedStreams: UInt16 = 0 + previousConnectionState: PreviousConnectionState, + cancelTimers: Max2Sequence, + maxStreams: UInt16, + usedStreams: UInt16, + wasRunningKeepAlive: Bool ) { - self.connection = connection - self.timersToCancel = timersToCancel + self.previousConnectionState = previousConnectionState + self.cancelTimers = cancelTimers self.maxStreams = maxStreams self.usedStreams = usedStreams + self.wasRunningKeepAlive = wasRunningKeepAlive + } + } + + @inlinable + mutating func closed() -> ClosedAction { + switch self.state { + case .starting, .backingOff, .closed: + preconditionFailure("Invalid state: \(self.state)") + + case .idle(_, let maxStreams, var keepAlive, let idleTimer): + self.state = .closed + return ClosedAction( + previousConnectionState: .idle, + cancelTimers: .init(keepAlive.cancelTimerIfScheduled(), idleTimer?.cancellationContinuation), + maxStreams: maxStreams, + usedStreams: keepAlive.usedStreams, + wasRunningKeepAlive: keepAlive.isRunning + ) + + case .leased(_, let usedStreams, let maxStreams, let keepAlive): + self.state = .closed + return ClosedAction( + previousConnectionState: .leased, + cancelTimers: .init(), + maxStreams: maxStreams, + usedStreams: usedStreams + keepAlive.usedStreams, + wasRunningKeepAlive: keepAlive.isRunning + ) + + case .closing: + self.state = .closed + return ClosedAction( + previousConnectionState: .closing, + cancelTimers: .init(), + maxStreams: 0, + usedStreams: 0, + wasRunningKeepAlive: false + ) } } + + // MARK: - Private Methods - + + @inlinable + mutating /*private*/ func _nextTimer() -> State.Timer { + defer { self.nextTimerID += 1 } + return State.Timer(id: self.nextTimerID) + } } @usableFromInline diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index dc18784f..29349e56 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -37,7 +37,7 @@ struct PoolStateMachine< ConnectionID: Hashable & Sendable, Request: ConnectionRequestProtocol, RequestID, - TimerCancellationToken + TimerCancellationToken: Sendable > where Connection.ID == ConnectionID, ConnectionIDGenerator.ID == ConnectionID, RequestID == Request.ID { @usableFromInline diff --git a/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionGroupTests.swift b/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionGroupTests.swift new file mode 100644 index 00000000..4e3a1647 --- /dev/null +++ b/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionGroupTests.swift @@ -0,0 +1,294 @@ +import XCTest +@testable import _ConnectionPoolModule + +@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) +final class PoolStateMachine_ConnectionGroupTests: XCTestCase { + var idGenerator: ConnectionIDGenerator! + + override func setUp() { + self.idGenerator = ConnectionIDGenerator() + super.setUp() + } + + override func tearDown() { + self.idGenerator = nil + super.tearDown() + } + + func testRefillConnections() { + var connections = TestPoolStateMachine.ConnectionGroup( + generator: self.idGenerator, + minimumConcurrentConnections: 4, + maximumConcurrentConnectionSoftLimit: 4, + maximumConcurrentConnectionHardLimit: 4, + keepAlive: true, + keepAliveReducesAvailableStreams: true + ) + + XCTAssertTrue(connections.isEmpty) + let requests = connections.refillConnections() + XCTAssertFalse(connections.isEmpty) + + XCTAssertEqual(requests.count, 4) + XCTAssertNil(connections.createNewDemandConnectionIfPossible()) + XCTAssertNil(connections.createNewOverflowConnectionIfPossible()) + XCTAssertEqual(connections.stats, .init(connecting: 4)) + XCTAssertEqual(connections.soonAvailableConnections, 4) + + let requests2 = connections.refillConnections() + XCTAssertTrue(requests2.isEmpty) + + var connected: UInt16 = 0 + for request in requests { + let newConnection = MockConnection(id: request.connectionID) + let (_, context) = connections.newConnectionEstablished(newConnection, maxStreams: 1) + XCTAssertEqual(context.info, .idle(availableStreams: 1, newIdle: true)) + XCTAssertEqual(context.use, .persisted) + connected += 1 + XCTAssertEqual(connections.stats, .init(connecting: 4 - connected, idle: connected, availableStreams: connected)) + XCTAssertEqual(connections.soonAvailableConnections, 4 - connected) + } + + let requests3 = connections.refillConnections() + XCTAssertTrue(requests3.isEmpty) + } + + func testMakeConnectionLeaseItAndDropItHappyPath() { + var connections = TestPoolStateMachine.ConnectionGroup( + generator: self.idGenerator, + minimumConcurrentConnections: 0, + maximumConcurrentConnectionSoftLimit: 4, + maximumConcurrentConnectionHardLimit: 4, + keepAlive: true, + keepAliveReducesAvailableStreams: true + ) + + let requests = connections.refillConnections() + XCTAssertTrue(connections.isEmpty) + XCTAssertTrue(requests.isEmpty) + + guard let request = connections.createNewDemandConnectionIfPossible() else { + return XCTFail("Expected to receive a connection request") + } + XCTAssertEqual(request, .init(connectionID: 0)) + XCTAssertFalse(connections.isEmpty) + XCTAssertEqual(connections.soonAvailableConnections, 1) + XCTAssertEqual(connections.stats, .init(connecting: 1)) + + let newConnection = MockConnection(id: request.connectionID) + let (_, establishedContext) = connections.newConnectionEstablished(newConnection, maxStreams: 1) + XCTAssertEqual(establishedContext.info, .idle(availableStreams: 1, newIdle: true)) + XCTAssertEqual(establishedContext.use, .demand) + XCTAssertEqual(connections.stats, .init(idle: 1, availableStreams: 1)) + XCTAssertEqual(connections.soonAvailableConnections, 0) + + guard case .leasedConnection(let leaseResult) = connections.leaseConnectionOrSoonAvailableConnectionCount() else { + return XCTFail("Expected to lease a connection") + } + XCTAssert(newConnection === leaseResult.connection) + XCTAssertEqual(connections.stats, .init(leased: 1, leasedStreams: 1)) + + let (index, releasedContext) = connections.releaseConnection(leaseResult.connection.id, streams: 1) + XCTAssertEqual(releasedContext.info, .idle(availableStreams: 1, newIdle: true)) + XCTAssertEqual(releasedContext.use, .demand) + XCTAssertEqual(connections.stats, .init(idle: 1, availableStreams: 1)) + + let parkTimers = connections.parkConnection(at: index) + XCTAssertEqual(parkTimers, [ + .init(timerID: 0, connectionID: newConnection.id, usecase: .keepAlive), + .init(timerID: 1, connectionID: newConnection.id, usecase: .idleTimeout), + ]) + + guard let keepAliveAction = connections.keepAliveIfIdle(newConnection.id) else { + return XCTFail("Expected to get a connection for ping pong") + } + XCTAssert(newConnection === keepAliveAction.connection) + XCTAssertEqual(connections.stats, .init(idle: 1, runningKeepAlive: 1, availableStreams: 0)) + + guard let (_, pingPongContext) = connections.keepAliveSucceeded(newConnection.id) else { + return XCTFail("Expected to get an AvailableContext") + } + XCTAssertEqual(pingPongContext.info, .idle(availableStreams: 1, newIdle: false)) + XCTAssertEqual(releasedContext.use, .demand) + XCTAssertEqual(connections.stats, .init(idle: 1, availableStreams: 1)) + + guard let closeAction = connections.closeConnectionIfIdle(newConnection.id) else { + return XCTFail("Expected to get a connection for ping pong") + } + XCTAssertEqual(closeAction.timersToCancel, []) + XCTAssert(closeAction.connection === newConnection) + XCTAssertEqual(connections.stats, .init(closing: 1, availableStreams: 0)) + + let closeContext = connections.connectionClosed(newConnection.id) + XCTAssertEqual(closeContext?.connectionsStarting, 0) + XCTAssertTrue(connections.isEmpty) + XCTAssertEqual(connections.stats, .init()) + } + + func testBackoffDoneCreatesANewConnectionToReachMinimumConnectionsEvenThoughRetryIsSetToFalse() { + var connections = TestPoolStateMachine.ConnectionGroup( + generator: self.idGenerator, + minimumConcurrentConnections: 1, + maximumConcurrentConnectionSoftLimit: 4, + maximumConcurrentConnectionHardLimit: 4, + keepAlive: true, + keepAliveReducesAvailableStreams: true + ) + + let requests = connections.refillConnections() + XCTAssertEqual(connections.stats, .init(connecting: 1)) + XCTAssertEqual(connections.soonAvailableConnections, 1) + XCTAssertFalse(connections.isEmpty) + XCTAssertEqual(requests.count, 1) + + guard let request = requests.first else { return XCTFail("Expected to receive a connection request") } + XCTAssertEqual(request, .init(connectionID: 0)) + + let backoffTimer = connections.backoffNextConnectionAttempt(request.connectionID) + XCTAssertEqual(connections.stats, .init(backingOff: 1)) + let backoffTimerCancellationToken = MockTimerCancellationToken(backoffTimer) + XCTAssertNil(connections.timerScheduled(backoffTimer, cancelContinuation: backoffTimerCancellationToken)) + + let backoffDoneAction = connections.backoffDone(request.connectionID, retry: false) + XCTAssertEqual(backoffDoneAction, .createConnection(.init(connectionID: 0), backoffTimerCancellationToken)) + + XCTAssertEqual(connections.stats, .init(connecting: 1)) + } + + func testBackoffDoneCancelsIdleTimerIfAPersistedConnectionIsNotRetried() { + var connections = TestPoolStateMachine.ConnectionGroup( + generator: self.idGenerator, + minimumConcurrentConnections: 2, + maximumConcurrentConnectionSoftLimit: 4, + maximumConcurrentConnectionHardLimit: 4, + keepAlive: true, + keepAliveReducesAvailableStreams: true + ) + + let requests = connections.refillConnections() + XCTAssertEqual(connections.stats, .init(connecting: 2)) + XCTAssertEqual(connections.soonAvailableConnections, 2) + XCTAssertFalse(connections.isEmpty) + XCTAssertEqual(requests.count, 2) + + var requestIterator = requests.makeIterator() + guard let firstRequest = requestIterator.next(), let secondRequest = requestIterator.next() else { + return XCTFail("Expected to get two requests") + } + + guard let thirdRequest = connections.createNewDemandConnectionIfPossible() else { + return XCTFail("Expected to get another request") + } + XCTAssertEqual(connections.stats, .init(connecting: 3)) + + let newSecondConnection = MockConnection(id: secondRequest.connectionID) + let (_, establishedSecondConnectionContext) = connections.newConnectionEstablished(newSecondConnection, maxStreams: 1) + XCTAssertEqual(establishedSecondConnectionContext.info, .idle(availableStreams: 1, newIdle: true)) + XCTAssertEqual(establishedSecondConnectionContext.use, .persisted) + XCTAssertEqual(connections.stats, .init(connecting: 2, idle: 1, availableStreams: 1)) + XCTAssertEqual(connections.soonAvailableConnections, 2) + + let newThirdConnection = MockConnection(id: thirdRequest.connectionID) + let (thirdConnectionIndex, establishedThirdConnectionContext) = connections.newConnectionEstablished(newThirdConnection, maxStreams: 1) + XCTAssertEqual(establishedThirdConnectionContext.info, .idle(availableStreams: 1, newIdle: true)) + XCTAssertEqual(establishedThirdConnectionContext.use, .demand) + XCTAssertEqual(connections.stats, .init(connecting: 1, idle: 2, availableStreams: 2)) + XCTAssertEqual(connections.soonAvailableConnections, 1) + let thirdConnKeepTimer = TestPoolStateMachine.ConnectionTimer(timerID: 0, connectionID: thirdRequest.connectionID, usecase: .keepAlive) + let thirdConnIdleTimer = TestPoolStateMachine.ConnectionTimer(timerID: 1, connectionID: thirdRequest.connectionID, usecase: .idleTimeout) + let thirdConnIdleTimerCancellationToken = MockTimerCancellationToken(thirdConnIdleTimer) + XCTAssertEqual(connections.parkConnection(at: thirdConnectionIndex), [thirdConnKeepTimer, thirdConnIdleTimer]) + + XCTAssertNil(connections.timerScheduled(thirdConnKeepTimer, cancelContinuation: .init(thirdConnKeepTimer))) + XCTAssertNil(connections.timerScheduled(thirdConnIdleTimer, cancelContinuation: thirdConnIdleTimerCancellationToken)) + + let backoffTimer = connections.backoffNextConnectionAttempt(firstRequest.connectionID) + XCTAssertEqual(connections.stats, .init(backingOff: 1, idle: 2, availableStreams: 2)) + let backoffTimerCancellationToken = MockTimerCancellationToken(backoffTimer) + XCTAssertNil(connections.timerScheduled(backoffTimer, cancelContinuation: backoffTimerCancellationToken)) + XCTAssertEqual(connections.stats, .init(backingOff: 1, idle: 2, availableStreams: 2)) + + // connection three should be moved to connection one and for this reason become permanent + + XCTAssertEqual(connections.backoffDone(firstRequest.connectionID, retry: false), .cancelTimers([backoffTimerCancellationToken, thirdConnIdleTimerCancellationToken])) + XCTAssertEqual(connections.stats, .init(idle: 2, availableStreams: 2)) + + XCTAssertNil(connections.closeConnectionIfIdle(newThirdConnection.id)) + } + + func testBackoffDoneReturnsNilIfOverflowConnection() { + var connections = TestPoolStateMachine.ConnectionGroup( + generator: self.idGenerator, + minimumConcurrentConnections: 0, + maximumConcurrentConnectionSoftLimit: 4, + maximumConcurrentConnectionHardLimit: 4, + keepAlive: true, + keepAliveReducesAvailableStreams: true + ) + + guard let firstRequest = connections.createNewDemandConnectionIfPossible() else { + return XCTFail("Expected to get two requests") + } + + guard let secondRequest = connections.createNewDemandConnectionIfPossible() else { + return XCTFail("Expected to get another request") + } + XCTAssertEqual(connections.stats, .init(connecting: 2)) + + let newFirstConnection = MockConnection(id: firstRequest.connectionID) + let (_, establishedFirstConnectionContext) = connections.newConnectionEstablished(newFirstConnection, maxStreams: 1) + XCTAssertEqual(establishedFirstConnectionContext.info, .idle(availableStreams: 1, newIdle: true)) + XCTAssertEqual(establishedFirstConnectionContext.use, .demand) + XCTAssertEqual(connections.stats, .init(connecting: 1, idle: 1, availableStreams: 1)) + XCTAssertEqual(connections.soonAvailableConnections, 1) + + let backoffTimer = connections.backoffNextConnectionAttempt(secondRequest.connectionID) + let backoffTimerCancellationToken = MockTimerCancellationToken(backoffTimer) + XCTAssertEqual(connections.stats, .init(backingOff: 1, idle: 1, availableStreams: 1)) + XCTAssertNil(connections.timerScheduled(backoffTimer, cancelContinuation: backoffTimerCancellationToken)) + + XCTAssertEqual(connections.backoffDone(secondRequest.connectionID, retry: false), .cancelTimers([backoffTimerCancellationToken])) + XCTAssertEqual(connections.stats, .init(idle: 1, availableStreams: 1)) + + XCTAssertNotNil(connections.closeConnectionIfIdle(newFirstConnection.id)) + } + + func testPingPong() { + var connections = TestPoolStateMachine.ConnectionGroup( + generator: self.idGenerator, + minimumConcurrentConnections: 1, + maximumConcurrentConnectionSoftLimit: 4, + maximumConcurrentConnectionHardLimit: 4, + keepAlive: true, + keepAliveReducesAvailableStreams: true + ) + + let requests = connections.refillConnections() + XCTAssertFalse(connections.isEmpty) + XCTAssertEqual(connections.stats, .init(connecting: 1)) + + XCTAssertEqual(requests.count, 1) + guard let firstRequest = requests.first else { return XCTFail("Expected to have a request here") } + + let newConnection = MockConnection(id: firstRequest.connectionID) + let (connectionIndex, establishedConnectionContext) = connections.newConnectionEstablished(newConnection, maxStreams: 1) + XCTAssertEqual(establishedConnectionContext.info, .idle(availableStreams: 1, newIdle: true)) + XCTAssertEqual(establishedConnectionContext.use, .persisted) + XCTAssertEqual(connections.stats, .init(idle: 1, availableStreams: 1)) + let timers = connections.parkConnection(at: connectionIndex) + let keepAliveTimer = TestPoolStateMachine.ConnectionTimer(timerID: 0, connectionID: firstRequest.connectionID, usecase: .keepAlive) + let keepAliveTimerCancellationToken = MockTimerCancellationToken(keepAliveTimer) + XCTAssertEqual(timers, [keepAliveTimer]) + XCTAssertNil(connections.timerScheduled(keepAliveTimer, cancelContinuation: keepAliveTimerCancellationToken)) + let keepAliveAction = connections.keepAliveIfIdle(newConnection.id) + XCTAssertEqual(keepAliveAction, .init(connection: newConnection, keepAliveTimerCancellationContinuation: keepAliveTimerCancellationToken)) + XCTAssertEqual(connections.stats, .init(idle: 1, runningKeepAlive: 1, availableStreams: 0)) + + guard let (_, afterPingIdleContext) = connections.keepAliveSucceeded(newConnection.id) else { + return XCTFail("Expected to receive an AvailableContext") + } + XCTAssertEqual(afterPingIdleContext.info, .idle(availableStreams: 1, newIdle: false)) + XCTAssertEqual(afterPingIdleContext.use, .persisted) + XCTAssertEqual(connections.stats, .init(idle: 1, availableStreams: 1)) + } +} diff --git a/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionStateTests.swift b/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionStateTests.swift index b1622d0d..7751837e 100644 --- a/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionStateTests.swift +++ b/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionStateTests.swift @@ -10,19 +10,19 @@ final class PoolStateMachine_ConnectionStateTests: XCTestCase { let connectionID = 1 var state = TestConnectionState(id: connectionID) XCTAssertEqual(state.id, connectionID) - XCTAssertEqual(state.isIdleOrRunningKeepAlive, false) + XCTAssertEqual(state.isIdle, false) XCTAssertEqual(state.isAvailable, false) XCTAssertEqual(state.isConnected, false) XCTAssertEqual(state.isLeased, false) let connection = MockConnection(id: connectionID) XCTAssertEqual(state.connected(connection, maxStreams: 1), .idle(availableStreams: 1, newIdle: true)) - XCTAssertEqual(state.isIdleOrRunningKeepAlive, true) + XCTAssertEqual(state.isIdle, true) XCTAssertEqual(state.isAvailable, true) XCTAssertEqual(state.isConnected, true) XCTAssertEqual(state.isLeased, false) XCTAssertEqual(state.lease(streams: 1), .init(connection: connection, timersToCancel: .init(), wasIdle: true)) - XCTAssertEqual(state.isIdleOrRunningKeepAlive, false) + XCTAssertEqual(state.isIdle, false) XCTAssertEqual(state.isAvailable, false) XCTAssertEqual(state.isConnected, true) XCTAssertEqual(state.isLeased, true) @@ -257,7 +257,7 @@ final class PoolStateMachine_ConnectionStateTests: XCTestCase { XCTAssertNil(state.timerScheduled(keepAliveTimer, cancelContinuation: keepAliveTimerCancellationToken)) XCTAssertNil(state.timerScheduled(idleTimer, cancelContinuation: idleTimerCancellationToken)) - XCTAssertEqual(state.closeIfIdle(), .init(connection: connection, cancelTimers: [keepAliveTimerCancellationToken, idleTimerCancellationToken], maxStreams: 1)) + XCTAssertEqual(state.closeIfIdle(), .init(connection: connection, previousConnectionState: .idle, cancelTimers: [keepAliveTimerCancellationToken, idleTimerCancellationToken], usedStreams: 0, maxStreams: 1)) XCTAssertEqual(state.runKeepAliveIfIdle(reducesAvailableStreams: true), .none) }