From e1af72daa73a12ee96035548f81fd364d5da3c4c Mon Sep 17 00:00:00 2001 From: Timothy Carambat Date: Thu, 30 Jan 2025 14:55:20 -0800 Subject: [PATCH] parse streams from `reasoning_content` from DeepSeek API (#3068) --- server/utils/AiProviders/deepseek/index.js | 172 ++++++++++++++++++++- 1 file changed, 164 insertions(+), 8 deletions(-) diff --git a/server/utils/AiProviders/deepseek/index.js b/server/utils/AiProviders/deepseek/index.js index 7bc804bb..694f8550 100644 --- a/server/utils/AiProviders/deepseek/index.js +++ b/server/utils/AiProviders/deepseek/index.js @@ -2,10 +2,9 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native"); const { LLMPerformanceMonitor, } = require("../../helpers/chat/LLMPerformanceMonitor"); -const { - handleDefaultStreamResponseV2, -} = require("../../helpers/chat/responses"); +const { v4: uuidv4 } = require("uuid"); const { MODEL_MAP } = require("../modelMap"); +const { writeResponseChunk, clientAbortedHandler } = require("../../helpers/chat/responses"); class DeepSeekLLM { constructor(embedder = null, modelPreference = null) { @@ -27,6 +26,11 @@ class DeepSeekLLM { this.embedder = embedder ?? new NativeEmbedder(); this.defaultTemp = 0.7; + this.log("Initialized with model:", this.model); + } + + log(text, ...args) { + console.log(`\x1b[36m[${this.constructor.name}]\x1b[0m ${text}`, ...args); } #appendContext(contextTexts = []) { @@ -71,6 +75,21 @@ class DeepSeekLLM { return [prompt, ...chatHistory, { role: "user", content: userPrompt }]; } + /** + * Parses and prepends reasoning from the response and returns the full text response. + * @param {Object} response + * @returns {string} + */ + #parseReasoningFromResponse({ message }) { + let textResponse = message?.content; + if ( + !!message?.reasoning_content && + message.reasoning_content.trim().length > 0 + ) + textResponse = `${message.reasoning_content}${textResponse}`; + return textResponse; + } + async getChatCompletion(messages = null, { temperature = 0.7 }) { if (!(await this.isValidChatCompletionModel(this.model))) throw new Error( @@ -90,13 +109,15 @@ class DeepSeekLLM { ); if ( - !result.output.hasOwnProperty("choices") || - result.output.choices.length === 0 + !result?.output?.hasOwnProperty("choices") || + result?.output?.choices?.length === 0 ) - return null; + throw new Error( + `Invalid response body returned from DeepSeek: ${JSON.stringify(result.output)}` + ); return { - textResponse: result.output.choices[0].message.content, + textResponse: this.#parseReasoningFromResponse(result.output.choices[0]), metrics: { prompt_tokens: result.output.usage.prompt_tokens || 0, completion_tokens: result.output.usage.completion_tokens || 0, @@ -127,8 +148,143 @@ class DeepSeekLLM { return measuredStreamRequest; } + // TODO: This is a copy of the generic handleStream function in responses.js + // to specifically handle the DeepSeek reasoning model `reasoning_content` field. + // When or if ever possible, we should refactor this to be in the generic function. handleStream(response, stream, responseProps) { - return handleDefaultStreamResponseV2(response, stream, responseProps); + const { uuid = uuidv4(), sources = [] } = responseProps; + let hasUsageMetrics = false; + let usage = { + completion_tokens: 0, + }; + + return new Promise(async (resolve) => { + let fullText = ""; + let reasoningText = ""; + + // Establish listener to early-abort a streaming response + // in case things go sideways or the user does not like the response. + // We preserve the generated text but continue as if chat was completed + // to preserve previously generated content. + const handleAbort = () => { + stream?.endMeasurement(usage); + clientAbortedHandler(resolve, fullText); + }; + response.on("close", handleAbort); + + try { + for await (const chunk of stream) { + const message = chunk?.choices?.[0]; + const token = message?.delta?.content; + const reasoningToken = message?.delta?.reasoning_content; + + if ( + chunk.hasOwnProperty("usage") && // exists + !!chunk.usage && // is not null + Object.values(chunk.usage).length > 0 // has values + ) { + if (chunk.usage.hasOwnProperty("prompt_tokens")) { + usage.prompt_tokens = Number(chunk.usage.prompt_tokens); + } + + if (chunk.usage.hasOwnProperty("completion_tokens")) { + hasUsageMetrics = true; // to stop estimating counter + usage.completion_tokens = Number(chunk.usage.completion_tokens); + } + } + + // Reasoning models will always return the reasoning text before the token text. + if (reasoningToken) { + // If the reasoning text is empty (''), we need to initialize it + // and send the first chunk of reasoning text. + if (reasoningText.length === 0) { + writeResponseChunk(response, { + uuid, + sources: [], + type: "textResponseChunk", + textResponse: `${reasoningToken}`, + close: false, + error: false, + }); + reasoningText += `${reasoningToken}`; + continue; + } else { + writeResponseChunk(response, { + uuid, + sources: [], + type: "textResponseChunk", + textResponse: reasoningToken, + close: false, + error: false, + }); + reasoningText += reasoningToken; + } + } + + // If the reasoning text is not empty, but the reasoning token is empty + // and the token text is not empty we need to close the reasoning text and begin sending the token text. + if (!!reasoningText && !reasoningToken && token) { + writeResponseChunk(response, { + uuid, + sources: [], + type: "textResponseChunk", + textResponse: ``, + close: false, + error: false, + }); + fullText += `${reasoningText}`; + reasoningText = ""; + } + + if (token) { + fullText += token; + // If we never saw a usage metric, we can estimate them by number of completion chunks + if (!hasUsageMetrics) usage.completion_tokens++; + writeResponseChunk(response, { + uuid, + sources: [], + type: "textResponseChunk", + textResponse: token, + close: false, + error: false, + }); + } + + // LocalAi returns '' and others return null on chunks - the last chunk is not "" or null. + // Either way, the key `finish_reason` must be present to determine ending chunk. + if ( + message?.hasOwnProperty("finish_reason") && // Got valid message and it is an object with finish_reason + message.finish_reason !== "" && + message.finish_reason !== null + ) { + writeResponseChunk(response, { + uuid, + sources, + type: "textResponseChunk", + textResponse: "", + close: true, + error: false, + }); + response.removeListener("close", handleAbort); + stream?.endMeasurement(usage); + resolve(fullText); + break; // Break streaming when a valid finish_reason is first encountered + } + } + } catch (e) { + console.log(`\x1b[43m\x1b[34m[STREAMING ERROR]\x1b[0m ${e.message}`); + writeResponseChunk(response, { + uuid, + type: "abort", + textResponse: null, + sources: [], + close: true, + error: e.message, + }); + stream?.endMeasurement(usage); + resolve(fullText); // Return what we currently have - if anything. + } + }); } async embedTextInput(textInput) {