JsonlFile
Package: flyteplugins.jsonl
A file type for JSONL (JSON Lines) files, backed by orjson for fast
serialisation.
Provides streaming read and write methods that process one record at a time
without loading the entire file into memory. Inherits all File
capabilities (remote storage, upload/download, etc.).
Supports zstd-compressed files transparently via extension detection
(.jsonl.zst / .jsonl.zstd).
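The extension check can be pictured with a small helper (hypothetical — the real detection logic is internal to the plugin):

```python
def is_zstd_jsonl(path: str) -> bool:
    # Hypothetical helper mirroring the extension-based detection
    # described above; JsonlFile performs this check internally.
    return path.endswith((".jsonl.zst", ".jsonl.zstd"))

print(is_zstd_jsonl("data.jsonl.zst"))  # True
print(is_zstd_jsonl("data.jsonl"))      # False
```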
Example (Async read - compressed or uncompressed):
```python
@env.task
async def process(f: JsonlFile):
    async for record in f.iter_records():
        print(record)
```
Example (Async write - compressed or uncompressed):
```python
@env.task
async def create() -> JsonlFile:
    f = JsonlFile.new_remote("data.jsonl")
    async with f.writer() as w:
        await w.write({"key": "value"})
    return f
```
Example (Sync write - compressed or uncompressed):
```python
@env.task
def create() -> JsonlFile:
    f = JsonlFile.new_remote("data.jsonl")
    with f.writer_sync() as w:
        w.write({"key": "value"})
    return f
```
Parameters
```python
class JsonlFile(
    path: str,
    name: typing.Optional[str],
    format: str,
    hash: typing.Optional[str],
    hash_method: typing.Optional[flyte.io._hashing_io.HashMethod],
)
```
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be validated to form a valid model.
`self` is explicitly positional-only to allow `self` as a field name.
| Parameter | Type | Description |
|---|---|---|
| path | str | |
| name | typing.Optional[str] | |
| format | str | |
| hash | typing.Optional[str] | |
| hash_method | typing.Optional[flyte.io._hashing_io.HashMethod] | |
Properties
| Property | Type | Description |
|---|---|---|
| lazy_uploader | None | |
Methods
| Method | Description |
|---|---|
| download() | Asynchronously download the file to a local path. |
| download_sync() | Synchronously download the file to a local path. |
| exists() | Asynchronously check if the file exists. |
| exists_sync() | Synchronously check if the file exists. |
| from_existing_remote() | Create a File reference from an existing remote file. |
| from_local() | Asynchronously create a new File object from a local file by uploading it to remote storage. |
| from_local_sync() | Synchronously create a new File object from a local file by uploading it to remote storage. |
| iter_arrow_batches() | Stream JSONL as Arrow RecordBatches. |
| iter_arrow_batches_sync() | Sync generator that yields Arrow RecordBatches. |
| iter_records() | Async generator that yields parsed dicts line by line. |
| iter_records_sync() | Sync generator that yields parsed dicts line by line. |
| model_post_init() | This function is meant to behave like a BaseModel method to initialize private attributes. |
| named_remote() | Create a File reference whose remote path is derived deterministically from name. |
| new_remote() | Create a new File reference for a remote file that will be written to. |
| open() | Asynchronously open the file and return a file-like object. |
| open_sync() | Synchronously open the file and return a file-like object. |
| pre_init() | Internal: Pydantic validator to set default name from path. |
| schema_match() | Internal: Check if incoming schema matches File schema. |
| writer() | Async context manager returning a JsonlWriter for streaming writes. |
| writer_sync() | Sync context manager returning a JsonlWriterSync for streaming writes. |
download()
```python
def download(
    local_path: Optional[Union[str, Path]],
) -> str
```
Asynchronously download the file to a local path.
Use this when you need to download a remote file to your local filesystem for processing.
Example (Async):
```python
@env.task
async def download_and_process(f: File) -> str:
    local_path = await f.download()
    # Now process the local file
    with open(local_path, "r") as fh:
        return fh.read()
```
Example (Download to specific path):
```python
@env.task
async def download_to_path(f: File) -> str:
    local_path = await f.download("/tmp/myfile.csv")
    return local_path
```
| Parameter | Type | Description |
|---|---|---|
| local_path | Optional[Union[str, Path]] | The local path to download the file to. If None, a temporary directory will be used and a path will be generated. |

Returns: The absolute path to the downloaded file
download_sync()
```python
def download_sync(
    local_path: Optional[Union[str, Path]],
) -> str
```
Synchronously download the file to a local path.
Use this in non-async tasks when you need to download a remote file to your local filesystem.
Example (Sync):
```python
@env.task
def download_and_process_sync(f: File) -> str:
    local_path = f.download_sync()
    # Now process the local file
    with open(local_path, "r") as fh:
        return fh.read()
```
Example (Download to specific path):
```python
@env.task
def download_to_path_sync(f: File) -> str:
    local_path = f.download_sync("/tmp/myfile.csv")
    return local_path
```
| Parameter | Type | Description |
|---|---|---|
| local_path | Optional[Union[str, Path]] | The local path to download the file to. If None, a temporary directory will be used and a path will be generated. |

Returns: The absolute path to the downloaded file
exists()
```python
def exists()
```
Asynchronously check if the file exists.
Example (Async):
```python
@env.task
async def check_file(f: File) -> bool:
    if await f.exists():
        print("File exists!")
        return True
    return False
```
Returns: True if the file exists, False otherwise
exists_sync()
```python
def exists_sync()
```
Synchronously check if the file exists.
Use this in non-async tasks or when you need synchronous file existence checking.
Example (Sync):
```python
@env.task
def check_file_sync(f: File) -> bool:
    if f.exists_sync():
        print("File exists!")
        return True
    return False
```
Returns: True if the file exists, False otherwise
from_existing_remote()
```python
def from_existing_remote(
    remote_path: str,
    file_cache_key: Optional[str],
) -> File[T]
```
Create a File reference from an existing remote file.
Use this when you want to reference a file that already exists in remote storage without uploading it.
```python
@env.task
async def process_existing_file() -> str:
    file = File.from_existing_remote("s3://my-bucket/data.csv")
    async with file.open("rb") as f:
        content = await f.read()
    return content.decode("utf-8")
```
| Parameter | Type | Description |
|---|---|---|
| remote_path | str | The remote path to the existing file |
| file_cache_key | Optional[str] | Optional hash value to use for cache key computation. If not specified, the cache key will be computed based on the file's attributes (path, name, format). |

Returns: A new File instance pointing to the existing remote file
from_local()
```python
def from_local(
    local_path: Union[str, Path],
    remote_destination: Optional[str],
    hash_method: Optional[HashMethod | str],
) -> File[T]
```
Asynchronously create a new File object from a local file by uploading it to remote storage.
Use this in async tasks when you have a local file that needs to be uploaded to remote storage.
Example (Async):
```python
@env.task
async def upload_local_file() -> File:
    # Create a local file
    async with aiofiles.open("/tmp/data.csv", "w") as f:
        await f.write("col1,col2\n1,2\n3,4\n")
    # Upload to remote storage
    remote_file = await File.from_local("/tmp/data.csv")
    return remote_file
```
Example (With specific destination):
```python
@env.task
async def upload_to_specific_path() -> File:
    remote_file = await File.from_local("/tmp/data.csv", "s3://my-bucket/data.csv")
    return remote_file
```
| Parameter | Type | Description |
|---|---|---|
| local_path | Union[str, Path] | Path to the local file |
| remote_destination | Optional[str] | Optional remote path to store the file. If None, a path will be automatically generated. |
| hash_method | Optional[HashMethod \| str] | Optional HashMethod or string to use for cache key computation. If a string is provided, it will be used as a precomputed cache key. If a HashMethod is provided, it will compute the hash during upload. If not specified, the cache key will be based on file attributes. |

Returns: A new File instance pointing to the uploaded remote file
from_local_sync()
```python
def from_local_sync(
    local_path: Union[str, Path],
    remote_destination: Optional[str],
    hash_method: Optional[HashMethod | str],
) -> File[T]
```
Synchronously create a new File object from a local file by uploading it to remote storage.
Use this in non-async tasks when you have a local file that needs to be uploaded to remote storage.
Example (Sync):
```python
@env.task
def upload_local_file_sync() -> File:
    # Create a local file
    with open("/tmp/data.csv", "w") as f:
        f.write("col1,col2\n1,2\n3,4\n")
    # Upload to remote storage
    remote_file = File.from_local_sync("/tmp/data.csv")
    return remote_file
```
Example (With specific destination):
```python
@env.task
def upload_to_specific_path() -> File:
    remote_file = File.from_local_sync("/tmp/data.csv", "s3://my-bucket/data.csv")
    return remote_file
```
| Parameter | Type | Description |
|---|---|---|
| local_path | Union[str, Path] | Path to the local file |
| remote_destination | Optional[str] | Optional remote path to store the file. If None, a path will be automatically generated. |
| hash_method | Optional[HashMethod \| str] | Optional HashMethod or string to use for cache key computation. If a string is provided, it will be used as a precomputed cache key. If a HashMethod is provided, it will compute the hash during upload. If not specified, the cache key will be based on file attributes. |

Returns: A new File instance pointing to the uploaded remote file
iter_arrow_batches()
```python
def iter_arrow_batches(
    batch_size: int,
    on_error: Literal['raise', 'skip'] | ErrorHandler,
) -> AsyncGenerator[Any, None]
```
Stream JSONL as Arrow RecordBatches.
Memory usage is bounded by batch_size.
| Parameter | Type | Description |
|---|---|---|
| batch_size | int | |
| on_error | Literal['raise', 'skip'] \| ErrorHandler | |
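The bounded-memory batching can be illustrated with a plain-Python sketch (hypothetical; the real method yields pyarrow RecordBatches rather than lists):

```python
import io
import json

def batched(fh, batch_size):
    # Hypothetical sketch: accumulate at most batch_size parsed records
    # before yielding, so memory stays bounded by the batch size.
    batch = []
    for line in fh:
        batch.append(json.loads(line))
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch

lines = io.StringIO('{"x": 1}\n{"x": 2}\n{"x": 3}\n')
print([len(b) for b in batched(lines, batch_size=2)])  # [2, 1]
```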
iter_arrow_batches_sync()
```python
def iter_arrow_batches_sync(
    batch_size: int,
    on_error: Literal['raise', 'skip'] | ErrorHandler,
) -> Generator[Any, None, None]
```
Sync generator that yields Arrow RecordBatches.
Memory usage is bounded by batch_size.
| Parameter | Type | Description |
|---|---|---|
| batch_size | int | |
| on_error | Literal['raise', 'skip'] \| ErrorHandler | |
iter_records()
```python
def iter_records(
    on_error: Literal['raise', 'skip'] | ErrorHandler,
) -> AsyncGenerator[dict[str, Any], None]
```
Async generator that yields parsed dicts line by line.
| Parameter | Type | Description |
|---|---|---|
| on_error | Literal['raise', 'skip'] \| ErrorHandler | |
iter_records_sync()
```python
def iter_records_sync(
    on_error: Literal['raise', 'skip'] | ErrorHandler,
) -> Generator[dict[str, Any], None, None]
```
Sync generator that yields parsed dicts line by line.
| Parameter | Type | Description |
|---|---|---|
| on_error | Literal['raise', 'skip'] \| ErrorHandler | |
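The raise/skip semantics of on_error can be sketched with stdlib json (hypothetical re-implementation for illustration; the real method uses orjson and also accepts a custom ErrorHandler):

```python
import io
import json

def iter_records_sketch(fh, on_error="raise"):
    # Hypothetical sketch: each line is parsed independently, and a
    # malformed line is either skipped or terminates iteration.
    for line in fh:
        line = line.strip()
        if not line:
            continue
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            if on_error == "skip":
                continue
            raise

data = '{"a": 1}\nnot json\n{"b": 2}\n'
print(list(iter_records_sketch(io.StringIO(data), on_error="skip")))
# [{'a': 1}, {'b': 2}]
```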
model_post_init()
```python
def model_post_init(
    context: Any,
)
```
This function is meant to behave like a BaseModel method to initialize private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
| Parameter | Type | Description |
|---|---|---|
| context | Any | The context. |
named_remote()
```python
def named_remote(
    name: str,
) -> File[T]
```
Create a File reference whose remote path is derived deterministically from name.
Unlike new_remote, which generates a random path on every call, this method produces the same path for the same name within a given task execution. This makes it safe across retries: the first attempt uploads to the path and subsequent retries resolve to the identical location without re-uploading.
The path is optionally namespaced by the node ID extracted from the backend raw-data path, which follows the convention:
```
{run_name}-{node_id}-{attempt_index}
```
If extraction fails, the function falls back to the run base directory alone.
| Parameter | Type | Description |
|---|---|---|
| name | str | Plain filename (e.g., "data.csv"). Must not contain path separators. |

Returns: A File instance whose path is stable across retries.
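The retry-safety property amounts to deriving the path as a pure function of stable inputs. A sketch with hypothetical names (the real derivation is internal to the library):

```python
def derive_path(run_base_dir: str, node_id: str, name: str) -> str:
    # Hypothetical sketch: the path depends only on inputs that are
    # identical across retries of the same node, so every attempt
    # resolves to the same location (unlike a random new_remote path).
    assert "/" not in name, "name must not contain path separators"
    return f"{run_base_dir}/{node_id}/{name}"

# Two calls with the same inputs yield the identical path.
p1 = derive_path("s3://bucket/run-abc", "n0", "data.csv")
p2 = derive_path("s3://bucket/run-abc", "n0", "data.csv")
print(p1 == p2)  # True
```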
new_remote()
```python
def new_remote(
    file_name: Optional[str],
    hash_method: Optional[HashMethod | str],
) -> File[T]
```
Create a new File reference for a remote file that will be written to.
Use this when you want to create a new file and write to it directly without creating a local file first.
Example (Async):
```python
@env.task
async def create_csv() -> File:
    df = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
    file = File.new_remote()
    async with file.open("wb") as f:
        df.to_csv(f)
    return file
```
| Parameter | Type | Description |
|---|---|---|
| file_name | Optional[str] | Optional string specifying a remote file name. If not set, a generated file name will be returned. |
| hash_method | Optional[HashMethod \| str] | Optional HashMethod or string to use for cache key computation. If a string is provided, it will be used as a precomputed cache key. If a HashMethod is provided, it will be used to compute the hash as data is written. |

Returns: A new File instance with a generated remote path
open()
```python
def open(
    mode: str,
    block_size: Optional[int],
    cache_type: str,
    cache_options: Optional[dict],
    compression: Optional[str],
    **kwargs,
) -> AsyncGenerator[Union[AsyncWritableFile, AsyncReadableFile, 'HashingWriter'], None]
```
Asynchronously open the file and return a file-like object.
Use this method in async tasks to read from or write to files directly.
Example (Async Read):
```python
@env.task
async def read_file(f: File) -> str:
    async with f.open("rb") as fh:
        content = bytes(await fh.read())
    return content.decode("utf-8")
```
Example (Async Write):
```python
@env.task
async def write_file() -> File:
    f = File.new_remote()
    async with f.open("wb") as fh:
        await fh.write(b"Hello, World!")
    return f
```
Example (Streaming Read):
```python
@env.task
async def stream_read(f: File) -> str:
    content_parts = []
    async with f.open("rb", block_size=1024) as fh:
        while True:
            chunk = await fh.read()
            if not chunk:
                break
            content_parts.append(chunk)
    return b"".join(content_parts).decode("utf-8")
```
| Parameter | Type | Description |
|---|---|---|
| mode | str | The mode to open the file in (default: 'rb'). Common modes: 'rb' (read binary), 'wb' (write binary), 'rt' (read text), 'wt' (write text) |
| block_size | Optional[int] | Size of blocks for reading in bytes. Useful for streaming large files. |
| cache_type | str | Caching mechanism to use ('readahead', 'mmap', 'bytes', 'none') |
| cache_options | Optional[dict] | Dictionary of options for the cache |
| compression | Optional[str] | Compression format or None for auto-detection |
| kwargs | **kwargs | |

Returns: An async file-like object that can be used with async read/write operations
open_sync()
```python
def open_sync(
    mode: str,
    block_size: Optional[int],
    cache_type: str,
    cache_options: Optional[dict],
    compression: Optional[str],
    **kwargs,
) -> Generator[IO[Any], None, None]
```
Synchronously open the file and return a file-like object.
Use this method in non-async tasks to read from or write to files directly.
Example (Sync Read):
```python
@env.task
def read_file_sync(f: File) -> str:
    with f.open_sync("rb") as fh:
        content = fh.read()
    return content.decode("utf-8")
```
Example (Sync Write):
```python
@env.task
def write_file_sync() -> File:
    f = File.new_remote()
    with f.open_sync("wb") as fh:
        fh.write(b"Hello, World!")
    return f
```
| Parameter | Type | Description |
|---|---|---|
| mode | str | The mode to open the file in (default: 'rb'). Common modes: 'rb' (read binary), 'wb' (write binary), 'rt' (read text), 'wt' (write text) |
| block_size | Optional[int] | Size of blocks for reading in bytes. Useful for streaming large files. |
| cache_type | str | Caching mechanism to use ('readahead', 'mmap', 'bytes', 'none') |
| cache_options | Optional[dict] | Dictionary of options for the cache |
| compression | Optional[str] | Compression format or None for auto-detection |
| kwargs | **kwargs | |

Returns: A file-like object that can be used with standard read/write operations
pre_init()
```python
def pre_init(
    data,
)
```
Internal: Pydantic validator to set default name from path. Not intended for direct use.
| Parameter | Type | Description |
|---|---|---|
| data | | |
schema_match()
```python
def schema_match(
    incoming: dict,
)
```
Internal: Check if incoming schema matches File schema. Not intended for direct use.
| Parameter | Type | Description |
|---|---|---|
| incoming | dict | |
writer()
```python
def writer(
    flush_bytes: int,
    compression_level: int,
) -> AsyncGenerator[JsonlWriter, None]
```
Async context manager returning a JsonlWriter for streaming writes.
If the file path ends in .jsonl.zst, output is zstd-compressed.
| Parameter | Type | Description |
|---|---|---|
| flush_bytes | int | Buffer flush threshold in bytes (default 1 MB). |
| compression_level | int | Zstd compression level (default 3). Only used for .jsonl.zst paths. Higher = smaller files, slower writes. |
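The flush_bytes behaviour can be illustrated with a minimal buffered writer (a hypothetical sketch; the real JsonlWriter additionally handles orjson encoding, zstd compression, and remote uploads):

```python
import json

class BufferedJsonlSink:
    # Hypothetical sketch of flush_bytes: records accumulate in an
    # in-memory buffer that is flushed once it reaches the threshold.
    def __init__(self, flush_bytes=1_000_000):
        self.flush_bytes = flush_bytes
        self.buffer = bytearray()
        self.flushes = []  # stands in for writes to the underlying file

    def write(self, record):
        self.buffer += json.dumps(record).encode() + b"\n"
        if len(self.buffer) >= self.flush_bytes:
            self.flush()

    def flush(self):
        if self.buffer:
            self.flushes.append(bytes(self.buffer))
            self.buffer.clear()

sink = BufferedJsonlSink(flush_bytes=16)
for i in range(4):
    sink.write({"i": i})
sink.flush()  # final flush, as the context manager would do on exit
print(len(sink.flushes))  # 2
```

A larger flush_bytes means fewer, bigger writes to storage; a smaller one trades throughput for lower peak memory.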
writer_sync()
```python
def writer_sync(
    flush_bytes: int,
    compression_level: int,
) -> Generator[JsonlWriterSync, None, None]
```
Sync context manager returning a JsonlWriterSync for streaming writes.
If the file path ends in .jsonl.zst, output is zstd-compressed.
| Parameter | Type | Description |
|---|---|---|
| flush_bytes | int | Buffer flush threshold in bytes (default 1 MB). |
| compression_level | int | Zstd compression level (default 3). Only used for .jsonl.zst paths. Higher = smaller files, slower writes. |