# Tasks
> This bundle contains all pages in the Tasks section.
> Source: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks ===

# Tasks

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](https://www.union.ai/docs/v1/union/user-guide/core-concepts/section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

Tasks are the fundamental units of compute in Union.ai.
They are independently executable, strongly typed, and containerized building blocks that make up workflows.
Workflows are constructed by chaining together tasks, with the output of one task feeding into the input of the next to form a directed acyclic graph.

## Tasks are independently executable

Tasks are designed to be independently executable, meaning that they can be run in isolation from other tasks.
And since most tasks are just Python functions, they can be executed on your local machine, making it easy to unit test and debug tasks locally before deploying them to Union.ai.

Because they are independently executable, tasks can also be shared and reused across multiple workflows and, as long as their logic is deterministic, their input and outputs can be [cached](https://www.union.ai/docs/v1/union/user-guide/core-concepts/caching/page.md) to save compute resources and execution time.

## Tasks are strongly typed

Tasks have strongly typed inputs and outputs, which are validated at deployment time.
This helps catch bugs early and ensures that the data passing through tasks and workflows is compatible with the explicitly stated types.

Under the hood, Union.ai uses the [Flyte type system]() to translate between Flyte types and Python types.
The type annotations in a task's function signature ensure that the data passing through tasks and workflows matches the explicitly declared types.
The Union.ai type system is also used for caching, data lineage tracking, and automatic serialization and deserialization of data as it's passed from one task to another.

## Tasks are containerized

While most tasks are locally executable, when a task is deployed to Union.ai as part of the registration process, it is containerized and runs in its own independent Kubernetes pod.

This allows tasks to have their own independent set of [software dependencies](./task-software-environment/_index) and [hardware requirements](./task-hardware-environment/_index).
For example, a task that requires a GPU can be deployed to Union.ai with a GPU-enabled container image, while a task that requires a specific version of a software library can be deployed with that version of the library installed.

## Tasks are named, versioned, and immutable

The fully qualified name of a task is a combination of its project, domain, and name. To update a task, you change it and re-register it under the same fully qualified name. This creates a new version of the task while the old version remains available. At the version level, tasks are therefore immutable. This immutability is important for ensuring that workflows are reproducible and that data lineage is accurate.

## Tasks are (usually) deterministic and cacheable

When deciding if a unit of execution is suitable to be encapsulated as a task, consider the following questions:

* Is there a well-defined graceful/successful exit criteria for the task?
    * A task is expected to exit after completion of input processing.
* Is it deterministic and repeatable?
    * Under certain circumstances, a task might be cached or rerun with the same inputs.
      It is expected to produce the same output every time.
      You should, for example, avoid using random number generators with the current clock as seed.
* Is it a pure function? That is, does it have side effects that are unknown to the system?
    * It is recommended to avoid side effects in tasks.
    * When side effects are unavoidable, ensure that the operations are idempotent.

For details on task caching, see [Caching](https://www.union.ai/docs/v1/union/user-guide/core-concepts/caching/page.md).

## Workflows can contain many types of tasks

One of the most powerful features of Union.ai is the ability to run widely differing computational workloads as tasks within a single workflow.

Because of the way that Union.ai is architected, tasks within a single workflow can differ along many dimensions. While the total number of ways that tasks can be configured is quite large, the options fall into three categories:

* **Task type**: These include standard Python tasks, map tasks, raw container tasks, and many specialized plugin tasks. For more information, see **Core concepts > Tasks > Other task types**.
* **Software environment**: Define the task container image, dependencies, and even programming language. For more information, see [Task software environment](./task-software-environment/_index).
* **Hardware environment**: Define the resource requirements (processor numbers, storage amounts) and machine node characteristics (CPU and GPU type). For more information, see [Task hardware environment](./task-hardware-environment/_index).

### Mix and match task characteristics

Along these three dimensions, you can mix and match characteristics to build a task definition that performs exactly the job you want, while still taking advantage of all the features provided at the workflow level like output caching, versioning, and reproducibility.

Tasks with diverse characteristics can be combined into a single workflow.
For example, a workflow might contain:

* A **Python task running on your default container image** with default dependencies and a default resource and hardware profile.
* A **Python task running on a container image with additional dependencies** configured to run on machine nodes with a specific type of GPU.
* A **raw container task** running a Java process.
* A **plugin task** running a Spark job that spawns its own cluster-in-a-cluster.
* A **map task** that runs multiple copies of a Python task in parallel.

The ability to build workflows from such a wide variety of heterogeneous tasks makes Union.ai uniquely flexible.

> [!NOTE]
> Not all parameters are compatible. For example, with specialized plugin task types, some configurations are
> not available (this depends on task plugin details).

## Task configuration

The `@union.task` decorator can take a number of parameters that allow you to configure the task's behavior.
For example, you can specify the task's software dependencies, hardware requirements, caching behavior, retry behavior, and more.
For more information, see **Core concepts > Tasks > Task parameters**.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/map-tasks ===

## Map tasks

A map task allows you to execute many instances of a task within a single workflow node.
This enables you to execute a task across a set of inputs without having to create a node for each input, resulting in significant performance improvements.

Map tasks find application in various scenarios, including:
* When multiple inputs require running through the same code logic.
* Processing multiple data batches concurrently.

Just like normal tasks, map tasks are automatically parallelized to the extent possible given resources available in the cluster.

```python
import union

THRESHOLD = 11

@union.task
def detect_anomalies(data_point: int) -> bool:
    return data_point > THRESHOLD

@union.workflow
def map_workflow(data: list[int] = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]) -> list[bool]:
    # Use the map task to apply the anomaly detection function to each data point
    return union.map(detect_anomalies)(data_point=data)

```

> [!NOTE]
> Map tasks can also map over launch plans. For more information and example code, see [Mapping over launch plans](https://www.union.ai/docs/v1/union/user-guide/core-concepts/launch-plans/mapping-over-launch-plans).

To customize resource allocations, such as memory usage for individual map tasks, you can leverage `with_overrides`. Here’s an example using the `detect_anomalies` map task within a workflow:

```python
import union

@union.workflow
def map_workflow_with_resource_overrides(
    data: list[int] = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]
) -> list[bool]:

    return (
        union.map(detect_anomalies)(data_point=data)
        .with_overrides(requests=union.Resources(mem="2Gi"))
    )
```

You can also configure `concurrency` and `min_success_ratio` for a map task:

- `concurrency` limits the number of mapped tasks that can run in parallel to the specified batch size. If the input size exceeds the concurrency value, multiple batches will run serially until all inputs are processed. If left unspecified, it implies unbounded concurrency.
- `min_success_ratio` determines the minimum fraction of total jobs that must complete successfully before terminating the map task and marking it as successful.

```python
import typing

@union.workflow
def map_workflow_with_additional_params(
    data: list[int] = [10, 12, 11, 10, 13, 12, 100, 11, 12, 10]
) -> list[typing.Optional[bool]]:

    return union.map(
        detect_anomalies,
        concurrency=1,
        min_success_ratio=0.75
    )(data_point=data)
```

For more details, see the [Map Task example](https://github.com/unionai-oss/union-cloud-docs-examples/tree/main/map_task) in the `unionai-examples` repository and the [Map Tasks]() section.
<!-- TODO: Add link to API -->

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-types ===

# Other task types

Task types include:

* **`PythonFunctionTask`**: This Python class represents the standard default task.
It is the type that is created when you use the `@union.task` decorator.
* **`ContainerTask`**: This Python class represents a raw container.
It allows you to install any image you like, giving you complete control of the task.
* **Shell tasks**: Use them to execute `bash` scripts within Union.ai.
* **Specialized plugin tasks**: These include both specialized classes and specialized configurations of the `PythonFunctionTask`.
They implement integrations with third-party systems.

## PythonFunctionTask

This is the task type that is created when you add the `@union.task` decorator to a Python function.
It represents a Python function that will be run within a single container. For example:

```python
import pandas as pd
import union
from sklearn.datasets import load_wine

@union.task
def get_data() -> pd.DataFrame:
    """Get the wine dataset."""
    return load_wine(as_frame=True).frame

```

See the [Python Function Task example](https://github.com/unionai-oss/union-cloud-docs-examples/tree/main/python_function_task).

This is the most common task variant and the one that, thus far, we have focused on in this documentation.

## ContainerTask

This task variant represents a raw container, with no assumptions made about what is running within it.
Here is an example of declaring a `ContainerTask`:

```python
from flytekit import ContainerTask, kwtypes

greeting_task = ContainerTask(
    name="echo_and_return_greeting",
    image="alpine:latest",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(name=str),
    outputs=kwtypes(greeting=str),
    command=["/bin/sh", "-c", "echo 'Hello, my name is {{.inputs.name}}.' | tee -a /var/outputs/greeting"],
)
```

The `ContainerTask` enables you to include a task in your workflow that executes arbitrary code in any language, not just Python.

In the following example, the tasks calculate the area of an ellipse. Each task's name must be unique within the project. You can specify:

* `input_data_dir`: where inputs will be written to.
* `output_data_dir`: where Union.ai will expect the outputs to exist.

The `inputs` and `outputs` parameters specify the interface for the task; each should be an ordered dictionary of typed input and output variables.

The `image` field specifies the container image for the task, either as an image name or an `ImageSpec`. To access files that are not included in the image, use `ImageSpec` to copy files or directories into the container's `/root` directory.

Caching can be enabled in a `ContainerTask` by configuring the cache settings in a `TaskMetadata` object passed to the `metadata` parameter.

```python
from flytekit import ContainerTask, TaskMetadata, kwtypes, workflow

calculate_ellipse_area_haskell = ContainerTask(
    name="ellipse-area-metadata-haskell",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-haskell:v2",
    command=[
        "./calculate-ellipse-area",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
    metadata=TaskMetadata(cache=True, cache_version="1.0"),
)

calculate_ellipse_area_julia = ContainerTask(
    name="ellipse-area-metadata-julia",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-julia:v2",
    command=[
        "julia",
        "calculate-ellipse-area.jl",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
    metadata=TaskMetadata(cache=True, cache_version="1.0"),
)

@workflow
def wf(a: float, b: float):
    area_haskell, metadata_haskell = calculate_ellipse_area_haskell(a=a, b=b)
    area_julia, metadata_julia = calculate_ellipse_area_julia(a=a, b=b)
```

See the [Container Task example](https://github.com/unionai-oss/union-cloud-docs-examples/tree/main/container_task).

## Shell tasks

Shell tasks enable the execution of shell scripts within Union.ai.
To create a shell task, provide a name for it, specify the bash script to be executed, and define inputs and outputs if needed:

### Example
```python
from pathlib import Path
from typing import Tuple

import union
from flytekit import kwtypes
from flytekit.extras.tasks.shell import OutputLocation, ShellTask
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile

t1 = ShellTask(
    name="task_1",
    debug=True,
    script="""
    set -ex
    echo "Hey there! Let's run some bash scripts using a shell task."
    echo "Showcasing shell tasks." >> {inputs.x}
    if grep "shell" {inputs.x}
    then
        echo "Found it!" >> {inputs.x}
    else
        echo "Not found!"
    fi
    """,
    inputs=kwtypes(x=FlyteFile),
    output_locs=[OutputLocation(var="i", var_type=FlyteFile, location="{inputs.x}")],
)

t2 = ShellTask(
    name="task_2",
    debug=True,
    script="""
    set -ex
    cp {inputs.x} {inputs.y}
    tar -zcvf {outputs.j} {inputs.y}
    """,
    inputs=kwtypes(x=FlyteFile, y=FlyteDirectory),
    output_locs=[OutputLocation(var="j", var_type=FlyteFile, location="{inputs.y}.tar.gz")],
)

t3 = ShellTask(
    name="task_3",
    debug=True,
    script="""
    set -ex
    tar -zxvf {inputs.z}
    cat {inputs.y}/$(basename {inputs.x}) | wc -m > {outputs.k}
    """,
    inputs=kwtypes(x=FlyteFile, y=FlyteDirectory, z=FlyteFile),
    output_locs=[OutputLocation(var="k", var_type=FlyteFile, location="output.txt")],
)
```
Here's a breakdown of the parameters of the `ShellTask`:

- The `inputs` parameter allows you to specify the types of inputs that the task will accept.
- The `output_locs` parameter defines the output locations, which can be `FlyteFile` or `FlyteDirectory`.
- The `script` parameter contains the actual bash script that will be executed
  (`{inputs.x}`, `{outputs.j}`, etc. are replaced with the actual input and output values).
- The `debug` parameter is helpful for debugging purposes.

We define a task to instantiate `FlyteFile` and `FlyteDirectory`.
A `.gitkeep` file is created in the `FlyteDirectory` as a placeholder to ensure the directory exists:

```python
@union.task
def create_entities() -> Tuple[union.FlyteFile, union.FlyteDirectory]:
    working_dir = Path(union.current_context().working_directory)
    flytefile = working_dir / "test.txt"
    flytefile.touch()

    flytedir = working_dir / "testdata"
    flytedir.mkdir(exist_ok=True)

    flytedir_file = flytedir / ".gitkeep"
    flytedir_file.touch()
    return flytefile, flytedir
```
We create a workflow to define the dependencies between the tasks:

```python
@union.workflow
def shell_task_wf() -> union.FlyteFile:
    x, y = create_entities()
    t1_out = t1(x=x)
    t2_out = t2(x=t1_out, y=y)
    t3_out = t3(x=x, y=y, z=t2_out)
    return t3_out
```
You can run the workflow locally:
```python
if __name__ == "__main__":
    print(f"Running shell_task_wf() {shell_task_wf()}")
```

## Specialized plugin task classes and configs

Union.ai supports a wide variety of plugin tasks.
Some of these are enabled as specialized task classes, others as specialized configurations of the default `@union.task` (`PythonFunctionTask`).

They enable things like:

* Querying external databases (AWS Athena, BigQuery, DuckDB, SQL, Snowflake, Hive).
* Executing specialized processing right in Union.ai (Spark in virtual cluster, Dask in Virtual cluster, Sagemaker, Airflow, Modin, Ray, MPI and Horovod).
* Handing off processing to external services (AWS Batch, Spark on Databricks, Ray on external cluster).
* Data transformation (Great Expectations, DBT, Dolt, ONNX, Pandera).
* Data tracking and presentation (MLFlow, Papermill).

See the [Integration section]() for examples.
<!-- TODO: Add link to API -->

<!-- TODO: INCORPORATE THE FOLLOWING ABOVE WHERE NECESSARY

## @union.task parameters

`task_config`: This argument provides configuration for a specific task types. Please refer to the plugins documentation for the right object to use.
It is impossible to define the unit of execution of a task in the same
way for all tasks. Hence, Flyte allows for different task types in the
system. Flyte has a set of defined, battle-tested task types. It allows
for a flexible model to
`define new types <cookbook:plugins_extend>`{.interpreted-text
role="std:ref"}.
Flyte offers numerous plugins for tasks, including backend plugins like Athena.
Flyte exposes an extensible model to express tasks in an
execution-independent language. It contains first-class task plugins
(for example:
[Papermill](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-papermill/flytekitplugins/papermill/task.py),
[Great
Expectations](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-greatexpectations/flytekitplugins/great_expectations/task.py),
and `more <integrations>`{.interpreted-text role="ref"}.) that execute
the Flyte tasks. Almost any action can be implemented and introduced
into Flyte as a \"Plugin\", which includes:
- Tasks that run queries on distributed data warehouses like Redshift, Hive, Snowflake, etc.
- Tasks that run executions on compute engines like Spark, Flink, AWS Sagemaker, AWS Batch, Kubernetes pods, jobs, etc.
- Tasks that call web services.
Flyte ships with certain defaults, for example, running a simple Python
function does not need any hosted service. Flyte knows how to execute
these kinds of tasks on Kubernetes. It turns out these are the vast
majority of tasks in machine learning, and Flyte is adept at handling an
enormous scale on Kubernetes. This is achieved by implementing a unique
scheduler on Kubernetes.

-->

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-parameters ===

# Task parameters

You pass the following parameters to the `@union.task` decorator:

<!-- TODO: consider organizing by category rather than alphabetically. -->

* `accelerator`: The accelerator to use for this task.
  For more information, see [Specifying accelerators]().
  <!-- TODO: Add link to API -->

* `cache`: See [Caching](https://www.union.ai/docs/v1/union/user-guide/core-concepts/caching).

* `cache_serialize`: See [Caching](https://www.union.ai/docs/v1/union/user-guide/core-concepts/caching).

* `cache_version`: See [Caching](https://www.union.ai/docs/v1/union/user-guide/core-concepts/caching).

* `cache_ignore_input_vars`: Input variables that should not be included when calculating the hash for the cache.

* `container_image`: See [`ImageSpec`](https://www.union.ai/docs/v1/union/user-guide/core-concepts/image-spec).

* `deprecated`: A string that can be used to provide a warning message for a deprecated task.
  The absence of a string, or an empty string, indicates that the task is active and not deprecated.

* `docs`: Documentation about this task.

* `enable_deck`: If true, this task will output a Deck which can be used to visualize the task execution. See [Decks](https://www.union.ai/docs/v1/union/user-guide/development-cycle/decks).

```python
@union.task(enable_deck=True)
def my_task(my_str: str):
    print(f"hello {my_str}")
```

* `environment`: See [Environment variables](./task-software-environment/environment-variables).

* `interruptible`: See [Interruptible instances](./task-hardware-environment/interruptible-instances).

* `limits`: See [Customizing task resources](./task-hardware-environment/customizing-task-resources).

* `node_dependency_hints`: A list of tasks, launch plans, or workflows that this task depends on.
  This is only for dynamic tasks/workflows, where Union.ai cannot automatically determine the dependencies prior to runtime.
  Even on dynamic tasks this is optional, but in some scenarios it will make registering the workflow easier,
  because it allows registration to be done the same as for static tasks/workflows.
  For example this is useful to run launch plans dynamically, because launch plans must be registered before they can be run.
  Tasks and workflows do not have this requirement.

```python
@union.workflow
def workflow0():
    ...

launchplan0 = LaunchPlan.get_or_create(workflow0)

# Specify node_dependency_hints so that launchplan0
# will be registered on flyteadmin, despite this being a dynamic task.
@union.dynamic(node_dependency_hints=[launchplan0])
def launch_dynamically():
    # To run a sub-launchplan it must have previously been registered on flyteadmin.
    return [launchplan0] * 10
```

* `pod_template`: See [Task hardware environment](./task-hardware-environment#pod_template-and-pod_template_name-task-parameters).

* `pod_template_name`: See [Task hardware environment](./task-hardware-environment#pod_template-and-pod_template_name-task-parameters).

* `requests`: See [Customizing task resources](./task-hardware-environment/customizing-task-resources)

* `retries`: Number of times to retry this task during a workflow execution.
  Tasks can define a retry strategy to let the system know how to handle failures (for example: retry three times on any kind of error).
  There are two kinds of retries: *system retries* and *user retries*.
  For more information, see [Interruptible instances](./task-hardware-environment/interruptible-instances).

* `secret_requests`: See [Managing secrets](https://www.union.ai/docs/v1/union/user-guide/development-cycle/managing-secrets)

* `task_config`: Configuration for a specific task type.
  See the [Union.ai Connectors documentation](https://www.union.ai/docs/v1/union/user-guide/core-concepts/integrations/connectors) and
  [Union.ai plugins documentation]() for the right object to use.
  <!-- TODO: Add link to API -->

* `task_resolver`: Provide a custom task resolver.

* `timeout`: The maximum amount of time that a single execution of this task is allowed to run.
  The execution will be terminated if the runtime exceeds the given timeout (approximately).
  To ensure that the system is always making progress, tasks must be guaranteed to end gracefully or successfully.
  The system defines a default timeout period for tasks, and task authors can define their own timeout period, after which the task is marked as `failure`.
  Note that a timed-out task will be retried if it has a retry strategy defined.
  The timeout can also be set in the
  [TaskMetadata]().
  <!-- TODO: Add link to API -->

## Use `partial` to provide default arguments to tasks

You can use the `functools.partial` function to assign default or constant values to the parameters of your tasks:
```python
import functools
import union

@union.task
def slope(x: list[int], y: list[int]) -> float:
    sum_xy = sum([x[i] * y[i] for i in range(len(x))])
    sum_x_squared = sum([x[i] ** 2 for i in range(len(x))])
    n = len(x)
    return (n * sum_xy - sum(x) * sum(y)) / (n * sum_x_squared - sum(x) ** 2)

@union.workflow
def simple_wf_with_partial(x: list[int], y: list[int]) -> float:
    partial_task = functools.partial(slope, x=x)
    return partial_task(y=y)
```

## Named outputs

By default, Union.ai employs a standardized convention to assign names to the outputs of tasks or workflows.
Each output is sequentially labeled as `o1`, `o2`, `o3`, ... `on`, where `o` serves as the standard prefix,
and `1`, `2`, ... `n` indicates the positional index within the returned values.

However, Union.ai allows the customization of output names for tasks or workflows.
This customization becomes beneficial when you're returning multiple outputs
and you wish to assign a distinct name to each of them.

The following example illustrates the process of assigning names to outputs for both a task and a workflow.

Define a `NamedTuple` and assign it as an output to a task:

```python
import union
from typing import NamedTuple

slope_value = NamedTuple("slope_value", [("slope", float)])

@union.task
def slope(x: list[int], y: list[int]) -> slope_value:
    sum_xy = sum([x[i] * y[i] for i in range(len(x))])
    sum_x_squared = sum([x[i] ** 2 for i in range(len(x))])
    n = len(x)
    return (n * sum_xy - sum(x) * sum(y)) / (n * sum_x_squared - sum(x) ** 2)
```

Likewise, assign a `NamedTuple` to the output of `intercept` task:

```python
intercept_value = NamedTuple("intercept_value", [("intercept", float)])

@union.task
def intercept(x: list[int], y: list[int], slope: float) -> intercept_value:
    mean_x = sum(x) / len(x)
    mean_y = sum(y) / len(y)
    intercept = mean_y - slope * mean_x
    return intercept
```

> [!NOTE]
> While it's possible to create `NamedTuple`s directly within the code,
> it's often better to declare them explicitly. This helps prevent potential linting errors in tools like mypy.
>
> ```python
> def slope() -> NamedTuple("slope_value", slope=float):
>     pass
> ```

You can easily unpack the `NamedTuple` outputs directly within a workflow.
Additionally, you can also have the workflow return a `NamedTuple` as an output.

> [!NOTE]
> Remember that we are extracting individual task execution outputs by dereferencing them.
> This is necessary because `NamedTuple`s function as tuples and require this dereferencing:

```python
slope_and_intercept_values = NamedTuple("slope_and_intercept_values", [("slope", float), ("intercept", float)])

@union.workflow
def simple_wf_with_named_outputs(x: list[int] = [-3, 0, 3], y: list[int] = [7, 4, -2]) -> slope_and_intercept_values:
    slope_value = slope(x=x, y=y)
    intercept_value = intercept(x=x, y=y, slope=slope_value.slope)
    return slope_and_intercept_values(slope=slope_value.slope, intercept=intercept_value.intercept)

```

You can run the workflow locally as follows:

```python
if __name__ == "__main__":
    print(f"Running simple_wf_with_named_outputs() {simple_wf_with_named_outputs()}")
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/launching-tasks ===

# Launching tasks

From the [task view](./viewing-tasks#task-view) (accessed, for example, by selecting a task in the [**Tasks** list](./viewing-tasks#tasks-list)) you can select **Launch Task** in the top right:

This opens the **New Execution** dialog for tasks:

![](../../../_static/images/user-guide/core-concepts/tasks/launching-tasks/new-execution-dialog.png)

The settings are similar to those for workflows. At the top you can select:

* The specific version of this task that you want to launch.

Along the left side the following sections are available:

* **Inputs**: The input parameters of the task function appear here as fields to be filled in.
* **Settings**:
  * **Execution name**: A custom name for this execution. If not specified, a name will be generated.
  * **Overwrite cached outputs**: A boolean. If set to `True`, this execution will overwrite any previously-computed cached outputs.
  * **Raw output data config**: Remote path prefix to store raw output data.
    By default, workflow output will be written to the built-in metadata storage.
    Alternatively, you can specify a custom location for output at the organization, project-domain, or individual execution levels.
    This field is for specifying this setting at the workflow execution level.
    If this field is filled in it overrides any settings at higher levels.
    The parameter is expected to be a URL to a writable resource (for example, `http://s3.amazonaws.com/my-bucket/`).
    See [Raw data store](https://www.union.ai/docs/v1/union/user-guide/data-input-output/task-input-and-output)
  * **Max parallelism**: Number of workflow nodes that can be executed in parallel. If not specified, project/domain defaults are used. If 0, then no limit is applied.
  * **Force interruptible**: A three valued setting for overriding the interruptible setting of the workflow for this particular execution.
    If not set, the workflow's interruptible setting is used.
    If set and **enabled** then `interruptible=True` is used for this execution.
    If set and **disabled** then `interruptible=False` is used for this execution.
    See [Interruptible instances](./task-hardware-environment/interruptible-instances)

  * **Service account**: The service account to use for this execution. If not specified, the default is used.

* **Environment variables**: Environment variables that will be available to tasks in this workflow execution.
* **Labels**: Labels to apply to the execution resource.
* **Notifications**: [Notifications](https://www.union.ai/docs/v1/union/user-guide/core-concepts/launch-plans/notifications) configured for this workflow execution.

* **Debug**: The workflow execution details for debugging purposes.

Select **Launch** to launch the task execution. This will take you to the [Execution view](https://www.union.ai/docs/v1/union/user-guide/core-concepts/workflows/viewing-workflow-executions).

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/viewing-tasks ===

# Viewing tasks

## Tasks list

Selecting **Tasks** in the sidebar displays a list of all the registered tasks:

![Tasks list](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/viewing-tasks/tasks-list.png)

You can search the tasks by name and filter for only those that are archived.

Each task in the list displays some basic information about the task:

* **Inputs**: The input type for the task.
* **Outputs**: The output type for the task.
* **Description**: A description of the task.

Select an entry on the list to go to that **Core concepts > Tasks > Viewing tasks > Task view**.

## Task view

Selecting an individual task from the **Core concepts > Tasks > Viewing tasks > Tasks list** will take you to the task view:

![Task view](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/viewing-tasks/task-view.png)

Here you can see:

* **Inputs & Outputs**: The input and output types for the task.
* Recent task versions. Selecting one of these takes you to the **Core concepts > Tasks > Viewing tasks > Task view > Task versions list**
* Recent executions of this task. Selecting one of these takes you to the [execution view](https://www.union.ai/docs/v1/union/user-guide/core-concepts/workflows/viewing-workflow-executions).

### Task versions list

The task versions list gives you detailed information about a specific version of a task:

![Task versions list](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/viewing-tasks/task-versions-list.png)

* **Image**: The Docker image used to run this task.
* **Env Vars**: The environment variables used by this task.
* **Commands**: The JSON object defining this task.

At the bottom is a list of all versions of the task with the current one selected.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-software-environment ===

# Task software environment

The `@union.task` decorator provides the following parameters to specify the software environment in which a task runs:

* `container_image`: Can be either a string referencing a specific image on a container repository, or an ImageSpec defining a build. See **Core concepts > Tasks > Task software environment > Local image building** for details.
* `environment`: See **Core concepts > Tasks > Task software environment > Environment variables** for details.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-software-environment/image-spec ===

# Local image building

With Union.ai, every task in a workflow runs within its own dedicated container.
Since a container requires a container image to run, every task in Union.ai must have a container image associated with it.
You can specify the container image to be used by a task by defining an `ImageSpec` object and passing it to the `container_image` parameter of the `@union.task` decorator.
When you register the workflow, the container image is built locally and pushed to the container registry that you specify.
When the workflow is executed, the container image is pulled from that registry and used to run the task.

> [!NOTE]
> See the [ImageSpec API documentation]() for full documentation of `ImageSpec` class parameters and methods.
<!-- TODO: Add link to API -->

To illustrate the process, we will walk through an example.

## Project structure

```shell
├── requirements.txt
└── workflows
    ├── __init__.py
    └── imagespec-simple-example.py
```

### requirements.txt

```shell
union
pandas
```

### imagespec-simple-example.py

```python
import typing
import pandas as pd
import union

image_spec = union.ImageSpec(
    registry="ghcr.io/<my-github-org>",
    name="simple-example-image",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",
    requirements="requirements.txt"
)

@union.task(container_image=image_spec)
def get_pandas_dataframe() -> typing.Tuple[pd.DataFrame, pd.Series]:
    df = pd.read_csv("https://storage.googleapis.com/download.tensorflow.org/data/heart.csv")
    print(df.head())
    return df[["age", "thalach", "trestbps", "chol", "oldpeak"]], df.pop("target")

@union.workflow()
def wf() -> typing.Tuple[pd.DataFrame, pd.Series]:
    return get_pandas_dataframe()
```

## Install and configure `union` and Docker

To install Docker, see [Setting up container image handling](https://www.union.ai/docs/v1/union/user-guide/getting-started/local-setup).
To configure `union` to connect to your Union.ai instance, see [Getting started](https://www.union.ai/docs/v1/union/user-guide/core-concepts/getting-started/_index).

## Set up an image registry

You will need an image registry where the container image can be stored and pulled by Union.ai when the task is executed.
You can use any image registry that you have access to, including public registries like Docker Hub or GitHub Container Registry.
Alternatively, you can use a registry that is part of your organization's infrastructure such as AWS Elastic Container Registry (ECR) or Google Artifact Registry (GAR).

The registry that you choose must be one that is accessible to the Union.ai instance where the workflow will be executed.
Additionally, you will need to ensure that the specific image, once pushed to the registry, is itself publicly accessible.

In this example, we use GitHub's `ghcr.io` container registry.
See [Working with the Container registry](https://docs.github.com/en/packages/working-with-a-github-packages-registry/working-with-the-container-registry) for more information.

* For an example using Amazon ECR see [ImageSpec with ECR](./image-spec-with-ecr).
* For an example using Google Artifact Registry see [ImageSpec with GAR](./image-spec-with-gar).
* For an example using Azure Container Registry see [ImageSpec with ACR](./image-spec-with-acr).

## Authenticate to the registry

You will need to set up your local Docker client to authenticate with GHCR. This is needed for the `union` CLI to be able to push the image built according to the `ImageSpec` to GHCR.

Follow the directions [Working with the Container registry > Authenticating to the Container registry](https://docs.github.com/en/packages/working-with-a-github-packages-registry/working-with-the-container-registry#authenticating-to-the-container-registry).

## Set up your project and domain on Union.ai

You will need to set up a project on your Union.ai instance to which you can register your workflow.
See [Setting up the project](https://www.union.ai/docs/v1/union/user-guide/development-cycle/setting-up-a-project).

## Understand the requirements

The `requirements.txt` file contains the `union` package and the `pandas` package, both of which are needed by the task.

## Set up a virtual Python environment

Set up a virtual Python environment and install the dependencies defined in the `requirements.txt` file.
Assuming you are in the local project root, run `pip install -r requirements.txt`.

## Run the workflow locally

You can now run the workflow locally.
In the project root directory, run: `union run workflows/imagespec-simple-example.py wf`.
See [Running your code](https://www.union.ai/docs/v1/union/user-guide/development-cycle/running-your-code) for more details.

> [!NOTE]
> When you run the workflow in your local Python environment, the image is not built or pushed (in fact, no container image is used at all).

## Register the workflow

To register the workflow to Union.ai, in the local project root, run:

```shell
$ union register workflows/imagespec-simple-example.py
```

`union` will build the container image and push it to the registry that you specified in the `ImageSpec` object.
It will then register the workflow to Union.ai.

To see the registered workflow, go to the UI and navigate to the project and domain that you created above.

## Ensure that the image is publicly accessible

If you are using the `ghcr.io` image registry, you must switch the visibility of your container image to Public before you can run your workflow on Union.ai.
See [Configuring a package's access control and visibility](https://docs.github.com/en/packages/learn-github-packages/configuring-a-packages-access-control-and-visibility#about-inheritance-of-access-permissions-and-visibility).

## Run the workflow on Union.ai

Assuming your image is publicly accessible, you can now run the workflow on Union.ai by clicking **Launch Workflow**.

> [!WARNING] Make sure your image is accessible
> If you try to run a workflow that uses a private container image or an image that is inaccessible for some other reason, the system will return an error:
>
> ```
> ... Failed to pull image ...
> ... Error: ErrImagePull
> ... Back-off pulling image ...
> ... Error: ImagePullBackOff
> ```

## Multi-image workflows

You can also specify different images per task within the same workflow.
This is particularly useful when some tasks in your workflow require a different set of dependencies than the rest.

In this example we specify two tasks: one that uses CPUs and another that uses GPUs.
For the former task, we use the default image that ships with `union`, while for the latter task, we specify a pre-built image that enables distributed training with the Kubeflow PyTorch integration.

```python
from typing import Tuple

import numpy as np
import torch.nn as nn
from flytekit import Resources, task

@task(
    requests=Resources(cpu="2", mem="16Gi"),
    container_image="ghcr.io/flyteorg/flytekit:py3.9-latest",
)
def get_data() -> Tuple[np.ndarray, np.ndarray]:
    ...  # get dataset as numpy ndarrays

@task(
    requests=Resources(cpu="4", gpu="1", mem="16Gi"),
    container_image="ghcr.io/flyteorg/flytecookbook:kfpytorch-latest",
)
def train_model(features: np.ndarray, target: np.ndarray) -> nn.Module:
    ...  # train a model using gpus
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-software-environment/image-spec-with-ecr ===

# ImageSpec with ECR

In this section we explain how to set up and use AWS Elastic Container Registry (ECR) to build and deploy task container images using `ImageSpec`.

## Prerequisites

If you are using ECR in the same AWS account as your Union.ai data plane, then you do not need to configure anything. Access to ECR in the same account is enabled by default.

If you want to store your task container images in an ECR instance in an AWS account _other than the one that holds your data plane_, then you will have to configure that ECR instance to permit access from your data plane. See [Enable AWS ECR](https://www.union.ai/docs/v1/union/user-guide/deployment/enabling-aws-resources/enabling-aws-ecr) for details.

## Set up the image repository

Unlike GitHub Container Registry, ECR does not allow you to simply push an arbitrarily named image to the registry. Instead, you must first create a repository in the ECR instance and then push the image to that repository.

> [!NOTE] Registry, repository, and image
> In ECR terminology the **registry** is the top-level storage service. The registry holds a collection of **repositories**.
> Each repository corresponds to a named image and holds all versions of that image.
>
> When you push an image to a registry, you are actually pushing it to a repository within that registry.
> Strictly speaking, the term *image* refers to a specific *image version* within that repository.

This means that you have to decide on the name of your image and create a repository by that name first, before registering your workflow. We will assume the following:

* The ECR instance you will be using has the base URL `123456789012.dkr.ecr.us-east-1.amazonaws.com`.

* Your image will be called `simple-example-image`.
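Putting those assumptions together, the full reference that the built image is pushed to has the shape `<registry>/<repository>:<tag>`. The following sketch is illustrative only; the tag shown is hypothetical, since `union` derives the real tag from the `ImageSpec` contents:

```python
# Illustrative only: how an ECR image reference is composed.
registry = "123456789012.dkr.ecr.us-east-1.amazonaws.com"  # ECR base URL
repository = "simple-example-image"  # in ECR, repository name == image name
tag = "someTag123"  # hypothetical; union derives the real tag

image_reference = f"{registry}/{repository}:{tag}"
print(image_reference)
# 123456789012.dkr.ecr.us-east-1.amazonaws.com/simple-example-image:someTag123
```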

In the AWS console, go to **Amazon ECR > Repositories** and find the correct ECR registry.

If you are in the same account as your Union.ai data plane you should go directly to the ECR registry that was set up for you by Union.ai. If there are multiple ECR registries present, consult with your Union.ai administrator to find out which one to use.

Under **Create a Repository**, click **Get Started**:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-software-environment/imagespec-with-ecr/create-repository-1.png)

On the **Create repository** page:

* Select **Private** for the repository visibility, assuming you want to make it private. You can, alternatively, select **Public**, but in most cases, the main reason for using ECR is to keep your images private.

* Enter the name of the repository:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-software-environment/imagespec-with-ecr/create-repository-2.png)

and then scroll down to click **Create repository**:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-software-environment/imagespec-with-ecr/create-repository-3.png)

Your repository is now created.

## Authenticate to the registry

You will need to set up your local Docker client to authenticate with ECR. This is needed for `union` to be able to push the image built according to the `ImageSpec` to ECR.

To do this, you will need to [install the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html), use it to run the `aws ecr get-login-password` command to get the appropriate password, then perform a `docker login` with that password.

See [Private registry authentication](https://docs.aws.amazon.com/AmazonECR/latest/userguide/registry_auth.html) for details.

## Register your workflow to Union.ai

You can register tasks with `ImageSpec` declarations that reference this repository.

For example, to use the example repository shown here, we would alter the Python code from the **Local image building** example to use the following `ImageSpec` declaration:

```python
image_spec = union.ImageSpec(
    registry="123456789012.dkr.ecr.us-east-1.amazonaws.com",
    name="simple-example-image",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",
    requirements="image-requirements.txt"
)
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-software-environment/image-spec-with-gar ===

# ImageSpec with GAR

In this section we explain how to set up and use Google Artifact Registry (GAR) to build and deploy task container images using `ImageSpec`.

## Prerequisites

If you are using GAR in the same Google Cloud Platform (GCP) project as your Union.ai data plane, then you do not need to configure anything.
Access to GAR in the same project is enabled by default.

If you want to store your task container images in a GAR repository in a GCP project _other than the one that holds your data plane_, you must enable the node pool of your data plane to access that GAR.
See [Enable Google Artifact Registry](https://www.union.ai/docs/v1/union/user-guide/deployment/enabling-gcp-resources/enabling-google-artifact-registry) for details.

## Set up the image repository

Unlike GitHub Container Registry, GAR does not allow you to simply push an arbitrarily named image to the registry.
Instead, you must first create a repository in the GAR instance and then push the image to that repository.

> [!NOTE] Registry, repository, and image
> In GAR terminology the **registry** is the top-level storage service. The registry holds a collection of **repositories**.
> Each repository in turn holds some number of images, and each specific image name can have different versions.
>
> Note that this differs from the arrangement in AWS ECR where the repository name and image name are essentially the same.
>
> When you push an image to GAR, you are actually pushing it to an image name within a repository within that registry.
> Strictly speaking, the term *image* refers to a specific *image version* within that repository.

This means that you have to decide on the name of your repository and create it, before registering your workflow. You can, however, decide on the image name later, when you push the image to the repository. We will assume the following:

* The GAR instance you will be using has the base URL `us-east1-docker.pkg.dev/my-union-dataplane/my-registry/`.
* Your repository will be called `my-image-repository`.
* Your image will be called `simple-example-image`.
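Unlike ECR, in GAR the image name is a separate path segment beneath the repository, so the full reference has the shape `<registry base>/<repository>/<image>:<tag>`. The following sketch is illustrative only; the tag shown is hypothetical, since `union` derives the real tag from the `ImageSpec` contents:

```python
# Illustrative only: how a GAR image reference is composed.
registry_base = "us-east1-docker.pkg.dev/my-union-dataplane/my-registry"
repository = "my-image-repository"
image = "simple-example-image"
tag = "someTag123"  # hypothetical; union derives the real tag

image_reference = f"{registry_base}/{repository}/{image}:{tag}"
print(image_reference)
```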

In the GCP console, within your Union.ai data plane project, go to **Artifact Registry**. You should see a list of repositories. The existing ones are used internally by Union.ai. For your own work you should create a new one. Click **Create Repository**:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-software-environment/imagespec-with-gar/gar-create-repository-1.png)

On the **Create repository** page:

* Enter the name of the repository. In this example it would be `my-image-repository`.
* Select **Docker** for the artifact type.

* Select the region. If you want to access the GAR without further configuration, make sure this is the same region as your Union.ai data plane.

* Click **Create**:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-software-environment/imagespec-with-gar/gar-create-repository-2.png)

Your GAR repository is now created.

## Authenticate to the registry

You will need to set up your local Docker client to authenticate with GAR. This is needed for `union` to be able to push the image built according to the `ImageSpec` to GAR.

Directions can be found in the GAR console interface. Click on **Setup Instructions**:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-software-environment/imagespec-with-gar/gar-setup-instructions.png)

The directions are also reproduced below. (We show the directions for the `us-east1` region. You may need to adjust the command accordingly):

> [!NOTE] Setup Instructions
> Follow the steps below to configure your client to push and pull packages using this repository.
> You can also [view more detailed instructions here](https://cloud.google.com/artifact-registry/docs/docker/authentication?authuser=1).
> For more information about working with artifacts in this repository, see the [documentation](https://cloud.google.com/artifact-registry/docs/docker?authuser=1).
>
> **Initialize gcloud**
>
> The [Google Cloud SDK](https://cloud.google.com/sdk/docs/?authuser=1) is used to generate an access token when authenticating with Artifact Registry.
> Make sure that it is installed and initialized with [Application Default Credentials](https://cloud.google.com/sdk/gcloud/reference/auth/application-default/login?authuser=1) before proceeding.
>
> **Configure Docker**
>
> Run the following command to configure `gcloud` as the credential helper for the Artifact Registry domain associated with this repository's location:
>
> ```shell
> $ gcloud auth configure-docker us-east1-docker.pkg.dev
> ```

## Register your workflow to Union.ai

You can now register tasks with `ImageSpec` declarations that reference this repository.

For example, to use the example GAR repository shown here, we would alter the Python code from the **Local image building** example to use the following `ImageSpec` declaration:

```python
image_spec = union.ImageSpec(
    registry="us-east1-docker.pkg.dev/my-union-dataplane/my-registry/my-image-repository",
    name="simple-example-image",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",
    requirements="image-requirements.txt"
)
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-software-environment/image-spec-with-acr ===

# ImageSpec with ACR

In this section we explain how to use [Azure Container Registry (ACR)](https://azure.microsoft.com/en-us/products/container-registry) to build and deploy task container images using `ImageSpec`.

Before proceeding, make sure that you have [enabled Azure Container Registry](https://www.union.ai/docs/v1/union/user-guide/core-concepts/integrations/enabling-azure-resources/enabling-azure-container-registry) for your Union.ai installation.

## Authenticate to the registry

Authenticate with the container registry:

```bash
az login
az acr login --name <acrName>
```

Refer to [Individual login with Microsoft Entra ID](https://learn.microsoft.com/en-us/azure/container-registry/container-registry-authentication?tabs=azure-cli#individual-login-with-microsoft-entra-id) in the Azure documentation for additional details.

## Register your workflow to Union.ai

You can now register tasks with `ImageSpec` declarations that reference this repository.

For example, to use an existing ACR repository, we would alter the Python code from the **Local image building** example to use the following `ImageSpec` declaration:

```python
image_spec = union.ImageSpec(
    registry="<AZURE_CONTAINER_REGISTRY_NAME>.azurecr.io",
    name="my-repository/simple-example-image",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",
    requirements="image-requirements.txt"
)
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-software-environment/environment-variables ===

# Environment variables

The `environment` parameter lets you specify the values of any variables that you want to be present within the task container execution environment.
For example:

```python
import os

import union

@union.task(environment={"MY_ENV_VAR": "my_value"})
def my_task() -> str:
    return os.environ["MY_ENV_VAR"]
```
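To see the same lookup work locally, you can simulate the injected variable yourself. This is a minimal standard-library sketch: in a real execution the platform, not your code, sets the variable in the container environment:

```python
import os

# Locally simulate what the platform does when it starts the task container:
# inject MY_ENV_VAR into the process environment before the task body runs.
os.environ["MY_ENV_VAR"] = "my_value"

def my_task() -> str:
    # Same lookup as the task body above.
    return os.environ["MY_ENV_VAR"]

print(my_task())  # my_value
```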

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/viewing-logs ===

# Viewing logs

In the [Execution view](https://www.union.ai/docs/v1/union/user-guide/core-concepts/workflows/viewing-workflow-executions), selecting a task from the list in the **Nodes** tab will open the task details in the right panel.

Within that panel, in the **Execution** tab, under **Logs**, you will find a link labeled **Task Logs**.

![Task logs link](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/viewing-logs/task-logs-link.png)

This leads to the **Execution logs tab** of the **Execution details page**:

![Execution logs](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/viewing-logs/execution-logs.png)

The execution logs provide a live view into the standard output of the task execution.

For example, any `print` statements in the task's Python code will be displayed here.

## Kubernetes cluster logs

On the left side of the page you can also see the Kubernetes cluster logs for the task execution:

![Kubernetes cluster logs](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/viewing-logs/k8s-cluster-logs.png)

## Other tabs

Alongside the **Execution logs** tab in the **Execution details page**, you will also find the **Execution resources** and **Inputs & Outputs** tabs.

## Cloud provider logs

In addition to the **Task Logs** link, you will also see a link to your cloud provider's logs (**Cloudwatch Logs** for AWS, **Stackdriver Logs** for GCP, and **Azure Logs** for Azure):

![Cloud provider logs link](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/viewing-logs/cloud-provider-logs-link.png)

Assuming you are logged into your cloud provider account with the appropriate permissions, this link will take you to the logs specific to the container in which this particular task execution is running.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/reference-tasks ===

# Reference tasks

A `reference_task` references a task that has already been defined, serialized, and registered. You can reference tasks from other projects and create workflows that use tasks declared by others. These tasks can live in their own containers, Python runtimes, `flytekit` versions, and even different languages.

> [!NOTE]
> Reference tasks cannot be run locally. To test locally, mock them out.

## Example

1. Create a file called `task.py` and insert this content into it:

    ```python
    import union

    @union.task
    def add_two_numbers(a: int, b: int) -> int:
        return a + b
    ```

2. Register the task:

   ```shell
   $ union register --project flytesnacks --domain development --version v1 task.py
   ```

3. Create a separate file `wf_ref_task.py` and copy the following code into it:

   ```python
   import union
   from flytekit import reference_task

   @reference_task(
       project="flytesnacks",
       domain="development",
       name="task.add_two_numbers",
       version="v1",
   )
   def add_two_numbers(a: int, b: int) -> int:
       ...

   @union.workflow
   def wf(a: int, b: int) -> int:
       return add_two_numbers(a, b)
   ```

4. Register the `wf` workflow:

    ```shell
    $ union register --project flytesnacks --domain development wf_ref_task.py
    ```

5. In the Union.ai UI, run the workflow `wf_ref_task.wf`.
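Since reference tasks cannot run locally, one way to unit test workflow logic like `wf` is to stub the referenced task with `unittest.mock`. The sketch below is plain Python, not Union.ai API; the empty-bodied function stands in for the `@reference_task`-decorated stub:

```python
from unittest import mock

# Stand-ins for the reference task (empty body) and the workflow logic;
# the names mirror the example above.
def add_two_numbers(a: int, b: int) -> int:
    ...  # resolved on the platform at run time, not locally

def wf(a: int, b: int) -> int:
    return add_two_numbers(a, b)

# Patch the module-level name so wf picks up the stub at call time.
with mock.patch(__name__ + ".add_two_numbers", side_effect=lambda a, b: a + b):
    result = wf(3, 4)

print(result)  # 7
```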

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment ===

# Task hardware environment

## Customizing task resources

You can customize the hardware environment in which your task code executes.

Depending on your needs, there are two different ways to define and register tasks with their own custom hardware requirements:

* Configuration in the `@union.task` decorator
* Defining a `PodTemplate`

### Using the `@union.task` decorator

You can specify `requests` and `limits` on:

* CPU number
* GPU number
* Memory size
* Ephemeral storage size

See **Core concepts > Tasks > Task hardware environment > Customizing task resources** for details.

### Using PodTemplate

If your needs are more complex, you can use Kubernetes-level configuration to constrain a task to only run on a specific machine type.

This requires that you coordinate with Union.ai to set up the required machine types and node groups with the appropriate node assignment configuration (node selector labels, node affinities, taints, tolerations, etc.).

In your task definition you then use a `PodTemplate` that uses the matching node assignment configuration to make sure that the task will only be scheduled on the appropriate machine type.

### The `pod_template` and `pod_template_name` `@union.task` parameters

The `pod_template` parameter can be used to supply a custom Kubernetes `PodTemplate` to the task.
This can be used to define details about node selectors, affinity, tolerations, and other Kubernetes-specific settings.

The `pod_template_name` is a related parameter that can be used to specify the name of an already existing `PodTemplate` resource which will be used in this task.

For details see [Configuring task pods with Kubernetes PodTemplates]().
<!-- TODO: Add link to API -->

## Accelerators

If you specify GPUs, you can also specify the type of GPU to be used by setting the `accelerator` parameter.
See **Core concepts > Tasks > Task hardware environment > Accelerators** for more information.

## Task-level monitoring

You can also monitor the hardware resources used by a task.
See **Core concepts > Tasks > Task hardware environment > Task-level monitoring** for details.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment/customizing-task-resources ===

# Customizing task resources

When defining a task function, you can specify resource requirements for the pod that runs the task.
Union.ai will take this into account to ensure that the task pod is scheduled to run on a Kubernetes node that meets the specified resource profile.

Resources are specified in the `@union.task` decorator. Here is an example:

```python
import union
from flytekit import Resources
from flytekit.extras.accelerators import GPUAccelerator

@union.task(
    requests=Resources(mem="120Gi", cpu="44", ephemeral_storage="100Gi"),
    limits=Resources(mem="200Gi", cpu="100", gpu="12", ephemeral_storage="200Gi"),
    accelerator=GPUAccelerator("nvidia-tesla-a100"),
)
def my_task():
    ...
```

There are three separate resource-related settings:

* `requests`
* `limits`
* `accelerator`

## The `requests` and `limits` settings

The `requests` and `limits` settings each take a [`Resources`]() object, which itself has four possible attributes:
<!-- TODO: Add link to API -->

* `cpu`: Number of CPU cores (in whole numbers or millicores (`m`)).
* `gpu`: Number of GPU cores (in whole numbers or millicores (`m`)).
* `mem`: Main memory (in `Mi`, `Gi`, etc.).
* `ephemeral_storage`: Ephemeral storage (in `Mi`, `Gi` etc.).

Note that CPU and GPU allocations can be specified either as whole numbers or in millicores (`m`). For example, `cpu="2500m"` means two and a half CPU cores, and `gpu="3000m"` means three GPU cores.
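The millicore arithmetic is simply a division by 1000. A small illustrative helper (not part of the Union.ai API) makes the conversion explicit:

```python
def cpu_quantity_to_cores(quantity: str) -> float:
    """Convert a Kubernetes-style CPU quantity ("2", "2500m") to cores."""
    if quantity.endswith("m"):
        return int(quantity[:-1]) / 1000  # millicores -> cores
    return float(quantity)

print(cpu_quantity_to_cores("2500m"))  # 2.5
print(cpu_quantity_to_cores("2"))      # 2.0
```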

The type of ephemeral storage used depends on the node type and configuration you request from the Union.ai team. By default, all nodes will use network-attached storage for ephemeral storage. However, if a node type has attached NVMe SSD storage, you can request that the Union.ai team configure your cluster to use the attached NVMe as ephemeral storage for that node type.

The `requests` setting tells the system that the task requires _at least_ the resources specified and therefore the pod running this task should be scheduled only on a node that meets or exceeds the resource profile specified.

The `limits` setting serves as a hard upper bound on the resources that the task is allowed to consume at runtime.
A task that exceeds its limits (in any of the specified attributes) may be throttled or terminated.

> [!NOTE] GPUs take only `limits`
> GPUs should only be specified in the `limits` section of the task decorator:
>   * You should specify GPU requirements only in `limits`, not in `requests`, because Kubernetes will use the `limits` value as the `requests` value anyway.
>   * You _can_ specify GPU in both `limits` and `requests` but the two values must be equal.
>   * You cannot specify GPU `requests` without specifying `limits`.

## The `accelerator` setting

The `accelerator` setting further specifies the *type* of specialized hardware required for the task.
This can be a GPU, a specific variation of a GPU, a fractional GPU, or a different hardware device, such as a TPU.

See [Accelerators](./accelerators) for more information.

## Execution defaults and resource quotas

The execution defaults and resource quotas can be found on the right sidebar of the Dashboard.
They can be edited by selecting the gear icon:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-hardware-environment/customizing-task-resources/execution-defaults-gear.png)

This will open a dialog:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-hardware-environment/customizing-task-resources/execution-defaults-dialog.png)

> [!NOTE]
> An ephemeral storage default value of zero means that the task pod will consume storage on the node as needed.
> This makes it possible for a pod to get evicted if a node doesn't have enough storage. If your tasks are built to rely on
> ephemeral storage, we recommend being explicit with the ephemeral storage you request to avoid pod eviction.

## Task resource validation

If you attempt to execute a workflow with unsatisfiable resource requests, the execution will fail immediately rather than being allowed to queue forever.

To remedy such a failure, you should make sure that the appropriate node types are:

* Physically available in your cluster, meaning you have arranged with the Union.ai team to include them when [configuring your data plane](https://www.union.ai/docs/v1/union/user-guide/deployment/configuring-your-data-plane).
* Specified in the task decorator (via the `requests`, `limits`, `accelerator`, or other parameters).

Go to the **Resources > Compute** dashboard to find the available node types and their resource profiles.

To make changes to your cluster configuration, go to the [Union.ai Support Portal](https://get.support.union.ai/servicedesk/customer/portal/1/group/6/create/30).

## The `with_overrides` method

When `requests`, `limits`, or `accelerator` are specified in the `@union.task` decorator, they apply every time that a task is invoked from a workflow.
In some cases, you may wish to change the resources specified from one invocation to another.
To do that, use the [`with_overrides` method](https://www.union.ai/docs/v1/union/api-reference/flytekit-sdk/packages/flytekit.core.node) of the task function.

For example:

```python
@union.task
def my_task(ff: FlyteFile):
    ...

@union.workflow
def my_workflow():
    my_task(ff=smallFile)
    my_task(ff=bigFile).with_overrides(requests=Resources(mem="120Gi", cpu="10"))
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment/accelerators ===

# Accelerators

> [!NOTE] _Accelerators_ and _Accelerated datasets_ are entirely different things
> An accelerator, in Union.ai, is a specialized hardware device that is used to accelerate the execution of a task.
> [Accelerated datasets](https://www.union.ai/docs/v1/union/user-guide/data-input-output/accelerated-datasets), on the other hand, are a Union.ai feature that enables quick access to large datasets from within a task.
> These concepts are entirely different and should not be confused.

Union.ai allows you to specify [requests and limits](./customizing-task-resources) for the number of GPUs available for a given task.
However, in some cases, you may want to be more specific about the type of GPU or other specialized device to be used.

You can use the `accelerator` parameter to specify a particular GPU type, a variation of a GPU type, a fractional GPU, or another specialized hardware device such as a TPU.

Your Union.ai installation will come pre-configured with the GPUs and other hardware that you requested during onboarding.
Each device type has a constant name that you can use to specify the device in the `accelerator` parameter.
For example:

```python
import union
from flytekit import Resources
from flytekit.extras.accelerators import A100

@union.task(
    limits=Resources(gpu="1"),
    accelerator=A100,
)
def my_task():
    ...
```

## Finding your available accelerators

You can find the accelerators available in your Union.ai installation by going to the **Usage > Compute** dashboard in the UI.
In the **Accelerators** section, you will see a list of available accelerators and the named constants to be used in code to refer to them.

## Requesting the provisioning of accelerators

If you need a specific accelerator that is not available in your Union.ai installation, you can request it by contacting the Union.ai team.
Just click on the **Adjust Configuration** button under **Usage** in the UI (or go [here](https://get.support.union.ai/servicedesk/customer/portal/1/group/6/create/30)).

## Using predefined accelerator constants

There are a number of predefined accelerator constants available in the `flytekit.extras.accelerators` module.

The predefined list is not exhaustive, but it includes the most common accelerators.
If you know the name of the accelerator, but there is no predefined constant for it, you can simply pass the string name to the task decorator directly.

Note that in order for a specific accelerator to be available in your Union.ai installation, it must have been provisioned by the Union.ai team.

If using the constants, you can import them directly from the module, e.g.:

```python
import union
from flytekit import Resources
from flytekit.extras.accelerators import T4

@union.task(
    limits=Resources(gpu="1"),
    accelerator=T4,
)
def my_task():
    ...
```

If you want to use a fractional GPU, you can use one of the partition attributes of the accelerator constant, e.g.:

```python
import union
from flytekit import Resources
from flytekit.extras.accelerators import A100

@union.task(
    limits=Resources(gpu="1"),
    accelerator=A100.partition_2g_10gb,
)
def my_task():
    ...
```

## List of predefined accelerator constants

* `A10G`: [NVIDIA A10 Tensor Core GPU](https://www.nvidia.com/en-us/data-center/products/a10-gpu/)
* `L4`: [NVIDIA L4 Tensor Core GPU](https://www.nvidia.com/en-us/data-center/l4/)
* `K80`: [NVIDIA Tesla K80 GPU](https://www.nvidia.com/en-gb/data-center/tesla-k80/)
* `M60`: [NVIDIA Tesla M60 GPU](https://www.nvidia.com/content/dam/en-zz/Solutions/design-visualization/solutions/resources/documents1/nvidia-m60-datasheet.pdf)
* `P4`: [NVIDIA Tesla P4 GPU](https://images.nvidia.com/content/pdf/tesla/184457-Tesla-P4-Datasheet-NV-Final-Letter-Web.pdf)
* `P100`: [NVIDIA Tesla P100 GPU](https://www.nvidia.com/en-us/data-center/tesla-p100/)
* `T4`: [NVIDIA T4 Tensor Core GPU](https://www.nvidia.com/en-us/data-center/tesla-t4/)
* `V100`: [NVIDIA Tesla V100 GPU](https://www.nvidia.com/en-us/data-center/tesla-v100/)
* `A100`: An entire [NVIDIA A100 GPU](https://www.nvidia.com/en-us/data-center/a100/). Fractional partitions are also available:
    * `A100.partition_1g_5gb`: 5GB partition of an A100 GPU.
    * `A100.partition_2g_10gb`: 10GB partition of an A100 GPU - 2x5GB slices with 2/7th of the SM (streaming multiprocessor).
    * `A100.partition_3g_20gb`: 20GB partition of an A100 GPU - 4x5GB slices, with 3/7th of the SM.
    * `A100.partition_4g_20gb`: 20GB partition of an A100 GPU - 4x5GB slices, with 4/7th of the SM.
    * `A100.partition_7g_40gb`: 40GB partition of an A100 GPU - 8x5GB slices, with 7/7th of the SM.
* `A100_80GB`: An entire [NVIDIA A100 80GB GPU](https://www.nvidia.com/en-us/data-center/a100/). Fractional partitions are also available:
    * `A100_80GB.partition_1g_10gb`: 10GB partition of an A100 80GB GPU with 1/7th of the SM (streaming multiprocessor).
    * `A100_80GB.partition_2g_20gb`: 20GB partition of an A100 80GB GPU with 2/7th of the SM.
    * `A100_80GB.partition_3g_40gb`: 40GB partition of an A100 80GB GPU with 3/7th of the SM.
    * `A100_80GB.partition_4g_40gb`: 40GB partition of an A100 80GB GPU with 4/7th of the SM.
    * `A100_80GB.partition_7g_80gb`: 80GB partition of an A100 80GB GPU with 7/7th of the SM.

For more information on partitioning, see [Partitioned GPUs](https://docs.nvidia.com/datacenter/tesla/mig-user-guide/index.html#partitioning).

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment/retries-and-timeouts ===

# Retries and timeouts

## Retry types

Union.ai allows you to automatically retry failing tasks. This section explains the configuration and application of retries.

Errors causing task failure are categorized into two main types, influencing the retry logic differently:

* `SYSTEM`: These errors arise from infrastructure-related failures, such as hardware malfunctions or network issues.
  They are typically transient and can often be resolved with a retry.

* `USER`: These errors are due to issues in the user-defined code, like a value error or a logic mistake, which usually require code modifications to resolve.

## Configuring retries

Retries in Union.ai are configurable to address both `USER` and `SYSTEM` errors, allowing for tailored fault tolerance strategies:

`USER` errors can be handled by setting the `retries` attribute in the task decorator to define how many times a task should be retried.
This requires a `FlyteRecoverableException` to be raised in the task; any other exception will not trigger a retry:

```python
import random
from typing import List

from flytekit import task
from flytekit.exceptions.user import FlyteRecoverableException

@task(retries=3)
def compute_mean(data: List[float]) -> float:
    if random.random() < 0.05:  # simulate an occasional transient failure
        raise FlyteRecoverableException("Something bad happened 🔥")
    return sum(data) / len(data)
```
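The retry semantics above can be sketched in plain Python with no Flyte dependencies (`Recoverable` is a stand-in for `FlyteRecoverableException`; the real retry loop lives in the platform, not in user code):

```python
class Recoverable(Exception):
    """Stand-in for FlyteRecoverableException (no Flyte dependency)."""

def run_with_retries(fn, retries):
    """Attempt fn up to retries + 1 times, retrying only on Recoverable."""
    attempt = 0
    while True:
        try:
            return fn()
        except Recoverable:
            attempt += 1
            if attempt > retries:
                raise  # retry budget exhausted

calls = {"n": 0}

def flaky():
    """Fails recoverably twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise Recoverable("transient failure")
    return 42

result = run_with_retries(flaky, retries=3)
print(result, calls["n"])  # 42 3 -- succeeded on the third attempt
```

Note that a `ValueError` (or any other non-`Recoverable` exception) raised by `fn` would propagate immediately, mirroring the behavior described above.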

## Retrying interruptible tasks

Tasks marked as interruptible can be preempted and retried without counting against the USER error budget.
This is useful for tasks running on preemptible compute resources like spot instances.

See [Interruptible instances](./interruptible-instances).

## Retrying map tasks

For map tasks, retry behavior aligns with that of regular tasks. The `retries` field in the task decorator is not needed for handling `SYSTEM` errors, as these are managed by the platform's configuration. The `USER` retry budget is set by defining `retries` in the task decorator.

See [Map tasks](../map-tasks).

## Timeouts

To protect against zombie tasks that hang due to system-level issues, you can supply the timeout argument to the task decorator to make sure that problematic tasks adhere to a maximum runtime.

In this example, we make sure that the task is terminated after it has been running for more than one hour.

```python
from datetime import timedelta
from typing import List

from flytekit import task

@task(timeout=timedelta(hours=1))
def compute_mean(data: List[float]) -> float:
    return sum(data) / len(data)
```

Notice that the timeout argument takes a built-in Python `timedelta` object.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment/interruptible-instances ===

# Interruptible instances

> [!NOTE]
> In AWS, the term *spot instance* is used.
> In GCP, the equivalent term is *Spot VM*.
> Here we use the term *interruptible instance* generically for both providers.

An interruptible instance is a machine instance made available to your cluster by your cloud provider that is not guaranteed to be always available.
As a result, interruptible instances are cheaper than regular instances.
To use an interruptible instance for a compute workload, you must be prepared for the possibility that an attempt to run the workload fails due to a lack of available resources and needs to be retried.

When onboarding your organization onto Union.ai, you [specify the configuration of your cluster](https://www.union.ai/docs/v1/union/user-guide/deployment/configuring-your-data-plane).
Among the options available is the choice of whether to use interruptible instances.

For each interruptible instance node group that you specify, an additional on-demand node group, identical in every other respect to the interruptible one, will also be configured.
This on-demand node group will be used as a fallback when attempts to complete the task on the interruptible instance have failed.

## Configuring tasks to use interruptible instances

To schedule tasks on interruptible instances and retry them if they fail, specify the `interruptible` and `retries` parameters in the `@union.task` decorator.
For example:

```python
@union.task(interruptible=True, retries=3)
```

* A task will only be scheduled on an interruptible instance if it has the parameter `interruptible=True` (or if its workflow has the parameter `interruptible=True` and the task does not have an explicit `interruptible` parameter).
* An interruptible task, like any other task, can have a `retries` parameter.
* If an interruptible task does not have an explicitly set `retries` parameter, then the `retries` value defaults to `1`.
* An interruptible task with `retries=n` will be attempted `n` times on an interruptible instance.
  If it still fails after `n` attempts, the final (`n+1`) retry will be done on the fallback on-demand instance.
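The attempt schedule implied by the last bullet can be sketched in plain Python (illustrative only; the scheduler itself implements this logic):

```python
def attempt_schedule(retries: int) -> list:
    """Instance type used for each attempt of an interruptible task."""
    # `retries` attempts run on interruptible (spot) capacity; the final
    # fallback attempt runs on an on-demand instance.
    return ["interruptible"] * retries + ["on-demand"]

print(attempt_schedule(3))
# ['interruptible', 'interruptible', 'interruptible', 'on-demand']
```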

## Workflow level interruptible

Interruptible is also available [at the workflow level](https://www.union.ai/docs/v1/union/user-guide/core-concepts/workflows). If you set it there, it will apply to all tasks in the workflow that do not themselves have an explicit value set. A task-level interruptible setting always overrides whatever the workflow-level setting is.
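The precedence rule can be sketched as a plain-Python helper (illustrative only; the actual resolution happens in the platform):

```python
from typing import Optional

def effective_interruptible(workflow_setting: Optional[bool],
                            task_setting: Optional[bool]) -> Optional[bool]:
    """An explicit task-level setting always wins; otherwise the
    workflow-level setting (if any) applies."""
    return task_setting if task_setting is not None else workflow_setting

print(effective_interruptible(True, None))   # True: inherited from the workflow
print(effective_interruptible(True, False))  # False: task-level override wins
```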

## Advantages and disadvantages of interruptible instances

The advantage of using an interruptible instance for a task is simply that it is less costly than an on-demand instance (all other parameters being equal).
However, there are two main disadvantages:

1. The task may be successfully scheduled on an interruptible instance but then interrupted.
In the worst case, for `retries=n` the task may be interrupted `n` times before the fallback on-demand instance is finally used.
Clearly, this may be a problem for time-critical tasks.

2. Interruptible instances of the selected node type may simply be unavailable on the initial attempt to schedule.
When this happens, the task may hang indefinitely until an interruptible instance becomes available.
Note that this is a distinct failure mode from the previous one where an interruptible node is successfully scheduled but is then interrupted.

In general, we recommend that you use interruptible instances whenever available, but only for tasks that are not time-critical.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring ===

# Task-level monitoring

In the [Execution view](https://www.union.ai/docs/v1/union/user-guide/core-concepts/workflows/viewing-workflow-executions), selecting a task within the list will open the right panel.
In that panel, you will find the **View Utilization** button:

![View Utilization](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/execution-view-right-panel-executions-view-util.png)

Clicking this will take you to the **task-level monitoring** page:

![Task-level monitoring](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/task-level-monitoring.png)

## Execution Resources

This tab displays details about the resources used by this specific task.
As an example, let's say that the definition of this task in your Python code has the following task decorator:

```python
@union.task(
   requests=Resources(cpu="44", mem="120Gi"),
   limits=Resources(cpu="44", mem="120Gi")
)
```

These parameters are reflected in the displayed **Memory Quota** and **CPU Cores Quota** charts as explained below:

### Memory Quota

![Memory Quota](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/task-level-monitoring-memory-quota.png)

This chart shows the memory consumption of the task.

* **Limit** refers to the value of the `limits.mem` parameter (the `mem` parameter within the `Resources` object assigned to `limits`)
* **Allocated** refers to the maximum of the `requests.mem` value (the `mem` parameter within the `Resources` object assigned to `requests`) and the amount of memory actually used by the task.
* **Used** refers to the actual memory used by the task.

The percentage shown on this chart is the ratio of memory used to memory requested.
Since the memory used can sometimes exceed the memory requested, this percentage may exceed 100.
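As a quick worked example of these figures (the numbers are hypothetical):

```python
requested_gib = 120.0  # requests.mem from the task decorator
used_gib = 130.0       # peak memory actually used (may exceed the request)

allocated_gib = max(requested_gib, used_gib)  # "Allocated" on the chart
used_pct = used_gib / requested_gib * 100     # percentage shown on the chart

print(allocated_gib)       # 130.0
print(round(used_pct, 1))  # 108.3 -- exceeds 100 because used > requested
```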

### CPU Cores Quota

![CPU Cores Quota](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/task-level-monitoring-cpu-cores-quota.png)

This chart displays the number of CPU cores being used.

* **Limit** refers to the value of the `limits.cpu` parameter (the `cpu` parameter within the `Resources` object assigned to `limits`)
* **Allocated** refers to the value of the `requests.cpu` parameter (the `cpu` parameter within the `Resources` object assigned to `requests`)
* **Used** refers to the actual number of CPUs used by the task.

### GPU Memory Utilization

![GPU Memory Utilization](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/task-level-monitoring-gpu-memory-utilization.png)

This chart displays the amount of GPU memory used for each GPU.

### GPU Utilization

![GPU Utilization](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/task-level-monitoring-gpu-utilization.png)

This chart displays the GPU core utilization as a percentage of the GPUs allocated (the `requests.gpu` parameter).

## Execution Logs (Preview)

![Execution Logs (Preview)](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/task-level-monitoring-execution-logs.png)

This tab is a preview feature that displays the `stdout` (the standard output) of the container running the task.
Currently, it only shows content while the task is actually running.

## Map Tasks

When the task you want to monitor is a **map task**, accessing the utilization data is a bit different.
Here is the task execution view of a map task. Open the drop-down to reveal each subtask within the map task:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/map-task-1.png)

Drill down by clicking on one of the subtasks:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/map-task-2.png)

This will bring you to the individual subtask information panel, where the **View Utilization** button for the subtask can be found:

![](../../../../_static/images/user-guide/core-concepts/tasks/task-hardware-environment/task-level-monitoring/map-task-3.png)

Clicking on **View Utilization** will take you to the task-level monitoring page for the subtask, which has the same structure and features as the task-level monitoring page for a standard task (see above).

