PySpark UDF monitoring with Prometheus

I am trying to monitor some logic in a UDF using counters.

i.e.

from prometheus_client import Counter
from pyspark.sql.functions import udf

counter = Counter(...)  # a labelled prometheus_client counter

@udf
def do_smthng(col):
    if col:
        counter.labels("not_null").inc()
    else:
        counter.labels("null").inc()
    return col

This is not the real case, but you should get the idea. I have followed this article: https://kb.databricks.com/metrics/spark-metrics.html

I have so far tried:

  • Using a global Prometheus counter (failed with "Lock is not picklable")
  • Creating a custom source using py4j:

# noinspection PyPep8Naming
class CustomMetrics:
    """Python side of a Spark metrics Source, exposed to the JVM via py4j."""

    def __init__(self, sourceName, metricRegistry):
        self.metricRegistry = metricRegistry
        self.sourceName = sourceName

    class Java:
        implements = ["org.apache.spark.metrics.source.Source"]

# Build the JVM-side objects through the py4j gateway
py_4j_gateway = spark_session.sparkContext._gateway
metric_registry = py_4j_gateway.jvm.com.codahale.metrics.MetricRegistry()
SparkEnv = py_4j_gateway.jvm.org.apache.spark.SparkEnv
custom_metrics_provider = CustomMetrics("spark.ingest.custom", metric_registry)

This failed with the same error. I also can't get hold of SparkEnv.get.metricsSystem, so I can't register the custom metrics source in any case.
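For reference, the registration step I was aiming for is roughly the following; I never get that far, because metricsSystem is not reachable for me (the call below is exactly what fails):

# What I was hoping to do, hypothetically, once the Python source exists:
metrics_system = SparkEnv.get().metricsSystem()
metrics_system.registerSource(custom_metrics_provider)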

Is there no way for me to access the internal metric registry from Python? I am starting to wonder how people monitor Spark pipelines with custom metrics.

Spark 3.1.2, Python 3.8 (x86), MacBook Pro M1 Pro



Solution 1:

Why don't you use an accumulator? It is made to be accessible from executors and is perfect for counting things. It is a holdover from MapReduce that was used for collecting metrics before Spark existed.
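A minimal sketch of what that could look like for a counting UDF like yours (names here are illustrative, not from your code):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

# Accumulators are created on the driver; executors only send increments
# back with each task, so there is no lock in the UDF closure to pickle.
null_rows = spark.sparkContext.accumulator(0)
not_null_rows = spark.sparkContext.accumulator(0)

@udf(returnType=StringType())
def do_smthng(col):
    if col is not None:
        not_null_rows.add(1)
    else:
        null_rows.add(1)
    return col

After an action has run you can read null_rows.value and not_null_rows.value on the driver and forward them to whatever metrics backend you like.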

Your accumulators can then be attached to the metrics system and exposed via the 'PrometheusServlet' sink. From the Spark monitoring docs:

namespace=AccumulatorSource (note: user-configurable sources to attach accumulators to the metric system): DoubleAccumulatorSource, LongAccumulatorSource
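For the PrometheusServlet side, the sink can be enabled through the metrics configuration, for example directly on the Spark conf (a sketch; the keys follow the Spark monitoring docs, and the accumulator source classes above are, as far as I know, only exposed through the Scala/Java API):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Route the driver's metric registry to the built-in Prometheus servlet
    .config("spark.metrics.conf.*.sink.prometheusServlet.class",
            "org.apache.spark.metrics.sink.PrometheusServlet")
    .config("spark.metrics.conf.*.sink.prometheusServlet.path",
            "/metrics/prometheus")
    .getOrCreate()
)
# Driver metrics are then scrapeable from the Spark UI,
# e.g. http://<driver-host>:4040/metrics/prometheus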
