1.16.10

flytekit.types.directory.types

Directory

Classes

Class Description
FlyteDirToMultipartBlobTransformer This transformer handles conversion between the Python native FlyteDirectory class defined above, and the Flyte.
FlyteDirectory

Methods

Method Description
noop()

Variables

Property Type Description
MESSAGEPACK str
T TypeVar

Methods

noop()

def noop()

flytekit.types.directory.types.FlyteDirToMultipartBlobTransformer

This transformer handles conversion between the Python native FlyteDirectory class defined above, and the Flyte IDL literal/type of Multipart Blob. Please see the FlyteDirectory comments for additional information.

caution:

The transformer will not check if the given path is actually a directory. This is because the path could be a remote reference.

def FlyteDirToMultipartBlobTransformer()

Methods

Method Description
assert_type()
async_to_literal() Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type.
async_to_python_value() Converts the given Literal to a Python Type.
dict_to_flyte_directory()
from_binary_idl() If the input is from flytekit, the Life Cycle will be as follows:.
from_generic_idl() If the input is from Flyte Console, the Life Cycle will be as follows:.
get_format()
get_literal_type() Converts the python type to a Flyte LiteralType.
guess_python_type() Converts the Flyte LiteralType to a python object type.
isinstance_generic()
to_html() Converts any python val (dataframe, int, float) to a html string, and it will be wrapped in the HTML div.
to_literal() Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type.
to_python_value() Converts the given Literal to a Python Type.

assert_type()

def assert_type(
    t: typing.Type[FlyteDirectory],
    v: typing.Union[FlyteDirectory, os.PathLike, str],
)
Parameter Type Description
t typing.Type[FlyteDirectory]
v typing.Union[FlyteDirectory, os.PathLike, str]

async_to_literal()

def async_to_literal(
    ctx: FlyteContext,
    python_val: FlyteDirectory,
    python_type: typing.Type[FlyteDirectory],
    expected: LiteralType,
) -> Literal

Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should refrain from using type(python_val) instead rely on the passed in python_type. If these do not match (or are not allowed) the Transformer implementer should raise an AssertionError, clearly stating what was the mismatch

Parameter Type Description
ctx FlyteContext A FlyteContext, useful in accessing the filesystem and other attributes
python_val FlyteDirectory The actual value to be transformed
python_type typing.Type[FlyteDirectory] The assumed type of the value (this matches the declared type on the function)
expected LiteralType Expected Literal Type

async_to_python_value()

def async_to_python_value(
    ctx: FlyteContext,
    lv: Literal,
    expected_python_type: typing.Type[FlyteDirectory],
) -> FlyteDirectory

Converts the given Literal to a Python Type. If the conversion cannot be done an AssertionError should be raised

Parameter Type Description
ctx FlyteContext FlyteContext
lv Literal The received literal Value
expected_python_type typing.Type[FlyteDirectory] Expected native python type that should be returned

dict_to_flyte_directory()

def dict_to_flyte_directory(
    dict_obj: typing.Dict[str, str],
    expected_python_type: typing.Type[FlyteDirectory],
) -> FlyteDirectory
Parameter Type Description
dict_obj typing.Dict[str, str]
expected_python_type typing.Type[FlyteDirectory]

from_binary_idl()

def from_binary_idl(
    binary_idl_object: Binary,
    expected_python_type: typing.Type[FlyteDirectory],
) -> FlyteDirectory

If the input is from flytekit, the Life Cycle will be as follows:

Life Cycle: binary IDL -> resolved binary -> bytes -> expected Python object (flytekit customized (propeller processing) (flytekit binary IDL) (flytekit customized serialization) deserialization)

Example Code: @dataclass class DC: fd: FlyteDirectory

@workflow def wf(dc: DC): t_fd(dc.fd)

Note:

  • The deserialization is the same as put a flyte directory in a dataclass, which will deserialize by the mashumaro’s API.

Related PR:

Parameter Type Description
binary_idl_object Binary
expected_python_type typing.Type[FlyteDirectory]

from_generic_idl()

def from_generic_idl(
    generic: Struct,
    expected_python_type: typing.Type[FlyteDirectory],
) -> FlyteDirectory

If the input is from Flyte Console, the Life Cycle will be as follows:

Life Cycle: json str -> protobuf struct -> resolved protobuf struct -> expected Python object (console user input) (console output) (propeller) (flytekit customized deserialization)

Example Code: @dataclass class DC: fd: FlyteDirectory

@workflow def wf(dc: DC): t_fd(dc.fd)

Note:

  • The deserialization is the same as put a flyte directory in a dataclass, which will deserialize by the mashumaro’s API.

Related PR:

