[Search Sessions] Monitoring hardening part 1 #96196

Merged 11 commits on Apr 7, 2021
2 changes: 1 addition & 1 deletion x-pack/plugins/data_enhanced/config.ts
@@ -18,7 +18,7 @@ export const configSchema = schema.object({
* pageSize controls how many search session objects we load at once while monitoring
* session completion
*/
-  pageSize: schema.number({ defaultValue: 10000 }),
+  pageSize: schema.number({ defaultValue: 100 }),
/**
* trackingInterval controls how often we track search session objects progress
*/
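The change above lowers the monitoring page size from 10000 to 100, so each find/update round-trip handles a bounded number of search session saved objects. A minimal sketch of that kind of paging loop in plain TypeScript, with a hypothetical `fetchPage` callback standing in for the plugin's actual saved-objects query:

```typescript
// One page of results plus the overall total, mirroring a saved-objects find response.
interface Page<T> {
  items: T[];
  total: number;
}

// Yield batches of at most `pageSize` items until every object has been seen.
// `fetchPage` is a hypothetical stand-in for the real saved-objects client call.
async function* allSessions<T>(
  fetchPage: (page: number, pageSize: number) => Promise<Page<T>>,
  pageSize: number
): AsyncGenerator<T[]> {
  let page = 1;
  let seen = 0;
  for (;;) {
    const res = await fetchPage(page, pageSize);
    if (res.items.length === 0) return; // no more results
    yield res.items;
    seen += res.items.length;
    if (seen >= res.total) return; // all pages consumed
    page += 1;
  }
}
```

With `pageSize: 100`, 250 sessions are processed as three requests of 100, 100, and 50 items, so no single request carries an unbounded payload.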
29 changes: 7 additions & 22 deletions x-pack/plugins/data_enhanced/server/plugin.ts
@@ -6,13 +6,7 @@
*/

import { CoreSetup, CoreStart, Logger, Plugin, PluginInitializerContext } from 'kibana/server';
- import { TaskManagerSetupContract, TaskManagerStartContract } from '../../task_manager/server';
- import {
-   PluginSetup as DataPluginSetup,
-   PluginStart as DataPluginStart,
-   usageProvider,
- } from '../../../../src/plugins/data/server';
- import { UsageCollectionSetup } from '../../../../src/plugins/usage_collection/server';
+ import { usageProvider } from '../../../../src/plugins/data/server';
import { ENHANCED_ES_SEARCH_STRATEGY, EQL_SEARCH_STRATEGY } from '../common';
import { registerSessionRoutes } from './routes';
import { searchSessionSavedObjectType } from './saved_objects';
@@ -22,22 +16,13 @@ import {
eqlSearchStrategyProvider,
} from './search';
import { getUiSettings } from './ui_settings';
- import type { DataEnhancedRequestHandlerContext } from './type';
+ import type {
+   DataEnhancedRequestHandlerContext,
+   DataEnhancedSetupDependencies as SetupDependencies,
+   DataEnhancedStartDependencies as StartDependencies,
+ } from './type';
import { ConfigSchema } from '../config';
import { registerUsageCollector } from './collectors';
- import { SecurityPluginSetup } from '../../security/server';

- interface SetupDependencies {
-   data: DataPluginSetup;
-   usageCollection?: UsageCollectionSetup;
-   taskManager: TaskManagerSetupContract;
-   security?: SecurityPluginSetup;
- }
-
- export interface StartDependencies {
-   data: DataPluginStart;
-   taskManager: TaskManagerStartContract;
- }

export class EnhancedDataServerPlugin
implements Plugin<void, void, SetupDependencies, StartDependencies> {
@@ -50,7 +35,7 @@ export class EnhancedDataServerPlugin
this.config = this.initializerContext.config.get<ConfigSchema>();
}

-   public setup(core: CoreSetup<DataPluginStart>, deps: SetupDependencies) {
+   public setup(core: CoreSetup<StartDependencies>, deps: SetupDependencies) {
const usage = deps.usageCollection ? usageProvider(core) : undefined;

core.uiSettings.register(getUiSettings());
@@ -14,7 +14,7 @@ import {
} from 'kibana/server';
import moment from 'moment';
import { EMPTY, from } from 'rxjs';
- import { expand, mergeMap } from 'rxjs/operators';
+ import { expand, concatMap } from 'rxjs/operators';
import { nodeBuilder } from '../../../../../../src/plugins/data/common';
import {
ENHANCED_ES_SEARCH_STRATEGY,
@@ -154,7 +154,7 @@ export async function checkRunningSessions(
try {
await getAllSavedSearchSessions$(deps, config)
.pipe(
-         mergeMap(async (runningSearchSessionsResponse) => {
+         concatMap(async (runningSearchSessionsResponse) => {
Member:
I mentioned this in the other issue as well, but what are we hoping to accomplish by processing these serially rather than in parallel? The main concern seemed to be that the update request is too large, so the only benefit I can see from processing serially is a decrease in CPU/memory usage, and I'm not sure that will outweigh the fact that this change may also make the whole process take much longer.

Contributor (author):
The whole idea of paging the sessions was to update the items in reasonably sized batches and in an orderly fashion. With mergeMap we weren't doing that.

I think that serializing batches of 100 search sessions makes sense, but it's going to be hard to quantify the impact. Not 100% the same (it's more related to reducing the page size), but the article @Dosant linked convinced me: https://eng.lifion.com/promise-allpocalypse-cfb6741298a7?gi=2c3ce0dba6ea

Contributor:
@lukasolson, I think there is also a difference between grouping requests into batches of 100 and sending all of them to ES at once: with concatMap we spread out the load.

Dosant marked this conversation as resolved.
if (!runningSearchSessionsResponse.total) return;

logger.debug(`Found ${runningSearchSessionsResponse.total} running sessions`);
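The swap from mergeMap to concatMap debated above is about concurrency: mergeMap subscribes to every page's update as soon as it arrives, while concatMap waits for the previous page to settle before starting the next. A minimal sketch of the difference in plain async TypeScript, using a hypothetical in-flight gauge instead of the real RxJS pipeline:

```typescript
// Hypothetical gauge tracking how many page updates are in flight at once.
interface Gauge {
  inFlight: number;
  maxInFlight: number;
}

// Stand-in for updating one page of search sessions (e.g. a bulk update request).
async function processPage(gauge: Gauge): Promise<void> {
  gauge.inFlight += 1;
  gauge.maxInFlight = Math.max(gauge.maxInFlight, gauge.inFlight);
  await new Promise((resolve) => setTimeout(resolve, 10)); // simulated request latency
  gauge.inFlight -= 1;
}

// mergeMap-like: every page's update is started immediately, all in flight at once.
async function runParallel(pageCount: number): Promise<number> {
  const gauge: Gauge = { inFlight: 0, maxInFlight: 0 };
  await Promise.all(Array.from({ length: pageCount }, () => processPage(gauge)));
  return gauge.maxInFlight;
}

// concatMap-like: the next page starts only after the previous one settles,
// so at most one bulk update hits Elasticsearch at a time.
async function runSerial(pageCount: number): Promise<number> {
  const gauge: Gauge = { inFlight: 0, maxInFlight: 0 };
  for (let i = 0; i < pageCount; i += 1) {
    await processPage(gauge);
  }
  return gauge.maxInFlight;
}
```

With five pages, the parallel variant peaks at five concurrent updates while the serial one never exceeds one, which is the load-spreading the reviewers are weighing against total runtime.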
@@ -15,6 +15,7 @@ import { checkRunningSessions } from './check_running_sessions';
import { CoreSetup, SavedObjectsClient, Logger } from '../../../../../../src/core/server';
import { ConfigSchema } from '../../../config';
import { SEARCH_SESSION_TYPE } from '../../../common';
+ import { DataEnhancedStartDependencies } from '../../type';

export const SEARCH_SESSIONS_TASK_TYPE = 'search_sessions_monitor';
export const SEARCH_SESSIONS_TASK_ID = `data_enhanced_${SEARCH_SESSIONS_TASK_TYPE}`;
@@ -25,12 +26,19 @@ interface SearchSessionTaskDeps {
config: ConfigSchema;
}

- function searchSessionRunner(core: CoreSetup, { logger, config }: SearchSessionTaskDeps) {
+ function searchSessionRunner(
+   core: CoreSetup<DataEnhancedStartDependencies>,
+   { logger, config }: SearchSessionTaskDeps
+ ) {
return ({ taskInstance }: RunContext) => {
return {
async run() {
const sessionConfig = config.search.sessions;
const [coreStart] = await core.getStartServices();
+         if (!sessionConfig.enabled) {
+           logger.debug('Search sessions are disabled. Skipping task.');
+           return;
+         }
const internalRepo = coreStart.savedObjects.createInternalRepository([SEARCH_SESSION_TYPE]);
const internalSavedObjectsClient = new SavedObjectsClient(internalRepo);
await checkRunningSessions(
@@ -50,7 +58,10 @@ function searchSessionRunner(core: CoreSetup, { logger, config }: SearchSessionT
};
}

- export function registerSearchSessionsTask(core: CoreSetup, deps: SearchSessionTaskDeps) {
+ export function registerSearchSessionsTask(
+   core: CoreSetup<DataEnhancedStartDependencies>,
+   deps: SearchSessionTaskDeps
+ ) {
deps.taskManager.registerTaskDefinitions({
[SEARCH_SESSIONS_TASK_TYPE]: {
title: 'Search Sessions Monitor',
@@ -59,6 +70,18 @@ export function registerSearchSessionsTask(core: CoreSetup, deps: SearchSessionT
});
}

+ export async function unscheduleSearchSessionsTask(
+   taskManager: TaskManagerStartContract,
+   logger: Logger
+ ) {
+   try {
+     await taskManager.removeIfExists(SEARCH_SESSIONS_TASK_ID);
+     logger.debug(`Search sessions cleared`);
+   } catch (e) {
+     logger.error(`Error clearing task, received ${e.message}`);
+   }
+ }

export async function scheduleSearchSessionsTasks(
taskManager: TaskManagerStartContract,
logger: Logger,
@@ -79,6 +102,6 @@

logger.debug(`Search sessions task, scheduled to run`);
} catch (e) {
-     logger.debug(`Error scheduling task, received ${e.message}`);
+     logger.error(`Error scheduling task, received ${e.message}`);
}
}