External conditions
An external condition is a first-class action that pauses a running task until an external signal arrives — a human approval, a callback from an external service, or a value supplied at runtime. The paused action stays observable, resumable, and governable like any other action, so you no longer need polling loops or side processes to wait on something the workflow can’t produce itself.
Inside a task, await flyte.new_condition.aio(...) registers a condition action and returns a
handle; await handle.wait.aio() blocks the task until the condition is signaled and returns the
typed payload. (new_condition and wait are sync-by-default; in an async def task use their
.aio() form.)
Supported types
A condition declares a data_type, which determines what a signal must supply and what wait()
returns:
data_type |
wait() returns |
A signal value of |
|---|---|---|
bool (default) |
True / False |
true / false |
int |
Python int |
an integer literal |
float |
Python float |
a decimal literal |
str |
Python str |
any string |
Example: human approval
A typed approval gate with a timeout — the most common use case:
from datetime import timedelta
import flyte
env = flyte.TaskEnvironment("approvals")
@env.task
async def etl_pipeline():
staged = await transform()
approval = await flyte.new_condition.aio(
"prod_write_approval",
prompt="Approve writing staged data to production?",
data_type=bool,
timeout=timedelta(hours=24),
)
if not await approval.wait.aio():
raise RuntimeError("Pipeline rejected by reviewer")
await write_to_prod(staged)The task pauses at await approval.wait.aio() until someone signals the condition (see
Signaling a condition). If the timeout elapses with no signal, wait()
raises flyte.errors.ConditionTimedoutError.
Example: string input at runtime
A condition can collect a typed value — not just a yes/no — and feed it back into the workflow. Here the task waits for a free-form string before continuing:
import flyte
env = flyte.TaskEnvironment("conditions")
@env.task
async def deploy_with_reason():
reason = await flyte.new_condition.aio(
"deploy_reason",
prompt="Enter a deployment reason to continue:",
data_type=str,
)
note: str = await reason.wait.aio()
# `note` now holds the string a human supplied — use it downstream.
await record_audit(note)Parameters
flyte.new_condition(
name,
prompt="Approve?",
prompt_type="text",
data_type=bool,
description="",
timeout=None,
webhook=None,
)| Parameter | Type | Default | Description |
|---|---|---|---|
name |
str |
required | Identifier for the condition within the parent action. Signal it with this name (flyte signal condition <run> <name>) or look it up with flyte.remote.Condition.get("<name>", ...). |
prompt |
str |
"Approve?" |
Human-readable text shown in the UI signal form. |
prompt_type |
"text" | "markdown" |
"text" |
How the prompt is rendered. |
data_type |
type |
bool |
Payload type — one of bool, int, float, str. Determines what wait() returns and what a signal must supply. |
description |
str |
"" |
Longer explanation rendered alongside the prompt. |
timeout |
timedelta | int | float | None |
None |
Maximum wait. If it elapses with no signal, wait() raises flyte.errors.ConditionTimedoutError. |
An optional advanced webhook parameter accepts a flyte.ConditionWebhook so the backend POSTs a
callback URL when the condition is created; see the API reference for details.
Signaling a condition
A condition is satisfied by delivering exactly one typed signal of its declared data_type.
From the CLI
# List the conditions on a run, optionally scoped to one parent action.
flyte get condition <run-name>
flyte get condition <run-name> <parent-action>
# Signal a specific condition by its name. Omit the value for an interactive typed prompt.
flyte signal condition <run-name> <condition-name> trueThe value is coerced to the condition’s declared data_type (true/false for bool, integer
literals for int, decimal literals for float, any string for str).
From Python (remote)
import flyte.remote
condition = flyte.remote.Condition.get(
"prod_write_approval",
run_name="deploy-run-123",
action_name="etl_pipeline",
)
condition.signal(True)flyte.remote.Condition.listall(run_name=...) enumerates the conditions on a run.
Timeout with a fallback
from datetime import timedelta
import flyte
from flyte.errors import ConditionTimedoutError
env = flyte.TaskEnvironment("conditions")
@env.task
async def with_default():
cond = await flyte.new_condition.aio("threshold", data_type=float, timeout=timedelta(minutes=30))
try:
threshold = await cond.wait.aio()
except ConditionTimedoutError:
threshold = 0.5 # proceed with a default if no one respondsErrors
| Situation | Raised |
|---|---|
| Timeout elapses before a signal | flyte.errors.ConditionTimedoutError |
Creating a condition whose name already exists in the action |
flyte.errors.ConditionAlreadyExistsError |
| Condition fails during execution | flyte.errors.ConditionFailedError |
Signal value doesn’t match data_type |
TypeError (client-side, before any call) |
wait() called outside a task context |
RuntimeError |