[GitHub] spark pull request #21701: [SPARK-24730][SS] Add policy to choose max as glo...

2018-07-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21701


---




[GitHub] spark pull request #21701: [SPARK-24730][SS] Add policy to choose max as glo...

2018-07-09 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21701#discussion_r201071243
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala ---
@@ -20,15 +20,66 @@ package org.apache.spark.sql.execution.streaming
 import scala.collection.mutable
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.internal.SQLConf
 
-class WatermarkTracker extends Logging {
+/**
+ * Policy to define how to choose a new global watermark value if there are
+ * multiple watermark operators in a streaming query.
+ */
+sealed trait MultipleWatermarkPolicy {
+  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long
+}
+
+object MultipleWatermarkPolicy {
+  val DEFAULT_POLICY_NAME = "min"
+
+  def apply(policyName: String): MultipleWatermarkPolicy = {
+    policyName.toLowerCase match {
+      case DEFAULT_POLICY_NAME => MinWatermark
+      case "max" => MaxWatermark
+      case _ =>
+        throw new IllegalArgumentException(s"Could not recognize watermark policy '$policyName'")
+    }
+  }
+}
+
+/**
+ * Policy to choose the *min* of the operator watermark values as the global watermark value.
+ * Note that this is the safe (hence default) policy as the global watermark will advance
+ * only if all the individual operator watermarks have advanced. In other words, in a
+ * streaming query with multiple input streams and watermarks defined on all of them,
+ * the global watermark will advance as slowly as the slowest input. So if there is watermark
+ * based state cleanup or late-data dropping, then this policy is the most conservative one.
+ */
+case object MinWatermark extends MultipleWatermarkPolicy {
+  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
+    if (operatorWatermarks.nonEmpty) operatorWatermarks.min else 0
--- End diff --

It took me a while to figure out how the `0` works, and then I realized that `operatorWatermarks` is never empty. Should we add an assertion rather than returning `0`?
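
A minimal sketch of that suggestion (illustrative only, not necessarily the code that was merged):

```scala
// Hypothetical variant of MinWatermark: fail fast if no operator watermarks
// were reported, instead of silently falling back to 0.
case object MinWatermark extends MultipleWatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
    assert(operatorWatermarks.nonEmpty, "no operator watermarks were reported")
    operatorWatermarks.min
  }
}
```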


---




[GitHub] spark pull request #21701: [SPARK-24730][SS] Add policy to choose max as glo...

2018-07-09 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21701#discussion_r200771488
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---
@@ -115,8 +130,20 @@ object OffsetSeqMetadata extends Logging {
 
         case None =>
           // For backward compatibility, if a config was not recorded in the offset log,
-          // then log it, and let the existing conf value in SparkSession prevail.
-          logWarning(s"Conf '$confKey' was not found in the offset log, using existing value")
+          // then either inject a default value (if specified in `relevantSQLConfDefaultValues`) or
+          // let the existing conf value in SparkSession prevail.
+          relevantSQLConfDefaultValues.get(confKey) match {
+
+            case Some(defaultValue) =>
+              sessionConf.set(confKey, defaultValue)
+              logWarning(s"Conf '$confKey' was not found in the offset log, " +
+                s"using default value '$defaultValue'")
+
+            case None =>
+              logWarning(s"Conf '$confKey' was not found in the offset log, " +
+                s"using existing value '${sessionConf.get(confKey, null)}")
--- End diff --

`'${sessionConf.get(confKey, null)}`? What's this supposed to be? It seems weird to output `null` here.
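
One null-safe alternative, sketched purely as an illustration (not the fix that landed), is to go through `Option` so an unset conf prints a readable placeholder rather than the literal `null`:

```scala
// Illustrative only: report "<undefined>" instead of the string "null"
// when the conf key has no value in the session.
val existingValue = sessionConf.getOption(confKey).getOrElse("<undefined>")
logWarning(s"Conf '$confKey' was not found in the offset log, " +
  s"using existing value '$existingValue'")
```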


---




[GitHub] spark pull request #21701: [SPARK-24730][SS] Add policy to choose max as glo...

2018-07-09 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21701#discussion_r200761267
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -875,6 +875,16 @@ object SQLConf {
       .stringConf
       .createWithDefault("org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol")
 
+  val STREAMING_MULTIPLE_WATERMARK_POLICY =
+    buildConf("spark.sql.streaming.multipleWatermarkPolicy")
+      .doc("Policy to calculate the global watermark value when there are multiple watermark " +
+        "operators in a streaming query. The default value is 'min' which chooses " +
+        "the minimum watermark reported across multiple operators. " +
+        "Note: This configuration cannot be changed between query restarts from the same " +
+        "checkpoint location.")
+      .stringConf
--- End diff --

You can use `checkValue` to add validation for this conf.
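
A sketch of that suggestion, reusing the two policy names from `MultipleWatermarkPolicy` (illustrative; not necessarily the exact code that landed):

```scala
// Validate at conf-definition time so an invalid policy name fails as soon as
// the conf is set, rather than when the streaming query starts.
val STREAMING_MULTIPLE_WATERMARK_POLICY =
  buildConf("spark.sql.streaming.multipleWatermarkPolicy")
    .doc("Policy to calculate the global watermark value when there are multiple watermark " +
      "operators in a streaming query.")
    .stringConf
    .checkValue(
      str => Set("min", "max").contains(str.toLowerCase),
      "Invalid value for 'spark.sql.streaming.multipleWatermarkPolicy'. " +
        "Valid values are 'min' and 'max'")
    .createWithDefault("min")
```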


---




[GitHub] spark pull request #21701: [SPARK-24730][SS] Add policy to choose max as glo...

2018-07-02 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/21701

[SPARK-24730][SS] Add policy to choose max as global watermark when streaming query has multiple watermarks

## What changes were proposed in this pull request?

Currently, when a streaming query has multiple watermarks, the policy is to choose the min of them as the global watermark. This is safe because the global watermark moves with the slowest stream, and therefore does not unexpectedly drop any data as late. While this is indeed the safe thing to do, in some cases you may want the watermark to advance with the fastest stream, that is, take the max of the multiple watermarks. This PR adds that configuration (a usage sketch follows the change list below). It makes the following changes.

- Adds a configuration to specify max as the policy.
- Saves the configuration in OffsetSeqMetadata because changing it in the middle of a query can lead to unpredictable results.
   - For old checkpoints without the configuration, it assumes the default policy of min (irrespective of the policy set in the session where the query is being restarted). This is to ensure that existing queries are not affected in any way.

- [ ] Add a test for recovery from existing checkpoints.
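
As a usage sketch (assuming only the conf key added above; `spark` is a `SparkSession`):

```scala
// Opt into the new policy before starting the query. Per this PR, the choice
// is saved in the checkpoint and cannot be changed across restarts from the
// same checkpoint location.
spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")
```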

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-24730

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21701.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21701


commit c0d1c6e0a5532eeab0848834d2dc348808e54069
Author: Tathagata Das 
Date:   2018-07-03T04:28:05Z

Implemented




---
