[jira] [Updated] (BEAM-7995) IllegalStateException: TimestampCombiner moved element from to earlier time in Python

2020-06-10 Thread Beam JIRA Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Beam JIRA Bot updated BEAM-7995:

Labels:   (was: stale-assigned)

> IllegalStateException: TimestampCombiner moved element from to earlier time 
> in Python
> -
>
>                Key: BEAM-7995
>                URL: https://issues.apache.org/jira/browse/BEAM-7995
>            Project: Beam
>         Issue Type: Bug
>         Components: sdk-py-core
>           Reporter: Hai Lu
>           Priority: P2
>         Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> I'm looking into a bug I found internally when using the Beam portable API
> (Python) on our own Samza runner.
>  
> The pipeline looks something like this:
>  
>     (p
>      | 'read' >> ReadFromKafka(cluster="tracking", topic="PageViewEvent")
>      | 'transform' >> beam.Map(process_event)
>      | 'window' >> beam.WindowInto(FixedWindows(15))
>      # The combiner below is what triggers the exception.
>      | 'group' >> beam.CombinePerKey(beam.combiners.CountCombineFn())
>      ...
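To make the window in the exception concrete, here is a small pure-Python model of fixed-window assignment, assuming integer millisecond timestamps and a zero offset. The 03:34:37 element timestamp is a hypothetical example, and `assign_fixed_window`, `to_millis` and `from_millis` are illustrative helpers rather than Beam APIs; the arithmetic is intended to mirror how `FixedWindows(15)` places an element in a half-open window [start, end).

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_millis(dt):
    """Exact integer milliseconds since the Unix epoch (illustrative helper)."""
    return (dt - EPOCH) // timedelta(milliseconds=1)


def from_millis(ms):
    """Inverse of to_millis (illustrative helper)."""
    return EPOCH + timedelta(milliseconds=ms)


def assign_fixed_window(ts_ms, size_ms, offset_ms=0):
    """Hypothetical model: return the half-open window [start, end) holding ts_ms."""
    start = ts_ms - (ts_ms - offset_ms) % size_ms
    return start, start + size_ms


# A hypothetical element stamped 03:34:37 lands in the window from the exception.
event_ms = to_millis(datetime(2019, 8, 15, 3, 34, 37, tzinfo=timezone.utc))
start_ms, end_ms = assign_fixed_window(event_ms, size_ms=15_000)
print(from_millis(start_ms).isoformat(), "..", from_millis(end_ms).isoformat())
```

Because the window is half-open, an element stamped exactly 03:34:45.000 would fall into the next window, so 45.000 is not a timestamp that the [30.000, 45.000) window can contain.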
>  
> The problem comes from the combiner, which causes the following exception on
> the Java side:
>  
> Caused by: java.lang.IllegalStateException: TimestampCombiner moved element
> from 2019-08-15T03:34:45.000Z to earlier time 2019-08-15T03:34:44.999Z
> for window [2019-08-15T03:34:30.000Z..2019-08-15T03:34:45.000Z)
>     at org.apache.beam.runners.core.WatermarkHold.shift(WatermarkHold.java:117)
>     at org.apache.beam.runners.core.WatermarkHold.addElementHold(WatermarkHold.java:154)
>     at org.apache.beam.runners.core.WatermarkHold.addHolds(WatermarkHold.java:98)
>     at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:605)
>     at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
>  
> The exception is thrown at
> [https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java#L116],
> where the shifted timestamp is checked to make sure it is not earlier than
> the original timestamp:
>  
>     if (shifted.isBefore(timestamp)) {
>       throw new IllegalStateException(
>           String.format(
>               "TimestampCombiner moved element from %s to earlier time %s for window %s",
>               BoundedWindow.formatTimestamp(timestamp),
>               BoundedWindow.formatTimestamp(shifted),
>               window));
>     }
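The guard above can be modelled in a few lines of Python. This is a hypothetical sketch: `check_shift` and `IllegalStateError` are illustrative names, not Beam APIs. It uses the millisecond values from the exception to show that any combiner output earlier than the element's own timestamp is rejected.

```python
class IllegalStateError(Exception):
    """Stand-in for Java's IllegalStateException in this sketch."""


def check_shift(timestamp_ms, shifted_ms):
    """Mimic the guard in WatermarkHold.shift: a TimestampCombiner may keep an
    element's hold time or move it later, but never earlier."""
    if shifted_ms < timestamp_ms:
        raise IllegalStateError(
            f"TimestampCombiner moved element from {timestamp_ms} "
            f"to earlier time {shifted_ms}")
    return shifted_ms


# Millisecond offsets within minute 03:34, matching the values in the exception.
print(check_shift(44_999, 44_999))  # element at the window's max timestamp: accepted
try:
    check_shift(45_000, 44_999)     # element stamped at the window's end: rejected
except IllegalStateError as err:
    print("rejected:", err)
```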
>  
> As the exception shows, "shifted" is 03:34:44.999 while "timestamp" is
> 03:34:45.000. The 44.999 comes from
> [TimestampCombiner.END_OF_WINDOW|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java#L116]:
>  
>     @Override
>     public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
>       return intoWindow.maxTimestamp();
>     }
>  
> where intoWindow.maxTimestamp() is:
>  
>   /** Returns the largest timestamp that can be included in this window. */
>   @Override
>   public Instant maxTimestamp() {
>     // end not inclusive
>     return end.minus(1);
>   }
>  
> Hence the 44.999.
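A minimal Python model of the Java behaviour quoted above, assuming integer millisecond timestamps. `IntervalWindow` and `end_of_window` here are hypothetical stand-ins, not Beam's classes. Because `end_of_window` always returns the window's max timestamp (end - 1), it can only move an element earlier when the element's timestamp lies outside the window, as 45.000 does.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IntervalWindow:
    """Hypothetical model of a half-open window [start, end), in milliseconds."""
    start: int
    end: int

    def max_timestamp(self):
        # end is not inclusive, so the last contained millisecond is end - 1,
        # mirroring the Java maxTimestamp() quoted above.
        return self.end - 1

    def contains(self, ts):
        return self.start <= ts < self.end


def end_of_window(window, timestamps):
    """Model of TimestampCombiner.END_OF_WINDOW: ignore the input timestamps
    and return the window's max timestamp."""
    return window.max_timestamp()


window = IntervalWindow(start=30_000, end=45_000)
# For every timestamp inside the window, END_OF_WINDOW never moves it earlier.
print(all(end_of_window(window, [ts]) >= ts for ts in range(window.start, window.end)))
# The window's end is outside the window and is moved earlier, to end - 1.
print(window.contains(45_000), end_of_window(window, [45_000]))
```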
>  
> The 45.000, on the other hand, comes from the Python side, where the combiner
> emits its results as a pre-GBK operation in
> [operations.py#PGBKCVOperation#output_key|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/operations.py#L889]:
>  
>     if windows is 0:
>       self.output(_globally_windowed_value.with_value((key, value)))
>     else:
>       self.output(WindowedValue((key, value), windows[0].end, windows))
>  
> Here, when the windowed value is created, its timestamp is set to the
> window's exclusive end (45.000) rather than to the largest timestamp the
> window actually contains (44.999).
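The sketch below contrasts the quoted Python behaviour with one possible way to make it agree with Java: stamping the pre-combined output with the window's largest contained timestamp instead of its end. All names (`output_key`, `Window`, `Output`, `java_end_of_window_hold_ok`) are hypothetical, and the whole thing is a model assuming integer millisecond timestamps, not necessarily the change adopted upstream.

```python
from collections import namedtuple

# Hypothetical stand-ins; these are not Beam's WindowedValue or IntervalWindow.
Window = namedtuple("Window", ["start_ms", "end_ms"])
Output = namedtuple("Output", ["value", "timestamp_ms", "window"])


def max_timestamp(window):
    # Largest millisecond contained in the half-open window [start, end).
    return window.end_ms - 1


def output_key(key, value, window, use_max_timestamp):
    """Hypothetical model of the pre-GBK output step.

    use_max_timestamp=False reproduces the quoted behaviour (window end);
    use_max_timestamp=True is the assumed fix (largest contained timestamp).
    """
    ts = max_timestamp(window) if use_max_timestamp else window.end_ms
    return Output((key, value), ts, window)


def java_end_of_window_hold_ok(output):
    # The runner shifts the hold to the window's max timestamp and rejects
    # the element if that is earlier than the element's own timestamp.
    return max_timestamp(output.window) >= output.timestamp_ms


w = Window(30_000, 45_000)
current = output_key("page", 7, w, use_max_timestamp=False)
fixed = output_key("page", 7, w, use_max_timestamp=True)
print(current.timestamp_ms, java_end_of_window_hold_ok(current))
print(fixed.timestamp_ms, java_end_of_window_hold_ok(fixed))
```

In the real SDK, Python windows also provide a `max_timestamp()` method that could be used in place of `windows[0].end`; note that Python Beam timestamps have microsecond rather than millisecond precision, so the exact value differs from this millisecond model.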
>  
> Clearly, the "end of window" definition is inconsistent between Python and
> Java. I have yet to try this on other runners, so I'm not sure whether this
> issue is specific to our Samza runner. I tend to think this is a bug, but I
> would like to confirm with you. If this has not been an issue for other
> runners, where might I have gone wrong?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-7995) IllegalStateException: TimestampCombiner moved element from to earlier time in Python

2020-06-01 Thread Kenneth Knowles (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth Knowles updated BEAM-7995:
--
Labels: stale-assigned  (was: )

> IllegalStateException: TimestampCombiner moved element from to earlier time 
> in Python
> -
>
>                Key: BEAM-7995
>                URL: https://issues.apache.org/jira/browse/BEAM-7995
>            Project: Beam
>         Issue Type: Bug
>         Components: sdk-py-core
>           Reporter: Hai Lu
>           Assignee: Hai Lu
>           Priority: P2
>             Labels: stale-assigned
>         Time Spent: 2h 50m
> Remaining Estimate: 0h
>


[jira] [Updated] (BEAM-7995) IllegalStateException: TimestampCombiner moved element from to earlier time in Python

2019-08-19 Thread Jira


 [ 
https://issues.apache.org/jira/browse/BEAM-7995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía updated BEAM-7995:
---
Status: Open  (was: Triage Needed)

> IllegalStateException: TimestampCombiner moved element from to earlier time 
> in Python
> -
>
>                Key: BEAM-7995
>                URL: https://issues.apache.org/jira/browse/BEAM-7995
>            Project: Beam
>         Issue Type: Bug
>         Components: sdk-py-core
>           Reporter: Hai Lu
>           Assignee: Hai Lu
>           Priority: Major
>         Time Spent: 50m
> Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (BEAM-7995) IllegalStateException: TimestampCombiner moved element from to earlier time in Python

2019-08-16 Thread Hai Lu (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Lu updated BEAM-7995:
-
Description: 

> IllegalStateException: TimestampCombiner moved element from to earlier time 
> in Python
> -
>
>                Key: BEAM-7995
>                URL: https://issues.apache.org/jira/browse/BEAM-7995
>            Project: Beam
>         Issue Type: Bug
>         Components: sdk-py-core
>           Reporter: Hai Lu
>           Assignee: Hai Lu
>           Priority: Major
>