# Flyte v2 Workflow Authoring and Execution Context This document provides comprehensive context for authoring and running Flyte v2 workflows based on Union.ai's official documentation. It serves as a reference for AI assistants to understand how to properly write, configure, and execute Flyte v2 workflows. ## Overview Flyte v2 is a workflow orchestrator focused on AI development teams to rapidly ship high-quality code to production. Key characteristics: - **Pure Python**: Workflow orchestration using native Python syntax - **Asynchronous Model**: Built for modern async/await paradigms - **Performance & Scale**: Optimized for complex AI workloads - **Beta Status**: Currently in 2.0 beta with active development ## Getting Started ### Prerequisites and Setup **Required Tools:** - `uv` tool (Python package manager) - `flyte` Python package **Configuration Methods:** 1. **Configuration File** (`config.yaml`) 2. **Inline Configuration** (CLI flags) 3. **Environment Variables** **Creating Configuration:** ```bash flyte create config # Generates config.yaml ``` **Configuration Structure:** ```yaml admin: endpoint: "https://your-union-instance.com" image: builder: "local" # or "remote" task: domain: "development" org: "your_org" project: "your_project" ``` **Configuration File Locations:** - `./config.yaml` (current directory) - `~/.union/config.yaml` - `~/.flyte/config.yaml` **Verification:** ```bash flyte get config # Check current configuration ``` ### Running Workflows **Remote Execution:** ```bash # Command line flyte run hello.py main # Python flyte.init_from_config() flyte.run(main, name="Ada") ``` **Local Execution (Testing/Debugging):** ```bash # Command line flyte run --local hello.py main # Python flyte.init_from_config() flyte.with_runcontext(mode="local").run(main) ``` ## Task Configuration ### Container Images **Direct Image Reference:** ```python env = flyte.TaskEnvironment( name="my_task_env", image="docker.io/myorg/myimage" ) ``` **Using flyte.Image Object:** ```python image = flyte.Image.from_debian_base() \ .with_apt_packages(["git"]) \ .with_pip_packages(["pandas", "numpy"]) \ .with_env_vars({"MY_VAR": "value"}) env = flyte.TaskEnvironment( name="my_env", image=image ) ``` **Image Builder Configuration:** - `local`: Requires Docker login - `remote`: Uses Union ImageBuilder service ### Caching **Cache Modes:** ```python # Auto caching (recommended for most cases) @env.task(cache="auto") def my_task(): pass # Manual cache version control @env.task(cache="override", cache_version="v1.2") def my_task(): pass # Disable caching (default) @env.task(cache="disable") def my_task(): pass ``` **Cache Key Components:** - Final inputs - Task name - Interface hash - Cache version **Best Practices:** - Use "auto" caching for development and most production scenarios - Avoid caching functions with side effects - Use "override" for explicit cache control - Consider performance and storage implications ### Secrets Management **Creating Secrets:** ```bash flyte create secret MY_SECRET_KEY my_secret_value # Optional scoping: --project project_name --domain domain_name ``` **Using Secrets in Tasks:** ```python env = flyte.TaskEnvironment( secrets=[ flyte.Secret(key="MY_SECRET_KEY", as_env_var="MY_SECRET_ENV_VAR") ] ) @env.task def secure_task(): secret_value = os.getenv("MY_SECRET_ENV_VAR") # Use secret_value safely ``` **Important:** - Secrets are scoped at organization, project, or domain levels - Never return secret values from tasks - Always access via environment variables ### Reusable Containers **Configuration:** ```python env = flyte.TaskEnvironment( reusable=flyte.ReusePolicy( replicas=2, # Number of container instances idle_ttl=300 # 5 minutes idle timeout ) ) ``` **Best Use Cases:** - Frequent, short-duration tasks - Tasks with expensive initialization - Batch processing - Development/testing scenarios **Avoid When:** - Long-running tasks - Large memory consumption without cleanup - Tasks modifying global state ## Task Programming ### Files and Directories **Special Data Types:** - `flyte.io.File`: For individual files - `flyte.io.Dir`: For directories - `flyte.io.DataFrame`: For structured data **Key Features:** - Offloaded data types (store references, not data) - Efficient handling of large files - Support for both sync and async methods **Example Usage:** ```python @env.task def process_file(input_file: flyte.io.File) -> flyte.io.File: # Read file content content = input_file.read() # Process content result = process_data(content) # Create output file return flyte.io.File.from_string(result, "output.txt") @env.task def upload_local(): # Upload local file return flyte.io.File.from_local("./local_file.txt") ``` ### Dataclasses and Structures **Supported Types:** - Python dataclasses - Pydantic models - Nested structures - Lists of dataclasses **Example:** ```python @dataclass class InferenceRequest: model_name: str input_data: List[str] batch_size: int = 32 @env.task def batch_inference(requests: List[InferenceRequest]) -> List[dict]: results = [] for request in requests: # Process each request result = run_inference(request.model_name, request.input_data) results.append(result) return results ``` **Key Considerations:** - Fields must be serializable - Data is serialized/deserialized between tasks - Can include Flyte's offloaded types as fields ### Reports **Basic Report Usage:** ```python @env.task(report=True) def generate_report(): flyte.report.log("

