# App Serving
> This bundle contains all pages in the App Serving section.
> Source: https://www.union.ai/docs/v1/union/user-guide/core-concepts/serving/

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/serving ===

# App Serving

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](https://www.union.ai/docs/v1/union/user-guide/core-concepts/section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

Union.ai lets you build and serve your own web apps, enabling you to create:

- **Model endpoints** with generic web frameworks like FastAPI or optimized inference frameworks like vLLM and SGLang.
- **AI inference-time** components like MCP servers, ephemeral agent memory state stores, etc.
- **Interactive dashboards** and other interfaces to interact with and visualize data and models from your workflows using frameworks like Streamlit, Gradio, Tensorboard, FastHTML, Dash, Panel, Voila, FiftyOne.
- **Flyte Connectors**, which are [light-weight, long running services](https://www.union.ai/docs/v1/union/integrations/connectors/_index) that connect to external
services like OpenAI, BigQuery, and Snowflake.
- **Any other web services** like [web hooks](https://www.union.ai/docs/v1/union/tutorials/serving/custom-webhooks/page.md) that can be implemented via web frameworks like FastAPI and Starlette.

## Example app

We will start with a simple Streamlit app. In this case we will use the default
Streamlit "Hello, World!" app.

In a local directory, create the following file:

```shell
└── app.py
```

## App declaration

The file `app.py` contains the app declaration:

```python
"""A simple Union.ai app using Streamlit"""

import union
import os

# The `ImageSpec` for the container that will run the `App`.
# `union-runtime` must be declared as a dependency,
# in addition to any other dependencies needed by the app code.
# Use the Union.ai remote image builder to build the app container image
image = union.ImageSpec(
    name="streamlit-app",
    packages=["union-runtime>=0.1.18", "streamlit==1.51.0"],
    builder="union"
)

# The `App` declaration.
# Uses the `ImageSpec` declared above.
# In this case we do not need to supply any app code
# as we are using the built-in Streamlit `hello` app.
app = union.app.App(
    name="streamlit-hello",
    container_image=image,
    args="streamlit hello --server.port 8080",
    port=8080,
    limits=union.Resources(cpu="1", mem="1Gi"),
)
```

Here the `App` constructor is initialized with the following parameters:

* `name`: The name of the app. This name will be displayed in app listings (via CLI and UI) and used to refer to the app when deploying and stopping.
* `container_image`: The container image used for the container that runs the app. Here we use an image defined with `ImageSpec` and built by the Union.ai remote image builder, with Streamlit installed.
* `args`: The command used within the container to start the app. It can be given as a single string (as here) or as a list of strings, which is concatenated and invoked as a single command.
* `port`: The port of the app container from which the app will be served.
* `limits`: A `union.Resources` object defining the resource limits for the app container.
  The same object is used for the same purpose in the `@union.task` decorator in Union.ai workflows.
  See [The requests and limits settings](https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment/customizing-task-resources/page.md) for details.

The parameters above are the minimum needed to initialize the app.

There are a few additional parameters available that we do not use in this example (but will cover later):

* `include`: A list of files to be added to the container at deployment time, containing the custom code that defines the specific functionality of your app.
* `inputs`: A `List` of `union.app.Input` objects. Used to provide default inputs to the app on startup.
* `requests`: A `union.Resources` object defining the resource requests for the app container. The same object is used for the same purpose in the `@union.task` decorator in Union.ai workflows (see [The requests and limits settings](https://www.union.ai/docs/v1/union/user-guide/core-concepts/tasks/task-hardware-environment/customizing-task-resources/page.md) for details).
* `min_replicas`: The minimum number of replica containers permitted for this app.
  This defines the lower bound for auto-scaling the app. The default is 0 <!-- TODO: (see [App autoscaling]() for details) -->.
* `max_replicas`: The maximum number of replica containers permitted for this app.
  This defines the upper bound for auto-scaling the app. The default is 1 <!-- TODO: (see [App autoscaling]() for details) -->.

## Deploy the app

Deploy the app with:

```shell
$ union deploy apps APP_FILE APP_NAME
```

* `APP_FILE` is the Python file that contains one or more app declarations.
* `APP_NAME` is the name of (one of) the declared apps in APP_FILE. The name of an app is the value of the `name` parameter passed into the `App` constructor.

If an app with the name `APP_NAME` does not yet exist on the system then this command creates that app and starts it.
If an app by that name already exists then this command stops the app, updates its code and restarts it.

In this case, we do the following:

```shell
$ union deploy apps app.py streamlit-hello
```

This will return output like the following:

```shell
✨ Creating Application: streamlit-hello
Created Endpoint at: https://withered--firefly--8ca31.apps.demo.hosted.unionai.cloud/
```

Click on the displayed endpoint to go to the app:

![A simple app](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/serving/streamlit-hello.png)

## Viewing deployed apps

Go to **Apps** in the left sidebar in Union.ai to see a list of all your deployed apps:

![Apps list](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/serving/apps-list.png)

To connect to an app click on its **Endpoint**.
To see more information about the app, click on its **Name**.
This will take you to the **App view**:

![App view](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/serving/app-view.png)

Buttons to **Copy Endpoint** and **Start app** are available at the top of the view.

You can also view all apps deployed in your Union.ai instance from the command-line with:

```shell
$ union get apps
```

This will display the app list:

```shell
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━┳━━━━━━━━┓
┃ Name                                    ┃ Link       ┃ Status     ┃ Desired State ┃ CPU ┃ Memory ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━╇━━━━━━━━┩
│ streamlit-query-2                       │ Click Here │ Started    │ Stopped       │ 2   │ 2Gi    │
│ streamlit-demo-1                        │ Click Here │ Started    │ Started       │ 3   │ 2Gi    │
│ streamlit-query-3                       │ Click Here │ Started    │ Started       │ 2   │ 2Gi    │
│ streamlit-demo                          │ Click Here │ Unassigned │ Started       │ 2   │ 2Gi    │
└─────────────────────────────────────────┴────────────┴────────────┴───────────────┴─────┴────────┘
```

## Stopping apps

To stop an app from the command-line, run the following command:

```shell
$ union stop apps --name APP_NAME
```

`APP_NAME` is the name of an app deployed on the Union.ai instance.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/serving/adding-your-own-code ===

# Serving custom code

In the introductory section we saw how to define and deploy a simple Streamlit app.
The app deployed was the default hello world Streamlit example app.
In this section, we will expand on this by adding our own custom code to the app.

## Example app

We will initialize the app in `app.py` as before, but now we will add two files containing our own code, `main.py` and `utils.py`.

In a local directory, create the following files:

```shell
├── app.py
├── main.py
└── utils.py
```

## App declaration

The file `app.py` contains the app declaration:

```python
"""A Union.ai app with custom code"""

import os
import union

# The `ImageSpec` for the container that will run the `App`.
# `union-runtime` must be declared as a dependency,
# in addition to any other dependencies needed by the app code.
# The image is built remotely by the Union.ai image builder (`builder="union"`).
image = union.ImageSpec(
    name="streamlit-app",
    packages=["streamlit==1.51.0", "union-runtime>=0.1.18", "pandas==2.2.3", "numpy==2.2.3"],
    builder="union"
)

# The `App` declaration.
# Uses the `ImageSpec` declared above.
# Your core logic of the app resides in the files declared
# in the `include` parameter, in this case, `main.py` and `utils.py`.
app = union.app.App(
    name="streamlit-custom-code",
    container_image=image,
    args="streamlit run main.py --server.port 8080",
    port=8080,
    include=["main.py", "utils.py"],
    limits=union.Resources(cpu="1", mem="1Gi"),
)
```

Compared to the first example we have added one more parameter:

* `include`: A list of files to be added to the container at deployment time, containing the custom code that defines the specific functionality of your app.

## Custom code

In this example we include two files containing custom logic: `main.py` and `utils.py`.

The file `main.py` contains the bulk of our custom code:

```python
"""Streamlit App that plots data"""
import streamlit as st
from utils import generate_data

all_columns = ["Apples", "Orange", "Pineapple"]
with st.container(border=True):
    columns = st.multiselect("Columns", all_columns, default=all_columns)

all_data = st.cache_data(generate_data)(columns=all_columns, seed=101)

data = all_data[columns]

tab1, tab2 = st.tabs(["Chart", "Dataframe"])
tab1.line_chart(data, height=250)
tab2.dataframe(data, height=250, use_container_width=True)
```

The file `utils.py` contains a supporting data-generating function that is imported into `main.py` above:

```python
"""Function to generate sample data."""
import numpy as np
import pandas as pd

def generate_data(columns: list[str], seed: int = 42):
    rng = np.random.default_rng(seed)
    data = pd.DataFrame(rng.random(size=(20, len(columns))), columns=columns)
    return data
```
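Because `generate_data` seeds NumPy's `default_rng`, the same seed always produces the same frame, which pairs well with `st.cache_data` in `main.py`. A quick local check (assuming `numpy` and `pandas` are installed):

```python
import numpy as np
import pandas as pd

def generate_data(columns: list[str], seed: int = 42):
    rng = np.random.default_rng(seed)
    return pd.DataFrame(rng.random(size=(20, len(columns))), columns=columns)

# Same seed -> identical frames; useful for cache-friendly, reproducible plots.
a = generate_data(["Apples", "Orange"], seed=101)
b = generate_data(["Apples", "Orange"], seed=101)
assert a.equals(b)
assert a.shape == (20, 2)
```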

## Deploy the app

Deploy the app with:

```shell
$ union deploy apps app.py streamlit-custom-code
```

The output displays the console URL and endpoint for the Streamlit app:

```shell
✨ Deploying Application: streamlit-custom-code
🔎 Console URL:
https://<union-host-url>/org/...
[Status] Pending: OutOfDate: The Configuration is still working to reflect the latest desired
specification.
[Status] Started: Service is ready

🚀 Deployed Endpoint: https://<unique-subhost>.apps.<union-host-url>
```

Navigate to the endpoint to see the Streamlit App!

![Streamlit App](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/serving/custom-code-streamlit.png)

## App deployment with included files

When a new app is deployed for the first time (i.e., there is no app registered with the specified `name`),
a container is spun up using the specified `container_image` and the files specified in `include` are
copied into the container. The `args` command is then executed in the container, starting the app.

If you alter the `include` code you need to re-deploy your app.
When `union deploy apps` is called using an app name that corresponds to an already existing app,
the app code is updated in the container and the app is restarted.

You can iterate on your app easily by changing your `include` code and re-deploying.

Because there is a slight performance penalty involved in copying the `include` files into the container,
you may wish to consolidate your code directly into a custom-built image once you have iterated to production quality.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/serving/serving-a-model ===

# Serving a Model from a Workflow With FastAPI

In this section, we create a Union.ai app to serve a scikit-learn model created by a Union.ai workflow
using `FastAPI`.

## Example app

In this example, we first use a Union.ai workflow to train a model and output it as a Union.ai `Artifact`.
We then use a Union.ai app to serve the model using `FastAPI`.

In a local directory, create the following files:

```shell
├── app.py
├── main.py
└── train_wf.py
```

## App configuration

In the code below, we declare the resources, runtime image, and FastAPI app that
exposes a `/predict` endpoint.

```python
"""A Union.ai app that uses FastAPI to serve model created by a Union.ai workflow."""

import os
from contextlib import asynccontextmanager

import joblib
import union
from fastapi import FastAPI

SklearnModel = union.Artifact(name="sklearn-model")

# The `ImageSpec` for the container that will run the `App`, where `union-runtime`
# must be declared as a dependency, in addition to any other dependencies needed
# by the app code. The image is built remotely by the Union.ai image builder
# (`builder="union"`).
image_spec = union.ImageSpec(
    name="union-serve-sklearn-fastapi",
    packages=["union-runtime>=0.1.18", "scikit-learn==1.5.2", "joblib==1.5.1", "fastapi[standard]"],
    builder="union"
)

ml_models = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    model_file = os.getenv("SKLEARN_MODEL")
    ml_models["model"] = joblib.load(model_file)
    yield

app = FastAPI(lifespan=lifespan)

# The `App` declaration, which uses the `ImageSpec` declared above.
# The FastAPI app object is passed in via the `framework_app` parameter,
# and the model artifact is declared in the `inputs` parameter.
fast_api_app = union.app.App(
    name="simple-fastapi-sklearn",
    inputs=[
        union.app.Input(
            value=SklearnModel.query(),
            download=True,
            env_var="SKLEARN_MODEL",
        )
    ],
    container_image=image_spec,
    framework_app=app,
    limits=union.Resources(cpu="1", mem="1Gi"),
    port=8082,
)

@app.get("/predict")
async def predict(x: float, y: float) -> dict:
    result = ml_models["model"].predict([[x, y]])
    return {"result": float(result[0])}

```

Note that the Artifact is provided as an `Input` to the App definition. With `download=True`,
the model is downloaded to the container's working directory, and the runtime sets the
`SKLEARN_MODEL` environment variable to the model's full local path.

During startup, the FastAPI app loads the model using the `SKLEARN_MODEL` environment
variable. It then serves an endpoint at `/predict` that takes two float inputs and
returns the prediction as JSON.
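Once deployed, the `/predict` route can be exercised with a plain GET request. A minimal standard-library client sketch (the endpoint URL is a placeholder for your deployed app):

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

# Placeholder: substitute the endpoint printed by `union deploy apps`.
ENDPOINT = "https://<unique-subhost>.apps.<union-host-url>"

def predict(x: float, y: float) -> dict:
    # FastAPI reads `x` and `y` from the query string, e.g. "x=1.0&y=2.0".
    url = f"{ENDPOINT}/predict?{urlencode({'x': x, 'y': y})}"
    with urlopen(url) as resp:  # requires the app to be deployed and reachable
        return json.load(resp)
```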

## Training workflow

The training task trains a random forest regression model and saves it as a Union.ai
`Artifact`.

```python
"""A Union.ai workflow that trains a model."""

import os
from pathlib import Path
from typing import Annotated

import joblib
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor

import union

# Declare the `Artifact`.
SklearnModel = union.Artifact(name="sklearn-model")

# The `ImageSpec` for the container that runs the tasks,
# built remotely by the Union.ai image builder (`builder="union"`).
image_spec = union.ImageSpec(
    packages=["scikit-learn==1.5.2", "joblib==1.5.1"],
    builder="union"
)

# The `task` that trains a `RandomForestRegressor` model.
@union.task(
    limits=union.Resources(cpu="2", mem="2Gi"),
    container_image=image_spec,
)
def train_model() -> Annotated[union.FlyteFile, SklearnModel]:
    """Train a RandomForestRegressor model and save it as a file."""
    X, y = make_regression(n_features=2, random_state=42)
    working_dir = Path(union.current_context().working_directory)
    model_file = working_dir / "model.joblib"

    rf = RandomForestRegressor().fit(X, y)
    joblib.dump(rf, model_file)
    return model_file
```

## Run the example

To run this example, you first need to run the training task remotely:

```shell
$ union run --remote train_wf.py train_model
```

This task trains a `RandomForestRegressor`, saves it to a file, and uploads it to
a Union.ai `Artifact`. This artifact is retrieved by the FastAPI app for
serving the model.

![scikit-learn Artifact](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/serving/fastapi-sklearn/sklearn-artifact.png)

Once the training run has completed, you can deploy the app:

```shell
$ union deploy apps app.py simple-fastapi-sklearn
```

The output displays the console URL and endpoint for the FastAPI App:

```shell
✨ Deploying Application: simple-fastapi-sklearn
🔎 Console URL: https://<union-host-url>/org/...
[Status] Pending: OutOfDate: The Configuration is still working to reflect the latest desired
specification.
[Status] Pending: IngressNotConfigured: Ingress has not yet been reconciled.
[Status] Pending: Uninitialized: Waiting for load balancer to be ready
[Status] Started: Service is ready

🚀 Deployed Endpoint: https://<unique-subhost>.apps.<union-host-url>
```

You can see the Swagger docs of the FastAPI endpoint by going to `/docs`:

![scikit-learn FastAPI App](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/serving/fastapi-sklearn/sklearn-fastapi.png)

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/serving/fast-api-auth ===

# API Key Authentication with FastAPI

In this guide, we'll deploy a FastAPI app that uses API key authentication. This
allows you to invoke the endpoint from the public internet in a secure manner.

## Define the FastAPI app

First we define the `ImageSpec` for the runtime image:

```python
import os
from union import ImageSpec, Resources, Secret
from union.app import App

image_spec = ImageSpec(
    name="fastapi-with-auth-image",
    builder="union",
    packages=["union-runtime>=0.1.18", "fastapi[standard]==0.115.11", "union>=0.1.150"],
)
```

Then we define a simple FastAPI app that uses `HTTPAuthorizationCredentials` to
authenticate requests.

```python
import os
from typing import Annotated

import union
from fastapi import FastAPI, HTTPException, Security, status, Depends
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from union import UnionRemote

app = FastAPI()
fast_api_app = union.app.App(
    name="fastapi-with-auth",
    secrets=[
        union.Secret(key="AUTH_API_KEY", env_var="AUTH_API_KEY"),
        union.Secret(key="MY_UNION_API_KEY", env_var="UNION_API_KEY"),
    ],
    container_image=image_spec,
    framework_app=app,
    limits=union.Resources(cpu="1", mem="1Gi"),
    port=8082,
    requires_auth=False,
)

async def verify_token(
    credentials: HTTPAuthorizationCredentials = Security(HTTPBearer()),
) -> HTTPAuthorizationCredentials:
    auth_api_key = os.getenv("AUTH_API_KEY")
    if credentials.credentials != auth_api_key:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate credentials",
        )
    return credentials

@app.get("/")
def root(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(verify_token)],
):
    return {"message": "Hello, World!"}
```

As you can see, we define a `FastAPI` app and provide it as an input in the
`union.app.App` definition. Then, we define a `verify_token` function that
verifies the API key. Finally, we define a root endpoint that uses the
`verify_token` function to authenticate requests.
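One hardening worth noting (an addition, not part of the example above): comparing secrets with `!=` can leak timing information. Python's `hmac.compare_digest` performs a constant-time comparison and is a drop-in replacement for the check in `verify_token`:

```python
import hmac
import os

def token_is_valid(presented: str) -> bool:
    expected = os.getenv("AUTH_API_KEY", "")
    # compare_digest runs in time independent of where the strings first differ
    return hmac.compare_digest(presented.encode(), expected.encode())

os.environ["AUTH_API_KEY"] = "s3cret"  # stand-in for the Union-injected secret
assert token_is_valid("s3cret")
assert not token_is_valid("wrong")
```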

Note that we are also requesting two secrets:
- The `AUTH_API_KEY` is used by the FastAPI app to authenticate the webhook.
- The `MY_UNION_API_KEY` is used to authenticate UnionRemote with Union.

With `requires_auth=False`, the endpoint is reachable without going through Union's authentication, which is acceptable here since we enforce our own `AUTH_API_KEY` check. Before
we can deploy the app, we create the secrets required by the application:

```bash
union create secret --name AUTH_API_KEY
```

Next, to create the `MY_UNION_API_KEY` secret, we first need to create an admin API key:

```bash
union create admin-api-key --name MY_UNION_API_KEY
```

## Deploy the FastAPI app

Finally, you can now deploy the FastAPI app:

```bash
union deploy apps app.py fastapi-with-auth
```

Deploying the application will stream the status to the console:

```
Image ghcr.io/.../webhook-serving:KXwIrIyoU_Decb0wgPy23A found. Skip building.
✨ Deploying Application: fastapi-with-auth
🔎 Console URL: https://<union-tenant>/console/projects/thomasjpfan/domains/development/apps/fastapi-with-auth
[Status] Pending: App is pending deployment
[Status] Pending: RevisionMissing: Configuration "fastapi-with-auth" is waiting for a Revision to become ready.
[Status] Pending: IngressNotConfigured: Ingress has not yet been reconciled.
[Status] Pending: Uninitialized: Waiting for load balancer to be ready
[Status] Started: Service is ready
🚀 Deployed Endpoint: https://rough-meadow-97cf5.apps.<union-tenant>
```

Then, to invoke the endpoint, pass the value of your `AUTH_API_KEY` secret as the bearer token:

```bash
curl -X GET "https://rough-meadow-97cf5.apps.<union-tenant>/" \
-H "Authorization: Bearer <AUTH_API_KEY>"
```

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/serving/cache-huggingface-model ===

# Cache a HuggingFace Model as an Artifact

This guide shows you how to cache HuggingFace models as Union Artifacts.

The [`union cache model-from-hf`](https://www.union.ai/docs/v1/union/api-reference/union-cli) command allows you to automatically download and cache models from HuggingFace Hub as Union Artifacts. This is particularly useful for serving large language models (LLMs) and other AI models efficiently in production environments.

## Why Cache Models from HuggingFace?

Caching models from HuggingFace Hub as Union Artifacts provides several key benefits:

- **Faster Model Downloads**: Once cached, models load much faster since they're stored in Union's optimized blob storage.
- **Stream model weights into GPU memory**: Union's [`SGLangApp`](https://www.union.ai/docs/v1/union/api-reference/union-sdk/packages/union.app.llm) and [`VLLMApp`](https://www.union.ai/docs/v1/union/api-reference/union-sdk/packages/union.app.llm) classes also allow you to load model weights
  directly into GPU memory instead of downloading the weights to disk first, then loading to GPU memory.
- **Reliability**: Eliminates dependency on HuggingFace Hub availability during model serving.
- **Cost Efficiency**: Reduces repeated downloads and bandwidth costs from HuggingFace Hub.
- **Version Control**: Each cached model gets a unique artifact ID for reproducible deployments.
- **Sharding Support**: Large models can be automatically sharded for distributed inference.
- **Streaming**: Models can be streamed directly from blob storage to GPU memory.

## Prerequisites

Before using the `union cache model-from-hf` command, you need to set up authentication:

1. **Create a HuggingFace API Token**:
   - Go to [HuggingFace Settings](https://huggingface.co/settings/tokens)
   - Create a new token with read access
   - Store it as a Union secret:
   ```bash
   union create secret --name HUGGINGFACE_TOKEN
   ```

2. **Create a Union API Key** (optional):
   ```bash
   union create api-key admin --name MY_API_KEY
   union create secret --name MY_API_KEY
   ```

If you don't want to create a Union API key, Union tenants typically ship with
an `EAGER_API_KEY` secret, which is an internally-provisioned Union API key that
you can use for caching HuggingFace models.

## Basic Example: Cache a Model As-Is

The simplest way to cache a model is to download it directly from HuggingFace without any modifications:

```bash
union cache model-from-hf Qwen/Qwen2.5-0.5B-Instruct \
    --hf-token-key HUGGINGFACE_TOKEN \
    --union-api-key EAGER_API_KEY \
    --artifact-name qwen2-5-0-5b-instruct \
    --cpu 2 \
    --mem 8Gi \
    --ephemeral-storage 10Gi \
    --wait
```

### Command Breakdown

- `Qwen/Qwen2.5-0.5B-Instruct`: The HuggingFace model repository
- `--hf-token-key HUGGINGFACE_TOKEN`: Union secret containing your HuggingFace API token
- `--union-api-key EAGER_API_KEY`: Union secret with admin permissions
- `--artifact-name qwen2-5-0-5b-instruct`: Custom name for the cached artifact.
  If not provided, the model repository name is lower-cased and `.` characters are
  replaced with `-`.
- `--cpu 2`: CPU resources for downloading and caching
- `--mem 8Gi`: Memory resources for downloading and caching
- `--ephemeral-storage 10Gi`: Temporary storage for the download process
- `--wait`: Wait for the caching process to complete
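As a sanity check, the default artifact-naming rule described above can be sketched in Python (assuming, as the example artifact name suggests, that only the segment after the org prefix is used):

```python
def default_artifact_name(repo: str) -> str:
    # Assumption: only the part after the "org/" prefix is used.
    name = repo.split("/")[-1]
    # Lower-case and replace "." with "-".
    return name.lower().replace(".", "-")

assert default_artifact_name("Qwen/Qwen2.5-0.5B-Instruct") == "qwen2-5-0-5b-instruct"
```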

### Output

When the command runs, you'll see outputs like this:

```
🔄 Started background process to cache model from Hugging Face repo Qwen/Qwen2.5-0.5B-Instruct.
 Check the console for status at
https://acme.union.ai/console/projects/flytesnacks/domains/development/executions/a5nr2
g79xb9rtnzczqtp
```

You can then visit the URL to see the model caching workflow on the Union UI.

If you provide the `--wait` flag to the `union cache model-from-hf` command,
the command will wait for the model to be cached and then output additional
information:

```
Cached model at:
/tmp/flyte-axk70dc8/sandbox/local_flytekit/50b27158c2bb42efef8e60622a4d2b6d/model_snapshot
Model Artifact ID:
flyte://av0.2/acme/flytesnacks/development/qwen2-5-0-5b-instruct@322a60c7ba4df41621be528a053f3b1a

To deploy this model run:
union deploy model --project None --domain development
flyte://av0.2/acme/flytesnacks/development/qwen2-5-0-5b-instruct@322a60c7ba4df41621be528a053f3b1a
```

## Using Cached Models in Applications

Once you have cached a model, you can use it in your Union serving apps:

### VLLM App Example

```python
import os
from union import Artifact, Resources
from union.app.llm import VLLMApp
from flytekit.extras.accelerators import L4

# Use the cached model artifact
Model = Artifact(name="qwen2-5-0-5b-instruct")

vllm_app = VLLMApp(
    name="vllm-app-3",
    requests=Resources(cpu="12", mem="24Gi", gpu="1"),
    accelerator=L4,
    model=Model.query(),  # Query the cached artifact
    model_id="qwen2",
    scaledown_after=300,
    stream_model=True,
    port=8084,
)
```

### SGLang App Example

```python
import os
from union import Artifact, Resources
from union.app.llm import SGLangApp
from flytekit.extras.accelerators import L4

# Use the cached model artifact
Model = Artifact(name="qwen2-5-0-5b-instruct")

sglang_app = SGLangApp(
    name="sglang-app-3",
    requests=Resources(cpu="12", mem="24Gi", gpu="1"),
    accelerator=L4,
    model=Model.query(),  # Query the cached artifact
    model_id="qwen2",
    scaledown_after=300,
    stream_model=True,
    port=8000,
)
```

## Advanced Example: Sharding a Model with the vLLM Engine

For large models that require distributed inference, you can use the `--shard-config` option to automatically shard the model using the [vLLM](https://docs.vllm.ai/en/latest/) inference engine.

### Create a Shard Configuration File

Create a YAML file (e.g., `shard_config.yaml`) with the sharding parameters:

```yaml
engine: vllm
args:
  model: unsloth/Llama-3.3-70B-Instruct
  tensor_parallel_size: 4
  gpu_memory_utilization: 0.9
  extra_args:
    max_model_len: 16384
```

The `shard_config.yaml` file is a YAML file that should conform to the
[`remote.ShardConfig`](https://www.union.ai/docs/v1/union/api-reference/union-sdk/packages/union.remote)
dataclass, where the `args` field contains configuration that's forwarded to the
underlying inference engine. Currently, only the `vLLM` engine is supported for sharding, so
the `args` field should conform to the [`remote.VLLMShardArgs`](https://www.union.ai/docs/v1/union/api-reference/union-sdk/packages/union.remote) dataclass.

### Cache the Sharded Model

```bash
union cache model-from-hf unsloth/Llama-3.3-70B-Instruct \
    --hf-token-key HUGGINGFACE_TOKEN \
    --union-api-key EAGER_API_KEY \
    --artifact-name llama-3-3-70b-instruct-sharded \
    --cpu 36 \
    --gpu 4 \
    --mem 300Gi \
    --ephemeral-storage 300Gi \
    --accelerator nvidia-l40s \
    --shard-config shard_config.yaml \
    --project flytesnacks \
    --domain development \
    --wait
```

## Best Practices

Keep the following in mind when caching models:

1. **Resource Sizing**: Allocate sufficient resources for the model size:
   - Small models (< 1B): 2-4 CPU, 4-8Gi memory
   - Medium models (1-7B): 4-8 CPU, 8-16Gi memory
   - Large models (7B+): 8+ CPU, 16Gi+ memory

2. **Sharding for Large Models**: Use tensor parallelism for models > 7B parameters:
   - 7-13B models: 2-4 GPUs
   - 13-70B models: 4-8 GPUs
   - 70B+ models: 8+ GPUs

3. **Storage Considerations**: Ensure sufficient ephemeral storage for the download process

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/serving/deploy-optimized-llm-endpoints ===

# Deploy Optimized LLM Endpoints with vLLM and SGLang

This guide shows you how to deploy high-performance LLM endpoints using SGLang
and vLLM. It also shows how to use Union's optimized serving images that are
designed to reduce cold start times and provide efficient model serving
capabilities.

For information on how to cache models from HuggingFace Hub as Union Artifacts,
see the [Cache a HuggingFace Model as an Artifact](./cache-huggingface-model) guide.

## Overview

Union provides two specialized app classes for serving high-performance LLM endpoints:

- **[`SGLangApp`](https://www.union.ai/docs/v1/union/api-reference/union-sdk/packages/union.app.llm)**: uses [SGLang](https://docs.sglang.ai/), a fast serving framework for large language models and vision language models.
- **[`VLLMApp`](https://www.union.ai/docs/v1/union/api-reference/union-sdk/packages/union.app.llm)**: uses [vLLM](https://docs.vllm.ai/en/latest/), a fast and easy-to-use library for LLM inference and serving.

By default, both classes provide:

- **Reduced cold start times** through optimized image loading.
- **Fast model loading** by streaming model weights directly from blob storage to GPU memory.
- **Distributed inference** with options for shared memory and tensor parallelism.

You can also serve models with other frameworks like [FastAPI](./serving-a-model), but doing so would require more
effort to achieve high performance, whereas vLLM and SGLang provide highly performant LLM endpoints out of the box.

## Basic Example: Deploy a Non-Sharded Model

### Deploy with vLLM

Assuming that you have followed the guide to [cache models from huggingface](./cache-huggingface-model)
and have a model artifact named `qwen2-5-0-5b-instruct`, you can deploy a simple LLM endpoint with the following code:

```python
# vllm_app.py

import union
from union.app.llm import VLLMApp
from flytekit.extras.accelerators import L4

# Reference the cached model artifact
Model = union.Artifact(name="qwen2-5-0-5b-instruct")

# Deploy with default image
vllm_app = VLLMApp(
    name="vllm-app",
    requests=union.Resources(cpu="12", mem="24Gi", gpu="1"),
    accelerator=L4,
    model=Model.query(),  # Query the cached artifact
    model_id="qwen2",
    scaledown_after=300,
    stream_model=True,  # Enable streaming for faster loading
    port=8084,
    requires_auth=False,
)
```

To use the optimized image, use the `OPTIMIZED_VLLM_IMAGE` variable:

```python
from union.app.llm import OPTIMIZED_VLLM_IMAGE

vllm_app = VLLMApp(
    name="vllm-app",
    container_image=OPTIMIZED_VLLM_IMAGE,
    ...
)
```

Here we're using a single L4 GPU to serve the model and specifying `stream_model=True`
to stream the model weights directly to GPU memory.

Deploy the app:

```bash
union deploy apps vllm_app.py vllm-app
```

### Deploy with SGLang

```python
# sglang_app.py

import union
from union.app.llm import SGLangApp
from flytekit.extras.accelerators import L4

# Reference the cached model artifact
Model = union.Artifact(name="qwen2-5-0-5b-instruct")

# Deploy with default image
sglang_app = SGLangApp(
    name="sglang-app",
    requests=union.Resources(cpu="12", mem="24Gi", gpu="1"),
    accelerator=L4,
    model=Model.query(),  # Query the cached artifact
    model_id="qwen2",
    scaledown_after=300,
    stream_model=True,  # Enable streaming for faster loading
    port=8000,
    requires_auth=False,
)
```

To use the optimized image, set `container_image` to the `OPTIMIZED_SGLANG_IMAGE` variable:

```python
from union.app.llm import OPTIMIZED_SGLANG_IMAGE

sglang_app = SGLangApp(
    name="sglang-app",
    container_image=OPTIMIZED_SGLANG_IMAGE,
    ...
)
```

Deploy the app:

```bash
union deploy apps sglang_app.py sglang-app
```

## Custom Image Example: Deploy with Your Own Image

If you need more control over the serving environment, you can define a custom `ImageSpec`.
For vLLM apps, that would look like this:

```python
import union
from union.app.llm import VLLMApp
from flytekit.extras.accelerators import L4

# Reference the cached model artifact
Model = union.Artifact(name="qwen2-5-0-5b-instruct")

# Define custom optimized image
image = union.ImageSpec(
    name="vllm-serving-custom",
    builder="union",
    apt_packages=["build-essential"],
    packages=["union[vllm]>=0.1.189"],
    env={
        "NCCL_DEBUG": "INFO",
        "CUDA_LAUNCH_BLOCKING": "1",
    },
)

# Deploy with custom image
vllm_app = VLLMApp(
    name="vllm-app-custom",
    container_image=image,
    ...
)
```

And for SGLang apps, it would look like this:

```python
# sglang_app.py

import union
from union.app.llm import SGLangApp
from flytekit.extras.accelerators import L4

# Reference the cached model artifact
Model = union.Artifact(name="qwen2-5-0-5b-instruct")

# Define custom optimized image
image = union.ImageSpec(
    name="sglang-serving-custom",
    builder="union",
    python_version="3.12",
    apt_packages=["build-essential"],
    packages=["union[sglang]>=0.1.189"],
)

# Deploy with custom image
sglang_app = SGLangApp(
    name="sglang-app-custom",
    container_image=image,
    ...
)
```

This gives you control over the exact package versions in the image, but at the
cost of increased cold start times: the default Union images are optimized
with [Nydus](https://github.com/dragonflyoss/nydus), which streams container
image layers so that the container can start before the image is fully downloaded.

## Advanced Example: Deploy a Sharded Model

For large models that require distributed inference, deploy using a sharded model artifact:

### Cache a Sharded Model

First, cache a large model with sharding (see [Cache a HuggingFace Model as an Artifact](./cache-huggingface-model#advanced-example-sharding-a-model-with-the-vllm-engine) for details).
Create a shard configuration file:

```yaml
# shard_config.yaml
engine: vllm
args:
  model: unsloth/Llama-3.3-70B-Instruct
  tensor_parallel_size: 4
  gpu_memory_utilization: 0.9
  extra_args:
    max_model_len: 16384
```

Then cache the model:

```bash
union cache model-from-hf unsloth/Llama-3.3-70B-Instruct \
    --hf-token-key HUGGINGFACE_TOKEN \
    --union-api-key EAGER_API_KEY \
    --artifact-name llama-3-3-70b-instruct-sharded \
    --cpu 36 \
    --gpu 4 \
    --mem 300Gi \
    --ephemeral-storage 300Gi \
    --accelerator nvidia-l40s \
    --shard-config shard_config.yaml \
    --project flytesnacks \
    --domain development \
    --wait
```

### Deploy with VLLMApp

Once the model is cached, you can deploy it to a vLLM app:

```python
# vllm_app_sharded.py

from flytekit.extras.accelerators import L40S
from union import Artifact, Resources
from union.app.llm import VLLMApp

# Reference the sharded model artifact
LLMArtifact = Artifact(name="llama-3-3-70b-instruct-sharded")

# Deploy sharded model with optimized configuration
vllm_app = VLLMApp(
    name="vllm-app-sharded",
    requests=Resources(
        cpu="36",
        mem="300Gi",
        gpu="4",
        ephemeral_storage="300Gi",
    ),
    accelerator=L40S,
    model=LLMArtifact.query(),
    model_id="llama3",

    # Additional arguments to pass into the vLLM engine:
    # see https://docs.vllm.ai/en/stable/serving/engine_args.html
    # or run `vllm serve --help` to see all available arguments
    extra_args=[
        "--tensor-parallel-size", "4",
        "--gpu-memory-utilization", "0.8",
        "--max-model-len", "4096",
        "--max-num-seqs", "256",
        "--enforce-eager",
    ],
    env={
        "NCCL_DEBUG": "INFO",
        "CUDA_LAUNCH_BLOCKING": "1",
        "VLLM_SKIP_P2P_CHECK": "1",
    },
    shared_memory=True,  # Enable shared memory for multi-GPU
    scaledown_after=300,
    stream_model=True,
    port=8084,
    requires_auth=False,
)
```

Then deploy the app:

```bash
union deploy apps vllm_app_sharded.py vllm-app-sharded
```

### Deploy with SGLangApp

You can also deploy the sharded model to an SGLang app:

```python
# sglang_app_sharded.py

from flytekit.extras.accelerators import GPUAccelerator
from union import Artifact, Resources
from union.app.llm import SGLangApp

# Reference the sharded model artifact
LLMArtifact = Artifact(name="llama-3-3-70b-instruct-sharded")

# Deploy sharded model with SGLang
sglang_app = SGLangApp(
    name="sglang-app-sharded",
    requests=Resources(
        cpu="36",
        mem="300Gi",
        gpu="4",
        ephemeral_storage="300Gi",
    ),
    accelerator=GPUAccelerator("nvidia-l40s"),
    model=LLMArtifact.query(),
    model_id="llama3",

    # Additional arguments to pass into the SGLang engine:
    # See https://docs.sglang.ai/backend/server_arguments.html for details.
    extra_args=[
        "--tensor-parallel-size", "4",
        "--mem-fraction-static", "0.8",
    ],
    env={
        "NCCL_DEBUG": "INFO",
        "CUDA_LAUNCH_BLOCKING": "1",
    },
    shared_memory=True,
    scaledown_after=300,
    stream_model=True,
    port=8084,
    requires_auth=False,
)
```

Then deploy the app:

```bash
union deploy apps sglang_app_sharded.py sglang-app-sharded
```

## Authentication via API Key

To secure your `SGLangApp`s and `VLLMApp`s with API key authentication, store
the key as a secret and pass it to the serving engine via the `extra_args` parameter. First, create a secret:

```bash
union secrets create --name AUTH_SECRET
```

When prompted, enter the secret value and save the secret.

Then, add the secret to the `extra_args` parameter. For SGLang, do the following:

```python
from union import Secret

sglang_app = SGLangApp(
    name="sglang-app",
    ...,
    # Disable Union's platform-level authentication so that the endpoint is
    # reachable on the public internet
    requires_auth=False,
    secrets=[Secret(key="AUTH_SECRET", env_var="AUTH_SECRET")],
    extra_args=[
        ...,
        "--api-key", "$AUTH_SECRET",  # Use the secret in the extra_args
    ],
)
```

And similarly for vLLM, do the following:

```python
from union import Secret

vllm_app = VLLMApp(
    name="vllm-app",
    ...,
    # Disable Union's platform-level authentication so that the endpoint is
    # reachable on the public internet
    requires_auth=False,
    secrets=[Secret(key="AUTH_SECRET", env_var="AUTH_SECRET")],
    extra_args=[
        ...,
        "--api-key", "$AUTH_SECRET",  # Use the secret in the extra_args
    ],
)
```
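When `--api-key` is set, both engines expect the key as a standard Bearer token on each request. A minimal sketch of building the authenticated headers (the header shape is standard for OpenAI-compatible servers; the endpoint itself is whatever URL your deployed app exposes):

```python
import os


def auth_headers(api_key: str) -> dict:
    """Headers for an OpenAI-compatible server started with --api-key."""
    return {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }


# Read the key from the environment rather than hardcoding it.
headers = auth_headers(os.environ.get("AUTH_SECRET", ""))
```

With the `openai` Python client, pass the same key via the `api_key` argument when constructing the client.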

## Performance Tuning

Refer to the vLLM and SGLang documentation for more information on how to tune
the performance of your app.

- **vLLM**: see the [optimization and tuning](https://docs.vllm.ai/en/latest/configuration/optimization.html) and [engine arguments](https://docs.vllm.ai/en/latest/configuration/engine_args.html) pages to learn about how to tune the performance of your app. You can also look at the [distributed inference and serving](https://docs.vllm.ai/en/latest/serving/distributed_serving.html) page to learn more about distributed inference.
- **SGLang**: see the [environment variables](https://docs.sglang.ai/references/environment_variables.html#performance-tuning) and [server arguments](https://docs.sglang.ai/backend/server_arguments.html) pages to learn about all of the available serving
options in SGLang.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/serving/deploying-your-connector ===

# Deploying Custom Flyte Connectors

[Flyte connectors](https://www.union.ai/docs/v1/union/user-guide/integrations/connectors/_index) allow you to extend Union's capabilities by integrating with external services.
This guide explains how to deploy custom connectors that can be used in your Flyte workflows.

## Overview

Connectors enable your workflows to interact with third-party services or systems.
Union.ai supports deploying connectors as services using the `FlyteConnectorApp` class. You can deploy connectors in two ways:

1. **Module-based deployment**: Include your connector code directly in the deployment
2. **ImageSpec-based deployment**: Use pre-built images with connectors already installed

## Prerequisites

Before deploying a connector, ensure you have:

- A Union.ai account
- Any required API keys or credentials for your connector
- Docker registry access (if using custom images)

## Connector Deployment Options

### Module-based Deployment

Module-based deployment is ideal when you want to iterate quickly on connector development. With this approach, you include your connector code directly using the `include` parameter.

```python
# app.py

from union import ImageSpec, Resources, Secret
from union.app import FlyteConnectorApp

image = ImageSpec(
    name="flyteconnector",
    packages=[
        "flytekit[connector]",
        "union",
        "union-runtime",
        "openai",  # ChatGPT connector needs openai SDK
    ],
    env={"FLYTE_SDK_LOGGING_LEVEL": "10"},
    builder="union",
)

openai_connector_app = FlyteConnectorApp(
    name="openai-connector-app",
    container_image=image,
    secrets=[Secret(key="flyte_openai_api_key")],
    limits=Resources(cpu="1", mem="1Gi"),
    include=["./chatgpt"],  # Include the connector module directory
)
```

With this approach, you organize your connector code in a module structure:

```bash
chatgpt/
├── __init__.py
├── connector.py
└── constants.py
```

The `include` parameter takes a list of files or directories to include in the deployment.

### ImageSpec-based Deployment

ImageSpec-based deployment is preferred for production environments where you have stable connector implementations. In this approach, your connector code is pre-installed in a container image.

```python
# app.py

from union import ImageSpec, Resources, Secret
from union.app import FlyteConnectorApp

image = ImageSpec(
    name="flyteconnector",
    packages=[
        "flytekit[connector]",
        "flytekitplugins-slurm",
        "union",
        "union-runtime",
    ],
    apt_packages=["build-essential", "libmagic1", "vim", "openssh-client", "ca-certificates"],
    env={"FLYTE_SDK_LOGGING_LEVEL": "10"},
    builder="union",
)

slurm_connector_app = FlyteConnectorApp(
    name="slurm-connector-app",
    container_image=image,
    secrets=[Secret(key="flyte_slurm_private_key")],
    limits=Resources(cpu="1", mem="1Gi"),
)
```

## Managing Secrets

Most connectors require credentials to authenticate with external services. Union.ai allows you to manage these securely:

```bash
# Create a secret for OpenAI API key
union create secret flyte_openai_api_key -f /etc/secrets/flyte_openai_api_key --project flytesnacks --domain development

# Create a secret for SLURM access
union create secret flyte_slurm_private_key -f /etc/secrets/flyte_slurm_private_key --project flytesnacks --domain development
```

Reference these secrets in your connector app:

```python
from union import Secret

# In your app definition
secrets=[Secret(key="flyte_openai_api_key")]
```

Inside your connector code, access these secrets using:

```python
from flytekit.extend.backend.utils import get_connector_secret

api_key = get_connector_secret(secret_key="FLYTE_OPENAI_API_KEY")
```

## Example: Creating a ChatGPT Connector

Here's how to implement a ChatGPT connector:

1. Create a connector class:

```python
# chatgpt/connector.py

import asyncio
import logging
from typing import Optional

import openai
from flyteidl.core.execution_pb2 import TaskExecution
from flytekit import FlyteContextManager
from flytekit.core.type_engine import TypeEngine
from flytekit.extend.backend.base_connector import ConnectorRegistry, Resource, SyncConnectorBase
from flytekit.extend.backend.utils import get_connector_secret
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate

from .constants import OPENAI_API_KEY, TIMEOUT_SECONDS

class ChatGPTConnector(SyncConnectorBase):
    name = "ChatGPT Connector"

    def __init__(self):
        super().__init__(task_type_name="chatgpt")

    async def do(
        self,
        task_template: TaskTemplate,
        inputs: Optional[LiteralMap] = None,
        **kwargs,
    ) -> Resource:
        ctx = FlyteContextManager.current_context()
        input_python_value = TypeEngine.literal_map_to_kwargs(ctx, inputs, {"message": str})
        message = input_python_value["message"]

        custom = task_template.custom
        custom["chatgpt_config"]["messages"] = [{"role": "user", "content": message}]
        client = openai.AsyncOpenAI(
            organization=custom["openai_organization"],
            api_key=get_connector_secret(secret_key=OPENAI_API_KEY),
        )

        logger = logging.getLogger("httpx")
        logger.setLevel(logging.WARNING)

        completion = await asyncio.wait_for(client.chat.completions.create(**custom["chatgpt_config"]), TIMEOUT_SECONDS)
        message = completion.choices[0].message.content
        outputs = {"o0": message}

        return Resource(phase=TaskExecution.SUCCEEDED, outputs=outputs)

ConnectorRegistry.register(ChatGPTConnector())
```

2. Define constants:

```python
# chatgpt/constants.py

# Constants for ChatGPT connector
TIMEOUT_SECONDS = 10
OPENAI_API_KEY = "FLYTE_OPENAI_API_KEY"
```

3. Create an `__init__.py` file:

```python
# chatgpt/__init__.py

from .connector import ChatGPTConnector

__all__ = ["ChatGPTConnector"]
```

## Using the Connector in a Workflow

After deploying your connector, you can use it in your workflows:

```python
# workflow.py

from flytekit import workflow
from flytekitplugins.openai import ChatGPTTask

chatgpt_small_job = ChatGPTTask(
    name="3.5-turbo",
    chatgpt_config={
        "model": "gpt-3.5-turbo",
        "temperature": 0.7,
    },
)

chatgpt_big_job = ChatGPTTask(
    name="gpt-4",
    chatgpt_config={
        "model": "gpt-4",
        "temperature": 0.7,
    },
)

@workflow
def wf(message: str) -> str:
    message = chatgpt_small_job(message=message)
    message = chatgpt_big_job(message=message)
    return message
```

Run the workflow:

```bash
union run --remote workflow.py wf --message "Tell me about Union.ai"
```

## Creating Your Own Connector

To create a custom connector:

1. Inherit from `SyncConnectorBase` or `AsyncConnectorBase`
2. Implement the required methods (`do` for synchronous connectors, `create`, `get`, and `delete` for asynchronous connectors)
3. Register your connector with `ConnectorRegistry.register(YourConnector())`
4. Deploy your connector using one of the methods above
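
To illustrate the asynchronous lifecycle, here is a toy, pure-Python sketch of the `create`/`get`/`delete` contract. The class below is an illustrative stand-in, not flytekit's actual `AsyncConnectorBase` API, which additionally works with `TaskTemplate`s, literal maps, and `Resource` phases:

```python
import itertools


class ToyAsyncConnector:
    """Toy stand-in showing the async connector lifecycle."""

    def __init__(self):
        self._jobs = {}
        self._ids = itertools.count()

    def create(self, task_spec: str) -> str:
        # Submit work to the external service; return a job handle
        # that the platform stores and passes back to get()/delete().
        job_id = f"job-{next(self._ids)}"
        self._jobs[job_id] = "RUNNING"
        return job_id

    def get(self, job_id: str) -> str:
        # Poll the external service for the job's phase. A real
        # connector would call the service API here; the toy job
        # simply succeeds on the first poll.
        self._jobs[job_id] = "SUCCEEDED"
        return self._jobs[job_id]

    def delete(self, job_id: str) -> None:
        # Cancel and clean up the remote job on abort.
        self._jobs.pop(job_id, None)


connector = ToyAsyncConnector()
handle = connector.create("train-model")
phase = connector.get(handle)  # "SUCCEEDED"
connector.delete(handle)
```

A synchronous connector collapses this lifecycle into the single `do` method shown in the ChatGPT example above.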

## Deployment Commands

Deploy your connector app:

```bash
# Module-based deployment
union deploy apps app_module_deployment/app.py openai-connector-app

# ImageSpec-based deployment
union deploy apps app_image_spec_deployment/app.py slurm-connector-app
```

## Best Practices

1. **Security**: Never hardcode credentials; always use Union.ai secrets
2. **Error Handling**: Include robust error handling in your connector implementation
3. **Timeouts**: Set appropriate timeouts for external API calls
4. **Logging**: Implement detailed logging for debugging
5. **Testing**: Test your connector thoroughly before deploying to production

By following this guide, you can create and deploy custom connectors that extend Union.ai's capabilities to integrate with any external service or system your workflows need to interact with.

