Hi.

Please keep in mind that a Spout uses a single thread for its control flow.
For example, it is better to return quickly from nextTuple(), because the
Spout cannot process the acker's responses while it is inside nextTuple(),
so tuples that have already been acked can still time out.
Reading a 100 MB file and emitting all of its lines in a single nextTuple()
call could take too long.
You can measure the elapsed time of your nextTuple() call and compare it to
the message timeout (topology.message.timeout.secs).

Hope this helps.

Thanks.
Jungtaek Lim (HeartSaVioR)


2015-05-11 23:04 GMT+09:00 prasad ch <[email protected]>:

> *Hi,*
>
>
> i designed sample topology , which will contain one spout and two bolts.
>
> Here my spout is for reading list of files and emit tuple ,
> here i write the code when control comes to next tuple i designed the
> logic which will pick a file and keep emitting until reached to end of the
> file.
> this way is correct , if any thing wrong please help me. is  it possible
> to  read list of files  i mean if we have  specified location
> which may contain any number of files with any number of records this
> scenario is* possible in spout.?*
>
>
> *When i submit topology with 5 executors for spout and 5 executors for
> bolt1  , 5 executors for bolt2 and 5 workers *
> *in storm cluster with 3 machines one is nimbus while remaining two are
> supervisors.*
>
> *am unable to perform with this example ,for 10 files (each file has 100
> mb size),it's looks like very slow.*
>
> *please specify any setting i  forget to configure in topology.*
>
> FileReadingSpout .java
> ----------------------------------
> public class FileReadingSpout implements IRichSpout {
>
>     //BufferedReader reader;
>     static int count = 0;
>     SpoutOutputCollector collector;
>     TopologyContext context;
>     String fileLocation, ext;
>
>     @Override
>     public void open(Map conf, TopologyContext context,
> SpoutOutputCollector collector) {
>         try {
>             //this.reader = new BufferedReader(new
> FileReader("D://abc1.txt"));
>             System.out.println(" Worker Port :-> " +
> context.getThisWorkerPort());
>
> System.out.println("********************************************************************");
>             System.out.println(" FileReadingSpout  ********* " + new
> Date() + " currentmillseconds --> :" + System.currentTimeMillis());
>
> System.out.println("********************************************************************");
>
>             this.context = context;
>             this.collector = collector;
>             this.fileLocation = conf.get("fileLocation").toString();
>             this.ext = conf.get("ext").toString();
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>
>     @Override
>     public void close() {
>     }
>
>     @Override
>     public void activate() {
>     }
>
>     @Override
>     public void deactivate() {
>     }
>
>     @Override
>     public void nextTuple() {
>         String line;
>         try {
>             System.out.println(Thread.currentThread().getName() + "in side
> ReadingSpout ...*" + new Date() + " currentymillseconds --> " +
> System.currentTimeMillis());
> //            while ((line = reader.readLine()) != null) {
> //                if (validateTuple(line)) {
> //                    System.out.println(" data reading :" + line);
> //                    this.collector.emit(new Values(line));
> //                }
> //            }
>             System.out.println(" context :-->  getThisWorkerPort* " +
> context.getThisWorkerPort());
>             System.out.println(" context :-->  getThisWorkerTasks * " +
> context.getThisWorkerTasks() + "  current getThisWorkerTasks " +
> context.getThisTaskId());
>
>             String l = getFileNames(fileLocation, ext);
>             if (l.length() > 0) {
>                 System.out.println("** list of files  count:" + l);
>                 File oldName = new File(l);
>                 String file = l.replace(".txt", ".Reading");
>                 File newName = new File(file);
>                 oldName.renameTo(newName);
>                 readFiles(file);
>             }
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>
>     }
>
>     @Override
>     public void ack(Object msgId) {
>         System.out.println(count + " In side ack ::* " + msgId);
>     }
>
>     @Override
>     public void fail(Object msgId) {
>         System.out.println(" In side fail ::* " + msgId);
>
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>
>         declarer.declare(new Fields("feed"));
>
>     }
>
>     @Override
>     public Map<String, Object> getComponentConfiguration() {
>         return null;
>     }
>
>
>     public static long stringToLong(String st) {
>         long result = 0L;
>         try {
>             if (st != null) {
>                 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd
> HH:mm:ss");
>                 Date d = sdf.parse(st);
>                 result = d.getTime();
>             }
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>         return result;
>     }
>
>     public void readFiles(String fileName) {
>         String data = null;
>         String arr[];
>         BufferedReader reader = null;
>         try {
>             reader = new BufferedReader(new FileReader(fileName));
>             System.out.println(" current reading fileName is  :" +
> fileName);
>             while ((data = reader.readLine()) != null && data.length() >
> 0) {
>
>                     collector.emit(new Values(data),data);
>
>             }
>
>         } catch (IOException e) {
>             System.out.println(" file Not found :" + e);
>         } finally {
>             try {
>                 if (reader != null) {
>                     reader.close();
>                     System.out.println(" reader is closed successfully !");
>                 }
>             } catch (IOException e) {
>                 System.out.println("reader is not closed properly .." + e);
>             }
>         }
>
>     }
>
>     public String getFileNames(String folderpath, String extention) {
>         List<String> fileNames = new ArrayList();
>
>         File file = new File(folderpath);
>         final String ext = extention;
>         File[] files = file.listFiles(new FilenameFilter() {
>             @Override
>             public boolean accept(File dir, String name) {
>                 if (name.toLowerCase().endsWith(ext) &&
> !name.contains("ReadingComplete")) {
>                     return true;
>                 } else {
>                     return false;
>                 }
>             }
>         });
>         Arrays.sort(files,
> LastModifiedFileComparator.LASTMODIFIED_COMPARATOR);
>         for (File f : files) {
>             return f.getAbsolutePath();
> //            fileNames.add(f.getAbsolutePath());
>         }
>         return "";
>     }
>
> }
>
>
>
> My main application
>
> storm .java
> ------------
>
> public class storm {
>
>     /**
>      * @param args the command line arguments
>      */
>     public static Properties objProperties = null;
>
>     public static void submitTopology(String propFileLoc, String
> topologyName) throws Exception {
>         String spout = "FILE_READING_SPOUT", bolt = "MOU_BOLT";
>         Config con = new Config();
>         con.setDebug(true);
> //        con.put("fileLocation", "D:\\strom\\inputFiles");
> //        //con.put("outFile",".txt");
> //        con.put("outFile",
> "D:/strom/inputFiles/result/CallDurationGroupingBolt.txt");
> //
>  con.put("outFile1","D:/strom/inputFiles/result/EachCallDurationBolt.txt");
> //
>
>         objProperties = new Properties();
>         objProperties.load(new FileReader(propFileLoc));
>         System.out.println(" PROPERTIES  ARE LOADED " +
> objProperties.size());
>
>         con.put("fileLocation", objProperties.get("inputFileLoc"));
>         con.put("outFile", objProperties.get("outputFileLoc"));
>         //con.put("windowTimeInMS",3600000);
>         con.put("callDurationValInSec",
> Integer.parseInt(objProperties.get("callDuartionValInSec").toString()));
>
>         con.put("ext", ".txt");
>
> con.setNumWorkers(Integer.parseInt(objProperties.get("NumberOfWorkers").toString()));
>         int eB =
> Integer.parseInt(objProperties.get("ExecutorsForBolt").toString());
>         int eS =
> Integer.parseInt(objProperties.get("ExecutorsForSpout").toString());
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout(spout, new FileReadingSpout(), 10);
>         builder.setBolt(bolt, new
> CallDurationGroupingBolt(Integer.parseInt(objProperties.get("windowSizeInSec").toString()),
> Integer.parseInt(objProperties.get("SlidingTimeInSec").toString())),
> 10).fieldsGrouping(spout, new Fields("feed"));
>         builder.setBolt("MOU", new
> EachCallDurationBolt(Integer.parseInt(objProperties.get("windowSizeInSec").toString())),
> 10).shuffleGrouping(spout);
>         if (topologyName != null && topologyName.length() > 0) {
>             System.out.println(" SUBMITTING TOPOLOGY INTO STORM CLUSTER
> ...");
>             StormSubmitter.submitTopology(topologyName, con,
> builder.createTopology());
>         } else {
>             LocalCluster cluster = new LocalCluster();
>             cluster.submitTopology("MOU_TOPOLOGY", con,
> builder.createTopology());
>         }
>     }
>
>     public static void main(String[] args) {
>         // TODO code application logic here
>         try {
>             String fl=args[0];
>             String tn=args[1];
>             if (tn.equals("1")) {
>                 System.out.println("SUBMITTED IN LOCAL CLUSTER MODE");
>                 submitTopology(fl, null);
>             } else {
>                 submitTopology(fl, tn);
>
>             }
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>
> }
>
>
>
>
> Thanks
>
> Prasad.ch
>
>
>
>
>
>
>
>
>
>


-- 
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Reply via email to