Hi all,

I think I have located where the problem comes from. In my run command I
specify the clientid for TridentKafkaConfig. If I keep the same clientid I
used before, I get the GC error; with a different clientid everything works
fine. Here is the code:

if (parameters.containsKey("clientid")) {
    logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
    spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));
}

Any idea about this error?


Thanks


AL


On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <[email protected]> wrote:

> Sorry, continuing from my last message:
>
> 2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.RuntimeException: Remote address is
> not reachable. We will close this client Netty-Client-complicated-laugh/
> 10.100.98.103:6703
>         at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> Caused by: java.lang.RuntimeException: Remote address is not reachable. We
> will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
>         at backtype.storm.messaging.netty.Client.connect(Client.java:171)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.messaging.netty.Client.send(Client.java:194)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
> ~[storm-core-0.9.3.jar:0.9.3]
>         at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
> ~[storm-core-0.9.3.jar:0.9.3]
>         ... 6 common frames omitted
> 2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async
> loop died!")
> java.lang.RuntimeException: ("Async loop died!")
>         at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325)
> [storm-core-0.9.3.jar:0.9.3]
>         at clojure.lang.RestFn.invoke(RestFn.java:423)
> [clojure-1.5.1.jar:na]
>         at
> backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92)
> [storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473)
> [storm-core-0.9.3.jar:0.9.3]
>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> 2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker
> eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
> 2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client
> Netty-Client-beloved-judge/10.100.98.104:6703
>
> I suspect this is caused by my EventUpdater, which writes data in batches:
>
> static class EventUpdater implements ReducerAggregator<List<String>> {
>
>     @Override
>     public List<String> init() {
>         return null;
>     }
>
>     @Override
>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>         List<String> updated = null;
>
>         if (curr == null) {
>             String event = (String) tuple.getValue(1);
>             System.out.println("===:" + event + ":");
>             updated = Lists.newArrayList(event);
>         } else {
>             System.out.println("===+" + tuple + ":");
>             updated = curr;
>         }
>         // System.out.println("(())");
>         return updated;
>     }
> }
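
For reference, the reduce logic above can be exercised outside Storm. This is only a minimal sketch, assuming the tuple's second field is the event string; `EventUpdaterSketch` and `reduceAll` are hypothetical names that are not part of Storm or of the topology in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone model of EventUpdater.reduce; no Storm classes involved.
class EventUpdaterSketch {

    /** Same branching as EventUpdater.reduce, with the tuple reduced to its event string. */
    static List<String> reduce(List<String> curr, String event) {
        if (curr == null) {
            List<String> updated = new ArrayList<String>();
            updated.add(event);
            return updated;
        }
        return curr; // later events for the same group are ignored
    }

    /** Folds a sequence of events, starting from the null value that init() returns. */
    static List<String> reduceAll(String... events) {
        List<String> state = null;
        for (String e : events) {
            state = reduce(state, e);
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(reduceAll("a", "b", "c")); // prints [a]
    }
}
```

In this model the per-group list never grows beyond one element, so the aggregator's own value seems unlikely to be what fills the heap. It says nothing about the state backend, the batch size, or the spout, which would still need profiling.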
>
> What do you think?
>
> Thanks
>
> On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <[email protected]> wrote:
>
>> Thank you very much for the reply. Here is the error I saw in the
>> production server's worker-6703.log:
>>
>>
>> On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <[email protected]> wrote:
>>
>>> Yeah, then in this case maybe you can install JDK / Yourkit in the
>>> remote machines and run the tools over X or something.  I'm assuming this
>>> is a development cluster (not live / production) and that installing
>>> debugging tools and running remote UIs etc is not a problem.  :)
>>>
>>> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <[email protected]>
>>> wrote:
>>>
>>>> Nathan, I think that if he wants to profile a bolt running in a worker on
>>>> a different cluster node from the one where the profiling tool runs, he
>>>> won't be able to attach to the process, since it resides on a different
>>>> physical machine (well, now that I think about it, it can be done via
>>>> remote debugging, but that's just a pain in the ***).
>>>>
>>>> Regards,
>>>>
>>>> A.
>>>>
>>>> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <[email protected]> wrote:
>>>>
>>>>> You don't need to change your code. As Andrew mentioned you can get a
>>>>> lot of mileage by profiling your logic in a standalone program. For
>>>>> jvisualvm, you can just run your program (a loop that runs for a long time
>>>>> is best) then attach to the running process with jvisualvm.  It's pretty
>>>>> straightforward to use and you can also find good guides with a Google
>>>>> search.
>>>>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <[email protected]>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> Well... detecting memory leaks in Java is a bit tricky, as Java does
>>>>>> a lot for you. Generally though, as long as you avoid using the "new"
>>>>>> operator and close any resources that you no longer use, you should be
>>>>>> fine... but a profiler such as the ones mentioned by Nathan will tell you
>>>>>> the whole truth. YourKit is awesome and has a free trial; go ahead and
>>>>>> test drive it. I am pretty sure that you need a working jar (or
>>>>>> compilable code that has a main function in it) in order to profile it,
>>>>>> although profiling your bolts and spouts is a bit trickier. Hopefully
>>>>>> your algorithm (or portions of it) can be put in a sample test program
>>>>>> that you can execute locally and profile.
>>>>>>
>>>>>> Hope this helped. Regards,
>>>>>>
>>>>>> A.
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <[email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Unfortunately that is not fixed; it depends on the computations and
>>>>>>>> data structures you have. In my case, for example, I use more than 2GB
>>>>>>>> since I need to keep a large matrix in memory. Having said that, in most
>>>>>>>> cases it should be relatively easy to estimate how much memory you are
>>>>>>>> going to need and use that, or if that's not possible you can just
>>>>>>>> increase it and try the "set and see" approach. Check for memory leaks
>>>>>>>> as well (unclosed resources and so on)!
>>>>>>>>
>>>>>>>> Regards.
>>>>>>>>
>>>>>>>> A.
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Thanks, Nathan. How much should it be in general?
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Your worker is allocated a maximum of 768 MB of heap. It's quite
>>>>>>>>>> possible that this is not enough. Try increasing Xmx in
>>>>>>>>>> worker.childopts.
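
After changing Xmx, it may be worth confirming that the JVM actually picked up the new limit. A minimal sketch using only the standard `Runtime` API; `HeapLimitCheck` is a hypothetical helper name, not something from Storm, and it would have to run inside the worker JVM (for example, logged from a bolt's prepare step) to reflect worker.childopts.

```java
// Hypothetical standalone check; not part of the topology in this thread.
class HeapLimitCheck {

    /** Converts a byte count to whole mebibytes. */
    static long toMiB(long bytes) {
        return bytes / (1024L * 1024L);
    }

    /** Maximum heap the running JVM will attempt to use, in MiB. */
    static long maxHeapMiB() {
        return toMiB(Runtime.getRuntime().maxMemory());
    }

    public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        long usedBytes = rt.totalMemory() - rt.freeMemory();
        System.out.println("max heap (MiB):  " + maxHeapMiB());
        System.out.println("used heap (MiB): " + toMiB(usedBytes));
    }
}
```

Depending on the collector, the reported maximum can differ slightly from the -Xmx value, so treat it as an approximate confirmation rather than an exact readback.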
>>>>>>>>>> On Mar 5, 2015 1:10 PM, "Sa Li" <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi, All
>>>>>>>>>>>
>>>>>>>>>>> I have been running a Trident topology on a production server; the
>>>>>>>>>>> code looks like this:
>>>>>>>>>>>
>>>>>>>>>>> topology.newStream("spoutInit", kafkaSpout)
>>>>>>>>>>>                 .each(new Fields("str"),
>>>>>>>>>>>                         new JsonObjectParse(),
>>>>>>>>>>>                         new Fields("eventType", "event"))
>>>>>>>>>>>                 .parallelismHint(pHint)
>>>>>>>>>>>                 .groupBy(new Fields("event"))
>>>>>>>>>>>                 .persistentAggregate(PostgresqlState.newFactory(config),
>>>>>>>>>>>                         new Fields("eventType"), new EventUpdater(),
>>>>>>>>>>>                         new Fields("eventWord"));
>>>>>>>>>>>
>>>>>>>>>>>         Config conf = new Config();
>>>>>>>>>>>         conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
>>>>>>>>>>>
>>>>>>>>>>> Basically, it does simple things: it gets data from Kafka, parses it
>>>>>>>>>>> into different fields, and writes it into PostgreSQL. But in the Storm
>>>>>>>>>>> UI I see the error "java.lang.OutOfMemoryError: GC overhead limit
>>>>>>>>>>> exceeded". It always happens in the same worker on each node, 6703. I
>>>>>>>>>>> understand this is because by default the JVM is configured to
>>>>>>>>>>> throw this error if you are spending more than *98% of the total
>>>>>>>>>>> time in GC and after the GC less than 2% of the heap is recovered*.
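
One rough way to see how much time a JVM spends in GC is to read the standard garbage collector MXBeans. The sketch below is only an approximation under stated assumptions: it reports the cumulative share of uptime since JVM start, which is not the same measurement the JVM uses for the GC overhead limit, and `GcOverheadProbe` is a hypothetical name.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical probe; cumulative GC share since JVM start, not the JVM's own overhead check.
class GcOverheadProbe {

    /** Total milliseconds spent in GC since JVM start, summed over all collectors. */
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime(); // -1 when the collector does not report a time
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    /** Fraction of elapsed time spent in GC; 0.0 when no time has elapsed. */
    static double gcFraction(long gcMillis, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            return 0.0;
        }
        return (double) gcMillis / elapsedMillis;
    }

    public static void main(String[] args) {
        long uptime = ManagementFactory.getRuntimeMXBean().getUptime();
        long gc = totalGcMillis();
        System.out.printf("GC time: %d ms of %d ms uptime (%.1f%%)%n",
                gc, uptime, 100.0 * gcFraction(gc, uptime));
    }
}
```

Logging this periodically from inside the worker would show whether the GC share climbs steadily before the error appears, which would be consistent with the heap filling up rather than a one-off spike.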
>>>>>>>>>>>
>>>>>>>>>>> I am not sure what the exact cause of the memory leak is. Is it OK to
>>>>>>>>>>> simply increase the heap? Here is my storm.yaml:
>>>>>>>>>>>
>>>>>>>>>>> supervisor.slots.ports:
>>>>>>>>>>>
>>>>>>>>>>>      - 6700
>>>>>>>>>>>
>>>>>>>>>>>      - 6701
>>>>>>>>>>>
>>>>>>>>>>>      - 6702
>>>>>>>>>>>
>>>>>>>>>>>      - 6703
>>>>>>>>>>>
>>>>>>>>>>> nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>
>>>>>>>>>>> ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>
>>>>>>>>>>> supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>
>>>>>>>>>>> worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Has anyone had similar issues, and what would be the best way to
>>>>>>>>>>> overcome them?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> thanks in advance
>>>>>>>>>>>
>>>>>>>>>>> AL
