Human-in-the-loop
Human-in-the-loop (HITL) workflows pause execution at a defined point, wait for a human to provide input or approval, and then continue based on that response. Common use cases include content moderation gates, model output review, anomaly confirmation, and manual approval steps before costly or irreversible operations.
The flyteplugins-hitl package provides an event-based API for this pattern. When an event is created, Flyte automatically serves a small FastAPI web app with a form where a human can submit input. The workflow then resumes with the submitted value.
pip install flyteplugins-hitlKey characteristics:
- Supports
int,float,str, andboolinput types - Crash-resilient: uses durable sleep so polling survives task restarts
- Configurable timeout and poll interval
- The web form is accessible from the task’s report in the Flyte UI
Setup
The task environment must declare hitl.env as a dependency. This makes the HITL web app available during task execution:
import flyte
import flyteplugins.hitl as hitl
# The task environment must declare hitl.env as a dependency.
# This makes the HITL web app available during task execution.
env = flyte.TaskEnvironment(
name="hitl-workflow",
image=flyte.Image.from_debian_base(name="hitl").with_pip_packages(
"flyteplugins-hitl>=2.0.0",
"fastapi",
"uvicorn",
"python-multipart",
),
resources=flyte.Resources(cpu="1", memory="512Mi"),
depends_on=[hitl.env],
)
Automated task
An automated task runs first and produces a result that requires human review:
@env.task(report=True)
async def analyze_data(dataset: str) -> dict:
"""Automated task that produces a result requiring human review."""
# Simulate analysis
result = {
"dataset": dataset,
"row_count": 142857,
"anomalies_detected": 3,
"confidence": 0.87,
}
await flyte.report.replace.aio(
f"Analysis complete: {result['anomalies_detected']} anomalies detected "
f"(confidence: {result['confidence']:.0%})"
)
await flyte.report.flush.aio()
return result
Requesting human input
Use hitl.new_event() to pause and wait for a human response. The prompt is shown on the web form. The data_type controls what type the submitted value is converted to before being returned:
@env.task(report=True)
async def request_human_review(analysis: dict) -> bool:
"""Pause and ask a human whether to proceed with the flagged records."""
event = await hitl.new_event.aio(
"review_decision",
data_type=bool,
scope="run",
prompt=(
f"Analysis found {analysis['anomalies_detected']} anomalies "
f"with {analysis['confidence']:.0%} confidence. "
"Approve for downstream processing? (true/false)"
),
)
approved: bool = await event.wait.aio()
return approved
When this task runs, Flyte:
- Serves the HITL web app (if not already running)
- Creates an event and writes a pending request to object storage
- Displays a link to the web form in the task report
- Polls for a response using durable sleep
- Returns the submitted value once input is received
Wiring it together
The main task orchestrates the automated step and the HITL gate:
@env.task(report=True)
async def main(dataset: str = "s3://my-bucket/data.parquet") -> str:
analysis = await analyze_data(dataset=dataset)
approved = await request_human_review(analysis=analysis)
if approved:
return "Processing approved — continuing pipeline."
else:
return "Processing rejected by reviewer — pipeline halted."
if __name__ == "__main__":
flyte.init_from_config()
r = flyte.run(main)
print(r.name)
print(r.url)
r.wait()
print(r.outputs())
Event options
hitl.new_event() accepts the following parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
name |
str |
— | Descriptive name shown in logs and the UI |
data_type |
type |
— | Expected input type: int, float, str, or bool |
scope |
str |
"run" |
Scope of the event. Currently only "run" is supported |
prompt |
str |
"Please provide a value" |
Message shown on the web form |
timeout_seconds |
int |
3600 |
Maximum time to wait before raising TimeoutError |
poll_interval_seconds |
int |
5 |
How often to check for a response |
Submitting input programmatically
In addition to the web form, input can be submitted via the event’s JSON API endpoint. This is useful for automated testing or integration with external approval systems:
curl -X POST https://<hitl-app-endpoint>/submit/json \
-H "Content-Type: application/json" \
-d '{
"request_id": "<request_id>",
"response_path": "<response_path>",
"value": "true",
"data_type": "bool"
}'The request_id and response_path are shown in the task report alongside the form URL.