Kevin Su

Faster Airflow to Flyte migration powered by Flyte Airflow Agents

Summary

Working with hundreds of organizations whose machine learning teams have migrated to Flyte, we have had the privilege of seeing data teams experience the value of a unified platform for both machine learning and data pipelines. Flyte powers this vision with features like rapid iteration, versioning, and containerization, streamlining workflows and fostering collaboration. It also scales effortlessly and offers cost-saving features like spot instances, caching, and the Flyte Agents framework, minimizing data orchestration expenses. Taken together, these features mean faster innovation, less complexity, and a single platform for all your data and machine learning requirements.

In the previous blog post, we discussed the reasons behind Porch's decision to migrate from Airflow to Flyte. In this blog post, we will provide you with a comprehensive, step-by-step guide for seamlessly migrating your workflows.

Why are migrations challenging?

Migration poses several challenges. Obtaining business sign-off can be difficult, especially when the process is prolonged by broader macro factors or competing team priorities. In our interactions with users, we have found that transitioning from Airflow to Flyte requires building any missing integrations and rewriting workflows. Many users have extensive pipelines accumulated over years on legacy Airflow (often version 1.10).

Flyte's superior data awareness, dynamic tasks and agents, and robust scalability are appealing. However, running Flyte and Airflow simultaneously during a migration can be a significant hurdle for teams.

At Union, we delved into this issue, consulting our community to understand their pain points. We discovered that most customers seek to shorten migration time for several reasons:

  • Minimize maintenance overhead from operating two systems concurrently.
  • Immediately capitalize on the new system's capabilities.
  • Mitigate the need for extensive migrations in case of dependencies on old Airflow pipelines.

Consequently, we developed the Flyte Airflow agents suite, enabling users to reuse their existing Airflow tasks while harnessing the capabilities of Flyte. The migration can then proceed gradually, prioritized by impact assessment.

Migrating your workflows with flytekitplugins-airflow

After considering our users' migration challenges, we developed the Airflow agent to enable the seamless integration of Airflow operators inside a Flyte workflow with minimal code rewrites. Flytekit automatically compiles Airflow tasks into Flyte tasks, which also lets you execute Airflow tasks locally without running an Airflow server.

When Airflow tasks run on a Flyte cluster, tasks that interact with external systems are executed on Flyte agents, eliminating unnecessary pod creation. Conversely, CPU-bound tasks, such as shell and Python tasks, are automatically executed in containers by Flyte. This lets you complete a fast initial migration to Flyte as your central, scalable, and reliable orchestration platform and compute engine. Once the switchover is complete, rewriting the pipelines can proceed piecemeal, guided by impact analysis; because Flyte workflows are completely isolated from each other, gradual migration is possible.

Here is an example of using the Airflow `BashOperator` inside a Flyte workflow, without using the Flytekit shell task:

from airflow.operators.bash import BashOperator
from flytekit import workflow

@workflow
def airflow_wf():
    BashOperator(task_id="airflow_bash_operator", bash_command="echo hello")

The next example demonstrates executing a Spark job on GCP. Begin by using `DataprocCreateClusterOperator` to create a Dataproc cluster. Next, utilize the `DataprocSubmitSparkJobOperator` to submit your Spark job. Finally, upon job completion, remove the cluster by using `DataprocDeleteClusterOperator`.

Note: You can also run the example below locally, as long as you have Google credentials configured on your machine.

from datetime import timedelta

from flytekit import workflow
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator, DataprocDeleteClusterOperator, DataprocSubmitSparkJobOperator

cluster_name = "flyte-dataproc-demo"

@workflow
def wf():
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        image_version="2.0.27-debian10",
        storage_bucket="my-bucket",
        master_machine_type="n1-highmem-32",
        master_disk_size=1024,
        num_workers=2,
        worker_machine_type="n1-highmem-64",
        worker_disk_size=1024,
        region="us-west1",
        cluster_name=cluster_name,
        project_id="flyte",
    )

    run_spark = DataprocSubmitSparkJobOperator(
        job_name="spark_wordcount",
        task_id="run_spark",
        dataproc_jars=["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        main_class="org.apache.spark.examples.JavaWordCount",
        arguments=["gs://my-bucket/spark/file.txt"],
        cluster_name=cluster_name,
        region="us-west1",
        project_id="flyte",
    )

    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        project_id="flyte",
        cluster_name=cluster_name,
        region="us-west1",
        retries=3,
        retry_delay=timedelta(minutes=5),
        email_on_failure=True,
    )

    create_cluster >> run_spark >> delete_cluster

A combination of Airflow operators and Flyte tasks

The Flyte Airflow agent is designed not only to facilitate the migration of workflows for existing Airflow users but also to expand the capabilities of existing Flyte users. By leveraging Airflow operators, Flyte users gain the flexibility to incorporate a wider range of tasks into their workflows, enhancing the versatility and utility of the Flyte platform.

The workflow below sends a Slack notification (Airflow task) once model training (Flyte task) has been completed:

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from flytekit import task, workflow


@task(container_image=image_spec)  # image_spec: your ImageSpec, defined elsewhere
def train():
    ...
    model_training()


@workflow
def wf():
    slack = SlackWebhookOperator(
        task_id="slack-webhook",
        slack_webhook_conn_id="slack",
        message="Training is done!",
        channel="ML team"
    )
    train() >> slack

While flytekitplugins-airflow streamlines the migration of Airflow tasks with minimal code changes, note that it may not support every DAG feature, such as trigger rules and XCom. Our goal was to simplify migration and modernize users' existing Airflow DAGs, not to replace Flyte's comprehensive DSL; transitioning to Flyte's native constructs should ultimately be the goal of every migration.

For current Flyte users, this plugin brings exciting new integrations from Airflow that immediately expand capabilities.

Conclusion

If you're an Airflow user seeking to transition to Flyte, the Flyte Airflow agent serves as a valuable tool by simplifying the migration process. This plugin allows you to seamlessly integrate familiar Airflow operators into Flyte workflows, ensuring continuity and incremental migration. Moreover, you can capitalize on the benefits offered by Flyte, such as scalability, workflow versioning, and local execution, further enhancing your workflow management experience.

We’ve tested many Airflow operators, ensuring their seamless functionality within the Flyte cluster. If you encounter any specific cases requiring adjustments, our supportive community is here to help. Just raise a GitHub issue or connect with us on Slack.
