2.0.0b28

flyte

Flyte SDK for authoring compound AI applications, services and workflows.

Directory

Classes

Class Description
Cache Cache configuration for a task.
Cron This class defines a Cron automation that can be associated with a Trigger in Flyte.
Device Represents a device type, its quantity and partition if applicable.
Environment
FixedRate This class defines a FixedRate automation that can be associated with a Trigger in Flyte.
Image This is a representation of Container Images, which can be used to create layered images programmatically.
PodTemplate Custom PodTemplate specification for a Task.
Resources Resources such as CPU, Memory, and GPU that can be allocated to a task.
RetryStrategy Retry strategy for the task or task environment.
ReusePolicy ReusePolicy can be used to configure a task to reuse the environment.
Secret Secrets are used to inject sensitive information into tasks or image build context.
TaskEnvironment Environment class to define a new environment for a set of tasks.
Timeout Timeout class to define a timeout for a task.
Trigger This class defines specification of a Trigger, that can be associated with any Flyte V2 task.

Protocols

Protocol Description
CachePolicy Base class for protocol classes.

Methods

Method Description
AMD_GPU() Create an AMD GPU device instance.
GPU() Create a GPU device instance.
HABANA_GAUDI() Create a Habana Gaudi device instance.
Neuron() Create a Neuron device instance.
TPU() Create a TPU device instance.
build() Build an image.
build_images() Build the images for the given environments.
ctx() Returns flyte.
current_domain() Returns the current domain from Runtime environment (on the cluster) or from the initialized configuration.
custom_context() Synchronous context manager to set input context for tasks spawned within this block.
deploy() Deploy the given environment or list of environments.
get_custom_context() Get the current input context.
group() Create a new group with the given name.
init() Initialize the Flyte system with the given configuration.
init_from_config() Initialize the Flyte system using a configuration file or Config object.
map() Map a function over the provided arguments with concurrent execution.
run() Run a task with the given parameters.
trace() A decorator that traces function execution with timing information.
version() Returns the version of the Flyte SDK.
with_runcontext() Launch a new run with the given parameters as the context.

Variables

Property Type Description
TimeoutType UnionType
TriggerTime _trigger_time
__version__ str
logger Logger

Methods

AMD_GPU()

def AMD_GPU(
    device: typing.Literal['MI100', 'MI210', 'MI250', 'MI250X', 'MI300A', 'MI300X', 'MI325X', 'MI350X', 'MI355X'],
) -> flyte._resources.Device

Create an AMD GPU device instance.

Parameter Type
device typing.Literal['MI100', 'MI210', 'MI250', 'MI250X', 'MI300A', 'MI300X', 'MI325X', 'MI350X', 'MI355X']

GPU()

def GPU(
    device: typing.Literal['A10', 'A10G', 'A100', 'A100 80G', 'B200', 'H100', 'L4', 'L40s', 'T4', 'V100', 'RTX PRO 6000'],
    quantity: typing.Literal[1, 2, 3, 4, 5, 6, 7, 8],
    partition: typing.Union[typing.Literal['1g.5gb', '2g.10gb', '3g.20gb', '4g.20gb', '7g.40gb'], typing.Literal['1g.10gb', '2g.20gb', '3g.40gb', '4g.40gb', '7g.80gb'], NoneType],
) -> flyte._resources.Device

Create a GPU device instance.

Parameter Type
device typing.Literal['A10', 'A10G', 'A100', 'A100 80G', 'B200', 'H100', 'L4', 'L40s', 'T4', 'V100', 'RTX PRO 6000']
quantity typing.Literal[1, 2, 3, 4, 5, 6, 7, 8]
partition typing.Union[typing.Literal['1g.5gb', '2g.10gb', '3g.20gb', '4g.20gb', '7g.40gb'], typing.Literal['1g.10gb', '2g.20gb', '3g.40gb', '4g.40gb', '7g.80gb'], NoneType]

HABANA_GAUDI()

def HABANA_GAUDI(
    device: typing.Literal['Gaudi1'],
) -> flyte._resources.Device

Create a Habana Gaudi device instance.

Parameter Type
device typing.Literal['Gaudi1']

Neuron()

def Neuron(
    device: typing.Literal['Inf1', 'Inf2', 'Trn1', 'Trn1n', 'Trn2', 'Trn2u'],
) -> flyte._resources.Device

Create a Neuron device instance.

Parameter Type
device typing.Literal['Inf1', 'Inf2', 'Trn1', 'Trn1n', 'Trn2', 'Trn2u']

TPU()

def TPU(
    device: typing.Literal['V5P', 'V6E'],
    partition: typing.Union[typing.Literal['2x2x1', '2x2x2', '2x4x4', '4x4x4', '4x4x8', '4x8x8', '8x8x8', '8x8x16', '8x16x16', '16x16x16', '16x16x24'], typing.Literal['1x1', '2x2', '2x4', '4x4', '4x8', '8x8', '8x16', '16x16'], NoneType],
)

Create a TPU device instance.

Parameter Type
device typing.Literal['V5P', 'V6E']
partition typing.Union[typing.Literal['2x2x1', '2x2x2', '2x4x4', '4x4x4', '4x4x8', '4x8x8', '8x8x8', '8x8x16', '8x16x16', '16x16x16', '16x16x24'], typing.Literal['1x1', '2x2', '2x4', '4x4', '4x8', '8x8', '8x16', '16x16'], NoneType]

build()

This method can be called both synchronously or asynchronously.

Default invocation is sync and will block. To call it asynchronously, use the function .aio() on the method name itself, e.g.,: result = await build.aio().

def build(
    image: Image,
) -> str

Build an image. The existing async context will be used.

Example:

import flyte
image = flyte.Image("example_image")
if __name__ == "__main__":
    asyncio.run(flyte.build.aio(image))
Parameter Type
image Image

build_images()

This method can be called both synchronously or asynchronously.

Default invocation is sync and will block. To call it asynchronously, use the function .aio() on the method name itself, e.g.,: result = await build_images.aio().

def build_images(
    envs: Environment,
) -> ImageCache

Build the images for the given environments.

Parameter Type
envs Environment

ctx()

def ctx()

Returns flyte.models.TaskContext if within a task context, else None Note: Only use this in task code and not module level.

current_domain()

def current_domain()

Returns the current domain from Runtime environment (on the cluster) or from the initialized configuration. This is safe to be used during deploy, run and within task code.

