From 0fb33736dae19b78c3a7f5827e2f8abc3f61c35f Mon Sep 17 00:00:00 2001 From: Timothy Carambat Date: Mon, 11 Aug 2025 09:26:19 -0700 Subject: [PATCH] Workspace Chat with documents overhaul (#4261) * Create parse endpoint in collector (#4212) * create parse endpoint in collector * revert cleanup temp util call * lint * remove unused cleanupTempDocuments function * revert slug change minor change for destinations --------- Co-authored-by: timothycarambat * Add parsed files table and parse server endpoints (#4222) * add workspace_parsed_files table + parse endpoints/models * remove dev api parse endpoint * remove unneeded imports * iterate over all files + remove unneeded update function + update telemetry debounce * Upload UI/UX context window check + frontend alert (#4230) * prompt user to embed if exceeds prompt window + handle embed + handle cancel * add tokenCountEstimate to workspace_parsed_files + optimizations * use util for path locations + use safeJsonParse * add modal for user decision on overflow of context window * lint * dynamic fetching of provider/model combo + inject parsed documents * remove unneeded comments * popup ui for attaching/removing files + warning to embed + wip fetching states on update * remove prop drilling, fetch files/limits directly in attach files popup * rework ux of FE + BE optimizations * fix ux of FE + BE optimizations * Implement bidirectional sync for parsed file states linting small changes and comments * move parse support to another endpoint file simplify calls and loading of records * button borders * enable default users to upload parsed files but NOT embed * delete cascade on user/workspace/thread deletion to remove parsedFileRecord * enable bgworker with "always" jobs and optional document sync jobs orphan document job: Will find any broken reference files to prevent overpollution of the storage folder. This will run 10s after boot and every 12hr after * change run timeout for orphan job to 1m to allow settling before spawning a worker * linting and cleanup pr --------- Co-authored-by: Timothy Carambat * dev build * fix tooltip hiding during embedding overflow files * prevent crash log from ERRNO on parse files * unused import * update docs link * Migrate parsed-files to GET endpoint patch logic for grabbing models names from utils better handling for undetermined context windows (null instead of Pos_INIFI) UI placeholder for null context windows * patch URL --------- Co-authored-by: Sean Hatfield --- .github/workflows/dev-build.yaml | 2 +- collector/index.js | 33 +++ .../processSingleFile/convert/asAudio.js | 1 + collector/processSingleFile/convert/asDocx.js | 3 +- collector/processSingleFile/convert/asEPub.js | 3 +- .../processSingleFile/convert/asImage.js | 1 + collector/processSingleFile/convert/asMbox.js | 1 + .../processSingleFile/convert/asOfficeMime.js | 1 + .../processSingleFile/convert/asPDF/index.js | 1 + collector/processSingleFile/convert/asTxt.js | 3 +- collector/processSingleFile/convert/asXlsx.js | 8 +- collector/utils/files/index.js | 18 +- .../ChatContainer/ChatTooltips/index.jsx | 6 + .../FileUploadWarningModal/index.jsx | 106 +++++++ .../ChatContainer/DnDWrapper/index.jsx | 270 +++++++++++++++--- .../AttachItem/ParsedFilesMenu/index.jsx | 197 +++++++++++++ .../PromptInput/AttachItem/index.jsx | 132 +++++++-- .../PromptInput/Attachments/index.jsx | 8 +- .../src/components/WorkspaceChat/index.jsx | 2 +- frontend/src/models/workspace.js | 51 +++- frontend/src/utils/paths.js | 7 + server/.gitignore | 1 + server/endpoints/workspaces.js | 6 +- server/endpoints/workspacesParsedFiles.js | 199 +++++++++++++ server/jobs/cleanup-orphan-documents.js | 64 +++++ server/models/telemetry.js | 3 +- server/models/workspace.js | 49 +++- server/models/workspaceParsedFiles.js | 227 +++++++++++++++ .../20250808171557_init/migration.sql | 23 ++ server/prisma/schema.prisma | 38 ++- server/utils/BackgroundWorkers/index.js | 42 ++- server/utils/chats/stream.js | 17 ++ server/utils/collectorApi/index.js | 36 +++ server/utils/files/index.js | 5 + server/utils/helpers/index.js | 8 +- 35 files changed, 1471 insertions(+), 101 deletions(-) create mode 100644 frontend/src/components/WorkspaceChat/ChatContainer/DnDWrapper/FileUploadWarningModal/index.jsx create mode 100644 frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/AttachItem/ParsedFilesMenu/index.jsx create mode 100644 server/endpoints/workspacesParsedFiles.js create mode 100644 server/jobs/cleanup-orphan-documents.js create mode 100644 server/models/workspaceParsedFiles.js create mode 100644 server/prisma/migrations/20250808171557_init/migration.sql diff --git a/.github/workflows/dev-build.yaml b/.github/workflows/dev-build.yaml index c4a2ccc6..f9aca9fe 100644 --- a/.github/workflows/dev-build.yaml +++ b/.github/workflows/dev-build.yaml @@ -6,7 +6,7 @@ concurrency: on: push: - branches: ['mobile-support'] # put your current branch to create a build. Core team only. + branches: ['upload-ui-ux'] # put your current branch to create a build. Core team only. paths-ignore: - '**.md' - 'cloud-deployments/*' diff --git a/collector/index.js b/collector/index.js index 73091efc..962ee6f8 100644 --- a/collector/index.js +++ b/collector/index.js @@ -58,6 +58,39 @@ app.post( } ); +app.post( + "/parse", + [verifyPayloadIntegrity], + async function (request, response) { + const { filename, options = {} } = reqBody(request); + try { + const targetFilename = path + .normalize(filename) + .replace(/^(\.\.(\/|\\|$))+/, ""); + const { + success, + reason, + documents = [], + } = await processSingleFile(targetFilename, { + ...options, + parseOnly: true, + }); + response + .status(200) + .json({ filename: targetFilename, success, reason, documents }); + } catch (e) { + console.error(e); + response.status(200).json({ + filename: filename, + success: false, + reason: "A processing error occurred.", + documents: [], + }); + } + return; + } +); + app.post( "/process-link", [verifyPayloadIntegrity], diff --git a/collector/processSingleFile/convert/asAudio.js b/collector/processSingleFile/convert/asAudio.js index 8b179a52..dccb2b95 100644 --- a/collector/processSingleFile/convert/asAudio.js +++ b/collector/processSingleFile/convert/asAudio.js @@ -62,6 +62,7 @@ async function asAudio({ fullFilePath = "", filename = "", options = {} }) { const document = writeToServerDocuments({ data, filename: `${slugify(filename)}-${data.id}`, + options: { parseOnly: options.parseOnly }, }); trashFile(fullFilePath); console.log( diff --git a/collector/processSingleFile/convert/asDocx.js b/collector/processSingleFile/convert/asDocx.js index 2dfad739..6e2133b1 100644 --- a/collector/processSingleFile/convert/asDocx.js +++ b/collector/processSingleFile/convert/asDocx.js @@ -8,7 +8,7 @@ const { const { tokenizeString } = require("../../utils/tokenizer"); const { default: slugify } = require("slugify"); -async function asDocX({ fullFilePath = "", filename = "" }) { +async function asDocX({ fullFilePath = "", filename = "", options = {} }) { const loader = new DocxLoader(fullFilePath); console.log(`-- Working ${filename} --`); @@ -48,6 +48,7 @@ async function asDocX({ fullFilePath = "", filename = "" }) { const document = writeToServerDocuments({ data, filename: `${slugify(filename)}-${data.id}`, + options: { parseOnly: options.parseOnly }, }); trashFile(fullFilePath); console.log(`[SUCCESS]: ${filename} converted & ready for embedding.\n`); diff --git a/collector/processSingleFile/convert/asEPub.js b/collector/processSingleFile/convert/asEPub.js index 3b7f7295..15a01b23 100644 --- a/collector/processSingleFile/convert/asEPub.js +++ b/collector/processSingleFile/convert/asEPub.js @@ -8,7 +8,7 @@ const { } = require("../../utils/files"); const { default: slugify } = require("slugify"); -async function asEPub({ fullFilePath = "", filename = "" }) { +async function asEPub({ fullFilePath = "", filename = "", options = {} }) { let content = ""; try { const loader = new EPubLoader(fullFilePath, { splitChapters: false }); @@ -46,6 +46,7 @@ async function asEPub({ fullFilePath = "", filename = "" }) { const document = writeToServerDocuments({ data, filename: `${slugify(filename)}-${data.id}`, + options: { parseOnly: options.parseOnly }, }); trashFile(fullFilePath); console.log(`[SUCCESS]: ${filename} converted & ready for embedding.\n`); diff --git a/collector/processSingleFile/convert/asImage.js b/collector/processSingleFile/convert/asImage.js index 8bf8adf7..05eff2d5 100644 --- a/collector/processSingleFile/convert/asImage.js +++ b/collector/processSingleFile/convert/asImage.js @@ -41,6 +41,7 @@ async function asImage({ fullFilePath = "", filename = "", options = {} }) { const document = writeToServerDocuments({ data, filename: `${slugify(filename)}-${data.id}`, + options: { parseOnly: options.parseOnly }, }); trashFile(fullFilePath); console.log(`[SUCCESS]: ${filename} converted & ready for embedding.\n`); diff --git a/collector/processSingleFile/convert/asMbox.js b/collector/processSingleFile/convert/asMbox.js index 74427046..e5a3a98d 100644 --- a/collector/processSingleFile/convert/asMbox.js +++ b/collector/processSingleFile/convert/asMbox.js @@ -60,6 +60,7 @@ async function asMbox({ fullFilePath = "", filename = "" }) { const document = writeToServerDocuments({ data, filename: `${slugify(filename)}-${data.id}-msg-${item}`, + options: { parseOnly: options.parseOnly }, }); documents.push(document); } diff --git a/collector/processSingleFile/convert/asOfficeMime.js b/collector/processSingleFile/convert/asOfficeMime.js index 66a13588..ac8ae31e 100644 --- a/collector/processSingleFile/convert/asOfficeMime.js +++ b/collector/processSingleFile/convert/asOfficeMime.js @@ -44,6 +44,7 @@ async function asOfficeMime({ fullFilePath = "", filename = "" }) { const document = writeToServerDocuments({ data, filename: `${slugify(filename)}-${data.id}`, + options: { parseOnly: options.parseOnly }, }); trashFile(fullFilePath); console.log(`[SUCCESS]: ${filename} converted & ready for embedding.\n`); diff --git a/collector/processSingleFile/convert/asPDF/index.js b/collector/processSingleFile/convert/asPDF/index.js index db66c766..5971dd4e 100644 --- a/collector/processSingleFile/convert/asPDF/index.js +++ b/collector/processSingleFile/convert/asPDF/index.js @@ -65,6 +65,7 @@ async function asPdf({ fullFilePath = "", filename = "", options = {} }) { const document = writeToServerDocuments({ data, filename: `${slugify(filename)}-${data.id}`, + options: { parseOnly: options.parseOnly }, }); trashFile(fullFilePath); console.log(`[SUCCESS]: ${filename} converted & ready for embedding.\n`); diff --git a/collector/processSingleFile/convert/asTxt.js b/collector/processSingleFile/convert/asTxt.js index 1abe4953..7e3bd92c 100644 --- a/collector/processSingleFile/convert/asTxt.js +++ b/collector/processSingleFile/convert/asTxt.js @@ -8,7 +8,7 @@ const { } = require("../../utils/files"); const { default: slugify } = require("slugify"); -async function asTxt({ fullFilePath = "", filename = "" }) { +async function asTxt({ fullFilePath = "", filename = "", options = {} }) { let content = ""; try { content = fs.readFileSync(fullFilePath, "utf8"); @@ -44,6 +44,7 @@ async function asTxt({ fullFilePath = "", filename = "" }) { const document = writeToServerDocuments({ data, filename: `${slugify(filename)}-${data.id}`, + options: { parseOnly: options.parseOnly }, }); trashFile(fullFilePath); console.log(`[SUCCESS]: ${filename} converted & ready for embedding.\n`); diff --git a/collector/processSingleFile/convert/asXlsx.js b/collector/processSingleFile/convert/asXlsx.js index a64e8e20..832e44a7 100644 --- a/collector/processSingleFile/convert/asXlsx.js +++ b/collector/processSingleFile/convert/asXlsx.js @@ -7,6 +7,7 @@ const { trashFile, writeToServerDocuments, documentsFolder, + directUploadsFolder, } = require("../../utils/files"); const { tokenizeString } = require("../../utils/tokenizer"); const { default: slugify } = require("slugify"); @@ -26,14 +27,16 @@ function convertToCSV(data) { .join("\n"); } -async function asXlsx({ fullFilePath = "", filename = "" }) { +async function asXlsx({ fullFilePath = "", filename = "", options = {} }) { const documents = []; const folderName = slugify(`${path.basename(filename)}-${v4().slice(0, 4)}`, { lower: true, trim: true, }); + const outFolderPath = options.parseOnly + ? path.resolve(directUploadsFolder, folderName) + : path.resolve(documentsFolder, folderName); - const outFolderPath = path.resolve(documentsFolder, folderName); try { const workSheetsFromFile = xlsx.parse(fullFilePath); if (!fs.existsSync(outFolderPath)) @@ -68,6 +71,7 @@ async function asXlsx({ fullFilePath = "", filename = "" }) { data: sheetData, filename: `sheet-${slugify(name)}`, destinationOverride: outFolderPath, + options: { parseOnly: options.parseOnly }, }); documents.push(document); console.log( diff --git a/collector/utils/files/index.js b/collector/utils/files/index.js index f1b9d93e..dd0f9315 100644 --- a/collector/utils/files/index.js +++ b/collector/utils/files/index.js @@ -11,6 +11,16 @@ const documentsFolder = ? path.resolve(__dirname, `../../../server/storage/documents`) : path.resolve(process.env.STORAGE_DIR, `documents`); +/** + * The folder where direct uploads are stored to be stored when + * processed by the collector. These are files that were DnD'd into UI + * and are not to be embedded or selectable from the file picker. + */ +const directUploadsFolder = + process.env.NODE_ENV === "development" + ? path.resolve(__dirname, `../../../server/storage/direct-uploads`) + : path.resolve(process.env.STORAGE_DIR, `direct-uploads`); + /** * Checks if a file is text by checking the mime type and then falling back to buffer inspection. * This way we can capture all the cases where the mime type is not known but still parseable as text @@ -102,17 +112,21 @@ function createdDate(filepath) { * @param {Object} params.data - The data to write to the file. Must look like a document object. * @param {string} params.filename - The name of the file to write to. * @param {string|null} params.destinationOverride - A forced destination to write to - will be honored if provided. + * @param {Object} params.options - The options for the function. + * @param {boolean} params.options.parseOnly - If true, the file will be written to the direct uploads folder instead of the documents folder. Will be ignored if destinationOverride is provided. * @returns {Object} - The data with the location added. */ function writeToServerDocuments({ data = {}, - filename = null, + filename, destinationOverride = null, + options = {}, }) { if (!filename) throw new Error("Filename is required!"); let destination = null; if (destinationOverride) destination = path.resolve(destinationOverride); + else if (options.parseOnly) destination = path.resolve(directUploadsFolder); else destination = path.resolve(documentsFolder, "custom-documents"); if (!fs.existsSync(destination)) @@ -129,6 +143,7 @@ function writeToServerDocuments({ // that will work since we know the location exists and since we only allow // 1-level deep folders this will always work. This still works for integrations like GitHub and YouTube. location: destinationFilePath.split("/").slice(-2).join("/"), + isDirectUpload: options.parseOnly || false, }; } @@ -207,4 +222,5 @@ module.exports = { isWithin, sanitizeFileName, documentsFolder, + directUploadsFolder, }; diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/ChatTooltips/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/ChatTooltips/index.jsx index de1b1234..671e0fd4 100644 --- a/frontend/src/components/WorkspaceChat/ChatContainer/ChatTooltips/index.jsx +++ b/frontend/src/components/WorkspaceChat/ChatContainer/ChatTooltips/index.jsx @@ -78,6 +78,12 @@ export function ChatTooltips() { delayShow={500} className="tooltip !text-xs max-w-[350px]" /> + ); diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/DnDWrapper/FileUploadWarningModal/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/DnDWrapper/FileUploadWarningModal/index.jsx new file mode 100644 index 00000000..b5b6d9a8 --- /dev/null +++ b/frontend/src/components/WorkspaceChat/ChatContainer/DnDWrapper/FileUploadWarningModal/index.jsx @@ -0,0 +1,106 @@ +import { CircleNotch } from "@phosphor-icons/react"; +import ModalWrapper from "@/components/ModalWrapper"; +import pluralize from "pluralize"; +import { numberWithCommas } from "@/utils/numbers"; +import useUser from "@/hooks/useUser"; +import { Link } from "react-router-dom"; +import Paths from "@/utils/paths"; +import Workspace from "@/models/workspace"; + +export default function FileUploadWarningModal({ + show, + onClose, + onContinue, + onEmbed, + tokenCount, + maxTokens, + fileCount = 1, + isEmbedding = false, + embedProgress = 0, +}) { + const { user } = useUser(); + const canEmbed = !user || user.role !== "default"; + if (!show) return null; + + if (isEmbedding) { + return ( + +
+
+

+ Embedding {embedProgress + 1} of {fileCount}{" "} + {pluralize("file", fileCount)} +

+ +

+ Please wait while we embed your files... +

+
+
+
+ ); + } + + return ( + +
+
+
+

+ Context Window Warning +

+
+
+ +
+

+ Your workspace is using {numberWithCommas(tokenCount)} of{" "} + {numberWithCommas(maxTokens)} available tokens. We recommend keeping + usage below {(Workspace.maxContextWindowLimit * 100).toFixed(0)}% to + ensure the best chat experience. Adding {fileCount} more{" "} + {pluralize("file", fileCount)} would exceed this limit.{" "} + + Learn more about context windows → + +

+

+ Choose how you would like to proceed with these uploads. +

+
+ +
+ +
+ + {canEmbed && ( + + )} +
+
+
+
+ ); +} diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/DnDWrapper/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/DnDWrapper/index.jsx index 27148ef0..1cbfcda8 100644 --- a/frontend/src/components/WorkspaceChat/ChatContainer/DnDWrapper/index.jsx +++ b/frontend/src/components/WorkspaceChat/ChatContainer/DnDWrapper/index.jsx @@ -4,7 +4,9 @@ import System from "@/models/system"; import { useDropzone } from "react-dropzone"; import DndIcon from "./dnd-icon.png"; import Workspace from "@/models/workspace"; -import useUser from "@/hooks/useUser"; +import showToast from "@/utils/toast"; +import FileUploadWarningModal from "./FileUploadWarningModal"; +import pluralize from "pluralize"; export const DndUploaderContext = createContext(); export const REMOVE_ATTACHMENT_EVENT = "ATTACHMENT_REMOVE"; @@ -12,6 +14,8 @@ export const CLEAR_ATTACHMENTS_EVENT = "ATTACHMENT_CLEAR"; export const PASTE_ATTACHMENT_EVENT = "ATTACHMENT_PASTED"; export const ATTACHMENTS_PROCESSING_EVENT = "ATTACHMENTS_PROCESSING"; export const ATTACHMENTS_PROCESSED_EVENT = "ATTACHMENTS_PROCESSED"; +export const PARSED_FILE_ATTACHMENT_REMOVED_EVENT = + "PARSED_FILE_ATTACHMENT_REMOVED"; /** * File Attachment for automatic upload on the chat container page. @@ -19,30 +23,58 @@ export const ATTACHMENTS_PROCESSED_EVENT = "ATTACHMENTS_PROCESSED"; * @property {string} uid - unique file id. * @property {File} file - native File object * @property {string|null} contentString - base64 encoded string of file - * @property {('in_progress'|'failed'|'success')} status - the automatic upload status. + * @property {('in_progress'|'failed'|'embedded'|'added_context')} status - the automatic upload status. * @property {string|null} error - Error message * @property {{id:string, location:string}|null} document - uploaded document details * @property {('attachment'|'upload')} type - The type of upload. Attachments are chat-specific, uploads go to the workspace. */ -export function DnDFileUploaderProvider({ workspace, children }) { +/** + * @typedef {Object} ParsedFile + * @property {number} id - The id of the parsed file. + * @property {string} filename - The name of the parsed file. + * @property {number} workspaceId - The id of the workspace the parsed file belongs to. + * @property {string|null} userId - The id of the user the parsed file belongs to. + * @property {string|null} threadId - The id of the thread the parsed file belongs to. + * @property {string} metadata - The metadata of the parsed file. + * @property {number} tokenCountEstimate - The estimated token count of the parsed file. + */ + +export function DnDFileUploaderProvider({ + workspace, + threadSlug = null, + children, +}) { const [files, setFiles] = useState([]); const [ready, setReady] = useState(false); const [dragging, setDragging] = useState(false); - const { user } = useUser(); + const [showWarningModal, setShowWarningModal] = useState(false); + const [isEmbedding, setIsEmbedding] = useState(false); + const [embedProgress, setEmbedProgress] = useState(0); + const [pendingFiles, setPendingFiles] = useState([]); + const [tokenCount, setTokenCount] = useState(0); + const [maxTokens, setMaxTokens] = useState(Number.POSITIVE_INFINITY); useEffect(() => { System.checkDocumentProcessorOnline().then((status) => setReady(status)); - }, [user]); + }, []); useEffect(() => { window.addEventListener(REMOVE_ATTACHMENT_EVENT, handleRemove); window.addEventListener(CLEAR_ATTACHMENTS_EVENT, resetAttachments); window.addEventListener(PASTE_ATTACHMENT_EVENT, handlePastedAttachment); + window.addEventListener( + PARSED_FILE_ATTACHMENT_REMOVED_EVENT, + handleRemoveParsedFile + ); return () => { window.removeEventListener(REMOVE_ATTACHMENT_EVENT, handleRemove); window.removeEventListener(CLEAR_ATTACHMENTS_EVENT, resetAttachments); + window.removeEventListener( + PARSED_FILE_ATTACHMENT_REMOVED_EVENT, + handleRemoveParsedFile + ); window.removeEventListener( PASTE_ATTACHMENT_EVENT, handlePastedAttachment @@ -50,6 +82,18 @@ export function DnDFileUploaderProvider({ workspace, children }) { }; }, []); + /** + * Handles the removal of a parsed file attachment from the uploader queue. + * Only uses the document id to remove the file from the queue + * @param {CustomEvent<{document: ParsedFile}>} event + */ + async function handleRemoveParsedFile(event) { + const { document } = event.detail; + setFiles((prev) => + prev.filter((prevFile) => prevFile.document.id !== document.id) + ); + } + /** * Remove file from uploader queue. * @param {CustomEvent<{uid: string}>} event @@ -112,8 +156,6 @@ export function DnDFileUploaderProvider({ workspace, children }) { type: "attachment", }); } else { - // If the user is a default user, we do not want to allow them to upload files. - if (!!user && user.role === "default") continue; newAccepted.push({ uid: v4(), file, @@ -149,8 +191,6 @@ export function DnDFileUploaderProvider({ workspace, children }) { type: "attachment", }); } else { - // If the user is a default user, we do not want to allow them to upload files. - if (!!user && user.role === "default") continue; newAccepted.push({ uid: v4(), file, @@ -170,36 +210,87 @@ export function DnDFileUploaderProvider({ workspace, children }) { * Embeds attachments that are eligible for embedding - basically files that are not images. * @param {Attachment[]} newAttachments */ - function embedEligibleAttachments(newAttachments = []) { + async function embedEligibleAttachments(newAttachments = []) { window.dispatchEvent(new CustomEvent(ATTACHMENTS_PROCESSING_EVENT)); const promises = []; + const { currentContextTokenCount, contextWindow } = + await Workspace.getParsedFiles(workspace.slug, threadSlug); + const workspaceContextWindow = contextWindow + ? Math.floor(contextWindow * Workspace.maxContextWindowLimit) + : Number.POSITIVE_INFINITY; + setMaxTokens(workspaceContextWindow); + + let totalTokenCount = currentContextTokenCount; + let batchPendingFiles = []; + for (const attachment of newAttachments) { // Images/attachments are chat specific. if (attachment.type === "attachment") continue; const formData = new FormData(); formData.append("file", attachment.file, attachment.file.name); + formData.append("threadSlug", threadSlug || null); promises.push( - Workspace.uploadAndEmbedFile(workspace.slug, formData).then( - ({ response, data }) => { + Workspace.parseFile(workspace.slug, formData).then( + async ({ response, data }) => { + if (!response.ok) { + const updates = { + status: "failed", + error: data?.error ?? null, + }; + setFiles((prev) => + prev.map( + ( + /** @type {Attachment} */ + prevFile + ) => + prevFile.uid !== attachment.uid + ? prevFile + : { ...prevFile, ...updates } + ) + ); + return; + } + // Will always be one file in the array + /** @type {ParsedFile} */ + const file = data.files[0]; + + // Add token count for this file + // and add it to the batch pending files + totalTokenCount += file.tokenCountEstimate; + batchPendingFiles.push({ + attachment, + parsedFileId: file.id, + tokenCount: file.tokenCountEstimate, + }); + + if (totalTokenCount > workspaceContextWindow) { + setTokenCount(totalTokenCount); + setPendingFiles(batchPendingFiles); + setShowWarningModal(true); + return; + } + + // File is within limits, keep in parsed files + const result = { success: true, document: file }; const updates = { - status: response.ok ? "success" : "failed", - error: data?.error ?? null, - document: data?.document, + status: result.success ? "added_context" : "failed", + error: result.error ?? null, + document: result.document, }; - setFiles((prev) => { - return prev.map( + setFiles((prev) => + prev.map( ( /** @type {Attachment} */ prevFile - ) => { - if (prevFile.uid !== attachment.uid) return prevFile; - return { ...prevFile, ...updates }; - } - ); - }); + ) => + prevFile.uid !== attachment.uid + ? prevFile + : { ...prevFile, ...updates } + ) + ); } ) ); @@ -211,10 +302,117 @@ export function DnDFileUploaderProvider({ workspace, children }) { ); } + // Handle modal actions + const handleCloseModal = async () => { + if (!pendingFiles.length) return; + + // Delete all files from this batch + await Workspace.deleteParsedFiles( + workspace.slug, + pendingFiles.map((file) => file.parsedFileId) + ); + + // Remove all files from this batch from the UI + setFiles((prev) => + prev.filter( + (prevFile) => + !pendingFiles.some((file) => file.attachment.uid === prevFile.uid) + ) + ); + setShowWarningModal(false); + setPendingFiles([]); + setTokenCount(0); + window.dispatchEvent(new CustomEvent(ATTACHMENTS_PROCESSED_EVENT)); + }; + + const handleContinueAnyway = async () => { + if (!pendingFiles.length) return; + const results = pendingFiles.map((file) => ({ + success: true, + document: { id: file.parsedFileId }, + })); + + const fileUpdates = pendingFiles.map((file, i) => ({ + uid: file.attachment.uid, + updates: { + status: results[i].success ? "success" : "failed", + error: results[i].error ?? null, + document: results[i].document, + }, + })); + + setFiles((prev) => + prev.map((prevFile) => { + const update = fileUpdates.find((f) => f.uid === prevFile.uid); + return update ? { ...prevFile, ...update.updates } : prevFile; + }) + ); + setShowWarningModal(false); + setPendingFiles([]); + setTokenCount(0); + }; + + const handleEmbed = async () => { + if (!pendingFiles.length) return; + setIsEmbedding(true); + setEmbedProgress(0); + + // Embed all pending files + let completed = 0; + const results = await Promise.all( + pendingFiles.map((file) => + Workspace.embedParsedFile(workspace.slug, file.parsedFileId).then( + (result) => { + completed++; + setEmbedProgress(completed); + return result; + } + ) + ) + ); + + // Update status for all files + const fileUpdates = pendingFiles.map((file, i) => ({ + uid: file.attachment.uid, + updates: { + status: results[i].response.ok ? "embedded" : "failed", + error: results[i].data?.error ?? null, + document: results[i].data?.document, + }, + })); + + setFiles((prev) => + prev.map((prevFile) => { + const update = fileUpdates.find((f) => f.uid === prevFile.uid); + return update ? { ...prevFile, ...update.updates } : prevFile; + }) + ); + setShowWarningModal(false); + setPendingFiles([]); + setTokenCount(0); + setIsEmbedding(false); + window.dispatchEvent(new CustomEvent(ATTACHMENTS_PROCESSED_EVENT)); + showToast( + `${pendingFiles.length} ${pluralize("file", pendingFiles.length)} embedded successfully`, + "success" + ); + }; + return ( + {children} ); @@ -231,8 +429,6 @@ export default function DnDFileUploaderWrapper({ children }) { onDragEnter: () => setDragging(true), onDragLeave: () => setDragging(false), }); - const { user } = useUser(); - const canUploadAll = !user || user?.role !== "default"; return (
- -

- Add {canUploadAll ? "anything" : "an image"} -

+ Drag and drop icon +

Add anything

- {canUploadAll ? ( - <> - Drop your file here to embed it into your
- workspace auto-magically. - - ) : ( - <> - Drop your image here to chat with it
- auto-magically. - - )} + Drop a file or image here to attach it to your
+ workspace auto-magically.

diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/AttachItem/ParsedFilesMenu/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/AttachItem/ParsedFilesMenu/index.jsx new file mode 100644 index 00000000..4102c001 --- /dev/null +++ b/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/AttachItem/ParsedFilesMenu/index.jsx @@ -0,0 +1,197 @@ +import { useState } from "react"; +import { X, CircleNotch, Warning } from "@phosphor-icons/react"; +import Workspace from "@/models/workspace"; +import { useParams } from "react-router-dom"; +import { nFormatter } from "@/utils/numbers"; +import showToast from "@/utils/toast"; +import pluralize from "pluralize"; +import { PARSED_FILE_ATTACHMENT_REMOVED_EVENT } from "../../../DnDWrapper"; +import useUser from "@/hooks/useUser"; + +export default function ParsedFilesMenu({ + onEmbeddingChange, + tooltipRef, + files, + setFiles, + currentTokens, + setCurrentTokens, + contextWindow, + isLoading, +}) { + const { user } = useUser(); + const canEmbed = !user || user.role !== "default"; + const initialContextWindowLimitExceeded = + contextWindow && + currentTokens >= contextWindow * Workspace.maxContextWindowLimit; + const { slug, threadSlug = null } = useParams(); + const [isEmbedding, setIsEmbedding] = useState(false); + const [embedProgress, setEmbedProgress] = useState(1); + const [contextWindowLimitExceeded, setContextWindowLimitExceeded] = useState( + initialContextWindowLimitExceeded + ); + + async function handleRemove(e, file) { + e.preventDefault(); + e.stopPropagation(); + if (!file?.id) return; + + const success = await Workspace.deleteParsedFiles(slug, [file.id]); + if (!success) return; + + // Update the local files list and current tokens + setFiles((prev) => prev.filter((f) => f.id !== file.id)); + + // Dispatch an event to the DnDFileUploaderWrapper to update the files list in attachment manager if it exists + window.dispatchEvent( + new CustomEvent(PARSED_FILE_ATTACHMENT_REMOVED_EVENT, { + detail: { document: file }, + }) + ); + const { currentContextTokenCount } = await Workspace.getParsedFiles( + slug, + threadSlug + ); + const newContextWindowLimitExceeded = + contextWindow && + currentContextTokenCount >= + contextWindow * Workspace.maxContextWindowLimit; + setCurrentTokens(currentContextTokenCount); + setContextWindowLimitExceeded(newContextWindowLimitExceeded); + } + + /** + * Handles the embedding of the files when the user exceeds the context window limit + * and opts to embed the files into the workspace instead. + * @returns {Promise} + */ + async function handleEmbed() { + if (!files.length) return; + setIsEmbedding(true); + onEmbeddingChange?.(true); + setEmbedProgress(1); + try { + let completed = 0; + await Promise.all( + files.map((file) => + Workspace.embedParsedFile(slug, file.id).then(() => { + completed++; + setEmbedProgress(completed + 1); + }) + ) + ); + setFiles([]); + const { currentContextTokenCount } = await Workspace.getParsedFiles( + slug, + threadSlug + ); + setCurrentTokens(currentContextTokenCount); + setContextWindowLimitExceeded( + currentContextTokenCount >= + contextWindow * Workspace.maxContextWindowLimit + ); + showToast( + `${files.length} ${pluralize("file", files.length)} embedded successfully`, + "success" + ); + tooltipRef?.current?.close(); + } catch (error) { + console.error("Failed to embed files:", error); + showToast("Failed to embed files", "error"); + } + setIsEmbedding(false); + onEmbeddingChange?.(false); + setEmbedProgress(1); + } + + return ( +
+
+
+ Current Context ({files.length} files) +
+
+ {contextWindowLimitExceeded && ( + + )} +
+ {nFormatter(currentTokens)} /{" "} + {contextWindow ? nFormatter(contextWindow) : "--"} tokens +
+
+
+ {contextWindowLimitExceeded && canEmbed && ( +
+
+ +
+ Your context window is getting full. Some files may be truncated + or excluded from chat responses. We recommend embedding these + files directly into your workspace for better results. +
+
+ +
+ )} +
+ {files.length > 0 && + files.map((file, i) => ( +
+
+ {file.title} +
+ +
+ ))} + {isLoading && ( +
+ + Loading... +
+ )} + {!isLoading && files.length === 0 && ( +
+ No files found +
+ )} +
+
+ ); +} diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/AttachItem/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/AttachItem/index.jsx index 81a402d2..1f827060 100644 --- a/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/AttachItem/index.jsx +++ b/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/AttachItem/index.jsx @@ -1,7 +1,15 @@ -import useUser from "@/hooks/useUser"; import { PaperclipHorizontal } from "@phosphor-icons/react"; import { Tooltip } from "react-tooltip"; import { useTranslation } from "react-i18next"; +import { useRef, useState, useEffect } from "react"; +import { useParams } from "react-router-dom"; +import Workspace from "@/models/workspace"; +import { + ATTACHMENTS_PROCESSED_EVENT, + REMOVE_ATTACHMENT_EVENT, +} from "../../DnDWrapper"; +import { useTheme } from "@/hooks/useTheme"; +import ParsedFilesMenu from "./ParsedFilesMenu"; /** * This is a simple proxy component that clicks on the DnD file uploader for the user. @@ -9,35 +17,119 @@ import { useTranslation } from "react-i18next"; */ export default function AttachItem() { const { t } = useTranslation(); - const { user } = useUser(); - if (!!user && user.role === "default") return null; + const { theme } = useTheme(); + const { slug, threadSlug = null } = useParams(); + const tooltipRef = useRef(null); + const [isEmbedding, setIsEmbedding] = useState(false); + const [files, setFiles] = useState([]); + const [currentTokens, setCurrentTokens] = useState(0); + const [contextWindow, setContextWindow] = useState(Infinity); + const [showTooltip, setShowTooltip] = useState(false); + const [isLoading, setIsLoading] = useState(true); + + const fetchFiles = () => { + if (!slug) return; + if (isEmbedding) return; + setIsLoading(true); + Workspace.getParsedFiles(slug, threadSlug) + .then(({ files, contextWindow, currentContextTokenCount }) => { + setFiles(files); + setShowTooltip(files.length > 0); + setContextWindow(contextWindow); + setCurrentTokens(currentContextTokenCount); + }) + .finally(() => { + setIsLoading(false); + }); + }; + + /** + * Handles the removal of an attachment from the parsed files + * and triggers a re-fetch of the parsed files. + * This function handles when the user clicks the X on an Attachment via the AttachmentManager + * so we need to sync the state in the ParsedFilesMenu picker here. + */ + async function handleRemoveAttachment(e) { + const { document } = e.detail; + await Workspace.deleteParsedFiles(slug, [document.id]); + fetchFiles(); + } + + /** + * Handles the click event for the attach item button. + * @param {MouseEvent} e - The click event. + * @returns {void} + */ + function handleClick(e) { + e?.target?.blur(); + document?.getElementById("dnd-chat-file-uploader")?.click(); + return; + } + + useEffect(() => { + fetchFiles(); + window.addEventListener(ATTACHMENTS_PROCESSED_EVENT, fetchFiles); + window.addEventListener(REMOVE_ATTACHMENT_EVENT, handleRemoveAttachment); + return () => { + window.removeEventListener(ATTACHMENTS_PROCESSED_EVENT, fetchFiles); + window.removeEventListener( + REMOVE_ATTACHMENT_EVENT, + handleRemoveAttachment + ); + }; + }, [slug, threadSlug]); return ( <> - + {showTooltip && ( + + + + )} ); } diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/Attachments/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/Attachments/index.jsx index a961bc8d..69ecbc54 100644 --- a/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/Attachments/index.jsx +++ b/frontend/src/components/WorkspaceChat/ChatContainer/PromptInput/Attachments/index.jsx @@ -160,7 +160,11 @@ function AttachmentItem({ attachment }) { <>
@@ -186,7 +190,7 @@ function AttachmentItem({ attachment }) { {file.name}

- File embedded! + {status === "embedded" ? "File embedded!" : "Added as context!"}

diff --git a/frontend/src/components/WorkspaceChat/index.jsx b/frontend/src/components/WorkspaceChat/index.jsx index a4d601dc..47a3bbeb 100644 --- a/frontend/src/components/WorkspaceChat/index.jsx +++ b/frontend/src/components/WorkspaceChat/index.jsx @@ -79,7 +79,7 @@ export default function WorkspaceChat({ loading, workspace }) { setEventDelegatorForCodeSnippets(); return ( - + diff --git a/frontend/src/models/workspace.js b/frontend/src/models/workspace.js index 8abdcfe3..3627b4e3 100644 --- a/frontend/src/models/workspace.js +++ b/frontend/src/models/workspace.js @@ -1,4 +1,4 @@ -import { API_BASE } from "@/utils/constants"; +import { API_BASE, fullApiUrl } from "@/utils/constants"; import { baseHeaders, safeJsonParse } from "@/utils/request"; import { fetchEventSource } from "@microsoft/fetch-event-source"; import WorkspaceThread from "@/models/workspaceThread"; @@ -7,6 +7,8 @@ import { ABORT_STREAM_EVENT } from "@/utils/chat"; const Workspace = { workspaceOrderStorageKey: "anythingllm-workspace-order", + /** The maximum percentage of the context window that can be used for attachments */ + maxContextWindowLimit: 0.8, new: async function (data = {}) { const { workspace, message } = await fetch(`${API_BASE}/workspace/new`, { @@ -250,6 +252,28 @@ const Workspace = { const data = await response.json(); return { response, data }; }, + parseFile: async function (slug, formData) { + const response = await fetch(`${API_BASE}/workspace/${slug}/parse`, { + method: "POST", + body: formData, + headers: baseHeaders(), + }); + + const data = await response.json(); + return { response, data }; + }, + + getParsedFiles: async function (slug, threadSlug = null) { + const basePath = new URL(`${fullApiUrl()}/workspace/${slug}/parsed-files`); + if (threadSlug) basePath.searchParams.set("threadSlug", threadSlug); + const response = await fetch(basePath, { + method: "GET", + headers: baseHeaders(), + }); + + const data = await response.json(); + return data; + }, uploadLink: async function (slug, link) { const response = await fetch(`${API_BASE}/workspace/${slug}/upload-link`, { method: "POST", @@ -454,6 +478,31 @@ const Workspace = { return { response, data }; }, + deleteParsedFiles: async function (slug, fileIds = []) { + const response = await fetch( + `${API_BASE}/workspace/${slug}/delete-parsed-files`, + { + method: "DELETE", + headers: baseHeaders(), + body: JSON.stringify({ fileIds }), + } + ); + return response.ok; + }, + + embedParsedFile: async function (slug, fileId) { + const response = await fetch( + `${API_BASE}/workspace/${slug}/embed-parsed-file/${fileId}`, + { + method: "POST", + headers: baseHeaders(), + } + ); + + const data = await response.json(); + return { response, data }; + }, + /** * Deletes and un-embeds a single file in a single call from a workspace * @param {string} slug - workspace slug diff --git a/frontend/src/utils/paths.js b/frontend/src/utils/paths.js index 966fd9c2..49a4eef7 100644 --- a/frontend/src/utils/paths.js +++ b/frontend/src/utils/paths.js @@ -211,6 +211,13 @@ export default { }, }, + // TODO: Migrate all docs.anythingllm.com links to the new docs. + documentation: { + contextWindows: () => { + return "https://docs.anythingllm.com/chatting-with-documents/introduction#you-exceed-the-context-window---what-now"; + }, + }, + experimental: { liveDocumentSync: { manage: () => `/settings/beta-features/live-document-sync/manage`, diff --git a/server/.gitignore b/server/.gitignore index 45a0f037..7f8b3a5c 100644 --- a/server/.gitignore +++ b/server/.gitignore @@ -14,6 +14,7 @@ storage/plugins/agent-flows/* storage/plugins/office-extensions/* storage/plugins/anythingllm_mcp_servers.json !storage/documents/DOCUMENTS.md +storage/direct-uploads logs/server.log *.db *.db-journal diff --git a/server/endpoints/workspaces.js b/server/endpoints/workspaces.js index 66605d65..af4eb998 100644 --- a/server/endpoints/workspaces.js +++ b/server/endpoints/workspaces.js @@ -32,14 +32,15 @@ const { } = require("../utils/files/pfp"); const { getTTSProvider } = require("../utils/TextToSpeech"); const { WorkspaceThread } = require("../models/workspaceThread"); + const truncate = require("truncate"); const { purgeDocument } = require("../utils/files/purgeDocument"); const { getModelTag } = require("./utils"); const { searchWorkspaceAndThreads } = require("../utils/helpers/search"); +const { workspaceParsedFilesEndpoints } = require("./workspacesParsedFiles"); function workspaceEndpoints(app) { if (!app) return; - const responseCache = new Map(); app.post( @@ -1060,6 +1061,9 @@ function workspaceEndpoints(app) { } } ); + + // Parsed Files in separate endpoint just to keep the workspace endpoints clean + workspaceParsedFilesEndpoints(app); } module.exports = { workspaceEndpoints }; diff --git a/server/endpoints/workspacesParsedFiles.js b/server/endpoints/workspacesParsedFiles.js new file mode 100644 index 00000000..fde289a7 --- /dev/null +++ b/server/endpoints/workspacesParsedFiles.js @@ -0,0 +1,199 @@ +const { reqBody, multiUserMode, userFromSession } = require("../utils/http"); +const { handleFileUpload } = require("../utils/files/multer"); +const { validatedRequest } = require("../utils/middleware/validatedRequest"); +const { Telemetry } = require("../models/telemetry"); +const { + flexUserRoleValid, + ROLES, +} = require("../utils/middleware/multiUserProtected"); +const { EventLogs } = require("../models/eventLogs"); +const { validWorkspaceSlug } = require("../utils/middleware/validWorkspace"); +const { CollectorApi } = require("../utils/collectorApi"); +const { WorkspaceThread } = require("../models/workspaceThread"); +const { WorkspaceParsedFiles } = require("../models/workspaceParsedFiles"); + +function workspaceParsedFilesEndpoints(app) { + if (!app) return; + + app.get( + "/workspace/:slug/parsed-files", + [validatedRequest, flexUserRoleValid([ROLES.all]), validWorkspaceSlug], + async (request, response) => { + try { + const threadSlug = request.query.threadSlug || null; + const user = await userFromSession(request, response); + const workspace = response.locals.workspace; + const thread = threadSlug + ? await WorkspaceThread.get({ slug: String(threadSlug) }) + : null; + const { files, contextWindow, currentContextTokenCount } = + await WorkspaceParsedFiles.getContextMetadataAndLimits( + workspace, + thread || null, + multiUserMode(response) ? user : null + ); + + return response + .status(200) + .json({ files, contextWindow, currentContextTokenCount }); + } catch (e) { + console.error(e.message, e); + return response.sendStatus(500).end(); + } + } + ); + + app.delete( + "/workspace/:slug/delete-parsed-files", + [validatedRequest, flexUserRoleValid([ROLES.all]), validWorkspaceSlug], + async function (request, response) { + try { + const { fileIds = [] } = reqBody(request); + if (!fileIds.length) return response.sendStatus(400).end(); + const success = await WorkspaceParsedFiles.delete({ + id: { in: fileIds.map((id) => parseInt(id)) }, + }); + return response.status(success ? 200 : 500).end(); + } catch (e) { + console.error(e.message, e); + return response.sendStatus(500).end(); + } + } + ); + + app.post( + "/workspace/:slug/embed-parsed-file/:fileId", + [ + validatedRequest, + // Embed is still an admin/manager only feature + flexUserRoleValid([ROLES.admin, ROLES.manager]), + validWorkspaceSlug, + ], + async function (request, response) { + const { fileId = null } = request.params; + try { + const user = await userFromSession(request, response); + const workspace = response.locals.workspace; + + if (!fileId) return response.sendStatus(400).end(); + const { success, error, document } = + await WorkspaceParsedFiles.moveToDocumentsAndEmbed(fileId, workspace); + + if (!success) { + return response.status(500).json({ + success: false, + error: error || "Failed to embed file", + }); + } + + await Telemetry.sendTelemetry("document_embedded"); + await EventLogs.logEvent( + "document_embedded", + { + documentName: document?.name || "unknown", + workspaceId: workspace.id, + }, + user?.id + ); + + return response.status(200).json({ + success: true, + error: null, + document, + }); + } catch (e) { + console.error(e.message, e); + return response.sendStatus(500).end(); + } finally { + if (!fileId) return; + await WorkspaceParsedFiles.delete({ id: parseInt(fileId) }); + } + } + ); + + app.post( + "/workspace/:slug/parse", + [ + validatedRequest, + flexUserRoleValid([ROLES.all]), + handleFileUpload, + validWorkspaceSlug, + ], + async function (request, response) { + try { + const user = await userFromSession(request, response); + const workspace = response.locals.workspace; + const Collector = new CollectorApi(); + const { originalname } = request.file; + const processingOnline = await Collector.online(); + + if (!processingOnline) { + return response.status(500).json({ + success: false, + error: `Document processing API is not online. Document ${originalname} will not be parsed.`, + }); + } + + const { success, reason, documents } = + await Collector.parseDocument(originalname); + if (!success || !documents?.[0]) { + return response.status(500).json({ + success: false, + error: reason || "No document returned from collector", + }); + } + + // Get thread ID if we have a slug + const { threadSlug = null } = reqBody(request); + const thread = threadSlug + ? await WorkspaceThread.get({ + slug: String(threadSlug), + workspace_id: workspace.id, + user_id: user?.id || null, + }) + : null; + const files = await Promise.all( + documents.map(async (doc) => { + const metadata = { ...doc }; + // Strip out pageContent + delete metadata.pageContent; + const filename = `${originalname}-${doc.id}.json`; + const { file, error: dbError } = await WorkspaceParsedFiles.create({ + filename, + workspaceId: workspace.id, + userId: user?.id || null, + threadId: thread?.id || null, + metadata: JSON.stringify(metadata), + tokenCountEstimate: doc.token_count_estimate || 0, + }); + + if (dbError) throw new Error(dbError); + return file; + }) + ); + + Collector.log(`Document ${originalname} parsed successfully.`); + await EventLogs.logEvent( + "document_uploaded_to_chat", + { + documentName: originalname, + workspace: workspace.slug, + thread: thread?.name || null, + }, + user?.id + ); + + return response.status(200).json({ + success: true, + error: null, + files, + }); + } catch (e) { + console.error(e.message, e); + return response.sendStatus(500).end(); + } + } + ); +} + +module.exports = { workspaceParsedFilesEndpoints }; diff --git a/server/jobs/cleanup-orphan-documents.js b/server/jobs/cleanup-orphan-documents.js new file mode 100644 index 00000000..9a50fcf0 --- /dev/null +++ b/server/jobs/cleanup-orphan-documents.js @@ -0,0 +1,64 @@ +const fs = require('fs'); +const path = require('path'); +const { log, conclude } = require('./helpers/index.js'); +const { WorkspaceParsedFiles } = require('../models/workspaceParsedFiles.js'); +const { directUploadsPath } = require('../utils/files'); + +async function batchDeleteFiles(filesToDelete, batchSize = 500) { + let deletedCount = 0; + let failedCount = 0; + + for (let i = 0; i < filesToDelete.length; i += batchSize) { + const batch = filesToDelete.slice(i, i + batchSize); + + try { + await Promise.all(batch.map(filePath => fs.unlink(filePath))); + deletedCount += batch.length; + + log(`Deleted batch ${Math.floor(i / batchSize) + 1}: ${batch.length} files`); + } catch (err) { + // If batch fails, try individual files sync + for (const filePath of batch) { + try { + fs.unlinkSync(filePath); + deletedCount++; + } catch (fileErr) { + failedCount++; + log(`Failed to delete ${filePath}: ${fileErr.message}`); + } + } + } + } + + return { deletedCount, failedCount }; +} + +(async () => { + try { + const filesToDelete = []; + const knownFiles = await WorkspaceParsedFiles + .where({}, null, null, { filename: true }) + .then(files => new Set(files.map(f => f.filename))); + + if (!fs.existsSync(directUploadsPath)) return log('No direct uploads path found - exiting.'); + const filesInDirectUploadsPath = fs.readdirSync(directUploadsPath); + if (filesInDirectUploadsPath.length === 0) return; + + for (let i = 0; i < filesInDirectUploadsPath.length; i++) { + const file = filesInDirectUploadsPath[i]; + if (knownFiles.has(file)) continue; + filesToDelete.push(path.resolve(directUploadsPath, file)); + } + + if (filesToDelete.length === 0) return; // No orphaned files to delete + log(`Found ${filesToDelete.length} orphaned files to delete`); + const { deletedCount, failedCount } = await batchDeleteFiles(filesToDelete); + log(`Deleted ${deletedCount} orphaned files`); + if (failedCount > 0) log(`Failed to delete ${failedCount} files`); + } catch (e) { + console.error(e) + log(`errored with ${e.message}`) + } finally { + conclude(); + } +})(); diff --git a/server/models/telemetry.js b/server/models/telemetry.js index 4c2e3576..0257d030 100644 --- a/server/models/telemetry.js +++ b/server/models/telemetry.js @@ -11,7 +11,7 @@ const Telemetry = { pubkey: "phc_9qu7QLpV8L84P3vFmEiZxL020t2EqIubP7HHHxrSsqS", stubDevelopmentEvents: true, // [DO NOT TOUCH] Core team only. label: "telemetry_id", - /* + /* Key value pairs of events that should be debounced to prevent spamming the logs. This should be used for events that could be triggered in rapid succession that are not useful to atomically log. The value is the number of seconds to debounce the event @@ -27,6 +27,7 @@ const Telemetry = { documents_embedded_in_workspace: 30, link_uploaded: 30, raw_document_uploaded: 30, + document_parsed: 30, }, id: async function () { diff --git a/server/models/workspace.js b/server/models/workspace.js index c195b176..6b361d6b 100644 --- a/server/models/workspace.js +++ b/server/models/workspace.js @@ -283,6 +283,10 @@ const Workspace = { return { ...workspace, documents: await Document.forWorkspace(workspace.id), + contextWindow: this._getContextWindow(workspace), + currentContextTokenCount: await this._getCurrentContextTokenCount( + workspace.id + ), }; } catch (error) { console.error(error.message); @@ -290,6 +294,42 @@ const Workspace = { } }, + /** + * Get the total token count of all parsed files in a workspace/thread + * @param {number} workspaceId - The ID of the workspace + * @param {number|null} threadId - Optional thread ID to filter by + * @returns {Promise} Total token count of all files + * @private + */ + async _getCurrentContextTokenCount(workspaceId, threadId = null) { + const { WorkspaceParsedFiles } = require("./workspaceParsedFiles"); + return await WorkspaceParsedFiles.totalTokenCount({ + workspaceId: Number(workspaceId), + threadId: threadId ? Number(threadId) : null, + }); + }, + + /** + * Get the context window size for a workspace based on its provider and model settings. + * If the workspace has no provider/model set, falls back to system defaults. + * @param {Workspace} workspace - The workspace to get context window for + * @returns {number|null} The context window size in tokens (defaults to null if no provider/model found) + * @private + */ + _getContextWindow: function (workspace) { + const { + getLLMProviderClass, + getBaseLLMProviderModel, + } = require("../utils/helpers"); + const provider = workspace.chatProvider || process.env.LLM_PROVIDER || null; + const LLMProvider = getLLMProviderClass({ provider }); + const model = + workspace.chatModel || getBaseLLMProviderModel({ provider }) || null; + + if (!provider || !model) return null; + return LLMProvider?.promptWindowLimit?.(model) || null; + }, + get: async function (clause = {}) { try { const workspace = await prisma.workspaces.findFirst({ @@ -299,7 +339,14 @@ const Workspace = { }, }); - return workspace || null; + if (!workspace) return null; + return { + ...workspace, + contextWindow: this._getContextWindow(workspace), + currentContextTokenCount: await this._getCurrentContextTokenCount( + workspace.id + ), + }; } catch (error) { console.error(error.message); return null; diff --git a/server/models/workspaceParsedFiles.js b/server/models/workspaceParsedFiles.js new file mode 100644 index 00000000..24c6d134 --- /dev/null +++ b/server/models/workspaceParsedFiles.js @@ -0,0 +1,227 @@ +const prisma = require("../utils/prisma"); +const { EventLogs } = require("./eventLogs"); +const { Document } = require("./documents"); +const { documentsPath, directUploadsPath } = require("../utils/files"); +const { safeJsonParse } = require("../utils/http"); +const fs = require("fs"); +const path = require("path"); + +const WorkspaceParsedFiles = { + create: async function ({ + filename, + workspaceId, + userId = null, + threadId = null, + metadata = null, + tokenCountEstimate = 0, + }) { + try { + const file = await prisma.workspace_parsed_files.create({ + data: { + filename, + workspaceId: parseInt(workspaceId), + userId: userId ? parseInt(userId) : null, + threadId: threadId ? parseInt(threadId) : null, + metadata, + tokenCountEstimate, + }, + }); + + await EventLogs.logEvent( + "workspace_file_uploaded", + { + filename, + workspaceId, + }, + userId + ); + + return { file, error: null }; + } catch (error) { + console.error("FAILED TO CREATE PARSED FILE RECORD.", error.message); + return { file: null, error: error.message }; + } + }, + + get: async function (clause = {}) { + try { + const file = await prisma.workspace_parsed_files.findFirst({ + where: clause, + }); + return file; + } catch (error) { + console.error(error.message); + return null; + } + }, + + where: async function ( + clause = {}, + limit = null, + orderBy = null, + select = null + ) { + try { + const files = await prisma.workspace_parsed_files.findMany({ + where: clause, + ...(limit !== null ? { take: limit } : {}), + ...(orderBy !== null ? { orderBy } : {}), + ...(select !== null ? { select } : {}), + }); + return files; + } catch (error) { + console.error(error.message); + return []; + } + }, + + delete: async function (clause = {}) { + try { + await prisma.workspace_parsed_files.deleteMany({ + where: clause, + }); + return true; + } catch (error) { + console.error(error.message); + return false; + } + }, + + totalTokenCount: async function (clause = {}) { + const { _sum } = await prisma.workspace_parsed_files.aggregate({ + where: clause, + _sum: { tokenCountEstimate: true }, + }); + return _sum.tokenCountEstimate || 0; + }, + + moveToDocumentsAndEmbed: async function (fileId, workspace) { + try { + const parsedFile = await this.get({ id: parseInt(fileId) }); + if (!parsedFile) throw new Error("File not found"); + + // Get file location from metadata + const metadata = safeJsonParse(parsedFile.metadata, {}); + const location = metadata.location; + if (!location) throw new Error("No file location in metadata"); + + // Get file from metadata location + const sourceFile = path.join(directUploadsPath, location.split("/")[1]); + if (!fs.existsSync(sourceFile)) throw new Error("Source file not found"); + + // Move to custom-documents + const customDocsPath = path.join(documentsPath, "custom-documents"); + if (!fs.existsSync(customDocsPath)) + fs.mkdirSync(customDocsPath, { recursive: true }); + + // Copy the file to custom-documents + const targetPath = path.join(customDocsPath, location.split("/")[1]); + fs.copyFileSync(sourceFile, targetPath); + fs.unlinkSync(sourceFile); + + const { + failedToEmbed = [], + errors = [], + embedded = [], + } = await Document.addDocuments( + workspace, + [`custom-documents/${location.split("/")[1]}`], + parsedFile.userId + ); + + if (failedToEmbed.length > 0) + throw new Error(errors[0] || "Failed to embed document"); + + const document = await Document.get({ + workspaceId: workspace.id, + docpath: embedded[0], + }); + return { success: true, error: null, document }; + } catch (error) { + console.error("Failed to move and embed file:", error); + return { success: false, error: error.message, document: null }; + } finally { + // Always delete the file after processing + await this.delete({ id: parseInt(fileId) }); + } + }, + + getContextMetadataAndLimits: async function ( + workspace, + thread = null, + user = null + ) { + try { + if (!workspace) throw new Error("Workspace is required"); + const files = await this.where({ + workspaceId: workspace.id, + threadId: thread?.id || null, + ...(user ? { userId: user.id } : {}), + }); + + const results = []; + let totalTokens = 0; + + for (const file of files) { + const metadata = safeJsonParse(file.metadata, {}); + totalTokens += file.tokenCountEstimate || 0; + results.push({ + id: file.id, + title: metadata.title || metadata.location, + location: metadata.location, + token_count_estimate: file.tokenCountEstimate, + }); + } + + return { + files: results, + contextWindow: workspace.contextWindow, + currentContextTokenCount: totalTokens, + }; + } catch (error) { + console.error("Failed to get context metadata:", error); + return { + files: [], + contextWindow: Infinity, + currentContextTokenCount: 0, + }; + } + }, + + getContextFiles: async function (workspace, thread = null, user = null) { + try { + const files = await this.where({ + workspaceId: workspace.id, + threadId: thread?.id || null, + ...(user ? { userId: user.id } : {}), + }); + + const results = []; + for (const file of files) { + const metadata = safeJsonParse(file.metadata, {}); + const location = metadata.location; + if (!location) continue; + + const sourceFile = path.join(directUploadsPath, location.split("/")[1]); + if (!fs.existsSync(sourceFile)) continue; + + const content = fs.readFileSync(sourceFile, "utf-8"); + const data = safeJsonParse(content, null); + if (!data?.pageContent) continue; + + results.push({ + pageContent: data.pageContent, + token_count_estimate: file.tokenCountEstimate, + ...metadata, + }); + } + + return results; + } catch (error) { + console.error("Failed to get context files:", error); + return []; + } + }, +}; + +module.exports = { WorkspaceParsedFiles }; diff --git a/server/prisma/migrations/20250808171557_init/migration.sql b/server/prisma/migrations/20250808171557_init/migration.sql new file mode 100644 index 00000000..4b3e7514 --- /dev/null +++ b/server/prisma/migrations/20250808171557_init/migration.sql @@ -0,0 +1,23 @@ +-- CreateTable +CREATE TABLE "workspace_parsed_files" ( + "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + "filename" TEXT NOT NULL, + "workspaceId" INTEGER NOT NULL, + "userId" INTEGER, + "threadId" INTEGER, + "metadata" TEXT, + "tokenCountEstimate" INTEGER DEFAULT 0, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT "workspace_parsed_files_workspaceId_fkey" FOREIGN KEY ("workspaceId") REFERENCES "workspaces" ("id") ON DELETE CASCADE ON UPDATE CASCADE, + CONSTRAINT "workspace_parsed_files_userId_fkey" FOREIGN KEY ("userId") REFERENCES "users" ("id") ON DELETE CASCADE ON UPDATE CASCADE, + CONSTRAINT "workspace_parsed_files_threadId_fkey" FOREIGN KEY ("threadId") REFERENCES "workspace_threads" ("id") ON DELETE CASCADE ON UPDATE CASCADE +); + +-- CreateIndex +CREATE UNIQUE INDEX "workspace_parsed_files_filename_key" ON "workspace_parsed_files"("filename"); + +-- CreateIndex +CREATE INDEX "workspace_parsed_files_workspaceId_idx" ON "workspace_parsed_files"("workspaceId"); + +-- CreateIndex +CREATE INDEX "workspace_parsed_files_userId_idx" ON "workspace_parsed_files"("userId"); diff --git a/server/prisma/schema.prisma b/server/prisma/schema.prisma index 233c1e36..a3db69f1 100644 --- a/server/prisma/schema.prisma +++ b/server/prisma/schema.prisma @@ -83,6 +83,7 @@ model users { system_prompt_variables system_prompt_variables[] prompt_history prompt_history[] desktop_mobile_devices desktop_mobile_devices[] + workspace_parsed_files workspace_parsed_files[] } model recovery_codes { @@ -149,18 +150,20 @@ model workspaces { threads workspace_threads[] workspace_agent_invocations workspace_agent_invocations[] prompt_history prompt_history[] + workspace_parsed_files workspace_parsed_files[] } model workspace_threads { - id Int @id @default(autoincrement()) - name String - slug String @unique - workspace_id Int - user_id Int? - createdAt DateTime @default(now()) - lastUpdatedAt DateTime @default(now()) - workspace workspaces @relation(fields: [workspace_id], references: [id], onDelete: Cascade) - user users? @relation(fields: [user_id], references: [id], onDelete: Cascade) + id Int @id @default(autoincrement()) + name String + slug String @unique + workspace_id Int + user_id Int? + createdAt DateTime @default(now()) + lastUpdatedAt DateTime @default(now()) + workspace workspaces @relation(fields: [workspace_id], references: [id], onDelete: Cascade) + user users? @relation(fields: [user_id], references: [id], onDelete: Cascade) + workspace_parsed_files workspace_parsed_files[] @@index([workspace_id]) @@index([user_id]) @@ -371,3 +374,20 @@ model desktop_mobile_devices { @@index([userId]) } + +model workspace_parsed_files { + id Int @id @default(autoincrement()) + filename String @unique + workspaceId Int + userId Int? + threadId Int? + metadata String? + tokenCountEstimate Int? @default(0) + createdAt DateTime @default(now()) + workspace workspaces @relation(fields: [workspaceId], references: [id], onDelete: Cascade) + user users? @relation(fields: [userId], references: [id], onDelete: Cascade) + thread workspace_threads? @relation(fields: [threadId], references: [id], onDelete: Cascade) + + @@index([workspaceId]) + @@index([userId]) +} diff --git a/server/utils/BackgroundWorkers/index.js b/server/utils/BackgroundWorkers/index.js index a1341616..6fa43d0f 100644 --- a/server/utils/BackgroundWorkers/index.js +++ b/server/utils/BackgroundWorkers/index.js @@ -6,8 +6,26 @@ const setLogger = require("../logger"); class BackgroundService { name = "BackgroundWorkerService"; static _instance = null; + documentSyncEnabled = false; #root = path.resolve(__dirname, "../../jobs"); + #alwaysRunJobs = [ + { + name: "cleanup-orphan-documents", + timeout: "1m", + interval: "12hr", + }, + ]; + + #documentSyncJobs = [ + // Job for auto-sync of documents + // https://github.com/breejs/bree + { + name: "sync-watched-documents", + interval: "1hr", + }, + ]; + constructor() { if (BackgroundService._instance) { this.#log("SINGLETON LOCK: Using existing BackgroundService."); @@ -24,16 +42,14 @@ class BackgroundService { async boot() { const { DocumentSyncQueue } = require("../../models/documentSyncQueue"); - if (!(await DocumentSyncQueue.enabled())) { - this.#log("Feature is not enabled and will not be started."); - return; - } + this.documentSyncEnabled = await DocumentSyncQueue.enabled(); + const jobsToRun = this.jobs(); this.#log("Starting..."); this.bree = new Bree({ logger: this.logger, root: this.#root, - jobs: this.jobs(), + jobs: jobsToRun, errorHandler: this.onError, workerMessageHandler: this.onWorkerMessageHandler, runJobsAs: "process", @@ -41,7 +57,10 @@ class BackgroundService { this.graceful = new Graceful({ brees: [this.bree], logger: this.logger }); this.graceful.listen(); this.bree.start(); - this.#log("Service started"); + this.#log( + `Service started with ${jobsToRun.length} jobs`, + jobsToRun.map((j) => j.name) + ); } async stop() { @@ -54,14 +73,9 @@ class BackgroundService { /** @returns {import("@mintplex-labs/bree").Job[]} */ jobs() { - return [ - // Job for auto-sync of documents - // https://github.com/breejs/bree - { - name: "sync-watched-documents", - interval: "1hr", - }, - ]; + const activeJobs = [...this.#alwaysRunJobs]; + if (this.documentSyncEnabled) activeJobs.push(...this.#documentSyncJobs); + return activeJobs; } onError(error, _workerMetadata) { diff --git a/server/utils/chats/stream.js b/server/utils/chats/stream.js index 0ecf86c8..893a6941 100644 --- a/server/utils/chats/stream.js +++ b/server/utils/chats/stream.js @@ -1,6 +1,7 @@ const { v4: uuidv4 } = require("uuid"); const { DocumentManager } = require("../DocumentManager"); const { WorkspaceChats } = require("../../models/workspaceChats"); +const { WorkspaceParsedFiles } = require("../../models/workspaceParsedFiles"); const { getVectorDbClass, getLLMProvider } = require("../helpers"); const { writeResponseChunk } = require("../helpers/chat/responses"); const { grepAgents } = require("./agents"); @@ -130,6 +131,22 @@ async function streamChatWithWorkspace( }); }); + // Inject any parsed files for this workspace/thread/user + const parsedFiles = await WorkspaceParsedFiles.getContextFiles( + workspace, + thread || null, + user || null + ); + parsedFiles.forEach((doc) => { + const { pageContent, ...metadata } = doc; + contextTexts.push(doc.pageContent); + sources.push({ + text: + pageContent.slice(0, 1_000) + "...continued on in source document...", + ...metadata, + }); + }); + const vectorSearchResults = embeddingsCount !== 0 ? await VectorDb.performSimilaritySearch({ diff --git a/server/utils/collectorApi/index.js b/server/utils/collectorApi/index.js index d7953ce2..c991c299 100644 --- a/server/utils/collectorApi/index.js +++ b/server/utils/collectorApi/index.js @@ -232,6 +232,42 @@ class CollectorApi { return { success: false, content: null }; }); } + + /** + * Parse a document without processing it + * - Will append the options to the request body + * @param {string} filename - The filename of the document to parse + * @returns {Promise} - The response from the collector API + */ + async parseDocument(filename = "") { + if (!filename) return false; + + const data = JSON.stringify({ + filename, + options: this.#attachOptions(), + }); + + return await fetch(`${this.endpoint}/parse`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "X-Integrity": this.comkey.sign(data), + "X-Payload-Signer": this.comkey.encrypt( + new EncryptionManager().xPayload + ), + }, + body: data, + }) + .then((res) => { + if (!res.ok) throw new Error("Response could not be completed"); + return res.json(); + }) + .then((res) => res) + .catch((e) => { + this.log(e.message); + return { success: false, reason: e.message, documents: [] }; + }); + } } module.exports.CollectorApi = CollectorApi; diff --git a/server/utils/files/index.js b/server/utils/files/index.js index 47c358d4..73c442b6 100644 --- a/server/utils/files/index.js +++ b/server/utils/files/index.js @@ -7,6 +7,10 @@ const documentsPath = process.env.NODE_ENV === "development" ? path.resolve(__dirname, `../../storage/documents`) : path.resolve(process.env.STORAGE_DIR, `documents`); +const directUploadsPath = + process.env.NODE_ENV === "development" + ? path.resolve(__dirname, `../../storage/direct-uploads`) + : path.resolve(process.env.STORAGE_DIR, `direct-uploads`); const vectorCachePath = process.env.NODE_ENV === "development" ? path.resolve(__dirname, `../../storage/vector-cache`) @@ -468,6 +472,7 @@ module.exports = { normalizePath, isWithin, documentsPath, + directUploadsPath, hasVectorCachedFiles, purgeEntireVectorCache, getDocumentsByFolder, diff --git a/server/utils/helpers/index.js b/server/utils/helpers/index.js index 9e101a2b..8fccb4fd 100644 --- a/server/utils/helpers/index.js +++ b/server/utils/helpers/index.js @@ -402,19 +402,19 @@ function getBaseLLMProviderModel({ provider = null } = {}) { case "koboldcpp": return process.env.KOBOLD_CPP_MODEL_PREF; case "textgenwebui": - return process.env.TEXT_GEN_WEB_UI_API_KEY; + return null; case "cohere": return process.env.COHERE_MODEL_PREF; case "litellm": return process.env.LITE_LLM_MODEL_PREF; case "generic-openai": - return process.env.GENERIC_OPEN_AI_EMBEDDING_API_KEY; + return process.env.GENERIC_OPEN_AI_MODEL_PREF; case "bedrock": return process.env.AWS_BEDROCK_LLM_MODEL_PREFERENCE; case "deepseek": return process.env.DEEPSEEK_MODEL_PREF; case "apipie": - return process.env.APIPIE_LLM_API_KEY; + return process.env.APIPIE_LLM_MODEL_PREF; case "novita": return process.env.NOVITA_LLM_MODEL_PREF; case "xai": @@ -422,7 +422,7 @@ function getBaseLLMProviderModel({ provider = null } = {}) { case "nvidia-nim": return process.env.NVIDIA_NIM_LLM_MODEL_PREF; case "ppio": - return process.env.PPIO_API_KEY; + return process.env.PPIO_MODEL_PREF; case "dpais": return process.env.DPAIS_LLM_MODEL_PREF; case "moonshotai":