[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-07-14 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r669619755



##
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
##
@@ -2374,6 +2375,48 @@ class DataFrameSuite extends QueryTest
 }
   }
 
+  test("SPARK-34806: observation on datasets") {
+val namedObservation = Observation("named")
+val unnamedObservation = Observation()
+
+val df = spark
+  .range(100)
+  .observe(
+namedObservation,
+min($"id").as("min_val"),
+max($"id").as("max_val"),
+sum($"id").as("sum_val"),
+count(when($"id" % 2 === 0, 1)).as("num_even")
+  )
+  .observe(
+unnamedObservation,
+avg($"id").cast("int").as("avg_val")
+  )
+
+def checkMetrics(namedMetric: Row, unnamedMetric: Row): Unit = {
+  assert(namedMetric === Row(0L, 99L, 4950L, 50L))
+  assert(unnamedMetric === Row(49))
+}
+
+// First run
+df.collect()
+checkMetrics(namedObservation.get, unnamedObservation.get)
+// we can get the result multiple times
+checkMetrics(namedObservation.get, unnamedObservation.get)
+
+// an observation can be used only once
+assertThrows[IllegalStateException] {

Review comment:
   since it's a user-facing error, maybe `IllegalArgumentException`?
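If the suggestion is taken, the assertion in the quoted test would track the new type. A minimal sketch of the adjusted check, reusing the names from the quoted test:

```scala
// an observation can be used only once: expect the user-facing exception type
assertThrows[IllegalArgumentException] {
  spark.range(100).observe(namedObservation, sum($"id").as("sum_val")).collect()
}
```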




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-06-28 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r659583680



##
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+
+/**
+ * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]:
+ *
+ * {{{
+ *   // Observe row count (rows) and highest id (maxid) in the Dataset while writing it
+ *   val observation = Observation("my metrics")
+ *   val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid"))
+ *   observed_ds.write.parquet("ds.parquet")
+ *   val metrics = observation.get
+ * }}}
+ *
+ * This collects the metrics while the first action is executed on the observed dataset.
+ * Subsequent actions do not modify the metrics returned by [[get]]. Retrieval of the metric
+ * via [[get]] blocks until the first action has finished and metrics become available.
+ *
+ * This class does not support streaming datasets.
+ *
+ * @param name name of the metric
+ * @since 3.2.0
+ */
+class Observation(name: String) {
+
+  private val listener: ObservationListener = ObservationListener(this)
+
+  @volatile private var sparkSession: Option[SparkSession] = None
+
+  @volatile private var row: Option[Row] = None
+
+  // we use a private object for synchronized { } when accessing this.row
+  // so we have full control on who calls notify
+  private val rowSync: Object = new Object()
+
+  /**
+   * Attaches this observation to the given [[Dataset]] to observe aggregation expressions.
+   *
+   * @param ds dataset
+   * @param expr first aggregation expression
+   * @param exprs more aggregation expressions
+   * @tparam T dataset type
+   * @return observed dataset
+   * @throws IllegalArgumentException If this is a streaming Dataset (ds.isStreaming == true)
+   */
+  private[spark] def on[T](ds: Dataset[T], expr: Column, exprs: Column*): Dataset[T] = {
+    if (ds.isStreaming) {
+      throw new IllegalArgumentException("Observation does not support streaming Datasets")
+    }
+    register(ds.sparkSession)
+    ds.observe(name, expr, exprs: _*)
+  }
+
+  /**
+   * Get the observed metrics. This waits for the observed dataset to finish its first action.
+   * Only the result of the first action is available. Subsequent actions do not modify the result.
+   *
+   * @return the observed metrics as a [[Row]]
+   * @throws InterruptedException interrupted while waiting
+   */
+  def get: Row = {
+    this.rowSync.synchronized {
+      if (this.row.isEmpty) {
+        this.rowSync.wait()

Review comment:
   I searched this a bit more and I think @hvanhovell is right about the spurious wakeups: https://stackoverflow.com/questions/1050592/do-spurious-wakeups-in-java-actually-happen
   
   We need to use a loop here, or use other locking APIs from the JDK that are safe.
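For reference, the standard guard-in-a-loop idiom that tolerates spurious wakeups, sketched against the fields in the quoted diff (not the merged implementation):

```scala
def get: Row = rowSync.synchronized {
  // Re-check the predicate after every wakeup: Object.wait() may return
  // spuriously, i.e. without a matching notify()/notifyAll().
  while (row.isEmpty) {
    rowSync.wait()
  }
  row.get
}
```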







[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-06-22 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r656033954



##
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+/**
+ * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]:
+ *
+ * {{{
+ *   // Observe row count (rows) and highest id (maxid) in the Dataset while writing it
+ *   val observation = Observation("my_metrics")
+ *   val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid"))
+ *   observed_ds.write.parquet("ds.parquet")
+ *   val metrics = observation.get

Review comment:
   private[spark] SGTM







[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-06-22 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r656032281



##
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+/**
+ * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]:
+ *
+ * {{{
+ *   // Observe row count (rows) and highest id (maxid) in the Dataset while writing it
+ *   val observation = Observation("my_metrics")
+ *   val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid"))
+ *   observed_ds.write.parquet("ds.parquet")
+ *   val metrics = observation.get
+ * }}}
+ *
+ * This collects the metrics while the first action is executed on the observed dataset.
+ * Subsequent actions do not modify the metrics returned by
+ * [[org.apache.spark.sql.Observation.get]]. Retrieval of the metric via
+ * [[org.apache.spark.sql.Observation.get]] blocks until the first action has finished and
+ * metrics become available. You can add a timeout to that blocking via
+ * [[org.apache.spark.sql.Observation.waitCompleted]]:
+ *
+ * {{{
+ *   if (observation.waitCompleted(100, TimeUnit.MILLISECONDS)) {
+ *     observation.get
+ *   }
+ * }}}
+ *
+ * This class does not support streaming datasets.
+ *
+ * @param name name of the metric
+ * @since 3.2.0
+ */
+class Observation(name: String) {
+
+  private val listener: ObservationListener = ObservationListener(this)
+
+  private var sparkSession: Option[SparkSession] = None
+
+  @volatile private var row: Option[Row] = None
+
+  /**
+   * Attaches this observation to the given [[Dataset]] to observe aggregation expressions.
+   *
+   * @param ds dataset
+   * @param expr first aggregation expression
+   * @param exprs more aggregation expressions
+   * @tparam T dataset type
+   * @return observed dataset
+   * @throws IllegalArgumentException If this is a streaming Dataset (ds.isStreaming == true)
+   */
+  def on[T](ds: Dataset[T], expr: Column, exprs: Column*): Dataset[T] = {
+    if (ds.isStreaming) {
+      throw new IllegalArgumentException("Observation does not support streaming Datasets")
+    }
+    register(ds.sparkSession)
+    ds.observe(name, expr, exprs: _*)
+  }
+
+  /**
+   * Waits for the first action on the observed dataset to complete and returns true.
+   * The result is then available through the get method.
+   * This method times out after the given amount of time returning false.
+   *
+   * @param time timeout
+   * @param unit timeout time unit
+   * @return true if action completed within timeout, false otherwise
+   * @throws InterruptedException interrupted while waiting
+   */
+  def waitCompleted(time: Long, unit: TimeUnit): Boolean = waitCompleted(Some(unit.toMillis(time)))
+
+  /**
+   * Get the observed metrics. This waits until the observed dataset finishes its first action.
+   * If you want to wait for the result and provide a timeout, use [[waitCompleted]]. Only the
+   * result of the first action is available. Subsequent actions do not modify the result.
+   *
+   * @return the observed metrics as a [[Row]]
+   * @throws InterruptedException interrupted while waiting
+   */
+  def get: Row = {
+    assert(waitCompleted(None), "waitCompleted without timeout returned false")
+    row.get
+  }
+
+  private def waitCompleted(millis: Option[Long]): Boolean = {
+    synchronized {
+      if (row.isEmpty) {
+        if (millis.isDefined) {
+          this.wait(millis.get)
+        } else {
+          this.wait()
+        }
+      }
+      row.isDefined
+    }
+  }
+
+  private def register(sparkSession: SparkSession): Unit = {
+    // makes this class thread-safe:
+    // only the first thread entering this block can set sparkSession
+    // all other threads will see the exception, because it is only allowed to do this once
+    synchronized {
+      if (this.sparkSession.isDefined) {
+        throw new 

[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-06-22 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r656031102



##
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+/**
+ * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]:
+ *
+ * {{{
+ *   // Observe row count (rows) and highest id (maxid) in the Dataset while writing it
+ *   val observation = Observation("my_metrics")
+ *   val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid"))
+ *   observed_ds.write.parquet("ds.parquet")
+ *   val metrics = observation.get
+ * }}}
+ *
+ * This collects the metrics while the first action is executed on the observed dataset.
+ * Subsequent actions do not modify the metrics returned by
+ * [[org.apache.spark.sql.Observation.get]]. Retrieval of the metric via
+ * [[org.apache.spark.sql.Observation.get]] blocks until the first action has finished and
+ * metrics become available. You can add a timeout to that blocking via
+ * [[org.apache.spark.sql.Observation.waitCompleted]]:
+ *
+ * {{{
+ *   if (observation.waitCompleted(100, TimeUnit.MILLISECONDS)) {
+ *     observation.get
+ *   }
+ * }}}
+ *
+ * This class does not support streaming datasets.
+ *
+ * @param name name of the metric
+ * @since 3.2.0
+ */
+class Observation(name: String) {

Review comment:
   SGTM







[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-06-22 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r655400419



##
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+/**
+ * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]:
+ *
+ * {{{
+ *   // Observe row count (rows) and highest id (maxid) in the Dataset while writing it
+ *   val observation = Observation("my_metrics")
+ *   val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid"))
+ *   observed_ds.write.parquet("ds.parquet")
+ *   val metrics = observation.get

Review comment:
   shall we also mention the `on` API here?
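For instance, the scaladoc example could gain one line showing the helper form; a sketch using the `on` signature quoted later in this message:

```scala
// Equivalent to ds.observe(observation, ...), via the helper's on API:
val observed_ds = observation.on(ds, count(lit(1)).as("rows"), max($"id").as("maxid"))
```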

##
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+/**
+ * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]:
+ *
+ * {{{
+ *   // Observe row count (rows) and highest id (maxid) in the Dataset while writing it
+ *   val observation = Observation("my_metrics")
+ *   val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid"))
+ *   observed_ds.write.parquet("ds.parquet")
+ *   val metrics = observation.get
+ * }}}
+ *
+ * This collects the metrics while the first action is executed on the observed dataset.
+ * Subsequent actions do not modify the metrics returned by
+ * [[org.apache.spark.sql.Observation.get]]. Retrieval of the metric via
+ * [[org.apache.spark.sql.Observation.get]] blocks until the first action has finished and
+ * metrics become available. You can add a timeout to that blocking via
+ * [[org.apache.spark.sql.Observation.waitCompleted]]:
+ *
+ * {{{
+ *   if (observation.waitCompleted(100, TimeUnit.MILLISECONDS)) {
+ *     observation.get
+ *   }
+ * }}}
+ *
+ * This class does not support streaming datasets.
+ *
+ * @param name name of the metric
+ * @since 3.2.0
+ */
+class Observation(name: String) {
+
+  private val listener: ObservationListener = ObservationListener(this)
+
+  private var sparkSession: Option[SparkSession] = None
+
+  @volatile private var row: Option[Row] = None
+
+  /**
+   * Attaches this observation to the given [[Dataset]] to observe aggregation expressions.
+   *
+   * @param ds dataset
+   * @param expr first aggregation expression
+   * @param exprs more aggregation expressions
+   * @tparam T dataset type
+   * @return observed dataset
+   * @throws IllegalArgumentException If this is a streaming Dataset (ds.isStreaming == true)
+   */
+  def on[T](ds: Dataset[T], expr: Column, exprs: Column*): Dataset[T] = {
+    if (ds.isStreaming) {
+      throw new IllegalArgumentException("Observation does not support streaming Datasets")
+    }
+    register(ds.sparkSession)
+    ds.observe(name, expr, exprs: _*)
+  }
+
+  /**
+   * Waits for the first action on the observed dataset to complete and 

[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-06-21 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r655423841



##
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+/**
+ * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]:
+ *
+ * {{{
+ *   // Observe row count (rows) and highest id (maxid) in the Dataset while writing it
+ *   val observation = Observation("my_metrics")
+ *   val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid"))
+ *   observed_ds.write.parquet("ds.parquet")
+ *   val metrics = observation.get
+ * }}}
+ *
+ * This collects the metrics while the first action is executed on the observed dataset.
+ * Subsequent actions do not modify the metrics returned by
+ * [[org.apache.spark.sql.Observation.get]]. Retrieval of the metric via
+ * [[org.apache.spark.sql.Observation.get]] blocks until the first action has finished and
+ * metrics become available. You can add a timeout to that blocking via
+ * [[org.apache.spark.sql.Observation.waitCompleted]]:
+ *
+ * {{{
+ *   if (observation.waitCompleted(100, TimeUnit.MILLISECONDS)) {
+ *     observation.get
+ *   }
+ * }}}
+ *
+ * This class does not support streaming datasets.
+ *
+ * @param name name of the metric
+ * @since 3.2.0
+ */
+class Observation(name: String) {

Review comment:
   That's a good point! We can simply leverage the standard `Future` API to represent this async result.
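A sketch of what leaning on the standard `Future` API could look like; the class and member names here are illustrative, not the merged design:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.Duration

import org.apache.spark.sql.Row

class ObservationSketch(val name: String) {
  private val promise: Promise[Row] = Promise[Row]()

  // The async result: completes exactly once, when the listener fires.
  val future: Future[Row] = promise.future

  // Would be called from the QueryExecutionListener on the first finished action.
  def onFinish(metrics: Row): Unit = promise.trySuccess(metrics)

  // A blocking get with timeout then falls out of the Future API for free.
  def get(timeout: Duration): Row = Await.result(future, timeout)
}
```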







[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-06-21 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r655406702



##
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+/**
+ * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]:
+ *
+ * {{{
+ *   // Observe row count (rows) and highest id (maxid) in the Dataset while writing it
+ *   val observation = Observation("my_metrics")
+ *   val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid"))
+ *   observed_ds.write.parquet("ds.parquet")
+ *   val metrics = observation.get
+ * }}}
+ *
+ * This collects the metrics while the first action is executed on the observed dataset.
+ * Subsequent actions do not modify the metrics returned by
+ * [[org.apache.spark.sql.Observation.get]]. Retrieval of the metric via
+ * [[org.apache.spark.sql.Observation.get]] blocks until the first action has finished and
+ * metrics become available. You can add a timeout to that blocking via
+ * [[org.apache.spark.sql.Observation.waitCompleted]]:
+ *
+ * {{{
+ *   if (observation.waitCompleted(100, TimeUnit.MILLISECONDS)) {
+ *     observation.get
+ *   }
+ * }}}
+ *
+ * This class does not support streaming datasets.
+ *
+ * @param name name of the metric
+ * @since 3.2.0
+ */
+class Observation(name: String) {
+
+  private val listener: ObservationListener = ObservationListener(this)
+
+  private var sparkSession: Option[SparkSession] = None
+
+  @volatile private var row: Option[Row] = None
+
+  /**
+   * Attaches this observation to the given [[Dataset]] to observe aggregation expressions.
+   *
+   * @param ds dataset
+   * @param expr first aggregation expression
+   * @param exprs more aggregation expressions
+   * @tparam T dataset type
+   * @return observed dataset
+   * @throws IllegalArgumentException If this is a streaming Dataset (ds.isStreaming == true)
+   */
+  def on[T](ds: Dataset[T], expr: Column, exprs: Column*): Dataset[T] = {
+    if (ds.isStreaming) {
+      throw new IllegalArgumentException("Observation does not support streaming Datasets")
+    }
+    register(ds.sparkSession)
+    ds.observe(name, expr, exprs: _*)
+  }
+
+  /**
+   * Waits for the first action on the observed dataset to complete and returns true.
+   * The result is then available through the get method.
+   * This method times out after the given amount of time returning false.
+   *
+   * @param time timeout
+   * @param unit timeout time unit
+   * @return true if action completed within timeout, false otherwise
+   * @throws InterruptedException interrupted while waiting
+   */
+  def waitCompleted(time: Long, unit: TimeUnit): Boolean = waitCompleted(Some(unit.toMillis(time)))
+
+  /**
+   * Get the observed metrics. This waits until the observed dataset finishes its first action.
+   * If you want to wait for the result and provide a timeout, use [[waitCompleted]]. Only the
+   * result of the first action is available. Subsequent actions do not modify the result.
+   *
+   * @return the observed metrics as a [[Row]]
+   * @throws InterruptedException interrupted while waiting
+   */
+  def get: Row = {
+    assert(waitCompleted(None), "waitCompleted without timeout returned false")
+    row.get
+  }
+
+  private def waitCompleted(millis: Option[Long]): Boolean = {

Review comment:
   Actually `Object.wait(0)` means wait forever.
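Which means the two wait branches in the quoted helper can collapse into one call; a sketch (the spurious-wakeup caveat raised elsewhere in this thread still applies):

```scala
private def waitCompleted(millis: Option[Long]): Boolean = synchronized {
  if (row.isEmpty) {
    // Object.wait(0) waits with no timeout, so None can map to 0L.
    this.wait(millis.getOrElse(0L))
  }
  row.isDefined
}
```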







[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-06-21 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r655401634



##
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+/**
+ * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]:
+ *
+ * {{{
+ *   // Observe row count (rows) and highest id (maxid) in the Dataset while writing it
+ *   val observation = Observation("my_metrics")
+ *   val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid"))
+ *   observed_ds.write.parquet("ds.parquet")
+ *   val metrics = observation.get
+ * }}}
+ *
+ * This collects the metrics while the first action is executed on the observed dataset.
+ * Subsequent actions do not modify the metrics returned by
+ * [[org.apache.spark.sql.Observation.get]]. Retrieval of the metric via
+ * [[org.apache.spark.sql.Observation.get]] blocks until the first action has finished and
+ * metrics become available. You can add a timeout to that blocking via
+ * [[org.apache.spark.sql.Observation.waitCompleted]]:
+ *
+ * {{{
+ *   if (observation.waitCompleted(100, TimeUnit.MILLISECONDS)) {
+ *     observation.get
+ *   }
+ * }}}
+ *
+ * This class does not support streaming datasets.
+ *
+ * @param name name of the metric
+ * @since 3.2.0
+ */
+class Observation(name: String) {
+
+  private val listener: ObservationListener = ObservationListener(this)
+
+  private var sparkSession: Option[SparkSession] = None
+
+  @volatile private var row: Option[Row] = None
+
+  /**
+   * Attaches this observation to the given [[Dataset]] to observe aggregation expressions.
+   *
+   * @param ds dataset
+   * @param expr first aggregation expression
+   * @param exprs more aggregation expressions
+   * @tparam T dataset type
+   * @return observed dataset
+   * @throws IllegalArgumentException If this is a streaming Dataset (ds.isStreaming == true)
+   */
+  def on[T](ds: Dataset[T], expr: Column, exprs: Column*): Dataset[T] = {
+    if (ds.isStreaming) {
+      throw new IllegalArgumentException("Observation does not support streaming Datasets")
+    }
+    register(ds.sparkSession)
+    ds.observe(name, expr, exprs: _*)
+  }
+
+  /**
+   * Waits for the first action on the observed dataset to complete and returns true.
+   * The result is then available through the get method.
+   * This method times out after the given amount of time returning false.
+   *
+   * @param time timeout
+   * @param unit timeout time unit
+   * @return true if action completed within timeout, false otherwise
+   * @throws InterruptedException interrupted while waiting
+   */
+  def waitCompleted(time: Long, unit: TimeUnit): Boolean = waitCompleted(Some(unit.toMillis(time)))
+
+  /**
+   * Get the observed metrics. This waits until the observed dataset finishes its first action.
+   * If you want to wait for the result and provide a timeout, use [[waitCompleted]]. Only the
+   * result of the first action is available. Subsequent actions do not modify the result.
+   *
+   * @return the observed metrics as a [[Row]]
+   * @throws InterruptedException interrupted while waiting
+   */
+  def get: Row = {
+    assert(waitCompleted(None), "waitCompleted without timeout returned false")
+    row.get
+  }
+
+  private def waitCompleted(millis: Option[Long]): Boolean = {

Review comment:
   shall we use `-1` to indicate no timeout?
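A sketch of that sentinel variant (parameter name illustrative):

```scala
// timeoutMillis < 0 is the proposed "no timeout" sentinel.
private def waitCompleted(timeoutMillis: Long): Boolean = synchronized {
  if (row.isEmpty) {
    if (timeoutMillis < 0) this.wait() else this.wait(timeoutMillis)
  }
  row.isDefined
}
```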









[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-06-18 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r653798159



##
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+/**
+ * Not thread-safe.
+ * @param name
+ * @param sparkSession
+ */
+class Observation(name: String) {
+
+  private val listener: ObservationListener = ObservationListener(this)
+
+  private var sparkSession: Option[SparkSession] = None
+
+  @volatile private var row: Option[Row] = None
+
+  /**
+   * Attach this observation to the given Dataset.
+   *
+   * @param ds dataset
+   * @tparam T dataset type
+   * @return observed dataset
+   */
+  def on[T](ds: Dataset[T])(expr: Column, exprs: Column*): Dataset[T] = {
+    if (ds.isStreaming) {
+      throw new IllegalArgumentException("Observation does not support streaming Datasets")
+    }
+    register(ds.sparkSession)
+    ds.observe(name, expr, exprs: _*)
+  }
+
+  /**
+   * Wait for the first action on the observed dataset to complete and returns true.
+   * This method times out after the given amount of time and returns false.
+   *
+   * @param time timeout
+   * @param unit timeout time unit
+   * @return true if action complete within timeout, false on timeout
+   */
+  def waitCompleted(time: Long, unit: TimeUnit): Boolean = waitCompleted(Some(time), unit)
+
+  /**
+   * Get the observation results. This waits until the observed dataset finishes its first action.
+   * If you want to wait for the result and provide a timeout, use waitCompleted.
+   * Only the result of the first action is available. Subsequent actions do not modify the result.
+   */
+  def get: Row = {
+    assert(waitCompleted(None, TimeUnit.SECONDS), "waitCompleted without timeout returned false")
+    assert(row.isDefined, "waitCompleted without timeout returned while result is still None")

Review comment:
   This is not needed, `waitCompleted` just returns `row.isDefined`
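That is, the second assert in the quoted diff can simply be dropped; a sketch of the reduced method:

```scala
def get: Row = {
  // waitCompleted(None, ...) only returns true once row is defined,
  // so a single assert suffices.
  assert(waitCompleted(None, TimeUnit.SECONDS), "waitCompleted without timeout returned false")
  row.get
}
```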

##
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+/**
+ * Not thread-safe.
+ * @param name
+ * @param sparkSession
+ */
+class Observation(name: String) {
+
+  private val listener: ObservationListener = ObservationListener(this)
+
+  private var sparkSession: Option[SparkSession] = None
+
+  @volatile private var row: Option[Row] = None
+
+  /**
+   * Attach this observation to the given Dataset.
+   *
+   * @param ds dataset
+   * @tparam T dataset type
+   * @return observed dataset
+   */
+  def on[T](ds: Dataset[T])(expr: Column, exprs: Column*): Dataset[T] = {
+    if (ds.isStreaming) {
+      throw new IllegalArgumentException("Observation does not support streaming Datasets")
+    }
+    register(ds.sparkSession)
+    ds.observe(name, expr, exprs: _*)
+  }
+
+  /**
+   * Wait for the first action on the observed dataset to complete and returns true.
+   * This 

[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-06-15 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r652340898



##
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+/**
+ * Not thread-safe.
+ * @param name
+ * @param sparkSession
+ */
+class Observation(name: String) {

Review comment:
   then shall we have a variant of `waitCompleted` without a timeout?







[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-06-15 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r651797114



##
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+/**
+ * Not thread-safe.
+ * @param name
+ * @param sparkSession
+ */
+class Observation(name: String) {
+
+  private val lock: Lock = new ReentrantLock()
+  private val completed: Condition = lock.newCondition()
+  private val listener: ObservationListener = ObservationListener(this)
+
+  private var sparkSession: Option[SparkSession] = None
+
+  @transient private var row: Option[Row] = None
+
+  /**
+   * Attach this observation to the given Dataset.
+   * Remember to call `close()` when the observation is done.
+   *
+   * @param ds dataset
+   * @tparam T dataset type
+   * @return observed dataset
+   */
+  def on[T](ds: Dataset[T])(expr: Column, exprs: Column*): Dataset[T] = {
+    if (ds.isStreaming) {
+      throw new IllegalArgumentException("Observation does not support streaming Datasets")
+    }
+    register(ds.sparkSession)
+    ds.observe(name, expr, exprs: _*)
+  }
+
+  /**
+   * Get the observation results. Waits for the first action on the observed dataset to complete.
+   * After calling `reset()`, waits for completion of the next action on the observed dataset.
+   */
+  def get: Row = option().get
+
+  /**
+   * Get the observation results. Waits for the first action on the observed dataset to complete.
+   * This method times out waiting for the action after the given amount of time.
+   * After calling `reset()`, waits for completion of the next action on the observed dataset.
+   *
+   * @param time timeout
+   * @param unit timeout time unit
+   * @return observation row as an Option, or None on timeout
+   */
+  def option(time: Long, unit: TimeUnit): Option[Row] = option(Some(time), unit)
+
+  /**
+   * Wait for the first action on the observed dataset to complete.
+   * When the time parameter is given, this method times out waiting for the action.
+   * After calling `reset()`, waits for completion of the next action on the observed dataset.
+   *
+   * @param time timeout
+   * @param unit timeout time unit
+   * @return true if action complete within timeout, false on timeout
+   */
+  def waitCompleted(time: Option[Long] = None, unit: TimeUnit = TimeUnit.MILLISECONDS): Boolean = {
+    lock.lock()
+    try {
+      if (row.isEmpty) {
+        if (time.isDefined) {
+          completed.await(time.get, unit)
+        } else {
+          completed.await()
+        }
+      }
+      row.isDefined
+    } finally {
+      lock.unlock()
+    }
+  }
+
+  /**
+   * Wait for the first action on the observed dataset to complete.
+   * After calling `reset()`, waits for completion of the next action on the observed dataset.
+   *
+   * @param time timeout
+   * @param unit timeout time unit
+   * @return true if action complete within timeout, false on timeout
+   */
+  def waitCompleted(time: Long, unit: TimeUnit): Boolean = waitCompleted(Some(time), unit)
+
+  /**
+   * Reset the observation. This deletes the observation and allows to wait for completion
+   * of the next action called on the observed dataset. Not resetting the observation before
+   * attempting to retrieve the next action's results via get, option or waitCompleted is not
+   * guaranteed to work.
+   */
+  def reset(): Unit = {

Review comment:
   I'm a little worried about this API. The listener is registered and the result of the first observation may come at any time later.
   
   Shall we let the `Observation` instance be used only once and not provide a `reset` API?
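A sketch of the single-use alternative, dropping `reset` and letting the first registration win; the guard and message are illustrative, not the merged code:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Inside the Observation class: a one-shot guard instead of reset().
private val used = new AtomicBoolean(false)

private def register(sparkSession: SparkSession): Unit = {
  if (!used.compareAndSet(false, true)) {
    throw new IllegalStateException("An Observation can be attached to a Dataset only once")
  }
  sparkSession.listenerManager.register(listener)
}
```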





[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-06-15 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r651793629



##
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+/**
+ * Not thread-safe.
+ * @param name
+ * @param sparkSession
+ */
+class Observation(name: String) {

Review comment:
   I'm thinking about what APIs we should provide here. We should at least have the `get` API to get the result and wait until it's available. We should also provide APIs with a timeout. Some ideas:
   1. `def get(time: Long, unit: TimeUnit)`, which is similar to [LinkedBlockingQueue.poll](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html#poll(long,%20java.util.concurrent.TimeUnit))
   2. `waitCompleted` and users can call `get` later if it returns true.
   
   It looks to me that `def get(time: Long, unit: TimeUnit)` should be good enough and people can do try-catch if they want to handle timeout differently. It's better to start with fewer public APIs as we can always add more later.
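A sketch of option 1; returning `Option` to signal the timeout is one possible reading of the `poll` analogy (which returns null):

```scala
import java.util.concurrent.TimeUnit

// Option 1: a timed get, analogous to LinkedBlockingQueue.poll(time, unit).
// Returns None if the metrics do not arrive within the timeout.
def get(time: Long, unit: TimeUnit): Option[Row] = synchronized {
  if (row.isEmpty) {
    wait(unit.toMillis(time))
  }
  row
}
```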







[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-06-10 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r649262392



##
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
##
@@ -2374,6 +2376,78 @@ class DataFrameSuite extends QueryTest
 }
   }
 
+  test("SPARK-34806: observation on datasets") {
+val namedObservation = Observation("named")
+val unnamedObservation = Observation()
+
+try {
+  val df = spark
+.range(100)
+.observe(
+  namedObservation,
+  min($"id").as("min_val"),
+  max($"id").as("max_val"),
+  sum($"id").as("sum_val"),
+  count(when($"id" % 2 === 0, 1)).as("num_even")
+)
+.observe(
+  unnamedObservation,
+  avg($"id").cast("int").as("avg_val")
+)
+
+  def checkMetrics(namedMetric: Row, unnamedMetric: Row): Unit = {
+assert(namedMetric === Row(0L, 99L, 4950L, 50L))
+assert(unnamedMetric === Row(49))
+  }
+
+  // Before first run observation times out
+  assert(namedObservation.waitCompleted(100, TimeUnit.MILLISECONDS) === 
false)
+  assert(unnamedObservation.waitCompleted(100, TimeUnit.MILLISECONDS) === 
false)
+
+  // First run
+  df.collect()
+  assert(namedObservation.waitCompleted(1, TimeUnit.SECONDS))
+  assert(unnamedObservation.waitCompleted(1, TimeUnit.SECONDS))
+  checkMetrics(namedObservation.get, unnamedObservation.get)
+  namedObservation.reset()
+  unnamedObservation.reset()

Review comment:
   what if we don't call `reset` and start the second run?







[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-06-10 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r649259169



##
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+/**
+ * Not thread-safe.
+ * @param name
+ * @param sparkSession
+ */
+case class Observation(name: String) {

Review comment:
   I'd avoid using case class in public APIs, as it provides too many APIs: apply, unapply, getters, copy, etc., and it's very hard to keep backward compatibility.
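The usual shape for that is a plain class plus a companion `apply`; a sketch (the UUID default mirrors the `Observation()` construction used in the tests):

```scala
import java.util.UUID

// A plain class keeps the public surface small: no copy/unapply/auto-getters.
class Observation(val name: String) {
  // ...
}

object Observation {
  // Companion apply preserves the Observation("name") construction style.
  def apply(name: String): Observation = new Observation(name)

  // And a no-arg variant with a generated name, as in Observation().
  def apply(): Observation = new Observation(UUID.randomUUID().toString)
}
```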







[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-03-29 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r603380474



##
File path: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
##
@@ -683,6 +683,12 @@ class SparkSession private(
 ret
   }
 
+  def observation(expr: Column, exprs: Column*): Observation =

Review comment:
   Yea this is arguable. Personally, I'd like to see a new API similar to the existing `df.observe(...)`, but the new one here is very different:
   ```
   val ob = spark.observe(...)
   df.transform(ob.on).
   ```
   
   Let's get more feedback. cc @srowen @HyukjinKwon @dongjoon-hyun @maropu 







[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-03-26 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r602072774



##
File path: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
##
@@ -683,6 +683,12 @@ class SparkSession private(
 ret
   }
 
+  def observation(expr: Column, exprs: Column*): Observation =

Review comment:
   It's a bit awkward to build an `Observation` with columns. The `Observation` looks like it can be used with different DataFrames, but it is not, because the columns are from a certain DataFrame.
   
   How about this:
   ```
   val observation = Observation("the name")
   df1.observe(observation, expressions...)...
   val res1 = observation.get
   
   df2.observe(observation, expressions...)...
   val res2 = observation.get
   ```
   







[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe

2021-03-22 Thread GitBox


cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r598537187



##
File path: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
##
@@ -683,6 +683,12 @@ class SparkSession private(
 ret
   }
 
+  def observation(expr: Column, exprs: Column*): Observation =

Review comment:
   Should this be a DataFrame API? A common pattern in pyspark is `df.select(df.my_col)`, but it's weird to see `spark.observation(df.my_col)`.



