We are trying to set up a pipeline using BeamSql with the default trigger
(AfterWatermark, which fires when the watermark passes the end of the window).
Below is the pipeline:

   KafkaSource (KafkaIO) ---> Windowing (FixedWindow 1min) ---> BeamSql ---> KafkaSink (KafkaIO)

We are using the Spark runner for this.
The BeamSql query is:
   select Col3, count(*) as count_col1 from PCOLLECTION GROUP BY Col3

We are grouping by Col3, which is a string column. It can hold the values
string0 through string9.
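
To make the expectation concrete, here is a minimal plain-Python model (not
Beam code) of what we expect the query to produce with 1-minute fixed windows:
exactly one record per (window, Col3) pair, with a count of at least 1. The
function name `expected_counts` and the sample timestamps are made up for
illustration only.

```python
from collections import Counter
from datetime import datetime, timezone

WINDOW_SECONDS = 60  # matches the 1-minute fixed window in the pipeline


def expected_counts(records):
    """Model of `select Col3, count(*) ... GROUP BY Col3` per fixed window.

    `records` is an iterable of (event_time, col3) pairs.
    Returns a dict mapping (window_start_epoch_seconds, col3) -> count.
    """
    counts = Counter()
    for event_time, col3 in records:
        epoch = int(event_time.timestamp())
        # Align the timestamp to the start of its 1-minute window.
        window_start = epoch - epoch % WINDOW_SECONDS
        counts[(window_start, col3)] += 1
    return dict(counts)


# Hypothetical sample input, not taken from the real topic.
sample = [
    (datetime(2018, 10, 9, 9, 55, 5, tzinfo=timezone.utc), "string6"),
    (datetime(2018, 10, 9, 9, 55, 20, tzinfo=timezone.utc), "string7"),
    (datetime(2018, 10, 9, 9, 55, 40, tzinfo=timezone.utc), "string7"),
    (datetime(2018, 10, 9, 9, 56, 1, tzinfo=timezone.utc), "string6"),
]
result = expected_counts(sample)
```

In this model a key only appears in a window if at least one record with that
key arrived in it, so a count of 0 can never be produced.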

The records are emitted to the Kafka sink every minute, but the output
records in Kafka are not as expected.
Below is the output observed (WST and WET indicate the window start time and
window end time):

{"count_col1":1,"Col3":"string5","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
{"count_col1":3,"Col3":"string7","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
{"count_col1":2,"Col3":"string8","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
{"count_col1":1,"Col3":"string2","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}

*{"count_col1":1,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
{"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
{"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
{"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
{"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
{"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
{"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
{"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
{"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}*
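
For reference, a check along these lines can flag the unexpected records in
the sink output. This is only a sketch: it assumes each Kafka record is one
JSON object with the field names shown above, and the helper name
`find_anomalies` is made up. The sample lines are a subset of the output above.

```python
import json
from collections import defaultdict


def find_anomalies(lines):
    """Group output records by (WST, WET, Col3) and report problems.

    Returns a dict mapping each offending key to the list of counts seen,
    for keys emitted more than once per window or emitted with a count of 0.
    """
    seen = defaultdict(list)
    for line in lines:
        rec = json.loads(line)
        seen[(rec["WST"], rec["WET"], rec["Col3"])].append(rec["count_col1"])
    return {
        key: counts
        for key, counts in seen.items()
        if len(counts) > 1 or 0 in counts
    }


WST = "2018-10-09  09-55-00 0000  +0000"
WET = "2018-10-09  09-56-00 0000  +0000"

# Subset of the observed output: one normal key and the repeated string6 key.
observed = [
    json.dumps({"count_col1": 1, "Col3": "string5", "WST": WST, "WET": WET}),
    json.dumps({"count_col1": 1, "Col3": "string6", "WST": WST, "WET": WET}),
    json.dumps({"count_col1": 0, "Col3": "string6", "WST": WST, "WET": WET}),
    json.dumps({"count_col1": 0, "Col3": "string6", "WST": WST, "WET": WET}),
]
anomalies = find_anomalies(observed)
```

On this subset only the string6 key is reported, since it is the only one
emitted more than once for the same window.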

We ran the same pipeline using the Direct and Flink runners, and there we
don't see any entries with count_col1 equal to 0.

According to the Beam capability matrix page (
https://beam.apache.org/documentation/runners/capability-matrix/#cap-summary-what),
GroupBy is not fully supported. Is this one of those cases?

*Thanks & Regards,*

*Vishwas *
