The max spout pending setting dictates how many tuples can be "in flight"
from a spout task at any given time, i.e. the number of tuples that have
not yet been fully acknowledged by the downstream bolts. See the
guaranteed message processing documentation for more details:
https://storm.apache.org/documentation/Guaranteeing-message-processing.html
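
For reference, acking from a bolt looks roughly like this (a minimal
sketch against the 0.9.x API; the class name and processing logic are
placeholders):

    import java.util.Map;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;

    public class LineProcessingBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context,
                            OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            // ... process the line here ...
            collector.ack(tuple);  // until this runs, the tuple stays pending
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // this sketch emits nothing downstream
        }
    }

A tuple that fails or times out is replayed, and it keeps counting
against the pending total until it is fully acked.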

Note that if you have 1 spout task and max spout pending is 5,000, then
you can have 5,000 tuples in flight.  But if you have 4 spout tasks, that
number becomes 20,000 (5,000 per spout task).  So if you have a lot of
spout tasks per worker, the amount of memory required increases
accordingly.
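
For example, with the numbers above (a short sketch):

    Config conf = new Config();
    conf.setMaxSpoutPending(5000);  // the limit is per spout task
    // 4 spout tasks * 5,000 pending each = up to 20,000 tuples in flight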

Also, this depends somewhat on how your Kafka topic is set up.  If you
have 4 partitions but 8 spout tasks, then only 4 of the spout tasks will
actually read from the topic.
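
So a common setup is to match the spout parallelism to the partition
count, e.g. (a sketch; the spout id and spoutConfig here are placeholders
for your own values):

    TopologyBuilder builder = new TopologyBuilder();
    // 4 spout tasks for a 4-partition topic
    builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 4);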

On Wed, Nov 26, 2014 at 9:11 AM, 张炜 <[email protected]> wrote:

> Thank you Nathan Leung.
>
> We set conf.setMaxSpoutPending(5000);
>
> I checked the API docs, which say:
>
> TOPOLOGY_MAX_SPOUT_PENDING
> The maximum number of tuples that can be pending on a spout task at
> any given time.
>
> In our scenario one file has 40K lines, so it will emit 40K tuples.
>
> Could you describe a little more about how this setting works?
>
> Regards,
> Sai
>
> On Wed, Nov 26, 2014 at 9:56 PM, Nathan Leung <[email protected]> wrote:
> > Set the max spout pending, and make sure that you ack your messages
> > in the bolt(s).
> >
> > On Wed, Nov 26, 2014 at 8:39 AM, 张炜 <[email protected]> wrote:
> >>
> >> Dear all,
> >> We frequently run into a Java heap space problem when running a
> >> topology that uses KafkaSpout. Please kindly help.
> >>
> >> Our scenario is that we send large files to Kafka; each file is about
> >> 3 MB in size. We use Storm to consume the messages from Kafka (using
> >> KafkaSpout), and we process each message line by line and emit new
> >> messages.
> >> We find that very frequently there are memory problems as shown below:
> >>
> >> java.lang.OutOfMemoryError: Java heap space at
> >> java.util.Arrays.copyOf(Arrays.java:2271) at
> >> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at
> >> java.io.ByteArrayOutputStream.ensur
> >>
> >> java.lang.OutOfMemoryError: Java heap space at
> >> com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296) at
> >> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(
> >>
> >> java.lang.OutOfMemoryError: GC overhead limit exceeded at
> >> java.util.Arrays.copyOf(Arrays.java:2367) at
> >> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
> >> at java.lang.Ab
> >>
> >>
> >> Our Storm settings are:
> >>
> >> drpc.childopts        -Xmx768m
> >> supervisor.childopts  -Xmx256m
> >> worker.childopts      -Xmx1536m -Xms1024m -XX:MaxPermSize=128m
> >>                       -XX:NewSize=512m -XX:MaxNewSize=1024m
> >>
> >> Each node of our cluster has 4 CPUs and 8 GB of memory, and we
> >> configured 4 workers per node.
> >>
> >> We took a heap dump and found a LinkedList object holding a lot of
> >> memory; it is used by KafkaSpout.
> >>
> >> If we understand correctly, the list is used to hold all the messages:
> >> KafkaSpout fetches all the messages from the current consumed offset
> >> up to the max offset and stores them in the list.
> >> Because the Kafka producer is very fast, and our line-by-line
> >> processing in Storm cannot consume fast enough, the list gets bigger
> >> and bigger.
> >>
> >> So my questions are:
> >> 1) If our analysis is correct, how can we limit how much KafkaSpout
> >> fetches each time? For example, make it fetch a fixed number of
> >> messages instead of everything from the current offset up to the
> >> latest message (see the sketch after question 2 for the kind of
> >> thing we mean).
> >>
> >> 2) If our analysis is not correct, could you suggest where the problem
> >> might be? Also, are our memory settings correct?
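> >>
> >> For question 1, is it something along these lines? (We are guessing at
> >> the API here; hosts, "mytopic", "/kafka" and "my-spout-id" are
> >> placeholders from our setup.)
> >>
> >>     SpoutConfig spoutConfig =
> >>         new SpoutConfig(hosts, "mytopic", "/kafka", "my-spout-id");
> >>     spoutConfig.fetchSizeBytes = 1024 * 1024;   // bytes pulled per fetch?
> >>     spoutConfig.bufferSizeBytes = 1024 * 1024;  // consumer buffer size?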
> >>
> >> Thank you very much for your help!
> >>
> >> Regards,
> >> Sai
> >>
> >
>
>
>
> --
> 流水不争先 ("Flowing water does not strive to be first") o00
>
