HyukjinKwon commented on a change in pull request #33545:
URL: https://github.com/apache/spark/pull/33545#discussion_r679026047
##########
File path: python/pyspark/sql/observation.py
##########
@@ -112,12 +115,13 @@ def get(self):
Returns
-------
- :class:`Row`
+ :class:`Dict`
Review comment:
```suggestion
dict
```
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##########
@@ -78,20 +82,35 @@ class Observation(name: String) {
* Get the observed metrics. This waits for the observed dataset to finish
its first action.
* Only the result of the first action is available. Subsequent actions do
not modify the result.
*
- * @return the observed metrics as a [[Row]]
+ * @return the observed metrics as a `Map[String, Any]`
* @throws InterruptedException interrupted while waiting
*/
@throws[InterruptedException]
- def get: Row = {
+ def get: Map[String, Any] = {
synchronized {
// we need to loop as wait might return without us calling notify
// https://en.wikipedia.org/w/index.php?title=Spurious_wakeup&oldid=992601610
- while (this.row.isEmpty) {
+ while (this.metrics.isEmpty) {
wait()
}
}
- this.row.get
+ this.metrics.get
+ }
+
+ /**
+ * (Java-specific)
+ * Get the observed metrics. This waits for the observed dataset to finish
its first action.
+ * Only the result of the first action is available. Subsequent actions do
not modify the result.
+ *
Review comment:
```suggestion
* (Java-specific) Get the observed metrics. This waits for the observed dataset to finish
* its first action. Only the result of the first action is available. Subsequent actions do not
* modify the result.
*
```
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##########
@@ -78,20 +82,35 @@ class Observation(name: String) {
* Get the observed metrics. This waits for the observed dataset to finish
its first action.
* Only the result of the first action is available. Subsequent actions do
not modify the result.
*
Review comment:
```suggestion
* (Scala-specific) Get the observed metrics. This waits for the observed dataset to finish
* its first action. Only the result of the first action is available. Subsequent actions do not
* modify the result.
*
```
##########
File path:
sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
##########
@@ -391,6 +390,60 @@ public void testGroupBy() {
Assert.assertEquals(asSet("1a#2", "3foobar#6", "5#10"),
toSet(cogrouped.collectAsList()));
}
+ @Test
+ public void testObservation() {
+ // SPARK-34806: tests the Observation Java API and
Dataset.observe(Observation, Column, Column*)
+ Observation namedObservation = new Observation("named");
+ Observation unnamedObservation = new Observation();
+
+ Dataset<Long> df = spark
+ .range(100)
+ .observe(
+ namedObservation,
+ min(col("id")).as("min_val"),
+ max(col("id")).as("max_val"),
+ sum(col("id")).as("sum_val"),
+ count(when(pmod(col("id"), lit(2)).$eq$eq$eq(0), 1)).as("num_even")
+ )
+ .observe(
+ unnamedObservation,
+ avg(col("id")).cast("int").as("avg_val")
+ );
+
+ df.collect();
+ Map<String, Object> namedMetrics = null;
+ Map<String, Object> unnamedMetrics = null;
+
+ try {
+ namedMetrics = namedObservation.getAsJavaMap();
+ unnamedMetrics = unnamedObservation.getAsJavaMap();
Review comment:
```suggestion
namedMetrics = namedObservation.getAsJava();
unnamedMetrics = unnamedObservation.getAsJava();
```
##########
File path: python/pyspark/sql/observation.py
##########
@@ -112,12 +115,13 @@ def get(self):
Returns
-------
- :class:`Row`
+ :class:`Dict`
the observed metrics
"""
assert self._jo is not None, 'call DataFrame.observe'
- jrow = self._jo.get()
- return self._to_row(jrow)
+ jmap = self._jo.getAsJavaMap()
+ # return a pure Python dict, not a py4j JavaMap
+ return {k: v for k, v in jmap.items()}
Review comment:
Can you return `self._jo.getAsJavaMap()` directly instead? IIRC, that
becomes a plain dictionary.
##########
File path: python/pyspark/sql/observation.py
##########
@@ -112,12 +115,13 @@ def get(self):
Returns
-------
- :class:`Row`
+ :class:`Dict`
the observed metrics
"""
assert self._jo is not None, 'call DataFrame.observe'
- jrow = self._jo.get()
- return self._to_row(jrow)
+ jmap = self._jo.getAsJavaMap()
Review comment:
```suggestion
jmap = self._jo.getAsJava()
```
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##########
@@ -78,20 +82,35 @@ class Observation(name: String) {
* Get the observed metrics. This waits for the observed dataset to finish
its first action.
* Only the result of the first action is available. Subsequent actions do
not modify the result.
*
- * @return the observed metrics as a [[Row]]
+ * @return the observed metrics as a `Map[String, Any]`
* @throws InterruptedException interrupted while waiting
*/
@throws[InterruptedException]
- def get: Row = {
+ def get: Map[String, Any] = {
synchronized {
// we need to loop as wait might return without us calling notify
// https://en.wikipedia.org/w/index.php?title=Spurious_wakeup&oldid=992601610
- while (this.row.isEmpty) {
+ while (this.metrics.isEmpty) {
wait()
}
}
- this.row.get
+ this.metrics.get
+ }
+
+ /**
+ * (Java-specific)
+ * Get the observed metrics. This waits for the observed dataset to finish
its first action.
+ * Only the result of the first action is available. Subsequent actions do
not modify the result.
+ *
+ * @return the observed metrics as a `java.util.Map[String, Object]`
+ * @throws InterruptedException interrupted while waiting
+ */
+ @throws[InterruptedException]
+ def getAsJavaMap: java.util.Map[String, Object] = {
Review comment:
```suggestion
def getAsJava: java.util.Map[String, Object] = {
```
##########
File path:
sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
##########
@@ -391,6 +390,60 @@ public void testGroupBy() {
Assert.assertEquals(asSet("1a#2", "3foobar#6", "5#10"),
toSet(cogrouped.collectAsList()));
}
+ @Test
+ public void testObservation() {
+ // SPARK-34806: tests the Observation Java API and
Dataset.observe(Observation, Column, Column*)
+ Observation namedObservation = new Observation("named");
+ Observation unnamedObservation = new Observation();
+
+ Dataset<Long> df = spark
+ .range(100)
+ .observe(
+ namedObservation,
+ min(col("id")).as("min_val"),
+ max(col("id")).as("max_val"),
+ sum(col("id")).as("sum_val"),
+ count(when(pmod(col("id"), lit(2)).$eq$eq$eq(0), 1)).as("num_even")
+ )
+ .observe(
+ unnamedObservation,
+ avg(col("id")).cast("int").as("avg_val")
+ );
+
+ df.collect();
+ Map<String, Object> namedMetrics = null;
+ Map<String, Object> unnamedMetrics = null;
+
+ try {
+ namedMetrics = namedObservation.getAsJavaMap();
+ unnamedMetrics = unnamedObservation.getAsJavaMap();
+ } catch (InterruptedException e) {
+ Assert.fail();
+ }
+ Map<String, Object> expectedNamedMetrics = new HashMap<String, Object>() {{
+ put("min_val", 0L);
+ put("max_val", 99L);
+ put("sum_val", 4950L);
+ put("num_even", 50L);
+ }};
+ Assert.assertEquals(expectedNamedMetrics, namedMetrics);
+
+ Map<String, Object> expectedUnnamedMetrics = new HashMap<String, Object>()
{{
+ put("avg_val", 49);
+ }};
+ Assert.assertEquals(expectedUnnamedMetrics, unnamedMetrics);
+
+ // we can get the result multiple times
+ try {
+ namedMetrics = namedObservation.getAsJavaMap();
+ unnamedMetrics = unnamedObservation.getAsJavaMap();
Review comment:
```suggestion
namedMetrics = namedObservation.getAsJava();
unnamedMetrics = unnamedObservation.getAsJava();
```
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##########
@@ -78,20 +82,35 @@ class Observation(name: String) {
* Get the observed metrics. This waits for the observed dataset to finish
its first action.
* Only the result of the first action is available. Subsequent actions do
not modify the result.
*
- * @return the observed metrics as a [[Row]]
+ * @return the observed metrics as a `Map[String, Any]`
* @throws InterruptedException interrupted while waiting
*/
@throws[InterruptedException]
- def get: Row = {
+ def get: Map[String, Any] = {
synchronized {
// we need to loop as wait might return without us calling notify
// https://en.wikipedia.org/w/index.php?title=Spurious_wakeup&oldid=992601610
- while (this.row.isEmpty) {
+ while (this.metrics.isEmpty) {
wait()
}
}
- this.row.get
+ this.metrics.get
+ }
+
+ /**
+ * (Java-specific)
+ * Get the observed metrics. This waits for the observed dataset to finish
its first action.
+ * Only the result of the first action is available. Subsequent actions do
not modify the result.
+ *
+ * @return the observed metrics as a `java.util.Map[String, Object]`
+ * @throws InterruptedException interrupted while waiting
+ */
+ @throws[InterruptedException]
+ def getAsJavaMap: java.util.Map[String, Object] = {
Review comment:
Hmm... okay, this one might be a problem: it requires adding another
method, which I would actually like to avoid. But I guess it's fine.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]