Hi I face to OOME while trying to consume from one topic 10 partitions (100 000 messages each partition) 5 consumers(consumer groups), consumer.timeout=10ms. OOME was gotten after 1-2 minutes after start. Java heap - Xms=1024M LAN about 10Gbit This is standalone application.
Kafka version 0.8.2 Messages have about 5-10kB size each. Before OOME consumers received from 5 000 to 30 000 messages per request from one topic. Each consumer reads from different topics. Part of the code(manually handle offsets) : Map<String, Integer> streamCounts = Collections.singletonMap(topic, 1); ConsumerConnector connector = consumer.createJavaConsumerConnector(consumerConf); Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(streamCounts); //read in one stream KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0); //read from stream in infinity loop while(true){ try { for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) { Message msg = new Message(messageAndMetadata); messages.add(msg); } } catch (ConsumerTimeoutException ignore) { // throws every time when Kafka consumer timeout was reached } //some logic //commit offsets //thread sleep 1 sec } //close connection Stacktrace log [[03/06/2015 13:23:56 [ConsumerFetcherThread-preprocessorTopicReporting_mo-host-1425648218660-2d7f632b-0-5] ERROR kafka.network.BoundedByteBufferReceive - OOME with size 1048612 2097206 java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:329) at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:97) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) *Q1: How to solve this issue?* *Q2: What can be the root cause?* *Q3: How can i control memory allocating for each consumer?* *Thanks* *Dima.*