Hi, Nathan

The log size of that Kafka topic is 23515541, and each record is about 3K. I
checked the yaml file; I don't have max spout pending set, so I assume it
is the default: topology.max.spout.pending: null

Should I set it to a certain value? Also, I sometimes see
java.nio.channels.ClosedChannelException: null, or b.s.d.worker [ERROR]
Error on initialization of server mk-worker.
Does this mean I should add

storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880 #5MB buffer
storm.messaging.netty.max_retries: 30
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100

to the yaml and modify the values?
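For reference, setting max spout pending explicitly would look like this in storm.yaml (the number is only an illustrative guess, not a tuned value):

```yaml
# Caps the number of unacked tuples (batches, for Trident) in flight
# per spout task; the default (null) means no cap, which with ~23.5M
# records of ~3K each can let the spout outrun the bolts and fill
# worker heaps. 2000 is an illustrative guess, not a recommendation.
topology.max.spout.pending: 2000
```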



thanks



On Fri, Mar 6, 2015 at 2:22 PM, Nathan Leung <[email protected]> wrote:

> How much data do you have in Kafka? How is your max spout pending set? If
> you have a high max spout pending (or if you emit unanchored tuples) you
> could be using up a lot of memory.
> On Mar 6, 2015 5:14 PM, "Sa Li" <[email protected]> wrote:
>
>> Hi, Nathan
>>
>> I have run into a strange issue: when I set spoutConf.forceFromStart=true,
>> it quickly hits the GC overhead limit even though I already increased the
>> heap size, but if I remove this setting it works fine. I was thinking
>> maybe the kafkaSpout consumes data much faster than it can be written
>> into postgresDB, so the data quickly fills the memory and causes the heap
>> overflow. But I ran the same test on my DEV cluster and it works fine,
>> even with spoutConf.forceFromStart=true. I checked the storm config for
>> DEV and production; they are the same.
>>
>> Any hints?
>>
>> thanks
>>
>> AL
>>
>>
>> On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <[email protected]> wrote:
>>
>>> I don't see anything glaring.  I would try increasing heap size.  It
>>> could be that you're right on the threshold of what you've allocated and
>>> you just need more memory.
>>>
>>> On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <[email protected]> wrote:
>>>
>>>> Hi, All,
>>>> I have roughly located where the problem comes from. In my run command
>>>> I specify the clientid of the TridentKafkaConfig; if I keep the same
>>>> clientid I used before, it causes the GC error, otherwise I am
>>>> completely OK. Here is the code:
>>>>
>>>> if (parameters.containsKey("clientid")) {
>>>>     logger.info("topic=>" + parameters.get("clientid") + "/"
>>>>             + parameters.get("topic"));
>>>>     spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"),
>>>>             parameters.get("clientid"));
>>>> }
>>>>
>>>> Any idea about this error?
>>>>
>>>>
>>>> Thanks
>>>>
>>>>
>>>> AL
>>>>
>>>>
>>>> On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <[email protected]> wrote:
>>>>
>>>>> Sorry, continuing the last thread:
>>>>>
>>>>> 2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
>>>>> java.lang.RuntimeException: java.lang.RuntimeException: Remote address
>>>>> is not reachable. We will close this client 
>>>>> Netty-Client-complicated-laugh/
>>>>> 10.100.98.103:6703
>>>>>         at
>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>> Caused by: java.lang.RuntimeException: Remote address is not
>>>>> reachable. We will close this client Netty-Client-complicated-laugh/
>>>>> 10.100.98.103:6703
>>>>>         at
>>>>> backtype.storm.messaging.netty.Client.connect(Client.java:171)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at backtype.storm.messaging.netty.Client.send(Client.java:194)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         ... 6 common frames omitted
>>>>> 2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async
>>>>> loop died!")
>>>>> java.lang.RuntimeException: ("Async loop died!")
>>>>>         at
>>>>> backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325)
>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>>         at clojure.lang.RestFn.invoke(RestFn.java:423)
>>>>> [clojure-1.5.1.jar:na]
>>>>>         at
>>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92)
>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473)
>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>> 2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker
>>>>> eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
>>>>> 2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty
>>>>> Client Netty-Client-beloved-judge/10.100.98.104:6703
>>>>>
>>>>> I suspect this is caused by my eventUpdater, which writes data in batches:
>>>>> static class EventUpdater implements ReducerAggregator<List<String>> {
>>>>>
>>>>>     @Override
>>>>>     public List<String> init() {
>>>>>         return null;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>>>>>         List<String> updated = null;
>>>>>
>>>>>         if (curr == null) {
>>>>>             String event = (String) tuple.getValue(1);
>>>>>             System.out.println("===:" + event + ":");
>>>>>             updated = Lists.newArrayList(event);
>>>>>         } else {
>>>>>             System.out.println("===+" + tuple + ":");
>>>>>             updated = curr;
>>>>>         }
>>>>>         return updated;
>>>>>     }
>>>>> }
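>>>>> If it helps to see the reduce logic in isolation, here is a minimal
>>>>> standalone sketch of the same branching (TridentTuple swapped for a
>>>>> plain String argument, purely for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class EventUpdaterSketch {

    // Mirrors the aggregator's reduce(): the first event seeds the
    // list; every later call returns the list unchanged.
    static List<String> reduce(List<String> curr, String event) {
        if (curr == null) {
            List<String> updated = new ArrayList<>();
            updated.add(event);
            return updated;
        }
        return curr;
    }

    public static void main(String[] args) {
        List<String> state = null;       // init() returns null
        state = reduce(state, "login");  // first event seeds the list
        state = reduce(state, "click");  // leaves it unchanged
        System.out.println(state);       // prints [login]
    }
}
```

>>>>> Note that as written, the list never grows past one element, so the
>>>>> aggregator itself seems to hold very little state.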
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <[email protected]> wrote:
>>>>>
>>>>>> Thank you very much for the reply; here is the error I saw in the
>>>>>> production server's worker-6703.log:
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Yeah, then in this case maybe you can install JDK / Yourkit in the
>>>>>>> remote machines and run the tools over X or something.  I'm assuming 
>>>>>>> this
>>>>>>> is a development cluster (not live / production) and that installing
>>>>>>> debugging tools and running remote UIs etc is not a problem.  :)
>>>>>>>
>>>>>>> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Nathan, I think that if he wants to profile a bolt that runs in a
>>>>>>>> worker residing on a different cluster node than the one the
>>>>>>>> profiling tool runs on, he won't be able to attach to the process,
>>>>>>>> since it lives on a different physical machine, me thinks (well, now
>>>>>>>> that I think of it, it can be done via remote debugging, but that's
>>>>>>>> just a pain in the ***).
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> A.
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> You don't need to change your code. As Andrew mentioned you can
>>>>>>>>> get a lot of mileage by profiling your logic in a standalone program. 
>>>>>>>>> For
>>>>>>>>> jvisualvm, you can just run your program (a loop that runs for a long 
>>>>>>>>> time
>>>>>>>>> is best) then attach to the running process with jvisualvm.  It's 
>>>>>>>>> pretty
>>>>>>>>> straightforward to use and you can also find good guides with a Google
>>>>>>>>> search.
>>>>>>>>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> ​
>>>>>>>>>> Well...  detecting memory leaks in Java is a bit tricky as Java
>>>>>>>>>> does a lot for you. Generally though, as long as you avoid using 
>>>>>>>>>> "new"
>>>>>>>>>> operator and close any resources that you do not use you should be 
>>>>>>>>>> fine...
>>>>>>>>>> but a Profiler such as the ones mentioned by Nathan will tell you 
>>>>>>>>>> the whole
>>>>>>>>>> truth. YourKit is awesome and has a free trial, go ahead and test 
>>>>>>>>>> drive it.
>>>>>>>>>> I am pretty sure that you need a working jar (or compilable code
>>>>>>>>>> that has a main function in it) in order to profile it, although
>>>>>>>>>> profiling your bolts and spouts is a bit trickier. Hopefully your
>>>>>>>>>> algorithm (or portions of it) can be put into a sample test
>>>>>>>>>> program that can be executed locally for you to profile.
>>>>>>>>>>
>>>>>>>>>> Hope this helped. Regards,
>>>>>>>>>>
>>>>>>>>>> A.
>>>>>>>>>> ​
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Unfortunately that is not fixed, it depends on the computations
>>>>>>>>>>>> and data-structures you have; in my case for example I use more 
>>>>>>>>>>>> than 2GB
>>>>>>>>>>>> since I need to keep a large matrix in memory... having said that, 
>>>>>>>>>>>> in most
>>>>>>>>>>>> cases it should be relatively easy to estimate how much memory you 
>>>>>>>>>>>> are
>>>>>>>>>>>> going to need and use that... or if that's not possible you can 
>>>>>>>>>>>> just
>>>>>>>>>>>> increase it and try the "set and see" approach. Check for memory 
>>>>>>>>>>>> leaks as
>>>>>>>>>>>> well... (unclosed resources and so on...!)
>>>>>>>>>>>>
>>>>>>>>>>>> Regards.
>>>>>>>>>>>>
>>>>>>>>>>>> ​A.​
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks, Nathan. How much should it be in general?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Your worker is allocated a maximum of 768mb of heap. It's
>>>>>>>>>>>>>> quite possible that this is not enough. Try increasing Xmx in
>>>>>>>>>>>>>> worker.childopts.
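>>>>>>>>>>>>>> For illustration, such a change might look like this (the
>>>>>>>>>>>>>> 2048m figure is only a starting point, not a recommendation):

```yaml
# Raise the worker heap from 768m; the right value depends on the
# topology's state size and tuple volume. 2048m is an illustrative guess.
worker.childopts: "-Xmx2048m -Djava.net.preferIPv4Stack=true"
```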
>>>>>>>>>>>>>> On Mar 5, 2015 1:10 PM, "Sa Li" <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi, All
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have been running a trident topology on a production server;
>>>>>>>>>>>>>>> the code is like this:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> topology.newStream("spoutInit", kafkaSpout)
>>>>>>>>>>>>>>>         .each(new Fields("str"),
>>>>>>>>>>>>>>>                 new JsonObjectParse(),
>>>>>>>>>>>>>>>                 new Fields("eventType", "event"))
>>>>>>>>>>>>>>>         .parallelismHint(pHint)
>>>>>>>>>>>>>>>         .groupBy(new Fields("event"))
>>>>>>>>>>>>>>>         .persistentAggregate(PostgresqlState.newFactory(config),
>>>>>>>>>>>>>>>                 new Fields("eventType"), new EventUpdater(),
>>>>>>>>>>>>>>>                 new Fields("eventWord"));
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Config conf = new Config();
>>>>>>>>>>>>>>> conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Basically, it does simple things: get data from Kafka, parse it
>>>>>>>>>>>>>>> into different fields, and write it into postgresDB. But in the
>>>>>>>>>>>>>>> Storm UI I did see this error: "java.lang.OutOfMemoryError: GC
>>>>>>>>>>>>>>> overhead limit exceeded". It always happens in the same worker on
>>>>>>>>>>>>>>> each node - 6703. I understand this happens because by default the
>>>>>>>>>>>>>>> JVM is configured to throw this error if you are spending more than
>>>>>>>>>>>>>>> *98% of the total time in GC and after the GC less than 2% of the
>>>>>>>>>>>>>>> heap is recovered*.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am not sure what the exact cause of the memory leak is; is it OK
>>>>>>>>>>>>>>> to simply increase the heap? Here is my storm.yaml:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> supervisor.slots.ports:
>>>>>>>>>>>>>>>      - 6700
>>>>>>>>>>>>>>>      - 6701
>>>>>>>>>>>>>>>      - 6702
>>>>>>>>>>>>>>>      - 6703
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Has anyone had similar issues, and what would be the best way to
>>>>>>>>>>>>>>> overcome them?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> thanks in advance
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> AL
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
