Other task types

Task types include:

  • PythonFunctionTask: This Python class represents the standard default task. It is the type that is created when you use the @fl.task decorator.
  • ContainerTask: This Python class represents a raw container. It allows you to use any image you like, giving you complete control of the task.
  • Shell tasks: Use them to execute bash scripts within Flyte.
  • Specialized plugin tasks: These include both specialized classes and specialized configurations of the PythonFunctionTask. They implement integrations with third-party systems.

PythonFunctionTask

This is the task type that is created when you add the @fl.task decorator to a Python function. It represents a Python function that will be run within a single container. For example:

import flytekit as fl
import pandas as pd
from sklearn.datasets import load_wine

@fl.task
def get_data() -> pd.DataFrame:
    """Get the wine dataset."""
    return load_wine(as_frame=True).frame

See the Python Function Task example.

This is the most common task variant and the one that, thus far, we have focused on in this documentation.
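
Like any other task, it can be composed into a workflow; a minimal sketch (the workflow name here is illustrative):

@fl.workflow
def wine_wf() -> pd.DataFrame:
    # get_data runs as a task node; its output is passed along as a promise.
    return get_data()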

ContainerTask

This task variant represents a raw container, with no assumptions made about what is running within it. Here is an example of declaring a ContainerTask:

from flytekit import ContainerTask, kwtypes

greeting_task = ContainerTask(
    name="echo_and_return_greeting",
    image="alpine:latest",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(name=str),
    outputs=kwtypes(greeting=str),
    command=["/bin/sh", "-c", "echo 'Hello, my name is {{.inputs.name}}.' | tee -a /var/outputs/greeting"],
)

The ContainerTask enables you to include a task in your workflow that executes arbitrary code in any language, not just Python.
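
Like a Python task, a ContainerTask is invoked from a workflow with keyword arguments; a minimal sketch (the workflow name here is illustrative):

from flytekit import workflow

@workflow
def greeting_wf(name: str = "flyte") -> str:
    # The declared inputs and outputs form the task's interface, so the call
    # returns the single declared output, greeting.
    return greeting_task(name=name)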

In the following example, the tasks calculate the area of an ellipse. The name of each task must be unique within the project. You can specify:

  • input_data_dir: the directory to which Flyte writes the task's inputs.
  • output_data_dir: the directory in which Flyte expects the task to place its outputs.

The inputs and outputs parameters define the interface for the task; each should be an ordered dictionary of typed variables (created here with kwtypes).

The image field specifies the container image for the task, either as an image name or an ImageSpec. To access a file that is not included in the image, use ImageSpec to copy files or directories into the container's /root directory, as sketched below.
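
A minimal sketch of such an ImageSpec, assuming a flytekit version whose ImageSpec supports the copy argument; the registry, package, and copied path are illustrative:

from flytekit import ImageSpec

ellipse_image = ImageSpec(
    name="ellipse-area",
    registry="ghcr.io/your-org",                 # illustrative registry
    packages=["numpy"],                          # illustrative dependency
    copy=["scripts/calculate-ellipse-area.py"],  # copied into /root of the image
)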

Caching can be enabled for a ContainerTask by configuring the cache settings in a TaskMetadata object passed to the metadata parameter:

from flytekit import ContainerTask, TaskMetadata, kwtypes, workflow

calculate_ellipse_area_haskell = ContainerTask(
    name="ellipse-area-metadata-haskell",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-haskell:v2",
    command=[
        "./calculate-ellipse-area",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
    metadata=TaskMetadata(cache=True, cache_version="1.0"),
)

calculate_ellipse_area_julia = ContainerTask(
    name="ellipse-area-metadata-julia",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    image="ghcr.io/flyteorg/rawcontainers-julia:v2",
    command=[
        "julia",
        "calculate-ellipse-area.jl",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
    metadata=TaskMetadata(cache=True, cache_version="1.0"),
)

@workflow
def wf(a: float, b: float):
    area_haskell, metadata_haskell = calculate_ellipse_area_haskell(a=a, b=b)
    area_julia, metadata_julia = calculate_ellipse_area_julia(a=a, b=b)

See the Container Task example.

Shell tasks

Shell tasks enable the execution of shell scripts within Flyte. To create a shell task, provide a name for it, specify the bash script to be executed, and define inputs and outputs if needed:

Example

from pathlib import Path
from typing import Tuple

import flytekit as fl
from flytekit import kwtypes
from flytekit.extras.tasks.shell import OutputLocation, ShellTask
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile

t1 = ShellTask(
    name="task_1",
    debug=True,
    script="""
    set -ex
    echo "Hey there! Let's run some bash scripts using a shell task."
    echo "Showcasing shell tasks." >> {inputs.x}
    if grep "shell" {inputs.x}
    then
        echo "Found it!" >> {inputs.x}
    else
        echo "Not found!"
    fi
    """,
    inputs=kwtypes(x=FlyteFile),
    output_locs=[OutputLocation(var="i", var_type=FlyteFile, location="{inputs.x}")],
)


t2 = ShellTask(
    name="task_2",
    debug=True,
    script="""
    set -ex
    cp {inputs.x} {inputs.y}
    tar -zcvf {outputs.j} {inputs.y}
    """,
    inputs=kwtypes(x=FlyteFile, y=FlyteDirectory),
    output_locs=[OutputLocation(var="j", var_type=FlyteFile, location="{inputs.y}.tar.gz")],
)


t3 = ShellTask(
    name="task_3",
    debug=True,
    script="""
    set -ex
    tar -zxvf {inputs.z}
    cat {inputs.y}/$(basename {inputs.x}) | wc -m > {outputs.k}
    """,
    inputs=kwtypes(x=FlyteFile, y=FlyteDirectory, z=FlyteFile),
    output_locs=[OutputLocation(var="k", var_type=FlyteFile, location="output.txt")],
)

Here’s a breakdown of the parameters of the ShellTask:

  • The inputs parameter specifies the types of inputs that the task will accept.
  • The output_locs parameter defines the output locations, which can be FlyteFile or FlyteDirectory.
  • The script parameter contains the actual bash script that will be executed ({inputs.x}, {outputs.j}, etc. are replaced with the actual input and output values at runtime).
  • The debug parameter is helpful for debugging purposes.

We define a task that instantiates a FlyteFile and a FlyteDirectory. A .gitkeep file is created inside the FlyteDirectory as a placeholder to ensure the directory exists:

@fl.task
def create_entities() -> Tuple[fl.FlyteFile, fl.FlyteDirectory]:
    working_dir = Path(fl.current_context().working_directory)
    flytefile = working_dir / "test.txt"
    flytefile.touch()

    flytedir = working_dir / "testdata"
    flytedir.mkdir(exist_ok=True)

    flytedir_file = flytedir / ".gitkeep"
    flytedir_file.touch()
    return flytefile, flytedir

We create a workflow to define the dependencies between the tasks:

@fl.workflow
def shell_task_wf() -> fl.FlyteFile:
    x, y = create_entities()
    t1_out = t1(x=x)
    t2_out = t2(x=t1_out, y=y)
    t3_out = t3(x=x, y=y, z=t2_out)
    return t3_out

You can run the workflow locally:

if __name__ == "__main__":
    print(f"Running shell_task_wf() {shell_task_wf()}")

Specialized plugin task classes and configs

Flyte supports a wide variety of plugin tasks. Some of these are enabled as specialized task classes, others as specialized configurations of the default @fl.task (PythonFunctionTask).

They enable things like:

  • Querying external databases (AWS Athena, BigQuery, DuckDB, SQL, Snowflake, Hive).
  • Executing specialized processing right in Flyte (Spark in a virtual cluster, Dask in a virtual cluster, SageMaker, Airflow, Modin, Ray, MPI and Horovod).
  • Handing off processing to external services (AWS Batch, Spark on Databricks, Ray on an external cluster).
  • Data transformation (Great Expectations, dbt, Dolt, ONNX, Pandera).
  • Data tracking and presentation (MLflow, Papermill).
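
As one illustration of a specialized configuration, the Spark plugin turns an ordinary Python task into a Spark job through the task_config parameter. A minimal sketch, assuming the flytekitplugins-spark plugin is installed; the settings and the estimator function are illustrative:

import random

import flytekit as fl
from flytekitplugins.spark import Spark


@fl.task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.instances": "2",
        }
    )
)
def estimate_pi(partitions: int) -> float:
    # The plugin provides a Spark session through the execution context.
    sess = fl.current_context().spark_session
    n = 100_000 * partitions
    count = (
        sess.sparkContext.parallelize(range(n), partitions)
        .filter(lambda _: random.random() ** 2 + random.random() ** 2 <= 1)
        .count()
    )
    return 4.0 * count / n

Other plugins follow the same pattern: either a plugin-specific config object passed to task_config, or a dedicated task class.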

See the Integration section for examples.