Code snippet without continuous polling
==================================
public class OfferItemImageScheduler {

    @Scheduled(cron = "0 0/2 * * * ?")
    void startStreaming() {
        kafkaConsumer.resume(kafkaConsumer.assignment());
        offerItemImageConsumer.streamMessages(kafkaConsumer);
        kafkaConsumer.pause(kafkaConsumer.assignment());
    }
}

=========================

public class OfferItemImageConsumer {

    public boolean streamMessages(KafkaConsumer<String, String> kafkaConsumer) {
        try {
            do {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
                log.info("Kafka poll returned {} records", records.count());
                checkEmptyFetch(records);
                processRecords(records);
            } while (!consumerPaused.get() && !consumerLoopClosed.get());
        } catch (WakeupException wakeupException) {
            // do nothing if the WakeupException came from the shutdown hook
            if (!consumerLoopClosed.get()) {
                handleConsumerLoopException(wakeupException);
            }
        } catch (RuntimeException ex) {
            handleConsumerLoopException(ex);
        } finally {
            resetConsumerStatus();
        }
        return true;
    }
}


On Thu, Oct 25, 2018 at 6:11 PM pradeep s <sreekumar.prad...@gmail.com>
wrote:

> Hi Manoj/Matthias,
> My requirement is to run the consumer once daily, stream the messages,
> and pause when I encounter a few empty fetches.
> I am planning to run two consumers and pause consumption based on
> empty fetches, for a topic with 4 partitions.
> To avoid the consumer multi-threaded access issue, I run the consumer,
> exit the poll loop, and call pause on the same thread. In this case, I
> will not be continuously polling.
> When the next schedule kicks in, I will resume polling.
> Will the consumer resume call cause issues, since the scheduled loop is
> triggered a long time after polling stopped? (Or is the old approach of
> continuous polling the correct one?)
> Also, Manoj, can you please explain the rebalance scenario: if the
> consumer is paused for two partitions and gets the assignment for another
> two partitions (because of a pod termination), how can I pause
> consumption if it is not the scheduled time to process the records?
> Thanks
> Pradeep
>
> On Thu, Oct 25, 2018 at 5:48 PM Manoj Khangaonkar <khangaon...@gmail.com>
> wrote:
>
>> One item to be aware of with pause and resume is that they apply to the
>> partitions currently assigned to the consumer.
>>
>> But partitions can get revoked, or additional partitions can get
>> assigned to the consumer.
>>
>> After a reassignment, you might expect the consumer to be paused but
>> suddenly start getting messages because a new partition got assigned.
>>
>> Use a ConsumerRebalanceListener to pause or resume any new partitions.
>>
>> regards
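Manoj's suggestion above can be sketched as follows. This is an illustrative stand-in, not real kafka-clients code: the "partitions" are plain ints and the `pausedPartitions` set stands in for `consumer.pause(...)`, so the sketch compiles without the Kafka dependency, but the callback names mirror `ConsumerRebalanceListener`.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative stand-in for the ConsumerRebalanceListener idea: when a
// rebalance hands us new partitions outside the scheduled window, pause
// them immediately so they do not start delivering records.
public class PauseOnRebalanceSketch {

    // True while the consumer is supposed to be in its paused window.
    final AtomicBoolean shouldBePaused = new AtomicBoolean(true);

    // Stand-in for the set of partitions we have called pause() on.
    final Set<Integer> pausedPartitions = new HashSet<>();

    // Kafka invokes the real onPartitionsAssigned after a rebalance.
    void onPartitionsAssigned(Set<Integer> assigned) {
        if (shouldBePaused.get()) {
            pausedPartitions.addAll(assigned);   // stand-in for consumer.pause(assigned)
        }
    }

    // Revoked partitions are no longer ours to pause or resume.
    void onPartitionsRevoked(Set<Integer> revoked) {
        pausedPartitions.removeAll(revoked);
    }

    public static void main(String[] args) {
        PauseOnRebalanceSketch listener = new PauseOnRebalanceSketch();
        listener.onPartitionsAssigned(Set.of(2, 3)); // rebalance during paused window
        System.out.println("paused partitions: " + listener.pausedPartitions.size());
        // prints: paused partitions: 2
    }
}
```

In the real listener you would call `kafkaConsumer.pause(assigned)` inside `onPartitionsAssigned`, guarded by the same "am I in the paused window" flag the scheduler controls; the listener runs on the polling thread, so this stays single-threaded.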
>>
>> On Thu, Oct 25, 2018 at 3:15 PM Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>> > That is correct: clients are not thread safe.
>> >
>> > You can use an `AtomicBoolean needToResume` that you share between
>> > both threads and that is initially false.
>> >
>> > In your scheduled method, you set the variable to true.
>> >
>> > In your main consumer, each time before you call poll(), you check if
>> > the variable is set to true. If yes, you resume() and reset the variable
>> > to false.
>> >
>> > Hope this helps.
>> >
>> > -Matthias
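Matthias's flag-based handoff could be sketched like this. It is a minimal sketch of the coordination only: `resume()` and `poll()` here are plain stand-ins (a flag and a counter), where the real code would call `kafkaConsumer.resume(kafkaConsumer.assignment())` and `kafkaConsumer.poll(...)` on the one consumer thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the coordination Matthias describes: the scheduled thread only
// flips a shared flag; the single consumer thread is the only one that ever
// touches the (non-thread-safe) consumer.
public class ResumeFlagSketch {

    // Shared between the scheduler thread and the consumer thread.
    final AtomicBoolean needToResume = new AtomicBoolean(false);

    // State owned exclusively by the consumer thread.
    boolean consumerPaused = true;
    int pollCount = 0;

    // Stand-ins for kafkaConsumer.resume(assignment()) and kafkaConsumer.poll(timeout).
    void resume() { consumerPaused = false; }
    void poll()   { pollCount++; }

    void runOnce() {
        // "Scheduled" method: sets the flag, never touches the consumer.
        Thread scheduler = new Thread(() -> needToResume.set(true));

        // Consumer loop: before every poll, check the flag; if set, resume
        // on this same thread and clear it. Polling continues even while
        // paused, so max.poll.interval.ms is never exceeded.
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                if (needToResume.compareAndSet(true, false)) {
                    resume();
                }
                poll();
            }
        });

        try {
            scheduler.start();
            scheduler.join();   // flag is set before the consumer starts,
            consumer.start();   // so this sketch's outcome is deterministic
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ResumeFlagSketch sketch = new ResumeFlagSketch();
        sketch.runOnce();
        System.out.println("paused=" + sketch.consumerPaused + " polls=" + sketch.pollCount);
        // prints: paused=false polls=5
    }
}
```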
>> >
>> >
>> > On 10/25/18 2:09 PM, pradeep s wrote:
>> > > Thanks Matthias. I am facing the issue when I try to call resume
>> > > from the scheduled method.
>> > > I was getting an exception that the KafkaConsumer is not safe for
>> > > multi-threaded access. I am trying to see how I can call pause and
>> > > resume on the same thread. There will be only one thread running for
>> > > consumption.
>> > >
>> > >
>> > > On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax <
>> matth...@confluent.io>
>> > > wrote:
>> > >
>> > >> There is no issue if you call `poll()` when all partitions are
>> > >> paused. In fact, if you want to make sure that the consumer does not
>> > >> fall out of the consumer group, you must call `poll()` at regular
>> > >> intervals to not hit the `max.poll.interval.ms` timeout.
>> > >>
>> > >>
>> > >> -Matthias
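That point can be illustrated with a toy consumer. This is a hedged stand-in, not the kafka-clients API: a paused poll simply comes back empty, while the regular call is what keeps the loop inside `max.poll.interval.ms`.

```java
import java.util.Collections;
import java.util.List;

// Toy consumer: poll() is still invoked on every loop iteration even while
// "paused"; a paused poll just yields no records, and calling it regularly
// is what prevents exceeding max.poll.interval.ms in the real client.
public class PausedPollSketch {

    boolean paused = true;
    int pollsWhilePaused = 0;

    // Stand-in for records waiting on the topic once the consumer resumes.
    final List<String> backlog = List.of("r1", "r2");

    List<String> poll() {
        if (paused) {
            pollsWhilePaused++;              // the call still happened on schedule
            return Collections.emptyList();  // paused partitions fetch nothing
        }
        return backlog;
    }

    public static void main(String[] args) {
        PausedPollSketch consumer = new PausedPollSketch();
        consumer.poll();
        consumer.poll();
        consumer.poll();          // all empty while paused
        consumer.paused = false;  // scheduler resumed us
        System.out.println("pollsWhilePaused=" + consumer.pollsWhilePaused
                + " afterResume=" + consumer.poll().size());
        // prints: pollsWhilePaused=3 afterResume=2
    }
}
```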
>> > >>
>> > >> On 10/24/18 10:25 AM, pradeep s wrote:
>> > >>> Pause and resume are required since I am running a pod in
>> > >>> Kubernetes and I am not shutting down the app.
>> > >>>
>> > >>> On Tue, Oct 23, 2018 at 10:33 PM pradeep s <
>> > sreekumar.prad...@gmail.com>
>> > >>> wrote:
>> > >>>
>> > >>>> Hi,
>> > >>>> I have a requirement to have Kafka streaming start at a scheduled
>> > >>>> time and then pause the stream when the consumer poll returns
>> > >>>> empty fetches for 3 or more polls.
>> > >>>>
>> > >>>> I am starting a consumer poll loop during application startup
>> > >>>> using a single-thread executor and then pausing the consumer when
>> > >>>> the poll returns empty for 3 polls.
>> > >>>>
>> > >>>> When the schedule kicks in, I am calling *consumer.resume.*
>> > >>>>
>> > >>>> Is this approach correct?
>> > >>>> Will it cause any issue if the consumer calls poll on a paused
>> > >>>> consumer?
>> > >>>>
>> > >>>> Skeleton Code
>> > >>>> ============
>> > >>>>
>> > >>>> public class *OfferItemImageConsumer* implements Runnable {
>> > >>>>
>> > >>>>     @Override
>> > >>>>     public void run() {
>> > >>>>         try {
>> > >>>>             do {
>> > >>>>                 ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
>> > >>>>                 writeAndPauseEmptyFetch(records);
>> > >>>>                 processRecords(records);
>> > >>>>             } while (!consumerLoopClosed.get());
>> > >>>>         } catch (RuntimeException ex) {
>> > >>>>             handleConsumerLoopException(ex);
>> > >>>>         } finally {
>> > >>>>             kafkaConsumer.close();
>> > >>>>         }
>> > >>>>     }
>> > >>>>
>> > >>>>     private void writeAndPauseEmptyFetch(ConsumerRecords<String, String> records) {
>> > >>>>         if (records.isEmpty()) {
>> > >>>>             emptyFetchCount++;
>> > >>>>         }
>> > >>>>         if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
>> > >>>>             writeImageData();
>> > >>>>             emptyFetchCount = 0;
>> > >>>>             kafkaConsumer.pause(kafkaConsumer.assignment());
>> > >>>>             consumerPaused = true;
>> > >>>>         }
>> > >>>>     }
>> > >>>> }
>> > >>>>
>> > >>>> =================================
>> > >>>>
>> > >>>> public class *ItemImageStreamScheduler* {
>> > >>>>     private static final int TERMINATION_TIMEOUT = 10;
>> > >>>>
>> > >>>>     private ExecutorService executorService = Executors.newSingleThreadExecutor();
>> > >>>>
>> > >>>>     private final OfferItemImageConsumer offerItemImageConsumer;
>> > >>>>     private final ItemImageStreamConfig itemImageStreamConfig;
>> > >>>>     private final KafkaConsumer<String, String> kafkaConsumer;
>> > >>>>
>> > >>>>     @EventListener(ApplicationReadyEvent.class)
>> > >>>>     void startStreaming() {
>> > >>>>         executorService.submit(offerItemImageConsumer);
>> > >>>>     }
>> > >>>>
>> > >>>>     @Scheduled
>> > >>>>     void resumeStreaming() {
>> > >>>>         kafkaConsumer.resume(kafkaConsumer.assignment());
>> > >>>>     }
>> > >>>> }
>> > >>>>
>> > >>>> Thanks
>> > >>>>
>> > >>>> Pradeep
>> > >>>>
>> > >>>>
>> > >>>
>> > >>
>> > >>
>> > >
>> >
>> >
>>
>> --
>> http://khangaonkar.blogspot.com/
>>
>
