If I understand it correctly, you have multiple bolt tasks writing to
the same output file. Because your data set is quite small, I would
assume that some tasks receive their data later than others and
overwrite the results of previously finished tasks.

If you ensure that all 5 bolt tasks are located on different hosts, it
should work. Alternatively, give each task its own output file, as
sketched below.
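
A minimal sketch of that alternative, based on your TestBolt (the
"-task-" suffix is only an assumption for illustration): append the
task ID from the TopologyContext so each bolt task writes its own file:

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        try {
            // every task gets its own file, e.g. result.txt-task-7, so
            // tasks that land on the same host cannot clobber each other
            String outFile = stormConf.get("outFile").toString()
                    + "-task-" + context.getThisTaskId();
            writer = new BufferedWriter(new FileWriter(outFile, true));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Keep in mind that each worker still writes to its local file system, so
you would collect one file per task and host afterwards.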

Did you check the Storm UI to see
 * whether multiple bolt tasks are deployed to the same node, and
 * how many tuples are processed by each bolt task?


-Matthias


On 05/06/2015 01:31 PM, prasad ch wrote:
> HI,
> 
> 
> I designed a sample Storm topology that contains a spout and a bolt.
> The spout reads from files (currently I have 10 files, each with 1000
> records) and emits the lines to the bolt. In the bolt, I process the
> received tuples by writing them into a file.
> 
> In my example the spout emits 10000 records in total, and the bolt
> should also receive all 10000 tuples and write them into the file.
> The problem: in local mode everything works fine, but in cluster mode,
> with any number of workers, my bolt receives or writes only 5000 or
> 7000 records (randomly) into the file. I am losing data, please help me!
> 
> 
> Below is my application code.
> ----------------------------------------------
> MainApplication.java
> ......................................
> public class MainApplication {
>     public static void main(String[] args) throws Exception {
>         // args: <parallelism> <input dir> <output file> <topology name> <#workers>
>         Config con = new Config();
>         con.setDebug(true);
>         int i = Integer.parseInt(args[0]);
>         con.put("fileLocation", args[1]);
>         con.put("outFile", args[2]);
>         con.put("ext", ".txt");
>         con.setNumWorkers(Integer.parseInt(args[4]));
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("FileREADING-SPOUT", new FileReadingSpout(), i);
>         builder.setBolt("TEST-BOLT", new TestBolt(), i).shuffleGrouping("FileREADING-SPOUT");
>         StormSubmitter.submitTopology(args[3], con, builder.createTopology());
>     }
> }
> 
> 
> FileReadingSpout.java
> ..................................
> 
> 
> public class FileReadingSpout implements IRichSpout {
>     static int count=0;
>     long windowTime;
>     SpoutOutputCollector collector;
>     TopologyContext context;
>     String fileLocation, ext;
>     @Override
>     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
>         try {
>             System.out.println("********************************************************************");
>             System.out.println(" FileReadingSpout  ********* " + new Date() + " acker" + conf.get("TOPOLOGY_MESSAGE_TIMEOUT_SECS"));
>             System.out.println("********************************************************************");
>             
>             this.context = context;
>             this.collector = collector;
>             this.fileLocation = conf.get("fileLocation").toString();
>             this.ext = conf.get("ext").toString();
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
> 
>     @Override
>     public void close() {
>     }
> 
>     @Override
>     public void activate() {
>     }
> 
>     @Override
>     public void deactivate() {
>     }
> 
>     @Override
>     public void nextTuple() {
>         try {
>             System.out.println(Thread.currentThread().getName() + " inside ReadingSpout ...* " + new Date());
>             String l = getFileNames(fileLocation, ext);
>             if (l.length() > 0) {
>                 System.out.println("** next file to read: " + l);
>                 File oldName = new File(l);
>                 String file = l.replace(".txt", ".Reading");
>                 File newName = new File(file);
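>                 // note: the boolean returned by renameTo() is ignored; with
>                 // several spout tasks, two tasks can race for the same file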
>                 oldName.renameTo(newName);
>                 readFiles(file);
>             }
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
> 
>     }
> 
>     @Override
>     public void ack(Object msgId) {
>         System.out.println(count+" In side ack ::* " + msgId);
>     }
> 
>     @Override
>     public void fail(Object msgId) {
>     }
> 
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
> 
>         declarer.declare(new Fields("line"));
>         
>     }
> 
>     @Override
>     public Map<String, Object> getComponentConfiguration() {
>         return null;
>     }
> 
>     public static long stringToLong(String st) {
>         long result = 0L;
>         try {
>             if (st != null) {
>                 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
>                 Date d = sdf.parse(st);
>                 result = d.getTime();
>             }
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>         return result;
>     }
> 
>     public void readFiles(String fileName) {
>         String data = null;
>         BufferedReader reader = null;
>         try {
>             reader = new BufferedReader(new FileReader(fileName));
>             System.out.println("=====================*====================");
>             System.out.println(" current reading fileName is  :" + fileName);
>             while ((data = reader.readLine()) != null && data.length()>0) {
>                 count++;
>                 collector.emit(new Values(data), data);
>             }
> 
>         } catch (IOException e) {
>             System.out.println(" file Not found :" + e);
>         } finally {
>             try {
>                 if (reader != null) {
>                     reader.close();
>                     System.out.println(" reader is closed successfully !");
>                 }
>             } catch (IOException e) {
>                 System.out.println("reader is not closed properly .." + e);
>             }
>         }
> 
>     }
> 
>     public String getFileNames(String folderpath, String extention) {
> 
>         File file = new File(folderpath);
>         final String ext = extention;
>         File[] files = file.listFiles(new FilenameFilter() {
>             @Override
>             public boolean accept(File dir, String name) {
>                 return name.toLowerCase().endsWith(ext) && !name.contains("ReadingComplete");
>             }
>         });
>         Arrays.sort(files, LastModifiedFileComparator.LASTMODIFIED_COMPARATOR);
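>         // LASTMODIFIED_COMPARATOR sorts oldest-first, so the loop below
>         // returns the oldest matching file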
>         for (File f : files) {
>             return f.getAbsolutePath();
>         }
>         return "";
>     }
> 
> }
> 
> TestBolt.java
> ..........................
> 
> public class TestBolt implements IRichBolt {
>     OutputCollector collector;
>     BufferedWriter writer;
>     String filePath;
>     @Override
>     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
>         this.collector = collector;
>         try {
>             System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
>             System.out.println(stormConf.get("outFile").toString() + " TESTBOLT ********* " + new Date() + " acker" + stormConf.get("topology.environment"));
>             System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
> 
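>             // note: every bolt task on this host opens the very same
>             // outFile path in append mode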
>             writer = new BufferedWriter(new FileWriter(stormConf.get("outFile").toString(), true));
>         } catch (Exception e) {
>            e.printStackTrace();
>         }
>     }
> 
>     @Override
>     public void execute(Tuple input) {
>         System.out.println("^^^^^^^^^^^^^^^^^^");
>         System.out.println(" tuple data is :" + input.getString(0));
>         try {
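>             // note: the tuple is acked before the write, so a tuple
>             // whose write fails will never be replayed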
>             collector.ack(input);
>             collector.emit(new Values(input));
>             writer.write(input.getString(0) + " \n");
>             writer.flush();
> 
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
> 
>     }
>     @Override
>     public void cleanup() {
>         System.out.println(" in side clean up");
>     }
> 
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields("data"));
>     }
> 
>     @Override
>     public Map<String, Object> getComponentConfiguration() {
>         return null;
>     }
> }
> 
> 
> 
> 
> To run the application in cluster mode, I execute:
> 
> storm jar /apps/apache-storm-0.9.3/lib/storm-parallelism-ex.jar
>   com.jesus.storm.MainApplication 5 /FS/testFiles/stormfiles
>   /FS/testFiles/result/result.txt MAIN 2
> 
> 
> In the above I used 5 executors and 2 workers; even when I increase
> the number of workers, nothing changes.
> Please help me!
> 
> 
> 
> 
> THANK YOU FOR YOUR VALUABLE HELP!
> 
> Regards,
> prasad.ch
> 
> 
> 
> 
> 
