Thanks Jun. I don't see any error code and the fetch size is large enough to than the largest single message. Actually, when I call response.messageSet(topic, partition).toBuffer.size the value is the number of messages I've produced to Kafka.
On Tue Jan 20 2015 at 上午12:31:53 Jun Rao <j...@confluent.io> wrote: > Did you get any error code in the response? Also, make sure fetchSize is > larger than the largest single message. > > Thanks, > > Jun > > On Sun, Jan 18, 2015 at 4:54 PM, Manu Zhang <owenzhang1...@gmail.com> > wrote: > > > Hi all, > > > > I'm using Kafka low level consumer api and find in the below codes > > "iterator.hasNext" always return false. Through debugging, I'm sure the > > messageSet has the size of "fetchSize" > > > > * val consumer = new SimpleConsumer(broker.host, broker.port, > > soTimeout, soBufferSize, clientId)* > > * val request = new FetchRequestBuilder()* > > * .addFetch(topic, partition, offset, fetchSize)* > > * .build()* > > * val response = consumer.fetch(request)* > > * response.errorCode(topic, partition) match {* > > * case NoError => {* > > * iterator = response.messageSet(topic, partition).iterator* > > * }* > > * case error => throw exceptionFor(error)* > > * }* > > > > The weird thing is that the iterator works fine when I get iterator > > directly without checking the error code. > > > > * val consumer = new SimpleConsumer(broker.host, broker.port, > > soTimeout, soBufferSize, clientId)* > > * val request = new FetchRequestBuilder()* > > * .addFetch(topic, partition, offset, fetchSize)* > > * .build()* > > * consumer.fetch(request).messageSet(topic, partition).iterator* > > > > Any thoughts ? > > > > Thanks, > > Manu Zhang > > >