Field data enrichment agent

Code available here.

This example demonstrates how to build a field-data enrichment agent for autonomous systems on Flyte. The agent enriches geo-tagged operational events (from autonomous vehicles, aircraft, satellites, or field sensors) with real-world public context: road closures, weather events, airspace changes, or local incidents tied to a geofence.

Operational data stays in your environment; only the search query built from each event's location and event type goes to the You.com Search API. The API returns unified web and news results with freshness and country targeting, and Claude, called through LiteLLM, summarizes the relevant context for each geo-tagged event.

Flyte provides:

  • Fan-out parallelism across geo-tagged events
  • cache="auto" so repeated geofence checks within the cache window reuse prior results
  • @flyte.trace on every external call for lineage
  • Flyte reports with operational severity and per-incident citations

Figure: Field data enrichment agent report

Setting up the environment

The agent runs in a TaskEnvironment with secrets for the You.com and Anthropic API keys, automatic caching, 1 CPU and 1 GiB of memory per task, and a container image built from the uv script dependencies.

main.py
import asyncio
import json
import os
from dataclasses import dataclass, field

import flyte

MODEL = "anthropic/claude-haiku-4-5"

env = flyte.TaskEnvironment(
    name="field-data-enrichment",
    secrets=[
        flyte.Secret(key="youdotcom-api-key", as_env_var="YOU_API_KEY"),
        flyte.Secret(key="internal-anthropic-api-key", as_env_var="ANTHROPIC_API_KEY"),
    ],
    image=flyte.Image.from_uv_script(__file__, name="field-data-enrichment", pre=True),
    resources=flyte.Resources(cpu="1", memory="1Gi"),
    cache="auto",
)
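
Because each secret is declared with as_env_var, task code can read the keys from the process environment, both on the cluster and when the variables are exported locally. The following is a minimal sketch of a fail-fast lookup; require_env is a hypothetical helper introduced here for illustration and is not part of main.py.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or fail with a clear message.

    Hypothetical helper for illustration; not part of the original main.py.
    """
    value = os.environ.get(name, "").strip()
    if not value:
        # Failing early gives a clearer error than a 401 from the remote API.
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value
```

A call such as require_env("YOU_API_KEY") at the start of a search helper would surface a missing or misnamed secret before any network request is made.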

The Python packages are declared at the top of the file using the uv script style:

# /// script
# requires-python = "==3.13"
# dependencies = [
#     "flyte>=2.4.0",
#     "httpx>=0.27.0",
#     "litellm>=1.72.0",
# ]
# ///

Data types

Each GeoEvent carries an event ID, location, ISO country code for geo-targeting, and an event type. Enriched events include a context summary, operational severity, and discrete incidents with source citations.

main.py
@dataclass
class GeoEvent:
    event_id: str
    location: str
    country: str
    event_type: str


@dataclass
class Incident:
    description: str
    source_url: str
    published: str
    domain: str = ""
    author: str = ""
    favicon: str = ""
    snippet: str = ""
    section: str = "web"


@dataclass
class EnrichedEvent:
    event_id: str
    location: str
    context_summary: str
    severity: str
    incidents: list[Incident] = field(default_factory=list)


@dataclass
class EnrichmentReport:
    events: list[EnrichedEvent] = field(default_factory=list)
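
To make the nesting concrete, the sketch below builds one enriched event and serializes it to JSON. It redeclares trimmed copies of the dataclasses so that it runs on its own; the field values, including the "low" severity label, are illustrative assumptions rather than real output.

```python
import json
from dataclasses import asdict, dataclass, field


# Trimmed copies of the dataclasses above so the snippet runs on its own.
@dataclass
class Incident:
    description: str
    source_url: str
    published: str
    section: str = "web"


@dataclass
class EnrichedEvent:
    event_id: str
    location: str
    context_summary: str
    severity: str
    incidents: list[Incident] = field(default_factory=list)


# Illustrative values only; these are not real search results.
event = EnrichedEvent(
    event_id="evt-1",
    location="Mountain View, CA",
    context_summary="Example summary.",
    severity="low",
    incidents=[
        Incident(
            description="Example lane closure.",
            source_url="https://example.com/closure",
            published="2025-01-01",
        )
    ],
)

# asdict recurses into nested dataclasses and lists, so the result is JSON-ready.
payload = asdict(event)
print(json.dumps(payload, indent=2))
```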

Search with the You.com Search API

The you_search helper calls the You.com Search API with freshness and country parameters to retrieve location-relevant web and news results. See the Search API reference for supported country codes and freshness values.
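
The enrich_event task reads title, url, snippet, published, domain, author, favicon, and section from each search hit. As a rough sketch of the parsing step only, the code below flattens a response payload into hits with those attributes. The SearchHit and parse_hits names, the payload shape ({"results": {"web": [...], "news": [...]}}), and the item keys "description" and "page_age" are all assumptions made for illustration; check them against the Search API reference before relying on them.

```python
from dataclasses import dataclass
from urllib.parse import urlparse


@dataclass
class SearchHit:
    # Attribute names match what enrich_event reads from each hit.
    title: str
    url: str
    snippet: str
    published: str = ""
    domain: str = ""
    author: str = ""
    favicon: str = ""
    section: str = "web"


def parse_hits(payload: dict, limit: int = 10) -> list[SearchHit]:
    """Flatten an ASSUMED payload shape into a list of SearchHit objects."""
    hits: list[SearchHit] = []
    results = payload.get("results") or {}
    for section in ("web", "news"):
        for item in results.get(section) or []:
            url = str(item.get("url", ""))
            if not url:
                continue  # a hit without a URL cannot be cited
            hits.append(
                SearchHit(
                    title=str(item.get("title", "")),
                    url=url,
                    snippet=str(item.get("description", "")),  # assumed key
                    published=str(item.get("page_age", "")),  # assumed key
                    domain=urlparse(url).netloc,
                    section=section,
                )
            )
    return hits[:limit]
```

Keeping the parsing separate from the HTTP call makes it possible to unit-test the mapping with a canned payload, without an API key.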

main.py

Enrich one event

The enrich_event task builds a location- and type-scoped query, calls the You.com Search API, and asks Claude to summarize relevant real-world context, extract discrete incidents, and assign an operational severity — all grounded in the returned sources.

main.py
@env.task(retries=3)
async def enrich_event(event: GeoEvent, freshness: str) -> EnrichedEvent:
    """Ground one geo-tagged event in fresh public context."""
    query = f"{event.location} {event.event_type.replace('_', ' ')} road closure weather incident"
    hits = await you_search(query, country=event.country, freshness=freshness)

    evidence = "\n\n".join(
        f"[{i + 1}] {h.title} ({h.published}) — {h.domain}\n{h.url}\n{h.snippet}"
        for i, h in enumerate(hits)
    )
    user = (
        f"Location: {event.location}\n"
        f"Event type: {event.event_type}\n\n"
        f"Search results:\n{evidence or 'No results.'}"
    )
    parsed = await llm_json(ENRICH_SYSTEM, user)

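    def _incident(it: dict) -> Incident:
        # source_index is the 1-based citation number from the evidence list;
        # a missing, zero, or out-of-range index yields an incident with no source.
        idx = int(it.get("source_index", 0) or 0)
        src = hits[idx - 1] if 1 <= idx <= len(hits) else None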
        return Incident(
            description=str(it.get("description", "")),
            source_url=src.url if src else "",
            published=src.published if src else "",
            domain=src.domain if src else "",
            author=src.author if src else "",
            favicon=src.favicon if src else "",
            snippet=src.snippet if src else "",
            section=src.section if src else "web",
        )

    incidents = [_incident(it) for it in (parsed.get("incidents", []) or [])]
    return EnrichedEvent(
        event_id=event.event_id,
        location=event.location,
        context_summary=str(parsed.get("context_summary", "")),
        severity=str(parsed.get("severity", "none")),
        incidents=incidents,
    )
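
The task relies on llm_json returning a dictionary, and every parsed.get call supplies a default, so an empty dictionary degrades to severity "none" with no incidents. One way to get that behavior from a free-text model reply is sketched below. parse_llm_json is a hypothetical name, and this is an assumption about how such a helper might work, not the implementation used in main.py.

```python
import json


def parse_llm_json(text: str) -> dict:
    """Best-effort parse of a model reply that should contain one JSON object."""
    # Keep only the span from the first "{" to the last "}" so that Markdown
    # fences or stray prose around the object are ignored.
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        return {}
    try:
        parsed = json.loads(text[start : end + 1])
    except json.JSONDecodeError:
        return {}
    # Only a JSON object is useful to the caller; anything else becomes {}.
    return parsed if isinstance(parsed, dict) else {}
```

Returning an empty dictionary instead of raising trades strictness for availability: a malformed reply produces an event with no context rather than a failed task. Raising instead would let the retries=3 setting on the task request a fresh reply.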

Orchestration

The field_data_enrichment driver task fans out across all events and renders a Flyte report sorted by severity.

main.py
DEFAULT_EVENTS = [
    GeoEvent("evt-1", "Mountain View, CA", "US", "road_closure_check"),
    GeoEvent("evt-2", "Tokyo, Japan", "JP", "weather"),
    GeoEvent("evt-3", "Austin, TX", "US", "road_closure_check"),
    GeoEvent("evt-4", "Phoenix, AZ", "US", "weather"),
    GeoEvent("evt-5", "London, UK", "GB", "incident"),
    GeoEvent("evt-6", "San Francisco, CA", "US", "incident"),
    GeoEvent("evt-7", "Seattle, WA", "US", "weather"),
    GeoEvent("evt-8", "Miami, FL", "US", "weather"),
    GeoEvent("evt-9", "Denver, CO", "US", "road_closure_check"),
    GeoEvent("evt-10", "Berlin, Germany", "DE", "incident"),
]


@env.task(report=True)
async def field_data_enrichment(
    events: list[GeoEvent] = DEFAULT_EVENTS,
    freshness: str = "day",
) -> EnrichmentReport:
    """Fan out across geo-tagged events and enrich each with public context."""
    with flyte.group("enrich-events"):
        enriched = await asyncio.gather(
            *[enrich_event(e, freshness) for e in events]
        )

    report = EnrichmentReport(events=list(enriched))
    await flyte.report.replace.aio(_render_report(report), do_flush=True)
    await flyte.report.flush.aio()
    return report
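
The _render_report helper is not shown in this section. As an illustration of the severity ordering it would need, the sketch below sorts events from most to least severe. The severity vocabulary ("high", "medium", "low", "none") is an assumption, since only the "none" default appears in the code above, and EventRow and sort_by_severity are hypothetical names.

```python
from dataclasses import dataclass

# ASSUMED severity vocabulary; only "none" is confirmed by the task code.
SEVERITY_ORDER = {"high": 0, "medium": 1, "low": 2, "none": 3}


@dataclass
class EventRow:
    # Minimal stand-in for EnrichedEvent so the snippet runs on its own.
    event_id: str
    severity: str


def sort_by_severity(rows: list[EventRow]) -> list[EventRow]:
    """Most severe first; unknown labels sort last; ties keep input order."""
    return sorted(
        rows,
        key=lambda r: SEVERITY_ORDER.get(r.severity.lower(), len(SEVERITY_ORDER)),
    )
```

Because sorted is stable, events with the same severity stay in the order the fan-out returned them, which matches the order of the input list.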

Run the agent

Create secrets

Get a You.com API key from the You.com platform (see the quickstart guide). Get an Anthropic API key from the Anthropic console.

Register both keys as Flyte secrets. The secret key names must match those declared in the TaskEnvironment:

flyte create secret youdotcom-api-key <YOUR_YOU_API_KEY>
flyte create secret internal-anthropic-api-key <YOUR_ANTHROPIC_API_KEY>

See Secrets for scoping and file-based secrets.

Run locally or remotely

From the example directory:

cd v2/tutorials/field_data_enrichment_agent
uv run --script main.py

To test locally without Flyte secrets:

export YOU_API_KEY=<YOUR_YOU_API_KEY>
export ANTHROPIC_API_KEY=<YOUR_ANTHROPIC_API_KEY>

uv run --script main.py

When the run completes, open the Flyte report to review enriched events with operational severity and timestamped You.com source citations for each incident.