merlyn/server/jobs/helpers/scheduled-job-helper.js
Marcello Fitton 41495cdabe
feat: Scheduled Jobs (#5322)
* initialize

* expand tool result text limit | add syntax highlighting and json formatting to tool result rendering

* fix onError jsdoc

* lint

* fix unread icon

* route protection

* improve form handling for NewJobModal

* safeJsonParse

* remove unneeded comments

* remove trycatch

* add truncateText helper

* add explicit fallback value to safeJsonParse

* add shared cron constant and helpers

* reduce frontend indirection

* use isLight to compute syntax highlighting theme

* remove dead code

* remove forJob and make job limit to 50

* create recomputeNextRunAt helper method

* add comment about nextRunAt recomputation

* add job queue and concurrency control to scheduled jobs

* use p-queue

* change default max concurrent value to 1

* add comment explaining internal scheduling system

* add recomputeNextRunAt on boot

* add generated documents to run details

* Modify toolsOverride functionality where no tools selected means no tools are given to the agent

add a select all/deselect all toggle button for easily selecting all
tools in the create job form

* create usePolling hook

* add polling to scheduled jobs and scheduled job runs pages

* add cron generation feature in job form

* remove cron generation feature | add cron builder feature | add max active scheduled jobs limit

* set MAX_ACTIVE to null

* replace hour and minute input fields with input with type time

* simplify

* organize components

* move components to bottom of page component

* change Generated Documents to Generated Files

* add i18n to cronstrue

* add i18n

* add type="button" to button elements

* refactor fileSource retrieval logic

* one scheduled job run can have status "running"

* add protection of file retrieval from scheduled job in multiuser mode

* fix comments

* make job status default to queued

* add queued status

* fix bug with result trace rendering

* store timeout ref and clearTimeout once race settles

* remove unneeded handlerPromise tracking

* move imports to top level

* refactor hardcoded paths to path resolve functions

* implement new job form design

* simplify

* fix button styles

* fix runJob bug

* implement styles for scheduled jobs page

* apply dark mode figma styles

* delete unused translation key

* implement light mode for new new job modal, run history, and run details

* lint

* fix light mode scroll bar in tool call card

* adjust table header contrast

* fix type in subtitle

* kill workers when job is in-flight before deleting job

* add border-none to buttons

* change locale time to iso string

* import BackgroundService at module level | instantiate backgroundService singleton once and reuse across handlers

* add p-queue, @breejs/later and cron-validate as core deps

* parse cron expression to a builder state once

* add theme to day buttons in cron builder

* fix stale tools selection caption

* flip popover when popover clips screen height

* make ScheduleJob.trigger() await the run insertion | disable run now button if job is in flight

* regen table

* refactor generated file card

* refactor frontend

* remove logs

* major refactor for tool picking, fix bree/later bug

* combine action endpoints, move continue to method

* fix unoptimized query with include + take + order

* fix dangerous use, refactor job to utils

* add copy content to text response

* improve notification system subscription for browser

* remove unused translations

* prevent gen-file cleanup job from deleting active job file generated references

* rich text copy

* Scheduled Jobs: Translations (#5482)

* add locales for scheduled jobs

* i18n

---------

Co-authored-by: Timothy Carambat <rambat1010@gmail.com>

* add config flag with UI notice

* update README

* telemetry datapoints

* Always use UTC on backend, convert to local in frontend

* fix tz render

* Add job killing

* cleanup thinking text in job notifications and break out reasoning in response text.
Also hide zero metrics since that is useless

* Port generatedFile schema to the normalized workspace chat `outputs` file format so porting to thread is simple and implem between chats <> jobs is 1:1

* what the fuck

* compiled bug

* fixed thinking oddity in compiled frontend

* suppress multi-toast

* fix duration call

* Revert "fix duration call"

This reverts commit 0491bc71f4223e65ea4046561b15b268fefb8da2.

* revert and reapply fix

---------

Co-authored-by: Timothy Carambat <rambat1010@gmail.com>
2026-04-29 12:05:46 -07:00

112 lines
3.4 KiB
JavaScript

const { safeJsonParse } = require("../../utils/http");
const { stripThinkingFromText } = require("./index.js");
/**
 * Resolve a timeout override (typically an environment variable string) to a
 * usable number of milliseconds.
 *
 * Only finite, strictly positive numbers are accepted. Anything else
 * (undefined, empty string, non-numeric text, zero, negative values, Infinity)
 * yields the fallback, so a malformed override can never produce a timeout
 * that is already expired or unbounded.
 * @param {string|number|undefined} rawValue - The raw override value.
 * @param {number} fallbackMs - Value returned when the override is unusable.
 * @returns {number}
 */
function resolveTimeoutMs(rawValue, fallbackMs) {
  const parsed = Number(rawValue);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallbackMs;
}

/**
 * Maximum time in milliseconds a scheduled job can run before being terminated.
 * Read from the SCHEDULED_JOB_TIMEOUT_MS environment variable, defaulting to
 * five minutes when the variable is unset or not a positive finite number.
 * @type {number}
 */
const SCHEDULED_JOB_TIMEOUT_MS = resolveTimeoutMs(
  process.env.SCHEDULED_JOB_TIMEOUT_MS,
  5 * 60 * 1000
);
/**
 * Build the collector used to capture what an agent emits during a run.
 *
 * The returned `handler` exposes `send`/`close`; every JSON string passed to
 * `send` is parsed and folded into the returned `thoughts` array and `state`
 * object. `toolCalls` is returned as an empty array; this function never
 * appends to it itself.
 * @returns {{
 *   handler: { send: (jsonStr: string) => void, close: () => void },
 *   thoughts: string[],
 *   toolCalls: object[],
 *   state: { textResponse: string, metrics: object }
 * }}
 */
function agentActionCb() {
  // A single mutable container is handed back to the caller so that later
  // reassignments of textResponse/metrics stay visible through the same reference.
  const state = { textResponse: "", metrics: {} };
  const thoughts = [];
  const toolCalls = [];

  /** Fold one nested stream event into the shared state. */
  const applyStreamEvent = (event) => {
    switch (event.type) {
      case "textResponseChunk":
        // Chunks are appended in arrival order.
        if (event.content) state.textResponse += event.content;
        break;
      case "fullTextResponse":
        // A full response replaces whatever chunks were accumulated.
        if (event.content) state.textResponse = event.content;
        break;
      case "usageMetrics":
        if (event.metrics) state.metrics = event.metrics;
        break;
      default:
        break;
    }
  };

  const handler = {
    send(jsonStr) {
      const message = safeJsonParse(jsonStr, null);
      // Unparseable payloads and messages without content carry nothing to record.
      if (!message || !message.content) return;

      const { type, content, from } = message;
      if (type === "statusResponse") {
        thoughts.push(content);
        return;
      }
      if (type === "reportStreamEvent") {
        applyStreamEvent(content);
        return;
      }

      // Final message from agent (onMessage event): only used as a fallback
      // when no streamed text has been captured yet.
      const isFromAgent = Boolean(from) && from !== "USER";
      if (isFromAgent && !state.textResponse) state.textResponse = content;
    },
    close() {},
  };

  return {
    /** Handler to intercept messages from the agent */
    handler,
    /** Thoughts from the agent @type {string[]} */
    thoughts,
    /** Tool calls from the agent @type {object[]} */
    toolCalls,
    /** State container for textResponse and metrics - access via state.textResponse and state.metrics */
    state,
  };
}
/**
 * Shorten a notification body so it fits in a push notification.
 *
 * - Empty/missing text becomes the generic "Job completed" message.
 * - Text of at most 100 UTF-16 code units is returned unchanged.
 * - Longer text is cut to at most 100 code units and suffixed with "...".
 *
 * The cut never lands in the middle of a surrogate pair: if the last kept
 * code unit is a high surrogate (first half of an astral character such as an
 * emoji), it is dropped as well so the result does not end in a lone surrogate.
 * @param {string} [bodyText=""] - The text to shorten.
 * @returns {string}
 */
function truncateNotificationBody(bodyText = "") {
  const maxLength = 100;
  if (!bodyText) return "Job completed";
  if (bodyText.length <= maxLength) return bodyText;

  let cutAt = maxLength;
  const lastKeptUnit = bodyText.charCodeAt(cutAt - 1);
  // 0xD800-0xDBFF is the high-surrogate range; its partner would be cut off.
  if (lastKeptUnit >= 0xd800 && lastKeptUnit <= 0xdbff) cutAt -= 1;

  return `${bodyText.slice(0, cutAt)}...`;
}
/**
* Send a web push notification to the primary user.
* @param {object} job - The scheduled job object.
* @param {string} runId - The ID of the scheduled job run.
* @param {string} textResponse - The text response from the agent.
* @param {function} logFn - The function to log the error.
* @returns {Promise<void>}
*/
async function sendWebPushNotification(job, runId, textResponse, logFn) {
try {
const {
pushNotificationService,
} = require("../../utils/PushNotifications/index.js");
await pushNotificationService.loadSubscriptions();
// Strip thinking tags from the text response and then truncate to 100 characters
// if the response is longer than 100 characters.
let notificationBody = stripThinkingFromText(textResponse);
notificationBody = truncateNotificationBody(notificationBody);
await pushNotificationService.sendNotification({
to: "primary",
payload: {
title: `${job.name} completed`,
body: notificationBody,
data: {
onClickUrl: `/settings/scheduled-jobs/${job.id}/runs/${runId}`,
},
},
});
} catch (pushError) {
logFn(`Failed to send push notification: ${pushError.message}`);
}
}
// Public surface of this helper module: the push-notification sender, the
// run timeout constant, and the agent message collector factory.
module.exports = {
sendWebPushNotification,
SCHEDULED_JOB_TIMEOUT_MS,
agentActionCb,
};