i am using 0.8.0. The high level api works as expected. <dependency>
<groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.0</version> On Thu, Feb 20, 2014 at 2:40 PM, Chen Wang <[email protected]>wrote: > Hi, > I am using kafka for the first time, and was running the sample from > > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example > > However, I cannot read any data from kafka. The kafka has 10 > partitions,and I tried to read from any of them. The fetch can succeed, > however, the message size returned is always 0( System.out > .println("the message size is" + messageSet > .kafka$javaapi$message$ByteBufferMessageSet$$underlying() > .size());). Is there something apparent missing for my case? > > > while (a_maxReads > 0) { > if (consumer == null) { > consumer = new SimpleConsumer(leadBroker, a_port, 100000, > 10240 * 1024, clientName); > } > > System.out.println("start fetching"); > System.out.println("the readoffset is" + readOffset); > > FetchRequest req = new FetchRequestBuilder().clientId(clientName) > .addFetch(a_topic, a_partition, readOffset, 1000000) > .build(); > FetchResponse fetchResponse = consumer.fetch(req); > System.out.println("finish fetching"); > if (fetchResponse.hasError()) { > numErrors++; > // Something went wrong! > short code = fetchResponse.errorCode(a_topic, a_partition); > System.out.println("Error fetching data from the Broker:" + > leadBroker + " Reason: " + code); > if (numErrors > 5) > break; > if (code == ErrorMapping.OffsetOutOfRangeCode()) { > // We asked for an invalid offset. For simple case ask for > // the last element to reset > readOffset = getLastOffset(consumer, a_topic, a_partition, > kafka.api.OffsetRequest.LatestTime(), clientName); > continue; > } > consumer.close(); > consumer = null; > leadBroker = findNewLeader(leadBroker, a_topic, a_partition, > a_port); > continue; > } > numErrors = 0; > > long numRead = 0; > System.out.println("The topic is:" + a_topic + " partition is : " + > a_partition); > ByteBufferMessageSet messageSet = fetchResponse.messageSet(a_topic, > a_partition); > System.out > .println("the message size is" + messageSet > .kafka$javaapi$message$ByteBufferMessageSet$$underlying() > .size()); > for (MessageAndOffset messageAndOffset: messageSet) { > long currentOffset = messageAndOffset.offset(); > if (currentOffset < readOffset) { > System.out.println("Found an old offset: " + currentOffset + " > Expecting: " + readOffset); > continue; > } > readOffset = messageAndOffset.nextOffset(); > ByteBuffer payload = messageAndOffset.message().payload(); > > byte[] bytes = new byte[payload.limit()]; > payload.get(bytes); > System.out.println(String.valueOf(messageAndOffset.offset()) + ": > " + new String(bytes, "UTF-8")); > numRead++; > a_maxReads--; > } > > if (numRead == 0) { > try { > Thread.sleep(1000); > } catch (InterruptedException ie) {} > > Thanks much! > Chen >
