PydanticAI agents

PydanticAI is a type-safe agent framework from the Pydantic team. As with any Python agent framework, you run a PydanticAI agent on Union.ai by invoking it from inside an @env.task — the framework drives the loop, while Union.ai gives you a container, durable checkpointing, and observability.

The key integration point: PydanticAI tools are plain Python functions, so a tool can delegate to a durable @env.task. You keep PydanticAI’s typed tool calling, while the tool body itself runs as a durable, observable task on the cluster.

A PydanticAI agent in a task

Define a PydanticAI Agent, register tools that call your durable Flyte tasks, and run it from an @env.task:

pydantic_ai_agent.py
from pydantic_ai import Agent

import flyte

env = flyte.TaskEnvironment(
    name="pydantic-ai-agent",
    image=flyte.Image.from_debian_base(python_version=(3, 13)).with_pip_packages(
        "pydantic-ai-slim[anthropic]",
    ),
    resources=flyte.Resources(cpu=1, memory="1Gi"),
    secrets=[flyte.Secret(key="ANTHROPIC_API_KEY")],
)


# Durable Flyte tasks that do the real work (heavy IO / compute, retryable).
@env.task
async def fetch_weather(city: str) -> dict[str, float | str]:
    """Fetch a weather snapshot for a city."""
    # Replace with a real weather API call.
    return {"city": city, "temperature_f": 68.4, "conditions": "partly cloudy"}


@env.task
async def add(x: float, y: float) -> float:
    """Add two numbers."""
    return x + y


# The PydanticAI agent. Tools delegate to the durable tasks above.
agent = Agent(
    "anthropic:claude-3-5-sonnet-latest",
    system_prompt=(
        "You are a friendly assistant. Use the tools to look up weather and "
        "compute math. Reply with a single-sentence summary."
    ),
)


@agent.tool_plain
async def get_weather(city: str) -> dict:
    """Look up the current weather for a city."""
    return await fetch_weather(city)


@agent.tool_plain
async def add_numbers(x: float, y: float) -> float:
    """Add two numbers together."""
    return await add(x, y)


@env.task(report=True)
async def run_agent(prompt: str) -> str:
    """Run the PydanticAI agent inside a durable Flyte task."""
    with flyte.group("pydantic-ai-run"):
        result = await agent.run(prompt)
    return result.output

What’s happening under the hood:

  • run_agent runs in a container whose only added dependency is pydantic-ai-slim with the anthropic extra.
  • PydanticAI drives its own typed tool-call loop. Each tool delegates to an @env.task, so fetch_weather and add execute durably on the cluster and appear in the Union.ai dashboard.
  • flyte.group("pydantic-ai-run") groups the loop’s tool calls under one span for readability.
  • The task’s input prompt and final output are durably persisted for end-to-end inspection.

For lighter, in-process tools that don’t need to be promoted to tasks, decorate them with @flyte.trace instead of @env.task; they are still captured as spans.

Parallel agents

To run many PydanticAI agents concurrently — each in its own container — fan out with asyncio.gather():

import asyncio


@env.task
async def run_many(prompts: list[str]) -> list[str]:
    results = await asyncio.gather(*[run_agent(p) for p in prompts])
    return list(results)

Next steps