LangGraph research agent

Code available here.

This tutorial combines LangGraph for agentic control flow with Flyte for durable compute. A research pipeline plans sub-topics, fans out ReAct agents that search the web with Tavily, synthesizes findings, and loops on quality gaps until the report is good enough. Each LangGraph step dispatches to a separate Flyte task so planning, research, synthesis, and quality checks appear independently in the run UI.

Flyte provides:

  • Per-step tasks visible in the Flyte UI while LangGraph orchestrates the graph.
  • Secrets for OpenAI and Tavily API keys.
  • Live HTML reports with Mermaid graph visualizations and the final synthesized report.

Define the task environment

langgraph_agent_research.py
main_img = flyte.Image.from_uv_script(__file__, name="langgraph-agent-research", pre=True)

env = flyte.TaskEnvironment(
    name="langgraph-agent-research",
    image=main_img,
    secrets=[
        flyte.Secret(key="internal-anthropic-api-key", as_env_var="ANTHROPIC_API_KEY"),
        flyte.Secret(key="tavily_api_key", as_env_var="TAVILY_API_KEY"),
    ],
    resources=flyte.Resources(cpu=2, memory="2Gi"),
)
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.4.0",
#    "langgraph>=1.0.7",
#    "langchain-openai",
#    "tavily-python",
#    ...
# ]
# ///

Orchestrate the pipeline

The research_pipeline task builds the LangGraph workflow, renders graph diagrams in a report tab, and runs the full plan → research → synthesize → quality-check loop.

langgraph_agent_research.py
@env.task(report=True)
async def research_pipeline(
    query: str,
    num_topics: int = 3,
    max_searches: int = 2,
    max_iterations: int = 2,
) -> PipelineResult:
    """
    Research pipeline workflow:
    1. LangGraph plans sub-topics via plan_topics Flyte task
    2. LangGraph fans out research via Send → each dispatches to research_topic Flyte task
    3. LangGraph synthesizes results via synthesize Flyte task
    4. LangGraph evaluates quality via quality_check Flyte task
    5. If gaps found, loops back to step 2
    """
    log.info(f"Starting research pipeline: {query}")

    anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
    tavily_api_key = os.getenv("TAVILY_API_KEY")

    # Build the pipeline graph, passing all Flyte tasks as compute backends
    pipeline = build_pipeline_graph(
        anthropic_api_key=anthropic_api_key,
        tavily_api_key=tavily_api_key,
        plan_task=plan_topics,
        research_task=research_topic,
        synthesize_task=synthesize,
        quality_check_task=quality_check,
        model=MODEL,
    )

    # Visualize the graphs in report tabs
    graph_tab = flyte.report.get_tab("Agent Graphs")

    png_bytes = pipeline.get_graph().draw_mermaid_png()
    img_b64 = base64.b64encode(png_bytes).decode()
    graph_tab.log(f"""\
<h2>Research Pipeline</h2>\
<img src="data:image/png;base64,{img_b64}" alt="Research pipeline" />""")

    subgraph = build_research_subgraph(anthropic_api_key, tavily_api_key, max_searches, model=MODEL)
    sub_png = subgraph.get_graph().draw_mermaid_png()
    sub_b64 = base64.b64encode(sub_png).decode()
    graph_tab.log(f"""\
<h2>Research Agent (ReAct)</h2>\
<img src="data:image/png;base64,{sub_b64}" alt="ReAct research agent" />""")
    await flyte.report.flush.aio()

    # Run the pipeline — LangGraph controls the flow, Flyte tasks run the compute
    result = await pipeline.ainvoke({
        "query": query,
        "num_topics": num_topics,
        "max_searches": max_searches,
        "max_iterations": max_iterations,
        "iteration": 0,
        "topics": [],
        "research_results": [],
        "synthesis": "",
        "score": 0,
        "gaps": [],
        "final_report": "",
    })

    # Build the final report
    final_report = result["final_report"]
    sub_reports = [TopicReport(**r) for r in result["research_results"]]
    score = result.get("score", 0)
    iteration = result.get("iteration", 1) - 1

    await flyte.report.replace.aio(f"""\
<h2>Research Report</h2>\
<p><b>Query:</b> {query}</p>\
<p><b>Quality:</b> {score}/10 after {iteration} iteration(s)</p>\
<hr/>{md_to_html(final_report)}""")
    await flyte.report.flush.aio()

    log.info(f"Research pipeline complete. Score: {score}/10, Iterations: {iteration}")
    return PipelineResult(
        query=query,
        report=final_report,
        sub_reports=sub_reports,
        score=score,
        iterations=iteration,
    )

Inside each research task, a ReAct subgraph (graph.py) uses @flyte.trace on Tavily search calls for observability.

Run the agent

Create secrets for Anthropic and Tavily:

flyte create secret internal-anthropic-api-key <YOUR_ANTHROPIC_API_KEY>
flyte create secret tavily_api_key <YOUR_TAVILY_API_KEY>

From the example directory:

cd v2/tutorials/langgraph_agent_research
uv run --script langgraph_agent_research.py

Or pass a custom query:

flyte run langgraph_agent_research.py research_pipeline --query "Compare quantum computing approaches"

Check the Agent Graphs report tab for the LangGraph diagram and the main report for the synthesized research output.