[jira] [Commented] (KAFKA-1920) Add a metric to count client side errors in BrokerTopicMetrics
[ https://issues.apache.org/jira/browse/KAFKA-1920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310783#comment-14310783 ]

Neha Narkhede commented on KAFKA-1920:
--------------------------------------

[~aauradkar] What sort of client side errors do you have in mind?

> Add a metric to count client side errors in BrokerTopicMetrics
> ---------------------------------------------------------------
>
> Key: KAFKA-1920
> URL: https://issues.apache.org/jira/browse/KAFKA-1920
> Project: Kafka
> Issue Type: Improvement
> Reporter: Aditya Auradkar
> Assignee: Aditya Auradkar
>
> Currently BrokerTopicMetrics counts only failures, both across all topics and for individual topics. Should we consider adding a metric to count the number of client side errors? This would essentially count the number of bad requests per topic.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-620) UnknownHostError looking for a ZK node crashes the broker
[ https://issues.apache.org/jira/browse/KAFKA-620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310790#comment-14310790 ]

Neha Narkhede commented on KAFKA-620:
-------------------------------------

[~guozhang] It does. Feel free to close this as a duplicate so we can track the problem as part of KAFKA-1082.

> UnknownHostError looking for a ZK node crashes the broker
> ----------------------------------------------------------
>
> Key: KAFKA-620
> URL: https://issues.apache.org/jira/browse/KAFKA-620
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.7.1
> Environment: linux. Amazon's AMI
> Reporter: Matthew Rathbone
>
> If you totally kill a zookeeper node so that its hostname no longer resolves to anything, the broker will die with a java.net.UnknownHostException. You will then be unable to restart the broker until the unknown host(s) is removed from the server.properties.
>
> We ran into this issue while testing our resilience to widespread AWS outages. If you can point me to the right place, I could have a go at fixing it? Unfortunately, I suspect the issue might be in the non-standard Zookeeper library that kafka uses.
>
> Here's the stack trace:
>
> org.I0Itec.zkclient.exception.ZkException: Unable to connect to [list of zookeepers]
>     at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:66)
>     at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:872)
>     at org.I0Itec.zkclient.ZkClient.init(ZkClient.java:98)
>     at org.I0Itec.zkclient.ZkClient.init(ZkClient.java:84)
>     at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44)
>     at kafka.log.LogManager.init(LogManager.scala:87)
>     at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
>     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
>     at kafka.Kafka$.main(Kafka.scala:50)
>     at kafka.Kafka.main(Kafka.scala)
> Caused by: java.net.UnknownHostException: zk-101
>     at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
>     at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:850)
>     at java.net.InetAddress.getAddressFromNameService(InetAddress.java:1201)
>     at java.net.InetAddress.getAllByName0(InetAddress.java:1154)
>     at java.net.InetAddress.getAllByName(InetAddress.java:1084)
>     at java.net.InetAddress.getAllByName(InetAddress.java:1020)
>     at org.apache.zookeeper.ClientCnxn.init(ClientCnxn.java:387)
>     at org.apache.zookeeper.ClientCnxn.init(ClientCnxn.java:332)
>     at org.apache.zookeeper.ZooKeeper.init(ZooKeeper.java:383)
>     at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:64)
>     ... 9 more
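The crash path above is a plain hostname-resolution failure inside the ZK client. As a minimal standalone sketch (not Kafka code; the class and method names here are invented for illustration) of the tolerant behavior the reporter asks for, one can resolve each configured host up front and skip the dead ones instead of letting a single unresolvable name abort startup:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

public class ZkHostFilter {
    // Return only the host:port entries whose hostname currently resolves,
    // instead of propagating UnknownHostException out of the connect path.
    public static List<String> resolvable(List<String> hostPorts) {
        List<String> ok = new ArrayList<>();
        for (String hp : hostPorts) {
            String host = hp.split(":")[0];
            try {
                InetAddress.getByName(host);   // DNS / hosts-file lookup
                ok.add(hp);
            } catch (UnknownHostException e) {
                System.err.println("Skipping unresolvable ZK host: " + host);
            }
        }
        return ok;
    }

    public static void main(String[] args) {
        // ".invalid" is a reserved TLD that is guaranteed never to resolve
        List<String> hosts = List.of("localhost:2181", "no-such-zk-host.invalid:2181");
        System.out.println(resolvable(hosts));
    }
}
```

The real fix would live inside the ZkClient connect path (or in KAFKA-1082's retry logic), which this sketch does not touch.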
[jira] [Updated] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs
[ https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gwen Shapira updated KAFKA-1809:
--------------------------------
    Attachment: KAFKA-1809.v1.patch

For some reason the latest patch generated by the patch tool fails to apply on trunk (possibly due to multiple commits and rebases). Uploading a patch generated by git diff directly.

> Refactor brokers to allow listening on multiple ports and IPs
> --------------------------------------------------------------
>
> Key: KAFKA-1809
> URL: https://issues.apache.org/jira/browse/KAFKA-1809
> Project: Kafka
> Issue Type: Sub-task
> Components: security
> Reporter: Gwen Shapira
> Assignee: Gwen Shapira
> Attachments: KAFKA-1809.patch, KAFKA-1809.v1.patch, KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch
>
> The goal is to eventually support different security mechanisms on different ports. Currently brokers are defined as a host+port pair, and this definition exists throughout the code base, so some refactoring is needed to support multiple ports for a single broker.
> The detailed design is here: https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers
[jira] [Closed] (KAFKA-1903) Zk Expiration causes controller deadlock
[ https://issues.apache.org/jira/browse/KAFKA-1903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede closed KAFKA-1903.
--------------------------------

> Zk Expiration causes controller deadlock
> ----------------------------------------
>
> Key: KAFKA-1903
> URL: https://issues.apache.org/jira/browse/KAFKA-1903
> Project: Kafka
> Issue Type: Bug
> Components: controller
> Affects Versions: 0.8.1, 0.8.1.1
> Environment: java version 1.7.0_55, Java(TM) SE Runtime Environment (build 1.7.0_55-b13), Java HotSpot(TM) 64-Bit Server VM (build 24.55-b03, mixed mode), kafka_2.9.2-0.8.1
> Reporter: yufeng.chen
> Assignee: Neha Narkhede
> Priority: Critical
>
> When the controller broker hits a ZK session expiration, that broker can permanently disappear from the /brokers/ids path. Suppose there are three brokers, 1, 2 and 3, broker 1 is the controller, and broker 1 has started the delete-topics thread. Broker 1 then drops out and never comes back. The reason: on expiration, the zk-event-thread calls KafkaController.SessionExpirationListener.handleNewSession. Holding controllerContext.controllerLock, it runs onControllerResignation -> deleteTopicManager.shutdown() -> deleteTopicsThread.shutdown(). Meanwhile the delete-topics thread is parked in awaitTopicDeletionNotification(). The zk-event-thread blocks in deleteTopicsThread.shutdown() waiting for the run() method to complete, but because it still holds the lock, the interrupt cannot actually wake deleteTopicsCond.await(), which must re-acquire that lock before it can return. The zk-event-thread therefore hangs forever and never executes KafkaHealthcheck.SessionExpireListener.handleNewSession, so the controller never re-registers.
>
> The jstack log:
>
> "delete-topics-thread" prio=10 tid=0x7fb0bc21b000 nid=0x2825 waiting on condition [0x7fb0f534a000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for 0xe4952da0 (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2047)
>     at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$awaitTopicDeletionNotification(TopicDeletionManager.scala:178)
>     at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:334)
>     at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
>     at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
>     at kafka.utils.Utils$.inLock(Utils.scala:538)
>     at kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:333)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> "ZkClient-EventThread-12-10.3.63.8:2181,10.3.63.9:2181" daemon prio=10 tid=0x7fb10038e800 nid=0x7d93 waiting on condition [0x7fb0f544a000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for 0xe4f4a760 (a java.util.concurrent.CountDownLatch$Sync)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
>     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
>     at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>     at kafka.controller.TopicDeletionManager.shutdown(TopicDeletionManager.scala:93)
>     at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:340)
>     at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
>     at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
>     at kafka.utils.Utils$.inLock(Utils.scala:538)
>     at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:337)
>     at
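The deadlock described above is a general lock-ordering hazard: Condition.await() must re-acquire its lock before it can even throw InterruptedException, so a shutdown caller that joins the worker while holding that lock waits forever. A minimal standalone sketch (hypothetical names, not the Kafka classes) of the safe ordering, where the shutdown caller interrupts and joins the worker without holding the lock:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ShutdownOrdering {
    static final ReentrantLock lock = new ReentrantLock();
    static final Condition work = lock.newCondition();
    static final CountDownLatch done = new CountDownLatch(1);  // like ShutdownableThread's latch

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            lock.lock();
            try {
                work.await();  // like awaitTopicDeletionNotification()
            } catch (InterruptedException e) {
                // await() re-acquires `lock` before throwing -- this is why a
                // shutdown caller that still holds the lock deadlocks
            } finally {
                lock.unlock();
                done.countDown();
            }
        });
        worker.start();
        Thread.sleep(100);  // let the worker park in await()

        // Safe ordering: interrupt and wait for the worker WITHOUT holding `lock`,
        // so the interrupted await() can re-acquire it and return.
        worker.interrupt();
        done.await();
        System.out.println("shutdown completed");
    }
}
```

Holding `lock` around `done.await()` would reproduce the hang in the jstack output above.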
[jira] [Commented] (KAFKA-1908) Split brain
[ https://issues.apache.org/jira/browse/KAFKA-1908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310901#comment-14310901 ]

Neha Narkhede commented on KAFKA-1908:
--------------------------------------

Thanks for sharing this test case!

bq. A consumer can read data from replica-1 or replica-2. When it reads from replica-1 it resets the offsets and then can read duplicates from replica-2.

When the consumer wants to consume, it first issues a metadata request asking one of the brokers who the leader for partition 0 is. In your test, only brokers 2 and 3 can serve that metadata request, and they will end up telling the consumer to consume from broker 2 since it is the new leader. I'm not sure I understood how the consumer ends up consuming from broker 1 when its port is disabled?

> Split brain
> -----------
>
> Key: KAFKA-1908
> URL: https://issues.apache.org/jira/browse/KAFKA-1908
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8.2
> Reporter: Alexey Ozeritskiy
>
> In some cases, there may be two leaders for one partition.
> Steps to reproduce:
> # We have 3 brokers, 1 partition with 3 replicas:
> {code}
> TopicAndPartition: [partition,0]  Leader: 1  Replicas: [2,1,3]  ISR: [1,2,3]
> {code}
> # controller works on broker 3
> # let the kafka port be 9092. We execute on broker 1:
> {code}
> iptables -A INPUT -p tcp --dport 9092 -j REJECT
> {code}
> # Initiate replica election
> # As a result:
> Broker 1:
> {code}
> TopicAndPartition: [partition,0]  Leader: 1  Replicas: [2,1,3]  ISR: [1,2,3]
> {code}
> Broker 2:
> {code}
> TopicAndPartition: [partition,0]  Leader: 2  Replicas: [2,1,3]  ISR: [1,2,3]
> {code}
> # Flush the iptables rules on broker 1
> Now we can produce messages to [partition,0]. Replica-1 will not receive new data. A consumer can read data from replica-1 or replica-2. When it reads from replica-1 it resets the offsets and then can read duplicates from replica-2.
> We saw this situation in our production cluster when it had network problems.
[jira] [Commented] (KAFKA-1792) change behavior of --generate to produce assignment config with fair replica distribution and minimal number of reassignments
[ https://issues.apache.org/jira/browse/KAFKA-1792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310907#comment-14310907 ]

Neha Narkhede commented on KAFKA-1792:
--------------------------------------

[~Dmitry Pekar] The last discussion on the KIP suggested several improvements to the KIP itself, so I think we should continue the discussion on the mailing list thread until we agree on what is in scope and what is not. As I mentioned earlier, it is better to know how the overall tool will work eventually even if we split the work into multiple patches. Would you like to pick up the suggestions made and reply on the thread?

> change behavior of --generate to produce assignment config with fair replica distribution and minimal number of reassignments
> -----------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-1792
> URL: https://issues.apache.org/jira/browse/KAFKA-1792
> Project: Kafka
> Issue Type: Sub-task
> Components: tools
> Reporter: Dmitry Pekar
> Assignee: Dmitry Pekar
> Fix For: 0.8.3
> Attachments: KAFKA-1792.patch, KAFKA-1792_2014-12-03_19:24:56.patch, KAFKA-1792_2014-12-08_13:42:43.patch, KAFKA-1792_2014-12-19_16:48:12.patch, KAFKA-1792_2015-01-14_12:54:52.patch, KAFKA-1792_2015-01-27_19:09:27.patch, generate_alg_tests.txt, rebalance_use_cases.txt
>
> The current implementation produces a fair replica distribution between the specified list of brokers. Unfortunately, it doesn't take the current replica assignment into account. So if we have, for instance, 3 brokers id=[0..2] and are going to add a fourth broker id=3, --generate will create an assignment config which redistributes replicas fairly across brokers [0..3] as if the partitions were created from scratch. It will not take the current replica assignment into consideration and accordingly will not try to minimize the number of replica moves between brokers. As proposed by [~charmalloc] this should be improved.
> The new output of the improved --generate algorithm should satisfy the following requirements:
> - fairness of replica distribution - every broker will have R or R+1 replicas assigned;
> - minimum of reassignments - the number of replica moves between brokers will be minimal.
> Example. Consider the following replica distribution per broker [0..3] (we just added brokers 2 and 3):
> - broker   - 0,  1,  2,  3
> - replicas - 7,  6,  0,  0
> The new algorithm will produce the following assignment:
> - broker   - 0,  1,  2,  3
> - replicas - 4,  3,  3,  3
> - moves    - -3, -3, +3, +3
> It is fair, and the number of moves is 6, which is minimal for the specified initial distribution.
> The scope of this issue is:
> - design an algorithm matching the above requirements;
> - implement this algorithm and unit tests;
> - test it manually using different initial assignments.
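The fairness arithmetic above can be sketched as follows. This toy computes per-broker replica counts only, not actual partition placement, and is not the proposed patch: it hands the R+1 targets to the brokers that already hold the most replicas, so the move count (replicas leaving over-target brokers) is minimal.

```java
import java.util.Arrays;

public class FairAssignment {
    // Fair target: every broker ends up with R or R+1 replicas. Giving the
    // "+1" slots to the most-loaded brokers minimizes the number of moves.
    public static int[] fairTargets(int[] current) {
        int total = Arrays.stream(current).sum();
        int n = current.length;
        int base = total / n, extra = total % n;    // R and how many get R+1
        Integer[] order = new Integer[n];
        for (int i = 0; i < n; i++) order[i] = i;
        Arrays.sort(order, (a, b) -> current[b] - current[a]); // most-loaded first
        int[] target = new int[n];
        for (int i = 0; i < n; i++) target[order[i]] = base + (i < extra ? 1 : 0);
        return target;
    }

    // Moves = total replicas that must leave over-target brokers.
    public static int moves(int[] current, int[] target) {
        int m = 0;
        for (int i = 0; i < current.length; i++) m += Math.max(0, current[i] - target[i]);
        return m;
    }

    public static void main(String[] args) {
        int[] current = {7, 6, 0, 0};  // the example: brokers 2 and 3 just added
        int[] target = fairTargets(current);
        System.out.println(Arrays.toString(target) + " moves=" + moves(current, target));
    }
}
```

For the distribution {7, 6, 0, 0} this reproduces the [4, 3, 3, 3] target and 6 moves from the example.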
[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310912#comment-14310912 ]

Jay Kreps commented on KAFKA-1646:
----------------------------------

Hey guys, if this forces full recovery, the impact on startup time will be considerable if you have a large number of partitions. Say you have 2000 partitions per machine and a 1GB log segment file size. On average these files will have about 500MB per partition when a restart occurs. The result is running recovery on 2000 * 500MB = 1TB of data. This will take about 5.5 hours at 50MB/sec.

[~qixia] Not sure how the above reasoning compares to your test? I think this would be a blocker issue, no?

> Improve consumer read performance for Windows
> ---------------------------------------------
>
> Key: KAFKA-1646
> URL: https://issues.apache.org/jira/browse/KAFKA-1646
> Project: Kafka
> Issue Type: Improvement
> Components: log
> Affects Versions: 0.8.1.1
> Environment: Windows
> Reporter: xueqiang wang
> Assignee: xueqiang wang
> Labels: newbie, patch
> Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646_20141216_163008.patch
>
> This patch is for the Windows platform only. On Windows, if more than one replica is writing to disk, the segment log files will not be laid out contiguously on disk and consumer read performance drops greatly. This fix allocates more disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. The patch doesn't affect file allocation on other filesystems, since it only adds statements guarded by 'if(Os.isWindows)' or methods used only on Windows.
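For reference, Jay's back-of-envelope estimate works out as follows (same assumed numbers as the comment: 2000 partitions, ~500MB of unflushed data each, ~50MB/sec recovery scan rate):

```java
import java.util.Locale;

public class RecoveryEstimate {
    public static void main(String[] args) {
        long partitions = 2000;
        long avgBytesPerPartition = 500L * 1024 * 1024;  // ~500MB per partition at restart
        long scanRateBytesPerSec = 50L * 1024 * 1024;    // ~50MB/sec sequential recovery scan
        // total bytes / scan rate = seconds; divide by 3600 for hours
        double hours = (double) (partitions * avgBytesPerPartition) / scanRateBytesPerSec / 3600;
        System.out.printf(Locale.ROOT, "recovery ~%.1f hours%n", hours);
    }
}
```

2000 * 500MB / 50MB/s = 20,000 seconds, i.e. roughly the 5.5 hours quoted above.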
[jira] [Commented] (KAFKA-1831) Producer does not provide any information about which host the data was sent to
[ https://issues.apache.org/jira/browse/KAFKA-1831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310915#comment-14310915 ]

Neha Narkhede commented on KAFKA-1831:
--------------------------------------

[~markap14] The producer has a client.id, and the server's request log records which produce request it received from which client.id. This should suffice to get you the information you need. Or am I missing something?

> Producer does not provide any information about which host the data was sent to
> --------------------------------------------------------------------------------
>
> Key: KAFKA-1831
> URL: https://issues.apache.org/jira/browse/KAFKA-1831
> Project: Kafka
> Issue Type: Improvement
> Components: producer
> Affects Versions: 0.8.1.1
> Reporter: Mark Payne
> Assignee: Jun Rao
>
> For traceability and troubleshooting purposes, when sending data to Kafka the Producer should provide information about which host the data was sent to. This already works well in the SimpleConsumer, which provides host() and port() methods.
[jira] [Commented] (KAFKA-313) Add JSON/CSV output and looping options to ConsumerOffsetChecker
[ https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310918#comment-14310918 ]

Neha Narkhede commented on KAFKA-313:
-------------------------------------

[~singhashish] We are moving to a centralized ConsumerCommand in KAFKA-1476 that will replace all other consumer admin tools. Can you review it to see if the tool has the options you are looking for? Going forward, I think we may want to improve that tool and phase out the current ones.

> Add JSON/CSV output and looping options to ConsumerOffsetChecker
> -----------------------------------------------------------------
>
> Key: KAFKA-313
> URL: https://issues.apache.org/jira/browse/KAFKA-313
> Project: Kafka
> Issue Type: Improvement
> Reporter: Dave DeMaagd
> Assignee: Ashish Kumar Singh
> Priority: Minor
> Labels: newbie, patch
> Fix For: 0.8.3
> Attachments: KAFKA-313-2012032200.diff, KAFKA-313.1.patch, KAFKA-313.patch
>
> Adds:
> * '--loop N' - causes the program to loop forever, sleeping for up to N seconds between loops (loop time minus collection time, unless that's less than 0, at which point it will just run again immediately)
> * '--asjson' - display as a JSON string instead of the more human-readable output format.
> Neither of the above depends on the other (you can loop with the human-readable output, or do a single-shot execution with JSON output). Existing behavior/output is maintained if neither of the above is used. Diff attached.
> Impacted files:
> core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
[jira] [Updated] (KAFKA-1805) Kafka ProducerRecord should implement equals
[ https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-1805:
---------------------------------
    Reviewer: Guozhang Wang

[~guozhang] Since you started looking at the patch, would you like to shepherd this patch to check-in?

> Kafka ProducerRecord should implement equals
> ---------------------------------------------
>
> Key: KAFKA-1805
> URL: https://issues.apache.org/jira/browse/KAFKA-1805
> Project: Kafka
> Issue Type: Improvement
> Components: producer
> Affects Versions: 0.8.2
> Reporter: Thomas Omans
> Assignee: Thomas Omans
> Priority: Minor
> Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch
>
> I was writing some tests to verify that I am calculating my partitions, topics, keys, and values properly in my producer code, and discovered that ProducerRecord does not implement equality. This makes tests that integrate kafka particularly awkward.
> https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
> I can whip up a patch since this is essentially just a value object.
> Thanks, Thomas Omans
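A minimal sketch of the value-object semantics being requested, using a stand-in class rather than the real ProducerRecord (the fields mirror its topic/partition/key/value shape, but this is not the Kafka class):

```java
import java.util.Objects;

public class Record<K, V> {
    private final String topic;
    private final Integer partition;
    private final K key;
    private final V value;

    public Record(String topic, Integer partition, K key, V value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }

    // Structural equality: two records with the same topic/partition/key/value
    // compare equal, which is what makes assertions in producer tests possible.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Record)) return false;
        Record<?, ?> r = (Record<?, ?>) o;
        return Objects.equals(topic, r.topic)
            && Objects.equals(partition, r.partition)
            && Objects.equals(key, r.key)
            && Objects.equals(value, r.value);
    }

    // equals/hashCode contract: equal records must hash identically.
    @Override
    public int hashCode() {
        return Objects.hash(topic, partition, key, value);
    }

    public static void main(String[] args) {
        Record<String, String> a = new Record<>("t", 0, "k", "v");
        Record<String, String> b = new Record<>("t", 0, "k", "v");
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode());
    }
}
```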
[jira] [Updated] (KAFKA-1865) Investigate adding a flush() call to new producer API
[ https://issues.apache.org/jira/browse/KAFKA-1865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-1865:
-----------------------------
    Assignee: Jay Kreps
      Status: Patch Available  (was: Open)

> Investigate adding a flush() call to new producer API
> ------------------------------------------------------
>
> Key: KAFKA-1865
> URL: https://issues.apache.org/jira/browse/KAFKA-1865
> Project: Kafka
> Issue Type: Bug
> Reporter: Jay Kreps
> Assignee: Jay Kreps
> Attachments: KAFKA-1865.patch
>
> The postcondition of this would be that any record enqueued prior to flush() would have completed being sent (either successfully or not). An open question is whether you can continue sending new records while this call is executing (on other threads). We should only do this if it doesn't add inefficiencies for people who don't use it.
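A toy model of the proposed postcondition, using a plain executor instead of the real producer (hypothetical class, not the Kafka API): flush() waits on every future enqueued before the call, while other threads remain free to keep submitting afterwards.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FlushSketch {
    private final ExecutorService sender = Executors.newSingleThreadExecutor();
    private final Set<Future<?>> pending = ConcurrentHashMap.newKeySet();

    // "send": enqueue the record and remember its future
    public Future<?> send(Runnable record) {
        Future<?> f = sender.submit(record);
        pending.add(f);
        return f;
    }

    // "flush": block until everything enqueued before this call has completed
    // (successfully or not); records sent concurrently afterwards are not waited on
    public void flush() throws Exception {
        for (Future<?> f : pending.toArray(new Future<?>[0])) {
            f.get();
            pending.remove(f);
        }
    }

    public static void main(String[] args) throws Exception {
        FlushSketch p = new FlushSketch();
        StringBuilder log = new StringBuilder();
        for (int i = 0; i < 3; i++) {
            int n = i;
            p.send(() -> { synchronized (log) { log.append(n); } });
        }
        p.flush();  // postcondition: all three records above are done
        System.out.println("flushed: " + log);
        p.sender.shutdown();
    }
}
```

The actual patch works against the producer's record accumulator rather than futures, so treat this purely as an illustration of the semantics.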
[jira] [Commented] (KAFKA-1865) Investigate adding a flush() call to new producer API
[ https://issues.apache.org/jira/browse/KAFKA-1865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310927#comment-14310927 ]

Jay Kreps commented on KAFKA-1865:
----------------------------------

Created reviewboard https://reviews.apache.org/r/30763/diff/ against branch trunk

> Investigate adding a flush() call to new producer API
> ------------------------------------------------------
>
> Key: KAFKA-1865
> URL: https://issues.apache.org/jira/browse/KAFKA-1865
> Project: Kafka
> Issue Type: Bug
> Reporter: Jay Kreps
> Attachments: KAFKA-1865.patch
>
> The postcondition of this would be that any record enqueued prior to flush() would have completed being sent (either successfully or not). An open question is whether you can continue sending new records while this call is executing (on other threads). We should only do this if it doesn't add inefficiencies for people who don't use it.
[jira] [Updated] (KAFKA-1865) Add a flush() call to the new producer API
[ https://issues.apache.org/jira/browse/KAFKA-1865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-1865:
-----------------------------
    Summary: Add a flush() call to the new producer API  (was: Investigate adding a flush() call to new producer API)

> Add a flush() call to the new producer API
> -------------------------------------------
>
> Key: KAFKA-1865
> URL: https://issues.apache.org/jira/browse/KAFKA-1865
> Project: Kafka
> Issue Type: Bug
> Reporter: Jay Kreps
> Assignee: Jay Kreps
> Attachments: KAFKA-1865.patch
>
> The postcondition of this would be that any record enqueued prior to flush() would have completed being sent (either successfully or not). An open question is whether you can continue sending new records while this call is executing (on other threads). We should only do this if it doesn't add inefficiencies for people who don't use it.
[jira] [Updated] (KAFKA-1865) Investigate adding a flush() call to new producer API
[ https://issues.apache.org/jira/browse/KAFKA-1865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-1865:
-----------------------------
    Attachment: KAFKA-1865.patch

> Investigate adding a flush() call to new producer API
> ------------------------------------------------------
>
> Key: KAFKA-1865
> URL: https://issues.apache.org/jira/browse/KAFKA-1865
> Project: Kafka
> Issue Type: Bug
> Reporter: Jay Kreps
> Attachments: KAFKA-1865.patch
>
> The postcondition of this would be that any record enqueued prior to flush() would have completed being sent (either successfully or not). An open question is whether you can continue sending new records while this call is executing (on other threads). We should only do this if it doesn't add inefficiencies for people who don't use it.
[jira] [Commented] (KAFKA-1447) Controlled shutdown deadlock when trying to send state updates
[ https://issues.apache.org/jira/browse/KAFKA-1447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310948#comment-14310948 ]

Jay Kreps commented on KAFKA-1447:
----------------------------------

Does this problem still exist?

> Controlled shutdown deadlock when trying to send state updates
> ---------------------------------------------------------------
>
> Key: KAFKA-1447
> URL: https://issues.apache.org/jira/browse/KAFKA-1447
> Project: Kafka
> Issue Type: Bug
> Components: controller
> Affects Versions: 0.8.0
> Reporter: Sam Meder
> Priority: Critical
> Labels: newbie++
>
> We're seeing controlled shutdown indefinitely stuck trying to send out state change messages to the other brokers:
>
> [2014-05-03 04:01:30,580] INFO [Socket Server on Broker 4], Shutdown completed (kafka.network.SocketServer)
> [2014-05-03 04:01:30,581] INFO [Kafka Request Handler on Broker 4], shutting down (kafka.server.KafkaRequestHandlerPool)
>
> and stuck on:
>
> "kafka-request-handler-12" daemon prio=10 tid=0x7f1f04a66800 nid=0x6e79 waiting on condition [0x7f1ad5767000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for 0x00078e91dc20 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>     at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
>     at kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
>     - locked 0x00078e91dc38 (a java.lang.Object)
>     at kafka.controller.KafkaController.sendRequest(KafkaController.scala:655)
>     at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:298)
>     at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>     at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
>     at kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:290)
>     at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:97)
>     at kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:269)
>     at kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:253)
>     at scala.Option.foreach(Option.scala:197)
>     at kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply$mcV$sp(KafkaController.scala:253)
>     at kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
>     at kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
>     at kafka.utils.Utils$.inLock(Utils.scala:538)
>     at kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:252)
>     at kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:249)
>     at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:130)
>     at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
>     at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
>     at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
>     at kafka.controller.KafkaController.shutdownBroker(KafkaController.scala:249)
>     - locked 0x00078b495af0 (a java.lang.Object)
>     at kafka.server.KafkaApis.handleControlledShutdownRequest(KafkaApis.scala:264)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:192)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
>     at java.lang.Thread.run(Thread.java:722)
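The stuck frame in the trace above is LinkedBlockingQueue.put() on the controller-to-broker send queue, which blocks forever once the bounded queue fills and nothing drains it. A standalone sketch of the hazard and of a timed alternative (illustrative only; what to do on timeout, such as drop, retry, or fail the shutdown, is a separate design question):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedSend {
    public static void main(String[] args) throws InterruptedException {
        // Model of the per-broker request queue: bounded, with no consumer
        // draining it (the receiving broker is gone).
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.put("stateChange-1");  // fills the queue; a second put() would block forever

        // A timed offer() surfaces the stall instead of deadlocking the
        // request-handler thread the way put() does in the trace above.
        boolean accepted = queue.offer("stateChange-2", 100, TimeUnit.MILLISECONDS);
        System.out.println("accepted=" + accepted);
    }
}
```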
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310958#comment-14310958 ]

Jay Kreps commented on KAFKA-1718:
----------------------------------

[~junrao], [~guozhang] is this still ongoing?

> Message Size Too Large error when only small messages produced with Snappy
> ---------------------------------------------------------------------------
>
> Key: KAFKA-1718
> URL: https://issues.apache.org/jira/browse/KAFKA-1718
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8.1.1
> Reporter: Evan Huus
> Priority: Critical
>
> I'm the primary author of the Go bindings, and while I originally received this as a bug against my bindings, I'm coming to the conclusion that it's a bug in the broker somehow.
> Specifically, take a look at the last two kafka packets in the following packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you will need a trunk build of Wireshark to fully decode the kafka part of the packets).
> The produce request contains two partitions on one topic. Each partition has one message set (sizes 977205 bytes and 967362 bytes respectively). Each message set is a sequential collection of snappy-compressed messages, each message of size 46899. When uncompressed, each message contains a message set of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.
> However, the broker responds to this with a MessageSizeTooLarge error, the full stacktrace from the broker logs being:
>
> kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes which exceeds the maximum configured message size of 112.
>     at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
>     at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>     at kafka.log.Log.append(Log.scala:265)
>     at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
>     at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
>     at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>     at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
>     at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
>     at java.lang.Thread.run(Thread.java:695)
>
> Since as far as I can tell none of the sizes in the actual produced packet exceed the defined maximum, I can only assume that the broker is miscalculating something somewhere and throwing the exception improperly.
> ---
> This issue can be reliably reproduced using an out-of-the-box binary download of 0.8.1.1 and the following gist: https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use the `producer-ng` branch of the Sarama library).
> ---
> I am happy to provide any more information you might need, or to do relevant experiments etc.
[jira] [Resolved] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1745. -- Resolution: Won't Fix Seems to be a problem in Java. Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared. - Key: KAFKA-1745 URL: https://issues.apache.org/jira/browse/KAFKA-1745 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Environment: Mac OS Mavericks Reporter: Vishal Priority: Critical Hi, I'm using the Java client API for Kafka. I wanted to send data to Kafka via a producer pool, as I'm using a sync producer. The thread that sends the data comes from a thread pool that grows and shrinks depending on usage. When I send data from one thread, 1 KQUEUE and 2 PIPEs are created (got this info by using lsof). If I keep using the same thread it's fine, but when a new thread sends data to Kafka (using producer.send()) a new KQUEUE and 2 new PIPEs are created. That alone is okay, but when a thread is cleared from the thread pool and a new one is created, fresh KQUEUEs and PIPEs are created while the old ones are never destroyed; they remain as open files. This is causing a major problem: the number of open files keeps increasing and never decreases. Please suggest any solutions. FYI, the number of TCP connections established from the producer system to the Kafka broker remains constant throughout. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
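The usual mitigation for this pattern is to share one producer instance across the worker pool instead of creating one per thread, since each producer instance owns the NIO selector (and hence the kqueue and pipe descriptors that lsof shows on OS X). A minimal sketch of the shared-instance pattern, with a hypothetical CountingProducer standing in for the real sync producer:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a Kafka producer; the real client allocates
// an NIO selector (a kqueue plus pipes on OS X) per instance.
class CountingProducer {
    static final AtomicInteger instances = new AtomicInteger();
    CountingProducer() { instances.incrementAndGet(); }
    void send(String msg) { /* would hand msg to the broker */ }
}

public class SharedProducerDemo {
    // One producer shared by every pool thread, so selector resources
    // stay bounded no matter how much the thread pool churns.
    static final CountingProducer SHARED = new CountingProducer();

    static int runJobs(int jobs) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < jobs; i++) {
            final int n = i;
            pool.submit(() -> SHARED.send("msg-" + n));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return CountingProducer.instances.get(); // stays at 1
    }

    public static void main(String[] args) {
        System.out.println("producers created: " + runJobs(100));
    }
}
```

If per-thread producers are unavoidable, the other half of the fix is calling the producer's close() from the pool's thread-teardown hook so the selector's descriptors are released with the thread.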
[jira] [Resolved] (KAFKA-1657) Fetch request using Simple consumer fails due to Leader not local for partition
[ https://issues.apache.org/jira/browse/KAFKA-1657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1657. -- Resolution: Won't Fix I think the ultimate solution here is to move to the new consumer, which should fix all this. Fetch request using Simple consumer fails due to Leader not local for partition - Key: KAFKA-1657 URL: https://issues.apache.org/jira/browse/KAFKA-1657 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: aarti gupta I have a three node Kafka cluster running on the same physical machine (on different ports), with replication factor = 3 and a single topic with 3 partitions. Multiple producers write to the topic, and a custom partitioner is used to direct messages to a given partition. I use the simple consumer to read from a given partition of the topic, and have three instances of my consumer running. The code snippet for the simple consumer suggests that having any node in the cluster (not necessarily the leader for that partition) is sufficient to find the leader for the partition. However, on running this I find that, given a different node in the cluster, a null pointer exception is thrown and the logs show the error:
[2014-09-28 20:40:20,984] WARN [KafkaApi-1] Fetch request with correlation id 0 from client testClient on partition [VCCTask,1] failed due to Leader not local for partition [VCCTask,1] on broker 1 (kafka.server.KafkaApis)
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic VCCTask
Topic:VCCTask PartitionCount:3 ReplicationFactor:3 Configs:
Topic: VCCTask Partition: 0 Leader: 1 Replicas: 2,3,1 Isr: 1,2,3
Topic: VCCTask Partition: 1 Leader: 1 Replicas: 3,1,2 Isr: 1,2,3
Topic: VCCTask Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
If I specify the leader for the partition, instead of any node in the cluster, everything works great, but this is an operational nightmare.
I was able to reproduce this using a simple test, where a producer writes numbers from 1 to 99 and the consumers consume from a specific partition. Here are the code snippets (string quotes and generics restored, as the original formatting was lost; the snippet is truncated as posted):
{code}
public class TestConsumerStoreOffsetZookeeper {

    public static void main(String[] args) throws JSONException {
        TestConsumerStoreOffsetZookeeper testConsumer = new TestConsumerStoreOffsetZookeeper();
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("topicName", "VCCTask");
        jsonObject.put("clientName", "testClient");
        jsonObject.put("partition", args[0]);
        jsonObject.put("hostPort", "172.16.78.171");
        jsonObject.put("znodeName", "VCCTask");
        jsonObject.put("port", args[1]);
        testConsumer.initialize(jsonObject);
        final long startTime = System.currentTimeMillis();
        testConsumer.startReceiving(new FutureCallback<byte[]>() {
            int noOfMessagesConsumed = 0;

            @Override
            public void onSuccess(byte[] result) {
                LOG.info("YES!! " + ByteBuffer.wrap(result).getLong());
                ++noOfMessagesConsumed;
                LOG.info("# Messages consumed " + noOfMessagesConsumed
                        + " Time elapsed " + (System.currentTimeMillis() - startTime) / 1000 + " seconds");
            }

            @Override
            public void onFailure(Throwable t) {
                LOG.info("NO!! " + t.fillInStackTrace().getMessage());
            }
        });
    }

    private String topicToRead;
    private static Logger LOG = Logger.getLogger("TestConsumerStoreOffsetZookeeper");
    List<String> seedBrokers = Lists.newArrayList("localhost");
    private int port;
    private SimpleConsumer consumer;
    Integer partition;
    String clientName;
    private Broker currentLeader;
    private String counter;
    CuratorFramework zooKeeper;

    public void startReceiving(final FutureCallback<byte[]> futureCallback) {
        findLeaderAndUpdateSelfPointers(seedBrokers, port, topicToRead, partition);
        LOG.info("Kafka consumer delegate listening on topic " + topicToRead + " and partition " + partition);
        int numErrors = 0;
        while (true) {
            long latestOffset = 0;
            Stat stat = null;
            final String path = "/" + topicToRead + "/" + partition;
            try {
                // Read top of the
                stat = zooKeeper.checkExists().forPath(path);
                if (stat == null) {
                    latestOffset = getLastOffsetFromBeginningOfStream(this.consumer, topicToRead,
                            partition, OffsetRequest.EarliestTime(), clientName);
                    byte b[] = new byte[8];
                    ByteBuffer byteBuffer = ByteBuffer.wrap(b);
{code}
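For reference, the lookup the SimpleConsumer examples intend is a two-step protocol: ask any live broker for topic metadata (which every broker can serve), then direct the fetch at the returned leader; fetching from a non-leader is exactly what produces the "Leader not local for partition" warning above. A toy model of that two-step lookup follows (hypothetical types and a hard-coded metadata map, not the real kafka.javaapi classes):

```java
import java.util.List;
import java.util.Map;

public class LeaderLookupDemo {
    // Toy cluster metadata matching the ticket's --describe output:
    // broker 1 leads all three partitions. In a real cluster every broker
    // can serve this map, which is why any seed broker works for step 1.
    static final Map<Integer, String> LEADERS = Map.of(
            0, "broker-1",
            1, "broker-1",
            2, "broker-1");

    // Step 1 (hypothetical): in the real API this would be a metadata
    // request sent to any seed broker, not a map lookup.
    static String fetchMetadataFrom(String anyBroker, int partition) {
        return LEADERS.get(partition);
    }

    // Step 2: the fetch itself must go to the leader returned by step 1;
    // a fetch sent anywhere else fails with "Leader not local for partition".
    static String brokerToFetchFrom(List<String> seedBrokers, int partition) {
        for (String seed : seedBrokers) {
            String leader = fetchMetadataFrom(seed, partition);
            if (leader != null) {
                return leader;
            }
        }
        throw new IllegalStateException("no leader found for partition " + partition);
    }

    public static void main(String[] args) {
        // Asking a non-leader seed still resolves to the leader.
        System.out.println(brokerToFetchFrom(List.of("broker-3"), 1)); // broker-1
    }
}
```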
[jira] [Commented] (KAFKA-1758) corrupt recovery file prevents startup
[ https://issues.apache.org/jira/browse/KAFKA-1758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14310991#comment-14310991 ] Jay Kreps commented on KAFKA-1758: -- This is actually not a very difficult change--in LogManager.loadLogs we would need to basically handle an error in reading the recovery checkpoint, log it, and then just start a full recovery. corrupt recovery file prevents startup -- Key: KAFKA-1758 URL: https://issues.apache.org/jira/browse/KAFKA-1758 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Hi, We recently had a kafka node go down suddenly. When it came back up, it apparently had a corrupt recovery file and refused to start up:
{code}
2014-11-06 08:17:19,299 WARN [main] server.KafkaServer - Error starting up KafkaServer
java.lang.NumberFormatException: For input string: ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@ ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:481)
at java.lang.Integer.parseInt(Integer.java:527)
at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76)
at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106)
at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at kafka.log.LogManager.loadLogs(LogManager.scala:105)
at kafka.log.LogManager.<init>(LogManager.scala:57)
at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
{code}
And the app is under a monitor (so it was repeatedly restarting and failing with this error for several minutes before we got to it). We moved the 'recovery-point-offset-checkpoint' file out of the way, and it then restarted cleanly (but of course re-synced all its data from replicas, so we had no data loss). Anyway, I'm wondering if that's the expected behavior? Or should it not declare the file corrupt and instead proceed automatically to an unclean restart? Should this NumberFormatException be handled a bit more gracefully? We saved the corrupt file if it's worth inspecting (although I doubt it will be useful!). The corrupt files appeared to be all zeroes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
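Jay's suggested handling can be sketched as follows. This is an illustration under an assumed checkpoint layout (a version line, an entry-count line, then "topic partition offset" lines, as the stack trace implies), not a patch to Kafka's actual OffsetCheckpoint code; the point is only that a parse failure should degrade to "recovery point 0 everywhere" instead of aborting startup:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TolerantCheckpoint {
    // Parses checkpoint lines into a (topic-partition -> offset) map.
    // Any parse error means the file is corrupt, so we fall back to a
    // full recovery instead of failing startup.
    static Map<String, Long> parse(List<String> lines) {
        try {
            // Version line: a file of NUL bytes (as in this report) fails here
            // with the same NumberFormatException seen in the stack trace.
            Integer.parseInt(lines.get(0).trim());
            Map<String, Long> points = new HashMap<>();
            for (String line : lines.subList(2, lines.size())) { // skip version + count lines
                String[] parts = line.trim().split("\\s+");
                points.put(parts[0] + "-" + parts[1], Long.parseLong(parts[2]));
            }
            return points;
        } catch (RuntimeException e) {
            // Jay's suggestion: log the corruption, then behave as though the
            // recovery point were 0 for every log (i.e. run a full recovery).
            System.err.println("corrupt recovery checkpoint, forcing full recovery: " + e);
            return Collections.emptyMap();
        }
    }

    public static void main(String[] args) {
        // An all-zero-bytes file reads as one line of NUL characters.
        System.out.println(parse(List.of("\0\0\0\0\0\0\0\0")).isEmpty()); // true
    }
}
```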
[jira] [Resolved] (KAFKA-402) Adding handling various kind of exception support at server side
[ https://issues.apache.org/jira/browse/KAFKA-402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-402. - Resolution: Won't Fix Adding handling various kind of exception support at server side Key: KAFKA-402 URL: https://issues.apache.org/jira/browse/KAFKA-402 Project: Kafka Issue Type: Task Affects Versions: 0.8.0 Reporter: Yang Ye Assignee: Yang Ye Currently, some exceptions in the server-side handlers are not caught and handled, and an uncaught exception can bring a server down. We need to catch and handle them at the server. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-410) After being killed, one broker fails to rejoin the cluster after restart
[ https://issues.apache.org/jira/browse/KAFKA-410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-410. - Resolution: Cannot Reproduce After being killed, one broker fails to rejoin the cluster after restart - Key: KAFKA-410 URL: https://issues.apache.org/jira/browse/KAFKA-410 Project: Kafka Issue Type: Bug Components: core Environment: the version of kafka we use is trunk 2a59ad76c6 Reporter: mmliu In my kafka cluster there are 2 brokers. One of them was killed by accident yesterday; after that, I tried to restart the broker, but it seems it failed to join the cluster. In zookeeper it successfully registers itself, but after sending logs, no data shows up in the kafka data directory. This is the log of the broker after restarting it (truncated as posted):
{code}
[2012-07-21 17:34:27,807] DEBUG preRegister called. Server=com.sun.jmx.mbeanserver.JmxMBeanServer@6443226, name=kafka:type=kafka.KafkaLog4j (root)
[2012-07-21 17:34:27,808] DEBUG Adding AppenderMBean for appender named fileAppender (org.apache.log4j.jmx.LoggerDynamicMBean)
[2012-07-21 17:34:27,810] DEBUG getMBeanInfo called. (org.apache.log4j.jmx.AppenderDynamicMBean)
[2012-07-21 17:34:27,810] DEBUG preRegister called. Server=com.sun.jmx.mbeanserver.JmxMBeanServer@6443226, name=log4j:appender=fileAppender (org.apache.log4j.jmx.AppenderDynamicMBean)
[2012-07-21 17:34:27,810] DEBUG Adding LayoutMBean:fileAppender,layout=org.apache.log4j.PatternLayout (org.apache.log4j.jmx.AppenderDynamicMBean)
[2012-07-21 17:34:27,811] DEBUG getMBeanInfo called. (org.apache.log4j.jmx.LayoutDynamicMBean)
[2012-07-21 17:34:27,811] DEBUG preRegister called. Server=com.sun.jmx.mbeanserver.JmxMBeanServer@6443226, name=log4j:appender=fileAppender,layout=org.apache.log4j.PatternLayout (org.apache.log4j.jmx.LayoutDynamicMBean)
[2012-07-21 17:34:27,967] INFO The number of partitions for topic no_appkey : 1 (kafka.utils.Utils$)
[2012-07-21 17:34:27,969] INFO The number of partitions for topic parse_exception : 1 (kafka.utils.Utils$)
[2012-07-21 17:34:27,972] INFO Starting Kafka server... (kafka.server.KafkaServer)
[2012-07-21 17:34:27,984] INFO starting log cleaner every 6 ms (kafka.log.LogManager)
[2012-07-21 17:34:27,990] INFO connecting to ZK: XX.XX.XX.229:2181/kafka (kafka.server.KafkaZooKeeper)
[2012-07-21 17:34:27,999] DEBUG Creating new ZookKeeper instance to connect to XX.XX.XX.229:2181/kafka. (org.I0Itec.zkclient.ZkConnection)
[2012-07-21 17:34:27,999] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2012-07-21 17:34:28,006] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:host.name=mobile-1 (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:java.version=1.6.0_17 (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:java.vendor=Sun Microsystems Inc. (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:java.home=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:java.class.path=:/home/Our_Server/kafka/libs/jopt-simple-3.2.jar:/home/Our_Server/kafka/libs/log4j-1.2.15.jar:/home/Our_Server/kafka/libs/scala-compiler.jar:/home/Our_Server/kafka/libs/scala-library.jar:/home/Our_Server/kafka/libs/snappy-java-1.0.4.1.jar:/home/Our_Server/kafka/libs/zkclient-0.1.jar:/home/Our_Server/kafka/libs/zookeeper-3.3.4.jar:/home/Our_Server/kafka/kafka-trunk-2a59ad76c6_scala-2.8.0.jar (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:java.library.path=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre/lib/amd64/server:/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre/lib/amd64:/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:java.compiler=NA (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:os.version=2.6.32-71.29.1.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:user.name=Our_Server (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:user.home=/home/Our_Server
{code}
[jira] [Resolved] (KAFKA-398) Enhance SocketServer to Enable Sending Requests
[ https://issues.apache.org/jira/browse/KAFKA-398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-398. - Resolution: Won't Fix Enhance SocketServer to Enable Sending Requests --- Key: KAFKA-398 URL: https://issues.apache.org/jira/browse/KAFKA-398 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.1 Reporter: Guozhang Wang Labels: features Attachments: kafka-398-0.7-v1.patch, kafka-398-0.7-v2.patch Currently the SocketServer is only used for reactively receiving requests and sending responses, not for proactively sending requests and receiving responses; hence it does not need to remember which channel/key corresponds to which consumer. On the other hand, there are cases, such as the consumer coordinator, that need the SocketServer to send requests to and receive responses from the consumers. It would be nice to add this functionality such that an API can be called with an id string and a request message, and the SocketServer will figure out which channel to use, write that message to the key's attachment, and set the flag as WRITABLE so that the processor can then send it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-462) ZK thread crashing doesn't bring down the broker (and doesn't come back up).
[ https://issues.apache.org/jira/browse/KAFKA-462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-462. - Resolution: Won't Fix ZK thread crashing doesn't bring down the broker (and doesn't come back up). Key: KAFKA-462 URL: https://issues.apache.org/jira/browse/KAFKA-462 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.7 Reporter: Matt Jones I think the simplest explanation is the traceback. The broker had been up since 2012-07-31 18:45:42,951 (based upon the 'Starting Kafka server' log entry), and the error was fixed with a restart of the broker at 2012-08-14 20:59:41,581. It looks like the zookeeper thread crashed, but the broker kept operating as usual. The expected behavior would be for the zookeeper thread crashing to bring the whole broker down, or for the zookeeper thread to restart itself.
{code}
[2012-08-08 01:25:13,398] 624270894 [main-SendThread(zookeeper001:2181)] INFO org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 8749ms for sessionid 0x138e4edc04c1e50, closing socket connection and attempting reconnect
[2012-08-08 01:25:15,136] 624272632 [main-EventThread] INFO org.I0Itec.zkclient.ZkClient - zookeeper state changed (Disconnected)
[2012-08-08 01:25:15,702] 624273198 [main-SendThread(zookeeper001:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server zookeeper003/10.125.95.193:2181
[2012-08-08 01:25:15,704] 624273200 [main-SendThread(zookeeper003:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to zookeeper003/10.125.95.193:2181, initiating session
[2012-08-08 01:25:15,709] 624273205 [main-EventThread] INFO org.I0Itec.zkclient.ZkClient - zookeeper state changed (Expired)
[2012-08-08 01:25:15,709] 624273205 [main-EventThread] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=zookeeper001:2181,zookeeper002:2181,zookeeper003:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@26d66426
[2012-08-08 01:25:21,514] 624279010 [main-SendThread(zookeeper003:2181)] INFO org.apache.zookeeper.ClientCnxn - Unable to reconnect to ZooKeeper service, session 0x138e4edc04c1e50 has expired, closing socket connection
[2012-08-08 01:25:47,135] 624304631 [main-EventThread] ERROR org.apache.zookeeper.ClientCnxn - Error while calling watcher
	at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:530)
	at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)
Caused by: org.I0Itec.zkclient.exception.ZkException: Unable to connect to zookeeper001:2181,zookeeper002:2181,zookeeper003:2181
Caused by: java.net.UnknownHostException: zookeeper001
	at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:386)
	at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:331)
	at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:377)
[2012-08-08 01:25:48,620] 624306116 [main-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-478) Move start_consumer start_producer inside start_entity_in_background
[ https://issues.apache.org/jira/browse/KAFKA-478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-478. - Resolution: Won't Fix Move start_consumer start_producer inside start_entity_in_background Key: KAFKA-478 URL: https://issues.apache.org/jira/browse/KAFKA-478 Project: Kafka Issue Type: Sub-task Affects Versions: 0.8.0 Reporter: John Fung Assignee: John Fung Labels: replication-testing Fix For: 0.9.0 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-558) KafkaETLContext should use getTopicMetadata before sending offset requests
[ https://issues.apache.org/jira/browse/KAFKA-558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-558. - Resolution: Won't Fix KafkaETLContext should use getTopicMetadata before sending offset requests -- Key: KAFKA-558 URL: https://issues.apache.org/jira/browse/KAFKA-558 Project: Kafka Issue Type: Bug Affects Versions: 0.8.0 Reporter: Joel Koshy Assignee: Joel Koshy Fix For: 0.9.0 Filing this or I may forget. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-1790) Remote controlled shutdown was removed
[ https://issues.apache.org/jira/browse/KAFKA-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1790. -- Resolution: Won't Fix Remote controlled shutdown was removed -- Key: KAFKA-1790 URL: https://issues.apache.org/jira/browse/KAFKA-1790 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: James Oliver Assignee: James Oliver Fix For: 0.8.3 In core: kafka.admin.ShutdownBroker was removed, rendering remote controlled shutdowns impossible. A Kafka administrator needs to be able to perform a controlled shutdown without issuing a SIGTERM/SIGKILL. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1898) compatibility testing framework
[ https://issues.apache.org/jira/browse/KAFKA-1898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311039#comment-14311039 ] Jay Kreps commented on KAFKA-1898: -- [~charmalloc] Can you expand on this ticket? There is no description. compatibility testing framework Key: KAFKA-1898 URL: https://issues.apache.org/jira/browse/KAFKA-1898 Project: Kafka Issue Type: Bug Reporter: Joe Stein Fix For: 0.8.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests
Jay Kreps created KAFKA-1927: Summary: Replace requests in kafka.api with requests in org.apache.kafka.common.requests Key: KAFKA-1927 URL: https://issues.apache.org/jira/browse/KAFKA-1927 Project: Kafka Issue Type: Improvement Reporter: Jay Kreps The common package introduced a better way of defining requests using a new protocol definition DSL and also includes wrapper objects for these. We should switch KafkaApis over to use these request definitions and consider the scala classes deprecated (we probably need to retain some of them for a while for the scala clients). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1898) compatibility testing framework
[ https://issues.apache.org/jira/browse/KAFKA-1898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311098#comment-14311098 ] Joe Stein commented on KAFKA-1898: -- Updated the description. The part that the client library would be doing is consuming the data it is given (so we need some data sets to use) and a set of test scenarios wherein they need to produce the right data. The data sets and the analysis output would be what the client library is trying to get to from a result perspective. Another example/test would be "supports compression": in this case the data set would be compressed; they would have to read it, make some change (like hash the message), and re-compress the hashed message into another topic. The validation could then run and see: yup, my input of List(x) (which was compressed) gave me a result of hash(List(x)) in the dest topic (and when analyzing you would uncompress to check the value of every message one by one). It would be hard to cheat since new data could be generated each time. We are also looking to use this for some end-to-end latency testing. The goal behind that is to calculate an array of timestamps (based on a key/value map) so you can figure out some more internals and things about the system, e.g.
{code}
{"namespace": "ly.stealth.kafka.metrics",
 "type": "record",
 "name": "Timings",
 "fields": [
   {"name": "id", "type": "long"},
   {"name": "timings", "type": {"type": "array", "items": "long"}}
 ]
}
{code}
compatibility testing framework Key: KAFKA-1898 URL: https://issues.apache.org/jira/browse/KAFKA-1898 Project: Kafka Issue Type: Bug Reporter: Joe Stein Fix For: 0.8.3 Attachments: cctk.png There are a few different scenarios where you want/need to know the status/state of a client library that works with Kafka. Client library development is not just about supporting the wire protocol but also the implementations around specific interactions of the API.
The API has blossomed into a robust set of producer, consumer, broker and administrative calls, all of which have layers of logic above them. A client library may choose to deviate from the path the project sets out, and that is ok. The goal of this ticket is to have a system for Kafka that can help explain what the library is or isn't doing (regardless of what it claims). The idea behind this stems from being able to quickly/easily/succinctly analyze the topic message data. Once you can analyze the topic(s) messages you can gather lots of information about what the client library is doing, is not doing, and so on. There are a few components to this.
1) dataset-generator
Test Kafka dataset generation tool. Generates a random text file with given params:
--filename, -f - output file name.
--filesize, -s - desired size of output file. The actual size will always be a bit larger (with a maximum size of $filesize + $max.length - 1)
--min.length, -l - minimum generated entry length.
--max.length, -h - maximum generated entry length.
Usage:
./gradlew build
java -jar dataset-generator/build/libs/dataset-generator-*.jar -s 10 -l 2 -h 20
2) dataset-producer
Test Kafka dataset producer tool. Able to produce the given dataset to Kafka or a Syslog server. The idea here is that you already have lots of data sets that you want to test different things with. You might have different sized messages, formats, etc. and want a repeatable benchmark to run and re-run the testing on. You could just have a day's worth of data and choose to replay it. The CCTK idea is that you are always starting from CONSUME in the state of your library. If your library is only producing then you will fail a bunch of tests, and that might be ok for some people. Accepts the following params:
{code}
--filename, -f - input file name.
--kafka, -k - Kafka broker address in host:port format. If this parameter is set, --producer.config and --topic must be set too (otherwise they're ignored).
--producer.config, -p - Kafka producer properties file location.
--topic, -t - Kafka topic to produce to.
--syslog, -s - Syslog server address. Format: protocol://host:port (tcp://0.0.0.0:5140 or udp://0.0.0.0:5141 for example)
--loop, -l - flag to loop through the file until shut off manually. False by default.
Usage:
./gradlew build
java -jar dataset-producer/build/libs/dataset-producer-*.jar --filename dataset --syslog tcp://0.0.0.0:5140 --loop true
{code}
3) extract
This step is good so you can save data and compare tests. It could also be removed if folks are just looking for a real live test (and we could support that too). Here we are taking data out of Kafka and putting it into Cassandra (but other data stores can be used too and we should come up with
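The dataset-generator contract described above (random entries of bounded length, appended until the requested file size is reached, possibly overshooting slightly) is easy to model. This sketch illustrates the described behavior; it is not the CCTK tool itself:

```java
import java.util.Random;

public class DatasetGeneratorSketch {
    // Builds newline-separated random text entries of length
    // [minLen, maxLen] until the output reaches at least fileSize chars.
    // Like the tool's documented $filesize + $max.length - 1 bound, the
    // result can overshoot by up to one entry (plus its newline).
    static String generate(int fileSize, int minLen, int maxLen, long seed) {
        Random rnd = new Random(seed); // seeded so runs are repeatable
        StringBuilder out = new StringBuilder();
        while (out.length() < fileSize) {
            int len = minLen + rnd.nextInt(maxLen - minLen + 1);
            for (int i = 0; i < len; i++) {
                out.append((char) ('a' + rnd.nextInt(26)));
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Mirrors the documented usage: -s 10 -l 2 -h 20
        String data = generate(10, 2, 20, 1L);
        System.out.println(data.length());
    }
}
```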
[jira] [Resolved] (KAFKA-1290) TestLogCleaning tool hangs on the new producer
[ https://issues.apache.org/jira/browse/KAFKA-1290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1290. -- Resolution: Fixed Seems to have fixed itself. TestLogCleaning tool hangs on the new producer -- Key: KAFKA-1290 URL: https://issues.apache.org/jira/browse/KAFKA-1290 Project: Kafka Issue Type: Sub-task Components: producer Affects Versions: 0.9.0 Reporter: Neha Narkhede Priority: Blocker KAFKA-1281 (yet to be checked in) converted existing tools to use the new producer. I found that TestLogCleaning hangs while sending messages using the new producer. Following is a thread dump and steps to reproduce the issue (truncated as posted):
{code}
nnarkhed-mn1:kafka-git-idea nnarkhed$ ./bin/kafka-run-class.sh kafka.TestLogCleaning --broker localhost:9092 --topics 1 --zk localhost:2181 --messages 10
Producing 10 messages...
Logging produce requests to /var/folders/61/bspy8z8n1t5dn5sdqzsnhbdr000383/T/kafka-log-cleaner-produced-3744326506335955516.txt
2014-03-04 10:51:35
Full thread dump Java HotSpot(TM) 64-Bit Server VM (20.65-b04-462 mixed mode):

kafka-network-thread daemon prio=5 tid=7fc27e94c000 nid=0x10a643000 runnable [10a642000]
   java.lang.Thread.State: RUNNABLE
	at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
	at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:136)
	at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:69)
	at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
	- locked 7ec0b0170 (a sun.nio.ch.Util$2)
	- locked 7ec0b0180 (a java.util.Collections$UnmodifiableSet)
	- locked 7ec0b0128 (a sun.nio.ch.KQueueSelectorImpl)
	at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
	at org.apache.kafka.common.network.Selector.select(Selector.java:296)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:198)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:153)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:98)
	at java.lang.Thread.run(Thread.java:695)

RMI TCP Accept-0 daemon prio=5 tid=7fc27e99c800 nid=0x10a43d000 runnable [10a43c000]
   java.lang.Thread.State: RUNNABLE
	at java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:439)
	- locked 7ec0b6088 (a java.net.SocksSocketImpl)
	at java.net.ServerSocket.implAccept(ServerSocket.java:468)
	at java.net.ServerSocket.accept(ServerSocket.java:436)
	at sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:34)
	at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
	at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
	at java.lang.Thread.run(Thread.java:695)

AWT-AppKit daemon prio=5 tid=7fc27f83 nid=0x7fff7b984180 runnable []
   java.lang.Thread.State: RUNNABLE

Low Memory Detector daemon prio=5 tid=7fc27e8db000 nid=0x109b3 runnable []
   java.lang.Thread.State: RUNNABLE

C2 CompilerThread1 daemon prio=9 tid=7fc27e8da800 nid=0x109a2d000 waiting on condition []
   java.lang.Thread.State: RUNNABLE

C2 CompilerThread0 daemon prio=9 tid=7fc27e8d9800 nid=0x10992a000 waiting on condition []
   java.lang.Thread.State: RUNNABLE

Signal Dispatcher daemon prio=9 tid=7fc27e8d9000 nid=0x109827000 waiting on condition []
   java.lang.Thread.State: RUNNABLE

Surrogate Locker Thread (Concurrent GC) daemon prio=5 tid=7fc27e002000 nid=0x109724000 waiting on condition []
   java.lang.Thread.State: RUNNABLE

Finalizer daemon prio=8 tid=7fc27e8d8000 nid=0x109519000 in Object.wait() [109518000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on 7ec0b23b0 (a java.lang.ref.ReferenceQueue$Lock)
	at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
	- locked 7ec0b23b0 (a java.lang.ref.ReferenceQueue$Lock)
	at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
	at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:171)

Reference Handler daemon prio=10 tid=7fc27e8d7800 nid=0x109416000 in Object.wait() [109415000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on 7ec0b4000 (a java.lang.ref.Reference$Lock)
	at java.lang.Object.wait(Object.java:485)
	at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
	- locked 7ec0b4000 (a java.lang.ref.Reference$Lock)

main prio=5 tid=7fc27e000800 nid=0x102159000 in Object.wait() [102158000]
   java.lang.Thread.State:
{code}
[jira] [Updated] (KAFKA-1404) Close unused log file
[ https://issues.apache.org/jira/browse/KAFKA-1404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1404: - Priority: Major (was: Critical) Close unused log file - Key: KAFKA-1404 URL: https://issues.apache.org/jira/browse/KAFKA-1404 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.1 Reporter: Xinyao Hu Original Estimate: 336h Remaining Estimate: 336h This is somewhat related to KAFKA-1403. One way to hack around KAFKA-1403 is to roll a new file after a short period of time. However, this results in many open file descriptors. Take our application for example: each server hosts about 5k topic-partitions, so if we roll a new file per hour, we add ~100k file descriptors per day (I checked that only .log is kept open, but not .index, which might be pinned in memory). We will exhaust the 1M file descriptor limit in about a week, while our disks can hold data for much longer. In reality very few of these file descriptors will be used: the most recent fd is used to append data, and the older file descriptors are used for queries. We should provide a parameter like max.num.fds and do LRU to decide which fds should stay open. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
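The max.num.fds / LRU idea sketched in the ticket maps naturally onto an access-ordered LinkedHashMap: keep at most N handles cached, and close the least recently used one on overflow. A toy sketch of that policy (hypothetical cache, strings standing in for file handles, not Kafka code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FdLruCache {
    static int closed = 0; // counts evictions, for illustration only

    // Access-ordered map (third constructor arg) that evicts, and would
    // close, the least recently used handle once more than maxOpenFds
    // entries are cached.
    static Map<String, String> newCache(final int maxOpenFds) {
        return new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                if (size() > maxOpenFds) {
                    closed++; // a real cache would call eldest.getValue().close()
                    return true;
                }
                return false;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> cache = newCache(2);
        cache.put("seg-0.log", "fd0");
        cache.put("seg-1.log", "fd1");
        cache.get("seg-0.log");        // touch, so seg-1.log is now eldest
        cache.put("seg-2.log", "fd2"); // evicts seg-1.log
        System.out.println(cache.keySet());
    }
}
```

Because lookups count as accesses, the hot append-path fd for each partition stays cached while rarely queried historical segments fall out, which is exactly the usage pattern the reporter describes.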
[jira] [Resolved] (KAFKA-674) Clean Shutdown Testing - Log segments checksums mismatch
[ https://issues.apache.org/jira/browse/KAFKA-674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-674. - Resolution: Invalid Clean Shutdown Testing - Log segments checksums mismatch Key: KAFKA-674 URL: https://issues.apache.org/jira/browse/KAFKA-674 Project: Kafka Issue Type: Bug Reporter: John Fung Priority: Critical Labels: replication-testing Attachments: kafka-674-reproduce-issue-v2.patch, kafka-674-reproduce-issue.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-723) Scala's default case class toString() is very inefficient
[ https://issues.apache.org/jira/browse/KAFKA-723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14310965#comment-14310965 ] Jay Kreps commented on KAFKA-723: - [~nehanarkhede] I think we did this, no? Scala's default case class toString() is very inefficient - Key: KAFKA-723 URL: https://issues.apache.org/jira/browse/KAFKA-723 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.0 Reporter: Neha Narkhede Priority: Critical Request logging is in the critical path of processing requests and we use Scala's default toString() API to log the requests. We should override the toString() in these case classes and log only what is useful. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
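The fix described above amounts to replacing the generated, product-iteration-based toString() with a hand-written one that formats only the fields worth logging. An illustrative sketch in Java (Kafka's actual request classes are Scala case classes; FetchRequestInfo and its fields are assumptions, not real Kafka types):

```java
// Hand-rolled toString for a hot-path request object: a single pre-sized
// StringBuilder pass, instead of the allocation-heavy default formatting.
public class FetchRequestInfo {
    final String topic;
    final int partition;
    final long offset;

    public FetchRequestInfo(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    @Override
    public String toString() {
        // Log only what is useful, in one pass.
        return new StringBuilder(64)
            .append("FetchRequestInfo(topic=").append(topic)
            .append(",partition=").append(partition)
            .append(",offset=").append(offset)
            .append(')')
            .toString();
    }
}
```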
[jira] [Comment Edited] (KAFKA-1758) corrupt recovery file prevents startup
[ https://issues.apache.org/jira/browse/KAFKA-1758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14310991#comment-14310991 ] Jay Kreps edited comment on KAFKA-1758 at 2/7/15 11:13 PM: --- This is actually not a very difficult change--in LogManager.loadLogs we would need to basically handle an error in reading the recovery checkpoint, log it, and then just treat it as though our recovery point was 0 (or something like that) for all logs. was (Author: jkreps): This is actually not a very difficult change--in LogManager.loadLogs we would need to basically handle an error in reading the recovery checkpoint, log it, and then just start a full recovery. corrupt recovery file prevents startup -- Key: KAFKA-1758 URL: https://issues.apache.org/jira/browse/KAFKA-1758 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Hi, We recently had a kafka node go down suddenly. When it came back up, it apparently had a corrupt recovery file, and refused to startup: {code} 2014-11-06 08:17:19,299 WARN [main] server.KafkaServer - Error starting up KafkaServer java.lang.NumberFormatException: For input string: ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@ ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@ at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Integer.parseInt(Integer.java:481) at java.lang.Integer.parseInt(Integer.java:527) at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229) at scala.collection.immutable.StringOps.toInt(StringOps.scala:31) at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76) at 
kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106) at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at kafka.log.LogManager.loadLogs(LogManager.scala:105) at kafka.log.LogManager.init(LogManager.scala:57) at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275) at kafka.server.KafkaServer.startup(KafkaServer.scala:72) {code} And the app is under a monitor (so it was repeatedly restarting and failing with this error for several minutes before we got to it)… We moved the ‘recovery-point-offset-checkpoint’ file out of the way, and it then restarted cleanly (but of course re-synced all its data from replicas, so we had no data loss). Anyway, I’m wondering if that’s the expected behavior? Or should it not declare it corrupt and then proceed automatically to an unclean restart? Should this NumberFormatException be handled a bit more gracefully? We saved the corrupt file if it’s worth inspecting (although I doubt it will be useful!)…. The corrupt files appeared to be all zeroes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
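The fallback Jay describes amounts to catching the parse failure, logging it, and defaulting the recovery point to 0 so the broker falls back to full recovery instead of refusing to start. A minimal sketch; parseRecoveryPoint is a hypothetical helper, not Kafka's actual OffsetCheckpoint API:

```java
// Sketch of the proposed fallback: a corrupt recovery checkpoint (e.g. a
// file full of zero bytes, as in this report) yields recovery point 0
// rather than a fatal NumberFormatException at startup.
public class RecoveryCheckpoint {
    public static long parseRecoveryPoint(String line) {
        if (line == null) return 0L;
        try {
            return Long.parseLong(line.trim());
        } catch (NumberFormatException e) {
            // Unreadable checkpoint: warn and force a full recovery from 0.
            System.err.println("Corrupt recovery checkpoint, forcing full recovery: " + e);
            return 0L;
        }
    }
}
```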
[jira] [Resolved] (KAFKA-1072) Allow multiple topics selected with a TopicFilter to be balanced among consumers
[ https://issues.apache.org/jira/browse/KAFKA-1072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1072. -- Resolution: Fixed I think this is solved with the partition.assignment.strategy option in 0.8.2 Allow multiple topics selected with a TopicFilter to be balanced among consumers Key: KAFKA-1072 URL: https://issues.apache.org/jira/browse/KAFKA-1072 Project: Kafka Issue Type: Bug Affects Versions: 0.8.0 Reporter: Jason Rosenberg Fix For: 0.9.0 Currently, there is no parallelism used when consuming a set of topics selected by a white list topic filter, if those topics all have a partition count of 1. Currently, all topics that match the filter get assigned to the same thread on the same consumer, even though there may be plenty of different topics (and therefore many partitions to be consumed from). There are often good reasons to use a partition count of only 1 (e.g. to preserve message ordering). For arbitrary scalability over a large number of topics, it would be a great benefit to be able to consume topics balanced over a set of available consumers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1044) change log4j to slf4j
[ https://issues.apache.org/jira/browse/KAFKA-1044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1044: - Assignee: (was: Jay Kreps) change log4j to slf4j -- Key: KAFKA-1044 URL: https://issues.apache.org/jira/browse/KAFKA-1044 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.8.0 Reporter: sjk Fix For: 0.9.0 Can you change log4j to slf4j? In my project I use logback, and it conflicts with log4j. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-839) Race condition between flushInterval and flushSchedulerInterval
[ https://issues.apache.org/jira/browse/KAFKA-839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-839. - Resolution: Won't Fix Race condition between flushInterval and flushSchedulerInterval --- Key: KAFKA-839 URL: https://issues.apache.org/jira/browse/KAFKA-839 Project: Kafka Issue Type: Bug Affects Versions: 0.7 Reporter: Jason Rosenberg It looks like there is a race condition between the settings for the 2 properties: log.default.flush.scheduler.interval.ms and log.default.flush.interval.ms. I'm using 0.7.2. By default, both of these get set to 3000ms (and the docs recommend setting flushInterval to a multiple of the flushSchedulerInterval). However, the code in LogManager.flushAllLogs (which is scheduled to run at a fixed rate using the flushSchedulerInterval property) looks like this: val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime var logFlushInterval = config.defaultFlushIntervalMs if(timeSinceLastFlush >= logFlushInterval) log.flush So, it will only flush logs if the time since the last flush is longer than the flush interval. But log.lastFlushedTime is not set until after flushing is completed (which can incur some io time). Thus, by enabling TRACE logging for this method, I was able to see that with the defaults, timeSinceLastFlush was usually about 2998 (which is less than the logFlushInterval of 3000). Thus, setting a flushInterval the same as the schedulerFlushInterval essentially devolves to an effective flushInterval = 2X the schedulerFlushInterval. So, setting a flushInterval slightly less than the flushSchedulerInterval (e.g. 2500) will guarantee that the flush will happen on each scheduler invocation. I'm guessing that it might make sense to change the logic gating the flush to something like: if(timeSinceLastFlush >= 0.90 * logFlushInterval), which might be reasonable. Also, the scheduler probably ought to use a 'fixedDelay' rather than a 'fixedRate' schedule. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
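The gating logic Jason proposes can be captured in a small predicate. A sketch assuming the suggested 0.90 factor; FlushPolicy is an illustrative name, not Kafka code:

```java
// Sketch of the proposed fix: flush once ~90% of the configured interval
// has elapsed, so a scheduler firing at ~2998ms still flushes a 3000ms
// interval instead of silently skipping every other run.
public class FlushPolicy {
    public static boolean shouldFlush(long timeSinceLastFlushMs, long flushIntervalMs) {
        return timeSinceLastFlushMs >= 0.90 * flushIntervalMs;
    }
}
```

The second suggestion maps onto ScheduledExecutorService semantics: scheduleWithFixedDelay measures the next run from the end of the previous one, so flush I/O time cannot compress the gap the way scheduleAtFixedRate can.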
[jira] [Commented] (KAFKA-841) Merge the partition and replica state machines into a single unified state machine in the controller
[ https://issues.apache.org/jira/browse/KAFKA-841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311021#comment-14311021 ] Jay Kreps commented on KAFKA-841: - [~nehanarkhede] is this still a consideration? Merge the partition and replica state machines into a single unified state machine in the controller Key: KAFKA-841 URL: https://issues.apache.org/jira/browse/KAFKA-841 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1 Reporter: Neha Narkhede Assignee: Neha Narkhede After using the controller for a while as part of 0.8, I think it might be worth looking into merging the separate state machines into a single unified one. The reason is most events end up invoking state transitions on both partitions and replicas. Initially, the thought of separating the two was to handle cases which only touch one state machine (for example, changing the replication factor or changing the number of partitions online). However, these features also would end up touching both state machines. The complexity comes from the correct ordering that one has to ensure between the operations on both these state machines (KAFKA-831). Due to this, some state transitions are unable to batch RPCs and/or zookeeper writes/reads since they need to happen in each of the state machines separately. Since this will significantly change controller code, I'm filing this for 0.8.1 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-182) Set a TCP connection timeout for the SimpleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-182. - Resolution: Fixed Set a TCP connection timeout for the SimpleConsumer --- Key: KAFKA-182 URL: https://issues.apache.org/jira/browse/KAFKA-182 Project: Kafka Issue Type: Bug Reporter: Jay Kreps Currently we use SocketChannel.open which I *think* can block for a long time. We should make this configurable, and we may have to create the socket in a different way to enable this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-874) Migration tool's system tests should use separate server.properties files for 0.7 and 0.8 brokers
[ https://issues.apache.org/jira/browse/KAFKA-874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-874. - Resolution: Won't Fix Migration tool's system tests should use separate server.properties files for 0.7 and 0.8 brokers - Key: KAFKA-874 URL: https://issues.apache.org/jira/browse/KAFKA-874 Project: Kafka Issue Type: Bug Affects Versions: 0.8.0 Reporter: Swapnil Ghike Assignee: John Fung Labels: bugs Currently the migration tool's system test suite is using 0.8 broker's server.properties as default. When the test brings up an 0.7 broker, it gets the appropriate configs like brokerid via addedCSVConfigs in system_test/utils/kafka_system_test_utils.py. I suspect that for 0.7 broker, the zk.connectiontimeout.ms will also not be overridden. We should rather use two separate server.properties files for 0.7 and 0.8 brokers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-875) System test cleanup
[ https://issues.apache.org/jira/browse/KAFKA-875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-875. - Resolution: Won't Fix System test cleanup --- Key: KAFKA-875 URL: https://issues.apache.org/jira/browse/KAFKA-875 Project: Kafka Issue Type: Bug Affects Versions: 0.8.0 Reporter: Swapnil Ghike Assignee: John Fung Labels: cleanup There is a lot of unused stuff lying in system_test. If we have converted all the bash tests to python, we should delete the unused code. Also there is commented code at a few places that should be deleted if it is not going to be used. For instance, search for brokerLogCksumDict in system_test/utils/kafka_system_test_utils.py. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-266) Kafka web console
[ https://issues.apache.org/jira/browse/KAFKA-266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-266. - Resolution: Won't Fix Kafka web console - Key: KAFKA-266 URL: https://issues.apache.org/jira/browse/KAFKA-266 Project: Kafka Issue Type: New Feature Components: contrib Reporter: Evan Chan Labels: project Original Estimate: 672h Remaining Estimate: 672h This issue is created to track a community-contributed Kafka Web UI. Here is an initial list of goals: - Be able to easily see which brokers are up - Be able to see lists of topics, connected producers, consumer groups, connected consumers - Be able to see, for each consumer/partition, its offset, and more importantly, # of bytes unconsumed (== largest offset for partition - current offset) - (Wish list) have a graphical view of the offsets - (Wish list) be able to clean up consumer state, such as stale claimed partitions List of challenges/questions: - Which framework? Play! for Scala? - Is all the data available from JMX and ZK? Hopefully, watching the files on the filesystem can be avoided - How to handle large numbers of topics, partitions, consumers, etc. efficiently -- This message was sent by Atlassian JIRA (v6.3.4#6332)
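The "# of bytes unconsumed" metric from the wish list is just a difference of offsets per consumer/partition. A trivial sketch (ConsoleMetrics is an illustrative name; in 0.7-era Kafka, offsets were byte positions, so the difference is in bytes):

```java
// Wish-list metric: largest offset for the partition minus the
// consumer's current offset, clamped at zero.
public class ConsoleMetrics {
    public static long unconsumed(long largestOffset, long currentOffset) {
        return Math.max(0, largestOffset - currentOffset);
    }
}
```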
[jira] [Updated] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests
[ https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1927: - Description: The common package introduced a better way of defining requests using a new protocol definition DSL and also includes wrapper objects for these. We should switch KafkaApis over to use these request definitions and consider the scala classes deprecated (we probably need to retain some of them for a while for the scala clients). This will be a big improvement because 1. We will have each request now defined in only one place (Protocol.java) 2. We will have built-in support for multi-version requests 3. We will have much better error messages (no more cryptic underflow errors) was: The common package introduced a better way of defining requests using a new protocol definition DSL and also includes wrapper objects for these. We should switch KafkaApis over to use these request definitions and consider the scala classes deprecated (we probably need to retain some of them for a while for the scala clients). Replace requests in kafka.api with requests in org.apache.kafka.common.requests --- Key: KAFKA-1927 URL: https://issues.apache.org/jira/browse/KAFKA-1927 Project: Kafka Issue Type: Improvement Reporter: Jay Kreps The common package introduced a better way of defining requests using a new protocol definition DSL and also includes wrapper objects for these. We should switch KafkaApis over to use these request definitions and consider the scala classes deprecated (we probably need to retain some of them for a while for the scala clients). This will be a big improvement because 1. We will have each request now defined in only one place (Protocol.java) 2. We will have built-in support for multi-version requests 3. We will have much better error messages (no more cryptic underflow errors) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-936) Kafka Metrics Memory Leak
[ https://issues.apache.org/jira/browse/KAFKA-936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311139#comment-14311139 ] Manikumar Reddy commented on KAFKA-936: --- After KAFKA-1481, I am observing one issue. We are not removing the newly added Version/AppInfo Mbean on old producer/consumer close call. There is no other memory leak. Will submit a patch for this. Kafka Metrics Memory Leak -- Key: KAFKA-936 URL: https://issues.apache.org/jira/browse/KAFKA-936 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.0 Environment: centos linux, jdk 1.6, jboss Reporter: Senthil Chittibabu Assignee: Neha Narkhede Priority: Critical I am using kafka_2.8.0-0.8.0-SNAPSHOT version. I am running into OutOfMemoryError in PermGen Space. I have set the -XX:MaxPermSize=512m, but I still get the same error. I used profiler to trace the memory leak, and found the following kafka classes to be the cause for the memory leak. Please let me know if you need any additional information to debug this issue. kafka.server.FetcherLagMetrics kafka.consumer.FetchRequestAndResponseMetrics kafka.consumer.FetchRequestAndResponseStats kafka.metrics.KafkaTimer kafka.utils.Pool -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1794) Make config and config defaults accessible to clients
[ https://issues.apache.org/jira/browse/KAFKA-1794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311142#comment-14311142 ] Guozhang Wang commented on KAFKA-1794: -- Makes sense. Make config and config defaults accessible to clients - Key: KAFKA-1794 URL: https://issues.apache.org/jira/browse/KAFKA-1794 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Navina Ramesh In the new Kafka producer API, the ProducerConfig is not accessible to the clients. Samza uses the ProducerConfig instance to access the default property values, which can then be used in the various helper utils. The config instance is accessible even without instantiating a Kafka producer. With the new API, there is no way to instantiate a ProducerConfig as the constructor is marked private. Also, it does not make the default config values accessible to the client without actually instantiating a KafkaProducer. Changes suggested: 1. Make the ProducerConfig constructor public 2. Make ConfigDef in ProducerConfig accessible by the client 3. Use public static variables for kafka config default values -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1930) Move server over to new metrics library
[ https://issues.apache.org/jira/browse/KAFKA-1930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311151#comment-14311151 ] Manikumar Reddy commented on KAFKA-1930: Are we supporting self-documentation in the new metrics library? Also, the Kafka community currently uses various metrics reporting formats, while the new metrics library only supports reporting via JMX. To gain community acceptance, it should support more reporting formats: (1) reporting via Ganglia (2) reporting via Graphite (3) reporting via CSV (4) reporting via STDOUT. If ok, I will create sub-tasks. Move server over to new metrics library --- Key: KAFKA-1930 URL: https://issues.apache.org/jira/browse/KAFKA-1930 Project: Kafka Issue Type: Improvement Reporter: Jay Kreps We are using org.apache.kafka.common.metrics on the clients, but using Coda Hale metrics on the server. We should move the server over to the new metrics package as well. This will help to make all our metrics self-documenting. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-1314) recurring errors
[ https://issues.apache.org/jira/browse/KAFKA-1314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1314. -- Resolution: Won't Fix Doesn't seem to be under active investigation. recurring errors Key: KAFKA-1314 URL: https://issues.apache.org/jira/browse/KAFKA-1314 Project: Kafka Issue Type: Bug Affects Versions: 0.8.0 Reporter: alex kamil Priority: Critical we're getting hundreds of these errors with kafka 0.8 and topics become unavailable after running for a few days kafka error 1 (had to recreate the topic as it became unavailable after a few days) [2014-03-18 16:34:56,403] ERROR [KafkaApi-3] Error while fetching metadata for partition [mytopic,0] (kafka.server.KafkaApis) kafka.common.LeaderNotAvailableException: Leader not available for partition [mytopic,0] kafka error 2 [2014-03-17 12:23:27,536] ERROR Closing socket for /kafka consumer ip address because of error (kafka.network.Processor) kafka.common.KafkaException: Wrong request type 768 kafka error 3 ERROR Closing socket for /kafka consumer ip address because of error (kafka.network.Processor) java.io.IOException: Connection reset by peer kafka error 4 ERROR Closing socket for /kafka broker ip address because of error (kafka.network.Processor) zookeeper error 2014-03-18 16:40:02,794 [myid:3] - WARN [QuorumPeer[myid=3]/0.0.0.0:2181:QuorumCnxManager@368] - Cannot open channel to 1 at election address /kafka broker ip address:3888 java.net.ConnectException: Connection refused -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-902) Randomize backoff on the clients for metadata requests
[ https://issues.apache.org/jira/browse/KAFKA-902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-902: Labels: newbie (was: ) Randomize backoff on the clients for metadata requests -- Key: KAFKA-902 URL: https://issues.apache.org/jira/browse/KAFKA-902 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 0.8.0 Reporter: Neha Narkhede Assignee: Neha Narkhede Priority: Critical Labels: newbie If a Kafka broker dies and there are a large number of clients talking to the Kafka cluster, each of the clients can end up shooting metadata requests at around the same time. It is better to randomize the backoff on the clients so the metadata requests are more evenly spread out -- This message was sent by Atlassian JIRA (v6.3.4#6332)
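A common way to spread the metadata-request thundering herd described above is uniform jitter around the configured backoff. A sketch; MetadataBackoff is an illustrative name, not a Kafka class:

```java
import java.util.Random;

// Sketch of jittered backoff: instead of every client retrying exactly
// backoffMs after a broker failure, each waits a uniformly random value
// in [backoffMs/2, backoffMs*1.5), spreading metadata requests over time.
public class MetadataBackoff {
    public static long nextBackoffMs(long backoffMs, Random rng) {
        return backoffMs / 2 + (long) (rng.nextDouble() * backoffMs);
    }
}
```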
[jira] [Resolved] (KAFKA-1403) Adding timestamp to kafka index structure
[ https://issues.apache.org/jira/browse/KAFKA-1403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1403. -- Resolution: Won't Fix Adding timestamp to kafka index structure - Key: KAFKA-1403 URL: https://issues.apache.org/jira/browse/KAFKA-1403 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.1 Reporter: Xinyao Hu Original Estimate: 336h Remaining Estimate: 336h Right now, kafka doesn't have a timestamp per message. It assumes that all the messages in the same file have the same timestamp, which is the mtime of the file. This makes it inefficient to scan all the messages within a time window, which is a valid use case in a lot of realtime data analysis. One way to hack this is to roll a new file in a short period of time. However, this will result in opening lots of files (KAFKA-1404), which crashed the servers eventually. My guess is this is not implemented for efficiency reasons. It will cost an additional four bytes per message, which might be pinned in memory for fast access. There might be some simple perf optimizations, such as differential encoding + var-length encoding, which should bring the cost down to 1-2 bytes avg per message. Let me know if this makes sense. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1403) Adding timestamp to kafka index structure
[ https://issues.apache.org/jira/browse/KAFKA-1403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14310980#comment-14310980 ] Jay Kreps commented on KAFKA-1403: -- Ultimately, in order to be accurate, the time will actually need to be in the message itself. Currently we use the write time, but this can be arbitrarily inaccurate: if you delete the data on a server and restart it, it will rewrite everything with new timestamps. Adding timestamp to kafka index structure - Key: KAFKA-1403 URL: https://issues.apache.org/jira/browse/KAFKA-1403 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.1 Reporter: Xinyao Hu Original Estimate: 336h Remaining Estimate: 336h Right now, kafka doesn't have a timestamp per message. It assumes that all the messages in the same file have the same timestamp, which is the mtime of the file. This makes it inefficient to scan all the messages within a time window, which is a valid use case in a lot of realtime data analysis. One way to hack this is to roll a new file in a short period of time. However, this will result in opening lots of files (KAFKA-1404), which crashed the servers eventually. My guess is this is not implemented for efficiency reasons. It will cost an additional four bytes per message, which might be pinned in memory for fast access. There might be some simple perf optimizations, such as differential encoding + var-length encoding, which should bring the cost down to 1-2 bytes avg per message. Let me know if this makes sense. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
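The "differential encoding + var-length encoding" idea from the description can be sketched as zig-zag varints over timestamp deltas: closely spaced, mostly increasing timestamps then cost 1-2 bytes each instead of a fixed 4-8. The class below is an illustrative sketch, not Kafka's log or index format:

```java
import java.io.ByteArrayOutputStream;

// Sketch of delta + varint timestamp encoding: each timestamp is stored
// as a zig-zag-encoded variable-length delta from the previous one.
public class TimestampDeltas {
    static void writeVarLong(ByteArrayOutputStream out, long v) {
        long z = (v << 1) ^ (v >> 63);            // zig-zag: small |v| -> small z
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // 7 payload bits + continuation bit
            z >>>= 7;
        }
        out.write((int) z);
    }

    public static byte[] encode(long[] timestamps) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long prev = 0;
        for (long t : timestamps) {
            writeVarLong(out, t - prev); // delta from the previous timestamp
            prev = t;
        }
        return out.toByteArray();
    }

    public static long[] decode(byte[] data, int count) {
        long[] out = new long[count];
        long prev = 0;
        int pos = 0;
        for (int i = 0; i < count; i++) {
            long z = 0;
            int shift = 0;
            int b;
            do {
                b = data[pos++] & 0xFF;
                z |= (long) (b & 0x7F) << shift;
                shift += 7;
            } while ((b & 0x80) != 0);
            prev += (z >>> 1) ^ -(z & 1);  // undo zig-zag, accumulate delta
            out[i] = prev;
        }
        return out;
    }
}
```

For three timestamps 100ms apart, the two trailing deltas encode in 2 and 1 bytes respectively, versus 8 bytes each for raw longs.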
[jira] [Resolved] (KAFKA-971) Handle synchronization in updateMetadata in KafkaApi better
[ https://issues.apache.org/jira/browse/KAFKA-971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-971. - Resolution: Fixed I don't see this in KafkaApis any more. [~junrao] can you reopen if this is still a real issue. Handle synchronization in updateMetadata in KafkaApi better Key: KAFKA-971 URL: https://issues.apache.org/jira/browse/KAFKA-971 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.1 Reporter: Jun Rao It's better to wrap all synchronization of the metadata cache in a MetadataCache. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-909) Provide support in the producer to specify a stateful instance of an Encoder, like the consumer allows with the Decoder
[ https://issues.apache.org/jira/browse/KAFKA-909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-909. - Resolution: Fixed Fix Version/s: 0.8.2 This is available in the new producer. Provide support in the producer to specify a stateful instance of an Encoder, like the consumer allows with the Decoder --- Key: KAFKA-909 URL: https://issues.apache.org/jira/browse/KAFKA-909 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.0 Environment: Kafka 0.8, jdk 6 Reporter: Rob Withers Assignee: Jun Rao Labels: features Fix For: 0.8.2 It is very useful to have a shared instance of the Encoder, to do advanced encoding. The consumer allows the ability to set the Decoder instance and achieve this capability. The producer should offer the same capability. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-288) java impacted changes from new producer and consumer request format
[ https://issues.apache.org/jira/browse/KAFKA-288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-288. - Resolution: Won't Fix java impacted changes from new producer and consumer request format --- Key: KAFKA-288 URL: https://issues.apache.org/jira/browse/KAFKA-288 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.0 Reporter: Joe Stein Labels: replication, wireprotocol Fix For: 0.9.0 1) javaapi.SyncProducer: We should get rid of send(topic: String, messages: ByteBufferMessageSet) and only keep send(producerRequest: kafka.javaapi.ProducerRequest). This affects KafkaRecordWriter and DataGenerator 2) javaapi.ProducerRequest: We will need to define a java version of TopicData so that java producers can create request conveniently. The java version of TopicData will use the java version of ByteBufferMessageSet. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-293) Allow to configure all broker ids at once similar to how zookeeper handles server ids
[ https://issues.apache.org/jira/browse/KAFKA-293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-293. - Resolution: Won't Fix Allow to configure all broker ids at once similar to how zookeeper handles server ids - Key: KAFKA-293 URL: https://issues.apache.org/jira/browse/KAFKA-293 Project: Kafka Issue Type: Improvement Affects Versions: 0.7 Reporter: Thomas Dudziak Zookeeper allows to specify all server ids in the same configuration (https://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html#sc_configuration) which has the benefit that the configuration file is the same for all zookeeper instances. A similar approach for Kafka would be quite useful, e.g. brokerid.1=host 1 brokerid.2=host 2 etc. It'd still require per-instance configuration (myid file in the zookeeper case) but that can be created separately (e.g. by the deployment tool used). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-287) Running a consumer client using scala 2.8 fails
[ https://issues.apache.org/jira/browse/KAFKA-287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-287. - Resolution: Fixed Running a consumer client using scala 2.8 fails --- Key: KAFKA-287 URL: https://issues.apache.org/jira/browse/KAFKA-287 Project: Kafka Issue Type: Bug Environment: Java 1.6, OS X Reporter: Elben Shira Built the kafka library using the instructions found in the README. My client uses scala 2.9.1, sbt 0.11. My consumer client has this snippet of code: https://gist.github.com/a35006cc25e39ba386e2 The client compiles, but running it produces this stacktrace: https://gist.github.com/efeb85f50402b477d6e0 I think this may be because of a bug found in scala 2.9.0 (though I'm not sure if it was present in scala 2.8.0): https://issues.scala-lang.org/browse/SI-4575 To get around this, I built the kafka library using scala 2.9.1 (by changing build.properties). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-280) change on-disk log layout to {log.dir}/topicname/partitionid
[ https://issues.apache.org/jira/browse/KAFKA-280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-280. - Resolution: Won't Fix change on-disk log layout to {log.dir}/topicname/partitionid Key: KAFKA-280 URL: https://issues.apache.org/jira/browse/KAFKA-280 Project: Kafka Issue Type: Sub-task Components: core Affects Versions: 0.8.0 Reporter: Jun Rao Labels: replication Currently, the on-disk layout is {log.dir}/topicname-partitionid. The problem is that there is no appropriate place to store topicname level information such as topic version. An alternative layout is {log.dir}/topicname/partitionid. Then, we can store topic level meta data under {log.dir}/topicname. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-282) Currently iterated chunk is not cleared during consumer shutdown
[ https://issues.apache.org/jira/browse/KAFKA-282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-282. - Resolution: Fixed Currently iterated chunk is not cleared during consumer shutdown Key: KAFKA-282 URL: https://issues.apache.org/jira/browse/KAFKA-282 Project: Kafka Issue Type: Bug Components: core Reporter: Joel Koshy Assignee: Joel Koshy During consumer connector shutdown, fetch queues are cleared, but the currently iterated chunk is not cleared. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-289) reuse topicdata when sending producerrequest
[ https://issues.apache.org/jira/browse/KAFKA-289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-289. - Resolution: Won't Fix reuse topicdata when sending producerrequest Key: KAFKA-289 URL: https://issues.apache.org/jira/browse/KAFKA-289 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.0 Reporter: Joe Stein Labels: optimization, replication, wireprotocol Fix For: 0.9.0 The way that SyncProducer sends a ProducerRequest over the socket is to first serialize the whole request in a bytebuffer and then send the bytebuffer through the socket. An alternative is to send the request like FetchResponse, using a ProduceRequestSend that reuses TopicDataSend. This avoids code duplication and is more efficient, since it sends data in ByteBufferMessageSet directly to the socket and avoids extra copying from the message set to the bytebuffer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-291) Add builder to create configs for consumer and broker
[ https://issues.apache.org/jira/browse/KAFKA-291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-291. - Resolution: Fixed Add builder to create configs for consumer and broker - Key: KAFKA-291 URL: https://issues.apache.org/jira/browse/KAFKA-291 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.7 Reporter: John Wang Attachments: builderPatch.diff Creating Consumer and Producer can be cumbersome because you have to remember the exact string for the property to be set. And since these are just strings, IDEs cannot really help. This patch contains builders that help with this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1898) compatibility testing framework
[ https://issues.apache.org/jira/browse/KAFKA-1898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1898: - Description: There are a few different scenarios where you want/need to know the status/state of a client library that works with Kafka. Client library development is not just about supporting the wire protocol but also the implementations around specific interactions of the API. The API has blossomed into a robust set of producer, consumer, broker and administrative calls, all of which have layers of logic above them. A Client Library may choose to deviate from the path the project sets out and that is ok. The goal of this ticket is to have a system for Kafka that can help to explain what the library is or isn't doing (regardless of what it claims). The idea behind this stems from being able to quickly/easily/succinctly analyze the topic message data. Once you can analyze the topics' messages you can gather lots of information about what the client library is doing, is not doing and such. There are a few components to this.
1) dataset-generator Test Kafka dataset generation tool. Generates a random text file with given params:
--filename, -f - output file name.
--filesize, -s - desired size of output file. The actual size will always be a bit larger (with a maximum size of $filesize + $max.length - 1)
--min.length, -l - minimum generated entry length.
--max.length, -h - maximum generated entry length.
Usage:
./gradlew build
java -jar dataset-generator/build/libs/dataset-generator-*.jar -s 10 -l 2 -h 20
2) dataset-producer Test Kafka dataset producer tool. Able to produce the given dataset to Kafka or a Syslog server. The idea here is you already have lots of data sets that you want to test different things for. You might have different-sized messages, formats, etc. and want a repeatable benchmark to run and re-run the testing on. You could just have a day's worth of data and just choose to replay it.
The CCTK idea is that you are always starting from CONSUME in the state of your library. If your library is only producing, then you will fail a bunch of tests, and that might be ok for people. Accepts the following params:
{code}
--filename, -f - input file name.
--kafka, -k - Kafka broker address in host:port format. If this parameter is set, --producer.config and --topic must be set too (otherwise they're ignored).
--producer.config, -p - Kafka producer properties file location.
--topic, -t - Kafka topic to produce to.
--syslog, -s - Syslog server address. Format: protocol://host:port (tcp://0.0.0.0:5140 or udp://0.0.0.0:5141 for example)
--loop, -l - flag to loop through file until shut off manually. False by default.

Usage:
./gradlew build
java -jar dataset-producer/build/libs/dataset-producer-*.jar --filename dataset --syslog tcp://0.0.0.0:5140 --loop true
{code}
3) extract This step is good so you can save data and compare tests. It could also be removed if folks are just looking for a real live test (and we could support that too). Here we are taking data out of Kafka and putting it into Cassandra (but other data stores can be used too, and we should come up with a way to abstract this out completely so folks could implement whatever they wanted).
{code}
package ly.stealth.shaihulud.reader

import java.util.UUID

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import consumer.kafka.MessageAndMetadata
import consumer.kafka.client.KafkaReceiver
import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream

object Main extends App with Logging {
  val parser = new scopt.OptionParser[ReaderConfiguration]("spark-reader") {
    head("Spark Reader for Kafka client applications", "1.0")
    opt[String]("testId") unbounded() optional() action { (x, c) => c.copy(testId = x) } text ("Source topic with initial set of data")
    opt[String]("source") unbounded() required() action { (x, c) => c.copy(sourceTopic = x) } text ("Source topic with initial set of data")
    opt[String]("destination") unbounded() required() action { (x, c) => c.copy(destinationTopic = x) } text ("Destination topic with processed set of data")
    opt[Int]("partitions") unbounded() optional() action { (x, c) => c.copy(partitions = x) } text ("Partitions in topic")
    opt[String]("zookeeper") unbounded() required() action { (x, c) => c.copy(zookeeper = x) } text ("Zookeeper connection host:port")
    opt[Int]("kafka.fetch.size") unbounded() optional() action { (x, c) => c.copy(kafkaFetchSize = x) } text ("Maximum KBs to fetch from Kafka")
    checkConfig { c =>
      if (c.testId.isEmpty || c.sourceTopic.isEmpty || c.destinationTopic.isEmpty || c.zookeeper.isEmpty) {
        failure("You haven't provided all required parameters")
      } else
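The dataset-generator behavior described in this ticket (random entries of bounded length, written until the requested file size is reached, overshooting by at most one entry) can be sketched in Java; the class and method names here are illustrative, not the actual tool's:

```java
import java.util.Random;

public class DatasetGenerator {
    private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz0123456789 ";

    // Builds the dataset in memory: random lines whose lengths fall in
    // [minLen, maxLen]. Generation stops once the target size is reached,
    // so the result may overshoot by roughly one entry, mirroring the
    // tool's documented "$filesize + $max.length - 1" bound.
    static String generate(long targetSize, int minLen, int maxLen, Random rnd) {
        StringBuilder out = new StringBuilder();
        while (out.length() < targetSize) {
            int len = minLen + rnd.nextInt(maxLen - minLen + 1);
            for (int i = 0; i < len; i++) {
                out.append(ALPHABET.charAt(rnd.nextInt(ALPHABET.length())));
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Equivalent of "-s 10 -l 2 -h 20" from the usage example above.
        String data = generate(10, 2, 20, new Random());
        System.out.println("generated " + data.length() + " bytes");
    }
}
```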
[jira] [Commented] (KAFKA-1856) Add PreCommit Patch Testing
[ https://issues.apache.org/jira/browse/KAFKA-1856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311140#comment-14311140 ] Gwen Shapira commented on KAFKA-1856: - Looks good! [~joestein] or [~nehanarkhede] - please try it out on one of the patches you are reviewing. For example: python dev-utils/test-patch.py --defect KAFKA-1333 --output patch-process --run-tests --username user --password password Let us know what you think. Add PreCommit Patch Testing --- Key: KAFKA-1856 URL: https://issues.apache.org/jira/browse/KAFKA-1856 Project: Kafka Issue Type: Task Reporter: Ashish Kumar Singh Assignee: Ashish Kumar Singh Attachments: KAFKA-1856.patch, KAFKA-1856_2015-01-18_21:43:56.patch, KAFKA-1856_2015-02-04_14:57:05.patch, KAFKA-1856_2015-02-04_15:44:47.patch h1. Kafka PreCommit Patch Testing - *Don't wait for it to break* h2. Motivation *With great power comes great responsibility* - Uncle Ben. As the Kafka user list grows, a mechanism to ensure the quality of the product is required. Quality becomes hard to measure and maintain in an open source project, because of a wide community of contributors. Luckily, Kafka is not the first open source project and can benefit from the learnings of prior projects. PreCommit tests are the tests that are run for each patch that gets attached to an open JIRA. Based on the test results, the test execution framework (test bot) +1s or -1s the patch. Having PreCommit tests takes the load off committers, who otherwise have to look at or test each patch. h2. Tests in Kafka h3. Unit and Integration Tests [Unit and Integration tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Unit+and+Integration+Tests] are cardinal to help contributors to avoid breaking existing functionalities while adding new functionalities or fixing older ones. These tests, at least the ones relevant to the changes, must be run by contributors before attaching a patch to a JIRA. h3.
System Tests [System tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+System+Tests] are much wider tests that, unlike unit tests, focus on end-to-end scenarios and not some specific method or class. h2. Apache PreCommit tests Apache provides a mechanism to automatically build a project and run a series of tests whenever a patch is uploaded to a JIRA. Based on test execution, the test framework will comment with a +1 or -1 on the JIRA. You can read more about the framework here: http://wiki.apache.org/general/PreCommitBuilds h2. Plan # Create a test-patch.py script (similar to the one used in Flume, Sqoop and other projects) that will take a JIRA as a parameter, apply it on the appropriate branch, build the project, run tests and report results. This script should be committed into the Kafka code-base. To begin with, this will only run unit tests. We can add code sanity checks, system_tests, etc. in the future. # Create a Jenkins job for running the test (as described in http://wiki.apache.org/general/PreCommitBuilds) and validate that it works manually. This must be done by a committer with Jenkins access. # Ask someone with access to https://builds.apache.org/job/PreCommit-Admin/ to add Kafka to the list of projects PreCommit-Admin triggers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils
Jay Kreps created KAFKA-1926: Summary: Replace kafka.utils.Utils with o.a.k.common.utils.Utils Key: KAFKA-1926 URL: https://issues.apache.org/jira/browse/KAFKA-1926 Project: Kafka Issue Type: Improvement Reporter: Jay Kreps There is currently a lot of duplication between the Utils class in common and the one in core. Our plan has been to deprecate duplicate code in the server and replace it with the new common code. As such we should evaluate each method in the scala Utils and do one of the following: 1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general purpose utility that is not Kafka-specific. If we migrate it we should really think about the API and make sure there is some test coverage. A few things in there are kind of funky and we shouldn't just blindly copy them over. 2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold any utilities that really need to make use of Scala features to be convenient. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-170) Support for non-blocking polling on multiple streams
[ https://issues.apache.org/jira/browse/KAFKA-170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-170. - Resolution: Duplicate This is being solved in the new consumer API. Support for non-blocking polling on multiple streams Key: KAFKA-170 URL: https://issues.apache.org/jira/browse/KAFKA-170 Project: Kafka Issue Type: Sub-task Components: core Affects Versions: 0.8.0 Reporter: Jay Kreps Priority: Critical Labels: replication Currently we provide a blocking iterator in the consumer. This is a good mechanism for consuming data from a single topic, but is limited as a mechanism for polling multiple streams. For example, if one wants to implement a non-blocking union across multiple streams, this is hard to do because calls may block indefinitely. A similar situation arises when trying to implement a streaming join between two streams. I would propose two changes: 1. Implement a next(timeout) interface on KafkaMessageStream. This will easily handle some simple cases with minimal change. This handles certain limited cases nicely and is easy to implement, but doesn't actually cover the two cases above. 2. Add an interface to poll streams. I don't know the best approach for the latter API, but it is important to get it right. One option would be to add a ConsumerConnector.drainTopics(topic1, topic2, ...) which blocks until there is at least one message and then returns a list of triples (topic, partition, message). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
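The union-with-timeout semantics proposed in this ticket can be illustrated outside Kafka with a shared queue that several stream threads feed. This is a plain-Java sketch of the proposed next(timeout)/drainTopics behavior, not Kafka's actual API; all class and method names here are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class StreamUnion {
    // (topic, partition, message) triple, as suggested for drainTopics.
    public static final class Delivery {
        public final String topic;
        public final int partition;
        public final String message;
        Delivery(String topic, int partition, String message) {
            this.topic = topic; this.partition = partition; this.message = message;
        }
    }

    private final BlockingQueue<Delivery> merged = new LinkedBlockingQueue<>();

    // Each stream pushes into the shared queue instead of exposing its
    // own blocking iterator, so a single reader can poll the union.
    public void deliver(String topic, int partition, String message) {
        merged.add(new Delivery(topic, partition, message));
    }

    // next(timeout): returns null if nothing arrives in time, so callers
    // can interleave work across streams without blocking indefinitely.
    public Delivery next(long timeoutMs) {
        try {
            return merged.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // drainTopics-like call: blocks until at least one message exists,
    // then returns everything currently buffered.
    public List<Delivery> drain() {
        List<Delivery> out = new ArrayList<>();
        try {
            out.add(merged.take());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return out;
        }
        merged.drainTo(out);
        return out;
    }
}
```

Kafka's later consumer API addressed the same problem with a single poll(timeout) call over all subscribed topics, which is essentially this design with the queue hidden inside the client.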
[jira] [Commented] (KAFKA-724) Allow automatic socket.send.buffer from operating system
[ https://issues.apache.org/jira/browse/KAFKA-724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311008#comment-14311008 ] Jay Kreps commented on KAFKA-724: - This is fixed on the old scala clients, but not on the new producer and consumer. It would be good to add support for -1 meaning use the OS default setting there as well. Allow automatic socket.send.buffer from operating system Key: KAFKA-724 URL: https://issues.apache.org/jira/browse/KAFKA-724 Project: Kafka Issue Type: Improvement Reporter: Pablo Barrera Labels: newbie To do this, don't call socket().setXXXBufferSize. This can be controlled by the configuration parameter: if socket.send.buffer or the other buffer settings are set to -1, don't call socket().setXXXBufferSize -- This message was sent by Atlassian JIRA (v6.3.4#6332)
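The convention proposed here (a configured value of -1 meaning "leave the OS default alone") amounts to guarding the setXXXBufferSize calls. A minimal sketch; the sentinel constant and helper name are illustrative, not Kafka's actual config code:

```java
import java.net.Socket;
import java.net.SocketException;

public class SocketConfig {
    // Sentinel meaning "do not override the OS default".
    public static final int USE_OS_DEFAULT = -1;

    // Only touch the buffer sizes when the operator configured explicit
    // values; otherwise the kernel's defaults/autotuning stay in effect.
    static void applyBufferSizes(Socket socket, int sendBytes, int receiveBytes)
            throws SocketException {
        if (sendBytes != USE_OS_DEFAULT) {
            socket.setSendBufferSize(sendBytes);
        }
        if (receiveBytes != USE_OS_DEFAULT) {
            socket.setReceiveBufferSize(receiveBytes);
        }
    }

    public static void main(String[] args) throws Exception {
        try (Socket s = new Socket()) { // unconnected socket; fine for configuration
            int before = s.getSendBufferSize();
            applyBufferSizes(s, USE_OS_DEFAULT, USE_OS_DEFAULT);
            System.out.println("OS default kept: " + (s.getSendBufferSize() == before));
        }
    }
}
```

Note that skipping the call entirely matters: setting SO_SNDBUF/SO_RCVBUF to any explicit value disables kernel autotuning on some platforms, which is exactly why -1 should mean "don't call" rather than "call with some default".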
[jira] [Updated] (KAFKA-724) Allow automatic socket.send.buffer from operating system
[ https://issues.apache.org/jira/browse/KAFKA-724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-724: Labels: newbie (was: ) Allow automatic socket.send.buffer from operating system Key: KAFKA-724 URL: https://issues.apache.org/jira/browse/KAFKA-724 Project: Kafka Issue Type: Improvement Reporter: Pablo Barrera Labels: newbie To do this, don't call socket().setXXXBufferSize. This can be controlled by the configuration parameter: if socket.send.buffer or the other buffer settings are set to -1, don't call socket().setXXXBufferSize -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-58) Combine fetch/multifetch and send/multisend
[ https://issues.apache.org/jira/browse/KAFKA-58?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-58. Resolution: Fixed (was: Unresolved) Combine fetch/multifetch and send/multisend --- Key: KAFKA-58 URL: https://issues.apache.org/jira/browse/KAFKA-58 Project: Kafka Issue Type: Improvement We should expose only the more general multisend and multi fetch. The overhead of these is not high, and it would be good to reduce the number of network APIs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-333) mirroring - enable non-random partitioning in embedded producer
[ https://issues.apache.org/jira/browse/KAFKA-333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-333. - Resolution: Invalid mirroring - enable non-random partitioning in embedded producer --- Key: KAFKA-333 URL: https://issues.apache.org/jira/browse/KAFKA-333 Project: Kafka Issue Type: New Feature Components: core Reporter: xiaoyu wang Currently the producer for mirroring uses the random partitioner. It will be very useful if we can specify partitioner there. One use case is when we aggregate ad server logs by mirroring the local kafka cluster on each ad server, we want to partition ad logs based on where they come from - the ad server id. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-57) Add an optional acknowledgement in the producer
[ https://issues.apache.org/jira/browse/KAFKA-57?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-57. Resolution: Fixed (was: Unresolved) Add an optional acknowledgement in the producer --- Key: KAFKA-57 URL: https://issues.apache.org/jira/browse/KAFKA-57 Project: Kafka Issue Type: New Feature Currently the producer will flush the socket buffer but does not wait for an answer. This is good for tracking-like use cases but bad for queue-like cases. We should add an optional acknowledgement to the protocol and have the producer await this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-331) recurrent produce errors
[ https://issues.apache.org/jira/browse/KAFKA-331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-331. - Resolution: Incomplete recurrent produce errors Key: KAFKA-331 URL: https://issues.apache.org/jira/browse/KAFKA-331 Project: Kafka Issue Type: Bug Reporter: Pierre-Yves Ritschard I am using trunk and regularly see such errors popping up: 32477890 [kafka-processor-7] ERROR kafka.server.KafkaRequestHandlers - Error processing ProduceRequest on pref^@^@^@:0 java.io.FileNotFoundException: /mnt/kafka/logs/pref^@^@^@-0/.kafka (Is a directory) at java.io.RandomAccessFile.open(Native Method) at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233) at kafka.utils.Utils$.openChannel(Utils.scala:324) at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75) at kafka.log.Log.loadSegments(Log.scala:144) at kafka.log.Log.<init>(Log.scala:116) at kafka.log.LogManager.createLog(LogManager.scala:149) at kafka.log.LogManager.getOrCreateLog(LogManager.scala:204) at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69) at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53) at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38) at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38) at kafka.network.Processor.handle(SocketServer.scala:296) at kafka.network.Processor.read(SocketServer.scala:319) at kafka.network.Processor.run(SocketServer.scala:214) at java.lang.Thread.run(Thread.java:679) 32477890 [kafka-processor-7] ERROR kafka.network.Processor - Closing socket for /xx.xx.xx.xx because of error java.io.FileNotFoundException: /mnt/kafka/logs/pref^@^@^@-0/.kafka (Is a directory) at java.io.RandomAccessFile.open(Native Method) at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233) at kafka.utils.Utils$.openChannel(Utils.scala:324) at
kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75) at kafka.log.Log.loadSegments(Log.scala:144) at kafka.log.Log.<init>(Log.scala:116) at kafka.log.LogManager.createLog(LogManager.scala:149) at kafka.log.LogManager.getOrCreateLog(LogManager.scala:204) at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69) at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53) at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38) at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38) at kafka.network.Processor.handle(SocketServer.scala:296) at kafka.network.Processor.read(SocketServer.scala:319) at kafka.network.Processor.run(SocketServer.scala:214) at java.lang.Thread.run(Thread.java:679) This results in a pref directory created inside the log dir. The original topic should be prefix; somehow a NUL gets inserted there. The producing was done with a kafka.javaapi.producer.Producer instance, on which send was called with a kafka.javaapi.producer.ProducerData instance. There are no log entries created inside that dir and no impact on the overall operation of the broker and consumers. Is the producer thread-safe? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-157) Message may be delivered to incorrect partition incase of semantic partitioning
[ https://issues.apache.org/jira/browse/KAFKA-157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-157. - Resolution: Fixed Message may be delivered to incorrect partition incase of semantic partitioning --- Key: KAFKA-157 URL: https://issues.apache.org/jira/browse/KAFKA-157 Project: Kafka Issue Type: Bug Reporter: Sharad Agarwal In case the broker hosting the partition is down, messages are currently repartitioned over the number of available brokers. This may void the partitioning contract. http://bit.ly/oEz2fT -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-5) Continuous log flusher
[ https://issues.apache.org/jira/browse/KAFKA-5?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-5. --- Resolution: Invalid (was: Unresolved) Continuous log flusher -- Key: KAFKA-5 URL: https://issues.apache.org/jira/browse/KAFKA-5 Project: Kafka Issue Type: Improvement Today, a kafka log is flushed based on either the number of messages or a certain amount of time elapsed. Setting those numbers properly may not be easy. A better way is to have a background thread that keeps flushing dirty logs as fast as the underlying storage allows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
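The background-flusher idea in this ticket can be sketched as follows; the Log interface here is a stand-in for illustration, not Kafka's actual log class:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class ContinuousFlusher implements Runnable {
    // Stand-in for a Kafka log: something that can report dirtiness and flush.
    public interface Log {
        boolean isDirty();
        void flush();
    }

    private final List<Log> logs;
    private final AtomicBoolean running = new AtomicBoolean(true);

    public ContinuousFlusher(List<Log> logs) {
        this.logs = logs;
    }

    public void shutdown() {
        running.set(false);
    }

    // Single pass: flush every dirty log once; returns how many were flushed.
    int flushDirtyLogs() {
        int flushed = 0;
        for (Log log : logs) {
            if (log.isDirty()) {
                log.flush();
                flushed++;
            }
        }
        return flushed;
    }

    // Background loop: flushes as fast as the storage allows, instead of
    // waiting for a message-count or time threshold to trip.
    @Override
    public void run() {
        while (running.get()) {
            if (flushDirtyLogs() == 0) {
                Thread.yield(); // nothing dirty; give up the CPU briefly
            }
        }
    }
}
```

The pacing falls out naturally: each flush() is bounded by storage throughput, so a dirty, busy log is flushed in a tight loop while an idle one costs only an isDirty() check.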
[jira] [Resolved] (KAFKA-168) Support locality in consumer partition assignment algorithm
[ https://issues.apache.org/jira/browse/KAFKA-168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-168. - Resolution: Won't Fix Support locality in consumer partition assignment algorithm --- Key: KAFKA-168 URL: https://issues.apache.org/jira/browse/KAFKA-168 Project: Kafka Issue Type: Sub-task Components: core Reporter: Jay Kreps There are some use-cases where it makes sense to co-locate brokers and consumer processes. In this case it would be nice to optimize the assignment of partitions to consumers so that the consumer preferentially consumes from the broker with which it is co-located. If we are going to do KAFKA-167, moving the assignment to the broker, it would make sense to do that first so we only have to change the logic in one place. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-1) The log4j appender still uses the SyncProducer API
[ https://issues.apache.org/jira/browse/KAFKA-1?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1. --- Resolution: Invalid (was: Unresolved) The log4j appender still uses the SyncProducer API -- Key: KAFKA-1 URL: https://issues.apache.org/jira/browse/KAFKA-1 Project: Kafka Issue Type: Improvement Affects Versions: 0.6 The log4j appender still uses the SyncProducer API. Change it to use the Producer API using the StringEncoder instead. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-1239) New producer checklist
[ https://issues.apache.org/jira/browse/KAFKA-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1239. -- Resolution: Fixed New producer checklist -- Key: KAFKA-1239 URL: https://issues.apache.org/jira/browse/KAFKA-1239 Project: Kafka Issue Type: New Feature Components: producer Affects Versions: 0.8.2 Reporter: Neha Narkhede Here is the list of todo items we have for the new producer (in no particular order): 1. Rename to org.apache.* package 2. Discuss config approach 3. Finalize config approach 4. Add slf4j logging for debugging purposes 5. Discuss metrics approach 6. Add metrics 7. Convert perf test to optionally use new producer 8. Get system tests passing with new producer 9. Write integration tests that test the producer against the real server 10. Expand unit test coverage a bit 11. Performance testing and analysis. 12. Add compression support 13. Discuss and perhaps add retry support 14. Discuss the approach to protocol definition and perhaps refactor a bit 15. Deeper code review 16. Convert mirror maker This doesn't count general bug fixing which I assume we will do as we find them. Let's file subtasks for each of the above, so there is a single place to track what's outstanding on the new producer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-1655) Allow high performance SimpleConsumer use cases to still work with new Kafka 0.9 consumer APIs
[ https://issues.apache.org/jira/browse/KAFKA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1655. -- Resolution: Invalid Allow high performance SimpleConsumer use cases to still work with new Kafka 0.9 consumer APIs -- Key: KAFKA-1655 URL: https://issues.apache.org/jira/browse/KAFKA-1655 Project: Kafka Issue Type: New Feature Components: consumer Affects Versions: 0.9.0 Reporter: Valentin Hi guys, currently Kafka allows consumers to choose either the low-level or the high-level API, depending on the specific requirements of the consumer implementation. However, I was told that the current low-level API (SimpleConsumer) will be deprecated once the new Kafka 0.9 consumer APIs are available. In this case it would be good if we can ensure that the new API does offer some ways to get similar performance for use cases which perfectly fit the old SimpleConsumer API approach. Example Use Case: A high-throughput HTTP API wrapper for consumer requests which gets HTTP REST calls to retrieve data for a specific set of topic partitions and offsets. Here the SimpleConsumer is perfect because it allows connection pooling in the HTTP API web application with one pool per existing Kafka broker, and the web application can handle the required metadata management to know which pool to fetch a connection from for each used topic partition. This means connections to Kafka brokers can remain open/pooled and connection/reconnection and metadata overhead is minimized. To achieve something similar with the new Kafka 0.9 consumer APIs, it would be good if it could: - provide a low-level call to connect to a specific broker and to read data from a topic+partition+offset OR - ensure that subscribe/unsubscribe calls are very cheap and can run without requiring any network traffic.
If I subscribe to a topic partition for which the same broker is the leader as the last topic partition which was in use for this consumer API connection, then the consumer API implementation should recognize this and should not do any disconnects/reconnects and just reuse the existing connection to that kafka broker. Or put differently, it should be possible to do external metadata handling in the consumer API client and the client should be able to pool consumer API connections effectively by having one pool per Kafka broker. Greetings Valentin -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1930) Move server over to new metrics library
Jay Kreps created KAFKA-1930: Summary: Move server over to new metrics library Key: KAFKA-1930 URL: https://issues.apache.org/jira/browse/KAFKA-1930 Project: Kafka Issue Type: Improvement Reporter: Jay Kreps We are using org.apache.kafka.common.metrics on the clients, but using Coda Hale metrics on the server. We should move the server over to the new metrics package as well. This will help to make all our metrics self-documenting. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1929) Convert core kafka module to use the errors in org.apache.kafka.common.errors
Jay Kreps created KAFKA-1929: Summary: Convert core kafka module to use the errors in org.apache.kafka.common.errors Key: KAFKA-1929 URL: https://issues.apache.org/jira/browse/KAFKA-1929 Project: Kafka Issue Type: Improvement Reporter: Jay Kreps With the introduction of the common package there are now a lot of errors duplicated in both the common package and in the server. We should refactor the server code (but not the scala clients) to switch over to the exceptions in common. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1804) Kafka network thread lacks top exception handler
[ https://issues.apache.org/jira/browse/KAFKA-1804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310972#comment-14310972 ] Jay Kreps commented on KAFKA-1804: -- The remaining issue is the lack of logging. However we actually do set an uncaught exception handler that should log any uncaught exception. [~aozeritsky] is there any chance this was just showing up in a different log? Kafka network thread lacks top exception handler Key: KAFKA-1804 URL: https://issues.apache.org/jira/browse/KAFKA-1804 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Oleg Golovin Priority: Critical We have faced the problem that some Kafka network threads may fail, so that jstack attached to the Kafka process showed fewer threads than we had defined in our Kafka configuration. This leads to API requests processed by these threads getting stuck unresponded. There were no error messages in the log regarding thread failure. We have examined the Kafka code to find out there is no top try-catch block in the network thread code, which could at least log possible errors. Could you add a top-level try-catch block for the network thread, which should recover the network thread in case of exception? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1638) transient unit test failure UncleanLeaderElectionTest
[ https://issues.apache.org/jira/browse/KAFKA-1638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14310982#comment-14310982 ] Jay Kreps commented on KAFKA-1638: -- [~junrao] is this still happening? I haven't seen it. transient unit test failure UncleanLeaderElectionTest - Key: KAFKA-1638 URL: https://issues.apache.org/jira/browse/KAFKA-1638 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Labels: newbie++ Saw the following transient unit test failure. kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionEnabled FAILED java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/1. This probably indicates that you either have configured a brokerid that is already in use, or else you have shutdown this broker and restarted it faster than the zookeeper timeout so it appears to be re-registering. at kafka.utils.ZkUtils$.registerBrokerInZk(ZkUtils.scala:174) at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:63) at kafka.server.KafkaHealthcheck.startup(KafkaHealthcheck.scala:45) at kafka.server.KafkaServer.startup(KafkaServer.scala:121) at kafka.integration.UncleanLeaderElectionTest$$anonfun$verifyUncleanLeaderElectionEnabled$8.apply(UncleanLeaderElectionTest.scala:187) at kafka.integration.UncleanLeaderElectionTest$$anonfun$verifyUncleanLeaderElectionEnabled$8.apply(UncleanLeaderElectionTest.scala:187) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.integration.UncleanLeaderElectionTest.verifyUncleanLeaderElectionEnabled(UncleanLeaderElectionTest.scala:187) at 
kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionEnabled(UncleanLeaderElectionTest.scala:106) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-881) Kafka broker not respecting log.roll.hours
[ https://issues.apache.org/jira/browse/KAFKA-881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-881. - Resolution: Won't Fix Closing due to ancientness. Kafka broker not respecting log.roll.hours -- Key: KAFKA-881 URL: https://issues.apache.org/jira/browse/KAFKA-881 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.7.2 Reporter: Dan F Assignee: Jay Kreps Attachments: kafka-roll-0.8.patch, kafka-roll.again.patch, kafka_roll.patch We are running Kafka 0.7.2. We set log.roll.hours=1. I hoped that meant logs would be rolled every hour, or more. Only, sometimes logs that are many hours (sometimes days) old have more data added to them. This perturbs our systems for reasons I won't get into. I don't know Scala or Kafka well, but I have a proposal for why this might happen: upon restart, a broker forgets when its log files have been appended to (firstAppendTime). Then a potentially infinite amount of time later, the restarted broker receives another message for the particular (topic, partition), and starts the clock again. It will then roll over that log after an hour. https://svn.apache.org/repos/asf/kafka/branches/0.7/core/src/main/scala/kafka/server/KafkaConfig.scala says: /* the maximum time before a new log segment is rolled out */ val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, Int.MaxValue)) https://svn.apache.org/repos/asf/kafka/branches/0.7/core/src/main/scala/kafka/log/Log.scala has maybeRoll, which needs segment.firstAppendTime defined. It also has updateFirstAppendTime(), which says if it's empty, then set it. If my hypothesis is correct about why it is happening, here is a case where rolling takes longer than an hour, even on a high-volume topic: - write to a topic for 20 minutes - restart the broker - wait for 5 days - write to a topic for 20 minutes - restart the broker - write to a topic for an hour The rollover time was now 5 days, 1 hour, 40 minutes.
You can make it as long as you want. Proposed solutions: The easiest thing to do would be to have Kafka re-initialize firstAppendTime from the file creation time. Unfortunately, there is no file creation time in UNIX. There is ctime, the change time, which is updated when a file's inode information is changed. One solution is to embed the firstAppendTime in the filename (say, seconds since epoch). Then when you open it you could reset firstAppendTime to exactly what it really was. This ignores clock drift or resetting; one could set firstAppendTime to min(filename-based time, current time). A second solution is to make the Kafka log roll over at specific times, regardless of when the file was created. Conceptually, time can be divided into windows of size log.rollover.hours since the epoch (UNIX time 0, 1970). So, when firstAppendTime is empty, compute the next rollover boundary (next = (hours since epoch) - ((hours since epoch) % log.rollover.hours) + log.rollover.hours). If the file mtime (last modified) is before the current rollover window ( (next - log.rollover.hours) .. next ), roll it over right away. Otherwise, roll over when you cross next, and reset next. A third solution (not perfect, but at least an approximation) would be not to write to a segment if firstAppendTime is not defined and the timestamp on the file is more than log.roll.hours old. There are probably other solutions. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
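The second proposal (fixed rollover windows) can be sketched as below. This is a minimal illustration, not Kafka code; the class and method names are invented. The boundary arithmetic floors the current time to the start of its window and then advances one full window:

```java
// Sketch of the "fixed rollover windows" idea from KAFKA-881.
// RollWindow, nextRollMs, and rollNow are illustrative names, not Kafka's API.
import java.util.concurrent.TimeUnit;

public class RollWindow {
    /** Next rollover boundary (ms since epoch) for windows of rollHours. */
    static long nextRollMs(long nowMs, int rollHours) {
        long windowMs = TimeUnit.HOURS.toMillis(rollHours);
        // Floor nowMs to the start of its window, then advance one window.
        return nowMs - (nowMs % windowMs) + windowMs;
    }

    /** A segment rolls immediately if it was last modified before the current window began. */
    static boolean rollNow(long segmentMtimeMs, long nowMs, int rollHours) {
        long windowMs = TimeUnit.HOURS.toMillis(rollHours);
        long windowStartMs = nextRollMs(nowMs, rollHours) - windowMs;
        return segmentMtimeMs < windowStartMs;
    }

    public static void main(String[] args) {
        // At t = 90 min with 1-hour windows, the next boundary is t = 120 min.
        System.out.println(nextRollMs(90 * 60_000L, 1));
        // A segment last touched in a previous window rolls right away after restart.
        System.out.println(rollNow(30 * 60_000L, 90 * 60_000L, 1));
    }
}
```

Because the boundary depends only on the clock, a restarted broker needs no memory of firstAppendTime: a stale segment fails the window check on its first post-restart append.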
[jira] [Resolved] (KAFKA-864) bin/kafka-gen-reassignment.sh
[ https://issues.apache.org/jira/browse/KAFKA-864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-864. - Resolution: Duplicate I think this is being pursued on another ticket. bin/kafka-gen-reassignment.sh - Key: KAFKA-864 URL: https://issues.apache.org/jira/browse/KAFKA-864 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.0 Reporter: Scott Clasen Better tooling for replacing failed brokers and reassigning partitions. The output of ./bin/kafka-list-topic.sh makes doing this in bulk painful. Tool should output json format acceptable to ./bin/kafka-reassign-partitions.sh Spec: bin/kafka-gen-reassignment.sh
Option Description
--topic <topic> REQUIRED: The topic to be reassigned. Defaults to all existing topics. (default: )
--partition <partition> REQUIRED: The partition to be reassigned. Defaults to all partitions. (default: )
--from <broker-id> REQUIRED: The broker to reassign the partition from.
--to <broker-id> REQUIRED: The broker to reassign the partition to.
--zookeeper <urls> REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLs can be given to allow fail-over.
Workflow: Replacing a broker
./bin/kafka-gen-reassignment.sh --zookeeper zks --from failed --to new > reassign.json
./bin/kafka-reassign-partitions.sh --zookeeper zks --path-to-json-file reassign.json
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
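For reference, a minimal example of the JSON that ./bin/kafka-reassign-partitions.sh consumes; the topic name, partition number, and broker ids here are placeholders:

```json
{
  "version": 1,
  "partitions": [
    {"topic": "mytopic", "partition": 0, "replicas": [4, 2, 3]}
  ]
}
```

A generation tool like the one proposed would emit one such entry per (topic, partition) whose replica list mentions the failed broker, with the replacement broker substituted in.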
[jira] [Resolved] (KAFKA-493) High CPU usage on inactive server
[ https://issues.apache.org/jira/browse/KAFKA-493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-493. - Resolution: Incomplete High CPU usage on inactive server - Key: KAFKA-493 URL: https://issues.apache.org/jira/browse/KAFKA-493 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.0 Reporter: Jay Kreps Fix For: 0.9.0 Attachments: Kafka-2014-11-10.snapshot.zip, Kafka-sampling1.zip, Kafka-sampling2.zip, Kafka-sampling3.zip, Kafka-trace1.zip, Kafka-trace2.zip, Kafka-trace3.zip, backtraces.txt, stacktrace.txt I've been playing with the 0.8 branch of Kafka and noticed that idle CPU usage is fairly high (13% of a core). Is that to be expected? I did look at the stack, but didn't see anything obvious. A background task? I wanted to mention how I am getting into this state. I've set up two machines with the latest 0.8 code base and am using a replication factor of 2. On starting the brokers there is no idle CPU activity. Then I run a test that essentially does 10k publish operations followed by immediate consume operations (I was measuring latency). Once this has run, the Kafka nodes seem to consistently be consuming CPU essentially forever.
hprof results:
THREAD START (obj=53ae, id = 24, name=RMI TCP Accept-0, group=system)
THREAD START (obj=53ae, id = 25, name=RMI TCP Accept-, group=system)
THREAD START (obj=53ae, id = 26, name=RMI TCP Accept-0, group=system)
THREAD START (obj=53ae, id = 21, name=main, group=main)
THREAD START (obj=53ae, id = 27, name=Thread-2, group=main)
THREAD START (obj=53ae, id = 28, name=Thread-3, group=main)
THREAD START (obj=53ae, id = 29, name=kafka-processor-9092-0, group=main)
THREAD START (obj=53ae, id = 200010, name=kafka-processor-9092-1, group=main)
THREAD START (obj=53ae, id = 200011, name=kafka-acceptor, group=main)
THREAD START (obj=574b, id = 200012, name=ZkClient-EventThread-20-localhost:2181, group=main)
THREAD START (obj=576e, id = 200014, name=main-SendThread(), group=main)
THREAD START (obj=576d, id = 200013, name=main-EventThread, group=main)
THREAD START (obj=53ae, id = 200015, name=metrics-meter-tick-thread-1, group=main)
THREAD START (obj=53ae, id = 200016, name=metrics-meter-tick-thread-2, group=main)
THREAD START (obj=53ae, id = 200017, name=request-expiration-task, group=main)
THREAD START (obj=53ae, id = 200018, name=request-expiration-task, group=main)
THREAD START (obj=53ae, id = 200019, name=kafka-request-handler-0, group=main)
THREAD START (obj=53ae, id = 200020, name=kafka-request-handler-1, group=main)
THREAD START (obj=53ae, id = 200021, name=Thread-6, group=main)
THREAD START (obj=53ae, id = 200022, name=Thread-7, group=main)
THREAD START (obj=5899, id = 200023, name=ReplicaFetcherThread-0-2 on broker 1, group=main)
THREAD START (obj=5899, id = 200024, name=ReplicaFetcherThread-0-3 on broker 1, group=main)
THREAD START (obj=5899, id = 200025, name=ReplicaFetcherThread-0-0 on broker 1, group=main)
THREAD START (obj=5899, id = 200026, name=ReplicaFetcherThread-0-1 on broker 1, group=main)
THREAD START (obj=53ae, id = 200028, name=SIGINT handler, group=system)
THREAD START (obj=53ae, id = 200029, name=Thread-5, group=main)
THREAD START (obj=574b, id = 200030, name=Thread-1, group=main)
THREAD START (obj=574b, id = 200031, name=Thread-0, group=main)
THREAD END (id = 200031) THREAD END (id = 200029) THREAD END (id = 200020) THREAD END (id = 200019) THREAD END (id = 28) THREAD END (id = 200021) THREAD END (id = 27) THREAD END (id = 200022) THREAD END (id = 200018) THREAD END (id = 200017) THREAD END (id = 200012) THREAD END (id = 200013) THREAD END (id = 200014) THREAD END (id = 200025) THREAD END (id = 200023) THREAD END (id = 200026) THREAD END (id = 200024) THREAD END (id = 200011) THREAD END (id = 29) THREAD END (id = 200010) THREAD END (id = 200030) THREAD END (id = 200028)
TRACE 301281:
sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:Unknown line)
sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:228)
sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:81)
sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:218)
sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
[jira] [Commented] (KAFKA-1831) Producer does not provide any information about which host the data was sent to
[ https://issues.apache.org/jira/browse/KAFKA-1831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311042#comment-14311042 ] Jay Kreps commented on KAFKA-1831: -- The new producer returns RecordMetadata on each request. This contains the partition to which the record was written. The producer will also give you the current state of the cluster which will show where the leader for that partition is. I think this is probably sufficient. Producer does not provide any information about which host the data was sent to --- Key: KAFKA-1831 URL: https://issues.apache.org/jira/browse/KAFKA-1831 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.1.1 Reporter: Mark Payne Assignee: Jun Rao For traceability purposes and for troubleshooting, when sending data to Kafka, the Producer should provide information about which host the data was sent to. This works well already in the SimpleConsumer, which provides host() and port() methods. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-1831) Producer does not provide any information about which host the data was sent to
[ https://issues.apache.org/jira/browse/KAFKA-1831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1831. -- Resolution: Fixed I think the new producer fixes this, reopen if you disagree. Producer does not provide any information about which host the data was sent to --- Key: KAFKA-1831 URL: https://issues.apache.org/jira/browse/KAFKA-1831 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.1.1 Reporter: Mark Payne Assignee: Jun Rao For traceability purposes and for troubleshooting, when sending data to Kafka, the Producer should provide information about which host the data was sent to. This works well already in the SimpleConsumer, which provides host() and port() methods. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
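Jay's point can be sketched with the new producer's public API: RecordMetadata reports the partition a record landed on, and partitionsFor() exposes the leader (host and port) for that partition. This is a sketch only, not runnable without a live broker; the bootstrap address and topic name are placeholders:

```java
// Sketch: recovering the destination host from RecordMetadata plus cluster metadata.
// "broker1:9092" and "mytopic" are placeholders; requires a reachable Kafka broker.
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;

public class WhereDidItGo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            // Blocking get() returns the metadata for the acknowledged record.
            RecordMetadata md = producer.send(new ProducerRecord<>("mytopic", "key", "value")).get();
            // Match the record's partition against cluster metadata to find the leader host.
            for (PartitionInfo pi : producer.partitionsFor("mytopic")) {
                if (pi.partition() == md.partition()) {
                    System.out.println("sent to " + pi.leader().host() + ":" + pi.leader().port());
                }
            }
        } finally {
            producer.close();
        }
    }
}
```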
[jira] [Commented] (KAFKA-1908) Split brain
[ https://issues.apache.org/jira/browse/KAFKA-1908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311109#comment-14311109 ] Gwen Shapira commented on KAFKA-1908: - I think it's a multi-LAN scenario. The broker can bind on all available interfaces (0.0.0.0). If the port is blocked on the inter-broker interface but not on the network between clients and brokers, the scenario described seems possible (although I didn't try to replicate it myself). In other clusters, this scenario is prevented by having brokers check access to each other periodically (heartbeat) and validate against ZK. If a node is visible in ZK but not accessible in the network, the minority partition is killed (STONITH, or using ZK to cause a node to commit suicide) and the majority triggers leader election. Not a simple mechanism to add to Kafka. And I'm not sure if this is a common enough issue to warrant the complexity involved. Split brain --- Key: KAFKA-1908 URL: https://issues.apache.org/jira/browse/KAFKA-1908 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Alexey Ozeritskiy In some cases, there may be two leaders for one partition. Steps to reproduce: # We have 3 brokers, 1 partition with 3 replicas: {code} TopicAndPartition: [partition,0]Leader: 1 Replicas: [2,1,3] ISR: [1,2,3] {code} # controller works on broker 3 # let the kafka port be 9092. We execute on broker 1: {code} iptables -A INPUT -p tcp --dport 9092 -j REJECT {code} # Initiate replica election # As a result: Broker 1: {code} TopicAndPartition: [partition,0]Leader: 1 Replicas: [2,1,3] ISR: [1,2,3] {code} Broker 2: {code} TopicAndPartition: [partition,0]Leader: 2 Replicas: [2,1,3] ISR: [1,2,3] {code} # Flush the iptables rules on broker 1 Now we can produce messages to {code}[partition,0]{code}. Replica-1 will not receive new data. A consumer can read data from replica-1 or replica-2. 
When it reads from replica-1 it resets the offsets and then can read duplicates from replica-2. We saw this situation in our production cluster when it had network problems. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1805) Kafka ProducerRecord should implement equals
[ https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311136#comment-14311136 ] Guozhang Wang commented on KAFKA-1805: -- Thanks for the patch [~parth.brahmbhatt], some comments added by Ewen and myself in the RB. Kafka ProducerRecord should implement equals Key: KAFKA-1805 URL: https://issues.apache.org/jira/browse/KAFKA-1805 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.2 Reporter: Thomas Omans Assignee: Thomas Omans Priority: Minor Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch I was writing some tests to verify that I am calculating my partitions, topics, keys, and values properly in my producer code and discovered that ProducerRecord does not implement equality. This makes tests integrating kafka particularly awkward. https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java I can whip up a patch since this is essentially just a value object. Thanks, Thomas Omans -- This message was sent by Atlassian JIRA (v6.3.4#6332)
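A minimal sketch of the value-object equality being requested. SimpleRecord below is an illustrative stand-in, not Kafka's actual ProducerRecord; the point is field-by-field equals plus a consistent hashCode so test assertions work:

```java
// Illustration of value-object equality for a record-style class.
// SimpleRecord is a stand-in for this example, not Kafka's ProducerRecord.
import java.util.Objects;

public class SimpleRecord {
    private final String topic;
    private final Integer partition; // nullable, so compare via Objects.equals
    private final String key;
    private final String value;

    public SimpleRecord(String topic, Integer partition, String key, String value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleRecord)) return false;
        SimpleRecord that = (SimpleRecord) o;
        return Objects.equals(topic, that.topic)
            && Objects.equals(partition, that.partition)
            && Objects.equals(key, that.key)
            && Objects.equals(value, that.value);
    }

    @Override
    public int hashCode() {
        // Must be consistent with equals: same fields, same order.
        return Objects.hash(topic, partition, key, value);
    }

    public static void main(String[] args) {
        SimpleRecord a = new SimpleRecord("t", 0, "k", "v");
        SimpleRecord b = new SimpleRecord("t", 0, "k", "v");
        System.out.println(a.equals(b));
    }
}
```

With equality defined this way, a test can assert directly that the record a producer was handed matches the expected topic, partition, key, and value.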
[jira] [Resolved] (KAFKA-1709) [New Java Producer Potential Deadlock] Producer Deadlock when all messages are being sent to a single partition
[ https://issues.apache.org/jira/browse/KAFKA-1709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1709. -- Resolution: Invalid This didn't turn out to be a deadlock. [New Java Producer Potential Deadlock] Producer Deadlock when all messages are being sent to a single partition Key: KAFKA-1709 URL: https://issues.apache.org/jira/browse/KAFKA-1709 Project: Kafka Issue Type: Bug Components: producer Environment: Development Reporter: Bhavesh Mistry Assignee: Jun Rao Priority: Critical Hi Kafka Dev Team, When I run a test that sends messages to a single partition for 3 minutes or so, I encounter a deadlock (please see the attached screenshot) and thread contention in YourKit profiling. Use Case: 1) Aggregating messages into the same partition for metric counting. 2) Replicating the old producer's behavior of sticking to a partition for 3 minutes. Here is the output: Frozen threads found (potential deadlock) It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung. 
pool-1-thread-128 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-159 --- Frozen for at least 2m 1 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
pool-1-thread-55 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-1587) Possible Memory Leak when we use Kafka 0.8 Producer for sending messages
[ https://issues.apache.org/jira/browse/KAFKA-1587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-1587. -- Resolution: Won't Fix Possible Memory Leak when we use Kafka 0.8 Producer for sending messages Key: KAFKA-1587 URL: https://issues.apache.org/jira/browse/KAFKA-1587 Project: Kafka Issue Type: Bug Affects Versions: 0.8.0 Reporter: Gopinath Sundaram Priority: Critical Hi Kafka team, We use Kafka to send messages in a high-volume, memory-intensive application which uses Parallel GC. We send messages at the rate of 12500/min in the first few hours and then the number of messages drops down to 6000/min. Our application usually runs for a maximum of 24 hours. What we have: 1) When we do not send messages through the Kafka 0.8 producer, our application never slows down much and our entire process completes within 24 hours 2) When we use Kafka, our machines slow down to sending around 2500 messages/min and, as time progresses, the number of messages being sent drops even further 3) We suspect that our application spends more time in GC and hence the problem. The heap dump does not contain a leak suspect involving Kafka, but this slowness happens only when the Kafka messaging system is used. Any pointers that could help us resolve this issue will be highly appreciated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-936) Kafka Metrics Memory Leak
[ https://issues.apache.org/jira/browse/KAFKA-936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14310961#comment-14310961 ] Jay Kreps commented on KAFKA-936: - [~junrao] Does this issue still exist? Kafka Metrics Memory Leak -- Key: KAFKA-936 URL: https://issues.apache.org/jira/browse/KAFKA-936 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.0 Environment: centos linux, jdk 1.6, jboss Reporter: Senthil Chittibabu Assignee: Neha Narkhede Priority: Critical I am using kafka_2.8.0-0.8.0-SNAPSHOT version. I am running into OutOfMemoryError in PermGen Space. I have set the -XX:MaxPermSize=512m, but I still get the same error. I used profiler to trace the memory leak, and found the following kafka classes to be the cause for the memory leak. Please let me know if you need any additional information to debug this issue. kafka.server.FetcherLagMetrics kafka.consumer.FetchRequestAndResponseMetrics kafka.consumer.FetchRequestAndResponseStats kafka.metrics.KafkaTimer kafka.utils.Pool -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1408) Kafka broker cannot stop itself normally after problems with connection to ZK
[ https://issues.apache.org/jira/browse/KAFKA-1408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14310973#comment-14310973 ] Jay Kreps commented on KAFKA-1408: -- [~junrao], [~dmitrybugaychenko] is this still active? Kafka broker cannot stop itself normally after problems with connection to ZK Key: KAFKA-1408 URL: https://issues.apache.org/jira/browse/KAFKA-1408 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 0.8.1 Reporter: Dmitry Bugaychenko Assignee: Neha Narkhede After getting into an inconsistent state due to a short network failure, the broker cannot stop itself. The last message in the log is: {code} INFO | jvm 1| 2014/04/21 08:53:07 | [2014-04-21 09:53:06,999] INFO [kafka-log-cleaner-thread-0], Stopped (kafka.log.LogCleaner) INFO | jvm 1| 2014/04/21 08:53:07 | [2014-04-21 09:53:06,999] INFO [kafka-log-cleaner-thread-0], Shutdown completed (kafka.log.LogCleaner) {code} There is also a preceding error: {code} INFO | jvm 1| 2014/04/21 08:52:55 | [2014-04-21 09:52:55,015] WARN Controller doesn't exist (kafka.utils.Utils$) INFO | jvm 1| 2014/04/21 08:52:55 | kafka.common.KafkaException: Controller doesn't exist INFO | jvm 1| 2014/04/21 08:52:55 | at kafka.utils.ZkUtils$.getController(ZkUtils.scala:70) INFO | jvm 1| 2014/04/21 08:52:55 | at kafka.server.KafkaServer.kafka$server$KafkaServer$$controlledShutdown(KafkaServer.scala:148) INFO | jvm 1| 2014/04/21 08:52:55 | at kafka.server.KafkaServer$$anonfun$shutdown$1.apply$mcV$sp(KafkaServer.scala:220) {code} Here is a part of jstack (it looks like there is a deadlock between delete-topics-thread and ZkClient-EventThread): {code} IWrapper-Connection id=10 state=WAITING - waiting on 0x15d6aa44 (a java.util.concurrent.locks.ReentrantLock$NonfairSync) - locked 0x15d6aa44 (a java.util.concurrent.locks.ReentrantLock$NonfairSync) owned by ZkClient-EventThread-37-devlnx2:2181 id=37 at sun.misc.Unsafe.park(Native Method) at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197) at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214) at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290) at kafka.utils.Utils$.inLock(Utils.scala:536) at kafka.controller.KafkaController.shutdown(KafkaController.scala:641) at kafka.server.KafkaServer$$anonfun$shutdown$8.apply$mcV$sp(KafkaServer.scala:233) at kafka.utils.Utils$.swallow(Utils.scala:167) at kafka.utils.Logging$class.swallowWarn(Logging.scala:92) at kafka.utils.Utils$.swallowWarn(Utils.scala:46) at kafka.utils.Logging$class.swallow(Logging.scala:94) at kafka.utils.Utils$.swallow(Utils.scala:46) at kafka.server.KafkaServer.shutdown(KafkaServer.scala:233) at odkl.databus.server.Main.stop(Main.java:184) at org.tanukisoftware.wrapper.WrapperManager.stopInner(WrapperManager.java:1982) at org.tanukisoftware.wrapper.WrapperManager.handleSocket(WrapperManager.java:2391) at org.tanukisoftware.wrapper.WrapperManager.run(WrapperManager.java:2696) at java.lang.Thread.run(Thread.java:744) ZkClient-EventThread-37-devlnx2:2181 id=37 state=WAITING - waiting on 0x3d5f9878 (a java.util.concurrent.CountDownLatch$Sync) - locked 0x3d5f9878 (a java.util.concurrent.CountDownLatch$Sync) at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994) at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236) at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36) at kafka.controller.TopicDeletionManager.shutdown(TopicDeletionManager.scala:93) at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:340) at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337) at
[jira] [Updated] (KAFKA-1416) Unify sendMessages/getMessages in unit tests
[ https://issues.apache.org/jira/browse/KAFKA-1416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1416: - Labels: newbie (was: ) Unify sendMessages/getMessages in unit tests Key: KAFKA-1416 URL: https://issues.apache.org/jira/browse/KAFKA-1416 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Labels: newbie Multiple unit tests have their own internal functions to send/get messages from the brokers. For example:
sendMessages in ZookeeperConsumerConnectorTest
produceMessage in UncleanLeaderElectionTest
sendMessages in FetcherTest
etc. It would be better to unify them in TestUtils. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
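One reason to unify these helpers is determinism: each test currently invents its own payloads, so assertions cannot be shared. A hypothetical sketch of the kind of helper a shared TestUtils could expose (class and method names invented for illustration):

```java
// Hypothetical shared test helper: deterministic payloads per topic,
// so sender and verifier in any test agree on the expected message set.
import java.util.ArrayList;
import java.util.List;

public class TestMessages {
    /** Generate n deterministic message payloads for a topic. */
    static List<String> payloads(String topic, int n) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(topic + "-msg-" + i); // stable, human-readable payloads
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(payloads("topic_001", 3));
    }
}
```

A unified sendMessages would produce these payloads to the broker, and a unified getMessages would consume and compare against the same list.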
[jira] [Resolved] (KAFKA-867) producer transaction issue when metrics lib not correct
[ https://issues.apache.org/jira/browse/KAFKA-867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-867. - Resolution: Incomplete No follow-up. producer transaction issue when metrics lib not correct --- Key: KAFKA-867 URL: https://issues.apache.org/jira/browse/KAFKA-867 Project: Kafka Issue Type: Bug Reporter: Ian Roughley When building a producer, I used a different version of metrics (v2.2.0). What I found in this case was that I could deploy a producer, however the consumer received 4 copies of each message I sent. Further investigation led to:
- the timer start method was found in the producer code
- the message was sent to the broker
- the timer method was not found in the producer code, causing an exception to be thrown
- the exception caused a retry (3 times)
- as the message was already at the broker, there was no rollback
Not sure if this warrants a higher level of handshaking with the broker. But it is a fairly simple issue to cause (given Kafka is currently built with a non-published metrics version) that may cause misunderstanding. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-865) Mavenize and separate the client.
[ https://issues.apache.org/jira/browse/KAFKA-865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-865. - Resolution: Duplicate Mavenize and separate the client. - Key: KAFKA-865 URL: https://issues.apache.org/jira/browse/KAFKA-865 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.0 Reporter: Ashwanth Fernando It seems that the java client for Kafka is also bundled with the server JAR file and this is generated using sbt package. This is difficult for java folks to work with because: 1) Many java shops use maven and they want to specify the GAV of kafka in their pom and bang, the client jar and all its dependencies should be added to the application's classpath. I can't do that right now, because I need to run ./sbt eclipse, get the .JAR, add that to my classpath, add a whole lot of dependencies (log4j, slf4j, zkClient and so on) manually, which is a pain. There were 90 million Maven Central uploads/downloads in 2012 alone. Almost all the java shops out there have maven (either central or in-house sonatype). 2) Separation of concerns - keeping the server (core) and the client's classes together in the same jar file increases the size of the bundle for a client, and also means that every time the server's code changes and a release is performed, clients need to update their .JAR file, which is not great. We don't want a ton of clients to update their .JAR file just because a faster replication strategy for the kafka server cluster changed in a new release. Action items are to separate the client and server portions of Kafka, add it in a pom along with the compile-time dependencies, and upload it to Maven Central or, if you have a LinkedIn externally exposed Nexus, over there. This will increase adoption of the Kafka framework. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
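For illustration, a separately published client artifact reduces consumption to a single dependency declaration. These are the Maven coordinates the standalone Java client eventually shipped under; the version shown is one example release:

```xml
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.8.2.0</version>
</dependency>
```

With this in place, Maven resolves the client's transitive dependencies automatically, and server-only releases no longer force client upgrades.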
[jira] [Resolved] (KAFKA-715) NumberFormatException in PartitionStateInfo
[ https://issues.apache.org/jira/browse/KAFKA-715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-715. - Resolution: Duplicate NumberFormatException in PartitionStateInfo --- Key: KAFKA-715 URL: https://issues.apache.org/jira/browse/KAFKA-715 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 0.8.0 Reporter: Chris Riccomini Assignee: Neha Narkhede Hey Guys, During a broker restart, I got this exception: 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:host.name=eat1-qa466.corp.linkedin.com 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:java.version=1.6.0_21 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:java.vendor=Sun Microsystems Inc. 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:java.home=/export/apps/jdk/JDK-1_6_0_21/jre 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client 
environment:java.class.path=/export/apps/jdk/JDK-1_6_0_21/lib/tools.jar:lib/activation-1.0.2.jar:lib/ant-1.6.5.jar:lib/aopalliance-1.0.jar:lib/cfg-2.8.0.jar:lib/cfg-api-6.6.6.jar:lib/cfg-impl-6.6.6.jar:lib/com.linkedin.customlibrary.j2ee-1.0.jar:lib/com.linkedin.customlibrary.mx4j-3.0.2.jar:lib/com.linkedin.customlibrary.xmsg-0.6.jar:lib/commons-beanutils-1.7.0.jar:lib/commons-cli-1.0.jar:lib/commons-lang-2.4.jar:lib/commons-logging-1.1.jar:lib/configuration-api-1.4.8.jar:lib/configuration-repository-impl-1.4.8.jar:lib/container-management-impl-1.1.15.jar:lib/container-server-1.1.15.jar:lib/emweb-impl-1.1.15.jar:lib/jaxen-1.1.1.jar:lib/jdom-1.0.jar:lib/jetty-6.1.26.jar:lib/jetty-management-6.1.26.jar:lib/jetty-naming-6.1.26.jar:lib/jetty-plus-6.1.26.jar:lib/jetty-util5-6.1.26.jar:lib/jetty-util-6.1.26.jar:lib/jmx-impl-1.4.8.jar:lib/json-simple-1.1.jar:lib/jsp-2.1-6.1.1.jar:lib/jsp-api-2.1-6.1.1.jar:lib/lispring-lispring-core-1.4.8.jar:lib/lispring-lispring-servlet-1.4.8.jar:lib/log4j-1.2.15.jar:lib/mail-1.3.0.jar:lib/mx4j-tools-3.0.2.jar:lib/servlet-api-2.5.jar:lib/spring-aop-3.0.3.jar:lib/spring-asm-3.0.3.jar:lib/spring-aspects-3.0.3.jar:lib/spring-beans-3.0.3.jar:lib/spring-context-3.0.3.jar:lib/spring-context-support-3.0.3.jar:lib/spring-core-3.0.3.jar:lib/spring-expression-3.0.3.jar:lib/spring-jdbc-3.0.3.jar:lib/spring-jms-3.0.3.jar:lib/spring-orm-3.0.3.jar:lib/spring-transaction-3.0.3.jar:lib/spring-web-3.0.3.jar:lib/spring-web-servlet-3.0.3.jar:lib/util-core-4.0.40.jar:lib/util-i18n-4.0.40.jar:lib/util-jmx-4.0.22.jar:lib/util-log-4.0.40.jar:lib/util-servlet-4.0.40.jar:lib/util-xmsg-4.0.40.jar:lib/xml-apis-1.3.04.jar 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:java.library.path=/export/apps/jdk/JDK-1_6_0_21/jre/lib/amd64/server:/export/apps/jdk/JDK-1_6_0_21/jre/lib/amd64:/export/apps/jdk/JDK-1_6_0_21/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] 
[kafka] [] Client environment:java.io.tmpdir=/export/content/glu/apps/kafka/i001/tmp 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:java.compiler=NA 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:os.name=Linux 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:os.arch=amd64 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:os.version=2.6.32-220.13.1.el6.x86_64 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:user.name=app 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:user.home=/home/app 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] [] Client environment:user.dir=/export/content/glu/apps/kafka/i001 2013/01/21 19:21:10.919 INFO [ZooKeeper] [main] [kafka] [] Initiating client connection, connectString=eat1-app309.corp.linkedin.com:12913,eat1-app310.corp.linkedin.com:12913,eat1-app311.corp.linkedin.com:12913,eat1-app312.corp.linkedin.com:12913,eat1-app313.corp.linkedin.com:12913/kafka-samsa sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@1bfdbab5 2013/01/21 19:21:10.932 INFO [ClientCnxn] [main-SendThread()] [kafka] [] Opening socket connection to server eat1-app313.corp.linkedin.com/172.20.72.73:12913 2013/01/21 19:21:10.933 INFO [ClientCnxn] [main-SendThread(eat1-app313.corp.linkedin.com:12913)] [kafka] [] Socket connection established to eat1-app313.corp.linkedin.com/172.20.72.73:12913, initiating session 2013/01/21 19:21:10.963 INFO [ClientCnxn]
[jira] [Resolved] (KAFKA-711) NPE in TRUNK broker
[ https://issues.apache.org/jira/browse/KAFKA-711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-711. - Resolution: Invalid NPE in TRUNK broker --- Key: KAFKA-711 URL: https://issues.apache.org/jira/browse/KAFKA-711 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1 Reporter: John Fung Assignee: Jay Kreps This issue happens in Trunk only.
** To reproduce the issue:
1. Start zookeeper: $ bin/zookeeper-server-start.sh config/zookeeper.properties
2. Start broker: $ bin/kafka-server-start.sh config/server.properties
3. Start producer: $ bin/kafka-run-class.sh kafka.perf.ProducerPerformance --broker-list localhost:9092 --messages 100 --topics topic_001 --batch-size 50 --threads 1 --message-size 200
** Broker exception:
[2013-01-18 10:11:41,241] ERROR Replica Manager on Broker 0: Error processing leaderAndISR request LeaderAndIsrRequest(0,5,,1000,Map((topic_001,0) - PartitionStateInfo(LeaderIsrAndControllerEpoch({ ISR:0,leader:0,leaderEpoch:0 },1),1)),Set(id:0,host:jfung-ld.linkedin.biz,port:9092),1) (kafka.server.ReplicaManager)
java.lang.NullPointerException
at kafka.log.OffsetIndex.entries(OffsetIndex.scala:285)
at kafka.log.OffsetIndex$$anonfun$1.apply(OffsetIndex.scala:88)
at kafka.log.OffsetIndex$$anonfun$1.apply(OffsetIndex.scala:88)
at kafka.utils.Logging$class.info(Logging.scala:67)
at kafka.log.OffsetIndex.info(OffsetIndex.scala:52)
at kafka.log.OffsetIndex.init(OffsetIndex.scala:87)
at kafka.log.LogSegment.init(LogSegment.scala:37)
at kafka.log.Log.loadSegments(Log.scala:132)
at kafka.log.Log.init(Log.scala:75)
at kafka.log.LogManager.createLogIfNotExists(LogManager.scala:202)
at kafka.log.LogManager.getOrCreateLog(LogManager.scala:179)
at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:77)
at kafka.cluster.Partition$$anonfun$1.apply(Partition.scala:142)
at kafka.cluster.Partition$$anonfun$1.apply(Partition.scala:142)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
at scala.collection.immutable.List.foreach(List.scala:45)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.immutable.List.map(List.scala:45)
at kafka.cluster.Partition.makeLeader(Partition.scala:142)
at kafka.server.ReplicaManager.kafka$server$ReplicaManager$$makeLeader(ReplicaManager.scala:219)
at kafka.server.ReplicaManager$$anonfun$becomeLeaderOrFollower$3.apply(ReplicaManager.scala:199)
at kafka.server.ReplicaManager$$anonfun$becomeLeaderOrFollower$3.apply(ReplicaManager.scala:191)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:191)
at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:155)
at kafka.server.KafkaApis.handle(KafkaApis.scala:65)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
at java.lang.Thread.run(Thread.java:662)
** ProducerPerformance exception:
[2013-01-18 10:07:02,779] ERROR Error in handling batch of 50 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:87)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:254)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
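The broker stack trace above shows the NPE thrown from `OffsetIndex.entries` while a log message was being built inside the `OffsetIndex` constructor (via `Logging.info` at OffsetIndex.scala:87), i.e. before the index's state was fully initialized. A minimal Scala sketch of that initialization-order pitfall follows; the class and field names are hypothetical, not Kafka's actual code:

```scala
// A constructor-time log message calls a method that reads a field declared
// further down the class body. Scala initializes vals in declaration order,
// so the field is still null when the message is interpolated.
class Index {
  println(s"created index with $entries entries") // runs before `slots` is assigned
  private val slots: Array[Int] = new Array[Int](16)
  def entries: Int = slots.length / 2             // NPE when called during construction
}
```

Moving the log statement below the field declarations (or making the field lazy) avoids the NPE in this sketch.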
[jira] [Resolved] (KAFKA-662) Create testcases for unclean shut down
[ https://issues.apache.org/jira/browse/KAFKA-662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-662. - Resolution: Fixed Create testcases for unclean shut down -- Key: KAFKA-662 URL: https://issues.apache.org/jira/browse/KAFKA-662 Project: Kafka Issue Type: Task Reporter: John Fung Assignee: John Fung -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-663) Add deploy feature to System Test
[ https://issues.apache.org/jira/browse/KAFKA-663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-663. - Resolution: Won't Fix Not under active development. Add deploy feature to System Test --- Key: KAFKA-663 URL: https://issues.apache.org/jira/browse/KAFKA-663 Project: Kafka Issue Type: Task Reporter: John Fung Assignee: John Fung -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-321) Remove dead brokers from ProducerPool
[ https://issues.apache.org/jira/browse/KAFKA-321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-321. - Resolution: Fixed Remove dead brokers from ProducerPool - Key: KAFKA-321 URL: https://issues.apache.org/jira/browse/KAFKA-321 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.0 Reporter: Prashanth Menon Labels: replication Currently, the ProducerPool does not remove producers that are tied to dead brokers. Keeping such producers around can adversely affect normal producer operation by handing them out and failing - one such scenario is when updating cached topic metadata. It's best if we remove any producers that are tied to dead brokers to avoid such situations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
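A minimal sketch of the eviction described above, assuming a pool keyed by broker id; the type and method names here are hypothetical, not the actual ProducerPool API:

```scala
import scala.collection.mutable

// Hypothetical pool keyed by broker id; `closer` is invoked on each evicted producer.
class BrokerPool[P](closer: P => Unit) {
  private val pool = mutable.Map[Int, P]()

  def put(brokerId: Int, producer: P): Unit = pool(brokerId) = producer
  def get(brokerId: Int): Option[P] = pool.get(brokerId)

  // Drop producers whose broker is no longer in the live set, closing each one
  // so it can never be handed out again and fail a request.
  def removeDeadBrokers(liveBrokerIds: Set[Int]): Unit =
    pool.keys.toList.filterNot(liveBrokerIds.contains).foreach { id =>
      pool.remove(id).foreach(closer)
    }
}
```

Eviction would be driven by whatever signals broker liveness (e.g. the cached topic metadata refresh mentioned in the report).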
[jira] [Resolved] (KAFKA-299) Change broker request and response to use Seqs rather than Array
[ https://issues.apache.org/jira/browse/KAFKA-299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-299. - Resolution: Fixed Change broker request and response to use Seqs rather than Array Key: KAFKA-299 URL: https://issues.apache.org/jira/browse/KAFKA-299 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.0 Reporter: Prashanth Menon Assignee: Prashanth Menon Labels: 0.8, replication, wireprotocol The new Produce and Fetch request and response classes use primitive Arrays, but because they are case classes and Java's array hashCode/equals functionality is broken, the case class equality contract is broken as well. We should change the models to use Seqs to resolve the issue, along with gaining all the functional benefits that go along with it. This change will require appropriate Java versions to convert between Arrays and Seqs for Java clients. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
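A minimal Scala illustration of the broken equality contract described above (the case class names are hypothetical, not the actual request/response classes): two case class instances wrapping equal arrays compare unequal, because Java arrays use reference-based `equals`/`hashCode`, while `Seq` fields compare element-wise:

```scala
// Array field: the generated case-class equals delegates to Java's
// reference-based array equality, so structurally equal values differ.
case class ArrayResponse(offsets: Array[Long])

// Seq field: structural, element-wise equality, as expected of a case class.
case class SeqResponse(offsets: Seq[Long])

val a1 = ArrayResponse(Array(1L, 2L, 3L))
val a2 = ArrayResponse(Array(1L, 2L, 3L))
val s1 = SeqResponse(Seq(1L, 2L, 3L))
val s2 = SeqResponse(Seq(1L, 2L, 3L))
// a1 == a2 is false; s1 == s2 is true
```

The same mismatch breaks `hashCode`, so Array-bearing case classes also misbehave as map keys.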
[jira] [Resolved] (KAFKA-260) Add audit trail to kafka
[ https://issues.apache.org/jira/browse/KAFKA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps resolved KAFKA-260. - Resolution: Won't Fix Add audit trail to kafka Key: KAFKA-260 URL: https://issues.apache.org/jira/browse/KAFKA-260 Project: Kafka Issue Type: New Feature Affects Versions: 0.8.0 Reporter: Jay Kreps Assignee: Jay Kreps Attachments: Picture 18.png, kafka-audit-trail-draft.patch LinkedIn has a system that does monitoring on top of our data flow to ensure all data is delivered to all consumers of data. This works by having each logical tier through which data passes produce messages to a central audit-trail topic; these messages give a time period and the number of messages that passed through that tier in that time period. Examples of tiers for data might be producer, broker, hadoop-etl, etc. This makes it possible to compare the total events for a given time period to ensure that all events that are produced are consumed by all consumers. This turns out to be extremely useful. We also have an application that balances the books and checks that all data is consumed in a timely fashion. This gives graphs for each topic and shows any data loss and the lag at which the data is consumed (if any). This would be an optional feature that would allow you to do this kind of reconciliation automatically for all the topics kafka hosts against all the tiers of applications that interact with the data.
Some details, the proposed format of the data is JSON using the following format for messages:
{
  time: 1301727060032,           // the timestamp at which this audit message is sent
  topic: my_topic_name,          // the topic this audit data is for
  tier: producer,                // a user-defined tier name
  bucket_start: 130172640,       // the beginning of the time bucket this data applies to
  bucket_end: 130172700,         // the end of the time bucket this data applies to
  host: my_host_name.datacenter.linkedin.com, // the server that this was sent from
  datacenter: hlx32,             // the datacenter this occurred in
  application: newsfeed_service, // a user-defined application name
  guid: 51656274-a86a-4dff-b824-8e8e20a6348f, // a unique identifier for this message
  count: 43634
}
DISCUSSION
Time is complex:
1. The audit data must be based on a timestamp in the events, not the time on the machine processing the event. Using this timestamp means that all downstream consumers will report audit data on the right time bucket. This means that there must be a timestamp in the event, which we don't currently require. Arguably we should just add a timestamp to the events, but I think it is sufficient for now just to allow the user to provide a function to extract the time from their events.
2. For counts to reconcile exactly we can only do analysis at a granularity based on the least common multiple of the bucket size used by all tiers. The simplest is just to configure them all to use the same bucket size. We currently use a bucket size of 10 mins, but anything from 1-60 mins is probably reasonable.
For analysis purposes one tier is designated as the source tier and we do reconciliation against this count (e.g. if another tier has less, that is treated as lost; if another tier has more, that is treated as duplication). Note that this system makes false positives possible since you can lose an audit message. It also makes false negatives possible since if you lose both the normal messages and the associated audit messages it will appear that everything adds up.
The latter problem is astronomically unlikely to happen exactly, though. This would integrate into the client (producer and consumer both) in the following way:
1. The user provides a way to get timestamps from messages (required).
2. The user configures the tier name, host name, datacenter name, and application name as part of the consumer and producer config. We can provide reasonable defaults if not supplied (e.g. if it is a Producer then set tier to producer and get the hostname from the OS).
The application that processes this data is currently a Java Jetty app and talks to mysql. It feeds off the audit topic in kafka and runs both automatic monitoring checks and graphical displays of data against this. The data layer is not terribly scalable, but because the audit data is sent only periodically this is enough to allow us to audit thousands of servers on very modest hardware, and having sql access makes diving into the data to trace problems to particular hosts easier.
LOGISTICS
I would recommend the following steps:
1. Add the audit application; the proposal would be to add a new top-level directory equivalent to core or perf called audit to house this application. At this point it would just be sitting
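The per-bucket reconciliation described in the DISCUSSION above (sum counts per tier and bucket, then compare every tier against the designated source tier) could be sketched as follows; the record and function names are hypothetical, not the actual audit application's code:

```scala
// One audit message, already parsed: how many events `tier` saw for `topic`
// in the bucket starting at `bucketStart`.
case class AuditRecord(topic: String, tier: String, bucketStart: Long, count: Long)

// For each (tier, bucket) other than the source tier, report count - sourceCount:
// a negative delta is treated as loss, a positive one as duplication.
def reconcile(records: Seq[AuditRecord], sourceTier: String): Map[(String, Long), Long] = {
  val totals = records
    .groupBy(r => (r.tier, r.bucketStart))
    .map { case (key, rs) => key -> rs.map(_.count).sum }
  totals.collect {
    case ((tier, bucket), n) if tier != sourceTier =>
      (tier, bucket) -> (n - totals.getOrElse((sourceTier, bucket), 0L))
  }
}
```

This assumes all tiers report on the same bucket size, as the proposal recommends; with mixed sizes the comparison would have to roll buckets up to their least common multiple first.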