Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ingest Pipelines] Migrate to new ES client #96406

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions x-pack/plugins/ingest_pipelines/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
* 2.0.
*/

import { Pipeline as ESPipeline } from '@elastic/elasticsearch/api/types';

export interface ESProcessorConfig {
on_failure?: Processor[];
ignore_failure?: boolean;
Expand All @@ -17,19 +19,10 @@ export interface Processor {
[typeName: string]: ESProcessorConfig;
}

export interface Pipeline {
export interface Pipeline extends ESPipeline {
name: string;
description: string;
version?: number;
processors: Processor[];
on_failure?: Processor[];
}

export interface PipelinesByName {
[key: string]: {
description: string;
version?: number;
processors: Processor[];
on_failure?: Processor[];
};
[key: string]: ESPipeline;
}
4 changes: 2 additions & 2 deletions x-pack/plugins/ingest_pipelines/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { PLUGIN_ID, PLUGIN_MIN_LICENSE_TYPE } from '../common/constants';

import { License } from './services';
import { ApiRoutes } from './routes';
import { isEsError } from './shared_imports';
import { handleEsError } from './shared_imports';
import { Dependencies } from './types';

export class IngestPipelinesPlugin implements Plugin<void, void, any, any> {
Expand Down Expand Up @@ -66,7 +66,7 @@ export class IngestPipelinesPlugin implements Plugin<void, void, any, any> {
isSecurityEnabled: () => security !== undefined && security.license.isEnabled(),
},
lib: {
isEsError,
handleEsError,
},
});
}
Expand Down
27 changes: 8 additions & 19 deletions x-pack/plugins/ingest_pipelines/server/routes/api/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import { schema } from '@kbn/config-schema';
import { Pipeline } from '../../../common/types';
import { API_BASE_PATH } from '../../../common/constants';
import { RouteDependencies } from '../../types';
import { pipelineSchema } from './pipeline_schema';
import { isObjectWithKeys } from './shared';
import { pipelineSchema } from './shared';

