Kevin Su

Flyte Agents: A Developer Perspective

Summary

In modern data-driven and machine-learning workflows, efficient orchestration and execution of tasks are crucial to productivity and scalability. To address these needs, we introduced Flyte Agents: long-running, stateless services designed to execute tasks efficiently. With agents, you can improve the efficiency and reliability of your data and machine-learning pipelines and scale your projects more easily.

In this blog post, we provide a step-by-step guide to developing custom agents and explore real-world use cases where Flyte Agents streamline complex workflows and accelerate data processing tasks. Whether you are a data scientist, ML engineer, or workflow enthusiast, this post will equip you with the knowledge and tools to use Flyte Agents in your projects.

Agents in two different modalities

Agents enable two ways of interacting with external services: async and sync.

Async agents enable long-running jobs that execute on an external platform. These agents communicate with external services whose asynchronous APIs support `create`, `get`, and `delete` operations. For example, you can use an async agent to run a BigQuery or Snowflake job, or to submit your Python task to a container executor such as MMCloud, Databricks, or the NVIDIA DGX platform instead of Kubernetes. You can also create custom sensors to monitor specific events or watch object stores like S3.

Sync agents handle request-response services that return outputs immediately. They are useful for connecting internal organizational services that supply data artifacts or metadata, or that act as workflow steps by issuing alerts. They also suit simpler external services that respond synchronously. Prime examples are sending Slack notifications and generating outputs from AI models, such as those behind OpenAI's APIs, based on provided prompts.

Creating and using custom agents

To streamline the development of these agents, we have established archetypes for implementing them. These archetypes cover four distinct types of tasks, each tailored to specific use cases.

SQL tasks

TL;DR Run SQL queries on any SQL Query Service (e.g., a data warehouse)

You can use an agent to execute a SQL query. The agent will dispatch the query job to a platform like BigQuery or Snowflake and monitor its progress until completion. 

To implement a SQL query agent, you must instruct Flyte on how to create and delete the query job, retrieve its status, and subsequently return the structured dataset as output to Flyte. Flyte will rely on the `create`, `get`, and `delete` methods to manage the job's lifecycle.

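import typing
from dataclasses import dataclass

from flytekit.extend.backend.base_agent import AsyncAgentBase, Resource, ResourceMeta
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate

@dataclass
class TaskMetadata(ResourceMeta):
    # Identifies the remote job across create, get, and delete calls.
    job_id: str

# submit_bigquery_job, get_job_status, and cancel_remote_job are placeholders
# for your own calls to the query service.
class BigQueryAgent(AsyncAgentBase):
    def __init__(self):
        super().__init__(task_type_name="bigquery", metadata_type=TaskMetadata)

    def create(
        self,
        task_template: TaskTemplate,
        inputs: typing.Optional[LiteralMap] = None,
        **kwargs,
    ) -> TaskMetadata:
        # Submit the query and return the metadata needed to track the job.
        job_id = submit_bigquery_job(inputs)
        return TaskMetadata(job_id=job_id)

    def get(self, resource_meta: TaskMetadata, **kwargs) -> Resource:
        # Report the job's current phase and any outputs it has produced.
        phase, outputs = get_job_status(resource_meta.job_id)
        return Resource(phase=phase, outputs=outputs)

    def delete(self, resource_meta: TaskMetadata, **kwargs):
        # Cancel the remote job.
        cancel_remote_job(resource_meta.job_id)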

Explore the BigQuery agent, which enables you to execute a query on BigQuery and convert the resulting table into a structured dataset.
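
To make the `create`, `get`, and `delete` contract more concrete, here is a minimal, standard-library-only sketch of how a caller might drive those three methods. It does not use Flyte's APIs: `FakeQueryAgent`, `JobMeta`, `JobState`, `Phase`, and `run_to_completion` are all hypothetical names invented for illustration, and in a real deployment Flyte itself drives the lifecycle rather than your code.

```python
import itertools
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Dict, List, Optional


class Phase(Enum):
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"


@dataclass
class JobMeta:
    job_id: str


@dataclass
class JobState:
    phase: Phase
    outputs: Optional[dict] = None


class FakeQueryAgent:
    """In-memory stand-in for an async agent; a job finishes after N polls."""

    def __init__(self, polls_until_done: int = 3) -> None:
        self._polls_until_done = polls_until_done
        self._remaining: Dict[str, int] = {}
        self._ids = itertools.count(1)
        self.deleted: List[str] = []

    def create(self, query: str) -> JobMeta:
        # Register a new job and hand back the metadata used to track it.
        job_id = f"job-{next(self._ids)}"
        self._remaining[job_id] = self._polls_until_done
        return JobMeta(job_id)

    def get(self, meta: JobMeta) -> JobState:
        # Each poll moves the fake job one step closer to completion.
        self._remaining[meta.job_id] -= 1
        if self._remaining[meta.job_id] > 0:
            return JobState(Phase.RUNNING)
        return JobState(Phase.SUCCEEDED, {"table": f"results/{meta.job_id}"})

    def delete(self, meta: JobMeta) -> None:
        # Forget the job and record that it was cancelled.
        self._remaining.pop(meta.job_id, None)
        self.deleted.append(meta.job_id)


def run_to_completion(
    agent: FakeQueryAgent,
    query: str,
    max_polls: int = 10,
    wait: Callable[[], None] = lambda: None,
) -> JobState:
    """Create a job, poll until it leaves RUNNING, and cancel it on timeout."""
    meta = agent.create(query)
    for _ in range(max_polls):
        state = agent.get(meta)
        if state.phase is not Phase.RUNNING:
            return state
        wait()  # e.g. a time.sleep between polls in real use
    agent.delete(meta)
    raise TimeoutError(f"{meta.job_id} still running after {max_polls} polls")
```

Calling `run_to_completion(FakeQueryAgent(), "SELECT 1")` returns a `JobState` in the `SUCCEEDED` phase after three polls. If the poll budget runs out first, the job is deleted before the error is raised, which mirrors why an agent needs all three methods.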

Container tasks

TL;DR Run your containers or jobs on an alternate platform that allows container execution

In certain scenarios, you may want to execute your Python tasks on platforms other than Kubernetes. For instance, deploying a job on Memory Machine Cloud can dynamically optimize cloud resources and ensure security by running stateful tasks on spot instances. Training an ML model on the NVIDIA DGX platform lets you harness its GPU capabilities. The Databricks agent lets you execute a Spark job on the Databricks platform with minimal configuration changes to your task.

Agents built for Container Tasks excel at creating and executing containers on remote services. Flyte's strength lies in its adept handling of containers, and with the use of ImageSpec, it simplifies the process of building container images. Prior to the introduction of agents, one of the challenges was reconciling local execution with remote execution for these dependent services. Agents streamline this local execution workflow by automatically building and deploying the necessary container to the remote service when required.

Sensor tasks

TL;DR Sense and observe external systems for state changes

You can utilize a sensor to monitor events in AWS SQS or Kafka, or even to check for the existence of files.

A sensor is a specialized abstraction built on top of the agent framework in Flyte. It is designed to simplify adding sensors to Flyte workflows. You only need to implement the `poke` method, which is responsible for verifying whether a specific condition has been met.

For example, if a sensor is tasked with monitoring the existence of a file or the completion status of a workflow, you can define the required logic within the `poke` function. This function should return `True` if the file exists or if the workflow has been completed.

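import fsspec
from fsspec.utils import get_protocol
from flytekit.sensor.base_sensor import BaseSensor

class FileSensor(BaseSensor):
    def __init__(self):
        super().__init__(task_type="file_sensor")

    def poke(self, path: str) -> bool:
        # Pick the filesystem (local, S3, ...) from the path's protocol,
        # then report whether the file exists yet.
        fs = fsspec.filesystem(get_protocol(path))
        return fs.exists(path)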
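
The idea behind `poke` can be illustrated without Flyte at all. The sketch below is a standard-library-only approximation of a polling loop that keeps calling a `poke`-style function until it returns `True` or a timeout elapses. The names `wait_until`, `file_exists`, and `demo`, as well as the interval and timeout values, are assumptions made for this example; it is not how Flyte schedules sensors internally.

```python
import os
import tempfile
import time
from typing import Callable, Tuple


def wait_until(
    poke: Callable[[], bool],
    *,
    interval_s: float = 0.01,
    timeout_s: float = 1.0,
) -> bool:
    """Call poke() repeatedly until it returns True or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        if poke():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)


def file_exists(path: str) -> Callable[[], bool]:
    """Build a poke function that checks for a file on the local disk."""
    return lambda: os.path.exists(path)


def demo() -> Tuple[bool, bool]:
    """Poke for a file before and after it is created."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "ready.flag")
        before = wait_until(file_exists(path), timeout_s=0.05)
        open(path, "w").close()  # simulate the upstream system writing the file
        after = wait_until(file_exists(path), timeout_s=0.05)
    return before, after
```

Here `demo()` polls once while the file is missing and once after it has been written, so the condition check stays a small, side-effect-free function and the waiting logic lives elsewhere.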

Synchronous API call tasks

TL;DR Invoke APIs and wait for an immediate response

To utilize a synchronous agent in your Flyte workflow, first identify tasks that involve making immediate API calls and receiving prompt responses. Ensure that these tasks align with the nature of synchronous operations, where the goal is for APIs to return within a short timeframe, ideally less than 5 seconds.

Tasks of this nature return results immediately and have no API for retrieving job status. As a result, the Flyte agent blocks on the API call until it returns. For example, the OpenAI agent is a synchronous agent that calls the ChatGPT API and returns model predictions immediately.

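from flyteidl.core.execution_pb2 import TaskExecution
from flytekit.extend.backend.base_agent import Resource, SyncAgentBase
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate

class OpenAIAgent(SyncAgentBase):
    def __init__(self):
        super().__init__(task_type_name="openai")

    def do(self, task_template: TaskTemplate, inputs: LiteralMap, **kwargs) -> Resource:
        # ask_chatgpt_question is a placeholder for the blocking API call.
        response = ask_chatgpt_question(inputs)
        return Resource(phase=TaskExecution.SUCCEEDED, outputs={"o0": response})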
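
Because a synchronous agent blocks until the call returns, it can help to enforce the short response window yourself. The following is a minimal sketch, using only the Python standard library, of one possible way to stop waiting on a blocking call after a deadline. `call_with_deadline`, `fast_api`, and `slow_api` are hypothetical helpers for this example and are not part of Flyte; the 5-second default simply echoes the guideline above.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from typing import Any, Callable


def call_with_deadline(fn: Callable[..., Any], *args: Any, timeout_s: float = 5.0) -> Any:
    """Run a blocking call in a worker thread and stop waiting after timeout_s.

    Raises TimeoutError if no result arrives in time. The worker thread is not
    killed; it keeps running in the background until fn returns.
    """
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn, *args)
    try:
        return future.result(timeout=timeout_s)
    except FutureTimeout:
        raise TimeoutError(f"call exceeded {timeout_s}s") from None
    finally:
        # Do not block on a worker that may still be running.
        pool.shutdown(wait=False)


def fast_api(prompt: str) -> str:
    # Hypothetical stand-in for a quick request-response service.
    return f"echo: {prompt}"


def slow_api(prompt: str) -> str:
    # Hypothetical stand-in for a service that takes too long to answer.
    time.sleep(0.3)
    return prompt
```

With these helpers, `call_with_deadline(fast_api, "hi")` returns the response directly, while `call_with_deadline(slow_api, "hi", timeout_s=0.05)` raises `TimeoutError`. A service that regularly hits the deadline is probably a better fit for an async agent.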

Conclusion

As Flyte adoption expands, we see users pushing boundaries by running increasingly intricate and time-sensitive tasks on the platform. Some of these tasks require extremely low latency for optimal performance. By leveraging agents, Flyte effectively mitigates the overhead associated with container creation, allowing the platform to cater to a broader range of real-world use cases. 

In this blog post, you have learned how to run different kinds of tasks on an agent and how to implement your own custom agent. If you're eager to contribute a new agent, refer to the "Developing agents" documentation. If you have any questions, don’t hesitate to reach out to us on Slack.
