diff --git a/packages/components/nodes/agentflow/Loop/Loop.ts b/packages/components/nodes/agentflow/Loop/Loop.ts index bc9d7b08df2..edf7f5e1d6b 100644 --- a/packages/components/nodes/agentflow/Loop/Loop.ts +++ b/packages/components/nodes/agentflow/Loop/Loop.ts @@ -1,4 +1,5 @@ import { ICommonObject, INode, INodeData, INodeOptionsValue, INodeParams } from '../../../src/Interface' +import { updateFlowState } from '../utils' class Loop_Agentflow implements INode { label: string @@ -19,7 +20,7 @@ class Loop_Agentflow implements INode { constructor() { this.label = 'Loop' this.name = 'loopAgentflow' - this.version = 1.0 + this.version = 1.1 this.type = 'Loop' this.category = 'Agent Flows' this.description = 'Loop back to a previous node' @@ -40,6 +41,40 @@ class Loop_Agentflow implements INode { name: 'maxLoopCount', type: 'number', default: 5 + }, + { + label: 'Fallback Message', + name: 'fallbackMessage', + type: 'string', + description: 'Message to display if the loop count is exceeded', + placeholder: 'Enter your fallback message here', + rows: 4, + acceptVariable: true, + optional: true + }, + { + label: 'Update Flow State', + name: 'loopUpdateState', + description: 'Update runtime state during the execution of the workflow', + type: 'array', + optional: true, + acceptVariable: true, + array: [ + { + label: 'Key', + name: 'key', + type: 'asyncOptions', + loadMethod: 'listRuntimeStateKeys', + freeSolo: true + }, + { + label: 'Value', + name: 'value', + type: 'string', + acceptVariable: true, + acceptNodeOutputAsVariable: true + } + ] } ] } @@ -58,12 +93,20 @@ class Loop_Agentflow implements INode { }) } return returnOptions + }, + async listRuntimeStateKeys(_: INodeData, options: ICommonObject): Promise { + const previousNodes = options.previousNodes as ICommonObject[] + const startAgentflowNode = previousNodes.find((node) => node.name === 'startAgentflow') + const state = startAgentflowNode?.inputs?.startState as ICommonObject[] + return state.map((item) => ({ label: item.key, name: item.key })) } } async run(nodeData: INodeData, _: string, options: ICommonObject): Promise { const loopBackToNode = nodeData.inputs?.loopBackToNode as string const _maxLoopCount = nodeData.inputs?.maxLoopCount as string + const fallbackMessage = nodeData.inputs?.fallbackMessage as string + const _loopUpdateState = nodeData.inputs?.loopUpdateState const state = options.agentflowRuntime?.state as ICommonObject @@ -75,16 +118,34 @@ class Loop_Agentflow implements INode { maxLoopCount: _maxLoopCount ? parseInt(_maxLoopCount) : 5 } + const finalOutput = 'Loop back to ' + `${loopBackToNodeLabel} (${loopBackToNodeId})` + + // Update flow state if needed + let newState = { ...state } + if (_loopUpdateState && Array.isArray(_loopUpdateState) && _loopUpdateState.length > 0) { + newState = updateFlowState(state, _loopUpdateState) + } + + // Process template variables in state + if (newState && Object.keys(newState).length > 0) { + for (const key in newState) { + if (newState[key].toString().includes('{{ output }}')) { + newState[key] = finalOutput + } + } + } + const returnOutput = { id: nodeData.id, name: this.name, input: data, output: { - content: 'Loop back to ' + `${loopBackToNodeLabel} (${loopBackToNodeId})`, + content: finalOutput, nodeID: loopBackToNodeId, - maxLoopCount: _maxLoopCount ? parseInt(_maxLoopCount) : 5 + maxLoopCount: _maxLoopCount ? parseInt(_maxLoopCount) : 5, + fallbackMessage }, - state + state: newState } return returnOutput diff --git a/packages/server/src/utils/buildAgentflow.ts b/packages/server/src/utils/buildAgentflow.ts index 632017842ba..e654596da2e 100644 --- a/packages/server/src/utils/buildAgentflow.ts +++ b/packages/server/src/utils/buildAgentflow.ts @@ -43,7 +43,8 @@ import { QUESTION_VAR_PREFIX, CURRENT_DATE_TIME_VAR_PREFIX, _removeCredentialId, - validateHistorySchema + validateHistorySchema, + LOOP_COUNT_VAR_PREFIX } from '.' import { ChatFlow } from '../database/entities/ChatFlow' import { Variable } from '../database/entities/Variable' @@ -84,6 +85,8 @@ interface IProcessNodeOutputsParams { waitingNodes: Map loopCounts: Map abortController?: AbortController + sseStreamer?: IServerSideEventStreamer + chatId: string } interface IAgentFlowRuntime { @@ -130,6 +133,7 @@ interface IExecuteNodeParams { parentExecutionId?: string isRecursive?: boolean iterationContext?: ICommonObject + loopCounts?: Map orgId: string workspaceId: string subscriptionId: string @@ -216,7 +220,8 @@ export const resolveVariables = async ( uploadedFilesContent: string, chatHistory: IMessage[], agentFlowExecutedData?: IAgentflowExecutedData[], - iterationContext?: ICommonObject + iterationContext?: ICommonObject, + loopCounts?: Map ): Promise => { let flowNodeData = cloneDeep(reactFlowNodeData) const types = 'inputs' @@ -283,6 +288,20 @@ export const resolveVariables = async ( resolvedValue = resolvedValue.replace(match, flowConfig?.runtimeChatHistoryLength ?? 0) } + if (variableFullPath === LOOP_COUNT_VAR_PREFIX) { + // Get the current loop count from the most recent loopAgentflow node execution + let currentLoopCount = 0 + if (loopCounts && agentFlowExecutedData) { + // Find the most recent loopAgentflow node execution to get its loop count + const loopNodes = [...agentFlowExecutedData].reverse().filter((data) => data.data?.name === 'loopAgentflow') + if (loopNodes.length > 0) { + const latestLoopNode = loopNodes[0] + currentLoopCount = loopCounts.get(latestLoopNode.nodeId) || 0 + } + } + resolvedValue = resolvedValue.replace(match, currentLoopCount.toString()) + } + if (variableFullPath === CURRENT_DATE_TIME_VAR_PREFIX) { resolvedValue = resolvedValue.replace(match, new Date().toISOString()) } @@ -605,7 +624,9 @@ async function processNodeOutputs({ edges, nodeExecutionQueue, waitingNodes, - loopCounts + loopCounts, + sseStreamer, + chatId }: IProcessNodeOutputsParams): Promise<{ humanInput?: IHumanInput }> { logger.debug(`\n🔄 Processing outputs from node: ${nodeId}`) @@ -686,6 +707,11 @@ async function processNodeOutputs({ } } else { logger.debug(` ⚠️ Maximum loop count (${maxLoop}) reached, stopping loop`) + const fallbackMessage = result.output.fallbackMessage || `Loop completed after reaching maximum iteration count of ${maxLoop}.` + if (sseStreamer) { + sseStreamer.streamTokenEvent(chatId, fallbackMessage) + } + result.output = { ...result.output, content: fallbackMessage } } } @@ -830,6 +856,7 @@ const executeNode = async ({ isInternal, isRecursive, iterationContext, + loopCounts, orgId, workspaceId, subscriptionId @@ -905,7 +932,8 @@ const executeNode = async ({ uploadedFilesContent, chatHistory, agentFlowExecutedData, - iterationContext + iterationContext, + loopCounts ) // Handle human input if present @@ -1691,6 +1719,7 @@ export const executeAgentFlow = async ({ analyticHandlers, isRecursive, iterationContext, + loopCounts, orgId, workspaceId, subscriptionId @@ -1758,7 +1787,8 @@ export const executeAgentFlow = async ({ nodeExecutionQueue, waitingNodes, loopCounts, - abortController + sseStreamer, + chatId }) // Update humanInput if it was changed diff --git a/packages/server/src/utils/index.ts b/packages/server/src/utils/index.ts index d99c0b546b7..c8d010c2cd1 100644 --- a/packages/server/src/utils/index.ts +++ b/packages/server/src/utils/index.ts @@ -69,6 +69,7 @@ export const QUESTION_VAR_PREFIX = 'question' export const FILE_ATTACHMENT_PREFIX = 'file_attachment' export const CHAT_HISTORY_VAR_PREFIX = 'chat_history' export const RUNTIME_MESSAGES_LENGTH_VAR_PREFIX = 'runtime_messages_length' +export const LOOP_COUNT_VAR_PREFIX = 'loop_count' export const CURRENT_DATE_TIME_VAR_PREFIX = 'current_date_time' export const REDACTED_CREDENTIAL_VALUE = '_FLOWISE_BLANK_07167752-1a71-43b1-bf8f-4f32252165db' diff --git a/packages/ui/src/ui-component/input/suggestionOption.js b/packages/ui/src/ui-component/input/suggestionOption.js index 587fa978831..694263ed506 100644 --- a/packages/ui/src/ui-component/input/suggestionOption.js +++ b/packages/ui/src/ui-component/input/suggestionOption.js @@ -71,6 +71,12 @@ export const suggestionOptions = ( description: 'Total messsages between LLM and Agent', category: 'Chat Context' }, + { + id: 'loop_count', + mentionLabel: 'loop_count', + description: 'Current loop count', + category: 'Chat Context' + }, { id: 'file_attachment', mentionLabel: 'file_attachment',