JsonlFile
Package: flyteplugins.jsonl
A file type for JSONL (JSON Lines) files, backed by orjson for fast
serialisation.
Provides streaming read and write methods that process one record at a time
without loading the entire file into memory. Inherits all `File`
capabilities (remote storage, upload/download, etc.).
Supports zstd-compressed files transparently via extension detection
(.jsonl.zst / .jsonl.zstd).
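The extension-based detection described above can be pictured with a small sketch. This is illustrative only, not the library's actual code; the function name `is_zstd_path` is hypothetical.

```python
from pathlib import Path

# Extensions treated as zstd-compressed (per the docs above)
ZSTD_SUFFIXES = (".jsonl.zst", ".jsonl.zstd")

def is_zstd_path(path: str) -> bool:
    """Hypothetical sketch: decide compression purely from the file name."""
    return Path(path).name.lower().endswith(ZSTD_SUFFIXES)

print(is_zstd_path("s3://bucket/data.jsonl.zst"))  # True
print(is_zstd_path("data.jsonl"))                  # False
```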
Example (Async read - compressed or uncompressed):

```python
@env.task
async def process(f: JsonlFile):
    async for record in f.iter_records():
        print(record)
```

Example (Async write - compressed or uncompressed):

```python
@env.task
async def create() -> JsonlFile:
    f = JsonlFile.new_remote("data.jsonl")
    async with f.writer() as w:
        await w.write({"key": "value"})
    return f
```

Example (Sync write - compressed or uncompressed):

```python
@env.task
def create() -> JsonlFile:
    f = JsonlFile.new_remote("data.jsonl")
    with f.writer_sync() as w:
        w.write({"key": "value"})
    return f
```

```python
class JsonlFile(
    data: Any,
)
```

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.
| Parameter | Type | Description |
|---|---|---|
| data | Any |  |

Properties

| Property | Type | Description |
|---|---|---|
| lazy_uploader | None |  |
| model_extra | None | Get extra fields set during validation. Returns a dictionary of extra fields, or None if config.extra is not set to "allow". |
| model_fields_set | None | The set of fields that have been explicitly set on this model instance, i.e. that were not filled from defaults. |
Methods
| Method | Description |
|---|---|
| construct() |  |
| copy() | Returns a copy of the model. |
| dict() |  |
| download() | Asynchronously download the file to a local path. |
| download_sync() | Synchronously download the file to a local path. |
| exists() | Asynchronously check if the file exists. |
| exists_sync() | Synchronously check if the file exists. |
| from_existing_remote() | Create a File reference from an existing remote file. |
| from_local() | Asynchronously create a new File object from a local file by uploading it to remote storage. |
| from_local_sync() | Synchronously create a new File object from a local file by uploading it to remote storage. |
| from_orm() |  |
| iter_arrow_batches() | Stream JSONL as Arrow RecordBatches. |
| iter_arrow_batches_sync() | Sync generator that yields Arrow RecordBatches. |
| iter_records() | Async generator that yields parsed dicts line by line. |
| iter_records_sync() | Sync generator that yields parsed dicts line by line. |
| json() |  |
| model_construct() | Creates a new instance of the Model class with validated data. |
| model_copy() | Returns a copy of the model. |
| model_dump() | Generate a dictionary representation of the model, optionally specifying which fields to include or exclude. |
| model_dump_json() | Generates a JSON representation of the model using Pydantic's to_json method. |
| model_json_schema() | Generates a JSON schema for a model class. |
| model_parametrized_name() | Compute the class name for parametrizations of generic classes. |
| model_post_init() | This function is meant to behave like a BaseModel method to initialise private attributes. |
| model_rebuild() | Try to rebuild the pydantic-core schema for the model. |
| model_validate() | Validate a pydantic model instance. |
| model_validate_json() | Validate the given JSON data against the Pydantic model. |
| model_validate_strings() | Validate the given object with string data against the Pydantic model. |
| named_remote() | Create a File reference whose remote path is derived deterministically from name. |
| new_remote() | Create a new File reference for a remote file that will be written to. |
| open() | Asynchronously open the file and return a file-like object. |
| open_sync() | Synchronously open the file and return a file-like object. |
| parse_file() |  |
| parse_obj() |  |
| parse_raw() |  |
| pre_init() | Internal: Pydantic validator to set default name from path. |
| schema() |  |
| schema_json() |  |
| schema_match() | Internal: Check if incoming schema matches File schema. |
| update_forward_refs() |  |
| validate() |  |
| writer() | Async context manager returning a `JsonlWriter` for streaming writes. |
| writer_sync() | Sync context manager returning a `JsonlWriterSync` for streaming writes. |
construct()
```python
def construct(
    _fields_set: set[str] | None,
    values: Any,
) -> Self
```

| Parameter | Type | Description |
|---|---|---|
| _fields_set | set[str] \| None |  |
| values | Any |  |
copy()
```python
def copy(
    include: AbstractSetIntStr | MappingIntStrAny | None,
    exclude: AbstractSetIntStr | MappingIntStrAny | None,
    update: Dict[str, Any] | None,
    deep: bool,
) -> Self
```

Returns a copy of the model.

This method is now deprecated; use model_copy instead.

If you need include or exclude, use:

```python
data = self.model_dump(include=include, exclude=exclude, round_trip=True)
data = {**data, **(update or {})}
copied = self.model_validate(data)
```

| Parameter | Type | Description |
|---|---|---|
| include | AbstractSetIntStr \| MappingIntStrAny \| None | Optional set or mapping specifying which fields to include in the copied model. |
| exclude | AbstractSetIntStr \| MappingIntStrAny \| None | Optional set or mapping specifying which fields to exclude in the copied model. |
| update | Dict[str, Any] \| None | Optional dictionary of field-value pairs to override field values in the copied model. |
| deep | bool | If True, the values of fields that are Pydantic models will be deep-copied. |
dict()
```python
def dict(
    include: IncEx | None,
    exclude: IncEx | None,
    by_alias: bool,
    exclude_unset: bool,
    exclude_defaults: bool,
    exclude_none: bool,
) -> Dict[str, Any]
```

| Parameter | Type | Description |
|---|---|---|
| include | IncEx \| None |  |
| exclude | IncEx \| None |  |
| by_alias | bool |  |
| exclude_unset | bool |  |
| exclude_defaults | bool |  |
| exclude_none | bool |  |
download()
```python
def download(
    local_path: Optional[Union[str, Path]],
) -> str
```

Asynchronously download the file to a local path.

Use this when you need to download a remote file to your local filesystem for processing.

Example (Async):

```python
@env.task
async def download_and_process(f: File) -> str:
    local_path = await f.download()
    # Now process the local file
    with open(local_path, "r") as fh:
        return fh.read()
```

Example (Download to specific path):

```python
@env.task
async def download_to_path(f: File) -> str:
    local_path = await f.download("/tmp/myfile.csv")
    return local_path
```

| Parameter | Type | Description |
|---|---|---|
| local_path | Optional[Union[str, Path]] | The local path to download the file to. If None, a temporary directory will be used and a path will be generated. |
download_sync()
```python
def download_sync(
    local_path: Optional[Union[str, Path]],
) -> str
```

Synchronously download the file to a local path.

Use this in non-async tasks when you need to download a remote file to your local filesystem.

Example (Sync):

```python
@env.task
def download_and_process_sync(f: File) -> str:
    local_path = f.download_sync()
    # Now process the local file
    with open(local_path, "r") as fh:
        return fh.read()
```

Example (Download to specific path):

```python
@env.task
def download_to_path_sync(f: File) -> str:
    local_path = f.download_sync("/tmp/myfile.csv")
    return local_path
```

| Parameter | Type | Description |
|---|---|---|
| local_path | Optional[Union[str, Path]] | The local path to download the file to. If None, a temporary directory will be used and a path will be generated. |
exists()
```python
def exists()
```

Asynchronously check if the file exists.

Example (Async):

```python
@env.task
async def check_file(f: File) -> bool:
    if await f.exists():
        print("File exists!")
        return True
    return False
```

Returns: True if the file exists, False otherwise.
exists_sync()
```python
def exists_sync()
```

Synchronously check if the file exists.

Use this in non-async tasks or when you need synchronous file existence checking.

Example (Sync):

```python
@env.task
def check_file_sync(f: File) -> bool:
    if f.exists_sync():
        print("File exists!")
        return True
    return False
```

Returns: True if the file exists, False otherwise.
from_existing_remote()
```python
def from_existing_remote(
    remote_path: str,
    file_cache_key: Optional[str],
) -> File[T]
```

Create a File reference from an existing remote file.

Use this when you want to reference a file that already exists in remote storage without uploading it.

Example:

```python
@env.task
async def process_existing_file() -> str:
    file = File.from_existing_remote("s3://my-bucket/data.csv")
    async with file.open("rb") as f:
        content = await f.read()
    return content.decode("utf-8")
```

| Parameter | Type | Description |
|---|---|---|
| remote_path | str | The remote path to the existing file. |
| file_cache_key | Optional[str] | Optional hash value to use for cache key computation. If not specified, the cache key will be computed based on the file's attributes (path, name, format). |
from_local()
```python
def from_local(
    local_path: Union[str, Path],
    remote_destination: Optional[str],
    hash_method: Optional[HashMethod | str],
) -> File[T]
```

Asynchronously create a new File object from a local file by uploading it to remote storage.

Use this in async tasks when you have a local file that needs to be uploaded to remote storage.

Example (Async):

```python
@env.task
async def upload_local_file() -> File:
    # Create a local file
    async with aiofiles.open("/tmp/data.csv", "w") as f:
        await f.write("col1,col2\n")
    # Upload to remote storage
    remote_file = await File.from_local("/tmp/data.csv")
    return remote_file
```

Example (With specific destination):

```python
@env.task
async def upload_to_specific_path() -> File:
    remote_file = await File.from_local("/tmp/data.csv", "s3://my-bucket/data.csv")
    return remote_file
```

| Parameter | Type | Description |
|---|---|---|
| local_path | Union[str, Path] | Path to the local file. |
| remote_destination | Optional[str] | Optional remote path to store the file. If None, a path will be automatically generated. |
| hash_method | Optional[HashMethod \| str] | Optional HashMethod or string to use for cache key computation. If a string is provided, it will be used as a precomputed cache key. If a HashMethod is provided, it will compute the hash during upload. If not specified, the cache key will be based on file attributes. |
from_local_sync()
```python
def from_local_sync(
    local_path: Union[str, Path],
    remote_destination: Optional[str],
    hash_method: Optional[HashMethod | str],
) -> File[T]
```

Synchronously create a new File object from a local file by uploading it to remote storage.

Use this in non-async tasks when you have a local file that needs to be uploaded to remote storage.

Example (Sync):

```python
@env.task
def upload_local_file_sync() -> File:
    # Create a local file
    with open("/tmp/data.csv", "w") as f:
        f.write("col1,col2\n")
    # Upload to remote storage
    remote_file = File.from_local_sync("/tmp/data.csv")
    return remote_file
```

Example (With specific destination):

```python
@env.task
def upload_to_specific_path() -> File:
    remote_file = File.from_local_sync("/tmp/data.csv", "s3://my-bucket/data.csv")
    return remote_file
```

| Parameter | Type | Description |
|---|---|---|
| local_path | Union[str, Path] | Path to the local file. |
| remote_destination | Optional[str] | Optional remote path to store the file. If None, a path will be automatically generated. |
| hash_method | Optional[HashMethod \| str] | Optional HashMethod or string to use for cache key computation. If a string is provided, it will be used as a precomputed cache key. If a HashMethod is provided, it will compute the hash during upload. If not specified, the cache key will be based on file attributes. |
from_orm()
```python
def from_orm(
    obj: Any,
) -> Self
```

| Parameter | Type | Description |
|---|---|---|
| obj | Any |  |
iter_arrow_batches()
```python
def iter_arrow_batches(
    batch_size: int,
    on_error: Literal['raise', 'skip'] | ErrorHandler,
) -> AsyncGenerator[Any, None]
```

Stream JSONL as Arrow RecordBatches.

Memory usage is bounded by batch_size.

| Parameter | Type | Description |
|---|---|---|
| batch_size | int |  |
| on_error | Literal['raise', 'skip'] \| ErrorHandler |  |
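The bounded-memory behavior comes from chunking records as they stream in. The sketch below illustrates that chunking with stdlib types only; the real method yields pyarrow RecordBatch objects from the (possibly remote) file, and `batched_records` is a hypothetical name.

```python
import json
from typing import Iterable, Iterator

def batched_records(lines: Iterable[str], batch_size: int) -> Iterator[list[dict]]:
    """Group parsed JSONL records into lists of at most batch_size items.

    Illustrative sketch only: memory holds one batch at a time, never the
    whole file, which is what bounds memory by batch_size.
    """
    batch: list[dict] = []
    for line in lines:
        if not line.strip():
            continue  # ignore blank lines
        batch.append(json.loads(line))
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch

lines = ['{"a": 1}', '{"a": 2}', '{"a": 3}']
print([len(b) for b in batched_records(lines, batch_size=2)])  # [2, 1]
```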
iter_arrow_batches_sync()
```python
def iter_arrow_batches_sync(
    batch_size: int,
    on_error: Literal['raise', 'skip'] | ErrorHandler,
) -> Generator[Any, None, None]
```

Sync generator that yields Arrow RecordBatches.

Memory usage is bounded by batch_size.

| Parameter | Type | Description |
|---|---|---|
| batch_size | int |  |
| on_error | Literal['raise', 'skip'] \| ErrorHandler |  |
iter_records()
```python
def iter_records(
    on_error: Literal['raise', 'skip'] | ErrorHandler,
) -> AsyncGenerator[dict[str, Any], None]
```

Async generator that yields parsed dicts line by line.

| Parameter | Type | Description |
|---|---|---|
| on_error | Literal['raise', 'skip'] \| ErrorHandler |  |
iter_records_sync()
```python
def iter_records_sync(
    on_error: Literal['raise', 'skip'] | ErrorHandler,
) -> Generator[dict[str, Any], None, None]
```

Sync generator that yields parsed dicts line by line.

| Parameter | Type | Description |
|---|---|---|
| on_error | Literal['raise', 'skip'] \| ErrorHandler |  |
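The two built-in on_error policies behave as this stdlib sketch suggests: 'skip' drops a malformed line and keeps streaming, 'raise' surfaces the parse error. The function `iter_jsonl` is a hypothetical stand-in for iter_records_sync; the real method reads from the (possibly remote, possibly zstd-compressed) file rather than a list.

```python
import json
from typing import Iterator, Literal

def iter_jsonl(lines: list[str], on_error: Literal["raise", "skip"] = "raise") -> Iterator[dict]:
    """Sketch of line-by-line JSONL parsing with the two error policies."""
    for line in lines:
        if not line.strip():
            continue  # blank lines carry no record
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            if on_error == "skip":
                continue  # drop the malformed line and keep streaming
            raise

lines = ['{"ok": 1}', "not json", '{"ok": 2}']
print(list(iter_jsonl(lines, on_error="skip")))  # [{'ok': 1}, {'ok': 2}]
```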
json()
```python
def json(
    include: IncEx | None,
    exclude: IncEx | None,
    by_alias: bool,
    exclude_unset: bool,
    exclude_defaults: bool,
    exclude_none: bool,
    encoder: Callable[[Any], Any] | None,
    models_as_dict: bool,
    dumps_kwargs: Any,
) -> str
```

| Parameter | Type | Description |
|---|---|---|
| include | IncEx \| None |  |
| exclude | IncEx \| None |  |
| by_alias | bool |  |
| exclude_unset | bool |  |
| exclude_defaults | bool |  |
| exclude_none | bool |  |
| encoder | Callable[[Any], Any] \| None |  |
| models_as_dict | bool |  |
| dumps_kwargs | Any |  |
model_construct()
```python
def model_construct(
    _fields_set: set[str] | None,
    values: Any,
) -> Self
```

Creates a new instance of the Model class with validated data.

Creates a new model setting __dict__ and __pydantic_fields_set__ from trusted or pre-validated data. Default values are respected, but no other validation is performed.

model_construct() generally respects the model_config.extra setting on the provided model. That is, if model_config.extra == 'allow', then all extra passed values are added to the model instance's __dict__ and __pydantic_extra__ fields. If model_config.extra == 'ignore' (the default), then all extra passed values are ignored. Because no validation is performed with a call to model_construct(), having model_config.extra == 'forbid' does not result in an error if extra values are passed, but they will be ignored.

| Parameter | Type | Description |
|---|---|---|
| _fields_set | set[str] \| None | A set of field names that were originally explicitly set during instantiation. If provided, this is directly used for the model_fields_set attribute. Otherwise, the field names from the values argument will be used. |
| values | Any | Trusted or pre-validated data dictionary. |
model_copy()
```python
def model_copy(
    update: Mapping[str, Any] | None,
    deep: bool,
) -> Self
```

Returns a copy of the model.

The underlying instance's __dict__ attribute is copied. This might have unexpected side effects if you store anything in it, on top of the model fields (e.g. the value of cached properties).

| Parameter | Type | Description |
|---|---|---|
| update | Mapping[str, Any] \| None |  |
| deep | bool | Set to True to make a deep copy of the model. |
model_dump()
```python
def model_dump(
    mode: Literal['json', 'python'] | str,
    include: IncEx | None,
    exclude: IncEx | None,
    context: Any | None,
    by_alias: bool | None,
    exclude_unset: bool,
    exclude_defaults: bool,
    exclude_none: bool,
    exclude_computed_fields: bool,
    round_trip: bool,
    warnings: bool | Literal['none', 'warn', 'error'],
    fallback: Callable[[Any], Any] | None,
    serialize_as_any: bool,
) -> dict[str, Any]
```

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

| Parameter | Type | Description |
|---|---|---|
| mode | Literal['json', 'python'] \| str | The mode in which to_python should run. If mode is 'json', the output will only contain JSON serializable types. If mode is 'python', the output may contain non-JSON-serializable Python objects. |
| include | IncEx \| None | A set of fields to include in the output. |
| exclude | IncEx \| None | A set of fields to exclude from the output. |
| context | Any \| None | Additional context to pass to the serializer. |
| by_alias | bool \| None | Whether to use the field's alias in the dictionary key if defined. |
| exclude_unset | bool | Whether to exclude fields that have not been explicitly set. |
| exclude_defaults | bool | Whether to exclude fields that are set to their default value. |
| exclude_none | bool | Whether to exclude fields that have a value of None. |
| exclude_computed_fields | bool | Whether to exclude computed fields. While this can be useful for round-tripping, it is usually recommended to use the dedicated round_trip parameter instead. |
| round_trip | bool | If True, dumped values should be valid as input for non-idempotent types such as Json[T]. |
| warnings | bool \| Literal['none', 'warn', 'error'] | How to handle serialization errors. False/"none" ignores them, True/"warn" logs errors, "error" raises a PydanticSerializationError. |
| fallback | Callable[[Any], Any] \| None | A function to call when an unknown value is encountered. If not provided, a PydanticSerializationError is raised. |
| serialize_as_any | bool | Whether to serialize fields with duck-typing serialization behavior. |
model_dump_json()
```python
def model_dump_json(
    indent: int | None,
    ensure_ascii: bool,
    include: IncEx | None,
    exclude: IncEx | None,
    context: Any | None,
    by_alias: bool | None,
    exclude_unset: bool,
    exclude_defaults: bool,
    exclude_none: bool,
    exclude_computed_fields: bool,
    round_trip: bool,
    warnings: bool | Literal['none', 'warn', 'error'],
    fallback: Callable[[Any], Any] | None,
    serialize_as_any: bool,
) -> str
```

Generates a JSON representation of the model using Pydantic's to_json method.

| Parameter | Type | Description |
|---|---|---|
| indent | int \| None | Indentation to use in the JSON output. If None is passed, the output will be compact. |
| ensure_ascii | bool | If True, the output is guaranteed to have all incoming non-ASCII characters escaped. If False (the default), these characters will be output as-is. |
| include | IncEx \| None | Field(s) to include in the JSON output. |
| exclude | IncEx \| None | Field(s) to exclude from the JSON output. |
| context | Any \| None | Additional context to pass to the serializer. |
| by_alias | bool \| None | Whether to serialize using field aliases. |
| exclude_unset | bool | Whether to exclude fields that have not been explicitly set. |
| exclude_defaults | bool | Whether to exclude fields that are set to their default value. |
| exclude_none | bool | Whether to exclude fields that have a value of None. |
| exclude_computed_fields | bool | Whether to exclude computed fields. While this can be useful for round-tripping, it is usually recommended to use the dedicated round_trip parameter instead. |
| round_trip | bool | If True, dumped values should be valid as input for non-idempotent types such as Json[T]. |
| warnings | bool \| Literal['none', 'warn', 'error'] | How to handle serialization errors. False/"none" ignores them, True/"warn" logs errors, "error" raises a PydanticSerializationError. |
| fallback | Callable[[Any], Any] \| None | A function to call when an unknown value is encountered. If not provided, a PydanticSerializationError is raised. |
| serialize_as_any | bool | Whether to serialize fields with duck-typing serialization behavior. |
model_json_schema()
```python
def model_json_schema(
    by_alias: bool,
    ref_template: str,
    schema_generator: type[GenerateJsonSchema],
    mode: JsonSchemaMode,
    union_format: Literal['any_of', 'primitive_type_array'],
) -> dict[str, Any]
```

Generates a JSON schema for a model class.

| Parameter | Type | Description |
|---|---|---|
| by_alias | bool | Whether to use attribute aliases or not. |
| ref_template | str | The reference template. |
| schema_generator | type[GenerateJsonSchema] | To override the logic used to generate the JSON schema, as a subclass of GenerateJsonSchema with your desired modifications. |
| mode | JsonSchemaMode | The mode in which to generate the schema. |
| union_format | Literal['any_of', 'primitive_type_array'] | 'any_of': Use the anyOf keyword to combine schemas (the default). 'primitive_type_array': Use the type keyword as an array of strings, containing each type of the combination. If any of the schemas is not a primitive type (string, boolean, null, integer or number) or contains constraints/metadata, falls back to any_of. |
model_parametrized_name()
```python
def model_parametrized_name(
    params: tuple[type[Any], ...],
) -> str
```

Compute the class name for parametrizations of generic classes.

This method can be overridden to achieve a custom naming scheme for generic BaseModels.

| Parameter | Type | Description |
|---|---|---|
| params | tuple[type[Any], ...] | Tuple of types of the class. Given a generic class Model with 2 type variables and a concrete model Model[str, int], the value (str, int) would be passed to params. |
model_post_init()
```python
def model_post_init(
    context: Any,
)
```

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

| Parameter | Type | Description |
|---|---|---|
| context | Any | The context. |
model_rebuild()
```python
def model_rebuild(
    force: bool,
    raise_errors: bool,
    _parent_namespace_depth: int,
    _types_namespace: MappingNamespace | None,
) -> bool | None
```

Try to rebuild the pydantic-core schema for the model.

This may be necessary when one of the annotations is a ForwardRef which could not be resolved during the initial attempt to build the schema, and automatic rebuilding fails.

| Parameter | Type | Description |
|---|---|---|
| force | bool | Whether to force the rebuilding of the model schema, defaults to False. |
| raise_errors | bool | Whether to raise errors, defaults to True. |
| _parent_namespace_depth | int | The depth level of the parent namespace, defaults to 2. |
| _types_namespace | MappingNamespace \| None | The types namespace, defaults to None. |
model_validate()
```python
def model_validate(
    obj: Any,
    strict: bool | None,
    extra: ExtraValues | None,
    from_attributes: bool | None,
    context: Any | None,
    by_alias: bool | None,
    by_name: bool | None,
) -> Self
```

Validate a pydantic model instance.

| Parameter | Type | Description |
|---|---|---|
| obj | Any | The object to validate. |
| strict | bool \| None | Whether to enforce types strictly. |
| extra | ExtraValues \| None | Whether to ignore, allow, or forbid extra data during model validation. See the extra configuration value for details. |
| from_attributes | bool \| None | Whether to extract data from object attributes. |
| context | Any \| None | Additional context to pass to the validator. |
| by_alias | bool \| None | Whether to use the field's alias when validating against the provided input data. |
| by_name | bool \| None | Whether to use the field's name when validating against the provided input data. |
model_validate_json()
```python
def model_validate_json(
    json_data: str | bytes | bytearray,
    strict: bool | None,
    extra: ExtraValues | None,
    context: Any | None,
    by_alias: bool | None,
    by_name: bool | None,
) -> Self
```

Validate the given JSON data against the Pydantic model.

| Parameter | Type | Description |
|---|---|---|
| json_data | str \| bytes \| bytearray | The JSON data to validate. |
| strict | bool \| None | Whether to enforce types strictly. |
| extra | ExtraValues \| None | Whether to ignore, allow, or forbid extra data during model validation. See the extra configuration value for details. |
| context | Any \| None | Extra variables to pass to the validator. |
| by_alias | bool \| None | Whether to use the field's alias when validating against the provided input data. |
| by_name | bool \| None | Whether to use the field's name when validating against the provided input data. |
model_validate_strings()
```python
def model_validate_strings(
    obj: Any,
    strict: bool | None,
    extra: ExtraValues | None,
    context: Any | None,
    by_alias: bool | None,
    by_name: bool | None,
) -> Self
```

Validate the given object with string data against the Pydantic model.

| Parameter | Type | Description |
|---|---|---|
| obj | Any | The object containing string data to validate. |
| strict | bool \| None | Whether to enforce types strictly. |
| extra | ExtraValues \| None | Whether to ignore, allow, or forbid extra data during model validation. See the extra configuration value for details. |
| context | Any \| None | Extra variables to pass to the validator. |
| by_alias | bool \| None | Whether to use the field's alias when validating against the provided input data. |
| by_name | bool \| None | Whether to use the field's name when validating against the provided input data. |
named_remote()
```python
def named_remote(
    name: str,
) -> File[T]
```

Create a File reference whose remote path is derived deterministically from name.

Unlike `new_remote`, which generates a random path on every call, this method produces the same path for the same name within a given task execution. This makes it safe across retries: the first attempt uploads to the path and subsequent retries resolve to the identical location without re-uploading.

The path is optionally namespaced by the node ID extracted from the backend raw-data path, which follows the convention {run_name}-{node_id}-{attempt_index}. If extraction fails, the function falls back to the run base directory alone.

| Parameter | Type | Description |
|---|---|---|
| name | str | Plain filename (e.g., "data.csv"). Must not contain path separators. |
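The retry-safety property described above can be sketched as follows. This is a hypothetical illustration, not the library's path logic: `derive_remote_path`, `run_base`, and `node_id` are made-up names; the point is only that the derived path depends on the run, node, and name, never on the attempt index.

```python
def derive_remote_path(run_base: str, node_id: str, name: str) -> str:
    """Hypothetical sketch: deterministic path from (run, node, name).

    Because the attempt index is not part of the path, every retry of the
    same node resolves to the identical remote location.
    """
    if "/" in name or "\\" in name:
        raise ValueError("name must be a plain filename without path separators")
    return f"{run_base.rstrip('/')}/{node_id}/{name}"

# Two attempts of the same node produce the identical path:
p1 = derive_remote_path("s3://bucket/run-abc", "n0", "data.csv")
p2 = derive_remote_path("s3://bucket/run-abc", "n0", "data.csv")
print(p1 == p2)  # True
```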
new_remote()
```python
def new_remote(
    file_name: Optional[str],
    hash_method: Optional[HashMethod | str],
) -> File[T]
```

Create a new File reference for a remote file that will be written to.

Use this when you want to create a new file and write to it directly without creating a local file first.

Example (Async):

```python
@env.task
async def create_csv() -> File:
    df = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
    file = File.new_remote()
    async with file.open("wb") as f:
        df.to_csv(f)
    return file
```

| Parameter | Type | Description |
|---|---|---|
| file_name | Optional[str] | Optional string specifying a remote file name. If not set, a generated file name will be returned. |
| hash_method | Optional[HashMethod \| str] | Optional HashMethod or string to use for cache key computation. If a string is provided, it will be used as a precomputed cache key. If a HashMethod is provided, it will be used to compute the hash as data is written. |
open()
```python
def open(
    mode: str,
    block_size: Optional[int],
    cache_type: str,
    cache_options: Optional[dict],
    compression: Optional[str],
    kwargs,
) -> AsyncGenerator[Union[AsyncWritableFile, AsyncReadableFile, 'HashingWriter'], None]
```

Asynchronously open the file and return a file-like object.

Use this method in async tasks to read from or write to files directly.

Example (Async Read):

```python
@env.task
async def read_file(f: File) -> str:
    async with f.open("rb") as fh:
        content = bytes(await fh.read())
    return content.decode("utf-8")
```

Example (Async Write):

```python
@env.task
async def write_file() -> File:
    f = File.new_remote()
    async with f.open("wb") as fh:
        await fh.write(b"Hello, World!")
    return f
```

Example (Streaming Read):

```python
@env.task
async def stream_read(f: File) -> str:
    content_parts = []
    async with f.open("rb", block_size=1024) as fh:
        while True:
            chunk = await fh.read()
            if not chunk:
                break
            content_parts.append(chunk)
    return b"".join(content_parts).decode("utf-8")
```

| Parameter | Type | Description |
|---|---|---|
| mode | str |  |
| block_size | Optional[int] | Size of blocks for reading in bytes. Useful for streaming large files. |
| cache_type | str | Caching mechanism to use ('readahead', 'mmap', 'bytes', 'none'). |
| cache_options | Optional[dict] | Dictionary of options for the cache. |
| compression | Optional[str] | Compression format, or None for auto-detection. |
| kwargs | **kwargs |  |
open_sync()
```python
def open_sync(
    mode: str,
    block_size: Optional[int],
    cache_type: str,
    cache_options: Optional[dict],
    compression: Optional[str],
    kwargs,
) -> Generator[IO[Any], None, None]
```

Synchronously open the file and return a file-like object.

Use this method in non-async tasks to read from or write to files directly.

Example (Sync Read):

```python
@env.task
def read_file_sync(f: File) -> str:
    with f.open_sync("rb") as fh:
        content = fh.read()
    return content.decode("utf-8")
```

Example (Sync Write):

```python
@env.task
def write_file_sync() -> File:
    f = File.new_remote()
    with f.open_sync("wb") as fh:
        fh.write(b"Hello, World!")
    return f
```

| Parameter | Type | Description |
|---|---|---|
| mode | str |  |
| block_size | Optional[int] | Size of blocks for reading in bytes. Useful for streaming large files. |
| cache_type | str | Caching mechanism to use ('readahead', 'mmap', 'bytes', 'none'). |
| cache_options | Optional[dict] | Dictionary of options for the cache. |
| compression | Optional[str] | Compression format, or None for auto-detection. |
| kwargs | **kwargs |  |
parse_file()
```python
def parse_file(
    path: str | Path,
    content_type: str | None,
    encoding: str,
    proto: DeprecatedParseProtocol | None,
    allow_pickle: bool,
) -> Self
```

| Parameter | Type | Description |
|---|---|---|
| path | str \| Path |  |
| content_type | str \| None |  |
| encoding | str |  |
| proto | DeprecatedParseProtocol \| None |  |
| allow_pickle | bool |  |
parse_obj()
```python
def parse_obj(
    obj: Any,
) -> Self
```

| Parameter | Type | Description |
|---|---|---|
| obj | Any |  |
parse_raw()
```python
def parse_raw(
    b: str | bytes,
    content_type: str | None,
    encoding: str,
    proto: DeprecatedParseProtocol | None,
    allow_pickle: bool,
) -> Self
```

| Parameter | Type | Description |
|---|---|---|
| b | str \| bytes |  |
| content_type | str \| None |  |
| encoding | str |  |
| proto | DeprecatedParseProtocol \| None |  |
| allow_pickle | bool |  |
pre_init()
```python
def pre_init(
    data,
)
```

Internal: Pydantic validator to set default name from path. Not intended for direct use.

| Parameter | Type | Description |
|---|---|---|
| data |  |  |
schema()
```python
def schema(
    by_alias: bool,
    ref_template: str,
) -> Dict[str, Any]
```

| Parameter | Type | Description |
|---|---|---|
| by_alias | bool |  |
| ref_template | str |  |
schema_json()
```python
def schema_json(
    by_alias: bool,
    ref_template: str,
    dumps_kwargs: Any,
) -> str
```

| Parameter | Type | Description |
|---|---|---|
| by_alias | bool |  |
| ref_template | str |  |
| dumps_kwargs | Any |  |
schema_match()
```python
def schema_match(
    incoming: dict,
)
```

Internal: Check if incoming schema matches File schema. Not intended for direct use.

| Parameter | Type | Description |
|---|---|---|
| incoming | dict |  |
update_forward_refs()
```python
def update_forward_refs(
    localns: Any,
)
```

| Parameter | Type | Description |
|---|---|---|
| localns | Any |  |
validate()
```python
def validate(
    value: Any,
) -> Self
```

| Parameter | Type | Description |
|---|---|---|
| value | Any |  |
writer()
```python
def writer(
    flush_bytes: int,
    compression_level: int,
) -> AsyncGenerator[JsonlWriter, None]
```

Async context manager returning a `JsonlWriter` for streaming writes.

If the file path ends in .jsonl.zst, output is zstd-compressed.

| Parameter | Type | Description |
|---|---|---|
| flush_bytes | int | Buffer flush threshold in bytes (default 1 MB). |
| compression_level | int | Zstd compression level (default 3). Only used for .jsonl.zst paths. Higher = smaller files, slower writes. |
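The flush_bytes buffering can be pictured with a minimal stdlib sketch. `BufferedJsonlWriter` is a hypothetical stand-in for JsonlWriter (the real class serializes with orjson and may write through a zstd compressor): records accumulate in an in-memory buffer and are flushed to the underlying stream once the buffer crosses the threshold.

```python
import io
import json

class BufferedJsonlWriter:
    """Hypothetical sketch of flush_bytes buffering for JSONL writes."""

    def __init__(self, sink: io.BytesIO, flush_bytes: int = 1024 * 1024):
        self.sink = sink
        self.flush_bytes = flush_bytes
        self._buf = bytearray()

    def write(self, record: dict) -> None:
        # One JSON document per line, newline-terminated
        self._buf += json.dumps(record).encode() + b"\n"
        if len(self._buf) >= self.flush_bytes:
            self.flush()

    def flush(self) -> None:
        self.sink.write(bytes(self._buf))
        self._buf.clear()

    def close(self) -> None:
        # Always flush any remaining buffered records on close
        self.flush()

sink = io.BytesIO()
w = BufferedJsonlWriter(sink, flush_bytes=16)
w.write({"k": 1})  # 9 bytes: still buffered, nothing hits the sink yet
w.write({"k": 2})  # buffer crosses 16 bytes: both lines are flushed
w.close()
```

A small threshold is used here only to make the flush visible; the default of 1 MB trades a little memory for far fewer writes to (potentially remote) storage.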
writer_sync()
```python
def writer_sync(
    flush_bytes: int,
    compression_level: int,
) -> Generator[JsonlWriterSync, None, None]
```

Sync context manager returning a `JsonlWriterSync` for streaming writes.

If the file path ends in .jsonl.zst, output is zstd-compressed.

| Parameter | Type | Description |
|---|---|---|
| flush_bytes | int | Buffer flush threshold in bytes (default 1 MB). |
| compression_level | int | Zstd compression level (default 3). Only used for .jsonl.zst paths. Higher = smaller files, slower writes. |