[
https://issues.apache.org/jira/browse/KAFKA-576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13481558#comment-13481558
]
John Fung commented on KAFKA-576:
---------------------------------
* SimpleConsumerShell receives data from one of the brokers in a 3-broker
cluster with this change (see the sketch after the diff):
$ svn diff core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
Index: core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
===================================================================
--- core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (revision 1400944)
+++ core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (working copy)
@@ -186,7 +186,7 @@
var consumed = 0
for(messageAndOffset <- messageSet) {
try {
- offset = messageAndOffset.offset
+ offset = messageAndOffset.nextOffset
if(printOffsets)
System.out.println("next offset = " + offset)
formatter.writeTo(messageAndOffset.message, System.out)
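
For context, the shell keeps issuing fetch requests starting from the recorded offset; if it records the current message's own offset instead of the position after it, the last message in a batch is fetched again on every pass. A minimal sketch of the two bookkeeping choices, using a simplified stand-in for kafka.message.MessageAndOffset (not the actual class):

// Simplified stand-in: a message plus the offset it was stored at.
// nextOffset is the position to fetch from to read the following message.
case class MessageAndOffset(message: String, offset: Long) {
  def nextOffset: Long = offset + 1
}

object OffsetAdvanceSketch {
  def main(args: Array[String]): Unit = {
    val batch = Seq(MessageAndOffset("a", 18L), MessageAndOffset("b", 19L))

    // Buggy bookkeeping: after the batch, the fetch offset is still 19, so the
    // next fetch returns the message at offset 19 again, forever.
    var fetchOffset = 18L
    for (mo <- batch) fetchOffset = mo.offset
    println(s"using offset:     next fetch starts at $fetchOffset")  // 19

    // Fixed bookkeeping: the fetch offset becomes 20, past the last consumed
    // message, so the next fetch can move on to the following log segment.
    fetchOffset = 18L
    for (mo <- batch) fetchOffset = mo.nextOffset
    println(s"using nextOffset: next fetch starts at $fetchOffset")  // 20
  }
}
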
* Printing out producedOffset in SimpleConsumer shows -1:
$ svn diff core/src/main/scala/kafka/consumer/SimpleConsumer.scala
Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/SimpleConsumer.scala (revision 1400944)
+++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala (working copy)
@@ -50,6 +50,7 @@
if (simpleConsumer != null)
simpleConsumer.close()
}
+ System.out.println("====> producedOffset : " + producedOffset)
producedOffset
}
* An exception is thrown from SimpleConsumer.earliestOrLatestOffset():
$ bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9093 --topic test_1 --partition 0 --replica 3
[2012-10-22 10:41:56,525] INFO Getting topic metatdata...
(kafka.tools.SimpleConsumerShell$)
[2012-10-22 10:41:56,583] INFO Fetching metadata for topic Set(test_1)
(kafka.client.ClientUtils$)
[2012-10-22 10:41:56,589] INFO Connected to localhost:9093 for producing
(kafka.producer.SyncProducer)
[2012-10-22 10:41:56,751] INFO Disconnecting from localhost:9093
(kafka.producer.SyncProducer)
[2012-10-22 10:41:56,784] ERROR error in earliestOrLatestOffset()
(kafka.consumer.SimpleConsumer$)
java.lang.UnsupportedOperationException: empty.head
at scala.collection.immutable.Vector.head(Vector.scala:162)
at kafka.consumer.SimpleConsumer$.earliestOrLatestOffset(SimpleConsumer.scala:45)
at kafka.tools.SimpleConsumerShell$.main(SimpleConsumerShell.scala:169)
at kafka.tools.SimpleConsumerShell.main(SimpleConsumerShell.scala)
====> producedOffset : -1
[2012-10-22 10:41:56,786] INFO Starting simple consumer shell to partition
[test_1, 0], replica [3], host and port: [127.0.0.1, 9093], from offset [-1]
(kafka.tools.SimpleConsumerShell$)
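
The empty.head failure suggests the offset response contained no offsets for the requested partition; the error is logged and producedOffset is returned at its initial value of -1, which the shell then uses as its starting offset. A hedged sketch of a guard that would avoid the exception (the helper name and surrounding object are illustrative, not the actual SimpleConsumer code):

object OffsetGuardSketch {
  // headOption avoids the UnsupportedOperationException that Vector.empty.head
  // throws when the broker returns no offsets for the requested partition.
  def firstOffsetOrElse(offsets: Seq[Long], fallback: Long): Long =
    offsets.headOption.getOrElse {
      System.err.println(s"No offsets returned; falling back to $fallback")
      fallback
    }

  def main(args: Array[String]): Unit = {
    println(firstOffsetOrElse(Vector(280L), -1L))  // 280
    println(firstOffsetOrElse(Vector.empty, -1L))  // -1, with a warning instead of a crash
  }
}

With a check like this the shell could fail fast with a clear message rather than silently starting the consumer from offset -1.
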
> SimpleConsumerShell runs into an infinite loop
> ----------------------------------------------
>
> Key: KAFKA-576
> URL: https://issues.apache.org/jira/browse/KAFKA-576
> Project: Kafka
> Issue Type: Bug
> Reporter: John Fung
>
> * In this case, there are 15 log segment files in the broker-1 data directory:
> ls -l /tmp/kafka_server_1_logs/test_1-0/
> total 240
> -rw-r--r-- 1 jfung eng 16 Oct 16 10:41 00000000000000000000.index
> -rw-r--r-- 1 jfung eng 10440 Oct 16 10:40 00000000000000000000.log
> -rw-r--r-- 1 jfung eng 8 Oct 16 10:41 00000000000000000020.index
> -rw-r--r-- 1 jfung eng 10440 Oct 16 10:40 00000000000000000020.log
> . . .
> -rw-r--r-- 1 jfung eng 8 Oct 16 10:41 00000000000000000280.index
> -rw-r--r-- 1 jfung eng 10440 Oct 16 10:41 00000000000000000280.log
> * The following is the DumpLogSegments output for the first log segment file:
> bin/kafka-run-class.sh kafka.tools.DumpLogSegments
> /tmp/kafka_server_1_logs/test_1-0/00000000000000000000.log
> Dumping /tmp/kafka_server_1_logs/test_1-0/00000000000000000000.log
> Starting offset: 0
> offset: 0 isvalid: true payloadsize: 500 magic: 2 compresscodec:
> NoCompressionCodec crc: 1663889063
> offset: 1 isvalid: true payloadsize: 500 magic: 2 compresscodec:
> NoCompressionCodec crc: 2803454828
> offset: 2 isvalid: true payloadsize: 500 magic: 2 compresscodec:
> NoCompressionCodec crc: 683347625
> . . .
> offset: 18 isvalid: true payloadsize: 500 magic: 2 compresscodec:
> NoCompressionCodec crc: 1892511043
> offset: 19 isvalid: true payloadsize: 500 magic: 2 compresscodec:
> NoCompressionCodec crc: 601297044
> * Output of SimpleConsumerShell:
> . . .
> next offset = 16
> Topic:test_1:ThreadID:2:MessageID:0000000043:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>
> next offset = 17
> Topic:test_1:ThreadID:3:MessageID:0000000063:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>
> next offset = 18
> Topic:test_1:ThreadID:4:MessageID:0000000083:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>
> next offset = 19
> Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>
> next offset = 19
> Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>
> next offset = 19
> Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>
> next offset = 19
> Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>
> . . .
> * It appears that SimpleConsumerShell doesn't advance to the next log segment
> file.
> * It should probably block inside the while loop instead of spinning, to
> prevent infinite looping (see the sketch below).
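
As a rough illustration of the last point, a self-contained sketch of a fetch loop that blocks briefly (and eventually gives up) when a fetch returns nothing, instead of re-fetching the same offset in a tight loop. The fetch() stub and the message type are stand-ins, not the actual SimpleConsumerShell code:

object FetchLoopSketch {
  case class MessageAndOffset(message: String, offset: Long) {
    def nextOffset: Long = offset + 1
  }

  // Hypothetical log with 20 messages at offsets 0..19, like the first segment above.
  private val log = (0L until 20L).map(o => MessageAndOffset(s"msg-$o", o))

  // Stand-in for a fetch request: returns at most 5 messages from fetchOffset onward.
  def fetch(fetchOffset: Long): Seq[MessageAndOffset] =
    log.filter(_.offset >= fetchOffset).take(5)

  def main(args: Array[String]): Unit = {
    var fetchOffset = 0L
    var idlePasses = 0
    while (idlePasses < 3) {                 // give up after a few empty fetches
      val batch = fetch(fetchOffset)
      if (batch.isEmpty) {
        idlePasses += 1
        Thread.sleep(100)                    // block briefly instead of spinning
      } else {
        idlePasses = 0
        for (mo <- batch) {
          println(s"next offset = ${mo.nextOffset}  ${mo.message}")
          fetchOffset = mo.nextOffset        // advance past the consumed message
        }
      }
    }
  }
}
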