RE: Unable to Process Tuples in Storm cluster mode
prasad ch  06-05-2015
To: storm-users

Hi, FYI — please help me.
----------------------------------------------
MainApplication.java
......................................

public class MainApplication {

    public static void main(String[] args) throws Exception {
        Config con = new Config();
        con.setDebug(true);
        int i = Integer.parseInt(args[0]);
        con.put("fileLocation", args[1]);
        con.put("outFile", args[2]);
        con.put("ext", ".txt");
        con.setNumWorkers(Integer.parseInt(args[4]));

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("FileREADING-SPOUT", new FileReadingSpout(), i);
        builder.setBolt("TEST-BOLT", new TestBolt(), i).shuffleGrouping("FileREADING-SPOUT");
        StormSubmitter.submitTopology(args[3], con, builder.createTopology());
    }
}

FileReadingSpout.java
..................................

public class FileReadingSpout implements IRichSpout {
    static int count = 0;
    long windowTime;
    SpoutOutputCollector collector;
    TopologyContext context;
    String fileLocation, ext;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            System.out.println("********************************************************************");
            System.out.println(" FileReadingSpout ********* " + new Date()
                    + " acker" + conf.get("TOPOLOGY_MESSAGE_TIMEOUT_SECS"));
            System.out.println("********************************************************************");

            this.context = context;
            this.collector = collector;
            this.fileLocation = conf.get("fileLocation").toString();
            this.ext = conf.get("ext").toString();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void nextTuple() {
        try {
            System.out.println(Thread.currentThread().getName() + " inside ReadingSpout ...*" + new Date());
            String l = getFileNames(fileLocation, ext);
            if (l.length() > 0) {
                System.out.println("** list of files count:" + l);
                File oldName = new File(l);
                String file = l.replace(".txt", ".Reading");
                File newName = new File(file);
                oldName.renameTo(newName);
                readFiles(file);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void ack(Object msgId) {
        System.out.println(count + " In side ack ::* " + msgId);
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    public static long stringToLong(String st) {
        long result = 0L;
        try {
            if (st != null) {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                Date d = sdf.parse(st);
                result = d.getTime();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    public void readFiles(String fileName) {
        String data = null;
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new FileReader(fileName));
            System.out.println("=====================*====================");
            System.out.println(" current reading fileName is :" + fileName);
            while ((data = reader.readLine()) != null && data.length() > 0) {
                count++;
                collector.emit(new Values(data), data);
            }
        } catch (IOException e) {
            System.out.println(" file Not found :" + e);
        } finally {
            try {
                if (reader != null) {
                    reader.close();
                    System.out.println(" reader is closed successfully !");
                }
            } catch (IOException e) {
                System.out.println("reader is not closed properly .." + e);
            }
        }
    }

    public String getFileNames(String folderpath, String extention) {
        File file = new File(folderpath);
        final String ext = extention;
        File[] files = file.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String name) {
                return name.toLowerCase().endsWith(ext) && !name.contains("ReadingComplete");
            }
        });
        Arrays.sort(files, LastModifiedFileComparator.LASTMODIFIED_COMPARATOR);
        for (File f : files) {
            return f.getAbsolutePath();
        }
        return "";
    }
}

TestBolt.java
..........................

public class TestBolt implements IRichBolt {
    OutputCollector collector;
    BufferedWriter writer;
    String filePath;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
            System.out.println(stormConf.get("outFile").toString() + " TESTBOLT ********* "
                    + new Date() + " acker" + stormConf.get("topology.environment"));
            System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");

            writer = new BufferedWriter(new FileWriter(stormConf.get("outFile").toString(), true));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void execute(Tuple input) {
        System.out.println("^^^^^^^^^^^^^^^^^^");
        System.out.println(" tuple data is :" + input.getString(0));
        try {
            collector.ack(input);
            collector.emit(new Values(input));
            writer.write(input.getString(0) + " \n");
            writer.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void cleanup() {
        System.out.println(" in side clean up");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("data"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

Thanks!
From: [email protected]
To: [email protected]
Subject: Re: java.lang.OutOfMemoryError
Date: Thu, 7 May 2015 14:24:04 +0000

GC Overhead limit exceeded usually indicates you are rapidly instantiating and 
releasing objects to the extent that the JVM is spending too much time in GC.

You could try increasing the worker heap size, but I’d take a look at your 
topology first to see if you can calm down the instantiate / release thrashing.
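If you do decide to raise the worker heap, it is set per worker (not via nimbus.childOpts) — something like the fragment below in storm.yaml. The specific values here are only illustrative; tune them to your machines. Capping topology.max.spout.pending also helps with this kind of thrashing, since it stops the spout from flooding memory with pending tuples (it only takes effect when the spout emits tuples with message IDs, which yours does).

```yaml
# storm.yaml — illustrative values, adjust for your hardware

# JVM options for each worker process (this is where the OOM happens,
# not in the nimbus daemon)
worker.childopts: "-Xmx2048m"

# Limit how many un-acked tuples a spout may have in flight at once,
# so large files don't pile up in memory
topology.max.spout.pending: 1000
```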

Grant Overby

Software Engineer

Cisco.com

[email protected]

Mobile: 865 724 4910

From: prasad ch <[email protected]>

Reply-To: "[email protected]" <[email protected]>

Date: Thursday, May 7, 2015 at 9:54 AM

To: storm-users <[email protected]>

Subject: java.lang.OutOfMemoryError

Hi,

When I run my Storm application, which contains one spout and one bolt, the spout reads data from files and the bolt receives the tuples and writes them to a file (just reading and writing).

I ran the example with 2 GB of data — 10 files of 200 MB each — and got the error below:

java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.reflect.Method.copy(Method.java:151)
        at java.lang.reflect.ReflectAccess.copyMethod(ReflectAccess.java:136)
        at sun.reflect.Reflect

What is the reason for this error? Is there any configuration I need to do in storm.yaml? By default nimbus.childOpts contains -Xmx1024m.

Please help me.

Also, what are the basic properties that need to be configured for large data, like 2 GB or more? Please help me.

Thanks!

prasad.ch