Skip to main content

Documentation Index

Fetch the complete documentation index at: https://agno-v2-fix-deploy-docs-restructure.mintlify.app/llms.txt

Use this file to discover all available pages before exploring further.

1

Create a Python file

run_agents.py
import asyncio
import json

import httpx
from agno.client import AgentOSClient
from agno.run.agent import RunCompletedEvent, RunContentEvent


async def run_agent_non_streaming():
    """Run the first available agent once and print its full response.

    Connects to the AgentOS server at http://localhost:7777, picks the first
    agent listed in the server config, sends a single prompt, and prints the
    run ID, the content, and the total token count ("N/A" when the result
    carries no metrics). Prints a notice and returns early when the config
    lists no agents.
    """
    banner = "=" * 60
    print(banner)
    print("Non-Streaming Agent Run")
    print(banner)

    os_client = AgentOSClient(base_url="http://localhost:7777")
    os_config = await os_client.aget_config()

    available_agents = os_config.agents
    if not available_agents:
        print("No agents available")
        return

    first_agent_id = available_agents[0].id
    print(f"Running agent: {first_agent_id}")

    prompt = "What is 2 + 2? Explain your answer briefly."
    run = await os_client.run_agent(agent_id=first_agent_id, message=prompt)

    # Metrics may be absent on the result; fall back to a placeholder.
    run_metrics = run.metrics
    token_total = run_metrics.total_tokens if run_metrics else "N/A"

    print(f"\nRun ID: {run.run_id}")
    print(f"Content: {run.content}")
    print(f"Tokens: {token_total}")


