Field data enrichment agent
Code available here.
This example demonstrates how to build a field-data enrichment agent for autonomous systems on Flyte. The agent enriches geo-tagged operational events — from autonomous vehicles, aircraft, satellites, or field sensors — with real-world public context: road closures, weather events, airspace changes, or local incidents tied to a geofence.
Raw operational data stays in your environment; only each event's location, country code, and event type are sent out. They form a public-web grounding query to the You.com Search API, which returns unified web and news results with freshness and country targeting. Claude, called via LiteLLM, then summarizes the relevant context for each geo-tagged event from those results.
Flyte provides:
- Fan-out parallelism across geo-tagged events
- `cache="auto"` so repeated geofence checks with the same inputs reuse prior results
- `@flyte.trace` on every external call for lineage
- Flyte reports with operational severity and per-incident citations
Setting up the environment
The agent runs in a TaskEnvironment with secrets for the You.com and Anthropic API keys, automatic caching, and a container image built from the uv script dependencies.
import asyncio
import json
import os
from dataclasses import dataclass, field
import flyte
# LiteLLM model identifier used for the Claude summarization calls.
MODEL = "anthropic/claude-haiku-4-5"

# Shared task environment for every task in this example.
#  - secrets: each Flyte secret is surfaced to the task as an environment
#    variable; `_you_get` reads YOU_API_KEY, and ANTHROPIC_API_KEY is the
#    variable LiteLLM looks for when calling Anthropic models.
#  - image: built from the uv inline-script dependencies of this file.
#  - cache="auto": task results are cached (see Flyte caching docs for how
#    the cache version is derived).
env = flyte.TaskEnvironment(
    name="field-data-enrichment",
    secrets=[
        flyte.Secret(key=secret_key, as_env_var=env_var)
        for secret_key, env_var in (
            ("youdotcom-api-key", "YOU_API_KEY"),
            ("internal-anthropic-api-key", "ANTHROPIC_API_KEY"),
        )
    ],
    image=flyte.Image.from_uv_script(__file__, name="field-data-enrichment", pre=True),
    resources=flyte.Resources(cpu="1", memory="1Gi"),
    cache="auto",
)
The Python packages are declared at the top of the file using the uv script style:
# /// script
# requires-python = "==3.13.*"
# dependencies = [
# "flyte>=2.4.0",
# "httpx>=0.27.0",
# "litellm>=1.72.0",
# ]
# ///
Data types
Each GeoEvent carries an event ID, location, ISO country code for geo-targeting, and an event type. Enriched events include a context summary, operational severity, and discrete incidents with source citations.
@dataclass
class GeoEvent:
    """A geo-tagged operational event to enrich with public context.

    Fields are positional in declaration order (DEFAULT_EVENTS relies on it).
    """

    # Caller-assigned identifier, copied through to the EnrichedEvent.
    event_id: str
    # Free-text place name, used verbatim in the search query and LLM prompt.
    location: str
    # ISO country code, passed as the search API's ``country`` parameter.
    country: str
    # e.g. "weather", "incident", "road_closure_check"; underscores become
    # spaces when the search query is built.
    event_type: str
@dataclass
class Incident:
    """A discrete incident extracted by the LLM, plus metadata of the hit it cites.

    ``description`` comes from the LLM; the remaining fields are copied from
    the cited SearchHit. When the citation cannot be resolved they are left
    empty and ``section`` is ``"web"``.
    """

    description: str
    source_url: str
    # Raw ``page_age`` string from the search API; may be empty.
    published: str
    domain: str = ""
    # Comma-joined author names, if the API provided any.
    author: str = ""
    favicon: str = ""
    snippet: str = ""
    # Search result section the hit came from: "news" or "web".
    section: str = "web"
@dataclass
class EnrichedEvent:
    """A GeoEvent after enrichment: LLM summary, severity, and cited incidents."""

    event_id: str
    location: str
    # LLM-written summary of the relevant public context.
    context_summary: str
    # LLM-assigned operational severity label; "none" when the LLM omits it.
    severity: str
    incidents: list[Incident] = field(default_factory=list)
@dataclass
class EnrichmentReport:
    """Output of the driver task: one EnrichedEvent per input event, in input order."""

    events: list[EnrichedEvent] = field(default_factory=list)
