Pause and resume are required because I am running the app in a Kubernetes
pod and am not shutting it down.
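
One thing to watch in the skeleton below: KafkaConsumer is documented as not
safe for multi-threaded access, so calling resume() from the scheduler thread
while another thread is inside poll() can fail with a
ConcurrentModificationException. Calling poll() on a paused consumer is fine in
itself; it just returns no records for the paused partitions. Here is a minimal
sketch of one way to keep every consumer call on the polling thread. It uses
only the JDK. PauseGate, Action, requestResume and onPoll are hypothetical
names, not part of the Kafka API, and it assumes "empty for 3 polls" means three
consecutive empty polls.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical helper (not a Kafka class): counts consecutive empty polls and
 * hands resume requests from other threads over to the polling thread.
 */
final class PauseGate {
    /** What the polling thread should do to the consumer after this poll. */
    enum Action { NONE, PAUSE, RESUME }

    private final int emptyFetchThreshold;
    // The only state shared across threads: set by the scheduler, cleared by the poller.
    private final AtomicBoolean resumeRequested = new AtomicBoolean(false);
    // Touched only by the polling thread, so plain fields are enough.
    private int consecutiveEmptyFetches = 0;
    private boolean paused = false;

    PauseGate(int emptyFetchThreshold) {
        this.emptyFetchThreshold = emptyFetchThreshold;
    }

    /** Safe to call from any thread, for example from the scheduled method. */
    void requestResume() {
        resumeRequested.set(true);
    }

    /** Call from the polling thread after every poll. */
    Action onPoll(boolean fetchWasEmpty) {
        // A pending resume request wins, but only if we are actually paused.
        if (resumeRequested.getAndSet(false) && paused) {
            paused = false;
            consecutiveEmptyFetches = 0;
            return Action.RESUME;
        }
        if (paused) {
            return Action.NONE; // stay paused until a resume is requested
        }
        // A non-empty fetch resets the streak.
        consecutiveEmptyFetches = fetchWasEmpty ? consecutiveEmptyFetches + 1 : 0;
        if (consecutiveEmptyFetches >= emptyFetchThreshold) {
            paused = true;
            consecutiveEmptyFetches = 0;
            return Action.PAUSE;
        }
        return Action.NONE;
    }

    /** Intended to be read from the polling thread only. */
    boolean isPaused() {
        return paused;
    }
}
```

In the poll loop, the consumer thread would call
gate.onPoll(records.isEmpty()) after each poll and then call
kafkaConsumer.pause(kafkaConsumer.assignment()) on PAUSE or
kafkaConsumer.resume(kafkaConsumer.paused()) on RESUME, while the scheduled
method only calls gate.requestResume(). The loop keeps polling while paused, so
the resume takes effect within one poll timeout.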

On Tue, Oct 23, 2018 at 10:33 PM pradeep s <sreekumar.prad...@gmail.com>
wrote:

> Hi,
> I have a requirement to start Kafka streaming at a scheduled time and
> then pause the stream when the consumer poll returns empty fetches for 3 or
> more polls.
>
> I am starting a consumer poll loop during application startup using a
> single-thread executor and then pausing the consumer when the poll
> returns empty for 3 polls.
>
> When the schedule kicks in, I am calling consumer.resume().
>
> Is this approach correct?
> Will it cause any issue if the poll loop calls poll() on a paused consumer?
>
> Skeleton Code
> ============
>
> public class OfferItemImageConsumer implements Runnable {
>
> @Override
> public void run() {
>     try {
>         do {
>             ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
>             writeAndPauseEmptyFetch(records);
>             processRecords(records);
>         } while (!consumerLoopClosed.get());
>     } catch (RuntimeException ex) {
>         handleConsumerLoopException(ex);
>     } finally {
>         kafkaConsumer.close();
>     }
> }
>
>
> private void writeAndPauseEmptyFetch(ConsumerRecords<String, String> records) {
>     if (records.isEmpty()) {
>         emptyFetchCount++;
>     }
>     if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
>         writeImageData();
>         emptyFetchCount = 0;
>         kafkaConsumer.pause(kafkaConsumer.assignment());
>         consumerPaused = true;
>     }
> }
>
> }
>
> =================================
>
> public class ItemImageStreamScheduler {
>     private static final int TERMINATION_TIMEOUT = 10;
>
>
>     private ExecutorService executorService = Executors.newSingleThreadExecutor();
>
>     private final OfferItemImageConsumer offerItemImageConsumer;
>     private final ItemImageStreamConfig itemImageStreamConfig;
>     private final KafkaConsumer<String, String> kafkaConsumer;
>
>     @EventListener(ApplicationReadyEvent.class)
>     void startStreaming() {
>         executorService.submit(offerItemImageConsumer);
>     }
>     @Scheduled
>     void resumeStreaming() {
>         kafkaConsumer.resume(kafkaConsumer.assignment());
>     }
>
>
> }
>
> Thanks
>
> Pradeep
>
>
