Compliance monitoring agent

Code available here.

This example demonstrates how to build a regulatory and compliance monitoring agent on Flyte. The agent watches trusted regulatory sources — FDA guidance, SEC filings, sanctions lists, state-level privacy laws — and routes structured, citation-precise findings to the right downstream team (compliance, legal, or clinical ops).

Compliance monitoring requires citation precision and recency so every finding can be verified. The You.com Research API returns a grounded, synthesized answer plus structured sources (URL, title, snippet). Use source_control to restrict research to trusted government and regulator domains within a recency window, and output_schema when you need machine-readable findings. Claude via LiteLLM triages each finding for severity; routing follows the team assigned to each watch item. Combined with Flyte’s audit lineage, you get end-to-end traceability from query to citation.

Flyte provides:

  • Fan-out parallelism across watch items
  • @flyte.trace on every You.com Research and LLM call
  • Retries on monitoring tasks for robustness
  • Flyte reports grouped by team and severity

Figure: Compliance monitoring agent report

Setting up the environment

The agent runs in a TaskEnvironment with secrets for the You.com and Anthropic API keys and a container image built from the uv script dependencies.

main.py
import asyncio
import json
import os
from dataclasses import dataclass, field

import flyte

MODEL = "anthropic/claude-haiku-4-5"

env = flyte.TaskEnvironment(
    name="compliance-monitoring",
    secrets=[
        flyte.Secret(key="youdotcom-api-key", as_env_var="YOU_API_KEY"),
        flyte.Secret(key="internal-anthropic-api-key", as_env_var="ANTHROPIC_API_KEY"),
    ],
    image=flyte.Image.from_uv_script(__file__, name="compliance-monitoring", pre=True),
    resources=flyte.Resources(cpu="1", memory="1Gi"),
)

The Python dependencies are declared at the top of the file as inline script metadata (PEP 723), the format that uv run --script and flyte.Image.from_uv_script both read:

# /// script
# requires-python = "==3.13"
# dependencies = [
#     "flyte>=2.4.0",
#     "httpx>=0.27.0",
#     "litellm>=1.72.0",
# ]
# ///

Data types

Each WatchItem specifies a regulatory topic, a list of trusted domains for source_control, and a routing destination team. Findings carry citation metadata — source URL, published date, and snippet — so every claim can be verified.

main.py
@dataclass
class WatchItem:
    topic: str
    trusted_domains: list[str]
    team: str


@dataclass
class Finding:
    topic: str
    team: str
    title: str
    summary: str
    source_url: str
    published_date: str
    snippet: str
    domain: str = ""
    favicon: str = ""
    severity: str = "info"
    rationale: str = ""


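def _domain(url: str) -> str:
    from urllib.parse import urlparse

    try:
        # Strip only a leading "www."; str.replace would also alter hosts
        # that contain "www." elsewhere in the name.
        return urlparse(url).netloc.removeprefix("www.")
    except Exception:
        return ""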


def _favicon_for(url: str) -> str:
    return f"https://ydc-index.io/favicon?domain={_domain(url)}&size=128"


@dataclass
class ComplianceReport:
    findings: list[Finding] = field(default_factory=list)
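
As a small illustration of how these types travel between steps, the sketch below builds one Finding by hand and round-trips it through JSON. The dataclass is repeated so the snippet runs on its own, and every field value is a made-up placeholder, not a real regulatory finding.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Finding:
    topic: str
    team: str
    title: str
    summary: str
    source_url: str
    published_date: str
    snippet: str
    domain: str = ""
    favicon: str = ""
    severity: str = "info"
    rationale: str = ""


# Placeholder values for illustration only.
example = Finding(
    topic="Example topic",
    team="legal",
    title="Example title",
    summary="Example summary.",
    source_url="https://www.example.gov/notice",
    published_date="2025-01-01",
    snippet="Example snippet.",
)

# Dataclasses with plain string fields serialize cleanly to JSON and back.
payload = json.dumps(asdict(example), indent=2)
restored = Finding(**json.loads(payload))
print(restored.severity)  # defaults to "info" until triage assigns a value
```

Because severity and rationale have defaults, a Finding stays valid even when triage returns nothing for it.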

Research with the You.com Research API

The you_research helper calls the You.com Research API at https://api.you.com/v1/research. It passes source_control with an include_domains allowlist and a freshness filter, and requests structured output via output_schema.

See the Research API reference for research_effort levels (lite, standard, deep, exhaustive), source_control, and output_schema parameters.

main.py
YOU_RESEARCH_URL = "https://api.you.com/v1/research"

FINDINGS_SCHEMA = {
    "type": "object",
    "properties": {
        "findings": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "title": {"type": "string"},
                    "summary": {"type": "string"},
                    "source_url": {"type": "string"},
                    "published_date": {"type": "string"},
                    "snippet": {"type": "string"},
                },
                "required": [
                    "title",
                    "summary",
                    "source_url",
                    "published_date",
                    "snippet",
                ],
                "additionalProperties": False,
            },
        }
    },
    "required": ["findings"],
    "additionalProperties": False,
}


async def _you_post(url: str, body: dict, timeout: float = 300.0) -> dict:
    """POST with exponential backoff + jitter on 429 rate limits."""
    import random

    import httpx

    headers = {
        "X-API-Key": os.environ["YOU_API_KEY"],
        "Content-Type": "application/json",
    }
    max_attempts = 7
    async with httpx.AsyncClient(timeout=timeout) as client:
        for attempt in range(max_attempts):
            resp = await client.post(url, headers=headers, json=body)
            if resp.status_code == 429 and attempt < max_attempts - 1:
                # Prefer a numeric Retry-After header; otherwise back off
                # exponentially, capped at 30 seconds, plus up to 2 s of jitter.
                wait = float(resp.headers.get("retry-after") or 0) or min(2**attempt, 30)
                await asyncio.sleep(wait + random.uniform(0, 2))
                continue
            # Any other status, or a 429 on the final attempt, ends the loop here.
            resp.raise_for_status()
            return resp.json()
    # The final attempt always returns or raises, so this line is never reached.
    raise RuntimeError("retry loop exited without a response")


@flyte.trace
async def you_research(
    question: str,
    include_domains: list[str],
    freshness: str,
    research_effort: str = "standard",
) -> dict:
    """Call the You.com Research API with domain + freshness source controls."""
    body = {
        "input": question,
        "research_effort": research_effort,
        "source_control": {
            "include_domains": include_domains,
            "freshness": freshness,
        },
        "output_schema": FINDINGS_SCHEMA,
    }
    return await _you_post(YOU_RESEARCH_URL, body)

Triage findings with Claude

After the Research API returns structured findings, Claude assigns each one a severity (info, watch, or action) and a one-sentence rationale. Routing is not decided by the model: each finding inherits the team of its watch item.

main.py
@flyte.trace
async def triage(topic: str, findings: list[dict]) -> list[dict]:
    """Use Claude to assign a severity + rationale to each finding."""
    from litellm import acompletion

    if not findings:
        return []

    system = (
        "You are a regulatory-compliance triage analyst. For each finding, "
        "assign a severity of 'info' (FYI), 'watch' (monitor closely), or "
        "'action' (requires a concrete compliance/legal response), and a one-"
        "sentence rationale. Respond ONLY with JSON: "
        '{"triage": [{"severity": str, "rationale": str}]} with one entry per '
        "finding, in order."
    )
    listing = "\n".join(
        f"[{i + 1}] {f.get('title', '')}: {f.get('summary', '')}"
        for i, f in enumerate(findings)
    )
    resp = await acompletion(
        model=MODEL,
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": f"Topic: {topic}\n\nFindings:\n{listing}"},
        ],
        temperature=0.0,
        max_tokens=1024,
    )
    parsed = _parse_json(resp.choices[0].message.content)
    return parsed.get("triage", []) if isinstance(parsed, dict) else []


def _parse_json(text: str) -> dict | list:
    text = text.strip()
    if text.startswith("```"):
        text = text.split("```", 2)[1]
        if text.lstrip().startswith("json"):
            text = text.lstrip()[4:]
    start = min((i for i in (text.find("{"), text.find("[")) if i != -1), default=0)
    end = max(text.rfind("}"), text.rfind("]")) + 1
    return json.loads(text[start:end])

Monitor one watch item

The monitor_watch_item task researches a single regulatory topic, enriches findings with source metadata from the Research API response, triages each finding for severity, and tags it with the watch item's team.

main.py
@env.task(retries=3)
async def monitor_watch_item(item: WatchItem, freshness: str) -> list[Finding]:
    """Research one regulatory topic and produce triaged, cited findings."""
    question = (
        f"What are the most recent changes, updates, or new guidance regarding "
        f"'{item.topic}'? Report concrete, dated changes with their sources."
    )
    result = await you_research(question, item.trusted_domains, freshness)
    output = result.get("output", {})

    # Build a lookup from the Research API's full source list (url -> metadata).
    src_by_url: dict[str, dict] = {}
    for s in output.get("sources", []) or []:
        url = str(s.get("url", ""))
        if url:
            src_by_url[url] = s

    content = output.get("content", {})
    if isinstance(content, str):
        content = _parse_json(content) if content.strip() else {}
    raw_findings = content.get("findings", []) if isinstance(content, dict) else []

    triage_results = await triage(item.topic, raw_findings)

    findings: list[Finding] = []
    for i, f in enumerate(raw_findings):
        t = triage_results[i] if i < len(triage_results) else {}
        url = str(f.get("source_url", ""))
        meta = src_by_url.get(url, {})
        snippet = str(f.get("snippet", "")) or str((meta.get("snippets") or [""])[0])
        findings.append(
            Finding(
                topic=item.topic,
                team=item.team,
                title=str(f.get("title", "") or meta.get("title", "")),
                summary=str(f.get("summary", "")),
                source_url=url,
                published_date=str(f.get("published_date", "")),
                snippet=snippet,
                domain=_domain(url),
                favicon=_favicon_for(url),
                severity=str(t.get("severity", "info")),
                rationale=str(t.get("rationale", "")),
            )
        )
    return findings
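
The task pairs findings with triage results by position and falls back to defaults when the model returns fewer entries. A possible hardening, sketched below under the assumption that only info, watch, and action are valid severities, is to normalize whatever label the model returns. The merge_triage and normalize_severity names are hypothetical and not part of the example.

```python
ALLOWED_SEVERITIES = {"info", "watch", "action"}


def normalize_severity(value: object) -> str:
    """Map unexpected or oddly cased labels to a known severity."""
    text = str(value).strip().lower()
    return text if text in ALLOWED_SEVERITIES else "info"


def merge_triage(raw_findings: list[dict], triage_results: list[dict]) -> list[dict]:
    """Attach severity and rationale to each finding by index."""
    merged = []
    for i, finding in enumerate(raw_findings):
        t = triage_results[i] if i < len(triage_results) else {}
        if not isinstance(t, dict):
            t = {}  # ignore malformed entries instead of failing
        merged.append(
            {
                **finding,
                "severity": normalize_severity(t.get("severity", "info")),
                "rationale": str(t.get("rationale", "")),
            }
        )
    return merged


# Placeholder inputs: two findings, but only one triage entry came back.
raw = [{"title": "First"}, {"title": "Second"}]
triaged = [{"severity": " ACTION ", "rationale": "Needs a response."}]
merged = merge_triage(raw, triaged)
print([m["severity"] for m in merged])  # ['action', 'info']
```

Defaulting an unknown label to info keeps the report renderable, at the cost of possibly understating a finding; a stricter variant could default to watch instead.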

Orchestration

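The compliance_monitoring driver task fans out across all watch items, aggregates findings, and renders a Flyte report sorted by severity and team. The _render_report helper it calls to build the report HTML belongs to the full example and is not reproduced on this page.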

main.py
def _default_watch_items() -> list[WatchItem]:
    return [
        WatchItem(
            topic="FDA guidance on AI/ML-enabled medical device software",
            trusted_domains=["fda.gov", "federalregister.gov"],
            team="clinical",
        ),
        WatchItem(
            topic="SEC climate-related disclosure rules for public companies",
            trusted_domains=["sec.gov", "federalregister.gov"],
            team="legal",
        ),
        WatchItem(
            topic="OFAC sanctions list additions and updates",
            trusted_domains=["treasury.gov", "ofac.treasury.gov"],
            team="compliance",
        ),
        WatchItem(
            topic="State-level consumer data privacy laws and amendments",
            trusted_domains=["iapp.org", "oag.ca.gov"],
            team="legal",
        ),
        WatchItem(
            topic="FDA drug recalls and safety communications",
            trusted_domains=["fda.gov"],
            team="clinical",
        ),
        WatchItem(
            topic="HIPAA enforcement actions and guidance updates",
            trusted_domains=["hhs.gov"],
            team="compliance",
        ),
    ]


@env.task(report=True)
async def compliance_monitoring(
    watch_items: list[WatchItem] | None = None,
    freshness: str = "month",
) -> ComplianceReport:
    """Fan out across regulatory watch items and aggregate triaged findings."""
    if watch_items is None:
        watch_items = _default_watch_items()

    with flyte.group("monitor-watch-items"):
        results = await asyncio.gather(
            *[monitor_watch_item(item, freshness) for item in watch_items]
        )

    report = ComplianceReport(findings=[f for fs in results for f in fs])

    await flyte.report.replace.aio(_render_report(report), do_flush=True)
    await flyte.report.flush.aio()
    return report
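
The driver passes an HTML string from _render_report to flyte.report. As a rough idea of what such a renderer could look like, the sketch below groups findings by team and orders them by severity within each team. It is not the example's actual _render_report: the render_report_sketch name, the trimmed Finding class, the markup, and the grouping order are all assumptions, and the sample findings are placeholders.

```python
from dataclasses import dataclass
from html import escape

# Most urgent first; unknown labels sort last.
SEVERITY_ORDER = {"action": 0, "watch": 1, "info": 2}


@dataclass
class Finding:  # trimmed copy with only the fields the sketch renders
    team: str
    title: str
    summary: str
    source_url: str
    published_date: str
    severity: str = "info"


def render_report_sketch(findings: list[Finding]) -> str:
    ordered = sorted(
        findings,
        key=lambda f: (f.team, SEVERITY_ORDER.get(f.severity, len(SEVERITY_ORDER)), f.title),
    )
    parts = ["<h1>Compliance findings</h1>"]
    current_team = None
    for f in ordered:
        if f.team != current_team:
            if current_team is not None:
                parts.append("</ul>")
            parts.append(f"<h2>{escape(f.team)}</h2><ul>")
            current_team = f.team
        # Escape all text, since it originates from web sources and an LLM.
        parts.append(
            f"<li><strong>[{escape(f.severity)}]</strong> "
            f'<a href="{escape(f.source_url, quote=True)}">{escape(f.title)}</a> '
            f"({escape(f.published_date)}): {escape(f.summary)}</li>"
        )
    if current_team is not None:
        parts.append("</ul>")
    return "\n".join(parts)


# Placeholder findings for illustration only.
sample = [
    Finding("legal", "B", "s", "https://example.gov/b", "2025-01-01", "info"),
    Finding("legal", "A <draft>", "s", "https://example.gov/a", "2025-01-02", "action"),
    Finding("clinical", "C", "s", "https://example.gov/c", "2025-01-03", "watch"),
]
html_out = render_report_sketch(sample)
print(html_out)
```

Each list item links to its source URL, so a reviewer can check the citation directly from the report.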

Run the agent

Create secrets

Get a You.com API key from the You.com platform (see the quickstart guide). Get an Anthropic API key from the Anthropic console.

Register both keys as Flyte secrets. The secret key names must match those declared in the TaskEnvironment:

flyte create secret youdotcom-api-key <YOUR_YOU_API_KEY>
flyte create secret internal-anthropic-api-key <YOUR_ANTHROPIC_API_KEY>

See Secrets for scoping and file-based secrets.

Run locally or remotely

From the example directory:

cd v2/tutorials/compliance_monitoring_agent
uv run --script main.py

To test locally without Flyte secrets:

export YOU_API_KEY=<YOUR_YOU_API_KEY>
export ANTHROPIC_API_KEY=<YOUR_ANTHROPIC_API_KEY>

uv run --script main.py
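
When running without Flyte secrets, a missing variable only surfaces once the first API call is made. A small preflight check, sketched below, can report it earlier. The missing_env_vars helper is hypothetical and not part of the example.

```python
import os
from collections.abc import Mapping

REQUIRED_ENV_VARS = ("YOU_API_KEY", "ANTHROPIC_API_KEY")


def missing_env_vars(environ: Mapping[str, str] = os.environ) -> list[str]:
    """Return the required variable names that are unset or empty."""
    return [name for name in REQUIRED_ENV_VARS if not environ.get(name)]


missing = missing_env_vars()
if missing:
    print("Missing environment variables: " + ", ".join(missing))
```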

When the run completes, open the Flyte report to review findings grouped by team and severity, each with a verifiable You.com Research citation.