Human-in-the-loop

Human-in-the-loop (HITL) workflows pause execution at a defined point, wait for a human to provide input or approval, and then continue based on that response. Common use cases include content moderation gates, model output review, anomaly confirmation, and manual approval steps before costly or irreversible operations.

The flyteplugins-hitl package provides an event-based API for this pattern. When an event is created, Flyte automatically serves a small FastAPI web app with a form where a human can submit input. The workflow then resumes with the submitted value.

pip install flyteplugins-hitl

Key characteristics:

  • Supports int, float, str, and bool input types
  • Crash-resilient: uses durable sleep so polling survives task restarts
  • Configurable timeout and poll interval
  • The web form is accessible from the task’s report in the Flyte UI

Setup

The task environment must declare hitl.env as a dependency. This makes the HITL web app available during task execution:

hitl.py
import flyte
import flyteplugins.hitl as hitl

# The task environment must declare hitl.env as a dependency.
# This makes the HITL web app available during task execution.
env = flyte.TaskEnvironment(
    name="hitl-workflow",
    image=flyte.Image.from_debian_base(name="hitl").with_pip_packages(
        "flyteplugins-hitl>=2.0.0",
        "fastapi",
        "uvicorn",
        "python-multipart",
    ),
    resources=flyte.Resources(cpu="1", memory="512Mi"),
    depends_on=[hitl.env],
)

Automated task

An automated task runs first and produces a result that requires human review:

hitl.py
@env.task(report=True)
async def analyze_data(dataset: str) -> dict:
    """Automated task that produces a result requiring human review."""
    # Simulate analysis
    result = {
        "dataset": dataset,
        "row_count": 142857,
        "anomalies_detected": 3,
        "confidence": 0.87,
    }
    await flyte.report.replace.aio(
        f"Analysis complete: {result['anomalies_detected']} anomalies detected "
        f"(confidence: {result['confidence']:.0%})"
    )
    await flyte.report.flush.aio()
    return result

Requesting human input

Use hitl.new_event() to pause and wait for a human response. The prompt is shown on the web form. The data_type controls what type the submitted value is converted to before being returned:

hitl.py
@env.task(report=True)
async def request_human_review(analysis: dict) -> bool:
    """Pause and ask a human whether to proceed with the flagged records."""
    event = await hitl.new_event.aio(
        "review_decision",
        data_type=bool,
        scope="run",
        prompt=(
            f"Analysis found {analysis['anomalies_detected']} anomalies "
            f"with {analysis['confidence']:.0%} confidence. "
            "Approve for downstream processing? (true/false)"
        ),
    )
    approved: bool = await event.wait.aio()
    return approved

When this task runs, Flyte:

  1. Serves the HITL web app (if not already running)
  2. Creates an event and writes a pending request to object storage
  3. Displays a link to the web form in the task report
  4. Polls for a response using durable sleep
  5. Returns the submitted value once input is received

Wiring it together

The main task orchestrates the automated step and the HITL gate:

hitl.py
@env.task(report=True)
async def main(dataset: str = "s3://my-bucket/data.parquet") -> str:
    analysis = await analyze_data(dataset=dataset)

    approved = await request_human_review(analysis=analysis)

    if approved:
        return "Processing approved — continuing pipeline."
    else:
        return "Processing rejected by reviewer — pipeline halted."


if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
    print(r.outputs())

Event options

hitl.new_event() accepts the following parameters:

Parameter Type Default Description
name str Descriptive name shown in logs and the UI
data_type type Expected input type: int, float, str, or bool
scope str "run" Scope of the event. Currently only "run" is supported
prompt str "Please provide a value" Message shown on the web form
timeout_seconds int 3600 Maximum time to wait before raising TimeoutError
poll_interval_seconds int 5 How often to check for a response

Submitting input programmatically

In addition to the web form, input can be submitted via the event’s JSON API endpoint. This is useful for automated testing or integration with external approval systems:

curl -X POST https://<hitl-app-endpoint>/submit/json \
  -H "Content-Type: application/json" \
  -d '{
    "request_id": "<request_id>",
    "response_path": "<response_path>",
    "value": "true",
    "data_type": "bool"
  }'

The request_id and response_path are shown in the task report alongside the form URL.