File
Package: flyte.io
A generic file class representing a file with a specified format. Provides both async and sync interfaces for file operations. All methods without _sync suffix are async.
The class should be instantiated using one of the class methods. The constructor should be used only to instantiate references to existing remote objects.
The generic type T represents the format of the file.
Important methods:
from_existing_remote: Create a File object from an existing remote file.new_remote: Create a new File reference for a remote file that will be written to.
Asynchronous methods:
open: Asynchronously open the file and return a file-like object.download: Asynchronously download the file to a local path.from_local: Asynchronously create a File object from a local file, uploading it to remote storage.exists: Asynchronously check if the file exists.
Synchronous methods (suffixed with _sync):
open_sync: Synchronously open the file and return a file-like object.download_sync: Synchronously download the file to a local path.from_local_sync: Synchronously create a File object from a local file, uploading it to remote storage.exists_sync: Synchronously check if the file exists.
Example: Read a file input in a Task (Async).
@env.task
async def read_file(file: File) -> str:
async with file.open("rb") as f:
content = bytes(await f.read())
return content.decode("utf-8")Example: Read a file input in a Task (Sync).
@env.task
def read_file_sync(file: File) -> str:
with file.open_sync("rb") as f:
content = f.read()
return content.decode("utf-8")Example: Write a file by streaming it directly to blob storage (Async).
@env.task
async def write_file() -> File:
file = File.new_remote()
async with file.open("wb") as f:
await f.write(b"Hello, World!")
return fileExample: Upload a local file to remote storage (Async).
@env.task
async def upload_file() -> File:
# Write to local file first
with open("/tmp/data.csv", "w") as f:
f.write("col1,col2\n1,2\n3,4\n")
# Upload to remote storage
return await File.from_local("/tmp/data.csv")Example: Upload a local file to remote storage (Sync).
@env.task
def upload_file_sync() -> File:
# Write to local file first
with open("/tmp/data.csv", "w") as f:
f.write("col1,col2\n1,2\n3,4\n")
# Upload to remote storage
return File.from_local_sync("/tmp/data.csv")Example: Download a file to local storage (Async).
@env.task
async def download_file(file: File) -> str:
local_path = await file.download()
# Process the local file
with open(local_path, "r") as f:
return f.read()Example: Download a file to local storage (Sync).
@env.task
def download_file_sync(file: File) -> str:
local_path = file.download_sync()
# Process the local file
with open(local_path, "r") as f:
return f.read()Example: Reference an existing remote file.
@env.task
async def process_existing_file() -> str:
file = File.from_existing_remote("s3://my-bucket/data.csv")
async with file.open("rb") as f:
content = await f.read()
return content.decode("utf-8")Example: Check if a file exists (Async).
@env.task
async def check_file(file: File) -> bool:
return await file.exists()Example: Check if a file exists (Sync).
@env.task
def check_file_sync(file: File) -> bool:
return file.exists_sync()Example: Pass through a file without copying.
@env.task
async def pass_through(file: File) -> File:
# No copy occurs - just passes the reference
return fileclass File(
path: str,
name: typing.Optional[str],
format: str,
hash: typing.Optional[str],
hash_method: typing.Optional[flyte.io._hashing_io.HashMethod],
)Create a new model by parsing and validating input data from keyword arguments.
Raises
ValidationError if the input data cannot be
validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
| Parameter | Type | Description |
|---|---|---|
path |
str |
The path to the file (can be local or remote) |
name |
typing.Optional[str] |
Optional name for the file (defaults to basename of path) |
format |
str |
|
hash |
typing.Optional[str] |
|
hash_method |
typing.Optional[flyte.io._hashing_io.HashMethod] |
Properties
| Property | Type | Description |
|---|---|---|
lazy_uploader |
None |
Methods
| Method | Description |
|---|---|
download() |
Asynchronously download the file to a local path. |
download_sync() |
Synchronously download the file to a local path. |
exists() |
Asynchronously check if the file exists. |
exists_sync() |
Synchronously check if the file exists. |
from_existing_remote() |
Create a File reference from an existing remote file. |
from_local() |
Asynchronously create a new File object from a local file by uploading it to remote storage. |
from_local_sync() |
Synchronously create a new File object from a local file by uploading it to remote storage. |
model_post_init() |
This function is meant to behave like a BaseModel method to initialise private attributes. |
named_remote() |
Create a File reference whose remote path is derived deterministically from name. |
new_remote() |
Create a new File reference for a remote file that will be written to. |
open() |
Asynchronously open the file and return a file-like object. |
open_sync() |
Synchronously open the file and return a file-like object. |
pre_init() |
Internal: Pydantic validator to set default name from path. |
schema_match() |
Internal: Check if incoming schema matches File schema. |
download()
def download(
local_path: Optional[Union[str, Path]],
) -> strAsynchronously download the file to a local path.
Use this when you need to download a remote file to your local filesystem for processing.
Example (Async):
@env.task
async def download_and_process(f: File) -> str:
local_path = await f.download()
# Now process the local file
with open(local_path, "r") as fh:
return fh.read()Example (Download to specific path):
@env.task
async def download_to_path(f: File) -> str:
local_path = await f.download("/tmp/myfile.csv")
return local_path| Parameter | Type | Description |
|---|---|---|
local_path |
Optional[Union[str, Path]] |
The local path to download the file to. If None, a temporary directory will be used and a path will be generated. |
download_sync()
def download_sync(
local_path: Optional[Union[str, Path]],
) -> strSynchronously download the file to a local path.
Use this in non-async tasks when you need to download a remote file to your local filesystem.
Example (Sync):
@env.task
def download_and_process_sync(f: File) -> str:
local_path = f.download_sync()
# Now process the local file
with open(local_path, "r") as fh:
return fh.read()Example (Download to specific path):
@env.task
def download_to_path_sync(f: File) -> str:
local_path = f.download_sync("/tmp/myfile.csv")
return local_path| Parameter | Type | Description |
|---|---|---|
local_path |
Optional[Union[str, Path]] |
The local path to download the file to. If None, a temporary directory will be used and a path will be generated. |
exists()
def exists()Asynchronously check if the file exists.
Example (Async):
@env.task
async def check_file(f: File) -> bool:
if await f.exists():
print("File exists!")
return True
return FalseReturns: True if the file exists, False otherwise
exists_sync()
def exists_sync()Synchronously check if the file exists.
Use this in non-async tasks or when you need synchronous file existence checking.
Example (Sync):
@env.task
def check_file_sync(f: File) -> bool:
if f.exists_sync():
print("File exists!")
return True
return FalseReturns: True if the file exists, False otherwise
from_existing_remote()
def from_existing_remote(
remote_path: str,
file_cache_key: Optional[str],
) -> File[T]Create a File reference from an existing remote file.
Use this when you want to reference a file that already exists in remote storage without uploading it.
Example:
@env.task
async def process_existing_file() -> str:
file = File.from_existing_remote("s3://my-bucket/data.csv")
async with file.open("rb") as f:
content = await f.read()
return content.decode("utf-8")| Parameter | Type | Description |
|---|---|---|
remote_path |
str |
The remote path to the existing file |
file_cache_key |
Optional[str] |
Optional hash value to use for cache key computation. If not specified, the cache key will be computed based on the file’s attributes (path, name, format). |
from_local()
def from_local(
local_path: Union[str, Path],
remote_destination: Optional[str],
hash_method: Optional[HashMethod | str],
) -> File[T]Asynchronously create a new File object from a local file by uploading it to remote storage.
Use this in async tasks when you have a local file that needs to be uploaded to remote storage.
Example (Async):
@env.task
async def upload_local_file() -> File:
# Create a local file
async with aiofiles.open("/tmp/data.csv", "w") as f:
await f.write("col1,col2
# Upload to remote storage
remote_file = await File.from_local("/tmp/data.csv")
return remote_fileExample (With specific destination):
@env.task
async def upload_to_specific_path() -> File:
remote_file = await File.from_local("/tmp/data.csv", "s3://my-bucket/data.csv")
return remote_file| Parameter | Type | Description |
|---|---|---|
local_path |
Union[str, Path] |
Path to the local file |
remote_destination |
Optional[str] |
Optional remote path to store the file. If None, a path will be automatically generated. |
hash_method |
Optional[HashMethod | str] |
Optional HashMethod or string to use for cache key computation. If a string is provided, it will be used as a precomputed cache key. If a HashMethod is provided, it will compute the hash during upload. If not specified, the cache key will be based on file attributes. |
from_local_sync()
def from_local_sync(
local_path: Union[str, Path],
remote_destination: Optional[str],
hash_method: Optional[HashMethod | str],
) -> File[T]Synchronously create a new File object from a local file by uploading it to remote storage.
Use this in non-async tasks when you have a local file that needs to be uploaded to remote storage.
Example (Sync):
@env.task
def upload_local_file_sync() -> File:
# Create a local file
with open("/tmp/data.csv", "w") as f:
f.write("col1,col2
# Upload to remote storage
remote_file = File.from_local_sync("/tmp/data.csv")
return remote_fileExample (With specific destination):
@env.task
def upload_to_specific_path() -> File:
remote_file = File.from_local_sync("/tmp/data.csv", "s3://my-bucket/data.csv")
return remote_file| Parameter | Type | Description |
|---|---|---|
local_path |
Union[str, Path] |
Path to the local file |
remote_destination |
Optional[str] |
Optional remote path to store the file. If None, a path will be automatically generated. |
hash_method |
Optional[HashMethod | str] |
Optional HashMethod or string to use for cache key computation. If a string is provided, it will be used as a precomputed cache key. If a HashMethod is provided, it will compute the hash during upload. If not specified, the cache key will be based on file attributes. |
model_post_init()
def model_post_init(
context: Any,
)This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
| Parameter | Type | Description |
|---|---|---|
context |
Any |
The context. |
named_remote()
def named_remote(
name: str,
) -> File[T]Create a File reference whose remote path is derived deterministically from name.
Unlike :meth:new_remote, which generates a random path on every call, this method
produces the same path for the same name within a given task execution. This makes
it safe across retries: the first attempt uploads to the path and subsequent retries
resolve to the identical location without re-uploading.
The path is optionally namespaced by the node ID extracted from the backend raw-data path, which follows the convention:
{run_name}-{node_id}-{attempt_index}
If extraction fails, the function falls back to the run base directory alone.
| Parameter | Type | Description |
|---|---|---|
name |
str |
Plain filename (e.g., “data.csv”). Must not contain path separators. |
new_remote()
def new_remote(
file_name: Optional[str],
hash_method: Optional[HashMethod | str],
) -> File[T]Create a new File reference for a remote file that will be written to.
Use this when you want to create a new file and write to it directly without creating a local file first.
Example (Async):
@env.task
async def create_csv() -> File:
df = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})
file = File.new_remote()
async with file.open("wb") as f:
df.to_csv(f)
return file| Parameter | Type | Description |
|---|---|---|
file_name |
Optional[str] |
Optional string specifying a remote file name. If not set, a generated file name will be returned. |
hash_method |
Optional[HashMethod | str] |
Optional HashMethod or string to use for cache key computation. If a string is provided, it will be used as a precomputed cache key. If a HashMethod is provided, it will be used to compute the hash as data is written. |
open()
def open(
mode: str,
block_size: Optional[int],
cache_type: str,
cache_options: Optional[dict],
compression: Optional[str],
kwargs,
) -> AsyncGenerator[Union[AsyncWritableFile, AsyncReadableFile, 'HashingWriter'], None]Asynchronously open the file and return a file-like object.
Use this method in async tasks to read from or write to files directly.
Example (Async Read):
@env.task
async def read_file(f: File) -> str:
async with f.open("rb") as fh:
content = bytes(await fh.read())
return content.decode("utf-8")Example (Async Write):
@env.task
async def write_file() -> File:
f = File.new_remote()
async with f.open("wb") as fh:
await fh.write(b"Hello, World!")
return fExample (Streaming Read):
@env.task
async def stream_read(f: File) -> str:
content_parts = []
async with f.open("rb", block_size=1024) as fh:
while True:
chunk = await fh.read()
if not chunk:
break
content_parts.append(chunk)
return b"".join(content_parts).decode("utf-8")| Parameter | Type | Description |
|---|---|---|
mode |
str |
|
block_size |
Optional[int] |
Size of blocks for reading in bytes. Useful for streaming large files. |
cache_type |
str |
Caching mechanism to use (‘readahead’, ‘mmap’, ‘bytes’, ’none’) |
cache_options |
Optional[dict] |
Dictionary of options for the cache |
compression |
Optional[str] |
Compression format or None for auto-detection |
kwargs |
**kwargs |
open_sync()
def open_sync(
mode: str,
block_size: Optional[int],
cache_type: str,
cache_options: Optional[dict],
compression: Optional[str],
kwargs,
) -> Generator[IO[Any], None, None]Synchronously open the file and return a file-like object.
Use this method in non-async tasks to read from or write to files directly.
Example (Sync Read):
@env.task
def read_file_sync(f: File) -> str:
with f.open_sync("rb") as fh:
content = fh.read()
return content.decode("utf-8")Example (Sync Write):
@env.task
def write_file_sync() -> File:
f = File.new_remote()
with f.open_sync("wb") as fh:
fh.write(b"Hello, World!")
return f| Parameter | Type | Description |
|---|---|---|
mode |
str |
|
block_size |
Optional[int] |
Size of blocks for reading in bytes. Useful for streaming large files. |
cache_type |
str |
Caching mechanism to use (‘readahead’, ‘mmap’, ‘bytes’, ’none’) |
cache_options |
Optional[dict] |
Dictionary of options for the cache |
compression |
Optional[str] |
Compression format or None for auto-detection |
kwargs |
**kwargs |
pre_init()
def pre_init(
data,
)Internal: Pydantic validator to set default name from path. Not intended for direct use.
| Parameter | Type | Description |
|---|---|---|
data |
schema_match()
def schema_match(
incoming: dict,
)Internal: Check if incoming schema matches File schema. Not intended for direct use.
| Parameter | Type | Description |
|---|---|---|
incoming |
dict |