From 8c8323abfd5863825a001081ed090581b223d5b7 Mon Sep 17 00:00:00 2001 From: Anton Dosov Date: Mon, 5 Apr 2021 14:01:24 +0200 Subject: [PATCH] [Search Sessions] fix updating deleting sessions from non-default space (#96123) * add spaces test * fix updating and deleting sessions in non-default space * revert back to batch update * Add space tests Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: Liza K --- .../session/check_running_sessions.test.ts | 90 ++++++++++- .../search/session/check_running_sessions.ts | 43 ++++- .../api_integration/apis/search/session.ts | 153 ++++++++++++++++++ 3 files changed, 278 insertions(+), 8 deletions(-) diff --git a/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.test.ts b/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.test.ts index f9c62069154b68..2611f6c9da19fa 100644 --- a/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.test.ts +++ b/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.test.ts @@ -13,9 +13,13 @@ import { EQL_SEARCH_STRATEGY, } from '../../../common'; import { savedObjectsClientMock } from '../../../../../../src/core/server/mocks'; -import type { SavedObjectsClientContract } from 'kibana/server'; import { SearchSessionsConfig, SearchStatus } from './types'; import moment from 'moment'; +import { + SavedObjectsBulkUpdateObject, + SavedObjectsDeleteOptions, + SavedObjectsClientContract, +} from '../../../../../../src/core/server'; describe('getSearchStatus', () => { let mockClient: any; @@ -263,6 +267,45 @@ describe('getSearchStatus', () => { expect(savedObjectsClient.delete).not.toBeCalled(); }); + test('deletes in space', async () => { + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [ + { + id: '123', + namespaces: ['awesome'], + attributes: { + persisted: false, + status: SearchSessionStatus.IN_PROGRESS, + created: moment().subtract(moment.duration(3, 'm')), + touched: moment().subtract(moment.duration(2, 'm')), + idMapping: { + 'map-key': { + strategy: ENHANCED_ES_SEARCH_STRATEGY, + id: 'async-id', + }, + }, + }, + }, + ], + total: 1, + } as any); + + await checkRunningSessions( + { + savedObjectsClient, + client: mockClient, + logger: mockLogger, + }, + config + ); + + expect(savedObjectsClient.delete).toBeCalled(); + + const [, id, opts] = savedObjectsClient.delete.mock.calls[0]; + expect(id).toBe('123'); + expect((opts as SavedObjectsDeleteOptions).namespace).toBe('awesome'); + }); + test('deletes a non persisted, abandoned session', async () => { savedObjectsClient.find.mockResolvedValue({ saved_objects: [ @@ -479,6 +522,50 @@ describe('getSearchStatus', () => { expect(savedObjectsClient.delete).not.toBeCalled(); }); + test('updates in space', async () => { + savedObjectsClient.bulkUpdate = jest.fn(); + const so = { + namespaces: ['awesome'], + attributes: { + status: SearchSessionStatus.IN_PROGRESS, + touched: '123', + idMapping: { + 'search-hash': { + id: 'search-id', + strategy: 'cool', + status: SearchStatus.IN_PROGRESS, + }, + }, + }, + }; + savedObjectsClient.find.mockResolvedValue({ + saved_objects: [so], + total: 1, + } as any); + + mockClient.asyncSearch.status.mockResolvedValue({ + body: { + is_partial: false, + is_running: false, + completion_status: 200, + }, + }); + + await checkRunningSessions( + { + savedObjectsClient, + client: mockClient, + logger: mockLogger, + }, + config + ); + + expect(mockClient.asyncSearch.status).toBeCalledWith({ id: 'search-id' }); + const [updateInput] = savedObjectsClient.bulkUpdate.mock.calls[0]; + const updatedAttributes = updateInput[0] as SavedObjectsBulkUpdateObject; + expect(updatedAttributes.namespace).toBe('awesome'); + }); + test('updates to complete if the search is done', async () => { savedObjectsClient.bulkUpdate = jest.fn(); const so = { @@ -563,7 +650,6 @@ describe('getSearchStatus', () => { config ); const [updateInput] = savedObjectsClient.bulkUpdate.mock.calls[0]; - const updatedAttributes = updateInput[0].attributes as SearchSessionSavedObjectAttributes; expect(updatedAttributes.status).toBe(SearchSessionStatus.ERROR); expect(updatedAttributes.idMapping['search-hash'].status).toBe(SearchStatus.ERROR); diff --git a/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.ts b/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.ts index e521c39d7cfd31..6e52b17f368030 100644 --- a/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.ts +++ b/x-pack/plugins/data_enhanced/server/search/session/check_running_sessions.ts @@ -10,6 +10,7 @@ import { Logger, SavedObjectsClientContract, SavedObjectsFindResult, + SavedObjectsUpdateResponse, } from 'kibana/server'; import moment from 'moment'; import { EMPTY, from } from 'rxjs'; @@ -169,12 +170,20 @@ export async function checkRunningSessions( if (!session.attributes.persisted) { if (isSessionStale(session, config, logger)) { - deleted = true; // delete saved object to free up memory // TODO: there's a potential rare edge case of deleting an object and then receiving a new trackId for that same session! // Maybe we want to change state to deleted and cleanup later? logger.debug(`Deleting stale session | ${session.id}`); - await savedObjectsClient.delete(SEARCH_SESSION_TYPE, session.id); + try { + await savedObjectsClient.delete(SEARCH_SESSION_TYPE, session.id, { + namespace: session.namespaces?.[0], + }); + deleted = true; + } catch (e) { + logger.error( + `Error while deleting stale search session ${session.id}: ${e.message}` + ); + } // Send a delete request for each async search to ES Object.keys(session.attributes.idMapping).map(async (searchKey: string) => { @@ -183,8 +192,8 @@ export async function checkRunningSessions( try { await client.asyncSearch.delete({ id: searchInfo.id }); } catch (e) { - logger.debug( - `Error ignored while deleting async_search ${searchInfo.id}: ${e.message}` + logger.error( + `Error while deleting async_search ${searchInfo.id}: ${e.message}` ); } } @@ -202,9 +211,31 @@ export async function checkRunningSessions( if (updatedSessions.length) { // If there's an error, we'll try again in the next iteration, so there's no need to check the output. const updatedResponse = await savedObjectsClient.bulkUpdate( - updatedSessions + updatedSessions.map((session) => ({ + ...session, + namespace: session.namespaces?.[0], + })) + ); + + const success: Array< + SavedObjectsUpdateResponse + > = []; + const fail: Array> = []; + + updatedResponse.saved_objects.forEach((savedObjectResponse) => { + if ('error' in savedObjectResponse) { + fail.push(savedObjectResponse); + logger.error( + `Error while updating search session ${savedObjectResponse?.id}: ${savedObjectResponse.error?.message}` + ); + } else { + success.push(savedObjectResponse); + } + }); + + logger.debug( + `Updating search sessions: success: ${success.length}, fail: ${fail.length}` ); - logger.debug(`Updated ${updatedResponse.saved_objects.length} search sessions`); } }) ) diff --git a/x-pack/test/api_integration/apis/search/session.ts b/x-pack/test/api_integration/apis/search/session.ts index 50bc85ed1e793e..63a6a842fd9f71 100644 --- a/x-pack/test/api_integration/apis/search/session.ts +++ b/x-pack/test/api_integration/apis/search/session.ts @@ -14,6 +14,7 @@ export default function ({ getService }: FtrProviderContext) { const supertestWithoutAuth = getService('supertestWithoutAuth'); const security = getService('security'); const retry = getService('retry'); + const spacesService = getService('spaces'); describe('search session', () => { describe('session management', () => { @@ -596,5 +597,157 @@ export default function ({ getService }: FtrProviderContext) { .expect(403); }); }); + + describe('in non-default space', () => { + const spaceId = 'foo-space'; + before(async () => { + try { + await spacesService.create({ + id: spaceId, + name: 'Foo Space', + }); + } catch { + // might already be created + } + }); + + after(async () => { + await spacesService.delete(spaceId); + }); + + it('should complete and delete non-persistent sessions', async () => { + const sessionId = `my-session-${Math.random()}`; + + // run search + const searchRes = await supertest + .post(`/s/${spaceId}/internal/search/ese`) + .set('kbn-xsrf', 'foo') + .send({ + sessionId, + params: { + body: { + query: { + term: { + agent: '1', + }, + }, + }, + wait_for_completion_timeout: '1ms', + }, + }) + .expect(200); + + const { id } = searchRes.body; + + await retry.waitForWithTimeout('searches persisted into session', 5000, async () => { + const resp = await supertest + .get(`/s/${spaceId}/internal/session/${sessionId}`) + .set('kbn-xsrf', 'foo') + .expect(200); + + const { touched, created, persisted, idMapping } = resp.body.attributes; + expect(persisted).to.be(false); + expect(touched).not.to.be(undefined); + expect(created).not.to.be(undefined); + + const idMappings = Object.values(idMapping).map((value: any) => value.id); + expect(idMappings).to.contain(id); + return true; + }); + + // not touched timeout in tests is 15s, wait to give a chance for status to update + await new Promise((resolve) => + setTimeout(() => { + resolve(void 0); + }, 15_000) + ); + + await retry.waitForWithTimeout( + 'searches eventually complete and session gets into the complete state', + 30_000, + async () => { + await supertest + .get(`/s/${spaceId}/internal/session/${sessionId}`) + .set('kbn-xsrf', 'foo') + .expect(404); + + return true; + } + ); + }); + + it('should complete persisten session', async () => { + const sessionId = `my-session-${Math.random()}`; + + // run search + const searchRes = await supertest + .post(`/s/${spaceId}/internal/search/ese`) + .set('kbn-xsrf', 'foo') + .send({ + sessionId, + params: { + body: { + query: { + term: { + agent: '1', + }, + }, + }, + wait_for_completion_timeout: '1ms', + }, + }) + .expect(200); + + const { id } = searchRes.body; + + // persist session + await supertest + .post(`/s/${spaceId}/internal/session`) + .set('kbn-xsrf', 'foo') + .send({ + sessionId, + name: 'My Session', + appId: 'discover', + expires: '123', + urlGeneratorId: 'discover', + }) + .expect(200); + + await retry.waitForWithTimeout('searches persisted into session', 5000, async () => { + const resp = await supertest + .get(`/s/${spaceId}/internal/session/${sessionId}`) + .set('kbn-xsrf', 'foo') + .expect(200); + + const { touched, created, persisted, idMapping } = resp.body.attributes; + expect(persisted).to.be(true); + expect(touched).not.to.be(undefined); + expect(created).not.to.be(undefined); + + const idMappings = Object.values(idMapping).map((value: any) => value.id); + expect(idMappings).to.contain(id); + return true; + }); + + // session refresh interval is 5 seconds, wait to give a chance for status to update + await new Promise((resolve) => setTimeout(resolve, 5000)); + + await retry.waitForWithTimeout( + 'searches eventually complete and session gets into the complete state', + 5000, + async () => { + const resp = await supertest + .get(`/s/${spaceId}/internal/session/${sessionId}`) + .set('kbn-xsrf', 'foo') + .expect(200); + + const { status } = resp.body.attributes; + + expect(status).to.be(SearchSessionStatus.COMPLETE); + return true; + } + ); + }); + }); }); }