[GitHub] spark pull request #21701: [SPARK-24730][SS] Add policy to choose max as glo...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21701 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21701: [SPARK-24730][SS] Add policy to choose max as glo...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/21701#discussion_r201071243

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala ---
@@ -20,15 +20,66 @@
 package org.apache.spark.sql.execution.streaming

 import scala.collection.mutable

 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.internal.SQLConf

-class WatermarkTracker extends Logging {
+/**
+ * Policy to define how to choose a new global watermark value if there are
+ * multiple watermark operators in a streaming query.
+ */
+sealed trait MultipleWatermarkPolicy {
+  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long
+}
+
+object MultipleWatermarkPolicy {
+  val DEFAULT_POLICY_NAME = "min"
+
+  def apply(policyName: String): MultipleWatermarkPolicy = {
+    policyName.toLowerCase match {
+      case DEFAULT_POLICY_NAME => MinWatermark
+      case "max" => MaxWatermark
+      case _ =>
+        throw new IllegalArgumentException(s"Could not recognize watermark policy '$policyName'")
+    }
+  }
+}
+
+/**
+ * Policy to choose the *min* of the operator watermark values as the global watermark value.
+ * Note that this is the safe (hence default) policy as the global watermark will advance
+ * only if all the individual operator watermarks have advanced. In other words, in a
+ * streaming query with multiple input streams and watermarks defined on all of them,
+ * the global watermark will advance as slowly as the slowest input. So if there is
+ * watermark-based state cleanup or late-data dropping, then this policy is the most
+ * conservative one.
+ */
+case object MinWatermark extends MultipleWatermarkPolicy {
+  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
+    if (operatorWatermarks.nonEmpty) operatorWatermarks.min else 0

--- End diff --

It took me a while to figure out how the `0` works, and then I realized `operatorWatermarks` is never empty. Should we add an assertion rather than returning `0`?
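The policy hierarchy under review can be sketched as a small standalone Scala program. This is an illustration whose names mirror the diff above, not the merged Spark code, and it uses `require` in place of the `0` fallback, along the lines of the assertion suggested in the review comment.

```scala
// Minimal standalone sketch of the policy hierarchy under review.
// Names mirror the diff above; this is illustrative, not Spark's merged code.
sealed trait MultipleWatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long
}

case object MinWatermark extends MultipleWatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
    // Fail loudly instead of silently returning 0: per the review discussion,
    // callers always pass at least one operator watermark.
    require(operatorWatermarks.nonEmpty, "no operator watermarks to choose from")
    operatorWatermarks.min
  }
}

case object MaxWatermark extends MultipleWatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
    require(operatorWatermarks.nonEmpty, "no operator watermarks to choose from")
    operatorWatermarks.max
  }
}

val watermarks = Seq(1000L, 5000L, 3000L)
println(MinWatermark.chooseGlobalWatermark(watermarks)) // 1000
println(MaxWatermark.chooseGlobalWatermark(watermarks)) // 5000
```

With the `require` in place, an empty sequence raises an `IllegalArgumentException` instead of silently producing a watermark of `0`.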
[GitHub] spark pull request #21701: [SPARK-24730][SS] Add policy to choose max as glo...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/21701#discussion_r200771488

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---
@@ -115,8 +130,20 @@ object OffsetSeqMetadata extends Logging {
       case None =>
         // For backward compatibility, if a config was not recorded in the offset log,
-        // then log it, and let the existing conf value in SparkSession prevail.
-        logWarning(s"Conf '$confKey' was not found in the offset log, using existing value")
+        // then either inject a default value (if specified in `relevantSQLConfDefaultValues`) or
+        // let the existing conf value in SparkSession prevail.
+        relevantSQLConfDefaultValues.get(confKey) match {
+
+          case Some(defaultValue) =>
+            sessionConf.set(confKey, defaultValue)
+            logWarning(s"Conf '$confKey' was not found in the offset log, " +
+              s"using default value '$defaultValue'")
+
+          case None =>
+            logWarning(s"Conf '$confKey' was not found in the offset log, " +
+              s"using existing value '${sessionConf.get(confKey, null)}")

--- End diff --

`'${sessionConf.get(confKey, null)}`? What's this supposed to be? It seems weird to output `null` here.
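The backward-compatibility logic in the diff above can be sketched standalone: when a conf key is absent from the offset log, inject a registered default (so restarts of old checkpoints behave as before the conf existed), otherwise let the session's existing value prevail. This sketch uses a plain mutable `Map` in place of Spark's `RuntimeConfig`, so it is an illustration of the control flow only.

```scala
// Standalone sketch of the offset-log conf restoration logic discussed above.
// Spark's real code operates on RuntimeConfig; a mutable.Map stands in here.
import scala.collection.mutable

val relevantSQLConfDefaultValues = Map(
  "spark.sql.streaming.multipleWatermarkPolicy" -> "min")

def restoreConf(
    confKey: String,
    offsetLogConfs: Map[String, String],
    sessionConf: mutable.Map[String, String]): Unit = {
  offsetLogConfs.get(confKey) match {
    case Some(recorded) =>
      // The value recorded with the checkpoint always wins.
      sessionConf(confKey) = recorded
    case None =>
      relevantSQLConfDefaultValues.get(confKey) match {
        case Some(default) =>
          // Old checkpoint without this conf: force the historical default.
          sessionConf(confKey) = default
        case None =>
          () // No registered default: the session's current value prevails.
      }
  }
}

// A query restarted from an old checkpoint keeps the "min" behavior even if
// the session has since been set to "max".
val sessionConf = mutable.Map("spark.sql.streaming.multipleWatermarkPolicy" -> "max")
restoreConf("spark.sql.streaming.multipleWatermarkPolicy", Map.empty, sessionConf)
println(sessionConf("spark.sql.streaming.multipleWatermarkPolicy")) // min
```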
[GitHub] spark pull request #21701: [SPARK-24730][SS] Add policy to choose max as glo...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/21701#discussion_r200761267

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -875,6 +875,16 @@ object SQLConf {
     .stringConf
     .createWithDefault("org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol")

+  val STREAMING_MULTIPLE_WATERMARK_POLICY =
+    buildConf("spark.sql.streaming.multipleWatermarkPolicy")
+      .doc("Policy to calculate the global watermark value when there are multiple watermark " +
+        "operators in a streaming query. The default value is 'min' which chooses " +
+        "the minimum watermark reported across multiple operators. " +
+        "Note: This configuration cannot be changed between query restarts from the same " +
+        "checkpoint location.")
+      .stringConf

--- End diff --

You can use `checkValue` to add validation for this conf.
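Spark's `ConfigBuilder.checkValue` takes a predicate plus an error message; the validation it would enforce for this conf can be sketched as a standalone function. This mirrors that contract without the internal API, and the error-message wording here is illustrative, not Spark's.

```scala
// Standalone sketch of the checkValue-style validation suggested above.
// The set of valid values and the message text are illustrative.
def validatePolicy(policyName: String): String = {
  val valid = Set("min", "max")
  require(
    valid.contains(policyName.toLowerCase),
    s"Invalid value '$policyName' for spark.sql.streaming.multipleWatermarkPolicy; " +
      s"valid values are ${valid.mkString(", ")}")
  policyName.toLowerCase
}

println(validatePolicy("MAX")) // max
```

Validating at conf-set time, rather than letting `MultipleWatermarkPolicy.apply` throw during query execution, surfaces typos to the user immediately.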
[GitHub] spark pull request #21701: [SPARK-24730][SS] Add policy to choose max as glo...
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/21701

[SPARK-24730][SS] Add policy to choose max as global watermark when streaming query has multiple watermarks

## What changes were proposed in this pull request?

Currently, when a streaming query has multiple watermarks, the policy is to choose the min of them as the global watermark. This is safe because the global watermark moves with the slowest stream, so no data is unexpectedly dropped as late. While this is indeed the safe thing to do, in some cases you may want the watermark to advance with the fastest stream, that is, to take the max of the multiple watermarks. This PR adds that configuration. It makes the following changes.
- Adds a configuration to specify max as the policy.
- Saves the configuration in OffsetSeqMetadata, because changing it in the middle of a query can lead to unpredictable results.
- For old checkpoints without the configuration, it assumes the default policy of min (irrespective of the policy set in the session where the query is being restarted). This is to ensure that existing queries are not affected in any way.

TODO:
- [ ] Add a test for recovery from existing checkpoints.

## How was this patch tested?
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-24730

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21701.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21701

commit c0d1c6e0a5532eeab0848834d2dc348808e54069
Author: Tathagata Das
Date: 2018-07-03T04:28:05Z

    Implemented
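The behavioral difference between the two policies proposed in this PR can be illustrated with a toy simulation: two input streams report per-operator watermarks at each trigger, the global watermark is the chosen aggregate, and (as in Spark) it never regresses. The trigger values below are made up for illustration.

```scala
// Toy simulation of global watermark advancement under the min and max policies.
// Each inner Seq holds the per-operator watermarks reported at one trigger.
def advance(policy: Seq[Long] => Long, triggers: Seq[Seq[Long]]): Seq[Long] =
  // scanLeft keeps the running global watermark monotonically non-decreasing.
  triggers.scanLeft(0L)((global, ops) => math.max(global, policy(ops))).tail

val triggers = Seq(
  Seq(10L, 40L),   // slow stream at 10, fast stream at 40
  Seq(20L, 80L),
  Seq(30L, 120L))

println(advance(_.min, triggers)) // List(10, 20, 30)   -- tracks the slowest stream
println(advance(_.max, triggers)) // List(40, 80, 120)  -- tracks the fastest stream
```

Under `min`, state cleanup and late-data dropping lag behind the slowest input; under `max`, the watermark races ahead with the fastest input, so data from the slow stream may be treated as late.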