From e1f1b58c1c68a66b24bfb7de4868ae15074ac39b Mon Sep 17 00:00:00 2001 From: Baptiste Arnaud Date: Sat, 25 May 2024 19:59:34 +0200 Subject: [PATCH] :children_crossing: (dify) Improve error display when streaming --- .../api/integrations/openai/streamer/route.ts | 2 +- .../apiHandlers/getMessageStream.ts | 13 +- .../anthropic/actions/createChatMessage.tsx | 2 +- .../difyAi/actions/createChatMessage.ts | 162 ++++++++++-------- .../mistral/actions/createChatCompletion.ts | 4 +- .../actions/createChatCompletion.tsx | 5 +- .../blocks/openai/actions/askAssistant.tsx | 5 +- .../openai/actions/createChatCompletion.tsx | 5 +- .../actions/createChatCompletion.tsx | 5 +- packages/forge/core/types.ts | 5 +- 10 files changed, 122 insertions(+), 86 deletions(-) diff --git a/apps/viewer/src/app/api/integrations/openai/streamer/route.ts b/apps/viewer/src/app/api/integrations/openai/streamer/route.ts index a8bfe137c4..07ab21068d 100644 --- a/apps/viewer/src/app/api/integrations/openai/streamer/route.ts +++ b/apps/viewer/src/app/api/integrations/openai/streamer/route.ts @@ -153,7 +153,7 @@ export async function POST(req: Request) { // eslint-disable-next-line @typescript-eslint/no-unused-vars set: (_1: string, _2: unknown) => {}, } - const stream = await action.run.stream.run({ + const { stream } = await action.run.stream.run({ credentials: decryptedCredentials, options: block.options, variables, diff --git a/packages/bot-engine/apiHandlers/getMessageStream.ts b/packages/bot-engine/apiHandlers/getMessageStream.ts index a3edbaa3a2..cc2bba2eee 100644 --- a/packages/bot-engine/apiHandlers/getMessageStream.ts +++ b/packages/bot-engine/apiHandlers/getMessageStream.ts @@ -23,7 +23,14 @@ type Props = { messages: OpenAI.Chat.ChatCompletionMessage[] | undefined } -export const getMessageStream = async ({ sessionId, messages }: Props) => { +export const getMessageStream = async ({ + sessionId, + messages, +}: Props): Promise<{ + stream?: ReadableStream + status?: number + message?: string +}> => { const session = await getSession(sessionId) if (!session?.state || !session.state.currentBlockId) @@ -130,13 +137,15 @@ export const getMessageStream = async ({ sessionId, messages }: Props) => { }) }, } - const stream = await action.run.stream.run({ + const { stream, httpError } = await action.run.stream.run({ credentials: decryptedCredentials, options: deepParseVariables( session.state.typebotsQueue[0].typebot.variables )(block.options), variables, }) + if (httpError) return httpError + if (!stream) return { status: 500, message: 'Could not create stream' } return { stream } diff --git a/packages/forge/blocks/anthropic/actions/createChatMessage.tsx b/packages/forge/blocks/anthropic/actions/createChatMessage.tsx index 26c3666058..555d846b09 100644 --- a/packages/forge/blocks/anthropic/actions/createChatMessage.tsx +++ b/packages/forge/blocks/anthropic/actions/createChatMessage.tsx @@ -168,7 +168,7 @@ export const createChatMessage = createAction({ stream: true, }) - return AnthropicStream(response) + return { stream: AnthropicStream(response) } }, }, }, diff --git a/packages/forge/blocks/difyAi/actions/createChatMessage.ts b/packages/forge/blocks/difyAi/actions/createChatMessage.ts index d5d4d505bd..1c969cb0a3 100644 --- a/packages/forge/blocks/difyAi/actions/createChatMessage.ts +++ b/packages/forge/blocks/difyAi/actions/createChatMessage.ts @@ -3,7 +3,7 @@ import { isDefined, isEmpty, isNotEmpty } from '@typebot.io/lib' import { auth } from '../auth' import { defaultBaseUrl } from '../constants' import { Chunk } from '../types' -import ky from 'ky' +import ky, { HTTPError } from 'ky' import { deprecatedCreateChatMessageOptions } from '../deprecated' export const createChatMessage = createAction({ @@ -68,77 +68,94 @@ export const createChatMessage = createAction({ const existingDifyConversationId = conversationVariableId ? variables.get(conversationVariableId) : conversation_id - const response = await ky( - (apiEndpoint ?? defaultBaseUrl) + '/v1/chat-messages', - { - method: 'POST', - headers: { - Authorization: `Bearer ${apiKey}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - inputs: - inputs?.reduce((acc, { key, value }) => { - if (isEmpty(key) || isEmpty(value)) return acc - return { - ...acc, - [key]: value, - } - }, {}) ?? {}, - query, - response_mode: 'streaming', - conversation_id: existingDifyConversationId, - user, - files: [], - }), - } - ) - const reader = response.body?.getReader() + try { + const response = await ky( + (apiEndpoint ?? defaultBaseUrl) + '/v1/chat-messages', + { + method: 'POST', + headers: { + Authorization: `Bearer ${apiKey}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + inputs: + inputs?.reduce((acc, { key, value }) => { + if (isEmpty(key) || isEmpty(value)) return acc + return { + ...acc, + [key]: value, + } + }, {}) ?? {}, + query, + response_mode: 'streaming', + conversation_id: existingDifyConversationId, + user, + files: [], + }), + } + ) + const reader = response.body?.getReader() - if (!reader) return + if (!reader) return {} - return new ReadableStream({ - async start(controller) { - try { - await processDifyStream(reader, { - onDone: () => { - controller.close() - }, - onMessage: (message) => { - controller.enqueue( - new TextEncoder().encode('0:"' + message + '"\n') - ) - }, - onMessageEnd({ totalTokens, conversationId }) { - if ( - conversationVariableId && - isNotEmpty(conversationId) && - isEmpty(existingDifyConversationId?.toString()) - ) - variables.set(conversationVariableId, conversationId) + return { + stream: new ReadableStream({ + async start(controller) { + try { + await processDifyStream(reader, { + onDone: () => { + controller.close() + }, + onMessage: (message) => { + controller.enqueue( + new TextEncoder().encode( + '0:"' + message.replace(/"/g, '\\"') + '"\n' + ) + ) + }, + onMessageEnd({ totalTokens, conversationId }) { + if ( + conversationVariableId && + isNotEmpty(conversationId) && + isEmpty(existingDifyConversationId?.toString()) + ) + variables.set(conversationVariableId, conversationId) - if ((responseMapping?.length ?? 0) === 0) return - responseMapping?.forEach((mapping) => { - if (!mapping.variableId) return + if ((responseMapping?.length ?? 0) === 0) return + responseMapping?.forEach((mapping) => { + if (!mapping.variableId) return - if ( - mapping.item === 'Conversation ID' && - isNotEmpty(conversationId) && - isEmpty(existingDifyConversationId?.toString()) - ) - variables.set(mapping.variableId, conversationId) + if ( + mapping.item === 'Conversation ID' && + isNotEmpty(conversationId) && + isEmpty(existingDifyConversationId?.toString()) + ) + variables.set(mapping.variableId, conversationId) - if (mapping.item === 'Total Tokens') - variables.set(mapping.variableId, totalTokens) + if (mapping.item === 'Total Tokens') + variables.set(mapping.variableId, totalTokens) + }) + }, }) - }, - }) - } catch (e) { - console.error(e) - controller.error(e) // Properly closing the stream with an error + } catch (e) { + console.error(e) + controller.error(e) // Properly closing the stream with an error + } + }, + }), + } + } catch (err) { + if (err instanceof HTTPError) { + return { + httpError: { + status: err.response.status, + message: await err.response.text(), + }, } - }, - }) + } + console.error(err) + return {} + } }, }, server: async ({ @@ -243,12 +260,15 @@ export const createChatMessage = createAction({ if (item === 'Total Tokens') variables.set(mapping.variableId, totalTokens) }) - } catch (error) { - logs.add({ - status: 'error', - description: 'Failed to create chat message', - }) - console.error(error) + } catch (err) { + if (err instanceof HTTPError) { + return logs.add({ + status: 'error', + description: err.message, + details: await err.response.text(), + }) + } + console.error(err) } }, }, diff --git a/packages/forge/blocks/mistral/actions/createChatCompletion.ts b/packages/forge/blocks/mistral/actions/createChatCompletion.ts index 0f65b47561..a9327fb4ac 100644 --- a/packages/forge/blocks/mistral/actions/createChatCompletion.ts +++ b/packages/forge/blocks/mistral/actions/createChatCompletion.ts @@ -139,7 +139,7 @@ export const createChatCompletion = createAction({ (res) => res.item === 'Message content' || !res.item )?.variableId, run: async ({ credentials: { apiKey }, options, variables }) => { - if (!options.model) return + if (!options.model) return {} const model = createMistral({ apiKey, })(options.model) @@ -149,7 +149,7 @@ export const createChatCompletion = createAction({ messages: parseMessages({ options, variables }), }) - return response.toAIStream() + return { stream: response.toAIStream() } }, }, }, diff --git a/packages/forge/blocks/openRouter/actions/createChatCompletion.tsx b/packages/forge/blocks/openRouter/actions/createChatCompletion.tsx index 06cdb565d4..a23d5e8a34 100644 --- a/packages/forge/blocks/openRouter/actions/createChatCompletion.tsx +++ b/packages/forge/blocks/openRouter/actions/createChatCompletion.tsx @@ -62,11 +62,12 @@ export const createChatCompletion = createAction({ }), stream: { getStreamVariableId: getChatCompletionStreamVarId, - run: (params) => - runChatCompletionStream({ + run: async (params) => ({ + stream: await runChatCompletionStream({ ...params, config: { baseUrl: defaultOpenRouterOptions.baseUrl }, }), + }), }, }, }) diff --git a/packages/forge/blocks/openai/actions/askAssistant.tsx b/packages/forge/blocks/openai/actions/askAssistant.tsx index fbf1e4bbc3..a9756aedd8 100644 --- a/packages/forge/blocks/openai/actions/askAssistant.tsx +++ b/packages/forge/blocks/openai/actions/askAssistant.tsx @@ -142,8 +142,8 @@ export const askAssistant = createAction({ getStreamVariableId: ({ responseMapping }) => responseMapping?.find((m) => !m.item || m.item === 'Message') ?.variableId, - run: async ({ credentials, options, variables }) => - createAssistantStream({ + run: async ({ credentials, options, variables }) => ({ + stream: await createAssistantStream({ apiKey: credentials.apiKey, assistantId: options.assistantId, message: options.message, @@ -154,6 +154,7 @@ export const askAssistant = createAction({ functions: options.functions, responseMapping: options.responseMapping, }), + }), }, server: async ({ credentials: { apiKey }, diff --git a/packages/forge/blocks/openai/actions/createChatCompletion.tsx b/packages/forge/blocks/openai/actions/createChatCompletion.tsx index 90aa58168b..fdb0e413b1 100644 --- a/packages/forge/blocks/openai/actions/createChatCompletion.tsx +++ b/packages/forge/blocks/openai/actions/createChatCompletion.tsx @@ -86,14 +86,15 @@ export const createChatCompletion = createAction({ }), stream: { getStreamVariableId: getChatCompletionStreamVarId, - run: (params) => - runChatCompletionStream({ + run: async (params) => ({ + stream: await runChatCompletionStream({ ...params, config: { baseUrl: defaultOpenAIOptions.baseUrl, defaultModel: defaultOpenAIOptions.model, }, }), + }), }, }, }) diff --git a/packages/forge/blocks/togetherAi/actions/createChatCompletion.tsx b/packages/forge/blocks/togetherAi/actions/createChatCompletion.tsx index ff9191e350..f8375b981c 100644 --- a/packages/forge/blocks/togetherAi/actions/createChatCompletion.tsx +++ b/packages/forge/blocks/togetherAi/actions/createChatCompletion.tsx @@ -45,11 +45,12 @@ export const createChatCompletion = createAction({ }), stream: { getStreamVariableId: getChatCompletionStreamVarId, - run: (params) => - runChatCompletionStream({ + run: async (params) => ({ + stream: await runChatCompletionStream({ ...params, config: { baseUrl: defaultTogetherOptions.baseUrl }, }), + }), }, }, }) diff --git a/packages/forge/core/types.ts b/packages/forge/core/types.ts index 907f1ea6f7..325a94fcd4 100644 --- a/packages/forge/core/types.ts +++ b/packages/forge/core/types.ts @@ -64,7 +64,10 @@ export type ActionDefinition< credentials: CredentialsFromAuthDef options: z.infer & z.infer variables: VariableStore - }) => Promise | undefined> + }) => Promise<{ + stream?: ReadableStream + httpError?: { status: number; message: string } + }> } web?: { displayEmbedBubble?: {