merlyn/server/utils/chats/stream.js
const { v4: uuidv4 } = require("uuid");
const { DocumentManager } = require("../DocumentManager");
const { WorkspaceChats } = require("../../models/workspaceChats");
const { WorkspaceParsedFiles } = require("../../models/workspaceParsedFiles");
const { getVectorDbClass, getLLMProvider } = require("../helpers");
const { writeResponseChunk } = require("../helpers/chat/responses");
const { grepAgents } = require("./agents");
const {
  grepCommand,
  VALID_COMMANDS,
  chatPrompt,
  recentChatHistory,
  sourceIdentifier,
} = require("./index");
const VALID_CHAT_MODE = ["chat", "query"];
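
/**
 * Stream a chat response for a workspace over the open HTTP response, handling
 * slash commands, agent chats, query-mode refusals, pinned documents, parsed
 * file injection, vector search, and persistence of the finished exchange.
 * @param {Object} response - Open HTTP response chunks are written to (assumed to be an Express response).
 * @param {Object} workspace - Workspace record the chat belongs to.
 * @param {string} message - Raw user prompt.
 * @param {"chat"|"query"} chatMode - One of VALID_CHAT_MODE.
 * @param {Object|null} user - Requesting user, when available.
 * @param {Object|null} thread - Thread the chat belongs to, if any.
 * @param {Object[]} attachments - Inline attachments forwarded with the prompt.
 * @example
 * // Illustrative sketch only - the route path and lookups here are assumptions, not part of this file.
 * app.post("/workspace/:slug/stream-chat", async (request, response) => {
 *   const workspace = await Workspace.get({ slug: request.params.slug });
 *   await streamChatWithWorkspace(response, workspace, request.body.message, "chat");
 * });
 */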
async function streamChatWithWorkspace(
  response,
  workspace,
  message,
  chatMode = "chat",
  user = null,
  thread = null,
  attachments = []
) {
  const uuid = uuidv4();
  const updatedMessage = await grepCommand(message, user);
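
  // Messages matching a registered slash command are handled by that command's
  // function directly and the normal completion flow is skipped.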
  if (Object.keys(VALID_COMMANDS).includes(updatedMessage)) {
    const data = await VALID_COMMANDS[updatedMessage](
      workspace,
      message,
      uuid,
      user,
      thread
    );
    writeResponseChunk(response, data);
    return;
  }

  // If this is an agent-enabled chat we will exit this flow early.
  const isAgentChat = await grepAgents({
    uuid,
    response,
    message: updatedMessage,
    user,
    workspace,
    thread,
  });
  if (isAgentChat) return;
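
  // Resolve the LLM connector for this workspace's chat provider/model and the
  // configured vector database client before gathering context.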
  const LLMConnector = getLLMProvider({
    provider: workspace?.chatProvider,
    model: workspace?.chatModel,
  });
  const VectorDb = getVectorDbClass();
  const messageLimit = workspace?.openAiHistory || 20;
  const hasVectorizedSpace = await VectorDb.hasNamespace(workspace.slug);
  const embeddingsCount = await VectorDb.namespaceCount(workspace.slug);

  // User is trying to query-mode chat a workspace that has no data in it - so
  // we should exit early as no information can be found under these conditions.
  if ((!hasVectorizedSpace || embeddingsCount === 0) && chatMode === "query") {
    const textResponse =
      workspace?.queryRefusalResponse ??
      "There is no relevant information in this workspace to answer your query.";
    writeResponseChunk(response, {
      id: uuid,
      type: "textResponse",
      textResponse,
      sources: [],
      attachments,
      close: true,
      error: null,
    });
    await WorkspaceChats.new({
      workspaceId: workspace.id,
      prompt: message,
      response: {
        text: textResponse,
        sources: [],
        type: chatMode,
        attachments,
      },
      threadId: thread?.id || null,
      include: false,
      user,
    });
    return;
  }

  // If we are here we know that we are in a workspace that is:
  // 1. Chatting in "chat" mode and may or may _not_ have embeddings
  // 2. Chatting in "query" mode and has at least 1 embedding
  let completeText;
  let metrics = {};
  let contextTexts = [];
  let sources = [];
  let pinnedDocIdentifiers = [];
  const { rawHistory, chatHistory } = await recentChatHistory({
    user,
    workspace,
    thread,
    messageLimit,
  });

  // Look for pinned documents and see if the user decided to use this feature. We will also do a vector search
  // as pinning is a supplemental tool but it should be used with caution since it can easily blow up a context window.
  // However we limit the maximum of appended context to 80% of its overall size, mostly because if it expands beyond this
  // it will undergo prompt compression anyway to make it work. If there is so much pinned that the context here is bigger than
  // what the model can support - it would get compressed anyway and that really is not the point of pinning. It is really best
  // suited for high-context models.
  await new DocumentManager({
    workspace,
    maxTokens: LLMConnector.promptWindowLimit(),
  })
    .pinnedDocs()
    .then((pinnedDocs) => {
      pinnedDocs.forEach((doc) => {
        const { pageContent, ...metadata } = doc;
        pinnedDocIdentifiers.push(sourceIdentifier(doc));
        contextTexts.push(doc.pageContent);
        sources.push({
          text:
            pageContent.slice(0, 1_000) +
            "...continued on in source document...",
          ...metadata,
        });
      });
    });

  // Inject any parsed files for this workspace/thread/user
  const parsedFiles = await WorkspaceParsedFiles.getContextFiles(
    workspace,
    thread || null,
    user || null
  );
  parsedFiles.forEach((doc) => {
    const { pageContent, ...metadata } = doc;
    contextTexts.push(doc.pageContent);
    sources.push({
      text:
        pageContent.slice(0, 1_000) + "...continued on in source document...",
      ...metadata,
    });
  });
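
  // Similarity search over the workspace namespace, skipped entirely when nothing
  // has been embedded. Pinned documents are passed as filterIdentifiers so they
  // are not returned and injected a second time.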
  const vectorSearchResults =
    embeddingsCount !== 0
      ? await VectorDb.performSimilaritySearch({
          namespace: workspace.slug,
          input: updatedMessage,
          LLMConnector,
          similarityThreshold: workspace?.similarityThreshold,
          topN: workspace?.topN,
          filterIdentifiers: pinnedDocIdentifiers,
          rerank: workspace?.vectorSearchMode === "rerank",
        })
      : {
          contextTexts: [],
          sources: [],
          message: null,
        };
  // Abort if the similarity search was run at all and it failed.
  if (!!vectorSearchResults.message) {
    writeResponseChunk(response, {
      id: uuid,
      type: "abort",
      textResponse: null,
      sources: [],
      close: true,
      error: vectorSearchResults.message,
    });
    return;
  }
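
  // Backfill the source window (up to topN documents) with sources cited in recent
  // chat history so follow-up questions keep their supporting context.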
  const { fillSourceWindow } = require("../helpers/chat");
  const filledSources = fillSourceWindow({
    nDocs: workspace?.topN || 4,
    searchResults: vectorSearchResults.sources,
    history: rawHistory,
    filterIdentifiers: pinnedDocIdentifiers,
  });
  // Why does contextTexts get all the info, but sources only get current search?
  // This is to give the LLM the ability to "comprehend" a contextual response without
  // populating the Citations under a response with documents the user "thinks" are irrelevant
  // due to how we manage backfilling of the context to keep chats with the LLM more correct in responses.
  // If a past citation was used to answer the question - that is visible in the history so it logically makes sense
  // and does not appear to the user that a new response used information that is otherwise irrelevant for a given prompt.
  // TLDR; reduces GitHub issues for "LLM citing document that has no answer in it" while keeping answers highly accurate.
  contextTexts = [...contextTexts, ...filledSources.contextTexts];
  sources = [...sources, ...vectorSearchResults.sources];
  // If in query mode and no context chunks are found from search, backfill, or pins - do not
  // let the LLM try to hallucinate a response or use general knowledge, and exit early.
  if (chatMode === "query" && contextTexts.length === 0) {
    const textResponse =
      workspace?.queryRefusalResponse ??
      "There is no relevant information in this workspace to answer your query.";
    writeResponseChunk(response, {
      id: uuid,
      type: "textResponse",
      textResponse,
      sources: [],
      close: true,
      error: null,
    });
    await WorkspaceChats.new({
      workspaceId: workspace.id,
      prompt: message,
      response: {
        text: textResponse,
        sources: [],
        type: chatMode,
        attachments,
      },
      threadId: thread?.id || null,
      include: false,
      user,
    });
    return;
  }
  // Compress & assemble the messages to ensure the prompt passes the token limit with room
  // for the response, and build system messages based on inputs and history.
  const messages = await LLMConnector.compressMessages(
    {
      systemPrompt: await chatPrompt(workspace, user),
      userPrompt: updatedMessage,
      contextTexts,
      chatHistory,
      attachments,
    },
    rawHistory
  );
  // If streaming is not explicitly enabled for the connector we wait for the full
  // response and send it back as a single chunk.
  if (LLMConnector.streamingEnabled() !== true) {
    console.log(
      `\x1b[31m[STREAMING DISABLED]\x1b[0m Streaming is not available for ${LLMConnector.constructor.name}. Will use regular chat method.`
    );
    const { textResponse, metrics: performanceMetrics } =
      await LLMConnector.getChatCompletion(messages, {
        temperature: workspace?.openAiTemp ?? LLMConnector.defaultTemp,
      });
    completeText = textResponse;
    metrics = performanceMetrics;
    writeResponseChunk(response, {
      uuid,
      sources,
      type: "textResponseChunk",
      textResponse: completeText,
      close: true,
      error: false,
      metrics,
    });
  } else {
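    // Stream tokens to the client as they arrive; handleStream returns the fully
    // accumulated text so the exchange can be persisted below.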
    const stream = await LLMConnector.streamGetChatCompletion(messages, {
      temperature: workspace?.openAiTemp ?? LLMConnector.defaultTemp,
    });
    completeText = await LLMConnector.handleStream(response, stream, {
      uuid,
      sources,
    });
    metrics = stream.metrics;
  }
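
  // Persist the exchange when any text was produced, then emit the final chunk
  // carrying the stored chat id so the client can reference this message later.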
  if (completeText?.length > 0) {
    const { chat } = await WorkspaceChats.new({
      workspaceId: workspace.id,
      prompt: message,
      response: {
        text: completeText,
        sources,
        type: chatMode,
        attachments,
        metrics,
      },
      threadId: thread?.id || null,
      user,
    });
    writeResponseChunk(response, {
      uuid,
      type: "finalizeResponseStream",
      close: true,
      error: false,
      chatId: chat.id,
      metrics,
    });
    return;
  }

  writeResponseChunk(response, {
    uuid,
    type: "finalizeResponseStream",
    close: true,
    error: false,
    metrics,
  });
  return;
}

module.exports = {
  VALID_CHAT_MODE,
  streamChatWithWorkspace,
};