0.7 design doc?
There used to be a very lucid page available describing Kafka 0.7, its design, and the rationale behind certain decisions. I last saw it about 18 months ago and can't find it now. Is it still available? I can find the 0.8 version; it's up there on the site. Any help? Any links?
Philip
http://www.philipotoole.com
Re: If you run Kafka in AWS or Docker, how do you persist data?
We run Kafka on dedicated hosts, in Docker, under Marathon, under Mesos. It was a real bear to get working, but it is really beautiful now that it works. I simply run with a unique-hostname constraint and number of instances = replication factor. If a broker dies and it isn't a hardware or network issue, Marathon restarts it.

The hardest part was that Kafka was registering with ZooKeeper using the internal (to Docker) port. My workaround: use the same port inside and outside Docker, or the broker will register with ZK using whatever port it sees inside the container. FYI, this is an on-premises dedicated Mesos cluster running on bare metal :)

On Friday, February 27, 2015, James Cheng jch...@tivo.com wrote:
Hi, I know that Netflix might be talking about Kafka on AWS at the March meetup, but I wanted to bring up the topic anyway. I'm sure that some people are running Kafka in AWS. Is anyone running Kafka within Docker in production? How does that work?

For both of these, how do you persist data? If on AWS, do you use EBS? Do you use ephemeral storage and then rely on replication? And if using Docker, do you persist data outside the Docker container, on the host machine?

And related: how do you deal with broker failure? Do you simply replace it and repopulate the new broker via replication? Or do you bring the broker back up with the persisted files? Trying to learn what people are doing beyond on-premises, dedicated hardware.
Thanks,
-James

--
Text by Jeff, typos by iPhone
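For illustration, a minimal sketch of the same-port workaround described above (the image name, hostname, and ports are placeholders, not from the thread):

  # Publish the broker port 1:1, so the port Kafka binds inside the
  # container matches the port it registers in ZooKeeper
  docker run -d -p 9092:9092 my-kafka-image

  # Alternatively, pin what the broker advertises, in server.properties
  # (standard 0.8.x broker settings; values illustrative)
  advertised.host.name=broker1.example.com
  advertised.port=9092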
.deleted file descriptors
Hi,
After Kafka cleaned up .log/.index files based on topic retention, I can still lsof a lot of .index.deleted files, and df shows disk usage accumulating until the disk is full. When this happens, just restarting the broker immediately frees that disk space. It seems to me that Kafka still holds file descriptors for the expired files after cleaning them, which leads to the disk space still being held. How do you configure Kafka to release the file descriptors in this case? Using Kafka 0.8.1.1.
Regards,
Guangle
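For illustration, one way to confirm the broker is pinning deleted segments (a sketch; it assumes the broker was started with the usual kafka.Kafka main class):

  # Open file descriptors for files already unlinked from the filesystem
  # show up with a '(deleted)' marker in lsof output
  lsof -p $(pgrep -f kafka.Kafka) | grep '(deleted)'

If this list keeps growing while df shows no space reclaimed, the broker is indeed holding the reclaimed space until restart.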
How replicas catch up the leader
Hi team,
I had a replica node that was shut down improperly because it ran out of disk space. I managed to clean up the disk and restarted the replica, but since then the replica has never caught up with the leader, as shown below:

Topic:test PartitionCount:1 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 5 Replicas: 1,5,6 Isr: 5,6

Broker 1 is the replica that failed before. Is there a way I can force the replica to catch up with the leader?
--
Regards, Tao
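For illustration, two checks that show whether the replica is making progress (a sketch: the MBean name is as listed in the 0.8.2 monitoring docs and differs on older releases, and the JMX port is an assumption):

  # Watch whether broker 1 rejoins the ISR
  bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test

  # On broker 1, how many messages the replica fetcher is behind the leader
  bin/kafka-run-class.sh kafka.tools.JmxTool \
    --object-name 'kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica' \
    --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi

If MaxLag is shrinking, the replica is catching up and should re-enter the ISR on its own; if it stays flat, the fetcher is likely stuck and the broker log should say why.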
Re: Unlimited Log Retention
Wouldn't it be a better choice to store the logs offline somewhere? HDFS and S3 are both good choices...
-Mark

On Feb 27, 2015, at 16:12, Warren Kiser war...@hioscar.com wrote:
Does anyone know how to achieve unlimited log retention, either globally or on a per-topic basis? I tried explicitly setting log.retention.bytes to -1, but the default time policy kicked in after 7 days and cleaned up the messages. Thanks!
Warren
Re: cross-colo writing/reading?
Perhaps MirrorMaker is what you want? https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330

On Friday, February 27, 2015, Yang tedd...@gmail.com wrote:
We tested our new application that reads and writes to Kafka. At first we found the access latency was very high; then we realized it was because the client and server were in different colos. Moving them together reduces the access time to 4 ms. I was wondering if there are any techniques/properties to set, or maybe enhancements to be made to the Kafka code, that could cater to such situations?
Thanks
Yang

--
Text by Jeff, typos by iPhone
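For illustration, a minimal MirrorMaker invocation for keeping a local copy of a remote colo's topics (a sketch; the config file names and the whitelist are placeholders):

  # consumer config points at the remote cluster's ZooKeeper,
  # producer config at the local cluster's brokers
  bin/kafka-run-class.sh kafka.tools.MirrorMaker \
    --consumer.config remote-colo-consumer.properties \
    --producer.config local-colo-producer.properties \
    --whitelist '.*'

Clients then read from the cluster in their own colo instead of paying the cross-colo round trip on every request.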
Re: NetworkProcessorAvgIdlePercent
Zakee,
It would be useful to get the following:
kafka.network:name=RequestQueueSize,type=RequestChannel
kafka.network:name=RequestQueueTimeMs,request=Fetch,type=RequestMetrics
kafka.network:name=RequestQueueTimeMs,request=Produce,type=RequestMetrics
Thanks, Jun

On Thu, Feb 26, 2015 at 2:17 PM, Zakee kzak...@netzero.net wrote:
Hi Jun, Kafka generates too many KPIs for me to monitor them all; so far I monitor a filtered list that I can understand. Which keys are you talking about? I am not currently monitoring any of the keys below, but I will add the ones that are useful.
kafka.network:name=RequestQueueSize,type=RequestChannel
kafka.network:name=RequestQueueTimeMs,request=ConsumerMetadata,type=RequestMetrics
kafka.network:name=RequestQueueTimeMs,request=ControlledShutdown,type=RequestMetrics
kafka.network:name=RequestQueueTimeMs,request=Fetch,type=RequestMetrics
kafka.network:name=RequestQueueTimeMs,request=FetchConsumer,type=RequestMetrics
kafka.network:name=RequestQueueTimeMs,request=FetchFollower,type=RequestMetrics
kafka.network:name=RequestQueueTimeMs,request=Heartbeat,type=RequestMetrics
kafka.network:name=RequestQueueTimeMs,request=JoinGroup,type=RequestMetrics
kafka.network:name=RequestQueueTimeMs,request=LeaderAndIsr,type=RequestMetrics
kafka.network:name=RequestQueueTimeMs,request=Metadata,type=RequestMetrics
kafka.network:name=RequestQueueTimeMs,request=OffsetCommit,type=RequestMetrics
kafka.network:name=RequestQueueTimeMs,request=OffsetFetch,type=RequestMetrics
kafka.network:name=RequestQueueTimeMs,request=Offsets,type=RequestMetrics
kafka.network:name=RequestQueueTimeMs,request=Produce,type=RequestMetrics
kafka.network:name=RequestQueueTimeMs,request=StopReplica,type=RequestMetrics
kafka.network:name=RequestQueueTimeMs,request=UpdateMetadata,type=RequestMetrics
Thanks, Zakee

On Feb 26, 2015, at 9:10 AM, Jun Rao j...@confluent.io wrote:
That may be enough. What are the RequestQueueSize and RequestQueueTimeMs? Thanks, Jun

On Wed, Feb 25, 2015 at 10:24 PM, Zakee kzak...@netzero.net wrote:
Well, currently I have configured 14 threads for both io and network. Do you think we should consider more? Thanks, -Zakee

On Wed, Feb 25, 2015 at 6:22 PM, Jun Rao j...@confluent.io wrote:
Then you may want to consider increasing num.io.threads and num.network.threads. Thanks, Jun

On Tue, Feb 24, 2015 at 7:48 PM, Zakee kzak...@netzero.net wrote:
Similar pattern for that too; mostly hovering below. -Zakee

On Tue, Feb 24, 2015 at 2:43 PM, Jun Rao j...@confluent.io wrote:
What about RequestHandlerAvgIdlePercent? Thanks, Jun

On Mon, Feb 23, 2015 at 8:47 PM, Zakee kzak...@netzero.net wrote:
Hi Jun, With ~100G of data being pushed per hour across 35 topics (replication factor 3), NetworkProcessorAvgIdlePercent mostly shows below 0.5, especially when the producers send at a high rate. Thanks, -Zakee

On Sun, Feb 22, 2015 at 10:29 PM, Jun Rao j...@confluent.io wrote:
What kind of load do you have on the brokers? On an idle cluster (just fetch requests from the follower replicas), I saw NetworkProcessorAvgIdlePercent at about 97%. Thanks, Jun

On Thu, Feb 19, 2015 at 5:19 PM, Zakee kzak...@netzero.net wrote:
Jun, I am already using the latest release, 0.8.2.1. -Zakee

On Thu, Feb 19, 2015 at 2:46 PM, Jun Rao j...@confluent.io wrote:
Could you try the 0.8.2.1 release being voted on now? It fixes a CPU issue and should reduce the CPU load in the network thread. Thanks, Jun

On Thu, Feb 19, 2015 at 11:54 AM, Zakee kzak...@netzero.net wrote:
The Kafka documentation recommends 0.3 for the above metric. I assume the processor is busier when this goes below 0.3, and it sitting at 0.3 for long stretches obviously does not seem to be a good sign. What should our criteria be for raising an alert? I thought it should be when the value goes below 0.3, but the value is below 0.3 a lot of the time -- almost always, if we take samples every five minutes. What should the threshold be for raising an alarm, and what is the impact of this being below 0.3, or even zero, most of the time?
-Zakee
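For reference, a sketch of pulling the queue metrics Jun asked for with the JmxTool bundled with Kafka (the broker's JMX port and the reporting interval are assumptions):

  bin/kafka-run-class.sh kafka.tools.JmxTool \
    --object-name 'kafka.network:name=RequestQueueSize,type=RequestChannel' \
    --jmx-url service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi \
    --reporting-interval 5000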
Re: broker restart problems
The log seems to suggest that broker 1 is offline. Is broker 1 registered properly in ZK? You can find this out by reading the broker registration path (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper) from ZK.
Thanks, Jun

On Thu, Feb 26, 2015 at 10:31 PM, ZhuGe t...@outlook.com wrote:
Sorry, I found the controller log. It shows entries like this:

[2015-02-16 11:10:03,094] DEBUG [Controller 0]: Removing replica 1 from ISR 2,0 for partition [wx_rtdc_PageViewData_bolt1,4]. (kafka.controller.KafkaController)
[2015-02-16 11:10:03,096] WARN [Controller 0]: Cannot remove replica 1 from ISR of partition [wx_rtdc_PageViewData_bolt1,4] since it is not in the ISR. Leader = 2 ; ISR = List(2, 0) (kafka.controller.KafkaController)
[2015-02-16 11:10:03,096] DEBUG [Controller 0]: Removing replica 1 from ISR 0,1,2 for partition [wx_rtdc_DownLoadData_bolt2,9]. (kafka.controller.KafkaController)
[2015-02-16 11:10:03,099] INFO [Controller 0]: New leader and ISR for partition [wx_rtdc_DownLoadData_bolt2,9] is {leader:0,leader_epoch:1,isr:[0,2]} (kafka.controller.KafkaController)
[2015-02-16 11:10:03,100] DEBUG [Controller 0]: Removing replica 1 from ISR 2,0,1 for partition [wx_rtdc_EventTrackData_bolt5,2]. (kafka.controller.KafkaController)
[2015-02-16 11:10:03,103] INFO [Controller 0]: New leader and ISR for partition [wx_rtdc_EventTrackData_bolt5,2] is {leader:2,leader_epoch:1,isr:[2,0]} (kafka.controller.KafkaController)

And after that, there are a lot of WARN messages like:

[2015-02-16 11:10:03,220] WARN [Channel manager on controller 0]: Not sending request Name: StopReplicaRequest; Version: 0; CorrelationId: 188; ClientId: ; DeletePartitions: false; ControllerId: 0; ControllerEpoch: 36; Partitions: [wx_rtdc_DownLoadData_bolt1,8] to broker 1, since it is offline. (kafka.controller.ControllerChannelManager)
[2015-02-16 11:10:03,221] WARN [Channel manager on controller 0]: Not sending request Name: StopReplicaRequest; Version: 0; CorrelationId: 188; ClientId: ; DeletePartitions: false; ControllerId: 0; ControllerEpoch: 36; Partitions: [wx_rtdc_StartStat_bolt3,0] to broker 1, since it is offline. (kafka.controller.ControllerChannelManager)
[2015-02-16 11:10:03,221] WARN [Channel manager on controller 0]: Not sending request Name: StopReplicaRequest; Version: 0; CorrelationId: 188; ClientId: ; DeletePartitions: false; ControllerId: 0; ControllerEpoch: 36; Partitions: [wx_rtdc_PageViewData_bolt3,5] to broker 1, since it is offline. (kafka.controller.ControllerChannelManager)

And when broker 1 started up, it logged:

[2015-02-16 11:11:04,960] INFO [BrokerChangeListener on Controller 0]: Broker change listener fired for path /brokers/ids with children 2,1,0 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
[2015-02-16 11:11:04,969] INFO [BrokerChangeListener on Controller 0]: Newly added brokers: 1, deleted brokers: , all live brokers: 2,1,0 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
[2015-02-16 11:11:04,969] DEBUG [Channel manager on controller 0]: Controller 0 trying to connect to broker 1 (kafka.controller.ControllerChannelManager)
[2015-02-16 11:11:04,970] INFO [Controller-0-to-broker-1-send-thread], Controller 0 connected to id:1,host:kslave2,port:9092 for sending state change requests (kafka.controller.RequestSendThread)
[2015-02-16 11:11:04,970] INFO [Controller 0]: New broker startup callback for 1 (kafka.controller.KafkaController)
[2015-02-16 11:11:04,970] INFO [Controller-0-to-broker-1-send-thread], Starting (kafka.controller.RequestSendThread)
[2015-02-16 11:11:04,978] ERROR [BrokerChangeListener on Controller 0]: Error while handling broker changes (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
java.util.NoSuchElementException: key not found: [cjy_kafkawebservice_AirTicketHbaseData_New,0]
at scala.collection.MapLike$class.default(MapLike.scala:225)
at scala.collection.mutable.HashMap.default(HashMap.scala:45)
at scala.collection.MapLike$class.apply(MapLike.scala:135)
at scala.collection.mutable.HashMap.apply(HashMap.scala:45)
at kafka.controller.ControllerBrokerRequestBatch.updateMetadataRequestMapFor$1(ControllerChannelManager.scala:242)
at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$1.apply(ControllerChannelManager.scala:268)
at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$1.apply(ControllerChannelManager.scala:268)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:99)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
at ...
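For illustration, one way to read the registration path Jun mentions, using the ZooKeeper shell bundled with Kafka (the ZK address is a placeholder; launch the shell, then run the get at its prompt):

  bin/zookeeper-shell.sh localhost:2181
  get /brokers/ids/1

A registered broker should return a small JSON blob with its host and port; if the node is missing, broker 1 is not registered.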
Re: Repeated failures due to ConsumerRebalanceFailedException
Can you paste the error log for each rebalance try? You may search for the keyword "exception during rebalance".

On 2/26/15, 7:41 PM, Ashwin Jayaprakash ashwin.jayaprak...@gmail.com wrote:
Just to give you some more debugging context: we noticed that the consumers path becomes empty after all the JVMs have exited because of this error. So when we restart, there are no visible entries in ZK.

On Thu, Feb 26, 2015 at 6:04 PM, Ashwin Jayaprakash ashwin.jayaprak...@gmail.com wrote:
Hello, we have a set of JVMs that consume messages from Kafka topics. Each JVM creates 4 ConsumerConnectors that are used by 4 separate threads. These JVMs also create and use the CuratorFramework's path children cache to watch and keep a sub-tree of ZooKeeper in sync with other JVMs. This path has several thousand child elements.

Everything was working perfectly until one fine day we decided to restart these JVMs. We restart these JVMs to roll in new code every few weeks or so, and we never had any problems until suddenly the Kafka consumers on these JVMs were unable to rebalance partitions among themselves. We have bounced these JVMs before with no issues. The exception:

Caused by: kafka.common.ConsumerRebalanceFailedException: group1-system01-27422-kafka-787 can't rebalance after 12 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.init(ZookeeperConsumerConnector.scala:756)
at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:145)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:96)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:100)

We then set rebalance.max.retries=16 and rebalance.backoff.ms=1. I've seen the Spark-Kafka issue https://issues.apache.org/jira/browse/SPARK-5505 and Jun's recommendation to increase the backoff property. We must have tried restarting these JVMs about 20 times now, both with and without the rebalance.xx properties. Every time it is the same issue -- except for the first time we applied the rebalance.backoff.ms=1 property, when all 4 JVMs started! We thought that solved everything, then we tried restarting just to make sure, and we were back to square one.

If we have only 1 thread create 1 ConsumerConnector instead of 4, it works. This way we can have any number of JVMs running 1 ConsumerConnector each, and they all behave well and rebalance partitions. It is only when we try to start multiple ConsumerConnectors in the same JVM that this problem occurs. I'd like to remind you that 4 ConsumerConnectors worked for several months, and the ZK sub-tree for the non-Kafka part of our code was small when we started.

Does anybody have any thoughts on this? What could be causing this issue? Could there be a Curator/ZK client conflict with the high-level Kafka consumer? Or is the number of nodes we have in ZK from our code causing problems with partition assignment in the Kafka code? (The Curator framework keeps syncing data in the background while the Kafka code is creating ConsumerConnectors and rebalancing topics.)

Thanks, Ashwin Jayaprakash.
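For context, the retry knobs mentioned in this thread live in the consumer config. A common rule of thumb is to make rebalance.backoff.ms * rebalance.max.retries comfortably exceed the ZK session timeout, so the rebalance can outlast stale ephemeral nodes left by the old consumers (a sketch; values are illustrative, not a recommendation):

  # consumer.properties
  rebalance.max.retries=16
  rebalance.backoff.ms=5000
  zookeeper.session.timeout.ms=6000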
Re: Kafka High Level Consumer
Great news. Thanks a lot, Joe.

On Wed, Feb 25, 2015 at 11:46 AM, Joseph Lawson jlaw...@roomkey.com wrote:
Doh, that was probably my bad, Pranay! A misinterpretation of some old consumer code. Btw, jruby-kafka is now at 1.1.1 with proper support for deleting the offset, setting auto_offset_reset, and whitelisting/blacklisting topics. It's packed up in a nice gem file that includes all the Kafka and log4j prerequisites too. It's pretty feature-complete for Kafka 0.8.1.1. Hurray! Thanks to everyone who submitted PRs to make it better. -Joe Lawson

From: Pranay Agarwal agarwalpran...@gmail.com
Sent: Wednesday, February 25, 2015 1:45 AM
To: users@kafka.apache.org
Subject: Re: Kafka High Level Consumer
Thanks, Jun. It seems it was an issue with the JRuby client I was using; they have fixed it now. -Pranay

On Mon, Feb 23, 2015 at 4:57 PM, Jun Rao j...@confluent.io wrote:
Did you enable auto offset commit? Thanks, Jun

On Tue, Feb 17, 2015 at 4:22 PM, Pranay Agarwal agarwalpran...@gmail.com wrote:
Hi, I am trying to consume from Kafka using the high-level consumer API. I had to restart the consumers for some reason, but I kept the same group id. It seems the consumers have started consuming from the beginning (offset 0) instead of from the point they had already reached. What am I doing wrong here? How do I make sure the consumers resume only from the point where they left off? Thanks -Pranay
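For reference, a sketch of the high-level consumer settings that govern this behavior (0.8.x property names; values illustrative):

  # Periodically commit consumed offsets to ZooKeeper
  auto.commit.enable=true
  auto.commit.interval.ms=60000
  # Only used when the group has no committed offset yet:
  # 'smallest' starts from the beginning, 'largest' from the tail
  auto.offset.reset=smallest

With offsets committed under the same group.id, a restarted consumer should resume from the last committed position rather than offset 0.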
Re: zookeeper connection failure
Do you mean you were not able to connect to ZooKeeper even after retries? We see this error in the log from time to time, but the zkClient will retry and usually succeed. Can you verify whether you were eventually able to connect or not?
Jiangjie (Becket) Qin

On 2/27/15, 12:53 AM, Victor L vlyamt...@gmail.com wrote:
I seem to be unable to connect a single instance of the Kafka server to a single instance of a local ZooKeeper server on localhost (Ubuntu 14.04 LTS / OpenJDK 1.7.0_75):

[2015-02-27 08:10:11,467] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5513425f (org.apache.zookeeper.ZooKeeper)
[2015-02-27 08:10:11,502] INFO Opening socket connection to server localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2015-02-27 08:10:11,506] WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1146)

One thread describes IPv6 as a possible cause for this problem (https://gist.github.com/akiatoji/f1a5ca319325788cf287), so I disabled IPv6, but no luck. This must be a common problem -- is there some FAQ I've missed? How can I troubleshoot it?
Re: Kafka metrics: high level vs simple consumer
Hi Stevo,
The simple consumer as well, if I'm not mistaken.
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr Elasticsearch Support * http://sematext.com/

On Fri, Feb 27, 2015 at 2:35 AM, Stevo Slavić ssla...@gmail.com wrote:
Hello Apache Kafka community, In Kafka 0.8.1.1, are Kafka metrics updated/tracked/marked by the simple consumer implementation, or only by the high-level one? Kind regards, Stevo Slavic.
Re: what groupID does camus use?
Thanks!

On Tue, Feb 24, 2015 at 8:23 PM, Gwen Shapira gshap...@cloudera.com wrote:
Camus uses the simple consumer, which doesn't have the concept of a consumer group in the API (i.e., Camus is responsible for allocating threads to partitions on its own). The client id is hard-coded: it is hadoop-etl in some places (when it initializes the offsets) and camus in others. The name camus is defined in camus.properties as kafka.client.name, so you can override it.
Gwen

On Tue, Feb 24, 2015 at 12:26 PM, Yang tedd...@gmail.com wrote:
I looked at the example camus.properties, and it does not specify this. How could we set the groupId? Thanks, Yang
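For illustration, the override Gwen describes (the value is a placeholder):

  # camus.properties
  kafka.client.name=my-camus-client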
Re: zookeeper connection failure
I eventually figured it out: my zkClient was running on a VM (guest OS) with ZooKeeper on the host, and the VM/host port mapping was broken...

On Fri, Feb 27, 2015 at 1:17 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:
Do you mean you were not able to connect to ZooKeeper even after retries? We see this error in the log from time to time, but the zkClient will retry and usually succeed. Can you verify whether you were eventually able to connect or not?
Jiangjie (Becket) Qin

On 2/27/15, 12:53 AM, Victor L vlyamt...@gmail.com wrote:
I seem to be unable to connect a single instance of the Kafka server to a single instance of a local ZooKeeper server on localhost (Ubuntu 14.04 LTS / OpenJDK 1.7.0_75):

[2015-02-27 08:10:11,467] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5513425f (org.apache.zookeeper.ZooKeeper)
[2015-02-27 08:10:11,502] INFO Opening socket connection to server localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2015-02-27 08:10:11,506] WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1146)

One thread describes IPv6 as a possible cause for this problem (https://gist.github.com/akiatoji/f1a5ca319325788cf287), so I disabled IPv6, but no luck. This must be a common problem -- is there some FAQ I've missed? How can I troubleshoot it?
zookeeper connection failure
I seem to be unable to connect a single instance of the Kafka server to a single instance of a local ZooKeeper server on localhost (Ubuntu 14.04 LTS / OpenJDK 1.7.0_75):

[2015-02-27 08:10:11,467] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5513425f (org.apache.zookeeper.ZooKeeper)
[2015-02-27 08:10:11,502] INFO Opening socket connection to server localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2015-02-27 08:10:11,506] WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1146)

One thread describes IPv6 as a possible cause for this problem (https://gist.github.com/akiatoji/f1a5ca319325788cf287), so I disabled IPv6, but no luck. This must be a common problem -- is there some FAQ I've missed? How can I troubleshoot it?
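For illustration, two quick probes that distinguish "nothing listening" from a ZooKeeper-side problem (standard ZK four-letter-word commands; host/port as in the log above):

  echo ruok | nc localhost 2181   # a healthy server answers 'imok'
  echo stat | nc localhost 2181   # server stats plus connected clients

A "Connection refused" here, as in the broker log, means nothing is reachable on that port at all: wrong port, dead server, or (as it turned out earlier in this thread) a broken port mapping.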
Re: How Query Topic For Metadata
You might want ZkUtils.getPartitionsForTopic, but beware that it's an internal method that could change or disappear in the future. If you're looking for a protocol-level solution, the metadata API has a request that returns, among other things, the number of partitions: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI

On Thu, Feb 26, 2015 at 10:10 PM, Alex Melville amelvi...@g.hmc.edu wrote:
I am writing a custom producer that needs to know information about the topic it's about to produce to. In particular, it needs to know the number of partitions of the topic. Is there some utility method that returns such data? I am using Scala 2.9.2 and Kafka 0.8.2.0.
Alex

--
Thanks, Ewen
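As a quick sanity check outside the code, the bundled topic tool reports the partition count as well (a sketch; the ZK address and topic name are placeholders):

  bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic

Programmatically, the metadata request route from the protocol guide above is the more stable choice, since ZkUtils is internal.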
Unlimited Log Retention
Does anyone know how to achieve unlimited log retention, either globally or on a per-topic basis? I tried explicitly setting log.retention.bytes to -1, but the default time policy kicked in after 7 days and cleaned up the messages. Thanks!
Warren
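For illustration, the usual workaround (a sketch of 0.8.x settings; values are placeholders). Note that log.retention.bytes=-1 alone is not enough, because time-based retention still applies, so the time limit has to be pushed out as well:

  # server.properties: push time-based retention out effectively forever
  log.retention.hours=1000000
  log.retention.bytes=-1

  # or per topic, if topic-level configs are available (0.8.1+):
  bin/kafka-topics.sh --zookeeper localhost:2181 --alter \
    --topic my-topic --config retention.ms=9223372036854775807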
Question on ISR inclusion / leader election for a failed replica catching up
Hi gang,
I am testing some of the durability guarantees given by Kafka 0.8.2.1 involving min in-sync replicas and disabled unclean leader election. My question is: when will a failed replica, after successfully coming back up, be included in the ISR again? Is this governed by the replica.lag.max.messages property, or does it have to completely catch up with the leader before rejoining the ISR?

In more detail: will we lose a committed write in the following theoretical setup?
- Single topic
- 3 Kafka brokers: K1, K2, K3
- Replication factor: 3
- Minimum in-sync replicas: 2
- Acks: -1
- Compression: gzip
- Producer type: async
- Batch size: 16000
- replica.lag.max.messages: 4000

There are 3 batches of data to be sent, and the producer retries if a batch fails in the error callback.
Batch 1: Leader: K1; ISR: K1, K2, K3. Result: data committed.
Batch 2: Leader: K1; ISR: K1, K2 (K3 crashed). Result: data committed.
Batch 3: Leader: K1; ISR: K1 (K2 crashed). Result: data uncommitted due to min in-sync replica violation.

K3 wakes up and starts catching up with the current leader; it doesn't have the batch 2 data. At this point broker K1 crashes, and K3 is about 2K messages behind K1. Will K3 be elected leader at this point, since it is within the 4K-message bound for being in the ISR? If so, this would lead to committed data loss despite unclean leader election being disabled, if I am not wrong here.

Thanks,
Puneet Mehta
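For reference, a sketch of the settings involved in this scenario (0.8.2 property names; values taken from the setup above):

  # broker config
  min.insync.replicas=2
  # keep out-of-sync replicas such as K3 from being elected leader
  unclean.leader.election.enable=false

  # producer config (old producer)
  request.required.acks=-1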
cross-colo writing/reading?
We tested our new application that reads and writes to Kafka. At first we found the access latency was very high; then we realized it was because the client and server were in different colos. Moving them together reduces the access time to 4 ms. I was wondering if there are any techniques/properties to set, or maybe enhancements to be made to the Kafka code, that could cater to such situations?
Thanks
Yang
Camus reads from multiple offsets in parallel?
We have a single partition, and the topic contains 300k events. We fired off a Camus job and it finished within 1 minute, which is rather fast. I was guessing that the multiple mappers must be reading from multiple offsets in parallel, right? Otherwise, if they were reading serially (like in a consumer group -- which a previous thread clarified is not the case, i.e. Camus does not use the group API), it would be very slow.
Thanks
Yang
RE: broker restart problems
Thanks for the reply. I confirmed that broker 1 is registered in ZK.

Date: Fri, 27 Feb 2015 09:27:52 -0800
Subject: Re: broker restart problems
From: j...@confluent.io
To: users@kafka.apache.org

The log seems to suggest that broker 1 is offline. Is broker 1 registered properly in ZK? You can find this out by reading the broker registration path (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper) from ZK.
Thanks, Jun

On Thu, Feb 26, 2015 at 10:31 PM, ZhuGe t...@outlook.com wrote:
Sorry, I found the controller log. It shows entries like this:

[2015-02-16 11:10:03,094] DEBUG [Controller 0]: Removing replica 1 from ISR 2,0 for partition [wx_rtdc_PageViewData_bolt1,4]. (kafka.controller.KafkaController)
[2015-02-16 11:10:03,096] WARN [Controller 0]: Cannot remove replica 1 from ISR of partition [wx_rtdc_PageViewData_bolt1,4] since it is not in the ISR. Leader = 2 ; ISR = List(2, 0) (kafka.controller.KafkaController)
[2015-02-16 11:10:03,096] DEBUG [Controller 0]: Removing replica 1 from ISR 0,1,2 for partition [wx_rtdc_DownLoadData_bolt2,9]. (kafka.controller.KafkaController)
[2015-02-16 11:10:03,099] INFO [Controller 0]: New leader and ISR for partition [wx_rtdc_DownLoadData_bolt2,9] is {leader:0,leader_epoch:1,isr:[0,2]} (kafka.controller.KafkaController)
[2015-02-16 11:10:03,100] DEBUG [Controller 0]: Removing replica 1 from ISR 2,0,1 for partition [wx_rtdc_EventTrackData_bolt5,2]. (kafka.controller.KafkaController)
[2015-02-16 11:10:03,103] INFO [Controller 0]: New leader and ISR for partition [wx_rtdc_EventTrackData_bolt5,2] is {leader:2,leader_epoch:1,isr:[2,0]} (kafka.controller.KafkaController)

And after that, there are a lot of WARN messages like:

[2015-02-16 11:10:03,220] WARN [Channel manager on controller 0]: Not sending request Name: StopReplicaRequest; Version: 0; CorrelationId: 188; ClientId: ; DeletePartitions: false; ControllerId: 0; ControllerEpoch: 36; Partitions: [wx_rtdc_DownLoadData_bolt1,8] to broker 1, since it is offline. (kafka.controller.ControllerChannelManager)
[2015-02-16 11:10:03,221] WARN [Channel manager on controller 0]: Not sending request Name: StopReplicaRequest; Version: 0; CorrelationId: 188; ClientId: ; DeletePartitions: false; ControllerId: 0; ControllerEpoch: 36; Partitions: [wx_rtdc_StartStat_bolt3,0] to broker 1, since it is offline. (kafka.controller.ControllerChannelManager)
[2015-02-16 11:10:03,221] WARN [Channel manager on controller 0]: Not sending request Name: StopReplicaRequest; Version: 0; CorrelationId: 188; ClientId: ; DeletePartitions: false; ControllerId: 0; ControllerEpoch: 36; Partitions: [wx_rtdc_PageViewData_bolt3,5] to broker 1, since it is offline. (kafka.controller.ControllerChannelManager)

And when broker 1 started up, it logged:

[2015-02-16 11:11:04,960] INFO [BrokerChangeListener on Controller 0]: Broker change listener fired for path /brokers/ids with children 2,1,0 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
[2015-02-16 11:11:04,969] INFO [BrokerChangeListener on Controller 0]: Newly added brokers: 1, deleted brokers: , all live brokers: 2,1,0 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
[2015-02-16 11:11:04,969] DEBUG [Channel manager on controller 0]: Controller 0 trying to connect to broker 1 (kafka.controller.ControllerChannelManager)
[2015-02-16 11:11:04,970] INFO [Controller-0-to-broker-1-send-thread], Controller 0 connected to id:1,host:kslave2,port:9092 for sending state change requests (kafka.controller.RequestSendThread)
[2015-02-16 11:11:04,970] INFO [Controller 0]: New broker startup callback for 1 (kafka.controller.KafkaController)
[2015-02-16 11:11:04,970] INFO [Controller-0-to-broker-1-send-thread], Starting (kafka.controller.RequestSendThread)
[2015-02-16 11:11:04,978] ERROR [BrokerChangeListener on Controller 0]: Error while handling broker changes (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
java.util.NoSuchElementException: key not found: [cjy_kafkawebservice_AirTicketHbaseData_New,0]
at scala.collection.MapLike$class.default(MapLike.scala:225)
at scala.collection.mutable.HashMap.default(HashMap.scala:45)
at scala.collection.MapLike$class.apply(MapLike.scala:135)
at scala.collection.mutable.HashMap.apply(HashMap.scala:45)
at kafka.controller.ControllerBrokerRequestBatch.updateMetadataRequestMapFor$1(ControllerChannelManager.scala:242)
at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$1.apply(ControllerChannelManager.scala:268)
at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$1.apply(ControllerChannelManager.scala:268)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:99)
at ...
Re: Repeated failures due to ConsumerRebalanceFailedException
Do you see "zookeeper state changed (Expired)" in your logs?

On Fri, Feb 27, 2015 at 10:12 AM, Jiangjie Qin j...@linkedin.com.invalid wrote:
Can you paste the error log for each rebalance try? You may search for the keyword "exception during rebalance".

On 2/26/15, 7:41 PM, Ashwin Jayaprakash ashwin.jayaprak...@gmail.com wrote:
Just to give you some more debugging context: we noticed that the consumers path becomes empty after all the JVMs have exited because of this error. So when we restart, there are no visible entries in ZK.

On Thu, Feb 26, 2015 at 6:04 PM, Ashwin Jayaprakash ashwin.jayaprak...@gmail.com wrote:
Hello, we have a set of JVMs that consume messages from Kafka topics. Each JVM creates 4 ConsumerConnectors that are used by 4 separate threads. These JVMs also create and use the CuratorFramework's path children cache to watch and keep a sub-tree of ZooKeeper in sync with other JVMs. This path has several thousand child elements.

Everything was working perfectly until one fine day we decided to restart these JVMs. We restart these JVMs to roll in new code every few weeks or so, and we never had any problems until suddenly the Kafka consumers on these JVMs were unable to rebalance partitions among themselves. We have bounced these JVMs before with no issues. The exception:

Caused by: kafka.common.ConsumerRebalanceFailedException: group1-system01-27422-kafka-787 can't rebalance after 12 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.init(ZookeeperConsumerConnector.scala:756)
at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:145)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:96)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:100)

We then set rebalance.max.retries=16 and rebalance.backoff.ms=1. I've seen the Spark-Kafka issue https://issues.apache.org/jira/browse/SPARK-5505 and Jun's recommendation to increase the backoff property. We must have tried restarting these JVMs about 20 times now, both with and without the rebalance.xx properties. Every time it is the same issue -- except for the first time we applied the rebalance.backoff.ms=1 property, when all 4 JVMs started! We thought that solved everything, then we tried restarting just to make sure, and we were back to square one.

If we have only 1 thread create 1 ConsumerConnector instead of 4, it works. This way we can have any number of JVMs running 1 ConsumerConnector each, and they all behave well and rebalance partitions. It is only when we try to start multiple ConsumerConnectors in the same JVM that this problem occurs. I'd like to remind you that 4 ConsumerConnectors worked for several months, and the ZK sub-tree for the non-Kafka part of our code was small when we started.

Does anybody have any thoughts on this? What could be causing this issue? Could there be a Curator/ZK client conflict with the high-level Kafka consumer? Or is the number of nodes we have in ZK from our code causing problems with partition assignment in the Kafka code? (The Curator framework keeps syncing data in the background while the Kafka code is creating ConsumerConnectors and rebalancing topics.)

Thanks, Ashwin Jayaprakash.

--
-Regards, Mayuresh R. Gharat
(862) 250-7125