Part 1 — Vanilla Operators
This is the first part of the Airflow → Flyte migration guide. It covers:
- Where dependencies are specified
- The driver task (in place of a DAG definition)
- Triggers (in place of DAG schedules)
- PythonOperator → @env.task
- TaskFlow → @env.task
- BashOperator → ContainerTask
- KubernetesPodOperator → TaskEnvironment + PodTemplate
- Orchestration: parallelism, conditionals, error handling
Part 2 (later) covers provider operators: Beam, Dataproc, BigQuery, Databricks, Spark, sensors.
1. Where dependencies are specified
In Airflow, dependencies are specified at the platform level. A single Airflow deployment has a base image with a fixed Python environment; every DAG author writes against the same set of installed libraries. Adding a new library means modifying the deployment (Helm extraPipPackages, a custom base image, or a redeploy), or working around the shared env with PythonVirtualenvOperator, DockerOperator, or KubernetesPodOperator.
In Flyte, dependencies are specified at the code level. Each task declares its TaskEnvironment, which includes the image and its dependencies. The image is the unit of isolation, and a single workflow can fan out across tasks running in different images.
Two ways to declare the image:
```python
import flyte

# (a) Build with flyte.Image: start from a base, add pip/apt packages, env vars, etc.
etl_env = flyte.TaskEnvironment(
    name="etl",
    image=flyte.Image.from_debian_base().with_pip_packages("pandas", "pyarrow"),
)

# (b) Pass a string reference to an existing image, for example the same one
# you already deploy with in your Airflow KubernetesExecutor / KPO setup.
gpu_env = flyte.TaskEnvironment(
    name="gpu",
    image="registry.example.com/my-org/gpu-training:2026.04.01",
)
```
Docs: TaskEnvironment · Container Images
2. The driver task (in place of DAGs)
Airflow DAGs are static graphs. The with DAG(...) block runs at parse time; the scheduler compiles the node/edge structure and then traverses it.
Flyte has no parse-time graph. The driver task is Python code that runs at execution time; the graph is built dynamically as the driver calls other tasks. There is no compilation step.
Flyte tasks are async-native — @env.task functions are typically declared async def and tasks are invoked with await. Plain def tasks are also supported when you don’t need concurrency.
```python
import asyncio
from datetime import date

# env, fetch, summarize, and Summary are defined elsewhere in the module.
@env.task
async def driver(start: date, regions: list[str]) -> list[Summary]:
    data = await fetch(start)
    summaries = await asyncio.gather(
        *[summarize(data, region) for region in regions]
    )
    return summaries
```
The driver is just a task that calls other tasks; there’s no separate workflow object.
3. Triggers (in place of schedules)
Airflow’s schedule= on a DAG maps to a Flyte Trigger attached to a task.
| Airflow | Flyte |
|---|---|
| schedule="@hourly" | flyte.Trigger.hourly() |
| schedule="@daily" | flyte.Trigger.daily() |
| schedule="0 5 * * *" | flyte.Trigger("nightly", flyte.Cron("0 5 * * *")) |
| schedule="30 9 * * 1-5" + timezone=... | flyte.Trigger("biz_hours", flyte.Cron("30 9 * * 1-5", timezone="America/New_York")) |
```python
from datetime import datetime

import flyte

@env.task(
    triggers=flyte.Trigger(
        "daily_report",
        flyte.Cron("0 6 * * *"),
        inputs={"trigger_time": flyte.TriggerTime},
    )
)
def generate_report(trigger_time: datetime) -> str:
    ...
```
Multiple triggers per task and parameterized trigger inputs are supported; see the Triggers docs.
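Here is how to read the biz_hours row in the table. A cron expression's five fields are minute, hour, day of month, month, and day of week, so "30 9 * * 1-5" fires at 09:30 Monday through Friday. The stdlib sketch below computes the next fire time for that one expression only. next_weekday_0930 is a hypothetical helper: it is not a general cron parser and not how Flyte schedules runs. It also uses naive datetimes, whereas in Flyte the timezone= argument to flyte.Cron is what pins the wall-clock time to a zone.

```python
from datetime import datetime, timedelta

def next_weekday_0930(after: datetime) -> datetime:
    """Hypothetical helper: next time matching "30 9 * * 1-5" strictly after `after`.

    Handles only this one expression: minute 30, hour 9, Monday-Friday.
    Cron numbers weekdays 0-6 starting on Sunday, so 1-5 is Monday-Friday,
    which Python's weekday() numbers 0-4.
    """
    candidate = after.replace(hour=9, minute=30, second=0, microsecond=0)
    if candidate <= after:
        candidate += timedelta(days=1)
    while candidate.weekday() > 4:  # 5 = Saturday, 6 = Sunday
        candidate += timedelta(days=1)
    return candidate

# Friday 2026-04-03 at 10:00 is already past 09:30, so the next run is Monday.
print(next_weekday_0930(datetime(2026, 4, 3, 10, 0)))  # 2026-04-06 09:30:00
```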
4. PythonOperator to @env.task
Airflow’s PythonOperator runs a Python callable in the worker’s environment. The callable’s return value becomes XCom, and inputs arrive through three channels: op_args/op_kwargs passed at operator construction, the Airflow context injected as **kwargs (ti, ds, dag_run, logical_date, …), and ti.xcom_pull(...) for data from upstream tasks.
Flyte’s equivalent is @env.task on a plain function. The function’s parameters and return type are the interface — there is no separate context channel and no XCom step.
```python
# Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator

def fetch_events(**context):
    ds = context["ds"]
    return _fetch(ds)  # returned value is serialized to XCom

def summarize(**context):
    ti = context["ti"]
    records = ti.xcom_pull(task_ids="fetch_events")
    return f"{len(records)} events on {context['ds']}"

with DAG("events", schedule="@daily", ...) as dag:
    t1 = PythonOperator(task_id="fetch_events", python_callable=fetch_events)
    t2 = PythonOperator(task_id="summarize", python_callable=summarize)
    t1 >> t2
```

```python
# Flyte
import flyte

env = flyte.TaskEnvironment(name="events", image=...)

@env.task
async def fetch_events(ds: str) -> list[dict]:
    return _fetch(ds)

@env.task
async def summarize(ds: str, records: list[dict]) -> str:
    return f"{len(records)} events on {ds}"

@env.task
async def driver(ds: str) -> str:
    records = await fetch_events(ds)
    return await summarize(ds, records)
```
A few things change in the move:
- Inputs are the function parameters. There is no **context. If the task needs the run’s date, declare it as a parameter (ds: str) and have the driver pass it in. The driver itself can receive the trigger time when a Trigger fires it.
- Data flows through await, not XCom. The value returned by fetch_events is the value summarize receives: the function call graph IS the dependency graph. There is no xcom_pull and no t1 >> t2 to maintain separately from the data flow.
- Types are part of the signature. Flyte uses the hints to serialize values between tasks, but keep expectations calibrated: the runtime behaves more like typed JSON than a fully enforced contract. The hints are useful as documentation and for tooling, not as a strict static check.
- Async-native, sync also works. Tasks are typically async def and invoked with await. Plain def tasks are fully supported if you’d rather stay in a sync codebase; you give up asyncio-based concurrency such as asyncio.gather, though flyte.map still covers fan-out from sync code.
The driver above has nothing in it but task calls, for readability. It doesn’t have to — a driver is just a @env.task, and any code that belongs in a Python function belongs in a driver: plain expressions, loops, if/try, helpers. Turn something into a @env.task when you want it to have its own resources, image, retries, caching, or parallelism. Otherwise leave it as regular Python and call it inline.
Docs: Tasks
File and Dir — for data that doesn’t fit in a return value
Primitive and JSON-serializable values (int, str, list, dict, dataclasses, Pydantic models) flow directly as return values — the SDK serializes them. Same shape as XCom, but typed. Most tasks will use these and nothing else.
Flyte adds File and Dir for the cases where the payload is too big or too binary to inline. In Airflow this is where pipelines step outside the framework: XCom is a Postgres row with a soft ~48KB limit, so larger payloads are written to a shared filesystem or object storage and a path is passed as a string — the upload, the lifecycle, and the cleanup are the author’s responsibility, outside Airflow’s model.
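For contrast, here is a stdlib sketch of that hand-rolled Airflow pattern. It rests on illustrative assumptions: a temporary directory stands in for the shared filesystem or bucket, offload and load are hypothetical helpers, and only the returned path string would be pushed to XCom. Everything else, cleanup included, is the author's code.

```python
import json
import tempfile
from pathlib import Path

def offload(payload: bytes, storage_root: Path, key: str) -> str:
    """Hypothetical producer-side helper: write the blob, return only its path."""
    path = storage_root / key
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(payload)
    return str(path)

def load(path: str) -> bytes:
    """Hypothetical consumer-side helper: turn the path string back into bytes."""
    return Path(path).read_bytes()

# A payload well past the ~48KB XCom guideline (about 139 KB of JSON).
big = json.dumps([{"id": i} for i in range(10_000)]).encode()

with tempfile.TemporaryDirectory() as root:  # stands in for shared storage
    ref = offload(big, Path(root), "events/2026-04-01.json")
    xcom_value = json.dumps(ref)  # the small string that would go through XCom
    restored = load(json.loads(xcom_value))
    print(len(big), len(xcom_value), restored == big)
# Leaving the block deletes the directory: lifecycle and cleanup are manual too.
```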
Flyte covers both cases with the same interface. A task that returns File or Dir is declaring that its output is an offloaded blob, and the SDK handles the upload on write and the download on read.
```python
import flyte
from flyte.io import File, Dir

# env is a flyte.TaskEnvironment defined elsewhere in the module.

@env.task
async def extract(ds: str) -> File:
    # Stream straight to remote storage; no local temp file needed.
    file = File.new_remote()
    async with file.open("wb") as f:
        await f.write(b"col1,col2\nfoo,bar\n")
    return file

@env.task
async def count_rows(csv: File) -> int:
    async with csv.open("rb") as f:
        data = await f.read()
    # Counts newline-terminated lines, minus one header row.
    return data.count(b"\n") - 1
```
The File object travels between tasks the same way an int does: as a typed argument. Underneath, it carries a remote path. Common methods:
- File.new_remote(): a new reference in the run’s scratch area, for streaming writes.
- File.from_local(path) / File.from_local_sync(path): upload a local file and get a File back.
- File.from_existing_remote(uri): wrap an existing remote URI (for example, a path produced by an upstream system).
- async with file.open("rb") / async with file.open("wb") / with file.open_sync(...): stream reads and writes.
- await file.download() / file.download_sync(): materialize to a local path and return it.
Dir has the same surface for directories, plus walk() and list_files() to iterate entries.
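The count_rows body above counts newline bytes. That assumes exactly one header row and a newline after every row, including the last. If a producer might omit the trailing newline, or quoted fields can contain newlines, parsing with the csv module is safer. Below is a stdlib sketch of just the counting logic. count_csv_rows is a hypothetical helper; inside a task it would run on the bytes read from the File.

```python
import csv
import io

def count_csv_rows(data: bytes, has_header: bool = True) -> int:
    """Hypothetical helper: count data rows in CSV bytes.

    Tolerates a missing trailing newline and newlines inside quoted fields.
    """
    reader = csv.reader(io.StringIO(data.decode("utf-8"), newline=""))
    rows = sum(1 for row in reader if row)  # csv yields [] for blank lines; skip them
    return rows - 1 if has_header and rows else rows

print(count_csv_rows(b"col1,col2\nfoo,bar"))  # 1 (the newline count would give 0)
```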
Docs: Files and directories
5. TaskFlow to @env.task
If the DAG you’re porting uses Airflow’s TaskFlow API (@task, @dag), the surface move is small: @task becomes @env.task, the function’s return value is the data (no ti.xcom_pull), and function calls ARE the dependencies (no >>). A lot of TaskFlow code ports to Flyte with little more than a find-and-replace on the decorator.
```python
# Airflow TaskFlow
from datetime import datetime

from airflow.decorators import dag, task

@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def events():
    @task
    def fetch_events() -> list[dict]:
        return _fetch()

    @task
    def summarize(records: list[dict]) -> str:
        return f"{len(records)} events"

    summarize(fetch_events())

events()
```

```python
# Flyte
import flyte

env = flyte.TaskEnvironment(name="events", image=...)

@env.task
async def fetch_events() -> list[dict]:
    return _fetch()

@env.task
async def summarize(records: list[dict]) -> str:
    return f"{len(records)} events"

@env.task(triggers=flyte.Trigger.daily())
async def driver() -> str:
    return await summarize(await fetch_events())
```
The thing worth internalizing, and the main place this stops being a find-and-replace, is what the outer function is doing.
An Airflow @dag function runs at parse time. Calling fetch_events() inside it doesn’t run fetch_events — it registers a task and an edge in the static graph. The scheduler later traverses that graph. By the time the tasks actually execute, the @dag function is long gone.
A Flyte driver is a @env.task that runs at execution time. There is no parse-time graph-building step. Calling await fetch_events() actually calls fetch_events. That means the driver — and any task — is just Python: if/else, try/except, loops, recursion, calling other tasks from inside other tasks, nested drivers, reading a value from one task and deciding what to do next. All of it works because there is no static graph to fit into.
To make the point concrete — a task can call itself:
```python
@env.task
async def countdown(n: int) -> int:
    if n == 0:
        return 0
    return 1 + await countdown(n - 1)
```
Each await countdown(...) call is a real task invocation, so the graph grows as the computation runs. This is impossible to express in Airflow’s @dag model, where the graph has to be known before execution.
The practical effect: patterns that Airflow encodes with its own primitives (BranchPythonOperator, ShortCircuitOperator, trigger_rule, .expand() for dynamic mapping, custom XComArg gymnastics) are just Python constructs in Flyte. Branching is if. Short-circuit is return. Dynamic mapping is asyncio.gather or flyte.map. Error handling is try/except/finally. Section 8 covers these with runnable examples.
TaskFlow decorator variants
TaskFlow ships several decorators beyond @task. Rough mapping:
| TaskFlow | Flyte equivalent |
|---|---|
| @task | @env.task |
| @task.bash | ContainerTask |
| @task.virtualenv | @env.task on a TaskEnvironment with its own image |
| @task.docker | @env.task on a TaskEnvironment with image=... |
| @task.kubernetes | TaskEnvironment + PodTemplate |
| @task.branch | plain if in the driver |
| @task.short_circuit | plain return in the driver |
6. BashOperator to ContainerTask
Airflow’s BashOperator runs a shell command in the Airflow worker’s image, with inputs rendered into the command via Jinja templating and output captured as the last line of stdout.
```python
from airflow.operators.bash import BashOperator

BashOperator(
    task_id="extract",
    bash_command="gsutil cp gs://bucket/data-{{ ds }}.csv /tmp/data.csv "
    "&& wc -l /tmp/data.csv | awk '{print $1}'",
    do_xcom_push=True,
)
```
Flyte’s equivalent is a ContainerTask: specify an image, a command, typed inputs, and typed outputs. Inputs are substituted via {{.inputs.<name>}}. Outputs are files the container writes to output_data_dir, which Flyte reads back with the declared types.
```python
import flyte
from flyte.extras import ContainerTask

extract = ContainerTask(
    name="extract",
    image=flyte.Image.from_base("google/cloud-sdk:slim"),
    inputs={"date": str},
    outputs={"row_count": int},
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    command=[
        "/bin/sh", "-c",
        "gsutil cp gs://bucket/data-{{.inputs.date}}.csv /tmp/data.csv && "
        "wc -l /tmp/data.csv | awk '{print $1}' > /var/outputs/row_count",
    ],
)
```
A ContainerTask is invoked like any other task, by calling it from a driver with await:
```python
container_env = flyte.TaskEnvironment.from_task("extract_env", extract)

env = flyte.TaskEnvironment(
    name="pipeline",
    image=flyte.Image.from_debian_base().with_uv_project(pyproject_file="pyproject.toml"),
    depends_on=[container_env],
)

@env.task
async def driver(date: str) -> int:
    return await extract(date=date)
```
Two things about the invocation:
- TaskEnvironment.from_task(...) wraps the container task in an environment so it can be registered alongside the driver.
- The driver’s env sets depends_on=[container_env] so Flyte registers both together. The driver’s own image needs Flyte installed: with_uv_project builds the image from the dependencies declared in your pyproject.toml, so flyte must be one of them. The container task’s image does not need Flyte; it just needs the tools its command invokes.
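Here is the input/output contract from the ContainerTask example, made concrete as a stdlib sketch of what happens around the container. It assumes two things: each {{.inputs.<name>}} placeholder is replaced with the input's string value, and each declared output is read from a file named after it in output_data_dir and converted to the declared type. render_command and read_outputs are hypothetical helpers for illustration, not Flyte's implementation.

```python
import re
import tempfile
from pathlib import Path

# Matches {{.inputs.<name>}}, tolerating spaces inside the braces.
PLACEHOLDER = re.compile(r"\{\{\s*\.inputs\.(\w+)\s*\}\}")

def render_command(command: list[str], inputs: dict[str, object]) -> list[str]:
    """Hypothetical: replace each {{.inputs.<name>}} with str(inputs[name])."""
    return [PLACEHOLDER.sub(lambda m: str(inputs[m.group(1)]), arg) for arg in command]

def read_outputs(output_dir: Path, outputs: dict[str, type]) -> dict[str, object]:
    """Hypothetical: read one file per declared output and convert it.

    Conversion via the type's constructor is fine for int, float, and str.
    """
    return {
        name: typ(Path(output_dir, name).read_text().strip())
        for name, typ in outputs.items()
    }

cmd = render_command(
    ["/bin/sh", "-c", "gsutil cp gs://bucket/data-{{.inputs.date}}.csv /tmp/data.csv"],
    {"date": "2026-04-01"},
)
print(cmd[2])  # gsutil cp gs://bucket/data-2026-04-01.csv /tmp/data.csv

with tempfile.TemporaryDirectory() as out_dir:
    # Stand-in for the container: the wc | awk pipeline would write "42\n".
    Path(out_dir, "row_count").write_text("42\n")
    result = read_outputs(Path(out_dir), {"row_count": int})
print(result)  # {'row_count': 42}
```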
When to use ContainerTask
ContainerTask is the right choice when the container shouldn’t or can’t have Flyte installed — for example:
- The tool is not Python (a Go/C CLI, a bioinformatics binary, an ML framework container)
- You want to reuse an existing production image without modifying it
- You want to stay out of Python entirely for the task body
If you already have Python in the loop and just need to shell out for one step, a regular @env.task with subprocess is simpler:
```python
@env.task
async def extract(date: str) -> int:
    import subprocess

    subprocess.run(["gsutil", "cp", f"gs://bucket/data-{date}.csv", "/tmp/data.csv"], check=True)
    out = subprocess.check_output(["wc", "-l", "/tmp/data.csv"])
    return int(out.split()[0])
```
How the arguments map
| BashOperator | ContainerTask |
|---|---|
| bash_command (string) | command=[...] (list; you choose the shell) |
| (implicit worker image) | image= (explicit, per task) |
| env / append_env | flyte.Image.from_...().with_env_vars(...) or the TaskEnvironment |
| {{ ds }}, {{ ti.xcom_pull(...) }} | {{.inputs.<name>}} |
| do_xcom_push=True (last stdout line) | outputs={...}, written to files in output_data_dir |
| cwd | cd ... && ... inside the command |
Docs: Container Tasks
7. KubernetesPodOperator to TaskEnvironment + PodTemplate
KubernetesPodOperator (KPO) gives you the full pod spec: image, commands, env, secrets, resources, volumes, node selectors, tolerations, service accounts, affinity — plus XCom via a sidecar writing to /airflow/xcom/return.json.
In Flyte, the same knobs live in three places:
- TaskEnvironment(...): the common knobs. Image, resources, env vars, secrets, interruptible/spot, and an optional pod_template applied to every task in the env.
- @env.task(...): per-task overrides on top of the env: retries, timeout, cache, triggers, and a task-level pod_template if this one task needs to differ.
- flyte.PodTemplate(...): the raw Kubernetes escape hatch. It wraps kubernetes.client.V1PodSpec, so anything in the pod spec (volumes, node selectors, tolerations, affinity, service accounts, sidecars, init containers, image pull secrets, security contexts, lifecycle hooks) is available.
XCom has no equivalent — the task’s typed return value is the output, and large payloads use File/Dir (Section 4). The sidecar-writing-to-/airflow/xcom/return.json contract doesn’t exist.
Where every KPO knob lands
| KPO argument | Flyte location |
|---|---|
| image | TaskEnvironment(image=...) |
| cmds, arguments | function body of @env.task (or command=[...] on a ContainerTask for non-Python images) |
| env_vars (dict) | TaskEnvironment(env_vars={...}) |
| secrets=[Secret(...)] | TaskEnvironment(secrets=[flyte.Secret(...)]) |
| container_resources (requests/limits) | TaskEnvironment(resources=flyte.Resources(cpu=(1,4), memory="2Gi", gpu="T4:1")) |
| node_selector, tolerations, affinity | flyte.PodTemplate(pod_spec=V1PodSpec(node_selector=..., tolerations=..., affinity=...)) |
| service_account_name | PodTemplate(pod_spec=V1PodSpec(service_account_name=...)) |
| volumes, volume_mounts | PodTemplate(pod_spec=V1PodSpec(volumes=[...], containers=[V1Container(volume_mounts=[...])])) |
| image_pull_secrets | PodTemplate(pod_spec=V1PodSpec(image_pull_secrets=[V1LocalObjectReference(name=...)])) |
| security_context | PodTemplate(pod_spec=V1PodSpec(security_context=...)) |
| labels, annotations | PodTemplate(labels={...}, annotations={...}) |
| init_containers, sidecars | PodTemplate(pod_spec=V1PodSpec(init_containers=[...], containers=[primary, ...])) |
| retries, retry_delay | @env.task(retries=...) |
| execution_timeout | @env.task(timeout=timedelta(...)) |
| do_xcom_push + sidecar contract | function return type: primitives/dataclasses inline, large payloads via File/Dir |
| on_finish_action / pod cleanup | handled by Flyte; pods are cleaned up per run lifecycle |
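When porting many DAGs, it can help to generate a per-operator checklist from each KPO call's keyword arguments using the table above. The stdlib sketch below makes some assumptions: KPO_TO_FLYTE is a hand-copied subset of the table, and migration_checklist is a hypothetical helper that is not part of either SDK. Arguments it doesn't know are flagged for manual review rather than guessed.

```python
# Hand-copied subset of the KPO table above (hypothetical migration aid).
KPO_TO_FLYTE = {
    "image": "TaskEnvironment(image=...)",
    "cmds": "function body of @env.task",
    "arguments": "function body of @env.task",
    "env_vars": "TaskEnvironment(env_vars={...})",
    "secrets": "TaskEnvironment(secrets=[flyte.Secret(...)])",
    "container_resources": "TaskEnvironment(resources=flyte.Resources(...))",
    "node_selector": "PodTemplate(pod_spec=V1PodSpec(node_selector=...))",
    "tolerations": "PodTemplate(pod_spec=V1PodSpec(tolerations=...))",
    "service_account_name": "PodTemplate(pod_spec=V1PodSpec(service_account_name=...))",
    "retries": "@env.task(retries=...)",
    "execution_timeout": "@env.task(timeout=timedelta(...))",
    "do_xcom_push": "typed return value (File/Dir for large payloads)",
}

def migration_checklist(kpo_kwargs: dict) -> list[str]:
    """Hypothetical helper: one line per KPO argument, sorted by name."""
    lines = []
    for arg in sorted(kpo_kwargs):
        target = KPO_TO_FLYTE.get(arg)
        lines.append(f"{arg} -> {target}" if target else f"{arg} -> REVIEW MANUALLY")
    return lines

for line in migration_checklist({
    "image": "registry.example.com/etl:2026.04.01",
    "env_vars": {"LOG_LEVEL": "INFO"},
    "startup_timeout_seconds": 600,
}):
    print(line)
```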
What a fully-specified task looks like
```python
from datetime import timedelta

from kubernetes.client import V1Container, V1PodSpec
import flyte

pod_template = flyte.PodTemplate(
    primary_container_name="primary",
    labels={"team": "etl"},
    pod_spec=V1PodSpec(
        # V1PodSpec requires a containers list; this "primary" entry is a
        # placeholder that Flyte fills in from the env and the task.
        containers=[V1Container(name="primary")],
        service_account_name="etl-runner",
        init_containers=[
            V1Container(
                name="warm-cache",
                image="busybox:1.36",
                command=["sh", "-c", "echo warming cache && sleep 1"],
            ),
        ],
    ),
)

etl_env = flyte.TaskEnvironment(
    name="etl",
    image="registry.example.com/etl:2026.04.01",
    resources=flyte.Resources(cpu=(1, 4), memory="2Gi", gpu="T4:1"),
    env_vars={"LOG_LEVEL": "INFO"},
    secrets=[flyte.Secret(key="db-password", as_env_var="DB_PASSWORD")],
    pod_template=pod_template,
    interruptible=True,
)

@etl_env.task(retries=3, timeout=timedelta(minutes=30))
async def load_warehouse(ds: str) -> int:
    ...
```
The primary container entry only needs its name. Flyte fills in its image, command, and resources from the env and the task function. (The kubernetes client rejects a V1PodSpec without a containers list, which is why the placeholder is there.) Put fields on V1Container(name="primary", ...) directly only when you need them, such as volume mounts, extra env, or a security context.
Docs: TaskEnvironment · Secrets · PodTemplate / advanced k8s config
8. Orchestration: parallelism, conditionals, error handling
Airflow encodes orchestration in first-class primitives — [t1, t2, t3] >> merge for fan-out, .expand() for dynamic mapping, BranchPythonOperator / @task.branch for branching, ShortCircuitOperator for early exit, trigger_rule for post-branch merges, and on_failure_callback / trigger_rule=ALL_DONE for failure paths.
In Flyte these are plain Python inside a driver task, because the driver runs at execution time. There is no static graph to encode into.
Parallelism
Static fan-out in Airflow:
```python
fetch >> [summarize_us, summarize_eu, summarize_apac] >> merge
```
Flyte: concurrent awaits with asyncio.gather:
```python
import asyncio

@env.task
async def driver(ds: str) -> Summary:
    raw = await fetch(ds)
    us, eu, apac = await asyncio.gather(
        summarize(raw, "us"),
        summarize(raw, "eu"),
        summarize(raw, "apac"),
    )
    return await merge(us, eu, apac)
```
Each summarize(...) call returns a coroutine. asyncio.gather runs them concurrently, waits for all of them, and returns their results in argument order. Tasks called concurrently run in their own pods, so the concurrency is real, not just asyncio interleaving on one worker.
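Two properties of asyncio.gather that the driver relies on are easy to check locally. First, results come back in the order the awaitables were passed, so the us, eu, apac unpacking is safe even if apac finishes first. Second, all calls are in flight at once. The stdlib sketch below uses a stand-in coroutine in place of the summarize task; the delays and the in-flight counter are illustrative only.

```python
import asyncio

in_flight = 0
peak = 0

async def summarize(raw: str, region: str) -> str:
    """Stand-in for the summarize task: sleeps for a region-dependent time."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    # apac sleeps least, so it finishes first.
    await asyncio.sleep({"us": 0.03, "eu": 0.02, "apac": 0.01}[region])
    in_flight -= 1
    return f"{region}:{raw}"

async def driver() -> list[str]:
    us, eu, apac = await asyncio.gather(
        summarize("raw", "us"),
        summarize("raw", "eu"),
        summarize("raw", "apac"),
    )
    return [us, eu, apac]

results = asyncio.run(driver())
print(results, peak)  # ['us:raw', 'eu:raw', 'apac:raw'] 3
```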
Dynamic mapping
Airflow uses .expand() to fan out over values known only at runtime:
```python
process.partial(batch_size=100).expand(shard_id=list_shards())
```
Flyte: a regular comprehension over the runtime list, inside the driver:
```python
shards = await list_shards()
results = await asyncio.gather(*(process(shard) for shard in shards))
```
To bound concurrency (for example, when the downstream is rate-limited), wrap the call in an asyncio.Semaphore:
```python
sem = asyncio.Semaphore(20)

async def process_one(shard):
    async with sem:
        return await process(shard)

results = await asyncio.gather(
    *(process_one(s) for s in shards),
    return_exceptions=True,
)
```
With return_exceptions=True, per-item exceptions are returned in the results list instead of failing the whole batch on the first error. The same pattern gives per-task limits: when different tasks in one fan-out need different concurrency caps, give each task its own semaphore.
If your codebase is sync, list(flyte.map(process, shards, concurrency=20)) is the sync equivalent of the pattern above.
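The bounded async fan-out can be exercised locally with the standard library. In the sketch below, process is a stand-in coroutine (not a Flyte task) that fails for every seventh shard. The semaphore caps in-flight calls at 20, and return_exceptions=True lets the driver split the results into successes and failures afterwards instead of losing the whole batch to one error.

```python
import asyncio

LIMIT = 20
in_flight = 0
peak = 0

async def process(shard: int) -> int:
    """Stand-in for the process task; fails for every seventh shard."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    try:
        await asyncio.sleep(0.001)
        if shard % 7 == 0:
            raise ValueError(f"bad shard {shard}")
        return shard * 2
    finally:
        in_flight -= 1

async def driver(shards: list[int]) -> tuple[list[int], list[BaseException]]:
    sem = asyncio.Semaphore(LIMIT)

    async def process_one(shard: int) -> int:
        async with sem:  # at most LIMIT calls inside process() at once
            return await process(shard)

    results = await asyncio.gather(
        *(process_one(s) for s in shards),
        return_exceptions=True,
    )
    ok = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, BaseException)]
    return ok, failed

ok, failed = asyncio.run(driver(list(range(100))))
print(len(ok), len(failed))  # 85 15
print(peak <= LIMIT)  # True
```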
Docs: Controlling parallelism · Fanout
Conditionals
Airflow: BranchPythonOperator (or @task.branch) returns the task_id(s) to run next; ShortCircuitOperator skips the rest of the branch; a trigger_rule=NONE_FAILED_MIN_ONE_SUCCESS on the merge task reconciles skipped upstreams.
Flyte:
```python
@env.task
async def driver(ds: str) -> Summary:
    size = await inspect(ds)
    if size == 0:
        return Summary(status="empty")
    if size < 1_000_000:
        return await fast_path(ds)
    return await slow_path(ds)
```
Plain if / elif / else, plain return. There is no trigger_rule to set because there are no skipped tasks to reconcile; the code below the branch simply doesn’t run.
Error handling
Per-task retries and timeouts live on the @env.task decorator:
```python
from datetime import timedelta

@env.task(retries=3, timeout=timedelta(minutes=15))
async def flaky(ds: str) -> int:
    ...
```
Orchestration-level error handling (Airflow’s trigger_rule=ALL_DONE cleanup and on_failure_callback alerts) is try / except / finally in the driver:
```python
@env.task
async def driver(ds: str) -> Summary:
    try:
        result = await heavy_step(ds)
        return await publish(result)
    except Exception as e:
        await alert(f"{ds} failed: {e}")
        raise
    finally:
        await cleanup(ds)
```
finally runs on both success and failure. except catches task failures at the await site after the task’s own retries are exhausted. Specific failure modes live in flyte.errors (OOMError, TaskTimeoutError, RetriesExhaustedError, ActionAbortedError) and can be caught by type. A common pattern is retrying an OOM with larger resources via .override(...):
```python
import flyte.errors

env = flyte.TaskEnvironment(
    name="transforms",
    image=...,
    resources=flyte.Resources(cpu=1, memory="250Mi"),
)

@env.task
async def transform(ds: str) -> int:
    ...

@env.task
async def driver(ds: str) -> int:
    try:
        return await transform(ds)
    except flyte.errors.OOMError:
        # Retry once with more memory than the env's default.
        return await transform.override(
            resources=flyte.Resources(cpu=1, memory="2Gi"),
        )(ds)
```
Docs: Retries and timeouts · Error handling
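The ordering in the try/except/finally driver is plain Python semantics, so it can be checked without a cluster. In the stdlib sketch below, heavy_step, publish, alert, and cleanup are stand-in coroutines that record events, and the extra fail parameter is a hypothetical switch that makes heavy_step raise. On failure the alert fires before cleanup, cleanup runs on both paths, and the original exception still propagates because the except block re-raises.

```python
import asyncio

events: list[str] = []

async def heavy_step(ds: str, fail: bool) -> str:
    """Stand-in task; `fail` is a hypothetical switch to simulate an error."""
    if fail:
        raise RuntimeError("out of memory")
    return f"result-{ds}"

async def publish(result: str) -> str:
    events.append(f"publish {result}")
    return result

async def alert(msg: str) -> None:
    events.append(f"alert {msg}")

async def cleanup(ds: str) -> None:
    events.append(f"cleanup {ds}")

async def driver(ds: str, fail: bool = False) -> str:
    try:
        result = await heavy_step(ds, fail)
        return await publish(result)
    except Exception as e:
        await alert(f"{ds} failed: {e}")
        raise  # re-raise so the run is still marked failed
    finally:
        await cleanup(ds)  # runs on both paths

print(asyncio.run(driver("2026-04-01")))  # result-2026-04-01
try:
    asyncio.run(driver("2026-04-02", fail=True))
except RuntimeError as e:
    print("propagated:", e)  # propagated: out of memory
print(events)
```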
What’s next
Once the port is in place, a few Flyte features don’t have direct Airflow counterparts and are worth knowing about.
Caching
A task can be marked cacheable; subsequent calls with the same inputs short-circuit to the previous output instead of re-running. The cache key is derived from the inputs and a task version, so bumping the version invalidates previously cached outputs.
```python
@env.task(cache="auto")
async def expensive(ds: str) -> Result:
    ...
```
Airflow has no equivalent: XCom stores outputs but doesn’t short-circuit on re-execution.
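Conceptually, the cache behaves like memoization keyed on the task, its version, and its inputs. The stdlib sketch below illustrates only that idea. It does not reproduce Flyte's actual key derivation (what cache="auto" hashes to produce the version, or how inputs are hashed). cache_key, cached_call, and CACHE are hypothetical names.

```python
import hashlib
import json

CACHE: dict[str, object] = {}  # hypothetical in-memory cache store
calls = 0

def cache_key(task_name: str, version: str, inputs: dict) -> str:
    """Hypothetical: stable key from task identity, version, and JSON-able inputs."""
    blob = json.dumps([task_name, version, inputs], sort_keys=True)
    return hashlib.sha256(blob.encode()).hexdigest()

def expensive(ds: str) -> str:
    global calls
    calls += 1
    return f"result for {ds}"

def cached_call(fn, version: str, **inputs):
    key = cache_key(fn.__name__, version, inputs)
    if key not in CACHE:  # miss: run the task and store its output
        CACHE[key] = fn(**inputs)
    return CACHE[key]  # hit: short-circuit to the stored output

cached_call(expensive, "v1", ds="2026-04-01")
cached_call(expensive, "v1", ds="2026-04-01")  # same inputs and version: no re-run
cached_call(expensive, "v2", ds="2026-04-01")  # bumped version: runs again
print(calls)  # 2
```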
Docs: Caching
Reusable containers
By default each task call gets a fresh pod. A reuse policy keeps the container warm across calls so follow-up invocations skip pod startup and image pull.
```python
warm_env = flyte.TaskEnvironment(
    name="warm",
    image=...,
    reusable=flyte.ReusePolicy(replicas=(1, 3), concurrency=2),
)
```
Useful when a fan-out issues many short tasks against a heavy image.
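This sketch assumes that replicas=(min, max) bounds the number of warm containers and that concurrency is the number of task calls each container runs at once. Under that assumption, the policy above tops out at 3 * 2 = 6 concurrent calls. Check the Reusable containers docs for the exact semantics. reuse_capacity and waves are hypothetical helpers, and waves further assumes equal-length calls and ignores scale-up time.

```python
import math

def reuse_capacity(replicas: tuple[int, int], concurrency: int) -> tuple[int, int]:
    """Hypothetical: (min, max) task calls in flight, assuming per-replica concurrency."""
    low, high = replicas
    return low * concurrency, high * concurrency

def waves(n_calls: int, replicas: tuple[int, int], concurrency: int) -> int:
    """Hypothetical: rounds of equal-length calls once the pool is fully scaled."""
    _, max_in_flight = reuse_capacity(replicas, concurrency)
    return math.ceil(n_calls / max_in_flight)

print(reuse_capacity((1, 3), 2))  # (2, 6)
print(waves(100, (1, 3), 2))  # 17
```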
Docs: Reusable containers
Reports
A task can emit an HTML report (tables, plots, logs) that is attached to the run and viewable in the UI. The report is written from inside the task with flyte.report.
```python
@env.task(report=True)
async def summarize(ds: str) -> Summary:
    tab = flyte.report.get_tab("main")
    tab.log(f"<h2>{ds}</h2>")
    # dataframe stands for a pandas DataFrame computed earlier in the task.
    tab.log(dataframe.to_html())
    await flyte.report.flush.aio()
    ...
```
Docs: Reports
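tab.log takes an HTML string. If the task's image does not include pandas, a small stdlib helper can render rows as a table instead. In the sketch below, rows_to_html is a hypothetical helper that escapes every cell with html.escape, so values containing < or & cannot break the report markup. Its return value would be passed to tab.log(...) in place of dataframe.to_html().

```python
from html import escape

def rows_to_html(rows: list[dict]) -> str:
    """Hypothetical helper: render same-keyed dicts as an HTML table, escaping all text."""
    if not rows:
        return "<p>No rows.</p>"
    headers = list(rows[0])  # column order follows the first row's keys
    head = "".join(f"<th>{escape(str(h))}</th>" for h in headers)
    body = "".join(
        "<tr>" + "".join(f"<td>{escape(str(row.get(h, '')))}</td>" for h in headers) + "</tr>"
        for row in rows
    )
    return f"<table><thead><tr>{head}</tr></thead><tbody>{body}</tbody></table>"

print(rows_to_html([{"region": "us", "events": 120}, {"region": "<eu>", "events": 80}]))
```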
Apps
A long-running HTTP server (FastAPI, Panel, Streamlit, a webhook endpoint) can be deployed alongside your tasks. The app has a URL and can call tasks via the Flyte API. This is the path for webhook-triggered runs, a UI on top of a pipeline, or a custom inference endpoint.
Docs: Serve and deploy apps · Build apps