[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=331443=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331443 ]

ASF GitHub Bot logged work on BEAM-5690:

Author: ASF GitHub Bot
Created on: 21/Oct/19 15:09
Start Date: 21/Oct/19 15:09
Worklog Time Spent: 10m

Work Description: echauchot commented on pull request #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 331443)
Time Spent: 4.5h (was: 4h 20m)

> Issue with GroupByKey in BeamSql using SparkRunner
> --
>
>                 Key: BEAM-5690
>                 URL: https://issues.apache.org/jira/browse/BEAM-5690
>             Project: Beam
>          Issue Type: Task
>          Components: runner-spark
>            Reporter: Kenneth Knowles
>            Priority: Major
>          Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> Reported on user@
> {quote}We are trying to set up a pipeline using BeamSql; the trigger used is the default (AfterWatermark crosses the window).
> Below is the pipeline:
>
>    KafkaSource (KafkaIO)
>    ---> Windowing (FixedWindow 1min)
>    ---> BeamSql
>    ---> KafkaSink (KafkaIO)
>
> We are using the Spark runner for this.
> The BeamSql query is:
> {code}select Col3, count(*) as count_col1 from PCOLLECTION GROUP BY Col3{code}
> We are grouping by Col3, which is a string. It can hold the values string[0-9].
>
> The records are emitted to the Kafka sink at 1 min, but the output records in Kafka are not as expected.
> Below is the output observed (WST and WET are indicators for window start time and window end time):
> {code}
> {"count_col1":1,"Col3":"string5","WST":"2018-10-09 09-55-00 +","WET":"2018-10-09 09-56-00 +"}
> {"count_col1":3,"Col3":"string7","WST":"2018-10-09 09-55-00 +","WET":"2018-10-09 09-56-00 +"}
> {"count_col1":2,"Col3":"string8","WST":"2018-10-09 09-55-00 +","WET":"2018-10-09 09-56-00 +"}
> {"count_col1":1,"Col3":"string2","WST":"2018-10-09 09-55-00 +","WET":"2018-10-09 09-56-00 +"}
> {"count_col1":1,"Col3":"string6","WST":"2018-10-09 09-55-00 +","WET":"2018-10-09 09-56-00 +"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09 09-55-00 +","WET":"2018-10-09 09-56-00 +"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09 09-55-00 +","WET":"2018-10-09 09-56-00 +"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09 09-55-00 +","WET":"2018-10-09 09-56-00 +"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09 09-55-00 +","WET":"2018-10-09 09-56-00 +"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09 09-55-00 +","WET":"2018-10-09 09-56-00 +"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09 09-55-00 +","WET":"2018-10-09 09-56-00 +"}
> {code}
> {quote}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
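The defect in the report above is visible in the data: GROUP BY with COUNT can only ever emit counts of 1 or more for keys that actually appeared in a window, so the repeated zero-count rows for string6 are spurious. A minimal, Beam-free sketch of the expected semantics, using plain Java streams to model a fixed 1-minute window plus a per-key count (class and method names here are illustrative, not Beam API):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Illustrative only: models the per-window, per-key counts that the query
 * "select Col3, count(*) as count_col1 from PCOLLECTION GROUP BY Col3"
 * should produce under a 1-minute fixed window. Not Beam code.
 */
public class WindowedCount {
    static final long WINDOW_MILLIS = 60_000; // FixedWindow of 1 min

    /** Assigns each (timestampMillis, key) event to a fixed window, then counts per key. */
    static Map<Long, Map<String, Long>> countPerWindow(List<Map.Entry<Long, String>> events) {
        return events.stream().collect(Collectors.groupingBy(
                e -> e.getKey() / WINDOW_MILLIS,            // window index
                Collectors.groupingBy(Map.Entry::getValue,  // group by Col3
                        Collectors.counting())));
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> events = List.of(
                Map.entry(1_000L, "string5"),
                Map.entry(2_000L, "string7"),
                Map.entry(3_000L, "string7"),
                Map.entry(61_000L, "string7")); // falls in the next window
        // A key absent from a window simply has no entry; a zero count is never emitted.
        System.out.println(countPerWindow(events));
    }
}
```

Note that a key with no events in a window produces no row at all, rather than a row with count 0, which is why the zero-count output points at stale per-key state in the runner.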
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=331262=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331262 ]

ASF GitHub Bot logged work on BEAM-5690:

Author: ASF GitHub Bot
Created on: 21/Oct/19 08:16
Start Date: 21/Oct/19 08:16
Worklog Time Spent: 10m

Work Description: echauchot commented on issue #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#issuecomment-544402252

Triggering of the validates runner tests from the PR seems to be broken. I'm running them from my laptop on your PR branch and they pass.

Issue Time Tracking
---
Worklog Id: (was: 331262)
Time Spent: 4h 20m (was: 4h 10m)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=331261=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331261 ]

ASF GitHub Bot logged work on BEAM-5690:

Author: ASF GitHub Bot
Created on: 21/Oct/19 08:16
Start Date: 21/Oct/19 08:16
Worklog Time Spent: 10m

Work Description: echauchot commented on issue #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#issuecomment-544402252

Triggering of the validates runner tests from the PR seems to be broken. I'm running them from my laptop on your PR branch and they pass.

Issue Time Tracking
---
Worklog Id: (was: 331261)
Time Spent: 4h 10m (was: 4h)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=331252=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331252 ]

ASF GitHub Bot logged work on BEAM-5690:

Author: ASF GitHub Bot
Created on: 21/Oct/19 08:02
Start Date: 21/Oct/19 08:02
Worklog Time Spent: 10m

Work Description: echauchot commented on issue #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#issuecomment-544397376

Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 331252)
Time Spent: 3h 20m (was: 3h 10m)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=331254=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331254 ]

ASF GitHub Bot logged work on BEAM-5690:

Author: ASF GitHub Bot
Created on: 21/Oct/19 08:03
Start Date: 21/Oct/19 08:03
Worklog Time Spent: 10m

Work Description: echauchot commented on issue #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#issuecomment-544397900

Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 331254)
Time Spent: 3h 40m (was: 3.5h)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=331256=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331256 ]

ASF GitHub Bot logged work on BEAM-5690:

Author: ASF GitHub Bot
Created on: 21/Oct/19 08:04
Start Date: 21/Oct/19 08:04
Worklog Time Spent: 10m

Work Description: echauchot commented on issue #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#issuecomment-544398078

Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 331256)
Time Spent: 4h (was: 3h 50m)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=331251=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331251 ]

ASF GitHub Bot logged work on BEAM-5690:

Author: ASF GitHub Bot
Created on: 21/Oct/19 08:02
Start Date: 21/Oct/19 08:02
Worklog Time Spent: 10m

Work Description: echauchot commented on pull request #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#discussion_r336881029

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
##
@@ -338,6 +338,16 @@ public void outputWindowedValue(
         outputHolder.getWindowedValues();
     if (!outputs.isEmpty() || !stateInternals.getState().isEmpty()) {
+
+      Collection expiredInternalTimers =
+          LateDataUtils.getExpiredTimers(
+              timerInternals.getTimers(),
+              windowingStrategy,
+              timerInternals.currentInputWatermarkTime());
+
+      // Remove the expired timers from the timerInternals structure
+      expiredInternalTimers.forEach(timerInternals::deleteTimer);

Review comment: @bmv126 I would prefer that you add a method to LateDataUtils that drops expired timers rather than a method that just lists them.

Issue Time Tracking
---
Worklog Id: (was: 331251)
Time Spent: 3h 10m (was: 3h)
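The reviewer's suggestion above (a helper that drops expired timers instead of just listing them) can be sketched in a standalone form. This is a simplified stand-in, assuming a hypothetical Timer record and a flat allowed-lateness value in place of Beam's real TimerInternals/TimerData and WindowingStrategy types; it is not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * Hypothetical sketch of a "drop expired timers" utility: a timer is expired
 * when its window end plus the allowed lateness falls before the current
 * input watermark. Simplified stand-in for Beam internals, not the real API.
 */
public class ExpiredTimerUtils {
    /** A timer firing at the end of a fixed window, simplified. */
    record Timer(String key, long windowEndMillis) {}

    /**
     * Removes, in place, every timer whose window end + allowed lateness is
     * before the input watermark; returns the same collection for chaining.
     */
    static Collection<Timer> dropExpiredTimers(
            Collection<Timer> timers, long allowedLatenessMillis, long inputWatermarkMillis) {
        timers.removeIf(t -> t.windowEndMillis() + allowedLatenessMillis < inputWatermarkMillis);
        return timers;
    }

    public static void main(String[] args) {
        Collection<Timer> timers = new ArrayList<>(List.of(
                new Timer("string6", 60_000L),    // window already closed
                new Timer("string7", 120_000L))); // window still open
        dropExpiredTimers(timers, 0L, 61_000L);
        System.out.println(timers); // only the string7 timer remains
    }
}
```

Leaving such timers in place is what makes the runner keep re-firing for a window whose state is empty, producing the zero-count records in the report.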
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=331253=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331253 ]

ASF GitHub Bot logged work on BEAM-5690:

Author: ASF GitHub Bot
Created on: 21/Oct/19 08:03
Start Date: 21/Oct/19 08:03
Worklog Time Spent: 10m

Work Description: echauchot commented on issue #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#issuecomment-544397376

Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 331253)
Time Spent: 3.5h (was: 3h 20m)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=331255=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331255 ]

ASF GitHub Bot logged work on BEAM-5690:

Author: ASF GitHub Bot
Created on: 21/Oct/19 08:04
Start Date: 21/Oct/19 08:04
Worklog Time Spent: 10m

Work Description: echauchot commented on issue #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#issuecomment-544397900

Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 331255)
Time Spent: 3h 50m (was: 3h 40m)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=330597=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-330597 ]

ASF GitHub Bot logged work on BEAM-5690:

Author: ASF GitHub Bot
Created on: 18/Oct/19 16:13
Start Date: 18/Oct/19 16:13
Worklog Time Spent: 10m

Work Description: bmv126 commented on issue #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#issuecomment-543815179

Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 330597)
Time Spent: 3h (was: 2h 50m)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=330497&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-330497 ]

ASF GitHub Bot logged work on BEAM-5690:
Author: ASF GitHub Bot
Created on: 18/Oct/19 13:31
Start Date: 18/Oct/19 13:31
Worklog Time Spent: 10m

Work Description: bmv126 commented on issue #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#issuecomment-543747540

@echauchot I have addressed the review comment; can you have a look?

Issue Time Tracking
-------------------
Worklog Id: (was: 330497)
Time Spent: 2h 50m  (was: 2h 40m)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=330496&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-330496 ]

ASF GitHub Bot logged work on BEAM-5690:
Author: ASF GitHub Bot
Created on: 18/Oct/19 13:29
Start Date: 18/Oct/19 13:29
Worklog Time Spent: 10m

Work Description: bmv126 commented on pull request #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#discussion_r336490749

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
## @@ -338,6 +339,18 @@ public void outputWindowedValue(
     outputHolder.getWindowedValues();
     if (!outputs.isEmpty() || !stateInternals.getState().isEmpty()) {
+      Collection<TimerInternals.TimerData> filteredTimers =
+          timerInternals.getTimers().stream()
+              .filter(
+                  timer ->
+                      timer
+                          .getTimestamp()
+                          .plus(windowingStrategy.getAllowedLateness())
+                          .isBefore(timerInternals.currentInputWatermarkTime()))
+              .collect(Collectors.toList());
+
+      filteredTimers.forEach(timerInternals::deleteTimer);
+
Review comment: I have moved the code to a method in LateDataUtils.

Issue Time Tracking
-------------------
Worklog Id: (was: 330496)
Time Spent: 2h 40m  (was: 2.5h)
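The hunk above filters out timers whose timestamp plus the allowed lateness is already behind the input watermark, then deletes them. A self-contained sketch of that eviction predicate, using java.time types in place of Beam's and an invented Timer class (illustrative only, not Beam API):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TimerEviction {
  // Minimal stand-in for a Beam timer that fires at a fixed timestamp.
  static class Timer {
    final String windowId;
    final Instant timestamp;

    Timer(String windowId, Instant timestamp) {
      this.windowId = windowId;
      this.timestamp = timestamp;
    }
  }

  // A timer counts as expired once timestamp + allowedLateness lies before
  // the current input watermark; firing it again could only emit empty panes.
  static List<Timer> expiredTimers(
      List<Timer> timers, Duration allowedLateness, Instant inputWatermark) {
    return timers.stream()
        .filter(t -> t.timestamp.plus(allowedLateness).isBefore(inputWatermark))
        .collect(Collectors.toList());
  }
}
```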
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=329816&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-329816 ]

ASF GitHub Bot logged work on BEAM-5690:
Author: ASF GitHub Bot
Created on: 17/Oct/19 12:30
Start Date: 17/Oct/19 12:30
Worklog Time Spent: 10m

Work Description: echauchot commented on issue #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#issuecomment-543151890

@bmv126 fine for me, thanks for your work! Just address the nits and LGTM.

Issue Time Tracking
-------------------
Worklog Id: (was: 329816)
Time Spent: 2.5h  (was: 2h 20m)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=329814&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-329814 ]

ASF GitHub Bot logged work on BEAM-5690:
Author: ASF GitHub Bot
Created on: 17/Oct/19 12:29
Start Date: 17/Oct/19 12:29
Worklog Time Spent: 10m

Work Description: echauchot commented on pull request #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#discussion_r335973995

## File path: runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
## @@ -451,6 +457,47 @@ public void testAdvanceWatermarkEqualToPositiveInfinityThrows() {
   source.advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE);
 }

+  @Test
+  public void testInStreamingModeCountByKey() throws Exception {
+    Instant instant = new Instant(0);
+
+    CreateStream<KV<Integer, Long>> kvSource =
+        CreateStream.of(KvCoder.of(VarIntCoder.of(), VarLongCoder.of()), batchDuration())
+            .emptyBatch()
+            .advanceWatermarkForNextBatch(instant)
+            .nextBatch(
+                TimestampedValue.of(KV.of(1, 100L), instant.plus(Duration.standardSeconds(3L))),
+                TimestampedValue.of(KV.of(1, 300L), instant.plus(Duration.standardSeconds(4L))))
+            .advanceWatermarkForNextBatch(instant.plus(Duration.standardSeconds(7L)))
+            .nextBatch(
+                TimestampedValue.of(KV.of(1, 400L), instant.plus(Duration.standardSeconds(8L))))
+            .advanceNextBatchWatermarkToInfinity();
+
+    PCollection<KV<Integer, Long>> output =
+        p.apply("create kv Source", kvSource)
+            .apply(
+                "window input",
+                Window.<KV<Integer, Long>>into(FixedWindows.of(Duration.standardSeconds(3L)))
+                    .withAllowedLateness(Duration.ZERO))
+            .apply(Count.perKey());
+
+    PAssert.that("Wrong count value ", output)
+        .satisfies(
+            (SerializableFunction<Iterable<KV<Integer, Long>>, Void>)
+                input -> {
+                  for (KV<Integer, Long> element : input) {
+                    if (element.getKey() == 1) {
+                      Long countValue = element.getValue();
+                      assertNotEquals("Count Value is 0 !!!", 0L, countValue.longValue());
Review comment: indeed, the test fails if the fix is not present, so fine for me.

Issue Time Tracking
-------------------
Worklog Id: (was: 329814)
Time Spent: 2h 20m  (was: 2h 10m)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=329813&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-329813 ]

ASF GitHub Bot logged work on BEAM-5690:
Author: ASF GitHub Bot
Created on: 17/Oct/19 12:27
Start Date: 17/Oct/19 12:27
Worklog Time Spent: 10m

Work Description: echauchot commented on pull request #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#discussion_r335973497

## File path: runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
## @@ -451,6 +457,47 @@ (on the `.apply(Count.perKey());` line of testInStreamingModeCountByKey)
Review comment: that is because translation in batch mode and in streaming mode are different: in streaming it uses the SDK impl based on GBK.

Issue Time Tracking
-------------------
Worklog Id: (was: 329813)
Time Spent: 2h 10m  (was: 2h)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=325091&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-325091 ]

ASF GitHub Bot logged work on BEAM-5690:
Author: ASF GitHub Bot
Created on: 08/Oct/19 13:53
Start Date: 08/Oct/19 13:53
Worklog Time Spent: 10m

Work Description: echauchot commented on pull request #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#discussion_r332510266

## File path: runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
## @@ -451,6 +457,47 @@ (on the `assertNotEquals("Count Value is 0 !!!", ...)` line of testInStreamingModeCountByKey)
Review comment: As I understood it, expired timers are not evicted, and the fact that they are triggered entails an empty collection as output. But that is not the case 100% of the time, right, only in some corner cases? I see no corner case in this test: there should be 3 output values (one per 3s window, with timestamps 3, 4 and 8). I don't understand how this test ensures that the fix works. Is this test really failing without the fix in `SparkGroupAlsoByWindowViaWindowSet`?

Issue Time Tracking
-------------------
Worklog Id: (was: 325091)
Time Spent: 1h 40m  (was: 1.5h)
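The failure mode debated in this review thread can be shown with a toy model (illustrative plain Java, not Beam internals; the class and method names are invented): a window's state is cleared when its timer fires, so a stale timer that is never evicted and fires a second time counts an empty bag and emits the zero value seen in the bug report.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StaleTimerModel {
  // Per-window element bag; cleared when the window's timer fires.
  private final Map<String, List<Long>> stateByWindow = new HashMap<>();
  final List<Long> emittedCounts = new ArrayList<>();

  void addElement(String window, long value) {
    stateByWindow.computeIfAbsent(window, w -> new ArrayList<>()).add(value);
  }

  // Firing the window's timer emits count(*) over its state, then clears it.
  // If a stale timer for the same window fires a second time, the state is
  // already gone and a spurious zero count is emitted.
  void fireTimer(String window) {
    List<Long> bag = stateByWindow.getOrDefault(window, new ArrayList<>());
    emittedCounts.add((long) bag.size());
    stateByWindow.remove(window);
  }
}
```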
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=325078&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-325078 ]

ASF GitHub Bot logged work on BEAM-5690:
Author: ASF GitHub Bot
Created on: 08/Oct/19 13:51
Start Date: 08/Oct/19 13:51
Worklog Time Spent: 10m

Work Description: echauchot commented on pull request #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#discussion_r332478533

## File path: runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
## @@ -451,6 +457,47 @@ (on the `public void testInStreamingModeCountByKey()` line)
Review comment: First I thought that this could be a validatesRunner test, but then I remembered that Spark has its own validates-runner tests for streaming. So this place is not perfect, but it will be fine.

Issue Time Tracking
-------------------
Worklog Id: (was: 325078)
Time Spent: 1.5h  (was: 1h 20m)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=325075&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-325075 ]

ASF GitHub Bot logged work on BEAM-5690:
Author: ASF GitHub Bot
Created on: 08/Oct/19 13:51
Worklog Time Spent: 10m

Work Description: echauchot commented on pull request #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#discussion_r332510266

File path: runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java

{code:java}
  @Test
  public void testInStreamingModeCountByKey() throws Exception {
    Instant instant = new Instant(0);

    CreateStream<KV<Integer, Long>> kvSource =
        CreateStream.of(KvCoder.of(VarIntCoder.of(), VarLongCoder.of()), batchDuration())
            .emptyBatch()
            .advanceWatermarkForNextBatch(instant)
            .nextBatch(
                TimestampedValue.of(KV.of(1, 100L), instant.plus(Duration.standardSeconds(3L))),
                TimestampedValue.of(KV.of(1, 300L), instant.plus(Duration.standardSeconds(4L))))
            .advanceWatermarkForNextBatch(instant.plus(Duration.standardSeconds(7L)))
            .nextBatch(
                TimestampedValue.of(KV.of(1, 400L), instant.plus(Duration.standardSeconds(8L))))
            .advanceNextBatchWatermarkToInfinity();

    PCollection<KV<Integer, Long>> output =
        p.apply("create kv Source", kvSource)
            .apply(
                "window input",
                Window.<KV<Integer, Long>>into(FixedWindows.of(Duration.standardSeconds(3L)))
                    .withAllowedLateness(Duration.ZERO))
            .apply(Count.perKey());

    PAssert.that("Wrong count value ", output)
        .satisfies(
            (SerializableFunction<Iterable<KV<Integer, Long>>, Void>)
                input -> {
                  for (KV<Integer, Long> element : input) {
                    if (element.getKey() == 1) {
                      Long countValue = element.getValue();
                      assertNotEquals("Count Value is 0 !!!", 0L, countValue.longValue());
                    }
                  }
                  return null;
                });
{code}

Review comment: As I understood it, expired timers are not evicted, and the fact that they are triggered entails an empty collection as output. But that is not the case 100% of the time, only in some corner cases, right? I see no corner case in this test; there should be three output values (one per 3s window, with timestamps 3, 4 and 8). I don't understand how this test ensures that the fix works.

Issue Time Tracking:
Worklog Id: (was: 325075)
Time Spent: 1h 10m (was: 1h)
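The window math behind the reviewer's remark can be checked with a small standalone model of fixed-window assignment (plain millisecond arithmetic and a hypothetical class name, not Beam's FixedWindows API):

```java
// Simplified model of fixed-window assignment: an element with timestamp t
// (in millis) falls in the window starting at t - (t mod windowSize).
public class FixedWindowCheck {
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long size = 3_000L; // 3-second windows, as in the test
        // The test's element timestamps: 3s, 4s and 8s.
        System.out.println(windowStart(3_000L, size)); // 3000 -> window [3s, 6s)
        System.out.println(windowStart(4_000L, size)); // 3000 -> same window [3s, 6s)
        System.out.println(windowStart(8_000L, size)); // 6000 -> window [6s, 9s)
    }
}
```

Under this model the elements at 3s and 4s share the window [3s, 6s) and the element at 8s lands in [6s, 9s), so each fired pane for key 1 should carry a non-zero count.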
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=325077&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-325077 ]

ASF GitHub Bot logged work on BEAM-5690:
Author: ASF GitHub Bot
Created on: 08/Oct/19 13:51
Worklog Time Spent: 10m

Work Description: echauchot commented on pull request #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#discussion_r332522630

File path: runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java (on the `.apply(Count.perKey())` line of the testInStreamingModeCountByKey test quoted above)

Review comment: You're lucky :) Count is implemented in the SDK as Combine.perKey, which the current Spark runner translates as GBK + ParDo, and the GBK translation calls `SparkGroupAlsoByWindowViaWindowSet`, which contains your fix. So your fix is exercised even though you call Count and not GBK. If the runner translated Combine.perKey directly (as the new Spark runner does), you would not be testing your fix.

Issue Time Tracking:
Worklog Id: (was: 325077)
Time Spent: 1h 20m
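The expansion the review describes can be sketched with plain collections (hypothetical standalone names, not the runner's actual translation code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of why Count.perKey() exercises the GBK fix: it behaves like a
// GroupByKey followed by a ParDo-like step that counts the grouped values.
public class CountViaGroupByKey {
    // "GroupByKey" stage: gather every value emitted for a key.
    static Map<Integer, List<Long>> groupByKey(List<Map.Entry<Integer, Long>> input) {
        Map<Integer, List<Long>> grouped = new HashMap<>();
        for (Map.Entry<Integer, Long> kv : input) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        return grouped;
    }

    // "ParDo" stage: reduce each grouped iterable to its size.
    static Map<Integer, Long> countPerKey(Map<Integer, List<Long>> grouped) {
        Map<Integer, Long> counts = new HashMap<>();
        grouped.forEach((k, vs) -> counts.put(k, (long) vs.size()));
        return counts;
    }

    public static void main(String[] args) {
        // Same key/value pairs as the test: key 1 with values 100, 300, 400.
        List<Map.Entry<Integer, Long>> input = Arrays.asList(
                Map.entry(1, 100L), Map.entry(1, 300L), Map.entry(1, 400L));
        System.out.println(countPerKey(groupByKey(input))); // prints {1=3}
    }
}
```

If a runner instead translated Combine.perKey as a single combining step, this GBK path (and therefore the fix in SparkGroupAlsoByWindowViaWindowSet) would never run for such a pipeline.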
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=325076&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-325076 ]

ASF GitHub Bot logged work on BEAM-5690:
Author: ASF GitHub Bot
Created on: 08/Oct/19 13:51
Worklog Time Spent: 10m

Work Description: echauchot commented on pull request #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#discussion_r332476939

File path: runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java

{code:java}
      if (!outputs.isEmpty() || !stateInternals.getState().isEmpty()) {
        Collection<TimerInternals.TimerData> filteredTimers =
            timerInternals.getTimers().stream()
                .filter(
                    timer ->
                        timer
                            .getTimestamp()
                            .plus(windowingStrategy.getAllowedLateness())
                            .isBefore(timerInternals.currentInputWatermarkTime()))
                .collect(Collectors.toList());

        filteredTimers.forEach(timerInternals::deleteTimer);
{code}

Review comment: The logic with watermarks is good. I first thought we should put this in the core utility method `LateDataUtils.dropExpiredWindows`, but that method drops elements based on their windows and the watermark. I now think that dropping the internal timers belongs more in this (timer-dedicated) part of the code. But you should wrap this code in a method such as dropExpiredTimers, and maybe put it in a core utility such as `LateDataUtils`, because it is a common concern for all runners.

Issue Time Tracking:
Worklog Id: (was: 325076)
Time Spent: 1h 20m (was: 1h 10m)
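The suggested dropExpiredTimers extraction can be sketched as a standalone model (plain millis instead of Beam's Instant/TimerData types, hypothetical names):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Simplified model of the PR's expiry check: a timer is expired once its
// timestamp plus the allowed lateness falls strictly before the input watermark.
public class DropExpiredTimers {
    static final class Timer {
        final String windowId;
        final long timestampMillis;
        Timer(String windowId, long timestampMillis) {
            this.windowId = windowId;
            this.timestampMillis = timestampMillis;
        }
    }

    // Returns the surviving timers; the dropped ones mirror the PR's
    // filteredTimers.forEach(timerInternals::deleteTimer) step.
    static List<Timer> dropExpiredTimers(
            List<Timer> timers, long allowedLatenessMillis, long watermarkMillis) {
        return timers.stream()
                .filter(t -> t.timestampMillis + allowedLatenessMillis >= watermarkMillis)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Timer> timers = new ArrayList<>();
        timers.add(new Timer("[0s,3s)", 2_999L));
        timers.add(new Timer("[3s,6s)", 5_999L));
        timers.add(new Timer("[6s,9s)", 8_999L));

        // Watermark at 7s, zero allowed lateness: the first two timers are
        // expired and deleted, so they can no longer fire and emit empty panes.
        List<Timer> remaining = dropExpiredTimers(timers, 0L, 7_000L);
        remaining.forEach(t -> System.out.println(t.windowId)); // prints [6s,9s)
    }
}
```

Deleting rather than firing the expired timers is what prevents the zero-count panes reported in the issue, since an expired timer that still fires produces an empty value collection for its key.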
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=324908&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-324908 ]

ASF GitHub Bot logged work on BEAM-5690:
Author: ASF GitHub Bot
Created on: 08/Oct/19 07:38
Worklog Time Spent: 10m

Work Description: echauchot commented on issue #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#issuecomment-539387837

@bmv126 Currently taking a look at the watermark logic in your PR. Sorry for the delay once again.

Issue Time Tracking:
Worklog Id: (was: 324908)
Time Spent: 1h (was: 50m)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=319390&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319390 ]

ASF GitHub Bot logged work on BEAM-5690:
Author: ASF GitHub Bot
Created on: 27/Sep/19 07:59
Worklog Time Spent: 10m

Work Description: echauchot commented on issue #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#issuecomment-535835878

@bmv126 Sorry I'm very late on this; I'm not very available. I'll take a look on Monday or Tuesday.

Issue Time Tracking:
Worklog Id: (was: 319390)
Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=317385&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-317385 ]

ASF GitHub Bot logged work on BEAM-5690:
Author: ASF GitHub Bot
Created on: 24/Sep/19 11:22
Worklog Time Spent: 10m

Work Description: bmv126 commented on issue #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#issuecomment-531316233

R: @jbonofre
R: @kennknowles
R: @iemejia

Issue Time Tracking:
Worklog Id: (was: 317385)
Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=315466&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-315466 ]

ASF GitHub Bot logged work on BEAM-5690:
Author: ASF GitHub Bot
Created on: 20/Sep/19 04:27
Worklog Time Spent: 10m

Work Description: bmv126 commented on issue #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#issuecomment-533400027

@echauchot I have moved the test case to the CreateStreamTest.java file.

Issue Time Tracking:
Worklog Id: (was: 315466)
Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=312181&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-312181 ]

ASF GitHub Bot logged work on BEAM-5690:
Author: ASF GitHub Bot
Created on: 13/Sep/19 17:05
Worklog Time Spent: 10m

Work Description: bmv126 commented on issue #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567#issuecomment-531316233

R: @jbonofre
R: @kennknowles

Issue Time Tracking:
Worklog Id: (was: 312181)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner
[ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=312180&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-312180 ]

ASF GitHub Bot logged work on BEAM-5690:
Author: ASF GitHub Bot
Created on: 13/Sep/19 17:03
Worklog Time Spent: 10m

Work Description: bmv126 commented on pull request #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567

With a CountByKey transform in the Spark runner, the expired window is not removed from the timer set. This causes an empty list to be present as the value:

{code}
2019-09-12 21:46:57,384 [Executor task launch worker for task 510] TRACE org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet - SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/GroupByKey: output elements are TimestampedValueInSingleWindow{value=KV{Row:[90001], []}, timestamp=2019-09-12T16:15:59.999Z, window=[2019-09-12T16:15:00.000Z..2019-09-12T16:16:00.000Z), pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}}
{code}

This causes zero values to be emitted whenever the watermark advances for the key. The issue can also be reproduced using BeamSql (GROUP BY) with the Spark runner.

The fix checks whether a timer has crossed the window boundary plus the allowed lateness; if so, the timer is removed from SparkTimerInternals.