Re: Kafka on DC/OS
Hi Abhimanyu, What errors are you seeing? And which version of DC/OS are you running? Tim On Fri, Jul 22, 2016 at 6:14 AM, Chakrabarty, Abhimanyu wrote: > I had a question regarding Kafka on DC/OS, because whenever we try to install > the Kafka package it always shows that it is deploying in the Marathon UI > and gives us an error when we search for it using "dcos kafka". We have also > tried using "dcos package install kafka" and tried to add the brokers, but it > doesn't help. At the moment we have two brokers (broker0 and broker1) > running in Marathon even after removing the last installed Kafka. We > have very little experience using Kafka, so any detailed help would be > very helpful. > The second question is whether Kafka Streams is included in the current > Kafka package provided by DC/OS. I am attaching a screenshot with the > email; would be glad to get your help. > Thank you very much. > > Regards, > Abhimanyu
Re: How does Cloudera manager Collects Kafka Metrics
That's all information available from the JMX endpoints in Kafka. Tim On Fri, Mar 25, 2016 at 1:21 PM, yeshwanth kumar wrote: > can someone explain how Cloudera Manager collects Kafka metrics, such as > TotalMessages in a topic, and total bytes read and written from and into Kafka? > > > please let me know > > Thanks, > -Yeshwanth > Can you Imagine what I would do if I could do all I can - Art of War
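Cloudera Manager (and most monitoring agents) collect these by polling the broker's JMX port. Below is a minimal sketch of JMX attribute polling; it queries a standard platform MBean so it runs without a broker, and the Kafka MBean name in the comment is an assumption from 0.8-era naming rather than something stated in this thread.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxPoll {
    // Fetch a numeric attribute from an MBean - the same mechanism an agent
    // uses against a broker's remote JMX port. A Kafka broker bean would look
    // roughly like (0.8-era naming, assumption):
    //   kafka.server:type=BrokerTopicMetrics,name=AllTopicsMessagesInPerSec
    public static int availableProcessors() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Query a standard platform MBean so this sketch runs standalone.
        ObjectName name = new ObjectName("java.lang:type=OperatingSystem");
        return (Integer) server.getAttribute(name, "AvailableProcessors");
    }

    public static void main(String[] args) throws Exception {
        System.out.println("AvailableProcessors=" + availableProcessors());
    }
}
```

Against a real broker you would open a `JMXConnector` to the broker's JMX port instead of the platform MBeanServer; the attribute-read pattern is the same.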
Re: Problem deleting topics in 0.8.2?
Hi Jeff, The controller should have a topic-deletion thread running that coordinates the delete across the cluster, and the progress should be logged to the controller log. Can you look at the controller log to see what's going on? Tim On Wed, Mar 4, 2015 at 10:28 AM, Jeff Schroeder jeffschroe...@computer.org wrote: So I've got 3 kafka brokers that were started with delete.topic.enable set to true. When they start, I can see in the logs that the property was successfully set. The dataset in each broker is only approximately 2G (per du). When running kafka-delete.sh with the correct arguments to delete all of the topics, it says that the topic is marked for deletion. When running again, it says that the topic is already marked for deletion. From reading the documentation, my understanding is that one of the 10 (default) background threads would eventually process the deletes, and clean up both the topics in zookeeper and the actual data on disk. In reality, it didn't seem to delete the data on disk or remove anything in zookeeper. What is the correct way to remove a topic in kafka 0.8.2, and what is the expected timeframe for that to complete? My solution was stopping the brokers and rm -rf /var/lib/kafka/*, but that is clearly a very poor one once we are done testing our kafka + storm setup. -- Jeff Schroeder Don't drink and derive, alcohol and analysis don't mix. http://www.digitalprognosis.com
Re: Delete topic API in 0.8.2
I believe that's the only way it's supported from the CLI. Delete topic actually fully removes the topic from the cluster, which also includes cleaning the logs and removing it from zookeeper (once it is fully deleted). Tim On Fri, Jan 23, 2015 at 12:13 PM, Sumit Rangwala sumitrangw...@gmail.com wrote: I am trying to find if there is a supported API to delete topic (from within my code) specifically in 0.8.2. One method that I can think of is calling kafka.admin.TopicCommand.main with the same parameters as one gives on the command line. Is this the recommended way or is there a better way of doing it? Furthermore, are the details of deleting a topic in 0.8.2 documented, since I would like to understand if delete topic only deletes the topic in zookeeper or it also cleans the logs in kafka broker as well. Sumit
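Since `kafka.admin.TopicCommand.main` takes the same argv as `kafka-topics.sh`, one way to delete a topic from code in 0.8.2 is to assemble that argv and invoke `TopicCommand.main` directly (which needs the kafka jars on the classpath, not shown here). The helper below is hypothetical; it only builds the documented CLI flags.

```java
import java.util.Arrays;

public class DeleteTopicArgs {
    // Assemble the argv that kafka-topics.sh (and therefore
    // kafka.admin.TopicCommand.main) expects for a topic delete.
    public static String[] deleteArgs(String zookeeper, String topic) {
        return new String[] {"--zookeeper", zookeeper, "--delete", "--topic", topic};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(deleteArgs("localhost:2181", "my_topic")));
        // With kafka on the classpath, you would then call:
        //   kafka.admin.TopicCommand.main(deleteArgs("localhost:2181", "my_topic"));
    }
}
```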
Re: messages lost
What's your configured required.acks? And are you also waiting for all your messages to be acknowledged? The new producer returns futures, but you still need to wait for the futures to complete. Tim On Fri, Jan 2, 2015 at 9:54 AM, Sa Li sal...@gmail.com wrote: Hi, all, We are sending messages from a producer; we send 10 records, but we see only 99573 records for that topic. We confirmed this by consuming the topic and checking the log size in kafka web console. Any ideas why the messages were lost? What could cause this? thanks -- Alec Li
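The wait-for-futures point above can be sketched without any Kafka classes; the stand-in below uses plain java.util.concurrent futures in place of `producer.send(record)` to show the pattern: collect every future, then block on each before declaring the batch delivered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AwaitSends {
    // The new producer's send() returns a Future; a message is only known to
    // be delivered once its future completes. Exiting before the futures
    // complete is one way to "lose" messages that were never actually sent.
    public static int sendAndAwait(int n) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<Integer>> acks = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            final int id = i;
            acks.add(pool.submit(() -> id)); // stand-in for producer.send(record)
        }
        int acked = 0;
        for (Future<Integer> f : acks) {
            f.get();  // blocks until acknowledged; throws if the send failed
            acked++;
        }
        pool.shutdown();
        return acked;
    }
}
```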
Re: delete topic ?
Is this the latest master? I've added the delete option in trunk, but it's not in any release yet. We used to have the delete option flag, but I believe we removed it; that's why the documentation differs. Tim On Wed, Aug 6, 2014 at 10:53 PM, Shlomi Hazan shl...@viber.com wrote: if the answer is pointing out the 'chroot', as a word, it makes no difference. the result is the same: kafka/bin/kafka-topics.sh --zookeeper localhost:2181/chroot --delete --topic topic-3 gives the same: Command must include exactly one action: --list, --describe, --create or --alter... or should I write something instead of chroot? On Wed, Jun 18, 2014 at 2:06 PM, Shlomi Hazan shl...@viber.com wrote: Hi, Doing some evaluation testing, and accidentally created a queue with the wrong replication factor. Trying to delete as in: kafka_2.10-0.8.1.1/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic replicated-topic Yielded: Command must include exactly one action: --list, --describe, --create or --alter Even though this page (https://kafka.apache.org/documentation.html) says: And finally deleting a topic: bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name WARNING: Delete topic functionality is beta in 0.8.1. Please report any bugs that you encounter on the mailing list (us...@kafka.apache.org) or JIRA (https://issues.apache.org/jira/browse/KAFKA). Kafka does not currently support reducing the number of partitions for a topic or changing the replication factor. What should I do? Shlomi
Re: delete topic ?
Hi Gwen, That is a very confusing error message for sure; feel free to file a JIRA for both of those experiences. In general, here is how delete topic works: it creates an entry in the delete_topic ZK path, and the controller has a delete-topic thread that watches that path and starts the topic deletion once it receives the message. It then requires rounds of coordination among all the brokers that have partitions for the topic to delete all the partitions, and then finally deletes the topic from ZK. Therefore, once the deletion finishes, the topic is also deleted from ZK. The topic command could, however, join the topic list with the delete-topic list and mark the ones being deleted with a special status. Tim On Wed, Aug 6, 2014 at 11:20 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi Timothy, While we are on the subject, a few questions/comments (based on the trunk implementation of the delete topic command): * After deleting a topic, I still see it when listing topics. Is this the expected behavior? Should it disappear after some time? * When does the actual deletion get triggered? * If I try to delete a topic twice I get a pretty confusing exception (Node exists, from zkclient). It would be nice to catch this and say "Topic is being deleted" or something to that effect. * Even nicer if the list topics command marked topics as being deleted. I'll probably open a separate JIRA for the nicer behavior, but interested in hearing your thoughts. Gwen
Re: delete topic ?
Hi Jason, You do want to wait for the next release, as a lot of stability fixes are going into it. Tim On Thu, Aug 7, 2014 at 10:25 AM, Gwen Shapira gshap...@cloudera.com wrote: Looking at the delete topic patch, it looks like there were significant modifications in the controller code to support it, so I think you are out of luck. (https://reviews.apache.org/r/20745) On Thu, Aug 7, 2014 at 8:18 AM, Jason Rosenberg j...@squareup.com wrote: Since the deletion stuff is now in trunk, would it be compatible to issue the command from a jar built from trunk, against a running 0.8.1.1 cluster? Or does the cluster also have to be running trunk? (I'm guessing it does :)). I have some topics I'd like to delete, but don't want to wait for 0.8.2 (but will probably have to, I'm guessing). Jason
Re: delete topic ?
Yes, the existing delete topic command just cleans up the topic entry in ZK, but doesn't really delete the topic from the cluster. I have a patch that enables kafka-topics.sh to delete topics, but I'm not sure if it's merged to trunk. Tim On Jun 18, 2014, at 1:39 PM, hsy...@gmail.com wrote: I'm using 0.8.1.1. I use DeleteTopicCommand to delete a topic: args[0] = "--topic"; args[1] = the topic you want to delete; args[2] = "--zookeeper"; args[3] = kafkaZookeepers; DeleteTopicCommand.main(args); You can write your own script to delete the topic, I guess. And I think it only deletes the entry in zookeeper. Best, Siyuan On Wed, Jun 18, 2014 at 9:13 AM, Mark Roberts wiz...@gmail.com wrote: When we were in the testing phase, we would either create a new topic with the correct details or shut the cluster down and hard-kill the topic in zookeeper + local disk. In prod we have the cluster configured via configuration management and auto-create turned off. The ability to delete a topic in a live, running kafka cluster is tricky, and the implementations of it have been subtly incorrect (and therefore dangerous). I know that there is work happening around that, but I haven't kept up with the status of it. Maybe in 0.8.2? It sounds conceptually simpler to implement with the new metadata API. -Mark On Jun 18, 2014, at 4:06, Shlomi Hazan shl...@viber.com wrote: Hi, Doing some evaluation testing, and accidentally created a queue with the wrong replication factor. Trying to delete as in: kafka_2.10-0.8.1.1/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic replicated-topic Yielded: Command must include exactly one action: --list, --describe, --create or --alter Even though this page (https://kafka.apache.org/documentation.html) says: And finally deleting a topic: bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name WARNING: Delete topic functionality is beta in 0.8.1. Please report any bugs that you encounter on the mailing list (us...@kafka.apache.org) or JIRA (https://issues.apache.org/jira/browse/KAFKA). Kafka does not currently support reducing the number of partitions for a topic or changing the replication factor. What should I do? Shlomi
Re: Building Kafka on Mac OS X
What's the last line it's stuck on with the debug flag on? Tim On Tue, Jun 17, 2014 at 4:46 PM, Jorge Marizan jorge.mari...@gmail.com wrote: I almost got it to work but it's still stuck compiling scala. Any idea? Jorge. On Jun 17, 2014, at 7:22 AM, Jorge Marizan jorge.mari...@gmail.com wrote: It got stuck on this: ./gradlew jar The TaskContainer.add() method has been deprecated and is scheduled to be removed in Gradle 2.0. Please use the create() method instead. Building project 'core' with Scala version 2.8.0 Building project 'perf' with Scala version 2.8.0 :clients:compileJava :clients:processResources UP-TO-DATE :clients:classes :clients:jar :contrib:compileJava UP-TO-DATE :contrib:processResources UP-TO-DATE :contrib:classes UP-TO-DATE :contrib:jar :core:compileJava UP-TO-DATE :core:compileScala On Jun 17, 2014, at 12:46 AM, Steve Morin steve.mo...@gmail.com wrote: Have seen if you have a write with zero data it will hang On Jun 16, 2014, at 21:02, Timothy Chen tnac...@gmail.com wrote: Can you try running it in debug mode? (./gradlew jar -d) Tim On Mon, Jun 16, 2014 at 8:44 PM, Jorge Marizan jorge.mari...@gmail.com wrote: It just hangs there without any output at all. Jorge. On Jun 16, 2014, at 11:27 PM, Timothy Chen tnac...@gmail.com wrote: What output was it stuck on? Tim On Mon, Jun 16, 2014 at 6:39 PM, Jorge Marizan jorge.mari...@gmail.com wrote: Hi team, I'm a newcomer to Kafka, but I'm having some trouble trying to get it to run on OS X. Basically, building Kafka on OS X with './gradlew jar' gets stuck forever without any progress (indeed, I tried to leave it building all night to no avail). Any advice will be greatly appreciated. Thanks in advance.
Re: Building Kafka on Mac OS X
So do you have the build folder generated in core/client, and classes/jars compiled in them? kafka-server-start.sh also sets the CLASSPATH to load the jar and kafka.Kafka as well, so you want to make sure they're there. Tim On Tue, Jun 17, 2014 at 9:18 PM, Jorge Marizan jorge.mari...@gmail.com wrote: Now when I try to run, it fails finding the kafka.Kafka class: kafka-server-start.sh /usr/local/etc/kafka/server.properties Error: Could not find or load main class kafka.Kafka Jorge On Jun 17, 2014, at 11:54 PM, Jorge Marizan jorge.mari...@gmail.com wrote: Not at all, I verified with ps aux and there are no Gradle processes left behind when I cancel the compile job. Jorge. On Jun 17, 2014, at 11:45 PM, Timothy Chen tnac...@gmail.com wrote: Not sure what's wrong, but I'm guessing there could be a gradle lock somewhere. Are there other gradle processes hanging around? Tim Sent from my iPhone On Jun 17, 2014, at 8:35 PM, Jorge Marizan jorge.mari...@gmail.com wrote: :core:compileScala Jorge.
Re: Building Kafka on Mac OS X
What output was it stuck on? Tim On Mon, Jun 16, 2014 at 6:39 PM, Jorge Marizan jorge.mari...@gmail.com wrote: Hi team, I'm a newcomer to Kafka, but I'm having some trouble trying to get it to run on OS X. Basically, building Kafka on OS X with './gradlew jar' gets stuck forever without any progress (indeed, I tried to leave it building all night to no avail). Any advice will be greatly appreciated. Thanks in advance.
Re: Building Kafka on Mac OS X
Can you try running it in debug mode? (./gradlew jar -d) Tim On Mon, Jun 16, 2014 at 8:44 PM, Jorge Marizan jorge.mari...@gmail.com wrote: It just hangs there without any output at all. Jorge. On Jun 16, 2014, at 11:27 PM, Timothy Chen tnac...@gmail.com wrote: What output was it stuck on? Tim On Mon, Jun 16, 2014 at 6:39 PM, Jorge Marizan jorge.mari...@gmail.com wrote: Hi team, I'm a newcomer to Kafka, but I'm having some trouble trying to get it to run on OS X. Basically, building Kafka on OS X with './gradlew jar' gets stuck forever without any progress (indeed, I tried to leave it building all night to no avail). Any advice will be greatly appreciated. Thanks in advance.
Re: Data loss detection
Hi Maung, If your required.acks is 1, then the producer only ensures that one broker receives the data before the call returns successfully to the client. Therefore, if that broker crashes and loses the data, you lose data; similarly, this can happen even before the data is fsynced. To ensure there are more copies of your data in failure scenarios, you want to increase required.acks beyond 1 to tolerate failures. Also, the async producer doesn't wait until the data is sent before it returns, as it buffers and writes asynchronously. To ensure each write that gets a successful response is actually written, you want to use the sync producer. Tim On Tue, Jun 3, 2014 at 2:13 PM, Maung Than maung_t...@apple.com wrote: Hi, We are seeing less data on the brokers than we send from the producers: 84 GB sent vs. 58 GB on the brokers. What is the best way to ensure / detect whether all data has been sent properly to the brokers from the producers? Are there any logs that we can check on the producers? Configuration is 5 brokers, 2 producers, no replication, async, ack is 1, and no compression. Thanks, Maung
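In the 0.8-era Scala producer, the knobs Tim mentions map onto config names roughly like the following. This is a sketch from the 0.8 producer configuration docs; verify the names against your exact version:

```properties
# Durability-oriented settings for the old (0.8.x) producer.
producer.type=sync          # block on each request instead of buffering async
request.required.acks=-1    # -1 waits for all in-sync replicas; 1 = leader only
message.send.max.retries=3  # resend on recoverable errors before giving up
```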
Re: Data loss detection
By the way, if you're using the async producer, how do you verify that you sent all the data from the producer? Do you shut down the producer before you check? Tim On Tue, Jun 3, 2014 at 3:27 PM, Maung Than maung_t...@apple.com wrote: Thanks, Tim. We are just trying to benchmark the kafka producers, and there is no issue with the cluster or brokers being down in this case. We are seeing way less data on the brokers (after calculating the sizes of the logs on the brokers), and there is no compression. We send 84 GB, but the total log sizes are only 58 GB on the brokers. Since the replication factor is zero, can we use an ack other than 1? Maung
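Tim's shutdown question matters because the async producer buffers messages and ships them from a background thread; a benchmark that exits without closing the producer can silently drop whatever is still buffered, making the broker-side byte count look low. The toy class below (not Kafka code) shows the drain-on-close pattern with a plain queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class AsyncBufferSketch {
    // send() only enqueues, like the async producer; close() must drain the
    // buffer before shutdown or the tail of the run is never delivered.
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private int delivered = 0;

    public void send(String msg) {
        buffer.add(msg); // returns immediately
    }

    public int close() {
        // Drain everything still buffered before exit, then report the total.
        List<String> remaining = new ArrayList<>();
        buffer.drainTo(remaining);
        delivered += remaining.size();
        return delivered;
    }
}
```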
Re: Java API to list topics and partitions
There is a Scala API. You can take a look at TopicCommand.scala, as kafka-topics.sh simply calls that class. Tim On Tue, May 20, 2014 at 3:41 PM, Saurabh Agarwal (BLOOMBERG/ 731 LEX -) sagarwal...@bloomberg.net wrote: Hi, Is there a Java API in kafka to list topics and partitions in the kafka broker? Thanks, Saurabh.
Re: how to know kafka producer api status
It typically throws an exception in the end if the sync producer cannot deliver your message. In the case where there is an IOException or a similar exception that the broker cannot deal with, I believe it will try to return an UnknownError response, which will then be thrown in the producer. In cases where it receives error codes the producer can recover from (e.g. NotLeaderForPartition), it simply retries up to the configured max retries. Tim On Fri, May 9, 2014 at 1:00 AM, Yonghui Zhao zhaoyong...@gmail.com wrote: If I use the java producer api in sync mode: public void send(kafka.producer.KeyedMessage<K,V> message) { /* compiled code */ } How do I know whether a send succeeded or failed? For example, if the kafka broker disk is not accessible, will it throw exceptions?
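The retry-then-throw behavior described above can be sketched abstractly; the helper below is a stand-in (not the producer's actual code), where `attempt` plays the role of a single produce request that either succeeds or hits a recoverable error.

```java
import java.util.function.IntPredicate;

public class RetrySend {
    // Recoverable errors (e.g. NotLeaderForPartition) are retried up to a
    // configured max; once retries are exhausted, the caller sees an
    // exception. attempt receives the zero-based attempt number and returns
    // true on success.
    public static int sendWithRetries(IntPredicate attempt, int maxRetries) {
        for (int tries = 0; tries <= maxRetries; tries++) {
            if (attempt.test(tries)) {
                return tries; // which attempt finally succeeded
            }
        }
        throw new RuntimeException("send failed after " + maxRetries + " retries");
    }
}
```

So a sync-mode caller can rely on an exception as the "it failed" signal, and a normal return as the "it was accepted" signal.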
Re: New consumer APIs
Hi Neha, Yes, a way that allows each partition to be committed separately. I couldn't remember if the new consumer allows it, but it looks like it does! Tim On Fri, May 16, 2014 at 9:37 AM, Neha Narkhede neha.narkh...@gmail.com wrote: Tim, I'm going to ask you the same question :-) By per-stream commit, do you mean a per-partition commit like this API - public OffsetMetadata commit(Map<TopicPartition, Long> offsets); This API allows the consumer to commit the specified offsets only for selected partitions. Thanks, Neha On Thu, May 15, 2014 at 8:42 AM, Timothy Chen tnac...@gmail.com wrote: Also going to add that I know a per-stream commit is a strong requirement for folks I know using Kafka, and I've seen custom code done just to do so. Tim On May 9, 2014, at 1:19 PM, Eric Sammer esam...@scalingdata.com wrote: All: I've been going over the new consumer APIs and it seems like we're squishing a lot of different concerns together into a single class. The scope of the new Consumer is kind of all over the place. Managing the lifecycle - and especially the thread safety - seems challenging. Specifically, Consumer seems to serve the following purposes: * Acts as a holder of subscription info (e.g. subscribe()). * Acts as a stream (e.g. poll(), seek()). I definitely think we want these to be separate. It's pretty common to have a consumer process that connects to the broker, creates N consumer threads, each of which works on a single stream (which could be composed of some number of partitions). In this scenario, you *really* want to explicitly control durability (e.g. commit()s) on a per-stream basis. You also have different lifecycle semantics and thread safety concerns at the stream level versus the global level. Is there a reason the API doesn't look more like: // Thread safe, owns the multiplexed connection Consumer: def subscribe(topic: String, streams: Int): Set[Stream] def close() // Release everything // Not at all thread safe, no synchronization.
Stream: def commit() // Really important this be here and not on Consumer. def seek(...) def poll(duration: Long, unit: TimeUnit): List[MessageThingie] def close() // Release these partitions ... I think this also significantly reduces the complexity of the Consumer API and lets each thread in a consumer process handle stream lifecycle appropriately. Since the connection is multiplexed and things could get rebalanced, just toss an exception if the streams become invalid, forcing a resubscribe. That way we don't have crazy state logic. I'm sure I'm missing something, but I wanted to toss this out there for folks to poke at. (p.s. I *really* want per-stream commit baked into the API.) -- E. Sammer CTO - ScalingData
Re: CSharp librari and Producer Closing socket for because of error (kafka.network.Processor),java.nio.BufferUnderflowException
The C# client you're using only supports Kafka 0.7; the 0.8 APIs are not backward compatible anymore. If you want to use the latest Kafka you'll have to implement the binary protocol yourself, or work with one of the other folks who have mentioned a .NET client on the mailing list. Tim On Mon, May 12, 2014 at 10:48 PM, Margusja mar...@roo.ee wrote: Hi, I have a kafka broker running (kafka_2.9.1-0.8.1.1). All is working. One project requires the producer to be written in C#. I am not a .NET programmer, but I managed to write simple producer code using https://github.com/kafka-dev/kafka/blob/master/clients/csharp/README.md the code ... using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; using Kafka.Client; namespace DemoProducer { class Program { static void Main(string[] args) { string payload1 = "kafka 1."; byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1); Message msg1 = new Message(payloadData1); string payload2 = "kafka 2."; byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2); Message msg2 = new Message(payloadData2); Producer producer = new Producer("broker", 9092); producer.Send("kafkademo3", 0, msg1); } } } ...
In broker side I am getting the error if I executing the code above: [2014-05-12 19:15:58,984] ERROR Closing socket for /84.50.21.39 because of error (kafka.network.Processor) java.nio.BufferUnderflowException at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145) at java.nio.ByteBuffer.get(ByteBuffer.java:694) at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38) at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33) at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36) at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36) at kafka.network.RequestChannel$Request.init(RequestChannel.scala:53) at kafka.network.Processor.read(SocketServer.scala:353) at kafka.network.Processor.run(SocketServer.scala:245) at java.lang.Thread.run(Thread.java:744) [2014-05-12 19:16:11,836] ERROR Closing socket for /90.190.106.56 because of error (kafka.network.Processor) java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:375) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:347) at kafka.network.Processor.run(SocketServer.scala:245) at java.lang.Thread.run(Thread.java:744) I suspected that the problem is in the broker version (kafka_2.9.1-0.8.1.1) so I downloaded kafka-0.7.1-incubating. Now I was able to send messages using CSharp code. So is there workaround how I can use latest kafka version and CSharp ? Or What is the latest kafka version supporting CSharp producer? And one more question. In Csharp lib how can I give to producer brokers list to get fault tolerance in case one broker is down? 
-- Best regards, Margus (Margusja) Roo +372 51 48 780 http://margus.roo.ee http://ee.linkedin.com/in/margusroo skype: margusja ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)
Re: why kafka producer api use cpu so high?
What is your compression configuration for your producer? One of the biggest CPU costs in the producer is compression, and also checksumming. Tim On Sun, May 11, 2014 at 12:24 AM, yunbinw...@travelsky.com wrote: I wrote a very simple program, like this: public class LogProducer { private Producer<String,String> inner; public LogProducer() throws Exception { Properties properties = new Properties(); properties.load(ClassLoader.getSystemResourceAsStream("producer.properties")); ProducerConfig config = new ProducerConfig(properties); inner = new Producer<String, String>(config); } public void send(String topicName, String message) { if(topicName == null || message == null) { return; } KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, message); inner.send(km); } public void close() { inner.close(); } /** * @param args */ public static void main(String[] args) { LogProducer producer = null; try { producer = new LogProducer(); int i=0; while(true) { producer.send("test", "this is a sample"); } } catch(Exception e) { e.printStackTrace(); } finally { if(producer != null) { producer.close(); } } } } ~~ and the producer.properties like this: metadata.broker.list=127.0.0.1:9092 producer.type=async serializer.class=kafka.serializer.StringEncoder batch.num.messages=200 compression.codec=snappy I run this program on linux, with a 4-core cpu and 16GB memory.
I find this program fully using one CPU core; this is the top command output:

[root@localhost ~]# top
top - 13:51:09 up 5 days, 13:27, 3 users, load average: 0.96, 0.48, 0.35
Tasks: 367 total, 3 running, 364 sleeping, 0 stopped, 0 zombie
Cpu0 : 7.0%us, 0.3%sy, 0.0%ni, 92.0%id, 0.7%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 : 5.0%us, 0.0%sy, 0.0%ni, 95.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu2 : 5.0%us, 0.0%sy, 0.0%ni, 95.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu3 : 99.7%us, 0.3%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Mem: 16307528k total, 9398376k used, 6909152k free, 249952k buffers
Swap: 8224760k total, 0k used, 8224760k free, 6071348k cached

Why does the producer API use so much CPU? Or maybe I did something wrong? By the way, the Kafka version is 0.8.0.
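To test Tim's theory, one cheap experiment is to vary only the codec in producer.properties and re-run the same loop, comparing CPU usage under top. This is the poster's own config with the codec line as the variable; note also that the send loop has no throttling at all, so the async producer thread will always have work to do:

```properties
metadata.broker.list=127.0.0.1:9092
producer.type=async
serializer.class=kafka.serializer.StringEncoder
batch.num.messages=200
# try each of none / snappy / gzip in turn and compare CPU under the same load
compression.codec=none
```

If CPU drops sharply with compression.codec=none, compression is the hot spot; if it stays pegged, the cost is elsewhere (serialization, checksumming, or simply the unthrottled loop).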
Re: New to Kafka: ZooKeeper and Client
Hi Chris, The Kafka producer doesn't require ZooKeeper anymore, so you can simply connect to one of the brokers directly. Tim On Tue, Apr 29, 2014 at 9:23 AM, Chris Helck chris.he...@ebs.com wrote: I have a few newbie questions. I need to create a Producer that sends messages to Kafka brokers. Does a machine running a Kafka client (Producer) need its own instance of ZooKeeper running? Or does it simply connect to the ZooKeeper that is running remotely with the Kafka brokers?
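As an illustration of Tim's point: an 0.8 producer is configured with a broker list only, and no zookeeper.connect appears anywhere in its config (the broker hostnames below are placeholders). The high-level consumer, by contrast, still connected to ZooKeeper at the time:

```properties
# producer config: talks to brokers directly, no ZooKeeper needed
metadata.broker.list=broker1.example.com:9092,broker2.example.com:9092
serializer.class=kafka.serializer.StringEncoder
```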
Re: Please add Perl client on your wiki
Done, let me know if you want more changes. Tim On Tue, Apr 29, 2014 at 1:54 PM, Sergiy Zuban s.zu...@gmail.com wrote: Could someone please update Perl client information at https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Perl 1. GZIP and Snappy compression supported 2. Update formatting to match other clients formatting: - make Maintainer/License bold - convert author's name to a mailto: hyper link Thank you. -- Sergiy Zuban
Re: Kafka Performance Tuning
Hi Yashika, Having no logs in the broker log is not normal. Can you verify whether logging was turned off in your log4j properties file? If it was, please enable it, try again, and see what shows up in the logs. Tim On Thu, Apr 24, 2014 at 10:53 PM, Yashika Gupta yashika.gu...@impetus.co.in wrote: Jun, I am using the Kafka 2.8.0-0.8.0 version. There are no logs for the past month in the controller and state-change logs, though I can see some GC logs in the kafka-home-dir/logs folder: zookeeper-gc.log kafkaServer-gc.log Yashika __ From: Jun Rao jun...@gmail.com Sent: Friday, April 25, 2014 9:03 AM To: users@kafka.apache.org Subject: Re: Kafka Performance Tuning Which version of Kafka are you using? Any errors in the controller and state-change logs? Thanks, Jun On Thu, Apr 24, 2014 at 7:37 PM, Yashika Gupta yashika.gu...@impetus.co.in wrote: I am running a single broker and the leader column has 0 as the value. pushkar priyadarshi priyadarshi.push...@gmail.com wrote: You can use kafka-list-topic.sh to find out if the leader for a particular topic is available. -1 in the leader column might indicate trouble. On Fri, Apr 25, 2014 at 6:34 AM, Guozhang Wang wangg...@gmail.com wrote: Could you double-check whether the topic LOGFILE04 has already been created on the servers?
Guozhang On Thu, Apr 24, 2014 at 10:46 AM, Yashika Gupta yashika.gu...@impetus.co.in wrote: Jun, The detailed logs are as follows: 24.04.2014 13:37:31812 INFO main kafka.producer.SyncProducer - Disconnecting from localhost:9092 24.04.2014 13:37:38612 WARN main kafka.producer.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic LOGFILE04 - No partition metadata for topic LOGFILE04 due to kafka.common.LeaderNotAvailableException}] for topic [LOGFILE04]: class kafka.common.LeaderNotAvailableException 24.04.2014 13:37:40712 INFO main kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 1 for 1 topic(s) Set(LOGFILE04) 24.04.2014 13:37:41212 INFO main kafka.producer.SyncProducer - Connected to localhost:9092 for producing 24.04.2014 13:37:48812 INFO main kafka.producer.SyncProducer - Disconnecting from localhost:9092 24.04.2014 13:37:48912 WARN main kafka.producer.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic LOGFILE04 - No partition metadata for topic LOGFILE04 due to kafka.common.LeaderNotAvailableException}] for topic [LOGFILE04]: class kafka.common.LeaderNotAvailableException 24.04.2014 13:37:49012 ERROR main kafka.producer.async.DefaultEventHandler - Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: LOGFILE04 24.04.2014 13:39:96513 WARN ConsumerFetcherThread-produceLogLine2_vcmd-devanshu-1398361030812-8a0c706e-0-0 kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-produceLogLine2_vcmd-devanshu-1398361030812-8a0c706e-0-0], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 4; ClientId: produceLogLine2-ConsumerFetcherThread-produceLogLine2_vcmd-devanshu-1398361030812-8a0c706e-0-0; ReplicaId: -1; MaxWait: 6 ms; MinBytes: 1 bytes; RequestInfo: [LOGFILE04,0] - PartitionFetchInfo(2,1048576) java.net.SocketTimeoutException at 
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229) at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) at kafka.utils.Utils$.read(Unknown Source) at kafka.network.BoundedByteBufferReceive.readFrom(Unknown Source) at kafka.network.Receive$class.readCompletely(Unknown Source) at kafka.network.BoundedByteBufferReceive.readCompletely(Unknown Source) at kafka.network.BlockingChannel.receive(Unknown Source) at kafka.consumer.SimpleConsumer.liftedTree1$1(Unknown Source) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown Source) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown Source) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown Source) at kafka.metrics.KafkaTimer.time(Unknown Source) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown Source) at kafka.metrics.KafkaTimer.time(Unknown Source) at
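As pushkar suggests above, leadership can be checked from the command line. This is the 0.8.0-era tool named in the thread (later releases replaced it with kafka-topics.sh --describe), using the topic name from this thread; look for -1 in the leader column:

```shell
# run from the Kafka distribution directory against your ZooKeeper ensemble
bin/kafka-list-topic.sh --zookeeper localhost:2181 --topic LOGFILE04
```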
Re: Too many replicas after partition reassignment
Hi Ryan, Also, KAFKA-1317 should be fixed in both trunk and the latest 0.8.1 branch; are you running one of those, or one of the previously released versions? Tim On Mon, Apr 21, 2014 at 5:00 PM, Guozhang Wang wangg...@gmail.com wrote: Hi Ryan, Did you see any error logs in the new controller's controller log and state-change log? Guozhang On Mon, Apr 21, 2014 at 11:41 AM, Ryan Berdeen rberd...@hubspot.com wrote: After doing some partition reassignments, I've ended up with some partitions that have both the old and new brokers assigned. The output of kafka-topics.sh --describe looks like this: Topic:cs-es-indexer-a PartitionCount:30 ReplicationFactor:2 Configs: retention.ms=1080 ... Topic: cs-es-indexer-a Partition: 11 Leader: 11 Replicas: 11,12 Isr: 11,12 Topic: cs-es-indexer-a Partition: 12 Leader: 15 Replicas: 15,17,12 Isr: 12,17,15 ... Partition 12 has 3 replicas, but the topic has a replication factor of 2. It was reassigned from [12,17] to [15,17]. During the reassignment, the controller changed from one broker to another, and the resigning controller deadlocked with https://issues.apache.org/jira/browse/KAFKA-1317. Is this expected, or a known issue? I've had to alter some of my monitoring tools to handle different-sized replica lists for partitions of the same topic. -- Guozhang
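For reference, one way to shrink partition 12 back to its intended replica list is to run another reassignment targeting just that partition. This is a sketch of the JSON format the reassignment tool expects, with broker ids taken from the describe output above; verify against your own cluster before executing:

```shell
cat > fix-partition-12.json <<'EOF'
{"version": 1,
 "partitions": [{"topic": "cs-es-indexer-a", "partition": 12, "replicas": [15, 17]}]}
EOF
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file fix-partition-12.json --execute
```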
Re: NullPointerException in broker on notification of ZK session end
Hey Clark, Small world indeed :) I don't believe it's associated with KAFKA-1310, and I actually already have a fix in the 0.8.1 branch and trunk. Please try it and it will be in our 0.8.1.1 release. I can actually repro the problem you're seeing, seems like we're calling onControllerResignation assuming it's the controller while the broker might be just re-establishing zookeeper session. I'll file a jira and fix this. Tim On Wed, Apr 2, 2014 at 4:00 PM, Clark Breyman cl...@breyman.com wrote: Hey Tim. Small world :). Kafka 0.8.1_2.10 On Wed, Apr 2, 2014 at 3:54 PM, Timothy Chen tnac...@gmail.com wrote: Hi Clark, What version of Kafka are you running this from? Thanks, Tim On Wed, Apr 2, 2014 at 3:49 PM, Clark Breyman cl...@breyman.com wrote: I'm seeing a lot of this in my logs on a non-controller broker: 2014-04-02 15:42:23,078] ERROR Error handling event ZkEvent[New session event sent to kafka.controller.KafkaController$SessionExpirationListener@204a18ac] (org.I0Itec.zkclient.ZkEventThread) java.lang.NullPointerException at kafka.controller.KafkaController$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:340) at kafka.controller.KafkaController$anonfun$onControllerResignation$1.apply(KafkaController.scala:337) at kafka.controller.KafkaController$anonfun$onControllerResignation$1.apply(KafkaController.scala:337) at kafka.utils.Utils$.inLock(Utils.scala:538) at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:337) at kafka.controller.KafkaController$SessionExpirationListener$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068) at kafka.controller.KafkaController$SessionExpirationListener$anonfun$handleNewSession$1.apply(KafkaController.scala:1067) at kafka.controller.KafkaController$SessionExpirationListener$anonfun$handleNewSession$1.apply(KafkaController.scala:1067) at kafka.utils.Utils$.inLock(Utils.scala:538) at 
kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1067) at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) Is it associated with KAFKA-1310 or is it something else? It seems to be coming at the beginning of onControllerResignation, so none of the other shutdowns get triggered.
Re: Help in setting up Kafka cluster
Hi Roy, I wonder if you were able to start the broker following the steps here: http://kafka.apache.org/documentation.html#quickstart That page also shows you how to create a topic and send/consume messages using the console producer/consumer. Let us know if you run into any problems, Tim On Wed, Mar 26, 2014 at 3:04 PM, Roy rp...@njit.edu wrote: Hi, This is the first time I am trying to set up a Kafka cluster. I have tried a pseudo cluster with the CLI-based Kafka producer and consumer, but I am having difficulties setting up a log-aggregation Kafka producer and consumer. I would appreciate it if anyone could help me with this. - roy
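For reference, the quickstart Tim links to boils down to these commands, run from the Kafka distribution directory (exact script names vary slightly between 0.8.x releases):

```shell
bin/zookeeper-server-start.sh config/zookeeper.properties    # terminal 1
bin/kafka-server-start.sh config/server.properties           # terminal 2
# create a topic (0.8.1+; 0.8.0 ships kafka-create-topic.sh instead)
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic test
# send and consume a few messages from the console
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
```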
Re: Need help for kafka on windows
Hi Sripada, Unfortunately I can't provide a code fix, but it's actually an easy one. The script fails because it looks for kafka-run-class.bat relative to the current directory, so it can't find it when run from inside the bin\windows folder. You can either cd into the parent folder and run it from there, or fix the path in the script. Tim On Tue, Jan 14, 2014 at 10:46 AM, Francois Langelier francois.langel...@mate1inc.com wrote: I think your bug was reported here, but there are not enough details, so maybe I'm wrong: https://issues.apache.org/jira/browse/KAFKA-1195 If this is your bug, the good news is you are not alone; on the other hand, the bug priority is minor and there is no fix yet... On Tue, Jan 14, 2014 at 6:15 AM, sripada.ngg sripada@excelindia.com wrote: Hi All, I am new to Apache Kafka. Can anyone please help me set up Apache Kafka (kafka_2.8.0-0.8.0)? I downloaded it and tried to run a batch file under the bin/windows folder, and I get the error below: 'kafka-run-class.bat' is not recognized as an internal or external command, operable program or batch file. Can you please guide me on how to resolve this issue and the steps to set up Kafka on a local Windows system? Thanks and Regards, Shreepada.
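Concretely, Tim's workaround looks something like this (the install path below is an assumption based on the stock 0.8.0 download name; adjust it to wherever you extracted Kafka):

```bat
:: run from the Kafka root, not from inside bin\windows,
:: so the script's relative path to kafka-run-class.bat resolves
cd C:\kafka_2.8.0-0.8.0
bin\windows\kafka-server-start.bat config\server.properties
```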
Re: Updated kafka client (producer and consumer)
From the roadmap they published it looks like pipelining as part of the client rewrite is happening post 0.8. Tim On Thu, Dec 5, 2013 at 3:52 PM, Tom Brown tombrow...@gmail.com wrote: In our environment we use currently use Kafka 0.7.1. The core features I am looking for in a client are this: 1. Provide confirmation of produce requests (or notification of disconnection during requests). 2. Uses asynchronous IO so that: A. Multiple ops can be queued/in-flight at once. B. Fetch #2 can be in-flight while fetch #1 is being processed (without requiring an extra thread per consumer) 3. Allow precise control of offsets (like the SimpleConsumer). I have been working on a version of the Kafka client that meets those requirements. However, I recall hearing talk of rewriting the client. Did this happen as part of 0.8.x? If so, how different is it from the 0.7.x clients, and how well would it support the requirements listed above? Thanks in advance! --Tom
Re: Loggly's use of Kafka on AWS
Hi Philip, So I wonder if you guys hit disk perf problems with EBS? That seems to have been quite common in the past, but I haven't tried recently. Also, can you share how you deployed ZooKeeper in AWS so that a quorum is always available? Tim Sent from my iPhone On Dec 2, 2013, at 5:15 PM, Steve Morin steve.mo...@gmail.com wrote: Philip this is definitely useful On Dec 2, 2013, at 14:55, Surendranauth Hiraman suren.hira...@sociocast.com wrote: S Ahmed, This combination of Kafka and Storm to process streaming data is becoming pretty common. Definitely worth looking at. The throughput will vary depending on your workload (cpu usage, etc.) and if you're talking to a backend, of course. But it scales very well. -Suren On Mon, Dec 2, 2013 at 5:49 PM, S Ahmed sahmed1...@gmail.com wrote: Interesting. So twitter storm is used to basically process the messages on kafka? I'll have to read up on storm b/c I always thought the use case was a bit different. On Sun, Dec 1, 2013 at 9:59 PM, Joe Stein joe.st...@stealth.ly wrote: Awesome Philip, thanks for sharing! On Sun, Dec 1, 2013 at 9:17 PM, Philip O'Toole phi...@loggly.com wrote: A couple of us here at Loggly recently spoke at AWS reinvent, on how we use Kafka 0.72 in our ingestion pipeline. The slides are at the link below, and may be of interest to people on this list. http://www.slideshare.net/AmazonWebServices/infrastructure-at-scale-apache-kafka-twitter-storm-elastic-search-arc303-aws-reinvent-2013 Any questions, let me know, though I can't promise I can answer everything. Can't give the complete game away. :-) As always, Kafka rocks! Philip -- SUREN HIRAMAN, VP TECHNOLOGY SOCIOCAST 96 SPRING STREET, 7TH FLOOR NEW YORK, NY 10012 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hira...@sociocast.com W: www.sociocast.com
Re: Broker bind address versus published hostname in ZooKeeper
Hi Folks/Roger, Unfortunately I don't have legal clearance yet to contribute patches back to Kafka for code done at work, so Roger, it would be great if you can provide this patch. Thanks! Tim On Mon, Oct 21, 2013 at 11:17 AM, Roger Hoover roger.hoo...@gmail.com wrote: Agreed. Tim, it would be very helpful if you could provide a patch. Otherwise, I may be willing to create one. On Thu, Oct 17, 2013 at 8:15 PM, Jun Rao jun...@gmail.com wrote: Tim, This seems like a reasonable requirement. Would you be interested in providing a patch to the jira? Thanks, Jun On Thu, Oct 17, 2013 at 3:20 PM, Timothy Chen tnac...@gmail.com wrote: Hi Roger, That's exactly what I need on my end; I actually internally created a new property called zkHost.name to publish a different host to ZK. This is also needed for deploying Kafka into Azure. I also created zkHost.port, since the internal and external ports that are exposed might be different as well. Tim On Thu, Oct 17, 2013 at 3:13 PM, Roger Hoover roger.hoo...@gmail.com wrote: Hi all, I'm getting started experimenting with Kafka and ran into a configuration issue. Currently, in server.properties, you can configure host.name which gets used for two purposes: 1) to bind the socket 2) to publish the broker details to ZK for clients to use. There are times when these two settings need to be different. Here's an example. I want to set up Kafka brokers on OpenStack virtual machines in a private cloud, but I need producers to connect from elsewhere on the internal corporate network. With OpenStack, the virtual machines are only exposed to DHCP addresses (typically RFC 1918 private addresses). You can assign floating IPs to a virtual machine, but they are forwarded using Network Address Translation and not exposed directly to the VM. Also, there's typically no DNS to provide hostname lookup. Hosts have names like fubar.novalocal that are not externally routable. Here's what I want.
I want the broker to bind to the VM's private network IP, but I want it to publish its floating IP to ZooKeeper so that producers can publish to it. I propose a new optional parameter, listen, which would allow you to specify the socket address to listen on. If not set, the parameter would default to host.name, which is the current behavior. #Publish the externally routable IP in ZK host.name = floating ip #Accept connections from any interface the VM knows about listen = * I'm assuming others will eventually have the same requirement, so I've added a JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-1092 Thanks for your consideration. Cheers, Roger
Re: Patch for mmap + windows
Btw, I've been running this patch in our cloud env and it's been working fine so far. I actually filed another bug as I saw another problem on windows locally ( https://issues.apache.org/jira/browse/KAFKA-1036). Tim On Wed, Aug 21, 2013 at 4:29 PM, Jay Kreps jay.kr...@gmail.com wrote: That would be great! -Jay On Wed, Aug 21, 2013 at 3:13 PM, Timothy Chen tnac...@gmail.com wrote: Hi Jay, I'm planning to test run Kafka on Windows in our test environments evaluating if it's suitable for production usage. I can provide feedback with the patch how well it works and if we encounter any functional or perf problems. Tim On Wed, Aug 21, 2013 at 2:54 PM, Jay Kreps jay.kr...@gmail.com wrote: Elizabeth and I have a patch to support our memory mapped offset index files properly on Windows: https://issues.apache.org/jira/browse/KAFKA-1008 Question: Do we want this on 0.8 or trunk? I would feel more comfortable with it in trunk, but that means windows support in 0.8 is known to be broken (as opposed to not known to be broken but not known to be working either since we are not doing aggressive system testing on windows). I would feel more comfortable doing the patch on 0.8 if there was someone who would be willing to take on real load testing and/or production operation on Windows so we could have some confidence that Kafka on Windows actually works, otherwise this could just be the tip of the iceberg. Also it would be great to get review on that patch regardless of the destination. -Jay
Re: Patch for mmap + windows
Gotcha :) Seems like this will be taken care of then. Tim On Mon, Sep 9, 2013 at 6:22 PM, Jay Kreps jay.kr...@gmail.com wrote: I think Srirams complaint is that I haven't yet addressed his concerns :-) Sent from my iPhone On Sep 9, 2013, at 3:56 PM, Sriram Subramanian srsubraman...@linkedin.com wrote: I did take a look at KAFKA-1008 a while back and added some comments. On 9/9/13 3:52 PM, Jay Kreps jay.kr...@gmail.com wrote: Cool can we get a reviewer for KAFKA-1008 then? I can take on the other issue for the checkpoint files. -Jay On Mon, Sep 9, 2013 at 3:16 PM, Neha Narkhede neha.narkh...@gmail.comwrote: +1 for windows support on 0.8 Thanks, Neha On Mon, Sep 9, 2013 at 10:48 AM, Jay Kreps jay.kr...@gmail.com wrote: So guys, do we want to do these in 0.8? The first patch was a little involved but I think it would be good to have windows support in 0.8 and it sounds like Tim is able to get things working after these changes. -Jay On Mon, Sep 9, 2013 at 10:19 AM, Timothy Chen tnac...@gmail.com wrote: Btw, I've been running this patch in our cloud env and it's been working fine so far. I actually filed another bug as I saw another problem on windows locally ( https://issues.apache.org/jira/browse/KAFKA-1036). Tim On Wed, Aug 21, 2013 at 4:29 PM, Jay Kreps jay.kr...@gmail.com wrote: That would be great! -Jay On Wed, Aug 21, 2013 at 3:13 PM, Timothy Chen tnac...@gmail.com wrote: Hi Jay, I'm planning to test run Kafka on Windows in our test environments evaluating if it's suitable for production usage. I can provide feedback with the patch how well it works and if we encounter any functional or perf problems. Tim On Wed, Aug 21, 2013 at 2:54 PM, Jay Kreps jay.kr...@gmail.com wrote: Elizabeth and I have a patch to support our memory mapped offset index files properly on Windows: https://issues.apache.org/jira/browse/KAFKA-1008 Question: Do we want this on 0.8 or trunk? 
I would feel more comfortable with it in trunk, but that means windows support in 0.8 is known to be broken (as opposed to not known to be broken but not known to be working either since we are not doing aggressive system testing on windows). I would feel more comfortable doing the patch on 0.8 if there was someone who would be willing to take on real load testing and/or production operation on Windows so we could have some confidence that Kafka on Windows actually works, otherwise this could just be the tip of the iceberg. Also it would be great to get review on that patch regardless of the destination. -Jay
Re: Correlation id
Thanks Tejas! That's very helpful. Tim On Mon, Aug 19, 2013 at 11:40 PM, Tejas Patil tejas.patil...@gmail.com wrote: Multiple produce requests are sent asynchronously over the same socket. Suppose you send 2 requests and get back a single response; how do you figure out which of the 2 requests it corresponds to? The correlation id helps here. AFAIK, the correlation id is added to produce requests, and the broker uses the same id in its response so that the producer can keep track of its requests. The correlation id also helps in debugging issues, since you can now uniquely identify requests across producer and broker logs. On Mon, Aug 19, 2013 at 11:01 PM, Timothy Chen tnac...@gmail.com wrote: Hi, This is probably a very obvious question, but I cannot find the answer for it. What does the correlation id mean in a producer request? Tim
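Tejas's explanation can be sketched in a few lines. This is an illustrative toy, not Kafka's actual client code: the client stamps each request with a fresh id, remembers the request until its response arrives, and uses the id echoed back by the broker to pair up responses that may arrive out of order.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of correlation-id matching for pipelined requests on one socket.
public class CorrelationDemo {
    static final AtomicInteger nextId = new AtomicInteger();
    static final Map<Integer, String> inFlight = new HashMap<>();

    static int send(String payload) {
        int id = nextId.incrementAndGet();
        inFlight.put(id, payload);  // remember the request until its response arrives
        return id;                  // the id travels in the request header and is echoed back
    }

    static String onResponse(int correlationId) {
        return inFlight.remove(correlationId);  // pair the response with its request
    }

    public static void main(String[] args) {
        int a = send("produce batch A");
        int b = send("produce batch B");
        // Responses may arrive in any order; the id disambiguates them.
        System.out.println(onResponse(b)); // prints "produce batch B"
        System.out.println(onResponse(a)); // prints "produce batch A"
    }
}
```

The same ids appearing in producer and broker logs is what makes the cross-log debugging Tejas mentions possible.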
Error when processing messages in Windows
Hi all, I've tried pushing a large amount of messages into Kafka on Windows, and got the following error: Caused by: java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open at java.io.RandomAccessFile.setLength(Native Method) at kafka.log.OffsetIndex.liftedTree2$1(OffsetIndex.scala:263) at kafka.log.OffsetIndex.resize(OffsetIndex.scala:262) at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:247) at kafka.log.Log.rollToOffset(Log.scala:518) at kafka.log.Log.roll(Log.scala:502) at kafka.log.Log.maybeRoll(Log.scala:484) at kafka.log.Log.append(Log.scala:297) ... 19 more I suspect that Windows is not releasing memory mapped file references soon enough. I wonder if there is any good workaround or solutions for this? Thanks! Tim
Re: Changing the number of partitions after a topic is created
Hi Jun, I wonder when will the tool be available? We're very interested in changing the number of partitions for a topic after creation too. Thanks! Tim On Thu, Jul 4, 2013 at 9:06 PM, Jun Rao jun...@gmail.com wrote: Currently, once a topic is created, the number of partitions can't be changed. We are working on a tool to allow that. For now, you will have to either use a new topic with more partitions or if you don't care about messages being consumed in order, you can feed the consumed messages to a separate thread pool for processing. Thanks, Jun On Thu, Jul 4, 2013 at 8:11 AM, Calvin Lei ckp...@gmail.com wrote: Hi I have a few topics created with 1 partition. After running the cluster for a few days, I want to increase the partition to 10 to improve the consumer throughput. I learnt that it is not supported in 0.8. What is the recommendation of improving consumer throughput after a topic is created and the data volume increased? Regards, Cal
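For anyone reading this thread later: the tool Jun mentions did ship in subsequent releases, so the partition count of an existing topic can now be raised (it can never be lowered). Note that keyed messages hash onto the new partition layout afterwards, so per-key ordering across the resize is lost. A sketch, with placeholder topic name and connection string:

```shell
# later Kafka releases; increases the partition count of an existing topic
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --partitions 10
```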
Re: C# client for kafka 0.8
Hi Robert, The most recent one that I know of is the C# client that ExactTarget folks did, however not all calls are up to the 0.8 protocol so it doesn't completely work. I have a slightly more cleaned up version here https://github.com/tnachen/kafka/tree/feature/et-develop-0.8 It will be great if you are interested in finishing it :) Tim On Tue, Jun 18, 2013 at 8:23 AM, Withers, Robert robert.with...@dish.comwrote: I see an old C# client, which is 2 years old. Does anyone have a C# client that works with the kafka 0.8 producer? Thanks, rob
Re: Using Kafka for data messages
Also, since you're going to be creating a topic per user, the number of concurrent users will also be a concern for Kafka, as it doesn't handle massive numbers of topics well. Tim On Thu, Jun 13, 2013 at 10:47 AM, Josh Foure user...@yahoo.com wrote: Hi Mahendra, I think that is where it gets a little tricky. I think it would work something like this: 1. Web sends login event for user user123 to topic GUEST_EVENT. 2. All of the systems consume those messages and publish the data messages to topic GUEST_DATA.user123. 3. The Recommendation system gets all of the data from GUEST_DATA.user123, processes and then publishes back to the same topic GUEST_DATA.user123. 4. The Web consumes the messages from the same topic (there is a different topic for every user that logged in) GUEST_DATA.user123 and when it finds the recommendation messages it pushes that to the browser (note it will need to read all the other data messages and discard those when looking for the recommendation messages). I have a concern that the Web will be flooded with a ton of messages that it will promptly drop, but I don't want to create a new response or recommendation topic because then I feel like I am tightly coupling the message to the functionality, and in the future different systems may want to consume those messages as well. Does that make sense? Josh From: Mahendra M mahendr...@gmail.com To: users@kafka.apache.org; Josh Foure user...@yahoo.com Sent: Thursday, June 13, 2013 12:56 PM Subject: Re: Using Kafka for data messages Hi Josh, The idea looks very interesting. I just had one doubt. 1. A user logs in. His login id is sent on a topic 2. Other systems (consumers on this topic) consume this message and publish their results to another topic This will be happening without any particular order for hundreds of users. Now, on the site being displayed to the user, how will you fetch only the messages for that user from the queue?
Regards, Mahendra

On Thu, Jun 13, 2013 at 8:51 PM, Josh Foure user...@yahoo.com wrote:

Hi all, my team is proposing a novel way of using Kafka and I am hoping someone can help do a sanity check on this:
1. When a user logs into our website, we will create a “logged in” event message in Kafka containing the user id.
2. 30+ systems (consumers, each in their own consumer group) will consume this event and look up data about this user id. They will then publish all of this data back out into Kafka as a series of data messages. One message may include the user’s name, another the user’s address, another the user’s last 10 searches, another their last 10 orders, etc. The plan is that a single “logged in” event may trigger hundreds if not thousands of additional data messages.
3. Another system, the “Product Recommendation” system, will have consumed the original “logged in” message and will also consume a subset of the data messages (realistically I think it would need to consume all of the data messages but would discard the ones it doesn’t need). As the Product Recommendation system consumes the data messages, it will compute recommended products and publish recommendation messages (which get more and more specific as it consumes more and more data messages).
4. The original website will consume the recommendation messages and show the recommendations to the user as it gets them.

You don’t see many systems implemented this way, but since Kafka has much higher throughput than your typical MOM, this approach seems innovative. The benefits are:
1. If we start collecting more information about the users, we can simply start publishing that in new data messages and consumers can start processing those messages whenever they want. If we were doing this in a more traditional SOA approach, the schemas would need to change every time we added a field, but with this approach we can just create new messages without touching existing ones.
2. We are looking to make our systems smaller, so if we end up with more, smaller systems that each publish a small number of events, it becomes easier to make changes and test them. If we were doing this in a more traditional SOA approach, we would need to retest each consumer every time we changed our bigger SOA services.

The downside appears to be:
1. We may be publishing a large amount of data that never gets used but that everyone needs to consume to see if they need it before discarding it.
2. The Product Recommendation system may need to wait until it consumes a number of messages, and keep track of all the data internally, before it can start processing.
3. While we may be able to keep the messages somewhat small, the fact that they contain data will mean they are bigger than your traditional EDA messages.
4. It seems like we
Custom partitioner
Hi, I'm trying to add my own custom partitioner, following the 0.8 producer example in the wiki. However, when I set a broker list and set the custom partitioner class name in the client, I see this error: "Partitioner cannot be used when broker list is set". Does this mean a custom partitioner is only available when I use Zookeeper to connect to Kafka? Thanks, Tim
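For context, the partitioning logic itself is small; here is a minimal, Kafka-free sketch in Python of the kind of key-hash partitioner the 0.8 producer wiki example describes. The function name and the use of Python's built-in `hash` are my own choices for illustration; the real 0.8 API is a Java/Scala `Partitioner` interface whose `partition(key, numPartitions)` method returns the chosen partition index.

```python
def partition(key, num_partitions):
    """Route every message with the same key to the same partition.
    Stable only while num_partitions stays fixed."""
    # abs() guards against negative hash values. Note: Python randomizes
    # str hashes per process, so a real partitioner should use a stable
    # hash (e.g. of the key's bytes) instead.
    return abs(hash(key)) % num_partitions

# All messages for the same session id land on one partition:
p = partition("session-42", 8)
stable = all(partition("session-42", 8) == p for _ in range(1000))
```

The key property is determinism: the same key and the same partition count always yield the same partition, which is what lets one consumer see every message for a given id.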
Re: Partitioning and scale
Hi Neha, Not sure if this sounds crazy, but if we'd like to have the events for the same session id go to the same partition, one way could be for each session key to get its own topic with a single partition; there could then be millions of topics, each with a single partition. I wonder what the bottleneck of doing this would be? Thanks, Tim

On Wed, May 22, 2013 at 4:32 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

Not automatically as of today. You have to run the reassign-partitions tool and explicitly move selected partitions to the new brokers. If you use this tool, you can move partitions to the new broker without any downtime. Thanks, Neha

On Wed, May 22, 2013 at 2:20 PM, Timothy Chen tnac...@gmail.com wrote:

Hi Neha/Chris, Thanks for the reply. So if I set a fixed number of partitions and just add brokers to the broker pool, does it rebalance the load to the new brokers (along with the data)? Tim

On Wed, May 22, 2013 at 1:15 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

- I see that Kafka server.properties allows one to specify the number of partitions it supports. However, when we want to scale, I wonder if we add # of partitions or # of brokers, will the same partitioner start distributing the messages to different partitions? And if it does, how can that same consumer continue to read off the messages of those ids if it was interrupted in the middle?

The num.partitions config in server.properties is used only for topics that are auto-created (controlled by auto.create.topics.enable). For topics that you create using the admin tool, you can specify the number of partitions that you want. After that, currently there is no way to change it. For that reason, it is a good idea to over-partition your topic, which also helps load-balance partitions onto the brokers.
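For reference, the reassign-partitions tool Neha mentions is driven by a JSON file listing the target replica set per partition. A minimal sketch is below; the topic name and broker ids are made up, and the exact command-line flags vary across 0.8.x releases, so treat this as a shape, not a recipe.

```json
{
  "version": 1,
  "partitions": [
    { "topic": "my-topic", "partition": 0, "replicas": [3, 4] },
    { "topic": "my-topic", "partition": 1, "replicas": [4, 5] }
  ]
}
```

The file is handed to the kafka-reassign-partitions tool, which moves the named partitions onto the listed brokers while the cluster stays online.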
You are right that if you change the number of partitions later, messages that previously stuck to a certain partition would now get routed to a different partition, which is undesirable for applications that want to use sticky partitioning.

- I'd like to create a consumer per partition, and for each one to subscribe to the changes of that one. How can this be done in Kafka?

For your use case, it seems like SimpleConsumer might be a better fit. However, it will require you to write code to handle discovery of the leader for the partition that your consumer is consuming. Chris has written up a great example that you can follow - https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

Thanks, Neha

On Wed, May 22, 2013 at 12:37 PM, Chris Curtin curtin.ch...@gmail.com wrote:

Hi Tim,

On Wed, May 22, 2013 at 3:25 PM, Timothy Chen tnac...@gmail.com wrote:

Hi, I'm currently trying to understand how Kafka (0.8) can scale with our usage pattern and how to set up the partitioning. We want to route messages belonging to the same id to the same queue, so its consumer will be able to consume all the messages of that id. My questions:

- From my understanding, in Kafka we would need to have a custom partitioner that routes the same messages to the same partition, right? I'm trying to find examples of writing this partitioner logic, but I can't find any. Can someone point me to an example?

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example

The partitioner here does a simple mod on the IP address and the # of partitions. You'd need to define your own logic, but this is a start.

- I see that Kafka server.properties allows one to specify the number of partitions it supports. However, when we want to scale, I wonder if we add # of partitions or # of brokers, will the same partitioner start distributing the messages to different partitions?
And if it does, how can that same consumer continue to read off the messages of those ids if it was interrupted in the middle?

I'll let someone else answer this.

- I'd like to create a consumer per partition, and for each one to subscribe to the changes of that one. How can this be done in Kafka?

Two ways: SimpleConsumer or consumer groups. It depends on the level of control you want over the code processing a specific partition vs. getting one assigned to it (and the level of control over offset management).

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

Thanks, Tim
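Neha's warning above about changing the partition count can be made concrete. The sketch below is not Kafka code, just the arithmetic: a hypothetical hash-mod partitioner (here, sum of key bytes mod partition count, chosen only because it is deterministic) routes the same key to a different partition once the partition count changes, breaking sticky routing mid-stream.

```python
def pick_partition(key_bytes, num_partitions):
    # Deterministic stand-in for a producer's hash partitioner:
    # sum of the key's bytes, mod the partition count.
    return sum(key_bytes) % num_partitions

key = b"user123"
before = pick_partition(key, 4)  # with the original 4 partitions
after = pick_partition(key, 6)   # after growing the topic to 6 partitions

# The assignment moved, so a consumer relying on "all messages for
# user123 live in one partition" would now see the stream split
# across two partitions at the moment of the change.
moved = before != after
```

This is why the thread recommends over-partitioning up front rather than growing the partition count later.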
Partitioning and scale
Hi, I'm currently trying to understand how Kafka (0.8) can scale with our usage pattern and how to set up the partitioning. We want to route messages belonging to the same id to the same queue, so its consumer will be able to consume all the messages of that id. My questions:

- From my understanding, in Kafka we would need to have a custom partitioner that routes the same messages to the same partition, right? I'm trying to find examples of writing this partitioner logic, but I can't find any. Can someone point me to an example?

- I see that Kafka server.properties allows one to specify the number of partitions it supports. However, when we want to scale, I wonder if we add # of partitions or # of brokers, will the same partitioner start distributing the messages to different partitions? And if it does, how can that same consumer continue to read off the messages of those ids if it was interrupted in the middle?

- I'd like to create a consumer per partition, and for each one to subscribe to the changes of that one. How can this be done in Kafka?

Thanks, Tim
Re: Partitioning and scale
Hi Neha/Chris, Thanks for the reply. So if I set a fixed number of partitions and just add brokers to the broker pool, does it rebalance the load to the new brokers (along with the data)? Tim

On Wed, May 22, 2013 at 1:15 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

- I see that Kafka server.properties allows one to specify the number of partitions it supports. However, when we want to scale, I wonder if we add # of partitions or # of brokers, will the same partitioner start distributing the messages to different partitions? And if it does, how can that same consumer continue to read off the messages of those ids if it was interrupted in the middle?

The num.partitions config in server.properties is used only for topics that are auto-created (controlled by auto.create.topics.enable). For topics that you create using the admin tool, you can specify the number of partitions that you want. After that, currently there is no way to change it. For that reason, it is a good idea to over-partition your topic, which also helps load-balance partitions onto the brokers.

You are right that if you change the number of partitions later, messages that previously stuck to a certain partition would now get routed to a different partition, which is undesirable for applications that want to use sticky partitioning.

- I'd like to create a consumer per partition, and for each one to subscribe to the changes of that one. How can this be done in Kafka?

For your use case, it seems like SimpleConsumer might be a better fit. However, it will require you to write code to handle discovery of the leader for the partition that your consumer is consuming.
Chris has written up a great example that you can follow - https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

Thanks, Neha

On Wed, May 22, 2013 at 12:37 PM, Chris Curtin curtin.ch...@gmail.com wrote:

Hi Tim,

On Wed, May 22, 2013 at 3:25 PM, Timothy Chen tnac...@gmail.com wrote:

Hi, I'm currently trying to understand how Kafka (0.8) can scale with our usage pattern and how to set up the partitioning. We want to route messages belonging to the same id to the same queue, so its consumer will be able to consume all the messages of that id. My questions:

- From my understanding, in Kafka we would need to have a custom partitioner that routes the same messages to the same partition, right? I'm trying to find examples of writing this partitioner logic, but I can't find any. Can someone point me to an example?

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example

The partitioner here does a simple mod on the IP address and the # of partitions. You'd need to define your own logic, but this is a start.

- I see that Kafka server.properties allows one to specify the number of partitions it supports. However, when we want to scale, I wonder if we add # of partitions or # of brokers, will the same partitioner start distributing the messages to different partitions? And if it does, how can that same consumer continue to read off the messages of those ids if it was interrupted in the middle?

I'll let someone else answer this.

- I'd like to create a consumer per partition, and for each one to subscribe to the changes of that one. How can this be done in Kafka?

Two ways: SimpleConsumer or consumer groups. It depends on the level of control you want over the code processing a specific partition vs. getting one assigned to it (and the level of control over offset management).
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

Thanks, Tim