Task configuration
As we saw in Getting started, you can run any Python function as a task in Flyte just by decorating it with @env.task.
This allows you to run your Python code in a distributed manner, with each function running in its own container. Flyte manages the spinning up of the containers, the execution of the code, and the passing of data between the tasks.
In Getting started we demonstrated the simplest possible case: a TaskEnvironment with only a name parameter and an @env.task decorator with no parameters:
from typing import List
import flyte

env = flyte.TaskEnvironment(name="hello_world")

@env.task
async def say_hello(data: str, lt: List[int]) -> str:
    ...
Notice how the TaskEnvironment is assigned to the variable env and that variable is then used in the @env.task decorator. This is what connects the TaskEnvironment to the task definition.
In the following we will often use @env.task generically to refer to the decorator, but it is important to remember that it is actually a decorator attached to a specific TaskEnvironment object, and the env part can be any variable name you like.
The minimal configuration shown above runs your task in the default container environment with default settings.
But, of course, one of the key advantages of Flyte is the ability to control the software environment, hardware environment, and other execution parameters for each task, right in your Python code. In this section we will explore the various configuration options available for tasks in Flyte.
Task configuration levels
Task configuration is done at three levels. From most general to most specific, they are:
- The TaskEnvironment level: setting parameters when defining the TaskEnvironment object.
- The @env.task decorator level: setting parameters in the @env.task decorator when defining a task function.
- The task invocation level: using the task.override() method when invoking task execution.
Each level has its own set of parameters, and some parameters are shared across levels. For shared parameters, the more specific level will override the more general one.
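The precedence rule for shared parameters can be pictured as a layered lookup. The following is a plain-Python model of that rule, not Flyte's implementation; the helper resolve_config and the three dictionaries are hypothetical and exist only to illustrate which level wins.

```python
from collections import ChainMap


def resolve_config(env_level: dict, decorator_level: dict, override_level: dict) -> dict:
    """Merge three configuration layers so that the most specific layer wins.

    Hypothetical helper for illustration only; it is not part of Flyte.
    """
    # ChainMap searches its maps from left to right, so the most specific
    # layer (the invocation-level override) is listed first.
    return dict(ChainMap(override_level, decorator_level, env_level))


# cache is shared by all three levels; retries by the decorator and override
# levels; image can only be set on the environment.
env_level = {"cache": "disable", "image": "auto"}
decorator_level = {"cache": "auto", "retries": 3}
override_level = {"retries": 5}

resolved = resolve_config(env_level, decorator_level, override_level)
print(resolved)
```

Here cache resolves to the decorator value because no override was given for it, retries resolves to the override value, and image falls through to the environment.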
Example
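Here is an example of how these levels work together, showing each level with all available parameters. Names such as my_pod_template_spec, another_env, and my_plugin_config are placeholders for objects defined elsewhere: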
import flyte

# Level 1: TaskEnvironment - Base configuration
env = flyte.TaskEnvironment(
    name="data_processing_env",
    image=flyte.Image.from_debian_base(),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    env_vars={"MY_VAR": "value"},
    secrets=flyte.Secret(key="my_api_key", as_env_var="MY_API_KEY"),
    cache="disable",
    pod_template=my_pod_template_spec,
    reusable=flyte.ReusePolicy(replicas=2, idle_ttl=300),
    depends_on=[another_env],
    description="My task environment",
    plugin_config=my_plugin_config,
)

# Level 2: Decorator - Override some environment settings
@env.task(
    name="data_processing_task",
    secrets=flyte.Secret(key="my_api_key_2", as_env_var="MY_API_KEY"),
    cache="auto",
    pod_template=my_pod_template_spec_2,
    report=True,
    max_inline_io_bytes=100 * 1024,
    retries=3,
    timeout=60,
    docs="This task processes data and generates a report.",
)
async def process_data(data_path: str) -> str:
    return f"Processed {data_path}"

# Level 3: Invocation - Override settings for a single call
@env.task
async def main() -> str:
    result = await process_data.override(
        # Because env is reusable, reusable="off" is needed to override
        # resources, env_vars, or secrets (see the reusable parameter below).
        reusable="off",
        resources=flyte.Resources(cpu=4, memory="2Gi"),
        env_vars={"MY_VAR": "new_value"},
        secrets=flyte.Secret(key="my_api_key_3", as_env_var="MY_API_KEY"),
        cache="override",
        max_inline_io_bytes=100 * 1024,
        retries=3,
        timeout=60,
    )("input.csv")
    return result
Parameter interaction
Here is an overview of all task configuration parameters available at each level and how they interact:
Parameter | TaskEnvironment | @env.task decorator | Task (and run) invocation |
---|---|---|---|
name | ✅ Yes (required) | ✅ Yes (sets friendly name) | ❌ No |
image | ✅ Yes | ❌ No | ❌ No |
resources | ✅ Yes | ❌ No | ✅ Yes (if not reusable) |
env_vars | ✅ Yes | ❌ No | ✅ Yes (if not reusable) |
secrets | ✅ Yes | ✅ Yes (if not reusable) | ✅ Yes (if not reusable) |
cache | ✅ Yes | ✅ Yes | ✅ Yes |
pod_template | ✅ Yes | ✅ Yes | ❌ No |
reusable | ✅ Yes (see below) | ❌ No | ✅ Yes |
depends_on | ✅ Yes | ❌ No | ❌ No |
description | ✅ Yes | ❌ No | ❌ No |
plugin_config | ✅ Yes | ❌ No | ❌ No |
report | ❌ No | ✅ Yes | ❌ No |
max_inline_io_bytes | ❌ No | ✅ Yes | ✅ Yes |
retries | ❌ No | ✅ Yes | ✅ Yes |
timeout | ❌ No | ✅ Yes | ✅ Yes |
docs | ❌ No | ✅ Yes | ❌ No |
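For quick lookups, the table can be transcribed into a small data structure. This is only a sketch: the names SETTABLE_AT and can_set are hypothetical, the level labels "env", "decorator", and "override" are shorthand chosen here, and the "if not reusable" conditions from the table are deliberately not modeled.

```python
# Levels at which each parameter can be set, transcribed from the table above.
# "env" = TaskEnvironment, "decorator" = @env.task, "override" = task.override().
SETTABLE_AT = {
    "name": {"env", "decorator"},
    "image": {"env"},
    "resources": {"env", "override"},
    "env_vars": {"env", "override"},
    "secrets": {"env", "decorator", "override"},
    "cache": {"env", "decorator", "override"},
    "pod_template": {"env", "decorator"},
    "reusable": {"env", "override"},
    "depends_on": {"env"},
    "description": {"env"},
    "plugin_config": {"env"},
    "report": {"decorator"},
    "max_inline_io_bytes": {"decorator", "override"},
    "retries": {"decorator", "override"},
    "timeout": {"decorator", "override"},
    "docs": {"decorator"},
}


def can_set(parameter: str, level: str) -> bool:
    """Return True if the table lists `parameter` as settable at `level`."""
    return level in SETTABLE_AT.get(parameter, set())


print(can_set("image", "override"))   # image is environment-only in the table
print(can_set("retries", "override"))
```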
Task configuration parameters
The full set of parameters available for configuring a task environment, task definition, and task invocation is:
name
- Type: str
- In a TaskEnvironment constructor it defines the name of the environment and is required. It is used in conjunction with the name of each @env.task function to define the fully qualified task name. For example:

  env = flyte.TaskEnvironment(name="my_env")

  @env.task
  async def my_task(data: str) -> str:
      ...

  Here, the fully qualified name of the task will be my_env.my_task.
- Can optionally be set at the @env.task decorator level, in which case it overrides not the TaskEnvironment name but the friendly name of the task. By default, the friendly name of a task is the name of the function. The friendly name is used for display purposes in the UI.
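The two naming rules can be restated as a pair of small functions. This is a plain-Python restatement of the description above, not Flyte code; both function names are hypothetical.

```python
from typing import Optional


def fully_qualified_task_name(env_name: str, function_name: str) -> str:
    """Join the environment name and the function name with a dot."""
    return f"{env_name}.{function_name}"


def friendly_name(function_name: str, decorator_name: Optional[str] = None) -> str:
    """Use the decorator-level name if given, otherwise the function name."""
    return decorator_name if decorator_name is not None else function_name


print(fully_qualified_task_name("my_env", "my_task"))
print(friendly_name("process_data", decorator_name="data_processing_task"))
```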
image
- Type: Union[str, Image, Literal['auto']]
- Specifies the Docker image to use for the task container. Can be a URL reference to a Docker image, an Image object, or the string auto. If set to auto, or if this parameter is not set, the default image will be used. See Container images.
- Only settable at the TaskEnvironment level.
resources
- Type: Optional[Resources]
- Specifies the compute resources, such as CPU and memory, required by the task environment using a Resources object.
- Can be set at the TaskEnvironment level and overridden at the task.override() invocation level (but only if reusable is not in effect).
env_vars
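- Type: Optional[Dict[str, str]]
- A dictionary of environment variables to be made available in the task container. These variables can be used to configure the task at runtime, such as setting API keys or other configuration values.
- Can be set at the TaskEnvironment level and overridden at the task.override() invocation level (but only if reusable is not in effect).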
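Inside the task body, variables supplied through env_vars are read like any other environment variable. The sketch below uses only the standard library; the helper read_setting is hypothetical, and the assignment to os.environ merely simulates what env_vars={"MY_VAR": "value"} would provide inside the container.

```python
import os


def read_setting(name: str, default: str = "") -> str:
    """Read a configuration value from the process environment."""
    return os.environ.get(name, default)


# Simulate the container environment for a local run of this sketch.
os.environ["MY_VAR"] = "value"

print(read_setting("MY_VAR"))
print(read_setting("SKETCH_UNSET_VARIABLE", default="fallback"))
```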
secrets
- Type: Optional[SecretRequest], where SecretRequest is an alias for Union[str, Secret, List[str | Secret]]
- The secrets to be made available in the task container. See the Secrets section and the API docs for the Secret object.
- Can be set at the TaskEnvironment level and overridden at the @env.task decorator level and at the task.override() invocation level, but, in both cases, only if reusable is not in effect.
cache
- Type: Union[CacheRequest], where CacheRequest is an alias for Literal["auto", "override", "disable", "enabled"] | Cache.
- Specifies the caching policy to be used for this task. See Caching.
- Can be set at the TaskEnvironment level and overridden at the @env.task decorator level and at the task.override() invocation level.
pod_template
- Type: Optional[Union[str, kubernetes.client.V1PodTemplate]]
- A pod template that defines the Kubernetes pod configuration for the task. Either a string reference to a named template or a kubernetes.client.V1PodTemplate object.
- Can be set at the TaskEnvironment level and overridden at the @env.task decorator level, but not at the task.override() invocation level.
reusable
- Type: ReusePolicy | None
- A ReusePolicy that defines whether the task environment can be reused. If set, the task environment will be reused across multiple task invocations. See Reusable containers and the API docs for the ReusePolicy object.

When a TaskEnvironment has reusable set, then resources, env_vars, and secrets can only be overridden in task.override() if accompanied by an explicit reusable="off" in the same task.override() invocation.
For example:

env = flyte.TaskEnvironment(
    name="my_env",
    resources=flyte.Resources(cpu=1),
    reusable=flyte.ReusePolicy(replicas=2, idle_ttl=300),
)

@env.task
async def my_task(data: str) -> str:
    ...

@env.task
async def main_workflow() -> str:
    # `my_task.override(resources=flyte.Resources(cpu=4))` will fail. Instead use:
    result = await my_task.override(reusable="off", resources=flyte.Resources(cpu=4))("input")
    return result

Additionally, secrets can only be overridden at the @env.task decorator level if the TaskEnvironment (env) does not have reusable set.
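The invocation-level rule can be modeled as a small validation function. This is a sketch of the rule as stated in this section, not Flyte's own check; check_override, its env_is_reusable flag, and the error message are all hypothetical.

```python
# Parameters that need reusable="off" when the environment is reusable.
RESTRICTED_WHEN_REUSABLE = {"resources", "env_vars", "secrets"}


def check_override(env_is_reusable: bool, **overrides) -> None:
    """Raise ValueError if the overrides break the rule described above.

    Hypothetical helper for illustration only; it is not part of Flyte.
    """
    blocked = sorted(RESTRICTED_WHEN_REUSABLE.intersection(overrides))
    if env_is_reusable and blocked and overrides.get("reusable") != "off":
        raise ValueError(
            f"cannot override {blocked} on a reusable environment "
            'without reusable="off"'
        )


# Accepted: the restricted parameter is accompanied by reusable="off".
check_override(True, reusable="off", resources={"cpu": 4})

# Rejected: resources overridden on a reusable environment.
try:
    check_override(True, resources={"cpu": 4})
except ValueError as error:
    print(error)
```

Parameters outside the restricted set, such as retries or timeout, pass this check regardless of whether the environment is reusable.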
depends_on
- Type: List[Environment]
- A list of Environment objects that this TaskEnvironment depends on. When deploying this TaskEnvironment, the system will ensure that any dependencies of the listed Environments are also available. This is useful when you have a set of task environments that depend on each other.
- Can only be set at the TaskEnvironment level, not at the @env.task decorator level or the task.override() invocation level.
description
- Type: Optional[str]
- A description of the task environment. This can be used to provide additional context about the task environment, such as its purpose or usage.
- Can only be set at the TaskEnvironment level, not at the @env.task decorator level or the task.override() invocation level.
plugin_config
- Type: Optional[Any]
- Additional configuration for plugins that can be used with the task environment. This can include settings for specific plugins that are used in the task environment.
- Can only be set at the TaskEnvironment level, not at the @env.task decorator level or the task.override() invocation level.
report
- Type: bool
- Whether to generate an HTML report for the task. If set to True, the task will generate an HTML report that can be viewed in the Flyte UI.
- Can only be set at the @env.task decorator level, not at the TaskEnvironment level or the task.override() invocation level.
max_inline_io_bytes
- Type: int
- Maximum allowed size (in bytes) for all inputs and outputs passed directly to the task (e.g., primitives, strings, dicts). Does not apply to flyte.File, flyte.Dir, or flyte.DataFrame (since these are passed by reference).
- Can be set at the @env.task decorator level and overridden at the task.override() invocation level. If not set, the default value is MAX_INLINE_IO_BYTES (which is 100 MiB).
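Because the limit is expressed in bytes, it helps to spell out the unit arithmetic. The sketch below also includes a rough size check; fits_inline is a hypothetical helper, and measuring the JSON encoding is only a stand-in assumption, since this page does not describe how Flyte actually serializes and measures inline values.

```python
import json

KIB = 1024          # bytes in one kibibyte
MIB = 1024 * KIB    # bytes in one mebibyte

# The value 100 * 1024 used in the earlier example is 100 KiB.
example_limit = 100 * KIB
print(example_limit)    # 102400
print(100 * MIB)        # 104857600


def fits_inline(value, limit_bytes: int) -> bool:
    """Rough check: does the UTF-8 JSON encoding of `value` fit in the limit?

    Hypothetical helper; JSON size is an approximation chosen for this sketch.
    """
    return len(json.dumps(value).encode("utf-8")) <= limit_bytes


print(fits_inline({"rows": [1, 2, 3]}, example_limit))
```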
retries
- Type: Union[int, RetryStrategy]
- The number of retries for the task, or a RetryStrategy object that defines the retry behavior. If set to 0, no retries will be attempted.
- Can be set at the @env.task decorator level and overridden at the task.override() invocation level.
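To make the meaning of an integer retry count concrete, here is a local model of retry semantics. It assumes that retries counts additional attempts after the first one, which this page does not state explicitly; run_with_retries is a hypothetical helper and says nothing about how Flyte schedules retries on the cluster.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], retries: int) -> T:
    """Call fn once, then up to `retries` more times if it raises.

    Assumption for this sketch: retries=0 means a single attempt.
    """
    if retries < 0:
        raise ValueError("retries must be non-negative")
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            # Re-raise once the final allowed attempt has failed.
            if attempt == retries:
                raise
    raise AssertionError("unreachable")


attempts = {"count": 0}


def flaky() -> str:
    """Fail on the first two calls, then succeed."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


print(run_with_retries(flaky, retries=3), attempts["count"])
```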
timeout
- Type: Union[timedelta, int]
- The timeout for the task, either as a timedelta object or an integer representing seconds. If set to 0, no timeout will be applied.
- Can be set at the @env.task decorator level and overridden at the task.override() invocation level.
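The two accepted forms of timeout describe the same quantity. The sketch below normalizes either form to a timedelta and maps 0 to "no timeout", following the description above; normalize_timeout is a hypothetical helper, not a Flyte function.

```python
from datetime import timedelta
from typing import Optional, Union


def normalize_timeout(timeout: Union[timedelta, int]) -> Optional[timedelta]:
    """Convert an int (seconds) or timedelta to a timedelta; 0 means no timeout."""
    if isinstance(timeout, int):
        timeout = timedelta(seconds=timeout)
    if timeout == timedelta(0):
        return None  # no timeout applied
    return timeout


print(normalize_timeout(60))
print(normalize_timeout(timedelta(minutes=5)))
print(normalize_timeout(0))
```

Under this model, timeout=60 and timeout=timedelta(minutes=1) are equivalent.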
docs
- Type: Optional[Documentation]
- Documentation for the task, including usage examples and explanations of the task's behavior.
- Can only be set at the @env.task decorator level. It cannot be overridden.