Passing Files/Artifacts between tasks
In a workflow, `FlyteFile` and `FlyteDirectory` are special types that represent files and directories passed between tasks. They help manage and track files and directories, ensuring that they can be accessed and moved across different execution environments.
Passing files across tasks
from truefoundry.workflow import (
PythonTaskConfig,
TaskPythonBuild,
task,
workflow,
FlyteFile
)
from truefoundry.deploy import Resources
# Shared configuration used by every task in this example: the Python image
# to build, the resources to request, and the service account to run as.
task_config = PythonTaskConfig(
    image=TaskPythonBuild(
        python_version="3.9",
        pip_packages=["truefoundry[workflow]==0.4.7rc0"],
    ),
    resources=Resources(cpu_request=0.45),
    service_account="default",
)
@task(task_config=task_config)
def create_file() -> FlyteFile:
    """Write a small text file and return it as a ``FlyteFile``.

    Returns:
        A ``FlyteFile`` wrapping ``/tmp/sample.txt``, written with the text
        ``Hello World!``.
    """
    sample_path = "/tmp/sample.txt"
    with open(sample_path, "w") as handle:
        handle.write("Hello World!")
    return FlyteFile(sample_path)
@task(task_config=task_config)
def read_and_print_file(flyte_file: FlyteFile):
    """Print the text content of a file received from another task.

    Args:
        flyte_file: The file to read; it is passed directly to ``open``.
    """
    with open(flyte_file, "r") as handle:
        print("File content:", handle.read())
@workflow
def simple_file_workflow():
    """Create the sample file in one task, then print it in a second task."""
    sample_file = create_file()
    read_and_print_file(flyte_file=sample_file)
if __name__ == "__main__":
    # Run the workflow when this file is executed directly as a script.
    simple_file_workflow()
- In the above example, the `create_file` task creates a `sample.txt` file and returns it as a `FlyteFile`. The `read_and_print_file` task then takes that `FlyteFile` as input and prints the content of the file.
Passing directory across tasks
from truefoundry.workflow import (
PythonTaskConfig,
TaskPythonBuild,
task,
workflow,
FlyteDirectory
)
from truefoundry.deploy import Resources
# Shared configuration used by every task in this example: the Python image
# to build, the resources to request, and the service account to run as.
task_config = PythonTaskConfig(
    image=TaskPythonBuild(
        python_version="3.9",
        pip_packages=["truefoundry[workflow]==0.4.7rc0"],
    ),
    resources=Resources(cpu_request=0.45),
    service_account="default",
)
@task(task_config=task_config)
def create_directory() -> FlyteDirectory:
    """Create a directory of sample text files and return it.

    Returns:
        A ``FlyteDirectory`` wrapping ``/tmp/sample_directory`` after
        ``file_0.txt`` through ``file_2.txt`` have been written into it.
    """
    import os

    target_dir = "/tmp/sample_directory"
    os.makedirs(target_dir, exist_ok=True)

    # Populate the directory with one small text file per index.
    for i in range(3):
        file_path = os.path.join(target_dir, f"file_{i}.txt")
        with open(file_path, "w") as handle:
            handle.write(f"Content of file {i}\n")

    return FlyteDirectory(target_dir)
@task(task_config=task_config)
def read_and_print_directory(directory: FlyteDirectory) -> str:
    """Print the name and content of every file in ``directory``.

    Entries are visited in sorted name order so the printed output is the
    same on every run; ``os.listdir`` alone returns entries in arbitrary
    order.

    Args:
        directory: Directory received from another task. Every entry in it
            is opened and read as a text file.

    Returns:
        A fixed confirmation message once all files have been printed.
    """
    import os

    # Resolve the path-like input to a plain path once, rather than
    # implicitly on every loop iteration.
    local_dir = os.fspath(directory)

    for filename in sorted(os.listdir(local_dir)):
        with open(os.path.join(local_dir, filename), "r") as f:
            content = f.read()
        print(f"Contents of {filename}:")
        print(content)

    return "Done reading and printing the directory."
@workflow
def simple_directory_workflow():
    """Create the sample directory, then print its files in a second task."""
    sample_dir = create_directory()
    read_and_print_directory(directory=sample_dir)
- In the above example, the `create_directory` task creates multiple files in the `/tmp/sample_directory` directory and returns it as a `FlyteDirectory`. That directory is then passed to the `read_and_print_directory` task, which prints the name and content of each file in it.
Updated about 1 month ago