union
Directory
Classes
| Class | Description |
|---|---|
| ActorEnvironment | ActorEnvironment class. |
| Artifact | This is a wrapper around the Flytekit Artifact class. |
| Cache | Cache configuration for a task. |
| ContainerTask | This is an intermediate class that represents Flyte Tasks that run a container at execution time. |
| Deck | Decks enable users to get customizable and default visibility into their tasks. |
| FlyteDirectory | |
| FlyteFile | |
| ImageSpec | This class is used to specify the Docker image that will be used to run the task. |
| LaunchPlan | Launch Plans are one of the core constructs of Flyte. |
| PodTemplate | Custom PodTemplate specification for a Task. |
| Resources | This class is used to specify both resource requests and resource limits. |
| Secret | See :std:ref:`cookbook:secrets` for usage examples. |
| StructuredDataset | This is the user-facing StructuredDataset class. |
| UnionRemote | Main entrypoint for programmatically accessing a Flyte remote backend. |
| VersionParameters | Parameters used for version hash generation. |
Protocols
| Protocol | Description |
|---|---|
| CachePolicy | Base class for protocol classes. |
Methods
| Method | Description |
|---|---|
| actor_cache() | Cache a function between actor executions. |
| current_context() | Use this method to get a handle on specific parameters available in a Flyte task. |
| map() | Map over tasks, actors, launch plans, reference tasks and launch plans, and remote tasks and launch plans. |
| map_task() | Wrapper that creates a map task using either the existing ArrayNodeMapTask or the drop-in replacement ArrayNode implementation. |
| task() | This is the core decorator to use for any task type in flytekit. |
| workflow() | This decorator declares a function to be a Flyte workflow. |
Methods
actor_cache()
def actor_cache(
f,
)

Cache a function between actor executions.

| Parameter | Type | Description |
|---|---|---|
| f | | |
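A minimal sketch of how this decorator is typically combined with an ActorEnvironment (documented under union.ActorEnvironment below); the environment settings and helper names are illustrative, and the `@actor.task` usage assumes the `task` property listed for ActorEnvironment.

```python
from union import ActorEnvironment, actor_cache

actor = ActorEnvironment(name="my-actor", replica_count=1, ttl_seconds=300)

@actor_cache
def load_table(path: str) -> dict:
    # Stand-in for an expensive load; it runs once per actor replica,
    # after which the cached result is reused across executions.
    return {"source": path}

@actor.task
def lookup(key: str) -> str:
    table = load_table("/data/table.json")
    return str(table.get(key, ""))
```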
current_context()
def current_context()

Use this method to get a handle on specific parameters available in a Flyte task.

Usage

flytekit.current_context().logging.info(...)

Available params are documented in flytekit.core.context_manager.ExecutionParams. There are also some special params that should be available.
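A small sketch of reading execution parameters inside a task; the task name is illustrative.

```python
import flytekit
from flytekit import task

@task
def whoami() -> str:
    ctx = flytekit.current_context()
    ctx.logging.info("resolving execution id")
    # ExecutionParameters exposes, among other things, the execution id.
    return ctx.execution_id.name
```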
map()
def map(
target: typing.Union[flytekit.core.launch_plan.LaunchPlan, flytekit.core.python_function_task.PythonFunctionTask, ForwardRef('FlyteLaunchPlan')],
bound_inputs: typing.Optional[typing.Dict[str, typing.Any]],
concurrency: typing.Optional[int],
min_successes: typing.Optional[int],
min_success_ratio: float,
kwargs,
)

Use to map over tasks, actors, launch plans, reference tasks and launch plans, and remote tasks and launch plans.

| Parameter | Type | Description |
|---|---|---|
| target | typing.Union[flytekit.core.launch_plan.LaunchPlan, flytekit.core.python_function_task.PythonFunctionTask, ForwardRef('FlyteLaunchPlan')] | The Flyte entity that will be mapped over |
| bound_inputs | typing.Optional[typing.Dict[str, typing.Any]] | Inputs that are bound to the array node and will not be mapped over |
| concurrency | typing.Optional[int] | If specified, this limits the number of mapped tasks that can run in parallel to the given batch size. If the size of the input exceeds the concurrency value, then multiple batches will be run serially until all inputs are processed. If set to 0, this means unbounded concurrency. If left unspecified, the array node will inherit parallelism from the workflow |
| min_successes | typing.Optional[int] | The minimum number of successful executions |
| min_success_ratio | float | The minimum ratio of successful executions |
| kwargs | **kwargs | |
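A sketch of the assumed usage, mirroring the map_task call pattern shown in the next section; the assumption that map(entity, ...) returns a callable invoked with the list-valued inputs, and the square/squares names, are illustrative.

```python
from flytekit import task, workflow
from union import map

@task
def square(x: int) -> int:
    return x * x

@workflow
def squares(xs: list[int]) -> list[int]:
    # Assumed call pattern: map(entity, ...) returns a callable that is
    # invoked with the inputs to map over.
    return map(square, concurrency=2)(x=xs)
```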
map_task()
def map_task(
target: typing.Union[flytekit.core.launch_plan.LaunchPlan, flytekit.core.python_function_task.PythonFunctionTask, ForwardRef('FlyteLaunchPlan')],
concurrency: typing.Optional[int],
min_successes: typing.Optional[int],
min_success_ratio: float,
kwargs,
)

Wrapper that creates a map task using either the existing ArrayNodeMapTask or the drop-in replacement ArrayNode implementation.

| Parameter | Type | Description |
|---|---|---|
| target | typing.Union[flytekit.core.launch_plan.LaunchPlan, flytekit.core.python_function_task.PythonFunctionTask, ForwardRef('FlyteLaunchPlan')] | The Flyte entity that will be mapped over |
| concurrency | typing.Optional[int] | If specified, this limits the number of mapped tasks that can run in parallel to the given batch size. If the size of the input exceeds the concurrency value, then multiple batches will be run serially until all inputs are processed. If set to 0, this means unbounded concurrency. If left unspecified, the array node will inherit parallelism from the workflow |
| min_successes | typing.Optional[int] | The minimum number of successful executions |
| min_success_ratio | float | The minimum ratio of successful executions |
| kwargs | **kwargs | |
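For reference, the common flytekit call pattern looks like this (the task and workflow names are illustrative):

```python
from flytekit import map_task, task, workflow

@task
def stringify(a: int) -> str:
    return str(a)

@workflow
def my_wf(xs: list[int]) -> list[str]:
    # Run at most 10 copies in parallel and tolerate up to 25% failures.
    return map_task(stringify, concurrency=10, min_success_ratio=0.75)(a=xs)
```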
task()
def task(
_task_function: Optional[Callable[P, FuncOut]],
task_config: Optional[T],
cache: Union[bool, Cache],
retries: int,
interruptible: Optional[bool],
deprecated: str,
timeout: Union[datetime.timedelta, int],
container_image: Optional[Union[str, ImageSpec]],
environment: Optional[Dict[str, str]],
requests: Optional[Resources],
limits: Optional[Resources],
secret_requests: Optional[List[Secret]],
execution_mode: PythonFunctionTask.ExecutionBehavior,
node_dependency_hints: Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]],
task_resolver: Optional[TaskResolverMixin],
docs: Optional[Documentation],
disable_deck: Optional[bool],
enable_deck: Optional[bool],
deck_fields: Optional[Tuple[DeckField, ...]],
pod_template: Optional['PodTemplate'],
pod_template_name: Optional[str],
accelerator: Optional[BaseAccelerator],
pickle_untyped: bool,
shared_memory: Optional[Union[L[True], str]],
resources: Optional[Resources],
labels: Optional[dict[str, str]],
annotations: Optional[dict[str, str]],
kwargs,
) -> Union[Callable[P, FuncOut], Callable[[Callable[P, FuncOut]], PythonFunctionTask[T]], PythonFunctionTask[T]]

This is the core decorator to use for any task type in flytekit.

Tasks are the building blocks of Flyte. They represent a user's code. Tasks have the following properties:

- Versioned (usually tied to the git revision SHA1)
- Strong interfaces (specified inputs and outputs)
- Declarative
- Independently executable
- Unit testable

For a simple python task,

@task
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...

For specific task types,

@task(task_config=Spark(), retries=3)
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...

Please see some cookbook :std:ref:`task examples <cookbook:tasks>` for additional information.
| Parameter | Type | Description |
|---|---|---|
| _task_function | Optional[Callable[P, FuncOut]] | This argument is implicitly passed and represents the decorated function |
| task_config | Optional[T] | This argument provides configuration for a specific task type. Please refer to the plugins documentation for the right object to use. |
| cache | Union[bool, Cache] | Boolean or Cache that indicates how caching is configured. Deprecated param cache_serialize (please use Cache): Boolean that indicates if identical (i.e. same inputs) instances of this task should be executed in serial when caching is enabled. This means that given multiple concurrent executions over identical inputs, only a single instance executes and the rest wait to reuse the cached results. This parameter does nothing without also setting the cache parameter. Deprecated param cache_version (please use Cache): Cache version to use. Changes to the task signature will automatically trigger a cache miss, but you can always manually update this field as well to force a cache miss. You should also manually bump this version if the function body/business logic has changed, but the signature hasn't. Deprecated param cache_ignore_input_vars (please use Cache): Input variables that should not be included when calculating the hash for the cache. |
| retries | int | Number of times to retry this task during a workflow execution. |
| interruptible | Optional[bool] | [Optional] Boolean that indicates that this task can be interrupted and/or scheduled on nodes with lower QoS guarantees. This will directly reduce the $/execution cost associated, at the cost of performance penalties due to potential interruptions. Requires additional Flyte platform level configuration. If no value is provided, the task will inherit this attribute from its workflow, as follows: no value set at either the task or workflow level means the task is not interruptible; task interruptible=True with no workflow value means the task is interruptible; workflow interruptible=True with no task value means the task is interruptible; workflow interruptible=False but task interruptible=True means the task is interruptible; workflow interruptible=True but task interruptible=False means the task is not interruptible. |
| deprecated | str | A string that can be used to provide a warning message for a deprecated task. Absence or an empty string indicates that the task is active and not deprecated |
| timeout | Union[datetime.timedelta, int] | The maximum amount of time for which one execution of this task may run. The execution will be terminated if the runtime exceeds the given timeout (approximately). |
| container_image | Optional[Union[str, ImageSpec]] | By default the configured FLYTE_INTERNAL_IMAGE is used for every task. This directive can be used to provide an alternate image for a specific task. This is useful for the cases in which images bloat because of various dependencies and a dependency is only required for this or a set of tasks, and they vary from the default. For example, @task(container_image='{{.images.default.fqn}}:tag-{{images.default.tag}}') uses the default image name fqn and alters the tag to a tag-prefixed variant of the default tag, assuming an image like flytecookbook:tag-gitsha is published along with the default flytecookbook:gitsha. Refer to configurations to configure fqns for other images besides default; @task(container_image='{{.images.xyz.fqn}}:{{images.default.tag}}') will look up an image named xyz. |
| environment | Optional[Dict[str, str]] | Environment variables that should be added for this task's execution |
| requests | Optional[Resources] | Specify compute resource requests for your task. For Pod-plugin tasks, these values will apply only to the primary container. |
| limits | Optional[Resources] | Compute limits. Specify compute resource limits for your task. For Pod-plugin tasks, these values will apply only to the primary container. For more information, please see {{< py_class_ref flytekit.Resources >}}. |
| secret_requests | Optional[List[Secret]] | Keys that can identify the secrets supplied at runtime. Ideally the secret keys should also be semi-descriptive. The key values will be available at runtime, if the backend is configured to provide secrets and if secrets are available in the configured secrets store. Possible options for secret stores are Vault, Confidant, Kube secrets, AWS KMS, etc. Refer to {{< py_class_ref Secret >}} to understand how to specify the request for a secret. It may change based on the backend provider. Note: during local execution, the secrets will be pulled from the local environment variables with the format {GROUP}_{GROUP_VERSION}_{KEY}, where all the characters are capitalized and the prefix is not used. |
| execution_mode | PythonFunctionTask.ExecutionBehavior | This is mainly for internal use. Please ignore. It is filled in automatically. |
| node_dependency_hints | Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]] | A list of tasks, launchplans, or workflows that this task depends on. This is only for dynamic tasks/workflows, where flyte cannot automatically determine the dependencies prior to runtime. Even on dynamic tasks this is optional, but in some scenarios it will make registering the workflow easier, because it allows registration to be done the same as for static tasks/workflows. For example, this is useful to run launchplans dynamically, because launchplans must be registered on flyteadmin before they can be run (tasks and workflows do not have this requirement): declare launchplan0 = LaunchPlan.get_or_create(workflow0) and pass @dynamic(node_dependency_hints=[launchplan0]) so that launchplan0 is registered on flyteadmin despite being invoked from a dynamic task. |
| task_resolver | Optional[TaskResolverMixin] | Provide a custom task resolver. |
| docs | Optional[Documentation] | Documentation about this task |
| disable_deck | Optional[bool] | If true, this task will not output deck html file |
| enable_deck | Optional[bool] | If true, this task will output deck html file |
| deck_fields | Optional[Tuple[DeckField, ...]] | If specified and enable_deck is True, this task will output deck html file with the fields specified in the tuple |
| pod_template | Optional['PodTemplate'] | Custom PodTemplate for this task. |
| pod_template_name | Optional[str] | The name of the existing PodTemplate resource which will be used in this task. |
| accelerator | Optional[BaseAccelerator] | The accelerator to use for this task. |
| pickle_untyped | bool | Boolean that indicates if the task allows unspecified data types. |
| shared_memory | Optional[Union[L[True], str]] | If True, then shared memory will be attached to the container where the size is equal to the allocated memory. If a string is given, the shared memory is set to that size. |
| resources | Optional[Resources] | Specify both the request and the limit. When the value is set to a tuple or list, the first value is the request and the second value is the limit. If the value is a single value, then both the request and limit are set to that value. For example, Resources(cpu=("1", "2"), mem="1Gi") will set the cpu request to 1, the cpu limit to 2, and the mem request to 1Gi. |
| labels | Optional[dict[str, str]] | Labels to be applied to the task resource. |
| annotations | Optional[dict[str, str]] | Annotations to be applied to the task resource. |
| kwargs | **kwargs | |
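A minimal sketch tying several of the parameters above together; the values are illustrative, and Cache is the union.Cache class documented below.

```python
from datetime import timedelta
from flytekit import Resources, task
from union import Cache

@task(
    cache=Cache(version="1.0", ignored_inputs=("verbose",)),
    retries=3,
    timeout=timedelta(minutes=10),
    requests=Resources(cpu="1", mem="1Gi"),
    limits=Resources(cpu="2", mem="2Gi"),
)
def train(x: int, verbose: bool = False) -> int:
    # Identical values of x reuse the cached result; `verbose` is ignored.
    return x + 1
```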
workflow()
def workflow(
_workflow_function: Optional[Callable[P, FuncOut]],
failure_policy: Optional[WorkflowFailurePolicy],
interruptible: bool,
on_failure: Optional[Union[WorkflowBase, Task]],
docs: Optional[Documentation],
pickle_untyped: bool,
default_options: Optional[Options],
) -> Union[Callable[P, FuncOut], Callable[[Callable[P, FuncOut]], PythonFunctionWorkflow], PythonFunctionWorkflow]

This decorator declares a function to be a Flyte workflow. Workflows are declarative entities that construct a DAG of tasks using the data flow between tasks.
Unlike a task, the function body of a workflow is evaluated at serialization-time (aka compile-time). This is because while we can determine the entire structure of a task by looking at the function’s signature, workflows need to run through the function itself because the body of the function is what expresses the workflow structure. It’s also important to note that, local execution notwithstanding, it is not evaluated again when the workflow runs on Flyte. That is, workflows should not call non-Flyte entities since they are only run once (again, this is with respect to the platform, local runs notwithstanding).
Example:
import os
import sys
import typing
from collections import OrderedDict
from unittest.mock import patch
import pytest
from typing_extensions import Annotated # type: ignore
import flytekit.configuration
from flytekit import FlyteContextManager, StructuredDataset, kwtypes
from flytekit.configuration import Image, ImageConfig
from flytekit.core import context_manager
from flytekit.core.condition import conditional
from flytekit.core.task import task
from flytekit.core.workflow import WorkflowFailurePolicy, WorkflowMetadata, WorkflowMetadataDefaults, workflow
from flytekit.exceptions.user import FlyteValidationException, FlyteValueException, FlyteMissingReturnValueException
from flytekit.tools.translator import get_serializable
from flytekit.types.error.error import FlyteError
default_img = Image(name="default", fqn="test", tag="tag")
serialization_settings = flytekit.configuration.SerializationSettings(
project="project",
domain="domain",
version="version",
env=None,
image_config=ImageConfig(default_image=default_img, images=[default_img]),
)
def test_metadata_values():
with pytest.raises(FlyteValidationException):
WorkflowMetadata(on_failure=0)
wm = WorkflowMetadata(on_failure=WorkflowFailurePolicy.FAIL_IMMEDIATELY)
assert wm.on_failure == WorkflowFailurePolicy.FAIL_IMMEDIATELY
def test_default_metadata_values():
with pytest.raises(FlyteValidationException):
WorkflowMetadataDefaults(3)
wm = WorkflowMetadataDefaults(interruptible=False)
assert wm.interruptible is False
def test_workflow_values():
@task
def t1(a: int) -> typing.NamedTuple("OutputsBC", [("t1_int_output", int), ("c", str)]):
a = a + 2
return a, "world-" + str(a)
@workflow(interruptible=True, failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def wf(a: int) -> typing.Tuple[str, str]:
x, y = t1(a=a)
_, v = t1(a=x)
return y, v
wf_spec = get_serializable(OrderedDict(), serialization_settings, wf)
assert wf_spec.template.metadata_defaults.interruptible
assert wf_spec.template.metadata.on_failure == 1
def test_default_values():
@task
def t() -> bool:
return True
@task
def f() -> bool:
return False
@workflow
def wf(a: bool = True) -> bool:
return conditional("bool").if_(a.is_true()).then(t()).else_().then(f()) # type: ignore
assert wf() is True
assert wf(a=False) is False
def test_list_output_wf():
@task
def t1(a: int) -> int:
a = a + 5
return a
@workflow
def list_output_wf() -> typing.List[int]:
v = []
for i in range(2):
v.append(t1(a=i))
return v
x = list_output_wf()
assert x == [5, 6]
def test_sub_wf_single_named_tuple():
nt = typing.NamedTuple("SingleNamedOutput", [("named1", int)])
@task
def t1(a: int) -> nt:
a = a + 2
return nt(a)
@workflow
def subwf(a: int) -> nt:
return t1(a=a)
@workflow
def wf(b: int) -> nt:
out = subwf(a=b)
return t1(a=out.named1)
x = wf(b=3)
assert x == (7,)
def test_sub_wf_multi_named_tuple():
nt = typing.NamedTuple("Multi", [("named1", int), ("named2", int)])
@task
def t1(a: int) -> nt:
a = a + 2
return nt(a, a)
@workflow
def subwf(a: int) -> nt:
return t1(a=a)
@workflow
def wf(b: int) -> nt:
out = subwf(a=b)
return t1(a=out.named1)
x = wf(b=3)
assert x == (7, 7)
def test_sub_wf_varying_types():
@task
def t1l(
a: typing.List[typing.Dict[str, typing.List[int]]],
b: typing.Dict[str, typing.List[int]],
c: typing.Union[typing.List[typing.Dict[str, typing.List[int]]], typing.Dict[str, typing.List[int]], int],
d: int,
) -> str:
xx = ",".join([f"{k}:{v}" for d in a for k, v in d.items()])
yy = ",".join([f"{k}: {i}" for k, v in b.items() for i in v])
if isinstance(c, list):
zz = ",".join([f"{k}:{v}" for d in c for k, v in d.items()])
elif isinstance(c, dict):
zz = ",".join([f"{k}: {i}" for k, v in c.items() for i in v])
else:
zz = str(c)
return f"First: {xx} Second: {yy} Third: {zz} Int: {d}"
@task
def get_int() -> int:
return 1
@workflow
def subwf(
a: typing.List[typing.Dict[str, typing.List[int]]],
b: typing.Dict[str, typing.List[int]],
c: typing.Union[typing.List[typing.Dict[str, typing.List[int]]], typing.Dict[str, typing.List[int]]],
d: int,
) -> str:
return t1l(a=a, b=b, c=c, d=d)
@workflow
def wf() -> str:
ds = [
{"first_map_a": [42], "first_map_b": [get_int(), 2]},
{
"second_map_c": [33],
"second_map_d": [9, 99],
},
]
ll = {
"ll_1": [get_int(), get_int(), get_int()],
"ll_2": [4, 5, 6],
}
out = subwf(a=ds, b=ll, c=ds, d=get_int())
return out
wf.compile()
x = wf()
expected = (
"First: first_map_a:[42],first_map_b:[1, 2],second_map_c:[33],second_map_d:[9, 99] "
"Second: ll_1: 1,ll_1: 1,ll_1: 1,ll_2: 4,ll_2: 5,ll_2: 6 "
"Third: first_map_a:[42],first_map_b:[1, 2],second_map_c:[33],second_map_d:[9, 99] "
"Int: 1"
)
assert x == expected
wf_spec = get_serializable(OrderedDict(), serialization_settings, wf)
assert set(wf_spec.template.nodes[5].upstream_node_ids) == {"n2", "n1", "n0", "n4", "n3"}
@workflow
def wf() -> str:
ds = [
{"first_map_a": [42], "first_map_b": [get_int(), 2]},
{
"second_map_c": [33],
"second_map_d": [9, 99],
},
]
ll = {
"ll_1": [get_int(), get_int(), get_int()],
"ll_2": [4, 5, 6],
}
out = subwf(a=ds, b=ll, c=ll, d=get_int())
return out
x = wf()
expected = (
"First: first_map_a:[42],first_map_b:[1, 2],second_map_c:[33],second_map_d:[9, 99] "
"Second: ll_1: 1,ll_1: 1,ll_1: 1,ll_2: 4,ll_2: 5,ll_2: 6 "
"Third: ll_1: 1,ll_1: 1,ll_1: 1,ll_2: 4,ll_2: 5,ll_2: 6 "
"Int: 1"
)
assert x == expected
def test_unexpected_outputs():
@task
def t1(a: int) -> int:
a = a + 5
return a
@workflow
def no_outputs_wf():
return t1(a=3)
# Should raise an exception because the workflow returns something when it shouldn't
with pytest.raises(FlyteValueException):
no_outputs_wf()
@pytest.mark.skipif(sys.version_info < (3, 10, 10), reason="inspect module does not work correctly with Python <3.10.10. https://github.com/python/cpython/issues/102647#issuecomment-1466868212")
def test_missing_return_value():
@task
def t1(a: int) -> int:
a = a + 5
return a
# Should raise an exception because it doesn't return something when it should
with pytest.raises(FlyteMissingReturnValueException):
@workflow
def one_output_wf() -> int: # type: ignore
t1(a=3)
one_output_wf()
def test_custom_wrapper():
def our_task(
_task_function: typing.Optional[typing.Callable] = None,
**kwargs,
):
def wrapped(_func: typing.Callable):
return task(_task_function=_func)
if _task_function:
return wrapped(_task_function)
else:
return wrapped
@our_task(
foo={
"bar1": lambda x: print(x),
"bar2": lambda x: print(x),
},
)
def missing_func_body() -> str:
return "foo"
def test_wf_no_output():
@task
def t1(a: int) -> int:
a = a + 5
return a
@workflow
def no_outputs_wf():
t1(a=3)
assert no_outputs_wf() is None
def test_wf_nested_comp(exec_prefix):
@task
def t1(a: int) -> int:
a = a + 5
return a
@workflow
def outer() -> typing.Tuple[int, int]:
# You should not do this. This is just here for testing.
@workflow
def wf2() -> int:
return t1(a=5)
return t1(a=3), wf2()
assert (8, 10) == outer()
entity_mapping = OrderedDict()
model_wf = get_serializable(entity_mapping, serialization_settings, outer)
assert len(model_wf.template.interface.outputs) == 2
assert len(model_wf.template.nodes) == 2
assert model_wf.template.nodes[1].workflow_node is not None
sub_wf = model_wf.sub_workflows[0]
assert len(sub_wf.nodes) == 1
assert sub_wf.nodes[0].id == "n0"
assert sub_wf.nodes[0].task_node.reference_id.name == f"{exec_prefix}tests.flytekit.unit.core.test_workflows.t1"
@task
def add_5(a: int) -> int:
a = a + 5
return a
@workflow
def simple_wf() -> int:
return add_5(a=1)
@workflow
def my_wf_example(a: int) -> typing.Tuple[int, int]:
'''example
Workflows can have inputs and return outputs of simple or complex types.

:param a: input a
:return: outputs
'''
x = add_5(a=a)
# You can use outputs of a previous task as inputs to other nodes.
z = add_5(a=x)
# You can call other workflows from within this workflow
d = simple_wf()
# You can add conditions that can run on primitive types and execute different branches
e = conditional("bool").if_(a == 5).then(add_5(a=d)).else_().then(add_5(a=z))
# Outputs of the workflow have to be outputs returned by prior nodes.
# No outputs and single or multiple outputs are supported
return x, e
def test_workflow_lhs():
assert my_wf_example._lhs == "my_wf_example"
def test_all_node_types():
assert my_wf_example(a=1) == (6, 16)
entity_mapping = OrderedDict()
model_wf = get_serializable(entity_mapping, serialization_settings, my_wf_example)
assert len(model_wf.template.interface.outputs) == 2
assert len(model_wf.template.nodes) == 4
assert model_wf.template.nodes[2].workflow_node is not None
sub_wf = model_wf.sub_workflows[0]
assert len(sub_wf.nodes) == 1
assert sub_wf.nodes[0].id == "n0"
assert sub_wf.nodes[0].task_node.reference_id.name == "tests.flytekit.unit.core.test_workflows.add_5"
def test_wf_docstring():
model_wf = get_serializable(OrderedDict(), serialization_settings, my_wf_example)
assert len(model_wf.template.interface.outputs) == 2
assert model_wf.template.interface.outputs["o0"].description == "outputs"
assert model_wf.template.interface.outputs["o1"].description == "outputs"
assert len(model_wf.template.interface.inputs) == 1
assert model_wf.template.interface.inputs["a"].description == "input a"
@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.")
def test_structured_dataset_wf():
import pandas as pd
from pandas.testing import assert_frame_equal
from flytekit.types.schema import FlyteSchema
superset_cols = kwtypes(Name=str, Age=int, Height=int)
subset_cols = kwtypes(Name=str)
superset_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22], "Height": [160, 178]})
subset_df = pd.DataFrame({"Name": ["Tom", "Joseph"]})
@task
def t1() -> Annotated[pd.DataFrame, superset_cols]:
return superset_df
@task
def t2(df: Annotated[pd.DataFrame, subset_cols]) -> Annotated[pd.DataFrame, subset_cols]:
return df
@task
def t3(df: FlyteSchema[superset_cols]) -> FlyteSchema[superset_cols]:
return df
@task
def t4() -> FlyteSchema[superset_cols]:
return superset_df
@task
def t5(sd: Annotated[StructuredDataset, subset_cols]) -> Annotated[pd.DataFrame, subset_cols]:
return sd.open(pd.DataFrame).all()
@workflow
def sd_wf() -> Annotated[pd.DataFrame, subset_cols]:
# StructuredDataset -> StructuredDataset
df = t1()
return t2(df=df)
@workflow
def sd_to_schema_wf() -> pd.DataFrame:
# StructuredDataset -> schema
df = t1()
return t3(df=df)
@workflow
def schema_to_sd_wf() -> typing.Tuple[pd.DataFrame, pd.DataFrame]:
# schema -> StructuredDataset
df = t4()
return t2(df=df), t5(sd=df) # type: ignore
assert_frame_equal(sd_wf(), subset_df)
assert_frame_equal(sd_to_schema_wf(), superset_df)
assert_frame_equal(schema_to_sd_wf()[0], subset_df)
assert_frame_equal(schema_to_sd_wf()[1], subset_df)
@pytest.mark.skipif("pandas" not in sys.modules, reason="Pandas is not installed.")
def test_compile_wf_at_compile_time():
import pandas as pd
from flytekit.types.schema import FlyteSchema
superset_cols = kwtypes(Name=str, Age=int, Height=int)
superset_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22], "Height": [160, 178]})
ctx = FlyteContextManager.current_context()
with FlyteContextManager.with_context(
ctx.with_execution_state(
ctx.new_execution_state().with_params(mode=context_manager.ExecutionState.Mode.TASK_EXECUTION)
)
):
@task
def t4() -> FlyteSchema[superset_cols]:
return superset_df
@workflow
def wf():
t4()
assert ctx.compilation_state is None
@pytest.mark.parametrize(
"error_message", [
"Fail!",
None,
"",
("big", "boom!")
]
)
@patch("builtins.print")
def test_failure_node_local_execution(mock_print, error_message, exec_prefix):
@task
def clean_up(name: str, err: typing.Optional[FlyteError] = None):
print(f"Deleting cluster {name} due to {err}")
print("This is err:", str(err))
@task
def create_cluster(name: str):
print(f"Creating cluster: {name}")
@task
def delete_cluster(name: str, err: typing.Optional[FlyteError] = None):
print(f"Deleting cluster {name}")
print(err)
@task
def t1(a: int, b: str):
print(f"{a} {b}")
raise ValueError(error_message)
@workflow(on_failure=clean_up)
def wf(name: str = "flyteorg"):
c = create_cluster(name=name)
t = t1(a=1, b="2")
d = delete_cluster(name=name)
c >> t >> d
with pytest.raises(ValueError):
wf()
# Adjusted the error message to match the one in the failure
expected_error_message = str(
FlyteError(message=f"Error encountered while executing '{exec_prefix}tests.flytekit.unit.core.test_workflows.t1':
rror_message}", failed_node_id="fn0")
)
assert mock_print.call_count > 0
mock_print.assert_any_call("Creating cluster: flyteorg")
mock_print.assert_any_call("1 2")
mock_print.assert_any_call(f"Deleting cluster flyteorg due to {expected_error_message}")
mock_print.assert_any_call("This is err:", expected_error_message)Again, users should keep in mind that even though the body of the function looks like regular Python, it is
actually not. When flytekit scans the workflow function, the objects being passed around between the tasks are not
your typical Python values. So even though you may have a task t1() -> int, when a = t1() is called, a
will not be an integer so if you try to range(a) you’ll get an error.
Please see the :ref:user guide <cookbook:workflow> for more usage examples.
| Parameter | Type | Description |
|---|---|---|
| _workflow_function | Optional[Callable[P, FuncOut]] | This argument is implicitly passed and represents the decorated function. |
| failure_policy | Optional[WorkflowFailurePolicy] | Use the options in flytekit.WorkflowFailurePolicy |
| interruptible | bool | Whether or not tasks launched from this workflow are by default interruptible |
| on_failure | Optional[Union[WorkflowBase, Task]] | Invoke this workflow or task on failure. The workflow / task has to match the signature of the current workflow, with an additional parameter called error of type FlyteError |
| docs | Optional[Documentation] | Description entity for the workflow |
| pickle_untyped | bool | This is a flag that allows users to bypass the type-checking that Flytekit does when constructing the workflow. This is not recommended for general use. |
| default_options | Optional[Options] | Default options for the workflow when creating a default launch plan. Currently only the labels and annotations are allowed to be set as defaults. |
union.ActorEnvironment
ActorEnvironment class.
class ActorEnvironment(
name: str,
container_image: Optional[Union[str, ImageSpec]],
replica_count: int,
ttl_seconds: Optional[int],
environment: Optional[Dict[str, str]],
requests: Optional[Resources],
limits: Optional[Resources],
accelerator: Optional[BaseAccelerator],
secret_requests: Optional[List[Secret]],
pod_template: Optional[PodTemplate],
interruptible: bool,
)

| Parameter | Type | Description |
|---|---|---|
| name | str | |
| container_image | Optional[Union[str, ImageSpec]] | |
| replica_count | int | |
| ttl_seconds | Optional[int] | |
| environment | Optional[Dict[str, str]] | |
| requests | Optional[Resources] | |
| limits | Optional[Resources] | |
| accelerator | Optional[BaseAccelerator] | |
| secret_requests | Optional[List[Secret]] | |
| pod_template | Optional[PodTemplate] | |
| interruptible | bool | |
Properties
| Property | Type | Description |
|---|---|---|
| task | | |
| version | | |
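A minimal sketch of defining an environment and registering a task on it via the task property listed above; all values are illustrative.

```python
from flytekit import Resources
from union import ActorEnvironment

actor = ActorEnvironment(
    name="my-actor",
    replica_count=1,
    ttl_seconds=30,
    requests=Resources(cpu="1", mem="450Mi"),
)

@actor.task
def say_hello(name: str) -> str:
    # Runs on a warm actor replica instead of a fresh container.
    return f"hello {name}"
```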
union.Artifact
This is a wrapper around the Flytekit Artifact class.
This Python class has two purposes - as a Python representation of a materialized Artifact, and as a way for users to specify that tasks/workflows create Artifacts and the manner in which they are created.
Use one as input to a workflow (only workflow for now):

df_artifact = Artifact.get("flyte://a1")
remote.execute(wf, inputs={"a": df_artifact})
Note that Python fields will be missing when retrieved from the service.
class Artifact(
args,
project: Optional[str],
domain: Optional[str],
name: Optional[str],
version: Optional[str],
time_partitioned: bool,
time_partition: Optional[TimePartition],
time_partition_granularity: Optional[Granularity],
partition_keys: Optional[typing.List[str]],
partitions: Optional[Union[Partitions, typing.Dict[str, str]]],
python_val: Optional[typing.Any],
python_type: Optional[typing.Type],
literal: Optional[Literal],
literal_type: Optional[LiteralType],
short_description: Optional[str],
source: Optional[artifacts_pb2.ArtifactSource],
card: Optional[Card],
kwargs,
)

| Parameter | Type | Description |
|---|---|---|
| args | *args | |
| project | Optional[str] | Should not be directly user provided; the project/domain will come from the project/domain of the execution that produced the output. These values will be filled in automatically when retrieving, however. |
| domain | Optional[str] | See above. |
| name | Optional[str] | The name of the Artifact. This should be user provided. |
| version | Optional[str] | Version of the Artifact, typically the execution ID, plus some additional entropy. Not user provided. |
| time_partitioned | bool | Whether or not this Artifact will have a time partition. |
| time_partition | Optional[TimePartition] | If you want to manually pass in the full TimePartition object |
| time_partition_granularity | Optional[Granularity] | If you don't want to manually pass in the full TimePartition object, but want to control the granularity when one is automatically created for you. Note that consistency checking is limited while in alpha. |
| partition_keys | Optional[typing.List[str]] | This is a list of keys that will be used to partition the Artifact. These are not the values. Values are set by calling the artifact (via ()) and will end up in the partition_values field. |
| partitions | Optional[Union[Partitions, typing.Dict[str, str]]] | This is a dictionary of partition keys to values. |
| python_val | Optional[typing.Any] | The Python value. |
| python_type | Optional[typing.Type] | The Python type. |
| literal | Optional[Literal] | |
| literal_type | Optional[LiteralType] | |
| short_description | Optional[str] | |
| source | Optional[artifacts_pb2.ArtifactSource] | |
| card | Optional[Card] | |
| kwargs | **kwargs | |
Methods
| Method | Description |
|---|---|
| create_from() | This function allows users to declare partition values dynamically from the body of a task. |
| embed_as_query() | This should only be called in the context of a Trigger. |
| from_flyte_idl() | Converts the IDL representation to this object. |
| get() | This function is supposed to mimic the get() behavior of inputs/outputs as returned by FlyteRemote for an execution. |
| initialize() | Use this for when you have a Python value you want to get an Artifact object out of. |
| metadata() | |
| query() | |
| set_resolver() | |
| set_source() | |
| to_create_request() | |
| to_id_idl() | Converts this object to the IDL representation. |
create_from()
def create_from(
o: O,
card: Optional[SerializableToString],
args: *args,
kwargs,
) -> O

This function allows users to declare partition values dynamically from the body of a task. Note that you'll still need to annotate your task function output with the relevant Artifact object. Below, one of the partition values is bound to an input, and the other is set at runtime. Note that since tasks are not run at compile time, flytekit cannot check that you've bound all the partition values. It's up to you to ensure that you've done so.

Pricing = Artifact(name="pricing", partition_keys=["region"])
EstError = Artifact(name="estimation_error", partition_keys=["dataset"], time_partitioned=True)

@task
def t1() -> typing.Tuple[Annotated[pd.DataFrame, Pricing], Annotated[float, EstError]]:
    df = get_pricing_results()
    dt = get_time()
    return Pricing.create_from(df, region="dubai"), EstError.create_from(msq_error, dataset="train", time_partition=dt)

You can mix and match with the input syntax as well.

@task
def my_task() -> Annotated[pd.DataFrame, RideCountData(region=Inputs.region)]:
    ...
    return RideCountData.create_from(df, time_partition=datetime.datetime.now())

| Parameter | Type | Description |
|---|---|---|
| o | O | |
| card | Optional[SerializableToString] | |
| args | *args | |
| kwargs | **kwargs | |
embed_as_query()
def embed_as_query(
partition: Optional[str],
bind_to_time_partition: Optional[bool],
expr: Optional[str],
op: Optional[Op],
) -> art_id.ArtifactQuery

This should only be called in the context of a Trigger. The type of query this returns is different from the query() function. This type of query is used to reference the triggering artifact, rather than running a query.

| Parameter | Type | Description |
|---|---|---|
| partition | Optional[str] | Can embed a time partition |
| bind_to_time_partition | Optional[bool] | Set to true if you want to bind to a time partition |
| expr | Optional[str] | Only valid if there's a time partition. |
| op | Optional[Op] | If expr is given, then op is what to do with it. |
from_flyte_idl()
def from_flyte_idl(
pb2: artifacts_pb2.Artifact,
) -> Artifact

Converts the IDL representation to this object.

| Parameter | Type | Description |
|---|---|---|
| pb2 | artifacts_pb2.Artifact | |
get()
def get(
as_type: Optional[typing.Type],
) -> Optional[typing.Any]

This function is supposed to mimic the get() behavior of inputs/outputs as returned by FlyteRemote for an execution, leveraging the LiteralsResolver (and underneath that the TypeEngine) to turn the literal into a Python value.

| Parameter | Type | Description |
|---|---|---|
| as_type | Optional[typing.Type] | |
initialize()
def initialize(
python_val: typing.Any,
python_type: typing.Type,
name: Optional[str],
literal_type: Optional[LiteralType],
version: Optional[str],
tags: Optional[typing.List[str]],
) -> Artifact

Use this for when you have a Python value you want to get an Artifact object out of.

This function readies an Artifact for creation; it doesn't actually create it just yet, since this is a network-less call. You will need to persist it with a FlyteRemote instance: remote.create_artifact(Artifact.initialize(...))

Artifact.initialize("/path/to/file", tags={"tag1": "val1"})
Artifact.initialize("/path/to/parquet", type=pd.DataFrame, tags=["0.1.0"])

What's set here is everything that isn't set by the server. What is set by the server?

- name, version, if not set by user.
- uri, set by the remote
- project, domain

| Parameter | Type | Description |
|---|---|---|
| python_val | typing.Any | |
| python_type | typing.Type | |
| name | Optional[str] | |
| literal_type | Optional[LiteralType] | |
| version | Optional[str] | |
| tags | Optional[typing.List[str]] | |
metadata()
def metadata()

query()

def query(
project: Optional[str],
domain: Optional[str],
time_partition: Optional[Union[datetime.datetime, TimePartition, art_id.InputBindingData]],
partitions: Optional[Union[typing.Dict[str, str], Partitions]],
kwargs,
) -> ArtifactQuery

| Parameter | Type | Description |
|---|---|---|
| project | Optional[str] | |
| domain | Optional[str] | |
| time_partition | Optional[Union[datetime.datetime, TimePartition, art_id.InputBindingData]] | |
| partitions | Optional[Union[typing.Dict[str, str], Partitions]] | |
| kwargs | **kwargs | |
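A sketch of the common pattern of using a query as a workflow input default, so the platform resolves the latest matching artifact at launch time; the names are illustrative, and Artifact is assumed imported from union.

```python
import pandas as pd
from flytekit import task, workflow

Pricing = Artifact(name="pricing", partition_keys=["region"])
pricing_query = Pricing.query(region="dubai")

@task
def consume(df: pd.DataFrame) -> int:
    return len(df)

@workflow
def wf(df: pd.DataFrame = pricing_query) -> int:
    # The query default is resolved by the platform when wf is launched.
    return consume(df=df)
```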
set_resolver()
def set_resolver(
resolver: LiteralsResolver,
)

| Parameter | Type | Description |
|---|---|---|
| resolver | LiteralsResolver | |

set_source()

def set_source(
source: artifacts_pb2.ArtifactSource,
)

| Parameter | Type | Description |
|---|---|---|
| source | artifacts_pb2.ArtifactSource | |

to_create_request()

def to_create_request(
a: Artifact,
) -> artifacts_pb2.CreateArtifactRequest

| Parameter | Type | Description |
|---|---|---|
| a | Artifact | |
to_id_idl()
def to_id_idl()

Converts this object to the IDL representation. This is here instead of translator because it's in the interface, a relatively simple proto object that's exposed to the user.
Properties
| Property | Type | Description |
|---|---|---|
| concrete_artifact_id | | |
| partitions | | |
| time_partition | | |
union.Cache
Cache configuration for a task.
class Cache(
version: typing.Optional[str],
serialize: bool,
ignored_inputs: typing.Union[typing.Tuple[str, ...], str],
salt: str,
policies: typing.Union[typing.List[flytekit.core.cache.CachePolicy], flytekit.core.cache.CachePolicy, NoneType],
)

| Parameter | Type | Description |
|---|---|---|
| version | typing.Optional[str] | |
| serialize | bool | |
| ignored_inputs | typing.Union[typing.Tuple[str, ...], str] | |
| salt | str | |
| policies | typing.Union[typing.List[flytekit.core.cache.CachePolicy], flytekit.core.cache.CachePolicy, NoneType] | |
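A minimal sketch of attaching a cache configuration to a task; the values are illustrative.

```python
from flytekit import task
from union import Cache

@task(cache=Cache(version="1.0", serialize=True, ignored_inputs=("seed",)))
def expensive(x: int, seed: int = 0) -> int:
    # Identical inputs (ignoring `seed`) reuse the cached result; with
    # serialize=True, concurrent identical runs execute one at a time.
    return x * x
```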
Methods
| Method | Description |
|---|---|
| get_ignored_inputs() | |
| get_version() | |

get_ignored_inputs()

def get_ignored_inputs()

get_version()

def get_version(
params: flytekit.core.cache.VersionParameters,
) -> str

| Parameter | Type | Description |
|---|---|---|
| params | flytekit.core.cache.VersionParameters | |
union.CachePolicy
Base class for protocol classes.
Protocol classes are defined as::
class Proto(Protocol):
def meth(self) -> int:
...
Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).
For example::
class C:
def meth(self) -> int:
return 0
def func(x: Proto) -> int:
return x.meth()
func(C()) # Passes static type check
See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::
class GenProto[T](Protocol):
def meth(self) -> T:
...
protocol CachePolicy()

Methods

| Method | Description |
|---|---|
| get_version() | |

get_version()

def get_version(
salt: str,
params: flytekit.core.cache.VersionParameters,
) -> str

| Parameter | Type | Description |
|---|---|---|
| salt | str | |
| params | flytekit.core.cache.VersionParameters | |
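Since CachePolicy is a structural protocol, any object with a matching get_version method satisfies it. A minimal sketch; the policy class and its versioning rule are illustrative, not part of the library.

```python
from flytekit.core.cache import VersionParameters

class SaltOnlyPolicy:
    """Illustrative policy: derive the cache version from the salt alone."""

    def get_version(self, salt: str, params: VersionParameters) -> str:
        return salt or "v0"

# Illustrative wiring: Cache(policies=[SaltOnlyPolicy()])
```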
union.ContainerTask
This is an intermediate class that represents Flyte Tasks that run a container at execution time. This is the vast majority of tasks - the typical @task-decorated tasks, for instance, all run a container. An example of something that doesn't run a container would be the Athena SQL task.
class ContainerTask(
name: str,
image: typing.Union[str, flytekit.image_spec.image_spec.ImageSpec],
command: typing.List[str],
inputs: typing.Optional[typing.OrderedDict[str, typing.Type]],
metadata: typing.Optional[flytekit.core.base_task.TaskMetadata],
arguments: typing.Optional[typing.List[str]],
outputs: typing.Optional[typing.Dict[str, typing.Type]],
requests: typing.Optional[flytekit.core.resources.Resources],
limits: typing.Optional[flytekit.core.resources.Resources],
input_data_dir: typing.Optional[str],
output_data_dir: typing.Optional[str],
metadata_format: <enum 'MetadataFormat'>,
io_strategy: typing.Optional[flytekit.core.container_task.ContainerTask.IOStrategy],
secret_requests: typing.Optional[typing.List[flytekit.models.security.Secret]],
pod_template: typing.Optional[ForwardRef('PodTemplate')],
pod_template_name: typing.Optional[str],
local_logs: bool,
resources: typing.Optional[flytekit.core.resources.Resources],
kwargs,
)

| Parameter | Type | Description |
|---|---|---|
| name | str | A unique name for the task instantiation. This is unique for every instance of a task. |
| image | typing.Union[str, flytekit.image_spec.image_spec.ImageSpec] | |
| command | typing.List[str] | |
| inputs | typing.Optional[typing.OrderedDict[str, typing.Type]] | |
| metadata | typing.Optional[flytekit.core.base_task.TaskMetadata] | |
| arguments | typing.Optional[typing.List[str]] | |
| outputs | typing.Optional[typing.Dict[str, typing.Type]] | |
| requests | typing.Optional[flytekit.core.resources.Resources] | |
| limits | typing.Optional[flytekit.core.resources.Resources] | |
| input_data_dir | typing.Optional[str] | |
| output_data_dir | typing.Optional[str] | |
| metadata_format | <enum 'MetadataFormat'> | |
| io_strategy | typing.Optional[flytekit.core.container_task.ContainerTask.IOStrategy] | |
| secret_requests | typing.Optional[typing.List[flytekit.models.security.Secret]] | |
| pod_template | typing.Optional[ForwardRef('PodTemplate')] | |
| pod_template_name | typing.Optional[str] | |
| local_logs | bool | |
| resources | typing.Optional[flytekit.core.resources.Resources] | |
| kwargs | **kwargs | |
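A sketch of a raw-container task following the usual flytekit pattern; the image, paths, and names are illustrative.

```python
from flytekit import kwtypes
from union import ContainerTask

greet = ContainerTask(
    name="echo_greeting",
    image="alpine:3.18",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(name=str),
    outputs=kwtypes(greeting=str),
    command=[
        "/bin/sh",
        "-c",
        # Inputs are templated in; outputs are read back from the output dir.
        "echo 'Hello, {{.inputs.name}}!' | tee /var/outputs/greeting",
    ],
)
```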
Methods
| Method | Description |
|---|---|
| compile() | Generates a node that encapsulates this task in a workflow definition. |
| construct_node_metadata() | Used when constructing the node that encapsulates this task as part of a broader workflow definition. |
| dispatch_execute() | This method translates Flyte's Type system based input values and invokes the actual call to the executor. |
| execute() | This method will be invoked to execute the task. |
| find_lhs() | |
| get_config() | Returns the task config as a serializable dictionary. |
| get_container() | Returns the container definition (if any) that is used to run the task on hosted Flyte. |
| get_custom() | Return additional plugin-specific custom data (if any) as a serializable dictionary. |
| get_extended_resources() | Returns the extended resources to allocate to the task on hosted Flyte. |
| get_input_types() | Returns the names and python types as a dictionary for the inputs of this task. |
| get_k8s_pod() | Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte. |
| get_sql() | Returns the Sql definition (if any) that is used to run the task on hosted Flyte. |
| get_type_for_input_var() | Returns the python type for an input variable by name. |
| get_type_for_output_var() | Returns the python type for the specified output variable by name. |
| local_execute() | This function is used only in the local execution path and is responsible for calling dispatch execute. |
| local_execution_mode() | |
| post_execute() | Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs. |
| pre_execute() | This is the method that will be invoked directly before executing the task method and before all the inputs are converted. |
| sandbox_execute() | Call dispatch_execute, in the context of a local sandbox execution. |
compile()
def compile(
ctx: flytekit.core.context_manager.FlyteContext,
args,
kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, NoneType]

Generates a node that encapsulates this task in a workflow definition.

| Parameter | Type | Description |
|---|---|---|
| ctx | flytekit.core.context_manager.FlyteContext | |
| args | *args | |
| kwargs | **kwargs | |
construct_node_metadata()
def construct_node_metadata()

Used when constructing the node that encapsulates this task as part of a broader workflow definition.
dispatch_execute()
def dispatch_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
) -> typing.Union[flytekit.models.literals.LiteralMap, flytekit.models.dynamic_job.DynamicJobSpec, typing.Coroutine]

This method translates Flyte's Type system based input values and invokes the actual call to the executor. This method is also invoked during runtime.

- VoidPromise is returned in the case when the task itself declares no outputs.
- LiteralMap is returned when the task returns one or more outputs in the declaration. Individual outputs may be none.
- DynamicJobSpec is returned when a dynamic workflow is executed.

| Parameter | Type | Description |
|---|---|---|
| ctx | flytekit.core.context_manager.FlyteContext | |
| input_literal_map | flytekit.models.literals.LiteralMap | |
execute()
def execute(
kwargs,
) -> flytekit.models.literals.LiteralMap

This method will be invoked to execute the task.

| Parameter | Type | Description |
|---|---|---|
| kwargs | **kwargs | |
find_lhs()
def find_lhs()

get_config()

def get_config(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, str]]

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom config defined for this task.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_container()
def get_container(
settings: flytekit.configuration.SerializationSettings,
) -> flytekit.models.task.Container

Returns the container definition (if any) that is used to run the task on hosted Flyte.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_custom()
def get_custom(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[typing.Dict[str, typing.Any]]

Return additional plugin-specific custom data (if any) as a serializable dictionary.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_extended_resources()
def get_extended_resources(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flyteidl.core.tasks_pb2.ExtendedResources]

Returns the extended resources to allocate to the task on hosted Flyte.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_input_types()
def get_input_types()

Returns the names and python types as a dictionary for the inputs of this task.
get_k8s_pod()
def get_k8s_pod(
settings: flytekit.configuration.SerializationSettings,
) -> flytekit.models.task.K8sPod

Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_sql()
def get_sql(
settings: flytekit.configuration.SerializationSettings,
) -> typing.Optional[flytekit.models.task.Sql]

Returns the Sql definition (if any) that is used to run the task on hosted Flyte.

| Parameter | Type | Description |
|---|---|---|
| settings | flytekit.configuration.SerializationSettings | |
get_type_for_input_var()
def get_type_for_input_var(
k: str,
v: typing.Any,
) -> typing.Type[typing.Any]

Returns the python type for an input variable by name.

| Parameter | Type | Description |
|---|---|---|
| k | str | |
| v | typing.Any | |
get_type_for_output_var()
def get_type_for_output_var(
k: str,
v: typing.Any,
) -> typing.Type[typing.Any]

Returns the python type for the specified output variable by name.

| Parameter | Type | Description |
|---|---|---|
| k | str | |
| v | typing.Any | |
local_execute()
def local_execute(
ctx: flytekit.core.context_manager.FlyteContext,
kwargs,
) -> typing.Union[typing.Tuple[flytekit.core.promise.Promise], flytekit.core.promise.Promise, flytekit.core.promise.VoidPromise, typing.Coroutine, NoneType]

This function is used only in the local execution path and is responsible for calling dispatch execute. Use this function when calling a task with native values (or Promises containing Flyte literals derived from Python native values).

| Parameter | Type | Description |
|---|---|---|
| ctx | flytekit.core.context_manager.FlyteContext | |
| kwargs | **kwargs | |
local_execution_mode()
def local_execution_mode()

post_execute()
def post_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
rval: typing.Any,
) -> typing.Any

Post execute is called after the execution has completed, with the user_params, and can be used to clean up or alter the outputs to match the intended task's outputs. If not overridden, this function is a no-op.

| Parameter | Type | Description |
|---|---|---|
| user_params | typing.Optional[flytekit.core.context_manager.ExecutionParameters] | The modified user params as created during the pre_execute step |
| rval | typing.Any | |
pre_execute()
def pre_execute(
user_params: typing.Optional[flytekit.core.context_manager.ExecutionParameters],
) -> typing.Optional[flytekit.core.context_manager.ExecutionParameters]

This is the method that will be invoked directly before executing the task method and before all the inputs are converted. One particular case where this is useful is if the context is to be modified for the user process to get some user space parameters. This also ensures that things like SparkSession are already correctly set up before the type transformers are called.

This should return either the same context or the mutated context.

| Parameter | Type | Description |
|---|---|---|
| user_params | typing.Optional[flytekit.core.context_manager.ExecutionParameters] | |
sandbox_execute()
def sandbox_execute(
ctx: flytekit.core.context_manager.FlyteContext,
input_literal_map: flytekit.models.literals.LiteralMap,
) -> flytekit.models.literals.LiteralMap

Call dispatch_execute, in the context of a local sandbox execution. Not invoked during runtime.

| Parameter | Type | Description |
|---|---|---|
| ctx | flytekit.core.context_manager.FlyteContext | |
| input_literal_map | flytekit.models.literals.LiteralMap | |
Properties
| Property | Type | Description |
|---|---|---|
| deck_fields | | If not empty, this task will output deck html file for the specified decks |
| disable_deck | | If true, this task will not output deck html file |
| docs | | |
| enable_deck | | If true, this task will output deck html file |
| environment | | Any environment variables that were supplied during the execution of the task. |
| instantiated_in | | |
| interface | | |
| lhs | | |
| location | | |
| metadata | | |
| name | | |
| python_interface | | Returns this task's python interface. |
| resources | | |
| security_context | | |
| task_config | | Returns the user-specified task config which is used for plugin-specific handling of the task. |
| task_type | | |
| task_type_version | | |
union.Deck
Decks enable users to get customizable and default visibility into their tasks.

Deck contains a list of renderers (FrameRenderer, MarkdownRenderer) that can generate an HTML file. For example, FrameRenderer can render a DataFrame as an HTML table, and MarkdownRenderer can convert a Markdown string to HTML.

Flyte context saves a list of deck objects, and we use the renderers in those decks to render the data and create an HTML file when those tasks are executed.

Each task has at least three decks (input, output, default). Input/output decks are used to render tasks' input/output data, and the default deck is used to render line plots, scatter plots or Markdown text. In addition, users can create new decks to render their data with custom renderers.
iris_df = px.data.iris()
@task()
def t1() -> str:
md_text = "#Hello Flyte\n##Hello Flyte\n###Hello Flyte"
m = MarkdownRenderer()
s = BoxRenderer("sepal_length")
deck = flytekit.Deck("demo", s.to_html(iris_df))
deck.append(m.to_html(md_text))
default_deck = flytekit.current_context().default_deck
default_deck.append(m.to_html(md_text))
return md_text
# Use Annotated to override default renderer
@task()
def t2() -> Annotated[pd.DataFrame, TopFrameRenderer(10)]:
return iris_df

class Deck(
name: str,
html: typing.Optional[str],
auto_add_to_deck: bool,
)

| Parameter | Type | Description |
|---|---|---|
| name | str | |
| html | typing.Optional[str] | |
| auto_add_to_deck | bool | |
Methods
| Method | Description |
|---|---|
| append() | |
| publish() | |

append()

def append(
html: str,
) -> Deck

| Parameter | Type | Description |
|---|---|---|
| html | str | |

publish()

def publish()

Properties

| Property | Type | Description |
|---|---|---|
| html | | |
| name | | |
union.FlyteDirectory
class FlyteDirectory(
path: typing.Union[str, os.PathLike],
downloader: typing.Optional[typing.Callable],
remote_directory: typing.Optional[typing.Union[os.PathLike, str, typing.Literal[False]]],
)

| Parameter | Type | Description |
|---|---|---|
| path | typing.Union[str, os.PathLike] | The source path that users are expected to call open() on |
| downloader | typing.Optional[typing.Callable] | Optional function that can be passed in to delay downloading of the actual file until a user actually calls open(). |
| remote_directory | typing.Optional[typing.Union[os.PathLike, str, typing.Literal[False]]] | If the user wants to return something and also specify where it should be uploaded to. If set to False, then flytekit will not upload the directory to the remote store. |
Methods
| Method | Description |
|---|---|
| crawl() | Crawl returns a generator of all files prefixed by any sub-folders under the given "FlyteDirectory". |
| deserialize_flyte_dir() | |
| download() | |
| extension() | |
| from_dict() | |
| from_json() | |
| from_source() | Create a new FlyteDirectory object with the remote source set to the input. |
| listdir() | This function will list all files and folders in the given directory, but without downloading the contents. |
| new() | Create a new FlyteDirectory object in the current Flyte working directory. |
| new_dir() | This will create a new folder under the current folder. |
| new_file() | This will create a new file under the current folder. |
| new_remote() | Create a new FlyteDirectory object using the currently configured default remote in the context (i.e. the raw_output_prefix). |
| schema() | |
| serialize_flyte_dir() | |
| to_dict() | |
| to_json() | |
crawl()
def crawl(
maxdepth: typing.Optional[int],
topdown: bool,
kwargs,
) -> Generator[Tuple[typing.Union[str, os.PathLike[Any]], typing.Dict[Any, Any]], None, None]

Crawl returns a generator of all files prefixed by any sub-folders under the given "FlyteDirectory". If detail=True is passed, then it will return a dictionary as specified by fsspec.
Example:
>>> list(fd.crawl())
[("/base", "file1"), ("/base", "dir1/file1"), ("/base", "dir2/file1"), ("/base", "dir1/dir/file1")]
>>> list(x.crawl(detail=True))
[('/tmp/test', {'my-dir/ab.py': {'name': '/tmp/test/my-dir/ab.py', 'size': 0, 'type': 'file',
'created': 1677720780.2318847, 'islink': False, 'mode': 33188, 'uid': 501, 'gid': 0,
'mtime': 1677720780.2317934, 'ino': 1694329, 'nlink': 1}})]
| Parameter | Type | Description |
|---|---|---|
| maxdepth | typing.Optional[int] | |
| topdown | bool | |
| kwargs | **kwargs | |
deserialize_flyte_dir()
def deserialize_flyte_dir(
info,
) -> FlyteDirectory| Parameter | Type | Description |
|---|---|---|
info |
download()
def download()
extension()
def extension()
from_dict()
def from_dict(
kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
infer_missing,
) -> ~A| Parameter | Type | Description |
|---|---|---|
kvs |
typing.Union[dict, list, str, int, float, bool, NoneType] |
|
infer_missing |
from_json()
def from_json(
s: typing.Union[str, bytes, bytearray],
parse_float,
parse_int,
parse_constant,
infer_missing,
kw,
) -> ~A| Parameter | Type | Description |
|---|---|---|
s |
typing.Union[str, bytes, bytearray] |
|
parse_float |
||
parse_int |
||
parse_constant |
||
infer_missing |
||
kw |
from_source()
def from_source(
source: str | os.PathLike,
) -> FlyteDirectoryCreate a new FlyteDirectory object with the remote source set to the input
| Parameter | Type | Description |
|---|---|---|
source |
str | os.PathLike |
listdir()
def listdir(
directory: FlyteDirectory,
) -> typing.List[typing.Union[FlyteDirectory, FlyteFile]]This function will list all files and folders in the given directory, but without downloading the contents. In addition, it will return a list of FlyteFile and FlyteDirectory objects that have ability to lazily download the contents of the file/folder. For example:
entity = FlyteDirectory.listdir(directory)
for e in entity:
print("s3 object:", e.remote_source)
# s3 object: s3://test-flytedir/file1.txt
# s3 object: s3://test-flytedir/file2.txt
# s3 object: s3://test-flytedir/sub_dir
open(entity[0], "r") # This will download the file to the local disk.
open(entity[0], "r") # flytekit will read data from the local disk if you open it again.| Parameter | Type | Description |
|---|---|---|
directory |
FlyteDirectory |
new()
def new(
dirname: str | os.PathLike,
) -> FlyteDirectoryCreate a new FlyteDirectory object in current Flyte working directory.
| Parameter | Type | Description |
|---|---|---|
dirname |
str | os.PathLike |
new_dir()
def new_dir(
name: typing.Optional[str],
) -> FlyteDirectoryThis will create a new folder under the current folder. If given a name, it will use the name given, otherwise it’ll pick a random string. Collisions are not checked.
| Parameter | Type | Description |
|---|---|---|
name |
typing.Optional[str] |
new_file()
def new_file(
name: typing.Optional[str],
) -> FlyteFileThis will create a new file under the current folder. If given a name, it will use the name given, otherwise it’ll pick a random string. Collisions are not checked.
| Parameter | Type | Description |
|---|---|---|
name |
typing.Optional[str] |
new_remote()
def new_remote(
stem: typing.Optional[str],
alt: typing.Optional[str],
) -> FlyteDirectoryCreate a new FlyteDirectory object using the currently configured default remote in the context (i.e. the raw_output_prefix configured in the current FileAccessProvider object in the context). This is used if you explicitly have a folder somewhere that you want to create files under. If you want to write a whole folder, you can let your task return a FlyteDirectory object, and let flytekit handle the uploading.
| Parameter | Type | Description |
|---|---|---|
stem |
typing.Optional[str] |
A stem to append to the path as the final prefix “directory”. |
alt |
typing.Optional[str] |
An alternate first member of the prefix to use instead of the default. :return FlyteDirectory: A new FlyteDirectory object that points to a remote location. |
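If you just want to upload a whole folder, the recommended pattern from the description above is to write locally and return a FlyteDirectory; a minimal sketch (the folder and file names are illustrative):
import os
from flytekit import current_context, task
from flytekit.types.directory import FlyteDirectory

@task
def export_reports() -> FlyteDirectory:
    # Write files into a local folder; flytekit uploads it when the task returns.
    local_dir = os.path.join(current_context().working_directory, "reports")
    os.makedirs(local_dir, exist_ok=True)
    with open(os.path.join(local_dir, "summary.txt"), "w") as f:
        f.write("ok")
    return FlyteDirectory(path=local_dir)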
schema()
def schema(
infer_missing: bool,
only,
exclude,
many: bool,
context,
load_only,
dump_only,
partial: bool,
unknown,
) -> SchemaType[A]| Parameter | Type | Description |
|---|---|---|
infer_missing |
bool |
|
only |
||
exclude |
||
many |
bool |
|
context |
||
load_only |
||
dump_only |
||
partial |
bool |
|
unknown |
serialize_flyte_dir()
def serialize_flyte_dir()
to_dict()
def to_dict(
encode_json,
) -> typing.Dict[str, typing.Union[dict, list, str, int, float, bool, NoneType]]| Parameter | Type | Description |
|---|---|---|
encode_json |
to_json()
def to_json(
skipkeys: bool,
ensure_ascii: bool,
check_circular: bool,
allow_nan: bool,
indent: typing.Union[int, str, NoneType],
separators: typing.Tuple[str, str],
default: typing.Callable,
sort_keys: bool,
kw,
) -> str| Parameter | Type | Description |
|---|---|---|
skipkeys |
bool |
|
ensure_ascii |
bool |
|
check_circular |
bool |
|
allow_nan |
bool |
|
indent |
typing.Union[int, str, NoneType] |
|
separators |
typing.Tuple[str, str] |
|
default |
typing.Callable |
|
sort_keys |
bool |
|
kw |
Properties
| Property | Type | Description |
|---|---|---|
downloaded |
||
remote_directory |
||
remote_source |
If this is an input to a task, and the original path is s3://something, flytekit will download the directory for the user. In case the user wants access to the original path, it will be here. |
|
sep |
union.FlyteFile
class FlyteFile(
path: typing.Union[str, os.PathLike],
downloader: typing.Callable,
remote_path: typing.Optional[typing.Union[os.PathLike, str, bool]],
metadata: typing.Optional[dict[str, str]],
)FlyteFile’s init method.
| Parameter | Type | Description |
|---|---|---|
path |
typing.Union[str, os.PathLike] |
The source path that users are expected to call open() on. |
downloader |
typing.Callable |
Optional function that can be passed in to delay downloading of the actual file until a user actually calls open(). |
remote_path |
typing.Optional[typing.Union[os.PathLike, str, bool]] |
If the user wants to return something and also specify where it should be uploaded to. Alternatively, if the user wants to specify a remote path for a file that’s already in the blob store, the path should point to the location and remote_path should be set to False. |
metadata |
typing.Optional[dict[str, str]] |
Methods
| Method | Description |
|---|---|
deserialize_flyte_file() |
|
download() |
|
extension() |
|
from_dict() |
|
from_json() |
|
from_source() |
Create a new FlyteFile object with the remote source set to the input. |
new() |
Create a new FlyteFile object in the current Flyte working directory. |
new_remote_file() |
Create a new FlyteFile object with a remote path. |
open() |
Returns a streaming File handle. |
serialize_flyte_file() |
|
to_dict() |
|
to_json() |
deserialize_flyte_file()
def deserialize_flyte_file(
info,
) -> 'FlyteFile'| Parameter | Type | Description |
|---|---|---|
info |
download()
def download()
extension()
def extension()
from_dict()
def from_dict(
d,
dialect,
)| Parameter | Type | Description |
|---|---|---|
d |
||
dialect |
from_json()
def from_json(
data: typing.Union[str, bytes, bytearray],
decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
from_dict_kwargs: typing.Any,
) -> ~T| Parameter | Type | Description |
|---|---|---|
data |
typing.Union[str, bytes, bytearray] |
|
decoder |
collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]] |
|
from_dict_kwargs |
typing.Any |
from_source()
def from_source(
source: str | os.PathLike,
) -> FlyteFileCreate a new FlyteFile object with the remote source set to the input
| Parameter | Type | Description |
|---|---|---|
source |
str | os.PathLike |
new()
def new(
filename: str | os.PathLike,
) -> FlyteFileCreate a new FlyteFile object in the current Flyte working directory
| Parameter | Type | Description |
|---|---|---|
filename |
str | os.PathLike |
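A minimal sketch of the pattern (the file name is illustrative): create the file in the working directory, write to it, and return it so flytekit uploads it.
from flytekit import task
from flytekit.types.file import FlyteFile

@task
def render_report() -> FlyteFile:
    # Created under the current Flyte working directory; uploaded on return.
    ff = FlyteFile.new("report.txt")
    with open(ff.path, "w") as f:
        f.write("hello")
    return ff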
new_remote_file()
def new_remote_file(
name: typing.Optional[str],
alt: typing.Optional[str],
) -> FlyteFileCreate a new FlyteFile object with a remote path.
| Parameter | Type | Description |
|---|---|---|
name |
typing.Optional[str] |
If you want to specify a different name for the file, you can specify it here. |
alt |
typing.Optional[str] |
If you want to specify a different prefix head than the default one, you can specify it here. |
open()
def open(
mode: str,
cache_type: typing.Optional[str],
cache_options: typing.Optional[typing.Dict[str, typing.Any]],
)Returns a streaming File handle
@task
def copy_file(ff: FlyteFile) -> FlyteFile:
new_file = FlyteFile.new_remote_file()
with ff.open("rb", cache_type="readahead") as r:
with new_file.open("wb") as w:
w.write(r.read())
return new_file| Parameter | Type | Description |
|---|---|---|
mode |
str |
Open mode. For example "r", "rb", "w". :type mode: str |
cache_type |
typing.Optional[str] |
Specifies the cache type. Possible values are “blockcache”, “bytes”, “mmap”, “readahead”, “first”, or “background”. This is especially useful for large file reads. See https://filesystem-spec.readthedocs.io/en/latest/api.html#readbuffering. :type cache_type: str, optional |
cache_options |
typing.Optional[typing.Dict[str, typing.Any]] |
A Dict corresponding to the parameters for the chosen cache_type. Refer to fsspec caching options above. :type cache_options: Dict[str, Any], optional |
serialize_flyte_file()
def serialize_flyte_file()
to_dict()
def to_dict()
to_json()
def to_json(
encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
to_dict_kwargs: typing.Any,
) -> typing.Union[str, bytes, bytearray]| Parameter | Type | Description |
|---|---|---|
encoder |
collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]] |
|
to_dict_kwargs |
typing.Any |
Properties
| Property | Type | Description |
|---|---|---|
downloaded |
||
remote_path |
||
remote_source |
If this is an input to a task, and the original path is an s3 bucket, Flytekit downloads the file for the user. In case the user wants access to the original path, it will be here. |
union.ImageSpec
This class is used to specify the docker image that will be used to run the task.
Attributes:
name (str): Name of the image.
python_version (str): Python version of the image. Use default python in the base image if None.
builder (Optional[str]): Type of plugin to build the image. Use envd by default.
source_root (Optional[str]): Source root of the image.
env (Optional[Dict[str, str]]): Environment variables of the image.
registry (Optional[str]): Registry of the image.
packages (Optional[List[str]]): List of python packages to install.
conda_packages (Optional[List[str]]): List of conda packages to install.
conda_channels (Optional[List[str]]): List of conda channels.
requirements (Optional[str]): Path to the requirements.txt file.
apt_packages (Optional[List[str]]): List of apt packages to install.
cuda (Optional[str]): Version of cuda to install.
cudnn (Optional[str]): Version of cudnn to install.
base_image (Optional[Union[str, ‘ImageSpec’]]): Base image of the image.
platform (Optional[str]): Specify the target platforms for the build output (for example, windows/amd64 or linux/amd64,darwin/arm64).
pip_index (Optional[str]): Specify the custom pip index url.
pip_extra_index_url (Optional[List[str]]): Specify one or more pip index urls as a list.
pip_secret_mounts (Optional[List[Tuple[str, str]]]): Specify a list of tuples to mount secret for pip install. Each tuple should contain the path to
the secret file and the mount path. For example, [(".gitconfig", “/etc/gitconfig”)]. This is experimental and
the interface may change in the future. Configuring this should not change the built image.
pip_extra_args (Optional[str]): Specify one or more extra pip install arguments as a space-delimited string.
registry_config (Optional[str]): Specify the path to a JSON registry config file.
entrypoint (Optional[List[str]]): List of strings to overwrite the entrypoint of the base image with, set to [] to remove the entrypoint.
commands (Optional[List[str]]): Command to run during the building process.
tag_format (Optional[str]): Custom string format for image tag. The ImageSpec hash passed in as spec_hash. For example,
to add a “dev” suffix to the image tag, set tag_format="{spec_hash}-dev".
source_copy_mode (Optional[CopyFileDetection]): This option allows the user to specify which source files to copy from the local host, into the image.
Not setting this option means to use the default flytekit behavior. The default behavior is:
- if fast register is used, source files are not copied into the image (because they’re already copied
into the fast register tar layer).
- if fast register is not used, then the LOADED_MODULES (aka ‘auto’) option is used to copy loaded
Python files into the image.
If the option is set by the user, then that option is of course used.
copy (Optional[List[str]]): List of files/directories to copy to /root. e.g. [“src/file1.txt”, “src/file2.txt”].
python_exec (Optional[str]): Python executable to use for installing packages.
runtime_packages (Optional[List[str]]): List of packages to be installed during runtime. runtime_packages requires pip to be installed
in your base image.
- If you are using an ImageSpec as your base image, please include pip into your packages:
ImageSpec(..., packages=["pip"]).
- If you want to install runtime packages into a fixed base_image and not use an image builder, you can
use builder="noop": ImageSpec(base_image="ghcr.io/name/my-custom-image", builder="noop").with_runtime_packages(["numpy"]).
builder_options (Optional[Dict[str, Any]]): Additional options for the builder. This is a dictionary that will be passed to the builder.
The options are builder-specific and may not be supported by all builders.
builder_config (Optional[typing.Dict[str, typing.Any]]): Custom builder images configuration, such as uv and micromamba images.
class ImageSpec(
name: str,
python_version: str,
builder: typing.Optional[str],
source_root: typing.Optional[str],
env: typing.Optional[typing.Dict[str, str]],
registry: typing.Optional[str],
packages: typing.Optional[typing.List[str]],
conda_packages: typing.Optional[typing.List[str]],
conda_channels: typing.Optional[typing.List[str]],
requirements: typing.Optional[str],
apt_packages: typing.Optional[typing.List[str]],
cuda: typing.Optional[str],
cudnn: typing.Optional[str],
base_image: typing.Union[str, ForwardRef('ImageSpec'), NoneType],
platform: typing.Optional[str],
pip_index: typing.Optional[str],
pip_extra_index_url: typing.Optional[typing.List[str]],
pip_secret_mounts: typing.Optional[typing.List[typing.Tuple[str, str]]],
pip_extra_args: typing.Optional[str],
registry_config: typing.Optional[str],
entrypoint: typing.Optional[typing.List[str]],
commands: typing.Optional[typing.List[str]],
tag_format: typing.Optional[str],
source_copy_mode: typing.Optional[flytekit.constants.CopyFileDetection],
copy: typing.Optional[typing.List[str]],
python_exec: typing.Optional[str],
runtime_packages: typing.Optional[typing.List[str]],
builder_options: typing.Optional[typing.Dict[str, typing.Any]],
builder_config: typing.Optional[typing.Dict[str, typing.Any]],
)| Parameter | Type | Description |
|---|---|---|
name |
str |
|
python_version |
str |
|
builder |
typing.Optional[str] |
|
source_root |
typing.Optional[str] |
|
env |
typing.Optional[typing.Dict[str, str]] |
|
registry |
typing.Optional[str] |
|
packages |
typing.Optional[typing.List[str]] |
|
conda_packages |
typing.Optional[typing.List[str]] |
|
conda_channels |
typing.Optional[typing.List[str]] |
|
requirements |
typing.Optional[str] |
|
apt_packages |
typing.Optional[typing.List[str]] |
|
cuda |
typing.Optional[str] |
|
cudnn |
typing.Optional[str] |
|
base_image |
typing.Union[str, ForwardRef('ImageSpec'), NoneType] |
|
platform |
typing.Optional[str] |
|
pip_index |
typing.Optional[str] |
|
pip_extra_index_url |
typing.Optional[typing.List[str]] |
|
pip_secret_mounts |
typing.Optional[typing.List[typing.Tuple[str, str]]] |
|
pip_extra_args |
typing.Optional[str] |
|
registry_config |
typing.Optional[str] |
|
entrypoint |
typing.Optional[typing.List[str]] |
|
commands |
typing.Optional[typing.List[str]] |
|
tag_format |
typing.Optional[str] |
|
source_copy_mode |
typing.Optional[flytekit.constants.CopyFileDetection] |
|
copy |
typing.Optional[typing.List[str]] |
|
python_exec |
typing.Optional[str] |
|
runtime_packages |
typing.Optional[typing.List[str]] |
|
builder_options |
typing.Optional[typing.Dict[str, typing.Any]] |
|
builder_config |
typing.Optional[typing.Dict[str, typing.Any]] |
Methods
| Method | Description |
|---|---|
exist() |
Check if the image exists in the registry. |
force_push() |
Builder that returns a new image spec with force push enabled. |
from_env() |
Create ImageSpec with the environment’s Python version and packages pinned to the ones in the environment. |
image_name() |
Full image name with tag. |
is_container() |
Check if the current container image in the pod is built from current image spec. |
with_apt_packages() |
Builder that returns a new image spec with an additional list of apt packages that will be executed during the building process. |
with_builder_options() |
Builder that returns a new image spec with additional builder options. |
with_commands() |
Builder that returns a new image spec with an additional list of commands that will be executed during the building process. |
with_copy() |
Builder that returns a new image spec with the source files copied to the destination directory. |
with_packages() |
Builder that returns a new image spec with additional python packages that will be installed during the building process. |
with_runtime_packages() |
Builder that returns a new image spec with runtime packages. |
exist()
def exist()Check if the image exists in the registry. Returns True if the image exists in the registry, False otherwise, and None if the check failed (for example, due to a permission issue).
force_push()
def force_push()Builder that returns a new image spec with force push enabled.
from_env()
def from_env(
pinned_packages: typing.Optional[typing.List[str]],
kwargs,
) -> ImageSpecCreate ImageSpec with the environment’s Python version and packages pinned to the ones in the environment.
| Parameter | Type | Description |
|---|---|---|
pinned_packages |
typing.Optional[typing.List[str]] |
|
kwargs |
**kwargs |
image_name()
def image_name()Full image name with tag.
is_container()
def is_container()Check if the current container image in the pod is built from current image spec. :return: True if the current container image in the pod is built from current image spec, False otherwise.
with_apt_packages()
def with_apt_packages(
apt_packages: typing.Union[str, typing.List[str]],
) -> ImageSpecBuilder that returns a new image spec with an additional list of apt packages that will be executed during the building process.
| Parameter | Type | Description |
|---|---|---|
apt_packages |
typing.Union[str, typing.List[str]] |
with_builder_options()
def with_builder_options(
builder_options: typing.Dict[str, typing.Any],
) -> ImageSpecBuilder that returns a new image spec with additional builder options.
| Parameter | Type | Description |
|---|---|---|
builder_options |
typing.Dict[str, typing.Any] |
with_commands()
def with_commands(
commands: typing.Union[str, typing.List[str]],
) -> ImageSpecBuilder that returns a new image spec with an additional list of commands that will be executed during the building process.
| Parameter | Type | Description |
|---|---|---|
commands |
typing.Union[str, typing.List[str]] |
with_copy()
def with_copy(
src: typing.Union[str, typing.List[str]],
) -> ImageSpecBuilder that returns a new image spec with the source files copied to the destination directory.
| Parameter | Type | Description |
|---|---|---|
src |
typing.Union[str, typing.List[str]] |
with_packages()
def with_packages(
packages: typing.Union[str, typing.List[str]],
) -> ImageSpecBuilder that returns a new image spec with additional python packages that will be installed during the building process.
| Parameter | Type | Description |
|---|---|---|
packages |
typing.Union[str, typing.List[str]] |
with_runtime_packages()
def with_runtime_packages(
runtime_packages: typing.List[str],
) -> ImageSpecBuilder that returns a new image spec with runtime packages. Dev packages will be installed during runtime.
| Parameter | Type | Description |
|---|---|---|
runtime_packages |
typing.List[str] |
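Putting it together, a typical ImageSpec attached to a task might look like this (a sketch; the registry, package pins, and Python version are illustrative):
from flytekit import ImageSpec, task

image = ImageSpec(
    name="my-image",
    registry="ghcr.io/my-org",    # illustrative registry
    packages=["pandas==2.2.2"],   # illustrative pin
    apt_packages=["git"],
    python_version="3.11",
)

@task(container_image=image)
def preprocess() -> int:
    import pandas as pd  # available because the image installs pandas
    return len(pd.DataFrame({"a": [1, 2, 3]}))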
union.LaunchPlan
Launch Plans are one of the core constructs of Flyte. Please take a look at the discussion in the
:std:ref:core concepts <flyte:divedeep-launchplans> if you are unfamiliar with them.
Every workflow is registered with a default launch plan, which is just a launch plan with none of the additional attributes set - no default values, fixed values, schedules, etc. Assuming you have the following workflow
@workflow
def wf(a: int, c: str) -> str:
    ...
Create the default launch plan with:
LaunchPlan.get_or_create(workflow=wf)
If you specify additional parameters, you’ll also have to give the launch plan a unique name. Default and fixed inputs can be expressed as Python native values like so:
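A sketch using the wf workflow above (the name and values are illustrative):
LaunchPlan.get_or_create(
    workflow=wf,
    name="wf_with_defaults",      # must be unique within the project/domain
    default_inputs={"a": 3},      # can be overridden at launch time
    fixed_inputs={"c": "hello"},  # cannot be overridden at launch time
)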
Additionally, a launch plan can be configured to run on a schedule and emit notifications.
Please see the relevant Schedule and Notification objects as well.
To configure the remaining parameters, you’ll need to import the relevant model objects as well.
from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig
Then use as follows:
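A sketch (the label, annotation, and bucket values are illustrative; AuthRole is imported above but only needed if your deployment requires it):
LaunchPlan.get_or_create(
    workflow=wf,
    name="wf_with_metadata",  # illustrative
    labels=Labels({"team": "data"}),
    annotations=Annotations({"owner": "me@example.com"}),
    raw_output_data_config=RawOutputDataConfig("s3://my-bucket/outputs"),
)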
class LaunchPlan(
name: str,
workflow: _annotated_workflow.WorkflowBase,
parameters: _interface_models.ParameterMap,
fixed_inputs: _literal_models.LiteralMap,
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
concurrency: Optional[ConcurrencyPolicy],
)| Parameter | Type | Description |
|---|---|---|
name |
str |
|
workflow |
_annotated_workflow.WorkflowBase |
|
parameters |
_interface_models.ParameterMap |
|
fixed_inputs |
_literal_models.LiteralMap |
|
schedule |
Optional[_schedule_model.Schedule] |
|
notifications |
Optional[List[_common_models.Notification]] |
|
labels |
Optional[_common_models.Labels] |
|
annotations |
Optional[_common_models.Annotations] |
|
raw_output_data_config |
Optional[_common_models.RawOutputDataConfig] |
|
max_parallelism |
Optional[int] |
|
security_context |
Optional[security.SecurityContext] |
|
trigger |
Optional[LaunchPlanTriggerBase] |
|
overwrite_cache |
Optional[bool] |
|
auto_activate |
bool |
|
concurrency |
Optional[ConcurrencyPolicy] |
Methods
| Method | Description |
|---|---|
clone_with() |
|
construct_node_metadata() |
|
create() |
|
get_default_launch_plan() |
Users should probably call the get_or_create function defined below instead. |
get_or_create() |
This function offers a friendlier interface for creating launch plans. |
clone_with()
def clone_with(
name: str,
parameters: Optional[_interface_models.ParameterMap],
fixed_inputs: Optional[_literal_models.LiteralMap],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
) -> LaunchPlan| Parameter | Type | Description |
|---|---|---|
name |
str |
|
parameters |
Optional[_interface_models.ParameterMap] |
|
fixed_inputs |
Optional[_literal_models.LiteralMap] |
|
schedule |
Optional[_schedule_model.Schedule] |
|
notifications |
Optional[List[_common_models.Notification]] |
|
labels |
Optional[_common_models.Labels] |
|
annotations |
Optional[_common_models.Annotations] |
|
raw_output_data_config |
Optional[_common_models.RawOutputDataConfig] |
|
max_parallelism |
Optional[int] |
|
security_context |
Optional[security.SecurityContext] |
|
trigger |
Optional[LaunchPlanTriggerBase] |
|
overwrite_cache |
Optional[bool] |
|
auto_activate |
bool |
construct_node_metadata()
def construct_node_metadata()create()
def create(
name: str,
workflow: _annotated_workflow.WorkflowBase,
default_inputs: Optional[Dict[str, Any]],
fixed_inputs: Optional[Dict[str, Any]],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
auth_role: Optional[_common_models.AuthRole],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
concurrency: Optional[ConcurrencyPolicy],
) -> LaunchPlan| Parameter | Type | Description |
|---|---|---|
name |
str |
|
workflow |
_annotated_workflow.WorkflowBase |
|
default_inputs |
Optional[Dict[str, Any]] |
|
fixed_inputs |
Optional[Dict[str, Any]] |
|
schedule |
Optional[_schedule_model.Schedule] |
|
notifications |
Optional[List[_common_models.Notification]] |
|
labels |
Optional[_common_models.Labels] |
|
annotations |
Optional[_common_models.Annotations] |
|
raw_output_data_config |
Optional[_common_models.RawOutputDataConfig] |
|
max_parallelism |
Optional[int] |
|
security_context |
Optional[security.SecurityContext] |
|
auth_role |
Optional[_common_models.AuthRole] |
|
trigger |
Optional[LaunchPlanTriggerBase] |
|
overwrite_cache |
Optional[bool] |
|
auto_activate |
bool |
|
concurrency |
Optional[ConcurrencyPolicy] |
get_default_launch_plan()
def get_default_launch_plan(
ctx: FlyteContext,
workflow: _annotated_workflow.WorkflowBase,
) -> LaunchPlanUsers should probably call the get_or_create function defined below instead. A default launch plan is the one that will just pick up whatever default values are defined in the workflow function signature (if any) and use the default auth information supplied during serialization, with no notifications or schedules.
| Parameter | Type | Description |
|---|---|---|
ctx |
FlyteContext |
This is not flytekit.current_context(). This is an internal context object. Users familiar with flytekit should feel free to use this however. |
workflow |
_annotated_workflow.WorkflowBase |
The workflow to create a launch plan for. |
get_or_create()
def get_or_create(
workflow: _annotated_workflow.WorkflowBase,
name: Optional[str],
default_inputs: Optional[Dict[str, Any]],
fixed_inputs: Optional[Dict[str, Any]],
schedule: Optional[_schedule_model.Schedule],
notifications: Optional[List[_common_models.Notification]],
labels: Optional[_common_models.Labels],
annotations: Optional[_common_models.Annotations],
raw_output_data_config: Optional[_common_models.RawOutputDataConfig],
max_parallelism: Optional[int],
security_context: Optional[security.SecurityContext],
auth_role: Optional[_common_models.AuthRole],
trigger: Optional[LaunchPlanTriggerBase],
overwrite_cache: Optional[bool],
auto_activate: bool,
concurrency: Optional[ConcurrencyPolicy],
) -> LaunchPlanThis function offers a friendlier interface for creating launch plans. If the name for the launch plan is not supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it will be used. If creating the default launch plan, none of the other arguments may be specified.
The resulting launch plan is also cached; if this function is called again with the same name, the cached version is returned.
| Parameter | Type | Description |
|---|---|---|
workflow |
_annotated_workflow.WorkflowBase |
The Workflow to create a launch plan for. |
name |
Optional[str] |
If you supply a name, keep in mind that it needs to be unique. That is, project, domain, version, and this name form a primary key. If you do not supply a name, this function will assume you want the default launch plan for the given workflow. |
default_inputs |
Optional[Dict[str, Any]] |
Default inputs, expressed as Python values. |
fixed_inputs |
Optional[Dict[str, Any]] |
Fixed inputs, expressed as Python values. At call time, these cannot be changed. |
schedule |
Optional[_schedule_model.Schedule] |
Optional schedule to run on. |
notifications |
Optional[List[_common_models.Notification]] |
Notifications to send. |
labels |
Optional[_common_models.Labels] |
Optional labels to attach to executions created by this launch plan. |
annotations |
Optional[_common_models.Annotations] |
Optional annotations to attach to executions created by this launch plan. |
raw_output_data_config |
Optional[_common_models.RawOutputDataConfig] |
Optional location of offloaded data for things like S3, etc. |
max_parallelism |
Optional[int] |
Controls the maximum number of task nodes that can be run in parallel for the entire workflow. This is useful to achieve fairness. Note: MapTasks are regarded as one unit, and parallelism/concurrency of MapTasks is independent from this. |
security_context |
Optional[security.SecurityContext] |
Security context for the execution |
auth_role |
Optional[_common_models.AuthRole] |
Add an auth role if necessary. |
trigger |
Optional[LaunchPlanTriggerBase] |
[alpha] This is a new syntax for specifying schedules. |
overwrite_cache |
Optional[bool] |
If set to True, the execution will always overwrite the cache |
auto_activate |
bool |
If set to True, the launch plan will be activated automatically on registration. Default is False. |
concurrency |
Optional[ConcurrencyPolicy] |
Defines execution concurrency limits and policy when limit is reached |
Properties
| Property | Type | Description |
|---|---|---|
annotations |
||
concurrency |
||
fixed_inputs |
||
interface |
||
labels |
||
max_parallelism |
||
name |
||
notifications |
||
overwrite_cache |
||
parameters |
||
python_interface |
||
raw_output_data_config |
||
saved_inputs |
||
schedule |
||
security_context |
||
should_auto_activate |
||
trigger |
||
workflow |
union.PodTemplate
Custom PodTemplate specification for a Task.
class PodTemplate(
pod_spec: typing.Optional[ForwardRef('V1PodSpec')],
primary_container_name: str,
labels: typing.Optional[typing.Dict[str, str]],
annotations: typing.Optional[typing.Dict[str, str]],
)| Parameter | Type | Description |
|---|---|---|
pod_spec |
typing.Optional[ForwardRef('V1PodSpec')] |
|
primary_container_name |
str |
|
labels |
typing.Optional[typing.Dict[str, str]] |
|
annotations |
typing.Optional[typing.Dict[str, str]] |
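A minimal sketch of a custom pod template attached to a task (requires the kubernetes Python package; the labels and node selector are illustrative):
from flytekit import PodTemplate, task
from kubernetes.client import V1Container, V1PodSpec

pod_template = PodTemplate(
    primary_container_name="primary",
    labels={"team": "data"},  # illustrative
    pod_spec=V1PodSpec(
        containers=[V1Container(name="primary")],
        node_selector={"disktype": "ssd"},  # illustrative
    ),
)

@task(pod_template=pod_template)
def heavy_lifting() -> None:
    ...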
Methods
| Method | Description |
|---|---|
version_hash() |
version_hash()
def version_hash()
union.Resources
This class is used to specify both resource requests and resource limits.
Resources(cpu="1", mem="2048") # This is 1 CPU and 2 KB of memory
Resources(cpu="100m", mem="2Gi") # This is 1/10th of a CPU and 2 gigabytes of memory
Resources(cpu=0.5, mem=1024) # This is 500m CPU and 1 KB of memory
# For Kubernetes-based tasks, pods use ephemeral local storage for scratch space, caching, and for logs.
# This allocates 1Gi of such local storage.
Resources(ephemeral_storage="1Gi")
When used together with @task(resources=), you can specify the requests and limits with one object.
When the value is set to a tuple or list, the first value is the request and the
second value is the limit. If a single value is given, then both the request and the limit are
set to that value. For example, Resources(cpu=("1", "2"), mem=1024) will set the cpu request to 1, the cpu limit to 2,
and both the mem request and limit to 1024.
Persistent storage is not currently supported on the Flyte backend.
Please see the :std:ref:User Guide <cookbook:customizing task resources> for detailed examples.
Also refer to the
K8s conventions.
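For example, combining the tuple form with @task(resources=) (a sketch; the sizes are illustrative):
from flytekit import Resources, task

@task(resources=Resources(cpu=("1", "2"), mem="2Gi"))
def train() -> None:
    # cpu request is 1 and cpu limit is 2; mem request and limit are both 2Gi.
    ...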
class Resources(
cpu: typing.Union[str, int, float, list, tuple, NoneType],
mem: typing.Union[str, int, list, tuple, NoneType],
gpu: typing.Union[str, int, list, tuple, NoneType],
ephemeral_storage: typing.Union[str, int, NoneType],
)| Parameter | Type | Description |
|---|---|---|
cpu |
typing.Union[str, int, float, list, tuple, NoneType] |
|
mem |
typing.Union[str, int, list, tuple, NoneType] |
|
gpu |
typing.Union[str, int, list, tuple, NoneType] |
|
ephemeral_storage |
typing.Union[str, int, NoneType] |
Methods
| Method | Description |
|---|---|
from_dict() |
|
from_json() |
|
to_dict() |
|
to_json() |
from_dict()
def from_dict(
d,
dialect,
)| Parameter | Type | Description |
|---|---|---|
d |
||
dialect |
from_json()
def from_json(
data: typing.Union[str, bytes, bytearray],
decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
from_dict_kwargs: typing.Any,
) -> ~T| Parameter | Type | Description |
|---|---|---|
data |
typing.Union[str, bytes, bytearray] |
|
decoder |
collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]] |
|
from_dict_kwargs |
typing.Any |
to_dict()
def to_dict()to_json()
def to_json(
encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
to_dict_kwargs: typing.Any,
) -> typing.Union[str, bytes, bytearray]| Parameter | Type | Description |
|---|---|---|
encoder |
collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]] |
|
to_dict_kwargs |
typing.Any |
union.Secret
See :std:ref:cookbook:secrets for usage examples.
class Secret(
group: typing.Optional[str],
key: typing.Optional[str],
group_version: typing.Optional[str],
mount_requirement: <enum 'MountType'>,
env_var: typing.Optional[str],
)| Parameter | Type | Description |
|---|---|---|
group |
typing.Optional[str] |
|
key |
typing.Optional[str] |
|
group_version |
typing.Optional[str] |
|
mount_requirement |
<enum 'MountType'> |
|
env_var |
typing.Optional[str] |
Methods
| Method | Description |
|---|---|
from_flyte_idl() |
|
serialize_to_string() |
|
short_string() |
:rtype: Text. |
to_flyte_idl() |
from_flyte_idl()
def from_flyte_idl(
pb2_object: flyteidl.core.security_pb2.Secret,
) -> Secret| Parameter | Type | Description |
|---|---|---|
pb2_object |
flyteidl.core.security_pb2.Secret |
serialize_to_string()
def serialize_to_string()short_string()
def short_string()
:rtype: Text
to_flyte_idl()
def to_flyte_idl()
Properties
| Property | Type | Description |
|---|---|---|
is_empty |
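A typical usage sketch (the group and key names are illustrative):
from flytekit import Secret, current_context, task

@task(secret_requests=[Secret(group="db", key="password")])
def connect() -> None:
    # Reads the secret value at runtime via the secrets manager.
    pw = current_context().secrets.get("db", "password")
    ...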
union.StructuredDataset
This is the user facing StructuredDataset class. Please don’t confuse it with the literals.StructuredDataset class (that is just a model, a Python class representation of the protobuf).
class StructuredDataset(
dataframe: typing.Optional[typing.Any],
uri: typing.Optional[str],
metadata: typing.Optional[literals.StructuredDatasetMetadata],
kwargs,
)| Parameter | Type | Description |
|---|---|---|
dataframe |
typing.Optional[typing.Any] |
|
uri |
typing.Optional[str] |
|
metadata |
typing.Optional[literals.StructuredDatasetMetadata] |
|
kwargs |
**kwargs |
Methods
| Method | Description |
|---|---|
all() |
|
column_names() |
|
columns() |
|
deserialize_structured_dataset() |
|
from_dict() |
|
from_json() |
|
iter() |
|
open() |
|
serialize_structured_dataset() |
|
set_literal() |
A public wrapper method to set the StructuredDataset Literal. |
to_dict() |
|
to_json() |
all()
def all()
column_names()
def column_names()
columns()
def columns()
deserialize_structured_dataset()
def deserialize_structured_dataset(
info,
) -> StructuredDataset| Parameter | Type | Description |
|---|---|---|
info |
from_dict()
def from_dict(
d,
dialect,
)| Parameter | Type | Description |
|---|---|---|
d |
||
dialect |
from_json()
def from_json(
data: typing.Union[str, bytes, bytearray],
decoder: collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]],
from_dict_kwargs: typing.Any,
) -> ~T| Parameter | Type | Description |
|---|---|---|
data |
typing.Union[str, bytes, bytearray] |
|
decoder |
collections.abc.Callable[[typing.Union[str, bytes, bytearray]], dict[typing.Any, typing.Any]] |
|
from_dict_kwargs |
typing.Any |
iter()
def iter()
open()
def open(
dataframe_type: Type[DF],
)| Parameter | Type | Description |
|---|---|---|
dataframe_type |
Type[DF] |
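For example, materializing a StructuredDataset input as a pandas DataFrame (a sketch):
import pandas as pd
from flytekit import task
from flytekit.types.structured import StructuredDataset

@task
def row_count(sd: StructuredDataset) -> int:
    df = sd.open(pd.DataFrame).all()  # load the full dataset as a DataFrame
    return len(df)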
serialize_structured_dataset()
def serialize_structured_dataset()
set_literal()
def set_literal(
ctx: FlyteContext,
expected: LiteralType,
)A public wrapper method to set the StructuredDataset Literal.
This method provides external access to the internal _set_literal method.
| Parameter | Type | Description |
|---|---|---|
ctx |
FlyteContext |
|
expected |
LiteralType |
to_dict()
def to_dict()
to_json()
def to_json(
encoder: collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]],
to_dict_kwargs: typing.Any,
) -> typing.Union[str, bytes, bytearray]| Parameter | Type | Description |
|---|---|---|
encoder |
collections.abc.Callable[[typing.Any], typing.Union[str, bytes, bytearray]] |
|
to_dict_kwargs |
typing.Any |
Properties
| Property | Type | Description |
|---|---|---|
dataframe |
||
literal |
||
metadata |
union.UnionRemote
Main entrypoint for programmatically accessing a Flyte remote backend.
The term ‘remote’ is synonymous with ‘backend’ or ‘deployment’ and refers to a hosted instance of the Flyte platform, which comes with a Flyte Admin server on some known URI.
class UnionRemote(
config: typing.Optional[Union[Config, str]],
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: typing.Optional[bool],
kwargs,
)Initialize a FlyteRemote object.
:type kwargs: All arguments that can be passed to create the SynchronousFlyteClient. These are usually gRPC parameters; use them if you want to customize credentials, SSL handling, etc.
| Parameter | Type | Description |
|---|---|---|
config |
typing.Optional[Union[Config, str]] |
|
default_project |
typing.Optional[str] |
default project to use when fetching or executing flyte entities. |
default_domain |
typing.Optional[str] |
default domain to use when fetching or executing flyte entities. |
data_upload_location |
str |
this is where all the default data will be uploaded when providing inputs. The default location, s3://my-s3-bucket/data, works for the sandbox/demo environment. Please override this for non-sandbox cases. |
interactive_mode_enabled |
typing.Optional[bool] |
If set to True, the FlyteRemote will pickle the task/workflow, if False, it will not. If set to None, then it will automatically detect if it is running in an interactive environment like a Jupyter notebook and enable interactive mode. |
kwargs |
**kwargs |
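A minimal instantiation sketch (assumes your Union config is discoverable via the standard config file or environment; the project and domain are illustrative):
from union.remote import UnionRemote

remote = UnionRemote(
    default_project="flytesnacks",
    default_domain="development",
)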
Methods
| Method | Description |
|---|---|
activate_launchplan() |
Given a launch plan, activate it; all previous versions are deactivated. |
approve() |
|
async_channel() |
|
auto() |
|
create_artifact() |
Create an artifact in FlyteAdmin. |
deactivate_launchplan() |
Given a launch plan, deactivate it; all previous versions are deactivated. |
deploy_app() |
Deploy an application. |
download() |
Download the data to the specified location. |
execute() |
Execute a task, workflow, or launchplan, either something that’s been declared locally, or a fetched entity. |
execute_local_launch_plan() |
Execute a locally defined LaunchPlan. |
execute_local_task() |
Execute a @task-decorated function or TaskTemplate task. |
execute_local_workflow() |
Execute an @workflow decorated function. |
execute_reference_launch_plan() |
Execute a ReferenceLaunchPlan. |
execute_reference_task() |
Execute a ReferenceTask. |
execute_reference_workflow() |
Execute a ReferenceWorkflow. |
execute_remote_task_lp() |
Execute a FlyteTask, or FlyteLaunchplan. |
execute_remote_wf() |
Execute a FlyteWorkflow. |
fast_package() |
Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location. |
fast_register_workflow() |
Use this method to register a workflow with zip mode. |
fetch_active_launchplan() |
Returns the active version of the launch plan if it exists or returns None. |
fetch_execution() |
Fetch a workflow execution entity from flyte admin. |
fetch_launch_plan() |
Fetch a launchplan entity from flyte admin. |
fetch_task() |
Fetch a task entity from flyte admin. |
fetch_task_lazy() |
Similar to fetch_task, just that it returns a LazyEntity, which will fetch the task lazily. |
fetch_workflow() |
Fetch a workflow entity from flyte admin. |
fetch_workflow_lazy() |
Similar to fetch_workflow, just that it returns a LazyEntity, which will fetch the workflow lazily. |
find_launch_plan() |
|
find_launch_plan_for_node() |
|
for_endpoint() |
|
for_sandbox() |
|
from_api_key() |
Call this if you want to directly instantiate a UnionRemote from an API key. |
generate_console_http_domain() |
This should generate the domain where console is hosted. |
generate_console_url() |
Generate a UnionAI console URL for the given Flyte remote endpoint. |
get() |
General function that works with flyte tiny urls. |
get_artifact() |
Get the specified artifact. |
get_domains() |
Lists registered domains from flyte admin. |
get_execution_metrics() |
Get the metrics for a given execution. |
get_extra_headers_for_protocol() |
|
launch_backfill() |
Creates and launches a backfill workflow for the given launchplan. |
list_projects() |
Lists registered projects from flyte admin. |
list_signals() |
|
list_tasks_by_version() |
|
raw_register() |
Raw register method, can be used to register control plane entities. |
recent_executions() |
|
register_launch_plan() |
Register a given launchplan, possibly applying overrides from the provided options. |
register_script() |
Use this method to register a workflow via script mode. |
register_task() |
Register a qualified task (PythonTask) with Remote. |
register_workflow() |
Use this method to register a workflow. |
reject() |
|
remote_context() |
Context manager with remote-specific configuration. |
search_artifacts() |
|
set_input() |
|
set_signal() |
|
stop_app() |
Stop an application. |
stream_execution_events() |
Stream execution events from the given tenant. |
sync() |
This function was previously a singledispatchmethod. |
sync_execution() |
Sync a FlyteWorkflowExecution object with its corresponding remote state. |
sync_node_execution() |
Get data backing a node execution. |
sync_task_execution() |
Sync a FlyteTaskExecution object with its corresponding remote state. |
terminate() |
Terminate a workflow execution. |
upload_file() |
Function will use remote’s client to hash and then upload the file using Admin’s data proxy service. |
wait() |
Wait for an execution to finish. |
activate_launchplan()
def activate_launchplan(
ident: Identifier,
)Given a launch plan, activate it; all previous versions are deactivated.
| Parameter | Type | Description |
|---|---|---|
ident |
Identifier |
approve()
def approve(
signal_id: str,
execution_name: str,
project: str,
domain: str,
)| Parameter | Type | Description |
|---|---|---|
signal_id |
str |
The name of the signal, this is the key used in the approve() or wait_for_input() call. |
execution_name |
str |
The name of the execution. This is the tail-end of the URL when looking at the workflow execution. |
project |
str |
The execution project, will default to the Remote’s default project. |
domain |
str |
The execution domain, will default to the Remote’s default domain. |
async_channel()
def async_channel()auto()
def auto(
config_file: typing.Union[str, ConfigFile],
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
) -> 'FlyteRemote'| Parameter | Type | Description |
|---|---|---|
config_file |
typing.Union[str, ConfigFile] |
|
default_project |
typing.Optional[str] |
|
default_domain |
typing.Optional[str] |
|
data_upload_location |
str |
|
interactive_mode_enabled |
bool |
|
kwargs |
**kwargs |
create_artifact()
def create_artifact(
artifact: Artifact,
) -> ArtifactCreate an artifact in FlyteAdmin.
| Parameter | Type | Description |
|---|---|---|
artifact |
Artifact |
The artifact to create. :return: The artifact as persisted in the service. |
deactivate_launchplan()
def deactivate_launchplan(
ident: Identifier,
)Given a launch plan, deactivate it; all previous versions are deactivated.
| Parameter | Type | Description |
|---|---|---|
ident |
Identifier |
deploy_app()
def deploy_app(
app: App,
project: Optional[str],
domain: Optional[str],
) -> AppIDLDeploy an application.
| Parameter | Type | Description |
|---|---|---|
app |
App |
Application to deploy. |
project |
Optional[str] |
Project name. If None, uses default_project. |
domain |
Optional[str] |
Domain name. If None, uses default_domain. :return: The App IDL for the deployed application. |
download()
def download(
data: typing.Union[LiteralsResolver, Literal, LiteralMap],
download_to: str,
recursive: bool,
)Download the data to the specified location. If the data is a LiteralsResolver or LiteralMap and recursive is specified, then all file-like objects will be recursively downloaded (e.g. FlyteFile/Dir (blob), StructuredDataset, etc.).
Note that it will use your session’s credentials to access the remote location. For sandbox, this should be automatically configured, assuming you are running sandbox locally. For other environments, you will need to configure your credentials appropriately.
| Parameter | Type | Description |
|---|---|---|
data |
typing.Union[LiteralsResolver, Literal, LiteralMap] |
data to be downloaded |
download_to |
str |
location to download to (str) that should be a valid path |
recursive |
bool |
if the data is a LiteralsResolver or LiteralMap, then this flag will recursively download |
execute()
def execute(
entity: typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity],
inputs: typing.Optional[typing.Dict[str, typing.Any]],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
) -> FlyteWorkflowExecutionExecute a task, workflow, or launchplan, either something that’s been declared locally, or a fetched entity.
This method supports:
- Flyte{Task, Workflow, LaunchPlan} remote module objects
- @task-decorated functions and TaskTemplate tasks
- @workflow-decorated functions
- LaunchPlan objects
For local entities, this code will attempt to find the entity first, and if missing, will compile and register the object.
Not all arguments are relevant in all circumstances. For example, there’s no reason to use the serialization settings for entities that have already been registered on Admin.
| Parameter | Type | Description |
|---|---|---|
entity |
typing.Union[FlyteTask, FlyteLaunchPlan, FlyteWorkflow, PythonTask, WorkflowBase, LaunchPlan, ReferenceEntity] |
entity to execute |
inputs |
typing.Optional[typing.Dict[str, typing.Any]] |
dictionary mapping argument names to values |
project |
str |
execute entity in this project. If entity doesn’t exist in the project, register the entity first before executing. |
domain |
str |
execute entity in this domain. If entity doesn’t exist in the domain, register the entity first before executing. |
name |
str |
execute entity using this name. If not None, use this value instead of entity.name |
version |
str |
execute entity using this version. If None, uses auto-generated value. |
execution_name |
typing.Optional[str] |
name of the execution. If None, uses auto-generated value. |
execution_name_prefix |
typing.Optional[str] |
execution prefix to use. If provided, a random suffix will be appended |
image_config |
typing.Optional[ImageConfig] |
|
options |
typing.Optional[Options] |
|
wait |
bool |
if True, waits for execution to complete |
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
Python types to be passed to the TypeEngine so that it knows how to properly convert the input values for the execution into Flyte literals. If missing, will default to first guessing the type using the type engine, and then to type(v). Providing the correct Python types is particularly important if the inputs are containers like lists or maps, or if the Python type is one of the more complex Flyte provided classes (like a StructuredDataset that’s annotated with columns). |
overwrite_cache |
typing.Optional[bool] |
Allows for all cached values of a workflow and its tasks to be overwritten for a single execution. If enabled, all calculations are performed even if cached results would be available, overwriting the stored data once execution finishes successfully. |
interruptible |
typing.Optional[bool] |
Optional flag to override the default interruptible flag of the executed entity. |
envs |
typing.Optional[typing.Dict[str, str]] |
Environment variables to be set for the execution. |
tags |
typing.Optional[typing.List[str]] |
Tags to be set for the execution. |
cluster_pool |
typing.Optional[str] |
Specify cluster pool on which newly created execution should be placed. |
execution_cluster_label |
typing.Optional[str] |
Specify label of cluster(s) on which newly created execution should be placed. |
serialization_settings |
typing.Optional[SerializationSettings] |
Optionally provide serialization settings, in case the entity being run needs to first be registered. If not provided, a default will be used. > [!NOTE] > The name and version arguments do not apply to FlyteTask, FlyteLaunchPlan, and FlyteWorkflow entity inputs. These values are determined by referencing the entity identifier values. |
execute_local_launch_plan()
def execute_local_launch_plan(
entity: LaunchPlan,
inputs: typing.Optional[typing.Dict[str, typing.Any]],
version: str,
project: typing.Optional[str],
domain: typing.Optional[str],
name: typing.Optional[str],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
) -> FlyteWorkflowExecutionExecute a locally defined LaunchPlan.
| Parameter | Type | Description |
|---|---|---|
entity |
LaunchPlan |
The locally defined launch plan object |
inputs |
typing.Optional[typing.Dict[str, typing.Any]] |
Inputs to be passed into the execution as a dict with Python native values. |
version |
str |
The version to look up/register the launch plan (if it does not already exist) |
project |
typing.Optional[str] |
The same as version, but will default to the Remote object’s project |
domain |
typing.Optional[str] |
The same as version, but will default to the Remote object’s domain |
name |
typing.Optional[str] |
The same as version, but will default to the entity’s name |
execution_name |
typing.Optional[str] |
If specified, will be used as the execution name instead of randomly generating. |
execution_name_prefix |
typing.Optional[str] |
|
options |
typing.Optional[Options] |
Options to be passed into the execution. |
wait |
bool |
If True, will wait for the execution to complete before returning. |
overwrite_cache |
typing.Optional[bool] |
If True, will overwrite the cache. |
interruptible |
typing.Optional[bool] |
Optional flag to override the default interruptible flag of the executed entity. |
envs |
typing.Optional[typing.Dict[str, str]] |
Environment variables to be passed into the execution. |
tags |
typing.Optional[typing.List[str]] |
Tags to be passed into the execution. |
cluster_pool |
typing.Optional[str] |
Specify cluster pool on which newly created execution should be placed. |
execution_cluster_label |
typing.Optional[str] |
Specify label of cluster(s) on which newly created execution should be placed. |
serialization_settings |
typing.Optional[SerializationSettings] |
Optionally provide serialization settings, in case the entity being run needs :return: FlyteWorkflowExecution object |
execute_local_task()
def execute_local_task(
entity: PythonTask,
inputs: typing.Optional[typing.Dict[str, typing.Any]],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
options: typing.Optional[Options],
serialization_settings: typing.Optional[SerializationSettings],
) -> FlyteWorkflowExecutionExecute a @task-decorated function or TaskTemplate task.
| Parameter | Type | Description |
|---|---|---|
entity |
PythonTask |
local task entity. |
inputs |
typing.Optional[typing.Dict[str, typing.Any]] |
Inputs to be passed into the execution as a dict with Python native values. |
project |
str |
The execution project, will default to the Remote’s default project. |
domain |
str |
The execution domain, will default to the Remote’s default domain. |
name |
str |
specific name of the task to run. |
version |
str |
specific version of the task to run, default is a special string latest, which implies latest version by time |
execution_name |
typing.Optional[str] |
If provided, will use this name for the execution. |
execution_name_prefix |
typing.Optional[str] |
If provided, will use this prefix for the execution name. |
image_config |
typing.Optional[ImageConfig] |
If provided, will use this image config in the pod. |
wait |
bool |
If True, will wait for the execution to complete before returning. |
overwrite_cache |
typing.Optional[bool] |
If True, will overwrite the cache. |
interruptible |
typing.Optional[bool] |
Optional flag to override the default interruptible flag of the executed entity. |
envs |
typing.Optional[typing.Dict[str, str]] |
Environment variables to set for the execution. |
tags |
typing.Optional[typing.List[str]] |
Tags to set for the execution. |
cluster_pool |
typing.Optional[str] |
Specify cluster pool on which newly created execution should be placed. |
execution_cluster_label |
typing.Optional[str] |
Specify label of cluster(s) on which newly created execution should be placed. |
options |
typing.Optional[Options] |
Options to customize the execution. |
serialization_settings |
typing.Optional[SerializationSettings] |
If the task needs to be registered, this can be passed in. :return: FlyteWorkflowExecution object. |
execute_local_workflow()
def execute_local_workflow(
entity: WorkflowBase,
inputs: typing.Optional[typing.Dict[str, typing.Any]],
project: str,
domain: str,
name: str,
version: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
image_config: typing.Optional[ImageConfig],
options: typing.Optional[Options],
wait: bool,
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
serialization_settings: typing.Optional[SerializationSettings],
) -> FlyteWorkflowExecutionExecute an @workflow decorated function.
| Parameter | Type | Description |
|---|---|---|
entity |
WorkflowBase |
The workflow to execute |
inputs |
typing.Optional[typing.Dict[str, typing.Any]] |
Input dictionary |
project |
str |
Project to execute in |
domain |
str |
Domain to execute in |
name |
str |
Optional name override for the workflow |
version |
str |
Optional version for the workflow |
execution_name |
typing.Optional[str] |
Optional name for the execution |
execution_name_prefix |
typing.Optional[str] |
|
image_config |
typing.Optional[ImageConfig] |
Optional image config override |
options |
typing.Optional[Options] |
Optional Options object |
wait |
bool |
Whether to wait for execution completion |
overwrite_cache |
typing.Optional[bool] |
If True, will overwrite the cache |
interruptible |
typing.Optional[bool] |
Optional flag to override the default interruptible flag of the executed entity |
envs |
typing.Optional[typing.Dict[str, str]] |
Environment variables to set for the execution |
tags |
typing.Optional[typing.List[str]] |
Tags to set for the execution |
cluster_pool |
typing.Optional[str] |
Specify cluster pool on which newly created execution should be placed |
execution_cluster_label |
typing.Optional[str] |
Specify label of cluster(s) on which newly created execution should be placed |
serialization_settings |
typing.Optional[SerializationSettings] |
Optionally provide serialization settings, in case the entity being run needs to be registered :return: FlyteWorkflowExecution object |
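For example, a minimal sketch of running an @workflow-decorated function on the backend. In practice you would usually go through the public execute() entry point, which dispatches to this method for workflow objects; the project and domain names below are illustrative assumptions.

```python
# A minimal sketch, assuming a configured UnionRemote and illustrative
# project/domain names.
from flytekit import task, workflow
from union import UnionRemote

@task
def double(x: int) -> int:
    return x * 2

@workflow
def my_wf(x: int) -> int:
    return double(x=x)

remote = UnionRemote(default_project="flytesnacks", default_domain="development")
# execute() registers the workflow if needed, then launches an execution.
execution = remote.execute(my_wf, inputs={"x": 21}, wait=True)
print(execution.outputs)
```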
execute_reference_launch_plan()
def execute_reference_launch_plan(
entity: ReferenceLaunchPlan,
inputs: typing.Optional[typing.Dict[str, typing.Any]],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution
Execute a ReferenceLaunchPlan.
| Parameter | Type | Description |
|---|---|---|
entity |
ReferenceLaunchPlan |
|
inputs |
typing.Optional[typing.Dict[str, typing.Any]] |
|
execution_name |
typing.Optional[str] |
|
execution_name_prefix |
typing.Optional[str] |
|
options |
typing.Optional[Options] |
|
wait |
bool |
|
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
|
overwrite_cache |
typing.Optional[bool] |
|
interruptible |
typing.Optional[bool] |
|
envs |
typing.Optional[typing.Dict[str, str]] |
|
tags |
typing.Optional[typing.List[str]] |
|
cluster_pool |
typing.Optional[str] |
|
execution_cluster_label |
typing.Optional[str] |
execute_reference_task()
def execute_reference_task(
entity: ReferenceTask,
inputs: typing.Optional[typing.Dict[str, typing.Any]],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution
Execute a ReferenceTask.
| Parameter | Type | Description |
|---|---|---|
entity |
ReferenceTask |
|
inputs |
typing.Optional[typing.Dict[str, typing.Any]] |
|
execution_name |
typing.Optional[str] |
|
execution_name_prefix |
typing.Optional[str] |
|
options |
typing.Optional[Options] |
|
wait |
bool |
|
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
|
overwrite_cache |
typing.Optional[bool] |
|
interruptible |
typing.Optional[bool] |
|
envs |
typing.Optional[typing.Dict[str, str]] |
|
tags |
typing.Optional[typing.List[str]] |
|
cluster_pool |
typing.Optional[str] |
|
execution_cluster_label |
typing.Optional[str] |
execute_reference_workflow()
def execute_reference_workflow(
entity: ReferenceWorkflow,
inputs: typing.Optional[typing.Dict[str, typing.Any]],
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution
Execute a ReferenceWorkflow.
| Parameter | Type | Description |
|---|---|---|
entity |
ReferenceWorkflow |
|
inputs |
typing.Optional[typing.Dict[str, typing.Any]] |
|
execution_name |
typing.Optional[str] |
|
execution_name_prefix |
typing.Optional[str] |
|
options |
typing.Optional[Options] |
|
wait |
bool |
|
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
|
overwrite_cache |
typing.Optional[bool] |
|
interruptible |
typing.Optional[bool] |
|
envs |
typing.Optional[typing.Dict[str, str]] |
|
tags |
typing.Optional[typing.List[str]] |
|
cluster_pool |
typing.Optional[str] |
|
execution_cluster_label |
typing.Optional[str] |
execute_remote_task_lp()
def execute_remote_task_lp(
entity: typing.Union[FlyteTask, FlyteLaunchPlan],
inputs: typing.Optional[typing.Dict[str, typing.Any]],
project: str,
domain: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution
Execute a FlyteTask or FlyteLaunchPlan.
NOTE: the name and version arguments are currently not used and are only there for consistency in the function signature
| Parameter | Type | Description |
|---|---|---|
entity |
typing.Union[FlyteTask, FlyteLaunchPlan] |
|
inputs |
typing.Optional[typing.Dict[str, typing.Any]] |
|
project |
str |
|
domain |
str |
|
execution_name |
typing.Optional[str] |
|
execution_name_prefix |
typing.Optional[str] |
|
options |
typing.Optional[Options] |
|
wait |
bool |
|
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
|
overwrite_cache |
typing.Optional[bool] |
|
interruptible |
typing.Optional[bool] |
|
envs |
typing.Optional[typing.Dict[str, str]] |
|
tags |
typing.Optional[typing.List[str]] |
|
cluster_pool |
typing.Optional[str] |
|
execution_cluster_label |
typing.Optional[str] |
execute_remote_wf()
def execute_remote_wf(
entity: FlyteWorkflow,
inputs: typing.Optional[typing.Dict[str, typing.Any]],
project: str,
domain: str,
execution_name: typing.Optional[str],
execution_name_prefix: typing.Optional[str],
options: typing.Optional[Options],
wait: bool,
type_hints: typing.Optional[typing.Dict[str, typing.Type]],
overwrite_cache: typing.Optional[bool],
interruptible: typing.Optional[bool],
envs: typing.Optional[typing.Dict[str, str]],
tags: typing.Optional[typing.List[str]],
cluster_pool: typing.Optional[str],
execution_cluster_label: typing.Optional[str],
) -> FlyteWorkflowExecution
Execute a FlyteWorkflow.
NOTE: the name and version arguments are currently not used and are only there for consistency in the function signature
| Parameter | Type | Description |
|---|---|---|
entity |
FlyteWorkflow |
|
inputs |
typing.Optional[typing.Dict[str, typing.Any]] |
|
project |
str |
|
domain |
str |
|
execution_name |
typing.Optional[str] |
|
execution_name_prefix |
typing.Optional[str] |
|
options |
typing.Optional[Options] |
|
wait |
bool |
|
type_hints |
typing.Optional[typing.Dict[str, typing.Type]] |
|
overwrite_cache |
typing.Optional[bool] |
|
interruptible |
typing.Optional[bool] |
|
envs |
typing.Optional[typing.Dict[str, str]] |
|
tags |
typing.Optional[typing.List[str]] |
|
cluster_pool |
typing.Optional[str] |
|
execution_cluster_label |
typing.Optional[str] |
fast_package()
def fast_package(
root: os.PathLike,
deref_symlinks: bool,
output: str,
options: typing.Optional[FastPackageOptions],
) -> typing.Tuple[bytes, str]
Packages the given paths into an installable zip and returns the md5_bytes and the URL of the uploaded location
| Parameter | Type | Description |
|---|---|---|
root |
os.PathLike |
path to the root of the package system that should be uploaded |
deref_symlinks |
bool |
if symlinks should be dereferenced. Defaults to True |
output |
str |
output path. Optional, will default to a tempdir |
options |
typing.Optional[FastPackageOptions] |
additional options to customize fast_package behavior :return: md5_bytes, url |
fast_register_workflow()
def fast_register_workflow(
entity: WorkflowBase,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
default_launch_plan: typing.Optional[bool],
options: typing.Optional[Options],
fast_package_options: typing.Optional[FastPackageOptions],
) -> FlyteWorkflow
Use this method to register a workflow with zip mode.
| Parameter | Type | Description |
|---|---|---|
entity |
WorkflowBase |
The workflow to be registered |
serialization_settings |
typing.Optional[SerializationSettings] |
The serialization settings to be used |
version |
typing.Optional[str] |
version for the entity to be registered as |
default_launch_plan |
typing.Optional[bool] |
This should be true if a default launch plan should be created for the workflow |
options |
typing.Optional[Options] |
Additional execution options that can be configured for the default launchplan |
fast_package_options |
typing.Optional[FastPackageOptions] |
Options to customize copying behavior :return: |
fetch_active_launchplan()
def fetch_active_launchplan(
project: str,
domain: str,
name: str,
) -> typing.Optional[FlyteLaunchPlan]
Returns the active version of the launch plan if it exists, or None otherwise.
| Parameter | Type | Description |
|---|---|---|
project |
str |
|
domain |
str |
|
name |
str |
fetch_execution()
def fetch_execution(
project: str,
domain: str,
name: str,
) -> FlyteWorkflowExecution
Fetch a workflow execution entity from flyte admin.
| Parameter | Type | Description |
|---|---|---|
project |
str |
fetch entity from this project. If None, uses the default_project attribute. |
domain |
str |
fetch entity from this domain. If None, uses the default_domain attribute. |
name |
str |
fetch entity with matching name. :returns: :class:~flytekit.remote.workflow_execution.FlyteWorkflowExecution :raises: FlyteAssertion if name is None |
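For example, a minimal sketch of fetching and syncing an execution; the execution name is a placeholder taken from the tail-end of a console URL.

```python
# A minimal sketch; "f8c51xyz" is a placeholder execution name.
from union import UnionRemote

remote = UnionRemote()
execution = remote.fetch_execution(
    project="flytesnacks", domain="development", name="f8c51xyz"
)
# Pull in the latest state, including underlying node executions.
execution = remote.sync_execution(execution, sync_nodes=True)
print(execution.is_done)
```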
fetch_launch_plan()
def fetch_launch_plan(
project: str,
domain: str,
name: str,
version: str,
) -> FlyteLaunchPlan
Fetch a launchplan entity from flyte admin.
| Parameter | Type | Description |
|---|---|---|
project |
str |
fetch entity from this project. If None, uses the default_project attribute. |
domain |
str |
fetch entity from this domain. If None, uses the default_domain attribute. |
name |
str |
fetch entity with matching name. |
version |
str |
fetch entity with matching version. If None, gets the latest version of the entity. :returns: :class:~flytekit.remote.launch_plan.FlyteLaunchPlan :raises: FlyteAssertion if name is None |
fetch_task()
def fetch_task(
project: str,
domain: str,
name: str,
version: str,
) -> FlyteTask
Fetch a task entity from flyte admin.
| Parameter | Type | Description |
|---|---|---|
project |
str |
fetch entity from this project. If None, uses the default_project attribute. |
domain |
str |
fetch entity from this domain. If None, uses the default_domain attribute. |
name |
str |
fetch entity with matching name. |
version |
str |
fetch entity with matching version. If None, gets the latest version of the entity. :returns: :class:~flytekit.remote.tasks.task.FlyteTask :raises: FlyteAssertion if name is None |
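For example, a minimal sketch of fetching a registered task and launching it; the task name and version are placeholders for whatever was registered earlier.

```python
# A minimal sketch; the task name and version are placeholders.
from union import UnionRemote

remote = UnionRemote()
flyte_task = remote.fetch_task(
    project="flytesnacks",
    domain="development",
    name="workflows.example.double",
    version="v1",
)
execution = remote.execute(flyte_task, inputs={"x": 3}, wait=True)
```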
fetch_task_lazy()
def fetch_task_lazy(
project: str,
domain: str,
name: str,
version: str,
) -> LazyEntity
Similar to fetch_task, except that it returns a LazyEntity, which will fetch the task lazily.
| Parameter | Type | Description |
|---|---|---|
project |
str |
|
domain |
str |
|
name |
str |
|
version |
str |
fetch_workflow()
def fetch_workflow(
project: str,
domain: str,
name: str,
version: str,
) -> FlyteWorkflow
Fetch a workflow entity from flyte admin.
| Parameter | Type | Description |
|---|---|---|
project |
str |
fetch entity from this project. If None, uses the default_project attribute. |
domain |
str |
fetch entity from this domain. If None, uses the default_domain attribute. |
name |
str |
fetch entity with matching name. |
version |
str |
fetch entity with matching version. If None, gets the latest version of the entity. :raises: FlyteAssertion if name is None |
fetch_workflow_lazy()
def fetch_workflow_lazy(
project: str,
domain: str,
name: str,
version: str,
) -> LazyEntity[FlyteWorkflow]
Similar to fetch_workflow, except that it returns a LazyEntity, which will fetch the workflow lazily.
| Parameter | Type | Description |
|---|---|---|
project |
str |
|
domain |
str |
|
name |
str |
|
version |
str |
find_launch_plan()
def find_launch_plan(
lp_ref: id_models,
node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
)
| Parameter | Type | Description |
|---|---|---|
lp_ref |
id_models |
|
node_launch_plans |
Dict[id_models, launch_plan_models.LaunchPlanSpec] |
find_launch_plan_for_node()
def find_launch_plan_for_node(
node: Node,
node_launch_plans: Dict[id_models, launch_plan_models.LaunchPlanSpec],
)
| Parameter | Type | Description |
|---|---|---|
node |
Node |
|
node_launch_plans |
Dict[id_models, launch_plan_models.LaunchPlanSpec] |
for_endpoint()
def for_endpoint(
endpoint: str,
insecure: bool,
data_config: typing.Optional[DataConfig],
config_file: typing.Union[str, ConfigFile],
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
) -> 'FlyteRemote'
| Parameter | Type | Description |
|---|---|---|
endpoint |
str |
|
insecure |
bool |
|
data_config |
typing.Optional[DataConfig] |
|
config_file |
typing.Union[str, ConfigFile] |
|
default_project |
typing.Optional[str] |
|
default_domain |
typing.Optional[str] |
|
data_upload_location |
str |
|
interactive_mode_enabled |
bool |
|
kwargs |
**kwargs |
for_sandbox()
def for_sandbox(
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
interactive_mode_enabled: bool,
kwargs,
) -> 'FlyteRemote'
| Parameter | Type | Description |
|---|---|---|
default_project |
typing.Optional[str] |
|
default_domain |
typing.Optional[str] |
|
data_upload_location |
str |
|
interactive_mode_enabled |
bool |
|
kwargs |
**kwargs |
from_api_key()
def from_api_key(
api_key: str,
default_project: typing.Optional[str],
default_domain: typing.Optional[str],
data_upload_location: str,
kwargs,
) -> 'UnionRemote'
Call this if you want to directly instantiate a UnionRemote from an API key.
| Parameter | Type | Description |
|---|---|---|
api_key |
str |
|
default_project |
typing.Optional[str] |
|
default_domain |
typing.Optional[str] |
|
data_upload_location |
str |
|
kwargs |
**kwargs |
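For example, a minimal sketch, assuming the API key has been stored in an environment variable (an illustrative choice, not a library convention):

```python
# A minimal sketch; UNION_API_KEY is an assumed environment variable.
import os
from union import UnionRemote

remote = UnionRemote.from_api_key(
    api_key=os.environ["UNION_API_KEY"],
    default_project="flytesnacks",
    default_domain="development",
)
```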
generate_console_http_domain()
def generate_console_http_domain()
This should generate the domain where the console is hosted.
generate_console_url()
def generate_console_url(
entity: typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, FlyteLaunchPlan, Artifact],
)
Generate a UnionAI console URL for the given Flyte remote endpoint. It also handles Union AI specific entities like Artifacts.
This will automatically determine whether the given entity is an execution or a registered entity and build the URL accordingly.
| Parameter | Type | Description |
|---|---|---|
entity |
typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution, FlyteWorkflow, FlyteTask, FlyteLaunchPlan, Artifact] |
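For example, a minimal sketch that builds a console link for a fetched execution (the names are placeholders):

```python
# A minimal sketch; the execution name is a placeholder.
from union import UnionRemote

remote = UnionRemote()
execution = remote.fetch_execution(
    project="flytesnacks", domain="development", name="f8c51xyz"
)
print(remote.generate_console_url(execution))
```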
get()
def get(
uri: typing.Optional[str],
) -> typing.Optional[typing.Union[LiteralsResolver, Literal, bytes]]
General function that works with Flyte tiny URLs. This can return outputs (in the form of a LiteralsResolver, or individual Literals for singular requests), HTML if passed a deck link, or bytes containing HTML if IPython is not available locally.
| Parameter | Type | Description |
|---|---|---|
uri |
typing.Optional[str] |
get_artifact()
def get_artifact(
uri: typing.Optional[str],
artifact_key: typing.Optional[art_id.ArtifactKey],
artifact_id: typing.Optional[art_id.ArtifactID],
query: typing.Optional[typing.Union[art_id.ArtifactQuery, ArtifactQuery]],
get_details: bool,
) -> typing.Optional[Artifact]
Get the specified artifact.
| Parameter | Type | Description |
|---|---|---|
uri |
typing.Optional[str] |
An artifact URI. |
artifact_key |
typing.Optional[art_id.ArtifactKey] |
An artifact key. |
artifact_id |
typing.Optional[art_id.ArtifactID] |
The artifact ID. |
query |
typing.Optional[typing.Union[art_id.ArtifactQuery, ArtifactQuery]] |
An artifact query. |
get_details |
bool |
A bool to indicate whether or not to return artifact details. :return: The artifact as persisted in the service. |
get_domains()
def get_domains()
Lists registered domains from flyte admin.
:returns: typing.List[flytekit.models.domain.Domain]
get_execution_metrics()
def get_execution_metrics(
id: WorkflowExecutionIdentifier,
depth: int,
) -> FlyteExecutionSpan
Get the metrics for a given execution.
| Parameter | Type | Description |
|---|---|---|
id |
WorkflowExecutionIdentifier |
|
depth |
int |
get_extra_headers_for_protocol()
def get_extra_headers_for_protocol(
native_url,
)
| Parameter | Type | Description |
|---|---|---|
native_url |
launch_backfill()
def launch_backfill(
project: str,
domain: str,
from_date: datetime,
to_date: datetime,
launchplan: str,
launchplan_version: str,
execution_name: str,
version: str,
dry_run: bool,
execute: bool,
parallel: bool,
failure_policy: typing.Optional[WorkflowFailurePolicy],
overwrite_cache: typing.Optional[bool],
) -> typing.Optional[FlyteWorkflowExecution, FlyteWorkflow, WorkflowBase]
Creates and launches a backfill workflow for the given launchplan. If the launchplan version is not specified, the latest launchplan is retrieved. The from_date is exclusive and the to_date is inclusive; the backfill runs for all schedule instances in between.
If dry_run is specified, the workflow is created and returned. If execute is False, the workflow is created and registered but not run. Otherwise, in the default case, the workflow is created, registered, and executed.
The parallel flag can be used to generate a workflow where all launchplan invocations run in parallel; by default the backfill runs sequentially.
| Parameter | Type | Description |
|---|---|---|
project |
str |
str project name |
domain |
str |
str domain name |
from_date |
datetime |
datetime generate a backfill starting at this datetime (exclusive) |
to_date |
datetime |
datetime generate a backfill ending at this datetime (inclusive) |
launchplan |
str |
str launchplan name in the flyte backend |
launchplan_version |
str |
str (optional) version for the launchplan. If not specified the most recent will be retrieved |
execution_name |
str |
str (optional) name to use for the generated execution; this can help in ensuring idempotency |
version |
str |
str (optional) version to be used for the newly created workflow. |
dry_run |
bool |
bool do not register or execute the workflow |
execute |
bool |
bool Register and execute the workflow. |
parallel |
bool |
if the backfill should be run in parallel. False (default) will run each backfill instance sequentially. |
failure_policy |
typing.Optional[WorkflowFailurePolicy] |
WorkflowFailurePolicy (optional) to be used for the newly created workflow. This can control failure behavior - whether to continue on failure or stop immediately on failure |
overwrite_cache |
typing.Optional[bool] |
if True, will overwrite the cache. :return: For a dry run, a WorkflowBase; if execute is False, a FlyteWorkflow; otherwise, in the default case, a FlyteWorkflowExecution |
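For example, a minimal sketch of backfilling a scheduled launch plan over ten days; the launch plan name and project/domain are assumptions.

```python
# A minimal sketch; names are placeholders. from_date is exclusive and
# to_date inclusive, per the docstring above.
from datetime import datetime, timedelta
from union import UnionRemote

remote = UnionRemote()
end = datetime(2024, 6, 10)
execution = remote.launch_backfill(
    project="flytesnacks",
    domain="development",
    from_date=end - timedelta(days=10),
    to_date=end,
    launchplan="workflows.example.daily_report_lp",
    execute=True,
    parallel=False,
)
```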
list_projects()
def list_projects(
limit: typing.Optional[int],
filters: typing.Optional[typing.List[filter_models.Filter]],
sort_by: typing.Optional[admin_common_models.Sort],
) -> typing.List[Project]
Lists registered projects from flyte admin.
| Parameter | Type | Description |
|---|---|---|
limit |
typing.Optional[int] |
[Optional[int]] The maximum number of entries to return. |
filters |
typing.Optional[typing.List[filter_models.Filter]] |
|
sort_by |
typing.Optional[admin_common_models.Sort] |
list_signals()
def list_signals(
execution_name: str,
project: typing.Optional[str],
domain: typing.Optional[str],
limit: int,
filters: typing.Optional[typing.List[filter_models.Filter]],
) -> typing.List[Signal]
| Parameter | Type | Description |
|---|---|---|
execution_name |
str |
The name of the execution. This is the tail-end of the URL when looking at the workflow execution. |
project |
typing.Optional[str] |
The execution project; defaults to the Remote’s default project. |
domain |
typing.Optional[str] |
The execution domain; defaults to the Remote’s default domain. |
limit |
int |
The number of signals to fetch |
filters |
typing.Optional[typing.List[filter_models.Filter]] |
Optional list of filters |
list_tasks_by_version()
def list_tasks_by_version(
version: str,
project: typing.Optional[str],
domain: typing.Optional[str],
limit: typing.Optional[int],
) -> typing.List[FlyteTask]
| Parameter | Type | Description |
|---|---|---|
version |
str |
|
project |
typing.Optional[str] |
|
domain |
typing.Optional[str] |
|
limit |
typing.Optional[int] |
raw_register()
def raw_register(
cp_entity: FlyteControlPlaneEntity,
settings: SerializationSettings,
version: str,
create_default_launchplan: bool,
options: Options,
og_entity: FlyteLocalEntity,
) -> typing.Optional[Identifier]
Raw register method; can be used to register control plane entities. If you have a Flyte entity like a WorkflowBase, Task, or LaunchPlan, use the other methods instead. This should be used only if you have already serialized entities.
| Parameter | Type | Description |
|---|---|---|
cp_entity |
FlyteControlPlaneEntity |
The controlplane “serializable” version of a flyte entity. This is in the form that FlyteAdmin understands. |
settings |
SerializationSettings |
SerializationSettings to be used for registration - especially to identify the id |
version |
str |
Version to be registered |
create_default_launchplan |
bool |
boolean that indicates if a default launch plan should be created |
options |
Options |
Options to be used if registering a default launch plan |
og_entity |
FlyteLocalEntity |
Pass in the original workflow (flytekit type) if create_default_launchplan is true :return: Identifier of the created entity |
recent_executions()
def recent_executions(
project: typing.Optional[str],
domain: typing.Optional[str],
limit: typing.Optional[int],
filters: typing.Optional[typing.List[filter_models.Filter]],
) -> typing.List[FlyteWorkflowExecution]
| Parameter | Type | Description |
|---|---|---|
project |
typing.Optional[str] |
|
domain |
typing.Optional[str] |
|
limit |
typing.Optional[int] |
|
filters |
typing.Optional[typing.List[filter_models.Filter]] |
register_launch_plan()
def register_launch_plan(
entity: LaunchPlan,
version: typing.Optional[str],
project: typing.Optional[str],
domain: typing.Optional[str],
options: typing.Optional[Options],
serialization_settings: typing.Optional[SerializationSettings],
) -> FlyteLaunchPlan
Register a given launchplan, possibly applying overrides from the provided options. If the underlying workflow is not already registered, it, along with any underlying entities, will also be registered. If the underlying workflow does exist (with the given project/domain/version), then only the launchplan will be registered.
| Parameter | Type | Description |
|---|---|---|
entity |
LaunchPlan |
Launchplan to be registered |
version |
typing.Optional[str] |
Version to be registered for the launch plan, and used to check (and register) underlying wf |
project |
typing.Optional[str] |
Optionally provide a project; used if not already set in the FlyteRemote constructor, or to override it |
domain |
typing.Optional[str] |
Optionally provide a domain; used if not already set in the FlyteRemote constructor, or to override it |
options |
typing.Optional[Options] |
|
serialization_settings |
typing.Optional[SerializationSettings] |
Optionally provide serialization settings, if not provided, will use the default |
register_script()
def register_script(
entity: typing.Union[WorkflowBase, PythonTask, LaunchPlan],
image_config: typing.Optional[ImageConfig],
version: typing.Optional[str],
project: typing.Optional[str],
domain: typing.Optional[str],
destination_dir: str,
copy_all: bool,
default_launch_plan: bool,
options: typing.Optional[Options],
source_path: typing.Optional[str],
module_name: typing.Optional[str],
envs: typing.Optional[typing.Dict[str, str]],
default_resources: typing.Optional[ResourceSpec],
fast_package_options: typing.Optional[FastPackageOptions],
) -> typing.Union[FlyteWorkflow, FlyteTask, FlyteLaunchPlan, ReferenceEntity]
Use this method to register a workflow via script mode.
| Parameter | Type | Description |
|---|---|---|
entity |
typing.Union[WorkflowBase, PythonTask, LaunchPlan] |
The workflow to be registered or the task to be registered |
image_config |
typing.Optional[ImageConfig] |
The image config to use for the workflow. |
version |
typing.Optional[str] |
version for the entity to be registered as |
project |
typing.Optional[str] |
The project to register the workflow in. |
domain |
typing.Optional[str] |
The domain to register the workflow in. |
destination_dir |
str |
The destination directory where the workflow will be copied to. |
copy_all |
bool |
[deprecated] Please use the copy_style field in fast_package_options instead. |
default_launch_plan |
bool |
This should be true if a default launch plan should be created for the workflow |
options |
typing.Optional[Options] |
Additional execution options that can be configured for the default launchplan |
source_path |
typing.Optional[str] |
The root of the project path |
module_name |
typing.Optional[str] |
the name of the module |
envs |
typing.Optional[typing.Dict[str, str]] |
Environment variables to be passed to the serialization |
default_resources |
typing.Optional[ResourceSpec] |
Default resources to be passed to the serialization. These override the resource spec for any tasks that have no statically defined resource requests and limits. |
fast_package_options |
typing.Optional[FastPackageOptions] |
Options to customize copy_all behavior, ignored when copy_all is False. :return: |
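For example, a minimal sketch of script-mode registration; the project layout, module name, and import are hypothetical.

```python
# A minimal sketch; workflows/example.py is a hypothetical module.
from union import UnionRemote
from workflows.example import my_wf  # hypothetical import

remote = UnionRemote(default_project="flytesnacks", default_domain="development")
registered_wf = remote.register_script(
    my_wf,
    source_path=".",              # root of the project to package
    module_name="workflows.example",
    version="v1",
)
```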
register_task()
def register_task(
entity: PythonTask,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
) -> FlyteTask
Register a qualified task (PythonTask) with Remote. For any conflicting parameters, method arguments are regarded as overrides.
| Parameter | Type | Description |
|---|---|---|
entity |
PythonTask |
PythonTask can be either @task or an instance of a Task class |
serialization_settings |
typing.Optional[SerializationSettings] |
Settings that will be used to override various serialization parameters. |
version |
typing.Optional[str] |
version that will be used to register. If not specified will default to using the serialization settings default :return: |
register_workflow()
def register_workflow(
entity: WorkflowBase,
serialization_settings: typing.Optional[SerializationSettings],
version: typing.Optional[str],
default_launch_plan: typing.Optional[bool],
options: typing.Optional[Options],
) -> FlyteWorkflow
Use this method to register a workflow.
| Parameter | Type | Description |
|---|---|---|
entity |
WorkflowBase |
The workflow to be registered |
serialization_settings |
typing.Optional[SerializationSettings] |
The serialization settings to be used |
version |
typing.Optional[str] |
version for the entity to be registered as |
default_launch_plan |
typing.Optional[bool] |
This should be true if a default launch plan should be created for the workflow |
options |
typing.Optional[Options] |
Additional execution options that can be configured for the default launchplan :return: |
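For example, a minimal sketch of plain registration with explicit serialization settings; the container image and module are placeholders.

```python
# A minimal sketch; the image name and workflow module are placeholders.
from flytekit.configuration import ImageConfig, SerializationSettings
from union import UnionRemote
from workflows.example import my_wf  # hypothetical import

remote = UnionRemote()
settings = SerializationSettings(
    image_config=ImageConfig.auto(img_name="ghcr.io/org/image:v1"),
    project="flytesnacks",
    domain="development",
)
flyte_wf = remote.register_workflow(
    my_wf, serialization_settings=settings, version="v1", default_launch_plan=True
)
```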
reject()
def reject(
signal_id: str,
execution_name: str,
project: str,
domain: str,
)
| Parameter | Type | Description |
|---|---|---|
signal_id |
str |
The name of the signal, this is the key used in the approve() or wait_for_input() call. |
execution_name |
str |
The name of the execution. This is the tail-end of the URL when looking at the workflow execution. |
project |
str |
The execution project, will default to the Remote’s default project. |
domain |
str |
The execution domain, will default to the Remote’s default domain. |
remote_context()
def remote_context()
Context manager with remote-specific configuration.
search_artifacts()
def search_artifacts(
project: typing.Optional[str],
domain: typing.Optional[str],
name: typing.Optional[str],
artifact_key: typing.Optional[art_id.ArtifactKey],
query: typing.Optional[ArtifactQuery],
partitions: typing.Optional[Union[Partitions, typing.Dict[str, str]]],
time_partition: typing.Optional[Union[datetime.datetime, TimePartition]],
group_by_key: bool,
limit: int,
) -> typing.List[Artifact]
| Parameter | Type | Description |
|---|---|---|
project |
typing.Optional[str] |
|
domain |
typing.Optional[str] |
|
name |
typing.Optional[str] |
|
artifact_key |
typing.Optional[art_id.ArtifactKey] |
|
query |
typing.Optional[ArtifactQuery] |
|
partitions |
typing.Optional[Union[Partitions, typing.Dict[str, str]]] |
|
time_partition |
typing.Optional[Union[datetime.datetime, TimePartition]] |
|
group_by_key |
bool |
|
limit |
int |
set_input()
def set_input(
signal_id: str,
execution_name: str,
value: typing.Union[literal_models.Literal, typing.Any],
project,
domain,
python_type,
literal_type,
)
| Parameter | Type | Description |
|---|---|---|
signal_id |
str |
The name of the signal, this is the key used in the approve() or wait_for_input() call. |
execution_name |
str |
The name of the execution. This is the tail-end of the URL when looking at the workflow execution. |
value |
typing.Union[literal_models.Literal, typing.Any] |
This is either a Literal or a Python value, which FlyteRemote will convert into a Literal via the TypeEngine. This argument is only valid for wait_for_input type signals. |
project |
The execution project; defaults to the Remote’s default project. | |
domain |
The execution domain; defaults to the Remote’s default domain. | |
python_type |
Provide a python type to help with conversion if the value you provided is not a Literal. | |
literal_type |
Provide a Flyte literal type to help with conversion if the value you provided is not a Literal |
set_signal()
def set_signal(
signal_id: str,
execution_name: str,
value: typing.Union[literal_models.Literal, typing.Any],
project: typing.Optional[str],
domain: typing.Optional[str],
python_type: typing.Optional[typing.Type],
literal_type: typing.Optional[type_models.LiteralType],
)
| Parameter | Type | Description |
|---|---|---|
signal_id |
str |
The name of the signal, this is the key used in the approve() or wait_for_input() call. |
execution_name |
str |
The name of the execution. This is the tail-end of the URL when looking at the workflow execution. |
value |
typing.Union[literal_models.Literal, typing.Any] |
This is either a Literal or a Python value, which FlyteRemote will convert into a Literal via the TypeEngine. This argument is only valid for wait_for_input type signals. |
project |
typing.Optional[str] |
The execution project; defaults to the Remote’s default project. |
domain |
typing.Optional[str] |
The execution domain; defaults to the Remote’s default domain. |
python_type |
typing.Optional[typing.Type] |
Provide a python type to help with conversion if the value you provided is not a Literal. |
literal_type |
typing.Optional[type_models.LiteralType] |
Provide a Flyte literal type to help with conversion if the value you provided is not a Literal |
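For example, a minimal sketch of approving a gate node by setting its signal; the signal id must match the name used in the workflow's approve() or wait_for_input() call, and the execution name is a placeholder.

```python
# A minimal sketch; "approve-deploy" and "f8c51xyz" are placeholders.
from union import UnionRemote

remote = UnionRemote()
remote.set_signal(
    signal_id="approve-deploy",
    execution_name="f8c51xyz",
    value=True,
    project="flytesnacks",
    domain="development",
)
```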
stop_app()
def stop_app(
name: str,
project: Optional[str],
domain: Optional[str],
)
Stop an application.
| Parameter | Type | Description |
|---|---|---|
name |
str |
Name of application to stop. |
project |
Optional[str] |
Project name. If None, uses default_project. |
domain |
Optional[str] |
Domain name. If None, uses default_domain. :return: The App IDL for the stopped application. |
stream_execution_events()
def stream_execution_events(
event_count: Optional[int],
include_workflow_executions: bool,
include_task_executions: bool,
include_node_executions: bool,
) -> AsyncGenerator[Union[CloudEventWorkflowExecution, CloudEventNodeExecution, CloudEventTaskExecution], None]
Stream execution events from the given tenant. This is a generator that yields events as they are received.
Events are guaranteed to be delivered at least once, and clients must implement handling for potentially out-of-order event processing. Events will be retransmitted until acknowledged, with acknowledgment occurring automatically upon normal return from the caller. Note: if an exception is raised during event processing, the acknowledgment will not occur, and the event will be redelivered in a subsequent transmission.
| Parameter | Type | Description |
|---|---|---|
event_count |
Optional[int] |
Number of events to receive before closing the stream. If None, receive unlimited events. |
include_workflow_executions |
bool |
Whether to include workflow execution events |
include_task_executions |
bool |
Whether to include task execution events |
include_node_executions |
bool |
Whether to include node execution events |
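For example, a minimal sketch of consuming the event stream; since this is an async generator it must be driven from an event loop, and the keyword names are taken from the signature above.

```python
# A minimal sketch; stops after five workflow-execution events.
import asyncio
from union import UnionRemote

async def watch() -> None:
    remote = UnionRemote()
    async for event in remote.stream_execution_events(
        event_count=5,
        include_workflow_executions=True,
        include_task_executions=False,
        include_node_executions=False,
    ):
        print(type(event).__name__)

asyncio.run(watch())
```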
sync()
def sync(
execution: FlyteWorkflowExecution,
entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
sync_nodes: bool,
) -> FlyteWorkflowExecution
This function was previously a singledispatchmethod. We’ve removed that, but this function remains so that we don’t break existing callers.
| Parameter | Type | Description |
|---|---|---|
execution |
FlyteWorkflowExecution |
|
entity_definition |
typing.Union[FlyteWorkflow, FlyteTask] |
|
sync_nodes |
bool |
By default sync will fetch data on all underlying node executions (recursively, so subworkflows and launch plans will also get picked up). Set this to False in order to prevent that (which will make this call faster). :return: Returns the same execution object, but with additional information pulled in. |
sync_execution()
def sync_execution(
execution: FlyteWorkflowExecution,
entity_definition: typing.Union[FlyteWorkflow, FlyteTask],
sync_nodes: bool,
) -> FlyteWorkflowExecution
Sync a FlyteWorkflowExecution object with its corresponding remote state.
| Parameter | Type | Description |
|---|---|---|
execution |
FlyteWorkflowExecution |
|
entity_definition |
typing.Union[FlyteWorkflow, FlyteTask] |
|
sync_nodes |
bool |
sync_node_execution()
def sync_node_execution(
execution: FlyteNodeExecution,
node_mapping: typing.Dict[str, FlyteNode],
) -> FlyteNodeExecution
Get data backing a node execution. These FlyteNodeExecution objects should’ve come from Admin with the model fields already populated correctly. For purposes of the remote experience, we’d like to supplement the object with some additional fields:
- inputs/outputs
- task/workflow executions, and/or underlying node executions in the case of parent nodes
- TypedInterface (remote wrapper type)
A node can have several different types of executions behind it. That is, the node could’ve run (perhaps multiple times because of retries):
- A task
- A static subworkflow
- A dynamic subworkflow (which in turn may have run additional tasks, subwfs, and/or launch plans)
- A launch plan
The data model is complicated, so ascertaining which of these happened is a bit tricky. That logic is encapsulated in this function.
| Parameter | Type | Description |
|---|---|---|
execution |
FlyteNodeExecution |
|
node_mapping |
typing.Dict[str, FlyteNode] |
sync_task_execution()
def sync_task_execution(
execution: FlyteTaskExecution,
entity_interface: typing.Optional[TypedInterface],
get_task_exec_data: bool,
) -> FlyteTaskExecution
Sync a FlyteTaskExecution object with its corresponding remote state.
| Parameter | Type | Description |
|---|---|---|
execution |
FlyteTaskExecution |
|
entity_interface |
typing.Optional[TypedInterface] |
|
get_task_exec_data |
bool |
terminate()
def terminate(
execution: FlyteWorkflowExecution,
cause: str,
)
Terminate a workflow execution.
| Parameter | Type | Description |
|---|---|---|
execution |
FlyteWorkflowExecution |
workflow execution to terminate |
cause |
str |
reason for termination |
upload_file()
def upload_file(
to_upload: pathlib.Path,
project: typing.Optional[str],
domain: typing.Optional[str],
filename_root: typing.Optional[str],
) -> typing.Tuple[bytes, str]
Uses the remote’s client to hash and then upload the file using Admin’s data proxy service.
| Parameter | Type | Description |
|---|---|---|
to_upload |
pathlib.Path |
Must be a single file |
project |
typing.Optional[str] |
Project to upload under, if not supplied will use the remote’s default |
domain |
typing.Optional[str] |
Domain to upload under, if not specified will use the remote’s default |
filename_root |
typing.Optional[str] |
If provided will be used as the root of the filename. If not, Admin will use a hash :return: The uploaded location. |
wait()
def wait(
execution: FlyteWorkflowExecution,
timeout: typing.Optional[typing.Union[timedelta, int]],
poll_interval: typing.Optional[typing.Union[timedelta, int]],
sync_nodes: bool,
) -> FlyteWorkflowExecution
Wait for an execution to finish.
| Parameter | Type | Description |
|---|---|---|
execution |
FlyteWorkflowExecution |
execution object to wait on |
timeout |
typing.Optional[typing.Union[timedelta, int]] |
maximum amount of time to wait. It can be a timedelta or a duration in seconds as int. |
poll_interval |
typing.Optional[typing.Union[timedelta, int]] |
sync workflow execution at this interval. It can be a timedelta or a duration in seconds as int. |
sync_nodes |
bool |
passed along to the sync call for the workflow execution |
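For example, a minimal sketch that blocks until an execution finishes, polling every 10 seconds and giving up after 15 minutes (the execution name is a placeholder):

```python
# A minimal sketch; the execution name is a placeholder.
from datetime import timedelta
from union import UnionRemote

remote = UnionRemote()
execution = remote.fetch_execution(
    project="flytesnacks", domain="development", name="f8c51xyz"
)
execution = remote.wait(execution, timeout=timedelta(minutes=15), poll_interval=10)
print(execution.outputs)
```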
Properties
| Property | Type | Description |
|---|---|---|
apps_service_client |
||
artifacts_client |
||
authorizer_service_client |
||
client |
Return a SynchronousFlyteClient for additional operations. |
|
config |
Image config. |
|
context |
||
default_domain |
Default domain to use when fetching or executing flyte entities.
|
default_project |
Default project to use when fetching or executing flyte entities. |
|
file_access |
File access provider to use for offloading non-literal inputs/outputs. |
|
hooks_async_client |
||
hooks_sync_client |
||
images_client |
||
interactive_mode_enabled |
If set to True, the FlyteRemote will pickle the task/workflow. |
|
secret_client |
||
sync_channel |
Return channel from client. This channel already has the org passed in dynamically by the interceptor. |
|
user_service_client |
||
users_client |
union.VersionParameters
Parameters used for version hash generation.
The func parameter is the function to generate a version for; it is optional and can be any callable that matches the specified parameter and return types (type: Optional[Callable[P, FuncOut]]).
class VersionParameters(
func: typing.Callable[~P, ~FuncOut],
container_image: typing.Union[str, flytekit.image_spec.image_spec.ImageSpec, NoneType],
pod_template: typing.Optional[flytekit.core.pod_template.PodTemplate],
pod_template_name: typing.Optional[str],
)
| Parameter | Type | Description |
|---|---|---|
func |
typing.Callable[~P, ~FuncOut] |
|
container_image |
typing.Union[str, flytekit.image_spec.image_spec.ImageSpec, NoneType] |
|
pod_template |
typing.Optional[flytekit.core.pod_template.PodTemplate] |
|
pod_template_name |
typing.Optional[str] |
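For example, a minimal sketch of constructing VersionParameters; the function and image are illustrative, and the pod template fields are assumed to default to None.

```python
# A minimal sketch; the function and ImageSpec are illustrative.
from union import ImageSpec, VersionParameters

image = ImageSpec(name="my-image", packages=["pandas"])  # placeholder image

def process(x: int) -> int:
    return x + 1

params = VersionParameters(func=process, container_image=image)
```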