const bodySchema = schema.object({
name: schema.string(),
Expand All @@ -22,7 +21,7 @@ const bodySchema = schema.object({
export const registerCreateRoute = ({
router,
license,
lib: { isEsError },
lib: { handleEsError },
}: RouteDependencies): void => {
router.post(
{
Expand All @@ -32,15 +31,17 @@ export const registerCreateRoute = ({
},
},
license.guardApiRoute(async (ctx, req, res) => {
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
const { client: clusterClient } = ctx.core.elasticsearch;
const pipeline = req.body as Pipeline;

// eslint-disable-next-line @typescript-eslint/naming-convention
const { name, description, processors, version, on_failure } = pipeline;

try {
// Check that a pipeline with the same name doesn't already exist
const pipelineByName = await callAsCurrentUser('ingest.getPipeline', { id: name });
const { body: pipelineByName } = await clusterClient.asCurrentUser.ingest.getPipeline({
id: name,
});

if (pipelineByName[name]) {
return res.conflict({
Expand All @@ -59,7 +60,7 @@ export const registerCreateRoute = ({
}

try {
const response = await callAsCurrentUser('ingest.putPipeline', {
const { body: response } = await clusterClient.asCurrentUser.ingest.putPipeline({
id: name,
body: {
description,
Expand All @@ -71,19 +72,7 @@ export const registerCreateRoute = ({

return res.ok({ body: response });
} catch (error) {
if (isEsError(error)) {
return res.customError({
statusCode: error.statusCode,
body: isObjectWithKeys(error.body)
? {
message: error.message,
attributes: error.body,
}
: error,
});
}

throw error;
return handleEsError({ error, response: res });
}
})
);
Expand Down
5 changes: 3 additions & 2 deletions x-pack/plugins/ingest_pipelines/server/routes/api/delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export const registerDeleteRoute = ({ router, license }: RouteDependencies): voi
},
},
license.guardApiRoute(async (ctx, req, res) => {
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
const { client: clusterClient } = ctx.core.elasticsearch;
const { names } = req.params;
const pipelineNames = names.split(',');

Expand All @@ -34,7 +34,8 @@ export const registerDeleteRoute = ({ router, license }: RouteDependencies): voi

await Promise.all(
pipelineNames.map((pipelineName) => {
return callAsCurrentUser('ingest.deletePipeline', { id: pipelineName })
return clusterClient.asCurrentUser.ingest
.deletePipeline({ id: pipelineName })
.then(() => response.itemsDeleted.push(pipelineName))
.catch((e) =>
response.errors.push({
Expand Down
15 changes: 4 additions & 11 deletions x-pack/plugins/ingest_pipelines/server/routes/api/documents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const paramsSchema = schema.object({
export const registerDocumentsRoute = ({
router,
license,
lib: { isEsError },
lib: { handleEsError },
}: RouteDependencies): void => {
router.get(
{
Expand All @@ -28,11 +28,11 @@ export const registerDocumentsRoute = ({
},
},
license.guardApiRoute(async (ctx, req, res) => {
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
const { client: clusterClient } = ctx.core.elasticsearch;
const { index, id } = req.params;

try {
const document = await callAsCurrentUser('get', { index, id });
const { body: document } = await clusterClient.asCurrentUser.get({ index, id });

const { _id, _index, _source } = document;

Expand All @@ -44,14 +44,7 @@ export const registerDocumentsRoute = ({
},
});
} catch (error) {
if (isEsError(error)) {
return res.customError({
statusCode: error.statusCode,
body: error,
});
}

throw error;
return handleEsError({ error, response: res });
}
})
);
Expand Down
40 changes: 12 additions & 28 deletions x-pack/plugins/ingest_pipelines/server/routes/api/get.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,22 @@ const paramsSchema = schema.object({
export const registerGetRoutes = ({
router,
license,
lib: { isEsError },
lib: { handleEsError },
}: RouteDependencies): void => {
// Get all pipelines
router.get(
{ path: API_BASE_PATH, validate: false },
license.guardApiRoute(async (ctx, req, res) => {
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
const { client: clusterClient } = ctx.core.elasticsearch;

try {
const pipelines = await callAsCurrentUser('ingest.getPipeline');
const { body: pipelines } = await clusterClient.asCurrentUser.ingest.getPipeline({
Copy link
Contributor

@alisonelizabeth alisonelizabeth Apr 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think an id is required or needed to fetch all pipelines.

I'm actually getting an error currently when trying to load the list view as-is. (I'm surprised a test didn't catch this 🤔 nm, looks like it did 😄 )

Screen Shot 2021-04-07 at 1 35 25 PM

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha this is what happens when you commit test code! This is how I came across the issue with messages returned in handlEsError 😅 . Will fix!

id: '_all',
});

return res.ok({ body: deserializePipelines(pipelines) });
} catch (error) {
if (isEsError(error)) {
// ES returns 404 when there are no pipelines
// Instead, we return an empty array and 200 status back to the client
if (error.status === 404) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this check no longer needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! This behaviour needs to be preserved!

return res.ok({ body: [] });
}

return res.customError({
statusCode: error.statusCode,
body: error,
});
}

throw error;
return handleEsError({ error, response: res });
}
})
);
Expand All @@ -58,27 +47,22 @@ export const registerGetRoutes = ({
},
},
license.guardApiRoute(async (ctx, req, res) => {
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
const { client: clusterClient } = ctx.core.elasticsearch;
const { name } = req.params;

try {
const pipeline = await callAsCurrentUser('ingest.getPipeline', { id: name });
const { body: pipelines } = await clusterClient.asCurrentUser.ingest.getPipeline({
id: name,
});

return res.ok({
body: {
...pipeline[name],
...pipelines[name],
name,
},
});
} catch (error) {
if (isEsError(error)) {
return res.customError({
statusCode: error.statusCode,
body: error,
});
}

throw error;
return handleEsError({ error, response: res });
}
})
);
Expand Down
23 changes: 6 additions & 17 deletions x-pack/plugins/ingest_pipelines/server/routes/api/privileges.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,13 @@ export const registerPrivilegesRoute = ({ license, router, config }: RouteDepend
return res.ok({ body: privilegesResult });
}

const {
core: {
elasticsearch: {
legacy: { client },
},
},
} = ctx;
const { client: clusterClient } = ctx.core.elasticsearch;

const { has_all_requested: hasAllPrivileges, cluster } = await client.callAsCurrentUser(
'transport.request',
{
path: '/_security/user/_has_privileges',
method: 'POST',
body: {
cluster: APP_CLUSTER_REQUIRED_PRIVILEGES,
},
}
);
const {
body: { has_all_requested: hasAllPrivileges, cluster },
} = await clusterClient.asCurrentUser.security.hasPrivileges({
body: { cluster: APP_CLUSTER_REQUIRED_PRIVILEGES },
});

if (!hasAllPrivileges) {
privilegesResult.missingPrivileges.cluster = extractMissingPrivileges(cluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
* 2.0.
*/

export { isObjectWithKeys } from './is_object_with_keys';
export { pipelineSchema } from './pipeline_schema';

This file was deleted.

21 changes: 7 additions & 14 deletions x-pack/plugins/ingest_pipelines/server/routes/api/simulate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { SimulatePipelineDocument } from '@elastic/elasticsearch/api/types';
import { schema } from '@kbn/config-schema';

import { API_BASE_PATH } from '../../../common/constants';
import { RouteDependencies } from '../../types';
import { pipelineSchema } from './pipeline_schema';
import { pipelineSchema } from './shared';

const bodySchema = schema.object({
pipeline: schema.object(pipelineSchema),
Expand All @@ -20,7 +20,7 @@ const bodySchema = schema.object({
export const registerSimulateRoute = ({
router,
license,
lib: { isEsError },
lib: { handleEsError },
}: RouteDependencies): void => {
router.post(
{
Expand All @@ -30,29 +30,22 @@ export const registerSimulateRoute = ({
},
},
license.guardApiRoute(async (ctx, req, res) => {
const { callAsCurrentUser } = ctx.core.elasticsearch.legacy.client;
const { client: clusterClient } = ctx.core.elasticsearch;

const { pipeline, documents, verbose } = req.body;

try {
const response = await callAsCurrentUser('ingest.simulate', {
const { body: response } = await clusterClient.asCurrentUser.ingest.simulate({
verbose,
body: {
pipeline,
docs: documents,
docs: documents as SimulatePipelineDocument[],
},
});

return res.ok({ body: response });
} catch (error) {
if (isEsError(error)) {
return res.customError({
statusCode: error.statusCode,
body: error,
});
}

throw error;
return handleEsError({ error, response: res });
}
})
);
Expand Down
Loading