Re: How does Kafka Consumer send JoinRequest?

2023-11-26 Thread Haruki Okada
Hi.

JoinGroup request is sent from the polling/user thread.
In your example, the consumer instance will be removed from the group
because it didn't join the group within the timeout.
So the partition will be assigned to another consumer and be processed.
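
For reference, a minimal sketch of a poll loop with the Java client (topic, group id and
bootstrap address are placeholders) that keeps per-batch processing within
max.poll.interval.ms so the member is not evicted and rebalanced away:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Processing of each returned batch must finish before this interval elapses,
        // otherwise the member is removed from the group and a rebalance is triggered.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic"));
            while (true) {
                // JoinGroup/SyncGroup and assignment callbacks all happen inside poll(),
                // i.e. on this user thread.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // application-specific work
    }
}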

2023年11月26日(日) 18:09 Debraj Manna :

> Can someone let me know if the JoinRequest is sent by the consumer from the
> polling/user thread or from the background heart-beat thread?
>
> If JoinRequest is being sent from the polling/user thread then in this case
> if the poll/user thread takes more than max.poll.interval.ms then the
> consumer will remain disconnected from the broker for that long. For
> example, if max.poll.interval.ms is 300 sec and if processing in the poll
> thread takes 15 mins then for 15 mins the partition from which this
> consumer was polling will remain idle and no message will be consumed from
> that partition. Is my understanding correct?
>
> I am using Kafka client 3.5.1 with Apache Kafka broker 2.8.1 with all
> default settings on the consumer configs.
>


-- 

Okada Haruki
ocadar...@gmail.com



How does Kafka Consumer send JoinRequest?

2023-11-26 Thread Debraj Manna
Can someone let me know if the JoinRequest is sent by the consumer from the
polling/user thread or from the background heart-beat thread?

If JoinRequest is being sent from the polling/user thread then in this case
if the poll/user thread takes more than max.poll.interval.ms then the
consumer will remain disconnected from the broker for that long. For
example, if max.poll.interval.ms is 300 sec and if processing in the poll
thread takes 15 mins then for 15 mins the partition from which this
consumer was polling will remain idle and no message will be consumed from
that partition. Is my understanding correct?

I am using Kafka client 3.5.1 with Apache Kafka broker 2.8.1 with all
default settings on the consumer configs.


Re: How does kafka consumer behave when consumer poll timeout has expired?

2023-11-03 Thread Debraj Manna
Right now I sometimes observe that after the above log is printed on
both consumer instances, the machine on which the consumer
instances are running stops consuming any new messages. My understanding
was that after the above log is printed, the consumer instances would be
removed from the group and new consumers would be started via
rebalancing. Is my understanding not correct? If it is, then what is
preventing this from happening? session.timeout.ms is the default.

I am using Kafka Client 3.5.1 with Kafka Broker 2.8.1.


Anyone have any suggestions here?

On Thu, 2 Nov, 2023, 19:16 Debraj Manna,  wrote:

> Hi
>
> Can someone let me know how a consumer is expected to behave after the
> below log? Will the consumer be considered dead and a new instance will be
> spawned due to consumer group rebalancing? How is this behaviour with
> RangeAssignor and CooperativeStickyAssignor?
>
> consumer poll timeout has expired. This means the time between subsequent
> calls to poll() was longer than the configured max.poll.interval.ms,
> which typically implies that the poll loop is spending too much time
> processing messages. You can address this either by increasing
> max.poll.interval.ms or by reducing the maximum size of batches returned
> in poll() with max.poll.records.
>
> For example, let's say I have two instances of a consumer running on two
> different machines. Both instances of the consumer belong to the same
> consumer-group and consume from the same topic with 10 partitions. In this
> case, what is expected when I see the above logs in both the consumers for
> RangeAssignor and CooperativeStickyAssignor?
>
> I know what the above log means but want to understand how the consumer
> behaves after this log.
>
> Thanks
>
>


How does kafka consumer behave when consumer poll timeout has expired?

2023-11-02 Thread Debraj Manna
Hi

Can someone let me know how a consumer is expected to behave after the
below log? Will the consumer be considered dead and a new instance will be
spawned due to consumer group rebalancing? How is this behaviour with
RangeAssignor and CooperativeStickyAssignor?

consumer poll timeout has expired. This means the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time processing
messages. You can address this either by increasing max.poll.interval.ms or
by reducing the maximum size of batches returned in poll() with
max.poll.records.

For example, let's say I have two instances of a consumer running on two
different machines. Both instances of the consumer belong to the same
consumer-group and consume from the same topic with 10 partitions. In this
case, what is expected when I see the above logs in both the consumers for
RangeAssignor and CooperativeStickyAssignor?

I know what the above log means but want to understand how the consumer
behaves after this log.
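
For reference, the two knobs that log message points at, expressed as consumer
properties with the Java client (the values below are only examples, not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class PollTimeoutTuning {
    static Properties tuned() {
        Properties props = new Properties();
        // Allow more time between poll() calls (default 300000 ms = 5 minutes)...
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
        // ...and/or return fewer records per poll() so each batch finishes faster (default 500).
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        return props;
    }
}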

Thanks


Re: Apache Kafka consumer consumes messages with "partition" option, but not with "group" option

2023-06-13 Thread Neeraj Vaidya
 Hi,
If you have a large number of partitions in your topic, it can take a really 
long while before you start seeing messages on the console.

So, using the partition id is the right approach, but you just need to be patient 
at the command line. Out of interest, how long did you wait for the output from 
console consumer ?

If you need to know the partition id, you will need to use a custom program to 
compute it based on the key. (You could have a look at the murmur2 source code 
on the Kafka github repository and try to create a simple command line tool to 
compute the partition id using the key).
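
For illustration, a small sketch of such a tool, assuming the default partitioner's
behaviour for keyed records (murmur2 of the serialized key, masked to positive, modulo
the partition count) and the Utils class shipped with the Java client:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class PartitionForKey {
    public static void main(String[] args) {
        String key = args[0];                             // the record key (String-serialized here)
        int numPartitions = Integer.parseInt(args[1]);    // partition count of the topic
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Same formula the default partitioner uses for records that have a key.
        int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        System.out.println("key '" + key + "' -> partition " + partition);
    }
}

This only matches what the producer did if the producer used the default partitioner
and the same key serialization.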

However, using the --group option will only set the consumer group id of your 
instance of kafka-console-consumer.sh.

Regards,
Neeraj
 On Tuesday, 13 June, 2023 at 05:26:24 pm GMT+10, Geithner, Wolfgang Dr. 
 wrote:  
 
 This is a copy of a topic I posted in stackoverflow 
(https://stackoverflow.com/questions/76458064/apache-kafka-consumer-consumes-messages-with-partition-option-but-not-with-g),
 where I didn't get any answer yet. Searching the web did not yield any helpful 
results either. Hence, I am addressing this mailing list:

I am running a plain Apache Kafka server (version 3.4.1) which I would like to 
connect to a Telegraf consumer. The Telegraf ```[[inputs.kafka_consumer]]``` 
plugin has the option to consume by Kafka "group". When starting Telegraf, I get 
an error message

    [inputs.kafka_consumer] Error in plugin: consume: kafka server: Request was 
for a consumer group that is not coordinated by this broker

Hence, I started to investigate my setup by using the Kafka console tools and 
found that when executing

    ./kafka-console-consumer.sh --bootstrap-server myserver:9092 --topic test 
--partition 0

and sending messages via ```kafka-console-producer.sh```, these messages pop up 
in the console "consumer" window as expected.

In contrast to this, when I run

    ./kafka-console-consumer.sh --bootstrap-server myserver:9092 --topic test 
--group my-group

nothing happens in the "consumer" window. Furthermore, the command

    ./kafka-consumer-groups.sh --bootstrap-server myserver:9092 --list

yields nothing.

What do I have to do to cause the consumer with the "group" option to "see" the 
messages produced to the topic "test"? Ultimately, how can I solve the Telegraf 
error?
  

Apache Kafka consumer consumes messages with "partition" option, but not with "group" option

2023-06-13 Thread Geithner, Wolfgang Dr.
This is a copy of a topic I posted in stackoverflow 
(https://stackoverflow.com/questions/76458064/apache-kafka-consumer-consumes-messages-with-partition-option-but-not-with-g),
 where I didn't get any answer yet. Searching the web did not yield any helpful 
results either. Hence, I am addressing this mailing list:

I am running a plain Apache Kafka server (version 3.4.1) which I would like to 
connect to a Telegraf consumer. The Telegraf ```[[inputs.kafka_consumer]]``` 
plugin has the option to consume by Kafka "group". When starting Telegraf, I get 
an error message

[inputs.kafka_consumer] Error in plugin: consume: kafka server: Request was 
for a consumer group that is not coordinated by this broker

Hence, I started to investigate my setup by using the Kafka console tools and 
found that when executing

 ./kafka-console-consumer.sh --bootstrap-server myserver:9092 --topic test 
--partition 0

and sending messages via ```kafka-console-producer.sh```, these messages pop up 
in the console "consumer" window as expected.

In contrast to this, when I run

 ./kafka-console-consumer.sh --bootstrap-server myserver:9092 --topic test 
--group my-group

nothing happens in the "consumer" window. Furthermore, the command

./kafka-consumer-groups.sh --bootstrap-server myserver:9092 --list

yields nothing.

What do I have to do to cause the consumer with the "group" option to "see" the 
messages produced to the topic "test"? Ultimately, how can I solve the Telegraf 
error?


Managing Kafka consumer buffer memory/size

2023-05-31 Thread Divya Pillai
Hi Team,

We have a scenario where our application is processing messages at a slower
rate. We want the consumer to stop fetching messages from the broker and
re-fetch when the application is ready to process again.

We have fetch.max.bytes but that doesn't manage the buffer memory. If my
understanding is correct then there can be multiple fetch requests and all
the messages fetched must be kept in buffer memory and poll() in consumer
fetches from this buffer memory.

Can we have a way to control this buffer memory? For example, If we keep
this memory at 10MB then it can fetch messages and keep them in the buffer
but once the buffer is full then it should not fetch more and keep more in
memory. Once the poll happens and messages are decreased in the buffer then
it can buffer more by fetching again.
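
One way to get this kind of application-level backpressure without a buffer-size setting,
sketched below with the Java client, is to pause() the assigned partitions while the
application is busy and resume() them later; paused partitions are not fetched by
subsequent polls (applicationBusy() and handOff() are hypothetical application hooks):

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BackpressureLoop {
    static void run(KafkaConsumer<String, String> consumer) {
        while (true) {
            if (applicationBusy()) {
                // Stop fetching, but keep calling poll() so the consumer stays in the group.
                consumer.pause(consumer.assignment());
            } else {
                consumer.resume(consumer.assignment());
            }
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                handOff(record);  // hand the record to the slow processor
            }
        }
    }

    static boolean applicationBusy() { return false; }              // placeholder
    static void handOff(ConsumerRecord<String, String> record) { }  // placeholder
}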

Thanks,
Divya


Re: Kafka Consumer Lag Monitoring

2023-05-10 Thread Raphael Mazelier

I'm not sure I follow you:

either mine

- https://github.com/ut0mt8/yakle or
- https://github.com/danielqsj/kafka_exporter or
- https://github.com/redpanda-data/kminion

export consumer-group lag metrics. They all work.

best,

--
Raphael Mazelier


On 10/05/2023 22:47, Akshay Kumar wrote:

Hello team,

The Kafka lag Prometheus exporter is also not providing lag-related 
metrics.

Akshay Kumar
Member of Technical Staff - 2 | Exotel







Re: Kafka Consumer Lag Monitoring

2023-05-10 Thread Akshay Kumar
Hello team,

The Kafka lag Prometheus exporter is also not providing lag-related metrics.
Akshay Kumar
Member of Technical Staff - 2 | Exotel


On Wed, May 10, 2023 at 12:26 AM raph  wrote:

> There is a lot of kakfa lag prometheus exporter. You could give a try at
> mine (named yakle working fine in production since years)But others does
> the job as well.Sent from my Galaxy
>  Original message From: Akshay Kumar
>  Date: 5/9/23  20:14  (GMT+01:00) To:
> users@kafka.apache.org Subject: Kafka Consumer Lag Monitoring Hello
> team,I am using Zookeeper less Kafka (Kafka Kraft - version 3.3.1). I
> wanted to monitor consumer lag, so I was using Burrow for that, but I am
> unable to use Burrow without Zookeeper.Does Burror work without
> Zookeeper?Or what is the better or best way to monitor consumer lag and lag
> history for multiple topics, consumers, and consumer groups?All the topics
> are consumed by different-different consumers(applications). Akshay
> KumarMember of Technical Staff - 2 |  ExotelPh:
> +918556063696Latest from the Exotel Blog
>
>




Re: Kafka Consumer Lag Monitoring

2023-05-09 Thread Akshay Kumar
Hello team,

JMX on the broker might not help, as the broker's JMX doesn't expose the kafka.consumer
MBeans. These MBeans are provided by the consumer application instead. So I am trying to
fetch JMX data from the consumer application with the help of Zabbix, but
I'm unable to fetch proper data.

JMX keys tried -

   - jmx.discovery[beans,"kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-cg-campaign-customer-campaign-customer-consumer-group*"]
   - jmx.get[beans,"kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-cg-campaign-customer-campaign-customer-consumer-group*"]


Can anyone provide any other tool recommendations (tool should work without
zookeeper) or commands which can provide lag for any historical instance?


Akshay Kumar
Member of Technical Staff - 2 | Exotel


On Wed, May 10, 2023 at 12:28 AM Santhosh Kumar 
wrote:

> Hi Akshay
>
> Burrow is a popular open-source tool for monitoring the consumer lag in
> Apache Kafka. However, Burrow depends on ZooKeeper to access the Kafka
> metadata and track the consumer lag. Therefore, it is not possible to use
> Burrow without ZooKeeper.
>
> Since you are using Kafka version 3.3.1, you can leverage Kafka's built-in
> consumer lag monitoring functionality without the need for any additional
> tools or dependencies. Kafka exposes consumer lag metrics through JMX, and
> you can use any monitoring system that supports JMX to collect and
> visualize these metrics.
>
> Here are the steps to monitor consumer lag in Kafka:
>
> 1. Enable JMX on Kafka brokers and consumers by setting the following JVM
>    system properties:
>
>    -Dcom.sun.management.jmxremote=true
>    -Dcom.sun.management.jmxremote.authenticate=false
>    -Dcom.sun.management.jmxremote.ssl=false
>    -Dcom.sun.management.jmxremote.local.only=false
>    -Djava.rmi.server.hostname=<hostname>
>
>    Replace <hostname> with the hostname or IP address of the Kafka broker
>    or consumer machine.
>
> 2. Start a monitoring system that supports JMX, such as Prometheus,
>    Grafana, or Datadog.
>
> 3. Configure the monitoring system to collect Kafka consumer lag metrics
>    using JMX.
>
>    For example, if you are using Prometheus, you can use the jmx_exporter
>    tool to expose the Kafka JMX metrics as Prometheus metrics.
>
> 4. Visualize the Kafka consumer lag metrics in the monitoring system
>    dashboard.
>
>    You can create custom dashboards that show the consumer lag for
>    different topics, consumers, and consumer groups. You can also set up
>    alerts to notify you when the consumer lag exceeds a certain threshold.
>
> By using Kafka's built-in JMX metrics and a monitoring system that supports
> JMX, you can monitor consumer lag without the need for additional tools or
> dependencies. This approach also provides a more lightweight and scalable
> solution compared to using external tools like Burrow.
> Thanks
> Santhosh Gopal
> Advisor - Architecture,Performance Engineering
>
> On Tue, May 9, 2023 at 2:14 PM Akshay Kumar  >
> wrote:
>
> > Hello team,
> >
> > I am using Zookeeper less Kafka (Kafka Kraft - version 3.3.1). I wanted
> to
> > monitor consumer lag, so I was using Burrow for that, but I am unable to
> > use Burrow without Zookeeper.
> >
> > Does Burrow work without Zookeeper?
> >
> > Or what is the better or best way to monitor consumer lag and lag history
> > for multiple topics, consumers, and consumer groups?
> >
> > All the topics are consumed by different
> consumers (applications).
> >
> > Akshay Kumar
> > Member of Technical Staff - 2 | Exotel

Re: Kafka Consumer Lag Monitoring

2023-05-09 Thread Santhosh Kumar
Hi Akshay

Burrow is a popular open-source tool for monitoring the consumer lag in
Apache Kafka. However, Burrow depends on ZooKeeper to access the Kafka
metadata and track the consumer lag. Therefore, it is not possible to use
Burrow without ZooKeeper.

Since you are using Kafka version 3.3.1, you can leverage Kafka's built-in
consumer lag monitoring functionality without the need for any additional
tools or dependencies. Kafka exposes consumer lag metrics through JMX, and
you can use any monitoring system that supports JMX to collect and
visualize these metrics.

Here are the steps to monitor consumer lag in Kafka:

   1. Enable JMX on Kafka brokers and consumers by setting the following JVM
      system properties:

      -Dcom.sun.management.jmxremote=true
      -Dcom.sun.management.jmxremote.authenticate=false
      -Dcom.sun.management.jmxremote.ssl=false
      -Dcom.sun.management.jmxremote.local.only=false
      -Djava.rmi.server.hostname=<hostname>

      Replace <hostname> with the hostname or IP address of the Kafka broker
      or consumer machine.

   2. Start a monitoring system that supports JMX, such as Prometheus,
      Grafana, or Datadog.

   3. Configure the monitoring system to collect Kafka consumer lag metrics
      using JMX.

      For example, if you are using Prometheus, you can use the jmx_exporter
      tool to expose the Kafka JMX metrics as Prometheus metrics.

   4. Visualize the Kafka consumer lag metrics in the monitoring system
      dashboard.

      You can create custom dashboards that show the consumer lag for
      different topics, consumers, and consumer groups. You can also set up
      alerts to notify you when the consumer lag exceeds a certain threshold.

By using Kafka's built-in JMX metrics and a monitoring system that supports
JMX, you can monitor consumer lag without the need for additional tools or
dependencies. This approach also provides a more lightweight and scalable
solution compared to using external tools like Burrow.
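
If JMX is not convenient, a rough alternative sketch using the Java AdminClient (so it
also works without ZooKeeper) is to compute lag per partition as end offset minus the
group's committed offset; the bootstrap address and group id below are placeholders:

import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class GroupLag {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        String groupId = "my-consumer-group";

        try (Admin admin = Admin.create(props)) {
            // Offsets the group has committed, per partition.
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();

            // Latest (end) offsets for the same partitions.
            Map<TopicPartition, OffsetSpec> request = committed.keySet().stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResultInfo> ends = admin.listOffsets(request).all().get();

            committed.forEach((tp, offset) ->
                    System.out.println(tp + " lag=" + (ends.get(tp).offset() - offset.offset())));
        }
    }
}

Polling this periodically and exporting the numbers would give a lag history, though a
dedicated exporter is less work to operate.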
Thanks
Santhosh Gopal
Advisor - Architecture,Performance Engineering

On Tue, May 9, 2023 at 2:14 PM Akshay Kumar 
wrote:

> Hello team,
>
> I am using Zookeeper less Kafka (Kafka Kraft - version 3.3.1). I wanted to
> monitor consumer lag, so I was using Burrow for that, but I am unable to
> use Burrow without Zookeeper.
>
> Does Burrow work without Zookeeper?
>
> Or what is the better or best way to monitor consumer lag and lag history
> for multiple topics, consumers, and consumer groups?
>
> All the topics are consumed by different consumers (applications).
>
> Akshay Kumar
> Member of Technical Staff - 2 | Exotel
>
>


RE: Kafka Consumer Lag Monitoring

2023-05-09 Thread raph
There are a lot of Kafka lag Prometheus exporters. You could give a try to mine 
(named yakle, working fine in production for years), but others do the job as 
well. Sent from my Galaxy
 Original message From: Akshay Kumar 
 Date: 5/9/23  20:14  (GMT+01:00) To: 
users@kafka.apache.org Subject: Kafka Consumer Lag Monitoring Hello team,I am 
using Zookeeper less Kafka (Kafka Kraft - version 3.3.1). I wanted to monitor 
consumer lag, so I was using Burrow for that, but I am unable to use Burrow 
without Zookeeper. Does Burrow work without Zookeeper? Or what is the better or 
best way to monitor consumer lag and lag history for multiple topics, 
consumers, and consumer groups?All the topics are consumed by 
different-different consumers(applications). Akshay KumarMember of Technical 
Staff - 2 |  ExotelPh: +918556063696Latest from the Exotel Blog



Kafka Consumer Lag Monitoring

2023-05-09 Thread Akshay Kumar
Hello team,

I am using Zookeeper less Kafka (Kafka Kraft - version 3.3.1). I wanted to
monitor consumer lag, so I was using Burrow for that, but I am unable to
use Burrow without Zookeeper.

Does Burrow work without Zookeeper?

Or what is the better or best way to monitor consumer lag and lag history
for multiple topics, consumers, and consumer groups?

All the topics are consumed by different consumers (applications).

Akshay Kumar
Member of Technical Staff - 2 | Exotel




Kafka Consumer Health Check

2023-03-07 Thread Upesh Desai
Hi all,

Is there a standardized way to implement a health check for a Kafka Consumer, e.g. for 
an application that runs in Kubernetes with liveness probes? There does not seem to be 
an exposed API method for the Consumer's current state or anything similar.

The example issue we ran into was with one of our three brokers being moved when an AWS 
node failed, and that broker was the coordinator for our Consumer. The consumer 
eventually reached max.poll.interval.ms and left the group, but no exceptions were 
thrown and no errors, so our application's error handling didn't flag the consumer as 
dead.
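
One common workaround (a sketch, not an official consumer API) is to track the time of
the last successful poll() in the application and let the liveness probe fail when it
grows past max.poll.interval.ms; wiring isHealthy() into an HTTP endpoint is
framework-specific and omitted here:

import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerLiveness {
    // Written by the poll loop, read by the liveness endpoint.
    private final AtomicLong lastPollMs = new AtomicLong(System.currentTimeMillis());
    private final long maxPollIntervalMs = 300_000; // keep in sync with max.poll.interval.ms

    void pollLoop(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            lastPollMs.set(System.currentTimeMillis()); // poll() returned, the loop is not stuck
            records.forEach(r -> { /* process */ });
        }
    }

    boolean isHealthy() {
        return System.currentTimeMillis() - lastPollMs.get() < maxPollIntervalMs;
    }
}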

Thanks in advance!
Upesh

Upesh Desai
Senior Software Developer
ude...@itrsgroup.com
www.itrsgroup.com


Kafka Consumer - How to set fetch.max.bytes higher than the default 50mb?

2022-12-06 Thread Rama Eshel
I want my consumers to process large batches, so I aim to have the consumer
listener "wake up", say, on 1800 MB of data or every 5 min, whichever comes
first.

Mine is a Kafka Spring Boot application, the topic has 28 partitions, and
this is the configuration I explicitly change:

| Parameter                 | Value I set | Default Value | Why I set it this way    |
| ------------------------- | ----------- | ------------- | ------------------------ |
| fetch.max.bytes           | 1801mb      | 50mb          | fetch.min.bytes + 1mb    |
| fetch.min.bytes           | 1800mb      | 1b            | desired batch size       |
| fetch.max.wait.ms         | 5min        | 500ms         | desired cadence          |
| max.partition.fetch.bytes | 1801mb      | 1mb           | unbalanced partitions    |
| request.timeout.ms        | 5min+1sec   | 30sec         | fetch.max.wait.ms + 1sec |
| max.poll.records          | 1           | 500           | 1500 found too low       |
| max.poll.interval.ms      | 5min+1sec   | 5min          | fetch.max.wait.ms + 1sec |

Nevertheless, I produce ~2gb of data to the topic, and I see the
consumer-listener (a Batch Listener) is called many times per second -- way
more than desired rate.

I logged the serialized-size of the `ConsumerRecords` argument, and
found that it is never more than 55mb.
This hints that I was not able to set fetch.max.bytes above the default
50mb.

Any idea how I can troubleshoot this?
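
For reference, the client-side settings from the table written out as consumer
properties with the Java client (a sketch of the same values; note that broker-side
limits, e.g. the broker's own fetch.max.bytes setting, can also cap the size of a fetch
response regardless of what the client asks for, which may be worth checking here):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class LargeFetchConfig {
    static Properties props() {
        Properties p = new Properties();
        p.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, String.valueOf(1800L * 1024 * 1024));           // desired batch size
        p.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, String.valueOf(1801L * 1024 * 1024));           // fetch.min.bytes + 1mb
        p.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(1801L * 1024 * 1024)); // unbalanced partitions
        p.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, String.valueOf(5 * 60 * 1000));               // desired cadence
        p.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(5 * 60 * 1000 + 1000));       // fetch.max.wait.ms + 1sec
        p.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(5 * 60 * 1000 + 1000));     // fetch.max.wait.ms + 1sec
        return p;
    }
}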



-
Edit:
I found this question:
https://stackoverflow.com/questions/72812954/kafka-msk-a-configuration-of-high-fetch-max-wait-ms-and-fetch-min-bytes-is-beh?rq=1

Is it really impossible as stated?


Can not start python kafka consumer using SASL/SCRAM

2022-11-21 Thread Huy Nguyen
I'm having an issue with Kafka. Whenever the consumer uses SASL_SSL or
SASL_PLAINTEXT, it can't start. I've tried changing the consumer to use
PLAINTEXT or SSL (without SASL), and it works fine.

Below are my configurations and error:


*config/server.properties:*
listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095


ssl.keystore.location=/root/zoo-kaf/kafka_2.12-3.2.3/ssl/KeyStore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/root/zoo-kaf/kafka_2.12-3.2.3/ssl/truststore.jks
ssl.truststore.password=123456
sasl.enabled.mechanisms=SCRAM-SHA-512,PLAIN
listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="alice" \
   password="alice-secret" \
   user_alice="alice-secret";
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="alice" \
   password="alice-secret" \
   user_alice="alice-secret";
listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="alice" \
   password="alice-secret" \
   user_alice="alice-secret";
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="alice" \
   password="alice-secret" \
   user_alice="alice-secret";
...


*consumer.py*
Also, can the username alice be used for `sasl_plain_username`?

from kafka import KafkaConsumer
import logging

logging.basicConfig(level=logging.DEBUG)
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer(
    'test-topic',
    bootstrap_servers='127.0.0.1:9094',
    security_protocol="SASL_SSL",
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='alice',
    sasl_plain_password='alice-secret',
    ssl_check_hostname=False,
    ssl_cafile="/etc/ssl/certs/ca-cert",
)

Consumer log:

DEBUG:kafka.conn: [IPv4 ('127.0.0.1', 9094)]>: creating new socket
DEBUG:kafka.conn: [IPv4 ('127.0.0.1', 9094)]>: setting socket option (6, 1, 1)
INFO:kafka.conn: [IPv4 ('127.0.0.1', 9094)]>: connecting to 127.0.0.1:9094
[('127.0.0.1', 9094) IPv4]
DEBUG:kafka.conn: [IPv4 ('127.0.0.1', 9094)]>: established TCP connection
DEBUG:kafka.conn: [IPv4 ('127.0.0.1', 9094)]>: initiating SSL handshake
DEBUG:kafka.conn: [IPv4 ('127.0.0.1', 9094)]>: wrapping socket in ssl context
DEBUG:kafka.conn: [IPv4 ('127.0.0.1', 9094)]>: completed SSL handshake.
DEBUG:kafka.conn: [IPv4 ('127.0.0.1', 9094)]>: initiating SASL authentication
DEBUG:kafka.protocol.parser:Sending request
SaslHandShakeRequest_v0(mechanism='SCRAM-SHA-512')
DEBUG:kafka.conn: [IPv4 ('127.0.0.1', 9094)]> Request 1:
SaslHandShakeRequest_v0(mechanism='SCRAM-SHA-512')
DEBUG:kafka.protocol.parser:Received correlation id: 1
DEBUG:kafka.protocol.parser:Processing response SaslHandShakeResponse_v0
DEBUG:kafka.conn: [IPv4 ('127.0.0.1', 9094)]> Response 1 (0.7309913635253906
ms): SaslHandShakeResponse_v0(error_code=0,
enabled_mechanisms=['SCRAM-SHA-512', 'PLAIN'])
ERROR:kafka.conn: [IPv4 ('127.0.0.1', 9094)]>: Error receiving reply from
server
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kafka/conn.py", line 692, in
_try_authenticate_scram
(data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4))
  File "/usr/local/lib/python3.6/site-packages/kafka/conn.py", line 616, in
_recv_bytes_blocking
raise ConnectionError('Connection reset during recv')
ConnectionError: Connection reset during recv
INFO:kafka.conn: [IPv4 ('127.0.0.1', 9094)]>: Closing connection.
KafkaConnectionError:  [IPv4 ('127.0.0.1', 9094)]>: Connection
reset during recv
DEBUG:kafka.conn: [IPv4 ('127.0.0.1', 9094)]>: reconnect backoff
0.055656264781808004 after 1 failures
Traceback (most recent call last):
  File "consumer-scram-ssl.py", line 6, in 
consumer = KafkaConsumer('test-topic', bootstrap_servers='127.0.0.1:9094',
security_protocol="SASL_SSL", sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username='alice', sasl_plain_password='alice-secret',
ssl_check_hostname=False, ssl_cafile="/etc/ssl/certs/ca-cert")

  File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py",
line 356, in __init__
self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line
244, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line
900, in check_version
raise Errors.NoBrokersAvailable()

Broker doesn't return any log


Re: Kafka consumer ensure offsets committed before partition rebalance to avoid duplicates

2022-09-20 Thread Pushkar Deole
Hi Luke,

Thanks for the details... so from the explanation above, it seems that in both of
these scenarios I won't be able to avoid duplicate processing, which is the
main thing I was looking to achieve.

scenario 1: consumer shuts down, and doesn't commit offsets of the
already polled and processed batch of records (since auto.commit is enabled,
which would commit on the next poll, which won't occur when closing the consumer).
This would give rise to duplicate processing of that batch when the partition is
rebalanced to another consumer pod

scenario 2: the CooperativeStickyAssignor keeps working on the partition before
rebalancing, which again means the same thing, i.e. consumer1 has polled and
processed some records which are not yet committed before rebalancing, and
when the partition moves over to the next consumer, it can process those records
again

On Wed, Sep 21, 2022 at 7:32 AM Luke Chen  wrote:

> Hi
>
> 1. I was under impression, from documentation, that close method waits for
> 30 seconds to complete processing of any in-flight events and then commits
> offsets of last poll. Isn't that true? what does timeout of 30 seconds
> mean?
>
> -> 30 seconds timeout is to have a buffer for graceful closing, ex: commit
> offsets, leave groups,...
> It won't complete processing any in-flight "fetch" events during closing.
>
> 2. How does CoperativeStickyAssignor solve my problem when partitions move
> out to newly added consumer pod. i.e. consumer1 has polled 100 records from
> partition1 and is midway processing those i.e. 50 completed, 50 remaining
> and new consumer is added so partition1 has to move to new consumer2. Since
> auto.commit is enabled, offsets of all 100 polled records will be committed
> only during next poll. 2. How does CoperativeStickyAssignor solve my
> problem when partitions move
> out to newly added consumer pod. i.e. consumer1 has polled 100 records from
> partition1 and is midway processing those i.e. 50 completed, 50 remaining
> and new consumer is added so partition1 has to move to new consumer2. Since
> auto.commit is enabled, offsets of all 100 polled records will be committed
> only during next poll.
>
> -> You're right about the process.
>
> So how does CooperativeStickyAssignore help here to
> wait for process of 100 records and commit their offsets before moving the
> partition to new consumer? Looks like i am missing something
> Looks like i am missing something
>
> -> CooperativeStickyAssignore does the same thing to it, except it will
> keep all the partitions "during rebalancing".
> So, the issue is:
> In eagar protocol (ex: RangeAssignor)
> consumer prepare rebalancing -> commit offsets -> revoke all owned
> partitions -> rebalancing -> received new assignment -> start fetch data
> In cooperative protocol (ex: CooperativeStickyAssignore)
> consumer prepare rebalancing -> commit offsets (but no revoke) ->
> rebalancing -> received new assignment -> revoke partitions not owned
> anymore
>
> So you can see, in cooperative protocol, since it didn't revoke any
> partition before rebalancing, it might fetch more data after offset
> commits.
>
> Hope that's clear
> Luke
>
> On Tue, Sep 20, 2022 at 9:36 PM Pushkar Deole 
> wrote:
>
> > Thanks Luke..
> >
> > 1. I was under impression, from documentation, that close method waits
> for
> > 30 seconds to complete processing of any in-flight events and then
> commits
> > offsets of last poll. Isn't that true? what does timeout of 30 seconds
> > mean?
> >
> > 2. How does CoperativeStickyAssignor solve my problem when partitions
> move
> > out to newly added consumer pod. i.e. consumer1 has polled 100 records
> from
> > partition1 and is midway processing those i.e. 50 completed, 50 remaining
> > and new consumer is added so partition1 has to move to new consumer2.
> Since
> > auto.commit is enabled, offsets of all 100 polled records will be
> committed
> > only during next poll. So how does CooperativeStickyAssignore help here
> to
> > wait for process of 100 records and commit their offsets before moving
> the
> > partition to new consumer? Looks like i am missing something
> >
> > On Fri, Sep 16, 2022 at 7:59 AM Luke Chen  wrote:
> >
> > > Hi Pushkar,
> > >
> > > Here's the answer to your questions:
> > >
> > > > 1. During scale-down operation, I am adding a shutdown hook to the
> Java
> > > Runtime, and calling close on the consumer. As per kafka docs, close
> > > provides 30 sec to commit current offsets if auto.commit is enabled:
> so,
> > i
> > > assume that it will process the current batch of polled records within
> 30
> > > sec timeout before committing offsets and then close the consumer. Is
> my
> > > understanding correct?
> > >
> > > No, close() method is only doing some cleanup and offset commit if
> > needed.
> > > It won't care if the polled records are processed or not.
> > > So, to be clear, the 30 seconds is for consumer to do:
> > > (1) commit offset if auto.commit is enabled (2) leave consumer group
> (3)
> > > other cleanup
> > >
> > > > 2. During scale out 

Re: Kafka consumer ensure offsets committed before partition rebalance to avoid duplicates

2022-09-20 Thread Luke Chen
Hi

1. I was under impression, from documentation, that close method waits for
30 seconds to complete processing of any in-flight events and then commits
offsets of last poll. Isn't that true? what does timeout of 30 seconds mean?

-> 30 seconds timeout is to have a buffer for graceful closing, ex: commit
offsets, leave groups,...
It won't complete processing any in-flight "fetch" events during closing.

2. How does CooperativeStickyAssignor solve my problem when partitions move
out to newly added consumer pod, i.e. consumer1 has polled 100 records from
partition1 and is midway processing those, i.e. 50 completed, 50 remaining,
and new consumer is added so partition1 has to move to new consumer2. Since
auto.commit is enabled, offsets of all 100 polled records will be committed
only during next poll.

-> You're right about the process.

So how does CooperativeStickyAssignor help here to
wait for processing of 100 records and commit their offsets before moving the
partition to new consumer? Looks like I am missing something

-> CooperativeStickyAssignor does the same thing, except it will
keep all the partitions "during rebalancing".
So, the issue is:
In the eager protocol (ex: RangeAssignor)
consumer prepare rebalancing -> commit offsets -> revoke all owned
partitions -> rebalancing -> received new assignment -> start fetch data
In the cooperative protocol (ex: CooperativeStickyAssignor)
consumer prepare rebalancing -> commit offsets (but no revoke) ->
rebalancing -> received new assignment -> revoke partitions not owned
anymore

So you can see, in cooperative protocol, since it didn't revoke any
partition before rebalancing, it might fetch more data after offset commits.
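
A common way to make sure the offsets of already-processed records are committed
before a partition is actually given away, sketched below with the plain Java consumer
(the topic name is a placeholder, it assumes enable.auto.commit=false with manual offset
tracking, and it works with either assignor), is to commit from a
ConsumerRebalanceListener's onPartitionsRevoked callback:

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRevoke {
    private final Map<TopicPartition, OffsetAndMetadata> processed = new HashMap<>();

    void run(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(List.of("example-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Runs inside poll() before partitions are handed over: commit what was processed.
                consumer.commitSync(processed);
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                process(record);
                // Record the next offset to read for this partition once the record is done.
                processed.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
            }
        }
    }

    private void process(ConsumerRecord<String, String> record) { /* application work */ }
}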

Hope that's clear
Luke

On Tue, Sep 20, 2022 at 9:36 PM Pushkar Deole  wrote:

> Thanks Luke..
>
> 1. I was under impression, from documentation, that close method waits for
> 30 seconds to complete processing of any in-flight events and then commits
> offsets of last poll. Isn't that true? what does timeout of 30 seconds
> mean?
>
> 2. How does CoperativeStickyAssignor solve my problem when partitions move
> out to newly added consumer pod. i.e. consumer1 has polled 100 records from
> partition1 and is midway processing those i.e. 50 completed, 50 remaining
> and new consumer is added so partition1 has to move to new consumer2. Since
> auto.commit is enabled, offsets of all 100 polled records will be committed
> only during next poll. So how does CooperativeStickyAssignore help here to
> wait for process of 100 records and commit their offsets before moving the
> partition to new consumer? Looks like i am missing something
>
> On Fri, Sep 16, 2022 at 7:59 AM Luke Chen  wrote:
>
> > Hi Pushkar,
> >
> > Here's the answer to your questions:
> >
> > > 1. During scale-down operation, I am adding a shutdown hook to the Java
> > Runtime, and calling close on the consumer. As per kafka docs, close
> > provides 30 sec to commit current offsets if auto.commit is enabled: so,
> i
> > assume that it will process the current batch of polled records within 30
> > sec timeout before committing offsets and then close the consumer. Is my
> > understanding correct?
> >
> > No, close() method is only doing some cleanup and offset commit if
> needed.
> > It won't care if the polled records are processed or not.
> > So, to be clear, the 30 seconds is for consumer to do:
> > (1) commit offset if auto.commit is enabled (2) leave consumer group (3)
> > other cleanup
> >
> > > 2. During scale out operation, new pod (consumer) will be added to the
> > consumer group, so partitions of existing consumers will be rebalanced to
> > new consumer. In this case, I want to ensure that the current batch of
> > records polled and being processed by the consumer is processed and
> offsets
> > are committed before partition rebalance happens to new consumer.
> > How can I ensure this with auto-commit enabled?
> >
> > It depends on which version of Kafka you're running, and which
> > `partition.assignment.strategy` you are setting.
> > In Kafka v3.2.1, we found a bug that it'll have chance to process
> duplicate
> > records during rebalance: KAFKA-14196
> > 
> > So, assuming you're using default `partition.assignment.strategy`
> setting,
> > and not in v3.2.1, we can ensure it will not have duplicated consumption.
> > If you set the `partition.assignment.strategy` to
> > cooperativeStickyAssignor, there's a bug that we're still working on:
> > KAFKA-14224 

Re: Kafka consumer ensure offsets committed before partition rebalance to avoid duplicates

2022-09-20 Thread Pushkar Deole
Thanks Luke..

1. I was under impression, from documentation, that close method waits for
30 seconds to complete processing of any in-flight events and then commits
offsets of last poll. Isn't that true? what does timeout of 30 seconds mean?

2. How does CooperativeStickyAssignor solve my problem when partitions move
out to newly added consumer pod, i.e. consumer1 has polled 100 records from
partition1 and is midway processing those, i.e. 50 completed, 50 remaining,
and new consumer is added so partition1 has to move to new consumer2. Since
auto.commit is enabled, offsets of all 100 polled records will be committed
only during next poll. So how does CooperativeStickyAssignor help here to
wait for processing of 100 records and commit their offsets before moving the
partition to new consumer? Looks like I am missing something

On Fri, Sep 16, 2022 at 7:59 AM Luke Chen  wrote:

> Hi Pushkar,
>
> Here's the answer to your questions:
>
> > 1. During scale-down operation, I am adding a shutdown hook to the Java
> Runtime, and calling close on the consumer. As per kafka docs, close
> provides 30 sec to commit current offsets if auto.commit is enabled: so, i
> assume that it will process the current batch of polled records within 30
> sec timeout before committing offsets and then close the consumer. Is my
> understanding correct?
>
> No, close() method is only doing some cleanup and offset commit if needed.
> It won't care if the polled records are processed or not.
> So, to be clear, the 30 seconds is for consumer to do:
> (1) commit offset if auto.commit is enabled (2) leave consumer group (3)
> other cleanup
>
> > 2. During scale out operation, new pod (consumer) will be added to the
> consumer group, so partitions of existing consumers will be rebalanced to
> new consumer. In this case, I want to ensure that the current batch of
> records polled and being processed by the consumer is processed and offsets
> are committed before partition rebalance happens to new consumer.
> How can I ensure this with auto-commit enabled?
>
> It depends on which version of Kafka you're running, and which
> `partition.assignment.strategy` you are setting.
> In Kafka v3.2.1, we found a bug that it'll have chance to process duplicate
> records during rebalance: KAFKA-14196
> 
> So, assuming you're using default `partition.assignment.strategy` setting,
> and not in v3.2.1, we can ensure it will not have duplicated consumption.
> If you set the `partition.assignment.strategy` to
> cooperativeStickyAssignor, there's a bug that we're still working on:
> KAFKA-14224 
>
> Thank you.
> Luke
>
> On Wed, Sep 14, 2022 at 3:09 PM Pushkar Deole 
> wrote:
>
> > Hi All,
> >
> > I am hosting kafka consumers inside microservice hosted as kubernetes
> pods,
> > 3 consumers in a consumer group.
> > There is a requirement to add auto-scaling where there will be a single
> pod
> > which will be auto-scaled out or scaled-in based on the load on
> > microservice.
> > So, 1 pod can be scaled out to 2 or 3 pods, and similarly 3 pods can be
> > scaled down to 2 or 1 pod.
> >
> > Currently, I am using enabled.auto.commit set to 'true' in the consumers
> > and during scale out or scale-in, i want to commit offset of polled and
> > processed records so duplicates won't occur.
> > I have narrowed the problem to 2 scenarios:
> >
> > 1. During scale-down operation, I am adding a shutdown hook to the Java
> > Runtime, and calling close on the consumer. As per kafka docs, close
> > provides 30 sec to commit current offsets if auto.commit is enabled: so,
> i
> > assume that it will process the current batch of polled records within 30
> > sec timeout before committing offsets and then close the consumer. Is my
> > understanding correct?
> >
> > public void close()
> >
> > Close the consumer, waiting for up to the default timeout of 30 seconds
> for
> > any needed cleanup. If auto-commit is enabled, this will commit the
> current
> > offsets if possible within the default timeout. See close(Duration)
> > <
> >
> https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#close-java.time.Duration-
> > >
> > for
> > details. Note that wakeup()
> > <
> >
> https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#wakeup--
> > >
> > cannot
> > be used to interrupt close.
> >
> > 2. During scale out operation, new pod (consumer) will be added to the
> > consumer group, so partitions of existing consumers will be rebalanced to
> > new consumer. In this case, I want to ensure that the current batch of
> > records polled and being processed by the consumer is processed and
> offsets
> > are committed before partition rebalance happens to new consumer.
> > How can I ensure this with auto-commit enabled?
> >
>


Re: Kafka consumer ensure offsets committed before partition rebalance to avoid duplicates

2022-09-15 Thread Luke Chen
Hi Pushkar,

Here's the answer to your questions:

> 1. During scale-down operation, I am adding a shutdown hook to the Java
Runtime, and calling close on the consumer. As per kafka docs, close
provides 30 sec to commit current offsets if auto.commit is enabled: so, i
assume that it will process the current batch of polled records within 30
sec timeout before committing offsets and then close the consumer. Is my
understanding correct?

No, close() method is only doing some cleanup and offset commit if needed.
It won't care if the polled records are processed or not.
So, to be clear, the 30 seconds is for consumer to do:
(1) commit offset if auto.commit is enabled (2) leave consumer group (3)
other cleanup

> 2. During scale out operation, new pod (consumer) will be added to the
consumer group, so partitions of existing consumers will be rebalanced to
new consumer. In this case, I want to ensure that the current batch of
records polled and being processed by the consumer is processed and offsets
are committed before partition rebalance happens to new consumer.
How can I ensure this with auto-commit enabled?

It depends on which version of Kafka you're running, and which
`partition.assignment.strategy` you are setting.
In Kafka v3.2.1, we found a bug where it has a chance to process duplicate
records during rebalance: KAFKA-14196

So, assuming you're using the default `partition.assignment.strategy` setting,
and not v3.2.1, we can ensure it will not have duplicated consumption.
If you set the `partition.assignment.strategy` to
CooperativeStickyAssignor, there's a bug that we're still working on:
KAFKA-14224 

Thank you.
Luke

On Wed, Sep 14, 2022 at 3:09 PM Pushkar Deole  wrote:

> Hi All,
>
> I am hosting kafka consumers inside microservice hosted as kubernetes pods,
> 3 consumers in a consumer group.
> There is a requirement to add auto-scaling where there will be a single pod
> which will be auto-scaled out or scaled-in based on the load on
> microservice.
> So, 1 pod can be scaled out to 2 or 3 pods, and similarly 3 pods can be
> scaled down to 2 or 1 pod.
>
> Currently, I am using enabled.auto.commit set to 'true' in the consumers
> and during scale out or scale-in, i want to commit offset of polled and
> processed records so duplicates won't occur.
> I have narrowed the problem to 2 scenarios:
>
> 1. During scale-down operation, I am adding a shutdown hook to the Java
> Runtime, and calling close on the consumer. As per kafka docs, close
> provides 30 sec to commit current offsets if auto.commit is enabled: so, i
> assume that it will process the current batch of polled records within 30
> sec timeout before committing offsets and then close the consumer. Is my
> understanding correct?
>
> public void close()
>
> Close the consumer, waiting for up to the default timeout of 30 seconds for
> any needed cleanup. If auto-commit is enabled, this will commit the current
> offsets if possible within the default timeout. See close(Duration)
> <
> https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#close-java.time.Duration-
> >
> for
> details. Note that wakeup()
> <
> https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#wakeup--
> >
> cannot
> be used to interrupt close.
>
> 2. During scale out operation, new pod (consumer) will be added to the
> consumer group, so partitions of existing consumers will be rebalanced to
> new consumer. In this case, I want to ensure that the current batch of
> records polled and being processed by the consumer is processed and offsets
> are committed before partition rebalance happens to new consumer.
> How can I ensure this with auto-commit enabled?
>


Kafka consumer ensure offsets committed before partition rebalance to avoid duplicates

2022-09-14 Thread Pushkar Deole
Hi All,

I am hosting kafka consumers inside a microservice hosted as kubernetes pods,
3 consumers in a consumer group.
There is a requirement to add auto-scaling where there will be a single pod
which will be auto-scaled out or scaled-in based on the load on
microservice.
So, 1 pod can be scaled out to 2 or 3 pods, and similarly 3 pods can be
scaled down to 2 or 1 pod.

Currently, I am using enable.auto.commit set to 'true' in the consumers
and during scale out or scale-in, i want to commit offset of polled and
processed records so duplicates won't occur.
I have narrowed the problem to 2 scenarios:

1. During scale-down operation, I am adding a shutdown hook to the Java
Runtime, and calling close on the consumer. As per kafka docs, close
provides 30 sec to commit current offsets if auto.commit is enabled: so, i
assume that it will process the current batch of polled records within 30
sec timeout before committing offsets and then close the consumer. Is my
understanding correct?

public void close()

Close the consumer, waiting for up to the default timeout of 30 seconds for
any needed cleanup. If auto-commit is enabled, this will commit the current
offsets if possible within the default timeout. See close(Duration) for
details. Note that wakeup() cannot be used to interrupt close.

2. During scale out operation, new pod (consumer) will be added to the
consumer group, so partitions of existing consumers will be rebalanced to
new consumer. In this case, I want to ensure that the current batch of
records polled and being processed by the consumer is processed and offsets
are committed before partition rebalance happens to new consumer.
How can I ensure this with auto-commit enabled?


Intermittent kafka-consumer-perf-test.sh timeouts during tests

2022-08-03 Thread Afonso Mukai
Hi,

I'm running some tests with Kafka (4 broker setup, version 3.2.0) using 
kafka-consumer-perf-test.sh. After starting it multiple times in sequence with 
e.g.

  kafka-consumer-perf-test.sh --bootstrap-server :9092 --topic 
test-topic --messages 5000 --show-detailed-stats --print-metrics

then it often works, but sometimes times out without consuming messages. The 
test-topic topic has three partitions with three replicas each, and as part of 
a previous test I ran a partition reassignment on this topic from brokers 0,1,2 
to 1,2,3.


The detailed metrics in the end include

consumer-coordinator-metrics:failed-rebalance-rate-per-hour:{client-id=perf-consumer-client}
 : 8635.055
consumer-coordinator-metrics:failed-rebalance-total:{client-id=perf-consumer-client}
 : 95.000

and by running it with the logging level set to DEBUG, I repeatedly get the 
following lines in the log (broker IP redacted):

[2022-08-03 17:29:03,517] INFO [Consumer clientId=perf-consumer-client, 
groupId=perf-consumer-61424] Requesting disconnect from last known coordinator 
:9092 (id: 2147483645 rack: null) 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-08-03 17:29:03,614] INFO [Consumer clientId=perf-consumer-client, 
groupId=perf-consumer-61424] Discovered group coordinator :9092 
(id: 2147483645 rack: null) 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-08-03 17:29:03,615] INFO [Consumer clientId=perf-consumer-client, 
groupId=perf-consumer-61424] (Re-)joining group 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-08-03 17:29:03,617] INFO [Consumer clientId=perf-consumer-client, 
groupId=perf-consumer-61424] Group coordinator :9092 (id: 
2147483645 rack: null) is unavailable or invalid due to cause: error response 
NOT_COORDINATOR.isDisconnected: false. Rediscovery will be attempted. 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-08-03 17:29:03,617] INFO [Consumer clientId=perf-consumer-client, 
groupId=perf-consumer-61424] Requesting disconnect from last known coordinator 
:9092 (id: 2147483645 rack: null) 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-08-03 17:29:03,617] INFO [Consumer clientId=perf-consumer-client, 
groupId=perf-consumer-61424] JoinGroup failed: This is not the correct 
coordinator. Marking coordinator unknown. Sent generation was 
Generation{generationId=-1, memberId='', protocol='null'} 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

Am I missing some obvious problem here? Is there something I could look for in 
the broker logs to give me a hint of what is causing this?
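
One thing that can be checked from the client side (a sketch with the Java AdminClient;
the bootstrap address is a placeholder) is which broker the cluster currently reports as
the coordinator for the affected group, to compare against the broker the consumer keeps
being redirected to:

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;

public class WhoIsCoordinator {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        String groupId = "perf-consumer-61424"; // group id taken from the log above

        try (Admin admin = Admin.create(props)) {
            ConsumerGroupDescription description =
                    admin.describeConsumerGroups(List.of(groupId)).describedGroups().get(groupId).get();
            System.out.println("state:       " + description.state());
            System.out.println("coordinator: " + description.coordinator());
        }
    }
}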


Thanks in advance,
Afonso



Kafka Consumer JMX incoming byte rate to 0

2022-07-25 Thread Jose Manuel Vega Monroy
Hi there,

Just noticed that from time to time, though not very often, the Kafka consumer JMX 
incoming byte rate drops to 0, while the consumer keeps consuming as expected.

SELECT average(newrelic.timeslice.value)
FROM Metric
WHERE metricTimesliceName = 
'MessageBroker/Kafka/Internal/consumer-node-metrics/incoming-byte-rate'

Kafka consumer version 2.7.1, and broker side is 2.6.2

Any idea which could be the problem?

We are thinking about downgrading the consumer version to rule out anything version related.

Thanks

Jose Manuel Vega Monroy
Senior Backend Developer
Direct: +350
Mobile: +34(0) 633710634
WHG (International) Ltd  | 6/1 Waterport Place | Gibraltar |





Re: Kafka consumer filtering

2022-06-14 Thread Anders Engström
Hi -

depending on the rules for how to filter/drop incoming messages (and
depending on the mechanics of the library you use to consume the messages),
it might be possible to filter out messages based on message headers,
maybe? That way you would not need to deserialize the message key/value
before deciding if the message should be dropped or not.
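A rough sketch of that idea (the header name, header value and handle() step are
assumptions; the consumer is assumed to use a ByteArrayDeserializer for the value so
nothing is materialized until a record passes the header check):

    ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, byte[]> record : records) {
        Header type = record.headers().lastHeader("event-type"); // header name is an assumption
        if (type == null
                || !"order-created".equals(new String(type.value(), StandardCharsets.UTF_8))) {
            continue; // drop the record without ever deserializing the payload
        }
        handle(record.value()); // hypothetical handler that deserializes only matching records
    }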

/Anders

On Mon, Jun 13, 2022 at 5:53 PM abdelali elmantagui <
abdelalielmanta...@gmail.com> wrote:

> Hi All,
>
> I started a couple of weeks ago learning Kafka, and my goal is to optimize
> an existing architecture that uses Kafka in its components.
> The problem is that there are many microservices that produce messages/events
> to the Kafka topic, and on the other hand there are other microservices
> that consume these messages/events. Each microservice has to consume
> all the messages and then filter out which messages it is interested in, and that
> creates a problem of huge memory usage because of the huge amount of objects
> created in memory after deserialization of these messages.
>
> I am asking for any concept or solution that can help in this situation.
>
> Kind Regards,
> Abdelali
>
> +---------------+     +-------------+     +---------------+
> | microservices | --> | Kafka topic | --> | microservices |
> +---------------+     +-------------+     +---------------+
>
>


Re: Kafka consumer filtering

2022-06-13 Thread Jamie
Hi abdelali,
If you can’t get your producers to send the different types of events to 
different topics (or you don’t want to) you could use Kafka streams to filter 
the data in the topic to new topics that are subsets of the data. 

I have also seen Apache Spark used to do something similar.
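For illustration, a minimal Kafka Streams filter along those lines might look like the
following (the application id, topic names and the routing rule are all assumptions):

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event-filter");      // assumption
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> all = builder.stream("events");              // the shared input topic
    all.filter((key, value) -> value.contains("\"type\":\"order\""))     // naive routing rule
       .to("events-orders");                                             // subset topic for one service
    new KafkaStreams(builder.build(), props).start();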
Thanks,
Jamie 


Sent from the all-new AOL app for iOS


On Monday, June 13, 2022, 4:53 pm, abdelali elmantagui 
 wrote:

Hi All,

I started a couple of weeks ago learning Kafka, and my goal is to optimize an 
existing architecture that uses Kafka in its components.
The problem is that there are many microservices that produce messages/events to
the Kafka topic, and on the other hand there are other microservices that
consume these messages/events. Each microservice has to consume all the
messages and then filter out which messages it is interested in, and that creates a
problem of huge memory usage because of the huge amount of objects created in
memory after deserialization of these messages.

I am asking for any concept or solution that can help in this situation.

Kind Regards,
Abdelali

+---------------+     +-------------+     +---------------+
| microservices | --> | Kafka topic | --> | microservices |
+---------------+     +-------------+     +---------------+






Kafka consumer filtering

2022-06-13 Thread abdelali elmantagui
Hi All,

I started a couple of weeks ago learning Kafka, and my goal is to optimize an 
existing architecture that uses Kafka in its components.
The problem is that there are many microservices that produce messages/events to
the Kafka topic, and on the other hand there are other microservices that
consume these messages/events. Each microservice has to consume all the
messages and then filter out which messages it is interested in, and that creates a
problem of huge memory usage because of the huge amount of objects created in
memory after deserialization of these messages.

I am asking for any concept or solution that can help in this situation.

Kind Regards,
Abdelali

+---------------+     +-------------+     +---------------+
| microservices | --> | Kafka topic | --> | microservices |
+---------------+     +-------------+     +---------------+



Re: GCP Dataproc - Failed to construct kafka consumer, Failed to load SSL keystore dataproc-versa-sase-p12-1.jks of type JKS

2022-02-02 Thread karan alang
re-checking to see if there is any suggestion on this issue.



On Wed, Feb 2, 2022 at 3:36 PM karan alang  wrote:

> Hello All,
>
> I'm trying to run a Structured Streaming program on GCP Dataproc, which
> accesses the data from Kafka and prints it.
>
> Access to Kafka is using SSL, and the truststore and keystore files are
> stored in buckets. I'm using Google Storage API to access the bucket, and
> store the file in the current working directory. The truststore and
> keystores are passed onto the Kafka Consumer/Producer. However - i'm
> getting an error
> Failed to construct kafka consumer, Failed to load SSL keystore
> dataproc-versa-sase-p12-1.jks of type JKS
>
> Details in stackoverflow -
> https://stackoverflow.com/questions/70964198/gcp-dataproc-failed-to-construct-kafka-consumer-failed-to-load-ssl-keystore-d
>
> From my local m/c, the same code is working fine .. though i'm using PKCS
> format truststore/keystore, on Dataproc - it is expecting JKS format files.
>
> Any ideas on how to debug/fix this ?
>
> tia!
>
>


GCP Dataproc - Failed to construct kafka consumer, Failed to load SSL keystore dataproc-versa-sase-p12-1.jks of type JKS

2022-02-02 Thread karan alang
Hello All,

I'm trying to run a Structured Streaming program on GCP Dataproc, which
accesses the data from Kafka and prints it.

Access to Kafka is using SSL, and the truststore and keystore files are
stored in buckets. I'm using Google Storage API to access the bucket, and
store the file in the current working directory. The truststore and
keystores are passed onto the Kafka Consumer/Producer. However - i'm
getting an error
Failed to construct kafka consumer, Failed to load SSL keystore
dataproc-versa-sase-p12-1.jks of type JKS

Details in stackoverflow -
https://stackoverflow.com/questions/70964198/gcp-dataproc-failed-to-construct-kafka-consumer-failed-to-load-ssl-keystore-d

From my local m/c, the same code is working fine .. though i'm using PKCS
format truststore/keystore, on Dataproc - it is expecting JKS format files.

Any ideas on how to debug/fix this ?

tia!


Re: Kafka Consumer Fairness when fetching events from different partitions.

2022-02-01 Thread Mazen Ezzeddine
Hello Chad and Edward,

I have also gone through the documentation at
https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records#KIP41:KafkaConsumerMaxRecords-EnsuringFairConsumption

It turns out that the consumer polls using a greedy round-robin algorithm, perhaps
confirming what Chad observed experimentally (within the same poll, fetching from
the same partition is prioritized).

This means that, considering the example in my initial question, all 100
messages will be fetched from the first partition P1 on the first poll. On the
second poll, the consumer will poll all the messages from P2, since there
are 50 messages on that partition, which is less than max.poll.records; it will
also poll 50 more messages from P3.

So in the first poll priority is given to P1 (up to max.poll.records out of P1);
only if P1 has fewer than max.poll.records events will events from P2 be fetched,
and so on.
However, on the next poll priority is given to the set of partitions not visited
(fetched from) in the last poll, and so on.

I hope that someone from Confluent could confirm and/or correct/comment on this
thread. Indeed, random fetching, or greedy polling that is unfair across polls
(always prioritizing the same partitions), could lead to starvation of some
partitions. Hence, there could theoretically be unbounded latency on some events,
and no latency SLA can be guaranteed (I am working on a Kafka event latency SLA
research project).
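To make the behaviour easy to observe, the per-partition breakdown of each poll can be
inspected; the partition pausing below is only a sketch of one possible mitigation, not
something the client does by itself, and maxPollRecords simply mirrors the
max.poll.records setting:

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (TopicPartition tp : records.partitions()) {
        List<ConsumerRecord<String, String>> fromTp = records.records(tp);
        System.out.printf("poll returned %d records from %s%n", fromTp.size(), tp);
        if (fromTp.size() == maxPollRecords) {          // this partition filled the whole poll
            consumer.pause(Collections.singleton(tp));  // give the other partitions a turn
        }
    }
    // ... process the records, then resume everything before the next round of polls
    consumer.resume(consumer.paused());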

Thank you.


From: Edward Capriolo 
Sent: Monday, January 31, 2022 11:28 PM
To: users@kafka.apache.org 
Subject: Re: Kafka Consumer Fairness when fetching events from different 
partitions.

On Monday, January 31, 2022, Chad Preisler  wrote:

> Hello,
>
> I got this from the JavaDocs for KafkaConsumer.
>
>  * If a consumer is assigned multiple partitions to fetch data from, it
> will try to consume from all of them at the same time,
>  * effectively giving these partitions the same priority for consumption.
> However in some cases consumers may want to
>  * first focus on fetching from some subset of the assigned partitions at
> full speed, and only start fetching other partitions
>  * when these partitions have few or no data to consume.
>
> *  One of such cases is stream processing, where processor fetches from two
> topics and performs the join on these two streams.
>  * When one of the topics is long lagging behind the other, the processor
> would like to pause fetching from the ahead topic
>  * in order to get the lagging stream to catch up. Another example is
> bootstraping upon consumer starting up where there are
>  * a lot of history data to catch up, the applications usually want to get
> the latest data on some of the topics before consider
>  * fetching other topics.
>
> I'm testing a consumer now. When the topic being read has the following
> lag.
>
> consumer group partition: 0, offset: 254, lag: 12301
> consumer group partition: 1, offset: 302, lag: 12216
> consumer group partition: 2, offset: 300, lag: 12257
> consumer group partition: 3, offset: 259, lag: 12108
>
> My consumer is starting with partition 3 and catching all the way up, then
> it starts reading the rest of the partitions evenly. I'm not sure why it is
> happening that way.
>
> Hope this helps.
>
>
>
>
>
> On Sun, Jan 23, 2022 at 1:58 AM Mazen Ezzeddine <
> mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:
>
> > Dear all,
> >
> > Consider a kafka topic deployment with 3 partitions P1, P2, P3 with
> > events/records lagging in the partitions equal to 100, 50, 75 for P1, P2,
> > P3 respectively. And let’s suppose that num.poll.records (the maximum
> > number of records that can be fetched from the broker ) is equal to 100.
> >
> > If the consumer sends a request to fetch records from P1, P2, P3,  is
> > there any guarantee that the returned records will be fairly/uniformly
> > selected out of the available partitions e.g., say 34 records from P1, 33
> > from P2 and 33 from P3.
> >
> > Otherwise, how the decision on the returned records is handled (e.g., is
> > it based on the first partition  leader that replies to the fetch request
> > e.g., say P1..). In such case how eventual fairness is guaranteed across
> > different partitions,  in case for example when records happen to be
> > fetched/read from a single partition.
> >
> > Thank you.
> >
> >
>

What I have noticed anecdotally: the order is random. Two consumers reading
the same messages from the same group will get messages in different orders.

Also, if you get backlogged and partitions have depth, you tend to get all
the data from one partition before it moves on to the next. But this behavior
is likely very version- and client-dependent.

The order you consume in shouldn't matter, but in practice everything matters
at least a little bit to someone.


--
Sorry this was sent from mobile. Will do less grammar and spell check than
usual.


Re: Kafka Consumer Fairness when fetching events from different partitions.

2022-01-31 Thread Edward Capriolo
On Monday, January 31, 2022, Chad Preisler  wrote:

> Hello,
>
> I got this from the JavaDocs for KafkaConsumer.
>
>  * If a consumer is assigned multiple partitions to fetch data from, it
> will try to consume from all of them at the same time,
>  * effectively giving these partitions the same priority for consumption.
> However in some cases consumers may want to
>  * first focus on fetching from some subset of the assigned partitions at
> full speed, and only start fetching other partitions
>  * when these partitions have few or no data to consume.
>
> *  One of such cases is stream processing, where processor fetches from two
> topics and performs the join on these two streams.
>  * When one of the topics is long lagging behind the other, the processor
> would like to pause fetching from the ahead topic
>  * in order to get the lagging stream to catch up. Another example is
> bootstraping upon consumer starting up where there are
>  * a lot of history data to catch up, the applications usually want to get
> the latest data on some of the topics before consider
>  * fetching other topics.
>
> I'm testing a consumer now. When the topic being read has the following
> lag.
>
> consumer group partition: 0, offset: 254, lag: 12301
> consumer group partition: 1, offset: 302, lag: 12216
> consumer group partition: 2, offset: 300, lag: 12257
> consumer group partition: 3, offset: 259, lag: 12108
>
> My consumer is starting with partition 3 and catching all the way up, then
> it starts reading the rest of the partitions evenly. I'm not sure why it is
> happening that way.
>
> Hope this helps.
>
>
>
>
>
> On Sun, Jan 23, 2022 at 1:58 AM Mazen Ezzeddine <
> mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:
>
> > Dear all,
> >
> > Consider a kafka topic deployment with 3 partitions P1, P2, P3 with
> > events/records lagging in the partitions equal to 100, 50, 75 for P1, P2,
> > P3 respectively. And let’s suppose that num.poll.records (the maximum
> > number of records that can be fetched from the broker ) is equal to 100.
> >
> > If the consumer sends a request to fetch records from P1, P2, P3,  is
> > there any guarantee that the returned records will be fairly/uniformly
> > selected out of the available partitions e.g., say 34 records from P1, 33
> > from P2 and 33 from P3.
> >
> > Otherwise, how the decision on the returned records is handled (e.g., is
> > it based on the first partition  leader that replies to the fetch request
> > e.g., say P1..). In such case how eventual fairness is guaranteed across
> > different partitions,  in case for example when records happen to be
> > fetched/read from a single partition.
> >
> > Thank you.
> >
> >
>

What I have noticed anecdotally: the order is random. Two consumers reading
the same messages from the same group will get messages in different orders.

Also, if you get backlogged and partitions have depth, you tend to get all
the data from one partition before it moves on to the next. But this behavior
is likely very version- and client-dependent.

The order you consume in shouldn't matter, but in practice everything matters
at least a little bit to someone.


-- 
Sorry this was sent from mobile. Will do less grammar and spell check than
usual.


Re: Kafka Consumer Fairness when fetching events from different partitions.

2022-01-31 Thread Chad Preisler
Hello,

I got this from the JavaDocs for KafkaConsumer.

 * If a consumer is assigned multiple partitions to fetch data from, it
will try to consume from all of them at the same time,
 * effectively giving these partitions the same priority for consumption.
However in some cases consumers may want to
 * first focus on fetching from some subset of the assigned partitions at
full speed, and only start fetching other partitions
 * when these partitions have few or no data to consume.

*  One of such cases is stream processing, where processor fetches from two
topics and performs the join on these two streams.
 * When one of the topics is long lagging behind the other, the processor
would like to pause fetching from the ahead topic
 * in order to get the lagging stream to catch up. Another example is
bootstraping upon consumer starting up where there are
 * a lot of history data to catch up, the applications usually want to get
the latest data on some of the topics before consider
 * fetching other topics.

I'm testing a consumer now. When the topic being read has the following lag.

consumer group partition: 0, offset: 254, lag: 12301
consumer group partition: 1, offset: 302, lag: 12216
consumer group partition: 2, offset: 300, lag: 12257
consumer group partition: 3, offset: 259, lag: 12108

My consumer is starting with partition 3 and catching all the way up, then
it starts reading the rest of the partitions evenly. I'm not sure why it is
happening that way.

Hope this helps.





On Sun, Jan 23, 2022 at 1:58 AM Mazen Ezzeddine <
mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:

> Dear all,
>
> Consider a kafka topic deployment with 3 partitions P1, P2, P3 with
> events/records lagging in the partitions equal to 100, 50, 75 for P1, P2,
> P3 respectively. And let’s suppose that num.poll.records (the maximum
> number of records that can be fetched from the broker ) is equal to 100.
>
> If the consumer sends a request to fetch records from P1, P2, P3,  is
> there any guarantee that the returned records will be fairly/uniformly
> selected out of the available partitions e.g., say 34 records from P1, 33
> from P2 and 33 from P3.
>
> Otherwise, how the decision on the returned records is handled (e.g., is
> it based on the first partition  leader that replies to the fetch request
> e.g., say P1..). In such case how eventual fairness is guaranteed across
> different partitions,  in case for example when records happen to be
> fetched/read from a single partition.
>
> Thank you.
>
>


Kafka Consumer Fairness when fetching events from different partitions.

2022-01-22 Thread Mazen Ezzeddine
Dear all,

Consider a kafka topic deployment with 3 partitions P1, P2, P3 with 
events/records lagging in the partitions equal to 100, 50, 75 for P1, P2, P3 
respectively. And let’s suppose that num.poll.records (the maximum number of 
records that can be fetched from the broker ) is equal to 100.

If the consumer sends a request to fetch records from P1, P2, P3,  is there any 
guarantee that the returned records will be fairly/uniformly selected out of 
the available partitions e.g., say 34 records from P1, 33 from P2 and 33 from 
P3.

Otherwise, how the decision on the returned records is handled (e.g., is it 
based on the first partition  leader that replies to the fetch request e.g., 
say P1..). In such case how eventual fairness is guaranteed across different 
partitions,  in case for example when records happen to be fetched/read from a 
single partition.

Thank you.



Re: New Kafka Consumer : unknown member id

2021-11-05 Thread Kafka Life
Hello Luke

I have built a new Kafka environment with Kafka 2.8.0.

The consumer is a new consumer set up for this environment and it is throwing the
below error... the old consumers for the same applications in the same
2.8.0 environment are working fine.

Could you please advise?

2021-11-02 12:25:24 DEBUG AbstractCoordinator:557 - [Consumer
clientId=, groupId=] Attempt to join group failed due to unknown
member id.

On Fri, Oct 29, 2021 at 7:36 AM Luke Chen  wrote:

> Hi,
> Which version of kafka client are you using?
> I can't find this error message in the source code.
> When googling this error message, it showed the error is in Kafka v0.9.
>
> Could you try to use the V3.0.0 and see if that issue still exist?
>
> Thank you.
> Luke
>
> On Thu, Oct 28, 2021 at 11:15 PM Kafka Life 
> wrote:
>
> > Dear Kafka Experts
> >
> > We have set up a group.id (consumer ) = YYY
> > But when tried to connect to kafka instance : i get this error message. I
> > am sure this consumer (group id does not exist in kafka) .We user plain
> > text protocol to connect to kafka 2.8.0. Please suggest how to resolve
> this
> > issue.
> >
> > DEBUG AbstractCoordinator:557 - [Consumer clientId=X,
> groupId=YYY]
> > Attempt to join group failed due to unknown member id.
> >
>


Re: New Kafka Consumer : unknown member id

2021-10-28 Thread Luke Chen
Hi,
Which version of kafka client are you using?
I can't find this error message in the source code.
When googling this error message, it showed the error is in Kafka v0.9.

Could you try to use the V3.0.0 and see if that issue still exist?

Thank you.
Luke

On Thu, Oct 28, 2021 at 11:15 PM Kafka Life  wrote:

> Dear Kafka Experts
>
> We have set up a group.id (consumer ) = YYY
> But when tried to connect to kafka instance : i get this error message. I
> am sure this consumer (group id does not exist in kafka) .We user plain
> text protocol to connect to kafka 2.8.0. Please suggest how to resolve this
> issue.
>
> DEBUG AbstractCoordinator:557 - [Consumer clientId=X, groupId=YYY]
> Attempt to join group failed due to unknown member id.
>


New Kafka Consumer : unknown member id

2021-10-28 Thread Kafka Life
Dear Kafka Experts

We have set up a group.id (consumer) = YYY.
But when I tried to connect to the Kafka instance I get this error message. I
am sure this consumer group id does not exist in Kafka. We use the plaintext
protocol to connect to Kafka 2.8.0. Please suggest how to resolve this
issue.

DEBUG AbstractCoordinator:557 - [Consumer clientId=X, groupId=YYY]
Attempt to join group failed due to unknown member id.


Re: Kafka consumer and heartbeat threads stuck/hanging up

2021-08-26 Thread Shantam Garg(Customer Service and Transact)
On Thu, Aug 26, 2021 at 4:44 PM Shantam Garg(Customer Service and Transact)
 wrote:

> Hello all,
>
> I have come across this behavior in our production cluster where the*
> Kafka consumers and heartbeat threads are getting stuck/hang-up* without
> any error.
>
> As the heartbeat threads as stuck - the session timeout is breached and
> the consumers are thrown on the live consumer group. I have tried debugging
> it by enabling debug logs for Coordinator and Consumer but couldn't find
> the reason for this, my hunch is that it's getting stuck in some deadlock
> but not sure what's causing it.
> Please let me know if anyone has any idea on how to debug this issue.
>
> Kafka version:* 2.7*
> Consumer client:* kafka-client:2.7*
>
> *Configs:*
> max.poll.interval.ms: 30
> session.timeout.ms: 1
> heartbeat.interval.ms: 3000
> fetch.min.bytes: 1
> fetch.max.bytes: 10485760
> (Note: I have tried increasing the session timeout to 60 seconds also but
> the same issue happens.)
>
>
> *Coordinator logs:*INFO [2021-08-26 13:38:10,136] [executor-Heartbeat][]
> kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]:
> *Member
> fkw-secor-fkw-secor-prod-cluster-8344558_14763_32-b153e17f-d864-4e0f-a362-996142211fb2
> in group fkw_secor_prod_group_live has failed, removing it from the
> group INFO [2021-08-26 13:38:10,136] [executor-Heartbeat][]
> kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Preparing
> to rebalance group fkw_secor_prod_group_live in state PreparingRebalance
> with old generation 1359 (__consumer_offsets-180) (reason: removing member
> fkw-secor-fkw-secor-prod-cluster-8344558_14763_32-b153e17f-d864-4e0f-a362-996142211fb2
> on heartbeat expiration)*
>  INFO [2021-08-26 13:38:10,140] [executor-Heartbeat][]
> kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
> fkw-secor-fkw-secor-prod-cluster-8344558_14763_31-c80c0c1d-4bd9-41a8-b6e3-6f7ecd7447b2
> in group fkw_secor_prod_group_live has failed, removing it from the group
>  INFO [2021-08-26 13:38:10,363] [executor-Heartbeat][]
> kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
> fkw-secor-fkw-secor-prod-cluster-8344558_14763_33-bed07f3c-3a10-4e51-bab6-d921de81083e
> in group fkw_secor_prod_group_live has failed, removing it from the group
>  INFO [2021-08-26 13:38:10,436] [executor-Heartbeat][]
> kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
> fkw-secor-fkw-secor-prod-cluster-8344558_14763_27-5f98ee05-633c-4cee-99b1-03e4f98ae52d
> in group fkw_secor_prod_group_live has failed, removing it from the group
>  INFO [2021-08-26 13:38:10,484] [executor-Heartbeat][]
> kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
> fkw-secor-fkw-secor-prod-cluster-8344558_14763_29-b5ddfc49-5ce1-41fa-b166-bcc1554d
> in group fkw_secor_prod_group_live has failed, removing it from the group
>  INFO [2021-08-26 13:38:10,545] [executor-Heartbeat][]
> kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
> fkw-secor-fkw-secor-prod-cluster-8344558_14763_34-88dd2f87-9b35-4966-9436-6ea4019e1c80
> in group fkw_secor_prod_group_live has failed, removing it from the group
>  INFO [2021-08-26 13:38:11,367] [executor-Heartbeat][]
> kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
> fkw-secor-fkw-secor-prod-cluster-8344558_14763_35-fa1db325-b70f-4bba-97c3-3170298cb194
> in group fkw_secor_prod_group_live has failed, removing it from the group
>  INFO [2021-08-26 13:38:11,421] [executor-Heartbeat][]
> kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
> fkw-secor-fkw-secor-prod-cluster-8344558_14763_30-68cdeafa-c6cc-4ac9-aa9c-6ed8b03e2b42
> in group fkw_secor_prod_group_live has failed, removing it from the group
>  INFO [2021-08-26 13:38:11,500] [executor-Heartbeat][]
> kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
> fkw-secor-fkw-secor-prod-cluster-8344558_14763_26-8bfb73de-7c1e-4bc2-a65d-4ce66dfb8cd8
> in group fkw_secor_prod_group_live has failed, removing it from the group
>  INFO [2021-08-26 13:38:13,055] [executor-Heartbeat][]
> kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
> fkw-secor-fkw-secor-prod-cluster-8344558_14763_28-fe52a1cf-3380-407c-91dd-9baa059e4bf8
> in group fkw_secor_prod_group_live has failed, removing it from the group
> * INFO [2021-08-26 13:38:13,056] [executor-Heartbeat][]
> kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Group
> fkw_secor_prod_group_live with generation 1360 is now empty
> (__consumer_offsets-180)*
>
> *Last consumer Logs for member
> (fkw-secor-fkw-secor-prod-cluster-8344558_14763_32):*
> 2021-08-26 13:38:03,009 [Thread-10]
> (org.apache.kafka.clients.consumer.internals.Fetcher:266) DEBUG [Consumer
> clientId=fkw-secor-fkw-secor-prod-cluster-8344558_14763_32,
> groupId=fkw_secor_prod_group_live] Sending READ_UNCOMMITTED
> 

Kafka consumer and heartbeat thread hanging up

2021-08-26 Thread Shantam Garg(Customer Service and Transact)
Hello all,

I have come across this behavior in our production cluster where the Kafka
consumers and heartbeat threads are getting stuck/hanging without any
error.

As the heartbeat threads are stuck, the session timeout is breached and the
consumers are thrown out of the live consumer group. I have tried debugging it
by enabling debug logs for the Coordinator and Consumer but couldn't find the
reason for this; my hunch is that it's getting stuck in some deadlock but
I'm not sure what's causing it.
Please let me know if anyone has any idea on how to debug this issue.

Kafka version:* 2.7*
Consumer client:* kafka-client:2.7*

*Configs:*
max.poll.interval.ms: 30
session.timeout.ms: 1
heartbeat.interval.ms: 3000
fetch.min.bytes: 1
fetch.max.bytes: 10485760
(Note: I have tried increasing the session timeout to 60 seconds also but
the same issue happens.)


Coordinator logs:

 INFO [2021-08-26 13:38:10,136] [executor-Heartbeat][]
kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
fkw-secor-fkw-secor-prod-cluster-8344558_14763_32-b153e17f-d864-4e0f-a362-996142211fb2
in group fkw_secor_prod_group_live has failed, removing it from the group
 INFO [2021-08-26 13:38:10,136] [executor-Heartbeat][]
kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Preparing
to rebalance group fkw_secor_prod_group_live in state PreparingRebalance
with old generation 1359 (__consumer_offsets-180) (reason: removing member
fkw-secor-fkw-secor-prod-cluster-8344558_14763_32-b153e17f-d864-4e0f-a362-996142211fb2
on heartbeat expiration)
 INFO [2021-08-26 13:38:10,140] [executor-Heartbeat][]
kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
fkw-secor-fkw-secor-prod-cluster-8344558_14763_31-c80c0c1d-4bd9-41a8-b6e3-6f7ecd7447b2
in group fkw_secor_prod_group_live has failed, removing it from the group
 INFO [2021-08-26 13:38:10,363] [executor-Heartbeat][]
kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
fkw-secor-fkw-secor-prod-cluster-8344558_14763_33-bed07f3c-3a10-4e51-bab6-d921de81083e
in group fkw_secor_prod_group_live has failed, removing it from the group
 INFO [2021-08-26 13:38:10,436] [executor-Heartbeat][]
kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
fkw-secor-fkw-secor-prod-cluster-8344558_14763_27-5f98ee05-633c-4cee-99b1-03e4f98ae52d
in group fkw_secor_prod_group_live has failed, removing it from the group
 INFO [2021-08-26 13:38:10,484] [executor-Heartbeat][]
kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
fkw-secor-fkw-secor-prod-cluster-8344558_14763_29-b5ddfc49-5ce1-41fa-b166-bcc1554d
in group fkw_secor_prod_group_live has failed, removing it from the group
 INFO [2021-08-26 13:38:10,545] [executor-Heartbeat][]
kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
fkw-secor-fkw-secor-prod-cluster-8344558_14763_34-88dd2f87-9b35-4966-9436-6ea4019e1c80
in group fkw_secor_prod_group_live has failed, removing it from the group
 INFO [2021-08-26 13:38:11,367] [executor-Heartbeat][]
kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
fkw-secor-fkw-secor-prod-cluster-8344558_14763_35-fa1db325-b70f-4bba-97c3-3170298cb194
in group fkw_secor_prod_group_live has failed, removing it from the group
 INFO [2021-08-26 13:38:11,421] [executor-Heartbeat][]
kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
fkw-secor-fkw-secor-prod-cluster-8344558_14763_30-68cdeafa-c6cc-4ac9-aa9c-6ed8b03e2b42
in group fkw_secor_prod_group_live has failed, removing it from the group
 INFO [2021-08-26 13:38:11,500] [executor-Heartbeat][]
kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
fkw-secor-fkw-secor-prod-cluster-8344558_14763_26-8bfb73de-7c1e-4bc2-a65d-4ce66dfb8cd8
in group fkw_secor_prod_group_live has failed, removing it from the group
 INFO [2021-08-26 13:38:13,055] [executor-Heartbeat][]
kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Member
fkw-secor-fkw-secor-prod-cluster-8344558_14763_28-fe52a1cf-3380-407c-91dd-9baa059e4bf8
in group fkw_secor_prod_group_live has failed, removing it from the group
 INFO [2021-08-26 13:38:13,056] [executor-Heartbeat][]
kafka.coordinator.group.GroupCoordinator - [GroupCoordinator 1]: Group
fkw_secor_prod_group_live with generation 1360 is now empty
(__consumer_offsets-180)

Last consumer logs for member
(fkw-secor-fkw-secor-prod-cluster-8344558_14763_32):
2021-08-26 13:38:03,009 [Thread-10]
(org.apache.kafka.clients.consumer.internals.Fetcher:266) DEBUG [Consumer
clientId=fkw-secor-fkw-secor-prod-cluster-8344558_14763_32,
groupId=fkw_secor_prod_group_live] Sending READ_UNCOMMITTED
IncrementalFetchRequest(toSend=(ing.fkw.demand.tracking.clickstream-7),
toForget=(), implied=(ing.fkw.credit.tijori.LoanAccount-0,
cdc.fkw.ful.ssms.shipment.shipment_wave-1,
cdc.fkw.ful.inventory.fkw_wh_inventory.wid_wsn-0,
cdc.fkw.ful.inventory.fkw_wh_inventory.warehouse_inventory_issue-1,

Kafka-consumer-groups.sh says "no active members" but the CURRENT_OFFSET is moving

2021-08-17 Thread Mayuresh Gharat
Hi Folks,

We recently came across a weird scenario where we had a consumer group
consuming from multiple topics. When we ran the "Kafka-consumer-group"
command multiple times, we saw that the CURRENT-OFFSET is advancing;
however , we also saw a line printed:
*"Consumer group 'GROUP_ABC' has no active members."*

The consumer lag graph shows no data from this group.


Here is the output from the Kafka-consumer-groups.sh:

./bin/kafka-consumer-groups.sh --bootstrap-server  --group GROUP_ABC --describe

Consumer group 'GROUP_ABC' has no active members.

GROUP      TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
GROUP_ABC  TOPIC_A  0          2280611         3861697         1581086  -            -     -
GROUP_ABC  TOPIC_A  1          3845015         3845015         0        -            -     -
GROUP_ABC  Topic_B  2          530             530             0        -            -     -
GROUP_ABC  TOPIC_A  2          2387180         3736450         1349270  -            -     -
GROUP_ABC  TOPIC_B  0          655             655             0        -            -     -
GROUP_ABC  TOPIC_B  1          563             563             0        -            -     -


Any idea why this might be happening?


Thanks and regards,

Mayuresh


Re: Is exactly-once possible with kafka consumer-producer ?

2021-07-18 Thread Lerh Chuan Low
Yep, that's the one. See here for a great example of the typical flow:
https://www.confluent.io/blog/transactions-apache-kafka/
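A compressed sketch of that consume-transform-produce loop follows; the topic names,
the transform() step and the transactional id are assumptions, producerProps holds the
usual bootstrap/serializer settings, and the consumer is assumed to run with
enable.auto.commit=false:

    producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-tx-1"); // assumption
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
    producer.initTransactions();

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        if (records.isEmpty()) continue;

        producer.beginTransaction();
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                producer.send(new ProducerRecord<>("destination-topic",
                        record.key(), transform(record.value()))); // hypothetical transform
                offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
            }
            // The consumed offsets are committed in the same transaction as the produced records.
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        } catch (KafkaException e) {
            // For fatal errors (e.g. ProducerFencedException) close the producer instead.
            producer.abortTransaction();
        }
    }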

On Sat, Jul 17, 2021 at 3:24 AM Pushkar Deole  wrote:

> Hi Lerh Chuan Low,
>
> MAny thanks for your response. I get it now, that it provides exactly-once
> semantics i.e it looks to user that it is processed exactly once.
> Also, i am clear on the aspect about read_committed level so the
> uncommitted transaction and hence uncommitted send won't be visible to
> consumers.
>
> However one last query i have is how to make sure that as part of the same
> transaction, i am also sending and also committing offsets. Which API
> should i look at: is this correct API :
> KafkaProducer.
> sendOffsetsToTransaction
>
> On Fri, Jul 16, 2021 at 9:57 PM Lerh Chuan Low 
> wrote:
>
> > Pushkar,
> >
> > My understanding is you can easily turn it on by using Kafka streams as
> > Chris mentioned. Otherwise you'd have to do it yourself - I don't think
> you
> > can get exactly once processing, but what you can do (which is also what
> > Kafka streams does) is exactly once schematics (You won't be able to get
> > every message processed exactly once in the system, but they could look
> > like they had been processed exactly once), The final piece of the puzzle
> > besides using idempotent producers and transactions is to set consumers
> of
> > the downstream topic to *read_committed: true*. So in your example the
> > messages would still have made it to the destination topic, however
> because
> > the transaction has not yet been completed, the downstream consumer would
> > ignore them.
> >
> > You can still only do exactly once processing up to the boundaries of
> > Kafka, that said. Wherever Kafka terminates you'd have to code it
> yourself.
> >
> >
> >
> > On Sat, Jul 17, 2021 at 2:01 AM Pushkar Deole 
> > wrote:
> >
> > > Chris,
> > >
> > > I am not sure how this solves the problem scenario that we are
> > experiencing
> > > in customer environment: the scenario is:
> > > 1. application consumed record and processed it
> > > 2. the processed record is produced on destination topic and ack is
> > > received
> > > 3. Before committing offset back to consumed topic, the application pod
> > > crashed or shut down by kubernetes or shut down due to some other issue
> > >
> > > On Fri, Jul 16, 2021 at 8:57 PM Chris Larsen
> >  > > >
> > > wrote:
> > >
> > > > It is not possible out of the box, it is something you’ll have to
> write
> > > > yourself. Would the following work?
> > > >
> > > > Consume -> Produce to primary topic-> get success ack back -> commit
> > the
> > > > consume
> > > >
> > > > Else if ack fails, produce to dead letter, then commit upon success
> > > >
> > > > Else if dead letter ack fails, exit (and thus don’t commit)
> > > >
> > > > Does that help? Someone please feel free to slap my hand but seems
> > legit
> > > to
> > > > me ;)
> > > >
> > > > Chris
> > > >
> > > >
> > > >
> > > > On Fri, Jul 16, 2021 at 10:48 Pushkar Deole 
> > > wrote:
> > > >
> > > > > Thanks Chris for the response!
> > > > >
> > > > > The current application is quite evolved and currently using
> > > > >
> > > > > consumer-producer model described above and we need to fix some
> bugs
> > > soon
> > > > >
> > > > > for a customer. So, moving to kafka streams seems bigger work.
> That's
> > > why
> > > > >
> > > > > looking at work around if same thing can be achieved with current
> > model
> > > > >
> > > > > using transactions that span across consumer offset commits and
> > > producer
> > > > >
> > > > > send.
> > > > >
> > > > >
> > > > >
> > > > > We have made the producer idempotent and turned on transactions.
> > > > >
> > > > > However want to make offset commit to consumer and send from
> producer
> > > to
> > > > be
> > > > >
> > > > > atomic? Is that possible?
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Jul 16, 2021 at 6:18 PM Chris Larsen
> > > >  > 

Re: Is exactly-once possible with kafka consumer-producer ?

2021-07-16 Thread Pushkar Deole
Hi Lerh Chuan Low,

Many thanks for your response! I get it now that it provides exactly-once
semantics, i.e. it looks to the user as if each record is processed exactly once.
Also, I am clear on the aspect about the read_committed level, so the
uncommitted transaction, and hence the uncommitted send, won't be visible to
consumers.

However, one last query I have is how to make sure that, as part of the same
transaction, I am both sending and committing offsets. Which API
should I look at; is this the correct API:
KafkaProducer.sendOffsetsToTransaction?

On Fri, Jul 16, 2021 at 9:57 PM Lerh Chuan Low  wrote:

> Pushkar,
>
> My understanding is you can easily turn it on by using Kafka streams as
> Chris mentioned. Otherwise you'd have to do it yourself - I don't think you
> can get exactly once processing, but what you can do (which is also what
> Kafka streams does) is exactly once schematics (You won't be able to get
> every message processed exactly once in the system, but they could look
> like they had been processed exactly once), The final piece of the puzzle
> besides using idempotent producers and transactions is to set consumers of
> the downstream topic to *read_committed: true*. So in your example the
> messages would still have made it to the destination topic, however because
> the transaction has not yet been completed, the downstream consumer would
> ignore them.
>
> You can still only do exactly once processing up to the boundaries of
> Kafka, that said. Wherever Kafka terminates you'd have to code it yourself.
>
>
>
> On Sat, Jul 17, 2021 at 2:01 AM Pushkar Deole 
> wrote:
>
> > Chris,
> >
> > I am not sure how this solves the problem scenario that we are
> experiencing
> > in customer environment: the scenario is:
> > 1. application consumed record and processed it
> > 2. the processed record is produced on destination topic and ack is
> > received
> > 3. Before committing offset back to consumed topic, the application pod
> > crashed or shut down by kubernetes or shut down due to some other issue
> >
> > On Fri, Jul 16, 2021 at 8:57 PM Chris Larsen
>  > >
> > wrote:
> >
> > > It is not possible out of the box, it is something you’ll have to write
> > > yourself. Would the following work?
> > >
> > > Consume -> Produce to primary topic-> get success ack back -> commit
> the
> > > consume
> > >
> > > Else if ack fails, produce to dead letter, then commit upon success
> > >
> > > Else if dead letter ack fails, exit (and thus don’t commit)
> > >
> > > Does that help? Someone please feel free to slap my hand but seems
> legit
> > to
> > > me ;)
> > >
> > > Chris
> > >
> > >
> > >
> > > On Fri, Jul 16, 2021 at 10:48 Pushkar Deole 
> > wrote:
> > >
> > > > Thanks Chris for the response!
> > > >
> > > > The current application is quite evolved and currently using
> > > >
> > > > consumer-producer model described above and we need to fix some bugs
> > soon
> > > >
> > > > for a customer. So, moving to kafka streams seems bigger work. That's
> > why
> > > >
> > > > looking at work around if same thing can be achieved with current
> model
> > > >
> > > > using transactions that span across consumer offset commits and
> > producer
> > > >
> > > > send.
> > > >
> > > >
> > > >
> > > > We have made the producer idempotent and turned on transactions.
> > > >
> > > > However want to make offset commit to consumer and send from producer
> > to
> > > be
> > > >
> > > > atomic? Is that possible?
> > > >
> > > >
> > > >
> > > > On Fri, Jul 16, 2021 at 6:18 PM Chris Larsen
> > >  > > > >
> > > >
> > > > wrote:
> > > >
> > > >
> > > >
> > > > > Pushkar, in kafka development for customer consumer/producer you
> > handle
> > > > it.
> > > >
> > > > > However you can ensure the process stops (or sends message to dead
> > > > letter)
> > > >
> > > > > before manually committing the consumer offset. On the produce side
> > you
> > > > can
> > > >
> > > > > turn on idempotence or transactions. But unless you are using
> > Streams,
> > > > you
> > > >
> > > > > chain those together yoursef. Woul

Re: Is exactly-once possible with kafka consumer-producer ?

2021-07-16 Thread Lerh Chuan Low
Pushkar,

My understanding is you can easily turn it on by using Kafka Streams as
Chris mentioned. Otherwise you'd have to do it yourself - I don't think you
can get exactly-once processing, but what you can do (which is also what
Kafka Streams does) is exactly-once semantics (you won't be able to get
every message processed exactly once in the system, but it could look
like they had been processed exactly once). The final piece of the puzzle,
besides using idempotent producers and transactions, is to set consumers of
the downstream topic to read_committed (isolation.level=read_committed). So in
your example the messages would still have made it to the destination topic, but
because the transaction has not yet been completed, the downstream consumer would
ignore them.

You can still only do exactly-once processing up to the boundaries of
Kafka, that said. Wherever Kafka terminates you'd have to code it yourself.
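For completeness, the downstream-consumer side of that is just configuration; the
other property values here are placeholders:

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumption
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "downstream-group");          // assumption
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // Without this the consumer defaults to read_uncommitted and also sees
    // records from open or aborted transactions.
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    KafkaConsumer<String, String> downstream = new KafkaConsumer<>(props);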



On Sat, Jul 17, 2021 at 2:01 AM Pushkar Deole  wrote:

> Chris,
>
> I am not sure how this solves the problem scenario that we are experiencing
> in customer environment: the scenario is:
> 1. application consumed record and processed it
> 2. the processed record is produced on destination topic and ack is
> received
> 3. Before committing offset back to consumed topic, the application pod
> crashed or shut down by kubernetes or shut down due to some other issue
>
> On Fri, Jul 16, 2021 at 8:57 PM Chris Larsen  >
> wrote:
>
> > It is not possible out of the box, it is something you’ll have to write
> > yourself. Would the following work?
> >
> > Consume -> Produce to primary topic-> get success ack back -> commit the
> > consume
> >
> > Else if ack fails, produce to dead letter, then commit upon success
> >
> > Else if dead letter ack fails, exit (and thus don’t commit)
> >
> > Does that help? Someone please feel free to slap my hand but seems legit
> to
> > me ;)
> >
> > Chris
> >
> >
> >
> > On Fri, Jul 16, 2021 at 10:48 Pushkar Deole 
> wrote:
> >
> > > Thanks Chris for the response!
> > >
> > > The current application is quite evolved and currently using
> > >
> > > consumer-producer model described above and we need to fix some bugs
> soon
> > >
> > > for a customer. So, moving to kafka streams seems bigger work. That's
> why
> > >
> > > looking at work around if same thing can be achieved with current model
> > >
> > > using transactions that span across consumer offset commits and
> producer
> > >
> > > send.
> > >
> > >
> > >
> > > We have made the producer idempotent and turned on transactions.
> > >
> > > However want to make offset commit to consumer and send from producer
> to
> > be
> > >
> > > atomic? Is that possible?
> > >
> > >
> > >
> > > On Fri, Jul 16, 2021 at 6:18 PM Chris Larsen
> >  > > >
> > >
> > > wrote:
> > >
> > >
> > >
> > > > Pushkar, in kafka development for customer consumer/producer you
> handle
> > > it.
> > >
> > > > However you can ensure the process stops (or sends message to dead
> > > letter)
> > >
> > > > before manually committing the consumer offset. On the produce side
> you
> > > can
> > >
> > > > turn on idempotence or transactions. But unless you are using
> Streams,
> > > you
> > >
> > > > chain those together yoursef. Would kafka streams work for the
> > operation
> > >
> > > > you’re looking to do?
> > >
> > > >
> > >
> > > > Best,
> > >
> > > > Chris
> > >
> > > >
> > >
> > > > On Fri, Jul 16, 2021 at 08:30 Pushkar Deole 
> > > wrote:
> > >
> > > >
> > >
> > > > > Hi All,
> > >
> > > > >
> > >
> > > > >
> > >
> > > > >
> > >
> > > > > I am using a normal kafka consumer-producer in my microservice,
> with
> > a
> > >
> > > > >
> > >
> > > > > simple model of consume from source topic -> process the record ->
> > >
> > > > produce
> > >
> > > > >
> > >
> > > > > on destination topic.
> > >
> > > > >
> > >
> > > > > I am mainly looking for exactly-once guarantee  wherein the offset
> > > commit
&g

Re: Is exactly-once possible with kafka consumer-producer ?

2021-07-16 Thread Ran Lupovich
Another acceptable solution is doing idempotent actions: if you re-read the
message again you check "did I process it already?", or do an upsert... and keep
it at at-least-once semantics.
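A sketch of that check; the in-memory set below only stands in for a durable store with
an atomic set-if-absent (e.g. a DB table with a unique key), and process() is hypothetical:

    Set<String> processedIds = new HashSet<>(); // stand-in for a durable store
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
        String id = record.topic() + "-" + record.partition() + "-" + record.offset();
        if (!processedIds.add(id)) { // already seen: a redelivery after a crash or rebalance
            continue;
        }
        process(record);             // hypothetical handler, or an upsert keyed by id
    }
    consumer.commitSync();           // at-least-once commit after processing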

On Fri, Jul 16, 2021, 19:10, Ran Lupovich <ranlupov...@gmail.com> wrote:

> You need to do atomic actions with processing and saving the
> partition/offsets , while rebalance or assign or on initial start events
> you read the offset from the outside store, there are documentation and
> examples on the internet, what type of processing are you doing ?
>
> On Fri, Jul 16, 2021, 19:01, Pushkar Deole <pdeole2...@gmail.com> wrote:
>
>> Chris,
>>
>> I am not sure how this solves the problem scenario that we are
>> experiencing
>> in customer environment: the scenario is:
>> 1. application consumed record and processed it
>> 2. the processed record is produced on destination topic and ack is
>> received
>> 3. Before committing offset back to consumed topic, the application pod
>> crashed or shut down by kubernetes or shut down due to some other issue
>>
>> On Fri, Jul 16, 2021 at 8:57 PM Chris Larsen > >
>> wrote:
>>
>> > It is not possible out of the box, it is something you’ll have to write
>> > yourself. Would the following work?
>> >
>> > Consume -> Produce to primary topic-> get success ack back -> commit the
>> > consume
>> >
>> > Else if ack fails, produce to dead letter, then commit upon success
>> >
>> > Else if dead letter ack fails, exit (and thus don’t commit)
>> >
>> > Does that help? Someone please feel free to slap my hand but seems
>> legit to
>> > me ;)
>> >
>> > Chris
>> >
>> >
>> >
>> > On Fri, Jul 16, 2021 at 10:48 Pushkar Deole 
>> wrote:
>> >
>> > > Thanks Chris for the response!
>> > >
>> > > The current application is quite evolved and currently using
>> > >
>> > > consumer-producer model described above and we need to fix some bugs
>> soon
>> > >
>> > > for a customer. So, moving to kafka streams seems bigger work. That's
>> why
>> > >
>> > > looking at work around if same thing can be achieved with current
>> model
>> > >
>> > > using transactions that span across consumer offset commits and
>> producer
>> > >
>> > > send.
>> > >
>> > >
>> > >
>> > > We have made the producer idempotent and turned on transactions.
>> > >
>> > > However want to make offset commit to consumer and send from producer
>> to
>> > be
>> > >
>> > > atomic? Is that possible?
>> > >
>> > >
>> > >
>> > > On Fri, Jul 16, 2021 at 6:18 PM Chris Larsen
>> > > > > >
>> > >
>> > > wrote:
>> > >
>> > >
>> > >
>> > > > Pushkar, in kafka development for customer consumer/producer you
>> handle
>> > > it.
>> > >
>> > > > However you can ensure the process stops (or sends message to dead
>> > > letter)
>> > >
>> > > > before manually committing the consumer offset. On the produce side
>> you
>> > > can
>> > >
>> > > > turn on idempotence or transactions. But unless you are using
>> Streams,
>> > > you
>> > >
>> > > > chain those together yoursef. Would kafka streams work for the
>> > operation
>> > >
>> > > > you’re looking to do?
>> > >
>> > > >
>> > >
>> > > > Best,
>> > >
>> > > > Chris
>> > >
>> > > >
>> > >
>> > > > On Fri, Jul 16, 2021 at 08:30 Pushkar Deole 
>> > > wrote:
>> > >
>> > > >
>> > >
>> > > > > Hi All,
>> > >
>> > > > >
>> > >
>> > > > >
>> > >
>> > > > >
>> > >
>> > > > > I am using a normal kafka consumer-producer in my microservice,
>> with
>> > a
>> > >
>> > > > >
>> > >
>> > > > > simple model of consume from source topic -> process the record ->
>> > >
>> > > > produce
>> > >
>> > >

Re: Is exactly-once possible with kafka consumer-producer ?

2021-07-16 Thread Ran Lupovich
You need to make processing and saving the partition/offsets an atomic action,
while on rebalance, assign, or initial start events you read the offsets back
from the outside store; there are documentation and examples on the internet.
What type of processing are you doing?
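A sketch of wiring that up; offsetStore is a made-up external store, and the point is
that the processing result and the offset are saved in one atomic write while seek()
replays from that store whenever partitions are assigned:

    consumer.subscribe(Collections.singletonList("source-topic"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            for (TopicPartition tp : partitions) {
                consumer.seek(tp, offsetStore.lastProcessedOffset(tp) + 1); // hypothetical lookup
            }
        }
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
    });

    while (true) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
            // Hypothetical: one transaction in the external system stores both the
            // processing result and this record's offset.
            offsetStore.saveResultAndOffset(record);
        }
    }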

On Fri, Jul 16, 2021, 19:01, Pushkar Deole <pdeole2...@gmail.com> wrote:

> Chris,
>
> I am not sure how this solves the problem scenario that we are experiencing
> in customer environment: the scenario is:
> 1. application consumed record and processed it
> 2. the processed record is produced on destination topic and ack is
> received
> 3. Before committing offset back to consumed topic, the application pod
> crashed or shut down by kubernetes or shut down due to some other issue
>
> On Fri, Jul 16, 2021 at 8:57 PM Chris Larsen  >
> wrote:
>
> > It is not possible out of the box, it is something you’ll have to write
> > yourself. Would the following work?
> >
> > Consume -> Produce to primary topic-> get success ack back -> commit the
> > consume
> >
> > Else if ack fails, produce to dead letter, then commit upon success
> >
> > Else if dead letter ack fails, exit (and thus don’t commit)
> >
> > Does that help? Someone please feel free to slap my hand but seems legit
> to
> > me ;)
> >
> > Chris
> >
> >
> >
> > On Fri, Jul 16, 2021 at 10:48 Pushkar Deole 
> wrote:
> >
> > > Thanks Chris for the response!
> > >
> > > The current application is quite evolved and currently using
> > >
> > > consumer-producer model described above and we need to fix some bugs
> soon
> > >
> > > for a customer. So, moving to kafka streams seems bigger work. That's
> why
> > >
> > > looking at work around if same thing can be achieved with current model
> > >
> > > using transactions that span across consumer offset commits and
> producer
> > >
> > > send.
> > >
> > >
> > >
> > > We have made the producer idempotent and turned on transactions.
> > >
> > > However want to make offset commit to consumer and send from producer
> to
> > be
> > >
> > > atomic? Is that possible?
> > >
> > >
> > >
> > > On Fri, Jul 16, 2021 at 6:18 PM Chris Larsen
> >  > > >
> > >
> > > wrote:
> > >
> > >
> > >
> > > > Pushkar, in kafka development for customer consumer/producer you
> handle
> > > it.
> > >
> > > > However you can ensure the process stops (or sends message to dead
> > > letter)
> > >
> > > > before manually committing the consumer offset. On the produce side
> you
> > > can
> > >
> > > > turn on idempotence or transactions. But unless you are using
> Streams,
> > > you
> > >
> > > > chain those together yoursef. Would kafka streams work for the
> > operation
> > >
> > > > you’re looking to do?
> > >
> > > >
> > >
> > > > Best,
> > >
> > > > Chris
> > >
> > > >
> > >
> > > > On Fri, Jul 16, 2021 at 08:30 Pushkar Deole 
> > > wrote:
> > >
> > > >
> > >
> > > > > Hi All,
> > >
> > > > >
> > >
> > > > >
> > >
> > > > >
> > >
> > > > > I am using a normal kafka consumer-producer in my microservice,
> with
> > a
> > >
> > > > >
> > >
> > > > > simple model of consume from source topic -> process the record ->
> > >
> > > > produce
> > >
> > > > >
> > >
> > > > > on destination topic.
> > >
> > > > >
> > >
> > > > > I am mainly looking for exactly-once guarantee  wherein the offset
> > > commit
> > >
> > > > >
> > >
> > > > > to consumed topic and produce on destination topic would both
> happen
> > >
> > > > >
> > >
> > > > > atomically or none of them would happen.
> > >
> > > > >
> > >
> > > > >
> > >
> > > > >
> > >
> > > > > In case of failures of service instance, if consumer has consumed,
> > >
> > > > >
> > >
> > > > > processed record and produced

Re: Is exactly-once possible with kafka consumer-producer ?

2021-07-16 Thread Pushkar Deole
Chris,

I am not sure how this solves the problem scenario that we are experiencing
in the customer environment. The scenario is:
1. the application consumed a record and processed it
2. the processed record was produced on the destination topic and the ack was received
3. before committing the offset back to the consumed topic, the application pod
crashed or was shut down by Kubernetes or due to some other issue

On Fri, Jul 16, 2021 at 8:57 PM Chris Larsen 
wrote:

> It is not possible out of the box, it is something you’ll have to write
> yourself. Would the following work?
>
> Consume -> Produce to primary topic-> get success ack back -> commit the
> consume
>
> Else if ack fails, produce to dead letter, then commit upon success
>
> Else if dead letter ack fails, exit (and thus don’t commit)
>
> Does that help? Someone please feel free to slap my hand but seems legit to
> me ;)
>
> Chris


Re: Is exactly-once possible with kafka consumer-producer ?

2021-07-16 Thread Chris Larsen
It is not possible out of the box, it is something you’ll have to write
yourself. Would the following work?

Consume -> Produce to primary topic -> get success ack back -> commit the
consume

Else if ack fails, produce to dead letter, then commit upon success

Else if dead letter ack fails, exit (and thus don’t commit)

Does that help? Someone please feel free to slap my hand but seems legit to
me ;)

Chris



On Fri, Jul 16, 2021 at 10:48 Pushkar Deole  wrote:

> Thanks Chris for the response!
>
> The current application is quite evolved and currently using
> consumer-producer model described above and we need to fix some bugs soon
> for a customer. So, moving to kafka streams seems bigger work. That's why
> looking at work around if same thing can be achieved with current model
> using transactions that span across consumer offset commits and producer
> send.
>
> We have made the producer idempotent and turned on transactions.
> However want to make offset commit to consumer and send from producer to be
> atomic? Is that possible?
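
For reference, a minimal sketch of the consume -> produce -> ack -> commit flow with a
dead-letter topic that Chris describes above. It assumes an already constructed
KafkaConsumer with enable.auto.commit=false and a KafkaProducer; the topic names
"primary-topic" and "dead-letter-topic" are illustrative placeholders:

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;

// Assumes enable.auto.commit=false on the consumer; the offset is committed only
// after the record has landed either on the primary topic or on the dead-letter topic.
static void consumeProduceCommit(KafkaConsumer<String, String> consumer,
                                 KafkaProducer<String, String> producer) {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
      try {
        // Block until the primary topic acknowledges the write
        producer.send(new ProducerRecord<>("primary-topic", record.key(), record.value())).get();
      } catch (Exception primaryFailure) {
        try {
          producer.send(new ProducerRecord<>("dead-letter-topic", record.key(), record.value())).get();
        } catch (Exception dlqFailure) {
          // Neither write succeeded: do not commit, exit and let the record be re-delivered
          throw new RuntimeException("Dead letter write failed, not committing", dlqFailure);
        }
      }
      // Commit the offset of the record just handled (per-record commit for clarity;
      // batching one commit per poll() is the usual optimization)
      consumer.commitSync(Collections.singletonMap(
          new TopicPartition(record.topic(), record.partition()),
          new OffsetAndMetadata(record.offset() + 1)));
    }
  }
}

Note that this gives at-least-once rather than exactly-once: a crash between the
acknowledged send and the commit can still replay the record, which is the gap that
Kafka transactions are meant to close.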


Re: Is exactly-once possible with kafka consumer-producer ?

2021-07-16 Thread Pushkar Deole
Thanks Chris for the response!
The current application is quite evolved and is currently using the
consumer-producer model described above, and we need to fix some bugs soon
for a customer. So, moving to kafka streams seems like bigger work. That's why
we are looking at a workaround to see if the same thing can be achieved with the
current model using transactions that span across consumer offset commits and
producer send.

We have made the producer idempotent and turned on transactions.
However, we want to make the offset commit on the consumer and the send from the
producer atomic. Is that possible?

On Fri, Jul 16, 2021 at 6:18 PM Chris Larsen 
wrote:

> Pushkar, in kafka development for custom consumer/producer you handle it.
> However you can ensure the process stops (or sends the message to a dead letter)
> before manually committing the consumer offset. On the produce side you can
> turn on idempotence or transactions. But unless you are using Streams, you
> chain those together yourself. Would kafka streams work for the operation
> you’re looking to do?
>
> Best,
> Chris


Re: Is exactly-once possible with kafka consumer-producer ?

2021-07-16 Thread Chris Larsen
Pushkar, in kafka development for custom consumer/producer you handle it.
However you can ensure the process stops (or sends the message to a dead letter)
before manually committing the consumer offset. On the produce side you can
turn on idempotence or transactions. But unless you are using Streams, you
chain those together yourself. Would kafka streams work for the operation
you’re looking to do?

Best,
Chris

On Fri, Jul 16, 2021 at 08:30 Pushkar Deole  wrote:

> Hi All,
>
> I am using a normal kafka consumer-producer in my microservice, with a
> simple model of consume from source topic -> process the record -> produce
> on destination topic.
> I am mainly looking for exactly-once guarantee wherein the offset commit
> to consumed topic and produce on destination topic would both happen
> atomically or none of them would happen.
>
> In case of failures of service instance, if consumer has consumed,
> processed record and produced on destination topic but offset not yet
> committed back to source topic then produce should also not happen on
> destination topic.
> Is this behavior i.e. exactly-once, across consumers and producers,
> possible with transactional support in kafka?




Is exactly-once possible with kafka consumer-producer ?

2021-07-16 Thread Pushkar Deole
Hi All,

I am using a normal kafka consumer-producer in my microservice, with a
simple model of consume from source topic -> process the record -> produce
on destination topic.
I am mainly looking for an exactly-once guarantee wherein the offset commit
to consumed topic and produce on destination topic would both happen
atomically or none of them would happen.

In case of failures of service instance, if consumer has consumed,
processed record and produced on destination topic but offset not yet
committed back to source topic then produce should also not happen on
destination topic.
Is this behavior i.e. exactly-once, across consumers and producers,
possible with transactional support in kafka?
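
For reference, a minimal sketch of the transactional consume-process-produce loop that
the question above is about, assuming a client version that supports
sendOffsetsToTransaction with consumer group metadata (2.5 or later; older clients have
an overload that takes the group id string). The bootstrap address, group id, topic
names and transactional.id below are illustrative placeholders:

import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class TransactionalPipe {
  public static void main(String[] args) {
    Properties pp = new Properties();
    pp.put("bootstrap.servers", "localhost:9092");
    pp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    pp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    pp.put("enable.idempotence", "true");
    pp.put("transactional.id", "my-processor-1");          // must be unique per producer instance
    KafkaProducer<String, String> producer = new KafkaProducer<>(pp);
    producer.initTransactions();

    Properties cp = new Properties();
    cp.put("bootstrap.servers", "localhost:9092");
    cp.put("group.id", "my-group");
    cp.put("enable.auto.commit", "false");                  // offsets go through the transaction instead
    cp.put("isolation.level", "read_committed");
    cp.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    cp.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
    consumer.subscribe(Collections.singletonList("source-topic"));

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
      if (records.isEmpty()) continue;
      producer.beginTransaction();
      try {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
          String processed = record.value().toUpperCase();   // stand-in for the real processing
          producer.send(new ProducerRecord<>("destination-topic", record.key(), processed));
          offsets.put(new TopicPartition(record.topic(), record.partition()),
                      new OffsetAndMetadata(record.offset() + 1));
        }
        // The consumed offsets are committed in the same transaction as the produced
        // records, so either both become visible or neither does.
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
      } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException fatal) {
        producer.close();                                    // unrecoverable for this producer instance
        break;
      } catch (KafkaException e) {
        producer.abortTransaction();                         // output records and offsets are both discarded
        // for a production loop, also rewind the consumer to the last committed offsets before retrying
      }
    }
  }
}

Downstream consumers of the destination topic also need isolation.level=read_committed,
otherwise they can see records from aborted transactions.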


Re: Kafka Consumer Retries Failing

2021-07-13 Thread Rahul Patwari
Apologies ... gmail is adding a link to the ip:port - a format error with
copy paste.

The HTTP protocol is NOT present in the logs.
Adding logs without the link here:

org.apache.kafka.common.errors.DisconnectException: null

{"time":"2021-07-07T22:13:37,385","severity":"INFO","component":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
clientId=consumer-XX-3, groupId=XX] Error sending fetch request
(sessionId=405798138, epoch=5808) to node 8: {}.","method":"handleError"}

org.apache.kafka.common.errors.TimeoutException: Failed

{"time":"2021-07-07T22:26:41,379","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer
clientId=consumer-XX-3, groupId=XX] Group coordinator 100.98
.40.16:9092 (id: 2147483623 rack: null) is unavailable or invalid, will
attempt rediscovery","method":"markCoordinatorUnknown"}

{"time":"2021-07-07T22:27:10,465","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler","message":"[Consumer
clientId=consumer-XX-3, groupId=XX] Discovered group coordinator
100.98.40.16:9092 (id: 2147483623 rack: null)","method":"onSuccess"}


On Wed, Jul 14, 2021 at 12:28 AM Ran Lupovich  wrote:

> I would suggest you will check you bootstrap definition and
> server.properties, somehow it looks for http://ip:9092 , kafka is not
> using
> http protocol, seems something not configured correctly


Re: Kafka Consumer Retries Failing

2021-07-13 Thread Ran Lupovich
I would suggest you check your bootstrap definition and
server.properties; somehow it looks for http://ip:9092. Kafka is not using the
http protocol, so it seems something is not configured correctly

On Tue, Jul 13, 2021 at 21:46, Rahul Patwari <rahulpatwari8...@gmail.com> wrote:

> Hi,
>
> We are facing an issue in our application where Kafka Consumer Retries are
> failing whereas a restart of the application is making the Kafka Consumers
> work as expected again.
>
> Kafka Server version is 2.5.0 - confluent 5.5.0
> Kafka Client Version is 2.4.1 -
>
> {"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka
> version: 2.4.1","method":""}
>
> Occasionally(every 24 hours), we have observed that the Kafka consumption
> rate went down(NOT 0) and the following logs were observed: Generally, the
> consumption rate across all consumers is 1k records/sec. When this issue
> occurred, the consumption rate dropped to < 100 records/sec
>
> org.apache.kafka.common.errors.DisconnectException: null
>
>
> {"time":"2021-07-07T22:13:37,385","severity":"INFO","component":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
> clientId=consumer-XX-3, groupId=XX] Error sending fetch request
> (sessionId=405798138, epoch=5808) to node 8: {}.","method":"handleError"}
>
> org.apache.kafka.common.errors.TimeoutException: Failed
>
>
> {"time":"2021-07-07T22:26:41,379","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer
> clientId=consumer-XX-3, groupId=XX] Group coordinator x.x.x.x:9092
> <http://100.98.40.16:9092/> (id: 2147483623 rack: null) is unavailable or
> invalid, will attempt rediscovery","method":"markCoordinatorUnknown"}
>
>
> {"time":"2021-07-07T22:27:10,465","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler","message":"[Consumer
> clientId=consumer-XX-3, groupId=XX] Discovered group coordinator x
> .x.x.x:9092 <http://100.98.40.16:9092/> (id: 2147483623 rack:
> null)","method":"onSuccess"}
>
> The consumers retried for more than an hour but the above logs are observed
> again.
> The consumers started pulling data after a manual restart of the
> application.
>
> No WARN or ERROR logs were observed in Kafka or Zookeeper during this
> period.
>
> Our observation from this incident is that Kafka Consumer retries could not
> resolve the issue but a manual restart of the application does.
>
> Has anyone faced this issue before? Any pointers are appreciated.
>
> Regards,
> Rahul
>


Kafka Consumer Retries Failing

2021-07-13 Thread Rahul Patwari
Hi,

We are facing an issue in our application where Kafka Consumer Retries are
failing whereas a restart of the application is making the Kafka Consumers
work as expected again.

Kafka Server version is 2.5.0 - confluent 5.5.0
Kafka Client Version is 2.4.1 -
{"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka
version: 2.4.1","method":""}

Occasionally (every 24 hours), we have observed that the Kafka consumption
rate went down (NOT 0) and the following logs were observed: Generally, the
consumption rate across all consumers is 1k records/sec. When this issue
occurred, the consumption rate dropped to < 100 records/sec

org.apache.kafka.common.errors.DisconnectException: null

{"time":"2021-07-07T22:13:37,385","severity":"INFO","component":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
clientId=consumer-XX-3, groupId=XX] Error sending fetch request
(sessionId=405798138, epoch=5808) to node 8: {}.","method":"handleError"}

org.apache.kafka.common.errors.TimeoutException: Failed

{"time":"2021-07-07T22:26:41,379","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer
clientId=consumer-XX-3, groupId=XX] Group coordinator x.x.x.x:9092
<http://100.98.40.16:9092/> (id: 2147483623 rack: null) is unavailable or
invalid, will attempt rediscovery","method":"markCoordinatorUnknown"}

{"time":"2021-07-07T22:27:10,465","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler","message":"[Consumer
clientId=consumer-XX-3, groupId=XX] Discovered group coordinator x
.x.x.x:9092 <http://100.98.40.16:9092/> (id: 2147483623 rack:
null)","method":"onSuccess"}

The consumers retried for more than an hour but the above logs are observed
again.
The consumers started pulling data after a manual restart of the
application.

No WARN or ERROR logs were observed in Kafka or Zookeeper during this
period.

Our observation from this incident is that Kafka Consumer retries could not
resolve the issue but a manual restart of the application does.

Has anyone faced this issue before? Any pointers are appreciated.

Regards,
Rahul


Kafka consumer : Maximum consumer rate.

2021-06-16 Thread Mazen Ezzeddine
I am interested in learning/deducing the maximum consumption rate of a Kafka
consumer in my consumer group. The maximum consumption rate is the rate beyond which
the consumer can not keep up with the message arrival rate, and hence the
consumer will fall farther and farther behind and the message lag will
accumulate.

On the consumer side, I can compute the maximum consumption rate as 1/(message 
waiting time), where message waiting time is the time required to pull the 
message from the broker and process it by the consumer in the poll loop.

Would that strategy return an ACCURATE measure of the maximum consumption rate 
of a consumer. Any other suggestion for the computation of the maximum 
consumption rate using the consumer API?

On the other hand, say I have a controller process with a Kafka admin client
API: would the below logic accurately return the maximum consumption rate (are there
any unseen boundary cases)? In the logic, At and Rt are the arrival rate and
consumption rate at time t, and At-1 and Rt-1 are the same rates at time t-1.

if (At > At-1 && Rt = Rt-1) then Rt is the maximum consumption rate.


Any suggestion on the computation of the maximum consumption rate using the 
admin client API and not the consumer API?


Thank you.
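
For what it is worth, a minimal sketch of the admin-client side of this question. It
samples the group's committed offsets and the log-end offsets twice and derives the
arrival rate, the consumption rate and the lag from the deltas; the bootstrap address
and group id are placeholders, and the saturation test itself is the heuristic described
above (arrival rate still rising while the consumption rate stays flat and the lag grows):

import java.util.*;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumptionRateProbe {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");     // placeholder
    String groupId = "my-group";                          // placeholder
    try (Admin admin = Admin.create(props)) {
      long[] s1 = sample(admin, groupId);
      Thread.sleep(10_000);
      long[] s2 = sample(admin, groupId);
      double arrivalRate = (s2[0] - s1[0]) / 10.0;        // messages appended per second
      double consumptionRate = (s2[1] - s1[1]) / 10.0;    // committed offsets advanced per second
      long lag = s2[0] - s2[1];
      System.out.printf("arrival=%.1f/s consumption=%.1f/s lag=%d%n",
          arrivalRate, consumptionRate, lag);
    }
  }

  // Returns {sum of log-end offsets, sum of committed offsets} over the group's partitions
  static long[] sample(Admin admin, String groupId) throws Exception {
    Map<TopicPartition, OffsetAndMetadata> committed =
        admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
    Map<TopicPartition, OffsetSpec> latest = committed.keySet().stream()
        .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
    Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> end =
        admin.listOffsets(latest).all().get();
    long endSum = end.values().stream()
        .mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset).sum();
    long committedSum = committed.values().stream()
        .mapToLong(OffsetAndMetadata::offset).sum();
    return new long[] { endSum, committedSum };
  }
}

The lag value is the same quantity that kafka-consumer-groups.sh reports, just summed
over the partitions.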



MAXIMUM consumption rate of a kafka consumer.

2021-06-06 Thread Mazen Ezzeddine
Dear all,

I am experimenting with an increasing (in terms of msgs/sec) Kafka workload,
where I have continuous access to the following two metrics: the consumption rate
per sec CRSEC and the arrival rate per sec ARSEC. From these two metrics
that are continuously monitored, I want to deduce/compute the maximum
consumption rate per sec MCRSEC. MCRSEC is defined as the consumption rate
value that saturates the consumer, and hence the consumer can not keep up with
the increase in the arrival rate.

Is it safe to assume the maximum consumption rate per sec MCRSEC happens when 
ARSEC at time t is greater than ARSEC at time t-1, while CRSEC at time t is not 
greater than CRSEC at time t-1? Otherwise said, ARSEC@t > ARSEC@t-1 && CRSEC@t 
= (not >) CRSEC@t-1?

Any role for the consumer  lag in the computation of MCRSEC?

Thank you.



R: kafka-consumer-groups option

2021-05-06 Thread Rovere Lorenzo
Ok I understand why I should reset the offset, but why would I want to reset
to the current one? I mean, am I not always at the current offset by definition?
I don't know if I am missing something, but if I describe a consumer group and 
I see 

TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
test_topic  1          52              55              3
test_topic  0          59              71              12

and now I want to reset --to-current, I obtain:

TOPIC  PARTITION  NEW-OFFSET
test_topic 1 52
test_topic 0 59

Apparently nothing changed, are there any benefits in resetting --to-current?
I hope everything is clear.
Lorenzo




Lorenzo Rovere 

Technology Reply
Via Avogadri, 2
31057 - Silea (TV) - ITALY 
phone: +39 0422 1836521
l.rov...@reply.it
www.reply.it
-----Original Message-----
From: mangat rai
Sent: 6 May, 2021 11:51 AM
To: users@kafka.apache.org
Subject: Re: kafka-consumer-groups option

Hey Lorenzo Rovere,

Consider the case where you want to reprocess all the data. Let's say your 
process had a bug. You fixed it and now you want to reprocess everything to 
produce the correct output.

Similarly, there can be other use cases for resetting the consumer offsets and 
reprocessing the input.

Regards,
Mangat

On Thu, May 6, 2021 at 11:25 AM Rovere Lorenzo  wrote:

> Hi,
>
> I’m playing with the kafka-consumer-groups.sh command.
>
> I wanted to ask the utility of the *--to-current* option used to 
> *reset offsets of a consumer group to current offset*. The thing I 
> don’t understand is in which scenario I would want to use this option. 
> If I’m already at the current offset, why would I need to reset to the 
> offset I’m already at?
>
> Thanks in advance
>
>
>
> Lorenzo Rovere
>
>
>
>
>
> Lorenzo Rovere
>
> Technology Reply
> Via Avogadri, 2
> 31057 - Silea (TV) - ITALY
> phone: +39 0422 1836521
> l.rov...@reply.it
> www.reply.it
>
> [image: Technology Reply]
>


Re: kafka-consumer-groups option

2021-05-06 Thread mangat rai
Hey Lorenzo Rovere,

Consider the case where you want to reprocess all the data. Let's say your
process had a bug. You fixed it and now you want to reprocess everything to
produce the correct output.

Similarly, there can be other use cases for resetting the consumer offsets
and reprocessing the input.

Regards,
Mangat

On Thu, May 6, 2021 at 11:25 AM Rovere Lorenzo  wrote:

> Hi,
>
> I’m playing with the kafka-consumer-groups.sh command.
>
> I wanted to ask the utility of the *--to-current* option used to *reset
> offsets of a consumer group to current offset*. The thing I don’t
> understand is in which scenario I would want to use this option. If I’m
> already at the current offset, why would I need to reset to the offset I’m
> already at?
>
> Thanks in advance
>
>
>
> Lorenzo Rovere
>
>
>
>
>
> Lorenzo Rovere
>
> Technology Reply
> Via Avogadri, 2
> 31057 - Silea (TV) - ITALY
> phone: +39 0422 1836521
> l.rov...@reply.it
> www.reply.it
>
> [image: Technology Reply]
>


kafka-consumer-groups option

2021-05-06 Thread Rovere Lorenzo
Hi,
I'm playing with the kafka-consumer-groups.sh command.
I wanted to ask the utility of the --to-current option used to reset offsets of 
a consumer group to current offset. The thing I don't understand is in which 
scenario I would want to use this option. If I'm already at the current offset, 
why would I need to reset to the offset I'm already at?
Thanks in advance

Lorenzo Rovere





Lorenzo Rovere

Technology Reply
Via Avogadri, 2
31057 - Silea (TV) - ITALY
phone: +39 0422 1836521
l.rov...@reply.it
www.reply.it

[Technology Reply]
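
As a concrete illustration of the reset workflow discussed in this thread (the group and
topic names are placeholders, and the group must have no active members when --execute
is used):

# Preview what a reset to the earliest offsets would do
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --topic test_topic \
  --reset-offsets --to-earliest --dry-run

# Apply it
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --topic test_topic \
  --reset-offsets --to-earliest --execute

Other targets such as --to-datetime, --to-offset, --shift-by and --to-current take the
same form.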


Re: Kafka Consumer Consumption based on TimeStamp-based position

2021-01-23 Thread M. Manna
Thanks. Just realised that it was in the API since 0.11.0. Thanks Steve.

On Sat, 23 Jan 2021 at 12:42, Steve Howard 
wrote:

> Hi,
>
> Yes, you can use the offsetsForTimes() method.  See below for a simple
> example that should get you started...
>
> import org.apache.kafka.clients.consumer.*;
> import org.apache.kafka.common.config.ConfigException;
> import org.apache.kafka.common.*;
> import java.io.*;
> import java.time.Duration;
> import java.util.*;
> import java.text.*;
>
> public class searchByTime {
>   static KafkaConsumer c;
>
>   public static void main(String args[]) throws Exception {
> Properties props = new Properties();
> props.put("bootstrap.servers","localhost:9092");
> props.put("max.poll.records",1);
> props.put("topic","yourtopicname");
> props.put("group.id",UUID.randomUUID().toString());
> props.put("key.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>
> c = new KafkaConsumer(props);
> String topic = (String)props.get("topic");
> c.subscribe(Collections.singletonList(topic));
> System.out.println("subscribed to topic " + topic);
> System.out.println(c.partitionsFor(topic));
> List partitions = new ArrayList();
> for (PartitionInfo p: c.partitionsFor(topic)) {
>   partitions.add(new TopicPartition(topic,p.partition()));
> }
> System.out.println(partitions);
>
> long timestamp = Long.parseLong(args[0]);
> Map partitionOffsetsRequest = new
> HashMap<>(partitions.size());
> for (TopicPartition partition : partitions) {
>   partitionOffsetsRequest.put(new TopicPartition(partition.topic(),
> partition.partition()),
>   timestamp);
> }
>
> final Map result = new
> HashMap<>(partitions.size());
>
> for (Map.Entry partitionToOffset :
>   c.offsetsForTimes(partitionOffsetsRequest).entrySet()) {
> result.put(new TopicPartition(partitionToOffset.getKey().topic(),
> partitionToOffset.getKey().partition()),
>  (partitionToOffset.getValue() == null)
> ? null : partitionToOffset.getValue().offset());
>   }
>
> System.out.println(result);
> ConsumerRecords records =
> c.poll(Duration.ofSeconds(1));
> for (TopicPartition part: result.keySet()){
>   long offset = result.get(part);
>   c.seek(part,offset);
> }
>
> System.out.println("trying to get records...");
> records = c.poll(Duration.ofSeconds(1));
> for (ConsumerRecord record : records) {
>   Date date = new Date(record.timestamp());
>   DateFormat formatter = new SimpleDateFormat("HH:mm:ss.SSS");
>   formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
>   String dateFormatted = formatter.format(date);
>   System.out.println("Received message: (" + record.key() + ", " +
> record.value() + ") at offset " + record.offset() + " at time " +
> dateFormatted);
> }
>   }
> }
>
> Thanks,
>
> Steve
>
>
> On Sat, Jan 23, 2021 at 6:14 AM M. Manna  wrote:
>
> > Hello,
> >
> > We know that using KafkaConsumer api we can replay messages from certain
> > offsets. However, we are not sure if we could specify timeStamp from
> which
> > we could replay messages again.
> >
> > Does anyone know if this is possible?
> >
> > Regards,
> >
>


Re: Kafka Consumer Consumption based on TimeStamp-based position

2021-01-23 Thread Steve Howard
Hi,

Yes, you can use the offsetsForTimes() method.  See below for a simple
example that should get you started...

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.*;
import java.io.*;
import java.time.Duration;
import java.util.*;
import java.text.*;

public class searchByTime {
  static KafkaConsumer<String, String> c;

  public static void main(String args[]) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("max.poll.records", 1);
    props.put("topic", "yourtopicname");
    props.put("group.id", UUID.randomUUID().toString());
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    c = new KafkaConsumer<>(props);
    String topic = (String) props.get("topic");
    c.subscribe(Collections.singletonList(topic));
    System.out.println("subscribed to topic " + topic);
    System.out.println(c.partitionsFor(topic));
    List<TopicPartition> partitions = new ArrayList<>();
    for (PartitionInfo p : c.partitionsFor(topic)) {
      partitions.add(new TopicPartition(topic, p.partition()));
    }
    System.out.println(partitions);

    long timestamp = Long.parseLong(args[0]);
    Map<TopicPartition, Long> partitionOffsetsRequest = new HashMap<>(partitions.size());
    for (TopicPartition partition : partitions) {
      partitionOffsetsRequest.put(new TopicPartition(partition.topic(), partition.partition()),
          timestamp);
    }

    final Map<TopicPartition, Long> result = new HashMap<>(partitions.size());

    for (Map.Entry<TopicPartition, OffsetAndTimestamp> partitionToOffset :
        c.offsetsForTimes(partitionOffsetsRequest).entrySet()) {
      result.put(new TopicPartition(partitionToOffset.getKey().topic(),
              partitionToOffset.getKey().partition()),
          (partitionToOffset.getValue() == null)
              ? null : partitionToOffset.getValue().offset());
    }

    System.out.println(result);
    // The first poll joins the group and gets partitions assigned, so that seek() is allowed
    ConsumerRecords<String, String> records = c.poll(Duration.ofSeconds(1));
    for (TopicPartition part : result.keySet()) {
      Long offset = result.get(part);
      if (offset == null) continue;   // no record at or after the requested timestamp in this partition
      c.seek(part, offset);
    }

    System.out.println("trying to get records...");
    records = c.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
      Date date = new Date(record.timestamp());
      DateFormat formatter = new SimpleDateFormat("HH:mm:ss.SSS");
      formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
      String dateFormatted = formatter.format(date);
      System.out.println("Received message: (" + record.key() + ", " + record.value()
          + ") at offset " + record.offset() + " at time " + dateFormatted);
    }
  }
}

Thanks,

Steve


On Sat, Jan 23, 2021 at 6:14 AM M. Manna  wrote:

> Hello,
>
> We know that using KafkaConsumer api we can replay messages from certain
> offsets. However, we are not sure if we could specify timeStamp from which
> we could replay messages again.
>
> Does anyone know if this is possible?
>
> Regards,
>


Kafka Consumer Consumption based on TimeStamp-based position

2021-01-23 Thread M. Manna
Hello,

We know that using KafkaConsumer api we can replay messages from certain
offsets. However, we are not sure if we could specify timeStamp from which
we could replay messages again.

Does anyone know if this is possible?

Regards,


Kafka consumer and producer JMX metrics

2020-10-16 Thread Jose Manuel Vega Monroy
Hi there,

We are switching our monitoring tool, and while dealing with JMX metrics we noticed
we were using these for the consumer and producer:

JMX|kafka.server|Fetch:queue-size,\
JMX|kafka.server|Fetch|*:byte-rate,\
JMX|kafka.server|Fetch|*:throttle-time,\
JMX|kafka.server|Produce:queue-size,\
JMX|kafka.server|Produce|*:byte-rate,\
JMX|kafka.server|Produce|*:throttle-time,\

However, we can find documentation pointing to the kafka.producer and kafka.consumer
MBeans, and looking at our brokers they are not exposing those somehow.

Could you please point out why, and how we could expose them?

Using version 2.2.1

Thanks

Jose Manuel Vega Monroy
Java Developer / Software Developer Engineer in Test
Direct: +0035 0 2008038 (Ext. 8038)
Email: jose.mon...@williamhill.com
William Hill | 6/1 Waterport Place | Gibraltar | GX11 1AA



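
As far as I know, the kafka.consumer and kafka.producer MBeans are registered by the
client libraries in the JVM that runs the consumer or producer, not on the brokers,
which only expose the broker-side kafka.server metrics quoted above. The same values can
also be read programmatically from the client; a minimal sketch (the exact metric group
and attribute names vary between client versions, so treat the printed output as the
authoritative list):

import org.apache.kafka.clients.consumer.KafkaConsumer;

// Assumes an already constructed KafkaConsumer. Prints the client-side metrics,
// i.e. the same values that back the kafka.consumer JMX MBeans in the consumer's own JVM.
static void dumpConsumerMetrics(KafkaConsumer<String, String> consumer) {
  consumer.metrics().forEach((name, metric) ->
      System.out.printf("%s / %s = %s%n", name.group(), name.name(), metric.metricValue()));
}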


Kafka consumer coming without partition assignment

2020-09-30 Thread Sanjay Tiwari
Hi All,

I am using Kafka version 2.4.0. I have 8 Kafka brokers. I have a topic
with 80 partitions and 80 Java-based consumers.

Sometimes I find very strange behaviour on consumer restart, i.e. consumers
come up with no partitions assigned to them; in the logs messages appear like
"adding newly assigned partitions : ", i.e. empty.

I also verified with the kafka-consumer-groups script to check members and state,
and it is showing 80 members and a Stable state. But with the verbose option
it is showing the assignment as -.

I am not seeing any errors in the broker logs or consumer logs, so I could
not understand what's happening here and under what circumstances this is
possible.

Any help would be appreciated.

Thanks and Regards
Sanjay
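
For what it is worth, one way to make the assignment visible from the application itself
is a ConsumerRebalanceListener; a minimal sketch (the topic name is a placeholder) that
logs exactly the assignments the "adding newly assigned partitions" message refers to:

import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Assumes an already constructed KafkaConsumer; an empty collection in
// onPartitionsAssigned is the "no partitions assigned" case described above.
static void subscribeWithLogging(KafkaConsumer<String, String> consumer, String topic) {
  consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
      System.out.println("Revoked: " + partitions);
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
      System.out.println("Assigned: " + partitions);
    }
  });
}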


Re: kafka consumer thread crashes and doesn't consume any events without service restart

2020-06-18 Thread Ricardo Ferreira

Pushkar,

You are not wrong. Indeed, whatever deserialization error happens 
during the poll() method will cause your code to be interrupted without 
much information about which offset failed. A workaround would be trying 
to parse the message contained in the SerializationException 
and trying to recover. But this is too pushy.


Taking a closer look at the stack trace that you shared, it seems 
that the real problem might be connectivity with Schema Registry. Hence 
why the last mile of your exception says that there is a 'Connection 
Refused' in place.


```

Caused by: java.net.ConnectException: Connection refused (Connection
refused)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown
Source)
at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source)
at java.base/java.net.Socket.connect(Unknown Source)
at java.base/sun.net.NetworkClient.doConnect(Unknown Source)
at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
at java.base/sun.net.www.http.HttpClient.(Unknown Source)
at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(Unknown
Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown
Source)
at java.base/java.net.HttpURLConnection.getResponseCode(Unknown Source)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208) 
*<- [5] Here, way after everything it tries to connect to 
the service but fails.*

at
io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
at
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:153)
at
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:232)
at
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:211) 
*<- [4] This is the part where the AvroDeserializer tries to 
contact Schema Registry to fetch the Schema*

at
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
at
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88) 
*<- [3] So far so good. No major deserialization errors*

at
io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at
org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at
org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268) 
*<- [2] Up to this point the record is read*

at
org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124)
at
org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492)
at
org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) 
*<- [1] Things start here*


```

Thanks,

-- Ricardo

On 6/18/20 10:08 AM, Pushkar Deole wrote:

Hi Ricardo,

Probably this is more complicated than that since the exception has 
occurred during Consumer.poll itself, so there is no ConsumerRecord 
for the application to process and hence the application doesn't know 
the offset of record where the poll has failed.


On Thu, Jun 18, 2020 at 7:03 PM Ricardo Ferreira 
mailto:rifer...@riferrei.com>> wrote:


Pushkar,

Kafka uses the concept of offsets to identify the order of each
record within the log. But this concept is more powerful than it
looks like. Committed offsets are also used to keep track of which

Re: kafka consumer thread crashes and doesn't consume any events without service restart

2020-06-18 Thread Pushkar Deole
Hi Ricardo,

Probably this is more complicated than that since the exception has
occurred during Consumer.poll itself, so there is no ConsumerRecord for the
application to process and hence the application doesn't know the offset of
record where the poll has failed.

On Thu, Jun 18, 2020 at 7:03 PM Ricardo Ferreira 
wrote:

> Pushkar,
>
> Kafka uses the concept of offsets to identify the order of each record
> within the log. But this concept is more powerful than it looks like.
> Committed offsets are also used to keep track of which records has been
> successfully read and which ones are not. When you commit a offset in the
> consumer; a message is sent to Kafka that in turn register this commit into
> a internal topic called `__committed_offsets`.
>
> Point being: you can elegantly solve this problem by handling properly the
> exception in your code but only committing the offset if the record was
> deemed fully read -- which means being able to deserialize the record with
> no exceptions thrown. In order to do this, you will need to disable auto
> commit and manually commit the offsets either in a per-batch basis or in a
> per-record basis.
>
> Non-committed offsets will be picked up by the same or another thread from
> the consumer group. This is the part where *Gerbrand's* suggestion might
> take place. You might want to have another stream processor specifically
> handling those outliers and sending them out to a DLQ topic for manual
> reprocessing purposes.
>
> Thanks,
>
> -- Ricardo
> On 6/18/20 7:45 AM, Pushkar Deole wrote:
>
> Hi Gerbrand,
>
> thanks for the update, however if i dig more into it, the issue is because
> of schema registry issue and the schema registry not accessible. So the
> error is coming during poll operation itself:
> So this is a not a bad event really but the event can't be deserialized
> itself due to schema not available. Even if this record is skipped, the
> next record will meet the same error.
>
> Exception in thread "Thread-9"
> org.apache.kafka.common.errors.SerializationException: Error deserializing
> key/value for partition tenant.avro-2 at offset 1. If needed, please seek
> past the record to continue consumption.
> Caused by: org.apache.kafka.common.errors.SerializationException: Error
> deserializing Avro message for id 93
> Caused by: java.net.ConnectException: Connection refused (Connection
> refused)
> at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
> at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
> at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown
> Source)
> at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source)
> at java.base/java.net.Socket.connect(Unknown Source)
> at java.base/sun.net.NetworkClient.doConnect(Unknown Source)
> at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
> at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
> at java.base/sun.net.www.http.HttpClient.(Unknown Source)
> at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
> at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
> at
> java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(Unknown
> Source)
> at
> java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(Unknown
> Source)
> at
> java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(Unknown
> Source)
> at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(Unknown
> Source)
> at
> java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown
> Source)
> at
> java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown
> Source)
> at java.base/java.net.HttpURLConnection.getResponseCode(Unknown Source)
> at
> io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
> at
> io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
> at
> io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
> at
> io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
> at
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:153)
> at
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:232)
> at
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:211)
> at
> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
> at
> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
> at
> io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
> at
> org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
> at
> 

Re: kafka consumer thread crashes and doesn't consume any events without service restart

2020-06-18 Thread Ricardo Ferreira

Pushkar,

Kafka uses the concept of offsets to identify the order of each record 
within the log. But this concept is more powerful than it looks. 
Committed offsets are also used to keep track of which records have been 
successfully read and which ones have not. When you commit an offset in 
the consumer, a message is sent to Kafka that in turn registers this 
commit into an internal topic called `__consumer_offsets`.


Point being: you can elegantly solve this problem by handling properly 
the exception in your code but only committing the offset if the record 
was deemed fully read -- which means being able to deserialize the 
record with no exceptions thrown. In order to do this, you will need to 
disable auto commit and manually commit the offsets either in a 
per-batch basis or in a per-record basis.


Non-committed offsets will be picked up by the same or another thread 
from the consumer group. This is the part where *Gerbrand's* suggestion 
might take place. You might want to have another stream processor 
specifically handling those outliers and sending them out to a DLQ topic 
for manual reprocessing purposes.


Thanks,

-- Ricardo

On 6/18/20 7:45 AM, Pushkar Deole wrote:

Hi Gerbrand,

thanks for the update, however if i dig more into it, the issue is because
of schema registry issue and the schema registry not accessible. So the
error is coming during poll operation itself:
So this is a not a bad event really but the event can't be deserialized
itself due to schema not available. Even if this record is skipped, the
next record will meet the same error.

Exception in thread "Thread-9"
org.apache.kafka.common.errors.SerializationException: Error deserializing
key/value for partition tenant.avro-2 at offset 1. If needed, please seek
past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error
deserializing Avro message for id 93
Caused by: java.net.ConnectException: Connection refused (Connection
refused)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown
Source)
at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source)
at java.base/java.net.Socket.connect(Unknown Source)
at java.base/sun.net.NetworkClient.doConnect(Unknown Source)
at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
at java.base/sun.net.www.http.HttpClient.(Unknown Source)
at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(Unknown
Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown
Source)
at java.base/java.net.HttpURLConnection.getResponseCode(Unknown Source)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
at
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:153)
at
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:232)
at
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:211)
at
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
at
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
at
io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at
org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at
org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268)
at
org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124)
at
org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492)
at
org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
at

Re: kafka consumer thread crashes and doesn't consume any events without service restart

2020-06-18 Thread Pushkar Deole
Hi Gerbrand,

thanks for the update, however if I dig more into it, the issue is because
of a schema registry issue, with the schema registry not accessible. So the
error is coming during the poll operation itself.
So this is not really a bad event, but the event can't be deserialized
because the schema is not available. Even if this record is skipped, the
next record will meet the same error.

Exception in thread "Thread-9"
org.apache.kafka.common.errors.SerializationException: Error deserializing
key/value for partition tenant.avro-2 at offset 1. If needed, please seek
past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error
deserializing Avro message for id 93
Caused by: java.net.ConnectException: Connection refused (Connection
refused)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown
Source)
at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source)
at java.base/java.net.Socket.connect(Unknown Source)
at java.base/sun.net.NetworkClient.doConnect(Unknown Source)
at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
at java.base/sun.net.www.http.HttpClient.(Unknown Source)
at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(Unknown
Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown
Source)
at java.base/java.net.HttpURLConnection.getResponseCode(Unknown Source)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
at
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:153)
at
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:232)
at
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:211)
at
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
at
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
at
io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at
org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at
org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268)
at
org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124)
at
org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492)
at
org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
at
com.avaya.analytics.dsi.DsiConsumer.runAdminConsumer(DsiConsumer.java:797)
at java.base/java.lang.Thread.run(Unknown Source)

On Thu, Jun 18, 2020 at 3:17 PM Gerbrand van Dieijen 
wrote:

> Hello Pushkar,
>
>
> I'd split records/events in categories based on the error:
> - Events that can be parsed or otherwise handled correctly, e.g. good
> events
> - Fatal error, like parsing error, empty or incorrect values, etc., e.g.
> bad events
> - Non-fatal, like database-connection failure, io-failure, out of memory,
> and others
>   that could be retried
>
> Best to avoid doing something blocking while handling the error, so create
> a separate stream for each. That way 'good' events don't have to wait for
> the handling of 'bad' events.
>
> Any fatal can events you could store in a separate topic, or send to some
> monitoring/logging system. As a simple start you could sent the erroneous
> events to a 

Re: kafka consumer thread crashes and doesn't consume any events without service restart

2020-06-18 Thread Gerbrand van Dieijen
Hello Pushkar,


I'd split records/events in categories based on the error:
- Events that can be parsed or otherwise handled correctly, e.g. good events
- Fatal error, like parsing error, empty or incorrect values, etc., e.g. bad 
events
- Non-fatal, like database-connection failure, io-failure, out of memory, and 
others
  that could be retried

Best to avoid doing something blocking while handling the error, so create a 
separate stream for each. That way 'good' events don't have to wait for the 
handling of 'bad' events.

Any fatal events you could store in a separate topic, or send to some 
monitoring/logging system. As a simple start you could send the erroneous 
events to a separate topic named something like 'errorevents'. 
Any non-fatal errors could be retried. Last time I used Akka for that 
(https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html) but afaik 
KStreams has a mechanism for that as well. You could also store records that you 
want to retry into a separate topic 'retry'.
Do not store records that you want to retry back into the original topic! 
If you do that you have a great risk of overloading your whole 
kafka-cluster.

On 18-06-2020 09:55 Pushkar Deole  wrote:

Hi All,

This is what I am observing: we have a consumer which polls data from the
topic, does the processing, then polls data again, and this keeps happening
continuously.
At one point there was some bad data on the topic which could not be
consumed by the consumer, probably because it couldn't deserialize the event
due to an incompatible avro schema or something similar,
and the consumer got an error deserializing the event. Since the exception wasn't
handled, it crashed the consumer thread, which then stopped consuming data.

The question here is how these kinds of scenarios can be handled:
1. Even if I catch the exception and log it, the consumer will, I think,
process the next event. So the bad event will be lost.
2. When the consumer goes for another poll, it would commit the offsets of the
previous poll, which include the bad event. So the event will be lost.

How can this scenario be handled in the best possible way?


kafka consumer thread crashes and doesn't consume any events without service restart

2020-06-18 Thread Pushkar Deole
Hi All,

This is what I am observing: we have a consumer which polls data from the
topic, does the processing, then polls data again, and this keeps happening
continuously.
At one point there was some bad data on the topic which could not be
consumed by the consumer, probably because it couldn't deserialize the event
due to an incompatible avro schema or something similar,
and the consumer got an error deserializing the event. Since the exception wasn't
handled, it crashed the consumer thread, which then stopped consuming data.

The question here is how these kinds of scenarios can be handled:
1. Even if I catch the exception and log it, the consumer will, I think,
process the next event. So the bad event will be lost.
2. When the consumer goes for another poll, it would commit the offsets of the
previous poll, which include the bad event. So the event will be lost.

How can this scenario be handled in the best possible way?


Re: Kafka consumer is taking time to recognize new partition

2020-05-20 Thread Liam Clarke-Hutchinson
Hi,

You want metadata.max.age.ms which, as you noticed, defaults to 5 minutes
:)

https://kafka.apache.org/documentation/#metadata.max.age.ms
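
For example, a small sketch of lowering it on the consumer side (the broker address, group id and the 30 s value are illustrative assumptions only):

```
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FreshMetadataConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // Refresh topic metadata every 30 s instead of the 300000 ms default,
        // so newly added partitions are picked up sooner.
        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "30000");
        KafkaConsumer<String, String> consumer =
                new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
    }
}
```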

Cheers,

Liam Clarke-Hutchinson

On Thu, May 21, 2020 at 1:06 PM Kafka Shil  wrote:

> I was running a test where kafka consumer was reading data from multiple
> partitions of a topic. While the process was running I added more
> partitions. It took around 5 minutes for consumer thread to read data from
> the new partition. I have found this configuration "
> topic.metadata.refresh.interval.ms", but this is for producer only. Is
> there a similar config for consumer too?
>


Kafka consumer is taking time to recognize new partition

2020-05-20 Thread Kafka Shil
I was running a test where kafka consumer was reading data from multiple
partitions of a topic. While the process was running I added more
partitions. It took around 5 minutes for consumer thread to read data from
the new partition. I have found this configuration "
topic.metadata.refresh.interval.ms", but this is for producer only. Is
there a similar config for consumer too?


hangup of kafka consumer in case of unclean leader election

2020-05-13 Thread Dmitry Sorokin
It seems that we discovered a bug:
in case an unclean leader election happens, KafkaConsumer may hang
indefinitely.

Full version

According to the documentation, if `auto.offset.reset` is set
to none or not set, the exception is thrown to the client code, allowing it to
be handled in whatever way the client wants.
If one takes a closer look at this mechanism, it turns out
that it is not working.

Starting from kafka 2.3, a new offset reset negotiation algorithm was added
(org.apache.kafka.clients.consumer.internals.Fetcher#validateOffsetsAsync).
During this validation, the Fetcher's
`org.apache.kafka.clients.consumer.internals.SubscriptionState` is
held in the `AWAIT_VALIDATION` fetch state.
This effectively means that fetch requests are not issued and consumption
is stopped.
If an unclean leader election happens during this time,
`LogTruncationException` is thrown from the future listener in the method
`validateOffsetsAsync`.
The main problem is that this exception (thrown from the listener of the future) is
effectively swallowed by
`org.apache.kafka.clients.consumer.internals.AsyncClient#sendAsyncRequest`
in this part of the code:
```
} catch (RuntimeException e) {
  if (!future.isDone()) {
future.raise(e);
  }
}
```

In the end the result is: the only way to get out of AWAIT_VALIDATION and
continue consumption is to successfully finish the validation, but it cannot
be finished.
However, the consumer is alive but consuming nothing. The only way to
resume consumption is to terminate the consumer and start another one.

We discovered this situation by means of a kstreams application, where the valid
value of `auto.offset.reset` provided by our code is replaced
by a `None` value for the purpose of position reset
(org.apache.kafka.streams.processor.internals.StreamThread#create).
And with kstreams it is even worse, as the application may appear to be working,
logging warn messages of the format `Truncation detected for partition ...,` but data
is not generated for a long time and in the end is lost, making the kstreams
application unreliable.

*Did someone saw it already, maybe there are some ways to reconfigure this
behavior?*
-- 
Dmitry Sorokin
mailto://dmitry.soro...@gmail.com


Re: Kafka consumer

2020-05-07 Thread vishnu murali
Thanks Chris

But it won't work, I tried that also.

I found the solution.

The @KafkaListener default behavior itself is to take the data one by one
only.





On Thu, May 7, 2020, 11:28 Chris Toomey  wrote:

> You can set the max.poll.records config. setting to 1 in order to pull down
> and process 1 record at a time.
>
> See https://kafka.apache.org/documentation/#consumerconfigs .
>
> On Mon, May 4, 2020 at 1:04 AM vishnu murali 
> wrote:
>
> > Hey Guys,
> >
> > I am having a topic and in that topic I am having 3000 messages
> >
> > In my springboot application I want to consume the data using
> > @KafkaListener()  and also one by one because  I need to do some tedious
> > process on that Data it may take some time
> >
> > So within this time  I don't need to consume another data.
> >
> > After the process is finished only I need to consume the data from the
> > topic .?
> >
> > How can I do this?
> >
> > Any ideas?
> >
>


Re: Kafka consumer

2020-05-06 Thread Chris Toomey
You can set the max.poll.records config. setting to 1 in order to pull down
and process 1 record at a time.

See https://kafka.apache.org/documentation/#consumerconfigs .
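
A minimal sketch of that setting on a plain Java consumer (the broker address and group id are placeholder assumptions):

```
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OneRecordPerPoll {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // At most one record per poll(), so the next record is only fetched
        // after the current one has been processed and poll() is called again.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
        KafkaConsumer<String, String> consumer =
                new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
    }
}
```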

On Mon, May 4, 2020 at 1:04 AM vishnu murali 
wrote:

> Hey Guys,
>
> I am having a topic and in that topic I am having 3000 messages
>
> In my springboot application I want to consume the data using
> @KafkaListener()  and also one by one because  I need to do some tedious
> process on that Data it may take some time
>
> So within this time  I don't need to consume another data.
>
> After the process is finished only I need to consume the data from the
> topic .?
>
> How can I do this?
>
> Any ideas?
>


Re: thank you ! which java-client api can has same effect about kafka-consumer-groups.sh command ?

2020-04-22 Thread Liam Clarke-Hutchinson
Ah, with you now. You'll also need to use the results of
AdminClient.listOffsets which takes TopicPartition objects as an argument.
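
Roughly, combining the two could look like the sketch below (the group id and bootstrap servers are placeholders, and AdminClient.listOffsets is only available from Kafka 2.5 onwards):

```
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class GroupLag {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient client = AdminClient.create(props)) {
            // CURRENT-OFFSET: the group's committed offsets
            Map<TopicPartition, OffsetAndMetadata> committed =
                    client.listConsumerGroupOffsets("test")
                          .partitionsToOffsetAndMetadata().get();

            // LOG-END-OFFSET: the latest offset of each of those partitions
            Map<TopicPartition, OffsetSpec> request = new HashMap<>();
            committed.keySet().forEach(tp -> request.put(tp, OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> logEnd =
                    client.listOffsets(request).all().get();

            // LAG = LOG-END-OFFSET - CURRENT-OFFSET
            committed.forEach((tp, meta) -> {
                long current = meta.offset();
                long end = logEnd.get(tp).offset();
                System.out.printf("%s current=%d logEnd=%d lag=%d%n",
                        tp, current, end, end - current);
            });
        }
    }
}
```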

On Wed, Apr 22, 2020 at 7:43 PM 一直以来 <279377...@qq.com> wrote:

> i use :
> private static void printConsumerGroupOffsets() throws
>         InterruptedException, ExecutionException {
>     Properties props = new Properties();
>     props.setProperty("bootstrap.servers",
>         "192.168.1.100:9081,192.168.1.100:9082,192.168.1.100:9083,,192.168.1.100:9087,,192.168.1.100:9088");
>     AdminClient client = KafkaAdminClient.create(props);
>
>     ListConsumerGroupOffsetsResult listConsumerGroupOffsets =
>         client.listConsumerGroupOffsets("test");
>     KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata =
>         listConsumerGroupOffsets.partitionsToOffsetAndMetadata();
>     Map<TopicPartition, OffsetAndMetadata> map = partitionsToOffsetAndMetadata.get();
>     Iterator<TopicPartition> iterator = map.keySet().iterator();
>     while (iterator.hasNext()) {
>         TopicPartition key = iterator.next();
>         OffsetAndMetadata value = map.get(key);
>         System.out.println(key.toString() + " " + value.toString());
>     }
> }
>
> but i can not find the corresponding java method for
> PARTITION, CURRENT-OFFSET, LOG-END-OFFSET
>
>
> -- Original Message --
> From: "Liam Clarke-Hutchinson"  Sent: Wednesday, April 22, 2020, 3:35 PM
> To: "users"
> Subject: Re: thank you ! which java-client api can has same effect about
> kafka-consumer-groups.sh command ?
>
>
>
> Looking at the source code, try listConsumerGroupOffsets(String
> groupId, ListConsumerGroupOffsetsOptions options) instead?
>
> On Wed, Apr 22, 2020 at 6:40 PM 一直以来 <279377...@qq.com> wrote:
>
> ./kafka-consumer-groups.sh --bootstrap-server localhost:9081 --describe
> --group test
>
> use describeConsumerGroups method ??
>
> private static void print() throws InterruptedException, ExecutionException {
>     Properties props = new Properties();
>     props.setProperty("bootstrap.servers",
>         "192.168.1.100:9081,192.168.1.100:9082,192.168.1.100:9083,,192.168.1.100:9087,,192.168.1.100:9088");
>     AdminClient client = KafkaAdminClient.create(props);
>     DescribeConsumerGroupsResult describeConsumerGroups =
>         client.describeConsumerGroups(Arrays.asList("test"));
>     Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups =
>         describeConsumerGroups.describedGroups();
>     Iterator<String> iterator = describedGroups.keySet().iterator();
>     while (iterator.hasNext()) {
>         String key = iterator.next();
>         KafkaFuture<ConsumerGroupDescription> value = describedGroups.get(key);
>         ConsumerGroupDescription consumerGroupDescription = value.get();
>         Collection<MemberDescription> members = consumerGroupDescription.members();
>     }
> }
>
> but i can't find about any method about bottom column:
> GROUP  TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
>
> By its three info :
> PARTITION  CURRENT-OFFSET  LOG-END-OFFSET


Re: kafka-consumer-groups.sh CURRENT-OFFSET column show "-" , what mean about it ! thank you !

2020-04-22 Thread Liam Clarke-Hutchinson
Yep, 0 is a non-null value, that is, the consumer group has a committed
offset, which in this case happens to be 0. The dash "-" implies an
unknown/null value.

In your example with your test topics - the earliest offset in the topic is
still 0, and there's not been much data through it, so maybe it makes sense
as a default value?

But look at the earliest offset on one of our topics, which has a retention
time of 7 days (as Kafka offsets are absolute for a topic, never relative):

# the -2 arg to time is shorthand for "earliest", -1 is "latest"
kafka-run-class kafka.tools.GetOffsetShell --broker-list kafka:9092 --topic
trx_reporting --time -2
trx_reporting:0:8681020918
trx_reporting:1:8675438549
trx_reporting:2:8681184674

  If I saw a consumer group that listed an offset of 8681020918 on
partition 0 of topic trx_reporting, I would think it had previously been
consuming that topic. However, an offset of "-" makes it clear that it's
never consumed.

And it's important that we distinguish "My next offset to read from is 0"
from "I have never read from this partition" because of the configuration
property available to consumers, auto.offset.reset, which allows you to
modify a consumer's behaviour if the last committed offset is null:

auto.offset.reset: What to do when there is no initial offset in Kafka or
if the current offset does not exist any more on the server (e.g. because
that data has been deleted):

   - earliest: automatically reset the offset to the earliest offset
   - latest: automatically reset the offset to the latest offset
   - none: throw exception to the consumer if no previous offset is found
   for the consumer's group
   - anything else: throw exception to the consumer.


Different applications will have business cases for each possible value.
And it requires being able to distinguish a consumer group with a committed
offset from a consumer group that's never committed an offset.
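
As a small illustration (a sketch, not from this thread), the consumer-side setting in question is just:

```
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class OffsetResetConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Only consulted when the group has no committed offset for a partition
        // (the "-" case above): "earliest", "latest", or "none".
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        System.out.println(props);
    }
}
```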

On Wed, Apr 22, 2020 at 8:02 PM 一直以来 <279377...@qq.com> wrote:

> but show "0" and "_", show two value difference???
> thank you !
>
>
> -- Original Message --
> From: "Liam Clarke-Hutchinson"  Sent: Tuesday, April 21, 2020, 4:22 AM
> To: "users"
> Subject: Re: kafka-consumer-groups.sh CURRENT-OFFSET column show "-" ,
> what mean about it ! thank you !
>
>
>
> Hi,
>
> It means the consumer group exists, and Kafka's aware of it's topic
> subscriptions, but either the consumers haven't consumed yet (if using auto
> offset commit) or the consumers haven't committed any offsets, if using
> manual offset committing.
>
> Does that make sense?
>
> Kind regards,
>
> Liam Clarke-Hutchinson
>
> On Tue, Apr 21, 2020 at 8:12 PM 一直以来 <279377...@qq.com wrote:
>
>  ghy@ghy-VirtualBox:~/T/k/bin$ ./kafka-consumer-groups.sh
>  --bootstrap-server localhost:9081 --describe --group test
> 
> 
> GROUP  TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                           HOST            CLIENT-ID
> test   myTopic3  4          -               1               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1
> test   myTopic3  1          -               1               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1
> test   myTopic3  2          -               0               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1
> test   myTopic3  3          -               1               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1

Re: kafka-consumer-groups.sh CURRENT-OFFSET column show "-" , what mean about it ! thank you !

2020-04-22 Thread 一直以来
but show "0" and "_", show two value difference???
thank you !


-- Original Message --
From: "Liam Clarke-Hutchinson"

Re: thank you ! which java-client api can has same effect about kafka-consumer-groups.sh command ?

2020-04-22 Thread 一直以来
i use :
private static void printConsumerGroupOffsets() throws 
InterruptedException, ExecutionException {
Properties props = new Properties();
props.setProperty("bootstrap.servers",

"192.168.1.100:9081,192.168.1.100:9082,192.168.1.100:9083,,192.168.1.100:9087,,192.168.1.100:9088");
AdminClient client = KafkaAdminClient.create(props);


ListConsumerGroupOffsetsResult listConsumerGroupOffsets = 
client.listConsumerGroupOffsets("test");
KafkaFuture

Re: thank you ! which java-client api can has same effect about kafka-consumer-groups.sh command ?

2020-04-22 Thread Liam Clarke-Hutchinson
Looking at the source code, try listConsumerGroupOffsets(String
groupId, ListConsumerGroupOffsetsOptions options) instead?

On Wed, Apr 22, 2020 at 6:40 PM 一直以来 <279377...@qq.com> wrote:

> ./kafka-consumer-groups.sh --bootstrap-server localhost:9081 --describe
> --group test
>
>
> use describeConsumerGroups method ??
>
>
> private static void print() throws InterruptedException, ExecutionException {
>     Properties props = new Properties();
>     props.setProperty("bootstrap.servers",
>         "192.168.1.100:9081,192.168.1.100:9082,192.168.1.100:9083,,192.168.1.100:9087,,192.168.1.100:9088");
>     AdminClient client = KafkaAdminClient.create(props);
>     DescribeConsumerGroupsResult describeConsumerGroups =
>         client.describeConsumerGroups(Arrays.asList("test"));
>     Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups =
>         describeConsumerGroups.describedGroups();
>     Iterator<String> iterator = describedGroups.keySet().iterator();
>     while (iterator.hasNext()) {
>         String key = iterator.next();
>         KafkaFuture<ConsumerGroupDescription> value = describedGroups.get(key);
>         ConsumerGroupDescription consumerGroupDescription = value.get();
>         Collection<MemberDescription> members = consumerGroupDescription.members();
>     }
> }
>
>
>
> but i can't find about any method about bottom column:
> GROUP  TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
>
>
>
> By its three info :
> PARTITION CURRENT-OFFSET LOG-END-OFFSET


thank you ! which java-client api can has same effect about kafka-consumer-groups.sh command ?

2020-04-22 Thread 一直以来
./kafka-consumer-groups.sh --bootstrap-server localhost:9081 --describe --group 
test


use describeConsumerGroups method ??


private static void print() throws InterruptedException, 
ExecutionException {
Properties props = new Properties();
props.setProperty("bootstrap.servers",

"192.168.1.100:9081,192.168.1.100:9082,192.168.1.100:9083,,192.168.1.100:9087,,192.168.1.100:9088");
AdminClient client = KafkaAdminClient.create(props);
DescribeConsumerGroupsResult describeConsumerGroups = 
client.describeConsumerGroups(Arrays.asList("test"));
Map

Re: kafka-consumer-groups.sh CURRENT-OFFSET column show "-" , what mean about it ! thank you !

2020-04-21 Thread Karolis Pocius
It means no consumer has consumed anything from that partition. Likely
because there's no data in that partition yet.

On Tue, Apr 21, 2020 at 8:12 PM 一直以来 <279377...@qq.com> wrote:

> ghy@ghy-VirtualBox:~/T/k/bin$ ./kafka-consumer-groups.sh
> --bootstrap-server localhost:9081 --describe --group test
>
>
> GROUP  TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                           HOST            CLIENT-ID
> test   myTopic3  4          -               1               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1
> test   myTopic3  1          -               1               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1
> test   myTopic3  2          -               0               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1
> test   myTopic3  3          -               1               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1
> test   myTopic3  0          -               2               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1


kafka-consumer-groups.sh CURRENT-OFFSET column show "-" , what mean about it ! thank you !

2020-04-21 Thread 一直以来
ghy@ghy-VirtualBox:~/T/k/bin$ ./kafka-consumer-groups.sh --bootstrap-server 
localhost:9081 --describe --group test


GROUP  TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                           HOST            CLIENT-ID
test   myTopic3  4          -               1               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1
test   myTopic3  1          -               1               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1
test   myTopic3  2          -               0               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1
test   myTopic3  3          -               1               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1
test   myTopic3  0          -               2               -    consumer-test-1-d5f2f83d-59fa-4c3b-859d-4219e16479a8  /192.168.1.104  consumer-test-1

Re: kafka-consumer-perf-test error when printing metrics

2020-02-21 Thread Jp Silva
I forgot to mention I'm using Kafka 2.4.0.

Regards,

*João Paulo Leonidas Fernandes Dias da Silva aka JP**Lead Consultant -
Software Developer*
emailjsi...@thoughtworks.com
Telephone+1 513 628 7609
[image: ThoughtWorks] <http://www.thoughtworks.com/>
<https://www.thoughtworks.com/perspectives>


On Thu, Feb 20, 2020 at 3:39 PM Jp Silva  wrote:

> Hi,
>
> I'm using kafka-consumer-perf-test but I'm getting an error if I add the
> --print-metrics option.
>
> Here's a snippet of my output including the error:
>
> consumer-fetch-manager-metrics:fetch-size-max:{client-id=consumer-perf-consumer-99250-1}
> : 0.000
> consumer-fetch-manager-metrics:fetch-throttle-time-avg:{client-id=consumer-perf-consumer-99250-1}
>: 0.000
> consumer-fetch-manager-metrics:fetch-throttle-time-max:{client-id=consumer-perf-consumer-99250-1}
>: 0.000
> consumer-fetch-manager-metrics:fetch-total:{client-id=consumer-perf-consumer-99250-1}
>: 72.000
> Exception in thread "main" java.util.IllegalFormatConversionException: f
> != java.lang.Integer
> at
> java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4302)
> at
> java.util.Formatter$FormatSpecifier.printFloat(Formatter.java:2806)
> at java.util.Formatter$FormatSpecifier.print(Formatter.java:2753)
> at java.util.Formatter.format(Formatter.java:2520)
> at java.util.Formatter.format(Formatter.java:2455)
> at java.lang.String.format(String.java:2940)
> at
> scala.collection.immutable.StringLike.format(StringLike.scala:354)
> at
> scala.collection.immutable.StringLike.format$(StringLike.scala:353)
> at scala.collection.immutable.StringOps.format(StringOps.scala:33)
> at
> kafka.utils.ToolsUtils$.$anonfun$printMetrics$3(ToolsUtils.scala:60)
> at
> kafka.utils.ToolsUtils$.$anonfun$printMetrics$3$adapted(ToolsUtils.scala:58)
> at
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at kafka.utils.ToolsUtils$.printMetrics(ToolsUtils.scala:58)
> at
> kafka.tools.ConsumerPerformance$.main(ConsumerPerformance.scala:82)
> at kafka.tools.ConsumerPerformance.main(ConsumerPerformance.scala)
>
> Does anyone have the same issue? Am I doing something wrong?
>
> Regards,
>
> JP Silva
>
>


kafka-consumer-perf-test error when printing metrics

2020-02-20 Thread Jp Silva
Hi,

I'm using kafka-consumer-perf-test but I'm getting an error if I add the 
--print-metrics option.

Here's a snippet of my output including the error:

consumer-fetch-manager-metrics:fetch-size-max:{client-id=consumer-perf-consumer-99250-1}
  : 0.000
consumer-fetch-manager-metrics:fetch-throttle-time-avg:{client-id=consumer-perf-consumer-99250-1}
 : 0.000
consumer-fetch-manager-metrics:fetch-throttle-time-max:{client-id=consumer-perf-consumer-99250-1}
 : 0.000
consumer-fetch-manager-metrics:fetch-total:{client-id=consumer-perf-consumer-99250-1}
 : 72.000
Exception in thread "main" java.util.IllegalFormatConversionException: f != 
java.lang.Integer
at 
java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4302)
at java.util.Formatter$FormatSpecifier.printFloat(Formatter.java:2806)
at java.util.Formatter$FormatSpecifier.print(Formatter.java:2753)
at java.util.Formatter.format(Formatter.java:2520)
at java.util.Formatter.format(Formatter.java:2455)
at java.lang.String.format(String.java:2940)
at scala.collection.immutable.StringLike.format(StringLike.scala:354)
at scala.collection.immutable.StringLike.format$(StringLike.scala:353)
at scala.collection.immutable.StringOps.format(StringOps.scala:33)
at kafka.utils.ToolsUtils$.$anonfun$printMetrics$3(ToolsUtils.scala:60)
at 
kafka.utils.ToolsUtils$.$anonfun$printMetrics$3$adapted(ToolsUtils.scala:58)
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at kafka.utils.ToolsUtils$.printMetrics(ToolsUtils.scala:58)
at kafka.tools.ConsumerPerformance$.main(ConsumerPerformance.scala:82)
at kafka.tools.ConsumerPerformance.main(ConsumerPerformance.scala)

Does anyone have the same issue? Am I doing something wrong?

Regards,

JP Silva



Re: Kafka consumer group keeps moving to PreparingRebalance and stops consuming

2019-12-08 Thread Guozhang Wang
Hello Avshalom,

I think the first question to answer is where are the new consumers coming
from. From your description they seem to be not expected (i.e. you did not
intentionally start up new instances), so looking at those VMs that
suddenly start new consumers would be my first shot.


Guozhang

On Sun, Dec 8, 2019 at 2:28 AM Avshalom Manevich 
wrote:

> Hi Boyang,
>
> Thanks for your reply.
> We looked into this direction, but since we didn't change max.poll.interval
> from its default value, we're not sure if it's the case.
>
>
> On Fri, 6 Dec 2019 at 17:42, Boyang Chen 
> wrote:
>
> > Hey Avshalom,
> >
> > the consumer instance is initiated per stream thread. You will not be
> > creating new consumers so the root cause is definitely member timeout.
> > Have you changed the max.poll.interval by any chance? That config
> controls
> > how long you tolerate the interval between poll calls to make sure
> progress
> > is being made. If it's very tight, the consumer could
> > stop sending heartbeats once progress is slow.
> >
> > Best,
> > Boyang
> >
> > On Fri, Dec 6, 2019 at 7:12 AM Avshalom Manevich 
> > wrote:
> >
> > > We have a Kafka Streams consumer group that keep moving to
> > > PreparingRebalance state and stop consuming. The pattern is as follows:
> > >
> > >1.
> > >
> > >Consumer group is running and stable for around 20 minutes
> > >2.
> > >
> > >New consumers (members) start to appear in the group state without
> any
> > >clear reason, these new members only originate from a small number
> of
> > > VMs
> > >(not the same VMs each time), and they keep joining
> > >3. Group state changes to PreparingRebalance
> > >4. All consumers stop consuming, showing these logs: "Group
> > coordinator
> > >... is unavailable or invalid, will attempt rediscovery"
> > >5. The consumer on VMs that generated extra members show these logs:
> > >
> > > Offset commit failed on partition X at offset Y: The coordinator is not
> > > aware of this member.
> > >
> > > Failed to commit stream task X since it got migrated to another thread
> > > already. Closing it as zombie before triggering a new rebalance.
> > >
> > > Detected task Z that got migrated to another thread. This implies that
> > this
> > > thread missed a rebalance and dropped out of the consumer group. Will
> try
> > > to rejoin the consumer group.
> > >
> > >
> > >1. We kill all consumer processes on all VMs, the group moves to
> Empty
> > >with 0 members, we start the processes and we're back to step 1
> > >
> > > Kafka version is 1.1.0, streams version is 2.0.0
> > >
> > > We took thread dumps from the misbehaving consumers, and didn't see
> more
> > > consumer threads than configured.
> > >
> > > We tried restarting kafka brokers, cleaning zookeeper cache.
> > >
> > > We suspect that the issue has to do with missing heartbeats, but the
> > > default heartbeat is 3 seconds and message handling times are no where
> > near
> > > that.
> > >
> > > Anyone encountered a similar behaviour?
> > >
> >
>
>
> --
> *Avshalom Manevich*
>


-- 
-- Guozhang


Re: Kafka consumer group keeps moving to PreparingRebalance and stops consuming

2019-12-08 Thread Jamie
Hi Avshalom, 
Have you tried increasing the session timeout? What's the current session 
timeout?
Regarding max.poll.interval.ms - this is the maximum time between calls to 
poll of the consumer. Are there any possible scenarios where the processing of 
one batch of messages from the consumer (max.poll.records) could take longer than 
the time configured for max.poll.interval.ms? Maybe you could log when the 
records are returned to the streams task and then when the records have 
finished processing, to determine how long this normally takes?
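
In a plain consumer, the timing described above could look roughly like the sketch below (with Kafka Streams itself the timestamps would have to be logged around the processing code instead, since Streams owns the poll loop; the handle method is a placeholder):

```
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TimedPollLoop {
    // Sketch: log how long each polled batch takes to process, so the figure
    // can be compared against max.poll.interval.ms.
    static void pollLoop(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            long start = System.currentTimeMillis();
            for (ConsumerRecord<String, String> record : records) {
                handle(record); // application logic goes here
            }
            System.out.printf("Processed %d records in %d ms%n",
                    records.count(), System.currentTimeMillis() - start);
        }
    }

    private static void handle(ConsumerRecord<String, String> record) { }
}
```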

Thanks, 
Jamie


-Original Message-
From: Avshalom Manevich 
To: users 
Sent: Sun, 8 Dec 2019 10:28
Subject: Re: Kafka consumer group keeps moving to PreparingRebalance and stops 
consuming

Hi Boyang,

Thanks for your reply.
We looked into this direction, but since we didn't change max.poll.interval
from its default value, we're not sure if it's the case.


On Fri, 6 Dec 2019 at 17:42, Boyang Chen  wrote:

> Hey Avshalom,
>
> the consumer instance is initiated per stream thread. You will not be
> creating new consumers so the root cause is definitely member timeout.
> Have you changed the max.poll.interval by any chance? That config controls
> how long you tolerate the interval between poll calls to make sure progress
> is being made. If it's very tight, the consumer could
> stop sending heartbeats once progress is slow.
>
> Best,
> Boyang
>
> On Fri, Dec 6, 2019 at 7:12 AM Avshalom Manevich 
> wrote:
>
> > We have a Kafka Streams consumer group that keep moving to
> > PreparingRebalance state and stop consuming. The pattern is as follows:
> >
> >    1.
> >
> >    Consumer group is running and stable for around 20 minutes
> >    2.
> >
> >    New consumers (members) start to appear in the group state without any
> >    clear reason, these new members only originate from a small number of
> > VMs
> >    (not the same VMs each time), and they keep joining
> >    3. Group state changes to PreparingRebalance
> >    4. All consumers stop consuming, showing these logs: "Group
> coordinator
> >    ... is unavailable or invalid, will attempt rediscovery"
> >    5. The consumer on VMs that generated extra members show these logs:
> >
> > Offset commit failed on partition X at offset Y: The coordinator is not
> > aware of this member.
> >
> > Failed to commit stream task X since it got migrated to another thread
> > already. Closing it as zombie before triggering a new rebalance.
> >
> > Detected task Z that got migrated to another thread. This implies that
> this
> > thread missed a rebalance and dropped out of the consumer group. Will try
> > to rejoin the consumer group.
> >
> >
> >    1. We kill all consumer processes on all VMs, the group moves to Empty
> >    with 0 members, we start the processes and we're back to step 1
> >
> > Kafka version is 1.1.0, streams version is 2.0.0
> >
> > We took thread dumps from the misbehaving consumers, and didn't see more
> > consumer threads than configured.
> >
> > We tried restarting kafka brokers, cleaning zookeeper cache.
> >
> > We suspect that the issue has to do with missing heartbeats, but the
> > default heartbeat is 3 seconds and message handling times are no where
> near
> > that.
> >
> > Anyone encountered a similar behaviour?
> >
>


-- 
*Avshalom Manevich*


Re: Kafka consumer group keeps moving to PreparingRebalance and stops consuming

2019-12-08 Thread Avshalom Manevich
Hi Boyang,

Thanks for your reply.
We looked into this direction, but since we didn't change max.poll.interval
from its default value, we're not sure if it's the case.


On Fri, 6 Dec 2019 at 17:42, Boyang Chen  wrote:

> Hey Avshalom,
>
> the consumer instance is initiated per stream thread. You will not be
> creating new consumers so the root cause is definitely member timeout.
> Have you changed the max.poll.interval by any chance? That config controls
> how long you tolerate the interval between poll calls to make sure progress
> is being made. If it's very tight, the consumer could
> stop sending heartbeats once progress is slow.
>
> Best,
> Boyang
>
> On Fri, Dec 6, 2019 at 7:12 AM Avshalom Manevich 
> wrote:
>
> > We have a Kafka Streams consumer group that keep moving to
> > PreparingRebalance state and stop consuming. The pattern is as follows:
> >
> >1.
> >
> >Consumer group is running and stable for around 20 minutes
> >2.
> >
> >New consumers (members) start to appear in the group state without any
> >clear reason, these new members only originate from a small number of
> > VMs
> >(not the same VMs each time), and they keep joining
> >3. Group state changes to PreparingRebalance
> >4. All consumers stop consuming, showing these logs: "Group
> coordinator
> >... is unavailable or invalid, will attempt rediscovery"
> >5. The consumer on VMs that generated extra members show these logs:
> >
> > Offset commit failed on partition X at offset Y: The coordinator is not
> > aware of this member.
> >
> > Failed to commit stream task X since it got migrated to another thread
> > already. Closing it as zombie before triggering a new rebalance.
> >
> > Detected task Z that got migrated to another thread. This implies that
> this
> > thread missed a rebalance and dropped out of the consumer group. Will try
> > to rejoin the consumer group.
> >
> >
> >1. We kill all consumer processes on all VMs, the group moves to Empty
> >with 0 members, we start the processes and we're back to step 1
> >
> > Kafka version is 1.1.0, streams version is 2.0.0
> >
> > We took thread dumps from the misbehaving consumers, and didn't see more
> > consumer threads than configured.
> >
> > We tried restarting kafka brokers, cleaning zookeeper cache.
> >
> > We suspect that the issue has to do with missing heartbeats, but the
> > default heartbeat is 3 seconds and message handling times are no where
> near
> > that.
> >
> > Anyone encountered a similar behaviour?
> >
>


-- 
*Avshalom Manevich*


Re: Kafka consumer group keeps moving to PreparingRebalance and stops consuming

2019-12-06 Thread Boyang Chen
Hey Avshalom,

the consumer instance is initiated per stream thread. You will not be
creating new consumers so the root cause is definitely member timeout.
Have you changed the max.poll.interval by any chance? That config controls
how long you tolerate the interval between poll calls to make sure progress
is being made. If it's very tight, the consumer could
stop sending heartbeats once progress is slow.

Best,
Boyang

On Fri, Dec 6, 2019 at 7:12 AM Avshalom Manevich 
wrote:

> We have a Kafka Streams consumer group that keep moving to
> PreparingRebalance state and stop consuming. The pattern is as follows:
>
>1.
>
>Consumer group is running and stable for around 20 minutes
>2.
>
>New consumers (members) start to appear in the group state without any
>clear reason, these new members only originate from a small number of
> VMs
>(not the same VMs each time), and they keep joining
>3. Group state changes to PreparingRebalance
>4. All consumers stop consuming, showing these logs: "Group coordinator
>... is unavailable or invalid, will attempt rediscovery"
>5. The consumer on VMs that generated extra members show these logs:
>
> Offset commit failed on partition X at offset Y: The coordinator is not
> aware of this member.
>
> Failed to commit stream task X since it got migrated to another thread
> already. Closing it as zombie before triggering a new rebalance.
>
> Detected task Z that got migrated to another thread. This implies that this
> thread missed a rebalance and dropped out of the consumer group. Will try
> to rejoin the consumer group.
>
>
>1. We kill all consumer processes on all VMs, the group moves to Empty
>with 0 members, we start the processes and we're back to step 1
>
> Kafka version is 1.1.0, streams version is 2.0.0
>
> We took thread dumps from the misbehaving consumers, and didn't see more
> consumer threads than configured.
>
> We tried restarting kafka brokers, cleaning zookeeper cache.
>
> We suspect that the issue has to do with missing heartbeats, but the
> default heartbeat is 3 seconds and message handling times are no where near
> that.
>
> Anyone encountered a similar behaviour?
>


Kafka consumer group keeps moving to PreparingRebalance and stops consuming

2019-12-06 Thread Avshalom Manevich
We have a Kafka Streams consumer group that keeps moving to the
PreparingRebalance state and stops consuming. The pattern is as follows:

   1.

   Consumer group is running and stable for around 20 minutes
   2.

   New consumers (members) start to appear in the group state without any
   clear reason, these new members only originate from a small number of VMs
   (not the same VMs each time), and they keep joining
   3. Group state changes to PreparingRebalance
   4. All consumers stop consuming, showing these logs: "Group coordinator
   ... is unavailable or invalid, will attempt rediscovery"
   5. The consumer on VMs that generated extra members show these logs:

Offset commit failed on partition X at offset Y: The coordinator is not
aware of this member.

Failed to commit stream task X since it got migrated to another thread
already. Closing it as zombie before triggering a new rebalance.

Detected task Z that got migrated to another thread. This implies that this
thread missed a rebalance and dropped out of the consumer group. Will try
to rejoin the consumer group.


   1. We kill all consumer processes on all VMs, the group moves to Empty
   with 0 members, we start the processes and we're back to step 1

Kafka version is 1.1.0, streams version is 2.0.0

We took thread dumps from the misbehaving consumers, and didn't see more
consumer threads than configured.

We tried restarting kafka brokers, cleaning zookeeper cache.

We suspect that the issue has to do with missing heartbeats, but the
default heartbeat is 3 seconds and message handling times are no where near
that.

Anyone encountered a similar behaviour?


Kafka consumer metrics

2019-12-04 Thread Branko Terzic
Hi everyone,

we want to get Kafka consumer group metrics (throttling and byte rate, for
example).

We have done this already, using:
1. JMX Mbean of the Kafka consumer Java application
2.  CLI utility:  *bin/kafka-consumer-groups.sh --describe --group
group_name --bootstrap-server localhost:port*.

Question:
Can this be done programmatically using some Java libraries?

So far, we have tried kafka.admin.ConsumerGroupCommand, which is also used
by *bin/kafka-consumer-groups.sh*, but we got only a subset
of the expected values.

Thank you for your help.

Regards,

Branko Terzic


kafka-consumer-groups timeout when setting an offset to a specific datetime

2019-12-02 Thread Bernd Wittefeld
Hi everyone,

we use kafka 2.3.0 from the confluent-kafka-2.11 Debian package on Debian 10.

When we want to set an offset of a consumer to a datetime, we get a timeout 
error even if we use the timeout switch of the kafka-consumer-groups script:

> kafka-consumer-groups --bootstrap-server localhost:9092 --group cg1 --topic 
> 'topic1' --reset-offsets --to-datetime '2019-11-30T00:00:00.000' --timeout 
> 120 --dry-run

> Error: Executing consumer group command failed due to Failed to get offsets 
> by times in 60003ms
> org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by 
> times in 60003ms

Where can we set this timeout? As far as we could see from the documents this 
should be a setting on the client side: request.timeout.ms.
We are using the kafka-consumer-groups tool that is shipped with the confluent 
distribution, but this is just a wrapper to use the kafka-run-class tool, as it 
is with the vanilla kafka distribution.

Setting the offset to a specific offset value or to the earliest works without 
any issues.

Is there any way to set the offset to a datetime value? This would be very 
helpful. Can you guide on where we need to change the config so that this 
timeout doesn't happen?


Best regards,
Bernd


Few Kafka Consumer threads do not consume

2019-10-30 Thread Debi Chowdhuri
Kafka details:
Kafka Version: kafka_2.12-2.2.0
3 broker Kafka Cluster
Kafka topic configs - Partitions 60, Replication Factor 3, 
min-insync-replica 2
 
Overview of Application from Kafka scenario:
 
1. The Collector application receives data from a Kafka topic (60 
partitions).
2. Application has 4 instances running on 4 different server machines.
3. An application instance initiates multiple consumer threads.
4. This is done through Java Executor service
   eg:
   ExecutorService es = Executors.newFixedThreadPool(NoOfconsumers);
 
5. An auto.offset.reset: 'earliest'  property is also passed to each kafka 
consumer threads.
   
6. Each single partition in the topic is assigned to each single consumer 
thread using the Kafka Assign Method.
7. A "seekToBeginning" method is called inside the kafka Consumer thread 
to consume from first offset from the Kafka Partition. 
8. The Kafka consumer group-id is kept constant throughout the application 
instances.
9.  The Kafka consumer threads are closed using KafkaConsumer.close() 
method once all the records from its respective partition are consumed 
from the topic
 
 
 
Issue Description:
 
1. On some occasions, one or more Kafka consumer threads do not fetch 
from the first offset of their kafka partitions, i.e. they start to consume 
from somewhere in between. 
2. Thus some of the data from the kafka topic is missed and not consumed. 
3. This pattern of incorrect consumption can be observed on any of the 4 
servers and is not constant.
4. There are also instances where all the threads have consumed from 
the first offsets of their assigned partitions successfully.

Is this issue related to the current version of Kafka we are using 
(kafka_2.12-2.2.0), and has the corresponding bug been fixed in later 
versions?
Please advise on this.
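
For reference, a bare-bones sketch of the assign/seekToBeginning pattern described in points 6 and 7 (the topic name, key/value types and partition id are placeholders, not taken from the actual application):

```
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PartitionWorker implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private final int partitionId;

    PartitionWorker(KafkaConsumer<String, String> consumer, int partitionId) {
        this.consumer = consumer;
        this.partitionId = partitionId;
    }

    @Override
    public void run() {
        TopicPartition tp = new TopicPartition("my-topic", partitionId);
        consumer.assign(Collections.singletonList(tp));
        // seekToBeginning is lazy: the position only moves on the next poll()/position() call
        consumer.seekToBeginning(Collections.singletonList(tp));
        try {
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    // application processing of record; stop once the partition
                    // has been read up to the desired end offset
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```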



Thanks & Regards
Debi Chowdhuri
Tata Consultancy Services Limited
Mumbai,Maharashtra
India
Mailto: debi.chowdh...@tcs.com
Website: http://www.tcs.com

Experience certainty.   IT Services
Business Solutions
Consulting





Re: Kafka consumer Fetcher several Ignoring fetched records logs

2019-09-17 Thread Aminouvic
Hello, 

My apologies for the late reply.

No the data isn't getting deleted faster than the consumer is reading it.

We also notice when this happens the number of Replica Fetchers doubles on the 
broker side and the total of out bytes drops significantly.

Regards, 
Amine

On 2019/09/07 04:18:18, Hrishikesh Mishra  wrote: 
> Can you check whether its happening because logs are getting purge very
> fast.
> 
> On Sat, 7 Sep 2019 at 12:18 AM, Aminouvic  wrote:
> 
> > Hello all,
> >
> > We're noticing several logs on our consumer apps similar to the following :
> >
> > 2019-09-06 17:56:36,933 DEBUG
> > org.apache.kafka.clients.consumer.internals.Fetcher   - Ignoring fetched
> > records for mytopic-7 at offset 45704161910 since the current position is
> > 45704162370
> >
> > Any idea on what would cause this ?
> > Did anyone ever encounter this kind of logs?
> >
> > Kafka Cluster version 0.10.1
> > Kafka Client version 0.10.2
> >
> > Best regards,
> > Amine
> >
> >
> >
> 


Re: Kafka consumer Fetcher several Ignoring fetched records logs

2019-09-06 Thread Hrishikesh Mishra
Can you check whether it's happening because logs are getting purged very
fast.

On Sat, 7 Sep 2019 at 12:18 AM, Aminouvic  wrote:

> Hello all,
>
> We're noticing several logs on our consumer apps similar to the following :
>
> 2019-09-06 17:56:36,933 DEBUG
> org.apache.kafka.clients.consumer.internals.Fetcher   - Ignoring fetched
> records for mytopic-7 at offset 45704161910 since the current position is
> 45704162370
>
> Any idea on what would cause this ?
> Did anyone ever encounter this kind of logs?
>
> Kafka Cluster version 0.10.1
> Kafka Client version 0.10.2
>
> Best regards,
> Amine
>
>
>


Kafka consumer Fetcher several Ignoring fetched records logs

2019-09-06 Thread Aminouvic
Hello all, 

We're noticing several logs on our consumer apps similar to the following :

2019-09-06 17:56:36,933 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher   - Ignoring fetched 
records for mytopic-7 at offset 45704161910 since the current position is 
45704162370

Any idea on what would cause this ?
Did anyone ever encounter this kind of logs?

Kafka Cluster version 0.10.1
Kafka Client version 0.10.2

Best regards,
Amine




Kafka consumer is not reading some partition

2019-07-23 Thread Sergey Fedorov
Hello. I was using Kafka 2.1.1 and facing a problem where our consumers
sometimes intermittently stop consuming from one or two of the partitions. My
config


Re: Kafka Consumer API - partition lags

2019-06-25 Thread Rahul Singh
Hi Garvit,

You can check here https://kafka.apache.org/documentation

Thanks,
Rahul

On Tue, Jun 25, 2019 at 4:11 PM Garvit Sharma  wrote:

> Hi All,
>
> I am looking for Kafka consumer API documentation to understand how it
> works internally.
>
> I am facing a problem where my consumer group is lagging behind very badly
> only on some partitions of the topic.
>
> Please let me know.
>
> Thanks,
>
>
> --
>
> Garvit Sharma
> github.com/garvitlnmiit/
>
> No Body is a Scholar by birth, its only hard work and strong determination
> that makes him master.
>


Kafka Consumer API - partition lags

2019-06-25 Thread Garvit Sharma
Hi All,

I am looking for Kafka consumer API documentation to understand how it
works internally.

I am facing a problem where my consumer group is lagging behind very badly
only on some partitions of the topic.

Please let me know.

Thanks,


-- 

Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination
that makes him master.


Re: Kafka consumer lag does not work for just one topic.

2019-06-17 Thread 1095193290



On 2019/06/13 22:24:26, Shrikant Patel  wrote: 
> kafka-consumer-groups.sh  fails for just one consumer group -
> group-vendor-cust. It work for every other consumer group. Command errors
> out complaining about the timeout, but adding --timeout does not help
> either.
> 
> I don't know whats wrong or how go about debug this.
> 
> Any help or suggestion?? Thanks Shri
> 
> *bash-4.2$ /opt/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server
> x.x.x.x:9093 --list --command-config
> /opt/kafka/config/client-ssl.properties*
> eopn-group-vendor-cust
> group-vendor-cust
> console-consumer-52725
> group-cust
> error-group-vendor-cust
> error-eopn-group-vendor-cust
> 
> 
> *bash-4.2$ /opt/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server
> x.x.x.x:9093 --describe --group group-vendor-cust --command-config
> /opt/kafka/config/client-ssl.properties*Error: Executing consumer group
> command failed due to Failed to send request after 5000 ms.
> 
> 
> *bash-4.2$ /opt/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server
> x.x.x.x:9093 --describe --group group-vendor-cust --command-config
> /opt/kafka/config/client-ssl.properties --timeout 1*Error: Executing
> consumer group command failed due to Failed to send request after 5000 ms.
> 
> 
> *bash-4.2$ /opt/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server
> x.x.x.x:9093 --describe --group group-cust --command-config
> /opt/kafka/config/client-ssl.properties*
> TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
> CONSUMER-IDHOST
>CLIENT-ID
> patient-cust7  59  59  0
> pnmConsumer[patient-cust#rx-cust]-15713999-459b-48de-870d-8579179d57b9
> /x.x.x.x  pnmConsumer[patient-cust#rx-cust]
> rx-cust 7  23608   23608   0
> .
> Could you change the log-level of log4j.properties and provide more 
> information?


Kafka consumer lag does not work for just one topic.

2019-06-13 Thread Shrikant Patel
kafka-consumer-groups.sh fails for just one consumer group -
group-vendor-cust. It works for every other consumer group. The command errors
out complaining about a timeout, but adding --timeout does not help
either.

I don't know what's wrong or how to go about debugging this.

Any help or suggestion?? Thanks Shri

*bash-4.2$ /opt/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server
x.x.x.x:9093 --list --command-config
/opt/kafka/config/client-ssl.properties*
eopn-group-vendor-cust
group-vendor-cust
console-consumer-52725
group-cust
error-group-vendor-cust
error-eopn-group-vendor-cust


*bash-4.2$ /opt/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server
x.x.x.x:9093 --describe --group group-vendor-cust --command-config
/opt/kafka/config/client-ssl.properties*Error: Executing consumer group
command failed due to Failed to send request after 5000 ms.


*bash-4.2$ /opt/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server
x.x.x.x:9093 --describe --group group-vendor-cust --command-config
/opt/kafka/config/client-ssl.properties --timeout 1*Error: Executing
consumer group command failed due to Failed to send request after 5000 ms.


*bash-4.2$ /opt/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server
x.x.x.x:9093 --describe --group group-cust --command-config
/opt/kafka/config/client-ssl.properties*
TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
CONSUMER-IDHOST
   CLIENT-ID
patient-cust7  59  59  0
pnmConsumer[patient-cust#rx-cust]-15713999-459b-48de-870d-8579179d57b9
/x.x.x.x  pnmConsumer[patient-cust#rx-cust]
rx-cust 7  23608   23608   0
.


RE: kafka consumer metadata expire

2019-04-25 Thread 赖剑清
Hi,

Have you tried setting the METADATA_MAX_AGE_CONFIG (default: 300,000ms) smaller?
It seems the consumer won't actually update the metadata info until it's out of 
date.

>-Original Message-
>From: Shengnan YU [mailto:ysna...@hotmail.com]
>Sent: Wednesday, April 24, 2019 1:43 PM
>To: users@kafka.apache.org
>Subject: kafka consumer metadata expire
>
>Hi everyone
>How to update kafka consumer's metadata when some topics are deleted.
>The metadata in the consumer will not expire or remove the outdated topic,
>which leads to an UNKNOWN_TOPIC exception when fetching metadata from
>the cluster. Thank you very much.
>
>ysnakie
>
>ysna...@hotmail.com
>
>Signature customized by NetEase Mail Master <https://mail.163.com/dashi/dlpro.html?from=mail81>


Kafka consumer downgrade issue

2019-04-24 Thread Andreas Nilsson
Hi all,

Recently we upgraded our application from the more primitive Java client APIs 
(kafka.javaapi.consumer.SimpleConsumer, kafka.api.FetchRequest and friends) to 
the more friendly poll-based org.apache.kafka.clients.consumer.KafkaConsumer 
using Kafka Java client libraries version 1.1.0.

The upgrade went fine and meant we could remove a LOT of custom code we had 
previously needed to use. This was also released into a version of the 
application that went into QA / staging environments of a client of ours. The 
upgrade (environment was not wiped) in the staging environment went fine as 
well without any errors. Due to an unrelated serious issue that we discovered a 
couple of days later, we had to advise the client to roll back from the new 
application version (it still had not gone into production). We didn't think this 
would lead to any issues.

What DID occur when the application was downgraded in the staging client 
environment, however, was Kafka clients starting to get a lot of offset fetch 
errors:

WARN  [2019-04-24 01:19:43,269] custom.application.package.KafkaClient: Failed 
to fetch messages from Kafka. Error code: 1

Now, error code 1 means offset out of range as far as I'm aware. I would not 
have expected this since I was pretty sure offsets were stored in ZooKeeper 
with the old SimpleConsumer (I can see the offsets being stored in nodes like 
/consumers//offsets//0) and in Kafka internal topic 
__consumer_offsets with the new KafkaConsumer (I can NOT see offsets in 
ZooKeeper for a couple of new-style consumers we already had).

The ZooKeeper-stored consumer offsets should also never have been "out of 
range" since there would strictly always be more events since they were last 
used. Could this error simply mean that the ZooKeeper offsets was TOO OLD 
(meaning Kafka had thrown away messages)?

I wouldn't have been surprised to see some kind of error if the offset data was 
shared.

This is NOT a serious issue for us at the moment, we were just surprised that 
the downgrade ran into errors.

Regards,
Andreas Nilsson

