Hi

I am facing an OOME while consuming from one topic with 10 partitions
(100,000 messages per partition), 5 consumers (consumer groups),
consumer.timeout.ms=10. The OOME occurs 1-2 minutes after start.
Java heap: -Xms1024M
LAN: about 10 Gbit
This is a standalone application.
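
For reference, the consumer configuration behind this setup looks roughly
like the fragment below (the zookeeper address is a placeholder; the
commented values are the 0.8.x defaults as I understand them, which may not
match every broker):

```properties
zookeeper.connect=zk-host:2181
group.id=preprocessorTopicReporting
consumer.timeout.ms=10
auto.commit.enable=false
# relevant memory-related defaults in 0.8.x:
# fetch.message.max.bytes=1048576
# queued.max.message.chunks=2
```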

Kafka version 0.8.2

Each message is about 5-10 kB.
Before the OOME, each consumer received between 5,000 and 30,000 messages
per request from one topic.
Each consumer reads from a different topic.
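
A back-of-the-envelope estimate of the heap held by the fetch buffers (my
own sketch; the chunk-queue depth and fetch size are assumed 0.8.x defaults,
not values I have confirmed on this cluster):

```java
public class FetchMemoryEstimate {
    // Rough worst case for the old high-level consumer: each partition can
    // have up to queuedChunks message chunks of fetchBytes each queued in
    // memory at once.
    static long estimateBytes(int partitions, int queuedChunks, long fetchBytes) {
        return (long) partitions * queuedChunks * fetchBytes;
    }

    public static void main(String[] args) {
        long fetchBytes = 1024 * 1024; // fetch.message.max.bytes default
        int queuedChunks = 2;          // queued.max.message.chunks default
        int partitions = 10;
        long bytes = estimateBytes(partitions, queuedChunks, fetchBytes);
        System.out.println(bytes / (1024 * 1024) + " MB"); // prints "20 MB"
    }
}
```

Even this worst case is small next to a 1024M heap, so the fetch buffers
alone should not explain the OOME unless the buffered messages are also
retained on the application side.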

Part of the code (offsets are handled manually):

Map<String, Integer> streamCounts = Collections.singletonMap(topic, 1);
ConsumerConnector connector =
        Consumer.createJavaConsumerConnector(consumerConf);

Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        connector.createMessageStreams(streamCounts);
// read from one stream
KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0);
// read from the stream in an infinite loop
while (true) {
    try {
        for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
            Message msg = new Message(messageAndMetadata);
            messages.add(msg);
        }
    } catch (ConsumerTimeoutException ignore) {
        // thrown whenever the consumer timeout is reached
    }
    // some logic

    // commit offsets

    // thread sleep 1 sec
}

//close connection
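
The "some logic" step is supposed to process and then clear `messages` each
iteration. Sketched with plain Java collections (no Kafka types; the
batching here is my own illustration, not the actual processing code):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BatchDrain {
    // Drain an iterator into fixed-size batches, handing each batch off and
    // clearing the buffer so the in-memory list never exceeds maxBatch.
    static <T> List<List<T>> drainInBatches(Iterator<T> it, int maxBatch) {
        List<List<T>> batches = new ArrayList<>();
        List<T> buffer = new ArrayList<>();
        while (it.hasNext()) {
            buffer.add(it.next());
            if (buffer.size() == maxBatch) {
                batches.add(new ArrayList<>(buffer)); // hand off a copy
                buffer.clear();                       // bound heap usage
            }
        }
        if (!buffer.isEmpty()) batches.add(new ArrayList<>(buffer));
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 7; i++) data.add(i);
        System.out.println(drainInBatches(data.iterator(), 3).size()); // prints 3
    }
}
```

If the real loop instead keeps appending to `messages` without ever
clearing it, the list itself would grow without bound and could be the
source of the OOME.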



Stack trace:

[03/06/2015 13:23:56] [ConsumerFetcherThread-preprocessorTopicReporting_mo-host-1425648218660-2d7f632b-0-5] ERROR kafka.network.BoundedByteBufferReceive - OOME with size 1048612
2097206
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:329)
        at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:97)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

*Q1: How can I solve this issue?*
*Q2: What could be the root cause?*
*Q3: How can I control memory allocation for each consumer?*

*Thanks*
*Dima.*
