[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r669619755

## File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ##
@@ -2374,6 +2375,48 @@ class DataFrameSuite extends QueryTest
     }
   }
 
+  test("SPARK-34806: observation on datasets") {
+    val namedObservation = Observation("named")
+    val unnamedObservation = Observation()
+
+    val df = spark
+      .range(100)
+      .observe(
+        namedObservation,
+        min($"id").as("min_val"),
+        max($"id").as("max_val"),
+        sum($"id").as("sum_val"),
+        count(when($"id" % 2 === 0, 1)).as("num_even")
+      )
+      .observe(
+        unnamedObservation,
+        avg($"id").cast("int").as("avg_val")
+      )
+
+    def checkMetrics(namedMetric: Row, unnamedMetric: Row): Unit = {
+      assert(namedMetric === Row(0L, 99L, 4950L, 50L))
+      assert(unnamedMetric === Row(49))
+    }
+
+    // First run
+    df.collect()
+    checkMetrics(namedObservation.get, unnamedObservation.get)
+    // we can get the result multiple times
+    checkMetrics(namedObservation.get, unnamedObservation.get)
+
+    // an observation can be used only once
+    assertThrows[IllegalStateException] {

Review comment:
since it's a user-facing error, maybe `IllegalArgumentException`?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
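For illustration only: the suggestion above is about which exception type a caller sees when an observation is attached a second time. A minimal sketch of a single-use guard that reports the misuse as `IllegalArgumentException` follows. The class `SingleUse`, the method `attach`, and the error message are hypothetical names invented for this note; they are not taken from the pull request.

```scala
// Hypothetical single-use guard; not the PR's Observation class.
final class SingleUse {
  private var used = false

  /** Marks this object as used; a second call is rejected. */
  def attach(): Unit = synchronized {
    // require throws IllegalArgumentException when the condition is false
    require(!used, "An Observation can be used with a Dataset only once")
    used = true
  }
}
```

Scala's `require` is used here because it raises `IllegalArgumentException`; code that prefers `IllegalStateException` would throw it explicitly instead.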
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r659583680

## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+
+/**
+ * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]:
+ *
+ * {{{
+ *   // Observe row count (rows) and highest id (maxid) in the Dataset while writing it
+ *   val observation = Observation("my metrics")
+ *   val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid"))
+ *   observed_ds.write.parquet("ds.parquet")
+ *   val metrics = observation.get
+ * }}}
+ *
+ * This collects the metrics while the first action is executed on the observed dataset. Subsequent
+ * actions do not modify the metrics returned by [[get]]. Retrieval of the metric via [[get]]
+ * blocks until the first action has finished and metrics become available.
+ *
+ * This class does not support streaming datasets.
+ *
+ * @param name name of the metric
+ * @since 3.2.0
+ */
+class Observation(name: String) {
+
+  private val listener: ObservationListener = ObservationListener(this)
+
+  @volatile private var sparkSession: Option[SparkSession] = None
+
+  @volatile private var row: Option[Row] = None
+
+  // we use a private object for synchronized { } when accessing this.row
+  // so we have full control on who calls notify
+  private val rowSync: Object = new Object()
+
+  /**
+   * Attaches this observation to the given [[Dataset]] to observe aggregation expressions.
+   *
+   * @param ds dataset
+   * @param expr first aggregation expression
+   * @param exprs more aggregation expressions
+   * @tparam T dataset type
+   * @return observed dataset
+   * @throws IllegalArgumentException If this is a streaming Dataset (ds.isStreaming == true)
+   */
+  private[spark] def on[T](ds: Dataset[T], expr: Column, exprs: Column*): Dataset[T] = {
+    if (ds.isStreaming) {
+      throw new IllegalArgumentException("Observation does not support streaming Datasets")
+    }
+    register(ds.sparkSession)
+    ds.observe(name, expr, exprs: _*)
+  }
+
+  /**
+   * Get the observed metrics. This waits for the observed dataset to finish its first action.
+   * Only the result of the first action is available. Subsequent actions do not modify the result.
+   *
+   * @return the observed metrics as a [[Row]]
+   * @throws InterruptedException interrupted while waiting
+   */
+  def get: Row = {
+    this.rowSync.synchronized {
+      if (this.row.isEmpty) {
+        this.rowSync.wait()

Review comment:
I searched this a bit more and I think @hvanhovell is right about the spurious wakeups: https://stackoverflow.com/questions/1050592/do-spurious-wakeups-in-java-actually-happen

We need to use a loop here, or use other locking APIs from the JDK that are safe.
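As a rough illustration of the "use a loop" suggestion above, the sketch below re-checks the condition after every return from `wait()`, so a spurious wakeup simply goes back to waiting. `ResultHolder`, `set`, and `get` are hypothetical stand-ins written for this note, with a generic `T` in place of `Row`; this is not the code that ended up in the pull request.

```scala
// Hypothetical stand-in for the result-holding part of Observation.
final class ResultHolder[T] {
  private val lock = new Object()
  private var value: Option[T] = None

  /** Blocks until a value has been published, then returns it. */
  def get: T = lock.synchronized {
    // Re-check the condition after every wakeup: wait() may return
    // without a matching notify (a spurious wakeup).
    while (value.isEmpty) {
      lock.wait()
    }
    value.get
  }

  /** Publishes the value once; returns true only for the call that set it. */
  def set(v: T): Boolean = lock.synchronized {
    if (value.isEmpty) {
      value = Some(v)
      lock.notifyAll()
      true
    } else {
      false
    }
  }
}
```

The only difference from the `if`-guarded version under review is `while` in place of `if`; both the read and the write of `value` happen under the same private lock.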
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r656033954

## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+/**
+ * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]:
+ *
+ * {{{
+ *   // Observe row count (rows) and highest id (maxid) in the Dataset while writing it
+ *   val observation = Observation("my_metrics")
+ *   val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid"))
+ *   observed_ds.write.parquet("ds.parquet")
+ *   val metrics = observation.get

Review comment:
private[spark] SGTM
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905: URL: https://github.com/apache/spark/pull/31905#discussion_r656032281 ## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.UUID +import java.util.concurrent.TimeUnit + +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +/** + * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]: + * + * {{{ + * // Observe row count (rows) and highest id (maxid) in the Dataset while writing it + * val observation = Observation("my_metrics") + * val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid")) + * observed_ds.write.parquet("ds.parquet") + * val metrics = observation.get + * }}} + * + * This collects the metrics while the first action is executed on the obseerved dataset. Subsequent + * actions do not modify the metrics returned by [[org.apache.spark.sql.Observation.get]]. 
Retrieval + * of the metric via [[org.apache.spark.sql.Observation.get]] blocks until the first action has + * finished and metrics become available. You can add a timeout to that blocking via + * [[org.apache.spark.sql.Observation.waitCompleted]]: + * + * {{{ + * if (observation.waitCompleted(100, TimeUnit.MILLISECONDS)) { + * observation.get + * } + * }}} + * + * This class does not support streaming datasets. + * + * @param name name of the metric + * @since 3.2.0 + */ +class Observation(name: String) { + + private val listener: ObservationListener = ObservationListener(this) + + private var sparkSession: Option[SparkSession] = None + + @volatile private var row: Option[Row] = None + + /** + * Attaches this observation to the given [[Dataset]] to observe aggregation expressions. + * + * @param ds dataset + * @param expr first aggregation expression + * @param exprs more aggregation expressions + * @tparam T dataset type + * @return observed dataset + * @throws IllegalArgumentException If this is a streaming Dataset (ds.isStreaming == true) + */ + def on[T](ds: Dataset[T], expr: Column, exprs: Column*): Dataset[T] = { +if (ds.isStreaming) { + throw new IllegalArgumentException("Observation does not support streaming Datasets") +} +register(ds.sparkSession) +ds.observe(name, expr, exprs: _*) + } + + /** + * Waits for the first action on the observed dataset to complete and returns true. + * The result is then available through the get method. + * This method times out after the given amount of time returning false. + * + * @param time timeout + * @param unit timeout time unit + * @return true if action completed within timeout, false otherwise + * @throws InterruptedException interrupted while waiting + */ + def waitCompleted(time: Long, unit: TimeUnit): Boolean = waitCompleted(Some(unit.toMillis(time))) + + /** + * Get the observed metrics. This waits until the observed dataset finishes its first action. 
+ * If you want to wait for the result and provide a timeout, use [[waitCompleted]]. Only the + * result of the first action is available. Subsequent actions do not modify the result. + * + * @return the observed metrics as a [[Row]] + * @throws InterruptedException interrupted while waiting + */ + def get: Row = { +assert(waitCompleted(None), "waitCompleted without timeout returned false") +row.get + } + + private def waitCompleted(millis: Option[Long]): Boolean = { +synchronized { + if (row.isEmpty) { +if (millis.isDefined) { + this.wait(millis.get) +} else { + this.wait() +} + } + row.isDefined +} + } + + private def register(sparkSession: SparkSession): Unit = { +// makes this class thread-safe: +// only the first thread entering this block can set sparkSession +// all other threads will see the exception, because it is only allowed to do this once +synchronized { + if (this.sparkSession.isDefined) { +throw new
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r656031102

## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+/**
+ * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]:
+ *
+ * {{{
+ *   // Observe row count (rows) and highest id (maxid) in the Dataset while writing it
+ *   val observation = Observation("my_metrics")
+ *   val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid"))
+ *   observed_ds.write.parquet("ds.parquet")
+ *   val metrics = observation.get
+ * }}}
+ *
+ * This collects the metrics while the first action is executed on the observed dataset. Subsequent
+ * actions do not modify the metrics returned by [[org.apache.spark.sql.Observation.get]]. Retrieval
+ * of the metric via [[org.apache.spark.sql.Observation.get]] blocks until the first action has
+ * finished and metrics become available. You can add a timeout to that blocking via
+ * [[org.apache.spark.sql.Observation.waitCompleted]]:
+ *
+ * {{{
+ *   if (observation.waitCompleted(100, TimeUnit.MILLISECONDS)) {
+ *     observation.get
+ *   }
+ * }}}
+ *
+ * This class does not support streaming datasets.
+ *
+ * @param name name of the metric
+ * @since 3.2.0
+ */
+class Observation(name: String) {

Review comment:
SGTM
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905: URL: https://github.com/apache/spark/pull/31905#discussion_r655400419 ## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.UUID +import java.util.concurrent.TimeUnit + +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +/** + * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]: + * + * {{{ + * // Observe row count (rows) and highest id (maxid) in the Dataset while writing it + * val observation = Observation("my_metrics") + * val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid")) + * observed_ds.write.parquet("ds.parquet") + * val metrics = observation.get Review comment: shall we also mention the `on` API here? ## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.UUID +import java.util.concurrent.TimeUnit + +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +/** + * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]: + * + * {{{ + * // Observe row count (rows) and highest id (maxid) in the Dataset while writing it + * val observation = Observation("my_metrics") + * val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid")) + * observed_ds.write.parquet("ds.parquet") + * val metrics = observation.get + * }}} + * + * This collects the metrics while the first action is executed on the obseerved dataset. Subsequent + * actions do not modify the metrics returned by [[org.apache.spark.sql.Observation.get]]. Retrieval + * of the metric via [[org.apache.spark.sql.Observation.get]] blocks until the first action has + * finished and metrics become available. You can add a timeout to that blocking via + * [[org.apache.spark.sql.Observation.waitCompleted]]: + * + * {{{ + * if (observation.waitCompleted(100, TimeUnit.MILLISECONDS)) { + * observation.get + * } + * }}} + * + * This class does not support streaming datasets. 
+ * + * @param name name of the metric + * @since 3.2.0 + */ +class Observation(name: String) { + + private val listener: ObservationListener = ObservationListener(this) + + private var sparkSession: Option[SparkSession] = None + + @volatile private var row: Option[Row] = None + + /** + * Attaches this observation to the given [[Dataset]] to observe aggregation expressions. + * + * @param ds dataset + * @param expr first aggregation expression + * @param exprs more aggregation expressions + * @tparam T dataset type + * @return observed dataset + * @throws IllegalArgumentException If this is a streaming Dataset (ds.isStreaming == true) + */ + def on[T](ds: Dataset[T], expr: Column, exprs: Column*): Dataset[T] = { +if (ds.isStreaming) { + throw new IllegalArgumentException("Observation does not support streaming Datasets") +} +register(ds.sparkSession) +ds.observe(name, expr, exprs: _*) + } + + /** + * Waits for the first action on the observed dataset to complete and
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905:
URL: https://github.com/apache/spark/pull/31905#discussion_r655423841

## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+/**
+ * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]:
+ *
+ * {{{
+ *   // Observe row count (rows) and highest id (maxid) in the Dataset while writing it
+ *   val observation = Observation("my_metrics")
+ *   val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid"))
+ *   observed_ds.write.parquet("ds.parquet")
+ *   val metrics = observation.get
+ * }}}
+ *
+ * This collects the metrics while the first action is executed on the observed dataset. Subsequent
+ * actions do not modify the metrics returned by [[org.apache.spark.sql.Observation.get]]. Retrieval
+ * of the metric via [[org.apache.spark.sql.Observation.get]] blocks until the first action has
+ * finished and metrics become available. You can add a timeout to that blocking via
+ * [[org.apache.spark.sql.Observation.waitCompleted]]:
+ *
+ * {{{
+ *   if (observation.waitCompleted(100, TimeUnit.MILLISECONDS)) {
+ *     observation.get
+ *   }
+ * }}}
+ *
+ * This class does not support streaming datasets.
+ *
+ * @param name name of the metric
+ * @since 3.2.0
+ */
+class Observation(name: String) {

Review comment:
That's a good point! We can simply leverage the standard `Future` API to represent this async result.
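One way to read the `Future` suggestion above, sketched with the Scala standard library only: a `Promise` is completed once by whatever callback receives the metrics, and callers block on (or compose) the resulting `Future`, choosing their own timeout. `Metrics`, `AsyncObservation`, and `onFinish` are hypothetical names made up for this note; the thread does not show how, or whether, the PR adopted this design.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for the Row of observed metrics.
final case class Metrics(values: Map[String, Long])

// Hypothetical sketch; not the PR's Observation class.
final class AsyncObservation {
  private val promise = Promise[Metrics]()

  /** Completed when the first action on the observed dataset finishes. */
  def future: Future[Metrics] = promise.future

  /** Called by an assumed listener; only the first call takes effect. */
  def onFinish(metrics: Metrics): Boolean = promise.trySuccess(metrics)
}

object AsyncObservationDemo {
  def main(args: Array[String]): Unit = {
    val obs = new AsyncObservation
    obs.onFinish(Metrics(Map("rows" -> 100L)))
    obs.onFinish(Metrics(Map("rows" -> 1L))) // ignored: already completed
    // The caller picks the timeout; Await.result throws TimeoutException if it expires.
    val m = Await.result(obs.future, 100.millis)
    println(m.values("rows"))
  }
}
```

With this shape, a separate `waitCompleted(time, unit)` method becomes unnecessary, since the timeout moves to the call site, and `trySuccess` gives the "first action wins" behaviour without explicit locking.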
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905: URL: https://github.com/apache/spark/pull/31905#discussion_r655406702 ## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.UUID +import java.util.concurrent.TimeUnit + +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +/** + * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]: + * + * {{{ + * // Observe row count (rows) and highest id (maxid) in the Dataset while writing it + * val observation = Observation("my_metrics") + * val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid")) + * observed_ds.write.parquet("ds.parquet") + * val metrics = observation.get + * }}} + * + * This collects the metrics while the first action is executed on the obseerved dataset. Subsequent + * actions do not modify the metrics returned by [[org.apache.spark.sql.Observation.get]]. 
Retrieval + * of the metric via [[org.apache.spark.sql.Observation.get]] blocks until the first action has + * finished and metrics become available. You can add a timeout to that blocking via + * [[org.apache.spark.sql.Observation.waitCompleted]]: + * + * {{{ + * if (observation.waitCompleted(100, TimeUnit.MILLISECONDS)) { + * observation.get + * } + * }}} + * + * This class does not support streaming datasets. + * + * @param name name of the metric + * @since 3.2.0 + */ +class Observation(name: String) { + + private val listener: ObservationListener = ObservationListener(this) + + private var sparkSession: Option[SparkSession] = None + + @volatile private var row: Option[Row] = None + + /** + * Attaches this observation to the given [[Dataset]] to observe aggregation expressions. + * + * @param ds dataset + * @param expr first aggregation expression + * @param exprs more aggregation expressions + * @tparam T dataset type + * @return observed dataset + * @throws IllegalArgumentException If this is a streaming Dataset (ds.isStreaming == true) + */ + def on[T](ds: Dataset[T], expr: Column, exprs: Column*): Dataset[T] = { +if (ds.isStreaming) { + throw new IllegalArgumentException("Observation does not support streaming Datasets") +} +register(ds.sparkSession) +ds.observe(name, expr, exprs: _*) + } + + /** + * Waits for the first action on the observed dataset to complete and returns true. + * The result is then available through the get method. + * This method times out after the given amount of time returning false. + * + * @param time timeout + * @param unit timeout time unit + * @return true if action completed within timeout, false otherwise + * @throws InterruptedException interrupted while waiting + */ + def waitCompleted(time: Long, unit: TimeUnit): Boolean = waitCompleted(Some(unit.toMillis(time))) + + /** + * Get the observed metrics. This waits until the observed dataset finishes its first action. 
+ * If you want to wait for the result and provide a timeout, use [[waitCompleted]]. Only the + * result of the first action is available. Subsequent actions do not modify the result. + * + * @return the observed metrics as a [[Row]] + * @throws InterruptedException interrupted while waiting + */ + def get: Row = { +assert(waitCompleted(None), "waitCompleted without timeout returned false") +row.get + } + + private def waitCompleted(millis: Option[Long]): Boolean = { Review comment: Actually `Object.wait(0)` means wait forever. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
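To make the remark about `Object.wait(0)` concrete: a timeout argument of `0` blocks until notified, so a single `Long` parameter can cover both the timed and the untimed case without an `Option`. The sketch below assumes a hypothetical `TimedHolder` class (not from the PR) and tracks a deadline so that spurious wakeups do not extend the overall wait.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical holder used only to illustrate the waiting logic.
final class TimedHolder[T] {
  private val lock = new Object()
  private var value: Option[T] = None

  /** Publishes the value once and wakes up all waiters. */
  def set(v: T): Unit = lock.synchronized {
    if (value.isEmpty) {
      value = Some(v)
      lock.notifyAll()
    }
  }

  /**
   * Waits until a value is available or the timeout expires.
   * timeoutMillis == 0 waits indefinitely, mirroring Object.wait(0).
   */
  def waitCompleted(timeoutMillis: Long): Boolean = lock.synchronized {
    require(timeoutMillis >= 0, "timeout must be non-negative")
    if (timeoutMillis == 0) {
      while (value.isEmpty) lock.wait(0)
    } else {
      val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
      var remaining = deadline - System.nanoTime()
      while (value.isEmpty && remaining > 0) {
        // wait(0) would block forever, so round the remaining time up to at least 1 ms
        lock.wait(math.max(1L, TimeUnit.NANOSECONDS.toMillis(remaining)))
        remaining = deadline - System.nanoTime()
      }
    }
    value.isDefined
  }
}
```

Because `0` already means "no timeout" in the JDK, choosing `-1` as the sentinel would need an extra translation step before calling `wait`.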
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905: URL: https://github.com/apache/spark/pull/31905#discussion_r655401634 ## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.UUID +import java.util.concurrent.TimeUnit + +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +/** + * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]: + * + * {{{ + * // Observe row count (rows) and highest id (maxid) in the Dataset while writing it + * val observation = Observation("my_metrics") + * val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid")) + * observed_ds.write.parquet("ds.parquet") + * val metrics = observation.get + * }}} + * + * This collects the metrics while the first action is executed on the obseerved dataset. Subsequent + * actions do not modify the metrics returned by [[org.apache.spark.sql.Observation.get]]. 
Retrieval + * of the metric via [[org.apache.spark.sql.Observation.get]] blocks until the first action has + * finished and metrics become available. You can add a timeout to that blocking via + * [[org.apache.spark.sql.Observation.waitCompleted]]: + * + * {{{ + * if (observation.waitCompleted(100, TimeUnit.MILLISECONDS)) { + * observation.get + * } + * }}} + * + * This class does not support streaming datasets. + * + * @param name name of the metric + * @since 3.2.0 + */ +class Observation(name: String) { + + private val listener: ObservationListener = ObservationListener(this) + + private var sparkSession: Option[SparkSession] = None + + @volatile private var row: Option[Row] = None + + /** + * Attaches this observation to the given [[Dataset]] to observe aggregation expressions. + * + * @param ds dataset + * @param expr first aggregation expression + * @param exprs more aggregation expressions + * @tparam T dataset type + * @return observed dataset + * @throws IllegalArgumentException If this is a streaming Dataset (ds.isStreaming == true) + */ + def on[T](ds: Dataset[T], expr: Column, exprs: Column*): Dataset[T] = { +if (ds.isStreaming) { + throw new IllegalArgumentException("Observation does not support streaming Datasets") +} +register(ds.sparkSession) +ds.observe(name, expr, exprs: _*) + } + + /** + * Waits for the first action on the observed dataset to complete and returns true. + * The result is then available through the get method. + * This method times out after the given amount of time returning false. + * + * @param time timeout + * @param unit timeout time unit + * @return true if action completed within timeout, false otherwise + * @throws InterruptedException interrupted while waiting + */ + def waitCompleted(time: Long, unit: TimeUnit): Boolean = waitCompleted(Some(unit.toMillis(time))) + + /** + * Get the observed metrics. This waits until the observed dataset finishes its first action. 
+ * If you want to wait for the result and provide a timeout, use [[waitCompleted]]. Only the + * result of the first action is available. Subsequent actions do not modify the result. + * + * @return the observed metrics as a [[Row]] + * @throws InterruptedException interrupted while waiting + */ + def get: Row = { +assert(waitCompleted(None), "waitCompleted without timeout returned false") +row.get + } + + private def waitCompleted(millis: Option[Long]): Boolean = { Review comment: Shall we use `-1` to indicate no timeout?
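To make the `-1` suggestion concrete, here is a minimal sketch of a wait helper where a negative timeout means "wait forever". It is not the code from the PR: `CompletionSignal` and `markCompleted` are hypothetical names, and a `CountDownLatch` stands in for whatever synchronisation the real class uses.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical helper, not the PR's Observation class.
final class CompletionSignal {
  private val done = new CountDownLatch(1)

  // Called by the producer side once the result is available.
  def markCompleted(): Unit = done.countDown()

  // millis < 0: block until completed, then return true.
  // millis >= 0: wait at most that long and report whether completion happened in time.
  def waitCompleted(millis: Long): Boolean = {
    if (millis < 0) {
      done.await()
      true
    } else {
      done.await(millis, TimeUnit.MILLISECONDS)
    }
  }
}
```

Compared with `Option[Long]`, the sentinel avoids allocating a wrapper, at the cost of a magic value that callers have to know about.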
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905: URL: https://github.com/apache/spark/pull/31905#discussion_r655400419 ## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.UUID +import java.util.concurrent.TimeUnit + +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +/** + * Helper class to simplify usage of [[Dataset.observe(String, Column, Column*)]]: + * + * {{{ + * // Observe row count (rows) and highest id (maxid) in the Dataset while writing it + * val observation = Observation("my_metrics") + * val observed_ds = ds.observe(observation, count(lit(1)).as("rows"), max($"id").as("maxid")) + * observed_ds.write.parquet("ds.parquet") + * val metrics = observation.get Review comment: shall we also mention the `on` API here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905: URL: https://github.com/apache/spark/pull/31905#discussion_r653798159 ## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.UUID +import java.util.concurrent.TimeUnit + +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +/** + * Not thread-safe. + * @param name + * @param sparkSession + */ +class Observation(name: String) { + + private val listener: ObservationListener = ObservationListener(this) + + private var sparkSession: Option[SparkSession] = None + + @volatile private var row: Option[Row] = None + + /** + * Attach this observation to the given Dataset. 
+ * + * @param ds dataset + * @tparam T dataset type + * @return observed dataset + */ + def on[T](ds: Dataset[T])(expr: Column, exprs: Column*): Dataset[T] = { +if (ds.isStreaming) { + throw new IllegalArgumentException("Observation does not support streaming Datasets") +} +register(ds.sparkSession) +ds.observe(name, expr, exprs: _*) + } + + /** + * Wait for the first action on the observed dataset to complete and returns true. + * This method times out after the given amount of time and returns false. + * + * @param time timeout + * @param unit timeout time unit + * @return true if action complete within timeout, false on timeout + */ + def waitCompleted(time: Long, unit: TimeUnit): Boolean = waitCompleted(Some(time), unit) + + /** + * Get the observation results. This waits until the observed dataset finishes its first action. + * If you want to wait for the result and provide a timeout, use waitCompleted. + * Only the result of the first action is available. Subsequent actions do not modify the result. + */ + def get: Row = { +assert(waitCompleted(None, TimeUnit.SECONDS), "waitCompleted without timeout returned false") +assert(row.isDefined, "waitCompleted without timeout returned while result is still None") Review comment: This is not needed, `waitCompleted` just returns `row.isDefined` ## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.UUID +import java.util.concurrent.TimeUnit + +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +/** + * Not thread-safe. + * @param name + * @param sparkSession + */ +class Observation(name: String) { + + private val listener: ObservationListener = ObservationListener(this) + + private var sparkSession: Option[SparkSession] = None + + @volatile private var row: Option[Row] = None + + /** + * Attach this observation to the given Dataset. + * + * @param ds dataset + * @tparam T dataset type + * @return observed dataset + */ + def on[T](ds: Dataset[T])(expr: Column, exprs: Column*): Dataset[T] = { +if (ds.isStreaming) { + throw new IllegalArgumentException("Observation does not support streaming Datasets") +} +register(ds.sparkSession) +ds.observe(name, expr, exprs: _*) + } + + /** + * Wait for the first action on the observed dataset to complete and returns true. + * This
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905: URL: https://github.com/apache/spark/pull/31905#discussion_r652340898 ## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.UUID +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.{Condition, Lock, ReentrantLock} + +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +/** + * Not thread-safe. + * @param name + * @param sparkSession + */ +class Observation(name: String) { Review comment: then shall we have a variant of `waitCompleted` that w/o a timeout? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905: URL: https://github.com/apache/spark/pull/31905#discussion_r651797114 ## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.UUID +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.{Condition, Lock, ReentrantLock} + +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +/** + * Not thread-safe. + * @param name + * @param sparkSession + */ +class Observation(name: String) { + + private val lock: Lock = new ReentrantLock() + private val completed: Condition = lock.newCondition() + private val listener: ObservationListener = ObservationListener(this) + + private var sparkSession: Option[SparkSession] = None + + @transient private var row: Option[Row] = None + + /** + * Attach this observation to the given Dataset. + * Remember to call `close()` when the observation is done. 
+ * + * @param ds dataset + * @tparam T dataset type + * @return observed dataset + */ + def on[T](ds: Dataset[T])(expr: Column, exprs: Column*): Dataset[T] = { +if (ds.isStreaming) { + throw new IllegalArgumentException("Observation does not support streaming Datasets") +} +register(ds.sparkSession) +ds.observe(name, expr, exprs: _*) + } + + /** + * Get the observation results. Waits for the first action on the observed dataset to complete. + * After calling `reset()`, waits for completion of the next action on the observed dataset. + */ + def get: Row = option().get + + /** + * Get the observation results. Waits for the first action on the observed dataset to complete. + * This method times out waiting for the action after the given amount of time. + * After calling `reset()`, waits for completion of the next action on the observed dataset. + * + * @param time timeout + * @param unit timeout time unit + * @return observation row as an Option, or None on timeout + */ + def option(time: Long, unit: TimeUnit): Option[Row] = option(Some(time), unit) + + /** + * Wait for the first action on the observed dataset to complete. + * When the time parameter is given, this method times out waiting for the action. + * After calling `reset()`, waits for completion of the next action on the observed dataset. + * + * @param time timeout + * @param unit timeout time unit + * @return true if action complete within timeout, false on timeout + */ + def waitCompleted(time: Option[Long] = None, unit: TimeUnit = TimeUnit.MILLISECONDS): Boolean = { +lock.lock() +try { + if (row.isEmpty) { +if (time.isDefined) { + completed.await(time.get, unit) +} else { + completed.await() +} + } + row.isDefined +} finally { + lock.unlock() +} + } + + /** + * Wait for the first action on the observed dataset to complete. + * After calling `reset()`, waits for completion of the next action on the observed dataset. 
+ * + * @param time timeout + * @param unit timeout time unit + * @return true if action complete within timeout, false on timeout + */ + def waitCompleted(time: Long, unit: TimeUnit): Boolean = waitCompleted(Some(time), unit) + + /** + * Reset the observation. This deletes the observation and allows to wait for completion + * of the next action called on the observed dataset. Not resetting the observation before + * attempting to retrieve the next action's results via get, option or waitCompleted is not + * guaranteed to work. + */ + def reset(): Unit = { Review comment: I'm a little worried about this API. The listener is registered, and the result of the first observation may arrive at any later time. Shall we let the `Observation` instance be used only once and not provide a `reset` API?
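The "use only once, no `reset`" idea can be sketched as a holder that accepts the first result and ignores every later one. This is an illustration under assumptions, not the PR's implementation: `OneShotResult` and `offer` are hypothetical names.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

// Hypothetical single-use holder: the first offered value wins, there is no reset.
final class OneShotResult[T] {
  private val result = new AtomicReference[Option[T]](None)
  private val done = new CountDownLatch(1)

  // Returns true only for the first call; later values are dropped.
  def offer(v: T): Boolean = {
    val first = result.compareAndSet(None, Some(v))
    if (first) done.countDown()
    first
  }

  // Blocks until the first value has been offered, then returns it.
  def get: T = {
    done.await()
    result.get.get
  }
}
```

Because a late callback can never overwrite the stored value, readers see the same result no matter when they call `get`.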
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905: URL: https://github.com/apache/spark/pull/31905#discussion_r651793629 ## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.UUID +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.{Condition, Lock, ReentrantLock} + +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +/** + * Not thread-safe. + * @param name + * @param sparkSession + */ +class Observation(name: String) { Review comment: I'm thinking about what APIs we should provide here. We should at least have the `get` API to get the result and wait until it's available. We should also provide APIs with a timeout. Some ideas: 1. `def get(time: Long, unit: TimeUnit)`, which is similar to [LinkedBlockingQueue.poll](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html#poll(long,%20java.util.concurrent.TimeUnit)) 2. `waitCompleted` and users can call `get` later if it returns true. 
It looks to me that `def get(time: Long, unit: TimeUnit)` should be good enough, and people can use try-catch if they want to handle the timeout differently. It's better to start with fewer public APIs, as we can always add more later.
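A rough sketch of the first idea, a `get` that takes a timeout, might look like the following. `TimedResult` and `complete` are hypothetical names, and the choice of `TimeoutException` is an assumption made to match the try-catch remark; `LinkedBlockingQueue.poll` itself returns `null` on timeout instead of throwing.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit, TimeoutException}

// Hypothetical result holder, not the PR's Observation class.
final class TimedResult[T] {
  private val done = new CountDownLatch(1)
  @volatile private var value: Option[T] = None

  // Expected to be called once by the producer when the result is ready.
  def complete(v: T): Unit = {
    value = Some(v)
    done.countDown()
  }

  // Waits at most the given time; throws TimeoutException if no result arrived.
  def get(time: Long, unit: TimeUnit): T = {
    if (!done.await(time, unit)) {
      throw new TimeoutException(s"no result after $time $unit")
    }
    value.get
  }
}
```

A caller that prefers a boolean check can wrap the call in try-catch, which is why a separate `waitCompleted` is not strictly required.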
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905: URL: https://github.com/apache/spark/pull/31905#discussion_r649262392 ## File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ## @@ -2374,6 +2376,78 @@ class DataFrameSuite extends QueryTest } } + test("SPARK-34806: observation on datasets") { +val namedObservation = Observation("named") +val unnamedObservation = Observation() + +try { + val df = spark +.range(100) +.observe( + namedObservation, + min($"id").as("min_val"), + max($"id").as("max_val"), + sum($"id").as("sum_val"), + count(when($"id" % 2 === 0, 1)).as("num_even") +) +.observe( + unnamedObservation, + avg($"id").cast("int").as("avg_val") +) + + def checkMetrics(namedMetric: Row, unnamedMetric: Row): Unit = { +assert(namedMetric === Row(0L, 99L, 4950L, 50L)) +assert(unnamedMetric === Row(49)) + } + + // Before first run observation times out + assert(namedObservation.waitCompleted(100, TimeUnit.MILLISECONDS) === false) + assert(unnamedObservation.waitCompleted(100, TimeUnit.MILLISECONDS) === false) + + // First run + df.collect() + assert(namedObservation.waitCompleted(1, TimeUnit.SECONDS)) + assert(unnamedObservation.waitCompleted(1, TimeUnit.SECONDS)) + checkMetrics(namedObservation.get, unnamedObservation.get) + namedObservation.reset() + unnamedObservation.reset() Review comment: What if we don't call `reset` and start the second run?
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905: URL: https://github.com/apache/spark/pull/31905#discussion_r649259169 ## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.UUID +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.{Condition, Lock, ReentrantLock} + +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +/** + * Not thread-safe. + * @param name + * @param sparkSession + */ +case class Observation(name: String) { Review comment: I'd avoid using a case class in public APIs, as it exposes too many APIs (apply, unapply, getters, copy, etc.), which makes it very hard to maintain backward compatibility.
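As an illustration of the alternative, a plain class with a hand-written companion object keeps the `Name("x")` and `Name()` call syntax seen in the test suite without the members a case class generates. `NamedHandle` is a hypothetical stand-in, not the PR's `Observation`, and the random UUID default is an assumption suggested by the `java.util.UUID` import in the diff.

```scala
import java.util.UUID

// Hypothetical stand-in for a public API class.
// A plain class has no generated copy, unapply, or structural equals/hashCode,
// so only the members declared here become part of the public surface.
class NamedHandle(val name: String)

object NamedHandle {
  // Explicit factories preserve the call syntax without making this a case class.
  def apply(name: String): NamedHandle = new NamedHandle(name)

  // Assumption: an unnamed instance gets a random UUID as its name.
  def apply(): NamedHandle = new NamedHandle(UUID.randomUUID().toString)
}
```

Each factory is an explicit decision, so adding or removing one later is a visible API change instead of a side effect of the `case` keyword.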
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905: URL: https://github.com/apache/spark/pull/31905#discussion_r603380474 ## File path: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ## @@ -683,6 +683,12 @@ class SparkSession private( ret } + def observation(expr: Column, exprs: Column*): Observation = Review comment: Yeah, this is arguable. Personally, I'd like to see a new API similar to the existing `df.observe(...)`, but the new one here is very different:
```
val ob = spark.observe(...)
df.transform(ob.on).
```
Let's get more feedback. cc @srowen @HyukjinKwon @dongjoon-hyun @maropu
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905: URL: https://github.com/apache/spark/pull/31905#discussion_r602072774 ## File path: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ## @@ -683,6 +683,12 @@ class SparkSession private( ret } + def observation(expr: Column, exprs: Column*): Observation = Review comment: It's a bit awkward to build an `Observation` with columns. The `Observation` looks like it can be used with different DataFrames, but it cannot, because the columns come from a specific DataFrame. How about this:
```
val observation = Observation("the name")
df1.observe(observation, expressions...)...
val res1 = observation.get
df2.observe(observation, expressions...)...
val res2 = observation.get
```
[GitHub] [spark] cloud-fan commented on a change in pull request #31905: [SPARK-34806][SQL] Add Observation helper for Dataset.observe
cloud-fan commented on a change in pull request #31905: URL: https://github.com/apache/spark/pull/31905#discussion_r598537187 ## File path: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ## @@ -683,6 +683,12 @@ class SparkSession private( ret } + def observation(expr: Column, exprs: Column*): Observation = Review comment: Should this be a DataFrame API? A common pattern in PySpark is `df.select(df.my_col)`, but it's weird to see `spark.observation(df.my_col)`.