Error handling
One of the key features of Flyte 2 is the ability to recover from user-level errors in a workflow execution. This includes out-of-memory errors and other exceptions.
In a distributed system with heterogeneous compute, certain types of errors are expected and even, in a sense, acceptable. Flyte 2 recognizes this and allows you to handle them gracefully as part of your workflow logic.
This is possible because workflows are now written in regular Python, which gives you all the power and flexibility of Python error handling. Let’s look at an example:
```python
import asyncio

import flyte
import flyte.errors

# Every task in this environment runs with 1 CPU and 250 MiB of memory by default.
env = flyte.TaskEnvironment(name="fail", resources=flyte.Resources(cpu=1, memory="250Mi"))


@env.task
async def oomer(x: int):
    # Allocate a list of 100 million integers: far more than 250 MiB.
    large_list = [0] * 100000000
    print(len(large_list))


@env.task
async def always_succeeds() -> int:
    await asyncio.sleep(1)
    return 42


@env.task
async def failure_recovery() -> int:
    try:
        await oomer(2)
    except flyte.errors.OOMError as e:
        print(f"Failed with OOM, trying with more resources: {e}, of type {type(e)}, {e.code}")
        try:
            # Re-run the same task with a larger memory allocation.
            await oomer.override(resources=flyte.Resources(cpu=1, memory="1Gi"))(5)
        except flyte.errors.OOMError as e:
            print(f"Failed with OOM again, giving up: {e}, of type {type(e)}, {e.code}")
            raise  # re-raise the second OOM so the workflow fails
    finally:
        # Runs whether or not an error occurred.
        await always_succeeds()

    return await always_succeeds()


if __name__ == "__main__":
    flyte.init_from_config("config.yaml")
    run = flyte.run(failure_recovery)
    print(run.url)
    run.wait()
```
In this code, we do the following:
- Import the necessary modules
- Set up the task environment. Note that we define our task environment with a resource allocation of 1 CPU and 250 MiB of memory.
- Define two tasks: one that will intentionally cause an out-of-memory (OOM) error, and another that will always succeed.
- Define the main task (the top-level workflow task) that will handle the failure recovery logic.
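To see why `oomer` fails in this environment, here is a rough size estimate. It assumes CPython, where each list slot is a pointer (8 bytes on a 64-bit build) and the integer `0` is a single shared object. It measures a small list and extrapolates rather than allocating 100 million elements, and it ignores the memory used by the Python interpreter and the Flyte runtime themselves.

```python
import sys


def bytes_per_slot() -> int:
    """Measure the marginal size of one list slot by comparing two list lengths."""
    # [0] * n allocates exactly n slots, so the size difference is pure slot storage.
    small = sys.getsizeof([0] * 1_000)
    large = sys.getsizeof([0] * 2_000)
    return (large - small) // 1_000


def estimated_list_mib(n_elements: int) -> float:
    """Extrapolate the size of the list object [0] * n_elements, in MiB.

    Only the list itself is counted: the int 0 is one shared object,
    and interpreter or runtime overhead is ignored.
    """
    return n_elements * bytes_per_slot() / 2**20


estimate = estimated_list_mib(100_000_000)
print(f"{bytes_per_slot()} bytes per slot, about {estimate:.0f} MiB for 100 million elements")
print("exceeds the 250Mi limit:", estimate > 250)
print("under the 1Gi (1024Mi) limit:", estimate < 1024)
```

On a typical 64-bit build this comes to roughly 763 MiB. That is about three times the 250 MiB limit but still under 1 GiB. Because runtime overhead is not counted, the 1 GiB retry has some headroom but is not guaranteed to succeed.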
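The outer `try...except` block attempts to run the `oomer` task, which allocates far more memory than the environment's 250 MiB limit. When the resulting `flyte.errors.OOMError` is caught, the handler uses `oomer.override(resources=flyte.Resources(cpu=1, memory="1Gi"))` to run the same task again with 1 GiB of memory. If that attempt also runs out of memory, the inner handler gives up and re-raises the error, which fails the workflow. In either case, the `finally` clause runs `always_succeeds()` before control leaves the block.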
This type of dynamic error handling allows you to gracefully recover from user-level errors in your workflows.
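The same idea extends to more than one retry. The sketch below walks through a list of memory tiers. It escalates after each out-of-memory failure and re-raises once the last tier fails, mirroring how the inner handler above gives up after its second attempt. The sketch is plain Python, so it runs without a Flyte cluster. `OOMError`, `fake_task`, and `run_with_escalation` are hypothetical names, not part of the Flyte API. In a real workflow, each attempt would instead be a call such as `oomer.override(resources=flyte.Resources(cpu=1, memory=...))(x)` that catches `flyte.errors.OOMError`.

```python
from __future__ import annotations

import asyncio


class OOMError(Exception):
    """Hypothetical stand-in for flyte.errors.OOMError."""


async def fake_task(x: int, memory_mib: int) -> int:
    """Hypothetical task that needs about 800 MiB and fails below that limit."""
    await asyncio.sleep(0)  # yield to the event loop, as a real task call would
    if memory_mib < 800:
        raise OOMError(f"needed about 800 MiB, limit was {memory_mib} MiB")
    return x * 2


async def run_with_escalation(x: int, tiers_mib: list[int]) -> tuple[int, int]:
    """Try each memory tier in order and return (result, tier_used).

    Re-raises the OOMError from the last tier if every tier fails.
    """
    if not tiers_mib:
        raise ValueError("tiers_mib must not be empty")
    for i, memory_mib in enumerate(tiers_mib):
        try:
            return await fake_task(x, memory_mib), memory_mib
        except OOMError as e:
            if i == len(tiers_mib) - 1:
                print(f"OOM at {memory_mib} MiB, giving up: {e}")
                raise  # no larger tier left: propagate the failure
            print(f"OOM at {memory_mib} MiB, retrying with more memory: {e}")


print(asyncio.run(run_with_escalation(5, [250, 512, 1024])))  # last line printed: (10, 1024)
```

A fixed list of tiers keeps the worst-case cost bounded. The workflow never requests more memory than the largest tier, and it fails loudly instead of retrying forever.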