I still think you should try running with a larger heap.  :)  Max spout
pending determines how many tuples can be pending (that is, their tuple tree
is not yet fully acked) per spout task.  If you have many spout tasks per
worker, this can add up to a large amount of memory.  It also depends on how
big your tuples are.
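
To make that arithmetic concrete, here is a rough back-of-the-envelope sketch. The class and method names are made up for illustration, the task count of 4 is a placeholder, and the ~3K tuple size is taken from the record size mentioned earlier in the thread. It only counts raw payload bytes, so real heap usage (object overhead, queues, downstream copies) will likely be higher. Also note that for Trident topologies the setting is generally described as counting batches rather than individual tuples, so treat the unit as an assumption.

```java
// Hypothetical helper, not part of Storm: estimates the payload bytes that
// can be held by pending (not yet fully acked) tuples in a single worker.
class PendingMemoryEstimate {

    // pending tuples per spout task * spout tasks in the worker * bytes per tuple
    static long estimateBytes(int maxSpoutPending, int spoutTasksPerWorker, long bytesPerTuple) {
        return (long) maxSpoutPending * spoutTasksPerWorker * bytesPerTuple;
    }

    public static void main(String[] args) {
        long bytesPerTuple = 3 * 1024L;   // ~3K per record, as reported in the thread
        int spoutTasksPerWorker = 4;      // placeholder value, adjust to your topology

        // Compare the values discussed in the thread: 10, 50 and 1000.
        for (int pending : new int[] {10, 50, 1000}) {
            long bytes = estimateBytes(pending, spoutTasksPerWorker, bytesPerTuple);
            System.out.println("max spout pending " + pending + " -> about "
                    + (bytes / 1024) + " KB of pending payload per worker");
        }
    }
}
```

If the estimate is far below your heap size and you still hit the GC overhead limit, the pending tuples alone probably do not explain it, and a profiler is the better next step.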

On Mon, Mar 9, 2015 at 6:14 PM, Sa Li <[email protected]> wrote:

> Hi, Nathan
>
> We have played around with max spout pending in dev. If we set it to 10, it
> is OK, but if we set it to more than 50, GC overhead errors start to appear.
> We are ultimately writing tuples into a PostgreSQL DB, and the highest write
> speed into the DB is around 40K records/minute, which is very slow.
> Maybe that is why tuples accumulate in memory before being written to the
> DB. But I think 10 is too small. Does that mean only 10 tuples are allowed
> in flight?
>
> thanks
>
> AL
>
> On Fri, Mar 6, 2015 at 7:39 PM, Nathan Leung <[email protected]> wrote:
>
>> I've not modified netty so I can't comment on that.  I would set max
>> spout pending; try 1000 at first.  This will limit the number of tuples
>> that you can have in flight simultaneously and therefore limit the amount
>> of memory used by these tuples and their processing.
>>
>> On Fri, Mar 6, 2015 at 7:03 PM, Sa Li <[email protected]> wrote:
>>
>>> Hi, Nathan
>>>
>>> The log size of that Kafka topic is 23515541, and each record is about 3K.
>>> I checked the yaml file and I don't have max spout pending set, so I assume
>>> it is the default: topology.max.spout.pending: null
>>>
>>> Should I set it to a certain value? Also, I sometimes see
>>> java.nio.channels.ClosedChannelException: null, or  b.s.d.worker
>>> [ERROR] Error on initialization of server mk-worker
>>> Does this mean I should add
>>> storm.messaging.netty.server_worker_threads: 1
>>> storm.messaging.netty.client_worker_threads: 1
>>> storm.messaging.netty.buffer_size: 5242880 #5MB buffer
>>> storm.messaging.netty.max_retries: 30
>>> storm.messaging.netty.max_wait_ms: 1000
>>> storm.messaging.netty.min_wait_ms: 100
>>>
>>> to the yaml and modify the values?
>>>
>>>
>>>
>>> thanks
>>>
>>>
>>>
>>> On Fri, Mar 6, 2015 at 2:22 PM, Nathan Leung <[email protected]> wrote:
>>>
>>>> How much data do you have in Kafka? How is your max spout pending set?
>>>> If you have a high max spout pending (or if you emit unanchored tuples) you
>>>> could be using up a lot of memory.
>>>> On Mar 6, 2015 5:14 PM, "Sa Li" <[email protected]> wrote:
>>>>
>>>>> Hi, Nathan
>>>>>
>>>>> I have run into a strange issue: when I set spoutConf.forceFromStart=true,
>>>>> it quickly hits the GC overhead limit, even though I have already increased
>>>>> the heap size, but if I remove this setting it works fine.
>>>>> I was thinking that maybe the kafkaSpout consumes data
>>>>> much faster than the data is written into the Postgres DB, so the data
>>>>> quickly fills the memory and overflows the heap. But I did the same test
>>>>> on my DEV cluster and it works fine, even when I set
>>>>> spoutConf.forceFromStart=true. I checked the storm config for DEV and
>>>>> production, and they are the same.
>>>>>
>>>>> Any hints?
>>>>>
>>>>> thanks
>>>>>
>>>>> AL
>>>>>
>>>>>
>>>>> On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I don't see anything glaring.  I would try increasing heap size.  It
>>>>>> could be that you're right on the threshold of what you've allocated and
>>>>>> you just need more memory.
>>>>>>
>>>>>> On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <[email protected]> wrote:
>>>>>>
>>>>>>> Hi, All,
>>>>>>>
>>>>>>> I have roughly located where the problem comes from. In my run command, I
>>>>>>> specify the clientid of TridentKafkaConfig. If I keep the same clientid
>>>>>>> that I used before, it causes the GC error; otherwise it is completely
>>>>>>> OK. Here is the code:
>>>>>>>
>>>>>>> if (parameters.containsKey("clientid")) {
>>>>>>>     logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
>>>>>>>     spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));
>>>>>>>
>>>>>>> Any idea about this error?
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>> AL
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <[email protected]> wrote:
>>>>>>>
>>>>>>>> Sorry, continuing my last message:
>>>>>>>>
>>>>>>>> 2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
>>>>>>>> java.lang.RuntimeException: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
>>>>>>>>         at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>>>>> Caused by: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
>>>>>>>>         at backtype.storm.messaging.netty.Client.connect(Client.java:171) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         ... 6 common frames omitted
>>>>>>>> 2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async loop died!")
>>>>>>>> java.lang.RuntimeException: ("Async loop died!")
>>>>>>>>         at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
>>>>>>>>         at backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92) [storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>>>>> 2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
>>>>>>>> 2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-beloved-judge/10.100.98.104:6703
>>>>>>>>
>>>>>>>> I suspect this is caused by my EventUpdater, which writes data in batches:
>>>>>>>>
>>>>>>>> static class EventUpdater implements ReducerAggregator<List<String>> {
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public List<String> init() {
>>>>>>>>         return null;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>>>>>>>>         List<String> updated = null;
>>>>>>>>
>>>>>>>>         if (curr == null) {
>>>>>>>>             String event = (String) tuple.getValue(1);
>>>>>>>>             System.out.println("===:" + event + ":");
>>>>>>>>             updated = Lists.newArrayList(event);
>>>>>>>>         } else {
>>>>>>>>             System.out.println("===+" + tuple + ":");
>>>>>>>>             updated = curr;
>>>>>>>>         }
>>>>>>>>         // System.out.println("(())");
>>>>>>>>         return updated;
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thank you very much for the reply. Here is the error I saw in the
>>>>>>>>> production server's worker-6703.log:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Yeah, then in this case maybe you can install JDK / YourKit on
>>>>>>>>>> the remote machines and run the tools over X or something.  I'm
>>>>>>>>>> assuming this is a development cluster (not live / production) and that
>>>>>>>>>> installing debugging tools and running remote UIs etc. is not a problem.  :)
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Nathan, I think that if he wants to profile a bolt per se that
>>>>>>>>>>> runs in a worker residing on a different cluster node than the one
>>>>>>>>>>> the profiling tool runs on, he won't be able to attach to the process
>>>>>>>>>>> since it resides on a different physical machine, methinks (well, now
>>>>>>>>>>> that I think of it better, it can be done... via remote debugging, but
>>>>>>>>>>> that's just a pain in the ***).
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>>
>>>>>>>>>>> A.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> You don't need to change your code. As Andrew mentioned, you can
>>>>>>>>>>>> get a lot of mileage by profiling your logic in a standalone program.
>>>>>>>>>>>> For jvisualvm, you can just run your program (a loop that runs for a
>>>>>>>>>>>> long time is best) and then attach to the running process with
>>>>>>>>>>>> jvisualvm.  It's pretty straightforward to use and you can also find
>>>>>>>>>>>> good guides with a Google search.
>>>>>>>>>>>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Well...  detecting memory leaks in Java is a bit tricky as
>>>>>>>>>>>>> Java does a lot for you. Generally though, as long as you avoid using
>>>>>>>>>>>>> the "new" operator and close any resources that you do not use, you
>>>>>>>>>>>>> should be fine... but a profiler such as the ones mentioned by Nathan
>>>>>>>>>>>>> will tell you the whole truth. YourKit is awesome and has a free trial,
>>>>>>>>>>>>> go ahead and test drive it. I am pretty sure that you need a working jar
>>>>>>>>>>>>> (or compilable code that has a main function in it) in order to profile
>>>>>>>>>>>>> it, although profiling your bolts and spouts is a bit trickier.
>>>>>>>>>>>>> Hopefully your algorithm (or portions of it) can be put in a sample test
>>>>>>>>>>>>> program that can be executed locally for you to profile.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hope this helped. Regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>> A.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <[email protected]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Unfortunately that is not fixed; it depends on the
>>>>>>>>>>>>>>> computations and data structures you have. In my case, for example, I
>>>>>>>>>>>>>>> use more than 2GB since I need to keep a large matrix in memory...
>>>>>>>>>>>>>>> Having said that, in most cases it should be relatively easy to
>>>>>>>>>>>>>>> estimate how much memory you are going to need and use that... or if
>>>>>>>>>>>>>>> that's not possible you can just increase it and try the "set and see"
>>>>>>>>>>>>>>> approach. Check for memory leaks as well... (unclosed resources and so
>>>>>>>>>>>>>>> on...!)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> A.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <[email protected]>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks, Nathan. How much should it be in general?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Your worker is allocated a maximum of 768 MB of heap. It's
>>>>>>>>>>>>>>>>> quite possible that this is not enough. Try increasing Xmx in
>>>>>>>>>>>>>>>>> worker.childopts.
>>>>>>>>>>>>>>>>> On Mar 5, 2015 1:10 PM, "Sa Li" <[email protected]>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi, All
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I have been running a Trident topology on a production
>>>>>>>>>>>>>>>>>> server. The code looks like this:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> topology.newStream("spoutInit", kafkaSpout)
>>>>>>>>>>>>>>>>>>                 .each(new Fields("str"),
>>>>>>>>>>>>>>>>>>                         new JsonObjectParse(),
>>>>>>>>>>>>>>>>>>                         new Fields("eventType", "event"))
>>>>>>>>>>>>>>>>>>                 .parallelismHint(pHint)
>>>>>>>>>>>>>>>>>>                 .groupBy(new Fields("event"))
>>>>>>>>>>>>>>>>>>                 .persistentAggregate(PostgresqlState.newFactory(config),
>>>>>>>>>>>>>>>>>>                         new Fields("eventType"), new EventUpdater(),
>>>>>>>>>>>>>>>>>>                         new Fields("eventWord"))
>>>>>>>>>>>>>>>>>>         ;
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>         Config conf = new Config();
>>>>>>>>>>>>>>>>>>         conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Basically, it does simple things: it gets data from Kafka,
>>>>>>>>>>>>>>>>>> parses it into different fields, and writes to the Postgres DB. But in
>>>>>>>>>>>>>>>>>> the Storm UI I saw this error: "java.lang.OutOfMemoryError:
>>>>>>>>>>>>>>>>>> GC overhead limit exceeded". It always happens in the same worker
>>>>>>>>>>>>>>>>>> on each node: 6703. I understand this is because, by default,
>>>>>>>>>>>>>>>>>> the JVM is configured to throw this error if you are
>>>>>>>>>>>>>>>>>> spending more than *98% of the total time in GC and after
>>>>>>>>>>>>>>>>>> the GC less than 2% of the heap is recovered*.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I am not sure what the exact cause of the memory leak is. Is it OK
>>>>>>>>>>>>>>>>>> to simply increase the heap? Here is my storm.yaml:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> supervisor.slots.ports:
>>>>>>>>>>>>>>>>>>      - 6700
>>>>>>>>>>>>>>>>>>      - 6701
>>>>>>>>>>>>>>>>>>      - 6702
>>>>>>>>>>>>>>>>>>      - 6703
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>> ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>> supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>> worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Has anyone had similar issues, and what would be the best way
>>>>>>>>>>>>>>>>>> to overcome them?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> thanks in advance
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> AL
