flyte
Flyte SDK for authoring compound AI applications, services and workflows.
Directory
Classes
Class | Description |
---|---|
Cache |
Cache configuration for a task. |
Device |
Represents a device type, its quantity and partition if applicable. |
Environment |
|
Image |
This is a representation of Container Images, which can be used to create layered images programmatically. |
PodTemplate |
Custom PodTemplate specification for a Task. |
Resources |
Resources such as CPU, Memory, and GPU that can be allocated to a task. |
RetryStrategy |
Retry strategy for the task or task environment. |
ReusePolicy |
ReusePolicy can be used to configure a task to reuse the environment. |
Secret |
Secrets are used to inject sensitive information into tasks. |
TaskEnvironment |
Environment class to define a new environment for a set of tasks. |
Timeout |
Timeout class to define a timeout for a task. |
Protocols
Protocol | Description |
---|---|
CachePolicy |
Base class for protocol classes. |
Methods
Method | Description |
---|---|
GPU() |
Create a GPU device instance. |
TPU() |
Create a TPU device instance. |
build() |
Build an image. |
build_images() |
Build the images for the given environments. |
ctx() |
Retrieve the current task context from the context variable. |
deploy() |
Deploy the given environment or list of environments. |
group() |
Create a new group with the given name. |
init() |
Initialize the Flyte system with the given configuration. |
init_from_config() |
Initialize the Flyte system using a configuration file or Config object. |
run() |
Run a task with the given parameters. |
trace() |
A decorator that traces function execution with timing information. |
with_runcontext() |
Launch a new run with the given parameters as the context. |
Variables
Property | Type | Description |
---|---|---|
TimeoutType |
UnionType |
|
__version__ |
str |
Methods
GPU()
def GPU(
device: typing.Literal['T4', 'A100', 'A100 80G', 'H100', 'L4', 'L40s'],
quantity: typing.Literal[1, 2, 3, 4, 5, 6, 7, 8],
partition: typing.Union[typing.Literal['1g.5gb', '2g.10gb', '3g.20gb', '4g.20gb', '7g.40gb'], typing.Literal['1g.10gb', '2g.20gb', '3g.40gb', '4g.40gb', '7g.80gb'], NoneType],
) -> flyte._resources.Device
Create a GPU device instance.
Parameter | Type |
---|---|
device |
typing.Literal['T4', 'A100', 'A100 80G', 'H100', 'L4', 'L40s'] |
quantity |
typing.Literal[1, 2, 3, 4, 5, 6, 7, 8] |
partition |
typing.Union[typing.Literal['1g.5gb', '2g.10gb', '3g.20gb', '4g.20gb', '7g.40gb'], typing.Literal['1g.10gb', '2g.20gb', '3g.40gb', '4g.40gb', '7g.80gb'], NoneType] |
TPU()
def TPU(
device: typing.Literal['V5P', 'V6E'],
partition: typing.Union[typing.Literal['2x2x1', '2x2x2', '2x4x4', '4x4x4', '4x4x8', '4x8x8', '8x8x8', '8x8x16', '8x16x16', '16x16x16', '16x16x24'], typing.Literal['1x1', '2x2', '2x4', '4x4', '4x8', '8x8', '8x16', '16x16'], NoneType],
)
Create a TPU device instance.
Parameter | Type |
---|---|
device |
typing.Literal['V5P', 'V6E'] |
partition |
typing.Union[typing.Literal['2x2x1', '2x2x2', '2x4x4', '4x4x4', '4x4x8', '4x8x8', '8x8x8', '8x8x16', '8x16x16', '16x16x16', '16x16x24'], typing.Literal['1x1', '2x2', '2x4', '4x4', '4x8', '8x8', '8x16', '16x16'], NoneType] |
build()
def build(
image: Image,
) -> str
Build an image. The existing async context will be used.
Example:
import flyte
image = flyte.Image("example_image")
if __name__ == "__main__":
asyncio.run(flyte.build.aio(image))
Parameter | Type |
---|---|
image |
Image |
build_images()
def build_images(
envs: Environment,
) -> ImageCache
Build the images for the given environments.
Parameter | Type |
---|---|
envs |
Environment |
ctx()
def ctx()
Retrieve the current task context from the context variable.
deploy()
def deploy(
envs: Environment,
dryrun: bool,
version: str | None,
interactive_mode: bool | None,
copy_style: CopyFiles,
) -> Deployment
Deploy the given environment or list of environments.
Parameter | Type |
---|---|
envs |
Environment |
dryrun |
bool |
version |
str | None |
interactive_mode |
bool | None |
copy_style |
CopyFiles |
group()
def group(
name: str,
)
Create a new group with the given name. The method is intended to be used as a context manager.
Example:
@task
async def my_task():
...
with group("my_group"):
t1(x,y) # tasks in this block will be grouped under "my_group"
...
Parameter | Type |
---|---|
name |
str |
init()
def init(
org: str | None,
project: str | None,
domain: str | None,
root_dir: Path | None,
log_level: int | None,
endpoint: str | None,
headless: bool,
insecure: bool,
insecure_skip_verify: bool,
ca_cert_file_path: str | None,
auth_type: AuthType,
command: List[str] | None,
proxy_command: List[str] | None,
api_key: str | None,
client_id: str | None,
client_credentials_secret: str | None,
auth_client_config: ClientConfig | None,
rpc_retries: int,
http_proxy_url: str | None,
storage: Storage | None,
batch_size: int,
image_builder: ImageBuildEngine.ImageBuilderType,
)
Initialize the Flyte system with the given configuration. This method should be called before any other Flyte remote API methods are called. Thread-safe implementation.
Parameter | Type |
---|---|
org |
str | None |
project |
str | None |
domain |
str | None |
root_dir |
Path | None |
log_level |
int | None |
endpoint |
str | None |
headless |
bool |
insecure |
bool |
insecure_skip_verify |
bool |
ca_cert_file_path |
str | None |
auth_type |
AuthType |
command |
List[str] | None |
proxy_command |
List[str] | None |
api_key |
str | None |
client_id |
str | None |
client_credentials_secret |
str | None |
auth_client_config |
ClientConfig | None |
rpc_retries |
int |
http_proxy_url |
str | None |
storage |
Storage | None |
batch_size |
int |
image_builder |
ImageBuildEngine.ImageBuilderType |
init_from_config()
def init_from_config(
path_or_config: str | Config | None,
root_dir: Path | None,
log_level: int | None,
)
Initialize the Flyte system using a configuration file or Config object. This method should be called before any other Flyte remote API methods are called. Thread-safe implementation.
Parameter | Type |
---|---|
path_or_config |
str | Config | None |
root_dir |
Path | None |
log_level |
int | None |
run()
def run(
task: TaskTemplate[P, R],
args: *args,
kwargs: **kwargs,
) -> Union[R, Run]
Run a task with the given parameters
Parameter | Type |
---|---|
task |
TaskTemplate[P, R] |
args |
*args |
kwargs |
**kwargs |
trace()
def trace(
func: typing.Callable[..., ~T],
) -> typing.Callable[..., ~T]
A decorator that traces function execution with timing information. Works with regular functions, async functions, and async generators/iterators.
Parameter | Type |
---|---|
func |
typing.Callable[..., ~T] |
with_runcontext()
def with_runcontext(
mode: Mode | None,
name: Optional[str],
service_account: Optional[str],
version: Optional[str],
copy_style: CopyFiles,
dry_run: bool,
copy_bundle_to: pathlib.Path | None,
interactive_mode: bool | None,
raw_data_path: str | None,
run_base_dir: str | None,
overwrite_cache: bool,
project: str | None,
domain: str | None,
env: Dict[str, str] | None,
labels: Dict[str, str] | None,
annotations: Dict[str, str] | None,
interruptible: bool,
log_level: int | None,
) -> _Runner
Launch a new run with the given parameters as the context.
Example:
import flyte
env = flyte.TaskEnvironment("example")
@env.task
async def example_task(x: int, y: str) -> str:
return f"{x} {y}"
if __name__ == "__main__":
flyte.with_runcontext(name="example_run_id").run(example_task, 1, y="hello")
Parameter | Type |
---|---|
mode |
Mode | None |
name |
Optional[str] |
service_account |
Optional[str] |
version |
Optional[str] |
copy_style |
CopyFiles |
dry_run |
bool |
copy_bundle_to |
pathlib.Path | None |
interactive_mode |
bool | None |
raw_data_path |
str | None |
run_base_dir |
str | None |
overwrite_cache |
bool |
project |
str | None |
domain |
str | None |
env |
Dict[str, str] | None |
labels |
Dict[str, str] | None |
annotations |
Dict[str, str] | None |
interruptible |
bool |
log_level |
int | None |
flyte.Cache
Cache configuration for a task.
class Cache(
behavior: typing.Literal['auto', 'override', 'disable', 'enabled'],
version_override: typing.Optional[str],
serialize: bool,
ignored_inputs: typing.Union[typing.Tuple[str, ...], str],
salt: str,
policies: typing.Union[typing.List[flyte._cache.cache.CachePolicy], flyte._cache.cache.CachePolicy, NoneType],
)
Parameter | Type |
---|---|
behavior |
typing.Literal['auto', 'override', 'disable', 'enabled'] |
version_override |
typing.Optional[str] |
serialize |
bool |
ignored_inputs |
typing.Union[typing.Tuple[str, ...], str] |
salt |
str |
policies |
typing.Union[typing.List[flyte._cache.cache.CachePolicy], flyte._cache.cache.CachePolicy, NoneType] |
Methods
Method | Description |
---|---|
get_ignored_inputs() |
|
get_version() |
|
is_enabled() |
Check if the cache policy is enabled. |
get_ignored_inputs()
def get_ignored_inputs()
get_version()
def get_version(
params: typing.Optional[flyte._cache.cache.VersionParameters],
) -> str
Parameter | Type |
---|---|
params |
typing.Optional[flyte._cache.cache.VersionParameters] |
is_enabled()
def is_enabled()
Check if the cache policy is enabled.
flyte.CachePolicy
Base class for protocol classes.
Protocol classes are defined as::
class Proto(Protocol):
def meth(self) -> int:
...
Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).
For example::
class C:
def meth(self) -> int:
return 0
def func(x: Proto) -> int:
return x.meth()
func(C()) # Passes static type check
See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::
class GenProto[T](Protocol):
def meth(self) -> T:
...
protocol CachePolicy()
Methods
Method | Description |
---|---|
get_version() |
get_version()
def get_version(
salt: str,
params: flyte._cache.cache.VersionParameters,
) -> str
Parameter | Type |
---|---|
salt |
str |
params |
flyte._cache.cache.VersionParameters |
flyte.Device
Represents a device type, its quantity and partition if applicable. param device: The type of device (e.g., “T4”, “A100”). param quantity: The number of devices of this type. param partition: The partition of the device (e.g., “1g.5gb”, “2g.10gb” for gpus) or (“1x1”, … for tpus).
class Device(
quantity: int,
device: str | None,
partition: str | None,
)
Parameter | Type |
---|---|
quantity |
int |
device |
str | None |
partition |
str | None |
flyte.Environment
class Environment(
name: str,
depends_on: List[Environment],
pod_template: Optional[Union[str, 'V1PodTemplate']],
description: Optional[str],
secrets: Optional[SecretRequest],
env: Optional[Dict[str, str]],
resources: Optional[Resources],
image: Union[str, Image, Literal['auto']],
)
Parameter | Type |
---|---|
name |
str |
depends_on |
List[Environment] |
pod_template |
Optional[Union[str, 'V1PodTemplate']] |
description |
Optional[str] |
secrets |
Optional[SecretRequest] |
env |
Optional[Dict[str, str]] |
resources |
Optional[Resources] |
image |
Union[str, Image, Literal['auto']] |
Methods
Method | Description |
---|---|
add_dependency() |
Add a dependency to the environment. |
clone_with() |
add_dependency()
def add_dependency(
env: Environment,
)
Add a dependency to the environment.
Parameter | Type |
---|---|
env |
Environment |
clone_with()
def clone_with(
name: str,
image: Optional[Union[str, Image, Literal['auto']]],
resources: Optional[Resources],
env: Optional[Dict[str, str]],
secrets: Optional[SecretRequest],
depends_on: Optional[List[Environment]],
kwargs: **kwargs,
) -> Environment
Parameter | Type |
---|---|
name |
str |
image |
Optional[Union[str, Image, Literal['auto']]] |
resources |
Optional[Resources] |
env |
Optional[Dict[str, str]] |
secrets |
Optional[SecretRequest] |
depends_on |
Optional[List[Environment]] |
kwargs |
**kwargs |
flyte.Image
This is a representation of Container Images, which can be used to create layered images programmatically.
Use by first calling one of the base constructor methods. These all begin with from
or default_
The image can then be amended with additional layers using the various with_*
methods.
Invariant for this class: The construction of Image objects must be doable everywhere. That is, if a user has a custom image that is not accessible, calling .with_source_file on a file that doesn’t exist, the instantiation of the object itself must still go through. Further, the .identifier property of the image must also still go through. This is because it may have been already built somewhere else. Use validate() functions to check each layer for actual errors. These are invoked at actual build time. See self.id for more information
class Image(
base_image: Optional[str],
dockerfile: Optional[Path],
registry: Optional[str],
name: Optional[str],
platform: Tuple[Architecture, ...],
python_version: Tuple[int, int],
_layers: Tuple[Layer, ...],
)
Parameter | Type |
---|---|
base_image |
Optional[str] |
dockerfile |
Optional[Path] |
registry |
Optional[str] |
name |
Optional[str] |
platform |
Tuple[Architecture, ...] |
python_version |
Tuple[int, int] |
_layers |
Tuple[Layer, ...] |
Methods
Method | Description |
---|---|
clone() |
Use this method to clone the current image and change the registry and name. |
from_base() |
Use this method to start with a pre-built base image. |
from_debian_base() |
Use this method to start using the default base image, built from this library’s base Dockerfile. |
from_dockerfile() |
Use this method to create a new image with the specified dockerfile. |
from_uv_script() |
Use this method to create a new image with the specified uv script. |
validate() |
|
with_apt_packages() |
Use this method to create a new image with the specified apt packages layered on top of the current image. |
with_commands() |
Use this method to create a new image with the specified commands layered on top of the current image. |
with_dockerignore() |
|
with_env_vars() |
Use this method to create a new image with the specified environment variables layered on top of. |
with_local_v2() |
Use this method to create a new image with the local v2 builder. |
with_pip_packages() |
Use this method to create a new image with the specified pip packages layered on top of the current image. |
with_requirements() |
Use this method to create a new image with the specified requirements file layered on top of the current image. |
with_source_file() |
Use this method to create a new image with the specified local file layered on top of the current image. |
with_source_folder() |
Use this method to create a new image with the specified local directory layered on top of the current image. |
with_uv_project() |
Use this method to create a new image with the specified uv. |
with_workdir() |
Use this method to create a new image with the specified working directory. |
clone()
def clone(
registry: Optional[str],
name: Optional[str],
python_version: Optional[Tuple[int, int]],
addl_layer: Optional[Layer],
) -> Image
Use this method to clone the current image and change the registry and name
Parameter | Type |
---|---|
registry |
Optional[str] |
name |
Optional[str] |
python_version |
Optional[Tuple[int, int]] |
addl_layer |
Optional[Layer] |
from_base()
def from_base(
image_uri: str,
) -> Image
Use this method to start with a pre-built base image. This image must already exist in the registry of course.
Parameter | Type |
---|---|
image_uri |
str |
from_debian_base()
def from_debian_base(
python_version: Optional[Tuple[int, int]],
flyte_version: Optional[str],
install_flyte: bool,
registry: Optional[str],
name: Optional[str],
platform: Optional[Tuple[Architecture, ...]],
) -> Image
Use this method to start using the default base image, built from this library’s base Dockerfile Default images are multi-arch amd/arm64
Parameter | Type |
---|---|
python_version |
Optional[Tuple[int, int]] |
flyte_version |
Optional[str] |
install_flyte |
bool |
registry |
Optional[str] |
name |
Optional[str] |
platform |
Optional[Tuple[Architecture, ...]] |
from_dockerfile()
def from_dockerfile(
file: Path,
registry: str,
name: str,
platform: Union[Architecture, Tuple[Architecture, ...], None],
) -> Image
Use this method to create a new image with the specified dockerfile. Note you cannot use additional layers after this, as the system doesn’t attempt to parse/understand the Dockerfile, and what kind of setup it has (python version, uv vs poetry, etc), so please put all logic into the dockerfile itself.
Also since Python sees paths as from the calling directory, please use Path objects with absolute paths. The context for the builder will be the directory where the dockerfile is located.
Parameter | Type |
---|---|
file |
Path |
registry |
str |
name |
str |
platform |
Union[Architecture, Tuple[Architecture, ...], None] |
from_uv_script()
def from_uv_script(
script: Path | str,
name: str,
registry: str | None,
python_version: Optional[Tuple[int, int]],
index_url: Optional[str],
extra_index_urls: Union[str, List[str], Tuple[str, ...], None],
pre: bool,
extra_args: Optional[str],
platform: Optional[Tuple[Architecture, ...]],
) -> Image
Use this method to create a new image with the specified uv script. It uses the header of the script to determine the python version, dependencies to install. The script must be a valid uv script, otherwise an error will be raised.
Usually the header of the script will look like this: Example:
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx"]
# ///
For more information on the uv script format, see the documentation: UV: Declaring script dependencies
Parameter | Type |
---|---|
script |
Path | str |
name |
str |
registry |
str | None |
python_version |
Optional[Tuple[int, int]] |
index_url |
Optional[str] |
extra_index_urls |
Union[str, List[str], Tuple[str, ...], None] |
pre |
bool |
extra_args |
Optional[str] |
platform |
Optional[Tuple[Architecture, ...]] |
validate()
def validate()
with_apt_packages()
def with_apt_packages(
packages: str,
) -> Image
Use this method to create a new image with the specified apt packages layered on top of the current image
Parameter | Type |
---|---|
packages |
str |
with_commands()
def with_commands(
commands: List[str],
) -> Image
Use this method to create a new image with the specified commands layered on top of the current image Be sure not to use RUN in your command.
Parameter | Type |
---|---|
commands |
List[str] |
with_dockerignore()
def with_dockerignore(
path: Path,
) -> Image
Parameter | Type |
---|---|
path |
Path |
with_env_vars()
def with_env_vars(
env_vars: Dict[str, str],
) -> Image
Use this method to create a new image with the specified environment variables layered on top of the current image. Cannot be used in conjunction with conda
Parameter | Type |
---|---|
env_vars |
Dict[str, str] |
with_local_v2()
def with_local_v2()
Use this method to create a new image with the local v2 builder This will override any existing builder
:return: Image
with_pip_packages()
def with_pip_packages(
packages: str,
index_url: Optional[str],
extra_index_urls: Union[str, List[str], Tuple[str, ...], None],
pre: bool,
extra_args: Optional[str],
) -> Image
Use this method to create a new image with the specified pip packages layered on top of the current image Cannot be used in conjunction with conda
Example:
@flyte.task(image=(flyte.Image
.ubuntu_python()
.with_pip_packages("requests", "numpy")))
def my_task(x: int) -> int:
import numpy as np
return np.sum([x, 1])
Parameter | Type |
---|---|
packages |
str |
index_url |
Optional[str] |
extra_index_urls |
Union[str, List[str], Tuple[str, ...], None] |
pre |
bool |
extra_args |
Optional[str] |
with_requirements()
def with_requirements(
file: str | Path,
) -> Image
Use this method to create a new image with the specified requirements file layered on top of the current image Cannot be used in conjunction with conda
Parameter | Type |
---|---|
file |
str | Path |
with_source_file()
def with_source_file(
src: Path,
dst: str,
) -> Image
Use this method to create a new image with the specified local file layered on top of the current image. If dest is not specified, it will be copied to the working directory of the image
Parameter | Type |
---|---|
src |
Path |
dst |
str |
with_source_folder()
def with_source_folder(
src: Path,
dst: str,
) -> Image
Use this method to create a new image with the specified local directory layered on top of the current image. If dest is not specified, it will be copied to the working directory of the image
Parameter | Type |
---|---|
src |
Path |
dst |
str |
with_uv_project()
def with_uv_project(
pyproject_file: Path,
index_url: Optional[str],
extra_index_urls: Union[str, List[str], Tuple[str, ...], None],
pre: bool,
extra_args: Optional[str],
) -> Image
Use this method to create a new image with the specified uv.lock file layered on top of the current image Must have a corresponding pyproject.toml file in the same directory Cannot be used in conjunction with conda In the Union builders, using this will change the virtual env to /root/.venv
Parameter | Type |
---|---|
pyproject_file |
Path |
index_url |
Optional[str] |
extra_index_urls |
Union[str, List[str], Tuple[str, ...], None] |
pre |
bool |
extra_args |
Optional[str] |
with_workdir()
def with_workdir(
workdir: str,
) -> Image
Use this method to create a new image with the specified working directory This will override any existing working directory
Parameter | Type |
---|---|
workdir |
str |
flyte.PodTemplate
Custom PodTemplate specification for a Task.
class PodTemplate(
pod_spec: typing.Optional[ForwardRef('V1PodSpec')],
primary_container_name: str,
labels: typing.Optional[typing.Dict[str, str]],
annotations: typing.Optional[typing.Dict[str, str]],
)
Parameter | Type |
---|---|
pod_spec |
typing.Optional[ForwardRef('V1PodSpec')] |
primary_container_name |
str |
labels |
typing.Optional[typing.Dict[str, str]] |
annotations |
typing.Optional[typing.Dict[str, str]] |
Methods
Method | Description |
---|---|
to_k8s_pod() |
to_k8s_pod()
def to_k8s_pod()
flyte.Resources
Resources such as CPU, Memory, and GPU that can be allocated to a task.
Example:
- Single CPU, 1GiB of memory, and 1 T4 GPU:
@task(resources=Resources(cpu=1, memory="1GiB", gpu="T4:1"))
def my_task() -> int:
return 42
- 1CPU with limit upto 2CPU, 2GiB of memory, and 8 A100 GPUs and 10GiB of disk:
@task(resources=Resources(cpu=(1, 2), memory="2GiB", gpu="A100:8", disk="10GiB"))
def my_task() -> int:
return 42
class Resources(
cpu: typing.Union[int, float, str, typing.Tuple[int | float | str, int | float | str], NoneType],
memory: typing.Union[str, typing.Tuple[str, str], NoneType],
gpu: typing.Union[typing.Literal['T4:1', 'T4:2', 'T4:3', 'T4:4', 'T4:5', 'T4:6', 'T4:7', 'T4:8', 'L4:1', 'L4:2', 'L4:3', 'L4:4', 'L4:5', 'L4:6', 'L4:7', 'L4:8', 'L40s:1', 'L40s:2', 'L40s:3', 'L40s:4', 'L40s:5', 'L40s:6', 'L40s:7', 'L40s:8', 'A100:1', 'A100:2', 'A100:3', 'A100:4', 'A100:5', 'A100:6', 'A100:7', 'A100:8', 'A100 80G:1', 'A100 80G:2', 'A100 80G:3', 'A100 80G:4', 'A100 80G:5', 'A100 80G:6', 'A100 80G:7', 'A100 80G:8', 'H100:1', 'H100:2', 'H100:3', 'H100:4', 'H100:5', 'H100:6', 'H100:7', 'H100:8'], int, flyte._resources.Device, NoneType],
disk: typing.Optional[str],
shm: typing.Union[str, typing.Literal['auto'], NoneType],
)
Parameter | Type |
---|---|
cpu |
typing.Union[int, float, str, typing.Tuple[int | float | str, int | float | str], NoneType] |
memory |
typing.Union[str, typing.Tuple[str, str], NoneType] |
gpu |
typing.Union[typing.Literal['T4:1', 'T4:2', 'T4:3', 'T4:4', 'T4:5', 'T4:6', 'T4:7', 'T4:8', 'L4:1', 'L4:2', 'L4:3', 'L4:4', 'L4:5', 'L4:6', 'L4:7', 'L4:8', 'L40s:1', 'L40s:2', 'L40s:3', 'L40s:4', 'L40s:5', 'L40s:6', 'L40s:7', 'L40s:8', 'A100:1', 'A100:2', 'A100:3', 'A100:4', 'A100:5', 'A100:6', 'A100:7', 'A100:8', 'A100 80G:1', 'A100 80G:2', 'A100 80G:3', 'A100 80G:4', 'A100 80G:5', 'A100 80G:6', 'A100 80G:7', 'A100 80G:8', 'H100:1', 'H100:2', 'H100:3', 'H100:4', 'H100:5', 'H100:6', 'H100:7', 'H100:8'], int, flyte._resources.Device, NoneType] |
disk |
typing.Optional[str] |
shm |
typing.Union[str, typing.Literal['auto'], NoneType] |
Methods
Method | Description |
---|---|
get_device() |
Get the accelerator string for the task. |
get_shared_memory() |
Get the shared memory string for the task. |
get_device()
def get_device()
Get the accelerator string for the task.
:return: If GPUs are requested, return a tuple of the device name, and potentially a partition string.
Default cloud provider labels typically use the following values: 1g.5gb
, 2g.10gb
, etc.
get_shared_memory()
def get_shared_memory()
Get the shared memory string for the task.
:return: The shared memory string.
flyte.RetryStrategy
Retry strategy for the task or task environment. Retry strategy is optional or can be a simple number of retries.
Example:
- This will retry the task 5 times.
@task(retries=5)
def my_task():
pass
- This will retry the task 5 times with a maximum backoff of 10 seconds and a backoff factor of 2.
@task(retries=RetryStrategy(count=5, max_backoff=10, backoff=2))
def my_task():
pass
class RetryStrategy(
count: int,
backoff: typing.Union[float, datetime.timedelta, NoneType],
backoff_factor: typing.Union[int, float, NoneType],
)
Parameter | Type |
---|---|
count |
int |
backoff |
typing.Union[float, datetime.timedelta, NoneType] |
backoff_factor |
typing.Union[int, float, NoneType] |
flyte.ReusePolicy
ReusePolicy can be used to configure a task to reuse the environment. This is useful when the environment creation is expensive and the runtime of the task is short. The environment will be reused for the next invocation of the task, even the python process maybe be reused by subsequent task invocations. A good mental model is to think of the environment as a container that is reused for multiple tasks, more like a long-running service.
Caution: It is important to note that the environment is shared, so managing memory and resources is important.
class ReusePolicy(
replicas: typing.Union[int, typing.Tuple[int, int]],
idle_ttl: typing.Union[int, datetime.timedelta, NoneType],
reuse_salt: str | None,
concurrency: int,
)
Parameter | Type |
---|---|
replicas |
typing.Union[int, typing.Tuple[int, int]] |
idle_ttl |
typing.Union[int, datetime.timedelta, NoneType] |
reuse_salt |
str | None |
concurrency |
int |
Properties
Property | Type | Description |
---|---|---|
max_replicas |
None |
Returns the maximum number of replicas. |
ttl |
None |
Returns the idle TTL as a timedelta. If idle_ttl is not set, returns the global default. |
flyte.Secret
Secrets are used to inject sensitive information into tasks. Secrets can be mounted as environment variables or files. The secret key is the name of the secret in the secret store. The group is optional and maybe used with some secret stores to organize secrets. The secret_mount is used to specify how the secret should be mounted. If the secret_mount is set to “env” the secret will be mounted as an environment variable. If the secret_mount is set to “file” the secret will be mounted as a file. The as_env_var is an optional parameter that can be used to specify the name of the environment variable that the secret should be mounted as.
Example:
@task(secrets="MY_SECRET")
async def my_task():
os.environ["MY_SECRET"] # This will be set to the value of the secret
@task(secrets=Secret("MY_SECRET", mount="/path/to/secret"))
async def my_task2():
async with open("/path/to/secret") as f:
secret_value = f.read()
TODO: Add support for secret versioning (some stores) and secret groups (some stores) and mounting as files.
class Secret(
key: str,
group: typing.Optional[str],
mount: pathlib._local.Path | None,
as_env_var: typing.Optional[str],
)
Parameter | Type |
---|---|
key |
str |
group |
typing.Optional[str] |
mount |
pathlib._local.Path | None |
as_env_var |
typing.Optional[str] |
Methods
Method | Description |
---|---|
stable_hash() |
Deterministic, process-independent hash (as hex string). |
stable_hash()
def stable_hash()
Deterministic, process-independent hash (as hex string).
flyte.TaskEnvironment
Environment class to define a new environment for a set of tasks.
Example usage:
env = flyte.TaskEnvironment(name="my_env", image="my_image", resources=Resources(cpu="1", memory="1Gi"))
@env.task
async def my_task():
pass
class TaskEnvironment(
name: str,
depends_on: List[Environment],
pod_template: Optional[Union[str, 'V1PodTemplate']],
description: Optional[str],
secrets: Optional[SecretRequest],
env: Optional[Dict[str, str]],
resources: Optional[Resources],
image: Union[str, Image, Literal['auto']],
cache: Union[CacheRequest],
reusable: ReusePolicy | None,
plugin_config: Optional[Any],
)
Parameter | Type |
---|---|
name |
str |
depends_on |
List[Environment] |
pod_template |
Optional[Union[str, 'V1PodTemplate']] |
description |
Optional[str] |
secrets |
Optional[SecretRequest] |
env |
Optional[Dict[str, str]] |
resources |
Optional[Resources] |
image |
Union[str, Image, Literal['auto']] |
cache |
Union[CacheRequest] |
reusable |
ReusePolicy | None |
plugin_config |
Optional[Any] |
Methods
Method | Description |
---|---|
add_dependency() |
Add a dependency to the environment. |
add_task() |
Add a task to the environment. |
clone_with() |
Clone the TaskEnvironment with new parameters. |
task() |
add_dependency()
def add_dependency(
env: Environment,
)
Add a dependency to the environment.
Parameter | Type |
---|---|
env |
Environment |
add_task()
def add_task(
task: TaskTemplate,
) -> TaskTemplate
Add a task to the environment.
Parameter | Type |
---|---|
task |
TaskTemplate |
clone_with()
def clone_with(
name: str,
image: Optional[Union[str, Image, Literal['auto']]],
resources: Optional[Resources],
env: Optional[Dict[str, str]],
secrets: Optional[SecretRequest],
depends_on: Optional[List[Environment]],
kwargs: **kwargs,
) -> TaskEnvironment
Clone the TaskEnvironment with new parameters.
besides the base environment parameters, you can override, kwargs like cache
, reusable
, etc.
Parameter | Type |
---|---|
name |
str |
image |
Optional[Union[str, Image, Literal['auto']]] |
resources |
Optional[Resources] |
env |
Optional[Dict[str, str]] |
secrets |
Optional[SecretRequest] |
depends_on |
Optional[List[Environment]] |
kwargs |
**kwargs |
task()
def task(
_func,
name: Optional[str],
cache: Union[CacheRequest] | None,
retries: Union[int, RetryStrategy],
timeout: Union[timedelta, int],
docs: Optional[Documentation],
secrets: Optional[SecretRequest],
pod_template: Optional[Union[str, 'V1PodTemplate']],
report: bool,
) -> Union[AsyncFunctionTaskTemplate, Callable[P, R]]
Parameter | Type |
---|---|
_func |
|
name |
Optional[str] |
cache |
Union[CacheRequest] | None |
retries |
Union[int, RetryStrategy] |
timeout |
Union[timedelta, int] |
docs |
Optional[Documentation] |
secrets |
Optional[SecretRequest] |
pod_template |
Optional[Union[str, 'V1PodTemplate']] |
report |
bool |
Properties
Property | Type | Description |
---|---|---|
tasks |
None |
Get all tasks defined in the environment. |
flyte.Timeout
Timeout class to define a timeout for a task. The task timeout can be set to a maximum runtime and a maximum queued time. Maximum runtime is the maximum time the task can run for (in one attempt). Maximum queued time is the maximum time the task can stay in the queue before it starts executing.
Example usage:
timeout = Timeout(max_runtime=timedelta(minutes=5), max_queued_time=timedelta(minutes=10))
@env.task(timeout=timeout)
async def my_task():
pass
class Timeout(
max_runtime: datetime.timedelta | int,
max_queued_time: datetime.timedelta | int | None,
)
Parameter | Type |
---|---|
max_runtime |
datetime.timedelta | int |
max_queued_time |
datetime.timedelta | int | None |