Hi,

I designed a sample Storm topology that contains one spout and one bolt. The spout reads from files (currently I have 10 files, each with 1000 records) and passes the lines to the bolt. The bolt processes the tuples it receives by writing them into a file.

In this example the spout emits 10,000 records in total (10 files x 1000 records), and the bolt should receive all 10,000 tuples and write them to the output file. The problem: when I run in local mode everything is fine, but when I run in cluster mode, with any number of workers, the bolt only receives/writes about 5,000 to 7,000 records (the exact count varies from run to run). I am losing data. Please help!
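For reference, the local-mode run (the one that works) is submitted with Storm's LocalCluster instead of StormSubmitter. My local main method is not pasted below, so treat this as a sketch; the wiring and config keys are the same as in the cluster code:

    // Sketch of the local-mode main: LocalCluster runs the whole topology in one JVM.
    Config con = new Config();
    con.put("fileLocation", "/FS/testFiles/stormfiles");
    con.put("outFile", "/FS/testFiles/result/result.txt");
    con.put("ext", ".txt");

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("FileREADING-SPOUT", new FileReadingSpout(), 5);
    builder.setBolt("TEST-BOLT", new TestBolt(), 5).shuffleGrouping("FileREADING-SPOUT");

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("MAIN", con, builder.createTopology());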

Below is my application code.
----------------------------------------------
MainApplication.java ......................................

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class MainApplication {
    public static void main(String[] args) throws Exception {
        Config con = new Config();
        con.setDebug(true);
        int i = Integer.parseInt(args[0]);      // parallelism hint for both spout and bolt
        con.put("fileLocation", args[1]);       // input folder
        con.put("outFile", args[2]);            // output file the bolt writes to
        con.put("ext", ".txt");
        con.setNumWorkers(Integer.parseInt(args[4]));
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("FileREADING-SPOUT", new FileReadingSpout(), i);
        builder.setBolt("TEST-BOLT", new TestBolt(), i).shuffleGrouping("FileREADING-SPOUT");
        StormSubmitter.submitTopology(args[3], con, builder.createTopology());
    }
}

FileReadingSpout.java..................................

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.apache.commons.io.comparator.LastModifiedFileComparator;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.*;

public class FileReadingSpout implements IRichSpout {
    static int count = 0;
    long windowTime;
    SpoutOutputCollector collector;
    TopologyContext context;
    String fileLocation, ext;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            System.out.println("********************************************************************");
            System.out.println(" FileReadingSpout ********* " + new Date()
                    + " acker " + conf.get("TOPOLOGY_MESSAGE_TIMEOUT_SECS"));
            System.out.println("********************************************************************");
            this.context = context;
            this.collector = collector;
            this.fileLocation = conf.get("fileLocation").toString();
            this.ext = conf.get("ext").toString();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() { }

    @Override
    public void activate() { }

    @Override
    public void deactivate() { }

    @Override
    public void nextTuple() {
        try {
            System.out.println(Thread.currentThread().getName()
                    + " inside ReadingSpout ...* " + new Date());
            // Pick the oldest unread file, mark it as being read by renaming
            // .txt to .Reading, then emit every line in it.
            String l = getFileNames(fileLocation, ext);
            if (l.length() > 0) {
                System.out.println("** file to read: " + l);
                File oldName = new File(l);
                String file = l.replace(".txt", ".Reading");
                File newName = new File(file);
                oldName.renameTo(newName);
                readFiles(file);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void ack(Object msgId) {
        System.out.println(count + " inside ack ::* " + msgId);
    }

    @Override
    public void fail(Object msgId) { }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    public static long stringToLong(String st) {
        long result = 0L;
        try {
            if (st != null) {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                Date d = sdf.parse(st);
                result = d.getTime();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    public void readFiles(String fileName) {
        String data;
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new FileReader(fileName));
            System.out.println("=====================*====================");
            System.out.println(" currently reading file :" + fileName);
            while ((data = reader.readLine()) != null && data.length() > 0) {
                count++;
                // The line itself doubles as the message id, so ack()/fail() get it back.
                collector.emit(new Values(data), data);
            }
        } catch (IOException e) {
            System.out.println(" file not found :" + e);
        } finally {
            try {
                if (reader != null) {
                    reader.close();
                    System.out.println(" reader closed successfully!");
                }
            } catch (IOException e) {
                System.out.println("reader not closed properly: " + e);
            }
        }
    }

    // Returns the absolute path of the oldest not-yet-read file with the given
    // extension in the folder, or "" when there is nothing left to read.
    public String getFileNames(String folderpath, String extention) {
        File file = new File(folderpath);
        final String ext = extention;
        File[] files = file.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String name) {
                return name.toLowerCase().endsWith(ext) && !name.contains("ReadingComplete");
            }
        });
        if (files == null) {
            return "";  // folder missing or unreadable
        }
        Arrays.sort(files, LastModifiedFileComparator.LASTMODIFIED_COMPARATOR);
        for (File f : files) {
            return f.getAbsolutePath();
        }
        return "";
    }
}
TestBolt.java..........................
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.*;
import java.io.*;
import java.util.*;

public class TestBolt implements IRichBolt {
    OutputCollector collector;
    BufferedWriter writer;
    String filePath;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
            System.out.println(stormConf.get("outFile").toString() + " TESTBOLT ********* "
                    + new Date() + " acker " + stormConf.get("topology.environment"));
            System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
            // Every bolt task appends to the same output file.
            writer = new BufferedWriter(new FileWriter(stormConf.get("outFile").toString(), true));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void execute(Tuple input) {
        System.out.println("^^^^^^^^^^^^^^^^^^");
        System.out.println(" tuple data is :" + input.getString(0));
        try {
            collector.ack(input);
            collector.emit(new Values(input));  // re-emits the whole Tuple object as a single value
            writer.write(input.getString(0) + " \n");
            writer.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void cleanup() {
        System.out.println(" inside clean up");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("data"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}



For cluster mode I use the following command to run the above app:
storm jar /apps/apache-storm-0.9.3/lib/storm-parallelism-ex.jar \
    com.jesus.storm.MainApplication \
    5 /FS/testFiles/stormfiles /FS/testFiles/result/result.txt MAIN 2

In the above I used 5 executors (args[0], the parallelism hint for both the spout and the bolt) and 2 workers (args[4]). Even when I increase the number of workers there is no change. Please help me.
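In case it helps with diagnosing, below is a minimal sketch of extra logging that could go inside nextTuple() (it reuses the context field and getFileNames() from FileReadingSpout above; note that File.renameTo() returns a boolean, which the current code ignores):

    // Sketch: log which spout task claims which file and whether its rename actually won.
    String l = getFileNames(fileLocation, ext);
    if (l.length() > 0) {
        String reading = l.replace(".txt", ".Reading");
        boolean won = new File(l).renameTo(new File(reading));
        System.out.println("task " + context.getThisTaskId()
                + " tried to claim " + l + " -> renamed=" + won);
        if (won) {
            readFiles(reading);  // read only if this task's rename succeeded
        }
    }

With 5 spout executors all scanning the same folder, more than one task can pick the same file at the same time, so logging the rename result per task should show whether they are stepping on each other.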



Thank you for your valuable help 😊
Regards,
prasad.ch




                                          
