Flyte 2
Pure Python execution
Write workflows in pure Python, enabling a more natural development experience and removing the constraints of a domain-specific language (DSL).
import flyte

env = flyte.TaskEnvironment("sync_example_env")

@env.task
def hello_world(name: str) -> str:
    return f"Hello, {name}!"

@env.task
def main(name: str) -> str:
    for i in range(10):
        hello_world(name)
    return "Done"

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main, name="World")
    print(r.name)
    print(r.url)
    r.wait()

import asyncio
import flyte

env = flyte.TaskEnvironment("async_example_env")

@env.task
async def hello_world(name: str) -> str:
    return f"Hello, {name}!"

@env.task
async def main(name: str) -> str:
    # Collect the coroutines first, then await them together so they run in parallel.
    results = []
    for i in range(10):
        results.append(hello_world(name))
    await asyncio.gather(*results)
    return "Done"

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main, name="World")
    print(r.name)
    print(r.url)
    r.wait()

As you can see in the hello world example, workflows can be constructed at runtime, allowing for more flexible and adaptive behavior. Flyte 2 also supports:
- Python’s asynchronous programming model to express parallelism.
- Python’s native error handling with try-except to override configurations, like resource requests.
- Predefined static workflows when compile-time safety is critical.
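
The try-except bullet can be illustrated with a minimal plain-Python sketch. It deliberately avoids the Flyte SDK: `OutOfMemory`, `train`, and the `memory_gib` parameter are hypothetical stand-ins, and the "1000 rows per GiB" rule is an assumption made only so the failure can be reproduced locally. The point is the control flow: catch the failure, then retry the same step with a larger resource request.

```python
import asyncio


class OutOfMemory(Exception):
    """Hypothetical stand-in for an out-of-memory failure raised by a step."""


async def train(rows: int, memory_gib: int = 1) -> str:
    # Toy rule (an assumption for this sketch): each GiB can hold 1000 rows.
    if rows > memory_gib * 1000:
        raise OutOfMemory(f"{rows} rows do not fit in {memory_gib} GiB")
    return f"trained on {rows} rows with {memory_gib} GiB"


async def main(rows: int) -> str:
    try:
        # First attempt with the default memory request.
        return await train(rows)
    except OutOfMemory:
        # Retry the same step with an overridden, larger memory request.
        return await train(rows, memory_gib=4)


if __name__ == "__main__":
    print(asyncio.run(main(2500)))
```

In a real Flyte 2 task the retry would override the task's resource configuration rather than pass a function argument; the exact override call is not shown here because it is not given in this document.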
Simplified API
The new API is more intuitive, with fewer abstractions to learn and a focus on simplicity.
| Use case | Flyte 1 | Flyte 2 |
|---|---|---|
| Environment management | N/A | TaskEnvironment |
| Perform basic computation | @task | @env.task |
| Combine tasks into a workflow | @workflow | @env.task |
| Create dynamic workflows | @dynamic | @env.task |
| Fanout parallelism | flytekit.map | Python for loop with asyncio.gather |
| Conditional execution | flytekit.conditional | Python if-elif-else |
| Catching workflow failures | @workflow(on_failure=...) | Python try-except |
There is no @workflow decorator. Instead, “workflows” are authored through a pattern of tasks calling tasks.
Tasks are defined within environments, which encapsulate the context and resources needed for execution.
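
To show how the Flyte 2 column reads in practice, here is a small sketch of fanout and conditional execution written as ordinary coroutines. It uses only the standard library, so the functions below are plain Python rather than Flyte tasks; `score`, `summarize`, and `pipeline` are hypothetical names, and in Flyte 2 each would presumably carry an @env.task decorator.

```python
import asyncio


async def score(x: int) -> int:
    # Stand-in for one unit of basic computation.
    return x * x


async def summarize(values: list[int]) -> str:
    total = sum(values)
    # Conditional execution is an ordinary if-elif-else.
    if total == 0:
        return "empty"
    elif total < 100:
        return f"small total: {total}"
    else:
        return f"large total: {total}"


async def pipeline(n: int) -> str:
    # Fanout: create one coroutine per input and await them together.
    scores = await asyncio.gather(*(score(i) for i in range(n)))
    # "Workflow" composition is just one function calling another.
    return await summarize(list(scores))


if __name__ == "__main__":
    print(asyncio.run(pipeline(5)))
```

Because the branching happens at runtime, the shape of the workflow can depend on intermediate results, which is what replaces the separate dynamic-workflow and conditional constructs of Flyte 1.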
Fine-grained reproducibility and recoverability
Flyte tasks support caching via @env.task(cache=...), but tracing with @flyte.trace augments task-level caching
even further, enabling reproducibility and recovery at the sub-task function level.
import flyte

env = flyte.TaskEnvironment(name="trace_example_env")

# A traced function runs inside the calling task and is recorded as a checkpoint.
@flyte.trace
async def call_llm(prompt: str) -> str:
    return "Initial response from LLM"

@env.task
async def finalize_output(output: str) -> str:
    return "Finalized output"

@env.task(cache=flyte.Cache(behavior="auto"))
async def main(prompt: str) -> str:
    output = await call_llm(prompt)
    output = await finalize_output(output)
    return output

if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main, prompt="Prompt to LLM")
    print(r.name)
    print(r.url)
    r.wait()

Here the call_llm function is called in the same container as main and serves as an automated checkpoint with full
observability in the UI. If the task run fails, the workflow is able to recover and replay from where it left off.
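
The recover-and-replay idea can be sketched without Flyte as a decorator that records each result and returns the recorded value on later calls. This is an illustration of the concept only, not how @flyte.trace is implemented: `checkpointed`, the in-memory `_checkpoints` store, and the `fail` flag are all hypothetical, and a real system would persist checkpoints durably.

```python
import asyncio
import functools

# In-memory checkpoint store (assumption: a real system would persist this).
_checkpoints: dict = {}
# Counts how many times the wrapped function body really executes.
calls = {"call_llm": 0}


def checkpointed(fn):
    """Hypothetical decorator: record each result and replay it on later calls."""

    @functools.wraps(fn)
    async def wrapper(*args):
        key = (fn.__name__, args)
        if key not in _checkpoints:
            _checkpoints[key] = await fn(*args)
        return _checkpoints[key]

    return wrapper


@checkpointed
async def call_llm(prompt: str) -> str:
    calls["call_llm"] += 1
    return f"response to {prompt!r}"


async def main(prompt: str, fail: bool) -> str:
    output = await call_llm(prompt)
    if fail:
        raise RuntimeError("simulated failure after the checkpoint")
    return output.upper()


async def run_with_retry(prompt: str) -> str:
    try:
        return await main(prompt, fail=True)
    except RuntimeError:
        # The retry replays call_llm from the checkpoint instead of re-running it.
        return await main(prompt, fail=False)
```

Running `asyncio.run(run_with_retry("hi"))` executes the body of `call_llm` once even though `main` runs twice, which is the property that makes an expensive sub-task call safe to retry.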
Improved remote functionality
Flyte 2 provides full management of the workflow lifecycle through a standardized API, available from both the CLI and the Python SDK.
| Use case | CLI | Python SDK |
|---|---|---|
| Run a task | flyte run ... | flyte.run(...) |
| Deploy a task | flyte deploy ... | flyte.deploy(...) |
You can also fetch and run remote (previously deployed) tasks within the course of a running workflow.
import flyte
from flyte import remote

env_1 = flyte.TaskEnvironment(name="env_1")
env_2 = flyte.TaskEnvironment(name="env_2")
env_1.add_dependency(env_2)

@env_2.task
async def remote_task(x: str) -> str:
    return "Remote task processed: " + x

@env_1.task
async def main() -> str:
    # Look up the previously deployed task by name, then call it like a local task.
    remote_task_ref = remote.Task.get("env_2.remote_task", auto_version="latest")
    r = await remote_task_ref(x="Hello")
    return "main called remote and received: " + r

if __name__ == "__main__":
    flyte.init_from_config()
    d = flyte.deploy(env_1)
    print(d[0].summary_repr())
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()

Native Notebook support
Author and run workflows and fetch workflow metadata (I/O and logs) directly from Jupyter notebooks.
High performance engine
Schedule tasks in milliseconds with reusable containers, which massively increases the throughput of containerized tasks.
from datetime import timedelta

import flyte

# Currently required to enable reusable containers
reusable_image = flyte.Image.from_debian_base().with_pip_packages("unionai-reuse>=0.1.3")

env = flyte.TaskEnvironment(
    name="reusable-env",
    resources=flyte.Resources(memory="1Gi", cpu="500m"),
    reusable=flyte.ReusePolicy(
        replicas=2,  # Create 2 container instances
        concurrency=1,  # Process 1 task per container at a time
        scaledown_ttl=timedelta(minutes=10),  # Individual containers shut down after 10 minutes of inactivity
        idle_ttl=timedelta(hours=1),  # Entire environment shuts down after 1 hour of no tasks
    ),
    image=reusable_image,  # Use the container image augmented with the unionai-reuse library.
)

Enhanced UI
New UI with a streamlined and user-friendly experience for authoring and managing workflows.
This UI improves the visualization of workflow execution and monitoring, simplifying access to logs, metadata, and other important information.