You are right. I have already increased the heap in the yaml to 2 GB for each worker, but I still have the issue, so I suspect I may be running into some other cause (receive/send buffer sizes?). In general, before I saw the GC overhead in the Storm UI, I also came across other errors in the worker log, such as Netty connection errors and null pointer exceptions, as I showed in another post.
Thanks

On Mar 9, 2015 5:36 PM, "Nathan Leung" <[email protected]> wrote:

I still think you should try running with a larger heap. :) Max spout pending determines how many tuples can be pending (tuple tree not fully acked) per spout task. If you have many spout tasks per worker, this can be a large amount of memory. It also depends on how big your tuples are.

On Mon, Mar 9, 2015 at 6:14 PM, Sa Li <[email protected]> wrote:

Hi, Nathan

We have played around with max spout pending in dev. If we set it to 10 it is OK, but if we set it to more than 50, the GC overhead error starts to appear. We are ultimately writing tuples into a PostgreSQL DB; the highest write speed into the DB is around 40K records/minute, which is quite slow. Maybe that is why tuples accumulate in memory before being dumped into the DB. But I think 10 is too small. Does that mean only 10 tuples are allowed in flight?

thanks

AL

On Fri, Mar 6, 2015 at 7:39 PM, Nathan Leung <[email protected]> wrote:

I've not modified Netty so I can't comment on that. I would set max spout pending; try 1000 at first. This will limit the number of tuples that you can have in flight simultaneously, and therefore limit the amount of memory used by these tuples and their processing.

On Fri, Mar 6, 2015 at 7:03 PM, Sa Li <[email protected]> wrote:

Hi, Nathan

The log size of that Kafka topic is 23515541, and each record is about 3 KB. I checked the yaml file; I don't have max spout pending set, so I assume it is the default: topology.max.spout.pending: null

Should I set it to a certain value?

Also, I sometimes see java.nio.channels.ClosedChannelException: null, or b.s.d.worker [ERROR] Error on initialization of server mk-worker. Does this mean I should add

    storm.messaging.netty.server_worker_threads: 1
    storm.messaging.netty.client_worker_threads: 1
    storm.messaging.netty.buffer_size: 5242880 # 5 MB buffer
    storm.messaging.netty.max_retries: 30
    storm.messaging.netty.max_wait_ms: 1000
    storm.messaging.netty.min_wait_ms: 100

to the yaml and modify the values?

thanks

On Fri, Mar 6, 2015 at 2:22 PM, Nathan Leung <[email protected]> wrote:

How much data do you have in Kafka? How is your max spout pending set? If you have a high max spout pending (or if you emit unanchored tuples) you could be using up a lot of memory.

On Mar 6, 2015 5:14 PM, "Sa Li" <[email protected]> wrote:

Hi, Nathan

I have run into a strange issue. When I set spoutConf.forceFromStart=true, it quickly hits the GC overhead limit, even though I already increased the heap size; if I remove this setting it works fine. I was thinking maybe the KafkaSpout consumes data much faster than it can be written into the Postgres DB, so the data quickly fills the memory and overflows the heap. But I did the same test on my DEV cluster and it works fine, even with spoutConf.forceFromStart=true. I checked the Storm config for DEV and production; they are the same.

Any hints?

thanks

AL

On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <[email protected]> wrote:

I don't see anything glaring. I would try increasing the heap size. It could be that you're right at the threshold of what you've allocated and you just need more memory.
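Nathan's point about max spout pending and memory can be made concrete with a rough back-of-envelope sketch (plain Java, not the Storm API). The ~3 KB tuple size comes from the thread; the pending value of 1000 and the spout-task count of 4 are assumptions purely for illustration:

```java
// Rough worst-case estimate of heap held by in-flight tuples.
// topology.max.spout.pending is enforced per spout task, so the total
// number of pending tuples scales with the spout tasks per worker.
public class SpoutPendingEstimate {
    static long estimateInFlightBytes(int maxSpoutPending,
                                      int spoutTasksPerWorker,
                                      long avgTupleBytes) {
        return (long) maxSpoutPending * spoutTasksPerWorker * avgTupleBytes;
    }

    public static void main(String[] args) {
        // 1000 pending x 4 spout tasks x ~3 KB per record (assumed numbers).
        long bytes = estimateInFlightBytes(1000, 4, 3 * 1024);
        System.out.printf("~%.1f MB of heap just for pending tuples%n",
                bytes / (1024.0 * 1024.0));
    }
}
```

This counts only the raw tuple payloads; serialization buffers, acker state, and any per-tuple objects created in bolts add to it, which is why a small pending value can mask a heap that is simply too small.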
On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <[email protected]> wrote:

Hi, All,

I have roughly located where the problem comes from. In my run command I specify the clientid of TridentKafkaConfig; if I keep the clientid as the one I used before, it causes the GC error, otherwise I am completely OK. Here is the code:

    if (parameters.containsKey("clientid")) {
        logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
        spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));

Any idea about this error?

Thanks

AL

On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <[email protected]> wrote:

Sorry, continuing the last thread:

    2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
    java.lang.RuntimeException: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
    Caused by: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
        at backtype.storm.messaging.netty.Client.connect(Client.java:171) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
        ... 6 common frames omitted
    2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async loop died!")
    java.lang.RuntimeException: ("Async loop died!")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92) [storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
    2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
    2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-beloved-judge/10.100.98.104:6703

I suspect this is caused by my EventUpdater, which writes data in batches:

    static class EventUpdater implements ReducerAggregator<List<String>> {

        @Override
        public List<String> init() {
            return null;
        }

        @Override
        public List<String> reduce(List<String> curr, TridentTuple tuple) {
            List<String> updated = null;

            if (curr == null) {
                String event = (String) tuple.getValue(1);
                System.out.println("===:" + event + ":");
                updated = Lists.newArrayList(event);
            } else {
                System.out.println("===+" + tuple + ":");
                updated = curr;
            }
            // System.out.println("(())");
            return updated;
        }
    }

What do you think?

Thanks

On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <[email protected]> wrote:

Thank you very much for the reply. Here is the error I saw in the production server's worker-6703.log:

On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <[email protected]> wrote:

Yeah, in that case maybe you can install the JDK / YourKit on the remote machines and run the tools over X or something. I'm assuming this is a development cluster (not live/production) and that installing debugging tools and running remote UIs etc. is not a problem. :)

On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <[email protected]> wrote:

Nathan, I think that if he wants to profile a bolt that runs in a worker on a different cluster node than the one the profiling tool runs on, he won't be able to attach to the process, since it resides on a different physical machine, methinks (well, now that I think of it, it can be done via remote debugging, but that's just a pain in the ***).

Regards,

A.

On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <[email protected]> wrote:

You don't need to change your code. As Andrew mentioned, you can get a lot of mileage by profiling your logic in a standalone program. For jvisualvm, you can just run your program (a loop that runs for a long time is best), then attach to the running process with jvisualvm.
It's pretty straightforward to use, and you can also find good guides with a Google search.

On Mar 5, 2015 1:43 PM, "Andrew Xor" <[email protected]> wrote:

Well... detecting memory leaks in Java is a bit tricky, as Java does a lot for you. Generally though, as long as you avoid abusing the "new" operator and close any resources that you no longer use, you should be fine... but a profiler such as the ones mentioned by Nathan will tell you the whole truth. YourKit is awesome and has a free trial; go ahead and test drive it. I am pretty sure you need a working jar (or compilable code with a main function in it) in order to profile it, although profiling your bolts and spouts is a bit trickier. Hopefully your algorithm (or portions of it) can be put into a sample test program that can be executed locally for you to profile.

Hope this helped. Regards,

A.

On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <[email protected]> wrote:

On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <[email protected]> wrote:

Unfortunately that is not fixed; it depends on the computations and data structures you have. In my case, for example, I use more than 2 GB since I need to keep a large matrix in memory... having said that, in most cases it should be relatively easy to estimate how much memory you are going to need and use that... or, if that's not possible, you can just increase it and try the "set and see" approach. Check for memory leaks as well (unclosed resources and so on!).

Regards.

A.

On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <[email protected]> wrote:

Thanks, Nathan. How much should it be in general?

On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <[email protected]> wrote:

Your worker is allocated a maximum of 768 MB of heap. It's quite possible that this is not enough. Try increasing Xmx in worker.childopts.
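One detail worth noting about the EventUpdater posted earlier in the thread: as written, its reduce() keeps only the first event per group and returns curr unchanged for every later tuple. The sketch below models that logic in plain Java (stripped of the Trident ReducerAggregator and TridentTuple types, so it can be run in isolation), plus an appending variant, which is only an assumption about the intended behavior; note that an unbounded accumulated list in persistentAggregate state would itself be a heap-growth risk:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the EventUpdater.reduce() logic from the thread.
public class EventReduce {

    // Behavior of the reduce() as posted: only the first event is kept,
    // every subsequent event for the group is silently dropped.
    static List<String> reduceOriginal(List<String> curr, String event) {
        if (curr == null) {
            List<String> updated = new ArrayList<>();
            updated.add(event);
            return updated;
        }
        return curr; // later events are ignored here
    }

    // Accumulating variant (assumed intent, not confirmed by the thread).
    static List<String> reduceAppending(List<String> curr, String event) {
        List<String> updated = (curr == null) ? new ArrayList<>() : curr;
        updated.add(event);
        return updated;
    }
}
```

Running both over the same two events shows the difference: the original ends up with a one-element list, the appending variant with two.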
On Mar 5, 2015 1:10 PM, "Sa Li" <[email protected]> wrote:

Hi, All

I have been running a Trident topology on a production server; the code is like this:

    topology.newStream("spoutInit", kafkaSpout)
            .each(new Fields("str"),
                  new JsonObjectParse(),
                  new Fields("eventType", "event"))
            .parallelismHint(pHint)
            .groupBy(new Fields("event"))
            .persistentAggregate(PostgresqlState.newFactory(config),
                    new Fields("eventType"), new EventUpdater(), new Fields("eventWord"));

    Config conf = new Config();
    conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);

Basically, it does something simple: get data from Kafka, parse it into different fields, and write it into the Postgres DB. But in the Storm UI I see this error: "java.lang.OutOfMemoryError: GC overhead limit exceeded". It always happens on the same worker port of each node, 6703. I understand this is because, by default, the JVM is configured to throw this error if you are spending more than 98% of the total time in GC and less than 2% of the heap is recovered after the GC.

I am not sure what the exact cause of the memory leak is. Is it OK to simply increase the heap? Here is my storm.yaml:

    supervisor.slots.ports:
        - 6700
        - 6701
        - 6702
        - 6703

    nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"

    ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"

    supervisor.childopts: "-Djava.net.preferIPv4Stack=true"

    worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"

Has anyone seen similar issues, and what would be the best way to overcome them?

thanks in advance

AL
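Pulling together the two pieces of advice that recur in this thread, a bigger worker heap and a bounded max spout pending, the relevant storm.yaml entries might look like the sketch below. The 2g value matches what was tried above and 1000 is Nathan's suggested starting point; neither is a tuned number:

```yaml
# Worker heap raised from the 768m default shown above.
worker.childopts: "-Xmx2g -Djava.net.preferIPv4Stack=true"

# Cap unacked tuples per spout task so in-flight tuples cannot grow
# without bound while the Postgres writes lag behind the Kafka spout.
topology.max.spout.pending: 1000
```

Both can also be set per topology in code via Config.setMaxSpoutPending() and TOPOLOGY_WORKER_CHILDOPTS, which avoids restarting the supervisors.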
