[jira] [Commented] (KAFKA-1920) Add a metric to count client side errors in BrokerTopicMetrics

2015-02-07 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310783#comment-14310783
 ] 

Neha Narkhede commented on KAFKA-1920:
--

[~aauradkar] What sort of client side errors do you have in mind?

 Add a metric to count client side errors in BrokerTopicMetrics
 --

 Key: KAFKA-1920
 URL: https://issues.apache.org/jira/browse/KAFKA-1920
 Project: Kafka
  Issue Type: Improvement
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar

 Currently the BrokerTopicMetrics count only failures, both across all topics 
 and for individual topics. Should we consider adding a metric to count the 
 number of client-side errors?
 This would essentially count the number of bad requests per topic.
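A hedged sketch of what such a counter could look like with the Yammer metrics library that BrokerTopicMetrics is built on (the class and metric name here are illustrative, not an existing Kafka metric):

{code}
import java.util.concurrent.TimeUnit;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;

// Illustrative meter for client-side errors ("bad requests"). A per-topic
// variant would keep one Meter per topic name, the way BrokerTopicMetrics
// keeps per-topic rate meters.
public class ClientErrorMetricSketch {
    private final Meter clientErrorRate = Metrics.newMeter(
            ClientErrorMetricSketch.class, "ClientErrorsPerSec", "errors", TimeUnit.SECONDS);

    public void onBadRequest(String topic) {
        clientErrorRate.mark(); // count one malformed/invalid client request
    }
}
{code}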



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-620) UnknownHostError looking for a ZK node crashes the broker

2015-02-07 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310790#comment-14310790
 ] 

Neha Narkhede commented on KAFKA-620:
-

[~guozhang] It does. Feel free to close this as a duplicate so we can track the 
problem as part of KAFKA-1082.

 UnknownHostError looking for a ZK node crashes the broker
 -

 Key: KAFKA-620
 URL: https://issues.apache.org/jira/browse/KAFKA-620
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7.1
 Environment: linux. Amazon's AMI
Reporter: Matthew Rathbone

 If you totally kill a zookeeper node so that its hostname no longer resolves 
 to anything, the broker will die with a java.net.UnknownHostException.
 You will then be unable to restart the broker until the unknown host(s) are 
 removed from server.properties.
 We ran into this issue while testing our resilience to widespread AWS 
 outages. If you can point me to the right place, I could have a go at fixing 
 it. Unfortunately, I suspect the issue might be in the non-standard ZooKeeper 
 library that Kafka uses.
 Here's the stack trace:
 org.I0Itec.zkclient.exception.ZkException: Unable to connect to [list of 
 zookeepers]
   at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:66)
   at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:872)
   at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
   at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
   at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44)
   at kafka.log.LogManager.<init>(LogManager.scala:87)
   at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
   at 
 kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
   at kafka.Kafka$.main(Kafka.scala:50)
   at kafka.Kafka.main(Kafka.scala)
 Caused by: java.net.UnknownHostException: zk-101
   at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
   at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:850)
   at java.net.InetAddress.getAddressFromNameService(InetAddress.java:1201)
   at java.net.InetAddress.getAllByName0(InetAddress.java:1154)
   at java.net.InetAddress.getAllByName(InetAddress.java:1084)
   at java.net.InetAddress.getAllByName(InetAddress.java:1020)
   at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:387)
   at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:332)
   at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:383)
   at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:64)
   ... 9 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs

2015-02-07 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1809:

Attachment: KAFKA-1809.v1.patch

For some reason the latest patch generated by the patch tool fails to apply on 
trunk (possibly due to multiple commits and rebases). Uploading a patch 
generated by git diff directly.

 Refactor brokers to allow listening on multiple ports and IPs 
 --

 Key: KAFKA-1809
 URL: https://issues.apache.org/jira/browse/KAFKA-1809
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Attachments: KAFKA-1809.patch, KAFKA-1809.v1.patch, 
 KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, 
 KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, 
 KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, 
 KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, 
 KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, 
 KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, 
 KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, 
 KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch


 The goal is to eventually support different security mechanisms on different 
 ports. 
 Currently brokers are defined as a host+port pair, and this definition exists 
 throughout the code base, so some refactoring is needed to support multiple 
 ports for a single broker.
 The detailed design is here: 
 https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (KAFKA-1903) Zk Expiration causes controller deadlock

2015-02-07 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede closed KAFKA-1903.


 Zk Expiration causes controller deadlock
 

 Key: KAFKA-1903
 URL: https://issues.apache.org/jira/browse/KAFKA-1903
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.8.1, 0.8.1.1
 Environment: java version 1.7.0_55
 Java(TM) SE Runtime Environment (build 1.7.0_55-b13)
 Java HotSpot(TM) 64-Bit Server VM (build 24.55-b03, mixed mode)
 kafka_2.9.2-0.8.1
Reporter: yufeng.chen
Assignee: Neha Narkhede
Priority: Critical

 When the controller encounters a ZK session expiration, the zookeeper node 
 /brokers/ids loses the broker that hosts the controller. Say there are three 
 nodes, e.g. 1, 2, 3, and node 1 has started the delete-topic thread. At this 
 point node 1 will be lost. Why? When the ZK expiration happens, the 
 zk-event-thread calls KafkaController.SessionExpirationListener.handleNewSession. 
 Holding controllerContext.controllerLock, it calls 
 onControllerResignation -> deleteTopicManager.shutdown() -> deleteTopicsThread.shutdown().
 Meanwhile the delete-topic thread is working and is awaiting in the 
 awaitTopicDeletionNotification() method. The zk-event-thread calls 
 deleteTopicsThread.shutdown() and waits until the run() method completes. 
 Because the zk-event-thread holds the lock, deleteTopicsCond.await() is never 
 actually interrupted, so the zk-event-thread blocks forever and never executes 
 KafkaHealthcheck.SessionExpireListener.handleNewSession. The controller's 
 broker never registers again. The jstack log:
 delete-topics-thread prio=10 tid=0x7fb0bc21b000 nid=0x2825 waiting on 
 condition [0x7fb0f534a000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0xe4952da0 (a 
 java.util.concurrent.locks.ReentrantLock$NonfairSync)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2047)
 at 
 kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$awaitTopicDeletionNotification(TopicDeletionManager.scala:178)
 at 
 kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:334)
 at 
 kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
 at 
 kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:333)
 at kafka.utils.Utils$.inLock(Utils.scala:538)
 at 
 kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:333)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
 ZkClient-EventThread-12-10.3.63.8:2181,10.3.63.9:2181 daemon prio=10 
 tid=0x7fb10038e800 nid=0x7d93 waiting on condition [0x7fb0f544a000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0xe4f4a760 (a 
 java.util.concurrent.CountDownLatch$Sync)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
 at 
 kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
 at 
 kafka.controller.TopicDeletionManager.shutdown(TopicDeletionManager.scala:93)
 at 
 kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:340)
 at 
 kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
 at 
 kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
 at kafka.utils.Utils$.inLock(Utils.scala:538)
 at 
 kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:337)
 at 
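To make the cycle concrete, here is a minimal, self-contained sketch of the deadlock pattern described above (class and variable names are illustrative, not Kafka's actual code):

{code}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// The "zk-event-thread" holds the controller lock and waits for the worker to
// exit, while the worker can only exit by reacquiring that same lock when it
// returns from await(); both threads park forever.
public class ControllerDeadlockSketch {
    private static final ReentrantLock controllerLock = new ReentrantLock();
    private static final Condition deleteTopicsCond = controllerLock.newCondition();
    private static final CountDownLatch workerDone = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            controllerLock.lock();        // delete-topics thread takes the lock
            try {
                deleteTopicsCond.await(); // parks; must reacquire the lock to return
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                controllerLock.unlock();
            }
            workerDone.countDown();
        }, "delete-topics-thread");
        worker.start();
        Thread.sleep(200);                // let the worker park in await()

        controllerLock.lock();            // zk-event-thread takes the lock...
        try {
            deleteTopicsCond.signal();    // ...signals the worker...
            workerDone.await();           // ...and waits for it while still holding
                                          // the lock the worker needs: deadlock
        } finally {
            controllerLock.unlock();      // never reached
        }
    }
}
{code}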
 

[jira] [Commented] (KAFKA-1908) Split brain

2015-02-07 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310901#comment-14310901
 ] 

Neha Narkhede commented on KAFKA-1908:
--

Thanks for sharing this test case! 

bq. A consumer can read data from replica-1 or replica-2. When it reads from 
replica-1 it resets the offsets and then can read duplicates from replica-2.

When the consumer wants to consume, it first issues a metadata request asking 
one of the brokers who the leader for partition 0 is. In your test, only 
brokers 2 and 3 can serve that metadata request and will end up telling the 
consumer to consume from broker 2 since it is the new leader. I'm not sure I 
understood how the consumer ends up consuming from broker 1 when its port is 
disabled?
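For reference, a minimal sketch of the leader lookup described above, using the 0.8.x javaapi SimpleConsumer (broker host, port and topic name are illustrative):

{code}
import java.util.Collections;
import java.util.List;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

// Ask any reachable broker for topic metadata and print the current leader of
// partition 0; only brokers that can serve the metadata request will answer.
public class LeaderLookupSketch {
    public static void main(String[] args) {
        SimpleConsumer consumer =
                new SimpleConsumer("broker2", 9092, 100000, 64 * 1024, "leaderLookup");
        try {
            List<String> topics = Collections.singletonList("partition"); // topic from the report
            TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(topics));
            for (TopicMetadata tm : resp.topicsMetadata()) {
                for (PartitionMetadata pm : tm.partitionsMetadata()) {
                    if (pm.partitionId() == 0 && pm.leader() != null) {
                        System.out.println("leader = " + pm.leader().host() + ":" + pm.leader().port());
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
{code}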

 Split brain
 ---

 Key: KAFKA-1908
 URL: https://issues.apache.org/jira/browse/KAFKA-1908
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Alexey Ozeritskiy

 In some cases, there may be two leaders for one partition.
 Steps to reproduce:
 # We have 3 brokers, 1 partition with 3 replicas:
 {code}
 TopicAndPartition: [partition,0]Leader: 1   Replicas: [2,1,3]   
 ISR: [1,2,3]
 {code} 
 # controller works on broker 3
 # let the kafka port be 9092. We execute on broker 1:
 {code}
 iptables -A INPUT -p tcp --dport 9092 -j REJECT
 {code}
 # Initiate replica election
 # As a result:
 Broker 1:
 {code}
 TopicAndPartition: [partition,0]Leader: 1   Replicas: [2,1,3]   
 ISR: [1,2,3]
 {code}
 Broker 2:
 {code}
 TopicAndPartition: [partition,0]Leader: 2   Replicas: [2,1,3]   
 ISR: [1,2,3]
 {code}
 # Flush the iptables rules on broker 1
 Now we can produce messages to {code}[partition,0]{code}. Replica-1 will not 
 receive the new data. A consumer can read data from replica-1 or replica-2. 
 When it reads from replica-1 it resets the offsets and then can read 
 duplicates from replica-2.
 We saw this situation in our production cluster when it had network problems.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1792) change behavior of --generate to produce assignment config with fair replica distribution and minimal number of reassignments

2015-02-07 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310907#comment-14310907
 ] 

Neha Narkhede commented on KAFKA-1792:
--

[~Dmitry Pekar] The last discussion on the KIP suggested several improvements 
to the KIP itself. So I think we should continue the discussion on the mailing 
list thread until we agree on what is and isn't in scope. As I mentioned 
earlier, it is better to know how the overall tool will eventually work, even 
if we split the work into multiple patches. Would you like to pick up the 
suggestions made and reply on the thread?

 change behavior of --generate to produce assignment config with fair replica 
 distribution and minimal number of reassignments
 -

 Key: KAFKA-1792
 URL: https://issues.apache.org/jira/browse/KAFKA-1792
 Project: Kafka
  Issue Type: Sub-task
  Components: tools
Reporter: Dmitry Pekar
Assignee: Dmitry Pekar
 Fix For: 0.8.3

 Attachments: KAFKA-1792.patch, KAFKA-1792_2014-12-03_19:24:56.patch, 
 KAFKA-1792_2014-12-08_13:42:43.patch, KAFKA-1792_2014-12-19_16:48:12.patch, 
 KAFKA-1792_2015-01-14_12:54:52.patch, KAFKA-1792_2015-01-27_19:09:27.patch, 
 generate_alg_tests.txt, rebalance_use_cases.txt


 The current implementation produces a fair replica distribution across the 
 specified list of brokers. Unfortunately, it doesn't take the current replica 
 assignment into account.
 So if we have, for instance, 3 brokers id=[0..2] and are going to add a fourth 
 broker id=3, generate will create an assignment config which redistributes 
 replicas fairly across brokers [0..3], in the same way as if those partitions 
 were created from scratch. It will not take the current replica assignment 
 into consideration and accordingly will not try to minimize the number of 
 replica moves between brokers.
 As proposed by [~charmalloc] this should be improved. The new output of the 
 improved --generate algorithm should satisfy the following requirements (a 
 count sketch follows the description below):
 - fairness of replica distribution - every broker will have R or R+1 replicas 
 assigned;
 - minimum of reassignments - the number of replica moves between brokers will 
 be minimal.
 Example: consider the following replica distribution across brokers [0..3] 
 (we just added brokers 2 and 3):
 - broker - 0, 1, 2, 3 
 - replicas - 7, 6, 0, 0
 The new algorithm will produce the following assignment:
 - broker - 0, 1, 2, 3 
 - replicas - 4, 3, 3, 3
 - moves - -3, -3, +3, +3
 It will be fair, and the number of moves will be 6, which is minimal for the 
 specified initial distribution.
 The scope of this issue is:
 - design an algorithm matching the above requirements;
 - implement this algorithm and unit tests;
 - test it manually using different initial assignments;
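As an illustration of the count arithmetic above (a hypothetical sketch, not the patch under review; a real reassignment must also track which partitions move):

{code}
import java.util.Arrays;

// Fair-target computation for the 7,6,0,0 example: every broker ends up with
// R or R+1 replicas, and only surplus replicas above the target are moved.
public class MinimalMovesSketch {
    public static void main(String[] args) {
        int[] replicas = {7, 6, 0, 0};
        int total = Arrays.stream(replicas).sum();       // 13 replicas overall
        int brokers = replicas.length;
        int base = total / brokers;                      // R = 3
        int remainder = total % brokers;                 // 1 broker gets R + 1

        int moves = 0;
        for (int i = 0; i < brokers; i++) {
            int target = base + (i < remainder ? 1 : 0); // fair targets: 4, 3, 3, 3
            if (replicas[i] > target) {
                moves += replicas[i] - target;           // only surpluses move
            }
        }
        System.out.println("moves = " + moves);          // prints 6
    }
}
{code}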



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows

2015-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310912#comment-14310912
 ] 

Jay Kreps commented on KAFKA-1646:
--

Hey guys, if this forces full recovery, the impact on startup time will be 
considerable if you have a large number of partitions.

Say you have 2000 partitions per machine and a 1GB log segment file size. On 
average these files will have about 500MB per partition when a restart occurs. 
The result is running recovery on 2000 * 500MB = 1TB of data. This will take 
about 5.5 hours at 50MB/sec.

[~qixia] not sure how the above reasoning compares to your test?

I think this would be a blocker issue, no?

 Improve consumer read performance for Windows
 -

 Key: KAFKA-1646
 URL: https://issues.apache.org/jira/browse/KAFKA-1646
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: xueqiang wang
Assignee: xueqiang wang
  Labels: newbie, patch
 Attachments: Improve consumer read performance for Windows.patch, 
 KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, 
 KAFKA-1646_20141216_163008.patch


 This patch is for the Windows platform only. On Windows, if more than one 
 replica is writing to disk, the segment log files will not be contiguous on 
 disk, and consumer read performance drops greatly. This fix allocates more 
 disk space when rolling a new segment, which improves consumer read 
 performance on the NTFS file system.
 This patch doesn't affect file allocation on other filesystems, as it only 
 adds statements like 'if(Os.isWindows)' or adds methods used on Windows.
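A sketch of the preallocation idea described above (illustrative; the actual patch changes Kafka's log/segment code, not a standalone helper):

{code}
import java.io.IOException;
import java.io.RandomAccessFile;

// Reserving the full segment size up front keeps the file contiguous on NTFS,
// which is what helps sequential consumer reads on Windows.
public class PreallocateSketch {
    static void preallocate(String path, long segmentBytes) throws IOException {
        try (RandomAccessFile file = new RandomAccessFile(path, "rw")) {
            file.setLength(segmentBytes); // allocate the space when the segment rolls
        }
    }

    public static void main(String[] args) throws IOException {
        preallocate("00000000000000000000.log", 1024L * 1024L * 1024L); // a 1GB segment
    }
}
{code}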



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1831) Producer does not provide any information about which host the data was sent to

2015-02-07 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310915#comment-14310915
 ] 

Neha Narkhede commented on KAFKA-1831:
--

[~markap14] The producer has a client.id, and the broker's request log records 
which produce request was received from which client.id. This should suffice 
to get you the information you need. Or am I missing something?
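For example, setting client.id on the (0.8.2) Java producer is a one-line config; the broker and client names here are illustrative:

{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Tag the producer with a client.id so the broker's request log can attribute
// each produce request to this application.
public class ClientIdSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("client.id", "order-service-producer"); // appears in the request log
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}
{code}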

 Producer does not provide any information about which host the data was sent 
 to
 ---

 Key: KAFKA-1831
 URL: https://issues.apache.org/jira/browse/KAFKA-1831
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.1.1
Reporter: Mark Payne
Assignee: Jun Rao

 For traceability purposes and for troubleshooting, when sending data to 
 Kafka, the Producer should provide information about which host the data was 
 sent to. This works well already in the SimpleConsumer, which provides host() 
 and port() methods.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-313) Add JSON/CSV output and looping options to ConsumerOffsetChecker

2015-02-07 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310918#comment-14310918
 ] 

Neha Narkhede commented on KAFKA-313:
-

[~singhashish] We are moving to a centralized ConsumerCommand in KAFKA-1476 
that will replace all other consumer admin tools. Can you review to see if the 
tool has the options you are looking for? Going forward, I think we may want to 
improve that and phase out the current tools.

 Add JSON/CSV output and looping options to ConsumerOffsetChecker
 

 Key: KAFKA-313
 URL: https://issues.apache.org/jira/browse/KAFKA-313
 Project: Kafka
  Issue Type: Improvement
Reporter: Dave DeMaagd
Assignee: Ashish Kumar Singh
Priority: Minor
  Labels: newbie, patch
 Fix For: 0.8.3

 Attachments: KAFKA-313-2012032200.diff, KAFKA-313.1.patch, 
 KAFKA-313.patch


 Adds:
 * '--loop N' - causes the program to loop forever, sleeping for up to N 
 seconds between loops (loop time minus collection time, unless that's less 
 than 0, at which point it will just run again immediately)
 * '--asjson' - display as a JSON string instead of the more human-readable 
 output format.
 Neither of the above depends on the other (you can loop with the human-readable 
 output, or do a single-shot execution with JSON output). Existing 
 behavior/output is maintained if neither of the above is used. Diff attached.
 Impacted files:
 core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
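A hypothetical sketch of the '--loop N' timing rule described above (collectAndPrintOffsets is a stand-in for the tool's collection step):

{code}
// Sleep for the loop interval minus the collection time; if collection took
// longer than the interval, run again immediately.
public class LoopTimingSketch {
    static void collectAndPrintOffsets() { /* hypothetical collection step */ }

    public static void main(String[] args) throws InterruptedException {
        long loopMillis = Long.parseLong(args[0]) * 1000L; // the N in --loop N
        while (true) {
            long start = System.currentTimeMillis();
            collectAndPrintOffsets();
            long sleepFor = loopMillis - (System.currentTimeMillis() - start);
            if (sleepFor > 0) {
                Thread.sleep(sleepFor);
            } // else: collection overran the interval, run again immediately
        }
    }
}
{code}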



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1805) Kafka ProducerRecord should implement equals

2015-02-07 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1805:
-
Reviewer: Guozhang Wang

[~guozhang] Since you started looking at the patch, would you like to shepherd 
this patch to check in?

 Kafka ProducerRecord should implement equals
 

 Key: KAFKA-1805
 URL: https://issues.apache.org/jira/browse/KAFKA-1805
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.2
Reporter: Thomas Omans
Assignee: Thomas Omans
Priority: Minor
 Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch


 I was writing some tests to verify that I am calculating my partitions, 
 topics, keys, and values properly in my producer code and discovered that 
 ProducerRecord does not implement equality.
 This makes tests that integrate Kafka particularly awkward.
 https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
 I can whip up a patch since this is essentially just a value object.
 Thanks,
 Thomas Omans
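A minimal sketch of what equals/hashCode for such a value object could look like (illustrative class name, not the committed patch), over the same fields ProducerRecord carries (topic, partition, key, value):

{code}
import java.util.Objects;

// Plain value-object equality over all four fields.
public final class RecordLike<K, V> {
    private final String topic;
    private final Integer partition;
    private final K key;
    private final V value;

    public RecordLike(String topic, Integer partition, K key, V value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof RecordLike)) return false;
        RecordLike<?, ?> that = (RecordLike<?, ?>) o;
        return Objects.equals(topic, that.topic)
                && Objects.equals(partition, that.partition)
                && Objects.equals(key, that.key)
                && Objects.equals(value, that.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition, key, value);
    }
}
{code}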



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1865) Investigate adding a flush() call to new producer API

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1865:
-
Assignee: Jay Kreps
  Status: Patch Available  (was: Open)

 Investigate adding a flush() call to new producer API
 -

 Key: KAFKA-1865
 URL: https://issues.apache.org/jira/browse/KAFKA-1865
 Project: Kafka
  Issue Type: Bug
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1865.patch


 The postconditions of this would be that any record enqueued prior to flush() 
 would have completed being sent (either successfully or not).
 An open question is whether you can continue sending new records while this 
 call is executing (on other threads).
 We should only do this if it doesn't add inefficiencies for people who don't 
 use it.
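A sketch of how the proposed call would be used, assuming the postcondition stated above (this is the API under discussion, not a released method at this point):

{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// After flush() returns, every record sent before the call has completed,
// either successfully or with an error.
public class FlushUsageSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("events", "k1", "v1"));
            producer.send(new ProducerRecord<>("events", "k2", "v2"));
            producer.flush(); // blocks until both sends above have completed
        }
    }
}
{code}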



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1865) Investigate adding a flush() call to new producer API

2015-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310927#comment-14310927
 ] 

Jay Kreps commented on KAFKA-1865:
--

Created reviewboard https://reviews.apache.org/r/30763/diff/
 against branch trunk

 Investigate adding a flush() call to new producer API
 -

 Key: KAFKA-1865
 URL: https://issues.apache.org/jira/browse/KAFKA-1865
 Project: Kafka
  Issue Type: Bug
Reporter: Jay Kreps
 Attachments: KAFKA-1865.patch


 The postconditions of this would be that any record enqueued prior to flush() 
 would have completed being sent (either successfully or not).
 An open question is whether you can continue sending new records while this 
 call is executing (on other threads).
 We should only do this if it doesn't add inefficiencies for people who don't 
 use it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1865) Add a flush() call to the new producer API

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1865:
-
Summary: Add a flush() call to the new producer API  (was: Investigate 
adding a flush() call to new producer API)

 Add a flush() call to the new producer API
 --

 Key: KAFKA-1865
 URL: https://issues.apache.org/jira/browse/KAFKA-1865
 Project: Kafka
  Issue Type: Bug
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-1865.patch


 The postconditions of this would be that any record enqueued prior to flush() 
 would have completed being sent (either successfully or not).
 An open question is whether you can continue sending new records while this 
 call is executing (on other threads).
 We should only do this if it doesn't add inefficiencies for people who don't 
 use it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1865) Investigate adding a flush() call to new producer API

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1865:
-
Attachment: KAFKA-1865.patch

 Investigate adding a flush() call to new producer API
 -

 Key: KAFKA-1865
 URL: https://issues.apache.org/jira/browse/KAFKA-1865
 Project: Kafka
  Issue Type: Bug
Reporter: Jay Kreps
 Attachments: KAFKA-1865.patch


 The postconditions of this would be that any record enqueued prior to flush() 
 would have completed being sent (either successfully or not).
 An open question is whether you can continue sending new records while this 
 call is executing (on other threads).
 We should only do this if it doesn't add inefficiencies for people who don't 
 use it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1447) Controlled shutdown deadlock when trying to send state updates

2015-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310948#comment-14310948
 ] 

Jay Kreps commented on KAFKA-1447:
--

Does this problem still exist?

 Controlled shutdown deadlock when trying to send state updates
 --

 Key: KAFKA-1447
 URL: https://issues.apache.org/jira/browse/KAFKA-1447
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.8.0
Reporter: Sam Meder
Priority: Critical
  Labels: newbie++

 We're seeing controlled shutdown indefinitely stuck on trying to send out 
 state change messages to the other brokers:
 [2014-05-03 04:01:30,580] INFO [Socket Server on Broker 4], Shutdown 
 completed (kafka.network.SocketServer)
 [2014-05-03 04:01:30,581] INFO [Kafka Request Handler on Broker 4], shutting 
 down (kafka.server.KafkaRequestHandlerPool)
 and stuck on:
 kafka-request-handler-12 daemon prio=10 tid=0x7f1f04a66800 nid=0x6e79 
 waiting on condition [0x7f1ad5767000]
 java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 parking to wait for 0x00078e91dc20 (a 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
 at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
 at 
 kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
 locked 0x00078e91dc38 (a java.lang.Object)
 at kafka.controller.KafkaController.sendRequest(KafkaController.scala:655)
 at 
 kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:298)
 at 
 kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290)
 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
 at scala.collection.Iterator$class.foreach(Iterator.scala:772)
 at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
 at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
 at 
 kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:290)
 at 
 kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:97)
 at 
 kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:269)
 at 
 kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:253)
 at scala.Option.foreach(Option.scala:197)
 at 
 kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply$mcV$sp(KafkaController.scala:253)
 at 
 kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
 at 
 kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
 at kafka.utils.Utils$.inLock(Utils.scala:538)
 at 
 kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:252)
 at 
 kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:249)
 at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:130)
 at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
 at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
 at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
 at kafka.controller.KafkaController.shutdownBroker(KafkaController.scala:249)
 locked 0x00078b495af0 (a java.lang.Object)
 at kafka.server.KafkaApis.handleControlledShutdownRequest(KafkaApis.scala:264)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:192)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
 at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy

2015-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310958#comment-14310958
 ] 

Jay Kreps commented on KAFKA-1718:
--

[~junrao], [~guozhang] is this still ongoing?

 Message Size Too Large error when only small messages produced with Snappy
 

 Key: KAFKA-1718
 URL: https://issues.apache.org/jira/browse/KAFKA-1718
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Evan Huus
Priority: Critical

 I'm the primary author of the Go bindings, and while I originally received 
 this as a bug against my bindings, I'm coming to the conclusion that it's a 
 bug in the broker somehow.
 Specifically, take a look at the last two kafka packets in the following 
 packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you 
 will need a trunk build of Wireshark to fully decode the kafka part of the 
 packets).
 The produce request contains two partitions on one topic. Each partition has 
 one message set (sizes 977205 bytes and 967362 bytes respectively). Each 
 message set is a sequential collection of snappy-compressed messages, each 
 message of size 46899. When uncompressed, each message contains a message set 
 of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.
 However, the broker responds to this with a MessageSizeTooLarge error, full 
 stacktrace from the broker logs being:
 kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes 
 which exceeds the maximum configured message size of 112.
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
   at kafka.log.Log.append(Log.scala:265)
   at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
   at 
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
   at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
   at java.lang.Thread.run(Thread.java:695)
 Since as far as I can tell none of the sizes in the actual produced packet 
 exceed the defined maximum, I can only assume that the broker is 
 miscalculating something somewhere and throwing the exception improperly.
 ---
 This issue can be reliably reproduced using an out-of-the-box binary download 
 of 0.8.1.1 and the following gist: 
 https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use 
 the `producer-ng` branch of the Sarama library).
 ---
 I am happy to provide any more information you might need, or to do relevant 
 experiments etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1745.
--
Resolution: Won't Fix

Seems to be a problem in Java.

 Each new thread creates a PIPE and KQUEUE as open files during 
 producer.send() and does not get cleared when the thread that creates them is 
 cleared.
 -

 Key: KAFKA-1745
 URL: https://issues.apache.org/jira/browse/KAFKA-1745
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
 Environment: Mac OS Mavericks
Reporter: Vishal
Priority: Critical

 Hi,
 I'm using the Java client API for Kafka. I wanted to send data to Kafka 
 using a producer pool, as I'm using a sync producer. The thread that sends 
 the data comes from a thread pool that grows and shrinks depending on usage. 
 So, when I try to send data from one thread, 1 KQUEUE and 2 PIPEs are 
 created (got this info by using lsof). If I keep using the same thread it's 
 fine, but when a new thread sends data to Kafka (using producer.send()) a new 
 KQUEUE and 2 PIPEs are created.
 This is okay, but when the thread is cleared from the thread pool and a new 
 thread is created, new KQUEUEs and PIPEs are created. The problem is that the 
 old ones are not destroyed and they show up as open files. This is causing a 
 major problem, as the number of open files keeps increasing and never 
 decreases.
 Please suggest any solutions.
 FYI, the number of TCP connections established from the producer system to 
 the Kafka broker remains constant throughout.
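One common mitigation, sketched here under the assumption that the 0.8.x sync producer can be shared across threads (config values are illustrative): create a single producer shared by the pool and close() it when done, rather than creating producers inside short-lived pooled threads.

{code}
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// One shared producer for all worker threads; closing it releases the
// underlying sockets and selector resources (the pipes/kqueues seen in lsof).
public class SharedProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "sync");

        final Producer<String, String> producer = new Producer<>(new ProducerConfig(props));
        Runnable work = () -> producer.send(new KeyedMessage<>("my-topic", "hello"));
        work.run();       // in real code, submit `work` to the thread pool
        producer.close(); // release resources when the pool shuts down
    }
}
{code}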



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1657) Fetch request using SimpleConsumer fails due to Leader not local for partition

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1657.
--
Resolution: Won't Fix

I think the ultimate solution here is to move to the new consumer which should 
fix all this.

 Fetch request using SimpleConsumer fails due to Leader not local for 
 partition
 -

 Key: KAFKA-1657
 URL: https://issues.apache.org/jira/browse/KAFKA-1657
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: aarti gupta

 I have a three-node Kafka cluster running on the same physical machine (on 
 different ports), with replication factor = 3 and a single topic with 3 
 partitions.
 Multiple producers write to the topic, and a custom partitioner is used to 
 direct messages to a given partition.
 I use the simple consumer to read from a given partition of the topic, and 
 have three instances of my consumer running.
 The code snippet for the simple consumer suggests that having any node in the 
 cluster (not necessarily the leader for that partition) is sufficient to find 
 the leader for the partition. However, on running this, I find that given a 
 different node in the cluster, a null pointer exception is thrown, and the 
 logs show the error
 [2014-09-28 20:40:20,984] WARN [KafkaApi-1] Fetch request with correlation id 
 0 from client testClient on partition [VCCTask,1] failed due to Leader not 
 local for partition [VCCTask,1] on broker 1 (kafka.server.KafkaApis)
 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic VCCTask
 Topic:VCCTask PartitionCount:3ReplicationFactor:3 Configs:
   Topic: VCCTask  Partition: 0Leader: 1   Replicas: 2,3,1 Isr: 
 1,2,3
   Topic: VCCTask  Partition: 1Leader: 1   Replicas: 3,1,2 Isr: 
 1,2,3
   Topic: VCCTask  Partition: 2Leader: 1   Replicas: 1,2,3 Isr: 
 1,2,3
 If I specify the leader for the partition, instead of any node in the 
 cluster, everything works great, but this is an operational nightmare.
 I was able to reproduce this using a simple test, where a producer writes 
 numbers from 1 to 99, and the consumers consume from a specific 
 partition.
 Here are the code snippets
 public class TestConsumerStoreOffsetZookeeper {
     public static void main(String[] args) throws JSONException {
         TestConsumerStoreOffsetZookeeper testConsumer = new TestConsumerStoreOffsetZookeeper();
         JSONObject jsonObject = new JSONObject();
         jsonObject.put("topicName", "VCCTask");
         jsonObject.put("clientName", "testClient");
         jsonObject.put("partition", args[0]);
         jsonObject.put("hostPort", "172.16.78.171");
         jsonObject.put("znodeName", "VCCTask");
         jsonObject.put("port", args[1]);
         testConsumer.initialize(jsonObject);
         final long startTime = System.currentTimeMillis();
         testConsumer.startReceiving(new FutureCallback<byte[]>() {
             int noOfMessagesConsumed = 0;
             @Override
             public void onSuccess(byte[] result) {
                 LOG.info("YES!! " + ByteBuffer.wrap(result).getLong());
                 ++noOfMessagesConsumed;
                 LOG.info("# Messages consumed " + noOfMessagesConsumed + " Time elapsed "
                         + (System.currentTimeMillis() - startTime) / 1000 + " seconds");
             }
             @Override
             public void onFailure(Throwable t) {
                 LOG.info("NO!! " + t.fillInStackTrace().getMessage());
             }
         });
     }
     private String topicToRead;
     private static Logger LOG = Logger.getLogger("TestConsumerStoreOffsetZookeeper");
     List<String> seedBrokers = Lists.newArrayList("localhost");
     private int port;
     private SimpleConsumer consumer;
     Integer partition;
     String clientName;
     private Broker currentLeader;
     private String counter;
     CuratorFramework zooKeeper;
     public void startReceiving(final FutureCallback<byte[]> futureCallback) {
         findLeaderAndUpdateSelfPointers(seedBrokers, port, topicToRead, partition);
         LOG.info("Kafka consumer delegate listening on topic " + topicToRead
                 + " and partition " + partition);
         int numErrors = 0;
         while (true) {
             long latestOffset = 0;
             Stat stat = null;
             final String path = "/" + topicToRead + "/" + partition;
             try {
                 // Read top of the
                 stat = zooKeeper.checkExists().forPath(path);
                 if (stat == null) {
                     latestOffset = getLastOffsetFromBeginningOfStream(this.consumer,
                             topicToRead, partition, OffsetRequest.EarliestTime(), clientName);
                     byte b[] = new byte[8];
                     ByteBuffer byteBuffer = ByteBuffer.wrap(b);

[jira] [Commented] (KAFKA-1758) corrupt recovery file prevents startup

2015-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14310991#comment-14310991
 ] 

Jay Kreps commented on KAFKA-1758:
--

This is actually not a very difficult change--in LogManager.loadLogs we would 
need to basically handle an error in reading the recovery checkpoint, log it, 
and then just start a full recovery.
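A sketch of that fallback (a hypothetical Java model, not Kafka's Scala LogManager; the checkpoint layout of a version line, a count line, then "topic partition offset" rows matches OffsetCheckpoint):

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A checkpoint that fails to parse is treated as absent: log it and return an
// empty map, which forces full recovery instead of crashing startup.
public class RecoveryCheckpointSketch {
    static Map<String, Long> readRecoveryPoints(Path checkpoint) {
        try {
            Map<String, Long> points = new HashMap<>();
            List<String> lines = Files.readAllLines(checkpoint);
            for (String line : lines.subList(2, lines.size())) { // skip version + count header
                String[] parts = line.trim().split("\\s+");      // "topic partition offset"
                points.put(parts[0] + "-" + parts[1], Long.parseLong(parts[2]));
            }
            return points;
        } catch (IOException | RuntimeException e) {             // corrupt or unreadable file
            System.err.println("Corrupt recovery checkpoint, forcing full recovery: " + e);
            return Collections.emptyMap();                       // empty map => recover from offset 0
        }
    }

    public static void main(String[] args) {
        Map<String, Long> points = readRecoveryPoints(Paths.get("recovery-point-offset-checkpoint"));
        System.out.println("recover my-topic-0 from " + points.getOrDefault("my-topic-0", 0L));
    }
}
{code}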

 corrupt recovery file prevents startup
 --

 Key: KAFKA-1758
 URL: https://issues.apache.org/jira/browse/KAFKA-1758
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Rosenberg

 Hi,
 We recently had a kafka node go down suddenly. When it came back up, it 
 apparently had a corrupt recovery file, and refused to startup:
 {code}
 2014-11-06 08:17:19,299  WARN [main] server.KafkaServer - Error starting up 
 KafkaServer
 java.lang.NumberFormatException: For input string: 
 ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
 ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
 at 
 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
 at java.lang.Integer.parseInt(Integer.java:481)
 at java.lang.Integer.parseInt(Integer.java:527)
 at 
 scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
 at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
 at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76)
 at 
 kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106)
 at 
 kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
 at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at 
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
 at kafka.log.LogManager.loadLogs(LogManager.scala:105)
 at kafka.log.LogManager.init(LogManager.scala:57)
 at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
 {code}
 And the app is under a monitor (so it was repeatedly restarting and failing 
 with this error for several minutes before we got to it)…
 We moved the ‘recovery-point-offset-checkpoint’ file out of the way, and it 
 then restarted cleanly (but of course re-synced all it’s data from replicas, 
 so we had no data loss).
 Anyway, I’m wondering if that’s the expected behavior? Or should it not 
 declare it corrupt and then proceed automatically to an unclean restart?
 Should this NumberFormatException be handled a bit more gracefully?
 We saved the corrupt file if it’s worth inspecting (although I doubt it will 
 be useful!)….
 The corrupt files appeared to be all zeroes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-402) Adding handling various kind of exception support at server side

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-402.
-
Resolution: Won't Fix

 Adding handling various kind of exception support at server side
 

 Key: KAFKA-402
 URL: https://issues.apache.org/jira/browse/KAFKA-402
 Project: Kafka
  Issue Type: Task
Affects Versions: 0.8.0
Reporter: Yang Ye
Assignee: Yang Ye

 Currently, in the server-side handlers, some exceptions are not caught and 
 handled. There is a chance that servers may be brought down by an uncaught 
 exception. We need to catch and handle these at the server. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-410) after been killed, one broker fail to re-join the cluster after restarted

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-410.
-
Resolution: Cannot Reproduce

 after been killed, one broker fail to re-join the cluster after restarted
 -

 Key: KAFKA-410
 URL: https://issues.apache.org/jira/browse/KAFKA-410
 Project: Kafka
  Issue Type: Bug
  Components: core
 Environment: the version of kafka we use is trunk 2a59ad76c6
Reporter: mmliu

 In my Kafka cluster there are 2 brokers.
 One of them was killed by accident yesterday; after that, I tried to restart 
 the broker, but it seems that it failed to join the cluster.
 In ZooKeeper it successfully registers itself, but after sending logs, no 
 data shows up in the Kafka data directory.
 This is the log of the broker after restarting it:
 [2012-07-21 17:34:27,807] DEBUG preRegister called. 
 Server=com.sun.jmx.mbeanserver.JmxMBeanServer@6443226, 
 name=kafka:type=kafka.KafkaLog4j (root)
 [2012-07-21 17:34:27,808] DEBUG Adding AppenderMBean for appender named 
 fileAppender (org.apache.log4j.jmx.LoggerDynamicMBean)
 [2012-07-21 17:34:27,810] DEBUG getMBeanInfo called. 
 (org.apache.log4j.jmx.AppenderDynamicMBean)
 [2012-07-21 17:34:27,810] DEBUG preRegister called. 
 Server=com.sun.jmx.mbeanserver.JmxMBeanServer@6443226, 
 name=log4j:appender=fileAppender (org.apache.log4j.jmx.AppenderDynamicMBean)
 [2012-07-21 17:34:27,810] DEBUG Adding 
 LayoutMBean:fileAppender,layout=org.apache.log4j.PatternLayout 
 (org.apache.log4j.jmx.AppenderDynamicMBean)
 [2012-07-21 17:34:27,811] DEBUG getMBeanInfo called. 
 (org.apache.log4j.jmx.LayoutDynamicMBean)
 [2012-07-21 17:34:27,811] DEBUG preRegister called. 
 Server=com.sun.jmx.mbeanserver.JmxMBeanServer@6443226, 
 name=log4j:appender=fileAppender,layout=org.apache.log4j.PatternLayout 
 (org.apache.log4j.jmx.LayoutDynamicMBean)
 [2012-07-21 17:34:27,967] INFO The number of partitions for topic  no_appkey 
 : 1 (kafka.utils.Utils$)
 [2012-07-21 17:34:27,969] INFO The number of partitions for topic   
 parse_exception : 1 (kafka.utils.Utils$)
 [2012-07-21 17:34:27,972] INFO Starting Kafka server... 
 (kafka.server.KafkaServer)
 [2012-07-21 17:34:27,984] INFO starting log cleaner every 6 ms 
 (kafka.log.LogManager)
 [2012-07-21 17:34:27,990] INFO connecting to ZK: XX.XX.XX.229:2181/kafka 
 (kafka.server.KafkaZooKeeper)
 [2012-07-21 17:34:27,999] DEBUG Creating new ZookKeeper instance to connect 
 to XX.XX.XX.229:2181/kafka. (org.I0Itec.zkclient.ZkConnection)
 [2012-07-21 17:34:27,999] INFO Starting ZkClient event thread. 
 (org.I0Itec.zkclient.ZkEventThread)
 [2012-07-21 17:34:28,006] INFO Client 
 environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT 
 (org.apache.zookeeper.ZooKeeper)
 [2012-07-21 17:34:28,006] INFO Client environment:host.name=mobile-1 
 (org.apache.zookeeper.ZooKeeper)
 [2012-07-21 17:34:28,006] INFO Client environment:java.version=1.6.0_17 
 (org.apache.zookeeper.ZooKeeper)
 [2012-07-21 17:34:28,006] INFO Client environment:java.vendor=Sun 
 Microsystems Inc. (org.apache.zookeeper.ZooKeeper)
 [2012-07-21 17:34:28,006] INFO Client 
 environment:java.home=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre 
 (org.apache.zookeeper.ZooKeeper)
 [2012-07-21 17:34:28,006] INFO Client 
 environment:java.class.path=:/home/Our_Server/kafka/libs/jopt-simple-3.2.jar:/home/Our_Server/kafka/libs/log4j-1.2.15.jar:/home/Our_Server/kafka/libs/scala-compiler.jar:/home/Our_Server/kafka/libs/scala-library.jar:/home/Our_Server/kafka/libs/snappy-java-1.0.4.1.jar:/home/Our_Server/kafka/libs/zkclient-0.1.jar:/home/Our_Server/kafka/libs/zookeeper-3.3.4.jar:/home/Our_Server/kafka/kafka-trunk-2a59ad76c6_scala-2.8.0.jar
  (org.apache.zookeeper.ZooKeeper)
 [2012-07-21 17:34:28,006] INFO Client 
 environment:java.library.path=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre/lib/amd64/server:/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre/lib/amd64:/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
  (org.apache.zookeeper.ZooKeeper)
 [2012-07-21 17:34:28,006] INFO Client environment:java.io.tmpdir=/tmp 
 (org.apache.zookeeper.ZooKeeper)
 [2012-07-21 17:34:28,006] INFO Client environment:java.compiler=NA 
 (org.apache.zookeeper.ZooKeeper)
 [2012-07-21 17:34:28,006] INFO Client environment:os.name=Linux 
 (org.apache.zookeeper.ZooKeeper)
 [2012-07-21 17:34:28,006] INFO Client environment:os.arch=amd64 
 (org.apache.zookeeper.ZooKeeper)
 [2012-07-21 17:34:28,006] INFO Client 
 environment:os.version=2.6.32-71.29.1.el6.x86_64 
 (org.apache.zookeeper.ZooKeeper)
 [2012-07-21 17:34:28,006] INFO Client environment:user.name=Our_Server 
 (org.apache.zookeeper.ZooKeeper)
 [2012-07-21 17:34:28,006] INFO Client environment:user.home=/home/Our_Server 

[jira] [Resolved] (KAFKA-398) Enhance SocketServer to Enable Sending Requests

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-398.
-
Resolution: Won't Fix

 Enhance SocketServer to Enable Sending Requests
 ---

 Key: KAFKA-398
 URL: https://issues.apache.org/jira/browse/KAFKA-398
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.1
Reporter: Guozhang Wang
  Labels: features
 Attachments: kafka-398-0.7-v1.patch, kafka-398-0.7-v2.patch


 Currently the SocketServer is only used for reactively receiving requests and 
 send responses but not used for pro-actively send requests and receive 
 responses. Hence it does not need to remember which channel/key correspond to 
 which consumer.
 On the other hand, there are cases such as consumer coordinator that needs 
 SocketServer to send requests and receive responses to the consumers.
 It would be nice to add this functionality such that an API can be called 
 with the id string and the request message, and the SocketServer will figure 
 out which channel to use and write that message to the key's attachment and 
 set the flag as WRITABLE so that the processor can then send it.
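A sketch of that mechanism in plain NIO (illustrative, not Kafka's actual SocketServer; the id-to-key map is a hypothetical stand-in):

{code}
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.Map;

// Look up the SelectionKey registered for a connection id, attach the outgoing
// message, and add OP_WRITE so the processor's select loop writes it out.
public class OutboundSendSketch {
    private final Map<String, SelectionKey> keysByConnectionId; // hypothetical id -> key map

    OutboundSendSketch(Map<String, SelectionKey> keysByConnectionId) {
        this.keysByConnectionId = keysByConnectionId;
    }

    void sendRequest(String id, ByteBuffer message) {
        SelectionKey key = keysByConnectionId.get(id);
        if (key == null || !key.isValid()) {
            throw new IllegalStateException("No live connection for " + id);
        }
        key.attach(message); // stash the pending send on the key's attachment
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
        key.selector().wakeup(); // nudge the processor's select loop
    }
}
{code}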



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-462) ZK thread crashing doesn't bring down the broker (and doesn't come back up).

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-462.
-
Resolution: Won't Fix

 ZK thread crashing doesn't bring down the broker (and doesn't come back up).
 

 Key: KAFKA-462
 URL: https://issues.apache.org/jira/browse/KAFKA-462
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7
Reporter: Matt Jones

 I think the simplest explanation is the traceback. The broker had been up 
 starting at 2012-07-31 18:45:42,951 (based upon the 'Starting Kafka server' 
 log entry), and the error was fixed with a restart of the broker at 
 2012-08-14 20:59:41,581.
 It looks like the zookeeper thread crashed, but the broker kept operating as 
 usual. The expected behavior would be for the crash of the zookeeper thread 
 to bring the whole broker down, or for the zookeeper thread to start itself 
 back up.
 [2012-08-08 01:25:13,398] 624270894 [main-SendThread(zookeeper001:2181)] INFO 
  org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard 
 from server in 8749ms for sessionid 0x138e4edc04c1e50, closing socket 
 connection and attempting reconnect
  [2012-08-08 01:25:15,136] 624272632 [main-EventThread] INFO  
 org.I0Itec.zkclient.ZkClient  - zookeeper state changed (Disconnected)
  [2012-08-08 01:25:15,702] 624273198 [main-SendThread(zookeeper001:2181)] 
 INFO  org.apache.zookeeper.ClientCnxn  - Opening socket connection to server 
 zookeeper003/10.125.95.193:2181
  [2012-08-08 01:25:15,704] 624273200 [main-SendThread(zookeeper003:2181)] 
 INFO  org.apache.zookeeper.ClientCnxn  - Socket connection established to 
 zookeeper003/10.125.95.193:2181, initiating session
  [2012-08-08 01:25:15,709] 624273205 [main-EventThread] INFO  
 org.I0Itec.zkclient.ZkClient  - zookeeper state changed (Expired)
  [2012-08-08 01:25:15,709] 624273205 [main-EventThread] INFO  
 org.apache.zookeeper.ZooKeeper  - Initiating client connection, 
 connectString=zookeeper001:2181,zookeeper002:2181,zookeeper003:2181 
 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@26d66426
  [2012-08-08 01:25:21,514] 624279010 [main-SendThread(zookeeper003:2181)] 
 INFO  org.apache.zookeeper.ClientCnxn  - Unable to reconnect to ZooKeeper 
 service, session 0x138e4edc04c1e50 has expired, closing socket connection
  [2012-08-08 01:25:47,135] 624304631 [main-EventThread] ERROR 
 org.apache.zookeeper.ClientCnxn  - Error while calling watcher 
   at 
 org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:530)
   at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)
 Caused by: org.I0Itec.zkclient.exception.ZkException: Unable to connect to 
 zookeeper001:2181,zookeeper002:2181,zookeeper003:2181
 Caused by: java.net.UnknownHostException: zookeeper001
   at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:386)
   at org.apache.zookeeper.ClientCnxn.<init>(ClientCnxn.java:331)
   at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:377)
 [2012-08-08 01:25:48,620] 624306116 [main-EventThread] INFO  
 org.apache.zookeeper.ClientCnxn  - EventThread shut down
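One conventional way to get the "bring the whole broker down" behavior the reporter expects (a sketch of the general JVM pattern, not what Kafka 0.7 actually did):

{code}
// Install a default uncaught-exception handler that halts the process when a
// critical background thread dies, instead of letting the broker limp along.
public class HaltOnFatalSketch {
    public static void main(String[] args) {
        Thread.setDefaultUncaughtExceptionHandler((thread, error) -> {
            System.err.println("Fatal error in " + thread.getName() + ": " + error);
            Runtime.getRuntime().halt(1);
        });
        // ... start broker / ZK client threads here ...
    }
}
{code}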



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-478) Move start_consumer & start_producer inside start_entity_in_background

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-478.
-
Resolution: Won't Fix

 Move start_consumer & start_producer inside start_entity_in_background
 

 Key: KAFKA-478
 URL: https://issues.apache.org/jira/browse/KAFKA-478
 Project: Kafka
  Issue Type: Sub-task
Affects Versions: 0.8.0
Reporter: John Fung
Assignee: John Fung
  Labels: replication-testing
 Fix For: 0.9.0






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-558) KafkaETLContext should use getTopicMetadata before sending offset requests

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-558.
-
Resolution: Won't Fix

 KafkaETLContext should use getTopicMetadata before sending offset requests
 --

 Key: KAFKA-558
 URL: https://issues.apache.org/jira/browse/KAFKA-558
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0
Reporter: Joel Koshy
Assignee: Joel Koshy
 Fix For: 0.9.0


 Filing this or I may forget.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1790) Remote controlled shutdown was removed

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1790.
--
Resolution: Won't Fix

 Remote controlled shutdown was removed
 --

 Key: KAFKA-1790
 URL: https://issues.apache.org/jira/browse/KAFKA-1790
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: James Oliver
Assignee: James Oliver
 Fix For: 0.8.3


 In core:
 kafka.admin.ShutdownBroker was removed, rendering remote controlled shutdowns 
 impossible. 
 A Kafka administrator needs to be able to perform a controlled shutdown 
 without issuing a SIGTERM/SIGKILL.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1898) compatibility testing framework

2015-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311039#comment-14311039
 ] 

Jay Kreps commented on KAFKA-1898:
--

[~charmalloc] Can you expand on this ticket? There is no description.

 compatibility testing framework 
 

 Key: KAFKA-1898
 URL: https://issues.apache.org/jira/browse/KAFKA-1898
 Project: Kafka
  Issue Type: Bug
Reporter: Joe Stein
 Fix For: 0.8.3






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests

2015-02-07 Thread Jay Kreps (JIRA)
Jay Kreps created KAFKA-1927:


 Summary: Replace requests in kafka.api with requests in 
org.apache.kafka.common.requests
 Key: KAFKA-1927
 URL: https://issues.apache.org/jira/browse/KAFKA-1927
 Project: Kafka
  Issue Type: Improvement
Reporter: Jay Kreps


The common package introduced a better way of defining requests using a new 
protocol definition DSL and also includes wrapper objects for these.

We should switch KafkaApis over to use these request definitions and consider 
the scala classes deprecated (we probably need to retain some of them for a 
while for the scala clients).
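
As a rough illustration of the protocol-definition style in the common package, here is a minimal sketch using the types in org.apache.kafka.common.protocol.types; the schema shown is a toy with made-up field names, not an actual Kafka request definition:

{code}
import java.nio.ByteBuffer
import org.apache.kafka.common.protocol.types.{Field, Schema, Struct}
import org.apache.kafka.common.protocol.types.Type.{INT32, STRING}

// Toy request schema in the style of Protocol.java; illustrative only.
val toySchema = new Schema(
  new Field("topic", STRING, "The topic name."),
  new Field("max_wait", INT32, "Maximum wait time in ms."))

val struct = new Struct(toySchema).set("topic", "test").set("max_wait", 100)
val buffer = ByteBuffer.allocate(toySchema.sizeOf(struct))
toySchema.write(buffer, struct)
buffer.rewind()
val parsed = toySchema.read(buffer).asInstanceOf[Struct] // round-trips cleanly
{code}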



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1898) compatibility testing framework

2015-02-07 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311098#comment-14311098
 ] 

Joe Stein commented on KAFKA-1898:
--

Updated the description. The client library's part is consuming the data it is 
given (so we need some data sets to use) plus a set of test scenarios wherein 
it needs to produce the right data. The data sets and the analysis output are 
what the client library is measured against from a result perspective. Another 
example/test would be "supports compression": in this case the data set would 
be compressed, the library would have to read it, make some change (like hash 
the message), and re-compress the hashed message into another topic. The 
validation could then run and confirm that an input of List(x) (which was 
compressed) produced hash(List(x)) in the destination topic (when analyzing, 
you would uncompress to check the values message by message). It would be hard 
to cheat since new data could be generated each time. We are also looking to 
use this for some end-to-end latency testing. The goal there is to calculate 
an array of timestamps (based on a key/value map) so you can figure out more 
internals and things about the system e.g. 

{code}

{"namespace": "ly.stealth.kafka.metrics",
 "type": "record",
 "name": "Timings",
 "fields": [
   {"name": "id", "type": "long"},
   {"name": "timings", "type": {"type": "array", "items": "long"}}
 ]
}

{code}
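
To make the compression check above concrete, a minimal sketch of the validation step (the hashing choice and helper names are illustrative, not part of the proposal):

{code}
import java.security.MessageDigest

// Hex MD5 of a message body; any stable hash would do for the check.
def md5Hex(bytes: Array[Byte]): String =
  MessageDigest.getInstance("MD5").digest(bytes).map("%02x".format(_)).mkString

// The destination topic passes if, message by message, it holds hash(x)
// for every input message x (after uncompressing both sides).
def validate(input: Seq[Array[Byte]], output: Seq[String]): Boolean =
  input.map(md5Hex) == output
{code}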

 compatibility testing framework 
 

 Key: KAFKA-1898
 URL: https://issues.apache.org/jira/browse/KAFKA-1898
 Project: Kafka
  Issue Type: Bug
Reporter: Joe Stein
 Fix For: 0.8.3

 Attachments: cctk.png


 There are a few different scenarios where you want/need to know the 
 status/state of a client library that works with Kafka. Client library 
 development is not just about supporting the wire protocol but also the 
 implementations around specific interactions of the API.  The API has 
 blossomed into a robust set of producer, consumer, broker and administrative 
 calls all of which have layers of logic above them.  A Client Library may 
 choose to deviate from the path the project sets out and that is ok. The goal 
 of this ticket is to have a system for Kafka that can help to explain what 
 the library is or isn't doing (regardless of what it claims).
 The idea behind this stems from being able to quickly/easily/succinctly analyze 
 the topic message data. Once you can analyze the topic(s)' messages you can 
 gather lots of information about what the client library is doing, is not 
 doing, and such. There are a few components to this.
 1) dataset-generator 
 Test Kafka dataset generation tool. Generates a random text file with given 
 params:
 --filename, -f - output file name.
 --filesize, -s - desired size of output file. The actual size will always be 
 a bit larger (with a maximum size of $filesize + $max.length - 1)
 --min.length, -l - minimum generated entry length.
 --max.length, -h - maximum generated entry length.
 Usage:
 ./gradlew build
 java -jar dataset-generator/build/libs/dataset-generator-*.jar -s 10 -l 2 
 -h 20
 2) dataset-producer
 Test Kafka dataset producer tool. Able to produce the given dataset to Kafka 
 or Syslog server.  The idea here is you already have lots of data sets that 
 you want to test different things for. You might have different sized 
 messages, formats, etc and want a repeatable benchmark to run and re-run the 
 testing on. You could just have a day's worth of data and just choose to 
 replay it.  The CCTK idea is that you are always starting from CONSUME in 
 your state of library. If your library is only producing then you will fail a 
 bunch of tests and that might be ok for people.
 Accepts following params:
 {code}
 --filename, -f - input file name.
 --kafka, -k - Kafka broker address in host:port format. If this parameter is 
 set, --producer.config and --topic must be set too (otherwise they're 
 ignored).
 --producer.config, -p - Kafka producer properties file location.
 --topic, -t - Kafka topic to produce to.
 --syslog, -s - Syslog server address. Format: protocol://host:port 
 (tcp://0.0.0.0:5140 or udp://0.0.0.0:5141 for example)
 --loop, -l - flag to loop through file until shut off manually. False by 
 default.
 Usage:
 ./gradlew build
 java -jar dataset-producer/build/libs/dataset-producer-*.jar --filename 
 dataset --syslog tcp://0.0.0.0:5140 --loop true
 {code}
 3) extract
 This step is good so you can save data and compare tests. It could also be 
 removed if folks are just looking for a real live test (and we could support 
 that too).  Here we are taking data out of Kafka and putting it into 
 Cassandra (but other data stores can be used too and we should come up with 

[jira] [Resolved] (KAFKA-1290) TestLogCleaning tool hangs on the new producer

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1290.
--
Resolution: Fixed

Seems to have fixed itself.

 TestLogCleaning tool hangs on the new producer
 --

 Key: KAFKA-1290
 URL: https://issues.apache.org/jira/browse/KAFKA-1290
 Project: Kafka
  Issue Type: Sub-task
  Components: producer 
Affects Versions: 0.9.0
Reporter: Neha Narkhede
Priority: Blocker

 KAFKA-1281 (yet to be checked in) converted existing tools to use the new 
 producer. I found that TestLogCleaning hangs while sending messages using the 
 new producer. Following is a thread dump and steps to reproduce the issue.
 nnarkhed-mn1:kafka-git-idea nnarkhed$ ./bin/kafka-run-class.sh 
 kafka.TestLogCleaning --broker localhost:9092 --topics 1 --zk localhost:2181 
 --messages 10
 Producing 10 messages...
 Logging produce requests to 
 /var/folders/61/bspy8z8n1t5dn5sdqzsnhbdr000383/T/kafka-log-cleaner-produced-3744326506335955516.txt
 2014-03-04 10:51:35
 Full thread dump Java HotSpot(TM) 64-Bit Server VM (20.65-b04-462 mixed mode):
 kafka-network-thread daemon prio=5 tid=7fc27e94c000 nid=0x10a643000 
 runnable [10a642000]
java.lang.Thread.State: RUNNABLE
   at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
   at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:136)
   at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:69)
   at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
   - locked 7ec0b0170 (a sun.nio.ch.Util$2)
   - locked 7ec0b0180 (a java.util.Collections$UnmodifiableSet)
   - locked 7ec0b0128 (a sun.nio.ch.KQueueSelectorImpl)
   at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
   at org.apache.kafka.common.network.Selector.select(Selector.java:296)
   at org.apache.kafka.common.network.Selector.poll(Selector.java:198)
   at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:153)
   at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:98)
   at java.lang.Thread.run(Thread.java:695)
 RMI TCP Accept-0 daemon prio=5 tid=7fc27e99c800 nid=0x10a43d000 runnable 
 [10a43c000]
java.lang.Thread.State: RUNNABLE
   at java.net.PlainSocketImpl.socketAccept(Native Method)
   at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:439)
   - locked 7ec0b6088 (a java.net.SocksSocketImpl)
   at java.net.ServerSocket.implAccept(ServerSocket.java:468)
   at java.net.ServerSocket.accept(ServerSocket.java:436)
   at 
 sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:34)
   at 
 sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
   at 
 sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
   at java.lang.Thread.run(Thread.java:695)
 AWT-AppKit daemon prio=5 tid=7fc27f83 nid=0x7fff7b984180 runnable 
 []
java.lang.Thread.State: RUNNABLE
 Low Memory Detector daemon prio=5 tid=7fc27e8db000 nid=0x109b3 runnable 
 []
java.lang.Thread.State: RUNNABLE
 C2 CompilerThread1 daemon prio=9 tid=7fc27e8da800 nid=0x109a2d000 waiting 
 on condition []
java.lang.Thread.State: RUNNABLE
 C2 CompilerThread0 daemon prio=9 tid=7fc27e8d9800 nid=0x10992a000 waiting 
 on condition []
java.lang.Thread.State: RUNNABLE
 Signal Dispatcher daemon prio=9 tid=7fc27e8d9000 nid=0x109827000 waiting on 
 condition []
java.lang.Thread.State: RUNNABLE
 Surrogate Locker Thread (Concurrent GC) daemon prio=5 tid=7fc27e002000 
 nid=0x109724000 waiting on condition []
java.lang.Thread.State: RUNNABLE
 Finalizer daemon prio=8 tid=7fc27e8d8000 nid=0x109519000 in Object.wait() 
 [109518000]
java.lang.Thread.State: WAITING (on object monitor)
   at java.lang.Object.wait(Native Method)
   - waiting on 7ec0b23b0 (a java.lang.ref.ReferenceQueue$Lock)
   at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
   - locked 7ec0b23b0 (a java.lang.ref.ReferenceQueue$Lock)
   at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
   at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:171)
 Reference Handler daemon prio=10 tid=7fc27e8d7800 nid=0x109416000 in 
 Object.wait() [109415000]
java.lang.Thread.State: WAITING (on object monitor)
   at java.lang.Object.wait(Native Method)
   - waiting on 7ec0b4000 (a java.lang.ref.Reference$Lock)
   at java.lang.Object.wait(Object.java:485)
   at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
   - locked 7ec0b4000 (a java.lang.ref.Reference$Lock)
 main prio=5 tid=7fc27e000800 nid=0x102159000 in Object.wait() [102158000]
java.lang.Thread.State: 

[jira] [Updated] (KAFKA-1404) Close unused log file

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1404:
-
Priority: Major  (was: Critical)

 Close unused log file
 -

 Key: KAFKA-1404
 URL: https://issues.apache.org/jira/browse/KAFKA-1404
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.1
Reporter: Xinyao Hu
   Original Estimate: 336h
  Remaining Estimate: 336h

 This is somewhat related to KAFKA-1403. 
 One way to hack KAFKA-1403 is to roll a new file in a short period of time. 
 However, this will result in many open file descriptors. Take our application 
 for example: each server hosts about 5k topic-partitions, so if we roll a new 
 file per hour, we will add ~100k file descriptors per day (I checked that only 
 .log is open but not .index, which might be pinned in memory). We will run out 
 of 1M file descriptors in about a week, though our disks can hold data for 
 much longer. 
 In reality very few of these file descriptors will be used. The most recent 
 fd will be used to append data and the older file descriptors will be used for 
 queries. We should provide a parameter like max.num.fds and do LRU to decide 
 which fds should be open. 
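
A minimal sketch of that LRU policy (the max.num.fds name comes from the ticket; the cache class itself is hypothetical):

{code}
import java.io.RandomAccessFile
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Keep at most maxOpenFds descriptors open; close the least recently
// used one when the cap is exceeded (access-order LinkedHashMap).
class FdCache(maxOpenFds: Int) {
  private val cache = new JLinkedHashMap[String, RandomAccessFile](16, 0.75f, true) {
    override def removeEldestEntry(eldest: JMap.Entry[String, RandomAccessFile]): Boolean =
      if (size() > maxOpenFds) { eldest.getValue.close(); true } else false
  }
  def get(path: String): RandomAccessFile = synchronized {
    var f = cache.get(path)
    if (f == null) { f = new RandomAccessFile(path, "r"); cache.put(path, f) }
    f
  }
}
{code}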



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-674) Clean Shutdown Testing - Log segments checksums mismatch

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-674.
-
Resolution: Invalid

 Clean Shutdown Testing - Log segments checksums mismatch
 

 Key: KAFKA-674
 URL: https://issues.apache.org/jira/browse/KAFKA-674
 Project: Kafka
  Issue Type: Bug
Reporter: John Fung
Priority: Critical
  Labels: replication-testing
 Attachments: kafka-674-reproduce-issue-v2.patch, 
 kafka-674-reproduce-issue.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-723) Scala's default case class toString() is very inefficient

2015-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14310965#comment-14310965
 ] 

Jay Kreps commented on KAFKA-723:
-

[~nehanarkhede] I think we did this, no?

 Scala's default case class toString() is very inefficient
 -

 Key: KAFKA-723
 URL: https://issues.apache.org/jira/browse/KAFKA-723
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.0
Reporter: Neha Narkhede
Priority: Critical

 Request logging is in the critical path of processing requests and we use 
 Scala's default toString() API to log the requests. We should override the 
 toString() in these case classes and log only what is useful.
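
For illustration, a hand-rolled toString of the kind this suggests (the class here is a made-up stand-in, not an actual Kafka request class):

{code}
// A request-like case class whose toString is handwritten and selective,
// instead of Scala's generic default that walks every field.
case class FetchLite(topic: String, partition: Int, offset: Long, clientId: String) {
  override def toString: String = {
    val b = new StringBuilder(64)
    b.append("FetchLite(topic:").append(topic)
    b.append(",partition:").append(partition)
    b.append(",offset:").append(offset).append(")")
    b.toString() // clientId deliberately omitted: log only what is useful
  }
}
{code}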



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1758) corrupt recovery file prevents startup

2015-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14310991#comment-14310991
 ] 

Jay Kreps edited comment on KAFKA-1758 at 2/7/15 11:13 PM:
---

This is actually not a very difficult change--in LogManager.loadLogs we would 
need to basically handle an error in reading the recovery checkpoint, log it, 
and then just treat it as though our recovery point was 0 (or something like 
that) for all logs.


was (Author: jkreps):
This is actually not a very difficult change--in LogManager.loadLogs we would 
need to basically handle an error in reading the recovery checkpoint, log it, 
and then just start a full recovery.
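
A minimal sketch of the fallback described above (not the committed fix; logging elided):

{code}
import kafka.common.TopicAndPartition
import kafka.server.OffsetCheckpoint

// Fall back to "recover everything" instead of failing startup when the
// checkpoint file is corrupt; absent entries mean a recovery point of 0.
def safeRecoveryPoints(checkpoint: OffsetCheckpoint): Map[TopicAndPartition, Long] =
  try checkpoint.read()
  catch {
    case e: Exception =>
      // log.error("Corrupt recovery checkpoint, recovering all logs from 0", e)
      Map.empty
  }
{code}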

 corrupt recovery file prevents startup
 --

 Key: KAFKA-1758
 URL: https://issues.apache.org/jira/browse/KAFKA-1758
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Rosenberg

 Hi,
 We recently had a kafka node go down suddenly. When it came back up, it 
 apparently had a corrupt recovery file, and refused to startup:
 {code}
 2014-11-06 08:17:19,299  WARN [main] server.KafkaServer - Error starting up 
 KafkaServer
 java.lang.NumberFormatException: For input string: 
 ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
 ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
 at 
 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
 at java.lang.Integer.parseInt(Integer.java:481)
 at java.lang.Integer.parseInt(Integer.java:527)
 at 
 scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
 at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
 at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76)
 at 
 kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106)
 at 
 kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
 at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at 
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
 at kafka.log.LogManager.loadLogs(LogManager.scala:105)
 at kafka.log.LogManager.init(LogManager.scala:57)
 at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
 {code}
 And the app is under a monitor (so it was repeatedly restarting and failing 
 with this error for several minutes before we got to it)…
 We moved the ‘recovery-point-offset-checkpoint’ file out of the way, and it 
 then restarted cleanly (but of course re-synced all its data from replicas, 
 so we had no data loss).
 Anyway, I’m wondering if that’s the expected behavior? Or should it not 
 declare it corrupt and then proceed automatically to an unclean restart?
 Should this NumberFormatException be handled a bit more gracefully?
 We saved the corrupt file if it’s worth inspecting (although I doubt it will 
 be useful!)….
 The corrupt files appeared to be all zeroes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1072) Allow multiple topics selected with a TopicFilter to be balanced among consumers

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1072.
--
Resolution: Fixed

I think this is solved with the partition.assignment.strategy option in 0.8.2
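
For reference, a sketch of enabling that option on the 0.8.2 high-level consumer (connection values are placeholders):

{code}
val props = new java.util.Properties()
props.put("zookeeper.connect", "localhost:2181") // placeholder
props.put("group.id", "wildcard-consumers")      // placeholder
// "roundrobin" spreads single-partition topics matched by a wildcard
// across the available consumer threads instead of piling them onto one.
props.put("partition.assignment.strategy", "roundrobin")
{code}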

 Allow multiple topics selected with a TopicFilter to be balanced among 
 consumers
 

 Key: KAFKA-1072
 URL: https://issues.apache.org/jira/browse/KAFKA-1072
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0
Reporter: Jason Rosenberg
 Fix For: 0.9.0


 Currently, there is no parallelism used when consuming a set of topics 
 selected by a white list topic filter, if those topics all have a partition 
 count of 1.  Currently, all topics that match the filter get assigned to the 
 same thread on the same consumer, even though there may be plenty of 
 different topics (and therefore many partitions to be consumed from).
 There are often good reasons to use a partition count of only 1 (e.g. to 
 preserve message ordering).  For arbitrary scalability, over a large number 
 of topics, this would be a great benefit to be able to consume topics 
 balanced over a set of available consumers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1044) change log4j to slf4j

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1044:
-
Assignee: (was: Jay Kreps)

 change log4j to slf4j 
 --

 Key: KAFKA-1044
 URL: https://issues.apache.org/jira/browse/KAFKA-1044
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.8.0
Reporter: sjk
 Fix For: 0.9.0


 Can you change log4j to slf4j? In my project I use logback, and it conflicts 
 with log4j.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-839) Race condition between flushInterval and flushSchedulerInterval

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-839.
-
Resolution: Won't Fix

 Race condition between flushInterval and flushSchedulerInterval
 ---

 Key: KAFKA-839
 URL: https://issues.apache.org/jira/browse/KAFKA-839
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.7
Reporter: Jason Rosenberg

 It looks like there is a race condition between the settings for the two
 properties log.default.flush.scheduler.interval.ms and 
 log.default.flush.interval.ms. I'm using 0.7.2.
 By default, both of these get set to 3000ms (and in the docs, it
 recommends setting flushInterval to be a multiple of the
 flushSchedulerInterval).
 However, the code in LogManager.flushAllLogs (which is scheduled to
 run at a fixed rate using the flushSchedulerInterval property) looks
 like this:
 val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
 var logFlushInterval = config.defaultFlushIntervalMs
 if(timeSinceLastFlush >= logFlushInterval)
   log.flush
 So, it will only flush logs if the time since the last flush is
 longer than the flush interval. But log.lastFlushedTime is not
 set until after flushing is completed (which can incur some io time).
 Thus, by enabling TRACE logging for this method, I was able to see
 that with the defaults, timeSinceLastFlush was usually about 2998
 (which is less than the logFlushInterval of 3000). Thus, setting a
 flushInterval the same as the scheduler.flushInterval essentially
 devolves to an effective flushInterval of 2X the
 schedulerFlushInterval.
 So, setting a flushInterval slightly less than the
 flushSchedulerInterval (e.g. 2500) will guarantee that the flush will
 happen on each scheduler invocation.
 I'm guessing that changing the logic gating the flush to something like:
   if(timeSinceLastFlush >= 0.90 * logFlushInterval)
 might be reasonable. Also, the scheduler probably ought to use a
 'fixedDelay' rather than a 'fixedRate' schedule.
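
A sketch of the two suggested changes together (FlushableLog is a stand-in for kafka.log.Log, not a real Kafka trait):

{code}
import java.util.concurrent.{Executors, TimeUnit}

trait FlushableLog { def getLastFlushedTime: Long; def flush(): Unit } // stand-in

// Gate on 90% of the interval and use a fixed *delay*, so time spent inside
// flush() cannot silently push every flush past the gate.
def scheduleFlush(log: FlushableLog, flushIntervalMs: Long): Unit = {
  val scheduler = Executors.newSingleThreadScheduledExecutor()
  scheduler.scheduleWithFixedDelay(new Runnable {
    def run(): Unit = {
      val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
      if (timeSinceLastFlush >= 0.90 * flushIntervalMs)
        log.flush()
    }
  }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS)
}
{code}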



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-841) Merge the partition and replica state machines into a single unified state machine in the controller

2015-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311021#comment-14311021
 ] 

Jay Kreps commented on KAFKA-841:
-

[~nehanarkhede] is this still a consideration?

 Merge the partition and replica state machines into a single unified state 
 machine in the controller
 

 Key: KAFKA-841
 URL: https://issues.apache.org/jira/browse/KAFKA-841
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1
Reporter: Neha Narkhede
Assignee: Neha Narkhede

 After using the controller for a while as part of 0.8, I think it might be 
 worth looking into merging the separate state machines into a single unified 
 one. The reason is most events end up invoking state transitions on both 
 partitions and replicas. Initially, the thought of separating the two was to 
 handle cases which only touch one state machine (for example, changing the 
 replication factor or changing the number of partitions online). However, 
 these features also would end up touching both state machines. The complexity 
 comes from the correct ordering that one has to ensure between the operations 
 on both these state machines (KAFKA-831). Due to this, some state transitions 
 are unable to batch RPCs and/or zookeeper writes/reads since they need to 
 happen in each of the state machines separately. Since this will 
 significantly change controller code, I'm filing this for 0.8.1



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-182) Set a TCP connection timeout for the SimpleConsumer

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-182.
-
Resolution: Fixed

 Set a TCP connection timeout for the SimpleConsumer
 ---

 Key: KAFKA-182
 URL: https://issues.apache.org/jira/browse/KAFKA-182
 Project: Kafka
  Issue Type: Bug
Reporter: Jay Kreps

 Currently we use SocketChannel.open which I *think* can block for a long 
 time. We should make this configurable, and we may have to create the socket 
 in a different way to enable this.
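
One way to do that, sketched (this mirrors what a blocking channel with a connect timeout looks like; not necessarily the shipped code):

{code}
import java.net.InetSocketAddress
import java.nio.channels.SocketChannel

// Open the channel unconnected, then connect through the socket view,
// which (unlike SocketChannel.open(address)) accepts a timeout.
def connectWithTimeout(host: String, port: Int, connectTimeoutMs: Int): SocketChannel = {
  val channel = SocketChannel.open()
  channel.configureBlocking(true)
  channel.socket().connect(new InetSocketAddress(host, port), connectTimeoutMs)
  channel
}
{code}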



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-874) Migration tool's system tests should use separate server.properties files for 0.7 and 0.8 brokers

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-874.
-
Resolution: Won't Fix

 Migration tool's system tests should use separate server.properties files for 
 0.7 and 0.8 brokers
 -

 Key: KAFKA-874
 URL: https://issues.apache.org/jira/browse/KAFKA-874
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0
Reporter: Swapnil Ghike
Assignee: John Fung
  Labels: bugs

 Currently the migration tool's system test suite is using 0.8 broker's 
 server.properties as default. When the test brings up an 0.7 broker, it gets 
 the appropriate configs like brokerid via addedCSVConfigs in 
 system_test/utils/kafka_system_test_utils.py. I suspect that for 0.7 broker, 
 the zk.connectiontimeout.ms will also not be overridden.
 We should rather use two separate server.properties files for 0.7 and 0.8 
 brokers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-875) System test cleanup

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-875.
-
Resolution: Won't Fix

 System test cleanup
 ---

 Key: KAFKA-875
 URL: https://issues.apache.org/jira/browse/KAFKA-875
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0
Reporter: Swapnil Ghike
Assignee: John Fung
  Labels: cleanup

 There is a lot of unused stuff lying in system_test. If we have converted all 
 the bash tests to python, we should delete the unused code.
 Also there is commented code at a few places that should be deleted if it is 
 not going to be used. For instance, search for brokerLogCksumDict in 
 system_test/utils/kafka_system_test_utils.py. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-266) Kafka web console

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-266.
-
Resolution: Won't Fix

 Kafka web console
 -

 Key: KAFKA-266
 URL: https://issues.apache.org/jira/browse/KAFKA-266
 Project: Kafka
  Issue Type: New Feature
  Components: contrib
Reporter: Evan Chan
  Labels: project
   Original Estimate: 672h
  Remaining Estimate: 672h

 This issue is created to track a community-contributed Kafka Web UI.
 Here is an initial list of goals:
 - Be able to easily see which brokers are up
 - Be able to see lists of topics, connected producers, consumer groups, 
 connected consumers
 - Be able to see, for each consumer/partition, its offset, and more 
 importantly, # of bytes unconsumed (== largest offset for partition - current 
 offset)
 - (Wish list) have a graphical view of the offsets
 - (Wish list) be able to clean up consumer state, such as stale claimed 
 partitions
 List of challenges/questions:
 - Which framework?  Play! for Scala?
 - Is all the data available from JMX and ZK?  Hopefully, watching the files 
 on the filesystem can be avoided
 - How to handle large numbers of topics, partitions, consumers, etc. 
 efficiently



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1927:
-
Description: 
The common package introduced a better way of defining requests using a new 
protocol definition DSL and also includes wrapper objects for these.

We should switch KafkaApis over to use these request definitions and consider 
the scala classes deprecated (we probably need to retain some of them for a 
while for the scala clients).

This will be a big improvement because
1. We will have each request now defined in only one place (Protocol.java)
2. We will have built-in support for multi-version requests
3. We will have much better error messages (no more cryptic underflow errors)

  was:
The common package introduced a better way of defining requests using a new 
protocol definition DSL and also includes wrapper objects for these.

We should switch KafkaApis over to use these request definitions and consider 
the scala classes deprecated (we probably need to retain some of them for a 
while for the scala clients).


 Replace requests in kafka.api with requests in 
 org.apache.kafka.common.requests
 ---

 Key: KAFKA-1927
 URL: https://issues.apache.org/jira/browse/KAFKA-1927
 Project: Kafka
  Issue Type: Improvement
Reporter: Jay Kreps

 The common package introduced a better way of defining requests using a new 
 protocol definition DSL and also includes wrapper objects for these.
 We should switch KafkaApis over to use these request definitions and consider 
 the scala classes deprecated (we probably need to retain some of them for a 
 while for the scala clients).
 This will be a big improvement because
 1. We will have each request now defined in only one place (Protocol.java)
 2. We will have built-in support for multi-version requests
 3. We will have much better error messages (no more cryptic underflow errors)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-936) Kafka Metrics Memory Leak

2015-02-07 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311139#comment-14311139
 ] 

Manikumar Reddy commented on KAFKA-936:
---

After KAFKA-1481, I am observing one issue: we are not removing the newly 
added Version/AppInfo MBean when the old producer/consumer is closed. There is 
no other memory leak. I will submit a patch for this.
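
A sketch of the cleanup being described (the ObjectName string is an assumption, not the registered name verbatim):

{code}
import java.lang.management.ManagementFactory
import javax.management.ObjectName

// On close(), unregister the MBean that was registered at startup so
// repeated create/close cycles don't leak registrations.
def unregisterAppInfo(): Unit = {
  val server = ManagementFactory.getPlatformMBeanServer
  val name = new ObjectName("kafka:type=kafka.common.AppInfo") // assumed name
  if (server.isRegistered(name))
    server.unregisterMBean(name)
}
{code}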

 Kafka Metrics Memory Leak 
 --

 Key: KAFKA-936
 URL: https://issues.apache.org/jira/browse/KAFKA-936
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.0
 Environment: centos linux, jdk 1.6, jboss
Reporter: Senthil Chittibabu
Assignee: Neha Narkhede
Priority: Critical

 I am using kafka_2.8.0-0.8.0-SNAPSHOT version. I am running into 
 OutOfMemoryError in PermGen Space. I have set the -XX:MaxPermSize=512m, but I 
 still get the same error. I used profiler to trace the memory leak, and found 
 the following kafka classes to be the cause for the memory leak. Please let 
 me know if you need any additional information to debug this issue. 
 kafka.server.FetcherLagMetrics
 kafka.consumer.FetchRequestAndResponseMetrics
 kafka.consumer.FetchRequestAndResponseStats
 kafka.metrics.KafkaTimer
 kafka.utils.Pool



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1794) Make config and config defaults accessible to clients

2015-02-07 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311142#comment-14311142
 ] 

Guozhang Wang commented on KAFKA-1794:
--

Makes sense.

 Make config and config defaults accessible to clients
 -

 Key: KAFKA-1794
 URL: https://issues.apache.org/jira/browse/KAFKA-1794
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Navina Ramesh

 In the new Kafka producer API, the ProducerConfig is not accessible to the 
 clients. Samza uses the ProducerConfig instance to access the default 
 property values, which can then be used in the various helper utils. The Config 
 instance is accessible even without instantiating a Kafka producer. 
 With the new API, there is no way to instantiate a ProducerConfig as the 
 constructor is marked private. Also, it does not make the default config 
 values accessible to the client without actually instantiating a 
 KafkaProducer.
 Changes suggested:
 1. Make the ProducerConfig constructor public
 2. Make ConfigDef in ProducerConfig accessible by the client
 3. Use public static variables for kafka config default values 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1930) Move server over to new metrics library

2015-02-07 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311151#comment-14311151
 ] 

Manikumar Reddy commented on KAFKA-1930:


Are we supporting self-documentation in the new metrics library? 

Also, the Kafka community currently uses various metrics reporting formats. 
The new metrics library only supports reporting via JMX. 
To gain community acceptance, it needs to support more reporting formats:

(1) Support metrics reporting via Ganglia
(2) Support metrics reporting via Graphite
(3) Support metrics reporting via CSV
(4) Support metrics reporting via STDOUT

If ok, I will create sub-tasks.
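
As a sketch of what one such reporter could look like against the new client metrics API (the STDOUT case; hedged, since the reporter SPI may evolve):

{code}
import java.util
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}

// Minimal STDOUT reporter: print every metric when it is (re)registered.
class StdoutReporter extends MetricsReporter {
  def configure(configs: util.Map[String, _]): Unit = {}
  def init(metrics: util.List[KafkaMetric]): Unit = {
    val it = metrics.iterator()
    while (it.hasNext) metricChange(it.next())
  }
  def metricChange(metric: KafkaMetric): Unit = {
    val n = metric.metricName()
    println(s"${n.group()}/${n.name()} = ${metric.value()}")
  }
  def close(): Unit = {}
}
{code}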

 Move server over to new metrics library
 ---

 Key: KAFKA-1930
 URL: https://issues.apache.org/jira/browse/KAFKA-1930
 Project: Kafka
  Issue Type: Improvement
Reporter: Jay Kreps

 We are using org.apache.kafka.common.metrics on the clients, but using Coda 
 Hale metrics on the server. We should move the server over to the new metrics 
 package as well. This will help to make all our metrics self-documenting.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1314) recurring errors

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1314.
--
Resolution: Won't Fix

Doesn't seem to be under active investigation.

 recurring errors
 

 Key: KAFKA-1314
 URL: https://issues.apache.org/jira/browse/KAFKA-1314
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0
Reporter: alex kamil
Priority: Critical

 we're getting hundreds of these errors with kafka 0.8 and topics become 
 unavailable after running for a few days
 kafka error 1 (had to recreate the topic as it became unavailable after a few 
 days)
 [2014-03-18 16:34:56,403] ERROR [KafkaApi-3] Error while fetching metadata for 
 partition [mytopic,0] (kafka.server.KafkaApis)
 kafka.common.LeaderNotAvailableException: Leader not available for partition 
 [mytopic,0]
 kafka error 2
 [2014-03-17 12:23:27,536] ERROR Closing socket for /kafka consumer ip 
 address because of error (kafka.network.Processor)
 kafka.common.KafkaException: Wrong request type 768
 kafka error 3 
 ERROR Closing socket for /kafka consumer ip address because of error 
 (kafka.network.Processor)
 java.io.IOException: Connection reset by peer
 kafka error 4 
  ERROR Closing socket for /kafka broker ip address because of error 
 (kafka.network.Processor)
 zookeeper error 
 2014-03-18 16:40:02,794 [myid:3] - WARN  
 [QuorumPeer[myid=3]/0.0.0.0:2181:QuorumCnxManager@368] - Cannot open channel 
 to 1 at election address /kafka broker ip address:3888
 java.net.ConnectException: Connection refused



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-902) Randomize backoff on the clients for metadata requests

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-902:

Labels: newbie  (was: )

 Randomize backoff on the clients for metadata requests
 --

 Key: KAFKA-902
 URL: https://issues.apache.org/jira/browse/KAFKA-902
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.0
Reporter: Neha Narkhede
Assignee: Neha Narkhede
Priority: Critical
  Labels: newbie

 If a Kafka broker dies and there are a large number of clients talking to the 
 Kafka cluster, each of the clients can end up shooting metadata requests at 
 around the same time. It is better to randomize the backoff on the clients so 
 the metadata requests are more evenly spread out
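
A sketch of the kind of jitter this suggests (the 20% spread is an arbitrary choice for illustration):

{code}
import scala.util.Random

// Spread retries over [base*(1-j), base*(1+j)] so clients that all saw the
// broker die at the same instant don't refresh metadata in lockstep.
def jitteredBackoffMs(baseBackoffMs: Long, jitterFraction: Double = 0.2): Long = {
  val jitter = (Random.nextDouble() * 2 - 1) * jitterFraction * baseBackoffMs
  math.max(0L, (baseBackoffMs + jitter).toLong)
}
{code}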



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1403) Adding timestamp to kafka index structure

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1403.
--
Resolution: Won't Fix

 Adding timestamp to kafka index structure
 -

 Key: KAFKA-1403
 URL: https://issues.apache.org/jira/browse/KAFKA-1403
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.1
Reporter: Xinyao Hu
   Original Estimate: 336h
  Remaining Estimate: 336h

 Right now, kafka doesn't have a timestamp per message. It makes the assumption 
 that all the messages in the same file have the same timestamp, which is the 
 mtime of the file. This makes it inefficient to scan all the messages within 
 a time window, which is a valid use case in a lot of realtime data analysis. 
 One way to hack this is to roll a new file in a short period of time. 
 However, this will result in opening lots of files (KAFKA-1404), which 
 eventually crashed the servers. 
 My guess is this is not implemented for efficiency reasons. It would cost an 
 additional four bytes per message, which might be pinned in memory for fast 
 access. There might be some simple perf optimizations, such as differential 
 encoding + var-length encoding, which should bring the cost down to 1-2 bytes 
 avg per message. 
 Let me know if this makes sense. 
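
The "differential encoding + var-length encoding" idea, sketched (the encoding choice here is illustrative):

{code}
import java.io.ByteArrayOutputStream

// Zig-zag varint, as in Avro/protobuf: small deltas take 1-2 bytes.
def writeVarLong(out: ByteArrayOutputStream, value: Long): Unit = {
  var v = (value << 1) ^ (value >> 63) // zig-zag so small negatives stay small
  while ((v & ~0x7fL) != 0) { out.write(((v & 0x7f) | 0x80).toInt); v >>>= 7 }
  out.write(v.toInt)
}

// Store each timestamp as a varint delta from the previous one; for mostly
// increasing timestamps this averages out to the 1-2 bytes mentioned above.
def encodeTimestamps(ts: Seq[Long]): Array[Byte] = {
  val out = new ByteArrayOutputStream()
  var prev = 0L
  for (t <- ts) { writeVarLong(out, t - prev); prev = t }
  out.toByteArray
}
{code}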



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1403) Adding timestamp to kafka index structure

2015-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14310980#comment-14310980
 ] 

Jay Kreps commented on KAFKA-1403:
--

Ultimately in order to be accurate the time will actually need to be in the 
message itself. Currently we use the write time but this can be arbitrarily 
inaccurate: if you delete the data on a server and restart it it will rewrite 
everything with new timestamps.

 Adding timestamp to kafka index structure
 -

 Key: KAFKA-1403
 URL: https://issues.apache.org/jira/browse/KAFKA-1403
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.1
Reporter: Xinyao Hu
   Original Estimate: 336h
  Remaining Estimate: 336h

 Right now, kafka doesn't have a timestamp per message. It makes the assumption 
 that all the messages in the same file have the same timestamp, which is the 
 mtime of the file. This makes it inefficient to scan all the messages within 
 a time window, which is a valid use case in a lot of realtime data analysis. 
 One way to hack this is to roll a new file in a short period of time. 
 However, this will result in opening lots of files (KAFKA-1404), which 
 eventually crashed the servers. 
 My guess is this is not implemented for efficiency reasons. It would cost an 
 additional four bytes per message, which might be pinned in memory for fast 
 access. There might be some simple perf optimizations, such as differential 
 encoding + var-length encoding, which should bring the cost down to 1-2 bytes 
 avg per message. 
 Let me know if this makes sense. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-971) Handle synchronization in updatemetatdata in KafkaApi better

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-971.
-
Resolution: Fixed

I don't see this in KafkaApis any more. [~junrao], can you reopen if this is 
still a real issue?

 Handle synchronization in updatemetatdata in KafkaApi better
 

 Key: KAFKA-971
 URL: https://issues.apache.org/jira/browse/KAFKA-971
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.1
Reporter: Jun Rao

 It's better to wrap all synchronization of the metadata cache in a 
 MetadataCache.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-909) Provide support in the producer to specify a stateful instance of an Encoder, like the consumer allows with the Decoder

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-909.
-
   Resolution: Fixed
Fix Version/s: 0.8.2

This is available in the new producer.

 Provide support in the producer to specify a stateful instance of an Encoder, 
 like the consumer allows with the Decoder
 ---

 Key: KAFKA-909
 URL: https://issues.apache.org/jira/browse/KAFKA-909
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.0
 Environment: Kafka 0.8, jdk 6
Reporter: Rob Withers
Assignee: Jun Rao
  Labels: features
 Fix For: 0.8.2


 It is very useful to have a shared instance of the Encoder to do advanced 
 encoding. The consumer allows setting the Decoder instance to achieve this 
 capability; the producer should offer the same capability.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-288) java impacted changes from new producer and consumer request format

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-288.
-
Resolution: Won't Fix

 java impacted changes from new producer and consumer request format
 ---

 Key: KAFKA-288
 URL: https://issues.apache.org/jira/browse/KAFKA-288
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.0
Reporter: Joe Stein
  Labels: replication, wireprotocol
 Fix For: 0.9.0


 1) javaapi.SyncProducer: We should get rid of send(topic: String, messages: 
 ByteBufferMessageSet) and only keep send(producerRequest: 
 kafka.javaapi.ProducerRequest). 
 This affects KafkaRecordWriter and DataGenerator
 2) javaapi.ProducerRequest: We will need to define a java version of 
 TopicData so that java producers can create requests conveniently. The java 
 version of TopicData will use the java version of ByteBufferMessageSet. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-293) Allow to configure all broker ids at once similar to how zookeeper handles server ids

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-293.
-
Resolution: Won't Fix

 Allow to configure all broker ids at once similar to how zookeeper handles 
 server ids
 -

 Key: KAFKA-293
 URL: https://issues.apache.org/jira/browse/KAFKA-293
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.7
Reporter: Thomas Dudziak

 Zookeeper allows to specify all server ids in the same configuration 
 (https://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html#sc_configuration)
  which has the benefit that the configuration file is the same for all 
 zookeeper instances. A similar approach for Kafka would be quite useful, e.g.
 brokerid.1=host 1
 brokerid.2=host 2
 etc.
 It'd still require per-instance configuration (myid file in the zookeeper 
 case) but that can be created separately (e.g. by the deployment tool used).
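
A sketch of how that could be consumed on the broker side (file names and key format are illustrative, mirroring ZooKeeper's myid convention):

{code}
import java.io.FileInputStream
import java.util.Properties

// Shared properties list every broker; a tiny per-instance file (like
// ZooKeeper's myid) selects which entry is "me".
val shared = new Properties()
shared.load(new FileInputStream("server.properties")) // brokerid.1=..., brokerid.2=...
val myId = scala.io.Source.fromFile("myid").mkString.trim.toInt
val myHost = shared.getProperty(s"brokerid.$myId")
{code}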



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-287) Running a consumer client using scala 2.8 fails

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-287.
-
Resolution: Fixed

 Running a consumer client using scala 2.8 fails
 ---

 Key: KAFKA-287
 URL: https://issues.apache.org/jira/browse/KAFKA-287
 Project: Kafka
  Issue Type: Bug
 Environment: Java 1.6, OS X
Reporter: Elben Shira

 Built the kafka library using the instructions found in the README. My client 
 uses scala 2.9.1, sbt 0.11. My consumer client has this snippet of code: 
 https://gist.github.com/a35006cc25e39ba386e2
 The client compiles, but running it produces this stacktrace: 
 https://gist.github.com/efeb85f50402b477d6e0
 I think this may be because of a bug found in scala 2.9.0 (though I'm not 
 sure if it was present in scala 2.8.0): 
 https://issues.scala-lang.org/browse/SI-4575
 To get around this, I built the kafka library using scala 2.9.1 (by changing 
 build.properties).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-280) change on-disk log layout to {log.dir}/topicname/partitionid

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-280.
-
Resolution: Won't Fix

 change on-disk log layout to {log.dir}/topicname/partitionid
 

 Key: KAFKA-280
 URL: https://issues.apache.org/jira/browse/KAFKA-280
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 0.8.0
Reporter: Jun Rao
  Labels: replication

 Currently, the on-disk layout is {log.dir}/topicname-partitionid. The problem 
 is that there is no appropriate place to store topicname level information 
 such as topic version. An alternative layout is 
 {log.dir}/topicname/partitionid. Then, we can store topic level meta data 
 under {log.dir}/topicname. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-282) Currently iterated chunk is not cleared during consumer shutdown

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-282.
-
Resolution: Fixed

 Currently iterated chunk is not cleared during consumer shutdown
 

 Key: KAFKA-282
 URL: https://issues.apache.org/jira/browse/KAFKA-282
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Joel Koshy
Assignee: Joel Koshy

 During consumer connector shutdown, fetch queues are cleared, but the 
 currently iterated chunk is not cleared.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-289) reuse topicdata when sending producerrequest

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-289.
-
Resolution: Won't Fix

 reuse topicdata when sending producerrequest
 

 Key: KAFKA-289
 URL: https://issues.apache.org/jira/browse/KAFKA-289
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.0
Reporter: Joe Stein
  Labels: optimization, replication, wireprotocol
 Fix For: 0.9.0


 The way that SyncProducer sends a ProducerRequest over the socket is to first 
 serialize the whole request into a bytebuffer and then send the bytebuffer 
 through the socket. An alternative is to send the request like FetchResponse, 
 using a ProduceRequestSend that reuses TopicDataSend. This avoids code 
 duplication and is more efficient since it sends data in 
 ByteBufferMessageSet directly to the socket and avoids extra copying from 
 messageset to bytebuffer. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-291) Add builder to create configs for consumer and broker

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-291.
-
Resolution: Fixed

 Add builder to create configs for consumer and broker
 -

 Key: KAFKA-291
 URL: https://issues.apache.org/jira/browse/KAFKA-291
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.7
Reporter: John Wang
 Attachments: builderPatch.diff


 Creating a Consumer or Producer can be cumbersome because you have to remember 
 the exact string for each property to be set. And since these are just 
 strings, IDEs cannot really help.
 This patch contains builders that help with this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1898) compatibility testing framework

2015-02-07 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1898:
-
Description: 
There are a few different scenarios where you want/need to know the 
status/state of a client library that works with Kafka. Client library 
development is not just about supporting the wire protocol but also the 
implementations around specific interactions of the API.  The API has blossomed 
into a robust set of producer, consumer, broker and administrative calls all of 
which have layers of logic above them.  A Client Library may choose to deviate 
from the path the project sets out and that is ok. The goal of this ticket is 
to have a system for Kafka that can help to explain what the library is or 
isn't doing (regardless of what it claims).

The idea behind this stems from being able to quickly/easily/succinctly analyze 
the topic message data. Once you can analyze the topic(s)' messages you can 
gather lots of information about what the client library is doing, is not doing, 
and such. There are a few components to this.

1) dataset-generator 

Test Kafka dataset generation tool. Generates a random text file with given 
params:

--filename, -f - output file name.
--filesize, -s - desired size of output file. The actual size will always be a 
bit larger (with a maximum size of $filesize + $max.length - 1)
--min.length, -l - minimum generated entry length.
--max.length, -h - maximum generated entry length.

Usage:

./gradlew build
java -jar dataset-generator/build/libs/dataset-generator-*.jar -s 10 -l 2 
-h 20

2) dataset-producer

Test Kafka dataset producer tool. Able to produce the given dataset to Kafka or 
Syslog server.  The idea here is you already have lots of data sets that you 
want to test different things for. You might have different sized messages, 
formats, etc and want a repeatable benchmark to run and re-run the testing on. 
You could just have a day's worth of data and just choose to replay it. The 
CCTK idea is that you are always starting from CONSUME in your state of 
library. If your library is only producing then you will fail a bunch of tests 
and that might be ok for people.

Accepts following params:

{code}

--filename, -f - input file name.

--kafka, -k - Kafka broker address in host:port format. If this parameter is 
set, --producer.config and --topic must be set too (otherwise they're ignored).

--producer.config, -p - Kafka producer properties file location.

--topic, -t - Kafka topic to produce to.

--syslog, -s - Syslog server address. Format: protocol://host:port 
(tcp://0.0.0.0:5140 or udp://0.0.0.0:5141 for example)

--loop, -l - flag to loop through file until shut off manually. False by 
default.

Usage:

./gradlew build
java -jar dataset-producer/build/libs/dataset-producer-*.jar --filename dataset 
--syslog tcp://0.0.0.0:5140 --loop true

{code}

3) extract

This step is good so you can save data and compare tests. It could also be 
removed if folks are just looking for a real live test (and we could support 
that too).  Here we are taking data out of Kafka and putting it into Cassandra 
(but other data stores can be used too, and we should come up with a way to 
abstract this out completely so folks could implement whatever they wanted).

{code}

package ly.stealth.shaihulud.reader

import java.util.UUID

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import consumer.kafka.MessageAndMetadata
import consumer.kafka.client.KafkaReceiver
import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream

object Main extends App with Logging {
  val parser = new scopt.OptionParser[ReaderConfiguration]("spark-reader") {
    head("Spark Reader for Kafka client applications", "1.0")
    opt[String]("testId") unbounded() optional() action { (x, c) =>
      c.copy(testId = x)
    } text ("Source topic with initial set of data")
    opt[String]("source") unbounded() required() action { (x, c) =>
      c.copy(sourceTopic = x)
    } text ("Source topic with initial set of data")
    opt[String]("destination") unbounded() required() action { (x, c) =>
      c.copy(destinationTopic = x)
    } text ("Destination topic with processed set of data")
    opt[Int]("partitions") unbounded() optional() action { (x, c) =>
      c.copy(partitions = x)
    } text ("Partitions in topic")
    opt[String]("zookeeper") unbounded() required() action { (x, c) =>
      c.copy(zookeeper = x)
    } text ("Zookeeper connection host:port")
    opt[Int]("kafka.fetch.size") unbounded() optional() action { (x, c) =>
      c.copy(kafkaFetchSize = x)
    } text ("Maximum KBs to fetch from Kafka")
    checkConfig { c =>
      if (c.testId.isEmpty || c.sourceTopic.isEmpty || 
          c.destinationTopic.isEmpty || c.zookeeper.isEmpty) {
        failure("You haven't provided all required parameters")
      } else 

[jira] [Commented] (KAFKA-1856) Add PreCommit Patch Testing

2015-02-07 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311140#comment-14311140
 ] 

Gwen Shapira commented on KAFKA-1856:
-

Looks good!

[~joestein] or [~nehanarkhede] - please try it out on one of the patches you 
are reviewing.
For example:
python dev-utils/test-patch.py --defect KAFKA-1333 --output patch-process 
--run-tests --username user  --password password

Let us know what you think.

 Add PreCommit Patch Testing
 ---

 Key: KAFKA-1856
 URL: https://issues.apache.org/jira/browse/KAFKA-1856
 Project: Kafka
  Issue Type: Task
Reporter: Ashish Kumar Singh
Assignee: Ashish Kumar Singh
 Attachments: KAFKA-1856.patch, KAFKA-1856_2015-01-18_21:43:56.patch, 
 KAFKA-1856_2015-02-04_14:57:05.patch, KAFKA-1856_2015-02-04_15:44:47.patch


 h1. Kafka PreCommit Patch Testing - *Don't wait for it to break*
 h2. Motivation
 *With great power comes great responsibility* - Uncle Ben. As the Kafka user 
 list grows, a mechanism to ensure the quality of the product is required. Quality 
 becomes hard to measure and maintain in an open source project, because of a 
 wide community of contributors. Luckily, Kafka is not the first open source 
 project and can benefit from learnings of prior projects.
 PreCommit tests are the tests that are run for each patch that gets attached 
 to an open JIRA. Based on the test results, the test execution framework (the 
 test bot) +1s or -1s the patch. Having PreCommit tests takes the load off 
 committers, who would otherwise have to look at and test each patch.
 h2. Tests in Kafka
 h3. Unit and Integration Tests
 [Unit and Integration 
 tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Unit+and+Integration+Tests]
  are cardinal to help contributors to avoid breaking existing functionalities 
 while adding new functionalities or fixing older ones. These tests, atleast 
 the ones relevant to the changes, must be run by contributors before 
 attaching a patch to a JIRA.
 h3. System Tests
 [System 
 tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+System+Tests] 
 are much wider tests that, unlike unit tests, focus on end-to-end scenarios 
 and not some specific method or class.
 h2. Apache PreCommit tests
 Apache provides a mechanism to automatically build a project and run a series 
 of tests whenever a patch is uploaded to a JIRA. Based on test execution, the 
 test framework will comment with a +1 or -1 on the JIRA.
 You can read more about the framework here:
 http://wiki.apache.org/general/PreCommitBuilds
 h2. Plan
 # Create a test-patch.py script (similar to the one used in Flume, Sqoop and 
 other projects) that will take a jira as a parameter, apply on the 
 appropriate branch, build the project, run tests and report results. This 
 script should be committed into the Kafka code-base. To begin with, this will 
 only run unit tests. We can add code sanity checks, system_tests, etc in the 
 future.
 # Create a jenkins job for running the test (as described in 
 http://wiki.apache.org/general/PreCommitBuilds) and validate that it works 
 manually. This must be done by a committer with Jenkins access.
 # Ask someone with access to https://builds.apache.org/job/PreCommit-Admin/ 
 to add Kafka to the list of projects PreCommit-Admin triggers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils

2015-02-07 Thread Jay Kreps (JIRA)
Jay Kreps created KAFKA-1926:


 Summary: Replace kafka.utils.Utils with o.a.k.common.utils.Utils
 Key: KAFKA-1926
 URL: https://issues.apache.org/jira/browse/KAFKA-1926
 Project: Kafka
  Issue Type: Improvement
Reporter: Jay Kreps


There is currently a lot of duplication between the Utils class in common and 
the one in core.

Our plan has been to deprecate duplicate code in the server and replace it with 
the new common code.

As such we should evaluate each method in the scala Utils and do one of the 
following:
1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general purpose 
utility that is not Kafka-specific. If we migrate it we should really think 
about the API and make sure there is some test coverage. A few things in there 
are kind of funky and we shouldn't just blindly copy them over.
2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold 
any utilities that really need to make use of Scala features to be convenient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-170) Support for non-blocking polling on multiple streams

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-170.
-
Resolution: Duplicate

This is being solved in the new consumer api.

 Support for non-blocking polling on multiple streams
 

 Key: KAFKA-170
 URL: https://issues.apache.org/jira/browse/KAFKA-170
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 0.8.0
Reporter: Jay Kreps
Priority: Critical
  Labels: replication

 Currently we provide a blocking iterator in the consumer. This is a good 
 mechanism for consuming data from a single topic, but is limited as a 
 mechanism for polling multiple streams.
 For example, if one wants to implement a non-blocking union across multiple 
 streams, this is hard to do because calls may block indefinitely. A similar 
 situation arises when trying to implement a streaming join between two 
 streams.
 I would propose two changes:
 1. Implement a next(timeout) interface on KafkaMessageStream. This handles 
 certain limited cases nicely with minimal change and is easy to implement, 
 but doesn't actually cover the two cases above.
 2. Add an interface to poll streams.
 I don't know the best approach for the latter API, but it is important to get 
 it right. One option would be to add a 
 ConsumerConnector.drainTopics(topic1, topic2, ...) which blocks until 
 there is at least one message and then returns a list of triples (topic, 
 partition, message).
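 For illustration, a rough Java sketch of what the two proposals could look like as interfaces (all names and signatures are hypothetical; the actual design landed in the new consumer API):
{code}
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of proposal 1: a timeout-aware next().
interface TimedMessageStream<T> {
    // Returns the next message, or null if none arrives within the timeout.
    T next(long timeout, TimeUnit unit) throws InterruptedException;
}

// Hypothetical sketch of proposal 2: draining across topics.
interface MultiStreamConsumer<T> {
    // Blocks until at least one message is available on any of the given
    // topics, then returns (topic, partition, message) triples.
    List<Triple<T>> drainTopics(long timeout, TimeUnit unit, String... topics)
            throws InterruptedException;
}

final class Triple<T> {
    final String topic;
    final int partition;
    final T message;
    Triple(String topic, int partition, T message) {
        this.topic = topic;
        this.partition = partition;
        this.message = message;
    }
}
{code}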



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-724) Allow automatic socket.send.buffer from operating system

2015-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311008#comment-14311008
 ] 

Jay Kreps commented on KAFKA-724:
-

This is fixed in the old Scala clients, but not in the new producer and 
consumer. It would be good to add the convention that -1 means "use the OS 
default".

 Allow automatic socket.send.buffer from operating system
 

 Key: KAFKA-724
 URL: https://issues.apache.org/jira/browse/KAFKA-724
 Project: Kafka
  Issue Type: Improvement
Reporter: Pablo Barrera
  Labels: newbie

 To do this, don't call socket().setXXXBufferSize(). This can be 
 controlled by the configuration parameters: if socket.send.buffer (or the 
 other buffer settings) is set to -1, don't call socket().setXXXBufferSize().
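 A minimal Java sketch of the suggested convention (the config plumbing is hypothetical; java.net.Socket.setSendBufferSize is the real call being skipped):
{code}
import java.net.Socket;
import java.net.SocketException;

public class SocketBufferConfig {
    // -1 is the hypothetical sentinel meaning "leave the OS default alone".
    public static void applySendBuffer(Socket socket, int sendBufferBytes)
            throws SocketException {
        if (sendBufferBytes != -1) {
            socket.setSendBufferSize(sendBufferBytes);
        }
        // If -1, setSendBufferSize is simply never called, so the kernel's
        // default / auto-tuning applies.
    }
}
{code}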



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-724) Allow automatic socket.send.buffer from operating system

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-724:

Labels: newbie  (was: )

 Allow automatic socket.send.buffer from operating system
 

 Key: KAFKA-724
 URL: https://issues.apache.org/jira/browse/KAFKA-724
 Project: Kafka
  Issue Type: Improvement
Reporter: Pablo Barrera
  Labels: newbie

 To do this, don't call socket().setXXXBufferSize(). This can be 
 controlled by the configuration parameters: if socket.send.buffer (or the 
 other buffer settings) is set to -1, don't call socket().setXXXBufferSize().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-58) Combine fetch/multifetch and send/multisend

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-58?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-58.

Resolution: Fixed  (was: Unresolved)

 Combine fetch/multifetch and send/multisend
 ---

 Key: KAFKA-58
 URL: https://issues.apache.org/jira/browse/KAFKA-58
 Project: Kafka
  Issue Type: Improvement

 We should expose only the more general multisend and multifetch. The 
 overhead of these is not high, and it would be good to reduce the number of 
 network APIs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-333) mirroring - enable non-random partitioning in embedded producer

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-333.
-
Resolution: Invalid

 mirroring - enable non-random partitioning in embedded producer
 ---

 Key: KAFKA-333
 URL: https://issues.apache.org/jira/browse/KAFKA-333
 Project: Kafka
  Issue Type: New Feature
  Components: core
Reporter: xiaoyu wang

 Currently the producer for mirroring uses the random partitioner. It would be 
 very useful if we could specify the partitioner there. One use case is when we 
 aggregate ad server logs by mirroring the local kafka cluster on each ad 
 server, we want to partition ad logs based on where they come from - the ad 
 server id.  
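 With the new Java producer, this use case can be approximated by keying each record with the ad server id, since the default partitioner hashes the key to a partition; a hedged sketch (broker address and topic name are placeholders):
{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedMirrorExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        String adServerId = "ad-server-42"; // hypothetical key
        // Records with the same key hash to the same partition.
        producer.send(new ProducerRecord<>("ad-logs", adServerId, "log line"));
        producer.close();
    }
}
{code}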



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-57) Add an optional acknowledgement in the producer

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-57?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-57.

Resolution: Fixed  (was: Unresolved)

 Add an optional acknowledgement in the producer
 ---

 Key: KAFKA-57
 URL: https://issues.apache.org/jira/browse/KAFKA-57
 Project: Kafka
  Issue Type: New Feature

 Currently the producer will flush the socket buffer but does not wait for an 
 answer. This is good for tracking-like use cases but bad for queue-like 
 cases. We should add an optional acknowledgement to the protocol and have the 
 producer await this.
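 In the current Java producer this exists as the acks setting; a brief hedged sketch of requesting an acknowledgement and waiting on it (broker address and topic are placeholders):
{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AckedSend {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("acks", "1"); // wait for the leader to acknowledge each write
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Blocking on the future turns the send into a queue-like,
        // acknowledged operation rather than fire-and-forget.
        producer.send(new ProducerRecord<>("events", "key", "value")).get();
        producer.close();
    }
}
{code}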



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-331) recurrent produce errors

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-331.
-
Resolution: Incomplete

 recurrent produce errors
 

 Key: KAFKA-331
 URL: https://issues.apache.org/jira/browse/KAFKA-331
 Project: Kafka
  Issue Type: Bug
Reporter: Pierre-Yves Ritschard

 I am using trunk and regularly see such errors popping up:
 32477890 [kafka-processor-7] ERROR kafka.server.KafkaRequestHandlers  - Error 
 processing ProduceRequest on pref^@^@^@:0
 java.io.FileNotFoundException: 
 /mnt/kafka/logs/pref^@^@^@-0/.kafka (Is a directory)
 at java.io.RandomAccessFile.open(Native Method)
 at java.io.RandomAccessFile.init(RandomAccessFile.java:233)
 at kafka.utils.Utils$.openChannel(Utils.scala:324)
 at kafka.message.FileMessageSet.init(FileMessageSet.scala:75)
 at kafka.log.Log.loadSegments(Log.scala:144)
 at kafka.log.Log.init(Log.scala:116)
 at kafka.log.LogManager.createLog(LogManager.scala:149)
 at kafka.log.LogManager.getOrCreateLog(LogManager.scala:204)
 at 
 kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
 at 
 kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
 at 
 kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
 at 
 kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
 at kafka.network.Processor.handle(SocketServer.scala:296)
 at kafka.network.Processor.read(SocketServer.scala:319)
 at kafka.network.Processor.run(SocketServer.scala:214)
 at java.lang.Thread.run(Thread.java:679)
 32477890 [kafka-processor-7] ERROR kafka.network.Processor  - Closing socket 
 for /xx.xx.xx.xx because of error
 java.io.FileNotFoundException: 
 /mnt/kafka/logs/pref^@^@^@-0/.kafka (Is a directory)
 at java.io.RandomAccessFile.open(Native Method)
 at java.io.RandomAccessFile.init(RandomAccessFile.java:233)
 at kafka.utils.Utils$.openChannel(Utils.scala:324)
 at kafka.message.FileMessageSet.init(FileMessageSet.scala:75)
 at kafka.log.Log.loadSegments(Log.scala:144)
 at kafka.log.Log.init(Log.scala:116)
 at kafka.log.LogManager.createLog(LogManager.scala:149)
 at kafka.log.LogManager.getOrCreateLog(LogManager.scala:204)
 at 
 kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
 at 
 kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
 at 
 kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
 at 
 kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
 at kafka.network.Processor.handle(SocketServer.scala:296)
 at kafka.network.Processor.read(SocketServer.scala:319)
 at kafka.network.Processor.run(SocketServer.scala:214)
 at java.lang.Thread.run(Thread.java:679)
 This results in a pref directory created inside the log dir. The original 
 topic should be prefix; somehow a NUL gets inserted there.
 The producing was done with a kafka.javaapi.producer.Producer instance, on 
 which send was called with a kafka.javaapi.producer.ProducerData instance.
 There are no log entries created inside that dir and no impact on the overall 
 operation of the broker and consumers.
 Is the producer thread-safe?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-157) Message may be delivered to incorrect partition in case of semantic partitioning

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-157.
-
Resolution: Fixed

 Message may be delivered to incorrect partition in case of semantic 
 partitioning
 ---

 Key: KAFKA-157
 URL: https://issues.apache.org/jira/browse/KAFKA-157
 Project: Kafka
  Issue Type: Bug
Reporter: Sharad Agarwal

 In case the broker hosting the partition is down, messages are currently 
 repartitioned across the number of available brokers. This may void the 
 partitioning contract.
 http://bit.ly/oEz2fT



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-5) Continuous log flusher

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-5.
---
Resolution: Invalid  (was: Unresolved)

 Continuous log flusher
 --

 Key: KAFKA-5
 URL: https://issues.apache.org/jira/browse/KAFKA-5
 Project: Kafka
  Issue Type: Improvement

 Today, a kafka log is flushed based on either the number of messages or a 
 certain amount of time elapsed. Setting those numbers properly may not be 
 easy. A better way would be to have a background thread that keeps flushing 
 dirty logs as fast as the underlying storage allows.
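 A hedged Java sketch of the background-flusher idea (the Log type and dirtiness check are hypothetical stand-ins, not Kafka's actual log classes):
{code}
import java.util.List;

public class ContinuousFlusher implements Runnable {
    interface Log {                 // hypothetical stand-in for a Kafka log
        boolean isDirty();          // has unflushed appends
        void flush();               // fsync to the underlying storage
    }

    private final List<Log> logs;
    private volatile boolean running = true;

    ContinuousFlusher(List<Log> logs) { this.logs = logs; }

    @Override
    public void run() {
        // Keep flushing dirty logs as fast as the storage allows; the flush
        // call itself provides the back-pressure, so no timer tuning needed.
        while (running) {
            boolean flushedAny = false;
            for (Log log : logs) {
                if (log.isDirty()) { log.flush(); flushedAny = true; }
            }
            if (!flushedAny) {
                try { Thread.sleep(10); } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); return;
                }
            }
        }
    }

    void shutdown() { running = false; }
}
{code}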



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-168) Support locality in consumer partition assignment algorithm

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-168.
-
Resolution: Won't Fix

 Support locality in consumer partition assignment algorithm
 ---

 Key: KAFKA-168
 URL: https://issues.apache.org/jira/browse/KAFKA-168
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Jay Kreps

 There are some use-cases where it makes sense to co-locate brokers and 
 consumer processes. In this case it would be nice to optimize the assignment 
 of partitions to consumers so that the consumer preferentially consumes from 
 the broker with which it is co-located.
 If we are going to do KAFKA-167, moving the assignment to the broker, it 
 would make sense to do that first so we only have to change the logic in one 
 place.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1) The log4j appender still uses the SyncProducer API

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1.
---
Resolution: Invalid  (was: Unresolved)

 The log4j appender still uses the SyncProducer API
 --

 Key: KAFKA-1
 URL: https://issues.apache.org/jira/browse/KAFKA-1
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.6

 The log4j appender still uses the SyncProducer API. Change it to use the 
 Producer API using the StringEncoder instead.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1239) New producer checklist

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1239.
--
Resolution: Fixed

 New producer checklist
 --

 Key: KAFKA-1239
 URL: https://issues.apache.org/jira/browse/KAFKA-1239
 Project: Kafka
  Issue Type: New Feature
  Components: producer 
Affects Versions: 0.8.2
Reporter: Neha Narkhede

 Here is the list of todo items we have for the new producer (in no
 particular order):
 1. Rename to org.apache.* package
 2. Discuss config approach
 3. Finalize config approach
 4. Add slf4j logging for debugging purposes
 5. Discuss metrics approach
 6. Add metrics
 7. Convert perf test to optionally use new producer
 8. Get system tests passing with new producer
 9. Write integration tests that test the producer against the real server
 10. Expand unit test coverage a bit
 11. Performance testing and analysis.
 12. Add compression support
 13. Discuss and perhaps add retry support
 14. Discuss the approach to protocol definition and perhaps refactor a bit
 15. Deeper code review
 16. Convert mirror maker
 This doesn't count general bug fixing which I assume we will do as we find
 them.
 Let's file subtasks for each of the above, so there is a single place to 
 track what's outstanding on the new producer. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1655) Allow high performance SimpleConsumer use cases to still work with new Kafka 0.9 consumer APIs

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1655.
--
Resolution: Invalid

 Allow high performance SimpleConsumer use cases to still work with new Kafka 
 0.9 consumer APIs
 --

 Key: KAFKA-1655
 URL: https://issues.apache.org/jira/browse/KAFKA-1655
 Project: Kafka
  Issue Type: New Feature
  Components: consumer
Affects Versions: 0.9.0
Reporter: Valentin

 Hi guys,
 currently Kafka allows consumers to either choose the low-level or the 
 high-level API, depending on the specific requirements of the consumer 
 implementation. However, I was told that the current low-level API 
 (SimpleConsumer) will be deprecated once the new Kafka 0.9 consumer APIs are 
 available.
 In this case it would be good if we can ensure that the new API offers 
 some way to get similar performance for use cases which perfectly fit the 
 old SimpleConsumer API approach.
 Example Use Case:
 A high-throughput HTTP API wrapper for consumer requests, which receives HTTP 
 REST calls to retrieve data for a specific set of topic partitions and offsets.
 Here the SimpleConsumer is perfect because it allows connection pooling in 
 the HTTP API web application, with one pool per existing kafka broker, and the 
 web application can handle the required metadata management to know which pool 
 to fetch a connection from for each topic partition in use. This means 
 connections to Kafka brokers can remain open/pooled and 
 connection/reconnection and metadata overhead is minimized.
 To achieve something similar with the new Kafka 0.9 consumer APIs, it would 
 be good if they could:
 - provide a low-level call to connect to a specific broker and to read data 
 from a topic+partition+offset
 OR
 - ensure that subscribe/unsubscribe calls are very cheap and can run without 
 requiring any network traffic. If I subscribe to a topic partition whose 
 leader is the same broker as that of the last topic partition used on this 
 consumer API connection, then the consumer API implementation should 
 recognize this, avoid any disconnects/reconnects, and just reuse the existing 
 connection to that kafka broker.
 Or put differently, it should be possible to do external metadata handling in 
 the consumer API client and the client should be able to pool consumer API 
 connections effectively by having one pool per Kafka broker.
 Greetings
 Valentin
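 For illustration, a rough Java sketch of the per-broker pooling pattern described above (the connection type C is a stand-in for something like a SimpleConsumer; metadata handling stays with the caller, as the use case requires):
{code}
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

public class PerBrokerPool<C> {
    private final Map<Integer, BlockingQueue<C>> pools = new ConcurrentHashMap<>();
    private final int capacityPerBroker;

    public PerBrokerPool(int capacityPerBroker) {
        this.capacityPerBroker = capacityPerBroker;
    }

    // The caller does its own metadata handling: it resolves the leader
    // broker id for a (topic, partition) and borrows from that broker's pool.
    public C borrow(int brokerId) {
        BlockingQueue<C> q = pools.computeIfAbsent(
                brokerId, id -> new ArrayBlockingQueue<>(capacityPerBroker));
        return q.poll(); // null means the caller should open a new connection
    }

    public void release(int brokerId, C connection) {
        pools.computeIfAbsent(brokerId,
                id -> new ArrayBlockingQueue<>(capacityPerBroker))
             .offer(connection); // dropped on overflow; caller may close instead
    }
}
{code}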



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1930) Move server over to new metrics library

2015-02-07 Thread Jay Kreps (JIRA)
Jay Kreps created KAFKA-1930:


 Summary: Move server over to new metrics library
 Key: KAFKA-1930
 URL: https://issues.apache.org/jira/browse/KAFKA-1930
 Project: Kafka
  Issue Type: Improvement
Reporter: Jay Kreps


We are using org.apache.kafka.common.metrics on the clients, but using Coda 
Hale metrics on the server. We should move the server over to the new metrics 
package as well. This will help to make all our metrics self-documenting.
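For reference, a small hedged sketch of the client-side metrics API in question (sensor and metric names are made up, and exact constructor overloads may vary by version):
{code}
import java.util.HashMap;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Rate;

public class MetricsSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        Sensor sensor = metrics.sensor("request-size"); // hypothetical sensor
        // Each MetricName carries its own description, which is what makes
        // these metrics self-documenting.
        sensor.add(new MetricName("request-size-avg", "example-group",
                "average request size (illustrative)",
                new HashMap<String, String>()), new Avg());
        sensor.add(new MetricName("request-rate", "example-group",
                "requests per second (illustrative)",
                new HashMap<String, String>()), new Rate());
        sensor.record(512.0); // record one observation
        metrics.close();
    }
}
{code}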



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1929) Convert core kafka module to use the errors in org.apache.kafka.common.errors

2015-02-07 Thread Jay Kreps (JIRA)
Jay Kreps created KAFKA-1929:


 Summary: Convert core kafka module to use the errors in 
org.apache.kafka.common.errors
 Key: KAFKA-1929
 URL: https://issues.apache.org/jira/browse/KAFKA-1929
 Project: Kafka
  Issue Type: Improvement
Reporter: Jay Kreps


With the introduction of the common package there are now a lot of errors 
duplicated in both the common package and in the server. We should refactor the 
server code (but not the scala clients) to switch over to the exceptions in 
common.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1804) Kafka network thread lacks top exception handler

2015-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14310972#comment-14310972
 ] 

Jay Kreps commented on KAFKA-1804:
--

The remaining issue is the lack of logging. However, we actually do set an 
uncaught exception handler that should log any uncaught exception.  
[~aozeritsky] is there any chance this was just showing up in a different log?

 Kafka network thread lacks top exception handler
 

 Key: KAFKA-1804
 URL: https://issues.apache.org/jira/browse/KAFKA-1804
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Oleg Golovin
Priority: Critical

 We have faced the problem that some kafka network threads may fail, so that 
 jstack attached to the Kafka process showed fewer threads than we had defined 
 in our Kafka configuration. This leads to API requests processed by such a 
 thread getting stuck without a response.
 There were no error messages in the log regarding thread failure.
 We have examined the Kafka code and found that there is no top-level 
 try-catch block in the network thread code, which could at least log possible 
 errors.
 Could you add a top-level try-catch block for the network thread, which 
 should recover the network thread in case of an exception?
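 A hedged Java sketch of the pattern under discussion, a top-level catch plus an uncaught-exception handler so a dying thread at least leaves a log line (illustrative only, not the actual Kafka network thread code):
{code}
// Illustrative guard for a long-running worker thread.
public class GuardedNetworkThread extends Thread {
    public GuardedNetworkThread(String name) {
        super(name);
        // Last line of defense: log anything that escapes run().
        setUncaughtExceptionHandler((t, e) ->
                System.err.println("FATAL: thread " + t.getName() + " died: " + e));
    }

    @Override
    public void run() {
        try {
            // ... poll sockets, handle requests ...
        } catch (Throwable t) {
            // Top-level catch: log and (optionally) restart the loop instead
            // of silently losing the thread.
            System.err.println("Error in " + getName() + ": " + t);
        }
    }
}
{code}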



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1638) transient unit test failure UncleanLeaderElectionTest

2015-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14310982#comment-14310982
 ] 

Jay Kreps commented on KAFKA-1638:
--

[~junrao] is this still happening? I haven't seen it.

 transient unit test failure UncleanLeaderElectionTest
 -

 Key: KAFKA-1638
 URL: https://issues.apache.org/jira/browse/KAFKA-1638
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie++

 Saw the following transient unit test failure.
 kafka.integration.UncleanLeaderElectionTest  
 testUncleanLeaderElectionEnabled FAILED
 java.lang.RuntimeException: A broker is already registered on the path 
 /brokers/ids/1. This probably indicates that you either have configured a 
 brokerid that is already in use, or else you have shutdown this broker and 
 restarted it faster than the zookeeper timeout so it appears to be 
 re-registering.
 at kafka.utils.ZkUtils$.registerBrokerInZk(ZkUtils.scala:174)
 at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:63)
 at kafka.server.KafkaHealthcheck.startup(KafkaHealthcheck.scala:45)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:121)
 at 
 kafka.integration.UncleanLeaderElectionTest$$anonfun$verifyUncleanLeaderElectionEnabled$8.apply(UncleanLeaderElectionTest.scala:187)
 at 
 kafka.integration.UncleanLeaderElectionTest$$anonfun$verifyUncleanLeaderElectionEnabled$8.apply(UncleanLeaderElectionTest.scala:187)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at 
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at 
 kafka.integration.UncleanLeaderElectionTest.verifyUncleanLeaderElectionEnabled(UncleanLeaderElectionTest.scala:187)
 at 
 kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionEnabled(UncleanLeaderElectionTest.scala:106)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-881) Kafka broker not respecting log.roll.hours

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-881.
-
Resolution: Won't Fix

Closing due to ancientness.

 Kafka broker not respecting log.roll.hours
 --

 Key: KAFKA-881
 URL: https://issues.apache.org/jira/browse/KAFKA-881
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.7.2
Reporter: Dan F
Assignee: Jay Kreps
 Attachments: kafka-roll-0.8.patch, kafka-roll.again.patch, 
 kafka_roll.patch


 We are running Kafka 0.7.2. We set log.roll.hours=1. I hoped that meant logs 
 would be rolled at least every hour. Only, sometimes logs that are many hours 
 (sometimes days) old have more data added to them. This perturbs our systems 
 for reasons I won't get into.
 I don't know Scala or Kafka well, but I have a proposal for why this might 
 happen: upon restart, a broker forgets when its log files have been appended 
 to (firstAppendTime). Then a potentially infinite amount of time later, the 
 restarted broker receives another message for the particular (topic, 
 partition) and starts the clock again. It will then roll over that log after 
 an hour.
 https://svn.apache.org/repos/asf/kafka/branches/0.7/core/src/main/scala/kafka/server/KafkaConfig.scala
  says:
   /* the maximum time before a new log segment is rolled out */
   val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, 
 Int.MaxValue))
 https://svn.apache.org/repos/asf/kafka/branches/0.7/core/src/main/scala/kafka/log/Log.scala
  has maybeRoll, which needs segment.firstAppendTime defined. It also has 
 updateFirstAppendTime() which says if it's empty, then set it.
 If my hypothesis about why it is happening is correct, here is a case where 
 the roll interval is longer than an hour, even on a high-volume topic:
 - write to a topic for 20 minutes
 - restart the broker
 - wait for 5 days
 - write to a topic for 20 minutes
 - restart the broker
 - write to a topic for an hour
 The rollover time was now 5 days, 1 hour, 40 minutes. You can make it as long 
 as you want.
 Proposed solution:
 The easiest thing to do would be to have Kafka re-initialize 
 firstAppendTime with the file creation time. Unfortunately, there is no file 
 creation time in UNIX. There is ctime, the change time, which is updated when 
 a file's inode information is changed.
 One solution is to embed the firstAppendTime in the filename (say, seconds 
 since epoch). Then when you open it you could reset firstAppendTime to 
 exactly what it really was. This ignores clock drift or resetting. One could 
 set firstAppendTime to min(filename-based time, current time).
 A second solution is to make the Kafka log roll over at specific times, 
 regardless of when the file was created. Conceptually, time can be divided 
 into windows of size log.rollover.hours since epoch (UNIX time 0, 1970). So, 
 when firstAppendTime is empty, compute the next rollover time (say, next = 
 (hours since epoch) % (log.rollover.hours) + log.rollover.hours). If the file 
 mtime (last modified) is before the current rollover window ( 
 (next-log.rollover.hours) .. next ), roll it over right away. Otherwise, roll 
 over when you cross next, and reset next.
 A third solution (not perfect, but an approximation at least) would be not 
 to write to a segment if firstAppendTime is not defined and the timestamp on 
 the file is more than log.roll.hours old.
 There are probably other solutions.
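 For the second solution, a hedged Java sketch of computing window-aligned roll boundaries (the quoted next = ... expression reads like shorthand; the intent, dividing time since the epoch into fixed windows, can be written as):
{code}
public class RollWindow {
    // Divide time since the epoch into fixed windows of rollMs and return
    // the end of the window containing 'nowMs' -- the next roll deadline.
    static long nextRollDeadline(long nowMs, long rollMs) {
        return ((nowMs / rollMs) + 1) * rollMs;
    }

    // Roll immediately if the segment's last-modified time falls in an
    // earlier window than the current one.
    static boolean shouldRollNow(long mtimeMs, long nowMs, long rollMs) {
        return (mtimeMs / rollMs) < (nowMs / rollMs);
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        long now = System.currentTimeMillis();
        System.out.println("next deadline: " + nextRollDeadline(now, hour));
        System.out.println("stale segment rolls now: "
                + shouldRollNow(now - 5 * hour, now, hour));
    }
}
{code}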



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-864) bin/kafka-gen-reassignment.sh

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-864.
-
Resolution: Duplicate

I think this is being pursued on another ticket.

 bin/kafka-gen-reassignment.sh
 -

 Key: KAFKA-864
 URL: https://issues.apache.org/jira/browse/KAFKA-864
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.0
Reporter: Scott Clasen

 Better tooling for replacing failed brokers and reassigning partitions.  
 The output of ./bin/kafka-list-topic.sh makes doing this in bulk painful.
 Tool should output json format acceptable to 
 ./bin/kafka-reassign-partitions.sh
 Spec: bin/kafka-gen-reassignment.sh
 Option                     Description
 ------                     -----------
 --topic <topic>            REQUIRED: The topic to be reassigned.
                            Defaults to all existing topics.
                            (default: )
 --partition <partition>    REQUIRED: The partition to be reassigned.
                            Defaults to all partitions.
                            (default: )
 --from <broker-id>         REQUIRED: The broker to reassign the partition from.
 --to <broker-id>           REQUIRED: The broker to reassign the partition to.
 --zookeeper <urls>         REQUIRED: The connection string for the zookeeper
                            connection in the form host:port. Multiple URLs
                            can be given to allow fail-over.
 Workflow: Replacing a broker
 ./bin/kafka-gen-reassignment.sh --zookeeper <zks> --from <failed-broker-id> --to <new-broker-id> > reassign.json
 ./bin/kafka-reassign-partitions.sh --zookeeper <zks> --path-to-json-file reassign.json



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-493) High CPU usage on inactive server

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-493.
-
Resolution: Incomplete

 High CPU usage on inactive server
 -

 Key: KAFKA-493
 URL: https://issues.apache.org/jira/browse/KAFKA-493
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.0
Reporter: Jay Kreps
 Fix For: 0.9.0

 Attachments: Kafka-2014-11-10.snapshot.zip, Kafka-sampling1.zip, 
 Kafka-sampling2.zip, Kafka-sampling3.zip, Kafka-trace1.zip, Kafka-trace2.zip, 
 Kafka-trace3.zip, backtraces.txt, stacktrace.txt


  I've been playing with the 0.8 branch of Kafka and noticed that idle CPU 
  usage is fairly high (13% of a core). Is that to be expected? I did look at 
  the stack, but didn't see anything obvious. A background task?
  I wanted to mention how I am getting into this state. I've set up two 
  machines with the latest 0.8 code base and am using a replication factor of 
  2. On starting the brokers there is no idle CPU activity. Then I run a test 
  that essentially does 10k publish operations followed by immediate consume 
  operations (I was measuring latency). Once this has run the kafka nodes seem 
  to consistently be consuming CPU essentially forever.
 hprof results:
 THREAD START (obj=53ae, id = 24, name=RMI TCP Accept-0, 
 group=system)
 THREAD START (obj=53ae, id = 25, name=RMI TCP Accept-, 
 group=system)
 THREAD START (obj=53ae, id = 26, name=RMI TCP Accept-0, 
 group=system)
 THREAD START (obj=53ae, id = 21, name=main, group=main)
 THREAD START (obj=53ae, id = 27, name=Thread-2, group=main)
 THREAD START (obj=53ae, id = 28, name=Thread-3, group=main)
 THREAD START (obj=53ae, id = 29, name=kafka-processor-9092-0, 
 group=main)
 THREAD START (obj=53ae, id = 200010, name=kafka-processor-9092-1, 
 group=main)
 THREAD START (obj=53ae, id = 200011, name=kafka-acceptor, group=main)
 THREAD START (obj=574b, id = 200012, 
 name=ZkClient-EventThread-20-localhost:2181, group=main)
 THREAD START (obj=576e, id = 200014, name=main-SendThread(), 
 group=main)
 THREAD START (obj=576d, id = 200013, name=main-EventThread, 
 group=main)
 THREAD START (obj=53ae, id = 200015, name=metrics-meter-tick-thread-1, 
 group=main)
 THREAD START (obj=53ae, id = 200016, name=metrics-meter-tick-thread-2, 
 group=main)
 THREAD START (obj=53ae, id = 200017, name=request-expiration-task, 
 group=main)
 THREAD START (obj=53ae, id = 200018, name=request-expiration-task, 
 group=main)
 THREAD START (obj=53ae, id = 200019, name=kafka-request-handler-0, 
 group=main)
 THREAD START (obj=53ae, id = 200020, name=kafka-request-handler-1, 
 group=main)
 THREAD START (obj=53ae, id = 200021, name=Thread-6, group=main)
 THREAD START (obj=53ae, id = 200022, name=Thread-7, group=main)
 THREAD START (obj=5899, id = 200023, name=ReplicaFetcherThread-0-2 on 
 broker 1, , group=main)
 THREAD START (obj=5899, id = 200024, name=ReplicaFetcherThread-0-3 on 
 broker 1, , group=main)
 THREAD START (obj=5899, id = 200025, name=ReplicaFetcherThread-0-0 on 
 broker 1, , group=main)
 THREAD START (obj=5899, id = 200026, name=ReplicaFetcherThread-0-1 on 
 broker 1, , group=main)
 THREAD START (obj=53ae, id = 200028, name=SIGINT handler, 
 group=system)
 THREAD START (obj=53ae, id = 200029, name=Thread-5, group=main)
 THREAD START (obj=574b, id = 200030, name=Thread-1, group=main)
 THREAD START (obj=574b, id = 200031, name=Thread-0, group=main)
 THREAD END (id = 200031)
 THREAD END (id = 200029)
 THREAD END (id = 200020)
 THREAD END (id = 200019)
 THREAD END (id = 28)
 THREAD END (id = 200021)
 THREAD END (id = 27)
 THREAD END (id = 200022)
 THREAD END (id = 200018)
 THREAD END (id = 200017)
 THREAD END (id = 200012)
 THREAD END (id = 200013)
 THREAD END (id = 200014)
 THREAD END (id = 200025)
 THREAD END (id = 200023)
 THREAD END (id = 200026)
 THREAD END (id = 200024)
 THREAD END (id = 200011)
 THREAD END (id = 29)
 THREAD END (id = 200010)
 THREAD END (id = 200030)
 THREAD END (id = 200028)
 TRACE 301281:
 sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:Unknown 
 line)
 sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:228)
 sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:81)
 sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
 sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
 
 sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:218)
 sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
 
 java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
 

[jira] [Commented] (KAFKA-1831) Producer does not provide any information about which host the data was sent to

2015-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311042#comment-14311042
 ] 

Jay Kreps commented on KAFKA-1831:
--

The new producer returns RecordMetadata on each request. This contains the 
partition to which the record was written. The producer will also give you the 
current state of the cluster which will show where the leader for that 
partition is. I think this is probably sufficient.
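A short hedged sketch of what that looks like with the new Java producer (producer setup elided; the topic name is a placeholder), combining RecordMetadata with partitionsFor() to recover the leader host:
{code}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

public class WhereDidItGo {
    static void traceSend(KafkaProducer<String, String> producer) throws Exception {
        RecordMetadata md =
                producer.send(new ProducerRecord<>("events", "key", "value")).get();
        // RecordMetadata carries the partition (and offset) the record went
        // to; partitionsFor() exposes the current leader for that partition.
        for (PartitionInfo info : producer.partitionsFor("events")) {
            if (info.partition() == md.partition()) {
                Node leader = info.leader();
                System.out.println("partition " + md.partition()
                        + ", offset " + md.offset()
                        + ", leader " + leader.host() + ":" + leader.port());
            }
        }
    }
}
{code}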

 Producer does not provide any information about which host the data was sent 
 to
 ---

 Key: KAFKA-1831
 URL: https://issues.apache.org/jira/browse/KAFKA-1831
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.1.1
Reporter: Mark Payne
Assignee: Jun Rao

 For traceability purposes and for troubleshooting, when sending data to 
 Kafka, the Producer should provide information about which host the data was 
 sent to. This works well already in the SimpleConsumer, which provides host() 
 and port() methods.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1831) Producer does not provide any information about which host the data was sent to

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1831.
--
Resolution: Fixed

I think the new producer fixes this, reopen if you disagree.

 Producer does not provide any information about which host the data was sent 
 to
 ---

 Key: KAFKA-1831
 URL: https://issues.apache.org/jira/browse/KAFKA-1831
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.1.1
Reporter: Mark Payne
Assignee: Jun Rao

 For traceability purposes and for troubleshooting, when sending data to 
 Kafka, the Producer should provide information about which host the data was 
 sent to. This works well already in the SimpleConsumer, which provides host() 
 and port() methods.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1908) Split brain

2015-02-07 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311109#comment-14311109
 ] 

Gwen Shapira commented on KAFKA-1908:
-

I think it's a multi-LAN scenario.

The broker can bind on all available interfaces (0.0.0.0). If the port is 
blocked on the inter-broker interface but not on the network between clients 
and brokers, the scenario described seems possible (although I didn't try to 
replicate it myself).

In other clusters, this scenario is prevented by having brokers check access to 
each other periodically (heartbeat) and validate against ZK. If a node is 
visible in ZK but not accessible on the network, the minority partition is 
killed (STONITH, or using ZK to cause a node to commit suicide) and the majority 
triggers leader election.

Not a simple mechanism to add to Kafka. And I'm not sure if this is a common 
enough issue to warrant the complexity involved.

 Split brain
 ---

 Key: KAFKA-1908
 URL: https://issues.apache.org/jira/browse/KAFKA-1908
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Alexey Ozeritskiy

 In some cases, there may be two leaders for one partition.
 Steps to reproduce:
 # We have 3 brokers, 1 partition with 3 replicas:
 {code}
 TopicAndPartition: [partition,0]Leader: 1   Replicas: [2,1,3]   
 ISR: [1,2,3]
 {code} 
 # controller works on broker 3
 # let the kafka port be 9092. We execute on broker 1:
 {code}
 iptables -A INPUT -p tcp --dport 9092 -j REJECT
 {code}
 # Initiate replica election
 # As a result:
 Broker 1:
 {code}
 TopicAndPartition: [partition,0]Leader: 1   Replicas: [2,1,3]   
 ISR: [1,2,3]
 {code}
 Broker 2:
 {code}
 TopicAndPartition: [partition,0]Leader: 2   Replicas: [2,1,3]   
 ISR: [1,2,3]
 {code}
 # Flush the iptables rules on broker 1
 Now we can produce messages to {code}[partition,0]{code}. Replica-1 will not 
 receive new data. A consumer can read data from replica-1 or replica-2. When 
 it reads from replica-1 it resets the offsets and then can read duplicates 
 from replica-2.
 We saw this situation in our production cluster when it had network problems.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1805) Kafka ProducerRecord should implement equals

2015-02-07 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14311136#comment-14311136
 ] 

Guozhang Wang commented on KAFKA-1805:
--

Thanks for the patch [~parth.brahmbhatt], some comments added by Ewen and 
myself in the RB.

 Kafka ProducerRecord should implement equals
 

 Key: KAFKA-1805
 URL: https://issues.apache.org/jira/browse/KAFKA-1805
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.2
Reporter: Thomas Omans
Assignee: Thomas Omans
Priority: Minor
 Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch


 I was writing some tests to verify that I am calculating my partitions, 
 topics, keys, and values properly in my producer code and discovered that 
 ProducerRecord does not implement equality.
 This makes tests that integrate kafka particularly awkward.
 https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
 I can whip up a patch since this is essentially just a value object.
 Thanks,
 Thomas Omans
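 Such a patch is standard value-object boilerplate; a hedged sketch with simplified fields (not the actual committed patch):
{code}
import java.util.Arrays;
import java.util.Objects;

// Simplified stand-in for ProducerRecord with value semantics.
public final class RecordValue {
    private final String topic;
    private final Integer partition; // nullable, like ProducerRecord's
    private final byte[] key;
    private final byte[] value;

    public RecordValue(String topic, Integer partition, byte[] key, byte[] value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof RecordValue)) return false;
        RecordValue that = (RecordValue) o;
        // Arrays.equals compares byte[] contents, not references.
        return Objects.equals(topic, that.topic)
                && Objects.equals(partition, that.partition)
                && Arrays.equals(key, that.key)
                && Arrays.equals(value, that.value);
    }

    @Override
    public int hashCode() {
        int result = Objects.hash(topic, partition);
        result = 31 * result + Arrays.hashCode(key);
        result = 31 * result + Arrays.hashCode(value);
        return result;
    }
}
{code}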



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1709) [New Java Producer Potential Deadlock] Producer Deadlock when all messages are being sent to a single partition

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1709.
--
Resolution: Invalid

This didn't turn out to be a deadlock.

 [New Java Producer Potential Deadlock] Producer Deadlock when all messages 
 are being sent to a single partition
 

 Key: KAFKA-1709
 URL: https://issues.apache.org/jira/browse/KAFKA-1709
 Project: Kafka
  Issue Type: Bug
  Components: producer 
 Environment: Development
Reporter: Bhavesh Mistry
Assignee: Jun Rao
Priority: Critical

 Hi Kafka Dev Team,
 When I run the test to send messages to a single partition for 3 minutes or 
 so, I encounter a deadlock (please see the attached screenshot) and thread 
 contention in YourKit profiling.
 Use Case:
 1) Aggregating messages into the same partition for metric counting. 
 2) Replicating the old producer's behavior of sticking to a partition for 3 minutes.
 Here is output:
 Frozen threads found (potential deadlock)
  
 It seems that the following threads have not changed their stack for more 
 than 10 seconds.
 These threads are possibly (but not necessarily!) in a deadlock or hung.
  
 pool-1-thread-128 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 pool-1-thread-159 --- Frozen for at least 2m 1 sec
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 pool-1-thread-55 --- Frozen for at least 2m
 org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition,
  byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, 
 Callback) KafkaProducer.java:237
 org.kafka.test.TestNetworkDownProducer$MyProducer.run() 
 TestNetworkDownProducer.java:84
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) 
 ThreadPoolExecutor.java:1145
 java.util.concurrent.ThreadPoolExecutor$Worker.run() 
 ThreadPoolExecutor.java:615
 java.lang.Thread.run() Thread.java:744
 Thanks,
 Bhavesh 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1587) Possible Memory Leak when we use Kafka 8.0 Producer for sending messages

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1587.
--
Resolution: Won't Fix

 Possible Memory Leak when we use Kafka 8.0 Producer for sending messages
 

 Key: KAFKA-1587
 URL: https://issues.apache.org/jira/browse/KAFKA-1587
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0
Reporter: Gopinath Sundaram
Priority: Critical

 Hi Kafka team,
 We use Kafka to send messages in a high-volume, memory-intensive application 
 which uses Parallel GC. We send messages at a rate of 12500/min in the 
 first few hours, after which the rate drops to 6000/min. Our 
 application usually runs for a maximum of 24 hours.
 What we have:
 1) When we do not send messages through Kafka Producer 0.8, our 
 application never slows down much and our entire process completes within 24 
 hours.
 2) When we use Kafka, our machines slow down to sending around 2500/min, and 
 as time progresses the number of messages sent drops even lower.
 3) We suspect that our application spends more time in GC, and hence the 
 problem. The heap dump does not contain a leak suspect pointing at Kafka, but 
 this slowness happens only when the Kafka messaging system is used.
 Any pointers that could help us resolve this issue will be highly appreciated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-936) Kafka Metrics Memory Leak

2015-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14310961#comment-14310961
 ] 

Jay Kreps commented on KAFKA-936:
-

[~junrao] Does this issue still exist?

 Kafka Metrics Memory Leak 
 --

 Key: KAFKA-936
 URL: https://issues.apache.org/jira/browse/KAFKA-936
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.0
 Environment: centos linux, jdk 1.6, jboss
Reporter: Senthil Chittibabu
Assignee: Neha Narkhede
Priority: Critical

 I am using kafka_2.8.0-0.8.0-SNAPSHOT version. I am running into 
 OutOfMemoryError in PermGen Space. I have set the -XX:MaxPermSize=512m, but I 
 still get the same error. I used a profiler to trace the memory leak and 
 found the following kafka classes to be the cause. Please let 
 me know if you need any additional information to debug this issue. 
 kafka.server.FetcherLagMetrics
 kafka.consumer.FetchRequestAndResponseMetrics
 kafka.consumer.FetchRequestAndResponseStats
 kafka.metrics.KafkaTimer
 kafka.utils.Pool



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1408) Kafka broker can not stop itself normally after problems with connection to ZK

2015-02-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14310973#comment-14310973
 ] 

Jay Kreps commented on KAFKA-1408:
--

[~junrao], [~dmitrybugaychenko] is this still active?

 Kafka broker can not stop itself normally after problems with connection to ZK
 

 Key: KAFKA-1408
 URL: https://issues.apache.org/jira/browse/KAFKA-1408
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.8.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede

 After getting into an inconsistent state due to a short network failure, the 
 broker can not stop itself. The last message in the log is:
 {code}
 INFO   | jvm 1| 2014/04/21 08:53:07 | [2014-04-21 09:53:06,999] INFO 
 [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
 INFO   | jvm 1| 2014/04/21 08:53:07 | [2014-04-21 09:53:06,999] INFO 
 [kafka-log-cleaner-thread-0], Shutdown completed (kafka.log.LogCleaner)
 {code}
 There is also a preceding error:
 {code}
 INFO   | jvm 1| 2014/04/21 08:52:55 | [2014-04-21 09:52:55,015] WARN 
 Controller doesn't exist (kafka.utils.Utils$)
 INFO   | jvm 1| 2014/04/21 08:52:55 | kafka.common.KafkaException: 
 Controller doesn't exist
 INFO   | jvm 1| 2014/04/21 08:52:55 |   at 
 kafka.utils.ZkUtils$.getController(ZkUtils.scala:70)
 INFO   | jvm 1| 2014/04/21 08:52:55 |   at 
 kafka.server.KafkaServer.kafka$server$KafkaServer$$controlledShutdown(KafkaServer.scala:148)
 INFO   | jvm 1| 2014/04/21 08:52:55 |   at 
 kafka.server.KafkaServer$$anonfun$shutdown$1.apply$mcV$sp(KafkaServer.scala:220)
 {code}
 Here is a part of jstack (it looks like there is a deadlock between 
 delete-topics-thread  and ZkClient-EventThread):
 {code}
 IWrapper-Connection id=10 state=WAITING
 - waiting on 0x15d6aa44 (a 
 java.util.concurrent.locks.ReentrantLock$NonfairSync)
 - locked 0x15d6aa44 (a 
 java.util.concurrent.locks.ReentrantLock$NonfairSync)
  owned by ZkClient-EventThread-37-devlnx2:2181 id=37
 at sun.misc.Unsafe.park(Native Method)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
 at 
 java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
 at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
 at kafka.utils.Utils$.inLock(Utils.scala:536)
 at kafka.controller.KafkaController.shutdown(KafkaController.scala:641)
 at 
 kafka.server.KafkaServer$$anonfun$shutdown$8.apply$mcV$sp(KafkaServer.scala:233)
 at kafka.utils.Utils$.swallow(Utils.scala:167)
 at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
 at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
 at kafka.utils.Logging$class.swallow(Logging.scala:94)
 at kafka.utils.Utils$.swallow(Utils.scala:46)
 at kafka.server.KafkaServer.shutdown(KafkaServer.scala:233)
 at odkl.databus.server.Main.stop(Main.java:184)
 at 
 org.tanukisoftware.wrapper.WrapperManager.stopInner(WrapperManager.java:1982)
 at 
 org.tanukisoftware.wrapper.WrapperManager.handleSocket(WrapperManager.java:2391)
 at org.tanukisoftware.wrapper.WrapperManager.run(WrapperManager.java:2696)
 at java.lang.Thread.run(Thread.java:744)
 ZkClient-EventThread-37-devlnx2:2181 id=37 state=WAITING
 - waiting on 0x3d5f9878 (a java.util.concurrent.CountDownLatch$Sync)
 - locked 0x3d5f9878 (a java.util.concurrent.CountDownLatch$Sync)
 at sun.misc.Unsafe.park(Native Method)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
 at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
 at 
 kafka.controller.TopicDeletionManager.shutdown(TopicDeletionManager.scala:93)
 at 
 kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:340)
 at 
 kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
 at 
 

[jira] [Updated] (KAFKA-1416) Unify sendMessages/getMessages in unit tests

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1416:
-
Labels: newbie  (was: )

 Unify sendMessages/getMessages in unit tests
 

 Key: KAFKA-1416
 URL: https://issues.apache.org/jira/browse/KAFKA-1416
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
  Labels: newbie

 Multiple unit tests have its own internal function to send/get messages from 
 the brokers. For example:
 sendMessages in ZookeeperConsumerConnectorTest
 produceMessage in UncleanLeaderElectionTest
 sendMessages in FetcherTest
 etc
 It is better to unify them in TestUtils.
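 A unified helper might look roughly like this hedged Java sketch against the new producer (the tests in question use the older Scala clients, so this is illustrative only):
{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative shared test helper, in the spirit of a TestUtils method.
public class TestMessageUtils {
    public static List<String> sendMessages(String brokerList, String topic, int n)
            throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        List<String> sent = new ArrayList<>();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < n; i++) {
                String msg = "test-message-" + i;
                producer.send(new ProducerRecord<>(topic, msg)).get(); // synchronous
                sent.add(msg);
            }
        }
        return sent; // callers assert consumers see exactly these messages
    }
}
{code}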



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-867) producer transaction issue when metrics lib not correct

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-867.
-
Resolution: Incomplete

No follow-up.

 producer transaction issue when metrics lib not correct
 ---

 Key: KAFKA-867
 URL: https://issues.apache.org/jira/browse/KAFKA-867
 Project: Kafka
  Issue Type: Bug
Reporter: Ian Roughley

 When building a producer, I used a different version of metrics (v2.2.0). 
 What I found in this case was that I could deploy a producer; however, the 
 consumer received 4 copies of each message I sent.
 Further investigation led to:
 - the timer start method was found in the producer code
 - the message was sent to the broker
 - the timer method was not found in the producer code, causing an exception 
 to be thrown
 - the exception caused a retry (3 times)
 - as the message was already at the broker, there was no rollback
 Not sure if this warrants a higher level of handshaking with the broker. But 
 it is a fairly simple issue to cause (given that kafka is currently built 
 with a non-published metrics version) and may cause misunderstanding.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-865) Mavenize and separate the client.

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-865.
-
Resolution: Duplicate

 Mavenize and separate the client.
 -

 Key: KAFKA-865
 URL: https://issues.apache.org/jira/browse/KAFKA-865
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 0.8.0
Reporter: Ashwanth Fernando

 It seems that the java client for Kafka is also bundled with the server JAR 
 file and this is generated using sbt package. This is difficult for java 
 folks to work with because:
 1) Many java shops use maven and they want to specify the GAV of kafka in 
 their pom and bang, the client jar and all its dependencies should be added 
 to the application's classpath. I can't do that right now, because I need to 
 run ./sbt eclipse, get the .JAR, add that to my classpath, add a whole lot of 
 dependencies (log4j, slf4j, zkClient and so on) manually, which is a pain. 
 There are 90 million maven central uploads/downloads in 2012 alone. Almost 
 all the java shops out there have maven (either central or in house sonatype).
 2) Separation of concerns - keeping the server (core) and the client's 
 classes together in the same jar file increases the size of the bundle for a 
 client, and every time the server's code changes and a release is performed, 
 clients also need to update their .JAR file, which is not great. We don't 
 want a ton of clients to update their .JAR file just because a faster 
 replication strategy for the kafka server cluster changed in a new release.
 Action items are to separate the client and server portions of Kafka, add 
 the client in a pom along with its compile-time dependencies, and upload it 
 to Maven Central (or, if you have a LinkedIn externally exposed Nexus, over 
 there).
 This will increase adoption of the Kafka framework.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-715) NumberFormatException in PartitionStateInfo

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-715.
-
Resolution: Duplicate

 NumberFormatException in PartitionStateInfo
 ---

 Key: KAFKA-715
 URL: https://issues.apache.org/jira/browse/KAFKA-715
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.0
Reporter: Chris Riccomini
Assignee: Neha Narkhede

 Hey Guys,
 During a broker restart, I got this exception:
 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
 environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT
 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
 environment:host.name=eat1-qa466.corp.linkedin.com
 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
 environment:java.version=1.6.0_21
 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
 environment:java.vendor=Sun Microsystems Inc.
 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
 environment:java.home=/export/apps/jdk/JDK-1_6_0_21/jre
 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
 environment:java.class.path=/export/apps/jdk/JDK-1_6_0_21/lib/tools.jar:lib/activation-1.0.2.jar:lib/ant-1.6.5.jar:lib/aopalliance-1.0.jar:lib/cfg-2.8.0.jar:lib/cfg-api-6.6.6.jar:lib/cfg-impl-6.6.6.jar:lib/com.linkedin.customlibrary.j2ee-1.0.jar:lib/com.linkedin.customlibrary.mx4j-3.0.2.jar:lib/com.linkedin.customlibrary.xmsg-0.6.jar:lib/commons-beanutils-1.7.0.jar:lib/commons-cli-1.0.jar:lib/commons-lang-2.4.jar:lib/commons-logging-1.1.jar:lib/configuration-api-1.4.8.jar:lib/configuration-repository-impl-1.4.8.jar:lib/container-management-impl-1.1.15.jar:lib/container-server-1.1.15.jar:lib/emweb-impl-1.1.15.jar:lib/jaxen-1.1.1.jar:lib/jdom-1.0.jar:lib/jetty-6.1.26.jar:lib/jetty-management-6.1.26.jar:lib/jetty-naming-6.1.26.jar:lib/jetty-plus-6.1.26.jar:lib/jetty-util5-6.1.26.jar:lib/jetty-util-6.1.26.jar:lib/jmx-impl-1.4.8.jar:lib/json-simple-1.1.jar:lib/jsp-2.1-6.1.1.jar:lib/jsp-api-2.1-6.1.1.jar:lib/lispring-lispring-core-1.4.8.jar:lib/lispring-lispring-servlet-1.4.8.jar:lib/log4j-1.2.15.jar:lib/mail-1.3.0.jar:lib/mx4j-tools-3.0.2.jar:lib/servlet-api-2.5.jar:lib/spring-aop-3.0.3.jar:lib/spring-asm-3.0.3.jar:lib/spring-aspects-3.0.3.jar:lib/spring-beans-3.0.3.jar:lib/spring-context-3.0.3.jar:lib/spring-context-support-3.0.3.jar:lib/spring-core-3.0.3.jar:lib/spring-expression-3.0.3.jar:lib/spring-jdbc-3.0.3.jar:lib/spring-jms-3.0.3.jar:lib/spring-orm-3.0.3.jar:lib/spring-transaction-3.0.3.jar:lib/spring-web-3.0.3.jar:lib/spring-web-servlet-3.0.3.jar:lib/util-core-4.0.40.jar:lib/util-i18n-4.0.40.jar:lib/util-jmx-4.0.22.jar:lib/util-log-4.0.40.jar:lib/util-servlet-4.0.40.jar:lib/util-xmsg-4.0.40.jar:lib/xml-apis-1.3.04.jar
 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
 environment:java.library.path=/export/apps/jdk/JDK-1_6_0_21/jre/lib/amd64/server:/export/apps/jdk/JDK-1_6_0_21/jre/lib/amd64:/export/apps/jdk/JDK-1_6_0_21/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
 environment:java.io.tmpdir=/export/content/glu/apps/kafka/i001/tmp
 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
 environment:java.compiler=NA
 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
 environment:os.name=Linux
 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
 environment:os.arch=amd64
 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
 environment:os.version=2.6.32-220.13.1.el6.x86_64
 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
 environment:user.name=app
 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
 environment:user.home=/home/app
 2013/01/21 19:21:10.918 INFO [ZooKeeper] [main] [kafka] []  Client 
 environment:user.dir=/export/content/glu/apps/kafka/i001
 2013/01/21 19:21:10.919 INFO [ZooKeeper] [main] [kafka] []  Initiating client 
 connection, 
 connectString=eat1-app309.corp.linkedin.com:12913,eat1-app310.corp.linkedin.com:12913,eat1-app311.corp.linkedin.com:12913,eat1-app312.corp.linkedin.com:12913,eat1-app313.corp.linkedin.com:12913/kafka-samsa
  sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@1bfdbab5
 2013/01/21 19:21:10.932 INFO [ClientCnxn] [main-SendThread()] [kafka] []  
 Opening socket connection to server 
 eat1-app313.corp.linkedin.com/172.20.72.73:12913
 2013/01/21 19:21:10.933 INFO [ClientCnxn] 
 [main-SendThread(eat1-app313.corp.linkedin.com:12913)] [kafka] []  Socket 
 connection established to eat1-app313.corp.linkedin.com/172.20.72.73:12913, 
 initiating session
 2013/01/21 19:21:10.963 INFO [ClientCnxn] 
 

[jira] [Resolved] (KAFKA-711) NPE in TRUNK broker

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-711.
-
Resolution: Invalid

 NPE in TRUNK broker
 ---

 Key: KAFKA-711
 URL: https://issues.apache.org/jira/browse/KAFKA-711
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1
Reporter: John Fung
Assignee: Jay Kreps

 This issue happens in Trunk only.
 ** To reproduce the issue:
 1. Start zookeeper:
 $ bin/zookeeper-server-start.sh config/zookeeper.properties
 2. Start broker:
 $ bin/kafka-server-start.sh config/server.properties
 3. Start producer:
 $ bin/kafka-run-class.sh kafka.perf.ProducerPerformance --broker-list localhost:9092 --messages 100 --topics topic_001 --batch-size 50 --threads 1 --message-size 200
 ** Broker exception:
 [2013-01-18 10:11:41,241] ERROR Replica Manager on Broker 0: Error processing 
 leaderAndISR request LeaderAndIsrRequest(0,5,,1000,Map((topic_001,0) -> 
 PartitionStateInfo(LeaderIsrAndControllerEpoch({ 
 ISR:0,leader:0,leaderEpoch:0 
 },1),1)),Set(id:0,host:jfung-ld.linkedin.biz,port:9092),1) 
 (kafka.server.ReplicaManager)
 java.lang.NullPointerException
 at kafka.log.OffsetIndex.entries(OffsetIndex.scala:285)
 at kafka.log.OffsetIndex$$anonfun$1.apply(OffsetIndex.scala:88)
 at kafka.log.OffsetIndex$$anonfun$1.apply(OffsetIndex.scala:88)
 at kafka.utils.Logging$class.info(Logging.scala:67)
 at kafka.log.OffsetIndex.info(OffsetIndex.scala:52)
 at kafka.log.OffsetIndex.<init>(OffsetIndex.scala:87)
 at kafka.log.LogSegment.<init>(LogSegment.scala:37)
 at kafka.log.Log.loadSegments(Log.scala:132)
 at kafka.log.Log.<init>(Log.scala:75)
 at kafka.log.LogManager.createLogIfNotExists(LogManager.scala:202)
 at kafka.log.LogManager.getOrCreateLog(LogManager.scala:179)
 at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:77)
 at kafka.cluster.Partition$$anonfun$1.apply(Partition.scala:142)
 at kafka.cluster.Partition$$anonfun$1.apply(Partition.scala:142)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
 at 
 scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
 at scala.collection.immutable.List.foreach(List.scala:45)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
 at scala.collection.immutable.List.map(List.scala:45)
 at kafka.cluster.Partition.makeLeader(Partition.scala:142)
 at 
 kafka.server.ReplicaManager.kafka$server$ReplicaManager$$makeLeader(ReplicaManager.scala:219)
 at 
 kafka.server.ReplicaManager$$anonfun$becomeLeaderOrFollower$3.apply(ReplicaManager.scala:199)
 at 
 kafka.server.ReplicaManager$$anonfun$becomeLeaderOrFollower$3.apply(ReplicaManager.scala:191)
 at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
 at 
 kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:191)
 at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:155)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:65)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
 at java.lang.Thread.run(Thread.java:662)
 ** ProducerPerformance exception:
 [2013-01-18 10:07:02,779] ERROR Error in handling batch of 50 events 
 (kafka.producer.async.ProducerSendThread)
 kafka.common.FailedToSendMessageException: Failed to send messages after 3 
 tries.
 at 
 kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:87)
 at 
 kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
 at 
 kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
 at 
 kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
 at scala.collection.immutable.Stream.foreach(Stream.scala:254)
 at 
 kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
 at 
 kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
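
 Reading the broker trace bottom-up: the constructor (OffsetIndex.scala:87) 
 logs a message whose lazily evaluated argument calls entries, which in turn 
 dereferences state that has not been assigned yet. A minimal Scala sketch of 
 that initialization-order pitfall (the trait, class, and field names below 
 are illustrative, not Kafka's actual code):

 // Illustrative only: a by-name log argument evaluated during construction
 // touches a field that has not been assigned yet, producing an NPE.
 trait Logging {
   // By-name parameter: the message string is built only when logged.
   def info(msg: => String): Unit = println(s"INFO $msg")
 }

 class Index(path: String) extends Logging {
   // Suppose entries derives its answer from the backing buffer.
   def entries: Int = buffer.limit() / 8

   // This log call runs before `buffer` below is assigned...
   info(s"Loaded index $path with $entries entries") // NPE here

   // ...because constructor statements execute top to bottom.
   private val buffer: java.nio.ByteBuffer = java.nio.ByteBuffer.allocate(80)
 }

 object Repro extends App {
   new Index("/tmp/demo.index") // throws java.lang.NullPointerException
 }

 In the sketch, moving the log statement after the field initializer (or 
 guarding entries against an unassigned buffer) avoids the NPE.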



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-662) Create testcases for unclean shut down

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-662.
-
Resolution: Fixed

 Create testcases for unclean shut down
 --

 Key: KAFKA-662
 URL: https://issues.apache.org/jira/browse/KAFKA-662
 Project: Kafka
  Issue Type: Task
Reporter: John Fung
Assignee: John Fung





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-663) Add deploy feature to System Test

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-663.
-
Resolution: Won't Fix

Not under active development.

 Add deploy feature to System Test
 ---

 Key: KAFKA-663
 URL: https://issues.apache.org/jira/browse/KAFKA-663
 Project: Kafka
  Issue Type: Task
Reporter: John Fung
Assignee: John Fung





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-321) Remove dead brokers from ProducerPool

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-321.
-
Resolution: Fixed

 Remove dead brokers from ProducerPool
 -

 Key: KAFKA-321
 URL: https://issues.apache.org/jira/browse/KAFKA-321
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.0
Reporter: Prashanth Menon
  Labels: replication

 Currently, the ProducerPool does not remove producers that are tied to dead 
 brokers. Keeping such producers around can adversely affect normal producer 
 operation, since they can be handed out only to fail - one such scenario is 
 when updating cached topic metadata. It's best if we remove any producers 
 that are tied to dead brokers to avoid such situations.
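
 A minimal sketch of the eviction described above, assuming a pool keyed by 
 broker id (the class name and API below are illustrative, not the actual 
 ProducerPool):

 import scala.collection.mutable

 // Illustrative pool that drops and closes producers whose broker has died,
 // so stale connections are never handed back to callers.
 class BrokerProducerPool[P <: AutoCloseable] {
   private val producers = mutable.Map.empty[Int, P] // broker id -> producer

   def put(brokerId: Int, p: P): Unit = synchronized { producers(brokerId) = p }

   def get(brokerId: Int): Option[P] = synchronized { producers.get(brokerId) }

   // Call on every metadata refresh with the currently live broker ids.
   def retainLiveBrokers(liveIds: Set[Int]): Unit = synchronized {
     val dead = producers.keySet.toSet -- liveIds
     dead.foreach(id => producers.remove(id).foreach(_.close()))
   }
 }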



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-299) Change broker request and response to use Seqs rather than Array

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-299.
-
Resolution: Fixed

 Change broker request and response to use Seqs rather than Array
 

 Key: KAFKA-299
 URL: https://issues.apache.org/jira/browse/KAFKA-299
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.0
Reporter: Prashanth Menon
Assignee: Prashanth Menon
  Labels: 0.8, replication, wireprotocol

 The new Produce and Fetch request and response classes use primitive Arrays, 
 but because they are case classes and Java's array hashCode/equals 
 functionality is broken, the case class equality contract is broken as well. 
 We should change the models to use Seqs to resolve the issue, along with 
 gaining all the functional benefits that go along with it. This change 
 will require appropriate Java versions to convert between Arrays and Seqs 
 for Java clients.
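
 The underlying problem is easy to demonstrate: Java arrays inherit 
 identity-based equals/hashCode, so the equality contract of a case class 
 holding an Array silently breaks, while a Seq field compares structurally. 
 A small sketch (the class names are made up for illustration):

 case class ResponseWithArray(offsets: Array[Long])
 case class ResponseWithSeq(offsets: Seq[Long])

 object EqualityDemo extends App {
   // Arrays compare by reference, so the generated case-class equals fails
   // even though both instances hold the same values.
   println(ResponseWithArray(Array(1L, 2L)) == ResponseWithArray(Array(1L, 2L))) // false

   // Seqs compare element by element, so the contract holds.
   println(ResponseWithSeq(Seq(1L, 2L)) == ResponseWithSeq(Seq(1L, 2L)))         // true
 }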



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-260) Add audit trail to kafka

2015-02-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-260.
-
Resolution: Won't Fix

 Add audit trail to kafka
 

 Key: KAFKA-260
 URL: https://issues.apache.org/jira/browse/KAFKA-260
 Project: Kafka
  Issue Type: New Feature
Affects Versions: 0.8.0
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: Picture 18.png, kafka-audit-trail-draft.patch


 LinkedIn has a system that does monitoring on top of our data flow to ensure 
 all data is delivered to all consumers of data. This works by having each 
 logical tier through which data passes produce messages to a central 
 audit-trail topic; these messages give a time period and the number of 
 messages that passed through that tier in that time period. Example of tiers 
 for data might be "producer", "broker", "hadoop-etl", etc. This makes it 
 possible to compare the total events for a given time period to ensure that 
 all events that are produced are consumed by all consumers.
 This turns out to be extremely useful. We also have an application that 
 "balances the books" and checks that all data is consumed in a timely 
 fashion. This gives graphs for each topic and shows any data loss and the lag 
 at which the data is consumed (if any).
 This would be an optional feature that would allow you to do this kind of 
 reconciliation automatically for all the topics kafka hosts against all the 
 tiers of applications that interact with the data.
 Some details, the proposed format of the data is JSON using the following 
 format for messages:
 {
   "time": 1301727060032,  // the timestamp at which this audit message is sent
   "topic": "my_topic_name", // the topic this audit data is for
   "tier": "producer", // a user-defined tier name
   "bucket_start": 130172640, // the beginning of the time bucket this data applies to
   "bucket_end": 130172700, // the end of the time bucket this data applies to
   "host": "my_host_name.datacenter.linkedin.com", // the server that this was sent from
   "datacenter": "hlx32", // the datacenter this occurred in
   "application": "newsfeed_service", // a user-defined application name
   "guid": "51656274-a86a-4dff-b824-8e8e20a6348f", // a unique identifier for this message
   "count": 43634
 }
 DISCUSSION
 Time is complex:
 1. The audit data must be based on a timestamp in the events, not the time on 
 the machine processing the event. Using this timestamp means that all downstream 
 consumers will report audit data on the right time bucket. This means that 
 there must be a timestamp in the event, which we don't currently require. 
 Arguably we should just add a timestamp to the events, but I think it is 
 sufficient for now just to allow the user to provide a function to extract 
 the time from their events.
 2. For counts to reconcile exactly we can only do analysis at a granularity 
 based on the least common multiple of the bucket sizes used by all tiers. The 
 simplest is just to configure them all to use the same bucket size. We 
 currently use a bucket size of 10 mins, but anything from 1-60 mins is 
 probably reasonable.
 For analysis purposes one tier is designated as the "source" tier and we do 
 reconciliation against this count (e.g. if another tier has less, that is 
 treated as loss; if another tier has more, that is duplication).
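
 As a rough illustration of the bucketing and reconciliation just described 
 (the object name and in-memory layout are assumptions for the sketch, not 
 the actual audit application):

 // Illustrative reconciliation: align timestamps to buckets, then compare
 // every tier's per-(topic, bucket) count against the source tier's count.
 object AuditReconcile {
   // Align a message timestamp (ms) to the start of its time bucket.
   def bucketStart(timestampMs: Long, bucketMs: Long): Long =
     (timestampMs / bucketMs) * bucketMs

   // counts: ((topic, bucketStart), tier) -> messages seen by that tier
   def report(counts: Map[((String, Long), String), Long], sourceTier: String): Unit =
     for (key <- counts.keys.map(_._1).toSet) {
       val expected = counts.getOrElse((key, sourceTier), 0L)
       for (((k, tier), n) <- counts if k == key && tier != sourceTier) {
         if (n < expected) println(s"$key $tier: lost ${expected - n}")
         else if (n > expected) println(s"$key $tier: duplicated ${n - expected}")
       }
     }
 }

 With every tier configured to the same bucket size (say 10 minutes, i.e. 
 600000 ms), bucketStart maps all timestamps in a bucket to the same key, 
 which is what lets counts from different tiers line up exactly.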
 Note that this system makes false positives possible since you can lose an 
 audit message. It also makes false negatives possible since if you lose both 
 normal messages and the associated audit messages it will appear that 
 everything adds up. The latter problem is astronomically unlikely to happen 
 exactly, though.
 This would integrate into the client (producer and consumer both) in the 
 following way:
 1. The user provides a way to get timestamps from messages (required; see the 
 sketch after this list)
 2. The user configures the tier name, host name, datacenter name, and 
 application name as part of the consumer and producer config. We can provide 
 reasonable defaults if not supplied (e.g. if it is a Producer then set tier 
 to "producer" and get the hostname from the OS).
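
 A minimal sketch of the hook in item 1, assuming it is just a user-supplied 
 function (the trait name and the example payload layout are hypothetical):

 // Illustrative client hook: the user supplies the logic that pulls the
 // event time out of their own message payloads.
 trait TimestampExtractor[M] {
   def timestampMs(message: M): Long
 }

 // Example for payloads whose first 8 bytes are a big-endian epoch-ms stamp.
 object FirstEightBytesExtractor extends TimestampExtractor[Array[Byte]] {
   def timestampMs(message: Array[Byte]): Long =
     java.nio.ByteBuffer.wrap(message, 0, 8).getLong()
 }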
 The application that processes this data is currently a Java Jetty app that 
 talks to MySQL. It feeds off the audit topic in kafka and runs both automatic 
 monitoring checks and graphical displays of data against this. The data layer 
 is not terribly scalable, but because the audit data is sent only periodically 
 this is enough to allow us to audit thousands of servers on very modest 
 hardware, and having SQL access makes diving into the data to trace problems 
 to particular hosts easier.
 LOGISTICS
 I would recommend the following steps:
 1. Add the audit application; the proposal would be to add a new top-level 
 directory equivalent to "core" or "perf" called "audit" to house this 
 application. At this point it would just be sitting 
