# Flyte v2 Workflow Authoring and Execution Context

This document provides comprehensive context for authoring and running Flyte v2 workflows based on Union.ai's official documentation. It serves as a reference for AI assistants to understand how to properly write, configure, and execute Flyte v2 workflows.

## Overview

Flyte v2 is a workflow orchestrator that helps AI development teams rapidly ship high-quality code to production.

Key characteristics:

- **Pure Python**: Workflow orchestration using native Python syntax
- **Asynchronous Model**: Built for modern async/await paradigms
- **Performance & Scale**: Optimized for complex AI workloads
- **Beta Status**: Currently in 2.0 beta with active development

## Getting Started

### Prerequisites and Setup

**Required Tools:**

- `uv` tool (Python package manager)
- `flyte` Python package

**Configuration Methods:**

1. **Configuration File** (`config.yaml`)
2. **Inline Configuration** (CLI flags)
3. **Environment Variables**

**Creating Configuration:**

```bash
flyte create config  # Generates config.yaml
```

**Configuration Structure:**

```yaml
admin:
  endpoint: "https://your-union-instance.com"
image:
  builder: "local"  # or "remote"
task:
  domain: "development"
  org: "your_org"
  project: "your_project"
```

**Configuration File Locations:**

- `./config.yaml` (current directory)
- `~/.union/config.yaml`
- `~/.flyte/config.yaml`

**Verification:**

```bash
flyte get config  # Check current configuration
```

### Running Workflows

**Remote Execution:**

```bash
# Command line
flyte run hello.py main

# Python
flyte.init_from_config()
flyte.run(main, name="Ada")
```

**Local Execution (Testing/Debugging):**

```bash
# Command line
flyte run --local hello.py main

# Python
flyte.init_from_config()
flyte.with_runcontext(mode="local").run(main)
```

## Task Configuration

### Container Images

**Direct Image Reference:**

```python
env = flyte.TaskEnvironment(
    name="my_task_env",
    image="docker.io/myorg/myimage"
)
```

**Using flyte.Image Object:**

```python
image = flyte.Image.from_debian_base() \
    .with_apt_packages(["git"]) \
    .with_pip_packages(["pandas", "numpy"]) \
    .with_env_vars({"MY_VAR": "value"})

env = flyte.TaskEnvironment(
    name="my_env",
    image=image
)
```

**Image Builder Configuration:**

- `local`: Requires Docker login
- `remote`: Uses Union ImageBuilder service

### Caching

**Cache Modes:**

```python
# Auto caching (recommended for most cases)
@env.task(cache="auto")
def my_task():
    pass

# Manual cache version control
@env.task(cache="override", cache_version="v1.2")
def my_task():
    pass

# Disable caching (default)
@env.task(cache="disable")
def my_task():
    pass
```

**Cache Key Components:**

- Final inputs
- Task name
- Interface hash
- Cache version

**Best Practices:**

- Use "auto" caching for development and most production scenarios
- Avoid caching functions with side effects
- Use "override" for explicit cache control
- Consider performance and storage implications

### Secrets Management

**Creating Secrets:**

```bash
flyte create secret MY_SECRET_KEY my_secret_value
# Optional scoping: --project project_name --domain domain_name
```

**Using Secrets in Tasks:**

```python
import os

env = flyte.TaskEnvironment(
    secrets=[
        flyte.Secret(key="MY_SECRET_KEY", as_env_var="MY_SECRET_ENV_VAR")
    ]
)

@env.task
def secure_task():
    secret_value = os.getenv("MY_SECRET_ENV_VAR")
    # Use secret_value safely
```

**Important:**

- Secrets are scoped at organization, project, or domain levels
- Never return secret values from tasks
- Always access via environment variables

### Reusable Containers

**Configuration:**

```python
env = flyte.TaskEnvironment(
    reusable=flyte.ReusePolicy(
        replicas=2,    # Number of container instances
        idle_ttl=300   # 5 minutes idle timeout
    )
)
```

**Best Use Cases:**

- Frequent, short-duration tasks
- Tasks with expensive initialization
- Batch processing
- Development/testing scenarios

**Avoid When:**

- Long-running tasks
- Large memory consumption without cleanup
- Tasks modifying global state

## Task Programming

### Files and Directories

**Special Data Types:**

- `flyte.io.File`: For individual files
- `flyte.io.Dir`: For directories
- `flyte.io.DataFrame`: For structured data

**Key Features:**

- Offloaded data types (store references, not data)
- Efficient handling of large files
- Support for both sync and async methods

**Example Usage:**

```python
@env.task
def process_file(input_file: flyte.io.File) -> flyte.io.File:
    # Read file content
    content = input_file.read()
    # Process content (process_data is a user-defined helper)
    result = process_data(content)
    # Create output file
    return flyte.io.File.from_string(result, "output.txt")

@env.task
def upload_local():
    # Upload local file
    return flyte.io.File.from_local("./local_file.txt")
```

### Dataclasses and Structures

**Supported Types:**

- Python dataclasses
- Pydantic models
- Nested structures
- Lists of dataclasses

**Example:**

```python
from dataclasses import dataclass
from typing import List

@dataclass
class InferenceRequest:
    model_name: str
    input_data: List[str]
    batch_size: int = 32

@env.task
def batch_inference(requests: List[InferenceRequest]) -> List[dict]:
    results = []
    for request in requests:
        # Process each request (run_inference is a user-defined helper)
        result = run_inference(request.model_name, request.input_data)
        results.append(result)
    return results
```

**Key Considerations:**

- Fields must be serializable
- Data is serialized/deserialized between tasks
- Can include Flyte's offloaded types as fields

### Reports

**Basic Report Usage:**

```python
@env.task(report=True)
def generate_report():
    flyte.report.log("Processing completed successfully")
    flyte.report.flush()  # Send to UI
```

**Advanced Reports:**

```python
@env.task(report=True)
def streaming_report():
    tab = flyte.report.get_tab("Progress")
    for i in range(100):
        tab.replace(f"