# Integrations
> This bundle contains all pages in the Integrations section.
> Source: https://www.union.ai/docs/v1/union/integrations/

=== PAGE: https://www.union.ai/docs/v1/union/integrations ===

# Integrations

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

Union supports integration with a variety of third-party services and systems.

## Connectors

Union.ai supports [the following connectors out-of-the-box](./connectors/_index).
If you don't see the connector you need below, see [Connectors](./connectors/_index) to learn how to create your own.

| Connector | Description |
|-----------|-------------|
| [SageMaker connector](./connectors/sagemaker-inference-connector/_index) | Deploy models and create and trigger inference endpoints on AWS SageMaker. |
| [Airflow connector](./connectors/airflow-connector/_index) | Run Airflow jobs in your workflows with the Airflow connector. |
| [BigQuery connector](./connectors/bigquery-connector/_index) | Run BigQuery jobs in your workflows with the BigQuery connector. |
| [ChatGPT connector](./connectors/chatgpt-connector/_index) | Run ChatGPT jobs in your workflows with the ChatGPT connector. |
| [Databricks connector](./connectors/databricks-connector/_index) | Run Databricks jobs in your workflows with the Databricks connector. |
| [Memory Machine Cloud connector](./connectors/mmcloud-connector/_index) | Execute tasks using the MemVerge Memory Machine Cloud connector. |
| [OpenAI Batch connector](./connectors/openai-batch-connector/_index) | Submit requests for asynchronous batch processing on OpenAI. |
| [Perian connector](./connectors/perian-connector/_index) | Execute tasks on Perian Job Platform. |
| [Sensor connector](./connectors/sensor/_index) | Run sensor jobs in your workflows with the sensor connector. |
| [Slurm connector](./connectors/slurm-connector/_index) | Run Slurm jobs in your workflows with the Slurm connector. |
| [Snowflake connector](./connectors/snowflake-connector/_index) | Run Snowflake jobs in your workflows with the Snowflake connector. |

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors ===

# Connectors

Connectors are long-running, stateless services that receive execution requests via gRPC and initiate jobs with the appropriate external or internal services. Each connector service is a Kubernetes deployment that receives gRPC requests when users trigger a particular type of task. (For example, the BigQuery connector is triggered by the invocation of a BigQuery task.) The connector service then initiates a job with the appropriate service.

Connectors can also be run locally, since they are spawned in-process; you just need the appropriate connection secrets available in your local environment.

Because connectors run outside the core system, they scale independently, handle large workloads efficiently, and reduce load on the core system.
You can also test connectors locally without having to change the backend configuration, streamlining workflow development.

Connectors enable two key use cases:

* **Asynchronously** launching jobs on hosted platforms (e.g. Databricks or Snowflake).
* Calling external **synchronous** services, such as access control, data retrieval, or model inferencing.

This section covers all currently available connectors:

* [Airflow connector](./airflow-connector/_index)
* [BigQuery connector](./bigquery-connector/_index)
* [OpenAI ChatGPT connector](./chatgpt-connector/_index)
* [OpenAI Batch connector](./openai-batch-connector/_index)
* [Databricks connector](./databricks-connector/_index)
* [Memory Machine Cloud connector](./mmcloud-connector/_index)
* [Perian connector](./perian-connector/_index)
* [Sagemaker connector](./sagemaker-inference-connector/_index)
* [File sensor connector](./sensor/_index)
* [Slurm connector](./slurm-connector/_index)
* [Snowflake connector](./snowflake-connector/_index)
* [DGX connector](./dgx-connector/_index)

## Creating a new connector

If none of the existing connectors meet your needs, you can implement your own connector.

There are two types of connectors: **async** and **sync**.
* **Async connectors** enable long-running jobs that execute on an external platform over time.
  They communicate with external services that have asynchronous APIs that support `create`, `get`, and `delete` operations.
  The vast majority of connectors are async connectors.
* **Sync connectors** enable request/response services that return immediate outputs (e.g. calling an internal API to fetch data or communicating with the OpenAI API).

> [!NOTE]
> While connectors can be written in any programming language since they use a protobuf interface,
> we currently only support Python connectors.
> We may support other languages in the future.

### Async connector interface specification

To create a new async connector, extend the `AsyncConnectorBase` and implement `create`, `get`, and `delete` methods. These methods must be idempotent.

- `create`: This method is used to initiate a new job. Users have the flexibility to use gRPC, REST, or an SDK to create a job.
- `get`: This method retrieves the job resource (job ID or output literal) associated with the task, such as a BigQuery job ID or Databricks task ID.
- `delete`: Invoking this method will send a request to delete the corresponding job.

For an example implementation, see the [BigQuery connector code](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/connector.py).
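To make the contract concrete, here is a self-contained sketch of the three idempotent methods against an in-memory job store. This is an illustration only: the class name mirrors the interface described above, but the real flytekit base class operates on task templates and literal maps rather than a raw query string.

```python
import uuid
from typing import Dict, Optional

# In-memory stand-in for an external service's job store (e.g. a BigQuery job API).
_JOBS: Dict[str, str] = {}

class ToyAsyncConnector:
    """Illustrative async connector: `create`, `get`, and `delete` are idempotent."""

    def create(self, query: str, job_id: Optional[str] = None) -> str:
        # Idempotent: re-creating with an existing job ID returns it unchanged.
        job_id = job_id or str(uuid.uuid4())
        _JOBS.setdefault(job_id, "RUNNING")
        return job_id

    def get(self, job_id: str) -> str:
        # Poll the external service for the job's current state.
        return _JOBS.get(job_id, "NOT_FOUND")

    def delete(self, job_id: str) -> None:
        # Idempotent: deleting an already-deleted job is a no-op, not an error.
        _JOBS.pop(job_id, None)
```

Idempotency matters because the system may retry any of these calls: a retried `create` must not launch a duplicate job, and a retried `delete` must not fail on a job that is already gone.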

### Sync connector interface specification

To create a new sync connector, extend the `SyncConnectorBase` class and implement a `do` method. This method must be idempotent.

- `do`: This method is used to execute the synchronous task, and the worker in Union.ai will be blocked until the method returns.

For an example implementation, see the [ChatGPT connector code](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-openai/flytekitplugins/openai/chatgpt/connector.py).
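By contrast, a sync connector exposes a single blocking call. A self-contained sketch (the class and method names mirror the description above; the real flytekit signatures differ):

```python
class ToySyncConnector:
    """Illustrative sync connector: a single blocking, idempotent `do` method."""

    def __init__(self, service):
        # `service` stands in for an external API client (e.g. an OpenAI client).
        self._service = service

    def do(self, prompt: str) -> str:
        # Blocks until the external service responds, then returns its output.
        return self._service(prompt)

echo = ToySyncConnector(service=lambda p: p.upper())
print(echo.do("hello"))  # prints "HELLO"
```

Because the worker blocks until `do` returns, sync connectors suit fast request/response services; anything long-running belongs in an async connector.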

### Testing your connector locally

To test your connector locally, create a class for the connector task that inherits from [`AsyncConnectorExecutorMixin`](https://github.com/flyteorg/flytekit/blob/1bc8302bb7a6cf4c7048a7f93627ee25fc6b88c4/flytekit/extend/backend/base_connector.py#L354). This mixin can handle both asynchronous tasks and synchronous tasks and allows Union to mimic the system's behavior in calling the connector.
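The mixin's role can be illustrated with a toy analogue: local execution drives the connector's `create`/`get` lifecycle instead of running a task body directly. Everything below is hypothetical scaffolding to show the idea, not the real flytekit classes:

```python
class ToyExecutorMixin:
    """Toy analogue of an executor mixin: local execution drives the
    connector's create/get lifecycle instead of the task body."""

    def execute_locally(self, *args) -> str:
        job_id = self.connector.create(*args)
        # Keep polling until the "remote" job leaves the RUNNING state.
        while (state := self.connector.get(job_id)) == "RUNNING":
            self.connector.advance(job_id)  # stand-in for waiting on the service

        return state

class FakeConnector:
    """Fake async connector whose job finishes after two polls."""
    def __init__(self):
        self.jobs = {}
    def create(self, query: str) -> str:
        self.jobs["job-1"] = 2  # ticks remaining until the job is done
        return "job-1"
    def get(self, job_id: str) -> str:
        return "RUNNING" if self.jobs[job_id] > 0 else "SUCCEEDED"
    def advance(self, job_id: str) -> None:
        self.jobs[job_id] -= 1

class ToyQueryTask(ToyExecutorMixin):
    def __init__(self):
        self.connector = FakeConnector()

print(ToyQueryTask().execute_locally("SELECT 1"))  # prints "SUCCEEDED"
```

The real mixin does the same kind of orchestration, but against Flyte's type system and with proper sleeping between polls.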

For testing examples, see the **Connectors > BigQuery connector** and **Connectors > Databricks connector** documentation.

## Enabling a connector in your Union.ai deployment

To enable a connector in your Union.ai deployment, contact the Union.ai team.

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/airflow-connector ===

# Airflow connector

[Apache Airflow](https://airflow.apache.org) is a widely used open source platform for managing workflows with a robust ecosystem. Union.ai provides an Airflow plugin that allows you to run Airflow tasks as Union.ai tasks.
This allows you to use the Airflow plugin ecosystem in conjunction with Union.ai's powerful task execution and orchestration capabilities.

> [!NOTE]
> The Airflow connector does not support all [Airflow operators](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html).
> We have tested many, but if you run into issues,
> please [file a bug report](https://github.com/flyteorg/flyte/issues/new?assignees=&labels=bug%2Cuntriaged&projects=&template=bug_report.yaml&title=%5BBUG%5D+).

## Installation

To install the plugin, run the following command:

`pip install flytekitplugins-airflow`

This plugin has two components:
* **Airflow compiler:** This component compiles Airflow tasks to Union.ai tasks, so Airflow tasks can be directly used inside the Union.ai workflow.
* **Airflow connector:** This component allows you to execute Airflow tasks either locally or on a Union.ai cluster.

> [!NOTE]
> You don't need an Airflow cluster to run Airflow tasks, since flytekit will
> automatically compile Airflow tasks to Union.ai tasks and execute them on the Airflow connector.

## Example usage

For example usage, see **Connectors > Airflow connector > Page**

## Local testing

Airflow doesn't support local execution natively. However, Union.ai compiles Airflow tasks to Union.ai tasks,
which enables you to test Airflow tasks locally in flytekit's local execution mode.

> [!NOTE]
> In some cases, you will need to store credentials in your local environment when testing locally.

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/airflow-connector/airflow-connector-example-usage-union ===

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/integrations/connectors/airflow-connector/airflow-connector-example-usage-union.md
**HTML**: https://www.union.ai/docs/v1/union/integrations/connectors/airflow-connector/airflow-connector-example-usage-union/

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/bigquery-connector ===

# BigQuery connector

## Installation

To install the BigQuery connector, run `pip install flytekitplugins-bigquery`.

This connector is purely a spec. Since SQL is completely portable, there is no need to build a Docker container.

## Example usage

For an example query, see **Connectors > BigQuery connector > Page**

## Local testing

To test the BigQuery connector locally, create a class for the connector task that inherits from
[AsyncConnectorExecutorMixin](https://github.com/flyteorg/flytekit/blob/1bc8302bb7a6cf4c7048a7f93627ee25fc6b88c4/flytekit/extend/backend/base_connector.py#L354).
This mixin can handle asynchronous tasks and allows the SDK to mimic the system's behavior in calling the connector.

For more information, see **Connectors**.

> [!NOTE]
> In some cases, you will need to store credentials in your local environment when testing locally.

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/bigquery-connector/bigquery-connector-example-usage-union ===

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/integrations/connectors/bigquery-connector/bigquery-connector-example-usage-union.md
**HTML**: https://www.union.ai/docs/v1/union/integrations/connectors/bigquery-connector/bigquery-connector-example-usage-union/

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/chatgpt-connector ===

# ChatGPT connector

## Installation

To install the ChatGPT connector, run the following command:

```shell
$ pip install flytekitplugins-openai
```

## Example usage

For an example query, see **Connectors > ChatGPT connector > Page**

## Local testing

To test the ChatGPT connector locally, create a class for the connector task that inherits from
[SyncConnectorExecutorMixin](https://github.com/flyteorg/flytekit/blob/1bc8302bb7a6cf4c7048a7f93627ee25fc6b88c4/flytekit/extend/backend/base_connector.py#L304).
This mixin can handle synchronous tasks and allows the SDK to mimic the system's behavior in calling the connector.

For more information, see **Connectors**.

> [!NOTE]
> In some cases, you will need to store credentials in your local environment when testing locally.

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/chatgpt-connector/chatgpt-connector-example-usage-union ===

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/integrations/connectors/chatgpt-connector/chatgpt-connector-example-usage-union.md
**HTML**: https://www.union.ai/docs/v1/union/integrations/connectors/chatgpt-connector/chatgpt-connector-example-usage-union/

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/databricks-connector ===

# Databricks connector

Union.ai can be integrated with the [Databricks](https://www.databricks.com/) service,
enabling you to submit Spark jobs to the Databricks platform.

## Installation

The Databricks connector comes bundled with the Spark plugin. To install the Spark plugin, run the following command:

```shell
$ pip install flytekitplugins-spark
```

## Example usage

For example usage, see **Connectors > Databricks connector > Page**

## Local testing

To test the Databricks connector locally, create a class for the connector task that inherits from
[AsyncConnectorExecutorMixin](https://github.com/flyteorg/flytekit/blob/1bc8302bb7a6cf4c7048a7f93627ee25fc6b88c4/flytekit/extend/backend/base_connector.py#L354).
This mixin can handle asynchronous tasks and allows the SDK to mimic the system's behavior in calling the connector.

For more information, see **Connectors**.

> [!NOTE]
> In some cases, you will need to store credentials in your local environment when testing locally.

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/databricks-connector/databricks-connector-example-usage-union ===

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/integrations/connectors/databricks-connector/databricks-connector-example-usage-union.md
**HTML**: https://www.union.ai/docs/v1/union/integrations/connectors/databricks-connector/databricks-connector-example-usage-union/

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/mmcloud-connector ===

# Memory Machine Cloud connector

[MemVerge](https://memverge.com/) [Memory Machine Cloud](https://www.mmcloud.io/) (MMCloud)—available on AWS, GCP, and AliCloud—empowers users to continuously optimize cloud resources during runtime, safely execute stateful tasks on spot instances, and monitor resource usage in real time. These capabilities make it an excellent fit for long-running batch workloads. Union.ai can be integrated with MMCloud, allowing you to execute Union.ai tasks using MMCloud.

## Installation

To install the connector, run the following command:

```shell
$ pip install flytekitplugins-mmcloud
```

To get started with Memory Machine Cloud, see the [Memory Machine Cloud user guide](https://docs.memverge.com/MMCloud/latest/User%20Guide/about).

## Example usage

For example usage, see **Connectors > Memory Machine Cloud connector > Page**

## Local testing

To test the MMCloud connector locally, create a class for the connector task that inherits from
[AsyncConnectorExecutorMixin](https://github.com/flyteorg/flytekit/blob/1bc8302bb7a6cf4c7048a7f93627ee25fc6b88c4/flytekit/extend/backend/base_connector.py#L354).
This mixin can handle asynchronous tasks and allows the SDK to mimic the system's behavior in calling the connector.

For more information, see **Connectors**.

> [!NOTE]
> In some cases, you will need to store credentials in your local environment when testing locally.

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/mmcloud-connector/mmcloud-connector-example-usage-union ===

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/integrations/connectors/mmcloud-connector/mmcloud-connector-example-usage-union.md
**HTML**: https://www.union.ai/docs/v1/union/integrations/connectors/mmcloud-connector/mmcloud-connector-example-usage-union/

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/openai-batch-connector ===

# OpenAI Batch connector

The Batch API connector allows you to submit requests for asynchronous batch processing on OpenAI.
You can provide either a JSONL file or a JSON iterator, and the connector handles the upload to OpenAI,
creation of the batch, and downloading of the output and error files.
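A JSONL input file for the Batch API contains one request object per line. Below is a minimal sketch of building such a file; the field names follow OpenAI's published Batch request format, and the model name is a placeholder:

```python
import json

# Each line is one request; `custom_id` lets you match outputs back to inputs.
requests = [
    {
        "custom_id": f"request-{i}",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-4o-mini",  # placeholder model name
            "messages": [{"role": "user", "content": prompt}],
        },
    }
    for i, prompt in enumerate(["What is 2+2?", "Name a prime number."])
]

with open("batch_input.jsonl", "w") as f:
    for req in requests:
        f.write(json.dumps(req) + "\n")
```

The connector then uploads this file to OpenAI, creates the batch, and fetches the output and error files once the batch completes.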

## Installation

To use the OpenAI Batch connector, run the following command:

```shell
$ pip install flytekitplugins-openai
```

## Example usage

For example usage, see **Connectors > OpenAI Batch connector > Page**

## Local testing

To test a connector locally, create a class for the connector task that inherits from
[SyncConnectorExecutorMixin](https://github.com/flyteorg/flytekit/blob/1bc8302bb7a6cf4c7048a7f93627ee25fc6b88c4/flytekit/extend/backend/base_connector.py#L304)
or [AsyncConnectorExecutorMixin](https://github.com/flyteorg/flytekit/blob/1bc8302bb7a6cf4c7048a7f93627ee25fc6b88c4/flytekit/extend/backend/base_connector.py#L354).
These mixins can handle synchronous and asynchronous tasks, respectively, and allow the SDK to mimic the system's behavior in calling the connector.

For more information, see **Connectors**.

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/openai-batch-connector/openai-batch-connector-example-usage-union ===

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/integrations/connectors/openai-batch-connector/openai-batch-connector-example-usage-union.md
**HTML**: https://www.union.ai/docs/v1/union/integrations/connectors/openai-batch-connector/openai-batch-connector-example-usage-union/

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/perian-connector ===

# Perian connector

The Perian connector enables you to execute Union.ai tasks on the [Perian Sky Platform](https://perian.io/).
Perian allows the execution of any task on servers aggregated from multiple cloud providers.

To get started with Perian, see the [Perian documentation](https://perian.io/docs/overview) and the [Perian connector documentation](https://perian.io/docs/flyte-getting-started).

## Example usage

For an example, see **Connectors > Perian connector > Page**

## Connector setup

Consult the [Perian connector setup guide](https://perian.io/docs/flyte-setup-guide).

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/perian-connector/example-union ===

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/integrations/connectors/perian-connector/example-union.md
**HTML**: https://www.union.ai/docs/v1/union/integrations/connectors/perian-connector/example-union/

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/sagemaker-inference-connector ===

# SageMaker connector

The SageMaker connector allows you to deploy models and create and trigger inference endpoints.
You can also fully remove the SageMaker deployment.

## Installation

To use the SageMaker connector, run the following command:

```shell
$ pip install flytekitplugins-awssagemaker
```

## Example usage

For example usage, see **Connectors > SageMaker connector > Page**

## Local testing

To test a connector locally, create a class for the connector task that inherits from
[SyncConnectorExecutorMixin](https://github.com/flyteorg/flytekit/blob/1bc8302bb7a6cf4c7048a7f93627ee25fc6b88c4/flytekit/extend/backend/base_connector.py#L304)
or [AsyncConnectorExecutorMixin](https://github.com/flyteorg/flytekit/blob/1bc8302bb7a6cf4c7048a7f93627ee25fc6b88c4/flytekit/extend/backend/base_connector.py#L354).
These mixins can handle synchronous and asynchronous tasks, respectively, and allow the SDK to mimic the system's behavior in calling the connector.

For more information, see **Connectors**.

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/sagemaker-inference-connector/sagemaker-inference-connector-example-usage-union ===

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/integrations/connectors/sagemaker-inference-connector/sagemaker-inference-connector-example-usage-union.md
**HTML**: https://www.union.ai/docs/v1/union/integrations/connectors/sagemaker-inference-connector/sagemaker-inference-connector-example-usage-union/

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/sensor ===

# Sensor connector

## Example usage

For example usage, see **Connectors > Sensor connector > Page**
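Conceptually, a file sensor repeatedly "pokes" until a target file exists, and downstream tasks proceed only once it does. A self-contained sketch of that poll loop (illustrative only; flytekit's sensor handles sleeping and rescheduling for you):

```python
import tempfile
from pathlib import Path

def poke(path: Path) -> bool:
    """One sensor 'poke': report whether the condition holds yet."""
    return path.exists()

def wait_for_file(path: Path, max_pokes: int = 100) -> bool:
    # A real sensor sleeps between pokes; here we just loop a bounded number of times.
    for _ in range(max_pokes):
        if poke(path):
            return True
    return False

with tempfile.TemporaryDirectory() as d:
    target = Path(d) / "data.csv"
    print(wait_for_file(target))  # False: the file never appears
    target.write_text("a,b\n1,2\n")
    print(wait_for_file(target))  # True: the condition now holds
```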

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/sensor/file-sensor-example-union ===

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/integrations/connectors/sensor/file-sensor-example-union.md
**HTML**: https://www.union.ai/docs/v1/union/integrations/connectors/sensor/file-sensor-example-union/

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/slurm-connector ===

# Slurm connector

## Installation

To install the Slurm connector, run the following command:

```shell
$ pip install flytekitplugins-slurm
```

## Example usage

For example usage, see **Connectors > Slurm connector > Page**

## Local testing

To test the Slurm connector locally, create a class for the connector task that inherits from
[AsyncConnectorExecutorMixin](https://github.com/flyteorg/flytekit/blob/1bc8302bb7a6cf4c7048a7f93627ee25fc6b88c4/flytekit/extend/backend/base_connector.py#L354).
This mixin can handle asynchronous tasks and allows the SDK to mimic the system's behavior in calling the connector.

For more information, see **Connectors**.

> [!NOTE]
> In some cases, you will need to store credentials in your local environment when testing locally.

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/slurm-connector/slurm-connector-example-usage-union ===

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/integrations/connectors/slurm-connector/slurm-connector-example-usage-union.md
**HTML**: https://www.union.ai/docs/v1/union/integrations/connectors/slurm-connector/slurm-connector-example-usage-union/

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/snowflake-connector ===

# Snowflake connector

Union.ai can be seamlessly integrated with the [Snowflake](https://www.snowflake.com) service,
providing you with a straightforward means to query data in Snowflake.

## Installation

To use the Snowflake connector, run the following command:

```shell
$ pip install flytekitplugins-snowflake
```

## Example usage

For an example query, see **Connectors > Snowflake connector > Page**

## Local testing

To test the Snowflake connector locally, create a class for the connector task that inherits from
[AsyncConnectorExecutorMixin](https://github.com/flyteorg/flytekit/blob/1bc8302bb7a6cf4c7048a7f93627ee25fc6b88c4/flytekit/extend/backend/base_connector.py#L354).
This mixin can handle asynchronous tasks and allows the SDK to mimic the system's behavior in calling the connector.

For more information, see **Connectors**.

> [!NOTE]
> In some cases, you will need to store credentials in your local environment when testing locally.

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/snowflake-connector/snowflake-connector-example-usage-union ===

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/integrations/connectors/snowflake-connector/snowflake-connector-example-usage-union.md
**HTML**: https://www.union.ai/docs/v1/union/integrations/connectors/snowflake-connector/snowflake-connector-example-usage-union/

=== PAGE: https://www.union.ai/docs/v1/union/integrations/connectors/dgx-connector ===

# DGX connector

You can run workflows on the [NVIDIA DGX platform](https://www.nvidia.com/en-us/data-center/dgx-platform/) with the DGX connector.

## Installation

To install the DGX connector and have it enabled in your deployment, contact the Union.ai team.

## Example usage

```python
from typing import List

import union
from flytekitplugins.dgx import DGXConfig

dgx_image_spec = union.ImageSpec(
    base_image="my-image/dgx:v24",
    packages=["torch", "transformers", "accelerate", "bitsandbytes"],
    registry="my-registry",
)

DEFAULT_CHAT_TEMPLATE = """
{% for message in messages %}
{% if message['role'] == 'user' %}
{{ '<<|user|>> ' + message['content'].strip() + ' <</s>>' }}
{% elif message['role'] == 'system' %}
{{ '<<|system|>>\\n' + message['content'].strip() + '\\n<</s>>\\n\\n' }}
{% endif %}
{% endfor %}
{% if add_generation_prompt %}{{ '<|im_start|>assistant\n' }}{% endif %}
""".strip()

@union.task(container_image=dgx_image_spec, cache_version="1.0", cache=True)
def form_prompt(prompt: str, system_message: str) -> List[dict]:
    return [
        {"role": "system", "content": system_message},
        {"role": "user", "content": prompt},
    ]

@union.task(
    task_config=DGXConfig(instance="dgxa100.80g.8.norm"),
    container_image=dgx_image_spec,
)
def inference(messages: List[dict], n_variations: int) -> List[str]:
    import torch
    import transformers
    from transformers import AutoTokenizer

    print(f"gpu is available: {torch.cuda.is_available()}")

    model = "mistralai/Mixtral-8x7B-Instruct-v0.1"

    tokenizer = AutoTokenizer.from_pretrained(model)
    pipeline = transformers.pipeline(
        "text-generation",
        tokenizer=tokenizer,
        model=model,
        model_kwargs={"torch_dtype": torch.float16, "load_in_4bit": True},
    )
    print(f"{messages=}")
    prompt = pipeline.tokenizer.apply_chat_template(
        messages,
        chat_template=DEFAULT_CHAT_TEMPLATE,
        tokenize=False,
        add_generation_prompt=True,
    )
    outputs = pipeline(
        prompt,
        num_return_sequences=n_variations,
        max_new_tokens=256,
        do_sample=True,
        temperature=0.7,
        top_k=50,
        top_p=0.95,
        return_full_text=False,
    )
    print(f'generated text={outputs[0]["generated_text"]}')
    return [output["generated_text"] for output in outputs]

@union.workflow
def wf(
    prompt: str = "Explain what a Mixture of Experts is in less than 100 words.",
    n_variations: int = 8,
    system_message: str = "You are a helpful and polite bot.",
) -> List[str]:
    messages = form_prompt(prompt=prompt, system_message=system_message)
    return inference(messages=messages, n_variations=n_variations)
```

