[jira] [Comment Edited] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14199817#comment-14199817 ] Vishal edited comment on KAFKA-1745 at 11/6/14 8:07 AM:

[~junrao] Sorry, I should have been more clear. [~ewencp] I checked and I don't think I'm leaking producers. Anyway, here is a sample test case that reproduces the problem (check lsof every 10 seconds to see the number of KQUEUEs and PIPEs grow even though the producer objects are being reused):

{code}
public class Test {
    private static Queue<Producer<String, String>> producerPool = new ConcurrentLinkedQueue<Producer<String, String>>();
    private static ProducerConfig config;

    static {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        config = new ProducerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true);
        Runnable run = new Runnable() {
            @Override
            public void run() {
                Producer<String, String> producer = producerPool.poll();
                if (producer == null) {
                    producer = new Producer<String, String>(config);
                }
                KeyedMessage<String, String> data = new KeyedMessage<String, String>("SaSMQ", "0", "test");
                producer.send(data);
                producerPool.add(producer);
            }
        };
        while (true) { // so that the main program does not terminate
            for (int i = 0; i < 100; i++) {
                tpe.submit(run);
            }
            Thread.sleep(10000); // 10 seconds, so the pool's keep-alive time is exceeded and its threads are cleared
        }
    }
}
{code}

Though the KQUEUEs and PIPEs do get cleared after some time (in this test case, after 1-2 minutes), why does a new one have to be created every time a new thread accesses the producer object?
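For what it's worth, one thing to rule out in a setup like the test case above is abandoning the pool without closing the pooled producers: each producer's sockets (and the associated pipe/kqueue descriptors) are only released once close() is called. A minimal, hypothetical sketch of a pool with an explicit shutdown (ClosingPool is not a Kafka API; a Kafka Producer would be wrapped in a Closeable):

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical pool with the same poll()/add() shape as the test case,
// plus a shutdown() that closes every pooled instance so OS resources
// (sockets, pipes, kqueues) are actually released.
final class ClosingPool<T extends Closeable> {
    private final Queue<T> pool = new ConcurrentLinkedQueue<>();

    T poll() { return pool.poll(); }

    void add(T t) { pool.add(t); }

    // Drain the queue, closing each element exactly once; returns how many were closed.
    int shutdown() throws IOException {
        int closed = 0;
        for (T t; (t = pool.poll()) != null; closed++) {
            t.close();
        }
        return closed;
    }
}
```

This does not explain the per-thread descriptor growth reported above, but it makes the teardown path explicit when diagnosing leaks with lsof.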
[jira] [Created] (KAFKA-1757) Can not delete Topic index on Windows
Lukáš Vyhlídka created KAFKA-1757:
Summary: Can not delete Topic index on Windows
Key: KAFKA-1757
URL: https://issues.apache.org/jira/browse/KAFKA-1757
Project: Kafka
Issue Type: Bug
Components: log
Affects Versions: 0.8.2
Reporter: Lukáš Vyhlídka
Assignee: Jay Kreps
Priority: Minor

When running Kafka 0.8.2-Beta (Scala 2.10) on Windows, an attempt to delete a Topic threw an error:

ERROR [KafkaApi-1] error when handling request Name: StopReplicaRequest; Version: 0; CorrelationId: 38; ClientId: ; DeletePartitions: true; ControllerId: 0; ControllerEpoch: 3; Partitions: [test,0] (kafka.server.KafkaApis)
kafka.common.KafkaStorageException: Delete of index .index failed.
 at kafka.log.LogSegment.delete(LogSegment.scala:283)
 at kafka.log.Log$$anonfun$delete$1.apply(Log.scala:608)
 at kafka.log.Log$$anonfun$delete$1.apply(Log.scala:608)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at kafka.log.Log.delete(Log.scala:608)
 at kafka.log.LogManager.deleteLog(LogManager.scala:375)
 at kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:144)
 at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:139)
 at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:139)
 at kafka.utils.Utils$.inLock(Utils.scala:535)
 at kafka.utils.Utils$.inWriteLock(Utils.scala:543)
 at kafka.cluster.Partition.delete(Partition.scala:139)
 at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:158)
 at kafka.server.ReplicaManager$$anonfun$stopReplicas$3.apply(ReplicaManager.scala:191)
 at kafka.server.ReplicaManager$$anonfun$stopReplicas$3.apply(ReplicaManager.scala:190)
 at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
 at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:190)
 at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:96)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
 at java.lang.Thread.run(Thread.java:744)

When I investigated the issue, I figured out that the index file (in my environment it was C:\tmp\kafka-logs\----0014-0\.index) was locked by the Kafka process, so the OS did not allow that file to be deleted. I tried to fix the problem in the source code, and when I added a close() method call into LogSegment.delete(), Topic deletion started to work. I will attach (not sure how to upload a file during issue creation) a diff with the changes I made so you can judge whether it is reasonable. It would be perfect if it could make it into the product... Finally, I would like to say that on Linux the deletion works just fine.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
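The close-before-delete pattern the reporter describes can be sketched in isolation. The snippet below (class and method names are hypothetical, not the actual LogSegment code) mirrors the fix: on Windows, deleting a file that still has an open handle fails, while on POSIX systems the unlink succeeds regardless, which is why the bug only shows up on Windows.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;

public final class CloseThenDelete {
    // Mirrors the proposed fix: release the open handle before unlinking.
    // Without the close() call, Files.deleteIfExists would fail on Windows
    // because the process still holds a handle on the index file.
    static boolean deleteIndex(Path indexFile) throws IOException {
        RandomAccessFile handle = new RandomAccessFile(indexFile.toFile(), "rw");
        handle.close(); // the equivalent of the close() added to LogSegment.delete()
        return Files.deleteIfExists(indexFile);
    }
}
```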
[jira] [Updated] (KAFKA-1757) Can not delete Topic index on Windows
[ https://issues.apache.org/jira/browse/KAFKA-1757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lukáš Vyhlídka updated KAFKA-1757:
Fix Version/s: 0.8.2
Status: Patch Available (was: Open)
[jira] [Updated] (KAFKA-1757) Can not delete Topic index on Windows
[ https://issues.apache.org/jira/browse/KAFKA-1757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lukáš Vyhlídka updated KAFKA-1757:
Status: Open (was: Patch Available)
[jira] [Updated] (KAFKA-1757) Can not delete Topic index on Windows
[ https://issues.apache.org/jira/browse/KAFKA-1757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lukáš Vyhlídka updated KAFKA-1757:
Attachment: lucky-v.patch

A patch that looks like a fix for the issue.
[jira] [Updated] (KAFKA-1757) Can not delete Topic index on Windows
[ https://issues.apache.org/jira/browse/KAFKA-1757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lukáš Vyhlídka updated KAFKA-1757:
Status: Patch Available (was: Open)
[jira] [Updated] (KAFKA-1667) topic-level configuration not validated
[ https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmytro Kostiuchenko updated KAFKA-1667:
Attachment: KAFKA-1667_2014-11-06_17:10:14.patch

topic-level configuration not validated
Key: KAFKA-1667
URL: https://issues.apache.org/jira/browse/KAFKA-1667
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
Labels: newbie
Attachments: KAFKA-1667.patch, KAFKA-1667_2014-11-05_19:43:53.patch, KAFKA-1667_2014-11-06_17:10:14.patch

I was able to set the configuration for a topic to these invalid values:
{code}
Topic:topic-config-test  PartitionCount:1  ReplicationFactor:2  Configs:min.cleanable.dirty.ratio=-30.2,segment.bytes=-1,retention.ms=-12,cleanup.policy=lol
{code}
It seems that the values are saved as long as they are of the correct type, but they are not validated like the corresponding broker-level properties.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
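The invalid values above (a negative dirty ratio, a negative segment.bytes, an unknown cleanup.policy) illustrate the kind of range checks such a patch needs to add. A hypothetical, standalone sketch of that validation (not the actual LogConfig code; property names match the report):

```java
import java.util.Properties;

// Hypothetical validator: reject out-of-range topic-level overrides
// instead of silently storing them, mirroring broker-level validation.
final class TopicConfigValidator {
    static void validate(Properties props) {
        String ratio = props.getProperty("min.cleanable.dirty.ratio");
        if (ratio != null) {
            double d = Double.parseDouble(ratio);
            if (d < 0.0 || d > 1.0)
                throw new IllegalArgumentException("min.cleanable.dirty.ratio must be in [0,1]: " + d);
        }
        String segmentBytes = props.getProperty("segment.bytes");
        if (segmentBytes != null && Long.parseLong(segmentBytes) <= 0)
            throw new IllegalArgumentException("segment.bytes must be positive: " + segmentBytes);
        String policy = props.getProperty("cleanup.policy");
        if (policy != null && !policy.equals("delete") && !policy.equals("compact"))
            throw new IllegalArgumentException("unknown cleanup.policy: " + policy);
    }
}
```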
Re: Review Request 27634: Patch for KAFKA-1667
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27634/
---

(Updated Nov. 6, 2014, 4:10 p.m.)

Review request for kafka.

Bugs: KAFKA-1667
https://issues.apache.org/jira/browse/KAFKA-1667

Repository: kafka

Description (updated)
---
KAFKA-1667 Fixed bugs in LogConfig. Added test and documentation
KAFKA-1667 Updated tests to reflect new boolean property parsing logic
KAFKA-1667 renamed methods to match naming convention

Diffs (updated)
---
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java c4cea2cc072f4db4ce014b63d226431d3766bef1
core/src/main/scala/kafka/admin/TopicCommand.scala 0b2735e7fc42ef9894bef1997b1f06a8ebee5439
core/src/main/scala/kafka/log/LogConfig.scala e48922a97727dd0b98f3ae630ebb0af3bef2373d
core/src/main/scala/kafka/utils/Utils.scala 23aefb4715b177feae1d2f83e8b910653ea10c5f
core/src/test/scala/kafka/log/LogConfigTest.scala PRE-CREATION
core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala f44568cb25edf25db857415119018fd4c9922f61

Diff: https://reviews.apache.org/r/27634/diff/

Testing
---

Thanks,
Dmytro Kostiuchenko
[jira] [Commented] (KAFKA-1667) topic-level configuration not validated
[ https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14200370#comment-14200370 ] Dmytro Kostiuchenko commented on KAFKA-1667:

Updated reviewboard https://reviews.apache.org/r/27634/diff/ against branch origin/trunk
Re: Review Request 27634: Patch for KAFKA-1667
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27634/ --- (Updated Nov. 6, 2014, 4:12 p.m.) Review request for kafka. Bugs: KAFKA-1667 https://issues.apache.org/jira/browse/KAFKA-1667 Repository: kafka Description (updated) --- KAFKA-1667 Fixed bugs in LogConfig. Added test and documentation KAFKA-1667 Updated tests to reflect new boolean property parsing logic KAFKA-1667 renamed methods to match naming convention Diffs - clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java c4cea2cc072f4db4ce014b63d226431d3766bef1 core/src/main/scala/kafka/admin/TopicCommand.scala 0b2735e7fc42ef9894bef1997b1f06a8ebee5439 core/src/main/scala/kafka/log/LogConfig.scala e48922a97727dd0b98f3ae630ebb0af3bef2373d core/src/main/scala/kafka/utils/Utils.scala 23aefb4715b177feae1d2f83e8b910653ea10c5f core/src/test/scala/kafka/log/LogConfigTest.scala PRE-CREATION core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala f44568cb25edf25db857415119018fd4c9922f61 Diff: https://reviews.apache.org/r/27634/diff/ Testing --- Thanks, Dmytro Kostiuchenko
[jira] [Commented] (KAFKA-1667) topic-level configuration not validated
[ https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14200381#comment-14200381 ] Dmytro Kostiuchenko commented on KAFKA-1667: The main issue with the old patch was the handling of {{Properties}} defaults. When properties were passed to the {{fromProps(Properties)}} method, the {{Properties}} object was exposed as a {{Map}}. {{Map.get()}}, though, is implemented in {{Hashtable}} and thus knows nothing about {{Properties}} defaults. Fixed that and added a few trivial tests to check general correctness. There is an impact on client code, though: boolean values are now parsed via {{Boolean.parseBoolean}}, which throws no exception and instead falls back to {{false}} for invalid input.
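The `Properties` pitfall described above is standard JDK behavior and easy to reproduce: defaults live in a separate table that `Hashtable.get()` never consults, while `Properties.getProperty()` walks the defaults chain.

```java
import java.util.Properties;

public class PropertiesDefaultsDemo {
    public static void main(String[] args) {
        Properties defaults = new Properties();
        defaults.setProperty("cleanup.policy", "delete");

        // Defaults are stored in a nested Properties, not in this table.
        Properties props = new Properties(defaults);

        // Hashtable.get() only sees the (empty) top-level table:
        System.out.println(props.get("cleanup.policy"));          // null
        // Properties.getProperty() also consults the defaults chain:
        System.out.println(props.getProperty("cleanup.policy"));  // delete
    }
}
```

This is why treating a `Properties` object as a plain `Map` silently drops every default.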
[jira] [Commented] (KAFKA-1754) KOYA - Kafka on YARN
[ https://issues.apache.org/jira/browse/KAFKA-1754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14200409#comment-14200409 ] Thomas Weise commented on KAFKA-1754: - [~ste...@apache.org] Yes, we are running DataTorrent on secure clusters. You are probably thinking about the token max life time issue as addressed in [SLIDER-474]? KOYA - Kafka on YARN Key: KAFKA-1754 URL: https://issues.apache.org/jira/browse/KAFKA-1754 Project: Kafka Issue Type: New Feature Reporter: Thomas Weise Attachments: DT-KOYA-Proposal- JIRA.pdf YARN (Hadoop 2.x) has enabled clusters to be used for a variety of workloads, emerging as a distributed operating system for big data applications. Initiatives are under way to bring long-running services under the YARN umbrella, leveraging it for centralized resource management and operations ([YARN-896] and examples such as HBase, Accumulo or Memcached through Slider). This JIRA proposes KOYA (Kafka On YARN), a YARN application master to launch and manage Kafka clusters running on YARN. Brokers will use resources allocated through YARN, with support for recovery, monitoring, etc. Please see the attachment for more details.
[jira] [Commented] (KAFKA-1754) KOYA - Kafka on YARN
[ https://issues.apache.org/jira/browse/KAFKA-1754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14200455#comment-14200455 ] Thomas Weise commented on KAFKA-1754: - [~acmurthy] Thanks for the support. Disk reservation will benefit Kafka greatly. Will reach out with a few other questions. {quote} Inspired by this jira, I've opened YARN-2817... I'd like to throw out the idea that Kafka could start by reserving entire drives on nodes, exclusively when running on YARN. This would ensure that Kafka would not get interference from other applications like HDFS, MR etc. {quote}
[jira] [Commented] (KAFKA-1754) KOYA - Kafka on YARN
[ https://issues.apache.org/jira/browse/KAFKA-1754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14200457#comment-14200457 ] Thomas Weise commented on KAFKA-1754: - [~acmurthy] Thanks for the support. Disk reservation will benefit Kafka greatly. Will reach out with a few other questions.
[jira] [Updated] (KAFKA-1743) ConsumerConnector.commitOffsets in 0.8.2 is not backward compatible
[ https://issues.apache.org/jira/browse/KAFKA-1743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-1743: --- Assignee: Manikumar Reddy Status: Patch Available (was: Open) ConsumerConnector.commitOffsets in 0.8.2 is not backward compatible --- Key: KAFKA-1743 URL: https://issues.apache.org/jira/browse/KAFKA-1743 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Manikumar Reddy Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1743.patch In 0.8.1.x, ConsumerConnector has the following api: def commitOffsets This is changed to the following in 0.8.2, which breaks compatibility: def commitOffsets(retryOnFailure: Boolean = true)
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14200510#comment-14200510 ] Jun Rao commented on KAFKA-1738: You actually found a real bug, thanks! We exposed an existing problem after adding the ability to kill idle connections in KAFKA-1282. The default max idle time happens to be 10 minutes. That's why you only see the issue if the topics are created more than 10 mins apart. I will attach a patch soon. Partitions for topic not created after restart from forced shutdown --- Key: KAFKA-1738 URL: https://issues.apache.org/jira/browse/KAFKA-1738 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1, 0.8.2 Environment: Linux, 2GB RAM, 2 Core CPU Reporter: Pradeep Attachments: 1738.zip, ServerLogForFailedTopicCreation.txt, ServerLogForFailedTopicCreation.txt, ServerLogForSuccessfulTopicCreation.txt We are using the Kafka Topic APIs to create the topic. But in some cases, the topic gets created and yet we don't see the partition-specific files, and when the producer/consumer tries to get the topic metadata, it fails with an exception. The same happens if one tries to create the topic using the command line. k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException Steps to reproduce - 1. Stop Kafka using kill -9 PID of Kafka 2. Start Kafka 3. Create a topic with partition and replication factor of 1. 4. Check the response “Created topic topic_name” 5. Run the list command to verify it is created. 6. Now check the data directory of Kafka. There would not be any files for the newly created topic. We see issues when we are creating new topics. This happens randomly and we don't know the exact reasons.
We see the below logs in the controller during the creation of topics which don't have the partition files. [2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController) [2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController) [2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [JobJTopic,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,667] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-11-03 13:12:50,794] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:DMIPVM,port:9092 (kafka.controller.RequestSendThread) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 
at kafka.utils.Utils$.read(Utils.scala:381) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [2014-11-03 13:12:50,965] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:43;ClientId:id_0-host_null-port_9092;AliveBrokers:id:0,host:DMIPVM,port:9092;PartitionState:[JobJTopic,0] -
Review Request 27690: Patch for kafka-1738
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27690/ --- Review request for kafka. Bugs: kafka-1738 https://issues.apache.org/jira/browse/kafka-1738 Repository: kafka Description --- try/catch should include channel.receive() Diffs - core/src/main/scala/kafka/controller/ControllerChannelManager.scala ecbfa0f328ba6a652a758ab20cacef324a8b2fb8 Diff: https://reviews.apache.org/r/27690/diff/ Testing --- Thanks, Jun Rao
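The one-line description of the patch ("try/catch should include channel.receive()") can be sketched as follows. This is an illustrative Java rendering of the pattern, not the actual Scala ControllerChannelManager code; the Channel interface and all names here are assumptions:

```java
import java.io.IOException;

// Sketch of the fix's pattern: both send() and receive() sit inside the try,
// so an EOFException from a connection the broker closed for idleness leads
// to reconnect + retry instead of escaping the send thread.
public class RequestSender {
    public interface Channel {
        void send(byte[] request) throws IOException;
        byte[] receive() throws IOException;
        void reconnect() throws IOException;
    }

    public static byte[] sendWithRetry(Channel channel, byte[] request, int maxAttempts)
            throws IOException {
        IOException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                channel.send(request);
                // Before the fix, the receive() call sat outside the try: an
                // EOFException here escaped and the request was never retried.
                return channel.receive();
            } catch (IOException e) {
                last = e;
                channel.reconnect(); // the broker likely closed the idle socket
            }
        }
        throw last;
    }
}
```

This matches the observed symptom: with a 10-minute idle timeout, only a request arriving on a connection idle longer than that hit the unguarded receive().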
[jira] [Updated] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1738: --- Assignee: Jun Rao Status: Patch Available (was: Open)
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14200512#comment-14200512 ] Jun Rao commented on KAFKA-1738: Created reviewboard https://reviews.apache.org/r/27690/diff/ against branch origin/trunk
[jira] [Updated] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1738: --- Priority: Blocker (was: Major) Affects Version/s: (was: 0.8.1.1) Fix Version/s: 0.8.2
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14200522#comment-14200522 ] Jun Rao commented on KAFKA-1738: Sri, Pradeep, do you think you can try the patch and see if it fixes your issue? Also marking this as an 0.8.2 blocker.
Re: No longer supporting Java 6, if? when?
Java 6 has been End of Life since Feb 2013. Java 7 (and 8, but unfortunately that's too new still) has very compelling features which can make development a lot easier. The sooner more projects drop Java 6 the better, in my opinion :) On Nov 5, 2014, at 7:45 PM, Worthy LaFollette wort...@gmail.com wrote: Mostly converted now to 1.7, this would be welcomed to get any new features. On Wed Nov 05 2014 at 7:32:55 PM Joe Stein joe.st...@stealth.ly wrote: This has been coming up in a lot of projects and for other reasons too I wanted to kick off the discussion about if/when we end support for Java 6. Besides any API we may want to use in >= 7 we also compile our binaries for 6 for release currently. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop /
[jira] [Commented] (KAFKA-1723) make the metrics name in new producer more standard
[ https://issues.apache.org/jira/browse/KAFKA-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14200536#comment-14200536 ] Otis Gospodnetic commented on KAFKA-1723: I don't follow this 100%, [~junrao]. The old producer should be all set once KAFKA-1481 is committed - all (not just producer) mbean names will be consistent and easily parseable. I think there are really 2 parts here:

1. mbean naming consistency
2. existence of mbean names

For 1) I think you are saying that when this new producer is released in 0.8.3 whoever works on it will make sure the mbean names are consistent with the naming we got going via KAFKA-1481. But what about 2)? Are you saying this new producer will have some *new mbeans*? Some *additional mbeans*? Or are some mbeans that exist in 0.8.2 going to be *removed*?

make the metrics name in new producer more standard
Key: KAFKA-1723
URL: https://issues.apache.org/jira/browse/KAFKA-1723
Project: Kafka
Issue Type: Sub-task
Components: clients
Affects Versions: 0.8.2
Reporter: Jun Rao
Fix For: 0.8.3

The jmx name in the new producer looks like the following: kafka.producer.myclientid:type=mytopic However, this can be ambiguous since we allow . in client id and topic.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
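Jun's ambiguity point is easy to demonstrate: because '.' is legal in both client ids and topic names, any tool that parses the flat name kafka.producer.<clientId>:type=<topic> by splitting on dots can mis-attribute metrics. A minimal sketch (the client id web.frontend is hypothetical, not from the ticket):

```java
import javax.management.ObjectName;

public class MBeanNameAmbiguity {
    public static void main(String[] args) throws Exception {
        // The new producer's flat naming scheme: kafka.producer.<clientId>:type=<topic>
        String clientId = "web.frontend";  // dots are allowed in client ids
        ObjectName name = new ObjectName("kafka.producer." + clientId + ":type=mytopic");

        // A naive parser that assumes the client id is the last dot-separated
        // segment of the domain recovers the wrong value.
        String domain = name.getDomain();  // "kafka.producer.web.frontend"
        String parsed = domain.substring(domain.lastIndexOf('.') + 1);

        System.out.println(parsed);        // prints "frontend", not "web.frontend"
    }
}
```

Escaping or restricting the separator (what KAFKA-1481 standardized) removes the guesswork for such parsers.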
Re: No longer supporting Java 6, if? when?
+1 for dropping Java 6

On Thu, Nov 6, 2014 at 9:31 AM, Steven Schlansker sschlans...@opentable.com wrote: Java 6 has been End of Life since Feb 2013. Java 7 (and 8, but unfortunately that's too new still) has very compelling features which can make development a lot easier. The sooner more projects drop Java 6 the better, in my opinion :)
Review Request 27691: Patch for KAFKA-1476
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27691/
---

Review request for kafka.

Bugs: KAFKA-1476 https://issues.apache.org/jira/browse/KAFKA-1476
Repository: kafka

Description
---
KAFKA-328 Write unit test for kafka server startup and shutdown API

Diffs
- core/src/main/scala/kafka/tools/ConsumerCommand.scala PRE-CREATION
- core/src/main/scala/kafka/utils/ZkUtils.scala 56e3e88e0cc6d917b0ffd1254e173295c1c4aabd
- core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 1bfb501b2f29c50f3fc5f930fdaad02e03b91e4f
- core/src/test/scala/unit/kafka/server/ServerStartupTest.scala a0ed4855f2550a0eb2e363dd2fccd8377a9ac172

Diff: https://reviews.apache.org/r/27691/diff/

Testing
---

Thanks, Balaji Seshadri
[jira] [Updated] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BalajiSeshadri updated KAFKA-1476: Attachment: KAFKA-1476.patch

Get a list of consumer groups
Key: KAFKA-1476
URL: https://issues.apache.org/jira/browse/KAFKA-1476
Project: Kafka
Issue Type: Wish
Components: tools
Affects Versions: 0.8.1.1
Reporter: Ryan Williams
Assignee: BalajiSeshadri
Labels: newbie
Fix For: 0.9.0
Attachments: ConsumerCommand.scala, KAFKA-1476-LIST-GROUPS.patch, KAFKA-1476-RENAME.patch, KAFKA-1476-REVIEW-COMMENTS.patch, KAFKA-1476.patch, KAFKA-1476.patch

It would be useful to have a way to get a list of consumer groups currently active via some tool/script that ships with kafka. This would be helpful so that the system tools can be explored more easily. For example, when running the ConsumerOffsetChecker, it requires a group option: bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group ? But when just getting started with kafka, using the console producer and consumer, it is not clear what value to use for the group option. If a list of consumer groups could be listed, then it would be clear what value to use.

Background: http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e
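For context, the 0.8.x high-level consumer registers each group as a child znode of /consumers, so a listing tool mostly just reads those children. A toy sketch of that idea, with an in-memory map standing in for a live ZooKeeper client (the paths and group names shown are made up for illustration):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class ConsumerGroupListing {
    // Groups are the children of the /consumers path; a real tool would fetch
    // them with a ZooKeeper client rather than this in-memory stand-in.
    static List<String> listGroups(Map<String, List<String>> zkChildren) {
        List<String> groups = new ArrayList<>(
                zkChildren.getOrDefault("/consumers", Collections.emptyList()));
        Collections.sort(groups);
        return groups;
    }

    public static void main(String[] args) {
        Map<String, List<String>> zk = Map.of(
                "/consumers", List.of("console-consumer-1234", "my-group"),
                "/brokers", List.of("ids", "topics"));
        System.out.println(listGroups(zk));  // [console-consumer-1234, my-group]
    }
}
```

This is what the attached ConsumerCommand.scala patch does via ZkUtils; the sketch only shows the shape of the lookup, not Kafka's actual API.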
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14200550#comment-14200550 ] BalajiSeshadri commented on KAFKA-1476: Created reviewboard https://reviews.apache.org/r/27691/diff/ against branch origin/trunk
[jira] [Updated] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BalajiSeshadri updated KAFKA-1476: Status: Patch Available (was: In Progress)
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14200556#comment-14200556 ] BalajiSeshadri commented on KAFKA-1476: [~nehanarkhede] or [~junrao] or [~jkreps] Can any one of you please review this? I created review boards for both of my patches, KAFKA-328 and KAFKA-1476. https://reviews.apache.org/r/27691/diff/
Review Request 27693: Patch for KAFKA-1476
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27693/
---

Review request for kafka.

Bugs: KAFKA-1476 https://issues.apache.org/jira/browse/KAFKA-1476
Repository: kafka

Description
---
KAFKA-1476 Get list of consumer groups

Diffs
- core/src/main/scala/kafka/tools/ConsumerCommand.scala PRE-CREATION
- core/src/main/scala/kafka/utils/ZkUtils.scala 56e3e88e0cc6d917b0ffd1254e173295c1c4aabd

Diff: https://reviews.apache.org/r/27693/diff/

Testing
---

Thanks, Balaji Seshadri
[jira] [Updated] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BalajiSeshadri updated KAFKA-1476: Attachment: KAFKA-1476.patch
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14200570#comment-14200570 ] BalajiSeshadri commented on KAFKA-1476: Created reviewboard https://reviews.apache.org/r/27693/diff/ against branch origin/trunk
[jira] [Commented] (KAFKA-1476) Get a list of consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14200575#comment-14200575 ] BalajiSeshadri commented on KAFKA-1476: Created review board for just this JIRA. [~nehanarkhede] or [~junrao] or [~jkreps] Please review. https://reviews.apache.org/r/27693/diff/
Announcing Confluent
Hey all, I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a company around Kafka called Confluent. We are planning on productizing the kind of Kafka-based real-time data platform we built out at LinkedIn. We are doing this because we think this is a really powerful idea and we felt there was a lot to do to make this idea really take root. We wanted to make that our full time mission and focus.

There is a blog post that goes into a little more depth here: http://blog.confluent.io/

LinkedIn will remain a heavy Kafka user and contributor. Combined with our additional resources from the funding of the company this should be a really good thing for the Kafka development effort, especially when combined with the increasing contributions from the rest of the development community. This is great news, as there is a lot of work to do. We'll need to really focus on scaling this distributed development in a healthy way.

One thing I do want to emphasize is that the addition of a company in the Kafka ecosystem won’t mean meddling with open source. Kafka will remain 100% open source and community focused, as of course is true of any Apache project. I have been doing open source for a long time and strongly believe it is the right model for infrastructure software development.

Confluent is just getting off the ground now. We left LinkedIn, raised some money, and we have an office (but no furniture yet!). Nonetheless, if you are interested in finding out more about the company and either getting help with your Kafka usage or joining us to help build all this, by all means reach out to us, we’d love to talk.

Wish us luck! -Jay
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14200603#comment-14200603 ] schandr commented on KAFKA-1738: Great! And thank you for the patch. How should we apply this patch? Are there any instructions on how to apply it?

Partitions for topic not created after restart from forced shutdown
Key: KAFKA-1738
URL: https://issues.apache.org/jira/browse/KAFKA-1738
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.2
Environment: Linux, 2GB RAM, 2 Core CPU
Reporter: Pradeep
Assignee: Jun Rao
Priority: Blocker
Fix For: 0.8.2
Attachments: 1738.zip, ServerLogForFailedTopicCreation.txt, ServerLogForFailedTopicCreation.txt, ServerLogForSuccessfulTopicCreation.txt, kafka-1738.patch

We are using the Kafka topic APIs to create topics. In some cases the topic gets created, but we don't see the partition-specific files, and when a producer/consumer tries to get the topic metadata it fails with an exception. The same happens if one tries to create the topic from the command line.

k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException

Steps to reproduce:
1. Stop Kafka using kill -9 PID of Kafka.
2. Start Kafka.
3. Create a topic with partition and replication factor of 1.
4. Check the response “Created topic topic_name”.
5. Run the list command to verify it is created.
6. Now check the data directory of Kafka. There would not be any files for the newly created topic.

We see issues when we are creating new topics. This happens randomly and we don't know the exact reasons. We see the below logs in the controller at the time of creation of topics that don't have the partition files.
[2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController)
[2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)] (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [JobJTopic,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine)
[2014-11-03 13:12:50,667] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-11-03 13:12:50,794] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:DMIPVM,port:9092 (kafka.controller.RequestSendThread)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
	at kafka.utils.Utils$.read(Utils.scala:381)
	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
	at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2014-11-03 13:12:50,965] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:43;ClientId:id_0-host_null-port_9092;AliveBrokers:id:0,host:DMIPVM,port:9092;PartitionState:[JobJTopic,0] - (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:DMIPVM,port:9092.
Re: No longer supporting Java 6, if? when?
Java 6 is supported on CDH4 but not CDH5.

On Thu, Nov 6, 2014 at 9:54 AM, Koert Kuipers ko...@tresata.com wrote: when is java 6 dropped by the hadoop distros? i am still aware of many clusters that are java 6 only at the moment.

On Thu, Nov 6, 2014 at 12:44 PM, Gwen Shapira gshap...@cloudera.com wrote: +1 for dropping Java 6
Re: No longer supporting Java 6, if? when?
Yeah it is a little bit silly that people are still using Java 6. I guess this is a tradeoff--being more conservative in our java support means more people can use our software, whereas upgrading gives us developers a better experience since we aren't stuck with ancient stuff.

Nonetheless I would argue for being a bit conservative here. Sadly a shocking number of people are still using Java 6. The Kafka clients get embedded in applications all over the place, and likely having even one application not yet upgraded would block adopting the new Kafka version that dropped java 6 support. So unless there is something in Java 7 we really really want I think it might be good to hold out a bit.

As an example we dropped java 6 support in Samza and immediately had people blocked by that, and unlike the Kafka clients, Samza use is pretty centralized.

-Jay

On Wed, Nov 5, 2014 at 5:32 PM, Joe Stein joe.st...@stealth.ly wrote: This has been coming up in a lot of projects and for other reasons too I wanted to kick off the discussion about if/when we end support for Java 6.
Re: Announcing Confluent
Hi Guys, Thanks for your awesome support. I wish you good luck!! Thanks for open sourcing Kafka!! Thanks, Bhavesh

On Thu, Nov 6, 2014 at 10:52 AM, Rajasekar Elango rela...@salesforce.com wrote: Congrats. Wish you all the very best and success. Thanks, Raja.

On Thu, Nov 6, 2014 at 1:36 PM, Niek Sanders niek.sand...@gmail.com wrote: Congrats!

On Thu, Nov 6, 2014 at 10:28 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey all, I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a company around Kafka called Confluent.

-- Thanks, Raja.
Re: Review Request 27690: Patch for kafka-1738
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27690/#review60215
---

Ship it!

- Neha Narkhede

On Nov. 6, 2014, 5:34 p.m., Jun Rao wrote:

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27690/
---

(Updated Nov. 6, 2014, 5:34 p.m.)

Review request for kafka.

Bugs: kafka-1738 https://issues.apache.org/jira/browse/kafka-1738
Repository: kafka

Description
---
try/catch should include channel.receive()

Diffs
- core/src/main/scala/kafka/controller/ControllerChannelManager.scala ecbfa0f328ba6a652a758ab20cacef324a8b2fb8

Diff: https://reviews.apache.org/r/27690/diff/

Testing
---

Thanks, Jun Rao
Re: Review Request 27690: Patch for kafka-1738
On Nov. 6, 2014, 7:20 p.m., Neha Narkhede wrote: Ship It!

Minor nit: Could you change fails to failed?

- Neha

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27690/#review60215
---
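The one-line description of the patch - "try/catch should include channel.receive()" - captures the whole bug: only the send was guarded, so an EOFException thrown while reading the response escaped the send thread's work loop and no reconnect happened. A hedged sketch of the pattern (the Channel interface and retry policy here are illustrative, not Kafka's actual ControllerChannelManager API):

```java
import java.io.IOException;

public class GuardedSendReceive {
    interface Channel {
        void send(String request) throws IOException;
        String receive() throws IOException;
        void reconnect();
    }

    // Both send() and receive() sit inside the same try/catch, so a socket
    // closed between the write and the read triggers a reconnect-and-retry
    // instead of propagating out of the work loop.
    static String sendAndReceive(Channel ch, String request, int maxRetries) throws IOException {
        IOException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                ch.send(request);
                return ch.receive();   // covered by the same guard as send()
            } catch (IOException e) {
                last = e;
                ch.reconnect();        // broken socket: reconnect, then retry
            }
        }
        throw last;
    }

    public static void main(String[] args) throws IOException {
        // A fake channel whose first receive() fails, as if the broker closed
        // the socket mid-exchange; the retry after reconnect() succeeds.
        Channel flaky = new Channel() {
            int receives = 0;
            public void send(String request) {}
            public String receive() throws IOException {
                if (receives++ == 0) throw new IOException("Received -1, socket closed");
                return "ok";
            }
            public void reconnect() {}
        };
        System.out.println(sendAndReceive(flaky, "UpdateMetadataRequest", 2));  // ok
    }
}
```

With the narrower guard of the pre-patch code, the fake channel's first receive() would have thrown straight out of sendAndReceive(), which is exactly the "failed to send request ... EOFException" symptom in the attached logs.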
[jira] [Updated] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1738: Reviewer: Neha Narkhede
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14200708#comment-14200708 ] Neha Narkhede commented on KAFKA-1738: -- Good catch. Thanks for following up on this, Jun. Reviewed the patch, looks good. Partitions for topic not created after restart from forced shutdown --- Key: KAFKA-1738 URL: https://issues.apache.org/jira/browse/KAFKA-1738 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Environment: Linux, 2GB RAM, 2 Core CPU Reporter: Pradeep Assignee: Jun Rao Priority: Blocker Fix For: 0.8.2 Attachments: 1738.zip, ServerLogForFailedTopicCreation.txt, ServerLogForFailedTopicCreation.txt, ServerLogForSuccessfulTopicCreation.txt, kafka-1738.patch We are using Kafka Topic APIs to create the topic. But in some cases, the topic gets created but we don't see the partition-specific files, and when a producer/consumer tries to get the topic metadata, it fails with an exception. The same happens if one tries to create it using the command line. k.p.BrokerPartitionInfo - Error while fetching metadata [{TopicMetadata for topic tloader1 - No partition metadata for topic tloader1 due to kafka.common.UnknownTopicOrPartitionException}] for topic [tloader1]: class kafka.common.UnknownTopicOrPartitionException Steps to reproduce - 1. Stop Kafka using kill -9 PID of Kafka 2. Start Kafka 3. Create a Topic with a partition and replication factor of 1. 4. Check the response “Created topic topic_name” 5. Run the list command to verify it is created. 6. Now check the data directory of Kafka. There would not be any directory for the newly created topic. We see issues when we are creating new topics. This happens randomly and we don't know the exact reasons. We see the below logs in the controller during the creation of topics which don't have the partition files.
2014-11-03 13:12:50,625] INFO [Controller 0]: New topic creation callback for [JobJTopic,0] (kafka.controller.KafkaController) [2014-11-03 13:12:50,626] INFO [Controller 0]: New partition creation callback for [JobJTopic,0] (kafka.controller.KafkaController) [2014-11-03 13:12:50,626] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,653] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-11-03 13:12:50,654] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [JobJTopic,0] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [JobJTopic,0] are: [List(0)] (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,654] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [JobJTopic,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine) [2014-11-03 13:12:50,667] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=JobJTopic,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2014-11-03 13:12:50,794] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:DMIPVM,port:9092 (kafka.controller.RequestSendThread) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 
at kafka.utils.Utils$.read(Utils.scala:381) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [2014-11-03 13:12:50,965] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:43;ClientId:id_0-host_null-port_9092;AliveBrokers:id:0,host:DMIPVM,port:9092;PartitionState:[JobJTopic,0] - (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:DMIPVM,port:9092. Reconnecting to broker.
[jira] [Commented] (KAFKA-1738) Partitions for topic not created after restart from forced shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14200748#comment-14200748 ] schandr commented on KAFKA-1738: Will apply the patch against the 0.8.2-beta and post the update. Partitions for topic not created after restart from forced shutdown --- Key: KAFKA-1738 URL: https://issues.apache.org/jira/browse/KAFKA-1738 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Environment: Linux, 2GB RAM, 2 Core CPU Reporter: Pradeep Assignee: Jun Rao Priority: Blocker Fix For: 0.8.2 Attachments: 1738.zip, ServerLogForFailedTopicCreation.txt, ServerLogForFailedTopicCreation.txt, ServerLogForSuccessfulTopicCreation.txt, kafka-1738.patch
[jira] [Issue Comment Deleted] (KAFKA-1754) KOYA - Kafka on YARN
[ https://issues.apache.org/jira/browse/KAFKA-1754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Weise updated KAFKA-1754: Comment: was deleted (was: [~acmurthy] Thanks for the support. Disk reservation will benefit Kafka greatly. Will reach out with a few other questions. ) KOYA - Kafka on YARN Key: KAFKA-1754 URL: https://issues.apache.org/jira/browse/KAFKA-1754 Project: Kafka Issue Type: New Feature Reporter: Thomas Weise Attachments: DT-KOYA-Proposal- JIRA.pdf YARN (Hadoop 2.x) has enabled clusters to be used for a variety of workloads, emerging as a distributed operating system for big data applications. Initiatives are under way to bring long-running services under the YARN umbrella, leveraging it for centralized resource management and operations ([YARN-896] and examples such as HBase, Accumulo or Memcached through Slider). This JIRA is to propose KOYA (Kafka On Yarn), a YARN application master to launch and manage Kafka clusters running on YARN. Brokers will use resources allocated through YARN with support for recovery, monitoring etc. Please see attached for more details. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Announcing Confluent
Jay, Neha and Jun congratz!! On Thu, Nov 6, 2014 at 11:09 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: HI Guys, Thanks for your awesome support. I wish you good luck !! Thanks for open sources Kafka !! Thanks, Bhavesh On Thu, Nov 6, 2014 at 10:52 AM, Rajasekar Elango rela...@salesforce.com wrote: Congrats. Wish you all the very best and success. Thanks, Raja. On Thu, Nov 6, 2014 at 1:36 PM, Niek Sanders niek.sand...@gmail.com wrote: Congrats! On Thu, Nov 6, 2014 at 10:28 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey all, I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a company around Kafka called Confluent. We are planning on productizing the kind of Kafka-based real-time data platform we built out at LinkedIn. We are doing this because we think this is a really powerful idea and we felt there was a lot to do to make this idea really take root. We wanted to make that our full time mission and focus. There is a blog post that goes into a little more depth here: http://blog.confluent.io/ LinkedIn will remain a heavy Kafka user and contributor. Combined with our additional resources from the funding of the company this should be a really good thing for the Kafka development effort. Especially when combined with the increasing contributions from the rest of the development community. This is great news, as there is a lot of work to do. We'll need to really focus on scaling this distributed development in a healthy way. One thing I do want to emphasize is that the addition of a company in the Kafka ecosystem won’t mean meddling with open source. Kafka will remain 100% open source and community focused, as of course is true of any Apache project. I have been doing open source for a long time and strongly believe it is the right model for infrastructure software development. Confluent is just getting off the ground now. We left LinkedIn, raised some money, and we have an office (but no furniture yet!). 
Nonetheless, if you are interested in finding out more about the company and either getting help with your Kafka usage or joining us to help build all this, by all means reach out to us, we’d love to talk. Wish us luck! -Jay -- Thanks, Raja.
Re: Announcing Confluent
Best of luck!!! J On 6 Nov 2014, at 18:28, Jay Kreps jay.kr...@gmail.com wrote: Hey all, I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a company around Kafka called Confluent. We are planning on productizing the kind of Kafka-based real-time data platform we built out at LinkedIn. [...] Wish us luck! -Jay
[jira] [Updated] (KAFKA-1757) Can not delete Topic index on Windows
[ https://issues.apache.org/jira/browse/KAFKA-1757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1757: - Reviewer: Jay Kreps Assignee: (was: Jay Kreps) Can not delete Topic index on Windows - Key: KAFKA-1757 URL: https://issues.apache.org/jira/browse/KAFKA-1757 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.8.2 Reporter: Lukáš Vyhlídka Priority: Minor Fix For: 0.8.2 Attachments: lucky-v.patch When running the Kafka 0.8.2-Beta (Scala 2.10) on Windows, an attempt to delete the Topic threw an error: ERROR [KafkaApi-1] error when handling request Name: StopReplicaRequest; Version: 0; CorrelationId: 38; ClientId: ; DeletePartitions: true; ControllerId: 0; ControllerEpoch: 3; Partitions: [test,0] (kafka.server.KafkaApis) kafka.common.KafkaStorageException: Delete of index .index failed. at kafka.log.LogSegment.delete(LogSegment.scala:283) at kafka.log.Log$$anonfun$delete$1.apply(Log.scala:608) at kafka.log.Log$$anonfun$delete$1.apply(Log.scala:608) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at kafka.log.Log.delete(Log.scala:608) at kafka.log.LogManager.deleteLog(LogManager.scala:375) at kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:144) at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:139) at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:139) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.utils.Utils$.inWriteLock(Utils.scala:543) at kafka.cluster.Partition.delete(Partition.scala:139) at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:158) at kafka.server.ReplicaManager$$anonfun$stopReplicas$3.apply(ReplicaManager.scala:191) at
kafka.server.ReplicaManager$$anonfun$stopReplicas$3.apply(ReplicaManager.scala:190) at scala.collection.immutable.Set$Set1.foreach(Set.scala:74) at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:190) at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:96) at kafka.server.KafkaApis.handle(KafkaApis.scala:59) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59) at java.lang.Thread.run(Thread.java:744) When I investigated the issue I figured out that the index file (in my environment it was C:\tmp\kafka-logs\----0014-0\.index) was locked by the Kafka process, and the OS did not allow that file to be deleted. I tried to fix the problem in the source code, and when I added a close() method call into LogSegment.delete(), the Topic deletion started to work. I will add here (not sure how to upload the file during issue creation) a diff with the changes I have made, so you can take a look at whether it is reasonable or not. It would be perfect if it could make it into the product... In the end I would like to say that on Linux the deletion works just fine... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
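The root cause described above is a general Windows behavior: a file that is still held open (here, the topic's index, which Kafka also memory-maps) cannot be deleted. The sketch below illustrates the close-before-delete idea only; the class and method names are made up for the example and are not Kafka's actual LogSegment code.

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class CloseBeforeDelete {

    // Create a small "index" file, memory-map it, write to it, then close
    // the channel before deleting. On Windows, deleting while the channel
    // is still open fails (the kind of error quoted in this ticket); on
    // Linux an open handle does not block deletion, which is why the
    // failure only showed up on Windows. Note that the live mapping itself
    // can also pin the file on Windows until it is unmapped, so the real
    // fix can be more involved than a single close() call.
    static boolean deleteAfterClose() {
        try {
            File index = File.createTempFile("segment", ".index");
            RandomAccessFile raf = new RandomAccessFile(index, "rw");
            FileChannel channel = raf.getChannel();
            MappedByteBuffer mmap = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024);
            mmap.putInt(0, 42);  // simulate an index entry
            channel.close();     // the essence of the patch: close before delete
            raf.close();
            return index.delete();
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("deleted=" + deleteAfterClose());
    }
}
```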
[jira] [Commented] (KAFKA-1654) Provide a way to override server configuration from command line
[ https://issues.apache.org/jira/browse/KAFKA-1654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201096#comment-14201096 ] Jarek Jarcec Cecho commented on KAFKA-1654: --- Looking at the feedback proposal more closely [~nehanarkhede], I wanted to explain a bit of my thinking. My intention is to add the ability to specify command line arguments when starting the Kafka broker, similar to what we already have for the command line tools. In this patch I'm adding only one argument ({{--set}}, which will be renamed to {{--override}} as you've suggested) that happens to have the ability to be specified multiple times. That explains why I'm expecting you to specify it for every property that is being overridden, and also why I have the [kafka options] section without mentioning that you have to use {{--set}} (you might want to add a different parameter in the future, like a completely bogus {{--disable-jmx}} or whatever :)). Perhaps I should have also submitted a second documentation patch to cover the usage. Does the reasoning resonate with you, or would you prefer to simply add the ability to override the config properties without opening the code to introduce additional command line arguments in the future? I'll definitely address the fact that I'm not properly failing on error cases; that is a huge problem in my mind - thank you for uncovering it! Provide a way to override server configuration from command line Key: KAFKA-1654 URL: https://issues.apache.org/jira/browse/KAFKA-1654 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.1.1 Reporter: Jarek Jarcec Cecho Assignee: Jarek Jarcec Cecho Fix For: 0.8.3 Attachments: KAFKA-1654.patch I've been recently playing with Kafka and I found the current way of server configuration quite inflexible. All the configuration options have to be inside a properties file and there is no way to override them for a single execution.
In order to temporarily change one property I had to copy the config file and change the property there. Hence, I'm wondering if people would be open to providing a way to specify and override the configs from the command line when starting Kafka? Something like: {code} ./bin/kafka-server-start.sh -Dmy.cool.property=X kafka.properties {code} or {code} ./bin/kafka-server-start.sh --set my.cool.property=X kafka.properties {code} I'm more than happy to take a stab at it, but I would like to see if there is an interest for such capability? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
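As a sketch of what the proposal amounts to: collect every repeated override flag, apply it on top of the properties file, and fail loudly on malformed input (the error-handling point raised in the review). The flag name and parsing below are illustrative only, not the patch's actual code.

```java
import java.util.Properties;

public class OverrideArgs {

    // Apply repeated "--override key=value" arguments on top of the base
    // properties; later overrides win. Malformed pairs fail fast rather
    // than being silently ignored.
    static Properties withOverrides(Properties base, String[] args) {
        Properties merged = new Properties();
        merged.putAll(base);
        for (int i = 0; i < args.length; i++) {
            if (!args[i].equals("--override")) {
                continue;
            }
            if (i + 1 >= args.length) {
                throw new IllegalArgumentException("--override requires key=value");
            }
            String kv = args[++i];
            int eq = kv.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Malformed override: " + kv);
            }
            merged.setProperty(kv.substring(0, eq), kv.substring(eq + 1));
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("log.dirs", "/tmp/kafka-logs"); // from the properties file
        Properties p = withOverrides(base, new String[] {
                "--override", "log.dirs=/data/kafka",
                "--override", "num.io.threads=16"});
        System.out.println(p.getProperty("log.dirs") + " " + p.getProperty("num.io.threads"));
    }
}
```

Keeping the overrides as a repeatable flag (rather than bare `-Dkey=value` pairs) leaves room to add unrelated flags later, which is the extensibility argument made in the comment above.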
[jira] [Comment Edited] (KAFKA-1654) Provide a way to override server configuration from command line
[ https://issues.apache.org/jira/browse/KAFKA-1654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201096#comment-14201096 ] Jarek Jarcec Cecho edited comment on KAFKA-1654 at 11/6/14 10:42 PM: - Looking at the feedback proposal more closely [~nehanarkhede], I wanted to explain a bit of my thinking. My intention is to add the ability to specify command line arguments when starting the Kafka broker, similar to what we already have for the command line tools. In this patch I'm adding only one argument ({{\-\-set}}, which will be renamed to {{\-\-override}} as you've suggested) that happens to have the ability to be specified multiple times. That explains why I'm expecting you to specify it for every property that is being overridden, and also why I have the [kafka options] section without mentioning that you have to use {{\-\-set}} (you might want to add a different parameter in the future, like a completely bogus {{\-\-disable\-jmx}} or whatever :)). Perhaps I should have also submitted a second documentation patch to cover the usage. Does the reasoning resonate with you, or would you prefer to simply add the ability to override the config properties without opening the code to introduce additional command line arguments in the future? I'll definitely address the fact that I'm not properly failing on error cases; that is a huge problem in my mind - thank you for uncovering it! was (Author: jarcec): Looking at feedback proposal more closely [~nehanarkhede], I wanted to explain a bit of my thinking. Provide a way to override server configuration from command line Key: KAFKA-1654 URL: https://issues.apache.org/jira/browse/KAFKA-1654 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.1.1 Reporter: Jarek Jarcec Cecho Assignee: Jarek Jarcec Cecho Fix For: 0.8.3 Attachments: KAFKA-1654.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1723) make the metrics name in new producer more standard
[ https://issues.apache.org/jira/browse/KAFKA-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201138#comment-14201138 ] Jun Rao commented on KAFKA-1723: [~otis], let me explain that a bit more. Historically, we have been using metrics-core to expose mbeans in both the client and the server. When we added the new java producer, we decided to use a metrics package of our own (org.apache.kafka.common.metrics) to simplify the jar dependency. The new java producer exposes a bunch of new mbeans (completely different from the old producer) using our own metrics. The naming of those mbeans is in the description of the jira and is not completely consistent with those created by metrics-core (after kafka-1481 is fixed). This jira is intended to make the mbean names created by our own metrics consistent with what's in metrics-core. Longer term, our thinking is to eventually use our own metrics for the server-side mbeans too. make the metrics name in new producer more standard --- Key: KAFKA-1723 URL: https://issues.apache.org/jira/browse/KAFKA-1723 Project: Kafka Issue Type: Sub-task Components: clients Affects Versions: 0.8.2 Reporter: Jun Rao Fix For: 0.8.3 The jmx name in the new producer looks like the following: kafka.producer.myclientid:type=mytopic However, this can be ambiguous since we allow . in client id and topic. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
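The ambiguity mentioned in the issue comes from splicing free-form client ids and topics (which may contain `.`) directly into a dotted JMX name. The standard JMX remedy, shown here as an illustrative sketch rather than the naming Kafka actually adopted, is to carry the values as quoted key properties so they round-trip unambiguously:

```java
import javax.management.ObjectName;

public class QuotedMBeanNames {

    // Build an ObjectName carrying the client id and topic as quoted key
    // properties, so '.' (or ',', '=', ':') inside the values cannot be
    // confused with the name's own structure. The domain and keys here
    // are hypothetical, not Kafka's actual mbean layout.
    static ObjectName producerMetricName(String clientId, String topic) {
        try {
            return new ObjectName("kafka.producer:type=producer-topic-metrics"
                    + ",client-id=" + ObjectName.quote(clientId)
                    + ",topic=" + ObjectName.quote(topic));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Recover the original client id from a name built above.
    static String clientIdOf(ObjectName name) {
        return ObjectName.unquote(name.getKeyProperty("client-id"));
    }

    public static void main(String[] args) {
        ObjectName n = producerMetricName("my.client.id", "my.topic");
        System.out.println(n);
        System.out.println(clientIdOf(n)); // dots survive intact
    }
}
```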
[jira] [Commented] (KAFKA-1753) add --decommission-broker option
[ https://issues.apache.org/jira/browse/KAFKA-1753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201162#comment-14201162 ] Neha Narkhede commented on KAFKA-1753: -- +1 on decommission-broker add --decommission-broker option Key: KAFKA-1753 URL: https://issues.apache.org/jira/browse/KAFKA-1753 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1753) add --decommission-broker option
[ https://issues.apache.org/jira/browse/KAFKA-1753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1753: - Reviewer: Neha Narkhede add --decommission-broker option Key: KAFKA-1753 URL: https://issues.apache.org/jira/browse/KAFKA-1753 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 6, 2014, 11:35 p.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs (updated) - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
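The gist of the change under review — one request-level retention period in milliseconds instead of trusting client-supplied per-offset timestamps — reduces server-side expiry to a single comparison against the broker's own commit timestamp. A minimal sketch; the names are illustrative, not Kafka's:

```java
public class OffsetExpiry {

    // An offset commit expires once the broker-recorded commit time plus
    // the request's global retention period has passed. The broker stamps
    // commitTimestampMs itself, so client clock skew is irrelevant.
    static boolean isExpired(long commitTimestampMs, long retentionMs, long nowMs) {
        return nowMs - commitTimestampMs > retentionMs;
    }

    public static void main(String[] args) {
        long committedAt = 1_000_000L;
        long oneDayMs = 24L * 60 * 60 * 1000;
        System.out.println(isExpired(committedAt, oneDayMs, committedAt + oneDayMs + 1)); // just past retention
        System.out.println(isExpired(committedAt, oneDayMs, committedAt + 1));            // well within retention
    }
}
```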
[jira] [Commented] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation
[ https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201211#comment-14201211 ] Guozhang Wang commented on KAFKA-1634: -- Updated reviewboard https://reviews.apache.org/r/27391/diff/ against branch origin/trunk Improve semantics of timestamp in OffsetCommitRequests and update documentation --- Key: KAFKA-1634 URL: https://issues.apache.org/jira/browse/KAFKA-1634 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Assignee: Guozhang Wang Priority: Blocker Fix For: 0.8.3 Attachments: KAFKA-1634.patch, KAFKA-1634_2014-11-06_15:35:46.patch From the mailing list - following up on this -- I think the online API docs for OffsetCommitRequest still incorrectly refer to client-side timestamps: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest Wasn't that removed, and isn't it always handled server-side now? Would one of the devs mind updating the API spec wiki? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation
[ https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-1634: - Attachment: KAFKA-1634_2014-11-06_15:35:46.patch Improve semantics of timestamp in OffsetCommitRequests and update documentation --- Key: KAFKA-1634 URL: https://issues.apache.org/jira/browse/KAFKA-1634 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Assignee: Guozhang Wang Priority: Blocker Fix For: 0.8.3 Attachments: KAFKA-1634.patch, KAFKA-1634_2014-11-06_15:35:46.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1751) handle broker not exists scenario
[ https://issues.apache.org/jira/browse/KAFKA-1751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201243#comment-14201243 ] Neha Narkhede commented on KAFKA-1751: -- [~junrao] This is something I found while using the tool. See this https://issues.apache.org/jira/browse/KAFKA-1678?focusedCommentId=14178620page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14178620 handle broker not exists scenario --- Key: KAFKA-1751 URL: https://issues.apache.org/jira/browse/KAFKA-1751 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 Attachments: kafka-1751.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1752) add --replace-broker option
[ https://issues.apache.org/jira/browse/KAFKA-1752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201245#comment-14201245 ] Neha Narkhede commented on KAFKA-1752: -- +1 on [~gwenshap]'s suggestion. add --replace-broker option --- Key: KAFKA-1752 URL: https://issues.apache.org/jira/browse/KAFKA-1752 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-1481: -- Attachment: alternateLayout2.png alternateLayout1.png diff-for-alternate-layout1.patch diff-for-alternate-layout2.patch originalLayout.png Can you rebase? Sorry I know you have rebased a couple times already. Hopefully this should be the last time as these are minor comments. KafkaMetricsGroup: 64: foreach KafkaMetricsGroup: toMbeanName: 150/153: can you use filter { case (tagKey, tagValue) => ... } For aggregate topic metrics, since allTopics=true appears at the end it is a bit weird when browsing mbeans in jvisualvm/other tools. i.e., the mbean is listed as "true". I understand why - it is just a bit weird. I'm referring to (for example) kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,allTopics=true See the attached originalLayout.png Personally I prefer aggregate=true to allTopics=true. A further improvement with aggregate=true is the following: in KafkaMetricsGroup.metricName you can check in the tags map if aggregate=true. If so, then modify the typeName by pre-pending Aggregate to it and then strip off the aggregate=true tag. So you will end up with: kafka.server:type=BrokerTopicMetrics,name=AggregateBytesOutPerSec See alternateLayout1.png Another alternative is to modify the name (not the typeName). See alternateLayout2.png The aggregate=true approach seems generic enough to apply to any other all-topic, all-request, or all-broker level mbeans. What do you think?
Stop using dashes AND underscores as separators in MBean names -- Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Priority: Critical Labels: patch Fix For: 0.8.3 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, KAFKA-1481_2014-10-21_09-14-35.patch, KAFKA-1481_2014-10-30_21-35-43.patch, KAFKA-1481_2014-10-31_14-35-43.patch, KAFKA-1481_2014-11-03_16-39-41_doc.patch, KAFKA-1481_2014-11-03_17-02-23.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch, alternateLayout1.png, alternateLayout2.png, diff-for-alternate-layout1.patch, diff-for-alternate-layout2.patch, originalLayout.png MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBeans names making it impossible to parse out individual bits from MBeans. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW -- This message was sent by Atlassian JIRA (v6.3.4#6332)
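Joel's aggregate=true suggestion in the review comment above could be sketched roughly as follows. This is only an illustrative stand-in for the proposed change to KafkaMetricsGroup.metricName, not the actual patch; the class and method names here are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the proposed renaming rule: if the tags contain
// aggregate=true, prepend "Aggregate" to the metric name and strip the tag,
// so browsing tools show e.g.
// kafka.server:type=BrokerTopicMetrics,name=AggregateBytesOutPerSec
public class MBeanNameSketch {

    static String mbeanName(String group, String type, String name, Map<String, String> tags) {
        Map<String, String> effective = new LinkedHashMap<>(tags);
        String effectiveName = name;
        if ("true".equals(effective.get("aggregate"))) {
            effectiveName = "Aggregate" + name;  // fold the tag into the name
            effective.remove("aggregate");       // and drop it from the tag list
        }
        StringBuilder sb = new StringBuilder(group)
                .append(":type=").append(type)
                .append(",name=").append(effectiveName);
        for (Map.Entry<String, String> e : effective.entrySet()) {
            sb.append(',').append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("aggregate", "true");
        System.out.println(mbeanName("kafka.server", "BrokerTopicMetrics", "BytesOutPerSec", tags));
    }
}
```

The key design point from the review is that no tag value ever becomes the trailing component of the ObjectName, so jvisualvm no longer displays the mbean as "true".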
Re: Review Request 27534: Patch for KAFKA-1746
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27534/#review60276 --- Ship it! Ship It! - Neha Narkhede On Nov. 3, 2014, 7:46 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27534/ --- (Updated Nov. 3, 2014, 7:46 p.m.) Review request for kafka. Bugs: KAFKA-1746 https://issues.apache.org/jira/browse/KAFKA-1746 Repository: kafka Description --- KAFKA-1746 Make system tests return a useful exit code. KAFKA-1746 Check the exit code when running DumpLogSegments to verify data. Diffs - system_test/mirror_maker_testsuite/mirror_maker_test.py c0117c64cbb7687ca8fbcec6b5c188eb880300ef system_test/offset_management_testsuite/offset_management_test.py 12b5cd25140e1eb407dd57eef63d9783257688b2 system_test/replication_testsuite/replica_basic_test.py 660006cc253bbae3e7cd9f02601f1c1937dd1714 system_test/system_test_runner.py ee7aa252333553e8eb0bc046edf968ec99dddb70 system_test/utils/kafka_system_test_utils.py 1093b660ebd0cb5ab6d3731d26f151e1bf717f8a Diff: https://reviews.apache.org/r/27534/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-1755) Log cleaner thread should not exit on errors
[ https://issues.apache.org/jira/browse/KAFKA-1755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1755: - Labels: newbie++ (was: ) Log cleaner thread should not exit on errors Key: KAFKA-1755 URL: https://issues.apache.org/jira/browse/KAFKA-1755 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Labels: newbie++ Fix For: 0.8.3 The log cleaner is a critical process when using compacted topics. However, if there is any error in any topic (notably if a key is missing) then the cleaner exits and all other compacted topics will also be adversely affected - i.e., compaction stops across the board. This can be improved by just aborting compaction for a topic on any error and keep the thread from exiting. Another improvement would be to reject messages without keys that are sent to compacted topics although this is not enough by itself. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
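The improvement described in KAFKA-1755 above — abort compaction for the failing topic only and keep the cleaner thread alive — could look roughly like this. This is purely an illustrative sketch under that suggestion, not Kafka's actual LogCleaner code; the interface and class names are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch: one compaction sweep that catches per-partition errors,
// marks just that partition uncleanable, and continues with the rest.
public class CleanerSweepSketch {

    interface CompactableLog {
        String topicPartition();
        void compact() throws Exception;
    }

    // Returns the set of partitions whose compaction failed during this sweep.
    static Set<String> sweep(List<CompactableLog> logs) {
        Set<String> uncleanable = new LinkedHashSet<>();
        for (CompactableLog log : logs) {
            try {
                log.compact();
            } catch (Exception e) {
                // Abort compaction for this partition only; do not let the
                // error propagate and kill the shared cleaner thread, which
                // would stop compaction across the board.
                uncleanable.add(log.topicPartition());
            }
        }
        return uncleanable;
    }
}
```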
Re: Review Request 27535: Patch for KAFKA-1747
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27535/#review60279 --- Ship it! Ship It! - Neha Narkhede On Nov. 3, 2014, 7:46 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27535/ --- (Updated Nov. 3, 2014, 7:46 p.m.) Review request for kafka. Bugs: KAFKA-1747 https://issues.apache.org/jira/browse/KAFKA-1747 Repository: kafka Description --- KAKFA-1747 Fix TestcaseEnv so state isn't shared between instances. Diffs - system_test/utils/testcase_env.py b3c29105c04348f036efbbdc430e14e099ca8c70 Diff: https://reviews.apache.org/r/27535/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Commented] (KAFKA-1744) Fetch Response contains messages prior to the requested offset
[ https://issues.apache.org/jira/browse/KAFKA-1744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201365#comment-14201365 ] Neha Narkhede commented on KAFKA-1744: -- [~eapache] I'm assuming that you are referring to a non-java consumer right? We would always want to do this in the java consumer, so it's worth fixing the docs. Can you point me to the spec where you found this? Fetch Response contains messages prior to the requested offset -- Key: KAFKA-1744 URL: https://issues.apache.org/jira/browse/KAFKA-1744 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Evan Huus As reported in https://github.com/Shopify/sarama/issues/166 there are cases where a FetchRequest for a particular offset returns some messages prior to that offset. The spec does not seem to indicate that this is possible; it does state that As an optimization the server is allowed to return a partial message at the end of the message set. but otherwise implies that a request for offset X will only return complete messages starting at X. The scala consumer does seem to handle this case gracefully though, if I am reading it correctly (my scala is not the best): https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/ConsumerIterator.scala#L96-L99 So is this a bug or just a case that needs to be added to the spec? Something like As an optimization the server is allowed to return some messages in the message set prior to the requested offset. Clients should handle this case.? Although I can't imagine why sending extra data would be faster than only sending the necessary messages... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1744) Fetch Response contains messages prior to the requested offset
[ https://issues.apache.org/jira/browse/KAFKA-1744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14201374#comment-14201374 ] Evan Huus commented on KAFKA-1744: -- [~nehanarkhede] this was discovered in the golang consumer I maintain - the scala consumer (as I linked) seems to handle this case already. I have not checked the java consumer. The [spec for the fetch API|https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI] implies (though it does not explicitly state) that if I perform a fetch request for offset X, the fetch response will contain messages whose offset is strictly >= X. If this is not true (in practice I have seen messages with offsets < X) I would suggest explicitly noting this in the spec to avoid confusion. Alternatively it may be a real bug in the broker, in which case the spec is fine and the broker should be fixed. I don't have enough information to say for sure. Fetch Response contains messages prior to the requested offset -- Key: KAFKA-1744 URL: https://issues.apache.org/jira/browse/KAFKA-1744 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Evan Huus As reported in https://github.com/Shopify/sarama/issues/166 there are cases where a FetchRequest for a particular offset returns some messages prior to that offset. The spec does not seem to indicate that this is possible; it does state that "As an optimization the server is allowed to return a partial message at the end of the message set." but otherwise implies that a request for offset X will only return complete messages starting at X. The scala consumer does seem to handle this case gracefully though, if I am reading it correctly (my scala is not the best): https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/ConsumerIterator.scala#L96-L99 So is this a bug or just a case that needs to be added to the spec?
Something like "As an optimization the server is allowed to return some messages in the message set prior to the requested offset. Clients should handle this case."? Although I can't imagine why sending extra data would be faster than only sending the necessary messages... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-1741) consumer get always old messages
[ https://issues.apache.org/jira/browse/KAFKA-1741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede resolved KAFKA-1741. -- Resolution: Won't Fix consumer get always old messages Key: KAFKA-1741 URL: https://issues.apache.org/jira/browse/KAFKA-1741 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.1.1, 0.8.2 Reporter: hamza ezzi Assignee: Neha Narkhede every time when a consumer get a message, i have this error, and when i restart consumer i get old message knowing i specified in my consumer config to do not get old message my nodejs consumer code : var kafka = require('kafka-node'); var HighLevelConsumer = kafka.HighLevelConsumer; var Offset = kafka.Offset; var Client = kafka.Client; var argv = require('optimist').argv; var topic = argv.topic || 'sLNzXYHLJA'; var client = new Client('XXX.XXX.XXX:2181','consumer'+process.pid); var payloads = [{topic:topic}]; var options = { groupId: 'kafka-node-group', // Auto commit config autoCommit: true, autoCommitMsgCount: 100, autoCommitIntervalMs: 5000, // Fetch message config fetchMaxWaitMs: 100, fetchMinBytes: 1, fetchMaxBytes: 1024 * 10, fromOffset: false, fromBeginning: false }; var consumer = new HighLevelConsumer(client, payloads, options); var offset = new Offset(client); consumer.on('message', function (message) { console.log(this.id, message); }); consumer.on('error', function (err) { console.log('error', err); }); consumer.on('offsetOutOfRange', function (topic) { console.log(- offsetOutOfRange ); topic.maxNum = 2; offset.fetch([topic], function (err, offsets) { var min = Math.min.apply(null, offsets[topic.topic][topic.partition]); consumer.setOffset(topic.topic, topic.partition, min); }); }); error kafka log : [2014-10-31 17:13:32,173] ERROR Closing socket for /212.XXX.XXX.XXX because of error (kafka.network.Processor) java.nio.BufferUnderflowException at java.nio.Buffer.nextGetIndex(Buffer.java:498) at 
java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:406) at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:62) at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:58) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.Range.foreach(Range.scala:141) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:58) at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:55) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.immutable.Range.foreach(Range.scala:141) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at kafka.api.OffsetCommitRequest$.readFrom(OffsetCommitRequest.scala:55) at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47) at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47) at kafka.network.RequestChannel$Request.init(RequestChannel.scala:50) at kafka.network.Processor.read(SocketServer.scala:450) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1741) consumer get always old messages
[ https://issues.apache.org/jira/browse/KAFKA-1741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201391#comment-14201391 ] Neha Narkhede commented on KAFKA-1741: -- It seems like the format of the OffsetCommitRequest you are using is different from what the server expects. See the format explained here - https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest. Best if we take such questions to the mailing list though. consumer get always old messages Key: KAFKA-1741 URL: https://issues.apache.org/jira/browse/KAFKA-1741 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.1.1, 0.8.2 Reporter: hamza ezzi Assignee: Neha Narkhede every time when a consumer get a message, i have this error, and when i restart consumer i get old message knowing i specified in my consumer config to do not get old message my nodejs consumer code : var kafka = require('kafka-node'); var HighLevelConsumer = kafka.HighLevelConsumer; var Offset = kafka.Offset; var Client = kafka.Client; var argv = require('optimist').argv; var topic = argv.topic || 'sLNzXYHLJA'; var client = new Client('XXX.XXX.XXX:2181','consumer'+process.pid); var payloads = [{topic:topic}]; var options = { groupId: 'kafka-node-group', // Auto commit config autoCommit: true, autoCommitMsgCount: 100, autoCommitIntervalMs: 5000, // Fetch message config fetchMaxWaitMs: 100, fetchMinBytes: 1, fetchMaxBytes: 1024 * 10, fromOffset: false, fromBeginning: false }; var consumer = new HighLevelConsumer(client, payloads, options); var offset = new Offset(client); consumer.on('message', function (message) { console.log(this.id, message); }); consumer.on('error', function (err) { console.log('error', err); }); consumer.on('offsetOutOfRange', function (topic) { console.log(- offsetOutOfRange ); topic.maxNum = 2; offset.fetch([topic], function (err, offsets) { var min = Math.min.apply(null, 
offsets[topic.topic][topic.partition]); consumer.setOffset(topic.topic, topic.partition, min); }); }); error kafka log : [2014-10-31 17:13:32,173] ERROR Closing socket for /212.XXX.XXX.XXX because of error (kafka.network.Processor) java.nio.BufferUnderflowException at java.nio.Buffer.nextGetIndex(Buffer.java:498) at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:406) at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:62) at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:58) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.Range.foreach(Range.scala:141) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:58) at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:55) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.immutable.Range.foreach(Range.scala:141) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at kafka.api.OffsetCommitRequest$.readFrom(OffsetCommitRequest.scala:55) at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47) at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47) at kafka.network.RequestChannel$Request.init(RequestChannel.scala:50) at kafka.network.Processor.read(SocketServer.scala:450) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1744) Fetch Response contains messages prior to the requested offset
[ https://issues.apache.org/jira/browse/KAFKA-1744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201394#comment-14201394 ] Neha Narkhede commented on KAFKA-1744: -- The broker sends data to the consumer using zero-copy, so it cannot filter the extra messages out. The spec already says Clients should handle this case. Should we close this JIRA? Fetch Response contains messages prior to the requested offset -- Key: KAFKA-1744 URL: https://issues.apache.org/jira/browse/KAFKA-1744 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Evan Huus As reported in https://github.com/Shopify/sarama/issues/166 there are cases where a FetchRequest for a particular offset returns some messages prior to that offset. The spec does not seem to indicate that this is possible; it does state that As an optimization the server is allowed to return a partial message at the end of the message set. but otherwise implies that a request for offset X will only return complete messages starting at X. The scala consumer does seem to handle this case gracefully though, if I am reading it correctly (my scala is not the best): https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/ConsumerIterator.scala#L96-L99 So is this a bug or just a case that needs to be added to the spec? Something like As an optimization the server is allowed to return some messages in the message set prior to the requested offset. Clients should handle this case.? Although I can't imagine why sending extra data would be faster than only sending the necessary messages... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1695) Authenticate connection to Zookeeper
[ https://issues.apache.org/jira/browse/KAFKA-1695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201393#comment-14201393 ] Gwen Shapira commented on KAFKA-1695: - The pull request is in! I think it makes sense to open separate JIRA for upgrading zkclient? Authenticate connection to Zookeeper Key: KAFKA-1695 URL: https://issues.apache.org/jira/browse/KAFKA-1695 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Assignee: Gwen Shapira We need to make it possible to secure the Zookeeper cluster Kafka is using. This would make use of the normal authentication ZooKeeper provides. ZooKeeper supports a variety of authentication mechanisms so we will need to figure out what has to be passed in to the zookeeper client. The intention is that when the current round of client work is done it should be possible to run without clients needing access to Zookeeper so all we need here is to make it so that only the Kafka cluster is able to read and write to the Kafka znodes (we shouldn't need to set any kind of acl on a per-znode basis). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1756) never allow the replica fetch size to be less than the max message size
[ https://issues.apache.org/jira/browse/KAFKA-1756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14201404#comment-14201404 ] Jun Rao commented on KAFKA-1756: Joe, The fix for 1) is a bit hard to do. kafka-topics just creates some paths in ZK and it doesn't have access to the default broker side configs. Once the topic and its config are created in ZK, it's hard to back it out in the broker. For 2), we are already doing this check. Gwen, What you mentioned is the benefit for any topic level config. My point is that we probably shouldn't make max.message.size a topic level config given the implication to downstream consumers such as mirrormaker and the replica fetchers. never allow the replica fetch size to be less than the max message size --- Key: KAFKA-1756 URL: https://issues.apache.org/jira/browse/KAFKA-1756 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1, 0.8.2 Reporter: Joe Stein Priority: Blocker Fix For: 0.8.2 There exists a very hazardous scenario where if max.message.bytes is greater than replica.fetch.max.bytes the message will never replicate. This will bring the ISR down to 1 (eventually/quickly once replica.lag.max.messages is reached). If during this window the leader itself goes out of the ISR then the new leader will commit the last offset it replicated. This is also bad for sync producers with -1 ack because they will all block (herd effect caused upstream) in this scenario too.
The fix here is twofold: 1) when setting max.message.bytes using kafka-topics we must first check each and every broker (which will need some thought about how to do this because of the topiccommand zk notification) that max.message.bytes <= replica.fetch.max.bytes and if it is NOT then DO NOT create the topic 2) if you change this in server.properties then the broker should not start if max.message.bytes > replica.fetch.max.bytes This does beg the question/issue some about centralizing certain/some/all configurations so that inconsistencies do not occur (where broker 1 has max.message.bytes > replica.fetch.max.bytes but broker 2 has max.message.bytes <= replica.fetch.max.bytes because of an error in properties). I do not want to conflate this ticket but I think it is worth mentioning/bringing up here as it is a good example where it could make sense. I set this as BLOCKER for 0.8.2-beta because we did so much work to enable consistency vs availability and 0 data loss; this corner case should be part of 0.8.2-final Also, I could go one step further (though I would not consider this part as a blocker for 0.8.2 but interested to what other folks think) about a consumer replica fetch size so that if the message max is increased messages will no longer be consumed (since the consumer fetch max would be < max.message.bytes -- This message was sent by Atlassian JIRA (v6.3.4#6332)
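Suggestion (2) from the ticket above — the broker refusing to start when the two configs are inconsistent — might be sketched as follows. The validator class is hypothetical; only the config names come from the ticket, and the real broker would read them from server.properties.

```java
// Hedged sketch of the proposed startup check: fail fast when
// max.message.bytes > replica.fetch.max.bytes, since such a message could
// never be replicated and would eventually shrink the ISR to 1.
public class ReplicaFetchSizeCheck {

    static void validateAtStartup(int maxMessageBytes, int replicaFetchMaxBytes) {
        if (maxMessageBytes > replicaFetchMaxBytes) {
            // Refuse to start rather than silently run with a config that can
            // accept messages the followers are unable to fetch.
            throw new IllegalArgumentException(
                "max.message.bytes (" + maxMessageBytes
                + ") must be <= replica.fetch.max.bytes (" + replicaFetchMaxBytes + ")");
        }
    }
}
```

The same predicate could back suggestion (1), with kafka-topics querying each broker before creating the topic, though as Jun notes above the tool currently has no access to broker-side defaults.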
[jira] [Commented] (KAFKA-1695) Authenticate connection to Zookeeper
[ https://issues.apache.org/jira/browse/KAFKA-1695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201419#comment-14201419 ] Neha Narkhede commented on KAFKA-1695: -- bq. I think it makes sense to open separate JIRA for upgrading zkclient? Yup. That'll be great Authenticate connection to Zookeeper Key: KAFKA-1695 URL: https://issues.apache.org/jira/browse/KAFKA-1695 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Assignee: Gwen Shapira We need to make it possible to secure the Zookeeper cluster Kafka is using. This would make use of the normal authentication ZooKeeper provides. ZooKeeper supports a variety of authentication mechanisms so we will need to figure out what has to be passed in to the zookeeper client. The intention is that when the current round of client work is done it should be possible to run without clients needing access to Zookeeper so all we need here is to make it so that only the Kafka cluster is able to read and write to the Kafka znodes (we shouldn't need to set any kind of acl on a per-znode basis). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1744) Fetch Response contains messages prior to the requested offset
[ https://issues.apache.org/jira/browse/KAFKA-1744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14201424#comment-14201424 ] Jun Rao commented on KAFKA-1744: Evan, I added the following explanation to the wiki: "In general, the returned messages will have offsets larger than or equal to the starting offset. However, with compressed messages, it's possible for the returned messages to have offsets smaller than the starting offset. The number of such messages is typically small and the caller is responsible for filtering out those messages." Fetch Response contains messages prior to the requested offset -- Key: KAFKA-1744 URL: https://issues.apache.org/jira/browse/KAFKA-1744 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Evan Huus As reported in https://github.com/Shopify/sarama/issues/166 there are cases where a FetchRequest for a particular offset returns some messages prior to that offset. The spec does not seem to indicate that this is possible; it does state that "As an optimization the server is allowed to return a partial message at the end of the message set." but otherwise implies that a request for offset X will only return complete messages starting at X. The scala consumer does seem to handle this case gracefully though, if I am reading it correctly (my scala is not the best): https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/ConsumerIterator.scala#L96-L99 So is this a bug or just a case that needs to be added to the spec? Something like "As an optimization the server is allowed to return some messages in the message set prior to the requested offset. Clients should handle this case."? Although I can't imagine why sending extra data would be faster than only sending the necessary messages... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
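The filtering that the wiki clarification above puts on the caller could look roughly like this on the client side: after fetching from offset X, drop any decompressed messages whose offset is below X before delivering them, much as the scala ConsumerIterator already does. The helper below is a hypothetical illustration, not a real consumer API.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of client-side filtering of a fetched message set: with compressed
// message sets the broker may return a few messages whose offsets precede the
// requested offset, and the consumer is responsible for skipping them.
public class FetchOffsetFilter {

    static List<Long> dropBelow(long requestedOffset, List<Long> fetchedOffsets) {
        List<Long> kept = new ArrayList<>();
        for (long offset : fetchedOffsets) {
            if (offset >= requestedOffset) {  // skip messages prior to the requested offset
                kept.add(offset);
            }
        }
        return kept;
    }
}
```

Because the broker serves fetches with zero-copy, it cannot do this trimming itself, which is why the burden falls on the client.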
[jira] [Commented] (KAFKA-1744) Fetch Response contains messages prior to the requested offset
[ https://issues.apache.org/jira/browse/KAFKA-1744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201425#comment-14201425 ] Evan Huus commented on KAFKA-1744: -- [~junrao] very nice stealth edit of the spec, thank you :) That clarification is what I was looking for, this ticket can be closed. Fetch Response contains messages prior to the requested offset -- Key: KAFKA-1744 URL: https://issues.apache.org/jira/browse/KAFKA-1744 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Evan Huus As reported in https://github.com/Shopify/sarama/issues/166 there are cases where a FetchRequest for a particular offset returns some messages prior to that offset. The spec does not seem to indicate that this is possible; it does state that As an optimization the server is allowed to return a partial message at the end of the message set. but otherwise implies that a request for offset X will only return complete messages starting at X. The scala consumer does seem to handle this case gracefully though, if I am reading it correctly (my scala is not the best): https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/ConsumerIterator.scala#L96-L99 So is this a bug or just a case that needs to be added to the spec? Something like As an optimization the server is allowed to return some messages in the message set prior to the requested offset. Clients should handle this case.? Although I can't imagine why sending extra data would be faster than only sending the necessary messages... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-1744) Fetch Response contains messages prior to the requested offset
[ https://issues.apache.org/jira/browse/KAFKA-1744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201425#comment-14201425 ] Evan Huus edited comment on KAFKA-1744 at 11/7/14 2:00 AM: --- [~junrao] thanks for the clarification, that's what I was looking for, this ticket can be closed. was (Author: eapache): [~junrao] very nice stealth edit of the spec, thank you :) That clarification is what I was looking for, this ticket can be closed. Fetch Response contains messages prior to the requested offset -- Key: KAFKA-1744 URL: https://issues.apache.org/jira/browse/KAFKA-1744 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Evan Huus As reported in https://github.com/Shopify/sarama/issues/166 there are cases where a FetchRequest for a particular offset returns some messages prior to that offset. The spec does not seem to indicate that this is possible; it does state that As an optimization the server is allowed to return a partial message at the end of the message set. but otherwise implies that a request for offset X will only return complete messages starting at X. The scala consumer does seem to handle this case gracefully though, if I am reading it correctly (my scala is not the best): https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/ConsumerIterator.scala#L96-L99 So is this a bug or just a case that needs to be added to the spec? Something like As an optimization the server is allowed to return some messages in the message set prior to the requested offset. Clients should handle this case.? Although I can't imagine why sending extra data would be faster than only sending the necessary messages... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1747) TestcaseEnv improperly shares state between instances
[ https://issues.apache.org/jira/browse/KAFKA-1747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1747: - Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the patch, Ewen! Pushed to trunk TestcaseEnv improperly shares state between instances - Key: KAFKA-1747 URL: https://issues.apache.org/jira/browse/KAFKA-1747 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1747.patch TestcaseEnv in system tests uses class variables instead of instance variables for a bunch of state. This causes the data to persist between tests. In some cases this can cause tests to break (e.g. there will be state from a service running in a previous test that doesn't exist in the current test; trying to look up state about that service raises an exception or produces invalid data). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1746) System tests don't handle errors well
[ https://issues.apache.org/jira/browse/KAFKA-1746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1746: - Component/s: system tests System tests don't handle errors well - Key: KAFKA-1746 URL: https://issues.apache.org/jira/browse/KAFKA-1746 Project: Kafka Issue Type: Bug Components: system tests Affects Versions: 0.8.1.1 Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Attachments: KAFKA-1746.patch The system test scripts don't handle errors well. A couple of key issues: * Unexpected exceptions during tests are just ignored and the tests appear to be successful in the reports. * The scripts' exit code is always 0, even if tests fail. * Almost no subprocess calls are checked. In a lot of cases this is ok, and sometimes it's not possible (e.g. after starting a long-running remote process), but in some cases, such as calls to DumpLogSegments, the tests can miss that the tool is exiting with an exception, and the test appears to be successful even though no data was verified. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1746) System tests don't handle errors well
[ https://issues.apache.org/jira/browse/KAFKA-1746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1746: - Fix Version/s: 0.8.3 System tests don't handle errors well - Key: KAFKA-1746 URL: https://issues.apache.org/jira/browse/KAFKA-1746 Project: Kafka Issue Type: Bug Components: system tests Affects Versions: 0.8.1.1 Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Attachments: KAFKA-1746.patch The system test scripts don't handle errors well. A couple of key issues: * Unexpected exceptions during tests are just ignored and the tests appear to be successful in the reports. * The scripts' exit code is always 0, even if tests fail. * Almost no subprocess calls are checked. In a lot of cases this is ok, and sometimes it's not possible (e.g. after starting a long-running remote process), but in some cases, such as calls to DumpLogSegments, the tests can miss that the tool is exiting with an exception, and the test appears to be successful even though no data was verified. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1747) TestcaseEnv improperly shares state between instances
[ https://issues.apache.org/jira/browse/KAFKA-1747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1747: - Fix Version/s: 0.8.3 TestcaseEnv improperly shares state between instances - Key: KAFKA-1747 URL: https://issues.apache.org/jira/browse/KAFKA-1747 Project: Kafka Issue Type: Bug Components: system tests Affects Versions: 0.8.1.1 Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Attachments: KAFKA-1747.patch TestcaseEnv in system tests uses class variables instead of instance variables for a bunch of state. This causes the data to persist between tests. In some cases this can cause tests to break (e.g. there will be state from a service running in a previous test that doesn't exist in the current test; trying to look up state about that service raises an exception or produces invalid data). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1747) TestcaseEnv improperly shares state between instances
[ https://issues.apache.org/jira/browse/KAFKA-1747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1747: - Component/s: system tests TestcaseEnv improperly shares state between instances - Key: KAFKA-1747 URL: https://issues.apache.org/jira/browse/KAFKA-1747 Project: Kafka Issue Type: Bug Components: system tests Affects Versions: 0.8.1.1 Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Attachments: KAFKA-1747.patch TestcaseEnv in system tests uses class variables instead of instance variables for a bunch of state. This causes the data to persist between tests. In some cases this can cause tests to break (e.g. there will be state from a service running in a previous test that doesn't exist in the current test; trying to look up state about that service raises an exception or produces invalid data). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1746) System tests don't handle errors well
[ https://issues.apache.org/jira/browse/KAFKA-1746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1746: - Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the patch! Pushed to trunk System tests don't handle errors well - Key: KAFKA-1746 URL: https://issues.apache.org/jira/browse/KAFKA-1746 Project: Kafka Issue Type: Bug Components: system tests Affects Versions: 0.8.1.1 Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Attachments: KAFKA-1746.patch The system test scripts don't handle errors well. A couple of key issues: * Unexpected exceptions during tests are just ignored and the tests appear to be successful in the reports. * The scripts' exit code is always 0, even if tests fail. * Almost no subprocess calls are checked. In a lot of cases this is ok, and sometimes it's not possible (e.g. after starting a long-running remote process), but in some cases, such as calls to DumpLogSegments, the tests can miss that the tool is exiting with an exception, and the test appears to be successful even though no data was verified. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1756) never allow the replica fetch size to be less than the max message size
[ https://issues.apache.org/jira/browse/KAFKA-1756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201449#comment-14201449 ] Gwen Shapira commented on KAFKA-1756: - I wonder if it makes sense for the brokers to persist their configs in their respective ZK nodes, so we can validate topic configs against broker configs? never allow the replica fetch size to be less than the max message size --- Key: KAFKA-1756 URL: https://issues.apache.org/jira/browse/KAFKA-1756 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1, 0.8.2 Reporter: Joe Stein Priority: Blocker Fix For: 0.8.2 There exists a very hazardous scenario: if max.message.bytes is greater than replica.fetch.max.bytes, the message will never replicate. This will bring the ISR down to 1 (eventually/quickly once replica.lag.max.messages is reached). If during this window the leader itself goes out of the ISR then the new leader will commit the last offset it replicated. This is also bad for sync producers with -1 ack because they will all block (herd effect caused upstream) in this scenario too. The fix here is two fold 1) when setting max.message.bytes using kafka-topics we must first check each and every broker (which will need some thought about how to do this because of the topic command ZK notification) that max.message.bytes <= replica.fetch.max.bytes, and if it is NOT then DO NOT create the topic 2) if you change this in server.properties then the broker should not start if max.message.bytes > replica.fetch.max.bytes This does beg the question/issue some about centralizing certain/some/all configurations so that inconsistencies do not occur (where broker 1 has max.message.bytes > replica.fetch.max.bytes but broker 2 has max.message.bytes <= replica.fetch.max.bytes because of an error in the properties). I do not want to conflate this ticket but I think it is worth mentioning/bringing up here as it is a good example where it could make sense.
I set this as BLOCKER for 0.8.2-beta because we did so much work to enable consistency vs availability and 0 data loss; this corner case should be part of 0.8.2-final. Also, I could go one step further (though I would not consider this part a blocker for 0.8.2, but I'm interested in what other folks think) about a consumer replica fetch size so that if the message max is increased, messages will no longer be consumed (since the consumer fetch max would be < max.message.bytes). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
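The startup-time check proposed in point 2 above could look roughly like the following. This is a minimal sketch under the assumption that both values are available as ints at broker startup; the class and method names are illustrative, not the actual KafkaConfig code:

```java
// Hedged sketch of the proposed broker startup validation: refuse to start
// when replica.fetch.max.bytes is smaller than max.message.bytes, since a
// message larger than the replica fetch size could never replicate.
// Names are illustrative, not the actual Kafka implementation.
public class ReplicaFetchSizeCheck {
    static void validate(int maxMessageBytes, int replicaFetchMaxBytes) {
        if (replicaFetchMaxBytes < maxMessageBytes) {
            throw new IllegalArgumentException(
                "replica.fetch.max.bytes (" + replicaFetchMaxBytes +
                ") must be >= max.message.bytes (" + maxMessageBytes +
                "); otherwise large messages can never replicate");
        }
    }
}
```

The same predicate would be evaluated per broker by kafka-topics before creating a topic with an overridden max.message.bytes (point 1).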
[jira] [Commented] (KAFKA-1282) Disconnect idle socket connection in Selector
[ https://issues.apache.org/jira/browse/KAFKA-1282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201475#comment-14201475 ] Jun Rao commented on KAFKA-1282: Nicu, I was doing some manual testing of this feature. What I observed is that sometimes, the idle connections are not closed. The following was what I did. 1. Configure a small connections.max.idle.ms = 10000. 2. Start ZK and the Kafka broker. 3. Start a console consumer: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic1 --from-beginning 4. Start a console producer and type in something every 15 secs or so: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1 --request-required-acks 1 What I observed was that initially, the producer connections kept getting killed by the broker correctly after being idle for 10 secs. The next producer send would hit an IOException and trigger a resend. However, after typing in 10 or so messages, at some point, no idle connections were killed by the broker any more and the producer send always succeeded. Disconnect idle socket connection in Selector - Key: KAFKA-1282 URL: https://issues.apache.org/jira/browse/KAFKA-1282 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: nicu marasoiu Labels: newbie++ Fix For: 0.9.0 Attachments: 1282_brush.patch, 1282_brushed_up.patch, KAFKA-1282_Disconnect_idle_socket_connection_in_Selector.patch To reduce # socket connections, it would be useful for the new producer to close socket connections that are idle. We can introduce a new producer config for the idle time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
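The mechanism under test can be sketched as follows — a hedged illustration of idle-connection reaping, not the actual Selector patch: last-activity times are kept in an access-ordered map so the least recently used connection is examined first, and scanning can stop at the first non-expired entry.

```java
import java.util.*;

// Illustrative sketch of idle-connection tracking (names invented, not the
// actual kafka Selector code): record last activity per connection, and on
// each poll collect connections idle longer than connections.max.idle.ms.
public class IdleConnectionReaper {
    private final long maxIdleMs;
    // Access-ordered map: least recently used connection iterates first.
    private final LinkedHashMap<String, Long> lastActive =
        new LinkedHashMap<>(16, 0.75f, true);

    public IdleConnectionReaper(long maxIdleMs) { this.maxIdleMs = maxIdleMs; }

    public void recordActivity(String connectionId, long nowMs) {
        lastActive.put(connectionId, nowMs);
    }

    // Returns the connections that should be closed.
    public List<String> findExpired(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastActive.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() > maxIdleMs) {
                expired.add(e.getKey());
                it.remove();
            } else {
                break; // later entries were used more recently; stop scanning
            }
        }
        return expired;
    }
}
```

A bug of the kind Jun describes would show up here if, say, activity stopped being recorded for some connections, leaving stale entries that the early break never reaches.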
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review60285 --- clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java https://reviews.apache.org/r/27391/#comment101649 @Deprecated clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java https://reviews.apache.org/r/27391/#comment101651 This confused me a bit, and I think it is because initCommonFields is intended to initialize fields common to all versions of the request. It is a useful helper method but it becomes somewhat clunky when removing fields. The partition-level timestamp is no longer a common field. If this is v2 then we should _never_ set anything in the timestamp field of the struct; and if it is v0 or v1 then we should _always_ set the timestamp field (even if it is the default). However, since the timestamp field in the Field declaration for OFFSET_COMMIT_REQUEST_PARTITION_V0 does not have a default explicitly specified, I think this will break with a SchemaException (missing value...) for offset commit request v0/v1 if we choose to write to a bytebuffer under those versions with this code. One option is to explicitly pass the constructor version (0, 1, 2) to initCommonFields and use that to decide whether to include/exclude this field, but that is weird. Another alternative is a separate helper method for v0/v1. That is even weirder. clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java https://reviews.apache.org/r/27391/#comment101650 Would help to add a comment: "This field only exists in v0 and v1". core/src/main/scala/kafka/common/OffsetMetadataAndError.scala https://reviews.apache.org/r/27391/#comment101657 Can we mark this @Deprecated as well? We should probably make the primary constructor without timestamp and add a secondary constructor with timestamp and mark deprecated there. Also, can we use case class .copy if timestamp needs to be modified?
However, per comment further down I don't think it needs to be touched. core/src/main/scala/kafka/server/OffsetManager.scala https://reviews.apache.org/r/27391/#comment101658 This was already there, but it would be clearer to use: filter { case (..., ...) => ... } core/src/main/scala/kafka/server/OffsetManager.scala https://reviews.apache.org/r/27391/#comment101659 "Found %d expired offsets." core/src/main/scala/kafka/server/OffsetManager.scala https://reviews.apache.org/r/27391/#comment101660 Actually, since we always set the retention period (for v0, v1) in KafkaApis, do we need to even touch this timestamp? i.e., we should basically ignore it, right? So we only need to do: value.set(VALUE_TIMESTAMP_FIELD, expirationTimestamp). - Joel Koshy On Nov. 6, 2014, 11:35 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 6, 2014, 11:35 p.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala
2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Jenkins build is back to normal : Kafka-trunk #324
See https://builds.apache.org/job/Kafka-trunk/324/changes
[jira] [Created] (KAFKA-1758) corrupt recovery file prevents startup
Jason Rosenberg created KAFKA-1758: -- Summary: corrupt recovery file prevents startup Key: KAFKA-1758 URL: https://issues.apache.org/jira/browse/KAFKA-1758 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg Hi, We recently had a kafka node go down suddenly. When it came back up, it apparently had a corrupt recovery file, and refused to startup: {code} 2014-11-06 08:17:19,299 WARN [main] server.KafkaServer - Error starting up KafkaServer java.lang.NumberFormatException: For input string: ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@ ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@ at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Integer.parseInt(Integer.java:481) at java.lang.Integer.parseInt(Integer.java:527) at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229) at scala.collection.immutable.StringOps.toInt(StringOps.scala:31) at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76) at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106) at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at kafka.log.LogManager.loadLogs(LogManager.scala:105) at kafka.log.LogManager.init(LogManager.scala:57) at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275) at kafka.server.KafkaServer.startup(KafkaServer.scala:72) {code} And the app is under a monitor (so it was repeatedly restarting and failing with this error for several minutes before we got to it)… We moved the 
‘recovery-point-offset-checkpoint’ file out of the way, and it then restarted cleanly (but of course re-synced all its data from replicas, so we had no data loss). Anyway, I’m wondering if that’s the expected behavior? Or should it not declare it corrupt and instead proceed automatically to an unclean restart? Should this NumberFormatException be handled a bit more gracefully? We saved the corrupt file if it’s worth inspecting (although I doubt it will be useful!)…. The corrupt files appeared to be all zeroes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
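The more graceful handling the reporter suggests could be sketched as follows — a hedged illustration, not the actual OffsetCheckpoint code: treat an unparseable checkpoint (such as the all-zeroes file above) as empty, forcing recovery, instead of crashing at startup.

```java
import java.io.*;
import java.util.*;

// Illustrative sketch (names invented): read a checkpoint of the form
//   <version>\n<entry count>\n<topic> <partition> <offset>\n...
// and fall back to an empty map on any parse failure, rather than letting
// NumberFormatException abort broker startup.
public class SafeCheckpointReader {
    static Map<String, Long> read(BufferedReader reader) throws IOException {
        Map<String, Long> offsets = new HashMap<>();
        try {
            int version = Integer.parseInt(reader.readLine().trim());
            if (version != 0) return Collections.emptyMap();
            int expected = Integer.parseInt(reader.readLine().trim());
            for (int i = 0; i < expected; i++) {
                String[] parts = reader.readLine().trim().split("\\s+");
                offsets.put(parts[0] + "-" + parts[1], Long.parseLong(parts[2]));
            }
            return offsets;
        } catch (NumberFormatException | NullPointerException
                 | ArrayIndexOutOfBoundsException e) {
            // Corrupt file (e.g. all zeroes after an unclean shutdown):
            // warn and recover from the start of the logs instead of dying.
            System.err.println("Corrupt recovery checkpoint, ignoring: " + e);
            return Collections.emptyMap();
        }
    }
}
```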
Re: Review Request 27634: Patch for KAFKA-1667
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27634/#review60188 --- Awesome. It's building successfully now. core/src/main/scala/kafka/utils/Utils.scala https://reviews.apache.org/r/27634/#comment101524 I think I'm not seeing why we need this. Shouldn't Scala's JavaConversion class handle this exact case? core/src/test/scala/kafka/log/LogConfigTest.scala https://reviews.apache.org/r/27634/#comment101525 Can you add a test that shows that we indeed fail validation when creating LogConfig with invalid values? core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala https://reviews.apache.org/r/27634/#comment101532 String.valueOf(false) should evaluate to "false", right? Why do we need "nottrue"? If String.valueOf(false) no longer works as expected, it looks like a bug waiting to happen... core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala https://reviews.apache.org/r/27634/#comment101527 why are we removing a test? - Gwen Shapira On Nov. 6, 2014, 4:12 p.m., Dmytro Kostiuchenko wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27634/ --- (Updated Nov. 6, 2014, 4:12 p.m.) Review request for kafka. Bugs: KAFKA-1667 https://issues.apache.org/jira/browse/KAFKA-1667 Repository: kafka Description --- KAFKA-1667 Fixed bugs in LogConfig.
Added test and documentation KAFKA-1667 Updated tests to reflect new boolean property parsing logic KAFKA-1667 renamed methods to match naming convention Diffs - clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java c4cea2cc072f4db4ce014b63d226431d3766bef1 core/src/main/scala/kafka/admin/TopicCommand.scala 0b2735e7fc42ef9894bef1997b1f06a8ebee5439 core/src/main/scala/kafka/log/LogConfig.scala e48922a97727dd0b98f3ae630ebb0af3bef2373d core/src/main/scala/kafka/utils/Utils.scala 23aefb4715b177feae1d2f83e8b910653ea10c5f core/src/test/scala/kafka/log/LogConfigTest.scala PRE-CREATION core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala f44568cb25edf25db857415119018fd4c9922f61 Diff: https://reviews.apache.org/r/27634/diff/ Testing --- Thanks, Dmytro Kostiuchenko
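The "new boolean property parsing logic" the patch description mentions can be illustrated with a small sketch. This is a hedged example of strict boolean validation in general, not the actual ConfigDef code: the point of the review exchange above is that Boolean.parseBoolean maps any non-"true" string (including "nottrue") silently to false, whereas a validating parser rejects it.

```java
// Illustrative strict boolean parser: accept only "true"/"false"
// (case-insensitive) and fail loudly on anything else, instead of
// Boolean.parseBoolean's behavior of treating "nottrue" as false.
public class StrictBoolean {
    static boolean parse(String value) {
        String v = value.trim().toLowerCase();
        if (v.equals("true")) return true;
        if (v.equals("false")) return false;
        throw new IllegalArgumentException(
            "Expected 'true' or 'false', got: " + value);
    }
}
```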
Re: Announcing Confluent
Best of Luck..keep rocking... On Fri, Nov 7, 2014 at 1:30 AM, Joe Brown brownjoe...@gmail.com wrote: Best of luck!!! J On 6 Nov 2014, at 18:28, Jay Kreps jay.kr...@gmail.com wrote: Hey all, I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a company around Kafka called Confluent. We are planning on productizing the kind of Kafka-based real-time data platform we built out at LinkedIn. We are doing this because we think this is a really powerful idea and we felt there was a lot to do to make this idea really take root. We wanted to make that our full time mission and focus. There is a blog post that goes into a little more depth here: http://blog.confluent.io/ LinkedIn will remain a heavy Kafka user and contributor. Combined with our additional resources from the funding of the company this should be a really good thing for the Kafka development effort. Especially when combined with the increasing contributions from the rest of the development community. This is great news, as there is a lot of work to do. We'll need to really focus on scaling this distributed development in a healthy way. One thing I do want to emphasize is that the addition of a company in the Kafka ecosystem won’t mean meddling with open source. Kafka will remain 100% open source and community focused, as of course is true of any Apache project. I have been doing open source for a long time and strongly believe it is the right model for infrastructure software development. Confluent is just getting off the ground now. We left LinkedIn, raised some money, and we have an office (but no furniture yet!). Nonetheless, if you are interested in finding out more about the company and either getting help with your Kafka usage or joining us to help build all this, by all means reach out to us, we’d love to talk. Wish us luck! -Jay -- Thanks, Pankaj Ojha
[jira] [Updated] (KAFKA-1742) ControllerContext removeTopic does not correctly update state
[ https://issues.apache.org/jira/browse/KAFKA-1742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-1742: Attachment: KAFKA-1742.patch ControllerContext removeTopic does not correctly update state - Key: KAFKA-1742 URL: https://issues.apache.org/jira/browse/KAFKA-1742 Project: Kafka Issue Type: Bug Reporter: Onur Karaman Assignee: Onur Karaman Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1742.patch removeTopic does not correctly update the state of ControllerContext. This is because it removes the topic from some underlying maps through dropWhile. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-391) Producer request and response classes should use maps
[ https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201702#comment-14201702 ] Honghai Chen commented on KAFKA-391: Many thanks for your help. After debugging and testing, it seems I can't hit that exception. We are actually using a C# client inherited from https://github.com/precog/kafka/tree/master/clients/csharp/src/Kafka/Kafka.Client , and after debugging and comparing its code with the Java version, we finally confirmed it is a bug in the C# code. In the Java version, when creating the ProducerRequest, produceRequest.data is set to messagesPerTopic and the group-by-topic happens just before sending the binary data. But in our C# version, the grouping happens first and produceRequest.data is set to a dictionary of Topic,Data, so we hit this exception wrongly; we have fixed it. Many thanks for your time. Unfortunately I can't find our exact open source version on the internet: our version has DefaultCallbackHandler.cs, but the version at https://github.com/precog/kafka/tree/master/clients/csharp/src/Kafka/Kafka.Client does not, so I can't provide a link here. The Java links: https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/api/ProducerRequest.scala;h=570b2da1d865086f9830aa919a49063abbbe574d;hb=HEAD https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala;h=821901e4f434dfd9eec6eceabfc2e1e65507a57c;hb=HEAD#l260 Producer request and response classes should use maps - Key: KAFKA-391 URL: https://issues.apache.org/jira/browse/KAFKA-391 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Joel Koshy Priority: Blocker Labels: optimization Fix For: 0.8.0 Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch, KAFKA-391-v3.patch, KAFKA-391-v4.patch Producer response contains two arrays of error codes and offsets - the ordering in these arrays correspond to the flattened ordering of the request arrays.
It would be better to switch to maps in the request and response as this would make the code clearer and more efficient (right now, linear scans are used in handling producer acks). We can probably do the same in the fetch request/response. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
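The switch the ticket describes can be sketched as follows — a hedged illustration with invented names (not Kafka's actual request/response classes): keying per-partition status by a composite topic-partition key gives O(1) lookup of acks instead of a linear scan through parallel arrays.

```java
import java.util.*;

// Illustrative sketch: a producer response keyed by (topic, partition)
// instead of parallel error-code/offset arrays matched to the flattened
// request ordering. Names are invented for this example.
public class ProducerResponseSketch {
    // Simple composite key standing in for Kafka's TopicAndPartition.
    record TopicPartition(String topic, int partition) {}
    record PartitionStatus(short errorCode, long offset) {}

    private final Map<TopicPartition, PartitionStatus> status = new HashMap<>();

    void record(String topic, int partition, short errorCode, long offset) {
        status.put(new TopicPartition(topic, partition),
                   new PartitionStatus(errorCode, offset));
    }

    // O(1) lookup; returns null if the partition was not in the response.
    PartitionStatus statusFor(String topic, int partition) {
        return status.get(new TopicPartition(topic, partition));
    }
}
```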
Re: Review Request 26474: KAFKA-1654 Provide a way to override server configuration from command line
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26474/ --- (Updated Nov. 7, 2014, 7:14 a.m.) Review request for kafka and Neha Narkhede. Changes --- Incorporated Neha's feedback from JIRA. Bugs: SQOOP-1654 https://issues.apache.org/jira/browse/SQOOP-1654 Repository: kafka Description --- I'm assuming that we might want to add additional arguments in the future as well, so I've added a general facility to parse arguments to the Kafka main class and added an argument --set that defines/overrides any property in the config file. I've decided to use --set rather than exposing each property that is available in the KafkaConfig class as its own argument, so that we don't have to keep those two classes always in sync. This is the first bigger patch I've written in Scala, so I'm particularly interested to hear feedback on the coding style. Diffs (updated) - core/src/main/scala/kafka/Kafka.scala 2e94fee core/src/test/scala/unit/kafka/KafkaTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/26474/diff/ Testing --- I've added unit tests and verified the functionality on real cluster. Thanks, Jarek Cecho
[jira] [Updated] (KAFKA-1654) Provide a way to override server configuration from command line
[ https://issues.apache.org/jira/browse/KAFKA-1654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jarek Jarcec Cecho updated KAFKA-1654: -- Attachment: KAFKA-1654.patch Provide a way to override server configuration from command line Key: KAFKA-1654 URL: https://issues.apache.org/jira/browse/KAFKA-1654 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.1.1 Reporter: Jarek Jarcec Cecho Assignee: Jarek Jarcec Cecho Fix For: 0.8.3 Attachments: KAFKA-1654.patch, KAFKA-1654.patch I've been recently playing with Kafka and I found the current way of server configuration quite inflexible. All the configuration options have to be inside a properties file and there is no way how they can be overridden for execution. In order to temporarily change one property I had to copy the config file and change the property there. Hence, I'm wondering if people would be open to provide a way how to specify and override the configs from the command line when starting Kafka? Something like: {code} ./bin/kafka-server-start.sh -Dmy.cool.property=X kafka.properties {code} or {code} ./bin/kafka-server-start.sh --set my.cool.property=X kafka.properties {code} I'm more than happy to take a stab at it, but I would like to see if there is an interest for such capability? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1654) Provide a way to override server configuration from command line
[ https://issues.apache.org/jira/browse/KAFKA-1654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14201718#comment-14201718 ] Jarek Jarcec Cecho commented on KAFKA-1654: --- I've added a second version of the patch that: * Renames {{\-\-set}} to {{\-\-override}} * Enforces that only valid parameters are on the command line and there are no so-called "nonOptions" Provide a way to override server configuration from command line Key: KAFKA-1654 URL: https://issues.apache.org/jira/browse/KAFKA-1654 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.1.1 Reporter: Jarek Jarcec Cecho Assignee: Jarek Jarcec Cecho Fix For: 0.8.3 Attachments: KAFKA-1654.patch, KAFKA-1654.patch I've been recently playing with Kafka and I found the current way of server configuration quite inflexible. All the configuration options have to be inside a properties file and there is no way how they can be overridden for execution. In order to temporarily change one property I had to copy the config file and change the property there. Hence, I'm wondering if people would be open to provide a way how to specify and override the configs from the command line when starting Kafka? Something like: {code} ./bin/kafka-server-start.sh -Dmy.cool.property=X kafka.properties {code} or {code} ./bin/kafka-server-start.sh --set my.cool.property=X kafka.properties {code} I'm more than happy to take a stab at it, but I would like to see if there is an interest for such capability? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
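The {{\-\-override}} behavior described in the second patch version can be sketched as follows. This is a hedged illustration under the assumption that the base config is already loaded as a Properties object; the class and method names are invented, not the committed patch:

```java
import java.util.*;

// Illustrative sketch: apply "--override key=value" pairs on top of the
// properties loaded from the config file, rejecting any stray arguments
// ("nonOptions"), as the second patch version enforces.
public class OverrideArgs {
    static Properties apply(Properties base, String[] args) {
        Properties merged = new Properties();
        merged.putAll(base);
        for (int i = 0; i < args.length; i++) {
            if (!args[i].equals("--override"))
                throw new IllegalArgumentException(
                    "Unrecognized argument: " + args[i]);
            if (i + 1 >= args.length || args[i + 1].indexOf('=') < 0)
                throw new IllegalArgumentException(
                    "--override requires key=value");
            String[] kv = args[++i].split("=", 2);
            merged.setProperty(kv[0], kv[1]); // override wins over the file
        }
        return merged;
    }
}
```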