merlyn/server/models/scheduledJobRun.js
Marcello Fitton 41495cdabe
feat: Scheduled Jobs (#5322)
* initialize

* expand tool result text limit | add syntax highlighting and json formatting to tool result rendering

* fix onError jsdoc

* lint

* fix unread icon

* route protection

* improve form handling for NewJobModal

* safeJsonParse

* remove unneeded comments

* remove trycatch

* add truncateText helper

* add explicit fallback value to safeJsonParse

* add shared cron constant and helpers

* reduce frontend indirection

* use isLight to compute syntax highlighting theme

* remove dead code

* remove forJob and make job limit to 50

* create recomputeNextRunAt helper method

* add comment about nextRunAt recomputation

* add job queue and concurrency control to scheduled jobs

* use p-queue

* change default max concurrent value to 1

* add comment explaining internal scheduling system

* add recomputeNextRunAt on boot

* add generated documents to run details

* Modify toolsOverride functionality where no tools selected means no tools are given to the agent

add a select all/deselect all toggle button for easily selecting all
tools in the create job form

* create usePolling hook

* add polling to scheduled jobs and scheduled job runs pages

* add cron generation feature in job form

* remove cron generation feature | add cron builder feature | add max active scheduled jobs limit

* set MAX_ACTIVE to null

* replace hour and minute input fields with input with type time

* simplify

* organize components

* move components to bottom of page component

* change Generated Documents to Generated Files

* add i18n to cronstrue

* add i18n

* add type="button" to button elements

* refactor fileSource retrieval logic

* one scheduled job run can have status "running"

* add protection of file retrieval from scheduled job in multiuser mode

* fix comments

* make job status default to queued

* add queued status

* fix bug with result trace rendering

* store timeout ref and clearTimeout once race settles

* remove unneeded handlerPromise tracking

* move imports to top level

* refactor hardcoded paths to path resolve functions

* implement new job form design

* simplify

* fix button styles

* fix runJob bug

* implement styles for scheduled jobs page

* apply dark mode figma styles

* delete unused translation key

* implement light mode for new job modal, run history, and run details

* lint

* fix light mode scroll bar in tool call card

* adjust table header contrast

* fix type in subtitle

* kill workers when job is in-flight before deleting job

* add border-none to buttons

* change locale time to iso string

* import BackgroundService at module level | instantiate backgroundService singleton once and reuse across handlers

* add p-queue, @breejs/later and cron-validate as core deps

* parse cron expression to a builder state once

* add theme to day buttons in cron builder

* fix stale tools selection caption

* flip popover when popover clips screen height

* make ScheduleJob.trigger() await the run insertion | disable run now button if job is in flight

* regen table

* refactor generated file card

* refactor frontend

* remove logs

* major refactor for tool picking, fix bree/later bug

* combine action endpoints, move continue to method

* fix unoptimized query with include + take + order

* fix dangerous use, refactor job to utils

* add copy content to text response

* improve notification system subscription for browser

* remove unused translations

* prevent gen-file cleanup job from deleting active job file generated references

* rich text copy

* Scheduled Jobs: Translations (#5482)

* add locales for scheduled jobs

* i18n

---------

Co-authored-by: Timothy Carambat <rambat1010@gmail.com>

* add config flag with UI notice

* update README

* telemetry datapoints

* Always use UTC on backend, convert to local in frontend

* fix tz render

* Add job killing

* cleanup thinking text in job notifications and break out reasoning in response text.
Also hide zero metrics since that is useless

* Port generatedFile schema to the normalized workspace chat `outputs` file format so porting to thread is simple and implem between chats <> jobs is 1:1

* what the fuck

* compiled bug

* fixed thinking oddity in compiled frontend

* suppress multi-toast

* fix duration call

* Revert "fix duration call"

This reverts commit 0491bc71f4223e65ea4046561b15b268fefb8da2.

* revert and reapply fix

---------

Co-authored-by: Timothy Carambat <rambat1010@gmail.com>
2026-04-29 12:05:46 -07:00

363 lines
11 KiB
JavaScript

const prisma = require("../utils/prisma");
const ScheduledJobRun = {
statuses: {
queued: "queued",
running: "running",
completed: "completed",
failed: "failed",
timed_out: "timed_out",
},
// Non-terminal statuses — a row in any of these states is considered
// "in flight" for dedup purposes. The parent claims a row as `queued`
// when enqueuing; the worker transitions it to `running` once it
// actually begins executing (it may sit in p-queue first).
nonTerminalStatuses: ["queued", "running"],
/**
* Claim a new run for a job. At most one in-flight run per job is allowed —
* if a `queued` or `running` row already exists, this returns null and the
* caller should drop the request. The check + insert run inside an
* interactive transaction so two concurrent callers cannot both pass;
* SQLite serializes writes.
*
* The row is created in `queued` status; the worker transitions it to
* `running` via markRunning() once it actually begins executing.
*
* @param {number} jobId
* @returns {Promise<object|null>} The created run row, or null if a run is
* already in progress for this job (or on failure).
*/
start: async function (jobId) {
try {
return await prisma.$transaction(async (tx) => {
const existing = await tx.scheduled_job_runs.findFirst({
where: {
jobId: Number(jobId),
status: { in: this.nonTerminalStatuses },
},
select: { id: true },
});
if (existing) return null;
return tx.scheduled_job_runs.create({
data: {
jobId: Number(jobId),
status: this.statuses.queued,
},
});
});
} catch (error) {
console.error("Failed to enqueue scheduled job run:", error.message);
return null;
}
},
/**
* Transition a queued run into the running state. Called by the worker as
* its first DB write, so `startedAt` reflects actual execution start rather
* than queue-claim time. Filtered updateMany makes it a no-op if the row
* has already been transitioned to a terminal state (e.g. parent failed it
* because the worker failed to spawn, then a stale child somehow boots).
*
* @param {number} id - scheduled_job_runs.id
* @returns {Promise<boolean>} true if the row transitioned, false otherwise
*/
markRunning: async function (id) {
try {
const result = await prisma.scheduled_job_runs.updateMany({
where: { id: Number(id), status: this.statuses.queued },
data: {
status: this.statuses.running,
startedAt: new Date(),
},
});
return result.count > 0;
} catch (error) {
console.error(
"Failed to transition scheduled job run to running:",
error.message
);
return false;
}
},
/**
* Mark a run as failed only if it has not already reached a terminal state.
* Used by the parent process when a worker exits unexpectedly — atomic
* filtered update prevents clobbering a row the worker already transitioned
* to `completed` (the rare race where the worker succeeded but exited
* non-zero during cleanup).
* @param {number} id - scheduled_job_runs.id
* @param {string} errorMsg
*/
failIfNotTerminal: async function (id, errorMsg) {
try {
const result = await prisma.scheduled_job_runs.updateMany({
where: {
id: Number(id),
status: { in: this.nonTerminalStatuses },
},
data: {
status: this.statuses.failed,
error: String(errorMsg || "Worker exited unexpectedly"),
completedAt: new Date(),
},
});
return result.count > 0;
} catch (error) {
console.error(
"Failed to conditionally fail scheduled job run:",
error.message
);
return false;
}
},
complete: async function (id, { result } = {}) {
try {
const run = await prisma.scheduled_job_runs.update({
where: { id: Number(id) },
data: {
status: this.statuses.completed,
result: typeof result === "string" ? result : JSON.stringify(result),
completedAt: new Date(),
},
});
return run;
} catch (error) {
console.error("Failed to complete scheduled job run:", error.message);
return null;
}
},
fail: async function (id, { error: errorMsg } = {}) {
try {
// Use updateMany with a filter to avoid overwriting a run that was
// already moved to a terminal state (e.g., killed by user).
const result = await prisma.scheduled_job_runs.updateMany({
where: {
id: Number(id),
status: { in: this.nonTerminalStatuses },
},
data: {
status: this.statuses.failed,
error: String(errorMsg || "Unknown error"),
completedAt: new Date(),
},
});
if (result.count === 0) return null;
return await this.get({ id: Number(id) });
} catch (error) {
console.error(
"Failed to mark scheduled job run as failed:",
error.message
);
return null;
}
},
timeout: async function (id) {
try {
// Use updateMany with a filter to avoid overwriting a run that was
// already moved to a terminal state (e.g., killed by user).
const result = await prisma.scheduled_job_runs.updateMany({
where: {
id: Number(id),
status: { in: this.nonTerminalStatuses },
},
data: {
status: this.statuses.timed_out,
error: "Job execution timed out",
completedAt: new Date(),
},
});
if (result.count === 0) return null;
return await this.get({ id: Number(id) });
} catch (error) {
console.error(
"Failed to mark scheduled job run as timed out:",
error.message
);
return null;
}
},
/**
* Kill a running or queued job run. This marks the run as failed with a
* user-initiated kill message. The actual worker process termination is
* handled by BackgroundService.killRun().
* - Killing a run will also mark it as read (user killed it, so dont bother with unread status)
* @param {number} id - scheduled_job_runs.id
* @returns {Promise<object|null>} The updated run row, or null if not killable
*/
kill: async function (id) {
try {
const result = await prisma.scheduled_job_runs.updateMany({
where: {
id: Number(id),
status: { in: this.nonTerminalStatuses },
},
data: {
status: this.statuses.failed,
error: "Job killed by user",
completedAt: new Date(),
readAt: new Date(),
},
});
if (result.count === 0) return null;
return await this.get({ id: Number(id) });
} catch (error) {
console.error("Failed to kill scheduled job run:", error.message);
return null;
}
},
get: async function (clause = {}, include = {}) {
try {
const run = await prisma.scheduled_job_runs.findFirst({
where: clause,
...(Object.keys(include).length > 0 ? { include } : {}),
});
return run || null;
} catch (error) {
console.error("Failed to get scheduled job run:", error.message);
return null;
}
},
where: async function (
clause = {},
limit = null,
orderBy = null,
include = {},
offset = 0
) {
try {
const results = await prisma.scheduled_job_runs.findMany({
where: clause,
...(limit !== null ? { take: limit } : {}),
...(orderBy !== null
? { orderBy }
: { orderBy: { startedAt: "desc" } }),
...(Object.keys(include).length > 0 ? { include } : {}),
...(offset !== null ? { skip: offset } : {}),
});
return results;
} catch (error) {
console.error("Failed to query scheduled job runs:", error.message);
return [];
}
},
markRead: async function (id) {
try {
await prisma.scheduled_job_runs.update({
where: { id: Number(id) },
data: { readAt: new Date() },
});
return true;
} catch (error) {
console.error("Failed to mark run as read:", error.message);
return false;
}
},
delete: async function (clause = {}) {
try {
await prisma.scheduled_job_runs.deleteMany({ where: clause });
return true;
} catch (error) {
console.error("Failed to delete scheduled job runs:", error.message);
return false;
}
},
/**
* Mark all orphaned in-flight runs (queued or running) as failed — used on
* cold startup to recover rows whose owning worker died with the server.
*/
failOrphanedRuns: async function () {
try {
const result = await prisma.scheduled_job_runs.updateMany({
where: { status: { in: this.nonTerminalStatuses } },
data: {
status: this.statuses.failed,
error: "Server restarted during execution",
completedAt: new Date(),
},
});
return result.count;
} catch (error) {
console.error("Failed to fail orphaned runs:", error.message);
return 0;
}
},
/**
* Continue a run in a workspace thread.
* This will create a new workspace and thread specific for the run if they do not exist, and add the run's response to the thread.
* @param {number} runId - The ID of the run to continue.
* @returns {Promise<{workspace: import("@prisma/client").workspaces | null, thread: import("@prisma/client").workspace_threads | null, error: string | null}>} A promise that resolves to an object containing the workspace, thread, and an error message if applicable.
*/
continueInThread: async function (runId) {
try {
const { Workspace } = require("./workspace");
const { WorkspaceThread } = require("./workspaceThread");
const { WorkspaceChats } = require("./workspaceChats");
const { safeJsonParse } = require("../utils/http");
const run = await this.get({ id: Number(runId) }, { job: true });
if (!run) throw new Error("Run not found");
const result = safeJsonParse(run.result, {});
const responseText = result?.text || "No response was generated.";
// Get or create the "Scheduled Jobs" workspace
const { workspace, error: workspaceError } = await Workspace.upsert(
{ slug: "scheduled-jobs" },
{
name: "Scheduled Jobs",
slug: "scheduled-jobs",
chatMode: "automatic",
}
);
if (workspaceError)
throw new Error(workspaceError || "Failed to create workspace");
const { thread, message: threadError } =
await WorkspaceThread.new(workspace);
if (threadError)
throw new Error(threadError || "Failed to create thread");
await WorkspaceChats.new({
workspaceId: workspace.id,
prompt: run.job.prompt,
response: {
text: responseText,
sources: result.sources || [],
outputs: result.outputs || [],
type: "chat",
},
threadId: thread.id,
include: true,
});
return {
workspace,
thread,
error: null,
};
} catch (error) {
return {
workspace: null,
thread: null,
error: error.message ?? "Unknown error",
};
}
},
};
// Exported as a named property so callers destructure it, matching the
// `const { X } = require(...)` pattern used for sibling models.
module.exports = { ScheduledJobRun };