Is there an implemented way to use a kubeflow pipeline's output outside the pipeline?

I'm using local Kubeflow Pipelines to build a continuous machine learning test project. I have one pipeline that preprocesses the data using TFX and automatically saves its outputs to MinIO. Outside of this pipeline, I want to train the model using TFX's Trainer, but I need the artifacts generated by the preprocessing pipeline. Is there an implemented way to import these outputs? I've looked through the documentation and some issues, but can't find an answer. And because I'm trying to do this continuously, I can't rely on doing it manually.

Example of my preprocessing pipeline:


    @kfp.dsl.pipeline(
      name='TFX',
      description='TFX pipeline'
    )
    def tfx_pipeline():
    
        # Download the data with wget; GCS could be used instead
        fetch = kfp.dsl.ContainerOp(
          name='download',
          image='busybox',
          command=['sh', '-c'],
          arguments=[
              'sleep 1;'
              'mkdir -p /tmp/data;'
              'wget <gcp link> -O /tmp/data/results.csv'],
          file_outputs={'downloaded': '/tmp/data'})
        records_example = tfx_csv_gen(input_base=fetch.output)
        stats = tfx_statistic_gen(input_data=records_example.output)
        schema_op = tfx_schema_gen(stats.output)
        tfx_example_validator(stats=stats.outputs['output'], schema=schema_op.outputs['output'])
        transformed_output = tfx_transform(
            input_data=records_example.output,
            schema=schema_op.outputs['output'],
            module_file=module_file) # Path to your TFT code on GCS/S3

and then compiling and executing it with:


    kfp.compiler.Compiler().compile(tfx_pipeline, 'tfx_pipeline.zip')


    client = kfp.Client()
    client.list_experiments()


    my_experiment = client.create_experiment(name='tfx_pipeline')
    my_run = client.run_pipeline(my_experiment.id, 'tfx', 
      'tfx_pipeline.zip')

I'm working in a .ipynb notebook in Visual Studio Code.



Solution 1:[1]

You can get that information with the approach described in https://github.com/kubeflow/pipelines/issues/4327#issuecomment-687255001. You need two parameters:

component_name: check the compiled pipeline's YAML definition, under templates.name (search for the component containing the output you want)

artifact_name: also in the YAML definition, under that same component's outputs attribute
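If you prefer not to read the YAML by hand, here is a minimal sketch that lists both, assuming a KFP v1 pipeline compiled to a single Argo Workflow manifest inside the .zip (the archive layout and key names below reflect that assumption):

    import yaml
    import zipfile

    # Minimal sketch: a .zip produced by kfp.compiler.Compiler() is assumed
    # to contain a single Argo Workflow manifest in YAML form.
    with zipfile.ZipFile('tfx_pipeline.zip') as archive:
        manifest = yaml.safe_load(archive.read(archive.namelist()[0]))

    # Each entry under spec.templates is a pipeline step; its
    # outputs.artifacts list carries the artifact names.
    for template in manifest['spec']['templates']:
        artifacts = template.get('outputs', {}).get('artifacts', [])
        if artifacts:
            print(template['name'], '->', [a['name'] for a in artifacts])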

Once you have these two parameters, you can use the functions described in the issue linked above:

    #!/usr/bin/env python3

    import json
    import tarfile
    from base64 import b64decode
    from io import BytesIO

    import kfp


    def get_node_id(*, run_id: str, component_name: str, client: kfp.Client):
        """Find the Argo node ID of the pipeline step named component_name."""
        run = client.runs.get_run(run_id)
        workflow = json.loads(run.pipeline_runtime.workflow_manifest)
        nodes = workflow["status"]["nodes"]
        for node_id, node_info in nodes.items():
            if node_info["displayName"] == component_name:
                return node_id
        else:
            # for/else: only reached when the loop finds no matching node
            raise RuntimeError(f"Unable to find node_id for Component '{component_name}'")


    def get_artifact(*, run_id: str, node_id: str, artifact_name: str, client: kfp.Client):
        """Download and unpack one artifact from a finished run."""
        artifact = client.runs.read_artifact(run_id, node_id, artifact_name)
        # Artifacts are returned as base64-encoded .tar.gz strings
        data = b64decode(artifact.data)
        io_buffer = BytesIO()
        io_buffer.write(data)
        io_buffer.seek(0)
        with tarfile.open(fileobj=io_buffer) as tar:
            member_names = tar.getnames()
            if len(member_names) == 1:
                data = tar.extractfile(member_names[0]).read().decode('utf-8')
            else:
                # Is it possible for KFP artifacts to have multiple members?
                data = {}
                for member_name in member_names:
                    data[member_name] = tar.extractfile(member_name).read().decode('utf-8')
        return data


    if __name__ == "__main__":
        run_id = "e498b0da-036e-4e81-84e9-6e9c6e64960b"
        component_name = "my-component"
        # For an output variable named "output_data"
        artifact_name = "my-component-output_data"

        client = kfp.Client()
        node_id = get_node_id(run_id=run_id, component_name=component_name, client=client)
        artifact = get_artifact(
            run_id=run_id, node_id=node_id, artifact_name=artifact_name, client=client,
        )
        # Do something with artifact ...
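To tie this back to the question: run_id is simply the ID of the finished run (for example my_run.id from the client.run_pipeline call above), and the recovered payload is whatever the component wrote to its output file. A sketch under that assumption, with illustrative component and artifact names:

    # Sketch only: "tfx-transform" and "tfx-transform-output" are illustrative
    # names; take the real ones from your pipeline's YAML as described above.
    run_id = my_run.id  # returned by client.run_pipeline(...)
    node_id = get_node_id(run_id=run_id, component_name="tfx-transform", client=client)
    transformed = get_artifact(
        run_id=run_id, node_id=node_id, artifact_name="tfx-transform-output", client=client,
    )
    # If the component wrote a path (e.g. a MinIO location), you can now hand
    # it to code outside the pipeline, such as a training step using TFX's Trainer.
    print(transformed.strip())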

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1 by augustoamerico