# Flyte v2 Workflow Authoring and Execution Context
This document provides comprehensive context for authoring and running Flyte v2 workflows based on Union.ai's official documentation. It serves as a reference for AI assistants to understand how to properly write, configure, and execute Flyte v2 workflows.
## Overview
Flyte v2 is a workflow orchestrator that helps AI development teams rapidly ship high-quality code to production. Key characteristics:
- **Pure Python**: Workflow orchestration using native Python syntax
- **Asynchronous Model**: Built for modern async/await paradigms
- **Performance & Scale**: Optimized for complex AI workloads
- **Beta Status**: Currently in 2.0 beta with active development
## Getting Started
### Prerequisites and Setup
**Required Tools:**
- `uv` tool (Python package manager)
- `flyte` Python package
**Configuration Methods:**
1. **Configuration File** (`config.yaml`)
2. **Inline Configuration** (CLI flags)
3. **Environment Variables**
**Creating Configuration:**
```bash
flyte create config # Generates config.yaml
```
**Configuration Structure:**
```yaml
admin:
  endpoint: "https://your-union-instance.com"
image:
  builder: "local" # or "remote"
task:
  domain: "development"
  org: "your_org"
  project: "your_project"
```
**Configuration File Locations:**
- `./config.yaml` (current directory)
- `~/.union/config.yaml`
- `~/.flyte/config.yaml`
**Verification:**
```bash
flyte get config # Check current configuration
```
### Running Workflows
**Remote Execution:**
```bash
# Command line
flyte run hello.py main
```
```python
# Python
flyte.init_from_config()
flyte.run(main, name="Ada")
```
**Local Execution (Testing/Debugging):**
```bash
# Command line
flyte run --local hello.py main
```
```python
# Python
flyte.init_from_config()
flyte.with_runcontext(mode="local").run(main)
```
## Task Configuration
### Container Images
**Direct Image Reference:**
```python
env = flyte.TaskEnvironment(
    name="my_task_env",
    image="docker.io/myorg/myimage"
)
```
**Using flyte.Image Object:**
```python
image = (
    flyte.Image.from_debian_base()
    .with_apt_packages(["git"])
    .with_pip_packages(["pandas", "numpy"])
    .with_env_vars({"MY_VAR": "value"})
)

env = flyte.TaskEnvironment(
    name="my_env",
    image=image
)
```
**Image Builder Configuration:**
- `local`: Requires Docker login
- `remote`: Uses the Union ImageBuilder service (see the snippet below)
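For example, switching builders is a one-line change in `config.yaml` (a minimal sketch based on the configuration structure shown above):

```yaml
image:
  builder: "remote"  # delegate builds to the Union ImageBuilder service
```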
### Caching
**Cache Modes:**
```python
# Auto caching (recommended for most cases)
@env.task(cache="auto")
def my_task():
pass
# Manual cache version control
@env.task(cache="override", cache_version="v1.2")
def my_task():
pass
# Disable caching (default)
@env.task(cache="disable")
def my_task():
pass
```
**Cache Key Components:**
- Final inputs
- Task name
- Interface hash
- Cache version (see the sketch below)
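To see how the cache version participates in the key, a minimal sketch (the task name `featurize` is illustrative): with `cache="override"`, bumping `cache_version` changes the cache key and forces recomputation even when the inputs are identical:

```python
# Runs with cache_version="v1" populate one cache entry per distinct input.
@env.task(cache="override", cache_version="v1")
def featurize(data: list[float]) -> list[float]:
    return [x * 2 for x in data]

# Changing cache_version to "v2" (e.g., after editing the task logic)
# yields a different cache key, so the next run with the same inputs
# recomputes instead of reusing the "v1" results.
```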
**Best Practices:**
- Use "auto" caching for development and most production scenarios
- Avoid caching functions with side effects
- Use "override" for explicit cache control
- Consider performance and storage implications
### Secrets Management
**Creating Secrets:**
```bash
flyte create secret MY_SECRET_KEY my_secret_value
# Optional scoping: --project project_name --domain domain_name
```
**Using Secrets in Tasks:**
```python
import os

env = flyte.TaskEnvironment(
    name="secure_env",
    secrets=[
        flyte.Secret(key="MY_SECRET_KEY", as_env_var="MY_SECRET_ENV_VAR")
    ]
)

@env.task
def secure_task():
    secret_value = os.getenv("MY_SECRET_ENV_VAR")
    # Use secret_value safely (never return it from the task)
    ...
```
**Important:**
- Secrets are scoped at organization, project, or domain levels
- Never return secret values from tasks
- Always access via environment variables
### Reusable Containers
**Configuration:**
```python
env = flyte.TaskEnvironment(
    name="reusable_env",
    reusable=flyte.ReusePolicy(
        replicas=2,    # Number of container instances
        idle_ttl=300   # 5-minute idle timeout, in seconds
    )
)
```
**Best Use Cases:**
- Frequent, short-duration tasks (see the sketch below)
- Tasks with expensive initialization
- Batch processing
- Development/testing scenarios
**Avoid When:**
- Long-running tasks
- Large memory consumption without cleanup
- Tasks modifying global state
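Returning to the first use case above, a minimal sketch (task names are illustrative): a fanout of short tasks runs on a small pool of warm replicas instead of paying container startup for every invocation:

```python
import asyncio

env = flyte.TaskEnvironment(
    name="reusable_env",
    reusable=flyte.ReusePolicy(replicas=2, idle_ttl=300),
)

@env.task
async def tokenize(text: str) -> list[str]:
    # Short-duration work: cold-start time would otherwise dominate.
    return text.split()

@env.task
async def fanout(docs: list[str]) -> list[list[str]]:
    # Many short invocations share the two warm replicas.
    return await asyncio.gather(*[tokenize(doc) for doc in docs])
```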
## Task Programming
### Files and Directories
**Special Data Types:**
- `flyte.io.File`: For individual files
- `flyte.io.Dir`: For directories
- `flyte.io.DataFrame`: For structured data
**Key Features:**
- Offloaded data types (store references, not data)
- Efficient handling of large files
- Support for both sync and async methods
**Example Usage:**
```python
@env.task
def process_file(input_file: flyte.io.File) -> flyte.io.File:
    # Read file content
    content = input_file.read()
    # Process content
    result = process_data(content)
    # Create output file
    return flyte.io.File.from_string(result, "output.txt")

@env.task
def upload_local():
    # Upload local file
    return flyte.io.File.from_local("./local_file.txt")
```
### Dataclasses and Structures
**Supported Types:**
- Python dataclasses
- Pydantic models
- Nested structures
- Lists of dataclasses
**Example:**
```python
from dataclasses import dataclass
from typing import List

@dataclass
class InferenceRequest:
    model_name: str
    input_data: List[str]
    batch_size: int = 32

@env.task
def batch_inference(requests: List[InferenceRequest]) -> List[dict]:
    results = []
    for request in requests:
        # Process each request
        result = run_inference(request.model_name, request.input_data)
        results.append(result)
    return results
```
**Key Considerations:**
- Fields must be serializable
- Data is serialized/deserialized between tasks
- Can include Flyte's offloaded types as fields (see the sketch below)
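A minimal sketch of that last point (the class and field names are illustrative, and `dataset.path` is assumed to expose the file's URI): a dataclass can mix plain fields with an offloaded type such as `flyte.io.File`, in which case only a reference to the file travels between tasks:

```python
from dataclasses import dataclass

import flyte.io

@dataclass
class TrainingJob:
    run_name: str
    epochs: int
    dataset: flyte.io.File  # offloaded: a reference is serialized, not the bytes

@env.task
def describe(job: TrainingJob) -> str:
    return f"{job.run_name}: {job.epochs} epochs on {job.dataset.path}"
```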
### Reports
**Basic Report Usage:**
```python
@env.task(report=True)
def generate_report():
    flyte.report.log("<h1>Processing Started</h1>")
    # Do work
    process_data()
    flyte.report.log("<p>Processing completed successfully</p>")
    flyte.report.flush()  # Send to UI
```
**Advanced Reports:**
```python
import time

@env.task(report=True)
def streaming_report():
    tab = flyte.report.get_tab("Progress")
    for i in range(100):
        tab.replace(f"<div>Progress: {i}%</div>")
        flyte.report.flush()
        time.sleep(0.1)
```
### Error Handling
**Pattern:**
```python
@env.task
async def resilient_workflow():
    try:
        result = await potentially_failing_task()
        return result
    except flyte.errors.OOMError:
        # Retry with more resources
        return await retry_with_more_memory()
    finally:
        # Cleanup tasks always run
        await cleanup_task()
```
**Philosophy:**
- Treat certain errors as "expected and acceptable"
- Leverage Python's native error handling
- Support dynamic resource adjustment (see the sketch below)
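As one possible shape for the `retry_with_more_memory` helper used above (hedged: the `override` and `Resources` arguments shown are assumptions to verify against the API reference), the failing task can be re-invoked with a larger memory request:

```python
async def retry_with_more_memory():
    # Assumed API: task.override(...) returns a variant of the task with
    # adjusted settings; flyte.Resources sets the new request size.
    bigger = potentially_failing_task.override(
        resources=flyte.Resources(memory="8Gi")
    )
    return await bigger()
```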
### Traces
**Purpose:**
- Fine-grained observability of helper functions
- Checkpoint creation for workflow resumption
- Track execution time, inputs, outputs
**Usage:**
```python
@flyte.trace
async def expensive_api_call(query: str) -> dict:
    """External API call with tracing."""
    result = await api_client.query(query)
    return result

@env.task
async def main_task(topic: str):
    # This call will be traced
    data = await expensive_api_call(f"research {topic}")
    return process_data(data)
```
**Best For:**
- External API calls
- Expensive computations
- LLM interactions
- Non-deterministic operations
### Grouping Actions
**Visual Organization:**
```python
@env.task
async def complex_workflow(raw_data: dict):
    with flyte.group("data-validation"):
        validated_data = await validate_schema(raw_data)
        validated_data = await check_data_quality(validated_data)

    with flyte.group("model-training"):
        model = await train_model(validated_data)
        metrics = await evaluate_model(model)

    return model, metrics
```
**Purpose:**
- Logical clustering of task invocations
- Improved UI visualization
- No impact on execution behavior
### Fanout Patterns
**Parallel Execution:**
```python
import asyncio
from typing import List

@env.task
async def parallel_processing(items: List[str]):
    # Create task invocations
    tasks = [process_item(item) for item in items]
    # Execute in parallel
    results = await asyncio.gather(*tasks)
    return results
```
**Batched Processing:**
```python
@env.task
async def batched_fanout(large_dataset: List[dict]):
    batch_size = 100
    batches = [large_dataset[i:i + batch_size]
               for i in range(0, len(large_dataset), batch_size)]
    # Process batches in parallel
    batch_tasks = [process_batch(batch) for batch in batches]
    batch_results = await asyncio.gather(*batch_tasks)
    # Flatten the per-batch results
    return [item for batch in batch_results for item in batch]
```
### Notebooks Integration
**Initialization:**
```python
import flyte

flyte.init(
    endpoint="https://union.example.com",
    org="example_org",
    project="example_project",
    domain="development",
)
```
**Remote Interaction:**
```python
# Access existing runs
remote = flyte.remote()
runs = remote.list_runs()
# Download logs
run = remote.get_run(run_id)
logs = run.download_logs()
```
## Important Considerations
### Non-deterministic Behavior
- **Problem**: Randomness breaks task recovery and replay
- **Solution**: Use the `@flyte.trace` decorator for checkpointing (see the sketch below)
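A minimal sketch (the helper names are illustrative): trace the non-deterministic call so that recovery and replay reuse the recorded output instead of drawing a new value:

```python
import random

@flyte.trace
async def draw_seed() -> int:
    # The result is checkpointed on first execution; on replay the
    # recorded value is returned rather than re-rolling.
    return random.randint(0, 2**32 - 1)

@env.task
async def experiment() -> float:
    seed = await draw_seed()
    return await train_with_seed(seed)  # hypothetical downstream task
```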
### Type Safety
- **Note**: Not automatically guaranteed at workflow level (unlike v1)
- **Recommendation**: Use Python type hints and `mypy`
### Global State
- **Limitation**: No global state preservation across containers
- **Requirement**: State must be reconstructable through deterministic execution
### Driver Pod Sizing
- **Consideration**: Parent tasks orchestrating downstream tasks need appropriate resources
- **Avoid**: CPU-intensive operations in driver pods (see the sketch below)
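One way to express this split (hedged: the `resources` parameter and `flyte.Resources` fields are assumptions to check against the API reference): keep the orchestrating parent small and push heavy compute into child tasks in their own environment:

```python
import asyncio

driver_env = flyte.TaskEnvironment(
    name="driver",
    resources=flyte.Resources(cpu=1, memory="1Gi"),   # orchestration only
)
worker_env = flyte.TaskEnvironment(
    name="worker",
    resources=flyte.Resources(cpu=8, memory="32Gi"),  # heavy lifting
)

@worker_env.task
async def crunch(shard: list[float]) -> float:
    return sum(shard)

@driver_env.task
async def orchestrate(shards: list[list[float]]) -> float:
    # The driver only fans out and aggregates results.
    partials = await asyncio.gather(*[crunch(s) for s in shards])
    return sum(partials)
```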
### Memory Management
- **Risk**: Large data structures can cause OOM errors
- **Solution**: Use `flyte.io.File`, `flyte.io.Dir`, and `flyte.io.DataFrame` for a lower memory footprint (see the sketch below)
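A sketch of the pattern (the task name is illustrative): stream large outputs to local disk and return a `flyte.io.File` reference, so only the reference travels between tasks:

```python
import csv

@env.task
def export_rows(rows: list[dict]) -> flyte.io.File:
    # Stream to a local file instead of building one large in-memory blob.
    with open("rows.csv", "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=rows[0].keys())
        writer.writeheader()
        writer.writerows(rows)
    # Only a reference is passed downstream; consumers stream the data.
    return flyte.io.File.from_local("rows.csv")
```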
## Common Workflow Patterns
### Basic Task Definition
```python
import flyte

# Create environment
env = flyte.TaskEnvironment(
    name="my_environment",
    image="ghcr.io/unionai-oss/flyte:latest"
)

@env.task
def simple_task(x: int) -> int:
    return x * 2

@env.task
async def async_task(data: str) -> dict:
    result = await process_data_async(data)
    return result
```
### Workflow Composition
```python
@env.task
async def data_pipeline(input_data: List[str]) -> dict:
    # Parallel preprocessing
    preprocessed = await asyncio.gather(
        *[preprocess_item(item) for item in input_data]
    )

    # Sequential training steps
    with flyte.group("model-training"):
        features = await extract_features(preprocessed)
        model = await train_model(features)
        metrics = await evaluate_model(model)

    return {"model": model, "metrics": metrics}
```
### File Processing Pipeline
```python
import pandas as pd

@env.task
def process_dataset(input_dir: flyte.io.Dir) -> flyte.io.File:
    results = []
    # Process files in directory
    for file_path in input_dir.walk():
        if file_path.endswith('.csv'):
            data = pd.read_csv(file_path)
            processed = process_dataframe(data)
            results.append(processed)
    # Combine results
    combined = pd.concat(results)
    # Return as Flyte file
    return flyte.io.File.from_dataframe(combined, "processed_data.parquet")
```
This context provides comprehensive guidance for authoring and executing Flyte v2 workflows, covering all major concepts, patterns, and best practices from the official Union.ai documentation.