diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/ToolApprovalRequest/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/ToolApprovalRequest/index.jsx new file mode 100644 index 00000000..017646d2 --- /dev/null +++ b/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/ToolApprovalRequest/index.jsx @@ -0,0 +1,243 @@ +import React, { useState, useEffect, useRef } from "react"; +import { CaretDown, Check, X, Hammer } from "@phosphor-icons/react"; +import AgentSkillWhitelist from "@/models/agentSkillWhitelist"; +import { useTranslation } from "react-i18next"; + +export default function ToolApprovalRequest({ + requestId, + skillName, + payload = {}, + description = null, + timeoutMs = null, + websocket, + onResponse, +}) { + const [isExpanded, setIsExpanded] = useState(false); + const [responded, setResponded] = useState(false); + const [approved, setApproved] = useState(null); + const [alwaysAllow, setAlwaysAllow] = useState(false); + const [timeRemaining, setTimeRemaining] = useState(timeoutMs); + const startTimeRef = useRef(null); + const hasPayload = payload && Object.keys(payload).length > 0; + + useEffect(() => { + if (!timeoutMs || responded) return; + if (startTimeRef.current === null) { + startTimeRef.current = Date.now(); + } + + const intervalId = setInterval(() => { + const elapsed = Date.now() - startTimeRef.current; + const remaining = Math.max(0, timeoutMs - elapsed); + setTimeRemaining(remaining); + + if (remaining <= 0) { + clearInterval(intervalId); + handleTimeout(); + } + }, 50); + + return () => clearInterval(intervalId); + }, [timeoutMs, responded]); + + function handleTimeout() { + if (responded) return; + setResponded(true); + setApproved(false); + onResponse?.(false); + } + + async function handleResponse(isApproved) { + if (responded) return; + + setResponded(true); + setApproved(isApproved); + + // If user approved and checked "Always allow", add to whitelist + if (isApproved && alwaysAllow) { + await AgentSkillWhitelist.addToWhitelist(skillName); + } + + if (websocket && websocket.readyState === WebSocket.OPEN) { + websocket.send( + JSON.stringify({ + type: "toolApprovalResponse", + requestId, + approved: isApproved, + }) + ); + } + + onResponse?.(isApproved); + } + + const progressPercent = timeoutMs ? (timeRemaining / timeoutMs) * 100 : 0; + + return ( +
+
+
+
+ +
+ {description && ( + + {description} + + )} + + handleResponse(true)} + onReject={() => handleResponse(false)} + /> + +
+ {timeoutMs && !responded && ( +
+
+
+ )} +
+
+
+
+ ); +} + +function ToolApprovalHeader({ + skillName, + hasPayload, + isExpanded, + setIsExpanded, +}) { + const { t } = useTranslation(); + return ( +
+
+ +
+ {t("chat_window.agent_invocation.model_wants_to_call")} + + {skillName} + +
+
+ {hasPayload && ( + + )} +
+ ); +} + +function ToolApprovalPayload({ payload, isExpanded }) { + const hasPayload = payload && Object.keys(payload).length > 0; + if (!hasPayload || !isExpanded) return null; + + function formatPayload(data) { + if (typeof data === "string") return data; + try { + return JSON.stringify(data, null, 2); + } catch { + return String(data); + } + } + + return ( +
+
+        {formatPayload(payload)}
+      
+
+ ); +} + +function ToolApprovalResponseOption({ + approved, + skillName, + alwaysAllow, + setAlwaysAllow, + onApprove, + onReject, +}) { + const { t } = useTranslation(); + if (approved !== null) return null; + + return ( +
+
+ + +
+ +
+ ); +} + +function ToolApprovalResponseMessage({ approved }) { + const { t } = useTranslation(); + + if (approved === null) return null; + if (approved === false) { + return ( +
+ + {t("chat_window.agent_invocation.tool_call_was_rejected")} +
+ ); + } + + return ( +
+ + {t("chat_window.agent_invocation.tool_call_was_approved")} +
+ ); +} diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/index.jsx index a8303bcb..aa615559 100644 --- a/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/index.jsx +++ b/frontend/src/components/WorkspaceChat/ChatContainer/ChatHistory/index.jsx @@ -9,6 +9,7 @@ import { import HistoricalMessage from "./HistoricalMessage"; import PromptReply from "./PromptReply"; import StatusResponse from "./StatusResponse"; +import ToolApprovalRequest from "./ToolApprovalRequest"; import { useManageWorkspaceModal } from "../../../Modals/ManageWorkspace"; import ManageWorkspace from "../../../Modals/ManageWorkspace"; import { ArrowDown } from "@phosphor-icons/react"; @@ -29,6 +30,7 @@ export default forwardRef(function ( sendCommand, updateHistory, regenerateAssistantMessage, + websocket = null, }, ref ) { @@ -179,6 +181,7 @@ export default forwardRef(function ( regenerateAssistantMessage, saveEditedMessage, forkThread, + websocket, }), [ workspace, @@ -186,6 +189,7 @@ export default forwardRef(function ( regenerateAssistantMessage, saveEditedMessage, forkThread, + websocket, ] ); const lastMessageInfo = useMemo(() => getLastMessageInfo(history), [history]); @@ -262,6 +266,7 @@ const getLastMessageInfo = (history) => { * @param {Function} param0.regenerateAssistantMessage - The function to regenerate the assistant message. * @param {Function} param0.saveEditedMessage - The function to save the edited message. * @param {Function} param0.forkThread - The function to fork the thread. + * @param {WebSocket} param0.websocket - The active websocket connection for agent communication. * @returns {Array} The compiled history of messages. */ function buildMessages({ @@ -270,6 +275,7 @@ function buildMessages({ regenerateAssistantMessage, saveEditedMessage, forkThread, + websocket, }) { return history.reduce((acc, props, index) => { const isLastBotReply = @@ -284,6 +290,21 @@ function buildMessages({ return acc; } + if (props.type === "toolApprovalRequest") { + acc.push( + + ); + return acc; + } + if (props.type === "rechartVisualize" && !!props.content) { acc.push(); } else if (isLastBotReply && props.animate) { diff --git a/frontend/src/components/WorkspaceChat/ChatContainer/index.jsx b/frontend/src/components/WorkspaceChat/ChatContainer/index.jsx index be0becd8..d7b22cd7 100644 --- a/frontend/src/components/WorkspaceChat/ChatContainer/index.jsx +++ b/frontend/src/components/WorkspaceChat/ChatContainer/index.jsx @@ -428,6 +428,7 @@ export default function ChatContainer({ workspace, knownHistory = [] }) { sendCommand={sendCommand} updateHistory={setChatHistory} regenerateAssistantMessage={regenerateAssistantMessage} + websocket={websocket} /> } + */ + addToWhitelist: async function (skillName) { + return fetch(`${API_BASE}/agent-skills/whitelist/add`, { + method: "POST", + headers: baseHeaders(), + body: JSON.stringify({ skillName }), + }) + .then((res) => res.json()) + .catch((e) => ({ success: false, error: e.message })); + }, +}; + +export default AgentSkillWhitelist; diff --git a/frontend/src/utils/chat/agent.js b/frontend/src/utils/chat/agent.js index 56bc33ae..858bd1f5 100644 --- a/frontend/src/utils/chat/agent.js +++ b/frontend/src/utils/chat/agent.js @@ -12,6 +12,7 @@ const handledEvents = [ "awaitingFeedback", "wssFailure", "rechartVisualize", + "toolApprovalRequest", // Streaming events "reportStreamEvent", ]; @@ -47,7 +48,12 @@ export default function handleSocketResponse(socket, event, setChatHistory) { }); } - if (!handledEvents.includes(data.type) || !data.content) return; + // toolApprovalRequest doesn't have content field, so check separately + if (data.type === "toolApprovalRequest") { + if (!data.requestId || !data.skillName) return; + } else if (!handledEvents.includes(data.type) || !data.content) { + return; + } if (data.type === "reportStreamEvent") { // Enable agent streaming for the next message so we can handle streaming or non-streaming responses @@ -220,6 +226,31 @@ export default function handleSocketResponse(socket, event, setChatHistory) { }); } + if (data.type === "toolApprovalRequest") { + return setChatHistory((prev) => { + return [ + ...prev.filter((msg) => !!msg.content), + { + uuid: v4(), + type: "toolApprovalRequest", + requestId: data.requestId, + skillName: data.skillName, + payload: data.payload, + description: data.description, + timeoutMs: data.timeoutMs, + content: `Approval requested for ${data.skillName}`, + role: "assistant", + sources: [], + closed: false, + error: null, + animate: false, + pending: true, + metrics: {}, + }, + ]; + }); + } + return setChatHistory((prev) => { return [ ...prev.filter((msg) => !!msg.content), diff --git a/server/endpoints/agentSkillWhitelist.js b/server/endpoints/agentSkillWhitelist.js new file mode 100644 index 00000000..12a414d6 --- /dev/null +++ b/server/endpoints/agentSkillWhitelist.js @@ -0,0 +1,46 @@ +const { AgentSkillWhitelist } = require("../models/agentSkillWhitelist"); +const { reqBody, userFromSession } = require("../utils/http"); +const { validatedRequest } = require("../utils/middleware/validatedRequest"); +const { + flexUserRoleValid, + ROLES, +} = require("../utils/middleware/multiUserProtected"); + +function agentSkillWhitelistEndpoints(app) { + if (!app) return; + + app.post( + "/agent-skills/whitelist/add", + [validatedRequest, flexUserRoleValid(ROLES.all)], + async (request, response) => { + try { + const { skillName } = reqBody(request); + if (!skillName) { + response + .status(400) + .json({ success: false, error: "Missing skillName" }); + return; + } + + const user = await userFromSession(request, response); + if (!user && response.locals?.multiUserMode) { + return response + .status(401) + .json({ success: false, error: "Unauthorized" }); + } + + const userId = user?.id || null; + const { success, error } = await AgentSkillWhitelist.add( + skillName, + userId + ); + return response.status(success ? 200 : 400).json({ success, error }); + } catch (e) { + console.error(e); + return response.status(500).json({ success: false, error: e.message }); + } + } + ); +} + +module.exports = { agentSkillWhitelistEndpoints }; diff --git a/server/endpoints/agentWebsocket.js b/server/endpoints/agentWebsocket.js index c5fc1475..e40f0b41 100644 --- a/server/endpoints/agentWebsocket.js +++ b/server/endpoints/agentWebsocket.js @@ -11,6 +11,7 @@ const { safeJsonParse } = require("../utils/http"); // Setup listener for incoming messages to relay to socket so it can be handled by agent plugin. function relayToSocket(message) { if (this.handleFeedback) return this?.handleFeedback?.(message); + if (this.handleToolApproval) return this?.handleToolApproval?.(message); this.checkBailCommand(message); } diff --git a/server/endpoints/system.js b/server/endpoints/system.js index cabe0975..02b23888 100644 --- a/server/endpoints/system.js +++ b/server/endpoints/system.js @@ -60,6 +60,7 @@ const { const { TemporaryAuthToken } = require("../models/temporaryAuthToken"); const { SystemPromptVariables } = require("../models/systemPromptVariables"); const { VALID_COMMANDS } = require("../utils/chats"); +const { AgentSkillWhitelist } = require("../models/agentSkillWhitelist"); function systemEndpoints(app) { if (!app) return; @@ -619,7 +620,7 @@ function systemEndpoints(app) { multi_user_mode: true, }); await BrowserExtensionApiKey.migrateApiKeysToMultiUser(user.id); - + await AgentSkillWhitelist.clearSingleUserWhitelist(); await updateENV( { JWTSecret: process.env.JWT_SECRET || v4(), diff --git a/server/index.js b/server/index.js index 422b0193..8924d1e6 100644 --- a/server/index.js +++ b/server/index.js @@ -23,6 +23,9 @@ const { bootHTTP, bootSSL } = require("./utils/boot"); const { workspaceThreadEndpoints } = require("./endpoints/workspaceThreads"); const { documentEndpoints } = require("./endpoints/document"); const { agentWebsocket } = require("./endpoints/agentWebsocket"); +const { + agentSkillWhitelistEndpoints, +} = require("./endpoints/agentSkillWhitelist"); const { experimentalEndpoints } = require("./endpoints/experimental"); const { browserExtensionEndpoints } = require("./endpoints/browserExtension"); const { communityHubEndpoints } = require("./endpoints/communityHub"); @@ -75,6 +78,7 @@ embedManagementEndpoints(apiRouter); utilEndpoints(apiRouter); documentEndpoints(apiRouter); agentWebsocket(apiRouter); +agentSkillWhitelistEndpoints(apiRouter); experimentalEndpoints(apiRouter); developerEndpoints(app, apiRouter); communityHubEndpoints(apiRouter); diff --git a/server/jobs/handle-telegram-chat.js b/server/jobs/handle-telegram-chat.js index b7f60ea8..beccb957 100644 --- a/server/jobs/handle-telegram-chat.js +++ b/server/jobs/handle-telegram-chat.js @@ -8,6 +8,9 @@ const { WorkspaceThread } = require("../models/workspaceThread"); const { streamResponse } = require("../utils/telegramBot/chat/stream"); process.on("message", async (payload) => { + // Ignore tool approval responses - these are handled by http-socket plugin + if (payload?.type === "toolApprovalResponse") return; + const { botToken, chatId, diff --git a/server/models/agentSkillWhitelist.js b/server/models/agentSkillWhitelist.js new file mode 100644 index 00000000..2c2c1587 --- /dev/null +++ b/server/models/agentSkillWhitelist.js @@ -0,0 +1,100 @@ +const prisma = require("../utils/prisma"); +const { safeJsonParse } = require("../utils/http"); + +const AgentSkillWhitelist = { + SINGLE_USER_LABEL: "whitelisted_agent_skills", + + /** + * Get the label for storing whitelist in system_settings + * @param {number|null} userId - User ID in multi-user mode, null for single-user + * @returns {string} + */ + _getLabel: function (userId = null) { + if (userId) return `user_${userId}_whitelisted_agent_skills`; + return this.SINGLE_USER_LABEL; + }, + + /** + * Get the whitelisted skills for a user or the system + * @param {number|null} userId - User ID in multi-user mode, null for single-user + * @returns {Promise} Array of whitelisted skill names + */ + get: async function (userId = null) { + try { + const label = this._getLabel(userId); + const setting = await prisma.system_settings.findFirst({ + where: { label }, + }); + return safeJsonParse(setting?.value, []); + } catch (error) { + console.error("AgentSkillWhitelist.get error:", error.message); + return []; + } + }, + + /** + * Add a skill to the whitelist + * @param {string} skillName - The skill name to whitelist + * @param {number|null} userId - User ID in multi-user mode, null for single-user + * @returns {Promise<{success: boolean, error: string|null}>} + */ + add: async function (skillName, userId = null) { + try { + if (!skillName || typeof skillName !== "string") { + return { success: false, error: "Invalid skill name" }; + } + + const label = this._getLabel(userId); + const currentList = await this.get(userId); + + if (currentList.includes(skillName)) { + return { success: true, error: null }; + } + + const newList = [...currentList, skillName]; + + await prisma.system_settings.upsert({ + where: { label }, + update: { value: JSON.stringify(newList) }, + create: { label, value: JSON.stringify(newList) }, + }); + + return { success: true, error: null }; + } catch (error) { + console.error("AgentSkillWhitelist.add error:", error.message); + return { success: false, error: error.message }; + } + }, + + /** + * Check if a skill is whitelisted + * @param {string} skillName - The skill name to check + * @param {number|null} userId - User ID in multi-user mode, null for single-user + * @returns {Promise} + */ + isWhitelisted: async function (skillName, userId = null) { + const whitelist = await this.get(userId); + return whitelist.includes(skillName); + }, + + /** + * Clear the single-user whitelist (used when switching to multi-user mode) + * @returns {Promise<{success: boolean, error: string|null}>} + */ + clearSingleUserWhitelist: async function () { + try { + await prisma.system_settings.deleteMany({ + where: { label: this.SINGLE_USER_LABEL }, + }); + return { success: true, error: null }; + } catch (error) { + console.error( + "AgentSkillWhitelist.clearSingleUserWhitelist error:", + error.message + ); + return { success: false, error: error.message }; + } + }, +}; + +module.exports = { AgentSkillWhitelist }; diff --git a/server/utils/agents/aibitat/plugins/http-socket.js b/server/utils/agents/aibitat/plugins/http-socket.js index bbfd1cbc..af443c58 100644 --- a/server/utils/agents/aibitat/plugins/http-socket.js +++ b/server/utils/agents/aibitat/plugins/http-socket.js @@ -1,10 +1,48 @@ const chalk = require("chalk"); const { Telemetry } = require("../../../../models/telemetry"); +const { v4: uuidv4 } = require("uuid"); +const TOOL_APPROVAL_TIMEOUT_MS = 120 * 1_000; // 2 mins for tool approval + +/** + * Get the IPC channel for worker communication. + * Bree workers use worker_threads internally but polyfill process.on("message") for receiving. + * Workers send via parentPort.postMessage(), receive via process.on("message"). + * @returns {{ send: Function, on: Function, removeListener: Function } | null} + */ +function getWorkerIPC() { + try { + const { parentPort } = require("node:worker_threads"); + if (parentPort) { + // Bree worker context: send via parentPort, receive via process (Bree polyfill) + return { + send: (msg) => parentPort.postMessage(msg), + on: (event, handler) => process.on(event, handler), + removeListener: (event, handler) => + process.removeListener(event, handler), + }; + } + } catch {} + + // Fallback for child_process workers + if (typeof process.send === "function") { + return { + send: (msg) => process.send(msg), + on: (event, handler) => process.on(event, handler), + removeListener: (event, handler) => + process.removeListener(event, handler), + }; + } + + return null; +} /** * HTTP Interface plugin for Aibitat to emulate a websocket interface in the agent * framework so we dont have to modify the interface for passing messages and responses * in REST or WSS. + * + * When telegramChatId is provided, enables tool approval via Telegram inline keyboards + * using IPC messages to communicate with the parent TelegramBotService process. */ const httpSocket = { name: "httpSocket", @@ -21,12 +59,17 @@ const httpSocket = { required: false, default: true, }, + telegramChatId: { + required: false, + default: null, + }, }, }, plugin: function ({ handler, muteUserReply = true, // Do not post messages to "USER" back to frontend. introspection = false, // when enabled will attach socket to Aibitat object with .introspect method which reports status updates to frontend. + telegramChatId = null, // When set, enables tool approval via Telegram IPC }) { return { name: this.name, @@ -59,6 +102,125 @@ const httpSocket = { }, }; + /** + * Request user approval before executing a tool/skill. + * Only available when running in Telegram context (telegramChatId is set). + * Sends IPC message to parent process which shows Telegram inline keyboard. + * + * @param {Object} options - The approval request options + * @param {string} options.skillName - The name of the skill/tool requesting approval + * @param {Object} [options.payload={}] - Optional payload data to display to the user + * @param {string} [options.description] - Optional description of what the skill will do + * @returns {Promise<{approved: boolean, message: string}>} - The approval result + */ + aibitat.requestToolApproval = async function ({ + skillName, + payload = {}, + description = null, + }) { + // Check whitelist first + const { + AgentSkillWhitelist, + } = require("../../../../models/agentSkillWhitelist"); + const isWhitelisted = await AgentSkillWhitelist.isWhitelisted( + skillName, + null + ); + if (isWhitelisted) { + console.log( + chalk.green(`Skill ${skillName} is whitelisted - auto-approved.`) + ); + return { + approved: true, + message: "Skill is whitelisted - auto-approved.", + }; + } + + // Tool approval only available in Telegram worker context + const ipc = getWorkerIPC(); + if (!telegramChatId || !ipc) { + console.log( + chalk.yellow( + `Tool approval requested for ${skillName} but no Telegram context available. Auto-denying for safety.` + ) + ); + return { + approved: false, + message: + "Tool approval is not available in this context. Operation denied.", + }; + } + + const requestId = uuidv4(); + console.log( + chalk.blue( + `Requesting tool approval for ${skillName} (${requestId})` + ) + ); + + // Send introspection message before the approval UI appears + aibitat.introspect( + `Requesting approval to execute: ${skillName}${description ? ` - ${description}` : ""}` + ); + + return new Promise((resolve) => { + let timeoutId = null; + + const messageHandler = (msg) => { + if (msg?.type !== "toolApprovalResponse") return; + if (msg?.requestId !== requestId) return; + + ipc.removeListener("message", messageHandler); + clearTimeout(timeoutId); + + if (msg.approved) { + console.log( + chalk.green(`Tool ${skillName} approved by user via Telegram`) + ); + return resolve({ + approved: true, + message: "User approved the tool execution.", + }); + } + + console.log( + chalk.yellow(`Tool ${skillName} denied by user via Telegram`) + ); + return resolve({ + approved: false, + message: "Tool call was rejected by the user.", + }); + }; + + ipc.on("message", messageHandler); + + // Send approval request to parent TelegramBotService process + ipc.send({ + type: "toolApprovalRequest", + requestId, + chatId: telegramChatId, + skillName, + payload, + description, + timeoutMs: TOOL_APPROVAL_TIMEOUT_MS, + }); + + timeoutId = setTimeout(() => { + ipc.removeListener("message", messageHandler); + console.log( + chalk.yellow( + `Tool approval request timed out after ${TOOL_APPROVAL_TIMEOUT_MS}ms` + ) + ); + resolve({ + approved: false, + message: + "Tool approval request timed out. User did not respond in time.", + }); + }, TOOL_APPROVAL_TIMEOUT_MS); + }); + }; + // We can only receive one message response with HTTP // so we end on first response. aibitat.onMessage((message) => { diff --git a/server/utils/agents/aibitat/plugins/websocket.js b/server/utils/agents/aibitat/plugins/websocket.js index 1cdd07db..b64407fd 100644 --- a/server/utils/agents/aibitat/plugins/websocket.js +++ b/server/utils/agents/aibitat/plugins/websocket.js @@ -1,6 +1,9 @@ const chalk = require("chalk"); const { Telemetry } = require("../../../../models/telemetry"); +const { v4: uuidv4 } = require("uuid"); +const { safeJsonParse } = require("../../../http"); const SOCKET_TIMEOUT_MS = 300 * 1_000; // 5 mins +const TOOL_APPROVAL_TIMEOUT_MS = 120 * 1_000; // 2 mins for tool approval /** * Websocket Interface plugin. It prints the messages on the console and asks for feedback @@ -11,6 +14,7 @@ const SOCKET_TIMEOUT_MS = 300 * 1_000; // 5 mins // askForFeedback?: any // awaitResponse?: any // handleFeedback?: (message: string) => void; +// handleToolApproval?: (message: string) => void; // } const WEBSOCKET_BAIL_COMMANDS = [ @@ -43,6 +47,7 @@ const websocket = { socket, // @type AIbitatWebSocket muteUserReply = true, // Do not post messages to "USER" back to frontend. introspection = false, // when enabled will attach socket to Aibitat object with .introspect method which reports status updates to frontend. + userId = null, // User ID for multi-user mode whitelist lookups }) { return { name: this.name, @@ -79,6 +84,102 @@ const websocket = { }, }; + /** + * Request user approval before executing a tool/skill. + * This sends a request to the frontend and blocks until the user responds. + * If the skill is whitelisted, approval is granted automatically. + * + * @param {Object} options - The approval request options + * @param {string} options.skillName - The name of the skill/tool requesting approval + * @param {Object} [options.payload={}] - Optional payload data to display to the user + * @param {string} [options.description] - Optional description of what the skill will do + * @returns {Promise<{approved: boolean, message: string}>} - The approval result + */ + aibitat.requestToolApproval = async function ({ + skillName, + payload = {}, + description = null, + }) { + const { + AgentSkillWhitelist, + } = require("../../../../models/agentSkillWhitelist"); + const isWhitelisted = await AgentSkillWhitelist.isWhitelisted( + skillName, + userId + ); + if (isWhitelisted) { + console.log( + chalk.green( + userId + ? `User ${userId} - ` + : "" + `Skill ${skillName} is whitelisted - auto-approved.` + ) + ); + return { + approved: true, + message: "Skill is whitelisted - auto-approved.", + }; + } + + const requestId = uuidv4(); + return new Promise((resolve) => { + let timeoutId = null; + + socket.handleToolApproval = (message) => { + try { + const data = safeJsonParse(message, {}); + if ( + data?.type !== "toolApprovalResponse" || + data?.requestId !== requestId + ) + return; + + delete socket.handleToolApproval; + clearTimeout(timeoutId); + + if (data.approved) { + return resolve({ + approved: true, + message: "User approved the tool execution.", + }); + } + + return resolve({ + approved: false, + message: "Tool call was rejected by the user.", + }); + } catch (e) { + console.error("Error handling tool approval response:", e); + } + }; + + socket.send( + JSON.stringify({ + type: "toolApprovalRequest", + requestId, + skillName, + payload, + description, + timeoutMs: TOOL_APPROVAL_TIMEOUT_MS, + }) + ); + + timeoutId = setTimeout(() => { + delete socket.handleToolApproval; + console.log( + chalk.yellow( + `Tool approval request timed out after ${TOOL_APPROVAL_TIMEOUT_MS}ms` + ) + ); + resolve({ + approved: false, + message: + "Tool approval request timed out. User did not respond in time.", + }); + }, TOOL_APPROVAL_TIMEOUT_MS); + }); + }; + // aibitat.onStart(() => { // console.log("🚀 starting chat ..."); // }); diff --git a/server/utils/agents/ephemeral.js b/server/utils/agents/ephemeral.js index cc826290..dd0626bf 100644 --- a/server/utils/agents/ephemeral.js +++ b/server/utils/agents/ephemeral.js @@ -436,6 +436,7 @@ class EphemeralAgentHandler extends AgentHandler { async createAIbitat( args = { handler: null, + telegramChatId: null, } ) { this.aibitat = new AIbitat({ @@ -456,12 +457,14 @@ class EphemeralAgentHandler extends AgentHandler { this.aibitat.fetchParsedFileContext = () => this.#fetchParsedFileContext(); // Attach HTTP response object if defined for chunk streaming. + // When telegramChatId is provided, tool approval via Telegram is enabled. this.log(`Attached ${httpSocket.name} plugin to Agent cluster`); this.aibitat.use( httpSocket.plugin({ handler: args.handler, muteUserReply: true, introspection: true, + telegramChatId: args.telegramChatId, }) ); diff --git a/server/utils/agents/index.js b/server/utils/agents/index.js index 7c544855..8e64d02f 100644 --- a/server/utils/agents/index.js +++ b/server/utils/agents/index.js @@ -726,6 +726,7 @@ class AgentHandler { socket: args.socket, muteUserReply: true, introspection: true, + userId: this.invocation.user_id || null, }) ); diff --git a/server/utils/telegramBot/chat/agent.js b/server/utils/telegramBot/chat/agent.js index a525df05..550b61cf 100644 --- a/server/utils/telegramBot/chat/agent.js +++ b/server/utils/telegramBot/chat/agent.js @@ -232,7 +232,7 @@ async function handleAgentResponse( threadId: thread?.id || null, attachments, }).init(); - await agentHandler.createAIbitat({ handler }); + await agentHandler.createAIbitat({ handler, telegramChatId: chatId }); // httpSocket terminates after the first agent message, but cap rounds // as a safety net so the agent can't loop indefinitely. diff --git a/server/utils/telegramBot/index.js b/server/utils/telegramBot/index.js index 75620174..7ce76065 100644 --- a/server/utils/telegramBot/index.js +++ b/server/utils/telegramBot/index.js @@ -38,6 +38,8 @@ class TelegramBotService { #pendingPairings = new Map(); // Active workers per chat: chatId -> { worker, jobId } #activeWorkers = new Map(); + // Pending tool approval requests: requestId -> { worker, chatId, messageId } + #pendingToolApprovals = new Map(); constructor() { if (TelegramBotService._instance) return TelegramBotService._instance; @@ -153,6 +155,7 @@ class TelegramBotService { this.#chatState.clear(); this.#pendingPairings.clear(); this.#activeWorkers.clear(); + this.#pendingToolApprovals.clear(); this.#log("Stopped"); } @@ -349,9 +352,12 @@ class TelegramBotService { guard(msg, () => handler(ctx, msg.chat.id, msg.text)); }); - // Register callback queries, used for workspace/thread selection interactive menus + // Register callback queries, used for workspace/thread selection, tool approval, etc. this.#bot.on("callback_query", (query) => - handleKeyboardQueryCallback(ctx, query) + handleKeyboardQueryCallback(ctx, query, { + pendingToolApprovals: this.#pendingToolApprovals, + log: this.#log.bind(this), + }) ); this.#bot.on("message", (msg) => { @@ -396,6 +402,9 @@ class TelegramBotService { if (worker) { worker.on("message", (msg) => { if (msg?.type === "closeInvocation") invocationUuid = msg.uuid; + if (msg?.type === "toolApprovalRequest") { + this.#handleToolApprovalRequest(worker, msg); + } }); this.#activeWorkers.set(chatId, { worker, jobId, bgService }); } @@ -467,6 +476,91 @@ class TelegramBotService { return true; } + /** + * Handle a tool approval request from a worker process. + * Sends a Telegram message with Approve/Deny inline keyboard buttons. + * @param {Worker} worker - The worker process requesting approval + * @param {Object} msg - The tool approval request message + */ + async #handleToolApprovalRequest(worker, msg) { + const { requestId, chatId, skillName, payload, description, timeoutMs } = + msg; + + this.#log( + `Tool approval request received: ${skillName} (requestId: ${requestId})` + ); + + try { + const payloadText = + payload && Object.keys(payload).length > 0 + ? `\n\nParameters:\n${JSON.stringify(payload, null, 2)}` + : ""; + + const descText = description ? `\n${description}` : ""; + + const messageText = + `🔧 Tool Approval Required\n\n` + + `The agent wants to execute: ${skillName}${descText}${payloadText}\n\n` + + `Do you want to allow this action?`; + + const keyboard = { + inline_keyboard: [ + [ + { + text: "✅ Approve", + callback_data: `tool:approve:${requestId}`, + }, + { text: "❌ Deny", callback_data: `tool:deny:${requestId}` }, + ], + ], + }; + + const sent = await this.#bot.sendMessage(chatId, messageText, { + parse_mode: "HTML", + reply_markup: keyboard, + }); + + this.#pendingToolApprovals.set(requestId, { + worker, + chatId, + messageId: sent.message_id, + skillName, + }); + + // Auto-cleanup if timeout expires (worker will also timeout) + setTimeout(() => { + if (this.#pendingToolApprovals.has(requestId)) { + this.#pendingToolApprovals.delete(requestId); + this.#bot + .editMessageText( + `⏱️ Tool approval for ${skillName} timed out.`, + { + chat_id: chatId, + message_id: sent.message_id, + parse_mode: "HTML", + } + ) + .catch(() => {}); + } + }, timeoutMs + 1000); + } catch (error) { + this.#log("Failed to send tool approval request:", error.message); + // Send denial back to worker if we can't show the UI + try { + const response = { + type: "toolApprovalResponse", + requestId, + approved: false, + }; + if (worker && typeof worker.send === "function") { + worker.send(response); + } else if (worker && typeof worker.postMessage === "function") { + worker.postMessage(response); + } + } catch {} + } + } + #shouldVoiceRespond(isVoiceMessage) { if (!this.#config) return false; const mode = this.#config.voice_response_mode || "text_only"; diff --git a/server/utils/telegramBot/utils/navigation/callbacks/handleToolApproval.js b/server/utils/telegramBot/utils/navigation/callbacks/handleToolApproval.js new file mode 100644 index 00000000..c83e282e --- /dev/null +++ b/server/utils/telegramBot/utils/navigation/callbacks/handleToolApproval.js @@ -0,0 +1,104 @@ +/** + * Handle tool approval callback from an inline keyboard button. + * This handler requires access to the pending tool approvals map from TelegramBotService. + * + * @param {object} params + * @param {import("../../commands/index").BotContext} params.ctx + * @param {number} params.chatId + * @param {{id: string}} params.query + * @param {string} params.data + * @param {Map} params.pendingToolApprovals - Map of pending tool approval requests + * @param {Function} [params.log] - Logging function (falls back to ctx.log) + */ +async function handleToolApproval({ + ctx, + chatId, + query, + data, + pendingToolApprovals, + log, +} = {}) { + const _log = log || ctx.log; + _log(`Tool approval callback received: ${data}`); + + try { + const parts = data.split(":"); + if (parts.length !== 3) { + _log(`Invalid callback data format: ${data}`); + await ctx.bot.answerCallbackQuery(query.id, { + text: "Invalid request format.", + }); + return; + } + + const action = parts[1]; // "approve" or "deny" + const requestId = parts[2]; + + const pending = pendingToolApprovals.get(requestId); + if (!pending) { + _log(`No pending approval found for requestId: ${requestId}`); + await ctx.bot.answerCallbackQuery(query.id, { + text: "This approval request has expired.", + }); + return; + } + + const { worker, messageId, skillName } = pending; + const approved = action === "approve"; + + _log(`Processing ${approved ? "approval" : "denial"} for ${skillName}`); + + // Send response back to worker (Bree workers use send(), raw worker_threads use postMessage()) + try { + const response = { + type: "toolApprovalResponse", + requestId, + approved, + }; + + if (worker && typeof worker.send === "function") { + worker.send(response); + _log( + `Sent tool approval response to worker via send(): ${approved ? "approved" : "denied"}` + ); + } else if (worker && typeof worker.postMessage === "function") { + worker.postMessage(response); + _log( + `Sent tool approval response to worker via postMessage(): ${approved ? "approved" : "denied"}` + ); + } else { + _log( + `Worker not available to send approval response (send: ${typeof worker?.send}, postMessage: ${typeof worker?.postMessage})` + ); + } + } catch (err) { + _log(`Failed to send approval response: ${err.message}`); + } + + pendingToolApprovals.delete(requestId); + + // Update the message to show the result + const resultText = approved + ? `✅ ${skillName} was approved.` + : `❌ ${skillName} was denied.`; + + await ctx.bot + .editMessageText(resultText, { + chat_id: chatId, + message_id: messageId, + parse_mode: "HTML", + }) + .catch((err) => _log(`Failed to edit message: ${err.message}`)); + + await ctx.bot.answerCallbackQuery(query.id, { + text: approved ? "Approved!" : "Denied.", + }); + } catch (error) { + _log(`Error handling tool approval callback: ${error.message}`); + await ctx.bot.answerCallbackQuery(query.id, { + text: "Something went wrong.", + }); + } +} + +module.exports = { handleToolApproval }; diff --git a/server/utils/telegramBot/utils/navigation/callbacks/index.js b/server/utils/telegramBot/utils/navigation/callbacks/index.js index d344a3c4..03ed6890 100644 --- a/server/utils/telegramBot/utils/navigation/callbacks/index.js +++ b/server/utils/telegramBot/utils/navigation/callbacks/index.js @@ -10,6 +10,7 @@ const { handleModelSelect } = require("./handleModelSelect"); const { handleSourceSelect } = require("./handleSourceSelect"); const { handleSourcePagination } = require("./handleSourcePagination"); const { handleBackSources } = require("./handleBackSources"); +const { handleToolApproval } = require("./handleToolApproval"); const ExactCallbackHandlers = { "ws-create": handleWorkspaceCreate, @@ -20,6 +21,7 @@ const ExactCallbackHandlers = { }; const PrefixCallbackHandlers = [ + { prefix: "tool:", handler: handleToolApproval }, { prefix: "wspg:", handler: handleWorkspacePagination }, { prefix: "ws:", handler: handleWorkspaceSelect }, { prefix: "thpg:", handler: handleThreadPagination }, diff --git a/server/utils/telegramBot/utils/navigation/index.js b/server/utils/telegramBot/utils/navigation/index.js index b1b37209..47ad0e86 100644 --- a/server/utils/telegramBot/utils/navigation/index.js +++ b/server/utils/telegramBot/utils/navigation/index.js @@ -2,11 +2,12 @@ const { isVerified } = require("../verification"); const { resolveCallbackHandler } = require("./callbacks"); /** - * Handle inline keyboard callback queries (workspace/thread selection). + * Handle inline keyboard callback queries (workspace/thread selection, tool approval, etc). * @param {BotContext} ctx * @param {object} query - Telegram callback query object + * @param {object} [options={}] - Optional dependencies that specific handlers may need */ -async function handleKeyboardQueryCallback(ctx, query) { +async function handleKeyboardQueryCallback(ctx, query, options = {}) { const chatId = query.message.chat.id; const messageId = query.message.message_id; const data = query.data; @@ -21,7 +22,7 @@ async function handleKeyboardQueryCallback(ctx, query) { try { const handler = resolveCallbackHandler(data); if (!handler) throw new Error(`Callback handler not found: ${data}`); - await handler({ ctx, chatId, query, messageId, data }); + await handler({ ctx, chatId, query, messageId, data, ...options }); } catch (error) { ctx.log("Callback error:", error.message); await ctx.bot.answerCallbackQuery(query.id, {