Run LLM-generated code
Code available here.
This example demonstrates how to run code generated by a large language model (LLM) using a ContainerTask.
The agent takes a user’s question, generates Flyte 2 code using the Flyte 2 documentation as context, and runs it in an isolated container.
If the execution fails, the agent reflects on the error and retries
up to a configurable limit until it succeeds.
Using ContainerTask ensures that all generated code runs in a secure, isolated environment, giving you full flexibility to execute arbitrary logic safely and reliably.
What this example demonstrates
- How to combine LLM generation with programmatic execution.
- How to run untrusted or dynamically generated code securely.
- How to iteratively improve code using agent-like behavior.
Setting up the agent environment
Let’s start by importing the necessary libraries and setting up the environment for the agent task.
This example follows the uv script format to declare dependencies.
# /// script
# requires-python = "==3.13"
# dependencies = [
#    "flyte>=0.2.0b33",
#    "langchain-core==0.3.66",
#    "langchain-openai==0.3.24",
#    "langchain-community==0.3.26",
#    "beautifulsoup4==4.13.4",
#    "docker==7.1.0",
# ]
# ///
import tempfile
from typing import Optional
from langchain_core.runnables import Runnable
from pydantic import BaseModel, Field
import flyte
from flyte.extras import ContainerTask
from flyte.io import File
env = flyte.TaskEnvironment(
    name="code_runner",
    secrets=[flyte.Secret(key="openai_api_key", as_env_var="OPENAI_API_KEY")],
    image=flyte.Image.from_uv_script(__file__, name="code-runner-agent"),
    resources=flyte.Resources(cpu=1),
)
You can set up access to the OpenAI API using a Flyte secret.
flyte create secret openai_api_key <YOUR_OPENAI_API_KEY>
We store the LLM-generated code in a structured format. This allows us to:
- Enforce consistent formatting
- Make debugging easier
- Log and analyze generations systematically
By capturing metadata alongside the raw code, we maintain transparency and make it easier to iterate or trace issues over time.
class Code(BaseModel):
    """Schema for code solutions to questions about Flyte v2."""

    prefix: str = Field(
        default="", description="Description of the problem and approach"
    )
    imports: str = Field(
        default="", description="Code block with just import statements"
    )
    code: str = Field(
        default="", description="Code block not including import statements"
    )
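For instance, a populated instance of this schema might look like the following. The values are illustrative only and are not produced by this example:

example_solution = Code(
    prefix="A minimal Flyte v2 task that returns a greeting.",
    imports="import flyte",
    code=(
        'env = flyte.TaskEnvironment(name="hello")\n'
        "\n"
        "@env.task\n"
        "async def hello() -> str:\n"
        '    return "Hello, Flyte!"\n'
    ),
)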
We then define a state model to persist the agent’s history across iterations. This includes previous messages, generated code, and any errors encountered.
Maintaining this history allows the agent to reflect on past attempts, avoid repeating mistakes, and iteratively improve the generated code.
class AgentState(BaseModel):
    messages: list[dict[str, str]] = Field(default_factory=list)
    generation: Code = Field(default_factory=Code)
    iterations: int = 0
    error: str = "no"
    output: Optional[str] = None
Retrieve docs
We define a task to load documents from a given URL and concatenate them into a single string. This string is then used as part of the LLM prompt.
We set max_depth=20 to avoid loading an excessive number of documents.
However, even with this limit, the resulting context can still be quite large.
To handle this, we use an LLM (GPT-4o in this case) that supports extended context windows.
Appending all documents into a single string can result in extremely large contexts, potentially exceeding the LLM’s token limit. If your dataset grows beyond what a single prompt can handle, there are a couple of strategies you can use. One option is to apply Retrieval-Augmented Generation (RAG), where you chunk the documents, embed them using a model, store the vectors in a vector database, and retrieve only the most relevant pieces at inference time.
An alternative approach is to pass references to full files into the prompt, allowing the LLM to decide which files are most relevant based on natural-language search over file paths, summaries, or even contents. This method assumes that only a subset of files will be necessary for a given task, and the LLM is responsible for navigating the structure and identifying what to read. While this can be a lighter-weight solution for smaller datasets, its effectiveness depends on how well the LLM can reason over file references and the reliability of its internal search heuristics.
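As a rough illustration of the RAG option, a retriever over the loaded documents could be assembled as in the sketch below. This is not part of the example: it assumes faiss-cpu and langchain-text-splitters are added to the script dependencies, and build_retriever is a hypothetical helper.

def build_retriever(docs):
    # Hypothetical helper (not part of this example): chunk, embed, and index the docs.
    from langchain_community.vectorstores import FAISS
    from langchain_openai import OpenAIEmbeddings
    from langchain_text_splitters import RecursiveCharacterTextSplitter

    # Split the loaded documents into overlapping chunks.
    splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
    chunks = splitter.split_documents(docs)

    # Embed the chunks and index them in an in-memory vector store (requires faiss-cpu).
    vectorstore = FAISS.from_documents(chunks, OpenAIEmbeddings())

    # At query time, return only the most relevant chunks instead of the full corpus.
    return vectorstore.as_retriever(search_kwargs={"k": 8})

This example keeps things simple and passes the full concatenated documentation to the model directly: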
@env.task
async def docs_retriever(url: str) -> str:
    from bs4 import BeautifulSoup
    from langchain_community.document_loaders.recursive_url_loader import (
        RecursiveUrlLoader,
    )

    loader = RecursiveUrlLoader(
        url=url, max_depth=20, extractor=lambda x: BeautifulSoup(x, "html.parser").text
    )
    docs = loader.load()

    # Sort the list based on the URLs and get the text
    d_sorted = sorted(docs, key=lambda x: x.metadata["source"])
    d_reversed = list(reversed(d_sorted))
    concatenated_content = "\n\n\n --- \n\n\n".join(
        [doc.page_content for doc in d_reversed]
    )
    return concatenated_content
Code generation
Next, we define a utility function to construct the LLM chain responsible for generating Python code from user input. This chain leverages a LangChain ChatPromptTemplate to structure the input and an OpenAI chat model to generate well-formed, Flyte 2-compatible Python scripts.
async def generate_code_gen_chain(debug: bool) -> Runnable:
    from langchain_core.prompts import ChatPromptTemplate
    from langchain_openai import ChatOpenAI

    # Code generation prompt
    code_gen_prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                """
You are a coding assistant with expertise in Python.
You are able to execute the Flyte v2 code locally in a sandbox environment.
Use the following pattern to execute the code:

<code>
if __name__ == "__main__":
    flyte.init()
    print(flyte.run(...))
</code>

Your response will be shown to the user.
Here is a full set of documentation:
-------
{context}
-------
Answer the user question based on the above provided documentation.
Ensure any code you provide can be executed with all required imports and variables defined.
Structure your answer with a description of the code solution.
Then list the imports. And finally list the functioning code block.
Here is the user question:""",
            ),
            ("placeholder", "{messages}"),
        ]
    )

    expt_llm = "gpt-4o" if not debug else "gpt-4o-mini"
    llm = ChatOpenAI(temperature=0, model=expt_llm)
    code_gen_chain = code_gen_prompt | llm.with_structured_output(Code)
    return code_gen_chain
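For reference, invoking this chain from inside any async task could look like the following minimal sketch; docs_text is a placeholder for the concatenated documentation string:

chain = await generate_code_gen_chain(debug=False)
solution = chain.invoke(
    {
        "context": docs_text,  # placeholder: the concatenated documentation
        "messages": [{"role": "user", "content": "Write a hello-world Flyte v2 task."}],
    }
)
# The structured output is a Code instance with the three fields described above.
print(solution.prefix)
print(solution.imports)
print(solution.code)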
We then define a generate task responsible for producing the code solution.
To improve clarity and testability, the output is structured in three parts:
a short summary of the generated solution, a list of necessary imports,
and the main body of executable code.
@env.task
async def generate(
    question: str, state: AgentState, concatenated_content: str, debug: bool
) -> AgentState:
    """
    Generate a code solution.

    Args:
        question (str): The user question
        state (AgentState): The current agent state
        concatenated_content (str): The concatenated docs content
        debug (bool): Debug mode

    Returns:
        state (AgentState): Updated state containing the new generation
    """
    print("---GENERATING CODE SOLUTION---")

    messages = state.messages
    iterations = state.iterations
    error = state.error

    # We have been routed back to generation with an error
    if error == "yes":
        messages += [
            {
                "role": "user",
                "content": (
                    "Now, try again. Invoke the code tool to structure the output "
                    "with a prefix, imports, and code block:"
                ),
            }
        ]

    code_gen_chain = await generate_code_gen_chain(debug)

    # Solution
    code_solution = code_gen_chain.invoke(
        {
            "context": concatenated_content,
            "messages": (
                messages if messages else [{"role": "user", "content": question}]
            ),
        }
    )
    messages += [
        {
            "role": "assistant",
            "content": f"{code_solution.prefix} \n Imports: {code_solution.imports} \n Code: {code_solution.code}",
        }
    ]

    return AgentState(
        messages=messages,
        generation=code_solution,
        iterations=iterations + 1,
        error=error,
        output=state.output,
    )
A ContainerTask then executes this code in an isolated container environment.
It takes the code as input, runs it safely, and returns the program’s output and exit code.
code_runner_task = ContainerTask(
    name="run_flyte_v2",
    image="ghcr.io/unionai-oss/flyte:py3.13-v2.0.0b0",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs={"script": File},
    outputs={"result": str, "exit_code": str},
    command=[
        "/bin/bash",
        "-c",
        (
            "set -o pipefail && "
            "uv run --script /var/inputs/script > /var/outputs/result 2>&1; "
            "echo $? > /var/outputs/exit_code"
        ),
    ],
    resources=flyte.Resources(cpu=1, memory="1Gi"),
)
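Calling the ContainerTask from another task looks like a regular async call, and its outputs come back in the order they are declared. A minimal sketch (hello.py stands in for any local script you want to execute):

@env.task
async def smoke_test() -> str:
    # Upload a local script and execute it inside the isolated container.
    result, exit_code = await code_runner_task(script=await File.from_local("hello.py"))
    print(f"exit code: {exit_code.strip()}")
    return result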
This task verifies that the generated code runs as expected. It tests the import statements first, then executes the full code. It records the output and any error messages in the agent state for further analysis.
@env.task
async def code_check(state: AgentState) -> AgentState:
    """
    Check code.

    Args:
        state (AgentState): The current agent state

    Returns:
        state (AgentState): Updated state with the error flag and execution output
    """
    print("---CHECKING CODE---")

    # State
    messages = state.messages
    code_solution = state.generation
    iterations = state.iterations

    # Get solution components
    imports = code_solution.imports.strip()
    code = code_solution.code.strip()

    # Create temp file for imports
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".py", delete=False
    ) as imports_file:
        imports_file.write(imports + "\n")
        imports_path = imports_file.name

    # Create temp file for code body
    with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as code_file:
        code_file.write(imports + "\n" + code + "\n")
        code_path = code_file.name

    # Check imports
    import_output, import_exit_code = await code_runner_task(
        script=await File.from_local(imports_path)
    )
    if import_exit_code.strip() != "0":
        print("---CODE IMPORT CHECK: FAILED---")
        error_message = [
            {
                "role": "user",
                "content": f"Your solution failed the import test: {import_output}",
            }
        ]
        messages += error_message
        return AgentState(
            generation=code_solution,
            messages=messages,
            iterations=iterations,
            error="yes",
            output=import_output,
        )
    else:
        print("---CODE IMPORT CHECK: PASSED---")

    # Check execution
    code_output, code_exit_code = await code_runner_task(
        script=await File.from_local(code_path)
    )
    if code_exit_code.strip() != "0":
        print("---CODE BLOCK CHECK: FAILED---")
        error_message = [
            {
                "role": "user",
                "content": f"Your solution failed the code execution test: {code_output}",
            }
        ]
        messages += error_message
        return AgentState(
            generation=code_solution,
            messages=messages,
            iterations=iterations,
            error="yes",
            output=code_output,
        )
    else:
        print("---CODE BLOCK CHECK: PASSED---")

    # No errors
    print("---NO CODE TEST FAILURES---")
    return AgentState(
        generation=code_solution,
        messages=messages,
        iterations=iterations,
        error="no",
        output=code_output,
    )
If an error occurs, a separate task reflects on the failure and generates a response. This reflection is added to the agent state to guide future iterations.
@env.task
async def reflect(
    state: AgentState, concatenated_content: str, debug: bool
) -> AgentState:
    """
    Reflect on errors.

    Args:
        state (AgentState): The current agent state
        concatenated_content (str): Concatenated docs content
        debug (bool): Debug mode

    Returns:
        state (AgentState): Updated state with the reflection appended to the messages
    """
    print("---REFLECTING---")

    # State
    messages = state.messages
    iterations = state.iterations
    code_solution = state.generation

    # Prompt reflection
    code_gen_chain = await generate_code_gen_chain(debug)

    # Add reflection
    reflections = code_gen_chain.invoke(
        {"context": concatenated_content, "messages": messages}
    )
    messages += [
        {
            "role": "assistant",
            "content": f"Here are reflections on the error: {reflections}",
        }
    ]
    return AgentState(
        generation=code_solution,
        messages=messages,
        iterations=iterations,
        error=state.error,
        output=state.output,
    )
Finally, we define a main task that runs the code agent and orchestrates the steps above.
If the code execution fails, we reflect on the error and retry until we reach the maximum number of iterations.
@env.task
async def main(
    question: str = (
        "Define a two-task pattern where the second catches OOM from the first and retries with more memory."
    ),
    url: str = "https://pre-release-v2.docs-builder.pages.dev/docs/byoc/user-guide/",
    max_iterations: int = 3,
    debug: bool = False,
) -> str:
    concatenated_content = await docs_retriever(url=url)
    state: AgentState = AgentState()
    iterations = 0

    while True:
        with flyte.group(f"code-generation-pass-{iterations + 1}"):
            state = await generate(question, state, concatenated_content, debug)
            state = await code_check(state)

            error = state.error
            iterations = state.iterations

            if error == "no" or iterations >= max_iterations:
                print("---DECISION: FINISH---")
                code_solution = state.generation
                prefix = code_solution.prefix
                imports = code_solution.imports
                code = code_solution.code
                code_output = state.output

                return f"""{prefix}
{imports}
{code}
Result of code execution:
{code_output}
"""
            else:
                print("---DECISION: RE-TRY SOLUTION---")
                state = await reflect(state, concatenated_content, debug)


if __name__ == "__main__":
    flyte.init_from_config("config.yaml")
    run = flyte.run(main)
    print(run.url)
Running the code agent
If things are working properly, you should see output similar to the following:
---GENERATING CODE SOLUTION---
---CHECKING CODE---
---CODE BLOCK CHECK: PASSED---
---NO CODE TEST FAILURES---
---DECISION: FINISH---
In this solution, we define two tasks using Flyte v2.
The first task, `oomer`, is designed to simulate an out-of-memory (OOM) error by attempting to allocate a large list.
The second task, `failure_recovery`, attempts to execute `oomer` and catches any OOM errors.
If an OOM error is caught, it retries the `oomer` task with increased memory resources.
This pattern demonstrates how to handle resource-related exceptions and dynamically adjust task configurations in Flyte workflows.
import asyncio
import flyte
import flyte.errors

env = flyte.TaskEnvironment(name="oom_example", resources=flyte.Resources(cpu=1, memory="250Mi"))

@env.task
async def oomer(x: int):
    large_list = [0] * 100000000  # Simulate OOM
    print(len(large_list))

@env.task
async def always_succeeds() -> int:
    await asyncio.sleep(1)
    return 42
...
You can run the code agent on a Flyte/Union cluster using the following command:
uv run --prerelease=allow agent.py
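To ask a different question, one option is to pass inputs when launching the run. This sketch assumes flyte.run forwards keyword arguments to the task, as in other Flyte 2 examples:

if __name__ == "__main__":
    flyte.init_from_config("config.yaml")
    # Assumption: flyte.run accepts task inputs as keyword arguments.
    run = flyte.run(main, question="Write a Flyte 2 task that retries on OOM with more memory.")
    print(run.url)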