Hi,

I designed a sample topology that contains one spout and two bolts.
The spout reads a list of files and emits tuples: each time nextTuple() is
called, it picks a file and keeps emitting until it reaches the end of that
file. Is this approach correct? If anything is wrong, please help me.
Also, is it possible for a spout to read a list of files, i.e. a specified
location that may contain any number of files, each with any number of
records?

I submit the topology with 5 executors for the spout, 5 executors for bolt1,
5 executors for bolt2, and 5 workers, on a Storm cluster of 3 machines: one is
Nimbus and the remaining two are supervisors.
I am unable to get acceptable performance with this example: for 10 files
(each 100 MB in size) it looks very slow.
Please point out any setting I forgot to configure in the topology.
FileReadingSpout .java----------------------------------public class 
FileReadingSpout implements IRichSpout {
    //BufferedReader reader;    static int count = 0;    SpoutOutputCollector 
collector;    TopologyContext context;    String fileLocation, ext;
    @Override    public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {        try {            //this.reader = new 
BufferedReader(new FileReader("D://abc1.txt"));            System.out.println(" 
Worker Port :-> " + context.getThisWorkerPort());            
System.out.println("********************************************************************");
            System.out.println(" FileReadingSpout  ********* " + new Date() + " 
currentmillseconds --> :" + System.currentTimeMillis());            
System.out.println("********************************************************************");
            this.context = context;            this.collector = collector;      
      this.fileLocation = conf.get("fileLocation").toString();            
this.ext = conf.get("ext").toString();        } catch (Exception e) {           
 e.printStackTrace();        }    }
    @Override    public void close() {    }
    @Override    public void activate() {    }
    @Override    public void deactivate() {    }
    @Override    public void nextTuple() {        String line;        try {     
       System.out.println(Thread.currentThread().getName() + "in side 
ReadingSpout ...*" + new Date() + " currentymillseconds --> " + 
System.currentTimeMillis());//            while ((line = reader.readLine()) != 
null) {//                if (validateTuple(line)) {//                    
System.out.println(" data reading :" + line);//                    
this.collector.emit(new Values(line));//                }//            }        
    System.out.println(" context :-->  getThisWorkerPort* " + 
context.getThisWorkerPort());            System.out.println(" context :-->  
getThisWorkerTasks * " + context.getThisWorkerTasks() + "  current 
getThisWorkerTasks " + context.getThisTaskId());
            String l = getFileNames(fileLocation, ext);            if 
(l.length() > 0) {                System.out.println("** list of files  count:" 
+ l);                File oldName = new File(l);                String file = 
l.replace(".txt", ".Reading");                File newName = new File(file);    
            oldName.renameTo(newName);                readFiles(file);          
  }        } catch (Exception e) {            e.printStackTrace();        }
    }
    @Override    public void ack(Object msgId) {        
System.out.println(count + " In side ack ::* " + msgId);    }
    @Override    public void fail(Object msgId) {        System.out.println(" 
In side fail ::* " + msgId);
    }
    @Override    public void declareOutputFields(OutputFieldsDeclarer declarer) 
{
        declarer.declare(new Fields("feed"));
    }
    @Override    public Map<String, Object> getComponentConfiguration() {       
 return null;    }

    public static long stringToLong(String st) {        long result = 0L;       
 try {            if (st != null) {                SimpleDateFormat sdf = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss");                Date d = sdf.parse(st); 
               result = d.getTime();            }        } catch (Exception e) 
{            e.printStackTrace();        }        return result;    }
    public void readFiles(String fileName) {        String data = null;        
String arr[];        BufferedReader reader = null;        try {            
reader = new BufferedReader(new FileReader(fileName));            
System.out.println(" current reading fileName is  :" + fileName);            
while ((data = reader.readLine()) != null && data.length() > 0) {               
                 collector.emit(new Values(data),data);                         
 }
        } catch (IOException e) {            System.out.println(" file Not 
found :" + e);        } finally {            try {                if (reader != 
null) {                    reader.close();                    
System.out.println(" reader is closed successfully !");                }        
    } catch (IOException e) {                System.out.println("reader is not 
closed properly .." + e);            }        }
    }
    public String getFileNames(String folderpath, String extention) {        
List<String> fileNames = new ArrayList();
        File file = new File(folderpath);        final String ext = extention;  
      File[] files = file.listFiles(new FilenameFilter() {            @Override 
           public boolean accept(File dir, String name) {                if 
(name.toLowerCase().endsWith(ext) && !name.contains("ReadingComplete")) {       
             return true;                } else {                    return 
false;                }            }        });        Arrays.sort(files, 
LastModifiedFileComparator.LASTMODIFIED_COMPARATOR);        for (File f : 
files) {            return f.getAbsolutePath();//            
fileNames.add(f.getAbsolutePath());        }        return "";    }
}


My main application
storm .java------------
public class storm {
    /**     * @param args the command line arguments     */    public static 
Properties objProperties = null;
    public static void submitTopology(String propFileLoc, String topologyName) 
throws Exception {        String spout = "FILE_READING_SPOUT", bolt = 
"MOU_BOLT";        Config con = new Config();        con.setDebug(true);//      
  con.put("fileLocation", "D:\\strom\\inputFiles");//        
//con.put("outFile",".txt");//        con.put("outFile", 
"D:/strom/inputFiles/result/CallDurationGroupingBolt.txt");//        
con.put("outFile1","D:/strom/inputFiles/result/EachCallDurationBolt.txt");//    
    
        objProperties = new Properties();        objProperties.load(new 
FileReader(propFileLoc));        System.out.println(" PROPERTIES  ARE LOADED " 
+ objProperties.size());
        con.put("fileLocation", objProperties.get("inputFileLoc"));        
con.put("outFile", objProperties.get("outputFileLoc"));        
//con.put("windowTimeInMS",3600000);        con.put("callDurationValInSec", 
Integer.parseInt(objProperties.get("callDuartionValInSec").toString()));
        con.put("ext", ".txt");        
con.setNumWorkers(Integer.parseInt(objProperties.get("NumberOfWorkers").toString()));
        int eB = 
Integer.parseInt(objProperties.get("ExecutorsForBolt").toString());        int 
eS = Integer.parseInt(objProperties.get("ExecutorsForSpout").toString());       
 TopologyBuilder builder = new TopologyBuilder();        
builder.setSpout(spout, new FileReadingSpout(), 10);        
builder.setBolt(bolt, new 
CallDurationGroupingBolt(Integer.parseInt(objProperties.get("windowSizeInSec").toString()),
 Integer.parseInt(objProperties.get("SlidingTimeInSec").toString())), 
10).fieldsGrouping(spout, new Fields("feed"));        builder.setBolt("MOU", 
new 
EachCallDurationBolt(Integer.parseInt(objProperties.get("windowSizeInSec").toString())),
 10).shuffleGrouping(spout);        if (topologyName != null && 
topologyName.length() > 0) {            System.out.println(" SUBMITTING 
TOPOLOGY INTO STORM CLUSTER ...");            
StormSubmitter.submitTopology(topologyName, con, builder.createTopology());     
   } else {            LocalCluster cluster = new LocalCluster();            
cluster.submitTopology("MOU_TOPOLOGY", con, builder.createTopology());        } 
   }
    public static void main(String[] args) {        // TODO code application 
logic here        try {            String fl=args[0];            String 
tn=args[1];            if (tn.equals("1")) {                
System.out.println("SUBMITTED IN LOCAL CLUSTER MODE");                
submitTopology(fl, null);            } else {                submitTopology(fl, 
tn);
            }        } catch (Exception e) {            e.printStackTrace();    
    }    }
}



Thanks
Prasad.ch








                                          

Reply via email to