Search with the You.com Search API
The you_search helper calls the
You.com Search API with freshness and country parameters to retrieve location-relevant web and news results. See the
Search API reference for supported country codes and freshness values.
# You.com Search API endpoint, queried with GET by ``you_search``.
YOU_SEARCH_URL = "https://ydc-index.io/v1/search"


@dataclass
class SearchHit:
    """One normalized result from the You.com Search API."""

    title: str
    url: str
    # Host of ``url`` with "www." stripped (see ``_domain``).
    domain: str
    # First entry of the API's ``snippets``, else its ``description``.
    snippet: str
    # Raw ``page_age`` string from the API; may be empty.
    published: str
    # Comma-joined ``authors`` list; empty when none were returned.
    author: str
    # API-provided favicon URL or a You.com favicon-service fallback.
    favicon: str
    # Response section the hit came from: "news" or "web".
    section: str
def _domain(url: str) -> str:
from urllib.parse import urlparse
try:
return urlparse(url).netloc.replace("www.", "")
except Exception:
return ""
def _favicon(item: dict, url: str) -> str:
return item.get("favicon_url") or (
f"https://ydc-index.io/favicon?domain={_domain(url)}&size=128"
)
async def _you_get(url: str, params: dict, timeout: float = 60.0) -> dict:
"""GET with exponential backoff + jitter on 429 rate limits."""
import asyncio
import random
import httpx
headers = {"X-API-Key": os.environ["YOU_API_KEY"]}
async with httpx.AsyncClient(timeout=timeout) as client:
for attempt in range(7):
resp = await client.get(url, headers=headers, params=params)
if resp.status_code == 429 and attempt < 6:
wait = float(resp.headers.get("retry-after") or 0) or min(2**attempt, 30)
await asyncio.sleep(wait + random.uniform(0, 2))
continue
resp.raise_for_status()
return resp.json()
resp.raise_for_status()
return resp.json()
@flyte.trace
async def you_search(
    query: str, country: str, freshness: str = "day", count: int = 8
) -> list[SearchHit]:
    """Query the You.com Search API and flatten its results into SearchHits.

    News results come first, then web results, and each hit is tagged with
    the section it came from. A response without ``results`` yields ``[]``.
    """
    payload = await _you_get(
        YOU_SEARCH_URL,
        {
            "query": query,
            "count": count,
            "freshness": freshness,
            "country": country,
        },
    )
    by_section = payload.get("results", {})

    def to_hit(raw: dict, section: str) -> SearchHit:
        # Normalize one API result; missing fields become empty strings.
        link = raw.get("url", "")
        excerpts = raw.get("snippets") or []
        return SearchHit(
            title=raw.get("title", ""),
            url=link,
            domain=_domain(link),
            snippet=excerpts[0] if excerpts else raw.get("description", ""),
            published=raw.get("page_age", "") or "",
            author=", ".join(raw.get("authors") or []),
            favicon=_favicon(raw, link),
            section=section,
        )

    return [
        to_hit(raw, section)
        for section in ("news", "web")
        for raw in (by_section.get(section, []) or [])
    ]
Enrich one event
The enrich_event task builds a location- and type-scoped query, calls the You.com Search API, and asks Claude to summarize relevant real-world context, extract discrete incidents, and assign an operational severity — all grounded in the returned sources.
@env.task(retries=3)
async def enrich_event(event: GeoEvent, freshness: str) -> EnrichedEvent:
    """Ground one geo-tagged event in fresh public context.

    Builds a search query from the event's location and type and fetches hits
    with ``you_search`` (scoped to ``event.country`` and ``freshness``). It then
    asks the LLM for a summary, a severity, and incidents. Each incident cites
    a hit by 1-based ``source_index`` and is resolved back to that hit's
    metadata.

    The LLM's JSON is treated as untrusted. Missing or null fields fall back
    to defaults, and malformed incidents or indices are dropped or left
    uncited. Raising here instead would burn the task's retries re-running
    the search and LLM call.
    """
    query = f"{event.location} {event.event_type.replace('_', ' ')} road closure weather incident"
    hits = await you_search(query, country=event.country, freshness=freshness)

    # Number hits from 1 so the LLM can cite them by ``source_index``.
    evidence = "\n\n".join(
        f"[{i}] {h.title} ({h.published}) — {h.domain}\n{h.url}\n{h.snippet}"
        for i, h in enumerate(hits, start=1)
    )
    user = (
        f"Location: {event.location}\n"
        f"Event type: {event.event_type}\n\n"
        f"Search results:\n{evidence or 'No results.'}"
    )
    parsed = await llm_json(ENRICH_SYSTEM, user)
    if not isinstance(parsed, dict):
        parsed = {}

    def _text(value, default: str = "") -> str:
        # JSON null -> default, instead of the literal string "None".
        return default if value is None else str(value)

    def _source(it: dict) -> SearchHit | None:
        # Resolve a 1-based citation; unusable indices mean "no source".
        try:
            idx = int(it.get("source_index") or 0)
        except (TypeError, ValueError):
            return None
        return hits[idx - 1] if 1 <= idx <= len(hits) else None

    def _incident(it: dict) -> Incident:
        src = _source(it)
        return Incident(
            description=_text(it.get("description")),
            source_url=src.url if src else "",
            published=src.published if src else "",
            domain=src.domain if src else "",
            author=src.author if src else "",
            favicon=src.favicon if src else "",
            snippet=src.snippet if src else "",
            section=src.section if src else "web",
        )

    raw_incidents = parsed.get("incidents") or []
    if not isinstance(raw_incidents, list):
        raw_incidents = []
    incidents = [_incident(it) for it in raw_incidents if isinstance(it, dict)]
    return EnrichedEvent(
        event_id=event.event_id,
        location=event.location,
        context_summary=_text(parsed.get("context_summary")),
        severity=_text(parsed.get("severity"), "none"),
        incidents=incidents,
    )
