# Parallelized autoresearch agent

> [!NOTE]
> Code available [here](https://github.com/unionai/unionai-examples/tree/main/v2/tutorials/parallelized_autoresearch).

This tutorial extends the [Autoresearch agent](https://www.union.ai/docs/v2/union/tutorials/agents/autoresearch/_index) pattern with a code-mode MLE agent that plans **batches** of training experiments, saves distinct `train.py` edits, and runs them **in parallel** via `flyte.map`. It follows the [karpathy/autoresearch](https://github.com/karpathy/autoresearch) loop — minimize validation bits-per-byte on a TinyGPT variant — but orchestrates fan-out batches with durable Flyte tasks and [unionai-sandbox](https://www.union.ai/docs/v2/union/user-guide/sandboxing/_index) execution.

Compared to the single-threaded Claude Code autoresearch tutorial, this agent:

- Edits full `train.py` source (upstream karpathy style) instead of calling a remote coding CLI
- Uses **`code_mode=True`** so the LLM writes Python plans that call `run_experiment_batch` or `flyte_map`
- Persists a **leaderboard**, code-edit history, and batch plans in `MemoryStore`
- Self-heals **OOM** during sandbox training runs by bumping memory and retrying

## Define the task environments

The example uses three environments — bundle preparation, sandbox experiments, and the agent driver — sharing a Debian-based image with PyTorch and sandbox tooling.

```
"""Shared Flyte environments and climbmix dataset bundle tasks."""

from __future__ import annotations

import os
import tempfile
from dataclasses import dataclass
from pathlib import Path

import flyte
from flyte.io import Dir

from autoresearch_types import DatasetProfile
from autoresearch_types import DEFAULT_NUM_SHARDS

TRAIN_PIP_PACKAGES = ["torch", "numpy", "pyarrow", "requests", "tiktoken", "rustbpe"]

_TUTORIAL_DIR = Path(__file__).parent
_INCLUDE = [str(p) for p in sorted(_TUTORIAL_DIR.glob("*.py"))]

image = flyte.Image.from_debian_base(name="mle-autoresearch").with_pip_packages(
    "litellm",
    "httpx",
    "pydantic-monty",
    "unionai-sandbox[flyte]",
    *TRAIN_PIP_PACKAGES,
)

bundle_env = flyte.TaskEnvironment(
    name="autoresearch-bundle",
    resources=flyte.Resources(cpu=4, memory="8Gi"),
    image=image,
    include=_INCLUDE,
)

experiment_env = flyte.TaskEnvironment(
    name="autoresearch-experiment",
    resources=flyte.Resources(cpu=2, memory="2Gi"),
    image=image,
    include=_INCLUDE,
)

# {{docs-fragment env}}
agent_env = flyte.TaskEnvironment(
    name="autoresearch-agent",
    resources=flyte.Resources(cpu=1, memory="2Gi"),
    image=image,
    include=_INCLUDE,
    secrets=[flyte.Secret(key="internal-anthropic-api-key", as_env_var="ANTHROPIC_API_KEY")],
    depends_on=[experiment_env, bundle_env],
)
# {{/docs-fragment env}}

@dataclass
class AutoresearchBundle:
    data_dir: Dir
    tokenizer_dir: Dir

@bundle_env.task(cache="auto")
async def build_bundle(num_shards: int = DEFAULT_NUM_SHARDS, download_workers: int = 4) -> AutoresearchBundle:
    """Download climbmix shards + train the BPE tokenizer; cache the result."""
    import prepare

    cache = tempfile.mkdtemp(prefix="autoresearch-cache-")
    os.environ["AUTORESEARCH_CACHE"] = cache
    prepare.download_data(num_shards, download_workers=download_workers)
    prepare.train_tokenizer()
    data_dir = await Dir.from_local(prepare.data_dir())
    tokenizer_dir = await Dir.from_local(prepare.tokenizer_dir())
    return AutoresearchBundle(data_dir=data_dir, tokenizer_dir=tokenizer_dir)

@bundle_env.task(cache="auto")
async def profile_bundle(bundle: AutoresearchBundle) -> DatasetProfile:
    """Summarize the prepared bundle for the agent's context."""
    import prepare

    data_dir = await bundle.data_dir.download()
    tokenizer_dir = await bundle.tokenizer_dir.download()
    parquet_files = sorted(p.name for p in Path(data_dir).glob("*.parquet"))
    data_bytes = sum(p.stat().st_size for p in Path(data_dir).glob("**/*") if p.is_file())
    tok_bytes = sum(p.stat().st_size for p in Path(tokenizer_dir).glob("**/*") if p.is_file())
    return DatasetProfile(
        n_parquet_files=len(parquet_files),
        parquet_files=parquet_files,
        vocab_size=prepare.VOCAB_SIZE,
        data_bytes=data_bytes,
        tokenizer_bytes=tok_bytes,
    )

async def materialize_cache(bundle: AutoresearchBundle) -> str:
    """Download the bundle into an AUTORESEARCH_CACHE-shaped scratch dir."""
    cache = tempfile.mkdtemp(prefix="autoresearch-run-")
    os.environ["AUTORESEARCH_CACHE"] = cache
    await bundle.data_dir.download(local_path=os.path.join(cache, "data"))
    await bundle.tokenizer_dir.download(local_path=os.path.join(cache, "tokenizer"))
    return cache
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/tutorials/parallelized_autoresearch/bundle.py*

Supporting modules (`train.py`, `prepare.py`, `tools.py`, and `ui.py`) live alongside the entry point in the example directory.

## The fan-out agent task

The driver task `parallelized_autoresearch` restores prior memory (default key `parallelized-autoresearch`), streams Activity / Leaderboard / Code edits / Memory report tabs, and runs the code-mode agent loop.

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.4.0",
#    "litellm",
#    "httpx",
#    "pydantic-monty",
#    "unionai-sandbox[flyte]",
#    "torch",
#    "numpy",
#    "pyarrow",
#    "requests",
#    "tiktoken",
#    "rustbpe",
# ]
# main = "parallelized_autoresearch"
# params = "--n-experiments 6 --batch-size 3 --num-shards 1"
# ///
"""Parallelized autoresearch agent — code-mode MLE agent with batched sandbox experiments."""

from __future__ import annotations

import dataclasses
from typing import Any

import flyte
import flyte.errors
import flyte.report
from flyte.ai.agents import Agent, MemoryStore, agent_progress_cb, tool

from autoresearch_types import AutoresearchOutput, DEFAULT_MAX_STEPS, DEFAULT_NUM_SHARDS, MAX_DEVICE_BATCH_SIZE, MAX_N_EMBD, MAX_N_HEAD, MAX_N_LAYER
from bundle import agent_env, build_bundle, experiment_env, materialize_cache, profile_bundle
import tools
import ui

MODEL = "claude-sonnet-4-6"
MAX_OOM_RETRIES = 3

async def _run_experiment_body(
    title: str,
    time_budget_sec: int,
    memory_key: str,
) -> dict:
    """Execute one sandbox training run (no OOM retry — used as a mapped sub-task)."""
    train_py = await tools.load_train_code(memory_key, title)
    config_overrides = await tools.load_config_overrides(memory_key, title)
    duplicate = await tools.check_duplicate_config(memory_key, title, train_py, config_overrides)
    if duplicate:
        result = {
            "success": False,
            "title": title,
            "error": (
                f"Duplicate config of '{duplicate['duplicate_of']}' "
                f"(signature {duplicate['config_signature']}); change train.py or overrides."
            ),
            "duplicate_of": duplicate["duplicate_of"],
        }
        await tools.record_experiment_result(
            memory_key,
            result,
            actor="parallelized-autoresearch",
        )
        return result
    bundle = await build_bundle()
    cache_dir = await materialize_cache(bundle)
    result = await tools.run_train_in_sandbox(
        cache_dir,
        train_py,
        title=title,
        time_budget_sec=time_budget_sec,
        config_overrides=config_overrides or None,
    )
    if result.get("success"):
        await tools.record_promising_run(memory_key, title, result)
        await tools.register_config_signature(
            memory_key,
            title,
            train_py,
            config_overrides,
            actor="parallelized-autoresearch",
        )
    await tools.record_experiment_result(
        memory_key,
        result,
        actor="parallelized-autoresearch",
    )
    return result

@experiment_env.task
async def run_experiment_body(
    title: str,
    time_budget_sec: int = 45,
    memory_key: str = tools.MEMORY_KEY_FANOUT,
) -> dict:
    """Run one edited ``train.py`` inside a sandbox (mapped sub-task)."""
    return await _run_experiment_body(title, time_budget_sec, memory_key)

@experiment_env.task
async def run_experiment(
    title: str,
    time_budget_sec: int = 45,
    memory_key: str = tools.MEMORY_KEY_FANOUT,
) -> dict:
    """Train using agent-edited ``train.py`` with platform OOM self-healing.

    Safe to call directly or via ``flyte_map("run_experiment", titles, ...)``.
    """
    resources = tools.RESOURCE_FLOOR
    attempt = 0
    while True:
        try:
            result = await run_experiment_body.override(resources=resources).aio(
                title=title,
                time_budget_sec=time_budget_sec,
                memory_key=memory_key,
            )
        except flyte.errors.OOMError:
            if attempt >= MAX_OOM_RETRIES:
                raise
            resources = tools.bump_memory(resources)
            attempt += 1
            flyte.logger.warning(
                "run_experiment Flyte OOM for %s; retry memory=%s",
                title,
                resources.memory,
            )
            continue

        if isinstance(result, dict):
            result["resources"] = f"cpu={resources.cpu}, mem={resources.memory}"
            result["oom_retries"] = attempt

        if isinstance(result, dict) and result.get("oom"):
            if attempt >= MAX_OOM_RETRIES:
                return result
            resources = tools.bump_memory(resources)
            attempt += 1
            flyte.logger.warning(
                "run_experiment sandbox OOM for %s; retry memory=%s",
                title,
                resources.memory,
            )
            continue

        return result

@tool
@agent_env.task
async def run_experiment_batch(
    titles: list[str],
    time_budget_sec: int = 45,
    memory_key: str = tools.MEMORY_KEY_FANOUT,
    concurrency: int = 4,
    batch_id: str = "",
) -> dict:
    """Run multiple ``run_experiment`` calls in parallel via ``flyte.map``.

    Prefer this over hand-rolling ``flyte_map`` when you already have a list of
    experiment titles with saved ``train.py`` edits.

    Args:
        titles: Experiment titles whose code was saved with ``edit_train_code`` or
        ``edit_train_code_batch``.
        time_budget_sec: Wall-clock budget passed to each run.
        memory_key: Memory namespace from your directive.
        concurrency: Max parallel sandbox runs (default 4).
        batch_id: Optional label attached to the returned batch metadata.

    Returns:
        A dict with ``batch_size``, ``titles``, ``results``, and ``evaluation``
        (from :func:`evaluate_batch_results`).
    """
    group = batch_id or f"batch-{len(titles)}"
    payload = await tools.run_experiment_batch_impl(
        run_experiment,
        titles,
        time_budget_sec=time_budget_sec,
        memory_key=memory_key,
        concurrency=concurrency,
        group_name=group,
    )
    payload["evaluation"] = tools.evaluate_batch_results_impl(payload["results"], batch_id=batch_id)
    await tools.persist_run_results_to_leaderboard(memory_key, payload["results"])
    return payload

INSTRUCTIONS = f"""\
You are a senior ML-engineer agent running karpathy/autoresearch-style research by
**editing train.py** and **batching parallel experiments**. Your goal: MINIMIZE
val_bpb (LOWER is better).

You operate in CODE MODE. Each turn, write ONE ```python``` block that calls the
available functions, OR reply in plain text when finished. The last expression in
your code block is returned as the observation.

Core tools (same as the single-threaded code-edit agent):
- get_code_edit_history — **call first on resumed sessions**: prior edits, val_bpb, vs-best deltas
- get_baseline_train_code, edit_train_code, edit_train_code_batch, read_train_code, get_promising_code
- inspect_dataset, search_arxiv
- record_hypothesis, get_leaderboard, compare_experiments
- run_experiment — one sandbox training run (OOM-healed by the platform)

Saving edits (required for visible diffs and distinct runs):
- **Batch 1 only:** you may use ``config_overrides`` for a quick architecture/LR sweep via
  ``edit_train_code_batch(edits=[{{"title": "...", "config_overrides": {{"n_layer": 6}}, "change_summary": "..."}}])``.
- **Batch 2 and later:** every edit must include a **substantive ``train_py`` change**
  (learning-rate schedule, optimizer/weight_decay, grad clipping, warmup, etc.).
  ``config_overrides`` alone is **rejected** after the first batch — fork with
  ``parent_title`` and edit the training loop in ``train_py``.
- ``config_overrides`` fields: ``n_layer``, ``n_head``, ``n_embd``, ``dropout``,
  ``device_batch_size``, ``learning_rate``, ``time_budget_sec``, ``max_steps``.
- To fork a winner: set ``parent_title`` to the best title, then edit ``train_py``.
- Do **not** save baseline ``train.py`` without overrides — the platform rejects identical edits.
- Duplicate configs (same effective train.py + overrides) are rejected at run time.

Training budget (fair comparison across architectures):
- Default **max_steps={DEFAULT_MAX_STEPS}** with **time_budget_sec=45** as a safety cap.
  All models train for the same step count unless they hit the wall-clock limit.
- Check ``steps`` in batch results — if a run stopped early on time, the model may be too large.

Batch / fan-out tools:
- record_batch_plan(batch_id, experiments) — persist a multi-experiment plan
- get_batch_plan(batch_id) — reload a plan
- record_batch_hypotheses(experiments) — write hypotheses for every title in a batch
- edit_train_code_batch(edits) — save all ``train.py`` edits in one memory transaction
- run_experiment_batch(titles, concurrency=...) — parallel ``flyte.map`` over runs
- evaluate_batch_results(results, batch_id=...) — rank successes vs failures

Parallel fan-out in code:
- After saving edits, you may call ``run_experiment_batch(titles, ...)`` OR
  ``flyte_map("run_experiment", titles, budgets, keys, concurrency=N)`` where
  budgets/keys are lists matching titles.

Typical batch loop (aim for **≤8 code turns** before your plain-text summary):
0. If prior research exists in your directive, ``get_code_edit_history()`` then
   ``read_train_code(best_title)`` before planning new batches.
1. Turn 1: ``get_baseline_train_code()`` + ``inspect_dataset()``.
2. Turn 2: ``record_batch_plan`` then ``edit_train_code_batch(edits=[...])`` for the whole batch.
3. Turn 3: ``record_batch_hypotheses`` + ``run_experiment_batch(titles, concurrency=...)``.
4. Turn 4+: fork winners into the next batch with **train.py** edits, or reply in plain text when done.

Batch diversity (required):
- Every title in a batch must test a **distinct hypothesis** — no duplicate configs or renames.
- **Spread axes across the batch**: e.g. one edit tweaks depth/width, another changes the
  **training loop** (cosine LR, AdamW betas, weight decay), another regularization or batch size.
- Avoid LR micro-sweeps (±30% of the current best LR) after batch 1 — those rarely beat a plateau.
- Vary **one or two knobs per edit**; state the change in ``change_summary`` and
  ``record_batch_hypotheses``.
- Use ``evaluate_batch_results`` to see **which axis** helped, then explore under-tested axes.

Plateau rule (required):
- If **3 consecutive batches** fail to beat the global best val_bpb by more than **0.01**,
  stop hyperparameter micro-sweeps. Switch to **training-loop code edits** in ``train.py``
  (scheduler, optimizer, regularization, data/loss changes).

Rules:
- Prefer ``edit_train_code_batch`` over repeated ``edit_train_code`` when saving 2+ titles.
- Every edit must keep ``run_training(config: ExperimentConfig) -> ExperimentResult``.
- Do NOT size compute — the platform right-sizes and retries OOM per run.
- Workshop limits: n_layer<={MAX_N_LAYER}, n_embd<={MAX_N_EMBD}, n_head<={MAX_N_HEAD},
  device_batch_size<={MAX_DEVICE_BATCH_SIZE}, seq_len=512.
- Prefer ``run_experiment_batch`` over hand-written ``flyte_map`` unless you need it.
- Monty sandbox: no imports, no dict mutation, no augmented assignment (`+=`).
- **Always finish with plain text (no code block)** once you have results to report.
"""

DEFAULT_MAX_TURNS = 50

def build_fanout_agent(*, max_turns: int = DEFAULT_MAX_TURNS) -> Agent:
    """Construct the fan-out agent (``code_mode=True``) with a configurable turn budget."""
    return Agent(
        name="parallelized-autoresearch",
        instructions=INSTRUCTIONS,
        model=MODEL,
        tools=[
            tools.search_arxiv,
            tools.inspect_dataset,
            tools.get_baseline_train_code,
            tools.get_code_edit_history,
            tools.edit_train_code,
            tools.edit_train_code_batch,
            tools.read_train_code,
            tools.get_promising_code,
            tools.record_hypothesis,
            tools.get_leaderboard,
            tools.compare_experiments,
            tools.record_batch_plan,
            tools.get_batch_plan,
            tools.record_batch_hypotheses,
            run_experiment,
            run_experiment_batch,
            tools.evaluate_batch_results,
        ],
        code_mode=True,
        max_turns=max_turns,
        call_llm=tools.call_llm,
    )

agent = build_fanout_agent()

# {{docs-fragment agent}}
@agent_env.task(report=True)
async def parallelized_autoresearch(
    n_experiments: int = 6,
    num_shards: int = DEFAULT_NUM_SHARDS,
    memory_key: str = tools.MEMORY_KEY_FANOUT,
    batch_size: int = 3,
    max_turns: int = DEFAULT_MAX_TURNS,
) -> AutoresearchOutput:
    """Drive the fan-out code-edit MLE agent with sandbox batch execution."""
    bundle = await build_bundle(num_shards=num_shards)
    profile = await profile_bundle(bundle)

    memory = await MemoryStore.get_or_create.aio(key=memory_key)
    persisted = await memory.read_json.aio("memory/leaderboard.json", default=[])
    promising = await memory.read_json.aio("memory/promising_code.json", default=[])
    history = await tools.load_research_history(memory_key)
    flyte.logger.info(
        "Fan-out agent restored %d messages, %d experiments, %d promising edits, best val_bpb=%s.",
        len(memory.messages),
        len(persisted),
        len(promising),
        history.get("best_val_bpb"),
    )

    events: list[dict[str, Any]] = []

    async def on_event(ev) -> None:
        events.append({"type": ev.type, "data": ev.data})
        if ev.type in ("tool_start", "tool_end", "tool_error", "turn_start", "agent_end"):
            tab = flyte.report.get_tab("Activity")
            tab.replace(ui.render_activity_log(events))
            await flyte.report.flush.aio()
        if ev.type == "tool_end" and ev.data.get("tool") in (
            "edit_train_code",
            "edit_train_code_batch",
            "<sandbox>",
        ):
            edits = await tools.load_saved_code_edits(memory_key)
            if edits:
                flyte.report.get_tab("Code edits").replace(ui.render_code_edits_panel(edits))
                await flyte.report.flush.aio()

    directive_text = ui.directive_code_edit_fanout(
        n_experiments,
        profile,
        memory_key,
        batch_size=batch_size,
        history=history,
    )

    token = agent_progress_cb.set(on_event)
    run_agent = build_fanout_agent(max_turns=max_turns)
    try:
        result = await run_agent.run.aio(directive_text, memory=memory)
    finally:
        agent_progress_cb.reset(token)

    leaderboard, best = ui.parse_leaderboard(
        memory.messages,
        promising_fallback=promising,
    )
    leaderboard_dicts = [dataclasses.asdict(e) for e in leaderboard]
    code_edits = await tools.load_saved_code_edits(memory_key)

    tab_lb = flyte.report.get_tab("Leaderboard")
    tab_lb.replace(ui.render_leaderboard(leaderboard, best))

    flyte.report.get_tab("Code edits").replace(
        ui.render_code_edits_panel(code_edits, best_title=best.title if best else None)
    )

    await memory.write_json.aio(
        "memory/leaderboard.json",
        leaderboard_dicts,
        actor="parallelized-autoresearch",
        reason=f"leaderboard after {len(leaderboard)} experiments",
    )
    await memory.save.aio()
    audit = await memory.audit_tail(20)
    hypotheses = await memory.read_json.aio("memory/hypotheses.json", default=[])
    promising = await memory.read_json.aio("memory/promising_code.json", default=[])

    tab_mem = flyte.report.get_tab("Memory")
    tab_mem.replace(
        ui.render_memory_panel(
            memory_key,
            len(memory.messages),
            leaderboard_dicts,
            audit,
            hypotheses,
            persisted_promising=promising,
            code_edits=code_edits,
        )
    )

    summary_body = result.summary or result.error or ""
    if result.error and leaderboard:
        best_line = f" Best val_bpb so far: {best.val_bpb} ({best.title})." if best and best.val_bpb else ""
        summary_body = f"{result.error}{best_line}"

    await flyte.report.replace.aio(
        ui.render_summary(
            directive_text,
            leaderboard,
            best,
            summary_body,
            code_edits=code_edits,
        )
    )
    await flyte.report.flush.aio()

    return AutoresearchOutput(
        directive=directive_text,
        dataset_profile=profile,
        best=best,
        leaderboard=leaderboard,
        summary=summary_body,
        memory_key=memory_key,
        total_experiments=len(leaderboard),
    )

# {{/docs-fragment agent}}

# {{docs-fragment main}}
if __name__ == "__main__":
    import argparse
    import asyncio
    import os

    parser = argparse.ArgumentParser(description="Parallelized autoresearch agent (CODE MODE)")
    parser.add_argument("--n-experiments", type=int, default=6)
    parser.add_argument("--batch-size", type=int, default=3)
    parser.add_argument("--max-turns", type=int, default=DEFAULT_MAX_TURNS)
    parser.add_argument("--num-shards", type=int, default=DEFAULT_NUM_SHARDS)
    parser.add_argument("--memory-key", default=tools.MEMORY_KEY_FANOUT)
    parser.add_argument(
        "--config",
        default=os.environ.get("FLYTE_CONFIG", os.path.expanduser("~/.flyte/config.yaml")),
    )
    args = parser.parse_args()

    flyte.init_from_config(args.config, image_builder="remote")

    async def main() -> None:
        run = await flyte.with_runcontext(copy_style="all").run.aio(
            parallelized_autoresearch,
            n_experiments=args.n_experiments,
            num_shards=args.num_shards,
            memory_key=args.memory_key,
            batch_size=args.batch_size,
            max_turns=args.max_turns,
        )
        print(f"View run at: {run.url}")

    asyncio.run(main())
# {{/docs-fragment main}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/tutorials/parallelized_autoresearch/parallelized_autoresearch.py*

## Run the agent

### Create secrets

Register an Anthropic API key for the agent LLM calls:

```
flyte create secret internal-anthropic-api-key <YOUR_ANTHROPIC_API_KEY>
```

### Run remotely

From the [example directory](https://github.com/unionai/unionai-examples/tree/main/v2/tutorials/parallelized_autoresearch):

```
cd v2/tutorials/parallelized_autoresearch
uv run --script parallelized_autoresearch.py -- --n-experiments 6 --batch-size 3 --num-shards 1
```

Use `--memory-key` to resume a prior research session (default: `parallelized-autoresearch`). Pass a unique key — for example `parallelized-autoresearch-20260622-215057` — to start with empty memory. Code mode needs more turns than JSON tool mode — increase `--max-turns` for larger sweeps.

Or invoke the agent task directly with `flyte run` (snake_case task inputs):

```
flyte run parallelized_autoresearch.py parallelized_autoresearch \
  --n_experiments 6 --batch_size 3 --num_shards 1 --max_turns 12 \
  --memory_key parallelized-autoresearch
```

> [!NOTE]
> The first run downloads climbmix data shards and trains a BPE tokenizer. Subsequent runs reuse cached bundle tasks.

See also the single-task [Autoresearch agent](https://www.union.ai/docs/v2/union/tutorials/agents/autoresearch/_index) tutorial for the Claude Code + pull-request workflow.

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/tutorials/agents/parallelized-autoresearch-agent/_index.md
**HTML**: https://www.union.ai/docs/v2/union/tutorials/agents/parallelized-autoresearch-agent/
