RE: Unable to Process Tuples In Storm cluster mode
prasad ch 06-05-2015
To: storm-users
Hi,
FYI, please help me.
> > ----------------------------------------------
> > MainApplication.java
> > ......................................
> > public class MainApplication {
> >     public static void main(String[] args) throws Exception {
> >         Config con = new Config();
> >         con.setDebug(true);
> >         int i = Integer.parseInt(args[0]);
> >         con.put("fileLocation", args[1]);
> >         con.put("outFile", args[2]);
> >         con.put("ext", ".txt");
> >         con.setNumWorkers(Integer.parseInt(args[4]));
> >         TopologyBuilder builder = new TopologyBuilder();
> >         builder.setSpout("FileREADING-SPOUT", new FileReadingSpout(), i);
> >         builder.setBolt("TEST-BOLT", new TestBolt(), i)
> >                .shuffleGrouping("FileREADING-SPOUT");
> >         StormSubmitter.submitTopology(args[3], con, builder.createTopology());
> >     }
> > }
> >
> > FileReadingSpout.java
> > ..................................
> > public class FileReadingSpout implements IRichSpout {
> >     static int count=0;
> >     long windowTime;
> >     SpoutOutputCollector collector;
> >     TopologyContext context;
> >     String fileLocation, ext;
> >
> >     @Override
> >     public void open(Map conf, TopologyContext context,
> >             SpoutOutputCollector collector) {
> >         try {
> >             System.out.println("********************************************************************");
> >             System.out.println(" FileReadingSpout ********* "+new Date()+" acker"+conf.get("TOPOLOGY_MESSAGE_TIMEOUT_SECS"));
> >             System.out.println("********************************************************************");
> >             this.context = context;
> >             this.collector = collector;
> >             this.fileLocation = conf.get("fileLocation").toString();
> >             this.ext = conf.get("ext").toString();
> >         } catch (Exception e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     @Override
> >     public void close() {
> >     }
> >
> >     @Override
> >     public void activate() {
> >     }
> >
> >     @Override
> >     public void deactivate() {
> >     }
> >
> >     @Override
> >     public void nextTuple() {
> >         String line;
> >         try {
> >             System.out.println(Thread.currentThread().getName() + "in side ReadingSpout ...*" + new Date());
> >             String l = getFileNames(fileLocation, ext);
> >             if (l.length() > 0) {
> >                 System.out.println("** list of files count:" + l);
> >                 File oldName = new File(l);
> >                 String file = l.replace(".txt", ".Reading");
> >                 File newName = new File(file);
> >                 oldName.renameTo(newName);
> >                 readFiles(file);
> >             }
> >         } catch (Exception e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     @Override
> >     public void ack(Object msgId) {
> >         System.out.println(count+" In side ack ::* " + msgId);
> >     }
> >
> >     @Override
> >     public void fail(Object msgId) {
> >     }
> >
> >     @Override
> >     public void declareOutputFields(OutputFieldsDeclarer declarer) {
> >         declarer.declare(new Fields("line"));
> >     }
> >
> >     @Override
> >     public Map<String, Object> getComponentConfiguration() {
> >         return null;
> >     }
> >
> >     public static long stringToLong(String st) {
> >         long result = 0L;
> >         try {
> >             if (st != null) {
> >                 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
> >                 Date d = sdf.parse(st);
> >                 result = d.getTime();
> >             }
> >         } catch (Exception e) {
> >             e.printStackTrace();
> >         }
> >         return result;
> >     }
> >
> >     public void readFiles(String fileName) {
> >         String data = null;
> >         String arr[];
> >         BufferedReader reader = null;
> >         try {
> >             reader = new BufferedReader(new FileReader(fileName));
> >             System.out.println("=====================*====================");
> >             System.out.println(" current reading fileName is :" + fileName);
> >             while ((data = reader.readLine()) != null && data.length()>0) {
> >                 count++;
> >                 collector.emit(new Values(data),data);
> >             }
> >         } catch (IOException e) {
> >             System.out.println(" file Not found :" + e);
> >         } finally {
> >             try {
> >                 if (reader != null) {
> >                     reader.close();
> >                     System.out.println(" reader is closed successfully !");
> >                 }
> >             } catch (IOException e) {
> >                 System.out.println("reader is not closed properly .." + e);
> >             }
> >         }
> >     }
> >
> >     public String getFileNames(String folderpath, String extention) {
> >         List<String> fileNames = new ArrayList();
> >         File file = new File(folderpath);
> >         final String ext = extention;
> >         File[] files = file.listFiles(new FilenameFilter() {
> >             @Override
> >             public boolean accept(File dir, String name) {
> >                 if (name.toLowerCase().endsWith(ext)
> >                         && !name.contains("ReadingComplete")) {
> >                     return true;
> >                 } else {
> >                     return false;
> >                 }
> >             }
> >         });
> >         Arrays.sort(files, LastModifiedFileComparator.LASTMODIFIED_COMPARATOR);
> >         for (File f : files) {
> >             return f.getAbsolutePath();
> >         }
> >         return "";
> >     }
> > }
> >
> > TestBolt.java
> > ..........................
> > public class TestBolt implements IRichBolt {
> >     OutputCollector collector;
> >     BufferedWriter writer;
> >     String filePath;
> >
> >     @Override
> >     public void prepare(Map stormConf, TopologyContext context,
> >             OutputCollector collector) {
> >         this.collector = collector;
> >         try {
> >             System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
> >             System.out.println(stormConf.get("outFile").toString()+" TESTBOLT ********* "+new Date()+" acker"+stormConf.get("topology.environment"));
> >             System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
> >             writer = new BufferedWriter(new FileWriter(stormConf.get("outFile").toString(),true));
> >         } catch (Exception e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     @Override
> >     public void execute(Tuple input) {
> >         System.out.println("^^^^^^^^^^^^^^^^^^");
> >         System.out.println(" tuple data is :" + input.getString(0));
> >         try {
> >             collector.ack(input);
> >             collector.emit(new Values(input));
> >             writer.write(input.getString(0) + " \n");
> >             writer.flush();
> >         } catch (Exception e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     @Override
> >     public void cleanup() {
> >         System.out.println(" in side clean up");
> >     }
> >
> >     @Override
> >     public void declareOutputFields(OutputFieldsDeclarer declarer) {
> >         declarer.declare(new Fields("data"));
> >     }
> >
> >     @Override
> >     public Map<String, Object> getComponentConfiguration() {
> >         return null;
> >     }
> > }
Thanks!
From: [email protected]
To: [email protected]
Subject: Re: java.lang.OutOfMemoryError
Date: Thu, 7 May 2015 14:24:04 +0000
GC Overhead limit exceeded usually indicates you are rapidly instantiating and
releasing objects to the extent that the JVM is spending too much time in GC.
You could try increasing the worker heap size, but I’d take a look at your
topology first to see if you can calm down the instantiate / release thrashing.
Grant Overby
Software Engineer
Cisco.com
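
One way to picture the "calm down the thrashing" advice, sketched with the JDK only and no Storm classes: the spout posted in this thread reads a whole file inside a single nextTuple() call, so every line is emitted before any ack can be handled. A reader that hands out one line per call and pauses once too many lines are still un-acked keeps far fewer objects alive at once. The class name ThrottledLineSource, the maxPending parameter, and the numeric ids are hypothetical, chosen for illustration; this is not the poster's code or a Storm API.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: hands out one line per call and refuses to hand out
// more while maxPending lines are still waiting for an ack.
public class ThrottledLineSource {
    private final BufferedReader reader;
    private final int maxPending;                 // assumed cap, tune per workload
    private final Set<Long> pending = new HashSet<>();
    private long nextId = 0;

    public ThrottledLineSource(BufferedReader reader, int maxPending) {
        this.reader = reader;
        this.maxPending = maxPending;
    }

    // Returns the next line, or null when throttled or when input is exhausted.
    public String nextLine() {
        if (pending.size() >= maxPending) {
            return null;                          // too many un-acked lines, back off
        }
        try {
            String line = reader.readLine();
            if (line == null) {
                return null;                      // end of input
            }
            pending.add(nextId++);                // small numeric id, not the line itself
            return line;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Id assigned to the most recently returned line.
    public long lastId() {
        return nextId - 1;
    }

    // Marks a line as fully processed so another one may be handed out.
    public void ack(long id) {
        pending.remove(id);
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ThrottledLineSource src = new ThrottledLineSource(
                new BufferedReader(new StringReader("a\nb\nc\n")), 2);
        System.out.println(src.nextLine());       // a
        System.out.println(src.nextLine());       // b
        System.out.println(src.nextLine());       // null: two lines pending
        src.ack(0);
        System.out.println(src.nextLine());       // c
    }
}
```

In a spout, the same idea would mean calling something like nextLine() once per nextTuple() and calling ack(id) from the spout's ack callback. Storm's own topology.max.spout.pending setting applies a similar cap, but it can only take effect if nextTuple() returns between emits.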
From: prasad ch <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, May 7, 2015 at 9:54 AM
To: storm-users <[email protected]>
Subject: java.lang.OutOfMemoryError
Hi,
I am running a Storm application that contains one spout and one bolt. The spout
reads data from files, and the bolt receives the tuples and writes them to a
file (just reading and writing).
I ran the example with 2 GB of data in 10 files of 200 MB each, and I got the
error below:
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.reflect.Method.copy(Method.java:151)
        at java.lang.reflect.ReflectAccess.copyMethod(ReflectAccess.java:136)
        at sun.reflect.Reflect
What is the reason for the above error? Is there any configuration I need to
set in storm.yaml? By default, nimbus.childopts contains -Xmx1024m.
Please help me.
What basic properties need to be configured for large data such as 2 GB or
more?
Thanks!
prasad.ch
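
A note on the configuration question above: nimbus.childopts sets JVM options for the Nimbus daemon only. Spouts and bolts run in worker JVMs, which take their options from worker.childopts in storm.yaml (or topology.worker.childopts in the topology config). A quick way to confirm which heap limit a worker actually received is to log the JVM's own figures from open() or prepare(). The helper below is a minimal sketch using only the JDK; the class name HeapReport and its methods are hypothetical.

```java
// Hypothetical helper: reports the heap limit and current usage of the JVM
// it runs in, so a worker can log whether its -Xmx setting took effect.
public class HeapReport {
    private static final long MB = 1024L * 1024L;

    // Converts a byte count to whole megabytes (rounded down).
    public static long toMb(long bytes) {
        return bytes / MB;
    }

    // Builds a one-line summary of max heap and currently used heap.
    public static String describe() {
        Runtime rt = Runtime.getRuntime();
        long max = rt.maxMemory();                        // upper bound set by -Xmx
        long used = rt.totalMemory() - rt.freeMemory();   // heap in use right now
        return "heap max=" + toMb(max) + "MB used=" + toMb(used) + "MB";
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Calling System.out.println(HeapReport.describe()) at the start of the bolt's prepare() would put the figure in the worker log next to the other startup messages.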