# AutoSec researcher agent

> [!NOTE]
> Code available [here](https://github.com/unionai/unionai-examples/tree/main/v2/tutorials/autosec_research_agent).

This tutorial demonstrates an autonomous security-research agent on Flyte. The pipeline fans out across bundled C source files (each with a planted memory-corruption bug), runs static analysis, uses a `flyte.ai.agents.Agent` to hypothesize vulnerabilities, builds proof-of-concept payloads, and validates exploits inside an on-device [unionai-sandbox](https://www.union.ai/docs/v2/union/user-guide/sandboxing/_index) user-namespace session.

Flyte provides:

- **Parallel fan-out** across every target file with `asyncio.gather`
- **Self-healing tasks** — LLM timeouts, malformed JSON, and OOM during static analysis retry with bounded resources
- **Sandbox isolation** — PoC compilation and execution never runs on the orchestration node
- **Live HTML reports** with per-target detail tabs in the Flyte UI

> [!WARNING]
> This example analyzes deliberately vulnerable C code and runs generated exploit payloads in a sandbox. Use it only in controlled environments.

## Define the task environment

The agent needs an Anthropic API key and a container image with `gcc` for sandbox compilation.

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.4.0",
#    "unionai-sandbox",
#    "litellm",
# ]
# main = "run_autosec_agent"
# params = ""
# ///
"""AutoSec researcher agent — parallel vulnerability analysis with sandbox PoC validation."""

from __future__ import annotations

import asyncio
import html
import json
import os
import pathlib
import re
from typing import Any

import flyte
import flyte.errors
import flyte.report
from flyte.ai.agents import Agent

HERE = pathlib.Path(__file__).parent
TARGETS_DIR = HERE / "targets"
MODEL = os.getenv("AUTOSEC_MODEL", "claude-haiku-4-5")

# {{docs-fragment env}}
main_img = flyte.Image.from_uv_script(__file__, name="autosec-research-agent", pre=True).with_apt_packages("gcc")

env = flyte.TaskEnvironment(
    name="autosec-research-agent",
    image=main_img,
    resources=flyte.Resources(cpu=1, memory="1Gi"),
    include=[str(TARGETS_DIR)],
    secrets=[
        flyte.Secret(key="internal-anthropic-api-key", as_env_var="ANTHROPIC_API_KEY"),
    ],
)
# {{/docs-fragment env}}

def _attempt() -> int:
    tc = flyte.ctx()
    return tc.attempt_number if tc is not None else 0

def _force(flag: str) -> bool:
    return bool(os.getenv(flag) or os.getenv("AUTOSEC_FORCE_ALL"))

def _extract_json(text: str) -> dict[str, Any]:
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if not match:
        raise ValueError(f"no JSON object in model reply: {text[:200]!r}")
    blob = match.group(0)
    try:
        return json.loads(blob)
    except json.JSONDecodeError:
        fixed = re.sub(r'\\(?!["\\/bfnrtu])', r"\\\\", blob)
        return json.loads(fixed)

# --- Stage 1: static analysis (CPU, OOM-prone) ------------------------------
@env.task(retries=2, timeout=30)
async def scan_static(source: str, scope: str = "whole") -> str:
    """Cheap stand-in for whole-program analysis (Joern/CodeQL in the real system)."""
    try:
        if scope == "whole" and _force("AUTOSEC_FORCE_OOM") and _attempt() == 0:
            raise flyte.errors.OOMError("whole-program graph exceeded memory limit")
        findings = _grep_dangerous_calls(source)
        return findings or "(no dangerous-call sites found)"
    except flyte.errors.OOMError as exc:
        print(f"[scan_static] {exc}; escalating resources + narrowing scope")
        return await scan_static.override(
            short_name="scan_static_more_resources", resources=flyte.Resources(cpu=2, memory="4Gi")
        )(source, scope="file")

def _grep_dangerous_calls(source: str) -> str:
    hits = []
    for i, line in enumerate(source.splitlines(), start=1):
        for fn in ("strcpy", "strcat", "sprintf", "gets", "memcpy"):
            if fn in line:
                hits.append(f"L{i}: {fn} -> {line.strip()}")
    return "\n".join(hits)

# --- Stage 2: hypothesize the vulnerability (LLM via Agent) ------------------
ANALYSIS_INSTRUCTIONS = """\
You are a vulnerability researcher. Your job is to determine whether a given \
C source file contains an exploitable memory-corruption bug reachable from argv.

You have access to these tools during your analysis:
- scan_static: Run static analysis on the source to find dangerous function calls.
- build_poc: Build a proof-of-concept payload (do not call during analysis).
- validate_in_sandbox: Compile and run the target with a PoC input (do not call during analysis).

Focus on analyzing the source and the provided static analysis findings. Call \
scan_static only if you need additional details about dangerous function usage.

Reply with ONLY a JSON object (no prose, no markdown fences):
If vulnerable: {"vulnerable": true, "function": str, \
"buffer_size": int (bytes of the overflowable buffer), "vuln_class": str, \
"reasoning": str}.
If the code looks safe (bounded copies, length checks, snprintf/strlcpy, \
etc.): {"vulnerable": false, "reasoning": str}.
"""

# --- Stage 3: build a proof-of-concept --------------------------------------
@env.task(retries=2, timeout=90)
async def build_poc(hypothesis: dict) -> dict:
    buffer_size = int(hypothesis.get("buffer_size", 64))
    payload_len = buffer_size + 64
    return {
        "payload_len": payload_len,
        "payload_repr": f'"A" * {payload_len}',
        "target_function": hypothesis.get("function", "greet"),
    }

# --- Stage 4: validate in an on-device sandbox -------------------------------
@env.task(retries=2, timeout=300)
async def validate_in_sandbox(source: str, poc: dict) -> dict:
    """Compile + run the target with the PoC input inside an on-device sandbox.

    The exploit code runs in a user-namespace sandbox on the same machine, never
    on the Flyte orchestration node (SPEC §2.6 / §7). The session is torn down
    in __aexit__ regardless of outcome (SPEC VD-5) so a stuck or failed run
    cannot leak resources.
    """
    import tempfile

    from union import sandbox as sb

    with tempfile.TemporaryDirectory() as work:
        async with sb.on_device.session(host_work_dir=work, backend="userns") as sbx:
            await sbx.put_bytes(f"{work}/target.c", source.encode())

            compile_proc = await sbx.run(
                f"gcc -fno-stack-protector -w -o {work}/target {work}/target.c",
                stdout=True,
                stderr=True,
                timeout_s=60,
            )
            compile_out, compile_err = await compile_proc.communicate_text()
            log = compile_out + compile_err
            if "error" in log.lower():
                return {
                    "triggered": False,
                    "sandbox_exit_code": -1,
                    "log": f"COMPILE_FAILED\n{log}",
                }

            payload = "A" * int(poc["payload_len"])
            run_proc = await sbx.run(
                f"{work}/target {payload}",
                stdout=True,
                stderr=True,
                timeout_s=60,
            )
            run_out, run_err = await run_proc.communicate_text()
            log = run_out + "\n" + run_err
            triggered = "SIGSEGV" in log

            return {
                "triggered": bool(triggered),
                "sandbox_exit_code": getattr(run_proc, "returncode", 0),
                "log": log,
            }

# --- Agent + hypothesize task (depends on all tools above) ------------------
hypothesis_agent = Agent(
    name="autosec-hypothesis",
    instructions=ANALYSIS_INSTRUCTIONS,
    model=MODEL,
    tools=[scan_static, build_poc, validate_in_sandbox],
    max_turns=6,
)

@env.task(retries=3, timeout=20)
async def hypothesize(source: str, static_findings: str) -> dict:
    prompt = (
        "Analyze this C source file for memory-corruption vulnerabilities.\n\n"
        f"SOURCE:\n{source}\n\nDANGEROUS CALLS:\n{static_findings}\n"
    )

    # Beat A: hang on the first attempt -> task timeout -> retry.
    timeout_on = _force("AUTOSEC_FORCE_LLM_TIMEOUT") and _attempt() == 0
    bad_on = _force("AUTOSEC_FORCE_BAD_TOOL_CALL")

    if timeout_on:
        await asyncio.sleep(600)

    result = await hypothesis_agent.run.aio(prompt, memory=[])
    raw = result.summary or ""

    # Beat B: simulate a hallucinated/malformed tool call. When the timeout beat
    # is also active it consumes attempt 0, so defer this to attempt 1 — that way
    # both beats are actually demonstrated in a single run (e.g. AUTOSEC_FORCE_ALL).
    bad_attempt = 1 if _force("AUTOSEC_FORCE_LLM_TIMEOUT") else 0
    if bad_on and _attempt() == bad_attempt:
        raw = "Sure! The bug is somewhere around here, trust me."

    hyp = _extract_json(raw)
    if "vulnerable" not in hyp:
        hyp["vulnerable"] = "buffer_size" in hyp
    if hyp.get("vulnerable") and "buffer_size" not in hyp:
        raise ValueError(f"vulnerable hypothesis missing buffer_size: {hyp}")
    return hyp

# --- Orchestration ----------------------------------------------------------
def _load_targets() -> dict[str, str]:
    return {p.name: p.read_text() for p in sorted(TARGETS_DIR.glob("*.c"))}

@env.task
async def analyze_target(name: str, source: str) -> dict:
    findings = await scan_static(source)
    hypothesis = await hypothesize(source, findings)

    if not hypothesis.get("vulnerable"):
        poc: dict = {}
        verdict = {"triggered": False, "skipped": True}
    else:
        poc = await build_poc(hypothesis)
        verdict = await validate_in_sandbox(source, poc)

    return {
        "target": name,
        "static_findings": findings,
        "hypothesis": hypothesis,
        "poc": poc,
        "verdict": verdict,
    }

_REPORT_CSS = """
<style>
  .autosec { --bg:#ffffff; --card:#f7f8fa; --line:#e3e7ec; --muted:#5b6675;
    --text:#1b2330; --red:#c0392b; --amber:#b6791f; --green:#1e7e34; --accent:#1f6feb;
    font-family:-apple-system,BlinkMacSystemFont,"Segoe UI",Roboto,sans-serif;
    background:var(--bg); color:var(--text); padding:24px; border-radius:12px;
    border:1px solid var(--line); }
  .autosec h2 { margin:0 0 4px; font-size:20px; letter-spacing:.2px; }
  .autosec .sub { color:var(--muted); font-size:13px; margin:0 0 20px; }
  .autosec .cards { display:flex; gap:12px; flex-wrap:wrap; margin-bottom:22px; }
  .autosec .card { background:var(--card); border:1px solid var(--line);
    border-radius:10px; padding:14px 18px; min-width:120px; }
  .autosec .card .n { font-size:26px; font-weight:700; line-height:1; }
  .autosec .card .l { color:var(--muted); font-size:12px; margin-top:6px;
    text-transform:uppercase; letter-spacing:.6px; }
  .autosec table { width:100%; table-layout:fixed; border-collapse:collapse; font-size:13px;
    background:#fff; border:1px solid var(--line); border-radius:10px; overflow:hidden; }
  .autosec th, .autosec td { overflow-wrap:anywhere; }
  .autosec thead th { background:#eef1f5; color:var(--muted); text-align:left;
    font-weight:600; font-size:11px; text-transform:uppercase; letter-spacing:.6px;
    padding:11px 14px; border-bottom:1px solid var(--line); }
  .autosec tbody td { padding:11px 14px; border-bottom:1px solid var(--line);
    vertical-align:top; }
  .autosec tbody tr:last-child td { border-bottom:none; }
  .autosec tbody tr:nth-child(even) { background:#fafbfc; }
  .autosec tbody tr:hover { background:#eef4ff; }
  .autosec code { background:#eef1f5; border:1px solid var(--line); border-radius:5px;
    padding:1px 6px; font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:12px; }
  .autosec .num { text-align:right; font-variant-numeric:tabular-nums; }
  .autosec .reason { color:var(--muted); line-height:1.45; }
  .autosec .badge { display:inline-block; padding:3px 10px; border-radius:999px;
    font-size:11px; font-weight:700; letter-spacing:.4px; white-space:nowrap; }
  .autosec .b-exploited { background:rgba(192,57,43,.10); color:var(--red);
    border:1px solid rgba(192,57,43,.35); }
  .autosec .b-vuln { background:rgba(182,121,31,.12); color:var(--amber);
    border:1px solid rgba(182,121,31,.35); }
  .autosec .b-secure { background:rgba(30,126,52,.10); color:var(--green);
    border:1px solid rgba(30,126,52,.35); }
  .autosec col.c-target  { width:15%; }
  .autosec col.c-status  { width:11%; }
  .autosec col.c-class   { width:11%; }
  .autosec col.c-fn      { width:11%; }
  .autosec col.c-buf     { width:8%; }
  .autosec col.c-payload { width:8%; }
  .autosec col.c-exit    { width:6%; }
  .autosec col.c-reason  { width:30%; }
  .autosec .kv { display:flex; flex-wrap:wrap; gap:12px 28px; margin:16px 0 4px; }
  .autosec .kv .k { color:var(--muted); font-size:11px; text-transform:uppercase; letter-spacing:.6px; }
  .autosec .kv .v { font-weight:600; font-size:14px; margin-top:3px; }
  .autosec .section-label { font-size:11px; text-transform:uppercase; letter-spacing:.6px;
    color:var(--muted); margin:22px 0 7px; }
  .autosec .reason-block { background:var(--card); border:1px solid var(--line);
    border-radius:8px; padding:13px 15px; line-height:1.5; font-size:13px; }
  .autosec pre.code { background:#f6f8fa; border:1px solid var(--line); border-radius:8px;
    padding:14px 16px; overflow:auto; font-family:ui-monospace,SFMono-Regular,Menlo,monospace;
    font-size:12.5px; line-height:1.5; color:#1b2330; margin:0; }
  .autosec .subtabs > input[type=radio] { position:absolute; opacity:0; pointer-events:none; }
  .autosec .subnav { display:flex; flex-wrap:wrap; gap:4px; border-bottom:1px solid var(--line);
    margin:8px 0 18px; }
  .autosec .subnav label { display:inline-flex; align-items:center; gap:8px; padding:8px 14px;
    font-size:12.5px; cursor:pointer; border:1px solid transparent; border-bottom:none;
    border-radius:8px 8px 0 0; color:var(--muted); margin-bottom:-1px; }
  .autosec .subnav label:hover { background:var(--card); color:var(--text); }
  .autosec .subnav .dot { width:8px; height:8px; border-radius:50%; }
  .autosec .dot.b-exploited { background:var(--red); }
  .autosec .dot.b-vuln { background:var(--amber); }
  .autosec .dot.b-secure { background:var(--green); }
  .autosec .panel { display:none; }
</style>
"""

def _status(finding: dict) -> tuple[str, str]:
    hyp = finding.get("hypothesis") or {}
    verdict = finding.get("verdict") or {}
    if not hyp.get("vulnerable"):
        return "b-secure", "SECURE"
    if verdict.get("triggered"):
        return "b-exploited", "EXPLOITED"
    return "b-vuln", "VULNERABLE"

def _render_report_html(findings: list[dict]) -> str:
    exploited = sum(1 for f in findings if (f.get("verdict") or {}).get("triggered"))
    vulnerable = sum(1 for f in findings if (f.get("hypothesis") or {}).get("vulnerable"))
    secure = len(findings) - vulnerable

    rows = []
    for f in sorted(findings, key=lambda x: x["target"]):
        hyp = f.get("hypothesis") or {}
        verdict = f.get("verdict") or {}
        cls, label = _status(f)
        is_vuln = bool(hyp.get("vulnerable"))
        vuln_class = hyp.get("vuln_class", "\u2014") if is_vuln else "\u2014"
        fn = hyp.get("function", "\u2014") if is_vuln else "\u2014"
        buf = hyp.get("buffer_size", "\u2014") if is_vuln else "\u2014"
        payload = (f.get("poc") or {}).get("payload_len", "\u2014") if is_vuln else "\u2014"
        exit_code = verdict.get("sandbox_exit_code", "\u2014")
        rows.append(
            "<tr>"
            f"<td><code>{html.escape(str(f['target']))}</code></td>"
            f'<td><span class="badge {cls}">{label}</span></td>'
            f"<td>{html.escape(str(vuln_class))}</td>"
            f"<td><code>{html.escape(str(fn))}</code></td>"
            f'<td class="num">{html.escape(str(buf))}</td>'
            f'<td class="num">{html.escape(str(payload))}</td>'
            f'<td class="num">{html.escape(str(exit_code))}</td>'
            f'<td class="reason">{html.escape(str(hyp.get("reasoning", "")))}</td>'
            "</tr>"
        )

    return f"""{_REPORT_CSS}
    <div class="autosec">
      <h2>AutoSec &middot; security findings report</h2>
      <p class="sub">{len(findings)} target(s) analyzed in parallel &middot; PoCs validated in isolated sandbox.</p>
      <div class="cards">
        <div class="card"><div class="n">{len(findings)}</div><div class="l">Targets</div></div>
        <div class="card"><div class="n" style="color:#ff6b6b">{exploited}</div><div class="l">Exploited</div></div>
        <div class="card">
          <div class="n" style="color:#ffb454">{vulnerable - exploited}</div>
          <div class="l">Vuln, PoC failed</div>
        </div>
        <div class="card"><div class="n" style="color:#3fb950">{secure}</div><div class="l">Secure</div></div>
      </div>
      <table>
        <colgroup>
          <col class="c-target"><col class="c-status"><col class="c-class"><col class="c-fn">
          <col class="c-buf"><col class="c-payload"><col class="c-exit"><col class="c-reason">
        </colgroup>
        <thead>
          <tr>
            <th>Target</th><th>Status</th><th>Vuln class</th><th>Function</th>
            <th class="num">Buffer&nbsp;(B)</th><th class="num">Payload&nbsp;(B)</th>
            <th class="num">Exit</th><th>Reasoning</th>
          </tr>
        </thead>
        <tbody>{"".join(rows)}</tbody>
      </table>
    </div>
    """

def _target_detail_html(finding: dict, source: str) -> str:
    hyp = finding.get("hypothesis") or {}
    verdict = finding.get("verdict") or {}
    cls, label = _status(finding)
    is_vuln = bool(hyp.get("vulnerable"))

    def cell(k: str, v: Any) -> str:
        return f'<div><div class="k">{html.escape(k)}</div><div class="v">{html.escape(str(v))}</div></div>'

    triggered = verdict.get("triggered")
    verdict_txt = "skipped (secure)" if verdict.get("skipped") else ("triggered" if triggered else "not triggered")
    stats = "".join(
        [
            cell("Vuln class", hyp.get("vuln_class", "\u2014") if is_vuln else "\u2014"),
            cell("Function", hyp.get("function", "\u2014") if is_vuln else "\u2014"),
            cell("Buffer (B)", hyp.get("buffer_size", "\u2014") if is_vuln else "\u2014"),
            cell("Payload (B)", (finding.get("poc") or {}).get("payload_len", "\u2014") if is_vuln else "\u2014"),
            cell("Sandbox exit", verdict.get("sandbox_exit_code", "\u2014")),
            cell("PoC", verdict_txt),
        ]
    )
    reasoning = html.escape(str(hyp.get("reasoning", "")) or "\u2014")
    code = html.escape(source or "(source unavailable)")
    return f"""
      <h3 style="margin:0 0 4px"><code>{html.escape(str(finding["target"]))}</code>
          &nbsp;<span class="badge {cls}">{label}</span></h3>
      <p class="sub">Per-target detail &middot; PoCs validated in an isolated sandbox.</p>
      <div class="kv">{stats}</div>
      <div class="section-label">Reasoning</div>
      <div class="reason-block">{reasoning}</div>
      <div class="section-label">Source</div>
      <pre class="code">{code}</pre>
    """

def _render_targets_tab_html(findings: list[dict], sources: dict[str, str]) -> str:
    ordered = sorted(findings, key=lambda x: x["target"])

    radios, nav, panels, rules = [], [], [], []
    for i, f in enumerate(ordered):
        name = f["target"]
        cls, _ = _status(f)
        checked = " checked" if i == 0 else ""
        radios.append(f'<input type="radio" name="as-targets" id="as-t{i}"{checked}>')
        nav.append(f'<label for="as-t{i}"><span class="dot {cls}"></span><code>{html.escape(str(name))}</code></label>')
        panels.append(f'<div class="panel" id="as-p{i}">{_target_detail_html(f, sources.get(name, ""))}</div>')
        rules.append(
            f'.autosec #as-t{i}:checked ~ .subnav label[for="as-t{i}"]'
            "{background:#fff;color:var(--text);border-color:var(--line);font-weight:600;}"
            f".autosec #as-t{i}:checked ~ .panels #as-p{i}{{display:block;}}"
        )

    return f"""{_REPORT_CSS}
    <style>{"".join(rules)}</style>
    <div class="autosec">
      <h2>AutoSec &middot; target detail</h2>
      <p class="sub">{len(ordered)} target(s) &middot; select a file to see its status, reasoning, and source.</p>
      <div class="subtabs">
        {"".join(radios)}
        <div class="subnav">{"".join(nav)}</div>
        <div class="panels">{"".join(panels)}</div>
      </div>
    </div>
    """

@env.task(retries=1)
async def random_error() -> str:
    if _attempt() == 0:
        raise Exception("Random error")
    return "Passed!"

# {{docs-fragment pipeline}}
@env.task(report=True)
async def run_autosec_agent() -> dict:
    targets = _load_targets()
    if not targets:
        raise FileNotFoundError(f"no targets found under {TARGETS_DIR}")

    findings = list(await asyncio.gather(*(analyze_target(name, src) for name, src in targets.items())))

    await flyte.report.replace.aio(_render_report_html(findings))
    flyte.report.get_tab("targets").replace(_render_targets_tab_html(findings, targets))
    await flyte.report.flush.aio()

    await random_error()

    return {
        "targets_analyzed": len(findings),
        "triggered": sum(1 for f in findings if f["verdict"].get("triggered")),
        "findings": findings,
    }
# {{/docs-fragment pipeline}}

# {{docs-fragment main}}
if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(run_autosec_agent)
    print(run.url)
    run.wait()
# {{/docs-fragment main}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/tutorials/autosec_research_agent/main.py*

The Python packages are declared at the top of the file using the `uv` script style:

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.4.0",
#    "unionai-sandbox",
#    "litellm",
# ]
# ///
```

## Run the security pipeline

Each target flows through four stages: static scan, LLM hypothesis, PoC construction, and sandbox validation. The `run_autosec_agent` driver task analyzes all bundled targets in parallel and streams a findings report.

```
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.4.0",
#    "unionai-sandbox",
#    "litellm",
# ]
# main = "run_autosec_agent"
# params = ""
# ///
"""AutoSec researcher agent — parallel vulnerability analysis with sandbox PoC validation."""

from __future__ import annotations

import asyncio
import html
import json
import os
import pathlib
import re
from typing import Any

import flyte
import flyte.errors
import flyte.report
from flyte.ai.agents import Agent

HERE = pathlib.Path(__file__).parent
TARGETS_DIR = HERE / "targets"
MODEL = os.getenv("AUTOSEC_MODEL", "claude-haiku-4-5")

# {{docs-fragment env}}
main_img = flyte.Image.from_uv_script(__file__, name="autosec-research-agent", pre=True).with_apt_packages("gcc")

env = flyte.TaskEnvironment(
    name="autosec-research-agent",
    image=main_img,
    resources=flyte.Resources(cpu=1, memory="1Gi"),
    include=[str(TARGETS_DIR)],
    secrets=[
        flyte.Secret(key="internal-anthropic-api-key", as_env_var="ANTHROPIC_API_KEY"),
    ],
)
# {{/docs-fragment env}}

def _attempt() -> int:
    tc = flyte.ctx()
    return tc.attempt_number if tc is not None else 0

def _force(flag: str) -> bool:
    return bool(os.getenv(flag) or os.getenv("AUTOSEC_FORCE_ALL"))

def _extract_json(text: str) -> dict[str, Any]:
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if not match:
        raise ValueError(f"no JSON object in model reply: {text[:200]!r}")
    blob = match.group(0)
    try:
        return json.loads(blob)
    except json.JSONDecodeError:
        fixed = re.sub(r'\\(?!["\\/bfnrtu])', r"\\\\", blob)
        return json.loads(fixed)

# --- Stage 1: static analysis (CPU, OOM-prone) ------------------------------
@env.task(retries=2, timeout=30)
async def scan_static(source: str, scope: str = "whole") -> str:
    """Cheap stand-in for whole-program analysis (Joern/CodeQL in the real system)."""
    try:
        if scope == "whole" and _force("AUTOSEC_FORCE_OOM") and _attempt() == 0:
            raise flyte.errors.OOMError("whole-program graph exceeded memory limit")
        findings = _grep_dangerous_calls(source)
        return findings or "(no dangerous-call sites found)"
    except flyte.errors.OOMError as exc:
        print(f"[scan_static] {exc}; escalating resources + narrowing scope")
        return await scan_static.override(
            short_name="scan_static_more_resources", resources=flyte.Resources(cpu=2, memory="4Gi")
        )(source, scope="file")

def _grep_dangerous_calls(source: str) -> str:
    hits = []
    for i, line in enumerate(source.splitlines(), start=1):
        for fn in ("strcpy", "strcat", "sprintf", "gets", "memcpy"):
            if fn in line:
                hits.append(f"L{i}: {fn} -> {line.strip()}")
    return "\n".join(hits)

# --- Stage 2: hypothesize the vulnerability (LLM via Agent) ------------------
ANALYSIS_INSTRUCTIONS = """\
You are a vulnerability researcher. Your job is to determine whether a given \
C source file contains an exploitable memory-corruption bug reachable from argv.

You have access to these tools during your analysis:
- scan_static: Run static analysis on the source to find dangerous function calls.
- build_poc: Build a proof-of-concept payload (do not call during analysis).
- validate_in_sandbox: Compile and run the target with a PoC input (do not call during analysis).

Focus on analyzing the source and the provided static analysis findings. Call \
scan_static only if you need additional details about dangerous function usage.

Reply with ONLY a JSON object (no prose, no markdown fences):
If vulnerable: {"vulnerable": true, "function": str, \
"buffer_size": int (bytes of the overflowable buffer), "vuln_class": str, \
"reasoning": str}.
If the code looks safe (bounded copies, length checks, snprintf/strlcpy, \
etc.): {"vulnerable": false, "reasoning": str}.
"""

# --- Stage 3: build a proof-of-concept --------------------------------------
@env.task(retries=2, timeout=90)
async def build_poc(hypothesis: dict) -> dict:
    buffer_size = int(hypothesis.get("buffer_size", 64))
    payload_len = buffer_size + 64
    return {
        "payload_len": payload_len,
        "payload_repr": f'"A" * {payload_len}',
        "target_function": hypothesis.get("function", "greet"),
    }

# --- Stage 4: validate in an on-device sandbox -------------------------------
@env.task(retries=2, timeout=300)
async def validate_in_sandbox(source: str, poc: dict) -> dict:
    """Compile + run the target with the PoC input inside an on-device sandbox.

    The exploit code runs in a user-namespace sandbox on the same machine, never
    on the Flyte orchestration node (SPEC §2.6 / §7). The session is torn down
    in __aexit__ regardless of outcome (SPEC VD-5) so a stuck or failed run
    cannot leak resources.
    """
    import tempfile

    from union import sandbox as sb

    with tempfile.TemporaryDirectory() as work:
        async with sb.on_device.session(host_work_dir=work, backend="userns") as sbx:
            await sbx.put_bytes(f"{work}/target.c", source.encode())

            compile_proc = await sbx.run(
                f"gcc -fno-stack-protector -w -o {work}/target {work}/target.c",
                stdout=True,
                stderr=True,
                timeout_s=60,
            )
            compile_out, compile_err = await compile_proc.communicate_text()
            log = compile_out + compile_err
            if "error" in log.lower():
                return {
                    "triggered": False,
                    "sandbox_exit_code": -1,
                    "log": f"COMPILE_FAILED\n{log}",
                }

            payload = "A" * int(poc["payload_len"])
            run_proc = await sbx.run(
                f"{work}/target {payload}",
                stdout=True,
                stderr=True,
                timeout_s=60,
            )
            run_out, run_err = await run_proc.communicate_text()
            log = run_out + "\n" + run_err
            triggered = "SIGSEGV" in log

            return {
                "triggered": bool(triggered),
                "sandbox_exit_code": getattr(run_proc, "returncode", 0),
                "log": log,
            }

# --- Agent + hypothesize task (depends on all tools above) ------------------
hypothesis_agent = Agent(
    name="autosec-hypothesis",
    instructions=ANALYSIS_INSTRUCTIONS,
    model=MODEL,
    tools=[scan_static, build_poc, validate_in_sandbox],
    max_turns=6,
)

@env.task(retries=3, timeout=20)
async def hypothesize(source: str, static_findings: str) -> dict:
    prompt = (
        "Analyze this C source file for memory-corruption vulnerabilities.\n\n"
        f"SOURCE:\n{source}\n\nDANGEROUS CALLS:\n{static_findings}\n"
    )

    # Beat A: hang on the first attempt -> task timeout -> retry.
    timeout_on = _force("AUTOSEC_FORCE_LLM_TIMEOUT") and _attempt() == 0
    bad_on = _force("AUTOSEC_FORCE_BAD_TOOL_CALL")

    if timeout_on:
        await asyncio.sleep(600)

    result = await hypothesis_agent.run.aio(prompt, memory=[])
    raw = result.summary or ""

    # Beat B: simulate a hallucinated/malformed tool call. When the timeout beat
    # is also active it consumes attempt 0, so defer this to attempt 1 — that way
    # both beats are actually demonstrated in a single run (e.g. AUTOSEC_FORCE_ALL).
    bad_attempt = 1 if _force("AUTOSEC_FORCE_LLM_TIMEOUT") else 0
    if bad_on and _attempt() == bad_attempt:
        raw = "Sure! The bug is somewhere around here, trust me."

    hyp = _extract_json(raw)
    if "vulnerable" not in hyp:
        hyp["vulnerable"] = "buffer_size" in hyp
    if hyp.get("vulnerable") and "buffer_size" not in hyp:
        raise ValueError(f"vulnerable hypothesis missing buffer_size: {hyp}")
    return hyp

# --- Orchestration ----------------------------------------------------------
def _load_targets() -> dict[str, str]:
    return {p.name: p.read_text() for p in sorted(TARGETS_DIR.glob("*.c"))}

@env.task
async def analyze_target(name: str, source: str) -> dict:
    findings = await scan_static(source)
    hypothesis = await hypothesize(source, findings)

    if not hypothesis.get("vulnerable"):
        poc: dict = {}
        verdict = {"triggered": False, "skipped": True}
    else:
        poc = await build_poc(hypothesis)
        verdict = await validate_in_sandbox(source, poc)

    return {
        "target": name,
        "static_findings": findings,
        "hypothesis": hypothesis,
        "poc": poc,
        "verdict": verdict,
    }

_REPORT_CSS = """
<style>
  .autosec { --bg:#ffffff; --card:#f7f8fa; --line:#e3e7ec; --muted:#5b6675;
    --text:#1b2330; --red:#c0392b; --amber:#b6791f; --green:#1e7e34; --accent:#1f6feb;
    font-family:-apple-system,BlinkMacSystemFont,"Segoe UI",Roboto,sans-serif;
    background:var(--bg); color:var(--text); padding:24px; border-radius:12px;
    border:1px solid var(--line); }
  .autosec h2 { margin:0 0 4px; font-size:20px; letter-spacing:.2px; }
  .autosec .sub { color:var(--muted); font-size:13px; margin:0 0 20px; }
  .autosec .cards { display:flex; gap:12px; flex-wrap:wrap; margin-bottom:22px; }
  .autosec .card { background:var(--card); border:1px solid var(--line);
    border-radius:10px; padding:14px 18px; min-width:120px; }
  .autosec .card .n { font-size:26px; font-weight:700; line-height:1; }
  .autosec .card .l { color:var(--muted); font-size:12px; margin-top:6px;
    text-transform:uppercase; letter-spacing:.6px; }
  .autosec table { width:100%; table-layout:fixed; border-collapse:collapse; font-size:13px;
    background:#fff; border:1px solid var(--line); border-radius:10px; overflow:hidden; }
  .autosec th, .autosec td { overflow-wrap:anywhere; }
  .autosec thead th { background:#eef1f5; color:var(--muted); text-align:left;
    font-weight:600; font-size:11px; text-transform:uppercase; letter-spacing:.6px;
    padding:11px 14px; border-bottom:1px solid var(--line); }
  .autosec tbody td { padding:11px 14px; border-bottom:1px solid var(--line);
    vertical-align:top; }
  .autosec tbody tr:last-child td { border-bottom:none; }
  .autosec tbody tr:nth-child(even) { background:#fafbfc; }
  .autosec tbody tr:hover { background:#eef4ff; }
  .autosec code { background:#eef1f5; border:1px solid var(--line); border-radius:5px;
    padding:1px 6px; font-family:ui-monospace,SFMono-Regular,Menlo,monospace; font-size:12px; }
  .autosec .num { text-align:right; font-variant-numeric:tabular-nums; }
  .autosec .reason { color:var(--muted); line-height:1.45; }
  .autosec .badge { display:inline-block; padding:3px 10px; border-radius:999px;
    font-size:11px; font-weight:700; letter-spacing:.4px; white-space:nowrap; }
  .autosec .b-exploited { background:rgba(192,57,43,.10); color:var(--red);
    border:1px solid rgba(192,57,43,.35); }
  .autosec .b-vuln { background:rgba(182,121,31,.12); color:var(--amber);
    border:1px solid rgba(182,121,31,.35); }
  .autosec .b-secure { background:rgba(30,126,52,.10); color:var(--green);
    border:1px solid rgba(30,126,52,.35); }
  .autosec col.c-target  { width:15%; }
  .autosec col.c-status  { width:11%; }
  .autosec col.c-class   { width:11%; }
  .autosec col.c-fn      { width:11%; }
  .autosec col.c-buf     { width:8%; }
  .autosec col.c-payload { width:8%; }
  .autosec col.c-exit    { width:6%; }
  .autosec col.c-reason  { width:30%; }
  .autosec .kv { display:flex; flex-wrap:wrap; gap:12px 28px; margin:16px 0 4px; }
  .autosec .kv .k { color:var(--muted); font-size:11px; text-transform:uppercase; letter-spacing:.6px; }
  .autosec .kv .v { font-weight:600; font-size:14px; margin-top:3px; }
  .autosec .section-label { font-size:11px; text-transform:uppercase; letter-spacing:.6px;
    color:var(--muted); margin:22px 0 7px; }
  .autosec .reason-block { background:var(--card); border:1px solid var(--line);
    border-radius:8px; padding:13px 15px; line-height:1.5; font-size:13px; }
  .autosec pre.code { background:#f6f8fa; border:1px solid var(--line); border-radius:8px;
    padding:14px 16px; overflow:auto; font-family:ui-monospace,SFMono-Regular,Menlo,monospace;
    font-size:12.5px; line-height:1.5; color:#1b2330; margin:0; }
  .autosec .subtabs > input[type=radio] { position:absolute; opacity:0; pointer-events:none; }
  .autosec .subnav { display:flex; flex-wrap:wrap; gap:4px; border-bottom:1px solid var(--line);
    margin:8px 0 18px; }
  .autosec .subnav label { display:inline-flex; align-items:center; gap:8px; padding:8px 14px;
    font-size:12.5px; cursor:pointer; border:1px solid transparent; border-bottom:none;
    border-radius:8px 8px 0 0; color:var(--muted); margin-bottom:-1px; }
  .autosec .subnav label:hover { background:var(--card); color:var(--text); }
  .autosec .subnav .dot { width:8px; height:8px; border-radius:50%; }
  .autosec .dot.b-exploited { background:var(--red); }
  .autosec .dot.b-vuln { background:var(--amber); }
  .autosec .dot.b-secure { background:var(--green); }
  .autosec .panel { display:none; }
</style>
"""

def _status(finding: dict) -> tuple[str, str]:
    hyp = finding.get("hypothesis") or {}
    verdict = finding.get("verdict") or {}
    if not hyp.get("vulnerable"):
        return "b-secure", "SECURE"
    if verdict.get("triggered"):
        return "b-exploited", "EXPLOITED"
    return "b-vuln", "VULNERABLE"

def _render_report_html(findings: list[dict]) -> str:
    exploited = sum(1 for f in findings if (f.get("verdict") or {}).get("triggered"))
    vulnerable = sum(1 for f in findings if (f.get("hypothesis") or {}).get("vulnerable"))
    secure = len(findings) - vulnerable

    rows = []
    for f in sorted(findings, key=lambda x: x["target"]):
        hyp = f.get("hypothesis") or {}
        verdict = f.get("verdict") or {}
        cls, label = _status(f)
        is_vuln = bool(hyp.get("vulnerable"))
        vuln_class = hyp.get("vuln_class", "\u2014") if is_vuln else "\u2014"
        fn = hyp.get("function", "\u2014") if is_vuln else "\u2014"
        buf = hyp.get("buffer_size", "\u2014") if is_vuln else "\u2014"
        payload = (f.get("poc") or {}).get("payload_len", "\u2014") if is_vuln else "\u2014"
        exit_code = verdict.get("sandbox_exit_code", "\u2014")
        rows.append(
            "<tr>"
            f"<td><code>{html.escape(str(f['target']))}</code></td>"
            f'<td><span class="badge {cls}">{label}</span></td>'
            f"<td>{html.escape(str(vuln_class))}</td>"
            f"<td><code>{html.escape(str(fn))}</code></td>"
            f'<td class="num">{html.escape(str(buf))}</td>'
            f'<td class="num">{html.escape(str(payload))}</td>'
            f'<td class="num">{html.escape(str(exit_code))}</td>'
            f'<td class="reason">{html.escape(str(hyp.get("reasoning", "")))}</td>'
            "</tr>"
        )

    return f"""{_REPORT_CSS}
    <div class="autosec">
      <h2>AutoSec &middot; security findings report</h2>
      <p class="sub">{len(findings)} target(s) analyzed in parallel &middot; PoCs validated in isolated sandbox.</p>
      <div class="cards">
        <div class="card"><div class="n">{len(findings)}</div><div class="l">Targets</div></div>
        <div class="card"><div class="n" style="color:#ff6b6b">{exploited}</div><div class="l">Exploited</div></div>
        <div class="card">
          <div class="n" style="color:#ffb454">{vulnerable - exploited}</div>
          <div class="l">Vuln, PoC failed</div>
        </div>
        <div class="card"><div class="n" style="color:#3fb950">{secure}</div><div class="l">Secure</div></div>
      </div>
      <table>
        <colgroup>
          <col class="c-target"><col class="c-status"><col class="c-class"><col class="c-fn">
          <col class="c-buf"><col class="c-payload"><col class="c-exit"><col class="c-reason">
        </colgroup>
        <thead>
          <tr>
            <th>Target</th><th>Status</th><th>Vuln class</th><th>Function</th>
            <th class="num">Buffer&nbsp;(B)</th><th class="num">Payload&nbsp;(B)</th>
            <th class="num">Exit</th><th>Reasoning</th>
          </tr>
        </thead>
        <tbody>{"".join(rows)}</tbody>
      </table>
    </div>
    """

def _target_detail_html(finding: dict, source: str) -> str:
    hyp = finding.get("hypothesis") or {}
    verdict = finding.get("verdict") or {}
    cls, label = _status(finding)
    is_vuln = bool(hyp.get("vulnerable"))

    def cell(k: str, v: Any) -> str:
        return f'<div><div class="k">{html.escape(k)}</div><div class="v">{html.escape(str(v))}</div></div>'

    triggered = verdict.get("triggered")
    verdict_txt = "skipped (secure)" if verdict.get("skipped") else ("triggered" if triggered else "not triggered")
    stats = "".join(
        [
            cell("Vuln class", hyp.get("vuln_class", "\u2014") if is_vuln else "\u2014"),
            cell("Function", hyp.get("function", "\u2014") if is_vuln else "\u2014"),
            cell("Buffer (B)", hyp.get("buffer_size", "\u2014") if is_vuln else "\u2014"),
            cell("Payload (B)", (finding.get("poc") or {}).get("payload_len", "\u2014") if is_vuln else "\u2014"),
            cell("Sandbox exit", verdict.get("sandbox_exit_code", "\u2014")),
            cell("PoC", verdict_txt),
        ]
    )
    reasoning = html.escape(str(hyp.get("reasoning", "")) or "\u2014")
    code = html.escape(source or "(source unavailable)")
    return f"""
      <h3 style="margin:0 0 4px"><code>{html.escape(str(finding["target"]))}</code>
          &nbsp;<span class="badge {cls}">{label}</span></h3>
      <p class="sub">Per-target detail &middot; PoCs validated in an isolated sandbox.</p>
      <div class="kv">{stats}</div>
      <div class="section-label">Reasoning</div>
      <div class="reason-block">{reasoning}</div>
      <div class="section-label">Source</div>
      <pre class="code">{code}</pre>
    """

def _render_targets_tab_html(findings: list[dict], sources: dict[str, str]) -> str:
    ordered = sorted(findings, key=lambda x: x["target"])

    radios, nav, panels, rules = [], [], [], []
    for i, f in enumerate(ordered):
        name = f["target"]
        cls, _ = _status(f)
        checked = " checked" if i == 0 else ""
        radios.append(f'<input type="radio" name="as-targets" id="as-t{i}"{checked}>')
        nav.append(f'<label for="as-t{i}"><span class="dot {cls}"></span><code>{html.escape(str(name))}</code></label>')
        panels.append(f'<div class="panel" id="as-p{i}">{_target_detail_html(f, sources.get(name, ""))}</div>')
        rules.append(
            f'.autosec #as-t{i}:checked ~ .subnav label[for="as-t{i}"]'
            "{background:#fff;color:var(--text);border-color:var(--line);font-weight:600;}"
            f".autosec #as-t{i}:checked ~ .panels #as-p{i}{{display:block;}}"
        )

    return f"""{_REPORT_CSS}
    <style>{"".join(rules)}</style>
    <div class="autosec">
      <h2>AutoSec &middot; target detail</h2>
      <p class="sub">{len(ordered)} target(s) &middot; select a file to see its status, reasoning, and source.</p>
      <div class="subtabs">
        {"".join(radios)}
        <div class="subnav">{"".join(nav)}</div>
        <div class="panels">{"".join(panels)}</div>
      </div>
    </div>
    """

@env.task(retries=1)
async def random_error() -> str:
    if _attempt() == 0:
        raise Exception("Random error")
    return "Passed!"

# {{docs-fragment pipeline}}
@env.task(report=True)
async def run_autosec_agent() -> dict:
    targets = _load_targets()
    if not targets:
        raise FileNotFoundError(f"no targets found under {TARGETS_DIR}")

    findings = list(await asyncio.gather(*(analyze_target(name, src) for name, src in targets.items())))

    await flyte.report.replace.aio(_render_report_html(findings))
    flyte.report.get_tab("targets").replace(_render_targets_tab_html(findings, targets))
    await flyte.report.flush.aio()

    await random_error()

    return {
        "targets_analyzed": len(findings),
        "triggered": sum(1 for f in findings if f["verdict"].get("triggered")),
        "findings": findings,
    }
# {{/docs-fragment pipeline}}

# {{docs-fragment main}}
if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(run_autosec_agent)
    print(run.url)
    run.wait()
# {{/docs-fragment main}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/tutorials/autosec_research_agent/main.py*

## Run the agent

### Create secrets

Get an Anthropic API key from the [Anthropic console](https://console.anthropic.com/) and register it as a Flyte secret:

```
flyte create secret internal-anthropic-api-key <YOUR_ANTHROPIC_API_KEY>
```

See [Secrets](https://www.union.ai/docs/v2/union/user-guide/task-configuration/secrets/page.md) for scoping and file-based secrets.

### Run remotely

From the [example directory](https://github.com/unionai/unionai-examples/tree/main/v2/tutorials/autosec_research_agent):

```
cd v2/tutorials/autosec_research_agent
uv run --script main.py
```

Follow the printed run URL to watch each target progress through the pipeline and open the report panel for the findings table and per-target detail tabs.

Optional environment variables demonstrate self-healing behavior (`AUTOSEC_FORCE_LLM_TIMEOUT`, `AUTOSEC_FORCE_BAD_TOOL_CALL`, `AUTOSEC_FORCE_OOM`, or `AUTOSEC_FORCE_ALL=1`).

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/tutorials/agents/autosec-research-agent/_index.md
**HTML**: https://www.union.ai/docs/v2/union/tutorials/agents/autosec-research-agent/
