Hi to all,
I have a problem with Flink and Kafka queues.
I have a Producer that writes Rows into a data sink backed by a
Kafka queue, and a Consumer that reads from this sink and processes the Rows in
buckets of N elements using a custom trigger function:
    messageStream.keyBy(0)
        .windowAll(GlobalWindows.create())
        .trigger(CountWithTimeoutTrigger.of(Time.seconds(30), N))
        .apply(new RowToQuery());
The problem is that the Consumer stops consuming data once it has reached
about 1000 rows.
With N = 20 the consumer processes 50 buckets for a total of 1000 elements.
With N = 21 the consumer processes 48 buckets for a total of 1008 elements.
With N = 68 the consumer processes 15 buckets for a total of 1020
elements. And so on...
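In all three cases the consumer seems to stop right after the first full bucket that brings the total to at least 1000 rows. The small sketch below only reproduces that arithmetic. The class and method names are hypothetical, and the 1000-row threshold is an assumption inferred from the three observations above, not a known Flink or Kafka setting.

```java
public class BucketMath {
    // Smallest number of full buckets of size n whose total reaches the threshold.
    static int bucketsToReach(int threshold, int n) {
        return (threshold + n - 1) / n; // integer ceiling division
    }

    public static void main(String[] args) {
        int threshold = 1000; // assumed limit, inferred from the observations
        for (int n : new int[] {20, 21, 68}) {
            int buckets = bucketsToReach(threshold, n);
            System.out.println("N = " + n + ": " + buckets
                    + " buckets, " + (buckets * n) + " elements");
        }
    }
}
```

For N = 20, 21 and 68 this gives 50, 48 and 15 buckets (1000, 1008 and 1020 elements), matching what I observe.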
The same happens without the custom trigger function, using a
simple CountTrigger instead:
    messageStream.keyBy(0)
        .windowAll(GlobalWindows.create())
        .trigger(PurgingTrigger.of(CountTrigger.of(N)))
        .apply(new RowToQuery());
How is this possible? Are there any properties that must be set on the
Consumer in order to process more data?
Thanks,
Simone.