0.1.198

union

Directory

Classes

Class Description
ActorEnvironment ActorEnvironment class.
Artifact This is a wrapper around the Flytekit Artifact class.
Cache Cache configuration for a task.
ContainerTask This is an intermediate class that represents Flyte Tasks that run a container at execution time.
Deck Deck enables users to get customizable and default visibility into their tasks.
FlyteDirectory
FlyteFile
ImageSpec This class is used to specify the docker image that will be used to run the task.
LaunchPlan Launch Plans are one of the core constructs of Flyte.
PodTemplate Custom PodTemplate specification for a Task.
Resources This class is used to specify both resource requests and resource limits.
Secret See the cookbook secrets section for usage examples.
StructuredDataset This is the user facing StructuredDataset class.
UnionRemote Main entrypoint for programmatically accessing a Flyte remote backend.
VersionParameters Parameters used for version hash generation.

Protocols

Protocol Description
CachePolicy Base class for protocol classes.

Methods

Method Description
actor_cache() Cache function between actor executions.
current_context() Use this method to get a handle of specific parameters available in a flyte task.
map() Use to map over tasks, actors, launch plans, reference tasks and launch plans, and remote tasks and launch plans.
map_task() Wrapper that creates a map task utilizing either the existing ArrayNodeMapTask or the drop-in replacement ArrayNode implementation.
task() This is the core decorator to use for any task type in flytekit.
workflow() This decorator declares a function to be a Flyte workflow.

Methods

actor_cache()

def actor_cache(
    f,
)

Cache function between actor executions.

Parameter Type Description
f
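
A minimal sketch (assuming actor_cache is importable from the top-level union package, as the method table above suggests), memoizing an expensive loader so it runs once per actor replica:

from union import actor_cache

@actor_cache
def load_model(path: str):
    # Runs once per actor replica; later executions on the same replica
    # reuse the returned object. expensive_load is a hypothetical helper.
    return expensive_load(path)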

current_context()

def current_context()

Use this method to get a handle of specific parameters available in a flyte task.

Usage

flytekit.current_context().logging.info(...)

Available params are documented in flytekit.core.context_manager.ExecutionParams. There are some special params that should be available.
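
A short sketch of reading the context inside a task (using the task decorator documented later on this page):

from flytekit import current_context, task

@task
def log_execution_id() -> str:
    ctx = current_context()
    ctx.logging.info("task is running")
    # execution_id is one of the documented ExecutionParams fields
    return str(ctx.execution_id)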

map()

def map(
    target: typing.Union[flytekit.core.launch_plan.LaunchPlan, flytekit.core.python_function_task.PythonFunctionTask, ForwardRef('FlyteLaunchPlan')],
    bound_inputs: typing.Optional[typing.Dict[str, typing.Any]],
    concurrency: typing.Optional[int],
    min_successes: typing.Optional[int],
    min_success_ratio: float,
    kwargs,
)

Use to map over tasks, actors, launch plans, reference tasks and launch plans, and remote tasks and launch plans.

Parameter Type Description
target typing.Union[flytekit.core.launch_plan.LaunchPlan, flytekit.core.python_function_task.PythonFunctionTask, ForwardRef('FlyteLaunchPlan')] The Flyte entity that will be mapped over
bound_inputs typing.Optional[typing.Dict[str, typing.Any]] Inputs that are bound to the array node and will not be mapped over
concurrency typing.Optional[int] If specified, this limits the number of mapped tasks that can run in parallel to the given batch size. If the size of the input exceeds the concurrency value, then multiple batches will be run serially until all inputs are processed. If set to 0, this means unbounded concurrency. If left unspecified, the array node will inherit parallelism from the workflow
min_successes typing.Optional[int] The minimum number of successful executions
min_success_ratio float The minimum ratio of successful executions
kwargs **kwargs

map_task()

def map_task(
    target: typing.Union[flytekit.core.launch_plan.LaunchPlan, flytekit.core.python_function_task.PythonFunctionTask, ForwardRef('FlyteLaunchPlan')],
    concurrency: typing.Optional[int],
    min_successes: typing.Optional[int],
    min_success_ratio: float,
    kwargs,
)

Wrapper that creates a map task utilizing either the existing ArrayNodeMapTask or the drop-in replacement ArrayNode implementation.

Parameter Type Description
target typing.Union[flytekit.core.launch_plan.LaunchPlan, flytekit.core.python_function_task.PythonFunctionTask, ForwardRef('FlyteLaunchPlan')] The Flyte entity that will be mapped over
concurrency typing.Optional[int] If specified, this limits the number of mapped tasks that can run in parallel to the given batch size. If the size of the input exceeds the concurrency value, then multiple batches will be run serially until all inputs are processed. If set to 0, this means unbounded concurrency. If left unspecified, the array node will inherit parallelism from the workflow
min_successes typing.Optional[int] The minimum number of successful executions
min_success_ratio float The minimum ratio of successful executions
kwargs **kwargs
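
As a usage sketch (a hypothetical example; the task and workflow names are illustrative):

from flytekit import map_task, task, workflow

@task
def square(x: int) -> int:
    return x * x

@workflow
def wf(xs: list[int]) -> list[int]:
    # Limit to 2 mapped tasks running in parallel.
    return map_task(square, concurrency=2)(x=xs)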

task()

def task(
    _task_function: Optional[Callable[P, FuncOut]],
    task_config: Optional[T],
    cache: Union[bool, Cache],
    retries: int,
    interruptible: Optional[bool],
    deprecated: str,
    timeout: Union[datetime.timedelta, int],
    container_image: Optional[Union[str, ImageSpec]],
    environment: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    secret_requests: Optional[List[Secret]],
    execution_mode: PythonFunctionTask.ExecutionBehavior,
    node_dependency_hints: Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]],
    task_resolver: Optional[TaskResolverMixin],
    docs: Optional[Documentation],
    disable_deck: Optional[bool],
    enable_deck: Optional[bool],
    deck_fields: Optional[Tuple[DeckField, ...]],
    pod_template: Optional['PodTemplate'],
    pod_template_name: Optional[str],
    accelerator: Optional[BaseAccelerator],
    pickle_untyped: bool,
    shared_memory: Optional[Union[L[True], str]],
    resources: Optional[Resources],
    labels: Optional[dict[str, str]],
    annotations: Optional[dict[str, str]],
    kwargs,
) -> Union[Callable[P, FuncOut], Callable[[Callable[P, FuncOut]], PythonFunctionTask[T]], PythonFunctionTask[T]]

This is the core decorator to use for any task type in flytekit.

Tasks are the building blocks of Flyte. They represent user code. Tasks have the following properties:

  • Versioned (usually tied to the git revision SHA1)
  • Strong interfaces (specified inputs and outputs)
  • Declarative
  • Independently executable
  • Unit testable

For a simple python task,

@task
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...

For specific task types

@task(task_config=Spark(), retries=3)
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...

Please see the cookbook task examples for additional information.

Parameter Type Description
_task_function Optional[Callable[P, FuncOut]] This argument is implicitly passed and represents the decorated function
task_config Optional[T] This argument provides configuration for a specific task types. Please refer to the plugins documentation for the right object to use.
cache Union[bool, Cache] Boolean or Cache that indicates how caching is configured. The deprecated parameters cache_serialize, cache_version, and cache_ignore_input_vars are superseded by Cache: cache_serialize indicated that identical (i.e. same inputs) instances of this task should be executed in serial when caching is enabled, so that given multiple concurrent executions over identical inputs, only a single instance executes and the rest wait to reuse the cached results (this did nothing without also setting the cache parameter); cache_version was the cache version to use, where changes to the task signature automatically trigger a cache miss, but you can manually bump the version to force one, for example when the function body or business logic has changed but the signature hasn't; cache_ignore_input_vars listed input variables that should not be included when calculating the hash for the cache key.
retries int Number of times to retry this task during a workflow execution.
interruptible Optional[bool] [Optional] Boolean that indicates that this task can be interrupted and/or scheduled on nodes with lower QoS guarantees. This will directly reduce the $/execution cost associated, at the cost of performance penalties due to potential interruptions. Requires additional Flyte platform level configuration. If no value is provided, the task will inherit this attribute from its workflow, as follows: if neither the task nor the workflow sets interruptible, the task is not interruptible; if the task sets interruptible=True and the workflow sets no value, the task is interruptible; if the workflow sets interruptible=True and the task sets no value, the task is interruptible; if the workflow sets interruptible=False but the task sets interruptible=True, the task is interruptible; if the workflow sets interruptible=True but the task sets interruptible=False, the task is not interruptible.
deprecated str A string that can be used to provide a warning message for deprecated task. Absence / empty str indicates that the task is active and not deprecated
timeout Union[datetime.timedelta, int] the max amount of time for which one execution of this task should be executed for. The execution will be terminated if the runtime exceeds the given timeout (approximately).
container_image Optional[Union[str, ImageSpec]] By default the configured FLYTE_INTERNAL_IMAGE is used for every task. This directive can be used to provide an alternate image for a specific task. This is useful for the cases in which images bloat because of various dependencies and a dependency is only required for this task or a set of tasks, and they vary from the default. For example, container_image='{{.images.default.fqn}}:tag-{{images.default.tag}}' keeps the default image name fqn but prefixes the tag with tag- (this assumes that an image like flytecookbook:tag-gitsha is published alongside the default flytecookbook:gitsha), while container_image='{{.images.xyz.fqn}}:{{images.default.tag}}' looks up an image named xyz; refer to the configuration documentation to configure fqns for images besides the default.
environment Optional[Dict[str, str]] Environment variables that should be added for this tasks execution
requests Optional[Resources] Specify compute resource requests for your task. For Pod-plugin tasks, these values will apply only to the primary container.
limits Optional[Resources] Compute limits. Specify compute resource limits for your task. For Pod-plugin tasks, these values will apply only to the primary container. For more information, please see flytekit.Resources.
secret_requests Optional[List[Secret]] Keys that can identify the secrets supplied at runtime. Ideally the secret keys should also be semi-descriptive. The key values will be available at runtime, if the backend is configured to provide secrets and if secrets are available in the configured secrets store. Possible options for secret stores are Vault, Confidant, Kube secrets, AWS KMS, etc. Refer to Secret to understand how to specify the request for a secret; it may change based on the backend provider. Note that during local execution, the secrets will be pulled from the local environment variables with the format {GROUP}_{GROUP_VERSION}_{KEY}, where all the characters are capitalized and the prefix is not used.
execution_mode PythonFunctionTask.ExecutionBehavior This is mainly for internal use. Please ignore. It is filled in automatically.
node_dependency_hints Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]] A list of tasks, launch plans, or workflows that this task depends on. This is only for dynamic tasks/workflows, where flyte cannot automatically determine the dependencies prior to runtime. Even on dynamic tasks this is optional, but in some scenarios it will make registering the workflow easier, because it allows registration to be done the same as for static tasks/workflows. For example, this is useful to run launch plans dynamically, because launch plans must be registered on flyteadmin before they can be run (tasks and workflows do not have this requirement): given launchplan0 = LaunchPlan.get_or_create(workflow0), declaring @dynamic(node_dependency_hints=[launchplan0]) on a dynamic task that returns [launchplan0] * 10 ensures launchplan0 is registered on flyteadmin even though it is only referenced from the dynamic body.
task_resolver Optional[TaskResolverMixin] Provide a custom task resolver.
docs Optional[Documentation] Documentation about this task
disable_deck Optional[bool] If true, this task will not output deck html file
enable_deck Optional[bool] If true, this task will output deck html file
deck_fields Optional[Tuple[DeckField, ...]] If specified and enable_deck is True, this task will output deck html file with the fields specified in the tuple
pod_template Optional['PodTemplate'] Custom PodTemplate for this task.
pod_template_name Optional[str] The name of the existing PodTemplate resource which will be used in this task.
accelerator Optional[BaseAccelerator] The accelerator to use for this task.
pickle_untyped bool Boolean that indicates if the task allows unspecified data types.
shared_memory Optional[Union[L[True], str]] If True, then shared memory will be attached to the container where the size is equal to the allocated memory. If int, then the shared memory is set to that size.
resources Optional[Resources] Specify both the request and the limit. When the value is set to a tuple or list, the first value is the request and the second value is the limit. If the value is a single value, then both the request and the limit are set to that value. For example, Resources(cpu=("1", "2"), mem="1Gi") will set the cpu request to 1, cpu limit to 2, and mem request to 1Gi.
labels Optional[dict[str, str]] Labels to be applied to the task resource.
annotations Optional[dict[str, str]] Annotations to be applied to the task resource.
kwargs **kwargs
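
Putting several of these parameters together, a hedged sketch (the Cache and Resources classes are the ones documented on this page; values are illustrative):

from datetime import timedelta

from flytekit import task
from union import Cache, Resources

@task(
    cache=Cache(version="1.0"),
    retries=3,
    interruptible=True,
    timeout=timedelta(minutes=10),
    requests=Resources(cpu="1", mem="500Mi"),
    limits=Resources(cpu="2", mem="1Gi"),
    environment={"LOG_LEVEL": "info"},
)
def transform(x: int) -> int:
    return x + 1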

workflow()

def workflow(
    _workflow_function: Optional[Callable[P, FuncOut]],
    failure_policy: Optional[WorkflowFailurePolicy],
    interruptible: bool,
    on_failure: Optional[Union[WorkflowBase, Task]],
    docs: Optional[Documentation],
    pickle_untyped: bool,
    default_options: Optional[Options],
) -> Union[Callable[P, FuncOut], Callable[[Callable[P, FuncOut]], PythonFunctionWorkflow], PythonFunctionWorkflow]

This decorator declares a function to be a Flyte workflow. Workflows are declarative entities that construct a DAG of tasks using the data flow between tasks.

Unlike a task, the function body of a workflow is evaluated at serialization-time (aka compile-time). This is because while we can determine the entire structure of a task by looking at the function’s signature, workflows need to run through the function itself because the body of the function is what expresses the workflow structure. It’s also important to note that, local execution notwithstanding, it is not evaluated again when the workflow runs on Flyte. That is, workflows should not call non-Flyte entities since they are only run once (again, this is with respect to the platform, local runs notwithstanding).

Example:

import os
import sys
import typing
from collections import OrderedDict
from unittest.mock import patch

import pytest
from typing_extensions import Annotated  # type: ignore

import flytekit.configuration
from flytekit import FlyteContextManager, StructuredDataset, kwtypes
from flytekit.configuration import Image, ImageConfig
from flytekit.core import context_manager
from flytekit.core.condition import conditional
from flytekit.core.task import task
from flytekit.core.workflow import WorkflowFailurePolicy, WorkflowMetadata, WorkflowMetadataDefaults, workflow
from flytekit.exceptions.user import FlyteValidationException, FlyteValueException, FlyteMissingReturnValueException
from flytekit.tools.translator import get_serializable
from flytekit.types.error.error import FlyteError

default_img = Image(name="default", fqn="test", tag="tag")
serialization_settings = flytekit.configuration.SerializationSettings(
    project="project",
    domain="domain",
    version="version",
    env=None,
    image_config=ImageConfig(default_image=default_img, images=[default_img]),
)

def test_metadata_values():
    with pytest.raises(FlyteValidationException):
        WorkflowMetadata(on_failure=0)

    wm = WorkflowMetadata(on_failure=WorkflowFailurePolicy.FAIL_IMMEDIATELY)
    assert wm.on_failure == WorkflowFailurePolicy.FAIL_IMMEDIATELY


def test_default_metadata_values():
    with pytest.raises(FlyteValidationException):
        WorkflowMetadataDefaults(3)

    wm = WorkflowMetadataDefaults(interruptible=False)
    assert wm.interruptible is False


def test_workflow_values():
    @task
    def t1(a: int) -> typing.NamedTuple("OutputsBC", [("t1_int_output", int), ("c", str)]):
        a = a + 2
        return a, "world-" + str(a)

    @workflow(interruptible=True, failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
    def wf(a: int) -> typing.Tuple[str, str]:
        x, y = t1(a=a)
        _, v = t1(a=x)
        return y, v

    wf_spec = get_serializable(OrderedDict(), serialization_settings, wf)
    assert wf_spec.template.metadata_defaults.interruptible
    assert wf_spec.template.metadata.on_failure == 1

def test_default_values():
    @task
    def t() -> bool:
        return True

    @task
    def f() -> bool:
        return False

    @workflow
    def wf(a: bool = True) -> bool:
        return conditional("bool").if_(a.is_true()).then(t()).else_().then(f())  # type: ignore

    assert wf() is True
    assert wf(a=False) is False


def test_list_output_wf():
    @task
    def t1(a: int) -> int:
        a = a + 5
        return a

    @workflow
    def list_output_wf() -> typing.List[int]:
        v = []
        for i in range(2):
            v.append(t1(a=i))
        return v

    x = list_output_wf()
    assert x == [5, 6]


def test_sub_wf_single_named_tuple():
    nt = typing.NamedTuple("SingleNamedOutput", [("named1", int)])

    @task
    def t1(a: int) -> nt:
        a = a + 2
        return nt(a)

    @workflow
    def subwf(a: int) -> nt:
        return t1(a=a)

    @workflow
    def wf(b: int) -> nt:
        out = subwf(a=b)
        return t1(a=out.named1)

    x = wf(b=3)
    assert x == (7,)


def test_sub_wf_multi_named_tuple():
    nt = typing.NamedTuple("Multi", [("named1", int), ("named2", int)])

    @task
    def t1(a: int) -> nt:
        a = a + 2
        return nt(a, a)

    @workflow
    def subwf(a: int) -> nt:
        return t1(a=a)

    @workflow
    def wf(b: int) -> nt:
        out = subwf(a=b)
        return t1(a=out.named1)

    x = wf(b=3)
    assert x == (7, 7)


def test_sub_wf_varying_types():
    @task
    def t1l(
        a: typing.List[typing.Dict[str, typing.List[int]]],
        b: typing.Dict[str, typing.List[int]],
        c: typing.Union[typing.List[typing.Dict[str, typing.List[int]]], typing.Dict[str, typing.List[int]], int],
        d: int,
    ) -> str:
        xx = ",".join([f"{k}:{v}" for d in a for k, v in d.items()])
        yy = ",".join([f"{k}: {i}" for k, v in b.items() for i in v])
        if isinstance(c, list):
            zz = ",".join([f"{k}:{v}" for d in c for k, v in d.items()])
        elif isinstance(c, dict):
            zz = ",".join([f"{k}: {i}" for k, v in c.items() for i in v])
        else:
            zz = str(c)
        return f"First: {xx} Second: {yy} Third: {zz} Int: {d}"

    @task
    def get_int() -> int:
        return 1

    @workflow
    def subwf(
        a: typing.List[typing.Dict[str, typing.List[int]]],
        b: typing.Dict[str, typing.List[int]],
        c: typing.Union[typing.List[typing.Dict[str, typing.List[int]]], typing.Dict[str, typing.List[int]]],
        d: int,
    ) -> str:
        return t1l(a=a, b=b, c=c, d=d)

    @workflow
    def wf() -> str:
        ds = [
            {"first_map_a": [42], "first_map_b": [get_int(), 2]},
            {
                "second_map_c": [33],
                "second_map_d": [9, 99],
            },
        ]
        ll = {
            "ll_1": [get_int(), get_int(), get_int()],
            "ll_2": [4, 5, 6],
        }
        out = subwf(a=ds, b=ll, c=ds, d=get_int())
        return out

    wf.compile()
    x = wf()
    expected = (
        "First: first_map_a:[42],first_map_b:[1, 2],second_map_c:[33],second_map_d:[9, 99] "
        "Second: ll_1: 1,ll_1: 1,ll_1: 1,ll_2: 4,ll_2: 5,ll_2: 6 "
        "Third: first_map_a:[42],first_map_b:[1, 2],second_map_c:[33],second_map_d:[9, 99] "
        "Int: 1"
    )
    assert x == expected
    wf_spec = get_serializable(OrderedDict(), serialization_settings, wf)
    assert set(wf_spec.template.nodes[5].upstream_node_ids) == {"n2", "n1", "n0", "n4", "n3"}

    @workflow
    def wf() -> str:
        ds = [
            {"first_map_a": [42], "first_map_b": [get_int(), 2]},
            {
                "second_map_c": [33],
                "second_map_d": [9, 99],
            },
        ]
        ll = {
            "ll_1": [get_int(), get_int(), get_int()],
            "ll_2": [4, 5, 6],
        }
        out = subwf(a=ds, b=ll, c=ll, d=get_int())
        return out

    x = wf()
    expected = (
        "First: first_map_a:[42],first_map_b:[1, 2],second_map_c:[33],second_map_d:[9, 99] "
        "Second: ll_1: 1,ll_1: 1,ll_1: 1,ll_2: 4,ll_2: 5,ll_2: 6 "
        "Third: ll_1: 1,ll_1: 1,ll_1: 1,ll_2: 4,ll_2: 5,ll_2: 6 "
        "Int: 1"
    )
    assert x == expected


def test_unexpected_outputs():
    @task
    def t1(a: int) -> int:
        a = a + 5
        return a

    @workflow
    def no_outputs_wf():
        return t1(a=3)

    # Should raise an exception because the workflow returns something when it shouldn't
    with pytest.raises(FlyteValueException):
        no_outputs_wf()

@pytest.mark.skipif(sys.version_info < (3, 10, 10), reason="inspect module does not work correctly with Python <3.10.10. https://github.com/python/cpython/issues/102647#issuecomment-1466868212")
def test_missing_return_value():
    @task
    def t1(a: int) -> int:
        a = a + 5
        return a

    # Should raise an exception because it doesn't return something when it should
    with pytest.raises(FlyteMissingReturnValueException):

        @workflow
        def one_output_wf() -> int:  # type: ignore
            t1(a=3)

        one_output_wf()


def test_custom_wrapper():
    def our_task(
            _task_function: typing.Optional[typing.Callable] = None,
            **kwargs,
    ):
        def wrapped(_func: typing.Callable):
            return task(_task_function=_func)

        if _task_function:
            return wrapped(_task_function)
        else:
            return wrapped

    @our_task(
        foo={
            "bar1": lambda x: print(x),
            "bar2": lambda x: print(x),
        },
    )
    def missing_func_body() -> str:
        return "foo"


def test_wf_no_output():
    @task
    def t1(a: int) -> int:
        a = a + 5
        return a

    @workflow
    def no_outputs_wf():
        t1(a=3)

    assert no_outputs_wf() is None


def test_wf_nested_comp(exec_prefix):
    @task
    def t1(a: int) -> int:
        a = a + 5
        return a

    @workflow
    def outer() -> typing.Tuple[int, int]:
        # You should not do this. This is just here for testing.
        @workflow
        def wf2() -> int:
            return t1(a=5)

        return t1(a=3), wf2()

    assert (8, 10) == outer()
    entity_mapping = OrderedDict()

    model_wf = get_serializable(entity_mapping, serialization_settings, outer)

    assert len(model_wf.template.interface.outputs) == 2
    assert len(model_wf.template.nodes) == 2
    assert model_wf.template.nodes[1].workflow_node is not None

    sub_wf = model_wf.sub_workflows[0]
    assert len(sub_wf.nodes) == 1
    assert sub_wf.nodes[0].id == "n0"
    assert sub_wf.nodes[0].task_node.reference_id.name == f"{exec_prefix}tests.flytekit.unit.core.test_workflows.t1"


@task
def add_5(a: int) -> int:
    a = a + 5
    return a


@workflow
def simple_wf() -> int:
    return add_5(a=1)

@workflow
def my_wf_example(a: int) -> typing.Tuple[int, int]:
    '''example

    Workflows can have inputs and return outputs of simple or complex types.

    :param a: input a
    :return: outputs
    '''

    x = add_5(a=a)

    # You can use outputs of a previous task as inputs to other nodes.
    z = add_5(a=x)

    # You can call other workflows from within this workflow
    d = simple_wf()

    # You can add conditions that can run on primitive types and execute different branches
    e = conditional("bool").if_(a == 5).then(add_5(a=d)).else_().then(add_5(a=z))

    # Outputs of the workflow have to be outputs returned by prior nodes.
    # No outputs and single or multiple outputs are supported
    return x, e

def test_workflow_lhs():
    assert my_wf_example._lhs == "my_wf_example"


def test_all_node_types():
    assert my_wf_example(a=1) == (6, 16)
    entity_mapping = OrderedDict()

    model_wf = get_serializable(entity_mapping, serialization_settings, my_wf_example)

    assert len(model_wf.template.interface.outputs) == 2
    assert len(model_wf.template.nodes) == 4
    assert model_wf.template.nodes[2].workflow_node is not None

    sub_wf = model_wf.sub_workflows[0]
    assert len(sub_wf.nodes) == 1
    assert sub_wf.nodes[0].id == "n0"
    assert sub_wf.nodes[0].task_node.reference_id.name == "tests.flytekit.unit.core.test_workflows.add_5"


def test_wf_docstring():
    model_wf = get_serializable(OrderedDict(), serialization_settings, my_wf_example)

    assert len(model_wf.template.interface.outputs) == 2
    assert model_wf.template.interface.outputs["o0"].description == "outputs"
    assert model_wf.template.interface.outputs["o1"].description == "outputs"
    assert len(model_wf.template.interface.inputs) == 1
    assert model_wf.template.interface.inputs["a"].description == "input a"


@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.")
def test_structured_dataset_wf():
    import pandas as pd
    from pandas.testing import assert_frame_equal

    from flytekit.types.schema import FlyteSchema

    superset_cols = kwtypes(Name=str, Age=int, Height=int)
    subset_cols = kwtypes(Name=str)
    superset_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22], "Height": [160, 178]})
    subset_df = pd.DataFrame({"Name": ["Tom", "Joseph"]})

    @task
    def t1() -> Annotated[pd.DataFrame, superset_cols]:
        return superset_df

    @task
    def t2(df: Annotated[pd.DataFrame, subset_cols]) -> Annotated[pd.DataFrame, subset_cols]:
        return df

    @task
    def t3(df: FlyteSchema[superset_cols]) -> FlyteSchema[superset_cols]:
        return df

    @task
    def t4() -> FlyteSchema[superset_cols]:
        return superset_df

    @task
    def t5(sd: Annotated[StructuredDataset, subset_cols]) -> Annotated[pd.DataFrame, subset_cols]:
        return sd.open(pd.DataFrame).all()

    @workflow
    def sd_wf() -> Annotated[pd.DataFrame, subset_cols]:
        # StructuredDataset -> StructuredDataset
        df = t1()
        return t2(df=df)

    @workflow
    def sd_to_schema_wf() -> pd.DataFrame:
        # StructuredDataset -> schema
        df = t1()
        return t3(df=df)

    @workflow
    def schema_to_sd_wf() -> typing.Tuple[pd.DataFrame, pd.DataFrame]:
        # schema -> StructuredDataset
        df = t4()
        return t2(df=df), t5(sd=df)  # type: ignore

    assert_frame_equal(sd_wf(), subset_df)
    assert_frame_equal(sd_to_schema_wf(), superset_df)
    assert_frame_equal(schema_to_sd_wf()[0], subset_df)
    assert_frame_equal(schema_to_sd_wf()[1], subset_df)


@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.")
def test_compile_wf_at_compile_time():
    import pandas as pd

    from flytekit.types.schema import FlyteSchema

    superset_cols = kwtypes(Name=str, Age=int, Height=int)
    superset_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22], "Height": [160, 178]})

    ctx = FlyteContextManager.current_context()
    with FlyteContextManager.with_context(
        ctx.with_execution_state(
            ctx.new_execution_state().with_params(mode=context_manager.ExecutionState.Mode.TASK_EXECUTION)
        )
    ):

        @task
        def t4() -> FlyteSchema[superset_cols]:
            return superset_df

        @workflow
        def wf():
            t4()

        assert ctx.compilation_state is None


@pytest.mark.parametrize(
    "error_message", [
        "Fail!",
        None,
        "",
        ("big", "boom!")
    ]
)
@patch("builtins.print")
def test_failure_node_local_execution(mock_print, error_message, exec_prefix):
    @task
    def clean_up(name: str, err: typing.Optional[FlyteError] = None):
        print(f"Deleting cluster {name} due to {err}")
        print("This is err:", str(err))

    @task
    def create_cluster(name: str):
        print(f"Creating cluster: {name}")

    @task
    def delete_cluster(name: str, err: typing.Optional[FlyteError] = None):
        print(f"Deleting cluster {name}")
        print(err)

    @task
    def t1(a: int, b: str):
        print(f"{a} {b}")
        raise ValueError(error_message)

    @workflow(on_failure=clean_up)
    def wf(name: str = "flyteorg"):
        c = create_cluster(name=name)
        t = t1(a=1, b="2")
        d = delete_cluster(name=name)
        c >> t >> d

    with pytest.raises(ValueError):
        wf()

    # Adjusted the error message to match the one in the failure
    expected_error_message = str(
        FlyteError(
            message=f"Error encountered while executing '{exec_prefix}tests.flytekit.unit.core.test_workflows.t1':\n  {error_message}",
            failed_node_id="fn0",
        )
    )

    assert mock_print.call_count > 0

    mock_print.assert_any_call("Creating cluster: flyteorg")
    mock_print.assert_any_call("1 2")
    mock_print.assert_any_call(f"Deleting cluster flyteorg due to {expected_error_message}")
    mock_print.assert_any_call("This is err:", expected_error_message)

Again, users should keep in mind that even though the body of the function looks like regular Python, it is actually not. When flytekit scans the workflow function, the objects being passed around between the tasks are not your typical Python values. So even though you may have a task t1() -> int, when a = t1() is called, a will not be an integer so if you try to range(a) you’ll get an error.

Please see the user guide for more usage examples.

Parameter Type Description
_workflow_function Optional[Callable[P, FuncOut]] This argument is implicitly passed and represents the decorated function.
failure_policy Optional[WorkflowFailurePolicy] Use the options in flytekit.WorkflowFailurePolicy
interruptible bool Whether or not tasks launched from this workflow are by default interruptible
on_failure Optional[Union[WorkflowBase, Task]] Invoke this workflow or task on failure. The workflow / task has to match the signature of the current workflow, with an additional parameter called error of type FlyteError
docs Optional[Documentation] Description entity for the workflow
pickle_untyped bool This is a flag that allows users to bypass the type-checking that Flytekit does when constructing the workflow. This is not recommended for general use.
default_options Optional[Options] Default options for the workflow when creating a default launch plan. Currently only the labels and annotations are allowed to be set as defaults.
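
A short sketch combining failure_policy and on_failure (per the description above, the handler must match the workflow's signature plus an error parameter):

import typing

from flytekit import task, workflow, WorkflowFailurePolicy
from flytekit.types.error.error import FlyteError

@task
def clean_up(name: str, err: typing.Optional[FlyteError] = None):
    print(f"cleaning up {name}: {err}")

@workflow(
    failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE,
    on_failure=clean_up,
)
def wf(name: str = "flyteorg"):
    ...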

union.ActorEnvironment

ActorEnvironment class.

class ActorEnvironment(
    name: str,
    container_image: Optional[Union[str, ImageSpec]],
    replica_count: int,
    ttl_seconds: Optional[int],
    environment: Optional[Dict[str, str]],
    requests: Optional[Resources],
    limits: Optional[Resources],
    accelerator: Optional[BaseAccelerator],
    secret_requests: Optional[List[Secret]],
    pod_template: Optional[PodTemplate],
    interruptible: bool,
)
Parameter Type Description
name str
container_image Optional[Union[str, ImageSpec]]
replica_count int
ttl_seconds Optional[int]
environment Optional[Dict[str, str]]
requests Optional[Resources]
limits Optional[Resources]
accelerator Optional[BaseAccelerator]
secret_requests Optional[List[Secret]]
pod_template Optional[PodTemplate]
interruptible bool

Properties

Property Type Description
task
version
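
A minimal sketch of defining an environment and decorating a task with its task property (names and values are illustrative):

from union import ActorEnvironment

actor = ActorEnvironment(
    name="my-actor",
    replica_count=2,
    ttl_seconds=120,  # keep replicas warm between executions
)

@actor.task
def say_hello() -> str:
    return "hello"

Tasks decorated with the same environment can then reuse its warm replicas between executions.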

union.Artifact

This is a wrapper around the Flytekit Artifact class.

This Python class has two purposes - as a Python representation of a materialized Artifact, and as a way for users to specify that tasks/workflows create Artifacts and the manner in which they are created.

Use one as input to a workflow (only workflows for now):

df_artifact = Artifact.get("flyte://a1")
remote.execute(wf, inputs={"a": df_artifact})

Note that Python fields will be missing when retrieved from the service.

class Artifact(
    args,
    project: Optional[str],
    domain: Optional[str],
    name: Optional[str],
    version: Optional[str],
    time_partitioned: bool,
    time_partition: Optional[TimePartition],
    time_partition_granularity: Optional[Granularity],
    partition_keys: Optional[typing.List[str]],
    partitions: Optional[Union[Partitions, typing.Dict[str, str]]],
    python_val: Optional[typing.Any],
    python_type: Optional[typing.Type],
    literal: Optional[Literal],
    literal_type: Optional[LiteralType],
    short_description: Optional[str],
    source: Optional[artifacts_pb2.ArtifactSource],
    card: Optional[Card],
    kwargs,
)
Parameter Type Description
args *args
project Optional[str] Should not be directly user provided, the project/domain will come from the project/domain of the execution that produced the output. These values will be filled in automatically when retrieving however.
domain Optional[str] See above.
name Optional[str] The name of the Artifact. This should be user provided.
version Optional[str] Version of the Artifact, typically the execution ID, plus some additional entropy. Not user provided.
time_partitioned bool Whether or not this Artifact will have a time partition.
time_partition Optional[TimePartition] If you want to manually pass in the full TimePartition object
time_partition_granularity Optional[Granularity] If you don’t want to manually pass in the full TimePartition object, but want to control the granularity when one is automatically created for you. Note that consistency checking is limited while in alpha.
partition_keys Optional[typing.List[str]] This is a list of keys that will be used to partition the Artifact. These are not the values. Values are set by calling the artifact (with ()) and will end up in the partition_values field.
partitions Optional[Union[Partitions, typing.Dict[str, str]]] This is a dictionary of partition keys to values.
python_val Optional[typing.Any] The Python value.
python_type Optional[typing.Type] The Python type.
literal Optional[Literal]
literal_type Optional[LiteralType]
short_description Optional[str]
source Optional[artifacts_pb2.ArtifactSource]
card Optional[Card]
kwargs **kwargs

Methods

Method Description
create_from() This function allows users to declare partition values dynamically from the body of a task.
embed_as_query() This should only be called in the context of a Trigger.
from_flyte_idl() Converts the IDL representation to this object.
get() This function is supposed to mimic the get() behavior of inputs/outputs as returned by FlyteRemote for an execution.
initialize() Use this for when you have a Python value you want to get an Artifact object out of.
metadata()
query()
set_resolver()
set_source()
to_create_request()
to_id_idl() Converts this object to the IDL representation.

create_from()

def create_from(
    o: O,
    card: Optional[SerializableToString],
    args: *args,
    kwargs,
) -> O

This function allows users to declare partition values dynamically from the body of a task. Note that you’ll still need to annotate your task function output with the relevant Artifact object. Below, one of the partition values is bound to an input, and the other is set at runtime. Note that since tasks are not run at compile time, flytekit cannot check that you’ve bound all the partition values. It’s up to you to ensure that you’ve done so.

Pricing = Artifact(name="pricing", partition_keys=["region"])
EstError = Artifact(name="estimation_error", partition_keys=["dataset"], time_partitioned=True)

@task
def t1() -> typing.Tuple[Annotated[pd.DataFrame, Pricing], Annotated[float, EstError]]:
    df = get_pricing_results()
    dt = get_time()
    return Pricing.create_from(df, region="dubai"), EstError.create_from(msq_error, dataset="train", time_partition=dt)

You can mix and match with the input syntax as well.

@task
def my_task() -> Annotated[pd.DataFrame, RideCountData(region=Inputs.region)]:
    ...
    return RideCountData.create_from(df, time_partition=datetime.datetime.now())
Parameter Type Description
o O
card Optional[SerializableToString]
args *args
kwargs **kwargs

embed_as_query()

def embed_as_query(
    partition: Optional[str],
    bind_to_time_partition: Optional[bool],
    expr: Optional[str],
    op: Optional[Op],
) -> art_id.ArtifactQuery

This should only be called in the context of a Trigger. The type of query this returns is different from the query() function. This type of query is used to reference the triggering artifact, rather than running a query.

Parameter Type Description
partition Optional[str] Can embed a time partition
bind_to_time_partition Optional[bool] Set to true if you want to bind to a time partition
expr Optional[str] Only valid if there’s a time partition.
op Optional[Op] If expr is given, then op is what to do with it.

from_flyte_idl()

def from_flyte_idl(
    pb2: artifacts_pb2.Artifact,
) -> Artifact

Converts the IDL representation to this object.

Parameter Type Description
pb2 artifacts_pb2.Artifact

get()

def get(
    as_type: Optional[typing.Type],
) -> Optional[typing.Any]

This function is supposed to mimic the get() behavior of inputs/outputs as returned by FlyteRemote for an execution, leveraging the LiteralsResolver (and underneath that the TypeEngine) to turn the literal into a Python value.

Parameter Type Description
as_type Optional[typing.Type]

initialize()

def initialize(
    python_val: typing.Any,
    python_type: typing.Type,
    name: Optional[str],
    literal_type: Optional[LiteralType],
    version: Optional[str],
    tags: Optional[typing.List[str]],
) -> Artifact

Use this for when you have a Python value you want to get an Artifact object out of.

This function readies an Artifact for creation; it doesn't actually create it just yet, since this is a network-less call. You will need to persist it with a FlyteRemote instance: remote.create_artifact(Artifact.initialize(…))

Artifact.initialize("/path/to/file", tags={"tag1": "val1"}) Artifact.initialize("/path/to/parquet", type=pd.DataFrame, tags=["0.1.0"])

What’s set here is everything that isn’t set by the server. What is set by the server?

  • name and version, if not set by the user.
  • uri, set by the remote.
  • project and domain.
Parameter Type Description
python_val typing.Any
python_type typing.Type
name Optional[str]
literal_type Optional[LiteralType]
version Optional[str]
tags Optional[typing.List[str]]

metadata()

def metadata()

query()

def query(
    project: Optional[str],
    domain: Optional[str],
    time_partition: Optional[Union[datetime.datetime, TimePartition, art_id.InputBindingData]],
    partitions: Optional[Union[typing.Dict[str, str], Partitions]],
    kwargs,
) -> ArtifactQuery
Parameter Type Description
project Optional[str]
domain Optional[str]
time_partition Optional[Union[datetime.datetime, TimePartition, art_id.InputBindingData]]
partitions Optional[Union[typing.Dict[str, str], Partitions]]
kwargs **kwargs
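
For instance, a hedged sketch of using a query as a workflow input default (assuming an Artifact named "pricing" partitioned by region, as in the create_from() example above):

Pricing = Artifact(name="pricing", partition_keys=["region"])

@workflow
def wf(df: pd.DataFrame = Pricing.query(region="dubai")) -> pd.DataFrame:
    return df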

set_resolver()

def set_resolver(
    resolver: LiteralsResolver,
)
Parameter Type Description
resolver LiteralsResolver

set_source()

def set_source(
    source: artifacts_pb2.ArtifactSource,
)
Parameter Type Description
source artifacts_pb2.ArtifactSource

to_create_request()

def to_create_request(
    a: Artifact,
) -> artifacts_pb2.CreateArtifactRequest
Parameter Type Description
a Artifact

to_id_idl()

def to_id_idl()

Converts this object to the IDL representation. This is here instead of translator because it’s in the interface, a relatively simple proto object that’s exposed to the user.

Properties

Property Type Description
concrete_artifact_id
partitions
time_partition

union.Cache

Cache configuration for a task.

class Cache(
    version: typing.Optional[str],
    serialize: bool,
    ignored_inputs: typing.Union[typing.Tuple[str, ...], str],
    salt: str,
    policies: typing.Union[typing.List[flytekit.core.cache.CachePolicy], flytekit.core.cache.CachePolicy, NoneType],
)
Parameter Type Description
version typing.Optional[str]
serialize bool
ignored_inputs typing.Union[typing.Tuple[str, ...], str]
salt str
policies typing.Union[typing.List[flytekit.core.cache.CachePolicy], flytekit.core.cache.CachePolicy, NoneType]

Methods

Method Description
get_ignored_inputs()
get_version()

get_ignored_inputs()

def get_ignored_inputs()

get_version()

def get_version(
    params: flytekit.core.cache.VersionParameters,
) -> str
Parameter Type Description
params flytekit.core.cache.VersionParameters
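
As a usage sketch, attaching a Cache to a task via the task() decorator documented above (values are illustrative):

from flytekit import task
from union import Cache

@task(cache=Cache(version="1.0", serialize=True, ignored_inputs=("seed",)))
def sample(seed: int, n: int) -> float:
    ...

Here serialize=True makes concurrent executions over identical inputs wait for a single run and reuse its cached result, and the seed input is excluded from the cache key.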

union.CachePolicy

Base class for protocol classes.

Protocol classes are defined as:

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example:

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic; they are defined as:

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...
protocol CachePolicy()

Methods

Method Description
get_version()

get_version()

def get_version(
    salt: str,
    params: flytekit.core.cache.VersionParameters,
) -> str
Parameter Type Description
salt str
params flytekit.core.cache.VersionParameters
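
Because CachePolicy is a structural protocol, any object with a matching get_version method satisfies it. A hypothetical sketch that hashes the salt together with whatever version parameters are available (the container_image attribute is an assumption, hence the guarded getattr):

import hashlib

class ImageBasedPolicy:
    def get_version(self, salt: str, params) -> str:
        # Assumed field; fall back to an empty string if absent.
        image = getattr(params, "container_image", None) or ""
        return hashlib.sha256(f"{salt}:{image}".encode()).hexdigest()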

union.ContainerTask

This is an intermediate class that represents Flyte Tasks that run a container at execution time. This is the vast majority of tasks - the typical @task decorated tasks for instance all run a container. An example of something that doesn’t run a container would be something like the Athena SQL task.

class ContainerTask(
    name: str,
    image: typing.Union[str, flytekit.image_spec.image_spec.ImageSpec],
    command: typing.List[str],
    inputs: typing.Optional[typing.OrderedDict[str, typing.Type]],
    metadata: typing.Optional[flytekit.core.base_task.TaskMetadata],
    arguments: typing.Optional[typing.List[str]],
    outputs: typing.Optional[typing.Dict[str, typing.Type]],
    requests: typing.Optional[flytekit.core.resources.Resources],
    limits: typing.Optional[flytekit.core.resources.Resources],
    input_data_dir: typing.Optional[str],
    output_data_dir: typing.Optional[str],
    metadata_format: <enum 'MetadataFormat'>,
    io_strategy: typing.Optional[flytekit.core.container_task.ContainerTask.IOStrategy],
    secret_requests: typing.Optional[typing.List[flytekit.models.security.Secret]],
    pod_template: typing.Optional[ForwardRef('PodTemplate')],
    pod_template_name: typing.Optional[str],
    local_logs: bool,
    resources: typing.Optional[flytekit.core.resources.Resources],
    kwargs,
)
Parameter Type Description
name str A unique name for the task instantiation. This is unique for every instance of task.
image typing.Union[str, flytekit.image_spec.image_spec.ImageSpec]
command typing.List[str]
inputs typing.Optional[typing.OrderedDict[str, typing.Type]]
metadata typing.Optional[flytekit.core.base_task.TaskMetadata]
arguments typing.Optional[typing.List[str]]
outputs typing.Optional[typing.Dict[str, typing.Type]]
requests typing.Optional[flytekit.core.resources.Resources]
limits typing.Optional[flytekit.core.resources.Resources]
input_data_dir typing.Optional[str]
output_data_dir typing.Optional[str]
metadata_format <enum 'MetadataFormat'>
io_strategy typing.Optional[flytekit.core.container_task.ContainerTask.IOStrategy]
secret_requests typing.Optional[typing.List[flytekit.models.security.Secret]]
pod_template typing.Optional[ForwardRef('PodTemplate')]
pod_template_name typing.Optional[str]
local_logs bool
resources typing.Optional[flytekit.core.resources.Resources]
kwargs **kwargs
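
A sketch of a raw-container task (image, directories, and command are illustrative; inputs are materialized under input_data_dir and outputs are read back from output_data_dir):

from collections import OrderedDict

from union import ContainerTask

greet = ContainerTask(
    name="greet",
    image="alpine:3.19",
    inputs=OrderedDict(name=str),
    outputs={"greeting": str},
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    command=[
        "/bin/sh",
        "-c",
        "echo Hello $(cat /var/inputs/name) | tee /var/outputs/greeting",
    ],
)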

Methods

Method Description
compile() Generates a node that encapsulates this task in a workflow definition.
construct_node_metadata() Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute() This method translates Flyte’s Type system based input values and invokes the actual call to the executor.
execute() This method will be invoked to execute the task.
find_lhs()
get_config() Returns the task config as a serializable dictionary.
get_container() Returns the container definition (if any) that is used to run the task on hosted Flyte.
get_custom() Return additional plugin-specific custom data (if any) as a serializable dictionary.
get_extended_resources() Returns the extended resources to allocate to the task on hosted Flyte.
get_input_types() Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod() Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.
get_sql() Returns the Sql definition (if any) that is used to run the task on hosted Flyte.
get_type_for_input_var() Returns the python type for an input variable by name.
get_type_for_output_var() Returns the python type for the specified output variable by name.
local_execute() This function is used only in the local execution path and is responsible for calling dispatch execute.
local_execution_mode()
post_execute() Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs.
pre_execute() This is the method that will be invoked directly before executing the task method and before all the inputs are converted.
sandbox_execute() Call dispatch_execute, in the context of a local sandbox execution.

compile()

def compile(
    ctx: flytekit.core.context_manager.FlyteContext,
    args,
    kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, NoneType]

Generates a node that encapsulates this task in a workflow definition.

Parameter Type Description
ctx flytekit.core.context_manager.FlyteContext
args *args
kwargs **kwargs

construct_node_metadata()

def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.

dispatch_execute()

def dispatch_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
) -> typing.Union[flytekit.models.literals.LiteralMap, flytekit.models.dynamic_job.DynamicJobSpec, typing.Coroutine]

This method translates Flyte’s Type system based input values and invokes the actual call to the executor. This method is also invoked during runtime.

  • VoidPromise is returned in the case when the task itself declares no outputs.
  • LiteralMap is returned when the task declares one or more outputs. Individual outputs may be None.
  • DynamicJobSpec is returned when a dynamic workflow is executed.
Parameter Type Description
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

execute()

def execute(
    kwargs,
) -> flytekit.models.literals.LiteralMap

This method will be invoked to execute the task.

Parameter Type Description
kwargs **kwargs

find_lhs()

def find_lhs()

get_config()

def get_config(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, str]]

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom config defined for this task.

Parameter Type Description
settings flytekit.configuration.SerializationSettings

get_container()

def get_container(
    settings: flytekit.configuration.SerializationSettings,
) -> flytekit.models.task.Container

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameter Type Description
settings flytekit.configuration.SerializationSettings

get_custom()

def get_custom(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, typing.Any]]

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameter Type Description
settings flytekit.configuration.SerializationSettings

get_extended_resources()

def get_extended_resources(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flyteidl.core.tasks_pb2.ExtendedResources]

Returns the extended resources to allocate to the task on hosted Flyte.

Parameter Type Description
settings flytekit.configuration.SerializationSettings

get_input_types()

def get_input_types()

Returns the names and python types as a dictionary for the inputs of this task.

get_k8s_pod()

def get_k8s_pod(
    settings: flytekit.configuration.SerializationSettings,
) -> flytekit.models.task.K8sPod

Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameter Type Description
settings flytekit.configuration.SerializationSettings

get_sql()

def get_sql(
    settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.Sql]

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

Parameter Type Description
settings flytekit.configuration.SerializationSettings

get_type_for_input_var()

def get_type_for_input_var(
    k: str,
    v: typing.Any,
) -> typing.Type[typing.Any]

Returns the python type for an input variable by name.

Parameter Type Description
k str
v typing.Any

get_type_for_output_var()

def get_type_for_output_var(
    k: str,
    v: typing.Any,
) -> typing.Type[typing.Any]

Returns the python type for the specified output variable by name.

Parameter Type Description
k str
v typing.Any

local_execute()

def local_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, typing.Coroutine, NoneType]

This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).

Parameter Type Description
ctx flytekit.core.context_manager.FlyteContext
kwargs **kwargs

local_execution_mode()

def local_execution_mode()

post_execute()

def post_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
    rval: typing.Any,
) -> typing.Any

Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task outputs. If not overridden, this function is a no-op.

Parameter Type Description
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters] are the modified user params as created during the pre_execute step
rval typing.Any

pre_execute()

def pre_execute(
    user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
) -> typing.Optional[flytekit.core.context_manager.ExecutionParameters]

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.

This should return either the same context or the mutated context.

Parameter Type Description
user_params typing.Optional[flytekit.core.context_manager.ExecutionParameters]

sandbox_execute()

def sandbox_execute(
    ctx: flytekit.core.context_manager.FlyteContext,
    input_literal_map: flytekit.models.literals.LiteralMap,
) -> flytekit.models.literals.LiteralMap

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

Parameter Type Description
ctx flytekit.core.context_manager.FlyteContext
input_literal_map flytekit.models.literals.LiteralMap

Properties

Property Type Description
deck_fields If not empty, this task will output deck html file for the specified decks
disable_deck If true, this task will not output deck html file
docs
enable_deck If true, this task will output deck html file
environment Any environment variables that are supplied during the execution of the task.
instantiated_in
interface
lhs
location
metadata
name
python_interface Returns this task's python interface.
resources
security_context
task_config Returns the user-specified task config, which is used for plugin-specific handling of the task.
task_type
task_type_version

union.Deck

Deck enables users to get customizable and default visibility into their tasks.

Deck contains a list of renderers (FrameRenderer, MarkdownRenderer) that can generate an HTML file. For example, FrameRenderer can render a DataFrame as an HTML table, and MarkdownRenderer can convert a Markdown string to HTML.

The Flyte context saves a list of deck objects, and the renderers in those decks are used to render the data and create an HTML file when the tasks are executed.

Each task has at least three decks (input, output, default). Input/output decks are used to render tasks' input/output data, and the default deck is used to render line plots, scatter plots or Markdown text. In addition, users can create new decks to render their data with custom renderers.

import flytekit
import pandas as pd
import plotly.express as px
from typing_extensions import Annotated

from flytekit import task
# Renderers assumed from flytekit and the flytekitplugins-deck-standard plugin.
from flytekit.deck.renderer import MarkdownRenderer, TopFrameRenderer
from flytekitplugins.deck.renderer import BoxRenderer

iris_df = px.data.iris()

@task()
def t1() -> str:
    md_text = '#Hello Flyte##Hello Flyte###Hello Flyte'
    m = MarkdownRenderer()
    s = BoxRenderer("sepal_length")
    deck = flytekit.Deck("demo", s.to_html(iris_df))
    deck.append(m.to_html(md_text))
    default_deck = flytekit.current_context().default_deck
    default_deck.append(m.to_html(md_text))
    return md_text


# Use Annotated to override default renderer
@task()
def t2() -> Annotated[pd.DataFrame, TopFrameRenderer(10)]:
    return iris_df
class Deck(
    name: str,
    html: typing.Optional[str],
    auto_add_to_deck: bool,
)
Parameter Type Description
name str
html typing.Optional[str]
auto_add_to_deck bool

Methods

Method Description
append()
publish()

append()

def append(
    html: str,
) -> Deck
Parameter Type Description
html str

publish()

def publish()
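
A hedged sketch of publishing a deck mid-execution so its contents are visible before the task finishes (assumes decks are enabled on the task, as described under the task() parameters):

from flytekit import task
from union import Deck

@task(enable_deck=True)
def train() -> None:
    deck = Deck("status", "<h3>epoch 1 done</h3>")
    deck.publish()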

Properties

Property Type Description
html
name

union.FlyteDirectory

class FlyteDirectory(
    path: typing.Union[str, os.PathLike],
    downloader: typing.Optional[typing.Callable],
    remote_directory: typing.Optional[typing.Union[os.PathLike, str, typing.Literal[False]]],
)
Parameter Type Description
path typing.Union[str, os.PathLike] The source path that users are expected to call open() on
downloader typing.Optional[typing.Callable] Optional function that can be passed to delay downloading of the actual file until a user actually calls open().
remote_directory typing.Optional[typing.Union[os.PathLike, str, typing.Literal[False]]] The remote location to upload to, if the user wants to return something and also specify where it should be uploaded. If set to False, then flytekit will not upload the directory to the remote store.

Methods

Method Description
crawl() Crawl returns a generator of all files prefixed by any sub-folders under the given “FlyteDirectory”.
deserialize_flyte_dir()
download()
extension()
from_dict()
from_json()
from_source() Create a new FlyteDirectory object with the remote source set to the input.
listdir() This function will list all files and folders in the given directory, but without downloading the contents.
new() Create a new FlyteDirectory object in current Flyte working directory.
new_dir() This will create a new folder under the current folder.
new_file() This will create a new file under the current folder.
new_remote() Create a new FlyteDirectory object using the currently configured default remote in the context (i.e. the raw_output_prefix).
schema()
serialize_flyte_dir()
to_dict()
to_json()

crawl()

def crawl(
    maxdepth: typing.Optional[int],
    topdown: bool,
    kwargs,
) -> Generator[Tuple[typing.Union[str, os.PathLike[Any]], typing.Dict[Any, Any]], None, None]

Crawl returns a generator of all files prefixed by any sub-folders under the given “FlyteDirectory”. If detail=True is passed, then it will return a dictionary as specified by fsspec.

Example:

>>> list(fd.crawl())
[("/base", "file1"), ("/base", "dir1/file1"), ("/base", "dir2/file1"), ("/base", "dir1/dir/file1")]

>>> list(x.crawl(detail=True))
[('/tmp/test', {'my-dir/ab.py': {'name': '/tmp/test/my-dir/ab.py', 'size': 0, 'type': 'file',
 'created': 1677720780.2318847, 'islink': False, 'mode': 33188, 'uid': 501, 'gid': 0,
  'mtime': 1677720780.2317934, 'ino': 1694329, 'nlink': 1}})]
Parameter Type Description
maxdepth typing.Optional[int]
topdown bool
kwargs **kwargs

deserialize_flyte_dir()

def deserialize_flyte_dir(
    info,
) -> FlyteDirectory
Parameter Type Description
info

download()

def download()

extension()

def extension()

from_dict()

def from_dict(
    kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
    infer_missing,
) -> ~A
Parameter Type Description
kvs typing.Union[dict, list, str, int, float, bool, NoneType]
infer_missing

from_json()

def from_json(
    s: typing.Union[str, bytes, bytearray],
    parse_float,
    parse_int,
    parse_constant,
    infer_missing,
    kw,
) -> ~A
Parameter Type Description
s typing.Union[str, bytes, bytearray]
parse_float
parse_int
parse_constant
infer_missing
kw

from_source()

def from_source(
    source: str | os.PathLike,
) -> FlyteDirectory

Create a new FlyteDirectory object with the remote source set to the input.

Parameter Type Description
source str | os.PathLike

listdir()

def listdir(
    directory: FlyteDirectory,
) -> typing.List[typing.Union[FlyteDirectory, FlyteFile]]

This function will list all files and folders in the given directory without downloading the contents. In addition, it will return a list of FlyteFile and FlyteDirectory objects that have the ability to lazily download the contents of the file/folder. For example:

entity = FlyteDirectory.listdir(directory)
for e in entity:
    print("s3 object:", e.remote_source)
    # s3 object: s3://test-flytedir/file1.txt
    # s3 object: s3://test-flytedir/file2.txt
    # s3 object: s3://test-flytedir/sub_dir

open(entity[0], "r")  # This will download the file to the local disk.
open(entity[0], "r")  # flytekit will read data from the local disk if you open it again.
Parameter Type Description
directory FlyteDirectory

new()

def new(
    dirname: str | os.PathLike,
) -> FlyteDirectory

Create a new FlyteDirectory object in current Flyte working directory.

Parameter Type Description
dirname str | os.PathLike

new_dir()

def new_dir(
    name: typing.Optional[str],
) -> FlyteDirectory

This will create a new folder under the current folder. If given a name, it will use the name given, otherwise it’ll pick a random string. Collisions are not checked.

Parameter Type Description
name typing.Optional[str]

new_file()

def new_file(
    name: typing.Optional[str],
) -> FlyteFile

This will create a new file under the current folder. If given a name, it will use the name given, otherwise it’ll pick a random string. Collisions are not checked.

Parameter Type Description
name typing.Optional[str]

new_remote()

def new_remote(
    stem: typing.Optional[str],
    alt: typing.Optional[str],
) -> FlyteDirectory

Create a new FlyteDirectory object using the currently configured default remote in the context (i.e. the raw_output_prefix configured in the current FileAccessProvider object in the context). This is used if you explicitly have a folder somewhere that you want to create files under. If you want to write a whole folder, you can let your task return a FlyteDirectory object, and let flytekit handle the uploading.
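
A sketch of writing into a remote directory from within a task (this assumes a task context, so a default remote prefix is configured; the stem and file names are illustrative):

from union import FlyteDirectory, task

@task
def write_report() -> FlyteDirectory:
    # Create a directory under the configured raw output prefix.
    out_dir = FlyteDirectory.new_remote(stem="reports")
    summary = out_dir.new_file("summary.txt")
    with summary.open("w") as f:
        f.write("hello")
    return out_dir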

Parameter Type Description
stem typing.Optional[str] A stem to append to the path as the final prefix “directory”.
alt typing.Optional[str] An alternate first member of the prefix to use instead of the default.

Returns: A new FlyteDirectory object that points to a remote location.

schema()

def schema(
    infer_missing: bool,
    only,
    exclude,
    many: bool,
    context,
    load_only,
    dump_only,
    partial: bool,
    unknown,
) -> SchemaType[A]
Parameter Type Description
infer_missing bool
only
exclude
many bool
context
load_only
dump_only
partial bool
unknown

serialize_flyte_dir()

def serialize_flyte_dir()

to_dict()

def to_dict(
    encode_json,
) -> typing.Dict[str, typing.Union[dict, list, str, int, float, bool, NoneType]]
Parameter Type Description
encode_json

to_json()

def to_json(
    skipkeys: bool,
    ensure_ascii: bool,
    check_circular: bool,
    allow_nan: bool,
    indent: typing.Union[int, str, NoneType],
    separators: typing.Tuple[str, str],
    default: typing.Callable,
    sort_keys: bool,
    kw,
) -> str
Parameter Type Description
skipkeys bool
ensure_ascii bool
check_circular bool
allow_nan bool
indent typing.Union[int, str, NoneType]
separators typing.Tuple[str, str]
default typing.Callable
sort_keys bool
kw

Properties

Property Type Description
downloaded
remote_directory
remote_source
If this is an input to a task, and the original path is s3://something, flytekit will download the
directory for the user. In case the user wants access to the original path, it will be here.
sep

union.FlyteFile

class FlyteFile(
    path: typing.Union[str, os.PathLike],
    downloader: typing.Callable,
    remote_path: typing.Optional[typing.Union[os.PathLike, str, bool]],
    metadata: typing.Optional[dict[str, str]],
)

FlyteFile’s init method.

Parameter Type Description
path typing.Union[str, os.PathLike] The source path that users are expected to call open() on.
downloader typing.Callable Optional function that can be passed to delay downloading of the actual file until a user calls open().
remote_path typing.Optional[typing.Union[os.PathLike, str, bool]] If the user wants to return something and also specify where it should be uploaded to. Alternatively, if the user wants to specify a remote path for a file that’s already in the blob store, the path should point to the location and remote_path should be set to False.
metadata typing.Optional[dict[str, str]]

Methods

Method Description
deserialize_flyte_file()
download()
extension()
from_dict()
from_json()
from_source() Create a new FlyteFile object with the remote source set to the input.
new() Create a new FlyteFile object in the current Flyte working directory.
new_remote_file() Create a new FlyteFile object with a remote path.
open() Returns a streaming File handle.
serialize_flyte_file()
to_dict()
to_json()

deserialize_flyte_file()

def deserialize_flyte_file(
    info,
) -> 'FlyteFile'
Parameter Type Description
info

download()

def download()

extension()

def extension()

from_dict()

def from_dict(
    d,
    dialect,
)
Parameter Type Description
d
dialect

from_json()

def from_json(
    data: typing.Union[str, bytes, bytearray],
    decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
    from_dict_kwargs: typing.Any,
) -> ~T
Parameter Type Description
data typing.Union[str, bytes, bytearray]
decoder collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]]
from_dict_kwargs typing.Any

from_source()

def from_source(
    source: str | os.PathLike,
) -> FlyteFile

Create a new FlyteFile object with the remote source set to the input.

Parameter Type Description
source str | os.PathLike

new()

def new(
    filename: str | os.PathLike,
) -> FlyteFile

Create a new FlyteFile object in the current Flyte working directory.

Parameter Type Description
filename str | os.PathLike

new_remote_file()

def new_remote_file(
    name: typing.Optional[str],
    alt: typing.Optional[str],
) -> FlyteFile

Create a new FlyteFile object with a remote path.

Parameter Type Description
name typing.Optional[str] If you want to specify a different name for the file, you can specify it here.
alt typing.Optional[str] If you want to specify a different prefix head than the default one, you can specify it here.

open()

def open(
    mode: str,
    cache_type: typing.Optional[str],
    cache_options: typing.Optional[typing.Dict[str, typing.Any]],
)

Returns a streaming file handle.

# Imports assumed by this example.
from union import FlyteFile, task

@task
def copy_file(ff: FlyteFile) -> FlyteFile:
    new_file = FlyteFile.new_remote_file()
    with ff.open("rb", cache_type="readahead") as r:
        with new_file.open("wb") as w:
            w.write(r.read())
    return new_file
Parameter Type Description
mode str Open mode. For example: 'r', 'rb', 'w', or 'wb'.
cache_type typing.Optional[str] Specifies the cache type. Possible values are “blockcache”, “bytes”, “mmap”, “readahead”, “first”, or “background”. This is especially useful for large file reads. See https://filesystem-spec.readthedocs.io/en/latest/api.html#readbuffering.
cache_options typing.Optional[typing.Dict[str, typing.Any]] A Dict corresponding to the parameters for the chosen cache_type. Refer to the fsspec caching options above.

serialize_flyte_file()

def serialize_flyte_file()

to_dict()

def to_dict()

to_json()

def to_json(
    encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
    to_dict_kwargs: typing.Any,
) -> typing.Union[str, bytes, bytearray]
Parameter Type Description
encoder collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]]
to_dict_kwargs typing.Any

Properties

Property Type Description
downloaded
remote_path
remote_source
If this is an input to a task, and the original path is an s3 bucket, Flytekit downloads the
file for the user. In case the user wants access to the original path, it will be here.

union.ImageSpec

This class is used to specify the docker image that will be used to run the task.

Attributes:

name (str): Name of the image.
python_version (str): Python version of the image. Uses the default Python in the base image if None.
builder (Optional[str]): Type of plugin to build the image. Uses envd by default.
source_root (Optional[str]): Source root of the image.
env (Optional[Dict[str, str]]): Environment variables of the image.
registry (Optional[str]): Registry of the image.
packages (Optional[List[str]]): List of python packages to install.
conda_packages (Optional[List[str]]): List of conda packages to install.
conda_channels (Optional[List[str]]): List of conda channels.
requirements (Optional[str]): Path to the requirements.txt file.
apt_packages (Optional[List[str]]): List of apt packages to install.
cuda (Optional[str]): Version of cuda to install.
cudnn (Optional[str]): Version of cudnn to install.
base_image (Optional[Union[str, ‘ImageSpec’]]): Base image of the image.
platform (Optional[str]): Specify the target platforms for the build output (for example, windows/amd64 or linux/amd64,darwin/arm64).
pip_index (Optional[str]): Specify the custom pip index url.
pip_extra_index_url (Optional[List[str]]): Specify one or more pip index urls as a list.
pip_secret_mounts (Optional[List[Tuple[str, str]]]): Specify a list of tuples to mount secrets for pip install. Each tuple should contain the path to the secret file and the mount path, for example [(".gitconfig", “/etc/gitconfig”)]. This is experimental and the interface may change in the future. Configuring this should not change the built image.
pip_extra_args (Optional[str]): Specify one or more extra pip install arguments as a space-delimited string.
registry_config (Optional[str]): Specify the path to a JSON registry config file.
entrypoint (Optional[List[str]]): List of strings to overwrite the entrypoint of the base image with; set to [] to remove the entrypoint.
commands (Optional[List[str]]): Commands to run during the build process.
tag_format (Optional[str]): Custom string format for the image tag. The ImageSpec hash is passed in as spec_hash. For example, to add a “dev” suffix to the image tag, set tag_format="{spec_hash}-dev".
source_copy_mode (Optional[CopyFileDetection]): Specifies which source files to copy from the local host into the image. Leaving this unset means the default flytekit behavior is used: if fast register is used, source files are not copied into the image (because they are already copied into the fast register tar layer); otherwise, the LOADED_MODULES (aka ‘auto’) option is used to copy loaded Python files into the image. If the option is set by the user, then that option is used.
copy (Optional[List[str]]): List of files/directories to copy to /root, e.g. [“src/file1.txt”, “src/file2.txt”].
python_exec (Optional[str]): Python executable to use to install packages.
runtime_packages (Optional[List[str]]): List of packages to be installed at runtime. runtime_packages requires pip to be installed in your base image. If you are using an ImageSpec as your base image, include pip in your packages: ImageSpec(..., packages=["pip"]). If you want to install runtime packages into a fixed base_image without using an image builder, use builder="noop": ImageSpec(base_image="ghcr.io/name/my-custom-image", builder="noop").with_runtime_packages(["numpy"]).
builder_options (Optional[Dict[str, Any]]): Additional options for the builder. This dictionary is passed to the builder; the options are builder-specific and may not be supported by all builders.
builder_config (Optional[Dict[str, Any]]): Custom builder images configuration, such as uv and micromamba images.
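
As a brief sketch of how these fields compose (the registry and package names below are illustrative, not part of the API):

from union import ImageSpec, task

image = ImageSpec(
    name="my-image",
    registry="ghcr.io/my-org",
    packages=["pandas"],
    apt_packages=["git"],
)

# Attach the image to a task; it is built (if needed) at registration time.
@task(container_image=image)
def process() -> None:
    ...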

class ImageSpec(
    name: str,
    python_version: str,
    builder: typing.Optional[str],
    source_root: typing.Optional[str],
    env: typing.Optional[typing.Dict[str, str]],
    registry: typing.Optional[str],
    packages: typing.Optional[typing.List[str]],
    conda_packages: typing.Optional[typing.List[str]],
    conda_channels: typing.Optional[typing.List[str]],
    requirements: typing.Optional[str],
    apt_packages: typing.Optional[typing.List[str]],
    cuda: typing.Optional[str],
    cudnn: typing.Optional[str],
    base_image: typing.Union[str, ForwardRef('ImageSpec'), NoneType],
    platform: typing.Optional[str],
    pip_index: typing.Optional[str],
    pip_extra_index_url: typing.Optional[typing.List[str]],
    pip_secret_mounts: typing.Optional[typing.List[typing.Tuple[str, str]]],
    pip_extra_args: typing.Optional[str],
    registry_config: typing.Optional[str],
    entrypoint: typing.Optional[typing.List[str]],
    commands: typing.Optional[typing.List[str]],
    tag_format: typing.Optional[str],
    source_copy_mode: typing.Optional[flytekit.constants.CopyFileDetection],
    copy: typing.Optional[typing.List[str]],
    python_exec: typing.Optional[str],
    runtime_packages: typing.Optional[typing.List[str]],
    builder_options: typing.Optional[typing.Dict[str, typing.Any]],
    builder_config: typing.Optional[typing.Dict[str, typing.Any]],
)
Parameter Type Description
name str
python_version str
builder typing.Optional[str]
source_root typing.Optional[str]
env typing.Optional[typing.Dict[str, str]]
registry typing.Optional[str]
packages typing.Optional[typing.List[str]]
conda_packages typing.Optional[typing.List[str]]
conda_channels typing.Optional[typing.List[str]]
requirements typing.Optional[str]
apt_packages typing.Optional[typing.List[str]]
cuda typing.Optional[str]
cudnn typing.Optional[str]
base_image typing.Union[str, ForwardRef('ImageSpec'), NoneType]
platform typing.Optional[str]
pip_index typing.Optional[str]
pip_extra_index_url typing.Optional[typing.List[str]]
pip_secret_mounts typing.Optional[typing.List[typing.Tuple[str, str]]]
pip_extra_args typing.Optional[str]
registry_config typing.Optional[str]
entrypoint typing.Optional[typing.List[str]]
commands typing.Optional[typing.List[str]]
tag_format typing.Optional[str]
source_copy_mode typing.Optional[flytekit.constants.CopyFileDetection]
copy typing.Optional[typing.List[str]]
python_exec typing.Optional[str]
runtime_packages typing.Optional[typing.List[str]]
builder_options typing.Optional[typing.Dict[str, typing.Any]]
builder_config typing.Optional[typing.Dict[str, typing.Any]]

Methods

Method Description
exist() Check if the image exists in the registry.
force_push() Builder that returns a new image spec with force push enabled.
from_env() Create ImageSpec with the environment’s Python version and packages pinned to the ones in the environment.
image_name() Full image name with tag.
is_container() Check if the current container image in the pod is built from current image spec.
with_apt_packages() Builder that returns a new image spec with an additional list of apt packages that will be executed during the building process.
with_builder_options() Builder that returns a new image spec with additional builder options.
with_commands() Builder that returns a new image spec with an additional list of commands that will be executed during the building process.
with_copy() Builder that returns a new image spec with the source files copied to the destination directory.
with_packages() Builder that returns a new image spec with additional python packages that will be installed during the building process.
with_runtime_packages() Builder that returns a new image spec with runtime packages.

exist()

def exist()

Check if the image exists in the registry. Returns True if the image exists in the registry, False otherwise. Returns None if the check fails due to permission issues or other reasons.

force_push()

def force_push()

Builder that returns a new image spec with force push enabled.

from_env()

def from_env(
    pinned_packages: typing.Optional[typing.List[str]],
    kwargs,
) -> ImageSpec

Create ImageSpec with the environment’s Python version and packages pinned to the ones in the environment.

Parameter Type Description
pinned_packages typing.Optional[typing.List[str]]
kwargs **kwargs

image_name()

def image_name()

Full image name with tag.

is_container()

def is_container()

Check if the current container image in the pod is built from the current image spec. Returns True if it is, False otherwise.

with_apt_packages()

def with_apt_packages(
    apt_packages: typing.Union[str, typing.List[str]],
) -> ImageSpec

Builder that returns a new image spec with an additional list of apt packages that will be installed during the build process.

Parameter Type Description
apt_packages typing.Union[str, typing.List[str]]

with_builder_options()

def with_builder_options(
    builder_options: typing.Dict[str, typing.Any],
) -> ImageSpec

Builder that returns a new image spec with additional builder options.

Parameter Type Description
builder_options typing.Dict[str, typing.Any]

with_commands()

def with_commands(
    commands: typing.Union[str, typing.List[str]],
) -> ImageSpec

Builder that returns a new image spec with an additional list of commands that will be executed during the building process.

Parameter Type Description
commands typing.Union[str, typing.List[str]]

with_copy()

def with_copy(
    src: typing.Union[str, typing.List[str]],
) -> ImageSpec

Builder that returns a new image spec with the source files copied to the destination directory.

Parameter Type Description
src typing.Union[str, typing.List[str]]

with_packages()

def with_packages(
    packages: typing.Union[str, typing.List[str]],
) -> ImageSpec

Builder that returns a new image spec with additional python packages that will be installed during the building process.

Parameter Type Description
packages typing.Union[str, typing.List[str]]

with_runtime_packages()

def with_runtime_packages(
    runtime_packages: typing.List[str],
) -> ImageSpec

Builder that returns a new image spec with runtime packages. These packages will be installed at runtime.

Parameter Type Description
runtime_packages typing.List[str]

union.LaunchPlan

Launch Plans are one of the core constructs of Flyte. Please take a look at the discussion in the :std:ref:core concepts <flyte:divedeep-launchplans> if you are unfamiliar with them.

Every workflow is registered with a default launch plan, which is just a launch plan with none of the additional attributes set - no default values, fixed values, schedules, etc. Assuming you have the following workflow

@workflow
def wf(a: int, c: str) -> str:
    ...

Create the default launch plan with

LaunchPlan.get_or_create(workflow=wf)

If you specify additional parameters, you’ll also have to give the launch plan a unique name. Default and fixed inputs can be expressed as Python native values like so:
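
For example (a sketch using the wf workflow above; the launch plan name and values are illustrative):

LaunchPlan.get_or_create(
    workflow=wf,
    name="wf_with_defaults",
    default_inputs={"a": 3},
    fixed_inputs={"c": "4"},
)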

Additionally, a launch plan can be configured to run on a schedule and emit notifications.

Please see the relevant Schedule and Notification objects as well.

To configure the remaining parameters, you’ll need to import the relevant model objects as well.

from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig

Then use as follows:
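
A sketch building on the imports above (the schedule, label, annotation, and bucket values are illustrative; CronSchedule is importable from flytekit):

from flytekit import CronSchedule

LaunchPlan.get_or_create(
    workflow=wf,
    name="wf_scheduled",
    schedule=CronSchedule(schedule="*/10 * * * *"),
    labels=Labels({"team": "ml"}),
    annotations=Annotations({"owner": "me"}),
    raw_output_data_config=RawOutputDataConfig("s3://my-bucket/"),
)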

class LaunchPlan(
    name: str,
    workflow: _annotated_workflow.WorkflowBase,
    parameters: _interface_models.ParameterMap,
    fixed_inputs: _literal_models.LiteralMap,
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
    concurrency: Optional[ConcurrencyPolicy],
)
Parameter Type Description
name str
workflow _annotated_workflow.WorkflowBase
parameters _interface_models.ParameterMap
fixed_inputs _literal_models.LiteralMap
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool
concurrency Optional[ConcurrencyPolicy]

Methods

Method Description
clone_with()
construct_node_metadata()
create()
get_default_launch_plan() Users should probably call the get_or_create function defined below instead.
get_or_create() This function offers a friendlier interface for creating launch plans.

clone_with()

def clone_with(
    name: str,
    parameters: Optional[_interface_models.ParameterMap],
    fixed_inputs: Optional[_literal_models.LiteralMap],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
) -> LaunchPlan
Parameter Type Description
name str
parameters Optional[_interface_models.ParameterMap]
fixed_inputs Optional[_literal_models.LiteralMap]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool

construct_node_metadata()

def construct_node_metadata()

create()

def create(
    name: str,
    workflow: _annotated_workflow.WorkflowBase,
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
    concurrency: Optional[ConcurrencyPolicy],
) -> LaunchPlan
Parameter Type Description
name str
workflow _annotated_workflow.WorkflowBase
default_inputs Optional[Dict[str, Any]]
fixed_inputs Optional[Dict[str, Any]]
schedule Optional[_schedule_model.Schedule]
notifications Optional[List[_common_models.Notification]]
labels Optional[_common_models.Labels]
annotations Optional[_common_models.Annotations]
raw_output_data_config Optional[_common_models.RawOutputDataConfig]
max_parallelism Optional[int]
security_context Optional[security.SecurityContext]
auth_role Optional[_common_models.AuthRole]
trigger Optional[LaunchPlanTriggerBase]
overwrite_cache Optional[bool]
auto_activate bool
concurrency Optional[ConcurrencyPolicy]

get_default_launch_plan()

def get_default_launch_plan(
    ctx: FlyteContext,
    workflow: _annotated_workflow.WorkflowBase,
) -> LaunchPlan

Users should probably call the get_or_create function defined below instead. A default launch plan is the one that will just pick up whatever default values are defined in the workflow function signature (if any) and use the default auth information supplied during serialization, with no notifications or schedules.

Parameter Type Description
ctx FlyteContext This is not flytekit.current_context(). This is an internal context object. Users familiar with flytekit should feel free to use this however.
workflow _annotated_workflow.WorkflowBase The workflow to create a launch plan for.

get_or_create()

def get_or_create(
    workflow: _annotated_workflow.WorkflowBase,
    name: Optional[str],
    default_inputs: Optional[Dict[str, Any]],
    fixed_inputs: Optional[Dict[str, Any]],
    schedule: Optional[_schedule_model.Schedule],
    notifications: Optional[List[_common_models.Notification]],
    labels: Optional[_common_models.Labels],
    annotations: Optional[_common_models.Annotations],
    raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
    max_parallelism: Optional[int],
    security_context: Optional[security.SecurityContext],
    auth_role: Optional[_common_models.AuthRole],
    trigger: Optional[LaunchPlanTriggerBase],
    overwrite_cache: Optional[bool],
    auto_activate: bool,
    concurrency: Optional[ConcurrencyPolicy],
) -> LaunchPlan

This function offers a friendlier interface for creating launch plans. If the name for the launch plan is not supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it will be used. If creating the default launch plan, none of the other arguments may be specified.

The resulting launch plan is also cached; if this function is called again with the same name, the cached version is returned.

Parameter Type Description
workflow _annotated_workflow.WorkflowBase The Workflow to create a launch plan for.
name Optional[str] If you supply a name, keep in mind it needs to be unique. That is, project, domain, version, and this name form a primary key. If you do not supply a name, this function will assume you want the default launch plan for the given workflow.
default_inputs Optional[Dict[str, Any]] Default inputs, expressed as Python values.
fixed_inputs Optional[Dict[str, Any]] Fixed inputs, expressed as Python values. At call time, these cannot be changed.
schedule Optional[_schedule_model.Schedule] Optional schedule to run on.
notifications Optional[List[_common_models.Notification]] Notifications to send.
labels Optional[_common_models.Labels] Optional labels to attach to executions created by this launch plan.
annotations Optional[_common_models.Annotations] Optional annotations to attach to executions created by this launch plan.
raw_output_data_config Optional[_common_models.RawOutputDataConfig] Optional location of offloaded data for things like S3, etc.
max_parallelism Optional[int] Controls the maximum number of tasknodes that can be run in parallel for the entire workflow. This is useful to achieve fairness. Note: MapTasks are regarded as one unit, and parallelism/concurrency of MapTasks is independent from this.
security_context Optional[security.SecurityContext] Security context for the execution
auth_role Optional[_common_models.AuthRole] Add an auth role if necessary.
trigger Optional[LaunchPlanTriggerBase] [alpha] This is a new syntax for specifying schedules.
overwrite_cache Optional[bool] If set to True, the execution will always overwrite cache
auto_activate bool If set to True, the launch plan will be activated automatically on registration. Default is False.
concurrency Optional[ConcurrencyPolicy] Defines execution concurrency limits and policy when limit is reached

Properties

Property Type Description
annotations
concurrency
fixed_inputs
interface
labels
max_parallelism
name
notifications
overwrite_cache
parameters
python_interface
raw_output_data_config
saved_inputs
schedule
security_context
should_auto_activate
trigger
workflow

union.PodTemplate

Custom PodTemplate specification for a Task.
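
A minimal sketch (this assumes the kubernetes Python client is installed; the names, labels, and annotations are illustrative):

from kubernetes.client import V1Container, V1PodSpec

from union import PodTemplate

pod_template = PodTemplate(
    primary_container_name="primary",
    labels={"team": "ml"},
    annotations={"owner": "me"},
    pod_spec=V1PodSpec(containers=[V1Container(name="primary")]),
)

A task can then reference it, e.g. @task(pod_template=pod_template).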

class PodTemplate(
    pod_spec: typing.Optional[ForwardRef('V1PodSpec')],
    primary_container_name: str,
    labels: typing.Optional[typing.Dict[str, str]],
    annotations: typing.Optional[typing.Dict[str, str]],
)
Parameter Type Description
pod_spec typing.Optional[ForwardRef('V1PodSpec')]
primary_container_name str
labels typing.Optional[typing.Dict[str, str]]
annotations typing.Optional[typing.Dict[str, str]]

Methods

Method Description
version_hash()

version_hash()

def version_hash()

union.Resources

This class is used to specify both resource requests and resource limits.

Resources(cpu="1", mem="2048")  # This is 1 CPU and 2 KB of memory
Resources(cpu="100m", mem="2Gi")  # This is 1/10th of a CPU and 2 gigabytes of memory
Resources(cpu=0.5, mem=1024) # This is 500m CPU and 1 KB of memory

# For Kubernetes-based tasks, pods use ephemeral local storage for scratch space, caching, and for logs.
# This allocates 1Gi of such local storage.
Resources(ephemeral_storage="1Gi")

When used together with @task(resources=), you can specify the requests and limits with one object. When the value is a tuple or list, the first value is the request and the second value is the limit. If the value is a single value, then both the request and limit are set to that value. For example, Resources(cpu=("1", "2"), mem=1024) sets the CPU request to 1, the CPU limit to 2, and the memory request and limit to 1024.

Persistent storage is not currently supported on the Flyte backend.

Please see the :std:ref:User Guide <cookbook:customizing task resources> for detailed examples. Also refer to the K8s conventions.
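
For example, a sketch of attaching requests and limits to a task (the values are illustrative):

from union import Resources, task

@task(resources=Resources(cpu=("1", "2"), mem="2Gi"))
def train() -> None:
    ...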

class Resources(
    cpu: typing.Union[str, int, float, list, tuple, NoneType],
    mem: typing.Union[str, int, list, tuple, NoneType],
    gpu: typing.Union[str, int, list, tuple, NoneType],
    ephemeral_storage: typing.Union[str, int, NoneType],
)
Parameter Type Description
cpu typing.Union[str, int, float, list, tuple, NoneType]
mem typing.Union[str, int, list, tuple, NoneType]
gpu typing.Union[str, int, list, tuple, NoneType]
ephemeral_storage typing.Union[str, int, NoneType]

Methods

Method Description
from_dict()
from_json()
to_dict()
to_json()

from_dict()

def from_dict(
    d,
    dialect,
)
Parameter Type Description
d
dialect

from_json()

def from_json(
    data: typing.Union[str, bytes, bytearray],
    decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
    from_dict_kwargs: typing.Any,
) -> ~T
Parameter Type Description
data typing.Union[str, bytes, bytearray]
decoder collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]]
from_dict_kwargs typing.Any

to_dict()

def to_dict()

to_json()

def to_json(
    encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
    to_dict_kwargs: typing.Any,
) -> typing.Union[str, bytes, bytearray]
Parameter Type Description
encoder collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]]
to_dict_kwargs typing.Any

union.Secret

See :std:ref:cookbook:secrets for usage examples.
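
A sketch of requesting a secret on a task and reading it at runtime (the group and key names are illustrative):

from union import Secret, current_context, task

@task(secret_requests=[Secret(group="my-group", key="my-key")])
def use_secret() -> str:
    # The platform injects the secret at runtime; read it via the context.
    return current_context().secrets.get("my-group", "my-key")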

class Secret(
    group: typing.Optional[str],
    key: typing.Optional[str],
    group_version: typing.Optional[str],
    mount_requirement: <enum 'MountType'>,
    env_var: typing.Optional[str],
)
Parameter Type Description
group typing.Optional[str]
key typing.Optional[str]
group_version typing.Optional[str]
mount_requirement <enum 'MountType'>
env_var typing.Optional[str]

Methods

Method Description
from_flyte_idl()
serialize_to_string()
short_string() Returns a short text representation.
to_flyte_idl()

from_flyte_idl()

def from_flyte_idl(
    pb2_object: flyteidl.core.security_pb2.Secret,
) -> Secret
Parameter Type Description
pb2_object flyteidl.core.security_pb2.Secret

serialize_to_string()

def serialize_to_string()

short_string()

def short_string()

Returns: Text

to_flyte_idl()

def to_flyte_idl()

Properties

Property Type Description
is_empty

union.StructuredDataset

This is the user facing StructuredDataset class. Please don’t confuse it with the literals.StructuredDataset class (that is just a model, a Python class representation of the protobuf).
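
A sketch of producing and consuming a StructuredDataset with pandas (this assumes pandas is installed; the column names are illustrative):

import pandas as pd

from union import StructuredDataset, task

@task
def make_dataset() -> StructuredDataset:
    df = pd.DataFrame({"name": ["Alice", "Bob"], "age": [30, 31]})
    return StructuredDataset(dataframe=df)

@task
def read_dataset(sd: StructuredDataset) -> pd.DataFrame:
    # open() selects the dataframe type to decode into; all() materializes it.
    return sd.open(pd.DataFrame).all()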

class StructuredDataset(
    dataframe: typing.Optional[typing.Any],
    uri: typing.Optional[str],
    metadata: typing.Optional[literals.StructuredDatasetMetadata],
    kwargs,
)
Parameter Type Description
dataframe typing.Optional[typing.Any]
uri typing.Optional[str]
metadata typing.Optional[literals.StructuredDatasetMetadata]
kwargs **kwargs

Methods

Method Description
all()
column_names()
columns()
deserialize_structured_dataset()
from_dict()
from_json()
iter()
open()
serialize_structured_dataset()
set_literal() A public wrapper method to set the StructuredDataset Literal.
to_dict()
to_json()

all()

def all()

column_names()

def column_names()

columns()

def columns()

deserialize_structured_dataset()

def deserialize_structured_dataset(
    info,
) -> StructuredDataset
Parameter Type Description
info

from_dict()

def from_dict(
    d,
    dialect,
)
Parameter Type Description
d
dialect

from_json()

def from_json(
    data: typing.Union[str, bytes, bytearray],
    decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
    from_dict_kwargs: typing.Any,
) -> ~T
Parameter Type Description
data typing.Union[str, bytes, bytearray]
decoder collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]]
from_dict_kwargs typing.Any

iter()

def iter()

open()

def open(
    dataframe_type: Type[DF],
)
Parameter Type Description
dataframe_type Type[DF]

serialize_structured_dataset()

def serialize_structured_dataset()

set_literal()

def set_literal(
    ctx: FlyteContext,
    expected: LiteralType,
)

A public wrapper method to set the StructuredDataset Literal.

This method provides external access to the internal _set_literal method.

Parameter Type Description
ctx FlyteContext
expected LiteralType

to_dict()

def to_dict()

to_json()

def to_json(
    encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
    to_dict_kwargs: typing.Any,
) -> typing.Union[str, bytes, bytearray]
Parameter Type Description
encoder collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]]
to_dict_kwargs typing.Any

Properties

Property Type Description
dataframe
literal
metadata

union.UnionRemote

Main entrypoint for programmatically accessing a Flyte remote backend.

The term ‘remote’ is synonymous with ‘backend’ or ‘deployment’ and refers to a hosted instance of the Flyte platform, which comes with a Flyte Admin server on some known URI.

class UnionRemote(
    config: typing.Optional[Union[Config, str]],
    default_project: typing.Optional[str],
    default_domain: typing.Optional[str],
    data_upload_location: str,
    interactive_mode_enabled: typing.Optional[bool],
    kwargs,
)

Initialize a FlyteRemote object.

Any remaining keyword arguments are passed through when creating the SynchronousFlyteClient; these are usually gRPC parameters, e.g. for customizing credentials or SSL handling.

Parameter Type Description
config typing.Optional[Union[Config, str]]
default_project typing.Optional[str] default project to use when fetching or executing flyte entities.
default_domain typing.Optional[str] default domain to use when fetching or executing flyte entities.
data_upload_location str this is where all the default data will be uploaded when providing inputs. The default location, s3://my-s3-bucket/data, works for the sandbox/demo environment. Please override this for non-sandbox cases.
interactive_mode_enabled typing.Optional[bool] If set to True, the FlyteRemote will pickle the task/workflow, if False, it will not. If set to None, then it will automatically detect if it is running in an interactive environment like a Jupyter notebook and enable interactive mode.
kwargs **kwargs
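
A sketch of a typical session (the project, domain, workflow name, and version are illustrative):

from union import UnionRemote

remote = UnionRemote(default_project="flytesnacks", default_domain="development")
wf = remote.fetch_workflow(name="workflows.example.wf", version="v1")
execution = remote.execute(wf, inputs={"a": 3}, wait=True)
print(execution.outputs)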

Methods

Method Description
activate_launchplan() Given a launchplan, activate it; all previous versions are deactivated.
approve()
async_channel()
auto()
create_artifact() Create an artifact in FlyteAdmin.
deactivate_launchplan() Given a launchplan, deactivate it; all previous versions are deactivated.
deploy_app() Deploy an application.
download() Download the data to the specified location.
execute() Execute a task, workflow, or launchplan, either something that’s been declared locally, or a fetched entity.
execute_local_launch_plan() Execute a locally defined LaunchPlan.
execute_local_task() Execute a @task-decorated function or TaskTemplate task.
execute_local_workflow() Execute an @workflow decorated function.
execute_reference_launch_plan() Execute a ReferenceLaunchPlan.
execute_reference_task() Execute a ReferenceTask.
execute_reference_workflow() Execute a ReferenceWorkflow.
execute_remote_task_lp() Execute a FlyteTask, or FlyteLaunchplan.
execute_remote_wf() Execute a FlyteWorkflow.
fast_package() Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location.
fast_register_workflow() Use this method to register a workflow with zip mode.
fetch_active_launchplan() Returns the active version of the launch plan if it exists or returns None.
fetch_execution() Fetch a workflow execution entity from flyte admin.
fetch_launch_plan() Fetch a launchplan entity from flyte admin.
fetch_task() Fetch a task entity from flyte admin.
fetch_task_lazy() Similar to fetch_task, except that it returns a LazyEntity, which will fetch the task lazily.
fetch_workflow() Fetch a workflow entity from flyte admin.
fetch_workflow_lazy() Similar to fetch_workflow, except that it returns a LazyEntity, which will fetch the workflow lazily.
find_launch_plan()
find_launch_plan_for_node()
for_endpoint()
for_sandbox()
from_api_key() Call this if you want to directly instantiate a UnionRemote from an API key.
generate_console_http_domain() This should generate the domain where console is hosted.
generate_console_url() Generate a UnionAI console URL for the given Flyte remote endpoint.
get() General function that works with flyte tiny urls.
get_artifact() Get the specified artifact.
get_domains() Lists registered domains from flyte admin.
get_execution_metrics() Get the metrics for a given execution.
get_extra_headers_for_protocol()
launch_backfill() Creates and launches a backfill workflow for the given launchplan.
list_projects() Lists registered projects from flyte admin.
list_signals()
list_tasks_by_version()
raw_register() Raw register method, can be used to register control plane entities.
recent_executions()
register_launch_plan() Register a given launchplan, possibly applying overrides from the provided options.
register_script() Use this method to register a workflow via script mode.
register_task() Register a qualified task (PythonTask) with Remote.
register_workflow() Use this method to register a workflow.
reject()
remote_context() Context manager with remote-specific configuration.
search_artifacts()
set_input()
set_signal()
stop_app() Stop an application.
stream_execution_events() Stream execution events from the given tenant.
sync() This function was previously a singledispatchmethod.
sync_execution() Sync a FlyteWorkflowExecution object with its corresponding remote state.
sync_node_execution() Get data backing a node execution.
sync_task_execution() Sync a FlyteTaskExecution object with its corresponding remote state.
terminate() Terminate a workflow execution.
upload_file() Function will use remote’s client to hash and then upload the file using Admin’s data proxy service.
wait() Wait for an execution to finish.

activate_launchplan()

def activate_launchplan(
    ident: Identifier,
)

Given a launchplan, activate it; all previous versions are deactivated.

Parameter Type Description
ident Identifier

approve()

def approve(
    signal_id: str,
    execution_name: str,
    project: str,
    domain: str,
)
Parameter Type Description
signal_id str The name of the signal, this is the key used in the approve() or wait_for_input() call.
execution_name str The name of the execution. This is the tail-end of the URL when looking at the workflow execution.
project str The execution project, will default to the Remote’s default project.
domain str The execution domain, will default to the Remote’s default domain.

async_channel()

def async_channel()

auto()

def auto(
    config_file: typing.Union[str, ConfigFile],
    default_project: typing.Optional[str],
    default_domain: typing.Optional[str],
    data_upload_location: str,
    interactive_mode_enabled: bool,
    kwargs,
) -> 'FlyteRemote'
Parameter Type Description
config_file typing.Union[str, ConfigFile]
default_project typing.Optional[str]
default_domain typing.Optional[str]
data_upload_location str
interactive_mode_enabled bool
kwargs **kwargs

create_artifact()

def create_artifact(
    artifact: Artifact,
) -> Artifact

Create an artifact in FlyteAdmin.

Parameter Type Description
artifact Artifact The artifact to create.

Returns: The artifact as persisted in the service.

deactivate_launchplan()

def deactivate_launchplan(
    ident: Identifier,
)

Given a launchplan, deactivate it; all previous versions are deactivated.

Parameter Type Description
ident Identifier

deploy_app()

def deploy_app(
    app: App,
    project: Optional[str],
    domain: Optional[str],
) -> AppIDL

Deploy an application.

Parameter Type Description
app App Application to deploy.
project Optional[str] Project name. If None, uses default_project.
domain Optional[str] Domain name. If None, uses default_domain.

Returns: The App IDL for the deployed application.

download()

def download(
    data: typing.Union[LiteralsResolver, Literal, LiteralMap],
    download_to: str,
    recursive: bool,
)

Download the data to the specified location. If the data is a LiteralsResolver or LiteralMap and recursive is specified, then all file-like objects will be recursively downloaded (e.g. FlyteFile/Dir (blob), StructuredDataset, etc.).

Note that it will use your session’s credentials to access the remote location. For sandbox, this should be automatically configured, assuming you are running sandbox locally. For other environments, you will need to configure your credentials appropriately.

Parameter Type Description
data typing.Union[LiteralsResolver, Literal, LiteralMap] data to be downloaded
download_to str location to download to (str) that should be a valid path
recursive bool if the data is a LiteralsResolver or LiteralMap, then this flag will recursively download

execute()

def execute(
    entity: typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity],
    inputs: typing.Optional[typing.Dict[str, typing.Any]],
    project: str,
    domain: str,
    name: str,
    version: str,
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    image_config: typing.Optional[ImageConfig],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
    serialization_settings: typing.Optional[SerializationSettings],
) -> FlyteWorkflowExecution

Execute a task, workflow, or launchplan, either something that’s been declared locally, or a fetched entity.

This method supports:

  • Flyte{Task, Workflow, LaunchPlan} remote module objects.
  • @task-decorated functions and TaskTemplate tasks.
  • @workflow-decorated functions.
  • LaunchPlan objects.

For local entities, this code will attempt to find the entity first, and if missing, will compile and register the object.

Not all arguments are relevant in all circumstances. For example, there’s no reason to use the serialization settings for entities that have already been registered on Admin.

Parameter Type Description
entity typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity] entity to execute
inputs typing.Optional[typing.Dict[str, typing.Any]] dictionary mapping argument names to values
project str execute entity in this project. If entity doesn’t exist in the project, register the entity first before executing.
domain str execute entity in this domain. If entity doesn’t exist in the domain, register the entity first before executing.
name str execute entity using this name. If not None, use this value instead of entity.name
version str execute entity using this version. If None, uses auto-generated value.
execution_name typing.Optional[str] name of the execution. If None, uses auto-generated value.
execution_name_prefix typing.Optional[str] execution prefix to use. If provided, a random suffix will be appended
image_config typing.Optional[ImageConfig]
options typing.Optional[Options]
wait bool if True, waits for execution to complete
type_hints typing.Optional[typing.Dict[str, typing.Type]] Python types to be passed to the TypeEngine so that it knows how to properly convert the input values for the execution into Flyte literals. If missing, will default to first guessing the type using the type engine, and then to type(v). Providing the correct Python types is particularly important if the inputs are containers like lists or maps, or if the Python type is one of the more complex Flyte provided classes (like a StructuredDataset that’s annotated with columns).
overwrite_cache typing.Optional[bool] Allows for all cached values of a workflow and its tasks to be overwritten for a single execution. If enabled, all calculations are performed even if cached results would be available, overwriting the stored data once execution finishes successfully.
interruptible typing.Optional[bool] Optional flag to override the default interruptible flag of the executed entity.
envs typing.Optional[typing.Dict[str, str]] Environment variables to be set for the execution.
tags typing.Optional[typing.List[str]] Tags to be set for the execution.
cluster_pool typing.Optional[str] Specify cluster pool on which newly created execution should be placed.
execution_cluster_label typing.Optional[str] Specify label of cluster(s) on which newly created execution should be placed.
serialization_settings typing.Optional[SerializationSettings] Optionally provide serialization settings, in case the entity being run needs to first be registered. If not provided, a default will be used.

Note: The name and version arguments do not apply to FlyteTask, FlyteLaunchPlan, and FlyteWorkflow entity inputs. These values are determined by referencing the entity identifier values.

execute_local_launch_plan()

def execute_local_launch_plan(
    entity: LaunchPlan,
    inputs: typing.Optional[typing.Dict[str, typing.Any]],
    version: str,
    project: typing.Optional[str],
    domain: typing.Optional[str],
    name: typing.Optional[str],
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
    serialization_settings: typing.Optional[SerializationSettings],
) -> FlyteWorkflowExecution

Execute a locally defined LaunchPlan.

Parameter Type Description
entity LaunchPlan The locally defined launch plan object
inputs typing.Optional[typing.Dict[str, typing.Any]] Inputs to be passed into the execution as a dict with Python native values.
version str The version with which to look up/register the launch plan (if it does not already exist)
project typing.Optional[str] The same as version, but will default to the Remote object’s project
domain typing.Optional[str] The same as version, but will default to the Remote object’s domain
name typing.Optional[str] The same as version, but will default to the entity’s name
execution_name typing.Optional[str] If specified, will be used as the execution name instead of randomly generating.
execution_name_prefix typing.Optional[str]
options typing.Optional[Options] Options to be passed into the execution.
wait bool If True, will wait for the execution to complete before returning.
overwrite_cache typing.Optional[bool] If True, will overwrite the cache.
interruptible typing.Optional[bool] Optional flag to override the default interruptible flag of the executed entity.
envs typing.Optional[typing.Dict[str, str]] Environment variables to be passed into the execution.
tags typing.Optional[typing.List[str]] Tags to be passed into the execution.
cluster_pool typing.Optional[str] Specify cluster pool on which newly created execution should be placed.
execution_cluster_label typing.Optional[str] Specify label of cluster(s) on which newly created execution should be placed.
serialization_settings typing.Optional[SerializationSettings] Optionally provide serialization settings, in case the entity being run needs to first be registered.

Returns: FlyteWorkflowExecution object

execute_local_task()

def execute_local_task(
    entity: PythonTask,
    inputs: typing.Optional[typing.Dict[str, typing.Any]],
    project: str,
    domain: str,
    name: str,
    version: str,
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    image_config: typing.Optional[ImageConfig],
    wait: bool,
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
    options: typing.Optional[Options],
    serialization_settings: typing.Optional[SerializationSettings],
) -> FlyteWorkflowExecution

Execute a @task-decorated function or TaskTemplate task.

Parameter Type Description
entity PythonTask local task entity.
inputs typing.Optional[typing.Dict[str, typing.Any]] Inputs to be passed into the execution as a dict with Python native values.
project str The execution project, will default to the Remote’s default project.
domain str The execution domain, will default to the Remote’s default domain.
name str specific name of the task to run.
version str specific version of the task to run; defaults to the special string “latest”, which implies the latest version by time
execution_name typing.Optional[str] If provided, will use this name for the execution.
execution_name_prefix typing.Optional[str] If provided, will use this prefix for the execution name.
image_config typing.Optional[ImageConfig] If provided, will use this image config in the pod.
wait bool If True, will wait for the execution to complete before returning.
overwrite_cache typing.Optional[bool] If True, will overwrite the cache.
interruptible typing.Optional[bool] Optional flag to override the default interruptible flag of the executed entity.
envs typing.Optional[typing.Dict[str, str]] Environment variables to set for the execution.
tags typing.Optional[typing.List[str]] Tags to set for the execution.
cluster_pool typing.Optional[str] Specify cluster pool on which newly created execution should be placed.
execution_cluster_label typing.Optional[str] Specify label of cluster(s) on which newly created execution should be placed.
options typing.Optional[Options] Options to customize the execution.
serialization_settings typing.Optional[SerializationSettings] If the task needs to be registered, this can be passed in.

Returns: FlyteWorkflowExecution object.

execute_local_workflow()

def execute_local_workflow(
    entity: WorkflowBase,
    inputs: typing.Optional[typing.Dict[str, typing.Any]],
    project: str,
    domain: str,
    name: str,
    version: str,
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    image_config: typing.Optional[ImageConfig],
    options: typing.Optional[Options],
    wait: bool,
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
    serialization_settings: typing.Optional[SerializationSettings],
) -> FlyteWorkflowExecution

Execute an @workflow decorated function.

Parameter Type Description
entity WorkflowBase The workflow to execute
inputs typing.Optional[typing.Dict[str, typing.Any]] Input dictionary
project str Project to execute in
domain str Domain to execute in
name str Optional name override for the workflow
version str Optional version for the workflow
execution_name typing.Optional[str] Optional name for the execution
execution_name_prefix typing.Optional[str]
image_config typing.Optional[ImageConfig] Optional image config override
options typing.Optional[Options] Optional Options object
wait bool Whether to wait for execution completion
overwrite_cache typing.Optional[bool] If True, will overwrite the cache
interruptible typing.Optional[bool] Optional flag to override the default interruptible flag of the executed entity
envs typing.Optional[typing.Dict[str, str]] Environment variables to set for the execution
tags typing.Optional[typing.List[str]] Tags to set for the execution
cluster_pool typing.Optional[str] Specify cluster pool on which newly created execution should be placed
execution_cluster_label typing.Optional[str] Specify label of cluster(s) on which newly created execution should be placed
serialization_settings typing.Optional[SerializationSettings] Optionally provide serialization settings, in case the entity being run needs to be registered.

Returns: FlyteWorkflowExecution object

execute_reference_launch_plan()

def execute_reference_launch_plan(
    entity: ReferenceLaunchPlan,
    inputs: typing.Optional[typing.Dict[str, typing.Any]],
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution

Execute a ReferenceLaunchPlan.

Parameter Type Description
entity ReferenceLaunchPlan
inputs typing.Optional[typing.Dict[str, typing.Any]]
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
options typing.Optional[Options]
wait bool
type_hints typing.Optional[typing.Dict[str, typing.Type]]
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]

execute_reference_task()

def execute_reference_task(
    entity: ReferenceTask,
    inputs: typing.Optional[typing.Dict[str, typing.Any]],
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution

Execute a ReferenceTask.

Parameter Type Description
entity ReferenceTask
inputs typing.Optional[typing.Dict[str, typing.Any]]
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
options typing.Optional[Options]
wait bool
type_hints typing.Optional[typing.Dict[str, typing.Type]]
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]

execute_reference_workflow()

def execute_reference_workflow(
    entity: ReferenceWorkflow,
    inputs: typing.Optional[typing.Dict[str, typing.Any]],
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution

Execute a ReferenceWorkflow.

Parameter Type Description
entity ReferenceWorkflow
inputs typing.Optional[typing.Dict[str, typing.Any]]
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
options typing.Optional[Options]
wait bool
type_hints typing.Optional[typing.Dict[str, typing.Type]]
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]

execute_remote_task_lp()

def execute_remote_task_lp(
    entity: typing.Union[FlyteTask, FlyteLaunchPlan],
    inputs: typing.Optional[typing.Dict[str, typing.Any]],
    project: str,
    domain: str,
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution

Execute a FlyteTask or FlyteLaunchPlan.

NOTE: the name and version arguments are currently not used and are only there for consistency in the function signature.

Parameter Type Description
entity typing.Union[FlyteTask, FlyteLaunchPlan]
inputs typing.Optional[typing.Dict[str, typing.Any]]
project str
domain str
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
options typing.Optional[Options]
wait bool
type_hints typing.Optional[typing.Dict[str, typing.Type]]
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]

execute_remote_wf()

def execute_remote_wf(
    entity: FlyteWorkflow,
    inputs: typing.Optional[typing.Dict[str, typing.Any]],
    project: str,
    domain: str,
    execution_name: typing.Optional[str],
    execution_name_prefix: typing.Optional[str],
    options: typing.Optional[Options],
    wait: bool,
    type_hints: typing.Optional[typing.Dict[str, typing.Type]],
    overwrite_cache: typing.Optional[bool],
    interruptible: typing.Optional[bool],
    envs: typing.Optional[typing.Dict[str, str]],
    tags: typing.Optional[typing.List[str]],
    cluster_pool: typing.Optional[str],
    execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution

Execute a FlyteWorkflow.

NOTE: the name and version arguments are currently not used and are only there for consistency in the function signature.

Parameter Type Description
entity FlyteWorkflow
inputs typing.Optional[typing.Dict[str, typing.Any]]
project str
domain str
execution_name typing.Optional[str]
execution_name_prefix typing.Optional[str]
options typing.Optional[Options]
wait bool
type_hints typing.Optional[typing.Dict[str, typing.Type]]
overwrite_cache typing.Optional[bool]
interruptible typing.Optional[bool]
envs typing.Optional[typing.Dict[str, str]]
tags typing.Optional[typing.List[str]]
cluster_pool typing.Optional[str]
execution_cluster_label typing.Optional[str]

fast_package()

def fast_package(
    root: os.PathLike,
    deref_symlinks: bool,
    output: str,
    options: typing.Optional[FastPackageOptions],
) -> typing.Tuple[bytes, str]

Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location

Parameter Type Description
root os.PathLike path to the root of the package system that should be uploaded
deref_symlinks bool if symlinks should be dereferenced. Defaults to True
output str output path. Optional, will default to a tempdir
options typing.Optional[FastPackageOptions] additional options to customize fast_package behavior

Returns: md5_bytes, url
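
A short sketch of packaging a project root for fast registration; the root path is a placeholder and `remote` is assumed to be an already-configured FlyteRemote.

import pathlib

# Package the current directory and upload it via the data proxy service.
md5_bytes, upload_url = remote.fast_package(
    root=pathlib.Path("."),
    deref_symlinks=True,
    output=None,  # defaults to a temporary directory
)
print(upload_url)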

fast_register_workflow()

def fast_register_workflow(
    entity: WorkflowBase,
    serialization_settings: typing.Optional[SerializationSettings],
    version: typing.Optional[str],
    default_launch_plan: typing.Optional[bool],
    options: typing.Optional[Options],
    fast_package_options: typing.Optional[FastPackageOptions],
) -> FlyteWorkflow

Use this method to register a workflow with zip mode.

Parameter Type Description
entity WorkflowBase The workflow to be registered
serialization_settings typing.Optional[SerializationSettings] The serialization settings to be used
version typing.Optional[str] version for the entity to be registered as
default_launch_plan typing.Optional[bool] This should be true if a default launch plan should be created for the workflow
options typing.Optional[Options] Additional execution options that can be configured for the default launchplan
fast_package_options typing.Optional[FastPackageOptions] Options to customize copying behavior

fetch_active_launchplan()

def fetch_active_launchplan(
    project: str,
    domain: str,
    name: str,
) -> typing.Optional[FlyteLaunchPlan]

Returns the active version of the launch plan if it exists, otherwise returns None.

Parameter Type Description
project str
domain str
name str

fetch_execution()

def fetch_execution(
    project: str,
    domain: str,
    name: str,
) -> FlyteWorkflowExecution

Fetch a workflow execution entity from flyte admin.

Parameter Type Description
project str fetch entity from this project. If None, uses the default_project attribute.
domain str fetch entity from this domain. If None, uses the default_domain attribute.
name str fetch entity with matching name.

Returns: flytekit.remote.workflow_execution.FlyteWorkflowExecution
Raises: FlyteAssertion if name is None
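
A minimal sketch, assuming `remote` is configured and the execution name is taken from the tail of a console URL (placeholder below):

execution = remote.fetch_execution(
    project="flytesnacks",
    domain="development",
    name="f8a9b2c1d3e4f5a6b7c8",  # placeholder execution name
)
execution = remote.sync(execution)  # refresh with the latest remote state
print(execution.closure.phase)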

fetch_launch_plan()

def fetch_launch_plan(
    project: str,
    domain: str,
    name: str,
    version: str,
) -> FlyteLaunchPlan

Fetch a launchplan entity from flyte admin.

Parameter Type Description
project str fetch entity from this project. If None, uses the default_project attribute.
domain str fetch entity from this domain. If None, uses the default_domain attribute.
name str fetch entity with matching name.
version str fetch entity with matching version. If None, gets the latest version of the entity.

Returns: flytekit.remote.launch_plan.FlyteLaunchPlan
Raises: FlyteAssertion if name is None

fetch_task()

def fetch_task(
    project: str,
    domain: str,
    name: str,
    version: str,
) -> FlyteTask

Fetch a task entity from flyte admin.

Parameter Type Description
project str fetch entity from this project. If None, uses the default_project attribute.
domain str fetch entity from this domain. If None, uses the default_domain attribute.
name str fetch entity with matching name.
version str fetch entity with matching version. If None, gets the latest version of the entity.

Returns: flytekit.remote.tasks.task.FlyteTask
Raises: FlyteAssertion if name is None
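
A sketch of fetching a registered task and launching it, assuming a task exists under these placeholder coordinates:

flyte_task = remote.fetch_task(
    project="flytesnacks",
    domain="development",
    name="workflows.example.double",  # placeholder fully qualified task name
    version="v1",
)
execution = remote.execute(flyte_task, inputs={"x": 3}, wait=True)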

fetch_task_lazy()

def fetch_task_lazy(
    project: str,
    domain: str,
    name: str,
    version: str,
) -> LazyEntity

Similar to fetch_task, except that it returns a LazyEntity, which will fetch the task lazily.

Parameter Type Description
project str
domain str
name str
version str

fetch_workflow()

def fetch_workflow(
    project: str,
    domain: str,
    name: str,
    version: str,
) -> FlyteWorkflow

Fetch a workflow entity from flyte admin.

Parameter Type Description
project str fetch entity from this project. If None, uses the default_project attribute.
domain str fetch entity from this domain. If None, uses the default_domain attribute.
name str fetch entity with matching name.
version str fetch entity with matching version. If None, gets the latest version of the entity.

Raises: FlyteAssertion if name is None

fetch_workflow_lazy()

def fetch_workflow_lazy(
    project: str,
    domain: str,
    name: str,
    version: str,
) -> LazyEntity[FlyteWorkflow]

Similar to fetch_workflow, except that it returns a LazyEntity, which will fetch the workflow lazily.

Parameter Type Description
project str
domain str
name str
version str
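
A sketch of deferring the admin call until the entity is first used; the coordinates are placeholders, and the fetch is assumed to happen on first access of the LazyEntity's entity property:

lazy_wf = remote.fetch_workflow_lazy(
    project="flytesnacks",
    domain="development",
    name="workflows.example.my_wf",  # placeholder
    version="v1",
)
flyte_wf = lazy_wf.entity  # the network fetch happens here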

find_launch_plan()

def find_launch_plan(
    lp_ref: id_models,
    node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
)
Parameter Type Description
lp_ref id_models
node_launch_plans Dict[id_models, launch_plan_models.LaunchPlanSpec]

find_launch_plan_for_node()

def find_launch_plan_for_node(
    node: Node,
    node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
)
Parameter Type Description
node Node
node_launch_plans Dict[id_models, launch_plan_models.LaunchPlanSpec]

for_endpoint()

def for_endpoint(
    endpoint: str,
    insecure: bool,
    data_config: typing.Optional[DataConfig],
    config_file: typing.Union[str, ConfigFile],
    default_project: typing.Optional[str],
    default_domain: typing.Optional[str],
    data_upload_location: str,
    interactive_mode_enabled: bool,
    kwargs,
) -> 'FlyteRemote'
Parameter Type Description
endpoint str
insecure bool
data_config typing.Optional[DataConfig]
config_file typing.Union[str, ConfigFile]
default_project typing.Optional[str]
default_domain typing.Optional[str]
data_upload_location str
interactive_mode_enabled bool
kwargs **kwargs
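
A minimal sketch of constructing a remote directly from an endpoint; the hostname and defaults are placeholders:

from flytekit.remote import FlyteRemote

remote = FlyteRemote.for_endpoint(
    endpoint="flyte.example.com",
    insecure=False,  # keep TLS on for real deployments
    default_project="flytesnacks",
    default_domain="development",
)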

for_sandbox()

def for_sandbox(
    default_project: typing.Optional[str],
    default_domain: typing.Optional[str],
    data_upload_location: str,
    interactive_mode_enabled: bool,
    kwargs,
) -> 'FlyteRemote'
Parameter Type Description
default_project typing.Optional[str]
default_domain typing.Optional[str]
data_upload_location str
interactive_mode_enabled bool
kwargs **kwargs

from_api_key()

def from_api_key(
    api_key: str,
    default_project: typing.Optional[str],
    default_domain: typing.Optional[str],
    data_upload_location: str,
    kwargs,
) -> 'UnionRemote'

Call this if you want to directly instantiate a UnionRemote from an API key

Parameter Type Description
api_key str
default_project typing.Optional[str]
default_domain typing.Optional[str]
data_upload_location str
kwargs **kwargs
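
A sketch of headless instantiation, assuming an API key has been provisioned out of band and exported as an environment variable (the variable name is a placeholder):

import os

from union import UnionRemote

remote = UnionRemote.from_api_key(
    api_key=os.environ["UNION_API_KEY"],  # placeholder variable name
    default_project="flytesnacks",
    default_domain="development",
)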

generate_console_http_domain()

def generate_console_http_domain()

This should generate the domain where console is hosted.


generate_console_url()

def generate_console_url(
    entity: typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, FlyteLaunchPlan, Artifact],
)

Generate a Union AI console URL for the given entity on the Flyte remote endpoint. It also handles Union AI-specific entities like Artifacts.

This automatically determines whether the argument is an execution or an entity and builds the appropriate URL.

Parameter Type Description
entity typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, FlyteLaunchPlan, Artifact]
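
A short sketch, assuming `remote` is configured and the execution name is a placeholder:

execution = remote.fetch_execution(
    project="flytesnacks", domain="development", name="f8a9b2c1d3e4f5a6b7c8"
)
print(remote.generate_console_url(execution))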

get()

def get(
    uri: typing.Optional[str],
) -> typing.Optional[typing.Union[LiteralsResolver, Literal, bytes]]

General function that works with Flyte tiny URLs. This can return outputs (as a LiteralsResolver, or individual Literals for singular requests), or HTML if passed a deck link (returned as bytes containing HTML when IPython is not available locally).

Parameter Type Description
uri typing.Optional[str]

get_artifact()

def get_artifact(
    uri: typing.Optional[str],
    artifact_key: typing.Optional[art_id.ArtifactKey],
    artifact_id: typing.Optional[art_id.ArtifactID],
    query: typing.Optional[typing.Union[art_id.ArtifactQuery, ArtifactQuery]],
    get_details: bool,
) -> typing.Optional[Artifact]

Get the specified artifact.

Parameter Type Description
uri typing.Optional[str] An artifact URI.
artifact_key typing.Optional[art_id.ArtifactKey] An artifact key.
artifact_id typing.Optional[art_id.ArtifactID] The artifact ID.
query typing.Optional[typing.Union[art_id.ArtifactQuery, ArtifactQuery]] An artifact query.
get_details bool Whether to return artifact details.

Returns: The artifact as persisted in the service.

get_domains()

def get_domains()

Lists registered domains from flyte admin.

Returns: typing.List[flytekit.models.domain.Domain]

get_execution_metrics()

def get_execution_metrics(
    id: WorkflowExecutionIdentifier,
    depth: int,
) -> FlyteExecutionSpan

Get the metrics for a given execution.

Parameter Type Description
id WorkflowExecutionIdentifier
depth int

get_extra_headers_for_protocol()

def get_extra_headers_for_protocol(
    native_url,
)
Parameter Type Description
native_url

launch_backfill()

def launch_backfill(
    project: str,
    domain: str,
    from_date: datetime,
    to_date: datetime,
    launchplan: str,
    launchplan_version: str,
    execution_name: str,
    version: str,
    dry_run: bool,
    execute: bool,
    parallel: bool,
    failure_policy: typing.Optional[WorkflowFailurePolicy],
    overwrite_cache: typing.Optional[bool],
) -> typing.Optional[FlyteWorkflowExecution, FlyteWorkflow, WorkflowBase]

Creates and launches a backfill workflow for the given launch plan. If the launch plan version is not specified, the latest version is retrieved. The from_date is exclusive and the to_date is inclusive; the backfill runs for every scheduled instance in between.

If dry_run is specified, the workflow is only created and returned. If execute is False, the workflow is created and registered but not executed. Otherwise (the default), the workflow is created, registered, and executed.

The parallel flag can be used to generate a workflow where all launch plan invocations run in parallel. By default, the backfill runs sequentially.

Parameter Type Description
project str project name
domain str domain name
from_date datetime generate a backfill starting at this datetime (exclusive)
to_date datetime generate a backfill ending at this datetime (inclusive)
launchplan str launch plan name in the flyte backend
launchplan_version str (optional) version of the launch plan. If not specified, the most recent will be retrieved
execution_name str (optional) name for the generated execution; this can help in ensuring idempotency
version str (optional) version to be used for the newly created workflow
dry_run bool do not register or execute the workflow
execute bool register and execute the workflow
parallel bool if the backfill should be run in parallel. False (default) will run each backfill sequentially
failure_policy typing.Optional[WorkflowFailurePolicy] (optional) failure policy to be used for the newly created workflow. This can control failure behavior, e.g. whether to continue on failure or stop immediately on failure
overwrite_cache typing.Optional[bool] if True, will overwrite the cache

Returns: WorkflowBase in case of dry-run; FlyteWorkflow if execute is False; otherwise a FlyteWorkflowExecution
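
A sketch of backfilling a scheduled launch plan over a date window; the names and dates are placeholders:

from datetime import datetime

result = remote.launch_backfill(
    project="flytesnacks",
    domain="development",
    from_date=datetime(2024, 1, 1),  # exclusive
    to_date=datetime(2024, 1, 10),   # inclusive
    launchplan="workflows.example.daily_lp",  # placeholder launch plan name
    dry_run=False,
    execute=True,
    parallel=False,
)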

list_projects()

def list_projects(
    limit: typing.Optional[int],
    filters: typing.Optional[typing.List[filter_models.Filter]],
    sort_by: typing.Optional[admin_common_models.Sort],
) -> typing.List[Project]

Lists registered projects from flyte admin.

Parameter Type Description
limit typing.Optional[int] The maximum number of entries to return.
filters typing.Optional[typing.List[filter_models.Filter]]
sort_by typing.Optional[admin_common_models.Sort]

list_signals()

def list_signals(
    execution_name: str,
    project: typing.Optional[str],
    domain: typing.Optional[str],
    limit: int,
    filters: typing.Optional[typing.List[filter_models.Filter]],
) -> typing.List[Signal]
Parameter Type Description
execution_name str The name of the execution. This is the tail-end of the URL when looking at the workflow execution.
project typing.Optional[str] The execution project, will default to the Remote’s default project.
domain typing.Optional[str] The execution domain, will default to the Remote’s default domain.
limit int The number of signals to fetch
filters typing.Optional[typing.List[filter_models.Filter]] Optional list of filters

list_tasks_by_version()

def list_tasks_by_version(
    version: str,
    project: typing.Optional[str],
    domain: typing.Optional[str],
    limit: typing.Optional[int],
) -> typing.List[FlyteTask]
Parameter Type Description
version str
project typing.Optional[str]
domain typing.Optional[str]
limit typing.Optional[int]

raw_register()

def raw_register(
    cp_entity: FlyteControlPlaneEntity,
    settings: SerializationSettings,
    version: str,
    create_default_launchplan: bool,
    options: Options,
    og_entity: FlyteLocalEntity,
) -> typing.Optional[Identifier]

Raw register method; can be used to register control plane entities. If you have a Flyte entity like a WorkflowBase, Task, or LaunchPlan, use the other registration methods instead. This should be used only if you have already-serialized entities.

Parameter Type Description
cp_entity FlyteControlPlaneEntity The controlplane “serializable” version of a flyte entity. This is in the form that FlyteAdmin understands.
settings SerializationSettings SerializationSettings to be used for registration - especially to identify the id
version str Version to be registered
create_default_launchplan bool boolean that indicates if a default launch plan should be created
options Options Options to be used if registering a default launch plan
og_entity FlyteLocalEntity Pass in the original workflow (flytekit type) if create_default_launchplan is true

Returns: Identifier of the created entity

recent_executions()

def recent_executions(
    project: typing.Optional[str],
    domain: typing.Optional[str],
    limit: typing.Optional[int],
    filters: typing.Optional[typing.List[filter_models.Filter]],
) -> typing.List[FlyteWorkflowExecution]
Parameter Type Description
project typing.Optional[str]
domain typing.Optional[str]
limit typing.Optional[int]
filters typing.Optional[typing.List[filter_models.Filter]]
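
A quick sketch of listing recent executions in the remote's default project and domain:

for ex in remote.recent_executions(limit=5):
    print(ex.id.name, ex.closure.phase)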

register_launch_plan()

def register_launch_plan(
    entity: LaunchPlan,
    version: typing.Optional[str],
    project: typing.Optional[str],
    domain: typing.Optional[str],
    options: typing.Optional[Options],
    serialization_settings: typing.Optional[SerializationSettings],
) -> FlyteLaunchPlan

Register a given launchplan, possibly applying overrides from the provided options. If the underlying workflow is not already registered, it, along with any underlying entities, will also be registered. If the underlying workflow does exist (with the given project/domain/version), then only the launchplan will be registered.

Parameter Type Description
entity LaunchPlan Launchplan to be registered
version typing.Optional[str] Version to be registered for the launch plan, and used to check (and register) the underlying workflow
project typing.Optional[str] Optionally provide a project, if not already provided in the FlyteRemote constructor
domain typing.Optional[str] Optionally provide a domain, if not already provided in the FlyteRemote constructor
options typing.Optional[Options]
serialization_settings typing.Optional[SerializationSettings] Optionally provide serialization settings, if not provided, will use the default

register_script()

def register_script(
    entity: typing.Union[WorkflowBase, PythonTask, LaunchPlan],
    image_config: typing.Optional[ImageConfig],
    version: typing.Optional[str],
    project: typing.Optional[str],
    domain: typing.Optional[str],
    destination_dir: str,
    copy_all: bool,
    default_launch_plan: bool,
    options: typing.Optional[Options],
    source_path: typing.Optional[str],
    module_name: typing.Optional[str],
    envs: typing.Optional[typing.Dict[str, str]],
    default_resources: typing.Optional[ResourceSpec],
    fast_package_options: typing.Optional[FastPackageOptions],
) -> typing.Union[FlyteWorkflow, FlyteTask, FlyteLaunchPlan, ReferenceEntity]

Use this method to register a workflow via script mode.

Parameter Type Description
entity typing.Union[WorkflowBase, PythonTask, LaunchPlan] The workflow to be registered or the task to be registered
image_config typing.Optional[ImageConfig] The image config to use for the workflow.
version typing.Optional[str] version for the entity to be registered as
project typing.Optional[str] The project to register the workflow in.
domain typing.Optional[str] The domain to register the workflow in.
destination_dir str The destination directory where the workflow will be copied to.
copy_all bool [deprecated] Please use the copy_style field in fast_package_options instead.
default_launch_plan bool This should be true if a default launch plan should be created for the workflow
options typing.Optional[Options] Additional execution options that can be configured for the default launchplan
source_path typing.Optional[str] The root of the project path
module_name typing.Optional[str] the name of the module
envs typing.Optional[typing.Dict[str, str]] Environment variables to be passed to the serialization
default_resources typing.Optional[ResourceSpec] Default resources to be passed to the serialization. These override the resource spec for any tasks that have no statically defined resource requests and limits.
fast_package_options typing.Optional[FastPackageOptions] Options to customize copy_all behavior; ignored when copy_all is False.

register_task()

def register_task(
    entity: PythonTask,
    serialization_settings: typing.Optional[SerializationSettings],
    version: typing.Optional[str],
) -> FlyteTask

Register a qualified task (PythonTask) with the remote. For any conflicting parameters, method arguments are regarded as overrides.

Parameter Type Description
entity PythonTask PythonTask can be either an @task-decorated function or an instance of a Task class
serialization_settings typing.Optional[SerializationSettings] Settings that will be used to override various serialization parameters.
version typing.Optional[str] version that will be used to register. If not specified, will default to the serialization settings default
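
A sketch of registering a single task, assuming `double` is an @task-decorated function and the container image reference is a placeholder:

from flytekit.configuration import ImageConfig, SerializationSettings

settings = SerializationSettings(
    image_config=ImageConfig.from_images("ghcr.io/example/image:v1"),
)
flyte_task = remote.register_task(double, serialization_settings=settings, version="v1")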

register_workflow()

def register_workflow(
    entity: WorkflowBase,
    serialization_settings: typing.Optional[SerializationSettings],
    version: typing.Optional[str],
    default_launch_plan: typing.Optional[bool],
    options: typing.Optional[Options],
) -> FlyteWorkflow

Use this method to register a workflow.

Parameter Type Description
entity WorkflowBase The workflow to be registered
serialization_settings typing.Optional[SerializationSettings] The serialization settings to be used
version typing.Optional[str] version for the entity to be registered as
default_launch_plan typing.Optional[bool] This should be true if a default launch plan should be created for the workflow
options typing.Optional[Options] Additional execution options that can be configured for the default launchplan

reject()

def reject(
    signal_id: str,
    execution_name: str,
    project: str,
    domain: str,
)
Parameter Type Description
signal_id str The name of the signal, this is the key used in the approve() or wait_for_input() call.
execution_name str The name of the execution. This is the tail-end of the URL when looking at the workflow execution.
project str The execution project, will default to the Remote’s default project.
domain str The execution domain, will default to the Remote’s default domain.

remote_context()

def remote_context()

Context manager with remote-specific configuration.

search_artifacts()

def search_artifacts(
    project: typing.Optional[str],
    domain: typing.Optional[str],
    name: typing.Optional[str],
    artifact_key: typing.Optional[art_id.ArtifactKey],
    query: typing.Optional[ArtifactQuery],
    partitions: typing.Optional[Union[Partitions, typing.Dict[str, str]]],
    time_partition: typing.Optional[Union[datetime.datetime, TimePartition]],
    group_by_key: bool,
    limit: int,
) -> typing.List[Artifact]
Parameter Type Description
project typing.Optional[str]
domain typing.Optional[str]
name typing.Optional[str]
artifact_key typing.Optional[art_id.ArtifactKey]
query typing.Optional[ArtifactQuery]
partitions typing.Optional[Union[Partitions, typing.Dict[str, str]]]
time_partition typing.Optional[Union[datetime.datetime, TimePartition]]
group_by_key bool
limit int

set_input()

def set_input(
    signal_id: str,
    execution_name: str,
    value: typing.Union[literal_models.Literal, typing.Any],
    project,
    domain,
    python_type,
    literal_type,
)
Parameter Type Description
signal_id str The name of the signal, this is the key used in the approve() or wait_for_input() call.
execution_name str The name of the execution. This is the tail-end of the URL when looking at the workflow execution.
value typing.Union[literal_models.Literal, typing.Any] This is either a Literal or a Python value, which FlyteRemote will convert into a Literal via the TypeEngine. This argument is only valid for wait_for_input type signals.
project The execution project, will default to the Remote’s default project.
domain The execution domain, will default to the Remote’s default domain.
python_type Provide a python type to help with conversion if the value you provided is not a Literal.
literal_type Provide a Flyte literal type to help with conversion if the value you provided is not a Literal

set_signal()

def set_signal(
    signal_id: str,
    execution_name: str,
    value: typing.Union[literal_models.Literal, typing.Any],
    project: typing.Optional[str],
    domain: typing.Optional[str],
    python_type: typing.Optional[typing.Type],
    literal_type: typing.Optional[type_models.LiteralType],
)
Parameter Type Description
signal_id str The name of the signal, this is the key used in the approve() or wait_for_input() call.
execution_name str The name of the execution. This is the tail-end of the URL when looking at the workflow execution.
value typing.Union[literal_models.Literal, typing.Any] This is either a Literal or a Python value, which FlyteRemote will convert into a Literal via the TypeEngine. This argument is only valid for wait_for_input type signals.
project typing.Optional[str] The execution project, will default to the Remote’s default project.
domain typing.Optional[str] The execution domain, will default to the Remote’s default domain.
python_type typing.Optional[typing.Type] Provide a python type to help with conversion if the value you provided is not a Literal.
literal_type typing.Optional[type_models.LiteralType] Provide a Flyte literal type to help with conversion if the value you provided is not a Literal
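
A sketch of satisfying a wait_for_input gate in a running execution; the signal id and execution name are placeholders, and the Python value is converted to a Literal via the TypeEngine:

remote.set_signal(
    signal_id="approve-release",
    execution_name="f8a9b2c1d3e4f5a6b7c8",
    value=True,
)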

stop_app()

def stop_app(
    name: str,
    project: Optional[str],
    domain: Optional[str],
)

Stop an application.

Parameter Type Description
name str Name of application to stop.
project Optional[str] Project name. If None, uses default_project.
domain Optional[str] Domain name. If None, uses default_domain.

Returns: The App IDL for the stopped application.

stream_execution_events()

def stream_execution_events(
    event_count: Optional[int],
    include_workflow_executions: bool,
    include_task_executions: bool,
    include_node_executions: bool,
) -> AsyncGenerator[Union[CloudEventWorkflowExecution, CloudEventNodeExecution, CloudEventTaskExecution], None]

Stream execution events from the given tenant. This is a generator that yields events as they are received.

Events are guaranteed to be delivered at least once, and clients must implement handling for potentially out-of-order event processing. Events will be retransmitted until acknowledged, with acknowledgment occurring automatically upon normal return from the caller. Note: if an exception is raised during event processing, the acknowledgment will not occur, and the event will be redelivered in a subsequent transmission.

Parameter Type Description
event_count Optional[int] Number of events to receive before closing the stream. If None, receive unlimited events.
include_workflow_executions bool Whether to include workflow execution events
include_task_executions bool Whether to include task execution events
include_node_executions bool Whether to include node execution events
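
A minimal consumer sketch, assuming `remote` is a UnionRemote connected to a tenant with event streaming enabled:

import asyncio

async def consume() -> None:
    async for event in remote.stream_execution_events(
        event_count=10,  # stop after ten events; None streams indefinitely
        include_workflow_executions=True,
        include_task_executions=False,
        include_node_executions=False,
    ):
        print(type(event).__name__)

asyncio.run(consume())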

sync()

def sync(
    execution: FlyteWorkflowExecution,
    entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
    sync_nodes: bool,
) -> FlyteWorkflowExecution

This function was previously a singledispatchmethod. We’ve removed that but this function remains so that we don’t break people.

Parameter Type Description
execution FlyteWorkflowExecution
entity_definition typing.Union[FlyteWorkflow, FlyteTask]
sync_nodes bool By default, sync will fetch data on all underlying node executions (recursively, so subworkflows and launch plans will also get picked up). Set this to False to prevent that (which will make this call faster).

Returns: the same execution object, but with additional information pulled in

sync_execution()

def sync_execution(
    execution: FlyteWorkflowExecution,
    entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
    sync_nodes: bool,
) -> FlyteWorkflowExecution

Sync a FlyteWorkflowExecution object with its corresponding remote state.

Parameter Type Description
execution FlyteWorkflowExecution
entity_definition typing.Union[FlyteWorkflow, FlyteTask]
sync_nodes bool

sync_node_execution()

def sync_node_execution(
    execution: FlyteNodeExecution,
    node_mapping: typing.Dict[str, FlyteNode],
) -> FlyteNodeExecution

Get data backing a node execution. These FlyteNodeExecution objects should’ve come from Admin with the model fields already populated correctly. For purposes of the remote experience, we’d like to supplement the object with some additional fields:

  • inputs/outputs
  • task/workflow executions, and/or underlying node executions in the case of parent nodes
  • TypedInterface (remote wrapper type)

A node can have several different types of executions behind it. That is, the node could’ve run (perhaps multiple times because of retries):

  • A task
  • A static subworkflow
  • A dynamic subworkflow (which in turn may have run additional tasks, subwfs, and/or launch plans)
  • A launch plan

The data model is complicated, so ascertaining which of these happened is a bit tricky. That logic is encapsulated in this function.

Parameter Type Description
execution FlyteNodeExecution
node_mapping typing.Dict[str, FlyteNode]

sync_task_execution()

def sync_task_execution(
    execution: FlyteTaskExecution,
    entity_interface: typing.Optional[TypedInterface],
    get_task_exec_data: bool,
) -> FlyteTaskExecution

Sync a FlyteTaskExecution object with its corresponding remote state.

Parameter Type Description
execution FlyteTaskExecution
entity_interface typing.Optional[TypedInterface]
get_task_exec_data bool

terminate()

def terminate(
    execution: FlyteWorkflowExecution,
    cause: str,
)

Terminate a workflow execution.

Parameter Type Description
execution FlyteWorkflowExecution workflow execution to terminate
cause str reason for termination

upload_file()

def upload_file(
    to_upload: pathlib.Path,
    project: typing.Optional[str],
    domain: typing.Optional[str],
    filename_root: typing.Optional[str],
) -> typing.Tuple[bytes, str]

Uses the remote's client to hash the file and then upload it using Admin's data proxy service.

Parameter Type Description
to_upload pathlib.Path Must be a single file
project typing.Optional[str] Project to upload under, if not supplied will use the remote’s default
domain typing.Optional[str] Domain to upload under, if not specified will use the remote’s default
filename_root typing.Optional[str] If provided, will be used as the root of the filename. If not, Admin will use a hash

Returns: The uploaded location.

wait()

def wait(
    execution: FlyteWorkflowExecution,
    timeout: typing.Optional[typing.Union[timedelta, int]],
    poll_interval: typing.Optional[typing.Union[timedelta, int]],
    sync_nodes: bool,
) -> FlyteWorkflowExecution

Wait for an execution to finish.

Parameter Type Description
execution FlyteWorkflowExecution execution object to wait on
timeout typing.Optional[typing.Union[timedelta, int]] maximum amount of time to wait. It can be a timedelta or a duration in seconds as int.
poll_interval typing.Optional[typing.Union[timedelta, int]] sync workflow execution at this interval. It can be a timedelta or a duration in seconds as int.
sync_nodes bool passed along to the sync call for the workflow execution
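
A short sketch of blocking until completion; outputs are only readable once the execution succeeds:

from datetime import timedelta

execution = remote.wait(execution, timeout=timedelta(hours=1), poll_interval=30)
print(execution.outputs)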

Properties

Property Type Description
apps_service_client
artifacts_client
authorizer_service_client
client
Return a SynchronousFlyteClient for additional operations.
config
Image config.
context
default_domain
Default domain to use when fetching or executing flyte entities.
default_project
Default project to use when fetching or executing flyte entities.
file_access
File access provider to use for offloading non-literal inputs/outputs.
hooks_async_client
hooks_sync_client
images_client
interactive_mode_enabled
If set to True, the FlyteRemote will pickle the task/workflow.
secret_client
sync_channel
Return channel from client. This channel already has the org passed in dynamically by the interceptor.
user_service_client
users_client

union.VersionParameters

Parameters used for version hash generation.

func (Optional[Callable[P, FuncOut]]): The function to generate a version for. This is an optional parameter and can be any callable that matches the specified parameter and return types.

class VersionParameters(
    func: typing.Callable[~P, ~FuncOut],
    container_image: typing.Union[str, flytekit.image_spec.image_spec.ImageSpec, NoneType],
    pod_template: typing.Optional[flytekit.core.pod_template.PodTemplate],
    pod_template_name: typing.Optional[str],
)
Parameter Type Description
func typing.Callable[~P, ~FuncOut]
container_image typing.Union[str, flytekit.image_spec.image_spec.ImageSpec, NoneType]
pod_template typing.Optional[flytekit.core.pod_template.PodTemplate]
pod_template_name typing.Optional[str]