async def run_agent_streaming():
    """Stream a run from the first available agent, echoing content as it arrives.

    Connects to the AgentOS server at http://localhost:7777, picks the first
    agent listed in the server config, and prints the ``content`` of every
    ``RunContentEvent`` without a trailing newline so the response appears
    incrementally. All other events, including ``RunCompletedEvent``, are
    ignored. Prints a notice and returns early when the config lists no agents.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("Streaming Agent Run")
    print(banner)

    os_client = AgentOSClient(base_url="http://localhost:7777")
    os_config = await os_client.aget_config()

    available_agents = os_config.agents
    if not available_agents:
        print("No agents available")
        return

    first_agent_id = available_agents[0].id
    print(f"Streaming from agent: {first_agent_id}")
    print("\nResponse: ", end="", flush=True)

    event_stream = os_client.run_agent_stream(
        agent_id=first_agent_id,
        message="Tell me a short joke.",
    )
    async for event in event_stream:
        if isinstance(event, RunContentEvent):
            # Flush each fragment so output appears as soon as it is received.
            print(event.content, end="", flush=True)
            continue
        if isinstance(event, RunCompletedEvent):
            # Completion marker: nothing to print.
            continue

    print("\n")


async def _iter_sse_payloads(response):
    """Yield the decoded JSON payload of each ``data:`` line in an SSE response.

    Text chunks are buffered until a blank line closes an event, so an event
    split across chunk boundaries is reassembled before it is parsed. Lines
    that do not start with ``data:`` are ignored.

    Args:
        response: A streaming response object exposing ``aiter_text()``.

    Yields:
        The ``json.loads`` result for every ``data:`` line, in arrival order.

    Raises:
        json.JSONDecodeError: If a ``data:`` line does not hold valid JSON.
    """
    buffer = ""
    async for chunk in response.aiter_text():
        buffer += chunk
        # Everything after the last blank line is an incomplete event and
        # stays in the buffer until more text arrives.
        while "\n\n" in buffer:
            raw_event, buffer = buffer.split("\n\n", 1)
            for line in raw_event.strip().split("\n"):
                # The space after "data:" is optional in the SSE format;
                # json.loads tolerates the leading whitespace either way.
                if line.startswith("data:"):
                    yield json.loads(line[5:])


async def run_agent_background_resumable():
    """Start a background streaming run and reconnect via /resume.

    The demo runs in three phases against http://localhost:7777:

    1. POST to ``/agents/{agent_id}/runs`` with ``stream`` and ``background``
       set, read the first five SSE events while recording ``run_id``,
       ``session_id`` and the latest ``event_index``, then disconnect.
    2. Sleep for two seconds to simulate the client being away.
    3. POST to ``/agents/{agent_id}/runs/{run_id}/resume`` with the recorded
       ``last_event_index`` / ``session_id`` and print every event received.

    Returns early, after printing a notice, when the server lists no agents
    or when the initial stream never reported a ``run_id`` (there is then
    nothing to resume).

    Raises:
        httpx.HTTPStatusError: If any of the three requests returns a 4xx/5xx
            status.
    """
    print("\n" + "=" * 60)
    print("Background Resumable Agent Run (SSE)")
    print("=" * 60)

    base_url = "http://localhost:7777"
    max_initial_events = 5  # events to read before the simulated disconnect

    async with httpx.AsyncClient(base_url=base_url, timeout=30) as client:
        agents_response = await client.get("/agents")
        agents_response.raise_for_status()
        agents = agents_response.json()

    # Same guard as the other demos: without an agent there is nothing to run.
    if not agents:
        print("No agents available")
        return
    agent_id = agents[0]["id"]

    # Phase 1: Start a background streaming run, disconnect after a few events
    run_id = None
    session_id = None
    last_event_index = None
    event_count = 0

    print("Starting background stream...")
    async with httpx.AsyncClient(base_url=base_url, timeout=60) as client:
        form_data = {
            "message": "Write a detailed story about a brave knight.",
            "stream": "true",
            "background": "true",
        }
        async with client.stream("POST", f"/agents/{agent_id}/runs", data=form_data) as response:
            response.raise_for_status()
            payloads = _iter_sse_payloads(response)
            try:
                async for data in payloads:
                    # Keep the first run/session identifiers seen, but always
                    # track the most recent event index for the resume call.
                    if data.get("run_id") and not run_id:
                        run_id = data["run_id"]
                    if data.get("session_id") and not session_id:
                        session_id = data["session_id"]
                    if data.get("event_index") is not None:
                        last_event_index = data["event_index"]
                    event_count += 1
                    print(f"  [{event_count}] index={data.get('event_index')} event={data.get('event')}")
                    if event_count >= max_initial_events:
                        break
            finally:
                # Close the parser explicitly because the loop usually exits
                # early, leaving the generator suspended mid-stream.
                await payloads.aclose()

    print(f"\nDisconnected after {event_count} events (last_event_index={last_event_index})")

    if not run_id:
        # Without a run_id the resume URL would target ".../runs/None/resume".
        print("No run_id received from the stream; cannot resume")
        return

    # Phase 2: Simulate being away
    await asyncio.sleep(2)

    # Phase 3: Reconnect via /resume
    print("\nReconnecting via /resume...")
    form_data = {}
    if last_event_index is not None:
        form_data["last_event_index"] = str(last_event_index)
    if session_id:
        form_data["session_id"] = session_id

    async with httpx.AsyncClient(base_url=base_url, timeout=120) as client:
        async with client.stream(
            "POST", f"/agents/{agent_id}/runs/{run_id}/resume", data=form_data
        ) as response:
            response.raise_for_status()
            async for data in _iter_sse_payloads(response):
                event_type = data.get("event")
                # NOTE(review): these three names are presumably control
                # events emitted by the resume endpoint rather than run
                # output - confirm against the AgentOS API docs.
                if event_type in ("catch_up", "replay", "subscribed"):
                    print(f"  [META] {event_type}")
                else:
                    print(f"  [RESUME] index={data.get('event_index')} event={event_type}")

    print("\nDone!")


async def main():
    """Run the three demos in order: non-streaming, streaming, background-resumable."""
    for demo in (
        run_agent_non_streaming,
        run_agent_streaming,
        run_agent_background_resumable,
    ):
        # Each demo is awaited to completion before the next one starts.
        await demo()


# Script entry point: when the file is executed directly, run all demos
# inside a fresh asyncio event loop.
if __name__ == "__main__":
    asyncio.run(main())
2

Set up your virtual environment

uv venv --python 3.12
source .venv/bin/activate
3

Install dependencies

uv pip install -U agno openai httpx
4

Export your OpenAI API key

export OPENAI_API_KEY="your_openai_api_key_here"
5

Start an AgentOS Server

Make sure you have an AgentOS server running at http://localhost:7777 with at least one agent registered; the script runs the first agent the server returns. See Creating Your First OS for setup instructions.
6

Run the Client

python run_agents.py