Pause and resume are required because I am running the app in a Kubernetes pod and I am not shutting the app down.
On Tue, Oct 23, 2018 at 10:33 PM pradeep s <sreekumar.prad...@gmail.com> wrote:
> Hi,
> I have a requirement to have Kafka streaming start at a scheduled time and
> then pause the stream when the consumer poll returns empty fetches for 3 or
> more polls.
>
> I am starting a consumer poll loop during application startup using a
> single-threaded executor and then pausing the consumer when the poll is
> returning empty for 3 polls.
>
> When the schedule kicks in, I am calling consumer.resume.
>
> Is this approach correct?
> Will it cause any issue if poll is called on a paused consumer?
>
> Skeleton Code
> ============
>
> public class OfferItemImageConsumer implements Runnable {
>
>     @Override
>     public void run() {
>         try {
>             do {
>                 ConsumerRecords<String, String> records =
>                         kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
>                 writeAndPauseEmptyFetch(records);
>                 processRecords(records);
>             } while (!consumerLoopClosed.get());
>         } catch (RuntimeException ex) {
>             handleConsumerLoopException(ex);
>         } finally {
>             kafkaConsumer.close();
>         }
>     }
>
>     private void writeAndPauseEmptyFetch(ConsumerRecords<String, String> records) {
>         if (records.isEmpty()) {
>             emptyFetchCount++;
>         }
>         if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
>             writeImageData();
>             emptyFetchCount = 0;
>             kafkaConsumer.pause(kafkaConsumer.assignment());
>             consumerPaused = true;
>         }
>     }
>
> }
>
> =================================
>
> public class ItemImageStreamScheduler {
>     private static final int TERMINATION_TIMEOUT = 10;
>
>     private ExecutorService executorService =
>             Executors.newSingleThreadExecutor();
>
>     private final OfferItemImageConsumer offerItemImageConsumer;
>     private final ItemImageStreamConfig itemImageStreamConfig;
>     private final KafkaConsumer<String, String> kafkaConsumer;
>
>     @EventListener(ApplicationReadyEvent.class)
>     void startStreaming() {
>         executorService.submit(offerItemImageConsumer);
>     }
>
>     @Scheduled
>     void resumeStreaming() {
>         kafkaConsumer.resume(kafkaConsumer.assignment());
>     }
>
> }
>
> Thanks
>
> Pradeep
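
The pause-after-empty-polls and resume-on-schedule flow described in the quoted message could be sketched roughly as below. This is only an illustration, not the poster's code: `PauseGate`, `Action`, `afterPoll`, and `requestResume` are hypothetical names, and the sketch assumes that "3 empty polls" means three consecutive empty polls. Because the KafkaConsumer Javadoc states that the consumer is not safe for multi-threaded access, the sketch also assumes the scheduler thread only sets a flag and the poll-loop thread makes the actual `pause`/`resume` calls, which appear here only as comments so the example runs without the Kafka client.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical helper (not part of Kafka): tells the poll-loop thread
 * when it should pause or resume the consumer.
 */
public class PauseGate {
    public enum Action { NONE, PAUSE, RESUME }

    private final int emptyFetchThreshold;
    // Written by the scheduler thread, read and cleared by the poll-loop thread.
    private final AtomicBoolean resumeRequested = new AtomicBoolean(false);
    // The two fields below are touched only by the poll-loop thread.
    private int emptyFetchCount = 0;
    private boolean paused = false;

    public PauseGate(int emptyFetchThreshold) {
        this.emptyFetchThreshold = emptyFetchThreshold;
    }

    /** Safe to call from any thread, for example from a scheduled method. */
    public void requestResume() {
        resumeRequested.set(true);
    }

    /** Call on the poll-loop thread after every poll. */
    public Action afterPoll(boolean fetchWasEmpty) {
        // Consume any pending resume request; it only matters while paused.
        if (resumeRequested.getAndSet(false) && paused) {
            paused = false;
            emptyFetchCount = 0;
            return Action.RESUME; // caller: kafkaConsumer.resume(kafkaConsumer.paused())
        }
        if (paused) {
            return Action.NONE;   // polling while paused simply yields no records
        }
        // Assumption: only consecutive empty polls count toward the threshold.
        emptyFetchCount = fetchWasEmpty ? emptyFetchCount + 1 : 0;
        if (emptyFetchCount >= emptyFetchThreshold) {
            paused = true;
            emptyFetchCount = 0;
            return Action.PAUSE;  // caller: kafkaConsumer.pause(kafkaConsumer.assignment())
        }
        return Action.NONE;
    }
}
```

In the poll loop, the result of `gate.afterPoll(records.isEmpty())` would decide whether to call `pause` or `resume` on the consumer, while the scheduled method would call only `gate.requestResume()`. Continuing to call `poll` while paused keeps the consumer in its group and returns no records from the paused partitions.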