Flyte LLM context

The following document provides comprehensive LLM context for authoring and running Flyte 2 workflows. It serves as a reference for LLM-based AI assistants to understand how to properly write, configure, and execute Flyte v2 workflows.

Download the document `flyte-context.txt` or copy it directly from this page, below.

You can then add it to the context window of your LLM-based AI assistant to help it better understand Flyte 2 development.

# Flyte v2 Workflow Authoring and Execution Context

This document provides comprehensive context for authoring and running Flyte v2 workflows based on Union.ai's official documentation. It serves as a reference for AI assistants to understand how to properly write, configure, and execute Flyte v2 workflows.

## Overview

Flyte v2 is a workflow orchestrator designed to help AI development teams rapidly ship high-quality code to production. Key characteristics:

- **Pure Python**: Workflow orchestration using native Python syntax
- **Asynchronous Model**: Built for modern async/await paradigms
- **Performance & Scale**: Optimized for complex AI workloads
- **Beta Status**: Currently in 2.0 beta with active development

## Getting Started

### Prerequisites and Setup

**Required Tools:**
- `uv` tool (Python package manager)
- `flyte` Python package

**Configuration Methods:**
1. **Configuration File** (`config.yaml`)
2. **Inline Configuration** (CLI flags)
3. **Environment Variables**

**Creating Configuration:**
```bash
flyte create config  # Generates config.yaml
```

**Configuration Structure:**
```yaml
admin:
  endpoint: "https://your-union-instance.com"
image:
  builder: "local"  # or "remote"
task:
  domain: "development"
  org: "your_org"
  project: "your_project"
```

**Configuration File Locations:**
- `./config.yaml` (current directory)
- `~/.union/config.yaml`
- `~/.flyte/config.yaml`
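
The list above does not state which location takes precedence. As a minimal sketch, assuming the locations are checked in the order listed and the first existing file wins, the lookup could be expressed as follows. `CANDIDATES` and `find_config` are hypothetical names for illustration and are not part of the `flyte` package.

```python
from pathlib import Path
from typing import Iterable, Optional

# Assumed search order: the order in which the locations are listed above.
CANDIDATES = (
    Path("config.yaml"),                      # current directory
    Path.home() / ".union" / "config.yaml",
    Path.home() / ".flyte" / "config.yaml",
)


def find_config(candidates: Iterable[Path] = CANDIDATES) -> Optional[Path]:
    """Return the first candidate that exists as a file, or None."""
    for path in candidates:
        if path.is_file():
            return path
    return None
```

Running `flyte get config` remains the reliable way to see which configuration is actually in effect.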

**Verification:**
```bash
flyte get config  # Check current configuration
```

### Running Workflows

**Remote Execution:**
```bash
# Command line
flyte run hello.py main
```

```python
# Python
import flyte
from hello import main  # the task defined in hello.py

flyte.init_from_config()
flyte.run(main, name="Ada")
```

**Local Execution (Testing/Debugging):**
```bash
# Command line
flyte run --local hello.py main
```

```python
# Python
import flyte
from hello import main  # the task defined in hello.py

flyte.init_from_config()
flyte.with_runcontext(mode="local").run(main)
```

## Task Configuration

### Container Images

**Direct Image Reference:**
```python
env = flyte.TaskEnvironment(
    name="my_task_env",
    image="docker.io/myorg/myimage"
)
```

**Using flyte.Image Object:**
```python
image = flyte.Image.from_debian_base() \
    .with_apt_packages(["git"]) \
    .with_pip_packages(["pandas", "numpy"]) \
    .with_env_vars({"MY_VAR": "value"})

env = flyte.TaskEnvironment(
    name="my_env",
    image=image
)
```

**Image Builder Configuration:**
- `local`: Builds with your local Docker installation; requires a Docker login to the target registry
- `remote`: Uses Union ImageBuilder service

### Caching

**Cache Modes:**
```python
# Auto caching (recommended for most cases)
@env.task(cache="auto")
def my_task():
    pass

# Manual cache version control
@env.task(cache="override", cache_version="v1.2")
def my_task():
    pass

# Disable caching (default)
@env.task(cache="disable")
def my_task():
    pass
```

**Cache Key Components:**
- Final inputs
- Task name
- Interface hash
- Cache version
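
To illustrate why changing any one of these components produces a cache miss, the sketch below hashes the four components into a single digest. This is a conceptual illustration only: the function name `cache_key`, the JSON encoding, and the use of SHA-256 are assumptions, not Flyte's actual key derivation.

```python
import hashlib
import json
from typing import Any, Dict


def cache_key(task_name: str, interface_hash: str, cache_version: str,
              inputs: Dict[str, Any]) -> str:
    """Combine the four components into one deterministic digest."""
    payload = json.dumps(
        {
            "task": task_name,
            "interface": interface_hash,
            "version": cache_version,
            "inputs": inputs,
        },
        sort_keys=True,  # make the key independent of dict ordering
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

Two calls with identical components yield the same key and would hit the cache; bumping the cache version alone yields a different key and forces re-execution.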

**Best Practices:**
- Use "auto" caching for development and most production scenarios
- Avoid caching functions with side effects
- Use "override" for explicit cache control
- Consider performance and storage implications

### Secrets Management

**Creating Secrets:**
```bash
flyte create secret MY_SECRET_KEY my_secret_value
# Optional scoping: --project project_name --domain domain_name
```

**Using Secrets in Tasks:**
```python
import os

import flyte

env = flyte.TaskEnvironment(
    name="secure_env",  # illustrative name
    secrets=[
        flyte.Secret(key="MY_SECRET_KEY", as_env_var="MY_SECRET_ENV_VAR")
    ],
)

@env.task
def secure_task():
    secret_value = os.getenv("MY_SECRET_ENV_VAR")
    # Use secret_value safely
```

**Important:**
- Secrets are scoped at organization, project, or domain levels
- Never return secret values from tasks
- Always access via environment variables
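
One way to follow these rules inside a task body is to fail fast when the variable is missing and to log only a redacted form. The helpers below are a plain-Python sketch; `require_secret` and `mask` are hypothetical names, not Flyte APIs.

```python
import os


def require_secret(env_var: str) -> str:
    """Read a secret from the environment, failing loudly if it is absent."""
    value = os.getenv(env_var)
    if not value:
        # Name the variable in the error, never the value.
        raise RuntimeError(f"secret environment variable {env_var!r} is not set")
    return value


def mask(value: str, visible: int = 2) -> str:
    """Return a redacted form that is safe to write to logs."""
    if len(value) <= visible:
        return "*" * len(value)
    return value[:visible] + "*" * (len(value) - visible)
```

A task would call `require_secret("MY_SECRET_ENV_VAR")`, use the value locally, and return only non-sensitive results.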

### Reusable Containers

**Configuration:**
```python
env = flyte.TaskEnvironment(
    name="reusable_env",  # illustrative name
    reusable=flyte.ReusePolicy(
        replicas=2,        # Number of container instances
        idle_ttl=300       # Idle timeout in seconds (5 minutes)
    )
)
```

**Best Use Cases:**
- Frequent, short-duration tasks
- Tasks with expensive initialization
- Batch processing
- Development/testing scenarios

**Avoid When:**
- Long-running tasks
- Large memory consumption without cleanup
- Tasks modifying global state

## Task Programming

### Files and Directories

**Special Data Types:**
- `flyte.io.File`: For individual files
- `flyte.io.Dir`: For directories
- `flyte.io.DataFrame`: For structured data

**Key Features:**
- Offloaded data types (store references, not data)
- Efficient handling of large files
- Support for both sync and async methods

**Example Usage:**
```python
@env.task
def process_file(input_file: flyte.io.File) -> flyte.io.File:
    # Read file content
    content = input_file.read()

    # Process content
    result = process_data(content)

    # Create output file
    return flyte.io.File.from_string(result, "output.txt")

@env.task
def upload_local():
    # Upload local file
    return flyte.io.File.from_local("./local_file.txt")
```

### Dataclasses and Structures

**Supported Types:**
- Python dataclasses
- Pydantic models
- Nested structures
- Lists of dataclasses

**Example:**
```python
from dataclasses import dataclass
from typing import List

@dataclass
class InferenceRequest:
    model_name: str
    input_data: List[str]
    batch_size: int = 32

@env.task
def batch_inference(requests: List[InferenceRequest]) -> List[dict]:
    results = []
    for request in requests:
        # Process each request
        result = run_inference(request.model_name, request.input_data)
        results.append(result)
    return results
```

**Key Considerations:**
- Fields must be serializable
- Data is serialized/deserialized between tasks
- Can include Flyte's offloaded types as fields

### Reports

**Basic Report Usage:**
```python
@env.task(report=True)
def generate_report():
    flyte.report.log("<h1>Processing Started</h1>")

    # Do work
    process_data()

    flyte.report.log("<p>Processing completed successfully</p>")
    flyte.report.flush()  # Send to UI
```

**Advanced Reports:**
```python
import time

@env.task(report=True)
def streaming_report():
    tab = flyte.report.get_tab("Progress")

    for i in range(100):
        tab.replace(f"<div>Progress: {i}%</div>")
        flyte.report.flush()
        time.sleep(0.1)
```

### Error Handling

**Pattern:**
```python
@env.task
async def resilient_workflow():
    try:
        result = await potentially_failing_task()
        return result
    except flyte.errors.OOMError:
        # Retry with more resources
        return await retry_with_more_memory()
    finally:
        # Cleanup tasks always run
        await cleanup_task()
```

**Philosophy:**
- Treat certain errors as "expected and acceptable"
- Leverage Python's native error handling
- Support dynamic resource adjustment

### Traces

**Purpose:**
- Fine-grained observability of helper functions
- Checkpoint creation for workflow resumption
- Track execution time, inputs, outputs

**Usage:**
```python
@flyte.trace
async def expensive_api_call(query: str) -> dict:
    """External API call with tracing."""
    result = await api_client.query(query)
    return result

@env.task
async def main_task(topic: str):
    # This call will be traced
    data = await expensive_api_call(f"research {topic}")
    return process_data(data)
```

**Best For:**
- External API calls
- Expensive computations
- LLM interactions
- Non-deterministic operations
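
To show the kind of information a trace captures (inputs, outputs, execution time), here is a plain-Python analogue of a tracing decorator. It is not how `@flyte.trace` is implemented; `trace`, `TRACE_LOG`, and `fake_api_call` are hypothetical names used only for illustration.

```python
import asyncio
import functools
import time
from typing import Any, Awaitable, Callable, Dict, List

TRACE_LOG: List[Dict[str, Any]] = []


def trace(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    """Record inputs, output, and wall-clock duration of each call."""

    @functools.wraps(fn)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        start = time.perf_counter()
        result = await fn(*args, **kwargs)
        TRACE_LOG.append({
            "name": fn.__name__,
            "args": args,
            "kwargs": kwargs,
            "output": result,
            "seconds": time.perf_counter() - start,
        })
        return result

    return wrapper


@trace
async def fake_api_call(query: str) -> dict:
    await asyncio.sleep(0)  # stand-in for network latency
    return {"query": query, "hits": 3}
```

Each awaited call to `fake_api_call` appends one record to `TRACE_LOG`, which is the information a UI could later display per helper call.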

### Grouping Actions

**Visual Organization:**
```python
@env.task
async def complex_workflow(raw_data: dict):
    with flyte.group("data-validation"):
        validated_data = await validate_schema(raw_data)
        validated_data = await check_data_quality(validated_data)

    with flyte.group("model-training"):
        model = await train_model(validated_data)
        metrics = await evaluate_model(model)

    return model, metrics
```

**Purpose:**
- Logical clustering of task invocations
- Improved UI visualization
- No impact on execution behavior

### Fanout Patterns

**Parallel Execution:**
```python
import asyncio
from typing import List

@env.task
async def parallel_processing(items: List[str]):
    # Create task invocations
    tasks = [process_item(item) for item in items]

    # Execute in parallel
    results = await asyncio.gather(*tasks)
    return results
```

**Batched Processing:**
```python
@env.task
async def batched_fanout(large_dataset: List[dict]):
    batch_size = 100
    batches = [large_dataset[i:i+batch_size]
              for i in range(0, len(large_dataset), batch_size)]

    # Process batches in parallel
    batch_tasks = [process_batch(batch) for batch in batches]
    batch_results = await asyncio.gather(*batch_tasks)

    # Combine results
    return [item for batch in batch_results for item in batch]
```
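
The batching logic itself can be exercised without a Flyte backend. The following is a self-contained `asyncio` sketch of the same pattern; `chunk` and the toy `process_batch` are hypothetical helpers, and plain coroutines stand in for Flyte tasks.

```python
import asyncio
from typing import Dict, List


def chunk(items: List[Dict], size: int) -> List[List[Dict]]:
    """Split items into consecutive batches of at most `size` elements."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [items[i:i + size] for i in range(0, len(items), size)]


async def process_batch(batch: List[Dict]) -> List[Dict]:
    # Hypothetical per-batch work: tag each record as processed.
    await asyncio.sleep(0)
    return [{**record, "processed": True} for record in batch]


async def batched_fanout(dataset: List[Dict], batch_size: int = 100) -> List[Dict]:
    batches = chunk(dataset, batch_size)
    # gather returns results in the order the awaitables were passed in,
    # so flattening preserves the original record order.
    results = await asyncio.gather(*(process_batch(b) for b in batches))
    return [record for batch in results for record in batch]
```

Because `asyncio.gather` preserves argument order, the flattened output lines up with the input even though batches run concurrently.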

### Notebooks Integration

**Initialization:**
```python
import flyte

flyte.init(
    endpoint="https://union.example.com",
    org="example_org",
    project="example_project",
    domain="development",
)
```

**Remote Interaction:**
```python
# Access existing runs
remote = flyte.remote()
runs = remote.list_runs()

# Download logs
run = remote.get_run(run_id)
logs = run.download_logs()
```

## Important Considerations

### Non-deterministic Behavior
- **Problem**: Randomness breaks task recovery and replay
- **Solution**: Use `@flyte.trace` decorator for checkpointing
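
Why checkpointing helps can be shown with a small record-and-replay sketch: the non-deterministic value is computed once, stored, and reused on every later replay. This is a conceptual illustration with hypothetical names (`Checkpoints`, `pick_seed`), not a description of how `@flyte.trace` works internally.

```python
import random
from typing import Any, Callable, Dict


class Checkpoints:
    """Records each step's result the first time and replays it afterwards."""

    def __init__(self) -> None:
        self._store: Dict[str, Any] = {}

    def run(self, step: str, fn: Callable[[], Any]) -> Any:
        if step not in self._store:
            self._store[step] = fn()  # first execution: record the result
        return self._store[step]      # replay: reuse the recorded value


def pick_seed() -> int:
    return random.randint(0, 10**9)  # non-deterministic on purpose
```

Calling `run("seed", pick_seed)` twice on the same `Checkpoints` instance returns the same value both times, so a recovered execution sees the value the original execution saw.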

### Type Safety
- **Note**: Not automatically guaranteed at workflow level (unlike v1)
- **Recommendation**: Use Python type hints and `mypy`

### Global State
- **Limitation**: No global state preservation across containers
- **Requirement**: State must be reconstructable through deterministic execution

### Driver Pod Sizing
- **Consideration**: Parent tasks orchestrating downstream tasks need appropriate resources
- **Avoid**: CPU-intensive operations in driver pods

### Memory Management
- **Risk**: Large data structures can cause OOM errors
- **Solution**: Use `flyte.io.File`, `flyte.io.Dir`, and `flyte.io.DataFrame` for lower memory footprint

## Common Workflow Patterns

### Basic Task Definition
```python
import flyte

# Create environment
env = flyte.TaskEnvironment(
    name="my_environment",
    image="ghcr.io/unionai-oss/flyte:latest"
)

@env.task
def simple_task(x: int) -> int:
    return x * 2

@env.task
async def async_task(data: str) -> dict:
    result = await process_data_async(data)
    return result
```

### Workflow Composition
```python
import asyncio
from typing import List

@env.task
async def data_pipeline(input_data: List[str]) -> dict:
    # Parallel preprocessing
    preprocessed = await asyncio.gather(
        *[preprocess_item(item) for item in input_data]
    )

    # Sequential training steps
    with flyte.group("model-training"):
        features = await extract_features(preprocessed)
        model = await train_model(features)
        metrics = await evaluate_model(model)

    return {"model": model, "metrics": metrics}
```

### File Processing Pipeline
```python
import pandas as pd

@env.task
def process_dataset(input_dir: flyte.io.Dir) -> flyte.io.File:
    results = []

    # Process files in directory
    for file_path in input_dir.walk():
        if file_path.endswith('.csv'):
            data = pd.read_csv(file_path)
            processed = process_dataframe(data)
            results.append(processed)

    # Combine results
    combined = pd.concat(results)

    # Return as Flyte file
    return flyte.io.File.from_dataframe(combined, "processed_data.parquet")
```

This context provides comprehensive guidance for authoring and executing Flyte v2 workflows, covering all major concepts, patterns, and best practices from the official Union.ai documentation.