Skip to content

Commit

Permalink
Add delegate for collecting eventloop tick metrics (#2608)
Browse files Browse the repository at this point in the history
* Add delegate for collecting eventloop tick metrics

* add warning to docc

* add missing headers

---------

Co-authored-by: Cory Benfield <lukasa@apple.com>
  • Loading branch information
hamzahrmalik and Lukasa committed Mar 27, 2024
1 parent 05fbcac commit 082ac21
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Sources/NIOPosix/LinuxCPUSet.swift
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ import CNIOLinux
t.affinity = set
}
}
self.init(threadInitializers: initializers)
self.init(threadInitializers: initializers, metricsDelegate: nil)
}
}
#endif
46 changes: 39 additions & 7 deletions Sources/NIOPosix/MultiThreadedEventLoopGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
canEventLoopBeShutdownIndividually: Bool,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>,
initializer: @escaping ThreadInitializer,
metricsDelegate: NIOEventLoopMetricsDelegate?,
_ callback: @escaping (SelectableEventLoop) -> Void) {
assert(NIOThread.current == thread)
initializer(thread)
Expand All @@ -84,7 +85,8 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
let loop = SelectableEventLoop(thread: thread,
parentGroup: parentGroup,
selector: try selectorFactory(),
canBeShutdownIndividually: canEventLoopBeShutdownIndividually)
canBeShutdownIndividually: canEventLoopBeShutdownIndividually,
metricsDelegate: metricsDelegate)
threadSpecificEventLoop.currentValue = loop
defer {
threadSpecificEventLoop.currentValue = nil
Expand All @@ -101,7 +103,8 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
private static func setupThreadAndEventLoop(name: String,
parentGroup: MultiThreadedEventLoopGroup,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>,
initializer: @escaping ThreadInitializer) -> SelectableEventLoop {
initializer: @escaping ThreadInitializer,
metricsDelegate: NIOEventLoopMetricsDelegate?) -> SelectableEventLoop {
let lock = ConditionLock(value: 0)

/* synchronised by `lock` */
Expand All @@ -112,7 +115,8 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
parentGroup: parentGroup,
canEventLoopBeShutdownIndividually: false, // part of MTELG
selectorFactory: selectorFactory,
initializer: initializer) { l in
initializer: initializer,
metricsDelegate: metricsDelegate) { l in
lock.lock(whenValue: 0)
_loop = l
lock.unlock(withValue: 1)
Expand All @@ -135,6 +139,24 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
public convenience init(numberOfThreads: Int) {
self.init(numberOfThreads: numberOfThreads,
canBeShutDown: true,
metricsDelegate: nil,
selectorFactory: NIOPosix.Selector<NIORegistration>.init)
}

/// Creates a `MultiThreadedEventLoopGroup` instance which uses `numberOfThreads`.
///
/// - note: Don't forget to call `shutdownGracefully` or `syncShutdownGracefully` when you no longer need this
/// `EventLoopGroup`. If you forget to shut the `EventLoopGroup` down you will leak `numberOfThreads`
/// (kernel) threads which are costly resources. This is especially important in unit tests where one
/// `MultiThreadedEventLoopGroup` is started per test case.
///
/// - Parameters:
/// - numberOfThreads: The number of `Threads` to use.
/// - metricsDelegate: Delegate for collecting information from this eventloop
public convenience init(numberOfThreads: Int, metricsDelegate: NIOEventLoopMetricsDelegate) {
self.init(numberOfThreads: numberOfThreads,
canBeShutDown: true,
metricsDelegate: metricsDelegate,
selectorFactory: NIOPosix.Selector<NIORegistration>.init)
}

Expand All @@ -146,41 +168,48 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
return self.init(numberOfThreads: numberOfThreads,
canBeShutDown: false,
threadNamePrefix: threadNamePrefix,
metricsDelegate: nil,
selectorFactory: NIOPosix.Selector<NIORegistration>.init)
}

internal convenience init(numberOfThreads: Int,
metricsDelegate: NIOEventLoopMetricsDelegate?,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>) {
precondition(numberOfThreads > 0, "numberOfThreads must be positive")
let initializers: [ThreadInitializer] = Array(repeating: { _ in }, count: numberOfThreads)
self.init(threadInitializers: initializers, canBeShutDown: true, selectorFactory: selectorFactory)
self.init(threadInitializers: initializers, canBeShutDown: true, metricsDelegate: metricsDelegate, selectorFactory: selectorFactory)
}

internal convenience init(numberOfThreads: Int,
canBeShutDown: Bool,
threadNamePrefix: String,
metricsDelegate: NIOEventLoopMetricsDelegate?,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>) {
precondition(numberOfThreads > 0, "numberOfThreads must be positive")
let initializers: [ThreadInitializer] = Array(repeating: { _ in }, count: numberOfThreads)
self.init(threadInitializers: initializers,
canBeShutDown: canBeShutDown,
threadNamePrefix: threadNamePrefix,
metricsDelegate: metricsDelegate,
selectorFactory: selectorFactory)
}

internal convenience init(numberOfThreads: Int,
canBeShutDown: Bool,
metricsDelegate: NIOEventLoopMetricsDelegate?,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>) {
precondition(numberOfThreads > 0, "numberOfThreads must be positive")
let initializers: [ThreadInitializer] = Array(repeating: { _ in }, count: numberOfThreads)
self.init(threadInitializers: initializers,
canBeShutDown: canBeShutDown,
metricsDelegate: metricsDelegate,
selectorFactory: selectorFactory)
}

internal convenience init(threadInitializers: [ThreadInitializer],
metricsDelegate: NIOEventLoopMetricsDelegate?,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration> = NIOPosix.Selector<NIORegistration>.init) {
self.init(threadInitializers: threadInitializers, canBeShutDown: true, selectorFactory: selectorFactory)
self.init(threadInitializers: threadInitializers, canBeShutDown: true, metricsDelegate: metricsDelegate, selectorFactory: selectorFactory)
}

/// Creates a `MultiThreadedEventLoopGroup` instance which uses the given `ThreadInitializer`s. One `NIOThread` per `ThreadInitializer` is created and used.
Expand All @@ -190,6 +219,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
internal init(threadInitializers: [ThreadInitializer],
canBeShutDown: Bool,
threadNamePrefix: String = "NIO-ELT-",
metricsDelegate: NIOEventLoopMetricsDelegate?,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration> = NIOPosix.Selector<NIORegistration>.init) {
self.threadNamePrefix = threadNamePrefix
let myGroupID = nextEventLoopGroupID.loadThenWrappingIncrement(ordering: .relaxed)
Expand All @@ -202,7 +232,8 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
let ev = MultiThreadedEventLoopGroup.setupThreadAndEventLoop(name: "\(threadNamePrefix)\(myGroupID)-#\(idx)",
parentGroup: self,
selectorFactory: selectorFactory,
initializer: initializer)
initializer: initializer,
metricsDelegate: metricsDelegate)
idx += 1
return ev
}
Expand Down Expand Up @@ -374,7 +405,8 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
parentGroup: nil,
canEventLoopBeShutdownIndividually: true,
selectorFactory: NIOPosix.Selector<NIORegistration>.init,
initializer: { _ in }) { loop in
initializer: { _ in },
metricsDelegate: nil) { loop in
loop.assertInEventLoop()
callback(loop)
}
Expand Down
48 changes: 45 additions & 3 deletions Sources/NIOPosix/SelectableEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,31 @@ internal func withAutoReleasePool<T>(_ execute: () throws -> T) rethrows -> T {
#endif
}

