Sorry, continuing from the last thread:

2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
Caused by: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
        at backtype.storm.messaging.netty.Client.connect(Client.java:171) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
        ... 6 common frames omitted
2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async loop died!")
java.lang.RuntimeException: ("Async loop died!")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92) [storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-beloved-judge/10.100.98.104:6703

I suspect this is caused by my EventUpdater, which writes data in batches:

static class EventUpdater implements ReducerAggregator<List<String>> {

    @Override
    public List<String> init() {
        return null;
    }

    @Override
    public List<String> reduce(List<String> curr, TridentTuple tuple) {
        List<String> updated = null;

        if (curr == null) {
            String event = (String) tuple.getValue(1);
            System.out.println("===:" + event + ":");
            updated = Lists.newArrayList(event);
        } else {
            System.out.println("===+" + tuple + ":");
            updated = curr;
        }
        // System.out.println("(())");
        return updated;
    }
}
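
To make it easier to reason about what the reduce logic above computes, here is a minimal standalone sketch that mirrors it without any Storm classes. EventUpdaterSketch, fold and the plain String event are hypothetical stand-ins (the real code reads the event from a TridentTuple), so this only illustrates the fold itself and says nothing about PostgresqlState or the Netty error.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone stand-in for the EventUpdater logic; no Storm classes are used.
// The class and method names here are hypothetical.
class EventUpdaterSketch {

    // Mirrors init(): the aggregation starts from null.
    static List<String> init() {
        return null;
    }

    // Mirrors reduce(): the first event seeds the list, later events are ignored.
    static List<String> reduce(List<String> curr, String event) {
        if (curr == null) {
            List<String> updated = new ArrayList<>();
            updated.add(event);
            return updated;
        }
        return curr;
    }

    // Applies reduce() to each event of one group in order, starting from init().
    static List<String> fold(List<String> events) {
        List<String> state = init();
        for (String event : events) {
            state = reduce(state, event);
        }
        return state;
    }

    public static void main(String[] args) {
        // prints [e1]
        System.out.println(fold(Arrays.asList("e1", "e2", "e3")));
    }
}
```

As written, the list for a group never holds more than the first event it saw, so a loop around fold could also serve as the kind of standalone program that a profiler such as jvisualvm can attach to.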

What do you think?

Thanks
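
For the heap question from earlier in this thread (worker.childopts with -Xmx768m), a small check like the one below could be run with the same JVM options to see the limit a JVM actually reports. This is only a sketch: HeapReport and its methods are hypothetical names and are not part of Storm.

```java
// Hypothetical helper for checking the heap limit a JVM reports; not part of Storm.
class HeapReport {

    // Converts a byte count to whole megabytes (rounded down).
    static long toMb(long bytes) {
        return bytes / (1024L * 1024L);
    }

    // Summarises the maximum heap and the heap currently in use.
    static String describe() {
        Runtime rt = Runtime.getRuntime();
        long max = rt.maxMemory();
        long used = rt.totalMemory() - rt.freeMemory();
        return "max=" + toMb(max) + "MB used=" + toMb(used) + "MB";
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

After compiling, running it as java -Xmx768m HeapReport should show a maximum close to 768 MB; depending on the collector, the reported value may be somewhat lower than the -Xmx setting.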

On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <[email protected]> wrote:

> Thank you very much for the reply. Here is the error I saw in the production
> server's worker-6703.log:
>
>
> On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <[email protected]> wrote:
>
>> Yeah, then in this case maybe you can install JDK / Yourkit in the remote
>> machines and run the tools over X or something.  I'm assuming this is a
>> development cluster (not live / production) and that installing debugging
>> tools and running remote UIs etc is not a problem.  :)
>>
>> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <[email protected]>
>> wrote:
>>
>>> Nathan, I think that if he wants to profile a bolt per se that runs in a
>>> worker on a different cluster node than the one the profiling tool runs on,
>>> he won't be able to attach to the process, since it resides on a different
>>> physical machine, methinks (well, now that I think about it more, it can be
>>> done... via remote debugging, but that's just a pain in the ***).
>>>
>>> Regards,
>>>
>>> A.
>>>
>>> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <[email protected]> wrote:
>>>
>>>> You don't need to change your code. As Andrew mentioned you can get a
>>>> lot of mileage by profiling your logic in a standalone program. For
>>>> jvisualvm, you can just run your program (a loop that runs for a long time
>>>> is best) then attach to the running process with jvisualvm.  It's pretty
>>>> straightforward to use and you can also find good guides with a Google
>>>> search.
>>>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <[email protected]>
>>>> wrote:
>>>>
>>>>>
>>>>> Well... detecting memory leaks in Java is a bit tricky, as Java does a
>>>>> lot for you. Generally though, as long as you avoid using the "new" operator
>>>>> and close any resources that you no longer use, you should be fine... but a
>>>>> profiler such as the ones mentioned by Nathan will tell you the whole
>>>>> truth. YourKit is awesome and has a free trial; go ahead and test drive it.
>>>>> I am pretty sure that you need a working jar (or compilable code that has a
>>>>> main function in it) in order to profile it, although if you want to
>>>>> profile your bolts and spouts it is a bit trickier. Hopefully your algorithm
>>>>> (or portions of it) can be put in a sample test program that can be
>>>>> executed locally for you to profile.
>>>>>
>>>>> Hope this helped. Regards,
>>>>>
>>>>> A.
>>>>>
>>>>>
>>>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <[email protected]> wrote:
>>>>>
>>>>>>
>>>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Unfortunately that is not fixed, it depends on the computations and
>>>>>>> data-structures you have; in my case for example I use more than 2GB since
>>>>>>> I need to keep a large matrix in memory... having said that, in most cases
>>>>>>> it should be relatively easy to estimate how much memory you are going to
>>>>>>> need and use that... or if that's not possible you can just increase it and
>>>>>>> try the "set and see" approach. Check for memory leaks as well... (unclosed
>>>>>>> resources and so on...!)
>>>>>>>
>>>>>>> Regards.
>>>>>>>
>>>>>>> A.
>>>>>>>
>>>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <[email protected]> wrote:
>>>>>>>
>>>>>>>> Thanks, Nathan. How much should it be in general?
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Your worker is allocated a maximum of 768mb of heap. It's quite
>>>>>>>>> possible that this is not enough. Try increasing Xmx in worker.childopts.
>>>>>>>>> On Mar 5, 2015 1:10 PM, "Sa Li" <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi, All
>>>>>>>>>>
>>>>>>>>>> I have been running a Trident topology on a production server; the code
>>>>>>>>>> looks like this:
>>>>>>>>>>
>>>>>>>>>> topology.newStream("spoutInit", kafkaSpout)
>>>>>>>>>>                 .each(new Fields("str"),
>>>>>>>>>>                         new JsonObjectParse(),
>>>>>>>>>>                         new Fields("eventType", "event"))
>>>>>>>>>>                 .parallelismHint(pHint)
>>>>>>>>>>                 .groupBy(new Fields("event"))
>>>>>>>>>>                 .persistentAggregate(PostgresqlState.newFactory(config),
>>>>>>>>>>                         new Fields("eventType"),
>>>>>>>>>>                         new EventUpdater(),
>>>>>>>>>>                         new Fields("eventWord"));
>>>>>>>>>>
>>>>>>>>>>         Config conf = new Config();
>>>>>>>>>>         conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
>>>>>>>>>>
>>>>>>>>>> Basically, it does simple things: it gets data from Kafka, parses it into
>>>>>>>>>> different fields, and writes it into a Postgres DB. But in the Storm UI I
>>>>>>>>>> see the error "java.lang.OutOfMemoryError: GC overhead limit exceeded".
>>>>>>>>>> It always happens in the same worker on each node: 6703. I understand
>>>>>>>>>> this is because by default the JVM is configured to throw this error if
>>>>>>>>>> you are spending more than *98% of the total time in GC and after the GC
>>>>>>>>>> less than 2% of the heap is recovered*.
>>>>>>>>>>
>>>>>>>>>> I am not sure what the exact cause of the memory leak is. Is it OK to
>>>>>>>>>> simply increase the heap? Here is my storm.yaml:
>>>>>>>>>>
>>>>>>>>>> supervisor.slots.ports:
>>>>>>>>>>      - 6700
>>>>>>>>>>      - 6701
>>>>>>>>>>      - 6702
>>>>>>>>>>      - 6703
>>>>>>>>>>
>>>>>>>>>> nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>> ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>> supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
>>>>>>>>>> worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Has anyone had similar issues, and what would be the best way to
>>>>>>>>>> overcome them?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> thanks in advance
>>>>>>>>>>
>>>>>>>>>> AL
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>
>>
>
