Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Search] Search batching using bfetch (again) #84043

Merged
merged 45 commits into from
Nov 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
20bb351
Use bfetch for search (no abort behavior)
Nov 10, 2020
8b4eb1c
fix merge
Nov 10, 2020
9eebded
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 15, 2020
812a06b
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 16, 2020
aaa3122
Handle request abortion + unit tests
Nov 16, 2020
b1a2cf2
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 16, 2020
2cdfc47
fix jest
Nov 16, 2020
e8fc98f
shim totals in oss
Nov 16, 2020
993e385
proper formatting for errors
Nov 17, 2020
e89a46f
jest, types and docs
Nov 17, 2020
b727e49
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 17, 2020
3de838c
Fix doc
Nov 17, 2020
17de9fa
Remove old search code and rename UI Setting
Nov 17, 2020
56e7a46
jest mocks
Nov 17, 2020
5aaeefa
jest
Nov 17, 2020
c393416
Solve unhanled error
Nov 17, 2020
a1b2979
Use AbortSignal
Nov 17, 2020
811da0a
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 18, 2020
162473d
ts
Nov 18, 2020
8c83657
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 18, 2020
662b810
code review - use abort controller instead of observable
Nov 18, 2020
41f6406
Revert "Remove old search code and rename UI Setting"
Nov 18, 2020
cbd8106
Remove old search code and rename UI Setting
Nov 17, 2020
1ec890a
revert search route
Nov 18, 2020
3e334a7
fix event unsubscribe
Nov 18, 2020
c7aebdf
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 19, 2020
8abff48
code review 2
Nov 19, 2020
b4193e2
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 19, 2020
9d16439
revert filter
Nov 19, 2020
4a1e69c
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 19, 2020
bcd894f
Merge branch 'master' into enhance/use-bfetch-in-search
kibanamachine Nov 21, 2020
d57b8dc
simplify batch done logic
Nov 22, 2020
6b9a4f8
code review
Nov 22, 2020
4bd2a96
Merge branch 'enhance/use-bfetch-in-search' of github.com:lizozom/kib…
Nov 22, 2020
e9103b7
filter items in the beginning
Nov 22, 2020
3fcac6e
jest
Nov 22, 2020
be828c2
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 23, 2020
b5a0a44
Handle JSON parse error in bfetch + test
Nov 23, 2020
4413f8a
Adjust Security Solution Cypress tests to bsearch
patrykkopycinski Nov 24, 2020
55629da
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
patrykkopycinski Nov 24, 2020
42ec042
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 24, 2020
6e8cc8d
Merge branch 'enhance/use-bfetch-in-search' of github.com:lizozom/kib…
Nov 24, 2020
9cc2922
Merge branch 'master' of github.com:elastic/kibana into enhance/use-b…
Nov 25, 2020
77148ca
shimmy
Nov 25, 2020
1c925de
fix cypress
patrykkopycinski Nov 25, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
<b>Signature:</b>

```typescript
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { bfetch, expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
```

## Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| core | <code>CoreSetup&lt;DataStartDependencies, DataPublicPluginStart&gt;</code> | |
| { expressions, uiActions, usageCollection } | <code>DataSetupDependencies</code> | |
| { bfetch, expressions, uiActions, usageCollection } | <code>DataSetupDependencies</code> | |

<b>Returns:</b>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->

[Home](./index.md) &gt; [kibana-plugin-plugins-data-public](./kibana-plugin-plugins-data-public.md) &gt; [SearchInterceptorDeps](./kibana-plugin-plugins-data-public.searchinterceptordeps.md) &gt; [bfetch](./kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md)

## SearchInterceptorDeps.bfetch property

<b>Signature:</b>

```typescript
bfetch: BfetchPublicSetup;
```
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export interface SearchInterceptorDeps

| Property | Type | Description |
| --- | --- | --- |
| [bfetch](./kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md) | <code>BfetchPublicSetup</code> | |
| [http](./kibana-plugin-plugins-data-public.searchinterceptordeps.http.md) | <code>CoreSetup['http']</code> | |
| [session](./kibana-plugin-plugins-data-public.searchinterceptordeps.session.md) | <code>ISessionService</code> | |
| [startServices](./kibana-plugin-plugins-data-public.searchinterceptordeps.startservices.md) | <code>Promise&lt;[CoreStart, any, unknown]&gt;</code> | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<b>Signature:</b>

