[ 
https://issues.apache.org/jira/browse/KAFKA-576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13481558#comment-13481558
 ] 

John Fung edited comment on KAFKA-576 at 10/22/12 6:01 PM:
-----------------------------------------------------------

1. With the following change, SimpleConsumerShell receives data from one of the
brokers in a 3-broker cluster:

$ svn diff core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
Index: core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
===================================================================
--- core/src/main/scala/kafka/tools/SimpleConsumerShell.scala   (revision 1400944)
+++ core/src/main/scala/kafka/tools/SimpleConsumerShell.scala   (working copy)
@@ -186,7 +186,7 @@
             var consumed = 0
             for(messageAndOffset <- messageSet) {
               try {
-                offset = messageAndOffset.offset
+                offset = messageAndOffset.nextOffset
                 if(printOffsets)
                   System.out.println("next offset = " + offset)
                 formatter.writeTo(messageAndOffset.message, System.out)
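
The effect of this one-line change can be illustrated with a small simulation. This is only a sketch under stated assumptions: `Msg`, `fetch`, `consume`, and `SegmentSize` are hypothetical stand-ins rather than Kafka classes, and the toy `fetch` is assumed to return messages from the requested offset up to the end of the segment that contains it. Under those assumptions, tracking `offset` keeps re-fetching the last message of the first segment, while tracking `nextOffset` moves on to the next segment.

```scala
// Hypothetical stand-in for Kafka's MessageAndOffset: `offset` is the
// message's own position, `nextOffset` is where the following message starts.
final case class Msg(offset: Long, payload: String) {
  def nextOffset: Long = offset + 1
}

object OffsetAdvanceSketch {
  // Toy log: 4 messages split into segments of 2 (offsets 0-1 and 2-3).
  val SegmentSize = 2L
  val log: Vector[Msg] = Vector.tabulate(4)(i => Msg(i.toLong, s"m$i"))

  // Assumed fetch semantics: return the messages from `from` up to the end
  // of the segment that contains `from` (empty if `from` is past the log).
  def fetch(from: Long): Vector[Msg] = {
    val segmentEnd = (from / SegmentSize + 1) * SegmentSize
    log.filter(m => m.offset >= from && m.offset < segmentEnd)
  }

  // Runs `maxFetches` fetch rounds and returns every offset that was consumed.
  def consume(useNextOffset: Boolean, maxFetches: Int): Vector[Long] = {
    var offset = 0L
    val seen = Vector.newBuilder[Long]
    for (_ <- 1 to maxFetches) {
      for (m <- fetch(offset)) {
        seen += m.offset
        // The old code stored the current message's offset, so the next
        // fetch starts at the message that was just consumed.
        offset = if (useNextOffset) m.nextOffset else m.offset
      }
    }
    seen.result()
  }

  def main(args: Array[String]): Unit = {
    println(consume(useNextOffset = false, maxFetches = 4)) // Vector(0, 1, 1, 1, 1)
    println(consume(useNextOffset = true, maxFetches = 4))  // Vector(0, 1, 2, 3)
  }
}
```

The first call never gets past offset 1, which mirrors the repeated "next offset = 19" lines in the issue description; the second call consumes each message once and then receives empty fetches.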


2. Printing producedOffset in SimpleConsumer shows that it is -1:

$ svn diff core/src/main/scala/kafka/consumer/SimpleConsumer.scala
Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/SimpleConsumer.scala     (revision 1400944)
+++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala     (working copy)
@@ -50,6 +50,7 @@
       if (simpleConsumer != null)
         simpleConsumer.close()
     }
+    System.out.println("====> producedOffset : " + producedOffset)
     producedOffset
   }


3. An exception is thrown from SimpleConsumer.earliestOrLatestOffset():

$ bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9093 --topic test_1 --partition 0 --replica 3

[2012-10-22 10:41:56,525] INFO Getting topic metatdata... (kafka.tools.SimpleConsumerShell$)
[2012-10-22 10:41:56,583] INFO Fetching metadata for topic Set(test_1) (kafka.client.ClientUtils$)
[2012-10-22 10:41:56,589] INFO Connected to localhost:9093 for producing (kafka.producer.SyncProducer)
[2012-10-22 10:41:56,751] INFO Disconnecting from localhost:9093 (kafka.producer.SyncProducer)
[2012-10-22 10:41:56,784] ERROR error in earliestOrLatestOffset() (kafka.consumer.SimpleConsumer$)
java.lang.UnsupportedOperationException: empty.head
        at scala.collection.immutable.Vector.head(Vector.scala:162)
        at kafka.consumer.SimpleConsumer$.earliestOrLatestOffset(SimpleConsumer.scala:45)
        at kafka.tools.SimpleConsumerShell$.main(SimpleConsumerShell.scala:169)
        at kafka.tools.SimpleConsumerShell.main(SimpleConsumerShell.scala)
====> producedOffset : -1
[2012-10-22 10:41:56,786] INFO Starting simple consumer shell to partition [test_1, 0], replica [3], host and port: [127.0.0.1, 9093], from offset [-1] (kafka.tools.SimpleConsumerShell$)
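
The `empty.head` message in the stack trace is what Scala reports when `head` is called on an empty collection. The following is a minimal sketch of that failure mode and of a guarded alternative. The names `firstOffsetUnsafe` and `firstOffsetSafe` are hypothetical and this is not the actual SimpleConsumer code; it only assumes that the offset lookup returned an empty sequence.

```scala
import scala.util.Try

object EmptyHeadSketch {
  // Hypothetical: mimics taking the first entry of an offset lookup result.
  // Throws a runtime exception if the sequence is empty.
  def firstOffsetUnsafe(offsets: Seq[Long]): Long = offsets.head

  // Guarded variant: None makes the "no offsets returned" case explicit,
  // so the caller has to decide what to do instead of using a sentinel.
  def firstOffsetSafe(offsets: Seq[Long]): Option[Long] = offsets.headOption

  def main(args: Array[String]): Unit = {
    val noOffsets = Vector.empty[Long]
    println(Try(firstOffsetUnsafe(noOffsets)).isFailure) // true
    println(firstOffsetSafe(noOffsets))                  // None
    println(firstOffsetSafe(Vector(300L)))               // Some(300)
  }
}
```

With the `Option` form, a caller could report the missing offset and stop, rather than starting the consumer from offset -1 as in the last log line above. Whether that is the right behavior for SimpleConsumerShell is a design question this sketch does not settle.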

                
> SimpleConsumerShell runs into an infinite loop
> ----------------------------------------------
>
>                 Key: KAFKA-576
>                 URL: https://issues.apache.org/jira/browse/KAFKA-576
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: John Fung
>
> * In this case, there are 15 log segment files in broker-1 data dir:
> ls -l /tmp/kafka_server_1_logs/test_1-0/
> total 240
> -rw-r--r-- 1 jfung eng    16 Oct 16 10:41 00000000000000000000.index
> -rw-r--r-- 1 jfung eng 10440 Oct 16 10:40 00000000000000000000.log
> -rw-r--r-- 1 jfung eng     8 Oct 16 10:41 00000000000000000020.index
> -rw-r--r-- 1 jfung eng 10440 Oct 16 10:40 00000000000000000020.log
> . . .
> -rw-r--r-- 1 jfung eng     8 Oct 16 10:41 00000000000000000280.index
> -rw-r--r-- 1 jfung eng 10440 Oct 16 10:41 00000000000000000280.log
> * The following is the DumpLogSegments output for the first log segment file:
> bin/kafka-run-class.sh kafka.tools.DumpLogSegments /tmp/kafka_server_1_logs/test_1-0/00000000000000000000.log
> Dumping /tmp/kafka_server_1_logs/test_1-0/00000000000000000000.log
> Starting offset: 0
> offset: 0 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 1663889063
> offset: 1 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 2803454828
> offset: 2 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 683347625
> . . .
> offset: 18 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 1892511043
> offset: 19 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 601297044
> * Output of SimpleConsumerShell:
> . . .
> next offset = 16
> Topic:test_1:ThreadID:2:MessageID:0000000043:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>  
> next offset = 17
> Topic:test_1:ThreadID:3:MessageID:0000000063:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>  
> next offset = 18
> Topic:test_1:ThreadID:4:MessageID:0000000083:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>  
> next offset = 19
> Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>  
> next offset = 19
> Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>  
> next offset = 19
> Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>  
> next offset = 19
> Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>  
> . . .
> * It appears that SimpleConsumerShell doesn't advance to the next log segment file
> * It should probably block inside the while loop to prevent infinite looping
