GCP Vertex Pipelines - Why do kfp.v2.dsl.Output function arguments work without being provided?

I am following the "Create and run ML pipelines with Vertex Pipelines!" Jupyter notebook example from GCP.

The function classif_model_eval_metrics takes metrics: Output[Metrics] and metricsc: Output[ClassificationMetrics], which have no default values.

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-6:latest",
    output_component_file="tables_eval_component.yaml", # Optional: you can use this to load the component later
    packages_to_install=["google-cloud-aiplatform"],
)
def classif_model_eval_metrics(
    project: str,
    location: str,      # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Model],
    metrics: Output[Metrics],                # No default value set, hence must be mandatory
    metricsc: Output[ClassificationMetrics], # No default value set, hence must be mandatory
) -> NamedTuple("Outputs", [("dep_decision", str)]): 
    # Full code at the bottom.

Hence those arguments should be mandatory, yet the function is called without them:

    model_eval_task = classif_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
        # <--- No arguments for metrics: Output[Metrics] and metricsc: Output[ClassificationMetrics]
    )

The entire pipeline code is below.

@kfp.dsl.pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )
    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=COLUMNS,
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classif_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
        # <--- No arguments for metrics: Output[Metrics] and metricsc: Output[ClassificationMetrics]
    )


Why does this work, and what exactly are metrics: Output[Metrics] and metricsc: Output[ClassificationMetrics] of type kfp.v2.dsl.Output?


classif_model_eval_metrics function code

from typing import NamedTuple

from kfp.v2.dsl import (
    Dataset, Model, Output, Input,
    OutputPath, ClassificationMetrics, Metrics, component
)

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-6:latest",
    output_component_file="tables_eval_component.yaml", # Optional: you can use this to load the component later
    packages_to_install=["google-cloud-aiplatform"],
)
def classif_model_eval_metrics(
    project: str,
    location: str,      # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Model],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.
    """Renders evaluation metrics for an AutoML Tabular classification model.
    Retrieves the classification model evaluation and render the ROC and confusion matrix
    for the model. Determine whether the model is sufficiently accurate to deploy.
    """
    import json
    import logging
    from google.cloud import aiplatform

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict
        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            metrics = MessageToDict(evaluation._pb.metrics)
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)
        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    return False
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])
        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        metricsc.log_roc_curve(fpr, tpr, thresholds)
        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )
        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    aiplatform.init(project=project)

    client = aiplatform.gapic.ModelServiceClient(client_options={"api_endpoint": api_endpoint})
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model.uri.replace("aiplatform://v1/", "")
    )
    log_metrics(metrics_list, metricsc)
    thresholds_dict = json.loads(thresholds_dict_str)

    return ("true",) if classification_thresholds_check(metrics_list[0], thresholds_dict) else ("false", )


Solution 1 [1]

The custom component is defined as a Python function with a @kfp.v2.dsl.component decorator.

The @component decorator specifies three optional arguments: the base container image to use; any packages to install; and the yaml file to which to write the component specification.

The component function, classif_model_eval_metrics, has some input parameters. The model parameter is an input kfp.v2.dsl.Model artifact.

The two function arguments, metrics and metricsc, are component Outputs, in this case of types Metrics and ClassificationMetrics. They're not explicitly passed as inputs to the component step; instead, they are automatically instantiated by the pipeline runtime and can be used in the component.

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tables_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classif_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Model],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
)
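
To see this mechanism in isolation, here is a minimal sketch of my own (the component and pipeline names are made up; this is not part of the notebook): a component declares a single Output[Metrics] parameter, and the pipeline definition never passes it, yet the component body can write to it because the backend creates the artifact and injects it when the step executes.

from kfp.v2 import dsl
from kfp.v2.dsl import Metrics, Output, component


@component(base_image="python:3.9")
def log_accuracy(accuracy: float, metrics: Output[Metrics]):
    # `metrics` is never supplied by the caller; the pipeline backend
    # instantiates the Metrics artifact and passes it in at run time.
    metrics.log_metric("accuracy", accuracy)


@dsl.pipeline(name="output-artifact-demo")
def demo_pipeline():
    # Only `accuracy` is provided; `metrics` is omitted, just like
    # metrics and metricsc in the call to classif_model_eval_metrics.
    log_accuracy(accuracy=0.97)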

For example, in the function below, we’re calling metricsc.log_roc_curve() and metricsc.log_confusion_matrix() to render these visualizations in the Pipelines UI. These Output params become component outputs when the component is compiled, and can be consumed by other pipeline steps.

def log_metrics(metrics_list, metricsc):
    ...
    metricsc.log_roc_curve(fpr, tpr, thresholds)
    ...
    metricsc.log_confusion_matrix(
        annotations,
        test_confusion_matrix["rows"],
    )
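
Downstream pipeline steps can then reference these outputs by name on the task object. In the notebook this question follows, the dep_decision return parameter is used to gate model deployment with a dsl.Condition; the sketch below (from memory, with the ModelDeployOp arguments abbreviated since they vary between google_cloud_pipeline_components versions) shows that consumption pattern inside the pipeline function:

    with kfp.dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):
        # This branch runs only when the threshold check returned "true".
        # Another component could likewise consume
        # model_eval_task.outputs["metrics"] as an Input[Metrics] parameter.
        deploy_op = gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            # ... project / machine type arguments omitted ...
        )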

For more information you can refer to this document.
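
Also, because the decorator was given output_component_file="tables_eval_component.yaml", the component specification is written to that file when the function is defined, so the same component can be reused in another pipeline without redefining the Python function. A minimal sketch, assuming the YAML file is in the working directory:

from kfp import components

# Reload the component from the spec that @component wrote out earlier.
classif_model_eval_metrics_op = components.load_component_from_file(
    "tables_eval_component.yaml"
)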

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Prajna Rai T