Does kafka write key to broker?
Hi, We are using Kafka 0.8.1.1 in our production cluster. I recently started specifying the key as the message itself. I just realised that the key is also written to the broker, which means the data is duplicated within a keyed message. I am going to change the key; stupid mistake. However, just out of anxiety, I want to know whether we can turn off writing the key to the broker. Is there any configuration I can change to achieve this? -Thanks, Mohit Kathuria
Increase partitions and replication of __consumer_offsets
Hi all, I'm using Kafka 0.8.2.1 in production. My Kafka Config is pretty much vanilla, so (as far as I understand) offsets are being written to Zookeeper. As recommended, I want to start writing offsets to Kafka instead of Zookeeper. I was surprised to see that the __consumer_offsets topic already exists. But let's assume that's ok. The topic has 50 partitions (which is the default) and a replication factor of 1. Of course, I only had 1 broker to begin with so it makes sense that the replication factor is 1 and not 3 (which is the default). Now that I've added more brokers, I want to increase the number of partitions to 200 (as recommended) and the replication factor to 3 (also as recommended). I'd like to do this before starting the process of migrating offsets to Kafka. What is the best way of increasing the partitions and replication factor of __consumer_offsets? Thanks, *Daniel Coldham* N.B. Recommendations taken from: http://kafka.apache.org/documentation.html https://cwiki.apache.org/confluence/display/KAFKA/FAQ http://www.slideshare.net/jjkoshy/offset-management-in-kafka
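For an ordinary topic, the two steps would look roughly like the following sketch (0.8.2 CLI tools; zk1:2181 and reassign.json are illustrative placeholders, and whether it is safe to run this against the internal __consumer_offsets topic is exactly the open question here):

```shell
# Step 1: add partitions. Note: this changes the key->partition mapping,
# and offsets are keyed by consumer group, so for __consumer_offsets
# this may need extra care.
bin/kafka-topics.sh --zookeeper zk1:2181 --alter \
    --topic __consumer_offsets --partitions 200

# Step 2: raise the replication factor by executing a hand-written
# reassignment plan (reassign.json) that lists 3 replicas per partition.
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
    --reassignment-json-file reassign.json --execute
```

Note also that offsets.topic.num.partitions and offsets.topic.replication.factor only take effect when the topic is first created, which is why changing them after the fact requires the tools above.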
Re: Broker Fails to restart
Thanks, Jiangjie. Yes, we had reduced segment.index.bytes to 1K in order to maintain a more frequent offset index, which was required for the ability to fetch start and end offsets for a given span of time, say 15 mins. Ideally, changing only index.interval.bytes to 1K should have been a sufficient config for that, but we found that wouldn't give the expected results. We are using the simple consumer offset API to fetch the offsets for given timestamps before starting to consume messages for that period, and we get the same offset every time, even though data was produced during the time. -Zakee On Jun 20, 2015, at 4:23 PM, Jiangjie Qin j...@linkedin.com.INVALID wrote: It seems that your log.index.size.max.bytes was 1K and probably was too small. This will cause your index file to reach its upper limit before fully indexing the log segment. Jiangjie (Becket) Qin On 6/18/15, 4:52 PM, Zakee kzak...@netzero.net wrote: Any ideas on why one of the brokers, which was down for a day, fails to restart with the exception below? The 10-node cluster has been up and running fine for quite a few weeks. [2015-06-18 16:44:25,746] ERROR [app=broker] [main] There was an error in one of the threads during logs loading: java.lang.IllegalArgumentException: requirement failed: Attempt to append to a full index (size = 128). (kafka.log.LogManager) [2015-06-18 16:44:25,747] FATAL [app=broker] [main] [Kafka Server 13], Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) java.lang.IllegalArgumentException: requirement failed: Attempt to append to a full index (size = 128).
at scala.Predef$.require(Predef.scala:233)
at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:198)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
at kafka.log.LogSegment.recover(LogSegment.scala:187)
at kafka.log.Log.recoverLog(Log.scala:205)
at kafka.log.Log.loadSegments(Log.scala:177)
at kafka.log.Log.init(Log.scala:67)
at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:142)
at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
Thanks, Zakee
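One recovery approach that has been used for a broker that cannot load a segment because its index is full is to delete the offset index files while the broker is stopped; Kafka rebuilds missing .index files from the .log segments during log loading. A sketch on a throwaway directory (LOG_DIR and the segment filename are illustrative; point LOG_DIR at the real log.dirs, with the broker stopped, only after verifying this approach is appropriate for your version):

```shell
# Demonstrated on a temporary directory; substitute the broker's log.dirs.
LOG_DIR=$(mktemp -d)
touch "$LOG_DIR/00000000000015195331.log" "$LOG_DIR/00000000000015195331.index"

# Remove the offset index files; the broker recreates them on startup
# using the current index settings.
find "$LOG_DIR" -name '*.index' -delete

ls "$LOG_DIR"
```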
Re: Increase partitions and replication of __consumer_offsets
To make my question clearer: I know how to increase the partitions and the replication factor of any plain old topic. I'm worried that making changes to this internal topic could cause problems, so I'm looking for advice. Thanks, *Daniel Coldham* On Tue, Jun 23, 2015 at 3:15 PM, Daniel Coldham dani...@wix.com wrote: Hi all, I'm using Kafka 0.8.2.1 in production. My Kafka Config is pretty much vanilla, so (as far as I understand) offsets are being written to Zookeeper. As recommended, I want to start writing offsets to Kafka instead of Zookeeper. I was surprised to see that the __consumer_offsets topic already exists. But let's assume that's ok. The topic has 50 partitions (which is the default) and a replication factor of 1. Of course, I only had 1 broker to begin with so it makes sense that the replication factor is 1 and not 3 (which is the default). Now that I've added more brokers, I want to increase the number of partitions to 200 (as recommended) and the replication factor to 3 (also as recommended). I'd like to do this before starting the process of migrating offsets to Kafka. What is the best way of increasing the partitions and replication factor of __consumer_offsets? Thanks, *Daniel Coldham* N.B. Recommendations taken from: http://kafka.apache.org/documentation.html https://cwiki.apache.org/confluence/display/KAFKA/FAQ http://www.slideshare.net/jjkoshy/offset-management-in-kafka
Re: Does kafka write key to broker?
Hey Mohit, Unfortunately, I don't think there's any such configuration. By the way, there are some pretty cool things you can do with keys in Kafka (such as semantic partitioning and log compaction). I don't know if they would help in your use case, but it might be worth checking out http://kafka.apache.org/documentation.html#design for more details. -Jason On Tue, Jun 23, 2015 at 8:18 AM, Mohit Kathuria mkathu...@sprinklr.com wrote: Hi, We are using kafka 0.8.1.1 in our production cluster. I recently started specifying key as the message itself. I just realised that the key is also written to the broker which means that the data is duplicated within a keyed message. I am going to change the key. Stupid mistake. However, just out of anxiety, I want to know whether we can turn off writing the key to the broker. Any configuration I can change to achieve this? -Thanks, Mohit Kathuria
Re: Is trunk safe for production?
Yes and no. We're running a version about a month behind trunk at any given time here at LinkedIn. That's generally the amount of time we spend testing and going through our release process internally (less if there are no problems). So it can be done. That said, we also have several Kafka contributors and a committer who work on Kafka constantly here. When talking to others about how we run Kafka at LinkedIn, I usually say we run trunk so you don't have to. Unless you have someone to track down and work on fixing the bugs, it's probably a good idea to stick with the release versions, unless you run in a development environment where you can tolerate failures and performance regressions. -Todd On Tue, Jun 23, 2015 at 2:59 AM, Achanta Vamsi Subhash achanta.va...@flipkart.com wrote: I am planning to use for the producer part. How stable is trunk generally? -- Regards Vamsi Subhash -- -- This email and any files transmitted with it are confidential and intended solely for the use of the individual or entity to whom they are addressed. If you have received this email in error please notify the system manager. This message contains confidential information and is intended only for the individual named. If you are not the named addressee you should not disseminate, distribute or copy this e-mail. Please notify the sender immediately by e-mail if you have received this e-mail by mistake and delete this e-mail from your system. If you are not the intended recipient you are notified that disclosing, copying, distributing or taking any action in reliance on the contents of this information is strictly prohibited. Although Flipkart has taken reasonable precautions to ensure no viruses are present in this email, the company cannot accept responsibility for any loss or damage arising from the use of this email or attachments
Re: Does kafka write key to broker?
Hi Mohit, If you instantiate the keyed message with only a topic and a value, e.g. val message = new KeyedMessage[String, String](topic, value), then the key in the KeyedMessage will be null. Hope this helps! Thanks, Liquan On Tue, Jun 23, 2015 at 8:18 AM, Mohit Kathuria mkathu...@sprinklr.com wrote: Hi, We are using kafka 0.8.1.1 in our production cluster. I recently started specifying key as the message itself. I just realised that the key is also written to the broker which means that the data is duplicated within a keyed message. I am going to change the key. Stupid mistake. However, just out of anxiety, I want to know whether we can turn off writing the key to the broker. Any configuration I can change to achieve this? -Thanks, Mohit Kathuria -- Liquan Pei Department of Physics University of Massachusetts Amherst
Re: Is trunk safe for production?
Yes, new features are a big part of it, and sometimes bug fixes/improvements. Bug fixes are mostly due to being on trunk, but some aren't necessarily introduced on trunk. For example, we would like to do a broader roll-out of the new producer, but KAFKA-2121 (adding a request timeout to NetworkClient) actually blocks that effort. (The reason being that we have occasional broker hardware failures, given the size of our deployment, which can under certain circumstances cause producers to become wedged.) Joel On Tue, Jun 23, 2015 at 09:54:30AM -0700, Gwen Shapira wrote: Out of curiosity, why do you want to run trunk? General fondness for cutting edge stuff? Or are there specific features in trunk that you need? Gwen On Tue, Jun 23, 2015 at 2:59 AM, Achanta Vamsi Subhash achanta.va...@flipkart.com wrote: I am planning to use for the producer part. How stable is trunk generally? -- Regards Vamsi Subhash
Re: data loss - replicas
Thanks, Joel. I remember a case where we had a difference like this between two brokers, and it was not due to retention settings or some other problem, but I can't remember exactly what we determined it was. -Todd On Mon, Jun 22, 2015 at 4:22 PM, Joel Koshy jjkosh...@gmail.com wrote: The replicas do not have to decompress/recompress, so I don't think that would contribute to this. There may be some corner cases, such as: - Multiple unclean leadership elections in sequence - Changing the compression codec for a topic on the fly (different brokers may see this config change at almost, but not exactly, the same time), though I'm not sure if you are using that feature. You may want to use the DumpLogSegments tool to actually compare the offsets present in both log files. On Mon, Jun 22, 2015 at 08:55:40AM -0700, Todd Palino wrote: I assume that you are considering the data loss to be the difference in size between the two directories? This is generally not a good guideline, as the batching and compression will be different between the two replicas. -Todd On Mon, Jun 22, 2015 at 7:26 AM, Nirmal ram nirmal106110...@gmail.com wrote: Hi, I noticed a data loss while storing in kafka logs. Generally, the leader hands the request to followers; is there data loss in that process?
topic 'jun8' with 2 replicas and 8 partitions

*Broker 1*
[user@ jun8-6]$ ls -ltr
total 7337500
-rw-rw-r-- 1 user user 1073741311 Jun 22 12:45 15195331.log
-rw-rw-r-- 1 user user    1127512 Jun 22 12:45 15195331.index
-rw-rw-r-- 1 user user 1073741396 Jun 22 12:48 16509739.log
-rw-rw-r-- 1 user user    1108544 Jun 22 12:48 16509739.index
-rw-rw-r-- 1 user user 1073740645 Jun 22 12:52 17823869.log
-rw-rw-r-- 1 user user    1129064 Jun 22 12:52 17823869.index
-rw-rw-r-- 1 user user 1073741800 Jun 22 13:17 19136798.log
-rw-rw-r-- 1 user user    1161152 Jun 22 13:17 19136798.index
-rw-rw-r-- 1 user user 1073741509 Jun 22 13:21 20451309.log
-rw-rw-r-- 1 user user    1152448 Jun 22 13:21 20451309.index
*-rw-rw-r-- 1 user user 1073740588 Jun 22 13:39 21764229.log*
-rw-rw-r-- 1 user user    1241168 Jun 22 13:39 21764229.index
-rw-rw-r-- 1 user user 1062343875 Jun 22 13:42 23077448.log
-rw-rw-r-- 1 user user   10485760 Jun 22 13:42 23077448.index

*Broker 2*
[user@ jun8-6]$ ls -ltr
total 7340468
-rw-rw-r-- 1 user user 1073741311 Jun 22 12:45 15195331.log
-rw-rw-r-- 1 user user    1857144 Jun 22 12:45 15195331.index
-rw-rw-r-- 1 user user 1073741396 Jun 22 12:48 16509739.log
-rw-rw-r-- 1 user user    1857168 Jun 22 12:48 16509739.index
-rw-rw-r-- 1 user user 1073740645 Jun 22 12:52 17823869.log
-rw-rw-r-- 1 user user    1857752 Jun 22 12:52 17823869.index
-rw-rw-r-- 1 user user 1073741800 Jun 22 13:17 19136798.log
-rw-rw-r-- 1 user user    1857440 Jun 22 13:17 19136798.index
-rw-rw-r-- 1 user user 1073741509 Jun 22 13:21 20451309.log
-rw-rw-r-- 1 user user    1856968 Jun 22 13:21 20451309.index
*-rw-rw-r-- 1 user user 1073722781 Jun 22 13:39 21764229.log*
-rw-rw-r-- 1 user user    1762288 Jun 22 13:39 21764229.index
-rw-rw-r-- 1 user user   10485760 Jun 22 13:42 23077448.index
-rw-rw-r-- 1 user user 1062343875 Jun 22 13:42 23077448.log
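The DumpLogSegments comparison Joel suggests can be scripted along these lines (paths and output filenames are illustrative; run it against the same segment on each broker and diff the output):

```shell
# On each broker, dump the offsets/positions of the suspect segment:
bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
    --files /data/kafka-logs/jun8-6/21764229.log > broker1-dump.txt

# Copy the dumps to one host, then:
diff broker1-dump.txt broker2-dump.txt
```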
Re: Is trunk safe for production?
Out of curiosity, why do you want to run trunk? General fondness for cutting edge stuff? Or are there specific features in trunk that you need? Gwen On Tue, Jun 23, 2015 at 2:59 AM, Achanta Vamsi Subhash achanta.va...@flipkart.com wrote: I am planning to use for the producer part. How stable is trunk generally? -- Regards Vamsi Subhash
high level consumer memory footprint
Hi, I was just wondering if there is any difference in the memory footprint of a high level consumer when: 1. the consumer is live and continuously consuming messages with no backlog, versus 2. the consumer has been down for quite some time and is brought up to clear the backlog. My test case with Kafka 0.8.2.1 using only one topic has: Setup: 6 brokers and 3 zookeeper nodes. Message size: 1 MB. Producer rate: 100 threads with 1000 messages per thread. No. of partitions in topic: 100. Consumer threads: 100 consumer threads in the same group. I initially started the producer and consumer in the same Java process with a heap size of 1 GB. The producer could send all the messages to the broker, but the consumer started throwing OutOfMemory exceptions after consuming 26k messages. Upon restarting the process with a 5 GB heap, the consumer consumed around 4.8k messages before going OOM (while clearing a backlog of around 74k). The rest of the messages got consumed when I bumped the heap up to 10 GB. On the consumer, I have the default values for fetch.message.max.bytes and queued.max.message.chunks. If the calculation (fetch.message.max.bytes)*(queued.max.message.chunks)*(no. of consumer threads) holds for the consumer, then 1024*1024*10*100 (close to 1 GB) is well below the 5 GB heap allocated. Did I leave something out of this calculation? Regards, Kris
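For reference, the fetch-queue bound from that formula works out as follows (a sketch only; it bounds the fetch queues, not total heap use, and leaves out decompression buffers and in-flight message objects):

```python
# Worst-case bytes held in the 0.8.x high-level consumer's fetch queues,
# per the formula in the question above.
fetch_message_max_bytes = 1024 * 1024   # default fetch.message.max.bytes
queued_max_message_chunks = 10          # value assumed in the question
consumer_threads = 100

queue_bytes = (fetch_message_max_bytes
               * queued_max_message_chunks
               * consumer_threads)
print(queue_bytes)  # 1048576000 bytes, i.e. just under 1 GB
```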
Re: Is trunk safe for production?
@Gwen I want to apply this JIRA https://issues.apache.org/jira/browse/KAFKA-1865 to 0.8.2.1. So, I was thinking that instead of patching it, can we run against trunk, as I see other producer changes have also been pushed to trunk. We are facing latency problems with the current producer (sent out a separate mail regarding this) and hence want to try these patches. Other than these, I do apply patches like this JIRA https://issues.apache.org/jira/browse/KAFKA-1977 to track offsets of high-level consumers, etc., which might or might not get merged, to solve our use-cases for now. But it is definitely not for cutting-edge stuff, as I don't want to lose sleep due to bugs in production. Hence this email :) @Joel/others I have gone through the producer changes and I see there are many changes in trunk after 0.8.2.1. Can you take a look at my other email regarding latencies? If the patches solve my problem, I am ready to apply them from trunk, test on stage (with our load and use-cases), and then go to production. On Tue, Jun 23, 2015 at 10:59 PM, Joel Koshy jjkosh...@gmail.com wrote: Yes new features are a big part of it and sometimes bug fixes/improvements. Bug fixes are mostly due to being on trunk, but some aren't necessarily introduced on trunk. For e.g., we would like to do a broader roll-out of the new producer, but KAFKA-2121 (adding a request timeout to NetworkClient) actually blocks that effort. (The reason for that being we have occasional broker hardware failures given the size of our deployment which can actually cause producers under certain circumstances to become wedged). Joel On Tue, Jun 23, 2015 at 09:54:30AM -0700, Gwen Shapira wrote: Out of curiosity, why do you want to run trunk? General fondness for cutting edge stuff? Or are there specific features in trunk that you need? Gwen On Tue, Jun 23, 2015 at 2:59 AM, Achanta Vamsi Subhash achanta.va...@flipkart.com wrote: I am planning to use for the producer part.
How stable is trunk generally? -- Regards Vamsi Subhash
Best Practices for Java Consumers
Hello, Is there a good reference for best practices on running Java consumers? I'm thinking a FAQ format. - How should we run them? We are currently running them in Tomcat on Ubuntu; are there other approaches using services? Maybe the service wrapper http://wrapper.tanukisoftware.com/doc/english/download.jsp? - How should we monitor them? I'm going to try https://github.com/stealthly/metrics-kafka - How do we scale? I'm guessing we run more servers, but we could also run more threads? What are the tradeoffs? - Links to code examples - How do we make consumers robust? i.e. best practices for exception handling. We have noticed that if our code throws an NPE, the consumer stops consuming. Also, if this doesn't exist, I would be willing to start this document. I would think it would go near this page https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example Thanks, Tom
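On the "an NPE stops consuming" point: one common pattern is to catch per-message exceptions inside the consumption loop, so a single bad message does not kill the thread. A language-agnostic sketch with the message source stubbed out (in a real 0.8 high-level consumer this loop would iterate a KafkaStream; the function names here are illustrative):

```python
def consume(messages, handler):
    """Process each message; log and skip failures instead of letting
    one exception kill the whole consumer thread."""
    processed, failed = 0, 0
    for msg in messages:
        try:
            handler(msg)
            processed += 1
        except Exception as exc:  # deliberately broad: isolate bad messages
            failed += 1
            print(f"skipping message {msg!r}: {exc}")
    return processed, failed

# Stubbed usage: the second message raises, but consumption continues.
result = consume(["a", None, "b"], lambda m: m.upper())
print(result)  # (2, 1)
```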
Re: data loss - replicas
It seems you might have run that on the last log segment. Can you run it on 21764229.log on both brokers and compare? I'm guessing there may be a message-set with a different compression codec that may be causing this. Thanks, Joel On Tue, Jun 23, 2015 at 01:06:16PM +0530, nirmal wrote: Hi, I ran DumpLogSegments. *Broker 1* offset: 23077447 position: 1073722324 isvalid: true payloadsize: 431 magic: 0 compresscodec: NoCompressionCodec crc: 895349554 *Broker 2* offset: 23077447 position: 1073740131 isvalid: true payloadsize: 431 magic: 0 compresscodec: NoCompressionCodec crc: 895349554 Thanks On 06/23/2015 04:52 AM, Joel Koshy wrote: The replicas do not have to decompress/recompress so I don't think that would contribute to this. There may be some corner cases such as: - Multiple unclean leadership elections in sequence - Changing the compression codec for a topic on the fly - different brokers may see this config change at almost (but not exactly) the same time, but not sure if you are using that feature. You may want to use the DumpLogSegments tool to actually compare the offsets present in both log files. On Mon, Jun 22, 2015 at 08:55:40AM -0700, Todd Palino wrote: I assume that you are considering the data loss to be the difference in size between the two directories? This is generally not a good guideline, as the batching and compression will be different between the two replicas. -Todd On Mon, Jun 22, 2015 at 7:26 AM, Nirmal ram nirmal106110...@gmail.com wrote: Hi, I noticed a data loss while storing in kafka logs. Generally, leader hands the request to followers, is there a data loss in that process?
topic 'jun8' with 2 replicas and 8 partitions [...] -- Joel
Re: Best Practices for Java Consumers
I don't know of any such resource, but I'll be happy to help contribute from my experience. I'm sure others would too. Do you want to start one? Gwen On Tue, Jun 23, 2015 at 2:03 PM, Tom McKenzie thomaswmcken...@gmail.com wrote: Hello, Is there a good reference for best practices on running Java consumers? I'm thinking a FAQ format. - How should we run them? We are currently running them in Tomcat on Ubuntu; are there other approaches using services? Maybe the service wrapper http://wrapper.tanukisoftware.com/doc/english/download.jsp? - How should we monitor them? I'm going to try https://github.com/stealthly/metrics-kafka - How do we scale? I'm guessing we run more servers, but we could also run more threads? What are the tradeoffs? - Links to code examples - How do we make consumers robust? i.e. best practices for exception handling. We have noticed that if our code throws an NPE, the consumer stops consuming. Also, if this doesn't exist, I would be willing to start this document. I would think it would go near this page https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example Thanks, Tom
No key specified when sending the message to Kafka
I have the following code snippet that uses the Kafka producer to send messages (no key is specified in the KeyedMessage): val data = new KeyedMessage[String, String](topicName, msg); Kafka_Producer.send(data) Kafka_Producer is an instance of kafka.producer.Producer. With the above code, I observed that the messages sent to Kafka are not partitioned (that is, all the messages are pushed to partition 0). If I give the messages a key, then they are partitioned across the topic. So, my question is: if no key is provided in the message, will the Kafka producer not automatically partition the messages with some built-in balancing algorithm? Thanks bit1...@163.com
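For context: when a key is present, the old producer's default partitioner chooses the partition by hashing the key, so a given key always lands on the same partition. With a null key, the 0.8 producer instead picks a partition and, as far as I recall, sticks with it until the next metadata refresh (topic.metadata.refresh.interval.ms, 10 minutes by default), which would match the "everything on one partition" observation. A simplified sketch of the keyed scheme (illustrative stand-in only; the real DefaultPartitioner uses the key's hashCode modulo the partition count):

```python
def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in hash: same key -> same partition,
    # different keys spread across partitions.
    return sum(key.encode("utf-8")) % num_partitions

print(partition_for("user-42", 8))  # deterministic value in [0, 8)
```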
Re: data loss - replicas
Hi, I ran DumpLogSegments.
*Broker 1*
offset: 23077447 position: 1073722324 isvalid: true payloadsize: 431 magic: 0 compresscodec: NoCompressionCodec crc: 895349554
*Broker 2*
offset: 23077447 position: 1073740131 isvalid: true payloadsize: 431 magic: 0 compresscodec: NoCompressionCodec crc: 895349554
Thanks
On 06/23/2015 04:52 AM, Joel Koshy wrote: The replicas do not have to decompress/recompress so I don't think that would contribute to this. There may be some corner cases such as: - Multiple unclean leadership elections in sequence - Changing the compression codec for a topic on the fly - different brokers may see this config change at almost (but not exactly) the same time, but not sure if you are using that feature. You may want to use the DumpLogSegments tool to actually compare the offsets present in both log files. On Mon, Jun 22, 2015 at 08:55:40AM -0700, Todd Palino wrote: I assume that you are considering the data loss to be the difference in size between the two directories? This is generally not a good guideline, as the batching and compression will be different between the two replicas. -Todd On Mon, Jun 22, 2015 at 7:26 AM, Nirmal ram nirmal106110...@gmail.com wrote: Hi, I noticed a data loss while storing in kafka logs. Generally, leader hands the request to followers, is there a data loss in that process?
topic 'jun8' with 2 replicas and 8 partitions [...]
Batch producer latencies and flush()
Hi, We are using the batch producer of 0.8.2.1 and we are seeing very bad latencies for the topics. We have ~40K partitions now in a 20-node cluster. - We have many topics, each with a different publish rate. Ex: some topics take 10k messages/sec and others 2000/minute. - We are seeing 99th percentile latencies of 2 sec and 95th percentile of 1 sec. - The currently tunable parameters are batch size, buffer size and linger. We monitor the metrics for the new producer and tuned the above accordingly. Still, we are not able to get any improvements. Batch size in a sense didn't matter after increasing it beyond 64KB (we increased it up to 1MB). - We also noticed that the record queue time is high (2-3 sec). The documentation describes this as the time records wait in the accumulator to be sent. Looking at the code in trunk, I see that the batch size set is the same for all the TopicPartitions, and each has its own RecordBatch. Also, a flush() method was added in the latest code. We want an upper bound on the latency of every message push irrespective of the incoming rate. Can we achieve it with the following logic: - Wait until X KB of batch size per TopicPartition is reached, or - Wait for Y ms. If either is reached, flush the producer records. Can this be part of the producer code itself? This would avoid records accumulating for 2-3 sec. Please correct me if the analysis is wrong, and suggest how we can improve latencies of the new producer. Thanks. -- Regards Vamsi Subhash
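The "flush when either X KB per partition is buffered or Y ms have elapsed" policy asked about above is essentially what the new producer's batch.size and linger.ms settings implement per RecordBatch: a batch is sent as soon as it is full, or once linger.ms has passed since its first record, whichever comes first. A minimal stdlib-only sketch of that trigger logic (the Accumulator class here is illustrative, not Kafka's actual implementation):

```python
import time

class Accumulator:
    """Illustrative per-partition batch trigger: release the batch when
    either the byte threshold (batch.size) or the time bound (linger.ms)
    since the first buffered record is reached."""

    def __init__(self, batch_size_bytes, linger_ms):
        self.batch_size_bytes = batch_size_bytes
        self.linger_s = linger_ms / 1000.0
        self.buffer = []
        self.buffered_bytes = 0
        self.first_append = None  # timestamp of first record in the batch

    def append(self, record):
        """Buffer a record; return the completed batch if a flush triggers."""
        if self.first_append is None:
            self.first_append = time.monotonic()
        self.buffer.append(record)
        self.buffered_bytes += len(record)
        return self._maybe_flush()

    def _maybe_flush(self):
        full = self.buffered_bytes >= self.batch_size_bytes
        expired = (self.first_append is not None and
                   time.monotonic() - self.first_append >= self.linger_s)
        if full or expired:
            batch, self.buffer = self.buffer, []
            self.buffered_bytes = 0
            self.first_append = None
            return batch
        return None

acc = Accumulator(batch_size_bytes=64, linger_ms=5000)
assert acc.append(b"x" * 32) is None   # under both thresholds: keep buffering
print(len(acc.append(b"y" * 32)))      # byte threshold hit: batch of 2 released
```

In the real producer this is just configuration, e.g. batch.size=65536 and linger.ms=100 bound the wait to roughly 100 ms per batch; linger.ms defaults to 0, so a large record queue time usually points at the sender being backed up rather than at lingering.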
Is trunk safe for production?
I am planning to use trunk for the producer part. How stable is trunk generally? -- Regards Vamsi Subhash
High level consumer rebalance question
Hi, I have 3 high-level consumers with the same group id. If one of the consumers goes down, I know a rebalance will kick in on the remaining two consumers. What happens if one of the remaining consumers is very slow during rebalancing and hasn't yet released ownership of some of its partitions? Will the other consumer claim ownership of the partitions still held by the slow consumer (assuming those partitions would be assigned to it after a successful rebalance) and start consuming?
Re: No key specified when sending the message to Kafka
It does balance data, but is sticky over short periods of time (for some definition of short...). See this FAQ for an explanation: https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyisdatanotevenlydistributedamongpartitionswhenapartitioningkeyisnotspecified

This behavior has been changed in the new producer to work the way you expected, and can be overridden by implementing your own Partitioner interface.

On Tue, Jun 23, 2015 at 8:28 PM, bit1...@163.com bit1...@163.com wrote: I have the following code snippet that uses the Kafka producer to send a message (no key is specified in the KeyedMessage): val data = new KeyedMessage[String, String](topicName, msg); Kafka_Producer.send(data) Kafka_Producer is an instance of kafka.producer.Producer. With the above code, I observed that the messages sent to Kafka are not partitioned (that is, all the messages are pushed to partition 0). If I give the message a key, then they do get partitioned across the topic. So, my question is: if no key is provided in the message, will the Kafka producer not automatically partition the messages with some built-in balancing algorithm? Thanks bit1...@163.com -- Thanks, Ewen
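The FAQ behavior described above, where the 0.8 producer picks a random partition for key-less messages and reuses it until the next metadata refresh (topic.metadata.refresh.interval.ms, 10 minutes by default), can be illustrated with a small sketch. The class and parameter names here are illustrative, not Kafka's actual API:

```python
import random
import time

class StickyRandomPartitioner:
    """Sketch of the 0.8 producer's handling of messages with no key:
    choose one random partition and keep using it until the metadata
    refresh interval elapses, then choose again."""

    def __init__(self, num_partitions, refresh_interval_s):
        self.num_partitions = num_partitions
        self.refresh_interval_s = refresh_interval_s
        self.current = None
        self.chosen_at = 0.0

    def partition(self):
        now = time.monotonic()
        if (self.current is None or
                now - self.chosen_at >= self.refresh_interval_s):
            self.current = random.randrange(self.num_partitions)
            self.chosen_at = now
        return self.current

p = StickyRandomPartitioner(num_partitions=8, refresh_interval_s=600)
# Within one refresh window, every key-less message lands on one partition:
print(len({p.partition() for _ in range(1000)}))  # 1
```

This is why the data looks "not partitioned" over a short observation window even though, over many refresh intervals, it does spread across partitions.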
Re: Consumer rebalancing based on partition sizes?
Current partition assignment only has a few limited options -- see the partition.assignment.strategy consumer option (which seems to be listed twice; see the second version for a more detailed explanation). There has been some discussion of making assignment strategies user-extensible to support use cases like this. Is there a reason your data is unbalanced that might be avoidable? Ideally, good hashing of keys combined with a large enough number of keys with reasonable data distribution across keys (not necessarily uniform) leads to a reasonable balance, although there are certainly some workloads so skewed that this doesn't work out. On Tue, Jun 23, 2015 at 7:34 PM, Joel Ohman maelstrom.thunderb...@gmail.com wrote: Hello! I'm working with a topic of largely variable partition sizes. My biggest concern is that I have no control over which keys are assigned to which consumers in my consumer group, as the amount of data my consumer sees is directly reflected in its workload. Is there a way to distribute partitions to consumers evenly based on the size of each partition? The provided Consumer Rebalancing Algorithm prioritizes assigning consumers even numbers of partitions, regardless of their size. Regards, Joel -- Thanks, Ewen
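For reference, the default range assignment mentioned above divides sorted partitions evenly by count, taking no account of per-partition data volume. A sketch of that algorithm (mirroring the consumer rebalancing algorithm described in the Kafka documentation; function names here are illustrative):

```python
def range_assign(partitions, consumers):
    """Range assignment: sort both lists, then give each consumer a
    contiguous block of n // c partitions, with the first n % c
    consumers taking one extra partition each."""
    partitions = sorted(partitions)
    consumers = sorted(consumers)
    n, c = len(partitions), len(consumers)
    per, extra = divmod(n, c)
    assignment, start = {}, 0
    for i, consumer in enumerate(consumers):
        count = per + (1 if i < extra else 0)
        assignment[consumer] = partitions[start:start + count]
        start += count
    return assignment

print(range_assign(list(range(5)), ["c1", "c2"]))
# {'c1': [0, 1, 2], 'c2': [3, 4]} -- even by count, blind to partition size
```

Since only partition counts enter the calculation, a consumer assigned a few very large partitions can end up with far more data to process than its peers, which is exactly the imbalance Joel describes.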
Consumer rebalancing based on partition sizes?
Hello! I'm working with a topic of largely variable partition sizes. My biggest concern is that I have no control over which keys are assigned to which consumers in my consumer group, as the amount of data my consumer sees is directly reflected in its workload. Is there a way to distribute partitions to consumers evenly based on the size of each partition? The provided Consumer Rebalancing Algorithm prioritizes assigning consumers even numbers of partitions, regardless of their size. Regards, Joel