Re: Flink Kafka consumer with low latency requirement
I posted a related observation in a separate thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-making-synchronize-call-might-choke-the-whole-pipeline-tc28383.html -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Flink Kafka consumer with low latency requirement
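import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;

// Shared by every parallel worker instance in the JVM, so both fields must be thread-safe
private static final NavigableSet<Long> sortedTid = new ConcurrentSkipListSet<>();
private static final AtomicInteger counter = new AtomicInteger();

private static void doWork(long tid) throws InterruptedException {
    sortedTid.add(tid); // no-op if this tid is already registered
    // Simulate a straggler: make the thread with the lowest tid a slow processor
    if (sortedTid.first() == tid && counter.getAndIncrement() == 1000) {
        Thread.sleep(60_000); // 60 s; Thread.sleep(60,000) would mean 60 ms + 0 ns
    }
    Thread.sleep(20);
}

Just for testing purposes, the thread with the lowest tid sleeps for 60s when the counter reaches 1000. Will 'sleep' cause any issues?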
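One detail worth checking in the original call: in Java, `Thread.sleep(60,000)` is not "sixty thousand". The comma splits it into two arguments, so it resolves to the `Thread.sleep(long millis, int nanos)` overload with 60 ms and 0 ns (`000` is an octal literal for zero). The small demo below (the class and method names are hypothetical) measures how long that call actually blocks.

```java
import java.util.concurrent.TimeUnit;

class SleepOverloadDemo {

    /** Measures how long Thread.sleep(millis, nanos) blocks, in milliseconds. */
    static long measureSleep(long millis, int nanos) {
        long start = System.nanoTime();
        try {
            Thread.sleep(millis, nanos);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // "60,000" is two arguments: millis = 60 and nanos = 000 (octal literal, i.e. 0)
        System.out.println("sleep(60,000) blocked for about " + measureSleep(60,000) + " ms");
        // Underscores in numeric literals (Java 7+) express the intended 60 s unambiguously
        System.out.println("60 s in ms: " + TimeUnit.SECONDS.toMillis(60));
    }
}
```

Writing the literal as `60_000` (or `TimeUnit.SECONDS.toMillis(60)`) avoids the accidental two-argument call.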
Re: Flink Kafka consumer with low latency requirement
Fabian,

Thank you for replying. If I understand your previous comment correctly, I set up a consumer with parallelism 1 and connected it to a worker task with parallelism 2. If worker thread one makes a blocking call and is stuck for 60s, the consumer thread should keep fetching from the partition and feeding thread two.

From my reading of the Flink documentation, if checkpointing is enabled, the consumer commits the offsets stored in its internal state back to Kafka so that external monitoring tools can see its progress. If that's the case, then during the 60s when thread one is stuck, checkpoints should all succeed and thread two should keep chugging along merrily, even though the highest committed offset would be the one just below the offset held by thread one. After the 60s, I should see a huge jump in the monitoring tool, because thread one has released its offset and all offsets consumed by thread two during those 60s can be committed.

However, what I observed is that as soon as thread one gets stuck, checkpointing is choked, the consumer thread stops feeding thread two, and the whole pipeline becomes stagnant. Could you please help me understand this behavior?

Thanks again.
Ben
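A minimal plain-Java model of why one stuck worker can stall the feeding of the other, assuming that `rebalance()` hands records from the single consumer subtask to the two worker subtasks round-robin through bounded buffers. This is a simplified sketch, not Flink's network stack: the buffer capacity and the class and method names are hypothetical, and checkpoint barriers are not modeled. Under that assumption, once thread one stops draining its buffer, the single-threaded source eventually cannot place its next record for thread one and blocks, so thread two stops receiving data as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Simplified, hypothetical model of round-robin distribution over bounded buffers.
class RoundRobinStall {

    /**
     * One source emits records round-robin to two workers through bounded buffers.
     * Thread one (index 0) is stuck and never drains; thread two (index 1) keeps up.
     * Returns the records thread two received before the source could not hand off a record.
     */
    static List<Integer> recordsReachingThreadTwo(int bufferCapacity, int totalRecords) {
        List<BlockingQueue<Integer>> buffers = new ArrayList<>();
        buffers.add(new ArrayBlockingQueue<Integer>(bufferCapacity)); // thread one: stuck
        buffers.add(new ArrayBlockingQueue<Integer>(bufferCapacity)); // thread two: healthy
        List<Integer> processedByThreadTwo = new ArrayList<>();

        for (int record = 0; record < totalRecords; record++) {
            int target = record % buffers.size(); // round-robin distribution
            if (!buffers.get(target).offer(record)) {
                // A real single-threaded source would block here (as with put()), so it
                // could not move on to feed thread two either; the model simply stops.
                break;
            }
            // Thread two processes everything in its buffer immediately.
            Integer next;
            while ((next = buffers.get(1).poll()) != null) {
                processedByThreadTwo.add(next);
            }
        }
        return processedByThreadTwo;
    }

    public static void main(String[] args) {
        List<Integer> seen = recordsReachingThreadTwo(4, 1_000);
        System.out.println("thread two processed " + seen + " before the source stalled");
    }
}
```

With a capacity of 4, thread two receives only records 1, 3, 5 and 7 before the source stops: roughly as many records as fit in thread one's buffer. The exact count depends on the buffer size.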
Re: Does Flink Kafka connector has max_pending_offsets concept?
Thanks Fabian. This is really helpful.
Re: Does Flink Kafka connector has max_pending_offsets concept?
Elias,

Thanks for your reply. In this case, when the number of Kafka consumers equals the number of partitions and I use setParallelism(>1), with something like 'messageStream.rebalance().map(lambda).setParallelism(3).print()', I assume that, if checkpointing is enabled, Flink will commit the offsets in the 'right order' during a checkpoint. For example, suppose a batch of offsets consists of (1, 2, 3, 4, 5) and there are three worker threads (setParallelism(3)):

thread 1 -> 1 [stuck in a synchronous call]
thread 2 -> 2, 3 [success]
thread 3 -> 4, 5 [success]

Will Flink commit 5? I just want to make sure that Flink manages the pending offsets correctly, so that no data is lost if the above code is used in production.

Thanks again!
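The "right order" asked about above can be modeled as committing only the highest offset below which every record has finished. The sketch below is a hypothetical plain-Java tracker (the class and method names are made up for illustration); it is not how the Flink Kafka connector works internally, since Flink ties offset commits to completed checkpoints rather than to per-record acknowledgements. It only illustrates the ordering rule the question describes.

```java
import java.util.TreeSet;

// Hypothetical illustration of "commit the highest contiguous finished offset".
class ContiguousOffsetTracker {
    private final TreeSet<Long> finishedAhead = new TreeSet<>(); // done, but behind a gap
    private long committable; // highest offset with no unfinished offset at or below it

    ContiguousOffsetTracker(long firstOffset) {
        this.committable = firstOffset - 1; // nothing finished yet
    }

    void completed(long offset) {
        finishedAhead.add(offset);
        // Advance past every finished offset that is now contiguous with the committed prefix.
        while (!finishedAhead.isEmpty() && finishedAhead.first() == committable + 1) {
            committable = finishedAhead.pollFirst();
        }
    }

    long committable() {
        return committable;
    }

    public static void main(String[] args) {
        ContiguousOffsetTracker tracker = new ContiguousOffsetTracker(1);
        for (long offset = 2; offset <= 5; offset++) {
            tracker.completed(offset); // threads 2 and 3 finish offsets 2..5
        }
        System.out.println("while offset 1 is stuck: " + tracker.committable());
        tracker.completed(1); // thread 1 finally returns
        System.out.println("after offset 1 finishes: " + tracker.committable());
    }
}
```

With the example from the question, nothing past the starting point becomes committable while offset 1 is stuck; once it finishes, the committable offset jumps straight to 5.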