Storm Kafka integration
Hi, I am using Storm 1.0.2 and Kafka 0.10.1.1 and have a query on the spout code to integrate with Kafka. As per the Storm docs, it is mentioned to use BrokerHosts to register the Kafka spout: http://storm.apache.org/releases/1.0.2/storm-kafka.html. In this case, will the consumer offsets be stored in ZooKeeper? Is this the preferred approach? I have read that in the latest Kafka versions, consumer offsets can be maintained in the Kafka cluster itself. Is there any Storm spout example for this? Regards Pradeep S
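For reference, a minimal sketch of the BrokerHosts-based registration that the storm-kafka page describes (the ZooKeeper connect string, topic name, and ids below are placeholders). With this older spout the offsets are indeed kept in ZooKeeper, under the given root path and consumer id:

    BrokerHosts hosts = new ZkHosts("zk1:2181,zk2:2181");                 // ZooKeeper quorum (placeholder)
    SpoutConfig spoutConfig = new SpoutConfig(hosts,
            "item-topic",                                                 // topic name (placeholder)
            "/kafka-offsets",                                             // ZooKeeper root path for offsets
            "item-spout");                                                // consumer id
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafka-spout", kafkaSpout, 1);

Newer Storm releases also ship a separate storm-kafka-client module whose KafkaSpout is built on the new consumer API and commits offsets to Kafka itself; it is worth checking whether the Storm version in use includes that module.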
KStreams API Usage
Hi, I am trying to call KafkaStreams close based on the presence of a value in the output of a ValueTransformer. The ValueTransformer produces a List. Is there a way to avoid the foreach on the KStream and get the first matching value alone (like the Java Stream API method findFirst)?

private void checkMerchHierarchyEmpty(KStream<byte[], List<TransformerResult>> trans) {
    trans.filter((key, value) -> value.stream()
            .anyMatch(val -> MERCH_HIERARCHY_CACHE_EMPTY.equals(val.getErrorMessage())))
         .foreach((key, value) -> {
             metricsClient.writeMetric(CountMetric.generate(STREAM_SHUTDOWN_ACTION, 1));
             log.fatal("Shutting down kafka stream since merch hierarchy is empty");
             kafkaStreams.close(STREAM_SHUTDOWN_WAITTIME_MS, TimeUnit.MILLISECONDS);
         });
}

Thanks Pradeep
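To illustrate the intent (not an authoritative answer): the Streams DSL has no stream-wide findFirst, but per record the findFirst-style lookup can be done inside the lambda on the List value itself. A sketch using the same hypothetical types as above, reacting only to the first matching error in each record:

    trans.foreach((key, value) -> value.stream()
            .filter(val -> MERCH_HIERARCHY_CACHE_EMPTY.equals(val.getErrorMessage()))
            .findFirst()
            .ifPresent(val -> {
                metricsClient.writeMetric(CountMetric.generate(STREAM_SHUTDOWN_ACTION, 1));
                log.fatal("Shutting down kafka stream since merch hierarchy is empty");
                kafkaStreams.close(STREAM_SHUTDOWN_WAITTIME_MS, TimeUnit.MILLISECONDS);
            }));

The foreach itself still runs per record; findFirst only short-circuits the scan of the list inside one record.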
Kafka stream specify key for message
Hi, In Kafka Streams, when we use the *to* method for sending values to a topic, is there a way to set the message key? .to(outputTopic, Produced.with(byteArraySerde, itemEnvelopeSerde)); In the Produced class, I can't find a way to set the key. https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/Produced.html Thanks Pradeep
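Produced only carries the serdes (and optionally a partitioner), so the usual way is to change the record key upstream with selectKey (or map) before calling to. A minimal sketch; the getItemId() accessor is a hypothetical way to derive the key:

    stream.selectKey((key, value) -> value.getItemId().getBytes())    // derive the new byte[] key (hypothetical getter)
          .to(outputTopic, Produced.with(byteArraySerde, itemEnvelopeSerde));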
Kafka consumer loop exception handling
Hi, I am running a poll loop for a Kafka consumer and the app is deployed in Kubernetes. I am using manual commits. I have a couple of questions on exception handling in the poll loop:

1) Do I need to handle the consumer rebalance scenario (when any of the consumer pods dies) by adding a listener, or will the commits be taken care of after the rebalance?

2) Do I need to handle CommitFailedException specifically?

Consume loop code below

@Override
public void run() {
    try {
        do {
            processRecords(kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs()));
            kafkaConsumer.commitSync();
        } while (!isConsumerLoopClosed.get());
    } catch (WakeupException wakeupException) {
        // do nothing if the WakeupException is from the shutdown hook
        if (!isConsumerLoopClosed.get()) {
            handleConsumerLoopException(wakeupException);
        }
    } catch (RuntimeException ex) {
        handleConsumerLoopException(ex);
    } finally {
        kafkaConsumer.close();
    }
}

Thanks Pradeep
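On question 1, one common pattern (a sketch only, not necessarily required for this setup; inputTopic is a placeholder) is to register a ConsumerRebalanceListener when subscribing and commit whatever has already been processed before partitions are revoked, so the next owner starts from the committed position:

    kafkaConsumer.subscribe(Collections.singletonList(inputTopic), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // commit offsets for records already handed to processRecords()
            kafkaConsumer.commitSync();
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // nothing special needed; polling resumes on the new assignment
        }
    });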
Re: Kafka consumer loop exception handling
Thank you. In my case I am receiving messages, doing a small transformation and sending to an output topic. If I am running 4 consumers against 4 partitions and one of the consumers dies, will there be duplicate messages sent in this case? Since when the new consumer comes up, it will again process from the uncommitted offset. So do I need transaction semantics in this scenario?

On Fri, Jun 1, 2018 at 4:56 AM, M. Manna wrote:
> This is actually quite nicely explained by Jason Gustafson in this article:
> https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
>
> It's technically up to the application how to determine whether a message
> is fully received. If you have a database txn involved, I would say that
> CommitFailedException should revert all changes you have done. Because you
> couldn't commit the offset successfully, you haven't "really" consumed any
> message.
>
> Tailoring your code a little bit:
>
> @Override
> public void run() {
>     try {
>         do {
>             processRecords(kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs()));
>             kafkaConsumer.commitSync();
>         } while (!isConsumerLoopClosed.get());
>     } catch (WakeupException wakeupException) {
>         // do nothing if the WakeupException is from the shutdown hook
>         if (!isConsumerLoopClosed.get()) {
>             handleConsumerLoopException(wakeupException);
>         }
>     } catch (RuntimeException ex) { // RuntimeException could also happen for other reasons here
>         if (ex instanceof CommitFailedException) {
>             // revert db txn etc. to avoid false positives
>         } else if (ex instanceof KafkaException) {
>             // do something else
>         } else {
>             // alternatively, do this
>         }
>         handleConsumerLoopException(ex);
>     } finally {
>         kafkaConsumer.close();
>     }
> }
>
> One thing to remember is that when you are sending data, as of the 1.0.0 API
> you can have "txn-like" finer control to determine when you have
> successfully committed a transaction. You can check the beginTransaction(),
> commitTransaction(), and abortTransaction() methods to see how they can be
> utilised to have even finer control over your message delivery.
>
> Regards
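To make the consume-transform-produce path in this thread atomic (the case where a dead consumer would otherwise lead to re-sent output), the transaction API mentioned above can be combined with sendOffsetsToTransaction. A rough sketch, not a drop-in for the code above: byte[] key/value types, outputTopic, groupId and transform() are assumptions, the producer needs a transactional.id, the consumer needs enable.auto.commit=false, and downstream readers need isolation.level=read_committed:

    producer.initTransactions();
    while (!isConsumerLoopClosed.get()) {
        ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
        if (records.isEmpty()) {
            continue;
        }
        producer.beginTransaction();
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<byte[], byte[]> record : records) {
                producer.send(new ProducerRecord<>(outputTopic, record.key(), transform(record.value())));
            }
            for (TopicPartition tp : records.partitions()) {
                List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(tp);
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
            }
            // consumed offsets are committed atomically with the produced records
            producer.sendOffsetsToTransaction(offsets, groupId);
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            producer.close();              // another instance with the same transactional.id took over
            throw e;
        } catch (KafkaException e) {
            producer.abortTransaction();   // output and offsets are discarded together
        }
    }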
Kafka consumer commit behaviour on rebalance
Hi All, I am running a Kafka consumer (single threaded) on Kubernetes. The application polls records and accumulates them in memory. There is a scheduled write of these records to S3, and only after that do I commit the offsets back to Kafka. There are 4 partitions and 4 consumers (4 Kubernetes pods) in the group. I have a question on the commit behaviour when Kafka rebalancing happens. After a rebalance, will the consumers get reassigned and receive duplicate records? Do I need to clear my in-memory buffer on a rebalance event? Thanks Pradeep
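One way to handle both concerns, sketched only (the buffer, the S3 writer and the offset bookkeeping below are hypothetical names), is to flush and commit from onPartitionsRevoked and then clear the buffer, so that whatever was not flushed is re-read by whichever consumer gets the partitions next:

    consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            writeBufferToS3(buffer);                      // flush what has been accumulated so far
            consumer.commitSync(lastFlushedOffsets());    // commit only what was actually written
            buffer.clear();                               // drop buffered records; they are re-read after reassignment
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // consumption restarts from the committed offsets of the new assignment
        }
    });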
Kafka embedded cluster rebalance scenario test
Hi, I would like to add an integration test for a Kafka rebalance scenario and make sure retries work as expected when enough replicas are not available in my Kafka Streams application. Do you have any info on how I can achieve this? I was checking the TestUtils class in org.apache.kafka.test, but could not figure out how to trigger a rebalance or a NotLeaderForPartitionException scenario using an embedded Kafka cluster. Thanks Pradeep
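I am not certain of the embedded-cluster API details, but one low-tech way to force a rebalance in such a test, assuming the embedded cluster rule exposes its bootstrap servers (something like CLUSTER.bootstrapServers()), is simply to start a second consumer in the same group mid-test; the group coordinator will then revoke and reassign partitions:

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());  // assumed accessor
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-test-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    // joining with an extra member in the same group triggers a rebalance
    try (KafkaConsumer<byte[], byte[]> extraMember = new KafkaConsumer<>(props)) {
        extraMember.subscribe(Collections.singletonList("input-topic"));   // topic name is a placeholder
        extraMember.poll(5000);                                            // join the group and get an assignment
    }

Simulating NotLeaderForPartitionException is harder; it generally requires stopping the broker that leads the partition, which depends on what the embedded cluster exposes.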
Kafka streams state directory - help
Hi, I am using a Kafka Streams app connecting to a Confluent Kafka cluster (0.10.2.1). The application reads messages from a topic, performs a transformation and pushes to an output topic. There is no count or aggregation performed. I have the following clarifications regarding the state directory.

*1)* Will there be any data written in the state directory? When I verified the state directory, it was showing 0 0

*2)* The application is running in Kubernetes without any external volumes. Will the state directory cause any processing issue during Kubernetes pod restarts?

*3)* Will the app create a changelog topic since there is no in-memory store used in the app?

Code Snippet
===========

Stream Config
=============

properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, streamEnvironment.getKafkaBootstrapServers());
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUMBER_OF_STREAM_THREADS);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
properties.put(StreamsConfig.STATE_DIR_CONFIG, STATE_DIRECTORY);
properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, ItemDeserializationExceptionHandler.class);

Stream builder
==============

private void processStream(StreamsBuilder builder) {
    KStream<byte[], byte[]> input = builder.stream(inputTopic, Consumed.with(byteArraySerde, byteArraySerde))
            .peek((key, value) -> metricsClient.writeMetric(CountMetric.generate(METRIC_OFFER_INPUT, 1)));

    KStream<byte[], DeserializationResultWrapper> deserializationResultStream = input
            .mapValues(this::deserializeOffer);

    quarantineNonDeSerializableOffers(deserializationResultStream);

    KStream<byte[], List<TransformerResult>> trans = transformOffers(deserializationResultStream);

    produceToQuarantineTopic(trans);
    produceToOutputTopic(trans);
}

private void produceToOutputTopic(KStream<byte[], List<TransformerResult>> trans) {
    trans.filter((key, value) -> value != null && !value.isEmpty())
         .peek((key, value) -> metricsClient.writeMetric(CountMetric.generate(METRIC_ITEMS_OUTPUT, 1)))
         .flatMapValues(transformerResults -> transformerResults.stream()
                 .map(TransformerResult::getItem)
                 .filter(Objects::nonNull)
                 .collect(Collectors.toCollection(ArrayList::new)))
         .to(outputTopic, Produced.with(byteArraySerde, itemEnvelopeSerde));
}

private void produceToQuarantineTopic(KStream<byte[], List<TransformerResult>> trans) {
    trans.filter((key, value) -> value == null || value.isEmpty()
                 || value.stream().anyMatch(TransformerResult::hasErrors))
         .mapValues(val -> toQuarantineEnvelope(val, INVALID_SKU))
         .to(quarantineTopic, Produced.with(byteArraySerde, quarantineItemEnvelopeSerde));
}

Thanks Pradeep
Re: Kafka streams state directory - help
Thanks Matthias. Can you also please confirm the compatible versions of the client dependencies? Our broker version is 0.10.2.1, and when I upgrade the client library to 1.1.0 I am getting an issue with tests while starting the embedded cluster.

Test dependencies are (kafka-stream.version is 1.1.0):

org.apache.kafka : kafka-streams : ${kafka-stream.version} (classifier test, scope test)
org.apache.kafka : kafka-clients : ${kafka-stream.version} (classifier test, scope test)
org.apache.kafka : kafka_2.11 : ${kafka-stream.version} (classifier test, scope test)

Test error
==========

java.lang.AbstractMethodError: kafka.zk.EmbeddedZookeeper.kafka$utils$Logging$_setter_$loggerName_$eq(Ljava/lang/String;)V
    at kafka.utils.Logging$class.$init$(Logging.scala:23)
    at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:37)
    at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.start(EmbeddedKafkaCluster.java:87)
    at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.before(EmbeddedKafkaCluster.java:153)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46)
    at com.github.tomakehurst.wiremock.junit.WireMockClassRule$1.evaluate(WireMockClassRule.java:70)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

On Sat, Apr 21, 2018 at 3:06 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> You are hitting: https://issues.apache.org/jira/browse/KAFKA-6499
>
> Was fixed in 1.1 release.
>
> Thus, you can just ignore the checkpoint file. There should be no issue
> with running on Kubernetes.
>
> Also, if there is no store (independent of disk based or in-memory)
> there will be no changelog topic.
>
> -Matthias
Kafka commit interval
Hi, I have a use case to stream messages from Kafka, buffer them in memory until a message count is reached, and then write them to an output file. I am using manual commit. I have a question on the maximum time I can wait after consuming a message before committing back to Kafka. Is there an upper limit for this? If consumer rebalance is the only issue with waiting for long periods, can I write the contents using a rebalance listener and commit at that point? Thanks Pradeep
Re: Kafka commit interval
Thanks Matthias

On Fri, Oct 19, 2018 at 11:25 AM Matthias J. Sax wrote:
> There is no upper limit.
>
> And yes, you are right about rebalancing. This would be an issue and
> yes, you can use the rebalance listener to address it (it's the purpose
> of the rebalance listener to be used for cases like this).
>
> -Matthias
Re: Consumer Pause & Scheduled Resume
Pause and resume is required since I am running a pod in Kubernetes and I am not shutting down the app.

On Tue, Oct 23, 2018 at 10:33 PM pradeep s wrote:
> Hi,
> I have a requirement to have Kafka streaming start at a scheduled time and
> then pause the stream when the consumer poll returns empty fetches for 3 or
> more polls.
>
> I am starting a consumer poll loop during application startup using a
> single-thread executor and then pausing the consumer when the poll has
> returned empty for 3 polls.
>
> When the schedule kicks in, I am calling *consumer.resume*.
>
> Is this approach correct?
> Will it cause any issue if the consumer calls poll on a paused consumer?
Re: Consumer Pause & Scheduled Resume
Thanks Matthias. I am facing the issue when I try to call resume from the scheduled method. I was getting an exception that the KafkaConsumer is not safe for multi-threaded access. I am trying to see how I can call pause and resume on the same thread. There will be only one thread running for consumption.

On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax wrote:
> There is no issue if you call `poll()` if all partitions are paused. In
> fact, if you want to make sure that the consumer does not fall out of
> the consumer group, you must call `poll()` in regular intervals to not
> hit the `max.poll.interval.ms` timeout.
>
> -Matthias
Re: Consumer Pause & Scheduled Resume
Hi Manoj/Matthias,
My requirement is to run the consumer once daily, stream the messages, and pause when I encounter a few empty fetches.
I am planning to run two consumers and pause the consumption based on the empty fetches, for a topic with 4 partitions.
To avoid the consumer multi-threaded-access issue, I run the consumer, exit the poll loop, and call pause on the same thread. In this case I am not continuously polling. When the next schedule kicks in, I resume the polling.
Will the consumer resume call cause issues, since the scheduled loop is triggered a long time after the polling stopped? (Or is the old approach of continuous polling the correct one?)
Also, Manoj, can you please explain the rebalance scenario: if the consumer is paused for two partitions and gets the assignment for another two partitions (because of a pod termination), how can I pause the consumption if it is not the scheduled time to process the records?
Thanks
Pradeep

On Thu, Oct 25, 2018 at 5:48 PM Manoj Khangaonkar wrote:
> One item to be aware of with pause and resume is that it applies to
> partitions currently assigned to the consumer.
>
> But partitions can get revoked, or additional partitions can get assigned
> to the consumer.
>
> With reassignment, you might be expecting the consumer to be paused but
> suddenly start getting messages because a new partition got assigned.
>
> Use the RebalanceListener to pause or resume any new partitions.
>
> regards
>
> On Thu, Oct 25, 2018 at 3:15 PM Matthias J. Sax wrote:
>
> > That is correct: clients are not thread safe.
> >
> > You can use an `AtomicBoolean needToResume` that you share over both
> > threads and that is initially false.
> >
> > In your scheduled method, you set the variable to true.
> >
> > In your main consumer, each time before you call poll(), you check if
> > the variable is set to true. If yes, you resume() and reset the variable
> > to false.
> >
> > Hope this helps.
> >
> > -Matthias
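A small sketch of the AtomicBoolean handshake Matthias describes (field and method names are illustrative): the scheduler thread only flips a flag, and the single consumer thread is the only one that touches the KafkaConsumer, doing the actual resume() before its next poll():

    private final AtomicBoolean needToResume = new AtomicBoolean(false);

    // called from the @Scheduled thread: never touches the consumer directly
    void requestResume() {
        needToResume.set(true);
    }

    // consumer thread: the only thread that calls pause()/resume()/poll()
    void pollLoop() {
        while (!consumerLoopClosed.get()) {
            if (needToResume.compareAndSet(true, false)) {
                kafkaConsumer.resume(kafkaConsumer.paused());
            }
            ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
            processRecords(records);
        }
    }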
Re: Consumer Pause & Scheduled Resume
Code Snippet Without Continuous Polling
=======================================

public class OfferItemImageScheduler {

    @Scheduled(cron = "0 0/2 * * * ?")
    void startStreaming() {
        kafkaConsumer.resume(kafkaConsumer.assignment());
        offerItemImageConsumer.streamMessages(kafkaConsumer);
        kafkaConsumer.pause(kafkaConsumer.assignment());
    }
}

=======================================

public class OfferItemImageConsumer {

    public boolean streamMessages(KafkaConsumer kafkaConsumer) {
        try {
            do {
                ConsumerRecords records = kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
                log.info("Kafka poll returned {} records", records.count());
                checkEmptyFetch(records);
                processRecords(records);
            } while (!consumerPaused.get() && !consumerLoopClosed.get());
        } catch (WakeupException wakeupException) {
            // do nothing if the WakeupException is from the shutdown hook
            if (!consumerLoopClosed.get()) {
                handleConsumerLoopException(wakeupException);
            }
        } catch (RuntimeException ex) {
            handleConsumerLoopException(ex);
        } finally {
            resetConsumerStatus();
        }
        return true;
    }
}
Consumer Pause & Scheduled Resume
Hi,
I have a requirement to have Kafka streaming start at a scheduled time and then pause the stream when the consumer poll returns empty fetches for 3 or more polls.

I am starting a consumer poll loop during application startup using a single-thread executor and then pausing the consumer when the poll has returned empty for 3 polls.

When the schedule kicks in, I am calling *consumer.resume*.

Is this approach correct?
Will it cause any issue if the consumer calls poll on a paused consumer?

Skeleton Code
=============

public class OfferItemImageConsumer implements Runnable {

    @Override
    public void run() {
        try {
            do {
                ConsumerRecords records = kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
                writeAndPauseEmptyFetch(records);
                processRecords(records);
            } while (!consumerLoopClosed.get());
        } catch (RuntimeException ex) {
            handleConsumerLoopException(ex);
        } finally {
            kafkaConsumer.close();
        }
    }

    private void writeAndPauseEmptyFetch(ConsumerRecords records) {
        if (records.isEmpty()) {
            emptyFetchCount++;
        }
        if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
            writeImageData();
            emptyFetchCount = 0;
            kafkaConsumer.pause(kafkaConsumer.assignment());
            consumerPaused = true;
        }
    }
}

=============

public class ItemImageStreamScheduler {
    private static final int TERMINATION_TIMEOUT = 10;

    private ExecutorService executorService = Executors.newSingleThreadExecutor();

    private final OfferItemImageConsumer offerItemImageConsumer;
    private final ItemImageStreamConfig itemImageStreamConfig;
    private final KafkaConsumer kafkaConsumer;

    @EventListener(ApplicationReadyEvent.class)
    void startStreaming() {
        executorService.submit(offerItemImageConsumer);
    }

    @Scheduled
    void resumeStreaming() {
        kafkaConsumer.resume(kafkaConsumer.assignment());
    }
}

Thanks
Pradeep
Kafka streams usecase
Hi,
I have a requirement to stream item details to specific destinations. There are three different Kafka streams: one for item info, a second for item price and promotions, and a third for item availability. I want to join all this info and produce a single message containing item, price, and availability. Can Kafka Streams be leveraged for this, if all the messages across the three topics can be joined using a common item identifier? Also, it is not necessary that all three topics have data. For example, if item setup is done and no inventory is allocated, only the item and price topics will have data. Also, any good pointers to a sample app for joining multiple streams and producing a single message to a new topic? Thanks Pradeep
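One way to express this in the Streams DSL, sketched under assumptions (topic names, the ItemInfo/Price/Availability/ItemDetails classes, their serdes and the withAvailability helper are all hypothetical), is to read each topic as a KTable keyed by the item identifier and outer-join them, so a combined record is emitted even when one of the inputs is missing:

    StreamsBuilder builder = new StreamsBuilder();

    KTable<String, ItemInfo> items = builder.table("item-info",
            Consumed.with(Serdes.String(), itemInfoSerde));
    KTable<String, Price> prices = builder.table("item-price",
            Consumed.with(Serdes.String(), priceSerde));
    KTable<String, Availability> availability = builder.table("item-availability",
            Consumed.with(Serdes.String(), availabilitySerde));

    KTable<String, ItemDetails> combined = items
            .outerJoin(prices, (item, price) -> new ItemDetails(item, price, null))
            .outerJoin(availability, (details, avail) ->
                    details == null ? new ItemDetails(null, null, avail) : details.withAvailability(avail));

    combined.toStream().to("item-details", Produced.with(Serdes.String(), itemDetailsSerde));

Because these are outer joins, a record arrives on the output even when only one or two of the source topics have data for that item id.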
Re: Kafka streams usecase
Thanks Chad! If we want to consume from multiple topics and persist to a database, can I go with a plain consumer and look up the record and update it? The requirement is to consume from an item topic and a price topic and create a record in Postgres. Both topics have the item id in the message, which is the key in the Postgres database. Can this be done with a simple consumer?

On Thu, Jan 13, 2022 at 11:11 AM Chad Preisler wrote:
> Yes, Kafka Streams can be used to do this. There are probably several ways
> to implement this. We did something like this in Java using the groupByKey()
> and reduce() functions. The three topics we wanted to combine into one
> topic had different schemas and different Java class types. So to combine
> them into one aggregated object we did the following.
>
> - Create a class with data members of the three objects we wanted to
> combine. Let's call it AggregateObj.
> - Create a KStream for each topic we wanted to combine.
> - For each KStream use a map function that creates and outputs an
> AggregateObj, setting the input stream object to the correct data member on
> the AggregateObj.
> - Create an intermediate topic to write individual AggregateObj from each
> KStream.
> - Create a stream to read the intermediate topic and use the groupByKey()
> and reduce() functions to create one AggregateObj that has all the parts.
> Output that result to the final combined output stream using
> toStream().to().
>
> We did all of this in one application. You may be able to accomplish the
> same thing using aggregate a different way, or you may be able to use left
> join methods to accomplish the same thing. I can't share the code. Sorry.
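For what it is worth, a compressed sketch of the approach Chad outlines; since the real code could not be shared, AggregateObj, its merge() method, the topic names, and the serdes here are all assumptions:

    // 1) Map each source stream onto the shared AggregateObj shape and funnel
    //    everything through one intermediate topic keyed by item id.
    builder.stream("item-info", Consumed.with(Serdes.String(), itemInfoSerde))
           .mapValues(item -> new AggregateObj(item, null, null))
           .to("aggregate-intermediate", Produced.with(Serdes.String(), aggregateSerde));

    builder.stream("item-price", Consumed.with(Serdes.String(), priceSerde))
           .mapValues(price -> new AggregateObj(null, price, null))
           .to("aggregate-intermediate", Produced.with(Serdes.String(), aggregateSerde));

    builder.stream("item-availability", Consumed.with(Serdes.String(), availabilitySerde))
           .mapValues(avail -> new AggregateObj(null, null, avail))
           .to("aggregate-intermediate", Produced.with(Serdes.String(), aggregateSerde));

    // 2) Read the intermediate topic back and merge the pieces per item id.
    builder.stream("aggregate-intermediate", Consumed.with(Serdes.String(), aggregateSerde))
           .groupByKey()
           .reduce(AggregateObj::merge)   // merge() copies the non-null members from the newer record (hypothetical)
           .toStream()
           .to("item-details", Produced.with(Serdes.String(), aggregateSerde));

For the Postgres question, a plain consumer that reads both topics and upserts by item id would also work; Streams mainly buys you the per-key merging, state management, and scaling for free.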