JsonlFile
Package: flyteplugins.jsonl
A file type for JSONL (JSON Lines) files, backed by orjson for fast
serialisation.
Provides streaming read and write methods that process one record at a time
without loading the entire file into memory. Inherits all File
capabilities (remote storage, upload/download, etc.).
Supports zstd-compressed files transparently via extension detection
(.jsonl.zst / .jsonl.zstd).
Example (Async read - compressed or uncompressed):

    @env.task
    async def process(f: JsonlFile):
        async for record in f.iter_records():
            print(record)

Example (Async write - compressed or uncompressed):

    @env.task
    async def create() -> JsonlFile:
        f = JsonlFile.new_remote("data.jsonl")
        async with f.writer() as w:
            await w.write({"key": "value"})
        return f

Example (Sync write - compressed or uncompressed):

    @env.task
    def create() -> JsonlFile:
        f = JsonlFile.new_remote("data.jsonl")
        with f.writer_sync() as w:
            w.write({"key": "value"})
        return f

Parameters
    class JsonlFile(
        path: str,
        name: typing.Optional[str],
        format: str,
        hash: typing.Optional[str],
        hash_method: typing.Optional[flyte.io._hashing_io.HashMethod],
    )

Create a new model by parsing and validating input data from keyword arguments.

Raises: ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.
| Parameter | Type | Description |
|---|---|---|
| path | str | |
| name | typing.Optional[str] | |
| format | str | |
| hash | typing.Optional[str] | |
| hash_method | typing.Optional[flyte.io._hashing_io.HashMethod] | |
Properties

| Property | Type | Description |
|---|---|---|
| lazy_uploader | None | |
Methods

| Method | Description |
|---|---|
| download() | Asynchronously download the file to a local path. |
| download_sync() | Synchronously download the file to a local path. |
| exists() | Asynchronously check if the file exists. |
| exists_sync() | Synchronously check if the file exists. |
| from_existing_remote() | Create a File reference from an existing remote file. |
| from_local() | Asynchronously create a new File object from a local file by uploading it to remote storage. |
| from_local_sync() | Synchronously create a new File object from a local file by uploading it to remote storage. |
| iter_arrow_batches() | Stream JSONL as Arrow RecordBatches. |
| iter_arrow_batches_sync() | Sync generator that yields Arrow RecordBatches. |
| iter_records() | Async generator that yields parsed dicts line by line. |
| iter_records_sync() | Sync generator that yields parsed dicts line by line. |
| model_post_init() | This function is meant to behave like a BaseModel method to initialise private attributes. |
| named_remote() | Create a File reference whose remote path is derived deterministically from name. |
| new_remote() | Create a new File reference for a remote file that will be written to. |
| open() | Asynchronously open the file and return a file-like object. |
| open_sync() | Synchronously open the file and return a file-like object. |
| pre_init() | Internal: Pydantic validator to set default name from path. |
| schema_match() | Internal: Check if incoming schema matches File schema. |
| writer() | Async context manager returning a JsonlWriter for streaming writes. |
| writer_sync() | Sync context manager returning a JsonlWriterSync for streaming writes. |
download()

    def download(
        local_path: Optional[Union[str, Path]],
    ) -> str

Asynchronously download the file to a local path.
Use this when you need to download a remote file to your local filesystem for processing.

Example (Async):

    @env.task
    async def download_and_process(f: File) -> str:
        local_path = await f.download()
        # Now process the local file
        with open(local_path, "r") as fh:
            return fh.read()

Example (Download to specific path):

    @env.task
    async def download_to_path(f: File) -> str:
        local_path = await f.download("/tmp/myfile.csv")
        return local_path

| Parameter | Type | Description |
|---|---|---|
| local_path | Optional[Union[str, Path]] | The local path to download the file to. If None, a temporary directory will be used and a path will be generated. |

Returns: The absolute path to the downloaded file
download_sync()

    def download_sync(
        local_path: Optional[Union[str, Path]],
    ) -> str

Synchronously download the file to a local path.
Use this in non-async tasks when you need to download a remote file to your local filesystem.

Example (Sync):

    @env.task
    def download_and_process_sync(f: File) -> str:
        local_path = f.download_sync()
        # Now process the local file
        with open(local_path, "r") as fh:
            return fh.read()

Example (Download to specific path):

    @env.task
    def download_to_path_sync(f: File) -> str:
        local_path = f.download_sync("/tmp/myfile.csv")
        return local_path

| Parameter | Type | Description |
|---|---|---|
| local_path | Optional[Union[str, Path]] | The local path to download the file to. If None, a temporary directory will be used and a path will be generated. |

Returns: The absolute path to the downloaded file
exists()

    def exists()

Asynchronously check if the file exists.

Example (Async):

    @env.task
    async def check_file(f: File) -> bool:
        if await f.exists():
            print("File exists!")
            return True
        return False

Returns: True if the file exists, False otherwise
exists_sync()

    def exists_sync()

Synchronously check if the file exists.
Use this in non-async tasks or when you need synchronous file existence checking.

Example (Sync):

    @env.task
    def check_file_sync(f: File) -> bool:
        if f.exists_sync():
            print("File exists!")
            return True
        return False

Returns: True if the file exists, False otherwise
from_existing_remote()

    def from_existing_remote(
        remote_path: str,
        file_cache_key: Optional[str],
    ) -> File[T]

Create a File reference from an existing remote file.
Use this when you want to reference a file that already exists in remote storage without uploading it.

Example:

    @env.task
    async def process_existing_file() -> str:
        file = File.from_existing_remote("s3://my-bucket/data.csv")
        async with file.open("rb") as f:
            content = await f.read()
        return content.decode("utf-8")

| Parameter | Type | Description |
|---|---|---|
| remote_path | str | The remote path to the existing file |
| file_cache_key | Optional[str] | Optional hash value to use for cache key computation. If not specified, the cache key will be computed based on the file’s attributes (path, name, format). |

Returns: A new File instance pointing to the existing remote file
from_local()

    def from_local(
        local_path: Union[str, Path],
        remote_destination: Optional[str],
        hash_method: Optional[HashMethod | str],
    ) -> File[T]

Asynchronously create a new File object from a local file by uploading it to remote storage.
Use this in async tasks when you have a local file that needs to be uploaded to remote storage.

Example (Async):

    @env.task
    async def upload_local_file() -> File:
        # Create a local file
        async with aiofiles.open("/tmp/data.csv", "w") as f:
            await f.write("col1,col2\n")
        # Upload to remote storage
        remote_file = await File.from_local("/tmp/data.csv")
        return remote_file

Example (With specific destination):

    @env.task
    async def upload_to_specific_path() -> File:
        remote_file = await File.from_local("/tmp/data.csv", "s3://my-bucket/data.csv")
        return remote_file

| Parameter | Type | Description |
|---|---|---|
| local_path | Union[str, Path] | Path to the local file |
| remote_destination | Optional[str] | Optional remote path to store the file. If None, a path will be automatically generated. |
| hash_method | Optional[HashMethod \| str] | Optional HashMethod or string to use for cache key computation. If a string is provided, it will be used as a precomputed cache key. If a HashMethod is provided, it will compute the hash during upload. If not specified, the cache key will be based on file attributes. |

Returns: A new File instance pointing to the uploaded remote file
from_local_sync()

    def from_local_sync(
        local_path: Union[str, Path],
        remote_destination: Optional[str],
        hash_method: Optional[HashMethod | str],
    ) -> File[T]

Synchronously create a new File object from a local file by uploading it to remote storage.
Use this in non-async tasks when you have a local file that needs to be uploaded to remote storage.

Example (Sync):

    @env.task
    def upload_local_file_sync() -> File:
        # Create a local file
        with open("/tmp/data.csv", "w") as f:
            f.write("col1,col2\n")
        # Upload to remote storage
        remote_file = File.from_local_sync("/tmp/data.csv")
        return remote_file

Example (With specific destination):

    @env.task
    def upload_to_specific_path() -> File:
        remote_file = File.from_local_sync("/tmp/data.csv", "s3://my-bucket/data.csv")
        return remote_file

| Parameter | Type | Description |
|---|---|---|
| local_path | Union[str, Path] | Path to the local file |
| remote_destination | Optional[str] | Optional remote path to store the file. If None, a path will be automatically generated. |
| hash_method | Optional[HashMethod \| str] | Optional HashMethod or string to use for cache key computation. If a string is provided, it will be used as a precomputed cache key. If a HashMethod is provided, it will compute the hash during upload. If not specified, the cache key will be based on file attributes. |

Returns: A new File instance pointing to the uploaded remote file
iter_arrow_batches()

    def iter_arrow_batches(
        batch_size: int,
        on_error: Literal['raise', 'skip'] | ErrorHandler,
    ) -> AsyncGenerator[Any, None]

Stream JSONL as Arrow RecordBatches.
Memory usage is bounded by batch_size.

| Parameter | Type | Description |
|---|---|---|
| batch_size | int | |
| on_error | Literal['raise', 'skip'] \| ErrorHandler | |
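Bounded memory follows from materialising at most batch_size rows at a time before handing them to Arrow. A stdlib-only sketch of that chunking idea (pyarrow and the real reader are omitted; `chunked_records` is a hypothetical helper, where the real method would pass each list to something like `pyarrow.RecordBatch.from_pylist`):

```python
import json
from itertools import islice
from typing import Any, Iterator

def chunked_records(lines: Iterator[str], batch_size: int) -> Iterator[list[dict[str, Any]]]:
    # Parse lazily and yield lists of at most batch_size records, so only
    # one batch of parsed rows is ever held in memory.
    parsed = (json.loads(line) for line in lines if line.strip())
    while batch := list(islice(parsed, batch_size)):
        yield batch

lines = iter(['{"a": 1}', '{"a": 2}', '{"a": 3}'])
batches = list(chunked_records(lines, batch_size=2))
# Two batches: the first holds two records, the second holds the remainder
```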
iter_arrow_batches_sync()

    def iter_arrow_batches_sync(
        batch_size: int,
        on_error: Literal['raise', 'skip'] | ErrorHandler,
    ) -> Generator[Any, None, None]

Sync generator that yields Arrow RecordBatches.
Memory usage is bounded by batch_size.

| Parameter | Type | Description |
|---|---|---|
| batch_size | int | |
| on_error | Literal['raise', 'skip'] \| ErrorHandler | |
iter_records()

    def iter_records(
        on_error: Literal['raise', 'skip'] | ErrorHandler,
    ) -> AsyncGenerator[dict[str, Any], None]

Async generator that yields parsed dicts line by line.

| Parameter | Type | Description |
|---|---|---|
| on_error | Literal['raise', 'skip'] \| ErrorHandler | |
iter_records_sync()

    def iter_records_sync(
        on_error: Literal['raise', 'skip'] | ErrorHandler,
    ) -> Generator[dict[str, Any], None, None]

Sync generator that yields parsed dicts line by line.

| Parameter | Type | Description |
|---|---|---|
| on_error | Literal['raise', 'skip'] \| ErrorHandler | |
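The on_error argument controls what happens when a line fails to parse: 'raise' propagates the error, 'skip' drops the line, and a custom ErrorHandler can decide per line. A stdlib sketch of the skip behaviour (the real implementation parses with orjson; `iter_records_skip` here is illustrative):

```python
import json
from typing import Any, Iterator

def iter_records_skip(lines: Iterator[str]) -> Iterator[dict[str, Any]]:
    # Equivalent of on_error="skip": malformed lines are silently dropped
    # instead of aborting the whole stream.
    for line in lines:
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            continue

good = list(iter_records_skip(iter(['{"k": 1}', "not json", '{"k": 2}'])))
# Only the two valid records survive; the malformed middle line is skipped
```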
model_post_init()

    def model_post_init(
        context: Any,
    )

This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.

| Parameter | Type | Description |
|---|---|---|
| context | Any | The context. |
named_remote()

    def named_remote(
        name: str,
    ) -> File[T]

Create a File reference whose remote path is derived deterministically from name.

Unlike new_remote, which generates a random path on every call, this method produces the same path for the same name within a given task execution. This makes it safe across retries: the first attempt uploads to the path and subsequent retries resolve to the identical location without re-uploading.

The path is optionally namespaced by the node ID extracted from the backend raw-data path, which follows the convention:

    {run_name}-{node_id}-{attempt_index}

If extraction fails, the function falls back to the run base directory alone.

| Parameter | Type | Description |
|---|---|---|
| name | str | Plain filename (e.g., “data.csv”). Must not contain path separators. |

Returns: A File instance whose path is stable across retries.
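The retry-safety comes from deriving the path only from inputs that are stable across attempts (and, crucially, excluding the attempt index). A hedged sketch of that idea (the function name and path layout below are illustrative assumptions, not the actual implementation):

```python
def stable_remote_path(run_base: str, node_id: str, name: str) -> str:
    # Same (run_base, node_id, name) always yields the same path, so a
    # retried attempt resolves to the location the first attempt wrote to.
    # The attempt index is deliberately NOT part of the path.
    if "/" in name or "\\" in name:
        raise ValueError("name must be a plain filename")
    return f"{run_base}/{node_id}/{name}"

p1 = stable_remote_path("s3://bucket/run-1", "n0", "data.csv")
p2 = stable_remote_path("s3://bucket/run-1", "n0", "data.csv")
assert p1 == p2  # identical across retries
```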
new_remote()

    def new_remote(
        file_name: Optional[str],
        hash_method: Optional[HashMethod | str],
    ) -> File[T]

Create a new File reference for a remote file that will be written to.
Use this when you want to create a new file and write to it directly without creating a local file first.

Example (Async):

    @env.task
    async def create_csv() -> File:
        df = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
        file = File.new_remote()
        async with file.open("wb") as f:
            df.to_csv(f)
        return file

| Parameter | Type | Description |
|---|---|---|
| file_name | Optional[str] | Optional string specifying a remote file name. If not set, a generated file name will be returned. |
| hash_method | Optional[HashMethod \| str] | Optional HashMethod or string to use for cache key computation. If a string is provided, it will be used as a precomputed cache key. If a HashMethod is provided, it will be used to compute the hash as data is written. |

Returns: A new File instance with a generated remote path
open()

    def open(
        mode: str,
        block_size: Optional[int],
        cache_type: str,
        cache_options: Optional[dict],
        compression: Optional[str],
        **kwargs,
    ) -> AsyncGenerator[Union[AsyncWritableFile, AsyncReadableFile, 'HashingWriter'], None]

Asynchronously open the file and return a file-like object.
Use this method in async tasks to read from or write to files directly.

Example (Async Read):

    @env.task
    async def read_file(f: File) -> str:
        async with f.open("rb") as fh:
            content = bytes(await fh.read())
        return content.decode("utf-8")

Example (Async Write):

    @env.task
    async def write_file() -> File:
        f = File.new_remote()
        async with f.open("wb") as fh:
            await fh.write(b"Hello, World!")
        return f

Example (Streaming Read):

    @env.task
    async def stream_read(f: File) -> str:
        content_parts = []
        async with f.open("rb", block_size=1024) as fh:
            while True:
                chunk = await fh.read()
                if not chunk:
                    break
                content_parts.append(chunk)
        return b"".join(content_parts).decode("utf-8")

| Parameter | Type | Description |
|---|---|---|
| mode | str | The mode to open the file in (default: ‘rb’). Common modes: ‘rb’ (read binary), ‘wb’ (write binary), ‘rt’ (read text), ‘wt’ (write text) |
| block_size | Optional[int] | Size of blocks for reading in bytes. Useful for streaming large files. |
| cache_type | str | Caching mechanism to use (‘readahead’, ‘mmap’, ‘bytes’, ‘none’) |
| cache_options | Optional[dict] | Dictionary of options for the cache |
| compression | Optional[str] | Compression format or None for auto-detection |
| kwargs | **kwargs | |

Returns: An async file-like object that can be used with async read/write operations
open_sync()

    def open_sync(
        mode: str,
        block_size: Optional[int],
        cache_type: str,
        cache_options: Optional[dict],
        compression: Optional[str],
        **kwargs,
    ) -> Generator[IO[Any], None, None]

Synchronously open the file and return a file-like object.
Use this method in non-async tasks to read from or write to files directly.

Example (Sync Read):

    @env.task
    def read_file_sync(f: File) -> str:
        with f.open_sync("rb") as fh:
            content = fh.read()
        return content.decode("utf-8")

Example (Sync Write):

    @env.task
    def write_file_sync() -> File:
        f = File.new_remote()
        with f.open_sync("wb") as fh:
            fh.write(b"Hello, World!")
        return f

| Parameter | Type | Description |
|---|---|---|
| mode | str | The mode to open the file in (default: ‘rb’). Common modes: ‘rb’ (read binary), ‘wb’ (write binary), ‘rt’ (read text), ‘wt’ (write text) |
| block_size | Optional[int] | Size of blocks for reading in bytes. Useful for streaming large files. |
| cache_type | str | Caching mechanism to use (‘readahead’, ‘mmap’, ‘bytes’, ‘none’) |
| cache_options | Optional[dict] | Dictionary of options for the cache |
| compression | Optional[str] | Compression format or None for auto-detection |
| kwargs | **kwargs | |

Returns: A file-like object that can be used with standard read/write operations
pre_init()

    def pre_init(
        data,
    )

Internal: Pydantic validator to set default name from path. Not intended for direct use.

| Parameter | Type | Description |
|---|---|---|
| data | | |
schema_match()

    def schema_match(
        incoming: dict,
    )

Internal: Check if incoming schema matches File schema. Not intended for direct use.

| Parameter | Type | Description |
|---|---|---|
| incoming | dict | |
writer()

    def writer(
        flush_bytes: int,
        compression_level: int,
    ) -> AsyncGenerator[JsonlWriter, None]

Async context manager returning a JsonlWriter for streaming writes.
If the file path ends in .jsonl.zst or .jsonl.zstd, output is zstd-compressed.

| Parameter | Type | Description |
|---|---|---|
| flush_bytes | int | Buffer flush threshold in bytes (default 1 MB). |
| compression_level | int | Zstd compression level (default 3). Only used for .jsonl.zst / .jsonl.zstd paths. Higher = smaller files, slower writes. |
writer_sync()

    def writer_sync(
        flush_bytes: int,
        compression_level: int,
    ) -> Generator[JsonlWriterSync, None, None]

Sync context manager returning a JsonlWriterSync for streaming writes.
If the file path ends in .jsonl.zst or .jsonl.zstd, output is zstd-compressed.

| Parameter | Type | Description |
|---|---|---|
| flush_bytes | int | Buffer flush threshold in bytes (default 1 MB). |
| compression_level | int | Zstd compression level (default 3). Only used for .jsonl.zst / .jsonl.zstd paths. Higher = smaller files, slower writes. |
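flush_bytes trades memory for fewer remote writes: records accumulate in an in-memory buffer that is flushed once the encoded size crosses the threshold. A stdlib sketch of that buffering policy (`BufferedLineWriter` is an illustrative stand-in, not the real JsonlWriterSync, which encodes with orjson and may zstd-compress on flush):

```python
import json

class BufferedLineWriter:
    # Buffer encoded records and flush them to `sink` as one chunk
    # whenever the buffer reaches flush_bytes.
    def __init__(self, sink: list[bytes], flush_bytes: int = 1_000_000) -> None:
        self.sink = sink
        self.flush_bytes = flush_bytes
        self.buffer = bytearray()

    def write(self, record: dict) -> None:
        self.buffer += json.dumps(record).encode() + b"\n"
        if len(self.buffer) >= self.flush_bytes:
            self.flush()

    def flush(self) -> None:
        if self.buffer:
            self.sink.append(bytes(self.buffer))
            self.buffer.clear()

chunks: list[bytes] = []
w = BufferedLineWriter(chunks, flush_bytes=16)
for i in range(3):
    w.write({"i": i})   # each encoded record is 9 bytes including newline
w.flush()               # a context manager would do this final flush on exit
```

A larger flush_bytes means fewer, bigger writes (and better zstd ratios when compressing); a smaller value bounds memory more tightly.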