Skip to content

Commit

Permalink
[Search Sessions] Monitoring hardening part 1 (elastic#96196)
Browse files Browse the repository at this point in the history
* Decrease default pageSize to 100
Set default strategy
Don't create sessions when disabled
Clear monitoring task when disabled
Use concatMap to serialize session checkup

* ts

* ts

* ts

* Update x-pack/plugins/data_enhanced/server/search/session/session_service.ts

Co-authored-by: Lukas Olson <olson.lukas@gmail.com>

* Search sessions are disabled

* Clear task on server start

Co-authored-by: Lukas Olson <olson.lukas@gmail.com>
  • Loading branch information
2 people authored and kibanamachine committed Apr 7, 2021
1 parent 11e86ad commit 5ea4192
Show file tree
Hide file tree
Showing 7 changed files with 959 additions and 834 deletions.
2 changes: 1 addition & 1 deletion x-pack/plugins/data_enhanced/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export const configSchema = schema.object({
* pageSize controls how many search session objects we load at once while monitoring
* session completion
*/
pageSize: schema.number({ defaultValue: 10000 }),
pageSize: schema.number({ defaultValue: 100 }),
/**
* trackingInterval controls how often we track search session objects progress
*/
Expand Down
29 changes: 7 additions & 22 deletions x-pack/plugins/data_enhanced/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,7 @@
*/

import { CoreSetup, CoreStart, Logger, Plugin, PluginInitializerContext } from 'kibana/server';
import { TaskManagerSetupContract, TaskManagerStartContract } from '../../task_manager/server';
import {
PluginSetup as DataPluginSetup,
PluginStart as DataPluginStart,
usageProvider,
} from '../../../../src/plugins/data/server';
import { UsageCollectionSetup } from '../../../../src/plugins/usage_collection/server';
import { usageProvider } from '../../../../src/plugins/data/server';
import { ENHANCED_ES_SEARCH_STRATEGY, EQL_SEARCH_STRATEGY } from '../common';
import { registerSessionRoutes } from './routes';
import { searchSessionMapping } from './saved_objects';
Expand All @@ -22,22 +16,13 @@ import {
eqlSearchStrategyProvider,
} from './search';
import { getUiSettings } from './ui_settings';
import type { DataEnhancedRequestHandlerContext } from './type';
import type {
DataEnhancedRequestHandlerContext,
DataEnhancedSetupDependencies as SetupDependencies,
DataEnhancedStartDependencies as StartDependencies,
} from './type';
import { ConfigSchema } from '../config';
import { registerUsageCollector } from './collectors';
import { SecurityPluginSetup } from '../../security/server';

interface SetupDependencies {
data: DataPluginSetup;
usageCollection?: UsageCollectionSetup;
taskManager: TaskManagerSetupContract;
security?: SecurityPluginSetup;
}

export interface StartDependencies {
data: DataPluginStart;
taskManager: TaskManagerStartContract;
}

export class EnhancedDataServerPlugin
implements Plugin<void, void, SetupDependencies, StartDependencies> {
Expand All @@ -50,7 +35,7 @@ export class EnhancedDataServerPlugin
this.config = this.initializerContext.config.get<ConfigSchema>();
}

public setup(core: CoreSetup<DataPluginStart>, deps: SetupDependencies) {
public setup(core: CoreSetup<StartDependencies>, deps: SetupDependencies) {
const usage = deps.usageCollection ? usageProvider(core) : undefined;

core.uiSettings.register(getUiSettings());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
} from 'kibana/server';
import moment from 'moment';
import { EMPTY, from } from 'rxjs';
import { expand, mergeMap } from 'rxjs/operators';
import { expand, concatMap } from 'rxjs/operators';
import { nodeBuilder } from '../../../../../../src/plugins/data/common';
import {
SearchSessionStatus,
Expand Down Expand Up @@ -148,7 +148,7 @@ export async function checkRunningSessions(
try {
await getAllSavedSearchSessions$(deps, config)
.pipe(
mergeMap(async (runningSearchSessionsResponse) => {
concatMap(async (runningSearchSessionsResponse) => {
if (!runningSearchSessionsResponse.total) return;

logger.debug(`Found ${runningSearchSessionsResponse.total} running sessions`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { checkRunningSessions } from './check_running_sessions';
import { CoreSetup, SavedObjectsClient, Logger } from '../../../../../../src/core/server';
import { ConfigSchema } from '../../../config';
import { SEARCH_SESSION_TYPE } from '../../../common';
import { DataEnhancedStartDependencies } from '../../type';

export const SEARCH_SESSIONS_TASK_TYPE = 'search_sessions_monitor';
export const SEARCH_SESSIONS_TASK_ID = `data_enhanced_${SEARCH_SESSIONS_TASK_TYPE}`;
Expand All @@ -25,12 +26,19 @@ interface SearchSessionTaskDeps {
config: ConfigSchema;
}

function searchSessionRunner(core: CoreSetup, { logger, config }: SearchSessionTaskDeps) {
function searchSessionRunner(
core: CoreSetup<DataEnhancedStartDependencies>,
{ logger, config }: SearchSessionTaskDeps
) {
return ({ taskInstance }: RunContext) => {
return {
async run() {
const sessionConfig = config.search.sessions;
const [coreStart] = await core.getStartServices();
if (!sessionConfig.enabled) {
logger.debug('Search sessions are disabled. Skipping task.');
return;
}
const internalRepo = coreStart.savedObjects.createInternalRepository([SEARCH_SESSION_TYPE]);
const internalSavedObjectsClient = new SavedObjectsClient(internalRepo);
await checkRunningSessions(
Expand All @@ -50,7 +58,10 @@ function searchSessionRunner(core: CoreSetup, { logger, config }: SearchSessionT
};
}

export function registerSearchSessionsTask(core: CoreSetup, deps: SearchSessionTaskDeps) {
export function registerSearchSessionsTask(
core: CoreSetup<DataEnhancedStartDependencies>,
deps: SearchSessionTaskDeps
) {
deps.taskManager.registerTaskDefinitions({
[SEARCH_SESSIONS_TASK_TYPE]: {
title: 'Search Sessions Monitor',
Expand All @@ -59,6 +70,18 @@ export function registerSearchSessionsTask(core: CoreSetup, deps: SearchSessionT
});
}

export async function unscheduleSearchSessionsTask(
taskManager: TaskManagerStartContract,
logger: Logger
) {
try {
await taskManager.removeIfExists(SEARCH_SESSIONS_TASK_ID);
logger.debug(`Search sessions cleared`);
} catch (e) {
logger.error(`Error clearing task, received ${e.message}`);
}
}

export async function scheduleSearchSessionsTasks(
taskManager: TaskManagerStartContract,
logger: Logger,
Expand All @@ -79,6 +102,6 @@ export async function scheduleSearchSessionsTasks(

logger.debug(`Search sessions task, scheduled to run`);
} catch (e) {
logger.debug(`Error scheduling task, received ${e.message}`);
logger.error(`Error scheduling task, received ${e.message}`);
}
}
Loading

0 comments on commit 5ea4192

Please sign in to comment.