Never mind. It was actually working. I just need to wait a bit longer for data to come into the partition i was testing for. Chen
On Thu, Feb 20, 2014 at 2:41 PM, Chen Wang <[email protected]>wrote: > i am using 0.8.0. The high level api works as expected. > > <dependency> > > <groupId>org.apache.kafka</groupId> > > <artifactId>kafka_2.10</artifactId> > > <version>0.8.0</version> > > > On Thu, Feb 20, 2014 at 2:40 PM, Chen Wang <[email protected]>wrote: > >> Hi, >> I am using kafka for the first time, and was running the sample from >> >> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example >> >> However, I cannot read any data from kafka. The kafka has 10 >> partitions,and I tried to read from any of them. The fetch can succeed, >> however, the message size returned is always 0( System.out >> .println("the message size is" + messageSet >> .kafka$javaapi$message$ByteBufferMessageSet$$underlying() >> .size());). Is there something apparent missing for my case? >> >> >> while (a_maxReads > 0) { >> if (consumer == null) { >> consumer = new SimpleConsumer(leadBroker, a_port, 100000, >> 10240 * 1024, clientName); >> } >> >> System.out.println("start fetching"); >> System.out.println("the readoffset is" + readOffset); >> >> FetchRequest req = new FetchRequestBuilder().clientId(clientName) >> .addFetch(a_topic, a_partition, readOffset, 1000000) >> .build(); >> FetchResponse fetchResponse = consumer.fetch(req); >> System.out.println("finish fetching"); >> if (fetchResponse.hasError()) { >> numErrors++; >> // Something went wrong! >> short code = fetchResponse.errorCode(a_topic, a_partition); >> System.out.println("Error fetching data from the Broker:" + >> leadBroker + " Reason: " + code); >> if (numErrors > 5) >> break; >> if (code == ErrorMapping.OffsetOutOfRangeCode()) { >> // We asked for an invalid offset. For simple case ask for >> // the last element to reset >> readOffset = getLastOffset(consumer, a_topic, a_partition, >> kafka.api.OffsetRequest.LatestTime(), clientName); >> continue; >> } >> consumer.close(); >> consumer = null; >> leadBroker = findNewLeader(leadBroker, a_topic, a_partition, >> a_port); >> continue; >> } >> numErrors = 0; >> >> long numRead = 0; >> System.out.println("The topic is:" + a_topic + " partition is : " + >> a_partition); >> ByteBufferMessageSet messageSet = fetchResponse.messageSet(a_topic, >> a_partition); >> System.out >> .println("the message size is" + messageSet >> .kafka$javaapi$message$ByteBufferMessageSet$$underlying() >> .size()); >> for (MessageAndOffset messageAndOffset: messageSet) { >> long currentOffset = messageAndOffset.offset(); >> if (currentOffset < readOffset) { >> System.out.println("Found an old offset: " + currentOffset + >> " Expecting: " + readOffset); >> continue; >> } >> readOffset = messageAndOffset.nextOffset(); >> ByteBuffer payload = messageAndOffset.message().payload(); >> >> byte[] bytes = new byte[payload.limit()]; >> payload.get(bytes); >> System.out.println(String.valueOf(messageAndOffset.offset()) + ": >> " + new String(bytes, "UTF-8")); >> numRead++; >> a_maxReads--; >> } >> >> if (numRead == 0) { >> try { >> Thread.sleep(1000); >> } catch (InterruptedException ie) {} >> >> Thanks much! >> Chen >> > >
