Thank you, Nathan Leung. We set conf.setMaxSpoutPending(5000);
I checked the API: TOPOLOGY_MAX_SPOUT_PENDING is "the maximum number of tuples that can be pending on a spout task at any given time." In our scenario one file has 40K lines, so it will emit 40K tuples. Could you describe in a little more detail how this setting works?

Regards,
Sai

On Wed, Nov 26, 2014 at 9:56 PM, Nathan Leung <[email protected]> wrote:
> Set the spout max pending, and make sure that you ack your messages in the
> bolt/s.
>
> On Wed, Nov 26, 2014 at 8:39 AM, 张炜 <[email protected]> wrote:
>>
>> Dear all,
>> We frequently meet a heap-out-of-space problem when running a topology
>> that uses KafkaSpout. Please kindly help.
>>
>> Our scenario is that we send large files to Kafka; each file is about
>> 3MB in size. We use Storm to consume the messages from Kafka (using
>> KafkaSpout), and we process each message line by line and emit messages.
>> We find that memory problems occur very frequently, as shown below:
>>
>> java.lang.OutOfMemoryError: Java heap space at
>> java.util.Arrays.copyOf(Arrays.java:2271) at
>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at
>> java.io.ByteArrayOutputStream.ensur
>>
>> java.lang.OutOfMemoryError: Java heap space at
>> com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296) at
>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(
>>
>> java.lang.OutOfMemoryError: GC overhead limit exceeded at
>> java.util.Arrays.copyOf(Arrays.java:2367) at
>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
>> at java.lang.Ab
>>
>> Our Storm settings are:
>>
>> drpc.childopts -Xmx768m
>> supervisor.childopts -Xmx256m
>> worker.childopts -Xmx1536m -Xms1024m -XX:MaxPermSize=128m -XX:NewSize=512m
>> -XX:MaxNewSize=1024m
>>
>> Each node of our cluster has 4 CPUs and 8GB of memory, and we configured
>> 4 workers per node.
>>
>> We dumped the memory and analyzed it: there is a LinkedList object
>> holding a lot of memory, and we found that it is used by KafkaSpout.
>>
>> The list is used to hold all the messages. If we understand correctly,
>> KafkaSpout will fetch all the messages from the current consumed offset
>> up to the max offset and store them in the list.
>> Because the Kafka producer is very fast, while in Storm we process line
>> by line, which is not fast enough, the list gets bigger and bigger.
>>
>> So my questions are:
>> 1) If our analysis is correct, how can we limit the amount of messages
>> that KafkaSpout fetches each time, so that it does not fetch everything
>> from the current offset up to the latest message? Or, say, make it fetch
>> a fixed number of messages at a time.
>>
>> 2) If our analysis is not correct, could you suggest where the problems
>> are? Also, are the memory settings correct?
>>
>> Thank you very much for your help!
>>
>> Regards,
>> Sai
>>
> --
> Flowing water does not strive to be first
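[Editor's note: to answer "how does this setting work" above, max spout pending means Storm stops calling nextTuple() on a spout task once the number of emitted-but-unacked tuples reaches the cap, and resumes as the bolts ack (or fail) tuples. The following is a toy, self-contained sketch of that throttle only; it is not Storm's actual scheduler code, and the class and method names are made up for illustration.]

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of Storm's max-spout-pending throttle (illustration only).
public class MaxSpoutPendingDemo {

    // Simulates emitting `tuples` tuples while allowing at most `cap`
    // to be un-acked at any moment; returns {tuplesProcessed, peakPending}.
    static int[] run(int tuples, int cap) {
        Queue<Integer> source = new ArrayDeque<>();
        for (int i = 0; i < tuples; i++) {
            source.add(i); // e.g. the 40K lines of one file
        }
        Queue<Integer> inFlight = new ArrayDeque<>(); // emitted, not yet acked
        int processed = 0;
        int peakPending = 0;

        while (!source.isEmpty() || !inFlight.isEmpty()) {
            // The throttle: the spout is only asked for the next tuple
            // while the pending count is below the cap.
            while (!source.isEmpty() && inFlight.size() < cap) {
                inFlight.add(source.poll());
            }
            peakPending = Math.max(peakPending, inFlight.size());
            // A bolt acks one tuple, which frees a pending slot
            // and lets the spout emit again.
            inFlight.poll();
            processed++;
        }
        return new int[] {processed, peakPending};
    }

    public static void main(String[] args) {
        int[] r = run(40_000, 5_000);
        // All 40K tuples are eventually processed, but at most 5000 are
        // ever in flight, so tuple memory is bounded by the cap.
        System.out.println("processed=" + r[0] + " peakPending=" + r[1]);
    }
}
```

Note that max spout pending bounds only un-acked tuples in the topology; it does not by itself bound how much the spout buffers internally from Kafka, which is the subject of question 1 above.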
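[Editor's note: on question 1, the storm-kafka SpoutConfig of this era (which extends KafkaConfig) exposes public fields that cap how much data the spout pulls per fetch request. A hedged configuration sketch follows; the host, topic, zkRoot, and id values are placeholders, and exact defaults may differ by version.]

```java
// Sketch: bounding KafkaSpout's per-fetch buffer, assuming the
// storm-kafka (0.9.x-era) SpoutConfig/KafkaConfig API.
// "kafka-host:2181", "mytopic", "/kafka-spout", "my-id" are placeholders.
BrokerHosts hosts = new ZkHosts("kafka-host:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "mytopic", "/kafka-spout", "my-id");

// Cap the bytes pulled from Kafka per fetch request (the default is on
// the order of 1MB), so the spout's internal message list cannot grow
// toward "current offset .. max offset".
spoutConfig.fetchSizeBytes = 256 * 1024;
spoutConfig.bufferSizeBytes = 256 * 1024;

// Combine with max spout pending so un-acked tuples are also bounded.
Config conf = new Config();
conf.setMaxSpoutPending(5000);
```

With both limits in place, memory use is bounded on two fronts: the fetch size caps what sits in the spout's buffer, and max spout pending caps what is in flight through the topology.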
