Re: Consumer Pause & Scheduled Resume

2018-10-26 Thread Manoj Khangaonkar
Hi Pradeep, The poll, pause, and resume calls need to happen in the same thread, in the same while loop. If a scheduler is the trigger for pause or resume, do not call pause/resume from the scheduler thread. Instead, set a variable in the class that has the poll loop. The poll loop can check the

Re: Consumer Pause & Scheduled Resume

2018-10-25 Thread pradeep s
Code snippet without continuous polling:

public class OfferItemImageScheduler {
    @Scheduled(cron = "0 0/2 * * * ?")
    void startStreaming() {
        kafkaConsumer.resume(kafkaConsumer.assignment());
        offerItemImageConsumer.streamMessages(kafkaConsumer);

Re: Consumer Pause & Scheduled Resume

2018-10-25 Thread pradeep s
Hi Manoj/Matthias, My requirement is to run the consumer once daily, stream the messages, and pause when I encounter a few empty fetches. I am planning to run two consumers and pause consumption based on the empty fetches for a topic with 4 partitions. To avoid the consumer

Re: Consumer Pause & Scheduled Resume

2018-10-25 Thread Manoj Khangaonkar
One item to be aware of with pause and resume is that they apply only to the partitions currently assigned to the consumer. But partitions can get revoked, or additional partitions can get assigned to the consumer. After a reassignment, you might be expecting the consumer to be paused but suddenly start getting
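
One way to guard against this is to remember the pause request separately from the partition set and re-apply it whenever new partitions arrive. The following is only a minimal sketch of that bookkeeping: `PauseState` and its methods are hypothetical names, partitions are represented as plain strings, and no Kafka classes are used. With the real client, the same logic would presumably live in a `ConsumerRebalanceListener` and call `pause()` on the newly assigned partitions.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of the Kafka API: it remembers whether the
// application wants consumption paused, so that the request can be re-applied
// to partitions that arrive in a later rebalance.
class PauseState {
    private boolean pauseRequested = false;
    private final Set<String> pausedPartitions = new HashSet<>();

    // Pause everything currently assigned and remember the request.
    void pauseAll(Collection<String> assigned) {
        pauseRequested = true;
        pausedPartitions.addAll(assigned);
    }

    // Resume everything and forget the request.
    void resumeAll() {
        pauseRequested = false;
        pausedPartitions.clear();
    }

    // Call after a rebalance hands over new partitions: if a pause is still
    // requested, the new partitions are paused as well.
    void onPartitionsAssigned(Collection<String> newlyAssigned) {
        if (pauseRequested) {
            pausedPartitions.addAll(newlyAssigned);
        }
    }

    // Call when partitions are taken away: stop tracking them.
    void onPartitionsRevoked(Collection<String> revoked) {
        pausedPartitions.removeAll(revoked);
    }

    boolean isPaused(String partition) {
        return pausedPartitions.contains(partition);
    }
}
```

The point of the design is that the "paused" decision belongs to the application, not to a particular set of partitions, so it survives a reassignment.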

Re: Consumer Pause & Scheduled Resume

2018-10-25 Thread Matthias J. Sax
That is correct: clients are not thread safe. You can use an `AtomicBoolean needToResume` that you share between both threads and that is initially false. In your scheduled method, you set the variable to true. In your main consumer, each time before you call poll(), you check if the variable is
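
The flag handoff described here can be sketched roughly as follows. This is an illustration under stated assumptions, not the poster's code: `PausableConsumer`, `PollLoop`, and `FakeConsumer` are hypothetical names, and the interface is a stand-in for the real consumer so that the example runs without the Kafka client on the classpath. Only the name `needToResume` comes from the message above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the consumer calls used here; NOT the Kafka API.
interface PausableConsumer {
    int poll();      // number of records fetched; zero while paused
    void pause();
    void resume();
}

// Owns the consumer. Only the thread running the loop ever touches it.
class PollLoop {
    private final PausableConsumer consumer;
    // Shared flag: the scheduler thread sets it, the poll thread clears it.
    private final AtomicBoolean needToResume = new AtomicBoolean(false);

    PollLoop(PausableConsumer consumer) {
        this.consumer = consumer;
    }

    // Safe to call from any thread (for example a scheduled method),
    // because it never touches the consumer itself.
    void requestResume() {
        needToResume.set(true);
    }

    // One iteration of the loop body; call it only from the poll thread.
    int pollOnce() {
        // Check the flag before every poll and reset it atomically.
        if (needToResume.compareAndSet(true, false)) {
            consumer.resume();
        }
        // Poll even while paused, so the loop keeps running at a steady pace.
        return consumer.poll();
    }
}

// In-memory fake used only to exercise the sketch.
class FakeConsumer implements PausableConsumer {
    boolean paused = true;
    int resumeCalls = 0;

    public int poll() { return paused ? 0 : 5; }
    public void pause() { paused = true; }
    public void resume() { paused = false; resumeCalls++; }
}
```

In this arrangement the scheduled method would call only `requestResume()`, while a single dedicated thread calls `pollOnce()` in its while loop, so the consumer is never accessed from two threads.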

Re: Consumer Pause & Scheduled Resume

2018-10-25 Thread pradeep s
Thanks Matthias. I am facing the issue when I try to call resume from the scheduled method. I was getting an exception saying that KafkaConsumer is not safe for multi-threaded access. I am trying to see how I can call pause and resume on the same thread. There will be only one thread running for

Re: Consumer Pause & Scheduled Resume

2018-10-24 Thread Matthias J. Sax
There is no issue if you call `poll()` if all partitions are paused. In fact, if you want to make sure that the consumer does not fall out of the consumer group, you must call `poll()` at regular intervals so as not to hit the `max.poll.interval.ms` timeout. -Matthias On 10/24/18 10:25 AM, pradeep s

Re: Consumer Pause & Scheduled Resume

2018-10-24 Thread pradeep s
Pause and resume are required since I am running a pod in Kubernetes and I am not shutting down the app. On Tue, Oct 23, 2018 at 10:33 PM pradeep s wrote: > Hi, > I have a requirement to have kafka streaming start at scheduled time and > then pause the stream when the consumer poll returns empty

Consumer Pause & Scheduled Resume

2018-10-23 Thread pradeep s
Hi, I have a requirement to have Kafka streaming start at a scheduled time and then pause the stream when the consumer poll returns empty fetches for 3 or more polls. I am starting a consumer poll loop during application startup using a single-thread executor and then pausing the consumer when the
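
The "pause after 3 or more empty polls" rule can be kept separate from the consumer itself. The sketch below shows one possible way to count consecutive empty fetches. `EmptyPollTracker` is a hypothetical helper, the threshold is passed in by the caller, and whether the count should reset on a non-empty poll is an assumption, since the original message does not say.

```java
// Hypothetical helper: counts consecutive empty polls and reports when the
// configured threshold has been reached.
class EmptyPollTracker {
    private final int threshold;
    private int consecutiveEmpty = 0;

    EmptyPollTracker(int threshold) {
        this.threshold = threshold;
    }

    // Call once per poll with the number of records that poll returned.
    // Returns true when the caller should pause the consumer.
    boolean shouldPause(int recordCount) {
        if (recordCount > 0) {
            // Assumption: any non-empty fetch restarts the count.
            consecutiveEmpty = 0;
            return false;
        }
        consecutiveEmpty++;
        return consecutiveEmpty >= threshold;
    }

    // Call after resuming, so that the next run starts from zero.
    void reset() {
        consecutiveEmpty = 0;
    }
}
```

The poll loop would call `shouldPause(records.count())` after each poll and pause the assigned partitions on the same thread when it returns true.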