Interruptible tasks and queues

Interruptible tasks

Cloud providers offer discounted compute instances (AWS Spot Instances, GCP Preemptible VMs) that can be reclaimed at any time. These instances are significantly cheaper than on-demand instances but come with the risk of preemption.

Setting interruptible=True allows Flyte to schedule the task on these spot/preemptible instances for cost savings:

import flyte

env = flyte.TaskEnvironment(
    name="my_env",
    interruptible=True,
)

@env.task
def train_model(data: list) -> dict:
    return {"accuracy": 0.95}

Setting at different levels

interruptible can be set at the TaskEnvironment level, the @env.task decorator level, and at the task.override() invocation level. The more specific level always takes precedence.
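The precedence order can be pictured with a toy resolver (an illustration only, not Flyte's actual resolution code — the function name and the `None`-means-unset convention are assumptions for this sketch):

```python
# Toy illustration of "most specific wins" resolution (not Flyte internals).
def resolve_interruptible(env_value, task_value=None, override_value=None):
    """Return the effective setting: override > task decorator > environment."""
    for value in (override_value, task_value, env_value):
        if value is not None:
            return value
    return False  # assume non-interruptible when nothing is set anywhere

print(resolve_interruptible(True))                              # True (env default)
print(resolve_interruptible(True, task_value=False))            # False (decorator wins)
print(resolve_interruptible(True, False, override_value=True))  # True (override wins)
```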

This lets you set a default at the environment level and override per-task:

import flyte

# All tasks in this environment are interruptible by default
env = flyte.TaskEnvironment(
    name="my_env",
    interruptible=True,
)

# This task uses the environment default (interruptible)
@env.task
def preprocess(data: list) -> list:
    return [x * 2 for x in data]

# This task overrides to non-interruptible (critical, should not be preempted)
@env.task(interruptible=False)
def save_results(results: dict) -> str:
    return "saved"

You can also override at invocation time:

@env.task
async def main(data: list) -> str:
    # Run this specific invocation of preprocess as non-interruptible,
    # even though its environment default is interruptible
    processed = preprocess.override(interruptible=False)(data=data)
    return save_results(results={"data": processed})

Behavior on preemption

When a spot instance is reclaimed, the task is terminated and rescheduled. Combine interruptible=True with retries to handle preemptions gracefully:

@env.task(interruptible=True, retries=3)
def train_model(data: list) -> dict:
    return {"accuracy": 0.95}

Retries due to spot preemption do not count against the user-configured retry budget. System retries (for preemptions and other system-level failures) are tracked separately.
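The separate budgets can be pictured with a toy accounting model (an illustration of the semantics only, not Flyte's internals — the function, its parameters, and the budget sizes here are assumptions):

```python
# Toy model of separate user and system retry budgets (not Flyte internals).
def attempt_task(failures, user_retries=3, system_retries=5):
    """failures: failure kinds ("user" or "system") hit before success.
    Returns "succeeded" if both budgets hold out, else "failed"."""
    user_left, system_left = user_retries, system_retries
    for kind in failures:
        if kind == "user":  # user-code failure: draws from retries=N
            if user_left == 0:
                return "failed"
            user_left -= 1
        else:  # system failure, e.g. spot preemption: separate budget
            if system_left == 0:
                return "failed"
            system_left -= 1
    return "succeeded"

# Three preemptions leave the user budget of 3 untouched:
print(attempt_task(["system", "system", "system"]))  # succeeded
# Four user-code failures exhaust retries=3:
print(attempt_task(["user"] * 4))  # failed
```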

Queues

Queues are named routing labels that map tasks to specific resource pools or execution clusters in your infrastructure.

Setting a queue directs the task to the corresponding compute partition:

import flyte

env = flyte.TaskEnvironment(
    name="my_env",
    queue="gpu-pool",
)

@env.task
def train_model(data: list) -> dict:
    return {"accuracy": 0.95}

Setting at different levels

queue can be set at the TaskEnvironment level, the @env.task decorator level, and at the task.override() invocation level. The more specific level takes precedence.

import flyte

env = flyte.TaskEnvironment(
    name="my_env",
    queue="default-pool",
)

# Uses environment-level queue ("default-pool")
@env.task
def preprocess(data: list) -> list:
    return [x * 2 for x in data]

# Overrides to a different queue at the decorator level
@env.task(queue="gpu-pool")
def train_model(data: list) -> dict:
    return {"accuracy": 0.95}

# Overrides the queue for a single invocation
@env.task
async def main(data: list) -> dict:
    # Route this call of preprocess to "gpu-pool" instead of "default-pool"
    processed = preprocess.override(queue="gpu-pool")(data=data)
    return train_model(data=processed)

If no queue is specified at any level, the default queue is used.

Queues are configured as part of your Union.ai deployment by your platform administrator. The available queue names depend on your infrastructure setup.