diff --git a/server/utils/AiProviders/foundry/index.js b/server/utils/AiProviders/foundry/index.js
index b503e212..76145bf0 100644
--- a/server/utils/AiProviders/foundry/index.js
+++ b/server/utils/AiProviders/foundry/index.js
@@ -1,11 +1,14 @@
 const { NativeEmbedder } = require("../../EmbeddingEngines/native");
+const { v4: uuidv4 } = require("uuid");
+const {
+  formatChatHistory,
+  writeResponseChunk,
+  clientAbortedHandler,
+} = require("../../helpers/chat/responses");
 const {
   LLMPerformanceMonitor,
 } = require("../../helpers/chat/LLMPerformanceMonitor");
-const {
-  handleDefaultStreamResponseV2,
-  formatChatHistory,
-} = require("../../helpers/chat/responses");
+
 const { OpenAI: OpenAIApi } = require("openai");
 
 class FoundryLLM {
@@ -262,8 +265,176 @@ class FoundryLLM {
     return measuredStreamRequest;
   }
 
+  /**
+   * The timeout for the Foundry stream in milliseconds.
+   * This is because Foundry does not self-close the stream and so we need to timeout the stream after a certain amount of time.
+   * @returns {number}
+   */
+  get timeout() {
+    return 500;
+  }
+
+  /**
+   * Handles the default stream response for a chat.
+   * @param {import("express").Response} response
+   * @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream
+   * @param {Object} responseProps
+   * @returns {Promise}
+   */
   handleStream(response, stream, responseProps) {
-    return handleDefaultStreamResponseV2(response, stream, responseProps);
+    const timeoutThresholdMs = this.timeout;
+    const { uuid = uuidv4(), sources = [] } = responseProps;
+
+    return new Promise(async (resolve) => {
+      let fullText = "";
+      let reasoningText = "";
+      let lastChunkTime = null; // null when first token is still not received.
+
+      // Establish listener to early-abort a streaming response
+      // in case things go sideways or the user does not like the response.
+      // We preserve the generated text but continue as if chat was completed
+      // to preserve previously generated content.
+      const handleAbort = () => {
+        stream?.endMeasurement({
+          completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
+        });
+        clientAbortedHandler(resolve, fullText);
+      };
+      response.on("close", handleAbort);
+
+      // NOTICE: As of Foundry 0.8.119 the stream will never return a finish_reason
+      // nor will it self-close or send a final chunk. So we need to maintain an interval timer that if we go >=timeoutThresholdMs with
+      // no new chunks then we kill the stream and assume it to be complete.
+      const timeoutCheck = setInterval(() => {
+        if (lastChunkTime === null) return;
+
+        const now = Number(new Date());
+        const diffMs = now - lastChunkTime;
+
+        if (diffMs >= timeoutThresholdMs) {
+          console.log(
+            `Foundry stream did not self-close and has been stale for >${timeoutThresholdMs}ms. Closing response stream.`
+          );
+          writeResponseChunk(response, {
+            uuid,
+            sources,
+            type: "textResponseChunk",
+            textResponse: "",
+            close: true,
+            error: false,
+          });
+          clearInterval(timeoutCheck);
+          response.removeListener("close", handleAbort);
+          stream?.endMeasurement({
+            completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
+          });
+          resolve(fullText);
+        }
+      }, 500);
+
+      try {
+        for await (const chunk of stream) {
+          // console.log(JSON.stringify(chunk, null, 2));
+          const message = chunk?.choices?.[0];
+          const token = message?.delta?.content;
+          const reasoningToken = message?.delta?.reasoning;
+          lastChunkTime = Number(new Date());
+
+          // Reasoning models will always return the reasoning text before the token text.
+          // can be null or ''
+          if (reasoningToken) {
+            // If the reasoning text is empty (''), we need to initialize it
+            // and send the first chunk of reasoning text.
+            if (reasoningText.length === 0) {
+              writeResponseChunk(response, {
+                uuid,
+                sources: [],
+                type: "textResponseChunk",
+                textResponse: `<think>${reasoningToken}`,
+                close: false,
+                error: false,
+              });
+              reasoningText += `<think>${reasoningToken}`;
+              continue;
+            } else {
+              // If the reasoning text is not empty, we need to append the reasoning text
+              // to the existing reasoning text.
+              writeResponseChunk(response, {
+                uuid,
+                sources: [],
+                type: "textResponseChunk",
+                textResponse: reasoningToken,
+                close: false,
+                error: false,
+              });
+              reasoningText += reasoningToken;
+            }
+          }
+
+          // If the reasoning text is not empty, but the reasoning token is empty
+          // and the token text is not empty we need to close the reasoning text and begin sending the token text.
+          if (!!reasoningText && !reasoningToken && token) {
+            writeResponseChunk(response, {
+              uuid,
+              sources: [],
+              type: "textResponseChunk",
+              textResponse: `</think>`,
+              close: false,
+              error: false,
+            });
+            fullText += `${reasoningText}</think>`;
+            reasoningText = "";
+          }
+
+          if (token) {
+            fullText += token;
+            writeResponseChunk(response, {
+              uuid,
+              sources: [],
+              type: "textResponseChunk",
+              textResponse: token,
+              close: false,
+              error: false,
+            });
+          }
+
+          // finish_reason can be "stop", "length", etc. when complete
+          // Must check for truthy value since undefined !== null is true
+          if (message?.finish_reason) {
+            writeResponseChunk(response, {
+              uuid,
+              sources,
+              type: "textResponseChunk",
+              textResponse: "",
+              close: true,
+              error: false,
+            });
+            response.removeListener("close", handleAbort);
+            clearInterval(timeoutCheck);
+            stream?.endMeasurement({
+              completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
+            });
+            resolve(fullText);
+            return; // Exit the loop after resolving
+          }
+        }
+      } catch (e) {
+        writeResponseChunk(response, {
+          uuid,
+          sources,
+          type: "abort",
+          textResponse: null,
+          close: true,
+          error: e.message,
+        });
+        response.removeListener("close", handleAbort);
+        clearInterval(timeoutCheck);
+        stream?.endMeasurement({
+          completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
+        });
+        resolve(fullText);
+      }
+    });
   }
 
   // Simple wrapper for dynamic embedder & normalize interface for all LLM implementations