Skip to content

Commit

Permalink
[APM] Upgrade ES client
Browse files Browse the repository at this point in the history
  • Loading branch information
dgieselaar committed Dec 19, 2020
1 parent e8b21bc commit 7dfe543
Show file tree
Hide file tree
Showing 21 changed files with 316 additions and 191 deletions.
12 changes: 7 additions & 5 deletions x-pack/plugins/apm/scripts/upload-telemetry-data/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { argv } from 'yargs';
import { Logger } from 'kibana/server';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { CollectTelemetryParams } from '../../server/lib/apm_telemetry/collect_data_telemetry';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { unwrapEsResponse } from '../../../observability/server/utils/unwrap_es_response';
import { downloadTelemetryTemplate } from '../shared/download-telemetry-template';
import { mergeApmTelemetryMapping } from '../../common/apm_telemetry';
import { generateSampleDocuments } from './generate-sample-documents';
Expand Down Expand Up @@ -80,18 +82,18 @@ async function uploadData() {
apmAgentConfigurationIndex: '.apm-agent-configuration',
},
search: (body) => {
return client.search(body as any).then((res) => res.body as any);
return unwrapEsResponse(client.search<any>(body));
},
indicesStats: (body) => {
return client.indices.stats(body as any).then((res) => res.body);
return unwrapEsResponse(client.indices.stats<any>(body));
},
transportRequest: ((params) => {
return client.transport
.request({
return unwrapEsResponse(
client.transport.request({
method: params.method,
path: params.path,
})
.then((res) => res.body);
);
}) as CollectTelemetryParams['transportRequest'],
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { merge } from 'lodash';
import { Logger, LegacyCallAPIOptions } from 'kibana/server';
import { IndicesStatsParams, Client } from 'elasticsearch';
import { Logger } from 'kibana/server';
import { RequestParams } from '@elastic/elasticsearch';
import {
ESSearchRequest,
ESSearchResponse,
Expand All @@ -20,9 +20,16 @@ type TelemetryTaskExecutor = (params: {
params: TSearchRequest
): Promise<ESSearchResponse<unknown, TSearchRequest>>;
indicesStats(
params: IndicesStatsParams,
options?: LegacyCallAPIOptions
): ReturnType<Client['indices']['stats']>;
params: RequestParams.IndicesStats
// promise returned by client has an abort property
): Promise<{
_all?: {
total?: { store?: { size_in_bytes?: number }; docs?: { count?: number } };
};
_shards?: {
total?: number;
};
}>;
transportRequest: (params: {
path: string;
method: 'get';
Expand Down
22 changes: 9 additions & 13 deletions x-pack/plugins/apm/server/lib/apm_telemetry/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
Logger,
SavedObjectsErrorHelpers,
} from '../../../../../../src/core/server';
import { unwrapEsResponse } from '../../../../observability/server';
import { APMConfig } from '../..';
import {
TaskManagerSetupContract,
Expand Down Expand Up @@ -65,27 +66,22 @@ export async function createApmTelemetry({
const collectAndStore = async () => {
const config = await config$.pipe(take(1)).toPromise();
const [{ elasticsearch }] = await core.getStartServices();
const esClient = elasticsearch.legacy.client;
const esClient = elasticsearch.client;

const indices = await getApmIndices({
config,
savedObjectsClient,
});

const search = esClient.callAsInternalUser.bind(
esClient,
'search'
) as CollectTelemetryParams['search'];
const search: CollectTelemetryParams['search'] = (params) =>
unwrapEsResponse(esClient.asInternalUser.search<any>(params));

const indicesStats = esClient.callAsInternalUser.bind(
esClient,
'indices.stats'
) as CollectTelemetryParams['indicesStats'];
const indicesStats: CollectTelemetryParams['indicesStats'] = (params) =>
unwrapEsResponse(esClient.asInternalUser.indices.stats(params));

const transportRequest = esClient.callAsInternalUser.bind(
esClient,
'transport.request'
) as CollectTelemetryParams['transportRequest'];
const transportRequest: CollectTelemetryParams['transportRequest'] = (
params
) => unwrapEsResponse(esClient.asInternalUser.transport.request(params));

const dataTelemetry = await collectDataTelemetry({
search,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,31 @@
/* eslint-disable no-console */

import chalk from 'chalk';
import {
LegacyAPICaller,
KibanaRequest,
} from '../../../../../../../src/core/server';
import { KibanaRequest } from '../../../../../../../src/core/server';

function formatObj(obj: Record<string, any>) {
return JSON.stringify(obj, null, 2);
}

export async function callClientWithDebug({
apiCaller,
operationName,
params,
export async function callAsyncWithDebug<T>({
cb,
getMessage,
debug,
request,
}: {
apiCaller: LegacyAPICaller;
operationName: string;
params: Record<string, any>;
cb: () => Promise<T>;
getMessage: () => { body: string; title: string };
debug: boolean;
request: KibanaRequest;
}) {
if (!debug) {
return cb();
}

const startTime = process.hrtime();

let res: any;
let esError = null;
try {
res = await apiCaller(operationName, params);
res = await cb();
} catch (e) {
// catch error and throw after outputting debug info
esError = e;
Expand All @@ -44,23 +41,14 @@ export async function callClientWithDebug({
const highlightColor = esError ? 'bgRed' : 'inverse';
const diff = process.hrtime(startTime);
const duration = `${Math.round(diff[0] * 1000 + diff[1] / 1e6)}ms`;
const routeInfo = `${request.route.method.toUpperCase()} ${
request.route.path
}`;

const { title, body } = getMessage();

console.log(
chalk.bold[highlightColor](`=== Debug: ${routeInfo} (${duration}) ===`)
chalk.bold[highlightColor](`=== Debug: ${title} (${duration}) ===`)
);

if (operationName === 'search') {
console.log(`GET ${params.index}/_${operationName}`);
console.log(formatObj(params.body));
} else {
console.log(chalk.bold('ES operation:'), operationName);

console.log(chalk.bold('ES query:'));
console.log(formatObj(params));
}
console.log(body);
console.log(`\n`);
}

Expand All @@ -70,3 +58,17 @@ export async function callClientWithDebug({

return res;
}

export const getSearchDebugBody = (params: Record<string, any>) =>
`GET ${params.index}/_search\n${formatObj(params.body)}`;

export const getDefaultDebugBody = (
params: Record<string, any>,
operationName: string
) =>
`${chalk.bold('ES operation:')} ${operationName}\n${chalk.bold(
'ES query:'
)}\n${formatObj(params)}`;

export const getDebugTitle = (request: KibanaRequest) =>
`${request.route.method.toUpperCase()} ${request.route.path}`;
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
*/

import { ValuesType } from 'utility-types';
import apm from 'elastic-apm-node';
import { last } from 'lodash';
import { unwrapEsResponse } from '../../../../../../observability/server';
import { PromiseReturnType } from '../../../../../../observability/typings/common';
import { APMError } from '../../../../../typings/es_schemas/ui/apm_error';
import {
ElasticsearchClient,
KibanaRequest,
LegacyScopedClusterClient,
} from '../../../../../../../../src/core/server';
import { ProcessorEvent } from '../../../../../common/processor_event';
import {
Expand All @@ -17,11 +21,16 @@ import {
} from '../../../../../../../typings/elasticsearch';
import { ApmIndicesConfig } from '../../../settings/apm_indices/get_apm_indices';
import { addFilterToExcludeLegacyData } from './add_filter_to_exclude_legacy_data';
import { callClientWithDebug } from '../call_client_with_debug';
import { Transaction } from '../../../../../typings/es_schemas/ui/transaction';
import { Span } from '../../../../../typings/es_schemas/ui/span';
import { Metric } from '../../../../../typings/es_schemas/ui/metric';
import { unpackProcessorEvents } from './unpack_processor_events';
import { EventOutcome } from '../../../../../common/event_outcome';
import {
callAsyncWithDebug,
getDebugTitle,
getSearchDebugBody,
} from '../call_async_with_debug';

export type APMEventESSearchRequest = Omit<ESSearchRequest, 'index'> & {
apm: {
Expand Down Expand Up @@ -59,10 +68,7 @@ export function createApmEventClient({
indices,
options: { includeFrozen } = { includeFrozen: false },
}: {
esClient: Pick<
LegacyScopedClusterClient,
'callAsInternalUser' | 'callAsCurrentUser'
>;
esClient: ElasticsearchClient;
debug: boolean;
request: KibanaRequest;
indices: ApmIndicesConfig;
Expand All @@ -71,27 +77,88 @@ export function createApmEventClient({
};
}) {
return {
search<TParams extends APMEventESSearchRequest>(
async search<TParams extends APMEventESSearchRequest>(
params: TParams,
{ includeLegacyData } = { includeLegacyData: false }
{ includeLegacyData = false } = {}
): Promise<TypedSearchResponse<TParams>> {
const withProcessorEventFilter = unpackProcessorEvents(params, indices);

const withPossibleLegacyDataFilter = !includeLegacyData
? addFilterToExcludeLegacyData(withProcessorEventFilter)
: withProcessorEventFilter;

return callClientWithDebug({
apiCaller: esClient.callAsCurrentUser,
operationName: 'search',
params: {
let span: ReturnType<typeof apm.startSpan> = null;

if (apm.isStarted()) {
span = apm.startSpan('apm_event_search', 'apm_event_search');
await new Promise((resolve) => {
setTimeout(resolve, 0);
});
}

let response: PromiseReturnType<typeof esClient.search>['body'];

async function endSpan(outcome: EventOutcome) {
if (span) {
try {
const stack: Array<{
library_frame: boolean;
filename: string;
function?: string;
}> = await (span as any)._stackObj!;

const caller = stack.find(
(site) =>
!site.library_frame &&
!site.filename.includes(
'create_es_client/create_apm_event_client'
) &&
site.function !== '<anonymous'
);

let name = caller?.function;

if (!name) {
name =
last(
caller?.filename.split('x-pack/plugins/apm/server')
)?.replace(/\.[a-z]+$/, '') ?? span.name;
}

span.name = name;
} catch (error) {
// do nothing
}

span.outcome = outcome;

span.end();
}
}

try {
const searchParams = {
...withPossibleLegacyDataFilter,
ignore_throttled: !includeFrozen,
ignore_unavailable: true,
},
request,
debug,
});
};

response = await callAsyncWithDebug({
cb: () => unwrapEsResponse(esClient.search(searchParams)),
getMessage: () => ({
body: getSearchDebugBody(searchParams),
title: getDebugTitle(request),
}),
debug,
});

process.nextTick(() => endSpan(EventOutcome.success));
} catch (err) {
process.nextTick(() => endSpan(EventOutcome.failure));
throw err;
}

return response as Promise<TypedSearchResponse<TParams>>;
},
};
}
Loading

0 comments on commit 7dfe543

Please sign in to comment.