fix: Patch agent flow to honor agent handler established provider (#3251)

resolves #3248
This commit is contained in:
Timothy Carambat 2025-02-17 14:44:23 -08:00 committed by GitHub
parent ca60ba827b
commit c6ff3a7765
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 48 additions and 54 deletions

View File

@ -10,12 +10,14 @@ const { Telemetry } = require("../../models/telemetry");
class FlowExecutor {
constructor() {
this.variables = {};
this.introspect = () => {}; // Default no-op introspect
this.logger = console.info; // Default console.info
this.introspect = (...args) => console.log("[introspect] ", ...args);
this.logger = console.info;
this.aibitat = null;
}
attachLogging(introspectFn, loggerFn) {
this.introspect = introspectFn || (() => {});
attachLogging(introspectFn = null, loggerFn = null) {
this.introspect =
introspectFn || ((...args) => console.log("[introspect] ", ...args));
this.logger = loggerFn || console.info;
}
@ -54,8 +56,7 @@ class FlowExecutor {
introspect: this.introspect,
variables: this.variables,
logger: this.logger,
model: process.env.LLM_PROVIDER_MODEL || "gpt-4",
provider: process.env.LLM_PROVIDER || "openai",
aibitat: this.aibitat,
};
switch (step.type) {
@ -101,13 +102,13 @@ class FlowExecutor {
return result;
}
// Execute entire flow
async executeFlow(
flow,
initialVariables = {},
introspectFn = null,
loggerFn = null
) {
/**
* Execute entire flow
* @param {Object} flow - The flow to execute
* @param {Object} initialVariables - Initial variables for the flow
* @param {Object} aibitat - The aibitat instance from the agent handler
*/
async executeFlow(flow, initialVariables = {}, aibitat) {
await Telemetry.sendTelemetry("agent_flow_execution_started");
// Initialize variables with both initial values and any passed-in values
@ -119,7 +120,8 @@ class FlowExecutor {
...initialVariables, // This will override any default values with passed-in values
};
this.attachLogging(introspectFn, loggerFn);
this.aibitat = aibitat;
this.attachLogging(aibitat?.introspect, aibitat?.handlerProps?.log);
const results = [];
for (const step of flow.config.steps) {

View File

@ -8,8 +8,8 @@ const { safeJsonParse } = require("../../http");
*/
async function executeApiCall(config, context) {
const { url, method, headers = [], body, bodyType, formData } = config;
const { introspect } = context;
const { introspect, logger } = context;
logger(`\x1b[43m[AgentFlowToolExecutor]\x1b[0m - executing API Call block`);
introspect(`Making ${method} request to external API...`);
const requestConfig = {

View File

@ -8,15 +8,21 @@ const AIbitat = require("../../agents/aibitat");
*/
async function executeLLMInstruction(config, context) {
const { instruction, inputVariable, resultVariable } = config;
const { introspect, variables, logger } = context;
const { introspect, variables, logger, aibitat } = context;
logger(
`\x1b[43m[AgentFlowToolExecutor]\x1b[0m - executing LLM Instruction block`
);
introspect(`Processing data with LLM instruction...`);
if (!variables[inputVariable]) {
logger(`Input variable ${inputVariable} not found`);
throw new Error(`Input variable ${inputVariable} not found`);
}
try {
logger(
`Sending request to LLM (${aibitat.defaultProvider.provider}::${aibitat.defaultProvider.model})`
);
introspect(`Sending request to LLM...`);
// Ensure the input is a string since we are sending it to the LLM direct as a message
@ -24,7 +30,6 @@ async function executeLLMInstruction(config, context) {
if (typeof input === "object") input = JSON.stringify(input);
if (typeof input !== "string") input = String(input);
const aibitat = new AIbitat();
const provider = aibitat.getProviderForConfig(aibitat.defaultProvider);
const completion = await provider.complete([
{

View File

@ -11,13 +11,15 @@ const { summarizeContent } = require("../../agents/aibitat/utils/summarize");
*/
async function executeWebScraping(config, context) {
const { url, captureAs = "text" } = config;
const { introspect, model, provider } = context;
const { introspect, logger, aibitat } = context;
logger(
`\x1b[43m[AgentFlowToolExecutor]\x1b[0m - executing Web Scraping block`
);
if (!url) {
throw new Error("URL is required for web scraping");
}
// Remap the captureAs to the correct mode for the CollectorApi
const captureMode = captureAs === "querySelector" ? "html" : captureAs;
introspect(`Scraping the content of ${url} as ${captureAs}`);
const { success, content } = await new CollectorApi()
@ -33,29 +35,30 @@ async function executeWebScraping(config, context) {
}
introspect(`Successfully scraped content from ${url}`);
if (!content || content?.length === 0) {
introspect("There was no content to be collected or read.");
throw new Error("There was no content to be collected or read.");
}
const tokenCount = new TokenManager(model).countFromString(content);
const contextLimit = Provider.contextLimit(provider, model);
if (tokenCount < contextLimit) {
return content;
}
const tokenCount = new TokenManager(
aibitat.defaultProvider.model
).countFromString(content);
const contextLimit = Provider.contextLimit(
aibitat.defaultProvider.provider,
aibitat.defaultProvider.model
);
if (tokenCount < contextLimit) return content;
introspect(
`This page's content is way too long. I will summarize it right now.`
`This page's content is way too long (${tokenCount} tokens). I will summarize it right now.`
);
const summary = await summarizeContent({
provider,
model,
provider: aibitat.defaultProvider.provider,
model: aibitat.defaultProvider.model,
content,
});
introspect(`Successfully summarized content`);
return summary;
}

View File

@ -141,25 +141,14 @@ class AgentFlows {
* Execute a flow by UUID
* @param {string} uuid - The UUID of the flow to execute
* @param {Object} variables - Initial variables for the flow
* @param {Function} introspectFn - Function to introspect the flow
* @param {Function} loggerFn - Function to log the flow
* @param {Object} aibitat - The aibitat instance from the agent handler
* @returns {Promise<Object>} Result of flow execution
*/
static async executeFlow(
uuid,
variables = {},
introspectFn = null,
loggerFn = null
) {
static async executeFlow(uuid, variables = {}, aibitat = null) {
const flow = AgentFlows.loadFlow(uuid);
if (!flow) throw new Error(`Flow ${uuid} not found`);
const flowExecutor = new FlowExecutor();
return await flowExecutor.executeFlow(
flow,
variables,
introspectFn,
loggerFn
);
return await flowExecutor.executeFlow(flow, variables, aibitat);
}
/**
@ -210,12 +199,7 @@ class AgentFlows {
},
handler: async (args) => {
aibitat.introspect(`Executing flow: ${flow.name}`);
const result = await AgentFlows.executeFlow(
uuid,
args,
aibitat.introspect,
aibitat.handlerProps.log
);
const result = await AgentFlows.executeFlow(uuid, args, aibitat);
if (!result.success) {
aibitat.introspect(
`Flow failed: ${result.results[0]?.error || "Unknown error"}`

View File

@ -798,7 +798,7 @@ ${this.getHistory({ to: route.to })
default:
throw new Error(
`Unknown provider: ${config.provider}. Please use "openai"`
`Unknown provider: ${config.provider}. Please use a valid provider.`
);
}
}

View File

@ -390,7 +390,7 @@ class AgentHandler {
// Load flow plugin. This is marked by `@@flow_` in the array of functions to load.
if (name.startsWith("@@flow_")) {
const uuid = name.replace("@@flow_", "");
const plugin = AgentFlows.loadFlowPlugin(uuid);
const plugin = AgentFlows.loadFlowPlugin(uuid, this.aibitat);
if (!plugin) {
this.log(
`Flow ${uuid} not found in flows directory. Skipping inclusion to agent cluster.`