Deployment patterns
Once you understand the basics of task deployment, you can leverage various deployment patterns to handle different project structures, dependency management approaches, and deployment requirements. This guide covers the most common patterns with practical examples.
Overview of deployment patterns
Flyte supports multiple deployment patterns to accommodate different project structures and requirements:
- Simple file deployment - Single file with tasks and environments
- Custom Dockerfile deployment - Full control over container environment
- PyProject package deployment - Structured Python packages with dependencies and async tasks
- Package structure deployment - Organized packages with shared environments
- Full build deployment - Complete code embedding in containers
- Python path deployment - Multi-directory project structures
- Dynamic environment deployment - Environment selection based on domain context
Each pattern serves specific use cases and can be combined as needed for complex projects.
Simple file deployment
The simplest deployment pattern involves defining both your tasks and task environment in a single Python file. This pattern works well for:
- Prototyping and experimentation
- Simple tasks with minimal dependencies
- Educational examples and tutorials
Example structure
import flyte
env = flyte.TaskEnvironment(name="simple_env")
@env.task
async def my_task(name: str) -> str:
return f"Hello, {name}!"
if __name__ == "__main__":
flyte.init_from_config()
flyte.deploy(env)
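Instead of deploying, you can also run the task programmatically from the same file. A minimal sketch using the flyte.run API shown throughout this guide:
import flyte

env = flyte.TaskEnvironment(name="simple_env")

@env.task
async def my_task(name: str) -> str:
    return f"Hello, {name}!"

if __name__ == "__main__":
    flyte.init_from_config()
    # Run the task remotely and print a link to the execution
    run = flyte.run(my_task, name="World")
    print(run.url)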
Deployment commands
# Deploy the environment
flyte deploy my_example.py env
# Run the task ephemerally
flyte run my_example.py my_task --name "World"
When to use
- Quick prototypes and experiments
- Single-purpose scripts
- Learning Flyte basics
- Tasks with no external dependencies
Custom Dockerfile deployment
When you need full control over the container environment, you can specify a custom Dockerfile. This pattern is ideal for:
- Complex system dependencies
- Specific OS or runtime requirements
- Custom base images
- Multi-stage builds
Example structure
# syntax=docker/dockerfile:1.5
FROM ghcr.io/astral-sh/uv:0.8 as uv
FROM python:3.12-slim-bookworm
USER root
# Copy in uv so that later commands don't have to mount it in
COPY --from=uv /uv /usr/bin/uv
# Configure default envs
ENV UV_COMPILE_BYTECODE=1 \
UV_LINK_MODE=copy \
VIRTUALENV=/opt/venv \
UV_PYTHON=/opt/venv/bin/python \
PATH="/opt/venv/bin:$PATH"
# Create a virtualenv with the user specified python version
RUN uv venv /opt/venv --python=3.12
WORKDIR /root
# Install dependencies
COPY requirements.txt .
RUN uv pip install --pre -r /root/requirements.txt
The task definition that builds from this Dockerfile:
from pathlib import Path
import flyte
env = flyte.TaskEnvironment(
name="docker_env",
image=flyte.Image.from_dockerfile(
# relative paths in python change based on where you call, so set it relative to this file
Path(__file__).parent / "Dockerfile",
registry="ghcr.io/flyteorg",
name="docker_env_image",
),
)
@env.task
def main(x: int) -> int:
return x * 2
if __name__ == "__main__":
import flyte.git
flyte.init_from_config(flyte.git.config_from_root())
run = flyte.run(main, x=10)
print(run.url)
Alternative: Dockerfile in different directory
You can also reference Dockerfiles from subdirectories:
from pathlib import Path
import flyte
env = flyte.TaskEnvironment(
name="docker_env_in_dir",
image=flyte.Image.from_dockerfile(
# relative paths in python change based on where you call, so set it relative to this file
Path(__file__).parent.parent / "Dockerfile.workdir",
registry="ghcr.io/flyteorg",
name="docker_env_image",
),
)
@env.task
def main(x: int) -> int:
return x * 2
if __name__ == "__main__":
flyte.init_from_config()
run = flyte.run(main, x=10)
print(run.url)
The referenced Dockerfile.workdir:
# syntax=docker/dockerfile:1.5
FROM ghcr.io/astral-sh/uv:0.8 as uv
FROM python:3.12-slim-bookworm
USER root
# Copy in uv so that later commands don't have to mount it in
COPY --from=uv /uv /usr/bin/uv
# Configure default envs
ENV UV_COMPILE_BYTECODE=1 \
UV_LINK_MODE=copy \
VIRTUALENV=/opt/venv \
UV_PYTHON=/opt/venv/bin/python \
PATH="/opt/venv/bin:$PATH"
# Create a virtualenv with the user specified python version
RUN uv venv /opt/venv --python=3.12
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN uv pip install --pre -r /app/requirements.txt
Key considerations
- Path handling: Use Path(__file__).parent for Dockerfile paths, since relative paths in Python change based on where you invoke the script (e.g., Path(__file__).parent / "Dockerfile")
- Registry configuration: Specify a registry for image storage
- Build context: The directory containing the Dockerfile becomes the build context
- Flyte installation: Ensure Flyte is installed in the container and available on $PATH (e.g., RUN pip install flyte in your Dockerfile)
- Dependencies: Include all application requirements in the Dockerfile or requirements.txt
When to use
- Need specific system packages or tools
- Custom base image requirements
- Complex installation procedures
- Multi-stage build optimization
PyProject package deployment
For structured Python projects with proper package management, use the PyProject pattern. This approach demonstrates a realistic Python project structure that provides:
- Proper dependency management with pyproject.toml and external packages like httpx
- Clean separation of business logic and Flyte tasks across multiple modules
- Professional project structure with src/ layout
- Async task execution with API calls and data processing
- Entrypoint patterns for both command-line and programmatic execution
Example structure
pyproject_package/
├── pyproject.toml # Project metadata and dependencies
├── README.md # Documentation
└── src/
└── pyproject_package/
├── __init__.py # Package initialization
├── main.py # Entrypoint script
├── data/
│ ├── __init__.py
│ ├── loader.py # Data loading utilities (no Flyte)
│ └── processor.py # Data processing utilities (no Flyte)
├── models/
│ ├── __init__.py
│ └── analyzer.py # Analysis utilities (no Flyte)
└── tasks/
├── __init__.py
└── tasks.py # Flyte task definitions
Business logic modules
The business logic is completely separate from Flyte and can be used independently:
Data Loading (data/loader.py)
import json
from pathlib import Path
from typing import Any
import httpx
async def fetch_data_from_api(url: str) -> list[dict[str, Any]]:
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10.0)
response.raise_for_status()
return response.json()
def load_local_data(file_path: str | Path) -> dict[str, Any]:
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
with path.open("r") as f:
return json.load(f)
Data Processing (data/processor.py)
import asyncio
from typing import Any
from pydantic import BaseModel, Field, field_validator
class DataItem(BaseModel):
id: int = Field(gt=0, description="Item ID must be positive")
value: float = Field(description="Item value")
category: str = Field(min_length=1, description="Item category")
@field_validator("category")
@classmethod
def category_must_be_lowercase(cls, v: str) -> str:
return v.lower()
def clean_data(raw_data: dict[str, Any]) -> dict[str, Any]:
# Remove None values
cleaned = {k: v for k, v in raw_data.items() if v is not None}
# Validate items if present
if "items" in cleaned:
validated_items = []
for item in cleaned["items"]:
try:
validated = DataItem(**item)
validated_items.append(validated.model_dump())
except Exception as e:
print(f"Skipping invalid item {item}: {e}")
continue
cleaned["items"] = validated_items
return cleaned
def transform_data(data: dict[str, Any]) -> list[dict[str, Any]]:
items = data.get("items", [])
# Add computed fields
transformed = []
for item in items:
transformed_item = {
**item,
"value_squared": item["value"] ** 2,
"category_upper": item["category"].upper(),
}
transformed.append(transformed_item)
return transformed
async def aggregate_data(items: list[dict[str, Any]]) -> dict[str, Any]:
# Simulate async processing
await asyncio.sleep(0.1)
aggregated: dict[str, dict[str, Any]] = {}
for item in items:
category = item["category"]
if category not in aggregated:
aggregated[category] = {
"count": 0,
"total_value": 0.0,
"values": [],
}
aggregated[category]["count"] += 1
aggregated[category]["total_value"] += item["value"]
aggregated[category]["values"].append(item["value"])
# Calculate averages
for category, v in aggregated.items():
total = v["total_value"]
count = v["count"]
v["average_value"] = total / count if count > 0 else 0.0
return {"categories": aggregated, "total_items": len(items)}
Analysis (models/analyzer.py)
from typing import Any
import numpy as np
def calculate_statistics(data: list[dict[str, Any]]) -> dict[str, Any]:
if not data:
return {
"count": 0,
"mean": 0.0,
"median": 0.0,
"std_dev": 0.0,
"min": 0.0,
"max": 0.0,
}
values = np.array([item["value"] for item in data])
stats = {
"count": len(values),
"mean": float(np.mean(values)),
"median": float(np.median(values)),
"std_dev": float(np.std(values)),
"min": float(np.min(values)),
"max": float(np.max(values)),
"percentile_25": float(np.percentile(values, 25)),
"percentile_75": float(np.percentile(values, 75)),
}
return stats
def generate_report(stats: dict[str, Any]) -> str:
report_lines = [
"=" * 60,
"DATA ANALYSIS REPORT",
"=" * 60,
]
# Basic statistics section
if "basic" in stats:
basic = stats["basic"]
report_lines.extend(
[
"",
"BASIC STATISTICS:",
f" Count: {basic.get('count', 0)}",
f" Mean: {basic.get('mean', 0.0):.2f}",
f" Median: {basic.get('median', 0.0):.2f}",
f" Std Dev: {basic.get('std_dev', 0.0):.2f}",
f" Min: {basic.get('min', 0.0):.2f}",
f" Max: {basic.get('max', 0.0):.2f}",
f" 25th %ile: {basic.get('percentile_25', 0.0):.2f}",
f" 75th %ile: {basic.get('percentile_75', 0.0):.2f}",
]
)
# Category aggregations section
if "aggregated" in stats and "categories" in stats["aggregated"]:
categories = stats["aggregated"]["categories"]
total_items = stats["aggregated"].get("total_items", 0)
report_lines.extend(
[
"",
"CATEGORY BREAKDOWN:",
f" Total Items: {total_items}",
"",
]
)
for category, cat_stats in sorted(categories.items()):
report_lines.extend(
[
f" Category: {category.upper()}",
f" Count: {cat_stats.get('count', 0)}",
f" Total Value: {cat_stats.get('total_value', 0.0):.2f}",
f" Average Value: {cat_stats.get('average_value', 0.0):.2f}",
"",
]
)
report_lines.append("=" * 60)
return "\n".join(report_lines)
These modules demonstrate:
- No Flyte dependencies - can be tested and used independently
- Pydantic models for data validation with custom validators
- Async patterns with proper context managers and error handling
- NumPy integration for statistical calculations
- Professional error handling with timeouts and validation
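Because these modules carry no Flyte imports, they can be smoke-tested directly. A minimal sketch (the sample items below are made up for illustration):
import asyncio

from pyproject_package.data import processor
from pyproject_package.models import analyzer

# Hypothetical sample input exercising validation, transformation, and stats
raw = {"items": [{"id": 1, "value": 2.0, "category": "A"}, {"id": 2, "value": 4.0, "category": "b"}]}
cleaned = processor.clean_data(raw)
transformed = processor.transform_data(cleaned)
print(analyzer.calculate_statistics(transformed))
print(asyncio.run(processor.aggregate_data(transformed)))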
Flyte orchestration layer
The Flyte tasks orchestrate the business logic with proper async execution:
import pathlib
from typing import Any
import flyte
from pyproject_package.data import loader, processor
from pyproject_package.models import analyzer
UV_PROJECT_ROOT = pathlib.Path(__file__).parent.parent.parent.parent
env = flyte.TaskEnvironment(
name="data_pipeline",
image=flyte.Image.from_debian_base().with_uv_project(pyproject_file=UV_PROJECT_ROOT / "pyproject.toml"),
resources=flyte.Resources(memory="512Mi", cpu="500m"),
)
@env.task
async def fetch_task(url: str) -> list[dict[str, Any]]:
print(f"Fetching data from: {url}")
data = await loader.fetch_data_from_api(url)
print(f"Fetched {len(data)} top-level keys")
return data
@env.task
async def process_task(raw_data: dict[str, Any]) -> list[dict[str, Any]]:
print("Cleaning data...")
cleaned = processor.clean_data(raw_data)
print("Transforming data...")
transformed = processor.transform_data(cleaned)
print(f"Processed {len(transformed)} items")
return transformed
@env.task
async def analyze_task(processed_data: list[dict[str, Any]]) -> str:
print("Aggregating data...")
aggregated = await processor.aggregate_data(processed_data)
print("Calculating statistics...")
stats = analyzer.calculate_statistics(processed_data)
print("Generating report...")
report = analyzer.generate_report({"basic": stats, "aggregated": aggregated})
print("\n" + report)
return report
@env.task
async def pipeline(api_url: str) -> str:
# Chain tasks together
raw_data = await fetch_task(url=api_url)
processed_data = await process_task(raw_data=raw_data[0])
report = await analyze_task(processed_data=processed_data)
return report
Entrypoint configuration
The main entrypoint demonstrates proper initialization and execution patterns:
import pathlib
import flyte
from pyproject_package.tasks.tasks import pipeline
def main():
# Initialize Flyte connection
flyte.init_from_config(root_dir=pathlib.Path(__file__).parent.parent)
# Example API URL with mock data
# In a real scenario, this would be a real API endpoint
example_url = "https://jsonplaceholder.typicode.com/posts"
    # jsonplaceholder serves stable mock data, so the example runs reliably
    print("Starting data pipeline...")
    print(f"Target API: {example_url}")
    # Run the pipeline remotely and wait for completion
run = flyte.run(pipeline, api_url=example_url)
print(f"\nRun Name: {run.name}")
print(f"Run URL: {run.url}")
run.wait()
if __name__ == "__main__":
main()
Dependencies and configuration
[project]
name = "pyproject-package"
version = "0.1.0"
description = "Example Python package with Flyte tasks and modular business logic"
readme = "README.md"
authors = [
{ name = "Ketan Umare", email = "[email protected]" }
]
requires-python = ">=3.10"
dependencies = [
"flyte>=2.0.0b24",
"httpx>=0.27.0",
"numpy>=1.26.0",
"pydantic>=2.0.0",
]
[project.scripts]
run-pipeline = "pyproject_package.main:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
Key features
- Async task chains: Tasks can be chained together with proper async/await patterns
- External dependencies: Demonstrates integration with external libraries (httpx, numpy, pydantic)
- uv integration: Uses .with_uv_project() for dependency management
- Resource specification: Shows how to set memory and CPU requirements
- Proper error handling: Includes timeout and error handling in API calls
Key learning points
- Separation of concerns: Business logic (data/, models/) kept separate from orchestration (tasks/)
- Reusable code: Non-Flyte modules can be tested independently and reused
- Async support: Demonstrates async Flyte tasks for I/O-bound operations
- Dependency management: Shows how external packages integrate with Flyte
- Realistic structure: Mirrors real-world Python project organization
- Entrypoint script: Shows how to create runnable entry points
Usage patterns
Run locally:
python -m pyproject_package.main
Deploy to Flyte:
flyte deploy .
Run remotely:
python -m pyproject_package.main # Uses remote execution
What this example demonstrates
- Multiple files and modules in a package
- Async Flyte tasks with external API calls
- Separation of business logic from orchestration
- External dependencies (httpx, numpy, pydantic)
- Data validation with Pydantic models for robust data processing
- Professional error handling with try/except for data validation
- Timeout configuration for external API calls (timeout=10.0)
- Async context managers for proper resource management (async with httpx.AsyncClient())
- Entrypoint script pattern with project.scripts
- Realistic project structure with src/ layout
- Task chaining and data flow
- How non-Flyte code integrates with Flyte tasks
When to use
- Production-ready, maintainable projects
- Projects requiring external API integration
- Complex data processing pipelines
- Team development with proper separation of concerns
- Applications needing async execution patterns
Package structure deployment
For organizing Flyte workflows in a package structure with shared task environments and utilities, use this pattern. It’s particularly useful for:
- Multiple workflows that share common environments and utilities
- Organized code structure with clear module boundaries
- Projects where you want to reuse task environments across workflows
Example structure
lib/
├── __init__.py
└── workflows/
├── __init__.py
├── workflow1.py # First workflow
├── workflow2.py # Second workflow
├── env.py # Shared task environment
└── utils.py # Shared utilities
Key concepts
- Shared environments: Define task environments in env.py and import them across workflows (see the sketch below)
- Utility modules: Common functions and utilities shared between workflows
- Root directory handling: Use the --root-dir flag for proper Python path configuration
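A minimal sketch of the shared-environment layout; the module contents are assumptions consistent with the structure above:
# lib/workflows/env.py (hypothetical shared task environment)
import flyte

env = flyte.TaskEnvironment(name="shared_env")

# lib/workflows/workflow1.py (imports and reuses the shared environment)
from lib.workflows.env import env

@env.task
async def process_workflow(x: int) -> int:
    # Business logic would normally come from utils.py
    return x * 2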
Running with root directory
When running workflows with a package structure, specify the root directory:
# Run first workflow
flyte run --root-dir . lib/workflows/workflow1.py process_workflow
# Run second workflow
flyte run --root-dir . lib/workflows/workflow2.py math_workflow --n 6
How --root-dir works
The --root-dir flag automatically configures the Python path (sys.path) to ensure:
- Local execution: Package imports work correctly when running locally
- Consistent behavior: Same Python path configuration locally and at runtime
- No manual PYTHONPATH: Eliminates need to manually export environment variables
- Runtime packaging: Flyte packages and copies code correctly to execution environment
- Runtime consistency: The same package structure is preserved in the runtime container
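Locally, the first point is roughly equivalent to the following illustration (a sketch, not the CLI's actual implementation):
import sys
from pathlib import Path

# Approximate what `flyte run --root-dir .` arranges before importing your workflow
root = Path(".").resolve()
sys.path.insert(0, str(root))

# Package imports under the root now resolve
from lib.workflows.workflow1 import process_workflow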
Alternative: Using a Python project
For larger projects, create a proper Python project with pyproject.toml:
# pyproject.toml
[project]
name = "lib"
version = "0.1.0"
[build-system]
requires = ["setuptools>=45", "wheel"]
build-backend = "setuptools.build_meta"Then install in editable mode:
pip install -e .After installation, you can run workflows without --root-dir:
flyte run lib/workflows/workflow1.py process_workflowHowever, for deployment and remote execution, still use --root-dir for consistency:
flyte run --root-dir . lib/workflows/workflow1.py process_workflow
flyte deploy --root-dir . lib/workflows/workflow1.pyWhen to use
- Multiple related workflows in one project
- Shared task environments and utilities
- Team projects with multiple contributors
- Applications requiring organized code structure
- Projects that benefit from proper Python packaging
Full build deployment
When you need complete reproducibility and want to embed all code directly in the container image, use the full build pattern. This disables Flyte’s fast deployment system in favor of traditional container builds.
Overview
By default, Flyte uses a fast deployment system that:
- Creates a tar archive of your files
- Skips the full image build and push process
- Provides faster iteration during development
However, sometimes you need to completely embed your code into the container image for:
- Full reproducibility with immutable container images
- Environments where fast deployment isn’t available
- Production deployments with all dependencies baked in
- Air-gapped or restricted deployment environments
Key configuration
import pathlib
from dep import foo
import flyte
env = flyte.TaskEnvironment(
name="full_build",
image=flyte.Image.from_debian_base().with_source_folder(
pathlib.Path(__file__).parent,
copy_contents_only=True # Avoid nested folders
),
)
@env.task
def square(x: int) -> int:
return x ** foo()
@env.task
def main(n: int) -> list[int]:
return list(flyte.map(square, range(n)))
if __name__ == "__main__":
# copy_contents_only=True requires root_dir=parent, False requires root_dir=parent.parent
flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
run = flyte.with_runcontext(copy_style="none", version="x").run(main, n=10)
print(run.url)
Local dependency example
The main.py file imports from a local dependency that gets included in the build:
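The dep module itself is not reproduced here; a minimal stand-in consistent with the from dep import foo usage above might look like this (the return value is an assumption):
# dep.py (hypothetical local dependency placed next to main.py)
def foo() -> int:
    # Used as the exponent in square(); 2 is an illustrative value
    return 2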
Critical configuration components
- Set copy_style to "none":
  flyte.with_runcontext(copy_style="none", version="x").run(main, n=10)
  This disables Flyte’s fast deployment system and forces a full container build.
- Set a custom version: The version parameter should be set to a desired value (not auto-generated) for consistent image tagging.
- Configure image source copying:
  image=flyte.Image.from_debian_base().with_source_folder(
      pathlib.Path(__file__).parent,
      copy_contents_only=True,
  )
  Use .with_source_folder() to specify what code to copy into the container.
- Set root_dir correctly:
  flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
  - If copy_contents_only=True: set root_dir to the source folder (contents are copied)
  - If copy_contents_only=False: set root_dir to the parent directory (folder is copied)
Configuration options
Option A: Copy Folder Structure
# Copies the entire folder structure into the container
image=flyte.Image.from_debian_base().with_source_folder(
pathlib.Path(__file__).parent,
copy_contents_only=False # Default
)
# When copy_contents_only=False, set root_dir to parent.parent
flyte.init_from_config(root_dir=pathlib.Path(__file__).parent.parent)
Option B: Copy Contents Only (Recommended)
# Copies only the contents of the folder (flattens structure)
# This is useful when you want to avoid nested folders - for example all your code is in the root of the repo
image=flyte.Image.from_debian_base().with_source_folder(
pathlib.Path(__file__).parent,
copy_contents_only=True
)
# When copy_contents_only=True, set root_dir to parent
flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
Version management best practices
When using copy_style="none", always specify an explicit version:
- Use semantic versioning: "v1.0.0", "v1.1.0"
- Use build numbers: "build-123"
- Use git commits: "abc123"
Avoid auto-generated versions to ensure reproducible deployments.
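For example, a sketch that derives the version from the current git commit; the subprocess call is plain Python, not a Flyte API, and main is the task from the example above:
import subprocess

import flyte

# Use the short git commit hash as an explicit, reproducible version
commit = subprocess.check_output(["git", "rev-parse", "--short", "HEAD"], text=True).strip()
run = flyte.with_runcontext(copy_style="none", version=commit).run(main, n=10)
print(run.url)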
Performance considerations
- Full builds take longer than fast deployment
- Container images will be larger as they include all source code
- Better for production where immutability is important
- Use during development when testing the full deployment pipeline
When to use
✅ Use full build when:
- Deploying to production environments
- Need immutable, reproducible container images
- Working with complex dependency structures
- Deploying to air-gapped or restricted environments
- Building CI/CD pipelines
❌ Don’t use full build when:
- Rapid development and iteration
- Working with frequently changing code
- Development environments where speed matters
- Simple workflows without complex dependencies
Troubleshooting
Common issues:
- Import errors: Check that your root_dir configuration matches copy_contents_only
- Missing files: Ensure all dependencies are in the source folder
- Version conflicts: Use explicit, unique version strings
- Build failures: Check that the base image has all required system dependencies
Debug tips:
- Add print statements to verify file paths in containers
- Use docker run -it <image> /bin/bash to inspect built images
- Check Flyte logs for build errors and warnings
- Verify that relative imports work correctly in the container context
Python path deployment
For projects where workflows are separated from business logic across multiple directories, use the Python path pattern with proper root_dir configuration.
Example structure
pythonpath/
├── workflows/
│ └── workflow.py # Flyte workflow definitions
├── src/
│ └── my_module.py # Business logic modules
├── run.sh # Execute from project root
└── run_inside_folder.sh # Execute from workflows/ directory
Implementation
The workflow file (workflows/workflow.py) imports the environment and business logic from src/:
import pathlib
from src.my_module import env, say_hello
import flyte
env = flyte.TaskEnvironment(
name="workflow_env",
depends_on=[env],
)
@env.task
async def greet(name: str) -> str:
return await say_hello(name)
if __name__ == "__main__":
current_dir = pathlib.Path(__file__).parent
flyte.init_from_config(root_dir=current_dir.parent)
r = flyte.run(greet, name="World")
print(r.url)
The business logic module (src/my_module.py) defines its own environment and task:
import flyte
env = flyte.TaskEnvironment(
name="my_module",
)
@env.task
async def say_hello(name: str) -> str:
return f"Hello, {name}!"
Task environment dependencies
Note how the workflow imports both the task environment and the task function:
from src.my_module import env, say_hello
env = flyte.TaskEnvironment(
name="workflow_env",
depends_on=[env], # Depends on the imported environment
)
This pattern allows sharing task environments across modules while maintaining proper dependency relationships.
Key considerations
- Import resolution: root_dir enables proper module imports across directories
- File packaging: Flyte packages all files starting from root_dir
- PYTHONPATH handling: Different behavior for CLI vs direct Python execution
CLI vs Direct Python execution
Using Flyte CLI with --root-dir (Recommended)
When using flyte run with --root-dir, you don’t need to export PYTHONPATH:
flyte run --root-dir . workflows/workflow.py greet --name "World"
The CLI automatically:
- Adds the --root-dir location to sys.path
- Resolves all imports correctly
- Packages files from the root directory for remote execution
Using Python directly
When running Python scripts directly, you must set PYTHONPATH manually:
PYTHONPATH=.:$PYTHONPATH python workflows/workflow.py
This is because:
- Python doesn’t automatically know about your project structure
- You need to explicitly tell Python where to find your modules
- The root_dir parameter handles remote packaging, not local path resolution
Best practices
- Always set root_dir when workflows import from multiple directories
- Use pathlib for cross-platform path handling
- Set root_dir to your project root to ensure all dependencies are captured
- Test both execution patterns to ensure deployment works from any directory
Common pitfalls
- Forgetting root_dir: Results in import errors during remote execution
- Wrong root_dir path: May package too many or too few files
- Not setting PYTHONPATH when using Python directly: Use flyte run --root-dir . instead
- Mixing execution methods: If you use flyte run --root-dir ., you don’t need PYTHONPATH
When to use
- Legacy projects with established directory structures
- Separation of concerns between workflows and business logic
- Multiple workflow definitions sharing common modules
- Projects with complex import hierarchies
Note: This pattern is an escape hatch for larger projects where code organization requires separating workflows from business logic. Ideally, structure projects with pyproject.toml for cleaner dependency management.
Dynamic environment deployment
For environments that need to change based on deployment context (development vs production), use dynamic environment selection based on Flyte domains.
Domain-based environment selection
Use flyte.current_domain() to deterministically create different task environments based on the deployment domain:
# NOTE: flyte.init() invocation at the module level is strictly discouraged.
# At runtime, Flyte controls initialization and configuration files are not present.
import os
import flyte
def create_env():
if flyte.current_domain() == "development":
return flyte.TaskEnvironment(name="dev", image=flyte.Image.from_debian_base(), env_vars={"MY_ENV": "dev"})
return flyte.TaskEnvironment(name="prod", image=flyte.Image.from_debian_base(), env_vars={"MY_ENV": "prod"})
env = create_env()
@env.task
async def my_task(n: int) -> int:
print(f"Environment Variable MY_ENV = {os.environ['MY_ENV']}", flush=True)
return n + 1
@env.task
async def entrypoint(n: int) -> int:
print(f"Environment Variable MY_ENV = {os.environ['MY_ENV']}", flush=True)
return await my_task(n)
Why this pattern works
Environment reproducibility in local and remote clusters is critical. Flyte re-instantiates modules in remote clusters, so current_domain() will be set correctly based on where the code executes.
✅ Do use flyte.current_domain() - Flyte automatically sets this based on the execution context
❌ Don’t use environment variables directly - They won’t yield correct results unless manually passed to the downstream system
How it works
- Flyte sets the domain context when initializing
- current_domain() returns the domain string (e.g., “development”, “staging”, “production”)
- Your code deterministically configures resources based on this domain
- When Flyte executes remotely, it re-instantiates modules with the correct domain context
- The same environment configuration logic runs consistently everywhere
Important constraints
flyte.current_domain() only works after flyte.init() is called:
- ✅ Works with flyte run and flyte deploy CLI commands (they init automatically)
- ✅ Works when called from if __name__ == "__main__" after explicit flyte.init()
- ❌ Does NOT work at module level without initialization
Critical: flyte.init() invocation at the module level is strictly discouraged, because at runtime Flyte controls initialization and configuration files are not present.
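A minimal sketch of the safe ordering, using only the init and domain APIs shown above: initialize first, then query the domain.
import flyte

if __name__ == "__main__":
    flyte.init_from_config()
    # Safe: initialization has run, so the domain is resolvable
    print(flyte.current_domain())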
Alternative: Environment variable approach
For cases where you need to pass domain information as environment variables to the container runtime, use this approach:
import os
import flyte
def create_env(domain: str):
# Pass domain as environment variable so tasks can see which domain they're running in
if domain == "development":
return flyte.TaskEnvironment(name="dev", image=flyte.Image.from_debian_base(), env_vars={"DOMAIN_NAME": domain})
return flyte.TaskEnvironment(name="prod", image=flyte.Image.from_debian_base(), env_vars={"DOMAIN_NAME": domain})
env = create_env(os.getenv("DOMAIN_NAME", "development"))
@env.task
async def my_task(n: int) -> int:
print(f"Environment Variable MY_ENV = {os.environ['DOMAIN_NAME']}", flush=True)
return n + 1
@env.task
async def entrypoint(n: int) -> int:
print(f"Environment Variable MY_ENV = {os.environ['DOMAIN_NAME']}", flush=True)
return await my_task(n)
if __name__ == "__main__":
flyte.init_from_config()
r = flyte.run(entrypoint, n=5)
print(r.url)
Key differences from domain-based approach
- Environment variable access: The domain name is available inside tasks via os.environ['DOMAIN_NAME']
- External control: Can be set via system environment variables before execution
- Runtime visibility: Tasks can inspect which environment they’re running in during execution
- Default fallback: Uses "development" as default when DOMAIN_NAME is not set
Usage with environment variables
# Set environment and run
export DOMAIN_NAME=production
flyte run environment_picker.py entrypoint --n 5
# Or set inline
DOMAIN_NAME=development flyte run environment_picker.py entrypoint --n 5
When to use environment variables vs domain-based
Use environment variables when:
- Tasks need runtime access to environment information
- External systems set environment configuration
- You need flexibility to override environment externally
- Debugging requires visibility into environment selection
Use domain-based approach when:
- Environment selection should be automatic based on Flyte domain
- You want tighter integration with Flyte’s domain system
- No need for runtime environment inspection within tasks
You can vary multiple aspects based on context, as sketched after this list:
- Base images: Different images for dev vs prod
- Environment variables: Configuration per environment
- Resource requirements: Different CPU/memory per domain
- Dependencies: Different package versions
- Registry settings: Different container registries
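For instance, a sketch that varies resources per domain, reusing the create_env pattern above (the CPU and memory values are assumptions):
import flyte

def create_env():
    if flyte.current_domain() == "production":
        # Larger footprint for production workloads (illustrative values)
        return flyte.TaskEnvironment(
            name="prod",
            image=flyte.Image.from_debian_base(),
            resources=flyte.Resources(memory="4Gi", cpu="2"),
        )
    return flyte.TaskEnvironment(
        name="dev",
        image=flyte.Image.from_debian_base(),
        resources=flyte.Resources(memory="512Mi", cpu="500m"),
    )

env = create_env()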
Usage patterns
# CLI usage (recommended)
flyte run environment_picker.py entrypoint --n 5
flyte deploy environment_picker.py
For programmatic usage, ensure proper initialization:
import flyte

# Initialize before importing the module so create_env() resolves the correct domain
flyte.init_from_config()
from environment_picker import entrypoint
if __name__ == "__main__":
r = flyte.run(entrypoint, n=5)
print(r.url)
When to use dynamic environments
General use cases:
- Multi-environment deployments (dev/staging/prod)
- Different resource requirements per environment
- Environment-specific dependencies or settings
- Context-sensitive configuration needs
Domain-based approach for:
- Automatic environment selection tied to Flyte domains
- Simpler configuration without external environment variables
- Integration with Flyte’s built-in domain system
Environment variable approach for:
- Runtime visibility into environment selection within tasks
- External control over environment configuration
- Debugging and logging environment-specific behavior
- Integration with external deployment systems that set environment variables
Best practices
Project organization
- Separate concerns: Keep business logic separate from Flyte task definitions
- Use proper imports: Structure projects for clean import patterns
- Version control: Include all necessary files in version control
- Documentation: Document deployment requirements and patterns
Image management
- Registry configuration: Use consistent registry settings across environments
- Image tagging: Use meaningful tags for production deployments
- Base image selection: Choose appropriate base images for your needs
- Dependency management: Keep container images lightweight but complete
Configuration management
- Root directory: Set root_dir appropriately for your project structure
- Path handling: Use pathlib.Path for cross-platform compatibility
- Environment variables: Use environment-specific configurations
- Secrets management: Handle sensitive data appropriately
Development workflow
- Local testing: Test tasks locally before deployment
- Incremental development: Use flyte run for quick iterations
- Production deployment: Use flyte deploy for permanent deployments
- Monitoring: Monitor deployed tasks and environments
Choosing the right pattern
| Pattern | Use Case | Complexity | Best For |
|---|---|---|---|
| Simple file | Quick prototypes, learning | Low | Single tasks, experiments |
| Custom Dockerfile | System dependencies, custom environments | Medium | Complex dependencies |
| PyProject package | Professional projects, async pipelines | Medium-High | Production applications |
| Package structure | Multiple workflows, shared utilities | Medium | Organized team projects |
| Full build | Production, reproducibility | High | Immutable deployments |
| Python path | Legacy structures, separated concerns | Medium | Existing codebases |
| Dynamic environment | Multi-environment, domain-aware deployments | Medium | Context-aware deployments |
Start with simpler patterns and evolve to more complex ones as your requirements grow. Many projects will combine multiple patterns as they scale and mature.