Enable True Graph-Based Execution flow Pattern in AgentChat #4623
We discussed this in the Teams chat. I think we need something like this -- it's a great idea. The exact form is TBD, but there are some strong contenders.... |
This kind of approach would greatly benefit a project we're working on in the SAINTES team. We're building a system to reason about source code, and as part of that an orchestrator creates a plan with many steps. Each step requires running many tools on a repository, some of which are quite verbose. As an example, think about the results of running "find . -type file | xargs grep MessageHandler" in a large repository. This results in some very large messages, and these messages are then included permanently in the chat and thus seen over and over even when they are no longer needed. It's not uncommon to have a typical run use more than a million tokens. These large messages are valuable only locally and temporarily. Once they have been used by an agent to reason on or inform the next step in the plan, they can be tossed away. In reality, for each step in the execution plan, we really want to be able to pass in context, maybe a set of facts, assumptions, etc. along with a task for the step and ask for information. Then we want a team of agents to be able to use their tools, reasoning, etc. to gather and/or synthesize the asked-for information and just return that to the "main" conversation as execution of the high-level task progresses. I had a conversation with Victor about this and he directed me to this issue, so I thought I'd chime in since I agree it's a great idea. I have to think this is somewhat domain agnostic. The idea of using a plan to decompose a task into steps, substeps, etc. and then having each step need just an input and output, with the inner workings hidden or tossed after computation is completed, seems like it would be generally useful. |
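As an editorial sketch of the pattern described in the comment above (all names here are hypothetical, not AutoGen APIs): each plan step takes a task plus a small set of facts and returns only a distilled result, so the verbose tool output never enters the main conversation.

```python
from dataclasses import dataclass

@dataclass
class StepResult:
    """Only the distilled answer survives; verbose tool output is dropped."""
    summary: str

def run_step(task: str, facts: list) -> StepResult:
    # Hypothetical inner loop: agents run verbose tools (grep output, etc.)
    # and reason over the results locally.
    verbose_tool_output = f"...imagine megabytes of grep output for: {task}..."
    distilled = f"Synthesized answer for '{task}' using {len(facts)} facts"
    # The large intermediate messages go out of scope here instead of being
    # appended permanently to the main chat.
    return StepResult(summary=distilled)

result = run_step("find MessageHandler usages", ["repo is large"])
```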
I ended up using LangGraph for flow control and HIL with AutoGen for agent orchestration. Which seems like the goal of this thread. |
@AricRecker , |
@victordibia I figured there was likely a way to do it natively. A trivial implementation would be helpful, especially in the studio. Ideally, with flow control for teams in various ways, such as Cross-Functional Collaboration, Collaborative Problem-Solving, and Competitive Collaboration. |
Yes. Once we can settle on a good api, it would be really valuable to easily construct these flows in a GUI and run them.
Can you give a concrete example of the above (when you have some time). It would be helpful in thinking about the kinds of usecases to test against. |
@victordibia Sure - this would help solve complex multi-dimensional problems. For instance, the type of team collaboration will result in multiple outputs that address distinct aspects of the problem (Cross-Functional Collaboration), iteratively refine solutions through shared insights (Collaborative Problem-Solving), and enhance overall system resilience and innovation through adversarial refinement (Competitive Collaboration). Example: |
I also think this is a good idea. An example would be an article-writer agentic workflow where I have a sequential chat made of two two-agent chats: one verifies the DoR (we have everything needed to write the article), and the second writes it; the second workflow has a nested chat on the Critic agent, which has a team of article reviewers. This could definitely be reused in many cases. Another example would be a web researcher workflow... |
Thinking about an enterprise-wide solution, how is the execution state of all agents kept?
Agents have a load and save state method. There has also been discussion around natively persisting execution state. Contributions welcome! |
I have been trying to use AutoGen Swarm to achieve this. Swarm does the same if we define handoffs in the proper way. |
Let's discuss in the issue you posted. #5563 |
FWIW, I agree with the above that handoffs are not quite up to the task, especially when paired WITH other tool calls. In order to achieve a simple tree I created a group of agents like mentioned above, and then a custom component. The magic of the system is that it's declarative. Declarative systems are by definition verbose, and something at this level will probably be programmed by machines anyway.

Another, completely different idea: we allow for easier mixing of AgentChat and agent_core via the runtimes. Right now the runtime is hardcoded into the chat, but that doesn't have to be true. We could expose the runtime, allow external agents to join the system, and then create easy ways to send messages. That way you maintain the relative simplicity of the core AgentChat, and simultaneously allow more advanced use-cases. Actually, the more I think about it, the more I like this idea 😆. This is very similar in spirit to how something like network proxies handle extensions: they add network calls to perform logic on the data path. |
This is great! See this issue. I think we should prioritize this for the next milestone: #5787 |
This would be a very useful feature for us - the ability to serialize/deserialize such DAGs is a must-have for us to adopt AutoGen. We were planning on building this ourselves till I came across this thread. Happy to contribute in any way to get this done in the next milestone. |
@abhinav-aegis Would be interesting to see what your team comes up with. Though it may take a while for us to merge a new design like this, we can always have separate community packages for new ideas. |
@ekzhu – Happy to help. Is there a good way to communicate so that I don't spam this issue's space? We want to do something very low touch so that you don't really have to modify any part of your code and we can satisfy most of the practical DAG-type use cases. See below what we can do:

Minimal Touch Points: Two Derived Classes

DAG execution can be achieved without modifying AutoGen-Chat's base code. We probably need only two things:
We achieve this with two classes:
This keeps the source code very modular while leveraging existing AutoGen capabilities. 1. Filtering Messages with
|
Thanks -- this is a good starting point. My feeling is that both message filtering and speaker sequencing can be defined as part of the team rather than requiring a special agent type. When you provide the DAG, we can construct the right topic-subscription structure using the Core API for the participant agents, and the conditions that are required to trigger each agent in the DAG. |
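One way to picture that DAG-to-subscription construction is a plain-Python sketch (this is illustrative only, not the actual Core API; the `output/<name>` topic naming is an assumption):

```python
# A DAG as an adjacency list: each node lists the nodes its output feeds.
edges = {"A": ["B"], "B": ["C", "D"], "C": [], "D": []}

# One output topic per agent; an agent subscribes to the topics of its
# direct parents, so messages flow only along edges.
subscriptions = {node: set() for node in edges}
for source, targets in edges.items():
    for target in targets:
        subscriptions[target].add(f"output/{source}")
```

A team constructor could derive this table from the declarative DAG and register the corresponding subscriptions with the runtime.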
This is so interesting because I've been playing around with this and I actually went in a COMPLETELY different direction. I was planning to discuss at office hours, but I can put the outline below.

Graph Based Execution

The current agent chat model is great when multiple agents need to be in conversation with each other for various reasons, but in my opinion it's a bit heavy handed for a pure graph workflow.

1. New Graph Components

The new Node protocol:

class Node(Protocol):
@property
def name(self) -> str: ...
@property
def targets(self) -> list[str]: ...
async def process_json(
self, input: Mapping[str, Any], cancellation_token: CancellationToken
) -> Mapping[str, Any]: ...
async def save_state(self) -> Mapping[str, Any]: ...
async def load_state(self, state: Mapping[str, Any]) -> None: ...
InputT = TypeVar("InputT", bound=BaseModel)
OutputT = TypeVar("OutputT", bound=BaseModel)
StateT = TypeVar("StateT", bound=BaseModel)
class BaseNode(ABC, Node, Generic[InputT, OutputT, StateT], ComponentBase[BaseModel]):
component_type = "node"
def __init__(self, name: str, input_type: Type[InputT], output_type: Type[OutputT], targets: list[str]):
self._name = name
self._input_type = input_type
self._output_type = output_type
self._targets = targets
@property
def name(self) -> str:
return self._name
@property
def targets(self) -> list[str]:
return self._targets
@abstractmethod
async def process(self, input: InputT, cancellation_token: CancellationToken) -> OutputT:
"""Process the input and return the output."""
...
@abstractmethod
async def on_reset(self) -> None:
"""Reset the node."""
...
async def process_json(self, input: Mapping[str, Any], cancellation_token: CancellationToken) -> Mapping[str, Any]:
return self._output_type.model_dump(
await self.process(self._input_type.model_validate(input), cancellation_token)
)
async def save_state(self) -> Mapping[str, Any]:
"""Export state. Default implementation for stateless nodes."""
return BaseState().model_dump()
async def load_state(self, state: Mapping[str, Any]) -> None:
"""Restore node from saved state. Default implementation for stateless nodes."""
BaseState.model_validate(state)
async def close(self) -> None:
"""Called when the runtime is closed"""
pass

Graph

class Graph(Protocol):
@property
def name(self) -> str: ...
async def on_reset(self, cancellation_token: CancellationToken) -> None: ...
async def save_state(self) -> Mapping[str, Any]: ...
async def load_state(self, state: Mapping[str, Any]) -> None: ...
class BaseGraph(ABC, Graph, ComponentBase[BaseModel]):
component_type = "graph"
def __init__(self, name: str, nodes: list[BaseNode], starting_node: str, runtime: AgentRuntime | None = None):
self._name = name
if runtime is None:
self._runtime = SingleThreadedAgentRuntime()
else:
self._runtime = runtime
self._starting_node = starting_node
self._nodes = {node.name: node for node in nodes}
def _create_node_container_factory(self, node: BaseNode) -> Callable[[], NodeContainer]:
def factory() -> NodeContainer:
return NodeContainer(node.name, node, self._name)
return factory
async def run(self, cancellation_token: CancellationToken) -> None:
"""Run the graph. This will run all nodes based on the execution strategy."""
@abstractmethod
async def reset(self) -> None:
"""Reset the graph."""
...
async def on_reset(self, cancellation_token: CancellationToken) -> None:
await self.reset()
async def save_state(self) -> Mapping[str, Any]:
"""Export state. Default implementation for stateless nodes."""
return BaseState().model_dump()
async def load_state(self, state: Mapping[str, Any]) -> None:
"""Restore node from saved state. Default implementation for stateless nodes."""
BaseState.model_validate(state)

2. RoutedAgents

The actual runtime of the graph would be managed by a set of RoutedAgents. The definitions for the events are as follows. The core mechanism here is just input and output objects, which are generic.

Events

class GraphNodeInput(BaseModel):
"""Input for a graph node."""
input: Mapping[str, Any] = Field(default_factory=dict, description="Input for the node")
context: Dict[str, Any] = Field(default_factory=dict, description="Additional context for the node")
class GraphNodeOutput(BaseModel):
"""Output from a graph node."""
input: Mapping[str, Any] = Field(default_factory=dict, description="Input for the node")
context: Dict[str, Any] = Field(default_factory=dict, description="Additional context from the node")
class GraphStart(BaseModel):
"""A request to start a graph execution."""
input: BaseModel = Field(default_factory=BaseModel, description="Input for the graph")
node_id: str = Field(description="ID of the node to start with")
class GraphNodeRequest(BaseModel):
"""A request to process a node."""
input: GraphNodeInput = Field(description="Input for the node")
# node_id: str = Field(description="ID of the node to process")
class GraphNodeResponse(BaseModel):
"""A response from a node."""
output: GraphNodeOutput = Field(description="Output from the node")
node_id: str = Field(description="ID of the node that produced the output")
target_node_ids: List[str] = Field(default_factory=list, description="IDs of the target nodes")
class GraphReset(BaseModel):
"""A request to reset the graph."""
pass
class GraphTermination(BaseModel):
"""A signal that the graph execution has terminated."""
reason: str = Field(description="Reason for termination")
final_output: Optional[GraphNodeOutput] = Field(default=None, description="Final output from the graph")

NodeContainer

class GraphNodeState(Dict[str, Any]):
"""State of a graph node."""
pass
class NodeContainer(SequentialRoutedAgent):
"""A container for a node in a graph-based workflow.
This class wraps an agent or team and handles messages for the node.
It follows the single input/output principle where each node takes an input
and produces exactly one output.
Args:
name: Name of the node
node: Node that will process the input
graph_topic_type: Topic type for the graph
"""
def __init__(
self,
name: str,
node: BaseNode,
graph_topic_type: str,
):
"""Initialize a graph node container."""
self.name = name
self._node = node
self._graph_topic_type = graph_topic_type
self._state = GraphNodeState()
@property
def node(self) -> BaseNode:
"""The node that will process the input."""
return self._node
@event
async def handle_request(self, message: GraphNodeRequest, ctx: MessageContext) -> None:
"""Handle a request to process the node.
Args:
message: The request message
ctx: The message context
"""
# Process the input using the agent or team
result = await self._node.process_json(message.input.input, ctx.cancellation_token)
await self.publish_message(
GraphNodeResponse(
output=GraphNodeOutput(input=result, context=message.input.context),
node_id=self.name,
target_node_ids=self._node.targets,
),
topic_id=DefaultTopicId(type=self._graph_topic_type),
)
@rpc
async def handle_reset(self, message: GraphReset, ctx: MessageContext) -> None:
"""Handle a reset request.
Args:
message: The reset message
ctx: The message context
"""
await self._node.on_reset()

GraphManager

class GraphManagerState(Dict[str, Any]):
"""State of a graph manager."""
pass
class GraphManager(SequentialRoutedAgent, ABC):
"""A manager for a graph-based workflow.
This class orchestrates the execution of a graph-based workflow.
It receives responses from nodes and routes them to the appropriate targets.
Args:
name: Name of the graph manager
graph_topic_type: Topic type for the graph
output_queue: Queue for output messages
entry_node_ids: IDs of the entry nodes
exit_node_ids: IDs of the exit nodes
"""
def __init__(
self,
name: str,
graph_topic_type: str,
output_queue: asyncio.Queue[Any],
entry_node_ids: List[str],
exit_node_ids: Optional[List[str]] = None,
):
"""Initialize a graph manager."""
self.name = name
self._graph_topic_type = graph_topic_type
self._output_queue = output_queue
self._entry_node_ids = entry_node_ids
self._exit_node_ids = exit_node_ids or []
self._state = GraphManagerState()
self._visited_nodes: Set[str] = set()
@rpc
async def handle_start(self, message: GraphStart, ctx: MessageContext) -> None:
"""Handle a start request.
Args:
message: The start message
ctx: The message context
"""
@event
async def handle_node_response(self, message: GraphNodeResponse, ctx: MessageContext) -> None:
"""Handle a response from a node.
If there are more targets, forward the output to the next target.
If there are no targets, signal done for this branch
Args:
message: The response message
ctx: The message context
"""
@rpc
async def handle_reset(self, message: GraphReset, ctx: MessageContext) -> None:
"""Handle a reset request.
Args:
message: The reset message
ctx: The message context
""" 3. Extending the base graphWith the core Graph components in place, additional features can be brought on top to allow for various use-cases. For example a class DAGConfiguration(BaseModel):
nodes: list[ComponentModel]
class DAGState(BaseModel):
pass
class DAG(BaseGraph, Component[DAGConfiguration]):
component_config_schema = DAGConfiguration
component_provider_override = "autogen_ext.graph.DAG"
---
class CyclicGraphConfiguration(BaseModel):
nodes: list[ComponentModel]
allowed_cycles: int
class CyclicGraphState(BaseModel):
pass
class CyclicGraph(BaseGraph, Component[CyclicGraphConfiguration]):
component_config_schema = CyclicGraphConfiguration
component_provider_override = "autogen_ext.graph.CyclicGraph"

4. Extending the base node

Once a runtime is in place, a user can bring any node implementation. For example, a good starting node could be a task-runner node:

class TaskRunnerWithState(TaskRunner, ComponentBase[BaseModel]):
pass
class TaskRunnerNodeConfiguration(BaseModel):
# Can either be a team or an agent
task_runner: ComponentModel
# A list of Nodes that are targets of this node
targets: list[str]
# The name of the node
name: str
class TaskRunnerNodeState(BaseModel):
pass
class TaskRunnerNode(BaseNode[TaskResult, TaskResult, TaskRunnerNodeState], Component[TaskRunnerNodeConfiguration]):
component_config_schema = TaskRunnerNodeConfiguration
component_provider_override = "autogen_ext.graph.node.TaskRunnerNode"
def __init__(self, name: str, task_runner: ChatAgent | Team, targets: list[str]):
self._task_runner = task_runner
super().__init__(name=name, input_type=TaskResult, output_type=TaskResult, targets=targets)
async def process(self, input: TaskResult, cancellation_token: CancellationToken) -> TaskResult:
messages = [message for message in input.messages if isinstance(message, ChatMessage)]
return await self._task_runner.run(task=messages, cancellation_token=cancellation_token)
def _to_config(self) -> TaskRunnerNodeConfiguration:
return TaskRunnerNodeConfiguration(
task_runner=self._task_runner.dump_component(),
targets=self._targets,
name=self._name,
)
@classmethod
def _from_config(cls, config: TaskRunnerNodeConfiguration) -> Self:
task_runner = ComponentLoader.load_component(config.task_runner)
if not isinstance(task_runner, ChatAgent | Team):
raise ValueError("Task runner must be an agent or a team")
return cls(name=config.name, task_runner=task_runner, targets=config.targets)
async def on_reset(self) -> None:
if isinstance(self._task_runner, ChatAgent):
await self._task_runner.on_reset(CancellationToken())
else:
await self._task_runner.reset()

5. Bringing it all together

I recognize that this is a much larger undertaking than adding this on top of the existing AgentChat abstractions. When thinking about this I looked at existing declarative graph systems, and I appreciated the simplicity of something like GitHub Actions. GitHub Actions have a jobs block:

jobs:
my-first-job:
steps:
- <do stuff>
my-second-job:
needs: my-first-job
steps:
- <do stuff>

The fundamental use-case here is very similar, but in my opinion flipped on its head. Rather than a `needs` key on the consumer, each job declares its targets:

jobs:
my-first-job:
steps:
- <do stuff>
targets: my-second-job
my-second-job:
steps:
- <do stuff>

Implementing logic flows like this in config rather than code has numerous important advantages:
From what I have seen, most systems shift over time from code-based config/execution to config-based, and this system should be no different.

6. Potential future improvements

The system outlined above is quite basic, but once the building blocks are in place there are A LOT of things that can be added:
Notes

I have some code locally where I've been playing around with this idea. It's not quite done, but I think the explanation above should be a good starting point for an office hours discussion, and then we/I can decide how to move forward. |
@EItanya This design that you proposed is great - in fact, my initial thought was very much along these lines. I then started digging into how easy/difficult it would be to reuse all the work that has already been done, and what is nice about the Teams API is that it almost naturally supports the entire DAG use case, though it might not seem so initially. As @ekzhu said earlier, the two primary changes needed (sequencing tasks and filtering messages) can be done almost entirely within the Teams API today without adding additional agents. My primary concern with "doing it from scratch" is the amount of work involved - not just now to get this up and running, but also in the future in terms of maintainability. A couple of other (minor) points based on the original requirements from @victordibia:
I had a feeling that once we add the above three items to the requirements to be satisfied by the DAG workflow, we will need to add very "Team-like" mechanisms into the DAG anyway. Then any changes to the Teams API will start to impact the DAG code (and vice versa). tl;dr: I think both options have a lot of merit. The primary difference is in terms of the amount of work and which solution is better in terms of long-term maintainability. I would be happy to go with either approach as I see the argument for both. It is really a question for the maintainers of this codebase. |
Excellent conversation .. thanks @abhinav-aegis , @EItanya . FWIW, I think the implementations above are in line with some minimal experiments I have tried - e.g., simply using the Core API abstractions (topic subscription) to enable graph behavior. The challenge I see there would be losing integration with all the structure in agentchat (which would be great to build on). What @ekzhu mentions seems like the right trade-off (also related to @abhinav-aegis's ideas): create a custom Team (or GroupChat ..) that can take a declarative representation of the agent graph and convert that into some topic-subscription (message delivery) logic. Simple case: assume A (start) -> B -> C -> D (end). Team/GroupChat sets up a topic subscription for each agent: TopicA, TopicB, TopicC, TopicD
We might need structured output from agents, e.g., to embed logic for conditional transitions. @abhinav-aegis , I agree with all the effort/maintenance tradeoffs here. |
Very interesting conversation indeed, @abhinav-aegis and @victordibia! I have many thoughts, but I agree with this plan generally as well. The trade-off of risk vs. reward is always a tricky one. I'm going to spend some time better understanding the internals of the existing implementation. I do want to highlight this line from above:
I think this is gonna be super important. I was trying to get this point across in my design, but I don't think I did it enough justice. Figuring out a way to declaratively express a system which could do this would be super duper cool, and in my opinion, really useful. The other behavior that I wanna look into more is parallel execution.

API

I put the API under its own header because I would love to get some more detail on the API going; getting that right can be quite difficult. I have a few ideas, from my time playing around with the project, and my time building declarative APIs the last few years. I agree with @abhinav-aegis about having a target-picker abstraction:

class TargetPicker(ABC):
    """I know the name isn't good 😆"""

    @abstractmethod
    async def get_targets(self, agents: list[str], agent_output: BaseModel) -> list[str]: ...

class AllTarget(TargetPicker):
    async def get_targets(self, agents: list[str], agent_output: BaseModel) -> list[str]:
        return agents

class AnyTarget(TargetPicker):
    ...

class DagAgentConfig(BaseModel):
    targets: TargetPicker

class DagAgent(AssistantAgent, Component[DagAgentConfig]):
    def __init__(self, name: str, target_picker: TargetPicker | None = None):
        super().__init__(name=name)
        self._target_picker = target_picker or AllTarget()

    async def get_targets(self, agents: list[str], agent_output: BaseModel) -> list[str]:
        return await self._target_picker.get_targets(agents, agent_output)

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        filtered_messages = [msg for msg in messages if msg.source in self._allowed_sources]
        if not filtered_messages:
            return Response(chat_message=TextMessage(content="No relevant messages to process.", source=self.name))
        return await super().on_messages(filtered_messages, cancellation_token)

I mentioned this example earlier:

jobs:
my-first-job:
steps:
- <do stuff>
my-second-job:
needs: my-first-job
steps:
- <do stuff>
---
jobs:
my-first-job:
steps:
- <do stuff>
targets: my-second-job
my-second-job:
steps:
- <do stuff> From what I understand, if we break the DAG config into it's own block, we'd have something like the following: jobs:
my-first-job:
steps:
- <do stuff>
my-second-job:
steps:
- <do stuff>
dag:
edges:
- source: my-first-job
dest: my-second-job

The 2nd one would get much more complex to read in my opinion; the other method allows you to elide either the `targets` or `needs` key. Anyway, just thinking out loud, would love to hear others' thoughts on what I mentioned :) |
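As a sketch of how the inline `targets` variant above would flatten into an edge list (key names mirror the hypothetical YAML, represented here as a plain dict):

```python
# Hypothetical config shape, mirroring the `targets` variant above.
config = {
    "jobs": {
        "my-first-job": {"steps": ["do stuff"], "targets": ["my-second-job"]},
        "my-second-job": {"steps": ["do stuff"]},
    }
}

# Jobs without a `targets` key are sinks; the edge list falls out directly,
# with no separate `dag:` block to keep in sync.
edges = [
    (name, target)
    for name, job in config["jobs"].items()
    for target in job.get("targets", [])
]
```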
What I believe makes the most sense is to have an |
I will try to put something together a prototype before the call next week and share. Thanks! |
Agree that communication based on edges makes sense. I think this is what @ekzhu and @victordibia are mentioning as well. Looks like it should be feasible where the correct topic subscriptions should be setup. Will try to put this in the prototype. |
What I mean is using direct communication instead of topics, as explained here: https://microsoft.github.io/autogen/stable/user-guide/core-user-guide/framework/message-and-communication.html I think it is fair in these workflows for an agent to only receive the direct communication of its predecessors. Not even anyone else in the direct lineage, but only the agents with direct edge dependencies. I think this would also encourage users to be more thoughtful about the flow of information in the DAG and make direct dependencies explicit. If there are any groups of agents that need to share context more generally then we can use memory, but I think that would be an exception. |
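A toy model of that delivery rule (illustrative only, not the Core API): outputs are sent point-to-point along explicit edges, so each node's inbox contains only its direct predecessors' outputs.

```python
# Explicit edges: plan fans out to code and docs, which both feed review.
edges = {"plan": ["code", "docs"], "code": ["review"], "docs": ["review"], "review": []}

inboxes = {name: [] for name in edges}

def publish(node, output):
    # Point-to-point delivery: only direct successors see the output,
    # not the whole lineage and not the whole team.
    for successor in edges[node]:
        inboxes[successor].append(output)

publish("plan", "plan-output")
publish("code", "code-output")
publish("docs", "docs-output")
```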
I'm not as familiar with the core framework, so this may not be correct, but wouldn't direct communication that isn't
Do you envision a way to do this with the GroupChat model? Right now the agents themselves have a handler like:

@event
async def handle_agent_response(self, message: GroupChatAgentResponse, ctx: MessageContext) -> None:
"""Handle an agent response event by appending the content to the buffer."""
self._message_buffer.append(message.agent_response.chat_message)

The other thing I want to point out is that @ekzhu just created #5931, which, the more I think about it, is functionally a version of this.

Multiple speakers

I did some more research on the topic of multiple speakers within a GroupChat. The base manager declares:

@abstractmethod
async def select_speaker(self, thread: List[AgentEvent | ChatMessage]) -> str:
"""Select a speaker from the participants and return the
topic type of the selected speaker."""
...

And then change the handler:

@event
async def handle_agent_response(self, message: GroupChatAgentResponse, ctx: MessageContext) -> None:
...
# Select a speaker to continue the conversation.
speaker_name_future = asyncio.ensure_future(self.select_speaker(self._message_thread))
# Link the select speaker future to the cancellation token.
ctx.cancellation_token.link_future(speaker_name_future)
speaker_names = await speaker_name_future
for speaker_name in speaker_names:
if speaker_name not in self._participant_name_to_topic_type:
raise RuntimeError(f"Speaker {speaker_name} not found in participant names.")
for speaker_name in speaker_names:
speaker_topic_type = self._participant_name_to_topic_type[speaker_name]
await self.publish_message(
GroupChatRequestPublish(),
topic_id=DefaultTopicId(type=speaker_topic_type),
cancellation_token=ctx.cancellation_token,
)

However, back to @lspinheiro's point, this will not direct specific pieces of data at these agents, but rather just tell them to execute with the buffered data they already have from all other responses. Unfortunately, the more I think about the GroupChat, the more I think it will be the incorrect method of accomplishing this task. I wonder if there's a way to share the lifecycle management aspects. |
Great discussion! I am putting in a foundation layer that is structured output for agents: #5131 and #5934. I think for a graph-based flow, the ability to specify a structured contract is necessary; this will make it much simpler to ensure type consistency in a graph. We can discuss further in our next community office hour meeting: #4059 . |
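A toy illustration of why structured contracts help between graph nodes (stdlib-only sketch; the model names are hypothetical): with a declared schema at each edge, a mismatch between one node's output and the next node's expected input is caught by validation instead of silently producing a confusing prompt.

```python
from dataclasses import dataclass, fields

@dataclass
class PlanOutput:
    plan: list

@dataclass
class CoderInput:
    plan: list

def hand_off(payload: dict) -> CoderInput:
    # Validate the edge contract: reject payloads whose keys don't match
    # what the downstream node declares.
    expected = {f.name for f in fields(CoderInput)}
    if set(payload) != expected:
        raise ValueError(f"contract mismatch: got {set(payload)}, want {expected}")
    return CoderInput(**payload)

ok = hand_off(PlanOutput(plan=["step 1"]).__dict__)
```

In practice something like pydantic's `model_validate` would play this role; the point is that the check happens at the edge, per the structured-output work referenced above.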
@EItanya , what I recall is that the chat agent container broadcasts to a single topic and the behavior is currently hardcoded so every participant will see all messages, I think this is why @ekzhu suggested filtering. We could potentially update this and make topics configurable but my current view is that adding a topic per agent may end up in a more complex solution than needed. On the previous question, I don't think the graph-based teams need to be part of group chat. I think the main requirement is tht agents and message types should be reusable and those are defined outside of the group chat. I believe group chat itself focus on this team-wide broadcasting communication pattern which we may not want in graph-based execution. |
@lspinheiro @EItanya @abhinav-aegis I created a discord channel: https://discord.com/channels/1313229474107756584/1341892471084548198 we can also discuss there. Due to time zone difference we may not be able to meet up during the community office hour. The discord channel is an alternative. |
@ekzhu @victordibia Here is a solution for this issue: https://github.com/abhinav-aegis/autogen/tree/aegis-dag. I branched off from the Structured Message Model Components work (98b286f966e079c3f643b93467aa3deaa37b4756) to start this work. What is in the new work:
Considering the above, I think a solid solution is for an agent to simply filter the messages it needs to listen to, allowing encapsulation of agents' behaviour (and, if required, simply filtering just for the parents' messages).
TODO:
Hopefully this branch forms a good starting point - I do feel that it can be done with minimal changes to agentchat. |
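A minimal sketch of the parent-filtering idea described in the comment above (the message type and helper here are hypothetical stand-ins, not the AgentChat classes):

```python
from dataclasses import dataclass

@dataclass
class Msg:
    source: str
    content: str

def filter_for_node(messages, parents):
    """Keep only messages produced by this node's direct parents in the DAG."""
    return [m for m in messages if m.source in parents]

# A shared history with messages from three agents; a node whose only DAG
# parent is "B" sees just B's output, keeping its context unpolluted.
history = [Msg("A", "a-out"), Msg("B", "b-out"), Msg("C", "c-out")]
visible = filter_for_node(history, {"B"})
```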
Amazing work @abhinav-aegis. I will look into the code. |
Hi @abhinav-aegis , do you think you could open a draft PR so we can collaborate on a conversation in the review |
@lspinheiro Would be more than happy to collaborate. However, it was decided during the Office hour on Thursday that we will create a community package for this topic and maintain it as such. I need to do a little bit of work to setup a separate project for that. If you are happy to contribute there I would appreciate the help. Please ping me on Discord and we can discuss the details. @EItanya Same applies to you if you wish to work on this item. |
Of course, I just joined the discord. Are you discussing this in the contributors channel? I will continue there, sadly I generally can't join office hours as it is quite late in my time zone. |
I want to add another vote for priority on this - if this is introduced, theoretically this could replace all our use-cases for Langgraph. It would mean having a single framework - Autogen - for my org and the ability to leverage all the autogen tooling we build alongside the builtin benefits of autogen for both open-ended and directed usecases. |
An issue to foster discussion on enabling graph-based execution of team steps (workflows) in AutoGen AgentChat
The current API in AgentChat makes a lot of progress in terms of enabling chat-based execution of steps within an autonomous multi-agent application. This is based mostly on the BaseGroupChat team class.
However, it still does not support the ability to easily craft nodes or chains where the developer has clear control over the exact execution flow. What this issue describes could be seen as a midpoint between what a simple chain approach like LangChain can do versus what a fully autonomous group chat (BaseGroupChat, SelectorGroupChat, etc., but with limited control) can do.
What is a Graph/Chain Execution Flow Pattern?
In its simplest form, a graph begins with a set of nodes and edges (similar to LangChain). A node is an independent processing unit that takes an input and provides one output. An edge defines transitions between nodes. For example, an edge between node A and B means that the output from node A goes to node B.
Important behaviors here:
A node has only one input and one output.
A node must be independent - i.e., it must only be aware of its input and use that to produce its output. This way its context is free of pollution that can arise as other agents work (e.g., Agent B does not need to know that Agent A tried to write some code 10 times until it worked... it only needs the output of agent A either pass or fail).
It should be easy to define nodes and edges as a DAG graph; e.g., a simple linear chain is one where the output from one agent goes to the next: A->B->C->D...
There should be the concept of entry and exit points
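The behaviors above can be sketched in a few lines of plain Python (a toy illustration, not a proposed API; node callables and names are hypothetical):

```python
from collections import deque

# Independent nodes: each takes one input and produces one output.
nodes = {
    "A": lambda x: x + "->A",
    "B": lambda x: x + "->B",
    "C": lambda x: x + "->C",
}
# Edges: output of A feeds B, B feeds C. "A" is the entry, "C" the exit.
edges = {"A": ["B"], "B": ["C"], "C": []}

def run(entry, task):
    outputs = {}
    queue = deque([(entry, task)])
    while queue:
        name, payload = queue.popleft()
        # Each node sees only its own input, never the full chat history.
        outputs[name] = nodes[name](payload)
        for target in edges[name]:
            queue.append((target, outputs[name]))
    return outputs
```

In a real implementation each node would wrap an agent or inner team, but the independence principle is the same: only the edge payload crosses node boundaries.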
Why is this a good idea?
Because it is such a common and intuitive thing for a framework to do - expressing the solution to a problem as a set of steps. The most common version being a simple sequence of independent nodes.
A Chain/Graph setup lets us progress from being a simple set of steps by allowing dynamic behavior within each step (e.g., group chats inside a node) while still controlling information flow on a high level.
Other benefits:
When is a flow based approach the wrong approach?
There are some scenarios where this can happen.
For example
Simple example
Data visualization
Graph looks like
When this graph is run, A generates the code,
How is this different from current AgentChat?
AgentChat today is based on a group chat mechanism. Every agent sees every message sent (corrections welcome here). This violates the independence principle above.
What are ways forward?
SwarmGroupChat might come close to the behavior listed above but has the independence issue.
Some rough ideas:
This issue is meant as a discussion to collate ideas into some future implementation.
Feedback and discussion welcome!
@husseinmozannar , @gagb , @afourney , @ekzhu , @jackgerrits