How to pass data or files between Kubeflow containerized components in Python

I'm exploring Kubeflow as an option to deploy and connect the various components of a typical ML pipeline. I'm using Docker containers as Kubeflow components, and so far I've been unable to successfully use the ContainerOp.file_outputs object to pass results between components.

Based on my understanding of the feature, creating and saving to a file that's declared as one of a component's file_outputs should cause the file to persist and be readable by the following component.

This is how I attempted to declare this in my pipeline python code:

import kfp.dsl as dsl
import kfp.gcp as gcp

@dsl.pipeline(name='kubeflow demo')
def pipeline(project_id='kubeflow-demo-254012'):
    data_collector = dsl.ContainerOp(
        name='data collector',
        image='eu.gcr.io/kubeflow-demo-254012/data-collector',
        arguments=[ "--project_id", project_id ],
        file_outputs={ "output": '/output.txt' }
    )
    data_preprocessor = dsl.ContainerOp(
        name='data preprocessor',
        image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
        arguments=[ "--project_id", project_id ]
    )
    data_preprocessor.after(data_collector)
    # TODO: add other components

if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(pipeline, __file__ + '.tar.gz')

In the Python code for the data-collector.py component I fetch the dataset and then write it to /output.txt. I'm able to read the file from within the same component, but not inside data-preprocessor.py, where I get a FileNotFoundError.
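
Simplified, the relevant part of data-collector.py looks like this (fetch_dataset stands in for the actual fetching logic):

import argparse

def fetch_dataset(project_id: str) -> str:
    # Placeholder for the real data-fetching logic
    return f'dataset for project {project_id}'

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--project_id')
    args = parser.parse_args()
    # Write to the exact path declared in file_outputs ({'output': '/output.txt'})
    with open('/output.txt', 'w') as f:
        f.write(fetch_dataset(args.project_id))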

Is the use of file_outputs invalid for container-based Kubeflow components, or am I using it incorrectly in my code? If it's not an option in my case, is it possible to programmatically create Kubernetes volumes inside the pipeline declaration code and use them instead of file_outputs?



Solution 1:[1]

Files created in one Kubeflow pipeline component are local to that container. To reference the file in a subsequent step, you need to pass it as:

data_preprocessor = dsl.ContainerOp(
    name='data preprocessor',
    image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
    arguments=[
        "--fetched_dataset", data_collector.outputs['output'],
        "--project_id", project_id,
    ],
)

Note: data_collector.outputs['output'] will contain the actual string contents of the file /output.txt, not a path to the file. If you want it to contain a path instead, write the dataset to shared storage (like S3, or a mounted PVC volume) and write the path/link to that storage into /output.txt. data_preprocessor can then read the dataset based on the path.
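
For example, with the GCP setup above, the data-collector step could upload the dataset to a GCS bucket and emit only its URI through /output.txt. A minimal sketch using the google-cloud-storage client (the bucket and blob names here are placeholders):

from google.cloud import storage

def publish_dataset(local_path: str, bucket_name: str, blob_name: str) -> str:
    # Upload the dataset itself to shared storage (GCS)...
    blob = storage.Client().bucket(bucket_name).blob(blob_name)
    blob.upload_from_filename(local_path)
    uri = f'gs://{bucket_name}/{blob_name}'
    # ...and pass only its URI downstream via the declared file_outputs path.
    with open('/output.txt', 'w') as f:
        f.write(uri)
    return uri

data_preprocessor then receives that URI as --fetched_dataset and downloads the dataset itself.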

Solution 2:[2]

There are three main steps:

  1. Save an output.txt file containing the data/parameter/anything you want to pass to the next component. Note: it must be at the root level, i.e. /output.txt.

  2. Pass file_outputs={'output': '/output.txt'} as an argument, as shown in the example below.

  3. Inside a container_op that you write inside dsl.pipeline, pass the argument (to the respective argument of the component that needs the output of the earlier component) as comp1.output (here comp1 is the first component, which produces the output and stores it in /output.txt).

import kfp
from kfp import dsl

def SendMsg(
    send_msg: str = 'akash'
):
    return dsl.ContainerOp(
        name = 'Print msg', 
        image = 'docker.io/akashdesarda/comp1:latest', 
        command = ['python', 'msg.py'],
        arguments=[
            '--msg', send_msg
        ],
        file_outputs={
            # The contents of /output.txt become this step's 'output'
            'output': '/output.txt',
        }
    )

def GetMsg(
    get_msg: str
):
    return dsl.ContainerOp(
        name = 'Read msg from 1st component',
        image = 'docker.io/akashdesarda/comp2:latest',
        command = ['python', 'msg.py'],
        arguments=[
            '--msg', get_msg
        ]
    )

@dsl.pipeline(
    name = 'Pass parameter',
    description = 'Passing a parameter between components')
def passing_parameter(send_msg):
    comp1 = SendMsg(send_msg)
    # comp1.output resolves to the contents of comp1's /output.txt
    comp2 = GetMsg(comp1.output)


if __name__ == '__main__':
  import kfp.compiler as compiler
  compiler.Compiler().compile(passing_parameter, __file__ + '.tar.gz')
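
The msg.py scripts baked into the two images are not shown here; a plausible minimal version of each (assumed, not the author's exact code) would be:

# msg.py in comp1 (docker.io/akashdesarda/comp1):
# write the received message to /output.txt so file_outputs picks it up
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--msg')
args = parser.parse_args()
with open('/output.txt', 'w') as f:
    f.write(args.msg)

# msg.py in comp2 (docker.io/akashdesarda/comp2):
# the message arrives as an ordinary command-line argument
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--msg')
args = parser.parse_args()
print(f'Message from comp1: {args.msg}')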

Solution 3:[3]

You don't have to write the data to shared storage; you can use kfp.dsl.InputArgumentPath to pass an output from a Python function to the input of a container op.

@kfp.dsl.pipeline(
    name='Build Model Server Pipeline',
    description='Build a kserve model server pipeline.'
)
def build_model_server_pipeline(s3_src_path):
    # download_archive_step is defined elsewhere and produces an 'output_tarball' output
    download_s3_files_task = download_archive_step(s3_src_path)

    # Materialize the upstream artifact as a file at a known path in the next step
    tarball_path = "/tmp/artifact.tar"
    artifact_tarball = kfp.dsl.InputArgumentPath(
        download_s3_files_task.outputs['output_tarball'], path=tarball_path)

    build_container = kfp.dsl.ContainerOp(
        name='build_container',
        image='python:3.8',
        command=['sh', '-c'],
        arguments=['ls -l ' + tarball_path + ';'],
        artifact_argument_paths=[artifact_tarball],
    )
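
With artifact_argument_paths, the output_tarball artifact produced by the upstream step is downloaded into the build_container step and exposed as a regular file at /tmp/artifact.tar, so the consumer works with the file itself rather than with a string value injected into the command line.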

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: santiago92
Solution 2: Akash Desarda
Solution 3: jon