Skip to content

Commit

Permalink
feat: choo-choo train (#232)
Browse files Browse the repository at this point in the history
* feat: allow reinitialization of open telemetry context singleton

Co-authored-by: Francine Lucca <francinelucca@users.noreply.github.com>

* fix(core): limit concurrent runCommand invocation to 5

Co-authored-by: Francine Lucca <francinelucca@users.noreply.github.com>

* feat: first pass at choo-choo train

- re-instantiate otel context for each IbmTelemetry instance
- add cwd to environment class tracked file enumeration
- enhance logger to accept multiple objects to log in same call
- add choo choo train logic for server and client first pass

* feat: add ChooChooTrain class to separate logic from background process

* fix: add missing comment to runBackgroundProcess

* fix: address code review comments

we increased the backlog and didn't see an uptick in resources

---------

Co-authored-by: Francine Lucca <francinelucca@users.noreply.github.com>
  • Loading branch information
jharvey10 and francinelucca committed Apr 18, 2024
1 parent 9870632 commit 3217a91
Show file tree
Hide file tree
Showing 29 changed files with 341 additions and 90 deletions.
21 changes: 13 additions & 8 deletions src/main/background-process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,23 @@
* This source code is licensed under the Apache-2.0 license found in the
* LICENSE file in the root directory of this source tree.
*/
import configSchemaJson from '@ibm/telemetry-config-schema/config.schema.json' assert { type: 'json' }
import * as os from 'node:os'
import * as path from 'node:path'

import * as commander from 'commander'

import { ChooChooTrain } from './core/choo-choo-train.js'
import { Environment } from './core/environment.js'
import { createLogFilePath } from './core/log/create-log-file-path.js'
import { Logger } from './core/log/logger.js'
import { IbmTelemetry } from './ibm-telemetry.js'

interface CommandLineOptions {
config: string
log?: string
}

const IPC_ADDR = path.join(os.tmpdir(), 'ibmtelemetry-ipc')

const { Command } = commander

/**
Expand All @@ -28,7 +32,7 @@ function run() {
.description('Collect telemetry data for a package.')
.requiredOption('--config <config-path>', 'Path to a telemetry configuration file')
.option('--log <log-path>', 'Path to temp log file')
.action(collect)
.action(runBackgroundProcess)

program.parseAsync().catch((err) => {
// As a failsafe, catch any uncaught exception, print it to stderr, and silently exit
Expand All @@ -45,25 +49,26 @@ function run() {
*
* @param opts - The command line options provided when the program was executed.
*/
async function collect(opts: CommandLineOptions) {
async function runBackgroundProcess(opts: CommandLineOptions) {
const date = new Date().toISOString()
const logFilePath = opts.log ?? createLogFilePath(date)
const logger = new Logger(logFilePath)
const environment = new Environment()

const ibmTelemetry = new IbmTelemetry(opts.config, configSchemaJson, environment, logger)
logger.traceEnter('', 'runBackgroundProcess', [opts])

const chooChooTrain = new ChooChooTrain(IPC_ADDR, new Environment(), opts.config, logger)

try {
await ibmTelemetry.run()
await chooChooTrain.run()
} catch (err) {
// Catch any exception thrown, log it, and quietly exit
if (err instanceof Error) {
logger.error(err)
} else {
logger.error(String(err))
}
}

logger.traceExit('', 'runBackgroundProcess', undefined)
await logger.close()
}

Expand Down
207 changes: 207 additions & 0 deletions src/main/core/choo-choo-train.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Copyright IBM Corp. 2024, 2024
*
* This source code is licensed under the Apache-2.0 license found in the
* LICENSE file in the root directory of this source tree.
*/
import * as net from 'node:net'

import configSchemaJson from '@ibm/telemetry-config-schema/config.schema.json' assert { type: 'json' }

import { IbmTelemetry } from '../ibm-telemetry.js'
import { Environment } from './environment.js'
import { Loggable } from './log/loggable.js'
import type { Logger } from './log/logger.js'
import { Trace } from './log/trace.js'

const MAX_RETRIES = 3
const MAX_BACKLOG = 64

interface Work {
cwd: string
configFilePath: string
}

/**
* Encapsulates all logic for orchestrating the running of multiple telemetry processes.
*/
export class ChooChooTrain extends Loggable {
private readonly workQueue: Work[] = []
private readonly ipcAddr: string

/**
* Constructs a new ChooChooTrain instance.
*
* @param ipcAddr - The address of the IPC pipe.
* @param environment - Environment variable configuration for this run.
* @param configFilePath - Path to a config file.
* @param logger - A logger instance.
*/
public constructor(
ipcAddr: string,
environment: Environment,
configFilePath: string,
logger: Logger
) {
super(logger)

this.ipcAddr = ipcAddr

this.workQueue.push({ cwd: environment.cwd, configFilePath })
}

/**
* Establishes ourself as either the conductor or a client.
* If we are the conductor, run all work in the queue (including our work).
* If we are a client, send our work to the conductor.
*/
public async run(): Promise<void> {
let connection: net.Socket | net.Server | undefined

for (let i = 0; i < MAX_RETRIES && !connection; i++) {
// Try to be the server
try {
connection = await this.createServerSocket(this.handleServerConnection.bind(this))
} catch {}

if (!connection) {
// Try to be the client
try {
connection = await this.createClientSocket()
} catch {}
}
}

// give up 🥲
if (!connection) {
this.logger.debug('Could not establish server or client connection. Exiting')
return
}

try {
if (connection instanceof net.Server) {
await this.doWork(connection)
} else {
await this.sendWorkToConductor(connection)
}
} finally {
await this.logger.close()
}
}

@Trace({ argFormatter: () => '[onConnect]' })
private async createServerSocket(onConnect: (socket: net.Socket) => void): Promise<net.Server> {
return new Promise((resolve, reject) => {
const server = net.createServer({})

server.on('connection', onConnect)
server.on('listening', () => {
resolve(server)
})
server.on('error', reject)

// Set up signal handler to gracefully close the IPC socket
process.on('SIGINT', server.close)
process.on('SIGTERM', server.close)

server.listen(this.ipcAddr, MAX_BACKLOG)
})
}

@Trace()
private createClientSocket(): Promise<net.Socket> {
return new Promise((resolve, reject) => {
const socket = net.connect(this.ipcAddr)

socket.on('connect', () => resolve(socket))
socket.on('error', reject)
})
}

@Trace({ argFormatter: () => '[socket]' })
private handleServerConnection(socket: net.Socket) {
let buf = ''

socket.on('data', (data) => {
buf += data.toString()
})

socket.on('close', () => {
const obj = JSON.parse(buf)

this.workQueue.push(obj)
})
}

/**
* We are the client. Send work through the IPC pipe to the conductor.
*
* @param socket - Client socket connection to use to communicate to server.
* @returns Void.
*/
@Trace({ argFormatter: () => '[socket]' })
private sendWorkToConductor(socket: net.Socket) {
return new Promise((resolve, reject) => {
const work = this.workQueue.shift()

this.logger.debug('Sending work through IPC: ', JSON.stringify(work))

socket.on('close', resolve)
socket.on('error', reject)
socket.on('timeout', reject)

socket.write(Buffer.from(JSON.stringify(work)))
socket.end()
})
}

@Trace({ argFormatter: () => '[server]' })
private async doWork(server: net.Server) {
this.logger.debug(
'We are the conductor of the choo-choo train. Running all available work in queue'
)

// Consume work until the queue is empty
while (this.workQueue.length > 0) {
this.logger.debug('Queue length', this.workQueue.length)

const currentWork = this.workQueue.shift()

if (!currentWork) {
return
}

// collect for current work
await this.collect(new Environment({ cwd: currentWork.cwd }), currentWork.configFilePath)
}

server.close()
}

/**
* This is the main entrypoint for telemetry collection.
*
* @param environment - Environment variable configuration for this run.
* @param configFilePath - Path to a config file.
*/
@Trace()
private async collect(environment: Environment, configFilePath: string) {
const ibmTelemetry = new IbmTelemetry(
configFilePath,
configSchemaJson,
environment,
this.logger
)

try {
await ibmTelemetry.run()
} catch (err) {
// Catch any exception thrown, log it, and quietly exit
if (err instanceof Error) {
this.logger.error(err)
} else {
this.logger.error(String(err))
}
}
}
}
10 changes: 10 additions & 0 deletions src/main/core/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import { isCI } from 'ci-info'

interface EnvironmentConfig {
cwd?: string
isCI?: boolean
isExportEnabled?: boolean
isTelemetryEnabled?: boolean
Expand All @@ -32,10 +33,16 @@ export class Environment {
*/
readonly isTelemetryEnabled: boolean

/**
* Working directory to run process on.
*/
readonly cwd: string

constructor(config?: EnvironmentConfig) {
this.isCI = isCI
this.isExportEnabled = process.env['IBM_TELEMETRY_EXPORT_DISABLED'] !== 'true'
this.isTelemetryEnabled = process.env['IBM_TELEMETRY_DISABLED'] !== 'true'
this.cwd = process.cwd()

// Config object supersedes environment variable values

Expand All @@ -48,5 +55,8 @@ export class Environment {
if (config?.isTelemetryEnabled !== undefined) {
this.isTelemetryEnabled = config.isTelemetryEnabled
}
if (config?.cwd !== undefined) {
this.cwd = config.cwd
}
}
}
23 changes: 16 additions & 7 deletions src/main/core/get-tracked-source-files.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ import { TrackedFileEnumerator } from './tracked-file-enumerator.js'
* Gets all tracked source files to consider for data collection,
* filtered by supplied file extension array.
*
* @param cwd - Working directory for underlying command execution.
* @param root - Root directory in which to search for tracked source files. This is an absolute
* path.
* @param logger - Logger instance to use.
* @param fileExtensions - List of file extensions to filter files by.
* @returns An array of source file objects.
*/
export async function getTrackedSourceFiles(
cwd: string,
root: string,
logger: Logger,
fileExtensions: string[]
Expand All @@ -37,17 +39,24 @@ export async function getTrackedSourceFiles(
files.push(root)
} else {
files.push(
...(await fileEnumerator.find(root, (file) => fileExtensions.includes(path.extname(file))))
...(await fileEnumerator.find(cwd, root, (file) =>
fileExtensions.includes(path.extname(file))
))
)
}

const promises = files.map(async (file) => {
return ts.createSourceFile(
file,
(await readFile(file)).toString(),
ts.ScriptTarget.ES2021,
/* setParentNodes */ true
)
return {
fileName: file,
async createSourceFile(): Promise<ts.SourceFile> {
return ts.createSourceFile(
file,
(await readFile(file)).toString(),
ts.ScriptTarget.ES2021,
/* setParentNodes */ true
)
}
}
})

const results = await Promise.all(promises)
Expand Down
13 changes: 7 additions & 6 deletions src/main/core/log/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ export class Logger {
* Debug logs a given log message.
*
* @param msg - The message to log.
* @param others - Other (optional) messages.
*/
public debug(msg: string) {
this.log('debug', msg)
public debug(msg: string, ...others: Array<string | number>) {
this.log('debug', [msg, ...others])
}

/**
Expand All @@ -78,7 +79,7 @@ export class Logger {
entry = msg
}

this.log('error', entry)
this.log('error', [entry])
}

/**
Expand Down Expand Up @@ -133,11 +134,11 @@ export class Logger {
* Logs a given message to the log file.
*
* @param level - 'debug' or 'error'.
* @param msg - Message to log.
* @param msgs - Messages to log.
*/
private log(level: Level, msg: string) {
private log(level: Level, msgs: Array<string | number>) {
const date = new Date().toISOString()

this.#buffer.push(level + ' ' + process.pid + ' ' + date + ' ' + msg + '\n')
this.#buffer.push(level + ' ' + process.pid + ' ' + date + ' ' + msgs.join(' ') + '\n')
}
}
Loading

0 comments on commit 3217a91

Please sign in to comment.