Skip to content

Commit

Permalink
[7.11][Telemetry] Diagnostic Alert Telemetry (#84422)
Browse files Browse the repository at this point in the history
* Port @tsg's work on task manager.

Remove 2nd var to track telemetry opt in.

Add ES client to start querying index.

Use query to get docs from a dummy index.

Change how index is queried.

Get diagnostic alerts to send to staging cluster.

Record last timestamp.

PoC on telemetry opt in via 2 processes.

Revert to original solution

* Update on agreed method. Fixes race condition.

* Expand wildcards.

* stage.

* Add rule.ruleset collection.

* Update telemetry sender with correct query for loading diag alerts.

* Add similar task tests to endpont artifact work.

* Fix broken import statement.

* Create sender mocks.

* Update test to check for func call.

* Update unused reference.

* record last run.

* Update index.

* fix import

* Fix test.

* test fix.

* Pass unit to time diff calc.

* Tests should pass now hopefully.

* Add additional process fields to allowlist.

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
pjhampton and kibanamachine committed Dec 10, 2020
1 parent 692247f commit 6e7fb4a
Show file tree
Hide file tree
Showing 5 changed files with 383 additions and 8 deletions.
38 changes: 38 additions & 0 deletions x-pack/plugins/security_solution/server/lib/telemetry/mocks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { TelemetryEventsSender } from './sender';
import { TelemetryDiagTask } from './task';

/**
* Creates a mocked Telemetry Events Sender
*/
export const createMockTelemetryEventsSender = (
enableTelemtry: boolean
): jest.Mocked<TelemetryEventsSender> => {
return ({
setup: jest.fn(),
start: jest.fn(),
stop: jest.fn(),
fetchDiagnosticAlerts: jest.fn(),
queueTelemetryEvents: jest.fn(),
processEvents: jest.fn(),
isTelemetryOptedIn: jest.fn().mockReturnValue(enableTelemtry ?? jest.fn()),
sendIfDue: jest.fn(),
fetchClusterInfo: jest.fn(),
fetchTelemetryUrl: jest.fn(),
fetchLicenseInfo: jest.fn(),
copyLicenseFields: jest.fn(),
sendEvents: jest.fn(),
} as unknown) as jest.Mocked<TelemetryEventsSender>;
};

/**
* Creates a mocked Telemetry Diagnostic Task
*/
export class MockTelemetryDiagnosticTask extends TelemetryDiagTask {
public runTask = jest.fn();
}
79 changes: 73 additions & 6 deletions x-pack/plugins/security_solution/server/lib/telemetry/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import {
TelemetryPluginStart,
TelemetryPluginSetup,
} from '../../../../../../src/plugins/telemetry/server';
import {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '../../../../task_manager/server';
import { TelemetryDiagTask } from './task';

export type SearchTypes =
| string
Expand Down Expand Up @@ -56,20 +61,34 @@ export class TelemetryEventsSender {
private isSending = false;
private queue: TelemetryEvent[] = [];
private isOptedIn?: boolean = true; // Assume true until the first check
private diagTask?: TelemetryDiagTask;

constructor(logger: Logger) {
this.logger = logger.get('telemetry_events');
}

public setup(telemetrySetup?: TelemetryPluginSetup) {
public setup(telemetrySetup?: TelemetryPluginSetup, taskManager?: TaskManagerSetupContract) {
this.telemetrySetup = telemetrySetup;

if (taskManager) {
this.diagTask = new TelemetryDiagTask(this.logger, taskManager, this);
}
}

public start(core?: CoreStart, telemetryStart?: TelemetryPluginStart) {
public start(
core?: CoreStart,
telemetryStart?: TelemetryPluginStart,
taskManager?: TaskManagerStartContract
) {
this.telemetryStart = telemetryStart;
this.core = core;

this.logger.debug(`Starting task`);
if (taskManager && this.diagTask) {
this.logger.debug(`Starting diag task`);
this.diagTask.start(taskManager);
}

this.logger.debug(`Starting local task`);
setTimeout(() => {
this.sendIfDue();
this.intervalId = setInterval(() => this.sendIfDue(), this.checkIntervalMs);
Expand All @@ -82,6 +101,38 @@ export class TelemetryEventsSender {
}
}

public async fetchDiagnosticAlerts(executeFrom: string, executeTo: string) {
const query = {
expand_wildcards: 'open,hidden',
index: 'logs-endpoint.diagnostic.collection-*',
ignore_unavailable: true,
size: this.maxQueueSize,
body: {
query: {
range: {
'event.ingested': {
gte: executeFrom,
lt: executeTo,
},
},
},
sort: [
{
'event.ingested': {
order: 'desc',
},
},
],
},
};

if (!this.core) {
throw Error('could not fetch diagnostic alerts. core is not available');
}
const callCluster = this.core.elasticsearch.legacy.client.callAsInternalUser;
return callCluster('search', query);
}

public queueTelemetryEvents(events: TelemetryEvent[]) {
const qlength = this.queue.length;

Expand Down Expand Up @@ -109,6 +160,11 @@ export class TelemetryEventsSender {
});
}

public async isTelemetryOptedIn() {
this.isOptedIn = await this.telemetryStart?.getIsOptedIn();
return this.isOptedIn === true;
}

private async sendIfDue() {
if (this.isSending) {
return;
Expand All @@ -121,9 +177,7 @@ export class TelemetryEventsSender {
try {
this.isSending = true;

// Checking opt-in status is relatively expensive (calls a saved-object), so
// we only check it when we have things to send.
this.isOptedIn = await this.telemetryStart?.getIsOptedIn();
this.isOptedIn = await this.isTelemetryOptedIn();
if (!this.isOptedIn) {
this.logger.debug(`Telemetry is not opted-in.`);
this.queue = [];
Expand Down Expand Up @@ -245,9 +299,14 @@ const allowlistEventFields: AllowlistFields = {
'@timestamp': true,
agent: true,
Endpoint: true,
Ransomware: true,
data_stream: true,
ecs: true,
elastic: true,
event: true,
rule: {
ruleset: true,
},
file: {
name: true,
path: true,
Expand All @@ -270,6 +329,8 @@ const allowlistEventFields: AllowlistFields = {
executable: true,
command_line: true,
hash: true,
pid: true,
uptime: true,
Ext: {
code_signature: true,
},
Expand All @@ -281,6 +342,12 @@ const allowlistEventFields: AllowlistFields = {
Ext: {
code_signature: true,
},
uptime: true,
pid: true,
ppid: true,
},
token: {
integrity_level_name: true,
},
},
};
Expand Down
149 changes: 149 additions & 0 deletions x-pack/plugins/security_solution/server/lib/telemetry/task.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import moment from 'moment';
import { loggingSystemMock } from 'src/core/server/mocks';

import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { TaskStatus } from '../../../../task_manager/server';

import { TelemetryDiagTask, TelemetryDiagTaskConstants } from './task';
import { createMockTelemetryEventsSender, MockTelemetryDiagnosticTask } from './mocks';

describe('test', () => {
let logger: ReturnType<typeof loggingSystemMock.createLogger>;

beforeEach(() => {
logger = loggingSystemMock.createLogger();
});

describe('basic diagnostic alert telemetry sanity checks', () => {
test('task can register', () => {
const telemetryDiagTask = new TelemetryDiagTask(
logger,
taskManagerMock.createSetup(),
createMockTelemetryEventsSender(true)
);

expect(telemetryDiagTask).toBeInstanceOf(TelemetryDiagTask);
});
});

test('diagnostic task should be registered', () => {
const mockTaskManager = taskManagerMock.createSetup();
new TelemetryDiagTask(logger, mockTaskManager, createMockTelemetryEventsSender(true));

expect(mockTaskManager.registerTaskDefinitions).toHaveBeenCalled();
});

test('task should be scheduled', async () => {
const mockTaskManagerSetup = taskManagerMock.createSetup();
const telemetryDiagTask = new TelemetryDiagTask(
logger,
mockTaskManagerSetup,
createMockTelemetryEventsSender(true)
);

const mockTaskManagerStart = taskManagerMock.createStart();
await telemetryDiagTask.start(mockTaskManagerStart);
expect(mockTaskManagerStart.ensureScheduled).toHaveBeenCalled();
});

test('task should run', async () => {
const mockContext = createMockTelemetryEventsSender(true);
const mockTaskManager = taskManagerMock.createSetup();
const telemetryDiagTask = new MockTelemetryDiagnosticTask(logger, mockTaskManager, mockContext);

const mockTaskInstance = {
id: TelemetryDiagTaskConstants.TYPE,
runAt: new Date(),
attempts: 0,
ownerId: '',
status: TaskStatus.Running,
startedAt: new Date(),
scheduledAt: new Date(),
retryAt: new Date(),
params: {},
state: {},
taskType: TelemetryDiagTaskConstants.TYPE,
};
const createTaskRunner =
mockTaskManager.registerTaskDefinitions.mock.calls[0][0][TelemetryDiagTaskConstants.TYPE]
.createTaskRunner;
const taskRunner = createTaskRunner({ taskInstance: mockTaskInstance });
await taskRunner.run();
expect(telemetryDiagTask.runTask).toHaveBeenCalled();
});

test('task should not query elastic if telemetry is not opted in', async () => {
const mockSender = createMockTelemetryEventsSender(false);
const mockTaskManager = taskManagerMock.createSetup();
new MockTelemetryDiagnosticTask(logger, mockTaskManager, mockSender);

const mockTaskInstance = {
id: TelemetryDiagTaskConstants.TYPE,
runAt: new Date(),
attempts: 0,
ownerId: '',
status: TaskStatus.Running,
startedAt: new Date(),
scheduledAt: new Date(),
retryAt: new Date(),
params: {},
state: {},
taskType: TelemetryDiagTaskConstants.TYPE,
};
const createTaskRunner =
mockTaskManager.registerTaskDefinitions.mock.calls[0][0][TelemetryDiagTaskConstants.TYPE]
.createTaskRunner;
const taskRunner = createTaskRunner({ taskInstance: mockTaskInstance });
await taskRunner.run();
expect(mockSender.fetchDiagnosticAlerts).not.toHaveBeenCalled();
});

test('test -5 mins is returned when there is no previous task run', async () => {
const telemetryDiagTask = new TelemetryDiagTask(
logger,
taskManagerMock.createSetup(),
createMockTelemetryEventsSender(true)
);

const executeTo = moment().utc().toISOString();
const executeFrom = undefined;
const newExecuteFrom = telemetryDiagTask.getLastExecutionTimestamp(executeTo, executeFrom);

expect(newExecuteFrom).toEqual(moment(executeTo).subtract(5, 'minutes').toISOString());
});

test('test -6 mins is returned when there was a previous task run', async () => {
const telemetryDiagTask = new TelemetryDiagTask(
logger,
taskManagerMock.createSetup(),
createMockTelemetryEventsSender(true)
);

const executeTo = moment().utc().toISOString();
const executeFrom = moment(executeTo).subtract(6, 'minutes').toISOString();
const newExecuteFrom = telemetryDiagTask.getLastExecutionTimestamp(executeTo, executeFrom);

expect(newExecuteFrom).toEqual(executeFrom);
});

// it's possible if Kibana is down for a prolonged period the stored lastRun would have drifted
// if that is the case we will just roll it back to a 10 min search window
test('test 10 mins is returned when previous task run took longer than 10 minutes', async () => {
const telemetryDiagTask = new TelemetryDiagTask(
logger,
taskManagerMock.createSetup(),
createMockTelemetryEventsSender(true)
);

const executeTo = moment().utc().toISOString();
const executeFrom = moment(executeTo).subtract(142, 'minutes').toISOString();
const newExecuteFrom = telemetryDiagTask.getLastExecutionTimestamp(executeTo, executeFrom);

expect(newExecuteFrom).toEqual(moment(executeTo).subtract(10, 'minutes').toISOString());
});
});
Loading

0 comments on commit 6e7fb4a

Please sign in to comment.