Doubts about Kafka
Hi guys, I have some doubts about Kafka. The first is: why do some applications prefer to connect to ZooKeeper instead of the brokers? Connecting through ZooKeeper could create overhead, because we are inserting another element between producer and consumer. My other question is about the data sent by the producer: in my tests the producer sends messages to the brokers and within a few minutes my hard disk (250 GB) is full. Is there something to do in the configuration to limit this? Thanks -- Privacy notice: http://www.unibs.it/node/8155
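On the disk-filling question above: Kafka keeps log segments on disk until retention limits delete them, so generous defaults can fill a 250 GB disk quickly under heavy producer load. A broker-side server.properties sketch (0.8-era property names; the values are illustrative, tune them for your workload):

```
# delete log segments older than 24 hours
log.retention.hours=24
# and/or cap retained bytes per partition (here ~10 GB)
log.retention.bytes=10737418240
# smaller segments let deletion reclaim space sooner
log.segment.bytes=536870912
```

Retention is enforced per partition and at segment granularity, so actual disk usage can temporarily exceed the configured cap by up to one segment per partition.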
Can't send a keyedMessage to brokers with partitioner.class=kafka.producer.DefaultPartitioner
Hi, I have a 2-node Kafka cluster with 2 broker and 2 ZooKeeper instances. I created a topic kafka-test with 2 partitions and replication-factor 2. My producer config is:

partitioner.class=kafka.producer.DefaultPartitioner
metadata.broker.list=172.32.1.248:9092,172.32.1.251:9092
request.required.acks=1

The DefaultPartitioner calculates the key's hash value and takes it modulo the number of partitions to decide which partition the data will go to, so I create my KeyedMessage:

val keyMsg = new KeyedMessage[String, String]("kafka-test", "a", "test message!")
prod.send(keyMsg)

"a"'s hash value is 97 and 97 % 2 = 1, so the data should go to partition 1. However, the data didn't get sent to the brokers (I have a console consumer running that didn't receive any message from this topic). If I create the message without the key, it works fine:

val keyMsg = new KeyedMessage[String, String]("kafka-test", "test message!")
prod.send(keyMsg)

Am I using the key wrong, or anything else? Thanks, Edwin
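For reference, the partition choice the message above reasons about can be checked with a minimal Java sketch of the hash-and-mod logic (this mimics what kafka.producer.DefaultPartitioner does, it is not the actual Kafka class):

```java
public class PartitionSketch {
    // Mimics the 0.8 DefaultPartitioner: abs(key.hashCode) mod numPartitions.
    // The mask keeps the hash non-negative, like Kafka's Utils.abs.
    public static int partition(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // "a".hashCode() is 97, so with 2 partitions the key maps to partition 1.
        System.out.println(partition("a", 2));
    }
}
```

The arithmetic matches the expectation in the message ("a" maps to partition 1 out of 2), so the missing messages are likely not a partitioning-math problem.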
Re: high cpu and network traffic when cluster has no topic
Sounds like this patch fixed the issue. It would be good to get some review on KAFKA-1919 -- it is only a four-line change.

On Wed, Feb 4, 2015 at 1:15 PM, Steven Wu stevenz...@gmail.com wrote: Bhavesh, unfortunately, the ps cmd on Mac doesn't display thread ids. I tried DTrace, but it only shows kernel thread ids (not Java thread ids). Anyway, I updated the jira with producer metrics. It clearly shows the request rate shooting up to 18K/sec. Thanks, Steven

On Wed, Feb 4, 2015 at 9:48 AM, Steven Wu stevenz...@gmail.com wrote: Bhavesh, this is on Mac OS. I couldn't get similar options to make ps/jstack work on Mac. Will continue to try to make them work. The logging output does show kafka-producer-network-thread sending two metadata requests per millisecond. Thanks, Steven

On Wed, Feb 4, 2015 at 9:15 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Steven, Can you please try to see if the io thread is indeed the problem? The following works on Linux: ps -p $java_pid -L -o tid,pcpu ; jstack -F $java_pid. Then compare the thread ids (you may have to convert the hex ids to decimal) between the jstack and ps output. This will tell you which thread is consuming the most CPU in that process. Thanks, Bhavesh

On Wed, Feb 4, 2015 at 9:01 AM, Steven Wu stevenz...@gmail.com wrote: I have re-run my unit test with 0.8.2.0. The same tight-loop problem happened after a few mins.

On Tue, Feb 3, 2015 at 10:00 PM, Guozhang Wang wangg...@gmail.com wrote: Steven, you may be hitting KAFKA-1642 https://issues.apache.org/jira/browse/KAFKA-1642. As Jay said, a bunch of such issues are fixed in the new release. Please let us know if you still see the issue with it. Guozhang

On Tue, Feb 3, 2015 at 8:52 PM, Steven Wu stevenz...@gmail.com wrote: Sure. Will try my unit test again with the 0.8.2.0 release tomorrow and report back my findings.

On Tue, Feb 3, 2015 at 8:42 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Steven, That sounds like a bug.
I think we fixed a few producer high-cpu issues since the beta; I wonder if you could repeat the same test with the 0.8.2 final release? -Jay

On Tue, Feb 3, 2015 at 8:37 PM, Steven Wu stevenz...@gmail.com wrote: Actually, my local test can reproduce the issue, although not immediately. It seems to happen after a few mins. I enabled TRACE level logging. Here seems to be the tight loop; you can see that there are two metadata requests in one millisecond:

kafka-producer-network-thread | foo 20:34:32,626 TRACE NetworkClient:301 - Ignoring empty metadata response with correlation id 360185.
kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:369 - Trying to send metadata request to node -2
kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:374 - Sending metadata request ClientRequest(expectResponse=true, payload=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=360186,client_id=foo}, body={topics=[]})) to node -2
kafka-producer-network-thread | foo 20:34:32,626 TRACE NetworkClient:301 - Ignoring empty metadata response with correlation id 360186.
kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:369 - Trying to send metadata request to node -2

On Tue, Feb 3, 2015 at 8:10 PM, Steven Wu stevenz...@gmail.com wrote: Hi, We have observed a high-cpu and high-network-traffic problem when 1) the cluster (0.8.1.1) has no topic and 2) a KafkaProducer (0.8.2-beta) object is created without sending any traffic. We have observed this problem twice. In both cases, the problem went away immediately after one/any topic was created. Is this a known issue? Just want to check with the community first before I spend much time reproducing it. I couldn't reproduce the issue with a similar setup with unit test code in the IDE: start two brokers with no topic locally on my laptop, then create a KafkaProducer object without sending any msgs. But I only tested with 0.8.2-beta for both broker and producer. Thanks, Steven -- Guozhang
Re: New Producer - ONLY sync mode?
Hey Otis, Yeah, Gwen is correct. The future from the send will be satisfied when the response is received, so it will be exactly the same as the performance of the sync producer previously. -Jay

On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira gshap...@cloudera.com wrote: If I understood the code and Jay correctly - if you wait for the future it will be a similar delay to that of the old sync producer. Put another way, if you test it out and see longer delays than the sync producer had, we need to find out why and fix it. Gwen

On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Hi, Nope, unfortunately it can't do that. X is a remote app, doesn't listen to anything external, and calls Y via HTTPS. So X has to decide what to do with its data based on Y's synchronous response. It has to block until Y responds. And it wouldn't be pretty, I think, because nobody wants to run apps that talk to remote servers and hang on to connections longer than they have to. But perhaps that is the only way? Or maybe the answer to "I'm guessing the delay would be more or less the same as if the Producer was using SYNC mode?" is YES, in which case the connection from X to Y would be open for just as long as with a SYNC producer running in Y? Thanks, Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/

On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira gshap...@cloudera.com wrote: Can Y have a callback that will handle the notification to X? In this case, perhaps Y can be async and X can buffer the data until the callback triggers and says all good (or resend if the callback indicates an error).

On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Hi, Thanks for the info. Here's the use case. We have something upstream sending data, say a log shipper called X. It sends it to some remote component Y. Y is the Kafka Producer and it puts data into Kafka.
But Y needs to send a reply to X and tell it whether it successfully put all its data into Kafka. If it did not, Y wants to tell X to buffer the data locally and resend it later. If the producer is ONLY async, Y can't easily do that. Or maybe Y would just need to wait for the Future to come back and only then send the response back to X? If so, I'm guessing the delay would be more or less the same as if the Producer was using SYNC mode? Thanks, Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/

On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps jay.kr...@gmail.com wrote: Yeah, as Gwen says, there is no sync/async mode anymore. There is a new configuration which does a lot of what async did in terms of allowing batching:

batch.size - This is the target amount of data per partition the server will attempt to batch together.
linger.ms - This is the time the producer will wait for more data to be sent, to better batch up writes. The default is 0 (send immediately). So if you set this to 50 ms, the client will send immediately if it has already filled up its batch, otherwise it will wait to accumulate the number of bytes given by batch.size.

To send asynchronously you do producer.send(record), whereas to block on a response you do producer.send(record).get(), which will wait for acknowledgement from the server. One advantage of this model is that the client will do its best to batch under the covers even if linger.ms=0. It will do this by batching any data that arrives while another send is in progress into a single request--giving a kind of group commit effect. The hope is that this will be both simpler to understand (a single api that always works the same) and more powerful (you always get a response with error and offset information whether or not you choose to use it).
-Jay On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira gshap...@cloudera.com wrote: If you want to emulate the old sync producer behavior, you need to set the batch size to 1 (in producer config) and wait on the future you get from Send (i.e. future.get) I can't think of good reasons to do so, though. Gwen On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Hi, Is the plan for New Producer to have ONLY async mode? I'm asking because of this info from the Wiki: - The producer will always attempt to batch data and will always immediately return a SendResponse which acts as a Future to allow the client to await the completion of the request. The word always makes me think there will be no sync mode.
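The sync-vs-async distinction discussed in this thread reduces to when you call get() on the returned future. A pure-JDK sketch of the pattern (this simulates the behavior with CompletableFuture; it is not the Kafka client, and the "offset" here is just the record length for illustration):

```java
import java.util.concurrent.CompletableFuture;

public class FutureSketch {
    // Stand-in for producer.send(record): returns immediately with a future
    // that completes when the (simulated) broker acks the write.
    static CompletableFuture<Long> send(String record) {
        return CompletableFuture.supplyAsync(() -> (long) record.length());
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Long> ack = send("hello");

        // Async style: register a callback and keep going,
        // analogous to send(record, callback) in the new producer.
        ack.thenAccept(offset -> System.out.println("acked: " + offset));

        // "Sync" style: block until the ack arrives,
        // analogous to producer.send(record).get().
        long offset = ack.get();
        System.out.println("blocking get returned " + offset);
    }
}
```

The send call itself never blocks in either style; only the get() does, which is why waiting on the future gives latency comparable to the old sync producer.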
Console Producer Throwing LeaderNotAvailableException Despite Existing Leader for Partition
Howdy all, I recently upgraded to Kafka 0.8.2.0 and am trying to verify that everything still works as expected. I spin up two brokers and one zk instance, and then create a topic using

kafka-topics.sh --create --zookeeper ad-0104:2181 --topic deleteme --partitions 2 --replication-factor 1

Then I run --describe to check if the partitions have leaders:

kafka-topics.sh --describe --zookeeper ad-0104:2181 --topic deleteme
Topic:deleteme PartitionCount:2 ReplicationFactor:1 Configs:
 Topic: deleteme Partition: 0 Leader: 0 Replicas: 0 Isr: 0
 Topic: deleteme Partition: 1 Leader: 1 Replicas: 1 Isr: 1

Finally, I run the console producer:

kafka-console-producer.sh --broker-list ad-0102:9092 --topic deleteme

I get the following warning
[2015-02-08 00:36:24,244] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
and then it waits for console input. When I try to send a message I get the following list of error messages:

[2015-02-08 00:37:04,735] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2015-02-08 00:37:04,751] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2015-02-08 00:37:04,752] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: deleteme (kafka.producer.async.DefaultEventHandler)
[2015-02-08 00:37:04,859] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException
(kafka.producer.BrokerPartitionInfo) [2015-02-08 00:37:04,863] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2015-02-08 00:37:04,863] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: deleteme (kafka.producer.async.DefaultEventHandler) [2015-02-08 00:37:04,968] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2015-02-08 00:37:04,974] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2015-02-08 00:37:04,974] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: deleteme (kafka.producer.async.DefaultEventHandler) [2015-02-08 00:37:05,079] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2015-02-08 00:37:05,084] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2015-02-08 00:37:05,084] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for 
topic: deleteme (kafka.producer.async.DefaultEventHandler) [2015-02-08 00:37:05,189] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2015-02-08 00:37:05,191] ERROR Failed to send requests for topics deleteme with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler) [2015-02-08 00:37:05,192] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread) kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at
Re: New Producer - ONLY sync mode?
I implemented the flush() call I hypothesized earlier in this thread as a patch on KAFKA-1865. So now producer.flush() will block until all buffered requests complete. The post condition is that all previous send futures are satisfied and have error/offset information. This is a little easier to use than keeping a list of all the futures, and also probably a bit more efficient. It also immediately unblocks all requests regardless of the linger.ms setting, which is in keeping with the name I think, and important for the use case we described. Code here: https://issues.apache.org/jira/browse/KAFKA-1865 -Jay

On Sat, Feb 7, 2015 at 1:24 PM, Jay Kreps jay.kr...@gmail.com wrote: [...]
Re: Not found NewShinyProducer sync performance metrics
You can configure the JMX ports using the system properties below: -Dcom.sun.management.jmxremote.port= -Dcom.sun.management.jmxremote.rmi.port=8889

On Fri, Feb 6, 2015 at 9:19 AM, Xinyi Su xiny...@gmail.com wrote: Hi, I tried to use JConsole to connect to a remote Kafka broker which is running behind a firewall, but the connection is blocked. I can specify the JMX registry port, which the firewall allows, by setting JMX_PORT=, but I cannot specify the ephemeral port, which is always chosen randomly at startup. This ephemeral port is the one the JMX RMI server listens on and through which the actual data exchange takes place. It is randomly assigned and I have no way to pin it to a port the firewall does not block. How can I solve this, given that I cannot use JConsole because of the firewall? Thanks. Xinyi

On 6 February 2015 at 07:24, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Not announced yet, but http://sematext.com/spm should be showing you all the new shiny Kafka (new producer) metrics out of the box. If you don't see them, please shout (I know we have a bit more tweaking to do in the coming days). If you want to just dump MBeans from JMX manually and eyeball the output, you could use something like https://github.com/sematext/jmxc to dump the whole JMX content of your Java Consumer, Producer, or Broker. Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/

On Thu, Feb 5, 2015 at 5:58 AM, Manikumar Reddy ku...@nmsworks.co.in wrote: The new producer uses Kafka's own metrics api. Currently metrics are reported via JMX, so any JMX monitoring tool (e.g. JConsole) can be used for monitoring.

On Feb 5, 2015 3:56 PM, Xinyi Su xiny...@gmail.com wrote: Hi, I am using kafka-producer-perf-test.sh to study NewShinyProducer *sync* performance. I have not found any CSV output or metrics collector for NewShinyProducer sync performance.
Could you share how to collect NewShinyProducer metrics? Thanks. Best regards, Xinyi
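For the firewall question above: since Java 7u4 the JMX RMI data port can be pinned alongside the registry port, so the firewall only needs two fixed ports open. A sketch of the JVM flags (ports are illustrative; on 0.8-era Kafka they can be passed through the KAFKA_JMX_OPTS environment variable read by kafka-run-class.sh -- verify against your start scripts):

```
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.port=8888
-Dcom.sun.management.jmxremote.rmi.port=8889
```

With both ports fixed, JConsole can connect through the firewall as long as those two ports are allowed.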
one message consumed by both consumers in the same group?
My understanding is that for 2 consumers within the same group, a message goes to only one of the consumers, not both. But I did a simple test: get the Kafka distro, copy the code dir to /tmp/kafka/, then run the console consumer from both dirs at the same time: bin/kafka-console-consumer.sh --. All the parameters are the same. Then I use the console producer to dump some messages into the queue. The message does show up on both ends. What am I doing wrong? Thanks, Yang
Re: one message consumed by both consumers in the same group?
Hi,

> bin/kafka-console-consumer.sh --. all the parameters are the same

You need to set the same group.id to create a consumer group. By default the console consumer creates a random group.id. You can set group.id by using the --consumer.config /tmp/consumer.props flag:

$ echo group.id=1 > /tmp/consumer.props
High Latency in Kafka
Hi All, I have some log files of around 30 GB, and I am trying to event-process these logs by pushing them to Kafka. I can clearly see that the throughput achieved while publishing these events to Kafka is quite slow. For the single 30 GB log file mentioned, Logstash has been continuously emitting to Kafka for more than 2 days, but it has still processed only about 60% of the log data. I am looking for a way to increase the efficiency of publishing events to Kafka, as at this rate of data ingestion I don't think it will be a good option to move ahead. Looking for performance improvements for the same. Expert advice required! Thanks!
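Throughput on the publishing side usually improves with batching and compression. A hedged sketch of 0.8-style Scala producer properties (whether Logstash's Kafka output plugin passes these through is an assumption to verify for your plugin version; values are illustrative starting points):

```
# batch asynchronously instead of sending one message at a time
producer.type=async
batch.num.messages=1000
queue.buffering.max.ms=500
# compress message batches on the wire
compression.codec=snappy
# one ack from the partition leader is a common latency/safety trade-off
request.required.acks=1
```

It is also worth checking whether the bottleneck is actually Logstash's input/filter stages rather than the Kafka output, e.g. by measuring raw producer throughput with kafka-producer-perf-test.sh on the same hardware.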
Re: Console Producer Throwing LeaderNotAvailableException Despite Existing Leader for Partition
Alex, I got similar error before due to incorrect network binding of my laptop's wireless interface. You can try with setting advertised.host.name=kafka's server hostname in the server.properties and run it again.

On Sun, Feb 8, 2015 at 8:38 AM, Alex Melville amelvi...@g.hmc.edu wrote: [...]
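The suggested fix, as a server.properties fragment (the hostname is a placeholder for a name the clients can actually resolve):

```
# server.properties: advertise an address clients can reach,
# instead of whatever interface the broker happened to bind
advertised.host.name=kafka-broker-1.example.com
advertised.port=9092
```

The broker registers this advertised address in ZooKeeper, and it is what producers and consumers get back in metadata responses, so a wrong or unresolvable value produces exactly this kind of LeaderNotAvailableException loop.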
Re: New Producer - ONLY sync mode?
Steve, In terms of mimicking the sync behavior, I think that is what .get() does, no? We are always returning the offset and error information. The example I gave didn't make use of it, but you definitely can make use of it if you want to. -Jay

On Wed, Feb 4, 2015 at 9:58 AM, Steve Morin st...@stevemorin.com wrote: Looking at this thread, I would ideally want at least the right recipe to mimic sync behavior, like Otis is talking about. In the second case, I would like to be able to individually know if messages have failed, even if they are in separate batches, sort of like what Kinesis does, as Pradeep mentioned. -Steve

On Wed, Feb 4, 2015 at 11:19 AM, Jay Kreps jay.kr...@gmail.com wrote: Yeah, totally. Using a callback is, of course, the Right Thing for this kind of stuff. But I have found that kind of asynchronous thinking can be hard for people. Even if you get past the pre-Java-8 syntactic pain that anonymous inner classes inflict, just dealing with multiple threads of control without creating async spaghetti can be a challenge for complex stuff. That is really the only reason for the futures in the api; they are strictly less powerful than the callbacks, but at least using them you can just call .get() and pretend it is blocking. -Jay

On Wed, Feb 4, 2015 at 7:19 AM, Joe Stein joe.st...@stealth.ly wrote: Now that 0.8.2.0 is in the wild I look forward to working with it more and seeing what folks start to do with this function https://dist.apache.org/repos/dist/release/kafka/0.8.2.0/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord , org.apache.kafka.clients.producer.Callback) while keeping it fully non-blocking. One sprint I know of coming up is going to have the new producer as a component in their reactive calls, handling bookkeeping and retries through that type of callback approach.
It should work well (haven't tried, but I don't see why not) with Akka, ScalaZ, RxJava, Finagle, etc. in functional languages and frameworks. I think as JDK 8 starts to get out into the wild more (maybe after JDK 7 EOL) the use of .get will be reduced (imho) and folks will be thinking more about non-blocking vs blocking and not so much sync vs async, but my crystal ball is just back from the shop so we'll see =8^) /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop /

On Tue, Feb 3, 2015 at 10:45 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey guys, I guess the question is whether it really matters how many underlying network requests occur? It is very hard for an application to depend on this even in the old producer, since it depends on the partitions' placement (a send to two partitions may go to either one machine or two, and so it will send either one or two requests). So when you send a batch in one call you may feel that it is all at once, but that is only actually guaranteed if all messages have the same partition. The challenge is allowing even this in the presence of the bounded request sizes we have in the new producer. The user sends a list of objects and the serialized size that will result is not very apparent to them. If you break it up into multiple requests, then that kind of further ruins the illusion of a single send. If you don't, then you have to just error out, which is equally annoying to have to handle. But I'm not sure if from your description you are saying you actually care how many physical requests are issued. I think it is more that it is just syntactically annoying to send a batch of data now because it needs a for loop.
Currently to do this you would do:

  List responses = new ArrayList();
  for (input : recordBatch)
      responses.add(producer.send(input));
  for (response : responses)
      response.get();

If you don't depend on the offset/error info we could add a flush call so you could instead do:

  for (input : recordBatch)
      producer.send(input);
  producer.flush();

But if you do want the error/offset then you are going to be back to the original case. Thoughts? -Jay On Mon, Feb 2, 2015 at 1:48 PM, Gwen Shapira gshap...@cloudera.com wrote: I've been thinking about that too, since both Flume and Sqoop rely on the send(List) API of the old producer. I'd like to see this API come back, but I'm debating how we'd handle errors. IIRC, the old API would fail an entire batch on a single error, which can lead to duplicates. Having N callbacks lets me retry
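Jay's two patterns can be sketched with stand-in code. The snippet below is a toy model in Python (the real client is the Java KafkaProducer; ToyProducer and its "offset" strings are invented for illustration), using concurrent.futures to mimic a send() that returns a future immediately and a flush() that acts as a completion barrier:

```python
# Toy model of the two batch-send patterns above. ToyProducer is a
# stand-in for the Java KafkaProducer, not the real client.
from concurrent.futures import ThreadPoolExecutor

class ToyProducer:
    def __init__(self):
        self._pool = ThreadPoolExecutor(max_workers=1)  # the "network thread"

    def send(self, record):
        # Returns a future at once; the "offset" is computed asynchronously.
        return self._pool.submit(lambda r=record: ("offset-for-" + r, None))

    def flush(self):
        # Block until everything queued so far has been "delivered".
        self._pool.shutdown(wait=True)

batch = ["a", "b", "c"]

# Pattern 1: keep a future per record so offset/error info is available.
p1 = ToyProducer()
responses = [p1.send(r) for r in batch]
results = [f.result() for f in responses]  # .result() ~ Java's .get()

# Pattern 2: fire-and-forget, then barrier; no per-record info survives.
p2 = ToyProducer()
for r in batch:
    p2.send(r)
p2.flush()
```

As the thread notes, pattern 2 is simpler but gives up exactly the per-record offset/error information that pattern 1 keeps.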
Re: How to delete defunct topics
Hi Joel, The mbean approach depends upon storing traffic stats over a period of time, so is doable but complex. I am thinking about another approach: checking log file timestamps (to check producer activity) in conjunction with the mtime for topic offsets (consumer activity). So if both timestamp and mtime are older than 'n' days that may imply a defunct topic. Will appreciate your feedback. Thanks, Jagbir On Thu, Feb 5, 2015 at 6:57 PM, Joel Koshy jjkosh...@gmail.com wrote: There are mbeans (http://kafka.apache.org/documentation.html#monitoring) that you can poke for incoming message rate - if you look at those over a period of time you can figure out which of those are likely to be defunct and then delete those topics. On Thu, Feb 05, 2015 at 02:38:27PM -0800, Jagbir Hooda wrote: First I would like to take this opportunity to thank this group for releasing 0.8.2.0. It's a major milestone with a rich set of features. Kudos to all the contributors! We are still running 0.8.1.2 and are planning to upgrade to 0.8.2.0. While planning this upgrade we discovered many topics that are no longer active and are best deleted. Since it would be a common task faced by many kafka adopters I thought I'd raise it here and seek expert advice. Basically what we desire is a utility/script/steps to first identify the defunct topics (let's say those topics that haven't seen any traffic in the past 'n' days) and then delete them. Will appreciate your response. Thanks, Jagbir
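Jagbir's mtime idea can be sketched as follows. This assumes the 0.8.x on-disk layout where each partition is a directory named <topic>-<partition> under log.dir; the helper name and the demo paths are invented for illustration:

```python
# Sketch: treat a topic as defunct if none of its partition directories
# was modified in the last N days. Assumes the <topic>-<partition>
# directory layout under the broker's log.dir (0.8.x default).
import os
import tempfile
import time

def defunct_topics(log_dir, days):
    cutoff = time.time() - days * 86400
    latest = {}
    for name in os.listdir(log_dir):
        path = os.path.join(log_dir, name)
        if not os.path.isdir(path):
            continue
        topic = name.rsplit("-", 1)[0]  # strip the partition suffix
        latest[topic] = max(latest.get(topic, 0), os.path.getmtime(path))
    return sorted(t for t, m in latest.items() if m < cutoff)

# Demo against a throwaway directory with a stale and a fresh "topic".
log_dir = tempfile.mkdtemp()
os.mkdir(os.path.join(log_dir, "stale-0"))
os.utime(os.path.join(log_dir, "stale-0"), (0, 0))  # force a very old mtime
os.mkdir(os.path.join(log_dir, "fresh-0"))
print(defunct_topics(log_dir, days=30))
```

As Jagbir notes, mtime only reflects producer-side activity; pairing it with offset-commit timestamps would be needed to also rule out active consumers.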
[DISCUSS] KIP-8 Add a flush method to the new Java producer
Following up on our previous thread on making batch send a little easier, here is a concrete proposal to add a flush() method to the producer: https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API A proposed implementation is here: https://issues.apache.org/jira/browse/KAFKA-1865 Thoughts? -Jay
Re: Doubts Kafka
Have there been any changes with 0.8.2 in how the marker gets moved when you use the high-level consumer? One problem I have always had was: what if I pull something from the stream, but then I have an error in processing it? I don't really want to move the marker. I would almost like the client to have a callback mechanism for processing, and the marker only gets moved if the high level consumer successfully invokes my callback/processor (with no exceptions, at least). On Sun, Feb 8, 2015 at 9:49 AM, Gwen Shapira gshap...@cloudera.com wrote: On Sun, Feb 8, 2015 at 6:39 AM, Christopher Piggott cpigg...@gmail.com wrote: The consumer used Zookeeper to store offsets, in 0.8.2 there's an option to use Kafka itself for that (by setting *offsets.storage = kafka Does it still really live in zookeeper, and kafka is proxying the requests through? They don't live in Zookeeper. They live in a secret Kafka topic (__offsets or something like that). For migration purposes, you can set dual.commit.enable = true and then offsets will be stored in both Kafka and ZK, but the intention is to migrate to 100% Kafka storage. On Sun, Feb 8, 2015 at 9:25 AM, Gwen Shapira gshap...@cloudera.com wrote: Hi Eduardo, 1. Why sometimes the applications prefer to connect to zookeeper instead brokers? I assume you are talking about the clients and some of our tools? These are parts of an older design and we are actively working on fixing this. The consumer used Zookeeper to store offsets, in 0.8.2 there's an option to use Kafka itself for that (by setting *offsets.storage = kafka*). We are planning on fixing the tools in 0.9, but obviously they are less performance sensitive than the consumers. 2. Regarding your tests and disk usage - I'm not sure exactly what fills your disk - if it's the kafka transaction logs (i.e. log.dir), then we expect to store the size of all messages sent times the replication factor configured for each topic. 
We keep messages for the amount of time specified in *log.retention* parameters. If the disk is filled within minutes, either set log.retention.minutes very low (at risk of losing data if consumers need to restart), or make sure your disk capacity matches the rate at which producers send data. Gwen On Sat, Feb 7, 2015 at 3:01 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Guys, I have some doubts about the Kafka, the first is Why sometimes the applications prefer to connect to zookeeper instead brokers? Connecting to zookeeper could create an overhead, because we are inserting other element between producer and consumer. Another question is about the information sent by producer, in my tests the producer send the messages to brokers and a few minutes my HardDisk is full (my harddisk has 250GB), is there something to do in the configuration to minimize this? Thanks -- Informativa sulla Privacy: http://www.unibs.it/node/8155
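Gwen's suggestions correspond to broker settings like these (a sketch with illustrative values; in 0.8.x, time-based and size-based retention can be combined, and whichever limit is hit first triggers deletion):

```properties
# server.properties — retention knobs (values are illustrative)
log.retention.hours=24            # time-based retention (log.retention.minutes also exists)
log.retention.bytes=10737418240   # size-based retention per partition (~10 GB)
log.segment.bytes=536870912       # segment size; only closed segments can be deleted
```

Note that retained bytes are multiplied by the topic's replication factor across the cluster, which is why a modest producer rate can still fill a 250GB disk quickly.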
Fetch size
Hi Kafka team, We have a use case where we need to consume from ~20 topics (each with 24 partitions), we have a potential max message size of 20MB so we've set our consumer fetch.size to 20MB but that's causing very poor performance on our consumer (most of our messages are in the 10-100k range). Is it possible to set the fetch size to a lower number than the max message size and gracefully handle larger messages (as a trapped exception for example) in order to improve our throughput? Thank you in advance for your help CJ Woolard
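For reference, the property behind "fetch.size" in the 0.8.x high-level consumer is fetch.message.max.bytes, and it applies per partition per fetch request, so memory pressure scales with the number of partitions being fetched. Setting it below the largest possible message is not safe: the consumer raises an error (MessageSizeTooLargeException in the high-level consumer) and cannot advance past the oversized message, rather than skipping it as a trappable per-message exception. A sketch (values illustrative):

```properties
# consumer.properties (0.8.x high-level consumer; illustrative values)
fetch.message.max.bytes=20971520   # must cover the largest message (20 MB here)
# Rough memory bound: fetch.message.max.bytes * number of partitions fetched,
# which with ~480 partitions at 20 MB each explains the poor performance above.
```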
Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer
Looks good to me. I like the idea of not blocking additional sends but not guaranteeing that flush() will deliver them. I assume that with linger.ms = 0, flush will just be a noop (since the queue will be empty). Is that correct? Gwen On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps jay.kr...@gmail.com wrote: Following up on our previous thread on making batch send a little easier, here is a concrete proposal to add a flush() method to the producer: https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API A proposed implementation is here: https://issues.apache.org/jira/browse/KAFKA-1865 Thoughts? -Jay
Poor performance consuming multiple topics
Hi Kafka team, We have a use case where we need to consume from ~20 topics (each with 24 partitions), we have a potential max message size of 20MB so we've set our consumer fetch.size to 20MB but that's causing very poor performance on our consumer (most of our messages are in the 10-100k range). Is it possible to set the fetch size to a lower number than the max message size and gracefully handle larger messages (as a trapped exception for example) in order to improve our throughput? Thank you in advance for your help CJ Woolard
Apache Kafka 0.8.2 Consumer Example
Hi, I had a client running on Kafka 0.8.1 using the High-Level consumer API working fine. Today I updated my Kafka installation to the 0.8.2 version (tried all versions of Scala) but the consumer doesn't get any messages. I tested using the kafka-console-consumer.sh utility tool and it works fine; only my Java program does not. Did I miss something? I heard that the API changed, so I'd like to know if someone can share a simple client with me. Please, respond directly to me or just reply to all because I am not currently subscribed to the group. Thanks, Ricardo Ferreira
Re: Apache Kafka 0.8.2 Consumer Example
I have a 0.8.1 high level consumer working fine with 0.8.2 server. Few of them actually :) AFAIK the API did not change. Do you see any error messages? Do you have timeout configured on the consumer? What does the offset checker tool say? On Fri, Feb 6, 2015 at 4:49 PM, Ricardo Ferreira jricardoferre...@gmail.com wrote: Hi, I had a client running on Kafka 0.8.1 using the High-Level consumer API working fine. Today I updated my Kafka installation to the 0.8.2 version (tried all versions of Scala) but the consumer doesn't get any messages. I tested using the kafka-console-consumer.sh utility tool and it works fine; only my Java program does not. Did I miss something? I heard that the API changed, so I'd like to know if someone can share a simple client with me. Please, respond directly to me or just reply to all because I am not currently subscribed to the group. Thanks, Ricardo Ferreira
Re: High Latency in Kafka
I'm wondering how much of the time is spent by Logstash reading and processing the log vs. time spent sending data to Kafka. Also, I'm not familiar with Logstash internals; perhaps it can be tuned to send the data to Kafka in larger batches? At the moment it's difficult to tell where the slowdown is. More information about the breakdown of time will help. You can try Flume's SpoolingDirectory source with Kafka Channel or Sink and see if you get improved performance out of other tools. Gwen On Sun, Feb 8, 2015 at 12:06 AM, Vineet Mishra clearmido...@gmail.com wrote: Hi All, I am having some log files of around 30GB, I am trying to event process these logs by pushing them to Kafka. I can clearly see that the throughput achieved while publishing these events to Kafka is quite slow. So as mentioned, for the single log file of 30GB, Logstash is continuously emitting to Kafka and it has been running for more than 2 days but still it has processed just 60% of the log data. I was looking out for a way to increase the efficiency of publishing the events to Kafka, as with this rate of data ingestion I don't think it will be a good option to move ahead. Looking out for performance improvements for the same. Experts' advice required! Thanks!
Re: Doubts Kafka
Sorry, I should have read the release notes before I asked this question. The answer was in there. Internally the implementation of the offset storage is just a compacted Kafka topic (__consumer_offsets; see http://kafka.apache.org/documentation.html#compaction) keyed on the consumer's group, topic, and partition. The offset commit request writes the offset to the compacted Kafka topic using the highest level of durability guarantee that Kafka provides (acks=-1) so that offsets are never lost in the presence of uncorrelated failures. Kafka maintains an in-memory view of the latest offset per consumer group, topic, partition triplet, so offset fetch requests can be served quickly without requiring a full scan of the compacted offsets topic. With this feature, consumers can checkpoint offsets very often, possibly per message. On Sun, Feb 8, 2015 at 9:39 AM, Christopher Piggott cpigg...@gmail.com wrote: The consumer used Zookeeper to store offsets, in 0.8.2 there's an option to use Kafka itself for that (by setting *offsets.storage = kafka Does it still really live in zookeeper, and kafka is proxying the requests through? On Sun, Feb 8, 2015 at 9:25 AM, Gwen Shapira gshap...@cloudera.com wrote: Hi Eduardo, 1. Why sometimes the applications prefer to connect to zookeeper instead brokers? I assume you are talking about the clients and some of our tools? These are parts of an older design and we are actively working on fixing this. The consumer used Zookeeper to store offsets, in 0.8.2 there's an option to use Kafka itself for that (by setting *offsets.storage = kafka*). We are planning on fixing the tools in 0.9, but obviously they are less performance sensitive than the consumers. 2. Regarding your tests and disk usage - I'm not sure exactly what fills your disk - if it's the kafka transaction logs (i.e. log.dir), then we expect to store the size of all messages sent times the replication factor configured for each topic. 
We keep messages for the amount of time specified in *log.retention* parameters. If the disk is filled within minutes, either set log.retention.minutes very low (at risk of losing data if consumers need to restart), or make sure your disk capacity matches the rate at which producers send data. Gwen On Sat, Feb 7, 2015 at 3:01 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Guys, I have some doubts about the Kafka, the first is Why sometimes the applications prefer to connect to zookeeper instead brokers? Connecting to zookeeper could create an overhead, because we are inserting other element between producer and consumer. Another question is about the information sent by producer, in my tests the producer send the messages to brokers and a few minutes my HardDisk is full (my harddisk has 250GB), is there something to do in the configuration to minimize this? Thanks
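The Kafka-based offset storage discussed above maps to consumer configuration like this (a sketch; note the documented 0.8.2 property name is dual.commit.enabled):

```properties
# consumer.properties — 0.8.2 Kafka-based offset storage (illustrative)
offsets.storage=kafka
dual.commit.enabled=true   # during migration, commit offsets to both Kafka and ZooKeeper
```

Once all consumers in a group are committing to Kafka, dual commit can be switched off to drop the ZooKeeper writes entirely.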
Re: Doubts Kafka
The consumer used Zookeeper to store offsets, in 0.8.2 there's an option to use Kafka itself for that (by setting *offsets.storage = kafka Does it still really live in zookeeper, and kafka is proxying the requests through? On Sun, Feb 8, 2015 at 9:25 AM, Gwen Shapira gshap...@cloudera.com wrote: Hi Eduardo, 1. Why sometimes the applications prefer to connect to zookeeper instead brokers? I assume you are talking about the clients and some of our tools? These are parts of an older design and we are actively working on fixing this. The consumer used Zookeeper to store offsets, in 0.8.2 there's an option to use Kafka itself for that (by setting *offsets.storage = kafka*). We are planning on fixing the tools in 0.9, but obviously they are less performance sensitive than the consumers. 2. Regarding your tests and disk usage - I'm not sure exactly what fills your disk - if it's the kafka transaction logs (i.e. log.dir), then we expect to store the size of all messages sent times the replication factor configured for each topic. We keep messages for the amount of time specified in *log.retention* parameters. If the disk is filled within minutes, either set log.retention.minutes very low (at risk of losing data if consumers need to restart), or make sure your disk capacity matches the rate at which producers send data. Gwen On Sat, Feb 7, 2015 at 3:01 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Guys, I have some doubts about the Kafka, the first is Why sometimes the applications prefer to connect to zookeeper instead brokers? Connecting to zookeeper could create an overhead, because we are inserting other element between producer and consumer. Another question is about the information sent by producer, in my tests the producer send the messages to brokers and a few minutes my HardDisk is full (my harddisk has 250GB), is there something to do in the configuration to minimize this? Thanks
Re: Doubts Kafka
On Sun, Feb 8, 2015 at 6:39 AM, Christopher Piggott cpigg...@gmail.com wrote: The consumer used Zookeeper to store offsets, in 0.8.2 there's an option to use Kafka itself for that (by setting *offsets.storage = kafka Does it still really live in zookeeper, and kafka is proxying the requests through? They don't live in Zookeeper. They live in a secret Kafka topic (__offsets or something like that). For migration purposes, you can set dual.commit.enable = true and then offsets will be stored in both Kafka and ZK, but the intention is to migrate to 100% Kafka storage. On Sun, Feb 8, 2015 at 9:25 AM, Gwen Shapira gshap...@cloudera.com wrote: Hi Eduardo, 1. Why sometimes the applications prefer to connect to zookeeper instead brokers? I assume you are talking about the clients and some of our tools? These are parts of an older design and we are actively working on fixing this. The consumer used Zookeeper to store offsets, in 0.8.2 there's an option to use Kafka itself for that (by setting *offsets.storage = kafka*). We are planning on fixing the tools in 0.9, but obviously they are less performance sensitive than the consumers. 2. Regarding your tests and disk usage - I'm not sure exactly what fills your disk - if it's the kafka transaction logs (i.e. log.dir), then we expect to store the size of all messages sent times the replication factor configured for each topic. We keep messages for the amount of time specified in *log.retention* parameters. If the disk is filled within minutes, either set log.retention.minutes very low (at risk of losing data if consumers need to restart), or make sure your disk capacity matches the rate at which producers send data. Gwen On Sat, Feb 7, 2015 at 3:01 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Guys, I have some doubts about the Kafka, the first is Why sometimes the applications prefer to connect to zookeeper instead brokers? 
Connecting to zookeeper could create an overhead, because we are inserting other element between producer and consumer. Another question is about the information sent by producer, in my tests the producer send the messages to brokers and a few minutes my HardDisk is full (my harddisk has 250GB), is there something to do in the configuration to minimize this? Thanks
Re: Doubts Kafka
Hi Eduardo, 1. Why sometimes the applications prefer to connect to zookeeper instead brokers? I assume you are talking about the clients and some of our tools? These are parts of an older design and we are actively working on fixing this. The consumer used Zookeeper to store offsets, in 0.8.2 there's an option to use Kafka itself for that (by setting *offsets.storage = kafka*). We are planning on fixing the tools in 0.9, but obviously they are less performance sensitive than the consumers. 2. Regarding your tests and disk usage - I'm not sure exactly what fills your disk - if it's the kafka transaction logs (i.e. log.dir), then we expect to store the size of all messages sent times the replication factor configured for each topic. We keep messages for the amount of time specified in *log.retention* parameters. If the disk is filled within minutes, either set log.retention.minutes very low (at risk of losing data if consumers need to restart), or make sure your disk capacity matches the rate at which producers send data. Gwen On Sat, Feb 7, 2015 at 3:01 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi Guys, I have some doubts about the Kafka, the first is Why sometimes the applications prefer to connect to zookeeper instead brokers? Connecting to zookeeper could create an overhead, because we are inserting other element between producer and consumer. Another question is about the information sent by producer, in my tests the producer send the messages to brokers and a few minutes my HardDisk is full (my harddisk has 250GB), is there something to do in the configuration to minimize this? Thanks
Re: Apache Kafka 0.8.2 Consumer Example
Hi Gwen, Sorry, both the consumer and the broker are 0.8.2? A = Yes So what's on 0.8.1? A = It works fine using 0.8.1 for server AND client. You probably know the consumer group of your application. Can you use the offset checker tool on that? A = Yes, I know from the consumer, and the offset checker gave me nothing about that group. Thanks, Ricardo On Sun, Feb 8, 2015 at 1:19 PM, Gwen Shapira gshap...@cloudera.com wrote: Sorry, both the consumer and the broker are 0.8.2? So what's on 0.8.1? I seriously doubt downgrading is the solution. You probably know the consumer group of your application. Can you use the offset checker tool on that? Gwen On Sun, Feb 8, 2015 at 9:01 AM, Ricardo Ferreira jricardoferre...@gmail.com wrote: Hi Gwen, Thanks for the response. In my case, I have both the consumer application and the server on version 0.8.2, Scala 2.10. No errors are thrown, and my *zookeeper.session.timeout.ms* property is set to 500, although I tried 5000 and that also didn't work. I checked the offset checker tool, but it asks for a group, and I don't know which group the kafka-console-producer is using. I tried the consumer group but it gave the following message: Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test-consumer-group/offsets/test/0. Perhaps the solution is to downgrade the consumer libs to 0.8.1? Thanks, Ricardo On Sun, Feb 8, 2015 at 11:27 AM, Gwen Shapira gshap...@cloudera.com wrote: I have a 0.8.1 high level consumer working fine with 0.8.2 server. Few of them actually :) AFAIK the API did not change. Do you see any error messages? Do you have timeout configured on the consumer? What does the offset checker tool say? On Fri, Feb 6, 2015 at 4:49 PM, Ricardo Ferreira jricardoferre...@gmail.com wrote: Hi, I had a client running on Kafka 0.8.1 using the High-Level consumer API working fine. 
Today I updated my Kafka installation to the 0.8.2 version (tried all versions of Scala) but the consumer doesn't get any messages. I tested using the kafka-console-consumer.sh utility tool and it works fine; only my Java program does not. Did I miss something? I heard that the API changed, so I'd like to know if someone can share a simple client with me. Please, respond directly to me or just reply to all because I am not currently subscribed to the group. Thanks, Ricardo Ferreira
Re: Console Producer Throwing LeaderNotAvailableException Despite Existing Leader for Partition
Thanks Tao, That fixed the problem. Console producer now correctly pushes to the topic and console consumer can read the topic data. -Alex On Sun, Feb 8, 2015 at 1:47 AM, tao xiao xiaotao...@gmail.com wrote: Alex, I got a similar error before due to incorrect network binding of my laptop's wireless interface. You can try setting advertised.host.name to the Kafka server's hostname in server.properties and running it again. On Sun, Feb 8, 2015 at 8:38 AM, Alex Melville amelvi...@g.hmc.edu wrote: Howdy all, I recently upgraded to Kafka 0.8.2.0 and am trying to verify that everything still works as expected. I spin up two brokers, one zk instance, and then create a topic using kafka-topics.sh --create --zookeeper ad-0104:2181 --topic deleteme --partitions 2 --replication-factor 1 Then I run --describe to check if the partitions have leaders:

  kafka-topics.sh --describe --zookeeper ad-0104:2181 --topic deleteme
  Topic:deleteme  PartitionCount:2  ReplicationFactor:1  Configs:
    Topic: deleteme  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
    Topic: deleteme  Partition: 1  Leader: 1  Replicas: 1  Isr: 1

Finally, I run the console producer kafka-console-producer.sh --broker-list ad-0102:9092 --topic deleteme I get the following warning [2015-02-08 00:36:24,244] WARN Property topic is not valid (kafka.utils.VerifiableProperties) and then it waits for console input. 
When I try to send a message I get the following list of error messages [2015-02-08 00:37:04,735] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2015-02-08 00:37:04,751] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2015-02-08 00:37:04,752] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: deleteme (kafka.producer.async.DefaultEventHandler) [2015-02-08 00:37:04,859] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2015-02-08 00:37:04,863] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2015-02-08 00:37:04,863] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: deleteme (kafka.producer.async.DefaultEventHandler) [2015-02-08 00:37:04,968] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2015-02-08 00:37:04,974] WARN Error while fetching metadata [{TopicMetadata for topic 
deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2015-02-08 00:37:04,974] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: deleteme (kafka.producer.async.DefaultEventHandler) [2015-02-08 00:37:05,079] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2015-02-08 00:37:05,084] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2015-02-08 00:37:05,084] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: deleteme (kafka.producer.async.DefaultEventHandler) [2015-02-08 00:37:05,189] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException
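The fix tao describes amounts to broker-side settings like these (hostname is illustrative). Clients discover broker addresses from the metadata the broker advertises, so the advertised hostname must be resolvable and reachable from the client machines; when it is not, producers fail metadata fetches with LeaderNotAvailableException even though --describe shows healthy leaders:

```properties
# server.properties (illustrative values)
advertised.host.name=ad-0102.example.com   # address clients should connect to
advertised.port=9092
```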
Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer
Well actually in the case of linger.ms = 0 the send is still asynchronous so calling flush() blocks until all the previously sent records have completed. It doesn't speed anything up in that case, though, since they are already available to send. -Jay On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira gshap...@cloudera.com wrote: Looks good to me. I like the idea of not blocking additional sends but not guaranteeing that flush() will deliver them. I assume that with linger.ms = 0, flush will just be a noop (since the queue will be empty). Is that correct? Gwen On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps jay.kr...@gmail.com wrote: Following up on our previous thread on making batch send a little easier, here is a concrete proposal to add a flush() method to the producer: https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API A proposed implementation is here: https://issues.apache.org/jira/browse/KAFKA-1865 Thoughts? -Jay
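The contract Jay describes — flush() is not a no-op even with linger.ms = 0, because sends are still asynchronous, and it blocks only until previously sent records complete — can be modeled with a toy stand-in (MiniProducer below is invented for illustration; it is not the real client):

```python
# Toy model of the KIP-8 flush() contract: send() returns immediately,
# delivery happens on another thread, and flush() waits for every record
# that was in flight when it was called.
import threading
import time

class MiniProducer:
    def __init__(self):
        self._inflight = []
        self._completed = []
        self._lock = threading.Lock()

    def send(self, record):
        def deliver():
            time.sleep(0.01)  # pretend network latency
            with self._lock:
                self._completed.append(record)
        t = threading.Thread(target=deliver)
        self._inflight.append(t)
        t.start()             # send() returns immediately; delivery is async

    def flush(self):
        # Wait only for sends issued before flush() was called.
        for t in list(self._inflight):
            t.join()

p = MiniProducer()
for r in ["r1", "r2", "r3"]:
    p.send(r)
p.flush()  # blocks until all three deliveries finish, even with no lingering
print(sorted(p._completed))
```

After flush() returns, every record sent before the call is complete; records sent concurrently by other threads are simply not covered by that flush, which matches the "doesn't block additional sends" property discussed in the thread.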