Competitive intelligence agent
Code available here.
This example demonstrates how to build a continuous competitive and market intelligence agent on Flyte. The agent fans out across a list of competitors, pulls fresh, source-cited web and news results from the You.com Search API, and uses Claude via LiteLLM to extract structured deltas — pricing changes, product launches, funding events, leadership moves, and more — into a knowledge-graph-ready table.
You.com returns ranked web and news results with snippets and publication timestamps, giving the LLM attributable sources to cite. Flyte orchestrates the rest:
- Fan-out parallelism across competitors with asyncio.gather
- cache="auto" so converging parallel or repeat runs reuse prior You.com and LLM results when queries overlap
- @flyte.trace on every You.com and LLM call for full prompt → query → source lineage
- Flyte reports that render an HTML dashboard grouping deltas by competitor and category
Setting up the environment
The agent runs in a single TaskEnvironment with secrets for the You.com and Anthropic API keys, automatic caching, and a container image built from the uv script dependencies.
import asyncio
import json
from dataclasses import dataclass, field
import flyte
MODEL = "anthropic/claude-haiku-4-5"
env = flyte.TaskEnvironment(
    name="competitive-intelligence",
    secrets=[
        flyte.Secret(key="youdotcom-api-key", as_env_var="YOU_API_KEY"),
        flyte.Secret(key="internal-anthropic-api-key", as_env_var="ANTHROPIC_API_KEY"),
    ],
    image=flyte.Image.from_uv_script(__file__, name="competitive-intelligence", pre=True),
    resources=flyte.Resources(cpu="1", memory="1Gi"),
    cache="auto",
)
The Python packages are declared at the top of the file using the uv script style:
# /// script
# requires-python = "==3.13"
# dependencies = [
# "flyte>=2.4.0",
# "httpx>=0.27.0",
# "litellm>=1.72.0",
# ]
# ///
Data types
The agent models search hits, deltas, and the final report as dataclasses. Each Delta links back to a SearchHit that preserves You.com metadata — domain, publication date, author, and snippet.
@dataclass
class SearchHit:
    """A You.com Search result with its full structured metadata."""

    title: str
    url: str
    domain: str
    snippet: str
    published: str  # You.com page_age timestamp
    author: str
    favicon: str  # You.com favicon_url
    thumbnail: str
    section: str  # "news" or "web", as classified by You.com


@dataclass
class Delta:
    competitor: str
    category: str
    summary: str
    confidence: float
    source: SearchHit | None = None


@dataclass
class CompetitorWatch:
    competitor: str
    deltas: list[Delta] = field(default_factory=list)
    sources: list[SearchHit] = field(default_factory=list)


@dataclass
class IntelReport:
    watches: list[CompetitorWatch] = field(default_factory=list)

    @property
    def deltas(self) -> list[Delta]:
        # Flatten the per-competitor deltas into a single list.
        return [d for w in self.watches for d in w.deltas]
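To make the flattening done by the deltas property concrete, here is a minimal sketch using trimmed-down copies of the dataclasses. The SearchHit link is left out for brevity, and the competitor names and summaries are made-up sample values, not real findings.

```python
from dataclasses import dataclass, field


@dataclass
class Delta:
    competitor: str
    category: str
    summary: str
    confidence: float


@dataclass
class CompetitorWatch:
    competitor: str
    deltas: list[Delta] = field(default_factory=list)


@dataclass
class IntelReport:
    watches: list[CompetitorWatch] = field(default_factory=list)

    @property
    def deltas(self) -> list[Delta]:
        # Flatten every watch's deltas into one list, preserving watch order.
        return [d for w in self.watches for d in w.deltas]


# Made-up sample data for illustration only.
report = IntelReport(
    watches=[
        CompetitorWatch("ExampleCo", [Delta("ExampleCo", "pricing", "Sample pricing change", 0.8)]),
        CompetitorWatch("SampleCorp"),  # a competitor with no deltas this run
        CompetitorWatch("DemoLabs", [Delta("DemoLabs", "funding", "Sample funding round", 0.6)]),
    ]
)
flat = report.deltas
print([(d.competitor, d.category) for d in flat])
```

A competitor with no deltas contributes nothing to the flat list, so downstream code can iterate over report.deltas without checking each watch for emptiness.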
Search with the You.com Search API
The you_search helper calls the You.com Search API at https://ydc-index.io/v1/search. It requests unified web and news results with a freshness filter (day, week, month, or year) and returns structured hits the LLM can cite by index.
See the Search API reference for all supported parameters, including count, country, and search operators.
YOU_SEARCH_URL = "https://ydc-index.io/v1/search"
async def _you_get(url: str, params: dict, timeout: float = 60.0) -> dict:
    """GET with exponential backoff + jitter on 429 rate limits."""
    import asyncio
    import os
    import random

    import httpx

    headers = {"X-API-Key": os.environ["YOU_API_KEY"]}
    async with httpx.AsyncClient(timeout=timeout) as client:
        for attempt in range(7):
            resp = await client.get(url, headers=headers, params=params)
            if resp.status_code == 429 and attempt < 6:
                # Prefer a numeric Retry-After header; otherwise back off
                # exponentially, capped at 30 seconds, plus random jitter.
                wait = float(resp.headers.get("retry-after") or 0) or min(2**attempt, 30)
                await asyncio.sleep(wait + random.uniform(0, 2))
                continue
            resp.raise_for_status()
            return resp.json()
        # Defensive fallback: the final loop iteration already returns or raises.
        resp.raise_for_status()
        return resp.json()
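def _domain(url: str) -> str:
    from urllib.parse import urlparse

    try:
        # Strip only a leading "www." so hosts that merely contain it are untouched.
        return urlparse(url).netloc.removeprefix("www.")
    except Exception:
        return ""


def _favicon(item: dict, url: str) -> str:
    # Fall back to a favicon URL built from the domain when the API returns none.
    return item.get("favicon_url") or (
        f"https://ydc-index.io/favicon?domain={_domain(url)}&size=128"
    )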
@flyte.trace
async def you_search(query: str, count: int = 8, freshness: str = "week") -> list[SearchHit]:
    """Call the You.com Search API and return unified web + news hits."""
    params = {"query": query, "count": count, "freshness": freshness}
    data = await _you_get(YOU_SEARCH_URL, params)
    results = data.get("results", {})
    hits: list[SearchHit] = []
    # News hits come first, so they get the lowest citation indices.
    for section in ("news", "web"):
        for item in results.get(section, []) or []:
            snippets = item.get("snippets") or []
            url = item.get("url", "")
            hits.append(
                SearchHit(
                    title=item.get("title", ""),
                    url=url,
                    domain=_domain(url),
                    snippet=(snippets[0] if snippets else item.get("description", "")),
                    published=item.get("page_age", "") or "",
                    author=", ".join(item.get("authors") or []),
                    favicon=_favicon(item, url),
                    thumbnail=item.get("thumbnail_url", "") or "",
                    section=section,
                )
            )
    return hits
We use @flyte.trace to track intermediate steps within a task, like You.com API calls and LLM invocations. Each traced call appears as a span in the Flyte dashboard with its inputs and outputs captured.
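The retry rule inside _you_get is easier to reason about when pulled out as a pure function. The sketch below is illustrative only: backoff_wait is a hypothetical helper name, not part of the example's code, and it assumes, as the original expression does, that any Retry-After header carries a number of seconds (the header may also be an HTTP date, which float() cannot parse).

```python
import random
from typing import Optional


def backoff_wait(attempt: int, retry_after: Optional[str] = None, cap: float = 30.0) -> float:
    """Base wait in seconds (before jitter), mirroring the rule in _you_get."""
    # A non-zero numeric Retry-After wins; a missing or zero value falls
    # through to exponential backoff capped at `cap` seconds.
    from_header = float(retry_after or 0)
    return from_header or float(min(2**attempt, cap))


# _you_get sleeps only on attempts 0..5; the seventh attempt raises instead.
schedule = [backoff_wait(a) for a in range(6)]
print(schedule)

# Jitter of up to 2 seconds is added on top of the base wait.
jittered = backoff_wait(2) + random.uniform(0, 2)
print(f"attempt 2 sleeps between 4 and 6 seconds, e.g. {jittered:.2f}")
```

Under these assumptions the base waits add up to 61 seconds across six retries, so a persistently rate-limited call gives up after roughly one to one and a quarter minutes once jitter is included.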
Extract deltas with Claude
A shared llm_json helper routes to Claude through LiteLLM and parses structured JSON from the response.
@flyte.trace
async def llm_json(system: str, user: str) -> dict | list:
    """Call Claude via LiteLLM and parse a JSON response."""
    from litellm import acompletion

    resp = await acompletion(
        model=MODEL,
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ],
        temperature=0.0,
        max_tokens=2048,
    )
    content = resp.choices[0].message.content
    return _parse_json(content)


def _parse_json(text: str) -> dict | list:
    text = text.strip()
    # Unwrap a Markdown code fence, dropping an optional "json" language tag.
    if text.startswith("```"):
        text = text.split("```", 2)[1]
        if text.lstrip().startswith("json"):
            text = text.lstrip()[4:]
    # Keep only the span from the first opening bracket to the last closing one.
    start = min(
        (i for i in (text.find("{"), text.find("[")) if i != -1),
        default=0,
    )
    end = max(text.rfind("}"), text.rfind("]")) + 1
    return json.loads(text[start:end])
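As a quick check of what _parse_json tolerates, the sketch below runs a copy of the function against two made-up reply shapes: one wrapped in a Markdown code fence and one surrounded by chatter. The replies are invented for illustration, and the indentation of the copied function is reconstructed from the flattened listing.

```python
import json

FENCE = "`" * 3  # three backticks, built this way to keep the example readable


def _parse_json(text: str):
    text = text.strip()
    # Unwrap a Markdown code fence, dropping an optional "json" language tag.
    if text.startswith(FENCE):
        text = text.split(FENCE, 2)[1]
        if text.lstrip().startswith("json"):
            text = text.lstrip()[4:]
    # Keep only the span from the first opening bracket to the last closing one.
    start = min(
        (i for i in (text.find("{"), text.find("[")) if i != -1),
        default=0,
    )
    end = max(text.rfind("}"), text.rfind("]")) + 1
    return json.loads(text[start:end])


fenced_reply = FENCE + 'json\n{"deltas": []}\n' + FENCE
chatty_reply = 'Here is the result: [{"a": 1}] Hope it helps.'

fenced = _parse_json(fenced_reply)
chatty = _parse_json(chatty_reply)
print(fenced, chatty)

# A reply with no JSON at all still raises, so callers should expect it.
try:
    _parse_json("no structured data here")
    failed = False
except json.JSONDecodeError:
    failed = True
```

Because a reply without JSON raises json.JSONDecodeError, the retries=3 setting on the calling task is what gives a malformed response another chance.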
Watch one competitor
The watch_competitor task builds a category-scoped search query, calls the You.com Search API, and asks Claude to extract only changes that are supported by a specific search result. Each delta carries a confidence score and a link to its source hit.
@env.task(retries=3)
async def watch_competitor(
    competitor: str,
    categories: list[str],
    freshness: str,
) -> CompetitorWatch:
    """Search for fresh signals on one competitor and extract structured deltas."""
    query = (
        f"{competitor} "
        + " OR ".join(categories)
        + " announcement OR news OR update"
    )
    hits = await you_search(query, count=8, freshness=freshness)
    if not hits:
        return CompetitorWatch(competitor=competitor)

    # Number the hits from 1 so the model can cite them by index.
    evidence = "\n\n".join(
        f"[{i + 1}] {h.title} ({h.published}) — {h.domain}\n{h.url}\n{h.snippet}"
        for i, h in enumerate(hits)
    )
    user = (
        f"Competitor: {competitor}\n"
        f"Categories to watch: {', '.join(categories)}\n\n"
        f"Search results:\n{evidence}"
    )
    parsed = await llm_json(EXTRACT_SYSTEM, user)
    raw_deltas = parsed.get("deltas", []) if isinstance(parsed, dict) else []

    deltas: list[Delta] = []
    cited: list[SearchHit] = []
    for d in raw_deltas:
        # Map the 1-based source_index back to a hit; out-of-range means no source.
        idx = int(d.get("source_index", 0) or 0)
        src = hits[idx - 1] if 1 <= idx <= len(hits) else None
        if src is not None and src not in cited:
            cited.append(src)
        deltas.append(
            Delta(
                competitor=competitor,
                category=str(d.get("category", "unknown")),
                summary=str(d.get("summary", "")),
                confidence=float(d.get("confidence", 0.0) or 0.0),
                source=src,
            )
        )
    return CompetitorWatch(competitor=competitor, deltas=deltas, sources=cited)
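The task references EXTRACT_SYSTEM, which is not shown in this walkthrough. The sketch below guesses at a prompt that would fit the parsing code, based only on the fields that code reads (category, summary, confidence, source_index); the wording is hypothetical and the real prompt may differ. It also isolates the query construction and the 1-based citation lookup as hypothetical helpers, build_query and resolve_source, so their behavior can be checked without any API calls.

```python
# Hypothetical prompt: inferred from the fields watch_competitor reads,
# not taken from the example's source.
EXTRACT_SYSTEM = (
    "You extract competitive-intelligence deltas from numbered search results. "
    'Reply with JSON only: {"deltas": [{"category": str, "summary": str, '
    '"confidence": number between 0 and 1, "source_index": int}]}. '
    "source_index is the 1-based number of the supporting result. "
    "Omit any change that no result supports."
)


def build_query(competitor: str, categories: list) -> str:
    """Same string concatenation as the query in watch_competitor."""
    return f"{competitor} " + " OR ".join(categories) + " announcement OR news OR update"


def resolve_source(raw_delta: dict, hits: list):
    """Map a 1-based source_index onto hits; None when missing or out of range."""
    idx = int(raw_delta.get("source_index", 0) or 0)
    return hits[idx - 1] if 1 <= idx <= len(hits) else None


query = build_query("ExampleCo", ["pricing", "funding"])
print(query)

hits = ["hit-A", "hit-B", "hit-C"]  # stand-ins for SearchHit objects
sample_reply = {"deltas": [{"source_index": 2}, {"source_index": 7}, {"source_index": None}, {}]}
resolved = [resolve_source(d, hits) for d in sample_reply["deltas"]]
print(resolved)
```

Treating an invalid index as "no source" rather than an error keeps a single hallucinated citation from failing the whole task, at the cost of admitting deltas without attribution. A stricter variant could drop any delta whose source resolves to None.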
Orchestration
The competitive_intelligence driver task fans out across all competitors with asyncio.gather, aggregates the results, and renders a Flyte report.
@env.task(report=True)
async def competitive_intelligence(
    competitors: list[str] = [
        "Anthropic",
        "OpenAI",
        "Mistral AI",
        "Google DeepMind",
        "Cohere",
        "Perplexity AI",
        "xAI",
        "Hugging Face",
        "Databricks",
        "Together AI",
    ],
    categories: list[str] = [
        "pricing",
        "product launch",
        "model release",
        "funding",
        "leadership",
        "partnership",
    ],
    freshness: str = "week",
) -> IntelReport:
    """Fan out across competitors and aggregate structured deltas."""
    with flyte.group("watch-competitors"):
        # One watch_competitor call per competitor, all awaited concurrently.
        results = await asyncio.gather(
            *[watch_competitor(c, categories, freshness) for c in competitors]
        )
    report = IntelReport(watches=list(results))
    await flyte.report.replace.aio(_render_report(report), do_flush=True)
    await flyte.report.flush.aio()
    return report
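The fan-out pattern can be tried without Flyte or any API keys. In the sketch below, watch_stub is a hypothetical stand-in for watch_competitor that only sleeps, and the competitor names are placeholders. It shows a property the driver relies on: asyncio.gather returns results in input order, regardless of which call finishes first.

```python
import asyncio


async def watch_stub(competitor: str, delay: float) -> str:
    # Stand-in for watch_competitor: sleep to simulate search + LLM latency.
    await asyncio.sleep(delay)
    return f"watched:{competitor}"


async def fan_out(competitors: list) -> list:
    # Later entries get shorter delays, so they finish first,
    # yet gather still returns results in input order.
    delays = [0.03 - 0.01 * i for i in range(len(competitors))]
    return await asyncio.gather(
        *[watch_stub(c, d) for c, d in zip(competitors, delays)]
    )


results = asyncio.run(fan_out(["ExampleCo", "SampleCorp", "DemoLabs"]))
print(results)
```

In plain asyncio, gather propagates the first exception to the caller by default. Passing return_exceptions=True is one way to let the remaining calls report their results when a single competitor fails; whether that trade-off suits this agent depends on how partial reports should be handled.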
Run the agent
Create secrets
Get a You.com API key from the You.com platform (see the quickstart guide). Get an Anthropic API key from the Anthropic console.
Register both keys as Flyte secrets. The secret key names must match those declared in the TaskEnvironment:
flyte create secret youdotcom-api-key <YOUR_YOU_API_KEY>
flyte create secret internal-anthropic-api-key <YOUR_ANTHROPIC_API_KEY>
See Secrets for scoping and file-based secrets.
Run locally or remotely
From the example directory:
cd v2/tutorials/competitive_intelligence_agent
uv run --script main.py
Or pass custom competitors with the Flyte CLI:
flyte run main.py competitive_intelligence \
    --competitors '["Anthropic", "OpenAI"]'
To test locally without Flyte secrets, export the environment variables directly:
export YOU_API_KEY=<YOUR_YOU_API_KEY>
export ANTHROPIC_API_KEY=<YOUR_ANTHROPIC_API_KEY>
uv run --script main.py
When the run completes, open the Flyte report in the UI to review deltas grouped by competitor, each with a clickable You.com source citation.
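The _render_report helper that produces this dashboard is not shown in the walkthrough. As a rough idea of what grouping deltas by competitor and category could look like, here is a minimal sketch. The function name render_report_sketch, the dict-based input, and the HTML structure are all assumptions for illustration; the real helper works on IntelReport and likely produces richer markup.

```python
from collections import defaultdict
from html import escape


def render_report_sketch(deltas: list) -> str:
    """Group deltas by competitor, then category, and emit minimal HTML."""
    grouped = defaultdict(lambda: defaultdict(list))
    for d in deltas:
        grouped[d["competitor"]][d["category"]].append(d)

    parts = ["<div class='intel-report'>"]
    for competitor in sorted(grouped):
        parts.append(f"<h2>{escape(competitor)}</h2>")
        for category in sorted(grouped[competitor]):
            parts.append(f"<h3>{escape(category)}</h3><ul>")
            for d in grouped[competitor][category]:
                link = ""
                if d.get("url"):
                    # Escape the URL too, since it comes from search results.
                    link = f' <a href="{escape(d["url"], quote=True)}">source</a>'
                parts.append(
                    f"<li>{escape(d['summary'])} ({d['confidence']:.0%}){link}</li>"
                )
            parts.append("</ul>")
    parts.append("</div>")
    return "\n".join(parts)


# Made-up sample deltas for illustration only.
sample = [
    {"competitor": "ExampleCo", "category": "pricing",
     "summary": "Sample <b>price</b> change", "confidence": 0.8,
     "url": "https://example.com/a"},
    {"competitor": "DemoLabs", "category": "funding",
     "summary": "Sample round", "confidence": 0.5, "url": ""},
]
html_out = render_report_sketch(sample)
print(html_out)
```

Escaping every model- or web-derived string before it reaches the HTML is the main design point here, since summaries and URLs originate from untrusted search results and LLM output.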