Connectors

Connectors are long-running, stateless services that receive execution requests via gRPC and initiate jobs with appropriate external or internal services. Each connector service is a Kubernetes deployment that receives gRPC requests from FlytePropeller when users trigger a particular type of task.

The connector service then initiates a job with the appropriate external service. Connectors can also be run locally: in local execution they are spawned in process, so they work as long as the appropriate connection secrets are available locally.

Connectors are designed to scale and to handle large workloads efficiently. Because they run outside FlytePropeller, they also reduce the load on it. You can test connectors locally without changing the Flyte backend configuration, which streamlines workflow development.

Connectors enable two key workflows:

  • Asynchronously launching jobs on hosted platforms (e.g. Databricks or Snowflake).
  • Calling external synchronous services, such as access control, data retrieval, and model inferencing.

Using existing connectors

In this section you will find documentation on how to use existing connectors in your workflows. You can also create your own connector.

Creating a new connector

You can implement a connector as a Python class, test it locally, and have the Union.ai team enable it in your Union.ai deployment. Your teammates will then be able to create tasks of the corresponding task type to connect to the external service.

There are two types of connectors: async and sync.

  • Async connectors enable long-running jobs that execute on an external platform over time. They communicate with external services that have asynchronous APIs that support create, get, and delete operations. The vast majority of connectors are async connectors.
  • Sync connectors enable request/response services that return immediate outputs (e.g. calling an internal API to fetch data or communicating with the OpenAI API).

Connectors use a protobuf interface, so in principle they can be written in any programming language. However, we currently support only Python connectors. We may support other languages in the future.

Async connector interface specification

To create a new async connector, extend the AsyncConnectorBase class and implement the create, get, and delete methods. These methods must be idempotent.

  • create: This method is used to initiate a new job. Users have the flexibility to use gRPC, REST, or an SDK to create a job.
  • get: This method retrieves the job resource (job ID or output literal) associated with the task, such as a BigQuery job ID or Databricks task ID.
  • delete: Invoking this method will send a request to delete the corresponding job.

For an example implementation, see the BigQuery connector code.
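
The create, get, and delete contract described above can be sketched without any Flyte dependency. The following is a minimal illustration only: JobMeta, JobResource, FakeService, DemoAsyncConnector, and run_to_completion are hypothetical stand-ins invented for this sketch, not the AsyncConnectorBase API, and the method signatures of the real base class may differ. It shows one way to keep create and delete idempotent, by deriving the job ID from the execution ID and by treating repeated submits and cancels as no-ops.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class JobMeta:
    """Hypothetical metadata returned by create and passed to get/delete."""
    job_id: str


@dataclass
class JobResource:
    """Hypothetical job state returned by get."""
    phase: str  # "RUNNING", "SUCCEEDED", or "DELETED"
    outputs: Optional[dict] = None


class FakeService:
    """In-memory stand-in for an external platform with an asynchronous API."""

    def __init__(self, polls_until_done: int = 2) -> None:
        self.polls_until_done = polls_until_done
        self.remaining: Dict[str, int] = {}  # job_id -> polls left before done

    def submit(self, job_id: str) -> None:
        # setdefault keeps an existing job untouched, so resubmitting is a no-op.
        self.remaining.setdefault(job_id, self.polls_until_done)

    def status(self, job_id: str) -> str:
        if job_id not in self.remaining:
            return "DELETED"  # unknown or cancelled job
        if self.remaining[job_id] > 0:
            # Each status check simulates time passing on the platform.
            self.remaining[job_id] -= 1
            return "RUNNING"
        return "SUCCEEDED"

    def cancel(self, job_id: str) -> None:
        # pop with a default makes cancelling a missing job a no-op.
        self.remaining.pop(job_id, None)


class DemoAsyncConnector:
    """Toy connector exposing the create/get/delete trio."""

    def __init__(self, service: FakeService) -> None:
        self.service = service

    async def create(self, execution_id: str) -> JobMeta:
        # Deriving the job ID from the execution ID makes retries reuse the job.
        job_id = f"job-{execution_id}"
        self.service.submit(job_id)
        return JobMeta(job_id=job_id)

    async def get(self, meta: JobMeta) -> JobResource:
        phase = self.service.status(meta.job_id)
        outputs = {"job_id": meta.job_id} if phase == "SUCCEEDED" else None
        return JobResource(phase=phase, outputs=outputs)

    async def delete(self, meta: JobMeta) -> None:
        self.service.cancel(meta.job_id)


async def run_to_completion(connector: DemoAsyncConnector, execution_id: str) -> JobResource:
    """Create a job, then poll get until the job leaves the RUNNING phase."""
    meta = await connector.create(execution_id)
    while True:
        resource = await connector.get(meta)
        if resource.phase != "RUNNING":
            return resource
        await asyncio.sleep(0)  # a real caller would wait between polls


if __name__ == "__main__":
    demo = DemoAsyncConnector(FakeService())
    print(asyncio.run(run_to_completion(demo, "exec-1")))
```

In this sketch the polling loop plays the role of the caller: it calls create once, then get repeatedly until the job reaches a terminal phase. Calling create again with the same execution ID returns the same JobMeta and does not start a second job.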

Sync connector interface specification

To create a new sync connector, extend the SyncConnectorBase class and implement a do method. This method must be idempotent.

  • do: This method is used to execute the synchronous task, and the worker in Flyte will be blocked until the method returns.

For an example implementation, see the ChatGPT connector code.
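
The shape of a do method can be sketched in the same dependency-free way. Everything below is hypothetical: SyncResult and DemoAccessConnector are invented for illustration and are not the SyncConnectorBase API, whose actual signature may differ. The sketch models an access-control check, one of the synchronous use cases mentioned earlier, and is idempotent because the result depends only on the inputs and a fixed grant table.

```python
from dataclasses import dataclass
from typing import Dict, Set


@dataclass
class SyncResult:
    """Hypothetical result returned by do."""
    phase: str
    outputs: dict


class DemoAccessConnector:
    """Toy sync connector that answers an access-control question."""

    def __init__(self, grants: Dict[str, Set[str]]) -> None:
        self.grants = grants  # user -> resources the user may read

    def do(self, inputs: dict) -> SyncResult:
        # The answer depends only on the inputs and the grant table, so
        # repeating the call with the same inputs gives the same result.
        allowed = inputs["resource"] in self.grants.get(inputs["user"], set())
        return SyncResult(phase="SUCCEEDED", outputs={"allowed": allowed})


if __name__ == "__main__":
    demo = DemoAccessConnector({"alice": {"sales_db"}})
    # The caller is blocked here until do returns.
    print(demo.do({"user": "alice", "resource": "sales_db"}))
```

Unlike the async case, there is no job handle to poll or delete: the outputs are available as soon as the call returns.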

Testing your connector locally

To test your connector locally, create a class for the connector task that inherits from AsyncConnectorExecutorMixin. This mixin handles both asynchronous and synchronous tasks and allows Union to mimic FlytePropeller’s behavior in calling the connector.

For testing examples, see the BigQuery connector and Databricks connector documentation.
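
To make the idea of mimicking the caller in process more concrete, here is a rough sketch of what such a mixin could do. It is an assumption-laden illustration, not the implementation of AsyncConnectorExecutorMixin: LocalExecutorMixin, EchoSyncConnector, CountdownAsyncConnector, and the two task classes are hypothetical names, and the dispatch rule (call do if present, otherwise create and poll get) is a simplification chosen for this example.

```python
import time


class LocalExecutorMixin:
    """Hypothetical mixin that runs a connector in process."""

    connector = None       # set by the task class
    poll_interval = 0.01   # seconds between get calls

    def execute(self, **inputs):
        connector = self.connector
        if hasattr(connector, "do"):
            # Sync connector: a single blocking call.
            return connector.do(inputs)
        # Async connector: create the job, then poll until it finishes.
        job_id = connector.create(inputs)
        try:
            while True:
                phase, outputs = connector.get(job_id)
                if phase == "SUCCEEDED":
                    return outputs
                if phase != "RUNNING":
                    raise RuntimeError(f"job {job_id} ended in phase {phase}")
                time.sleep(self.poll_interval)
        except KeyboardInterrupt:
            # In this sketch, interrupting the local run cancels the job.
            connector.delete(job_id)
            raise


class EchoSyncConnector:
    """Toy sync connector: returns its inputs as outputs."""

    def do(self, inputs):
        return dict(inputs)


class CountdownAsyncConnector:
    """Toy async connector: reports RUNNING for a fixed number of polls."""

    def __init__(self, polls=2):
        self.polls = polls
        self.jobs = {}

    def create(self, inputs):
        # Same inputs map to the same job ID, so a repeated create is a no-op.
        job_id = "job-" + repr(sorted(inputs.items()))
        self.jobs.setdefault(job_id, {"left": self.polls, "inputs": dict(inputs)})
        return job_id

    def get(self, job_id):
        job = self.jobs[job_id]
        if job["left"] > 0:
            job["left"] -= 1
            return "RUNNING", None
        return "SUCCEEDED", job["inputs"]

    def delete(self, job_id):
        self.jobs.pop(job_id, None)


class EchoTask(LocalExecutorMixin):
    connector = EchoSyncConnector()


class CountdownTask(LocalExecutorMixin):
    connector = CountdownAsyncConnector()


if __name__ == "__main__":
    print(EchoTask().execute(x=1))
    print(CountdownTask().execute(x=2))
```

The point of the design is that the task class does not need to know which kind of connector backs it: local execution goes through the same entry point in both cases, without any backend deployment.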

Enabling your connector in your Union.ai deployment

After you have finished testing your connector locally, contact the Union.ai team to have it enabled in your Union.ai deployment so that you can use it in production.