Skip to content

Commit

Permalink
馃毟 (dify) Improve error display when streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
baptisteArno committed May 25, 2024
1 parent e015385 commit e1f1b58
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ export async function POST(req: Request) {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
set: (_1: string, _2: unknown) => {},
}
const stream = await action.run.stream.run({
const { stream } = await action.run.stream.run({
credentials: decryptedCredentials,
options: block.options,
variables,
Expand Down
13 changes: 11 additions & 2 deletions packages/bot-engine/apiHandlers/getMessageStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@ type Props = {
messages: OpenAI.Chat.ChatCompletionMessage[] | undefined
}

export const getMessageStream = async ({ sessionId, messages }: Props) => {
export const getMessageStream = async ({
sessionId,
messages,
}: Props): Promise<{
stream?: ReadableStream<any>
status?: number
message?: string
}> => {
const session = await getSession(sessionId)

if (!session?.state || !session.state.currentBlockId)
Expand Down Expand Up @@ -130,13 +137,15 @@ export const getMessageStream = async ({ sessionId, messages }: Props) => {
})
},
}
const stream = await action.run.stream.run({
const { stream, httpError } = await action.run.stream.run({
credentials: decryptedCredentials,
options: deepParseVariables(
session.state.typebotsQueue[0].typebot.variables
)(block.options),
variables,
})
if (httpError) return httpError

if (!stream) return { status: 500, message: 'Could not create stream' }

return { stream }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ export const createChatMessage = createAction({
stream: true,
})

return AnthropicStream(response)
return { stream: AnthropicStream(response) }
},
},
},
Expand Down
162 changes: 91 additions & 71 deletions packages/forge/blocks/difyAi/actions/createChatMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { isDefined, isEmpty, isNotEmpty } from '@typebot.io/lib'
import { auth } from '../auth'
import { defaultBaseUrl } from '../constants'
import { Chunk } from '../types'
import ky from 'ky'
import ky, { HTTPError } from 'ky'
import { deprecatedCreateChatMessageOptions } from '../deprecated'

export const createChatMessage = createAction({
Expand Down Expand Up @@ -68,77 +68,94 @@ export const createChatMessage = createAction({
const existingDifyConversationId = conversationVariableId
? variables.get(conversationVariableId)
: conversation_id
const response = await ky(
(apiEndpoint ?? defaultBaseUrl) + '/v1/chat-messages',
{
method: 'POST',
headers: {
Authorization: `Bearer ${apiKey}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
inputs:
inputs?.reduce((acc, { key, value }) => {
if (isEmpty(key) || isEmpty(value)) return acc
return {
...acc,
[key]: value,
}
}, {}) ?? {},
query,
response_mode: 'streaming',
conversation_id: existingDifyConversationId,
user,
files: [],
}),
}
)
const reader = response.body?.getReader()
try {
const response = await ky(
(apiEndpoint ?? defaultBaseUrl) + '/v1/chat-messages',
{
method: 'POST',
headers: {
Authorization: `Bearer ${apiKey}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
inputs:
inputs?.reduce((acc, { key, value }) => {
if (isEmpty(key) || isEmpty(value)) return acc
return {
...acc,
[key]: value,
}
}, {}) ?? {},
query,
response_mode: 'streaming',
conversation_id: existingDifyConversationId,
user,
files: [],
}),
}
)
const reader = response.body?.getReader()

if (!reader) return
if (!reader) return {}

return new ReadableStream({
async start(controller) {
try {
await processDifyStream(reader, {
onDone: () => {
controller.close()
},
onMessage: (message) => {
controller.enqueue(
new TextEncoder().encode('0:"' + message + '"\n')
)
},
onMessageEnd({ totalTokens, conversationId }) {
if (
conversationVariableId &&
isNotEmpty(conversationId) &&
isEmpty(existingDifyConversationId?.toString())
)
variables.set(conversationVariableId, conversationId)
return {
stream: new ReadableStream({
async start(controller) {
try {
await processDifyStream(reader, {
onDone: () => {
controller.close()
},
onMessage: (message) => {
controller.enqueue(
new TextEncoder().encode(
'0:"' + message.replace(/"/g, '\\"') + '"\n'
)
)
},
onMessageEnd({ totalTokens, conversationId }) {
if (
conversationVariableId &&
isNotEmpty(conversationId) &&
isEmpty(existingDifyConversationId?.toString())
)
variables.set(conversationVariableId, conversationId)

if ((responseMapping?.length ?? 0) === 0) return
responseMapping?.forEach((mapping) => {
if (!mapping.variableId) return
if ((responseMapping?.length ?? 0) === 0) return
responseMapping?.forEach((mapping) => {
if (!mapping.variableId) return

if (
mapping.item === 'Conversation ID' &&
isNotEmpty(conversationId) &&
isEmpty(existingDifyConversationId?.toString())
)
variables.set(mapping.variableId, conversationId)
if (
mapping.item === 'Conversation ID' &&
isNotEmpty(conversationId) &&
isEmpty(existingDifyConversationId?.toString())
)
variables.set(mapping.variableId, conversationId)

if (mapping.item === 'Total Tokens')
variables.set(mapping.variableId, totalTokens)
if (mapping.item === 'Total Tokens')
variables.set(mapping.variableId, totalTokens)
})
},
})
},
})
} catch (e) {
console.error(e)
controller.error(e) // Properly closing the stream with an error
} catch (e) {
console.error(e)
controller.error(e) // Properly closing the stream with an error
}
},
}),
}
} catch (err) {
if (err instanceof HTTPError) {
return {
httpError: {
status: err.response.status,
message: await err.response.text(),
},
}
},
})
}
console.error(err)
return {}
}
},
},
server: async ({
Expand Down Expand Up @@ -243,12 +260,15 @@ export const createChatMessage = createAction({
if (item === 'Total Tokens')
variables.set(mapping.variableId, totalTokens)
})
} catch (error) {
logs.add({
status: 'error',
description: 'Failed to create chat message',
})
console.error(error)
} catch (err) {
if (err instanceof HTTPError) {
return logs.add({
status: 'error',
description: err.message,
details: await err.response.text(),
})
}
console.error(err)
}
},
},
Expand Down
4 changes: 2 additions & 2 deletions packages/forge/blocks/mistral/actions/createChatCompletion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ export const createChatCompletion = createAction({
(res) => res.item === 'Message content' || !res.item
)?.variableId,
run: async ({ credentials: { apiKey }, options, variables }) => {
if (!options.model) return
if (!options.model) return {}
const model = createMistral({
apiKey,
})(options.model)
Expand All @@ -149,7 +149,7 @@ export const createChatCompletion = createAction({
messages: parseMessages({ options, variables }),
})

return response.toAIStream()
return { stream: response.toAIStream() }
},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ export const createChatCompletion = createAction({
}),
stream: {
getStreamVariableId: getChatCompletionStreamVarId,
run: (params) =>
runChatCompletionStream({
run: async (params) => ({
stream: await runChatCompletionStream({
...params,
config: { baseUrl: defaultOpenRouterOptions.baseUrl },
}),
}),
},
},
})
5 changes: 3 additions & 2 deletions packages/forge/blocks/openai/actions/askAssistant.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ export const askAssistant = createAction({
getStreamVariableId: ({ responseMapping }) =>
responseMapping?.find((m) => !m.item || m.item === 'Message')
?.variableId,
run: async ({ credentials, options, variables }) =>
createAssistantStream({
run: async ({ credentials, options, variables }) => ({
stream: await createAssistantStream({
apiKey: credentials.apiKey,
assistantId: options.assistantId,
message: options.message,
Expand All @@ -154,6 +154,7 @@ export const askAssistant = createAction({
functions: options.functions,
responseMapping: options.responseMapping,
}),
}),
},
server: async ({
credentials: { apiKey },
Expand Down
5 changes: 3 additions & 2 deletions packages/forge/blocks/openai/actions/createChatCompletion.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,15 @@ export const createChatCompletion = createAction({
}),
stream: {
getStreamVariableId: getChatCompletionStreamVarId,
run: (params) =>
runChatCompletionStream({
run: async (params) => ({
stream: await runChatCompletionStream({
...params,
config: {
baseUrl: defaultOpenAIOptions.baseUrl,
defaultModel: defaultOpenAIOptions.model,
},
}),
}),
},
},
})
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ export const createChatCompletion = createAction({
}),
stream: {
getStreamVariableId: getChatCompletionStreamVarId,
run: (params) =>
runChatCompletionStream({
run: async (params) => ({
stream: await runChatCompletionStream({
...params,
config: { baseUrl: defaultTogetherOptions.baseUrl },
}),
}),
},
},
})
5 changes: 4 additions & 1 deletion packages/forge/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ export type ActionDefinition<
credentials: CredentialsFromAuthDef<A>
options: z.infer<BaseOptions> & z.infer<Options>
variables: VariableStore
}) => Promise<ReadableStream<any> | undefined>
}) => Promise<{
stream?: ReadableStream<any>
httpError?: { status: number; message: string }
}>
}
web?: {
displayEmbedBubble?: {
Expand Down

0 comments on commit e1f1b58

Please sign in to comment.