I still think you should try running with a larger heap. :) Max spout pending determines how many tuples can be pending (i.e., have a tuple tree that is not fully acked) per spout task. If you have many spout tasks per worker, this can add up to a large amount of memory; it also depends on how big your tuples are.
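A rough back-of-envelope for that point (a sketch only; the 3 KB record size comes from the numbers quoted later in this thread, while the task count and overhead factor are assumptions):

```java
// Rough estimate of worst-case memory held by in-flight tuples.
// max.spout.pending is per spout task, so multiply by spout tasks per worker.
public class PendingMemoryEstimate {
    static long pendingBytes(int maxSpoutPending, int spoutTasksPerWorker,
                             long tupleSizeBytes, double overheadFactor) {
        return (long) (maxSpoutPending * (long) spoutTasksPerWorker
                       * tupleSizeBytes * overheadFactor);
    }

    public static void main(String[] args) {
        // e.g. max.spout.pending=1000, 4 spout tasks on the worker (assumption),
        // ~3 KB per record (from the Kafka topic described in this thread),
        // 2x overhead for deserialized objects and queues (assumption).
        long bytes = pendingBytes(1000, 4, 3 * 1024L, 2.0);
        System.out.println(bytes / (1024 * 1024) + " MB"); // prints "23 MB"
    }
}
```

Numbers like these make it easy to see why a 768 MB worker heap gets tight once pending counts or tuple sizes grow.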
On Mon, Mar 9, 2015 at 6:14 PM, Sa Li <[email protected]> wrote:

> Hi, Nathan
>
> We have played around with max spout pending in dev: if we set it to 10, it is OK, but if we set it to more than 50, GC overhead errors start to appear. We are ultimately writing tuples into a PostgreSQL DB, and the highest write speed we reach is around 40K records/minute, which seems very slow; maybe that is why tuples accumulate in memory before being dumped into the DB. But 10 seems too small; does that mean only 10 tuples are allowed in flight?
>
> thanks
>
> AL
>
> On Fri, Mar 6, 2015 at 7:39 PM, Nathan Leung <[email protected]> wrote:
>
>> I've not modified netty so I can't comment on that. I would set max spout pending; try 1000 at first. This will limit the number of tuples that you can have in flight simultaneously, and therefore limit the amount of memory used by these tuples and their processing.
>>
>> On Fri, Mar 6, 2015 at 7:03 PM, Sa Li <[email protected]> wrote:
>>
>>> Hi, Nathan
>>>
>>> The log size of that kafka topic is 23515541, and each record is about 3K. I checked the yaml file; I don't have max spout pending set, so I assume it is the default: topology.max.spout.pending: null
>>>
>>> Should I set it to a certain value? Also, I sometimes see java.nio.channels.ClosedChannelException: null, or b.s.d.worker [ERROR] Error on initialization of server mk-worker. Does this mean I should add
>>>
>>> storm.messaging.netty.server_worker_threads: 1
>>> storm.messaging.netty.client_worker_threads: 1
>>> storm.messaging.netty.buffer_size: 5242880 #5MB buffer
>>> storm.messaging.netty.max_retries: 30
>>> storm.messaging.netty.max_wait_ms: 1000
>>> storm.messaging.netty.min_wait_ms: 100
>>>
>>> to the yaml and modify the values?
>>>
>>> thanks
>>>
>>> On Fri, Mar 6, 2015 at 2:22 PM, Nathan Leung <[email protected]> wrote:
>>>
>>>> How much data do you have in Kafka? How is your max spout pending set? If you have a high max spout pending (or if you emit unanchored tuples) you could be using up a lot of memory.
>>>>
>>>> On Mar 6, 2015 5:14 PM, "Sa Li" <[email protected]> wrote:
>>>>
>>>>> Hi, Nathan
>>>>>
>>>>> I have hit a strange issue: when I set spoutConf.forceFromStart=true, the topology quickly runs into the GC overhead limit, even after I increased the heap size; but if I remove this setting, it works fine. I was thinking maybe the kafkaSpout consumes data much faster than the data can be written into postgresDB, so the data quickly fills the memory and causes the heap to overflow. But I did the same test on my DEV cluster and it works fine, even with spoutConf.forceFromStart=true. I checked the storm config for DEV and production; they are the same.
>>>>>
>>>>> Any hints?
>>>>>
>>>>> thanks
>>>>>
>>>>> AL
>>>>>
>>>>> On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <[email protected]> wrote:
>>>>>
>>>>>> I don't see anything glaring. I would try increasing heap size. It could be that you're right on the threshold of what you've allocated and you just need more memory.
>>>>>>
>>>>>> On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <[email protected]> wrote:
>>>>>>
>>>>>>> Hi, All,
>>>>>>>
>>>>>>> I have roughly located where the problem comes from. In my run command I specify the clientid of the TridentKafkaConfig; if I keep the clientid I used before, it causes the GC error, otherwise I am completely OK. Here is the code:
>>>>>>>
>>>>>>> if (parameters.containsKey("clientid")) {
>>>>>>>     logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
>>>>>>>     spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));
>>>>>>> }
>>>>>>>
>>>>>>> Any idea about this error?
>>>>>>> Thanks
>>>>>>>
>>>>>>> AL
>>>>>>>
>>>>>>> On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <[email protected]> wrote:
>>>>>>>
>>>>>>>> Sorry, continuing the last thread:
>>>>>>>>
>>>>>>>> 2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
>>>>>>>> java.lang.RuntimeException: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
>>>>>>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>     at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>>>>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>>>>> Caused by: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
>>>>>>>>     at backtype.storm.messaging.netty.Client.connect(Client.java:171) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>     at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>     at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>     at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>     at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>     at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>     ... 6 common frames omitted
>>>>>>>> 2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async loop died!")
>>>>>>>> java.lang.RuntimeException: ("Async loop died!")
>>>>>>>>     at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
>>>>>>>>     at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
>>>>>>>>     at backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92) [storm-core-0.9.3.jar:0.9.3]
>>>>>>>>     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
>>>>>>>>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>>>>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>>>>> 2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
>>>>>>>> 2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-beloved-judge/10.100.98.104:6703
>>>>>>>>
>>>>>>>> I suspect this is caused by my EventUpdater, which writes data in batches:
>>>>>>>>
>>>>>>>> static class EventUpdater implements ReducerAggregator<List<String>> {
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public List<String> init() {
>>>>>>>>         return null;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>>>>>>>>         List<String> updated = null;
>>>>>>>>
>>>>>>>>         if (curr == null) {
>>>>>>>>             String event = (String) tuple.getValue(1);
>>>>>>>>             System.out.println("===:" + event + ":");
>>>>>>>>             updated = Lists.newArrayList(event);
>>>>>>>>         } else {
>>>>>>>>             System.out.println("===+" + tuple + ":");
>>>>>>>>             updated = curr;
>>>>>>>>         }
>>>>>>>>         // System.out.println("(())");
>>>>>>>>         return updated;
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> What do you think?
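For what it's worth, the fold logic of that aggregator can be exercised outside Storm as plain Java (a sketch only, with the TridentTuple swapped out for a plain String, so it says nothing about Trident's own batching behavior):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for EventUpdater's fold: init() -> null, then
// reduce(curr, event) keeps only the first event seen for a group.
public class EventFold {
    static List<String> reduce(List<String> curr, String event) {
        if (curr == null) {
            return new ArrayList<>(Arrays.asList(event));
        }
        // Note: the original else-branch returns curr unchanged, so later
        // events in the batch are dropped rather than accumulated.
        return curr;
    }

    public static void main(String[] args) {
        List<String> state = null; // init()
        for (String e : Arrays.asList("login", "click", "logout")) {
            state = reduce(state, e);
        }
        System.out.println(state); // prints "[login]"
    }
}
```

Since the state never grows past one element per group, the aggregator itself is an unlikely source of unbounded memory growth; the pending-tuple backlog is a more plausible culprit.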
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Thank you very much for the reply. Here is the error I saw in the production server's worker-6703.log:
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Yeah, then in this case maybe you can install the JDK / YourKit on the remote machines and run the tools over X or something. I'm assuming this is a development cluster (not live / production) and that installing debugging tools and running remote UIs etc. is not a problem. :)
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Nathan, I think that if he wants to profile a bolt per se that runs in a worker on a different cluster node than the one the profiling tool runs on, he won't be able to attach the process, since it resides on a different physical machine, methinks (well, now that I think about it, it can be done... via remote debugging, but that's just a pain in the ***).
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>>
>>>>>>>>>>> A.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> You don't need to change your code. As Andrew mentioned, you can get a lot of mileage by profiling your logic in a standalone program. For jvisualvm, you can just run your program (a loop that runs for a long time is best) and then attach to the running process with jvisualvm. It's pretty straightforward to use, and you can also find good guides with a Google search.
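A minimal standalone harness of the kind described above might look like this (a sketch under the assumption that you can lift your parse/bolt logic into `work`; the JSON-ish string is only illustrative). Run it, then attach jvisualvm to its PID:

```java
// Long-running standalone loop suitable for attaching jvisualvm or YourKit.
public class ProfileHarness {
    static int work(String record) {
        // stand-in for your real bolt logic (e.g. JSON parsing); returns a
        // value so the JIT cannot eliminate the loop body entirely
        return record.hashCode();
    }

    static long run(long iterations) {
        long acc = 0;
        for (long i = 0; i < iterations; i++) {
            acc += work("{\"eventType\":\"click\",\"event\":\"" + i + "\"}");
        }
        return acc;
    }

    public static void main(String[] args) {
        // runs effectively forever by default, leaving time to attach the profiler
        long n = args.length > 0 ? Long.parseLong(args[0]) : Long.MAX_VALUE;
        System.out.println(run(n));
    }
}
```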
>>>>>>>>>>>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Well... detecting memory leaks in Java is a bit tricky, as Java does a lot for you. Generally though, as long as you avoid overusing the "new" operator and close any resources that you are no longer using, you should be fine... but a profiler such as the ones mentioned by Nathan will tell you the whole truth. YourKit is awesome and has a free trial; go ahead and test drive it. I am pretty sure you need a working jar (or compilable code with a main function) in order to profile it, although profiling your bolts and spouts is a bit trickier. Hopefully your algorithm (or portions of it) can be put into a sample test program that can be executed locally for you to profile.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hope this helped. Regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>> A.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Unfortunately that is not fixed; it depends on the computations and data structures you have. In my case, for example, I use more than 2GB since I need to keep a large matrix in memory... Having said that, in most cases it should be relatively easy to estimate how much memory you are going to need and use that... or, if that's not possible, you can just increase it and try the "set and see" approach. Check for memory leaks as well (unclosed resources and so on!).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> A.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <[email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks, Nathan. How much should it be in general?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <[email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Your worker is allocated a maximum of 768MB of heap. It's quite possible that this is not enough. Try increasing Xmx in worker.childopts.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mar 5, 2015 1:10 PM, "Sa Li" <[email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi, All
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I have been running a trident topology on a production server; the code is like this:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> topology.newStream("spoutInit", kafkaSpout)
>>>>>>>>>>>>>>>>>>     .each(new Fields("str"), new JsonObjectParse(), new Fields("eventType", "event"))
>>>>>>>>>>>>>>>>>>     .parallelismHint(pHint)
>>>>>>>>>>>>>>>>>>     .groupBy(new Fields("event"))
>>>>>>>>>>>>>>>>>>     .persistentAggregate(PostgresqlState.newFactory(config), new Fields("eventType"), new EventUpdater(), new Fields("eventWord"));
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Config conf = new Config();
>>>>>>>>>>>>>>>>>> conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Basically, it does something simple: it gets data from kafka, parses it into different fields, and writes it into postgresDB. But in the storm UI I do see the error "java.lang.OutOfMemoryError: GC overhead limit exceeded". It always happens in the same worker on each node: 6703. I understand this happens because, by default, the JVM is configured to throw this error if you are spending more than *98% of the total time in GC and after the GC less than 2% of the heap is recovered*.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I am not sure what the exact cause of the memory leak is; is it OK to simply increase the heap? Here is my storm.yaml:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> supervisor.slots.ports:
>>>>>>>>>>>>>>>>>>     - 6700
>>>>>>>>>>>>>>>>>>     - 6701
>>>>>>>>>>>>>>>>>>     - 6702
>>>>>>>>>>>>>>>>>>     - 6703
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>> ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>> supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>> worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Has anyone had similar issues, and what would be the best way to overcome them?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> thanks in advance
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> AL
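For reference, the heap increase and spout throttle discussed in this thread would look like this in storm.yaml (the 2048m figure is only an example; size it against your actual tuple volume, then "set and see"):

```
# storm.yaml -- example only: raise worker heap and cap in-flight tuples
worker.childopts: "-Xmx2048m -Djava.net.preferIPv4Stack=true"
topology.max.spout.pending: 1000
```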