NOTE: This will not work if you deploy a task to a domain and then run it in another domain.

Raises InitializationError if the configuration is not initialized or domain is not set. :return: The current domain

custom_context()

def custom_context(
    context: str,
)

Synchronous context manager to set input context for tasks spawned within this block.

Example:

import flyte

env = flyte.TaskEnvironment(name="...")

@env.task
def t1():
    ctx = flyte.get_custom_context()
    print(ctx)

@env.task
def main():
    # context can be passed via a context manager
    with flyte.custom_context(project="my-project"):
        t1()  # will have {'project': 'my-project'} as context
Parameter Type
context str

deploy()

This method can be called both synchronously or asynchronously.

Default invocation is sync and will block. To call it asynchronously, use the function .aio() on the method name itself, e.g.,: result = await deploy.aio().

def deploy(
    envs: Environment,
    dryrun: bool,
    version: str | None,
    interactive_mode: bool | None,
    copy_style: CopyFiles,
) -> List[Deployment]

Deploy the given environment or list of environments.

Parameter Type
envs Environment
dryrun bool
version str | None
interactive_mode bool | None
copy_style CopyFiles

get_custom_context()

def get_custom_context()

Get the current input context. This can be used within a task to retrieve context metadata that was passed to the action.

Context will automatically propagate to sub-actions.

Example:

import flyte

env = flyte.TaskEnvironment(name="...")

@env.task
def t1():
    # context can be retrieved with `get_custom_context`
    ctx = flyte.get_custom_context()
    print(ctx)  # {'project': '...', 'entity': '...'}

:return: Dictionary of context key-value pairs

group()

def group(
    name: str,
)

Create a new group with the given name. The method is intended to be used as a context manager.

Example:

@task
async def my_task():
    ...
    with group("my_group"):
        t1(x,y)  # tasks in this block will be grouped under "my_group"
    ...
Parameter Type
name str

init()

This method can be called both synchronously or asynchronously.

Default invocation is sync and will block. To call it asynchronously, use the function .aio() on the method name itself, e.g.,: result = await init.aio().

def init(
    org: str | None,
    project: str | None,
    domain: str | None,
    root_dir: Path | None,
    log_level: int | None,
    endpoint: str | None,
    headless: bool,
    insecure: bool,
    insecure_skip_verify: bool,
    ca_cert_file_path: str | None,
    auth_type: AuthType,
    command: List[str] | None,
    proxy_command: List[str] | None,
    api_key: str | None,
    client_id: str | None,
    client_credentials_secret: str | None,
    auth_client_config: ClientConfig | None,
    rpc_retries: int,
    http_proxy_url: str | None,
    storage: Storage | None,
    batch_size: int,
    image_builder: ImageBuildEngine.ImageBuilderType,
    images: typing.Dict[str, str] | None,
    source_config_path: Optional[Path],
    sync_local_sys_paths: bool,
)

Initialize the Flyte system with the given configuration. This method should be called before any other Flyte remote API methods are called. Thread-safe implementation.

Parameter Type
org str | None
project str | None
domain str | None
root_dir Path | None
log_level int | None
endpoint str | None
headless bool
insecure bool
insecure_skip_verify bool
ca_cert_file_path str | None
auth_type AuthType
command List[str] | None
proxy_command List[str] | None
api_key str | None
client_id str | None
client_credentials_secret str | None
auth_client_config ClientConfig | None
rpc_retries int
http_proxy_url str | None
storage Storage | None
batch_size int
image_builder ImageBuildEngine.ImageBuilderType
images typing.Dict[str, str] | None
source_config_path Optional[Path]
sync_local_sys_paths bool

init_from_config()

This method can be called both synchronously or asynchronously.

Default invocation is sync and will block. To call it asynchronously, use the function .aio() on the method name itself, e.g.,: result = await init_from_config.aio().

def init_from_config(
    path_or_config: str | Path | Config | None,
    root_dir: Path | None,
    log_level: int | None,
    storage: Storage | None,
    images: tuple[str, ...] | None,
    sync_local_sys_paths: bool,
)

Initialize the Flyte system using a configuration file or Config object. This method should be called before any other Flyte remote API methods are called. Thread-safe implementation.

Parameter Type
path_or_config str | Path | Config | None
root_dir Path | None
log_level int | None
storage Storage | None
images tuple[str, ...] | None
sync_local_sys_paths bool

map()

This method can be called both synchronously or asynchronously.

Default invocation is sync and will block. To call it asynchronously, use the function .aio() on the method name itself, e.g.,: result = await flyte.map.aio().

def map(
    func: typing.Union[flyte._task.AsyncFunctionTaskTemplate[~P, ~R, ~F], functools.partial[~R]],
    args: *args,
    group_name: str | None,
    concurrency: int,
    return_exceptions: bool,
) -> typing.Iterator[typing.Union[~R, Exception]]

Map a function over the provided arguments with concurrent execution.

Parameter Type
func typing.Union[flyte._task.AsyncFunctionTaskTemplate[~P, ~R, ~F], functools.partial[~R]]
args *args
group_name str | None
concurrency int
return_exceptions bool

run()

This method can be called both synchronously or asynchronously.

Default invocation is sync and will block. To call it asynchronously, use the function .aio() on the method name itself, e.g.,: result = await run.aio().

def run(
    task: TaskTemplate[P, R, F],
    args: *args,
    kwargs: **kwargs,
) -> Union[R, Run]

Run a task with the given parameters

Parameter Type
task TaskTemplate[P, R, F]
args *args
kwargs **kwargs

trace()

def trace(
    func: typing.Callable[..., ~T],
) -> typing.Callable[..., ~T]

A decorator that traces function execution with timing information. Works with regular functions, async functions, and async generators/iterators.

Parameter Type
func typing.Callable[..., ~T]

version()

def version()

Returns the version of the Flyte SDK.

with_runcontext()

def with_runcontext(
    mode: Mode | None,
    name: Optional[str],
    service_account: Optional[str],
    version: Optional[str],
    copy_style: CopyFiles,
    dry_run: bool,
    copy_bundle_to: pathlib.Path | None,
    interactive_mode: bool | None,
    raw_data_path: str | None,
    run_base_dir: str | None,
    overwrite_cache: bool,
    project: str | None,
    domain: str | None,
    env_vars: Dict[str, str] | None,
    labels: Dict[str, str] | None,
    annotations: Dict[str, str] | None,
    interruptible: bool | None,
    log_level: int | None,
    disable_run_cache: bool,
    queue: Optional[str],
    custom_context: Dict[str, str] | None,
) -> _Runner

