Re: Horizontally Scaling Kafka Consumers
The Go Kafka client also supports offset storage in ZooKeeper and Kafka (https://github.com/stealthly/go_kafka_client/blob/master/docs/offset_storage.md) and has two other strategies for partition ownership with a consensus server (it currently uses ZooKeeper; Consul support is planned for the near future).

~ Joe Stein

On Thu, Apr 30, 2015 at 2:15 AM, Nimi Wariboko Jr n...@channelmeter.com wrote:

My mistake; it seems the Java drivers are a lot more advanced than Shopify's Kafka driver (or I am missing something), and I haven't used Kafka before. With the Go driver it seems you have to manage offsets and partitions within the application code, while with the Scala driver you have the option of simply subscribing to a topic and letting someone else manage that part. After digging around a bit more, I found another library, https://github.com/wvanbergen/kafka, that speaks the consumer group API and accomplishes what I was looking for; I assume it is implemented by keeping track of memberships with ZooKeeper. Thank you for the information, it really helped clear up what I was failing to understand about Kafka.

Nimi

On Wed, Apr 29, 2015 at 10:10 PM, Joe Stein joe.st...@stealth.ly wrote:

You can do this with the existing Kafka consumer (https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L106), and probably with any other Kafka client too (maybe with minor or major rework to do the offset management). The new consumer approach is more transparent about subscribing to specific partitions: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L200-L234
Here is a Dockerfile (** pull request pending **) for wrapping Kafka consumers (it doesn't have to be the Go client; that needs to be abstracted out some more after further testing): https://github.com/stealthly/go_kafka_client/blob/mesos-marathon/consumers/Dockerfile There is also a VM (** pull request pending **) to build the container, push it to a local Docker repository, and launch it on Apache Mesos, as a working example: https://github.com/stealthly/go_kafka_client/tree/mesos-marathon/mesos/vagrant All of this could be done without the Docker container and still work on Mesos, or even without Mesos and on YARN. You might also want to check out how Samza integrates with execution frameworks (http://samza.apache.org/learn/documentation/0.9/comparisons/introduction.html); it has a Mesos patch (https://issues.apache.org/jira/browse/SAMZA-375) and built-in YARN support.

~ Joe Stein - http://www.stealth.ly

On Wed, Apr 29, 2015 at 8:56 AM, David Corley davidcor...@gmail.com wrote:

You're right Stevo, I should re-phrase to say that there can be no more _active_ consumers than there are partitions (within a single consumer group). I'm guessing that's what Nimi is alluding to, but perhaps he can elaborate on whether he's using consumer groups and/or whether the 100 partitions are all for a single topic or for multiple topics.

On 29 April 2015 at 13:38, Stevo Slavić ssla...@gmail.com wrote:

Please correct me if wrong, but I think it is really not a hard constraint that one cannot have more consumers (from the same group) than partitions on a single topic. All the surplus consumers will not be assigned any partition to consume, but they can be there, and as soon as one active consumer from the same group goes offline (its connection to ZK is dropped), the consumers in the group will be rebalanced so that one passively waiting consumer becomes active. Kind regards, Stevo Slavic.
On Wed, Apr 29, 2015 at 2:25 PM, David Corley davidcor...@gmail.com wrote:

If the 100 partitions are all for the same topic, you can have up to 100 consumers working as part of a single consumer group for that topic. You cannot have more consumers than there are partitions within a given consumer group.

On 29 April 2015 at 08:41, Nimi Wariboko Jr n...@channelmeter.com wrote:

Hi, I was wondering what options there are for horizontally scaling Kafka consumers. Basically, if I have 100 partitions and 10 consumers, and want to temporarily scale up to 50 consumers, what options do I have? So far I've thought of simply tracking consumer membership somehow (either through Raft or ZooKeeper's znodes) on the consumers.
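The rebalance rule discussed in this thread (surplus consumers in a group sit idle because there are more group members than partitions) can be sketched as a toy range-style assignment. This is an illustrative simplification, not Kafka's actual rebalancing code; class and variable names are made up for the example.

```java
import java.util.*;

// Toy model of range-style partition assignment within one consumer group:
// partitions are split across the sorted member list. With more members
// than partitions, the surplus members get an empty assignment and wait
// passively until a rebalance makes them active.
public class RangeAssignment {
    public static Map<String, List<Integer>> assign(int numPartitions, List<String> consumers) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        for (String c : sorted) result.put(c, new ArrayList<>());
        int perConsumer = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();   // first `extra` members get one more
        int partition = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            for (int j = 0; j < count; j++) result.get(sorted.get(i)).add(partition++);
        }
        return result;
    }

    public static void main(String[] args) {
        // 2 partitions, 3 consumers: consumer "c" is assigned nothing.
        System.out.println(assign(2, Arrays.asList("a", "b", "c")));
    }
}
```

With 100 partitions and 50 consumers each active member would own two partitions; scaling past 100 consumers only adds idle standbys.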
Re: Kafka 0.8.2 beta - release
That's part of the new consumer API that hasn't been released yet. The API happens to be included in the 0.8.2.* artifacts because it is under development, but it isn't yet released; it hasn't been mentioned in the release notes, nor is it in the official documentation: http://kafka.apache.org/documentation.html That API is currently under active development and should be available in the next release. If you want to test it out, you can build a copy of trunk yourself, but the high-level consumer functionality is not yet implemented, so it likely does not include everything you want. For the time being, you probably want to use the existing high-level consumer API: http://kafka.apache.org/documentation.html#highlevelconsumerapi

On Wed, Apr 29, 2015 at 11:07 PM, Gomathivinayagam Muthuvinayagam sankarm...@gmail.com wrote:

Thank you. It seems the following method is not supported in KafkaConsumer. Do you know when it will be supported?

    public OffsetMetadata commit(Map<TopicPartition, Long> offsets, boolean sync) {
        throw new UnsupportedOperationException();
    }

Thanks & Regards,

On Wed, Apr 29, 2015 at 10:52 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

It has already been released, including a minor revision to fix some critical bugs. The latest release is 0.8.2.1. The downloads page has links and release notes: http://kafka.apache.org/downloads.html

On Wed, Apr 29, 2015 at 10:22 PM, Gomathivinayagam Muthuvinayagam sankarm...@gmail.com wrote:

I see a lot of interesting features in the Kafka 0.8.2 beta. I am just wondering when it will be released. Is there a timeline for that? Thanks & Regards,

-- Thanks, Ewen
Re: MultiThreaded HLConsumer Exits before events are all consumed
What I found were two problems. 1. The producer wasn't passing in a partition key, so not all partitions were getting data. 2. After fixing the producer, I could see all threads getting data consistently, and then the shutdown method was clearly killing the threads. I have removed the shutdown, and with the producer changes sending in a key, this looks like it is running correctly now. Thanks!

On Wed, Apr 29, 2015 at 10:59 PM, tao xiao xiaotao...@gmail.com wrote:

The log suggests that the shutdown method was still called:

Thread 0: 2015-04-29 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753
Last Shutdown via example.shutDown called!
15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:, ZKConsumerConnector shutting down

Please ensure no consumer.shutdown(); and executor.shutdown(); are called during the course of your program.

On Thu, Apr 30, 2015 at 2:23 AM, christopher palm cpa...@gmail.com wrote:

Commenting out the example shutdown did not seem to make a difference; I added the print statement below to highlight the fact. The other threads still shut down, and only one thread lives on; eventually that dies after a few minutes as well. Could it be that the producer's default partitioner isn't balancing data across all partitions? Thanks, Chris

Thread 0: 2015-04-29 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753
Last Shutdown via example.shutDown called!
15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:, ZKConsumerConnector shutting down
15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka scheduler
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping leader finder thread
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutting down
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Stopped
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutdown completed
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping all fetchers
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-consumergroup], Shutting down
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Stopped
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Shutdown completed
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-] All connections stopped
15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
Shutting down Thread: 2
Shutting down Thread: 1
Shutting down Thread: 3
15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector: [consumergroup], ZKConsumerConnector shut down completed
Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail distance|-73.99021500035|40.6636611
15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector: [consumergroup], stopping watcher executor thread for consumer consumergroup
Thread 0: 2015-04-29 12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009

On Wed, Apr 29, 2015 at 10:11 AM, tao xiao xiaotao...@gmail.com wrote:

example.shutdown(); in ConsumerGroupExample closes all consumer connections to Kafka.
Remove this line and the consumer threads will run forever.

On Wed, Apr 29, 2015 at 9:42 PM, christopher palm cpa...@gmail.com wrote:

Hi All, I am trying to get a multithreaded HL consumer working against a 2-broker Kafka cluster with a 4-partition, 2-replica topic. The consumer code is set to run with 4 threads, one for each partition. The producer code uses the default partitioner and loops indefinitely, feeding events into the topic. (I excluded the while loop in the paste below.) What I see is that the threads eventually all exit, even though the producer is still sending events into the topic. My understanding is that one consumer thread per partition is the correct setup. Any ideas why this code doesn't continue to consume events as they are pushed to the topic? I suspect I am configuring something wrong here, but am not sure what. Thanks, Chris

Topic Configuration:

Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2

Producer Code:

Properties props = new Properties();
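The root cause found in this thread, a producer sending without a partition key, can be illustrated with a sketch of keyed partitioning. This is not Kafka's exact partitioner code, just the common hash-modulo idea; with a null key the old 0.8 producer instead picked a partition and stuck with it until the next metadata refresh, which is why some partitions (and the consumer threads reading them) could see no data at all.

```java
// Sketch of hash-based key-to-partition mapping (illustrative, not the
// actual Kafka default partitioner): a keyed message always lands on the
// same partition, and distinct keys spread across partitions.
public class KeyedPartitioner {
    public static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit: String.hashCode() can be negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"sensor-1", "sensor-2", "sensor-3", "sensor-4"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 4));
        }
    }
}
```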
Re: Horizontally Scaling Kafka Consumers
On Thu, Apr 30, 2015 at 2:15 AM, Nimi Wariboko Jr n...@channelmeter.com wrote:

My mistake; it seems the Java drivers are a lot more advanced than Shopify's Kafka driver (or I am missing something), and I haven't used Kafka before. With the Go driver it seems you have to manage offsets and partitions within the application code, while with the Scala driver you have the option of simply subscribing to a topic and letting someone else manage that part. After digging around a bit more, I found another library, https://github.com/wvanbergen/kafka, that speaks the consumer group API and accomplishes what I was looking for; I assume it is implemented by keeping track of memberships with ZooKeeper.

Yes. That library is built on top of Sarama (Shopify's Go Kafka driver), and it's on our roadmap to integrate it properly. As far as I know, this is the only major area where Sarama is lagging behind the JVM client.

Thank you for the information, it really helped clear up what I was failing to understand about Kafka.

Nimi

On Wed, Apr 29, 2015 at 10:10 PM, Joe Stein joe.st...@stealth.ly wrote:

You can do this with the existing Kafka consumer (https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L106), and probably with any other Kafka client too (maybe with minor or major rework to do the offset management). The new consumer approach is more transparent about subscribing to specific partitions: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L200-L234
Sometimes I don't get a leader with 1 broker
Running a 1-broker system. I had some issues with the system but got it working. I've deleted the topic I had trouble with and re-created it, but describing it shows no leader, and neither producing nor consuming works on it. If I create a brand-new topic with a name I never used before, I get a leader. I think I sometimes get a leader and sometimes don't; not sure. The controller log is fine. Is this normal?
Kafka still aware of old zookeeper nodes
I had 3 ZooKeeper nodes. I added 3 new ones and shut down the old 3. The server.log shows "Closing socket connection" errors to the old IPs. I rebooted the Kafka server entirely, but it still somehow seems aware of these servers. Any ideas what's up?
Re: Kafka still aware of old zookeeper nodes
Have you changed zookeeper.connect= in server.properties? A better procedure for replacing ZooKeeper nodes would be to shut down one and install the new one with the same IP. This can easily be done on a running cluster. /svante

2015-04-30 20:08 GMT+02:00 Dillian Murphey crackshotm...@gmail.com:

I had 3 ZooKeeper nodes. I added 3 new ones and shut down the old 3. The server.log shows "Closing socket connection" errors to the old IPs. I rebooted the Kafka server entirely, but it still somehow seems aware of these servers. Any ideas what's up?
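For reference, the setting in question is a broker-side list that is read at startup, which is why swapping ZooKeeper nodes usually means either updating it (and restarting the broker) or replacing nodes in place as suggested above. A minimal fragment (hostnames are placeholders):

```properties
# server.properties (hostnames below are placeholders)
zookeeper.connect=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
```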
Re: Horizontally Scaling Kafka Consumers
You need to first decide the conditions that need to be met for you to scale to 50 consumers. These can be as simple as the consumer lag. Look at the console offset checker tool and see if any of those numbers make sense. Your existing consumers could also produce some metrics based on which another process will decide when to spawn new consumers. -- Sharninder

On Wed, Apr 29, 2015 at 11:58 PM, Nimi Wariboko Jr n...@channelmeter.com wrote:

Hi, I was wondering what options there are / what other people are doing for horizontally scaling Kafka consumers. Basically, if I have 100 partitions and 10 consumers, and want to temporarily scale up to 50 consumers, what can I do? So far I've thought of simply tracking consumer membership somehow (either through ZooKeeper's ephemeral nodes or maybe using gossip) on the consumers to achieve consensus on who consumes what. Another option would be having a router, possibly using something like NSQ (I understand that they are similar pieces of software, but what we are going for is a persistent distributed queue with sharding, which is why I'm looking into Kafka).
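The lag signal mentioned above (what the console offset checker reports) is just the log-end offset minus the group's committed offset, summed over partitions. A minimal sketch of a scaling trigger built on it; the class, method names, and threshold are illustrative, and real code would fetch both offset arrays from Kafka rather than take them as arguments:

```java
// Sketch: total consumer lag as the scaling signal. A controller process
// could poll this and spawn consumers when lag stays above a threshold.
public class LagCheck {
    public static long totalLag(long[] logEndOffsets, long[] committedOffsets) {
        long lag = 0;
        for (int i = 0; i < logEndOffsets.length; i++) {
            // Guard against a committed offset briefly ahead of a stale log-end read.
            lag += Math.max(0, logEndOffsets[i] - committedOffsets[i]);
        }
        return lag;
    }

    public static void main(String[] args) {
        long lag = totalLag(new long[] {1500, 900}, new long[] {1000, 850});
        System.out.println("total lag = " + lag);       // 550
        System.out.println("scale up? " + (lag > 500)); // true
    }
}
```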
Delete topic / Recreate = No leader
I am trying to reproduce this. If I create a topic, then delete it, then re-create it, no leader gets assigned, yet I can still produce/consume messages (via the command line, basic testing). Is there some additional cleanup I need to do? Thanks for your time!
Re: Kafka still aware of old zookeeper nodes
Not sure if this is the best way to do this, but my zookeeper.connect is set to a DNS alias which points to a load balancer for 3 ZooKeeper nodes. I was trying this to see if I could keep the Kafka config dynamic and be able to change/scale whatever I wanted with ZooKeeper without ever having to touch the Kafka config. Thanks for your comments.

On Thu, Apr 30, 2015 at 11:35 AM, svante karlsson s...@csi.se wrote:

Have you changed zookeeper.connect= in server.properties? A better procedure for replacing ZooKeeper nodes would be to shut down one and install the new one with the same IP. This can easily be done on a running cluster. /svante

2015-04-30 20:08 GMT+02:00 Dillian Murphey crackshotm...@gmail.com:

I had 3 ZooKeeper nodes. I added 3 new ones and shut down the old 3. The server.log shows "Closing socket connection" errors to the old IPs. I rebooted the Kafka server entirely, but it still somehow seems aware of these servers. Any ideas what's up?
Re: New Producer API - batched sync mode support
2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava e...@confluent.io:

They aren't going to get this anyway (as Jay pointed out) given the current broker implementation

Is it also incorrect to assume atomicity even if all messages in the batch go to the same partition?
Re: New Producer API - batched sync mode support
Why do we think atomicity is expected, if the old API we are emulating here lacks atomicity? I don't remember emails to the mailing list saying: "I expected this batch to be atomic, but instead I got duplicates when retrying after a failed batch send." Maybe atomicity isn't as strong a requirement as we believe? That is, everyone expects some duplicates during failure events and handles them downstream?

On Thu, Apr 30, 2015 at 2:02 PM, Ivan Balashov ibalas...@gmail.com wrote:

2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava e...@confluent.io:

They aren't going to get this anyway (as Jay pointed out) given the current broker implementation

Is it also incorrect to assume atomicity even if all messages in the batch go to the same partition?
Re: Data replication and zero data loss
Which MirrorMaker version did you look at? The MirrorMaker in trunk should not have data loss if you just use the default settings.

On 4/30/15, 7:53 PM, Joong Lee jo...@me.com wrote:

Hi, We are exploring Kafka to keep two data centers (primary and DR) running hosts of Elasticsearch nodes in sync. One key requirement is that we can't lose any data. We POC'd use of MirrorMaker and felt it may not meet our data-loss requirement. I would like to ask the community whether we should look for another solution, or whether Kafka would be the right solution considering the zero-data-loss requirement. Thanks
RE: Java Consumer API
It'll be officially ready only in version 0.9.

Aditya

From: Mohit Gupta [success.mohit.gu...@gmail.com]
Sent: Thursday, April 30, 2015 8:58 PM
To: users@kafka.apache.org
Subject: Java Consumer API

Hello, the Kafka documentation (http://kafka.apache.org/documentation.html#producerapi) suggests using only the Producer from kafka-clients (0.8.2.0) and using the Consumer from the packaged Scala client. I just want to check whether the Consumer API from this client is ready for production use. -- Best Regards, Mohit
Re: New Producer API - batched sync mode support
Roshan,

If I understand correctly, you just want to make sure a number of messages has been sent successfully. Using a callback might be easier to do so:

    public class MyCallback implements Callback {
        public Set<RecordMetadata> failedSend = new HashSet<>();

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null)
                failedSend.add(metadata);
        }

        public boolean hasFailure() { return failedSend.size() > 0; }
    }

In the main code, you just need to do the following:

    MyCallback callback = new MyCallback();
    for (ProducerRecord record : records)
        producer.send(record, callback);
    producer.flush();
    if (callback.hasFailure())
        // do something

This avoids the loop checking and provides pretty much the same guarantee as the old producer, if not better.

Jiangjie (Becket) Qin

On 4/30/15, 4:54 PM, Roshan Naik ros...@hortonworks.com wrote:

@Gwen, @Ewen, While atomicity of a batch is nice to have, it is not essential. I don't think users always expect such atomicity. Atomicity is not even guaranteed in many un-batched systems, let alone batched systems. As long as the client gets informed about the ones that failed in the batch, that would suffice. One issue with the current flush()-based batch-sync implementation is that the client needs to iterate over *all* futures in order to scan for any failed messages. In the common case, that is just wasted CPU cycles, as there won't be any failures. It would be ideal if the client were informed about only the problematic messages. IMO, adding a new send(batch) API may be meaningful if it can provide benefits beyond what the user can do with a simple wrapper on existing stuff. For example: eliminating the CPU cycles wasted on examining results from successful message deliveries, or other efficiencies.

@Ivan, I am not certain; I am thinking that there is a possibility that the first few messages of the batch got accepted, but not the remainder. At the same time, based on some comments made earlier, it appears the underlying implementation does have an all-or-none mechanism for a batch going to a partition. For simplicity, streaming clients may not want to deal explicitly with partitions (and get exposed to repartitioning / leader-change type issues).

-roshan

On 4/30/15 2:07 PM, Gwen Shapira gshap...@cloudera.com wrote:

Why do we think atomicity is expected, if the old API we are emulating here lacks atomicity? I don't remember emails to the mailing list saying: "I expected this batch to be atomic, but instead I got duplicates when retrying after a failed batch send." Maybe atomicity isn't as strong a requirement as we believe? That is, everyone expects some duplicates during failure events and handles them downstream?

On Thu, Apr 30, 2015 at 2:02 PM, Ivan Balashov ibalas...@gmail.com wrote:

2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava e...@confluent.io:

They aren't going to get this anyway (as Jay pointed out) given the current broker implementation

Is it also incorrect to assume atomicity even if all messages in the batch go to the same partition?
Re: Data replication and zero data loss
When we evaluated MirrorMaker last year we didn't find any risk of data loss, only duplicate messages in the case of a network partition. Did you discover data loss in your tests, or were you just looking at the docs?

On Fri, 1 May 2015 at 4:31 pm Jiangjie Qin j...@linkedin.com.invalid wrote:

Which MirrorMaker version did you look at? The MirrorMaker in trunk should not have data loss if you just use the default settings.

On 4/30/15, 7:53 PM, Joong Lee jo...@me.com wrote:

Hi, We are exploring Kafka to keep two data centers (primary and DR) running hosts of Elasticsearch nodes in sync. One key requirement is that we can't lose any data. We POC'd use of MirrorMaker and felt it may not meet our data-loss requirement. I would like to ask the community whether we should look for another solution, or whether Kafka would be the right solution considering the zero-data-loss requirement. Thanks
Java Consumer API
Hello, the Kafka documentation (http://kafka.apache.org/documentation.html#producerapi) suggests using only the Producer from kafka-clients (0.8.2.0) and using the Consumer from the packaged Scala client. I just want to check whether the Consumer API from this client is ready for production use. -- Best Regards, Mohit
Pulling Snapshots from Kafka, Log compaction last compact offset
Hello Everyone,

I am quite excited about the recent example of replicating PostgreSQL changes to Kafka. My view on the log compaction feature had always been a very sceptical one, but now, with its great potential exposed to the wide public, I think it's an awesome feature. Especially when pulling this data into HDFS as a snapshot, it is (IMO) a Sqoop killer. So I want to thank everyone who had the vision of building these kinds of systems during a time when I could not imagine them.

There is one open question that I would like people to help me with. When pulling a snapshot of a partition into HDFS using a Camus-like application, I feel the need to keep a set of all keys read so far and stop as soon as I find a key that is already in my set. I use this as an indicator of how far the log compaction has already happened, and only pull up to this point. This works quite well, as I do not need to keep the messages in memory, only their keys.

The question I want to raise with the community is: how do you prevent pulling the same record twice (in different versions), and would it be beneficial if the OffsetResponse also returned the last offset that has been compacted so far, so the application could just pull up to that point?

Looking forward to some recommendations and comments. Best, Jan
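The stop-at-first-repeated-key strategy described above can be sketched as follows. This is an illustrative simplification (records modeled as key/value string pairs; real code would fetch them from a Kafka partition in offset order), not an actual Camus or Kafka API:

```java
import java.util.*;

// Walk a partially compacted partition in offset order, remembering every
// key seen; stop at the first repeated key, taken as the boundary between
// the compacted prefix (at most one record per key) and the live tail.
public class SnapshotPull {
    // rec[0] = key, rec[1] = value
    public static List<String[]> pullUntilRepeat(List<String[]> records) {
        Set<String> seen = new HashSet<>();
        List<String[]> snapshot = new ArrayList<>();
        for (String[] rec : records) {
            if (!seen.add(rec[0])) break;  // key already seen: stop pulling
            snapshot.add(rec);
        }
        return snapshot;
    }

    public static void main(String[] args) {
        List<String[]> log = Arrays.asList(
            new String[] {"a", "1"},
            new String[] {"b", "2"},
            new String[] {"a", "3"},   // "a" repeats: uncompacted tail starts here
            new String[] {"c", "4"});
        System.out.println(pullUntilRepeat(log).size()); // 2
    }
}
```

Note the memory cost is one entry per distinct key, matching the observation above that only keys, not messages, need to be held.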
Data replication and zero data loss
Hi, We are exploring Kafka to keep two data centers (primary and DR) running hosts of Elasticsearch nodes in sync. One key requirement is that we can't lose any data. We POC'd use of MirrorMaker and felt it may not meet our data-loss requirement. I would like to ask the community whether we should look for another solution, or whether Kafka would be the right solution considering the zero-data-loss requirement. Thanks
Re: Leaderless topics
Which Kafka version are you using?

On Thu, Apr 30, 2015 at 4:11 PM, Dillian Murphey crackshotm...@gmail.com wrote:

Scenario with a 1-node broker and a 3-node ZooKeeper ensemble: 1) Create topic 2) Delete topic 3) Re-create it with the same name. I'm noticing this recreation gives me Leader: none and an empty Isr. Any ideas what the deal is here? I googled around and, not being an experienced Kafka admin, saw that someone said to delete the /controller entry in ZK. This appears to fix the problem on existing topics that show no leader. Is it OK to do this? What am I doing by deleting /controller? Is there a better way? Thanks for any advice, and your time of course.
Hitting integer limit when setting log segment.bytes
Hey all, I am attempting to create a topic which uses 8GB log segment sizes, like so:

./kafka-topics.sh --zookeeper localhost:2181 --create --topic perftest6p2r --partitions 6 --replication-factor 2 --config max.message.bytes=655360 --config segment.bytes=8589934592

and am getting the following error:

Error while executing topic command For input string: 8589934592
java.lang.NumberFormatException: For input string: 8589934592
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:583)
...

Upon further testing with --alter, it would appear that segment.bytes will not accept a value higher than 2,147,483,647, which is the upper limit for a signed 32-bit int. This restricts the log segment size to an upper limit of ~2GB. We run Kafka on hard-drive-dense machines, each with 10Gbit uplinks. We can set ulimits higher in order to deal with all the open file handles (since Kafka keeps all log segment file handles open), but it would be preferable to minimize this number, as well as to minimize the amount of log segment rollover experienced at high traffic (i.e., a rollover every 1-2 seconds or so when saturating 10GbE). Is there a reason (performance or otherwise) that a 32-bit integer is used rather than something larger? Thanks, -Lance
Leaderless topics
Scenario with a 1-node broker and a 3-node ZooKeeper ensemble: 1) Create topic 2) Delete topic 3) Re-create it with the same name. I'm noticing this recreation gives me Leader: none and an empty Isr. Any ideas what the deal is here? I googled around and, not being an experienced Kafka admin, saw that someone said to delete the /controller entry in ZK. This appears to fix the problem on existing topics that show no leader. Is it OK to do this? What am I doing by deleting /controller? Is there a better way? Thanks for any advice, and your time of course.
Re: New Producer API - batched sync mode support
@Gwen, @Ewen, While atomicity of a batch is nice to have, it is not essential. I don't think users always expect such atomicity. Atomicity is not even guaranteed in many un-batched systems, let alone batched systems. As long as the client gets informed about the ones that failed in the batch, that would suffice. One issue with the current flush()-based batch-sync implementation is that the client needs to iterate over *all* futures in order to scan for any failed messages. In the common case, that is just wasted CPU cycles, as there won't be any failures. It would be ideal if the client were informed about only the problematic messages. IMO, adding a new send(batch) API may be meaningful if it can provide benefits beyond what the user can do with a simple wrapper on existing stuff. For example: eliminating the CPU cycles wasted on examining results from successful message deliveries, or other efficiencies.

@Ivan, I am not certain; I am thinking that there is a possibility that the first few messages of the batch got accepted, but not the remainder. At the same time, based on some comments made earlier, it appears the underlying implementation does have an all-or-none mechanism for a batch going to a partition. For simplicity, streaming clients may not want to deal explicitly with partitions (and get exposed to repartitioning / leader-change type issues).

-roshan

On 4/30/15 2:07 PM, Gwen Shapira gshap...@cloudera.com wrote:

Why do we think atomicity is expected, if the old API we are emulating here lacks atomicity? I don't remember emails to the mailing list saying: "I expected this batch to be atomic, but instead I got duplicates when retrying after a failed batch send." Maybe atomicity isn't as strong a requirement as we believe? That is, everyone expects some duplicates during failure events and handles them downstream?

On Thu, Apr 30, 2015 at 2:02 PM, Ivan Balashov ibalas...@gmail.com wrote:

2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava e...@confluent.io:

They aren't going to get this anyway (as Jay pointed out) given the current broker implementation

Is it also incorrect to assume atomicity even if all messages in the batch go to the same partition?
Re: Why fetching meta-data for topic is done three times?
With reties 1 you still see the 3 secs delay? The idea is, you can change these property to reduce the time to throw exception to 1 secs or below. Does that help? Thanks Zakee On Apr 28, 2015, at 10:29 PM, Madhukar Bharti bhartimadhu...@gmail.com wrote: Hi Zakee, message.send.max.retries is 1 Regards, Madhukar On Tue, Apr 28, 2015 at 6:17 PM, Madhukar Bharti bhartimadhu...@gmail.com mailto:bhartimadhu...@gmail.com wrote: Hi Zakee, Thanks for your reply. message.send.max.retries 3 retry.backoff.ms 100 topic.metadata.refresh.interval.ms 600*1000 This is my properties. Regards, Madhukar On Tue, Apr 28, 2015 at 3:26 AM, Zakee kzak...@netzero.net wrote: What values do you have for below properties? Or are these set to defaults? message.send.max.retries retry.backoff.ms topic.metadata.refresh.interval.ms Thanks Zakee On Apr 23, 2015, at 11:48 PM, Madhukar Bharti bhartimadhu...@gmail.com wrote: Hi All, Once gone through code found that, While Producer starts it does three things: 1. Sends Meta-data request 2. Send message to broker(fetching broker list) 3. If number of message to be produce is grater than 0 then again tries to refresh metadata for outstanding produce requests. Each of the request takes configured timeout and go to next and finally once all is done then it will throw Exception(if 3 also fails). Here the problem is, if we set timeout as 1 sec then to throw an exception It takes 3 sec, so user request will be hanged up till 3 sec, that is comparatively high for response time and if all threads will be blocked due to producer send then whole application will be blocked for 3 sec. So we want to reduce this time to 1 sec. in overall to throw Exception. What is the possible way to do this? Thanks Madhukar On Thu, Apr 16, 2015 at 8:10 PM, Madhukar Bharti bhartimadhu...@gmail.com wrote: Hi All, I came across a problem, If we use broker IP which is not reachable or out of network. Then it takes more than configured time(request.timeout.ms ). 
After checking the log, I found that it is trying to fetch topic metadata three times, changing the correlation id each time. Because of this, even though I keep request.timeout.ms=1000, it takes 3 seconds to throw the exception. I am using Kafka 0.8.1.1 with the patch https://issues.apache.org/jira/secure/attachment/12678547/kafka-1733-add-connectTimeoutMs.patch I have attached the log. Please check it and clarify why it is behaving like this, whether it is by design, or whether I have to set some other property as well. Regards, Madhukar
Re: Pulling Snapshots from Kafka, Log compaction last compact offset
I feel a need to respond to the Sqoop-killer comment :)

1) Note that most databases have a single transaction log per DB, and in order to get the correct view of the DB you need to read it in order (otherwise transactions will get messed up). This means you are limited to a single producer reading data from the log, writing it to a single partition, and having it read by a single consumer. If the database is very large and very active, you may run into some issues there... Because Sqoop doesn't try to catch up with all the changes, but takes a snapshot (from multiple mappers in parallel), we can very rapidly Sqoop 10TB databases.

2) If HDFS is the target for data from Postgres, then PostgreSQL -> Kafka -> HDFS seems less optimal than PostgreSQL -> HDFS directly (in parallel). There are good reasons to get Postgres data to Kafka, but if the eventual goal is HDFS (or HBase), I suspect Sqoop still has a place.

3) Due to its parallelism and general-purpose JDBC connector, I suspect that Sqoop is even a very viable way of getting data into Kafka. Gwen

On Thu, Apr 30, 2015 at 2:27 PM, Jan Filipiak jan.filip...@trivago.com wrote: Hello Everyone, I am quite excited about the recent example of replicating PostgreSQL changes to Kafka. My view of the log compaction feature had always been a very sceptical one, but now with its great potential exposed to the wide public, I think it's an awesome feature. Especially when pulling this data into HDFS as a snapshot, it is (IMO) a Sqoop killer. So I want to thank everyone who had the vision of building these kinds of systems during a time when I could not imagine them. There is one open question that I would like people to help me with. When pulling a snapshot of a partition into HDFS using a Camus-like application, I feel the need to keep a set of all keys read so far and stop as soon as I find a key that is already in my set.
I use this as an indicator of how far log compaction has already progressed, and only pull up to this point. This works quite well, as I do not need to keep the messages in memory, only their keys. The question I want to raise with the community is: how do you prevent pulling the same record twice (in different versions), and would it be beneficial if the OffsetResponse also returned the last offset that has been compacted so far, so the application could just pull up to that point? Looking forward to some recommendations and comments. Best, Jan
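The stop-at-first-repeated-key approach Jan describes can be sketched in a few lines. This is a simplified model: the `Message` struct and `snapshotUntilRepeat` function are hypothetical, and a real Camus-like job would read records from a Kafka fetch response rather than a slice.

```go
package main

import "fmt"

// Message is a minimal stand-in for a record from a compacted topic.
type Message struct {
	Offset int64
	Key    string
	Value  string
}

// snapshotUntilRepeat reads a compacted partition from the beginning and
// stops at the first key that has already been seen. The prefix before
// that point contains at most one message per key (the fully compacted
// region); the first repeat marks where compaction has not yet caught up.
// Only the keys are kept in memory, matching the approach in the thread.
func snapshotUntilRepeat(msgs []Message) []Message {
	seen := make(map[string]struct{}, len(msgs))
	var snapshot []Message
	for _, m := range msgs {
		if _, dup := seen[m.Key]; dup {
			break // first repeated key: end the snapshot here
		}
		seen[m.Key] = struct{}{}
		snapshot = append(snapshot, m)
	}
	return snapshot
}

func main() {
	log := []Message{
		{0, "a", "a1"}, {1, "b", "b1"}, {2, "c", "c1"},
		{3, "a", "a2"}, // "a" repeats: compaction hasn't reached here yet
		{4, "d", "d1"},
	}
	// Prints offsets 0, 1 and 2 only.
	for _, m := range snapshotUntilRepeat(log) {
		fmt.Printf("offset=%d key=%s\n", m.Offset, m.Key)
	}
}
```

Note this is conservative: records after the first repeat may also be the latest version of their key, but without knowing the broker's compaction point (the feature Jan is asking for) the repeat is the only safe stopping signal available to the client.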
Re: Kafka 0.8.2 beta - release
Thank you. It seems the following method is not supported in KafkaConsumer. Do you know when it will be supported?

public OffsetMetadata commit(Map&lt;TopicPartition, Long&gt; offsets, boolean sync) { throw new UnsupportedOperationException(); }

Thanks &amp; Regards,

On Wed, Apr 29, 2015 at 10:52 PM, Ewen Cheslack-Postava e...@confluent.io wrote: It has already been released, including a minor revision to fix some critical bugs. The latest release is 0.8.2.1. The downloads page has links and release notes: http://kafka.apache.org/downloads.html

On Wed, Apr 29, 2015 at 10:22 PM, Gomathivinayagam Muthuvinayagam sankarm...@gmail.com wrote: I see a lot of interesting features in the Kafka 0.8.2 beta. I am just wondering when it will be released. Is there a timeline for that? Thanks &amp; Regards, -- Thanks, Ewen
Re: Horizontally Scaling Kafka Consumers
My mistake, it seems the Java drivers are a lot more advanced than Shopify's Kafka driver (or I am missing something), and I haven't used Kafka before. With the Go driver, it seems you have to manage offsets and partitions within the application code, while with the Scala driver you have the option of simply subscribing to a topic and having someone else manage that part. After digging around a bit more, I found there is another library, https://github.com/wvanbergen/kafka, that speaks the consumer group API and accomplishes what I was looking for; I assume it is implemented by keeping track of memberships with ZooKeeper. Thank you for the information, it really helped clear up what I was failing to understand with Kafka. Nimi

On Wed, Apr 29, 2015 at 10:10 PM, Joe Stein joe.st...@stealth.ly wrote: You can do this with the existing Kafka consumer https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L106 and probably any other Kafka client too (maybe with minor/major rework to do the offset management). The new consumer approach is more transparent about subscribing to specific partitions: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L200-L234

Here is a Dockerfile (** pull request pending **) for wrapping Kafka consumers (it doesn't have to be the Go client; I need to abstract that out some more after more testing): https://github.com/stealthly/go_kafka_client/blob/mesos-marathon/consumers/Dockerfile There is also a VM (** pull request pending **) to build the container, push it to a local Docker repository, and launch it on Apache Mesos, as a working example of how to do it: https://github.com/stealthly/go_kafka_client/tree/mesos-marathon/mesos/vagrant All of this could be done without the Docker container and still work on Mesos ... or even without Mesos, on YARN.
You might also want to check out how Samza integrates with execution frameworks http://samza.apache.org/learn/documentation/0.9/comparisons/introduction.html which has a Mesos patch https://issues.apache.org/jira/browse/SAMZA-375 and built-in YARN support. ~ Joe Stein - http://www.stealth.ly

On Wed, Apr 29, 2015 at 8:56 AM, David Corley davidcor...@gmail.com wrote: You're right, Stevo. I should rephrase to say that there can be no more _active_ consumers than there are partitions (within a single consumer group). I'm guessing that's what Nimi is asking about, but perhaps he can elaborate on whether he's using consumer groups and/or whether the 100 partitions are all for a single topic or for multiple topics.

On 29 April 2015 at 13:38, Stevo Slavić ssla...@gmail.com wrote: Please correct me if I'm wrong, but I think it is really not a hard constraint that one cannot have more consumers (from the same group) than partitions on a single topic. All the surplus consumers will simply not be assigned any partition to consume, but they can be there, and as soon as one active consumer from the same group goes offline (its connection to ZK is dropped), the consumers in the group will be rebalanced so that one passively waiting consumer becomes active. Kind regards, Stevo Slavic.

On Wed, Apr 29, 2015 at 2:25 PM, David Corley davidcor...@gmail.com wrote: If the 100 partitions are all for the same topic, you can have up to 100 consumers working as part of a single consumer group for that topic. You cannot have more consumers than there are partitions within a given consumer group.

On 29 April 2015 at 08:41, Nimi Wariboko Jr n...@channelmeter.com wrote: Hi, I was wondering what options there are for horizontally scaling Kafka consumers. Basically, if I have 100 partitions and 10 consumers, and want to temporarily scale up to 50 consumers, what options do I have?
So far I've thought of simply tracking consumer membership somehow (either through Raft or ZooKeeper znodes) on the consumers.
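The rebalance behaviour discussed in this thread (surplus group members idle until a partition frees up) can be illustrated with a small sketch of range-style assignment, roughly what the high-level consumer does during a rebalance. The function name and simplified layout here are hypothetical, not an actual client API.

```go
package main

import "fmt"

// assignRange splits partitions as evenly as possible across the group's
// members, in order, mimicking range-style assignment during a rebalance.
// With more consumers than partitions, the surplus members receive no
// partitions and wait passively until a rebalance gives them one.
func assignRange(numPartitions int, consumers []string) map[string][]int {
	out := make(map[string][]int, len(consumers))
	n := len(consumers)
	per, extra := numPartitions/n, numPartitions%n
	p := 0
	for i, c := range consumers {
		count := per
		if i < extra {
			count++ // the first `extra` consumers take one additional partition
		}
		for j := 0; j < count; j++ {
			out[c] = append(out[c], p)
			p++
		}
	}
	return out
}

func main() {
	// 4 partitions, 6 consumers: c5 and c6 own nothing and wait passively.
	consumers := []string{"c1", "c2", "c3", "c4", "c5", "c6"}
	assignment := assignRange(4, consumers)
	for _, c := range consumers {
		fmt.Printf("%s -> %v\n", c, assignment[c])
	}
}
```

Scaling from 10 to 50 consumers against 100 partitions just shrinks each member's share from 10 partitions to 2; going past 100 consumers only adds idle standbys, which is the constraint David and Stevo are refining above.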