merlyn/server/jobs/sync-watched-documents.js
Marcello Fitton 4a4378ed99
chore: add ESLint to /server (#5126)
* add eslint config to server

* add break statements to switch case

* add support for browser globals and turn off empty catch blocks

* disable lines with useless try/catch wrappers

* format

* fix no-undef errors

* disable lines violating no-unsafe-finally

* ignore syncStaticLists.mjs

* use proper null check for creatorId instead of unreachable nullish coalescing

* remove unneeded typescript eslint comment

* make no-unused-private-class-members a warning

* disable line for no-empty-objects

* add new lint script

* fix no-unused-vars violations

* make no-unused-vars an error

---------

Co-authored-by: shatfield4 <seanhatfield5@gmail.com>
Co-authored-by: Timothy Carambat <rambat1010@gmail.com>
2026-03-05 16:32:45 -08:00

210 lines
7.3 KiB
JavaScript

const { Document } = require("../models/documents.js");
const { DocumentSyncQueue } = require("../models/documentSyncQueue.js");
const { CollectorApi } = require("../utils/collectorApi");
const { fileData } = require("../utils/files");
const { log, conclude, updateSourceDocument } = require("./helpers/index.js");
const { getVectorDbClass } = require("../utils/helpers/index.js");
const { DocumentSyncRun } = require("../models/documentSyncRun.js");
// Background job: refresh the content of every "watched" document whose sync
// window has lapsed. For each stale queue entry it re-fetches the source via
// the collector service, re-embeds the document into its workspace's vector DB,
// and fans the update out to every other workspace referencing the same file.
// Each outcome (failed / exited / success) is recorded as a DocumentSyncRun row
// so repeated failures can be detected and broken watches removed.
(async () => {
try {
// Only queue entries whose scheduled sync time has already passed.
const queuesToProcess = await DocumentSyncQueue.staleDocumentQueues();
if (queuesToProcess.length === 0) {
log("No outstanding documents to sync. Exiting.");
return;
}
// The collector service performs the actual content fetching; bail early if unreachable.
const collector = new CollectorApi();
if (!(await collector.online())) {
log("Could not reach collector API. Exiting.");
return;
}
log(
`${queuesToProcess.length} watched documents have been found to be stale and will be updated now.`
);
// Entries are processed sequentially — each iteration does multiple network
// and DB round-trips, and order matters within an iteration.
for (const queue of queuesToProcess) {
let newContent = null;
const document = queue.workspaceDoc;
const workspace = document.workspace;
const { metadata, type, source } =
Document.parseDocumentTypeAndSource(document);
if (!metadata || !DocumentSyncQueue.validFileTypes.includes(type)) {
// Document is either broken, invalid, or not supported so drop it from future queues.
log(
`Document ${document.filename} has no metadata, is broken, or invalid and has been removed from all future runs.`
);
await DocumentSyncQueue.unwatch(document);
continue;
}
// URL-style sources (web links, YouTube) are re-fetched by their link.
if (["link", "youtube"].includes(type)) {
const response = await collector.forwardExtensionRequest({
endpoint: "/ext/resync-source-document",
method: "POST",
body: JSON.stringify({
type,
options: { link: source },
}),
});
newContent = response?.content;
}
// Data-connector sources are re-fetched via their stored chunkSource reference.
if (["confluence", "github", "gitlab", "drupalwiki"].includes(type)) {
const response = await collector.forwardExtensionRequest({
endpoint: "/ext/resync-source-document",
method: "POST",
body: JSON.stringify({
type,
options: { chunkSource: metadata.chunkSource },
}),
});
newContent = response?.content;
}
// No content back from the collector — record a failed run, and unwatch the
// document entirely once it has failed maxRepeatFailures times in a row.
if (!newContent) {
// Check if the last "x" runs were all failures (not exits!). If so - remove the job entirely since it is broken.
const failedRunCount = (
await DocumentSyncRun.where(
{ queueId: queue.id },
DocumentSyncQueue.maxRepeatFailures,
{ createdAt: "desc" }
)
).filter(
(run) => run.status === DocumentSyncRun.statuses.failed
).length;
if (failedRunCount >= DocumentSyncQueue.maxRepeatFailures) {
log(
`Document ${document.filename} has failed to refresh ${failedRunCount} times continuously and will now be removed from the watched document set.`
);
await DocumentSyncQueue.unwatch(document);
continue;
}
// NOTE(review): attempt display uses failedRunCount itself (with 0 shown as 1),
// not failedRunCount + 1 — confirm this is the intended attempt numbering.
log(
`Failed to get a new content response from collector for source ${source}. Skipping, but will retry next worker interval. Attempt ${failedRunCount === 0 ? 1 : failedRunCount}/${DocumentSyncQueue.maxRepeatFailures}`
);
await DocumentSyncQueue.saveRun(
queue.id,
DocumentSyncRun.statuses.failed,
{
filename: document.filename,
workspacesModified: [],
reason: "No content found.",
}
);
continue;
}
// Compare the freshly fetched content against what is on disk; identical
// content means we can skip the (expensive) re-embedding entirely.
const currentDocumentData = await fileData(document.docpath);
if (currentDocumentData.pageContent === newContent) {
const nextSync = DocumentSyncQueue.calcNextSync(queue);
log(
`Source ${source} is unchanged and will be skipped. Next sync will be ${nextSync.toLocaleString()}.`
);
// Record the sync attempt and push the schedule forward even though
// nothing changed, so the entry is not immediately stale again.
await DocumentSyncQueue._update(queue.id, {
lastSyncedAt: new Date().toISOString(),
nextSyncAt: nextSync.toISOString(),
});
await DocumentSyncQueue.saveRun(
queue.id,
DocumentSyncRun.statuses.exited,
{
filename: document.filename,
workspacesModified: [],
reason: "Content unchanged.",
}
);
continue;
}
// update the defined document and workspace vectorDB with the latest information
// it will skip cache and create a new vectorCache file.
// Delete-then-add: the old vectors must be removed before the new content
// is embedded under the same docId.
const vectorDatabase = getVectorDbClass();
await vectorDatabase.deleteDocumentFromNamespace(
workspace.slug,
document.docId
);
await vectorDatabase.addDocumentToNamespace(
workspace.slug,
{
...currentDocumentData,
pageContent: newContent,
docId: document.docId,
},
document.docpath,
true
);
// NOTE(review): updateSourceDocument is not awaited here, unlike the vector DB
// calls above — if it is async, the on-disk source rewrite may still be in
// flight when the success run is recorded; confirm this is intentional.
updateSourceDocument(document.docpath, {
...currentDocumentData,
pageContent: newContent,
docId: document.docId,
published: new Date().toLocaleString(),
// Todo: Update word count and token_estimate?
});
log(
`Workspace "${workspace.name}" vectors of ${source} updated. Document and vector cache updated.`
);
// Now we can bloom the results to all matching documents in all other workspaces
// (same filename, different document row/workspace).
const workspacesModified = [workspace.slug];
const moreReferences = await Document.where(
{
id: { not: document.id },
filename: document.filename,
},
null,
null,
{ workspace: true }
);
if (moreReferences.length !== 0) {
log(
`${source} is referenced in ${moreReferences.length} other workspaces. Updating those workspaces as well...`
);
for (const additionalDocumentRef of moreReferences) {
const additionalWorkspace = additionalDocumentRef.workspace;
workspacesModified.push(additionalWorkspace.slug);
await vectorDatabase.deleteDocumentFromNamespace(
additionalWorkspace.slug,
additionalDocumentRef.docId
);
// No trailing `true` argument here (unlike the primary workspace add) —
// presumably the vector cache written above is reused for these copies.
await vectorDatabase.addDocumentToNamespace(
additionalWorkspace.slug,
{
...currentDocumentData,
pageContent: newContent,
docId: additionalDocumentRef.docId,
},
additionalDocumentRef.docpath
);
log(
`Workspace "${additionalWorkspace.name}" vectors for ${source} was also updated with the new content from cache.`
);
}
}
// All workspaces updated successfully — advance the schedule and record the run.
const nextRefresh = DocumentSyncQueue.calcNextSync(queue);
log(
`${source} has been refreshed in all workspaces it is currently referenced in. Next refresh will be ${nextRefresh.toLocaleString()}.`
);
await DocumentSyncQueue._update(queue.id, {
lastSyncedAt: new Date().toISOString(),
nextSyncAt: nextRefresh.toISOString(),
});
await DocumentSyncQueue.saveRun(
queue.id,
DocumentSyncRun.statuses.success,
{ filename: document.filename, workspacesModified }
);
}
} catch (e) {
// Top-level guard: a single unexpected error aborts the whole run (remaining
// queue entries are retried on the next worker interval).
console.error(e);
log(`errored with ${e.message}`);
} finally {
// Always finalize the job (conclude handles process/exit bookkeeping).
conclude();
}
})();