[GitHub] spark pull request #17202: [SPARK-19861][SS] watermark should not be a negat...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/17202 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user uncleGen commented on a diff in the pull request: https://github.com/apache/spark/pull/17202#discussion_r105075392

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

```
@@ -576,6 +576,8 @@ class Dataset[T] private[sql](
     val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold))
       .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
+      s"delay threshold ($delayThreshold) should not be negative.")
```

--- End diff --

Use `require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0)` to make the `delayThreshold` check more reasonable and meaningful.
Github user uncleGen commented on a diff in the pull request: https://github.com/apache/spark/pull/17202#discussion_r105069930

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

```
@@ -576,6 +576,11 @@ class Dataset[T] private[sql](
     val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold))
       .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    val delayMs = {
```

--- End diff --

Sure.
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/17202#discussion_r105005343

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

```
@@ -576,6 +576,11 @@ class Dataset[T] private[sql](
     val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold))
       .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    val delayMs = {
```

--- End diff --

I guess you copied this code from `EventTimeWatermarkExec`. Could you extract it into a new method in `object EventTimeWatermark` and reuse it? Since you are touching these files, could you also fix the following two places?

- https://github.com/apache/spark/blob/d8830c5039d9c7c5ef03631904c32873ab558e22/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala#L42 doesn't use the correct `delayMs`. Right now we don't use this value in the logical plan, but it's better to make it consistent.
- I forgot to update `EventTimeWatermarkExec` in #17199. Could you help me fix it?
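The extraction zsxwing suggests might look like the following sketch. The helper name `getDelayMs` and the stubbed `CalendarInterval` are assumptions for illustration, not the classes as they exist in Spark (the real `CalendarInterval` lives in `org.apache.spark.unsafe.types`):

```scala
// Hedged sketch of the suggested refactoring: hoist the delay-to-milliseconds
// conversion into one shared helper so Dataset.withWatermark,
// EventTimeWatermark, and EventTimeWatermarkExec all use the same logic.
// CalendarInterval is stubbed here for illustration only.
case class CalendarInterval(months: Int, microseconds: Long) {
  def milliseconds: Long = microseconds / 1000
}

object EventTimeWatermark {
  val MicrosPerDay: Long = 24L * 60 * 60 * 1000 * 1000

  // The code under review approximates one month as 31 days.
  def getDelayMs(delay: CalendarInterval): Long = {
    val millisPerMonth = MicrosPerDay / 1000 * 31
    delay.milliseconds + delay.months * millisPerMonth
  }
}
```

With a single shared conversion, the validation in `Dataset.withWatermark` and the watermark arithmetic in the physical plan cannot silently drift apart.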
Github user uncleGen commented on a diff in the pull request: https://github.com/apache/spark/pull/17202#discussion_r105067664

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

```
@@ -576,6 +576,11 @@ class Dataset[T] private[sql](
     val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold))
       .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    val delayMs = {
+      val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+      parsedDelay.milliseconds + parsedDelay.months * millisPerMonth
+    }
+    assert(delayMs >= 0, s"delay threshold should not be a negative time: $delayThreshold")
```

--- End diff --

@srowen +1 to your comments. We should make it meaningful, not just valid.
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/17202#discussion_r104999453

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

```
@@ -576,6 +576,8 @@ class Dataset[T] private[sql](
     val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold))
       .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    assert(parsedDelay.microseconds >= 0,
```

--- End diff --

In other words, 0 means the events always arrive in order.
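As a toy model of the semantics being discussed (an illustration, not Spark's actual `EventTimeWatermarkExec` implementation), the watermark can be thought of as `max event time seen - delay`, and a row behind the watermark is late:

```scala
// Toy lateness check for illustration: a row is late when its event time
// falls behind (max event time seen) - delay.
def isLate(eventTimeMs: Long, maxEventTimeMs: Long, delayMs: Long): Boolean =
  eventTimeMs < maxEventTimeMs - delayMs
```

With `delayMs = 0`, any row older than the current maximum is late, which is why a zero delay effectively asserts that events arrive in order.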
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/17202#discussion_r104983300

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

```
@@ -576,6 +576,11 @@ class Dataset[T] private[sql](
     val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold))
       .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    val delayMs = {
+      val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+      parsedDelay.milliseconds + parsedDelay.months * millisPerMonth
+    }
+    require(delayMs >= 0, s"delay threshold should not be a negative time: $delayThreshold")
```

--- End diff --

"delay threshold ($delayThreshold) should not be negative"
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/17202#discussion_r104983221

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

```
@@ -563,7 +563,7 @@ class Dataset[T] private[sql](
    * @param eventTime the name of the column that contains the event time of the row.
    * @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest
    *                       record that has been processed in the form of an interval
-   *                       (e.g. "1 minute" or "5 hours").
+   *                       (e.g. "1 minute" or "5 hours"). NOTE: This should not be a negative time.
```

--- End diff --

I'd just say "This should not be negative."
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/17202#discussion_r104944765

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

```
@@ -576,6 +576,11 @@ class Dataset[T] private[sql](
     val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold))
       .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    val delayMs = {
+      val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+      parsedDelay.milliseconds + parsedDelay.months * millisPerMonth
+    }
+    assert(delayMs >= 0, s"delay threshold should not be a negative time: $delayThreshold")
```

--- End diff --

Sorry, that last example didn't make sense; it's not a single number. It just doesn't seem like you need to reproduce this conversion to check what you want, which is that the two fields aren't negative: `require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0)`. In fact, I wonder if this should be some kind of method on `CalendarInterval`, like `isNegative`? It's not that trivial to decide whether an interval is negative, and maybe other places do this too.
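A minimal sketch of the `isNegative`-style helper srowen floats here; the method name and the field-wise rule are assumptions for illustration, not an existing `CalendarInterval` API:

```scala
// Sketch: decide negativity field-wise instead of via the lossy
// month-to-milliseconds conversion. CalendarInterval is stubbed for
// illustration only.
case class CalendarInterval(months: Int, milliseconds: Long) {
  // An interval such as "1 month -40 days" parses to months = 1 with a
  // negative millisecond part, so checking each field catches it.
  def isNegative: Boolean = months < 0 || milliseconds < 0
}
```

Note the field-wise rule is stricter than summing an approximate total: it would also reject something like "2 months -1 day", whose 31-days-per-month total is positive, because the millisecond part alone is negative.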
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/17202#discussion_r104931873

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

```
@@ -576,6 +576,11 @@ class Dataset[T] private[sql](
     val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold))
       .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    val delayMs = {
+      val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+      parsedDelay.milliseconds + parsedDelay.months * millisPerMonth
+    }
+    assert(delayMs >= 0, s"delay threshold should not be a negative time: $delayThreshold")
```

--- End diff --

So can you `assert(parsedDelay >= 0, ...)` to detect invalid cases?
Github user uncleGen commented on a diff in the pull request: https://github.com/apache/spark/pull/17202#discussion_r104930545

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

```
@@ -576,6 +576,11 @@ class Dataset[T] private[sql](
     val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold))
       .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    val delayMs = {
+      val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+      parsedDelay.milliseconds + parsedDelay.months * millisPerMonth
+    }
+    assert(delayMs >= 0, s"delay threshold should not be a negative time: $delayThreshold")
```

--- End diff --

Maybe you misunderstood my cases above. Those cases are invalid, i.e. their `parsedDelay` is negative.

| case | validity |
|--|--|
| inputData.withWatermark("value", "1 month -40 days") | invalid |
| inputData.withWatermark("value", "-10 seconds") | invalid |
| inputData.withWatermark("value", "10 seconds") | valid |
| inputData.withWatermark("value", "1 day -10 seconds") | valid |
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/17202#discussion_r104907760

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

```
@@ -576,6 +576,11 @@ class Dataset[T] private[sql](
     val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold))
       .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    val delayMs = {
+      val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+      parsedDelay.milliseconds + parsedDelay.months * millisPerMonth
+    }
+    assert(delayMs >= 0, s"delay threshold should not be a negative time: $delayThreshold")
```

--- End diff --

Ah, right. Can you not assert on `parsedDelay` then? It seems like that's always negative when the result of this conversion (copied from somewhere else?) is negative.
Github user uncleGen commented on a diff in the pull request: https://github.com/apache/spark/pull/17202#discussion_r104904202

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

```
@@ -576,6 +576,11 @@ class Dataset[T] private[sql](
     val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold))
       .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    val delayMs = {
+      val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+      parsedDelay.milliseconds + parsedDelay.months * millisPerMonth
+    }
+    assert(delayMs >= 0, s"delay threshold should not be a negative time: $delayThreshold")
```

--- End diff --

`delayThreshold: String` cannot be asserted on directly, for example:

```
inputData.withWatermark("value", "1 month -40 days")
inputData.withWatermark("value", "-10 seconds")
```
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/17202#discussion_r104900463

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

```
@@ -576,6 +576,11 @@ class Dataset[T] private[sql](
     val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold))
       .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    val delayMs = {
+      val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+      parsedDelay.milliseconds + parsedDelay.months * millisPerMonth
+    }
+    assert(delayMs >= 0, s"delay threshold should not be a negative time: $delayThreshold")
```

--- End diff --

Why don't you just `assert(delayThreshold >= 0, ...)`? Why does that check require computing `delayMs`?
Github user uncleGen commented on a diff in the pull request: https://github.com/apache/spark/pull/17202#discussion_r104899893

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

```
@@ -576,6 +576,11 @@ class Dataset[T] private[sql](
     val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold))
       .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    val delayMs = {
+      val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+      parsedDelay.milliseconds + parsedDelay.months * millisPerMonth
+    }
+    assert(delayMs >= 0, s"delay threshold should not be a negative time: $delayThreshold")
```

--- End diff --

@srowen Thanks for your review!

> Why compute all this -- don't you just mean to assert about delayThreshold?

I do mean to check the `delayThreshold`. `delayThreshold` is converted from `String` to `CalendarInterval`, which divides it into two parts, i.e. months (covering years and months) and microseconds for the rest.
(https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java#L86)
(https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala#L87)

> this derived value can only be negative if the input is right?

Sorry, I don't get what you mean.
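To make the months/microseconds split concrete, here is a toy parser. It is an assumption for illustration, far simpler than Spark's real `CalendarInterval.fromString`: it only handles a few units, routing year/month units into the `months` field and day/second units into `microseconds`:

```scala
// Toy interval parser for illustration only; Spark's real parser supports
// many more units and stricter syntax.
def parseDelay(s: String): (Int, Long) = {
  val MicrosPerDay = 24L * 60 * 60 * 1000 * 1000
  val token = """(-?\d+)\s+([a-z]+)""".r
  token.findAllMatchIn(s).foldLeft((0, 0L)) { case ((months, micros), m) =>
    val n = m.group(1).toLong
    m.group(2).stripSuffix("s") match {
      case "year"   => (months + 12 * n.toInt, micros)  // years fold into months
      case "month"  => (months + n.toInt, micros)
      case "day"    => (months, micros + n * MicrosPerDay)
      case "second" => (months, micros + n * 1000 * 1000)
      case other    => sys.error(s"unsupported unit in sketch: $other")
    }
  }
}
```

For example, "1 month -40 days" yields months = 1 with a negative microsecond part, which is why a check on a single field cannot see the whole picture.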
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/17202#discussion_r104892736

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

```
@@ -576,6 +576,11 @@ class Dataset[T] private[sql](
     val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold))
       .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    val delayMs = {
+      val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+      parsedDelay.milliseconds + parsedDelay.months * millisPerMonth
+    }
+    assert(delayMs >= 0, s"delay threshold should not be a negative time: $delayThreshold")
```

--- End diff --

Why compute all this -- don't you just mean to assert about `delayThreshold`? This derived value can only be negative if the input is, right?
Github user uncleGen commented on a diff in the pull request: https://github.com/apache/spark/pull/17202#discussion_r104888633

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

```
@@ -576,6 +576,8 @@ class Dataset[T] private[sql](
     val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold))
       .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    assert(parsedDelay.microseconds >= 0,
```

--- End diff --

Setting it to 0 means the event time should be no less than the max event time in the last batch.
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/17202#discussion_r104853064

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---

```
@@ -576,6 +576,8 @@ class Dataset[T] private[sql](
     val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold))
       .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    assert(parsedDelay.microseconds >= 0,
```

--- End diff --

What happens when it is 0?
GitHub user uncleGen opened a pull request: https://github.com/apache/spark/pull/17202

[SPARK-19861][SS] watermark should not be a negative time.

## What changes were proposed in this pull request?

The watermark should not be a negative time.

## How was this patch tested?

Added a new unit test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uncleGen/spark SPARK-19861

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17202.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17202

commit dcc77eda3d88f8cd5c66b60730c0ada5ae717cc3
Author: uncleGen
Date: 2017-03-08T03:28:29Z
watermark should not be a negative time.

commit 00bbadc063e515653af86ab85fc95833bccf727a
Author: uncleGen
Date: 2017-03-08T03:30:13Z
update