This commit is contained in:
Timothy Carambat 2026-02-16 16:05:14 -08:00
parent 1ccf468158
commit 35af646a53

View File

@ -1,11 +1,14 @@
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const { v4: uuidv4 } = require("uuid");
const {
formatChatHistory,
writeResponseChunk,
clientAbortedHandler,
} = require("../../helpers/chat/responses");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const {
handleDefaultStreamResponseV2,
formatChatHistory,
} = require("../../helpers/chat/responses");
const { OpenAI: OpenAIApi } = require("openai");
class FoundryLLM {
@ -262,8 +265,176 @@ class FoundryLLM {
return measuredStreamRequest;
}
/**
* The timeout for the Foundry stream in milliseconds.
* This is because Foundry does not self-close the stream and so we need to timeout the stream after a certain amount of time.
* @returns {number}
*/
get timeout() {
return 500;
}
/**
* Handles the default stream response for a chat.
* @param {import("express").Response} response
* @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream
* @param {Object} responseProps
* @returns {Promise<string>}
*/
handleStream(response, stream, responseProps) {
return handleDefaultStreamResponseV2(response, stream, responseProps);
const timeoutThresholdMs = this.timeout;
const { uuid = uuidv4(), sources = [] } = responseProps;
return new Promise(async (resolve) => {
let fullText = "";
let reasoningText = "";
let lastChunkTime = null; // null when first token is still not received.
// Establish listener to early-abort a streaming response
// in case things go sideways or the user does not like the response.
// We preserve the generated text but continue as if chat was completed
// to preserve previously generated content.
const handleAbort = () => {
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
});
clientAbortedHandler(resolve, fullText);
};
response.on("close", handleAbort);
// NOTICE: As of Foundry 0.8.119 the stream will never return a finish_reason
// nor will it self-close or send a final chunk. So we need to maintain an interval timer that if we go >=timeoutThresholdMs with
// no new chunks then we kill the stream and assume it to be complete.
const timeoutCheck = setInterval(() => {
if (lastChunkTime === null) return;
const now = Number(new Date());
const diffMs = now - lastChunkTime;
if (diffMs >= timeoutThresholdMs) {
console.log(
`Foundry stream did not self-close and has been stale for >${timeoutThresholdMs}ms. Closing response stream.`
);
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
clearInterval(timeoutCheck);
response.removeListener("close", handleAbort);
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
});
resolve(fullText);
}
}, 500);
try {
for await (const chunk of stream) {
// console.log(JSON.stringify(chunk, null, 2));
const message = chunk?.choices?.[0];
const token = message?.delta?.content;
const reasoningToken = message?.delta?.reasoning;
lastChunkTime = Number(new Date());
// Reasoning models will always return the reasoning text before the token text.
// can be null or ''
if (reasoningToken) {
// If the reasoning text is empty (''), we need to initialize it
// and send the first chunk of reasoning text.
if (reasoningText.length === 0) {
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: `<think>${reasoningToken}`,
close: false,
error: false,
});
reasoningText += `<think>${reasoningToken}`;
continue;
} else {
// If the reasoning text is not empty, we need to append the reasoning text
// to the existing reasoning text.
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: reasoningToken,
close: false,
error: false,
});
reasoningText += reasoningToken;
}
}
// If the reasoning text is not empty, but the reasoning token is empty
// and the token text is not empty we need to close the reasoning text and begin sending the token text.
if (!!reasoningText && !reasoningToken && token) {
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: `</think>`,
close: false,
error: false,
});
fullText += `${reasoningText}</think>`;
reasoningText = "";
}
if (token) {
fullText += token;
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: token,
close: false,
error: false,
});
}
// finish_reason can be "stop", "length", etc. when complete
// Must check for truthy value since undefined !== null is true
if (message?.finish_reason) {
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
response.removeListener("close", handleAbort);
clearInterval(timeoutCheck);
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
});
resolve(fullText);
return; // Exit the loop after resolving
}
}
} catch (e) {
writeResponseChunk(response, {
uuid,
sources,
type: "abort",
textResponse: null,
close: true,
error: e.message,
});
response.removeListener("close", handleAbort);
clearInterval(timeoutCheck);
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
});
resolve(fullText);
}
});
}
// Simple wrapper for dynamic embedder & normalize interface for all LLM implementations