Skip to content

Commit

Permalink
[Search Sessions] Implement cancel on search session monitoring task,…
Browse files Browse the repository at this point in the history
… fetch and process sessions page by page (#96321)
  • Loading branch information
Dosant committed Apr 12, 2021
1 parent 171f398 commit c4b3dfd
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 105 deletions.
7 changes: 7 additions & 0 deletions x-pack/plugins/data_enhanced/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ export const configSchema = schema.object({
* trackingInterval controls how often we track search session objects progress
*/
trackingInterval: schema.duration({ defaultValue: '10s' }),

/**
* monitoringTaskTimeout controls for how long task manager waits for search session monitoring task to complete before considering it timed out,
* If tasks timeouts it receives cancel signal and next task starts in "trackingInterval" time
*/
monitoringTaskTimeout: schema.duration({ defaultValue: '5m' }),

/**
* notTouchedTimeout controls how long do we store unpersisted search session results,
* after the last search in the session has completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
* 2.0.
*/

import { checkRunningSessions } from './check_running_sessions';
import {
checkRunningSessions as checkRunningSessions$,
CheckRunningSessionsDeps,
} from './check_running_sessions';
import {
SearchSessionStatus,
SearchSessionSavedObjectAttributes,
Expand All @@ -20,6 +23,13 @@ import {
SavedObjectsDeleteOptions,
SavedObjectsClientContract,
} from '../../../../../../src/core/server';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

jest.useFakeTimers();

const checkRunningSessions = (deps: CheckRunningSessionsDeps, config: SearchSessionsConfig) =>
checkRunningSessions$(deps, config).toPromise();

describe('getSearchStatus', () => {
let mockClient: any;
Expand All @@ -32,6 +42,7 @@ describe('getSearchStatus', () => {
maxUpdateRetries: 3,
defaultExpiration: moment.duration(7, 'd'),
trackingInterval: moment.duration(10, 's'),
monitoringTaskTimeout: moment.duration(5, 'm'),
management: {} as any,
};
const mockLogger: any = {
Expand All @@ -41,11 +52,13 @@ describe('getSearchStatus', () => {
};

const emptySO = {
persisted: false,
status: SearchSessionStatus.IN_PROGRESS,
created: moment().subtract(moment.duration(3, 'm')),
touched: moment().subtract(moment.duration(10, 's')),
idMapping: {},
attributes: {
persisted: false,
status: SearchSessionStatus.IN_PROGRESS,
created: moment().subtract(moment.duration(3, 'm')),
touched: moment().subtract(moment.duration(10, 's')),
idMapping: {},
},
};

beforeEach(() => {
Expand Down Expand Up @@ -171,6 +184,118 @@ describe('getSearchStatus', () => {

expect(savedObjectsClient.find).toHaveBeenCalledTimes(2);
});

test('fetching is abortable', async () => {
let i = 0;
const abort$ = new Subject();
savedObjectsClient.find.mockImplementation(() => {
return new Promise((resolve) => {
if (++i === 2) {
abort$.next();
}
resolve({
saved_objects: i <= 5 ? [emptySO, emptySO, emptySO, emptySO, emptySO] : [],
total: 25,
page: i,
} as any);
});
});

await checkRunningSessions$(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config
)
.pipe(takeUntil(abort$))
.toPromise();

jest.runAllTimers();

// if not for `abort$` then this would be called 6 times!
expect(savedObjectsClient.find).toHaveBeenCalledTimes(2);
});

test('sorting is by "touched"', async () => {
savedObjectsClient.find.mockResolvedValueOnce({
saved_objects: [],
total: 0,
} as any);

await checkRunningSessions(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config
);

expect(savedObjectsClient.find).toHaveBeenCalledWith(
expect.objectContaining({ sortField: 'touched', sortOrder: 'asc' })
);
});

test('sessions fetched in the beginning are processed even if sessions in the end fail', async () => {
let i = 0;
savedObjectsClient.find.mockImplementation(() => {
return new Promise((resolve, reject) => {
if (++i === 2) {
reject(new Error('Fake find error...'));
}
resolve({
saved_objects:
i <= 5
? [
i === 1
? {
id: '123',
attributes: {
persisted: false,
status: SearchSessionStatus.IN_PROGRESS,
created: moment().subtract(moment.duration(3, 'm')),
touched: moment().subtract(moment.duration(2, 'm')),
idMapping: {
'map-key': {
strategy: ENHANCED_ES_SEARCH_STRATEGY,
id: 'async-id',
},
},
},
}
: emptySO,
emptySO,
emptySO,
emptySO,
emptySO,
]
: [],
total: 25,
page: i,
} as any);
});
});

await checkRunningSessions$(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config
).toPromise();

jest.runAllTimers();

expect(savedObjectsClient.find).toHaveBeenCalledTimes(2);

// by checking that delete was called we validate that sessions from session that were successfully fetched were processed
expect(mockClient.asyncSearch.delete).toBeCalled();
const { id } = mockClient.asyncSearch.delete.mock.calls[0][0];
expect(id).toBe('async-id');
});
});

describe('delete', () => {
Expand Down
Loading

0 comments on commit c4b3dfd

Please sign in to comment.