Launch a new run with the given parameters as the context.

Example:

import flyte
env = flyte.TaskEnvironment("example")

@env.task
async def example_task(x: int, y: str) -> str:
    return f"{x} {y}"

if __name__ == "__main__":
    flyte.with_runcontext(name="example_run_id").run(example_task, 1, y="hello")
Parameter Type
mode Mode | None
name Optional[str]
service_account Optional[str]
version Optional[str]
copy_style CopyFiles
dry_run bool
copy_bundle_to pathlib.Path | None
interactive_mode bool | None
raw_data_path str | None
run_base_dir str | None
overwrite_cache bool
project str | None
domain str | None
env_vars Dict[str, str] | None
labels Dict[str, str] | None
annotations Dict[str, str] | None
interruptible bool | None
log_level int | None
disable_run_cache bool
queue Optional[str]
custom_context Dict[str, str] | None

flyte.Cache

Cache configuration for a task.

class Cache(
    behavior: typing.Literal['auto', 'override', 'disable'],
    version_override: typing.Optional[str],
    serialize: bool,
    ignored_inputs: typing.Union[typing.Tuple[str, ...], str],
    salt: str,
    policies: typing.Union[typing.List[flyte._cache.cache.CachePolicy], flyte._cache.cache.CachePolicy, NoneType],
)
Parameter Type
behavior typing.Literal['auto', 'override', 'disable']
version_override typing.Optional[str]
serialize bool
ignored_inputs typing.Union[typing.Tuple[str, ...], str]
salt str
policies typing.Union[typing.List[flyte._cache.cache.CachePolicy], flyte._cache.cache.CachePolicy, NoneType]

Methods

Method Description
get_ignored_inputs()
get_version()
is_enabled() Check if the cache policy is enabled.

get_ignored_inputs()

def get_ignored_inputs()

get_version()

def get_version(
    params: typing.Optional[flyte._cache.cache.VersionParameters],
) -> str
Parameter Type
params typing.Optional[flyte._cache.cache.VersionParameters]

is_enabled()

def is_enabled()

Check if the cache policy is enabled.

flyte.CachePolicy

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...
protocol CachePolicy()

Methods

Method Description
get_version()

get_version()

def get_version(
    salt: str,
    params: flyte._cache.cache.VersionParameters,
) -> str
Parameter Type
salt str
params flyte._cache.cache.VersionParameters

flyte.Cron

This class defines a Cron automation that can be associated with a Trigger in Flyte. Example usage:

from flyte.trigger import Trigger, Cron
my_trigger = Trigger(
    name="my_cron_trigger",
    automation=Cron("0 * * * *"),  # Runs every hour
    description="A trigger that runs every hour",
)
class Cron(
    expression: str,
    timezone: Timezone,
)
Parameter Type
expression str
timezone Timezone

Properties

Property Type Description
timezone_expression None

flyte.Device

Represents a device type, its quantity and partition if applicable. param device: The type of device (e.g., “T4”, “A100”). param quantity: The number of devices of this type. param partition: The partition of the device (e.g., “1g.5gb”, “2g.10gb” for gpus) or (“1x1”, … for tpus).

class Device(
    quantity: int,
    device_class: typing.Literal['GPU', 'TPU', 'NEURON', 'AMD_GPU', 'HABANA_GAUDI'],
    device: str | None,
    partition: str | None,
)
Parameter Type
quantity int
device_class typing.Literal['GPU', 'TPU', 'NEURON', 'AMD_GPU', 'HABANA_GAUDI']
device str | None
partition str | None

flyte.Environment

class Environment(
    name: str,
    depends_on: List[Environment],
    pod_template: Optional[Union[str, PodTemplate]],
    description: Optional[str],
    secrets: Optional[SecretRequest],
    env_vars: Optional[Dict[str, str]],
    resources: Optional[Resources],
    interruptible: bool,
    image: Union[str, Image, Literal['auto']],
)
Parameter Type
name str
depends_on List[Environment]
pod_template Optional[Union[str, PodTemplate]]
description Optional[str]
secrets Optional[SecretRequest]
env_vars Optional[Dict[str, str]]
resources Optional[Resources]
interruptible bool
image Union[str, Image, Literal['auto']]

Methods

Method Description
add_dependency() Add a dependency to the environment.
clone_with()

add_dependency()

def add_dependency(
    env: Environment,
)

Add a dependency to the environment.

Parameter Type
env Environment

clone_with()

def clone_with(
    name: str,
    image: Optional[Union[str, Image, Literal['auto']]],
    resources: Optional[Resources],
    env_vars: Optional[Dict[str, str]],
    secrets: Optional[SecretRequest],
    depends_on: Optional[List[Environment]],
    description: Optional[str],
    kwargs: **kwargs,
) -> Environment
Parameter Type
name str
image Optional[Union[str, Image, Literal['auto']]]
resources Optional[Resources]
env_vars Optional[Dict[str, str]]
secrets Optional[SecretRequest]
depends_on Optional[List[Environment]]
description Optional[str]
kwargs **kwargs

flyte.FixedRate

This class defines a FixedRate automation that can be associated with a Trigger in Flyte. Example usage:

from flyte.trigger import Trigger, FixedRate
my_trigger = Trigger(
    name="my_fixed_rate_trigger",
    automation=FixedRate(60),  # Runs every hour
    description="A trigger that runs every hour",
)
class FixedRate(
    interval_minutes: int,
    start_time: datetime | None,
)
Parameter Type
interval_minutes int
start_time datetime | None

flyte.Image

This is a representation of Container Images, which can be used to create layered images programmatically.

Use by first calling one of the base constructor methods. These all begin with from or default_ The image can then be amended with additional layers using the various with_* methods.

Invariant for this class: The construction of Image objects must be doable everywhere. That is, if a user has a custom image that is not accessible, calling .with_source_file on a file that doesn’t exist, the instantiation of the object itself must still go through. Further, the .identifier property of the image must also still go through. This is because it may have been already built somewhere else. Use validate() functions to check each layer for actual errors. These are invoked at actual build time. See self.id for more information

