Enrico Minack created SPARK-34806:
-------------------------------------

             Summary: Helper class for batch Dataset.observe()
                 Key: SPARK-34806
                 URL: https://issues.apache.org/jira/browse/SPARK-34806
             Project: Spark
          Issue Type: New Feature
          Components: SQL
    Affects Versions: 3.2.0
            Reporter: Enrico Minack


The {{observe}} method was added to the {{Dataset}} API in 3.0.0. It 
allows collecting aggregate metrics over the data of a Dataset while it is 
being processed during an action.

These metrics are collected in a separate thread after registering a 
{{QueryExecutionListener}} for batch datasets or a {{StreamingQueryListener}} 
for streaming datasets. While in a streaming context it makes perfect sense to 
process incremental metrics in an event-based fashion, simple batch dataset 
processing should be able to retrieve a single result without registering 
listeners or handling threads.

Introducing an {{Observation}} helper class can hide that complexity for simple 
use cases in batch processing.

Similar to accumulators ({{AccumulatorV2}}) created through {{SparkContext}} 
(e.g. {{SparkContext.longAccumulator}}), the {{SparkSession}} could provide a 
method to create a new {{Observation}} instance and register it with the 
session.

Alternatively, an {{Observation}} instance could be instantiated on its own 
and register itself with {{Dataset.sparkSession}} when 
{{Observation.on(Dataset)}} is called. This "registration" adds a listener to 
the session that retrieves the metrics.

The {{Observation}} class provides methods to retrieve the metrics. Since the 
listener is called in a separate thread, retrieval has to wait for it, so all 
of these methods block until the metrics are available, optionally with a 
timeout:
 - {{Observation.get}} waits without timeout and returns the metric.
 - {{Observation.option(time, unit)}} waits at most {{time}} and returns the 
metric as an {{Option}}, or {{None}} if the timeout elapses.
 - {{Observation.waitCompleted(time, unit)}} waits at most {{time}} for the 
metrics and indicates a timeout by returning {{false}}.

An action has to be called on the observed dataset before any of these 
methods is called; otherwise a timeout occurs (or {{Observation.get}} blocks 
indefinitely).

With {{Observation.reset}}, another action can be observed. Finally, 
{{Observation.close}} unregisters the listener from the session.
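The registration, {{reset}} and {{close}} life cycle can likewise be sketched without Spark. In this minimal sketch all names are hypothetical: {{ListenerRegistry}} stands in for the session's listener registration, {{post}} simulates the session notifying listeners after an action, and {{ResettableObservation}} stands in for {{Observation}}. For brevity, {{post}} is called on the caller's thread, and the race between {{reset()}} and an action that is still being reported is ignored.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical stand-in for the session's listener registry (not Spark API).
class ListenerRegistry[T] {
  @volatile private var listeners: List[T => Unit] = Nil
  def register(l: T => Unit): Unit = synchronized { listeners = l :: listeners }
  def unregister(l: T => Unit): Unit =
    synchronized { listeners = listeners.filterNot(_ eq l) }
  def size: Int = listeners.size
  // Simulates the session notifying all listeners after an action completes.
  def post(metric: T): Unit = listeners.foreach(_(metric))
}

// Hypothetical Observation stand-in that registers itself on construction.
class ResettableObservation[T](registry: ListenerRegistry[T]) {
  @volatile private var latch = new CountDownLatch(1)
  @volatile private var metric: Option[T] = None

  // The listener this observation registers with the "session".
  private val listener: T => Unit = { value =>
    metric = Some(value)
    latch.countDown()
  }
  registry.register(listener)

  def option(time: Long, unit: TimeUnit): Option[T] =
    if (latch.await(time, unit)) metric else None

  // Forgets the last metric so that the next action can be observed.
  def reset(): Unit = synchronized {
    metric = None
    latch = new CountDownLatch(1)
  }

  // Unregisters the listener; later actions are no longer observed.
  def close(): Unit = registry.unregister(listener)
}

val registry = new ListenerRegistry[Long]
val obs = new ResettableObservation[Long](registry)
registry.post(10L)                             // first action reports 10
println(obs.option(1, TimeUnit.SECONDS))       // Some(10)
obs.reset()
println(obs.option(10, TimeUnit.MILLISECONDS)) // None: no second action yet
registry.post(20L)                             // second action reports 20
println(obs.option(1, TimeUnit.SECONDS))       // Some(20)
obs.close()
println(registry.size)                         // 0
```

Replacing the latch on {{reset}} is the simplest way to re-arm a one-shot wait; a real implementation would also need to decide what happens to metrics that arrive between {{reset}} and the next action.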



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