Parameter Type Description
generic Struct
expected_python_type typing.Type[FlyteDirectory]

get_format()

def get_format(
    t: typing.Type[FlyteDirectory],
) -> str
Parameter Type Description
t typing.Type[FlyteDirectory]

get_literal_type()

def get_literal_type(
    t: typing.Type[FlyteDirectory],
) -> LiteralType

Converts the python type to a Flyte LiteralType

Parameter Type Description
t typing.Type[FlyteDirectory]

guess_python_type()

def guess_python_type(
    literal_type: LiteralType,
) -> typing.Type[FlyteDirectory[typing.Any]]

Converts the Flyte LiteralType to a python object type.

Parameter Type Description
literal_type LiteralType

isinstance_generic()

def isinstance_generic(
    obj,
    generic_alias,
)
Parameter Type Description
obj
generic_alias

to_html()

def to_html(
    ctx: FlyteContext,
    python_val: T,
    expected_python_type: Type[T],
) -> str

Converts any python val (dataframe, int, float) to a html string, and it will be wrapped in the HTML div

Parameter Type Description
ctx FlyteContext
python_val T
expected_python_type Type[T]

to_literal()

def to_literal(
    ctx: FlyteContext,
    python_val: typing.Any,
    python_type: Type[T],
    expected: LiteralType,
) -> Literal

Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should refrain from using type(python_val) instead rely on the passed in python_type. If these do not match (or are not allowed) the Transformer implementer should raise an AssertionError, clearly stating what was the mismatch

Parameter Type Description
ctx FlyteContext A FlyteContext, useful in accessing the filesystem and other attributes
python_val typing.Any The actual value to be transformed
python_type Type[T] The assumed type of the value (this matches the declared type on the function)
expected LiteralType Expected Literal Type

to_python_value()

def to_python_value(
    ctx: FlyteContext,
    lv: Literal,
    expected_python_type: Type[T],
) -> Optional[T]

Converts the given Literal to a Python Type. If the conversion cannot be done an AssertionError should be raised

Parameter Type Description
ctx FlyteContext FlyteContext
lv Literal The received literal Value
expected_python_type Type[T] Expected native python type that should be returned

Properties

Property Type Description
is_async
name
python_type
This returns the python type
type_assertions_enabled
Indicates if the transformer wants type assertions to be enabled at the core type engine layer

flytekit.types.directory.types.FlyteDirectory

class FlyteDirectory(
    path: typing.Union[str, os.PathLike],
    downloader: typing.Optional[typing.Callable],
    remote_directory: typing.Optional[typing.Union[os.PathLike, str, typing.Literal[False]]],
)
Parameter Type Description
path typing.Union[str, os.PathLike] The source path that users are expected to call open() on
downloader typing.Optional[typing.Callable] Optional function that can be passed that used to delay downloading of the actual fil until a user actually calls open().
remote_directory typing.Optional[typing.Union[os.PathLike, str, typing.Literal[False]]] If the user wants to return something and also specify where it should be uploaded to. If set to False, then flytekit will not upload the directory to the remote store.

Methods

