Building Crash-Proof AI Systems

David Espejo

Traditional data pipelines have predictable timing. An ETL job that loads a database takes 10 minutes today and, very likely, will take 10 minutes tomorrow. You can set timeouts. You can predict costs.

AI workflows break all these assumptions:

import flyte

env = flyte.TaskEnvironment(name="agent-analysis")

@env.task
async def analyze_data(dataset_path: str) -> dict:
    """Agent-driven data analysis - how long will this take?"""
    # This could take 30 seconds or 30 minutes
    # The AI agent decides how many tools to call
    # Each tool call might trigger more tool calls
    response = await agent.analyze(dataset_path, tools=[
        query_database,
        run_statistical_analysis,
        fetch_external_data,
        train_small_model  # This one might take hours!
    ])
    return process_agent_response(response)

The challenges:

  • Unknown duration: An AI agent might reason for 5 minutes or 5 hours
  • Heterogeneous resource demands: GPU training, large model inference, data downloads
  • Dynamic behavior: The same input might trigger different tool sequences
  • External dependencies: API calls, database queries, file uploads that can't be easily repeated

When these workflows crash (and they will), what happens to:

  • That $50,000 GPU training job?
  • The 100GB dataset you just downloaded?
  • The API calls that created resources in external systems?
  • The 6 hours of AI agent reasoning?

Traditional orchestration restarts execution from scratch. For workloads like these, that's not acceptable.

Enter Durable Executions

Durable executions are a programming model that guarantees your workflow can survive any failure and resume exactly where it left off—as if the failure never happened.

Think of it like a video game with autosave. When your game crashes, you don't restart from the beginning. You load from the last checkpoint and continue playing. The gameplay after the crash is identical to what it would have been without the crash.

That's what durable executions provide for your AI workflows.

The Three Guarantees

A truly durable execution system provides three critical guarantees:

1. Atomic Actions: All-or-Nothing Operations

When an action starts, it runs to completion without interruption. You never observe partial states.

@env.task
async def train_model(data: pd.DataFrame, config: dict) -> flyte.io.File:
    """
    This entire training run is atomic.
    It either completes fully or hasn't started.
    You never see a half-trained model.
    """
    model = initialize_model(config)
    
    # Even if this takes 48 hours, it's treated as one atomic operation
    for epoch in range(config["epochs"]):
        train_one_epoch(model)
        save_checkpoint(f"checkpoint-{epoch}")
    
    model_path = model.save("final_model.pkl")
    return flyte.io.File.from_local(model_path)

2. Progress Tracking: Never Lose Your Place

The system logs every state transition. If something fails, it knows exactly where to resume.

@env.task
async def multi_step_pipeline(input_data: dict) -> dict:
    """
    Each step is tracked. Failure recovery resumes from the last completed step.
    """
    # Step 1: Logged when complete
    cleaned = await clean_data(data=input_data)
    
    # Step 2: Logged when complete
    features = await extract_features(data=cleaned)
    
    # Step 3: Logged when complete (and expensive!)
    model_file = await train_expensive_model(features=features)  # $50K, 48 hours
    
    # Step 4: Logged when complete
    results = await evaluate_model(model=model_file)
    
    return results

# If crash happens after Step 2:
# Step 1: Skip (already completed)
# Step 2: Skip (already completed)
# Step 3: Execute (hasn't started yet)
# Step 4: Pending

3. Failure Transparency: Crashes Are Invisible

After recovery, the execution continues as if nothing happened. The sequence of operations is identical to a failure-free run.

This is the magic that prevents waste:

  • No duplicate API calls
  • No re-downloading data
  • No restarting expensive computations
  • No inconsistent states
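
To make the mechanics concrete, here is a small, framework-agnostic sketch (a toy model for intuition, not Flyte's actual implementation) of how a durable log makes recovery invisible: completed operations are replayed from the log instead of being re-executed, so the calling code behaves exactly as it would in a failure-free run.

import json
import os
from typing import Any, Callable

# Toy durable log for illustration only; not Flyte's implementation.
LOG_PATH = "execution_log.json"

def _load_log() -> dict[str, Any]:
    if os.path.exists(LOG_PATH):
        with open(LOG_PATH) as f:
            return json.load(f)
    return {}

def durable(step_name: str, fn: Callable[[], Any]) -> Any:
    """Run fn once; after a crash, return the recorded result instead of re-running."""
    log = _load_log()
    if step_name in log:
        return log[step_name]  # Recovery path: skip re-execution entirely
    result = fn()              # First run: actually execute the operation
    log[step_name] = result
    with open(LOG_PATH, "w") as f:
        json.dump(log, f)      # Persist the result before moving on
    return result

# Whether or not the process crashed between these two steps, the observable
# sequence of operations (and their results) is identical.
data = durable("download", lambda: {"rows": 1000})
stats = durable("analyze", lambda: {"mean": 42.0})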

How Flyte V2 Implements Durable Executions

Flyte V2 achieves durability through two key mechanisms working together: Run-to-Completion Semantics and Trace-Based Checkpointing.

Mechanism 1: Run-to-Completion Semantics

Every task decorated with `@env.task` becomes an atomic action. Once it starts executing, it proceeds until reaching a terminal state (success or failure) without preemption.

This provides exactly-once execution semantics: each task runs observably exactly once, producing exactly one result or error.
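
A minimal sketch of what this buys you in practice (the environment name and the provisioning stand-in are illustrative, not part of any real API): a side-effecting action can be written as a plain function, because downstream steps consume its recorded result and therefore observe it exactly once, even if the surrounding workflow is recovered after a crash.

import uuid

import flyte

env = flyte.TaskEnvironment(name="provisioning")

@env.task
async def provision_cluster(region: str) -> str:
    """One atomic action: it records a cluster ID (success) or a terminal error."""
    # Stand-in for a real call to a cloud provider's API.
    return f"cluster-{region}-{uuid.uuid4().hex[:8]}"

@env.task
async def deploy_service(region: str) -> str:
    # The recorded result of the action is what gets consumed here, so this
    # step sees exactly one cluster ID even across workflow recovery.
    cluster_id = await provision_cluster(region=region)
    return f"service deployed to {cluster_id}"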

Mechanism 2: Trace-Based Checkpointing

For fine-grained checkpointing within a task, Flyte V2 uses Traces—similar to database write-ahead logs. Each traced operation is logged after execution, creating a recoverable history.

@env.task
async def process_large_batch(items: list[str]) -> list[dict]:
    """
    Process thousands of items with external API calls.
    Without traces: crash means restart from zero.
    With traces: resume from last successful checkpoint.
    """
    
    # Phase 1: Submit all items to external service (traced/checkpointed)
    @flyte.trace
    async def submit_jobs(items: list[str]) -> dict[str, str]:
        """Submit returns job IDs. This is checkpointed."""
        job_ids = {}
        for item in items:
            job_id = await external_api.submit(item)
            job_ids[item] = job_id
        return job_ids
    
    # Phase 2: Wait for all jobs to complete (traced/checkpointed)
    @flyte.trace
    async def wait_for_completion(job_mapping: dict[str, str]) -> list[dict]:
        """Poll job status until done. This is checkpointed."""
        results = []
        for item, job_id in job_mapping.items():
            result = await external_api.poll_until_complete(job_id)
            results.append(result)
        return results
    
    # Execute with automatic checkpointing
    job_mapping = await submit_jobs(items)  # Checkpoint 1
    results = await wait_for_completion(job_mapping)  # Checkpoint 2
    return results

# Timeline with crash:
# 10:00 - Start execution
# 10:15 - submit_jobs completes → CHECKPOINT 1 saved (job_mapping stored)
# 10:20 - wait_for_completion starts polling
# 10:35 - CRASH (network failure, pod eviction, etc.)
# 10:40 - System recovers, workflow resumes
# 10:40 - submit_jobs: SKIP (checkpoint exists, use cached job_mapping)
# 10:40 - wait_for_completion:  RESUME polling with same job IDs
# 10:50 - Complete successfully
#
# Result: No duplicate submissions! No wasted API calls!

Why this matters for AI workloads:

  1. Expensive LLM calls: Each API call costs money. Traces prevent re-calling on restart (see the sketch after this list).
  2. Agent reasoning: AI agents build up context over time. Traces preserve this context.
  3. External state changes: File uploads, database writes, resource creation—all recorded and not repeated.
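
As a sketch of the first point, here is a single LLM call wrapped in a trace so that a recovered workflow reuses the recorded completion instead of paying for the call again (the model name and prompt are illustrative placeholders):

import flyte
from openai import AsyncOpenAI

image = flyte.Image.from_debian_base().with_pip_packages("openai")
env = flyte.TaskEnvironment(name="llm-summaries", image=image)

@env.task
async def summarize_report(report_text: str) -> str:
    """Summarize a report with an LLM; the call itself is checkpointed."""

    @flyte.trace
    async def call_llm(prompt: str) -> str:
        # Logged after it returns: on recovery, the recorded completion is
        # reused and the API is not billed a second time.
        client = AsyncOpenAI()
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content

    return await call_llm(f"Summarize the following report:\n\n{report_text}")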

Putting It Together: A Complete Example

Here's how atomic actions and traces work together to create a truly durable AI workflow:

import flyte
import httpx
import pandas as pd

image = flyte.Image.from_debian_base().with_pip_packages("pandas", "httpx", "openai")
env = flyte.TaskEnvironment(name="ai-pipeline", image=image)

@env.task(cache="auto")
async def download_training_data(urls: list[str]) -> pd.DataFrame:
    """
    ATOMIC ACTION: Downloads all data or fails completely.
    CACHED: Won't re-execute on workflow restart.
    """
    data = []
    async with httpx.AsyncClient() as client:
        for url in urls:
            response = await client.get(url)  # Might download GBs
            data.append(parse_content(response.text))
    return pd.DataFrame(data)

@env.task
async def ai_agent_training(dataset: pd.DataFrame, config: dict) -> flyte.io.File:
    """
    ATOMIC ACTION with TRACES: Long-running AI agent with checkpoints.
    If crash occurs, resumes from last checkpoint.
    """
    
    # Phase 1: Agent explores hyperparameter space (traced)
    @flyte.trace
    async def hyperparameter_search(data: pd.DataFrame) -> dict:
        """Agent-driven search. Duration unknown. Result checkpointed."""
        agent = OptimizerAgent()
        best_config = await agent.optimize(
            data=data,
            max_iterations=100,  # Could take minutes or hours
            tools=[train_small_model, evaluate_metrics]
        )
        return best_config
    
    # Phase 2: Train final model with best config (traced)
    @flyte.trace
    async def train_final_model(data: pd.DataFrame, hyperparams: dict) -> flyte.io.File:
        """Expensive training. $50K in compute. Result checkpointed."""
        model = LargeLanguageModel()
        await model.train(
            data=data,
            config=hyperparams,
            epochs=100  # 48 hours on 8x A100 GPUs
        )
        model_path = model.save_to_storage()
        return flyte.io.File.from_local(model_path)
    
    # Execute phases with automatic checkpointing
    best_config = await hyperparameter_search(data=dataset)  # Checkpoint 1
    model_file = await train_final_model(data=dataset, hyperparams=best_config)  # Checkpoint 2
    
    return model_file

@env.task
async def deploy_model(model_file: flyte.io.File) -> str:
    """
    ATOMIC ACTION: Deployment is all-or-nothing.
    """
    local_path = model_file.download()
    endpoint = await production_deployer.deploy(local_path)
    await production_deployer.run_smoke_tests(endpoint)
    return endpoint

@env.task
async def complete_ml_pipeline(data_urls: list[str], config: dict) -> str:
    """
    DURABLE WORKFLOW: Can survive any failure at any point.
    
    Failure scenarios handled:
    - Crash during data download → Resume download (or skip if cached)
    - Crash during hyperparameter search → Resume from last checkpoint
    - Crash during training → Resume from last training checkpoint
    - Crash during deployment → Retry deployment (idempotent)
    """
    
    # Step 1: Atomic, cached
    training_data = await download_training_data(urls=data_urls)
    
    # Step 2: Atomic with internal traces
    model_file = await ai_agent_training(dataset=training_data, config=config)
    
    # Step 3: Atomic
    endpoint = await deploy_model(model_file=model_file)
    
    return endpoint
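
For reference, a workflow like this is typically launched from a short driver script. The sketch below assumes the Flyte V2 SDK's config-based initialization; the config path, data URLs, and arguments are placeholders.

# run_pipeline.py: minimal launcher sketch
import flyte

if __name__ == "__main__":
    flyte.init_from_config("config.yaml")  # connect to your Flyte deployment (path assumed)
    run = flyte.run(
        complete_ml_pipeline,
        data_urls=["https://example.com/training-data.csv"],
        config={"epochs": 100},
    )
    print(run.url)  # follow progress, checkpoints, and recovery in the UI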

The Developer Experience

The power of Flyte V2's durable executions is that you get all these guarantees with minimal code changes. Here's a diff comparison showing what gets removed (and what little gets added):

- from tenacity import retry, stop_after_attempt, wait_exponential
- import pickle
- import os
+ import flyte
+ import httpx

- # Global checkpoint management
- CHECKPOINT_DIR = "/tmp/checkpoints"
+ # Define task environment
+ image = flyte.Image.from_debian_base().with_pip_packages("httpx")
+ env = flyte.TaskEnvironment(name="data-pipeline", image=image)

- @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60))
- def download_with_retry(url: str):
-     """Manual retry logic for every operation"""
-     checkpoint_file = f"{CHECKPOINT_DIR}/{hash(url)}.pkl"
-     
-     # Check if already downloaded
-     if os.path.exists(checkpoint_file):
-         with open(checkpoint_file, 'rb') as f:
-             return pickle.load(f)
-     
-     try:
-         data = httpx.get(url).content
-         # Manual checkpoint
-         with open(checkpoint_file, 'wb') as f:
-             pickle.dump(data, f)
-         return data
-     except Exception as e:
-         if os.path.exists(checkpoint_file):
-             with open(checkpoint_file, 'rb') as f:
-                 return pickle.load(f)
-         raise
+ @env.task(cache="auto")
+ async def download_data(url: str) -> flyte.io.File:
+     """Just write the happy path. Flyte V2 handles everything."""
+     async with httpx.AsyncClient() as client:
+         content = await client.get(url)
+         return flyte.io.File.from_string(content.text, "data.txt")

- def train_with_retry(data: list):
-     """Manual retry logic for training"""
-     checkpoint_file = f"{CHECKPOINT_DIR}/training_checkpoint.pkl"
-     
-     if os.path.exists(checkpoint_file):
-         with open(checkpoint_file, 'rb') as f:
-             return pickle.load(f)
-     
-     try:
-         model = Model()
-         model.train(data)
-         result = model.save()
-         
-         # Manual checkpoint
-         with open(checkpoint_file, 'wb') as f:
-             pickle.dump(result, f)
-         return result
-     except Exception as e:
-         if os.path.exists(checkpoint_file):
-             with open(checkpoint_file, 'rb') as f:
-                 return pickle.load(f)
-         raise
+ @env.task
+ async def train_model(data_file: flyte.io.File) -> flyte.io.File:
+     """No try/except. No checkpointing code. No retry logic."""
+     data = data_file.read()
+     model = Model()
+     model.train(data)
+     model_path = model.save()
+     return flyte.io.File.from_local(model_path)

- def my_workflow(urls: list[str]):
-     """Every function needs try/except, checkpointing, retry logic"""
-     
-     # Check workflow-level checkpoint
-     if os.path.exists(f"{CHECKPOINT_DIR}/workflow_state.pkl"):
-         with open(f"{CHECKPOINT_DIR}/workflow_state.pkl", 'rb') as f:
-             state = pickle.load(f)
-         step = state['current_step']
-     else:
-         step = 0
-     
-     try:
-         if step == 0:
-             data = [download_with_retry(url) for url in urls]
-             # Save checkpoint
-             with open(f"{CHECKPOINT_DIR}/workflow_state.pkl", 'wb') as f:
-                 pickle.dump({'current_step': 1, 'data': data}, f)
-             step = 1
-         
-         if step == 1:
-             # Load previous data
-             with open(f"{CHECKPOINT_DIR}/workflow_state.pkl", 'rb') as f:
-                 data = pickle.load(f)['data']
-             
-             model = train_with_retry(data)
-             # Save checkpoint
-             with open(f"{CHECKPOINT_DIR}/workflow_state.pkl", 'wb') as f:
-                 pickle.dump({'current_step': 2, 'model': model}, f)
-             step = 2
-         
-         # ... more steps with similar boilerplate ...
-         
-     except Exception as e:
-         # Handle workflow-level failure
-         log_failure(e)
-         raise
+ @env.task
+ async def my_workflow(urls: list[str]) -> flyte.io.File:
+     """Clean. Readable. Fully durable."""
+     # Download all data files
+     data_files = [await download_data(url=url) for url in urls]
+     
+     # Train model with first file (or combine them)
+     model_file = await train_model(data_file=data_files[0])
+     
+     return model_file

What changed:

  • Removed ~75 lines of manual error handling, checkpointing, and retry logic
  • Added ~15 lines of simple, declarative task definitions
  • Roughly 80% less code
  • Far more reliable, with recovery handled by the platform instead of hand-rolled scripts

The complexity doesn't disappear—it moves to the platform where it belongs. Flyte V2 handles:

  • Automatic checkpointing
  • Intelligent retry logic
  • State persistence
  • Recovery orchestration
  • Cache management
  • Observability
  • Dynamic infrastructure provisioning
  • Dynamic data marshalling
  • Multi-framework compatibility (Ray, Spark, Dask, etc.)
  • Container image building

You write business logic. Flyte V2 handles reliability.

Conclusion: Durable Executions as a Primitive

Durable executions aren't a nice-to-have feature; they're a fundamental requirement for modern AI systems. As workflows become more autonomous, more expensive, and more unpredictable, the ability to survive failures transparently becomes critical.

Flyte V2 makes durable executions a first-class primitive in your AI infrastructure:

  • Atomic actions: All-or-nothing execution semantics
  • Trace-based checkpointing: Never lose progress
  • Failure transparency: Crashes are invisible to your logic
  • Zero boilerplate: Minimal decorators, maximum reliability
  • Cost efficiency: No wasteful re-execution

The paradigm is simple: Write code for the happy path. Let Flyte V2 handle everything else.

Learn More

Have questions about implementing durable executions in your AI systems? Join our community Slack or reach out to our team.
