Not sure if you fixed the issue, but I think the problem may come from max
spout pending. You are using Trident, and this value is the maximum number
of pending "batches", not the number of "tuples". So say your topic has 10
partitions, max spout pending is set to 10, and the max fetch size is set
to 1MB: you will have 10 * 10 * 1MB = 100MB of input data in your topology
at any moment, and this will blow up your heap really quickly. I think
starting with max spout pending set to 1 and then tuning it up is the
better way to go.
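That back-of-the-envelope bound can be sketched in plain Java (the helper name here is illustrative, not a Storm API):

```java
// Rough worst-case estimate of in-flight Trident batch data:
// (partitions) x (max spout pending batches) x (max fetch bytes per batch).
public class InFlightEstimate {

    static long estimateInFlightBytes(int partitions, int maxSpoutPending,
                                      long fetchSizeBytes) {
        return (long) partitions * maxSpoutPending * fetchSizeBytes;
    }

    public static void main(String[] args) {
        // The scenario from the message above: 10 partitions, pending = 10, 1MB fetch.
        long bytes = estimateInFlightBytes(10, 10, 1024L * 1024);
        System.out.println(bytes / (1024 * 1024) + " MB"); // prints "100 MB"
    }
}
```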
Hope this helps.

-Binh

On Tue, Mar 10, 2015 at 3:56 AM, Brunner, Bill <[email protected]> wrote:

> Once you’ve profiled your app, you should also play around with different
> garbage collectors. Considering you’re reaching max heap, I assume your
> tuples are probably pretty large. If that’s the case and you’re using the
> CMS garbage collector, you’re going to blow out your heap regularly. I
> found that with large tuples and/or memory-intensive computations the old
> parallel GC works best, because it compacts old gen every time it
> collects; CMS doesn’t, and on each sweep it tries to jam more into the
> heap until it can’t any longer and then blows up. There is also a great
> article by Michael Noll about Storm’s message buffers and how to tweak
> them depending on your needs:
> http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
>
> From: Sa Li [mailto:[email protected]]
> Sent: Monday, March 09, 2015 10:15 PM
> To: [email protected]
> Subject: Re: java.lang.OutOfMemoryError: GC overhead limit exceeded
>
> I have not done that yet, not quite familiar with this, but I will try to
> do that tomorrow, thanks.
>
> On Mar 9, 2015 7:10 PM, "Nathan Leung" <[email protected]> wrote:
>
> Have you profiled your spout / bolt logic as recommended earlier in this
> thread?
>
> On Mon, Mar 9, 2015 at 9:49 PM, Sa Li <[email protected]> wrote:
>
> You are right, I have already increased the heap in the yaml to 2 GB for
> each worker, but I still have the issue, so I suspect I may be running
> into some other cause: receive/send buffer sizes? In general, before I
> see the GC overhead error in the Storm UI, I come across other errors in
> the worker log as well, like Netty connection failures, null pointers,
> etc., as I showed in another post.
>
> Thanks
>
> On Mar 9, 2015 5:36 PM, "Nathan Leung" <[email protected]> wrote:
>
> I still think you should try running with a larger heap.
> :) Max spout pending determines how many tuples can be pending (tuple
> tree not yet fully acked) per spout task. If you have many spout tasks
> per worker, this can be a large amount of memory. It also depends on how
> big your tuples are.
>
> On Mon, Mar 9, 2015 at 6:14 PM, Sa Li <[email protected]> wrote:
>
> Hi, Nathan
>
> We have played around with max spout pending in dev: if we set it to 10
> it is OK, but if we set it above 50, the GC overhead errors start to
> appear. We are ultimately writing tuples into PostgreSQL, and the highest
> write speed into the DB is around 40K records/minute, which I suppose is
> very slow; maybe that is why tuples accumulate in memory before being
> dumped into the DB. But I think 10 is too small: does that mean only 10
> tuples are allowed in flight?
>
> thanks
>
> AL
>
> On Fri, Mar 6, 2015 at 7:39 PM, Nathan Leung <[email protected]> wrote:
>
> I've not modified Netty so I can't comment on that. I would set max spout
> pending; try 1000 at first. This will limit the number of tuples that you
> can have in flight simultaneously, and therefore limit the amount of
> memory used by these tuples and their processing.
>
> On Fri, Mar 6, 2015 at 7:03 PM, Sa Li <[email protected]> wrote:
>
> Hi, Nathan
>
> The log size of that Kafka topic is 23515541, and each record is about
> 3K. I checked the yaml file and I don't have max spout pending set, so I
> assume it is the default: topology.max.spout.pending: null
>
> Should I set it to a certain value?
> Also, I sometimes see java.nio.channels.ClosedChannelException: null, or
> b.s.d.worker [ERROR] Error on initialization of server mk-worker.
>
> Does this mean I should add
>
> storm.messaging.netty.server_worker_threads: 1
> storm.messaging.netty.client_worker_threads: 1
> storm.messaging.netty.buffer_size: 5242880 #5MB buffer
> storm.messaging.netty.max_retries: 30
> storm.messaging.netty.max_wait_ms: 1000
> storm.messaging.netty.min_wait_ms: 100
>
> to the yaml and modify the values?
>
> thanks
>
> On Fri, Mar 6, 2015 at 2:22 PM, Nathan Leung <[email protected]> wrote:
>
> How much data do you have in Kafka? How is your max spout pending set? If
> you have a high max spout pending (or if you emit unanchored tuples) you
> could be using up a lot of memory.
>
> On Mar 6, 2015 5:14 PM, "Sa Li" <[email protected]> wrote:
>
> Hi, Nathan
>
> I have run into a strange issue: when I set spoutConf.forceFromStart=true
> it quickly hits the GC overhead limit, even though I already increased
> the heap size, but if I remove this setting it works fine. I was thinking
> maybe the KafkaSpout consumes data much faster than the data can be
> written into PostgreSQL, so data quickly fills up the memory and
> overflows the heap. But I ran the same test on my DEV cluster and it
> works fine there, even with spoutConf.forceFromStart=true. I checked the
> storm config for DEV and production, and they are the same.
>
> Any hints?
>
> thanks
>
> AL
>
> On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <[email protected]> wrote:
>
> I don't see anything glaring. I would try increasing the heap size. It
> could be that you're right on the threshold of what you've allocated and
> you just need more memory.
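As a concrete sketch of the setting discussed above (the value is illustrative; Binh's note at the top of the thread suggests starting at 1 for Trident and tuning upward from there):

```yaml
# storm.yaml -- for a Trident topology this caps in-flight *batches*
# per spout task, not individual tuples
topology.max.spout.pending: 1
```

The same setting can also be applied per topology via Config in code rather than cluster-wide in storm.yaml.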
> On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <[email protected]> wrote:
>
> Hi, All,
>
> I have roughly located where the problem comes from. In my run command I
> specify the clientid of the TridentKafkaConfig; if I keep the clientid as
> the one I used before, it causes the GC error, otherwise I am completely
> OK. Here is the code:
>
> if (parameters.containsKey("clientid")) {
>     logger.info("topic=>" + parameters.get("clientid") + "/" +
>             parameters.get("topic"));
>     spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"),
>             parameters.get("clientid"));
>
> Any idea about this error?
>
> Thanks
>
> AL
>
> On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <[email protected]> wrote:
>
> Sorry, continuing the last thread:
>
> 2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.RuntimeException: Remote address is
> not reachable. We will close this client
> Netty-Client-complicated-laugh/10.100.98.103:6703
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> Caused by: java.lang.RuntimeException: Remote address is not reachable.
> We will close this client
> Netty-Client-complicated-laugh/10.100.98.103:6703
>     at backtype.storm.messaging.netty.Client.connect(Client.java:171) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
>     ... 6 common frames omitted
> 2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async loop died!")
> java.lang.RuntimeException: ("Async loop died!")
>     at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
>     at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
>     at backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92) [storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> 2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
> 2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-beloved-judge/10.100.98.104:6703
>
> I suspect this is caused by my EventUpdater, which writes data in batch:
>
> static class EventUpdater implements ReducerAggregator<List<String>> {
>
>     @Override
>     public List<String> init() {
>         return null;
>     }
>
>     @Override
>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>         List<String> updated = null;
>
>         if (curr == null) {
>             String event = (String) tuple.getValue(1);
>             System.out.println("===:" + event + ":");
>             updated = Lists.newArrayList(event);
>         } else {
>             System.out.println("===+" + tuple + ":");
>             updated = curr;
>         }
>
>         // System.out.println("(())");
>         return updated;
>     }
> }
>
> What do you think?
>
> Thanks
>
> On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <[email protected]> wrote:
>
> Thank you very much for the reply; here is the error I saw in the
> production server's worker-6703.log:
>
> On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <[email protected]> wrote:
>
> Yeah, in this case maybe you can install the JDK / YourKit on the remote
> machines and run the tools over X or something. I'm assuming this is a
> development cluster (not live / production) and that installing debugging
> tools and running remote UIs etc. is not a problem. :)
>
> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <[email protected]>
> wrote:
>
> Nathan, I think that if he wants to profile a bolt that runs in a worker
> residing on a different cluster node than the one the profiling tool runs
> on, he won't be able to attach the process, since it resides on a
> different physical machine, methinks (well, now that I think of it, it
> can be done... via remote debugging, but that's just a pain in the ***).
>
> Regards,
>
> A.
>
> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <[email protected]> wrote:
>
> You don't need to change your code. As Andrew mentioned, you can get a
> lot of mileage by profiling your logic in a standalone program. For
> jvisualvm, you can just run your program (a loop that runs for a long
> time is best) and then attach to the running process with jvisualvm.
> It's pretty straightforward to use, and you can also find good guides
> with a Google search.
>
> On Mar 5, 2015 1:43 PM, "Andrew Xor" <[email protected]> wrote:
>
> Well... detecting memory leaks in Java is a bit tricky, as Java does a
> lot for you. Generally though, as long as you avoid overusing the "new"
> operator and close any resources that you no longer use, you should be
> fine... but a profiler such as the ones mentioned by Nathan will tell you
> the whole truth. YourKit is awesome and has a free trial; go ahead and
> test drive it. I am pretty sure you need a working jar (or compilable
> code that has a main function in it) in order to profile it, although
> profiling your bolts and spouts is a bit trickier. Hopefully your
> algorithm (or portions of it) can be put in a sample test program that
> can be executed locally for you to profile.
>
> Hope this helped. Regards,
>
> A.
>
> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <[email protected]> wrote:
>
> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <[email protected]>
> wrote:
>
> Unfortunately that is not fixed; it depends on the computations and data
> structures you have. In my case, for example, I use more than 2GB since I
> need to keep a large matrix in memory... having said that, in most cases
> it should be relatively easy to estimate how much memory you are going to
> need and use that... or, if that's not possible, you can just increase it
> and use the "set and see" approach. Check for memory leaks as well
> (unclosed resources and so on!).
>
> Regards.
>
> A.
>
> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <[email protected]> wrote:
>
> Thanks, Nathan. How much should it be in general?
>
> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <[email protected]> wrote:
>
> Your worker is allocated a maximum of 768MB of heap. It's quite possible
> that this is not enough. Try increasing Xmx in worker.childopts.
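The standalone-profiling idea Nathan describes above could look like the sketch below: run the hot path in a loop outside Storm, then attach jvisualvm or YourKit to the running JVM. The parse step here is a hypothetical stand-in for the topology's JsonObjectParse logic, not the real class:

```java
// Standalone driver for profiling bolt/parse logic outside of Storm.
// Run it, then attach a profiler (e.g. jvisualvm) to the JVM process.
public class ProfileDriver {

    // Hypothetical stand-in workload for the real parsing logic:
    // counts key/value separators in a JSON string.
    static int parseAndCount(String json) {
        int count = 0;
        for (int i = 0; i < json.length(); i++) {
            if (json.charAt(i) == ':') count++;
        }
        return count;
    }

    public static void main(String[] args) {
        String sample = "{\"eventType\":\"click\",\"event\":\"page1\"}";
        // Default is short; pass a larger count as args[0] so the process
        // stays busy long enough to attach a profiler.
        long iterations = args.length > 0 ? Long.parseLong(args[0]) : 10_000_000L;
        long total = 0;
        for (long i = 0; i < iterations; i++) {
            total += parseAndCount(sample);
        }
        System.out.println("total=" + total);
    }
}
```

Keeping the workload in a plain `main` means no cluster access is needed, which sidesteps the remote-attach problem Andrew raises above.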
> On Mar 5, 2015 1:10 PM, "Sa Li" <[email protected]> wrote:
>
> Hi, All
>
> I have been running a Trident topology on a production server; the code
> is like this:
>
> topology.newStream("spoutInit", kafkaSpout)
>          .each(new Fields("str"),
>                new JsonObjectParse(),
>                new Fields("eventType", "event"))
>          .parallelismHint(pHint)
>          .groupBy(new Fields("event"))
>          .persistentAggregate(PostgresqlState.newFactory(config),
>                new Fields("eventType"), new EventUpdater(),
>                new Fields("eventWord"));
>
> Config conf = new Config();
> conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
>
> Basically, it does something simple: it gets data from Kafka, parses it
> into different fields, and writes it into PostgreSQL. But in the Storm
> UI I see the error "java.lang.OutOfMemoryError: GC overhead limit
> exceeded". It always happens on the same worker of each node, 6703. I
> understand this is thrown because, by default, the JVM raises this error
> if you are spending more than 98% of the total time in GC and after the
> GC less than 2% of the heap is recovered.
>
> I am not sure what the exact cause of the memory leak is; is it OK to
> simply increase the heap? Here is my storm.yaml:
>
> supervisor.slots.ports:
>     - 6700
>     - 6701
>     - 6702
>     - 6703
>
> nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
> ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
> supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
> worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>
> Anyone seen similar issues, and what would be the best way to overcome
> this?
>
> thanks in advance
>
> AL
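Pulling together Nathan's heap suggestion and Bill's GC note, the worker settings from the original storm.yaml might be adjusted along these lines (the 2GB figure and the choice of the parallel collector are illustrative assumptions, to be validated by profiling rather than taken as given):

```yaml
# storm.yaml -- larger worker heap plus the throughput-oriented
# parallel collector, which compacts old gen on each collection
worker.childopts: "-Xmx2048m -XX:+UseParallelGC -Djava.net.preferIPv4Stack=true"
```

Workers pick this up on restart, so the topology has to be redeployed (or the supervisors bounced) for the change to take effect.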
