[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14168996#comment-14168996 ] Sriram Subramanian commented on KAFKA-1555: ---

[~gwenshap] +1 on your suggestion. We can get the documentation ready and then do the linking during the release.

provide strong consistency with reasonable availability
---
Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Jiang Wu Assignee: Gwen Shapira Fix For: 0.8.2 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch, KAFKA-1555.6.patch, KAFKA-1555.8.patch, KAFKA-1555.9.patch

In a mission-critical application, we expect a Kafka cluster with 3 brokers to satisfy two requirements:
1. When 1 broker is down, no message loss or service blocking happens.
2. In worse cases, such as two brokers down, service can be blocked, but no message loss happens.

We found that the current Kafka version (0.8.1.1) cannot achieve these requirements due to three of its behaviors:
1. When choosing a new leader from 2 followers in ISR, the one with fewer messages may be chosen as the leader.
2. Even when replica.lag.max.messages=0, a follower can stay in ISR while it has fewer messages than the leader.
3. ISR can contain only 1 broker, so acknowledged messages may be stored on only 1 broker.

The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning all 3 replicas (leader A, followers B and C) are in sync, i.e., they have the same messages and are all in ISR. According to the value of request.required.acks (acks for short), there are the following cases:
1. acks=0, 1, 3. Obviously these settings do not satisfy the requirements.
2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in ISR. If A is killed, C can be elected as the new leader, and consumers will miss m.
3. acks=-1. B and C restart and are removed from ISR. Producer sends a message m to A and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost.

In summary, no existing configuration can satisfy the requirements.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
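The gap identified in the proof above is what the fix for this ticket targets: a minimum in-sync-replica count enforced on acknowledged writes, combined with acks=-1 (the min.insync.replicas setting shipped in 0.8.2 as part of this work). A minimal sketch of the resulting configuration; the value 2 is an illustrative choice for a 3-broker cluster, not a mandated default:

```properties
# topic/broker side: reject produce requests sent with acks=-1
# unless at least 2 replicas (leader included) are in sync
min.insync.replicas=2

# producer side: wait for all in-sync replicas to acknowledge
request.required.acks=-1
```

With this pair, a write is acknowledged only once it is on at least 2 brokers, and the partition blocks producers (rather than losing data) when fewer than 2 replicas remain in sync, matching requirements 1 and 2.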
Re: Review Request 26633: Patch for KAFKA-1305
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26633/ --- (Updated Oct. 13, 2014, 2:30 p.m.) Review request for kafka. Bugs: KAFKA-1305 https://issues.apache.org/jira/browse/KAFKA-1305 Repository: kafka Description --- KAFKA-1305. Controller can hang on controlled shutdown with auto leader balance enabled. Diffs (updated) - core/src/main/scala/kafka/server/KafkaConfig.scala 90af698b01ec82b6168e02b6af41887ef164ad51 Diff: https://reviews.apache.org/r/26633/diff/ Testing --- Thanks, Sriharsha Chintalapani
[jira] [Commented] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled
[ https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169343#comment-14169343 ] Sriharsha Chintalapani commented on KAFKA-1305: ---

Updated reviewboard https://reviews.apache.org/r/26633/diff/ against branch origin/trunk

Controller can hang on controlled shutdown with auto leader balance enabled
---
Key: KAFKA-1305 URL: https://issues.apache.org/jira/browse/KAFKA-1305 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Sriharsha Chintalapani Priority: Blocker Fix For: 0.8.2, 0.9.0 Attachments: KAFKA-1305.patch, KAFKA-1305.patch, KAFKA-1305_2014-10-13_07:30:45.patch

This is relatively easy to reproduce, especially when doing a rolling bounce. What happened here is as follows:
1. The previous controller was bounced and broker 265 became the new controller.
2. I went on to do a controlled shutdown of broker 265 (the new controller).
3. In the meantime, the automatically scheduled preferred replica leader election process started doing its thing and began sending LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers). (t@113 below).
4. While that's happening, the controlled shutdown process on 265 succeeds and proceeds to deregister itself from ZooKeeper and shut down the socket server.
5. (ReplicaStateMachine actually removes deregistered brokers from the controller channel manager's list of brokers to send requests to. However, that removal cannot take place (t@18 below) because the preferred replica leader election task owns the controller lock.)
6. So the request thread to broker 265 gets into infinite retries.
7. The entire broker shutdown process is blocked on controller shutdown for the same reason (it needs to acquire the controller lock).

Relevant portions from the thread dump:

Controller-265-to-broker-265-send-thread - Thread t@113
  java.lang.Thread.State: TIMED_WAITING
    at java.lang.Thread.sleep(Native Method)
    at kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
    at kafka.utils.Utils$.swallow(Utils.scala:167)
    at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
    at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
    at kafka.utils.Logging$class.swallow(Logging.scala:94)
    at kafka.utils.Utils$.swallow(Utils.scala:46)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    - locked java.lang.Object@6dbf14a7
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
  Locked ownable synchronizers:
    - None
...
Thread-4 - Thread t@17
  java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
    at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
    at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
    at kafka.utils.Utils$.inLock(Utils.scala:536)
    at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
    at kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
    at kafka.utils.Utils$.swallow(Utils.scala:167)
    at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
    at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
    at kafka.utils.Logging$class.swallow(Logging.scala:94)
    at kafka.utils.Utils$.swallow(Utils.scala:46)
    at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
    at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
    at kafka.Kafka$$anon$1.run(Kafka.scala:42)
...
kafka-scheduler-0 - Thread t@117
  java.lang.Thread.State: WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
    at
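The hang in steps 5-7 is a lock conflict: the scheduled election task holds the controller lock while the send thread it depends on retries forever, so KafkaController.shutdown() can never acquire the lock. A minimal, hypothetical Java sketch of that acquisition conflict; the class and method names here are illustrative, not Kafka APIs:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

public class ControllerLockDemo {
    static final ReentrantLock controllerLock = new ReentrantLock();

    // Returns whether a "shutdown" caller can get the controller lock while the
    // "election task" thread holds it: mirrors t@17 blocking on kafka-scheduler-0.
    static boolean shutdownCanProceed() throws InterruptedException {
        CountDownLatch lockHeld = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread electionTask = new Thread(() -> {   // plays "kafka-scheduler-0"
            controllerLock.lock();
            try {
                lockHeld.countDown();   // lock is now held
                release.await();        // stays held, as if waiting on the send thread
            } catch (InterruptedException ignored) {
            } finally {
                controllerLock.unlock();
            }
        });
        electionTask.start();
        lockHeld.await();
        boolean acquired = controllerLock.tryLock();   // plays KafkaController.shutdown()
        if (acquired) controllerLock.unlock();
        release.countDown();
        electionTask.join();
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("shutdown acquired controller lock: " + shutdownCanProceed());
    }
}
```

In the real bug the election task never releases the lock because its request thread retries a dead broker indefinitely, so the `tryLock` failure above becomes an unbounded wait.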
[jira] [Updated] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled
[ https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sriharsha Chintalapani updated KAFKA-1305: --

Attachment: KAFKA-1305_2014-10-13_07:30:45.patch

Controller can hang on controlled shutdown with auto leader balance enabled
---
Key: KAFKA-1305 URL: https://issues.apache.org/jira/browse/KAFKA-1305 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Sriharsha Chintalapani Priority: Blocker Fix For: 0.8.2, 0.9.0 Attachments: KAFKA-1305.patch, KAFKA-1305.patch, KAFKA-1305_2014-10-13_07:30:45.patch
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169396#comment-14169396 ] Vladimir Tretyakov commented on KAFKA-1481: ---

Added another patch which introduces metric names as key=value pairs, examples:

{code}
kafka.consumer:type=ConsumerTopicMetrics,name=clientId=af_servers,AllTopics,BytesPerSec
kafka.cluster:type=Partition,name=topic=spm_alerts_topic,partitionId=0,UnderReplicated
kafka.network:type=SocketServer,name=NetworkProcessorNum=1,IdlePercent
kafka.server:type=BrokerTopicMetrics,name=topic=spm_topic,MessagesInPerSec
kafka.server:type=FetcherStats,name=clientId=af_servers,ConsumerFetcherThread,groupId=af_servers,consumerHostName=wawanawna,timestamp=1413199781501,uuid=58a5cc70,fetcherId=0,sourceBrokerId=0,brokerHost=wawanawna,brokerPort=9092,BytesPerSec
kafka.server:type=FetcherLagMetrics,name=clientId=af_servers,ConsumerFetcherThread,groupId=af_servers,consumerHostName=wawanawna,timestamp=1413199781501,uuid=58a5cc70,fetcherId=0,sourceBrokerId=0,brokerHost=wawanawna,brokerPort=9092,topic=spm_topic,partitionId=0,ConsumerLag
kafka.consumer:type=FetchRequestAndResponseMetrics,name=clientId=af_servers,ConsumerFetcherThread,groupId=af_servers,consumerHostName=wawanawna,timestamp=1413199781501,uuid=58a5cc70,fetcherId=0,sourceBrokerId=0,brokerHost=wawanawna,brokerPort=9092,FetchResponseSize
{code}

Stop using dashes AND underscores as separators in MBean names
--
Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Priority: Critical Labels: patch Fix For: 0.8.2 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch

MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBean names, making it impossible to parse the individual fields back out of an MBean name. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
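For comparison, JMX itself already supports key=value properties in ObjectName, and parsing them back out is unambiguous no matter which characters appear in the values. A small sketch; the metric name below is modeled on the examples in the comment above, not taken from a released Kafka version:

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class MBeanNameDemo {
    public static void main(String[] args) throws MalformedObjectNameException {
        // A flat name like "af_servers-spm_topic-MessagesInPerSec" gives no way to
        // tell where the clientId ends and the topic begins, since both may contain
        // dashes and underscores. Key=value properties keep each field addressable:
        ObjectName name = new ObjectName(
                "kafka.server:type=BrokerTopicMetrics,clientId=af_servers,topic=spm_topic,name=MessagesInPerSec");
        System.out.println(name.getKeyProperty("clientId")); // af_servers
        System.out.println(name.getKeyProperty("topic"));    // spm_topic
    }
}
```

Any monitoring tool can then read fields via `getKeyProperty` instead of guessing at separator positions.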
[jira] [Updated] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vladimir Tretyakov updated KAFKA-1481: --

Attachment: KAFKA-1481_2014-10-13_18-23-35.patch

Stop using dashes AND underscores as separators in MBean names
--
Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Priority: Critical Labels: patch Fix For: 0.8.2 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169415#comment-14169415 ] Bhavesh Mistry commented on KAFKA-1642: ---

{code}
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

    public static void main(String[] args) throws IOException {
        Properties prop = new Properties();
        InputStream propFile = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaproducer.properties");
        String topic = "test";
        prop.load(propFile);
        System.out.println("Property: " + prop.toString());

        StringBuilder builder = new StringBuilder(1024);
        int msgLength = 256;
        for (int i = 0; i < msgLength; i++)
            builder.append('a');

        int numberOfProducer = 4;
        Producer[] producer = new Producer[numberOfProducer];
        for (int i = 0; i < producer.length; i++) {
            producer[i] = new KafkaProducer(prop);
        }
        Callback callback = new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.err.println("Msg dropped..!");
                    exception.printStackTrace();
                }
            }
        };
        ProducerRecord record = new ProducerRecord(topic, builder.toString().getBytes());
        while (true) {
            try {
                for (int i = 0; i < producer.length; i++) {
                    producer[i].send(record, callback);
                }
                Thread.sleep(10);
            } catch (Throwable th) {
                System.err.println("FATAL ");
                th.printStackTrace();
            }
        }
    }
}
{code}

{code:title=kafkaproducer.properties}
# THIS IS FOR NEW PRODUCERS API TRUNK. Please see the configuration at
# https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
# Data Acks
acks=1
# 64MB of buffer for log lines (including all messages).
buffer.memory=134217728
compression.type=snappy
retries=3
# batch size = (buffer.memory / number of partitions), so we can have an
# in-progress batch created for each partition.
batch.size=1048576
# 1MiB
max.request.size=1048576
# 2MiB
send.buffer.bytes=2097152
# We do not want to block when the buffer is full, so application threads are
# not blocked, but log lines will be dropped...
block.on.buffer.full=false
# wait...
linger.ms=5000
{code}

[Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
---
Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Jun Rao

I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O thread is very busy logging the following error message. Is this expected behavior?

2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
java.lang.IllegalStateException: No entry found for node -2
    at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
    at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
    at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
    at java.lang.Thread.run(Thread.java:744)

Thanks, Bhavesh

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
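The reported spike is consistent with the I/O thread catching the IllegalStateException and immediately retrying with no pause. A self-contained sketch of why a missing backoff turns an error path into a busy loop; the simulated exception and the 10 ms pause here are illustrative stand-ins, not the producer's actual code path:

```java
public class BusyLoopDemo {
    // Count how many times an always-failing "poll" runs in a fixed window,
    // with and without a pause on the error path.
    static long iterations(boolean backoff, long windowMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + windowMillis;
        long count = 0;
        while (System.currentTimeMillis() < deadline) {
            try {
                // Simulated failure, standing in for "No entry found for node -2".
                throw new IllegalStateException("No entry found for node -2");
            } catch (IllegalStateException e) {
                if (backoff) Thread.sleep(10);   // pause before retrying
            }
            count++;
        }
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("no backoff:   " + iterations(false, 100) + " iterations in 100 ms");
        System.out.println("with backoff: " + iterations(true, 100) + " iterations in 100 ms");
    }
}
```

Without the pause the loop spins a core at 100%, exactly the symptom reported for the hot Sender thread here.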
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169415#comment-14169415 ] Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:08 PM: ---

{code:title=TestNetworkDownProducer.java}
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

    static int numberTh = 200;
    static CountDownLatch latch = new CountDownLatch(200);

    public static void main(String[] args) throws IOException, InterruptedException {
        Properties prop = new Properties();
        InputStream propFile = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaproducer.properties");
        String topic = "logmon.test";
        prop.load(propFile);
        System.out.println("Property: " + prop.toString());

        StringBuilder builder = new StringBuilder(1024);
        int msgLength = 256;
        for (int i = 0; i < msgLength; i++)
            builder.append('a');

        int numberOfProducer = 4;
        Producer[] producer = new Producer[numberOfProducer];
        for (int i = 0; i < producer.length; i++) {
            producer[i] = new KafkaProducer(prop);
        }
        ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(numberTh * 2));
        for (int i = 0; i < numberTh; i++) {
            service.execute(new MyProducer(producer, 10, builder.toString(), topic));
        }
        latch.await();
        System.out.println("All Producers done...!");
        for (int i = 0; i < producer.length; i++) {
            producer[i].close();
        }
        service.shutdownNow();
        System.out.println("All done...!");
    }

    static class MyProducer implements Runnable {
        Producer[] producer;
        long maxloops;
        String msg;
        String topic;

        MyProducer(Producer[] list, long maxloops, String msg, String topic) {
            this.producer = list;
            this.maxloops = maxloops;
            this.msg = msg;
            this.topic = topic;
        }

        public void run() {
            ProducerRecord record = new ProducerRecord(topic, msg.getBytes());
            Callback callBack = new MyCallback();
            try {
                for (long j = 0; j < maxloops; j++) {
                    try {
                        for (int i = 0; i < producer.length; i++) {
                            producer[i].send(record, callBack);
                        }
                        Thread.sleep(10);
                    } catch (Throwable th) {
                        System.err.println("FATAL ");
                        th.printStackTrace();
                    }
                }
            } finally {
                latch.countDown();
            }
        }
    }

    static class MyCallback implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.err.println("Msg dropped..!");
                exception.printStackTrace();
            }
        }
    }
}
{code}

{code:title=kafkaproducer.properties}
# THIS IS FOR NEW PRODUCERS API TRUNK. Please see the configuration at
# https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
# Data Acks
acks=1
# 64MB of buffer for log lines (including all messages).
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169415#comment-14169415 ] Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:09 PM: - {code} import java.io.IOException; import java.io.InputStream; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class TestNetworkDownProducer { static int numberTh = 200; static CountDownLatch latch = new CountDownLatch(200); public static void main(String[] args) throws IOException, InterruptedException { Properties prop = new Properties(); InputStream propFile = Thread.currentThread().getContextClassLoader() .getResourceAsStream(kafkaproducer.properties); String topic = test; prop.load(propFile); System.out.println(Property: + prop.toString()); StringBuilder builder = new StringBuilder(1024); int msgLenth = 256; for (int i = 0; i msgLenth; i++) builder.append(a); int numberOfProducer = 4; Producer[] producer = new Producer[numberOfProducer]; for (int i = 0; i producer.length; i++) { producer[i] = new KafkaProducer(prop); } ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueueRunnable(numberTh *2)); for(int i = 0 ; i numberTh;i++){ service.execute(new MyProducer(producer,10,builder.toString(), topic)); } latch.await(); System.out.println(All Producers done...!); for (int i = 0; i producer.length; i++) { producer[i].close(); } service.shutdownNow(); System.out.println(All done...!); } static class 
MyProducer implements Runnable { Producer[] producer; long maxloops; String msg ; String topic; MyProducer(Producer[] list, long maxloops,String msg,String topic){ this.producer = list; this.maxloops = maxloops; this.msg = msg; this.topic = topic; } public void run() { ProducerRecord record = new ProducerRecord(topic, msg.toString().getBytes()); Callback callBack = new MyCallback(); try{ for(long j=0 ; j maxloops ; j++){ try { for (int i = 0; i producer.length; i++) { producer[i].send(record, callBack); } Thread.sleep(10); } catch (Throwable th) { System.err.println(FATAL ); th.printStackTrace(); } } }finally { latch.countDown(); } } } static class MyCallback implements Callback { public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception != null){ System.err.println(Msg dropped..!); exception.printStackTrace(); } } } } {code} Property File {code } # THIS IS FOR NEW PRODUCERS API TRUNK Please see the configuration at https://kafka.apache.org/documentation.html#newproducerconfigs # Broker List bootstrap.servers= BROKERS HERE... #Data Acks acks=1 # 64MB of Buffer for log lines (including all messages).
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169415#comment-14169415 ] Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:09 PM: - {code} import java.io.IOException; import java.io.InputStream; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class TestNetworkDownProducer { static int numberTh = 200; static CountDownLatch latch = new CountDownLatch(200); public static void main(String[] args) throws IOException, InterruptedException { Properties prop = new Properties(); InputStream propFile = Thread.currentThread().getContextClassLoader() .getResourceAsStream(kafkaproducer.properties); String topic = test; prop.load(propFile); System.out.println(Property: + prop.toString()); StringBuilder builder = new StringBuilder(1024); int msgLenth = 256; for (int i = 0; i msgLenth; i++) builder.append(a); int numberOfProducer = 4; Producer[] producer = new Producer[numberOfProducer]; for (int i = 0; i producer.length; i++) { producer[i] = new KafkaProducer(prop); } ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueueRunnable(numberTh *2)); for(int i = 0 ; i numberTh;i++){ service.execute(new MyProducer(producer,10,builder.toString(), topic)); } latch.await(); System.out.println(All Producers done...!); for (int i = 0; i producer.length; i++) { producer[i].close(); } service.shutdownNow(); System.out.println(All done...!); } static class 
MyProducer implements Runnable { Producer[] producer; long maxloops; String msg ; String topic; MyProducer(Producer[] list, long maxloops,String msg,String topic){ this.producer = list; this.maxloops = maxloops; this.msg = msg; this.topic = topic; } public void run() { ProducerRecord record = new ProducerRecord(topic, msg.toString().getBytes()); Callback callBack = new MyCallback(); try{ for(long j=0 ; j maxloops ; j++){ try { for (int i = 0; i producer.length; i++) { producer[i].send(record, callBack); } Thread.sleep(10); } catch (Throwable th) { System.err.println(FATAL ); th.printStackTrace(); } } }finally { latch.countDown(); } } } static class MyCallback implements Callback { public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception != null){ System.err.println(Msg dropped..!); exception.printStackTrace(); } } } } {code} {code } # THIS IS FOR NEW PRODUCERS API TRUNK Please see the configuration at https://kafka.apache.org/documentation.html#newproducerconfigs # Broker List bootstrap.servers= BROKERS HERE... #Data Acks acks=1 # 64MB of Buffer for log lines (including all messages).
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169415#comment-14169415 ] Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:10 PM: - {code} import java.io.IOException; import java.io.InputStream; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class TestNetworkDownProducer { static int numberTh = 200; static CountDownLatch latch = new CountDownLatch(200); public static void main(String[] args) throws IOException, InterruptedException { Properties prop = new Properties(); InputStream propFile = Thread.currentThread().getContextClassLoader() .getResourceAsStream(kafkaproducer.properties); String topic = test; prop.load(propFile); System.out.println(Property: + prop.toString()); StringBuilder builder = new StringBuilder(1024); int msgLenth = 256; for (int i = 0; i msgLenth; i++) builder.append(a); int numberOfProducer = 4; Producer[] producer = new Producer[numberOfProducer]; for (int i = 0; i producer.length; i++) { producer[i] = new KafkaProducer(prop); } ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueueRunnable(numberTh *2)); for(int i = 0 ; i numberTh;i++){ service.execute(new MyProducer(producer,10,builder.toString(), topic)); } latch.await(); System.out.println(All Producers done...!); for (int i = 0; i producer.length; i++) { producer[i].close(); } service.shutdownNow(); System.out.println(All done...!); } static class 
MyProducer implements Runnable { Producer[] producer; long maxloops; String msg ; String topic; MyProducer(Producer[] list, long maxloops,String msg,String topic){ this.producer = list; this.maxloops = maxloops; this.msg = msg; this.topic = topic; } public void run() { ProducerRecord record = new ProducerRecord(topic, msg.toString().getBytes()); Callback callBack = new MyCallback(); try{ for(long j=0 ; j maxloops ; j++){ try { for (int i = 0; i producer.length; i++) { producer[i].send(record, callBack); } Thread.sleep(10); } catch (Throwable th) { System.err.println(FATAL ); th.printStackTrace(); } } }finally { latch.countDown(); } } } static class MyCallback implements Callback { public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception != null){ System.err.println(Msg dropped..!); exception.printStackTrace(); } } } } {code} This is property file used: {code } # THIS IS FOR NEW PRODUCERS API TRUNK Please see the configuration at https://kafka.apache.org/documentation.html#newproducerconfigs # Broker List bootstrap.servers= BROKERS HERE... #Data Acks acks=1 # 64MB of Buffer for log lines (including all
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169415#comment-14169415 ] Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:11 PM: -
{code}
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

    static int numberTh = 200;
    static CountDownLatch latch = new CountDownLatch(200);

    public static void main(String[] args) throws IOException, InterruptedException {
        Properties prop = new Properties();
        InputStream propFile = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaproducer.properties");
        String topic = "test";
        prop.load(propFile);
        System.out.println("Property: " + prop.toString());

        StringBuilder builder = new StringBuilder(1024);
        int msgLenth = 256;
        for (int i = 0; i < msgLenth; i++)
            builder.append('a');

        int numberOfProducer = 4;
        Producer[] producer = new Producer[numberOfProducer];
        for (int i = 0; i < producer.length; i++) {
            producer[i] = new KafkaProducer(prop);
        }

        ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(numberTh * 2));
        for (int i = 0; i < numberTh; i++) {
            service.execute(new MyProducer(producer, 10, builder.toString(), topic));
        }
        latch.await();
        System.out.println("All Producers done...!");
        for (int i = 0; i < producer.length; i++) {
            producer[i].close();
        }
        service.shutdownNow();
        System.out.println("All done...!");
    }

    static class MyProducer implements Runnable {
        Producer[] producer;
        long maxloops;
        String msg;
        String topic;

        MyProducer(Producer[] list, long maxloops, String msg, String topic) {
            this.producer = list;
            this.maxloops = maxloops;
            this.msg = msg;
            this.topic = topic;
        }

        public void run() {
            ProducerRecord record = new ProducerRecord(topic, msg.toString().getBytes());
            Callback callBack = new MyCallback();
            try {
                for (long j = 0; j < maxloops; j++) {
                    try {
                        for (int i = 0; i < producer.length; i++) {
                            producer[i].send(record, callBack);
                        }
                        Thread.sleep(10);
                    } catch (Throwable th) {
                        System.err.println("FATAL ");
                        th.printStackTrace();
                    }
                }
            } finally {
                latch.countDown();
            }
        }
    }

    static class MyCallback implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.err.println("Msg dropped..!");
                exception.printStackTrace();
            }
        }
    }
}
{code}
This is the property file used:
{code}
# THIS IS FOR NEW PRODUCER API TRUNK. Please see the configuration at https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
# Data Acks
acks=1
# 64MB of Buffer for log lines (including all
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169453#comment-14169453 ] Bhavesh Mistry commented on KAFKA-1642: --- [~jkreps] Let me know if you need any other help! Thanks, Bhavesh [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Jun Rao I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O thread is very busy logging the following error message. Is this expected behavior?

2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
java.lang.IllegalStateException: No entry found for node -2
	at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
	at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
	at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
	at java.lang.Thread.run(Thread.java:744)

Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
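One common remedy for this kind of tight retry loop is exponential backoff between reconnect attempts. The sketch below is purely illustrative (an assumed policy, not the patch that resolved this ticket): instead of retrying the failed connection immediately and pegging a core, the wait doubles per attempt up to a cap.

```java
public class ReconnectBackoff {

    // Capped exponential backoff: baseMs * 2^attempt, never more than maxMs.
    // The shift amount is clamped to avoid overflowing the long.
    static long backoffMs(int attempt, long baseMs, long maxMs) {
        long wait = baseMs << Math.min(attempt, 20);
        return Math.min(wait, maxMs);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 6; attempt++) {
            // prints 50 100 200 400 800 1000
            System.out.println(backoffMs(attempt, 50, 1000));
        }
    }
}
```

A policy like this keeps the I/O thread mostly parked while the network is down, instead of spinning on `initiateConnect` at 100% CPU.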
[jira] [Commented] (KAFKA-1670) Corrupt log files for segment.bytes values close to Int.MaxInt
[ https://issues.apache.org/jira/browse/KAFKA-1670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169493#comment-14169493 ] Guozhang Wang commented on KAFKA-1670: -- Verified that system test has recovered, thanks! Corrupt log files for segment.bytes values close to Int.MaxInt -- Key: KAFKA-1670 URL: https://issues.apache.org/jira/browse/KAFKA-1670 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Assignee: Sriharsha Chintalapani Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1670.patch, KAFKA-1670.patch, KAFKA-1670_2014-10-04_20:17:46.patch, KAFKA-1670_2014-10-06_09:48:25.patch, KAFKA-1670_2014-10-07_13:39:13.patch, KAFKA-1670_2014-10-07_13:49:10.patch, KAFKA-1670_2014-10-07_18:39:31.patch The maximum value for the topic-level config {{segment.bytes}} is {{Int.MaxInt}} (2147483647). *Using this value causes brokers to corrupt their log files, leaving them unreadable.* We set {{segment.bytes}} to {{2122317824}} which is well below the maximum. One by one, the ISR of all partitions shrunk to 1. Brokers would crash when restarted, attempting to read from a negative offset in a log file. After discovering that many segment files had grown to 4GB or more, we were forced to shut down our *entire production Kafka cluster* for several hours while we split all segment files into 1GB chunks. Looking into the {{kafka.log}} code, the {{segment.bytes}} parameter is used inconsistently. It is treated as a *soft* maximum for the size of the segment file (https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/log/LogConfig.scala#L26) with logs rolled only after (https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/log/Log.scala#L246) they exceed this value. However, much of the code that deals with log files uses *ints* to store the size of the file and the position in the file. 
Overflow of these ints leads the broker to append to the segments indefinitely, and to fail to read these segments for consuming or recovery. This is trivial to reproduce:
{code}
$ bin/kafka-topics.sh --topic segment-bytes-test --create --replication-factor 2 --partitions 1 --zookeeper zkhost:2181
$ bin/kafka-topics.sh --topic segment-bytes-test --alter --config segment.bytes=2147483647 --zookeeper zkhost:2181
$ yes "Int.MaxValue is a ridiculous bound on file size in 2014" | bin/kafka-console-producer.sh --broker-list localhost:6667 zkhost:2181 --topic segment-bytes-test
{code}
After running for a few minutes, the log file is corrupt:
{code}
$ ls -lh data/segment-bytes-test-0/
total 9.7G
-rw-r--r-- 1 root root  10M Oct  3 19:39 .index
-rw-r--r-- 1 root root 9.7G Oct  3 19:39 .log
{code}
We recovered the data from the log files using a simple Python script: https://gist.github.com/also/9f823d9eb9dc0a410796 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
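The int overflow at the heart of this report can be demonstrated in isolation. The sketch below (illustrative only; hypothetical names, not the actual kafka.log code) tracks a file position in an int the way parts of the 0.8.1.1 log code do, and shows it wrapping negative once the segment passes Int.MaxValue bytes:

```java
public class SegmentOverflowDemo {

    // Simulates advancing a log segment's write position stored in an int.
    // Past Integer.MAX_VALUE the addition silently wraps negative, so
    // subsequent reads and recovery compute nonsense offsets.
    static int advance(int position, int messageSize) {
        return position + messageSize; // no overflow check
    }

    public static void main(String[] args) {
        int position = Integer.MAX_VALUE - 10; // just under the 2 GB boundary
        position = advance(position, 100);     // append one more message
        System.out.println(position);          // negative: the position wrapped
    }
}
```

Storing positions as longs (or hard-capping `segment.bytes` safely below Int.MaxValue) avoids the wrap.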
[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option
[ https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169616#comment-14169616 ] James Oliver commented on KAFKA-1493: - Sure, I'll take a look at it now. Use a well-documented LZ4 compression format and remove redundant LZ4HC option -- Key: KAFKA-1493 URL: https://issues.apache.org/jira/browse/KAFKA-1493 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2 Reporter: James Oliver Assignee: Ivan Lyutov Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1493.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
Hi Kafka Dev Team,

When I run the test sending messages to a single partition for 3 minutes or so, I encounter a deadlock (please see the attached screenshot) and thread contention in YourKit profiling.

Use Case:
1) Aggregating messages into the same partition for metric counting.
2) Replicating the old producer behavior of sticking to a partition for 3 minutes.

Here is the output:

Frozen threads found (potential deadlock)

It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung.

pool-1-thread-128 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744

pool-1-thread-159 --- Frozen for at least 2m 1 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744

pool-1-thread-55 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
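The frozen threads above are all parked inside RecordAccumulator.append. A minimal standard-library analogue (an illustration of the failure mode, not the producer's actual buffer code) shows why: the accumulator behaves like a bounded buffer, and once the sender stops draining it (e.g. the network is down), every appending thread blocks.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class AccumulatorBlockDemo {

    // Bounded-buffer append with a timeout; returns false instead of
    // blocking forever when the buffer is full and nothing drains it.
    static boolean tryAppend(BlockingQueue<byte[]> buffer, byte[] record,
                             long timeoutMs) throws InterruptedException {
        return buffer.offer(record, timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(2); // tiny buffer, no consumer
        byte[] record = new byte[256];
        System.out.println(tryAppend(buffer, record, 10)); // accepted
        System.out.println(tryAppend(buffer, record, 10)); // accepted, buffer now full
        System.out.println(tryAppend(buffer, record, 10)); // rejected: a blocking put() here would hang
    }
}
```

With a plain blocking `put()` (the analogue of an append with no timeout), the third call would park the thread indefinitely, which matches the YourKit snapshot.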
[jira] [Commented] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation
[ https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169648#comment-14169648 ] Joel Koshy commented on KAFKA-1634: --- [~junrao] yes you are right. The OffsetFetchResponse returns offset, metadata and error - it does not include the timestamp. Improve semantics of timestamp in OffsetCommitRequests and update documentation --- Key: KAFKA-1634 URL: https://issues.apache.org/jira/browse/KAFKA-1634 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Assignee: Joel Koshy Priority: Blocker Fix For: 0.8.2 From the mailing list - following up on this -- I think the online API docs for OffsetCommitRequest still incorrectly refer to client-side timestamps: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest Wasn't that removed and now always handled server-side now? Would one of the devs mind updating the API spec wiki? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSSION] Message Metadata
I need to take more time to think about this. Here are a few off-the-cuff remarks:

- To date we have tried really, really hard to keep the data model for messages simple, since after all you can always add whatever you like inside the message body.

- For system tags, why not just make these fields first-class fields in the message? Why have a bunch of key-value pairs versus first-class fields?

- You don't necessarily need application-level tags explicitly represented in the message format for efficiency. The application can define its own header (e.g. its message could be a size-delimited header followed by a size-delimited body). But actually, if you use Avro, I don't think you even need this. Avro has the ability to deserialize just the header fields of your message: it has a notion of reader and writer schemas. The writer schema is whatever the message was written with; if the reader schema is just the header, Avro will skip any fields it doesn't need and deserialize only the fields it does need. This is actually a much more usable and flexible way to define a header, since you get all the types Avro allows instead of just bytes.

- We will need to think carefully about what to do with timestamps if we end up including them. There are actually several timestamps:
  - The time the producer created the message
  - The time the leader received the message
  - The time the current broker received the message

The producer timestamps won't be at all increasing. The leader timestamp will be mostly increasing, except when the clock changes or leadership moves. This somewhat complicates the use of these timestamps, though. From the point of view of the producer, the only time that matters is the time the message was created. However, since the producer sets this, it can be arbitrarily bad (remember all the NTP issues and 1970 timestamps we would get).

Say the heuristic were to use the timestamp of the first message in a file for retention; the problem would be that the timestamps for the segments need not even be sequential, and a single bad producer could send data with a time in the distant past or future, causing data to be deleted or retained forever. Using the broker timestamp at write time is better, though obviously it would be overwritten when data is mirrored between clusters (the mirror would then have a different time--and if mirroring ever stopped, that gap could be large). One approach would be to use the client timestamp but have the broker overwrite it if it is too bad (e.g. off by more than a minute, say).

-Jay

On Fri, Oct 10, 2014 at 11:21 PM, Joel Koshy jjkosh...@gmail.com wrote:

Thanks Guozhang! This is an excellent write-up and the approach nicely consolidates a number of long-standing issues. It would be great if everyone can review this carefully and give feedback. Also, wrt discussion in the past we have used a mix of wiki comments and the mailing list. Personally, I think it is better to discuss on the mailing list (for more visibility) and just post a bold link to the (archived) mailing list thread on the wiki.

Joel

On Fri, Oct 10, 2014 at 05:33:52PM -0700, Guozhang Wang wrote:

Hello all, I put some thoughts on enhancing our current message metadata format to solve a bunch of existing issues: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata This wiki page is for kicking off some discussions about the feasibility of adding more info into the message header, and if possible how we would add them. -- Guozhang
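Jay's last suggestion, using the client timestamp but letting the broker overwrite it when it is too far off, can be sketched as follows. This is a hypothetical illustration of the heuristic from the email (the names and the one-minute threshold are assumptions, not committed Kafka behavior):

```java
public class TimestampClamp {

    static final long MAX_SKEW_MS = 60_000L; // "off by more than a minute, say"

    // Trust the producer's timestamp unless it differs from the broker's
    // clock by more than the allowed skew; otherwise substitute broker time.
    static long effectiveTimestamp(long clientTs, long brokerTs) {
        return Math.abs(clientTs - brokerTs) > MAX_SKEW_MS ? brokerTs : clientTs;
    }

    public static void main(String[] args) {
        long broker = 1_413_216_000_000L;                          // broker clock (ms)
        System.out.println(effectiveTimestamp(broker - 5_000, broker)); // small skew: client time kept
        System.out.println(effectiveTimestamp(0L, broker));             // "1970" producer: clamped to broker time
    }
}
```

This keeps segment timestamps roughly monotonic for retention while still honoring well-behaved producer clocks.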
Re: Review Request 26633: Patch for KAFKA-1305
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26633/#review56427 --- Ship it! core/src/main/scala/kafka/server/KafkaConfig.scala https://reviews.apache.org/r/26633/#comment96710 this looks good now. - Neha Narkhede On Oct. 13, 2014, 2:30 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26633/ --- (Updated Oct. 13, 2014, 2:30 p.m.) Review request for kafka. Bugs: KAFKA-1305 https://issues.apache.org/jira/browse/KAFKA-1305 Repository: kafka Description --- KAFKA-1305. Controller can hang on controlled shutdown with auto leader balance enabled. Diffs - core/src/main/scala/kafka/server/KafkaConfig.scala 90af698b01ec82b6168e02b6af41887ef164ad51 Diff: https://reviews.apache.org/r/26633/diff/ Testing --- Thanks, Sriharsha Chintalapani
[jira] [Updated] (KAFKA-1631) ReplicationFactor and under-replicated partitions incorrect during reassignment
[ https://issues.apache.org/jira/browse/KAFKA-1631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1631: - Assignee: Ewen Cheslack-Postava ReplicationFactor and under-replicated partitions incorrect during reassignment --- Key: KAFKA-1631 URL: https://issues.apache.org/jira/browse/KAFKA-1631 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-1631-v1.patch We have a topic with a replication factor of 3. We monitor UnderReplicatedPartitions as recommended by the documentation. During a partition reassignment, partitions being reassigned are reported as under-replicated. Running a describe shows:
{code}
Topic:activity-wal-1	PartitionCount:15	ReplicationFactor:5	Configs:
	Topic: activity-wal-1	Partition: 0	Leader: 14	Replicas: 14,13,12,11,15	Isr: 14,12,11,13
	Topic: activity-wal-1	Partition: 1	Leader: 14	Replicas: 15,14,11	Isr: 14,11
	Topic: activity-wal-1	Partition: 2	Leader: 11	Replicas: 11,15,12	Isr: 12,11,15
	...
{code}
It looks like the displayed replication factor, 5, is simply the number of replicas listed for the first partition, which includes both brokers in the current list and those onto which the partition is being reassigned. Partition 0 is also included in the list when using the `--under-replicated-partitions` option, even though it is replicated to more brokers than the true replication factor. During a reassignment, the under-replicated partitions metric is not usable, meaning that actual under-replicated partitions can go unnoticed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
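The mis-reported figure can be reproduced in a few lines. This is a hypothetical sketch of the derivation (not the actual TopicCommand code): the displayed ReplicationFactor is simply the replica-list size of the first partition, which during reassignment holds the union of the old and new assignments.

```java
import java.util.Arrays;
import java.util.List;

public class ReplicationFactorDemo {

    // Naive derivation: take the replica count of the first partition as
    // the topic's replication factor. Mid-reassignment this over-counts.
    static int reportedReplicationFactor(List<List<Integer>> assignments) {
        return assignments.get(0).size();
    }

    public static void main(String[] args) {
        List<List<Integer>> assignments = Arrays.asList(
            Arrays.asList(14, 13, 12, 11, 15), // partition 0: old + new replicas during reassignment
            Arrays.asList(15, 14, 11),         // partition 1: true factor is 3
            Arrays.asList(11, 15, 12));        // partition 2: true factor is 3
        System.out.println(reportedReplicationFactor(assignments)); // 5, not 3
    }
}
```

Comparing each partition's ISR size against this inflated figure is what makes partition 0 show up as "under-replicated" even though it has more in-sync replicas than the configured factor.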
[jira] [Updated] (KAFKA-1196) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33
[ https://issues.apache.org/jira/browse/KAFKA-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1196: - Assignee: (was: Neha Narkhede) java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33 --- Key: KAFKA-1196 URL: https://issues.apache.org/jira/browse/KAFKA-1196 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.0 Environment: running java 1.7, linux and kafka compiled against scala 2.9.2 Reporter: Gerrit Jansen van Vuuren Priority: Blocker Labels: newbie Fix For: 0.9.0 I have 6 topics, each with 8 partitions, spread over 4 kafka servers. The servers are 24 core, 72 GB RAM. While consuming from the topics I get an IllegalArgumentException and all consumption stops; the error keeps on throwing. I've tracked it down to FetchResponse.scala line 33. The error happens when the FetchResponsePartitionData object's readFrom method calls: messageSetBuffer.limit(messageSetSize). I put in some debug code: the messageSetSize is 671758648, while buffer.capacity() gives 155733313; for some reason the buffer is smaller than the required message size. I don't know the consumer code well enough to debug this. It doesn't matter if compression is used or not. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
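The failure at FetchResponse.scala line 33 follows directly from the ByteBuffer contract: `limit(int)` rejects any value larger than the buffer's capacity with an IllegalArgumentException. A minimal reproduction, independent of Kafka:

```java
import java.nio.ByteBuffer;

public class LimitBeyondCapacity {

    // Returns true if setting the requested limit on a buffer of the
    // given capacity throws, which is exactly the consumer's failure:
    // messageSetSize (671758648) exceeded buffer.capacity() (155733313).
    static boolean limitThrows(int capacity, int requestedLimit) {
        ByteBuffer buf = ByteBuffer.allocate(capacity);
        try {
            buf.limit(requestedLimit);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(limitThrows(1024, 4096)); // true: limit > capacity
        System.out.println(limitThrows(4096, 1024)); // false: limit within capacity
    }
}
```

So the bug is not in `limit()` itself but upstream: the fetch size read off the wire is larger than the buffer the consumer allocated for the response.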
[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169717#comment-14169717 ] Jay Kreps commented on KAFKA-1646: -- Hey [~xueqiang] what is the latency impact of that preallocation for a large file? I.e. if your segment size is 1GB, will there be a long pause when the new log segment is rolled? How long does that take? Improve consumer read performance for Windows - Key: KAFKA-1646 URL: https://issues.apache.org/jira/browse/KAFKA-1646 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1 Environment: Windows Reporter: xueqiang wang Labels: newbie, patch Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch This patch is for the Windows platform only. On Windows, if there is more than one replica writing to disk, the segment log files will not be laid out contiguously on disk, and consumer read performance drops greatly. This fix allocates more disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. This patch doesn't affect file allocation on other filesystems, since it only adds statements like 'if(Os.iswindow)' or adds methods used on Windows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
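The preallocation idea can be sketched with the standard library (an illustration of the approach, not the attached patch): grow the segment file to its full target size when it is rolled, so the filesystem can reserve the space up front instead of extending the file write by write.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public class PreallocateDemo {

    // Extend the file to the target segment size in one call; on NTFS
    // this lets the filesystem reserve the extent up front, reducing
    // fragmentation when several replicas write concurrently.
    static long preallocate(File f, long bytes) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
            raf.setLength(bytes);
            return raf.length();
        }
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("segment", ".log");
        f.deleteOnExit();
        System.out.println(preallocate(f, 1 << 20)); // file is now 1 MiB
    }
}
```

Jay's latency question above is about exactly this call: for a 1 GB segment, the cost of `setLength` (and whether the OS zero-fills eagerly) determines the pause at roll time.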
[jira] [Updated] (KAFKA-1701) Improve controller and broker message handling.
[ https://issues.apache.org/jira/browse/KAFKA-1701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-1701: Description: This ticket is a memo for future controller refactoring. It is related to KAFKA-1647. Ideally, the broker should only follow instructions from the controller rather than handle them smartly. For KAFKA-1647, the controller should filter out the partitions whose leader is not up yet before sending LeaderAndIsrRequest to the broker. The idea is that the controller should handle all the edge cases instead of letting the broker do it. was: This ticket is a memo for future controller refactoring. It is related to KAFKA-1547. Ideally, the broker should only follow instructions from the controller rather than handle them smartly. For KAFKA-1547, the controller should filter out the partitions whose leader is not up yet before sending LeaderAndIsrRequest to the broker. The idea is that the controller should handle all the edge cases instead of letting the broker do it. Improve controller and broker message handling. --- Key: KAFKA-1701 URL: https://issues.apache.org/jira/browse/KAFKA-1701 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin This ticket is a memo for future controller refactoring. It is related to KAFKA-1647. Ideally, the broker should only follow instructions from the controller rather than handle them smartly. For KAFKA-1647, the controller should filter out the partitions whose leader is not up yet before sending LeaderAndIsrRequest to the broker. The idea is that the controller should handle all the edge cases instead of letting the broker do it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Jenkins build is back to normal : Kafka-trunk #300
See https://builds.apache.org/job/Kafka-trunk/300/changes
[jira] [Updated] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled
[ https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1305: - Resolution: Fixed Status: Resolved (was: Patch Available) Controller can hang on controlled shutdown with auto leader balance enabled --- Key: KAFKA-1305 URL: https://issues.apache.org/jira/browse/KAFKA-1305 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Sriharsha Chintalapani Priority: Blocker Fix For: 0.8.2, 0.9.0 Attachments: KAFKA-1305.patch, KAFKA-1305.patch, KAFKA-1305_2014-10-13_07:30:45.patch This is relatively easy to reproduce especially when doing a rolling bounce. What happened here is as follows: 1. The previous controller was bounced and broker 265 became the new controller. 2. I went on to do a controlled shutdown of broker 265 (the new controller). 3. In the mean time the automatically scheduled preferred replica leader election process started doing its thing and starts sending LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers). (t@113 below). 4. While that's happening, the controlled shutdown process on 265 succeeds and proceeds to deregister itself from ZooKeeper and shuts down the socket server. 5. (ReplicaStateMachine actually removes deregistered brokers from the controller channel manager's list of brokers to send requests to. However, that removal cannot take place (t@18 below) because preferred replica leader election task owns the controller lock.) 6. So the request thread to broker 265 gets into infinite retries. 7. The entire broker shutdown process is blocked on controller shutdown for the same reason (it needs to acquire the controller lock). 
Relevant portions from the thread-dump:

Controller-265-to-broker-265-send-thread - Thread t@113
  java.lang.Thread.State: TIMED_WAITING
	at java.lang.Thread.sleep(Native Method)
	at kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
	at kafka.utils.Utils$.swallow(Utils.scala:167)
	at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
	at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
	at kafka.utils.Logging$class.swallow(Logging.scala:94)
	at kafka.utils.Utils$.swallow(Utils.scala:46)
	at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
	at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
	- locked java.lang.Object@6dbf14a7
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

  Locked ownable synchronizers:
	- None

...

Thread-4 - Thread t@17
  java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
	at sun.misc.Unsafe.park(Native Method)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
	at kafka.utils.Utils$.inLock(Utils.scala:536)
	at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
	at kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
	at kafka.utils.Utils$.swallow(Utils.scala:167)
	at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
	at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
	at kafka.utils.Logging$class.swallow(Logging.scala:94)
	at kafka.utils.Utils$.swallow(Utils.scala:46)
	at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
	at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
	at kafka.Kafka$$anon$1.run(Kafka.scala:42)

...

kafka-scheduler-0 - Thread t@117
  java.lang.Thread.State: WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
	at sun.misc.Unsafe.park(Native Method)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
	at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
	at
[jira] [Comment Edited] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option
[ https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169616#comment-14169616 ] James Oliver edited comment on KAFKA-1493 at 10/13/14 7:41 PM: --- The I/O streams provided in the patch are nearly identical to the I/O streams in lz4-java. As it stands, I don't think it buys us much. Other than that, looks good; just a few spots still refer to lz4hc (I think KAFKA-1471 changes were applied to trunk after this patch was created). This details the formal specification: http://fastcompression.blogspot.com/2013/04/lz4-streaming-format-final.html Here's an example C++ implementation of that format: https://github.com/t-mat/lz4mt/blob/master/src/lz4mt.cpp We still need a Java implementation. was (Author: joliver): Sure, I'll take a look at it now. Use a well-documented LZ4 compression format and remove redundant LZ4HC option -- Key: KAFKA-1493 URL: https://issues.apache.org/jira/browse/KAFKA-1493 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2 Reporter: James Oliver Assignee: Ivan Lyutov Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1493.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 26658: Patch for KAFKA-1493
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26658/ --- Review request for kafka. Bugs: KAFKA-1493 https://issues.apache.org/jira/browse/KAFKA-1493 Repository: kafka Description --- KAFKA-1493 Diffs - clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java bf4ed66791b9a502aae6cb2ec7681f42732d9a43 clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 5227b2d7ab803389d1794f48c8232350c05b14fd clients/src/main/java/org/apache/kafka/common/record/Compressor.java 0323f5f7032dceb49d820c17a41b78c56591ffc4 config/producer.properties 39d65d7c6c21f4fccd7af89be6ca12a088d5dd98 core/src/main/scala/kafka/message/CompressionCodec.scala de0a0fade5387db63299c6b112b3c9a5e41d82ec core/src/main/scala/kafka/message/CompressionFactory.scala 8420e13d0d8680648df78f22ada4a0d4e3ab8758 core/src/main/scala/kafka/tools/ConsoleProducer.scala b024a693c23cb21f1efe405ed414bf23f3974f31 core/src/main/scala/kafka/tools/PerfConfig.scala c72002976d90416559090a665f6494072a6c2dec core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala c95485170fd8b4f5faad740f049e5d09aca8829d core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala 6f0addcea64f1e78a4de50ec8135f4d02cebd305 core/src/test/scala/unit/kafka/message/MessageTest.scala 958c1a60069ad85ae20f5c58e74679cd9fa6f70e Diff: https://reviews.apache.org/r/26658/diff/ Testing --- Thanks, James Oliver
[jira] [Updated] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option
[ https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] James Oliver updated KAFKA-1493: Attachment: KAFKA-1493.patch
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169987#comment-14169987 ] Guozhang Wang commented on KAFKA-1555: -- One question I have while trying to rebase KAFKA-1583 on KAFKA-1555: for the NotEnoughReplicasAfterAppend exception, what should be the behavior of the client? Currently the producer client would retry on all exceptions, but would that be ideal for NotEnoughReplicasAfterAppend, since the message has already been appended and retrying will guarantee duplicates?
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170002#comment-14170002 ] Guozhang Wang commented on KAFKA-1555: -- Another thought about the min.isr config: we put it into the log configs since we do not have partition configs besides the global server configs and log configs, and by doing so we are required to pass the requiredAcks from replica manager to partition, then to log in order to have it checked against the min isr. It may be better to add the partition configs and move min.isr into that config set instead of making it part of the log configs. Thoughts?
[jira] [Commented] (KAFKA-1493) Use a well-documented LZ4 compression format and remove redundant LZ4HC option
[ https://issues.apache.org/jira/browse/KAFKA-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170071#comment-14170071 ] Jun Rao commented on KAFKA-1493: James, Thanks for the patch. A couple more questions. 1. The following header frame used in the patch doesn't seem to match exactly what's described in http://fastcompression.blogspot.com/2013/04/lz4-streaming-format-final.html. So, we are inventing our own header? Is that ok?

/*
 * Message format:
 * HEADER which consists of:
 * 1) magic byte sequence (8 bytes)
 * 2) compression method token (1 byte)
 * 3) compressed length (4 bytes)
 * 4) original message length (4 bytes)
 * and compressed message itself
 * Block size: 64 Kb
 */

2. If the io stream code in this patch is identical to that in lz4-java, could we just use lz4-java instead? Thanks,
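The header layout quoted in the comment above can be sketched as a 17-byte fixed prefix. This is a sketch of the described layout only: the MAGIC value, byte order, and class name here are illustrative assumptions, not the values used by the actual patch.

```java
import java.nio.ByteBuffer;

// Sketch of the 17-byte custom header described above: 8-byte magic
// sequence, 1-byte compression-method token, 4-byte compressed length,
// 4-byte original message length, followed by the compressed payload in
// 64 KB blocks. MAGIC is a hypothetical placeholder value.
public class Lz4BlockHeader {
    static final byte[] MAGIC = {'K', 'A', 'F', 'L', 'Z', '4', 'B', 'H'}; // hypothetical
    static final int HEADER_SIZE = 8 + 1 + 4 + 4;

    static ByteBuffer write(byte method, int compressedLen, int originalLen) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE);
        buf.put(MAGIC);            // 1) magic byte sequence (8 bytes)
        buf.put(method);           // 2) compression method token (1 byte)
        buf.putInt(compressedLen); // 3) compressed length (4 bytes)
        buf.putInt(originalLen);   // 4) original message length (4 bytes)
        buf.flip();
        return buf;
    }
}
```

For comparison, the LZ4 Frame format referenced in Jun's link starts with the magic number 0x184D2204 and a variable-length frame descriptor with flag bytes, which is why a fixed custom header diverges from the spec.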
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170095#comment-14170095 ] Jun Rao commented on KAFKA-1555: When a client receives a NotEnoughReplicasAfterAppendException, it can choose to stop producing or wait and then retry. So, we can probably make NotEnoughReplicasAfterAppend a non-retriable exception and let the client decide what to do. All the topic configs are essentially per partition. However, some of those configs are applicable to the log and some others are only applicable to the logical partition. So, we can probably split topic configs into a logConfig and a partitionConfig and pass them to Log and Partition, respectively. This can be handled in a separate jira though.
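The min.isr support this ticket adds closes case 3 of the analysis: with acks=-1 plus a minimum ISR size of 2, the broker rejects a produce request when the ISR has shrunk to a single replica, so an acknowledged message always exists on at least two brokers. A sketch of the relevant settings, assuming the min.insync.replicas property name used in released Kafka versions (values here are illustrative for a 3-broker cluster):

```properties
# Topic-level (or broker default): reject an acks=-1 produce request
# with NotEnoughReplicas when fewer than 2 replicas are in sync.
min.insync.replicas=2

# Producer-side (0.8.x): wait for acknowledgement from all in-sync replicas.
request.required.acks=-1
```

This trades availability for durability: with two of three brokers down, writes block (requirement 2 of the issue) instead of being acknowledged by a lone survivor.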
[jira] [Updated] (KAFKA-979) Add jitter for time based rolling
[ https://issues.apache.org/jira/browse/KAFKA-979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-979: Assignee: Ewen Cheslack-Postava (was: Sriram Subramanian) Status: Patch Available (was: Open) Add jitter for time based rolling - Key: KAFKA-979 URL: https://issues.apache.org/jira/browse/KAFKA-979 Project: Kafka Issue Type: Bug Reporter: Sriram Subramanian Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-979.patch Currently, for low volume topics time based rolling happens at the same time. This causes a lot of IO on a typical cluster and creates back pressure. We need to add a jitter to prevent them from happening at the same time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-979) Add jitter for time based rolling
[ https://issues.apache.org/jira/browse/KAFKA-979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-979: Attachment: KAFKA-979.patch
Review Request 26663: Patch for KAFKA-979
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26663/ --- Review request for kafka. Bugs: KAFKA-979 https://issues.apache.org/jira/browse/KAFKA-979 Repository: kafka Description --- KAFKA-979 Add optional random jitter for time based log rolling. Diffs - core/src/main/scala/kafka/log/Log.scala a123cdc52f341a802b3e4bfeb29a6154332e5f73 core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff core/src/main/scala/kafka/log/LogConfig.scala d2cc9e3d6b7a4fd24516d164eb3673e6ce052129 core/src/main/scala/kafka/log/LogSegment.scala 7597d309f37a0b3756381f9500100ef763d466ba core/src/main/scala/kafka/server/KafkaConfig.scala 7fcbc16da898623b03659c803e2a20c7d1bd1011 core/src/main/scala/kafka/server/KafkaServer.scala 3e9e91f2b456bbdeb3055d571e18ffea8675b4bf core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 7b97e6a80753a770ac094e101c653193dec67e68 core/src/test/scala/unit/kafka/log/LogTest.scala a0cbd3bbbeeabae12caa6b41aec31a8f5dfd034b Diff: https://reviews.apache.org/r/26663/diff/ Testing --- Thanks, Ewen Cheslack-Postava
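The jitter idea in the review above can be sketched as follows. This is a sketch of the technique, not the patch itself, and the names are hypothetical: each segment draws a fixed random jitter at creation time and rolls that much earlier than the configured interval, so low-volume segments created at the same moment stop rolling in lock-step.

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch of time-based log rolling with per-segment jitter (names
// hypothetical). The jitter is drawn once per segment and never exceeds
// the roll interval itself.
public class JitteredRoll {
    final long segmentMs;
    final long rollJitterMs; // fixed for the life of the segment

    JitteredRoll(long segmentMs, long maxJitterMs) {
        this.segmentMs = segmentMs;
        long bound = Math.min(maxJitterMs, segmentMs);
        this.rollJitterMs = bound <= 0 ? 0
                : ThreadLocalRandom.current().nextLong(bound);
    }

    // Roll slightly early: segments created together get different
    // effective intervals, spreading the roll (and its IO) over time.
    boolean shouldRoll(long createdMs, long nowMs) {
        return nowMs - createdMs > segmentMs - rollJitterMs;
    }
}
```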
[jira] [Commented] (KAFKA-979) Add jitter for time based rolling
[ https://issues.apache.org/jira/browse/KAFKA-979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170196#comment-14170196 ] Ewen Cheslack-Postava commented on KAFKA-979: - Created reviewboard https://reviews.apache.org/r/26663/diff/ against branch origin/trunk
[jira] [Updated] (KAFKA-1702) Messages silently Lost by producer
[ https://issues.apache.org/jira/browse/KAFKA-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1702: --- Resolution: Fixed Fix Version/s: 0.8.3 Assignee: Alexis Midon (was: Jun Rao) Status: Resolved (was: Patch Available) Got it. +1 for the patch. Committed to trunk. Messages silently Lost by producer -- Key: KAFKA-1702 URL: https://issues.apache.org/jira/browse/KAFKA-1702 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.1.1 Reporter: Alexis Midon Assignee: Alexis Midon Fix For: 0.8.3 Attachments: KAFKA-1702.0.patch Hello, we lost millions of messages because of this {{try/catch}} in the producer {{DefaultEventHandler}}: https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala#L114-L116 If a Throwable is caught by this {{try/catch}}, the retry policy will have no effect and all yet-to-be-sent messages are lost (the error will break the loop over the broker list). This issue is very hard to detect because the producer (async or sync) cannot even catch the error, and *all* the metrics are updated as if everything was fine. Only an abnormal drop in the producer's network I/O or in the incoming message rate on the brokers, or alerting on errors in producer logs, could have revealed the issue. This behavior was introduced by KAFKA-300. I can't see a good reason for it, so here is a patch that will let the retry policy do its job when such a {{Throwable}} occurs. Thanks in advance for your help. Alexis ps: you might wonder how this {{try/catch}} could ever catch something? {{DefaultEventHandler#groupMessagesToSet}} looks so harmless. Here are the details: We use Snappy compression.
When the native snappy library is not installed on the host, Snappy, during the initialization of class {{org.xerial.snappy.Snappy}} will [write a C library|https://github.com/xerial/snappy-java/blob/1.1.0/src/main/java/org/xerial/snappy/SnappyLoader.java#L312] in the JVM temp directory {{java.io.tmpdir}}. In our scenario, {{java.io.tmpdir}} was a subdirectory of {{/tmp}}. After an instance reboot (thank you [AWS|https://twitter.com/hashtag/AWSReboot?src=hash]!), the JVM temp directory was removed. The JVM was then running with a non-existing temp dir. Snappy class would be impossible to initialize and the following message would be silently logged: {code} ERROR [2014-10-07 22:23:56,530] kafka.producer.async.DefaultEventHandler: Failed to send messages ! java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
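The failure mode described above, a JVM left running after its {{java.io.tmpdir}} was deleted, can be caught early with a startup sanity check instead of surfacing later as an opaque NoClassDefFoundError from the Snappy loader. A minimal sketch; the helper name and placement are hypothetical, not part of Kafka:

```java
import java.io.File;

// Hypothetical startup check for the scenario above: if java.io.tmpdir
// has been removed (e.g. /tmp cleaned on instance reboot), native-library
// loaders such as snappy-java fail much later with a confusing
// NoClassDefFoundError; failing fast here makes the root cause obvious.
public class TmpDirCheck {
    static void verifyTmpDir() {
        File tmp = new File(System.getProperty("java.io.tmpdir"));
        if (!tmp.isDirectory() || !tmp.canWrite()) {
            throw new IllegalStateException(
                "java.io.tmpdir does not exist or is not writable: " + tmp);
        }
    }
}
```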
Re: Review Request 26373: Patch for KAFKA-1647
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/ --- (Updated Oct. 13, 2014, 11:38 p.m.) Review request for kafka. Bugs: KAFKA-1647 https://issues.apache.org/jira/browse/KAFKA-1647 Repository: kafka Description (updated) --- Addressed Joel's comments. Diffs (updated) - core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 Diff: https://reviews.apache.org/r/26373/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Updated] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts
[ https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-1647: Attachment: KAFKA-1647_2014-10-13_16:38:39.patch Replication offset checkpoints (high water marks) can be lost on hard kills and restarts Key: KAFKA-1647 URL: https://issues.apache.org/jira/browse/KAFKA-1647 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Joel Koshy Assignee: Jiangjie Qin Priority: Critical Labels: newbie++ Attachments: KAFKA-1647.patch, KAFKA-1647_2014-10-13_16:38:39.patch We ran into this scenario recently in a production environment. This can happen when enough brokers in a cluster are taken down, i.e., a rolling bounce done properly should not cause this issue. It can occur if all replicas for any partition are taken down. Here is a sample scenario:
* Cluster of three brokers: b0, b1, b2
* Two partitions (of some topic) with replication factor two: p0, p1
* Initial state: p0: leader = b0, ISR = {b0, b1}; p1: leader = b1, ISR = {b0, b1}
* Do a parallel hard-kill of all brokers
* Bring up b2, so it is the new controller
* b2 initializes its controller context and populates its leader/ISR cache (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last known leaders are b0 (for p0) and b1 (for p1)
* Bring up b1
* The controller's onBrokerStartup procedure initiates a replica state change for all replicas on b1 to become online. As part of this replica state change it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 (for p0 and p1). This LeaderAndIsr request contains: {p0: leader=b0; p1: leader=b1; leaders=b1}. b0 is indicated as the leader of p0 but it is not included in the leaders field because b0 is down.
* On receiving the LeaderAndIsrRequest, b1's replica manager will successfully make itself (b1) the leader for p1 (and create the local replica object corresponding to p1). It will however abort the become-follower transition for p0 because the designated leader b0 is offline. So it will not create the local replica object for p0.
* It will then start the high water mark checkpoint thread. Since only p1 has a local replica object, only p1's high water mark will be checkpointed to disk. p0's previously written checkpoint, if any, will be lost.
So in summary it seems we should always create the local replica object even if the online transition does not happen. Possible symptoms of the above bug could be one or more of the following (we saw 2 and 3):
1. Data loss; yes, on a hard-kill data loss is expected, but this can actually cause loss of nearly all data if the broker becomes follower, truncates, and soon after happens to become leader.
2. High IO on brokers that lose their high water mark, then subsequently (on a successful become-follower transition) truncate their log to zero and start catching up from the beginning.
3. If the offsets topic is affected, then offsets can get reset. This is because during an offset load we don't read past the high water mark. So if a water mark is missing then we don't load anything (even if the offsets are there in the log).
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
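The core of the loss described above is that the checkpoint file is rewritten from only the partitions that currently have a local replica object, silently dropping entries for the rest. A minimal sketch of the difference between overwriting and merging (all names hypothetical; an in-memory map stands in for the checkpoint file):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the checkpoint bug described above (names hypothetical).
// overwrite() models the buggy behavior: partitions without a live
// replica object lose their persisted high water mark. merge() models
// a safer direction: preserve existing entries.
public class HighWatermarkCheckpoint {
    final Map<String, Long> onDisk = new HashMap<>(); // stands in for the file

    // Buggy: rewrite the whole checkpoint from live partitions only.
    void overwrite(Map<String, Long> live) {
        onDisk.clear();
        onDisk.putAll(live);
    }

    // Safer: keep entries for partitions that have no live replica yet.
    void merge(Map<String, Long> live) {
        onDisk.putAll(live);
    }
}
```

In the scenario above, p0 has no replica object on b1, so the overwrite path drops p0's checkpoint even though nothing about p0 changed.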
Re: including KAFKA-1555 in 0.8.2?
Great. Pushed to 0.8.2 too. Jun On Fri, Oct 10, 2014 at 2:52 PM, Joel Koshy jjkosh...@gmail.com wrote: +1 On Fri, Oct 10, 2014 at 04:53:45PM +, Sriram Subramanian wrote: +1 On 10/10/14 9:39 AM, Gwen Shapira gshap...@cloudera.com wrote: +1 :) On Fri, Oct 10, 2014 at 9:34 AM, Joe Stein joe.st...@stealth.ly wrote: +1 On Oct 10, 2014 12:08 PM, Neha Narkhede neha.narkh...@gmail.com wrote: +1. On Thu, Oct 9, 2014 at 6:41 PM, Jun Rao jun...@gmail.com wrote: Hi, Everyone, I just committed KAFKA-1555 (min.isr support) to trunk. I felt that it's probably useful to include it in the 0.8.2 release. Any objections? Thanks, Jun
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170256#comment-14170256 ] Jun Rao commented on KAFKA-1555: Committed to 0.8.2 too.
Review Request 26666: Patch for KAFKA-1653
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26666/ --- Review request for kafka. Bugs: KAFKA-1653 https://issues.apache.org/jira/browse/KAFKA-1653 Repository: kafka Description --- KAFKA-1653 Disallow duplicate broker IDs in user input for admin commands. This covers a few cases besides the one identified in the bug. Aside from a major refactoring to use Sets for broker/replica lists, sanitizing user input seems to be the best solution here. I chose to generate errors instead of just using toSet since a duplicate entry may indicate that a different broker id was accidentally omitted. Diffs - core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 691d69a49a240f38883d2025afaec26fd61281b5 core/src/main/scala/kafka/admin/TopicCommand.scala 7672c5aab4fba8c23b1bb5cd4785c332d300a3fa core/src/main/scala/kafka/tools/StateChangeLogMerger.scala d298e7e81acc7427c6cf4796b445966267ca54eb Diff: https://reviews.apache.org/r/26666/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Commented] (KAFKA-1653) Duplicate broker ids allowed in replica assignment
[ https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170264#comment-14170264 ] Ewen Cheslack-Postava commented on KAFKA-1653: -- Created reviewboard https://reviews.apache.org/r/26666/diff/ against branch origin/trunk Duplicate broker ids allowed in replica assignment -- Key: KAFKA-1653 URL: https://issues.apache.org/jira/browse/KAFKA-1653 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Labels: newbie Attachments: KAFKA-1653.patch The reassign partitions command and the controller do not ensure that all replicas for a partition are on different brokers. For example, you could set 1,2,2 as the list of brokers for the replicas. kafka-topics.sh --describe --under-replicated will list these partitions as under-replicated, but I can't see a reason why the controller should allow this state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
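The validation approach described in the review, erroring on duplicates rather than silently calling toSet, can be sketched as follows. The class and method names are hypothetical; the patch itself modifies the Scala admin tools.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the input sanitization described above (names hypothetical):
// reject duplicate broker ids in a replica list instead of silently
// deduplicating, because a duplicate (e.g. "1,2,2") often means a
// different broker id (e.g. 3) was accidentally omitted.
public class ReplicaAssignmentValidator {
    static Set<Integer> validate(List<Integer> replicas) {
        Set<Integer> unique = new HashSet<>(replicas);
        if (unique.size() != replicas.size()) {
            throw new IllegalArgumentException(
                "Duplicate broker ids in replica list: " + replicas);
        }
        return unique;
    }
}
```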
[jira] [Commented] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation
[ https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14170265#comment-14170265 ] Guozhang Wang commented on KAFKA-1634: -- I will work on this after KAFKA-1583. Improve semantics of timestamp in OffsetCommitRequests and update documentation --- Key: KAFKA-1634 URL: https://issues.apache.org/jira/browse/KAFKA-1634 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Assignee: Joel Koshy Priority: Blocker Fix For: 0.8.2 From the mailing list - following up on this -- I think the online API docs for OffsetCommitRequest still incorrectly refer to client-side timestamps: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest Wasn't that removed and now always handled server-side now? Would one of the devs mind updating the API spec wiki? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1704) Add PartitionConfig besides LogConfig
Guozhang Wang created KAFKA-1704: Summary: Add PartitionConfig besides LogConfig Key: KAFKA-1704 URL: https://issues.apache.org/jira/browse/KAFKA-1704 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.9.0 Today we have only two places to store configs: server configs, which hold server-side global configs, and log configs, which hold everything else. However, many topic/partition-level configs would be better stored in a partition config so that reading them does not require accessing the underlying logs, for example: 1. uncleanLeaderElectionEnable 2. minInSyncReplicas 3. compact [? this is defined per-topic / partition but maybe ok to store as log configs] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1697) remove code related to acks>1 on the broker
[ https://issues.apache.org/jira/browse/KAFKA-1697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-1697: - Description: We removed the acks>1 support from the producer client in KAFKA-1555. We can completely remove the code in the broker that supports acks>1. Also, we probably want to make NotEnoughReplicasAfterAppend a non-retriable exception and let the client decide what to do. was: We removed the acks>1 support from the producer client in KAFKA-1555. We can completely remove the code in the broker that supports acks>1. remove code related to acks>1 on the broker -- Key: KAFKA-1697 URL: https://issues.apache.org/jira/browse/KAFKA-1697 Project: Kafka Issue Type: Bug Reporter: Jun Rao Assignee: Gwen Shapira Fix For: 0.8.3 We removed the acks>1 support from the producer client in KAFKA-1555. We can completely remove the code in the broker that supports acks>1. Also, we probably want to make NotEnoughReplicasAfterAppend a non-retriable exception and let the client decide what to do. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
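The retriable/non-retriable split discussed in this thread can be sketched with a marker-interface pattern (names here are simplified stand-ins, not Kafka's actual class hierarchy): because NotEnoughReplicasAfterAppend is raised after the message was already appended to the leader's log, a blind automatic retry would write it twice, so the exception is deliberately left out of the retriable set and the application decides.

```java
// Sketch of the retriable vs. non-retriable distinction discussed above
// (simplified, hypothetical names). NotEnoughReplicas fires before the
// append, so retrying is safe; NotEnoughReplicasAfterAppend fires after,
// so retrying risks duplicates and the decision is left to the caller.
public class RetryPolicy {
    interface Retriable {} // marker: safe to retry automatically

    static class NotEnoughReplicasException
            extends RuntimeException implements Retriable {}

    static class NotEnoughReplicasAfterAppendException
            extends RuntimeException {} // deliberately NOT Retriable

    static boolean shouldRetry(RuntimeException e, int attemptsLeft) {
        return attemptsLeft > 0 && e instanceof Retriable;
    }
}
```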
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14170272#comment-14170272 ]

Guozhang Wang commented on KAFKA-1555:
--------------------------------------

Created KAFKA-1704 for partition configs.

provide strong consistency with reasonable availability
-------------------------------------------------------

Key: KAFKA-1555
URL: https://issues.apache.org/jira/browse/KAFKA-1555
Project: Kafka
Issue Type: Improvement
Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
Fix For: 0.8.2
Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch, KAFKA-1555.5.patch, KAFKA-1555.6.patch, KAFKA-1555.8.patch, KAFKA-1555.9.patch

In a mission-critical application, we expect a Kafka cluster with 3 brokers to satisfy two requirements:
1. When 1 broker is down, no message loss or service blocking happens.
2. In worse cases, such as when two brokers are down, service can be blocked, but no message loss happens.

We found that the current Kafka version (0.8.1.1) cannot achieve these requirements due to three behaviors:
1. When choosing a new leader from 2 followers in ISR, the one with fewer messages may be chosen as the leader.
2. Even when replica.lag.max.messages=0, a follower can stay in ISR when it has fewer messages than the leader.
3. ISR can contain only 1 broker, so acknowledged messages may be stored on only 1 broker.

The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning all 3 replicas (leader A, followers B and C) are in sync, i.e., they have the same messages and are all in ISR. According to the value of request.required.acks (acks for short), there are the following cases:
1. acks=0, 1, 3. Obviously these settings do not satisfy the requirements.
2. acks=2. Producer sends a message m. It is acknowledged by A and B. At this time, although C hasn't received m, C is still in ISR. If A is killed, C can be elected as the new leader, and consumers will miss m.
3. acks=-1. B and C restart and are removed from ISR. Producer sends a message m to A, and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost.

In summary, no existing configuration can satisfy the requirements.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
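The resolution of this ticket (the min.insync.replicas setting that shipped with 0.8.2) addresses case 3 by letting acks=-1 be combined with a minimum ISR size. Below is a minimal sketch of the two settings involved; only the property values are shown, and the producer and topic objects that would actually consume them are omitted:

```java
import java.util.Properties;

public class MinIsrConfigSketch {
    public static void main(String[] args) {
        // Producer side: acks=-1 requires acknowledgement from all in-sync replicas.
        Properties producerProps = new Properties();
        producerProps.put("request.required.acks", "-1");

        // Broker/topic side: refuse writes unless at least 2 replicas are in sync,
        // so an acknowledged message always exists on at least 2 brokers.
        Properties topicProps = new Properties();
        topicProps.put("min.insync.replicas", "2");

        System.out.println(producerProps.getProperty("request.required.acks"));
        System.out.println(topicProps.getProperty("min.insync.replicas"));
    }
}
```

With this combination, losing any single broker can no longer lose acknowledged messages; if the ISR shrinks below 2, produce requests fail instead, blocking availability rather than losing data, which matches requirement 2 above.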
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14170275#comment-14170275 ]

Jun Rao commented on KAFKA-1481:
--------------------------------

Vladimir,

Thanks for the patch. My suggestion is actually slightly different from what you did. Instead of using

kafka.consumer:type=ConsumerTopicMetrics,name=clientId=af_servers,AllTopics,BytesPerSec

I was suggesting

kafka.consumer:type=ConsumerTopicMetrics,clientId=af_servers,topic=AllTopics,name=BytesPerSec

This is probably the more standard mbean name. We can do that by using the following method to create MetricName and pass in the mBeanName that we want:

public MetricName(String group, String type, String name, String scope, String mBeanName)

We also need to extend KafkaMetricsGroup by adding new helper functions that take a MetricName explicitly:

def newMeter(name: MetricName, eventType: String, timeUnit: TimeUnit)

Also, your patch doesn't seem to apply to the latest trunk:

git apply ~/Downloads/KAFKA-1481_2014-10-13_18-23-35.patch
error: core/src/main/scala/kafka/common/ClientIdTopic.scala: No such file or directory

Stop using dashes AND underscores as separators in MBean names
--------------------------------------------------------------

Key: KAFKA-1481
URL: https://issues.apache.org/jira/browse/KAFKA-1481
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
Labels: patch
Fix For: 0.8.2
Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch

MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBean names, making it impossible to parse the individual bits back out of the MBean names. Perhaps a pipe character should be used to avoid the conflict.

This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW
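To see why the key=value layout Jun suggests helps, here is a small JDK-only sketch using javax.management.ObjectName: with the proposed name, each component is a separate ObjectName property, so dashes and underscores inside the values no longer confuse parsers.

```java
import javax.management.ObjectName;

public class MBeanNameSketch {
    public static void main(String[] args) throws Exception {
        // Jun's proposed layout: every component is its own key=value
        // property, recoverable via the standard JMX API.
        ObjectName name = new ObjectName(
            "kafka.consumer:type=ConsumerTopicMetrics,clientId=af_servers,topic=AllTopics,name=BytesPerSec");
        System.out.println(name.getDomain());                 // kafka.consumer
        System.out.println(name.getKeyProperty("clientId"));  // af_servers, underscores and all
        System.out.println(name.getKeyProperty("name"));      // BytesPerSec
    }
}
```

By contrast, the original flat form (name=clientId=af_servers,AllTopics,BytesPerSec) packs everything into a single property value, forcing monitoring tools to guess where the client id ends and the topic begins.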
Review Request 26667: Patch for KAFKA-1698
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26667/
---

Review request for kafka.

Bugs: KAFKA-1698
https://issues.apache.org/jira/browse/KAFKA-1698

Repository: kafka

Description
---
KAFKA-1698 Validate parsed ConfigDef values in addition to the default values.

Diffs
---
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 227309e8c62f9fc435722f28f2deff4a48e30853
clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java 09a82feeb7cae95209e54d3554224915a1498ebd

Diff: https://reviews.apache.org/r/26667/diff/

Testing
---

Thanks,
Ewen Cheslack-Postava
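The gist of the patch, sketched with a hypothetical stand-in for ConfigDef's Validator interface (the names here are illustrative, not the actual client API): validation has to run on the parsed user-supplied value, not only on the declared default.

```java
import java.util.HashMap;
import java.util.Map;

public class ValidatorSketch {
    // Illustrative stand-in for org.apache.kafka.common.config.ConfigDef.Validator.
    interface Validator { void ensureValid(String name, Object value); }

    static Validator atLeast(long min) {
        return (name, value) -> {
            if (((Number) value).longValue() < min)
                throw new IllegalArgumentException(name + " must be >= " + min);
        };
    }

    static Map<String, Object> parse(Map<String, Object> props,
                                     String key, Object dflt, Validator v) {
        Object value = props.getOrDefault(key, dflt);
        v.ensureValid(key, value);  // the fix: validate the *parsed* value, not just dflt
        Map<String, Object> parsed = new HashMap<>();
        parsed.put(key, value);
        return parsed;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("retries", -1L);  // invalid user-supplied value
        try {
            parse(props, "retries", 3L, atLeast(0));
            System.out.println("accepted");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Before the patch, only the default (3 here) was ever passed to ensureValid, so a bad user value like -1 would slip through parsing unchecked.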
[jira] [Updated] (KAFKA-1698) Validator.ensureValid() only validates default config value
[ https://issues.apache.org/jira/browse/KAFKA-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ewen Cheslack-Postava updated KAFKA-1698:
-----------------------------------------

Assignee: Ewen Cheslack-Postava
Status: Patch Available (was: Open)

Validator.ensureValid() only validates default config value
-----------------------------------------------------------

Key: KAFKA-1698
URL: https://issues.apache.org/jira/browse/KAFKA-1698
Project: Kafka
Issue Type: Bug
Components: core
Reporter: Jun Rao
Assignee: Ewen Cheslack-Postava
Labels: newbie++
Fix For: 0.8.3
Attachments: KAFKA-1698.patch

We should use it to validate the actual configured value.
[jira] [Updated] (KAFKA-1698) Validator.ensureValid() only validates default config value
[ https://issues.apache.org/jira/browse/KAFKA-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ewen Cheslack-Postava updated KAFKA-1698:
-----------------------------------------

Attachment: KAFKA-1698.patch

Validator.ensureValid() only validates default config value
-----------------------------------------------------------

Key: KAFKA-1698
URL: https://issues.apache.org/jira/browse/KAFKA-1698
Project: Kafka
Issue Type: Bug
Components: core
Reporter: Jun Rao
Labels: newbie++
Fix For: 0.8.3
Attachments: KAFKA-1698.patch

We should use it to validate the actual configured value.
[jira] [Commented] (KAFKA-1698) Validator.ensureValid() only validates default config value
[ https://issues.apache.org/jira/browse/KAFKA-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14170280#comment-14170280 ]

Ewen Cheslack-Postava commented on KAFKA-1698:
----------------------------------------------

Created reviewboard https://reviews.apache.org/r/26667/diff/ against branch origin/trunk

Validator.ensureValid() only validates default config value
-----------------------------------------------------------

Key: KAFKA-1698
URL: https://issues.apache.org/jira/browse/KAFKA-1698
Project: Kafka
Issue Type: Bug
Components: core
Reporter: Jun Rao
Labels: newbie++
Fix For: 0.8.3
Attachments: KAFKA-1698.patch

We should use it to validate the actual configured value.
Re: [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
Hi Kafka Dev Team,

Let me know if the new Producer supports sending data to a single partition (without blocking). We have a use-case for aggregating events.

Thanks,
Bhavesh

On Mon, Oct 13, 2014 at 10:54 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:

Hi Kafka Dev Team,

When I run a test that sends messages to a single partition for 3 minutes or so, I encounter a deadlock (please see the attached screen) and thread contention in YourKit profiling.

Use Case:
1) Aggregating messages into the same partition for metric counting.
2) Replicate the old Producer behavior of sticking to a partition for 3 minutes.

Here is the output:

Frozen threads found (potential deadlock)

It seems that the following threads have not changed their stack for more than 10 seconds. These threads are possibly (but not necessarily!) in a deadlock or hung.

pool-1-thread-128 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744

pool-1-thread-159 --- Frozen for at least 2m 1 sec
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744

pool-1-thread-55 --- Frozen for at least 2m
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(TopicPartition, byte[], byte[], CompressionType, Callback) RecordAccumulator.java:139
org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) KafkaProducer.java:237
org.kafka.test.TestNetworkDownProducer$MyProducer.run() TestNetworkDownProducer.java:84
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) ThreadPoolExecutor.java:1145
java.util.concurrent.ThreadPoolExecutor$Worker.run() ThreadPoolExecutor.java:615
java.lang.Thread.run() Thread.java:744
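What the frozen stacks suggest is not a classic lock-ordering deadlock but threads parked inside RecordAccumulator.append, waiting for space in a single partition's bounded buffer that nothing is draining (e.g. because the network is down). A JDK-only sketch of that failure mode, using a bounded queue as a hypothetical stand-in for the per-partition buffer:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedAppendSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in for the accumulator's per-partition buffer: capacity 2,
        // and no sender thread draining it (broker unreachable).
        BlockingQueue<byte[]> partitionBuffer = new ArrayBlockingQueue<>(2);
        partitionBuffer.put("m1".getBytes());
        partitionBuffer.put("m2".getBytes());

        // A third append cannot proceed until something drains the buffer;
        // with a timeout we observe the stall instead of hanging forever,
        // which is exactly what every pool-1-thread-* above is doing.
        boolean appended = partitionBuffer.offer("m3".getBytes(), 100, TimeUnit.MILLISECONDS);
        System.out.println("appended=" + appended);
    }
}
```

When all producer threads target the same partition, they all converge on this one full buffer, so the whole pool freezes together even though no locks are held in a cycle.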
Re: Review Request 24676: Rebase KAFKA-1583
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/
---

(Updated Oct. 14, 2014, 2:42 a.m.)

Review request for kafka.

Summary (updated)
---
Rebase KAFKA-1583

Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583

Repository: kafka

Description (updated)
---
Rebase KAFKA-1583 on trunk: pass requiredAcks to Partition for min.isr + minor changes

Diffs (updated)
---
core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c
core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600
core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd
core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6
core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe
core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4
core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2
core/src/main/scala/kafka/log/Log.scala a123cdc52f341a802b3e4bfeb29a6154332e5f73
core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b
core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724
core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266
core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a
core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f
core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653
core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993
core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c
core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62
core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87
core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29
core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122
core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc
core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5

Diff: https://reviews.apache.org/r/24676/diff/

Testing
---
Unit tests

Thanks,
Guozhang Wang
[jira] [Commented] (KAFKA-1583) Kafka API Refactoring
[ https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14170405#comment-14170405 ]

Guozhang Wang commented on KAFKA-1583:
--------------------------------------

Updated reviewboard https://reviews.apache.org/r/24676/diff/ against branch origin/trunk

Kafka API Refactoring
---------------------

Key: KAFKA-1583
URL: https://issues.apache.org/jira/browse/KAFKA-1583
Project: Kafka
Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Fix For: 0.9.0
Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, KAFKA-1583_2014-10-13_19:41:58.patch

This is the next step of KAFKA-1430. Details can be found at this page: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring
[jira] [Updated] (KAFKA-1583) Kafka API Refactoring
[ https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-1583:
---------------------------------

Attachment: KAFKA-1583_2014-10-13_19:41:58.patch

Kafka API Refactoring
---------------------

Key: KAFKA-1583
URL: https://issues.apache.org/jira/browse/KAFKA-1583
Project: Kafka
Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Fix For: 0.9.0
Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, KAFKA-1583_2014-10-13_19:41:58.patch

This is the next step of KAFKA-1430. Details can be found at this page: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring
Re: [DISCUSSION] Message Metadata
Hi Jay,

Thanks for the comments. Replied inline.

Guozhang

On Mon, Oct 13, 2014 at 11:11 AM, Jay Kreps jay.kr...@gmail.com wrote:

I need to take more time to think about this. Here are a few off-the-cuff remarks:

- To date we have tried really, really hard to keep the data model for messages simple, since after all you can always add whatever you like inside the message body.

- For system tags, why not just make these fields first-class fields in the message? The purpose of a system tag is presumably that
Why have a bunch of key-value pairs versus first-class fields?

Yes, we can alternatively make system tags first-class fields in the message header to make the format / processing logic simpler. The main reasons I put them as system tags are: 1) when I think about these possible system tags, some of them apply to all types of messages (e.g. timestamps), but some may apply only to a specific type of message (compressed, control message), and for those not all of them are necessarily required all the time; hence making them compact tags may save us some space when not all of them are present; 2) with tags we do not need to bump up the protocol version every time we make a change, which would include keeping the logic to handle all versions on the broker until the old ones are officially discarded; instead, the broker can just ignore a tag if its id is not recognizable (because the client is on a newer version), or use some default value / throw an exception if a required tag is missing (because the client is on an older version).

- You don't necessarily need application-level tags explicitly represented in the message format for efficiency. The application can define its own header (e.g. the message could be a size-delimited header followed by a size-delimited body). But actually if you use Avro you don't even need this, I don't think. Avro has the ability to just deserialize the header fields in your message. Avro has a notion of reader and writer schemas. The writer schema is whatever the message was written with. If the reader schema is just the header, Avro will skip any fields it doesn't need and just deserialize the fields it does need. This is actually a much more usable and flexible way to define a header, since you get all the types Avro allows instead of just bytes.

I agree that we can use a reader schema to just read out the header without de-serializing the full message, and probably for compressed messages we can add an Avro / etc. header for the compressed wrapper message as well, but that would force these applications (MM, auditor, clients) to be schema-aware, which would usually require the people who manage this data pipeline to also manage the schemas, whereas ideally Kafka itself should just consider bytes-in and bytes-out (and maybe a little bit more, like timestamps). The purpose here is to not introduce an extra dependency while at the same time allowing applications to do some simple processing based on metadata only, without fully de-serializing / de-compressing the message.

- We will need to think carefully about what to do with timestamps if we end up including them. There are actually several timestamps:
  - The time the producer created the message
  - The time the leader received the message
  - The time the current broker received the message
The producer timestamps won't be at all increasing. The leader timestamp will be mostly increasing, except when the clock changes or leadership moves. This somewhat complicates the use of these timestamps, though. From the point of view of the producer, the only time that matters is the time the message was created. However, since the producer sets this, it can be arbitrarily bad (remember all the NTP issues and 1970 timestamps we would get). Say the heuristic was to use the timestamp of the first message in a file for retention: the problem would be that the timestamps for the segments need not even be sequential, and a single bad producer could send data with a time in the distant past or future, causing data to be deleted or retained forever. Using the broker timestamp at write time is better, though obviously that would be overwritten when data is mirrored between clusters (the mirror would then have a different time, and if the mirroring ever stopped that gap could be large). One approach would be to use the client timestamp but have the broker overwrite it if it is too bad (e.g. off by more than a minute, say).

We would need the reception timestamp (i.e. the third one) for log cleaning; as for the first / second ones, I originally put them as app tags since they are likely to be used not by the brokers themselves (e.g. auditor, etc.).

-Jay

On Fri, Oct 10, 2014 at 11:21 PM, Joel Koshy jjkosh...@gmail.com wrote:

Thanks Guozhang! This is an excellent write-up and the approach nicely consolidates a number of long-standing issues. It would be great if everyone can review this carefully
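Jay's last suggestion can be sketched as a hypothetical helper (the names and the one-minute bound are illustrative, not a committed design): keep the client's create-time unless it disagrees with the broker clock by more than the allowed skew, in which case substitute broker time.

```java
import java.time.Duration;
import java.time.Instant;

public class TimestampSanitySketch {
    static final Duration MAX_SKEW = Duration.ofMinutes(1);

    // Trust the producer's create-time unless it is wildly off from the
    // broker's clock; then fall back to broker receive time so that one bad
    // producer cannot poison retention decisions.
    static Instant effectiveTimestamp(Instant clientTs, Instant brokerNow) {
        Duration skew = Duration.between(clientTs, brokerNow).abs();
        return skew.compareTo(MAX_SKEW) > 0 ? brokerNow : clientTs;
    }

    public static void main(String[] args) {
        Instant now  = Instant.parse("2014-10-14T00:00:00Z");
        Instant good = Instant.parse("2014-10-13T23:59:30Z"); // 30s skew: kept
        Instant bad  = Instant.parse("1970-01-01T00:00:00Z"); // bogus clock: replaced
        System.out.println(effectiveTimestamp(good, now));
        System.out.println(effectiveTimestamp(bad, now));
    }
}
```

This keeps mostly-monotonic timestamps for segment retention while still preserving honest client create-times for auditing.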
[jira] [Updated] (KAFKA-1703) The bat script failed to start on windows
[ https://issues.apache.org/jira/browse/KAFKA-1703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-1703:
---------------------------------

Labels: newbie (was: )

The bat script failed to start on windows
-----------------------------------------

Key: KAFKA-1703
URL: https://issues.apache.org/jira/browse/KAFKA-1703
Project: Kafka
Issue Type: Bug
Components: build
Affects Versions: 0.8.1.1
Reporter: ChengRen
Labels: newbie

The bat scripts in bin\windows cannot start zookeeper and kafka correctly (on an OS that is freshly installed with only the JDK ready). I modified kafka-run-class.bat and added the jars in the libs folder to the classpath:

for %%i in (%BASE_DIR%\core\lib\*.jar) do (
    call :concat %%i
)
rem added begin
for %%i in (%BASE_DIR%\..\libs\*.jar) do (
    call :concat %%i
)
rem added end
for %%i in (%BASE_DIR%\perf\target\scala-2.8.0/kafka*.jar) do (
    call :concat %%i
)

Now it runs correctly. Under bin\windows:

zookeeper-server-start.bat ..\..\config\zookeeper.properties
kafka-server-start.bat ..\..\config\kafka.properties
Re: Review Request 26663: Patch for KAFKA-979
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26663/#review56490
---

Looks good. Just one minor comment. Could you get rid of this build warning?

/Users/nnarkhed/Projects/kafka/core/src/main/scala/kafka/log/LogConfig.scala:103: object Math is deprecated: use the scala.math package object instead. (Example package object usage: scala.math.Pi)
if (segmentJitterMs == 0) 0 else scala.util.Random.nextLong() % Math.min(segmentJitterMs, segmentMs)

- Neha Narkhede

On Oct. 13, 2014, 11:16 p.m., Ewen Cheslack-Postava wrote:

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26663/
---

(Updated Oct. 13, 2014, 11:16 p.m.)

Review request for kafka.

Bugs: KAFKA-979
https://issues.apache.org/jira/browse/KAFKA-979

Repository: kafka

Description
---
KAFKA-979 Add optional random jitter for time based log rolling.

Diffs
---
core/src/main/scala/kafka/log/Log.scala a123cdc52f341a802b3e4bfeb29a6154332e5f73
core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff
core/src/main/scala/kafka/log/LogConfig.scala d2cc9e3d6b7a4fd24516d164eb3673e6ce052129
core/src/main/scala/kafka/log/LogSegment.scala 7597d309f37a0b3756381f9500100ef763d466ba
core/src/main/scala/kafka/server/KafkaConfig.scala 7fcbc16da898623b03659c803e2a20c7d1bd1011
core/src/main/scala/kafka/server/KafkaServer.scala 3e9e91f2b456bbdeb3055d571e18ffea8675b4bf
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 7b97e6a80753a770ac094e101c653193dec67e68
core/src/test/scala/unit/kafka/log/LogTest.scala a0cbd3bbbeeabae12caa6b41aec31a8f5dfd034b

Diff: https://reviews.apache.org/r/26663/diff/

Testing
---

Thanks,
Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation
[ https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-1634:
---------------------------------

Assignee: Guozhang Wang (was: Joel Koshy)

Improve semantics of timestamp in OffsetCommitRequests and update documentation
-------------------------------------------------------------------------------

Key: KAFKA-1634
URL: https://issues.apache.org/jira/browse/KAFKA-1634
Project: Kafka
Issue Type: Bug
Reporter: Neha Narkhede
Assignee: Guozhang Wang
Priority: Blocker
Fix For: 0.8.2

From the mailing list, following up on this: I think the online API docs for OffsetCommitRequest still incorrectly refer to client-side timestamps: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest Wasn't that removed, and isn't it now always handled server-side? Would one of the devs mind updating the API spec wiki?