GCP Vertex Pipeline - Why does kfp.v2.dsl.Output as a function argument work without being provided?
I am following the "Create and run ML pipelines with Vertex Pipelines!" Jupyter notebook example from GCP. The function classif_model_eval_metrics takes metrics: Output[Metrics] and metricsc: Output[ClassificationMetrics], which have no default values.
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-6:latest",
    output_component_file="tables_eval_component.yaml",  # Optional: you can use this to load the component later
    packages_to_install=["google-cloud-aiplatform"],
)
def classif_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Model],
    metrics: Output[Metrics],  # No default value set, hence must be mandatory
    metricsc: Output[ClassificationMetrics],  # No default value set, hence must be mandatory
) -> NamedTuple("Outputs", [("dep_decision", str)]):
    # Full code at the bottom.
Hence those arguments should be mandatory, yet the function is called without them:
model_eval_task = classif_model_eval_metrics(
    project,
    gcp_region,
    api_endpoint,
    thresholds_dict_str,
    training_op.outputs["model"],
    # <--- No arguments for `metrics: Output[Metrics]` and `metricsc: Output[ClassificationMetrics]`
)
The entire pipeline code is below.
@kfp.dsl.pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )
    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=COLUMNS,
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classif_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
        # <--- No arguments for `metrics: Output[Metrics]` and `metricsc: Output[ClassificationMetrics]`
    )
Why does this work, and what exactly are metrics: Output[Metrics] and metricsc: Output[ClassificationMetrics] of type kfp.v2.dsl.Output?
classif_model_eval_metrics function code
from typing import NamedTuple

from kfp.v2.dsl import (
    Dataset, Model, Output, Input,
    OutputPath, ClassificationMetrics, Metrics, component
)

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-6:latest",
    output_component_file="tables_eval_component.yaml",  # Optional: you can use this to load the component later
    packages_to_install=["google-cloud-aiplatform"],
)
def classif_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Model],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.
    """Renders evaluation metrics for an AutoML Tabular classification model.

    Retrieves the classification model evaluation, renders the ROC curve and
    confusion matrix for the model, and determines whether the model is
    sufficiently accurate to deploy.
    """
    import json
    import logging

    from google.cloud import aiplatform

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            metrics = MessageToDict(evaluation._pb.metrics)
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)
        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    return False
        return True
    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"
    aiplatform.init(project=project)
    client = aiplatform.gapic.ModelServiceClient(client_options={"api_endpoint": api_endpoint})
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model.uri.replace("aiplatform://v1/", "")
    )
    log_metrics(metrics_list, metricsc)
    thresholds_dict = json.loads(thresholds_dict_str)
    return ("true",) if classification_thresholds_check(metrics_list[0], thresholds_dict) else ("false",)
Solution 1:[1]
The custom component is defined as a Python function with a @kfp.v2.dsl.component decorator. The @component decorator specifies three optional arguments: the base container image to use, any packages to install, and the YAML file to which to write the component specification.
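As an aside (not part of the original answer), that YAML specification can later be loaded back as a reusable component. A minimal sketch, assuming the standard KFP SDK loader and the same file name used above:

from kfp import components

# Reload the component specification that the @component decorator wrote to disk,
# so the step can be reused in another pipeline without re-importing the Python function.
classif_model_eval_metrics_op = components.load_component_from_file(
    "tables_eval_component.yaml"
)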
The component function, classif_model_eval_metrics, has some input parameters. The model parameter is an input kfp.v2.dsl.Model artifact.
The two function args, metrics and metricsc, are component Outputs, in this case of types Metrics and ClassificationMetrics. They're not explicitly passed as inputs to the component step; rather, they are automatically instantiated by the KFP runtime and can be used inside the component (a minimal standalone example follows the signature below).
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tables_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classif_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Model],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
)
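To make the automatic instantiation concrete, here is a minimal, self-contained sketch (the record_accuracy component and demo_pipeline are invented for illustration; it assumes the kfp v2 SDK). The pipeline only passes accuracy; the runtime creates the Metrics artifact and injects it as the metrics argument:

import kfp
from kfp.v2.dsl import Metrics, Output, component

@component(base_image="python:3.9")
def record_accuracy(accuracy: float, metrics: Output[Metrics]):
    # `metrics` is created and passed in by the KFP runtime, not by the caller.
    metrics.log_metric("accuracy", accuracy)

@kfp.dsl.pipeline(name="output-demo")
def demo_pipeline():
    # Only `accuracy` is supplied; the Output[Metrics] argument is injected automatically.
    record_accuracy(accuracy=0.97)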
For example, in the function below, we're calling metricsc.log_roc_curve() and metricsc.log_confusion_matrix() to render these visualizations in the Pipelines UI. These Output params become component outputs when the component is compiled, and can be consumed by other pipeline steps; a sketch of such a consumer follows the snippet below.
def log_metrics(metrics_list, metricsc):
    ...
    metricsc.log_roc_curve(fpr, tpr, thresholds)
    ...
    metricsc.log_confusion_matrix(
        annotations,
        test_confusion_matrix["rows"],
    )
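For illustration only (the read_metrics component below is invented, not part of the original pipeline), a downstream step can declare a matching Input[Metrics] parameter and receive the artifact through the upstream task's .outputs dictionary:

from kfp.v2.dsl import Input, Metrics, component

@component(base_image="python:3.9")
def read_metrics(metrics: Input[Metrics]):
    # The artifact produced by the upstream step arrives here as an Input.
    print("metrics artifact URI:", metrics.uri)
    print("recorded metadata:", dict(metrics.metadata))

# Inside the pipeline function, after model_eval_task is defined:
#     read_metrics(metrics=model_eval_task.outputs["metrics"])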
For more information you can refer to this document.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Prajna Rai T |