Skip to content

Commit

Permalink
[Ingest Pipelines] Migrate to new ES client (#96406) (#96929)
Browse files Browse the repository at this point in the history
* - migrated use of legacy.client to client
- removed use of isEsError to detect legacy errors
- refactored types to use types from @elastic/elasticsearch
  instead (where appropriate)

tested get, put, post, delete, simulate and documents endpoints
locally

* remove use of legacyEs service in functional test

* fixing type issues and API response object

* remove id from get all request!

* reinstated logic for handling 404 from get all pipelines request

* clarify error handling with comments and small variable name refactor

* updated delete error responses

* update functional test

* refactor use of legacyEs

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
jloleysens and kibanamachine committed Apr 14, 2021
1 parent d08d6dd commit ac301ff
Show file tree
Hide file tree
Showing 19 changed files with 85 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ interface EsErrorHandlerParams {
handleCustomError?: () => IKibanaResponse<any>;
}

/*
/**
* For errors returned by the new elasticsearch js client.
*
* @throws If "error" is not an error from the elasticsearch client this handler will throw "error".
*/
export const handleEsError = ({
error,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
* 2.0.
*/

import { PipelinesByName, Pipeline } from '../types';
import { Pipeline as ESPipeline } from '@elastic/elasticsearch/api/types';
import { Pipeline, Processor } from '../types';

export function deserializePipelines(pipelinesByName: PipelinesByName): Pipeline[] {
export function deserializePipelines(pipelinesByName: { [key: string]: ESPipeline }): Pipeline[] {
const pipelineNames: string[] = Object.keys(pipelinesByName);

const deserializedPipelines = pipelineNames.map((name: string) => {
const deserializedPipelines = pipelineNames.map<Pipeline>((name: string) => {
return {
...pipelinesByName[name],
processors: (pipelinesByName[name]?.processors as Processor[]) ?? [],
on_failure: pipelinesByName[name]?.on_failure as Processor[],
name,
};
});
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/ingest_pipelines/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export interface Processor {

export interface Pipeline {
name: string;
description: string;
description?: string;
version?: number;
processors: Processor[];
on_failure?: Processor[];
Expand Down
4 changes: 2 additions & 2 deletions x-pack/plugins/ingest_pipelines/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { PLUGIN_ID, PLUGIN_MIN_LICENSE_TYPE } from '../common/constants';

import { License } from './services';
import { ApiRoutes } from './routes';
import { isEsError } from './shared_imports';
import { handleEsError } from './shared_imports';
import { Dependencies } from './types';

export class IngestPipelinesPlugin implements Plugin<void, void, any, any> {
Expand Down Expand Up @@ -66,7 +66,7 @@ export class IngestPipelinesPlugin implements Plugin<void, void, any, any> {
isSecurityEnabled: () => security !== undefined && security.license.isEnabled(),
},
lib: {
isEsError,
handleEsError,
},
});
}
Expand Down
27 changes: 8 additions & 19 deletions x-pack/plugins/ingest_pipelines/server/routes/api/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import { schema } from '@kbn/config-schema';
import { Pipeline } from '../../../common/types';
import { API_BASE_PATH } from '../../../common/constants';
import { RouteDependencies } from '../../types';
import { pipelineSchema } from './pipeline_schema';
import { isObjectWithKeys } from './shared';
import { pipelineSchema } from './shared';

const bodySchema = schema.object({
name: schema.string(),
Expand All @@ -22,7 +21,7 @@ const bodySchema = schema.object({
export const registerCreateRoute = ({
router,
license,
lib: { isEsError },
lib: { handleEsError },
}: RouteDependencies): void => {
router.post(
{
Expand All @@ -32,15 +31,17 @@ export const registerCreateRoute = ({
},
},
license.guardApiRoute(async (ctx, req, res) => {
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
const { client: clusterClient } = ctx.core.elasticsearch;
const pipeline = req.body as Pipeline;

// eslint-disable-next-line @typescript-eslint/naming-convention
const { name, description, processors, version, on_failure } = pipeline;

try {
// Check that a pipeline with the same name doesn't already exist
const pipelineByName = await callAsCurrentUser('ingest.getPipeline', { id: name });
const { body: pipelineByName } = await clusterClient.asCurrentUser.ingest.getPipeline({
id: name,
});

if (pipelineByName[name]) {
return res.conflict({
Expand All @@ -59,7 +60,7 @@ export const registerCreateRoute = ({
}

try {
const response = await callAsCurrentUser('ingest.putPipeline', {
const { body: response } = await clusterClient.asCurrentUser.ingest.putPipeline({
id: name,
body: {
description,
Expand All @@ -71,19 +72,7 @@ export const registerCreateRoute = ({

return res.ok({ body: response });
} catch (error) {
if (isEsError(error)) {
return res.customError({
statusCode: error.statusCode,
body: isObjectWithKeys(error.body)
? {
message: error.message,
attributes: error.body,
}
: error,
});
}

throw error;
return handleEsError({ error, response: res });
}
})
);
Expand Down
14 changes: 8 additions & 6 deletions x-pack/plugins/ingest_pipelines/server/routes/api/delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export const registerDeleteRoute = ({ router, license }: RouteDependencies): voi
},
},
license.guardApiRoute(async (ctx, req, res) => {
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
const { client: clusterClient } = ctx.core.elasticsearch;
const { names } = req.params;
const pipelineNames = names.split(',');

Expand All @@ -34,14 +34,16 @@ export const registerDeleteRoute = ({ router, license }: RouteDependencies): voi

await Promise.all(
pipelineNames.map((pipelineName) => {
return callAsCurrentUser('ingest.deletePipeline', { id: pipelineName })
return clusterClient.asCurrentUser.ingest
.deletePipeline({ id: pipelineName })
.then(() => response.itemsDeleted.push(pipelineName))
.catch((e) =>
.catch((e) => {
response.errors.push({
error: e?.meta?.body?.error ?? e,
status: e?.meta?.body?.status,
name: pipelineName,
error: e,
})
);
});
});
})
);

Expand Down
15 changes: 4 additions & 11 deletions x-pack/plugins/ingest_pipelines/server/routes/api/documents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const paramsSchema = schema.object({
export const registerDocumentsRoute = ({
router,
license,
lib: { isEsError },
lib: { handleEsError },
}: RouteDependencies): void => {
router.get(
{
Expand All @@ -28,11 +28,11 @@ export const registerDocumentsRoute = ({
},
},
license.guardApiRoute(async (ctx, req, res) => {
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
const { client: clusterClient } = ctx.core.elasticsearch;
const { index, id } = req.params;

try {
const document = await callAsCurrentUser('get', { index, id });
const { body: document } = await clusterClient.asCurrentUser.get({ index, id });

const { _id, _index, _source } = document;

Expand All @@ -44,14 +44,7 @@ export const registerDocumentsRoute = ({
},
});
} catch (error) {
if (isEsError(error)) {
return res.customError({
statusCode: error.statusCode,
body: error,
});
}

throw error;
return handleEsError({ error, response: res });
}
})
);
Expand Down
38 changes: 13 additions & 25 deletions x-pack/plugins/ingest_pipelines/server/routes/api/get.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,26 @@ const paramsSchema = schema.object({
export const registerGetRoutes = ({
router,
license,
lib: { isEsError },
lib: { handleEsError },
}: RouteDependencies): void => {
// Get all pipelines
router.get(
{ path: API_BASE_PATH, validate: false },
license.guardApiRoute(async (ctx, req, res) => {
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
const { client: clusterClient } = ctx.core.elasticsearch;

try {
const pipelines = await callAsCurrentUser('ingest.getPipeline');
const { body: pipelines } = await clusterClient.asCurrentUser.ingest.getPipeline();

return res.ok({ body: deserializePipelines(pipelines) });
} catch (error) {
if (isEsError(error)) {
const esErrorResponse = handleEsError({ error, response: res });
if (esErrorResponse.status === 404) {
// ES returns 404 when there are no pipelines
// Instead, we return an empty array and 200 status back to the client
if (error.status === 404) {
return res.ok({ body: [] });
}

return res.customError({
statusCode: error.statusCode,
body: error,
});
return res.ok({ body: [] });
}

throw error;
return esErrorResponse;
}
})
);
Expand All @@ -58,27 +51,22 @@ export const registerGetRoutes = ({
},
},
license.guardApiRoute(async (ctx, req, res) => {
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
const { client: clusterClient } = ctx.core.elasticsearch;
const { name } = req.params;

try {
const pipeline = await callAsCurrentUser('ingest.getPipeline', { id: name });
const { body: pipelines } = await clusterClient.asCurrentUser.ingest.getPipeline({
id: name,
});

return res.ok({
body: {
...pipeline[name],
...pipelines[name],
name,
},
});
} catch (error) {
if (isEsError(error)) {
return res.customError({
statusCode: error.statusCode,
body: error,
});
}

throw error;
return handleEsError({ error, response: res });
}
})
);
Expand Down
23 changes: 6 additions & 17 deletions x-pack/plugins/ingest_pipelines/server/routes/api/privileges.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,13 @@ export const registerPrivilegesRoute = ({ license, router, config }: RouteDepend
return res.ok({ body: privilegesResult });
}

const {
core: {
elasticsearch: {
legacy: { client },
},
},
} = ctx;
const { client: clusterClient } = ctx.core.elasticsearch;

const { has_all_requested: hasAllPrivileges, cluster } = await client.callAsCurrentUser(
'transport.request',
{
path: '/_security/user/_has_privileges',
method: 'POST',
body: {
cluster: APP_CLUSTER_REQUIRED_PRIVILEGES,
},
}
);
const {
body: { has_all_requested: hasAllPrivileges, cluster },
} = await clusterClient.asCurrentUser.security.hasPrivileges({
body: { cluster: APP_CLUSTER_REQUIRED_PRIVILEGES },
});

if (!hasAllPrivileges) {
privilegesResult.missingPrivileges.cluster = extractMissingPrivileges(cluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
* 2.0.
*/

export { isObjectWithKeys } from './is_object_with_keys';
export { pipelineSchema } from './pipeline_schema';

This file was deleted.

21 changes: 7 additions & 14 deletions x-pack/plugins/ingest_pipelines/server/routes/api/simulate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { SimulatePipelineDocument } from '@elastic/elasticsearch/api/types';
import { schema } from '@kbn/config-schema';

import { API_BASE_PATH } from '../../../common/constants';
import { RouteDependencies } from '../../types';
import { pipelineSchema } from './pipeline_schema';
import { pipelineSchema } from './shared';

const bodySchema = schema.object({
pipeline: schema.object(pipelineSchema),
Expand All @@ -20,7 +20,7 @@ const bodySchema = schema.object({
export const registerSimulateRoute = ({
router,
license,
lib: { isEsError },
lib: { handleEsError },
}: RouteDependencies): void => {
router.post(
{
Expand All @@ -30,29 +30,22 @@ export const registerSimulateRoute = ({
},
},
license.guardApiRoute(async (ctx, req, res) => {
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
const { client: clusterClient } = ctx.core.elasticsearch;

const { pipeline, documents, verbose } = req.body;

try {
const response = await callAsCurrentUser('ingest.simulate', {
const { body: response } = await clusterClient.asCurrentUser.ingest.simulate({
verbose,
body: {
pipeline,
docs: documents,
docs: documents as SimulatePipelineDocument[],
},
});

return res.ok({ body: response });
} catch (error) {
if (isEsError(error)) {
return res.customError({
statusCode: error.statusCode,
body: error,
});
}

throw error;
return handleEsError({ error, response: res });
}
})
);
Expand Down
Loading

0 comments on commit ac301ff

Please sign in to comment.