2.0.0b1

flyte.storage

Directory

Classes

Class Description
ABFS Any Azure Blob Storage specific configuration.
GCS Any GCS specific configuration.
S3 S3 specific configuration.
Storage Data storage configuration that applies across any provider.

Methods

Method Description
get()
get_configured_fsspec_kwargs()
get_random_local_directory() Returns a random directory.
get_random_local_path() Use file_path_or_file_name when you want a random directory but want to preserve the leaf file name.
get_stream() Get a stream of data from a remote location.
get_underlying_filesystem()
is_remote() Check whether a path is remote. (Source note: "Let's find a replacement.")
join() Join multiple paths together.
put()
put_stream() Put a stream of data to a remote location.

Methods

get()

def get(
    from_path: str,
    to_path: typing.Union[str, pathlib._local.Path, NoneType],
    recursive: bool,
    kwargs,
) -> str
Parameter Type
from_path str
to_path typing.Union[str, pathlib._local.Path, NoneType]
recursive bool
kwargs **kwargs

get_configured_fsspec_kwargs()

def get_configured_fsspec_kwargs(
    protocol: typing.Optional[str],
    anonymous: bool,
) -> typing.Dict[str, typing.Any]
Parameter Type
protocol typing.Optional[str]
anonymous bool

get_random_local_directory()

def get_random_local_directory()

Returns a random directory (return type: pathlib.Path).

get_random_local_path()

def get_random_local_path(
    file_path_or_file_name: pathlib._local.Path | str | None,
) -> pathlib._local.Path

Use file_path_or_file_name when you want a random directory but want to preserve the leaf file name.

Parameter Type
file_path_or_file_name pathlib._local.Path | str | None

get_stream()

def get_stream(
    path: str,
    chunk_size,
    kwargs,
) -> typing.AsyncIterator[bytes]

Get a stream of data from a remote location. This is useful for downloading streaming data from a remote location. Example usage:

import flyte.storage as storage
obj = storage.get_stream(path="s3://my_bucket/my_file.txt")
Parameter Type
path str
chunk_size
kwargs **kwargs

get_underlying_filesystem()

def get_underlying_filesystem(
    protocol: typing.Optional[str],
    anonymous: bool,
    path: typing.Optional[str],
    kwargs,
) -> fsspec.spec.AbstractFileSystem
Parameter Type
protocol typing.Optional[str]
anonymous bool
path typing.Optional[str]
kwargs **kwargs

is_remote()

def is_remote(
    path: typing.Union[pathlib._local.Path, str],
) -> bool

Check whether a path is remote. (Source note: "Let's find a replacement.")

Parameter Type
path typing.Union[pathlib._local.Path, str]

join()

def join(
    paths: str,
) -> str

Join multiple paths together. This is a wrapper around os.path.join.

TODO: replace with a proper join that accounts for the fsspec root, etc.

Parameter Type
paths str

put()

def put(
    from_path: str,
    to_path: typing.Optional[str],
    recursive: bool,
    kwargs,
) -> str
Parameter Type
from_path str
to_path typing.Optional[str]
recursive bool
kwargs **kwargs

put_stream()

def put_stream(
    data_iterable: typing.Union[typing.AsyncIterable[bytes], bytes],
    name: str | None,
    to_path: str | None,
    kwargs,
) -> str

Put a stream of data to a remote location. This is useful for streaming data to a remote location. Example usage:

import flyte.storage as storage
storage.put_stream(iter([b'hello']), name="my_file.txt")
OR
storage.put_stream(iter([b'hello']), to_path="s3://my_bucket/my_file.txt")
Parameter Type
data_iterable typing.Union[typing.AsyncIterable[bytes], bytes]
name str | None
to_path str | None
kwargs **kwargs

flyte.storage.ABFS

Any Azure Blob Storage specific configuration.