class Image(
    base_image: Optional[str],
    dockerfile: Optional[Path],
    registry: Optional[str],
    name: Optional[str],
    platform: Tuple[Architecture, ...],
    python_version: Tuple[int, int],
    _ref_name: Optional[str],
    _layers: Tuple[Layer, ...],
    _image_registry_secret: Optional[Secret],
)
Parameter Type
base_image Optional[str]
dockerfile Optional[Path]
registry Optional[str]
name Optional[str]
platform Tuple[Architecture, ...]
python_version Tuple[int, int]
_ref_name Optional[str]
_layers Tuple[Layer, ...]
_image_registry_secret Optional[Secret]

Methods

Method Description
clone() Use this method to clone the current image and change the registry and name.
from_base() Use this method to start with a pre-built base image.
from_debian_base() Use this method to start using the default base image, built from this library’s base Dockerfile.
from_dockerfile() Use this method to create a new image with the specified dockerfile.
from_ref_name()
from_uv_script() Use this method to create a new image with the specified uv script.
validate()
with_apt_packages() Use this method to create a new image with the specified apt packages layered on top of the current image.
with_commands() Use this method to create a new image with the specified commands layered on top of the current image.
with_dockerignore()
with_env_vars() Use this method to create a new image with the specified environment variables layered on top of.
with_local_v2() Use this method to create a new image with the local v2 builder.
with_pip_packages() Use this method to create a new image with the specified pip packages layered on top of the current image.
with_poetry_project() Use this method to create a new image with the specified pyproject.
with_requirements() Use this method to create a new image with the specified requirements file layered on top of the current image.
with_source_file() Use this method to create a new image with the specified local file layered on top of the current image.
with_source_folder() Use this method to create a new image with the specified local directory layered on top of the current image.
with_uv_project() Use this method to create a new image with the specified uv.
with_workdir() Use this method to create a new image with the specified working directory.

clone()

def clone(
    registry: Optional[str],
    registry_secret: Optional[str | Secret],
    name: Optional[str],
    base_image: Optional[str],
    python_version: Optional[Tuple[int, int]],
    addl_layer: Optional[Layer],
) -> Image

Use this method to clone the current image and change the registry and name

Parameter Type
registry Optional[str]
registry_secret Optional[str | Secret]
name Optional[str]
base_image Optional[str]
python_version Optional[Tuple[int, int]]
addl_layer Optional[Layer]

from_base()

def from_base(
    image_uri: str,
) -> Image

Use this method to start with a pre-built base image. This image must already exist in the registry of course.

Parameter Type
image_uri str

from_debian_base()

def from_debian_base(
    python_version: Optional[Tuple[int, int]],
    flyte_version: Optional[str],
    install_flyte: bool,
    registry: Optional[str],
    registry_secret: Optional[str | Secret],
    name: Optional[str],
    platform: Optional[Tuple[Architecture, ...]],
) -> Image

Use this method to start using the default base image, built from this library’s base Dockerfile Default images are multi-arch amd/arm64

Parameter Type
python_version Optional[Tuple[int, int]]
flyte_version Optional[str]
install_flyte bool
registry Optional[str]
registry_secret Optional[str | Secret]
name Optional[str]
platform Optional[Tuple[Architecture, ...]]

from_dockerfile()

def from_dockerfile(
    file: Path,
    registry: str,
    name: str,
    platform: Union[Architecture, Tuple[Architecture, ...], None],
) -> Image

Use this method to create a new image with the specified dockerfile. Note you cannot use additional layers after this, as the system doesn’t attempt to parse/understand the Dockerfile, and what kind of setup it has (python version, uv vs poetry, etc), so please put all logic into the dockerfile itself.

Also since Python sees paths as from the calling directory, please use Path objects with absolute paths. The context for the builder will be the directory where the dockerfile is located.

Parameter Type
file Path
registry str
name str
platform Union[Architecture, Tuple[Architecture, ...], None]

from_ref_name()

def from_ref_name(
    name: str,
) -> Image
Parameter Type
name str

from_uv_script()

def from_uv_script(
    script: Path | str,
    name: str,
    registry: str | None,
    registry_secret: Optional[str | Secret],
    python_version: Optional[Tuple[int, int]],
    index_url: Optional[str],
    extra_index_urls: Union[str, List[str], Tuple[str, ...], None],
    pre: bool,
    extra_args: Optional[str],
    platform: Optional[Tuple[Architecture, ...]],
    secret_mounts: Optional[SecretRequest],
) -> Image

Use this method to create a new image with the specified uv script. It uses the header of the script to determine the python version, dependencies to install. The script must be a valid uv script, otherwise an error will be raised.

Usually the header of the script will look like this: Example:

#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx"]
# ///

For more information on the uv script format, see the documentation: UV: Declaring script dependencies

Parameter Type
script Path | str
name str
registry str | None
registry_secret Optional[str | Secret]
python_version Optional[Tuple[int, int]]
index_url Optional[str]
extra_index_urls Union[str, List[str], Tuple[str, ...], None]
pre bool
extra_args Optional[str]
platform Optional[Tuple[Architecture, ...]]
secret_mounts Optional[SecretRequest]

validate()

def validate()

with_apt_packages()

def with_apt_packages(
    packages: str,
    secret_mounts: Optional[SecretRequest],
) -> Image

Use this method to create a new image with the specified apt packages layered on top of the current image

Parameter Type
packages str
secret_mounts Optional[SecretRequest]

with_commands()

def with_commands(
    commands: List[str],
    secret_mounts: Optional[SecretRequest],
) -> Image

Use this method to create a new image with the specified commands layered on top of the current image Be sure not to use RUN in your command.

Parameter Type
commands List[str]
secret_mounts Optional[SecretRequest]

with_dockerignore()

def with_dockerignore(
    path: Path,
) -> Image
Parameter Type
path Path

with_env_vars()

def with_env_vars(
    env_vars: Dict[str, str],
) -> Image

Use this method to create a new image with the specified environment variables layered on top of the current image. Cannot be used in conjunction with conda

Parameter Type
env_vars Dict[str, str]

with_local_v2()

def with_local_v2()

Use this method to create a new image with the local v2 builder This will override any existing builder

:return: Image

with_pip_packages()

def with_pip_packages(
    packages: str,
    index_url: Optional[str],
    extra_index_urls: Union[str, List[str], Tuple[str, ...], None],
    pre: bool,
    extra_args: Optional[str],
    secret_mounts: Optional[SecretRequest],
) -> Image

Use this method to create a new image with the specified pip packages layered on top of the current image Cannot be used in conjunction with conda

