Skip to content

Commit

Permalink
[Alerting] retry internal OCC calls within alertsClient
Browse files Browse the repository at this point in the history
During development of elastic#75553,
some issues came up with the optimistic concurrency control (OCC) we
were using internally within the alertsClient, via the `version`
option/property of the saved object. The referenced PR updates new
fields in the alert from the taskManager task after the alertType
executor runs. In some alertsClient methods, OCC is used to update
the alert which are requested via user requests. And so in some
cases, version conflict errors were coming up when the alert was
updated by task manager, in the middle of one of these methods. Note:
the SIEM function test cases stress test this REALLY well.

In this PR, we wrap all the methods using OCC with a function that
will retry them, a short number of times, with a short delay in
between. If the original method STILL has a conflict error, it
will get thrown after the retry limit.  In practice, this eliminated
the version conflict calls that were occuring with the SIEM tests,
once we started updating the saved object in the executor.

For cases where we know only attributes not contributing to AAD are
being updated, a new function is provided that does a partial update
on just those attributes, making partial updates for those attributes
a bit safer. That will be also used by PR elastic#75553.
  • Loading branch information
pmuellr committed Sep 17, 2020
1 parent 3c597d0 commit c3f5168
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 33 deletions.
30 changes: 20 additions & 10 deletions x-pack/plugins/alerts/server/alerts_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1657,11 +1657,16 @@ describe('muteAll()', () => {
});

await alertsClient.muteAll({ id: '1' });
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith('alert', '1', {
muteAll: true,
mutedInstanceIds: [],
updatedBy: 'elastic',
});
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith(
'alert',
'1',
{
muteAll: true,
mutedInstanceIds: [],
updatedBy: 'elastic',
},
{}
);
});

describe('authorization', () => {
Expand Down Expand Up @@ -1742,11 +1747,16 @@ describe('unmuteAll()', () => {
});

await alertsClient.unmuteAll({ id: '1' });
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith('alert', '1', {
muteAll: false,
mutedInstanceIds: [],
updatedBy: 'elastic',
});
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledWith(
'alert',
'1',
{
muteAll: false,
mutedInstanceIds: [],
updatedBy: 'elastic',
},
{}
);
});

