Re: Facing MESSAGE_TOO_LARGE

2022-07-01 Thread M. Manna
Hi, you need to increase the record and message size limits, because your real
message payload is bigger than what's mentioned in the properties file.
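
As a rough sketch (the 10 MB figures are placeholders, not recommendations),
the limit usually has to be raised in more than one place for a Connect source
pipeline. Note that message.max.bytes is a broker-level setting (with
max.message.bytes as its topic-level counterpart), so it belongs on the
brokers/topic rather than in worker.properties:

# broker server.properties (or max.message.bytes on the topic)
message.max.bytes=10485760

# Connect worker.properties -- producer used by the source tasks
producer.max.request.size=10485760

# with connector.client.config.override.policy=All, the same limit can also be
# set per connector as producer.override.max.request.size

# consumers reading the topic
max.partition.fetch.bytes=10485760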

Regards,

On Fri, 1 Jul 2022 at 20:24, Divya Jain 
wrote:

> Hi,
>
> I am facing this issue:
> [2022-07-01 19:01:05,548] INFO Topic 'postgres.public.content_history'
> already exists. (org.apache.kafka.connect.runtime.WorkerSourceTask:423)
> [2022-07-01 19:01:05,641] INFO
> WorkerSourceTask{id=smtip-de-content2-source-connector-0} Committing
> offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488)
> [2022-07-01 19:01:05,642] INFO
> WorkerSourceTask{id=smtip-de-content2-source-connector-0} flushing 155
> outstanding messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
> [2022-07-01 19:01:06,034] INFO
> WorkerSourceTask{id=smtip-de-content2-source-connector-0} Finished
> commitOffsets successfully in 393 ms
> (org.apache.kafka.connect.runtime.WorkerSourceTask:586)
> [2022-07-01 19:01:06,575] WARN [Producer
> clientId=connector-producer-smtip-de-content2-source-connector-0] Got error
> produce response in correlation id 610 on topic-partition
> postgres.public.content_history-1, splitting and retrying (2147483647
> attempts left). Error: MESSAGE_TOO_LARGE
> (org.apache.kafka.clients.producer.internals.Sender:582)
> [2022-07-01 19:01:06,843] ERROR
> WorkerSourceTask{id=smtip-de-content2-source-connector-0} failed to send
> record to postgres.public.content_history:
>  (org.apache.kafka.connect.runtime.WorkerSourceTask:372)
> org.apache.kafka.common.errors.RecordTooLargeException: The request
> included a message larger than the max message size the server will accept.
> [2022-07-01 19:01:06,927] WARN [Producer
> clientId=connector-producer-smtip-de-content2-source-connector-0] Got error
> produce response in correlation id 643 on topic-partition
> postgres.public.content_history-0, splitting and retrying (2147483647
> attempts left). Error: MESSAGE_TOO_LARGE
> (org.apache.kafka.clients.producer.internals.Sender:582)
> [2022-07-01 19:01:06,941] INFO The task will send records to topic
> 'postgres.public.content_attributes' for the first time. Checking whether
> topic exists (org.apache.kafka.connect.runtime.WorkerSourceTask:419)
> [2022-07-01 19:01:06,943] INFO Topic 'postgres.public.content_attributes'
> already exists. (org.apache.kafka.connect.runtime.WorkerSourceTask:423)
>
>
> The properties I am using in my worker.properties are below. I have tried
> combinations of properties, but I am not sure how to fix it. Please guide on
> the same.
>
> offset.flush.timeout.ms=6
>
> offset.flush.interval.ms=5000
>
> max.partition.fetch.bytes=2147483647
>
> max.request.size=2147483647
>
> max.message.bytes=2147483647
>
> message.max.bytes=2147483647
>
> replica.fetch.max.bytes=2147483647
>
> producer.security.protocol=SSL
>
> producer.max.request.size=2147483647
>
> connector.client.config.override.policy=All
>
> Thanks & Regards
> Divya Jain
>


Advertised.listeners to be used behind VIP Load Balancer

2021-07-04 Thread M. Manna
Hello,

Is it currently possible to use a single endpoint for advertised.listeners,
which is in front of all my brokers? the flow for example

broker1-->| V
broker2-->| I
broker3-->| P

I was under the impression that I can reach any broker I want (assuming
that broker is 100% guaranteed to be healthy at all times, which it can never
be), and the broker coordination service will pass the request on to the
leader for that topic-partition. Maybe I am wrong.
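
For illustration only (hostnames are made up), the per-broker setup I am
comparing against would be something like:

# broker1 server.properties
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://broker1.internal:9092

versus pointing every broker's advertised.listeners at the same VIP:

# all brokers
advertised.listeners=PLAINTEXT://kafka-vip.internal:9092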

Has anyone done this successfully in production/staging/development?

Regards,


Re: Certificate request not coming mtls

2021-06-25 Thread M. Manna
1. What is it that you’ve tried ?
2. What config changes have you made?
3. What do you expect to see?

On Fri, 25 Jun 2021 at 09:22, Anjali Sharma 
wrote:

> Hii All,
>
>
> Can you please help with this?
>
> While trying for mTLS with ssl.client.auth=required, on the server side the
> certificate request DNs are for some junk certificates which we have not
> deployed on the server
>
>


Re: SAP PI/PO -> Kafka integration options

2021-03-17 Thread M. Manna
Kafka doesn’t have a rest proxy. Confluent does.
Also, Instaclustr offers a Kafka REST proxy.

Also, SAP has 100s of products including SAP Cloud Platform. So not sure
what this PI/PO means for your case.

Unless there’s something I’m unaware of, you’re referring to a non-Apache
offering here. You might want to browse Confluent’s docs to understand what
they have for a proxy.

Thanks,




On Wed, 17 Mar 2021 at 20:57, Blakaj Arian  wrote:

> Hi,
>
> My name is Arian. I work at Scania as an architect.
>
> We have SAP in our landscape and are using PI/PO as our integration
> platform. We are interested to know what type of integration options there
> is to integrate to Kafka from a SAP PI/PO perspective. We know that Kafka
> has an API proxy which we can utilize with our REST adapter but we are
> concerned as to if that will be sufficient enough for all of our use cases.
> There are adapters available for purchase (for instance Advantco Kafka
> adapter for PI/PO) but we would like to know if there is other options
> before we should take a look into that.
>
> Please also provide useful links, whitepaper regarding this (SAP PI/PO ->
> Kafka). Perhaps a meeting/demo session can be arranged.
>
> BR
> Arian
>


Re: Polling behaviour when a consumer assigned with multiple topics-partition

2021-02-01 Thread M. Manna
Hey Upendra,

On Mon, 1 Feb 2021 at 05:32, Upendra Yadav  wrote:

> Hi,
>
> I want to know the polling behaviour when a consumer is assigned with
> multiple topic-partitions.
>
> 1. In a single poll will it get messages from multiple topic-partitions. Or
> in one poll only one topic-partition's messages will come?


  Whether you use the subscribe or the assign strategy, a single poll can
return all available messages across the assigned TopicPartitions. Please
read about subscribe/assign in the Consumer API doc.


>
> 2. How does it choose topic-partitions for next polling? Any default
> class/strategy assigned for this, that we can reconfigure?


 Same as above. Please read the consumer API as they have the full
explanation on consumer behaviour on partition assignment, rebalancing, and
polling behaviours.
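
For what it's worth, here is a minimal sketch (bootstrap address, group id and
topic names are placeholders) showing that a single poll() can return records
for several of the assigned topic-partitions at once:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

public class MultiPartitionPollSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // placeholder
    props.put("group.id", "example-group");             // placeholder
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Arrays.asList("topicA", "topicB"));   // placeholder topics
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
      // A single poll() batch is grouped by topic-partition; several
      // partitions can be present in the same batch.
      for (TopicPartition tp : records.partitions()) {
        System.out.println(tp + " -> " + records.records(tp).size() + " records");
      }
    }
  }
}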


>
> Thanks
> Upendra
>


Re: Kafka Consumer Consumption based on TimeStamp-based position

2021-01-23 Thread M. Manna
Thanks. Just realised that it was in the API since 0.11.0. Thanks Steve.

On Sat, 23 Jan 2021 at 12:42, Steve Howard 
wrote:

> Hi,
>
> Yes, you can use the offsetsForTimes() method.  See below for a simple
> example that should get you started...
>
> import org.apache.kafka.clients.consumer.*;
> import org.apache.kafka.common.config.ConfigException;
> import org.apache.kafka.common.*;
> import java.io.*;
> import java.time.Duration;
> import java.util.*;
> import java.text.*;
>
> public class searchByTime {
>   static KafkaConsumer<String, String> c;
>
>   public static void main(String args[]) throws Exception {
> Properties props = new Properties();
> props.put("bootstrap.servers","localhost:9092");
> props.put("max.poll.records",1);
> props.put("topic","yourtopicname");
> props.put("group.id",UUID.randomUUID().toString());
> props.put("key.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>
> c = new KafkaConsumer<>(props);
> String topic = (String)props.get("topic");
> c.subscribe(Collections.singletonList(topic));
> System.out.println("subscribed to topic " + topic);
> System.out.println(c.partitionsFor(topic));
> List<TopicPartition> partitions = new ArrayList<>();
> for (PartitionInfo p: c.partitionsFor(topic)) {
>   partitions.add(new TopicPartition(topic,p.partition()));
> }
> System.out.println(partitions);
>
> long timestamp = Long.parseLong(args[0]);
> Map<TopicPartition, Long> partitionOffsetsRequest = new
> HashMap<>(partitions.size());
> for (TopicPartition partition : partitions) {
>   partitionOffsetsRequest.put(new TopicPartition(partition.topic(),
> partition.partition()),
>   timestamp);
> }
>
> final Map<TopicPartition, Long> result = new
> HashMap<>(partitions.size());
>
> for (Map.Entry<TopicPartition, OffsetAndTimestamp> partitionToOffset :
>   c.offsetsForTimes(partitionOffsetsRequest).entrySet()) {
> result.put(new TopicPartition(partitionToOffset.getKey().topic(),
> partitionToOffset.getKey().partition()),
>  (partitionToOffset.getValue() == null)
> ? null : partitionToOffset.getValue().offset());
>   }
>
> System.out.println(result);
> ConsumerRecords<String, String> records =
> c.poll(Duration.ofSeconds(1));
> for (TopicPartition part: result.keySet()){
>   long offset = result.get(part);
>   c.seek(part,offset);
> }
>
> System.out.println("trying to get records...");
> records = c.poll(Duration.ofSeconds(1));
> for (ConsumerRecord<String, String> record : records) {
>   Date date = new Date(record.timestamp());
>   DateFormat formatter = new SimpleDateFormat("HH:mm:ss.SSS");
>   formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
>   String dateFormatted = formatter.format(date);
>   System.out.println("Received message: (" + record.key() + ", " +
> record.value() + ") at offset " + record.offset() + " at time " +
> dateFormatted);
> }
>   }
> }
>
> Thanks,
>
> Steve
>
>
> On Sat, Jan 23, 2021 at 6:14 AM M. Manna  wrote:
>
> > Hello,
> >
> > We know that using KafkaConsumer api we can replay messages from certain
> > offsets. However, we are not sure if we could specify timeStamp from
> which
> > we could replay messages again.
> >
> > Does anyone know if this is possible?
> >
> > Regards,
> >
>


Kafka Consumer Consumption based on TimeStamp-based position

2021-01-23 Thread M. Manna
Hello,

We know that using KafkaConsumer api we can replay messages from certain
offsets. However, we are not sure if we could specify timeStamp from which
we could replay messages again.

Does anyone know if this is possible?

Regards,


Transactional Producers and Apparent Disk Space for Brokers

2020-09-19 Thread M. Manna
Hello,

We have a question regarding transactional producer state and disk space
usage.

We did a quick and dirty test recently with 3 simple Java client producers
writing to compressed topics, with compression.type set correctly for both
the producers and the topics. We performed two rounds of tests (with
Transactional and Non-Transactional Producers).

What we see is that the disk space usage is quite high when there are
transactional producers. It's true that once the cleanup happens, some of
this data is expected to be cleaned up and we can reclaim disk space. But
until the files are closed, the disk space usage is quite significant
compared to non-transactional.

For our test, all our producers ran for 24 minutes. For transactional
producers, the kafka log directory apparent-size was 2GB. But for
non-transactional, it was 60 MB. Our disk space alert was fired based on the
apparent-size. And once we shut down all our brokers, the directory sizes
changed to 2.2 MB (for txn) and 1.5 MB (for non-txn)

Is this expected for transactional producer-based writes? If so, what is
the recommendation for optimising producers (e.g. any official docs etc.) ?

Regards,


Re: Compression.type=GZIP without actual compression

2020-09-18 Thread M. Manna
We are using official Apache Kafka stuff. Nothing different.



On Fri, 18 Sep 2020 at 03:30, Shaohan Yin  wrote:

> Hi,
>
> Are you using the official Java client or any third party clients?
>
> On Thu, 17 Sep 2020 at 22:53, M. Manna  wrote:
>
> > Hey Shaohan,
> >
> > Thanks for your reply, much appreciated. We are using Kafka 2.4.1. Although
> > we use the Confluent platform under Confluent Licence, I don't think this
> > matters.
> >
> > We are using compression.type=GZIP for all our producers.
> >
> > We create all topics with compression.type=GZIP i.e. per topic level. When
> > I do a "describe" of a kafka topic, I see metadata confirming that it's
> > enabled for GZIP compressed data.
> >
> > That's all I could say.
> >
> > Regards,
> >
> > On Thu, 17 Sep 2020 at 15:01, Shaohan Yin  wrote:
> >
> > > Hi,
> > >
> > > Could you specify your version and the protocol of your broker?
> > >
> > > From the client of 2.5.1 I didn't see any changes that could be made to
> > > the client compression.
> > > Maybe you could check if the compression.type is set on the topic level
> > > or the broker side?
> > >
> > > Cheers
> > >
> > > On Thu, 17 Sep 2020 at 20:19, M. Manna  wrote:
> > >
> > > > Hello,
> > > >
> > > > I am trying to understand the compression.type settings and the growth
> > > > impact from this. With a compression type (valid) settings, this ensures
> > > > that the message in the topic stays compressed and has to be decompressed
> > > > by the consumer.
> > > >
> > > > However, it may be possible that the compression will not happen i.e. NONE
> > > > if the uncompressed size is the same as compressed. I am trying to use this
> > > > command to verify:
> > > >
> > > > ./kafka-run-class kafka.tools.DumpLogSegments --files /filename.log
> > > > --print-data-log | grep -iE "compresscodec: NONE"
> > > >
> > > > I can see a lot of entries with NONE. But all my producers are writing with
> > > > compression.type=GZIP. Is this expected?
> > > >
> > > > Regards,
> > > >
> > >
> >
>


Re: Compression.type=GZIP without actual compression

2020-09-17 Thread M. Manna
Hey Shaohan,

Thanks for your reply, much appreciated. We are using Kafka 2.4.1. Although
we use the Confluent platform under Confluent Licence, I don't think this
matters.

We are using compression.type=GZIP for all our producers.

We create all topics with compression.type=GZIP i.e. per topic level. When
I do a "describe" of a kafka topic, I see metadata confirming that it's
enabled for GZIP compressed data.
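
For reference, a hedged sketch of the same check done programmatically
(bootstrap address and topic name are placeholders) via the Admin client:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import java.util.*;

public class CheckTopicCompression {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");               // placeholder
    try (AdminClient admin = AdminClient.create(props)) {
      ConfigResource topic =
          new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"); // placeholder
      Config config = admin.describeConfigs(Collections.singleton(topic))
                           .all().get().get(topic);
      // Effective topic-level compression.type (e.g. gzip, or producer).
      System.out.println(config.get("compression.type").value());
    }
  }
}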

That's all I could say.

Regards,




On Thu, 17 Sep 2020 at 15:01, Shaohan Yin  wrote:

> Hi,
>
> Could you specify your version and the protocol of your broker?
>
> From the client of 2.5.1 I didn't see any changes that could be made to the
> client compression.
> Maybe you could check if the compression.type is set on the topic level or
> the broker side?
>
> Cheers
>
> On Thu, 17 Sep 2020 at 20:19, M. Manna  wrote:
>
> > Hello,
> >
> > I am trying to understand the compression.type settings and the growth
> > impact from this. With a compression type (valid) settings, this ensures
> > that the message in the topic stays compressed and has to be decompressed
> > by the consumer.
> >
> > However, it may be possible that the compression will not happen i.e.
> NONE
> > if the uncompressed size is the same as compressed. I am trying to use
> this
> > command to verify:
> >
> > ./kafka-run-class kafka.tools.DumpLogSegments --files
> > /filename.log
> > --print-data-log | grep -iE "compresscodec: NONE"
> >
> > I can see a lot of entries with NONE. But all my producers are writing
> with
> > compression.type=GZIP. Is this expected?
> >
> > Regards,
> >
>


Compression.type=GZIP without actual compression

2020-09-17 Thread M. Manna
Hello,

I am trying to understand the compression.type settings and the growth
impact from this. With a compression type (valid) settings, this ensures
that the message in the topic stays compressed and has to be decompressed
by the consumer.

However, it may be possible that the compression will not happen i.e. NONE
if the uncompressed size is the same as compressed. I am trying to use this
command to verify:

./kafka-run-class kafka.tools.DumpLogSegments --files  /filename.log
--print-data-log | grep -iE "compresscodec: NONE"

I can see a lot of entries with NONE. But all my producers are writing with
compression.type=GZIP. Is this expected?

Regards,


Request for Comments/Resolution - KAFKA-10465

2020-09-07 Thread M. Manna
Hello,

We do appreciate that release 2.7 is keeping us occupied, but this bug (or
not) is holding us back from making some design changes/improvements. It'd
be awesome if anyone could take a look and perhaps either, rule it out with
explanation or acknowledge the changes.

Regards,


Transactional Producers/Consumers with LSO and Last Committed Record

2020-09-07 Thread M. Manna
Hello,

I understand that a consumer with Txn Isolation level set as
"read_committed" will read till Last Stable Offset (LSO) and will not be
given any messages for aborted Txn. LSO also includes non-transactional
messages.

We have seen a behaviour where LSO didn't have any data and we had to seek
(rewind by decrementing the offsets) to be able to poll data (or, poll just
hangs forever). The reason for such seek() is that our consumer only needs
to fetch the last successfully committed record to Kafka TopicPartition.
Once we can retrieve it, we don't need to keep running the consumer (i.e.
we close it successfully).

This almost seems like a missing configuration or a bug, if the consumer
isn't going to poll forever. For example, if a TopicPartition has last
successfully committed records at offset 10, I should be able to call
seek(TopicPartition, 10) and poll for data without hanging forever. But
currently, with read_committed isolation level, we are getting offset as a
result of endOffsets(), but the last committed record is available at
offset 9.
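
For context, a minimal sketch of the pattern we are describing (topic name,
deserializers and the offset-decrement workaround are placeholders/assumptions
on my part, not a recommendation):

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

public class LastCommittedRecordSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");                  // placeholder
    props.put("group.id", "last-record-checker");                      // placeholder
    props.put("isolation.level", "read_committed");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      TopicPartition tp = new TopicPartition("my-topic", 0);           // placeholder topic
      consumer.assign(Collections.singleton(tp));

      long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
      // For a read_committed consumer, endOffsets() reflects the LSO, so the
      // last committed record (if any) sits at an offset strictly below it.
      consumer.seek(tp, Math.max(0L, endOffset - 1));

      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
      records.forEach(r -> System.out.println(r.offset() + " -> " + r.value().length + " bytes"));
    }
  }
}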

Is this an expected behaviour? If so, does it mean our consumer use case is
unique?

Regards,


Re: Consumer endOffsets and seek making poll run forever

2020-09-05 Thread M. Manna
Firstly, thanks very much. I think what I am struggling with is how the
offsets are fetched based on the LSO and the high watermark. Also, if there
are non-transactional producers, this code will always work with offset-1 if I
want to get the last record written.

I cannot recall which admin command I can invoke to get the list of open
transactions (or whether there is one at all); that way I don't have to keep
retrying. I also think this is very ambiguous, and the documentation doesn't
make it any easier to explain. I suppose an improvement would be to get the
min offset from open transactions, and use that.

Thanks,



On Sat, 5 Sep 2020 at 11:49, adobewjl  wrote:

> hi, i just ran your code.
>
> it works and takes the data at endOffsets - 100 in the first round of poll,
> then hangs at the second round of poll.
>
> this may happen if there are no records continuously being sent to the topic.
> you pass the poll timeout as Long.MAX_VALUE, so the poll will just wait for
> a new record to come; since there is no new data, the poll hangs.
>
> and for the second question:
> the seek() just specifies where you want to consume on the topic.
> when you seek to somewhere, the next poll simply requests data from where
> you seek.
>
> if
> seek 1
> seek 20
>
> then
> poll will start from 20, because the latest seek is 20.
>
> At 2020-09-05 02:18:53, "M. Manna"  wrote:
>
> >Hello,
> >
> >During some tests, I can see that consumer.endOffsets() returns some valid
> >value, but cannot be polled definitely if seek() is called on that
> >consumer. In other words, poll hangs almost forever.
> >
> >My intention is to start consuming from the last successfully written data
> >on that TopicPartition. I am using this model because that data is written
> >by a different system and required for send a separate command somewhere
> >else.
> >
> >Here is my code
> >
> >>
> >> // Prepare
> >> List topics = new ArrayList<>();
> >> topics.add(topicString);
> >>
> >> KafkaConsumer consumer = new
> >> KafkaConsumer<>(prepareSimpleProperties());
> >> TopicPartition topicPartition = new TopicPartition(topicString, 0);
> >>
> >>// ASSIGN
> >
> >Set assignments = consumer.assignment();
> >if (assignments == null || !assignments.isEmpty() ||
> >!assignments.contains(topicPartition)) {
> >consumer.assign(Collections.singleton(topicPartition));
> >}
> >
> >// SEEK
> >Map topicPartitionOffsets =
> >consumer.endOffsets(Collections.singleton(topicPartition));
> >if (topicPartitionOffsets != null) {
> >for (Map.Entry data:
> >topicPartitionOffsets.entrySet()) {
> >Long desiredOffset = data.getValue()-100;
> >consumer.seek(data.getKey(), desiredOffset < 0L ? 0L :
> >desiredOffset);
> >}
> >}
> >
> >// POLL
> >try {
> >while (true) {
> >ConsumerRecords records =
> >consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
> >for (TopicPartition partition : records.partitions()) {
> >List> partitionRecords =
> >records.records(partition);
> >for (ConsumerRecord record :
> >partitionRecords) {
> >System.out.println(record.offset() + ": " + new
> >String(record.value()));
> >}
> >long lastOffset =
> >partitionRecords.get(partitionRecords.size() - 1).offset();
> >consumer.commitSync();
> >}
> >}
> >} finally {
> >consumer.close();
> >}
> >}
> >
> >When I read the following from seek docs:
> >
> >*If this API is invoked for the same partition more than once, the latest
> >> offset will be used on the next poll(). Note that you may lose data if this
> >> API is arbitrarily used in the middle of consumption, to reset the fetch
> >> offsets.*
> >
> >I am not sure I understood what it means by "Latest offset" and "more than
> >once". What's the best way to get this offset calculation to work
> >correctly? Also, is there a documentation that explains (with hopefully,
> >some visual aid) how LSO and smallest offset for open transaction.
> >Confluent or Apache Kafka any link is fine.
> >
> >I would really appreciate some explanation as I believe this could be
> >explained a bit better in official website.
> >
> >Regards,
>


Consumer endOffsets and seek making poll run forever

2020-09-04 Thread M. Manna
Hello,

During some tests, I can see that consumer.endOffsets() returns some valid
value, but cannot be polled definitely if seek() is called on that
consumer. In other words, poll hangs almost forever.

My intention is to start consuming from the last successfully written data
on that TopicPartition. I am using this model because that data is written
by a different system and is required to send a separate command somewhere
else.


Here is my code

// Prepare
List<String> topics = new ArrayList<>();
topics.add(topicString);

KafkaConsumer<String, byte[]> consumer = new
        KafkaConsumer<>(prepareSimpleProperties());
TopicPartition topicPartition = new TopicPartition(topicString, 0);

// ASSIGN
Set<TopicPartition> assignments = consumer.assignment();
if (assignments == null || !assignments.isEmpty() ||
        !assignments.contains(topicPartition)) {
    consumer.assign(Collections.singleton(topicPartition));
}

// SEEK
Map<TopicPartition, Long> topicPartitionOffsets =
        consumer.endOffsets(Collections.singleton(topicPartition));
if (topicPartitionOffsets != null) {
    for (Map.Entry<TopicPartition, Long> data :
            topicPartitionOffsets.entrySet()) {
        Long desiredOffset = data.getValue() - 100;
        consumer.seek(data.getKey(), desiredOffset < 0L ? 0L : desiredOffset);
    }
}

// POLL
try {
    while (true) {
        ConsumerRecords<String, byte[]> records =
                consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, byte[]>> partitionRecords =
                    records.records(partition);
            for (ConsumerRecord<String, byte[]> record : partitionRecords) {
                System.out.println(record.offset() + ": " + new String(record.value()));
            }
            long lastOffset =
                    partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync();
        }
    }
} finally {
    consumer.close();
}

When I read the following from seek docs:

*If this API is invoked for the same partition more than once, the latest
> offset will be used on the next poll(). Note that you may lose data if this
> API is arbitrarily used in the middle of consumption, to reset the fetch
> offsets.*


I am not sure I understood what it means by "Latest offset" and "more than
once". What's the best way to get this offset calculation to work
correctly? Also, is there any documentation that explains (hopefully with
some visual aid) how the LSO relates to the smallest offset of an open
transaction? A Confluent or Apache Kafka link, either is fine.

I would really appreciate some explanation, as I believe this could be
explained a bit better on the official website.

Regards,


Topic Partition Admin operation

2020-08-25 Thread M. Manna
Hello,

I tried to find this information, but maybe I searched for the wrong stuff.

I am trying to identify what's the last message written on a
TopicPartition. My constraints are:

1) No Knowledge of the last offset - so I cannot use seek(TopicPartition,
long)
2) I have to retrieve the last-written data in that TopicPartition to
determine some checkpoint information.

Currently, if I do a subscribe() and poll() I get all records for all
TopicPartitions. But I would like to find out only the last item.

My issue is that I need to do this only once every time my producer starts
up. I need to retrieve only the last message written on a TopicPartition
without going through all records. AFAIK, there is no mechanism in Kafka
that does exactly that.

Could anyone please help me understand?

Regards,


Re: Kafka compatibility with ZK

2020-07-23 Thread M. Manna
Hi,

AFAIK, ZK is packaged with Kafka. So if you upgrade to 2.4.1 you’ll get what
is in 2.4.1.

It’s a little different however, if you’re hosting ZK in a different host
running independently of Kafka.

What’s your situation ?



On Thu, 23 Jul 2020 at 21:02, Andrey Klochkov  wrote:

> Hello,
> We're upgrading our Kafka from 1.1.0 to 2.4.1 and I'm wondering if ZK needs
> to be upgraded too (we're currently on 3.4.6). The upgrade guide says that
> "kafka has switched to the XXX version of ZK" but never says if switching
> to a newer ZK is mandatory or not. What are the guidelines on keeping Kafka
> and ZK compatible?
>
> --
> Andrey Klochkov
>


Re: User License Agreement - Kafka v.2.4.0

2020-07-01 Thread M. Manna
Hello,

Apache Kafka has both the Apache 2.0 licence (the fine print at the bottom of
the Kafka website) and the Confluent licence (from Confluent.io).

Depending on what you use, you can get it from the appropriate site.

I hope this helps ?

Regards,

On Wed, 1 Jul 2020 at 18:10, Theresa Sowinski <
reese.sowin...@burgessgroup.com> wrote:

> May I please get a copy of the user license?  I am trying to determine
> whether your license is permissive or restrictive.
>
>
> Thank you,
> Reese
>
> Theresa 'Reese' Sowinski
> Contracts Manager
> Burgess
> 800-637-2004
> burgessgroup.com
> Note: The information, including any attachments, contained in this
> message is confidential and intended only for use by the individual(s) or
> entity(s) to which it is addressed. If you are not an intended recipient or
> if you have received this message in error, please promptly delete this
> message. Any dissemination, distribution, or copying of this information is
> strictly prohibited. Burgess, The B logo, Burgess Source, and Payment
> Accountability are trademarks of The Burgess Group, LLC and are registered
> with the U.S. Patent and Trademark Office.
>
>


Re: Broker side partition round robin

2020-05-26 Thread M. Manna
Hey Vinicius,


On Tue, 26 May 2020 at 10:27, Vinicius Scheidegger <
vinicius.scheideg...@gmail.com> wrote:

> In a scenario with multiple independent producers (imagine ephemeral
> dockers, that do not know the state of each other), what should be the
> approach for the messages being sent to be equally distributed over a topic
> partition?
>
> From what I understood the partition election is always on the Producer. Is
> this understanding correct?
>
> If that's the case, how should one achieve an equally distributed load
> balancing (round robin) over the partitions in a scenario with multiple
> producers?
>
> Thank you,
>
> Vinicius Scheidegger


 Have you checked RoundRobinPartitioner? Also, you can always specify
which partition you are writing to, so you can control the partitioning
your own way.
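
Two hedged sketches of what I mean (bootstrap address, topic name and
partition number are placeholders; RoundRobinPartitioner assumes a reasonably
recent client):

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class PartitioningSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");      // placeholder
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    // Option 1: spread records round-robin across all partitions of the topic
    props.put("partitioner.class",
        "org.apache.kafka.clients.producer.RoundRobinPartitioner");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // Option 2: or pick the partition explicitly per record
      int partition = 2;                                    // chosen by your own logic
      producer.send(new ProducerRecord<>("my-topic", partition, null, "payload"));
    }
  }
}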

Regards,


>
>


Re: Is that a bug?

2020-05-22 Thread M. Manna
Hey Xie,

On Fri, 22 May 2020 at 08:31, Jiamei Xie  wrote:

> Hi
>
> Kill all zookeeper and kafka process. Clear zookeeper and kafka data dir.
> Restart zookeeper and kafka. If there are any active client.  Topic used by
> client will be auto-created.
>
> How to reproduce?
>
>
>   1.  Start zookeeper and kafka zookeeper and kafka config file.
>
> nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
>
> nohup bin/kafka-server-start.sh config/server.properties &
>
>
>
>   1.  Create topic test with 2 partitions
>
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test
> --partitions 2 --replication-factor 1
>
>
>
>   1.  Produce some data to topic test
>
> bin/kafka-producer-perf-test.sh --topic test --num-records 5000
> --record-size 100 --throughput=-1 --producer-props
> bootstrap.servers=localhost:9092
>
>
>
>   1.  Kill zookeeper and kafka. ProducerPerformance is still running.
>
> jps
>
> 21072 QuorumPeerMain
>
> 21704 ProducerPerformance
>
> 21230 Kafka
>
> 21854 Jps
>
> kill -9 21072 21230
>
>
>
>   1.  Remove Zookeeper and Kafka data
>
> rm -rf /tmp/zookeeper/
>
> rm -rf /tmp/kafka-logs/
>
>
>
>   1.  Start zookeeper and kafka
>
> nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
>
> nohup bin/kafka-server-start.sh config/server.properties &
>
>
>
>   1.  Check topic and you'll see there is topic named test with partition
> 1.  And the ProducerPerformance process continues to run normally.
>
> bin/kafka-topics.sh --describe --zookeeper localhost:2181
>
> Topic: test PartitionCount: 1   ReplicationFactor: 1Configs:
>
> Topic: test Partition: 0Leader: 0   Replicas: 0
>  Isr: 0
>
>
> Some output of ProducerPerformance process.
> 1995632 records sent, 399126.4 records/sec (38.06 MB/sec), 378.6 ms avg
> latency, 435.0 ms max latency.
> org.apache.kafka.common.errors.TimeoutException: Expiring 148 record(s)
> for test-1:12 ms has passed since batch creation
> .
> org.apache.kafka.common.errors.TimeoutException: Expiring 148 record(s)
> for test-1:121774 ms has passed since batch creation
> 1711254 records sent, 342250.8 records/sec (32.64 MB/sec), 2324.5 ms avg
> latency, 123473.0 ms max latency.
>
>
>
> Is that a bug?


  How have you configured your server.properties and consumer.properties?
Have you set auto creation ON?
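
For reference (a sketch, not a recommendation), the broker-side switch is:

# server.properties
auto.create.topics.enable=false   # default is true

With auto-creation on, any client request for the missing topic (your
still-running ProducerPerformance in this case) recreates it using the
broker's num.partitions default, which would explain the topic coming back
with a single partition.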

Regards,

>
>
> Best Wishes,
> Jiamei
>
> IMPORTANT NOTICE: The contents of this email and any attachments are
> confidential and may also be privileged. If you are not the intended
> recipient, please notify the sender immediately and do not disclose the
> contents to any other person, use it for any purpose, or store or copy the
> information in any medium. Thank you.
>


KSQL to search for data in Kafka Topics

2020-05-19 Thread M. Manna
Hello,

I am quite new to KSQL, so apologise for misunderstanding it's concept.

I have a list of topics that I want to search data for. I am not using
stream process, but plain topics which has data retained for 14 days. All i
want to do is search for data in SQL-like way as long as it's within the 14
day window. Could I do that using KSQL, even when I am not using stream
topology?

Regards,


Re: Kafka upgrade from 0.10 to 2.3.x

2020-05-13 Thread M. Manna
I have done this before. What Matthias said below is correct.

First you’ve got to stop all apps to prevent data consumption (if that’s
what you also mean by having downtime)

Then, you can go ahead and replace the bin.

Regards,

On Tue, 12 May 2020 at 18:33, Matthias J. Sax  wrote:

> I guess you can just stop all servers, update the binaries (and
> potentially configs), and afterward restart the servers.
>
> Of course, you might want to stop all applications that connect to the
> cluster first.
>
>
> -Matthias
>
> On 5/11/20 9:50 AM, Praveen wrote:
> > Hi folks,
> >
> > I'd like to take downtime to upgrade to 2.3.x from 10.2.1. But I can't
> find
> > it in the doc for 2.3.x upgrade that I can take downtime to do this. The
> > instructions are for rolling upgrade only.
> >
> > Has anyone tried this?
> >
> > Praveen
> >
>
>


Re: Is there any difference between conflunt kafka and apache kafka?

2020-05-09 Thread M. Manna
I agree with Steve.

Also, it’s worth reading Jay’s PR last year regarding confluent community
licence.

Regards,

On Sat, 9 May 2020 at 16:14, Steven Miller  wrote:

> At the risk of starting a uh-huh-uhnt-uh battle, I would have to disagree.
> There are seriously good people at Confluent, many of whom have been
> pivotal to Kafka’s development. You can decide whether or not Confluent
> makes sense for you for other reasons, but I wouldn’t go for a blanket no,
> by any means.
>
> -Steve
>
> > On May 9, 2020, at 9:20 AM, Mich Talebzadeh 
> wrote:
> >
> > Hi,
> >
> > I would say stick to Apache Kafka.
> >
> > These days every start-up sees some opportunities to grab a piece of
> cake.
> > Often ending up re-inventing the wheel to create an add-on that mimics
> what
> > already exists and charge license fees.
> >
> > One of the most pertinent exception is Apache HBase that has been around
> > for 10 years. It is only supported as part of third-party suits of
> products
> > on Hadoop like Cloudera or Hortonworks. The great Apache community
> supports
> > it and I believe that is the reason it is so successful and widely used.
> >
> > HTH
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn *
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> > <
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >*
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
> > *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> > loss, damage or destruction of data or any other property which may arise
> > from relying on this email's technical content is explicitly disclaimed.
> > The author will in no case be liable for any monetary damages arising
> from
> > such loss, damage or destruction.
> >
> >
> >
> >
> >> On Sat, 9 May 2020 at 09:46, wangl...@geekplus.com.cn <
> >> wangl...@geekplus.com.cn> wrote:
> >>
> >>
> >> I want to know if there's any difference between apache kafka and the
> open
> >> sourced confluent kafka ?
> >>
> >>
> >> Thanks,
> >> Lei
> >>
> >>
> >> wangl...@geekplus.com.cn
> >>
> >>
>


Re: Scaling Apache Kafka Producers & Consumers

2020-03-26 Thread M. Manna
Hey Prasad (#StayAtHomeSaveLives),

On Thu, 26 Mar 2020 at 11:19, Prasad Suhas Shembekar <
ps00516...@techmahindra.com> wrote:

> Hi,
>
> I am using Apache Kafka as a Message Broker in our application. The
> producers and consumers are running as Docker containers in Kubernetes.
> Right now, the producer publishes messages to a topic in single partition.
> While the consumer consumes it from the topic.
> As per my understanding, in Apache Kafka a single consumer from a consumer
> group can consume messages from one partition only. Meaning, if there is
> only a single partition and multiple consumers in a consumer group, only
> one consumer will consume the message and the rest will remain idle, till
> Apache Kafka does the partition rebalancing.
>

 Yes this is correct.


> As mentioned earlier, we have a single topic and single partition and
> multiple consumers in a single group. Thus we won't be able to achieve the
> horizontal scaling for message consumption.
>
> Please let me know if the above understanding is correct.
>

 Yes this is correct.

>
> I am looking out on how to create partitions dynamically in the topic, as
> and when a new consumer is added to consumer group (K8S auto scaling of
> PODS).
> Also, how to make the producer write to these different partitions created
> dynamically, without overloading few partitions.
>
> Request you to provide some inputs / suggestions on how to achieve this.
>
 Before anyone can answer any specific use case-related questions,
perhaps you could read this
https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster/

I believe this could serve as a great pointer and learning experience (it
certainly did for me) before you tackle more precise cases. Feel free to
follow up and share your concerns after this.
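
On the mechanics alone (not a recommendation to auto-scale partitions along
with consumers), the partition count of an existing topic can be increased at
runtime via the Admin client; note it can only grow, and key-to-partition
mapping changes when it does. A sketch with placeholder names:

import org.apache.kafka.clients.admin.*;
import java.util.*;

public class AddPartitionsSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");           // placeholder
    try (AdminClient admin = AdminClient.create(props)) {
      // Grow the topic to 6 partitions (topic name and count are placeholders).
      admin.createPartitions(
          Collections.singletonMap("my-topic", NewPartitions.increaseTo(6)))
          .all().get();
    }
  }
}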


> Thanks & Regards,
> Prasad Shembekar
> Blue Marble
> WST-020, D Non-ODC, Mihan SEZ,
> Nagpur
> Extension: 6272148
> Direct: 0712-6672148
>
> 
> Disclaimer: This message and the information contained herein is
> proprietary and confidential and subject to the Tech Mahindra policy
> statement, you may review the policy at
> http://www.techmahindra.com/Disclaimer.html externally
> http://tim.techmahindra.com/tim/disclaimer.html internally within
> TechMahindra.
> 
>


Re: what happened in case of single disk failure

2020-03-12 Thread M. Manna
Please see the following link from Confluent. Also, if you register with
Confluent Technical Talks, they are running quite a lot of nice and
simplified webinars this month on Fundamentals of Kafka.

https://www.youtube.com/watch?v=ibozaujze9k

I thought the 2 part presentation was quite good (but I don't work for
Confluent :), so a disclaimer in advance).

 There is also an upcoming webinar on how Kafka is integrated in your
application/architecture.

I hope it helps.

Regards,
M. Manna

On Thu, 12 Mar 2020 at 00:51, 张祥  wrote:

> Thanks, very helpful !
>
> Peter Bukowinski  于2020年3月12日周四 上午5:48写道:
>
> > Yes, that’s correct. While a broker is down:
> >
> > all topic partitions assigned to that broker will be under-replicated
> > topic partitions with an unmet minimum ISR count will be offline
> > leadership of partitions meeting the minimum ISR count will move to the
> > next in-sync replica in the replica list
> > if no in-sync replica exists for a topic-partitions, it will be offline
> > Setting unclean.leader.election.enable=true will allow an out-of-sync
> > replica to become a leader.
> > If topic partition availability is more important to you than data
> > integrity, you should allow unclean leader election.
> >
> >
> > > On Mar 11, 2020, at 6:11 AM, 张祥  wrote:
> > >
> > > Hi, Peter, following what we talked about before, I want to understand
> > what
> > > will happen when one broker goes down, I would say it will be very
> > similar
> > > to what happens under disk failure, except that the rules apply to all
> > the
> > > partitions on that broker instead of only one malfunctioned disk. Am I
> > > right? Thanks.
> > >
> > > 张祥  于2020年3月5日周四 上午9:25写道:
> > >
> > >> Thanks Peter, really appreciate it.
> > >>
> > >> Peter Bukowinski  于2020年3月4日周三 下午11:50写道:
> > >>
> > >>> Yes, you should restart the broker. I don’t believe there’s any code
> to
> > >>> check if a Log directory previously marked as failed has returned to
> > >>> healthy.
> > >>>
> > >>> I always restart the broker after a hardware repair. I treat broker
> > >>> restarts as a normal, non-disruptive operation in my clusters. I use
> a
> > >>> minimum of 3x replication.
> > >>>
> > >>> -- Peter (from phone)
> > >>>
> > >>>> On Mar 4, 2020, at 12:46 AM, 张祥  wrote:
> > >>>>
> > >>>> Another question, according to my memory, the broker needs to be
> > >>> restarted
> > >>>> after replacing disk to recover this. Is that correct? If so, I take
> > >>> that
> > >>>> Kafka cannot know by itself that the disk has been replaced,
> manually
> > >>>> restart is necessary.
> > >>>>
> > >>>> 张祥  于2020年3月4日周三 下午2:48写道:
> > >>>>
> > >>>>> Thanks Peter, it makes a lot of sense.
> > >>>>>
> > >>>>> Peter Bukowinski  于2020年3月3日周二 上午11:56写道:
> > >>>>>
> > >>>>>> Whether your brokers have a single data directory or multiple data
> > >>>>>> directories on separate disks, when a disk fails, the topic
> > partitions
> > >>>>>> located on that disk become unavailable. What happens next depends
> > on
> > >>> how
> > >>>>>> your cluster and topics are configured.
> > >>>>>>
> > >>>>>> If the topics on the affected broker have replicas and the minimum
> > ISR
> > >>>>>> (in-sync replicas) count is met, then all topic partitions will
> > remain
> > >>>>>> online and leaders will move to another broker. Producers and
> > >>> consumers
> > >>>>>> will continue to operate as usual.
> > >>>>>>
> > >>>>>> If the topics don’t have replicas or the minimum ISR count is not
> > met,
> > >>>>>> then the topic partitions on the failed disk will be offline.
> > >>> Producers can
> > >>>>>> still send data to the affected topics — it will just go to the
> > online
> > >>>>>> partitions. Consumers can still consume data from the online
> > >>> partitions.
> > >>>>>>
> > >>>>>> -- Peter
> > >>>>>>
> > >>>>>>>> On Mar 2, 2020, at 7:00 PM, 张祥 
> wrote:
> > >>>>>>>>
> > >>>>>>>> Hi community,
> > >>>>>>>>
> > >>>>>>>> I ran into disk failure when using Kafka, and fortunately it did
> > not
> > >>>>>> crash
> > >>>>>>> the entire cluster. So I am wondering how Kafka handles multiple
> > >>> disks
> > >>>>>> and
> > >>>>>>> it manages to work in case of single disk failure. The more
> > detailed,
> > >>>>>> the
> > >>>>>>> better. Thanks !
> > >>>>>>
> > >>>>>
> > >>>
> > >>
> >
> >
>


Re: Problems when Consuming from multiple Partitions

2020-03-05 Thread M. Manna
Hi James,

3 consumers in a group means you have 20 partitions per consumer (as per
your 60-partition, single-consumer-group setup); with 5 consumers it is 12.
There's nothing special about these numbers, as you also noticed.
Have you tried setting fetch.max.wait.ms=0 to see whether that makes
a difference for you?
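
That is, something along these lines on the consumer side (a sketch only;
fetch.min.bytes=1 is already the default):

fetch.max.wait.ms=0   # don't let the broker hold the fetch open
fetch.min.bytes=1     # return as soon as any data is available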

Thanks,


On Thu, 5 Mar 2020 at 03:43, James Olsen  wrote:

> I’m seeing behaviour that I don’t understand when I have Consumers
> fetching from multiple Partitions from the same Topic.  There are two
> different conditions arising:
>
> 1. A subset of the Partitions allocated to a given Consumer not being
> consumed at all.  The Consumer appears healthy, the Thread is running and
> logging activity and is successfully processing records from some of the
> Partitions it has been assigned.  I don’t think this is due to the first
> Partition fetched filling a Batch (KIP-387).  The problem does not occur if
> we have a particular number of Consumers (3 in this case) but it has failed
> with a range of other larger values.  I don’t think there is anything
> special about 3 - it just happens to work OK with that value although it is
> the same as the Broker and Replica count.  When we tried 6, 5 Consumers
> were fine but 1 exhibited this issue.
>
> 2. Up to a half second delay between Producer sending and Consumer
> receiving a message.  This looks suspiciously like the fetch.max.wait.ms=500
> but we also have fetch.min.bytes=1 so should get messages as soon as
> something is available.  The only explanation I can think of is if the
> fetch.max.wait.ms is applied in full to the first Partition checked and
> it remains empty for the duration.  Then it moves on to a subsequent
> non-empty Partition and delivers messages from there.
>
> Our environment is AWS MSK (Kafka 2.2.1) and Kafka Java client 2.4.0.
>
> All environments appear healthy and under light load, e.g. clients only
> operating at a 1-2% CPU, Brokers (3) at 5-10% CPU.   No swap, no crashes,
> no dead threads etc.
>
> Typical scenario is a Topic with 60 Partitions, 3 Replicas and a single
> ConsumerGroup with 5 Consumers.  The Partitioning is for semantic purposes
> with the intention being to add more Consumers as the business grows and
> load increases.  Some of the Partitions are always empty due to using short
> string keys and the default Partitioner - we will probably implement a
> custom Partitioner to achieve better distribution in the near future.
>
> I don’t have access to the detailed JMX metrics yet but am working on that
> in the hope it will help diagnose.
>
> Thoughts and advice appreciated!


Re: Kafka clustering issue

2020-02-13 Thread M. Manna
Hi,

On Thu, 13 Feb 2020 at 12:43, Chikulal C  wrote:

>
>1. Turned off node1 and node 2 (expected vs. actual)
>1. expected: Message publish failure with following warnings ( in
>   producer )
>   1. Connection to node 0 could not be established. Broker may not be
>  available.
>  Connection to node 1 could not be established. Broker may not be
>  available.
>  2. actual: The same
>   2. Turned on node 1
>1. expected: No warnings and should  publish data to topic
>   2. actual: The same
>
>  At this stage, what can you see regarding your topic partitions - who is
leader/follower?

>
>1.
>1. Turned off node 1
>   1. expected: Message failure with following warnings
>  1. Connection to node 0 could not be established. Broker may not
>  be available.
>  Connection to node 1 could not be established. Broker may not be
>  available.
>  2. actual: Message failure with only one type of warning ( It
>   should warn me that both nodes are down )
>   1. Connection to node 0 could not be established. Broker may not be
>  available.
>
>  Why do you think this is wrong? You have turned off the same node that
you turned back on? Or, am I missing something here?

>
>1.
>   1. Turned on node 2
>1. expected: No warnings and should  publish data to topic
>   2. actual: Message failure with only one type of warning
>  1. Connection to node 0 could not be established. Broker may not
>  be available.
>
>
>  See my previous comments. I think you may have either misconfigured one
of your tests, or have not explained the steps correctly. See my
highlighted text above

>
>1.
>
>
> Here when you turn on node 2 in step 4, I would like to have my cluster
> up, since one of the broker is up. But it is not happening.
> --
> *From:* M. Manna 
> *Sent:* 13 February 2020 17:55
> *To:* Chikulal C 
> *Cc:* Kafka Users 
> *Subject:* Re: Kafka clustering issue
>
> My apologies as I misread one of the steps you mentioned in your original
> email.
>
> Could you kindly mention what you are seeing as per your order of failover
> tests?
>
>   1.   Turned off node1 and node 2 (expected vs. actual)
>   2.   Turned on node 1 (expected vs actual)
>   3.   Turned off node 1 (expected vs actual)
>   4.   Turned on node 2  (expected vs actual)
>
>
> Thanks,
>
> On Thu, 13 Feb 2020 at 12:06, Chikulal C  wrote:
>
> Hi,
>
>
> I tried setting transaction.state.log.min.isr=1. But the issue still
> exists.
>
>
> I am also getting one warning after doing step 3 (with
> transaction.state.log.min.isr=1) and producing some data on the topic as
> given below.
>
>
> [Producer clientId=producer-1] 2 partitions have leader brokers without a
> matching listener, including [topic2-0, topic2-1]
>
>
> But I was not facing this issue when transaction.state.log.min.isr was 2.
> This warning also leads to failure from the producer side to put data on
> the topic.
>
>
> Are there any other things I have to check?
>
>
> Thanks
>
>
> --
> *From:* M. Manna 
> *Sent:* 13 February 2020 16:35
> *To:* Kafka Users 
> *Subject:* Re: Kafka clustering issue
>
> This could be because you have set your transaction.state.log.min.isr=2. Have
> you tried with setting this to 1?
>
> Also, please note that if your min.insync.replicas=1, and you only have 2
> nodes, you would only have a guarantee from 1 broker to have the messages
> - but if the same broker fails then you may see issues.
>
> On Thu, 13 Feb 2020 at 10:40, Chikulal C 
> wrote:
>
> > Hi,
> >
> > I am facing an issue with the Kafka clustering setup that I have. I have
> a
> > Kafka cluster with two broker that are connected to two zookeepers. I am
> > posting data to a topic that have replication factor and partition two
> each
> > with a spring boot Kafka producer and consuming the same with another
> > spring boot app.
> >
> > I found one strange behavior when testing the cluster in the following
> > manner -
> >
> >   1.   Turned off node1 and node 2
> >   2.   Turned on node 1
> >   3.   Turned off node 1
> >   4.   Turned on node 2
> >
> > After turning on node 2 Kafka cluster got failed and I am not able to
> > produce data to Kafka. My consumer started throwing the message
> > continuously as given below.
> >
> >  [Producer clientId=producer-1] Connection to node 1 (/server1-ip:9092)
> > could not be established. Broker may not be a

Re: Kafka clustering issue

2020-02-13 Thread M. Manna
My apologies as I misread one of the steps you mentioned in your original
email.

Could you kindly mention what you are seeing as per your order of failover
tests?

  1.   Turned off node1 and node 2 (expected vs. actual)
  2.   Turned on node 1 (expected vs actual)
  3.   Turned off node 1 (expected vs actual)
  4.   Turned on node 2  (expected vs actual)


Thanks,

On Thu, 13 Feb 2020 at 12:06, Chikulal C  wrote:

> Hi,
>
>
> I tried setting transaction.state.log.min.isr=1. But the issue still
> exists.
>
>
> I am also getting one warning after doing step 3 (with
> transaction.state.log.min.isr=1) and producing some data on the topic as
> given below.
>
>
> [Producer clientId=producer-1] 2 partitions have leader brokers without a
> matching listener, including [topic2-0, topic2-1]
>
>
> But I was not facing this issue when transaction.state.log.min.isr was 2.
> This warning also leads to failure from the producer side to put data on
> the topic.
>
>
> Are there any other things I have to check?
>
>
> Thanks
>
>
> --
> *From:* M. Manna 
> *Sent:* 13 February 2020 16:35
> *To:* Kafka Users 
> *Subject:* Re: Kafka clustering issue
>
> This could be because you have set your transaction.state.log.min.isr=2. Have
> you tried with setting this to 1?
>
> Also, please note that if your min.insync.replicas=1, and you only have 2
> nodes, you would only have a guarantee from 1 broker to have the messages
> - but if the same broker fails then you may see issues.
>
> On Thu, 13 Feb 2020 at 10:40, Chikulal C 
> wrote:
>
> > Hi,
> >
> > I am facing an issue with the Kafka clustering setup that I have. I have
> a
> > Kafka cluster with two broker that are connected to two zookeepers. I am
> > posting data to a topic that have replication factor and partition two
> each
> > with a spring boot Kafka producer and consuming the same with another
> > spring boot app.
> >
> > I found one strange behavior when testing the cluster in the following
> > manner -
> >
> >   1.   Turned off node1 and node 2
> >   2.   Turned on node 1
> >   3.   Turned off node 1
> >   4.   Turned on node 2
> >
> > After turning on node 2 Kafka cluster got failed and I am not able to
> > produce data to Kafka. My consumer started throwing the message
> > continuously as given below.
> >
> >  [Producer clientId=producer-1] Connection to node 1 (/server1-ip:9092)
> > could not be established. Broker may not be available.
> >
> > Issue is visible in both nodes. But if I kept both system up for a while
> > issue will get resolved and I can turn off any of the node without
> breaking
> > the cluster.
> > My broker configuration is as below.
> >
> > broker.id=0
> > listeners=PLAINTEXT://server1-ip:9092
> > advertised.listeners=PLAINTEXT://serever1-ip:9092
> > num.network.threads=3
> > num.io.threads=8
> > socket.send.buffer.bytes=102400
> > socket.receive.buffer.bytes=102400
> > socket.request.max.bytes=104857600
> > log.dirs=/home/user/kafka/data/kafka-logs
> > num.partitions=1
> > num.recovery.threads.per.data.dir=2
> > offsets.topic.replication.factor=2
> > transaction.state.log.replication.factor=2
> > transaction.state.log.min.isr=2
> > log.retention.hours=168
> > log.segment.bytes=1073741824
> > log.retention.check.interval.ms=30
> > zookeeper.connect=serever1-ip:2181,serever2-ip:2181
> > zookeeper.connection.timeout.ms=6000
> > group.initial.rebalance.delay.ms=3000
> > auto.leader.rebalance.enable=true
> > leader.imbalance.check.interval.seconds=5
> >
> > Zookeeper configuration
> >
> > dataDir=/home/user/kafka/data
> > clientPort=2181
> > maxClientCnxns=0
> > initLimit=10
> > syncLimit=5
> > tickTime=2000
> > server.1=server1-ip:2888:3888
> > server.2=server2-ip:2888:3888
> >
> > Is this is an expected behavior of Kafka or am I doing something wrong
> > with this configuration ?
> >
> > Can somebody help me with this issue ..
> >
> > Thanks in advance.
> >
> >
> >
>


Re: Kafka clustering issue

2020-02-13 Thread M. Manna
This could be because you have set your transaction.state.log.min.isr=2. Have
you tried with setting this to 1?

Also, please note that if your min.insync.replicas=1, and you only have 2
nodes, you would only have a guarantee from 1 broker to have the messages
- but if the same broker fails then you may see issues.
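
For a two-broker cluster, a hedged sketch of the relevant settings (to be
weighed against your durability requirements, not a recommendation):

offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=1
min.insync.replicas=1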

On Thu, 13 Feb 2020 at 10:40, Chikulal C 
wrote:

> Hi,
>
> I am facing an issue with the Kafka clustering setup that I have. I have a
> Kafka cluster with two broker that are connected to two zookeepers. I am
> posting data to a topic that have replication factor and partition two each
> with a spring boot Kafka producer and consuming the same with another
> spring boot app.
>
> I found one strange behavior when testing the cluster in the following
> manner -
>
>   1.   Turned off node1 and node 2
>   2.   Turned on node 1
>   3.   Turned off node 1
>   4.   Turned on node 2
>
> After turning on node 2 Kafka cluster got failed and I am not able to
> produce data to Kafka. My consumer started throwing the message
> continuously as given below.
>
>  [Producer clientId=producer-1] Connection to node 1 (/server1-ip:9092)
> could not be established. Broker may not be available.
>
> Issue is visible in both nodes. But if I kept both system up for a while
> issue will get resolved and I can turn off any of the node without breaking
> the cluster.
> My broker configuration is as below.
>
> broker.id=0
> listeners=PLAINTEXT://server1-ip:9092
> advertised.listeners=PLAINTEXT://serever1-ip:9092
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/home/user/kafka/data/kafka-logs
> num.partitions=1
> num.recovery.threads.per.data.dir=2
> offsets.topic.replication.factor=2
> transaction.state.log.replication.factor=2
> transaction.state.log.min.isr=2
> log.retention.hours=168
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=30
> zookeeper.connect=serever1-ip:2181,serever2-ip:2181
> zookeeper.connection.timeout.ms=6000
> group.initial.rebalance.delay.ms=3000
> auto.leader.rebalance.enable=true
> leader.imbalance.check.interval.seconds=5
>
> Zookeeper configuration
>
> dataDir=/home/user/kafka/data
> clientPort=2181
> maxClientCnxns=0
> initLimit=10
> syncLimit=5
> tickTime=2000
> server.1=server1-ip:2888:3888
> server.2=server2-ip:2888:3888
>
> Is this is an expected behavior of Kafka or am I doing something wrong
> with this configuration ?
>
> Can somebody help me with this issue ..
>
> Thanks in advance.
>
>
>


Re: Zookeeper uses TCP 8080 from 2.4.0

2020-02-06 Thread M. Manna
Apologies, but we think we've found the information here (the Jetty-based
admin server):

http://kafka.apache.org/documentation/#Additional+Upgrade+Notes
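
For anyone hitting the same clash: the bundled ZooKeeper's Jetty-based
AdminServer can be disabled or moved via zookeeper.properties (a sketch;
either option should do):

admin.enableServer=false
# or keep it on another port:
# admin.serverPort=9090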

Sorry for spamming.

Regards,




On Thu, 6 Feb 2020 at 12:31, M. Manna  wrote:

> Hey all,
>
> We have a test ecosystem which uses two app instances with port 80 and
> 8080 engaged. Since, Kafka 2.4.0 we are always having issues since
> Zookeeper is using port 8080.
>
> We are not sure why this is the case since port 8080 usage was never there
> before 2.4.0. Before we dig into the code, could someone please confirm
> whether this is intentional?
>
> Regards,
>


Zookeeper uses TCP 8080 from 2.4.0

2020-02-06 Thread M. Manna
Hey all,

We have a test ecosystem which uses two app instances with port 80 and 8080
engaged. Since, Kafka 2.4.0 we are always having issues since Zookeeper is
using port 8080.

We are not sure why this is the case since port 8080 usage was never there
before 2.4.0. Before we dig into the code, could someone please confirm
whether this is intentional?

Regards,


Eclipse Scala IDE with MacOS (Catalina) and OpenJDK11

2020-01-31 Thread M. Manna
Hey Dev Group,

Apologies if this was meant to be sent for dev group, so I thought I would
put it out here.

It seems that I cannot get Eclipse Scala to start with MacOS (Catalina). I
must be honest, that I haven't tried it on Mac before. Additionally, I am
using OpenJDK 11. I have put it inside the /Library/Java/OpenJDK/
subdirectory and confirmed that Java Path and executables are discovered
correctly.

 Any pointers from folks out there who are using Eclipse Scala IDE with Mac
and OpenJDK 11 would be appreciated. If I get it it to work, I would
probably note this down on Cwiki for all future comms.

Regards,
M. Manna


Re: Kafka consumers freeze

2020-01-31 Thread M. Manna
Hey Tim

On Fri, 31 Jan 2020 at 13:06, Sullivan, Tim 
wrote:

>
>
> Is there a way I can proactively check my consumers to see if
> they are consuming? Periodically some or all of my consumers stop
> consuming. The only way I am made aware of this is when my down stream
> feeds folks alert me that their data isn’t flowing into Kafka. My normal
> solution is to bump the kafka servers and then they begin to consume.
>
>
>
> Any help will be greatly appreciated.
>

Does using Burrow help you?

Also, using cruise-control you could dynamically rebalance the workload if
some consumers are not in a steady state.
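
If you want something quick and scriptable in the meantime, a small lag probe with the
Java AdminClient is easy to put together. A rough sketch (assuming the Java client; the
bootstrap address and group id are placeholders):

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LagCheck {
  public static void main(String[] args) throws Exception {
    String bootstrap = "localhost:9092";  // placeholder
    String group = "my-consumer-group";   // placeholder

    Properties adminProps = new Properties();
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);

    try (AdminClient admin = AdminClient.create(adminProps)) {
      // offsets the group has committed, per partition
      Map<TopicPartition, OffsetAndMetadata> committed =
          admin.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get();

      Properties consProps = new Properties();
      consProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
      consProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
      consProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

      try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consProps)) {
        // latest offsets for the same partitions
        Map<TopicPartition, Long> end = consumer.endOffsets(committed.keySet());
        committed.forEach((tp, om) ->
            System.out.printf("%s lag=%d%n", tp, end.get(tp) - om.offset()));
      }
    }
  }
}

A group whose lag keeps growing (or that stops committing altogether) is the one to look
at before the downstream folks notice.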

Regards,


>
>
>
> Tim Sullivan
>
> Be Well
>
>
>
> Sr. Software Engineer
>
> Supply Chain – IT | Data & Analytics
>
> The Home Depot
>
> 2250 Newmarket Parkway | Atlanta, GA 30339
>
>
>
> Work Cell 470-455-8346
>
> Personal Cell 678-525-2583
>
> Home Phone:  770-945-3315
>
>
>
> [image: signature_1339376142]
>
>
>
>
>
> --
>
> The information in this Internet Email is confidential and may be legally
> privileged. It is intended solely for the addressee. Access to this Email
> by anyone else is unauthorized. If you are not the intended recipient, any
> disclosure, copying, distribution or any action taken or omitted to be
> taken in reliance on it, is prohibited and may be unlawful. When addressed
> to our clients any opinions or advice contained in this Email are subject
> to the terms and conditions expressed in any applicable governing The Home
> Depot terms of business or client engagement letter. The Home Depot
> disclaims all responsibility and liability for the accuracy and content of
> this attachment and for any damages or losses arising from any
> inaccuracies, errors, viruses, e.g., worms, trojan horses, etc., or other
> items of a destructive nature, which may be contained in this attachment
> and shall not be liable for direct, indirect, consequential or special
> damages in connection with this e-mail message or its attachment.
>


Re: How to change/increase ISR

2020-01-30 Thread M. Manna
Hey Upendra,

https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools

The above should guide you through the reassignment of partitions/replicas.

Also, you should read about

offsets.topic.num.partitions
offsets.topic.replication.factor

I hope this helps you.
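
For __consumer_offsets specifically, the usual route (including on 0.10.x) is a manual
reassignment that simply lists more replicas per partition. A minimal sketch - the
broker ids and the single partition below are placeholders, and since the offsets topic
has 50 partitions by default the real JSON needs one entry per partition:

{"version":1,
 "partitions":[
   {"topic":"__consumer_offsets","partition":0,"replicas":[1,2]}
 ]}

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file increase-offsets-rf.json --execute

Keep in mind this raises the replication factor; min.insync.replicas, if you also want
that guarantee, is a separate topic/broker config.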

Regards,



On Thu, 30 Jan 2020 at 21:48, Upendra Yadav  wrote:

> Hi Team,
>
> Is there way to change ISR for existing topics.
> I want this for user topics as well as for __consumer_offset topic.
>
> By mistake, __consumer_offset topic was configured with 1 replication
> factor and 1 ISR.
>
> kafka broker and client version: 0.10.0.1
>
> Thanks,
> Upendra
>


Re: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

2020-01-27 Thread M. Manna
Hey Buks,

On Mon, 27 Jan 2020 at 07:51,  wrote:

>
>
> Hi, I would appreciate any help on this.
>
> Thanks a stack!
>
> Buks
>
>
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>${kafka.version}</version>
> </dependency>
>
> <kafka.version>2.3.1</kafka.version>
>
> <parent>
>   <groupId>org.springframework.boot</groupId>
>   <artifactId>spring-boot-starter-parent</artifactId>
>   <version>2.2.4.RELEASE</version>
> </parent>
>
>
>
>
>
>
> Sent from Mail  for
> Windows 10
>
>
>

Seems like you’re using Spring Boot with Kafka. Did you manage to start in
debug mode to see where the hiccup is? Also, is that the only stack trace
info?

>


Re: min.insync.replicas and producer acks

2020-01-25 Thread M. Manna
Pushkar,

On Sat, 25 Jan 2020 at 11:19, Pushkar Deole  wrote:

> Thank you for a quick response.
>
> What would happen if I set the producer acks to be 'one' and
> min.insync.replicas to 2. In this case the producer will return when only
> leader received the message but will not wait for other replicas to receive
> the message. In this case, how min.insync.replicas of 2 will be guaranteed
> by kafka?



To be fair, this is documented across various areas on official channels,
Confluence pages, and Confluent's website.

We suggest that you explore them first to understand where the confusion is.

If you believe something is incorrectly documented, or rather ambiguous, we
will be happy to explain.

Regards,


> On Sat, Jan 25, 2020 at 12:50 PM Boyang Chen 
> wrote:
>
> > Hey Pushkar,
> >
> > producer ack only has 3 options: none, one, or all. You could not
> nominate
> > an arbitrary number.
> >
> > On Fri, Jan 24, 2020 at 7:53 PM Pushkar Deole 
> > wrote:
> >
> > > Hi All,
> > >
> > > I am a bit confused about min.insync.replicas and producer acks. Are
> > these
> > > two configurations achieve the same thing? e.g. if I set
> > > min.insync.replicas to 2, I can also achieve it by setting producer
> acks
> > to
> > > 2 so the producer won't get a ack until 2 replicas received the
> message?
> > >
> >
>


Re: User interface for kafka: Management/administration

2020-01-23 Thread M. Manna
Thanks Robin - looks nice!

On Thu, 23 Jan 2020 at 09:36, Robin Moffatt  wrote:

> There's a good presentation from Stephane Maarek that covers tooling,
> including UIs:
>
> https://www.confluent.io/kafka-summit-lon19/show-me-kafka-tools-increase-productivity/
>
> You'll find some projects that need to be built also ship with Docker
> images that you can then just run.
>
> Here's a useful list of projects to look at:
> https://dev.to/guyshiloizy/kafka-administration-and-monitoring-ui-tools-hf4
>
> --
>
> Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
>
>
> On Thu, 23 Jan 2020 at 04:10, Sunil CHAUDHARI
>  wrote:
>
> > Hi all,
> > Please help me to get some user interface for management and
> > administration of my kafka cluster.
> > There are some open source available, but they either have some
> > dependencies or those need to be built before running.
> > Is there any pre-build(ready to use package) which I can just download
> and
> > run?
> > Our environment have many restrictions, so its difficult to
> > download/install dependencies.
> >
> > I hope you guys understood my problem.
> >
> > Regards,
> > Sunil.
> >
> > CONFIDENTIAL NOTE:
> > The information contained in this email is intended only for the use of
> > the individual or entity named above and may contain information that is
> > privileged, confidential and exempt from disclosure under applicable law.
> > If the reader of this message is not the intended recipient, you are
> hereby
> > notified that any dissemination, distribution or copying of this
> > communication is strictly prohibited. If you have received this message
> in
> > error, please immediately notify the sender and delete the mail. Thank
> you.
> >
>


Re: User interface for kafka: Management/administration

2020-01-23 Thread M. Manna
It depends.

Kafka-webview is excellent for managing messages etc. I use it for our
preprod monitoring.

LinkedIn Cruise Control is the de facto tool for managing performance-related
thresholds (goals).

LinkedIn burrow is good for consumer lag monitoring.

And all the above are free.

Regards,

On Thu, 23 Jan 2020 at 08:38, Antonios Chalkiopoulos 
wrote:

> Have you tried out https://lenses.io  ?
>
> Contains both management/admin capabilities but also monitoring/security
> and SQLOps
> and is available as a single download
>
> Here’s a quick video on how it looks 
>
>
> > On 23 Jan 2020, at 06:17, Ashutosh singh  wrote:
> >
> > You can use Yahoo kafka-manager . Download and Run.
> > https://github.com/yahoo/CMAK
> >
> > On Thu, Jan 23, 2020 at 11:02 AM Stephen Powis
> >  wrote:
> >
> >> Definitely more focused on the consuming side vs management, but has
> some
> >> management tooling and has a binary distribution:
> >> https://github.com/sourcelaborg/kafka-webview
> >>
> >> On Thu, Jan 23, 2020 at 1:10 PM Sunil CHAUDHARI
> >>  wrote:
> >>
> >>> Hi all,
> >>> Please help me to get some user interface for management and
> >>> administration of my kafka cluster.
> >>> There are some open source available, but they either have some
> >>> dependencies or those need to be built before running.
> >>> Is there any pre-build(ready to use package) which I can just download
> >> and
> >>> run?
> >>> Our environment have many restrictions, so its difficult to
> >>> download/install dependencies.
> >>>
> >>> I hope you guys understood my problem.
> >>>
> >>> Regards,
> >>> Sunil.
> >>>
> >>> CONFIDENTIAL NOTE:
> >>> The information contained in this email is intended only for the use of
> >>> the individual or entity named above and may contain information that
> is
> >>> privileged, confidential and exempt from disclosure under applicable
> law.
> >>> If the reader of this message is not the intended recipient, you are
> >> hereby
> >>> notified that any dissemination, distribution or copying of this
> >>> communication is strictly prohibited. If you have received this message
> >> in
> >>> error, please immediately notify the sender and delete the mail. Thank
> >> you.
> >>>
> >>
> >
> >
> > --
> > Thanx & Regard
> > Ashutosh Singh
> > 08151945559
>
>


Kafka Debug Instructions Updated on Cwiki

2020-01-20 Thread M. Manna
Hey all,

I meant to do this a while back, so apologies for the delay.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=145722808

The above has working instructions on how to start the debugger in Eclipse
Scala. I haven't proofread the text, but the solution works correctly. If
anyone wants to extend it by adding IntelliJ or other variants, please feel
free to do so.
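
For anyone who just wants the one-liner: if I remember correctly, kafka-run-class.sh
already honours KAFKA_DEBUG and JAVA_DEBUG_PORT, so something along these lines should
open a remote-debug port on the broker (5005 is an arbitrary choice):

KAFKA_DEBUG=y JAVA_DEBUG_PORT=5005 bin/kafka-server-start.sh config/server.properties

and then attach Eclipse (or IntelliJ) as a remote debug configuration on that port.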

Also, any feedback/comments are welcome.

Regards,


Re: Kafka encoding UTF-8 problem

2020-01-17 Thread M. Manna
Hi Marco,

On Fri, 17 Jan 2020 at 13:08, Marco Di Falco 
wrote:

> Hi.
> I'm sorry, I'm new to kafka and I'm afraid I haven't understood your
> question.
> How do I try a serialization / deserialization if I communicate in two
> windows shell? I have no explicit serialization / deserialization,
> communication is transparent to me.
> I think I have to set a kafka properties for this problem, but I don't
> know what it can be.
> Thank you
>
> -----Original Message-----
> From: M. Manna 
> Sent: Friday, 17 January 2020 13:25
> To: users@kafka.apache.org
> Subject: Re: Kafka encoding UTF-8 problem
>
> Hi,
>
>
> On Fri, 17 Jan 2020 at 11:18, Marco Di Falco 
> wrote:
>
> > Hello guys!
> > I have a producer and consumer running in a windows shell.
> > I write this message ‘questo è un test’ and in consumer receive this:
> > “questo ´┐¢ un test” .
> >
> > What properties should I use to set up character coding in utf-8?
> > thank you
> > Marco
>
>
> Have you tried using UTF-8 bytes when serialising/deserialising ?
>
> >
> >
>

I think if you follow the official Kafka docs, they explain quite nicely how
to set this up.
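
If you end up writing a small client instead of using the console tools, the safest
route is to do the encoding yourself and ship plain bytes - a minimal sketch with the
Java client (topic name and bootstrap address are placeholders); the Windows console
code page is a separate issue from what Kafka actually stores:

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class Utf8ProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder
    props.put("key.serializer", ByteArraySerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());

    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
      // encode explicitly, so the bytes on the wire are unambiguous UTF-8
      byte[] value = "questo è un test".getBytes(StandardCharsets.UTF_8);
      producer.send(new ProducerRecord<>("test-topic", value));
    }
  }
}

On the consuming side, decode with new String(bytes, StandardCharsets.UTF_8).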


Re: Kafka encoding UTF-8 problem

2020-01-17 Thread M. Manna
Hi,


On Fri, 17 Jan 2020 at 11:18, Marco Di Falco 
wrote:

> Hello guys!
> I have a producer and consumer running in a windows shell.
> I write this message ‘questo è un test’ and in consumer receive this:
> “questo ´┐¢ un test” .
>
> What properties should I use to set up character coding in utf-8?
> thank you
> Marco


Have you tried using UTF-8 bytes when serialising/deserialising?

>
>


Re: Kafka send() call, ack, and message writes to the filesystem

2020-01-16 Thread M. Manna
Anindya,

On Wed, 15 Jan 2020 at 21:59, Anindya Haldar 
wrote:

> Okay, let’s say
>
> - the application is using a non-transactional producer, shared across
> multiple threads
> - the linger.ms and buffer.memory is non-zero, and so is batch.size such
> that messages are actually batched
> - the replication factor is 3
> - the minimum number of ISRs is 2
> - the parameter ack is set to ‘all’
>
> Now the application calls send(), get a future back, and then calls get()
> on the future. At some point (driven by the batching related parameters and
> a number of other factors) the get() call to the future returns
> successfully.
>
> Precisely at this point does Kafka guarantee that the message has been
> persisted to the leader’s and all the ISRs’ logs? By persisted, I mean
> written to the replication logs, but may or may not yet have been committed
> to the storage media by the fsync() call.
>
  At this stage the ISRs (this includes the leader) have all acknowledged the write.

>
> If the answer is yes, it looks good from here. If the answer is no, then
> what else does the application need to do?
>
> Sincerely,
> Anindya Haldar
> Oracle Responsys
>
>
> > On Jan 15, 2020, at 12:31 PM, M. Manna  wrote:
> >
> > Hey Anindya,
> >
> >
> >
> > On Wed, 15 Jan 2020 at 18:23, Anindya Haldar 
> > wrote:
> >
> >> Thanks for the response.
> >>
> >> Essentially, we are looking for a confirmation that a send
> acknowledgement
> >> received at the client’s end will ensure the message is indeed
> persisted to
> >> the replication logs. We initially wondered whether the client has to
> make
> >> an explicit flush() call or whether it has to commit a producer
> transaction
> >> for that to happen. Based upon what I understand now from your
> response, a
> >> flush() or commitTransaction() call should not be necessary for this,
> and a
> >> send acknowledgement via the successful return from the get() call on
> the
> >> future will ensure the persistence of the message.
> >>
> >> Please feel free to correct me if I didn’t get it right.
> >>
> >
> > I'm sure you have done the reading, but to be in context of your
> question,
> > *commitTransaction()* is sufficient on it's own (see excerpt from
> *flush()*
> > doc below)
> >
> > *Applications don't need to call this method for transactional producers,
> >> since the commitTransaction() will flush all buffered records before
> >> performing the commit. This ensures that all the send(ProducerRecord)
> calls
> >> made since the previous beginTransaction() are completed before the
> >> commit. *
> >
> >
> >  But you *do *need to call commitTransaction() (for txn based producers),
> > or flush() (for normal cases) to send the records *immediately*.
> Otherwise,
> > they will be sent when the data buffer is full (re: buffer.memory and
> > linger.ms).
> >
> >  If you want to know more about transactions, there are some nice
> articles
> > regarding txn producers
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> >
> > https://www.confluent.io/blog/transactions-apache-kafka/
> >
> > Also, if you are interested to become more technical, please check the
> > codebase for KafkaProducer and see what doSend() and wakeup() is doing:
> >
> >
> >
> > https://github.com/apache/kafka/blob/5c00191ea957fef425bf5dbbe47d70e41249e2d6/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L832
> >
> > I hope this helps.
> >
> > Regards,
> >
> >>
> >> Sincerely,
> >> Anindya Haldar
> >> Oracle Responsys
> >>
> >>
> >>> On Jan 15, 2020, at 8:55 AM, M. Manna  wrote:
> >>>
> >>> Anindya,
> >>>
> >>> On Wed, 15 Jan 2020 at 16:49, Anindya Haldar <
> anind

Re: Kafka send() call, ack, and message writes to the filesystem

2020-01-15 Thread M. Manna
Hey Anindya,



On Wed, 15 Jan 2020 at 18:23, Anindya Haldar 
wrote:

> Thanks for the response.
>
> Essentially, we are looking for a confirmation that a send acknowledgement
> received at the client’s end will ensure the message is indeed persisted to
> the replication logs. We initially wondered whether the client has to make
> an explicit flush() call or whether it has to commit a producer transaction
> for that to happen. Based upon what I understand now from your response, a
> flush() or commitTransaction() call should not be necessary for this, and a
> send acknowledgement via the successful return from the get() call on the
> future will ensure the persistence of the message.
>
> Please feel free to correct me if I didn’t get it right.
>

 I'm sure you have done the reading, but to put it in the context of your question,
*commitTransaction()* is sufficient on its own (see the excerpt from the *flush()*
doc below):

*Applications don't need to call this method for transactional producers,
> since the commitTransaction() will flush all buffered records before
> performing the commit. This ensures that all the send(ProducerRecord) calls
> made since the previous beginTransaction() are completed before the
> commit. *


  But you *do* need to call commitTransaction() (for txn-based producers),
or flush() (for normal cases) to send the records *immediately*. Otherwise,
they will be sent when the data buffer is full (re: buffer.memory and
linger.ms).
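
To make that concrete, a bare-bones transactional send looks roughly like this (a
sketch, not production code - the transactional.id, topic and bootstrap address are
placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TxnProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn-1");      // placeholder

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      producer.initTransactions();
      producer.beginTransaction();
      producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
      // commitTransaction() flushes everything buffered since beginTransaction(),
      // so no explicit flush() is needed here
      producer.commitTransaction();
    }
  }
}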

  If you want to know more about transactions, there are some nice articles
regarding txn producers

https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
  https://www.confluent.io/blog/transactions-apache-kafka/

  Also, if you are interested in getting more technical, please check the
codebase for KafkaProducer and see what doSend() and wakeup() are doing:


https://github.com/apache/kafka/blob/5c00191ea957fef425bf5dbbe47d70e41249e2d6/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L832

 I hope this helps.

 Regards,

>
> Sincerely,
> Anindya Haldar
> Oracle Responsys
>
>
> > On Jan 15, 2020, at 8:55 AM, M. Manna  wrote:
> >
> > Anindya,
> >
> > On Wed, 15 Jan 2020 at 16:49, Anindya Haldar 
> > wrote:
> >
> >> In our case, the minimum in-sync replicas is set to 2.
> >>
> >> Given that, what will be expected behavior for the scenario I outlined?
> >>
> >
> > This means you will get confirmation when 2 of them have acknowledged. so
> > you will always have 2 in-sync.
> >
> > Perhaps drilling each detail and having a long thread, you could explain
> > what is it you are trying to investigate/identify? We will be happy to
> help.
> >
> > Regards,
> >
> >
> >> Sincerely,
> >> Anindya Haldar
> >> Oracle Responsys
> >>
> >>
> >>> On Jan 15, 2020, at 6:38 AM, Ismael Juma  wrote:
> >>>
> >>> To all the in-sync replicas. You can set the minimum number of in-sync
> >>> replicas via the min.insync.replicas topic/broker config.
> >>>
> >>> Ismael
> >>>
> >>> On Tue, Jan 14, 2020 at 11:11 AM Anindya Haldar <
> >> anindya.hal...@oracle.com>
> >>> wrote:
> >>>
> >>>> I have a question related to the semantics of a producer send and the
> >> get
> >>>> calls on the future returned by the send call.
> >>>>
> >>>> - It is a Java application, using the Kafka Java client library
> >>>> - The application is set up to use 3 replicas and using acks=all for
> the
> >>>> producer
> >>>> - the application is using a non-zero value for linger.ms and
> >> batch.size
> >>>> parameters
> >>>> - The application is using a single non-transactional Kafka producer
> >>>> instance, shared across a number of threads
> >>>>
> >>>> With that,
> >>>>
> >>>> - Any application thread makes a send() call on the producer.
> >>>> - Then the same thread calls get() on the future returned by the last
> >>>> send() call
> >>>> - The get() call on the future returns after it gets the
> acknowledgement
> >>>> from the system for the message send
> >>>>
> >>>> At this point, is it guaranteed that the message has actually been
> >> written
> >>>> (but may not be committed by calling fsync) to ALL of the replicas’
> >>>> filesystems?
> >>>>
> >>>> Sincerely,
> >>>> Anindya Haldar
> >>>> Oracle Responsys
> >>>>
> >>>>
> >>
> >>
>
>


Re: Kafka send() call, ack, and message writes to the filesystem

2020-01-15 Thread M. Manna
Anindya,

On Wed, 15 Jan 2020 at 16:49, Anindya Haldar 
wrote:

> In our case, the minimum in-sync replicas is set to 2.
>
> Given that, what will be expected behavior for the scenario I outlined?
>

 This means you will get confirmation once 2 of them have acknowledged, so
you will always have at least 2 in sync.
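
As a quick illustration (a sketch with the Java client; broker address and topic are
placeholders): with acks=all, the future returned by send() only completes once the
in-sync replicas have the record, and with min.insync.replicas=2 the send fails instead
if fewer than 2 replicas are in sync.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class AcksAllSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for the in-sync replicas

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      RecordMetadata md =
          producer.send(new ProducerRecord<>("demo-topic", "k", "v")).get();
      // reaching here means the in-sync replicas have the record; a
      // NotEnoughReplicasException would have surfaced from get() otherwise
      System.out.println("acked at " + md.partition() + "-" + md.offset());
    }
  }
}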

 Rather than drilling into each detail over a long thread, perhaps you could explain
what it is you are trying to investigate/identify? We will be happy to help.

Regards,


> Sincerely,
> Anindya Haldar
> Oracle Responsys
>
>
> > On Jan 15, 2020, at 6:38 AM, Ismael Juma  wrote:
> >
> > To all the in-sync replicas. You can set the minimum number of in-sync
> > replicas via the min.insync.replicas topic/broker config.
> >
> > Ismael
> >
> > On Tue, Jan 14, 2020 at 11:11 AM Anindya Haldar <
> anindya.hal...@oracle.com>
> > wrote:
> >
> >> I have a question related to the semantics of a producer send and the
> get
> >> calls on the future returned by the send call.
> >>
> >> - It is a Java application, using the Kafka Java client library
> >> - The application is set up to use 3 replicas and using acks=all for the
> >> producer
> >> - the application is using a non-zero value for linger.ms and
> batch.size
> >> parameters
> >> - The application is using a single non-transactional Kafka producer
> >> instance, shared across a number of threads
> >>
> >> With that,
> >>
> >> - Any application thread makes a send() call on the producer.
> >> - Then the same thread calls get() on the future returned by the last
> >> send() call
> >> - The get() call on the future returns after it gets the acknowledgement
> >> from the system for the message send
> >>
> >> At this point, is it guaranteed that the message has actually been
> written
> >> (but may not be committed by calling fsync) to ALL of the replicas’
> >> filesystems?
> >>
> >> Sincerely,
> >> Anindya Haldar
> >> Oracle Responsys
> >>
> >>
>
>


Re: Streams Newbie Question - Deployment and Management of Stream Processors

2020-01-13 Thread M. Manna
Hey Sachin,

On Mon, 13 Jan 2020 at 05:12, Sachin Mittal  wrote:

> Hi,
> The way I have used streams processing in past; use case to process streams
> is when you have a continuous stream of data which needs to be processed
> and used by certain applications.
> Since in kafka streams can be a simple java application, this application
> can run in its own JVM which is different from say actual client
> application.
> It can be on same physical or virtual machine, but some degree of
> separation is best.
>
> Regarding streams the way I look at it that, it is some continuous process
> whose data downstream is used by micro services.
> The downstream data can be stored using stream's state stores or can be
> some external data store (say mongodb, cassandra, etc).
>

 I totally get your point. My understanding has been the same too. Stream
processing is all about honouring what a stream is meant to be - stateless,
non-interfering (almost), and side-effect free.
 Also, even though the terminal result of a stream topology can be stored,
maybe it is only needed for decision making. So storage is one usage (amongst
many).
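
To make the separate-deployment point concrete: a standalone processor really is just a
small app like the sketch below (application id, topic names and the mapValues step are
made-up placeholders), packaged and run in its own JVM, with the consuming
microservices only reading the output topic.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class StandaloneProcessorSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-enricher");    // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> orders = builder.stream("orders");            // placeholder topic
    orders.mapValues(v -> v.toUpperCase())                                // stand-in for real processing
          .to("orders-enriched");                                         // downstream topic

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}

Scaling out is then a matter of running more instances of the same app with the same
application.id.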

Thanks a lot for clarifying. I shall continue my endeavour to learn other
things. Apart from Confluent and ASF examples, do you recommend anything
else for starters?

Regards,

Hope it answers some of your questions.
>
> Thanks
> Sachin
>
>
>
> On Mon, Jan 13, 2020 at 1:32 AM M. Manna  wrote:
>
> > Hello,
> >
> > Even though I have been using Kafka for a while, it's primarily for
> > publish/subscribe event messaging ( and I understand them reasonably
> well).
> > But I would like to do more regarding streams.
> >
> > For my initiative, I have been going through the code written in
> "examples"
> > folder. I would like to apologise for such newbie questions in advance.
> >
> > With reference to WordCountDemo.java - I wanted to understand something
> > related to Stream Processor integration with business applications (i.e.
> > clients). Is it a good practice to always keep the stream processor
> > topology separate from actual client application who uses the processed
> > data?
> >
> > My understanding (from what I can see at first glace) multiple
> > streams.start() needs careful observation for scaling up/out in long
> term.
> > To separate problems, I would expected this to be deployed separately
> (may
> > be microservices?) But again, I am simply entering this world of streams,
> > so I could really use some insight into how some of us has tackled this
> > over the years.
> >
> > Kindest Regards,
> >
>


Streams Newbie Question - Deployment and Management of Stream Processors

2020-01-12 Thread M. Manna
Hello,

Even though I have been using Kafka for a while, it's primarily for
publish/subscribe event messaging ( and I understand them reasonably well).
But I would like to do more regarding streams.

For my initiative, I have been going through the code written in "examples"
folder. I would like to apologise for such newbie questions in advance.

With reference to WordCountDemo.java - I wanted to understand something
related to Stream Processor integration with business applications (i.e.
clients). Is it a good practice to always keep the stream processor
topology separate from actual client application who uses the processed
data?

My understanding (from what I can see at first glance) is that multiple
streams.start() calls need careful observation for scaling up/out in the long term.
To separate concerns, I would expect this to be deployed separately (maybe
as microservices?). But again, I am simply entering this world of streams,
so I could really use some insight into how some of us have tackled this
over the years.

Kindest Regards,


Re: Free Kafka Stream Data

2020-01-08 Thread M. Manna
Priyanka,

On Wed, 8 Jan 2020 at 20:42, cool girl  wrote:

> Hi ,
>
> I am trying to learn Kafka. Is there any free API which I can use like
> twitter? I created twitter account but looks like ti will take days before
> I can use their streaming data .
>

  Welcome to Kafka. If you are seeking a REST API for pubsub/streaming,
there is none yet.
The only REST API that I am aware of is part of Kafka Connect, which you
can read about here:
http://kafka.apache.org/documentation/#connect_rest
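
If you do end up playing with Connect, its REST interface is easy to poke at once a
worker is running (port 8083 by default), e.g.:

curl http://localhost:8083/connectors

which simply lists the connectors deployed on that worker.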

There are also REST APIs provided under the Confluent licence, but they are
probably more admin-oriented and not what you are looking for. Please visit
confluent.io for more details.

Regards,

>
> Thanks
> Priyanka
>


Re: callback function

2020-01-08 Thread M. Manna
Hey Tavares,


On Wed, 8 Jan 2020 at 09:38, Tom Bentley  wrote:

> Tavares, if you're asking about the consumer then I think you might have a
> misconception about how it works: The application calls poll() to fetch the
> latest records from the broker(s). The broker is not pushing records into
> some queue in the consumer. It might be worth reading
> https://kafka.apache.org/documentation/#design_pull.
>
> Kind regards,
>
> Tom
>
> On Wed, Jan 8, 2020 at 9:09 AM Jonathan Santilli <
> jonathansanti...@gmail.com>
> wrote:
>
> > Hello, when you say "something is new in the queue" you mean a new
> > message/record is available in a partition within a topic?
> >
> > Cheers!
> >
> > On Tue, Jan 7, 2020, 8:46 PM Tavares Forby 
> > wrote:
> >
> > > Hi,
> > >
> > > Is there a method in which a function can wake when something is new in
> > > the queue? This has to be non-blocking to the main thread.
> > >
> > > Thanks,
> > > -Tavares
> > >
> >


+1 to what Tom et al. mentioned.

Perhaps it would be good to know what it is you are trying to solve
exactly. We may be able to help you better if we have the full picture :)

Regards,

>
>


Re: Question On KIP-500

2020-01-07 Thread M. Manna
Hey Ryanne,

Amazing! Thanks for pointing this out. I’ll take a look soon.

Regards,


On Tue, 7 Jan 2020 at 17:15, Ryanne Dolan  wrote:

> Hello. The dev list might be a better place to ask this question. FWIW I
> believe your interpretation is correct -- the proposal essentially uses two
> separate clusters, comprising "controllers" and "brokers". N.B. that the
> brokers cannot become controllers or vice versa.
>
> You can find the discussion thread here:
>
> https://lists.apache.org/thread.html/cce5313ebe72bde34bf0da3af5a1723db3ee871667b1fd8edf2ee7ab@%3Cdev.kafka.apache.org%3E
>
> You will see I expressed concerns early on that "the proposal still
> requires separate processes with separate configuration".
>
> Ryanne
>
> On Thu, Jan 2, 2020 at 9:45 AM M. Manna  wrote:
>
> > Hello,
> >
> > Greetings of the New Year to everybody. Sorry for reviving this randomly,
> > as I didn't have the original thread anymore.
> >
> > I was reading through this KIP and trying to following the current vs
> > proposed diagrams. Once again, apologies for making mistakes in
> > understanding this.
> >
> > Are we replacing the ZK cluster with another kafka cluster which will
> > simply do what ZK is doing? Or, are we simply distributing the metadata
> > management job to the existing kafka cluster? I believe it's the former
> as
> > I read on the KIP that the metadata manager (in post-ZK world) would be
> > separate from Kafka brokers But it would be good if someone can correct
> me.
> >
> > Thanks and Best Regards,
> >
>


Re: Why do some people recommend breaking large messages into many smaller ones?

2020-01-03 Thread M. Manna
Hi,


On Fri, 3 Jan 2020 at 19:48, Clark Sims  wrote:

> Why do some people so strongly recommend cutting large messages into
> many small messages, as opposed to changing max.message.bytes?
>
> For example, Stéphane Maarek at
> https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafkam,
> says "Kafka isn’t meant to handle large messages and that’s why the
> message max size is 1MB (the setting in your brokers is called
> message.max.bytes). See Apache Kafka. If you wanted to you could
> increase   that as well as make sure to increase the network buffers
> for your producers and consumers. Let me be clear, I DO NOT ENCOURAGE
> THIS."
>
> It seems to me that a 100 megabyte message should be fine on any large
> server.
>

 It depends on what you are trying to do. In streams, or an event-centric
ecosystem, having a 100MB payload sounds a bit extreme to me. I would expect
large message sizes when you are dealing with log aggregation or e2e
integration (e.g. via Kafka Connect). It's not about who recommends what,
it's about what your end goal is with Kafka.

Also, this is not specific to Kafka; it applies to any other modern messaging
system, e.g. Pulsar, NATS, etc. You will determine what throughput/latency meets your
target and what kind of payload you have. Based on that, there will always
be an upper bound on what you can/cannot realistically achieve.
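
If you do go down the large-message route regardless, the limits have to be raised
consistently on the broker, producer and consumer sides, otherwise whichever one you
forget becomes the new ceiling. A rough sketch of the knobs involved (20MB here is an
arbitrary example, not a recommendation):

# broker (or max.message.bytes per topic)
message.max.bytes=20971520
replica.fetch.max.bytes=20971520

# producer
max.request.size=20971520

# consumer
max.partition.fetch.bytes=20971520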

Thanks,

>
> Thanks in Advance,
> Clark
>


Question On KIP-500

2020-01-02 Thread M. Manna
Hello,

Greetings of the New Year to everybody. Sorry for reviving this randomly,
as I didn't have the original thread anymore.

I was reading through this KIP and trying to following the current vs
proposed diagrams. Once again, apologies for making mistakes in
understanding this.

Are we replacing the ZK cluster with another kafka cluster which will
simply do what ZK is doing? Or, are we simply distributing the metadata
management job to the existing kafka cluster? I believe it's the former as
I read on the KIP that the metadata manager (in post-ZK world) would be
separate from Kafka brokers But it would be good if someone can correct me.

Thanks and Best Regards,


Re: Kafka trunk vs master branch

2019-12-25 Thread M. Manna
+1 to what John mentioned.

Master is more like a template that gets created for a new repo. It’s not in
use for any Kafka activities (not that we know of).

Regards,


On Wed, 25 Dec 2019 at 17:04, John Roesler  wrote:

> Hi Sachin,
>
> Trunk is the basis for development. I’m not sure what master is for, if
> anything. I’ve never used it for anything or even checked it out.
>
> The numbered release branches are used to develop patch releases.
>
> Releases are created from trunk, PRs should be made against trunk, etc.
>
> Thanks for asking!
> John
>
> On Wed, Dec 25, 2019, at 08:54, Sachin Mittal wrote:
> > Hello Folks,
> > I just wanted to know what commits goes into what branch.
> >
> > I see trunk branch which seems default and latest.
> > I also see master branch which seems bit behind trunk.
> > I also see different versions branches like 2.2, 2.3 and 2.4 which are
> also
> > actively updated.
> >
> > I wanted to know when forking kafka repo, which is the branch one should
> > use base off to build from source or do any active development.
> >
> > What is the difference between between trunk and master branch?
> > Also release branches are created from trunk or master branch?
> >
> > Also when issuing a pull request which is the general branch one should
> use
> > as target?
> >
> > Thanks
> > Sachin
> >
>


Re: [EXTERNAL] Re: Health Check

2019-12-21 Thread M. Manna
This is really helpful. Thanks for sharing this with the community.



On Sat, 21 Dec 2019 at 19:29, Alex Woolford  wrote:

> Not sure if this is helpful, Tim.
>
> I recently recorded a video that shows the gist of monitoring Kafka with
> Prometheus: https://www.youtube.com/watch?v=nk3sk1LO7Bo
>
> Cheers,
>
> Alex
>
> On Sat, Dec 21, 2019 at 12:13 PM Liam Clarke 
> wrote:
>
> > Nope, it's a third party project:
> > https://github.com/prometheus/jmx_exporter
> >
> > We're using it with an 0.11 cluster.
> >
> > On Sat, 21 Dec. 2019, 1:52 am Sullivan, Tim,  >
> > wrote:
> >
> > >
> > > Wonderful, I assume that it is only in the latest release,
> > > correct? I’m running 1.1.0 so I’m thinking I’d have to upgrade to get
> the
> > > benefits?
> > >
> > >
> > > Tim Sullivan
> > >
> > > Be Well
> > >
> > >
> > >
> > > Sr. Software Engineer
> > > Supply Chain – IT | Data & Analytics
> > > The Home Depot
> > > 2250 Newmarket Parkway | Atlanta, GA 30339
> > >
> > > Work Cell 470-455-8346
> > > Personal Cell 678-525-2583
> > > Home Phone:  770-945-3315
> > >
> > >
> > > *From: *Liam Clarke 
> > > *Reply-To: *"users@kafka.apache.org" 
> > > *Date: *Thursday, December 19, 2019 at 6:13 PM
> > > *To: *"users@kafka.apache.org" 
> > > *Subject: *[EXTERNAL] Re: Health Check
> > >
> > >
> > >
> > > We've wired the Prometheus JMX exporter java-agent into our Kafka
> > > processes, we've then set up alerts in Prometheus for several key
> metrics
> > > to monitor cluster health (under replicated partitions/offline
> > partitions,
> > > request handler avg idle percent to measure load etc.) - and because
> the
> > > JMX exporter runs within the Kafka process, any sustained failure to
> > scrape
> > > also sets off an alert.
> > >
> > > Cheers,
> > >
> > > Liam Clarke
> > >
> > > On Thu, Dec 19, 2019 at 12:42 AM Miguel Silvestre <
> msilves...@gmail.com>
> > > wrote:
> > >
> > > A simple tcp connection to kafka port (9092) should be enough no?
> > >
> > > --
> > > Miguel Silvestre
> >
> >
>


Re: Health Check

2019-12-17 Thread M. Manna
Jai,

On Tue, 17 Dec 2019 at 17:33, Jai Nanda Kumar 
wrote:

> Hi,
>
> How to perform a health check on a running Kafka server in AWS EC2
> server.
>

 Shouldn't this be part of your liveness probe? Or are you trying to do
this ad hoc (which is not how folks usually do it)?

>
> Thanks and Regards,
> A. Jai Nanda Kumar,
> Senior Software Engineer,
> 9066260799.
> Confidentiality Notice: This e-mail message, including any attachments, is
> for the sole use of the intended recipient(s) and may contain confidential
> and privileged information. Any unauthorized review, use, disclosure or
> distribution is prohibited. If you are not the intended recipient, please
> contact the sender by reply e-mail and destroy all copies of the original
> message.
>


Re: Which container should you use when deploying on docker ?

2019-12-17 Thread M. Manna
Robin,

On Tue, 17 Dec 2019 at 01:58, Yu Watanabe  wrote:

> Robin.
>
> Thank you for the reply.
>
> I am about to run kafka on docker in development environment for first time
> and also in production.
> To get started, I searched images in docker hub that has "Official Images"
> tag to find if there is a docker image maintained by the community.
> But there wasn't one so I asked the question.
>
> Thanks,
> Yu Watanabe
>
> On Tue, Dec 17, 2019 at 12:26 AM Robin Moffatt  wrote:
>
> > There are various Kafka images available, including:
> >
> > https://hub.docker.com/r/confluentinc/cp-kafka/
> > 
> > https://hub.docker.com/r/wurstmeister/kafka/
> >
> > I'm not 100% clear what your doubt is? If these are legitimate Kafka
> > images, or something else?
> > 
> >
>

 I guess the confusion is due to the fact that Kafka is not as official as, for
example, “Tomcat” on the public Docker Hub. When referring to an official ASF
image, I would always first go to the Apache-maintained Docker Hub site for
images (not elsewhere). It’s perfectly valid to have the CP image (given how
major Confluent's contribution is for Kafka on the cloud), but it's still not an
ASF image (it is licensed under the Confluent licence).

If you don’t agree, let’s google “Official Apache Kafka Docker Image” vs
“Official Apache Tomcat docker image" - let's see what the top search
(minus Ads) says.

It certainly wouldn't cause that much of an issue if it were advertised both
on the Kafka site and the GitHub main site that the CP Kafka image is the
official go-to. I use the CP image for our test K8s cluster, and I would love
this to be clarified in an official capacity on the Kafka/GitHub site.

Hope this makes sense.

> >
> > --
> >
> > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
> >
> >
> > On Sat, 14 Dec 2019 at 04:18, Yu Watanabe  wrote:
> >
> > > Hello.
> > >
> > > I would like to ask question related to kafka on docker engine.
> > > Which container should you use for kafka when deploying on docker in
> > > production ?
> > >
> > > When I look in docker hub , I do not see neither of below tagged for
> > kafka
> > > container .
> > >
> > > Docker certified
> > > Verified publisher
> > > Official Images
> > >
> > > Repository "confluent" seems be the closest one since its the creator
> of
> > > kafka but it does not have above tag .
> > >
> > > Thanks,
> > > Yu Watanabe
> > >
> > > --
> > > Yu Watanabe
> > > Weekend Freelancer who loves to challenge building data platform
> > > yu.w.ten...@gmail.com
> > > [image: LinkedIn icon] 
> > [image:
> > > Twitter icon] 
> > >
> >
>
>
> --
> Yu Watanabe
> Weekend Freelancer who loves to challenge building data platform
> yu.w.ten...@gmail.com
> [image: LinkedIn icon]   [image:
> Twitter icon] 
>


Re: Message batch & compression doesn't work as expected

2019-12-12 Thread M. Manna
Frank,

On Thu, 12 Dec 2019 at 11:28, Frank Zhou  wrote:

> Hi,
>
> I am testing kafka client on message batch and compression. I have enabled
> message batching along with compression, with batch.size set to 3M,
> linger.ms set to 5000ms and compression.type set to gzip(Attached whole
> producer config). Then testing with wireshark to check the details.
> First issue our team notice is that compression codec seems to have some
> issue. Since we set it as gzip, but we notice in wireshark, it will display
> as other compression codec, like Snappy in attached screenshot(Not sure if
> this is wireshark's issue, or a real issue on Kafka, but the whole packet
> details display seems fine in wireshark).
> Second issue is that we have set the latency and batch number so high, but
> it still send the Produce request to server much more frequently than we
> expected. Size per message that is sending before batch & compression
> should be around 200 bytes, and during testing, all the message generated
> by us should be around 200KB, so we are expecting much less packets
> transferred than this(screenshot only shows small amount of them, total
> number is 1472).
> [image: 2019-12-12_19h00_15.png]
>  Is it we miss some config or the config is not correct leading to this?
>
>
>
I recently tuned our GCP-based test cluster using a batch.size of 800K, no
compression, and no linger.ms. We got the desired consistency and
throughput. But we used version 2.3.0, and I don't suppose that matters much
at this point.

https://www.youtube.com/watch?v=oQe7PpDDdzA

The above shows a very good and detailed analysis done by Becket regarding
throughput calculation and estimation. Have you checked this to see what
matches your scenario?
The sending is impacted by linger.ms too, not just batch.size. So tuning
them together is a bit tricky. Perhaps you want to see which one you need
more.
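
One detail that often surprises people: batching is per partition, and a batch goes out
as soon as either batch.size fills up or linger.ms expires, whichever comes first. So a
huge batch.size on its own doesn't delay sends; the values below are arbitrary, just to
show the pair that has to be tuned together:

batch.size=262144       # per-partition buffer, in bytes
linger.ms=50            # how long to wait for a batch to fill
compression.type=gzip   # applied per batch, so tiny batches compress poorly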




> --
> *Frank Zhou*
> R, Itiviti
> Java Developer
> D +852 2521 7480
> frank.z...@itiviti.com
>
> __
>
> itiviti.com 
>
> *The information contained in or attached to this email is strictly
> confidential. If you are not the intended recipient, please notify us
> immediately by telephone and return the message to us.*
>
> *Email communications by definition contain personal information. The
> Itiviti group of companies is subject to European data protection
> regulations. Itiviti’s Privacy Notice is available at www.itiviti.com
> . Itiviti expects the recipient of this email to
> be compliant with Itiviti’s Privacy Notice and applicable regulations.
> Please advise us immediately at dataprotectiont...@itiviti.com if you are
> not compliant with these.*
>
> __
>
> itiviti.com 
>  Follow Itiviti on Linkedin
> 
>
> The information contained in or attached to this email is strictly
> confidential. If you are not the intended recipient, please notify us
> immediately by telephone and return the message to us. Email communications
> by definition contain personal information. The Itiviti group of companies
> is subject to European data protection regulations.
>
> Itiviti’s Privacy Notice is available at www.itiviti.com. Itiviti expects
> the recipient of this email to be compliant with Itiviti’s Privacy Notice
> and applicable regulations. Please advise us immediately at
> dataprotectiont...@itiviti.com if you are not compliant with these.
>


Re: Broker data size

2019-12-02 Thread M. Manna
Hi,

On Mon, 2 Dec 2019 at 14:59, Rodoljub Radivojevic <
rodoljub.radivoje...@instana.com> wrote:

> Hi everyone,
>
> I want to calculate the total amount of data per broker (sum of sizes of
> all partitions on the broker).
> How can I do that using existing metrics?
>
>
 Why would you require a Kafka metric for this (just out of curiosity)?
Wouldn't your host OS provide insight on resource usage?
The log.dirs directory size is essentially your data per broker (including
internal offsets and every TopicPartition assigned to the broker). Does
that not assist your investigation?
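
If you do want it from the Kafka side rather than the OS, there is also the log-dirs
tool that ships with the distribution (since 1.0, if I remember right), which reports
per-partition sizes for a broker as JSON:

bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --broker-list 0

Summing the size fields in that output gives the total data on broker 0.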

Best regards,
> Rodoljub
>


Re: Number of leader partitions per broker

2019-12-02 Thread M. Manna
Rodoljub,

On Mon, 2 Dec 2019 at 14:52, Rodoljub Radivojevic <
rodoljub.radivoje...@instana.com> wrote:

> Hello,
>
> Is it possible to calculate the number of partitions for which one broker
> is a leader, using existing Kafka metrics?
>
> Regards,
> Rodoljub
>

Does the below answer your question (or hint at what you seek)?
https://cwiki.apache.org/confluence/display/KAFKA/Finding+Topic+and+Partition+Leader
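
If I recall correctly, there is also a per-broker JMX metric that exposes this directly:

kafka.server:type=ReplicaManager,name=LeaderCount

i.e. the number of partitions this broker currently leads (PartitionCount next to it
gives the total number of partitions hosted).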


Thanks,


Re: Broker Interceptors

2019-12-02 Thread M. Manna
Hi Tom,

On Mon, 2 Dec 2019 at 09:41, Thomas Aley  wrote:

> Hi Kafka community,
>
> I am hoping to get some feedback and thoughts about broker interceptors.
>
> KIP-42 Added Producer and Consumer interceptors which have provided Kafka
> users the ability to collect client side metrics and trace the path of
> individual messages end-to-end.
>
> This KIP also mentioned "Adding message interceptor on the broker makes a
> lot of sense, and will add more detail to monitoring. However, the
> proposal is to do it later in a separate KIP".
>
> One of the motivations for leading with client interceptors was to gain
> experience and see how useable they are before tackling the server side
> implementation which would ultimately "allow us to have a more
> complete/detailed message monitoring".
>
> Broker interceptors could also provide more value than just more complete
> and detailed monitoring such as server side schema validation, so I am
> curious to learn if anyone in the community has progressed this work; has
> ideas about other potential server side interceptor uses or has actually
> implemented something similar.
>

 I personally feel that the cost here is the impact on performance. If I am
right, this interceptor is going to tap into nearly everything. If you have a
strong guarantee (min.insync.replicas = N-1) then this may incur some
delay (and let's not forget inter-broker comms protection by TLS config).
This may not be desirable for some systems. That said, it would be good to
know what others think about this.
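
For reference, the client-side hook from KIP-42 that exists today looks like the sketch
below (the counting logic is only an illustration); it is wired in via the producer's
interceptor.classes config:

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CountingInterceptor implements ProducerInterceptor<String, String> {
  private final AtomicLong sent = new AtomicLong();
  private final AtomicLong failed = new AtomicLong();

  @Override
  public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    sent.incrementAndGet();   // runs on the send path for every record - keep it cheap
    return record;            // a modified record (e.g. with an extra header) could be returned
  }

  @Override
  public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    if (exception != null) failed.incrementAndGet();
  }

  @Override
  public void close() {
    System.out.printf("sent=%d failed=%d%n", sent.get(), failed.get());
  }

  @Override
  public void configure(Map<String, ?> configs) { }
}

A broker-side equivalent would sit on a much hotter path (every produce/fetch), which is
where the performance concern above comes from.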

Thanks,

>
> Regards,
>
> Tom Aley
> thomas.a...@ibm.com
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598.
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
>
>


Re: Deleting topics in windows - fix estimate or workaround

2019-12-01 Thread M. Manna
Hi both,

It’s been going around for a long time, but Kafka is officially not fully
tested and verified for Windows. The disclaimer is on the official site.
Windows servers are an easy choice because a lot of infrastructure is on
Windows and a lot of businesses depend on that infrastructure. That
said, the way forward is highly likely to be on the cloud using some
orchestrator (Cloud Foundry) or an IaaS provider, e.g. AWS.

Also, the Windows OS has proprietary file system behaviour which is the root
cause of this problem, so that is outside Kafka's scope. I would suggest
that you move to a Linux or UNIX-like architecture (if at all possible).

Regards,

On Sun, 1 Dec 2019 at 15:24, Israel Ekpo  wrote:

> Sachin
>
> Are you running production Kafka on Windows?
>
> Is it possible to migrate to an alternate UNIX based environment?
>
> It would be great to learn why you chose to run it on Windows
>
>
> On Sun, Dec 1, 2019 at 2:22 AM Sachin Mittal  wrote:
>
> > Hi All,
> > I hope we are well aware of the critical bug in windows where kafka
> crashes
> > when we delete a topic. This affects other areas too like in stream
> > processing when trying to reset a stream using StreamsResetter.
> >
> > Right now only workaround I have found is to stop zookeeper and kafka
> > server and manually delete directories and files containing streams,
> topics
> > and offset information.
> >
> > However doing this every time is kind of unproductive.
> >
> > I see that there are multiple critical bugs logged in JIRA
> > https://issues.apache.org/jira/browse/KAFKA-6203
> > https://issues.apache.org/jira/browse/KAFKA-1194
> > around the same issue.
> >
> > I would like to know by when would the fix be available?
> > I see that there have been multiple pull requests issued around fixes of
> > these issues.
> >
> > I wanted to know if one or more pull requests need to be merged to get
> the
> > fix out or if there is something I can try config wise to have some
> > workaround for this issue.
> >
> > Please note that there can be few of us who might be using windows in
> > production too, so this fix is highly important for us.
> >
> > Please let me know what can be done to address this issue.
> >
> > Thanks
> > Sachin
> >
>


Re: kafka stream or samza

2019-11-29 Thread M. Manna
Hi

On Fri, 29 Nov 2019 at 11:11, Roberts Roth 
wrote:

> Hello
>
> I was confused, for realtime streams, shall we use kafka or samza?
>
> We have deployed kafka cluster with large scale in production
> environment.Shall we re-use kafka's streaming feature, or deploy new
> cluster of samza?
>
> Thanks for your suggestion
>

This is too broad for discussion (at least in my opinion). I think
you need to consider the business impact and development cost of managing
Kafka vs. Samza. It's not about which one the whitepapers say is best,
but about your needs and adaptability. Also, future support is an important
question.

Have you had such discussion amongst your teams?

Thanks,

>
> regards.
> Roberts
>


Re: BufferOverflowException on rolling new segment after upgrading Kafka from 1.1.0 to 2.3.1

2019-11-19 Thread M. Manna
Hi,

Is there any reason why you haven’t performed the upgrade based on the official
docs? Or is this something you’re planning to do now?

Thanks,

On Tue, 19 Nov 2019 at 19:52, Daniyar Kulakhmetov 
wrote:

> Hi Kafka users,
>
> We updated our Kafka cluster from 1.1.0 version to 2.3.1.
> Message format and inter-broker protocol versions left the same:
>
> inter.broker.protocol.version=1.1
> log.message.format.version=1.1
>
> After upgrading, we started to get some occasional exceptions:
>
> 2019/11/19 05:30:53 INFO [ProducerStateManager
> partition=matchmaker_retry_clicks_15m-2] Writing producer snapshot at
> offset 788532 (kafka.log.ProducerStateManager)
> 2019/11/19 05:30:53 INFO [Log partition=matchmaker_retry_clicks_15m-2,
> dir=/mnt/kafka] Rolled new log segment at offset 788532 in 1 ms.
> (kafka.log.Log)
> 2019/11/19 05:31:01 ERROR [ReplicaManager broker=0] Error processing append
> operation on partition matchmaker_retry_clicks_15m-2
> (kafka.server.ReplicaManager)
> 2019/11/19 05:31:01 java.nio.BufferOverflowException
> 2019/11/19 05:31:01 at java.nio.Buffer.nextPutIndex(Buffer.java:527)
> 2019/11/19 05:31:01 at
> java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:797)
> 2019/11/19 05:31:01 at
> kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:134)
> 2019/11/19 05:31:01 at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> 2019/11/19 05:31:01 at
> kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
> 2019/11/19 05:31:01 at
> kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
> 2019/11/19 05:31:01 at
> kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:520)
> 2019/11/19 05:31:01 at kafka.log.Log.$anonfun$roll$8(Log.scala:1690)
> 2019/11/19 05:31:01 at
> kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:1690)
> 2019/11/19 05:31:01 at scala.Option.foreach(Option.scala:407)
> 2019/11/19 05:31:01 at kafka.log.Log.$anonfun$roll$2(Log.scala:1690)
> 2019/11/19 05:31:01 at
> kafka.log.Log.maybeHandleIOException(Log.scala:2085)
> 2019/11/19 05:31:01 at kafka.log.Log.roll(Log.scala:1654)
> 2019/11/19 05:31:01 at kafka.log.Log.maybeRoll(Log.scala:1639)
> 2019/11/19 05:31:01 at kafka.log.Log.$anonfun$append$2(Log.scala:966)
> 2019/11/19 05:31:01 at
> kafka.log.Log.maybeHandleIOException(Log.scala:2085)
> 2019/11/19 05:31:01 at kafka.log.Log.append(Log.scala:850)
> 2019/11/19 05:31:01 at kafka.log.Log.appendAsLeader(Log.scala:819)
> 2019/11/19 05:31:01 at
>
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:772)
> 2019/11/19 05:31:01 at
> kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
> 2019/11/19 05:31:01 at
> kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:259)
> 2019/11/19 05:31:01 at
> kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:759)
> 2019/11/19 05:31:01 at
>
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:763)
> 2019/11/19 05:31:01 at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> 2019/11/19 05:31:01 at
> scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
> 2019/11/19 05:31:01 at
> scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
> 2019/11/19 05:31:01 at
> scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
> 2019/11/19 05:31:01 at
> scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
> 2019/11/19 05:31:01 at
> scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
> 2019/11/19 05:31:01 at
> scala.collection.TraversableLike.map(TraversableLike.scala:238)
> 2019/11/19 05:31:01 at
> scala.collection.TraversableLike.map$(TraversableLike.scala:231)
> 2019/11/19 05:31:01 at
> scala.collection.AbstractTraversable.map(Traversable.scala:108)
> 2019/11/19 05:31:01 at
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:751)
> 2019/11/19 05:31:01 at
> kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:492)
> 2019/11/19 05:31:01 at
> kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:544)
> 2019/11/19 05:31:01 at
> kafka.server.KafkaApis.handle(KafkaApis.scala:113)
> 2019/11/19 05:31:01 at
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> 2019/11/19 05:31:01 at java.lang.Thread.run(Thread.java:748)
>
>
> This error persists until broker gets restarted (or leadership get moved to
> another broker).
>
> What could be the issue and how we can solve it?
>
> Thank you!
>
> Best regards,
> Daniyar.
>


Re: Kafka Broker do not recover after crash

2019-11-17 Thread M. Manna
Hi,

On Sat, 16 Nov 2019 at 19:54, Oliver Eckle  wrote:

> Hi,
>
> yes it is intentional, but just because I don't know better and want to
> spare a little resources?
>

I never understood the benefit of having more brokers than replicas with
the intention of saving resources. A lot of people do that, and the Kafka
community seems to be okay with it (i.e. there is no documentation or caution
against doing that). Please make sure you use the cluster to its full extent.

For your case, I believe the log and index files stored on the affected
broker (or rather, the PV attached to it, if you have one) may have been
corrupted.
The best way (rather than debugging and investigating logs endlessly) is to
simply delete the pod and let it start again. Also, make sure that it
doesn't refer to the old files (if you have a PV/StatefulSet with it). It's
important that upon restart the broker rebuilds all the data files itself
rather than referring to previously stored files.
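
In practice that is usually just (namespace and pod name taken from your log output, so
adjust as needed):

kubectl delete pod kafka-1 -n bd-iot

and if the pod is backed by a StatefulSet with a persistent volume you suspect is
corrupted, the volume/claim needs handling separately, otherwise the restarted pod picks
the same files straight back up.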

Try that and see how it goes.

Thanks,


From your answer I guess the preferred way is having a replication of 3?
>
>
> -----Original Message-----
> From: M. Manna 
> Sent: Saturday, 16 November 2019 20:27
> To: users@kafka.apache.org
> Subject: Re: Kafka Broker do not recover after crash
>
> Hi,
>
> On Sat, 16 Nov 2019 at 19:21, Oliver Eckle  wrote:
>
> > Hello,
> >
> >
> >
> > having a Kafka Cluster running in Kubernetes with 3 Brokers and all
> > replikations (topic, offsets) set to 2.
>
>
> This sounds strange. You have 3 brokers and replication set to 2. Is this
> intentional ?
>
>
> >
> > For whatever reason one of the broker crash and restartes. And since
> > it circles in some kind of restart/crash loop.
> >
> > Any idea how to recover?
> >
> >
> >
> > Whole Logfile is like that:
> >
> >
> >
> 19:15:42.58
> 19:15:42.58 Welcome to the Bitnami kafka container
> 19:15:42.58 Subscribe to project updates by watching
> https://github.com/bitnami/bitnami-docker-kafka
> 19:15:42.58 Submit issues and feature requests at
> https://github.com/bitnami/bitnami-docker-kafka/issues
> 19:15:42.58 Send us your feedback at contain...@bitnami.com
> 19:15:42.59
> 19:15:42.59 INFO  ==> ** Starting Kafka setup **
> 19:15:42.83 WARN  ==> You set the environment variable
> ALLOW_PLAINTEXT_LISTENER=yes. For safety reasons, do not use this flag in a
> production environment.
> 19:15:42.84 INFO  ==> Initializing Kafka...
> 19:15:42.84 INFO  ==> No injected configuration files found, creating
> default config files
> 19:15:43.83 INFO  ==> ** Kafka setup finished! **
>
> 19:15:43.84 INFO  ==> ** Starting Kafka **
>
> > [2019-11-16 19:15:49,625] INFO Registered
> > kafka:type=kafka.Log4jController MBean
> > (kafka.utils.Log4jControllerRegistration$)
> >
> > [2019-11-16 19:15:52,933] INFO Registered signal handlers for TERM,
> > INT, HUP
> > (org.apache.kafka.common.utils.LoggingSignalHandler)
> >
> > [2019-11-16 19:15:52,934] INFO starting (kafka.server.KafkaServer)
> >
> > [2019-11-16 19:15:52,935] INFO Connecting to zookeeper on
> > kafka-zookeeper
> > (kafka.server.KafkaServer)
> >
> > [2019-11-16 19:15:53,230] INFO [ZooKeeperClient Kafka server]
> > Initializing a new session to kafka-zookeeper.
> > (kafka.zookeeper.ZooKeeperClient)
> >
> > [2019-11-16 19:15:53,331] INFO Client
> >
> > environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255a
> > c140bc f, built on 03/06/2019 16:18 GMT
> > (org.apache.zookeeper.ZooKeeper)
> >
> > [2019-11-16 19:15:53,331] INFO Client
> > environment:host.name=kafka-1.kafka-headless.bd-iot.svc.cluster.local
> > (org.apache.zookeeper.ZooKeeper)
> >
> > [2019-11-16 19:15:53,331] INFO Client
> > environment:java.version=1.8.0_23

Re: Kafka Broker do not recover after crash

2019-11-16 Thread M. Manna
Hi,

On Sat, 16 Nov 2019 at 19:21, Oliver Eckle  wrote:

> Hello,
>
>
>
> having a Kafka cluster running in Kubernetes with 3 brokers and all
> replication factors (topic, offsets) set to 2.


This sounds strange. You have 3 brokers and replication set to 2. Is this
intentional?


>
> For whatever reason one of the brokers crashed and restarted, and since
> then it has been stuck in some kind of restart/crash loop.
>
> Any idea how to recover?
>
>
>
> Whole Logfile is like that:
>
>
>
> 19:15:42.58
>
> 19:15:42.58 Welcome to the Bitnami kafka container
>
> 19:15:42.58 Subscribe to project updates by watching
> https://github.com/bitnami/bitnami-docker-kafka
>
> 19:15:42.58 Submit issues and feature requests at
> https://github.com/bitnami/bitnami-docker-kafka/issues
>
> 19:15:42.58 Send us your feedback at contain...@bitnami.com
>
> 19:15:42.59
>
> 19:15:42.59 INFO  ==> ** Starting Kafka setup **
>
> 19:15:42.83 WARN  ==> You set the environment variable
> ALLOW_PLAINTEXT_LISTENER=yes. For safety reasons, do not use this flag in a
> production environment.
>
> 19:15:42.84 INFO  ==> Initializing Kafka...
>
> 19:15:42.84 INFO  ==> No injected configuration files found, creating
> default config files
>
> 19:15:43.83 INFO  ==> ** Kafka setup finished! **
>
>
>
> 19:15:43.84 INFO  ==> ** Starting Kafka **
>
> [2019-11-16 19:15:49,625] INFO Registered kafka:type=kafka.Log4jController
> MBean (kafka.utils.Log4jControllerRegistration$)
>
> [2019-11-16 19:15:52,933] INFO Registered signal handlers for TERM, INT,
> HUP
> (org.apache.kafka.common.utils.LoggingSignalHandler)
>
> [2019-11-16 19:15:52,934] INFO starting (kafka.server.KafkaServer)
>
> [2019-11-16 19:15:52,935] INFO Connecting to zookeeper on kafka-zookeeper
> (kafka.server.KafkaServer)
>
> [2019-11-16 19:15:53,230] INFO [ZooKeeperClient Kafka server] Initializing
> a
> new session to kafka-zookeeper. (kafka.zookeeper.ZooKeeperClient)
>
> [2019-11-16 19:15:53,331] INFO Client
>
> environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bc
> f, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
>
> [2019-11-16 19:15:53,331] INFO Client
> environment:host.name=kafka-1.kafka-headless.bd-iot.svc.cluster.local
> (org.apache.zookeeper.ZooKeeper)
>
> [2019-11-16 19:15:53,331] INFO Client environment:java.version=1.8.0_232
> (org.apache.zookeeper.ZooKeeper)
>
> [2019-11-16 19:15:53,331] INFO Client environment:java.vendor=AdoptOpenJDK
> (org.apache.zookeeper.ZooKeeper)
>
> [2019-11-16 19:15:53,332] INFO Client
> environment:java.home=/opt/bitnami/java (org.apache.zookeeper.ZooKeeper)
>
> [2019-11-16 19:15:53,332] INFO Client
>
> environment:java.class.path=/opt/bitnami/kafka/bin/../libs/activation-1.1.1.
>
> jar:/opt/bitnami/kafka/bin/../libs/aopalliance-repackaged-2.5.0.jar:/opt/bit
>
> nami/kafka/bin/../libs/argparse4j-0.7.0.jar:/opt/bitnami/kafka/bin/../libs/a
>
> udience-annotations-0.5.0.jar:/opt/bitnami/kafka/bin/../libs/commons-lang3-3
>
> .8.1.jar:/opt/bitnami/kafka/bin/../libs/connect-api-2.3.1.jar:/opt/bitnami/k
>
> afka/bin/../libs/connect-basic-auth-extension-2.3.1.jar:/opt/bitnami/kafka/b
>
> in/../libs/connect-file-2.3.1.jar:/opt/bitnami/kafka/bin/../libs/connect-jso
>
> n-2.3.1.jar:/opt/bitnami/kafka/bin/../libs/connect-runtime-2.3.1.jar:/opt/bi
>
> tnami/kafka/bin/../libs/connect-transforms-2.3.1.jar:/opt/bitnami/kafka/bin/
>
> ../libs/guava-20.0.jar:/opt/bitnami/kafka/bin/../libs/hk2-api-2.5.0.jar:/opt
>
> /bitnami/kafka/bin/../libs/hk2-locator-2.5.0.jar:/opt/bitnami/kafka/bin/../l
>
> ibs/hk2-utils-2.5.0.jar:/opt/bitnami/kafka/bin/../libs/jackson-annotations-2
>
> .10.0.jar:/opt/bitnami/kafka/bin/../libs/jackson-core-2.10.0.jar:/opt/bitnam
>
> i/kafka/bin/../libs/jackson-databind-2.10.0.jar:/opt/bitnami/kafka/bin/../li
>
> bs/jackson-dataformat-csv-2.10.0.jar:/opt/bitnami/kafka/bin/../libs/jackson-
>
> datatype-jdk8-2.10.0.jar:/opt/bitnami/kafka/bin/../libs/jackson-jaxrs-base-2
>
> .10.0.jar:/opt/bitnami/kafka/bin/../libs/jackson-jaxrs-json-provider-2.10.0.
>
> jar:/opt/bitnami/kafka/bin/../libs/jackson-module-jaxb-annotations-2.10.0.ja
>
> r:/opt/bitnami/kafka/bin/../libs/jackson-module-paranamer-2.10.0.jar:/opt/bi
>
> tnami/kafka/bin/../libs/jackson-module-scala_2.11-2.10.0.jar:/opt/bitnami/ka
>
> fka/bin/../libs/jakarta.activation-api-1.2.1.jar:/opt/bitnami/kafka/bin/../l
>
> ibs/jakarta.annotation-api-1.3.4.jar:/opt/bitnami/kafka/bin/../libs/jakarta.
> inject-2.5.0.jar:/opt/bitnami/kafka/bin/../libs/jakarta.ws
> .rs-api-2.1.5.jar:
>
> 

Re: Partition Reassignment is getting stuck

2019-11-13 Thread M. Manna
On Wed, 13 Nov 2019 at 13:10, Ashutosh singh  wrote:

> Yeah, although it wouldn't have any impact, I will have to try this
> tonight as it is peak business hours now.
> Instead of deleting all data, I will try to delete only the topic partitions
> which are having issues and then restart the broker. I believe it should
> catch up, but I will let you know.
>

Since you're doing it outside business hours, it should be fine. The issue
you're mentioning here is not uncommon, but such occurrences should be rare.
As long as you have >=3 replicas you should be able to do this comfortably.

Thanks,

>
>
>
> On Wed, Nov 13, 2019 at 6:23 PM M. Manna  wrote:
>
> > On Wed, 13 Nov 2019 at 12:41, Ashutosh singh  wrote:
> >
> > > Hi,
> > >
> > > All of a sudden I see an under-replicated partition in our Kafka
> > > cluster and it is not getting replicated. It seems to be stuck
> > > somewhere. The in-sync replica is missing only from one of the brokers,
> > > which suggests there is some issue with that broker, but on the other
> > > hand there are many other topics on that node and they are working
> > > fine. I have tried a rolling restart of all the nodes in the cluster
> > > but that didn't help.
> > > I tried a manual reassignment of that particular topic but it gets
> > > stuck forever, so I had to kill the reassignment by deleting the
> > > /admin/reassign_partitions node. I restarted ZooKeeper so that the
> > > leader changes and then tried to reassign partitions, but again it gets
> > > stuck.
> > >
> > > I really appreciate if someone can help to understand the issue.
> > >
> >
> > If all you have is 1 broker not in sync - can you please try to stop that
> > broker, delete all the data files on that broker, and restart? It should
> > catch up.
> >
> >
> > >
> > > No of nodes : 8
> > > Version : 2.1.1
> > >
> > > --
> > > Thanks
> > > Ashu
> > >
> >
>
>
> --
> Thanx & Regard
> Ashutosh Singh
> 08151945559
>


Re: Partition Reassignment is getting stuck

2019-11-13 Thread M. Manna
On Wed, 13 Nov 2019 at 12:41, Ashutosh singh  wrote:

> Hi,
>
> All of a sudden I see an under-replicated partition in our Kafka cluster
> and it is not getting replicated. It seems to be stuck somewhere. The
> in-sync replica is missing only from one of the brokers, which suggests
> there is some issue with that broker, but on the other hand there are many
> other topics on that node and they are working fine. I have tried a rolling
> restart of all the nodes in the cluster but that didn't help.
> I tried a manual reassignment of that particular topic but it gets stuck
> forever, so I had to kill the reassignment by deleting the
> /admin/reassign_partitions node. I restarted ZooKeeper so that the leader
> changes and then tried to reassign partitions, but again it gets stuck.
>
> I would really appreciate it if someone could help me understand the issue.
>

If all you have is one broker not in sync, can you please try to stop that
broker, delete all the data files on that broker, and restart? It should
catch up.
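
For reference, a rough sequence could look like the below (service name, data
directory and ZooKeeper address are only examples - take them from your own
server.properties/log.dirs, and only do this while the remaining replicas are
healthy and in sync):

# see which partitions are currently under-replicated
bin/kafka-topics.sh --zookeeper zk1:2181 --describe --under-replicated-partitions

# on the lagging broker: stop it, move its data directory aside, restart
systemctl stop kafka                              # however your broker is managed
mv /var/lib/kafka/data /var/lib/kafka/data.bak    # whatever log.dirs points to
mkdir /var/lib/kafka/data
systemctl start kafka

# the broker should now re-replicate everything from the partition leaders;
# re-run the describe command above until the under-replicated list is empty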


>
> No of nodes : 8
> Version : 2.1.1
>
> --
> Thanks
> Ashu
>


Re: kafka-console-consumer --value-deserializer with access to headers

2019-11-12 Thread M. Manna
Hi,

On Tue, 12 Nov 2019 at 14:37, Jorg Heymans  wrote:

> Thanks for helping debugging this. You can reproduce the issue using below
> deserializer, and invoking kafka-console-consumer with
> --value-deserializer=my.BasicDeserializer . As you will see, when the
> consumer starts receiving messages only "SERDE WITHOUT HEADERS" is printed
> to the console.
>
> Thanks,
> Jorg
>
In the above, what exact command did you run from the command prompt? Can
you share it with us?

Thanks,

>
> public class BasicDeserializer implements Deserializer<String> {
>
> @Override
> public void configure(Map<String, ?> configs, boolean isKey) {
> System.out.println("CONFIGURE");
> }
>
> @Override
> public String deserialize(String topic, byte[] data) {
> System.out.println("SERDE WITHOUT HEADERS");
> return new String(data);
> }
>
> @Override
> public String deserialize(String topic, Headers headers, byte[] data) {
> System.out.println("SERDE WITH HEADERS");
> return new String(data);
> }
>
>     @Override
> public void close() {
> System.out.println("CLOSE");
> }
> }
>
>
>
>
> On 2019/11/12 12:57:21, "M. Manna"  wrote:
> > HI again,
> >
> > On Tue, 12 Nov 2019 at 12:31, Jorg Heymans 
> wrote:
> >
> > > Hi,
> > >
> > > The issue is not that i cannot get a custom deserializer working, it's
> > > that the custom deserializer i provide implements the default method
> from
> > > the Deserializer interface
> > >
> https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59
> > > that gives access to record Headers.
> > >
> > > The kafka console consumer never calls this method, it will only call
> the
> > > variant without Headers
> > >
> https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L50
> > >
> > > I'm using kafka 2.3.0 btw.
> > >
> > > Jorg
> > >
> >
> > Record fetching (the deserialization call) happens in the Fetcher, and
> > the Fetcher calls Deserializer.deserialize() with headers. The default
> > implementation of that method simply delegates to deserialize() without
> > headers, so if you provide an overridden version of the deserializer (for
> > both the header and non-header variants) your override will be called.
> >
> >
> https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1265
> >
> >
> https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1268
> >
> > Console consumer simply puts a consumer wrapper around KafkaConsumer.
> There
> > is no change in behaviour otherwise. I take it that you've debugged and
> > confirmed that it's not calling your overridden deserialize() with
> headers?
> > If so, can you link it here for everyone's benefit?
> >
> > Thanks,
> >
> >
> >
> >
> >
> > > On 2019/11/12 11:58:26, "M. Manna"  wrote:
> > > >
> > > > I think you can try the following to get your implementation working
> > > >
> > > > 1) Provide the SerDe classes into classpath
> > > > 2) Provide your consumer config file
> > > > 3) Provide key/value Deserializer props via --consumer-property arg.
> > > >
> > >
> > >
> >
>


Re: kafka-console-consumer --value-deserializer with access to headers

2019-11-12 Thread M. Manna
Hi again,

On Tue, 12 Nov 2019 at 12:31, Jorg Heymans  wrote:

> Hi,
>
> The issue is not that i cannot get a custom deserializer working, it's
> that the custom deserializer i provide implements the default method from
> the Deserializer interface
> https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59
> that gives access to record Headers.
>
> The kafka console consumer never calls this method, it will only call the
> variant without Headers
> https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L50
>
> I'm using kafka 2.3.0 btw.
>
> Jorg
>

Record fetching (the deserialization call) happens in the Fetcher, and the
Fetcher calls Deserializer.deserialize() with headers. The default
implementation of that method simply delegates to deserialize() without
headers, so if you provide an overridden version of the deserializer (for
both the header and non-header variants) your override will be called.

https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1265

https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1268

Console consumer simply puts a consumer wrapper around KafkaConsumer. There
is no change in behaviour otherwise. I take it that you've debugged and
confirmed that it's not calling your overridden deserialize() with headers?
If so, can you link it here for everyone's benefit?

Thanks,





> On 2019/11/12 11:58:26, "M. Manna"  wrote:
> >
> > I think you can try the following to get your implementation working
> >
> > 1) Provide the SerDe classes into classpath
> > 2) Provide your consumer config file
> > 3) Provide key/value Deserializer props via --consumer-property arg.
> >
>
>


Re: kafka-console-consumer --value-deserializer with access to headers

2019-11-12 Thread M. Manna
Hi

On Tue, 12 Nov 2019 at 09:53, Jorg Heymans  wrote:

> Indeed, i corrected the typo but now my deserializer class is not taken
> into account at all and it goes back to the default deserializer. You can
> verify this by putting a non-existent class and it still runs fine.
>
> value.deserializer=does.not.exist
>
In ConsoleConsumer, the bootstrap.servers and key/value deserializer settings
are enforced via the --consumer-property arg. It merges the properties from
--consumer-property and --consumer.config, prioritising key/value pairs
supplied via --consumer-property over the properties file.

I think you can try the following to get your implementation working (an example command follows the list):

1) Provide the SerDe classes into classpath
2) Provide your consumer config file
3) Provide key/value Deserializer props via --consumer-property arg.
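
Putting those three together, it would look something like this (jar path,
class name and topic are placeholders for your own setup):

export CLASSPATH=/path/to/my-serde.jar    # jar containing my.BasicDeserializer

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic --from-beginning \
  --consumer.config consumer.properties \
  --consumer-property value.deserializer=my.BasicDeserializer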

See how that works for you.

Thanks,

> Jorg
>
> On 2019/11/11 14:31:49, "M. Manna"  wrote:
> > You have a typo - you mean deserializer
> >
> > Please try again.
> >
> > Regards,
> >
> > On Mon, 11 Nov 2019 at 14:28, Jorg Heymans 
> wrote:
> >
> > > Don't think that option is available there, specifying
> > > 'value.deserializer' in my consumer-config.properties file gives
> > >
> > > [2019-11-11 15:16:26,589] WARN The configuration 'value.serializer' was
> > > supplied but isn't a known config.
> > > (org.apache.kafka.clients.consumer.ConsumerConfig)
> > >
> > > Does there exist a description of what properties the consumer-config
> > > properties file accepts ? I could find only a few references to it in
> the
> > > documentation.
> > >
> > > Jorg
> > >
> > > On 2019/11/11 13:00:03, "M. Manna"  wrote:
> > > > Hi,
> > > >
> > > >
> > > > On Mon, 11 Nov 2019 at 10:58, Jorg Heymans 
> > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I have created a class implementing Deserializer, providing an
> > > > > implementation for
> > > > >
> > > > > public String deserialize(String topic, Headers headers, byte[]
> data)
> > > > >
> > > > > that does some conditional processing based on headers, and then
> calls
> > > the
> > > > > other serde method
> > > > >
> > > > > public String deserialize(String topic, byte[] data)
> > > > >
> > > > > What i'm seeing is that kafka-console-consumer only uses the second
> > > method
> > > > > when a value deserializer is specified. Is there a way to force it
> to
> > > > > invoke the first method, so i can do processing with headers ? I
> tried
> > > > > implementing the deprecated 'ExtendedSerializer' but it does not
> make a
> > > > > difference.
> > > > >
> > > > > Thanks,
> > > > > Jorg
> > > > >
> > > >
> > > > Have you tried providing a separate prop file using consumer.config
> > > > argument? Please see the reference here:
> > > >
> > > > --consumer.config <String: config file>  Consumer config properties file.
> > > >                                           Note that [consumer-property]
> > > >                                           takes precedence over this config.
> > > >
> > > > Try that and see how it goes.
> > > >
> > > > Thanks,
> > > >
> > >
> >
>


Re: kafka-console-consumer --value-deserializer with access to headers

2019-11-11 Thread M. Manna
You have a typo - you mean deserializer

Please try again.

Regards,

On Mon, 11 Nov 2019 at 14:28, Jorg Heymans  wrote:

> Don't think that option is available there, specifying
> 'value.deserializer' in my consumer-config.properties file gives
>
> [2019-11-11 15:16:26,589] WARN The configuration 'value.serializer' was
> supplied but isn't a known config.
> (org.apache.kafka.clients.consumer.ConsumerConfig)
>
> Does there exist a description of what properties the consumer-config
> properties file accepts ? I could find only a few references to it in the
> documentation.
>
> Jorg
>
> On 2019/11/11 13:00:03, "M. Manna"  wrote:
> > Hi,
> >
> >
> > On Mon, 11 Nov 2019 at 10:58, Jorg Heymans 
> wrote:
> >
> > > Hi,
> > >
> > > I have created a class implementing Deserializer, providing an
> > > implementation for
> > >
> > > public String deserialize(String topic, Headers headers, byte[] data)
> > >
> > > that does some conditional processing based on headers, and then calls
> the
> > > other serde method
> > >
> > > public String deserialize(String topic, byte[] data)
> > >
> > > What i'm seeing is that kafka-console-consumer only uses the second
> method
> > > when a value deserializer is specified. Is there a way to force it to
> > > invoke the first method, so i can do processing with headers ? I tried
> > > implementing the deprecated 'ExtendedSerializer' but it does not make a
> > > difference.
> > >
> > > Thanks,
> > > Jorg
> > >
> >
> > Have you tried providing a separate prop file using consumer.config
> > argument? Please see the reference here:
> >
> > --consumer.config <String: config file>  Consumer config properties file.
> >                                           Note that [consumer-property]
> >                                           takes precedence over this config.
> >
> > Try that and see how it goes.
> >
> > Thanks,
> >
>


Re: Detecting cluster down in consumer

2019-11-11 Thread M. Manna
Hi,

On Mon, 11 Nov 2019 at 11:55, Sachin Kale  wrote:

> Hi,
>
> We are working on a prototype where we write to two Kafka cluster
> (primary-secondary) and read from one of them (based on which one is
> primary) to increase the availability. There is a flag which is used to
> determine which cluster is primary and other becomes secondary. On
> detecting primary cluster is down, secondary is promoted to primary.
>
> How do we detect cluster downtime failures in Kafka Consumer? I tried
> different things but poll() makes sure to mask all the exceptions and
> returns 0 records.
>
>
> -Sachin-
>

These couple of links suggest how to approach it:

https://www.slideshare.net/gwenshap/multicluster-and-failover-for-apache-kafka-kafka-summit-sf-17

https://www.confluent.io/blog/3-ways-prepare-disaster-recovery-multi-datacenter-apache-kafka-deployments


If you are in the container world (e.g. K8s, YARN or Mesos), a liveness
probe can help you determine whether there has been a failover. On
traditional infrastructure, it's simply a heartbeat mechanism that tells you
whether the services are usable or not.
An example would be to set up monitoring alerts using SolarWinds (or similar
monitoring agents) and use Cruise Control or Kafka Monitor to raise alerts.
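
If you also want a purely programmatic probe, a minimal sketch (not a full
failover solution; the bootstrap address and timeout are just examples) could
use the Admin client to check whether the cluster answers at all:

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ClusterProbe {
    // Returns true if the cluster answered a describeCluster call within the timeout.
    public static boolean isClusterUp(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
        try (AdminClient admin = AdminClient.create(props)) {
            // Any broker answering is treated as "cluster reachable" here.
            return !admin.describeCluster().nodes().get(5, TimeUnit.SECONDS).isEmpty();
        } catch (Exception e) {
            return false;  // timeout / connection failure -> treat as down
        }
    }
}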

Maybe others can also suggest something which I cannot think of right now.


Thanks,


Re: kafka-console-consumer --value-deserializer with access to headers

2019-11-11 Thread M. Manna
Hi,


On Mon, 11 Nov 2019 at 10:58, Jorg Heymans  wrote:

> Hi,
>
> I have created a class implementing Deserializer, providing an
> implementation for
>
> public String deserialize(String topic, Headers headers, byte[] data)
>
> that does some conditional processing based on headers, and then calls the
> other serde method
>
> public String deserialize(String topic, byte[] data)
>
> What i'm seeing is that kafka-console-consumer only uses the second method
> when a value deserializer is specified. Is there a way to force it to
> invoke the first method, so i can do processing with headers ? I tried
> implementing the deprecated 'ExtendedSerializer' but it does not make a
> difference.
>
> Thanks,
> Jorg
>

Have you tried providing a separate prop file using consumer.config
argument? Please see the reference here:

--consumer.config <String: config file>  Consumer config properties file. Note
                                          that [consumer-property] takes
                                          precedence over this config.

Try that and see how it goes.

Thanks,


Re: Message order and retries question

2019-11-08 Thread M. Manna
Hi,

On Fri, 8 Nov 2019 at 17:19, Jose Manuel Vega Monroy <
jose.mon...@williamhill.com> wrote:

> Hi there,
>
>
>
> I have a question about message order and retries.
>
>
>
> After checking official documentation, and asking your feedback, we set
> this kafka client configuration in each producer:
>
>
>
> retries = 1
>
> # note: to ensure ordering, enable.idempotence=true, which forces acks=all
> # and max.in.flight.requests.per.connection<=5
>
> enable.idempotence = true
>
> max.in.flight.requests.per.connection = 4
>
> acks = "all"
>
>
>
 The documentation also says:

> Allowing retries without setting max.in.flight.requests.per.connection to
> 1 will potentially change the ordering of records because if two batches
> are sent to a single partition, and the first fails and is retried but the
> second succeeds, then the records in the second batch may appear first.
> Note additionally that produce requests will be failed before the number of
> retries has been exhausted if the timeout configured by
> delivery.timeout.ms expires first before successful acknowledgement.
> Users should generally prefer to leave this config unset and instead use
> delivery.timeout.ms to control retry behavior.


Are you planning to do it via delivery.timeout.ms?
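
If you do go down that route, a minimal ordering-safe producer setup would
look roughly like the snippet below (the values shown are the documented
defaults/constraints, included only for illustration, not a tuning
recommendation for your workload):

enable.idempotence=true
acks=all
max.in.flight.requests.per.connection=5
# leave retries at its default (Integer.MAX_VALUE) and bound the total send time instead:
delivery.timeout.ms=120000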


> However, somehow while rolling upgrade, we saw producer retrying a lot of
> times (for example, 16 times), and finally sending fine when broker was up
> and running back, with exceptions like this:
>
>
>
> Cause: org.apache.kafka.common.errors.OutOfOrderSequenceException: The
> broker received an out of order sequence number..
>
> Cause: org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> server is not the leader for that topic-partition..
>
>
>
> Is that behaviour expected? It’s that retries configuration right trying
> to ensure the message order, or maybe we should remove retries
> configuration from our producers?
>
>
>
> As well we found this related to retries:
>
>
>
> The default value for the producer's retries config was changed to
> Integer.MAX_VALUE, as we introduced delivery.timeout.ms in KIP-91, which
> sets an upper bound on the total time between sending a record and
> receiving acknowledgement from the broker. By default, the delivery timeout
> is set to 2 minutes.
>
>
>
> Allowing retries without setting `max.in.flight.requests.per.connection` to 
> `1` will potentially change the ordering of records because if two batches 
> are sent to a single partition, and the first fails and is retried but the 
> second succeeds, then the records in the second batch may appear first. Note 
> additionally that produce requests will be failed before the number of 
> retries has been exhausted if the timeout configured by delivery.timeout.ms 
> expires first before successful acknowledgement. Users should generally 
> prefer to leave this config unset and instead use delivery.timeout.ms to 
> control retry behavior.
>
>
>
> Note this was faced while rolling upgrade from 2.1.1 to 2.2.1.
>
>
>
> Thanks
>
>
>
>
> *Jose Manuel Vega Monroy *
> *Java Developer / Software Developer Engineer in Test*
>
> Direct: +*0035 0 2008038 (Ext. 8038)*
> Email: jose.mon...@williamhill.com
>
> William Hill | 6/1 Waterport Place | Gibraltar | GX11 1AA
>
>
>
>
>
>

Re: Consumer Lags and receive no records anymore

2019-11-08 Thread M. Manna
Hi

On Fri, 8 Nov 2019 at 07:06, Oliver Eckle  wrote:

> Hi,
>
> Don’t get me wrong, I just want to understand what's going on.
> so how do I figure out, how much partitions are required? Trial and Error?
>
Normally, you have to run your own perf tests with adequate batching,
partitions, ISR settings, etc. to determine what speed/consistency trade-off
is good for you. So it's not only trial and error; it's measuring what you
actually need.

> And as far as I understand, if I have null as key for the record, the
> record is stored in all partitions.
>
No, each record is written to exactly one partition per send(); with a null
key the partitioner just picks one for you. If you have only one partition
then it's irrelevant.

> Is it then not also processed by each consumer, even if I have more than
> one consumer?
>
If you have created two consumer threads under the same consumer group, each
will get one partition (if the topic has two). Each consumer polls and
processes data, and then commits offsets to indicate its position.
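
As a rough illustration of that loop (topic, group id and poll settings are
placeholders, and committing manually after each batch is just one of several
valid strategies):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleWorker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-app");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50");  // keep batches small if each record is slow

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);                 // e.g. the slow REST call
                }
                consumer.commitSync();               // commit only after the batch is processed
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.partition() + "/" + record.offset() + ": " + record.value());
    }
}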

> So could you explain, why the consumer stops to get data?
>
Unfortunately, without understanding your setup it's a broad question. So
far you're saying that you've got a sudden data burst of ~2k messages. If
the delay between consecutive poll() calls is long enough, there will be big
timeouts. Provided that your timeouts are still within the request/session
limits, the chances are that you are simply consuming a lot slower than you
should be. This again goes back to how you have determined your topic
partition setup. If you do general research on various Kafka blogs or tech
forums, the general conclusion is always the same: "you're slow in consuming
messages". Also, I would advise that you reproduce it in your test
environment (if possible) so that you can identify where the bottleneck is.
For reference:

https://www.lightbend.com/blog/monitor-kafka-consumer-group-latency-with-kafka-lag-exporter

https://sematext.com/blog/kafka-consumer-lag-offsets-monitoring/

I hope this helps.

Regards,

>
> Thx
>
> -----Original Message-----
> From: M. Manna 
> Sent: Friday, 8 November 2019 00:51
> To: Kafka Users 
> Subject: Re: Consumer Lags and receive no records anymore
>
> Hi again,
>
> On Thu, 7 Nov 2019 at 23:40, Oliver Eckle  wrote:
>
> > Hi,
> >
> > slow consumers - that could be the case. But why is that an issue? I
> > mean I try to use kafka exactly for that and the ability to recover.
> > So e.g if there is some burst scenario where a lot of data arrives and
> > has to be processed, a "slow consumer" will be the default case.
> > What I could understand, that having constantly slow consumers will be
> > an issue, e.g. if there is some compaction on the topic or data will
> > be erased, without having been read.
> >
> > This is what I think about the "lagging topic"
> > The scenario is like that:
> >
> > Producer --- Topic C ---> Consumer --- Processing ---> external REST
> > Endpoint
> >
> > Sending a Record to the external REST Endpoint takes around 300ms.
> > So if I have the "burst scenario" I mentioned above, there is maybe a
> > lag of 1000-2000 records.
> > So consumer is pulling 500 and process them, which means it takes
> > around 150s for the app to process the records.
> > This could create some timeouts I guess ... so that’s the reason why I
> > try to lower the poll records to 50 e.g. cause then is takes only 15s
> > until the poll is committed.
> >
> > Yeah having some liveness probe sounds pretty elegant .. give that a
> > try ...
> > Anyway, I need to understand why that is happening to deal with the
> > scenario the correct way.. killing the consumer after he stops to
> > consume messages, seems to me more like a workaround.
> >
> > Regards
> >
> As per your previous replies, if you have 2 partitions with that topic,
> you can distribute all data between 2 consumers in your cgroup, and process
> information. But given your data burst case, I would advise you increase
> your number of partitions and spread the burst across. Just like any other
> tool, Kafka requires certain level of configuration to achieve what you
> want. I would recommend you increase your partitions and consumers to
> spread the load.
>
> Regards,
>
> >
> > -----Original Message-----
> > From: M. Manna 
> > Sent: Friday, 8 November 2019 00:24
> > To: users@kafka.apache.org
> > Subject: Re: Consumer Lags and receive no records anymore
> >
> > Hi,
> >
> > > On 7 Nov 2019, at 22:39, Oliver Eckle  wrote:
> > >
> > > Have a consumer group with one consumer for the topic .. by
> > misunderstanding I have two partitions on the topic ..
> > > Due to having no key set for

Re: Consumer Lags and receive no records anymore

2019-11-07 Thread M. Manna
Hi again,

On Thu, 7 Nov 2019 at 23:40, Oliver Eckle  wrote:

> Hi,
>
> slow consumers - that could be the case. But why is that an issue? I mean
> I try to use kafka exactly for that and the ability to recover.
> So e.g if there is some burst scenario where a lot of data arrives and has
> to be processed, a "slow consumer" will be the default case.
> What I understand is that having constantly slow consumers will be an
> issue, e.g. if there is some compaction on the topic or data will be
> erased without having been read.
>
> This is what I think about the "lagging topic"
> The scenario is like that:
>
> Producer --- Topic C ---> Consumer --- Processing ---> external REST
> Endpoint
>
> Sending a Record to the external REST Endpoint takes around 300ms.
> So if I have the "burst scenario" I mentioned above, there is maybe a lag
> of 1000-2000 records.
> So the consumer is pulling 500 and processing them, which means it takes
> around 150s for the app to process the records.
> This could create some timeouts I guess ... so that’s the reason why I try
> to lower the poll records to 50, because then it takes only 15s until the
> poll is committed.
>
> Yeah having some liveness probe sounds pretty elegant .. give that a try
> ...
> Anyway, I need to understand why that is happening to deal with the
> scenario the correct way.. killing the consumer after it stops consuming
> messages seems to me more like a workaround.
>
> Regards
>
As per your previous replies, if you have 2 partitions on that topic, you
can distribute all data between 2 consumers in your cgroup and process the
information. But given your data burst case, I would advise you to increase
your number of partitions and spread the burst across them. Just like any
other tool, Kafka requires a certain level of configuration to achieve what
you want. I would recommend you increase your partitions and consumers to
spread the load.
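
If you do decide to add partitions, it is a one-liner (topic name and count
are examples; note that this only affects new data and, for keyed topics,
changes the key-to-partition mapping):

bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 6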

Regards,

>
> -----Original Message-----
> From: M. Manna 
> Sent: Friday, 8 November 2019 00:24
> To: users@kafka.apache.org
> Subject: Re: Consumer Lags and receive no records anymore
>
> Hi,
>
> > On 7 Nov 2019, at 22:39, Oliver Eckle  wrote:
> >
> > Have a consumer group with one consumer for the topic .. by
> misunderstanding I have two partitions on the topic ..
> > Due to having no key set for the record - I think having several
> consumers making no sense, or am I wrong.
> >
> I am not sure why that would be an issue. If you have 1 consumer your
> cgroup, yes all the topic partitions will be assigned to that consumer.
> Slow consumer means your consumers aren’t consuming messages as fast as you
> are producing (or, fast enough).
> > Is there any possibility to work around that?
> > Cause for example on lagging topic is put to a external REST service,
> which takes around 300ms to be handled.
> What do you mean by “Lagging topic is put to an external REST service”?
> > So is lowering the max.poll.records an option?
> poll() will block until records are available or the timeout you pass
> expires. Note that in recent clients heartbeats are sent from a background
> thread, but you still need to call poll() within max.poll.interval.ms.
> > Anyhow, I could probably not avoid situations like that. Sounds to me
> like a pretty common scenario?
> > So how to deal with them? Having a health check that crush the app if no
> data is appearing anymore?
> In K8s world, you can tie this with liveness probe, if you consumers
> aren’t live and then you may chose to destroy the pod and bring them back
> up. Provided that your offset commits are adhering to how technical
> requirements, you should be able to recover based on the last committed
> offset. Try that and see how it goes.
> >
> > Regards
> >
> > -----Original Message-----
> > From: M. Manna 
> > Sent: Thursday, 7 November 2019 23:35
> > To: users@kafka.apache.org
> > Subject: Re: Consumer Lags and receive no records anymore
> >
> > Consuming not fast/frequent enough is one of the most common reasons for
> it. Have you you checked how fast/much message you’re churning out vs. how
> many consumers you have in the group the handle the workload?
> >
> > Also, what are your partition setup for consumer groups?
> >
> >
> > Regards,
> >
> > On Thu, 7 Nov 2019 at 22:03, Oliver Eckle  wrote:
> >
> >> Using  kafka-consumer-groups.sh --bootstrap-server localhost:9092
> >> --describe -group my-app ..
> >> put the output within the logs .. also its pretty obvious, cause no
> >> data will flow anymore
> >>
> >> Regards
> >>
> >> -Ursprüngliche Nachricht-
> >> Von: M. Manna 
> >&g

Re: Kafka Partition Leader -1

2019-11-07 Thread M. Manna
Hi,

> On 7 Nov 2019, at 09:18, SenthilKumar K  wrote:
> 
> Hello experts, we are observing issues in partition(s) when the Kafka
> broker is down and the partition leader broker ID is set to -1.
> 
> Kafka Version 2.2.0 
> Total No Of Brokers: 24
> Total No Of Partitions: 48
> Replication Factor: 2
> Min In sync Replicas: 1
> 
> 
> [table omitted: Partition / Leader / Replicas / In-Sync Replicas]
> 
> See above: the partition 41 leader is set to -1, while the replicas exist
> on 18,24. Broker 24 went down due to "Too many open files".
> 
> Q: Why is it not electing the broker 18 as a leader for Partition 41?
> 
> We had a similar problem in the past; after fixing the broker, the
> partition showed the correct leader.
> 
> Kafka Properties:
> default.replication.factor = 2
> unclean.leader.election.enable = true
> min.insync.replicas = 1
> num.partitions = 48
> 
> 
> Zookeeper Active Broker ID's:
> [zk: localhost:2181(CONNECTED) 1] ls /kafka.test/brokers/ids
> [1, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 3, 4, 5, 6, 7, 8, 
> 9]
> [zk: localhost:2181(CONNECTED) 2] 
> 
> Restarting ZK also didn't help.
> 
> Pls advise! How to fix this? Why is this happening?
Your attachment image didn't come through. Also, I'm not sure what your ID
assignment strategy is; keeping broker ID 0 has caused some problems for me
in the past. With IDs >= 1 I haven't had any issue as such.

Also, you mentioned you “Fixed the broker” in the past. What exactly did you 
fix?

Regards,





Re: Consumer Lags and receive no records anymore

2019-11-07 Thread M. Manna
Hi,

> On 7 Nov 2019, at 22:39, Oliver Eckle  wrote:
> 
> Have a consumer group with one consumer for the topic .. by misunderstanding 
> I have two partitions on the topic .. 
> Due to having no key set for the record - I think having several consumers 
> making no sense, or am I wrong.
> 
I am not sure why that would be an issue. If you have one consumer in your
cgroup, yes, all the topic partitions will be assigned to that consumer. A
slow consumer means your consumers aren't consuming messages as fast as you
are producing them (or, fast enough).
> Is there any possibility to work around that? 
> Cause for example on lagging topic is put to a external REST service, which 
> takes around 300ms to be handled.
What do you mean by “Lagging topic is put to an external REST service”? 
> So is lowering the max.poll.records an option?
poll() will block until records are available or the timeout you pass
expires. Note that in recent clients heartbeats are sent from a background
thread, but you still need to call poll() within max.poll.interval.ms.
> Anyhow, I could probably not avoid situations like that. Sounds to me like
> a pretty common scenario?
> So how to deal with them? Having a health check that crashes the app if no
> data is appearing anymore?
In the K8s world, you can tie this to a liveness probe: if your consumers
aren't live, you may choose to destroy the pod and bring it back up. Provided
that your offset commits adhere to your technical requirements, you should be
able to recover based on the last committed offset. Try that and see how it
goes.
> 
> Regards
> 
> -----Original Message-----
> From: M. Manna 
> Sent: Thursday, 7 November 2019 23:35
> To: users@kafka.apache.org
> Subject: Re: Consumer Lags and receive no records anymore
> 
> Not consuming fast/frequently enough is one of the most common reasons for
> it. Have you checked how fast/how many messages you're churning out vs. how
> many consumers you have in the group to handle the workload?
> 
> Also, what are your partition setup for consumer groups?
> 
> 
> Regards,
> 
> On Thu, 7 Nov 2019 at 22:03, Oliver Eckle  wrote:
> 
>> Using  kafka-consumer-groups.sh --bootstrap-server localhost:9092 
>> --describe -group my-app ..
>> put the output within the logs .. also its pretty obvious, cause no 
>> data will flow anymore
>> 
>> Regards
>> 
> >> -----Original Message-----
> >> From: M. Manna 
> >> Sent: Thursday, 7 November 2019 22:10
> >> To: users@kafka.apache.org
> >> Subject: Re: Consumer Lags and receive no records anymore
>> 
>> Have you checked your Kafka consumer group status ? How did you 
>> determine that your consumers are lagging ?
>> 
>> Thanks,
>> 
>> On Thu, 7 Nov 2019 at 20:55, Oliver Eckle  wrote:
>> 
>>> Hi there,
>>> 
>>> 
>>> 
>>> have pretty strange behaviour questioned here already:
>>> https://stackoverflow.com/q/58650416/7776688
>>> 
>>> 
>>> 
>>> As you could see from the logs: https://pastebin.com/yrSytSHD at a 
>>> specific point the client is stopping to receive records.
>>> 
>>> I have a strong suspicion that it relates to performance on handling 
>>> the records - so that I run into kind of timeout.
>>> 
>>> What seems to be strange, is that the client is not getting back and 
>>> heartbeats are processed successfully.
>>> 
>>> Even the consumer will be returned on inspecting the consumer group.
>>> Any idea .. kafka log has no error in it.
>>> 
>>> 
>>> 
>>> Running a cluster with 3 broker inside a Kubernetes cluster, using 
>>> the bitnami helm chart.
>>> 
>>> 
>>> 
>>> Kind Regards
>>> 
>>> Oliver
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>> 
>> 
> 



Re: Consumer Lags and receive no records anymore

2019-11-07 Thread M. Manna
Not consuming fast/frequently enough is one of the most common reasons for
it. Have you checked how fast/how many messages you're churning out vs. how
many consumers you have in the group to handle the workload?

Also, what is your partition setup for the consumer groups?


Regards,

On Thu, 7 Nov 2019 at 22:03, Oliver Eckle  wrote:

> Using  kafka-consumer-groups.sh --bootstrap-server localhost:9092
> --describe -group my-app ..
> put the output within the logs .. also its pretty obvious, cause no data
> will flow anymore
>
> Regards
>
> -----Original Message-----
> From: M. Manna 
> Sent: Thursday, 7 November 2019 22:10
> To: users@kafka.apache.org
> Subject: Re: Consumer Lags and receive no records anymore
>
> Have you checked your Kafka consumer group status ? How did you determine
> that your consumers are lagging ?
>
> Thanks,
>
> On Thu, 7 Nov 2019 at 20:55, Oliver Eckle  wrote:
>
> > Hi there,
> >
> >
> >
> > have pretty strange behaviour questioned here already:
> > https://stackoverflow.com/q/58650416/7776688
> >
> >
> >
> > As you could see from the logs: https://pastebin.com/yrSytSHD at a
> > specific point the client is stopping to receive records.
> >
> > I have a strong suspicion that it relates to performance on handling
> > the records - so that I run into kind of timeout.
> >
> > What seems to be strange, is that the client is not getting back and
> > heartbeats are processed successfully.
> >
> > Even the consumer will be returned on inspecting the consumer group.
> > Any idea .. kafka log has no error in it.
> >
> >
> >
> > Running a cluster with 3 broker inside a Kubernetes cluster, using the
> > bitnami helm chart.
> >
> >
> >
> > Kind Regards
> >
> > Oliver
> >
> >
> >
> >
> >
> >
>
>


Re: Consumer Lags and receive no records anymore

2019-11-07 Thread M. Manna
Have you checked your Kafka consumer group status? How did you determine
that your consumers are lagging?

Thanks,

On Thu, 7 Nov 2019 at 20:55, Oliver Eckle  wrote:

> Hi there,
>
>
>
> have pretty strange behaviour questioned here already:
> https://stackoverflow.com/q/58650416/7776688
>
>
>
> As you could see from the logs: https://pastebin.com/yrSytSHD at a
> specific
> point the client is stopping to receive records.
>
> I have a strong suspicion that it relates to performance on handling the
> records - so that I run into kind of timeout.
>
> What seems to be strange, is that the client is not getting back and
> heartbeats are processed successfully.
>
> Even the consumer will be returned on inspecting the consumer group. Any
> idea .. kafka log has no error in it.
>
>
>
> Running a cluster with 3 broker inside a Kubernetes cluster, using the
> bitnami helm chart.
>
>
>
> Kind Regards
>
> Oliver
>
>
>
>
>
>


Re: Compare an original topic with a mirrored topic

2019-11-04 Thread M. Manna
How about a high watermark (HWM) check?

Since consumers consume based on the HWM, the presence of the same HWM should
be a good checkpoint, no?
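
A rough sketch of that kind of check is below (broker addresses and topic are
placeholders; with MirrorMaker the absolute offsets are generally not
identical across clusters, so comparing end-minus-beginning offsets per
partition is only a sanity check on counts, not a byte-for-byte comparison):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class HwmCompare {

    // Roughly the number of retained records per partition: end offset (HWM) minus beginning offset.
    static Map<TopicPartition, Long> counts(String bootstrap, String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> tps = consumer.partitionsFor(topic).stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
            Map<TopicPartition, Long> begin = consumer.beginningOffsets(tps);
            Map<TopicPartition, Long> end = consumer.endOffsets(tps);
            return tps.stream().collect(Collectors.toMap(tp -> tp, tp -> end.get(tp) - begin.get(tp)));
        }
    }

    public static void main(String[] args) {
        System.out.println("source: " + counts("source-cluster:9092", "my-topic"));
        System.out.println("mirror: " + counts("mirror-cluster:9092", "my-topic"));
    }
}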

Regards,

On Mon, 4 Nov 2019 at 22:53, Guillaume Arnaud  wrote:

> Hi,
>
> I would like to compare the messages of an original topic with a mirrored
> topic in an other cluster to be sure that the content is the same.
>
> I see that checksum method in KafkaConsumer is deprecated:
>
> https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
>
> What is the best way to do that ?
>
> Thank you,
>
> Guillaume
>


Re: Transaction error in Kafka producer perf test

2019-10-28 Thread M. Manna
Hi,

Perf testing is based on a set of tuning parameters, e.g. batch.size, acks,
partitions, network latency, etc. Your transactions are failing because your
batch has expired (or at least, that's what the log shows). You have to tune
your request timeout and batch.size correctly to improve on these. I suggest
you try and get this right first with a non-txn producer setup, then attempt
it with txn.

Perhaps you want to recheck the docs and understand what goal you want to
target (e.g. speed, consistency, balanced, etc.).
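
As a rough non-transactional baseline (the numbers below are only
illustrative, chosen to show batching rather than batch.size=1, and the
bootstrap address is taken from your earlier command):

bin/kafka-producer-perf-test.sh --topic testtopic3 \
  --num-records 1000000 --record-size 256 --throughput -1 \
  --producer-props bootstrap.servers=kafka-a-0.ri:9092 acks=all \
    batch.size=65536 linger.ms=10 request.timeout.ms=60000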

Regards,


On Mon, 28 Oct 2019 at 17:58, Anindya Haldar 
wrote:

> Anyone with a pointer on this? Do transactions work reliably with Kafka
> perf test tools? If yes, then is there a way to make it work in this
> scenario?
>
> Sincerely,
> Anindya Haldar
> Oracle Responsys
>
>
> > On Oct 25, 2019, at 2:51 PM, Anindya Haldar 
> wrote:
> >
> > We are evaluating Kafka for some of our use cases. As part of that
> effort I am trying to run an experiment with a cluster we have set up, and
> using the producer perf test tool supplied with the binaries.
> >
> > Here’s the cluster info:
> >
> > Runs in Kubernetes, with 4 CPUs, 32 GB RAM, 100 GB log space allocation
> for each node.
> > 3 ZooKeeper nodes
> > 5 Kafka nodes
> >
> > Here is the topic description:
> >
> > $ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092
> --topic testtopic3
> > Topic:testtopic3PartitionCount:5ReplicationFactor:3
> Configs:min.insync.replicas=2,segment.bytes=1073741824,retention.ms <
> https://urldefense.proofpoint.com/v2/url?u=https-3A__slack-2Dredir.net_link-3Furl-3Dhttp-253A-252F-252Fretention.ms=DwIFaQ=RoP1YumCXCgaWHvlZYR8PZh8Bv7qIrMUB65eapI_JnE=vmJiAMDGSxNeZnFztNs5ITB_i_Z3h3VtLPGma9y7cKI=bcvrB2WHGYyMkDxuOiAX_kDm8dM_Gnhn9co-80eQpeM=d9QdJSgIZN7kFfuuRJDX3gi3J8Y7uTnC7UrLSFBdasI=
> >=360,flush.messages=1,unclean.leader.election.enable=false
> >Topic: testtopic3Partition: 0Leader: 1Replicas: 1,4,2
> Isr: 1,4,2
> >Topic: testtopic3Partition: 1Leader: 4Replicas: 4,2,3
> Isr: 4,2,3
> >Topic: testtopic3Partition: 2Leader: 2Replicas: 2,3,5
> Isr: 2,3,5
> >Topic: testtopic3Partition: 3Leader: 3Replicas: 3,5,1
> Isr: 3,5,1
> >Topic: testtopic3Partition: 4Leader: 5Replicas: 5,1,4
> Isr: 5,1,4
> >
> >
> >
> > And here is the producer test run command line and the result:
> >
> > $ bin/kafka-producer-perf-test.sh --topic testtopic3 --num-records
> 100 --throughput -1 --record-size 256 --producer-props
> bootstrap.servers=kafka-a-0.ri:9092,kafka-b-0.ri:9092,kafka-c-0.ri:9092,kafka-d-0.ri:9092,kafka-e-0.ri:9092
> acks=all batch.size=1 max.block.ms <
> https://urldefense.proofpoint.com/v2/url?u=https-3A__slack-2Dredir.net_link-3Furl-3Dhttp-253A-252F-252Fmax.block.ms=DwIFaQ=RoP1YumCXCgaWHvlZYR8PZh8Bv7qIrMUB65eapI_JnE=vmJiAMDGSxNeZnFztNs5ITB_i_Z3h3VtLPGma9y7cKI=bcvrB2WHGYyMkDxuOiAX_kDm8dM_Gnhn9co-80eQpeM=XP3cGgt3rD1lr8r0diTLpP4FP3BwZ8spcHNDNXc3i6I=
> >=360 enable.idempotence=true max.in.flight.requests.per.connection=1
> retries=3  --transaction-duration-ms 360
> > 4100 records sent, 819.7 records/sec (0.20 MB/sec), 2572.0 ms avg
> latency, 4892.0 ms max latency.
> > 4261 records sent, 852.0 records/sec (0.21 MB/sec), 7397.2 ms avg
> latency, 9873.0 ms max latency.
> > 4216 records sent, 843.0 records/sec (0.21 MB/sec), 12383.7 ms avg
> latency, 14849.0 ms max latency.
> > 4400 records sent, 879.8 records/sec (0.21 MB/sec), 17332.0 ms avg
> latency, 19784.0 ms max latency.
> > 4354 records sent, 870.8 records/sec (0.21 MB/sec), 22349.4 ms avg
> latency, 24763.0 ms max latency.
> > 4477 records sent, 895.4 records/sec (0.22 MB/sec), 27241.1 ms avg
> latency, 29728.0 ms max latency.
> > 4366 records sent, 873.2 records/sec (0.21 MB/sec), 32218.3 ms avg
> latency, 34703.0 ms max latency.
> > 4408 records sent, 881.6 records/sec (0.22 MB/sec), 37190.6 ms avg
> latency, 39672.0 ms max latency.
> > 4159 records sent, 831.5 records/sec (0.20 MB/sec), 42135.0 ms avg
> latency, 44640.0 ms max latency.
> > 4260 records sent, 852.0 records/sec (0.21 MB/sec), 47098.0 ms avg
> latency, 49624.0 ms max latency.
> > 4360 records sent, 872.0 records/sec (0.21 MB/sec), 52137.1 ms avg
> latency, 54574.0 ms max latency.
> > 4514 records sent, 902.8 records/sec (0.22 MB/sec), 57038.1 ms avg
> latency, 59554.0 ms max latency.
> > 4273 records sent, 854.3 records/sec (0.21 MB/sec), 62001.8 ms avg
> latency, 64524.0 ms max latency.
> > 4348 records sent, 869.6 records/sec (0.21 MB/sec), 67037.8 ms avg
> latency, 69494.0 ms max latency.
> > 4039 records sent, 807.5 records/sec (0.20 MB/sec), 72009.8 ms avg
> latency, 74481.0 ms max latency.
> > 4327 records sent, 865.2 records/sec (0.21 MB/sec), 76993.8 ms avg
> latency, 79457.0 ms max latency.
> > 4307 records sent, 861.4 records/sec (0.21 MB/sec), 82011.9 ms avg
> latency, 84449.0 ms max latency.
> > 4506 records sent, 901.0 records/sec (0.22 MB/sec), 86922.6 ms avg
> latency, 89434.0 ms max 

Re: [EXTERNAL] SSL setup failing

2019-10-28 Thread M. Manna
Hi,

I'm not sure what "tries to communicate with itself" means. Are you talking
about local network loopback?

Also, have you tried debugging the SSL handshake using openssl? What did you
observe?

The exception is a handshake exception. This is quite common when your cert
validation fails. How have you set up your signed certificates? Does your
CN/SAN match your advertised.listeners setup? Have you set up hostname
verification correctly?
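
For the openssl side, something like the below against the inter-broker
listener usually shows whether the presented chain, CN/SAN and verification
result are what you expect (host, port and CA file are examples from your
setup):

openssl s_client -connect kafka-1:9093 -servername kafka-1 -CAfile ca-chain.cert.pem </dev/null
# inspect the certificate chain, the subject CN/SAN and the "Verify return code" line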

Thanks,

On Mon, 28 Oct 2019 at 11:11, Péter Nagykátai  wrote:

> @Jose
>
> >It looks like communication problem between brokers.
> As I mentioned, "I can't get the first broker started". The message above
> is from when the broker tries to communicate with "itself": [Controller
> id=1001, targetBrokerId=1001]).
>
> Nevertheless, I went through the checklist and everything is in order. For
> the first couple of tries, I got different SSL errors but I could work
> those out (that time I messed up the certificates), but now the problem is:
> >> Caused by: javax.net.ssl.SSLProtocolException: *Unexpected handshake
> **message:
> server_hello*
>
> Peter
>
> On Mon, Oct 28, 2019 at 8:09 AM Jose Manuel Vega Monroy <
> jose.mon...@williamhill.com> wrote:
>
> > @Peter
> >
> > It looks like communication problem between brokers. But ensure:
> >
> > 1) Certificates are valid and properly signed by the root CA or an
> > intermediate one in the chain
> > 2) Clients and brokers have the private key and certificate in their
> > keystore and are properly configured to point to its path
> > 3) Clients and brokers have the CA certificates in the truststore and are
> > properly configured to point to its path
> > 4) Clients and brokers have the root CA certificate in their keystore and
> > are properly configured to point to its path
> > 5) Permissions are the right ones for the truststore and keystore
> >
> > Thanks
> >
> > Get Outlook for Android 
> >
> > --
> > *From:* Péter Nagykátai 
> > *Sent:* Monday, 28 October 2019, 00:13
> > *To:* users@kafka.apache.org
> > *Subject:* [EXTERNAL] SSL setup failing
> >
> > Hi!
> >
> > I'm experimenting with setting up a log ingesting cluster and Kafka would
> > be part of it. Unfortunately, I can't get the first broker started. I
> > need to secure the communication between a dozen nodes and Kafka would
> > only be one part of it.
> > every server in the cluster (with an intermediate CA). AFAIK, I need to
> use
> > '.jks' files for Kafka, so I've generated a '.p12' file from the openssl
> > certificate and key then used `keytool` to generate a keystore:
> > `keytool -importkeystore -srckeystore kafka-1.p12 -srcstoretype PKCS12
> > -alias kafka-1 -destkeystore kafka-1.jks`
> > I generated a truststore for the root and intermediate chain as well:
> > `keytool -importcert -alias ca-root -keystore truststore.jks -file
> > ca-chain.cert.pem
> >
> > Relevant part of the 'server.properties' configuration:
> > 
> > listeners=EXTERNAL://kafka-1:9092,INTERNAL://kafka-1:9093
> > advertised.listeners=EXTERNAL://kafka-1:9092,INTERNAL://kafka-1:9093
> > inter.broker.listener.name=INTERNAL
> > listener.security.protocol.map=EXTERNAL:SSL,INTERNAL:SSL
> > security.protocol=SSL
> > ssl.client.auth=required
> > ssl.truststore.location=/***/truststore.jks
> > ssl.truststore.password=*
> > ssl.keystore.location=/***/kafka-1.jks
> > ssl.keystore.password=*
> > 
> >
> > After starting Kafka (as a service) I get the the following in the
> > 'server.log':
> > >>...
> > >> INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)
> > >> INFO [SocketServer brokerId=1001] Failed authentication with
> > /XXX.XXX.XXX.XXX (SSL handshake failed)
> > (org.apache.kafka.common.network.Selector)
> > >> INFO [Controller id=1001, targetBrokerId=1001] Failed authentication
> > with kafka-1/XXX.XXX.XXX.XXX (SSL handshake failed)
> > (org.apache.kafka.common.network.Selector)
> > >> ERROR [Controller id=1001, targetBrokerId=1001] Connection to node
> 1001
> > (kafka-1/XXX.XXX.XXX.XXX:9093) failed authentication due to: SSL
> handshake
> > failed (org.apache.kafka.clients.NetworkClient)
> > >>...
> > >> WARN SSL handshake failed (kafka.utils.CoreUtils$)
> > >> org.apache.kafka.common.errors.SslAuthenticationException: SSL
> handshake
> > failed
> > >> Caused by: javax.net.ssl.SSLProtocolException: Unexpected handshake
> > message: server_hello
> > >>...
> >
> > I couldn't find any lead with that error message and got stuck. Any ideas
> > what that error message means and how to solve it?
> >
> > Specs:
> > - Ubuntu 18.04.3 LTS
> > - OpenJDK Runtime Environment (build
> 11.0.4+11-post-Ubuntu-1ubuntu218.04.3)
> > - Kafka 2.2.1 (from kafka_2.12-2.2.1.tgz)
> > - OpenSSL 1.1.1
> >
> > Thank you!
> > Peter
> >

Re: On the number of partitions.

2019-10-26 Thread M. Manna
You should also check out Becket Qin's presentation on producer performance
tuning on YouTube. Both these items should give you all the positives and
negatives of having many/few partitions.

Thanks,

On Sat, 26 Oct 2019 at 09:19, Manasvi Gupta  wrote:

>
> https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster
>
> This might be useful for you.
>
> On Sat 26 Oct, 2019, 12:40 PM Jungyeol Lee, 
> wrote:
>
> > I'm running a Kafka cluster with only one broker on a GCP n1-standard-2
> > instance.
> > I configured the number of partitions to 1000, but the broker struggles
> > on startup. It seems that it needs much more time for topic creation. So
> > what I did is just set a smaller number of partitions, which is now 32.
> > Everything works okay for now. Is this normal behavior or performance?
> > Or are there any guidelines on setting the number of partitions?
> >
> > Best,
> > --
> >
>


Re: Kafka 2.2.1 with OpenJDK 11

2019-10-25 Thread M. Manna
It's on GitHub. Look for the releases tab; it's RC2 for 2.3.1. You can
download and run it using OpenJDK 11 and see how it goes.

I don't see any Jenkins build for 2.3.1 with JDK 11, but there's definitely
a trunk build passing for JDK 11 yesterday morning.



On Fri, 25 Oct 2019 at 04:16, Debraj Manna  wrote:

> Can you point me to the link where I have to check?
>
> On Thu 24 Oct, 2019, 7:54 PM M. Manna,  wrote:
>
> > Have you checked the Kafka build 2.3.1 RC2 which everyone is currently
> > voting for ? It’s worth checking for your question...
> >
> > Regards.
> > On Thu, 24 Oct 2019 at 13:31, Debraj Manna 
> > wrote:
> >
> > > Hi
> > >
> > > Does Kafka work with OpenJDK 11? I have seen the below issue which is
> > > resolved in 2.1.
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-7264
> > >
> > > But it does not mention about OpenJDK. Can someone confirm if Kafka
> 2.2.1
> > > is supported with OpenJDK 11 also?
> > >
> >
>


Re: Kafka 2.2.1 with OpenJDK 11

2019-10-24 Thread M. Manna
Have you checked the Kafka build 2.3.1 RC2 which everyone is currently
voting for? It's worth checking for your question...

Regards.
On Thu, 24 Oct 2019 at 13:31, Debraj Manna  wrote:

> Hi
>
> Does Kafka work with OpenJDK 11? I have seen the below issue which is
> resolved in 2.1.
>
> https://issues.apache.org/jira/browse/KAFKA-7264
>
> But it does not mention about OpenJDK. Can someone confirm if Kafka 2.2.1
> is supported with OpenJDK 11 also?
>


Re: Kafka Broker not Starting

2019-10-23 Thread M. Manna
As long as you have more than one broker (2, 3, whatever) and min.insync.replicas
is set to 1 (in server.properties), you should be able to stop the affected
broker(s), delete all of their data files, and restart them.
They will recreate all the files from your live leader.
Before you do that, please ensure that your healthy broker is in-sync for
all topic partitions.

However, if you have only 1 broker and ZooKeeper (and you can handle loss
of data), you should just stop the broker, delete (*only*) the Kafka data
files, and restart the broker. It will take all the latest data from ZK and
recreate the files. This works __only__ if you are using ZK to store
offsets and letting Kafka coordinate consumer groups.

I have just verified the 2nd approach (with 1 broker and ZK only, as I had
done it a long time ago), and yes, it recreates the data files for Kafka.
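For reference, the first approach is roughly the sketch below (the data path is
only an example - use whatever log.dirs points to in your server.properties):

bin/kafka-server-stop.sh                                     # stop the affected broker
rm -rf /data/kafka-logs/*                                    # delete only the Kafka data files
bin/kafka-server-start.sh -daemon config/server.properties   # restart; replicas re-sync from the live leader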

I hope this helps,
Thanks,

On Wed, 23 Oct 2019 at 13:35, Gur Aizenberg  wrote:

> Hey,
>
> Thank you for the reply.
>
> Unfortunately an upgrade is not possible for us on this environment.
>
> What would happen if I delete the broker files when RF < 3?
> Data loss is something we can handle in order to bring this back up.
>
> Currently we have a large number of under replicated partitions as well as
> occurrences of broker failures.
>
> Thank you for the help!
>
> On Sun, Oct 20, 2019 at 5:20 PM M. Manna  wrote:
>
> > It looks like the issue is fixed in later releases. And you’re running a
> > very old Kafka version TBF.
> >
> > Would an upgrade help ? If not, if you’ve got replication enabled (RF
> >=3)
> > you could try deleting broker files and recreating them by restarting the
> > affected broker.
> >
> > Thanks,
> >
> > Thanks,
> >
> > On Sun, 20 Oct 2019 at 13:30, Gur Aizenberg 
> wrote:
> >
> > > Hey guys,
> > >
> > > I am having a problem starting my Kafka Broker - I am using Kafka 0.11.
> > >
> > > When I try to start the broker I get the error:
> > > Fatal error during KafkaServer startup. Prepare to shutdown
> > > (kafka.server.KafkaServer)
> > > java.lang.IllegalArgumentException: inconsistent range
> > >
> > > I saw that there is a bug regarding this issue:
> > > https://issues.apache.org/jira/browse/KAFKA-7401
> > >
> > > Did anyone encounter this issue? Or knows how to fix this?
> > > Any help would be appreciated!
> > >
> > >
> > >
> > >
> > > Thank you,
> > > Gur
> > >
> >
>


Re: Impact on having large number of consumers on producers / brokers

2019-10-22 Thread M. Manna
Everything has an impact. You cannot keep churning loads of messages under the
same operating conditions and expect nothing to change.

You need to find out (via load testing) an optimum operating condition
(e.g. partition count, batch.size, etc.) for your producers/consumers to work
correctly. Remember that the more topics/partitions you have, the more
complexity there is. Based on how many topics/partitions/consumers/producers
you're creating, the tuning of the brokers may need to change accordingly.
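For the load-testing part, the perf tool that ships with Kafka is a reasonable
starting point. A sketch, with a placeholder topic/bootstrap address and example
tuning values:

bin/kafka-producer-perf-test.sh --topic load-test --num-records 1000000 \
  --record-size 512 --throughput -1 \
  --producer-props bootstrap.servers=localhost:9092 acks=1 batch.size=65536 linger.ms=5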

Thanks,

On Tue, 22 Oct 2019 at 10:09, Hrishikesh Mishra  wrote:

> I wanted to understand whether the broker will become unstable with a large
> number of consumers, or whether consumers will face issues like increasing lag?
>
>
> On Mon, Oct 21, 2019 at 6:55 PM Shyam P  wrote:
>
> > What are you trying to do here ? whats your objective ?
> >
> > On Sat, Oct 19, 2019 at 8:45 PM Hrishikesh Mishra 
> > wrote:
> >
> > > Can anyone please help me with this?
> > >
> > > On Fri, 18 Oct 2019 at 2:58 PM, Hrishikesh Mishra  >
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I wanted to understand the impact of having a large number of consumers
> > > > on producer latency and on the brokers. I have around 7K independent
> > > > consumers. Each consumer is consuming all partitions of a topic. I have
> > > > manually assigned the partitions of a topic to each consumer, not using
> > > > consumer groups. Each consumer is consuming messages from only one topic.
> > > > Message size is small, less than a KB. Produce throughput is also low.
> > > > But when a produce spike comes, produce latency goes up (here ack=1).
> > > > Broker resource usage is also very low. I want to understand the impact
> > > > of having a large number of consumers on Kafka.
> > > >
> > > >
> > > > *Cluster Details: *
> > > >
> > > > Broker#: 13 (1 Broker : 14 cores & 36GB memory  )
> > > >
> > > > Kafka cluster version: 2.0.0
> > > > Kafka Java client version: 2.0.0
> > > > Number topics: ~15.
> > > > Number of consumers: 7K (all independent and manually assigned all
> > > > partitions of a topic to a consumers. One consumer is consuming all
> > > > partitions from a topic only)
> > > >
> > > >
> > > >
> > > > Regards,
> > > > Hrishikesh
> > > >
> > > >
> > >
> >
>


Re: Help Needed - Message Loss for Consumer groups in 2.3.0 (client 2.2.0)

2019-10-21 Thread M. Manna
The issue was due to the fact that some messages were corrupted and didn't
get processed at the subscriber end, so it had nothing to do with Kafka.
Since the error logs were not stored anywhere, we had to troubleshoot the
live system in debug mode to capture message logs.



On Mon, 21 Oct 2019 at 01:13, M. Manna  wrote:

> Hello,
>
> I have recently had some message loss for a consumer group under kafka
> 2.3.0.
>
> The client I am using is still in 2.2.0. Here is how the problem can be
> reproduced,
>
> 1) The messages were sent to 4 consumer groups, 3 of them were live and 1
> was down
> 2) When the consumer group came back online, the messages were not
> received.
>
> I am using manual offset commit (in a typical while-loop-based consumer
> thread - same as the Kafka official docs), and I only commit offsets once I
> have fully processed the messages. My poll time is *4*s. My affected topic
> has 6 partitions and 3 replicas. The timeouts and connection durations are
> all sufficiently set.
>
> The issue only occurs when the consumer group (all consumers) goes offline
> for a while (e.g. server down) and comes back online.
>
> Could someone please help me understand the issue?
>
> Regards,
>


Kafka-View - Free Browsing of Cluster

2019-10-21 Thread M. Manna
Hi All,

https://github.com/SourceLabOrg/kafka-webview

Not sure if anyone has come across this. It's a very nice tool indeed and has
a Spring Boot baseline.
Another one is Kafka Magic viewer: http://www.kafkamagic.com/

Are they worth covering in cwiki?

Thanks,


Help Needed - Message Loss for Consumer groups in 2.3.0 (client 2.2.0)

2019-10-20 Thread M. Manna
Hello,

I have recently had some message loss for a consumer group under kafka
2.3.0.

The client I am using is still in 2.2.0. Here is how the problem can be
reproduced,

1) The messages were sent to 4 consumer groups, 3 of them were live and 1
was down
2) When the consumer group came back online, the messages were not received.

I am using manual offset commit (in a typical while-loop-based consumer
thread - same as the Kafka official docs), and I only commit offsets once I
have fully processed the messages. My poll time is *4*s. My affected topic has 6
partitions and 3 replicas. The timeouts and connection durations are all
sufficiently set.

The issue only occurs when the consumer group (all consumers) goes offline
for a while (e.g. server down) and comes back online.
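For reference, one way to check whether the group resumed from its committed
offsets after coming back is to describe it and compare current offset,
log-end offset and lag per partition (group name and bootstrap address are
placeholders):

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-consumer-group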

Could someone please help me understand the issue?

Regards,


Re: Kafka Broker not Starting

2019-10-20 Thread M. Manna
It looks like the issue is fixed in later releases, and you’re running a
very old Kafka version, to be fair.

Would an upgrade help? If not, and if you’ve got replication enabled (RF >= 3),
you could try deleting the broker's data files and recreating them by
restarting the affected broker.

Thanks,

On Sun, 20 Oct 2019 at 13:30, Gur Aizenberg  wrote:

> Hey guys,
>
> I am having a problem starting my Kafka Broker - I am using Kafka 0.11.
>
> When I try to start the broker I get the error:
> Fatal error during KafkaServer startup. Prepare to shutdown
> (kafka.server.KafkaServer)
> java.lang.IllegalArgumentException: inconsistent range
>
> I saw that there is a bug regarding this issue:
> https://issues.apache.org/jira/browse/KAFKA-7401
>
> Did anyone encounter this issue? Or knows how to fix this?
> Any help would be appreciated!
>
>
>
>
> Thank you,
> Gur
>


Re: Broker that stays outside of the ISR, how to recover

2019-10-18 Thread M. Manna
In addition to what Peter said, I would recommend that you stop the broker and
delete all of its data logs (if your replication factor is set correctly).
Upon restart, they’ll be recreated. This is of course a last-resort measure if
you cannot determine the root cause.

The measure works well for me with my k8s deployment, where a pod (broker)
is killed and recreated upon liveness probe failure.

Thanks,


On Fri, 18 Oct 2019 at 10:06, Peter Bukowinski  wrote:

> Hi Bart,
>
> Before changing anything, I would verify whether or not the affected
> broker is trying to catch up. Have you looked at the broker’s log? Do you
> see any errors? Check your metrics or the partition directories themselves
> to see if data is flowing into the broker.
>
> If you do want to reset the broker to have it start a fresh resync, stop
> the kafka broker service/process, 'rm -rf /path/to/kafka-logs' — check the
> value of your log.dir or log.dirs property in your server.properties file
> for the path — and then start the service again. It should check in with
> zookeeper and then start following the topic partition leaders for all the
> topic partition replicas assigned to it.
>
> -- Peter
>
> >> On Oct 18, 2019, at 12:16 AM, Bart van Deenen <
> bartvandee...@fastmail.fm> wrote:
> > Hi all
> >
> > We had a Kafka broker failure (too many open files, stupid), and now the
> partitions on that broker will no longer become part of the ISR set. It's
> been a few days (organizational issues), and we have significant amounts of
> data on the ISR partitions.
> >
> > In order to make the partitions on the broker become part of the ISR set
> again, should I:
> >
> > * increase `replica.lag.time.max.ms` on the broker to the number of ms
> that the partitions are behind. I can guesstimate the value to about 7
> days, or should I measure it somehow?
> > * stop the broker and wipe files (which ones?) and then restart it.
> Should I also do stuff on zookeeper ?
> >
> > Is there any _official_ information on how to deal with this situation?
> >
> > Thanks for helping!
>


Re: kafka connection from docker

2019-10-17 Thread M. Manna
Please check your advertised.listeners and listeners config.
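One common cause with host networking is that the broker advertises a hostname
(the 'aictjt' in your stack trace) which the container cannot resolve. A minimal
sketch of a possible fix - the exact values depend on your setup:

# in server.properties on the host:
#   listeners=PLAINTEXT://0.0.0.0:9092
#   advertised.listeners=PLAINTEXT://localhost:9092
# then restart the broker and, from inside the container:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning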

Thanks,

On Thu, 17 Oct 2019 at 22:13, Wang, Shuo  wrote:

> Hi,
>
> I have a question regarding connecting to kafka broker from docker.
>
> I have zookeeper and kafka broker running on my local machine.
> I have a docker container running on the same local machine with
> --network=host
> I want to send message from inside the docker container to my local kafka
> broker.
>
> From inside the docker container, I can connect to zookeeper and see the
> existing topics by running:
> `./bin/kafka-topics.sh --zookeeper localhost:2181 --list`
>
> But I cannot connect to the kafka broker with either the
> `kafka-console-consumer.sh` or `kafka-console-producer.sh`,
>
> by running : `bin/kafka-console-consumer.sh --bootstrap-server
> localhost:9092 --topic test`
> I get:
>
> ```
> [2019-10-17 19:12:04,097] WARN [Consumer clientId=consumer-1,
> groupId=console-consumer-99825] Error connecting to node aictjt:9092 (id: 0
> rack: null) (org.apache.kafka.clients.NetworkClient)
> java.net.UnknownHostException: aictjt
> at java.net.InetAddress.getAllByName0(InetAddress.java:1281)
> at java.net.InetAddress.getAllByName(InetAddress.java:1193)
> at java.net.InetAddress.getAllByName(InetAddress.java:1127)
> at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104)
> at
>
> org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
> at
>
> org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
> at
>
> org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
> at
>
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:943)
> at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:68)
> at
>
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1114)
> at
>
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1005)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:537)
> at
>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> at
>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at
>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at
>
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
> at
>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:259)
> at
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:326)
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
> at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
> at
>
> kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:439)
> at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:105)
> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
>
> ```
>
>
> any help?
>
> Thanks.
>
>
>
> -Shuo
>


Re: Brokers occasionally dropping out of cluster

2019-10-09 Thread M. Manna
Hello Peter,

Have you tried setting a higher value for the connection timeout?

I am running 2.3.0 with 30s for ZK sessions and 90s for the ZK connection.

I haven’t checked 2.3.1 yet; it looks like you may have found something worth
checking before upgrading.
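If it helps, both values can also be set at start-up without editing
server.properties - a sketch using the numbers mentioned above:

bin/kafka-server-start.sh -daemon config/server.properties \
  --override zookeeper.session.timeout.ms=30000 \
  --override zookeeper.connection.timeout.ms=90000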

Regards,

On Tue, 8 Oct 2019 at 21:41, Peter Bukowinski  wrote:

> Greetings,
>
> I’m experiencing a concerning issue with brokers dropping out of kafka
> clusters, and I suspect it may be due to zookeeper timeouts. I have many
> clusters running kafka 2.3.1 and have seen this issue on more than a few,
> though this issue predates this version.
>
> The clusters use  zookeeper.session.timeout.ms=3, and
> zookeeper.connection.timeout.ms is unset.
>
> This is what I see in the log of broker 14 before and after the broker has
> been kicked out of its cluster:
>
> [2019-10-07 11:02:27,630] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52,
> dir=/data/3/kl] Found deletable segments with base offsets [1975332] due to
> retention time 360ms breach (kafka.log.Log)
> [2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52,
> dir=/data/3/kl] Scheduling log segment [baseOffset 1975332, size 92076008]
> for deletion. (kafka.log.Log)
> [2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52,
> dir=/data/3/kl] Incrementing log start offset to 2000317 (kafka.log.Log)
> [2019-10-07 11:03:56,936] INFO [Log partition=internal_test-52,
> dir=/data/3/kl] Deleting segment 1975332 (kafka.log.Log)
> [2019-10-07 11:03:56,957] INFO Deleted log
> /data/3/kl/internal_test-52/01975332.log.deleted.
> (kafka.log.LogSegment)
> [2019-10-07 11:03:56,957] INFO Deleted offset index
> /data/3/kl/internal_test-52/01975332.index.deleted.
> (kafka.log.LogSegment)
> [2019-10-07 11:03:56,958] INFO Deleted time index
> /data/3/kl/internal_test-52/01975332.timeindex.deleted.
> (kafka.log.LogSegment)
> [2019-10-07 11:12:27,630] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 11:22:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 11:32:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 11:42:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 11:52:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 12:02:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 12:12:27,630] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 12:22:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 12:32:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 12:42:27,630] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 1 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 12:52:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 13:02:27,630] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 13:12:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 13:22:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 13:32:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 13:42:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 13:52:27,630] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 1 milliseconds.
> (kafka.coordinator.group.GroupMetadataManager)
> [2019-10-07 14:02:27,629] INFO [GroupMetadataManager brokerId=14] Removed
> 0 expired offsets in 0 milliseconds.
> 

Re: How auto.offset.reset = latest works

2019-10-03 Thread M. Manna
If I get your question right, your concern isn't about auto.offset.reset -
it's the partition assignment.

A consumer group represents parallelism. It's well documented in the Kafka
official docs. Each consumer (in a consumer group) gets a fair share of the
work (i.e. # of partitions for a topic subscription). Due to a rebalance or an
ungraceful departure of consumers from a group, the partitions get
reassigned to the live consumers. In that case you will end up having some
difference in workload per consumer in a group. Once again, this is
well explained in the official docs.
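To answer the last part of your question: on reasonably recent Kafka versions
you can see exactly which partitions each member holds by describing the group
(group name and bootstrap address are placeholders):

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-group --members --verbose

The plain --describe view also lists every partition together with the
consumer id it is currently assigned to.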

Thanks,

On Thu, 3 Oct 2019 at 20:30, Hrishikesh Mishra  wrote:

> Hi,
>
> I want to understand how *auto.offset.reset = latest* works. When the
> consumer first calls the poll() method, will it assign the current offsets to
> the consumer for all partitions (when a single consumer is up in a consumer
> group)? How do I know all partitions are assigned to a consumer?
>
>
> Regards
> Hrishikesh
>


Re: One Partition missing a node in ISR

2019-10-01 Thread M. Manna
I don’t think you can control replication using clients (pub/sub). And
neither should you be allowed to.

I guess we need more such examples to understand correct use of the admin
API.

Thanks,

On Tue, 1 Oct 2019 at 19:45, Sebastian Schmitz <
sebastian.schm...@propellerhead.co.nz> wrote:

> I didn't change anything with the topic... But as it only affects one
> topic my idea would be that it's related to the publisher as it's
> different for this specific topic.
>
> Can a publisher tell Kafka what to do about replication? Could it have
> enough influence to cause this kind of symptom?
>
> Also the JMX-Metrics of Kafka didn't report any under-replicated
> partitions... But when running the kafka-topics.sh with
> --under-replicated-partitions it showed the ones from this topic.
>
>
> On 01-Oct-19 10:58 PM, M. Manna wrote:
> > I was going to ask you to do that :) As long as >1 replicas are in-sync
> > Kafka handles this nicely for recreating everything in the restarted
> broker.
> >
> > I am curious, do you remember manipulating something before all these
> > started? e.g. Using some topic admin command (or something along the
> line) ?
> >
> >
> > On Tue, 1 Oct 2019 at 02:00, Sebastian Schmitz <
> > sebastian.schm...@propellerhead.co.nz> wrote:
> >
> >> I deleted the topic now and with topic-auto-create enabled it was
> >> immediately recreated and all is in sync again.
> >>
> >> Will keep and eye on this to see if it happens again
> >>
> >>
> >> On 30-Sep-19 3:12 PM, Sebastian Schmitz wrote:
> >>> Hello again,
> >>>
> >>> after like 15 minutes I have now this result:
> >>>
> >>> root@kafka_node_1:/opt/kafka_2.12-2.3.0/bin#
> >>> ./kafka-reassign-partitions.sh --bootstrap-server localhost:9092
> >>> --zookeeper node1:2181 --reassignment-json-file move2.json --verify
> >>> Status of partition reassignment:
> >>> Reassignment of partition my_topic-7 completed successfully
> >>> Reassignment of partition my_topic-14 completed successfully
> >>> Reassignment of partition my_topic-8 completed successfully
> >>> Reassignment of partition my_topic-4 completed successfully
> >>> Reassignment of partition my_topic-3 completed successfully
> >>> Reassignment of partition my_topic-13 completed successfully
> >>> Reassignment of partition my_topic-1 completed successfully
> >>> Reassignment of partition my_topic-15 completed successfully
> >>> Reassignment of partition my_topic-6 completed successfully
> >>> Reassignment of partition my_topic-11 completed successfully
> >>> Reassignment of partition my_topic-0 completed successfully
> >>> Reassignment of partition my_topic-12 completed successfully
> >>> Reassignment of partition my_topic-10 completed successfully
> >>> Reassignment of partition my_topic-2 completed successfully
> >>> Reassignment of partition my_topic-9 completed successfully
> >>> Reassignment of partition my_topic-5 completed successfully
> >>>
> >>> root@kafka_node_1:/opt/kafka_2.12-2.3.0/bin# ./kafka-topics.sh
> >>> --bootstrap-server localhost:9092 --topic my_topic --describe
> >>> Topic:my_topicPartitionCount:16 ReplicationFactor:3
> >>>
> >>
> Configs:segment.bytes=1073741824,message.format.version=2.3-IV1,retention.bytes=1073741824
> >>>  Topic: my_topic   Partition: 0Leader: 1 Replicas:
> >>> 2,3,1 Isr: 1
> >>>  Topic: my_topic   Partition: 1Leader: 1 Replicas:
> >>> 3,1,2 Isr: 1
> >>>  Topic: my_topic   Partition: 2Leader: 1 Replicas:
> >>> 1,2,3 Isr: 1
> >>>  Topic: my_topic   Partition: 3Leader: 1 Replicas:
> >>> 2,1,3 Isr: 1
> >>>  Topic: my_topic   Partition: 4Leader: 1 Replicas:
> >>> 3,2,1 Isr: 1
> >>>  Topic: my_topic   Partition: 5Leader: 1 Replicas:
> >>> 1,3,2 Isr: 1
> >>>  Topic: my_topic   Partition: 6Leader: 1 Replicas:
> >>> 2,3,1 Isr: 1
> >>>  Topic: my_topic   Partition: 7Leader: 1 Replicas:
> >>> 3,1,2 Isr: 1
> >>>  Topic: my_topic   Partition: 8Leader: 1 Replicas:
> >>> 1,2,3 Isr: 1
> >>>  Topic: my_topic   Partition: 9Leader: 1 Replicas:
> >>> 2,1,3 Isr: 1
> >>>  Topic: my_topic   Partition: 10   Leader: 1 Rep

Re: One Partition missing a node in ISR

2019-10-01 Thread M. Manna
uot;topic":"my_topic","partition":2,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"my_topic","partition":9,"replicas":[1,3,2],"log_dirs":["any","any","any"]},{"topic":"my_topic","partition":5,"replicas":[3,2,1],"log_dirs":["any","any","any"]}]}
>
> >>
> >>
> >> Save this to use as the --reassignment-json-file option during rollback
> >> Successfully started reassignment of partitions.
> >>
> >> root@kafka_node_1:/opt/kafka_2.12-2.3.0/bin#
> >> ./kafka-reassign-partitions.sh --bootstrap-server localhost:9092
> >> --zookeeper atazkafkp01.aucklandtransport.govt.nz:2181
> >> --reassignment-json-file move2.json --verify
> >> Status of partition reassignment:
> >> Reassignment of partition my_topic-7 is still in progress
> >> Reassignment of partition my_topic-14 completed successfully
> >> Reassignment of partition my_topic-8 is still in progress
> >> Reassignment of partition my_topic-4 is still in progress
> >> Reassignment of partition my_topic-3 is still in progress
> >> Reassignment of partition my_topic-13 completed successfully
> >> Reassignment of partition my_topic-1 is still in progress
> >> Reassignment of partition my_topic-15 completed successfully
> >> Reassignment of partition my_topic-6 is still in progress
> >> Reassignment of partition my_topic-11 is still in progress
> >> Reassignment of partition my_topic-0 is still in progress
> >> Reassignment of partition my_topic-12 completed successfully
> >> Reassignment of partition my_topic-10 is still in progress
> >> Reassignment of partition my_topic-2 is still in progress
> >> Reassignment of partition my_topic-9 is still in progress
> >> Reassignment of partition my_topic-5 is still in progress
> >>
> >> root@kafka_node_1:/opt/kafka_2.12-2.3.0/bin# ./kafka-topics.sh
> >> --bootstrap-server localhost:9092 --topic my_topic --describe
> >> Topic:my_topicPartitionCount:16 ReplicationFactor:3
> >>
> Configs:segment.bytes=1073741824,message.format.version=2.3-IV1,retention.bytes=1073741824
> >> Topic: my_topic   Partition: 0Leader: 1 Replicas:
> >> 2,3,1 Isr: 1
> >> Topic: my_topic   Partition: 1Leader: 1 Replicas:
> >> 3,1,2 Isr: 1
> >> Topic: my_topic   Partition: 2Leader: 1 Replicas:
> >> 1,2,3 Isr: 1
> >> Topic: my_topic   Partition: 3Leader: 1 Replicas:
> >> 2,1,3 Isr: 1
> >> Topic: my_topic   Partition: 4Leader: 1 Replicas:
> >> 3,2,1 Isr: 1
> >> Topic: my_topic   Partition: 5Leader: 1 Replicas:
> >> 1,3,2 Isr: 1
> >> Topic: my_topic   Partition: 6Leader: 1 Replicas:
> >> 2,3,1 Isr: 1
> >> Topic: my_topic   Partition: 7Leader: 1 Replicas:
> >> 3,1,2 Isr: 1
> >> Topic: my_topic   Partition: 8Leader: 1 Replicas:
> >> 1,2,3 Isr: 1
> >> Topic: my_topic   Partition: 9Leader: 1 Replicas:
> >> 2,1,3 Isr: 1
> >> Topic: my_topic   Partition: 10   Leader: 1 Replicas:
> >> 3,2,1 Isr: 1
> >> Topic: my_topic   Partition: 11   Leader: 1 Replicas:
> >> 1,3,2 Isr: 1
> >> Topic: my_topic   Partition: 12   Leader: 1 Replicas:
> >> 2,3,1 Isr: 1,3,2
> >> Topic: my_topic   Partition: 13   Leader: 2 Replicas:
> >> 3,1,2 Isr: 1,3,2
> >> Topic: my_topic   Partition: 14   Leader: 3 Replicas:
> >> 1,2,3 Isr: 1,3,2
> >> Topic: my_topic   Partition: 15   Leader: 1 Replicas:
> >> 2,1,3 Isr: 1,3,2
> >>
> >> root@kafka_node_1:/opt/kafka_2.12-2.3.0/bin#
> >> ./kafka-reassign-partitions.sh --bootstrap-server localhost:9092
> >> --zookeeper atazkafkp01.aucklandtransport.govt.nz:2181
> >> --reassignment-json-file move2.json --verify
> >> Status of partition reassignment:
> >> Reassignment of partition my_topic-7 is still in progress
> >> Reassignment of partition my_topic-14 completed successfully
> >> Reassignment of partition my_topic-8 is still in progress
> >> Reassignment of partition my_topic-4 is still in progress
> >> Reassignment of partition my_topic-3 is still in progress
> >> Reassignment of partition my_topic-13 completed successfully
> >> Reassignment of partition my_topic-1 is still in progress
> >> Reassignment of partition my_topic-15 completed successfully
> >> Reassignment of partition my_topic-6 is still in progress
> >> Reassignment of partition my_topic-11 is still in progress
> >> Reassignment of partition my_topic-0 is still in progress
> >> Reassignment of partition my_topic-12 completed successfully
> >> Reassignment of partition my_topic-10 is still in progress
> >> Reassignment of partition my_topic-2 is still in progress
> >> Reassignment of partition my_topic-9 is still in progress
> >> Reassignment of partition my_topic-5 is still in progress
> >>
> >> I also checked Zookeeper for active brokers:
> >>
> >> root@kafka_node_1:/opt/kafka_2.12-2.3.0/bin# ./zookeeper-shell.sh
> >> node1:2181 ls /brokers/ids
> >> Connecting to node1:2181
> >>
> >> WATCHER::
> >>
> >> WatchedEvent state:SyncConnected type:None path:null
> >> [1, 2, 3]
> >>
> >> What's next?
> >>
> >> Thanks
> >>
> >> Sebastian
> >>
> >>
> >> On 26-Sep-19 10:04 PM, M. Manna wrote:
> >>> hello,
> >>>
> >>> Could you please try to run kafka-reassign-partitions with your topic
> >>> reassignment JSON? That doesn't require any restart, and should tell
> >>> you if there are any issues with the reassignment. The examples are
> >>> provided in the Confluence wiki.
> >>>
> >>> I would recommend that you do a "Describe" on your topic to ensure
> >>> that all
> >>> partitions and ISR metadata is up-to-date.
> >>>
> >>> Thanks,
> >>>
> >>>
> >>>
> >>> On Thu, 26 Sep 2019 at 03:28, Sebastian Schmitz <
> >>> sebastian.schm...@propellerhead.co.nz> wrote:
> >>>
> >>>> Hello,
> >>>>
> >>>> I have one topic with 12 partitions where the partition 0 is
> >>>> missing one
> >>>> node from ISR... Is there a way I get get it back to work again
> >>>> without
> >>>> having to do some weird stuff like restarting the cluster?
> >>>> Because this missing node in ISR is causing some problems for the
> >>>> consumers...
> >>>>
> >>>> Thx
> >>>>
> >>>> Sebastian
> >>>>
> >>>>


Re: Kafka Broker - High CPU

2019-09-29 Thread M. Manna
1.0.1 is a very old version of Kafka. Is there any chance you would consider
a rolling upgrade?

Regards,

On Sun, 29 Sep 2019 at 22:43, Jamie  wrote:

> Have you got any producers using an older version of Kafka? Does the
> broker with high CPU usage contain the leader of any topics which don't
> have leaders on other brokers? Is the server.properties file the same
> across all brokers (minus broker.id etc)?
>
>
> Many Thanks,
> Jamie
>
> -Original Message-
> From: Sabarish Sasidharan 
> To: users 
> Sent: Sat, 28 Sep 2019 1:26
> Subject: Re: Kafka Broker - High CPU
>
> You can do a top -H to see which thread pid(s) is/are contributing to this.
> And then map to hex of that in thread dump to identify the culprits. Might
> lead to some additional clues.
>
> Regards
> Sab
>
>
> On Sat, 28 Sep 2019, 1:38 am Antony A,  wrote:
>
> > Partition Leaders are pretty evenly matched between the brokers around
> 500
> >
> > It is the kafka broker (java) process running at 550% on a 6 core VM. The
> > other brokers are running at 250% on a 4 core VMs.
> >
> > On Fri, Sep 27, 2019 at 1:44 PM Harper Henn
>  > >
> > wrote:
> >
> > > Is partition leadership spread evenly among the nodes in your cluster?
> > > Since only the leaders of a partition will service reads and writes,
> one
> > > broker could be using more CPU than the others if it was the leader for
> > > more partitions.
> > >
> > > Have you tried using a utility like top or htop? what processes does it
> > > show with the highest CPU?
> > >
> > > On Fri, Sep 27, 2019 at 12:35 PM Antony A 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I am running Kafka 1.0.1 on a 7 broker cluster. On one of the brokers
> > the
> > > > CPU usage is always pegged around 98% utilization. If anyone had
> > similar
> > > > issues, please can you comment on it?
> > > >
> > > > Thanks,
> > > > AA
> > > >
> > >
> >
>


Re: Monitoring Broker/Prod/Cons on Kubernetes

2019-09-29 Thread M. Manna
Hey, thanks for your response.

I was thinking along the same lines. Cruise Control comes with a good set of
monitoring functions, but it's not ready for K8s. You would need to deploy it
as a container (no official image yet). Do you know if any K8s flavour has
been made available yet?

Prometheus/Grafana and DataDog are something we’re considering.

Thanks,

On Sun, 29 Sep 2019 at 11:26, Pere Urbón Bayes  wrote:

> Hi,
>   personally speaking my usual monitoring of choice for k8s is Prometheus +
> Grafana, but end of the day most of the monitoring solutions do a good job,
> so I would mostly ask what do you know the most. For this environments I
> would be more interested on proper container management for kafka and be
> sure that the pods are running in the best way possible.
>
> -- Pere
>
> Missatge de M. Manna  del dia dj., 26 de set. 2019 a
> les 16:01:
>
> > Hello,
> >
> > Has anyone got any experience in using monitoring tool (e.g. Prometheus,
> > DataDog, or custom) for Kafka on K8s?
> >
> > Cruise-control doesn't come with official docker image (Neither does
> Kafka
> > Manager). I was just curious to know if people usually monitor this as a
> > service through external providers, or using custom-made tools.
> >
> > Thanks,
> >
>
>
> --
> Pere Urbon-Bayes
> Software Architect
> http://www.purbon.com
> https://twitter.com/purbon
> https://www.linkedin.com/in/purbon/
>


Re: Only use SSL to encrypt the authentication messages

2019-09-29 Thread M. Manna
Hello,

Just curious, is there any article about Java 11 reducing the SSL perf impact?

Thanks,

On Sun, 29 Sep 2019 at 10:38, Pere Urbón Bayes  wrote:

> Hi,
>   if you're worried about the performance impact i would suggest :
>
> * first benchmark the impacts, by experience i would see around 30 to 40%
> performance degradation.
>
> * use java 11, and you will see a lot less performance impact when using
> ssl.
>
> -- Pere
>
> On Sun, 29 Sep 2019, 08:28 张祥 <18133622...@163.com> wrote:
>
> > Hi everyone !
> >
> >
> >  I am enabling SASL/PLAIN authentication for our Kafka and I am aware it
> > should be used with SSL encryption. But it may bring a performance
> impact.
> > So I am wondering whether it is possible that we only use SSL to encrypt
> > the authentication messages but leave the data unencrypted. Thanks.
> >
> >
> > 张祥
> > 18133622...@163.com
> >
> >
>

