# Core concepts
> This bundle contains all pages in the Core concepts section.
> Source: https://www.union.ai/docs/v1/union/user-guide/core-concepts/

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts ===

# Core concepts

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

Union.ai is a platform for building and orchestrating the execution of interconnected software processes across machines in a computer cluster.
In Union.ai terminology, the software processes are called *tasks* and the overall organization of connections between tasks is called a *workflow*.
The tasks in a workflow are connected to each other by their inputs and outputs. The output of one task becomes the input of another.

More precisely, a workflow in Union.ai is a *directed acyclic graph (DAG)* of *nodes* where each node is a unit of execution and the edges between nodes represent the flow of data between them.
The most common type of node is a task node (which encapsulates a task), though there are also workflow nodes (which encapsulate subworkflows) and branch nodes.
In most contexts we just say that a workflow is a DAG of tasks.

You define tasks and workflows in Python using the Union SDK. The Union SDK provides a set of decorators and classes that allow you to define tasks and workflows in a way that is easy to understand and work with.
Once defined, tasks and workflows are deployed to your Union.ai instance (we say they are *registered* to the instance), where they are compiled into a form that can be executed on your Union.ai cluster.

In addition to tasks and workflows, another important concept in Union.ai is the **launch plan** (see **Core concepts > Launch plans**).
A launch plan is like a template that can be used to define the inputs to a workflow.
Triggering a launch plan will launch its associated workflow with the specified parameters.

## Defining tasks and workflows

Using the Union SDK, tasks and workflows are defined as Python functions using the `@union.task` and `@union.workflow` decorators, respectively:

```python
import union

@union.task
def task_1(a: int, b: int, c: int) -> int:
    return a + b + c

@union.task
def task_2(m: int, n: int) -> int:
    return m * n

@union.task
def task_3(x: int, y: int) -> int:
    return x - y

@union.workflow
def my_workflow(a: int, b: int, c: int, m: int, n: int) -> int:
    x = task_1(a=a, b=b, c=c)
    y = task_2(m=m, n=n)
    return task_3(x=x, y=y)
```

Here we see three tasks defined using the `@union.task` decorator and a workflow defined using the `@union.workflow` decorator.
The workflow calls `task_1` and `task_2` and passes the results to `task_3` before finally outputting the result of `task_3`.

When the workflow is registered, Union.ai compiles the workflow into a directed acyclic graph (DAG) based on the input/output dependencies between the tasks.
The DAG is then used to execute the tasks in the correct order, taking advantage of any parallelism that is possible.
For example, the workflow above results in the following DAG:

![Workflow DAG](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workflow-dag.png)

### Type annotation is required

One important difference between Union.ai and generic Python is that in Union.ai all inputs and outputs *must be type annotated*.
This is because tasks are strongly typed, meaning that the types of the inputs and outputs are validated at deployment time.

See **Core concepts > Tasks** for more details.

### Workflows *are not* full Python functions

The definition of a workflow must be a valid Python function, so it can be run locally as a normal Python function during development,
but only *a subset of Python syntax is allowed*, because it must also be compiled into a DAG that is deployed and executed on Union.ai.

*Technically then, the language of a workflow function is a domain-specific language (DSL) that is a subset of Python.*

See **Core concepts > Workflows** for more details.

## Registering tasks and workflows

### Registering on the command line with `union` or `uctl`

In most cases, workflows and tasks (and possibly other objects, such as launch plans) are defined in your project code and registered as a bundle using `union` or `uctl`. For example:

```shell
$ union register ./workflows --project my_project --domain development
```

Tasks can also be registered individually, but it is more common to register them alongside the workflows that use them.

See [Running your code](https://www.union.ai/docs/v1/union/user-guide/development-cycle/running-your-code/page.md).

### Registering in Python with `UnionRemote`

As with other Union.ai command-line actions, you can also register workflows and tasks programmatically with [`UnionRemote`](), specifically [`UnionRemote.register_script`](), [`UnionRemote.register_workflow`](), and [`UnionRemote.register_task`]().
<!-- TODO: Add link to API -->

## Results of registration

When the code above is registered to Union.ai, it results in the creation of five objects:

* The tasks `workflows.my_example.task_1`, `workflows.my_example.task_2`, and `workflows.my_example.task_3` (see **Core concepts > Tasks** for more details).
* The workflow `workflows.my_example.my_workflow`.
* The default launch plan `workflows.my_example.my_workflow` (see **Core concepts > Launch plans** for more details).

Notice that the task and workflow names are derived from the path, file name and function name of the Python code that defines them: `<folder>.<file>.<function>`.
The default launch plan for a workflow always has the same name as its workflow.

## Changing tasks and workflows

Tasks and workflows are changed by altering their definition in code and re-registering.
When a task or workflow with the same project, domain, and name as a preexisting one is re-registered, a new version of that entity is created.

## Inspecting tasks and workflows

### Inspecting workflows in the UI

Select **Workflows** in the sidebar to display a list of all the registered workflows in the project and domain.
You can search the workflows by name.

Click on a workflow in the list to see the **workflow view**.
The sections in this view are as follows:

* **Recent Workflow Versions**: A list of recent versions of this workflow.
  Select a version to see the **Workflow version view**.
  This view shows the DAG and a list of all versions of the workflow.
  You can switch between versions with the radio buttons.

* **All Executions in the Workflow**: A list of all executions of this workflow.
  Click on an execution to go to the execution view (see **Core concepts > Workflows > Viewing workflow executions**).

* **Launch Workflow button**: In the top right of the workflow view, you can click the **Launch Workflow** button to run the workflow with the default inputs.

### Inspecting tasks in the UI

Select **Tasks** in the sidebar to display a list of all the registered tasks in the project and domain.
You can search the tasks by name.
To filter for only those that are archived, check the **Show Only Archived Tasks** box.

Click on a task in the list to see the **task view**.
The sections in the task view are as follows:

* **Inputs & Outputs**: The name and type of each input and output for the latest version of this task.

* **Recent Task Versions**: A list of recent versions of this task.
  Select a version to see the **Task version view**:
  This view shows the task details and a list of all versions of the task.
  You can switch between versions with the radio buttons.
  See **Core concepts > Tasks** for more information.

* **All Executions in the Task**: A list of all executions of this task.
  Click on an execution to go to the execution view.

* **Launch Task button**: In the top right of the task view, you can click the **Launch Task** button to run the task with the default inputs.

### Inspecting workflows on the command line with `uctl`

To view all workflows within a project and domain:

```shell
$ uctl get workflows \
       --project <project-id> \
       --domain <domain>
```

To view a specific workflow:

```shell
$ uctl get workflow \
       --project <project-id> \
       --domain <domain> \
       <workflow-name> \
       <workflow-version>
```

See [Uctl CLI](https://www.union.ai/docs/v1/union/api-reference/uctl-cli/page.md) for more details.

### Inspecting tasks on the command line with `uctl`

To view all tasks within a project and domain:

```shell
$ uctl get tasks \
       --project <project-id> \
       --domain <domain>
```

To view a specific task:

```shell
$ uctl get task \
       --project <project-id> \
       --domain <domain> \
       <task-name> \
       <task-version>
```

See [Uctl CLI](https://www.union.ai/docs/v1/union/api-reference/uctl-cli/page.md) for more details.

### Inspecting tasks and workflows in Python with `UnionRemote`

Use the method [`UnionRemote.fetch_workflow`]() or [`UnionRemote.client.get_workflow`]() to get a workflow.
See [`UnionRemote`]() for more options and details.

Use the method [`UnionRemote.fetch_task`]() or [`UnionRemote.client.get_task`]() to get a task.
See [`UnionRemote`]() for more options and details.
<!-- TODO: Add links to API -->

## Running tasks and workflows

### Running a task or workflow in the UI

To run a workflow in the UI, click the **Launch Workflow** button in the workflow view.

You can also run individual tasks in the UI by clicking the **Launch Task** button in the task view.

### Running a task or workflow locally on the command line with `union` or `python`

You can execute a Union.ai workflow or task locally by calling it like any regular Python function.
For example, you can add the following to the code above:

```python
if __name__ == "__main__":
    my_workflow(a=1, b=2, c=3, m=4, n=5)
```

If the file is saved as `my_example.py`, you can run it locally using the following command:

```shell
$ python my_example.py
```

Alternatively, you can run the workflow locally with the `union run` command:

```shell
$ union run my_example.py my_workflow --a 1 --b 2 --c 3 --m 4 --n 5
```

This has the advantage of allowing you to specify the input values as command line arguments.
For more details on running workflows and tasks, see [Development cycle](https://www.union.ai/docs/v1/union/user-guide/development-cycle/page.md).

### Running a task or workflow remotely on the command line with `union`

To run a workflow remotely on your Union.ai installation, use the following command (this assumes that you have your [UNION_CONFIG set up correctly](https://www.union.ai/docs/v1/union/user-guide/development-cycle/setting-up-a-project/page.md)):

```shell
$ union run --remote my_example.py my_workflow --a 1 --b 2 --c 3 --m 4 --n 5
```

### Running a task or workflow remotely in Python with `UnionRemote`

To run a workflow or task remotely in Python, use the method [`UnionRemote.execute`](). See [`UnionRemote`]() for more options and details.

<!-- TODO: Add links to API -->

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/workflows ===

# Workflows


So far in our discussion of workflows, we have focused on top-level workflows decorated with `@union.workflow`.
These are, in fact, more accurately termed **Core concepts > Workflows > Standard workflows** to differentiate them from the other types of workflows that exist in Union.ai: **Core concepts > Workflows > Subworkflows and sub-launch plans**, **Core concepts > Workflows > Dynamic workflows**, and **Core concepts > Workflows > Imperative workflows**.
In this section, we will delve deeper into the fundamentals of all of these workflow types, including their syntax, structure, and behavior.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/workflows/standard-workflows ===

# Standard workflows

A standard workflow is defined by a Python function decorated with the `@union.workflow` decorator.
The function is written in a domain specific language (DSL), a subset of Python syntax that describes the directed acyclic graph (DAG) that is deployed and executed on Union.ai.
The syntax of a standard workflow definition can only include the following:

* Calls to functions decorated with `@union.task` and assignment of variables to the returned values.
* Calls to other functions decorated with `@union.workflow` and assignment of variables to the returned values (see [Subworkflows](./subworkflows-and-sub-launch-plans)).
* Calls to [`LaunchPlan` objects](../launch-plans) (see [When to use sub-launch plans](./subworkflows-and-sub-launch-plans#when-to-use-sub-launch-plans)).
* Calls to functions decorated with `@union.dynamic` and assignment of variables to the returned values (see [Dynamic workflows](./dynamic-workflows)).
* The special [`conditional` construct](https://www.union.ai/docs/v1/union/user-guide/programming/conditionals).
* Statements using the [chaining operator `>>`](https://www.union.ai/docs/v1/union/user-guide/programming/chaining-entities).

## Evaluation of a standard workflow

When a standard workflow is [run locally in a Python environment](https://www.union.ai/docs/v1/union/user-guide/development-cycle/running-your-code) it is executed as a normal Python function.
However, when it is registered to Union.ai, the top level `@union.workflow`-decorated function is evaluated as follows:

* Inputs to the workflow are materialized as lazily-evaluated promises which are propagated to downstream tasks and subworkflows.
* All values returned by calls to functions decorated with `@union.task` or `@union.dynamic` are also materialized as lazily-evaluated promises.

The resulting structure is used to construct the Directed Acyclic Graph (DAG) and deploy the required containers to the cluster.
The actual evaluation of these promises occurs when the tasks (or dynamic workflows) are executed in their respective containers.

## Conditional construct

Because standard workflows cannot directly include Python `if` statements, a special `conditional` construct is provided that allows you to define conditional logic in a workflow.
For details, see [Conditionals](https://www.union.ai/docs/v1/union/user-guide/programming/conditionals).
<!-- TODO: Add link to API -->

## Chaining operator

When Union.ai builds the DAG for a standard workflow, it uses the passing of values from one task to another to determine the dependency relationships between tasks.

There may be cases where you want to define a dependency between two tasks that is not based on the output of one task being passed as an input to another.
In that case, you can use the chaining operator `>>` to define the dependencies between tasks.
For details, see [Chaining Union.ai entities](https://www.union.ai/docs/v1/union/user-guide/programming/chaining-entities).

## Workflow decorator parameters

The `@union.workflow` decorator can take the following parameters:

* `failure_policy`: Use the options in [`flytekit.WorkflowFailurePolicy`](https://www.union.ai/docs/v1/union/api-reference/flytekit-sdk).
<!-- TODO: Add link to API -->

* `interruptible`: Indicates if tasks launched from this workflow are interruptible by default. See [Interruptible instances](../tasks/task-hardware-environment/interruptible-instances).

* `on_failure`: Invoke this workflow or task on failure. The workflow specified must have the same parameter signature as the current workflow, with an additional parameter called `error`.

* `docs`: A description entity for the workflow.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/workflows/subworkflows-and-sub-launch-plans ===

# Subworkflows and sub-launch plans

In Union.ai it is possible to invoke one workflow from within another.
A parent workflow can invoke a child workflow in two ways: as a **subworkflow** or via a [**sub-launch plan**](../launch-plans/running-launch-plans#sub-launch-plans).

In both cases the child workflow is defined and registered normally, exists in the system normally, and can be run independently.

But, if the child workflow is invoked from within the parent **by directly calling the child's function**, then it becomes a **subworkflow**.
The DAG of the subworkflow is embedded directly into the DAG of the parent and effectively becomes part of the parent workflow execution, sharing the same execution ID and execution context.

On the other hand, if the child workflow is invoked from within the parent [**by calling the child's launch plan**](../launch-plans), this is called a **sub-launch plan**. It results in a new top-level workflow execution being invoked with its own execution ID and execution context.
It also appears as a separate top-level entity in the system.
The only difference is that it happens to have been kicked off from within another workflow instead of from the command line or the UI.

Here is an example:

```python
import union

@union.task
def t(a: int, b: int) -> int:
    return a + b

@union.workflow
def sub_wf(a: int, b: int) -> int:
    return t(a=a, b=b)

# Get the default launch plan of sub_wf, which we name sub_wf_lp
sub_wf_lp = union.LaunchPlan.get_or_create(sub_wf)

@union.workflow
def main_wf():
    # Invoke sub_wf directly.
    # An embedded subworkflow results.
    sub_wf(a=3, b=4)

    # Invoke sub_wf through its default launch plan, here called sub_wf_lp.
    # An independent top-level execution results.
    sub_wf_lp(a=1, b=2)
```

## When to use subworkflows

Subworkflows allow you to manage parallelism between a workflow and its launched sub-flows, as they execute within the same context as the parent workflow.
Consequently, all nodes of a subworkflow adhere to the overall constraints imposed by the parent workflow.

<!-- TODO: a diagram of the above example. -->

Here's an example illustrating the calculation of slope, intercept and the corresponding y-value.

```python
import union

@union.task
def slope(x: list[int], y: list[int]) -> float:
    sum_xy = sum([x[i] * y[i] for i in range(len(x))])
    sum_x_squared = sum([x[i] ** 2 for i in range(len(x))])
    n = len(x)
    return (n * sum_xy - sum(x) * sum(y)) / (n * sum_x_squared - sum(x) ** 2)

@union.task
def intercept(x: list[int], y: list[int], slope: float) -> float:
    mean_x = sum(x) / len(x)
    mean_y = sum(y) / len(y)
    intercept = mean_y - slope * mean_x
    return intercept

@union.workflow
def slope_intercept_wf(x: list[int], y: list[int]) -> tuple[float, float]:
    slope_value = slope(x=x, y=y)
    intercept_value = intercept(x=x, y=y, slope=slope_value)
    return (slope_value, intercept_value)

@union.task
def regression_line(val: int, slope_value: float, intercept_value: float) -> float:
    return (slope_value * val) + intercept_value  # y = mx + c

@union.workflow
def regression_line_wf(val: int = 5, x: list[int] = [-3, 0, 3], y: list[int] = [7, 4, -2]) -> float:
    slope_value, intercept_value = slope_intercept_wf(x=x, y=y)
    return regression_line(val=val, slope_value=slope_value, intercept_value=intercept_value)
```

The `slope_intercept_wf` computes the slope and intercept of the regression line.
Subsequently, the `regression_line_wf` triggers `slope_intercept_wf` and then computes the y-value.

It is possible to nest a workflow that contains a subworkflow within yet another workflow.
Workflows can be easily constructed from other workflows, even if they also function as standalone entities.
For example, each workflow in the example below has the capability to exist and run independently:

```python
import union

@union.workflow
def nested_regression_line_wf() -> float:
    return regression_line_wf()
```

## When to use sub-launch plans

Sub-launch plans can be useful for implementing exceptionally large or complicated workflows that can’t be adequately implemented as [dynamic workflows](../workflows/dynamic-workflows) or [map tasks](../tasks/task-types#map-tasks).
Dynamic workflows and map tasks share the parent's execution context and a single underlying Kubernetes resource definition.
Workflows invoked through sub-launch plans do not share this context.
They are executed as separate top-level entities, allowing for better parallelism and scale.

Here is an example of invoking a workflow multiple times through its launch plan:

```python
import union

@union.task
def my_task(a: int, b: int, c: int) -> int:
    return a + b + c

@union.workflow
def my_workflow(a: int, b: int, c: int) -> int:
    return my_task(a=a, b=b, c=c)

my_workflow_lp = union.LaunchPlan.get_or_create(my_workflow)

@union.workflow
def wf() -> list[int]:
    return [my_workflow_lp(a=i, b=i, c=i) for i in [1, 2, 3]]
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/workflows/dynamic-workflows ===

# Dynamic workflows

A workflow whose directed acyclic graph (DAG) is computed at run-time is a [`dynamic`]() workflow. <!-- TODO: add link to API -->

The tasks in a dynamic workflow are executed at runtime using dynamic inputs. A dynamic workflow shares similarities with the [`workflow`]()<!-- TODO: add link to API -->, as it uses a Python-esque domain-specific language to declare dependencies between the tasks or define new workflows.

A key distinction lies in the dynamic workflow being assessed at runtime. This means that the inputs are initially materialized and forwarded to the dynamic workflow, resembling the behavior of a task. However, the return value from a dynamic workflow is a [`Promise`]() <!-- TODO: add link to API --> object, which can be materialized by the subsequent tasks.

Think of a dynamic workflow as a combination of a task and a workflow. It is used to dynamically decide the parameters of a workflow at runtime and is both compiled and executed at run-time.

Dynamic workflows become essential when you need to do the following:
- Handle conditional logic
- Modify the logic of the code at runtime
- Change or decide on feature extraction parameters on the fly

## Defining a dynamic workflow

You can define a dynamic workflow using the `@union.dynamic` decorator.

Within the `@union.dynamic` context, each invocation of a [`task`]() <!-- TODO: add link to API --> or a derivative of the [`Task`]() <!-- TODO: add link to API --> class leads to deferred evaluation using a Promise, rather than the immediate materialization of the actual value. While nesting other `@union.dynamic` and `@union.workflow` constructs within this task is possible, direct interaction with the outputs of a task/workflow is limited, as they are lazily evaluated. If you need to interact with the outputs, we recommend separating the logic in a dynamic workflow and creating a new task to read and resolve the outputs.

The example below uses a dynamic workflow to count the common characters between any two strings.

We define a task that returns the index of a character, where A-Z/a-z is equivalent to 0-25:

```python
import union

@union.task
def return_index(character: str) -> int:
    if character.islower():
        return ord(character) - ord("a")
    else:
        return ord(character) - ord("A")
```

We also create a task that prepares a list of 26 characters by populating the frequency of each character:

```python
@union.task
def update_list(freq_list: list[int], list_index: int) -> list[int]:
    freq_list[list_index] += 1
    return freq_list
```

We define a task to calculate the number of common characters between the two strings:

```python
@union.task
def derive_count(freq1: list[int], freq2: list[int]) -> int:
    count = 0
    for i in range(26):
        count += min(freq1[i], freq2[i])
    return count
```

We define a dynamic workflow to accomplish the following:

1. Initialize empty 26-slot frequency lists to be passed to the `update_list` task.
2. Iterate through each character of the first string (`s1`) and populate the frequency list.
3. Iterate through each character of the second string (`s2`) and populate the frequency list.
4. Determine the number of common characters by comparing the two frequency lists.

The looping process depends on the number of characters in both strings, which is unknown until runtime:

```python
@union.dynamic
def count_characters(s1: str, s2: str) -> int:
    # s1 and s2 are materialized inputs here, so regular Python can inspect them

    # Initialize empty lists with 26 slots each, one per letter of the alphabet (lower or upper case)
    freq1 = [0] * 26
    freq2 = [0] * 26

    # Loop through characters in s1
    for i in range(len(s1)):
        # Calculate the index for the current character in the alphabet
        index = return_index(character=s1[i])
        # Update the frequency list for s1
        freq1 = update_list(freq_list=freq1, list_index=index)
        # index and freq1 are not accessible as they are promises

    # looping through the string s2
    for i in range(len(s2)):
        # Calculate the index for the current character in the alphabet
        index = return_index(character=s2[i])
        # Update the frequency list for s2
        freq2 = update_list(freq_list=freq2, list_index=index)
        # index and freq2 are not accessible as they are promises

    # Count the common characters between s1 and s2
    return derive_count(freq1=freq1, freq2=freq2)
```

A dynamic workflow is modeled as a task in the Union.ai backend, but the body of the function is executed to produce a workflow at runtime. In both dynamic and static workflows, the outputs of tasks are Promise objects.

Union.ai executes the dynamic workflow within its container, resulting in a compiled DAG, which is then accessible in the UI. It uses the information acquired during the dynamic task's execution to schedule and execute each task within the dynamic workflow. Visualization of the dynamic workflow's graph in the UI is only available after it has completed its execution.

When a dynamic workflow is executed, it generates the entire workflow structure as its output, termed the *futures file*.
This name reflects the fact that the workflow has yet to be executed, so all subsequent outputs are considered futures.

> [!NOTE]
> Local execution works when a `@union.dynamic` decorator is used because Union treats it as a task that runs with native Python inputs.

Finally, we define a standard workflow that triggers the dynamic workflow:

```python
@union.workflow
def start_wf(s1: str, s2: str) -> int:
    return count_characters(s1=s1, s2=s2)
```

You can run the workflow locally as follows:

```python
if __name__ == "__main__":
    print(start_wf(s1="Pear", s2="Earth"))
```

## Advantages of dynamic workflows

### Flexibility

Dynamic workflows streamline the process of building pipelines, offering the flexibility to design workflows
according to the unique requirements of your project. This level of adaptability is not achievable with static workflows.

### Lower pressure on `etcd`

The workflow Custom Resource Definition (CRD) and the states associated with static workflows are stored in `etcd`,
the Kubernetes database. This database maintains Union.ai workflow CRDs as key-value pairs, tracking the status of each node's execution.

However, `etcd` has a hard limit on data size, encompassing the workflow and node status sizes, so it is important to ensure that static workflows don't excessively consume memory.

In contrast, dynamic workflows offload the workflow specification (including node/task definitions and connections) to the object store. Still, the statuses of nodes are stored in the workflow CRD within `etcd`.

Dynamic workflows help alleviate some pressure on `etcd` storage space, providing a solution to mitigate storage constraints.

## Dynamic workflows vs. map tasks

Dynamic workflows come with overhead for large fan-out scenarios because they store metadata for the entire workflow.
In contrast, [map tasks](../tasks/task-types#map-tasks) prove efficient for such extensive fan-out scenarios since they refrain from storing metadata, resulting in less noticeable overhead.

## Using dynamic workflows to achieve recursion

Merge sort is a perfect example to showcase how to seamlessly achieve recursion using dynamic workflows.
Union.ai imposes limitations on the depth of recursion to prevent misuse and potential impacts on the overall stability of the system.

```python
import union

@union.task
def split(numbers: list[int]) -> tuple[list[int], list[int]]:

    length = len(numbers)

    return (
        numbers[0 : int(length / 2)],
        numbers[int(length / 2) :]
    )

@union.task
def merge(sorted_list1: list[int], sorted_list2: list[int]) -> list[int]:
    result = []
    while len(sorted_list1) > 0 and len(sorted_list2) > 0:
        # Compare the current element of the first array with the current element of the second array.
        # If the element in the first array is smaller, append it to the result and increment the first array index.
        # Otherwise, do the same with the second array.
        if sorted_list1[0] < sorted_list2[0]:
            result.append(sorted_list1.pop(0))
        else:
            result.append(sorted_list2.pop(0))

    # Extend the result with the remaining elements from both arrays
    result.extend(sorted_list1)
    result.extend(sorted_list2)

    return result

@union.task
def sort_locally(numbers: list[int]) -> list[int]:
    return sorted(numbers)

@union.dynamic
def merge_sort_remotely(numbers: list[int], threshold: int) -> list[int]:
    split1, split2 = split(numbers=numbers)
    sorted1 = merge_sort(numbers=split1, threshold=threshold)
    sorted2 = merge_sort(numbers=split2, threshold=threshold)
    return merge(sorted_list1=sorted1, sorted_list2=sorted2)

@union.dynamic
def merge_sort(numbers: list[int], threshold: int = 5) -> list[int]:

    if len(numbers) <= threshold:
        return sort_locally(numbers=numbers)
    else:
        return merge_sort_remotely(numbers=numbers, threshold=threshold)
```

By simply adding the `@union.dynamic` annotation, the `merge_sort_remotely` function transforms into a plan of execution,
generating a workflow with four distinct nodes. These nodes run remotely on potentially different hosts,
with Union.ai ensuring proper data reference passing and maintaining execution order with maximum possible parallelism.

`@union.dynamic` is essential in this context because the number of times `merge_sort` needs to be triggered is unknown at compile time. The two dynamic workflows call each other recursively,
creating a flexible execution structure.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/workflows/imperative-workflows ===

# Imperative workflows

Workflows are commonly created by applying the `@union.workflow` decorator to Python functions.
During compilation, this involves processing the function's body and utilizing subsequent calls to
underlying tasks to establish and record the workflow structure. This is the *declarative* approach
and is suitable when manually drafting the workflow.

However, in cases where workflows are constructed programmatically, an imperative style is more appropriate.
For instance, if tasks have been defined already, their sequence and dependencies might have been specified in textual form (perhaps during a transition from a legacy system).
In such scenarios, you want to orchestrate these tasks.
This is where Union.ai's imperative workflows come into play, allowing you to programmatically construct workflows.

## Example

To begin, we define the `slope` and `intercept` tasks:

```python
import union

@union.task
def slope(x: list[int], y: list[int]) -> float:
    sum_xy = sum([x[i] * y[i] for i in range(len(x))])
    sum_x_squared = sum([x[i] ** 2 for i in range(len(x))])
    n = len(x)
    return (n * sum_xy - sum(x) * sum(y)) / (n * sum_x_squared - sum(x) ** 2)

@union.task
def intercept(x: list[int], y: list[int], slope: float) -> float:
    mean_x = sum(x) / len(x)
    mean_y = sum(y) / len(y)
    intercept = mean_y - slope * mean_x
    return intercept
```

Create an imperative workflow:

```python
from flytekit import Workflow

imperative_wf = Workflow(name="imperative_workflow")
```

Add the workflow inputs to the imperative workflow:

```python
imperative_wf.add_workflow_input("x", list[int])
imperative_wf.add_workflow_input("y", list[int])
```

> If you want to assign default values to the workflow inputs, you can create a [launch plan](../launch-plans).

Add the tasks that need to be triggered from within the workflow:

```python
node_t1 = imperative_wf.add_entity(slope, x=imperative_wf.inputs["x"], y=imperative_wf.inputs["y"])
node_t2 = imperative_wf.add_entity(
    intercept, x=imperative_wf.inputs["x"], y=imperative_wf.inputs["y"], slope=node_t1.outputs["o0"]
)
```

Lastly, add the workflow output:

```python
imperative_wf.add_workflow_output("wf_output", node_t2.outputs["o0"])
```

You can execute the workflow locally as follows:

```python
if __name__ == "__main__":
    print(f"Running imperative_wf() {imperative_wf(x=[-3, 0, 3], y=[7, 4, -2])}")
```

You also have the option to provide a list of inputs and
retrieve a list of outputs from the workflow:

```python
wf_input_y = imperative_wf.add_workflow_input("y", list[str])
node_t3 = imperative_wf.add_entity(some_task, a=[imperative_wf.inputs["x"], wf_input_y])

imperative_wf.add_workflow_output(
    "list_of_outputs",
    [node_t1.outputs["o0"], node_t2.outputs["o0"]],
    python_type=list[str],
)
```
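
The wiring above can be sanity-checked in plain Python by chaining the two functions exactly as the imperative workflow connects them, using the same sample inputs as the local run:

```python
# The same slope/intercept logic as the tasks above, as plain functions.
def slope(x: list[int], y: list[int]) -> float:
    sum_xy = sum(x[i] * y[i] for i in range(len(x)))
    sum_x_squared = sum(v ** 2 for v in x)
    n = len(x)
    return (n * sum_xy - sum(x) * sum(y)) / (n * sum_x_squared - sum(x) ** 2)

def intercept(x: list[int], y: list[int], slope: float) -> float:
    return sum(y) / len(y) - slope * (sum(x) / len(x))

# node_t1 feeds node_t2, whose output becomes the workflow output.
x, y = [-3, 0, 3], [7, 4, -2]
wf_output = intercept(x=x, y=y, slope=slope(x=x, y=y))
```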

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/workflows/launching-workflows ===

# Launching workflows

From the [individual workflow view](./viewing-workflows#workflow-view) (accessed, for example, by selecting a workflow in the [**Workflows** list](./viewing-workflows#workflows-list)) you can select **Launch Workflow** in the top right. This opens the **New Execution** dialog for workflows:

![New execution dialog settings](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workflows/launching-workflows/new-execution-dialog-settings.png)

At the top you can select:

* The specific version of this workflow that you want to launch.
* The launch plan to be used to launch this workflow (by default it is set to the [default launch plan of the workflow](../launch-plans#default-launch-plan)).

Along the left side the following sections are available:

* **Inputs**: The input parameters of the workflow function appear here as fields to be filled in.
* **Settings**:
  * **Execution name**: A custom name for this execution. If not specified, a name will be generated.
  * **Overwrite cached outputs**: A boolean. If set to `True`, this execution will overwrite any previously-computed cached outputs.
  * **Raw output data config**: Remote path prefix to store raw output data.
    By default, workflow output will be written to the built-in metadata storage.
    Alternatively, you can specify a custom location for output at the organization, project-domain, or individual execution levels.
    This field is for specifying this setting at the workflow execution level.
    If this field is filled in it overrides any settings at higher levels.
    The parameter is expected to be a URL to a writable resource (for example, `http://s3.amazonaws.com/my-bucket/`).
    See [Raw data store](https://www.union.ai/docs/v1/union/user-guide/data-input-output/task-input-and-output).
  * **Max parallelism**: Number of workflow nodes that can be executed in parallel. If not specified, project/domain defaults are used. If 0 then no limit is applied.
  * **Force interruptible**: A three-valued setting for overriding the interruptible setting of the workflow for this particular execution.
    If not set, the workflow's interruptible setting is used.
    If set and **enabled**, then `interruptible=True` is used for this execution.
    If set and **disabled**, then `interruptible=False` is used for this execution.
    See [Interruptible instances](../tasks/task-hardware-environment/interruptible-instances).

  * **Service account**: The service account to use for this execution. If not specified, the default is used.

* **Environment variables**: Environment variables that will be available to tasks in this workflow execution.
* **Labels**: Labels to apply to the execution resource.
* **Notifications**: [Notifications](../launch-plans/notifications) configured for this workflow execution.

* **Debug**: The workflow execution details for debugging purposes.

Select **Launch** to launch the workflow execution. This will take you to the [Execution view](./viewing-workflow-executions).

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/workflows/viewing-workflows ===

# Viewing workflows

## Workflows list

The workflows list shows all workflows in the current project and domain:

![Workflows list](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workflows/viewing-workflows/workflows-list.png)

You can search the list by name and filter for only those that are archived.
To archive a workflow, select the archive icon ![Archive icon](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workflows/viewing-workflows/archive-icon.png).

Each entry in the list provides some basic information about the workflow:

* **Last execution time**:
The time of the most recent execution of this workflow.
* **Last 10 executions**:
The status of the last 10 executions of this workflow.
* **Inputs**:
The input type for the workflow.
* **Outputs**:
The output type for the workflow.
* **Description**:
 The description of the workflow.

Select an entry in the list to go to that workflow's **Core concepts > Workflows > Viewing workflows > Workflow view**.

## Workflow view

The workflow view provides details about a specific workflow.

![Workflow view](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workflows/viewing-workflows/workflow-view.png)

This view provides:
* A list of recent workflow versions:
  Selecting a version will take you to the **Core concepts > Workflows > Viewing workflows > Workflow view > Workflow versions list**.
* A list of recent executions:
  Selecting an execution will take you to the [execution view](./viewing-workflow-executions).

### Workflow versions list

The workflow versions list shows all versions of this workflow along with a graph view of the workflow structure:

![Workflow version list](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workflows/viewing-workflows/workflow-versions-list.png)

### Workflow and task descriptions

Union.ai enables the use of docstrings to document your code. Docstrings are stored in the control plane and displayed on the UI for each workflow or task.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/workflows/viewing-workflow-executions ===

# Viewing workflow executions

The **Executions list** shows all executions in a project and domain combination.
An execution represents a single run of all or part of a workflow (including subworkflows and individual tasks).
You can access it from the **Executions** link in the left navigation.

![Executions list](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workflows/viewing-workflow-executions/executions-list.png)

## Domain Settings

This section displays any domain-level settings that have been configured for this project-domain combination. They are:

* Security Context
* Labels
* Annotations
* Raw output data config
* Max parallelism

## All Executions in the Project

For each execution in this project and domain you can see the following:

* A graph of the **last 100 executions in the project**.
* **Start time**: Select to view the **Core concepts > Workflows > Viewing workflow executions > Execution view**.
* **Workflow/Task**: The [individual workflow](./viewing-workflows) or [individual task](../tasks/viewing-tasks) that ran in this execution.
* **Version**: The version of the workflow or task that ran in this execution.
* **Launch Plan**: The [Launch Plan](../launch-plans/viewing-launch-plans) that was used to launch this execution.
* **Schedule**: The schedule that was used to launch this execution (if any).
* **Execution ID**: The ID of the execution.
* **Status**: The status of the execution. One of **QUEUED**, **RUNNING**, **SUCCEEDED**, **FAILED** or **UNKNOWN**.
* **Duration**: The duration of the execution.

## Execution view

The execution view appears when you launch a workflow or task or select an already completed execution.

An execution represents a single run of all or part of a workflow (including subworkflows and individual tasks).

![Execution view - nodes](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workflows/viewing-workflow-executions/execution-view-nodes.png)

> [!NOTE]
> An execution usually represents the run of an entire workflow.
> But, because workflows are composed of tasks (and sometimes subworkflows) and Union.ai caches the outputs of those independently of the workflows in which they participate, it sometimes makes sense to execute a task or subworkflow independently.

The top part of the execution view provides detailed general information about the execution.

The bottom part provides three tabs displaying different aspects of the execution: **Nodes**, **Graph**, and **Timeline**.

### Nodes

The default tab within the execution view is the **Nodes** tab.
It shows a list of the Union.ai nodes that make up this execution (a node in Union.ai is either a task or a (sub-)workflow).

Selecting an item in the list opens the right panel showing more details of that specific node:

![](../../../_static/images/user-guide/core-concepts/workflows/viewing-workflow-executions/execution-view-node-side-panel.png)

The top part of the side panel provides detailed information about the node as well as the **Rerun task** button.

Below that, you have the following tabs: **Executions**, **Inputs**, **Outputs**, and **Task**.

The **Executions** tab gives you details on the execution of this particular node as well as access to:

* **Task level monitoring**: You can access the [task-level monitoring](../tasks/task-hardware-environment/task-level-monitoring) information by selecting **View Utilization**.

* **Logs**: You can access logs by clicking the text under **Logs**. See [Logging](../tasks/viewing-logs).

The **Inputs** and **Outputs** tabs display the data that was passed into and out of the node, respectively.

If this node is a task (as opposed to a subworkflow) then the **Task** tab displays the Task definition structure.

### Graph

The Graph tab displays a visual representation of the execution as a directed acyclic graph:

![](../../../_static/images/user-guide/core-concepts/workflows/viewing-workflow-executions/execution-view-graph.png)

### Timeline

The Timeline tab displays a visualization showing the timing of each task in the execution:

![](../../../_static/images/user-guide/core-concepts/workflows/viewing-workflow-executions/execution-view-timeline.png)

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks ===

# Tasks

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

Tasks are the fundamental units of compute in Union.ai.
They are independently executable, strongly typed, and containerized building blocks that make up workflows.
Workflows are constructed by chaining together tasks, with the output of one task feeding into the input of the next to form a directed acyclic graph.

## Tasks are independently executable

Tasks are designed to be independently executable, meaning that they can be run in isolation from other tasks.
And since most tasks are just Python functions, they can be executed on your local machine, making it easy to unit test and debug tasks locally before deploying them to Union.ai.

Because they are independently executable, tasks can also be shared and reused across multiple workflows and, as long as their logic is deterministic, their inputs and outputs can be cached (see **Core concepts > Caching**) to save compute resources and execution time.

## Tasks are strongly typed

Tasks have strongly typed inputs and outputs, which are validated at deployment time.
This helps catch bugs early and ensures that the data passing through tasks and workflows is compatible with the explicitly stated types.

Under the hood, Union.ai uses the [Flyte type system]() and translates between Flyte types and Python types.
The Python type annotations in a function signature ensure that the data passing through tasks and workflows is compatible with the explicitly stated types.
The type system is also used for caching, data lineage tracking, and automatic serialization and deserialization of data as it’s passed from one task to another.
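
The kind of signature-level compatibility check this enables can be pictured with Python's own type hints. This is an illustrative sketch only; the real validation is performed by the Flyte type system at deployment time:

```python
import typing

def check_compatible(producer: typing.Callable, consumer: typing.Callable, arg: str) -> bool:
    """Return True if producer's return type matches consumer's `arg` parameter type."""
    out_type = typing.get_type_hints(producer).get("return")
    in_type = typing.get_type_hints(consumer).get(arg)
    return out_type is not None and out_type == in_type

# Two hypothetical task-like functions with typed signatures.
def get_data() -> list[int]: ...
def process(data: list[int]) -> float: ...

compatible = check_compatible(get_data, process, "data")  # list[int] feeds list[int]
```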

## Tasks are containerized

While (most) tasks are locally executable, when a task is deployed to Union.ai as part of the registration process it is containerized and run in its own independent Kubernetes pod.

This allows tasks to have their own independent set of [software dependencies](./task-software-environment/_index) and [hardware requirements](./task-hardware-environment/_index).
For example, a task that requires a GPU can be deployed to Union.ai with a GPU-enabled container image, while a task that requires a specific version of a software library can be deployed with that version of the library installed.

## Tasks are named, versioned, and immutable

The fully qualified name of a task is a combination of its project, domain, and name. To update a task, you change it and re-register it under the same fully qualified name. This creates a new version of the task while the old version remains available. At the version level, tasks are therefore immutable. This immutability is important for ensuring that workflows are reproducible and that data lineage is accurate.

## Tasks are (usually) deterministic and cacheable

When deciding if a unit of execution is suitable to be encapsulated as a task, consider the following questions:

* Is there a well-defined graceful/successful exit criteria for the task?
    * A task is expected to exit after completion of input processing.
* Is it deterministic and repeatable?
    * Under certain circumstances, a task might be cached or rerun with the same inputs.
      It is expected to produce the same output every time.
      You should, for example, avoid using random number generators with the current clock as seed.
* Is it a pure function? That is, does it have side effects that are unknown to the system?
    * It is recommended to avoid side effects in tasks.
    * When side effects are unavoidable, ensure that the operations are idempotent.
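
For instance, the determinism requirement above can be met by deriving any needed random seed from the task's inputs rather than from the clock. The following is a plain-Python sketch (the function name is illustrative):

```python
import hashlib
import random

def sample_rows(row_count: int, k: int) -> list[int]:
    # Derive the seed from the inputs, never from the current time,
    # so a rerun or cache check with the same inputs yields the same output.
    seed = int(hashlib.sha256(f"{row_count}:{k}".encode()).hexdigest(), 16)
    rng = random.Random(seed)
    return rng.sample(range(row_count), k)

first = sample_rows(1000, 5)
second = sample_rows(1000, 5)  # identical: safe to cache or retry
```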

For details on task caching, see **Core concepts > Caching**.

## Workflows can contain many types of tasks

One of the most powerful features of Union.ai is the ability to run widely differing computational workloads as tasks within a single workflow.

Because of the way that Union.ai is architected, tasks within a single workflow can differ along many dimensions. While the total number of ways that tasks can be configured is quite large, the options fall into three categories:

* **Task type**: These include standard Python tasks, map tasks, raw container tasks, and many specialized plugin tasks. For more information, see **Core concepts > Tasks > Other task types**.
* **Software environment**: Define the task container image, dependencies, and even programming language. For more information, see [Task software environment](./task-software-environment/_index).
* **Hardware environment**: Define the resource requirements (processor numbers, storage amounts) and machine node characteristics (CPU and GPU type). For more information, see [Task hardware environment](./task-hardware-environment/_index).

### Mix and match task characteristics

Along these three dimensions, you can mix and match characteristics to build a task definition that performs exactly the job you want, while still taking advantage of all the features provided at the workflow level like output caching, versioning, and reproducibility.

Tasks with diverse characteristics can be combined into a single workflow.
For example, a workflow might contain:

* A **Python task running on your default container image** with default dependencies and a default resource and hardware profile.
* A **Python task running on a container image with additional dependencies** configured to run on machine nodes with a specific type of GPU.
* A **raw container task** running a Java process.
* A **plugin task** running a Spark job that spawns its own cluster-in-a-cluster.
* A **map task** that runs multiple copies of a Python task in parallel.

The ability to build workflows from such a wide variety of heterogeneous tasks makes Union.ai uniquely flexible.

> [!NOTE]
> Not all parameters are compatible. For example, with specialized plugin task types, some configurations are
> not available (this depends on task plugin details).

## Task configuration

The `@union.task` decorator can take a number of parameters that allow you to configure the task's behavior.
For example, you can specify the task's software dependencies, hardware requirements, caching behavior, retry behavior, and more.
For more information, see **Core concepts > Tasks > Task parameters**.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/map-tasks ===

# Map tasks

A map task allows you to execute many instances of a task within a single workflow node.
This enables you to execute a task across a set of inputs without having to create a node for each input, resulting in significant performance improvements.

Map tasks find application in various scenarios, including:
* When multiple inputs require running through the same code logic.
* Processing multiple data batches concurrently.

Just like normal tasks, map tasks are automatically parallelized to the extent possible given resources available in the cluster.

```python
import union

THRESHOLD = 11

@union.task
def detect_anomalies(data_point: int) -> bool:
    return data_point > THRESHOLD

@union.workflow
def map_workflow(data: list[int] = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]) -> list[bool]:
    # Use the map task to apply the anomaly detection function to each data point
    return union.map(detect_anomalies)(data_point=data)

```
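
Run locally, the map call is semantically equivalent to applying the task function elementwise. This is a plain-Python illustration only; on the cluster, each element can execute as its own task instance:

```python
THRESHOLD = 11

def detect_anomalies(data_point: int) -> bool:
    return data_point > THRESHOLD

data = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]
# Locally equivalent to union.map(detect_anomalies)(data_point=data).
results = [detect_anomalies(data_point=p) for p in data]
```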

> [!NOTE]
> Map tasks can also map over launch plans. For more information and example code, see [Mapping over launch plans](../launch-plans/mapping-over-launch-plans).

To customize resource allocations, such as memory usage for individual map tasks, you can leverage `with_overrides`. Here’s an example using the `detect_anomalies` map task within a workflow:

```python
import union

@union.workflow
def map_workflow_with_resource_overrides(
    data: list[int] = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]
) -> list[bool]:

    return (
        union.map(detect_anomalies)(data_point=data)
        .with_overrides(requests=union.Resources(mem="2Gi"))
    )
```

You can also configure `concurrency` and `min_success_ratio` for a map task:

- `concurrency` limits the number of mapped tasks that can run in parallel to the specified batch size. If the input size exceeds the concurrency value, multiple batches will run serially until all inputs are processed. If left unspecified, it implies unbounded concurrency.
- `min_success_ratio` determines the minimum fraction of total jobs that must complete successfully before terminating the map task and marking it as successful.

```python
import typing

import union

@union.workflow
def map_workflow_with_additional_params(
    data: list[int] = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]
) -> list[typing.Optional[bool]]:

    return union.map(
        detect_anomalies,
        concurrency=1,
        min_success_ratio=0.75
    )(data_point=data)
```
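
The `min_success_ratio` semantics can be sketched in plain Python: failed elements yield `None` in the output list, and the map as a whole succeeds only if enough elements succeeded. This illustrates the described behavior, not Union.ai's actual implementation:

```python
from typing import Callable, Optional

def map_with_min_success(
    fn: Callable[[int], bool], inputs: list[int], min_success_ratio: float
) -> list[Optional[bool]]:
    results: list[Optional[bool]] = []
    successes = 0
    for item in inputs:
        try:
            results.append(fn(item))
            successes += 1
        except Exception:
            results.append(None)  # failed element: placeholder in the output
    if successes / len(inputs) < min_success_ratio:
        raise RuntimeError("map task failed: too few successful elements")
    return results

def flaky(x: int) -> bool:
    if x < 0:
        raise ValueError("negative input")
    return x > 11

# 3 of 4 elements succeed, meeting the 0.75 ratio, so the map succeeds.
out = map_with_min_success(flaky, [10, 12, -1, 100], min_success_ratio=0.75)
```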

For more details, see the [Map Task example](https://github.com/unionai-oss/union-cloud-docs-examples/tree/main/map_task) in the `unionai-examples` repository and the [Map Tasks]() section.
<!-- TODO: Add link to API -->

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-types ===

# Other task types

Task types include:

* **`PythonFunctionTask`**: This Python class represents the standard default task.
It is the type that is created when you use the `@union.task` decorator.
* **`ContainerTask`**: This Python class represents a raw container.
It allows you to install any image you like, giving you complete control of the task.
* **Shell tasks**: Use them to execute `bash` scripts within Union.ai.
* **Specialized plugin tasks**: These include both specialized classes and specialized configurations of the `PythonFunctionTask`.
They implement integrations with third-party systems.

## PythonFunctionTask

This is the task type that is created when you add the `@union.task` decorator to a Python function.
It represents a Python function that will be run within a single container. For example:

```python
import pandas as pd
from sklearn.datasets import load_wine

import union

@union.task
def get_data() -> pd.DataFrame:
    """Get the wine dataset."""
    return load_wine(as_frame=True).frame

```

See the [Python Function Task example](https://github.com/unionai-oss/union-cloud-docs-examples/tree/main/python_function_task).

This is the most common task variant and the one that, thus far, we have focused on in this documentation.

## ContainerTask

This task variant represents a raw container, with no assumptions made about what is running within it.
Here is an example of declaring a `ContainerTask`:

```python
from flytekit import ContainerTask, kwtypes

greeting_task = ContainerTask(
    name="echo_and_return_greeting",
    image="alpine:latest",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(name=str),
    outputs=kwtypes(greeting=str),
    command=["/bin/sh", "-c", "echo 'Hello, my name is {{.inputs.name}}.' | tee -a /var/outputs/greeting"],
)
```

The `ContainerTask` enables you to include a task in your workflow that executes arbitrary code in any language, not just Python.

In the following example, the tasks calculate the area of an ellipse. Each task name has to be unique within the project. You can specify:

* `input_data_dir`: the directory where inputs will be written for the task.
* `output_data_dir`: the directory where Union.ai will expect the outputs to exist.

The `inputs` and `outputs` parameters specify the interface of the task and should each be an ordered dictionary of typed input and output variables.

The `image` field specifies the container image for the task, either as an image name or an `ImageSpec`. To access files that are not included in the image, use `ImageSpec` to copy files or directories into the container's `/root` directory.
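
The `{{.inputs.<name>}}` placeholders in `command` are substituted with the task's input values before the container runs. The effect can be pictured with a simple string replacement (an illustrative sketch, not Union.ai's templating engine):

```python
def render_command(command: list[str], inputs: dict[str, object]) -> list[str]:
    # Replace each {{.inputs.<name>}} placeholder with its input value.
    rendered = []
    for part in command:
        for name, value in inputs.items():
            part = part.replace("{{.inputs.%s}}" % name, str(value))
        rendered.append(part)
    return rendered

cmd = render_command(
    ["./calculate-ellipse-area", "{{.inputs.a}}", "{{.inputs.b}}", "/var/outputs"],
    {"a": 3.0, "b": 4.0},
)
```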

Caching can be enabled for a `ContainerTask` by configuring the cache settings in a `TaskMetadata` object passed to the `metadata` parameter.

```python
from flytekit import ContainerTask, TaskMetadata, kwtypes, workflow

calculate_ellipse_area_haskell = ContainerTask(
    name="ellipse-area-metadata-haskell",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-haskell:v2",
    command=[
        "./calculate-ellipse-area",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
    metadata=TaskMetadata(cache=True, cache_version="1.0"),
)

calculate_ellipse_area_julia = ContainerTask(
    name="ellipse-area-metadata-julia",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-julia:v2",
    command=[
        "julia",
        "calculate-ellipse-area.jl",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
    metadata=TaskMetadata(cache=True, cache_version="1.0"),
)

@workflow
def wf(a: float, b: float):
    area_haskell, metadata_haskell = calculate_ellipse_area_haskell(a=a, b=b)
    area_julia, metadata_julia = calculate_ellipse_area_julia(a=a, b=b)
```

See the [Container Task example](https://github.com/unionai-oss/union-cloud-docs-examples/tree/main/container_task).

## Shell tasks

Shell tasks enable the execution of shell scripts within Union.ai.
To create a shell task, provide a name for it, specify the bash script to be executed, and define inputs and outputs if needed:

### Example
```python
from pathlib import Path
from typing import Tuple

import union
from flytekit import kwtypes
from flytekit.extras.tasks.shell import OutputLocation, ShellTask
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile

t1 = ShellTask(
    name="task_1",
    debug=True,
    script="""
    set -ex
    echo "Hey there! Let's run some bash scripts using a shell task."
    echo "Showcasing shell tasks." >> {inputs.x}
    if grep "shell" {inputs.x}
    then
        echo "Found it!" >> {inputs.x}
    else
        echo "Not found!"
    fi
    """,
    inputs=kwtypes(x=FlyteFile),
    output_locs=[OutputLocation(var="i", var_type=FlyteFile, location="{inputs.x}")],
)

t2 = ShellTask(
    name="task_2",
    debug=True,
    script="""
    set -ex
    cp {inputs.x} {inputs.y}
    tar -zcvf {outputs.j} {inputs.y}
    """,
    inputs=kwtypes(x=FlyteFile, y=FlyteDirectory),
    output_locs=[OutputLocation(var="j", var_type=FlyteFile, location="{inputs.y}.tar.gz")],
)

t3 = ShellTask(
    name="task_3",
    debug=True,
    script="""
    set -ex
    tar -zxvf {inputs.z}
    cat {inputs.y}/$(basename {inputs.x}) | wc -m > {outputs.k}
    """,
    inputs=kwtypes(x=FlyteFile, y=FlyteDirectory, z=FlyteFile),
    output_locs=[OutputLocation(var="k", var_type=FlyteFile, location="output.txt")],
)
```
Here's a breakdown of the parameters of the `ShellTask`:

- The `inputs` parameter allows you to specify the types of inputs that the task will accept.
- The `output_locs` parameter is used to define the output locations, which can be `FlyteFile` or `FlyteDirectory`.
- The `script` parameter contains the actual bash script that will be executed
  (`{inputs.x}`, `{outputs.j}`, etc. will be replaced with the actual input and output values).
- The `debug` parameter is helpful for debugging purposes.
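
The `{inputs.x}` and `{outputs.j}` placeholders follow Python's `str.format` attribute syntax, so the substitution can be pictured like this (an illustrative sketch with hypothetical paths, not ShellTask internals):

```python
from types import SimpleNamespace

script = "cp {inputs.x} {inputs.y} && tar -zcvf {outputs.j} {inputs.y}"

# Each placeholder resolves to an attribute of the inputs/outputs namespace.
rendered = script.format(
    inputs=SimpleNamespace(x="/tmp/test.txt", y="/tmp/testdata"),
    outputs=SimpleNamespace(j="/tmp/testdata.tar.gz"),
)
```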

We define a task to instantiate `FlyteFile` and `FlyteDirectory`.
A `.gitkeep` file is created in the `FlyteDirectory` as a placeholder to ensure the directory exists:

```python
@union.task
def create_entities() -> Tuple[union.FlyteFile, union.FlyteDirectory]:
    working_dir = Path(union.current_context().working_directory)
    flytefile = working_dir / "test.txt"
    flytefile.touch()

    flytedir = working_dir / "testdata"
    flytedir.mkdir(exist_ok=True)

    flytedir_file = flytedir / ".gitkeep"
    flytedir_file.touch()
    return flytefile, flytedir
```
We create a workflow to define the dependencies between the tasks:

```python
@union.workflow
def shell_task_wf() -> union.FlyteFile:
    x, y = create_entities()
    t1_out = t1(x=x)
    t2_out = t2(x=t1_out, y=y)
    t3_out = t3(x=x, y=y, z=t2_out)
    return t3_out
```
You can run the workflow locally:
```python
if __name__ == "__main__":
    print(f"Running shell_task_wf() {shell_task_wf()}")
```

## Specialized plugin task classes and configs

Union.ai supports a wide variety of plugin tasks.
Some of these are enabled as specialized task classes, others as specialized configurations of the default `@union.task` (`PythonFunctionTask`).

They enable things like:

* Querying external databases (AWS Athena, BigQuery, DuckDB, SQL, Snowflake, Hive).
* Executing specialized processing right in Union.ai (Spark in a virtual cluster, Dask in a virtual cluster, SageMaker, Airflow, Modin, Ray, MPI and Horovod).
* Handing off processing to external services (AWS Batch, Spark on Databricks, Ray on an external cluster).
* Data transformation (Great Expectations, DBT, Dolt, ONNX, Pandera).
* Data tracking and presentation (MLFlow, Papermill).

See the [Integration section]() for examples.
<!-- TODO: Add link to API -->

<!-- TODO: INCORPORATE THE FOLLOWING ABOVE WHERE NECESSARY

## @union.task parameters

`task_config`: This argument provides configuration for a specific task types. Please refer to the plugins documentation for the right object to use.
It is impossible to define the unit of execution of a task in the same
way for all tasks. Hence, Flyte allows for different task types in the
system. Flyte has a set of defined, battle-tested task types. It allows
for a flexible model to
`define new types <cookbook:plugins_extend>`{.interpreted-text
role="std:ref"}.
Flyte offers numerous plugins for tasks, including backend plugins like Athena.
Flyte exposes an extensible model to express tasks in an
execution-independent language. It contains first-class task plugins
(for example:
[Papermill](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-papermill/flytekitplugins/papermill/task.py),
[Great
Expectations](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-greatexpectations/flytekitplugins/great_expectations/task.py),
and `more <integrations>`{.interpreted-text role="ref"}.) that execute
the Flyte tasks. Almost any action can be implemented and introduced
into Flyte as a \"Plugin\", which includes:
- Tasks that run queries on distributed data warehouses like Redshift, Hive, Snowflake, etc.
- Tasks that run executions on compute engines like Spark, Flink, AWS Sagemaker, AWS Batch, Kubernetes pods, jobs, etc.
- Tasks that call web services.
Flyte ships with certain defaults, for example, running a simple Python
function does not need any hosted service. Flyte knows how to execute
these kinds of tasks on Kubernetes. It turns out these are the vast
majority of tasks in machine learning, and Flyte is adept at handling an
enormous scale on Kubernetes. This is achieved by implementing a unique
scheduler on Kubernetes.

-->

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-parameters ===

# Task parameters

You pass the following parameters to the `@union.task` decorator:

<!-- TODO: consider organizing by category rather than alphabetically. -->

* `accelerator`: The accelerator to use for this task.
  For more information, see [Specifying accelerators]().
  <!-- TODO: Add link to API -->

* `cache`: See [Caching](../caching).

* `cache_serialize`: See [Caching](../caching).

* `cache_version`: See [Caching](../caching).

* `cache_ignore_input_vars`: Input variables that should not be included when calculating the hash for the cache.

* `container_image`: See [`ImageSpec`](../image-spec).

* `deprecated`: A string that can be used to provide a warning message for a deprecated task.
  The absence of a string, or an empty string, indicates that the task is active and not deprecated.

* `docs`: Documentation about this task.

* `enable_deck`: If true, this task will output a Deck which can be used to visualize the task execution. See [Decks](https://www.union.ai/docs/v1/union/user-guide/development-cycle/decks).

```python
@union.task(enable_deck=True)
def my_task(my_str: str):
    print(f"hello {my_str}")
```

* `environment`: See [Environment variables](./task-software-environment/environment-variables).

* `interruptible`: See [Interruptible instances](./task-hardware-environment/interruptible-instances).

* `limits`: See [Customizing task resources](./task-hardware-environment/customizing-task-resources).

* `node_dependency_hints`: A list of tasks, launch plans, or workflows that this task depends on.
  This is only for dynamic tasks/workflows, where Union.ai cannot automatically determine the dependencies prior to runtime.
  Even on dynamic tasks this is optional, but in some scenarios it will make registering the workflow easier,
  because it allows registration to be done the same way as for static tasks/workflows.
  For example, this is useful for running launch plans dynamically, because launch plans must be registered before they can be run.
  Tasks and workflows do not have this requirement.

```python
import union
from flytekit import LaunchPlan

@union.workflow
def workflow0():
    ...

launchplan0 = LaunchPlan.get_or_create(workflow0)

# Specify node_dependency_hints so that launchplan0
# will be registered on flyteadmin, despite this being a dynamic task.
@union.dynamic(node_dependency_hints=[launchplan0])
def launch_dynamically():
    # To run a sub-launchplan it must have previously been registered on flyteadmin.
    return [launchplan0] * 10
```

* `pod_template`: See [Task hardware environment](./task-hardware-environment#pod_template-and-pod_template_name-task-parameters).

* `pod_template_name`: See [Task hardware environment](./task-hardware-environment#pod_template-and-pod_template_name-task-parameters).

* `requests`: See [Customizing task resources](./task-hardware-environment/customizing-task-resources)

* `retries`: Number of times to retry this task during a workflow execution.
  Tasks can define a retry strategy to let the system know how to handle failures (for example: retry 3 times on any kind of error).
  There are two kinds of retries: *system retries* and *user retries*.
  For more information, see [Interruptible instances](./task-hardware-environment/interruptible-instances).

* `secret_requests`: See [Managing secrets](https://www.union.ai/docs/v1/union/user-guide/development-cycle/managing-secrets)

* `task_config`: Configuration for a specific task type.
  See the [Union.ai Connectors documentation](../../integrations/connectors) and
  [Union.ai plugins documentation]() for the right object to use.
  <!-- TODO: Add link to API -->

* `task_resolver`: Provide a custom task resolver.

* `timeout`: The maximum amount of time for which a single execution of this task may run.
  The execution will be terminated if its runtime exceeds the given timeout (approximately).
  To ensure that the system is always making progress, tasks must be guaranteed to end gracefully or successfully.
  The system defines a default timeout period for tasks.
  Task authors can also define their own timeout period, after which the task is marked as `failure`.
  Note that a timed-out task will be retried if it has a retry strategy defined.
  The timeout can also be set in the
  [TaskMetadata]().
  <!-- TODO: Add link to API -->
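
  Taken together, `retries` and `timeout` interact: a timed-out attempt counts as a failure that the retry strategy may absorb. The following pure-Python sketch illustrates the semantics only; it is not Union's actual scheduler, and the helper names are invented for the illustration:

```python
import time

# Illustration only: one initial attempt plus `retries` retries,
# where a slow attempt is treated like any other retryable failure.
def run_with_retries(fn, retries: int, timeout_s: float):
    last_error = None
    for attempt in range(retries + 1):
        start = time.monotonic()
        try:
            result = fn()
            if time.monotonic() - start > timeout_s:
                raise TimeoutError(f"attempt {attempt} exceeded {timeout_s}s")
            return result
        except Exception as e:
            last_error = e
    raise last_error  # all attempts exhausted

# A task that fails twice, then succeeds on the third attempt.
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = run_with_retries(flaky, retries=3, timeout_s=1.0)
print(result)  # "ok", after two absorbed failures
```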

## Use `partial` to provide default arguments to tasks

You can use the `functools.partial` function to assign default or constant values to the parameters of your tasks:
```python
import functools
import union

@union.task
def slope(x: list[int], y: list[int]) -> float:
    sum_xy = sum([x[i] * y[i] for i in range(len(x))])
    sum_x_squared = sum([x[i] ** 2 for i in range(len(x))])
    n = len(x)
    return (n * sum_xy - sum(x) * sum(y)) / (n * sum_x_squared - sum(x) ** 2)

@union.workflow
def simple_wf_with_partial(x: list[int], y: list[int]) -> float:
    partial_task = functools.partial(slope, x=x)
    return partial_task(y=y)
```
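
`functools.partial` itself is plain Python: it returns a new callable with some arguments pre-bound, which Union then treats like any other task invocation. The pre-binding behavior can be checked standalone, without Union:

```python
import functools

def slope(x: list[int], y: list[int]) -> float:
    n = len(x)
    sum_xy = sum(xi * yi for xi, yi in zip(x, y))
    sum_x_squared = sum(xi ** 2 for xi in x)
    return (n * sum_xy - sum(x) * sum(y)) / (n * sum_x_squared - sum(x) ** 2)

# Pre-bind x; the resulting callable only needs y.
slope_for_x = functools.partial(slope, x=[-3, 0, 3])
result = slope_for_x(y=[7, 4, -2])
print(result)  # -1.5
```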

## Named outputs

By default, Union.ai employs a standardized convention to assign names to the outputs of tasks or workflows.
Each output is sequentially labeled as `o1`, `o2`, `o3`, ... `on`, where `o` serves as the standard prefix,
and `1`, `2`, ... `n` indicates the positional index within the returned values.

However, Union.ai allows the customization of output names for tasks or workflows.
This customization becomes beneficial when you're returning multiple outputs
and you wish to assign a distinct name to each of them.

The following example illustrates the process of assigning names to outputs for both a task and a workflow.

Define a `NamedTuple` and assign it as an output to a task:

```python
import union
from typing import NamedTuple

slope_value = NamedTuple("slope_value", [("slope", float)])

@union.task
def slope(x: list[int], y: list[int]) -> slope_value:
    sum_xy = sum([x[i] * y[i] for i in range(len(x))])
    sum_x_squared = sum([x[i] ** 2 for i in range(len(x))])
    n = len(x)
    return (n * sum_xy - sum(x) * sum(y)) / (n * sum_x_squared - sum(x) ** 2)
```

Likewise, assign a `NamedTuple` to the output of the `intercept` task:

```python
intercept_value = NamedTuple("intercept_value", [("intercept", float)])

@union.task
def intercept(x: list[int], y: list[int], slope: float) -> intercept_value:
    mean_x = sum(x) / len(x)
    mean_y = sum(y) / len(y)
    intercept = mean_y - slope * mean_x
    return intercept
```

> [!NOTE]
> While it's possible to create `NamedTuple`s directly within the code,
> it's often better to declare them explicitly. This helps prevent potential linting errors in tools like mypy.
>
> ```python
> def slope() -> NamedTuple("slope_value", slope=float):
>     pass
> ```

You can easily unpack the `NamedTuple` outputs directly within a workflow.
Additionally, you can also have the workflow return a `NamedTuple` as an output.

> [!NOTE]
> Remember that we are extracting individual task execution outputs by dereferencing them.
> This is necessary because `NamedTuple`s function as tuples and require this dereferencing:

```python
slope_and_intercept_values = NamedTuple("slope_and_intercept_values", [("slope", float), ("intercept", float)])

@union.workflow
def simple_wf_with_named_outputs(x: list[int] = [-3, 0, 3], y: list[int] = [7, 4, -2]) -> slope_and_intercept_values:
    slope_value = slope(x=x, y=y)
    intercept_value = intercept(x=x, y=y, slope=slope_value.slope)
    return slope_and_intercept_values(slope=slope_value.slope, intercept=intercept_value.intercept)

```
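
The need to dereference follows from plain `NamedTuple` semantics: fields are accessible both by name and by position, because a `NamedTuple` is still a tuple. A quick standalone check, no Union required:

```python
from typing import NamedTuple

slope_and_intercept_values = NamedTuple(
    "slope_and_intercept_values", [("slope", float), ("intercept", float)]
)

values = slope_and_intercept_values(slope=-1.5, intercept=4.0)
print(values.slope)  # -1.5, access by name
print(values[1])     # 4.0, access by position
s, i = values        # ordinary tuple unpacking also works
```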

You can run the workflow locally as follows:

```python
if __name__ == "__main__":
    print(f"Running simple_wf_with_named_outputs() {simple_wf_with_named_outputs()}")
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/launching-tasks ===

# Launching tasks

From the [task view](./viewing-tasks#task-view) (accessed, for example, by selecting a task in the [**Tasks** list](./viewing-tasks#tasks-list)) you can select **Launch Task** in the top right:

This opens the **New Execution** dialog for tasks:

![](../../../_static/images/user-guide/core-concepts/tasks/launching-tasks/new-execution-dialog.png)

The settings are similar to those for workflows. At the top you can select:

* The specific version of this task that you want to launch.

Along the left side the following sections are available:

* **Inputs**: The input parameters of the task function appear here as fields to be filled in.
* **Settings**:
  * **Execution name**: A custom name for this execution. If not specified, a name will be generated.
  * **Overwrite cached outputs**: A boolean. If set to `True`, this execution will overwrite any previously-computed cached outputs.
  * **Raw output data config**: Remote path prefix to store raw output data.
    By default, workflow output will be written to the built-in metadata storage.
    Alternatively, you can specify a custom location for output at the organization, project-domain, or individual execution levels.
    This field is for specifying this setting at the workflow execution level.
    If this field is filled in it overrides any settings at higher levels.
    The parameter is expected to be a URL to a writable resource (for example, `http://s3.amazonaws.com/my-bucket/`).
    See [Raw data store](https://www.union.ai/docs/v1/union/user-guide/data-input-output/task-input-and-output).
  * **Max parallelism**: Number of workflow nodes that can be executed in parallel. If not specified, project/domain defaults are used. If set to 0, no limit is applied.
  * **Force interruptible**: A three-valued setting for overriding the interruptible setting of the workflow for this particular execution.
    If not set, the workflow's interruptible setting is used.
    If set and **enabled** then `interruptible=True` is used for this execution.
    If set and **disabled** then `interruptible=False` is used for this execution.
    See [Interruptible instances](./task-hardware-environment/interruptible-instances).

  * **Service account**: The service account to use for this execution. If not specified, the default is used.

* **Environment variables**: Environment variables that will be available to tasks in this workflow execution.
* **Labels**: Labels to apply to the execution resource.
* **Notifications**: [Notifications](../launch-plans/notifications) configured for this workflow execution.

* **Debug**: The workflow execution details for debugging purposes.

Select **Launch** to launch the task execution. This will take you to the [Execution view](../workflows/viewing-workflow-executions).

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/viewing-tasks ===

# Viewing tasks

## Tasks list

Selecting **Tasks** in the sidebar displays a list of all the registered tasks:

![Tasks list](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/viewing-tasks/tasks-list.png)

You can search the tasks by name and filter for only those that are archived.

Each task in the list displays some basic information about the task:

* **Inputs**: The input type for the task.
* **Outputs**: The output type for the task.
* **Description**: A description of the task.

Select an entry in the list to go to the **Core concepts > Tasks > Viewing tasks > Task view**.

## Task view

Selecting an individual task from the **Core concepts > Tasks > Viewing tasks > Tasks list** will take you to the task view:

![Task view](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/viewing-tasks/task-view.png)

Here you can see:

* **Inputs & Outputs**: The input and output types for the task.
* Recent task versions. Selecting one of these takes you to the **Core concepts > Tasks > Viewing tasks > Task view > Task versions list**.
* Recent executions of this task. Selecting one of these takes you to the [execution view](../workflows/viewing-workflow-executions).

### Task versions list

The task versions list gives you detailed information about a specific version of a task:

![Task versions list](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/viewing-tasks/task-versions-list.png)

* **Image**: The Docker image used to run this task.
* **Env Vars**: The environment variables used by this task.
* **Commands**: The JSON object defining this task.

At the bottom is a list of all versions of the task with the current one selected.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-software-environment ===

# Task software environment

The `@union.task` decorator provides the following parameters to specify the software environment in which a task runs:

* `container_image`: Either a string referencing a specific image in a container repository, or an `ImageSpec` object defining a build. See **Core concepts > Tasks > Task software environment > Local image building** for details.
* `environment`: See **Core concepts > Tasks > Task software environment > Environment variables** for details.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-software-environment/image-spec ===

# Local image building

With Union.ai, every task in a workflow runs within its own dedicated container.
Since a container requires a container image to run, every task in Union.ai must have a container image associated with it.
You can specify the container image to be used by a task by defining an `ImageSpec` object and passing it to the `container_image` parameter of the `@union.task` decorator.
When you register the workflow, the container image is built locally and pushed to the container registry that you specify.
When the workflow is executed, the container image is pulled from that registry and used to run the task.

> [!NOTE]
> See the [ImageSpec API documentation]() for full documentation of `ImageSpec` class parameters and methods.
<!-- TODO: Add link to API -->

To illustrate the process, we will walk through an example.

## Project structure

```shell
├── requirements.txt
└── workflows
    ├── __init__.py
    └── imagespec-simple-example.py
```

### requirements.txt

```shell
union
pandas
```

### imagespec-simple-example.py

```python
import typing
import pandas as pd
import union

image_spec = union.ImageSpec(
    registry="ghcr.io/<my-github-org>",
    name="simple-example-image",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",
    requirements="requirements.txt"
)

@union.task(container_image=image_spec)
def get_pandas_dataframe() -> typing.Tuple[pd.DataFrame, pd.Series]:
    df = pd.read_csv("https://storage.googleapis.com/download.tensorflow.org/data/heart.csv")
    print(df.head())
    return df[["age", "thalach", "trestbps", "chol", "oldpeak"]], df.pop("target")

@union.workflow()
def wf() -> typing.Tuple[pd.DataFrame, pd.Series]:
    return get_pandas_dataframe()
```

## Install and configure `union` and Docker

To install Docker, see [Setting up container image handling](https://www.union.ai/docs/v1/union/user-guide/getting-started/local-setup).
To configure `union` to connect to your Union.ai instance, see [Getting started](../../../getting-started/_index).

## Set up an image registry

You will need an image registry where the container image can be stored and pulled by Union.ai when the task is executed.
You can use any image registry that you have access to, including public registries like Docker Hub or GitHub Container Registry.
Alternatively, you can use a registry that is part of your organization's infrastructure such as AWS Elastic Container Registry (ECR) or Google Artifact Registry (GAR).

The registry that you choose must be one that is accessible to the Union.ai instance where the workflow will be executed.
Additionally, if you use a public registry such as `ghcr.io` without configuring pull credentials, you will need to ensure that the specific image, once pushed to the registry, is itself publicly accessible.

In this example, we use GitHub's `ghcr.io` container registry.
See [Working with the Container registry](https://docs.github.com/en/packages/working-with-a-github-packages-registry/working-with-the-container-registry) for more information.

* For an example using Amazon ECR see [ImageSpec with ECR](./image-spec-with-ecr).
* For an example using Google Artifact Registry see [ImageSpec with GAR](./image-spec-with-gar).
* For an example using Azure Container Registry see [ImageSpec with ACR](./image-spec-with-acr).

## Authenticate to the registry

You will need to set up your local Docker client to authenticate with GHCR. This is needed for the `union` CLI to be able to push the image built according to the `ImageSpec` to GHCR.

Follow the directions [Working with the Container registry > Authenticating to the Container registry](https://docs.github.com/en/packages/working-with-a-github-packages-registry/working-with-the-container-registry#authenticating-to-the-container-registry).

## Set up your project and domain on Union.ai

You will need to set up a project on your Union.ai instance to which you can register your workflow.
See [Setting up the project](https://www.union.ai/docs/v1/union/user-guide/development-cycle/setting-up-a-project).

## Understand the requirements

The `requirements.txt` file contains the `union` package and the `pandas` package, both of which are needed by the task.

## Set up a virtual Python environment

Set up a virtual Python environment and install the dependencies defined in the `requirements.txt` file.
Assuming you are in the local project root, run `pip install -r requirements.txt`.

## Run the workflow locally

You can now run the workflow locally.
In the project root directory, run: `union run workflows/imagespec-simple-example.py wf`.
See [Running your code](https://www.union.ai/docs/v1/union/user-guide/development-cycle/running-your-code) for more details.

> [!NOTE]
> When you run the workflow in your local Python environment, the image is not built or pushed (in fact, no container image is used at all).

## Register the workflow

To register the workflow to Union.ai, in the local project root, run:

```shell
$ union register workflows/imagespec-simple-example.py
```

`union` will build the container image and push it to the registry that you specified in the `ImageSpec` object.
It will then register the workflow to Union.ai.

To see the registered workflow, go to the UI and navigate to the project and domain that you created above.

## Ensure that the image is publicly accessible

If you are using the `ghcr.io` image registry, you must switch the visibility of your container image to Public before you can run your workflow on Union.ai.
See [Configuring a package's access control and visibility](https://docs.github.com/en/packages/learn-github-packages/configuring-a-packages-access-control-and-visibility#about-inheritance-of-access-permissions-and-visibility).

## Run the workflow on Union.ai

Assuming your image is publicly accessible, you can now run the workflow on Union.ai by clicking **Launch Workflow**.

> [!WARNING] Make sure your image is accessible
> If you try to run a workflow that uses a private container image or an image that is inaccessible for some other reason, the system will return an error:
>
> ```
> ... Failed to pull image ...
> ... Error: ErrImagePull
> ... Back-off pulling image ...
> ... Error: ImagePullBackOff
> ```

## Multi-image workflows

You can also specify different images per task within the same workflow.
This is particularly useful when some tasks in your workflow have a different set of dependencies than the rest, which can share a common image.

In this example we specify two tasks: one that uses CPUs and another that uses GPUs.
For the former task, we use the default image that ships with `union`, while for the latter task, we specify a pre-built image that enables distributed training with the Kubeflow PyTorch integration.

```python
from typing import Tuple

import numpy as np
import torch.nn as nn
import union

@union.task(
    requests=union.Resources(cpu="2", mem="16Gi"),
    container_image="ghcr.io/flyteorg/flytekit:py3.9-latest",
)
def get_data() -> Tuple[np.ndarray, np.ndarray]:
    ...  # get dataset as numpy ndarrays

@union.task(
    requests=union.Resources(cpu="4", gpu="1", mem="16Gi"),
    container_image="ghcr.io/flyteorg/flytecookbook:kfpytorch-latest",
)
def train_model(features: np.ndarray, target: np.ndarray) -> nn.Module:
    ...  # train a model using gpus
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-software-environment/image-spec-with-ecr ===

# ImageSpec with ECR

In this section we explain how to set up and use AWS Elastic Container Registry (ECR) to build and deploy task container images using `ImageSpec`.

## Prerequisites

If you are using ECR in the same AWS account as your Union.ai data plane, then you do not need to configure anything. Access to ECR in the same account is enabled by default.

If you want to store your task container images in an ECR instance in an AWS account _other than the one that holds your data plane_, then you will have to configure that ECR instance to permit access from your data plane. See [Enable AWS ECR](https://www.union.ai/docs/v1/union/user-guide/deployment/enabling-aws-resources/enabling-aws-ecr) for details.

## Set up the image repository

Unlike GitHub Container Registry, ECR does not allow you to simply push an arbitrarily named image to the registry. Instead, you must first create a repository in the ECR instance and then push the image to that repository.

> [!NOTE] Registry, repository, and image
> In ECR terminology the **registry** is the top-level storage service. The registry holds a collection of **repositories**.
> Each repository corresponds to a named image and holds all versions of that image.
>
> When you push an image to a registry, you are actually pushing it to a repository within that registry.
> Strictly speaking, the term *image* refers to a specific *image version* within that repository.

This means that you have to decide on the name of your image and create a repository by that name first, before registering your workflow. We will assume the following:

* The ECR instance you will be using has the base URL `123456789012.dkr.ecr.us-east-1.amazonaws.com`.

* Your image will be called `simple-example-image`.

In the AWS console, go to **Amazon ECR > Repositories** and find the correct ECR registry.

If you are in the same account as your Union.ai data plane you should go directly to the ECR registry that was set up for you by Union.ai. If there are multiple ECR registries present, consult with your Union.ai administrator to find out which one to use.

Under **Create a Repository**, click **Get Started**:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-software-environment/imagespec-with-ecr/create-repository-1.png)

On the **Create repository** page:

* Select **Private** for the repository visibility, assuming you want to make it private. You can, alternatively, select **Public**, but in most cases, the main reason for using ECR is to keep your images private.

* Enter the name of the repository:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-software-environment/imagespec-with-ecr/create-repository-2.png)

and then scroll down to click **Create repository**:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-software-environment/imagespec-with-ecr/create-repository-3.png)

Your repository is now created.

## Authenticate to the registry

You will need to set up your local Docker client to authenticate with ECR. This is needed for `union` to be able to push the image built according to the `ImageSpec` to ECR.

To do this, you will need to [install the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html), use it to run the `aws ecr get-login-password` command to get the appropriate password, then perform a `docker login` with that password.

See [Private registry authentication](https://docs.aws.amazon.com/AmazonECR/latest/userguide/registry_auth.html) for details.

## Register your workflow to Union.ai

You can register tasks with `ImageSpec` declarations that reference this repository.

For example, to use the example repository shown here, we would alter the Python code from the **Core concepts > Tasks > Task software environment > Local image building** example to have the following `ImageSpec` declaration:

```python
image_spec = union.ImageSpec(
    registry="123456789012.dkr.ecr.us-east-1.amazonaws.com",
    name="simple-example-image",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",
    requirements="image-requirements.txt"
)
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-software-environment/image-spec-with-gar ===

# ImageSpec with GAR

In this section we explain how to set up and use Google Artifact Registry (GAR) to build and deploy task container images using `ImageSpec`.

## Prerequisites

If you are using GAR in the same Google Cloud Platform (GCP) project as your Union.ai data plane, then you do not need to configure anything.
Access to GAR in the same project is enabled by default.

If you want to store your task container images in a GAR repository in a GCP project _other than the one that holds your data plane_, you must enable the node pool of your data plane to access that GAR.
See [Enable Google Artifact Registry](https://www.union.ai/docs/v1/union/user-guide/deployment/enabling-gcp-resources/enabling-google-artifact-registry) for details.

## Set up the image repository

Unlike GitHub Container Registry, GAR does not allow you to simply push an arbitrarily named image to the registry.
Instead, you must first create a repository in the GAR instance and then push the image to that repository.

> [!NOTE] Registry, repository, and image
> In GAR terminology the **registry** is the top-level storage service. The registry holds a collection of **repositories**.
> Each repository in turn holds some number of images, and each specific image name can have different versions.
>
> Note that this differs from the arrangement in AWS ECR where the repository name and image name are essentially the same.
>
> When you push an image to GAR, you are actually pushing it to an image name within a repository within that registry.
> Strictly speaking, the term *image* refers to a specific *image version* within that repository.

This means that you have to decide on the name of your repository and create it, before registering your workflow. You can, however, decide on the image name later, when you push the image to the repository. We will assume the following:

* The GAR instance you will be using has the base URL `us-east1-docker.pkg.dev/my-union-dataplane/my-registry/`.
* Your repository will be called `my-image-repository`.
* Your image will be called `simple-example-image`.

In the GCP console, within your Union.ai data plane project, go to **Artifact Registry**. You should see a list of repositories. The existing ones are used internally by Union.ai. For your own work you should create a new one. Click **Create Repository**:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-software-environment/imagespec-with-gar/gar-create-repository-1.png)

On the **Create repository** page,

* Enter the name of the repository. In this example it would be `my-image-repository`.
* Select **Docker** for the artifact type.

* Select the region. If you want to access the GAR without further configuration, make sure this is the same region as your Union.ai data plane.

* Click **Create**:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-software-environment/imagespec-with-gar/gar-create-repository-2.png)

Your GAR repository is now created.

## Authenticate to the registry

You will need to set up your local Docker client to authenticate with GAR. This is needed for `union` to be able to push the image built according to the `ImageSpec` to GAR.

Directions can be found in the GAR console interface. Click on **Setup Instructions**:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-software-environment/imagespec-with-gar/gar-setup-instructions.png)

The directions are also reproduced below. (We show the directions for the `us-east1` region. You may need to adjust the command accordingly):

> [!NOTE] Setup Instructions
> Follow the steps below to configure your client to push and pull packages using this repository.
> You can also [view more detailed instructions here](https://cloud.google.com/artifact-registry/docs/docker/authentication?authuser=1).
> For more information about working with artifacts in this repository, see the [documentation](https://cloud.google.com/artifact-registry/docs/docker?authuser=1).
>
> **Initialize gcloud**
>
> The [Google Cloud SDK](https://cloud.google.com/sdk/docs/?authuser=1) is used to generate an access token when authenticating with Artifact Registry.
> Make sure that it is installed and initialized with [Application Default Credentials](https://cloud.google.com/sdk/gcloud/reference/auth/application-default/login?authuser=1) before proceeding.
>
> **Configure Docker**
>
> Run the following command to configure `gcloud` as the credential helper for the Artifact Registry domain associated with this repository's location:
>
> ```shell
> $ gcloud auth configure-docker us-east1-docker.pkg.dev
> ```

## Register your workflow to Union.ai

You can now register tasks with `ImageSpec` declarations that reference this repository.

For example, to use the example GAR repository shown here, we would alter the Python code from the **Core concepts > Tasks > Task software environment > Local image building** example to have the following `ImageSpec` declaration:

```python
image_spec = union.ImageSpec(
    registry="us-east1-docker.pkg.dev/my-union-dataplane/my-registry/my-image-repository",
    name="simple-example-image",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",
    requirements="image-requirements.txt"
)
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-software-environment/image-spec-with-acr ===

# ImageSpec with ACR

In this section we explain how to use [Azure Container Registry (ACR)](https://azure.microsoft.com/en-us/products/container-registry) to build and deploy task container images using `ImageSpec`.

Before proceeding, make sure that you have [enabled Azure Container Registry](../../../integrations/enabling-azure-resources/enabling-azure-container-registry) for your Union.ai installation.

## Authenticate to the registry

Authenticate with the container registry:

```bash
az login
az acr login --name <acrName>
```

Refer to [Individual login with Microsoft Entra ID](https://learn.microsoft.com/en-us/azure/container-registry/container-registry-authentication?tabs=azure-cli#individual-login-with-microsoft-entra-id) in the Azure documentation for additional details.

## Register your workflow to Union.ai

You can now register tasks with `ImageSpec` declarations that reference this repository.

For example, to use an existing ACR repository, we would alter the Python code from the **Core concepts > Tasks > Task software environment > Local image building** example to have the following `ImageSpec` declaration:

```python
image_spec = union.ImageSpec(
    registry="<AZURE_CONTAINER_REGISTRY_NAME>.azurecr.io",
    name="my-repository/simple-example-image",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",
    requirements="image-requirements.txt"
)
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-software-environment/environment-variables ===

# Environment variables

The `environment` parameter lets you specify the values of any variables that you want to be present within the task container execution environment.
For example:

```python
import os
import union

@union.task(environment={"MY_ENV_VAR": "my_value"})
def my_task() -> str:
    return os.environ["MY_ENV_VAR"]
```
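
Inside the container these are ordinary process environment variables, so standard `os.environ` semantics apply. When the same function also runs outside a task container (for example, in a local test), a defensive read with a fallback avoids a `KeyError`; this is plain Python, independent of Union:

```python
import os

# os.environ[...] raises KeyError for a missing variable; .get returns a default.
value = os.environ.get("MY_ENV_VAR", "default_value")
assert isinstance(value, str)
```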

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/viewing-logs ===

# Viewing logs

In the [Execution view](../workflows/viewing-workflow-executions), selecting a task from the list in the **Nodes** tab will open the task details in the right panel.

Within that panel, in the **Execution** tab, under **Logs**, you will find a link labeled **Task Logs**.

![Task logs link](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/viewing-logs/task-logs-link.png)

This leads to the **Execution logs tab** of the **Execution details page**:

![Execution logs](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/viewing-logs/execution-logs.png)

The execution logs provide a live view into the standard output of the task execution.

For example, any `print` statements in the task's Python code will be displayed here.

## Kubernetes cluster logs

On the left side of the page you can also see the Kubernetes cluster logs for the task execution:

![Kubernetes cluster logs](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/viewing-logs/k8s-cluster-logs.png)

## Other tabs

Alongside the **Execution logs** tab in the **Execution details page**, you will also find the **Execution resources** and **Inputs & Outputs** tabs.

## Cloud provider logs

In addition to the **Task Logs** link, you will also see a link to your cloud provider's logs (**Cloudwatch Logs** for AWS, **Stackdriver Logs** for GCP, and **Azure Logs** for Azure):

![Cloud provider logs link](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/viewing-logs/cloud-provider-logs-link.png)

Assuming you are logged into your cloud provider account with the appropriate permissions, this link will take you to the logs specific to the container in which this particular task execution is running.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/reference-tasks ===

# Reference tasks

A `reference_task` references a task that has already been defined, serialized, and registered. You can reference tasks from other projects and create workflows that use tasks declared by others. Referenced tasks can run in their own containers, with their own Python runtimes and flytekit versions, and can even be written in different languages.

> [!NOTE]
> Reference tasks cannot be run locally. To test locally, mock them out.

## Example

1. Create a file called `task.py` and insert this content into it:

    ```python
    import union

    @union.task
    def add_two_numbers(a: int, b: int) -> int:
        return a + b
    ```

2. Register the task:

   ```shell
   $ union register --project flytesnacks --domain development --version v1 task.py
   ```

3. Create a separate file `wf_ref_task.py` and copy the following code into it:

   ```python
   import union
   from flytekit import reference_task

   @reference_task(
       project="flytesnacks",
       domain="development",
       name="task.add_two_numbers",
       version="v1",
   )
   def add_two_numbers(a: int, b: int) -> int:
       ...

   @union.workflow
   def wf(a: int, b: int) -> int:
       return add_two_numbers(a, b)
   ```

4. Register the `wf` workflow:

    ```shell
    $ union register --project flytesnacks --domain development wf_ref_task.py
    ```

5. In the Union.ai UI, run the workflow `wf_ref_task.wf`.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment ===

# Task hardware environment

## Customizing task resources

You can customize the hardware environment in which your task code executes.

Depending on your needs, there are two ways to define and register tasks with their own custom hardware requirements:

* Configuration in the `@union.task` decorator
* Defining a `PodTemplate`

### Using the `@union.task` decorator

You can specify `requests` and `limits` on:

* CPU number
* GPU number
* Memory size
* Ephemeral storage size

See **Core concepts > Tasks > Task hardware environment > Customizing task resources** for details.

### Using PodTemplate

If your needs are more complex, you can use Kubernetes-level configuration to constrain a task to only run on a specific machine type.

This requires that you coordinate with Union.ai to set up the required machine types and node groups with the appropriate node assignment configuration (node selector labels, node affinities, taints, tolerations, etc.)

In your task definition you then use a `PodTemplate` that uses the matching node assignment configuration to make sure that the task will only be scheduled on the appropriate machine type.

### `pod_template` and `pod_template_name` @union.task parameters

The `pod_template` parameter can be used to supply a custom Kubernetes `PodTemplate` to the task.
This can be used to define details about node selectors, affinity, tolerations, and other Kubernetes-specific settings.

The `pod_template_name` is a related parameter that can be used to specify the name of an already existing `PodTemplate` resource which will be used in this task.

For details see [Configuring task pods with Kubernetes PodTemplates]().
<!-- TODO: Add link to API -->

## Accelerators

If you specify GPUs, you can also specify the type of GPU to be used by setting the `accelerator` parameter.
See **Core concepts > Tasks > Task hardware environment > Accelerators** for more information.

## Task-level monitoring

You can also monitor the hardware resources used by a task.
See **Core concepts > Tasks > Task hardware environment > Task-level monitoring** for details.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment/customizing-task-resources ===

# Customizing task resources

When defining a task function, you can specify resource requirements for the pod that runs the task.
Union.ai will take this into account to ensure that the task pod is scheduled to run on a Kubernetes node that meets the specified resource profile.

Resources are specified in the `@union.task` decorator. Here is an example:

```python
import union
from flytekit import Resources
from flytekit.extras.accelerators import GPUAccelerator

@union.task(
    requests=Resources(mem="120Gi", cpu="44", ephemeral_storage="100Gi"),
    limits=Resources(mem="200Gi", cpu="100", gpu="12", ephemeral_storage="200Gi"),
    accelerator=GPUAccelerator("nvidia-tesla-a100"),
)
def my_task():
    ...
```

There are three separate resource-related settings:

* `requests`
* `limits`
* `accelerator`

## The `requests` and `limits` settings

The `requests` and `limits` settings each take a [`Resources`]() object, which itself has four possible attributes:
<!-- TODO: Add link to API -->

* `cpu`: Number of CPU cores (in whole numbers or millicores (`m`)).
* `gpu`: Number of GPU cores (in whole numbers or millicores (`m`)).
* `mem`: Main memory (in `Mi`, `Gi`, etc.).
* `ephemeral_storage`: Ephemeral storage (in `Mi`, `Gi` etc.).

Note that CPU and GPU allocations can be specified either as whole numbers or in millicores (`m`). For example, `cpu="2500m"` means two and a half CPU cores, and `gpu="3000m"` means three GPU cores.

The type of ephemeral storage used depends on the node type and configuration you request from the Union.ai team. By default, all nodes will use network-attached storage for ephemeral storage. However, if a node type has attached NVMe SSD storage, you can request that the Union.ai team configure your cluster to use the attached NVMe as ephemeral storage for that node type.

The `requests` setting tells the system that the task requires _at least_ the resources specified and therefore the pod running this task should be scheduled only on a node that meets or exceeds the resource profile specified.

The `limits` setting serves as a hard upper bound on the resource profile of nodes to be scheduled to run the task.
The task will not be scheduled on a node that exceeds the resource profile specified (in any of the specified attributes).

> [!NOTE] GPUs take only `limits`
> GPUs should only be specified in the `limits` section of the task decorator:
>   * You should specify GPU requirements only in `limits`, not in `requests`, because Kubernetes will use the `limits` value as the `requests` value anyway.
>   * You _can_ specify GPU in both `limits` and `requests` but the two values must be equal.
>   * You cannot specify GPU `requests` without specifying `limits`.

## The `accelerator` setting

The `accelerator` setting further specifies the *type* of specialized hardware required for the task.
This can be a GPU, a specific variation of a GPU, a fractional GPU, or a different hardware device, such as a TPU.

See [Accelerators](./accelerators) for more information.

## Execution defaults and resource quotas

The execution defaults and resource quotas can be found on the right sidebar of the Dashboard.
They can be edited by selecting the gear icon:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-hardware-environment/customizing-task-resources/execution-defaults-gear.png)

This will open a dialog:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-hardware-environment/customizing-task-resources/execution-defaults-dialog.png)

> [!NOTE]
> An ephemeral storage default value of zero means that the task pod will consume storage on the node as needed.
> This makes it possible for a pod to get evicted if a node doesn't have enough storage. If your tasks are built to rely on
> ephemeral storage, we recommend being explicit with the ephemeral storage you request to avoid pod eviction.

## Task resource validation

If you attempt to execute a workflow with unsatisfiable resource requests, the execution will fail immediately rather than being allowed to queue forever.

To remedy such a failure, you should make sure that the appropriate node types are:

* Physically available in your cluster, meaning you have arranged with the Union.ai team to include them when [configuring your data plane](https://www.union.ai/docs/v1/union/user-guide/deployment/configuring-your-data-plane).
* Specified in the task decorator (via the `requests`, `limits`, `accelerator`, or other parameters).

Go to the **Resources > Compute** dashboard to find the available node types and their resource profiles.

To make changes to your cluster configuration, go to the [Union.ai Support Portal](https://get.support.union.ai/servicedesk/customer/portal/1/group/6/create/30).

## The `with_overrides` method

When `requests`, `limits`, or `accelerator` are specified in the `@union.task` decorator, they apply every time that a task is invoked from a workflow.
In some cases, you may wish to change the resources specified from one invocation to another.
To do that, use the [`with_overrides` method](https://www.union.ai/docs/v1/union/api-reference/flytekit-sdk/packages/flytekit.core.node) of the task function.

For example:

```python
import union
from flytekit import Resources
from flytekit.types.file import FlyteFile

@union.task
def my_task(ff: FlyteFile):
    ...

@union.workflow
def my_workflow(small_file: FlyteFile, big_file: FlyteFile):
    my_task(ff=small_file)
    my_task(ff=big_file).with_overrides(requests=Resources(mem="120Gi", cpu="10"))
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment/accelerators ===

# Accelerators

> [!NOTE] _Accelerators_ and _Accelerated datasets_ are entirely different things
> An accelerator, in Union.ai, is a specialized hardware device that is used to accelerate the execution of a task.
> [Accelerated datasets](https://www.union.ai/docs/v1/union/user-guide/data-input-output/accelerated-datasets), on the other hand, are a Union.ai feature that enables quick access to large datasets from within a task.
> These concepts are entirely different and should not be confused.

Union.ai allows you to specify [requests and limits](./customizing-task-resources) for the number of GPUs available for a given task.
However, in some cases, you may want to be more specific about the type of GPU or other specialized device to be used.

You can use the `accelerator` parameter to specify specific GPU types, variations of GPU types, fractional GPUs, or other specialized hardware devices such as TPUs.

Your Union.ai installation will come pre-configured with the GPUs and other hardware that you requested during onboarding.
Each device type has a constant name that you can use to specify the device in the `accelerator` parameter.
For example:

```python
import union
from flytekit import Resources
from flytekit.extras.accelerators import A100

@union.task(
    limits=Resources(gpu="1"),
    accelerator=A100,
)
def my_task():
    ...
```

## Finding your available accelerators

You can find the accelerators available in your Union.ai installation by going to the **Usage > Compute** dashboard in the UI.
In the **Accelerators** section, you will see a list of available accelerators and the named constants to be used in code to refer to them.

## Requesting the provisioning of accelerators

If you need a specific accelerator that is not available in your Union.ai installation, you can request it by contacting the Union.ai team.
Just click on the **Adjust Configuration** button under **Usage** in the UI (or go [here](https://get.support.union.ai/servicedesk/customer/portal/1/group/6/create/30)).

## Using predefined accelerator constants

There are a number of predefined accelerator constants available in the `flytekit.extras.accelerators` module.

The predefined list is not exhaustive, but it includes the most common accelerators.
If you know the name of the accelerator but there is no predefined constant for it, you can pass the string name directly (wrapped in `GPUAccelerator`, as in the example in [Customizing task resources](./customizing-task-resources)).

Note that in order for a specific accelerator to be available in your Union.ai installation, it must have been provisioned by the Union.ai team.

If using the constants, you can import them directly from the module, e.g.:

```python
import union
from flytekit import Resources
from flytekit.extras.accelerators import T4

@union.task(
    limits=Resources(gpu="1"),
    accelerator=T4,
)
def my_task():
    ...
```

If you want to use a fractional GPU, you can use one of the partition attributes on the accelerator constant, e.g.:

```python
import union
from flytekit import Resources
from flytekit.extras.accelerators import A100

@union.task(
    limits=Resources(gpu="1"),
    accelerator=A100.partition_2g_10gb,
)
def my_task():
    ...
```

## List of predefined accelerator constants

* `A10G`: [NVIDIA A10 Tensor Core GPU](https://www.nvidia.com/en-us/data-center/products/a10-gpu/)
* `L4`: [NVIDIA L4 Tensor Core GPU](https://www.nvidia.com/en-us/data-center/l4/)
* `K80`: [NVIDIA Tesla K80 GPU](https://www.nvidia.com/en-gb/data-center/tesla-k80/)
* `M60`: [NVIDIA Tesla M60 GPU](https://www.nvidia.com/content/dam/en-zz/Solutions/design-visualization/solutions/resources/documents1/nvidia-m60-datasheet.pdf)
* `P4`: [NVIDIA Tesla P4 GPU](https://images.nvidia.com/content/pdf/tesla/184457-Tesla-P4-Datasheet-NV-Final-Letter-Web.pdf)
* `P100`: [NVIDIA Tesla P100 GPU](https://www.nvidia.com/en-us/data-center/tesla-p100/)
* `T4`: [NVIDIA T4 Tensor Core GPU](https://www.nvidia.com/en-us/data-center/tesla-t4/)
* `V100`: [NVIDIA Tesla V100 GPU](https://www.nvidia.com/en-us/data-center/tesla-v100/)
* `A100`: An entire [NVIDIA A100 GPU](https://www.nvidia.com/en-us/data-center/a100/). Fractional partitions are also available:
    * `A100.partition_1g_5gb`: 5GB partition of an A100 GPU.
    * `A100.partition_2g_10gb`: 10GB partition of an A100 GPU - 2x5GB slices with 2/7th of the SM (streaming multiprocessor).
    * `A100.partition_3g_20gb`: 20GB partition of an A100 GPU - 4x5GB slices, with 3/7th fraction of the SM.
    * `A100.partition_4g_20gb`: 20GB partition of an A100 GPU - 4x5GB slices, with 4/7th fraction of the SM.
    * `A100.partition_7g_40gb`: 40GB partition of an A100 GPU - 8x5GB slices, with 7/7th fraction of the SM.
* `A100_80GB`: An entire [NVIDIA A100 80GB GPU](https://www.nvidia.com/en-us/data-center/a100/). Fractional partitions are also available:
    * `A100_80GB.partition_1g_10gb`: 10GB partition of an A100 80GB GPU with 1/7th of the SM (streaming multiprocessors).
    * `A100_80GB.partition_2g_20gb`: 20GB partition of an A100 80GB GPU with 2/7th of the SM.
    * `A100_80GB.partition_3g_40gb`: 40GB partition of an A100 80GB GPU with 3/7th of the SM.
    * `A100_80GB.partition_4g_40gb`: 40GB partition of an A100 80GB GPU with 4/7th of the SM.
    * `A100_80GB.partition_7g_80gb`: 80GB partition of an A100 80GB GPU with 7/7th of the SM (the full GPU).

For more information on partitioning, see [Partitioned GPUs](https://docs.nvidia.com/datacenter/tesla/mig-user-guide/index.html#partitioning).

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment/retries-and-timeouts ===

# Retries and timeouts

## Retry types

Union.ai allows you to automatically retry failing tasks. This section explains the configuration and application of retries.

Errors causing task failure are categorized into two main types, influencing the retry logic differently:

* `SYSTEM`: These errors arise from infrastructure-related failures, such as hardware malfunctions or network issues.
  They are typically transient and can often be resolved with a retry.

* `USER`: These errors are due to issues in the user-defined code, like a value error or a logic mistake, which usually require code modifications to resolve.

## Configuring retries

Retries in Union.ai are configurable to address both `USER` and `SYSTEM` errors, allowing for tailored fault tolerance strategies:

`USER` errors can be handled by setting the `retries` attribute in the task decorator to define how many times a task should retry.
This requires a `FlyteRecoverableException` to be raised in the task definition; any other exception will not be retried:

```python
import random
from typing import List

from flytekit import task
from flytekit.exceptions.user import FlyteRecoverableException

@task(retries=3)
def compute_mean(data: List[float]) -> float:
    if random.random() < 0.05:
        raise FlyteRecoverableException("Something bad happened 🔥")
    return sum(data) / len(data)
```

## Retrying interruptible tasks

Tasks marked as interruptible can be preempted and retried without counting against the USER error budget.
This is useful for tasks running on preemptible compute resources like spot instances.

See [Interruptible instances](./interruptible-instances)

## Retrying map tasks

For map tasks, retry behavior aligns with that of regular tasks. The `retries` field in the task decorator is not necessary for handling `SYSTEM` errors, as these are managed by the platform's configuration. The `USER` retry budget, however, is set by defining `retries` in the task decorator.

See [Map tasks](../map-tasks).

## Timeouts

To protect against zombie tasks that hang due to system-level issues, you can supply the timeout argument to the task decorator to make sure that problematic tasks adhere to a maximum runtime.

In this example, we make sure that the task is terminated after it's been running for more than one hour.

```python
from datetime import timedelta
from typing import List

from flytekit import task

@task(timeout=timedelta(hours=1))
def compute_mean(data: List[float]) -> float:
    return sum(data) / len(data)
```

Notice that the timeout argument takes a built-in Python `timedelta` object.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment/interruptible-instances ===

# Interruptible instances

> [!NOTE]
> In AWS, the term *Spot Instance* is used.
> In GCP, the equivalent term is *Spot VM*.
> Here we use the term *interruptible instance* generically for both providers.

An interruptible instance is a machine instance made available to your cluster by your cloud provider that is not guaranteed to be always available.
As a result, interruptible instances are cheaper than regular instances.
In order to use an interruptible instance for a compute workload you have to be prepared for the possibility that an attempt to run the workload could fail due to lack of available resources and will need to be retried.

When onboarding your organization onto Union.ai, you [specify the configuration of your cluster](https://www.union.ai/docs/v1/union/user-guide/deployment/configuring-your-data-plane).
Among the options available is the choice of whether to use interruptible instances.

For each interruptible instance node group that you specify, an additional on-demand node group (though identical in every other respect to the interruptible one) will also be configured.
This on-demand node group will be used as a fallback when attempts to complete the task on the interruptible instance have failed.

## Configuring tasks to use interruptible instances

To schedule tasks on interruptible instances and retry them if they fail, specify the `interruptible` and `retries` parameters in the `@union.task` decorator.
For example:

```python
@union.task(interruptible=True, retries=3)
```

* A task will only be scheduled on an interruptible instance if it has the parameter `interruptible=True` (or if its workflow has the parameter `interruptible=True` and the task does not have an explicit `interruptible` parameter).
* An interruptible task, like any other task, can have a `retries` parameter.
* If an interruptible task does not have an explicitly set `retries` parameter, then the `retries` value defaults to `1`.
* An interruptible task with `retries=n` will be attempted `n` times on an interruptible instance.
  If it still fails after `n` attempts, the final (`n+1`) retry will be done on the fallback on-demand instance.

## Workflow level interruptible

Interruptible is also available [at the workflow level](../../workflows). If you set it there, it will apply to all tasks in the workflow that do not themselves have an explicit value set. A task-level interruptible setting always overrides whatever the workflow-level setting is.

## Advantages and disadvantages of interruptible instances

The advantage of using an interruptible instance for a task is simply that it is less costly than using an on-demand instance (all other parameters being equal).
However, there are two main disadvantages:

1. The task may be successfully scheduled on an interruptible instance but then interrupted.
In the worst case, with `retries=n`, the task may be interrupted `n` times before the fallback on-demand instance is finally used.
Clearly, this may be a problem for time-critical tasks.

2. Interruptible instances of the selected node type may simply be unavailable on the initial attempt to schedule.
When this happens, the task may hang indefinitely until an interruptible instance becomes available.
Note that this is a distinct failure mode from the previous one where an interruptible node is successfully scheduled but is then interrupted.

In general, we recommend that you use interruptible instances whenever available, but only for tasks that are not time-critical.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring ===

# Task-level monitoring

In the [Execution view](../../workflows/viewing-workflow-executions), selecting a task within the list will open the right panel.
In that panel, you will find the **View Utilization** button:

![View Utilization](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/execution-view-right-panel-executions-view-util.png)

Clicking this will take you to the **task-level monitoring** page:

![Task-level monitoring](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/task-level-monitoring.png)

## Execution Resources

This tab displays details about the resources used by this specific task.
As an example, let's say that the definition of this task in your Python code has the following task decorator:

```python
@union.task(
   requests=Resources(cpu="44", mem="120Gi"),
   limits=Resources(cpu="44", mem="120Gi")
)
```

These parameters are reflected in the displayed **Memory Quota** and **CPU Cores Quota** charts as explained below:

### Memory Quota

![Memory Quota](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/task-level-monitoring-memory-quota.png)

This chart shows the memory consumption of the task.

* **Limit** refers to the value of the `limits.mem` parameter (the `mem` parameter within the `Resources` object assigned to `limits`)
* **Allocated** refers to the maximum of the value of the `requests.mem` parameter (the `mem` parameter within the `Resources` object assigned to `requests`) and the amount of memory actually used by the task.
* **Used** refers to the actual memory used by the task.

This chart displays the ratio of memory used over memory requested, as a percentage.
Since the memory used can sometimes exceed the memory requested, this percentage may exceed 100.

### CPU Cores Quota

![CPU Cores Quota](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/task-level-monitoring-cpu-cores-quota.png)

This chart displays the number of CPU cores being used.

* **Limit** refers to the value of the `limits.cpu` parameter (the `cpu` parameter within the `Resources` object assigned to `limits`)
* **Allocated** refers to the value of the `requests.cpu` parameter (the `cpu` parameter within the `Resources` object assigned to `requests`)
* **Used** refers to the actual number of CPUs used by the task.

### GPU Memory Utilization

![GPU Memory Utilization](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/task-level-monitoring-gpu-memory-utilization.png)

This chart displays the amount of GPU memory used for each GPU.

### GPU Utilization

![GPU Utilization](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/task-level-monitoring-gpu-utilization.png)

This chart displays the GPU core utilization as a percentage of the GPUs allocated (the `requests.gpu` parameter).

## Execution Logs (Preview)

![Execution Logs (Preview)](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/task-level-monitoring-execution-logs.png)

This tab is a preview feature that displays the `stdout` (the standard output) of the container running the task.
Currently, it only shows content while the task is actually running.

## Map Tasks

When the task you want to monitor is a **map task**, accessing the utilization data is a bit different.
Here is the task execution view of a map task. Open the drop-down to reveal each subtask within the map task:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/map-task-1.png)

Drill down by clicking on one of the subtasks:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/map-task-2.png)

This will bring you to the individual subtask information panel, where the **View Utilization** button for the subtask can be found:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/map-task-3.png)

Clicking on View Utilization will take you to the task-level monitoring page for the subtask, which will have the same structure and features as the task-level monitoring page for a standard task (see above).

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/launch-plans ===

# Launch plans

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

A launch plan is a template for a workflow invocation.
It brings together:

* A **Core concepts > Workflows**
* A (possibly partial) set of inputs required to initiate that workflow
* Optionally, **Core concepts > Launch plans > Notifications** and **Core concepts > Launch plans > Schedules**

When invoked, the launch plan starts the workflow, passing the inputs as parameters.
If the launch plan does not contain the entire set of required workflow inputs, additional input arguments must be provided at execution time.

## Default launch plan

Every workflow automatically comes with a *default launch plan*.
This launch plan does not define any default inputs, so they must all be provided at execution time.
A default launch plan always has the same name as its workflow.

## Launch plans are versioned

Like tasks and workflows, launch plans are versioned.
A launch plan can be updated to change, for example, the set of inputs, the schedule, or the notifications.
Each update creates a new version of the launch plan.

## Custom launch plans

Additional launch plans, other than the default one, can be defined for any workflow.
In general, a given workflow can be associated with multiple launch plans, but a given launch plan is always associated with exactly one workflow.

## Viewing launch plans for a workflow

To view the launch plans for a given workflow, in the UI, navigate to the workflow's page and click **Launch Workflow**.
You can choose which launch plan to use to launch the workflow from the **Launch Plan** dropdown menu.
The default launch plan will be selected by default. If you have not defined any custom launch plans for the workflow, only the default plan will be available.
If you have defined one or more custom launch plans, they will be available in the dropdown menu along with the default launch plan.
For more details, see **Core concepts > Launch plans > Running launch plans**.

## Registering a launch plan

### Registering a launch plan on the command line

In most cases, launch plans are defined alongside the workflows and tasks in your project code and registered as a bundle with the other entities using the CLI (see [Running your code](https://www.union.ai/docs/v1/union/user-guide/development-cycle/running-your-code/page.md)).

### Registering a launch plan in Python with `UnionRemote`

As with all Union.ai command line actions, you can also perform registration of launch plans programmatically with [`UnionRemote`](https://www.union.ai/docs/v1/union/user-guide/development-cycle/union-remote), specifically, `UnionRemote.register_launch_plan`.

### Results of registration

When the code above is registered to Union.ai, it results in the creation of four objects:

* The task `workflows.launch_plan_example.my_task`
* The workflow `workflows.launch_plan_example.my_workflow`
* The default launch plan `workflows.launch_plan_example.my_workflow` (notice that it has the same name as the workflow)
* The custom launch plan `my_workflow_custom_lp` (this is the one we defined in the code above)

### Changing a launch plan

Launch plans are changed by altering their definition in code and re-registering.
When a launch plan with the same project, domain, and name as a preexisting one is re-registered, a new version of that launch plan is created.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/launch-plans/defining-launch-plans ===

# Defining launch plans

You can define a launch plan with the [`LaunchPlan` class](https://www.union.ai/docs/v1/union/api-reference/flytekit-sdk/packages/flytekit.core.launch_plan).

This is a simple example of defining a launch plan:

```python
import union

@union.workflow
def my_workflow(a: int, b: str) -> str:
    return f"Result: {a} and {b}"

# Create a default launch plan
default_lp = union.LaunchPlan.get_or_create(workflow=my_workflow)

# Create a named launch plan
named_lp = union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="my_custom_launch_plan"
)
```

## Default and Fixed Inputs

Default inputs can be overridden at execution time, while fixed inputs cannot be changed.

```python
import union

# Launch plan with default inputs
lp_with_defaults = union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="with_defaults",
    default_inputs={"a": 42, "b": "default_value"}
)

# Launch plan with fixed inputs
lp_with_fixed = union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="with_fixed",
    fixed_inputs={"a": 100}  # 'a' will always be 100, only 'b' can be specified
)

# Combining default and fixed inputs
lp_combined = union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="combined_inputs",
    default_inputs={"b": "default_string"},
    fixed_inputs={"a": 200}
)
```

## Scheduled Execution

```python
import union
from datetime import timedelta
from flytekit.core.schedule import CronSchedule, FixedRate

# Using a cron schedule (runs at 10:00 AM UTC every Monday)
cron_lp = union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="weekly_monday",
    default_inputs={"a": 1, "b": "weekly"},
    schedule=CronSchedule(
        schedule="0 10 * * 1",  # Cron expression: minute hour day-of-month month day-of-week
        kickoff_time_input_arg=None
    )
)

# Using a fixed rate schedule (runs every 6 hours)
fixed_rate_lp = union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="every_six_hours",
    default_inputs={"a": 1, "b": "periodic"},
    schedule=FixedRate(
        duration=timedelta(hours=6)
    )
)
```

## Labels and Annotations

Labels and annotations help with organization and can be used for filtering or adding metadata.

```python
import union
from flytekit.models.common import Labels, Annotations

# Adding labels and annotations
lp_with_metadata = union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="with_metadata",
    default_inputs={"a": 1, "b": "metadata"},
    labels=Labels({"team": "data-science", "env": "staging"}),
    annotations=Annotations({"description": "Launch plan for testing", "owner": "jane.doe"})
)
```

## Execution Parameters

```python
import union

# Setting max parallelism to limit concurrent task execution
lp_with_parallelism = union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="with_parallelism",
    default_inputs={"a": 1, "b": "parallel"},
    max_parallelism=10  # Only 10 task nodes can run concurrently
)

# Disable caching for this launch plan's executions
lp_no_cache = union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="no_cache",
    default_inputs={"a": 1, "b": "fresh"},
    overwrite_cache=True  # Always execute fresh, ignoring cached results
)

# Auto-activate on registration
lp_auto_activate = union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="auto_active",
    default_inputs={"a": 1, "b": "active"},
    auto_activate=True  # Launch plan will be active immediately after registration
)
```

## Security and Authentication

You can also override the auth role (either an IAM role or a Kubernetes service account) used to execute a launch plan.

```python
import union
from flytekit.models.common import AuthRole
from flytekit.models.security import Identity, SecurityContext

# Setting the auth role for the launch plan
lp_with_auth = union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="with_auth",
    default_inputs={"a": 1, "b": "secure"},
    auth_role=AuthRole(
        assumable_iam_role="arn:aws:iam::12345678:role/my-execution-role"
    )
)

# Setting the security context with a Kubernetes service account
lp_with_security = union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="with_security",
    default_inputs={"a": 1, "b": "context"},
    security_context=SecurityContext(
        run_as=Identity(k8s_service_account="my-service-account")
    )
)
```

## Raw Output Data Configuration

```python
import union
from flytekit.models.common import RawOutputDataConfig

# Configure where large outputs should be stored
lp_with_output_config = union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="with_output_config",
    default_inputs={"a": 1, "b": "output"},
    raw_output_data_config=RawOutputDataConfig(
        output_location_prefix="s3://my-bucket/workflow-outputs/"
    )
)
```

## Putting It All Together

The following example brings together many of the options described above in a single custom launch plan.

```python
import union
from flytekit import CronSchedule, Email, WorkflowExecutionPhase
from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig

comprehensive_lp = union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="comprehensive_example",
    default_inputs={"b": "configurable"},
    fixed_inputs={"a": 42},
    schedule=CronSchedule(schedule="0 9 * * *"),  # Daily at 9 AM UTC
    notifications=[
        Email(
            phases=[
                WorkflowExecutionPhase.SUCCEEDED,
                WorkflowExecutionPhase.FAILED,
            ],
            recipients_email=["team@example.com"],
        )
    ],
    labels=Labels({"env": "production", "team": "data"}),
    annotations=Annotations({"description": "Daily data processing"}),
    max_parallelism=20,
    overwrite_cache=False,
    auto_activate=True,
    auth_role=AuthRole(assumable_iam_role="arn:aws:iam::12345678:role/workflow-role"),
    raw_output_data_config=RawOutputDataConfig(
        output_location_prefix="s3://results-bucket/daily-run/"
    )
)
```

These examples demonstrate the flexibility of launch plans, allowing you to customize execution parameters, inputs, schedules, and more to suit your workflow requirements.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/launch-plans/viewing-launch-plans ===

# Viewing launch plans

## Viewing launch plans in the UI

Select **Launch Plans** in the sidebar to display a list of all the registered launch plans in the project and domain:

![Launch plans list](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/launch-plans/viewing-launch-plans/launch-plans-list.png)

You can search the launch plans by name and filter for only those that are archived.

The columns in the launch plans table are defined as follows:

* **Name**: The name of the launch plan. Click to inspect a specific launch plan in detail.
* **Triggers**:
  * If the launch plan is active, a green **Active** badge is shown. When a launch plan is active, any attached schedule will be in effect and the launch plan will be invoked according to that schedule.
  * Shows whether the launch plan has a [trigger](./reactive-workflows). To filter for only those launch plans with a trigger, check the **Has Triggers** box in the top right.
* **Last Execution**: The last execution timestamp of this launch plan, irrespective of how the last execution was invoked (by schedule, by trigger, or manually).
* **Last 10 Executions**: A visual representation of the last 10 executions of this launch plan, irrespective of how these executions were invoked (by schedule, by trigger, or manually).

Select an entry on the list to go to that specific launch plan:

![Launch plan view](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/launch-plans/viewing-launch-plans/launch-plan-view.png)

Here you can see:
* **Launch Plan Detail (Latest Version)**:
  * **Expected Inputs**: The input and output types for the launch plan.
  * **Fixed Inputs**: If the launch plan includes predefined input values, they are shown here.
* **Launch Plan Versions**: A list of all versions of this launch plan.
* **All executions in the Launch Plan**: A list of all executions of this launch plan.

In the top right you can see if this launch plan is active (and if it is, which version, specifically, is active). There is also a control for changing the active version or deactivating the launch plan entirely.
See [Activating and deactivating](./activating-and-deactivating) for more details.

## Viewing launch plans on the command line with `uctl`

To view all launch plans within a project and domain:

```shell
$ uctl get launchplans \
       --project <project-id> \
       --domain <domain>
```

To view a specific launch plan:

```shell
$ uctl get launchplan \
       --project <project-id> \
       --domain <domain> \
       <launch-plan-name>
```

See the [Uctl CLI](https://www.union.ai/docs/v1/union/api-reference/uctl-cli) for more details.

## Viewing launch plans in Python with `UnionRemote`

Use the method `UnionRemote.client.list_launch_plans_paginated` to get the list of launch plans.

<!-- TODO need to add and link to full UnionRemote documentation to Union docs -- current UnionRemote page does not document all launch plan methods. -->

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/launch-plans/notifications ===

# Notifications

A launch plan may be associated with one or more notifications, which are triggered when the launch plan's workflow is completed.

There are three types of notifications:
* `Email`: Sends an email to the specified recipients.
* `PagerDuty`: Sends a notification to the specified recipients via the PagerDuty service.
  PagerDuty then forwards the notification as per your PagerDuty configuration.
* `Slack`: Sends a Slack notification to the email address of a specified channel. This requires that you configure your Slack account to accept notifications.

Separate notifications can be sent depending on the specific end state of the workflow. The options are:
* `WorkflowExecutionPhase.ABORTED`
* `WorkflowExecutionPhase.FAILED`
* `WorkflowExecutionPhase.SUCCEEDED`
* `WorkflowExecutionPhase.TIMED_OUT`

For example:

```python
from datetime import datetime

import union

from flytekit import (
    WorkflowExecutionPhase,
    Email,
    PagerDuty,
    Slack
)

@union.task
def add_numbers(a: int, b: int, c: int) -> int:
    return a + b + c

@union.task
def generate_message(s: int, kickoff_time: datetime) -> str:
    return f"sum: {s} at {kickoff_time}"

@union.workflow
def my_workflow(a: int, b: int, c: int, kickoff_time: datetime) -> str:
    return generate_message(
        add_numbers(a, b, c),
        kickoff_time,
    )

union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="my_workflow_custom_lp",
    fixed_inputs={"a": 3},
    default_inputs={"b": 4, "c": 5},
    notifications=[
        Email(
            phases=[WorkflowExecutionPhase.FAILED],
            recipients_email=["me@example.com", "you@example.com"],
        ),
        PagerDuty(
            phases=[WorkflowExecutionPhase.SUCCEEDED],
            recipients_email=["myboss@example.com"],
        ),
        Slack(
            phases=[
                WorkflowExecutionPhase.SUCCEEDED,
                WorkflowExecutionPhase.ABORTED,
                WorkflowExecutionPhase.TIMED_OUT,
            ],
            recipients_email=["your_slack_channel_email"],
        ),
    ],
)
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/launch-plans/schedules ===

# Schedules

Launch plans let you schedule the invocation of your workflows.
A launch plan can be associated with one or more schedules, where at most one schedule is active at any one time.
If a schedule is activated on a launch plan, the workflow will be invoked automatically by the system at the scheduled time with the inputs provided by the launch plan. Schedules can be either fixed-rate or `cron`-based.

To set up a schedule, you can use the `schedule` parameter of the `LaunchPlan.get_or_create()` method.

## Fixed-rate schedules

In the following example we add a [FixedRate](https://www.union.ai/docs/v1/union/api-reference/flytekit-sdk/packages/flytekit.core.schedule) that will invoke the workflow every 10 minutes.

```python
from datetime import timedelta

import union
from flytekit import FixedRate

@union.task
def my_task(a: int, b: int, c: int) -> int:
    return a + b + c

@union.workflow
def my_workflow(a: int, b: int, c: int) -> int:
    return my_task(a=a, b=b, c=c)

union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="my_workflow_custom_lp",
    fixed_inputs={"a": 3},
    default_inputs={"b": 4, "c": 5},
    schedule=FixedRate(
        duration=timedelta(minutes=10)
    )
)
```
Above, we defined the duration of the `FixedRate` schedule using `minutes`.
Fixed rate schedules can also be defined using `days` or `hours`.

## Cron schedules

A [`CronSchedule`](https://www.union.ai/docs/v1/union/api-reference/flytekit-sdk/packages/flytekit.core.schedule) allows you to specify a schedule using a `cron` expression:

```python
import union
from flytekit import CronSchedule

@union.task
def my_task(a: int, b: int, c: int) -> int:
    return a + b + c

@union.workflow
def my_workflow(a: int, b: int, c: int) -> int:
    return my_task(a=a, b=b, c=c)

union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="my_workflow_custom_lp",
    fixed_inputs={"a": 3},
    default_inputs={"b": 4, "c": 5},
    schedule=CronSchedule(
        schedule="*/10 * * * *"
    )
)
```

### Cron expression format

A `cron` expression is a string that defines a schedule using five space-separated fields, each representing a time unit.
The format of the string is:

```
minute hour day-of-month month day-of-week
```

Each field can contain values and special characters.
The fields are defined as follows:

| Field          | Values              | Special characters |
|----------------|---------------------|--------------------|
| `minute`       | `0-59`              | `* / , -`          |
| `hour`         | `0-23`              | `* / , -`          |
| `day-of-month` | `1-31`              | `* / , - ?`        |
| `month`        | `1-12` or `JAN-DEC` | `* / , -`          |
| `day-of-week`  | `0-6` or `SUN-SAT`  | `* / , - ?`        |

* The `month` and `day-of-week` abbreviations are not case-sensitive.

* The `,` (comma) is used to specify multiple values.
For example, in the `month` field, `JAN,FEB,MAR` means every January, February, and March.

* The `-` (dash) specifies a range of values.
For example, in the `day-of-month` field, `1-15` means every day from `1` through `15` of the specified month.

* The `*` (asterisk) specifies all values of the field.
For example, in the `hour` field, `*` means every hour (on the hour), from `0` to `23`.
You cannot use `*` in both the `day-of-month` and `day-of-week` fields in the same `cron` expression.
If you use it in one, you must use `?` in the other.

* The `/` (slash) specifies increments.
For example, in the `minute` field, `1/10` means every tenth minute, starting from the first minute of the hour (that is, the 11th, 21st, and 31st minute, and so on).

* The `?` (question mark) specifies any value of the field.
For example, in the `day-of-month` field you could enter `7` and, if any day of the week was acceptable, you would enter `?` in the `day-of-week` field.

### Cron expression examples

| Expression         | Description                               |
|--------------------|-------------------------------------------|
| `0 0 * * *`        | Midnight every day.                       |
| `0 12 * * MON-FRI` | Noon every weekday.                       |
| `0 0 1 * *`        | Midnight on the first day of every month. |
| `0 0 * JAN,JUL *`  | Midnight every day in January and July.   |
| `*/5 * * * *`      | Every five minutes.                       |
| `30 2 * * 1`       | At 2:30 AM every Monday.                  |
| `0 0 15 * ?`       | Midnight on the 15th of every month.      |
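
To make the field semantics concrete, here is a minimal matcher for the minute field alone. This is a sketch for illustration only; it is not the scheduler Union.ai uses, and it handles just the special characters described above:

```python
def minute_matches(field: str, minute: int) -> bool:
    """Illustrative only: check whether `minute` (0-59) satisfies a cron
    minute field supporting `*`, `,`, `-`, and `/`."""
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_str = part.split("/")
            step = int(step_str)
        if part == "*":
            lo, hi = 0, 59
        elif "-" in part:
            lo, hi = (int(x) for x in part.split("-"))
        else:
            lo = int(part)
            hi = 59 if step > 1 else lo  # `1/10` means 1, 11, 21, ...
        if lo <= minute <= hi and (minute - lo) % step == 0:
            return True
    return False
```

For instance, `minute_matches("*/5", 25)` is true, consistent with the `*/5 * * * *` row in the table above.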

### Cron aliases

The following aliases are also available.
An alias is used in place of an entire `cron` expression.

| Alias      | Description                                                      | Equivalent to   |
|------------|------------------------------------------------------------------|-----------------|
| `@yearly`  | Once a year at midnight at the start of 1 January.               | `0 0 1 1 *`     |
| `@monthly` | Once a month at midnight at the start of the first day of the month. | `0 0 1 * *`     |
| `@weekly`  | Once a week at midnight at the start of Sunday.                  | `0 0 * * 0`     |
| `@daily`   | Once a day at midnight.                                          | `0 0 * * *`     |
| `@hourly`  | Once an hour at the beginning of the hour.                       | `0 * * * *`     |

## kickoff_time_input_arg

Both `FixedRate` and `CronSchedule` can take an optional parameter called `kickoff_time_input_arg`.

This parameter is used to specify the name of a workflow input argument.
Each time the system invokes the workflow via this schedule, the time of the invocation will be passed to the workflow through the specified parameter.
For example:

```python
from datetime import datetime, timedelta

import union
from flytekit import FixedRate

@union.task
def my_task(a: int, b: int, c: int) -> int:
    return a + b + c

# Promises cannot be formatted with f-strings inside a workflow,
# so the message is assembled in a task.
@union.task
def generate_message(s: int, kickoff_time: datetime) -> str:
    return f"sum: {s} at {kickoff_time}"

@union.workflow
def my_workflow(a: int, b: int, c: int, kickoff_time: datetime) -> str:
    return generate_message(s=my_task(a=a, b=b, c=c), kickoff_time=kickoff_time)

union.LaunchPlan.get_or_create(
    workflow=my_workflow,
    name="my_workflow_custom_lp",
    fixed_inputs={"a": 3},
    default_inputs={"b": 4, "c": 5},
    schedule=FixedRate(
        duration=timedelta(minutes=10),
        kickoff_time_input_arg="kickoff_time"
    )
)
```

Here, each time the schedule calls `my_workflow`, the invocation time is passed in the `kickoff_time` argument.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/launch-plans/activating-and-deactivating ===

# Activating and deactivating

You can set an active/inactive status on launch plans. Specifically:

* Among the versions of a given launch plan (as defined by name), at most one can be set to active.
  All others are inactive.

* If a launch plan version that has a schedule attached is activated, then its schedule also becomes active and its workflow will be invoked automatically according to that schedule.

* When a launch plan version with a schedule is inactive, its schedule is inactive and will not be used to invoke its workflow.

Launch plans that do not have schedules attached can also have an active version.
For such non-scheduled launch plans, this status serves as a flag that can be used to distinguish one version from among the others.
It can, for example, be used by management logic to determine which version of a launch plan to use for new invocations.

Upon registration of a new launch plan, the first version is automatically inactive.
If it has a schedule attached, the schedule is also inactive.
Once activated, a launch plan version remains active even as new, later, versions are registered.
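
The "at most one active version" rule can be modeled in a few lines (illustrative only; the platform tracks this state server-side):

```python
# Illustrative model: activating one version of a launch plan implicitly
# deactivates whichever version was previously active.
def activate_version(versions: dict[str, bool], target: str) -> dict[str, bool]:
    if target not in versions:
        raise KeyError(target)
    return {v: (v == target) for v in versions}
```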

A launch plan version with a schedule attached can be activated through the UI, `uctl`, or [`UnionRemote`](https://www.union.ai/docs/v1/union/user-guide/user-guide/development-cycle/union-remote).

## Activating and deactivating a launch plan in the UI

To activate a launch plan, go to the launch plan view and click **Add active launch plan** in the top right corner of the screen:

![Activate schedule](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/launch-plans/activating-and-deactivating/add-active-launch-plan.png)

A modal will appear that lets you select which launch plan version to activate:

![Activate schedule](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/launch-plans/activating-and-deactivating/update-active-launch-plan-dialog.png)

This modal will contain all versions of the launch plan that have an attached schedule.
Note that at most one version (and therefore at most one schedule) of a launch plan can be active at any given time.

Select the launch plan version and click **Update** to activate it.
The launch plan version and its schedule are now active, and the workflow will be invoked according to the schedule going forward.

> [!WARNING]
> Non-scheduled launch plans cannot be activated via the UI.
> Activate them with `uctl` or `UnionRemote` instead.

To deactivate a launch plan, navigate to a launch plan with an active schedule, click the **...** icon in the top-right corner of the screen beside **Active launch plan**, and select **Deactivate**.

![Deactivate schedule](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/launch-plans/activating-and-deactivating/deactivate-launch-plan.png)

A confirmation modal will appear, allowing you to deactivate the launch plan and its schedule.

> [!WARNING]
> Non-scheduled launch plans cannot be deactivated via the UI.
> Deactivate them with `uctl` or `UnionRemote` instead.

## Activating and deactivating a launch plan on the command line with `uctl`

To activate a launch plan version with `uctl`, execute the following command:

```shell
$ uctl update launchplan \
       --activate \
       --project <project-id> \
       --domain <domain> \
       <launch-plan-name> \
       --version <launch-plan-version>
```

To deactivate a launch plan version with `uctl`, execute the following command:

```shell
$ uctl update launchplan \
       --deactivate \
       --project <project-id> \
       --domain <domain> \
       <launch-plan-name> \
       --version <launch-plan-version>
```

See [Uctl CLI](https://www.union.ai/docs/v1/union/api-reference/uctl-cli) for more details.

## Activating and deactivating a launch plan in Python with `UnionRemote`

To activate a launch plan version using `UnionRemote`:

```python
from union.remote import UnionRemote
from flytekit.configuration import Config

remote = UnionRemote(config=Config.auto(), default_project=<project-id>, default_domain=<domain>)
launch_plan = remote.fetch_launch_plan(name=<launch-plan-name>, version=<launch-plan-version>)
remote.client.update_launch_plan(launch_plan.id, "ACTIVE")
```

To deactivate a launch plan version using `UnionRemote`:

```python
from union.remote import UnionRemote
from flytekit.configuration import Config

remote = UnionRemote(config=Config.auto(), default_project=<project-id>, default_domain=<domain>)
launch_plan = remote.fetch_launch_plan(name=<launch-plan-name>, version=<launch-plan-version>)
remote.client.update_launch_plan(launch_plan.id, "INACTIVE")
```

<!-- TODO need to add and link to full UnionRemote documentation to Union docs -- current UnionRemote page does not document all launch plan methods. -->

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/launch-plans/running-launch-plans ===

# Running launch plans

## Running a launch plan in the UI

To invoke a launch plan, go to the **Workflows** list, select the desired workflow, and click **Launch Workflow**. In the new execution dialog, select the desired launch plan from the **Launch Plan** dropdown menu and click **Launch**.

## Running a launch plan on the command line with `uctl`

To invoke a launch plan via the command line, first generate the execution spec file for the launch plan:

```shell
$ uctl get launchplan \
       --project <project-id> \
       --domain <domain> \
       <launch-plan-name> \
       --execFile <execution-spec-file-name>.yaml
```

Then you can execute the launch plan with the following command:

```shell
$ uctl create execution \
       --project <project-id> \
       --domain <domain> \
       --execFile <execution-spec-file-name>.yaml
```

See [Uctl CLI](https://www.union.ai/docs/v1/union/api-reference/uctl-cli) for more details.

## Running a launch plan in Python with `UnionRemote`

The following code executes a launch plan using `UnionRemote`:

```python
import union
from flytekit.configuration import Config

remote = union.UnionRemote(config=Config.auto(), default_project=<project-id>, default_domain=<domain>)
launch_plan = remote.fetch_launch_plan(name=<launch-plan-name>, version=<launch-plan-version>)
remote.execute(launch_plan, inputs=<inputs>)
```

See [UnionRemote](../../development-cycle/union-remote) for more details.

## Sub-launch plans

The above invocation examples assume you want to run your launch plan as a top-level entity within your project.
However, you can also invoke a launch plan from *within a workflow*, creating a *sub-launch plan*.
This causes the invoked launch plan to kick off its workflow, passing any parameters specified to that workflow.

This differs from the case of [subworkflows](../workflows/subworkflows-and-sub-launch-plans) where you invoke one workflow function from within another.
A subworkflow becomes part of the execution graph of the parent workflow and shares the same execution ID and context.
On the other hand, when a sub-launch plan is invoked a full, top-level workflow is kicked off with its own execution ID and context.

See [Subworkflows and sub-launch plans](../workflows/subworkflows-and-sub-launch-plans) for more details.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/launch-plans/reference-launch-plans ===

# Reference launch plans

A reference launch plan references previously defined, serialized, and registered launch plans. You can reference launch plans from other projects and create workflows that use launch plans declared by others.

When you create a reference launch plan, be sure to verify that the workflow interface corresponds to that of the referenced workflow.

> [!NOTE]
> Reference launch plans cannot be run locally. To test locally, mock them out.

## Example

<!-- TODO: Remove the mention of Flytesnacks below -->
In this example, we create a reference launch plan for the [`simple_wf`](https://github.com/flyteorg/flytesnacks/blob/master/examples/basics/basics/workflow.py#L25) workflow from the [Flytesnacks repository](https://github.com/flyteorg/flytesnacks).

1. Clone the Flytesnacks repository:

    ```shell
    $ git clone git@github.com:flyteorg/flytesnacks.git
    ```

2. Navigate to the `basics` directory:

    ```shell
    $ cd flytesnacks/examples/basics
    ```

3. Register the `simple_wf` workflow:

    ```shell
    $ union register --project flytesnacks --domain development --version v1 basics/workflow.py
    ```

4. Create a file called `simple_wf_ref_lp.py` and copy the following code into it:

    ```python
    import union
    from flytekit import reference_launch_plan

    @reference_launch_plan(
        project="flytesnacks",
        domain="development",
        name="basics.workflow.simple_wf",
        version="v1",
    )
    def simple_wf_lp(
        x: list[int], y: list[int]
    ) -> float:
        return 1.0

    @union.workflow
    def run_simple_wf() -> float:
        x = [-8, 2, 4]
        y = [-2, 4, 7]
        return simple_wf_lp(x=x, y=y)
    ```

5. Register the `run_simple_wf` workflow:

    ```shell
    $ union register simple_wf_ref_lp.py
    ```

6. In the Union.ai UI, run the workflow `run_simple_wf`.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/launch-plans/mapping-over-launch-plans ===

# Mapping over launch plans

You can map over launch plans the same way you can [map over tasks](../tasks/task-types#map-tasks) to execute workflows in parallel across a series of inputs.

You can either map over a `LaunchPlan` object defined in one of your Python modules or a [reference launch plan](./reference-launch-plans) that points to a previously registered launch plan.

## Launch plan defined in your code

Here we define a workflow called `interest_workflow` that we want to parallelize, along with a launch plan called `interest_workflow_lp`, in a file we'll call `map_interest_wf.py`.
We then write a separate workflow, `map_interest_wf`, that uses a `map` to parallelize `interest_workflow` over a list of inputs.

```python
import union

# Task to calculate monthly interest payment on a loan
@union.task
def calculate_interest(principal: int, rate: float, time: int) -> float:
    return (principal * rate * time) / 12

# Workflow using the calculate_interest task
@union.workflow
def interest_workflow(principal: int, rate: float, time: int) -> float:
    return calculate_interest(principal=principal, rate=rate, time=time)

# Create LaunchPlan for interest_workflow
lp = union.LaunchPlan.get_or_create(
    workflow=interest_workflow,
    name="interest_workflow_lp",
)

# Mapping over the launch plan to calculate interest for multiple loans
@union.workflow
def map_interest_wf() -> list[float]:
    principal = [1000, 5000, 10000]
    rate = [0.05, 0.04, 0.03]  # Different interest rates for each loan
    time = [12, 24, 36]        # Loan periods in months
    return union.map(lp)(principal=principal, rate=rate, time=time)

# Mapping over the launch plan to calculate interest for multiple loans while fixing an input
@union.workflow
def map_interest_fixed_principal_wf() -> list[float]:
    rate = [0.05, 0.04, 0.03]  # Different interest rates for each loan
    time = [12, 24, 36]        # Loan periods in months
    # Note: principal is set to 1000 for all the calculations
    return union.map(lp, bound_inputs={'principal':1000})(rate=rate, time=time)
```

You can run the `map_interest_wf` workflow locally:

```shell
$ union run map_interest_wf.py map_interest_wf
```

You can also run the `map_interest_wf` workflow remotely on Union.ai:

```shell
$ union run --remote map_interest_wf.py map_interest_wf
```

<!-- TODO: Remove the mention of Flytesnacks below -->
## Previously registered launch plan

To demonstrate the ability to map over previously registered launch plans, in this example, we map over the [`simple_wf`](https://github.com/flyteorg/flytesnacks/blob/master/examples/basics/basics/workflow.py#L25) launch plan from the basic workflow example in the [Flytesnacks repository](https://github.com/flyteorg/flytesnacks).

Recall that when a workflow is registered, an associated launch plan is created automatically. One of these launch plans will be leveraged in this example, though custom launch plans can also be used.

1. Clone the Flytesnacks repository:

    ```shell
    $ git clone git@github.com:flyteorg/flytesnacks.git
    ```

2. Navigate to the `basics` directory:

    ```shell
    $ cd flytesnacks/examples/basics
    ```

3. Register the `simple_wf` workflow:

    ```shell
    $ union register --project flytesnacks --domain development --version v1 basics/workflow.py
    ```

    Note that the `simple_wf` workflow is defined as follows:

    ```python
    @union.workflow
    def simple_wf(x: list[int], y: list[int]) -> float:
        slope_value = slope(x=x, y=y)
        intercept_value = intercept(x=x, y=y, slope=slope_value)
        return intercept_value
    ```

4. Create a file called `map_simple_wf.py` and copy the following code into it:

    ```python
    import union
    from flytekit import reference_launch_plan

    @reference_launch_plan(
        project="flytesnacks",
        domain="development",
        name="basics.workflow.simple_wf",
        version="v1",
    )
    def simple_wf_lp(
        x: list[int], y: list[int]
    ) -> float:
        pass

    @union.workflow
    def map_simple_wf() -> list[float]:
        x = [[-3, 0, 3], [-8, 2, 4], [7, 3, 1]]
        y = [[7, 4, -2], [-2, 4, 7], [3, 6, 4]]
        return union.map(simple_wf_lp)(x=x, y=y)

    ```

    Note that the reference launch plan's interface corresponds exactly to that of the registered `simple_wf` we wish to map over.

5. Register the `map_simple_wf` workflow. Reference launch plans cannot be run locally, so we will register the `map_simple_wf` workflow to Union.ai and run it remotely.

    ```shell
    $ union register map_simple_wf.py
    ```

6. In the Union.ai UI, run the `map_simple_wf` workflow.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/launch-plans/reactive-workflows ===

# Reactive workflows

Reactive workflows use [artifacts](../artifacts) as the medium of exchange between workflows: when an upstream workflow emits an artifact, an artifact-driven trigger passes that artifact to a new execution of the downstream workflow.

A trigger is a rule defined in a launch plan that specifies that when a certain event occurs -- for instance, a new version of a particular artifact is materialized -- a particular launch plan will be executed. Triggers allow downstream data consumers, such as machine learning engineers, to automate their workflows to react to the output of upstream data producers, such as data engineers, while maintaining separation of concerns and eliminating the need for staggered schedules and manual executions.

Updating any trigger associated with a launch plan will create a new version of the launch plan, similar to how schedules are handled today. This means that multiple launch plans, each with different triggers, can be created to act on the same underlying workflow. Launch plans with triggers must be activated in order for the trigger to work.

> [!NOTE]
> Currently, there are only artifact event-based triggers, but in the future, triggers will be expanded to include other event-based workflow triggering mechanisms.

## Scope

Since a trigger is part of a launch plan, it is scoped as follows:
* Project
* Domain
* Launch plan name
* Launch plan version

## Trigger types

### Artifact events

An artifact event definition contains the following:
* Exactly one artifact that will activate the trigger when a new version of the artifact is created
* A workflow that is the target of the trigger
* (Optionally) Inputs to the workflow that will be executed by the trigger. It is possible to pass information from the source artifact, the source artifact itself, and other artifacts to the workflow that will be triggered.

For more information, see [Connecting workflows with artifact event triggers](../artifacts/connecting-workflows-with-artifact-event-triggers).

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/launch-plans/concurrency-control ===

# Concurrency control

Concurrency control allows you to limit the number of concurrently running workflow executions for a specific launch plan, identified by its unique `project`, `domain`, and `name`.
This control is applied across all versions of that launch plan.

> [!NOTE]
> To clone and run the example code on this page, see the [Flytesnacks repo](https://github.com/flyteorg/flytesnacks/tree/master/examples/productionizing/).

## How it works

When a new execution for a launch plan with a `ConcurrencyPolicy` is requested, Flyte performs a check to count the number of currently active executions for that same launch plan (`project/domain/name`), irrespective of their versions.

This check is done using a database query that joins the `executions` table with the `launch_plans` table.
It filters for executions that are in an active phase (e.g., `QUEUED`, `RUNNING`, `ABORTING`, etc.) and belong to the launch plan name being triggered.

If the number of active executions is already at or above the `max_concurrency` limit defined in the policy of the launch plan version being triggered, the new execution will be handled according to the specified `behavior`.
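The check described above can be sketched in plain Python (a conceptual model, not Flyte's actual implementation):

```python
ACTIVE_PHASES = {"QUEUED", "RUNNING", "ABORTING"}

def may_launch(executions, project, domain, name, max_concurrency):
    """Count active executions for the launch plan name, across all versions."""
    active = sum(
        1
        for e in executions
        if (e["project"], e["domain"], e["name"]) == (project, domain, name)
        and e["phase"] in ACTIVE_PHASES
    )
    # SKIP behavior: refuse to create a new execution at or above the limit.
    return active < max_concurrency

executions = [
    {"project": "p", "domain": "dev", "name": "my_lp", "version": "v1", "phase": "RUNNING"},
    {"project": "p", "domain": "dev", "name": "my_lp", "version": "v2", "phase": "QUEUED"},
    {"project": "p", "domain": "dev", "name": "my_lp", "version": "v1", "phase": "SUCCEEDED"},
]

print(may_launch(executions, "p", "dev", "my_lp", max_concurrency=3))  # True: 2 active < 3
print(may_launch(executions, "p", "dev", "my_lp", max_concurrency=2))  # False: at the limit
```

Note that the count spans all versions of the launch plan, while the limit comes from the policy of the version being launched.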

## Basic usage

Here's an example of how to define a launch plan with concurrency control:

```python
from flytekit import ConcurrencyPolicy, ConcurrencyLimitBehavior, LaunchPlan, workflow

@workflow
def my_workflow() -> str:
    return "Hello, World!"

# Create a launch plan with concurrency control
concurrency_limited_lp = LaunchPlan.get_or_create(
    name="my_concurrent_lp",
    workflow=my_workflow,
    concurrency=ConcurrencyPolicy(
        max_concurrency=3,
        behavior=ConcurrencyLimitBehavior.SKIP,
    ),
)
```

## Scheduled workflows with concurrency control

Concurrency control is particularly useful for scheduled workflows to prevent overlapping executions:

```python
from flytekit import ConcurrencyPolicy, ConcurrencyLimitBehavior, CronSchedule, LaunchPlan, workflow

@workflow
def scheduled_workflow() -> str:
    # This workflow might take a long time to complete
    return "Processing complete"

# Create a scheduled launch plan with concurrency control
scheduled_lp = LaunchPlan.get_or_create(
    name="my_scheduled_concurrent_lp",
    workflow=scheduled_workflow,
    concurrency=ConcurrencyPolicy(
        max_concurrency=1,  # Only allow one execution at a time
        behavior=ConcurrencyLimitBehavior.SKIP,
    ),
    schedule=CronSchedule(schedule="*/5 * * * *"),  # Runs every 5 minutes
)
```

## Defining the policy

A `ConcurrencyPolicy` is defined with two main parameters:

- `max_concurrency` (integer): The maximum number of workflows that can be running concurrently for this launch plan name.
- `behavior` (enum): What to do when the `max_concurrency` limit is reached. Currently, only `SKIP` is supported, which means new executions will not be created if the limit is hit.

```python
from flytekit import ConcurrencyPolicy, ConcurrencyLimitBehavior

policy = ConcurrencyPolicy(
    max_concurrency=5,
    behavior=ConcurrencyLimitBehavior.SKIP
)
```

## Key behaviors and considerations

### Version-agnostic check, version-specific enforcement

The concurrency check counts all active workflow executions of a given launch plan (`project/domain/name`).
However, the enforcement (i.e., the `max_concurrency` limit and `behavior`) is based on the `ConcurrencyPolicy` defined in the specific version of the launch plan you are trying to launch.

**Example scenario:**

1. Launch plan `MyLP` version `v1` has a `ConcurrencyPolicy` with `max_concurrency = 3`.
2. Three executions of `MyLP` (they could be `v1` or any other version) are currently running.
3. You try to launch `MyLP` version `v2`, which has a `ConcurrencyPolicy` with `max_concurrency = 10`.
   - **Result**: This `v2` execution will launch successfully because its own limit (10) is not breached by the current 3 active executions.
4. Now, with 4 total active executions (3 original + the new `v2`), you try to launch `MyLP` version `v1` again.
   - **Result**: This `v1` execution will **fail**. The check sees 4 active executions, and `v1`'s policy only allows a maximum of 3.
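The scenario above can be replayed as a small simulation (a conceptual sketch, not Flyte code):

```python
def try_launch(active_count, max_concurrency):
    # SKIP behavior: reject the launch once the limit is reached.
    return active_count < max_concurrency

# Step 2: three executions of MyLP are active (any mix of versions).
active = 3

# Step 3: launching v2 (max_concurrency=10) succeeds against 3 active runs.
assert try_launch(active, max_concurrency=10)
active += 1

# Step 4: launching v1 (max_concurrency=3) fails: 4 active >= 3.
assert not try_launch(active, max_concurrency=3)
```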

### Concurrency limit on manual trigger

If you manually trigger an execution (via `pyflyte`, for example) that would breach the concurrency limit, you should see this error in the console:

```bash
_InactiveRpcError:
    <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.RESOURCE_EXHAUSTED
        details = "Concurrency limit (1) reached for launch plan my_workflow_lp. Skipping execution."
    >
```

### Scheduled execution behavior

When the scheduler attempts to trigger an execution and the concurrency limit has been reached, the creation will fail and the error message from FlyteAdmin will be logged in the FlyteScheduler logs.
**This is not surfaced to the user: a skipped execution will not appear as skipped in the UI or on the project execution page.**

## Limitations

### "At most" enforcement

While the system aims to respect `max_concurrency`, it acts as an "at most" limit.
Due to the nature of scheduling, workflow execution durations, and the timing of the concurrency check (at launch time), there might be periods where the number of active executions is below `max_concurrency` even if the system could theoretically run more.

For example, if `max_concurrency` is 5 and all 5 workflows finish before the next scheduled check/trigger, the count will drop.
The system prevents exceeding the limit but doesn't actively try to always maintain `max_concurrency` running instances.

### Notifications for skipped executions

Currently, there is no built-in notification system for skipped executions.
When a scheduled execution is skipped due to concurrency limits, it will be logged in FlyteScheduler but no user notification will be sent.
This is an area for future enhancement.

## Best practices

1. **Use with scheduled workflows**: Concurrency control is most beneficial for scheduled workflows that might take longer than the schedule interval to complete.
2. **Set appropriate limits**: Consider your system resources and the resource requirements of your workflows when setting `max_concurrency`.
3. **Monitor skipped executions**: Regularly check FlyteAdmin logs to monitor if executions are being skipped due to concurrency limits.
4. **Version management**: Be aware that different versions of the same launch plan can have different concurrency policies, but the check is performed across all versions.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/actors ===

# Actors

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

Actors allow you to reuse a container and environment between tasks, avoiding the cost of starting a new container for each task. This can be useful when you have a task that requires a lot of setup or has a long startup time.

To create an actor, instantiate the [`ActorEnvironment`](https://www.union.ai/docs/v1/union/api-reference/union-sdk/packages/union.actor/page.md) class, then add the instance as a decorator to the task that requires that environment.

### `ActorEnvironment` parameters

* **container_image:** The container image to use for the task. The container must have the `union` Python package installed, so this must be updated from the default (i.e. `cr.flyte.org/flyteorg/flytekit:py3.11-latest`).

* **environment:** Environment variables as key, value pairs in a Python dictionary.
* **limits:** Compute resource limits.
* **replica_count:** The number of workers to provision that are able to accept tasks.
* **requests:** Compute resource requests per task.
* **secret_requests:** Keys (ideally descriptive) that can identify the secrets supplied at runtime. For more information, see [Managing secrets](https://www.union.ai/docs/v1/union/user-guide/development-cycle/managing-secrets/page.md).
* **ttl_seconds:** How long to keep the Actor alive while no tasks are being run.

The following example shows how to create a basic `ActorEnvironment` and use it for one task:

```python
# hello_world.py

import os

import union

image = union.ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union"],
)

actor = union.ActorEnvironment(
    name="my-actor",
    replica_count=1,
    ttl_seconds=30,
    requests=union.Resources(
        cpu="2",
        mem="300Mi",
    ),
    container_image=image,
)

@actor.task
def say_hello() -> str:
    return "hello"

@union.workflow
def wf():
    say_hello()
```

You can learn more about the trade-offs between actors and regular tasks, as well as the efficiency gains you can expect, in **Core concepts > Actors > Actors and regular tasks**.

## Caching on Actor Replicas

The `@actor_cache` decorator provides a powerful mechanism to cache the results of Python callables on individual actor replicas. This is particularly beneficial for workflows involving repetitive tasks, such as data preprocessing, model loading, or initialization of shared resources, where caching can minimize redundant operations and improve overall efficiency. Once a callable is cached on a replica, subsequent tasks that use the same actor can access the cached result, significantly improving performance and efficiency.

### When to Use `@actor_cache`

- **Shared Initialization Costs:**
  For expensive, shared initialization processes that multiple tasks rely on.

- **Repetitive Task Execution:**
  When tasks repeatedly require the same resource or computation on the same actor replica.

- **Complex Object Caching:**
  Use custom Python objects as keys to define unique cache entries.

Below is a simplified example showcasing the use of `@actor_cache`. It caches the model loaded by the `load_model` function, which is called from the `evaluate` task.

```python
# caching_basic.py

from time import sleep
import os

import union

image = union.ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union"],
)

actor = union.ActorEnvironment(
    name="my-actor",
    container_image=image,
    replica_count=1,
)

@union.actor_cache
def load_model(state: int) -> callable:
    sleep(4)  # simulate model loading
    return lambda value: state + value

@actor.task
def evaluate(value: int, state: int) -> int:
    model = load_model(state=state)
    return model(value)

@union.workflow
def wf(init_value: int = 1, state: int = 3) -> int:
    out = evaluate(value=init_value, state=state)
    out = evaluate(value=out, state=state)
    out = evaluate(value=out, state=state)
    out = evaluate(value=out, state=state)
    return out
```

> [!NOTE]
> In order to get the `@actor_cache` functionality, you must pin `union` to at least `0.1.121`.

![Actor caching example 1](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/actors/caching/actor-cache-example-1.png)

You can see that the first call of `evaluate` took considerable time as it involves allocating a node for the task, creating a container, and loading the model. The subsequent calls of `evaluate` execute in a fraction of the time.

You can see examples of more advanced actor usage in **Core concepts > Actors > Actor examples**.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/actors/actors-and-regular-tasks ===

# Actors and regular tasks

When deciding whether to use actors or traditional tasks in your workflows, it's important to consider the benefits
and trade-offs. This page outlines key scenarios where actors shine and where they may not be the best fit.

| When to Use Actors | When Not to Use Actors |
| ------------------ | ---------------------- |
| **Short Running Tasks** Traditional tasks spin up a new container and pod for each task, which adds overhead. Actors allow tasks to run on the same container, removing the repeated cost of pod creation, image pulling, and initialization. Actors offer the most benefit for short running tasks where the startup overhead is a larger component of total task runtime. | **Long Running Tasks** For long running tasks, container initialization overhead is minimal, therefore the performance benefits of actors become negligible when task runtime significantly exceeds startup time. |
| **Map Tasks with Large Input Arrays** Map tasks by default share the same image and resource definitions, making them a great use case for actors. Actors provide the greatest benefit when the input array is larger than the desired concurrency. For example, consider an input array with 2,000 entries and a concurrency level of 50. Without actors, map tasks would spin up 2,000 individual containers—one for each entry. With actors, only 50 containers are needed, corresponding to the number of replicas, dramatically reducing overhead. | **Map Tasks with Small Input Arrays** In a map task where the number of actor replicas matches the input array size, the same number of pods and containers are initialized as when the map task is run without an actor. For example, if there are 10 inputs and 10 replicas, 10 pods are created, resulting in no reduction in overhead. |
| **State Management and Efficient Initialization** Actors excel when state persistence between tasks is valuable. You can use `@actor_cache` to cache Python objects. For example, this lets you load a large model or dataset into memory once per replica, and access it across tasks run on that replica. You can also serve a model or initialize shared resources in an init container. Each task directed to that actor replica can then reuse the same model or resource. | **Strict Task Isolation Is Critical** While actors clear Python caches, global variables, and custom environment variables after each task, they still share the same container. The shared environment introduces edge cases where you could intentionally or unintentionally impact downstream tasks. For example, if you write to a file in one task, that file will remain mutated for the next task that is run on that actor replica. If strict isolation between tasks is a hard requirement, regular tasks provide a safer option. |
| **Shared Dependencies and Resources** If multiple tasks can use the same container image and have consistent resource requirements, actors are a natural fit. | |

## Efficiency Gains from Actors with Map Tasks

Let's see how using Actors with map tasks can cut runtime in half!

We compare three scenarios:

1. **Regular map tasks without specifying concurrency.** This is the fastest expected configuration, as Flyte will spawn as many pods as there are elements in the input array, allowing Kubernetes to manage scheduling based on available resources.
2. **Regular map tasks with fixed concurrency.** This limits the number of pods that are alive at any given time.
3. **Map tasks with Actors.** Here we set the number of replicas to match the concurrency of the previous example.

These scenarios let us compare actors to vanilla map tasks both when speed is maximized and when live pods are matched one-to-one.

## "Hello World" Benchmark

This benchmark simply runs a task that returns "Hello World", which is near-instantaneous.

| Task Type      | Concurrency/Replicas | Duration (seconds) |
| -------------- | -------------------- | ------------------ |
| Without Actors | unbound              | 111                |
| Without Actors | 25                   | 1195               |
| With Actors    | 25                   | 42                 |

**Key Takeaway:** For near instantaneous tasks, using a 25-replica Actor with map tasks reduces runtime by 96% if live pods are matched, and 62% when map task concurrency is unbounded.

## "5s Sleep" Benchmark

This benchmark simply runs a task that sleeps for five seconds.

| Task Type      | Concurrency/Replicas | Duration (seconds) |
| -------------- | -------------------- | ------------------ |
| Without Actors | unbound              | 174                |
| Without Actors | 100                  | 507                |
| With Actors    | 100                  | 87                 |

**Key Takeaway:** For five-second long tasks, using a 100-replica Actor with map tasks reduces runtime by 83% if live pods are matched, and 50% when map task concurrency is unbounded.
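The headline percentages in both benchmarks follow directly from the tables:

```python
def reduction(baseline, actors):
    """Percentage runtime reduction of the actor configuration vs. a baseline."""
    return round(100 * (baseline - actors) / baseline)

# "Hello World": 42 s with actors vs 1195 s (matched pods) and 111 s (unbounded).
print(reduction(1195, 42), reduction(111, 42))  # 96 62

# "5s Sleep": 87 s with actors vs 507 s (matched pods) and 174 s (unbounded).
print(reduction(507, 87), reduction(174, 87))  # 83 50
```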

If you have short running map tasks, you can cut your runtime in half. If you are already using concurrency limits on your map tasks, you can expect even better improvements!

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/actors/actor-examples ===

# Actor examples

### Refactoring from Regular Tasks to Actors

Notice that converting a non-actor workflow to use actors is as simple as replacing the `@union.task` decorator with the `@actor.task` decorator. Additionally, task decorator arguments can be moved either to the actor environment or the actor task decorator, depending on whether they apply to the entire environment (e.g. resource specifications) or to a single task execution (e.g. caching arguments).

```diff
import union

+ actor = union.ActorEnvironment(
+    name="myenv",
+    replica_count=10,
+    ttl_seconds=120,
+    requests=union.Resources(mem="1Gi"),
+    container_image="myrepo/myimage-with-scipy:latest",
+)
+
- @union.task(requests=union.Resources(mem="1Gi"))
+ @actor.task
def add_numbers(a: float, b: float) -> float:
    return a + b

- @union.task(container_image="myrepo/myimage-with-scipy:latest")
+ @actor.task
def calculate_distance(point_a: list[int], point_b: list[int]) -> float:
    from scipy.spatial.distance import euclidean
    return euclidean(point_a, point_b)

- @union.task(cache=True, cache_version="v1")
+ @actor.task(cache=True, cache_version="v1")
def is_even(number: int) -> bool:
    return number % 2 == 0

@union.workflow
def distance_add_wf(point_a: list[int], point_b: list[int]) -> float:
    distance = calculate_distance(point_a=point_a, point_b=point_b)
    return add_numbers(a=distance, b=1.5)

@union.workflow
def is_even_wf(point_a: list[int]) -> list[bool]:
    return union.map(is_even)(number=point_a)
```

## Multiple instances of the same task

In this example, the `actor.task`-decorated task is invoked multiple times in one workflow, and will use the same `ActorEnvironment` on each invocation:

```python
# plus_one.py

import os

import union

image = union.ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union"],
)

actor = union.ActorEnvironment(
    name="my-actor",
    replica_count=1,
    ttl_seconds=300,
    requests=union.Resources(cpu="2", mem="500Mi"),
    container_image=image,
)

@actor.task
def plus_one(input: int) -> int:
    return input + 1

@union.workflow
def wf(input: int = 0) -> int:
    a = plus_one(input=input)
    b = plus_one(input=a)
    c = plus_one(input=b)
    return plus_one(input=c)

```

## Multiple tasks

Every task execution in the following example will execute in the same `ActorEnvironment`.
You can use the same environment for multiple tasks in the same workflow and tasks across workflow definitions, using both subworkflows and launch plans:

```python
# multiple_tasks.py

import os

import union

image = union.ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union"],
)

actor = union.ActorEnvironment(
    name="my-actor",
    replica_count=1,
    ttl_seconds=30,
    requests=union.Resources(cpu="1", mem="450Mi"),
    container_image=image,
)

@actor.task
def say_hello(name: str) -> str:
    return f"hello {name}"

@actor.task
def scream_hello(name: str) -> str:
    return f"HELLO {name}"

@union.workflow
def my_child_wf(name: str) -> str:
    return scream_hello(name=name)

my_child_wf_lp = union.LaunchPlan.get_default_launch_plan(union.current_context(), my_child_wf)

@union.workflow
def my_parent_wf(name: str) -> str:
    a = say_hello(name=name)
    b = my_child_wf(name=a)
    return my_child_wf_lp(name=b)
```

## Custom PodTemplates

Both tasks in the following example will be executed in the same `ActorEnvironment`, which is created with a `PodTemplate` for additional configuration.

```python
# pod_template.py

import os

from kubernetes.client.models import (
    V1Container,
    V1PodSpec,
    V1ResourceRequirements,
    V1EnvVar,
)
import union

image = union.ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union", "flytekitplugins-pod"],
)

pod_template = union.PodTemplate(
    primary_container_name="primary",
    pod_spec=V1PodSpec(
        containers=[
            V1Container(
                name="primary",
                image=image,
                resources=V1ResourceRequirements(
                    requests={
                        "cpu": "1",
                        "memory": "1Gi",
                    },
                    limits={
                        "cpu": "1",
                        "memory": "1Gi",
                    },
                ),
                env=[V1EnvVar(name="COMP_KEY_EX", value="compile_time")],
            ),
        ],
    ),
)

actor = union.ActorEnvironment(
    name="my-actor",
    replica_count=1,
    ttl_seconds=30,
    pod_template=pod_template,
)

@actor.task
def get_and_set() -> str:
    os.environ["RUN_KEY_EX"] = "run_time"
    return os.getenv("COMP_KEY_EX")

@actor.task
def check_set() -> str:
    return os.getenv("RUN_KEY_EX")

@union.workflow
def wf() -> tuple[str,str]:
    return get_and_set(), check_set()
```

## Example: `@actor_cache` with `map`

With map tasks, each task is executed within the same environment, making actors a natural fit for this pattern. If a task has an expensive operation, like model loading, caching it with `@actor_cache` can improve performance. This example shows how to cache model loading in a mapped task to avoid redundant work and save resources.

```python
# caching_map_task.py

from functools import partial
from pathlib import Path
from time import sleep
import os

import union

image = union.ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union"],
)

actor = union.ActorEnvironment(
    name="my-actor",
    container_image=image,
    replica_count=2,
)

class MyModel:
    """Simple model that multiples value with model_state."""

    def __init__(self, model_state: int):
        self.model_state = model_state

    def __call__(self, value: int):
        return self.model_state * value

@union.task(container_image=image, cache=True, cache_version="v1")
def create_model_state() -> union.FlyteFile:
    working_dir = Path(union.current_context().working_directory)
    model_state_path = working_dir / "model_state.txt"
    model_state_path.write_text("4")
    return model_state_path

@union.actor_cache
def load_model(model_state_path: union.FlyteFile) -> MyModel:
    # Simulate model loading time. This can take a long time
    # because the FlyteFile download is large, or when the
    # model is loaded onto the GPU.
    sleep(10)
    with model_state_path.open("r") as f:
        model_state = int(f.read())

    return MyModel(model_state=model_state)

@actor.task
def inference(value: int, model_state_path: union.FlyteFile) -> int:
    model = load_model(model_state_path)
    return model(value)

@union.workflow
def run_inference(values: list[int] = list(range(20))) -> list[int]:
    model_state = create_model_state()
    inference_ = partial(inference, model_state_path=model_state)
    return union.map(inference_)(value=values)
```

## Example: Caching with Custom Objects

Finally, we can cache custom objects by defining the `__hash__` and `__eq__` methods. These methods allow `@actor_cache` to determine if an object is the same between runs, ensuring that expensive operations are skipped if the object hasn’t changed.

```python
# caching_custom_object.py

from time import sleep
import os

import union

image = union.ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union"],
)

actor = union.ActorEnvironment(
    name="my-actor",
    container_image=image,
    replica_count=1,
)

class MyObj:
    def __init__(self, state: int):
        self.state = state

    def __hash__(self):
        return hash(self.state)

    def __eq__(self, other):
        return self.state == other.state

@union.actor_cache
def get_state(obj: MyObj) -> int:
    sleep(2)
    return obj.state

@actor.task
def construct_and_get_value(state: int) -> int:
    obj = MyObj(state=state)
    return get_state(obj)

@union.workflow
def wf(state: int = 2) -> int:
    value = construct_and_get_value(state=state)
    value = construct_and_get_value(state=value)
    value = construct_and_get_value(state=value)
    value = construct_and_get_value(state=value)
    return value
```
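To see why `__hash__` and `__eq__` are required, note that a dict-based cache treats two objects as the same key exactly when they hash and compare equal. This plain-Python sketch (not the `union` SDK) demonstrates the key semantics the example above relies on:

```python
class MyObj:
    def __init__(self, state: int):
        self.state = state

    def __hash__(self):
        return hash(self.state)

    def __eq__(self, other):
        return self.state == other.state

cache = {}
cache[MyObj(2)] = "expensive result"

# A *different* instance with the same state is the same cache key,
# so the expensive work is skipped.
print(MyObj(2) in cache)  # True
print(MyObj(3) in cache)  # False
```

Without these methods, Python falls back to identity-based hashing, and every freshly constructed object would miss the cache.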

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/artifacts ===

# Artifacts

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

Union.ai produces many intermediate outputs when running tasks and workflows. These outputs are stored internally in Union.ai and are accessible through the relevant executions, but are not usually directly accessible to users.

The Artifact service indexes and adds semantic meaning to outputs of all Union.ai task and workflow executions, such as models, files, or any other kinds of data, enabling you to directly access, track, and orchestrate pipelines through the outputs themselves. Artifacts allow you to store additional metadata for these outputs in the form of **Core concepts > Artifacts > Partitions**, which are key-value pairs that describe the artifact and which can be used to query the Artifact Service to locate artifacts. Artifacts allow for loose coupling of workflows—for example, a downstream workflow can be configured to consume the latest result of an upstream workflow. With this higher-order abstraction, Union.ai aims to ease collaboration across teams, provide for reactivity and automation, and give you a broader view of how artifacts move across executions.

## Versioning

Artifacts are uniquely identified and versioned by the following information:

* Project
* Domain
* Artifact name
* Artifact version

You set an artifact's name in code when you declare it (see **Core concepts > Artifacts > Declaring artifacts**). The artifact version is generated automatically: any execution of a task or workflow that emits an artifact with this name creates a new version of that artifact.

## Partitions

When you declare an artifact, you can define partitions for it that enable semantic grouping of artifacts. Partitions are metadata that take the form of key-value pairs, with the keys defined at registration time and the values supplied at runtime. You can specify up to 10 partition keys for an artifact. You can set an optional partition called `time_partition` to capture information about the execution timestamp to your desired level of granularity. For more information, see **Core concepts > Artifacts > Declaring artifacts**.

> [!NOTE]
> The `time_partition` partition is not enabled by default. To enable it, set `time_partitioned=True` in the artifact declaration.
> For more information, see **Core concepts > Artifacts > Declaring artifacts**.

## Queries

To consume an artifact in a workflow, you can define a query containing the artifact’s name as well as any required partition values. You then supply the query as an input value to the workflow definition. At execution time, the query will return the most recent version of the artifact that meets the criteria by default. You can also query for a specific artifact version.
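The query model can be sketched with an in-memory stand-in for the Artifact service (a conceptual sketch with hypothetical names, not the real API):

```python
artifacts = []  # each entry: name, version, and partition key-value pairs

def materialize(name, version, **partitions):
    # Each execution that emits the artifact appends a new version.
    artifacts.append({"name": name, "version": version, "partitions": partitions})

def query(name, **partitions):
    """Return the most recently materialized version matching all criteria."""
    matches = [
        a for a in artifacts
        if a["name"] == name
        and all(a["partitions"].get(k) == v for k, v in partitions.items())
    ]
    return matches[-1]["version"] if matches else None

materialize("sales_model", "v1", region="emea")
materialize("sales_model", "v2", region="apac")
materialize("sales_model", "v3", region="emea")

print(query("sales_model", region="emea"))  # v3: latest version matching the partition
print(query("sales_model"))                 # v3: latest version overall
```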

For more information on querying for and consuming artifacts in workflows, see **Core concepts > Artifacts > Consuming artifacts in workflows**.

To query for artifacts programmatically in a Python script using `UnionRemote`, see [UnionRemote](https://www.union.ai/docs/v1/union/user-guide/development-cycle/union-remote).

> [!NOTE] `UnionRemote` vs `FlyteRemote`
> `UnionRemote` is identical to `FlyteRemote`, with additional functionality to handle artifacts.
> You cannot interact with artifacts using `FlyteRemote`.

## Lineage

Once an artifact is materialized, its lineage is visible in the UI. For more information, see **Core concepts > Artifacts > Viewing artifacts**.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/artifacts/declaring-artifacts ===

# Declaring artifacts

In order to define a task or workflow that emits an artifact, you must first declare the artifact and the keys for any **Core concepts > Artifacts > Partitions** you wish for it to have. For the `Artifact` class parameters and methods, see the [Artifact API documentation]().
<!-- TODO: Add link to API -->

## Basic artifact

In the following example, an artifact called `BasicTaskData` is declared, along with a task that emits that artifact. Since it is a basic artifact, it doesn't have any partitions.

> [!NOTE]
> To use the example code on this page, you will need to add your `registry`
> to the `pandas_image` ImageSpec block.

```python
# basic.py

import pandas as pd
import union
from typing_extensions import Annotated

pandas_image = union.ImageSpec(
    packages=["pandas==2.2.2"]
)

BasicTaskData = union.Artifact(
    name="my_basic_artifact"
)

@union.task(container_image=pandas_image)
def t1() -> Annotated[pd.DataFrame, BasicTaskData]:
    my_df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    return BasicTaskData.create_from(my_df)

@union.workflow
def wf() -> pd.DataFrame:
    return t1()
```

## Time-partitioned artifact

By default, time partitioning is not enabled for artifacts. To enable it, declare the artifact with `time_partitioned` set to `True`. You can optionally set the granularity for the time partition to `MINUTE`, `HOUR`, `DAY`, or `MONTH`; the default is `DAY`.

You must also pass a value to `time_partition`, which you can do at runtime or by binding `time_partition` to an input.

### Passing a value to `time_partition` at runtime

```python
# time_partition_runtime.py

from datetime import datetime

import pandas as pd
import union
from flytekit.core.artifact import Granularity
from typing_extensions import Annotated

pandas_image = union.ImageSpec(
    packages=["pandas==2.2.2"]
)

BasicArtifact = union.Artifact(
    name="my_basic_artifact",
    time_partitioned=True,
    time_partition_granularity=Granularity.HOUR
)

@union.task(container_image=pandas_image)
def t1() -> Annotated[pd.DataFrame, BasicArtifact]:
    df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    dt = datetime.now()
    return BasicArtifact.create_from(df, time_partition=dt)

@union.workflow
def wf() -> pd.DataFrame:
    return t1()
```

### Passing a value to `time_partition` by input

```python
# time_partition_input.py

from datetime import datetime

import pandas as pd
import union
from flytekit.core.artifact import Granularity
from typing_extensions import Annotated

pandas_image = union.ImageSpec(
    packages=["pandas==2.2.2"]
)

BasicArtifact = union.Artifact(
    name="my_basic_artifact",
    time_partitioned=True,
    time_partition_granularity=Granularity.HOUR
)

@union.task(container_image=pandas_image)
def t1(date: datetime) -> Annotated[pd.DataFrame, BasicArtifact]:
    df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    return BasicArtifact.create_from(df, time_partition=date)

@union.workflow
def wf(run_date: datetime):
    return t1(date=run_date)
```

## Artifact with custom partition keys

You can specify up to 10 custom partition keys when declaring an artifact. Custom partition keys can be set at runtime or be passed as inputs.

### Passing a value to a custom partition key at runtime

```python
# partition_keys_runtime.py

from datetime import datetime

import pandas as pd
import union
from flytekit.core.artifact import Inputs, Granularity
from typing_extensions import Annotated

pandas_image = union.ImageSpec(
    packages=["pandas==2.2.2"]
)

BasicArtifact = union.Artifact(
    name="my_basic_artifact",
    time_partitioned=True,
    time_partition_granularity=Granularity.HOUR,
    partition_keys=["key1"]
)

@union.task(container_image=pandas_image)
def t1(
    key1: str, date: datetime
) -> Annotated[pd.DataFrame, BasicArtifact(key1=Inputs.key1)]:
    df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    return BasicArtifact.create_from(
        df,
        time_partition=date
    )

@union.workflow
def wf():
    run_date = datetime.now()
    values = ["value1", "value2", "value3"]
    for value in values:
        t1(key1=value, date=run_date)
```

### Passing a value to a custom partition key by input

```python
# partition_keys_input.py

from datetime import datetime

import pandas as pd
import union
from flytekit.core.artifact import Inputs, Granularity
from typing_extensions import Annotated

pandas_image = union.ImageSpec(
    packages=["pandas==2.2.2"]
)

BasicArtifact = union.Artifact(
    name="my_basic_artifact",
    time_partitioned=True,
    time_partition_granularity=Granularity.HOUR,
    partition_keys=["key1"]
)

@union.task(container_image=pandas_image)
def t1(
    key1: str, dt: datetime
) -> Annotated[pd.DataFrame, BasicArtifact(key1=Inputs.key1)]:
    df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    return BasicArtifact.create_from(
        df,
        time_partition=dt,
        key1=key1
    )

@union.workflow
def wf(dt: datetime, val: str):
    t1(key1=val, dt=dt)
```

## Artifact with model card example

You can attach a model card with additional metadata to your artifact, formatted in Markdown:

```python
# model_card.py

import pandas as pd
import union
from union.artifacts import ModelCard
from typing_extensions import Annotated

pandas_image = union.ImageSpec(
    packages=["pandas==2.2.2"]
)

BasicArtifact = union.Artifact(name="my_basic_artifact")

def generate_md_contents(df: pd.DataFrame) -> str:
    contents = "# Dataset Card\n" "\n" "## Tabular Data\n"
    contents = contents + df.to_markdown()
    return contents

@union.task(container_image=pandas_image)
def t1() -> Annotated[pd.DataFrame, BasicArtifact]:
    df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})

    return BasicArtifact.create_from(
        df,
        ModelCard(generate_md_contents(df))
    )

@union.workflow
def wf():
    t1()
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/artifacts/materializing-artifacts ===

# Materializing artifacts

You can materialize an artifact by executing the task or workflow that emits the artifact.

In the example below, to materialize the `BasicArtifact` artifact, the `t1` task must be executed. The `wf` workflow runs the `t1` task three times with different values for the `key1` partition each time.
Note that each time `t1` is executed, it emits a new version of the `BasicArtifact` artifact.
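This append-only versioning behavior can be pictured with a simple in-memory store. The class below is an analogy only (none of these names are Union.ai APIs); the artifacts service manages versions for you:

```python
import itertools

# Toy store illustrating that every emission creates a new artifact
# version rather than overwriting the previous one.
class ToyArtifactStore:
    def __init__(self):
        self._versions = {}          # name -> list of (version, partitions, value)
        self._counter = itertools.count(1)

    def emit(self, name, value, **partitions):
        version = f"v{next(self._counter)}"
        self._versions.setdefault(name, []).append((version, partitions, value))
        return version

    def versions(self, name):
        return [v for v, _, _ in self._versions.get(name, [])]

store = ToyArtifactStore()
for value in ["value1", "value2", "value3"]:
    store.emit("my_basic_artifact", object(), key1=value)

print(store.versions("my_basic_artifact"))  # ['v1', 'v2', 'v3']
```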

> [!NOTE]
> To use the example code on this page, you will need to add your `registry` to the `pandas_image` ImageSpec block.

```python
# partition_keys_runtime.py

from datetime import datetime

import pandas as pd
import union
from flytekit.core.artifact import Inputs, Granularity
from typing_extensions import Annotated

pandas_image = union.ImageSpec(
    packages=["pandas==2.2.2"]
)

BasicArtifact = union.Artifact(
    name="my_basic_artifact",
    time_partitioned=True,
    time_partition_granularity=Granularity.HOUR,
    partition_keys=["key1"]
)

@union.task(container_image=pandas_image)
def t1(
    key1: str, date: datetime
) -> Annotated[pd.DataFrame, BasicArtifact(key1=Inputs.key1)]:
    df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    return BasicArtifact.create_from(
        df,
        time_partition=date
    )

@union.workflow
def wf():
    run_date = datetime.now()
    values = ["value1", "value2", "value3"]
    for value in values:
        t1(key1=value, date=run_date)
```

> [!NOTE]
> You can also materialize an artifact by executing the `create_artifact` method of `UnionRemote`.
> For more information, see the [UnionRemote documentation](../../development-cycle/union-remote).

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/artifacts/consuming-artifacts-in-workflows ===

# Consuming artifacts in workflows

## Defining a workflow that consumes an artifact

You can define a workflow that consumes an artifact by defining a query and passing it as an input to the consuming workflow.

The following code defines a query, `data_query`, that searches across all versions of `BasicArtifact` that match the partition values. This query binds parameters to the workflow's `key1` and `time_partition` inputs and returns the most recent version of the artifact.
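The "most recent version matching the partition values" semantics can be illustrated with plain Python over toy records (an illustration only, not the Union.ai query implementation):

```python
from datetime import datetime

# Toy records: (created_at, partitions, uri). The real data lives in the
# Union.ai artifacts service.
records = [
    (datetime(2024, 5, 17, 9),  {"key1": "value1"}, "s3://bucket/a"),
    (datetime(2024, 5, 17, 11), {"key1": "value1"}, "s3://bucket/b"),
    (datetime(2024, 5, 17, 10), {"key1": "value2"}, "s3://bucket/c"),
]

def latest_matching(records, **partitions):
    # Keep records whose partitions match, then take the newest.
    matches = [r for r in records if all(r[1].get(k) == v for k, v in partitions.items())]
    return max(matches, key=lambda r: r[0])[2] if matches else None

print(latest_matching(records, key1="value1"))  # s3://bucket/b
```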

> [!NOTE]
> To use the example code on this page, you will need to add your `registry` to the `pandas_image` ImageSpec block.

```python
# query.py

from datetime import datetime

import pandas as pd
import union
from flytekit.core.artifact import Inputs

pandas_image = union.ImageSpec(
    packages=["pandas==2.2.2"]
)

BasicArtifact = union.Artifact(
    name="my_basic_artifact"
)

@union.task(container_image=pandas_image)
def t1(key1: str, dt: datetime, data: pd.DataFrame):
    print(f"key1: {key1}")
    print(f"Date: {dt}")
    print(f"Data retrieved from query: {data}")

data_query = BasicArtifact.query(
    time_partition=Inputs.dt,
    key1=Inputs.key1,
)

@union.workflow
def query_wf(
    key1: str,
    dt: datetime,
    data: pd.DataFrame = data_query
):
    t1(key1=key1, dt=dt, data=data)
```

You can also directly reference a particular artifact version in a query using the `get()` method:

```python
data = BasicArtifact.get("<organization>/<domain>/BasicArtifact@<artifact-version>")
```

> [!NOTE]
> For a full list of Artifact class methods, see the [Artifact API documentation]().
<!-- TODO: Add link to API -->

## Launching a workflow that consumes an artifact

To launch a workflow that consumes an artifact as one of its inputs, navigate to the workflow in the UI and click **Launch Workflow**:

![Launch workflow UI with artifact query](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/artifacts/consuming-artifacts-in-workflows/launch-workflow-artifact-query.png)

In the `query_wf` example, the workflow takes three inputs: `key1`, `dt`, and a `BasicArtifact` artifact query. To create the workflow execution, enter values for `key1` and `dt` and click **Launch**. The artifacts service will supply the latest version of the `BasicArtifact` artifact that meets the partition query criteria.

You can also override the artifact query from the launch form by clicking **Override**, directly supplying the input that the artifact references (in this case, a blob store URI), and clicking **Launch**:

![Launch workflow UI with artifact query override](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/artifacts/consuming-artifacts-in-workflows/launch-workflow-artifact-query-override.png)

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/artifacts/connecting-workflows-with-artifact-event-triggers ===

# Connecting workflows with artifact event triggers

In the following example, we define an upstream workflow and a downstream workflow, and define a [trigger](../launch-plans/reactive-workflows/) in a launch plan to connect the two workflows via an [artifact event](../launch-plans/reactive-workflows#artifact-events).

## Imports

> [!NOTE]
> To use the example code on this page, you will need to add your `registry` to the `pandas_image` ImageSpec block.

First we import the required packages:

```python
from datetime import datetime

import pandas as pd
import union
from union.artifacts import OnArtifact
from flytekit.core.artifact import Inputs
from typing_extensions import Annotated

```

## Upstream artifact and workflow definition

Then we define an upstream artifact and a workflow that emits a new version of `UpstreamArtifact` when executed:

```python
UpstreamArtifact = union.Artifact(
    name="my_upstream_artifact",
    time_partitioned=True,
    partition_keys=["key1"],
)

@union.task(container_image=pandas_image)
def upstream_t1(key1: str) -> Annotated[pd.DataFrame, UpstreamArtifact(key1=Inputs.key1)]:
    dt = datetime.now()
    my_df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    return UpstreamArtifact.create_from(my_df, key1=key1, time_partition=dt)

@union.workflow
def upstream_wf() -> pd.DataFrame:
    return upstream_t1(key1="value1")
```

## Artifact event definition

Next we define the artifact event that will link the upstream and downstream workflows together:

```python
on_upstream_artifact = OnArtifact(
    trigger_on=UpstreamArtifact,
)
```

## Downstream workflow definition

Then we define the downstream task and workflow that will be triggered when the upstream artifact is created:

```python
@union.task
def downstream_t1():
    print("Downstream task triggered")

@union.workflow
def downstream_wf():
    downstream_t1()
```

## Launch plan with trigger definition

Finally, we create a launch plan with its trigger set to an `OnArtifact` object, linking the two workflows via the `UpstreamArtifact` artifact. The trigger will initiate an execution of the downstream `downstream_wf` workflow upon the creation of a new version of `UpstreamArtifact`.

```python
downstream_triggered = union.LaunchPlan.create(
    "downstream_with_trigger_lp",
    downstream_wf,
    trigger=on_upstream_artifact
)
```

> [!NOTE]
> The `OnArtifact` object must be attached to a launch plan in order for the launch plan to be triggered by the creation of a new version of the artifact.
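The overall pattern resembles a publish/subscribe bus: a new artifact version fires the launch plans registered for that artifact. The toy class below illustrates this (none of these names are Union.ai APIs; Union.ai does this server-side via launch plan triggers):

```python
# Toy illustration of the artifact-event trigger pattern.
class ToyTriggerBus:
    def __init__(self):
        self._subscribers = {}   # artifact name -> list of callbacks

    def on_artifact(self, name, callback):
        self._subscribers.setdefault(name, []).append(callback)

    def emit(self, name, version):
        # A new artifact version fires every registered callback.
        for callback in self._subscribers.get(name, []):
            callback(version)

bus = ToyTriggerBus()
runs = []
bus.on_artifact("my_upstream_artifact", lambda v: runs.append(f"downstream_wf({v})"))
bus.emit("my_upstream_artifact", "v1")
print(runs)  # ['downstream_wf(v1)']
```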

## Full example code

Here is the full example code file:

```python
# trigger_on_artifact.py
from datetime import datetime

import pandas as pd
import union
from union.artifacts import OnArtifact
from flytekit.core.artifact import Inputs
from typing_extensions import Annotated

pandas_image = union.ImageSpec(
    packages=["pandas==2.2.2"]
)

UpstreamArtifact = union.Artifact(
    name="my_upstream_artifact",
    time_partitioned=True,
    partition_keys=["key1"],
)

@union.task(container_image=pandas_image)
def upstream_t1(key1: str) -> Annotated[pd.DataFrame, UpstreamArtifact(key1=Inputs.key1)]:
    dt = datetime.now()
    my_df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    return UpstreamArtifact.create_from(my_df, key1=key1, time_partition=dt)

@union.workflow
def upstream_wf() -> pd.DataFrame:
    return upstream_t1(key1="value1")

on_upstream_artifact = OnArtifact(
    trigger_on=UpstreamArtifact,
)

@union.task
def downstream_t1():
    print("Downstream task triggered")

@union.workflow
def downstream_wf():
    downstream_t1()

downstream_triggered = union.LaunchPlan.create(
    "downstream_with_trigger_lp",
    downstream_wf,
    trigger=on_upstream_artifact
)
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/artifacts/viewing-artifacts ===

# Viewing artifacts

## Artifacts list

Artifacts can be viewed in the UI by navigating to the artifacts app in the left sidebar:

![Artifacts overview](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/artifacts/viewing-artifacts/artifacts-list.png)

## Artifact view

Selecting a specific artifact from the artifact list will take you to that artifact's **Overview** page:

![Single artifact overview](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/artifacts/viewing-artifacts/artifact-view.png)

Here you can see relevant metadata about the artifact, including:
* Its version
* Its partitions
* The task or workflow that produced it
* Its creation time
* Its object store URI
* Code for accessing the artifact via [UnionRemote](../../development-cycle/union-remote)

You can also view the artifact's object structure, model card, and lineage graph.

### Artifact lineage graph

Once an artifact is materialized, you can view its lineage in the UI, including the specific upstream task or workflow execution that created it, and any downstream workflows that consumed it. You can traverse the lineage graph by clicking between artifacts and inspecting any relevant workflow executions in order to understand and reproduce any step in the AI development process.

![Artifact lineage overview](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/artifacts/viewing-artifacts/artifact-lineage.png)

You can navigate through the lineage graph by clicking from artifact to artifact.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/serving ===

# App Serving

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

Union.ai lets you build and serve your own web apps, enabling you to build:

- **Model endpoints** with generic web frameworks like FastAPI or optimized inference frameworks like vLLM and SGLang.
- **AI inference-time** components like MCP servers, ephemeral agent memory state stores, etc.
- **Interactive dashboards** and other interfaces for interacting with and visualizing data and models from your workflows, using frameworks like Streamlit, Gradio, TensorBoard, FastHTML, Dash, Panel, Voilà, and FiftyOne.
- **Flyte Connectors**, which are [lightweight, long-running services](https://www.union.ai/docs/v1/union/integrations/connectors/_index) that connect to external services like OpenAI, BigQuery, and Snowflake.
- **Any other web services**, such as [webhooks](https://www.union.ai/docs/v1/union/tutorials/serving/custom-webhooks/page.md), implemented with web frameworks like FastAPI or Starlette.

## Example app

We will start with a simple Streamlit app. In this case we will use the default
Streamlit "Hello, World!" app.

In a local directory, create the following file:

```shell
└── app.py
```

## App declaration

The file `app.py` contains the app declaration:

```python
"""A simple Union.ai app using Streamlit"""

import union
import os

# The `ImageSpec` for the container that will run the `App`.
# `union-runtime` must be declared as a dependency,
# in addition to any other dependencies needed by the app code.
# Use the Union.ai remote image builder to build the app container image.
image = union.ImageSpec(
    name="streamlit-app",
    packages=["union-runtime>=0.1.18", "streamlit==1.51.0"],
    builder="union"
)

# The `App` declaration.
# Uses the `ImageSpec` declared above.
# In this case we do not need to supply any app code
# as we are using the built-in Streamlit `hello` app.
app = union.app.App(
    name="streamlit-hello",
    container_image=image,
    args="streamlit hello --server.port 8080",
    port=8080,
    limits=union.Resources(cpu="1", mem="1Gi"),
)
```

Here the `App` constructor is initialized with the following parameters:

* `name`: The name of the app. This name will be displayed in app listings (via CLI and UI) and used to refer to the app when deploying and stopping.
* `container_image`: The container image used for the container that will run the app. Here we use an `ImageSpec` built with the Union.ai image builder that includes Streamlit.
* `args`: The command used within the container to start the app. If given as a list of strings, the individual strings are concatenated and invoked as a single command.
* `port`: The port of the app container from which the app will be served.
* `limits`: A `union.Resources` object defining the resource limits for the app container.
  The same object is used for the same purpose in the `@union.task` decorator in Union.ai workflows.
  See **Core concepts > Tasks > Task hardware environment > Customizing task resources** for details.

The parameters above are the minimum needed to initialize the app.
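If `args` were given as a list of strings, the joining described above could be sketched with stdlib `shlex` (an illustration of the concatenation behavior, not Union.ai internals):

```python
import shlex

# A list-form `args` value joined into the single command the container
# runs; shlex.join applies shell-style quoting where needed.
args = ["streamlit", "hello", "--server.port", "8080"]
command = shlex.join(args)
print(command)  # streamlit hello --server.port 8080
```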

There are a few additional parameters that we do not use in this example but will cover later:

* `include`: A list of files to be added to the container at deployment time, containing the custom code that defines the specific functionality of your app.
* `inputs`: A `List` of `union.app.Input` objects. Used to provide default inputs to the app on startup.
* `requests`: A `union.Resources` object defining the resource requests for the app container. The same object is used for the same purpose in the `@union.task` decorator in Union.ai workflows (see **Core concepts > Tasks > Task hardware environment > Customizing task resources** for details).
* `min_replicas`: The minimum number of replica containers permitted for this app.
  This defines the lower bound for auto-scaling the app. The default is 0 <!-- TODO: (see [App autoscaling]() for details) -->.
* `max_replicas`: The maximum number of replica containers permitted for this app.
  This defines the upper bound for auto-scaling the app. The default is 1 <!-- TODO: (see [App autoscaling]() for details) -->.

## Deploy the app

Deploy the app with:

```shell
$ union deploy apps APP_FILE APP_NAME
```

* `APP_FILE` is the Python file that contains one or more app declarations.
* `APP_NAME` is the name of (one of) the declared apps in APP_FILE. The name of an app is the value of the `name` parameter passed into the `App` constructor.

If an app with the name `APP_NAME` does not yet exist on the system then this command creates that app and starts it.
If an app by that name already exists then this command stops the app, updates its code and restarts it.

In this case, we do the following:

```shell
$ union deploy apps app.py streamlit-hello
```

This will return output like the following:

```shell
✨ Creating Application: streamlit-hello
Created Endpoint at: https://withered--firefly--8ca31.apps.demo.hosted.unionai.cloud/
```

Click on the displayed endpoint to go to the app:

![A simple app](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/serving/streamlit-hello.png)

## Viewing deployed apps

Go to **Apps** in the left sidebar in Union.ai to see a list of all your deployed apps:

![Apps list](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/serving/apps-list.png)

To connect to an app, click its **Endpoint**.
To see more information about the app, click its **Name**.
This will take you to the **App view**:

![App view](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/serving/app-view.png)

Buttons to **Copy Endpoint** and **Start app** are available at the top of the view.

You can also view all apps deployed in your Union.ai instance from the command-line with:

```shell
$ union get apps
```

This will display the app list:

```shell
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━┳━━━━━━━━┓
┃ Name                                    ┃ Link       ┃ Status     ┃ Desired State ┃ CPU ┃ Memory ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━╇━━━━━━━━┩
│ streamlit-query-2                       │ Click Here │ Started    │ Stopped       │ 2   │ 2Gi    │
│ streamlit-demo-1                        │ Click Here │ Started    │ Started       │ 3   │ 2Gi    │
│ streamlit-query-3                       │ Click Here │ Started    │ Started       │ 2   │ 2Gi    │
│ streamlit-demo                          │ Click Here │ Unassigned │ Started       │ 2   │ 2Gi    │
└─────────────────────────────────────────┴────────────┴────────────┴───────────────┴─────┴────────┘
```

## Stopping apps

To stop an app from the command line, run the following command:

```shell
$ union stop apps --name APP_NAME
```

`APP_NAME` is the name of an app deployed on the Union.ai instance.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/serving/adding-your-own-code ===

# Serving custom code

In the introductory section we saw how to define and deploy a simple Streamlit app.
The app deployed was the default hello world Streamlit example app.
In this section, we will expand on this by adding our own custom code to the app.

## Example app

We will initialize the app in `app.py` as before, but now we will add two files containing our own code, `main.py` and `utils.py`.

In a local directory, create the following files:

```shell
├── app.py
├── main.py
└── utils.py
```

## App declaration

The file `app.py` contains the app declaration:

```python
"""A Union.ai app with custom code"""

import os
import union

# The `ImageSpec` for the container that will run the `App`.
# `union-runtime` must be declared as a dependency,
# in addition to any other dependencies needed by the app code.
# Set the environment variable `REGISTRY` to be the URI for your container registry.
# If you are using `ghcr.io` as your registry, make sure the image is public.
image = union.ImageSpec(
    name="streamlit-app",
    packages=["streamlit==1.51.0", "union-runtime>=0.1.18", "pandas==2.2.3", "numpy==2.2.3"],
    builder="union"
)

# The `App` declaration.
# Uses the `ImageSpec` declared above.
# Your core logic of the app resides in the files declared
# in the `include` parameter, in this case, `main.py` and `utils.py`.
app = union.app.App(
    name="streamlit-custom-code",
    container_image=image,
    args="streamlit run main.py --server.port 8080",
    port=8080,
    include=["main.py", "utils.py"],
    limits=union.Resources(cpu="1", mem="1Gi"),
)
```

Compared to the first example we have added one more parameter:

* `include`: A list of files to be added to the container at deployment time, containing the custom code that defines the specific functionality of your app.

## Custom code

In this example we include two files containing custom logic: `main.py` and `utils.py`.

The file `main.py` contains the bulk of our custom code:

```python
"""Streamlit App that plots data"""
import streamlit as st
from utils import generate_data

all_columns = ["Apples", "Orange", "Pineapple"]
with st.container(border=True):
    columns = st.multiselect("Columns", all_columns, default=all_columns)

all_data = st.cache_data(generate_data)(columns=all_columns, seed=101)

data = all_data[columns]

tab1, tab2 = st.tabs(["Chart", "Dataframe"])
tab1.line_chart(data, height=250)
tab2.dataframe(data, height=250, use_container_width=True)
```

The file `utils.py` contains a supporting data-generating function that is imported into the file above:

```python
"""Function to generate sample data."""
import numpy as np
import pandas as pd

def generate_data(columns: list[str], seed: int = 42):
    rng = np.random.default_rng(seed)
    data = pd.DataFrame(rng.random(size=(20, len(columns))), columns=columns)
    return data
```

## Deploy the app

Deploy the app with:

```shell
$ union deploy apps app.py streamlit-custom-code
```

The output displays the console URL and endpoint for the Streamlit app:

```shell
✨ Deploying Application: streamlit-custom-code
🔎 Console URL:
https://<union-host-url>/org/...
[Status] Pending: OutOfDate: The Configuration is still working to reflect the latest desired
specification.
[Status] Started: Service is ready

🚀 Deployed Endpoint: https://<unique-subhost>.apps.<union-host-url>
```

Navigate to the endpoint to see the Streamlit App!

![Streamlit App](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/serving/custom-code-streamlit.png)

## App deployment with included files

When a new app is deployed for the first time (i.e., there is no app registered with the specified `name`),
a container is spun up using the specified `container_image` and the files specified in `include` are
copied into the container. The `args` command is then executed in the container, starting the app.

If you alter the `include` code you need to re-deploy your app.
When `union deploy apps` is called using an app name that corresponds to an already existing app,
the app code is updated in the container and the app is restarted.

You can iterate on your app easily by changing your `include` code and re-deploying.

Because there is a slight performance penalty involved in copying the `include` files into the container,
you may wish to consolidate your code directly into a custom-built image once you have iterated to production quality.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/serving/serving-a-model ===

# Serving a Model from a Workflow With FastAPI

In this section, we create a Union.ai app to serve a scikit-learn model created by a Union.ai workflow
using `FastAPI`.

## Example app

In this example, we first use a Union.ai workflow to train a model and output it as a Union.ai `Artifact`.
We then use a Union.ai app to serve the model using `FastAPI`.

In a local directory, create the following files:

```shell
├── app.py
├── main.py
└── train_wf.py
```

## App configuration

In the code below, we declare the resources, runtime image, and FastAPI app that
exposes a `/predict` endpoint.

```python
"""A Union.ai app that uses FastAPI to serve model created by a Union.ai workflow."""

import os
import union
import joblib
from fastapi import FastAPI

SklearnModel = union.Artifact(name="sklearn-model")

# The `ImageSpec` for the container that will run the `App`, where `union-runtime`
# must be declared as a dependency, in addition to any other dependencies needed
# by the app code. Set the environment variable `REGISTRY` to be the URI for your
# container registry. If you are using `ghcr.io` as your registry, make sure the
# image is public.
image_spec = union.ImageSpec(
    name="union-serve-sklearn-fastapi",
    packages=["union-runtime>=0.1.18", "scikit-learn==1.5.2", "joblib==1.5.1", "fastapi[standard]"],
    builder="union"
)

ml_models = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    model_file = os.getenv("SKLEARN_MODEL")
    ml_models["model"] = joblib.load(model_file)
    yield

app = FastAPI(lifespan=lifespan)

# The `App` declaration, which uses the `ImageSpec` declared above.
# Your core logic of the app resides in the files declared in the `include`
# parameter, in this case, `main.py`. Input artifacts are declared in the
# `inputs` parameter
fast_api_app = union.app.App(
    name="simple-fastapi-sklearn",
    inputs=[
        union.app.Input(
            value=SklearnModel.query(),
            download=True,
            env_var="SKLEARN_MODEL",
        )
    ],
    container_image=image_spec,
    framework_app=app,
    limits=union.Resources(cpu="1", mem="1Gi"),
    port=8082,
)

@app.get("/predict")
async def predict(x: float, y: float) -> dict:
    result = ml_models["model"].predict([[x, y]])
    return {"result": float(result[0])}

```

Note that the Artifact is provided as an `Input` to the App definition. With `download=True`,
the model is downloaded to the container's working directory, and the runtime sets the
`SKLEARN_MODEL` environment variable to the model's full local path.

During startup, the FastAPI app loads the model using the `SKLEARN_MODEL` environment
variable. It then serves an endpoint at `/predict` that takes two float inputs and
returns the prediction in a JSON response.

## Training workflow

The training workflow trains a random forest regression and saves it to a Union.ai
`Artifact`.

```python
"""A Union.ai workflow that trains a model."""

import os
from pathlib import Path
from typing import Annotated

import joblib
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor

import union

# Declare the `Artifact`.
SklearnModel = union.Artifact(name="sklearn-model")

# The `ImageSpec` for the container that runs the tasks.
# Set the environment variable `REGISTRY` to be the URI for your container registry.
# If you are using `ghcr.io` as your registry, make sure the image is public.
image_spec = union.ImageSpec(
    packages=["scikit-learn==1.5.2", "joblib==1.5.1"],
    builder="union"
)

# The `task` that trains a `RandomForestRegressor` model.
@union.task(
    limits=union.Resources(cpu="2", mem="2Gi"),
    container_image=image_spec,
)
def train_model() -> Annotated[union.FlyteFile, SklearnModel]:
    """Train a RandomForestRegressor model and save it as a file."""
    X, y = make_regression(n_features=2, random_state=42)
    working_dir = Path(union.current_context().working_directory)
    model_file = working_dir / "model.joblib"

    rf = RandomForestRegressor().fit(X, y)
    joblib.dump(rf, model_file)
    return model_file
```

## Run the example

To run this example you will need to register and run the workflow first:

```shell
$ union run --remote train_wf.py train_model
```

This task trains a `RandomForestRegressor`, saves it to a file, and uploads it to
a Union.ai `Artifact`. This artifact is retrieved by the FastAPI app for
serving the model.

![scikit-learn Artifact](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/serving/fastapi-sklearn/sklearn-artifact.png)

Once the workflow has completed, you can deploy the app:

```shell
$ union deploy apps app.py simple-fastapi-sklearn
```

The output displays the console URL and endpoint for the FastAPI App:

```shell
✨ Deploying Application: simple-fastapi-sklearn
🔎 Console URL: https://<union-host-url>/org/...
[Status] Pending: OutOfDate: The Configuration is still working to reflect the latest desired
specification.
[Status] Pending: IngressNotConfigured: Ingress has not yet been reconciled.
[Status] Pending: Uninitialized: Waiting for load balancer to be ready
[Status] Started: Service is ready

🚀 Deployed Endpoint: https://<unique-subhost>.apps.<union-host-url>
```

You can see the Swagger docs of the FastAPI endpoint by going to `/docs`:

![scikit-learn FastAPI App](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/serving/fastapi-sklearn/sklearn-fastapi.png)

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/serving/fast-api-auth ===

# API Key Authentication with FastAPI

In this guide, we'll deploy a FastAPI app that uses API key authentication. This
allows you to invoke the endpoint from the public internet in a secure manner.

## Define the Fast API app

First we define the `ImageSpec` for the runtime image:

```python
import os
from union import ImageSpec, Resources, Secret
from union.app import App

image_spec = ImageSpec(
    name="fastapi-with-auth-image",
    builder="union",
    packages=["union-runtime>=0.1.18", "fastapi[standard]==0.115.11", "union>=0.1.150"],
)
```

Then we define a simple FastAPI app that uses `HTTPAuthorizationCredentials` to
authenticate requests.

```python
import os
from typing import Annotated

import union
from fastapi import FastAPI, HTTPException, Security, status, Depends
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from union import UnionRemote

app = FastAPI()
fast_api_app = union.app.App(
    name="fastapi-with-auth",
    secrets=[
        union.Secret(key="AUTH_API_KEY", env_var="AUTH_API_KEY"),
        union.Secret(key="MY_UNION_API_KEY", env_var="UNION_API_KEY"),
    ],
    container_image=image_spec,
    framework_app=app,
    limits=union.Resources(cpu="1", mem="1Gi"),
    port=8082,
    requires_auth=False,
)

async def verify_token(
    credentials: HTTPAuthorizationCredentials = Security(HTTPBearer()),
) -> HTTPAuthorizationCredentials:
    auth_api_key = os.getenv("AUTH_API_KEY")
    if credentials.credentials != auth_api_key:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate credentials",
        )
    return credentials

@app.get("/")
def root(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(verify_token)],
):
    return {"message": "Hello, World!"}
```

As you can see, we define a `FastAPI` app and pass it to the `union.app.App`
definition via the `framework_app` parameter. We then define a `verify_token`
function that verifies the API key, and finally a root endpoint that depends on
`verify_token` to authenticate requests.

Note that we are also requesting two secrets:
- The `AUTH_API_KEY` is used by the FastAPI app to authenticate incoming requests.
- The `MY_UNION_API_KEY` is used to authenticate `UnionRemote` with Union.

With `requires_auth=False`, the endpoint is reachable without going through
Union's authentication, which is acceptable here because the app enforces its own
`AUTH_API_KEY` check. Before we can deploy the app, we create the secrets required by the application:

```bash
union create secret --name AUTH_API_KEY
```

Next, to create the `MY_UNION_API_KEY` secret, we first need to create an admin API key and store its value as a secret:

```bash
union create api-key admin --name MY_UNION_API_KEY
union create secret --name MY_UNION_API_KEY
```

## Deploy the FastAPI app

Finally, you can now deploy the FastAPI app:

```bash
union deploy apps app.py fastapi-with-auth
```

Deploying the application will stream the status to the console:

```
Image ghcr.io/.../webhook-serving:KXwIrIyoU_Decb0wgPy23A found. Skip building.
✨ Deploying Application: fastapi-with-auth
🔎 Console URL: https://<union-tenant>/console/projects/thomasjpfan/domains/development/apps/fastapi-with-auth
[Status] Pending: App is pending deployment
[Status] Pending: RevisionMissing: Configuration "fastapi-with-auth" is waiting for a Revision to become ready.
[Status] Pending: IngressNotConfigured: Ingress has not yet been reconciled.
[Status] Pending: Uninitialized: Waiting for load balancer to be ready
[Status] Started: Service is ready
🚀 Deployed Endpoint: https://rough-meadow-97cf5.apps.<union-tenant>
```

Then, to invoke the endpoint, pass the value of your `AUTH_API_KEY` secret as a bearer token:

```bash
curl -X GET "https://rough-meadow-97cf5.apps.<union-tenant>/" \
-H "Authorization: Bearer <AUTH_API_KEY>"
```
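The same call can be made from Python using only the standard library. This sketch builds the authenticated request (the endpoint URL and key shown are placeholders); only a real `urlopen` call would hit the network:

```python
import urllib.request

def build_authenticated_request(endpoint: str, api_key: str) -> urllib.request.Request:
    """Build a GET request carrying the API key as a bearer token."""
    return urllib.request.Request(
        endpoint,
        headers={"Authorization": f"Bearer {api_key}"},
        method="GET",
    )

req = build_authenticated_request(
    "https://rough-meadow-97cf5.apps.example.com/", "my-auth-api-key"
)
# To actually send the request:
# with urllib.request.urlopen(req) as resp:
#     print(resp.read())
```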

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/serving/cache-huggingface-model ===

# Cache a HuggingFace Model as an Artifact

This guide shows you how to cache HuggingFace models as Union Artifacts.

The [`union cache model-from-hf`](https://www.union.ai/docs/v1/union/api-reference/union-cli) command allows you to automatically download and cache models from HuggingFace Hub as Union Artifacts. This is particularly useful for serving large language models (LLMs) and other AI models efficiently in production environments.

## Why Cache Models from HuggingFace?

Caching models from HuggingFace Hub as Union Artifacts provides several key benefits:

- **Faster Model Downloads**: Once cached, models load much faster since they're stored in Union's optimized blob storage.
- **Stream model weights into GPU memory**: Union's [`SGLangApp`](https://www.union.ai/docs/v1/union/api-reference/union-sdk/packages/union.app.llm) and [`VLLMApp`](https://www.union.ai/docs/v1/union/api-reference/union-sdk/packages/union.app.llm) classes also allow you to load model weights
  directly into GPU memory instead of downloading the weights to disk first, then loading to GPU memory.
- **Reliability**: Eliminates dependency on HuggingFace Hub availability during model serving.
- **Cost Efficiency**: Reduces repeated downloads and bandwidth costs from HuggingFace Hub.
- **Version Control**: Each cached model gets a unique artifact ID for reproducible deployments.
- **Sharding Support**: Large models can be automatically sharded for distributed inference.
- **Streaming**: Models can be streamed directly from blob storage to GPU memory.

## Prerequisites

Before using the `union cache model-from-hf` command, you need to set up authentication:

1. **Create a HuggingFace API Token**:
   - Go to [HuggingFace Settings](https://huggingface.co/settings/tokens)
   - Create a new token with read access
   - Store it as a Union secret:
   ```bash
   union create secret --name HUGGINGFACE_TOKEN
   ```

2. **Create a Union API Key** (optional):
   ```bash
   union create api-key admin --name MY_API_KEY
   union create secret --name MY_API_KEY
   ```

If you don't want to create a Union API key, Union tenants typically ship with
an `EAGER_API_KEY` secret, an internally provisioned Union API key that you can
use for caching HuggingFace models.

## Basic Example: Cache a Model As-Is

The simplest way to cache a model is to download it directly from HuggingFace without any modifications:

```bash
union cache model-from-hf Qwen/Qwen2.5-0.5B-Instruct \
    --hf-token-key HUGGINGFACE_TOKEN \
    --union-api-key EAGER_API_KEY \
    --artifact-name qwen2-5-0-5b-instruct \
    --cpu 2 \
    --mem 8Gi \
    --ephemeral-storage 10Gi \
    --wait
```

### Command Breakdown

- `Qwen/Qwen2.5-0.5B-Instruct`: The HuggingFace model repository
- `--hf-token-key HUGGINGFACE_TOKEN`: Union secret containing your HuggingFace API token
- `--union-api-key EAGER_API_KEY`: Union secret with admin permissions
- `--artifact-name qwen2-5-0-5b-instruct`: Custom name for the cached artifact.
  If not provided, the model repository name is lower-cased and `.` characters are
  replaced with `-`.
- `--cpu 2`: CPU resources for downloading and caching
- `--mem 8Gi`: Memory resources for downloading and caching
- `--ephemeral-storage 10Gi`: Temporary storage for the download process
- `--wait`: Wait for the caching process to complete
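The default artifact name derivation described above can be sketched as a small helper. This is our own illustration of the rule (not the actual CLI code), and it assumes the organization prefix (e.g. `Qwen/`) is dropped:

```python
def default_artifact_name(repo_id: str) -> str:
    """Derive the default artifact name: lower-case the repo name and replace '.' with '-'."""
    model_name = repo_id.split("/")[-1]  # drop the organization prefix (an assumption)
    return model_name.lower().replace(".", "-")

print(default_artifact_name("Qwen/Qwen2.5-0.5B-Instruct"))  # qwen2-5-0-5b-instruct
```

This matches the `qwen2-5-0-5b-instruct` name used throughout this guide.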

### Output

When the command runs, you'll see outputs like this:

```
🔄 Started background process to cache model from Hugging Face repo Qwen/Qwen2.5-0.5B-Instruct.
Check the console for status at:
https://acme.union.ai/console/projects/flytesnacks/domains/development/executions/a5nr2g79xb9rtnzczqtp
```

You can then visit the URL to see the model caching workflow on the Union UI.

If you provide the `--wait` flag to the `union cache model-from-hf` command,
the command will wait for the model to be cached and then output additional
information:

```
Cached model at:
/tmp/flyte-axk70dc8/sandbox/local_flytekit/50b27158c2bb42efef8e60622a4d2b6d/model_snapshot
Model Artifact ID:
flyte://av0.2/acme/flytesnacks/development/qwen2-5-0-5b-instruct@322a60c7ba4df41621be528a053f3b1a

To deploy this model run:
union deploy model --project None --domain development flyte://av0.2/acme/flytesnacks/development/qwen2-5-0-5b-instruct@322a60c7ba4df41621be528a053f3b1a
```

## Using Cached Models in Applications

Once you have cached a model, you can use it in your Union serving apps:

### VLLM App Example

```python
from union import Artifact, Resources
from union.app.llm import VLLMApp
from flytekit.extras.accelerators import L4

# Use the cached model artifact
Model = Artifact(name="qwen2-5-0-5b-instruct")

vllm_app = VLLMApp(
    name="vllm-app-3",
    requests=Resources(cpu="12", mem="24Gi", gpu="1"),
    accelerator=L4,
    model=Model.query(),  # Query the cached artifact
    model_id="qwen2",
    scaledown_after=300,
    stream_model=True,
    port=8084,
)
```

### SGLang App Example

```python
from union import Artifact, Resources
from union.app.llm import SGLangApp
from flytekit.extras.accelerators import L4

# Use the cached model artifact
Model = Artifact(name="qwen2-5-0-5b-instruct")

sglang_app = SGLangApp(
    name="sglang-app-3",
    requests=Resources(cpu="12", mem="24Gi", gpu="1"),
    accelerator=L4,
    model=Model.query(),  # Query the cached artifact
    model_id="qwen2",
    scaledown_after=300,
    stream_model=True,
    port=8000,
)
```

## Advanced Example: Sharding a Model with the vLLM Engine

For large models that require distributed inference, you can use the `--shard-config` option to automatically shard the model using the [vLLM](https://docs.vllm.ai/en/latest/) inference engine.

### Create a Shard Configuration File

Create a YAML file (e.g., `shard_config.yaml`) with the sharding parameters:

```yaml
engine: vllm
args:
  model: unsloth/Llama-3.3-70B-Instruct
  tensor_parallel_size: 4
  gpu_memory_utilization: 0.9
  extra_args:
    max_model_len: 16384
```

The `shard_config.yaml` file is a YAML file that should conform to the
[`remote.ShardConfig`](https://www.union.ai/docs/v1/union/api-reference/union-sdk/packages/union.remote)
dataclass, where the `args` field contains configuration that's forwarded to the
underlying inference engine. Currently, only the `vLLM` engine is supported for sharding, so
the `args` field should conform to the [`remote.VLLMShardArgs`](https://www.union.ai/docs/v1/union/api-reference/union-sdk/packages/union.remote) dataclass.

### Cache the Sharded Model

```bash
union cache model-from-hf unsloth/Llama-3.3-70B-Instruct \
    --hf-token-key HUGGINGFACE_TOKEN \
    --union-api-key EAGER_API_KEY \
    --artifact-name llama-3-3-70b-instruct-sharded \
    --cpu 36 \
    --gpu 4 \
    --mem 300Gi \
    --ephemeral-storage 300Gi \
    --accelerator nvidia-l40s \
    --shard-config shard_config.yaml \
    --project flytesnacks \
    --domain development \
    --wait
```

## Best Practices

Keep the following in mind when caching models:

1. **Resource Sizing**: Allocate sufficient resources for the model size:
   - Small models (< 1B): 2-4 CPU, 4-8Gi memory
   - Medium models (1-7B): 4-8 CPU, 8-16Gi memory
   - Large models (7B+): 8+ CPU, 16Gi+ memory

2. **Sharding for Large Models**: Use tensor parallelism for models > 7B parameters:
   - 7-13B models: 2-4 GPUs
   - 13-70B models: 4-8 GPUs
   - 70B+ models: 8+ GPUs

3. **Storage Considerations**: Ensure sufficient ephemeral storage for the download process

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/serving/deploy-optimized-llm-endpoints ===

# Deploy Optimized LLM Endpoints with vLLM and SGLang

This guide shows you how to deploy high-performance LLM endpoints using SGLang
and vLLM. It also shows how to use Union's optimized serving images that are
designed to reduce cold start times and provide efficient model serving
capabilities.

For information on how to cache models from HuggingFace Hub as Union Artifacts,
see the [Cache a HuggingFace Model as an Artifact](./cache-huggingface-model) guide.

## Overview

Union provides two specialized app classes for serving high-performance LLM endpoints:

- **[`SGLangApp`](https://www.union.ai/docs/v1/union/api-reference/union-sdk/packages/union.app.llm)**: uses [SGLang](https://docs.sglang.ai/), a fast serving framework for large language models and vision language models.
- **[`VLLMApp`](https://www.union.ai/docs/v1/union/api-reference/union-sdk/packages/union.app.llm)**: uses [vLLM](https://docs.vllm.ai/en/latest/), a fast and easy-to-use library for LLM inference and serving.

By default, both classes provide:

- **Reduced cold start times** through optimized image loading.
- **Fast model loading** by streaming model weights directly from blob storage to GPU memory.
- **Distributed inference** with options for shared memory and tensor parallelism.

You can also serve models with other frameworks like [FastAPI](./serving-a-model), but doing so would require more
effort to achieve high performance, whereas vLLM and SGLang provide highly performant LLM endpoints out of the box.

## Basic Example: Deploy a Non-Sharded Model

### Deploy with vLLM

Assuming that you have followed the guide to [cache models from huggingface](./cache-huggingface-model)
and have a model artifact named `qwen2-5-0-5b-instruct`, you can deploy a simple LLM endpoint with the following code:

```python
# vllm_app.py

import union
from union.app.llm import VLLMApp
from flytekit.extras.accelerators import L4

# Reference the cached model artifact
Model = union.Artifact(name="qwen2-5-0-5b-instruct")

# Deploy with default image
vllm_app = VLLMApp(
    name="vllm-app",
    requests=union.Resources(cpu="12", mem="24Gi", gpu="1"),
    accelerator=L4,
    model=Model.query(),  # Query the cached artifact
    model_id="qwen2",
    scaledown_after=300,
    stream_model=True,  # Enable streaming for faster loading
    port=8084,
    requires_auth=False,
)
```

To use the optimized image, use the `OPTIMIZED_VLLM_IMAGE` variable:

```python
from union.app.llm import OPTIMIZED_VLLM_IMAGE

vllm_app = VLLMApp(
    name="vllm-app",
    container_image=OPTIMIZED_VLLM_IMAGE,
    ...
)
```

Here we're using a single L4 GPU to serve the model and specifying `stream_model=True`
to stream the model weights directly to GPU memory.

Deploy the app:

```bash
union deploy apps vllm_app.py vllm-app
```
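Once deployed, the app serves vLLM's OpenAI-compatible API, so you can call the endpoint at `/v1/chat/completions`. The sketch below (hostname is a placeholder) only constructs the request with the standard library; uncomment the `urlopen` call to actually send it:

```python
import json
import urllib.request

def build_chat_request(base_url: str, model_id: str, prompt: str) -> urllib.request.Request:
    """Build a chat-completion request against vLLM's OpenAI-compatible API."""
    payload = {
        "model": model_id,  # must match the model_id passed to VLLMApp
        "messages": [{"role": "user", "content": prompt}],
    }
    return urllib.request.Request(
        f"{base_url}/v1/chat/completions",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_chat_request("https://vllm-app.apps.example.com", "qwen2", "Hello!")
# To actually send the request:
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp)["choices"][0]["message"]["content"])
```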

### Deploy with SGLang

```python
# sglang_app.py

import union
from union.app.llm import SGLangApp
from flytekit.extras.accelerators import L4

# Reference the cached model artifact
Model = union.Artifact(name="qwen2-5-0-5b-instruct")

# Deploy with default image
sglang_app = SGLangApp(
    name="sglang-app",
    requests=union.Resources(cpu="12", mem="24Gi", gpu="1"),
    accelerator=L4,
    model=Model.query(),  # Query the cached artifact
    model_id="qwen2",
    scaledown_after=300,
    stream_model=True,  # Enable streaming for faster loading
    port=8000,
    requires_auth=False,
)
```

To use the optimized image, use the `OPTIMIZED_SGLANG_IMAGE` variable:

```python
from union.app.llm import OPTIMIZED_SGLANG_IMAGE

sglang_app = SGLangApp(
    name="sglang-app",
    container_image=OPTIMIZED_SGLANG_IMAGE,
    ...
)
```

Deploy the app:

```bash
union deploy apps sglang_app.py sglang-app
```

## Custom Image Example: Deploy with Your Own Image

If you need more control over the serving environment, you can define a custom `ImageSpec`.
For vLLM apps, that would look like this:

```python
import union
from union.app.llm import VLLMApp
from flytekit.extras.accelerators import L4

# Reference the cached model artifact
Model = union.Artifact(name="qwen2-5-0-5b-instruct")

# Define custom optimized image
image = union.ImageSpec(
    name="vllm-serving-custom",
    builder="union",
    apt_packages=["build-essential"],
    packages=["union[vllm]>=0.1.189"],
    env={
        "NCCL_DEBUG": "INFO",
        "CUDA_LAUNCH_BLOCKING": "1",
    },
)

# Deploy with custom image
vllm_app = VLLMApp(
    name="vllm-app-custom",
    container_image=image,
    ...
)
```

And for SGLang apps, it would look like this:

```python
# sglang_app.py

import union
from union.app.llm import SGLangApp
from flytekit.extras.accelerators import L4

# Reference the cached model artifact
Model = union.Artifact(name="qwen2-5-0-5b-instruct")

# Define custom optimized image
image = union.ImageSpec(
    name="sglang-serving-custom",
    builder="union",
    python_version="3.12",
    apt_packages=["build-essential"],
    packages=["union[sglang]>=0.1.189"],
)

# Deploy with custom image
sglang_app = SGLangApp(
    name="sglang-app-custom",
    container_image=image,
    ...
)
```

This gives you control over the exact package versions in the image, at the cost
of increased cold start times: the Union-provided images are optimized with
[Nydus](https://github.com/dragonflyoss/nydus), which reduces cold start time by
streaming container image layers, allowing the container to start before the
image is fully downloaded.

## Advanced Example: Deploy a Sharded Model

For large models that require distributed inference, deploy using a sharded model artifact:

### Cache a Sharded Model

Cache a large model with sharding (see [Cache a HuggingFace Model as an Artifact](./cache-huggingface-model#advanced-example-sharding-a-model-with-the-vllm-engine) for details).
Start by creating a shard configuration file:

```yaml
# shard_config.yaml
engine: vllm
args:
  model: unsloth/Llama-3.3-70B-Instruct
  tensor_parallel_size: 4
  gpu_memory_utilization: 0.9
  extra_args:
    max_model_len: 16384
```

Then cache the model:

```bash
union cache model-from-hf unsloth/Llama-3.3-70B-Instruct \
    --hf-token-key HUGGINGFACE_TOKEN \
    --union-api-key EAGER_API_KEY \
    --artifact-name llama-3-3-70b-instruct-sharded \
    --cpu 36 \
    --gpu 4 \
    --mem 300Gi \
    --ephemeral-storage 300Gi \
    --accelerator nvidia-l40s \
    --shard-config shard_config.yaml \
    --project flytesnacks \
    --domain development \
    --wait
```

### Deploy with VLLMApp

Once the model is cached, you can deploy it to a vLLM app:

```python
# vllm_app_sharded.py

from flytekit.extras.accelerators import L40S
from union import Artifact, Resources
from union.app.llm import VLLMApp

# Reference the sharded model artifact
LLMArtifact = Artifact(name="llama-3-3-70b-instruct-sharded")

# Deploy sharded model with optimized configuration
vllm_app = VLLMApp(
    name="vllm-app-sharded",
    requests=Resources(
        cpu="36",
        mem="300Gi",
        gpu="4",
        ephemeral_storage="300Gi",
    ),
    accelerator=L40S,
    model=LLMArtifact.query(),
    model_id="llama3",

    # Additional arguments to pass into the vLLM engine:
    # see https://docs.vllm.ai/en/stable/serving/engine_args.html
    # or run `vllm serve --help` to see all available arguments
    extra_args=[
        "--tensor-parallel-size", "4",
        "--gpu-memory-utilization", "0.8",
        "--max-model-len", "4096",
        "--max-num-seqs", "256",
        "--enforce-eager",
    ],
    env={
        "NCCL_DEBUG": "INFO",
        "CUDA_LAUNCH_BLOCKING": "1",
        "VLLM_SKIP_P2P_CHECK": "1",
    },
    shared_memory=True,  # Enable shared memory for multi-GPU
    scaledown_after=300,
    stream_model=True,
    port=8084,
    requires_auth=False,
)
```

Then deploy the app:

```bash
union deploy apps vllm_app_sharded.py vllm-app-sharded
```

### Deploy with SGLangApp

You can also deploy the sharded model to an SGLang app:

```python
from flytekit.extras.accelerators import GPUAccelerator
from union import Artifact, Resources
from union.app.llm import SGLangApp

# Reference the sharded model artifact
LLMArtifact = Artifact(name="llama-3-3-70b-instruct-sharded")

# Deploy sharded model with SGLang
sglang_app = SGLangApp(
    name="sglang-app-sharded",
    requests=Resources(
        cpu="36",
        mem="300Gi",
        gpu="4",
        ephemeral_storage="300Gi",
    ),
    accelerator=GPUAccelerator("nvidia-l40s"),
    model=LLMArtifact.query(),
    model_id="llama3",

    # Additional arguments to pass into the SGLang engine:
    # See https://docs.sglang.ai/backend/server_arguments.html for details.
    extra_args=[
        "--tensor-parallel-size", "4",
        "--mem-fraction-static", "0.8",
    ],
    env={
        "NCCL_DEBUG": "INFO",
        "CUDA_LAUNCH_BLOCKING": "1",
    },
    shared_memory=True,
    scaledown_after=300,
    stream_model=True,
    port=8084,
    requires_auth=False,
)
```

Then deploy the app:

```bash
union deploy apps sglang_app_sharded.py sglang-app-sharded
```

## Authentication via API Key

To secure your `SGLangApp`s and `VLLMApp`s with API key authentication, you can
specify a secret in the `extra_args` parameter. First, create a secret:

```bash
union create secret --name AUTH_SECRET
```

Add the secret value to the input field and save the secret.

Then, add the secret to the `extra_args` parameter. For SGLang, do the following:

```python
from union import Secret

sglang_app = SGLangApp(
    name="sglang-app",
    ...,
    # Disable Union's platform-level authentication so you can access the
    # endpoint in the public internet
    requires_auth=False,
    secrets=[Secret(key="AUTH_SECRET", env_var="AUTH_SECRET")],
    extra_args=[
        ...,
        "--api-key", "$AUTH_SECRET",  # Use the secret in the extra_args
    ],
)
```

And similarly for vLLM, do the following:

```python
from union import Secret

vllm_app = VLLMApp(
    name="vllm-app",
    ...,
    # Disable Union's platform-level authentication so you can access the
    # endpoint in the public internet
    requires_auth=False,
    secrets=[Secret(key="AUTH_SECRET", env_var="AUTH_SECRET")],
    extra_args=[
        ...,
        "--api-key", "$AUTH_SECRET",  # Use the secret in the extra_args
    ],
)
```

## Performance Tuning

You can refer to the corresponding documentation for vLLM and SGLang for more
information on how to tune the performance of your app.

- **vLLM**: see the [optimization and tuning](https://docs.vllm.ai/en/latest/configuration/optimization.html) and [engine arguments](https://docs.vllm.ai/en/latest/configuration/engine_args.html) pages to learn about how to tune the performance of your app. You can also look at the [distributed inference and serving](https://docs.vllm.ai/en/latest/serving/distributed_serving.html) page to learn more about distributed inference.
- **SGLang**: see the [environment variables](https://docs.sglang.ai/references/environment_variables.html#performance-tuning) and [server arguments](https://docs.sglang.ai/backend/server_arguments.html) pages to learn about all of the available serving
options in SGLang.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/serving/deploying-your-connector ===

# Deploying Custom Flyte Connectors

[Flyte connectors](https://www.union.ai/docs/v1/union/user-guide/integrations/connectors/_index) allow you to extend Union's capabilities by integrating with external services.
This guide explains how to deploy custom connectors that can be used in your Flyte workflows.

## Overview

Connectors enable your workflows to interact with third-party services or systems.
Union.ai supports deploying connectors as services using the `FlyteConnectorApp` class. You can deploy connectors in two ways:

1. **Module-based deployment**: Include your connector code directly in the deployment
2. **ImageSpec-based deployment**: Use pre-built images with connectors already installed

## Prerequisites

Before deploying a connector, ensure you have:

- A Union.ai account
- Any required API keys or credentials for your connector
- Docker registry access (if using custom images)

## Connector Deployment Options

### Module-based Deployment

Module-based deployment is ideal when you want to iterate quickly on connector development. With this approach, you include your connector code directly using the `include` parameter.

```python
# app.py

from union import ImageSpec, Resources, Secret
from union.app import FlyteConnectorApp

image = ImageSpec(
    name="flyteconnector",
    packages=[
        "flytekit[connector]",
        "union",
        "union-runtime",
        "openai",  # ChatGPT connector needs openai SDK
    ],
    env={"FLYTE_SDK_LOGGING_LEVEL": "10"},
    builder="union",
)

openai_connector_app = FlyteConnectorApp(
    name="openai-connector-app",
    container_image=image,
    secrets=[Secret(key="flyte_openai_api_key")],
    limits=Resources(cpu="1", mem="1Gi"),
    include=["./chatgpt"],  # Include the connector module directory
)
```

With this approach, you organize your connector code in a module structure:

```bash
chatgpt/
├── __init__.py
├── connector.py
└── constants.py
```

The `include` parameter takes a list of files or directories to include in the deployment.

### ImageSpec-based Deployment

ImageSpec-based deployment is preferred for production environments where you have stable connector implementations. In this approach, your connector code is pre-installed in a container image.

```python
# app.py

from union import ImageSpec, Resources, Secret
from union.app import FlyteConnectorApp

image = ImageSpec(
    name="flyteconnector",
    packages=[
        "flytekit[connector]",
        "flytekitplugins-slurm",
        "union",
        "union-runtime",
    ],
    apt_packages=["build-essential", "libmagic1", "vim", "openssh-client", "ca-certificates"],
    env={"FLYTE_SDK_LOGGING_LEVEL": "10"},
    builder="union",
)

slurm_connector_app = FlyteConnectorApp(
    name="slurm-connector-app",
    container_image=image,
    secrets=[Secret(key="flyte_slurm_private_key")],
    limits=Resources(cpu="1", mem="1Gi"),
)
```

## Managing Secrets

Most connectors require credentials to authenticate with external services. Union.ai allows you to manage these securely:

```bash
# Create a secret for OpenAI API key
union create secret flyte_openai_api_key -f /etc/secrets/flyte_openai_api_key --project flytesnacks --domain development

# Create a secret for SLURM access
union create secret flyte_slurm_private_key -f /etc/secrets/flyte_slurm_private_key --project flytesnacks --domain development
```

Reference these secrets in your connector app:

```python
from union import Secret

# In your app definition
secrets=[Secret(key="flyte_openai_api_key")]
```

Inside your connector code, access these secrets using:

```python
from flytekit.extend.backend.utils import get_connector_secret

api_key = get_connector_secret(secret_key="FLYTE_OPENAI_API_KEY")
```

## Example: Creating a ChatGPT Connector

Here's how to implement a ChatGPT connector:

1. Create a connector class:

```python
# chatgpt/connector.py

import asyncio
import logging
from typing import Optional

import openai
from flyteidl.core.execution_pb2 import TaskExecution
from flytekit import FlyteContextManager
from flytekit.core.type_engine import TypeEngine
from flytekit.extend.backend.base_connector import ConnectorRegistry, Resource, SyncConnectorBase
from flytekit.extend.backend.utils import get_connector_secret
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate

from .constants import OPENAI_API_KEY, TIMEOUT_SECONDS

class ChatGPTConnector(SyncConnectorBase):
    name = "ChatGPT Connector"

    def __init__(self):
        super().__init__(task_type_name="chatgpt")

    async def do(
        self,
        task_template: TaskTemplate,
        inputs: Optional[LiteralMap] = None,
        **kwargs,
    ) -> Resource:
        ctx = FlyteContextManager.current_context()
        input_python_value = TypeEngine.literal_map_to_kwargs(ctx, inputs, {"message": str})
        message = input_python_value["message"]

        custom = task_template.custom
        custom["chatgpt_config"]["messages"] = [{"role": "user", "content": message}]
        client = openai.AsyncOpenAI(
            organization=custom["openai_organization"],
            api_key=get_connector_secret(secret_key=OPENAI_API_KEY),
        )

        logger = logging.getLogger("httpx")
        logger.setLevel(logging.WARNING)

        completion = await asyncio.wait_for(client.chat.completions.create(**custom["chatgpt_config"]), TIMEOUT_SECONDS)
        message = completion.choices[0].message.content
        outputs = {"o0": message}

        return Resource(phase=TaskExecution.SUCCEEDED, outputs=outputs)

ConnectorRegistry.register(ChatGPTConnector())
```

2. Define constants:

```python
# chatgpt/constants.py

# Constants for ChatGPT connector
TIMEOUT_SECONDS = 10
OPENAI_API_KEY = "FLYTE_OPENAI_API_KEY"
```

3. Create an `__init__.py` file:

```python
# chatgpt/__init__.py

from .connector import ChatGPTConnector

__all__ = ["ChatGPTConnector"]
```

## Using the Connector in a Workflow

After deploying your connector, you can use it in your workflows:

```python
# workflow.py

from flytekit import workflow
from flytekitplugins.openai import ChatGPTTask

chatgpt_small_job = ChatGPTTask(
    name="3.5-turbo",
    chatgpt_config={
        "model": "gpt-3.5-turbo",
        "temperature": 0.7,
    },
)

chatgpt_big_job = ChatGPTTask(
    name="gpt-4",
    chatgpt_config={
        "model": "gpt-4",
        "temperature": 0.7,
    },
)

@workflow
def wf(message: str) -> str:
    message = chatgpt_small_job(message=message)
    message = chatgpt_big_job(message=message)
    return message
```

Run the workflow:

```bash
union run --remote workflow.py wf --message "Tell me about Union.ai"
```

## Creating Your Own Connector

To create a custom connector:

1. Inherit from `SyncConnectorBase` or `AsyncConnectorBase`
2. Implement the required methods (`do` for synchronous connectors, `create`, `get`, and `delete` for asynchronous connectors)
3. Register your connector with `ConnectorRegistry.register(YourConnector())`
4. Deploy your connector using one of the methods above
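To make the async lifecycle concrete, here is a toy, framework-free sketch of the `create`/`get`/`delete` contract (plain Python with no flytekit imports, so class and method shapes are illustrative rather than the exact `AsyncConnectorBase` signatures): `create` submits the remote job and returns a handle, `get` is polled until a terminal phase, and `delete` cleans up.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class ResourceMeta:
    job_id: str  # handle returned by create() and passed to get()/delete()

class ToyAsyncConnector:
    """Minimal stand-in for an async connector's create/get/delete contract."""

    def __init__(self):
        self._jobs: dict[str, int] = {}

    async def create(self, job_id: str) -> ResourceMeta:
        self._jobs[job_id] = 0  # pretend we submitted a remote job
        return ResourceMeta(job_id=job_id)

    async def get(self, meta: ResourceMeta) -> str:
        self._jobs[meta.job_id] += 1  # pretend the remote job makes progress
        return "SUCCEEDED" if self._jobs[meta.job_id] >= 3 else "RUNNING"

    async def delete(self, meta: ResourceMeta) -> None:
        self._jobs.pop(meta.job_id, None)

async def run_to_completion(connector: ToyAsyncConnector) -> str:
    meta = await connector.create("job-1")
    while (phase := await connector.get(meta)) == "RUNNING":
        await asyncio.sleep(0)  # a real connector would poll with a backoff
    await connector.delete(meta)
    return phase

print(asyncio.run(run_to_completion(ToyAsyncConnector())))  # SUCCEEDED
```

The polling loop here is what the platform performs for you between `create` and the terminal `get` result.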

## Deployment Commands

Deploy your connector app:

```bash
# Module-based deployment
union deploy apps app_module_deployment/app.py openai-connector-app

# ImageSpec-based deployment
union deploy apps app_image_spec_deployment/app.py slurm-connector-app
```

## Best Practices

1. **Security**: Never hardcode credentials; always use Union.ai secrets
2. **Error Handling**: Include robust error handling in your connector implementation
3. **Timeouts**: Set appropriate timeouts for external API calls
4. **Logging**: Implement detailed logging for debugging
5. **Testing**: Test your connector thoroughly before deploying to production

By following this guide, you can create and deploy custom connectors that extend Union.ai's capabilities to integrate with any external service or system your workflows need to interact with.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/caching ===

# Caching

Union.ai allows you to cache the output of nodes ([tasks](./tasks), [subworkflows, and sub-launch plans](./workflows/subworkflows-and-sub-launch-plans)) to make subsequent executions faster.

Caching is useful when many executions of identical code with the same input may occur.

Here's a video with a brief explanation and demo, focused on task caching:

📺 [Watch on YouTube](https://www.youtube.com/watch?v=WNkThCp-gqo)

> [!NOTE]
> * Caching is available and can be individually enabled for all nodes *within* a workflow directed acyclic graph (DAG).
> * Nodes in this sense include tasks, subworkflows (workflows called directly within another workflow), and sub-launch plans (launch plans called within a workflow).
> * Caching is *not available* for top-level workflows or launch plans (that is, those invoked from UI or CLI).
> * By default, caching is *disabled* on all tasks, subworkflows and sub-launch plans, to avoid unintended consequences when caching executions with side effects. It must be explicitly enabled on any node where caching is desired.

## Enabling and configuring caching

Caching can be enabled by setting the `cache` parameter of the `@union.task` (for tasks) decorator or `with_overrides` method (for subworkflows or sub-launch plans) to a `Cache` object. The parameters of the `Cache` object are used to configure the caching behavior.
For example:

```python
import union

# Define a task and enable caching for it

@union.task(cache=union.Cache(version="1.0", serialize=True, ignored_inputs=["a"]))
def sum(a: int, b: int, c: int) -> int:
    return a + b + c

# Define a workflow to be used as a subworkflow

@union.workflow
def child_wf(a: int, b: int, c: int) -> list[int]:
    return [
        sum(a=a, b=b, c=c)
        for _ in range(5)
    ]

# Define a launch plan to be used as a sub-launch plan

child_lp = union.LaunchPlan.get_or_create(child_wf)

# Define a parent workflow that uses the subworkflow

@union.workflow
def parent_wf_with_subwf(input: int = 0):
    return [
        # Enable caching on the subworkflow
        child_wf(a=input, b=3, c=4).with_overrides(cache=union.Cache(version="1.0", serialize=True, ignored_inputs=["a"]))
        for i in [1, 2, 3]
    ]

# Define a parent workflow that uses the sub-launch plan

@union.workflow
def parent_wf_with_sublp(input: int = 0):
    return [
        child_lp(a=input, b=1, c=2).with_overrides(cache=union.Cache(version="1.0", serialize=True, ignored_inputs=["a"]))
        for i in [1, 2, 3]
    ]
```

In the above example, caching is enabled at multiple levels:

* At the task level, in the `@union.task` decorator of the task `sum`.
* At the workflow level, in the `with_overrides` method of the invocation of the workflow `child_wf`.
* At the launch plan level, in the `with_overrides` method of the invocation of the launch plan `child_lp`.

In each case, the result of the execution is cached and reused in subsequent executions.
Here the reuse is demonstrated by calling the `child_wf` and `child_lp` workflows multiple times with the same inputs.
Additionally, if the same node is invoked again with the same inputs (excluding input `a`, which is ignored when computing the cache key),
the cached result is returned immediately instead of re-executing the process.
This applies even if the cached node is invoked externally through the UI or CLI.

## The `Cache` object

The [Cache]() object takes the following parameters:
<!-- TODO: Add link to API -->

* `version` (`Optional[str]`): Part of the cache key.
  A change to this parameter from one invocation to the next will invalidate the cache.
  This allows you to explicitly indicate when a change has been made to the node that should invalidate any existing cached results.
  Note that this is not the only change that will invalidate the cache (see below).
  Also, note that you can manually trigger cache invalidation per execution using the **Core concepts > Caching > The `overwrite-cache` flag**.

  If not set, the version will be generated based on the specified cache policies.
  When using `cache=True`, **Core concepts > Caching > Enabling caching with the default configuration**, the **Core concepts > Caching > Default cache policy** generates the version.

* `serialize` (`bool`): Enables or disables **Core concepts > Caching > Cache serialization**.
  When enabled, Union.ai ensures that a single instance of the node is run before any other instances that would otherwise run concurrently.
  This allows the initial instance to cache its result and lets the later instances reuse the resulting cached outputs.
  If not set, cache serialization is disabled.

* `ignored_inputs` (`Union[Tuple[str, ...], str]`): Input variables that should not be included when calculating the hash for the cache.
  If not set, no inputs are ignored.

* `policies` (`Optional[Union[List[CachePolicy], CachePolicy]]`): A list of [CachePolicy]() objects used for automatic version generation.
  If no `version` is specified and one or more policies are specified then these policies automatically generate the version.
  Policies are applied in the order they are specified to produce the final `version`.
  If no `version` is specified and no policies are specified then the **Core concepts > Caching > Default cache policy** generates the version.
  When using `cache=True`, **Core concepts > Caching > Enabling caching with the default configuration**, the **Core concepts > Caching > Default cache policy** generates the version.

* `salt` (`str`): A [salt](<https://en.wikipedia.org/wiki/salt_(cryptography)>) used in the hash generation. A salt is a random value that is combined with the input values before hashing.
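
To make the role of the `salt` concrete, here is a minimal sketch in plain Python of how a salt feeds into input hashing. This is illustrative only, not Union.ai's actual implementation; `input_hash` is an invented helper:

```python
import hashlib

def input_hash(inputs: dict, salt: str = "") -> str:
    # Hypothetical helper: combine the salt with the input values before hashing.
    material = salt + repr(sorted(inputs.items()))
    return hashlib.sha256(material.encode()).hexdigest()

# Identical inputs hash identically, so a rerun would be a cache hit...
assert input_hash({"a": 1, "b": 2}) == input_hash({"a": 1, "b": 2})

# ...but changing the salt changes the hash, and therefore the cache key.
assert input_hash({"a": 1, "b": 2}, salt="v2") != input_hash({"a": 1, "b": 2})
```

Because the salt is folded into the hash, two otherwise-identical cache configurations with different salts will never share cache entries.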

## Enabling caching with the default configuration

Instead of specifying a `Cache` object, a simpler way to enable caching is to set `cache=True` in the `@union.task` decorator (for tasks) or the `with_overrides` method (for subworkflows and sub-launch plans).

When `cache=True` is set, caching is enabled with the following configuration:

* `version` is automatically generated by the **Core concepts > Caching > Default cache policy**.
* `serialize` is set to `False`.
* `ignored_inputs` is not set. No parameters are ignored.

You can convert the example above to use the default configuration throughout by changing each instance of `cache=union.Cache(...)` to `cache=True`. For example, the task `sum` would now be:

```python
@union.task(cache=True)
def sum(a: int, b: int, c: int) -> int:
    return a + b + c
```

## Automatic version generation

Automatic version generation is useful when you want to generate the version based on the function body of the task, or other criteria.

You can enable automatic version generation by specifying `cache=Cache(...)` with one or more `CachePolicy` classes in the `policies` parameter of the `Cache` object (and by not specifying an explicit `version` parameter), like this:

```python
@union.task(cache=Cache(policies=[CacheFunctionBody()]))
def sum(a: int, b: int, c: int) -> int:
    return a + b + c
```

Alternatively, you can simply use the default configuration by specifying `cache=True`. In this case the default cache policy is used to generate the version.

## Default cache policy

Automatic version generation using the default cache policy is used

* if you set `cache=True`, or
* if you set `cache=Cache(...)` but do not specify an explicit `version` or `policies` parameter within the `Cache` object.

The default cache policy is `union.cache.CacheFunctionBody`.
This policy generates a version by hashing the text of the function body of the task.
This means that if the code in the function body changes, the version changes, and the cache is invalidated. Note that `CacheFunctionBody` does not recursively check for changes in functions or classes referenced in the function body.
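
The idea can be sketched in plain Python. This is a conceptual analog of what `CacheFunctionBody` does, not its actual implementation (the real policy's normalization details may differ); `version_from_body`, `helper`, and `my_task` are invented names:

```python
import hashlib
import inspect

def version_from_body(fn) -> str:
    # Hash the task function's own source text to derive a cache version.
    return hashlib.sha256(inspect.getsource(fn).encode()).hexdigest()

def helper(x: int) -> int:
    return x + 1

def my_task(n: int) -> int:
    return helper(n) * 2

# The generated version depends only on `my_task`'s own source text.
# Editing `helper` would not change it, so the cache would NOT be
# invalidated -- the non-recursive limitation noted above.
version = version_from_body(my_task)
assert version == version_from_body(my_task)  # deterministic across calls
```

This is why changes to helper functions or classes that a cached task depends on may require an explicit `version` bump to invalidate stale results.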

## The `overwrite-cache` flag

When launching the execution of a workflow, launch plan or task, you can use the `overwrite-cache` flag to invalidate the cache and force re-execution.

### Overwrite cache on the command line

The `overwrite-cache` flag can be used from the command line with the `union run` command. For example:

```shell
$ union run --remote --overwrite-cache example.py wf
```

### Overwrite cache in the UI

You can also trigger cache invalidation when launching an execution from the UI by checking **Override** in the launch dialog:

![Overwrite cache flag in the UI](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/caching/overwrite-cached-outputs.png)

### Overwrite cache programmatically

When using `UnionRemote`, you can use the `overwrite_cache` parameter in the [`UnionRemote.execute`]() method:
<!-- TODO: Add link to API -->

```python
from flytekit.configuration import Config
from union.remote import UnionRemote

remote = UnionRemote(
    config=Config.auto(),
    default_project="flytesnacks",
    default_domain="development"
)

wf = remote.fetch_workflow(name="workflows.example.wf")
execution = remote.execute(wf, inputs={"name": "Kermit"}, overwrite_cache=True)
```

## How caching works

When a node (with caching enabled) completes on Union.ai, a **key-value entry** is created in the **caching table**. The **value** of the entry is the output.
The **key** is composed of:

* **Project:** A task run under one project cannot use the cached results of the same task run under another project. Sharing cache entries across projects could produce unintended results between project teams and lead to data corruption.

* **Domain:** To separate test, staging, and production data, task executions are not shared across these environments.

* **Cache Version:** The cache version is either explicitly set using the `version` parameter in the `Cache` object or automatically set by a cache policy (see **Core concepts > Caching > Automatic version generation**).
  If the version changes (either explicitly or automatically), the cache entry is invalidated.

* **Node signature:** The cache is specific to the signature associated with the execution.
  The signature comprises the name, input parameter names/types, and the output parameter name/type of the node.
  If the signature changes, the cache entry is invalidated.
* **Input values:** A well-formed Union.ai node always produces deterministic outputs.
  This means that, given a set of input values, every execution should have identical outputs.
  When an execution is cached, the input values are part of the cache key.
  If a node is run with a new set of inputs, a new cache entry is created for the combination of that particular entity with those particular inputs.

The result is that within a given project and domain, a cache entry is created for each distinct combination of name, signature, cache version, and input set for every node that has caching enabled.
If the same node with the same input values is encountered again, the cached output is used instead of running the process again.
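
The key composition described above can be sketched in plain Python. This is illustrative only, not Union.ai's actual key format; `cache_key` and the signature string are invented for the example:

```python
import hashlib

def cache_key(project: str, domain: str, cache_version: str,
              signature: str, inputs: dict) -> str:
    # Hypothetical key covering every component described above.
    parts = [project, domain, cache_version, signature,
             repr(sorted(inputs.items()))]
    return hashlib.sha256("|".join(parts).encode()).hexdigest()

sig = "sum(a: int, b: int, c: int) -> int"
k1 = cache_key("flytesnacks", "development", "1.0", sig, {"a": 1, "b": 2, "c": 3})

# Same project/domain/version/signature/inputs -> same key -> cache hit.
assert k1 == cache_key("flytesnacks", "development", "1.0", sig,
                       {"a": 1, "b": 2, "c": 3})

# Changing any single component -> different key -> cache miss.
assert k1 != cache_key("flytesnacks", "staging", "1.0", sig, {"a": 1, "b": 2, "c": 3})
assert k1 != cache_key("flytesnacks", "development", "1.1", sig, {"a": 1, "b": 2, "c": 3})
assert k1 != cache_key("flytesnacks", "development", "1.0", sig, {"a": 9, "b": 2, "c": 3})
```
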

### Explicit cache version

When a change to code is made that should invalidate the cache for that node, you can explicitly indicate this by incrementing the `version` parameter value.
For a task example, see below. (For workflows and launch plans, the parameter would be specified in the `with_overrides` method.)

```python
@union.task(cache=union.Cache(version="1.1"))
def t(n: int) -> int:
    return n * n + 1
```

Here the `version` parameter has been bumped from `1.0` to `1.1`, invalidating the existing cache.
The next time the task is called it will be executed and the result re-cached under an updated key.
However, if you change the version back to `1.0`, you will get a "cache hit" again and skip the execution of the task code.

If used, the `version` parameter must be explicitly changed in order to invalidate the cache.

If not used, then a cache policy may be specified to generate the version, or you can rely on the default cache policy.

Not every Git revision of a node will necessarily invalidate the cache.
A change in Git SHA does not necessarily correlate to a change in functionality.
You can refine your code without invalidating the cache as long as you explicitly use, and don't change, the `version` parameter (or the signature, see below) of the node.

The idea behind this is to decouple cosmetic changes (for example, updated documentation or renamed variables) from changes to logic that can affect the process's result.
When you use Git (or any version control system), you have a new version per code change.
Since the behavior of most nodes in a Git repository will remain unchanged, you don't want their cached outputs to be lost.

When a node's behavior does change though, you can bump `version` to invalidate the cache entry and make the system recompute the outputs.

### Node signature

If you modify the signature of a node by adding, removing, or editing input parameters or output return types, Union.ai invalidates the cache entries for that node.
During the next execution, Union.ai executes the process again and caches the outputs as new values stored under an updated key.

### Caching when running locally

The description above applies to caching when executing a node remotely on your Union.ai cluster.
Caching is also available [when running on a local cluster](https://www.union.ai/docs/v1/union/user-guide/development-cycle/running-in-a-local-cluster).

When running locally the caching mechanism is the same except that the cache key does not include **project** or **domain** (since there are none).
The cache key is composed only of **cache version**, **signature**, and **inputs**.
The results of local executions are stored under `~/.flyte/local-cache/`.

Similar to the remote case, a local cache entry for a node will be invalidated if either the `cache_version` or the signature is modified.
In addition, the local cache can also be emptied by running

```shell
$ union local-cache clear
```

This removes the contents of the `~/.flyte/local-cache/` directory.

Occasionally, you may want to disable the local cache for testing purposes, without making any code changes to your task decorators. You can set the `FLYTE_LOCAL_CACHE_ENABLED` environment variable to `false` in your terminal in order to bypass caching temporarily. 

## Cache serialization

Cache serialization means only executing a single instance of a unique cacheable task (determined by the `cache_version` parameter and task signature) at a time.
Using this mechanism, Union.ai ensures that during multiple concurrent executions of a task only a single instance is evaluated, and all others wait until completion and reuse the resulting cached outputs.

Ensuring serialized evaluation requires a small degree of overhead to coordinate executions using a lightweight artifact reservation system.
Therefore, this should be viewed as an extension to rather than a replacement for non-serialized cacheable tasks.
It is particularly well fit for long-running or otherwise computationally expensive tasks executed in scenarios similar to the following examples:

* Periodically scheduled workflow where a single task evaluation duration may span multiple scheduled executions.
* Running a commonly shared task within different workflows (which receive the same inputs).

### Enabling cache serialization

Task cache serialization is disabled by default to avoid unexpected behavior for task executions.
To enable it, set `serialize=True` in the `Cache` object passed to the `@union.task` decorator.
The cache key definitions follow the same rules as non-serialized cacheable tasks.

```python
@union.task(cache=union.Cache(version="1.1", serialize=True))
def t(n: int) -> int:
    return n * n
```

In the above example calling `t(n=2)` multiple times concurrently (even in different executions or workflows) will only execute the multiplication operation once.
Concurrently evaluated tasks will wait for completion of the first instance before reusing the cached results and subsequent evaluations will instantly reuse existing cache results.

### How does cache serialization work?

The cache serialization paradigm introduces a new artifact reservation system. Executions with cache serialization enabled use this reservation system to acquire an artifact reservation, indicating that they are actively evaluating a node, and release the reservation once the execution is completed.
Union.ai uses a clock-skew algorithm to define reservation timeouts. Therefore, executions are required to periodically extend the reservation during their run.

The first execution of a serializable node will successfully acquire the artifact reservation.
Execution will be performed as usual and upon completion, the results are written to the cache, and the reservation is released.
Concurrently executed node instances (those that would otherwise run in parallel with the initial execution) will observe an active reservation, in which case these instances will wait until the next reevaluation and perform another check.
Once the initial execution completes, they will reuse the cached results as will any subsequent instances of the same node.

Union.ai handles execution failures using a timeout on the reservation.
If the execution currently holding the reservation fails to extend it before it times out, another execution may acquire the reservation and begin processing.
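
The reservation flow above can be sketched as a small state machine. This is a simplified illustration of the acquire/extend/timeout logic, not Union.ai's implementation; `ReservationStore` and its methods are invented names, and `now` is passed explicitly instead of using a real clock:

```python
class ReservationStore:
    """Toy artifact-reservation store with timeouts (illustrative only)."""

    def __init__(self, timeout: float):
        self.timeout = timeout
        self._held = {}  # cache key -> (owner id, expiry time)

    def acquire(self, key: str, owner: str, now: float) -> bool:
        holder = self._held.get(key)
        # Grant if the reservation is free, expired, or already ours
        # (re-acquiring by the holder acts as the periodic extension).
        if holder is None or holder[1] <= now or holder[0] == owner:
            self._held[key] = (owner, now + self.timeout)
            return True
        return False

store = ReservationStore(timeout=10.0)
assert store.acquire("task-x", "exec-1", now=0.0)       # first execution wins
assert not store.acquire("task-x", "exec-2", now=5.0)   # a concurrent one must wait
assert store.acquire("task-x", "exec-1", now=5.0)       # heartbeat extends to t=15
assert not store.acquire("task-x", "exec-2", now=12.0)  # still held after extension
assert store.acquire("task-x", "exec-2", now=20.0)      # holder timed out; taken over
```

The last assertion mirrors the failure-handling behavior: once the holder stops extending its reservation, another execution can take over after the timeout.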

## Caching of offloaded objects

In some cases, the default behavior displayed by Union.ai’s caching feature might not match the user's intuition.
For example, this code makes use of pandas dataframes:

```python
@union.task
def foo(a: int, b: str) -> pandas.DataFrame:
    df = pandas.DataFrame(...)
    ...
    return df

@union.task(cache=True)
def bar(df: pandas.DataFrame) -> int:
    ...

@union.workflow
def wf(a: int, b: str):
    df = foo(a=a, b=b)
    v = bar(df=df)
```

If run twice with the same inputs, one would expect that `bar` would trigger a cache hit, but that’s not the case because of the way dataframes are represented in Union.ai.

However, Union.ai provides a new way to control the caching behavior of literals.
This is done via a `typing.Annotated` call on the node signature.
For example, in order to cache the result of calls to `bar`, you can rewrite the code above like this:

```python
from typing import Annotated

import pandas
import union
from flytekit import HashMethod

def hash_pandas_dataframe(df: pandas.DataFrame) -> str:
    return str(pandas.util.hash_pandas_object(df))

@union.task
def foo_1(a: int, b: str) -> Annotated[pandas.DataFrame, HashMethod(hash_pandas_dataframe)]:
    df = pandas.DataFrame(...)
    ...
    return df

@union.task(cache=True)
def bar_1(df: pandas.DataFrame) -> int:
    ...

@union.workflow
def wf_1(a: int, b: str):
    df = foo_1(a=a, b=b)
    v = bar_1(df=df)
```

Note how the output of the task `foo_1` is annotated with an object of type `HashMethod`.
Essentially, it represents a function that produces a hash that is used as part of the cache key calculation when calling the task `bar_1`.

### How does caching of offloaded objects work?

Recall how input values are taken into account to derive a cache key.
This is done by turning the literal representation into a string and using that string as part of the cache key.
In the case of dataframes annotated with `HashMethod`, we use the hash as the representation of the literal.
In other words, the literal hash is used in the cache key.
This feature also works in local execution.
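
The mechanism can be illustrated in plain Python. This sketch uses a list of dicts as a hypothetical stand-in for an offloaded dataframe, and `hash_rows`/`cache_key` are invented helpers, not Union.ai APIs:

```python
import hashlib

def hash_rows(rows: list) -> str:
    # User-supplied hash method: only this content hash enters the cache key,
    # not the (offloaded) object itself.
    return hashlib.sha256(repr(rows).encode()).hexdigest()

def cache_key(task_name: str, input_hashes: list) -> str:
    return hashlib.sha256("|".join([task_name] + input_hashes).encode()).hexdigest()

df_a = [{"column_1": 1}, {"column_1": 2}]
df_b = [{"column_1": 1}, {"column_1": 2}]  # equal content, distinct object

# Because the hash depends only on content, equal values yield the same
# cache key, so the second invocation would be a cache hit.
assert cache_key("bar", [hash_rows(df_a)]) == cache_key("bar", [hash_rows(df_b)])
```
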

Here’s a complete example of the feature:

```python
import time
from typing import Annotated

import pandas
import union
from flytekit import HashMethod
from flytekit.core.node_creation import create_node

def hash_pandas_dataframe(df: pandas.DataFrame) -> str:
    return str(pandas.util.hash_pandas_object(df))

@union.task
def uncached_data_reading_task() -> Annotated[pandas.DataFrame, HashMethod(hash_pandas_dataframe)]:
    return pandas.DataFrame({"column_1": [1, 2, 3]})

@union.task(cache=True)
def cached_data_processing_task(df: pandas.DataFrame) -> pandas.DataFrame:
    time.sleep(1)
    return df * 2

@union.task
def compare_dataframes(df1: pandas.DataFrame, df2: pandas.DataFrame):
    assert df1.equals(df2)

@union.workflow
def cached_dataframe_wf():
    raw_data = uncached_data_reading_task()

    # Execute `cached_data_processing_task` twice, but force those
    # two executions to happen serially to demonstrate how the second run
    # hits the cache.
    t1_node = create_node(cached_data_processing_task, df=raw_data)
    t2_node = create_node(cached_data_processing_task, df=raw_data)
    t1_node >> t2_node

    # Confirm that the dataframes actually match
    compare_dataframes(df1=t1_node.o0, df2=t2_node.o0)

if __name__ == "__main__":
    df1 = cached_dataframe_wf()
    print(f"Running cached_dataframe_wf once : {df1}")
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/workspaces ===

# Workspaces

Workspaces provide a convenient VSCode development environment for iterating on
your Union.ai tasks, workflows, and apps.

With workspaces, you can:

* Develop and debug your tasks, workflows, or code in general
* Run your tasks and workflows in a way that matches your production environment
* Deploy your workflows and apps to development, staging, or production environments
* Persist files across workspace restarts to save your work
* Specify secrets and resources for your workspace
* Specify custom container images
* Specify custom `on_startup` commands
* Adjust the idle time-to-live (TTL) for your workspace to avoid unneeded expenses
* Authenticate with GitHub to clone private repositories

## Creating a workspace

To create a workspace, click on the **Workspace** tab on left navbar and click
on the **New Workspace** button on the top right.

![Create Workspace](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/create-new-workspace-1.png)

Provide a name for your workspace, set an **Idle TTL** (time to live), and
click **Create**.

![Create Workspace](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/create-new-workspace-2.png)

> [!NOTE]
> The Idle TTL is the amount of time a workspace will be idle before it is
> automatically stopped. Workspaces have a global TTL of 1 day, but you can set
> the idle TTL field to a shorter duration to stop the workspace sooner.

You should see a new workspace created in the Workspaces view:

![Create Workspace](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/create-new-workspace-3.png)

## Running a workspace

To run a workspace, click on the switch on the workspace item:

![Run Workspace](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/run-workspace-1.png)

Once the workspace has started, you can click on the **Open in VSCode** button:

![Run Workspace](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/run-workspace-2.png)

Once the startup commands have completed, you'll see a browser-based VSCode IDE:

![Run Workspace](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/run-workspace-3.png)

To stop a workspace, click on the toggle switch on the workspace item.

## Filesystem persistence

Any changes to the filesystem that you make in the working directory of your
workspace (the directory you find yourself in when you first open the workspace)
are persisted across workspace restarts.

This allows you to save data, code, models, and other files in your workspace.

> [!NOTE]
> Storing large datasets, models, and other files in your workspace may slow down
> the start and stop times of your workspace. This is because the workspace
> instance needs time to download/upload the files from persistent storage.

## Editing a workspace

Change the workspace configuration by clicking on the **Edit** button:

![Edit Workspace](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/edit-workspace-1.png)

Note that you can change everything except the workspace name.

![Edit Workspace](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/edit-workspace-2.png)

## The workspace detail view

Clicking on the workspace item on the list view will reveal the workspace detail view,
which provides all the information about the workspace.

![Workspace Detail](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/workspace-detail.png)

## Archiving a workspace

Archive a workspace by clicking on the **Archive** button:

![Archive Workspace](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/archive-workspace.png)

Show archived workspaces by clicking on the **Show archived** toggle
on the top right of the workspaces list view. Unarchive a workspace by clicking
on the **Unarchive** button:

![Unarchive Workspace](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/unarchive-workspace.png)

## Workspace CLI commands

The `union` CLI also provides commands for managing workspaces.

### Create a workspace configuration

The first step is to create a yaml file that describes the workspace.

```shell
$ union create workspace-config --init base_image workspace.yaml
```

This will create a `workspace.yaml` file in the current directory, with the
default configuration values that you can edit for your needs:

```yaml
name: my-workspace
description: my workspace description
project: <project>
domain: <domain>
container_image: public.ecr.aws/unionai/workspace-base:py3.11-latest
resources:
    cpu: "2"
    mem: "4Gi"
    gpu: null
accelerator: null
on_startup: null
ttl_seconds: 1200
```

Note that the yaml file contains a `project` and `domain` field that you can set to create a
workspace in a specific project and domain.

### Create a workspace

Then, create a workspace using the `union create workspace` command:

```shell
$ union create workspace workspace.yaml
```

This command will also start your workspace, and will print out the workspace
link that you click on to open the workspace in your browser:

```shell
Created: workspace_definition {
  ...
}

Starting workspace 'my-workspace'

🚀 Workspace started: Open VSCode in Browser
```

### Stop a workspace

When you want to stop a workspace, use the `union stop workspace` command:

```shell
$ union stop workspace --name my-workspace
```

This will print out a message indicating that the workspace has been stopped:

```shell
Workspace instance stopped: org: "org"
...
```

### Update a workspace

To update a workspace, modify the `workspace.yaml` file and run the
`union update workspace` command:

```shell
$ union update workspace workspace.yaml
```

This will print out a message that looks something like:

```shell
Updated: workspace_definition {
  ...
}
```

### Get existing workspaces

To get existing workspaces, use the `union get workspace` command:

```shell
$ union get workspace
```

This will print out a table of all the workspaces you have access to in the
specified project and domain (the command uses the default project and domain
if you don't provide them).

```shell
┏━━━━━━━━━━━━━━━━━━━━━━┳━━━━━┳━━━━━━━━┳━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Workspace name       ┃ CPU ┃ Memory ┃ GPU ┃ Accelerator         ┃ TTL Seconds ┃ Active URL ┃
┡━━━━━━━━━━━━━━━━━━━━━━╇━━━━━╇━━━━━━━━╇━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ my-workspace         │ 2   │ 4Gi    │ -   │ -                   │ 1200        │ -          │
└──────────────────────┴─────┴────────┴─────┴─────────────────────┴─────────────┴────────────┘
```

To get the details of a specific workspace, provide
the workspace name with the `--name` flag.

### Start a workspace

To start a workspace, use the `union start workspace` command, specifying the
name of the workspace you want to start in the `--name` flag.

```shell
$ union start workspace --name my-workspace
```

You should see a message that looks like:

```shell
Starting workspace 'my-workspace'

🚀 Workspace started: Open VSCode in Browser
```

## Customizing a workspace

There are several settings that you can customize for a workspace in the UI or
the CLI.

### Setting secrets

If you don't have any secrets yet, create them with the `union create secret`
command:

```shell
$ union create secret --project my_project --domain my_domain --name my_secret
```

You'll be prompted to enter a secret value in the terminal:

```shell
Enter secret value: ...
```

> [!NOTE]
> You can learn more about secrets management [here](https://www.union.ai/docs/v1/union/user-guide/development-cycle/managing-secrets).

Set secrets for your workspace by clicking on the **Secrets** tab in the sidebar.
Provide the `my_secret` key and optionally, the environment variable you want
to assign it to in the workspace.

![Secrets](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/setting-secrets.png)

#### Setting secrets via the CLI

Set secrets via the CLI using the `secrets` key, which is a list of objects with
a `key` and `env_var` (optional) field:

```yaml
name: my-workspace
description: my workspace description
project: flytesnacks
domain: development
container_image: public.ecr.aws/unionai/workspace-base:py3.11-latest
secrets:
    - key: my_secret  # this is the secret key you set when you create the secret
      env_var: MY_SECRET  # this is an optional environment variable that you
                          # can bind the secret value onto.
...
```

### Setting CPU, memory, and GPU resources

You can also set the resources for your workspace:

![Resources](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/setting-resources.png)

These resources must be compatible with the resources available on your
Union cluster. Find the details of your cluster in the top-level dashboard:

![Cluster Compute Resources](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/byoc-compute-resources.png)

You can choose [the GPU accelerator](./tasks/task-hardware-environment/accelerators) that corresponds to your available instance types. In the screenshot above, the accelerator value is `nvidia-tesla-v100`.

### Specifying custom `on_startup` commands

If you need to run any commands, such as installing additional dependencies or
`wget`-ing a file from the web, specify custom `on_startup` commands:

![On Startup](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/customize-onstartup.png)

### Specifying custom container images

By default, the workspace will use a Union.ai-provided container image which contains
the following Python libraries:

- `union`
- `flytekit`
- `uv`
- `ipykernel`
- `pandas`
- `pyarrow`
- `scikit-learn`
- `matplotlib`

#### Specifying a custom container image in the UI

You can specify a pre-built custom container image by clicking on the **Container**
tab in the sidebar and providing the image name in the workspace creation form.

> [!NOTE]
> The minimum requirement for custom images is that they have `union>=0.1.166`
> installed.

![Custom Container](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/customize-container-image.png)

In many cases, you may want to use the same container image as a task execution
that you want to debug. You can find the container image URI by going to the
task execution details page:

![Task Execution](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/customize-container-image-get-uri.png)

You can specify:
- Any public container image URI as long as it has `union>=0.1.166` installed
- Images built with the Union.ai [image builder service](https://www.union.ai/docs/v1/union/user-guide/development-cycle/image-spec)
- Images available in your private container registry (e.g. [AWS ECR](../integrations/enabling-aws-resources/enabling-aws-ecr), [GCP Artifact Registry](../integrations/enabling-gcp-resources/enabling-google-artifact-registry), or [Azure Container Registry](../integrations/enabling-azure-resources/enabling-azure-container-registry))

#### Specifying a custom container image in the CLI

The `union` CLI provides a way to specify a custom container image that's built
by Union's image builder service. To do this, run the following command:

```shell
union create workspace-config --init custom_image workspace.yaml
```

This will create a `workspace.yaml` file with a `container_image` image key
that supports the [ImageSpec](https://www.union.ai/docs/v1/union/user-guide/development-cycle/image-spec) arguments.
When you run the `union create workspace` command with this `workspace.yaml` file,
it will first build the image before creating the workspace definition.

#### Example: Specifying a workspace with GPUs

The following example shows a `workspace.yaml` file that specifies a workspace
with a GPU accelerator.

```yaml
# workspace.yaml
name: workspace-with-gpu
description: Workspace that uses GPUs
# Make sure that the project and domain exists
project: <project>
domain: <domain>
container_image:
    name: custom-image
    builder: union
    packages:
    - torch
resources:
    cpu: "2"
    mem: "4Gi"
    gpu: "1"
accelerator: nvidia-l4
on_startup: null
ttl_seconds: 1200
```

Then run the following command to create the workspace:

```shell
union create workspace workspace.yaml
```

The configuration above will first build a custom container with `torch` installed.
Then, it will create a workspace definition with a single `nvidia-l4` GPU accelerator.
Finally, it will start a workspace session. In the VSCode browser IDE, you can quickly
verify that `torch` has access to GPUs by running the following in a Python REPL:

```python
import torch
print(torch.cuda.is_available())
```

> [!NOTE]
> See the **Core concepts > Workspaces > Customizing a workspace > Setting CPU, memory, and GPU resources**
> section for more details on how to configure specific GPU accelerators.

## Authenticating with GitHub

If you want to clone a private GitHub repository into your workspace, you can
use the pre-installed `gh` CLI to authenticate your workspace session:

```shell
gh auth login
```

You'll be prompted to enter either a GitHub personal access token (PAT) or
authenticate via the browser.

> [!NOTE]
> You can create and set a `GITHUB_TOKEN` secret to set the access token for your
> workspace, but you'll need to authenticate via `gh auth login` in every new
> workspace session:

* Create a secret with the `union create secret` command
* Create a workspace or update an existing one with the `GITHUB_TOKEN` secret,
  setting the environment variable to e.g. `GITHUB_TOKEN`
* In the workspace session, run `gh auth login` to authenticate with GitHub and
  use the `$GITHUB_TOKEN` environment variable as the personal access token.

## Sorting and filtering workspaces

You can filter workspaces to only the active ones by clicking on the **Active**
toggle on the top left of the workspaces list view.

![Active Workspaces](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/active-workspaces.png)

Sort workspaces by most recently updated by clicking on the **Recently updated**
toggle on the top right of the workspaces list view.

![Filtering and Sorting Workspaces](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/filtering-sorting-workspaces.png)

## Troubleshooting

You may encounter issues starting up a workspace for various reasons,
including:

* Resource requests that are not available on your Union cluster.
* Secret key typos, or secrets not being defined on the project/domain.
* Container image typos, or container images that don't exist.

Under the hood, workspaces are powered by Union.ai tasks, so to debug these kinds
of issues, the workspace detail page provides a link to the underlying
task that's hosting the VSCode IDE:

![Workspace Detail](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/failed-workspace-detail.png)

Clicking on the link will open the task details page, where you can see the
underlying task definition, pod events, and logs to debug further.

![Task Detail](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/workspaces/failed-task-detail.png)

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/named-outputs ===

# Named outputs

By default, Union.ai employs a standardized convention to assign names to the outputs of tasks or workflows. Each output is sequentially labeled as `o1`, `o2`, `o3`, and so on.

You can, however, customize these output names by using a `NamedTuple`.

To begin, import the required dependencies:

```python
# basics/named_outputs.py

from typing import NamedTuple

import union
```

Here we define a `NamedTuple` and assign it as an output to a task called `slope`:

```python
slope_value = NamedTuple("slope_value", [("slope", float)])

@union.task
def slope(x: list[int], y: list[int]) -> slope_value:
    sum_xy = sum([x[i] * y[i] for i in range(len(x))])
    sum_x_squared = sum([x[i] ** 2 for i in range(len(x))])
    n = len(x)
    return (n * sum_xy - sum(x) * sum(y)) / (n * sum_x_squared - sum(x) ** 2)
```

Similarly, we define another `NamedTuple` and assign it to the output of another task, `intercept`:

```python
intercept_value = NamedTuple("intercept_value", [("intercept", float)])

@union.task
def intercept(x: list[int], y: list[int], slope: float) -> intercept_value:
    mean_x = sum(x) / len(x)
    mean_y = sum(y) / len(y)
    intercept = mean_y - slope * mean_x
    return intercept
```

> [!NOTE]
> While it’s possible to create a `NamedTuple` inline in a function signature, as shown below,
> it’s often better to declare it explicitly as a separate variable.
> This helps prevent potential linting errors in tools like `mypy`.
>
> ```python
> def slope() -> NamedTuple("slope_value", slope=float):
>     pass
> ```

You can easily unpack the `NamedTuple` outputs directly within a workflow.
Additionally, you can also have the workflow return a `NamedTuple` as an output.

> [!NOTE]
> Remember that we are extracting individual task execution outputs by dereferencing them.
> This is necessary because `NamedTuples` function as tuples and require dereferencing.

```python
slope_and_intercept_values = NamedTuple("slope_and_intercept_values", [("slope", float), ("intercept", float)])

@union.workflow
def simple_wf_with_named_outputs(x: list[int] = [-3, 0, 3], y: list[int] = [7, 4, -2]) -> slope_and_intercept_values:
    slope_value = slope(x=x, y=y)
    intercept_value = intercept(x=x, y=y, slope=slope_value.slope)
    return slope_and_intercept_values(slope=slope_value.slope, intercept=intercept_value.intercept)
```
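Because the task bodies above are plain Python, you can sanity-check both the least-squares math and the `NamedTuple` dereferencing locally, without a Union cluster. A minimal plain-Python sketch (no `union` dependency):

```python
from typing import NamedTuple

slope_and_intercept_values = NamedTuple(
    "slope_and_intercept_values", [("slope", float), ("intercept", float)]
)

def fit(x: list[int], y: list[int]) -> slope_and_intercept_values:
    # Least-squares slope and intercept, mirroring the task bodies above
    n = len(x)
    sum_xy = sum(xi * yi for xi, yi in zip(x, y))
    sum_x_squared = sum(xi ** 2 for xi in x)
    slope = (n * sum_xy - sum(x) * sum(y)) / (n * sum_x_squared - sum(x) ** 2)
    intercept = sum(y) / n - slope * (sum(x) / n)
    return slope_and_intercept_values(slope=slope, intercept=intercept)

result = fit(x=[-3, 0, 3], y=[7, 4, -2])
# Fields are accessed by name (dereferencing), exactly as in the workflow
print(result.slope, result.intercept)  # -1.5 3.0
```

The default inputs above yield a slope of `-1.5` and an intercept of `3.0`, which is what the workflow returns when run remotely.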

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/image-spec ===

# ImageSpec

In this section, you will learn how Union.ai uses Docker images to construct containers under the hood, and how to craft your own images to include all the necessary dependencies for your tasks or workflows.

You will explore how to execute a raw container with custom commands,
specify multiple container images within a single workflow,
and get familiar with the ins and outs of `ImageSpec`.

`ImageSpec` allows you to customize the container image for your Union.ai tasks without a Dockerfile. `ImageSpec` speeds up the build process by allowing you to reuse previously downloaded packages from the PyPI and APT caches.

By default, the `ImageSpec` will be built using the [remote builder](https://www.union.ai/docs/v1/union/user-guide/development-cycle/image-spec), but you can always specify your own builder, e.g. local Docker.

For every `union.PythonFunctionTask` (that is, any task defined with the `@union.task` decorator), you can specify rules for binding container images. By default, union binds a single container image, i.e.,
the [default Docker image](https://ghcr.io/flyteorg/flytekit), to all tasks. To modify this behavior, pass an `ImageSpec` definition to the `container_image` parameter of the `union.task` decorator.

Before building the image, union checks the container registry to see if the image already exists. If the image does not exist, union will build the image before registering the workflow and replace the image name in the task template with the newly built image name.

## Install Python or APT packages
You can specify Python packages and APT packages in the `ImageSpec`.
These specified packages will be added on top of the [default image](https://github.com/flyteorg/flytekit/blob/master/Dockerfile).
More specifically, union invokes the [DefaultImages.default_image()](https://github.com/flyteorg/flytekit/blob/master/flytekit/configuration/default_images.py#L26-L27) function, which determines and returns the default image based on the Python version and flytekit version. For example, if you are using Python 3.8 and flytekit 1.6.0, the default image assigned will be `ghcr.io/flyteorg/flytekit:py3.8-1.6.0`.
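As an illustration of that naming convention, the default image name can be assembled from the two versions. The string below is constructed by hand purely to show the pattern; flytekit computes it internally from the runtime versions:

```python
# Illustration of the default-image naming convention described above.
# flytekit derives these values from the interpreter and installed package;
# they are hard-coded here only for demonstration.
python_version = "3.8"
flytekit_version = "1.6.0"
default_image = f"ghcr.io/flyteorg/flytekit:py{python_version}-{flytekit_version}"
print(default_image)  # ghcr.io/flyteorg/flytekit:py3.8-1.6.0
```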

```python
from union import ImageSpec

sklearn_image_spec = ImageSpec(
  packages=["scikit-learn", "tensorflow==2.5.0"],
  apt_packages=["curl", "wget"],
)
```

## Install Conda packages

Define the `ImageSpec` to install packages from a specific conda channel.

```python
image_spec = ImageSpec(
  conda_packages=["langchain"],
  conda_channels=["conda-forge"],  # List of channels to pull packages from.
)
```

## Use different Python versions in the image

You can specify the Python version in the `ImageSpec` to build the image with a different Python version.

```python
image_spec = ImageSpec(
  packages=["pandas"],
  python_version="3.9",
)
```

## Import modules only in a specific ImageSpec environment

The `is_container()` method is used to determine whether the task is utilizing the image constructed from the `ImageSpec`. If the task is indeed using that image, it returns `True`. This approach helps minimize module loading time and prevents unnecessary dependency installation within a single image.

In the following example, both `task1` and `task2` import the `pandas` module, but `tensorflow` is only imported in the image used by `task2`.

```python
from union import ImageSpec, task
import pandas as pd

pandas_image_spec = ImageSpec(
  packages=["pandas"],
  registry="ghcr.io/flyteorg",
)

tensorflow_image_spec = ImageSpec(
  packages=["tensorflow", "pandas"],
  registry="ghcr.io/flyteorg",
)

# tensorflow is imported if and only if this code runs in the image
# built from tensorflow_image_spec.
if tensorflow_image_spec.is_container():
  import tensorflow as tf

@task(container_image=pandas_image_spec)
def task1() -> pd.DataFrame:
  return pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [1, 22]})

@task(container_image=tensorflow_image_spec)
def task2() -> int:
  num_gpus = len(tf.config.list_physical_devices('GPU'))
  print("Num GPUs Available: ", num_gpus)
  return num_gpus
```
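If you prefer a pattern that doesn't depend on an `ImageSpec` object at all, a plain try/except import guard (standard Python, not a Union API) has a similar effect: the heavy module is only bound when it is actually installed in the running image. A minimal sketch, using a stdlib module as a stand-in so the snippet runs anywhere:

```python
# Generic optional-import guard; in a real task module, swap `json`
# for a heavy optional dependency such as tensorflow.
try:
    import json  # stand-in for an optional heavy dependency
    HAS_OPTIONAL_DEP = True
except ImportError:
    HAS_OPTIONAL_DEP = False

print(HAS_OPTIONAL_DEP)  # True (json is always available in the stdlib)
```

The trade-off is that the guard keys off what is importable at runtime rather than off which `ImageSpec` built the image, so `is_container()` remains the more precise check when multiple images share the same packages.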

## Install CUDA in the image

There are a few ways to install CUDA in the image.

### Use Nvidia docker image

CUDA is pre-installed in the Nvidia docker image. You can specify the base image in the `ImageSpec`.

```python
image_spec = ImageSpec(
  base_image="nvidia/cuda:12.6.1-cudnn-devel-ubuntu22.04",
  packages=["tensorflow", "pandas"],
  python_version="3.9",
)
```

### Install packages from extra index

CUDA can be installed by specifying the `pip_extra_index_url` in the `ImageSpec`.

```python
image_spec = ImageSpec(
  name="pytorch-mnist",
  packages=["torch", "torchvision", "flytekitplugins-kfpytorch"],
  pip_extra_index_url=["https://download.pytorch.org/whl/cu118"],
)
```

## Build an image in different architecture

You can specify the platform in the `ImageSpec` to build the image for a different architecture, such as `linux/arm64` or `darwin/arm64`.

```python
image_spec = ImageSpec(
  packages=["pandas"],
  platform="linux/arm64",
)
```

## Customize the tag of the image

You can customize the tag of the image by specifying the `tag_format` in the `ImageSpec`. In the following example, the tag will be `<spec_hash>-dev`.

```python
image_spec = ImageSpec(
  name="my-image",
  packages=["pandas"],
  tag_format="{spec_hash}-dev",
)
```
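The `{spec_hash}` placeholder is filled with a hash computed from the `ImageSpec` contents. Purely as an illustration of the template mechanics (the hash value below is made up; the real one is computed by the builder):

```python
# How a tag_format template is filled in, shown with plain str.format.
tag_format = "{spec_hash}-dev"
spec_hash = "abc123"  # hypothetical value; the real hash derives from the ImageSpec contents
tag = tag_format.format(spec_hash=spec_hash)
print(tag)  # abc123-dev
```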

## Copy additional files or directories

You can specify files or directories to be copied into the container `/root`, allowing users to access the required files. The directory structure will match the relative path. Since Docker only supports relative paths, absolute paths and paths outside the current working directory (e.g., paths with "../") are not allowed.

```python
from union import task, workflow, ImageSpec

image_spec = ImageSpec(
    name="image_with_copy",
    copy=["files/input.txt"],
)

@task(container_image=image_spec)
def my_task() -> str:
    with open("/root/files/input.txt", "r") as f:
        return f.read()
```
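The relative-path constraint amounts to: a path is allowed only if it is not absolute and does not escape the working directory via `..` components. A sketch of that rule in plain Python (an illustration of the constraint, not Union's actual validation code):

```python
from pathlib import PurePosixPath

def is_allowed_copy_path(p: str) -> bool:
    # Docker build contexts only accept relative paths that stay
    # inside the current working directory.
    path = PurePosixPath(p)
    return not path.is_absolute() and ".." not in path.parts

print(is_allowed_copy_path("files/input.txt"))  # True
print(is_allowed_copy_path("/etc/passwd"))      # False
print(is_allowed_copy_path("../outside/file"))  # False
```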

## Define ImageSpec in a YAML File

You can override the container image by providing an ImageSpec YAML file to the `union run` or `union register` command. This allows for greater flexibility in specifying a custom container image. For example:

```yaml
# imageSpec.yaml
python_version: 3.11
packages:
  - scikit-learn
env:
  Debug: "True"
```

Use union to run the workflow with the custom image:

```shell
$ union run --remote --image imageSpec.yaml image_spec.py wf
```

## Build the image without registering the workflow

If you only want to build the image without registering the workflow, you can use the `union build` command.

```shell
$ union build --remote image_spec.py wf
```

## Force push an image

In some cases, you may want to force an image to rebuild, even if the `ImageSpec` hasn’t changed. To overwrite an existing image, pass `FLYTE_FORCE_PUSH_IMAGE_SPEC=True` to the `union` command.

```bash
FLYTE_FORCE_PUSH_IMAGE_SPEC=True union run --remote image_spec.py wf
```

You can also force push an image in the Python code by calling the `force_push()` method.

```python
image = ImageSpec(packages=["pandas"]).force_push()
```

## Getting source files into ImageSpec

Typically, getting source code files into a task's image at run time on a live Union.ai backend is done through the fast registration mechanism.

However, if your `ImageSpec` constructor specifies a `source_root` and the `copy` argument is set to something other than `CopyFileDetection.NO_COPY`, then files will be copied regardless of fast registration status.
If the `source_root` and `copy` fields of an `ImageSpec` are left blank, then whether or not your source files are copied into the built `ImageSpec` image depends on whether or not you use fast registration. Please see [Running your code](https://www.union.ai/docs/v1/union/user-guide/development-cycle/running-your-code) for the full explanation.

Since files are sometimes copied into the built image, the tag published for an `ImageSpec` will change based on whether fast registration is enabled and on the contents of any copied files.