describe('authorization', () => {
Expand Down
126 changes: 103 additions & 23 deletions x-pack/plugins/alerts/server/alerts_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import { parseIsoOrRelativeDate } from './lib/iso_or_relative_date';
import { alertInstanceSummaryFromEventLog } from './lib/alert_instance_summary_from_event_log';
import { IEvent } from '../../event_log/server';
import { parseDuration } from '../common/parse_duration';
import { retryIfConflicts } from './lib/retry_if_conflicts';
import { partiallyUpdateAlert } from './saved_objects';

export interface RegistryAlertTypeWithAuth extends RegistryAlertType {
authorizedConsumers: string[];
Expand Down Expand Up @@ -421,6 +423,14 @@ export class AlertsClient {
}

public async update({ id, data }: UpdateOptions): Promise<PartialAlert> {
return await retryIfConflicts(
this.logger,
`alertsClient.update(${id})`,
async () => await this.updateWithOCC({ id, data })
);
}

private async updateWithOCC({ id, data }: UpdateOptions): Promise<PartialAlert> {
let alertSavedObject: SavedObject<RawAlert>;

try {
Expand Down Expand Up @@ -528,7 +538,15 @@ export class AlertsClient {
};
}

public async updateApiKey({ id }: { id: string }) {
public async updateApiKey({ id }: { id: string }): Promise<void> {
return await retryIfConflicts(
this.logger,
`alertsClient.updateApiKey(${id})`,
async () => await this.updateApiKeyWithOCC({ id })
);
}

private async updateApiKeyWithOCC({ id }: { id: string }) {
let apiKeyToInvalidate: string | null = null;
let attributes: RawAlert;
let version: string | undefined;
Expand Down Expand Up @@ -596,7 +614,15 @@ export class AlertsClient {
}
}

public async enable({ id }: { id: string }) {
public async enable({ id }: { id: string }): Promise<void> {
return await retryIfConflicts(
this.logger,
`alertsClient.enable(${id})`,
async () => await this.enableWithOCC({ id })
);
}

private async enableWithOCC({ id }: { id: string }) {
let apiKeyToInvalidate: string | null = null;
let attributes: RawAlert;
let version: string | undefined;
Expand Down Expand Up @@ -657,7 +683,15 @@ export class AlertsClient {
}
}

public async disable({ id }: { id: string }) {
public async disable({ id }: { id: string }): Promise<void> {
return await retryIfConflicts(
this.logger,
`alertsClient.disable(${id})`,
async () => await this.disableWithOCC({ id })
);
}

private async disableWithOCC({ id }: { id: string }) {
let apiKeyToInvalidate: string | null = null;
let attributes: RawAlert;
let version: string | undefined;
Expand Down Expand Up @@ -710,8 +744,19 @@ export class AlertsClient {
}
}

public async muteAll({ id }: { id: string }) {
const { attributes } = await this.unsecuredSavedObjectsClient.get<RawAlert>('alert', id);
public async muteAll({ id }: { id: string }): Promise<void> {
return await retryIfConflicts(
this.logger,
`alertsClient.muteAll(${id})`,
async () => await this.muteAllWithOCC({ id })
);
}

private async muteAllWithOCC({ id }: { id: string }) {
const { attributes, version } = await this.unsecuredSavedObjectsClient.get<RawAlert>(
'alert',
id
);
await this.authorization.ensureAuthorized(
attributes.alertTypeId,
attributes.consumer,
Expand All @@ -722,19 +767,34 @@ export class AlertsClient {
await this.actionsAuthorization.ensureAuthorized('execute');
}

await this.unsecuredSavedObjectsClient.update(
'alert',
const updateAttributes = {
muteAll: true,
mutedInstanceIds: [],
updatedBy: await this.getUserName(),
};
const updateOptions = { version };

await partiallyUpdateAlert(
this.unsecuredSavedObjectsClient,
id,
this.updateMeta({
muteAll: true,
mutedInstanceIds: [],
updatedBy: await this.getUserName(),
})
updateAttributes,
updateOptions
);
}

public async unmuteAll({ id }: { id: string }): Promise<void> {
return await retryIfConflicts(
this.logger,
`alertsClient.unmuteAll(${id})`,
async () => await this.unmuteAllWithOCC({ id })
);
}

public async unmuteAll({ id }: { id: string }) {
const { attributes } = await this.unsecuredSavedObjectsClient.get<RawAlert>('alert', id);
private async unmuteAllWithOCC({ id }: { id: string }) {
const { attributes, version } = await this.unsecuredSavedObjectsClient.get<RawAlert>(
'alert',
id
);
await this.authorization.ensureAuthorized(
attributes.alertTypeId,
attributes.consumer,
Expand All @@ -745,18 +805,30 @@ export class AlertsClient {
await this.actionsAuthorization.ensureAuthorized('execute');
}

await this.unsecuredSavedObjectsClient.update(
'alert',
const updateAttributes = {
muteAll: false,
mutedInstanceIds: [],
updatedBy: await this.getUserName(),
};
const updateOptions = { version };

await partiallyUpdateAlert(
this.unsecuredSavedObjectsClient,
id,
this.updateMeta({
muteAll: false,
mutedInstanceIds: [],
updatedBy: await this.getUserName(),
})
updateAttributes,
updateOptions
);
}

public async muteInstance({ alertId, alertInstanceId }: MuteOptions): Promise<void> {
return await retryIfConflicts(
this.logger,
`alertsClient.muteInstance(${alertId})`,
async () => await this.muteInstanceWithOCC({ alertId, alertInstanceId })
);
}

public async muteInstance({ alertId, alertInstanceId }: MuteOptions) {
private async muteInstanceWithOCC({ alertId, alertInstanceId }: MuteOptions) {
const { attributes, version } = await this.unsecuredSavedObjectsClient.get<Alert>(
'alert',
alertId
Expand Down Expand Up @@ -787,7 +859,15 @@ export class AlertsClient {
}
}

public async unmuteInstance({
public async unmuteInstance({ alertId, alertInstanceId }: MuteOptions): Promise<void> {
return await retryIfConflicts(
this.logger,
`alertsClient.unmuteInstance(${alertId})`,
async () => await this.unmuteInstanceWithOCC({ alertId, alertInstanceId })
);
}

private async unmuteInstanceWithOCC({
alertId,
alertInstanceId,
}: {
Expand Down
59 changes: 59 additions & 0 deletions x-pack/plugins/alerts/server/lib/retry_if_conflicts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

// This module provides a helper to perform retries on a function if the
// function ends up throwing a SavedObject 409 conflict. This can happen
// when alert SO's are updated in the background, and will avoid having to
// have the caller make explicit conflict checks, where the conflict was
// caused by a background update.

import { random } from 'lodash';
import { Logger, SavedObjectsErrorHelpers } from '../../../../../src/core/server';

type RetryableForConflicts<T> = () => Promise<T>;

// number of times to retry when conflicts occur
const RetryForConflictsAttempts = 5;

// milliseconds to wait before retrying when conflicts occur
const RetryForConflictsDelayMin = 100;
const RetryForConflictsDelayMax = 250;

// retry an operation if it runs into 409 Conflict's, up to a limit
export async function retryIfConflicts<T>(
logger: Logger,
name: string,
operation: RetryableForConflicts<T>,
retries: number = RetryForConflictsAttempts
): Promise<T> {
let error: Error;

// run the operation, return if no errors or throw if not a conflict error
try {
return await operation();
} catch (err) {
error = err;
if (!SavedObjectsErrorHelpers.isConflictError(err)) {
logger.error(`alertClient ${name} conflict, exceeded retries`);
throw err;
}
}

// must be a conflict; if no retries left, throw it
if (retries <= 0) {
throw error;
}

// delay a bit before retrying
logger.warn(`alertClient ${name} conflict, retrying ...`);
await waitBeforeNextRetry();
return await retryIfConflicts(logger, name, operation, retries - 1);
}

async function waitBeforeNextRetry(): Promise<void> {
const millis = random(RetryForConflictsDelayMin, RetryForConflictsDelayMax);
await new Promise((resolve) => setTimeout(resolve, millis));
}
13 changes: 13 additions & 0 deletions x-pack/plugins/alerts/server/saved_objects/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,19 @@ import mappings from './mappings.json';
import { getMigrations } from './migrations';
import { EncryptedSavedObjectsPluginSetup } from '../../../encrypted_saved_objects/server';

export { partiallyUpdateAlert } from './partially_update_alert';

export const AlertAttributesExcludedFromAAD = [
'scheduledTaskId',
'muteAll',
'mutedInstanceIds',
'updatedBy',
];

// useful for Pick<RawAlert, AlertAttributesExcludedFromAADType> which is a
// type which is a subset of RawAlert with just attributes excluded from AAD
export type AlertAttributesExcludedFromAADType = typeof AlertAttributesExcludedFromAAD[number];

export function setupSavedObjects(
savedObjects: SavedObjectsServiceSetup,
encryptedSavedObjects: EncryptedSavedObjectsPluginSetup
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { pick } from 'lodash';
import { RawAlert } from '../types';

import {
SavedObjectsClientContract,
ISavedObjectsRepository,
SavedObjectsErrorHelpers,
SavedObjectsUpdateOptions,
} from '../../../../../src/core/server';

import { AlertAttributesExcludedFromAAD, AlertAttributesExcludedFromAADType } from './index';

type PartiallyUpdateableAlertAttributes = Pick<RawAlert, AlertAttributesExcludedFromAADType>;

interface PartiallyUpdateAlertSavedObjectOptions {
version?: string;
ignore404?: boolean;
namespace?: string; // only should be used with ISavedObjectsRepository
}

type SavedObjectClient = SavedObjectsClientContract | ISavedObjectsRepository;

// direct, partial update to an alert saved object via scoped SavedObjectsClient
// using namespace set in the client
export async function partiallyUpdateAlert(
savedObjectsClient: SavedObjectClient,
id: string,
attributes: PartiallyUpdateableAlertAttributes,
options?: PartiallyUpdateAlertSavedObjectOptions
): Promise<void | ReturnType<SavedObjectsClientContract['update']>> {
const attributeUpdates = pick(attributes, AlertAttributesExcludedFromAAD);

const updateOptions: SavedObjectsUpdateOptions = {};
if (options?.namespace) {
updateOptions.namespace = options.namespace;
}
if (options?.version) {
updateOptions.version = options.version;
}

try {
return await savedObjectsClient.update<RawAlert>('alert', id, attributeUpdates, updateOptions);
} catch (err) {
if (options?.ignore404 && SavedObjectsErrorHelpers.isNotFoundError(err)) {
return;
}
throw err;
}
}

0 comments on commit c3f5168

Please sign in to comment.