merlyn/server/utils/BackgroundWorkers/index.js
Marcello Fitton 41495cdabe
feat: Scheduled Jobs (#5322)
* initialize

* expand tool result text limit | add syntax highlighting and json formatting to tool result rendering

* fix onError jsdoc

* lint

* fix unread icon

* route protection

* improve form handling for NewJobModal

* safeJsonParse

* remove unneeded comments

* remove trycatch

* add truncateText helper

* add explicit fallback value to safeJsonParse

* add shared cron constant and helpers

* reduce frontend indirection

* use isLight to compute syntax highlighting theme

* remove dead code

* remove forJob and make job limit to 50

* create recomputeNextRunAt helper method

* add comment about nextRunAt recomputation

* add job queue and concurrency control to scheduled jobs

* use p-queue

* change default max concurrent value to 1

* add comment explaining internal scheduling system

* add recomputeNextRunAt on boot

* add generated documents to run details

* Modify toolsOverride functionality where no tools selected means no tools are given to the agent

add a select all/deselect all toggle button for easily selecting all
tools in the create job form

* create usePolling hook

* add polling to scheduled jobs and scheduled job runs pages

* add cron generation feature in job form

* remove cron generation feature | add cron builder feature | add max active scheduled jobs limit

* set MAX_ACTIVE to null

* replace hour and minute input fields with input with type time

* simplify

* organize components

* move components to bottom of page component

* change Generated Documents to Generated Files

* add i18n to cronstrue

* add i18n

* add type="button" to button elements

* refactor fileSource retrieval logic

* one scheduled job run can have status "running"

* add protection of file retrieval from scheduled job in multiuser mode

* fix comments

* make job status default to queued

* add queued status

* fix bug with result trace rendering

* store timeout ref and clearTimeout once race settles

* remove unneeded handlerPromise tracking

* move imports to top level

* refactor hardcoded paths to path resolve functions

* implement new job form design

* simplify

* fix button styles

* fix runJob bug

* implement styles for scheduled jobs page

* apply dark mode figma styles

* delete unused translation key

* implement light mode for new new job modal, run history, and run details

* lint

* fix light mode scroll bar in tool call card

* adjust table header contrast

* fix type in subtitle

* kill workers when job is in-flight before deleting job

* add border-none to buttons

* change locale time to iso string

* import BackgroundService module level | instantiate backgroundService singleton once and reuse across handlers

* add p-queue, @breejs/later and cron-validate as core deps

* parse cron expression to a builder state once

* add theme to day buttons in cron builder

* fix stale tools selection caption

* flip popover when popover clips screen height

* make ScheduleJob.trigger() await the run insertion | disable run now button if job is in flight

* regen table

* refactor generated file card

* refactor frontend

* remove logs

* major refactor for tool picking, fix bree/later bug

* combine action endpoints, move contine to method

* fix unoptimized query with include + take + order

* fix dangerous use, refactor job to utils

* add copy content to text response

* improve notification system subscription for browser

* remove unused translations

* prevent gen-file cleanup job from deleting active job file generated references

* rich text copy

* Scheduled Jobs: Translations (#5482)

* add locales for scheduled jobs

* i18n

---------

Co-authored-by: Timothy Carambat <rambat1010@gmail.com>

* add config flag with UI notice

* update README

* telemetry datapoints

* Always use UTC on backend, convert to local in frontend

* fix tz render

* Add job killing

* cleanup thinking text in job notifications and break out reasoning in response text.
Also hide zero metrics since that is useless

* Port generatedFile schema to the normalized workspace chat `outputs` file format so porting to thread is simple and implem between chats <> jobs is 1:1

* what the fuck

* compiled bug

* fixed thinking oddity in complied frontend

* suppress multi-toast

* fix duration call

* Revert "fix duration call"

This reverts commit 0491bc71f4223e65ea4046561b15b268fefb8da2.

* revert and reapply fix

---------

Co-authored-by: Timothy Carambat <rambat1010@gmail.com>
2026-04-29 12:05:46 -07:00

396 lines
12 KiB
JavaScript

const path = require("path");
const Graceful = require("@ladjs/graceful");
const Bree = require("@mintplex-labs/bree");
const later = require("@breejs/later");
const PQueue = require("p-queue").default;
const setLogger = require("../logger");
// Use UTC time for cron interpretation. This ensures consistent behavior
// regardless of server timezone (e.g., when running in containers).
later.date.UTC();
class BackgroundService {
  name = "BackgroundWorkerService";
  // Singleton reference — the constructor returns the existing instance when
  // one has already been created, so only one Bree/queue set exists per process.
  static _instance = null;
  documentSyncEnabled = false;
  #root = path.resolve(__dirname, "../../jobs");
  // jobId (scheduled_jobs.id) -> later.js interval timer for that job's cron.
  #scheduledJobTimers = new Map();
  // Global concurrency gate for scheduled-job worker processes. Invalid or
  // unset SCHEDULED_JOB_MAX_CONCURRENT falls back to 1 (serial execution).
  #scheduledJobQueue = new PQueue({
    concurrency: Number(process.env.SCHEDULED_JOB_MAX_CONCURRENT) || 1,
  });
  // Tracks in-flight worker processes per scheduled jobId so we can kill any
  // active runs when the job is deleted. Without this, a running worker
  // outlives the cascade-delete of its scheduled_job_runs row and throws when
  // it tries to write the result back (prisma.update on a missing row).
  #scheduledJobWorkers = new Map();
  #alwaysRunJobs = [
    {
      name: "cleanup-orphan-documents",
      timeout: "1m",
      interval: "12hr",
    },
    {
      name: "cleanup-generated-files",
      timeout: "5m",
      interval: "8hr",
    },
  ];
  #documentSyncJobs = [
    // Job for auto-sync of documents
    // https://github.com/breejs/bree
    {
      name: "sync-watched-documents",
      interval: "1hr",
    },
  ];

  constructor() {
    // Enforce the singleton: reuse the prior instance (and its timers,
    // queue, and worker maps) rather than creating parallel schedulers.
    if (BackgroundService._instance) {
      this.#log("SINGLETON LOCK: Using existing BackgroundService.");
      return BackgroundService._instance;
    }
    this.logger = setLogger();
    BackgroundService._instance = this;
  }

  #log(text, ...args) {
    console.log(`\x1b[36m[${this.name}]\x1b[0m ${text}`, ...args);
  }

  /**
   * Returns the root path where job files are located.
   * Handles the difference between development and production (bundled) environments.
   * @returns {string}
   */
  get jobsRoot() {
    return this.#root;
  }

  /**
   * Wraps the logger so that IPC messages carrying `silent: true` are
   * suppressed. Bree unconditionally calls `logger.info(message)` for
   * every IPC message from forked processes, so this is the only
   * interception point.
   */
  #makeBreeLogger() {
    const base = this.logger;
    const isSilent = (args) => args.length === 1 && args[0]?.silent === true;
    const wrapped = Object.create(base);
    wrapped.info = (...args) => {
      if (!isSilent(args)) base.info(...args);
    };
    wrapped.log = (...args) => {
      if (!isSilent(args)) base.log(...args);
    };
    return wrapped;
  }

  /**
   * Boot the service: fail orphaned runs left over from a crash, start Bree
   * with the always-run (and optionally document-sync) jobs, and register
   * cron timers for all enabled scheduled jobs.
   * @returns {Promise<void>}
   */
  async boot() {
    const { DocumentSyncQueue } = require("../../models/documentSyncQueue");
    const { ScheduledJobRun } = require("../../models/scheduledJobRun");
    this.documentSyncEnabled = await DocumentSyncQueue.enabled();

    // Mark any orphaned scheduled job runs as failed (server crashed mid-execution)
    const orphanedCount = await ScheduledJobRun.failOrphanedRuns();
    if (orphanedCount > 0) {
      this.#log(
        `Marked ${orphanedCount} orphaned scheduled job run(s) as failed`
      );
    }

    const jobsToRun = this.jobs();
    this.#log("Starting...");
    this.bree = new Bree({
      logger: this.#makeBreeLogger(),
      root: this.#root,
      jobs: jobsToRun,
      // Bind the handlers — Bree invokes them detached, so without the bind
      // `this` is undefined inside them and `this.logger` throws.
      errorHandler: this.onError.bind(this),
      workerMessageHandler: this.onWorkerMessageHandler.bind(this),
      runJobsAs: "process",
    });
    this.graceful = new Graceful({ brees: [this.bree], logger: this.logger });
    this.graceful.listen();
    this.bree.start();
    this.#log(
      `Service started with ${jobsToRun.length} jobs`,
      jobsToRun.map((j) => j.name)
    );
    await this.#bootScheduledJobs();
  }

  /**
   * Cleanup scheduled jobs (in-process cron timers + p-queue)
   */
  #cleanupScheduledJobs() {
    for (const [id, timer] of this.#scheduledJobTimers) {
      timer.clear();
      this.#scheduledJobTimers.delete(id);
    }
    // Drop queued-but-not-started tasks; tasks already running are not
    // interrupted here (p-queue .clear() only empties the pending queue).
    this.#scheduledJobQueue.clear();
  }

  /**
   * Stop the service: clear cron timers/queue and gracefully stop Bree.
   * Safe to call even if boot() never ran.
   * @returns {Promise<void>}
   */
  async stop() {
    this.#log("Stopping...");
    this.#cleanupScheduledJobs();
    if (!!this.graceful && !!this.bree) this.graceful.stopBree(this.bree, 0);
    this.bree = null;
    this.graceful = null;
    this.#log("Service stopped");
  }

  /** @returns {import("@mintplex-labs/bree").Job[]} */
  jobs() {
    const activeJobs = [...this.#alwaysRunJobs];
    if (this.documentSyncEnabled) activeJobs.push(...this.#documentSyncJobs);
    return activeJobs;
  }

  /**
   * Bree error handler for background workers.
   * @param {Error} error
   * @param {object} _workerMetadata - Bree worker metadata (unused)
   */
  onError(error, _workerMetadata) {
    this.logger.error(`${error.message}`, {
      service: "bg-worker",
      origin: error.name,
    });
  }

  /**
   * Bree IPC message handler for background workers. Messages flagged
   * `silent` (at either nesting level Bree produces) are dropped.
   * @param {object} message
   * @param {object} _workerMetadata - Bree worker metadata (unused)
   */
  onWorkerMessageHandler(message, _workerMetadata) {
    if (message?.silent || message?.message?.silent) return;
    this.logger.info(`${message.message}`, {
      service: "bg-worker",
      origin: message.name,
    });
  }

  /**
   * Spawn a one-off Bree worker process for the given script.
   * @param {string} scriptPath - Absolute path to the worker JS file
   * @returns {Promise<{ worker: ChildProcess, jobId: string }>}
   */
  async spawnWorker(scriptPath) {
    if (!this.bree)
      throw new Error("BackgroundService has not been booted yet");
    // Include a random suffix so two spawns of the same script within the
    // same millisecond (possible when queue concurrency > 1) cannot collide
    // on the Bree job name.
    const jobId = `${path.basename(scriptPath, ".js")}-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
    await this.bree.add({
      name: jobId,
      path: scriptPath,
    });
    await this.bree.run(jobId);
    const worker = this.bree.workers.get(jobId);
    if (!worker) throw new Error("Failed to get worker reference from Bree");
    return { worker, jobId };
  }

  /**
   * Remove a one-off Bree job registration so stale entries don't accumulate.
   * @param {string} jobId
   */
  async removeJob(jobId) {
    if (!jobId) return;
    try {
      if (this.bree) await this.bree.remove(jobId);
    } catch {
      /* Job may already be removed */
    }
  }

  // ---------------------------------------------------------------
  // Scheduled Jobs — in-process cron timers + p-queue
  //
  // Bree tightly couples scheduling with worker spawning — when a
  // Bree cron fires, it directly calls run() which immediately
  // spawns a child process with no way to intercept it. We manage
  // our own cron timers (via later.setInterval) to decouple
  // scheduling from execution so we can route jobs through p-queue
  // for global concurrency control before spawning workers.
  //
  // Per-job dedup lives in the database, not in process memory: any
  // non-terminal row (`queued` or `running`) in scheduled_job_runs means
  // the job has a run in flight. ScheduledJobRun.start() does the check +
  // insert atomically and creates the row in `queued` status. The worker
  // transitions it to `running` once it actually begins executing, so
  // `startedAt` reflects execution start rather than queue-claim time.
  // Cron-fired and manually-triggered enqueues use the same rule —
  // at most one in-flight run per job, regardless of source.
  // ---------------------------------------------------------------

  /**
   * Register cron timers for all enabled scheduled jobs on startup.
   */
  async #bootScheduledJobs() {
    const { ScheduledJob } = require("../../models/scheduledJob");
    const enabledJobs = await ScheduledJob.allEnabled();
    for (const job of enabledJobs) {
      // Recompute nextRunAt so the DB reflects the next fire time as of now
      // (the stored value may be stale from before a restart).
      await ScheduledJob.recomputeNextRunAt(job.id);
      this.addScheduledJob(job);
    }
    if (enabledJobs.length > 0) {
      this.#log(
        `Registered ${enabledJobs.length} scheduled job(s) (max concurrent: ${this.#scheduledJobQueue.concurrency})`,
        enabledJobs.map((j) => `${j.name} (${j.schedule})`)
      );
    }
  }

  /**
   * Register an in-process cron timer for a scheduled job.
   * When the cron fires, the jobId is enqueued for execution.
   * @param {object} job - scheduled_jobs DB record
   */
  addScheduledJob(job) {
    // Replace any existing timer for this job so updates don't double-fire.
    this.removeScheduledJob(job.id);
    const sched = later.parse.cron(job.schedule);
    const timer = later.setInterval(() => {
      this.enqueueScheduledJob(job.id);
    }, sched);
    this.#scheduledJobTimers.set(job.id, timer);
  }

  /**
   * Remove an in-process cron timer for a scheduled job and kill any in-flight
   * worker processes for it. Killing in-flight workers prevents them from
   * writing results back to a scheduled_job_runs row that the FK cascade (from
   * a subsequent ScheduledJob.delete) is about to remove.
   * @param {number} jobId - scheduled_jobs.id
   */
  removeScheduledJob(jobId) {
    const timer = this.#scheduledJobTimers.get(jobId);
    if (timer) timer.clear();
    this.#scheduledJobTimers.delete(jobId);

    const workers = this.#scheduledJobWorkers.get(jobId);
    if (workers) {
      for (const worker of workers) {
        try {
          worker.kill("SIGTERM");
        } catch {
          /* worker may have already exited */
        }
      }
      this.#scheduledJobWorkers.delete(jobId);
    }
  }

  /**
   * Re-sync a scheduled job's cron timer after an update.
   * Removes the old timer and re-adds if still enabled.
   * @param {number} jobId - scheduled_jobs.id
   */
  async syncScheduledJob(jobId) {
    const { ScheduledJob } = require("../../models/scheduledJob");
    this.removeScheduledJob(jobId);
    const job = await ScheduledJob.get({ id: Number(jobId) });
    if (job && job.enabled) {
      this.addScheduledJob(job);
    }
  }

  /**
   * Kill a specific run's worker process. This terminates the worker but does
   * not update the database — the caller should use ScheduledJobRun.kill()
   * before or after calling this to mark the run as failed.
   *
   * @param {number} jobId - scheduled_jobs.id (parent job)
   * @param {number} _runId - scheduled_job_runs.id (not directly used, but for
   *   future multi-run support; currently we kill all workers for the jobId)
   * @returns {boolean} true if a worker was found and killed, false otherwise
   */
  killRun(jobId, _runId) {
    const workers = this.#scheduledJobWorkers.get(Number(jobId));
    if (!workers || workers.size === 0) return false;
    let killed = false;
    for (const worker of workers) {
      try {
        worker.kill("SIGTERM");
        killed = true;
      } catch {
        /* worker may have already exited */
      }
    }
    return killed;
  }

  /**
   * Enqueue a scheduled job for execution. Called by both the cron timer
   * (in addScheduledJob) and the manual trigger endpoint. ScheduledJobRun.start()
   * atomically rejects the call if the job already has a run in flight.
   *
   * @param {number} jobId - scheduled_jobs.id
   * @returns {Promise<object|null>} the created run row, or null if skipped
   *   because a run is already in flight for this job.
   */
  async enqueueScheduledJob(jobId) {
    const { ScheduledJobRun } = require("../../models/scheduledJobRun");
    const run = await ScheduledJobRun.start(jobId);
    // if start returns null, skip enqueuing, scheduled job already has a run in flight
    if (!run) return null;
    this.#scheduledJobQueue.add(() =>
      this.#runScheduledJobWorker(jobId, run.id).catch(async (err) => {
        this.#log(`Scheduled job ${jobId} failed: ${err.message}`);
        await ScheduledJobRun.failIfNotTerminal(run.id, err.message);
      })
    );
    return run;
  }

  /**
   * Spawn the run-scheduled-job worker for a given run and resolve when it
   * exits so p-queue can advance. The worker reads its payload from an IPC
   * `process.on("message", ...)` handler.
   *
   * @param {number} jobId
   * @param {number} runId
   * @returns {Promise<void>}
   */
  async #runScheduledJobWorker(jobId, runId) {
    const scriptPath = path.resolve(this.jobsRoot, "run-scheduled-job.js");
    const { worker, jobId: workerId } = await this.spawnWorker(scriptPath);
    if (!this.#scheduledJobWorkers.has(jobId)) {
      this.#scheduledJobWorkers.set(jobId, new Set());
    }
    this.#scheduledJobWorkers.get(jobId).add(worker);

    try {
      worker.send({ jobId, runId });
      await new Promise((resolve, reject) => {
        worker.on("exit", (code, signal) => {
          // SIGTERM is sent by removeScheduledJob when the job is deleted
          // mid-run; treat that as a normal exit rather than a worker failure.
          if (code === 0 || code == null || signal === "SIGTERM") {
            resolve();
          } else {
            reject(new Error(`Worker exited with code ${code}`));
          }
        });
        worker.on("error", reject);
      });
    } finally {
      // Always drop bookkeeping and the one-off Bree registration, even when
      // the worker failed, so stale entries never accumulate.
      const workers = this.#scheduledJobWorkers.get(jobId);
      if (workers) {
        workers.delete(worker);
        if (workers.size === 0) this.#scheduledJobWorkers.delete(jobId);
      }
      await this.removeJob(workerId).catch(() => {});
    }
  }
}
module.exports.BackgroundService = BackgroundService;