[ https://issues.apache.org/jira/browse/KAFKA-576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13481558#comment-13481558 ]
John Fung edited comment on KAFKA-576 at 10/22/12 6:01 PM:
-----------------------------------------------------------

1. SimpleConsumerShell will receive data from one of the brokers in a 3-broker cluster with this change:

$ svn diff core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
Index: core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
===================================================================
--- core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (revision 1400944)
+++ core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (working copy)
@@ -186,7 +186,7 @@
             var consumed = 0
             for(messageAndOffset <- messageSet) {
               try {
-                offset = messageAndOffset.offset
+                offset = messageAndOffset.nextOffset
                 if(printOffsets)
                   System.out.println("next offset = " + offset)
                 formatter.writeTo(messageAndOffset.message, System.out)

2. Printing producedOffset in SimpleConsumer shows that it is -1:

$ svn diff core/src/main/scala/kafka/consumer/SimpleConsumer.scala
Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/SimpleConsumer.scala (revision 1400944)
+++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala (working copy)
@@ -50,6 +50,7 @@
       if (simpleConsumer != null)
         simpleConsumer.close()
     }
+    System.out.println("====> producedOffset : " + producedOffset)
     producedOffset
   }

3. An exception is thrown from SimpleConsumer.earliestOrLatestOffset():

$ bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9093 --topic test_1 --partition 0 --replica 3
[2012-10-22 10:41:56,525] INFO Getting topic metatdata... (kafka.tools.SimpleConsumerShell$)
[2012-10-22 10:41:56,583] INFO Fetching metadata for topic Set(test_1) (kafka.client.ClientUtils$)
[2012-10-22 10:41:56,589] INFO Connected to localhost:9093 for producing (kafka.producer.SyncProducer)
[2012-10-22 10:41:56,751] INFO Disconnecting from localhost:9093 (kafka.producer.SyncProducer)
[2012-10-22 10:41:56,784] ERROR error in earliestOrLatestOffset() (kafka.consumer.SimpleConsumer$)
java.lang.UnsupportedOperationException: empty.head
        at scala.collection.immutable.Vector.head(Vector.scala:162)
        at kafka.consumer.SimpleConsumer$.earliestOrLatestOffset(SimpleConsumer.scala:45)
        at kafka.tools.SimpleConsumerShell$.main(SimpleConsumerShell.scala:169)
        at kafka.tools.SimpleConsumerShell.main(SimpleConsumerShell.scala)
====> producedOffset : -1
[2012-10-22 10:41:56,786] INFO Starting simple consumer shell to partition [test_1, 0], replica [3], host and port: [127.0.0.1, 9093], from offset [-1] (kafka.tools.SimpleConsumerShell$)

> SimpleConsumerShell runs into an infinite loop
> ----------------------------------------------
>
>                 Key: KAFKA-576
>                 URL: https://issues.apache.org/jira/browse/KAFKA-576
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: John Fung
>
> * In this case, there are 15 log segment files in the broker-1 data dir:
> ls -l /tmp/kafka_server_1_logs/test_1-0/
> total 240
> -rw-r--r-- 1 jfung eng    16 Oct 16 10:41 00000000000000000000.index
> -rw-r--r-- 1 jfung eng 10440 Oct 16 10:40 00000000000000000000.log
> -rw-r--r-- 1 jfung eng     8 Oct 16 10:41 00000000000000000020.index
> -rw-r--r-- 1 jfung eng 10440 Oct 16 10:40 00000000000000000020.log
> . . .
> -rw-r--r-- 1 jfung eng     8 Oct 16 10:41 00000000000000000280.index
> -rw-r--r-- 1 jfung eng 10440 Oct 16 10:41 00000000000000000280.log
> * The following is the DumpLogSegments output for the first log segment file:
> bin/kafka-run-class.sh kafka.tools.DumpLogSegments /tmp/kafka_server_1_logs/test_1-0/00000000000000000000.log
> Dumping /tmp/kafka_server_1_logs/test_1-0/00000000000000000000.log
> Starting offset: 0
> offset: 0 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 1663889063
> offset: 1 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 2803454828
> offset: 2 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 683347625
> . . .
> offset: 18 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 1892511043
> offset: 19 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 601297044
> * Output of SimpleConsumerShell:
> . . .
> next offset = 16
> Topic:test_1:ThreadID:2:MessageID:0000000043:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>
> next offset = 17
> Topic:test_1:ThreadID:3:MessageID:0000000063:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>
> next offset = 18
> Topic:test_1:ThreadID:4:MessageID:0000000083:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>
> next offset = 19
> Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>
> next offset = 19
> Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>
> next offset = 19
> Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>
> next offset = 19
> Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>
> . . .
> * It appears that SimpleConsumerShell doesn't advance to the next log segment file
> * It should probably block inside the while loop to prevent infinite looping
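
The repeated "next offset = 19" lines in this report can be reproduced with a small, self-contained simulation. This is only a sketch under stated assumptions: MessageAndOffset, fetch, and consume below are hypothetical stand-ins written for illustration, not the real Kafka classes, and the toy nextOffset is simply offset + 1. It shows that restarting each fetch at the offset of the last message read returns that same message again, while restarting at its nextOffset moves past it.

```scala
// Hypothetical stand-in for illustration; not the real kafka.message.MessageAndOffset.
final case class MessageAndOffset(message: String, offset: Long) {
  // Offset of the message that follows this one (assumed to be offset + 1 here).
  def nextOffset: Long = offset + 1
}

object FetchLoopSketch {
  // A toy partition log holding messages at offsets 0..19.
  val log: Vector[MessageAndOffset] =
    Vector.tabulate(20)(i => MessageAndOffset(s"msg-$i", i.toLong))

  // Toy fetch: return up to maxMessages messages starting at fetchOffset.
  def fetch(fetchOffset: Long, maxMessages: Int = 5): Vector[MessageAndOffset] =
    log.filter(_.offset >= fetchOffset).take(maxMessages)

  // Runs at most maxRounds fetch rounds and returns every offset consumed.
  // `advance` picks the offset that the next fetch request starts from.
  def consume(advance: MessageAndOffset => Long, maxRounds: Int): Vector[Long] = {
    var offset = 0L
    var round = 0
    var done = false
    val seen = Vector.newBuilder[Long]
    while (!done && round < maxRounds) {
      val messageSet = fetch(offset)
      if (messageSet.isEmpty) {
        done = true // nothing left to read in this toy log
      } else {
        for (messageAndOffset <- messageSet) {
          offset = advance(messageAndOffset)
          seen += messageAndOffset.offset
        }
      }
      round += 1
    }
    seen.result()
  }

  def main(args: Array[String]): Unit = {
    val stuck = consume(_.offset, maxRounds = 8)
    val advancing = consume(_.nextOffset, maxRounds = 8)
    println("using offset, last consumed: " + stuck.takeRight(3).mkString(", "))
    println("using nextOffset, consumed: " + advancing.size + " messages")
  }
}
```

The round cap exists only so the stuck variant terminates in this sketch. A real consumer that has caught up with the log would still need to wait or back off between empty fetches, which is a separate concern from which offset it resumes at.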