Method Description
crawl() Crawl returns a generator of all files prefixed by any sub-folders under the given “FlyteDirectory”.
deserialize_flyte_dir()
download()
extension()
from_dict()
from_json()
from_source() Create a new FlyteDirectory object with the remote source set to the input.
listdir() This function will list all files and folders in the given directory, but without downloading the contents.
new() Create a new FlyteDirectory object in current Flyte working directory.
new_dir() This will create a new folder under the current folder.
new_file() This will create a new file under the current folder.
new_remote() Create a new FlyteDirectory object using the currently configured default remote in the context (i.
schema()
serialize_flyte_dir()
to_dict()
to_json()

crawl()

def crawl(
    maxdepth: typing.Optional[int],
    topdown: bool,
    kwargs,
) -> Generator[Tuple[typing.Union[str, os.PathLike[Any]], typing.Dict[Any, Any]], None, None]

Crawl returns a generator of all files prefixed by any sub-folders under the given “FlyteDirectory”. if details=True is passed, then it will return a dictionary as specified by fsspec.

Example:

>>> list(fd.crawl())
[("/base", "file1"), ("/base", "dir1/file1"), ("/base", "dir2/file1"), ("/base", "dir1/dir/file1")]

>>> list(x.crawl(detail=True))
[('/tmp/test', {'my-dir/ab.py': {'name': '/tmp/test/my-dir/ab.py', 'size': 0, 'type': 'file',
 'created': 1677720780.2318847, 'islink': False, 'mode': 33188, 'uid': 501, 'gid': 0,
  'mtime': 1677720780.2317934, 'ino': 1694329, 'nlink': 1}})]
Parameter Type Description
maxdepth typing.Optional[int]
topdown bool
kwargs **kwargs

deserialize_flyte_dir()

def deserialize_flyte_dir(
    info,
) -> FlyteDirectory
Parameter Type Description
info

download()

def download()

extension()

def extension()

from_dict()

def from_dict(
    kvs: typing.Union[dict, list, str, int, float, bool, NoneType],
    infer_missing,
) -> ~A
Parameter Type Description
kvs typing.Union[dict, list, str, int, float, bool, NoneType]
infer_missing

from_json()

def from_json(
    s: typing.Union[str, bytes, bytearray],
    parse_float,
    parse_int,
    parse_constant,
    infer_missing,
    kw,
) -> ~A
Parameter Type Description
s typing.Union[str, bytes, bytearray]
parse_float
parse_int
parse_constant
infer_missing
kw

from_source()

def from_source(
    source: str | os.PathLike,
) -> FlyteDirectory

Create a new FlyteDirectory object with the remote source set to the input

Parameter Type Description
source str | os.PathLike

listdir()

def listdir(
    directory: FlyteDirectory,
) -> typing.List[typing.Union[FlyteDirectory, FlyteFile]]

This function will list all files and folders in the given directory, but without downloading the contents. In addition, it will return a list of FlyteFile and FlyteDirectory objects that have ability to lazily download the contents of the file/folder. For example:

entity = FlyteDirectory.listdir(directory)
for e in entity:
    print("s3 object:", e.remote_source)
    # s3 object: s3://test-flytedir/file1.txt
    # s3 object: s3://test-flytedir/file2.txt
    # s3 object: s3://test-flytedir/sub_dir

open(entity[0], "r")  # This will download the file to the local disk.
open(entity[0], "r")  # flytekit will read data from the local disk if you open it again.
Parameter Type Description
directory FlyteDirectory

new()

def new(
    dirname: str | os.PathLike,
) -> FlyteDirectory

Create a new FlyteDirectory object in current Flyte working directory.

Parameter Type Description
dirname str | os.PathLike

new_dir()

def new_dir(
    name: typing.Optional[str],
) -> FlyteDirectory

This will create a new folder under the current folder. If given a name, it will use the name given, otherwise it’ll pick a random string. Collisions are not checked.

Parameter Type Description
name typing.Optional[str]

new_file()

def new_file(
    name: typing.Optional[str],
) -> FlyteFile

This will create a new file under the current folder. If given a name, it will use the name given, otherwise it’ll pick a random string. Collisions are not checked.

Parameter Type Description
name typing.Optional[str]

new_remote()

def new_remote(
    stem: typing.Optional[str],
    alt: typing.Optional[str],
) -> FlyteDirectory

Create a new FlyteDirectory object using the currently configured default remote in the context (i.e. the raw_output_prefix configured in the current FileAccessProvider object in the context). This is used if you explicitly have a folder somewhere that you want to create files under. If you want to write a whole folder, you can let your task return a FlyteDirectory object, and let flytekit handle the uploading.

Parameter Type Description
stem typing.Optional[str] A stem to append to the path as the final prefix “directory”.
alt typing.Optional[str] An alternate first member of the prefix to use instead of the default. :return FlyteDirectory: A new FlyteDirectory object that points to a remote location.

schema()

def schema(
    infer_missing: bool,
    only,
    exclude,
    many: bool,
    context,
    load_only,
    dump_only,
    partial: bool,
    unknown,
) -> SchemaType[A]
Parameter Type Description
infer_missing bool
only
exclude
many bool
context
load_only
dump_only
partial bool
unknown

serialize_flyte_dir()

def serialize_flyte_dir()

to_dict()

def to_dict(
    encode_json,
) -> typing.Dict[str, typing.Union[dict, list, str, int, float, bool, NoneType]]
Parameter Type Description
encode_json

to_json()

def to_json(
    skipkeys: bool,
    ensure_ascii: bool,
    check_circular: bool,
    allow_nan: bool,
    indent: typing.Union[int, str, NoneType],
    separators: typing.Tuple[str, str],
    default: typing.Callable,
    sort_keys: bool,
    kw,
) -> str
Parameter Type Description
skipkeys bool
ensure_ascii bool
check_circular bool
allow_nan bool
indent typing.Union[int, str, NoneType]
separators typing.Tuple[str, str]
default typing.Callable
sort_keys bool
kw

Properties

Property Type Description
downloaded
remote_directory
remote_source
If this is an input to a task, and the original path is s3://something, flytekit will download the
directory for the user. In case the user wants access to the original path, it will be here.
sep