Files and directories

Flyte provides the flyte.io.File and flyte.io.Dir types to represent files and directories, respectively. Together with flyte.io.DataFrame, they constitute the offloaded data types: unlike materialized types such as data classes, which carry their full content, offloaded types pass references to the data.

A variable of an offloaded type does not contain its actual data, but rather a reference to it. The data itself is stored in the internal blob store of your Union/Flyte instance. When a variable of an offloaded type is first created, its data is uploaded to the blob store. The variable can then be passed from task to task as a reference. The data is only downloaded from the blob store when a task needs to access it, for example, when the task calls open() on a File or Dir object.

This allows Flyte to handle large files and directories efficiently, transferring data only when it is actually needed. Even very large objects, such as video files and DNA datasets, can be passed cheaply between tasks.
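For instance, inspecting a File input inside a task shows only the reference it carries. A minimal sketch (it assumes the env task environment defined in the example below; the printed value is a blob-store URI whose exact scheme depends on your backend):

@env.task
async def show_reference(f: File):
    # No data has been downloaded at this point; the File object
    # only holds a reference into the blob store.
    print(f.path)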

The File and Dir classes provide both sync and async methods to interact with the data.

Example usage

The examples below show the basic use cases: uploading files and directories created locally, and using them as inputs to a task.

file_and_dir.py
import asyncio
import tempfile
from pathlib import Path

import flyte
from flyte.io import Dir, File

env = flyte.TaskEnvironment(name="files-and-folders")


@env.task
async def write_file(name: str) -> File:
    # Create a file and write some content to it
    with open("test.txt", "w") as f:
        f.write(f"hello world {name}")

    # Upload the file using flyte
    uploaded_file_obj = await File.from_local("test.txt")
    return uploaded_file_obj

The upload happens when File.from_local is called. Because the upload would otherwise block execution, File.from_local is implemented as an async class method. The Flyte SDK uses this constructor pattern frequently, so you will see it on other types as well.
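One consequence of from_local being async is that several uploads can run concurrently, for example with asyncio.gather. A minimal sketch, assuming list[File] is a supported return annotation:

@env.task
async def upload_many() -> list[File]:
    # Write three small local files first.
    paths = []
    for i in range(3):
        p = f"part_{i}.txt"
        with open(p, "w") as f:
            f.write(f"content {i}")
        paths.append(p)
    # Each File.from_local coroutine is awaited via asyncio.gather,
    # so the uploads overlap instead of running one after another.
    files = await asyncio.gather(*(File.from_local(p) for p in paths))
    return files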

The next task is slightly more complicated. It calls the task above several times to produce File objects, assembles them into a directory, and returns a Dir object, again by invoking from_local.

file_and_dir.py
@env.task
async def write_and_check_files() -> Dir:
    coros = []
    for name in ["Alice", "Bob", "Eve"]:
        coros.append(write_file(name=name))

    vals = await asyncio.gather(*coros)
    temp_dir = tempfile.mkdtemp()
    for file in vals:
        async with file.open("rb") as fh:
            contents = await fh.read()
            # Convert bytes to string
            contents_str = contents.decode('utf-8') if isinstance(contents, bytes) else str(contents)
            print(f"File {file.path} contents: {contents_str}")
            new_file = Path(temp_dir) / file.name
            with open(new_file, "w") as out:  # noqa: ASYNC230
                out.write(contents_str)
    print(f"Files written to {temp_dir}")

    # Walk the directory and list the files just written
    for path in Path(temp_dir).iterdir():
        print(f"File: {path.name}")

    my_dir = await Dir.from_local(temp_dir)
    return my_dir

Finally, these tasks show how to use an offloaded type as an input. Helper methods like walk and open are available on File and Dir objects and do what you might expect.

file_and_dir.py
@env.task
async def check_dir(my_dir: Dir):
    print(f"Dir {my_dir.path} contents:")
    async for file in my_dir.walk():
        print(f"File: {file.name}")
        async with file.open("rb") as fh:
            contents = await fh.read()
            # Convert bytes to string
            contents_str = contents.decode('utf-8') if isinstance(contents, bytes) else str(contents)
            print(f"Contents: {contents_str}")


@env.task
async def create_and_check_dir():
    my_dir = await write_and_check_files()
    await check_dir(my_dir=my_dir)


if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(create_and_check_dir)
    print(r.name)
    print(r.url)
    r.wait()

JSONL files

The flyteplugins-jsonl package extends File and Dir with JSONL-aware types: JsonlFile and JsonlDir. They add streaming record-level read and write on top of the standard file/directory capabilities, with optional zstd compression and automatic shard rotation for large datasets.

pip install flyteplugins-jsonl

Setup

jsonl.py
import flyte
from flyteplugins.jsonl import JsonlDir, JsonlFile

env = flyte.TaskEnvironment(
    name="jsonl-examples",
    image=flyte.Image.from_debian_base(name="jsonl").with_pip_packages(
        "flyteplugins-jsonl>=2.0.0"
    ),
    resources=flyte.Resources(cpu="1", memory="1Gi"),
)

JsonlFile

JsonlFile is a File subclass for single JSONL files. Use its async context manager to write records incrementally, without loading the entire dataset into memory:

jsonl.py
@env.task
async def write_records() -> JsonlFile:
    """Write a sequence of records to a JsonlFile."""
    records = [
        {"id": 1, "name": "Alice", "score": 95.5},
        {"id": 2, "name": "Bob", "score": 87.0},
        {"id": 3, "name": "Charlie", "score": 91.2},
    ]
    out = JsonlFile(path="results.jsonl")
    async with out.writer() as writer:
        for record in records:
            await writer.write(record)
    return out

Reading streams records in the same way:

jsonl.py
@env.task
async def read_records(data: JsonlFile) -> int:
    """Read records from a JsonlFile and return the count."""
    count = 0
    async for record in data.iter_records():
        print(record)
        count += 1
    return count

Compressed files (.jsonl.zst / .jsonl.zstd) are handled transparently based on the file extension.
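For example, writing to a path that ends in .jsonl.zst produces a zstd-compressed file with no other code changes. A minimal sketch reusing the writer API shown above:

@env.task
async def write_compressed() -> JsonlFile:
    # The .jsonl.zst extension enables zstd compression transparently.
    out = JsonlFile(path="results.jsonl.zst")
    async with out.writer() as writer:
        await writer.write({"id": 1, "name": "Alice"})
    return out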

JsonlDir

JsonlDir is a Dir subclass that shards records across multiple JSONL files. When a shard reaches the max_bytes threshold, a new shard is opened automatically. This keeps individual files at a manageable size even for very large datasets:

jsonl.py
@env.task
async def write_large_dataset() -> JsonlDir:
    """Write a large dataset to a sharded JsonlDir.

    JsonlDir automatically rotates to a new shard file once the
    current shard reaches the size limit.
    """
    out = JsonlDir(path="dataset/")
    async with out.writer(max_bytes=1024 * 1024) as writer:  # 1 MB shards
        for i in range(1000):
            await writer.write({"index": i, "value": i * i})
    return out

Reading iterates across all shards transparently:

jsonl.py
@env.task
async def sum_values(dataset: JsonlDir) -> int:
    """Read all records across all shards and compute a sum."""
    total = 0
    async for record in dataset.iter_records():
        total += record["value"]
    return total
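A driver task can wire the two together, mirroring the create_and_check_dir pattern from the first example (shard_and_sum is a hypothetical name):

@env.task
async def shard_and_sum() -> int:
    # Write the sharded dataset, then read it back and sum the values.
    dataset = await write_large_dataset()
    return await sum_values(dataset=dataset)


if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(shard_and_sum)
    print(r.url)
    r.wait()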