Orchestration
The field_data_enrichment driver task fans out across all events and renders a Flyte report sorted by severity.
# Sample events used when the driver task is run without an ``events`` input.
# Each row is (event_id, location, ISO country code, event_type).
DEFAULT_EVENTS = [
    GeoEvent(event_id, location, country, event_type)
    for event_id, location, country, event_type in (
        ("evt-1", "Mountain View, CA", "US", "road_closure_check"),
        ("evt-2", "Tokyo, Japan", "JP", "weather"),
        ("evt-3", "Austin, TX", "US", "road_closure_check"),
        ("evt-4", "Phoenix, AZ", "US", "weather"),
        ("evt-5", "London, UK", "GB", "incident"),
        ("evt-6", "San Francisco, CA", "US", "incident"),
        ("evt-7", "Seattle, WA", "US", "weather"),
        ("evt-8", "Miami, FL", "US", "weather"),
        ("evt-9", "Denver, CO", "US", "road_closure_check"),
        ("evt-10", "Berlin, Germany", "DE", "incident"),
    )
]
@env.task(report=True)
async def field_data_enrichment(
    events: list[GeoEvent] = DEFAULT_EVENTS,
    freshness: str = "day",
) -> EnrichmentReport:
    """Fan out across geo-tagged events and enrich each with public context.

    Args:
        events: Events to enrich. The shared DEFAULT_EVENTS list is used as a
            literal task default and is never mutated here.
        freshness: Search freshness window forwarded to every ``enrich_event``.

    Returns:
        An EnrichmentReport whose events are in the same order as ``events``
        (``asyncio.gather`` preserves order). The rendered report is also
        published to the Flyte UI.
    """
    with flyte.group("enrich-events"):
        enriched = await asyncio.gather(
            *(enrich_event(e, freshness) for e in events)
        )
    report = EnrichmentReport(events=list(enriched))
    # do_flush=True already pushes the rendered report, so no separate
    # flush() call is needed afterwards.
    await flyte.report.replace.aio(_render_report(report), do_flush=True)
    return report
Run the agent
Create secrets
Get a You.com API key from the You.com platform (see the quickstart guide). Get an Anthropic API key from the Anthropic console.
Register both keys as Flyte secrets. The secret key names must match those declared in the TaskEnvironment:
flyte create secret youdotcom-api-key <YOUR_YOU_API_KEY>
flyte create secret internal-anthropic-api-key <YOUR_ANTHROPIC_API_KEY>
See Secrets for scoping and file-based secrets.
Run locally or remotely
From the example directory:
cd v2/tutorials/field_data_enrichment_agent
uv run --script main.py
To test locally without Flyte secrets, export the keys as the environment variables the tasks read:
export YOU_API_KEY=<YOUR_YOU_API_KEY>
export ANTHROPIC_API_KEY=<YOUR_ANTHROPIC_API_KEY>
uv run --script main.py
When the run completes, open the Flyte report to review enriched events with operational severity and timestamped You.com source citations for each incident.