Example:

@flyte.task(image=(flyte.Image.from_debian_base().with_pip_packages("requests", "numpy")))
def my_task(x: int) -> int:
    import numpy as np
    return np.sum([x, 1])

To mount secrets during the build process to download private packages, you can use the secret_mounts. In the below example, “GITHUB_PAT” will be mounted as env var “GITHUB_PAT”, and “apt-secret” will be mounted at /etc/apt/apt-secret. Example:

private_package = "git+https://[email protected]/flyteorg/flytex.git@2e20a2acebfc3877d84af643fdd768edea41d533"
@flyte.task(
    image=(
        flyte.Image.from_debian_base()
        .with_pip_packages("private_package", secret_mounts=[Secret(key="GITHUB_PAT")])
        .with_apt_packages("git", secret_mounts=[Secret(key="apt-secret", mount="/etc/apt/apt-secret")])
)
def my_task(x: int) -> int:
    import numpy as np
    return np.sum([x, 1])
Parameter Type
packages str
index_url Optional[str]
extra_index_urls Union[str, List[str], Tuple[str, ...], None]
pre bool
extra_args Optional[str]
secret_mounts Optional[SecretRequest]

with_poetry_project()

def with_poetry_project(
    pyproject_file: str | Path,
    poetry_lock: Path | None,
    extra_args: Optional[str],
    secret_mounts: Optional[SecretRequest],
)

Use this method to create a new image with the specified pyproject.toml layered on top of the current image. Must have a corresponding pyproject.toml file in the same directory. Cannot be used in conjunction with conda.

By default, this method copies the entire project into the image, including files such as pyproject.toml, poetry.lock, and the src/ directory.

If you prefer not to install the current project, you can pass through extra_args --no-root. In this case, the image builder will only copy pyproject.toml and poetry.lock into the image.

Parameter Type
pyproject_file str | Path
poetry_lock Path | None
extra_args Optional[str]
secret_mounts Optional[SecretRequest]

with_requirements()

def with_requirements(
    file: str | Path,
    secret_mounts: Optional[SecretRequest],
) -> Image

Use this method to create a new image with the specified requirements file layered on top of the current image Cannot be used in conjunction with conda

Parameter Type
file str | Path
secret_mounts Optional[SecretRequest]

with_source_file()

def with_source_file(
    src: Path,
    dst: str,
) -> Image

Use this method to create a new image with the specified local file layered on top of the current image. If dest is not specified, it will be copied to the working directory of the image

Parameter Type
src Path
dst str

with_source_folder()

def with_source_folder(
    src: Path,
    dst: str,
    copy_contents_only: bool,
) -> Image

Use this method to create a new image with the specified local directory layered on top of the current image. If dest is not specified, it will be copied to the working directory of the image

Parameter Type
src Path
dst str
copy_contents_only bool

with_uv_project()

def with_uv_project(
    pyproject_file: str | Path,
    uvlock: Path | None,
    index_url: Optional[str],
    extra_index_urls: Union[List[str], Tuple[str, ...], None],
    pre: bool,
    extra_args: Optional[str],
    secret_mounts: Optional[SecretRequest],
    project_install_mode: typing.Literal['dependencies_only', 'install_project'],
) -> Image

Use this method to create a new image with the specified uv.lock file layered on top of the current image Must have a corresponding pyproject.toml file in the same directory Cannot be used in conjunction with conda

By default, this method copies the pyproject.toml and uv.lock files into the image.

If project_install_mode is “install_project”, it will also copy directory where the pyproject.toml file is located into the image.

Parameter Type
pyproject_file str | Path
uvlock Path | None
index_url Optional[str]
extra_index_urls Union[List[str], Tuple[str, ...], None]
pre bool
extra_args Optional[str]
secret_mounts Optional[SecretRequest]
project_install_mode typing.Literal['dependencies_only', 'install_project']

with_workdir()

def with_workdir(
    workdir: str,
) -> Image

Use this method to create a new image with the specified working directory This will override any existing working directory

Parameter Type
workdir str

flyte.PodTemplate

Custom PodTemplate specification for a Task.

class PodTemplate(
    pod_spec: typing.Optional[ForwardRef('V1PodSpec')],
    primary_container_name: str,
    labels: typing.Optional[typing.Dict[str, str]],
    annotations: typing.Optional[typing.Dict[str, str]],
)
Parameter Type
pod_spec typing.Optional[ForwardRef('V1PodSpec')]
primary_container_name str
labels typing.Optional[typing.Dict[str, str]]
annotations typing.Optional[typing.Dict[str, str]]

Methods

Method Description
to_k8s_pod()

to_k8s_pod()

def to_k8s_pod()

flyte.Resources

Resources such as CPU, Memory, and GPU that can be allocated to a task.

Example:

  • Single CPU, 1GiB of memory, and 1 T4 GPU:
@task(resources=Resources(cpu=1, memory="1GiB", gpu="T4:1"))
def my_task() -> int:
    return 42
  • 1CPU with limit upto 2CPU, 2GiB of memory, and 8 A100 GPUs and 10GiB of disk:
@task(resources=Resources(cpu=(1, 2), memory="2GiB", gpu="A100:8", disk="10GiB"))
def my_task() -> int:
    return 42
class Resources(
    cpu: typing.Union[int, float, str, typing.Tuple[int | float | str, int | float | str], NoneType],
    memory: typing.Union[str, typing.Tuple[str, str], NoneType],
    gpu: typing.Union[typing.Literal['A10:1', 'A10:2', 'A10:3', 'A10:4', 'A10:5', 'A10:6', 'A10:7', 'A10:8', 'A10G:1', 'A10G:2', 'A10G:3', 'A10G:4', 'A10G:5', 'A10G:6', 'A10G:7', 'A10G:8', 'A100:1', 'A100:2', 'A100:3', 'A100:4', 'A100:5', 'A100:6', 'A100:7', 'A100:8', 'A100 80G:1', 'A100 80G:2', 'A100 80G:3', 'A100 80G:4', 'A100 80G:5', 'A100 80G:6', 'A100 80G:7', 'A100 80G:8', 'B200:1', 'B200:2', 'B200:3', 'B200:4', 'B200:5', 'B200:6', 'B200:7', 'B200:8', 'H100:1', 'H100:2', 'H100:3', 'H100:4', 'H100:5', 'H100:6', 'H100:7', 'H100:8', 'H200:1', 'H200:2', 'H200:3', 'H200:4', 'H200:5', 'H200:6', 'H200:7', 'H200:8', 'L4:1', 'L4:2', 'L4:3', 'L4:4', 'L4:5', 'L4:6', 'L4:7', 'L4:8', 'L40s:1', 'L40s:2', 'L40s:3', 'L40s:4', 'L40s:5', 'L40s:6', 'L40s:7', 'L40s:8', 'V100:1', 'V100:2', 'V100:3', 'V100:4', 'V100:5', 'V100:6', 'V100:7', 'V100:8', 'RTX PRO 6000:1', 'T4:1', 'T4:2', 'T4:3', 'T4:4', 'T4:5', 'T4:6', 'T4:7', 'T4:8', 'Trn1:1', 'Trn1:4', 'Trn1:8', 'Trn1:16', 'Trn1n:1', 'Trn1n:4', 'Trn1n:8', 'Trn1n:16', 'Trn2:1', 'Trn2:4', 'Trn2:8', 'Trn2:16', 'Trn2u:1', 'Trn2u:4', 'Trn2u:8', 'Trn2u:16', 'Inf1:1', 'Inf1:2', 'Inf1:3', 'Inf1:4', 'Inf1:5', 'Inf1:6', 'Inf1:7', 'Inf1:8', 'Inf1:9', 'Inf1:10', 'Inf1:11', 'Inf1:12', 'Inf1:13', 'Inf1:14', 'Inf1:15', 'Inf1:16', 'Inf2:1', 'Inf2:2', 'Inf2:3', 'Inf2:4', 'Inf2:5', 'Inf2:6', 'Inf2:7', 'Inf2:8', 'Inf2:9', 'Inf2:10', 'Inf2:11', 'Inf2:12', 'MI100:1', 'MI210:1', 'MI250:1', 'MI250X:1', 'MI300A:1', 'MI300X:1', 'MI325X:1', 'MI350X:1', 'MI355X:1', 'Gaudi1:1'], int, flyte._resources.Device, NoneType],
    disk: typing.Optional[str],
    shm: typing.Union[str, typing.Literal['auto'], NoneType],
)
Parameter Type
cpu typing.Union[int, float, str, typing.Tuple[int | float | str, int | float | str], NoneType]
memory typing.Union[str, typing.Tuple[str, str], NoneType]
gpu typing.Union[typing.Literal['A10:1', 'A10:2', 'A10:3', 'A10:4', 'A10:5', 'A10:6', 'A10:7', 'A10:8', 'A10G:1', 'A10G:2', 'A10G:3', 'A10G:4', 'A10G:5', 'A10G:6', 'A10G:7', 'A10G:8', 'A100:1', 'A100:2', 'A100:3', 'A100:4', 'A100:5', 'A100:6', 'A100:7', 'A100:8', 'A100 80G:1', 'A100 80G:2', 'A100 80G:3', 'A100 80G:4', 'A100 80G:5', 'A100 80G:6', 'A100 80G:7', 'A100 80G:8', 'B200:1', 'B200:2', 'B200:3', 'B200:4', 'B200:5', 'B200:6', 'B200:7', 'B200:8', 'H100:1', 'H100:2', 'H100:3', 'H100:4', 'H100:5', 'H100:6', 'H100:7', 'H100:8', 'H200:1', 'H200:2', 'H200:3', 'H200:4', 'H200:5', 'H200:6', 'H200:7', 'H200:8', 'L4:1', 'L4:2', 'L4:3', 'L4:4', 'L4:5', 'L4:6', 'L4:7', 'L4:8', 'L40s:1', 'L40s:2', 'L40s:3', 'L40s:4', 'L40s:5', 'L40s:6', 'L40s:7', 'L40s:8', 'V100:1', 'V100:2', 'V100:3', 'V100:4', 'V100:5', 'V100:6', 'V100:7', 'V100:8', 'RTX PRO 6000:1', 'T4:1', 'T4:2', 'T4:3', 'T4:4', 'T4:5', 'T4:6', 'T4:7', 'T4:8', 'Trn1:1', 'Trn1:4', 'Trn1:8', 'Trn1:16', 'Trn1n:1', 'Trn1n:4', 'Trn1n:8', 'Trn1n:16', 'Trn2:1', 'Trn2:4', 'Trn2:8', 'Trn2:16', 'Trn2u:1', 'Trn2u:4', 'Trn2u:8', 'Trn2u:16', 'Inf1:1', 'Inf1:2', 'Inf1:3', 'Inf1:4', 'Inf1:5', 'Inf1:6', 'Inf1:7', 'Inf1:8', 'Inf1:9', 'Inf1:10', 'Inf1:11', 'Inf1:12', 'Inf1:13', 'Inf1:14', 'Inf1:15', 'Inf1:16', 'Inf2:1', 'Inf2:2', 'Inf2:3', 'Inf2:4', 'Inf2:5', 'Inf2:6', 'Inf2:7', 'Inf2:8', 'Inf2:9', 'Inf2:10', 'Inf2:11', 'Inf2:12', 'MI100:1', 'MI210:1', 'MI250:1', 'MI250X:1', 'MI300A:1', 'MI300X:1', 'MI325X:1', 'MI350X:1', 'MI355X:1', 'Gaudi1:1'], int, flyte._resources.Device, NoneType]
disk typing.Optional[str]
shm typing.Union[str, typing.Literal['auto'], NoneType]

Methods

Method Description
get_device() Get the accelerator string for the task.
get_shared_memory() Get the shared memory string for the task.

get_device()

def get_device()

Get the accelerator string for the task.

:return: If GPUs are requested, return a tuple of the device name, and potentially a partition string. Default cloud provider labels typically use the following values: 1g.5gb, 2g.10gb, etc.

get_shared_memory()

def get_shared_memory()

Get the shared memory string for the task.

:return: The shared memory string.

flyte.RetryStrategy

Retry strategy for the task or task environment. Retry strategy is optional or can be a simple number of retries.

Example:

  • This will retry the task 5 times.
@task(retries=5)
def my_task():
    pass
  • This will retry the task 5 times with a maximum backoff of 10 seconds and a backoff factor of 2.
@task(retries=RetryStrategy(count=5, max_backoff=10, backoff=2))
def my_task():
    pass
class RetryStrategy(
    count: int,
    backoff: typing.Union[float, datetime.timedelta, NoneType],
    backoff_factor: typing.Union[int, float, NoneType],
)
Parameter Type
count int
backoff typing.Union[float, datetime.timedelta, NoneType]
backoff_factor typing.Union[int, float, NoneType]

flyte.ReusePolicy

ReusePolicy can be used to configure a task to reuse the environment. This is useful when the environment creation is expensive and the runtime of the task is short. The environment will be reused for the next invocation of the task, even the python process maybe be reused by subsequent task invocations. A good mental model is to think of the environment as a container that is reused for multiple tasks, more like a long-running service.

Caution: It is important to note that the environment is shared, so managing memory and resources is important.

class ReusePolicy(
    replicas: typing.Union[int, typing.Tuple[int, int]],
    idle_ttl: typing.Union[int, datetime.timedelta],
    concurrency: int,
    scaledown_ttl: typing.Union[int, datetime.timedelta],
)
Parameter Type
replicas typing.Union[int, typing.Tuple[int, int]]
idle_ttl typing.Union[int, datetime.timedelta]
concurrency int
scaledown_ttl typing.Union[int, datetime.timedelta]

Methods

Method Description
get_scaledown_ttl() Returns the scaledown TTL as a timedelta.

get_scaledown_ttl()

def get_scaledown_ttl()

Returns the scaledown TTL as a timedelta. If scaledown_ttl is not set, returns None.

Properties

Property Type Description
max_replicas None
Returns the maximum number of replicas.
min_replicas None
Returns the minimum number of replicas.
ttl None
Returns the idle TTL as a timedelta. If idle_ttl is not set, returns the global default.

flyte.Secret

Secrets are used to inject sensitive information into tasks or image build context. Secrets can be mounted as environment variables or files. The secret key is the name of the secret in the secret store. The group is optional and maybe used with some secret stores to organize secrets. The secret_mount is used to specify how the secret should be mounted. If the secret_mount is set to “env” the secret will be mounted as an environment variable. If the secret_mount is set to “file” the secret will be mounted as a file. The as_env_var is an optional parameter that can be used to specify the name of the environment variable that the secret should be mounted as.

Example:

@task(secrets="my-secret")
async def my_task():
    # This will be set to the value of the secret. Note: The env var is always uppercase, and - is replaced with _.
    os.environ["MY_SECRET"]

@task(secrets=Secret("my-openai-api-key", as_env_var="OPENAI_API_KEY"))
async def my_task2():
    os.environ["OPENAI_API_KEY"]

TODO: Add support for secret versioning (some stores) and secret groups (some stores) and mounting as files.

class Secret(
    key: str,
    group: typing.Optional[str],
    mount: pathlib._local.Path | None,
    as_env_var: typing.Optional[str],
)
Parameter Type
key str
group typing.Optional[str]
mount pathlib._local.Path | None
as_env_var typing.Optional[str]

Methods

Method Description
stable_hash() Deterministic, process-independent hash (as hex string).

stable_hash()

def stable_hash()

Deterministic, process-independent hash (as hex string).

flyte.TaskEnvironment

Environment class to define a new environment for a set of tasks.

Example usage:

env = flyte.TaskEnvironment(name="my_env", image="my_image", resources=Resources(cpu="1", memory="1Gi"))

@env.task
async def my_task():
    pass
class TaskEnvironment(
    name: str,
    depends_on: List[Environment],
    pod_template: Optional[Union[str, PodTemplate]],
    description: Optional[str],
    secrets: Optional[SecretRequest],
    env_vars: Optional[Dict[str, str]],
    resources: Optional[Resources],
    interruptible: bool,
    image: Union[str, Image, Literal['auto']],
    cache: CacheRequest,
    reusable: ReusePolicy | None,
    plugin_config: Optional[Any],
    queue: Optional[str],
)
Parameter Type
name str
depends_on List[Environment]
pod_template Optional[Union[str, PodTemplate]]
description Optional[str]
secrets Optional[SecretRequest]
env_vars Optional[Dict[str, str]]
resources Optional[Resources]
interruptible bool
image Union[str, Image, Literal['auto']]
cache CacheRequest
reusable ReusePolicy | None
plugin_config Optional[Any]
queue Optional[str]

Methods

Method Description
add_dependency() Add a dependency to the environment.
clone_with() Clone the TaskEnvironment with new parameters.
from_task() Create a TaskEnvironment from a list of tasks.
task() Decorate a function to be a task.

add_dependency()

def add_dependency(
    env: Environment,
)

Add a dependency to the environment.

Parameter Type
env Environment

clone_with()

def clone_with(
    name: str,
    image: Optional[Union[str, Image, Literal['auto']]],
    resources: Optional[Resources],
    env_vars: Optional[Dict[str, str]],
    secrets: Optional[SecretRequest],
    depends_on: Optional[List[Environment]],
    description: Optional[str],
    interruptible: Optional[bool],
    kwargs: **kwargs,
) -> TaskEnvironment

Clone the TaskEnvironment with new parameters.

Besides the base environment parameters, you can override kwargs like cache, reusable, etc.

Parameter Type
name str
image Optional[Union[str, Image, Literal['auto']]]
resources Optional[Resources]
env_vars Optional[Dict[str, str]]
secrets Optional[SecretRequest]
depends_on Optional[List[Environment]]
description Optional[str]
interruptible Optional[bool]
kwargs **kwargs

from_task()

def from_task(
    name: str,
    tasks: TaskTemplate,
) -> TaskEnvironment

Create a TaskEnvironment from a list of tasks. All tasks should have the same image or no Image defined. Similarity of Image is determined by the python reference, not by value.

If images are different, an error is raised. If no image is defined, the image is set to “auto”.

For any other tasks that need to be use these tasks, the returned environment can be used in the depends_on attribute of the other TaskEnvironment.

Parameter Type
name str
tasks TaskTemplate

task()

def task(
    _func: F | None,
    short_name: Optional[str],
    cache: CacheRequest | None,
    retries: Union[int, RetryStrategy],
    timeout: Union[timedelta, int],
    docs: Optional[Documentation],
    pod_template: Optional[Union[str, PodTemplate]],
    report: bool,
    interruptible: bool | None,
    max_inline_io_bytes: int,
    queue: Optional[str],
    triggers: Tuple[Trigger, ...] | Trigger,
) -> Callable[[F], AsyncFunctionTaskTemplate[P, R, F]] | AsyncFunctionTaskTemplate[P, R, F]

Decorate a function to be a task.

Parameter Type
_func F | None
short_name Optional[str]
cache CacheRequest | None
retries Union[int, RetryStrategy]
timeout Union[timedelta, int]
docs Optional[Documentation]
pod_template Optional[Union[str, PodTemplate]]
report bool
interruptible bool | None
max_inline_io_bytes int
queue Optional[str]
triggers Tuple[Trigger, ...] | Trigger

Properties

Property Type Description
tasks None
Get all tasks defined in the environment.

flyte.Timeout

Timeout class to define a timeout for a task. The task timeout can be set to a maximum runtime and a maximum queued time. Maximum runtime is the maximum time the task can run for (in one attempt). Maximum queued time is the maximum time the task can stay in the queue before it starts executing.

Example usage:

timeout = Timeout(max_runtime=timedelta(minutes=5), max_queued_time=timedelta(minutes=10))
@env.task(timeout=timeout)
async def my_task():
    pass
class Timeout(
    max_runtime: datetime.timedelta | int,
    max_queued_time: datetime.timedelta | int | None,
)
Parameter Type
max_runtime datetime.timedelta | int
max_queued_time datetime.timedelta | int | None

flyte.Trigger

This class defines specification of a Trigger, that can be associated with any Flyte V2 task. The trigger then is deployed to the Flyte Platform.

Triggers can be used to run tasks on a schedule, in response to events, or based on other conditions. The Trigger class encapsulates the metadata and configuration needed to define a trigger.

You can associate the same Trigger object with multiple tasks.

Example usage:

from flyte.trigger import Trigger
my_trigger = Trigger(
    name="my_trigger",
    description="A trigger that runs every hour",
)
class Trigger(
    name: str,
    automation: Union[Cron, FixedRate],
    description: str,
    auto_activate: bool,
    inputs: Dict[str, Any] | None,
    env_vars: Dict[str, str] | None,
    interruptible: bool | None,
    overwrite_cache: bool,
    queue: str | None,
    labels: Mapping[str, str] | None,
    annotations: Mapping[str, str] | None,
)
Parameter Type
name str
automation Union[Cron, FixedRate]
description str
auto_activate bool
inputs Dict[str, Any] | None
env_vars Dict[str, str] | None
interruptible bool | None
overwrite_cache bool
queue str | None
labels Mapping[str, str] | None
annotations Mapping[str, str] | None

Methods

Method Description
daily() Creates a Cron trigger that runs daily at midnight.
hourly() Creates a Cron trigger that runs every hour.
minutely() Creates a Cron trigger that runs every minute.
monthly() Creates a Cron trigger that runs monthly on the 1st at midnight.
weekly() Creates a Cron trigger that runs weekly on Sundays at midnight.

daily()

def daily(
    trigger_time_input_key: str,
    name: str,
    description: str,
    auto_activate: bool,
    inputs: Dict[str, Any] | None,
    env_vars: Dict[str, str] | None,
    interruptible: bool | None,
    overwrite_cache: bool,
    queue: str | None,
    labels: Mapping[str, str] | None,
    annotations: Mapping[str, str] | None,
) -> Trigger

Creates a Cron trigger that runs daily at midnight.

Parameter Type
trigger_time_input_key str
name str
description str
auto_activate bool
inputs Dict[str, Any] | None
env_vars Dict[str, str] | None
interruptible bool | None
overwrite_cache bool
queue str | None
labels Mapping[str, str] | None
annotations Mapping[str, str] | None

hourly()

def hourly(
    trigger_time_input_key: str,
    name: str,
    description: str,
    auto_activate: bool,
    inputs: Dict[str, Any] | None,
    env_vars: Dict[str, str] | None,
    interruptible: bool | None,
    overwrite_cache: bool,
    queue: str | None,
    labels: Mapping[str, str] | None,
    annotations: Mapping[str, str] | None,
) -> Trigger

Creates a Cron trigger that runs every hour.

Parameter Type
trigger_time_input_key str
name str
description str
auto_activate bool
inputs Dict[str, Any] | None
env_vars Dict[str, str] | None
interruptible bool | None
overwrite_cache bool
queue str | None
labels Mapping[str, str] | None
annotations Mapping[str, str] | None

minutely()

def minutely(
    trigger_time_input_key: str,
    name: str,
    description: str,
    auto_activate: bool,
    inputs: Dict[str, Any] | None,
    env_vars: Dict[str, str] | None,
    interruptible: bool | None,
    overwrite_cache: bool,
    queue: str | None,
    labels: Mapping[str, str] | None,
    annotations: Mapping[str, str] | None,
) -> Trigger

Creates a Cron trigger that runs every minute.

Parameter Type
trigger_time_input_key str
name str
description str
auto_activate bool
inputs Dict[str, Any] | None
env_vars Dict[str, str] | None
interruptible bool | None
overwrite_cache bool
queue str | None
labels Mapping[str, str] | None
annotations Mapping[str, str] | None

monthly()

def monthly(
    trigger_time_input_key: str,
    name: str,
    description: str,
    auto_activate: bool,
    inputs: Dict[str, Any] | None,
    env_vars: Dict[str, str] | None,
    interruptible: bool | None,
    overwrite_cache: bool,
    queue: str | None,
    labels: Mapping[str, str] | None,
    annotations: Mapping[str, str] | None,
) -> Trigger

Creates a Cron trigger that runs monthly on the 1st at midnight.

Parameter Type
trigger_time_input_key str
name str
description str
auto_activate bool
inputs Dict[str, Any] | None
env_vars Dict[str, str] | None
interruptible bool | None
overwrite_cache bool
queue str | None
labels Mapping[str, str] | None
annotations Mapping[str, str] | None

weekly()

def weekly(
    trigger_time_input_key: str,
    name: str,
    description: str,
    auto_activate: bool,
    inputs: Dict[str, Any] | None,
    env_vars: Dict[str, str] | None,
    interruptible: bool | None,
    overwrite_cache: bool,
    queue: str | None,
    labels: Mapping[str, str] | None,
    annotations: Mapping[str, str] | None,
) -> Trigger

Creates a Cron trigger that runs weekly on Sundays at midnight.

Parameter Type
trigger_time_input_key str
name str
description str
auto_activate bool
inputs Dict[str, Any] | None
env_vars Dict[str, str] | None
interruptible bool | None
overwrite_cache bool
queue str | None
labels Mapping[str, str] | None
annotations Mapping[str, str] | None