# Development cycle
> This bundle contains all pages in the Development cycle section.
> Source: https://www.union.ai/docs/v1/union/user-guide/development-cycle/

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle ===

# Development cycle

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

This section covers developing production-ready workflows for Union.ai.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/authentication ===

# Authentication

Authentication is required to interact with Union.ai using the command-line interface (CLI). The authentication method depends on whether you are working on a local or remote machine. This guide walks you through different authentication mechanisms and helps you choose the best one for your use case.

Before diving into authentication, ensure you have installed the Union CLI. See [Local Setup](https://www.union.ai/docs/v1/union/user-guide/getting-started/local-setup) for details.

## Authentication Methods

Union CLI supports three authentication mechanisms:

| Authentication Method | Works on Local? | Works on Remote? | Use Case                                                        |
|-----------------------|-----------------|------------------|-----------------------------------------------------------------|
| PKCE (default)        | ✅ Yes          | ❌ No            | Best on local machines with a browser.                          |
| DeviceFlow            | ✅ Yes          | ✅ Yes           | Best on remote machines without a browser, like an ssh session. |
| ClientSecret          | ✅ Yes          | ✅ Yes           | Best for CI/CD or automation.                                   |

> [!NOTE]
> If you logged in with `union create login --host <union-host-url>`, PKCE was used by default.

## 1. PKCE (Proof Key for Code Exchange)

PKCE is the default authentication method. When you run a Union CLI command, it opens a browser window for authentication.

Authentication Flow:
- Run a Union CLI command.
- You are redirected to your default browser and log in.

Example Configuration:
```yaml
admin:
  endpoint: https://<YourOrg>.hosted.unionai.cloud
  insecure: false
  authType: Pkce
logger:
  show-source: true
  level: 0
```

> [!NOTE]
> PKCE requires a local browser, making it unsuitable for using the Union CLI on remote machines within an ssh session.

## 2. DeviceFlow (Best for Remote Machines)

If you are working with the Union CLI on a remote machine without a browser, use DeviceFlow. This method provides a URL that you can open in your local browser.

Authentication Flow:
- Run a Union CLI command.
- The CLI returns a URL.
- Open the URL in your local browser and log in.

Example Configuration:
```yaml
admin:
  endpoint: dns:///<YourOrg>.hosted.unionai.cloud
  insecure: false
  authType: DeviceFlow
logger:
  show-source: true
  level: 0
```
> [!NOTE]
> During authentication, Union.ai attempts to store an authentication token in the keyring service of the operating system. If you are authenticating from within an SSH session on a Linux-based machine, there may be no keyring service by default. If you find that browser-based authentication is required every time you run or register your workflows, you may need to run `pip install keyring` or `pip install keyrings.alt` to install a keyring service on your machine.

## 3. ClientSecret (Best for CI/CD and Automation)

The ClientSecret method is a headless authentication option, ideal for automation and CI/CD pipelines.

Steps to Set Up ClientSecret Authentication:

1. Create an API Key:
    ```shell
    $ union create api-key admin --name my-custom-name
    ```
    The output provides a Client ID and API Key. Store the API Key securely, as it will not be shown again.

2. Set the Environment Variable:
    ```shell
    export UNION_API_KEY="<SECRET>"
    ```
   With this environment variable set, `union` commands no longer require a configuration YAML file.

3. Give the API Key admin permissions with a [`uctl`](https://www.union.ai/docs/v1/union/api-reference/uctl-cli) command:
    ```shell
    uctl --config ~/path/to/a/pkce/config.yaml append identityassignment --application my-custom-name --policy admin --org <org name>
    ```
   Let's note a couple of things here. First, the config file used here must be a [PKCE](./authentication#1-pkce-proof-key-for-code-exchange) config, which will require you to authenticate through your browser. If you don't know where your config file is, check `~/.union/config.yaml`. This is where the automatically generated config would have been saved if you followed the ["Getting Started"](https://www.union.ai/docs/v1/union/user-guide/getting-started) guide. Second, your org name can be found from your endpoint. For example, if your endpoint is `https://my-org.hosted.unionai.cloud`, then your org name is `my-org`.

Now, with your `UNION_API_KEY` environment variable set, `union` commands will authenticate using the API key automatically, with no need to pass in a config file.

> [!NOTE]
> Never commit API keys to version control. Use environment variables or a secure vault.

## Managing Authentication Configuration

By default, the Union CLI looks for configuration files in `~/.union/config.yaml`. You can override this by:

- Setting the `UNION_CONFIG` environment variable:
    ```shell
    export UNION_CONFIG=~/.my-config-location/my-config.yaml
    ```

- Using the `--config` flag:
    ```shell
    $ union --config ~/.my-config-location/my-config.yaml run my_script.py my_workflow
    ```

## Troubleshooting Authentication Issues

- Old configuration files causing conflicts? Remove the deprecated `~/.unionai/` directory.

- Need to switch authentication methods? Update `~/.union/config.yaml` or use a different config file.

- Getting prompted for login every time? If using DeviceFlow on Linux, install a `keyring` service (`pip install keyring keyrings.alt`).

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/project-structure ===

# Project structure

Organizing a workflow project repository effectively is key for ensuring scalability, collaboration, and easy maintenance.
Here are best practices for structuring a Union.ai workflow project repo, covering task organization, workflow management, dependency handling, and documentation.

## Recommended Directory Structure

A typical Union.ai workflow project structure could look like this:

```shell
├── .github/workflows/
├── .gitignore
├── docs/
│   └── README.md
├── src/
│   ├── core/                    # Core logic specific to the use case
│   │   ├── __init__.py
│   │   ├── model.py
│   │   ├── data.py
│   │   └── structs.py
│   ├── tasks/                   # Contains individual tasks
│   │   ├── __init__.py
│   │   ├── preprocess.py
│   │   ├── fit.py
│   │   ├── test.py
│   │   └── plot.py
│   ├── workflows/               # Contains workflow definitions
│   │   ├── __init__.py
│   │   ├── inference.py
│   │   └── train.py
│   └── orchestration/           # For helper constructs (e.g., secrets, images)
│       ├── __init__.py
│       └── constants.py
├── uv.lock
└── pyproject.toml

```

This structure is designed to ensure each project component has a clear, logical home, making it easy for team members to find and modify files.

## Organizing Tasks and Workflows

In Union.ai, tasks are the building blocks of workflows, so it’s important to structure them intuitively:

* **Tasks**: Store each task in its own file within the `tasks/` directory. If multiple tasks are closely related, consider grouping them within a module. Alternatively, each task can have its own module to allow more granular organization and sub-directories could be used to group similar tasks.

* **Workflows**: Store workflows, which combine tasks into end-to-end processes, in the `workflows/` directory. This separation ensures workflows are organized independently from core task logic, promoting modularity and reuse.

## Orchestration Directory for Helper Constructs

Include a directory, such as `orchestration/` or `union_utils/`, for constructs that facilitate workflow orchestration. This can house helper files like:

* **Secrets**: Definitions for accessing secrets (e.g., API keys) in Union.ai.

* **ImageSpec**: A tool that simplifies container management, allowing you to avoid writing Dockerfiles directly.

## Core Logic for Workflow-Specific Functionality

Use a `core/` directory for business logic specific to your workflows. This keeps the core application code separate from workflow orchestration code, improving maintainability and making it easier for new team members to understand core functionality.

## Importance of `__init__.py`

Adding `__init__.py` files within each directory is essential:

* **For Imports**: These files make the directory a Python package, enabling proper imports across modules.

* **For Union.ai's Fast Registration**: When performing fast registration, Union.ai considers the first directory without an `__init__.py` as the root. Union.ai will then package the root and its contents into a tarball, streamlining the registration process and avoiding the need to rebuild the container image every time you make code changes.
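The root-detection rule above can be sketched in a few lines. This is an illustration of the rule only, not Union.ai's actual implementation:

```python
from pathlib import Path

def find_fast_register_root(script: Path) -> Path:
    # Walk upward from the script's directory and return the first
    # directory that does NOT contain an __init__.py -- the rule
    # used to pick the root of the fast-registration tarball.
    current = script.resolve().parent
    while (current / "__init__.py").exists():
        current = current.parent
    return current
```

With the layout shown earlier, a script in `src/workflows/` resolves to `src/` as the root, since `src/` itself has no `__init__.py`.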

## Monorepo vs Multi-repo: Choosing a structure

When working with multiple teams, you have two main options:

* **Monorepo**: A single repository shared across all teams, which can simplify dependency management and allow for shared constructs. However, it can introduce complexity in permissions and version control for different teams.

* **Multi-repo**: Separate repositories for each team or project can improve isolation and control. In this case, consider creating shared, installable packages for constructs that multiple teams use, ensuring consistency without merging codebases.

## CI/CD

If you use GitHub Actions (or a similar CI system), the pipeline should:
* Register (and promote if needed) on merge to a domain branch.
* Execute on merge of an input YAML file.
* Inject the git SHA as the entity version.
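The steps above can be sketched as a minimal GitHub Actions workflow. This is a hypothetical example: the branch name, paths, project name, and secret name are placeholders, and it assumes a [ClientSecret API key](./authentication) stored as a CI secret:

```yaml
# Hypothetical CI workflow -- adapt branches, paths, and project to your repo.
name: register
on:
  push:
    branches: [development]

jobs:
  register:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install union
      # Register workflows, injecting the git SHA as the entity version.
      - run: union register src/workflows --project my-project --domain development --version "$GITHUB_SHA"
        env:
          UNION_API_KEY: ${{ secrets.UNION_API_KEY }}
```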

## Documentation and Docstrings

Writing clear docstrings is encouraged, as they are automatically propagated to the Union.ai UI. This provides useful context for anyone viewing the workflows and tasks in the UI, reducing the need to consult source code for explanations.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/projects-and-domains ===

# Projects and domains

Projects and domains are the principal organizational categories into which you group your workflows in Union.ai.

Projects define groups of tasks, workflows, launch plans, and other entities that share a functional purpose.
Domains represent distinct steps through which the entities in a project transition as they proceed through the development cycle.

By default, Union.ai provides three domains: `development`, `staging`, and `production`.
During onboarding, you can configure your Union.ai instance to have different domains.
Speak to the Union.ai team for more information.

Projects and domains are orthogonal to each other: each project spans every domain, and each domain contains every project.

Here is an example arrangement:

|           | Development       | Staging           | Production        |
|-----------|-------------------|-------------------|-------------------|
| Project 1 | workflow_1 (v2.0) | workflow_1 (v1.0) | workflow_1 (v1.0) |
| Project 2 | workflow_2 (v2.0) | workflow_2 (v1.0) | workflow_2 (v1.0) |

## Projects

Projects represent independent workflows related to specific teams, business
areas, or applications.  Each project is isolated from others, but workflows can
reference entities (workflows or tasks) from other projects to reuse
generalizable resources.

## Domains

Domains represent distinct environments orthogonal to the set of projects in
your org within Union.ai, such as development, staging, and production.  These
enable dedicated configurations, permissions, secrets, cached execution history,
and resource allocations for each environment, preventing unintended impact on
other projects and/or domains.

Using domains allows for a clear separation between environments, helping ensure
that development and testing don't interfere with production workflows.

A production domain ensures a “clean slate” so that cached development
executions do not result in unexpected behavior.  Additionally, secrets may be
configured for external production data sources.

## When to use different Union.ai projects?

Projects help group independent workflows related to specific teams, business
areas, or applications.  Generally speaking, each independent team or ML product
should have its own Union.ai project.  Even though these are isolated from one
another, teams may reference entities (workflows or tasks) from other Union.ai
projects to reuse generalizable resources.  For example, one team may create a
generalizable task to train common model types.  However, this requires advanced
collaboration and common coding standards.

When setting up workflows in Union.ai, effective use of **projects** and
**domains** is key to managing environments, permissions, and resource
allocation.  Below are best practices to consider when organizing workflows in
Union.ai.

## Projects and Domains: The Power of the Project-Domain Pair

Union.ai uses a project-domain pair to create isolated configurations for
workflows. This pairing allows for:

* **Dedicated Permissions**: Through Role-Based Access Control (RBAC), users can be assigned roles with tailored permissions—such as contributor or admin—specific to individual project-domain pairs. This allows fine-grained control over who can manage or execute workflows within each pair, ensuring that permissions are both targeted and secure. More details [here](https://www.union.ai/docs/v1/union/user-guide/administration/user-management).

* **Resource and Execution Monitoring**: Track and monitor resource utilization, executions, and performance metrics on a dashboard unique to each project-domain pair. This helps maintain visibility over workflow execution and ensures optimal performance. More details [here](https://www.union.ai/docs/v1/union/user-guide/administration/resources).

* **Resource Allocations and Quotas**: By setting quotas for each project-domain pair, Union.ai can ensure that workflows do not exceed designated limits, preventing any project or domain from unintentionally impacting resources available to others. Additionally, you can configure unique resource defaults—such as memory, CPU, and storage allocations—for each project-domain pair. This allows each pair to meet the specific requirements of its workflows, which is particularly valuable given the unique needs across different projects. More details [here](https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment/customizing-task-resources) and [here](https://www.union.ai/docs/v1/union/user-guide/administration/resources).

* **Configuring Secrets**: Union.ai allows you to configure secrets at the project-domain level, ensuring sensitive information, such as API keys and tokens, is accessible only within the specific workflows that need them. This enhances security by isolating secrets according to the project and domain, reducing the risk of unauthorized access across environments. More details [here](./managing-secrets).

## Domains: Clear Environment Separation

Domains represent distinct environments within Union.ai, allowing clear separation between development, staging, and production. This structure helps prevent cross-environment interference, ensuring that changes made in development or testing do not affect production workflows. Using domains for this separation ensures that workflows can evolve in a controlled manner across different stages, from initial development through to production deployment.

## Projects: Organizing Workflows by Teams, Business Areas, or Applications

Projects in Union.ai are designed to group independent workflows around specific teams, business functions, or applications. By aligning projects to organizational structure, you can simplify access control and permissions while encouraging a clean separation of workflows across different teams or use cases. Although workflows can reference each other across projects, it's generally cleaner to maintain independent workflows within each project to avoid complexity.

Union.ai’s CLI tools and SDKs provide options to specify projects and domains easily:

* **CLI Commands**: In most commands within the `union` and `uctl` CLIs, you can specify the project and domain by using the `--project` and `--domain` flags, enabling precise control over which project-domain pair a command applies to. More details [here](https://www.union.ai/docs/v1/union/api-reference/union-cli) and [here](https://www.union.ai/docs/v1/union/api-reference/uctl-cli).

* **Python SDK**: When working with the `union` SDK, you can leverage `UnionRemote` to define the project and domain for workflow interactions programmatically, ensuring that all actions occur in the intended environment. More details [here](./union-remote).

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/building-workflows ===

# Building workflows

## When should I decompose tasks?

There are several reasons why one may choose to decompose a task into smaller tasks.
Doing so may result in better computational performance, improved cache performance, and taking advantage of interruptible tasks.
However, decomposition comes at the cost of overhead between tasks, including spinning up nodes and downloading data.
In some cases, these costs can be mitigated by using [Actors](https://www.union.ai/docs/v1/union/user-guide/core-concepts/actors).

### Differing runtime requirements

Firstly, decomposition provides support for heterogeneous environments among the operations in the task.
For example, you may have some large task that trains a machine learning model and then uses the model to run batch inference on your test data.
However, training a model typically requires significantly more memory than inference.
For that reason, given large enough scale, it could actually be beneficial to decompose this large task into two tasks that (1) train a model and then (2) run batch inference.
By doing so, you could request significantly less memory for the second task in order to save on the expense of this workflow.
If you are working with even more data, then you might benefit from decomposing the batch inference task via `map_task` such that you may further parallelize this operation, substantially reducing the runtime of this step.
Generally speaking, decomposition provides infrastructural flexibility regarding the ability to define resources, dependencies, and execution parallelism.

### Improved cache performance

Secondly, you may decompose large tasks into smaller tasks to enable “fine-grained” caching.
In other words, each unique task provides an automated “checkpoint” system.
Thus, by breaking down a large workflow into its many natural tasks, one may minimize redundant work among multiple serial workflow executions.
This is especially useful during rapid, iterative development, during which a user may attempt to run the same workflow multiple times in a short period of time.
“Fine-grained” caching will dramatically improve productivity while executing workflows both locally and remotely.

### Take advantage of interruptible tasks

Lastly, one may utilize “fine-grained” caching to leverage interruptible tasks.
Interruptible tasks will attempt to run on spot instances or spot VMs, where possible.
These nodes are interruptible, meaning that a task may occasionally fail when the cloud provider reclaims the instance for another customer.
However, spot instances can be substantially cheaper than their non-interruptible counterparts (on-demand instances / VMs).
By utilizing “fine-grained” caching, one may reap significant cost savings on interruptible tasks while minimizing the impact of interruptions.

## When should I parallelize tasks?

In general, parallelize early and often.
A lot of Union.ai’s powerful ergonomics like caching and workflow recovery happen at the task level, as mentioned above.
Decomposing work into smaller tasks and parallelizing them enables a performant and fault-tolerant workflow.

One caveat is for very short duration tasks, where the overhead of spinning up a pod and cleaning it up negates any benefits of parallelism.
With reusable containers via [Actors](https://www.union.ai/docs/v1/union/user-guide/core-concepts/actors), however, these overheads are transparently obviated, providing the best of both worlds at the cost of some up-front work in setting up that environment.
In any case, it may be useful to batch the inputs and outputs to amortize any overheads.
Please be mindful to keep the sequencing of inputs within a batch, and of the batches themselves, to ensure reliable cache hits.
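As a sketch of that last point, a deterministic batching helper (hypothetical; any stable ordering scheme works) keeps batch contents identical across runs, so cached per-batch results remain valid:

```python
def make_batches(items, batch_size):
    # Sort first so that both the contents of each batch and the order
    # of the batches themselves are stable across runs -- unstable
    # ordering would change the batch inputs and defeat cache hits.
    ordered = sorted(items)
    return [ordered[i:i + batch_size] for i in range(0, len(ordered), batch_size)]
```

Feeding the output of `make_batches` to a parallelized task then produces the same per-batch inputs on every run, regardless of the order in which the items were gathered.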

### Parallelization constructs

The two main parallelization constructs in Union.ai are the [map task](https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-types) and the [dynamic workflow](https://www.union.ai/docs/v1/union/user-guide/core-concepts/workflows/dynamic-workflows).
They accomplish roughly the same goal but are implemented quite differently and have different advantages.

Dynamic workflows are more akin to a `for` loop, iterating over inputs sequentially.
Their parallelism is controlled by the overall workflow parallelism setting.

Map tasks are more efficient and have no such sequencing guarantees.
They also have their own concurrency setting, separate from the overall workflow, and can tolerate failures of their constituent tasks up to a configurable minimum success ratio.
A deeper explanation of their differences, and examples of how to use them together, can be found in the documentation for each construct.
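Conceptually (using plain Python rather than the Union.ai constructs), a dynamic workflow resembles an ordinary loop over the inputs, while a map task resembles fanning one function out over the inputs with its own concurrency cap:

```python
from concurrent.futures import ThreadPoolExecutor

def infer(x: int) -> int:
    # Stand-in for a per-item task body.
    return x * 2

inputs = list(range(8))

# Dynamic-workflow analogue: a loop, with sequencing guarantees.
looped = [infer(x) for x in inputs]

# Map-task analogue: one function applied across the inputs,
# with a concurrency limit of its own (here, 4 workers).
with ThreadPoolExecutor(max_workers=4) as pool:
    mapped = list(pool.map(infer, inputs))
```

Note that `Executor.map` still returns results in input order; real map tasks make no guarantee about the sequencing of execution across their constituent tasks.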

## When should I use caching?

Caching should be enabled once the body of a task has stabilized.
Cache keys are implicitly derived from the task signature, most notably the inputs and outputs.
If the body of a task changes without a modification to the signature, and the same inputs are used, it will produce a cache hit.
This can result in unexpected behavior when you are iterating on the core functionality of a task and expecting downstream tasks to receive different inputs.
Moreover, caching will not introspect the contents of a `FlyteFile` for example.
If the same URI is used as input with completely different contents, it will also produce a cache hit.
For these reasons, it’s wise to add an explicit cache key so that it can be invalidated at any time.
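A toy illustration of those two caveats follows. This is not Union.ai's implementation; the hashing scheme here is invented purely for the example:

```python
import hashlib
import json

def cache_key(task_name: str, cache_version: str, inputs: dict) -> str:
    # The key covers the task identity, an explicit cache version, and
    # the input *values*. For a file input, the value is its URI --
    # the file's contents are never inspected.
    payload = json.dumps([task_name, cache_version, inputs], sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

# Same URI, even if the file behind it has changed: still a cache hit.
k1 = cache_key("preprocess", "v1", {"data": "s3://bucket/data.csv"})
k2 = cache_key("preprocess", "v1", {"data": "s3://bucket/data.csv"})

# Bumping the explicit cache version invalidates the entry.
k3 = cache_key("preprocess", "v2", {"data": "s3://bucket/data.csv"})
```

Bumping the version string is the escape hatch: it changes the key even when the signature and inputs are identical.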

Despite these caveats, caching is a huge time saver during workflow development.
Caching upstream tasks enables a rapid run-through of the workflow up to the node you’re iterating on.
Additionally, caching can be valuable in complex parallelization scenarios where you’re debugging the failure state of large map tasks, for example.
In production, if your cluster is under heavy resource constraints, caching can allow a workflow to complete across re-runs as more and more tasks are able to return successfully with each run.
While not an ideal scenario, caching can help soften the blow of production failures.
With these caveats in mind, there are very few scenarios where caching isn’t warranted.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/setting-up-a-project ===

# Setting up a production project

In Union.ai, your work is organized in a hierarchy with the following structure:

* **Organization**: Your Union.ai instance, accessible at a specific URL like `union.my-company.com`.
* **Domains**: Within an organization there are (typically) three domains, `development`, `staging`, and `production`, used to organize your code during the development process.
You can configure a custom set of domains to suit your needs during [onboarding](https://www.union.ai/docs/v1/union/user-guide/deployment/configuring-your-data-plane).
* **Projects**: Orthogonal to domains, projects are used to organize your code into logical groups. You can create as many projects as you need.

A given workflow will reside in a specific project. For example, let's say `my_workflow` is a workflow in `my_project`.

When you start work on `my_workflow` you would typically register it in the project-domain `my_project/development`.

As you work on successive iterations of the workflow you might promote `my_workflow` to `my_project/staging` and eventually `my_project/production`.

Promotion is done simply by [re-registering the workflow to the new project-domain](./running-your-code).

## Terminology

In everyday use, the term "project" is often used to refer to not just the Union.ai entity that holds a set of workflows,
but also to the local directory in which you are developing those workflows, and to the GitHub (or other SCM) repository that you are using to store the same workflow code.

To avoid confusion, in this guide we will stick to the following naming conventions:

* **Union.ai project**: The entity in your Union.ai instance that holds a set of workflows, as described above. Often referred to simply as a **project**.
* **Local project**: The local directory (usually the working directory of a GitHub repository) in which you are developing workflows.

## Create a Union.ai project

You can create a new project in the Union.ai UI by clicking on the project breadcrumb at the top left and selecting **All projects**:

![Select all projects](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/setting-up-a-project/select-all-projects.png)

This will take you to the **Projects list**:

![Projects list](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/setting-up-a-project/projects-list.png)

Click on the **New Project** button and fill in the details for your new project.

You now have a project on Union.ai into which you can register your workflows.
The next step is to set up a local workflow directory.

## Creating a local production project directory using `union init`

Earlier, in the [Getting started](../getting-started/_index) section we used `union init`
to create a new local project based on the `union-simple` template.

Here, we will do the same, but use the `union-production` template. Run the following command:

```shell
$ union init --template union-production my-project
```

## Directory structure

In the `my-project` directory you’ll see the following file structure:

```shell
├── LICENSE
├── README.md
├── docs
│   └── docs.md
├── pyproject.toml
├── src
│   ├── core
│   │   ├── __init__.py
│   │   └── core.py
│   ├── orchestration
│   │   ├── __init__.py
│   │   └── orchestration.py
│   ├── tasks
│   │   ├── __init__.py
│   │   └── say_hello.py
│   └── workflows
│       ├── __init__.py
│       └── hello_world.py
└── uv.lock
```

You can create your own conventions and file structure for your production projects, but this template provides a good starting point.

However, the separate `workflows` subdirectory and the `__init__.py` file it contains are significant.
We will discuss them when we cover the [registration process](./running-your-code).

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/local-dependencies ===

# Local dependencies

During the development cycle you will want to be able to run your workflows both locally on your machine and remotely on Union.ai.
To enable this, you need to ensure that the required dependencies are installed in both places.
Here we will explain how to install your dependencies locally.
For information on how to make your dependencies available on Union.ai, see [ImageSpec](./image-spec).

## Define your dependencies in your `pyproject.toml`

We recommend using the [`uv` tool](https://docs.astral.sh/uv/) for project and dependency management.

When using `uv`, the best way to declare your dependencies is to list them under `dependencies` in your `pyproject.toml` file, like this:

```toml
[project]
name = "union-simple"
version = "0.1.0"
description = "A simple Union.ai project"
readme = "README.md"
requires-python = ">=3.9,<3.13"
dependencies = ["union"]
```

## Create a Python virtual environment

Ensure that your Python virtual environment is properly set up with the required dependencies.

Using `uv`, you can install the dependencies with the command:

```shell
$ uv sync
```

You can then activate the virtual environment with:

```shell
$ source .venv/bin/activate
```

> [!NOTE] `activate` vs `uv run`
> When running the Union CLI within your local project you must run it in the virtual environment _associated with_ that project.
>
> To run `union` within your project's virtual environment using `uv`, you can prefix it with the `uv run` command. For example:
>
> `uv run union ...`
>
> Alternatively, you can activate the virtual environment with `source .venv/bin/activate` and then run the `union` command directly.
> In our examples we assume that you are doing the latter.

Having installed your dependencies in your local environment, you can now [run your workflows locally using `union run`](./running-your-code).

The next step is to ensure that the same dependencies are also [available in the remote environment on Union.ai](./image-spec).

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/image-spec ===

# ImageSpec

During the development cycle you will want to be able to run your workflows both locally on your machine and remotely on Union.ai,
so you will need to ensure that the required dependencies are installed in both environments.

Here we will explain how to set up the dependencies for your workflow to run remotely on Union.ai.
For information on how to make your dependencies available locally, see [Local dependencies](./local-dependencies).

When a workflow is deployed to Union.ai, each task is set up to run in its own container in the Kubernetes cluster.
You specify the dependencies as part of the definition of the container image to be used for each task using the `ImageSpec` class.
For example:

```python
import union

image_spec = union.ImageSpec(
    builder="union",
    name="say-hello-image",
    requirements="uv.lock",
)

@union.task(container_image=image_spec)
def say_hello(name: str) -> str:
    return f"Hello, {name}!"

@union.workflow
def hello_world_wf(name: str = "world") -> str:
    greeting = say_hello(name=name)
    return greeting
```

Here, the `ImageSpec` class is used to specify the container image to be used for the `say_hello` task.

* The `builder` parameter specifies how the image should be built. The value `union` means that the image will be built using Union.ai's built-in cloud builder.
  In some cases you may want to build the image locally on your machine and push it to a container registry. In that case, you would remove the `builder` parameter
  (or set it to `envd`) and add a `registry` parameter with the URL of the registry to push the image to. See below for more details.

* The `name` parameter specifies the name of the image. This name will be used to identify the image in the container registry.

* The `requirements` parameter specifies the path to a file (relative to the directory in which the `union run` or `union register` command is invoked) that specifies the dependencies to be installed in the image.
  The file may be:
  * A `requirements.txt` file.
  * A `uv.lock` file generated by the `uv sync` command.
  * A `poetry.lock` file generated by the `poetry install` command.
  * A `pyproject.toml` file.

When you execute the `union run` or `union register` command, Union.ai will build the container image defined in `ImageSpec` block
(as well as registering the tasks and workflows defined in your code).

## Union.ai cloud image builder {#cloud-image-builder}

If you have specified `builder="union"` in the `ImageSpec`, Union.ai will build the image using its `ImageBuilder` service in the cloud
and register the image in Union.ai's own container registry. From there, the image is pulled into the task container when it spins up.
All of this is done transparently and requires no setup by the user.

## Local image builder

> [!NOTE] Local image build in BYOC
> In Union.ai BYOC, you can build images from ImageSpec either using the Union.ai cloud image builder (by specifying `builder="union"`) or on your local machine
> (by omitting the `builder` parameter or specifying `builder="envd"`).
> In Union.ai Serverless, images defined by `ImageSpec` are always built using the Union.ai cloud image builder.
> Local image building is not supported in Serverless.

If you have not specified a `builder` or have specified `builder="envd"`, Union.ai will build the image locally on your machine and push it to the registry you specify.
This also requires that you specify a `registry` parameter in the `ImageSpec`.
For example:

```python
image_spec = union.ImageSpec(
    builder="envd",
    name="say-hello-image",
    requirements="uv.lock",
    registry="ghcr.io/<my-github-org>",
)
```

Here we assume you are using GitHub's GHCR, and that you substitute your GitHub organization name for `<my-github-org>`.

### Local container engine

To enable local image building you must have an [OCI-compatible](https://opencontainers.org/) container engine, like [Docker](https://docs.docker.com/get-docker/), installed and running locally.
Other options include [Podman](https://podman.io/), [LXD](https://linuxcontainers.org/lxd/introduction/), or [Containerd](https://containerd.io/).

### Access to a container registry

You will also need access to a container registry.
You must specify the URL of the registry in the `registry` parameter of the `ImageSpec`.

Above we used the GitHub Container Registry (GHCR) that comes as part of your GitHub account.
For more information, see [Working with the Container registry](https://docs.github.com/en/packages/working-with-a-github-packages-registry/working-with-the-container-registry).

You may use another container registry if you prefer,
such as [Docker Hub](https://hub.docker.com/),
[Amazon Elastic Container Registry (ECR)](../integrations/enabling-aws-resources/enabling-aws-ecr),
or [Google Artifact Registry (GAR)](../integrations/enabling-gcp-resources/enabling-google-artifact-registry).

You will need to configure your local Docker client to authenticate to GHCR so that `union` can push the image built from the `ImageSpec`.

Follow the directions in [Working with the Container registry > Authenticating to the Container registry](https://docs.github.com/en/packages/working-with-a-github-packages-registry/working-with-the-container-registry#authenticating-to-the-container-registry).
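For example, with a GitHub personal access token that has the `write:packages` scope stored in an environment variable (the variable name `CR_PAT` and the username placeholder are illustrative):

```shell
# Authenticate the local Docker client to GHCR using a personal access token.
$ echo $CR_PAT | docker login ghcr.io -u <my-github-username> --password-stdin
```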

### Make your image accessible to Union.ai

In addition to making sure your registry is accessible from your local machine, you will need to ensure that the specific image, once pushed to the registry, is itself publicly accessible.

> [!NOTE] Make your image public
> Note that in the case of our example registry (GHCR), making the image public can only be done once the image _has been_ pushed.
> This means that you will need to register your workflow first, then make the image public and then run the workflow from the Union.ai UI.
> If you try to run the workflow before making the image public (for example by doing a `union run` which both registers and runs immediately)
> the workflow execution will fail with an `ImagePullBackOff` error.

In the GitHub Container Registry, switch the visibility of your container image to Public. For more information, see [Configuring a package's access control and visibility](https://docs.github.com/en/packages/learn-github-packages/configuring-a-packages-access-control-and-visibility#about-inheritance-of-access-permissions-and-visibility).

At this point, you can run the workflow from the Union.ai interface.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/running-your-code ===

# Running your code

## Set up your development environment

If you have not already done so, follow the [Getting started](https://www.union.ai/docs/v1/union/user-guide/getting-started) section to sign in to Union.ai, and set up your local environment.

## CLI commands for running your code

The Union CLI and Uctl CLI provide commands that allow you to deploy and run your code at different stages of the development cycle:

<!-- TODO: Link to the union commands below to the reference section -->

1. `union run`: For deploying and running a single script immediately in your local Python environment.
2. `union run --remote`: For deploying and running a single script immediately in the cloud on Union.ai.
3. `union register`: For deploying multiple scripts to Union.ai and running them from the Web interface.
4. `union package` and `uctl register`: For deploying workflows to production and for scripting within a CI/CD pipeline.

> [!NOTE]
> In some cases, you may want to test your code in a local cluster before deploying it to Union.ai.
> This step corresponds to using the commands 2, 3, or 4, but targeting your local cluster instead of Union.ai.
> For more details, see [Running in a local cluster](./running-in-a-local-cluster).

## Running a script in local Python with `union run` {#running-a-script-in-local-python}

During the development cycle you will want to run a specific workflow or task in your local Python environment to test it.
To quickly try out the code locally use `union run`:

```shell
$ union run workflows/example.py wf --name 'Albert'
```

Here you are invoking `union run` and passing the name of the Python file and the name of the workflow within that file that you want to run.
In addition, you are passing the named parameter `name` and its value.

This command is useful for quickly testing a workflow locally to check for basic errors.
For more details see [union run details](./details-of-union-run).

## Running a script on Union.ai with `union run --remote`

To quickly run a workflow on Union.ai, use `union run --remote`:

```shell
$ union run --remote --project basic-example --domain development workflows/example.py wf --name 'Albert'
```

Here we are invoking `union run --remote` and passing:
* The project, `basic-example`
* The domain, `development`
* The Python file, `workflows/example.py`
* The workflow within that file that you want to run, `wf`
* The named parameter `name`, and its value

This command will:
* Build the container image defined in your `ImageSpec`.

* Package up your code and deploy it to the specified project and domain in Union.ai.
* Run the workflow on Union.ai.

This command is useful for quickly deploying and running a specific workflow on Union.ai.
For more details see [union run details](./details-of-union-run).

## Running tasks through uctl

This is a multi-step process where we create an execution spec file, update the spec file, and then create the execution.

### Generate execution spec file

```shell
$ uctl launch task --project flytesnacks --domain development --name workflows.example.generate_normal_df --version v1
```

### Update the input spec file for arguments to the workflow

```yaml
iamRoleARN: 'arn:aws:iam::12345678:role/defaultrole'
inputs:
  n: 200
  mean: 0.0
  sigma: 1.0
kubeServiceAcct: ""
targetDomain: ""
targetProject: ""
task: workflows.example.generate_normal_df
version: "v1"
```

### Create execution using the exec spec file

```shell
$ uctl create execution -p flytesnacks -d development --execFile exec_spec.yaml
```

### Monitor the execution by providing the execution id from create command

```shell
$ uctl get execution -p flytesnacks -d development <execid>
```

## Running workflows through uctl

Workflows on their own are not directly runnable. However, every workflow has at least one launch plan bound to it (the auto-created default launch plan), and you can use
launch plans to `launch` a workflow. The default launch plan for a workflow has the same name as its workflow, and all argument defaults are identical.

Tasks can also be executed using the launch command.
One difference between running a task and running a workflow via a launch plan is that launch plans cannot be associated with a task.
This avoids attaching triggers and schedules to tasks.

## Running launchplans through uctl

This is a multi-step process in which we create an execution spec file, update it with inputs, and then create the execution.
More details can be found [here](https://www.union.ai/docs/v1/union/api-reference/uctl-cli/uctl-create/uctl-create-execution).

### Generate an execution spec file

```shell
$ uctl get launchplan -p flytesnacks -d development myapp.workflows.example.my_wf  --execFile exec_spec.yaml
```

### Update the input spec file for arguments to the workflow

```yaml
inputs:
    name: "adam"
```

### Create execution using the exec spec file

```shell
$ uctl create execution -p flytesnacks -d development --execFile exec_spec.yaml
```

### Monitor the execution by providing the execution id from create command

```bash
$ uctl get execution -p flytesnacks -d development <execid>
```

## Deploying your code to Union.ai with `union register`

```shell
$ union register workflows --project basic-example --domain development
```

Here we are registering all the code in the `workflows` directory to the project `basic-example` in the domain `development`.

This command will:
* Build the container image defined in your `ImageSpec`.
* Package up your code and deploy it to the specified project and domain in Union.ai.
  The package will contain the code in the Python package located in the `workflows` directory.
  Note that the presence of the `__init__.py` file in this directory is necessary in order to make it a Python package.

The command will not run the workflow. You can run it from the Web interface.

This command is useful for deploying your full set of workflows to Union.ai for testing.

### Fast registration

`union register` packages up your code through a mechanism called fast registration.
Fast registration is useful when you already have a container image that’s hosted in your container registry of choice, and you change your workflow/task code without any changes in your system-level/Python dependencies. At a high level, fast registration:

* Packages and zips up the directory/file that you specify as the argument to `union register`, along with any files in the root directory of your project. The result of this is a tarball that is packaged into a `.tar.gz` file, which also includes the serialized task (in `protobuf` format) and workflow specifications defined in your workflow code.

* Registers the package to the specified cluster and uploads the tarball containing the user-defined code into the configured blob store (e.g. S3, GCS).

At workflow execution time, Union.ai knows to automatically inject the zipped up task/workflow code into the running container, thereby overriding the user-defined tasks/workflows that were originally baked into the image.

<!-- TODO: determine if this section should be included. There was some discussion of flyteignore no longer being relevant

Ignoring files during fast registration

In step (1) of the fast registration process, by default Flyte will package up all user-defined code at the root of your project. In some cases, your project directory may contain datasets, model files, and other potentially large artifacts that you want to exclude from the tarball.

You can do so by specifying these files in a .flyteignore file in the root of your project. You can also use .gitignore or .dockerignore if you’d like to avoid adding another file.
-->

> [!NOTE] `WORKDIR`, `PYTHONPATH`, and `PATH`
> When executing any of the above commands, the archive that gets created is extracted wherever the `WORKDIR` is set.
> This can be handled directly via the `WORKDIR` directive in a `Dockerfile`, or specified via `source_root` if using `ImageSpec`.
> This is important for discovering code and executables via `PATH` or `PYTHONPATH`.
> A common pattern for making your Python packages fully discoverable is to have a top-level `src` folder, adding that to your `PYTHONPATH`,
> and making all your imports absolute.
> This avoids having to “install” your Python project in the image at any point e.g. via `pip install -e`.
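As a sketch of the `ImageSpec` variant, the following (names and paths are illustrative) sets `source_root` and puts `src` on `PYTHONPATH` via the `env` parameter, assuming these `ImageSpec` parameters behave as in flytekit:

```python
import union

# Illustrative sketch: copy the project into the image and make the
# packages under src/ importable via absolute imports, with no
# `pip install -e` step. Paths assume the code lands under /root.
image_spec = union.ImageSpec(
    builder="union",
    name="my-project-image",         # hypothetical image name
    requirements="uv.lock",
    source_root=".",                 # copy the project root into the image
    env={"PYTHONPATH": "/root/src"}, # make src/ packages discoverable
)
```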

## Inspecting executions

Uctl supports inspecting an execution by retrieving its details. For a deeper dive, refer to the
[Reference](https://www.union.ai/docs/v1/union/user-guide/api-reference/uctl-cli/_index) guide.

Monitor the execution by providing the execution ID from the create command; this can be a task or workflow execution.

```shell
$ uctl get execution -p flytesnacks -d development <execid>
```

For more details, use the `--details` flag, which shows node executions along with the task executions within them.

```shell
$ uctl get execution -p flytesnacks -d development <execid> --details
```

If you prefer a YAML or JSON view of the details, change the output format using the `-o` flag.

```shell
$ uctl get execution -p flytesnacks -d development <execid> --details -o yaml
```

To see the results of the execution, inspect the node closure `outputUri` in the detailed YAML output:

```shell
"outputUri": "s3://my-s3-bucket/metadata/propeller/flytesnacks-development-<execid>/n0/data/0/outputs.pb"
```

## Deploying your code to production

### Package your code with `union package`

The combination of `union package` and `uctl register` is the standard way of deploying your code to production.
This method is often used in scripts to [build and deploy workflows in a CI/CD pipeline](./ci-cd-deployment).

First, package your workflows:

```shell
$ union --pkgs workflows package
```

This will create a tar file called `flyte-package.tgz` of the Python package located in the `workflows` directory.
Note that the presence of the `__init__.py` file in this directory is necessary in order to make it a Python package.

> [!NOTE]
> You can specify multiple workflow directories using the following command:
>
> `union --pkgs DIR1 --pkgs DIR2 package ...`
>
> This is useful in cases where you want to register two different projects that you maintain in a single place.
>
> If you encounter a `ModuleNotFoundError` when packaging, use the `--source` option to include the correct source paths. For instance:
>
> `union --pkgs <dir1> package --source ./src -f`

### Register the package with `uctl register`

Once the code is packaged you register it using the `uctl` CLI:

```shell
$ uctl register files \
      --project basic-example \
      --domain development \
      --archive flyte-package.tgz \
      --version "$(git rev-parse HEAD)"
```

Let’s break down what each flag is doing here:

* `--project`: The target Union.ai project.

* `--domain`:  The target domain. Usually one of `development`, `staging`, or `production`.

* `--archive`: This argument allows you to pass in a package file, which in this case is the `flyte-package.tgz` produced earlier.

* `--version`: This is a version string that can be any string, but we recommend using the Git SHA in general, especially in production use cases.

See [Uctl CLI](https://www.union.ai/docs/v1/union/user-guide/api-reference/uctl-cli/_index) for more details.

## Using union register versus union package + uctl register

As a rule of thumb, `union register` works well when you are working on a single cluster and iterating quickly on your task/workflow code.

On the other hand, the combination of `union package` and `uctl register` is appropriate if you are:

* Working with multiple clusters, since it uses a portable package

* Deploying workflows to a production context

* Testing your workflows in your CI/CD infrastructure.

> [!NOTE] Programmatic Python API
> You can also perform the equivalent of the three methods of registration using a [UnionRemote object](../development-cycle/union-remote/_index).

## Image management and registration method

The `ImageSpec` construct available in `union` also has a mechanism to copy files into the image being built.
Its behavior depends on the type of registration used:

* If fast register is used, then it’s assumed that you don’t also want to copy source files into the built image.

* If fast register is not used (which is the default for `union package`, or if `union register --copy none` is specified), then it’s assumed that you do want source files copied into the built image.

* If your `ImageSpec` constructor specifies a `source_root` and the `copy` argument is set to something other than `CopyFileDetection.NO_COPY`, then files will be copied regardless of fast registration status.

## Building your own images

While we recommend that you use `ImageSpec` and the `union` cloud image builder, you can, if you wish, build and deploy your own images.

You can start with `union init --template basic-template-dockerfile`. The resulting template project includes a `docker_build.sh` script that you can use to build and tag a container according to the recommended practice:

```shell
$ ./docker_build.sh
```

By default, the `docker_build.sh` script:

* Uses the `PROJECT_NAME` specified in the `union init` command, which in this case is `my_project`.

* Will not use any remote registry.

* Uses the Git SHA to version your tasks and workflows.

You can override the default values with the following flags:

```shell
$ ./docker_build.sh -p PROJECT_NAME -r REGISTRY -v VERSION
```

For example, if you want to push your Docker image to GitHub's container registry, you can specify the `-r ghcr.io` flag.

> [!NOTE]
> The `docker_build.sh` script is purely for convenience; you can always roll your own way of building Docker containers.

Once you’ve built the image, you can push it to the specified registry. For example, if you’re using the GitHub Container Registry, do the following:

```shell
$ docker login ghcr.io
$ docker push TAG
```

## CI/CD with Flyte and GitHub Actions

You can use any of the commands we learned in this guide to register, execute, or test Union.ai workflows in your CI/CD process.
Union.ai provides two GitHub actions that facilitate this:

* `flyte-setup-action`: This action handles the installation of uctl in your action runner.

* `flyte-register-action`: This action uses `uctl register` under the hood to handle registration of packages, for example, the `.tgz` archives that are created by `union package`.

### Some CI/CD best practices

In the case where workflows are registered on each commit in your build pipelines, you can consider the following recommendations and approach:

* **Versioning Strategy** : Determining the version of the build for different types of commits makes them consistent and identifiable.
  For commits on feature branches, use `{branch-name}-{short-commit-hash}` and for the ones on main branches, use `main-{short-commit-hash}`.
  Use version numbers for the released (tagged) versions.

* **Workflow Serialization and Registration** : Workflows should be serialized and registered based on the versioning of the build and the container image.
  Depending on whether the build is for a feature branch or `main`, the registration domain should be adjusted accordingly.

* **Container Image Specification** : When managing multiple images across tasks within a workflow, use the `--image` flag during registration to specify which image to use.
  This avoids hardcoding the image within the task definition, promoting reusability and flexibility in workflows.
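The versioning strategy above can be sketched in shell (the branch and commit values are hard-coded for illustration; in CI you would obtain them from `git rev-parse`):

```shell
# Derive a registration version string from the branch name and short commit hash.
branch="feature/caching"   # in CI: $(git rev-parse --abbrev-ref HEAD)
short_sha="a1b2c3d"        # in CI: $(git rev-parse --short HEAD)

case "$branch" in
  main) version="main-${short_sha}" ;;
  *)    version="$(echo "$branch" | tr '/' '-')-${short_sha}" ;;
esac

echo "$version"   # prints feature-caching-a1b2c3d
```

The resulting string can then be passed as the `--version` argument to `uctl register files`.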

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/overriding-parameters ===

# Overriding parameters

The `with_overrides` method allows you to specify parameter overrides on [tasks](../core-concepts/tasks/_index),
[subworkflows, and sub-launch plans](https://www.union.ai/docs/v1/union/user-guide/core-concepts/workflows/subworkflows-and-sub-launch-plans) at execution time.
This is useful when you want to change the behavior of a task, subworkflow, or sub-launch plan without modifying the original definition.

## Task parameters

When calling a task, you can specify the following parameters in `with_overrides`:

* `accelerator`: Specify [accelerators](https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment/accelerators).
* `cache_serialize`: Enable [cache serialization](https://www.union.ai/docs/v1/union/user-guide/core-concepts/caching).
* `cache_version`: Specify the [cache version](https://www.union.ai/docs/v1/union/user-guide/core-concepts/caching).
* `cache`: Enable [caching](https://www.union.ai/docs/v1/union/user-guide/core-concepts/caching).
* `container_image`: Specify a [container image](https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-software-environment/image-spec).
* `interruptible`: Specify whether the task is [interruptible](https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment/interruptible-instances).
* `limits`: Specify [resource limits](https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment/customizing-task-resources).
* `name`: Give a specific name to this task execution. This name will appear in the workflow flowchart in the UI (see **Using `with_overrides` with `name` and `node_name`** below).
* `node_name`: Give a specific name to the DAG node for this task. This name will appear in the workflow flowchart in the UI (see **Using `with_overrides` with `name` and `node_name`** below).
* `requests`: Specify [resource requests](https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment/customizing-task-resources).
* `retries`: Specify the [number of times to retry this task](https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-parameters).
* `task_config`: Specify a [task config](https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-parameters).
* `timeout`: Specify the [task timeout](https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-parameters).

For example, if you have a task that does not have caching enabled, you can use `with_overrides` to enable caching at execution time as follows:

```python
my_task(a=1, b=2, c=3).with_overrides(cache=True)
```
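The same mechanism works for other parameters in the list above. A sketch combining resource and retry overrides (it assumes a simple `my_task`, and that `Resources` is available from the `union` package, as it is from flytekit):

```python
import union

@union.task
def my_task(a: int, b: int, c: int) -> int:
    return a + b + c

@union.workflow
def wf() -> int:
    # Request more resources and more retries for this one invocation,
    # without changing the task definition itself.
    return my_task(a=1, b=2, c=3).with_overrides(
        requests=union.Resources(cpu="2", mem="4Gi"),
        retries=3,
    )
```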

### Using `with_overrides` with `name` and `node_name`

Using `with_overrides` with `name` on a task is a particularly useful feature.
For example, you can use `with_overrides(name="my_task")` to give a specific name to a task execution, which will appear in the UI.
The name specified can be chosen or generated at invocation time without modifying the task definition.

```python
@union.workflow
def wf() -> int:
    my_task(a=1, b=1, c=1).with_overrides(name="my_task_1")
    my_task(a=2, b=2, c=2).with_overrides(name="my_task_2", node_name="my_node_2")
    return my_task(a=1, b=1, c=1)
```

The above code would produce the following workflow display in the UI:

![Overriding name](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/overriding-parameters/override-name.png)

There is also a related parameter called `node_name` that can be used to give a specific name to the DAG node for this task.
The DAG node name is usually autogenerated as `n0`, `n1`, `n2`, etc. It appears in the `node` column of the workflow table.
Overriding `node_name` results in the autogenerated name being replaced by the specified name:

![Overriding node name](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/overriding-parameters/override-node-name.png)

Note that the `node_name` was specified as `my_node_2` in the code but appears as `my-node-2` in the UI. This is due to the fact that Kubernetes node names cannot contain underscores. Union.ai automatically alters the name to be Kubernetes-compliant.

## Subworkflow and sub-launch plan parameters

When calling a workflow or launch plan from within a high-level workflow
(in other words, when invoking a subworkflow or sub-launch plan),
you can specify the following parameters in `with_overrides`:

* `cache_serialize`: Enable [cache serialization](https://www.union.ai/docs/v1/union/user-guide/core-concepts/caching).
* `cache_version`: Specify the [cache version](https://www.union.ai/docs/v1/union/user-guide/core-concepts/caching).
* `cache`: Enable [caching](https://www.union.ai/docs/v1/union/user-guide/core-concepts/caching).
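For example, to enable caching for a single subworkflow invocation (a minimal sketch; `sub_wf` and `double` are hypothetical):

```python
import union

@union.task
def double(x: int) -> int:
    return 2 * x

@union.workflow
def sub_wf(x: int = 3) -> int:
    return double(x=x)

@union.workflow
def parent_wf() -> int:
    # Cache the results of this subworkflow invocation only;
    # the definition of sub_wf itself is unchanged.
    return sub_wf().with_overrides(cache=True, cache_version="1.0")
```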

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/run-details ===

# Run details

The `union run` command is used to run a specific workflow or task in your local Python environment or on Union.ai.
In this section we will discuss some details of how and why to use it.

## Passing parameters

`union run` enables you to execute a specific workflow using the syntax:

```shell
$ union run <path/to/script.py> <workflow_or_task_function_name>
```

Keyword arguments can be supplied to `union run` by passing them in like this:

```shell
--<keyword> <value>
```

For example, above we invoked `union run` with script `example.py`, workflow `wf`, and named parameter `name`:

```shell
$ union run example.py wf --name 'Albert'
```

The value `Albert` is passed for the parameter `name`.

Arguments with `snake_case` names must be passed in `kebab-case` on the command line. For example,
if the code were altered to accept a `last_name` parameter, the following command:

```shell
$ union run example.py wf --last-name 'Einstein'
```

passes the value `Einstein` for that parameter.
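Complex inputs can typically be passed as JSON strings. For example, assuming a workflow `training_workflow` in `example.py` that takes a `hyperparameters: dict` input (as in the example later on this page):

```shell
$ union run example.py training_workflow --hyperparameters '{"C": 0.1}'
```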

## Why `union run` rather than `python`?

You could add a `main` guard at the end of the script like this:

```python
if __name__ == "__main__":
    training_workflow(hyperparameters={"C": 0.1})
```

This would let you run it with `python example.py`, though you would have to hard-code your arguments.

It becomes even more verbose if you want to pass in your arguments from the command line:

```python
if __name__ == "__main__":
    import json
    from argparse import ArgumentParser

    parser = ArgumentParser()
    parser.add_argument("--hyperparameters", type=json.loads)
    ...  # add the other options

    args = parser.parse_args()
    training_workflow(hyperparameters=args.hyperparameters)
```

`union run` is less verbose and more convenient for running workflows with arguments.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/debugging-with-interactive-tasks ===

# Debugging with interactive tasks

With interactive tasks you can inspect and debug live task code directly in the UI in an embedded Visual Studio Code IDE.

## Enabling interactive tasks in your code

To enable interactive tasks, you need to:

* Include `flytekitplugins-flyteinteractive` as a dependency
* Use the `@vscode` decorator on the tasks you want to make interactive.

The `@vscode` decorator, when applied, converts a task into a Visual Studio Code
server during runtime.  This process overrides the standard execution of the
task’s function body, initiating a command to start a Visual Studio Code server
instead.

<!-- TODO: Remove mention of flytesnacks and flytekit -->

> [!NOTE] No need for ingress or port forwarding
> The Union.ai interactive tasks feature is an adaptation of the open-source
> [FlyteInteractive plugin](https://www.union.ai/docs/v1/union/integrations/external-service-backend-plugins/flyteinteractive-plugin).
> It improves on the open-source version by removing the need for ingress
> configuration or port forwarding, providing a more seamless debugging
> experience.

## Basic example

The following example demonstrates interactive tasks in a simple workflow.

### requirements.txt

This `requirements.txt` file is used by all the examples in this section:

```text
flytekit
flytekitplugins-flyteinteractive
```

### example.py

```python
"""Union.ai workflow example of interactive tasks (@vscode)"""

import union
from flytekitplugins.flyteinteractive import vscode

image = union.ImageSpec(
    registry="<my-image-registry>",
    name="interactive-tasks-example",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",
    requirements="requirements.txt"
)

@union.task(container_image=image)
@vscode
def say_hello(name: str) -> str:
    s = f"Hello, {name}!"
    return s

@union.workflow
def wf(name: str = "world") -> str:
    greeting = say_hello(name=name)
    return greeting
```

## Register and run the workflow

To register the code to a project on Union.ai and run the workflow, follow the
directions in [Running your code](../development-cycle/running-your-code)

## Access the IDE

1. Select the first task in the workflow page (in this example the task is called `say_hello`).
   The task info pane will appear on the right side of the page.
2. Wait until the task is in the **Running** state and the **VSCode (User)** link appears.
3. Click the **VSCode (User)** link.

![VSCode link](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/debugging-with-interactive-tasks/vscode-link.png)

## Inspect the task code

Once the IDE opens, you will be able to see your task code in the editor.

![Inspect code](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/debugging-with-interactive-tasks/inspect-code.png)

## Interactive debugging

To run the task in VSCode, click the _Run and debug_ symbol on the left rail of the IDE and select the **Interactive Debugging** configuration.

![Interactive debugging](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/debugging-with-interactive-tasks/interactive-debugging.png)

Click the **Play** button beside the configuration drop-down to run the task.
This will run your task with inputs from the previous task. To inspect intermediate states, set breakpoints in the Python code and use the debugger for tracing.

> [!NOTE] No task output written to Union.ai storage
> It’s important to note that during the debugging phase the task runs entirely within VSCode and does not write the output to Union.ai storage.

## Update your code

You can edit your code in the VSCode environment and run the task again to see the changes.
Note, however, that the changes will not be automatically persisted anywhere.
You will have to manually copy and paste the changes back to your local environment.

## Resume task

After you finish debugging, you can resume your task with updated code by executing the **Resume Task** configuration.
This will terminate the code server, run the task with inputs from the previous task, and write the output to Union.ai storage.

> [!NOTE] Remember to persist your code
> Remember to persist your code (for example, by checking it into GitHub) before resuming the task, since you will lose the connection to the VSCode server afterwards.

![Resume task](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/debugging-with-interactive-tasks/resume-task.png)

## Auxiliary Python files

You will notice that aside from your code, there are some additional files in the VSCode file explorer that have been automatically generated by the system:

### flyteinteractive_interactive_entrypoint.py

The `flyteinteractive_interactive_entrypoint.py` script implements the **Interactive Debugging** action that we used above:

![Interactive entrypoint](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/debugging-with-interactive-tasks/flyteinteractive-interactive-entrypoint-py.png)

### flyteinteractive_resume_task.py

The `flyteinteractive_resume_task.py` script implements the **Resume Task** action that we used above:

![Resume task](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/debugging-with-interactive-tasks/flyteinteractive-resume-task-py.png)

### launch.json

The `launch.json` file in the `.vscode` directory configures the **Interactive Debugging** and **Resume Task** actions.

![launch.json](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/debugging-with-interactive-tasks/launch-json.png)

## Integrated terminal

In addition to using the convenience functions defined by the auxiliary files, you can also run your Python script directly from the integrated terminal using `python <script_name>.py` (in this example, `python hello.py`).

![Interactive terminal](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/debugging-with-interactive-tasks/interactive-terminal.png)

## Install extensions

As with local VSCode, you can install a variety of extensions to assist development.
Available extensions differ from official VSCode for legal reasons and are hosted on the [Open VSX Registry](https://open-vsx.org/).

Python and Jupyter extensions are installed by default.
Additional extensions can be added by defining a configuration object and passing it to the `@vscode` decorator, as shown below:

### example-extensions.py

```python
"""Union.ai workflow example of interactive tasks (@vscode) with extensions"""

import union
from flytekitplugins.flyteinteractive import COPILOT_EXTENSION, VscodeConfig, vscode

image = union.ImageSpec(
    registry="<my-image-registry>",
    name="interactive-tasks-example",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",
    requirements="requirements.txt"
)

config = VscodeConfig()
config.add_extensions(COPILOT_EXTENSION) # Use predefined URL
config.add_extensions(
    "https://open-vsx.org/api/vscodevim/vim/1.27.0/file/vscodevim.vim-1.27.0.vsix"
) # Copy raw URL from Open VSX

@union.task(container_image=image)
@vscode(config=config)
def say_hello(name: str) -> str:
    s = f"Hello, {name}!"
    return s

@union.workflow
def wf(name: str = "world") -> str:
    greeting = say_hello(name=name)
    return greeting
```

## Manage resources

To manage resources, the VSCode server is terminated after a period of idleness (no active HTTP connections).
Idleness is monitored via a heartbeat file.

The `max_idle_seconds` parameter can be used to set the maximum number of seconds the VSCode server can be idle before it is terminated.

### example-manage-resources.py

```python
"""Union.ai workflow example of interactive tasks (@vscode) with max_idle_seconds"""

import union
from flytekitplugins.flyteinteractive import vscode

image = union.ImageSpec(
    registry="<my-image-registry>",
    name="interactive-tasks-example",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",
    requirements="requirements.txt"
)

@union.task(container_image=image)
@vscode(max_idle_seconds=60000)
def say_hello(name: str) -> str:
   s = f"Hello, {name}!"
   return s

@union.workflow
def wf(name: str = "world") -> str:
    greeting = say_hello(name=name)
    return greeting
```
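Conceptually, the idle check compares the heartbeat file's last-modified time against the timeout. Below is a stdlib-only sketch of that logic; it is illustrative only, and the file name and exact behavior of the plugin's monitor may differ:

```python
import os
import time

def is_idle(heartbeat_path, max_idle_seconds, now=None):
    """Return True if the heartbeat file has not been touched within max_idle_seconds.

    A missing heartbeat file is treated as idle.
    """
    if now is None:
        now = time.time()
    if not os.path.exists(heartbeat_path):
        return True
    return (now - os.path.getmtime(heartbeat_path)) > max_idle_seconds
```

With `max_idle_seconds=60000` as in the example above, the server survives roughly 16.7 hours without active HTTP connections before termination.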

## Pre and post hooks

Interactive tasks also allow the registration of functions to be executed both before and after VSCode starts.
This can be used for tasks requiring setup or cleanup.

### example-pre-post-hooks.py

```python
"""Union.ai workflow example of interactive tasks (@vscode) with pre and post hooks"""

import union
from flytekitplugins.flyteinteractive import vscode

image = union.ImageSpec(
    registry="<my-image-registry>",
    name="interactive-tasks-example",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",
    requirements="requirements.txt"
)

def set_up_proxy():
    print("set up")

def push_code():
    print("push code")

@union.task(container_image=image)
@vscode(pre_execute=set_up_proxy, post_execute=push_code)
def say_hello(name: str) -> str:
    s = f"Hello, {name}!"
    return s

@union.workflow
def wf(name: str = "world") -> str:
    greeting = say_hello(name=name)
    return greeting
```

## Only initiate VSCode on task failure

The system can also be set to only initiate VSCode _after a task failure_, preventing task termination and thus enabling inspection.
This is done by setting the `run_task_first` parameter to `True`.

### example-run-task-first.py

```python
"""Union.ai workflow example of interactive tasks (@vscode) with run_task_first"""

import union
from flytekitplugins.flyteinteractive import vscode

image = union.ImageSpec(
    registry="<my-image-registry>",
    name="interactive-tasks-example",
    base_image="ghcr.io/flyteorg/flytekit:py3.11-latest",
    requirements="requirements.txt"
)

@union.task(container_image=image)
@vscode(run_task_first=True)
def say_hello(name: str) -> str:
    s = f"Hello, {name}!"
    return s

@union.workflow
def wf(name: str = "world") -> str:
    greeting = say_hello(name=name)
    return greeting
```

## Debugging execution issues

Inspecting task and workflow executions provides log links for further debugging.

Using the `--details` flag, you can view node executions along with their log links:

```shell
└── n1 - FAILED - 2021-06-30 08:51:07.3111846 +0000 UTC - 2021-06-30 08:51:17.192852 +0000 UTC
    └── Attempt :0
        └── Task - FAILED - 2021-06-30 08:51:07.3111846 +0000 UTC - 2021-06-30 08:51:17.192852 +0000 UTC
        └── Logs :
            └── Name :Kubernetes Logs (User)
            └── URI :http://localhost:30082/#/log/flytectldemo-development/f3a5a4034960f4aa1a09-n1-0/pod?namespace=flytectldemo-development
```

Additionally, you can check the pods launched in the `<project>-<domain>` namespace:

```shell
$ kubectl get pods -n <project>-<domain>
```

The launched pods will have a prefix consisting of the execution name and a suffix of the `nodeId`:

```shell
NAME                        READY   STATUS             RESTARTS   AGE
f65009af77f284e50959-n0-0   0/1     ErrImagePull       0          18h
```

For example, above we see that the `STATUS` indicates an issue with pulling the image.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/managing-secrets ===

# Managing secrets

You can use secrets to interact with external services.

## Creating secrets

### Creating a secret on the command line

To create a secret, use the `union create secret` command:

```shell
$ union create secret my_secret_name
```

You'll be prompted to enter a secret value in the terminal:

```
Enter secret value: ...
```

### Creating a secret from a file

To create a secret from a file, run the following command:

```shell
$ union create secret my_secret_name -f /path/to/secret_file
```

### Scoping secrets

* When you create a secret without specifying a project or domain, as we did above, the secret will be available across all project-domain combinations.
* If you specify only a domain, the secret will be available across all projects, but only in that domain.
* If you specify both a project and a domain, the secret will be available in that project-domain combination only.
* If you specify only a project, you will get an error.

For example, to create a secret so that it is only available in `my_project-development`, you would run:

```shell
$ union create secret my_secret_name --project my_project --domain development
```
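The visibility rules above can be sketched as a small lookup function. This is illustrative only; the real resolution happens server-side:

```python
def secret_visible(secret_scope, project, domain):
    """Decide whether a secret is visible to a workflow running in (project, domain).

    secret_scope is a (project, domain) tuple where None means "not specified
    at creation time". A project without a domain is rejected at creation,
    so that combination never appears here.
    """
    s_project, s_domain = secret_scope
    if s_project is None and s_domain is None:   # org-wide secret
        return True
    if s_project is None:                        # domain-scoped secret
        return domain == s_domain
    return (project, domain) == (s_project, s_domain)  # fully scoped secret
```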

## Listing secrets

You can list existing secrets with the `union get secret` command.
For example, the following command will list all secrets in the organization:

```shell
$ union get secret
```

Specifying either or both of the `--project` and `--domain` flags will list the secrets that are **only** available in that project and/or domain.

For example, to list the secrets that are only available in `my_project` and domain `development`, you would run:

```shell
$ union get secret --project my_project --domain development
```

## Using secrets in workflow code

Note that a workflow can only access secrets whose scope includes the project and domain of the workflow.

### Using a secret created on the command line

To use a secret created on the command line, see the example code below. To run the example code:

1. Create a secret as described in **Development cycle > Managing secrets > Creating secrets > Creating a secret on the command line**, with the key `my_secret`.
2. Copy the following example code to a new file and save it as `using_secrets.py`.
3. Run the script with `union run --remote using_secrets.py main`.

```python
import union

@union.task(secret_requests=[union.Secret(key="my_secret")])
def t1():
    secret_value = union.current_context().secrets.get(key="my_secret")
    # do something with the secret. For example, communication with an external API.
    ...
```

> [!WARNING]
> Do not return secret values from tasks, as this will expose secrets to the control plane.

With `env_var`, you can automatically load the secret into the environment. This is useful
with libraries that expect the secret to have a specific name:

```python
import union

@union.task(secret_requests=[union.Secret(key="my_union_api_key", env_var="UNION_API_KEY")])
def t1():
    # Authenticates the remote with UNION_API_KEY
    remote = union.UnionRemote(default_project="flytesnacks", default_domain="development")
```
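Once injected, the secret behaves like any other environment variable inside the task container, so libraries can read it without further plumbing. A minimal sketch (the variable is populated by the platform; here we only read it):

```python
import os

def get_api_key():
    """Read the injected secret from the environment, failing loudly if absent."""
    key = os.environ.get("UNION_API_KEY")
    if key is None:
        raise RuntimeError("UNION_API_KEY is not set")
    return key
```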

### Using a secret created from a file

To use a secret created from a file in your workflow code, you must mount it as a file. To run the example code below:

1. Create a secret from a file as described in **Development cycle > Managing secrets > Creating secrets > Creating a secret from a file**, with the key `my_file_secret`.
2. Copy the example code below to a new file and save it as `using_secrets_file.py`.
3. Run the script with `union run --remote using_secrets_file.py main`.

```python
import union

@union.task(
    secret_requests=[
        union.Secret(key="my_file_secret", mount_requirement=union.Secret.MountType.FILE),
    ]
)
def t1():
    path_to_secret_file = union.current_context().secrets.get_secrets_file("my_file_secret")
    with open(path_to_secret_file, "r") as f:
        secret_value = f.read()
    # do something with the secret. For example, communication with an external API.
    ...
```

> [!WARNING]
> Do not return secret values from tasks, as this will expose secrets to the control plane.

> [!NOTE]
> The `get_secrets_file` method takes the secret key and returns the path to the secret file.

## Updating secrets

To update a secret, run the `union update secret` command. You will be prompted to enter a new value:

```shell
$ union update secret --project my_project --domain my_domain my_secret
```

## Deleting secrets

To delete a secret, use the `union delete secret` command:

```shell
$ union delete secret --project my_project --domain my_domain my_secret
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/managing-api-keys ===

# Managing API keys

You need to create an API key to allow external systems to run compute
on Union.ai, e.g. a GitHub action that registers or runs workflows.

## Creating an API key

To create an API key, run the following Union CLI command, supplying a name of your choice:

```shell
$ union create api-key admin --name my-custom-name

Client ID: my-custom-name
The following API key will only be shown once. Be sure to keep it safe!
Configure your headless CLI by setting the following environment variable:

export UNION_API_KEY="<SECRET>"
```

Store the `<SECRET>` in a secure location. For `git` development, make sure not to check the `<SECRET>` into your repository.
Within a GitHub action, you can use [GitHub Secrets](https://docs.github.com/en/actions/security-guides/using-secrets-in-github-actions) to store the secret.

For this example, copy the following workflow into a file called `hello.py`:

```python
import union

@union.task
def welcome(name: str) -> str:
    return f"Welcome to Union.ai! {name}"

@union.workflow
def main(name: str) -> str:
    return welcome(name=name)
```

You can run this workflow from any machine by setting the `UNION_API_KEY`
environment variable:

```shell
$ export UNION_API_KEY="<SECRET>"
$ union run --remote hello.py main --name "Union.ai"
```

## Listing and deleting applications

You can list all your applications by running:

```shell
$ union get api-key admin
```

```shell
┏━━━━━━━━━━━━━━━━┓
┃ client_id      ┃
┡━━━━━━━━━━━━━━━━┩
│ my-custom-name │
└────────────────┘
```

The `client_id` contains your custom application name and a prefix that contains your
username.

Finally, you can delete your application by running:

```shell
$ union delete api-key admin --name my-custom-name
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/accessing-aws-s3 ===

# Accessing AWS S3 buckets

Here we will take a look at how to access data on AWS S3 Buckets from Union.ai.
As a prerequisite, we assume that our AWS S3 bucket is accessible with API keys: `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`.

## Creating secrets on Union.ai

First, we create secrets on Union.ai by running the following command:

```shell
$ union create secret AWS_ACCESS_KEY_ID
```

This will open a prompt where we paste in our AWS credentials:

```shell
Enter secret value: 🗝️
```

Repeat this process for all other AWS credentials, such as `AWS_SECRET_ACCESS_KEY`.

## Using secrets in a task

Next, we can use the secrets directly in a task. With the AWS CLI, we create a small text file and copy it to an S3 bucket:

```shell
$ aws s3 mb s3://test_bucket
$ echo "Hello Union.ai" > my_file.txt
$ aws s3 cp my_file.txt s3://test_bucket/my_file.txt
```

Next, we give a task access to our AWS secrets by supplying them through `secret_requests`. For this guide, save the following snippet as `aws-s3-access.py`:

```python
import union

@union.task(
    secret_requests=[
        union.Secret(key="AWS_ACCESS_KEY_ID"),
        union.Secret(key="AWS_SECRET_ACCESS_KEY"),
    ],
)
def read_s3_data() -> str:
    import s3fs
    secrets = union.current_context().secrets

    s3 = s3fs.S3FileSystem(
        secret=secrets.get(key="AWS_SECRET_ACCESS_KEY"),
        key=secrets.get(key="AWS_ACCESS_KEY_ID"),
    )

    with s3.open("test_bucket/my_file.txt") as f:
        content = f.read().decode("utf-8")
    return content

@union.workflow
def main():
    read_s3_data()
```

Within the task, the secrets are available through `current_context().secrets` and passed to `s3fs`. Run the following command to execute the workflow:

```shell
$ union run --remote aws-s3-access.py main
```

## Conclusion

You can easily access your AWS S3 buckets by running `union create secret` and configuring your tasks to access the secrets!

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/task-resource-validation ===

# Task resource validation

In Union.ai, when you attempt to execute a workflow with unsatisfiable resource requests, we fail the execution immediately rather than allowing it to queue forever.

We intercept execution creation requests in the executions service to validate that their resource requirements can be met, and fast-fail if not. A failed validation returns a message similar to the following:

```text
Request failed with status code 400 rpc error: code = InvalidArgument desc = no node satisfies task 'workflows.fotd.fotd_directory' resource requests
```
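The fast-fail check amounts to asking whether any node shape in the fleet can fit the request. A stdlib-only sketch of that idea (the dict field names here are hypothetical, not the actual scheduler code):

```python
def any_node_satisfies(request, nodes):
    """Return True if at least one node has enough allocatable CPU and memory.

    request and each node are dicts like {"cpu": 4, "mem_gib": 16}.
    """
    return any(
        node["cpu"] >= request["cpu"] and node["mem_gib"] >= request["mem_gib"]
        for node in nodes
    )

# Hypothetical fleet of two node shapes
nodes = [{"cpu": 8, "mem_gib": 32}, {"cpu": 16, "mem_gib": 64}]
```

A request for 32 CPUs against this fleet would fail validation immediately instead of queueing forever.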

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/running-in-a-local-cluster ===

# Running in a local cluster

## Running in a local Kubernetes cluster

Ultimately you will be running your workflows in a Kubernetes cluster in Union.ai. But it can be handy to try out a workflow in a cluster on your local machine.

First, ensure that you have [Docker](https://www.docker.com/products/docker-desktop/) (or a similar OCI-compliant container engine) installed locally and that _the daemon is running_.

Then start the demo cluster using `uctl`:

```shell
$ uctl demo start
```

### Configuration

When `uctl` starts the cluster in your local container engine it also writes configuration information to the directory `~/.union/`.

Most importantly, it creates the file `~/.union/config-sandbox.yaml`. This file holds (among other things) the location of the Kubernetes cluster to which we will be deploying the workflow:

```yaml
admin:
  endpoint: localhost:30080
  authType: Pkce
  insecure: true
console:
  endpoint: http://localhost:30080
logger:
  show-source: true
  level: 0
```

Right now this file indicates that the target cluster is your local Docker instance (`localhost:30080`), but later we will change it to point to your Union.ai cluster.

Later invocations of `uctl` or `union` will need to know the location of the target cluster. This can be provided in two ways:

1. Explicitly passing the location of the config file on the command line
   * `uctl --config ~/.union/config-sandbox.yaml <command>`
   * `union --config ~/.union/config-sandbox.yaml <command>`
2. Setting the environment variable `UNION_CONFIG` to the location of the config file:
   * `export UNION_CONFIG=~/.union/config-sandbox.yaml`

> [!NOTE]
> In this guide, we assume that you have set the `UNION_CONFIG` environment variable in your shell to the location of the configuration file.

### Start the workflow

Now you can run your workflow in the local cluster simply by adding the `--remote` flag to your `union` command:

```shell
$ union run --remote \
          workflows/example.py \
          training_workflow \
          --hyperparameters '{"C": 0.1}'
```

The output supplies a URL to your workflow execution in the UI.

### Inspect the results

Navigate to the URL produced by `union run` to see your workflow in the Union.ai UI.

## Local cluster with default image

```shell
$ union run --remote my_file.py my_workflow
```

_Where `union` is configured to point to the local cluster started with `uctl demo start`._

* Task code runs in the environment of the default image in your local cluster.
* Python code is dynamically overlaid into the container at runtime.
* Only supports Python code whose dependencies are installed in the default image.
* Includes a local S3.
* Supports some plugins but not all.
* Single workflow runs immediately.
* Workflow is registered to a default project.
* Useful for demos.

## Local cluster with custom image

```shell
$ union run --remote \
              --image my_cr.io/my_org/my_image:latest \
              my_file.py \
              my_workflow
```

_Where `union` is configured to point to the local cluster started with `uctl demo start`._

* Task code runs in the environment of your custom image (`my_cr.io/my_org/my_image:latest`) in your local cluster.
* Python code is dynamically overlaid into the container at runtime
* Supports any Python dependencies you wish, since you have full control of the image.
* Includes a local S3.
* Supports some plugins but not all.
* Single workflow runs immediately.
* Workflow is registered to a default project.
* Useful for advanced testing during the development cycle.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/ci-cd-deployment ===

# CI/CD deployment

So far we have covered the steps of deploying a project manually from the command line.
In many cases, you will want to automate this process through a CI/CD system.
In this section, we explain how to set up a CI/CD system to register, execute and promote workflows on Union.ai.
We will use GitHub Actions as the example CI/CD system.

## Create a Union.ai API key

An API key is registered in your Union.ai control plane to enable external systems to perform actions on your behalf.
To allow your CI/CD system to authenticate with Union.ai, create a Union.ai API key.
See [Managing API keys](./managing-api-keys) for details.

```shell
$ union create api-key admin --name my-cicd-key
```

Copy the `UNION_API_KEY` value for later use; this is the only time the secret is displayed.

## Store the secret in your CI/CD secrets store

Store the secret in your CI/CD secrets store.
In GitHub, from the repository page:

1. Select **Settings > Secrets and variables > Actions**.
2. Select the **Secrets** tab and click **New repository secret**.
3. Give a meaningful name to the secret, like `UNION_CICD_API_KEY`.
4. Paste in the string from above as the value.
5. Click **Add secret**.

## Configure your CI/CD workflow file

Create the CI/CD workflow file. For GitHub Actions, you might add `example-project/.github/workflows/deploy.yaml` similar to:

```yaml
name: Deploy

on:
  push:
    branches:
      - main

env:
  PROJECT: flytesnacks
  DOMAIN: production

jobs:
  build_and_register:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write

    steps:
      - name: Checkout repository
        uses: actions/checkout@v3

      - name: Install python & uv 
        run: |
          sudo apt-get install python3
          curl -LsSf https://astral.sh/uv/install.sh | sh
      - name: Install dependencies
        run: uv sync
      - name: Register to Union
        env:
          UNION_API_KEY: ${{ secrets.UNION_CICD_API_KEY }}
        run: |
          source .venv/bin/activate
          union register --version ${{ github.sha }} -p ${{ env.PROJECT }} \
          -d ${{ env.DOMAIN }} --activate-launchplans ./launchplans
```

> [!NOTE]
> The `Register to Union` step registers the launch plans and related Flyte entities in the `launchplans` directory. It sets the project and domain, activates launch plans automatically, and pins the version to the Git commit SHA for traceability across all registered Flyte entities. See union [register](https://www.union.ai/docs/v1/union/api-reference/union-cli) for additional options.

Once this is set up, every push to the main branch in your repository will build and deploy your project to Union.ai.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/jupyter-notebooks ===

# Jupyter notebooks

Union.ai supports the development, running, and debugging of tasks and workflows in an interactive Jupyter notebook environment, which accelerates the iteration speed when building data- or machine learning-driven applications.

## Write your workflows and tasks in cells

When building tasks and workflows in a notebook, you write the code in cells as you normally would.

From those cells you can run the code locally (i.e., in the notebook itself, not on Union.ai) by clicking the run button, as you would in any notebook.

## Enable the notebook to register workflows to Union.ai

To enable the tasks and workflows in your notebook to be easily registered and run on your Union.ai instance, you need to set up an _interactive_ `UnionRemote` object and then use it to invoke the remote executions.

First, in a cell, create an interactive `UnionRemote` object:

```python
from flytekit.configuration import Config
from union import UnionRemote

remote = UnionRemote(
    config=Config.auto(),
    default_project="default",
    default_domain="development",
    interactive_mode_enabled=True,
)
```

The `interactive_mode_enabled` flag must be set to `True` when running in a Jupyter notebook environment, enabling interactive registration and execution of workflows.

Next, set up the execution invocation in another cell:

```python
execution = remote.execute(my_task, inputs={"name": "Joe"})
execution = remote.execute(my_wf, inputs={"name": "Anne"})
```

The interactive UnionRemote client re-registers an entity whenever it’s redefined in the notebook, including when you re-execute a cell containing the entity definition, even if the entity remains unchanged. This behavior facilitates iterative development and debugging of tasks and workflows in a Jupyter notebook.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/decks ===

# Decks

Decks lets you display customized data visualizations from within your task code.
Decks are rendered as HTML and appear right in the Union.ai UI when you run your workflow.

> [!NOTE]
> Decks is an opt-in feature; to enable it, set `enable_deck` to `True` in the task parameters.

To begin, import the dependencies:

```python
import union
from flytekit.deck.renderer import MarkdownRenderer
from sklearn.decomposition import PCA
import plotly.express as px
import plotly
```

> [!NOTE]
> The renderers are packaged separately from `flytekit` itself.
> To enable the `MarkdownRenderer` imported above
> you first have to install the package `flytekitplugins-deck-standard`
> in your local Python environment and include it in your `ImageSpec` (as shown below).

We create a new deck named `pca` and render Markdown content along with a
[PCA](https://en.wikipedia.org/wiki/Principal_component_analysis) plot.

Now, declare the required dependencies in an `ImageSpec`:

```python
custom_image = union.ImageSpec(
    packages=[
        "flytekitplugins-deck-standard",
        "markdown",
        "pandas",
        "pillow",
        "plotly",
        "pyarrow",
        "scikit-learn",
        "ydata_profiling",
    ],
    builder="union",
)
```

Next, we define the task that will construct the figure and create the Deck:

```python
@union.task(enable_deck=True, container_image=custom_image)
def pca_plot():
    iris_df = px.data.iris()
    X = iris_df[["sepal_length", "sepal_width", "petal_length", "petal_width"]]
    pca = PCA(n_components=3)
    components = pca.fit_transform(X)
    total_var = pca.explained_variance_ratio_.sum() * 100
    fig = px.scatter_3d(
        components,
        x=0,
        y=1,
        z=2,
        color=iris_df["species"],
        title=f"Total Explained Variance: {total_var:.2f}%",
        labels={"0": "PC 1", "1": "PC 2", "2": "PC 3"},
    )
    main_deck = union.Deck("pca", MarkdownRenderer().to_html("### Principal Component Analysis"))
    main_deck.append(plotly.io.to_html(fig))
```

Note the usage of `append` to append the Plotly figure to the Markdown deck.
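A deck can be thought of as an ordered list of HTML fragments that `append` extends. The class below is a conceptual stdlib-only model of that composition, not the actual `Deck` implementation:

```python
class HtmlDeck:
    """Conceptual model of a deck: a named, ordered list of HTML fragments."""

    def __init__(self, name, initial_html=""):
        self.name = name
        self._fragments = [initial_html] if initial_html else []

    def append(self, html):
        # Each call adds one more fragment to the rendered page.
        self._fragments.append(html)
        return self

    @property
    def html(self):
        return "\n".join(self._fragments)
```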

When the task is run locally, the output contains the path to the generated `deck.html` file:

```
{"asctime": "2023-07-11 13:16:04,558", "name": "flytekit", "levelname": "INFO", "message": "pca_plot task creates flyte deck html to file:///var/folders/6f/xcgm46ds59j7g__gfxmkgdf80000gn/T/flyte-0_8qfjdd/sandbox/local_flytekit/c085853af5a175edb17b11cd338cbd61/deck.html"}
```

![Union deck plot](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/decks/flyte-deck-plot-local.webp)

Once you execute this task on the Union.ai instance, you can access the deck by going to the task view and clicking the _Deck_ button:

![Union deck button](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/decks/flyte-deck-button.png)

## Deck tabs

Each Deck has a minimum of three tabs: input, output and default.
The input and output tabs are used to render the input and output data of the task,
while the default deck can be used to create custom renderings such as line plots, scatter plots, Markdown text, etc.
Additionally, you can create other tabs as well.

## Deck renderers

> [!NOTE]
> The renderers are packaged separately from `flytekit` itself.
> To enable them you first have to install the package `flytekitplugins-deck-standard`
> in your local Python environment and include it in your `ImageSpec`.

### Frame profiling renderer

The frame profiling renderer creates a profile report from a Pandas DataFrame.

```python
import union
import pandas as pd
from flytekitplugins.deck.renderer import FrameProfilingRenderer

@union.task(enable_deck=True, container_image=custom_image)
def frame_renderer() -> None:
    df = pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]})
    union.Deck("Frame Renderer", FrameProfilingRenderer().to_html(df=df))
```

![Frame renderer](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/decks/flyte-decks-frame-renderer.png)

### Top-frame renderer

The top-frame renderer renders a DataFrame as an HTML table.

```python
import union
import pandas as pd
from typing import Annotated
from flytekit.deck import TopFrameRenderer

@union.task(enable_deck=True, container_image=custom_image)
def top_frame_renderer() -> Annotated[pd.DataFrame, TopFrameRenderer(1)]:
    return pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]})
```

![Top frame renderer](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/decks/flyte-decks-top-frame-renderer.png)

### Markdown renderer

The Markdown renderer converts a Markdown string into HTML.

```python
import union
from flytekit.deck import MarkdownRenderer

@union.task(enable_deck=True, container_image=custom_image)
def markdown_renderer() -> None:
    union.current_context().default_deck.append(
        MarkdownRenderer().to_html("You can install flytekit using this command: ```import flytekit```")
    )
```

![Markdown renderer](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/decks/flyte-decks-markdown-renderer.png)

### Box renderer

The box renderer groups rows of a DataFrame together into a
box-and-whisker mark to visualize their distribution.

Each box extends from the first quartile (Q1) to the third quartile (Q3).
The median (Q2) is indicated by a line within the box.
Typically, the whiskers extend to the edges of the box,
plus or minus 1.5 times the interquartile range (IQR: Q3-Q1).
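The quartile and whisker arithmetic above can be checked with the standard library. This illustrates the statistics a box-and-whisker mark encodes, not the renderer's internals:

```python
import statistics

def box_stats(values):
    """Compute the five numbers a box-and-whisker mark encodes."""
    q1, q2, q3 = statistics.quantiles(values, n=4)  # quartile cut points
    iqr = q3 - q1
    return {
        "q1": q1,
        "median": q2,
        "q3": q3,
        "whisker_low": q1 - 1.5 * iqr,
        "whisker_high": q3 + 1.5 * iqr,
    }
```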

```python
import union
from flytekitplugins.deck.renderer import BoxRenderer

@union.task(enable_deck=True, container_image=custom_image)
def box_renderer() -> None:
    iris_df = px.data.iris()
    union.Deck("Box Plot", BoxRenderer("sepal_length").to_html(iris_df))
```

![Box renderer](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/decks/flyte-decks-box-renderer.png)

### Image renderer

The image renderer converts a `FlyteFile` or `PIL.Image.Image` object into an HTML displayable image,
where the image data is encoded as a base64 string.

```python
import union
from flytekitplugins.deck.renderer import ImageRenderer

@union.task(enable_deck=True, container_image=custom_image)
def image_renderer(image: union.FlyteFile) -> None:
    union.Deck("Image Renderer", ImageRenderer().to_html(image_src=image))

@union.workflow
def image_renderer_wf(image: union.FlyteFile = "https://bit.ly/3KZ95q4") -> None:
    image_renderer(image=image)
```
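The base64 embedding that the image renderer relies on can be reproduced with the standard library alone. This is a sketch of the encoding step, not the plugin's actual code:

```python
import base64

def image_to_html(image_bytes, mime="image/png"):
    """Embed raw image bytes in an HTML <img> tag as a base64 data URI."""
    encoded = base64.b64encode(image_bytes).decode("ascii")
    return f'<img src="data:{mime};base64,{encoded}"/>'
```

Because the image data lives inside the HTML itself, the deck stays self-contained and needs no external image hosting.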

![Image renderer](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/decks/flyte-decks-image-renderer.png)

#### Table renderer

The table renderer converts a Pandas DataFrame into an HTML table.

```python
import union
from flytekitplugins.deck.renderer import TableRenderer

@union.task(enable_deck=True, container_image=custom_image)
def table_renderer() -> None:
    union.Deck(
        "Table Renderer",
        TableRenderer().to_html(df=pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]}), table_width=50),
    )
```

![Table renderer](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/decks/flyte-decks-table-renderer.png)

### Custom renderers

You can also create your own custom renderer.
A renderer is essentially a class with a `to_html` method.
Here we create a custom renderer that summarizes the data from a Pandas `DataFrame` instead of showing raw values.

```python
class DataFrameSummaryRenderer:

    def to_html(self, df: pd.DataFrame) -> str:
        assert isinstance(df, pd.DataFrame)
        return df.describe().to_html()
```

Then we can use the Annotated type to override the default renderer of the `pandas.DataFrame` type:

```python
from typing import Annotated, Optional

import union
import pandas as pd
import plotly.express as px
from flytekit.deck import MarkdownRenderer
from flytekitplugins.deck.renderer import BoxRenderer

@union.task(enable_deck=True, container_image=custom_image)
def iris_data(
    sample_frac: Optional[float] = None,
    random_state: Optional[int] = None,
) -> Annotated[pd.DataFrame, DataFrameSummaryRenderer()]:
    data = px.data.iris()
    if sample_frac is not None:
        data = data.sample(frac=sample_frac, random_state=random_state)

    md_text = (
        "# Iris Dataset\n"
        "This task loads the iris dataset using the `plotly` package."
    )
    union.current_context().default_deck.append(MarkdownRenderer().to_html(md_text))
    union.Deck("box plot", BoxRenderer("sepal_length").to_html(data))
    return data
```
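The renderer protocol itself is just "any object with a `to_html` method"; it does not require pandas. A minimal stdlib-only renderer might look like this (the class name is hypothetical):

```python
class DictTableRenderer:
    """Hypothetical renderer: turns a flat dict into a two-column HTML table."""

    def to_html(self, data):
        rows = "".join(
            f"<tr><td>{key}</td><td>{value}</td></tr>" for key, value in data.items()
        )
        return f"<table>{rows}</table>"
```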

## Streaming Decks

You can stream a Deck directly using `Deck.publish()`:

```python
import union

@union.task(enable_deck=True)
def t_deck():
    union.Deck.publish()
```

This creates a live deck: you can click the refresh button and see the deck update until the task succeeds.

### Union Deck Succeed Video

📺 [Watch on YouTube](https://www.youtube.com/watch?v=LJaBP0mdFeE)

### Union Deck Fail Video

📺 [Watch on YouTube](https://www.youtube.com/watch?v=xaBF6Jlzjq0)

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/remote-management ===

# UnionRemote

The `UnionRemote` Python API supports functionality similar to that of the Union CLI, enabling you to manage Union.ai workflows, tasks, launch plans and artifacts from within your Python code.

> [!NOTE]
> The primary use case of `UnionRemote` is to automate the deployment of Union.ai entities. As such, it is intended for use within scripts *external* to actual Union.ai workflow and task code, for example CI/CD pipeline scripts.
>
> In other words: _Do not use `UnionRemote` within task code._

## Creating a `UnionRemote` object

Ensure that you have the Union SDK installed, import the `UnionRemote` class and create the object like this:

```python
import union

remote = union.UnionRemote()
```

By default, when created with a no-argument constructor, `UnionRemote` will use the prevailing configuration in the local environment to connect to Union.ai,
that is, the same configuration as would be used by the Union CLI in that environment
(see [Union CLI configuration search path](https://www.union.ai/docs/v1/union/api-reference/union-cli/page.md)).

In the default case, as with the Union CLI, all operations are applied to the default project, `flytesnacks`, and the default domain, `development`.

Alternatively, you can initialize `UnionRemote` by explicitly specifying a `flytekit.configuration.Config` object with connection information for a Union.ai instance, a project, and a domain. Additionally, the constructor supports specifying a file upload location (equivalent to a default raw data prefix):

```python
import union
from flytekit.configuration import Config

remote = union.UnionRemote(
    config=Config.for_endpoint(endpoint="union.example.com"),
    default_project="my-project",
    default_domain="my-domain",
    data_upload_location="<s3|gs|abs>://my-bucket/my-prefix",
)
```

Here we use the `Config.for_endpoint` method to specify the URL to connect to.
There are other ways to configure the `Config` object.
In general, you have all the same options as you would when specifying a connection for the Union CLI using a `config.yaml` file.
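
For example, a `config.yaml` equivalent to the `Config.for_endpoint` call above might look like the following sketch (substitute your own endpoint):

```yaml
admin:
  # Your Union.ai instance, prefixed with the dns:/// scheme
  endpoint: dns:///union.example.com
  insecure: false
```

You can load such a file explicitly with `Config.auto(config_file="/path/to/config.yaml")`.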

### Authenticating using a client secret

In some cases, you may be running a script with `UnionRemote` in a CI/CD pipeline or via SSH, where you don't have access to a browser for the default authentication flow. In such scenarios, you can use the ClientSecret authentication method (see **Development cycle > Authentication**) to establish a connection to Union.ai. After creating an API key (see **Development cycle > Managing API keys**), you can initialize `UnionRemote` as follows:

```python
import union
from flytekit.configuration import Config, PlatformConfig

remote = union.UnionRemote(
    config=Config(
        platform=PlatformConfig(
            endpoint="union.example.com",
            insecure=False,
            client_id="<your-client-id>",  # this is the api-key name
            client_credentials_secret="<your-client-secret>",  # this is the api-key
            auth_mode="client_credentials",
        )
    ),
)
```
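
In CI/CD you will typically read the API key from the environment rather than hardcoding it in the script. A minimal sketch (the `UNION_CLIENT_ID` and `UNION_CLIENT_SECRET` variable names are our own choice, not a Union.ai convention):

```python
import os

# Illustrative environment variable names -- set these in your CI/CD secret store.
client_id = os.environ.get("UNION_CLIENT_ID", "")          # the api-key name
client_secret = os.environ.get("UNION_CLIENT_SECRET", "")  # the api-key value
```

You would then pass these values as `client_id` and `client_credentials_secret` to `PlatformConfig` as shown above.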

For details, see [the API docs for `flytekit.configuration.Config`](https://www.union.ai/docs/v1/union/api-reference/flytekit-sdk/packages/flytekit.configuration/page.md).

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/remote-management/remote-examples ===

# UnionRemote examples

## Registering and running a workflow

In the following example we register and run a workflow and retrieve its output:

```shell
├── remote.py
└── workflow
    ├── __init__.py
    └── example.py
```

The workflow code that will be registered and run on Union.ai resides in the `workflow` directory and consists of an empty `__init__.py` file and the workflow and task code in `example.py`:

```python
import os
import union

@union.task()
def create_file(message: str) -> union.FlyteFile:
    with open("data.txt", "w") as f:
        f.write(message)
    return union.FlyteFile(path="data.txt")

@union.workflow
def my_workflow(message: str) -> union.FlyteFile:
    f = create_file(message)
    return f
```

The file `remote.py` contains the `UnionRemote` logic. It is not part of the workflow code, and is meant to be run on your local machine.

```python
import union
from workflow.example import my_workflow

def run_workflow():
    remote = union.UnionRemote()
    remote.fast_register_workflow(entity=my_workflow)
    execution = remote.execute(
        entity=my_workflow,
        inputs={"message": "Hello, world!"},
        wait=True)
    output = execution.outputs["o0"]
    print(output)
    with open(output, "r") as f:
        read_lines = f.readlines()
    print(read_lines)
```

The `my_workflow` workflow and the `create_file` task are registered and run.
Once the workflow completes, the output is passed back to the `run_workflow` function and printed out.

The output is also available in the UI, in the **Outputs** tab of the `create_file` task details view:

![Outputs](https://www.union.ai/docs/v1/union/_static/images/user-guide/development-cycle/union-remote/outputs.png)

The steps above demonstrate the simplest way of registering and running a workflow with `UnionRemote`.
For more options and details see [Reference > UnionRemote](https://www.union.ai/docs/v1/union/api-reference/union-sdk/packages/union.remote).

## Fetching outputs

By default, `UnionRemote.execute` is non-blocking, but you can also pass in `wait=True` to make it synchronously wait for the task or workflow to complete, as we did above.

You can print out the Union.ai console URL corresponding to your execution with:

```python
print(f"Execution url: {remote.generate_console_url(execution)}")
```

And you can synchronize the state of the execution object with the remote state with the `sync()` method:

```python
synced_execution = remote.sync(execution)
print(synced_execution.inputs)  # print out the inputs
```

You can also wait for the execution after you’ve launched it and access the outputs:

```python
completed_execution = remote.wait(execution)
print(completed_execution.outputs)  # print out the outputs
```

## Terminating all running executions for a workflow

This example shows how to terminate all running executions for a given workflow name.

```python
import union
from dataclasses import dataclass
import json
from flytekit.configuration import Config
from flytekit.models.core.execution import WorkflowExecutionPhase

@dataclass
class Execution:
    name: str
    link: str

SOME_LARGE_LIMIT = 5000
PHASE = WorkflowExecutionPhase.RUNNING
WF_NAME = "your_workflow_name"
EXECUTIONS_TO_IGNORE = ["some_execution_name_to_ignore"]
PROJECT = "your_project"
DOMAIN = "production"
ENDPOINT = "union.example.com"

remote = union.UnionRemote(
    config=Config.for_endpoint(endpoint=ENDPOINT),
    default_project=PROJECT,
    default_domain=DOMAIN,
)

executions_of_interest = []

executions = remote.recent_executions(limit=SOME_LARGE_LIMIT)

for e in executions:
    if e.closure.phase == PHASE:
        if e.spec.launch_plan.name == WF_NAME:
            if e.id.name not in EXECUTIONS_TO_IGNORE:
                execution_of_interest = Execution(name=e.id.name, link=f"https://{ENDPOINT}/console/projects/{PROJECT}/domains/{DOMAIN}/executions/{e.id.name}")
                executions_of_interest.append(execution_of_interest)
                remote.terminate(e, cause="Terminated manually via script.")

with open('terminated_executions.json', 'w') as f:
    json.dump([{'name': e.name, 'link': e.link} for e in executions_of_interest], f, indent=2)

print(f"Terminated {len(executions_of_interest)} executions.")
```

## Rerunning all failed executions of a workflow

This example shows how to identify all failed executions from a given workflow since a certain time, and re-run them with the same inputs and a pinned workflow version.

```python
import datetime
import pytz
import union
from flytekit.configuration import Config
from flytekit.models.core.execution import WorkflowExecutionPhase

SOME_LARGE_LIMIT = 5000
WF_NAME = "your_workflow_name"
PROJECT = "your_project"
DOMAIN = "production"
ENDPOINT = "union.example.com"
VERSION = "your_target_workflow_version"

remote = union.UnionRemote(
    config=Config.for_endpoint(endpoint=ENDPOINT),
    default_project=PROJECT,
    default_domain=DOMAIN,
)

executions = remote.recent_executions(limit=SOME_LARGE_LIMIT)

failures = [
    WorkflowExecutionPhase.FAILED,
    WorkflowExecutionPhase.ABORTED,
    WorkflowExecutionPhase.FAILING,
]

# time of the last successful execution
date = datetime.datetime(2024, 10, 30, tzinfo=pytz.UTC)

# filter executions by name
filtered = [execution for execution in executions if execution.spec.launch_plan.name == WF_NAME]

# filter executions by phase
failed = [execution for execution in filtered if execution.closure.phase in failures]

# filter executions by time
windowed = [execution for execution in failed if execution.closure.started_at > date]

# get inputs for each execution
inputs = [remote.sync(execution).inputs for execution in windowed]

# get new workflow version entity
workflow = remote.fetch_workflow(name=WF_NAME, version=VERSION)

# execute new workflow for each failed previous execution
for execution_inputs in inputs:
    remote.execute(workflow, inputs=execution_inputs)
```

## Filtering for executions using a `Filter`

This example shows how to use a `Filter` to only query for the executions you want.

```python
from flytekit.models import filters
import union

WF_NAME = "your_workflow_name"
LP_NAME = "your_launchplan_name"
PROJECT = "your_project"
DOMAIN = "production"
ENDPOINT = "union.example.com"

remote = union.UnionRemote.for_endpoint(ENDPOINT)

# Only query executions of the given workflow
workflow_filter = filters.Filter.from_python_std(f"eq(workflow.name,{WF_NAME})")
workflow_executions = remote.recent_executions(project=PROJECT, domain=DOMAIN, filters=[workflow_filter])

# Query for the latest execution that succeeded and was between 8 and 16 minutes
latest_success = remote.recent_executions(
    limit=1,
    filters=[
        filters.Equal("launch_plan.name", LP_NAME),
        filters.Equal("phase", "SUCCEEDED"),
        filters.GreaterThan("duration", 8 * 60),
        filters.LessThan("duration", 16 * 60),
    ],
)
```

## Launch task via UnionRemote with a new version

```python
import union
from flytekit.configuration import Config, SerializationSettings

# UnionRemote object is the main entrypoint to API
remote = union.UnionRemote(
    config=Config.for_endpoint(endpoint="flyte.example.net"),
    default_project="flytesnacks",
    default_domain="development",
)

# Get Task
task = remote.fetch_task(name="workflows.example.generate_normal_df", version="v1")

# Register the fetched task under a new version
task = remote.register_task(
    entity=task,
    serialization_settings=SerializationSettings(image_config=None),
    version="v2",
)

# Run Task
execution = remote.execute(
    task, inputs={"n": 200, "mean": 0.0, "sigma": 1.0}, execution_name="task-execution", wait=True
)

# Or use execution_name_prefix to avoid repeated execution names
execution = remote.execute(
    task, inputs={"n": 200, "mean": 0.0, "sigma": 1.0}, execution_name_prefix="flyte", wait=True
)

# Inspecting execution
# The 'inputs' and 'outputs' correspond to the task execution.
input_keys = execution.inputs.keys()
output_keys = execution.outputs.keys()
```

## Launch workflow via UnionRemote

Workflows can be executed with `UnionRemote` because under the hood it fetches and triggers a default launch plan.

```python
import union
from flytekit.configuration import Config

# UnionRemote object is the main entrypoint to API
remote = union.UnionRemote(
    config=Config.for_endpoint(endpoint="flyte.example.net"),
    default_project="flytesnacks",
    default_domain="development",
)

# Fetch workflow
workflow = remote.fetch_workflow(name="workflows.example.wf", version="v1")

# Execute
execution = remote.execute(
    workflow, inputs={"mean": 1}, execution_name="workflow-execution", wait=True
)

# Or use execution_name_prefix to avoid repeated execution names
execution = remote.execute(
    workflow, inputs={"mean": 1}, execution_name_prefix="flyte", wait=True
)
```

## Launch launchplan via UnionRemote

A launch plan can be launched via UnionRemote programmatically.

```python
import union
from flytekit.configuration import Config

# UnionRemote object is the main entrypoint to API
remote = union.UnionRemote(
    config=Config.for_endpoint(endpoint="flyte.example.net"),
    default_project="flytesnacks",
    default_domain="development",
)

# Fetch launch plan
lp = remote.fetch_launch_plan(
    name="workflows.example.wf", version="v1", project="flytesnacks", domain="development"
)

# Execute
execution = remote.execute(
    lp, inputs={"mean": 1}, execution_name="lp-execution", wait=True
)

# Or use execution_name_prefix to avoid repeated execution names
execution = remote.execute(
    lp, inputs={"mean": 1}, execution_name_prefix="flyte", wait=True
)
```

## Inspecting executions

With `UnionRemote`, you can fetch the inputs and outputs of executions and inspect them.

```python
import union
from flytekit.configuration import Config

# UnionRemote object is the main entrypoint to API
remote = union.UnionRemote(
    config=Config.for_endpoint(endpoint="flyte.example.net"),
    default_project="flytesnacks",
    default_domain="development",
)

execution = remote.fetch_execution(
    name="fb22e306a0d91e1c6000", project="flytesnacks", domain="development"
)

input_keys = execution.inputs.keys()
output_keys = execution.outputs.keys()

# The inputs and outputs correspond to the top-level execution or the workflow itself.
# To fetch a specific output, say, a model file:
model_file = execution.outputs["model_file"]
with open(model_file) as f:
    ...

# You can use UnionRemote.sync() to sync the entity object's state with the remote state during the execution run.
synced_execution = remote.sync(execution, sync_nodes=True)
node_keys = synced_execution.node_executions.keys()

# node_executions will fetch all the underlying node executions recursively.
# To fetch output of a specific node execution:
node_execution_output = synced_execution.node_executions["n1"].outputs["model_file"]
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/development-cycle/streaming-execution-events ===

# Streaming execution events

Union.ai exposes a streaming API so you can receive workflow, task, and node execution phase events in real time and implement your own alerting, dashboards, or automation. This page describes the interface and shows how to consume events and react to them (for example, sending Slack alerts when a node fails or is queued too long).

## Overview

Using the Union SDK, you connect to the event stream with a `UnionRemote` instance and iterate over execution events as they occur. You choose which event types to subscribe to (workflow, task, or node executions), and you process each event (e.g., check phase, update state, call a webhook). Events are acknowledged after processing so the service can stop retransmitting them.

## Interface: `stream_execution_events`

The `UnionRemote` class provides the **`stream_execution_events`** async generator.

**Delivery and acknowledgment**

- You may receive the same event more than once, or events out of order. Your code should tolerate duplicates and reordering (for example by treating updates as idempotent or by tracking the latest phase per execution).
- The server keeps sending an event until your side has “acknowledged” it. The SDK acknowledges an event automatically when your loop moves on to the next one (when you pull the next item from the generator).
- If you raise an exception while handling an event, that event is never acknowledged. The server will send it again later. Finish handling without raising if you want that event to be considered done; or raise on purpose to force redelivery.
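
One way to tolerate duplicates and reordering is to track the highest phase seen per execution and skip anything older. A minimal sketch (assumes phase values increase as an execution progresses, which holds approximately for the Flyte phase enums):

```python
# Track the latest phase observed for each execution name.
latest_phase: dict[str, int] = {}

def should_process(execution_name: str, phase: int) -> bool:
    """Return True only if this event advances the known phase for the execution."""
    seen = latest_phase.get(execution_name, -1)
    if phase <= seen:
        return False  # duplicate or out-of-order event: safe to skip
    latest_phase[execution_name] = phase
    return True
```

With this guard in your loop, redelivered or stale events become no-ops and your handler stays idempotent.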

**Parameters**

| Parameter | Type | Description |
|-----------|------|-------------|
| `event_count` | `Optional[int]` | Number of events to receive before closing the stream. `None` means unlimited. |
| `include_workflow_executions` | `bool` | If `True`, include workflow execution events. |
| `include_task_executions` | `bool` | If `True`, include task execution events. |
| `include_node_executions` | `bool` | If `True`, include node execution events. |

## Event contents

The yielded values are protobuf messages from `flyteidl.event.cloudevents_pb2`: `CloudEventWorkflowExecution`, `CloudEventNodeExecution`, or `CloudEventTaskExecution`. Each has a similar shape:

**Common fields (all event types)**

| Field | Type | Description |
|-------|------|-------------|
| `phase` | enum | Execution phase (e.g. UNDEFINED, QUEUED, RUNNING, SUCCEEDED, FAILED, ABORTED, TIMED_OUT). Use `WorkflowExecutionPhase.enum_to_string(phase)` in Python for a string. |

**Identity**

Identity (which execution and, for node/task, which node or task) can appear as a top-level `execution_id` or under an `id` submessage, depending on event type. Typical attributes:

| Attribute | Description |
|-----------|-------------|
| `execution_id.name` | Execution name (unique per run). |
| `execution_id.project` | Project. |
| `execution_id.domain` | Domain. |
| `node_id` | Present on node (and often task) events. Either a string node id or an object with a `node_id` field. Identifies the node within the workflow. |
| `task_id` | On task events; identifies the task execution. |

**By event type**

- **CloudEventWorkflowExecution** — One of `id` or `execution_id` carries workflow execution identity (project, domain, name). No `node_id`; scope is the whole workflow.
- **CloudEventNodeExecution** — Carries workflow execution identity plus `node_id` (the node that changed phase). Use this for node-level alerting and “which node failed or is queued.”
- **CloudEventTaskExecution** — Carries workflow execution identity plus task-level identifiers (e.g. `task_id`, node association) for task-level phase changes.

In Python, use `getattr(event, "execution_id", None)` or `getattr(getattr(event, "id", None), "execution_id", None)` to resolve the execution id, and `getattr(event, "node_id", None)` or the same from `event.id` when handling node or task events. The example in this page shows this pattern.

## Phases

Execution phases follow the Flyte model (e.g., `UNDEFINED`, `QUEUED`, `RUNNING`, `SUCCEEDED`, `FAILED`, `ABORTED`, `TIMED_OUT`). In Python you can use `flytekit.models.core.execution.WorkflowExecutionPhase.enum_to_string(phase)` to get a string like `"QUEUED"` or `"FAILED"` from the raw phase field on the event.

## How this differs from LaunchPlan notifications

**LaunchPlan notifications** are a separate feature: you attach a webhook URL to a launch plan (or schedule), and the platform calls that URL when a *workflow* run reaches a **terminal** phase (e.g. `SUCCEEDED`, `FAILED`, `ABORTED`, `TIMED_OUT`). You get one HTTP request per workflow completion, with no visibility into intermediate states or into individual nodes/tasks.

| | LaunchPlan notifications | Streaming execution events |
|--|---------------------------|----------------------------|
| **Trigger** | One webhook call when the workflow finishes (terminal phase only). | Continuous stream; you receive every phase change as it happens. |
| **Phases** | Terminal only (SUCCEEDED, FAILED, etc.). No QUEUED or RUNNING. | All phases (QUEUED, RUNNING, SUCCEEDED, FAILED, etc.). |
| **Granularity** | Workflow-level only. | Workflow, task, or node level (you choose via `include_*` flags). |
| **Who runs** | Platform pushes to your URL; no long-lived process on your side. | You run a process that connects and consumes the stream; you implement alerting logic. |
| **Use case** | “Notify me when this workflow run is done.” | “Notify me when a node fails,” “alert if queued too long,” dashboards, custom automation. |

In summary, use **LaunchPlan notifications** for simple notifications on terminal state. Use the **streaming API** when you need visibility into intermediate phases (e.g. QUEUED) or custom alerting logic that runs outside your workflow code.

## Example: Node-level alerts (Slack)

The following example streams **node execution** events and:

1. Logs every node phase change.
2. Sends a Slack alert when a node enters **FAILED**.
3. Tracks how long nodes stay **QUEUED** and alerts when they exceed a threshold (e.g., 5 minutes).

It uses environment variables for the Union endpoint, Slack webhook, and queue threshold. The stream runs in a loop with reconnection on error.

> This pattern works best with `union >= v0.1.202`

```python
"""
Stream Flyte node execution events and alert to Slack when:
 - A node enters FAILED status
 - A node has been QUEUED longer than QUEUED_THRESHOLD_SEC (default 300s)

Usage:
  python get_node_events.py

  export SLACK_WEBHOOK_URL="https://hooks.slack.com/services/..."
  export QUEUED_THRESHOLD_SEC=600  # Alert if queued > 10 min
  python get_node_events.py
"""

import asyncio
import json
import os
import sys
import time
import urllib.request

from union.remote import UnionRemote
from flytekit.configuration import Config
from flytekit.models.core.execution import WorkflowExecutionPhase

# Slack webhook (override via SLACK_WEBHOOK_URL or ALERT_WEBHOOK_URL env)
SLACK_WEBHOOK_URL = ""
QUEUED_THRESHOLD_SEC = int(os.getenv("QUEUED_THRESHOLD_SEC", "300"))
QUEUED_CHECK_INTERVAL_SEC = 60

# (execution_name, node_id) -> (first_seen_timestamp, project, domain)
queued_since = {}

def _post_webhook_sync(url: str, payload: dict) -> None:
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(
        url, data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        resp.read()

def _get_webhook_url():
    return os.getenv("SLACK_WEBHOOK_URL") or os.getenv("ALERT_WEBHOOK_URL") or SLACK_WEBHOOK_URL

async def alert_on_failed(*, event_line: str, execution_name: str, node_id: str | None = None):
    if "FAILED" not in event_line:
        return
    node_info = f" Node: `{node_id}`" if node_id else ""
    print(f"[ALERT] Node FAILED: execution={execution_name}{node_info} - {event_line}", file=sys.stderr)
    webhook_url = _get_webhook_url()
    if not webhook_url:
        return
    payload = {
        "text": f"Flyte node execution *FAILED*{node_info}\nExecution: `{execution_name}`\n{event_line}",
    }
    await asyncio.to_thread(_post_webhook_sync, webhook_url, payload)

async def alert_on_queued_threshold(*, execution_name: str, node_id: str | None, project: str, domain: str, queued_sec: float):
    webhook_url = _get_webhook_url()
    if not webhook_url:
        return
    node_info = f" Node: `{node_id}`" if node_id else ""
    project_domain = f"{project}/{domain}" if project or domain else "-"
    payload = {
        "text": (
            f"Flyte node *queued too long*{node_info}\n"
            f"Execution: `{execution_name}`\nProject/Domain: `{project_domain}`\n"
            f"Queued for: {queued_sec:.0f}s (threshold: {QUEUED_THRESHOLD_SEC}s)"
        ),
    }
    await asyncio.to_thread(_post_webhook_sync, webhook_url, payload)

async def _check_queued_threshold():
    while True:
        await asyncio.sleep(QUEUED_CHECK_INTERVAL_SEC)
        now = time.time()
        to_alert = []
        for key, (first_seen, project, domain) in list(queued_since.items()):
            elapsed = now - first_seen
            if elapsed >= QUEUED_THRESHOLD_SEC:
                to_alert.append((key, project, domain, elapsed))
                del queued_since[key]
        for (execution_name, node_id), project, domain, queued_sec in to_alert:
            print(
                f"[ALERT] Node queued {queued_sec:.0f}s (threshold {QUEUED_THRESHOLD_SEC}s): "
                f"{project}/{domain} execution={execution_name} node={node_id}",
                file=sys.stderr,
            )
            await alert_on_queued_threshold(
                execution_name=execution_name, node_id=node_id,
                project=project, domain=domain, queued_sec=queued_sec,
            )

async def _process_event(raw_event):
    """Process a single streamed event (node execution)."""
    event_id = getattr(raw_event, "id", None)
    exec_id = getattr(event_id, "execution_id", None) if event_id else None
    if exec_id is None:
        exec_id = getattr(raw_event, "execution_id", None)

    execution_name = getattr(exec_id, "name", None) if exec_id else "unknown"
    project = getattr(exec_id, "project", "") if exec_id else ""
    domain = getattr(exec_id, "domain", "") if exec_id else ""

    phase_str = WorkflowExecutionPhase.enum_to_string(raw_event.phase)

    node_id_raw = getattr(event_id, "node_id", None) if event_id else getattr(raw_event, "node_id", None)
    node_id_str = None
    if node_id_raw is not None:
        node_id_str = (
            node_id_raw.node_id if hasattr(node_id_raw, "node_id") else
            node_id_raw if isinstance(node_id_raw, str) else str(node_id_raw)
        )

    key = (execution_name, node_id_str)

    if phase_str == "QUEUED":
        if key not in queued_since:
            queued_since[key] = (time.time(), project, domain)
    else:
        queued_since.pop(key, None)

    event_line = f"Node execution {execution_name} in {project}-{domain} is {phase_str}"
    if node_id_str:
        event_line = f"{event_line} (node={node_id_str})"
    print(event_line)
    await alert_on_failed(event_line=event_line, execution_name=execution_name, node_id=node_id_str)

async def main():
    union_remote = UnionRemote(
        config=Config.for_endpoint(endpoint="YOUR_UNION_ENDPOINT"),
        default_project="flytesnacks",
        default_domain="development",
    )

    asyncio.create_task(_check_queued_threshold())

    while True:
        try:
            async for response in union_remote.stream_execution_events(
                event_count=None,
                include_workflow_executions=False,
                include_task_executions=False,
                include_node_executions=True,
            ):
                # response is CloudEventNodeExecution (or workflow/task depending on filters)
                await _process_event(response)
        except Exception as exc:
            # Reconnect on stream errors rather than crashing the monitor.
            print(f"Stream error: {exc}; reconnecting in 5s", file=sys.stderr)
            await asyncio.sleep(5)

if __name__ == "__main__":
    asyncio.run(main())
```

### What the example does

- **Stream subscription**: Calls `stream_execution_events(..., include_node_executions=True)` so only node execution events are received.
- **Phase handling**: Uses `WorkflowExecutionPhase.enum_to_string(raw_event.phase)` to get phases like `QUEUED`, `RUNNING`, `FAILED`.
- **Identity**: Reads execution name, project, domain, and node id from the event (or its `id` submessage) to log and alert with context.
- **FAILED alerts**: When the phase is FAILED, it prints to stderr and POSTs a Slack-compatible payload to `SLACK_WEBHOOK_URL` or `ALERT_WEBHOOK_URL`.
- **Queued-too-long alerts**: Maintains a map of `(execution_name, node_id)` to the time they first entered QUEUED. A background task periodically checks this map and alerts (and removes the entry) when queued time exceeds `QUEUED_THRESHOLD_SEC`.

You can adapt the same pattern to workflow or task events by setting `include_workflow_executions` or `include_task_executions`, and by adjusting how you read identity and phase from the corresponding CloudEvent type.

## Summary

| Capability | Description |
|------------|-------------|
| **Stream** | `union_remote.stream_execution_events(...)` — async generator over workflow/task/node events. |
| **Filter** | `include_workflow_executions`, `include_task_executions`, `include_node_executions` control which event types you receive. |
| **Phases** | Use `WorkflowExecutionPhase.enum_to_string(phase)` for human-readable phase strings. |
| **Delivery** | At-least-once; handle duplicates and out-of-order events. Events are acknowledged when you consume the next item; raise to avoid ack and get redelivery. |
| **Alerting** | Implement your own logic (e.g., webhooks, Slack) in the async loop that processes each event. |

Using this interface, you can build custom alerting, dashboards, and automation on top of Union.ai execution events.

