Flyte LLM context

The following document provides comprehensive LLM context for authoring and running Flyte workflows. It serves as a reference for LLM-based AI assistants to understand how to properly write, configure, and execute Flyte workflows.

Download the document as flyte-context.txt, or copy it directly from the section below.

You can then add it to the context window of your LLM-based AI assistant to help it better understand Flyte development.

# Flyte v1 Workflow Authoring and Execution Context

This document provides comprehensive context for authoring and running Flyte v1 workflows based on Union.ai's official documentation. It serves as a reference for AI assistants to understand how to properly write, configure, and execute Flyte v1 workflows.

## Overview

Flyte v1 is an open-source platform for orchestrating AI workflows with a focus on reliability, scalability, and reproducibility. Key characteristics:

- **Reusable, immutable tasks and workflows**: Versioned entities that ensure reproducibility
- **Declarative resource provisioning**: Task-level resource management
- **GitOps-style versioning**: Version control integration
- **Strongly-typed interfaces**: Type safety between tasks
- **Caching and checkpointing**: Performance optimization
- **Task parallelism**: Concurrent execution capabilities
- **Dynamic workflow creation**: Runtime workflow generation

## Deployment Options

1. **Union.ai Serverless**: Fully managed cloud environment with zero infrastructure management
2. **Union.ai BYOC (Bring Your Own Cloud)**: Managed by Union.ai on your infrastructure
3. **Union.ai Self-managed**: Full control of data, code, and infrastructure

## Getting Started

### Prerequisites and Setup

**System Requirements:**
- Python: `>=3.9,<3.13` (recommended: 3.12)
- `uv` package manager

**Installation Steps:**
```bash
# Install uv (package manager)
curl -LsSf https://astral.sh/uv/install.sh | sh

# Install Python
uv python install 3.12

# Install Union CLI
uv tool install union

# Configure cluster connection
union create login --host <union-host-url>

# Verify configuration
union info
```

**Configuration:**
- Default config location: `~/.union/config.yaml`
- Can override with `UNION_CONFIG` environment variable or `--config` flag
- Remove deprecated `~/.unionai/` directory to avoid conflicts
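
A minimal sketch of what `~/.union/config.yaml` typically contains (keys follow the `flytectl` configuration conventions that the Union CLI builds on; your endpoint will differ):

```yaml
admin:
  # Replace with your Union.ai host
  endpoint: dns:///<union-host-url>
  insecure: false
  authType: Pkce
```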

### First Project Setup

**Create Project in UI:**
1. Click "All projects"
2. Select "New Project"
3. Name the project

**Initialize Local Project:**
```bash
union init --template union-simple my-project
```

**Generated Project Structure:**
```
├── LICENSE
├── README.md
├── hello_world.py
├── pyproject.toml
└── uv.lock
```

## Core Concepts

### Tasks

Tasks are fundamental computational units with these characteristics:

**Key Features:**
- **Independently Executable**: Can run in isolation and be unit tested locally
- **Strongly Typed**: Input/output validation at deployment using Python type annotations
- **Containerized**: Run in independent Kubernetes pods with custom dependencies
- **Versioned and Immutable**: Identified by project, domain, and name; updating creates a new version

**Task Types:**
- Standard Python tasks
- Map tasks (for parallel processing)
- Raw container tasks
- Plugin tasks

**Basic Task Definition:**
```python
from union import ImageSpec, Resources, task

# Define container image
image_spec = ImageSpec(
    name="my-image",
    requirements="requirements.txt",
    registry="ghcr.io/myorg"
)

@task(
    container_image=image_spec,
    requests=Resources(cpu="1", mem="1Gi"),
    limits=Resources(cpu="2", mem="2Gi"),
    cache=True
)
def my_task(x: int, y: str) -> str:
    return f"Processing {x}: {y}"
```

### Workflows

Workflows compile tasks into directed acyclic graphs (DAGs):

**Workflow Types:**
1. **Standard Workflows**: Basic workflow composition
2. **Subworkflows**: Nested within other workflows for modularity
3. **Dynamic Workflows**: Flexible runtime workflow generation
4. **Imperative Workflows**: Programmatic workflow construction

**Basic Workflow Definition:**
```python
from union import workflow

@workflow
def my_workflow(name: str = "world") -> str:
    greeting = say_hello(name=name)
    processed = process_greeting(greeting=greeting)
    return processed
```

**Important Conventions:**
- Use keyword arguments when invoking tasks/workflows
- Type hints are mandatory
- Tasks/workflows must be defined at top-level module scope

### Launch Plans

Launch plans are workflow templates that include:
- A specific workflow
- Partial or complete workflow inputs
- Optional notifications and schedules

**Characteristics:**
- Every workflow has a default launch plan
- Launch plans are versioned
- Multiple launch plans per workflow allowed
- Can be registered via CLI or programmatically
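
As a sketch, a scheduled launch plan with default inputs might look like the following, assuming the `union` package re-exports the flytekit-style `LaunchPlan` and `CronSchedule` APIs, and using the `my_workflow` example from above:

```python
from union import CronSchedule, LaunchPlan

daily_lp = LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="my_workflow_daily",
    default_inputs={"name": "world"},
    schedule=CronSchedule(schedule="0 8 * * *"),  # every day at 08:00
)
```

Running `union register` on the module registers the launch plan alongside the workflow.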

### Actors

Actors enable container reuse between tasks to reduce startup overhead:

```python
from union import ActorEnvironment, Resources

actor = ActorEnvironment(
    name="my-actor",
    replica_count=1,
    ttl_seconds=30,
    requests=Resources(cpu="2", mem="300Mi")
)

@actor.task
def actor_task() -> str:
    return "hello from actor"
```

**Benefits:**
- Minimize startup costs for complex initialization
- Maintain persistent environment across executions
- Support for `@actor_cache` decorator
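
A sketch of `@actor_cache`, which memoizes expensive initialization within an actor replica (the import path is assumed; check the Union.ai docs for the exact location):

```python
from union import ActorEnvironment, actor_cache  # import path assumed

actor = ActorEnvironment(name="cached-actor", replica_count=1, ttl_seconds=300)

@actor_cache
def load_corpus(path: str) -> str:
    # Runs once per actor replica; later tasks on the same replica reuse the value
    with open(path) as f:
        return f.read()

@actor.task
def score(path: str, query: str) -> int:
    corpus = load_corpus(path)
    return corpus.count(query)
```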

### Artifacts

Artifacts are intermediate outputs with unique identifiers:

**Features:**
- Up to 10 partition keys for metadata
- Optional `time_partition` for execution timestamp
- Queryable to find recent versions
- Lineage tracking in UI
- Semantic indexing of outputs
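
A sketch of declaring and emitting a partitioned artifact (API assumed from the Union.ai artifact docs; the artifact and task names are hypothetical):

```python
from typing import Annotated

from union import Artifact, task

RegionReport = Artifact(name="region_report", partition_keys=["region"])

@task
def build_report(region: str) -> Annotated[str, RegionReport]:
    report = f"sales report for {region}"
    # Bind the partition value when emitting the artifact
    return RegionReport.create_from(report, region=region)
```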

### Caching

**Configuration Levels:**
- Task level: `@task(cache=True)`
- Workflow level
- Launch plan level

**Cache Configuration:**
```python
from union import Cache, task

@task(cache=Cache(version="1.0", serialize=True))
def cached_task(x: int) -> int:
    return expensive_computation(x)
```

**Cache Key Components:**
- Project
- Domain
- Cache version
- Node signature
- Input values

**Cache Features:**
- `ignored_inputs`: Exclude specific inputs from cache key
- `serialize`: Ensure only one instance runs concurrently
- Custom hash methods for complex objects
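
For example, the first two features might be combined as follows (a sketch using the `Cache` options named above):

```python
from union import Cache, task

@task(cache=Cache(version="1.1", serialize=True, ignored_inputs=("verbose",)))
def transform(x: int, verbose: bool = False) -> int:
    # Toggling `verbose` does not invalidate the cache; serialize=True ensures
    # identical runs execute one at a time and share the cached result
    if verbose:
        print(f"transforming {x}")
    return x * 2
```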

### Workspaces

VSCode development environments for Union.ai:

**Features:**
- Develop and debug tasks/workflows
- Production-like environment
- Persistent file storage
- Custom container images
- Resource specification (CPU, memory, GPU)
- Secrets integration

## Authentication

**Three Authentication Methods:**

1. **PKCE (Default)**: For local machines with browser access
2. **DeviceFlow**: For remote machines without browser (SSH sessions)
3. **ClientSecret**: For CI/CD and automation using `UNION_API_KEY`
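
For the ClientSecret flow, CI/CD jobs typically export the key before invoking the CLI (a sketch; the file and workflow names are hypothetical):

```bash
# The API key is provisioned ahead of time and stored in the CI secret store
export UNION_API_KEY="<api-key-from-ci-secret-store>"

# Subsequent CLI calls authenticate non-interactively
union run --remote workflows/pipeline.py my_workflow
```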

**Configuration:**
- Config location: `~/.union/config.yaml`
- Override with `UNION_CONFIG` environment variable
- Use `--config` flag for commands

## Project Structure

**Recommended Directory Layout:**
```
├── .github/workflows/        # CI/CD workflows
├── .gitignore
├── docs/
├── src/
│   ├── core/                # Business logic
│   ├── tasks/               # Individual tasks
│   ├── workflows/           # Workflow definitions
│   └── orchestration/       # Helper constructs
├── uv.lock
└── pyproject.toml
```

**Key Principles:**
- Include `__init__.py` files for proper imports
- Separate concerns into different directories
- Store each task in its own file/module
- Use docstrings for Union.ai UI context

## Running Workflows

### Local Execution
```bash
# Set up environment
uv sync
source .venv/bin/activate

# Run locally for testing
union run hello_world.py hello_world_wf --name="everybody"
```

### Remote Execution
```bash
# Register and run on Union.ai
union run --remote --project my-project --domain development hello_world.py hello_world_wf

# Registration only (no immediate execution)
union register --project my-project --domain development .
```

### Execution Interface
- Navigate to Workflows in UI
- Search and select workflow
- Click "Launch Workflow"
- Configure execution parameters
- Monitor workflow status and graph

## Data Input/Output

### Files and Directories

Flyte v1 uses special types for efficient data handling:
- `FlyteFile`: Individual files
- `FlyteDirectory`: Directory structures
- `StructuredDataset`: Structured data (DataFrames)
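
A minimal sketch of passing a `FlyteFile` between tasks; Flyte uploads the file to blob storage and hands downstream tasks a reference (the import uses the flytekit path, which the `union` SDK builds on):

```python
from flytekit.types.file import FlyteFile
from union import task

@task
def write_report(text: str) -> FlyteFile:
    path = "/tmp/report.txt"
    with open(path, "w") as f:
        f.write(text)
    # The local file is uploaded when the task returns
    return FlyteFile(path=path)

@task
def count_words(report: FlyteFile) -> int:
    # Downloading happens transparently when the file is opened
    with open(report, "r") as f:
        return len(f.read().split())
```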

### Dataclasses

**Full Support for Complex Data Structures:**
```python
from dataclasses import dataclass
from typing import List

from union import task

@dataclass
class ProcessingRequest:
    model_name: str
    input_data: List[str]
    batch_size: int = 32

@task
def process_batch(request: ProcessingRequest) -> dict:
    # Process the request
    return {"status": "completed", "items": len(request.input_data)}
```

**Key Features:**
- All variables must be type-annotated
- Support for nested dataclasses
- Can include Flyte-specific types
- JSON input support for CLI execution
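
For example, the dataclass above can be supplied as inline JSON when launching from the CLI (a sketch; the file name is hypothetical):

```bash
union run process.py process_batch \
  --request '{"model_name": "resnet", "input_data": ["a", "b"], "batch_size": 16}'
```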

### Enums

**String-only Enums:**
```python
from enum import Enum

class Coffee(Enum):
    ESPRESSO = "espresso"
    AMERICANO = "americano"
    LATTE = "latte"

@task
def prepare_coffee(order: Coffee) -> str:
    return f"Preparing {order.value}"
```

**Characteristics:**
- Only string values supported
- First value used as default
- Cannot be optional
- Automatically converted by Union.ai

### Framework Integrations

**PyTorch Support:**
- Native PyTorch tensor passing
- `PyTorchCheckpoint` for model serialization
- Automatic device conversion (GPU/CPU)
- Support for hyperparameters, optimizer state

**TensorFlow Support:**
- `tf.keras.Model` serialization
- `TFRecordFile` and `TFRecordsDirectory` types
- Configurable compression and buffer settings
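
A sketch of returning a `PyTorchCheckpoint` (flytekit-style API from `flytekit.extras.pytorch`; the model here is a stand-in):

```python
import torch.nn as nn
from flytekit.extras.pytorch import PyTorchCheckpoint
from union import task

class TinyNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(4, 2)

@task
def train() -> PyTorchCheckpoint:
    model = TinyNet()
    # Bundles the module state with hyperparameters for downstream tasks
    return PyTorchCheckpoint(module=model, hyperparameters={"lr": 1e-3})
```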

## Programming Patterns

### Chaining Entities

**Sequential Execution with `>>` Operator:**
```python
from union import workflow

# task_0, task_1, task_2 are assumed to be @task-decorated functions
@workflow
def chain_tasks_wf():
    t0_promise = task_0()
    t1_promise = task_1()
    t2_promise = task_2()

    t0_promise >> t1_promise >> t2_promise
```

**Note**: The `>>` chaining operator is not supported when running in a local Python environment

### Conditionals

**Dynamic Branching:**
```python
from union import conditional, workflow

# large_number_task, small_number_task, and zero_or_negative_task are
# assumed to be @task-decorated functions returning str
@workflow
def conditional_wf(x: int) -> str:
    return (
        conditional("branch_logic")
        .if_(x > 10)
        .then(large_number_task(x=x))
        .elif_(x > 0)
        .then(small_number_task(x=x))
        .else_()
        .then(zero_or_negative_task(x=x))
    )
```

**Features:**
- Uses bitwise operators (`&`, `|`) in place of the logical `and`/`or`
- Supports `.if_()`, `.elif_()`, `.else_()` methods
- Nested conditionals allowed
- Boolean method checks (`.is_true()`, `.is_false()`)
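
To illustrate the first point: comparisons on promises are combined with `&`/`|`, since Python's `and`/`or` cannot be overloaded (a sketch; `in_range_task` and `out_of_range_task` are hypothetical `@task` functions):

```python
@workflow
def range_wf(x: int) -> str:
    return (
        conditional("range_check")
        .if_((x > 0) & (x < 10))  # bitwise &, not `and`
        .then(in_range_task(x=x))
        .else_()
        .then(out_of_range_task(x=x))
    )
```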

## Workflow Building Best Practices

### Task Decomposition Strategy

**Consider These Factors:**
1. **Runtime Requirements**: Separate tasks with different computational needs
2. **Caching Performance**: Fine-grained checkpoints minimize redundant work
3. **Interruptible Tasks**: Enable cheaper spot instances

### Parallelization

**Map Tasks and Dynamic Workflows:**
```python
from typing import List

from union import map_task, task, workflow

@task
def process_item(item: str) -> str:
    return f"processed: {item}"

@workflow
def parallel_workflow(items: List[str]) -> List[str]:
    return map_task(process_item)(item=items)
```

**Key Principles:**
- "Parallelize early and often"
- Consider task overhead for very short operations
- Use Actors to reduce task startup costs

### Caching Strategy

**Best Practices:**
- Enable caching after functionality stabilizes
- Add explicit cache keys for control
- Useful during development and resource-constrained production
- Use the `--overwrite-cache` flag to force re-execution (see the sketch below)
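
A sketch of bypassing the cache at execution time, assuming the flag mirrors `pyflyte run`'s `--overwrite-cache`:

```bash
union run --remote --overwrite-cache hello_world.py hello_world_wf
```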

## Resource Management

### Task-Level Resource Configuration

```python
from union import Resources, task

@task(
    requests=Resources(cpu="1", mem="1Gi", gpu="1"),
    limits=Resources(cpu="2", mem="2Gi", gpu="1")
)
def resource_intensive_task() -> None:
    pass
```

### Resource Monitoring

**Resources Dashboard Features:**
- Workflow and task execution insights
- Resource consumption tracking
- Filtering by project, domain, time period
- Quota management using Kubernetes namespaces

### Cost Allocation

**Cost Tracking:**
- Total cost estimation (memory, CPU, GPU, unused compute)
- Resource utilization metrics
- Breakdowns by project, domain, workflow/task
- 60-day lookback period
- Stacked bar charts for cost visualization

**Cost Components:**
- Allocated cost (direct resource usage)
- Overhead cost (infrastructure costs)
- Proportional allocation methodology

## Administration

### User Management

**Role-Based Access Control:**

**Built-in Policies:**
- **Admin**: Full permissions across organization
- **Contributor**: Register and execute workflows, tasks, launch plans
- **Viewer**: View workflows, tasks, launch plans, executions

**Actions Available:**
- `administer_project`: Manage project resources
- `manage_permissions`: Manage user/application policies
- `create_flyte_executions`: Launch new executions
- `register_flyte_inventory`: Register workflows, tasks, launch plans
- `view_flyte_executions`: View execution history
- `view_flyte_inventory`: View registered entities

**Management Features:**
- Custom roles via `uctl` CLI
- Organization, project, or domain level policies
- Multiple policies per user with combined permissions

### Secrets Management

**Creating Secrets:**
```bash
# Command line creation
union create secret my_secret_name

# From file
union create secret my_secret -f /path/to/secret_file

# With scoping
union create secret my_secret --project proj --domain dev
```

**Using Secrets in Code:**
```python
from union import Secret, current_context

@task(secret_requests=[Secret(key="my_secret")])
def secure_task():
    secret_value = current_context().secrets.get(key="my_secret")
    # Use secret safely

# Mount as file
@task(secret_requests=[
    Secret(key="my_secret", mount_requirement=Secret.MountType.FILE)
])
def file_secret_task():
    pass
```

**Secret Operations:**
```bash
union get secret      # List secrets
union update secret   # Update existing secret
union delete secret   # Remove secret
```

**Best Practices:**
- Do not return secret values from tasks
- Use `env_var` for libraries expecting specific names
- Secrets only accessible within defined scope
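
A sketch of the `env_var` option mentioned above, for libraries that read a fixed environment variable (the secret name is hypothetical):

```python
import os

from union import Secret, task

@task(secret_requests=[Secret(key="openai_api_key", env_var="OPENAI_API_KEY")])
def call_llm() -> str:
    # The client library reads OPENAI_API_KEY from the environment directly
    assert "OPENAI_API_KEY" in os.environ
    return "ok"
```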

## Common Workflow Patterns

### Basic Workflow Structure
```python
import union
from union import ImageSpec, Resources

# Define container environment
image_spec = ImageSpec(
    name="my-workflow",
    requirements="requirements.txt",
    registry="ghcr.io/myorg"
)

@union.task(container_image=image_spec)
def data_preprocessing(raw_data: str) -> str:
    """Preprocess raw data."""
    return raw_data.strip().lower()  # stand-in for real cleaning logic

@union.task(
    container_image=image_spec,
    requests=Resources(cpu="2", mem="4Gi")
)
def model_training(processed_data: str) -> str:
    """Train ML model."""
    return f"model trained on: {processed_data}"  # stand-in for real training logic

@union.workflow
def ml_pipeline(input_data: str) -> str:
    """Complete ML training pipeline."""
    processed = data_preprocessing(raw_data=input_data)
    model = model_training(processed_data=processed)
    return model
```

### Advanced Parallel Processing
```python
from typing import List

import union
from union import map_task, workflow

@union.task
def process_batch(batch: List[str]) -> List[str]:
    return [item.upper() for item in batch]

@workflow
def batch_processing_workflow(all_items: List[str]) -> List[str]:
    # Split into batches
    batch_size = 100
    batches = [all_items[i:i+batch_size]
              for i in range(0, len(all_items), batch_size)]

    # Process batches in parallel using map_task
    results = map_task(process_batch)(batch=batches)

    # Flatten results
    return [item for batch in results for item in batch]
```

### Dynamic Workflow Example
```python
from typing import List

import union

@union.task
def dynamic_task(index: int) -> str:
    return f"task {index}"

@union.dynamic
def dynamic_workflow(num_tasks: int) -> List[str]:
    results = []
    for i in range(num_tasks):
        results.append(dynamic_task(index=i))
    return results
```

### Error Handling and Retries
```python
from datetime import timedelta

from union import task

@task(
    retries=3,
    cache=True,
    timeout=timedelta(minutes=10)
)
def robust_task(data: str) -> str:
    """Task with retry logic and timeout."""
    try:
        return process_data(data)  # process_data: application logic assumed defined elsewhere
    except Exception as e:
        # Log error and re-raise for retry
        print(f"Task failed: {e}")
        raise
```

## Troubleshooting and FAQ

### Common Issues

**Authentication Problems:**
- Remove deprecated `~/.unionai/` directory
- Install `keyring` service on Linux
- Avoid committing API keys to version control
- Use appropriate auth method for environment (PKCE/DeviceFlow/ClientSecret)

**Performance Optimization:**
- Use caching strategically after functionality stabilizes
- Consider Actors for tasks with expensive initialization
- Implement fine-grained task decomposition for checkpoints
- Use interruptible tasks with spot instances

**Resource Management:**
- Default limit: 10,000 concurrent executions per cluster
- Control parallelism with `max_parallelism` and `concurrency`
- Monitor resource utilization via dashboard
- Set appropriate resource requests and limits
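
A sketch of capping parallelism on a launch plan (the `max_parallelism` parameter from the flytekit-style `LaunchPlan` API; `my_workflow` is assumed defined elsewhere):

```python
from union import LaunchPlan

throttled_lp = LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="my_workflow_throttled",
    max_parallelism=50,  # at most 50 nodes run concurrently per execution
)
```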

**Import Resolution:**
- Use `--copy-all` flag for complex dependencies
- Use `union register` instead of direct imports
- Ensure proper `__init__.py` files in directories

### Data Storage and Security

**Data Handling:**
- Data stored in Union.ai's internal object store within data plane
- Users can change raw data storage location
- Support for custom blob store integration
- Raw data remains in user's data plane for security

**Machine Configuration:**
- Machine types can be changed via Node Group Configuration form
- Resource requirements specified at task level
- Support for GPU, memory, and CPU customization

This comprehensive context provides detailed guidance for authoring, configuring, and executing Flyte v1 workflows, covering all major concepts, patterns, and best practices from the official Union.ai documentation.