/// Information about an EventLoop tick
public struct NIOEventLoopTickInfo: Sendable, Hashable {
/// The eventloop which ticked
public var eventLoopID: ObjectIdentifier
/// The number of tasks which were executed in this tick
public var numberOfTasks: Int
/// The time at which the tick began
public var startTime: NIODeadline

internal init(eventLoopID: ObjectIdentifier, numberOfTasks: Int, startTime: NIODeadline) {
self.eventLoopID = eventLoopID
self.numberOfTasks = numberOfTasks
self.startTime = startTime
}
}

/// Implement this delegate to receive information about the EventLoop, such as each tick
public protocol NIOEventLoopMetricsDelegate: Sendable {
/// Called after a tick has run
/// This function is called after every tick - avoid long-running tasks here
/// - Warning: This function is called after every event loop tick and on the event loop thread. Any non-trivial work in this function will block the event loop and cause latency increases and performance degradation.
/// - Parameter info: Information about the tick, such as how many tasks were executed
func processedTick(info: NIOEventLoopTickInfo)
}

/// `EventLoop` implementation that uses a `Selector` to get notified once there is more I/O or tasks to process.
/// The whole processing of I/O and tasks is done by a `NIOThread` that is tied to the `SelectableEventLoop`. This `NIOThread`
/// is guaranteed to never change!
Expand Down Expand Up @@ -121,6 +146,8 @@ internal final class SelectableEventLoop: EventLoop {
private let promiseCreationStoreLock = NIOLock()
private var _promiseCreationStore: [_NIOEventLoopFutureIdentifier: (file: StaticString, line: UInt)] = [:]

private let metricsDelegate: (any NIOEventLoopMetricsDelegate)?

@usableFromInline
internal func _promiseCreated(futureIdentifier: _NIOEventLoopFutureIdentifier, file: StaticString, line: UInt) {
precondition(_isDebugAssertConfiguration())
Expand Down Expand Up @@ -183,7 +210,9 @@ Further information:
internal init(thread: NIOThread,
parentGroup: MultiThreadedEventLoopGroup?, /* nil iff thread take-over */
selector: NIOPosix.Selector<NIORegistration>,
canBeShutdownIndividually: Bool) {
canBeShutdownIndividually: Bool,
metricsDelegate: NIOEventLoopMetricsDelegate?) {
self.metricsDelegate = metricsDelegate
self._parentGroup = parentGroup
self._selector = selector
self.thread = thread
Expand Down Expand Up @@ -562,7 +591,13 @@ Further information:
return nextDeadline
}

private func runLoop() -> NIODeadline? {
private func runLoop(selfIdentifier: ObjectIdentifier) -> NIODeadline? {
let tickStartTime: NIODeadline = .now()
var tasksProcessedInTick = 0
defer {
let tickInfo = NIOEventLoopTickInfo(eventLoopID: selfIdentifier, numberOfTasks: tasksProcessedInTick, startTime: tickStartTime)
self.metricsDelegate?.processedTick(info: tickInfo)
}
while true {
let nextReadyDeadline = self._tasksLock.withLock { () -> NIODeadline? in
let deadline = Self._popTasksLocked(
Expand All @@ -583,6 +618,12 @@ Further information:
}

// Execute all the tasks that were submitted
let (partialTotal, totalOverflowed) = tasksProcessedInTick.addingReportingOverflow(self.tasksCopy.count)
if totalOverflowed {
tasksProcessedInTick = Int.max
} else {
tasksProcessedInTick = partialTotal
}
for task in self.tasksCopy {
self.run(task)
}
Expand Down Expand Up @@ -646,6 +687,7 @@ Further information:
nextReadyDeadline = NIODeadline.now()
}
}
let selfIdentifier = ObjectIdentifier(self)
while self.internalState != .noLongerRunning && self.internalState != .exitingThread {
// Block until there are events to handle or the selector was woken up
/* for macOS: in case any calls we make to Foundation put objects into an autoreleasepool */
Expand Down Expand Up @@ -673,7 +715,7 @@ Further information:
}
}
}
nextReadyDeadline = runLoop()
nextReadyDeadline = runLoop(selfIdentifier: selfIdentifier)
}

// This EventLoop was closed so also close the underlying selector.
Expand Down
71 changes: 71 additions & 0 deletions Tests/NIOPosixTests/EventLoopMetricsDelegateTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

@testable import NIOPosix
import NIOCore
import NIOConcurrencyHelpers
import XCTest

final class RecorderDelegate: NIOEventLoopMetricsDelegate, Sendable {

private let _infos: NIOLockedValueBox<[NIOEventLoopTickInfo]> = .init([])

var infos: [NIOEventLoopTickInfo] {
_infos.withLockedValue {$0 }
}

func processedTick(info: NIOPosix.NIOEventLoopTickInfo) {
_infos.withLockedValue {
$0.append(info)
}
}
}

final class EventLoopMetricsDelegateTests: XCTestCase {
func testMetricsDelegateNotCalledWhenNoEvents() {
let delegate = RecorderDelegate()
_ = MultiThreadedEventLoopGroup(numberOfThreads: 1, metricsDelegate: delegate)
XCTAssertEqual(delegate.infos.count, 0)
}

func testMetricsDelegateTickInfo() {
let delegate = RecorderDelegate()
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1, metricsDelegate: delegate)
let el = elg.any()
let testStartTime = NIODeadline.now()

XCTAssertEqual(delegate.infos.count, 0)

let promise = el.makePromise(of: Void.self)
el.scheduleTask(in: .seconds(1)) {
promise.succeed()
}
promise.futureResult.whenSuccess {
// There are 3 tasks (scheduleTask, whenSuccess, wait) which can trigger a total of 1...3 ticks
XCTAssertTrue((1...3).contains(delegate.infos.count), "Expected 1...3 ticks, got \(delegate.infos.count)")
// the total number of tasks across these ticks should be either 2 or 3
let totalTasks = delegate.infos.map { $0.numberOfTasks }.reduce(0, { $0 + $1 })
XCTAssertTrue((2...3).contains(totalTasks), "Expected 2...3 tasks, got \(totalTasks)")
for info in delegate.infos {
XCTAssertEqual(info.eventLoopID, ObjectIdentifier(el))
}
if let lastTickStartTime = delegate.infos.last?.startTime {
let timeSinceStart = lastTickStartTime - testStartTime
XCTAssertLessThan(timeSinceStart.nanoseconds, 100_000_000) // This should be near instant, limiting to 100ms
XCTAssertGreaterThan(timeSinceStart.nanoseconds, 0)
}
}
try? promise.futureResult.wait()
}
}
4 changes: 2 additions & 2 deletions Tests/NIOPosixTests/EventLoopTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ public final class EventLoopTest : XCTestCase {
}
let threads: [ThreadInitializer] = [body, body]

let group = MultiThreadedEventLoopGroup(threadInitializers: threads)
let group = MultiThreadedEventLoopGroup(threadInitializers: threads, metricsDelegate: nil)

XCTAssertEqual(2, counter)
XCTAssertNoThrow(try group.syncShutdownGracefully())
Expand All @@ -614,7 +614,7 @@ public final class EventLoopTest : XCTestCase {
}
let threads: [ThreadInitializer] = [body, body]

let group = MultiThreadedEventLoopGroup(threadInitializers: threads)
let group = MultiThreadedEventLoopGroup(threadInitializers: threads, metricsDelegate: nil)

XCTAssertNoThrow(try group.syncShutdownGracefully())
#endif
Expand Down
2 changes: 1 addition & 1 deletion Tests/NIOPosixTests/SyscallAbstractionLayer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ extension SALTest {
}
}
self.wakeups = .init(description: "wakeups")
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1) {
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1, metricsDelegate: nil) {
try HookedSelector(userToKernel: self.userToKernelBox,
kernelToUser: self.kernelToUserBox,
wakeups: self.wakeups)
Expand Down

0 comments on commit 082ac21

Please sign in to comment.