TaskEnvironment
Package: flyte
Define an execution environment for a set of tasks.
Task configuration in Flyte has three levels (most general to most specific):
- TaskEnvironment — sets defaults for all tasks in the environment
- @env.task decorator — overrides per-task settings
- task.override() — overrides at invocation time
For shared parameters, the more specific level overrides the more general one.
Example:
env = flyte.TaskEnvironment(
name="my_env",
image=flyte.Image.from_debian_base(python="3.12").with_pip_packages("pandas"),
resources=flyte.Resources(cpu="1", memory="1Gi"),
)
@env.task
async def my_task():
passParameters
class TaskEnvironment(
name: str,
depends_on: List[Environment],
pod_template: Optional[Union[str, PodTemplate]],
description: Optional[str],
secrets: Optional[SecretRequest],
env_vars: Optional[Dict[str, str]],
resources: Optional[Resources],
interruptible: bool,
image: Union[str, Image, Literal['auto'], None],
cache: CacheRequest,
reusable: ReusePolicy | None,
plugin_config: Optional[Any],
queue: Optional[str],
)| Parameter | Type | Description |
|---|---|---|
name |
str |
Name of the environment (required). Must be snake_case or kebab-case. TaskEnvironment level only. |
depends_on |
List[Environment] |
List of other environments this one depends on. Used at deploy time to ensure dependencies are also deployed. TaskEnvironment level only. |
pod_template |
Optional[Union[str, PodTemplate]] |
Kubernetes pod template for advanced configuration (sidecars, volumes, etc.). Also settable in @env.task and task.override. |
description |
Optional[str] |
Human-readable description (max 255 characters). TaskEnvironment level only. |
secrets |
Optional[SecretRequest] |
Secrets to inject. Overridable via task.override(secrets=...) when not using reusable containers. |
env_vars |
Optional[Dict[str, str]] |
Environment variables as dict[str, str]. Overridable via task.override(env_vars=...) when not using reusable containers. |
resources |
Optional[Resources] |
Compute resources (CPU, memory, GPU, disk). Overridable via task.override(resources=...) when not using reusable containers. |
interruptible |
bool |
Whether tasks can run on spot/preemptible instances. Also settable in @env.task and task.override. |
image |
Union[str, Image, Literal['auto'], None] |
Docker image for the environment. Can be a string (image URI), an Image object, or "auto" to use the default image. TaskEnvironment level only. |
cache |
CacheRequest |
Cache policy — "auto", "override", "disable", or a Cache object. Also settable in @env.task(cache=...) and task.override(cache=...). |
reusable |
ReusePolicy | None |
ReusePolicy for container reuse. Also overridable via task.override(reusable=...). |
plugin_config |
Optional[Any] |
Plugin configuration for custom task types (e.g., Ray, Spark). Cannot be combined with reusable. TaskEnvironment level only. |
queue |
Optional[str] |
Queue name for scheduling. Also settable in @env.task and task.override. |
Properties
| Property | Type | Description |
|---|---|---|
sandbox |
None |
Access the sandbox namespace for creating sandboxed tasks. |
tasks |
None |
Get all tasks defined in the environment. |
Methods
| Method | Description |
|---|---|
add_dependency() |
Add a dependency to the environment. |
clone_with() |
Clone the TaskEnvironment with new parameters. |
from_task() |
Create a TaskEnvironment from a list of tasks. |
task() |
Decorate a function to be a task. |
add_dependency()
def add_dependency(
env: Environment,
)Add a dependency to the environment.
| Parameter | Type | Description |
|---|---|---|
env |
Environment |
clone_with()
def clone_with(
name: str,
image: Optional[Union[str, Image, Literal['auto']]],
resources: Optional[Resources],
env_vars: Optional[Dict[str, str]],
secrets: Optional[SecretRequest],
depends_on: Optional[List[Environment]],
description: Optional[str],
interruptible: Optional[bool],
kwargs: **kwargs,
) -> TaskEnvironmentClone the TaskEnvironment with new parameters.
Besides the base environment parameters, you can override kwargs like cache, reusable, etc.
| Parameter | Type | Description |
|---|---|---|
name |
str |
The name of the environment. |
image |
Optional[Union[str, Image, Literal['auto']]] |
The image to use for the environment. |
resources |
Optional[Resources] |
The resources to allocate for the environment. |
env_vars |
Optional[Dict[str, str]] |
The environment variables to set for the environment. |
secrets |
Optional[SecretRequest] |
The secrets to inject into the environment. |
depends_on |
Optional[List[Environment]] |
The environment dependencies to hint, so when you deploy the environment, the dependencies are also deployed. This is useful when you have a set of environments that depend on each other. |
description |
Optional[str] |
The description of the environment. |
interruptible |
Optional[bool] |
Whether the environment is interruptible and can be scheduled on spot/preemptible instances. |
kwargs |
**kwargs |
Additional parameters to override the environment (e.g., cache, reusable, plugin_config). |
from_task()
def from_task(
name: str,
tasks: TaskTemplate,
depends_on: Optional[List['Environment']],
) -> TaskEnvironmentCreate a TaskEnvironment from a list of tasks. All tasks should have the same image or no Image defined. Similarity of Image is determined by the python reference, not by value.
If images are different, an error is raised. If no image is defined, the image is set to “auto”.
For any other tasks that need to be use these tasks, the returned environment can be used in the depends_on
attribute of the other TaskEnvironment.
| Parameter | Type | Description |
|---|---|---|
name |
str |
The name of the environment. |
tasks |
TaskTemplate |
The list of tasks to create the environment from. |
depends_on |
Optional[List['Environment']] |
Optional list of environments that this environment depends on. |
Returns: The created TaskEnvironment.
Raises
| Exception | Description |
|---|---|
ValueError |
If tasks are assigned to multiple environments or have different images. |
task()
def task(
_func: F | None,
short_name: Optional[str],
cache: CacheRequest | None,
retries: Union[int, RetryStrategy],
timeout: Union[timedelta, int],
docs: Optional[Documentation],
pod_template: Optional[Union[str, PodTemplate]],
report: bool,
interruptible: bool | None,
max_inline_io_bytes: int,
queue: Optional[str],
triggers: Tuple[Trigger, ...] | Trigger,
links: Tuple[Link, ...] | Link,
task_resolver: Any | None,
) -> Callable[[F], AsyncFunctionTaskTemplate[P, R, F]] | AsyncFunctionTaskTemplate[P, R, F]Decorate a function to be a task.
| Parameter | Type | Description |
|---|---|---|
_func |
F | None |
Optional The function to decorate. If not provided, the decorator will return a callable that accepts a function to be decorated. |
short_name |
Optional[str] |
Optional A friendly name for the task (defaults to the function name) |
cache |
CacheRequest | None |
Optional The cache policy for the task, defaults to auto, which will cache the results of the task. |
retries |
Union[int, RetryStrategy] |
Optional The number of retries for the task, defaults to 0, which means no retries. |
timeout |
Union[timedelta, int] |
Optional The timeout for the task. |
docs |
Optional[Documentation] |
Optional The documentation for the task, if not provided the function docstring will be used. |
pod_template |
Optional[Union[str, PodTemplate]] |
Optional The pod template for the task, if not provided the default pod template will be used. |
report |
bool |
Optional Whether to generate the html report for the task, defaults to False. |
interruptible |
bool | None |
Optional Whether the task is interruptible, defaults to environment setting. |
max_inline_io_bytes |
int |
Maximum allowed size (in bytes) for all inputs and outputs passed directly to the task (e.g., primitives, strings, dicts). Does not apply to files, directories, or dataframes. |
queue |
Optional[str] |
Optional queue name to use for this task. If not set, the environment’s queue will be used. |
triggers |
Tuple[Trigger, ...] | Trigger |
Optional A tuple of triggers to associate with the task. This allows the task to be run on a schedule or in response to events. Triggers can be defined using the flyte.trigger module. |
links |
Tuple[Link, ...] | Link |
Optional A tuple of links to associate with the task. Links can be used to provide additional context or information about the task. Links should implement the flyte.Link protocol |
task_resolver |
Any | None |
Returns: A TaskTemplate that can be used to deploy the task.