# Programming
> This bundle contains all pages in the Programming section.
> Source: https://www.union.ai/docs/v1/union/user-guide/programming/

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/programming ===

# Programming

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

This section covers general programming with Union.ai.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/programming/chaining-entities ===

# Chaining Entities

Union.ai offers a mechanism for chaining entities using the `>>` operator. This is particularly valuable when chaining tasks and subworkflows without the need for data flow between the entities.

## Tasks

Let’s establish a sequence where `t1()` occurs after `t0()`, and `t2()` follows `t1()`.

```python
import union

@union.task
def t2():
    print("Running t2")
    return

@union.task
def t1():
    print("Running t1")
    return

@union.task
def t0():
    print("Running t0")
    return

# Chaining tasks
@union.workflow
def chain_tasks_wf():
    t2_promise = t2()
    t1_promise = t1()
    t0_promise = t0()

    t0_promise >> t1_promise
    t1_promise >> t2_promise
```

## Subworkflows

Just like tasks, you can chain subworkflows.

```python
@union.workflow
def sub_workflow_1():
    t1()

@union.workflow
def sub_workflow_0():
    t0()

@union.workflow
def chain_workflows_wf():
    sub_wf1 = sub_workflow_1()
    sub_wf0 = sub_workflow_0()

    sub_wf0 >> sub_wf1
```

> [!NOTE]
> Chaining tasks and subworkflows is not supported in local Python environments.
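
To build intuition for how an operator like `>>` can record execution order, here is a toy sketch (purely illustrative; this is not flytekit's implementation) of a node class that overloads `__rshift__` to track upstream dependencies:

```python
# Toy illustration (not flytekit's implementation): a node class that records
# ordering dependencies when chained with the `>>` operator.
class Node:
    def __init__(self, name: str):
        self.name = name
        self.upstream: list = []

    def __rshift__(self, other: "Node") -> "Node":
        # `self >> other` means `other` runs after `self`.
        other.upstream.append(self)
        return other  # returning the right operand makes chaining work

t0, t1, t2 = Node("t0"), Node("t1"), Node("t2")
t0 >> t1 >> t2
```

Because `__rshift__` returns its right operand, `t0 >> t1 >> t2` chains naturally, leaving `t0` upstream of `t1` and `t1` upstream of `t2`.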

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/programming/conditionals ===

# Conditionals

Union.ai elevates conditions to a first-class construct named `conditional`, providing a powerful
mechanism for selectively executing branches in a workflow. Conditions leverage static or dynamic
data generated by tasks or received as workflow inputs. While conditions are highly performant to
evaluate, they are restricted to specific binary and logical operators and apply only to primitive values.

To begin, import the necessary libraries.

```python
import random

import union
from flytekit import conditional
from flytekit.core.task import Echo
```

## Simple branch

In this example, we introduce two tasks, `calculate_circle_circumference` and
`calculate_circle_area`. The workflow dynamically chooses between them, computing the
circumference when the radius falls within the range 0.1 to 1.0 and the area otherwise.

```python
@union.task
def calculate_circle_circumference(radius: float) -> float:
    return 2 * 3.14 * radius  # Task to calculate the circumference of a circle

@union.task
def calculate_circle_area(radius: float) -> float:
    return 3.14 * radius * radius  # Task to calculate the area of a circle

@union.workflow
def shape_properties(radius: float) -> float:
    return (
        conditional("shape_properties")
        .if_((radius >= 0.1) & (radius < 1.0))
        .then(calculate_circle_circumference(radius=radius))
        .else_()
        .then(calculate_circle_area(radius=radius))
    )

if __name__ == "__main__":
    radius_small = 0.5
    print(f"Circumference of circle (radius={radius_small}): {shape_properties(radius=radius_small)}")

    radius_large = 3.0
    print(f"Area of circle (radius={radius_large}): {shape_properties(radius=radius_large)}")
```

## Multiple branches

We establish an `if` condition with multiple branches, which will result in a failure if none of the conditions is met.
It's important to note that any `conditional` statement in Flyte is expected to be complete,
meaning that all possible branches must be accounted for.

```python
@union.workflow
def shape_properties_with_multiple_branches(radius: float) -> float:
    return (
        conditional("shape_properties_with_multiple_branches")
        .if_((radius >= 0.1) & (radius < 1.0))
        .then(calculate_circle_circumference(radius=radius))
        .elif_((radius >= 1.0) & (radius <= 10.0))
        .then(calculate_circle_area(radius=radius))
        .else_()
        .fail("The input must be within the range of 0 to 10.")
    )
```

> [!NOTE]
> Take note of the usage of bitwise operators (`&`). Due to Python's PEP-335,
> the logical `and`, `or` and `not` operators cannot be overloaded.
> Flytekit employs bitwise `&` and `|` as equivalents for logical `and` and `or` operators,
> a convention also observed in other libraries.
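
To see why overloading works for `&` and `|` but not for `and` and `or`, here is a minimal standalone sketch (illustrative only, unrelated to flytekit's internals) of a class that composes expression objects with bitwise operators:

```python
# Illustrative only (unrelated to flytekit's internals): `and`/`or` cannot be
# overloaded (PEP-335 was rejected), but `&`/`|` can, via __and__ and __or__.
class Expr:
    def __init__(self, description: str):
        self.description = description

    def __and__(self, other: "Expr") -> "Expr":
        return Expr(f"({self.description} AND {other.description})")

    def __or__(self, other: "Expr") -> "Expr":
        return Expr(f"({self.description} OR {other.description})")

a = Expr("radius >= 0.1")
b = Expr("radius < 1.0")
combined = a & b  # builds a composite expression object instead of a bool
```

This is the general pattern that lets a library capture a comparison as data to be evaluated later, rather than forcing Python to evaluate it eagerly.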

## Consuming the output of a conditional

Here, we write a task that consumes the output returned by a `conditional`.

```python
@union.workflow
def shape_properties_accept_conditional_output(radius: float) -> float:
    result = (
        conditional("shape_properties_accept_conditional_output")
        .if_((radius >= 0.1) & (radius < 1.0))
        .then(calculate_circle_circumference(radius=radius))
        .elif_((radius >= 1.0) & (radius <= 10.0))
        .then(calculate_circle_area(radius=radius))
        .else_()
        .fail("The input must be within the range of 0 to 10.")
    )
    return calculate_circle_area(radius=result)

if __name__ == "__main__":
    radius_small = 0.5
    print(f"shape_properties_accept_conditional_output({radius_small}): {shape_properties_accept_conditional_output(radius=radius_small)}")
```

## Using the output of a previous task in a conditional

You can check if a boolean returned from the previous task is `True`,
but unary operations are not supported directly. Instead, use the `is_true`,
`is_false` and `is_none` methods on the result.

```python
@union.task
def coin_toss(seed: int) -> bool:
    """
    Mimic a condition to verify the successful execution of an operation
    """
    r = random.Random(seed)
    if r.random() < 0.5:
        return True
    return False

@union.task
def failed() -> int:
    """
    Mimic a task that handles failure
    """
    return -1

@union.task
def success() -> int:
    """
    Mimic a task that handles success
    """
    return 0

@union.workflow
def boolean_wf(seed: int = 5) -> int:
    result = coin_toss(seed=seed)
    return conditional("coin_toss").if_(result.is_true()).then(success()).else_().then(failed())
```

> [!NOTE]
> *How do output values acquire these methods?* In a workflow, direct access to outputs is not permitted.
> Inputs and outputs are automatically encapsulated in a special object known as `flytekit.extend.Promise`.
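
As a rough mental model (a hypothetical sketch, not flytekit's actual `Promise` class), a wrapper like this is what makes methods such as `is_true` available on values inside a workflow:

```python
# A hypothetical sketch (not flytekit's actual Promise class) showing how a
# wrapper object can expose is_true/is_false/is_none checks on a deferred value.
class FakePromise:
    def __init__(self, value):
        self._value = value

    def is_true(self) -> bool:
        return self._value is True

    def is_false(self) -> bool:
        return self._value is False

    def is_none(self) -> bool:
        return self._value is None
```

In a real workflow the check is recorded for later evaluation by the engine rather than computed immediately, but the idea of wrapping outputs to attach extra behavior is the same.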

## Using boolean workflow inputs in a conditional
You can directly pass a boolean to a workflow.

```python
@union.workflow
def boolean_input_wf(boolean_input: bool) -> int:
    return conditional("boolean_input_conditional").if_(boolean_input.is_true()).then(success()).else_().then(failed())
```

> [!NOTE]
> Observe that the passed boolean possesses a method called `is_true`.
> This boolean resides within the workflow context and is encapsulated in a specialized Flytekit object.
> This special object enables it to exhibit additional behavior.

You can run the workflows locally as follows:

```python
if __name__ == "__main__":
    print("Running boolean_wf a few times...")
    for index in range(0, 5):
        print(f"The output generated by boolean_wf = {boolean_wf(seed=index)}")
        print(
            f"Boolean input: {True if index < 2 else False}; workflow output: {boolean_input_wf(boolean_input=True if index < 2 else False)}"
        )
```

## Nested conditionals

You can nest conditional sections arbitrarily inside other conditional sections.
However, these nested sections can only be in the `then` part of a `conditional` block.

```python
@union.workflow
def nested_conditions(radius: float) -> float:
    return (
        conditional("nested_conditions")
        .if_((radius >= 0.1) & (radius < 1.0))
        .then(
            conditional("inner_nested_conditions")
            .if_(radius < 0.5)
            .then(calculate_circle_circumference(radius=radius))
            .elif_((radius >= 0.5) & (radius < 0.9))
            .then(calculate_circle_area(radius=radius))
            .else_()
            .fail("0.9 is an outlier.")
        )
        .elif_((radius >= 1.0) & (radius <= 10.0))
        .then(calculate_circle_area(radius=radius))
        .else_()
        .fail("The input must be within the range of 0 to 10.")
    )

if __name__ == "__main__":
    print(f"nested_conditions(0.4): {nested_conditions(radius=0.4)}")
```

## Using the output of a task in a conditional

Let's write a workflow that triggers the `calculate_circle_circumference` task on a "heads" outcome
and runs the `calculate_circle_area` task on a "tails" outcome.

```python
@union.workflow
def consume_task_output(radius: float, seed: int = 5) -> float:
    is_heads = coin_toss(seed=seed)
    return (
        conditional("double_or_square")
        .if_(is_heads.is_true())
        .then(calculate_circle_circumference(radius=radius))
        .else_()
        .then(calculate_circle_area(radius=radius))
    )
```

You can run the workflow locally as follows:

```python
if __name__ == "__main__":
    default_seed_output = consume_task_output(radius=0.4)
    print(
        f"Executing consume_task_output(0.4) with default seed=5. Expected output: calculate_circle_area => {default_seed_output}"
    )

    custom_seed_output = consume_task_output(radius=0.4, seed=7)
    print(
        f"Executing consume_task_output(0.4, seed=7). Expected output: calculate_circle_circumference => {custom_seed_output}"
    )
```

## Running a noop task in a conditional

In some cases, you may want to skip the execution of a conditional workflow if a certain condition is not met.
You can achieve this by using the `echo` task, which simply returns the input value.

> [!NOTE]
> To enable the echo plugin in the backend, add the plugin to Flyte's configuration file.
> ```yaml
> task-plugins:
>   enabled-plugins:
>     - echo
> ```

```python
echo = Echo(name="echo", inputs={"radius": float})

@union.workflow
def noop_in_conditional(radius: float, seed: int = 5) -> float:
    is_heads = coin_toss(seed=seed)
    return (
        conditional("noop_in_conditional")
        .if_(is_heads.is_true())
        .then(calculate_circle_circumference(radius=radius))
        .else_()
        .then(echo(radius=radius))
    )
```

## Run the example on the Flyte cluster

To run the provided workflows on the Flyte cluster, use the following commands:

```shell
$ union run --remote \
    https://raw.githubusercontent.com/flyteorg/flytesnacks/656e63d1c8dded3e9e7161c7af6425e9fcd43f56/examples/advanced_composition/advanced_composition/conditional.py \
    shape_properties --radius 3.0
```

```shell
$ union run --remote \
    https://raw.githubusercontent.com/flyteorg/flytesnacks/656e63d1c8dded3e9e7161c7af6425e9fcd43f56/examples/advanced_composition/advanced_composition/conditional.py \
    shape_properties_with_multiple_branches --radius 11.0
```

```shell
$ union run --remote \
    https://raw.githubusercontent.com/flyteorg/flytesnacks/656e63d1c8dded3e9e7161c7af6425e9fcd43f56/examples/advanced_composition/advanced_composition/conditional.py \
    shape_properties_accept_conditional_output --radius 0.5
```

```shell
$ union run --remote \
    https://raw.githubusercontent.com/flyteorg/flytesnacks/656e63d1c8dded3e9e7161c7af6425e9fcd43f56/examples/advanced_composition/advanced_composition/conditional.py \
    boolean_wf
```

```shell
$ union run --remote \
    https://raw.githubusercontent.com/flyteorg/flytesnacks/656e63d1c8dded3e9e7161c7af6425e9fcd43f56/examples/advanced_composition/advanced_composition/conditional.py \
    boolean_input_wf --boolean_input
```

```shell
$ union run --remote \
    https://raw.githubusercontent.com/flyteorg/flytesnacks/656e63d1c8dded3e9e7161c7af6425e9fcd43f56/examples/advanced_composition/advanced_composition/conditional.py \
    nested_conditions --radius 0.7
```

```shell
$ union run --remote \
    https://raw.githubusercontent.com/flyteorg/flytesnacks/656e63d1c8dded3e9e7161c7af6425e9fcd43f56/examples/advanced_composition/advanced_composition/conditional.py \
    consume_task_output --radius 0.4 --seed 7
```

```shell
$ union run --remote \
    https://raw.githubusercontent.com/flyteorg/flytesnacks/656e63d1c8dded3e9e7161c7af6425e9fcd43f56/examples/advanced_composition/advanced_composition/conditional.py \
    noop_in_conditional --radius 0.4 --seed 5
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/programming/decorating_tasks ===

# Decorating tasks

You can easily change how tasks behave by using decorators to wrap your task functions.

In order to make sure that your decorated function contains all the type annotation and docstring
information that Flyte needs, you will need to use the built-in `functools.wraps` decorator.
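
To see what `functools.wraps` preserves, compare a wrapper that uses it with one that does not (a standalone sketch with made-up function names):

```python
from functools import wraps

def with_wraps(fn):
    @wraps(fn)  # copies __name__, __doc__, __annotations__, etc. from fn
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)
    return wrapper

def without_wraps(fn):
    def wrapper(*args, **kwargs):  # no @wraps: fn's metadata is lost
        return fn(*args, **kwargs)
    return wrapper

@with_wraps
def add_one(x: int) -> int:
    """Add one to x."""
    return x + 1

@without_wraps
def add_two(x: int) -> int:
    """Add two to x."""
    return x + 2
```

Here `add_one.__name__` stays `"add_one"` and its type annotations survive, while `add_two.__name__` becomes `"wrapper"` with empty annotations. It is precisely this preserved metadata that Flyte needs.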

To begin, create a file called `decorating_tasks.py`.

Add the imports:

```python
import logging
import union
from functools import partial, wraps
```

Create a logger to monitor the execution's progress.

```python
logger = logging.getLogger(__file__)
```

## Using a single decorator

We define a decorator that logs the input and output details for a decorated task.

```python
def log_io(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        logger.info(f"task {fn.__name__} called with args: {args}, kwargs: {kwargs}")
        out = fn(*args, **kwargs)
        logger.info(f"task {fn.__name__} output: {out}")
        return out

    return wrapper
```

We create a task named `t1` that is decorated with `log_io`.

> [!NOTE]
> The order of decorators matters: `@union.task` must always be the outermost decorator.

```python
@union.task
@log_io
def t1(x: int) -> int:
    return x + 1
```

## Stacking multiple decorators

You can also stack multiple decorators on top of each other, as long as `@union.task` is the outermost decorator.

We define a decorator that verifies if the output from the decorated function is a positive number before it's returned.
If this assumption is violated, it raises a `ValueError` exception.

```python
def validate_output(fn=None, *, floor=0):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        out = fn(*args, **kwargs)
        if out <= floor:
            raise ValueError(f"output of task {fn.__name__} must be greater than {floor}, found {out}")
        return out

    if fn is None:
        return partial(validate_output, floor=floor)

    return wrapper
```

> [!NOTE]
> The `validate_output` decorator uses `functools.partial` to implement a parameterized decorator.
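
The `functools.partial` trick can be exercised outside Flyte as well. This standalone sketch (using hypothetical functions `f` and `g`) shows that the same decorator works both bare and parameterized:

```python
from functools import partial, wraps

def validate_output(fn=None, *, floor=0):
    # When used as @validate_output(floor=10), this is first called with
    # fn=None, so we return a partially-applied decorator awaiting the function.
    if fn is None:
        return partial(validate_output, floor=floor)

    @wraps(fn)
    def wrapper(*args, **kwargs):
        out = fn(*args, **kwargs)
        if out <= floor:
            raise ValueError(f"output of {fn.__name__} must be greater than {floor}, found {out}")
        return out

    return wrapper

@validate_output  # bare form: the function is passed directly as fn
def f(x: int) -> int:
    return x

@validate_output(floor=10)  # parameterized form: routed through partial
def g(x: int) -> int:
    return x
```

Calling `f(1)` succeeds against the default floor of `0`, while `g(5)` raises a `ValueError` because its output does not exceed the configured floor of `10`.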

We define a function that uses both the logging and validator decorators.

```python
@union.task
@log_io
@validate_output(floor=10)
def t2(x: int) -> int:
    return x + 10
```

Finally, we compose a workflow that calls `t1` and `t2`.

```python
@union.workflow
def decorating_task_wf(x: int) -> int:
    return t2(x=t1(x=x))
```

## Run the example on Union.ai

To run the workflow, execute the following command:

```bash
union run --remote decorating_tasks.py decorating_task_wf --x 10
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/programming/decorating_workflows ===

# Decorating workflows

The behavior of workflows can be modified in a lightweight fashion by using the built-in `functools.wraps`
decorator pattern, similar to using decorators to
[customize task behavior](#decorating_tasks). However, unlike in the case of
tasks, we need to do a little extra work to make sure that the DAG underlying the workflow executes tasks in the correct order.

## Setup-teardown pattern

The main use case for decorating `@union.workflow`-decorated functions is to establish a setup-teardown pattern that executes tasks
before and after your main workflow logic. This is useful when integrating with external services
like [wandb](https://wandb.ai/site) or [clearml](https://clear.ml/), which enable you to track metrics of model training runs.

To begin, create a file called `decorating_workflows.py`.

Import the necessary libraries:

```python
from functools import partial, wraps
from unittest.mock import MagicMock

import flytekit
import union
from flytekit import FlyteContextManager
from flytekit.core.node_creation import create_node
```

Let's define the tasks we need for setup and teardown. In this example, we use the
`unittest.mock.MagicMock` class to create a fake external service that we want to initialize at the
beginning of our workflow and finish at the end.

```python
external_service = MagicMock()

@union.task
def setup():
    print("initializing external service")
    external_service.initialize(id=flytekit.current_context().execution_id)

@union.task
def teardown():
    print("finish external service")
    external_service.complete(id=flytekit.current_context().execution_id)
```

As you can see, you can even use Flytekit's current context to access the `execution_id` of the current workflow
if you need to link Flyte with the external service so that you reference the same unique identifier in both the
external service and Flyte.

## Workflow decorator

We create a decorator that we want to use to wrap our workflow function.

```python
def setup_teardown(fn=None, *, before, after):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        # get the current flyte context to obtain access to the compilation state of the workflow DAG.
        ctx = FlyteContextManager.current_context()

        # defines before node
        before_node = create_node(before)
        # ctx.compilation_state.nodes == [before_node]

        # under the hood, flytekit compiler defines and threads
        # together nodes within the `my_workflow` function body
        outputs = fn(*args, **kwargs)
        # ctx.compilation_state.nodes == [before_node, *nodes_created_by_fn]

        # defines the after node
        after_node = create_node(after)
        # ctx.compilation_state.nodes == [before_node, *nodes_created_by_fn, after_node]

        # compile the workflow correctly by making sure `before_node`
        # runs before the first workflow node and `after_node`
        # runs after the last workflow node.
        if ctx.compilation_state is not None:
            # ctx.compilation_state.nodes is a list of nodes defined in the
            # order of execution above
            workflow_node0 = ctx.compilation_state.nodes[1]
            workflow_node1 = ctx.compilation_state.nodes[-2]
            before_node >> workflow_node0
            workflow_node1 >> after_node
        return outputs

    if fn is None:
        return partial(setup_teardown, before=before, after=after)

    return wrapper
```

There are a few key pieces to note in the `setup_teardown` decorator above:

1. It takes a `before` and `after` argument, both of which need to be `@union.task`-decorated functions. These
   tasks will run before and after the main workflow function body.
2. It uses the [create_node](https://github.com/flyteorg/flytekit/blob/9e156bb0cf3d1441c7d1727729e8f9b4bbc3f168/flytekit/core/node_creation.py#L18) function
   to create nodes associated with the `before` and `after` tasks.
3. When `fn` is called, the flytekit compiler creates all the nodes associated with the workflow function body under the hood.
4. The code within the `if ctx.compilation_state is not None:` conditional is executed at compile time, which
   is where we extract the first and last nodes associated with the workflow function body at index `1` and `-2`.
5. The `>>` right shift operator ensures that `before_node` executes before the
   first node and `after_node` executes after the last node of the main workflow function body.
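
Outside Flyte's compile-time node manipulation, the same setup-teardown idea can be sketched in plain Python, with `before` and `after` running around the function at call time (an illustration of the pattern only, not how the Flyte version above works):

```python
from functools import partial, wraps

def setup_teardown(fn=None, *, before, after):
    if fn is None:
        return partial(setup_teardown, before=before, after=after)

    @wraps(fn)
    def wrapper(*args, **kwargs):
        before()  # runs before the main body
        try:
            return fn(*args, **kwargs)
        finally:
            after()  # runs after the main body, even if it raises

    return wrapper

calls = []

@setup_teardown(before=lambda: calls.append("setup"), after=lambda: calls.append("teardown"))
def main_body(x: float) -> float:
    calls.append("body")
    return x**2

main_body(3.0)
```

The Flyte version achieves the same ordering declaratively, by wiring `before_node` and `after_node` into the DAG at compile time instead of invoking callables at run time.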

## Defining the DAG

We define two tasks that will constitute the workflow.

```python
@union.task
def t1(x: float) -> float:
    return x - 1

@union.task
def t2(x: float) -> float:
    return x**2
```

And then create our decorated workflow:

```python
@union.workflow
@setup_teardown(before=setup, after=teardown)
def decorating_workflow(x: float) -> float:
    return t2(x=t1(x=x))
```

## Run the example on the Flyte cluster

To run the provided workflow on the Flyte cluster, use the following command:

```bash
union run --remote decorating_workflows.py decorating_workflow --x 10.0
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/programming/intratask_checkpoints ===

# Intratask checkpoints

A checkpoint in Flyte serves to recover a task from a previous failure by preserving the task's state before the failure
and resuming from the latest recorded state.

## Why intratask checkpoints?

The inherent design of Flyte, being a workflow engine, allows users to break down operations, programs or ideas
into smaller tasks within workflows. In the event of a task failure, the workflow doesn't need to rerun the
previously completed tasks. Instead, it can retry the specific task that encountered an issue.
Once the problematic task succeeds, it won't be rerun. Consequently, the natural boundaries between tasks act as implicit checkpoints.

However, there are scenarios where breaking a task into smaller tasks is either challenging or undesirable due to the associated overhead.
This is especially true when running a substantial computation in a tight loop.
In such cases, users may consider splitting each loop iteration into individual tasks using dynamic workflows.
Yet, the overhead of spawning new tasks, recording intermediate results, and reconstructing the state can incur additional expenses.

### Use case: Model training

An exemplary scenario illustrating the utility of intra-task checkpointing is during model training.
In situations where executing multiple epochs or iterations with the same dataset might be time-consuming,
setting task boundaries can incur a high bootstrap time and be costly.

Flyte addresses this challenge by providing a mechanism to checkpoint progress within a task execution,
saving it as a file or set of files. In the event of a failure, the checkpoint file can be re-read to
resume most of the state without rerunning the entire task.
This feature opens up possibilities to leverage alternate, more cost-effective compute systems,
such as [AWS spot instances](https://aws.amazon.com/ec2/spot/),
[GCP pre-emptible instances](https://cloud.google.com/compute/docs/instances/preemptible) and others.

These instances offer great performance at significantly lower price points compared to their on-demand or reserved counterparts.
This becomes feasible when tasks are constructed in a fault-tolerant manner.
For tasks running within a short duration, e.g., less than 10 minutes, the likelihood of failure is negligible,
and task-boundary-based recovery provides substantial fault tolerance for successful completion.

However, as the task execution time increases, the cost of re-running it also increases,
reducing the chances of successful completion. This is precisely where Flyte's intra-task checkpointing proves to be highly beneficial.

Here's an example illustrating how to develop tasks that leverage intra-task checkpointing.
It's important to note that Flyte currently offers the low-level API for checkpointing.
Future integrations aim to incorporate higher-level checkpointing APIs from popular training frameworks
like Keras, PyTorch, Scikit-learn, and big-data frameworks such as Spark and Flink, enhancing their fault-tolerance capabilities.

Create a file called `checkpoint.py`:

Import the required libraries:

```python
import union
from flytekit.exceptions.user import FlyteRecoverableException

RETRIES = 3
```

We define a task to iterate precisely `n_iterations`, checkpoint its state, and recover from simulated failures:

```python
# Define a task to iterate precisely `n_iterations`, checkpoint its state, and recover from simulated failures.
@union.task(retries=RETRIES)
def use_checkpoint(n_iterations: int) -> int:
    cp = union.current_context().checkpoint
    prev = cp.read()

    start = 0
    if prev:
        start = int(prev.decode())

    # Create a failure interval to simulate failures across 'n' iterations and then succeed after configured retries
    failure_interval = n_iterations // RETRIES
    index = 0
    for index in range(start, n_iterations):
        # Simulate a deterministic failure for demonstration. Showcasing how it eventually completes within the given retries
        if index > start and index % failure_interval == 0:
            raise FlyteRecoverableException(f"Failed at iteration {index}, failure_interval {failure_interval}.")
        # Save progress state. It is also entirely possible to save state every few intervals
        cp.write(f"{index + 1}".encode())
    return index
```

The checkpoint system offers additional APIs. The code can be found at
[checkpointer code](https://github.com/flyteorg/flytekit/blob/master/flytekit/core/checkpointer.py).
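
The read-then-resume pattern in `use_checkpoint` can be tried out with a stand-in checkpoint object (a hypothetical in-memory sketch, not the real flytekit checkpointer):

```python
from typing import Optional

class FakeCheckpoint:
    """In-memory stand-in for flytekit's checkpoint (illustration only)."""

    def __init__(self):
        self._data: Optional[bytes] = None

    def read(self) -> Optional[bytes]:
        return self._data

    def write(self, data: bytes) -> None:
        self._data = data

def iterate(cp: FakeCheckpoint, n_iterations: int, fail_at: Optional[int] = None) -> int:
    prev = cp.read()
    start = int(prev.decode()) if prev else 0  # resume from saved progress
    index = 0
    for index in range(start, n_iterations):
        if fail_at is not None and index == fail_at:
            raise RuntimeError(f"simulated failure at iteration {index}")
        cp.write(f"{index + 1}".encode())  # save progress after each iteration
    return index

cp = FakeCheckpoint()
try:
    iterate(cp, 10, fail_at=6)  # first attempt fails partway through
except RuntimeError:
    pass

result = iterate(cp, 10)  # the "retry" resumes at iteration 6, not 0
```

Because the saved state survives the simulated failure, the second call iterates only over the remaining range, mirroring how the real task avoids redoing completed work on retry.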

Create a workflow that invokes the task.
The task will automatically be retried in the event of a [FlyteRecoverableException](https://www.union.ai/docs/v1/union/api-reference/flytekit-sdk/packages/flytekit.exceptions.base):

```python
@union.workflow
def checkpointing_example(n_iterations: int) -> int:
    return use_checkpoint(n_iterations=n_iterations)
```

When run locally, the checkpoint is not used, since local executions do not support retries:

```python
if __name__ == "__main__":
    try:
        checkpointing_example(n_iterations=10)
    except RuntimeError as e:  # noqa : F841
        # Since no retries are performed, an exception is expected when run locally
        pass
```

## Run the example on the Flyte cluster

To run the provided workflow on the Flyte cluster, use the following command:

```bash
union run --remote \
  https://raw.githubusercontent.com/flyteorg/flytesnacks/69dbe4840031a85d79d9ded25f80397c6834752d/examples/advanced_composition/advanced_composition/checkpoint.py \
  checkpointing_example --n_iterations 10
```

Alternatively, run the `checkpoint.py` file you created locally:

```bash
union run --remote checkpoint.py checkpointing_example --n_iterations 10
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/programming/waiting_for_external_inputs ===

# Waiting for external inputs

There are use cases where you may want a workflow execution to pause, only to continue
when some time has passed or when it receives some inputs that are external to
the workflow execution inputs. You can think of these as execution-time inputs,
since they need to be supplied to the workflow after it's launched. Examples of
this use case would be:

1. **Model Deployment**: A hyperparameter-tuning workflow that
   trains `n` models, where a human needs to inspect a report before approving
   the model for downstream deployment to some serving layer.
2. **Data Labeling**: A workflow that iterates through an image dataset,
   presenting individual images to a human annotator for them to label.
3. **Active Learning**: An [active learning](https://en.wikipedia.org/wiki/Active_learning_(machine_learning))
   workflow that trains a model, shows examples for a human annotator to label
   based on which examples it's least/most certain about or would provide the most
   information to the model.

These use cases can be achieved in Flyte with the `flytekit.sleep`,
`flytekit.wait_for_input`, and `flytekit.approve` workflow nodes.
Although all of the examples above are human-in-the-loop processes, these
constructs allow you to pass inputs into a workflow from some arbitrary external
process (human or machine) in order to continue.

> [!NOTE]
> These functions can only be used inside `@union.workflow`-decorated
> functions, `@union.dynamic`-decorated functions, or
> imperative workflows.

## Pause executions with the `sleep` node

The simplest case is when you want your workflow to `flytekit.sleep`
for some specified amount of time before continuing.

Though this type of node may not be used often in a production setting,
you might want to use it, for example, if you want to simulate a delay in
your workflow to mock out the behavior of some long-running computation.

```python
from datetime import timedelta

import union
from flytekit import sleep

@union.task
def long_running_computation(num: int) -> int:
    """A mock task pretending to be a long-running computation."""
    return num

@union.workflow
def sleep_wf(num: int) -> int:
    """Simulate a "long-running" computation with sleep."""

    # increase the sleep duration to actually make it long-running
    sleeping = sleep(timedelta(seconds=10))
    result = long_running_computation(num=num)
    sleeping >> result
    return result
```

As you can see above, we define a simple `long_running_computation` task and a `sleep_wf`
workflow. We first create the `sleeping` and `result` nodes, then
order the dependencies with the `>>` operator so that the workflow sleeps
for 10 seconds before kicking off the `result` computation. Finally, we
return the `result`.

> [!NOTE]
> You can learn more about the `>>` chaining operator [here](./chaining-entities).

Now that you have a general sense of how this works, let's move onto the
`flytekit.wait_for_input` workflow node.

## Supply external inputs with `wait_for_input`

With the `flytekit.wait_for_input` node, you can pause a
workflow execution that requires some external input signal. For example,
suppose that you have a workflow that publishes an automated analytics report,
but before publishing it you want to give it a custom title. You can achieve
this by defining a `wait_for_input` node that takes a `str` input and
finalizes the report:

```python
import typing

from flytekit import wait_for_input

@union.task
def create_report(data: typing.List[float]) -> dict:
    """A toy report task."""
    return {
        "mean": sum(data) / len(data),
        "length": len(data),
        "max": max(data),
        "min": min(data),
    }

@union.task
def finalize_report(report: dict, title: str) -> dict:
    return {"title": title, **report}

@union.workflow
def reporting_wf(data: typing.List[float]) -> dict:
    report = create_report(data=data)
    title_input = wait_for_input("title", timeout=timedelta(hours=1), expected_type=str)
    return finalize_report(report=report, title=title_input)
```

Let's break down what's happening in the code above:

- In `reporting_wf` we first create the raw `report`.
- Then, we define a `title` node that will wait for a string to be provided
  through the Flyte API, which can be done through the Flyte UI or through
  `FlyteRemote` (more on that later). This node will time out after 1 hour.
- Finally, we pass the `title_input` promise into `finalize_report`, which
  attaches the custom title to the report.

> [!NOTE]
> The `create_report` task is just a toy example. In a realistic example, this
> report might be an HTML file or set of visualizations. This can be rendered
> in the Flyte UI with [Flyte Decks](https://www.union.ai/docs/v1/union/user-guide/development-cycle/decks).

As mentioned in the beginning of this page, this construct can be used for
selecting the best-performing model in cases where there isn't a clear single
metric to determine the best model, or if you're doing data labeling using
a Flyte workflow.

## Continue executions with `approve`

Finally, the `flytekit.approve` workflow node allows you to wait on
an explicit approval signal before continuing execution. Going back to our
report-publishing use case, suppose that we want to block the publishing of
a report for some reason (e.g. if they don't appear to be valid):

```python
from flytekit import approve

@union.workflow
def reporting_with_approval_wf(data: typing.List[float]) -> dict:
    report = create_report(data=data)
    title_input = wait_for_input("title", timeout=timedelta(hours=1), expected_type=str)
    final_report = finalize_report(report=report, title=title_input)

    # approve the final report, where the output of approve is the final_report
    # dictionary.
    return approve(final_report, "approve-final-report", timeout=timedelta(hours=2))
```

The `approve` node passes the `final_report` promise through as the
output of the workflow, provided that the `approve-final-report` node
receives an approval input via the Flyte UI or Flyte API.

You can also use the output of the `approve` function as a promise, feeding
it to a subsequent task. Let's create a version of our report-publishing
workflow where the approval happens after `create_report`:

```python
@union.workflow
def approval_as_promise_wf(data: typing.List[float]) -> dict:
    report = create_report(data=data)
    title_input = wait_for_input("title", timeout=timedelta(hours=1), expected_type=str)

    # wait for report to run so that the user can view it before adding a custom
    # title to the report
    report >> title_input

    final_report = finalize_report(
        report=approve(report, "raw-report-approval", timeout=timedelta(hours=2)),
        title=title_input,
    )
    return final_report
```

## Working with conditionals

The node constructs by themselves are useful, but they become even more
useful when we combine them with other Flyte constructs, like [conditionals](./conditionals).

To illustrate this, let's extend the report-publishing use case so that we
produce an "invalid report" output in case we don't approve the final report:

```python
from flytekit import conditional

@union.task
def invalid_report() -> dict:
    return {"invalid_report": True}

@union.workflow
def conditional_wf(data: typing.List[float]) -> dict:
    report = create_report(data=data)
    title_input = wait_for_input("title-input", timeout=timedelta(hours=1), expected_type=str)

    # Define a "review-passes" wait_for_input node so that a human can review
    # the report before finalizing it.
    review_passed = wait_for_input("review-passes", timeout=timedelta(hours=2), expected_type=bool)
    report >> review_passed

    # This conditional returns the finalized report if the review passes,
    # otherwise it returns an invalid report output.
    return (
        conditional("final-report-condition")
        .if_(review_passed.is_true())
        .then(finalize_report(report=report, title=title_input))
        .else_()
        .then(invalid_report())
    )
```

Here, the `review-passes` gate node feeds into the `conditional`, which
determines whether to run `finalize_report` or fall back to the
`invalid_report` task.

## Sending inputs to `wait_for_input` and `approve` nodes

Assuming that you've registered the above workflows on a Flyte cluster
(for example, one started locally with `flytectl demo start`),
there are two ways of using `wait_for_input` and `approve` nodes:

### Using the Flyte UI

If you launch the `reporting_wf` workflow on the Flyte UI, you'll see a
**Graph** view of the workflow execution like this:

![Reporting workflow wait for input graph](https://www.union.ai/docs/v1/union/_static/images/user-guide/programming/waiting-for-external-inputs/wait-for-input-graph.png)

Clicking on the play-circle icon of the `title` task node or the
**Resume** button on the sidebar will create a modal form that you can use to
provide the custom title input.

![Reporting workflow wait for input form](https://www.union.ai/docs/v1/union/_static/images/user-guide/programming/waiting-for-external-inputs/wait-for-input-form.png)

### Using `FlyteRemote`

For many cases it's enough to use the Flyte UI to provide inputs and approvals
on gate nodes. However, if you want to pass inputs to `wait_for_input` and
`approve` nodes programmatically, you can use the
`FlyteRemote.set_signal` method. Using the `conditional_wf` workflow, the example
below sets values for the `title-input` and `review-passes` nodes.

```python
import typing
from flytekit.remote.remote import FlyteRemote
from flytekit.configuration import Config

remote = FlyteRemote(
    Config.for_sandbox(),
    default_project="flytesnacks",
    default_domain="development",
)

# First kick off the workflow
flyte_workflow = remote.fetch_workflow(
    name="core.control_flow.waiting_for_external_inputs.conditional_wf"
)

# Execute the workflow
execution = remote.execute(flyte_workflow, inputs={"data": [1.0, 2.0, 3.0, 4.0, 5.0]})

# Get a list of signals available for the execution
signals = remote.list_signals(execution.id.name)

# Set a signal value for the "title-input" node. Make sure that "title-input"
# is in the `signals` list above
remote.set_signal("title-input", execution.id.name, "my report")

# Set signal value for the "review-passes" node. Make sure that the "review-passes"
# node is in the `signals` list above
remote.set_signal("review-passes", execution.id.name, True)
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/programming/nested-parallelism ===

# Nested parallelism

For exceptionally large or complicated workflows that can’t be adequately implemented as dynamic workflows or map tasks, it can be beneficial to have multiple levels of workflow parallelization.

This is useful for multiple reasons:

- Better code organization
- Better code reuse
- Better testing
- Better debugging
- Better monitoring, since each subworkflow can be run independently and monitored independently
- Better performance and scale, since each subworkflow is executed as a separate workflow and thus can be distributed among different propeller workers and shards. This allows for better parallelism and scale.

## Nested dynamic workflows
You can use nested dynamic workflows to break down a large workflow into smaller workflows and then compose them together to form a hierarchy. In this example, a top-level workflow uses two levels of dynamic workflows to process a list through some simple addition tasks and then flatten the list again.

### Example code

```python
"""
A core workflow parallelized as six items with a chunk size of two will be structured as follows:

multi_wf -> level1 -> level2 -> core_wf -> step1 -> step2
                             -> core_wf -> step1 -> step2
                      level2 -> core_wf -> step1 -> step2
                             -> core_wf -> step1 -> step2
                      level2 -> core_wf -> step1 -> step2
                             -> core_wf -> step1 -> step2
"""

import union

@union.task
def step1(a: int) -> int:
    return a + 1

@union.task
def step2(a: int) -> int:
    return a + 2

@union.workflow
def core_wf(a: int) -> int:
    return step2(a=step1(a=a))

core_wf_lp = union.LaunchPlan.get_or_create(core_wf)

@union.dynamic
def level2(l: list[int]) -> list[int]:
    return [core_wf_lp(a=a) for a in l]

@union.task
def reduce(l: list[list[int]]) -> list[int]:
    f = []
    for i in l:
        f.extend(i)
    return f

@union.dynamic
def level1(l: list[int], chunk: int) -> list[int]:
    v = []
    for i in range(0, len(l), chunk):
        v.append(level2(l=l[i:i + chunk]))
    return reduce(l=v)

@union.workflow
def multi_wf(l: list[int], chunk: int) -> list[int]:
    return level1(l=l, chunk=chunk)
```
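To see the data flow without a Union backend, the same fan-out can be sketched in plain Python (the functions below are illustrative stand-ins, not the Union runtime):

```python
# Plain-Python sketch of the nested fan-out above (no Union runtime).
# core_wf applies step1 (+1) then step2 (+2); level2 handles one chunk;
# level1 chunks the input and flattens the results, like the reduce task.

def core_wf(a: int) -> int:
    return (a + 1) + 2  # step1 then step2

def level2(l: list[int]) -> list[int]:
    return [core_wf(a) for a in l]

def level1(l: list[int], chunk: int) -> list[int]:
    chunks = [l[i:i + chunk] for i in range(0, len(l), chunk)]
    # six items with chunk=2 -> three level2 calls, as in the diagram
    return [x for c in chunks for x in level2(c)]

print(level1([1, 2, 3, 4, 5, 6], 2))  # → [4, 5, 6, 7, 8, 9]
```

Each element passes through `step1` and `step2`, so every item is incremented by 3; in the real workflow the three `level2` calls execute in parallel as separate subworkflows.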

Overrides let you add additional arguments to the launch plan calls you are looping over in the dynamic workflow. Here we add caching:

```python
@union.task
def increment(num: int) -> int:
    return num + 1

@union.workflow
def child(num: int) -> int:
    return increment(num=num)

child_lp = union.LaunchPlan.get_or_create(child)

@union.dynamic
def spawn(n: int) -> list[int]:
    l = []
    for i in range(1, n + 1):
        l.append(child_lp(num=i).with_overrides(cache=True, cache_version="1.0.0"))

    # you can also pass l to another task if you want
    return l
```

## Mixed parallelism
This example is similar to nested dynamic workflows, but instead of using a dynamic workflow to parallelize a core workflow with serial tasks, we use a core workflow to call a map task, which processes both inputs in parallel. This workflow has one less layer of parallelism, so the outputs won’t be the same as those of the nested parallelization example, but it does still demonstrate how you can mix these different approaches to achieve concurrency.

### Example code

```python
"""

A core workflow parallelized as six items with a chunk size of two will be structured as follows:
multi_wf -> level1 -> level2 -> mappable
                             -> mappable
                      level2 -> mappable
                             -> mappable
                      level2 -> mappable
                             -> mappable
"""

import union

@union.task
def mappable(a: int) -> int:
    return a + 2

@union.workflow
def level2(l: list[int]) -> list[int]:
    return union.map(mappable)(a=l)

@union.task
def reduce(l: list[list[int]]) -> list[int]:
    f = []
    for i in l:
        f.extend(i)
    return f

@union.dynamic
def level1(l: list[int], chunk: int) -> list[int]:
    v = []
    for i in range(0, len(l), chunk):
        v.append(level2(l=l[i : i + chunk]))
    return reduce(l=v)

@union.workflow
def multi_wf(l: list[int], chunk: int) -> list[int]:
    return level1(l=l, chunk=chunk)
```
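As a rough plain-Python analogy (not the Union API), `union.map(mappable)` acts like the built-in `map` over each chunk, so here every item is incremented by 2 rather than 3:

```python
# Plain-Python analogy of the mixed-parallelism example (no Union runtime).

def mappable(a: int) -> int:
    return a + 2

def level2(l: list[int]) -> list[int]:
    return list(map(mappable, l))  # stands in for union.map(mappable)(a=l)

def level1(l: list[int], chunk: int) -> list[int]:
    nested = [level2(l[i:i + chunk]) for i in range(0, len(l), chunk)]
    return [x for sub in nested for x in sub]  # flatten, like the reduce task

print(level1([1, 2, 3, 4, 5, 6], 2))  # → [3, 4, 5, 6, 7, 8]
```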

## Design considerations

While you can nest even further if needed, or incorporate map tasks if your inputs are all the same type, the design of your workflow should be informed by the actual data you’re processing. For example, if you have a big library of music from which you’d like to extract the lyrics, the first level could loop through all the albums, and the second level could process each song.

If you’re just processing an enormous list of the same input, it’s best to keep your code simple and let the scheduler handle optimizing the execution. Additionally, unless you need dynamic workflow features like mixing and matching inputs and outputs, it’s usually most efficient to use a map task, which has the added benefit of keeping the UI clean.

You can also choose to limit the scale of parallel execution at a few levels. The `max_parallelism` attribute can be applied at the workflow level and limits the number of tasks executing in parallel (it defaults to 25). Within map tasks, you can specify a `concurrency` argument, which limits the number of mapped tasks that can run in parallel at any given time.
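As a loose analogy (using Python's standard library, not Union's scheduler), a `concurrency` limit behaves like a thread pool's worker cap: every item is processed, but only a bounded number at any moment:

```python
from concurrent.futures import ThreadPoolExecutor

def mappable(a: int) -> int:
    return a + 2

# max_workers plays the role of a map task's concurrency argument:
# six inputs, but at most two are in flight at any given time.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(mappable, [1, 2, 3, 4, 5, 6]))

print(results)  # → [3, 4, 5, 6, 7, 8]
```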

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/programming/failure-node ===

# Failure node

The failure node feature enables you to designate a specific node to execute in the event of a failure within your workflow.

For example, consider a workflow that creates a cluster at the beginning, executes tasks, and deletes the cluster once all tasks are complete.
However, if any task within the workflow encounters an error, the system will abort the entire workflow and won't delete the cluster.
This poses a challenge if you need to clean up the cluster even after a task failure.

To address this issue, you can add a failure node into your workflow.
This ensures that critical actions, such as deleting the cluster, are executed even in the event of failures occurring throughout the workflow execution.

```python
import typing

import union
from flytekit import WorkflowFailurePolicy
from flytekit.types.error.error import FlyteError

@union.task
def create_cluster(name: str):
    print(f"Creating cluster: {name}")
```

Create a task that will fail during execution:

```python
@union.task
def t1(a: int, b: str):
    print(f"{a} {b}")
    raise ValueError("Fail!")
```

Create a task that will be executed if any of the tasks in the workflow fail, along with a task that deletes the cluster:

```python
@union.task
def clean_up(name: str, err: typing.Optional[FlyteError] = None):
    print(f"Deleting cluster {name} due to {err}")

@union.task
def delete_cluster(name: str, err: typing.Optional[FlyteError] = None):
    print(f"Deleting cluster {name}")
```

Set `on_failure` to the cleanup task. This task will be executed if any of the tasks in the workflow fail.

The inputs of `clean_up` must exactly match the workflow's inputs.
Additionally, the `err` parameter will be populated with the error encountered during execution.

```python
@union.workflow(on_failure=clean_up)
def subwf(name: str):
    c = create_cluster(name=name)
    t = t1(a=1, b="2")
    d = delete_cluster(name=name)
    c >> t >> d
```

Set the failure policy to `FAIL_AFTER_EXECUTABLE_NODES_COMPLETE` to ensure that the remaining executable nodes in `wf1` run even if the subworkflow fails.
In this case, both the parent and child workflows will fail, resulting in the `clean_up` task being executed twice:

```python
@union.workflow(on_failure=clean_up, failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def wf1(name: str = "my_cluster"):
    c = create_cluster(name=name)
    subwf(name="another_cluster")
    t = t1(a=1, b="2")
    d = delete_cluster(name=name)
    c >> t >> d
```

You can also set the `on_failure` to a workflow.
This workflow will be executed if any of the tasks in the workflow fail:

```python
@union.workflow
def clean_up_wf(name: str):
    clean_up(name=name)

@union.workflow(on_failure=clean_up_wf)
def wf2(name: str = "my_cluster"):
    c = create_cluster(name=name)
    t = t1(a=1, b="2")
    d = delete_cluster(name=name)
    c >> t >> d
```

