From 76ab6c09450df5b9c61df5d4b89aa44b71af946f Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Wed, 19 Aug 2020 16:10:33 +0100 Subject: [PATCH] [Saved objects] Add support for version on create & bulkCreate when overwriting a document (#75172) Adds support for `version` one the SavedObjectsClient's create api. This sallows us to retain Optimistic concurrency control when using create to overwrite an existing document. # Conflicts: # src/core/server/saved_objects/service/lib/repository.ts --- ...ore-server.savedobjectsbulkcreateobject.md | 1 + ...er.savedobjectsbulkcreateobject.version.md | 11 +++++ ...n-core-server.savedobjectscreateoptions.md | 1 + ...erver.savedobjectscreateoptions.version.md | 13 ++++++ .../import/import_saved_objects.ts | 7 ++- .../import/resolve_import_errors.ts | 12 +++-- .../service/lib/repository.test.js | 43 +++++++++++++++++- .../saved_objects/service/lib/repository.ts | 44 ++++++++++++------- .../service/saved_objects_client.ts | 6 +++ src/core/server/server.api.md | 3 ++ .../services/sample_data/routes/install.ts | 2 +- .../services/epm/kibana/assets/install.ts | 4 +- 12 files changed, 121 insertions(+), 26 deletions(-) create mode 100644 docs/development/core/server/kibana-plugin-core-server.savedobjectsbulkcreateobject.version.md create mode 100644 docs/development/core/server/kibana-plugin-core-server.savedobjectscreateoptions.version.md diff --git a/docs/development/core/server/kibana-plugin-core-server.savedobjectsbulkcreateobject.md b/docs/development/core/server/kibana-plugin-core-server.savedobjectsbulkcreateobject.md index 5a9ca36ba56f43..5ccad134248f64 100644 --- a/docs/development/core/server/kibana-plugin-core-server.savedobjectsbulkcreateobject.md +++ b/docs/development/core/server/kibana-plugin-core-server.savedobjectsbulkcreateobject.md @@ -20,4 +20,5 @@ export interface SavedObjectsBulkCreateObject | [migrationVersion](./kibana-plugin-core-server.savedobjectsbulkcreateobject.migrationversion.md) | SavedObjectsMigrationVersion | Information about the migrations that have been applied to this SavedObject. When Kibana starts up, KibanaMigrator detects outdated documents and migrates them based on this value. For each migration that has been applied, the plugin's name is used as a key and the latest migration version as the value. | | [references](./kibana-plugin-core-server.savedobjectsbulkcreateobject.references.md) | SavedObjectReference[] | | | [type](./kibana-plugin-core-server.savedobjectsbulkcreateobject.type.md) | string | | +| [version](./kibana-plugin-core-server.savedobjectsbulkcreateobject.version.md) | string | | diff --git a/docs/development/core/server/kibana-plugin-core-server.savedobjectsbulkcreateobject.version.md b/docs/development/core/server/kibana-plugin-core-server.savedobjectsbulkcreateobject.version.md new file mode 100644 index 00000000000000..ca2a38693d0365 --- /dev/null +++ b/docs/development/core/server/kibana-plugin-core-server.savedobjectsbulkcreateobject.version.md @@ -0,0 +1,11 @@ + + +[Home](./index.md) > [kibana-plugin-core-server](./kibana-plugin-core-server.md) > [SavedObjectsBulkCreateObject](./kibana-plugin-core-server.savedobjectsbulkcreateobject.md) > [version](./kibana-plugin-core-server.savedobjectsbulkcreateobject.version.md) + +## SavedObjectsBulkCreateObject.version property + +Signature: + +```typescript +version?: string; +``` diff --git a/docs/development/core/server/kibana-plugin-core-server.savedobjectscreateoptions.md b/docs/development/core/server/kibana-plugin-core-server.savedobjectscreateoptions.md index 5e9433c5c9196a..c5201efd0608d6 100644 --- a/docs/development/core/server/kibana-plugin-core-server.savedobjectscreateoptions.md +++ b/docs/development/core/server/kibana-plugin-core-server.savedobjectscreateoptions.md @@ -20,4 +20,5 @@ export interface SavedObjectsCreateOptions extends SavedObjectsBaseOptions | [overwrite](./kibana-plugin-core-server.savedobjectscreateoptions.overwrite.md) | boolean | Overwrite existing documents (defaults to false) | | [references](./kibana-plugin-core-server.savedobjectscreateoptions.references.md) | SavedObjectReference[] | | | [refresh](./kibana-plugin-core-server.savedobjectscreateoptions.refresh.md) | MutatingOperationRefreshSetting | The Elasticsearch Refresh setting for this operation | +| [version](./kibana-plugin-core-server.savedobjectscreateoptions.version.md) | string | An opaque version number which changes on each successful write operation. Can be used in conjunction with overwrite for implementing optimistic concurrency control. | diff --git a/docs/development/core/server/kibana-plugin-core-server.savedobjectscreateoptions.version.md b/docs/development/core/server/kibana-plugin-core-server.savedobjectscreateoptions.version.md new file mode 100644 index 00000000000000..51da57064abb9e --- /dev/null +++ b/docs/development/core/server/kibana-plugin-core-server.savedobjectscreateoptions.version.md @@ -0,0 +1,13 @@ + + +[Home](./index.md) > [kibana-plugin-core-server](./kibana-plugin-core-server.md) > [SavedObjectsCreateOptions](./kibana-plugin-core-server.savedobjectscreateoptions.md) > [version](./kibana-plugin-core-server.savedobjectscreateoptions.version.md) + +## SavedObjectsCreateOptions.version property + +An opaque version number which changes on each successful write operation. Can be used in conjunction with `overwrite` for implementing optimistic concurrency control. + +Signature: + +```typescript +version?: string; +``` diff --git a/src/core/server/saved_objects/import/import_saved_objects.ts b/src/core/server/saved_objects/import/import_saved_objects.ts index 6065e03fb16283..4956491a79aa91 100644 --- a/src/core/server/saved_objects/import/import_saved_objects.ts +++ b/src/core/server/saved_objects/import/import_saved_objects.ts @@ -25,6 +25,7 @@ import { SavedObjectsImportOptions, } from './types'; import { validateReferences } from './validate_references'; +import { SavedObject } from '../types'; /** * Import saved objects from given stream. See the {@link SavedObjectsImportOptions | options} for more @@ -67,7 +68,7 @@ export async function importSavedObjectsFromStream({ } // Create objects in bulk - const bulkCreateResult = await savedObjectsClient.bulkCreate(filteredObjects, { + const bulkCreateResult = await savedObjectsClient.bulkCreate(omitVersion(filteredObjects), { overwrite, namespace, }); @@ -82,3 +83,7 @@ export async function importSavedObjectsFromStream({ ...(errorAccumulator.length ? { errors: errorAccumulator } : {}), }; } + +export function omitVersion(objects: SavedObject[]): SavedObject[] { + return objects.map(({ version, ...object }) => object); +} diff --git a/src/core/server/saved_objects/import/resolve_import_errors.ts b/src/core/server/saved_objects/import/resolve_import_errors.ts index a5175aa0805980..dce044a31a577a 100644 --- a/src/core/server/saved_objects/import/resolve_import_errors.ts +++ b/src/core/server/saved_objects/import/resolve_import_errors.ts @@ -26,6 +26,7 @@ import { SavedObjectsResolveImportErrorsOptions, } from './types'; import { validateReferences } from './validate_references'; +import { omitVersion } from './import_saved_objects'; /** * Resolve and return saved object import errors. @@ -91,7 +92,7 @@ export async function resolveSavedObjectsImportErrors({ // Bulk create in two batches, overwrites and non-overwrites const { objectsToOverwrite, objectsToNotOverwrite } = splitOverwrites(filteredObjects, retries); if (objectsToOverwrite.length) { - const bulkCreateResult = await savedObjectsClient.bulkCreate(objectsToOverwrite, { + const bulkCreateResult = await savedObjectsClient.bulkCreate(omitVersion(objectsToOverwrite), { overwrite: true, namespace, }); @@ -102,9 +103,12 @@ export async function resolveSavedObjectsImportErrors({ successCount += bulkCreateResult.saved_objects.filter((obj) => !obj.error).length; } if (objectsToNotOverwrite.length) { - const bulkCreateResult = await savedObjectsClient.bulkCreate(objectsToNotOverwrite, { - namespace, - }); + const bulkCreateResult = await savedObjectsClient.bulkCreate( + omitVersion(objectsToNotOverwrite), + { + namespace, + } + ); errorAccumulator = [ ...errorAccumulator, ...extractErrors(bulkCreateResult.saved_objects, objectsToNotOverwrite), diff --git a/src/core/server/saved_objects/service/lib/repository.test.js b/src/core/server/saved_objects/service/lib/repository.test.js index d563edbe66c9b6..4be4c65c111f25 100644 --- a/src/core/server/saved_objects/service/lib/repository.test.js +++ b/src/core/server/saved_objects/service/lib/repository.test.js @@ -472,8 +472,16 @@ describe('SavedObjectsRepository', () => { { method, _index = expect.any(String), getId = () => expect.any(String) } ) => { const body = []; - for (const { type, id } of objects) { - body.push({ [method]: { _index, _id: getId(type, id) } }); + for (const { type, id, if_primary_term: ifPrimaryTerm, if_seq_no: ifSeqNo } of objects) { + body.push({ + [method]: { + _index, + _id: getId(type, id), + ...(ifPrimaryTerm && ifSeqNo + ? { if_primary_term: expect.any(Number), if_seq_no: expect.any(Number) } + : {}), + }, + }); body.push(expect.any(Object)); } expectClusterCallArgs({ body }); @@ -529,6 +537,27 @@ describe('SavedObjectsRepository', () => { expectClusterCallArgsAction([obj1, obj2], { method: 'index' }); }); + it(`should use the ES index method with version if ID and version are defined and overwrite=true`, async () => { + await bulkCreateSuccess( + [ + { + ...obj1, + version: mockVersion, + }, + obj2, + ], + { overwrite: true } + ); + + const obj1WithSeq = { + ...obj1, + if_seq_no: mockVersionProps._seq_no, + if_primary_term: mockVersionProps._primary_term, + }; + + expectClientCallArgsAction([obj1WithSeq, obj2], { method: 'index' }); + }); + it(`should use the ES create method if ID is defined and overwrite=false`, async () => { await bulkCreateSuccess([obj1, obj2]); expectClusterCallArgsAction([obj1, obj2], { method: 'create' }); @@ -1458,6 +1487,16 @@ describe('SavedObjectsRepository', () => { expectClusterCalls('index'); }); + it(`should use the ES index with version if ID and version are defined and overwrite=true`, async () => { + await createSuccess(type, attributes, { id, overwrite: true, version: mockVersion }); + expect(client.index).toHaveBeenCalled(); + + expect(client.index.mock.calls[0][0]).toMatchObject({ + if_seq_no: mockVersionProps._seq_no, + if_primary_term: mockVersionProps._primary_term, + }); + }); + it(`should use the ES create action if ID is defined and overwrite=false`, async () => { await createSuccess(type, attributes, { id }); expectClusterCalls('create'); diff --git a/src/core/server/saved_objects/service/lib/repository.ts b/src/core/server/saved_objects/service/lib/repository.ts index 7a5ac9204627c0..ce9be015a79286 100644 --- a/src/core/server/saved_objects/service/lib/repository.ts +++ b/src/core/server/saved_objects/service/lib/repository.ts @@ -137,11 +137,11 @@ export class SavedObjectsRepository { injectedConstructor: any = SavedObjectsRepository ): ISavedObjectsRepository { const mappings = migrator.getActiveMappings(); - const allTypes = typeRegistry.getAllTypes().map((t) => t.name); + const allTypes = typeRegistry.getAllTypes().map(t => t.name); const serializer = new SavedObjectsSerializer(typeRegistry); - const visibleTypes = allTypes.filter((type) => !typeRegistry.isHidden(type)); + const visibleTypes = allTypes.filter(type => !typeRegistry.isHidden(type)); - const missingTypeMappings = includedHiddenTypes.filter((type) => !allTypes.includes(type)); + const missingTypeMappings = includedHiddenTypes.filter(type => !allTypes.includes(type)); if (missingTypeMappings.length > 0) { throw new Error( `Missing mappings for saved objects types: '${missingTypeMappings.join(', ')}'` @@ -220,6 +220,7 @@ export class SavedObjectsRepository { overwrite = false, references = [], refresh = DEFAULT_REFRESH_SETTING, + version, } = options; if (!this._allowedTypes.includes(type)) { @@ -260,6 +261,7 @@ export class SavedObjectsRepository { index: this.getIndexForType(type), refresh, body: raw._source, + ...(overwrite && version ? decodeRequestVersion(version) : {}), }); return this._rawToSavedObject({ @@ -285,7 +287,7 @@ export class SavedObjectsRepository { const time = this._getCurrentTime(); let bulkGetRequestIndexCounter = 0; - const expectedResults: Either[] = objects.map((object) => { + const expectedResults: Either[] = objects.map(object => { if (!this._allowedTypes.includes(object.type)) { return { tag: 'Left' as 'Left', @@ -332,14 +334,19 @@ export class SavedObjectsRepository { let bulkRequestIndexCounter = 0; const bulkCreateParams: object[] = []; - const expectedBulkResults: Either[] = expectedResults.map((expectedBulkGetResult) => { + const expectedBulkResults: Either[] = expectedResults.map(expectedBulkGetResult => { if (isLeft(expectedBulkGetResult)) { return expectedBulkGetResult; } let savedObjectNamespace; let savedObjectNamespaces; - const { esRequestIndex, object, method } = expectedBulkGetResult.value; + let versionProperties; + const { + esRequestIndex, + object: { version, ...object }, + method, + } = expectedBulkGetResult.value; if (esRequestIndex !== undefined) { const indexFound = bulkGetResponse.status !== 404; const actualResult = indexFound ? bulkGetResponse.docs[esRequestIndex] : undefined; @@ -356,12 +363,14 @@ export class SavedObjectsRepository { }; } savedObjectNamespaces = getSavedObjectNamespaces(namespace, docFound && actualResult); + versionProperties = getExpectedVersionProperties(version, actualResult); } else { if (this._registry.isSingleNamespace(object.type)) { savedObjectNamespace = namespace; } else if (this._registry.isMultiNamespace(object.type)) { savedObjectNamespaces = getSavedObjectNamespaces(namespace); } + versionProperties = getExpectedVersionProperties(version); } const expectedResult = { @@ -386,6 +395,7 @@ export class SavedObjectsRepository { [method]: { _id: expectedResult.rawMigratedDoc._id, _index: this.getIndexForType(object.type), + ...(overwrite && versionProperties), }, }, expectedResult.rawMigratedDoc._source @@ -402,7 +412,7 @@ export class SavedObjectsRepository { : undefined; return { - saved_objects: expectedBulkResults.map((expectedResult) => { + saved_objects: expectedBulkResults.map(expectedResult => { if (isLeft(expectedResult)) { return expectedResult.error as any; } @@ -454,7 +464,7 @@ export class SavedObjectsRepository { preflightResult = await this.preflightCheckIncludesNamespace(type, id, namespace); const existingNamespaces = getSavedObjectNamespaces(undefined, preflightResult); const remainingNamespaces = existingNamespaces?.filter( - (x) => x !== getNamespaceString(namespace) + x => x !== getNamespaceString(namespace) ); if (remainingNamespaces?.length) { @@ -531,7 +541,7 @@ export class SavedObjectsRepository { const { refresh = DEFAULT_REFRESH_SETTING } = options; const allTypes = Object.keys(getRootPropertiesObjects(this._mappings)); - const typesToUpdate = allTypes.filter((type) => !this._registry.isNamespaceAgnostic(type)); + const typesToUpdate = allTypes.filter(type => !this._registry.isNamespaceAgnostic(type)); const updateOptions = { index: this.getIndicesForTypes(typesToUpdate), @@ -602,7 +612,7 @@ export class SavedObjectsRepository { } const types = Array.isArray(type) ? type : [type]; - const allowedTypes = types.filter((t) => this._allowedTypes.includes(t)); + const allowedTypes = types.filter(t => this._allowedTypes.includes(t)); if (allowedTypes.length === 0) { return { page, @@ -709,7 +719,7 @@ export class SavedObjectsRepository { } let bulkGetRequestIndexCounter = 0; - const expectedBulkGetResults: Either[] = objects.map((object) => { + const expectedBulkGetResults: Either[] = objects.map(object => { const { type, id, fields } = object; if (!this._allowedTypes.includes(type)) { @@ -751,7 +761,7 @@ export class SavedObjectsRepository { : undefined; return { - saved_objects: expectedBulkGetResults.map((expectedResult) => { + saved_objects: expectedBulkGetResults.map(expectedResult => { if (isLeft(expectedResult)) { return expectedResult.error as any; } @@ -1004,7 +1014,7 @@ export class SavedObjectsRepository { const preflightResult = await this.preflightCheckIncludesNamespace(type, id, namespace); const existingNamespaces = getSavedObjectNamespaces(undefined, preflightResult); // if there are somehow no existing namespaces, allow the operation to proceed and delete this saved object - const remainingNamespaces = existingNamespaces?.filter((x) => !namespaces.includes(x)); + const remainingNamespaces = existingNamespaces?.filter(x => !namespaces.includes(x)); if (remainingNamespaces?.length) { // if there is 1 or more namespace remaining, update the saved object @@ -1080,7 +1090,7 @@ export class SavedObjectsRepository { const { namespace } = options; let bulkGetRequestIndexCounter = 0; - const expectedBulkGetResults: Either[] = objects.map((object) => { + const expectedBulkGetResults: Either[] = objects.map(object => { const { type, id } = object; if (!this._allowedTypes.includes(type)) { @@ -1136,7 +1146,7 @@ export class SavedObjectsRepository { let bulkUpdateRequestIndexCounter = 0; const bulkUpdateParams: object[] = []; const expectedBulkUpdateResults: Either[] = expectedBulkGetResults.map( - (expectedBulkGetResult) => { + expectedBulkGetResult => { if (isLeft(expectedBulkGetResult)) { return expectedBulkGetResult; } @@ -1201,7 +1211,7 @@ export class SavedObjectsRepository { : {}; return { - saved_objects: expectedBulkUpdateResults.map((expectedResult) => { + saved_objects: expectedBulkUpdateResults.map(expectedResult => { if (isLeft(expectedResult)) { return expectedResult.error as any; } @@ -1354,7 +1364,7 @@ export class SavedObjectsRepository { * @param types The types whose indices should be retrieved */ private getIndicesForTypes(types: string[]) { - return unique(types.map((t) => this.getIndexForType(t))); + return unique(types.map(t => this.getIndexForType(t))); } private _getCurrentTime() { diff --git a/src/core/server/saved_objects/service/saved_objects_client.ts b/src/core/server/saved_objects/service/saved_objects_client.ts index e15a92c92772f3..6a9f4f5143e841 100644 --- a/src/core/server/saved_objects/service/saved_objects_client.ts +++ b/src/core/server/saved_objects/service/saved_objects_client.ts @@ -37,6 +37,11 @@ export interface SavedObjectsCreateOptions extends SavedObjectsBaseOptions { id?: string; /** Overwrite existing documents (defaults to false) */ overwrite?: boolean; + /** + * An opaque version number which changes on each successful write operation. + * Can be used in conjunction with `overwrite` for implementing optimistic concurrency control. + **/ + version?: string; /** {@inheritDoc SavedObjectsMigrationVersion} */ migrationVersion?: SavedObjectsMigrationVersion; references?: SavedObjectReference[]; @@ -52,6 +57,7 @@ export interface SavedObjectsBulkCreateObject { id?: string; type: string; attributes: T; + version?: string; references?: SavedObjectReference[]; /** {@inheritDoc SavedObjectsMigrationVersion} */ migrationVersion?: SavedObjectsMigrationVersion; diff --git a/src/core/server/server.api.md b/src/core/server/server.api.md index c3cd219f2b8ec5..f198ac53be16a7 100644 --- a/src/core/server/server.api.md +++ b/src/core/server/server.api.md @@ -1945,6 +1945,8 @@ export interface SavedObjectsBulkCreateObject { references?: SavedObjectReference[]; // (undocumented) type: string; + // (undocumented) + version?: string; } // @public (undocumented) @@ -2079,6 +2081,7 @@ export interface SavedObjectsCreateOptions extends SavedObjectsBaseOptions { // (undocumented) references?: SavedObjectReference[]; refresh?: MutatingOperationRefreshSetting; + version?: string; } // @public (undocumented) diff --git a/src/plugins/home/server/services/sample_data/routes/install.ts b/src/plugins/home/server/services/sample_data/routes/install.ts index 2d1a53fbb09dc8..b94456682afcc9 100644 --- a/src/plugins/home/server/services/sample_data/routes/install.ts +++ b/src/plugins/home/server/services/sample_data/routes/install.ts @@ -154,7 +154,7 @@ export function createInstallRoute( let createResults; try { createResults = await context.core.savedObjects.client.bulkCreate( - sampleDataset.savedObjects, + sampleDataset.savedObjects.map(({ version, ...savedObject }) => savedObject), { overwrite: true } ); } catch (err) { diff --git a/x-pack/plugins/ingest_manager/server/services/epm/kibana/assets/install.ts b/x-pack/plugins/ingest_manager/server/services/epm/kibana/assets/install.ts index a3fe444b19b1a7..26e30948abfa79 100644 --- a/x-pack/plugins/ingest_manager/server/services/epm/kibana/assets/install.ts +++ b/x-pack/plugins/ingest_manager/server/services/epm/kibana/assets/install.ts @@ -20,7 +20,9 @@ import { import { deleteKibanaSavedObjectsAssets } from '../../packages/remove'; import { getInstallationObject, savedObjectTypes } from '../../packages'; -type SavedObjectToBe = Required & { type: AssetType }; +type SavedObjectToBe = Required> & { + type: AssetType; +}; export type ArchiveAsset = Pick< SavedObject, 'id' | 'attributes' | 'migrationVersion' | 'references'