From b99d8af623a398fa3175f99cf9d99a72dad5cb7e Mon Sep 17 00:00:00 2001 From: Lukas Olson Date: Sun, 13 Sep 2020 10:20:10 -0700 Subject: [PATCH 1/3] [Search] Remove long-running query pop-up (#75385) * [Search] Remove long-running query pop-up * Don't timeout if requestTimeout isn't configured * Remove unused kibanaUtils * Remove unused kibanaReact * Re-add reference to kibanaUtils * Remove unused translations and update documentation * Add new x-pack advanced setting searchTimeout and use it in the EnhancedSearchInterceptor * docs * Re-add toast when queries time out * Fix types * Update error message with capabilities * Update docs * Update docs * Move search server routes into a directory. * Add internal/_msearch route. * Remove legacy search API, rewrite default search strategy to use internal route. * Remove legacy es_client code. * Handle msearch options on server. * Remove elasticsearch-browser dependency. * Update generated docs. * Rely on server timeout in OSS (?) Use UI setting in xpack. * Rename function * Add features to dependencies * Undefined check * doc * Code review fixes * code review * doc * loading count * simplify code review and fix jest tets * type check * Remove esShard from client * cleanup request parameters from FE * doc * doc * Align request parameters on server, Remove leftover parameters from client Shim responses for search and msearch routes * docs Stop using toSnakeCase Updates jest tests * add management docs * docs * Remove import * Break circular dep + fix msearch test * Remove deleted type * Fix jest * Bring toSnakeCase back * docs * fix jest * Add new x-pack advanced setting searchTimeout and use it in the EnhancedSearchInterceptor * docs * Rely on server timeout in OSS (?) Use UI setting in xpack. * Rename function * doc * Remove esShard from client * cleanup request parameters from FE * doc * doc * Align request parameters on server, Remove leftover parameters from client Shim responses for search and msearch routes * docs Stop using toSnakeCase Updates jest tests * add management docs * docs * Remove import * Break circular dep + fix msearch test * Remove deleted type * Fix jest * Bring toSnakeCase back * docs * fix jest * Fix merge * Fix types * Allow timeout to be undefined * Fix jest test * Upldate docs * Fix msearch jest * Merge correction * docs * Fix rollup search merge * Fix merge * Use i18n Co-authored-by: Liza K Co-authored-by: Luke Elmers Co-authored-by: Elastic Machine --- ...blic.searchinterceptor.getpendingcount_.md | 17 ---- ...n-plugins-data-public.searchinterceptor.md | 2 +- ...blic.searchinterceptor.showtimeouterror.md | 11 +++ src/plugins/data/public/public.api.md | 10 +-- .../collectors/create_usage_collector.test.ts | 27 ------ .../collectors/create_usage_collector.ts | 24 ----- .../data/public/search/collectors/types.ts | 6 -- .../public/search/long_query_notification.tsx | 59 ------------ .../public/search/search_interceptor.test.ts | 75 +++++++--------- .../data/public/search/search_interceptor.ts | 90 +++++++------------ src/plugins/data/server/search/types.ts | 2 +- x-pack/plugins/data_enhanced/kibana.json | 7 +- .../public/search/long_query_notification.tsx | 47 ---------- .../public/search/search_interceptor.test.ts | 87 ------------------ .../public/search/search_interceptor.ts | 54 ++++++----- x-pack/plugins/data_enhanced/server/plugin.ts | 2 +- .../translations/translations/ja-JP.json | 4 - .../translations/translations/zh-CN.json | 4 - 18 files changed, 110 insertions(+), 418 deletions(-) delete mode 100644 docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.getpendingcount_.md create mode 100644 docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.showtimeouterror.md delete mode 100644 src/plugins/data/public/search/long_query_notification.tsx delete mode 100644 x-pack/plugins/data_enhanced/public/search/long_query_notification.tsx diff --git a/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.getpendingcount_.md b/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.getpendingcount_.md deleted file mode 100644 index ef36b3f37b0c7c..00000000000000 --- a/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.getpendingcount_.md +++ /dev/null @@ -1,17 +0,0 @@ - - -[Home](./index.md) > [kibana-plugin-plugins-data-public](./kibana-plugin-plugins-data-public.md) > [SearchInterceptor](./kibana-plugin-plugins-data-public.searchinterceptor.md) > [getPendingCount$](./kibana-plugin-plugins-data-public.searchinterceptor.getpendingcount_.md) - -## SearchInterceptor.getPendingCount$() method - -Returns an `Observable` over the current number of pending searches. This could mean that one of the search requests is still in flight, or that it has only received partial responses. - -Signature: - -```typescript -getPendingCount$(): Observable; -``` -Returns: - -`Observable` - diff --git a/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.md b/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.md index fd9f23a7f00527..5cee345db6cd2a 100644 --- a/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.md +++ b/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.md @@ -21,11 +21,11 @@ export declare class SearchInterceptor | Property | Modifiers | Type | Description | | --- | --- | --- | --- | | [deps](./kibana-plugin-plugins-data-public.searchinterceptor.deps.md) | | SearchInterceptorDeps | | +| [showTimeoutError](./kibana-plugin-plugins-data-public.searchinterceptor.showtimeouterror.md) | | ((e: Error) => void) & import("lodash").Cancelable | | ## Methods | Method | Modifiers | Description | | --- | --- | --- | -| [getPendingCount$()](./kibana-plugin-plugins-data-public.searchinterceptor.getpendingcount_.md) | | Returns an Observable over the current number of pending searches. This could mean that one of the search requests is still in flight, or that it has only received partial responses. | | [search(request, options)](./kibana-plugin-plugins-data-public.searchinterceptor.search.md) | | Searches using the given search method. Overrides the AbortSignal with one that will abort either when cancelPending is called, when the request times out, or when the original AbortSignal is aborted. Updates pendingCount$ when the request is started/finalized. | diff --git a/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.showtimeouterror.md b/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.showtimeouterror.md new file mode 100644 index 00000000000000..91ecb2821acbfb --- /dev/null +++ b/docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptor.showtimeouterror.md @@ -0,0 +1,11 @@ + + +[Home](./index.md) > [kibana-plugin-plugins-data-public](./kibana-plugin-plugins-data-public.md) > [SearchInterceptor](./kibana-plugin-plugins-data-public.searchinterceptor.md) > [showTimeoutError](./kibana-plugin-plugins-data-public.searchinterceptor.showtimeouterror.md) + +## SearchInterceptor.showTimeoutError property + +Signature: + +```typescript +protected showTimeoutError: ((e: Error) => void) & import("lodash").Cancelable; +``` diff --git a/src/plugins/data/public/public.api.md b/src/plugins/data/public/public.api.md index 9006c172d933ea..fa5d3cd85f4300 100644 --- a/src/plugins/data/public/public.api.md +++ b/src/plugins/data/public/public.api.md @@ -65,7 +65,6 @@ import { Search } from '@elastic/elasticsearch/api/requestParams'; import { SearchResponse } from 'elasticsearch'; import { SerializedFieldFormat as SerializedFieldFormat_2 } from 'src/plugins/expressions/common'; import { Subscription } from 'rxjs'; -import { Toast } from 'kibana/public'; import { ToastInputFields } from 'src/core/public/notifications'; import { ToastsSetup } from 'kibana/public'; import { TransportRequestOptions } from '@elastic/elasticsearch/lib/Transport'; @@ -1939,11 +1938,6 @@ export class SearchInterceptor { protected application: CoreStart['application']; // (undocumented) protected readonly deps: SearchInterceptorDeps; - getPendingCount$(): Observable; - // @internal (undocumented) - protected hideToast: () => void; - // @internal - protected longRunningToast?: Toast; // @internal protected pendingCount$: BehaviorSubject; // @internal (undocumented) @@ -1957,8 +1951,8 @@ export class SearchInterceptor { combinedSignal: AbortSignal; cleanup: () => void; }; - // @internal (undocumented) - protected showToast: () => void; + // (undocumented) + protected showTimeoutError: ((e: Error) => void) & import("lodash").Cancelable; // @internal protected timeoutSubscriptions: Subscription; } diff --git a/src/plugins/data/public/search/collectors/create_usage_collector.test.ts b/src/plugins/data/public/search/collectors/create_usage_collector.test.ts index 315d4678cabf1b..9cadb1e796ad68 100644 --- a/src/plugins/data/public/search/collectors/create_usage_collector.test.ts +++ b/src/plugins/data/public/search/collectors/create_usage_collector.test.ts @@ -63,31 +63,4 @@ describe('Search Usage Collector', () => { SEARCH_EVENT_TYPE.QUERIES_CANCELLED ); }); - - test('tracks long popups', async () => { - await usageCollector.trackLongQueryPopupShown(); - expect(mockUsageCollectionSetup.reportUiStats).toHaveBeenCalled(); - expect(mockUsageCollectionSetup.reportUiStats.mock.calls[0][1]).toBe(METRIC_TYPE.LOADED); - expect(mockUsageCollectionSetup.reportUiStats.mock.calls[0][2]).toBe( - SEARCH_EVENT_TYPE.LONG_QUERY_POPUP_SHOWN - ); - }); - - test('tracks long popups dismissed', async () => { - await usageCollector.trackLongQueryDialogDismissed(); - expect(mockUsageCollectionSetup.reportUiStats).toHaveBeenCalled(); - expect(mockUsageCollectionSetup.reportUiStats.mock.calls[0][1]).toBe(METRIC_TYPE.CLICK); - expect(mockUsageCollectionSetup.reportUiStats.mock.calls[0][2]).toBe( - SEARCH_EVENT_TYPE.LONG_QUERY_DIALOG_DISMISSED - ); - }); - - test('tracks run query beyond timeout', async () => { - await usageCollector.trackLongQueryRunBeyondTimeout(); - expect(mockUsageCollectionSetup.reportUiStats).toHaveBeenCalled(); - expect(mockUsageCollectionSetup.reportUiStats.mock.calls[0][1]).toBe(METRIC_TYPE.CLICK); - expect(mockUsageCollectionSetup.reportUiStats.mock.calls[0][2]).toBe( - SEARCH_EVENT_TYPE.LONG_QUERY_RUN_BEYOND_TIMEOUT - ); - }); }); diff --git a/src/plugins/data/public/search/collectors/create_usage_collector.ts b/src/plugins/data/public/search/collectors/create_usage_collector.ts index 321b2c5b990492..187ed90652bb26 100644 --- a/src/plugins/data/public/search/collectors/create_usage_collector.ts +++ b/src/plugins/data/public/search/collectors/create_usage_collector.ts @@ -48,29 +48,5 @@ export const createUsageCollector = ( SEARCH_EVENT_TYPE.QUERIES_CANCELLED ); }, - trackLongQueryPopupShown: async () => { - const currentApp = await getCurrentApp(); - return usageCollection?.reportUiStats( - currentApp!, - METRIC_TYPE.LOADED, - SEARCH_EVENT_TYPE.LONG_QUERY_POPUP_SHOWN - ); - }, - trackLongQueryDialogDismissed: async () => { - const currentApp = await getCurrentApp(); - return usageCollection?.reportUiStats( - currentApp!, - METRIC_TYPE.CLICK, - SEARCH_EVENT_TYPE.LONG_QUERY_DIALOG_DISMISSED - ); - }, - trackLongQueryRunBeyondTimeout: async () => { - const currentApp = await getCurrentApp(); - return usageCollection?.reportUiStats( - currentApp!, - METRIC_TYPE.CLICK, - SEARCH_EVENT_TYPE.LONG_QUERY_RUN_BEYOND_TIMEOUT - ); - }, }; }; diff --git a/src/plugins/data/public/search/collectors/types.ts b/src/plugins/data/public/search/collectors/types.ts index 3e98f901eb0c39..bb7fa1e6ae4a27 100644 --- a/src/plugins/data/public/search/collectors/types.ts +++ b/src/plugins/data/public/search/collectors/types.ts @@ -20,15 +20,9 @@ export enum SEARCH_EVENT_TYPE { QUERY_TIMED_OUT = 'queryTimedOut', QUERIES_CANCELLED = 'queriesCancelled', - LONG_QUERY_POPUP_SHOWN = 'longQueryPopupShown', - LONG_QUERY_DIALOG_DISMISSED = 'longQueryDialogDismissed', - LONG_QUERY_RUN_BEYOND_TIMEOUT = 'longQueryRunBeyondTimeout', } export interface SearchUsageCollector { trackQueryTimedOut: () => Promise; trackQueriesCancelled: () => Promise; - trackLongQueryPopupShown: () => Promise; - trackLongQueryDialogDismissed: () => Promise; - trackLongQueryRunBeyondTimeout: () => Promise; } diff --git a/src/plugins/data/public/search/long_query_notification.tsx b/src/plugins/data/public/search/long_query_notification.tsx deleted file mode 100644 index 1db298618fae81..00000000000000 --- a/src/plugins/data/public/search/long_query_notification.tsx +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import { EuiButton, EuiFlexGroup, EuiFlexItem, EuiSpacer } from '@elastic/eui'; -import { FormattedMessage } from '@kbn/i18n/react'; -import React from 'react'; -import { ApplicationStart } from 'kibana/public'; -import { toMountPoint } from '../../../kibana_react/public'; - -interface Props { - application: ApplicationStart; -} - -export function getLongQueryNotification(props: Props) { - return toMountPoint(); -} - -export function LongQueryNotification(props: Props) { - return ( -
- - - - - { - await props.application.navigateToApp('management/stack/license_management'); - }} - > - - - - -
- ); -} diff --git a/src/plugins/data/public/search/search_interceptor.test.ts b/src/plugins/data/public/search/search_interceptor.test.ts index 84db69a83a005d..7bfa6f0ab1bc57 100644 --- a/src/plugins/data/public/search/search_interceptor.test.ts +++ b/src/plugins/data/public/search/search_interceptor.test.ts @@ -95,6 +95,39 @@ describe('SearchInterceptor', () => { await flushPromises(); }); + test('Should not timeout if requestTimeout is undefined', async () => { + searchInterceptor = new SearchInterceptor({ + startServices: mockCoreSetup.getStartServices(), + uiSettings: mockCoreSetup.uiSettings, + http: mockCoreSetup.http, + toasts: mockCoreSetup.notifications.toasts, + }); + mockCoreSetup.http.fetch.mockImplementationOnce((options: any) => { + return new Promise((resolve, reject) => { + options.signal.addEventListener('abort', () => { + reject(new AbortError()); + }); + + setTimeout(resolve, 5000); + }); + }); + const mockRequest: IEsSearchRequest = { + params: {}, + }; + const response = searchInterceptor.search(mockRequest); + + expect.assertions(1); + const next = jest.fn(); + const complete = () => { + expect(next).toBeCalled(); + }; + response.subscribe({ next, complete }); + + jest.advanceTimersByTime(5000); + + await flushPromises(); + }); + test('Observable should fail if user aborts (test merged signal)', async () => { const abortController = new AbortController(); mockCoreSetup.http.fetch.mockImplementationOnce((options: any) => { @@ -125,7 +158,7 @@ describe('SearchInterceptor', () => { await flushPromises(); }); - test('Immediatelly aborts if passed an aborted abort signal', async (done) => { + test('Immediately aborts if passed an aborted abort signal', async (done) => { const abort = new AbortController(); const mockRequest: IEsSearchRequest = { params: {}, @@ -141,44 +174,4 @@ describe('SearchInterceptor', () => { response.subscribe({ error }); }); }); - - describe('getPendingCount$', () => { - test('should observe the number of pending requests', () => { - const pendingCount$ = searchInterceptor.getPendingCount$(); - const pendingNext = jest.fn(); - pendingCount$.subscribe(pendingNext); - - const mockResponse: any = { result: 200 }; - mockCoreSetup.http.fetch.mockResolvedValue(mockResponse); - const mockRequest: IEsSearchRequest = { - params: {}, - }; - const response = searchInterceptor.search(mockRequest); - - response.subscribe({ - complete: () => { - expect(pendingNext.mock.calls).toEqual([[0], [1], [0]]); - }, - }); - }); - - test('should observe the number of pending requests on error', () => { - const pendingCount$ = searchInterceptor.getPendingCount$(); - const pendingNext = jest.fn(); - pendingCount$.subscribe(pendingNext); - - const mockResponse: any = { result: 500 }; - mockCoreSetup.http.fetch.mockRejectedValue(mockResponse); - const mockRequest: IEsSearchRequest = { - params: {}, - }; - const response = searchInterceptor.search(mockRequest); - - response.subscribe({ - complete: () => { - expect(pendingNext.mock.calls).toEqual([[0], [1], [0]]); - }, - }); - }); - }); }); diff --git a/src/plugins/data/public/search/search_interceptor.ts b/src/plugins/data/public/search/search_interceptor.ts index 0a6d60afed2f73..888e12a4285b1c 100644 --- a/src/plugins/data/public/search/search_interceptor.ts +++ b/src/plugins/data/public/search/search_interceptor.ts @@ -17,7 +17,7 @@ * under the License. */ -import { trimEnd } from 'lodash'; +import { trimEnd, debounce } from 'lodash'; import { BehaviorSubject, throwError, @@ -28,25 +28,24 @@ import { Observable, NEVER, } from 'rxjs'; -import { finalize, filter } from 'rxjs/operators'; -import { Toast, CoreStart, ToastsSetup, CoreSetup } from 'kibana/public'; -import { getCombinedSignal, AbortError } from '../../common/utils'; +import { catchError, finalize } from 'rxjs/operators'; +import { CoreStart, CoreSetup, ToastsSetup } from 'kibana/public'; +import { i18n } from '@kbn/i18n'; import { + getCombinedSignal, + AbortError, IEsSearchRequest, IEsSearchResponse, ISearchOptions, ES_SEARCH_STRATEGY, -} from '../../common/search'; -import { getLongQueryNotification } from './long_query_notification'; +} from '../../common'; import { SearchUsageCollector } from './collectors'; -const LONG_QUERY_NOTIFICATION_DELAY = 10000; - export interface SearchInterceptorDeps { - toasts: ToastsSetup; http: CoreSetup['http']; uiSettings: CoreSetup['uiSettings']; startServices: Promise<[CoreStart, any, unknown]>; + toasts: ToastsSetup; usageCollector?: SearchUsageCollector; } @@ -69,12 +68,6 @@ export class SearchInterceptor { */ protected timeoutSubscriptions: Subscription = new Subscription(); - /** - * The current long-running toast (if there is one). - * @internal - */ - protected longRunningToast?: Toast; - /** * @internal */ @@ -89,19 +82,6 @@ export class SearchInterceptor { this.deps.startServices.then(([coreStart]) => { this.application = coreStart.application; }); - - // When search requests go out, a notification is scheduled allowing users to continue the - // request past the timeout. When all search requests complete, we remove the notification. - this.getPendingCount$() - .pipe(filter((count) => count === 0)) - .subscribe(this.hideToast); - } - /** - * Returns an `Observable` over the current number of pending searches. This could mean that one - * of the search requests is still in flight, or that it has only received partial responses. - */ - public getPendingCount$() { - return this.pendingCount$.asObservable(); } /** @@ -146,6 +126,12 @@ export class SearchInterceptor { this.pendingCount$.next(this.pendingCount$.getValue() + 1); return this.runSearch(request, combinedSignal, options?.strategy).pipe( + catchError((e: any) => { + if (e.body?.attributes?.error === 'Request timed out') { + this.showTimeoutError(e); + } + return throwError(e); + }), finalize(() => { this.pendingCount$.next(this.pendingCount$.getValue() - 1); cleanup(); @@ -170,12 +156,10 @@ export class SearchInterceptor { const timeout$ = timeout ? timer(timeout) : NEVER; const subscription = timeout$.subscribe(() => { timeoutController.abort(); + this.showTimeoutError(new AbortError()); }); this.timeoutSubscriptions.add(subscription); - // Schedule the notification to allow users to cancel or wait beyond the timeout - const notificationSubscription = timer(LONG_QUERY_NOTIFICATION_DELAY).subscribe(this.showToast); - // Get a combined `AbortSignal` that will be aborted whenever the first of the following occurs: // 1. The user manually aborts (via `cancelPending`) // 2. The request times out @@ -189,7 +173,6 @@ export class SearchInterceptor { const combinedSignal = getCombinedSignal(signals); const cleanup = () => { this.timeoutSubscriptions.remove(subscription); - notificationSubscription.unsubscribe(); }; combinedSignal.addEventListener('abort', cleanup); @@ -200,36 +183,23 @@ export class SearchInterceptor { }; } - /** - * @internal - */ - protected showToast = () => { - if (this.longRunningToast) return; - this.longRunningToast = this.deps.toasts.addInfo( - { - title: 'Your query is taking a while', - text: getLongQueryNotification({ - application: this.application, + // Right now we are debouncing but we will hook this up with background sessions to show only one + // error notification per session. + protected showTimeoutError = debounce( + (e: Error) => { + this.deps.toasts.addError(e, { + title: 'Timed out', + toastMessage: i18n.translate('data.search.upgradeLicense', { + defaultMessage: + 'One or more queries timed out. With our free Basic tier, your queries never time out.', }), - }, - { - toastLifeTimeMs: 1000000, - } - ); - }; - - /** - * @internal - */ - protected hideToast = () => { - if (this.longRunningToast) { - this.deps.toasts.remove(this.longRunningToast); - delete this.longRunningToast; - if (this.deps.usageCollector) { - this.deps.usageCollector.trackLongQueryDialogDismissed(); - } + }); + }, + 60000, + { + leading: true, } - }; + ); } export type ISearchInterceptor = PublicMethodsOf; diff --git a/src/plugins/data/server/search/types.ts b/src/plugins/data/server/search/types.ts index b2b958454de48b..aefdac2ab639fb 100644 --- a/src/plugins/data/server/search/types.ts +++ b/src/plugins/data/server/search/types.ts @@ -20,7 +20,7 @@ import { RequestHandlerContext } from '../../../../core/server'; import { ISearchOptions } from '../../common/search'; import { AggsSetup, AggsStart } from './aggs'; -import { SearchUsage } from './collectors/usage'; +import { SearchUsage } from './collectors'; import { IEsSearchRequest, IEsSearchResponse } from './es_search'; export interface SearchEnhancements { diff --git a/x-pack/plugins/data_enhanced/kibana.json b/x-pack/plugins/data_enhanced/kibana.json index 637af39339e277..5ded0f8f0dec3a 100644 --- a/x-pack/plugins/data_enhanced/kibana.json +++ b/x-pack/plugins/data_enhanced/kibana.json @@ -6,10 +6,11 @@ "xpack", "data_enhanced" ], "requiredPlugins": [ - "data" + "data", + "features" ], - "optionalPlugins": ["kibanaReact", "kibanaUtils", "usageCollection"], + "optionalPlugins": ["kibanaUtils", "usageCollection"], "server": true, "ui": true, - "requiredBundles": ["kibanaReact", "kibanaUtils"] + "requiredBundles": ["kibanaUtils"] } diff --git a/x-pack/plugins/data_enhanced/public/search/long_query_notification.tsx b/x-pack/plugins/data_enhanced/public/search/long_query_notification.tsx deleted file mode 100644 index 325cf1145fa5f1..00000000000000 --- a/x-pack/plugins/data_enhanced/public/search/long_query_notification.tsx +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { EuiButton, EuiButtonEmpty, EuiFlexGroup, EuiFlexItem, EuiSpacer } from '@elastic/eui'; -import { FormattedMessage } from '@kbn/i18n/react'; -import React from 'react'; -import { toMountPoint } from '../../../../../src/plugins/kibana_react/public'; - -interface Props { - cancel: () => void; - runBeyondTimeout: () => void; -} - -export function getLongQueryNotification(props: Props) { - return toMountPoint( - - ); -} - -export function LongQueryNotification(props: Props) { - return ( -
- - - - - - - - - - - - - -
- ); -} diff --git a/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts b/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts index 261e03887acdba..af2fc85602541a 100644 --- a/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts +++ b/x-pack/plugins/data_enhanced/public/search/search_interceptor.test.ts @@ -60,9 +60,6 @@ describe('EnhancedSearchInterceptor', () => { mockUsageCollector = { trackQueryTimedOut: jest.fn(), trackQueriesCancelled: jest.fn(), - trackLongQueryPopupShown: jest.fn(), - trackLongQueryDialogDismissed: jest.fn(), - trackLongQueryRunBeyondTimeout: jest.fn(), }; const mockPromise = new Promise((resolve) => { @@ -390,88 +387,4 @@ describe('EnhancedSearchInterceptor', () => { expect(mockUsageCollector.trackQueriesCancelled).toBeCalledTimes(1); }); }); - - describe('runBeyondTimeout', () => { - const timedResponses = [ - { - time: 250, - value: { - isPartial: true, - isRunning: true, - id: 1, - rawResponse: { - took: 1, - }, - }, - }, - { - time: 2000, - value: { - isPartial: false, - isRunning: false, - id: 1, - rawResponse: { - took: 1, - }, - }, - }, - ]; - - test('times out if runBeyondTimeout is not called', async () => { - mockFetchImplementation(timedResponses); - - const response = searchInterceptor.search({}); - response.subscribe({ next, error }); - - await timeTravel(250); - - expect(next).toHaveBeenCalled(); - expect(next.mock.calls[0][0]).toStrictEqual(timedResponses[0].value); - - await timeTravel(750); - - expect(error).toHaveBeenCalled(); - expect(error.mock.calls[0][0]).toBeInstanceOf(AbortError); - }); - - test('times out if runBeyondTimeout is called too late', async () => { - mockFetchImplementation(timedResponses); - - const response = searchInterceptor.search({}); - response.subscribe({ next, error }); - setTimeout(() => searchInterceptor.runBeyondTimeout(), 1100); - - await timeTravel(250); - - expect(next).toHaveBeenCalled(); - expect(next.mock.calls[0][0]).toStrictEqual(timedResponses[0].value); - - await timeTravel(750); - - expect(error).toHaveBeenCalled(); - expect(error.mock.calls[0][0]).toBeInstanceOf(AbortError); - }); - - test('should prevent the request from timing out', async () => { - mockFetchImplementation(timedResponses); - - const response = searchInterceptor.search({}, { pollInterval: 0 }); - response.subscribe({ next, error, complete }); - setTimeout(() => searchInterceptor.runBeyondTimeout(), 500); - - await timeTravel(250); - - expect(next).toHaveBeenCalled(); - expect(next.mock.calls[0][0]).toStrictEqual(timedResponses[0].value); - - await timeTravel(250); // Run beyond timeout - await timeTravel(1750); // Final response - - expect(next).toHaveBeenCalledTimes(2); - expect(next.mock.calls[0][0]).toStrictEqual(timedResponses[0].value); - expect(next.mock.calls[1][0]).toStrictEqual(timedResponses[1].value); - expect(error).not.toHaveBeenCalled(); - expect(mockUsageCollector.trackLongQueryRunBeyondTimeout).toBeCalledTimes(1); - }); - }); }); diff --git a/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts b/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts index 61cf579d3136b6..f7ae9fc6d0f917 100644 --- a/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts +++ b/x-pack/plugins/data_enhanced/public/search/search_interceptor.ts @@ -6,7 +6,8 @@ import { throwError, EMPTY, timer, from, Subscription } from 'rxjs'; import { mergeMap, expand, takeUntil, finalize, tap } from 'rxjs/operators'; -import { getLongQueryNotification } from './long_query_notification'; +import { debounce } from 'lodash'; +import { i18n } from '@kbn/i18n'; import { SearchInterceptor, SearchInterceptorDeps, @@ -42,38 +43,11 @@ export class EnhancedSearchInterceptor extends SearchInterceptor { * Abort our `AbortController`, which in turn aborts any intercepted searches. */ public cancelPending = () => { - this.hideToast(); this.abortController.abort(); this.abortController = new AbortController(); if (this.deps.usageCollector) this.deps.usageCollector.trackQueriesCancelled(); }; - /** - * Un-schedule timing out all of the searches intercepted. - */ - public runBeyondTimeout = () => { - this.hideToast(); - this.timeoutSubscriptions.unsubscribe(); - if (this.deps.usageCollector) this.deps.usageCollector.trackLongQueryRunBeyondTimeout(); - }; - - protected showToast = () => { - if (this.longRunningToast) return; - this.longRunningToast = this.deps.toasts.addInfo( - { - title: 'Your query is taking a while', - text: getLongQueryNotification({ - cancel: this.cancelPending, - runBeyondTimeout: this.runBeyondTimeout, - }), - }, - { - toastLifeTimeMs: 1000000, - } - ); - if (this.deps.usageCollector) this.deps.usageCollector.trackLongQueryPopupShown(); - }; - public search( request: IAsyncSearchRequest, { pollInterval = 1000, ...options }: IAsyncSearchOptions = {} @@ -127,4 +101,28 @@ export class EnhancedSearchInterceptor extends SearchInterceptor { }) ); } + + // Right now we are debouncing but we will hook this up with background sessions to show only one + // error notification per session. + protected showTimeoutError = debounce( + (e: Error) => { + const message = this.application.capabilities.advancedSettings?.save + ? i18n.translate('xpack.data.search.timeoutIncreaseSetting', { + defaultMessage: + 'One or more queries timed out. Increase run time with the search.timeout advanced setting.', + }) + : i18n.translate('xpack.data.search.timeoutContactAdmin', { + defaultMessage: + 'One or more queries timed out. Contact your system administrator to increase the run time.', + }); + this.deps.toasts.addError(e, { + title: 'Timed out', + toastMessage: message, + }); + }, + 60000, + { + leading: true, + } + ); } diff --git a/x-pack/plugins/data_enhanced/server/plugin.ts b/x-pack/plugins/data_enhanced/server/plugin.ts index 3b05e83d208b7b..a1dff00ddfdd39 100644 --- a/x-pack/plugins/data_enhanced/server/plugin.ts +++ b/x-pack/plugins/data_enhanced/server/plugin.ts @@ -18,8 +18,8 @@ import { } from '../../../../src/plugins/data/server'; import { enhancedEsSearchStrategyProvider } from './search'; import { UsageCollectionSetup } from '../../../../src/plugins/usage_collection/server'; -import { ENHANCED_ES_SEARCH_STRATEGY } from '../common'; import { getUiSettings } from './ui_settings'; +import { ENHANCED_ES_SEARCH_STRATEGY } from '../common'; interface SetupDependencies { data: DataPluginSetup; diff --git a/x-pack/plugins/translations/translations/ja-JP.json b/x-pack/plugins/translations/translations/ja-JP.json index e1cafa34519f5e..603723111c0511 100644 --- a/x-pack/plugins/translations/translations/ja-JP.json +++ b/x-pack/plugins/translations/translations/ja-JP.json @@ -816,8 +816,6 @@ "data.query.queryBar.KQLNestedQuerySyntaxInfoTitle": "KQL ネストされたクエリ構文", "data.query.queryBar.kqlOffLabel": "オフ", "data.query.queryBar.kqlOnLabel": "オン", - "data.query.queryBar.licenseOptions": "ライセンスオプションに進む", - "data.query.queryBar.longQueryMessage": "ライセンスをアップグレードすれば、リクエストの完了までに十分な時間を確保できます。", "data.query.queryBar.luceneLanguageName": "Lucene", "data.query.queryBar.luceneSyntaxWarningMessage": "Lucene クエリ構文を使用しているようですが、Kibana クエリ言語 (KQL) が選択されています。KQL ドキュメント {link} を確認してください。", "data.query.queryBar.luceneSyntaxWarningOptOutText": "今後表示しない", @@ -6676,8 +6674,6 @@ "xpack.data.kueryAutocomplete.lessThanOrEqualOperatorDescription.lessThanOrEqualToText": "より小さいまたは等しい", "xpack.data.kueryAutocomplete.orOperatorDescription": "{oneOrMoreArguments} が true であることを条件とする", "xpack.data.kueryAutocomplete.orOperatorDescription.oneOrMoreArgumentsText": "1つ以上の引数", - "xpack.data.query.queryBar.cancelLongQuery": "キャンセル", - "xpack.data.query.queryBar.runBeyond": "タイムアウトを越えて実行", "xpack.discover.FlyoutCreateDrilldownAction.displayName": "基本データを調査", "xpack.embeddableEnhanced.actions.panelNotifications.manyDrilldowns": "パネルには{count}個のドリルダウンがあります", "xpack.embeddableEnhanced.actions.panelNotifications.oneDrilldown": "パネルには1個のドリルダウンがあります", diff --git a/x-pack/plugins/translations/translations/zh-CN.json b/x-pack/plugins/translations/translations/zh-CN.json index b0de74ab2150a2..d7d3e63ffd8bc8 100644 --- a/x-pack/plugins/translations/translations/zh-CN.json +++ b/x-pack/plugins/translations/translations/zh-CN.json @@ -817,8 +817,6 @@ "data.query.queryBar.KQLNestedQuerySyntaxInfoTitle": "KQL 嵌套查询语法", "data.query.queryBar.kqlOffLabel": "关闭", "data.query.queryBar.kqlOnLabel": "开启", - "data.query.queryBar.licenseOptions": "前往许可证选项", - "data.query.queryBar.longQueryMessage": "使用升级的许可证,您可以确保有足够的时间来完成请求。", "data.query.queryBar.luceneLanguageName": "Lucene", "data.query.queryBar.luceneSyntaxWarningMessage": "尽管您选择了 Kibana 查询语言 (KQL),但似乎您正在尝试使用 Lucene 查询语法。请查看 KQL 文档 {link}。", "data.query.queryBar.luceneSyntaxWarningOptOutText": "不再显示", @@ -6679,8 +6677,6 @@ "xpack.data.kueryAutocomplete.lessThanOrEqualOperatorDescription.lessThanOrEqualToText": "小于或等于", "xpack.data.kueryAutocomplete.orOperatorDescription": "需要{oneOrMoreArguments}为 true", "xpack.data.kueryAutocomplete.orOperatorDescription.oneOrMoreArgumentsText": "一个或多个参数", - "xpack.data.query.queryBar.cancelLongQuery": "取消", - "xpack.data.query.queryBar.runBeyond": "运行超时", "xpack.discover.FlyoutCreateDrilldownAction.displayName": "浏览底层数据", "xpack.embeddableEnhanced.actions.panelNotifications.manyDrilldowns": "面板有 {count} 个向下钻取", "xpack.embeddableEnhanced.actions.panelNotifications.oneDrilldown": "面板有 1 个向下钻取", From 49a00352b5122e6c35e7953a47c2663034dc1f67 Mon Sep 17 00:00:00 2001 From: Lukas Olson Date: Sun, 13 Sep 2020 14:08:39 -0700 Subject: [PATCH 2/3] [Search] Re-add support for aborting when a connection is closed (#76470) * [Search] Re-add support for aborting when a connection is closed * Fix types Co-authored-by: Elastic Machine --- .../search/es_search/es_search_strategy.ts | 9 ++++--- .../server/search/es_search_strategy.ts | 26 +++++++++++++------ 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/src/plugins/data/server/search/es_search/es_search_strategy.ts b/src/plugins/data/server/search/es_search/es_search_strategy.ts index 106f974ed34577..e2ed500689cfa5 100644 --- a/src/plugins/data/server/search/es_search/es_search_strategy.ts +++ b/src/plugins/data/server/search/es_search/es_search_strategy.ts @@ -52,10 +52,11 @@ export const esSearchStrategyProvider = ( }); try { - const esResponse = (await context.core.elasticsearch.client.asCurrentUser.search( - params - )) as ApiResponse>; - const rawResponse = esResponse.body; + // Temporary workaround until https://github.com/elastic/elasticsearch-js/issues/1297 + const promise = context.core.elasticsearch.client.asCurrentUser.search(params); + if (options?.abortSignal) + options.abortSignal.addEventListener('abort', () => promise.abort()); + const { body: rawResponse } = (await promise) as ApiResponse>; if (usage) usage.trackSuccess(rawResponse.took); diff --git a/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts b/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts index eda6178dc8e5b5..72ea1f096e8fb1 100644 --- a/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts +++ b/x-pack/plugins/data_enhanced/server/search/es_search_strategy.ts @@ -7,6 +7,7 @@ import { first } from 'rxjs/operators'; import { SearchResponse } from 'elasticsearch'; import { Observable } from 'rxjs'; +import { TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport'; import { SharedGlobalConfig, RequestHandlerContext, Logger } from '../../../../../src/core/server'; import { getTotalLoaded, @@ -40,8 +41,8 @@ export const enhancedEsSearchStrategyProvider = ( try { const response = isAsync - ? await asyncSearch(context, request) - : await rollupSearch(context, request); + ? await asyncSearch(context, request, options) + : await rollupSearch(context, request, options); if ( usage && @@ -69,9 +70,10 @@ export const enhancedEsSearchStrategyProvider = ( async function asyncSearch( context: RequestHandlerContext, - request: IEnhancedEsSearchRequest + request: IEnhancedEsSearchRequest, + options?: ISearchOptions ): Promise { - let esResponse; + let promise: TransportRequestPromise; const esClient = context.core.elasticsearch.client.asCurrentUser; const uiSettingsClient = await context.core.uiSettings.client; @@ -89,14 +91,17 @@ export const enhancedEsSearchStrategyProvider = ( ...request.params, }); - esResponse = await esClient.asyncSearch.submit(submitOptions); + promise = esClient.asyncSearch.submit(submitOptions); } else { - esResponse = await esClient.asyncSearch.get({ + promise = esClient.asyncSearch.get({ id: request.id, ...toSnakeCase(asyncOptions), }); } + // Temporary workaround until https://github.com/elastic/elasticsearch-js/issues/1297 + if (options?.abortSignal) options.abortSignal.addEventListener('abort', () => promise.abort()); + const esResponse = await promise; const { id, response, is_partial: isPartial, is_running: isRunning } = esResponse.body; return { id, @@ -109,7 +114,8 @@ export const enhancedEsSearchStrategyProvider = ( const rollupSearch = async function ( context: RequestHandlerContext, - request: IEnhancedEsSearchRequest + request: IEnhancedEsSearchRequest, + options?: ISearchOptions ): Promise { const esClient = context.core.elasticsearch.client.asCurrentUser; const uiSettingsClient = await context.core.uiSettings.client; @@ -123,13 +129,17 @@ export const enhancedEsSearchStrategyProvider = ( ...params, }); - const esResponse = await esClient.transport.request({ + const promise = esClient.transport.request({ method, path, body, querystring, }); + // Temporary workaround until https://github.com/elastic/elasticsearch-js/issues/1297 + if (options?.abortSignal) options.abortSignal.addEventListener('abort', () => promise.abort()); + const esResponse = await promise; + const response = esResponse.body as SearchResponse; return { rawResponse: response, From 61951a577041317989140e22992b6c75410d34d9 Mon Sep 17 00:00:00 2001 From: Nicolas Chaulet Date: Sun, 13 Sep 2020 21:09:33 -0400 Subject: [PATCH 3/3] [Ingest Manager] Shared Fleet agent policy action (#76013) --- .../common/types/models/agent.ts | 31 ++- .../common/types/rest_spec/agent.ts | 2 +- .../server/routes/agent/actions_handlers.ts | 3 +- .../server/saved_objects/index.ts | 3 + .../server/services/agent_policy.ts | 32 ++- .../server/services/agent_policy_update.ts | 11 +- .../server/services/agents/acks.test.ts | 257 ++---------------- .../server/services/agents/acks.ts | 112 +++++--- .../server/services/agents/actions.test.ts | 8 +- .../server/services/agents/actions.ts | 118 +++++++- .../agents/checkin/state_new_actions.ts | 103 ++++--- .../server/services/agents/saved_objects.ts | 53 +++- .../ingest_manager/server/services/setup.ts | 6 + .../ingest_manager/server/types/index.tsx | 3 + .../server/types/models/agent.ts | 7 +- .../apis/fleet/agents/actions.ts | 3 +- 16 files changed, 397 insertions(+), 355 deletions(-) diff --git a/x-pack/plugins/ingest_manager/common/types/models/agent.ts b/x-pack/plugins/ingest_manager/common/types/models/agent.ts index 2b8a306577e7d2..a204373fe2e56b 100644 --- a/x-pack/plugins/ingest_manager/common/types/models/agent.ts +++ b/x-pack/plugins/ingest_manager/common/types/models/agent.ts @@ -21,7 +21,8 @@ export type AgentStatus = | 'unenrolling' | 'degraded'; -export type AgentActionType = 'CONFIG_CHANGE' | 'DATA_DUMP' | 'RESUME' | 'PAUSE' | 'UNENROLL'; +export type AgentActionType = 'CONFIG_CHANGE' | 'UNENROLL'; + export interface NewAgentAction { type: AgentActionType; data?: any; @@ -29,20 +30,44 @@ export interface NewAgentAction { } export interface AgentAction extends NewAgentAction { + type: AgentActionType; + data?: any; + sent_at?: string; id: string; agent_id: string; created_at: string; + ack_data?: any; +} + +export interface AgentPolicyAction extends NewAgentAction { + id: string; + type: AgentActionType; + data?: any; + policy_id: string; + policy_revision: number; + created_at: string; + ack_data?: any; } -export interface AgentActionSOAttributes { +interface CommonAgentActionSOAttributes { type: AgentActionType; sent_at?: string; timestamp?: string; created_at: string; - agent_id: string; data?: string; + ack_data?: string; } +export type AgentActionSOAttributes = CommonAgentActionSOAttributes & { + agent_id: string; +}; +export type AgentPolicyActionSOAttributes = CommonAgentActionSOAttributes & { + policy_id: string; + policy_revision: number; +}; + +export type BaseAgentActionSOAttributes = AgentActionSOAttributes | AgentPolicyActionSOAttributes; + export interface NewAgentEvent { type: 'STATE' | 'ERROR' | 'ACTION_RESULT' | 'ACTION'; subtype: // State diff --git a/x-pack/plugins/ingest_manager/common/types/rest_spec/agent.ts b/x-pack/plugins/ingest_manager/common/types/rest_spec/agent.ts index cf8d3ab1c908ac..54cdeade3764e0 100644 --- a/x-pack/plugins/ingest_manager/common/types/rest_spec/agent.ts +++ b/x-pack/plugins/ingest_manager/common/types/rest_spec/agent.ts @@ -7,11 +7,11 @@ import { Agent, AgentAction, + NewAgentAction, NewAgentEvent, AgentEvent, AgentStatus, AgentType, - NewAgentAction, } from '../models'; export interface GetAgentsRequest { diff --git a/x-pack/plugins/ingest_manager/server/routes/agent/actions_handlers.ts b/x-pack/plugins/ingest_manager/server/routes/agent/actions_handlers.ts index b81d44c40f8eb8..12a0956b791550 100644 --- a/x-pack/plugins/ingest_manager/server/routes/agent/actions_handlers.ts +++ b/x-pack/plugins/ingest_manager/server/routes/agent/actions_handlers.ts @@ -10,7 +10,6 @@ import { RequestHandler } from 'kibana/server'; import { TypeOf } from '@kbn/config-schema'; import { PostNewAgentActionRequestSchema } from '../../types/rest_spec'; import { ActionsService } from '../../services/agents'; -import { NewAgentAction } from '../../../common/types/models'; import { PostNewAgentActionResponse } from '../../../common/types/rest_spec'; export const postNewAgentActionHandlerBuilder = function ( @@ -26,7 +25,7 @@ export const postNewAgentActionHandlerBuilder = function ( const agent = await actionsService.getAgent(soClient, request.params.agentId); - const newAgentAction = request.body.action as NewAgentAction; + const newAgentAction = request.body.action; const savedAgentAction = await actionsService.createAgentAction(soClient, { created_at: new Date().toISOString(), diff --git a/x-pack/plugins/ingest_manager/server/saved_objects/index.ts b/x-pack/plugins/ingest_manager/server/saved_objects/index.ts index aff8e607622d47..e86f7b24e2c784 100644 --- a/x-pack/plugins/ingest_manager/server/saved_objects/index.ts +++ b/x-pack/plugins/ingest_manager/server/saved_objects/index.ts @@ -98,8 +98,11 @@ const savedObjectTypes: { [key: string]: SavedObjectsType } = { mappings: { properties: { agent_id: { type: 'keyword' }, + policy_id: { type: 'keyword' }, + policy_revision: { type: 'integer' }, type: { type: 'keyword' }, data: { type: 'binary' }, + ack_data: { type: 'text' }, sent_at: { type: 'date' }, created_at: { type: 'date' }, }, diff --git a/x-pack/plugins/ingest_manager/server/services/agent_policy.ts b/x-pack/plugins/ingest_manager/server/services/agent_policy.ts index a03a3b7f59fba3..938cfb43516303 100644 --- a/x-pack/plugins/ingest_manager/server/services/agent_policy.ts +++ b/x-pack/plugins/ingest_manager/server/services/agent_policy.ts @@ -21,7 +21,7 @@ import { ListWithKuery, } from '../types'; import { DeleteAgentPolicyResponse, storedPackagePoliciesToAgentInputs } from '../../common'; -import { listAgents } from './agents'; +import { createAgentPolicyAction, listAgents } from './agents'; import { packagePolicyService } from './package_policy'; import { outputService } from './output'; import { agentPolicyUpdateEventHandler } from './agent_policy_update'; @@ -67,6 +67,10 @@ class AgentPolicyService { updated_by: user ? user.username : 'system', }); + if (options.bumpRevision) { + await this.triggerAgentPolicyUpdatedEvent(soClient, 'updated', id); + } + return (await this.get(soClient, id)) as AgentPolicy; } @@ -383,6 +387,32 @@ class AgentPolicyService { }; } + public async createFleetPolicyChangeAction( + soClient: SavedObjectsClientContract, + agentPolicyId: string + ) { + const policy = await agentPolicyService.getFullAgentPolicy(soClient, agentPolicyId); + if (!policy || !policy.revision) { + return; + } + const packages = policy.inputs.reduce((acc, input) => { + const packageName = input.meta?.package?.name; + if (packageName && acc.indexOf(packageName) < 0) { + acc.push(packageName); + } + return acc; + }, []); + + await createAgentPolicyAction(soClient, { + type: 'CONFIG_CHANGE', + data: { config: policy } as any, + ack_data: { packages }, + created_at: new Date().toISOString(), + policy_id: policy.id, + policy_revision: policy.revision, + }); + } + public async getFullAgentPolicy( soClient: SavedObjectsClientContract, id: string, diff --git a/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts b/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts index 3c743dd957f62e..ff20e25e5bf0d9 100644 --- a/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts +++ b/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts @@ -8,6 +8,7 @@ import { SavedObjectsClientContract } from 'src/core/server'; import { generateEnrollmentAPIKey, deleteEnrollmentApiKeyForAgentPolicyId } from './api_keys'; import { unenrollForAgentPolicyId } from './agents'; import { outputService } from './output'; +import { agentPolicyService } from './agent_policy'; export async function agentPolicyUpdateEventHandler( soClient: SavedObjectsClientContract, @@ -15,8 +16,9 @@ export async function agentPolicyUpdateEventHandler( agentPolicyId: string ) { const adminUser = await outputService.getAdminUser(soClient); - // If no admin user fleet is not enabled just skip this hook - if (!adminUser) { + const outputId = await outputService.getDefaultOutputId(soClient); + // If no admin user and no default output fleet is not enabled just skip this hook + if (!adminUser || !outputId) { return; } @@ -24,6 +26,11 @@ export async function agentPolicyUpdateEventHandler( await generateEnrollmentAPIKey(soClient, { agentPolicyId, }); + await agentPolicyService.createFleetPolicyChangeAction(soClient, agentPolicyId); + } + + if (action === 'updated') { + await agentPolicyService.createFleetPolicyChangeAction(soClient, agentPolicyId); } if (action === 'deleted') { diff --git a/x-pack/plugins/ingest_manager/server/services/agents/acks.test.ts b/x-pack/plugins/ingest_manager/server/services/agents/acks.test.ts index 80fdc305d0ba7f..866aa587b8a56a 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/acks.test.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/acks.test.ts @@ -6,45 +6,19 @@ import Boom from 'boom'; import { SavedObjectsBulkResponse } from 'kibana/server'; import { savedObjectsClientMock } from 'src/core/server/mocks'; -import { encryptedSavedObjectsMock } from '../../../../../plugins/encrypted_saved_objects/server/mocks'; import { Agent, - AgentAction, AgentActionSOAttributes, + BaseAgentActionSOAttributes, AgentEvent, } from '../../../common/types/models'; import { AGENT_TYPE_PERMANENT, AGENT_ACTION_SAVED_OBJECT_TYPE } from '../../../common/constants'; import { acknowledgeAgentActions } from './acks'; -import { appContextService } from '../app_context'; -import { IngestManagerAppContext } from '../../plugin'; describe('test agent acks services', () => { it('should succeed on valid and matched actions', async () => { const mockSavedObjectsClient = savedObjectsClientMock.create(); - const mockStartEncryptedSOPlugin = encryptedSavedObjectsMock.createStart(); - appContextService.start(({ - encryptedSavedObjectsStart: mockStartEncryptedSOPlugin, - } as unknown) as IngestManagerAppContext); - - const [ - { value: mockStartEncryptedSOClient }, - ] = mockStartEncryptedSOPlugin.getClient.mock.results; - - mockStartEncryptedSOClient.getDecryptedAsInternalUser.mockReturnValue( - Promise.resolve({ - id: 'action1', - references: [], - type: AGENT_ACTION_SAVED_OBJECT_TYPE, - attributes: { - type: 'CONFIG_CHANGE', - agent_id: 'id', - sent_at: '2020-03-14T19:45:02.620Z', - timestamp: '2019-01-04T14:32:03.36764-05:00', - created_at: '2020-03-14T19:45:02.620Z', - }, - }) - ); mockSavedObjectsClient.bulkGet.mockReturnValue( Promise.resolve({ @@ -65,7 +39,7 @@ describe('test agent acks services', () => { } as SavedObjectsBulkResponse) ); - const agentActions = await acknowledgeAgentActions( + await acknowledgeAgentActions( mockSavedObjectsClient, ({ id: 'id', @@ -81,125 +55,32 @@ describe('test agent acks services', () => { } as AgentEvent, ] ); - expect(agentActions).toEqual([ - ({ - type: 'CONFIG_CHANGE', - id: 'action1', - agent_id: 'id', - sent_at: '2020-03-14T19:45:02.620Z', - timestamp: '2019-01-04T14:32:03.36764-05:00', - created_at: '2020-03-14T19:45:02.620Z', - } as unknown) as AgentAction, - ]); }); it('should update config field on the agent if a policy change is acknowledged', async () => { const mockSavedObjectsClient = savedObjectsClientMock.create(); - const mockStartEncryptedSOPlugin = encryptedSavedObjectsMock.createStart(); - appContextService.start(({ - encryptedSavedObjectsStart: mockStartEncryptedSOPlugin, - } as unknown) as IngestManagerAppContext); - const [ - { value: mockStartEncryptedSOClient }, - ] = mockStartEncryptedSOPlugin.getClient.mock.results; - - mockStartEncryptedSOClient.getDecryptedAsInternalUser.mockReturnValue( - Promise.resolve({ - id: 'action1', - references: [], - type: AGENT_ACTION_SAVED_OBJECT_TYPE, - attributes: { - type: 'CONFIG_CHANGE', - agent_id: 'id', - sent_at: '2020-03-14T19:45:02.620Z', - timestamp: '2019-01-04T14:32:03.36764-05:00', - created_at: '2020-03-14T19:45:02.620Z', - data: JSON.stringify({ - config: { - id: 'policy1', - revision: 4, - settings: { - monitoring: { - enabled: true, - use_output: 'default', - logs: true, - metrics: true, - }, - }, - outputs: { - default: { - type: 'elasticsearch', - hosts: ['http://localhost:9200'], - }, - }, - inputs: [ - { - id: 'f2293360-b57c-11ea-8bd3-7bd51e425399', - name: 'system-1', - type: 'logs', - use_output: 'default', - meta: { - package: { - name: 'system', - version: '0.3.0', - }, - }, - dataset: { - namespace: 'default', - }, - streams: [ - { - id: 'logs-system.syslog', - dataset: { - name: 'system.syslog', - }, - paths: ['/var/log/messages*', '/var/log/syslog*'], - exclude_files: ['.gz$'], - multiline: { - pattern: '^\\s', - match: 'after', - }, - processors: [ - { - add_locale: null, - }, - { - add_fields: { - target: '', - fields: { - 'ecs.version': '1.5.0', - }, - }, - }, - ], - }, - ], - }, - ], - }, - }), - }, - }) - ); + const actionAttributes = { + type: 'CONFIG_CHANGE', + policy_id: 'policy1', + policy_revision: 4, + sent_at: '2020-03-14T19:45:02.620Z', + timestamp: '2019-01-04T14:32:03.36764-05:00', + created_at: '2020-03-14T19:45:02.620Z', + ack_data: JSON.stringify({ packages: ['system'] }), + }; mockSavedObjectsClient.bulkGet.mockReturnValue( Promise.resolve({ saved_objects: [ { - id: 'action1', + id: 'action2', references: [], type: AGENT_ACTION_SAVED_OBJECT_TYPE, - attributes: { - type: 'CONFIG_CHANGE', - agent_id: 'id', - sent_at: '2020-03-14T19:45:02.620Z', - timestamp: '2019-01-04T14:32:03.36764-05:00', - created_at: '2020-03-14T19:45:02.620Z', - }, + attributes: actionAttributes, }, ], - } as SavedObjectsBulkResponse) + } as SavedObjectsBulkResponse) ); await acknowledgeAgentActions( @@ -214,13 +95,13 @@ describe('test agent acks services', () => { type: 'ACTION_RESULT', subtype: 'CONFIG', timestamp: '2019-01-04T14:32:03.36764-05:00', - action_id: 'action1', + action_id: 'action2', agent_id: 'id', } as AgentEvent, ] ); expect(mockSavedObjectsClient.bulkUpdate).toBeCalled(); - expect(mockSavedObjectsClient.bulkUpdate.mock.calls[0][0]).toHaveLength(2); + expect(mockSavedObjectsClient.bulkUpdate.mock.calls[0][0]).toHaveLength(1); expect(mockSavedObjectsClient.bulkUpdate.mock.calls[0][0][0]).toMatchInlineSnapshot(` Object { "attributes": Object { @@ -237,111 +118,25 @@ describe('test agent acks services', () => { it('should not update config field on the agent if a policy change for an old revision is acknowledged', async () => { const mockSavedObjectsClient = savedObjectsClientMock.create(); - const mockStartEncryptedSOPlugin = encryptedSavedObjectsMock.createStart(); - appContextService.start(({ - encryptedSavedObjectsStart: mockStartEncryptedSOPlugin, - } as unknown) as IngestManagerAppContext); - - const [ - { value: mockStartEncryptedSOClient }, - ] = mockStartEncryptedSOPlugin.getClient.mock.results; - - mockStartEncryptedSOClient.getDecryptedAsInternalUser.mockReturnValue( - Promise.resolve({ - id: 'action1', - references: [], - type: AGENT_ACTION_SAVED_OBJECT_TYPE, - attributes: { - type: 'CONFIG_CHANGE', - agent_id: 'id', - sent_at: '2020-03-14T19:45:02.620Z', - timestamp: '2019-01-04T14:32:03.36764-05:00', - created_at: '2020-03-14T19:45:02.620Z', - data: JSON.stringify({ - config: { - id: 'policy1', - revision: 4, - settings: { - monitoring: { - enabled: true, - use_output: 'default', - logs: true, - metrics: true, - }, - }, - outputs: { - default: { - type: 'elasticsearch', - hosts: ['http://localhost:9200'], - }, - }, - inputs: [ - { - id: 'f2293360-b57c-11ea-8bd3-7bd51e425399', - name: 'system-1', - type: 'logs', - use_output: 'default', - meta: { - package: { - name: 'system', - version: '0.3.0', - }, - }, - dataset: { - namespace: 'default', - }, - streams: [ - { - id: 'logs-system.syslog', - dataset: { - name: 'system.syslog', - }, - paths: ['/var/log/messages*', '/var/log/syslog*'], - exclude_files: ['.gz$'], - multiline: { - pattern: '^\\s', - match: 'after', - }, - processors: [ - { - add_locale: null, - }, - { - add_fields: { - target: '', - fields: { - 'ecs.version': '1.5.0', - }, - }, - }, - ], - }, - ], - }, - ], - }, - }), - }, - }) - ); mockSavedObjectsClient.bulkGet.mockReturnValue( Promise.resolve({ saved_objects: [ { - id: 'action1', + id: 'action3', references: [], type: AGENT_ACTION_SAVED_OBJECT_TYPE, attributes: { type: 'CONFIG_CHANGE', - agent_id: 'id', sent_at: '2020-03-14T19:45:02.620Z', timestamp: '2019-01-04T14:32:03.36764-05:00', created_at: '2020-03-14T19:45:02.620Z', + policy_id: 'policy1', + policy_revision: 99, }, }, ], - } as SavedObjectsBulkResponse) + } as SavedObjectsBulkResponse) ); await acknowledgeAgentActions( @@ -357,13 +152,13 @@ describe('test agent acks services', () => { type: 'ACTION_RESULT', subtype: 'CONFIG', timestamp: '2019-01-04T14:32:03.36764-05:00', - action_id: 'action1', + action_id: 'action3', agent_id: 'id', } as AgentEvent, ] ); expect(mockSavedObjectsClient.bulkUpdate).toBeCalled(); - expect(mockSavedObjectsClient.bulkUpdate.mock.calls[0][0]).toHaveLength(1); + expect(mockSavedObjectsClient.bulkUpdate.mock.calls[0][0]).toHaveLength(0); }); it('should fail for actions that cannot be found on agent actions list', async () => { @@ -372,7 +167,7 @@ describe('test agent acks services', () => { Promise.resolve({ saved_objects: [ { - id: 'action1', + id: 'action4', error: { message: 'Not found', statusCode: 404, @@ -394,7 +189,7 @@ describe('test agent acks services', () => { type: 'ACTION_RESULT', subtype: 'CONFIG', timestamp: '2019-01-04T14:32:03.36764-05:00', - action_id: 'action2', + action_id: 'action4', agent_id: 'id', } as unknown) as AgentEvent, ] @@ -412,7 +207,7 @@ describe('test agent acks services', () => { Promise.resolve({ saved_objects: [ { - id: 'action1', + id: 'action5', references: [], type: AGENT_ACTION_SAVED_OBJECT_TYPE, attributes: { @@ -439,7 +234,7 @@ describe('test agent acks services', () => { type: 'ACTION', subtype: 'FAILED', timestamp: '2019-01-04T14:32:03.36764-05:00', - action_id: 'action1', + action_id: 'action5', agent_id: 'id', } as unknown) as AgentEvent, ] diff --git a/x-pack/plugins/ingest_manager/server/services/agents/acks.ts b/x-pack/plugins/ingest_manager/server/services/agents/acks.ts index 87572ce405ee70..d29dfcec7ef307 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/acks.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/acks.ts @@ -11,14 +11,15 @@ import { SavedObjectsClientContract, } from 'src/core/server'; import Boom from 'boom'; +import LRU from 'lru-cache'; import { Agent, AgentAction, + AgentPolicyAction, AgentEvent, AgentEventSOAttributes, AgentSOAttributes, AgentActionSOAttributes, - FullAgentPolicy, } from '../../types'; import { AGENT_EVENT_SAVED_OBJECT_TYPE, @@ -30,11 +31,20 @@ import { forceUnenrollAgent } from './unenroll'; const ALLOWED_ACKNOWLEDGEMENT_TYPE: string[] = ['ACTION_RESULT']; +const actionCache = new LRU({ + max: 20, + maxAge: 10 * 60 * 1000, // 10 minutes +}); + export async function acknowledgeAgentActions( soClient: SavedObjectsClientContract, agent: Agent, agentEvents: AgentEvent[] ): Promise { + if (agentEvents.length === 0) { + return []; + } + for (const agentEvent of agentEvents) { if (!isAllowedType(agentEvent.type)) { throw Boom.badRequest(`${agentEvent.type} not allowed for acknowledgment only ACTION_RESULT`); @@ -45,9 +55,9 @@ export async function acknowledgeAgentActions( .map((event) => event.action_id) .filter((actionId) => actionId !== undefined) as string[]; - let actions; + let actions: AgentAction[]; try { - actions = await getAgentActionByIds(soClient, actionIds); + actions = await fetchActionsUsingCache(soClient, actionIds); } catch (error) { if (Boom.isBoom(error) && error.output.statusCode === 404) { throw Boom.badRequest(`One or more actions cannot be found`); @@ -55,65 +65,91 @@ export async function acknowledgeAgentActions( throw error; } + const agentActionsIds: string[] = []; for (const action of actions) { - if (action.agent_id !== agent.id) { + if (action.agent_id) { + agentActionsIds.push(action.id); + } + if (action.agent_id && action.agent_id !== agent.id) { throw Boom.badRequest(`${action.id} not found`); } } - if (actions.length === 0) { - return []; - } - const isAgentUnenrolled = actions.some((action) => action.type === 'UNENROLL'); if (isAgentUnenrolled) { await forceUnenrollAgent(soClient, agent.id); } - const agentPolicy = getLatestAgentPolicyIfUpdated(agent, actions); + const configChangeAction = getLatestConfigChangePolicyActionIfUpdated(agent, actions); await soClient.bulkUpdate([ - ...(agentPolicy ? [buildUpdateAgentPolicy(agent.id, agentPolicy)] : []), - ...buildUpdateAgentActionSentAt(actionIds), + ...(configChangeAction + ? [ + { + type: AGENT_SAVED_OBJECT_TYPE, + id: agent.id, + attributes: { + policy_revision: configChangeAction.policy_revision, + packages: configChangeAction?.ack_data?.packages, + }, + }, + ] + : []), + ...buildUpdateAgentActionSentAt(agentActionsIds), ]); return actions; } -function getLatestAgentPolicyIfUpdated(agent: Agent, actions: AgentAction[]) { - return actions.reduce((acc, action) => { - if (action.type !== 'CONFIG_CHANGE') { - return acc; - } - const data = action.data || {}; +async function fetchActionsUsingCache( + soClient: SavedObjectsClientContract, + actionIds: string[] +): Promise { + const missingActionIds: string[] = []; + const actions = actionIds + .map((actionId) => { + const action = actionCache.get(actionId); + if (!action) { + missingActionIds.push(actionId); + } + return action; + }) + .filter((action): action is AgentAction => action !== undefined); + + if (missingActionIds.length === 0) { + return actions; + } - if (data?.config?.id !== agent.policy_id) { - return acc; - } + const freshActions = await getAgentActionByIds(soClient, actionIds, false); + freshActions.forEach((action) => actionCache.set(action.id, action)); - const currentRevision = (acc && acc.revision) || agent.policy_revision || 0; + return [...freshActions, ...actions]; +} - return data?.config?.revision > currentRevision ? data?.config : acc; - }, null); +function isAgentPolicyAction(action: AgentAction | AgentPolicyAction): action is AgentPolicyAction { + return (action as AgentPolicyAction).policy_id !== undefined; } -function buildUpdateAgentPolicy(agentId: string, agentPolicy: FullAgentPolicy) { - const packages = agentPolicy.inputs.reduce((acc, input) => { - const packageName = input.meta?.package?.name; - if (packageName && acc.indexOf(packageName) < 0) { - return [packageName, ...acc]; +function getLatestConfigChangePolicyActionIfUpdated( + agent: Agent, + actions: Array +): AgentPolicyAction | null { + return actions.reduce((acc, action) => { + if ( + !isAgentPolicyAction(action) || + action.type !== 'CONFIG_CHANGE' || + action.policy_id !== agent.policy_id || + (acc?.policy_revision ?? 0) < (agent.policy_revision || 0) + ) { + return acc; } - return acc; - }, []); - return { - type: AGENT_SAVED_OBJECT_TYPE, - id: agentId, - attributes: { - policy_revision: agentPolicy.revision, - packages, - }, - }; + if (action.policy_revision > (acc?.policy_revision ?? 0)) { + return action; + } + + return acc; + }, null); } function buildUpdateAgentActionSentAt( diff --git a/x-pack/plugins/ingest_manager/server/services/agents/actions.test.ts b/x-pack/plugins/ingest_manager/server/services/agents/actions.test.ts index c7390079523893..bcb3fc7fdc7bd4 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/actions.test.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/actions.test.ts @@ -22,7 +22,13 @@ describe('test agent actions services', () => { }; mockSavedObjectsClient.create.mockReturnValue( Promise.resolve({ - attributes: {}, + attributes: { + agent_id: 'agentid', + type: 'CONFIG_CHANGE', + data: JSON.stringify({ content: 'data' }), + sent_at: '2020-03-14T19:45:02.620Z', + created_at: '2020-03-14T19:45:02.620Z', + }, } as SavedObject) ); await createAgentAction(mockSavedObjectsClient, newAgentAction); diff --git a/x-pack/plugins/ingest_manager/server/services/agents/actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/actions.ts index cd0dd921312306..8519714334986d 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/actions.ts @@ -5,9 +5,20 @@ */ import { SavedObjectsClientContract } from 'kibana/server'; -import { Agent, AgentAction, AgentActionSOAttributes } from '../../../common/types/models'; +import { + Agent, + AgentAction, + AgentPolicyAction, + BaseAgentActionSOAttributes, + AgentActionSOAttributes, + AgentPolicyActionSOAttributes, +} from '../../../common/types/models'; import { AGENT_ACTION_SAVED_OBJECT_TYPE } from '../../../common/constants'; -import { savedObjectToAgentAction } from './saved_objects'; +import { + isAgentActionSavedObject, + isPolicyActionSavedObject, + savedObjectToAgentAction, +} from './saved_objects'; import { appContextService } from '../app_context'; import { nodeTypes } from '../../../../../../src/plugins/data/common'; @@ -15,15 +26,45 @@ export async function createAgentAction( soClient: SavedObjectsClientContract, newAgentAction: Omit ): Promise { - const so = await soClient.create(AGENT_ACTION_SAVED_OBJECT_TYPE, { + return createAction(soClient, newAgentAction); +} + +export function createAgentPolicyAction( + soClient: SavedObjectsClientContract, + newAgentAction: Omit +): Promise { + return createAction(soClient, newAgentAction); +} +async function createAction( + soClient: SavedObjectsClientContract, + newAgentAction: Omit +): Promise; +async function createAction( + soClient: SavedObjectsClientContract, + newAgentAction: Omit +): Promise; +async function createAction( + soClient: SavedObjectsClientContract, + newAgentAction: Omit | Omit +): Promise { + const so = await soClient.create(AGENT_ACTION_SAVED_OBJECT_TYPE, { ...newAgentAction, data: newAgentAction.data ? JSON.stringify(newAgentAction.data) : undefined, + ack_data: newAgentAction.ack_data ? JSON.stringify(newAgentAction.ack_data) : undefined, }); - const agentAction = savedObjectToAgentAction(so); - agentAction.data = newAgentAction.data; + if (isAgentActionSavedObject(so)) { + const agentAction = savedObjectToAgentAction(so); + agentAction.data = newAgentAction.data; + + return agentAction; + } else if (isPolicyActionSavedObject(so)) { + const agentAction = savedObjectToAgentAction(so); + agentAction.data = newAgentAction.data; - return agentAction; + return agentAction; + } + throw new Error('Invalid action'); } export async function getAgentActionsForCheckin( @@ -67,7 +108,8 @@ export async function getAgentActionsForCheckin( export async function getAgentActionByIds( soClient: SavedObjectsClientContract, - actionIds: string[] + actionIds: string[], + decryptData: boolean = true ) { const actions = ( await soClient.bulkGet( @@ -76,7 +118,11 @@ export async function getAgentActionByIds( type: AGENT_ACTION_SAVED_OBJECT_TYPE, })) ) - ).saved_objects.map(savedObjectToAgentAction); + ).saved_objects.map((action) => savedObjectToAgentAction(action)); + + if (!decryptData) { + return actions; + } return Promise.all( actions.map(async (action) => { @@ -93,6 +139,39 @@ export async function getAgentActionByIds( ); } +export async function getAgentPolicyActionByIds( + soClient: SavedObjectsClientContract, + actionIds: string[], + decryptData: boolean = true +) { + const actions = ( + await soClient.bulkGet( + actionIds.map((actionId) => ({ + id: actionId, + type: AGENT_ACTION_SAVED_OBJECT_TYPE, + })) + ) + ).saved_objects.map((action) => savedObjectToAgentAction(action)); + + if (!decryptData) { + return actions; + } + + return Promise.all( + actions.map(async (action) => { + // Get decrypted actions + return savedObjectToAgentAction( + await appContextService + .getEncryptedSavedObjects() + .getDecryptedAsInternalUser( + AGENT_ACTION_SAVED_OBJECT_TYPE, + action.id + ) + ); + }) + ); +} + export async function getNewActionsSince(soClient: SavedObjectsClientContract, timestamp: string) { const filter = nodeTypes.function.buildNode('and', [ nodeTypes.function.buildNode( @@ -116,7 +195,26 @@ export async function getNewActionsSince(soClient: SavedObjectsClientContract, t filter, }); - return res.saved_objects.map(savedObjectToAgentAction); + return res.saved_objects + .filter(isAgentActionSavedObject) + .map((so) => savedObjectToAgentAction(so)); +} + +export async function getLatestConfigChangeAction( + soClient: SavedObjectsClientContract, + policyId: string +) { + const res = await soClient.find({ + type: AGENT_ACTION_SAVED_OBJECT_TYPE, + search: policyId, + searchFields: ['policy_id'], + sortField: 'created_at', + sortOrder: 'DESC', + }); + + if (res.saved_objects[0]) { + return savedObjectToAgentAction(res.saved_objects[0]); + } } export interface ActionsService { @@ -124,6 +222,6 @@ export interface ActionsService { createAgentAction: ( soClient: SavedObjectsClientContract, - newAgentAction: AgentActionSOAttributes + newAgentAction: Omit ) => Promise; } diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts index eddfb0e64b84b2..8f586420c3ecb7 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts @@ -5,6 +5,7 @@ */ import { timer, from, Observable, TimeoutError } from 'rxjs'; +import { omit } from 'lodash'; import { shareReplay, distinctUntilKeyChanged, @@ -16,14 +17,7 @@ import { take, } from 'rxjs/operators'; import { SavedObjectsClientContract, KibanaRequest } from 'src/core/server'; -import { - Agent, - AgentAction, - AgentSOAttributes, - AgentPolicy, - FullAgentPolicy, -} from '../../../types'; -import { agentPolicyService } from '../../agent_policy'; +import { Agent, AgentAction, AgentPolicyAction, AgentSOAttributes } from '../../../types'; import * as APIKeysService from '../../api_keys'; import { AGENT_SAVED_OBJECT_TYPE, @@ -31,7 +25,11 @@ import { AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS, AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL, } from '../../../constants'; -import { createAgentAction, getNewActionsSince } from '../actions'; +import { + getNewActionsSince, + getLatestConfigChangeAction, + getAgentPolicyActionByIds, +} from '../actions'; import { appContextService } from '../../app_context'; import { toPromiseAbortable, AbortError, createRateLimiter } from './rxjs_utils'; @@ -54,27 +52,27 @@ function getInternalUserSOClient() { return appContextService.getInternalUserSOClient(fakeRequest); } -function createAgentPolicySharedObservable(agentPolicyId: string) { +function createNewActionsSharedObservable(): Observable { const internalSOClient = getInternalUserSOClient(); + return timer(0, AGENT_UPDATE_ACTIONS_INTERVAL_MS).pipe( - switchMap(() => - from(agentPolicyService.get(internalSOClient, agentPolicyId) as Promise) - ), - distinctUntilKeyChanged('revision'), - switchMap((data) => - from(agentPolicyService.getFullAgentPolicy(internalSOClient, agentPolicyId)) - ), + switchMap(() => { + return from(getNewActionsSince(internalSOClient, new Date().toISOString())); + }), shareReplay({ refCount: true, bufferSize: 1 }) ); } -function createNewActionsSharedObservable(): Observable { - return timer(0, AGENT_UPDATE_ACTIONS_INTERVAL_MS).pipe( - switchMap(() => { - const internalSOClient = getInternalUserSOClient(); +function createAgentPolicyActionSharedObservable(agentPolicyId: string) { + const internalSOClient = getInternalUserSOClient(); - return from(getNewActionsSince(internalSOClient, new Date().toISOString())); - }), + return timer(0, AGENT_UPDATE_ACTIONS_INTERVAL_MS).pipe( + switchMap(() => from(getLatestConfigChangeAction(internalSOClient, agentPolicyId))), + filter((data): data is AgentPolicyAction => data !== undefined), + distinctUntilKeyChanged('id'), + switchMap((data) => + from(getAgentPolicyActionByIds(internalSOClient, [data.id]).then((r) => r[0])) + ), shareReplay({ refCount: true, bufferSize: 1 }) ); } @@ -102,47 +100,35 @@ async function getOrCreateAgentDefaultOutputAPIKey( return outputAPIKey.key; } -function shouldCreateAgentPolicyAction(agent: Agent, agentPolicy: FullAgentPolicy | null): boolean { - if (!agentPolicy || !agentPolicy.revision) { - return false; - } - const isAgentPolicyOutdated = - !agent.policy_revision || agent.policy_revision < agentPolicy.revision; - if (!isAgentPolicyOutdated) { - return false; - } - - return true; -} - -async function createAgentActionFromAgentPolicy( +async function createAgentActionFromPolicyAction( soClient: SavedObjectsClientContract, agent: Agent, - policy: FullAgentPolicy | null + policyAction: AgentPolicyAction ) { - // Deep clone !not supporting Date, and undefined value. - const newAgentPolicy = JSON.parse(JSON.stringify(policy)); + const newAgentAction: AgentAction = Object.assign( + omit( + // Faster than clone + JSON.parse(JSON.stringify(policyAction)) as AgentPolicyAction, + 'policy_id', + 'policy_revision' + ), + { + agent_id: agent.id, + } + ); // Mutate the policy to set the api token for this agent - newAgentPolicy.outputs.default.api_key = await getOrCreateAgentDefaultOutputAPIKey( + newAgentAction.data.config.outputs.default.api_key = await getOrCreateAgentDefaultOutputAPIKey( soClient, agent ); - const policyChangeAction = await createAgentAction(soClient, { - agent_id: agent.id, - type: 'CONFIG_CHANGE', - data: { config: newAgentPolicy } as any, - created_at: new Date().toISOString(), - sent_at: undefined, - }); - - return [policyChangeAction]; + return [newAgentAction]; } export function agentCheckinStateNewActionsFactory() { // Shared Observables - const agentPolicies$ = new Map>(); + const agentPolicies$ = new Map>(); const newActions$ = createNewActionsSharedObservable(); // Rx operators const rateLimiter = createRateLimiter( @@ -162,7 +148,7 @@ export function agentCheckinStateNewActionsFactory() { } const agentPolicyId = agent.policy_id; if (!agentPolicies$.has(agentPolicyId)) { - agentPolicies$.set(agentPolicyId, createAgentPolicySharedObservable(agentPolicyId)); + agentPolicies$.set(agentPolicyId, createAgentPolicyActionSharedObservable(agentPolicyId)); } const agentPolicy$ = agentPolicies$.get(agentPolicyId); if (!agentPolicy$) { @@ -174,15 +160,22 @@ export function agentCheckinStateNewActionsFactory() { // Set a timeout 3s before the real timeout to have a chance to respond an empty response before socket timeout Math.max((appContextService.getConfig()?.fleet.pollingRequestTimeout ?? 0) - 3000, 3000) ), - filter((agentPolicy) => shouldCreateAgentPolicyAction(agent, agentPolicy)), + filter( + (action) => + agent.policy_id !== undefined && + action.policy_revision !== undefined && + action.policy_id !== undefined && + action.policy_id === agent.policy_id && + (!agent.policy_revision || action.policy_revision > agent.policy_revision) + ), rateLimiter(), - mergeMap((agentPolicy) => createAgentActionFromAgentPolicy(soClient, agent, agentPolicy)), + mergeMap((policyAction) => createAgentActionFromPolicyAction(soClient, agent, policyAction)), merge(newActions$), mergeMap(async (data) => { if (!data) { return; } - const newActions = data.filter((action) => action.agent_id); + const newActions = data.filter((action) => action.agent_id === agent.id); if (newActions.length === 0) { return; } diff --git a/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts b/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts index 2ab5cc8139f69f..3ae664c086da99 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts @@ -6,7 +6,15 @@ import Boom from 'boom'; import { SavedObject } from 'src/core/server'; -import { Agent, AgentSOAttributes, AgentAction, AgentActionSOAttributes } from '../../types'; +import { + Agent, + AgentSOAttributes, + AgentAction, + AgentPolicyAction, + AgentActionSOAttributes, + AgentPolicyActionSOAttributes, + BaseAgentActionSOAttributes, +} from '../../types'; export function savedObjectToAgent(so: SavedObject): Agent { if (so.error) { @@ -27,7 +35,13 @@ export function savedObjectToAgent(so: SavedObject): Agent { }; } -export function savedObjectToAgentAction(so: SavedObject): AgentAction { +export function savedObjectToAgentAction(so: SavedObject): AgentAction; +export function savedObjectToAgentAction( + so: SavedObject +): AgentPolicyAction; +export function savedObjectToAgentAction( + so: SavedObject +): AgentAction | AgentPolicyAction { if (so.error) { if (so.error.statusCode === 404) { throw Boom.notFound(so.error.message); @@ -36,9 +50,42 @@ export function savedObjectToAgentAction(so: SavedObject +): so is SavedObject { + return (so.attributes as AgentActionSOAttributes).agent_id !== undefined; +} + +export function isPolicyActionSavedObject( + so: SavedObject +): so is SavedObject { + return (so.attributes as AgentPolicyActionSOAttributes).policy_id !== undefined; +} diff --git a/x-pack/plugins/ingest_manager/server/services/setup.ts b/x-pack/plugins/ingest_manager/server/services/setup.ts index ec3a05a4fa390d..f02057bae15981 100644 --- a/x-pack/plugins/ingest_manager/server/services/setup.ts +++ b/x-pack/plugins/ingest_manager/server/services/setup.ts @@ -170,6 +170,12 @@ export async function setupFleet( }); }) ); + + await Promise.all( + agentPolicies.map((agentPolicy) => + agentPolicyService.createFleetPolicyChangeAction(soClient, agentPolicy.id) + ) + ); } function generateRandomPassword() { diff --git a/x-pack/plugins/ingest_manager/server/types/index.tsx b/x-pack/plugins/ingest_manager/server/types/index.tsx index 2746dfcd00ce37..d00491afef72b1 100644 --- a/x-pack/plugins/ingest_manager/server/types/index.tsx +++ b/x-pack/plugins/ingest_manager/server/types/index.tsx @@ -16,7 +16,10 @@ export { AgentEvent, AgentEventSOAttributes, AgentAction, + AgentPolicyAction, + BaseAgentActionSOAttributes, AgentActionSOAttributes, + AgentPolicyActionSOAttributes, PackagePolicy, PackagePolicyInput, PackagePolicyInputStream, diff --git a/x-pack/plugins/ingest_manager/server/types/models/agent.ts b/x-pack/plugins/ingest_manager/server/types/models/agent.ts index 5ad98cfd406226..b249705fe6c2fd 100644 --- a/x-pack/plugins/ingest_manager/server/types/models/agent.ts +++ b/x-pack/plugins/ingest_manager/server/types/models/agent.ts @@ -62,12 +62,7 @@ export const AgentEventSchema = schema.object({ }); export const NewAgentActionSchema = schema.object({ - type: schema.oneOf([ - schema.literal('CONFIG_CHANGE'), - schema.literal('DATA_DUMP'), - schema.literal('RESUME'), - schema.literal('PAUSE'), - ]), + type: schema.oneOf([schema.literal('CONFIG_CHANGE'), schema.literal('UNENROLL')]), data: schema.maybe(schema.any()), sent_at: schema.maybe(schema.string()), }); diff --git a/x-pack/test/ingest_manager_api_integration/apis/fleet/agents/actions.ts b/x-pack/test/ingest_manager_api_integration/apis/fleet/agents/actions.ts index 2b4bb335dfc5c9..68e02933f56500 100644 --- a/x-pack/test/ingest_manager_api_integration/apis/fleet/agents/actions.ts +++ b/x-pack/test/ingest_manager_api_integration/apis/fleet/agents/actions.ts @@ -28,13 +28,12 @@ export default function (providerContext: FtrProviderContext) { action: { type: 'CONFIG_CHANGE', data: { data: 'action_data' }, - sent_at: '2020-03-18T19:45:02.620Z', }, }) .expect(200); + expect(apiResponse.item.type).to.eql('CONFIG_CHANGE'); expect(apiResponse.item.data).to.eql({ data: 'action_data' }); - expect(apiResponse.item.sent_at).to.be('2020-03-18T19:45:02.620Z'); }); it('should return a 400 when request does not have type information', async () => {