merlyn/server/utils/agents/ephemeral.js
Marcello Fitton 41495cdabe
feat: Scheduled Jobs (#5322)
* initialize

* expand tool result text limit | add syntax highlighting and json formatting to tool result rendering

* fix onError jsdoc

* lint

* fix unread icon

* route protection

* improve form handling for NewJobModal

* safeJsonParse

* remove unneeded comments

* remove trycatch

* add truncateText helper

* add explicit fallback value to safeJsonParse

* add shared cron constant and helpers

* reduce frontend indirection

* use isLight to compute syntax highlighting theme

* remove dead code

* remove forJob and make job limit to 50

* create recomputeNextRunAt helper method

* add comment about nextRunAt recomputation

* add job queue and concurrency control to scheduled jobs

* use p-queue

* change default max concurrent value to 1

* add comment explaining internal scheduling system

* add recomputeNextRunAt on boot

* add generated documents to run details

* Modify toolsOverride functionality where no tools selected means no tools are given to the agent

add a select all/deselect all toggle button for easily selecting all
tools in the create job form

* create usePolling hook

* add polling to scheduled jobs and scheduled job runs pages

* add cron generation feature in job form

* remove cron generation feature | add cron builder feature | add max active scheduled jobs limit

* set MAX_ACTIVE to null

* replace hour and minute input fields with input with type time

* simplify

* organize components

* move components to bottom of page component

* change Generated Documents to Generated Files

* add i18n to cronstrue

* add i18n

* add type="button" to button elements

* refactor fileSource retrieval logic

* one scheduled job run can have status "running"

* add protection of file retrieval from scheduled job in multiuser mode

* fix comments

* make job status default to queued

* add queued status

* fix bug with result trace rendering

* store timeout ref and clearTimeout once race settles

* remove unneeded handlerPromise tracking

* move imports to top level

* refactor hardcoded paths to path resolve functions

* implement new job form design

* simplify

* fix button styles

* fix runJob bug

* implement styles for scheduled jobs page

* apply dark mode figma styles

* delete unused translation key

* implement light mode for new new job modal, run history, and run details

* lint

* fix light mode scroll bar in tool call card

* adjust table header contrast

* fix type in subtitle

* kill workers when job is in-flight before deleting job

* add border-none to buttons

* change locale time to iso string

* import BackgroundService module level | instantiate backgroundService singleton once and reuse across handlers

* add p-queue, @breejs/later and cron-validate as core deps

* parse cron expression to a builder state once

* add theme to day buttons in cron builder

* fix stale tools selection caption

* flip popover when popover clips screen height

* make ScheduleJob.trigger() await the run insertion | disable run now button if job is in flight

* regen table

* refactor generated file card

* refactor frontend

* remove logs

* major refactor for tool picking, fix bree/later bug

* combine action endpoints, move continue to method

* fix unoptimized query with include + take + order

* fix dangerous use, refactor job to utils

* add copy content to text response

* improve notification system subscription for browser

* remove unused translations

* prevent gen-file cleanup job from deleting active job file generated references

* rich text copy

* Scheduled Jobs: Translations (#5482)

* add locales for scheduled jobs

* i18n

---------

Co-authored-by: Timothy Carambat <rambat1010@gmail.com>

* add config flag with UI notice

* update README

* telemetry datapoints

* Always use UTC on backend, convert to local in frontend

* fix tz render

* Add job killing

* cleanup thinking text in job notifications and break out reasoning in response text.
Also hide zero metrics since that is useless

* Port generatedFile schema to the normalized workspace chat `outputs` file format so porting to thread is simple and implem between chats <> jobs is 1:1

* what the fuck

* compiled bug

* fixed thinking oddity in compiled frontend

* suppress multi-toast

* fix duration call

* Revert "fix duration call"

This reverts commit 0491bc71f4223e65ea4046561b15b268fefb8da2.

* revert and reapply fix

---------

Co-authored-by: Timothy Carambat <rambat1010@gmail.com>
2026-04-29 12:05:46 -07:00

642 lines
21 KiB
JavaScript

const AIbitat = require("./aibitat");
const AgentPlugins = require("./aibitat/plugins");
const ImportedPlugin = require("./imported");
const MCPCompatibilityLayer = require("../MCP");
const { AgentFlows } = require("../agentFlows");
const { httpSocket } = require("./aibitat/plugins/http-socket.js");
const { User } = require("../../models/user");
const { Workspace } = require("../../models/workspace");
const { WorkspaceChats } = require("../../models/workspaceChats");
const { WorkspaceParsedFiles } = require("../../models/workspaceParsedFiles");
const { DocumentManager } = require("../DocumentManager");
const { safeJsonParse } = require("../http");
const {
USER_AGENT,
WORKSPACE_AGENT,
agentSkillsFromSystemSettings,
} = require("./defaults");
const { AgentHandler } = require(".");
const {
WorkspaceAgentInvocation,
} = require("../../models/workspaceAgentInvocation");
/**
 * A fully functional Agent handler that does not use sessions or websockets.
 * Each instance performs a single one-off agent run and nothing is persisted
 * on the handler between invocations.
 */
class EphemeralAgentHandler extends AgentHandler {
  /** @type {string|null} the unique identifier for the agent invocation */
  #invocationUUID = null;
  /** @type {import("@prisma/client").workspaces|null} the workspace to use for the agent */
  #workspace = null;
  /** @type {import("@prisma/client").users["id"]|null} the user id to use for the agent */
  #userId = null;
  /** @type {import("@prisma/client").workspace_threads["id"]|null} the workspace thread id to use for the agent */
  #threadId = null;
  /** @type {string|null} the session id to use for the agent */
  #sessionId = null;
  /** @type {string|null} the prompt to use for the agent */
  #prompt = null;
  /** @type {string[]} the functions to load into the agent (Aibitat plugins) */
  #funcsToLoad = [];
  /** @type {Array<{name: string, mime: string, contentString: string}>} attachments for multimodal support */
  #attachments = [];

  /** @type {AIbitat|null} */
  aibitat = null;
  /** @type {string|null} */
  channel = null;
  /** @type {string|null} */
  provider = null;
  /** @type {string|null} the model to use for the agent */
  model = null;

  /**
   * @param {{
   * uuid: string,
   * workspace: import("@prisma/client").workspaces|null,
   * prompt: string,
   * userId: import("@prisma/client").users["id"]|null,
   * threadId: import("@prisma/client").workspace_threads["id"]|null,
   * sessionId: string|null,
   * attachments: Array<{name: string, mime: string, contentString: string}>
   * }} parameters
   */
  constructor({
    uuid,
    workspace = null,
    prompt,
    userId = null,
    threadId = null,
    sessionId = null,
    attachments = [],
  }) {
    super({ uuid });
    this.#invocationUUID = uuid;
    this.#workspace = workspace;
    this.#prompt = prompt;

    // Note: userId for ephemeral agent is only available
    // via the workspace-thread chat endpoints for the API
    // since workspaces can belong to multiple users.
    this.#userId = userId;
    this.#threadId = threadId;
    this.#sessionId = sessionId;
    this.#attachments = attachments;
  }

  /**
   * Writes a line to stdout prefixed with an ANSI-colored handler tag.
   * Does not read `this`, so it is safe to pass around unbound.
   * @param {string} text - The message to log.
   * @param {...any} args - Extra values forwarded to `console.log`.
   */
  log(text, ...args) {
    console.log(`\x1b[36m[EphemeralAgentHandler]\x1b[0m ${text}`, ...args);
  }

  /** Logs the end of this invocation along with the provider and model in use. */
  closeAlert() {
    this.log(`End ${this.#invocationUUID}::${this.provider}:${this.model}`);
  }

  /**
   * Loads the most recent chats for the workspace/user/thread/session scope
   * and converts each one into a user -> agent and agent -> user message pair.
   * Any failure is logged and results in an empty history.
   * @param {number} limit - Maximum number of chat rows to load.
   * @returns {Promise<Array<{from: string, to: string, content: string, state: string}>>}
   */
  async #chatHistory(limit = 10) {
    if (!this.#workspace) return [];
    try {
      // Rows are requested by descending id and then reversed so the
      // lowest id in the window comes first in the history.
      const rawHistory = (
        await WorkspaceChats.where(
          {
            workspaceId: this.#workspace.id,
            user_id: this.#userId || null,
            thread_id: this.#threadId || null,
            api_session_id: this.#sessionId,
            include: true,
          },
          limit,
          { id: "desc" }
        )
      ).reverse();

      const agentHistory = [];
      rawHistory.forEach((chatLog) => {
        agentHistory.push(
          {
            from: USER_AGENT.name,
            to: WORKSPACE_AGENT.name,
            content: chatLog.prompt,
            state: "success",
          },
          {
            from: WORKSPACE_AGENT.name,
            to: USER_AGENT.name,
            content: safeJsonParse(chatLog.response)?.text || "",
            state: "success",
          }
        );
      });
      return agentHistory;
    } catch (e) {
      this.log("Error loading chat history", e.message);
      return [];
    }
  }

  /**
   * Attempts to find a fallback provider and model to use if the workspace
   * does not have an explicit `agentProvider` and `agentModel` set.
   * 1. Fallback to the workspace `chatProvider` and `chatModel` if they exist.
   * 2. Fallback to the system `LLM_PROVIDER` and try to load the associated default model via ENV params or a base available model.
   * 3. Otherwise, return null - will likely throw an error the user can act on.
   * @returns {object|null} - An object with provider and model keys.
   */
  #getFallbackProvider() {
    // First, fallback to the workspace chat provider and model if they exist
    if (this.#workspace?.chatProvider && this.#workspace?.chatModel) {
      return {
        provider: this.#workspace.chatProvider,
        model: this.#workspace.chatModel,
      };
    }

    // If workspace does not have chat provider and model fallback
    // to system provider and try to load provider default model
    const systemProvider = process.env.LLM_PROVIDER;
    const systemModel = this.providerDefault(systemProvider);
    if (systemProvider && systemModel) {
      return {
        provider: systemProvider,
        model: systemModel,
      };
    }

    return null;
  }

  /**
   * Finds or assumes the model preference value to use for API calls.
   * If multi-model loading is supported, we use their agent model selection of the workspace
   * If not supported, we attempt to fallback to the system provider value for the LLM preference
   * and if that fails - we assume a reasonable base model to exist.
   * @returns {string|null} the model preference value to use in API calls
   * @throws {Error} when no provider is set and no fallback provider can be resolved.
   */
  #fetchModel() {
    // Provider was not explicitly set for workspace, so we are going to run our fallback logic
    // that will set a provider and model for us to use.
    if (!this.provider) {
      const fallback = this.#getFallbackProvider();
      if (!fallback) throw new Error("No valid provider found for the agent.");
      this.provider = fallback.provider; // re-set the provider to the fallback provider so it is not null.
      return fallback.model; // set its defined model based on fallback logic.
    }

    // The provider was explicitly set, so check if the workspace has an agent model set.
    if (this.#workspace?.agentModel) return this.#workspace.agentModel;

    // Otherwise, we have no model to use - so guess a default model to use via the provider
    // and it's system ENV params and if that fails - we return either a base model or null.
    return this.providerDefault();
  }

  /**
   * Resolves `provider` and `model` for this run, logs the start of the
   * invocation and then runs the inherited `checkSetup()`.
   * @throws {Error} when no provider could be resolved.
   */
  #providerSetupAndCheck() {
    this.provider = this.#workspace?.agentProvider ?? null;
    this.model = this.#fetchModel();

    if (!this.provider)
      throw new Error("No valid provider found for the agent.");
    this.log(`Start ${this.#invocationUUID}::${this.provider}:${this.model}`);
    this.checkSetup();
  }

  /**
   * Attaches a plugin to the AIbitat instance for every entry in the list of
   * functions to load. The entry's shape selects how it is resolved:
   * - `parent#child`: a child plugin of a built-in plugin
   * - `@@flow_<uuid>`: an agent flow
   * - `@@mcp_<server>`: every tool exposed by an MCP server
   * - `@@<hubId>`: an imported plugin
   * - anything else: a built-in single-stage plugin
   * Entries that cannot be resolved are logged and skipped.
   * @param {object} args - Invocation arguments forwarded to `parseCallOptions`.
   * @returns {Promise<void>}
   */
  async #attachPlugins(args) {
    for (const name of this.#funcsToLoad) {
      // Load child plugin
      if (name.includes("#")) {
        const [parent, childPluginName] = name.split("#");
        if (!AgentPlugins.hasOwnProperty(parent)) {
          this.log(
            `${parent} is not a valid plugin. Skipping inclusion to agent cluster.`
          );
          continue;
        }

        const childPlugin = AgentPlugins[parent].plugin.find(
          (child) => child.name === childPluginName
        );
        if (!childPlugin) {
          this.log(
            `${parent} does not have child plugin named ${childPluginName}. Skipping inclusion to agent cluster.`
          );
          continue;
        }

        const callOpts = this.parseCallOptions(
          args,
          childPlugin?.startupConfig?.params,
          name
        );
        this.aibitat.use(childPlugin.plugin(callOpts));
        this.log(
          `Attached ${parent}:${childPluginName} plugin to Agent cluster`
        );
        continue;
      }

      // Load flow plugin. This is marked by `@@flow_` in the array of functions to load.
      // Replace the @@flow_ placeholder in the agent's function list with the actual
      // tool name so the function lookup in reply() can find it.
      if (name.startsWith("@@flow_")) {
        const uuid = name.replace("@@flow_", "");
        const plugin = AgentFlows.loadFlowPlugin(uuid, this.aibitat);
        if (!plugin) {
          this.log(
            `Flow ${uuid} not found in flows directory. Skipping inclusion to agent cluster.`
          );
          continue;
        }

        const agent = this.aibitat.agents.get("@agent");
        agent.functions = agent.functions.filter((f) => f !== name);
        agent.functions.push(plugin.name);
        this.aibitat.use(plugin.plugin());
        this.log(
          `Attached flow ${plugin.name} (${plugin.flowName}) plugin to Agent cluster`
        );
        continue;
      }

      // Load MCP plugin. This is marked by `@@mcp_` in the array of functions to load.
      // All sub-tools are loaded here and are denoted by `pluginName:toolName` as their identifier.
      // This will replace the parent MCP server plugin with the sub-tools as child plugins so they
      // can be called directly by the agent when invoked.
      // Since to get to this point, the `activeMCPServers` method has already been called, we can
      // safely assume that the MCP server is running and the tools are available/loaded.
      if (name.startsWith("@@mcp_")) {
        const mcpPluginName = name.replace("@@mcp_", "");
        const plugins =
          await new MCPCompatibilityLayer().convertServerToolsToPlugins(
            mcpPluginName,
            this.aibitat
          );
        if (!plugins) {
          this.log(
            `MCP ${mcpPluginName} not found in MCP server config. Skipping inclusion to agent cluster.`
          );
          continue;
        }

        // Remove the old function from the agent functions directly
        // and push the new ones onto the end of the array so that they are loaded properly.
        // The function list holds plain name strings, so the entry itself is
        // compared. Comparing `f.name` never matched and left the
        // `@@mcp_` placeholder in the agent's tool list.
        const agent = this.aibitat.agents.get("@agent");
        agent.functions = agent.functions.filter((f) => f !== name);
        for (const plugin of plugins) agent.functions.push(plugin.name);

        plugins.forEach((plugin) => {
          this.aibitat.use(plugin.plugin());
          this.log(
            `Attached MCP::${plugin.toolName} MCP tool to Agent cluster`
          );
        });
        continue;
      }

      // Load imported plugin. This is marked by `@@` in the array of functions to load.
      // and is the @@hubID of the plugin.
      if (name.startsWith("@@")) {
        const hubId = name.replace("@@", "");
        const valid = ImportedPlugin.validateImportedPluginHandler(hubId);
        if (!valid) {
          this.log(
            `Imported plugin by hubId ${hubId} not found in plugin directory. Skipping inclusion to agent cluster.`
          );
          continue;
        }

        const plugin = ImportedPlugin.loadPluginByHubId(hubId);
        const callOpts = plugin.parseCallOptions();
        this.aibitat.use(plugin.plugin(callOpts));
        this.log(
          `Attached ${plugin.name} (${hubId}) imported plugin to Agent cluster`
        );
        continue;
      }

      // Load single-stage plugin.
      if (!AgentPlugins.hasOwnProperty(name)) {
        this.log(
          `${name} is not a valid plugin. Skipping inclusion to agent cluster.`
        );
        continue;
      }

      const callOpts = this.parseCallOptions(
        args,
        AgentPlugins[name].startupConfig.params
      );
      const AIbitatPlugin = AgentPlugins[name];
      this.aibitat.use(AIbitatPlugin.plugin(callOpts));
      this.log(`Attached ${name} plugin to Agent cluster`);
    }
  }

  /**
   * Registers the default user agent and the workspace agent on the AIbitat
   * instance, then collects the list of functions to load from system agent
   * skills, imported plugins, agent flows and MCP servers.
   * @returns {Promise<void>}
   */
  async #loadAgents() {
    // Default User agent and workspace agent
    this.log(`Attaching user and default agent to Agent cluster.`);
    this.aibitat.agent(USER_AGENT.name, USER_AGENT.getDefinition());

    const user = this.#userId
      ? await User.get({ id: Number(this.#userId) })
      : null;
    this.aibitat.agent(
      WORKSPACE_AGENT.name,
      await WORKSPACE_AGENT.getDefinition(this.provider, this.#workspace, user)
    );

    this.#funcsToLoad = [
      ...(await agentSkillsFromSystemSettings()),
      ...ImportedPlugin.activeImportedPlugins(),
      ...AgentFlows.activeFlowPlugins(),
      ...(await new MCPCompatibilityLayer().activeMCPServers()),
    ];
  }

  /**
   * Resolves the provider/model for this run and validates the setup.
   * @returns {Promise<EphemeralAgentHandler>} this handler, for chaining.
   */
  async init() {
    this.#providerSetupAndCheck();
    return this;
  }

  /**
   * Fetch fresh parsed files and pinned documents, format them for injection into user messages.
   * Called on every chat turn to ensure context is always up-to-date.
   * Any failure is logged and results in an empty string.
   * @returns {Promise<string>} Formatted context string to append to user message
   */
  async #fetchParsedFileContext() {
    if (!this.#workspace) return "";

    const user = this.#userId ? { id: this.#userId } : null;
    const thread = this.#threadId ? { id: this.#threadId } : null;
    const documentManager = new DocumentManager({
      workspace: this.#workspace,
    });

    return Promise.all([
      WorkspaceParsedFiles.getContextFiles(this.#workspace, thread, user),
      documentManager.pinnedDocs(),
    ])
      .then(([parsedFiles, pinnedDocs]) => {
        const allDocuments = [
          ...(parsedFiles || []).map((doc) => ({
            name: doc.title || "Uploaded Document",
            content: doc.pageContent,
          })),
          ...(pinnedDocs || []).map((doc) => ({
            name: doc.title || doc.metadata?.title || "Pinned Document",
            content: doc.pageContent,
          })),
        ];
        if (allDocuments.length === 0) return "";

        if (parsedFiles?.length > 0)
          this.log(
            `Injecting ${parsedFiles.length} parsed file(s) into user message`
          );
        if (pinnedDocs?.length > 0)
          this.log(
            `Injecting ${pinnedDocs.length} pinned document(s) into user message`
          );

        return (
          "\n\n<attached_documents>\n" +
          allDocuments
            .map((doc, i) => {
              const filename = doc.name || `Document ${i + 1}`;
              return `<document name="${filename}">\n${doc.content}\n</document>`;
            })
            .join("\n") +
          "\n</attached_documents>"
        );
      })
      .catch((e) => {
        this.log("Error fetching parsed file context", e.message);
        return "";
      });
  }

  /**
   * Strip the @agent command from the message if it exists.
   * Prevents hallucination by the agent when the @agent command is used from the model thinking
   * it is an agent or something itself.
   * If the user sent nothing after the @agent command - assume its a greeting.
   * @param {string} message - The message to strip the @agent command from.
   * @returns {string} The message with the @agent command stripped.
   */
  #stripAgentCommand(message = "") {
    const stripped = String(message)
      .replace(/^@agent\s*/, "")
      .trim();
    if (!stripped) return "Hello!";
    return stripped;
  }

  /**
   * Builds the AIbitat instance for this run: seeds it with chat history,
   * attaches the http-socket plugin, registers the agents and then attaches
   * every plugin the agent is allowed to call.
   * @param {{
   * handler: object|null,
   * telegramChatId: string|number|null,
   * toolOverrides: string[]|null
   * }} args - `handler` and `telegramChatId` are forwarded to the http-socket
   * plugin. When `toolOverrides` is truthy (including an empty array) it
   * replaces both the list of functions to load and the `@agent` function list.
   * @returns {Promise<void>}
   */
  async createAIbitat(
    args = {
      handler: null,
      telegramChatId: null,
      toolOverrides: null,
    }
  ) {
    this.aibitat = new AIbitat({
      provider: this.provider ?? "openai",
      model: this.model ?? "gpt-4o",
      chats: await this.#chatHistory(20),
      handlerProps: {
        invocation: {
          workspace: this.#workspace,
          workspace_id: this.#workspace?.id ?? null,
        },
        log: this.log,
      },
    });

    // Register callback to fetch fresh parsed file context on each chat turn
    // This injects parsed files into user messages instead of system prompt
    this.aibitat.fetchParsedFileContext = () => this.#fetchParsedFileContext();

    // Attach HTTP response object if defined for chunk streaming.
    // When telegramChatId is provided, tool approval via Telegram is enabled.
    this.log(`Attached ${httpSocket.name} plugin to Agent cluster`);
    this.aibitat.use(
      httpSocket.plugin({
        handler: args.handler,
        muteUserReply: true,
        introspection: true,
        telegramChatId: args.telegramChatId,
      })
    );

    // Load required agents (Default + custom)
    await this.#loadAgents();

    // Override tools if specified (e.g., for scheduled jobs with per-job tool selection)
    if (args.toolOverrides) {
      this.#funcsToLoad = args.toolOverrides;
      const agentDef = this.aibitat.agents.get("@agent");
      if (agentDef) agentDef.functions = args.toolOverrides;
    }

    // Attach all required plugins for functions to operate.
    await this.#attachPlugins(args);
  }

  /**
   * Starts the agent run by sending the prompt (with any leading `@agent`
   * command stripped) and attachments from the user agent to the configured
   * channel, or to the workspace agent when no channel is set.
   * @returns {*} whatever `aibitat.start()` returns.
   */
  startAgentCluster() {
    return this.aibitat.start({
      from: USER_AGENT.name,
      to: this.channel ?? WORKSPACE_AGENT.name,
      content: this.#stripAgentCommand(this.#prompt),
      attachments: this.#attachments,
    });
  }

  /**
   * Gets pending outputs registered by plugins (e.g., file downloads).
   * These outputs include the proper type for re-rendering in chat history.
   * @returns {Array<{type: string, payload: object}>}
   */
  getPendingOutputs() {
    return this.aibitat?._pendingOutputs ?? [];
  }

  /**
   * Determine if the message should invoke the agent handler.
   * This is true when the user explicitly invokes an agent (via @agent prefix)
   * or when the workspace is in automatic mode **and** the provider supports native tool calling.
   * @param {{message: string, workspace?: object, chatMode?: string}} parameters
   * @returns {Promise<boolean>}
   */
  static async isAgentInvocation({
    message,
    workspace = null,
    chatMode = null,
  }) {
    if (this.#isAgentCommandInvocation({ message })) return true;
    if (chatMode === "automatic") {
      if (!workspace) return false;
      if (await Workspace.supportsNativeToolCalling(workspace)) return true;
      return false;
    }
    return false;
  }

  /**
   * Determine if the message provided is an agent invocation.
   * @param {{message:string}} parameters
   * @returns {boolean}
   */
  static #isAgentCommandInvocation({ message }) {
    const agentHandles = WorkspaceAgentInvocation.parseAgents(message);
    if (agentHandles.length > 0) return true;
    return false;
  }
}
const EventEmitter = require("node:events");
const { writeResponseChunk } = require("../helpers/chat/responses");
/**
 * This is a special EventEmitter specifically used in the Aibitat agent handler
 * that enables us to use HTTP to relay all .introspect and .send events back to an
 * http handler instead of websockets, like we do on the frontend. This interface is meant to
 * mock a websocket interface for the methods used and bind them to an HTTP method so that the developer
 * API can invoke agent calls.
 */
class EphemeralEventListener extends EventEmitter {
  /** @type {object[]} every message received through `send`, parsed, in arrival order */
  messages = [];
  /** @type {boolean} latched to true the first time `close` is called */
  #isClosed = false;

  /**
   * Parses a JSON-encoded message, records it and emits it as a `chunk` event.
   * @param {string} jsonData - JSON string of the message.
   * @throws {SyntaxError} when `jsonData` is not valid JSON.
   */
  send(jsonData) {
    const data = JSON.parse(jsonData);
    this.messages.push(data);
    this.emit("chunk", data);
  }

  /**
   * Marks the listener as closed and emits the `closed` event.
   * The closed state is remembered so `waitForClose` still settles when it is
   * called after this method.
   */
  close() {
    this.#isClosed = true;
    this.emit("closed");
  }

  /**
   * Compacts all messages in class and returns them in a condensed format.
   * Messages of type `statusResponse` become thoughts. The content of the
   * last message of any other type becomes the text response.
   * @returns {{thoughts: string[], textResponse: string|null}} `textResponse`
   * is null when no non-status message was received.
   */
  packMessages() {
    const thoughts = [];
    let textResponse = null;
    for (const msg of this.messages) {
      if (msg.type !== "statusResponse") {
        textResponse = msg.content;
      } else {
        thoughts.push(msg.content);
      }
    }
    return { thoughts, textResponse };
  }

  /**
   * Waits on the HTTP plugin to emit the 'closed' event from the agentHandler
   * so that we can compact and return all the messages in the current queue.
   * If the listener was already closed the packed messages are returned right
   * away rather than waiting for an event that will not fire again.
   * @returns {Promise<{thoughts: string[], textResponse: string|null}>}
   */
  async waitForClose() {
    if (this.#isClosed) return this.packMessages();
    return new Promise((resolve) => {
      this.once("closed", () => resolve(this.packMessages()));
    });
  }

  /**
   * Streams the events with `writeResponseChunk` over HTTP chunked encoding
   * and returns on the close event emission.
   * ----------
   * DevNote: Agents do not stream so in here we are simply
   * emitting the thoughts and text response as soon as we get them.
   * @param {import("express").Response} response
   * @param {string} uuid - Unique identifier that is the same across chunks.
   * @returns {Promise<{thoughts: string[], textResponse: string|null}>}
   */
  async streamAgentEvents(response, uuid) {
    const onChunkHandler = (data) => {
      if (data.type === "statusResponse") {
        return writeResponseChunk(response, {
          id: uuid,
          type: "agentThought",
          thought: data.content,
          sources: [],
          attachments: [],
          close: false,
          error: null,
          animate: true,
        });
      }

      return writeResponseChunk(response, {
        id: uuid,
        type: "textResponse",
        textResponse: data.content,
        sources: [],
        attachments: [],
        close: true,
        error: null,
        animate: false,
      });
    };
    this.on("chunk", onChunkHandler);

    // Wait for close, then always detach the chunk listener.
    try {
      return await this.waitForClose();
    } finally {
      this.removeListener("chunk", onChunkHandler);
    }
  }
}
// Public surface of this module: the one-off agent handler class and the
// EventEmitter-based listener class, both defined above.
module.exports = { EphemeralAgentHandler, EphemeralEventListener };