Connectors
Connectors are stateless, long‑running services that receive execution requests via gRPC and then submit work to external (or internal) systems. Each connector runs as its own Kubernetes deployment, and is triggered when a Flyte task of the matching type is executed. For example: when a BigQueryTask is launched, the BigQuery connector receives the request and creates a BigQuery job.
The first connector for Flyte 2, the BigQuery connector (and the matching BigQueryTask), is in development and will be available soon.
Although they normally run inside the control plane, you can also run connectors locally — as long as the required secrets/credentials are present — because connectors are just Python services that can be spawned in‑process.
Connectors are designed to scale horizontally and reduce load on the core Flyte backend because they execute outside the core system. This decoupling makes connectors efficient, resilient, and easy to iterate on. You can even test them locally without modifying backend configuration, which reduces friction during development.
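The routing idea described above, where a task of a given type is handed to the connector registered for that type, can be sketched with a small in-memory registry. This is only an illustration of the pattern, not Flyte's actual dispatch code: `register`, `dispatch`, `submit_bigquery`, and the `"bigquery"` task type string are hypothetical names chosen for this example.

```python
import asyncio
from typing import Awaitable, Callable, Dict

# Hypothetical handler signature: takes a job spec, returns an external job id.
Handler = Callable[[dict], Awaitable[str]]

# Hypothetical registry mapping a task type to the handler that serves it.
_REGISTRY: Dict[str, Handler] = {}


def register(task_type: str):
    """Register a coroutine as the handler for one task type."""
    def decorator(fn: Handler) -> Handler:
        _REGISTRY[task_type] = fn
        return fn
    return decorator


@register("bigquery")  # assumed task type name, for illustration only
async def submit_bigquery(spec: dict) -> str:
    # A real connector would call the external service here.
    return f"bq-job-for:{spec['query']}"


async def dispatch(task_type: str, spec: dict) -> str:
    """Route a request to the handler registered for its task type."""
    try:
        handler = _REGISTRY[task_type]
    except KeyError:
        raise LookupError(f"no connector registered for task type {task_type!r}") from None
    return await handler(spec)
```

Because the core system only needs the task type to pick a handler, new connectors can be added without touching the code that routes requests.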
Creating a new connector
If none of the existing connectors meet your needs, you can build your own.
Connectors communicate via Protobuf, so in theory they can be implemented in any language. Today, only Python connectors are supported.
Async connector interface
To implement a new async connector, extend AsyncConnector and implement the following methods — all of which must be idempotent:
| Method | Purpose |
|---|---|
| `create` | Launch the external job (via REST, gRPC, SDK, or other API) |
| `get` | Fetch current job state (return job status or output) |
| `delete` | Delete / cancel the external job |
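To make the idempotency requirement concrete, here is a minimal sketch that uses an in-memory stand-in for an external job API. It does not use the Flyte SDK: `FakeJobService`, `IdempotentConnector`, and the idea of deriving the job ID from an execution ID are assumptions made for illustration. The point is that calling any of the three methods twice leaves the external system in the same state as calling it once.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class FakeJobService:
    """In-memory stand-in for an external job API (hypothetical)."""
    jobs: Dict[str, str] = field(default_factory=dict)  # job_id -> status

    async def submit(self, idempotency_key: str) -> str:
        # Reuse the existing job when the same key is submitted again.
        job_id = f"job-{idempotency_key}"
        self.jobs.setdefault(job_id, "running")
        return job_id

    async def status(self, job_id: str) -> str:
        # Reading state has no side effects, so repeated calls are safe.
        return self.jobs.get(job_id, "not_found")

    async def cancel(self, job_id: str) -> None:
        # Cancelling a missing or already stopped job is a no-op, not an error.
        if self.jobs.get(job_id) == "running":
            self.jobs[job_id] = "cancelled"


class IdempotentConnector:
    """Toy connector whose create/get/delete can each be retried safely."""

    def __init__(self, service: FakeJobService):
        self.service = service

    async def create(self, execution_id: str) -> str:
        return await self.service.submit(idempotency_key=execution_id)

    async def get(self, job_id: str) -> str:
        return await self.service.status(job_id)

    async def delete(self, job_id: str) -> None:
        await self.service.cancel(job_id)
```

With a real external API, the same effect usually comes from passing a client-generated request ID or from checking whether a job already exists before submitting a new one.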
To test the connector locally, the connector task should inherit from AsyncConnectorExecutorMixin. This mixin simulates how the Flyte system executes asynchronous connector tasks, making it easier to validate your connector implementation before deploying it.
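As a rough mental model of what such a local simulation involves, the sketch below drives a toy connector through the create, poll, and clean-up cycle. It is not the implementation of `AsyncConnectorExecutorMixin`; `Phase`, `CountdownConnector`, and `run_to_completion` are hypothetical names, and the polling interval and poll limit are arbitrary.

```python
import asyncio
from enum import Enum


class Phase(Enum):
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


class CountdownConnector:
    """Toy connector whose job succeeds after a fixed number of polls."""

    def __init__(self, polls_until_done: int):
        self.remaining = polls_until_done
        self.deleted = False

    async def create(self) -> str:
        return "job-1"

    async def get(self, job_id: str) -> Phase:
        if self.remaining > 0:
            self.remaining -= 1
            return Phase.RUNNING
        return Phase.SUCCEEDED

    async def delete(self, job_id: str) -> None:
        self.deleted = True


async def run_to_completion(connector, poll_interval: float = 0.01, max_polls: int = 100) -> Phase:
    """Create a job, poll get() until a terminal phase, and cancel on timeout."""
    job_id = await connector.create()
    for _ in range(max_polls):
        phase = await connector.get(job_id)
        if phase is not Phase.RUNNING:
            return phase
        await asyncio.sleep(poll_interval)
    # Give up: cancel the external job so it does not keep running unattended.
    await connector.delete(job_id)
    raise TimeoutError(f"{job_id} did not finish within {max_polls} polls")
```

Running a connector through a loop like this exercises repeated `get` calls and the `delete` path, which are the places where non-idempotent implementations tend to fail.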
```python
from dataclasses import dataclass
from flyte.connectors import AsyncConnector, Resource, ResourceMeta
from flyteidl2.core.execution_pb2 import TaskExecution, TaskLog
from flyteidl2.core.tasks_pb2 import TaskTemplate
from google.protobuf import json_format
import typing
import httpx


@dataclass
class ModelTrainJobMeta(ResourceMeta):
    job_id: str
    endpoint: str


class ModelTrainingConnector(AsyncConnector):
    """
    Example connector that launches a ML model training job on an external training service.

    POST → launch training job
    GET → poll training progress
    DELETE → cancel training job
    """

    name = "Model Training Connector"
    task_type_name = "external_model_training"
    metadata_type = ModelTrainJobMeta

    async def create(
        self,
        task_template: TaskTemplate,
        inputs: typing.Optional[typing.Dict[str, typing.Any]],
        **kwargs,
    ) -> ModelTrainJobMeta:
        """
        Submit training job via POST.
        Response returns job_id we later use in get().
        """
        custom = json_format.MessageToDict(task_template.custom) if task_template.custom else None
        async with httpx.AsyncClient() as client:
            r = await client.post(
                custom["endpoint"],
                json={"dataset_uri": inputs["dataset_uri"], "epochs": inputs["epochs"]},
            )
        r.raise_for_status()
        return ModelTrainJobMeta(job_id=r.json()["job_id"], endpoint=custom["endpoint"])

    async def get(self, resource_meta: ModelTrainJobMeta, **kwargs) -> Resource:
        """
        Poll external API until training job finishes.
        Must be safe to call repeatedly.
        """
        async with httpx.AsyncClient() as client:
            r = await client.get(f"{resource_meta.endpoint}/{resource_meta.job_id}")
        if r.status_code != 200:
            return Resource(phase=TaskExecution.RUNNING)
        data = r.json()
        if data["status"] == "finished":
            return Resource(
                phase=TaskExecution.SUCCEEDED,
                log_links=[TaskLog(name="training-dashboard", uri=f"https://example-mltrain.com/train/{resource_meta.job_id}")],
                outputs={"results": data["results"]},
            )
        return Resource(phase=TaskExecution.RUNNING)

    async def delete(self, resource_meta: ModelTrainJobMeta, **kwargs):
        """
        Optionally call DELETE on external API.
        Safe even if job already completed.
        """
        async with httpx.AsyncClient() as client:
            await client.delete(f"{resource_meta.endpoint}/{resource_meta.job_id}")
```

To actually use this connector, you must also define a task whose task_type matches the connector.
```python
import flyte.io
from typing import Any, Dict, Optional
from flyte.extend import TaskTemplate
from flyte.connectors import AsyncConnectorExecutorMixin
from flyte.models import NativeInterface, SerializationContext


class ModelTrainTask(AsyncConnectorExecutorMixin, TaskTemplate):
    _TASK_TYPE = "external_model_training"

    def __init__(
        self,
        name: str,
        endpoint: str,
        **kwargs,
    ):
        super().__init__(
            name=name,
            interface=NativeInterface(
                inputs={"epochs": int, "dataset_uri": str},
                outputs={"results": flyte.io.File},
            ),
            task_type=self._TASK_TYPE,
            **kwargs,
        )
        self.endpoint = endpoint

    def custom_config(self, sctx: SerializationContext) -> Optional[Dict[str, Any]]:
        return {"endpoint": self.endpoint}
```

Here is an example of how to use the ModelTrainTask:
```python
import flyte

env = flyte.TaskEnvironment(name="hello_world", resources=flyte.Resources(memory="250Mi"))

model_train_task = ModelTrainTask(
    name="model_train",
    endpoint="https://example-mltrain.com",
)


@env.task
def data_prep() -> str:
    return "gs://my-bucket/dataset.csv"


@env.task
def train_model(epochs: int) -> flyte.io.File:
    dataset_uri = data_prep()
    return model_train_task(epochs=epochs, dataset_uri=dataset_uri)
```

Build Connector Docker Image
When you are ready to deploy your connector to your cluster, build a custom Docker image for it by running the following script:
```python
import asyncio

from flyte import Image
from flyte.extend import ImageBuildEngine


async def build_flyte_connector_image(
    registry: str, name: str, builder: str = "local"
):
    """
    Build the SDK default connector image, optionally overriding
    the container registry and image name.

    Args:
        registry: e.g. "ghcr.io/my-org" or "123456789012.dkr.ecr.us-west-2.amazonaws.com".
        name: e.g. "my-connector".
        builder: e.g. "local" or "remote".
    """
    default_image = Image.from_debian_base(registry=registry, name=name).with_pip_packages(
        "flyteplugins-connectors[bigquery]", pre=True
    )
    await ImageBuildEngine.build(default_image, builder=builder)


if __name__ == "__main__":
    print("Building connector image...")
    asyncio.run(build_flyte_connector_image(registry="<YOUR_REGISTRY>", name="flyte-connectors", builder="local"))
```