HyukjinKwon commented on a change in pull request #33545:
URL: https://github.com/apache/spark/pull/33545#discussion_r679026047



##########
File path: python/pyspark/sql/observation.py
##########
@@ -112,12 +115,13 @@ def get(self):
 
         Returns
         -------
-        :class:`Row`
+        :class:`Dict`

Review comment:
       ```suggestion
           dict
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##########
@@ -78,20 +82,35 @@ class Observation(name: String) {
   * Get the observed metrics. This waits for the observed dataset to finish its first action.
   * Only the result of the first action is available. Subsequent actions do not modify the result.
    *
-   * @return the observed metrics as a [[Row]]
+   * @return the observed metrics as a `Map[String, Any]`
    * @throws InterruptedException interrupted while waiting
    */
   @throws[InterruptedException]
-  def get: Row = {
+  def get: Map[String, Any] = {
     synchronized {
       // we need to loop as wait might return without us calling notify
      // https://en.wikipedia.org/w/index.php?title=Spurious_wakeup&oldid=992601610
-      while (this.row.isEmpty) {
+      while (this.metrics.isEmpty) {
         wait()
       }
     }
 
-    this.row.get
+    this.metrics.get
+  }
+
+  /**
+   * (Java-specific)
+   * Get the observed metrics. This waits for the observed dataset to finish its first action.
+   * Only the result of the first action is available. Subsequent actions do not modify the result.
+   *

Review comment:
       ```suggestion
     * (Java-specific) Get the observed metrics. This waits for the observed dataset to finish
     * its first action. Only the result of the first action is available. Subsequent actions do not
      * modify the result.
      *
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##########
@@ -78,20 +82,35 @@ class Observation(name: String) {
   * Get the observed metrics. This waits for the observed dataset to finish its first action.
   * Only the result of the first action is available. Subsequent actions do not modify the result.
    *

Review comment:
       ```suggestion
     * (Scala-specific) Get the observed metrics. This waits for the observed dataset to finish
     * its first action. Only the result of the first action is available. Subsequent actions do not
      * modify the result.
      *
   ```
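
For context, a minimal Scala usage sketch of the accessor being documented here (assumes an existing `spark` SparkSession; the metric names are illustrative, not taken from this PR):

```scala
import org.apache.spark.sql.Observation
import org.apache.spark.sql.functions.{col, max, min}

// Register named metrics on the dataset; they are computed alongside the first action.
val observation = new Observation("stats")
val observed = spark.range(100)
  .observe(observation, min(col("id")).as("min_val"), max(col("id")).as("max_val"))

observed.collect()  // the first action populates the observed metrics

// `get` blocks until the first action finishes and returns a Map[String, Any].
val metrics: Map[String, Any] = observation.get
// e.g. metrics("min_val") == 0L, metrics("max_val") == 99L
```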

##########
File path: sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
##########
@@ -391,6 +390,60 @@ public void testGroupBy() {
    Assert.assertEquals(asSet("1a#2", "3foobar#6", "5#10"), toSet(cogrouped.collectAsList()));
   }
 
+  @Test
+  public void testObservation() {
+    // SPARK-34806: tests the Observation Java API and Dataset.observe(Observation, Column, Column*)
+    Observation namedObservation = new Observation("named");
+    Observation unnamedObservation = new Observation();
+
+    Dataset<Long> df = spark
+      .range(100)
+      .observe(
+        namedObservation,
+        min(col("id")).as("min_val"),
+        max(col("id")).as("max_val"),
+        sum(col("id")).as("sum_val"),
+        count(when(pmod(col("id"), lit(2)).$eq$eq$eq(0), 1)).as("num_even")
+      )
+      .observe(
+        unnamedObservation,
+        avg(col("id")).cast("int").as("avg_val")
+      );
+
+    df.collect();
+    Map<String, Object> namedMetrics = null;
+    Map<String, Object> unnamedMetrics = null;
+
+    try {
+      namedMetrics = namedObservation.getAsJavaMap();
+      unnamedMetrics = unnamedObservation.getAsJavaMap();

Review comment:
       ```suggestion
         namedMetrics = namedObservation.getAsJava();
         unnamedMetrics = unnamedObservation.getAsJava();
   ```

##########
File path: python/pyspark/sql/observation.py
##########
@@ -112,12 +115,13 @@ def get(self):
 
         Returns
         -------
-        :class:`Row`
+        :class:`Dict`
             the observed metrics
         """
         assert self._jo is not None, 'call DataFrame.observe'
-        jrow = self._jo.get()
-        return self._to_row(jrow)
+        jmap = self._jo.getAsJavaMap()
+        # return a pure Python dict, not a py4j JavaMap
+        return {k: v for k, v in jmap.items()}

Review comment:
       Can you actually return `self._jo.getAsJavaMap()` directly? That becomes a plain dictionary IIRC.

##########
File path: python/pyspark/sql/observation.py
##########
@@ -112,12 +115,13 @@ def get(self):
 
         Returns
         -------
-        :class:`Row`
+        :class:`Dict`
             the observed metrics
         """
         assert self._jo is not None, 'call DataFrame.observe'
-        jrow = self._jo.get()
-        return self._to_row(jrow)
+        jmap = self._jo.getAsJavaMap()

Review comment:
       ```suggestion
           jmap = self._jo.getAsJava()
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##########
@@ -78,20 +82,35 @@ class Observation(name: String) {
   * Get the observed metrics. This waits for the observed dataset to finish its first action.
   * Only the result of the first action is available. Subsequent actions do not modify the result.
    *
-   * @return the observed metrics as a [[Row]]
+   * @return the observed metrics as a `Map[String, Any]`
    * @throws InterruptedException interrupted while waiting
    */
   @throws[InterruptedException]
-  def get: Row = {
+  def get: Map[String, Any] = {
     synchronized {
       // we need to loop as wait might return without us calling notify
      // https://en.wikipedia.org/w/index.php?title=Spurious_wakeup&oldid=992601610
-      while (this.row.isEmpty) {
+      while (this.metrics.isEmpty) {
         wait()
       }
     }
 
-    this.row.get
+    this.metrics.get
+  }
+
+  /**
+   * (Java-specific)
+   * Get the observed metrics. This waits for the observed dataset to finish its first action.
+   * Only the result of the first action is available. Subsequent actions do not modify the result.
+   *
+   * @return the observed metrics as a `java.util.Map[String, Object]`
+   * @throws InterruptedException interrupted while waiting
+   */
+  @throws[InterruptedException]
+  def getAsJavaMap: java.util.Map[String, Object] = {

Review comment:
       ```suggestion
     def getAsJava: java.util.Map[String, Object] = {
   ```

##########
File path: sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
##########
@@ -391,6 +390,60 @@ public void testGroupBy() {
    Assert.assertEquals(asSet("1a#2", "3foobar#6", "5#10"), toSet(cogrouped.collectAsList()));
   }
 
+  @Test
+  public void testObservation() {
+    // SPARK-34806: tests the Observation Java API and Dataset.observe(Observation, Column, Column*)
+    Observation namedObservation = new Observation("named");
+    Observation unnamedObservation = new Observation();
+
+    Dataset<Long> df = spark
+      .range(100)
+      .observe(
+        namedObservation,
+        min(col("id")).as("min_val"),
+        max(col("id")).as("max_val"),
+        sum(col("id")).as("sum_val"),
+        count(when(pmod(col("id"), lit(2)).$eq$eq$eq(0), 1)).as("num_even")
+      )
+      .observe(
+        unnamedObservation,
+        avg(col("id")).cast("int").as("avg_val")
+      );
+
+    df.collect();
+    Map<String, Object> namedMetrics = null;
+    Map<String, Object> unnamedMetrics = null;
+
+    try {
+      namedMetrics = namedObservation.getAsJavaMap();
+      unnamedMetrics = unnamedObservation.getAsJavaMap();
+    } catch (InterruptedException e) {
+      Assert.fail();
+    }
+    Map<String, Object> expectedNamedMetrics = new HashMap<String, Object>() {{
+      put("min_val", 0L);
+      put("max_val", 99L);
+      put("sum_val", 4950L);
+      put("num_even", 50L);
+    }};
+    Assert.assertEquals(expectedNamedMetrics, namedMetrics);
+
+    Map<String, Object> expectedUnnamedMetrics = new HashMap<String, Object>() {{
+      put("avg_val", 49);
+    }};
+    Assert.assertEquals(expectedUnnamedMetrics, unnamedMetrics);
+
+    // we can get the result multiple times
+    try {
+      namedMetrics = namedObservation.getAsJavaMap();
+      unnamedMetrics = unnamedObservation.getAsJavaMap();

Review comment:
       ```suggestion
         namedMetrics = namedObservation.getAsJava();
         unnamedMetrics = unnamedObservation.getAsJava();
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##########
@@ -78,20 +82,35 @@ class Observation(name: String) {
   * Get the observed metrics. This waits for the observed dataset to finish its first action.
   * Only the result of the first action is available. Subsequent actions do not modify the result.
    *
-   * @return the observed metrics as a [[Row]]
+   * @return the observed metrics as a `Map[String, Any]`
    * @throws InterruptedException interrupted while waiting
    */
   @throws[InterruptedException]
-  def get: Row = {
+  def get: Map[String, Any] = {
     synchronized {
       // we need to loop as wait might return without us calling notify
      // https://en.wikipedia.org/w/index.php?title=Spurious_wakeup&oldid=992601610
-      while (this.row.isEmpty) {
+      while (this.metrics.isEmpty) {
         wait()
       }
     }
 
-    this.row.get
+    this.metrics.get
+  }
+
+  /**
+   * (Java-specific)
+   * Get the observed metrics. This waits for the observed dataset to finish its first action.
+   * Only the result of the first action is available. Subsequent actions do not modify the result.
+   *
+   * @return the observed metrics as a `java.util.Map[String, Object]`
+   * @throws InterruptedException interrupted while waiting
+   */
+  @throws[InterruptedException]
+  def getAsJavaMap: java.util.Map[String, Object] = {

Review comment:
       Hmmm ... okay. This one might be a problem: it requires adding another method, which I would actually like to avoid ... but I guess it's fine.
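
       If it helps keep the addition small, one option (a sketch only, assuming the Java-specific accessor simply delegates to the Scala `get`; not necessarily this PR's exact implementation) is a thin conversion wrapper using `scala.collection.JavaConverters`:

```scala
import scala.collection.JavaConverters._

/**
 * (Java-specific) Get the observed metrics as a java.util.Map for Java callers.
 */
@throws[InterruptedException]
def getAsJava: java.util.Map[String, Object] = {
  // Reuse the Scala accessor (including its wait-for-first-action semantics)
  // and only convert the result into a Java map for Java callers.
  get.map { case (key, value) => (key, value.asInstanceOf[Object]) }.asJava
}
```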




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


