Hi Pradeep,
The poll, pause, and resume calls need to happen in the same thread, in the
same while loop.
If a scheduler is the trigger for pause or resume, do not call
pause/resume from the scheduler thread. Instead, set a
variable in the class that has the poll loop. The poll loop can check the
Code Snippet Without continuous polling
==
public class OfferItemImageScheduler {
    @Scheduled(cron = "0 0/2 * * * ?")
    void startStreaming() {
        kafkaConsumer.resume(kafkaConsumer.assignment());
        offerItemImageConsumer.streamMessages(kafkaConsumer);
Hi Manoj/Matthias,
My requirement is to run the consumer once a day, stream the messages,
and pause when I encounter a few empty fetches.
I am planning to run two consumers and pause consumption based on
the empty fetches for a topic with 4 partitions.
To avoid the consumer
One thing to be aware of with pause and resume is that they apply only to
the partitions currently assigned to the consumer.
But partitions can get revoked, or additional partitions can get assigned to
the consumer.
After a reassignment, you might be expecting the consumer to be paused but
suddenly start getting
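
One way to guard against this is to remember whether the consumer is meant to be paused and to re-apply that when partitions change. Below is a minimal sketch of the bookkeeping only. It does not use the Kafka client: `PauseTracker` is a hypothetical class and partitions are modeled as plain strings. With the real client, the two callbacks would correspond to `ConsumerRebalanceListener.onPartitionsRevoked` and `onPartitionsAssigned`, and the pause itself would be a `consumer.pause(...)` call made from the assignment callback.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: tracks which partitions should be paused across
// rebalances. Partitions are plain strings here to keep the sketch
// free of the kafka-clients dependency.
class PauseTracker {
    private final Set<String> assigned = new HashSet<>();
    private final Set<String> paused = new HashSet<>();
    private boolean wantPaused = false;

    // Pause everything currently assigned and remember the intent.
    void pauseAll() {
        wantPaused = true;
        paused.addAll(assigned);
    }

    // Resume everything and forget the intent.
    void resumeAll() {
        wantPaused = false;
        paused.clear();
    }

    // Mirrors onPartitionsRevoked: revoked partitions are no longer ours.
    void onPartitionsRevoked(Collection<String> parts) {
        assigned.removeAll(parts);
        paused.removeAll(parts);
    }

    // Mirrors onPartitionsAssigned: newly assigned partitions inherit
    // the paused state if the consumer is meant to be paused.
    void onPartitionsAssigned(Collection<String> parts) {
        assigned.addAll(parts);
        if (wantPaused) {
            paused.addAll(parts);
        }
    }

    // Defensive copy of the partitions that should currently be paused.
    Set<String> pausedPartitions() {
        return new HashSet<>(paused);
    }
}
```

The point of keeping `wantPaused` separate from the partition set is that the intent survives a rebalance even when every partition changes hands.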
That is correct: clients are not thread safe.
You can use an `AtomicBoolean needToResume` that you share between both
threads and that is initially false.
In your scheduled method, you set the variable to true.
In your main consumer, each time before you call poll(), you check if
the variable is
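
A minimal sketch of the flag hand-off described above, under some assumptions: `PausableConsumer`, `CountingConsumer`, and `FlagDrivenPoller` are hypothetical names, and the interface stands in for the real `KafkaConsumer` so the example compiles without the Kafka dependency. With the real client, `resumeAll()` would be something like `consumer.resume(consumer.paused())`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the small part of KafkaConsumer used here.
interface PausableConsumer {
    void resumeAll();   // real client: consumer.resume(consumer.paused())
    int poll();         // returns the number of records fetched
}

// Hypothetical test double that only counts calls.
class CountingConsumer implements PausableConsumer {
    int resumeCalls = 0;
    int pollCalls = 0;
    public void resumeAll() { resumeCalls++; }
    public int poll() { pollCalls++; return 0; }
}

class FlagDrivenPoller {
    // Shared between the scheduler thread and the poll thread.
    private final AtomicBoolean needToResume = new AtomicBoolean(false);
    private final PausableConsumer consumer;

    FlagDrivenPoller(PausableConsumer consumer) {
        this.consumer = consumer;
    }

    // Called from the scheduled method. It only flips the flag and
    // never touches the consumer, so no cross-thread access occurs.
    void requestResume() {
        needToResume.set(true);
    }

    // Called from the poll thread, once per loop iteration.
    int pollOnce() {
        // Atomically read and clear the flag so resume runs once per request.
        if (needToResume.compareAndSet(true, false)) {
            consumer.resumeAll();
        }
        return consumer.poll();
    }
}
```

The scheduled method would call `requestResume()`, and the consumer thread would call `pollOnce()` inside its while loop, so every consumer call stays on one thread.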
Thanks Matthias. I am facing the issue when I try to call resume
from the scheduled method.
I was getting an exception saying that KafkaConsumer is not safe for
multi-threaded access. I am trying to see how I can call pause and resume on
the same thread. There will be only one thread running for
There is no issue if you call `poll()` when all partitions are paused. In
fact, if you want to make sure that the consumer does not fall out of
the consumer group, you must call `poll()` at regular intervals to avoid
hitting the `max.poll.interval.ms` timeout.
-Matthias
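
To illustrate, here is a rough sketch of the decision logic for "pause after N consecutive empty fetches, but keep polling". `EmptyFetchPauser` is a hypothetical helper, not part of the Kafka client; the caller is assumed to invoke `consumer.pause(...)` when `onPollResult` returns true and to keep calling `poll()` afterwards.

```java
// Hypothetical helper: decides when the poll loop should pause,
// based on how many consecutive polls came back empty.
class EmptyFetchPauser {
    private final int maxEmptyPolls;
    private int consecutiveEmpty = 0;
    private boolean paused = false;

    EmptyFetchPauser(int maxEmptyPolls) {
        this.maxEmptyPolls = maxEmptyPolls;
    }

    // Call after every poll with the number of records returned.
    // Returns true exactly when the caller should pause the consumer.
    boolean onPollResult(int recordCount) {
        if (paused) {
            // Polls made while paused return nothing; do not count them.
            return false;
        }
        if (recordCount > 0) {
            consecutiveEmpty = 0;
            return false;
        }
        consecutiveEmpty++;
        if (consecutiveEmpty >= maxEmptyPolls) {
            paused = true;
            return true;
        }
        return false;
    }

    // Call after the consumer has been resumed on the poll thread.
    void onResumed() {
        paused = false;
        consecutiveEmpty = 0;
    }

    boolean isPaused() {
        return paused;
    }
}
```

In the loop this would look roughly like: call `poll()`, pass the record count to `onPollResult`, pause if it returns true, and continue looping either way so the consumer stays in the group.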
On 10/24/18 10:25 AM, pradeep s
Pause and resume are required since I am running a pod in Kubernetes and I
am not shutting down the app
On Tue, Oct 23, 2018 at 10:33 PM pradeep s
wrote:
> Hi,
> I have a requirement to have Kafka streaming start at a scheduled time and
> then pause the stream when the consumer poll returns empty
Hi,
I have a requirement to have Kafka streaming start at a scheduled time and
then pause the stream when the consumer poll returns empty fetches for 3 or
more polls.
I am starting a consumer poll loop during application startup using a
single-thread executor and then pausing the consumer when the