Code Snippet Without continuous polling
==================================

public class OfferItemImageScheduler {

    @Scheduled(cron = "0 0/2 * * * ?")
    void startStreaming() {
        kafkaConsumer.resume(kafkaConsumer.assignment());
        offerItemImageConsumer.streamMessages(kafkaConsumer);
        kafkaConsumer.pause(kafkaConsumer.assignment());
    }
}

=========================

public class OfferItemImageConsumer {

    public boolean streamMessages(KafkaConsumer<String, String> kafkaConsumer) {
        try {
            do {
                ConsumerRecords<String, String> records =
                        kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
                log.info("Kafka poll returned {} records", records.count());
                checkEmptyFetch(records);
                processRecords(records);
            } while (!consumerPaused.get() && !consumerLoopClosed.get());
        } catch (WakeupException wakeupException) {
            // do nothing if wakeupException is from shutdown hook
            if (!consumerLoopClosed.get()) {
                handleConsumerLoopException(wakeupException);
            }
        } catch (RuntimeException ex) {
            handleConsumerLoopException(ex);
        } finally {
            resetConsumerStatus();
        }
        return true;
    }
}

On Thu, Oct 25, 2018 at 6:11 PM pradeep s <sreekumar.prad...@gmail.com>
wrote:

> Hi Manoj/Matthias,
> My requirement is to run the consumer once daily, stream the messages,
> and pause when I encounter a few empty fetches.
> I am planning to run two consumers and pause consumption based on the
> empty fetches, for a topic with 4 partitions.
> To avoid the consumer multi-threaded access issue, I run the consumer,
> exit the poll loop, and call pause on the same thread. In this case, I
> will not be polling continuously.
> When the next schedule kicks in, I will resume the polling.
> Will the consumer resume call cause issues, since the scheduled loop is
> triggered a long time after the polling stopped? (Or is the old approach
> of continuous polling the correct one?)
> Also, Manoj, can you please explain the rebalance scenario: if the
> consumer is paused for two partitions and gets the assignment for another
> two partitions (because of a pod termination), how can I pause the
> consumption if it is not the scheduled time to process the records?
> Thanks
> Pradeep
>
> On Thu, Oct 25, 2018 at 5:48 PM Manoj Khangaonkar <khangaon...@gmail.com>
> wrote:
>
>> One item to be aware of with pause and resume is that it applies to
>> partitions currently assigned to the consumer.
>>
>> But partitions can get revoked, or additional partitions can get
>> assigned to the consumer.
>>
>> With reassignment, you might be expecting the consumer to be paused but
>> suddenly start getting messages because a new partition got assigned.
>>
>> Use the RebalanceListener to pause or resume any new partitions.
>>
>> regards
>>
>> On Thu, Oct 25, 2018 at 3:15 PM Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>> > That is correct: clients are not thread safe.
>> >
>> > You can use an `AtomicBoolean needToResume` that you share over both
>> > threads and that is initially false.
>> >
>> > In your scheduled method, you set the variable to true.
>> >
>> > In your main consumer, each time before you call poll(), you check if
>> > the variable is set to true. If yes, you resume() and reset the
>> > variable to false.
>> >
>> > Hope this helps.
>> >
>> > -Matthias
>> >
>> >
>> > On 10/25/18 2:09 PM, pradeep s wrote:
>> > > Thanks Matthias. I am facing the issue when I try to call resume
>> > > from the scheduled method.
>> > > I was getting an exception that KafkaConsumer is not safe for
>> > > multi-threaded access. I am trying to see how I can call pause and
>> > > resume on the same thread. There will be only one thread running
>> > > for consumption.
>> > >
>> > >
>> > > On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax <matth...@confluent.io>
>> > > wrote:
>> > >
>> > >> There is no issue if you call `poll()` when all partitions are
>> > >> paused. In fact, if you want to make sure that the consumer does
>> > >> not fall out of the consumer group, you must call `poll()` at
>> > >> regular intervals to not hit the `max.poll.interval.ms` timeout.
>> > >>
>> > >>
>> > >> -Matthias
>> > >>
>> > >> On 10/24/18 10:25 AM, pradeep s wrote:
>> > >>> Pause and resume is required since I am running a pod in
>> > >>> Kubernetes and I am not shutting down the app.
>> > >>>
>> > >>> On Tue, Oct 23, 2018 at 10:33 PM pradeep s <sreekumar.prad...@gmail.com>
>> > >>> wrote:
>> > >>>
>> > >>>> Hi,
>> > >>>> I have a requirement to have Kafka streaming start at a scheduled
>> > >>>> time and then pause the stream when the consumer poll returns
>> > >>>> empty fetches for 3 or more polls.
>> > >>>>
>> > >>>> I am starting a consumer poll loop during application startup
>> > >>>> using a single-thread executor and then pausing the consumer when
>> > >>>> the poll returns empty for 3 polls.
>> > >>>>
>> > >>>> When the schedule kicks in, I am calling consumer.resume.
>> > >>>>
>> > >>>> Is this approach correct?
>> > >>>> Will it cause any issue if poll is called on a paused consumer?
>> > >>>>
>> > >>>> Skeleton Code
>> > >>>> ============
>> > >>>>
>> > >>>> public class OfferItemImageConsumer implements Runnable {
>> > >>>>
>> > >>>>     @Override
>> > >>>>     public void run() {
>> > >>>>         try {
>> > >>>>             do {
>> > >>>>                 ConsumerRecords<String, String> records =
>> > >>>>                         kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
>> > >>>>                 writeAndPauseEmptyFetch(records);
>> > >>>>                 processRecords(records);
>> > >>>>             } while (!consumerLoopClosed.get());
>> > >>>>         } catch (RuntimeException ex) {
>> > >>>>             handleConsumerLoopException(ex);
>> > >>>>         } finally {
>> > >>>>             kafkaConsumer.close();
>> > >>>>         }
>> > >>>>     }
>> > >>>>
>> > >>>>     private void writeAndPauseEmptyFetch(ConsumerRecords<String, String> records) {
>> > >>>>         if (records.isEmpty()) {
>> > >>>>             emptyFetchCount++;
>> > >>>>         }
>> > >>>>         if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
>> > >>>>             writeImageData();
>> > >>>>             emptyFetchCount = 0;
>> > >>>>             kafkaConsumer.pause(kafkaConsumer.assignment());
>> > >>>>             consumerPaused = true;
>> > >>>>         }
>> > >>>>     }
>> > >>>>
>> > >>>> }
>> > >>>>
>> > >>>> =================================
>> > >>>>
>> > >>>> public class ItemImageStreamScheduler {
>> > >>>>     private static final int TERMINATION_TIMEOUT = 10;
>> > >>>>
>> > >>>>     private ExecutorService executorService =
>> > >>>>             Executors.newSingleThreadExecutor();
>> > >>>>
>> > >>>>     private final OfferItemImageConsumer offerItemImageConsumer;
>> > >>>>     private final ItemImageStreamConfig itemImageStreamConfig;
>> > >>>>     private final KafkaConsumer<String, String> kafkaConsumer;
>> > >>>>
>> > >>>>     @EventListener(ApplicationReadyEvent.class)
>> > >>>>     void startStreaming() {
>> > >>>>         executorService.submit(offerItemImageConsumer);
>> > >>>>     }
>> > >>>>
>> > >>>>     @Scheduled
>> > >>>>     void resumeStreaming() {
>> > >>>>         kafkaConsumer.resume(kafkaConsumer.assignment());
>> > >>>>     }
>> > >>>>
>> > >>>> }
>> > >>>>
>> > >>>> Thanks
>> > >>>>
>> > >>>> Pradeep
>>
>> --
>> http://khangaonkar.blogspot.com/
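
For reference, here is a minimal sketch of the `AtomicBoolean needToResume` idea from Matthias's reply, combined with his note that poll() should keep being called while the consumer is paused. It does not use the Kafka client: `PausableSource` and `FakeSource` are hypothetical stand-ins for the consumer's poll/pause/resume calls so that the example runs without a broker, and `ResumeFlagLoop`, `requestResume`, and `pollOnce` are illustrative names, not code from this thread. The scheduler side only sets the flag; the polling side is the only one that touches the source. Counting consecutive empty fetches (resetting on a non-empty one) is also an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the poll/pause/resume calls discussed in the
// thread. This is NOT the Kafka client API.
interface PausableSource {
    List<String> poll();   // returns an empty list while paused
    void pause();
    void resume();
}

// In-memory fake so the sketch runs without a broker (assumption).
final class FakeSource implements PausableSource {
    private final Queue<String> backlog = new ArrayDeque<>();
    private boolean paused;

    void add(String record) { backlog.add(record); }

    @Override public List<String> poll() {
        List<String> batch = new ArrayList<>();
        if (!paused) {
            while (!backlog.isEmpty()) batch.add(backlog.poll());
        }
        return batch;
    }
    @Override public void pause() { paused = true; }
    @Override public void resume() { paused = false; }
}

final class ResumeFlagLoop {
    static final int EMPTY_FETCH_THRESHOLD = 3;

    // Set by the scheduler thread; cleared only by the polling thread.
    final AtomicBoolean needToResume = new AtomicBoolean(false);
    final List<String> processed = new ArrayList<>();

    private final PausableSource source;
    private int emptyFetchCount;
    private boolean paused;

    ResumeFlagLoop(PausableSource source) { this.source = source; }

    // What the scheduled method would call: touches the flag, never the source.
    void requestResume() { needToResume.set(true); }

    // One iteration of the poll loop; intended to run on the single consumer thread.
    void pollOnce() {
        if (needToResume.compareAndSet(true, false)) {
            source.resume();
            paused = false;
            emptyFetchCount = 0;
        }
        List<String> records = source.poll(); // keep polling even while paused
        if (paused) return;
        if (records.isEmpty()) {
            emptyFetchCount++;
        } else {
            emptyFetchCount = 0;
            processed.addAll(records);
        }
        if (emptyFetchCount >= EMPTY_FETCH_THRESHOLD) {
            source.pause();
            paused = true;
        }
    }

    boolean isPaused() { return paused; }
}

final class ResumeFlagDemo {
    static List<String> run() {
        FakeSource source = new FakeSource();
        ResumeFlagLoop loop = new ResumeFlagLoop(source);
        source.add("a");
        loop.pollOnce();                              // processes "a"
        for (int i = 0; i < 3; i++) loop.pollOnce();  // three empty fetches, then pause
        source.add("b");
        loop.pollOnce();                              // paused: "b" stays in the backlog
        loop.requestResume();                         // the scheduler's only action
        loop.pollOnce();                              // resumes and processes "b"
        return loop.processed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a, b]
    }
}
```

In a real application, requestResume() would roughly correspond to the body of the @Scheduled method and pollOnce() to the body of the do/while loop in run(). Partitions newly assigned after a rebalance would still need the listener handling that Manoj mentions.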