From 7a7ec5b95d0c38b377551c31d03130cd9f1882f6 Mon Sep 17 00:00:00 2001 From: Warren James Date: Wed, 15 May 2024 15:44:17 -0400 Subject: [PATCH] fix(NODE-6171): RTT set to zero when serverMonitoringMode=stream (#4110) --- src/sdam/monitor.ts | 16 ++- .../server_discover_and_monitoring.test.ts | 113 ++++++++++++++++++ 2 files changed, 120 insertions(+), 9 deletions(-) diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 769c41d16d..baf75b6332 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -219,8 +219,8 @@ export class Monitor extends TypedEventEmitter { return this.rttSampler.min(); } - get latestRtt(): number { - return this.rttSampler.last ?? 0; // FIXME: Check if this is acceptable + get latestRtt(): number | null { + return this.rttSampler.last; } addRttSample(rtt: number) { @@ -304,7 +304,8 @@ function checkServer(monitor: Monitor, callback: Callback) { } // NOTE: here we use the latestRtt as this measurement corresponds with the value - // obtained for this successful heartbeat + // obtained for this successful heartbeat, if there is no latestRtt, then we calculate the + // duration const duration = isAwaitable && monitor.rttPinger ? monitor.rttPinger.latestRtt ?? calculateDurationInMs(start) @@ -498,7 +499,7 @@ export class RTTPinger { this[kCancellationToken] = monitor[kCancellationToken]; this.closed = false; this.monitor = monitor; - this.latestRtt = monitor.latestRtt; + this.latestRtt = monitor.latestRtt ?? undefined; const heartbeatFrequencyMS = monitor.options.heartbeatFrequencyMS; this[kMonitorId] = setTimeout(() => this.measureRoundTripTime(), heartbeatFrequencyMS); @@ -520,10 +521,7 @@ export class RTTPinger { this.connection = undefined; } - private measureAndReschedule(start?: number, conn?: Connection) { - if (start == null) { - start = now(); - } + private measureAndReschedule(start: number, conn?: Connection) { if (this.closed) { conn?.destroy(); return; @@ -565,7 +563,7 @@ export class RTTPinger { connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND; // eslint-disable-next-line github/no-then connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then( - () => this.measureAndReschedule(), + () => this.measureAndReschedule(start), () => { this.connection?.destroy(); this.connection = undefined; diff --git a/test/integration/server-discovery-and-monitoring/server_discover_and_monitoring.test.ts b/test/integration/server-discovery-and-monitoring/server_discover_and_monitoring.test.ts index d0b3fc9944..81006928e9 100644 --- a/test/integration/server-discovery-and-monitoring/server_discover_and_monitoring.test.ts +++ b/test/integration/server-discovery-and-monitoring/server_discover_and_monitoring.test.ts @@ -1,3 +1,14 @@ +import { setTimeout } from 'node:timers/promises'; + +import { expect } from 'chai'; +import * as sinon from 'sinon'; + +import { + Connection, + type MongoClient, + promiseWithResolvers, + type ServerHeartbeatSucceededEvent +} from '../../mongodb'; import { loadSpecTests } from '../../spec'; import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner'; @@ -8,3 +19,105 @@ describe('SDAM Unified Tests (Node Driver)', function () { ); runUnifiedSuite(clonedAndAlteredSpecTests); }); + +describe('Monitoring rtt tests', function () { + let client: MongoClient; + let heartbeatDurations: Record; + const HEARTBEATS_TO_COLLECT_PER_NODE = 65; + const IGNORE_SIZE = 5; + const DELAY_MS = 10; + + beforeEach(function () { + heartbeatDurations = Object.create(null); + }); + + afterEach(async function () { + if (client) { + await client.close(); + } + sinon.restore(); + }); + + for (const serverMonitoringMode of ['poll', 'stream']) { + context(`when serverMonitoringMode is set to '${serverMonitoringMode}'`, function () { + context('after collecting a number of heartbeats', function () { + beforeEach(async function () { + client = this.configuration.newClient({ + heartbeatFrequencyMS: 100, + serverMonitoringMode + }); + + // make sendCommand delay for DELAY_MS ms to ensure that the actual time between sending + // a heartbeat and receiving a response don't drop below 1ms. This is done since our + // testing is colocated with its mongo deployment so network latency is very low + const stub = sinon + // @ts-expect-error accessing private method + .stub(Connection.prototype, 'sendCommand') + .callsFake(async function* (...args) { + await setTimeout(DELAY_MS); + yield* stub.wrappedMethod.call(this, ...args); + }); + await client.connect(); + + const { promise, resolve } = promiseWithResolvers(); + client.on('serverHeartbeatSucceeded', (ev: ServerHeartbeatSucceededEvent) => { + heartbeatDurations[ev.connectionId] ??= []; + if ( + heartbeatDurations[ev.connectionId].length < + HEARTBEATS_TO_COLLECT_PER_NODE + IGNORE_SIZE + ) + heartbeatDurations[ev.connectionId].push(ev.duration); + + // We ignore the first few heartbeats since the problem reported in NODE-6172 showed that the + // first few heartbeats were recorded properly + if ( + Object.keys(heartbeatDurations).length === client.topology.s.servers.size && + Object.values(heartbeatDurations).every( + d => d.length === HEARTBEATS_TO_COLLECT_PER_NODE + IGNORE_SIZE + ) + ) { + client.removeAllListeners('serverHeartbeatSucceeded'); + resolve(); + } + }); + await promise; + }); + + it( + 'heartbeat duration is not incorrectly reported as zero on ServerHeartbeatSucceededEvents', + { + metadata: { + requires: { topology: '!load-balanced' } + }, + test: async function () { + for (const durations of Object.values(heartbeatDurations)) { + const relevantDurations = durations.slice(IGNORE_SIZE); + expect(relevantDurations).to.have.length.gt(0); + const averageDuration = + relevantDurations.reduce((acc, x) => acc + x) / relevantDurations.length; + expect(averageDuration).to.be.gt(DELAY_MS); + } + } + } + ); + + it('ServerDescription.roundTripTime is not incorrectly reported as zero', { + metadata: { + requires: { topology: '!load-balanced' } + }, + test: async function () { + for (const [server, durations] of Object.entries(heartbeatDurations)) { + const relevantDurations = durations.slice(IGNORE_SIZE); + expect(relevantDurations).to.have.length.gt(0); + const averageDuration = + relevantDurations.reduce((acc, x) => acc + x) / relevantDurations.length; + const rtt = client.topology.description.servers.get(server).roundTripTime; + expect(rtt).to.not.equal(0); + expect(rtt).to.be.approximately(averageDuration, 3); + } + } + }); + }); + }); + } +});