Integrating Cycles with AutoGen
This guide shows how to add budget governance to AutoGen multi-agent workflows so that every LLM call is cost-controlled, observable, and automatically stopped when budgets run out.
AutoGen (v0.4+) does not have a built-in middleware or callback system for intercepting LLM calls. The recommended pattern is to wrap the model client in a budget-gated wrapper that creates a Cycles reservation before each call and commits the actual usage afterward.
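The reserve, execute, commit flow can be illustrated without any external service. The following is a minimal sketch that uses a hypothetical in-memory ledger (`InMemoryLedger` and `guarded_call` are illustrative names, not part of the Cycles or AutoGen APIs). It shows the estimate being held before the call, the actual cost being charged on success, and the hold being returned on failure.

```python
import asyncio


class InMemoryLedger:
    """Hypothetical stand-in for a budget authority; not the Cycles API."""

    def __init__(self, budget: int):
        self.remaining = budget   # spendable amount, in microcents
        self.reserved = {}        # reservation id -> held amount
        self._next_id = 0

    def reserve(self, estimate: int) -> str:
        if estimate > self.remaining:
            raise RuntimeError("budget exceeded")
        self._next_id += 1
        rid = f"r{self._next_id}"
        self.remaining -= estimate          # hold the estimate up front
        self.reserved[rid] = estimate
        return rid

    def commit(self, rid: str, actual: int) -> None:
        held = self.reserved.pop(rid)
        self.remaining += held - actual     # refund the unused part of the hold

    def release(self, rid: str) -> None:
        self.remaining += self.reserved.pop(rid)  # return the whole hold


async def guarded_call(ledger: InMemoryLedger, estimate: int, call):
    rid = ledger.reserve(estimate)          # 1. reserve before the call
    try:
        actual = await call()               # 2. execute; returns the actual cost
    except Exception:
        ledger.release(rid)                 # 3b. failure: give the hold back
        raise
    ledger.commit(rid, actual)              # 3a. success: charge actual usage
    return actual


async def cheap_call() -> int:
    return 300


async def failing_call() -> int:
    raise ValueError("model error")


ledger = InMemoryLedger(budget=1_000)
asyncio.run(guarded_call(ledger, 500, cheap_call))
after_success = ledger.remaining            # 1_000 - 300

try:
    asyncio.run(guarded_call(ledger, 500, failing_call))
except ValueError:
    pass
after_failure = ledger.remaining            # unchanged by the failed call
```

The wrapper shown later in this guide follows the same shape, with the ledger replaced by calls to the Cycles client.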
Prerequisites
pip install runcycles autogen-agentchat "autogen-ext[openai]"

export CYCLES_BASE_URL="http://localhost:7878"
export CYCLES_API_KEY="your-api-key"  # create via Admin Server; see note below
export CYCLES_TENANT="acme"
export OPENAI_API_KEY="sk-..."

Need an API key? Create one via the Admin Server; see Deploy the Full Stack or API Key Management.
60-Second Quick Start
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
from runcycles import CyclesClient, CyclesConfig, cycles, set_default_client

set_default_client(CyclesClient(CyclesConfig.from_env()))
model_client = OpenAIChatCompletionClient(model="gpt-4o")

@cycles(estimate=2_000_000, action_kind="llm.completion", action_name="gpt-4o")
async def ask(prompt: str) -> str:
    agent = AssistantAgent("assistant", model_client=model_client)
    result = await agent.run(task=prompt)
    await model_client.close()
    return result.messages[-1].content

print(asyncio.run(ask("What is budget authority?")))

Every agent run is now budget-guarded. If the budget is exhausted, BudgetExceededError is raised before the agent runs. Read on for per-call budget control with a model client wrapper.
Budget-gated model client
Wrap OpenAIChatCompletionClient to create a Cycles reservation before every LLM call and commit actual token usage after:
import uuid
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_core.models import CreateResult, RequestUsage
from runcycles import (
    CyclesClient, CyclesConfig, ReservationCreateRequest, CommitRequest,
    ReleaseRequest, Subject, Action, Amount, Unit, CyclesMetrics,
    BudgetExceededError, CyclesProtocolError,
)

PRICE_PER_INPUT_TOKEN = 250  # microcents per input token (GPT-4o: $2.50 per 1M tokens)
PRICE_PER_OUTPUT_TOKEN = 1_000  # microcents per output token (GPT-4o: $10 per 1M tokens)

class CyclesBudgetClient:
    """Wraps an OpenAIChatCompletionClient with Cycles budget governance.

    Delegates all ChatCompletionClient protocol methods to the inner client,
    overriding create() to add reserve → execute → commit lifecycle.
    """

    def __init__(
        self,
        inner: OpenAIChatCompletionClient,
        cycles_client: CyclesClient,
        tenant: str = "acme",
        workflow: str | None = None,
        agent: str | None = None,
        estimate_amount: int = 2_000_000,
    ):
        self._inner = inner
        self._cycles = cycles_client
        self._subject = Subject(tenant=tenant, workflow=workflow, agent=agent)
        self._estimate_amount = estimate_amount

    async def create(self, messages, **kwargs) -> CreateResult:
        key = str(uuid.uuid4())

        # Reserve budget
        res = self._cycles.create_reservation(ReservationCreateRequest(
            idempotency_key=key,
            subject=self._subject,
            action=Action(kind="llm.completion", name="gpt-4o"),
            estimate=Amount(unit=Unit.USD_MICROCENTS, amount=self._estimate_amount),
            ttl_ms=60_000,
        ))
        if not res.is_success:
            error = res.get_error_response()
            if error and error.error == "BUDGET_EXCEEDED":
                raise BudgetExceededError(
                    error.message, status=res.status,
                    error_code=error.error, request_id=error.request_id,
                )
            msg = error.message if error else (res.error_message or "Reservation failed")
            raise CyclesProtocolError(
                msg, status=res.status,
                error_code=error.error if error else None,
            )

        rid = res.get_body_attribute("reservation_id")
        try:
            # Execute LLM call
            result = await self._inner.create(messages, **kwargs)

            # Commit actual cost
            input_tokens = result.usage.prompt_tokens if result.usage else 0
            output_tokens = result.usage.completion_tokens if result.usage else 0
            actual = input_tokens * PRICE_PER_INPUT_TOKEN + output_tokens * PRICE_PER_OUTPUT_TOKEN
            self._cycles.commit_reservation(rid, CommitRequest(
                idempotency_key=f"commit-{key}",
                actual=Amount(unit=Unit.USD_MICROCENTS, amount=actual),
                metrics=CyclesMetrics(
                    tokens_input=input_tokens,
                    tokens_output=output_tokens,
                ),
            ))
            return result
        except BudgetExceededError:
            raise
        except Exception:
            self._cycles.release_reservation(
                rid, ReleaseRequest(idempotency_key=f"release-{key}"),
            )
            raise

    def create_stream(self, messages, **kwargs):
        # Streaming calls are delegated without budget governance.
        # For per-stream budget control, use reserveForStream patterns instead.
        return self._inner.create_stream(messages, **kwargs)

    async def close(self):
        await self._inner.close()

    def actual_usage(self):
        return self._inner.actual_usage()

    def total_usage(self):
        return self._inner.total_usage()

    def count_tokens(self, messages, *, tools=[]):
        return self._inner.count_tokens(messages, tools=tools)

    def remaining_tokens(self, messages, *, tools=[]):
        return self._inner.remaining_tokens(messages, tools=tools)

    @property
    def capabilities(self):
        return self._inner.capabilities

    @property
    def model_info(self):
        return self._inner.model_info

Using the budget-gated client
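Before passing the wrapper to an agent, it can help to sanity-check the per-token prices it relies on. The sketch below repeats the two price constants from the wrapper and works through one hypothetical call (the token counts are made up for illustration). It assumes 1 microcent is one millionth of a cent, which is the reading under which 250 microcents per token equals $2.50 per 1M tokens.

```python
PRICE_PER_INPUT_TOKEN = 250        # microcents per input token
PRICE_PER_OUTPUT_TOKEN = 1_000     # microcents per output token
MICROCENTS_PER_USD = 100_000_000   # 100 cents x 1,000,000 microcents (assumed unit)


def cost_microcents(input_tokens: int, output_tokens: int) -> int:
    """Same arithmetic the wrapper uses when committing actual usage."""
    return (input_tokens * PRICE_PER_INPUT_TOKEN
            + output_tokens * PRICE_PER_OUTPUT_TOKEN)


def to_usd(microcents: int) -> float:
    return microcents / MICROCENTS_PER_USD


# The constants should reproduce the advertised per-million prices.
input_price_per_million = to_usd(cost_microcents(1_000_000, 0))    # 2.5
output_price_per_million = to_usd(cost_microcents(0, 1_000_000))   # 10.0

# Hypothetical call: 1,200 prompt tokens and 400 completion tokens.
actual = cost_microcents(1_200, 400)   # 300_000 + 400_000 = 700_000
default_estimate = 2_000_000           # the wrapper's default estimate_amount
headroom = default_estimate - actual   # part of the hold that is not charged
```

Under these assumptions the default estimate of 2,000,000 microcents corresponds to $0.02 per call, so a call of this size would commit well under its reservation.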
Single agent
Pass the wrapped client to any AssistantAgent:
import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
from runcycles import CyclesClient, CyclesConfig, BudgetExceededError

cycles_client = CyclesClient(CyclesConfig.from_env())
inner = OpenAIChatCompletionClient(model="gpt-4o")

model = CyclesBudgetClient(
    inner=inner,
    cycles_client=cycles_client,
    tenant="acme",
    agent="support-bot",
)

agent = AssistantAgent("support-bot", model_client=model)

async def main():
    try:
        result = await agent.run(task="Explain budget governance for AI agents.")
        print(result.messages[-1].content)
    except BudgetExceededError:
        print("Budget exhausted.")
    finally:
        await model.close()

asyncio.run(main())

With tools
Every LLM call the agent makes — including tool-calling turns — gets its own reservation:
from autogen_core.tools import FunctionTool
from autogen_agentchat.agents import AssistantAgent

async def get_weather(location: str) -> str:
    """Get current weather for a location."""
    return f"72°F and sunny in {location}"

weather_tool = FunctionTool(get_weather, description="Get current weather")

agent = AssistantAgent(
    "weather-agent",
    model_client=model,
    tools=[weather_tool],
    system_message="Use the weather tool to answer questions.",
)

# Run inside an async function, as in main() above.
result = await agent.run(task="What's the weather in NYC?")

Each iteration of the tool-calling loop (LLM call → tool → LLM call) creates its own reservation. The agent stops as soon as budget is denied.
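To see why a tool-calling loop halts partway through, consider the following minimal simulation. It is a sketch only: `Budget`, `BudgetDenied`, and `tool_loop` are hypothetical stand-ins for the budget authority, BudgetExceededError, and the agent's internal loop, and the amounts are invented.

```python
class BudgetDenied(Exception):
    """Hypothetical stand-in for BudgetExceededError."""


class Budget:
    def __init__(self, limit: int):
        self.remaining = limit

    def reserve(self, estimate: int) -> None:
        # A reservation is denied when the estimate does not fit.
        if estimate > self.remaining:
            raise BudgetDenied(f"need {estimate}, have {self.remaining}")
        self.remaining -= estimate

    def commit(self, estimate: int, actual: int) -> None:
        # Charge the actual cost and refund the rest of the hold.
        self.remaining += estimate - actual


def tool_loop(budget: Budget, turns: int, estimate: int, actual: int, log: list) -> None:
    for turn in range(1, turns + 1):
        budget.reserve(estimate)         # raises BudgetDenied, ending the loop
        log.append(f"llm call {turn}")   # the model call would happen here
        budget.commit(estimate, actual)


budget = Budget(limit=5_000)
log = []
try:
    tool_loop(budget, turns=5, estimate=2_000, actual=1_500, log=log)
    stopped_early = False
except BudgetDenied:
    stopped_early = True
```

In this run three calls complete and the fourth reservation is denied, even though 500 units remain, because the gate compares the estimate (not the eventual actual cost) against what is left.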
Per-agent budget scoping in teams
Use separate CyclesBudgetClient instances with different agent values for each team member:
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import MaxMessageTermination

inner = OpenAIChatCompletionClient(model="gpt-4o")

researcher_model = CyclesBudgetClient(
    inner=inner,
    cycles_client=cycles_client,
    tenant="acme",
    workflow="research-pipeline",
    agent="researcher",
    estimate_amount=3_000_000,
)

writer_model = CyclesBudgetClient(
    inner=inner,
    cycles_client=cycles_client,
    tenant="acme",
    workflow="research-pipeline",
    agent="writer",
    estimate_amount=2_000_000,
)

researcher = AssistantAgent(
    "researcher",
    model_client=researcher_model,
    system_message="Research topics thoroughly and provide detailed findings.",
)

writer = AssistantAgent(
    "writer",
    model_client=writer_model,
    system_message="Write clear, concise reports from research findings.",
)

team = RoundRobinGroupChat(
    participants=[researcher, writer],
    termination_condition=MaxMessageTermination(max_messages=6),
)

try:
    result = await team.run(task="Analyze AI safety trends for Q4.")
    print(result.messages[-1].content)
except BudgetExceededError:
    print("Team stopped — budget exhausted.")

This gives you a budget hierarchy: tenant (acme) > workflow (research-pipeline) > agent (researcher / writer). Each agent can have its own budget limits set by the budget authority.
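One way to picture the tenant > workflow > agent hierarchy is as a chain of limits that a reservation must fit within at every level. The sketch below is a hypothetical in-memory model, not a description of how the Cycles server enforces scopes; the `Scope` class, the key format, and the limit values are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Scope:
    tenant: str
    workflow: Optional[str] = None
    agent: Optional[str] = None

    def levels(self) -> list:
        """Return scope keys from broadest to narrowest (invented format)."""
        keys = [f"tenant:{self.tenant}"]
        if self.workflow:
            keys.append(f"{keys[-1]}/workflow:{self.workflow}")
        if self.agent:
            keys.append(f"{keys[-1]}/agent:{self.agent}")
        return keys


def try_reserve(limits: dict, scope: Scope, amount: int) -> bool:
    """Reserve only if every level that has a limit can cover the amount."""
    levels = [key for key in scope.levels() if key in limits]
    if any(limits[key] < amount for key in levels):
        return False
    for key in levels:
        limits[key] -= amount
    return True


limits = {
    "tenant:acme": 10_000_000,
    "tenant:acme/workflow:research-pipeline": 6_000_000,
    "tenant:acme/workflow:research-pipeline/agent:researcher": 3_000_000,
    "tenant:acme/workflow:research-pipeline/agent:writer": 2_000_000,
}
researcher = Scope("acme", "research-pipeline", "researcher")
writer = Scope("acme", "research-pipeline", "writer")

results = [
    try_reserve(limits, researcher, 3_000_000),  # fits at every level
    try_reserve(limits, researcher, 3_000_000),  # researcher limit is used up
    try_reserve(limits, writer, 2_000_000),      # writer has its own limit
]
```

In this model the researcher running out of budget does not block the writer, while both still draw down the shared workflow and tenant limits.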
Guarding entire workflows with the decorator
For coarser-grained control — budgeting the entire team run rather than individual LLM calls — use the @cycles decorator:
from runcycles import (
    CyclesClient, CyclesConfig, cycles, set_default_client, BudgetExceededError,
)

set_default_client(CyclesClient(CyclesConfig.from_env()))

@cycles(estimate=10_000_000, action_kind="llm.completion", action_name="research-pipeline")
async def run_research_pipeline(topic: str) -> str:
    result = await team.run(task=f"Research and write a report on: {topic}")
    return result.messages[-1].content

try:
    report = await run_research_pipeline("quantum computing")
    print(report)
except BudgetExceededError:
    print("Pipeline budget exhausted.")

With this approach, the entire team run gets a single reservation. This is simpler but less granular than per-call wrapping.
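The trade-off in granularity can be seen in a small stand-alone sketch. The `guard` decorator below is hypothetical and is not the implementation of @cycles; it only illustrates the idea of one reservation wrapping a whole async function, regardless of how many model calls happen inside. For simplicity it charges the full estimate on success rather than committing an actual cost.

```python
import asyncio
import functools


class BudgetPool:
    """Hypothetical budget holder used only for this illustration."""

    def __init__(self, remaining: int):
        self.remaining = remaining
        self.reservations = 0


def guard(pool: BudgetPool, estimate: int):
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            if estimate > pool.remaining:
                raise RuntimeError("budget exceeded")
            pool.remaining -= estimate       # one hold for the whole run
            pool.reservations += 1
            try:
                return await fn(*args, **kwargs)
            except Exception:
                pool.remaining += estimate   # give the hold back on failure
                raise
        return wrapper
    return decorator


pool = BudgetPool(remaining=15_000_000)
llm_calls = []


async def fake_llm_call(prompt: str) -> str:
    llm_calls.append(prompt)                 # stands in for a real model call
    return prompt.upper()


@guard(pool, estimate=10_000_000)
async def run_pipeline(topic: str) -> str:
    notes = await fake_llm_call(f"research {topic}")
    draft = await fake_llm_call(f"write {notes}")
    return await fake_llm_call(f"edit {draft}")


report = asyncio.run(run_pipeline("quantum computing"))

try:
    asyncio.run(run_pipeline("a second topic"))
    second_run_allowed = True
except RuntimeError:
    second_run_allowed = False
```

Three model calls run under a single reservation here, and the second pipeline run is refused before any call is made because the remaining budget cannot cover its estimate.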
Swarm teams with budget governance
For Swarm teams where agents hand off to each other, each agent's model client tracks its own budget:
from autogen_agentchat.teams import Swarm
from autogen_agentchat.conditions import MaxMessageTermination

reviewer = AssistantAgent(
    "reviewer",
    model_client=CyclesBudgetClient(
        inner=inner, cycles_client=cycles_client,
        tenant="acme", agent="reviewer",
    ),
    handoffs=["approver"],
    system_message="Review budgets. Hand off to approver when ready.",
)

approver = AssistantAgent(
    "approver",
    model_client=CyclesBudgetClient(
        inner=inner, cycles_client=cycles_client,
        tenant="acme", agent="approver",
    ),
    handoffs=["reviewer"],
    system_message="Approve or reject. Hand back to reviewer if issues found.",
)

team = Swarm(
    participants=[reviewer, approver],
    termination_condition=MaxMessageTermination(max_messages=10),
)

result = await team.run(task="Review this budget proposal: ...")

Choosing an integration approach
| Approach | Granularity | Best for |
|---|---|---|
| CyclesBudgetClient wrapper | Per-LLM-call | Fine-grained token tracking per agent |
| @cycles decorator on run | Per-workflow | Coarser budget control, simpler setup |
| Per-agent wrappers in teams | Per-LLM-call, per-agent scoped | Independent budgets per team member |
You can combine approaches — for example, use per-agent CyclesBudgetClient wrappers for LLM cost tracking and @cycles on the team run for total workflow budget.
Key points
- Wrap the model client, not the agent. AutoGen v0.4+ doesn't have callback hooks, so wrap OpenAIChatCompletionClient with CyclesBudgetClient for per-call budget governance.
- Per-agent scoping with separate wrappers. Create wrappers with different agent values to track and limit costs per team member independently.
- Tool-calling turns are automatically covered. Each LLM call in a tool-use loop gets its own reservation through the model client wrapper.
- Everything is async. AutoGen v0.4+ is fully async, so use asyncio.run() or await for all agent and team operations.
- Errors stop the agent. BudgetExceededError raised in the model client propagates up and stops the agent or team.
Full example
See examples/autogen_integration.py for a complete, runnable script.
Next steps
- Error Handling Patterns in Python — handling budget errors in Python
- Degradation Paths — strategies for graceful degradation
- Testing with Cycles — testing budget-guarded code
- Production Operations Guide — running Cycles in production
- Integrating with OpenAI — budget governance for direct OpenAI calls