Hi to all,

I have a problem with Flink and Kafka queues.

I have a Producer that puts some Rows into a data sink, represented by a Kafka queue, and a Consumer that reads from this sink and processes the Rows in buckets of N elements using a custom trigger function:

    messageStream.keyBy(0)
            .windowAll(GlobalWindows.create())
            .trigger(CountWithTimeoutTrigger.of(Time.seconds(30), N))
            .apply(new RowToQuery());

The problem is that the Consumer stops consuming data once it has reached about 1000 rows.
With N = 20 the consumer processes 50 buckets, for a total of 1000 elements.
With N = 21 the consumer processes 48 buckets, for a total of 1008 elements.
With N = 68 the consumer processes 15 buckets, for a total of 1020 elements. And so on...
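
In all three cases the consumer seems to stop right after the first bucket that brings the total to 1000 rows or more. Here is a small sketch of that arithmetic. The cutoff of exactly 1000 is only my guess from these three runs, and the class and method names are made up for illustration:

```java
public class BucketCheck {
    // Assumption: the consumer stops once roughly this many rows have been read.
    static final int ASSUMED_LIMIT = 1000;

    // Smallest number of full buckets of size n whose total reaches the assumed limit
    // (integer ceiling division).
    static int buckets(int n) {
        return (ASSUMED_LIMIT + n - 1) / n;
    }

    // Total number of rows processed after that last bucket has fired.
    static int rows(int n) {
        return buckets(n) * n;
    }

    public static void main(String[] args) {
        // The three bucket sizes from my tests.
        for (int n : new int[] {20, 21, 68}) {
            System.out.println("N = " + n + ": " + buckets(n) + " buckets, " + rows(n) + " rows");
        }
    }
}
```

The values it prints match the bucket and row counts I observed for N = 20, 21 and 68.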

The same thing happens without the custom trigger function, using a plain CountTrigger instead:

    messageStream.keyBy(0)
            .windowAll(GlobalWindows.create())
            .trigger(PurgingTrigger.of(CountTrigger.of(N)))
            .apply(new RowToQuery());

How is this possible? Are there any properties that need to be set on the Consumer in order to process more data?

Thanks,

Simone.