Processing Started

") # Do work process_data() flyte.report.log("

Processing completed successfully

") flyte.report.flush() # Send to UI ``` **Advanced Reports:** ```python @env.task(report=True) def streaming_report(): tab = flyte.report.get_tab("Progress") for i in range(100): tab.replace(f"
Progress: {i}%
") flyte.report.flush() time.sleep(0.1) ``` ### Error Handling **Pattern:** ```python @env.task async def resilient_workflow(): try: result = await potentially_failing_task() return result except flyte.errors.OOMError: # Retry with more resources return await retry_with_more_memory() finally: # Cleanup tasks always run await cleanup_task() ``` **Philosophy:** - Treat certain errors as "expected and acceptable" - Leverage Python's native error handling - Support dynamic resource adjustment ### Traces **Purpose:** - Fine-grained observability of helper functions - Checkpoint creation for workflow resumption - Track execution time, inputs, outputs **Usage:** ```python @flyte.trace async def expensive_api_call(query: str) -> dict: """External API call with tracing.""" result = await api_client.query(query) return result @env.task async def main_task(topic: str): # This call will be traced data = await expensive_api_call(f"research {topic}") return process_data(data) ``` **Best For:** - External API calls - Expensive computations - LLM interactions - Non-deterministic operations ### Grouping Actions **Visual Organization:** ```python @env.task async def complex_workflow(): with flyte.group("data-validation"): validated_data = await validate_schema(raw_data) validated_data = await check_data_quality(validated_data) with flyte.group("model-training"): model = await train_model(validated_data) metrics = await evaluate_model(model) return model, metrics ``` **Purpose:** - Logical clustering of task invocations - Improved UI visualization - No impact on execution behavior ### Fanout Patterns **Parallel Execution:** ```python @env.task async def parallel_processing(items: List[str]): # Create task invocations tasks = [process_item(item) for item in items] # Execute in parallel results = await asyncio.gather(*tasks) return results ``` **Batched Processing:** ```python @env.task async def batched_fanout(large_dataset: List[dict]): batch_size = 100 batches = [large_dataset[i:i+batch_size] for i in range(0, len(large_dataset), batch_size)] # Process batches in parallel batch_tasks = [process_batch(batch) for batch in batches] batch_results = await asyncio.gather(*batch_tasks) # Combine results return [item for batch in batch_results for item in batch] ``` ### Notebooks Integration **Initialization:** ```python import flyte flyte.init( endpoint="https://union.example.com", org="example_org", project="example_project", domain="development", ) ``` **Remote Interaction:** ```python # Access existing runs remote = flyte.remote() runs = remote.list_runs() # Download logs run = remote.get_run(run_id) logs = run.download_logs() ``` ## Important Considerations ### Non-deterministic Behavior - **Problem**: Randomness breaks task recovery and replay - **Solution**: Use `@flyte.trace` decorator for checkpointing ### Type Safety - **Note**: Not automatically guaranteed at workflow level (unlike v1) - **Recommendation**: Use Python type hints and `mypy` ### Global State - **Limitation**: No global state preservation across containers - **Requirement**: State must be reconstructable through deterministic execution ### Driver Pod Sizing - **Consideration**: Parent tasks orchestrating downstream tasks need appropriate resources - **Avoid**: CPU-intensive operations in driver pods ### Memory Management - **Risk**: Large data structures can cause OOM errors - **Solution**: Use `flyte.io.File`, `flyte.io.Dir`, and `flyte.io.DataFrame` for lower memory footprint ## Common Workflow Patterns ### Basic Task Definition ```python import flyte # Create environment env = flyte.TaskEnvironment( name="my_environment", image="ghcr.io/unionai-oss/flyte:latest" ) @env.task def simple_task(x: int) -> int: return x * 2 @env.task async def async_task(data: str) -> dict: result = await process_data_async(data) return result ``` ### Workflow Composition ```python @env.task async def data_pipeline(input_data: List[str]) -> dict: # Parallel preprocessing preprocessed = await asyncio.gather( *[preprocess_item(item) for item in input_data] ) # Sequential training steps with flyte.group("model-training"): features = await extract_features(preprocessed) model = await train_model(features) metrics = await evaluate_model(model) return {"model": model, "metrics": metrics} ``` ### File Processing Pipeline ```python @env.task def process_dataset(input_dir: flyte.io.Dir) -> flyte.io.File: results = [] # Process files in directory for file_path in input_dir.walk(): if file_path.endswith('.csv'): data = pd.read_csv(file_path) processed = process_dataframe(data) results.append(processed) # Combine results combined = pd.concat(results) # Return as Flyte file return flyte.io.File.from_dataframe(combined, "processed_data.parquet") ``` This context provides comprehensive guidance for authoring and executing Flyte v2 workflows, covering all major concepts, patterns, and best practices from the official Union.ai documentation.