Storm kafka integration

2017-02-19 Thread pradeep s
Hi,
I am using Storm 1.0.2 and Kafka 0.10.1.1 and have a question about the spout code for
integrating with Kafka. The Storm docs say to use BrokerHosts to register the Kafka spout:
http://storm.apache.org/releases/1.0.2/storm-kafka.html

In this case, will the consumer offsets be stored in ZooKeeper? Is this the
preferred approach?
I have read that in the latest Kafka versions, consumer offsets can be
maintained in the Kafka cluster itself. Is there a Storm spout example for
this?
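A rough sketch of the Kafka-committed-offsets approach, assuming the storm-kafka-client
module is available for your Storm version: it wraps the new KafkaConsumer, so offsets are
committed to Kafka rather than ZooKeeper. Broker addresses, topic name and parallelism below
are placeholders, and the builder API differs slightly between storm-kafka-client versions.

import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaSpoutTopologySketch {
    public static void main(String[] args) {
        // Offsets for this spout are tracked by the Kafka consumer group, not ZooKeeper.
        KafkaSpoutConfig<String, String> spoutConfig =
                KafkaSpoutConfig.builder("broker1:9092,broker2:9092", "my-topic")
                                .build();   // consumer properties (group.id etc.) can also be set on the builder

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig), 4);
        // ...attach bolts and submit the topology as usual
    }
}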
Regards
Pradeep S


KStreams API Usage

2018-04-27 Thread pradeep s
Hi,

I am trying to close the Kafka Streams instance based on the presence of a value in
the output of a ValueTransformer. The ValueTransformer produces a List.

Is there a way to avoid the foreach on the KStream and get just the first
matching value (like the Java Stream API's findFirst)?

private void checkMerchHierarchyEmpty(KStream trans) {
    trans.filter((key, value) -> value.stream()
                                      .anyMatch(val -> MERCH_HIERARCHY_CACHE_EMPTY.equals(val.getErrorMessage())))
         .foreach((key, value) -> {
             metricsClient.writeMetric(CountMetric.generate(STREAM_SHUTDOWN_ACTION, 1));
             log.fatal("Shutting down kafka stream since merch hierarchy is empty");
             kafkaStreams.close(STREAM_SHUTDOWN_WAITTIME_MS, TimeUnit.MILLISECONDS);
         });
}


Thanks
Pradeep


Kafka stream specify key for message

2018-06-13 Thread pradeep s
Hi,
In Kafka Streams, when we use the *to* method for sending values to a topic, is
there a way to specify the message key?

.to(outputTopic, Produced.with(byteArraySerde, itemEnvelopeSerde));

In the Produced class, I can't find a way to set the key.

https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/Produced.html
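A hedged sketch (placeholder topic names and key derivation): the key that to() writes is
simply the stream's current record key, so the usual pattern is to derive the desired key
upstream, for example with selectKey(), before the terminal to() call.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class SelectKeyBeforeToSketch {
    static void buildTopology(StreamsBuilder builder) {
        KStream<String, String> input =
                builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // to() sends whatever key the stream currently carries, so set the message key
        // here by deriving it from the value (the derivation below is only illustrative).
        input.selectKey((oldKey, value) -> value.split(",")[0])
             .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
    }
}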

Thanks

Pradeep


Kafka consumer loop exception handling

2018-05-31 Thread pradeep s
Hi,
I am running a poll loop for a Kafka consumer, and the app is deployed in
Kubernetes. I am using manual commits. I have a couple of questions on exception
handling in the poll loop:

1) Do I need to handle the consumer rebalance scenario (when any of the consumer
pods dies) by adding a listener, or will the commits be taken care of after the
rebalance?

2) Do I need to handle CommitFailedException specifically?

Consumer loop code below


@Override
public void run() {
    try {
        do {
            processRecords(kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs()));
            kafkaConsumer.commitSync();
        } while (!isConsumerLoopClosed.get());
    } catch (WakeupException wakeupException) {
        // do nothing if the WakeupException came from the shutdown hook
        if (!isConsumerLoopClosed.get()) {
            handleConsumerLoopException(wakeupException);
        }
    } catch (RuntimeException ex) {
        handleConsumerLoopException(ex);
    } finally {
        kafkaConsumer.close();
    }
}
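Regarding 1), a hedged sketch (illustrative names, not the actual application code) of
subscribing with a ConsumerRebalanceListener that commits the already-processed offsets
before partitions are revoked, so whichever consumer takes them over starts from there:

import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceAwareSubscriber {

    // Subscribe with a listener that commits the current position before partitions move.
    static void subscribeWithCommitOnRevoke(KafkaConsumer<String, String> consumer, String topic) {
        consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit offsets of records processed so far, so the consumer that takes
                // over these partitions does not re-read them.
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Nothing special needed; consumption resumes from the committed offsets.
            }
        });
    }
}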

Thanks
Pradeep


Re: Kafka consumer loop exception handling

2018-06-01 Thread pradeep s
Thank you. In my case I am receiving messages, doing a small transformation,
and sending to an output topic.
If I am running 4 consumers against 4 partitions and one of the consumers
dies, will duplicate messages be sent in this case, since when the new
consumer comes up it will again process from the last uncommitted offset?
So do I need transaction semantics in this scenario?


On Fri, Jun 1, 2018 at 4:56 AM, M. Manna  wrote:

> This is actually quite nicely explained by Jason Gustafson in this article:
> https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
>
> It's technically up to the application on how to determine whether message
> is fully received. If you have database txn involved, I would say that
> CommitFailedException should revert all changes you have done. Because you
> couldn't commit the offset successfully, you haven't "Really" consumed any
> message.
>
> Tailoring your code a little bit:
>
> @Override
> public void run() {
> try {
> do {
> processRecords(kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs()));
> kafkaConsumer.commitSync();
> } while (!isConsumerLoopClosed.get());
> } catch (WakeupException wakeupException) {
> //do nothing if wakeupException is from shutdown hook
> if (!isConsumerLoopClosed.get()) {
> handleConsumerLoopException(wakeupException);
> }
> } catch (RuntimeException ex) { // RuntimeException could also happen for other reasons here
> if (ex instanceof CommitFailedException) {
> // revert db txn etc. to avoid false positives
> } else if (ex instanceof KafkaException) {
> // do something else.
> } else {
>// alternatively, do this
> }
> handleConsumerLoopException(ex);
> } finally {
> kafkaConsumer.close();
> }
>
> }
>
> One thing to remember is that when you are sending data, as of 1.0.0 API
> you can have a "Txn-like" finer control to determine when you have
> successfully committed a transaction. You can check beginTransaction(),
> commitTransaction(), abortTransaction() methods to see how they can be
> utilised to have even finer control over your message delivery.
>
> Regards,
>
>
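A minimal sketch of the transactional-producer calls mentioned above
(beginTransaction/commitTransaction/abortTransaction). Bootstrap server, transactional id
and topic are placeholders, and fatal errors such as ProducerFencedException should close
the producer rather than abort:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("output-topic", "key", "value"));
            producer.commitTransaction();   // everything in the transaction becomes visible together
        } catch (KafkaException e) {
            producer.abortTransaction();    // roll back everything sent in this transaction
        } finally {
            producer.close();
        }
    }
}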


Kafka consumer commit behaviour on rebalance

2018-02-13 Thread pradeep s
Hi All,
I am running a Kafka consumer (single-threaded) on Kubernetes. The application
polls the records and accumulates them in memory. There is a scheduled
write of these records to S3, and only after that do I commit the offsets
back to Kafka.
There are 4 partitions and 4 consumers (4 Kubernetes pods) in the group.

I have a question on the commit behaviour when a Kafka rebalance happens.
After a rebalance, will the consumers get reassigned and receive
duplicate records?
Do I need to clear my in-memory buffer on a rebalance event?
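Records consumed after the last committed offset are re-delivered after a rebalance, so
duplicates are possible. A rough sketch (the buffer abstraction and all names are
illustrative, not the real application code) of a ConsumerRebalanceListener that flushes
the in-memory buffer to S3 and then commits before the partitions are handed over:

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

interface RecordBuffer {        // hypothetical abstraction over the in-memory buffer and its S3 write
    void flushToS3();
}

public class FlushOnRebalanceListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<?, ?> consumer;
    private final RecordBuffer buffer;

    public FlushOnRebalanceListener(KafkaConsumer<?, ?> consumer, RecordBuffer buffer) {
        this.consumer = consumer;
        this.buffer = buffer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        buffer.flushToS3();      // write whatever has been accumulated so far
        consumer.commitSync();   // then commit, so the next owner starts after those records
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Anything that was neither flushed nor committed above is simply re-delivered,
        // so nothing needs to be cleared here.
    }
}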

Thanks
Pradeep


Kafka embedded cluster rebalance scenario test

2018-08-21 Thread pradeep s
Hi,
I would like to add an integration test for a Kafka rebalance scenario and
make sure retries work as expected when enough replicas are not available
in my Kafka Streams application.
Do you have any info on how I can achieve this?
I was checking the TestUtils class in org.apache.kafka.test, but could not
figure out how to trigger a rebalance
or a NotLeaderForPartitionException scenario using an embedded Kafka cluster.
Thanks
Pradeep


Kafka streams state directory - help

2018-04-21 Thread pradeep s
Hi,
I am using a Kafka Streams app connecting to a Confluent Kafka cluster (broker 0.10.2.1).
The application reads messages from a topic, performs a transformation
and pushes to an output topic. There is no count or aggregation performed.
I have the following clarifications regarding the state directory.

*1)* Will there be any data written to the state directory?
When I verified the state directory, it was showing
0
0

*2)* The application is running in Kubernetes without any external volumes.
Will the state directory cause any processing issue during Kubernetes pod
restarts?

*3)* Will the app create a changelog topic, since there is no in-memory
store used in the app?


Code Snippet
===

Stream Config
=

properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, streamEnvironment.getKafkaBootstrapServers());
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUMBER_OF_STREAM_THREADS);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
properties.put(StreamsConfig.STATE_DIR_CONFIG, STATE_DIRECTORY);
properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
        ItemDeserializationExceptionHandler.class);



Stream builder
=

private void processStream(StreamsBuilder builder) {
    KStream<byte[], byte[]> input = builder.stream(inputTopic, Consumed.with(byteArraySerde, byteArraySerde))
                                           .peek((key, value) -> metricsClient.writeMetric(
                                                   CountMetric.generate(METRIC_OFFER_INPUT, 1)));

    KStream<byte[], DeserializationResultWrapper> deserializationResultStream =
            input.mapValues(this::deserializeOffer);

    quarantineNonDeSerializableOffers(deserializationResultStream);

    KStream<byte[], List> trans = transformOffers(deserializationResultStream);

    produceToQuarantineTopic(trans);

    produceToOutputTopic(trans);
}

private void produceToOutputTopic(KStream<byte[], List> trans) {
    trans.filter((key, value) -> value != null && !value.isEmpty())
         .peek((key, value) -> metricsClient.writeMetric(CountMetric.generate(METRIC_ITEMS_OUTPUT, 1)))
         .flatMapValues(transformerResults -> transformerResults.stream()
                                                                .map(TransformerResult::getItem)
                                                                .filter(Objects::nonNull)
                                                                .collect(Collectors.toCollection(ArrayList::new)))
         .to(outputTopic, Produced.with(byteArraySerde, itemEnvelopeSerde));
}

private void produceToQuarantineTopic(KStream<byte[], List> trans) {
    trans.filter((key, value) -> value == null || value.isEmpty()
                 || value.stream().anyMatch(TransformerResult::hasErrors))
         .mapValues(val -> toQuarantineEnvelope(val, INVALID_SKU))
         .to(quarantineTopic, Produced.with(byteArraySerde, quarantineItemEnvelopeSerde));
}

Thanks
Pradeep


Re: Kafka streams state directory - help

2018-04-21 Thread pradeep s
Thanks Matthias. Can you also please confirm the compatible versions of the
client dependencies? Our broker version is 0.10.2.1, and when I upgrade the
client library to 1.1.0 I get an issue in the tests while starting the
embedded cluster.
Test dependencies are (kafka-stream.version is 1.1.0):


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>${kafka-stream.version}</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka-stream.version}</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>${kafka-stream.version}</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>




Test error
===
java.lang.AbstractMethodError: kafka.zk.EmbeddedZookeeper.kafka$utils$Logging$_setter_$loggerName_$eq(Ljava/lang/String;)V

at kafka.utils.Logging$class.$init$(Logging.scala:23)
at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:37)
at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.start(EmbeddedKafkaCluster.java:87)
at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.before(EmbeddedKafkaCluster.java:153)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46)
at com.github.tomakehurst.wiremock.junit.WireMockClassRule$1.evaluate(WireMockClassRule.java:70)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

On Sat, Apr 21, 2018 at 3:06 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> You are hitting: https://issues.apache.org/jira/browse/KAFKA-6499
>
> Was fixed in 1.1 release.
>
> Thus, you can just ignore the checkpoint file. There should be no issue
> with running on Kubernetes.
>
> Also, if there is no store (independent of disk based or in-memory)
> there will be no changelog topic.
>
>
> -Matthias
>

Kafka commit interval

2018-10-16 Thread pradeep s
Hi,
I have a usecase to stream messages from Kafka and buffer it in memory till
a message count is reached and then write these to output file . I am using
manual commit . I have a question on whats the maximum time i can wait
after consuming the message and till we commit back to Kafka . Is there a
upper limit for this ? Is consumer rebalance the only issue while waiting
long periods, then can i write the contents using a rebalance listener and
commit at that point .
Thanks
Pradeep


Re: Kafka commit interval

2018-10-19 Thread pradeep s
Thanks Matthias

On Fri, Oct 19, 2018 at 11:25 AM Matthias J. Sax 
wrote:

> There is no upper limit.
>
> And yes, you are right about rebalancing. This would be an issue and
> yes, you can use the rebalance listener to address it (it's the purpose
> of the rebalance listener to be used for cases like this).
>
> -Matthias
>


Re: Consumer Pause & Scheduled Resume

2018-10-24 Thread pradeep s
Pause and resume are required since I am running a pod in Kubernetes and
am not shutting down the app.

On Tue, Oct 23, 2018 at 10:33 PM pradeep s 
wrote:

> Hi,
> I have a requirement to have kafka streaming start at scheduled time and
> then pause the stream when the consumer poll returns empty fetches for 3 or
> more polls.
>
> I am starting a consumer poll loop during application startup using a
> singled thread executor and then pausing the consumer when the poll is
> returning empty for 3 polls.
>
> When the schedule kicks in , i am calling *consumer.resume.*
>
> Is this approach correct ?
> Will it cause any issue If the  consumer calls poll on a paused consumer.
>
> Skeleton Code
> 
>
> public class *OfferItemImageConsumer* implements Runnable {
>
> @Override
> public void run() {
> try {
> do {
> ConsumerRecords records = 
> kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
> writeAndPauseEmptyFetch(records);
> processRecords(records);
> } while (!consumerLoopClosed.get());
> } catch (RuntimeException ex) {
> handleConsumerLoopException(ex);
> } finally {
> kafkaConsumer.close();
> }
> }
>
>
> private void writeAndPauseEmptyFetch(ConsumerRecords records) 
> {
> if (records.isEmpty()) {
> emptyFetchCount++;
> }
> if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
> writeImageData();
> emptyFetchCount = 0;
> kafkaConsumer.pause(kafkaConsumer.assignment());
> consumerPaused = true;
> }
> }
>
> }
>
> =
>
> public class *ItemImageStreamScheduler* {
> private static final int TERMINATION_TIMEOUT = 10;
>
>
> private ExecutorService executorService = 
> Executors.newSingleThreadExecutor();
>
> private final OfferItemImageConsumer offerItemImageConsumer;
> private final ItemImageStreamConfig itemImageStreamConfig;
> private final KafkaConsumer kafkaConsumer;
>
> @EventListener(ApplicationReadyEvent.class)
> void startStreaming() {
> executorService.submit(offerItemImageConsumer);
> }
> @Scheduled
> void resumeStreaming() {
> kafkaConsumer.resume(kafkaConsumer.assignment());
> }
>
>
> }
>
> Thanks
>
> Pradeep
>
>


Re: Consumer Pause & Scheduled Resume

2018-10-25 Thread pradeep s
Thanks Matthias. I am facing the issue when I try to call resume
from the scheduled method.
I was getting an exception that the KafkaConsumer is not safe for multi-threaded
access. I am trying to see how I can call pause and resume on the same
thread. There will be only one thread running for consumption.


On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax 
wrote:

> There is no issue if you call `poll()` when all partitions are paused. In
> fact, if you want to make sure that the consumer does not fall out of
> the consumer group, you must call `poll()` at a regular interval to not
> hit the `max.poll.interval.ms` timeout.
>
>
> -Matthias


Re: Consumer Pause & Scheduled Resume

2018-10-25 Thread pradeep s
Hi Manoj/Matthias,
My requirement is to run the consumer once daily, stream the messages,
and pause when I encounter a few empty fetches.
I am planning to run two consumers and pause the consumption based on
the empty fetches, for a topic with 4 partitions.
To avoid the consumer multi-threaded access issue, I run the consumer,
exit the poll loop, and call pause on the same thread. In this case I
will not be continuously polling.
When the next schedule kicks in, I will resume the polling.
Will the consumer resume call cause issues, since the scheduled loop is
triggered a long time after the polling stopped? (Or is the old approach of
continuous polling the correct one?)
Also, Manoj, can you please explain the rebalance scenario: if the
consumer is paused for two partitions and gets an assignment for another
two partitions (because of a pod termination), how can I pause the
consumption if it is not the scheduled time to process the records?
Thanks
Pradeep

On Thu, Oct 25, 2018 at 5:48 PM Manoj Khangaonkar 
wrote:

> One item to be aware of with pause and resume is that they apply to the
> partitions currently assigned to the consumer.
>
> But partitions can get revoked, or additional partitions can get assigned to
> the consumer.
>
> After a reassignment, you might be expecting the consumer to be paused but
> suddenly start getting messages because a new partition got assigned.
>
> Use the RebalanceListener to pause or resume any new partitions.
>
> regards
>
> On Thu, Oct 25, 2018 at 3:15 PM Matthias J. Sax 
> wrote:
>
> > That is correct: clients are not thread safe.
> >
> > You can use an `AtomicBoolean needToResume` that you share over both
> > threads and that is initially false.
> >
> > In your scheduled method, you set the variable to true.
> >
> > In your main consumer, each time before you call poll(), you check if
> > the variable is set to true. If yes, you resume() and reset the variable
> > to false.
> >
> > Hope this helps.
> >
> > -Matthias
> >
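A rough sketch of the AtomicBoolean hand-off described above: the scheduled thread only
flips a flag, and the single consumer thread itself calls resume() before its next poll().
Class and method names are illustrative, not the actual application code.

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PausableConsumerLoop<K, V> {
    private final KafkaConsumer<K, V> consumer;
    private final AtomicBoolean needToResume = new AtomicBoolean(false);

    public PausableConsumerLoop(KafkaConsumer<K, V> consumer) {
        this.consumer = consumer;
    }

    // Called from the scheduler thread; it never touches the consumer directly.
    public void requestResume() {
        needToResume.set(true);
    }

    // Runs on the single consumer thread.
    public void pollLoop(long pollTimeoutMs) {
        while (true) {
            if (needToResume.compareAndSet(true, false)) {
                consumer.resume(consumer.paused());   // safe: same thread that calls poll()
            }
            ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
            // ...process records, call consumer.pause(consumer.assignment()) again on empty fetches, etc.
        }
    }
}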

Re: Consumer Pause & Scheduled Resume

2018-10-25 Thread pradeep s
Code Snippet Without continuous polling
==
public class OfferItemImageScheduler {

    @Scheduled(cron = "0 0/2 * * * ?")
    void startStreaming() {
        kafkaConsumer.resume(kafkaConsumer.assignment());
        offerItemImageConsumer.streamMessages(kafkaConsumer);
        kafkaConsumer.pause(kafkaConsumer.assignment());
    }
}

=

public class OfferItemImageConsumer {

    public boolean streamMessages(KafkaConsumer kafkaConsumer) {
        try {
            do {
                ConsumerRecords records = kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
                log.info("Kafka poll returned {} records", records.count());
                checkEmptyFetch(records);
                processRecords(records);
            } while (!consumerPaused.get() && !consumerLoopClosed.get());
        } catch (WakeupException wakeupException) {
            // do nothing if the WakeupException came from the shutdown hook
            if (!consumerLoopClosed.get()) {
                handleConsumerLoopException(wakeupException);
            }
        } catch (RuntimeException ex) {
            handleConsumerLoopException(ex);
        } finally {
            resetConsumerStatus();
        }
        return true;
    }
}



Consumer Pause & Scheduled Resume

2018-10-23 Thread pradeep s
Hi,
I have a requirement to have Kafka streaming start at a scheduled time and
then pause the stream when the consumer poll returns empty fetches for 3 or
more polls.

I am starting a consumer poll loop during application startup using a
single-threaded executor and then pausing the consumer when the poll is
returning empty for 3 polls.

When the schedule kicks in, I am calling *consumer.resume*.

Is this approach correct?
Will it cause any issue if the consumer calls poll on a paused consumer?

Skeleton Code


public class *OfferItemImageConsumer* implements Runnable {

    @Override
    public void run() {
        try {
            do {
                ConsumerRecords records = kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
                writeAndPauseEmptyFetch(records);
                processRecords(records);
            } while (!consumerLoopClosed.get());
        } catch (RuntimeException ex) {
            handleConsumerLoopException(ex);
        } finally {
            kafkaConsumer.close();
        }
    }

    private void writeAndPauseEmptyFetch(ConsumerRecords records) {
        if (records.isEmpty()) {
            emptyFetchCount++;
        }
        if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
            writeImageData();
            emptyFetchCount = 0;
            kafkaConsumer.pause(kafkaConsumer.assignment());
            consumerPaused = true;
        }
    }
}

=

public class *ItemImageStreamScheduler* {
    private static final int TERMINATION_TIMEOUT = 10;

    private ExecutorService executorService = Executors.newSingleThreadExecutor();

    private final OfferItemImageConsumer offerItemImageConsumer;
    private final ItemImageStreamConfig itemImageStreamConfig;
    private final KafkaConsumer kafkaConsumer;

    @EventListener(ApplicationReadyEvent.class)
    void startStreaming() {
        executorService.submit(offerItemImageConsumer);
    }

    @Scheduled
    void resumeStreaming() {
        kafkaConsumer.resume(kafkaConsumer.assignment());
    }
}

Thanks

Pradeep


Kafka streams usecase

2022-01-11 Thread pradeep s
Hi,
I have a requirement to stream item details to specific destinations.
There are three different Kafka streams: one for item info, a second for
item price and promotions, and a third for item availability.
I want to join all this info and produce a single message containing
item, price and availability.
Can Kafka Streams be leveraged for this, if all the messages across the three
topics can be joined using a common item identifier? Also, it is not
necessary that all three topics have data.
For example, if item setup is done and no inventory is allocated, only the item
and price topics will have data. Also, are there any good pointers to a sample
app for joining multiple streams and producing a single message to a new
topic?
Thanks
Pradeep


Re: Kafka streams usecase

2022-02-16 Thread pradeep s
Thanks Chad! If we want to consume from multiple topics and persist to a
database, can I go with a plain consumer that looks up the record and updates
it? The requirement is to consume from the item topic and the price topic and
create a record in Postgres. Both topics have the item id in the message, which
is the key in the Postgres database. Can this be done with a simple consumer?

On Thu, Jan 13, 2022 at 11:11 AM Chad Preisler 
wrote:

> Yes Kafka streams can be used to do this. There are probably several ways
> to implement this. We did something like this in Java using a groupByKey()
> and reduce() functions. The three topics we wanted to combine into one
> topic had different schemas and different java class types. So to combine
> them together into one aggregated object we did the following.
>
> - Create a class with data members of the three objects we wanted to
> combine. Let's call it AggregateObj.
> - Create a KStream for each topic we wanted to combine.
> - For each KStream use a map function that creates and outputs an
> AggregateObj setting the input stream object to the correct data member on
> the AggregateObj.
> - Create an intermediate topic to write individual AggregateObj from each
> KStream.
> - Create a stream to read the intermediate topic and use the groupByKey()
> and reduce() function to create one AggregateObj that has all the parts.
> Output that result to the final combined output stream using
> toStream().to().
>
> We did all of this in one application. You may be able to accomplish the
> same thing using aggregate a different way or you may be able to use left
> join methods to accomplish the same thing. I can't share the code. Sorry.
>
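A compact sketch of the groupByKey()/reduce() approach described above. Topic names are
placeholders, and the values are kept as plain strings with a string-concatenation "merge"
purely to keep the sketch self-contained; a real implementation would merge the fields of a
proper aggregate object with its own serde.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class ItemAggregationTopologySketch {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Each source stream is keyed by item id. Wrap each record so it only fills in
        // its own "slot" of the combined value, and funnel everything through one
        // intermediate topic.
        builder.stream("item-info", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(v -> "item=" + v)
               .to("item-aggregate", Produced.with(Serdes.String(), Serdes.String()));

        builder.stream("item-price", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(v -> "price=" + v)
               .to("item-aggregate", Produced.with(Serdes.String(), Serdes.String()));

        builder.stream("item-availability", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(v -> "availability=" + v)
               .to("item-aggregate", Produced.with(Serdes.String(), Serdes.String()));

        // Merge the partial records per item id; the reducer just concatenates the parts
        // here, where a real implementation would merge the fields of an aggregate object.
        builder.stream("item-aggregate", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .reduce((merged, update) -> merged + ";" + update,
                       Materialized.with(Serdes.String(), Serdes.String()))
               .toStream()
               .to("item-combined", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "item-aggregator");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        new KafkaStreams(builder.build(), props).start();
    }
}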