Skip to content

Commit

Permalink
Bulk update (#1633)
Browse files Browse the repository at this point in the history
* bulkUpdate()
+ support for synking intended updates in dexie-cloud-addon instead of just syncing the actual changes being made locally.

A flaw in dbcore allowed optional property 'changeSpecs' without the option to combine them with the provided keys (and not the keys sent to bulk 'put' operation, which can be fewer than the intended keys sent to table.bulkPut() because we only put keys that existed in the db currently)

We had to rename `DBCorePutRequest.changeSpecs`. Otherwise an old dexie-cloud-addon combined with a new dexie could apply the changeSpecs on the wrong keys. Making sure the interopability will be the following:

* old dexie + new dexie-cloud-addon: Work as before ('changeSpecs' is not used in old dexie)

* new dexie + old dexie-cloud-addon: Work as before (new dexie's bulkUpdate uses new property 'updates' and don't start using 'changeSpecs'

* new dexie + new dexie-cloud addon: Handle bulkUpdate() similar to how multiple calls to table.update() would behave - the indented keys and their changeSpecs are synced so that the server can reexecute the original intention on server-side data (as described in https://dexie.org/cloud/docs/consistency)

* Added a test to verify middlewares are called even when there's nothing to put.

* Fix - integration tests failing when addons mutate db on open

* Typo fix in tests-table.js

Closes #632
  • Loading branch information
dfahlander committed Oct 25, 2022
1 parent ef3654a commit 1ed9668
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export function createImplicitPropSetterMiddleware(
// modify operations. Reason: Server may not have
// the object. Object should be created on server only
// if is being updated. An update operation won't create it
// so we must delete req.changeSpec to decrate operation to
// so we must delete req.changeSpec to degrade operation to
// an upsert operation with timestamp so that it will be created.
// We must also degrade from consistent modify operations for the
// same reason - object might be there on server. Must but put up instead.
Expand All @@ -58,7 +58,7 @@ export function createImplicitPropSetterMiddleware(
const now = Date.now();
delete req.criteria;
delete req.changeSpec;
delete req.changeSpecs;
delete req.updates;
obj.$ts = Date.now();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,10 @@ export function createMutationTrackingMiddleware({
const { numFailures: hasFailures, failures } = res;
let keys = type === 'delete' ? req.keys! : res.results!;
let values = 'values' in req ? req.values : [];
let changeSpecs = 'changeSpecs' in req ? req.changeSpecs! : [];
let updates = 'updates' in req && req.updates!;
if (hasFailures) {
keys = keys.filter((_, idx) => !failures[idx]);
values = values.filter((_, idx) => !failures[idx]);
changeSpecs = changeSpecs.filter((_, idx) => !failures[idx]);
}
const ts = Date.now();

Expand Down Expand Up @@ -240,13 +239,13 @@ export function createMutationTrackingMiddleware({
txid,
userId
}
: req.changeSpecs
: updates
? {
// One changeSpec per key
type: 'update',
ts,
keys,
changeSpecs,
keys: updates.keys,
changeSpecs: updates.changeSpecs,
txid,
userId
}
Expand Down
70 changes: 70 additions & 0 deletions src/classes/table/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { DBCoreTable } from '../../public/types/dbcore';
import { AnyRange } from '../../dbcore/keyrange';
import { workaroundForUndefinedPrimKey } from '../../functions/workaround-undefined-primkey';
import { Entity } from '../entity/Entity';
import { UpdateSpec } from '../../public';
import { cmp } from '../../functions/cmp';

/** class Table
*
Expand Down Expand Up @@ -487,6 +489,74 @@ export class Table implements ITable<any, IndexableType> {
});
}

/** Table.bulkUpdate()
*
* https://dexie.org/docs/Table.Table.bulkUpdate()
*/
bulkUpdate(
keysAndChanges: readonly { key: any; changes: UpdateSpec<any> }[]
): PromiseExtended<number> {
const coreTable = this.core;
const keys = keysAndChanges.map((entry) => entry.key);
const changeSpecs = keysAndChanges.map((entry) => entry.changes);
const offsetMap: number[] = [];
return this._trans('readwrite', (trans) => {
return coreTable.getMany({ trans, keys, cache: 'clone' }).then((objs) => {
const resultKeys: any[] = [];
const resultObjs: any[] = [];
keysAndChanges.forEach(({ key, changes }, idx) => {
const obj = objs[idx];
if (obj) {
for (const keyPath of Object.keys(changes)) {
const value = changes[keyPath];
if (keyPath === this.schema.primKey.keyPath) {
if (cmp(value, key) !== 0) {
throw new exceptions.Constraint(
`Cannot update primary key in bulkUpdate()`
);
}
} else {
setByKeyPath(obj, keyPath, value);
}
}
offsetMap.push(idx);
resultKeys.push(key);
resultObjs.push(obj);
}
});
const numEntries = resultKeys.length;
return coreTable
.mutate({
trans,
type: 'put',
keys: resultKeys,
values: resultObjs,
updates: {
keys,
changeSpecs
}
})
.then(({ numFailures, failures }) => {
if (numFailures === 0) return numEntries;
// Failure. bulkPut() may have a subset of keys
// so we must translate returned 'failutes' into the offsets of given argument:
for (const offset of Object.keys(failures)) {
const mappedOffset = offsetMap[Number(offset)];
if (mappedOffset != null) {
const failure = failures[offset];
delete failures[offset];
failures[mappedOffset] = failure;
}
}
throw new BulkError(
`${this.name}.bulkUpdate(): ${numFailures} of ${numEntries} operations failed`,
failures
);
});
});
});
}

/** Table.bulkDelete()
*
* https://dexie.org/docs/Table/Table.bulkDelete()
Expand Down
2 changes: 1 addition & 1 deletion src/errors/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export function BulkError (msg, failures) {
this.name = "BulkError";
this.failures = Object.keys(failures).map(pos => failures[pos]);
this.failuresByPos = failures;
this.message = getMultiErrorMessage(msg, failures);
this.message = getMultiErrorMessage(msg, this.failures);
}
derive(BulkError).from(DexieError);

Expand Down
5 changes: 4 additions & 1 deletion src/public/types/dbcore.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ export interface DBCorePutRequest {
range: DBCoreKeyRange;
};
changeSpec?: {[keyPath: string]: any}; // Common changeSpec for each key
changeSpecs?: {[keyPath: string]: any}[]; // changeSpec per key.
updates?: {
keys: any[],
changeSpecs: {[keyPath: string]: any}[]; // changeSpec per key.
},
/** @deprecated Will always get results since 3.1.0-alpha.5 */
wantResults?: boolean;
}
Expand Down
2 changes: 2 additions & 0 deletions src/public/types/table.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,7 @@ export interface Table<T=any, TKeyPropNameOrKeyType=IndexableType, TOpt=void> {
bulkPut<B extends boolean>(items: readonly InsertType<T, TOpt ,TKeyPropNameOrKeyType>[], options: { allKeys: B }): PromiseExtended<B extends true ? IDType<T, TKeyPropNameOrKeyType>[] : IDType<T, TKeyPropNameOrKeyType>>;
bulkPut(items: readonly InsertType<T, TOpt, TKeyPropNameOrKeyType>[], keys?: IndexableTypeArrayReadonly, options?: { allKeys: boolean }): PromiseExtended<IDType<T, TKeyPropNameOrKeyType>>;

bulkUpdate(keysAndChanges: ReadonlyArray<{key: IDType<T, TKeyPropNameOrKeyType>, changes: UpdateSpec<T>}>): PromiseExtended<number>;

bulkDelete(keys: IDType<T, TKeyPropNameOrKeyType>[]): PromiseExtended<void>;
}
182 changes: 180 additions & 2 deletions test/tests-table.js
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,162 @@ asyncTest("put-no-transaction", function () {
}).finally(start);
});

promisedTest("bulkUpdate", async ()=>{
await db.items.bulkAdd([
{id: 1, foo: {bar: 1}},
{id: 2, foo: {bar: 2}},
{id: 3, foo: {bar: 3}}
]);
await db.items.bulkUpdate([
{key: 1, changes: {"foo.bar": 101}},
{key: 2, changes: {"foo.bar": 102, "foo.baz": "x"}},
{key: 4, changes: {"foo.bar": 104}},
]);
const allItems = await db.items.toArray();
const expected = [
{id: 1, foo: {bar: 101}},
{id: 2, foo: {bar: 102, baz: "x"}},
{id: 3, foo: {bar: 3}}
];
deepEqual(allItems, expected, "2 items updated as expected. nonexisting item not updated.");
});

promisedTest("bulkUpdate without actual changes (check it doesn't bail out)", async ()=>{
const dbCoreMutateCalls = [];
db.use({
stack: 'dbcore',
name: 'temp-logger',
create: (downDb) => ({
...downDb,
table: (tableName) => {
const downTable = downDb.table(tableName);
return {
...downTable,
mutate(req) {
if (tableName === 'items') {
dbCoreMutateCalls.push(req);
}
return downTable.mutate(req);
}
};
}
})
});
db.close();
await db.open(); // Apply the middleware

try {
// Clear the log (in case another middleware or addon did something in db ready in integration tests)
dbCoreMutateCalls.splice(0, dbCoreMutateCalls.length);
equal(dbCoreMutateCalls.length, 0, 'No mutate calls yet');
await db.items.bulkUpdate([
{ key: 'nonexist1', changes: { 'foo.bar': 101 } },
{ key: 'nonexist2', changes: { 'foo.bar': 102 } },
{ key: 'nonexist3', changes: { 'foo.bar': 103 } }
]);
equal(dbCoreMutateCalls.length, 1, 'One mutate call has taken place');
deepEqualPartial(
dbCoreMutateCalls,
[
{
type: 'put',
values: [],
criteria: undefined,
changeSpec: undefined,
updates: {
keys: ['nonexist1', 'nonexist2', 'nonexist3'],
changeSpecs: [
{ 'foo.bar': 101 },
{ 'foo.bar': 102 },
{ 'foo.bar': 103 }
],
},
},
],
'The mutate call was a put call and contained the intended updates for consistent sync addons to consume'
);

const allItems = await db.items.toArray();
const expected = [];
deepEqual(allItems, expected, 'Nonexisting item not updated.');
} finally {
// Cleanup temporary middleware:
db.unuse({ stack: 'dbcore', name: 'temp-logger' });
db.close();
await db.open();
}
});


promisedTest("bulkUpdate with failure", async ()=>{
try {
await db.users.bulkUpdate([
{key: "nonexisting", changes: {username: "xyz"}}, // Shall be ignored
{key: idOfFirstUser, changes: {username: "kceder"}}, // Shall fail (unique index)
{key: idOfFirstUser + 1, changes: {first: "Baz"}} // Shall succeed
]);
throw new Error("Should not have succeeded");
} catch (error) {
equal(error.failures.length, 1, "Should be 1 failure");
const failurePositions = Object.keys(error.failuresByPos);
equal(failurePositions.length, 1, "failuresByPos should have one key only (array with holes)");
const failurePosition = failurePositions[0];
equal(failurePosition, 1, "The failure should have occurred at position 1");
const failure = error.failuresByPos[failurePosition];
ok(failure != null && failure instanceof Error, "There was a failure and it was an error");
}
const allItems = await db.users.toArray();
const expected = [
{first: "David", username: "dfahlander" },
{first: "Baz", username: "kceder" }
];
deepEqualPartial(allItems, expected, "The end result in a non-transactional bulkUpdate() should be that non-failing entries succeeded to update");
});

promisedTest("bulkUpdate with failure (transactional)", async ()=>{
try {
await db.transaction('rw', db.users, async () => {
await db.users.bulkUpdate([
{key: "nonexisting", changes: {username: "xyz"}}, // Shall be ignored
{key: idOfFirstUser, changes: {username: "kceder"}}, // Shall fail (unique index)
{key: idOfFirstUser + 1, changes: {"foo.bar": 102}} // Shall succeed
]);
});
throw new Error("Should not have succeeded");
} catch (error) {
equal(error.failures.length, 1, "Should be 1 failure");
const failurePositions = Object.keys(error.failuresByPos);
equal(failurePositions.length, 1, "failuresByPos should have one key only (array with holes)");
const failurePosition = failurePositions[0];
equal(failurePosition, 1, "The failure should have occurred at position 1");
const failure = error.failuresByPos[failurePosition];
ok(failure != null && failure instanceof Error, "There was a failure and it was an error");
}
const allItems = await db.users.toArray();
const expected = [
{first: "David", username: "dfahlander" },
{first: "Karl", username: "kceder" }
];
deepEqualPartial(allItems, expected, "The end result in a transactional bulkUpdate() should be that no entries succeeeded to update if not catching error within transaction");
});

promisedTest("bulkUpdate with failure (transactional with catch)", async ()=>{
await db.transaction('rw', db.users, async () => {
try {
await db.users.bulkUpdate([
{key: "nonexisting", changes: {username: "xyz"}}, // Shall be ignored
{key: idOfFirstUser, changes: {username: "kceder"}}, // Shall fail (unique index)
{key: idOfFirstUser + 1, changes: {"foo.bar": 102}} // Shall succeed
]);
} catch {}
});
const allItems = await db.users.toArray();
const expected = [
{first: "David", last: "Fahlander", username: "dfahlander" },
{first: "Karl", last: "Faadersköld", username: "kceder", foo: {bar: 102} }
];
deepEqualPartial(allItems, expected, "The end result in a transactional bulkUpdate() (with catch inside transaction) should be that non-failing entries succeeded to update");
});

asyncTest("add", function () {
db.transaction("rw", db.users, function () {
Expand Down Expand Up @@ -956,8 +1112,30 @@ asyncTest("failNotIncludedStoreTrans", () => {
});

// Must use this rather than QUnit's deepEqual() because that one fails on Safari when run via karma-browserstack-launcher
export function deepEqual(a, b, description) {
equal(JSON.stringify(a, null, 2), JSON.stringify(b, null, 2), description);
export function deepEqual(actual, expected, description) {
equal(JSON.stringify(actual, null, 2), JSON.stringify(expected, null, 2), description);
}

function stripObj(obj, props) {
const rv = {};
for (const key of props.slice().sort()) {
rv[key] = obj[key];
}
return rv;
}

function sortObj(obj) {
return stripObj(obj, Object.keys(obj));
}

export function deepEqualPartial(actual, expected, description) {
if (Array.isArray(actual)) {
return deepEqual(
actual.map((a, idx) => stripObj(a, Object.keys(expected[idx]))),
expected.map(sortObj),
description);
}
return deepEqual(stripObj(actual, Object.keys(expected)), sortObj(expected), description);
}

promisedTest("bulkGet()", async () => {
Expand Down

0 comments on commit 1ed9668

Please sign in to comment.