```typescript
setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { expressions, usageCollection }: DataPluginSetupDependencies): {
setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { bfetch, expressions, usageCollection }: DataPluginSetupDependencies): {
__enhance: (enhancements: DataEnhancements) => void;
search: ISearchSetup;
fieldFormats: {
Expand All @@ -21,7 +21,7 @@ setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { expressio
| Parameter | Type | Description |
| --- | --- | --- |
| core | <code>CoreSetup&lt;DataPluginStartDependencies, DataPluginStart&gt;</code> | |
| { expressions, usageCollection } | <code>DataPluginSetupDependencies</code> | |
| { bfetch, expressions, usageCollection } | <code>DataPluginSetupDependencies</code> | |

<b>Returns:</b>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { createStreamingBatchedFunction } from './create_streaming_batched_function';
import { fetchStreaming as fetchStreamingReal } from '../streaming/fetch_streaming';
import { defer, of } from '../../../kibana_utils/public';
import { AbortError, defer, of } from '../../../kibana_utils/public';
import { Subject } from 'rxjs';

const getPromiseState = (promise: Promise<unknown>): Promise<'resolved' | 'rejected' | 'pending'> =>
Expand Down Expand Up @@ -168,6 +168,28 @@ describe('createStreamingBatchedFunction()', () => {
expect(fetchStreaming).toHaveBeenCalledTimes(1);
});

test('ignores a request with an aborted signal', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const abortController = new AbortController();
abortController.abort();

of(fn({ foo: 'bar' }, abortController.signal));
fn({ baz: 'quix' });

await new Promise((r) => setTimeout(r, 6));
const { body } = fetchStreaming.mock.calls[0][0];
expect(JSON.parse(body)).toEqual({
batch: [{ baz: 'quix' }],
});
});

test('sends POST request to correct endpoint with items in array batched sorted in call order', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
Expand Down Expand Up @@ -423,6 +445,73 @@ describe('createStreamingBatchedFunction()', () => {
expect(result3).toEqual({ b: '3' });
});

describe('when requests are aborted', () => {
test('aborts stream when all are aborted', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const abortController = new AbortController();
const promise = fn({ a: '1' }, abortController.signal);
const promise2 = fn({ a: '2' }, abortController.signal);
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(true);
expect(await isPending(promise2)).toBe(true);

abortController.abort();
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(false);
expect(await isPending(promise2)).toBe(false);
const [, error] = await of(promise);
const [, error2] = await of(promise2);
expect(error).toBeInstanceOf(AbortError);
expect(error2).toBeInstanceOf(AbortError);
expect(fetchStreaming.mock.calls[0][0].signal.aborted).toBeTruthy();
});

test('rejects promise on abort and lets others continue', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const abortController = new AbortController();
const promise = fn({ a: '1' }, abortController.signal);
const promise2 = fn({ a: '2' });
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(true);

abortController.abort();
await new Promise((r) => setTimeout(r, 6));

expect(await isPending(promise)).toBe(false);
const [, error] = await of(promise);
expect(error).toBeInstanceOf(AbortError);

stream.next(
JSON.stringify({
id: 1,
result: { b: '2' },
}) + '\n'
);

await new Promise((r) => setTimeout(r, 1));

const [result2] = await of(promise2);
expect(result2).toEqual({ b: '2' });
});
});

describe('when stream closes prematurely', () => {
test('rejects pending promises with CONNECTION error code', async () => {
const { fetchStreaming, stream } = setup();
Expand Down Expand Up @@ -558,5 +647,41 @@ describe('createStreamingBatchedFunction()', () => {
});
});
});

test('rejects with STREAM error on JSON parse error only pending promises', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});

const promise1 = of(fn({ a: '1' }));
const promise2 = of(fn({ a: '2' }));

await new Promise((r) => setTimeout(r, 6));

stream.next(
JSON.stringify({
id: 1,
result: { b: '1' },
}) + '\n'
);

stream.next('Not a JSON\n');

await new Promise((r) => setTimeout(r, 1));

const [, error1] = await promise1;
const [result1] = await promise2;
expect(error1).toMatchObject({
message: 'Unexpected token N in JSON at position 0',
code: 'STREAM',
});
expect(result1).toMatchObject({
b: '1',
});
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import { defer, Defer } from '../../../kibana_utils/public';
import { AbortError, abortSignalToPromise, defer } from '../../../kibana_utils/public';
import {
ItemBufferParams,
TimedItemBufferParams,
Expand All @@ -27,13 +27,7 @@ import {
} from '../../common';
import { fetchStreaming, split } from '../streaming';
import { normalizeError } from '../../common';

export interface BatchItem<Payload, Result> {
payload: Payload;
future: Defer<Result>;
}

export type BatchedFunc<Payload, Result> = (payload: Payload) => Promise<Result>;
import { BatchedFunc, BatchItem } from './types';

export interface BatchedFunctionProtocolError extends ErrorLike {
code: string;
Expand Down Expand Up @@ -82,43 +76,84 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
flushOnMaxItems = 25,
maxItemAge = 10,
} = params;
const [fn] = createBatchedFunction<BatchedFunc<Payload, Result>, BatchItem<Payload, Result>>({
onCall: (payload: Payload) => {
const [fn] = createBatchedFunction({
onCall: (payload: Payload, signal?: AbortSignal) => {
const future = defer<Result>();
const entry: BatchItem<Payload, Result> = {
payload,
future,
signal,
};
return [future.promise, entry];
},
onBatch: async (items) => {
try {
let responsesReceived = 0;
const batch = items.map(({ payload }) => payload);
// Filter out any items whose signal is already aborted
items = items.filter((item) => {
if (item.signal?.aborted) item.future.reject(new AbortError());
return !item.signal?.aborted;
});

const donePromises: Array<Promise<any>> = items.map((item) => {
return new Promise<void>((resolve) => {
const { promise: abortPromise, cleanup } = item.signal
? abortSignalToPromise(item.signal)
: {
promise: undefined,
cleanup: () => {},
};

const onDone = () => {
resolve();
cleanup();
};
if (abortPromise)
abortPromise.catch(() => {
item.future.reject(new AbortError());
onDone();
});
item.future.promise.then(onDone, onDone);
});
});

// abort when all items were either resolved, rejected or aborted
const abortController = new AbortController();
let isBatchDone = false;
Promise.all(donePromises).then(() => {
isBatchDone = true;
abortController.abort();
});
const batch = items.map((item) => item.payload);

const { stream } = fetchStreamingInjected({
url,
body: JSON.stringify({ batch }),
method: 'POST',
signal: abortController.signal,
});

const handleStreamError = (error: any) => {
const normalizedError = normalizeError<BatchedFunctionProtocolError>(error);
normalizedError.code = 'STREAM';
for (const { future } of items) future.reject(normalizedError);
};

stream.pipe(split('\n')).subscribe({
next: (json: string) => {
const response = JSON.parse(json) as BatchResponseItem<Result, ErrorLike>;
if (response.error) {
responsesReceived++;
items[response.id].future.reject(response.error);
} else if (response.result !== undefined) {
responsesReceived++;
items[response.id].future.resolve(response.result);
try {
const response = JSON.parse(json) as BatchResponseItem<Result, ErrorLike>;
if (response.error) {
items[response.id].future.reject(response.error);
} else if (response.result !== undefined) {
items[response.id].future.resolve(response.result);
}
} catch (e) {
handleStreamError(e);
}
},
error: (error) => {
const normalizedError = normalizeError<BatchedFunctionProtocolError>(error);
normalizedError.code = 'STREAM';
for (const { future } of items) future.reject(normalizedError);
},
error: handleStreamError,
complete: () => {
const streamTerminatedPrematurely = responsesReceived !== items.length;
if (streamTerminatedPrematurely) {
if (!isBatchDone) {
const error: BatchedFunctionProtocolError = {
message: 'Connection terminated prematurely.',
code: 'CONNECTION',
Expand Down
31 changes: 31 additions & 0 deletions src/plugins/bfetch/public/batching/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { Defer } from '../../../kibana_utils/public';

export interface BatchItem<Payload, Result> {
payload: Payload;
future: Defer<Result>;
signal?: AbortSignal;
}

export type BatchedFunc<Payload, Result> = (
payload: Payload,
signal?: AbortSignal
) => Promise<Result>;
2 changes: 2 additions & 0 deletions src/plugins/bfetch/public/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import { BfetchPublicPlugin } from './plugin';
export { BfetchPublicSetup, BfetchPublicStart, BfetchPublicContract } from './plugin';
export { split } from './streaming';

export { BatchedFunc } from './batching/types';

export function plugin(initializerContext: PluginInitializerContext) {
return new BfetchPublicPlugin(initializerContext);
}
2 changes: 1 addition & 1 deletion src/plugins/bfetch/public/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import { fetchStreaming as fetchStreamingStatic, FetchStreamingParams } from './
import { removeLeadingSlash } from '../common';
import {
createStreamingBatchedFunction,
BatchedFunc,
StreamingBatchedFunctionParams,
} from './batching/create_streaming_batched_function';
import { BatchedFunc } from './batching/types';

// eslint-disable-next-line
export interface BfetchPublicSetupDependencies {}
Expand Down
Loading