From c9f38c79c20dd890763bd89772f509b0696b3088 Mon Sep 17 00:00:00 2001 From: Dima Arnautov Date: Wed, 9 Dec 2020 16:32:32 +0100 Subject: [PATCH] [Transform] Replace legacy elasticsearch client (#84932) (#85396) * [Transform] replace legacy elasticsearch client * [Transform] delete custom legacy client definition, update transforms_audit_messages.ts * [Transform] fix start and stop transform endpoints * [Transform] fix privileges and stats endpoints * [Transform] fix forbidden * [Transform] revert continue statement, add a comment * [Transform] update privileges.ts using security namespace * [Transform] fix error wrappers * [Transform] add functional test for preview error validation * [Transform] extract error message from the root cause * [Transform] remove error translation --- .../server/client/elasticsearch_transform.ts | 144 ---------------- x-pack/plugins/transform/server/plugin.ts | 46 +----- .../server/routes/api/error_utils.ts | 34 +++- .../transform/server/routes/api/privileges.ts | 13 +- .../transform/server/routes/api/transforms.ts | 156 +++++++++--------- .../routes/api/transforms_audit_messages.ts | 2 +- .../plugins/transform/server/routes/index.ts | 3 - .../apis/transform/transforms_preview.ts | 24 +++ 8 files changed, 142 insertions(+), 280 deletions(-) delete mode 100644 x-pack/plugins/transform/server/client/elasticsearch_transform.ts diff --git a/x-pack/plugins/transform/server/client/elasticsearch_transform.ts b/x-pack/plugins/transform/server/client/elasticsearch_transform.ts deleted file mode 100644 index a17eb1416408a1..00000000000000 --- a/x-pack/plugins/transform/server/client/elasticsearch_transform.ts +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -export const elasticsearchJsPlugin = (Client: any, config: any, components: any) => { - const ca = components.clientAction.factory; - - Client.prototype.transform = components.clientAction.namespaceFactory(); - const transform = Client.prototype.transform.prototype; - - // Currently the endpoint uses a default size of 100 unless a size is supplied. - // So until paging is supported in the UI, explicitly supply a size of 1000 - // to match the max number of docs that the endpoint can return. - transform.getTransforms = ca({ - urls: [ - { - fmt: '/_transform/<%=transformId%>', - req: { - transformId: { - type: 'string', - }, - }, - }, - { - fmt: '/_transform/_all?size=1000', - }, - ], - method: 'GET', - }); - - transform.getTransformsStats = ca({ - urls: [ - { - fmt: '/_transform/<%=transformId%>/_stats', - req: { - transformId: { - type: 'string', - }, - }, - }, - { - // Currently the endpoint uses a default size of 100 unless a size is supplied. - // So until paging is supported in the UI, explicitly supply a size of 1000 - // to match the max number of docs that the endpoint can return. - fmt: '/_transform/_all/_stats?size=1000', - }, - ], - method: 'GET', - }); - - transform.createTransform = ca({ - urls: [ - { - fmt: '/_transform/<%=transformId%>', - req: { - transformId: { - type: 'string', - }, - }, - }, - ], - needBody: true, - method: 'PUT', - }); - - transform.updateTransform = ca({ - urls: [ - { - fmt: '/_transform/<%=transformId%>/_update', - req: { - transformId: { - type: 'string', - }, - }, - }, - ], - needBody: true, - method: 'POST', - }); - - transform.deleteTransform = ca({ - urls: [ - { - fmt: '/_transform/<%=transformId%>?&force=<%=force%>', - req: { - transformId: { - type: 'string', - }, - force: { - type: 'boolean', - }, - }, - }, - ], - method: 'DELETE', - }); - - transform.getTransformsPreview = ca({ - urls: [ - { - fmt: '/_transform/_preview', - }, - ], - needBody: true, - method: 'POST', - }); - - transform.startTransform = ca({ - urls: [ - { - fmt: '/_transform/<%=transformId%>/_start', - req: { - transformId: { - type: 'string', - }, - }, - }, - ], - method: 'POST', - }); - - transform.stopTransform = ca({ - urls: [ - { - fmt: - '/_transform/<%=transformId%>/_stop?&force=<%=force%>&wait_for_completion=<%waitForCompletion%>', - req: { - transformId: { - type: 'string', - }, - force: { - type: 'boolean', - }, - waitForCompletion: { - type: 'boolean', - }, - }, - }, - ], - method: 'POST', - }); -}; diff --git a/x-pack/plugins/transform/server/plugin.ts b/x-pack/plugins/transform/server/plugin.ts index 988750f70efe00..987028dcacf054 100644 --- a/x-pack/plugins/transform/server/plugin.ts +++ b/x-pack/plugins/transform/server/plugin.ts @@ -4,30 +4,14 @@ * you may not use this file except in compliance with the Elastic License. */ import { i18n } from '@kbn/i18n'; -import { - CoreSetup, - ILegacyCustomClusterClient, - Plugin, - ILegacyScopedClusterClient, - Logger, - PluginInitializerContext, -} from 'src/core/server'; +import { CoreSetup, Plugin, Logger, PluginInitializerContext } from 'src/core/server'; import { LicenseType } from '../../licensing/common/types'; -import { elasticsearchJsPlugin } from './client/elasticsearch_transform'; import { Dependencies } from './types'; import { ApiRoutes } from './routes'; import { License } from './services'; -declare module 'kibana/server' { - interface RequestHandlerContext { - transform?: { - dataClient: ILegacyScopedClusterClient; - }; - } -} - const basicLicense: LicenseType = 'basic'; const PLUGIN = { @@ -39,18 +23,10 @@ const PLUGIN = { }), }; -async function getCustomEsClient(getStartServices: CoreSetup['getStartServices']) { - const [core] = await getStartServices(); - return core.elasticsearch.legacy.createClient('transform', { - plugins: [elasticsearchJsPlugin], - }); -} - export class TransformServerPlugin implements Plugin<{}, void, any, any> { private readonly apiRoutes: ApiRoutes; private readonly license: License; private readonly logger: Logger; - private transformESClient?: ILegacyCustomClusterClient; constructor(initContext: PluginInitializerContext) { this.logger = initContext.logger.get(); @@ -58,7 +34,10 @@ export class TransformServerPlugin implements Plugin<{}, void, any, any> { this.license = new License(); } - setup({ http, getStartServices }: CoreSetup, { licensing, features }: Dependencies): {} { + setup( + { http, getStartServices, elasticsearch }: CoreSetup, + { licensing, features }: Dependencies + ): {} { const router = http.createRouter(); this.license.setup( @@ -94,23 +73,10 @@ export class TransformServerPlugin implements Plugin<{}, void, any, any> { license: this.license, }); - // Can access via new platform router's handler function 'context' parameter - context.transform.client - http.registerRouteHandlerContext('transform', async (context, request) => { - this.transformESClient = - this.transformESClient ?? (await getCustomEsClient(getStartServices)); - return { - dataClient: this.transformESClient.asScoped(request), - }; - }); - return {}; } start() {} - stop() { - if (this.transformESClient) { - this.transformESClient.close(); - } - } + stop() {} } diff --git a/x-pack/plugins/transform/server/routes/api/error_utils.ts b/x-pack/plugins/transform/server/routes/api/error_utils.ts index cf388f3c8ca084..fe50830cd24f23 100644 --- a/x-pack/plugins/transform/server/routes/api/error_utils.ts +++ b/x-pack/plugins/transform/server/routes/api/error_utils.ts @@ -76,7 +76,7 @@ export function fillResultsWithTimeouts({ results, id, items, action }: Params) } export function wrapError(error: any): CustomHttpResponseOptions { - const boom = Boom.isBoom(error) ? error : Boom.boomify(error, { statusCode: error.status }); + const boom = Boom.isBoom(error) ? error : Boom.boomify(error, { statusCode: error.statusCode }); return { body: boom, headers: boom.output.headers, @@ -109,14 +109,16 @@ function extractCausedByChain( * @return Object Boom error response */ export function wrapEsError(err: any, statusCodeToMessageMap: Record = {}) { - const { statusCode, response } = err; + const { + meta: { body, statusCode }, + } = err; const { error: { root_cause = [], // eslint-disable-line @typescript-eslint/naming-convention caused_by = {}, // eslint-disable-line @typescript-eslint/naming-convention } = {}, - } = JSON.parse(response); + } = body; // If no custom message if specified for the error's status code, just // wrap the error as a Boom error response, include the additional information from ES, and return it @@ -130,6 +132,12 @@ export function wrapEsError(err: any, statusCodeToMessageMap: Record { - const options = {}; + license.guardApiRoute(async (ctx, req, res) => { try { - const transforms = await getTransforms( - options, - ctx.transform!.dataClient.callAsCurrentUser - ); - return res.ok({ body: transforms }); + const { body } = await ctx.core.elasticsearch.client.asCurrentUser.transform.getTransform({ + size: 1000, + ...req.params, + }); + return res.ok({ body }); } catch (e) { return res.customError(wrapError(wrapEsError(e))); } @@ -113,13 +107,11 @@ export function registerTransformsRoutes(routeDependencies: RouteDependencies) { }, license.guardApiRoute(async (ctx, req, res) => { const { transformId } = req.params; - const options = transformId !== undefined ? { transformId } : {}; try { - const transforms = await getTransforms( - options, - ctx.transform!.dataClient.callAsCurrentUser - ); - return res.ok({ body: transforms }); + const { body } = await ctx.core.elasticsearch.client.asCurrentUser.transform.getTransform({ + transform_id: transformId, + }); + return res.ok({ body }); } catch (e) { return res.customError(wrapError(wrapEsError(e))); } @@ -135,18 +127,21 @@ export function registerTransformsRoutes(routeDependencies: RouteDependencies) { */ router.get( { path: addBasePath('transforms/_stats'), validate: false }, - license.guardApiRoute(async (ctx, req, res) => { - const options = {}; - try { - const stats = await ctx.transform!.dataClient.callAsCurrentUser( - 'transform.getTransformsStats', - options - ); - return res.ok({ body: stats }); - } catch (e) { - return res.customError(wrapError(wrapEsError(e))); + license.guardApiRoute( + async (ctx, req, res) => { + try { + const { + body, + } = await ctx.core.elasticsearch.client.asCurrentUser.transform.getTransformStats({ + size: 1000, + transform_id: '_all', + }); + return res.ok({ body }); + } catch (e) { + return res.customError(wrapError(wrapEsError(e))); + } } - }) + ) ); /** @@ -165,15 +160,13 @@ export function registerTransformsRoutes(routeDependencies: RouteDependencies) { }, license.guardApiRoute(async (ctx, req, res) => { const { transformId } = req.params; - const options = { - ...(transformId !== undefined ? { transformId } : {}), - }; try { - const stats = await ctx.transform!.dataClient.callAsCurrentUser( - 'transform.getTransformsStats', - options - ); - return res.ok({ body: stats }); + const { + body, + } = await ctx.core.elasticsearch.client.asCurrentUser.transform.getTransformStats({ + transform_id: transformId, + }); + return res.ok({ body }); } catch (e) { return res.customError(wrapError(wrapEsError(e))); } @@ -208,12 +201,14 @@ export function registerTransformsRoutes(routeDependencies: RouteDependencies) { errors: [], }; - await ctx - .transform!.dataClient.callAsCurrentUser('transform.createTransform', { + await ctx.core.elasticsearch.client.asCurrentUser.transform + .putTransform({ body: req.body, - transformId, + transform_id: transformId, + }) + .then(() => { + response.transformsCreated.push({ transform: transformId }); }) - .then(() => response.transformsCreated.push({ transform: transformId })) .catch((e) => response.errors.push({ id: transformId, @@ -249,11 +244,14 @@ export function registerTransformsRoutes(routeDependencies: RouteDependencies) { const { transformId } = req.params; try { + const { + body, + } = await ctx.core.elasticsearch.client.asCurrentUser.transform.updateTransform({ + body: req.body, + transform_id: transformId, + }); return res.ok({ - body: (await ctx.transform!.dataClient.callAsCurrentUser('transform.updateTransform', { - body: req.body, - transformId, - })) as PostTransformsUpdateResponseSchema, + body, }); } catch (e) { return res.customError(wrapError(e)); @@ -381,9 +379,8 @@ export function registerTransformsRoutes(routeDependencies: RouteDependencies) { }, license.guardApiRoute(async (ctx, req, res) => { try { - return res.ok({ - body: await ctx.transform!.dataClient.callAsCurrentUser('search', req.body), - }); + const { body } = await ctx.core.elasticsearch.client.asCurrentUser.search(req.body); + return res.ok({ body }); } catch (e) { return res.customError(wrapError(wrapEsError(e))); } @@ -391,13 +388,6 @@ export function registerTransformsRoutes(routeDependencies: RouteDependencies) { ); } -const getTransforms = async ( - options: { transformId?: string }, - callAsCurrentUser: LegacyAPICaller -): Promise => { - return await callAsCurrentUser('transform.getTransforms', options); -}; - async function getIndexPatternId( indexName: string, savedObjectsClient: SavedObjectsClientContract @@ -452,11 +442,10 @@ async function deleteTransforms( } // Grab destination index info to delete try { - const transformConfigs = await getTransforms( - { transformId }, - ctx.transform!.dataClient.callAsCurrentUser - ); - const transformConfig = transformConfigs.transforms[0]; + const { body } = await ctx.core.elasticsearch.client.asCurrentUser.transform.getTransform({ + transform_id: transformId, + }); + const transformConfig = body.transforms[0]; destinationIndex = Array.isArray(transformConfig.dest.index) ? transformConfig.dest.index[0] : transformConfig.dest.index; @@ -468,6 +457,7 @@ async function deleteTransforms( destIndexPatternDeleted, destinationIndex, }; + // No need to perform further delete attempts continue; } @@ -476,7 +466,7 @@ async function deleteTransforms( try { // If user does have privilege to delete the index, then delete the index // if no permission then return 403 forbidden - await ctx.transform!.dataClient.callAsCurrentUser('indices.delete', { + await ctx.core.elasticsearch.client.asCurrentUser.indices.delete({ index: destinationIndex, }); destIndexDeleted.success = true; @@ -502,14 +492,14 @@ async function deleteTransforms( } try { - await ctx.transform!.dataClient.callAsCurrentUser('transform.deleteTransform', { - transformId, + await ctx.core.elasticsearch.client.asCurrentUser.transform.deleteTransform({ + transform_id: transformId, force: shouldForceDelete && needToForceDelete, }); transformDeleted.success = true; } catch (deleteTransformJobError) { transformDeleted.error = wrapError(deleteTransformJobError); - if (transformDeleted.error.statusCode === 403) { + if (deleteTransformJobError.statusCode === 403) { return response.forbidden(); } } @@ -541,11 +531,10 @@ const previewTransformHandler: RequestHandler< PostTransformsPreviewRequestSchema > = async (ctx, req, res) => { try { - return res.ok({ - body: await ctx.transform!.dataClient.callAsCurrentUser('transform.getTransformsPreview', { - body: req.body, - }), + const { body } = await ctx.core.elasticsearch.client.asCurrentUser.transform.previewTransform({ + body: req.body, }); + return res.ok({ body }); } catch (e) { return res.customError(wrapError(wrapEsError(e))); } @@ -559,8 +548,9 @@ const startTransformsHandler: RequestHandler< const transformsInfo = req.body; try { + const body = await startTransforms(transformsInfo, ctx.core.elasticsearch.client.asCurrentUser); return res.ok({ - body: await startTransforms(transformsInfo, ctx.transform!.dataClient.callAsCurrentUser), + body, }); } catch (e) { return res.customError(wrapError(wrapEsError(e))); @@ -569,14 +559,16 @@ const startTransformsHandler: RequestHandler< async function startTransforms( transformsInfo: StartTransformsRequestSchema, - callAsCurrentUser: LegacyAPICaller + esClient: ElasticsearchClient ) { const results: StartTransformsResponseSchema = {}; for (const transformInfo of transformsInfo) { const transformId = transformInfo.id; try { - await callAsCurrentUser('transform.startTransform', { transformId }); + await esClient.transform.startTransform({ + transform_id: transformId, + }); results[transformId] = { success: true }; } catch (e) { if (isRequestTimeout(e)) { @@ -602,7 +594,7 @@ const stopTransformsHandler: RequestHandler< try { return res.ok({ - body: await stopTransforms(transformsInfo, ctx.transform!.dataClient.callAsCurrentUser), + body: await stopTransforms(transformsInfo, ctx.core.elasticsearch.client.asCurrentUser), }); } catch (e) { return res.customError(wrapError(wrapEsError(e))); @@ -611,21 +603,21 @@ const stopTransformsHandler: RequestHandler< async function stopTransforms( transformsInfo: StopTransformsRequestSchema, - callAsCurrentUser: LegacyAPICaller + esClient: ElasticsearchClient ) { const results: StopTransformsResponseSchema = {}; for (const transformInfo of transformsInfo) { const transformId = transformInfo.id; try { - await callAsCurrentUser('transform.stopTransform', { - transformId, + await esClient.transform.stopTransform({ + transform_id: transformId, force: transformInfo.state !== undefined ? transformInfo.state === TRANSFORM_STATE.FAILED : false, - waitForCompletion: true, - } as StopOptions); + wait_for_completion: true, + }); results[transformId] = { success: true }; } catch (e) { if (isRequestTimeout(e)) { diff --git a/x-pack/plugins/transform/server/routes/api/transforms_audit_messages.ts b/x-pack/plugins/transform/server/routes/api/transforms_audit_messages.ts index 8c95ab5c786edb..3563775b26f3c4 100644 --- a/x-pack/plugins/transform/server/routes/api/transforms_audit_messages.ts +++ b/x-pack/plugins/transform/server/routes/api/transforms_audit_messages.ts @@ -77,7 +77,7 @@ export function registerTransformsAuditMessagesRoutes({ router, license }: Route } try { - const resp = await ctx.transform!.dataClient.callAsCurrentUser('search', { + const { body: resp } = await ctx.core.elasticsearch.client.asCurrentUser.search({ index: ML_DF_NOTIFICATION_INDEX_PATTERN, ignore_unavailable: true, size: SIZE, diff --git a/x-pack/plugins/transform/server/routes/index.ts b/x-pack/plugins/transform/server/routes/index.ts index 4f35b094017a41..36aea6677b8158 100644 --- a/x-pack/plugins/transform/server/routes/index.ts +++ b/x-pack/plugins/transform/server/routes/index.ts @@ -20,7 +20,4 @@ export class ApiRoutes { registerPrivilegesRoute(dependencies); registerTransformsRoutes(dependencies); } - - start() {} - stop() {} } diff --git a/x-pack/test/api_integration/apis/transform/transforms_preview.ts b/x-pack/test/api_integration/apis/transform/transforms_preview.ts index d0fc44cf28fdbf..72e2c7d0143563 100644 --- a/x-pack/test/api_integration/apis/transform/transforms_preview.ts +++ b/x-pack/test/api_integration/apis/transform/transforms_preview.ts @@ -57,6 +57,30 @@ export default ({ getService }: FtrProviderContext) => { ); }); + it('should return a correct error for transform preview', async () => { + const { body } = await supertest + .post(`/api/transform/transforms/_preview`) + .auth( + USER.TRANSFORM_POWERUSER, + transform.securityCommon.getPasswordForUser(USER.TRANSFORM_POWERUSER) + ) + .set(COMMON_REQUEST_HEADERS) + .send({ + ...getTransformPreviewConfig(), + pivot: { + group_by: { airline: { terms: { field: 'airline' } } }, + aggregations: { + '@timestamp.value_count': { value_countt: { field: '@timestamp' } }, + }, + }, + }) + .expect(400); + + expect(body.message).to.eql( + '[parsing_exception] Unknown aggregation type [value_countt] did you mean [value_count]?, with line=1 & col=43' + ); + }); + it('should return 403 for transform view-only user', async () => { await supertest .post(`/api/transform/transforms/_preview`)