class ABFS(
    retries: int,
    backoff: datetime.timedelta,
    enable_debug: bool,
    attach_execution_metadata: bool,
    account_name: typing.Optional[str],
    account_key: typing.Optional[str],
    tenant_id: typing.Optional[str],
    client_id: typing.Optional[str],
    client_secret: typing.Optional[str],
)
Parameter Type
retries int
backoff datetime.timedelta
enable_debug bool
attach_execution_metadata bool
account_name typing.Optional[str]
account_key typing.Optional[str]
tenant_id typing.Optional[str]
client_id typing.Optional[str]
client_secret typing.Optional[str]

Methods

Method Description
auto() Construct the config object automatically from environment variables.
get_fsspec_kwargs() Returns the configuration as kwargs for constructing an fsspec filesystem.

auto()

def auto()

Construct the config object automatically from environment variables.

get_fsspec_kwargs()

def get_fsspec_kwargs(
    anonymous: bool,
    kwargs,
) -> typing.Dict[str, typing.Any]

Returns the configuration as kwargs for constructing an fsspec filesystem.

Parameter Type
anonymous bool
kwargs **kwargs

flyte.storage.GCS

Any GCS specific configuration.

class GCS(
    retries: int,
    backoff: datetime.timedelta,
    enable_debug: bool,
    attach_execution_metadata: bool,
    gsutil_parallelism: bool,
)
Parameter Type
retries int
backoff datetime.timedelta
enable_debug bool
attach_execution_metadata bool
gsutil_parallelism bool

Methods

Method Description
auto() Construct the config object automatically from environment variables.
get_fsspec_kwargs() Returns the configuration as kwargs for constructing an fsspec filesystem.

auto()

def auto()

Construct the config object automatically from environment variables.

get_fsspec_kwargs()

def get_fsspec_kwargs(
    anonymous: bool,
    kwargs,
) -> typing.Dict[str, typing.Any]

Returns the configuration as kwargs for constructing an fsspec filesystem.

Parameter Type
anonymous bool
kwargs **kwargs

flyte.storage.S3

S3 specific configuration.

class S3(
    retries: int,
    backoff: datetime.timedelta,
    enable_debug: bool,
    attach_execution_metadata: bool,
    endpoint: typing.Optional[str],
    access_key_id: typing.Optional[str],
    secret_access_key: typing.Optional[str],
)
Parameter Type
retries int
backoff datetime.timedelta
enable_debug bool
attach_execution_metadata bool
endpoint typing.Optional[str]
access_key_id typing.Optional[str]
secret_access_key typing.Optional[str]

Methods

Method Description
auto() Returns a Config.
for_sandbox() (Return value not documented.)
get_fsspec_kwargs() Returns the configuration as kwargs for constructing an fsspec filesystem.

auto()

def auto()

Returns a Config.

for_sandbox()

def for_sandbox()

(Return value not documented.)

get_fsspec_kwargs()

def get_fsspec_kwargs(
    anonymous: bool,
    kwargs,
) -> typing.Dict[str, typing.Any]

Returns the configuration as kwargs for constructing an fsspec filesystem.

Parameter Type
anonymous bool
kwargs **kwargs

flyte.storage.Storage

Data storage configuration that applies across any provider.

class Storage(
    retries: int,
    backoff: datetime.timedelta,
    enable_debug: bool,
    attach_execution_metadata: bool,
)
Parameter Type
retries int
backoff datetime.timedelta
enable_debug bool
attach_execution_metadata bool

Methods

Method Description
auto() Construct the config object automatically from environment variables.
get_fsspec_kwargs() Returns the configuration as kwargs for constructing an fsspec filesystem.

auto()

def auto()

Construct the config object automatically from environment variables.

get_fsspec_kwargs()

def get_fsspec_kwargs(
    anonymous: bool,
    kwargs,
) -> typing.Dict[str, typing.Any]

Returns the configuration as kwargs for constructing an fsspec filesystem.

Parameter Type
anonymous bool
kwargs **kwargs