Hi Matthias,
I checked the Storm UI: multiple bolt tasks are displayed, and the tuples are
randomly distributed across them, around 500 per bolt task.

Normally, when I run in local mode with any number of executors, I get correct
results. When I deploy in cluster mode with two executors for the bolt, two
executors for the spout, and two workers, I also get correct results. When I
increase the executors for the bolt or the spout or both, or increase the
number of workers, duplicate tuples are processed, even with a small data set
(10 files of 100 records each).
Should I get correct results with any number of executors for the spout
component and any number of workers, yes or no? I think the answer is yes.
Please correct me if I am wrong.
Is there any relation between executors and workers? For example, is it not
allowed to use more workers than executors? Please tell me.
Q) Storm processes tuples at least once. Does that mean the same tuple can be
processed more than once? And Trident is said to provide exactly-once
processing?

Even when I submit the same example with 4 executors for the spout, 4 executors
for the bolt, and only one worker, I do not get correct results; I get
duplicate tuples.
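
One possible cause I can see in the spout code quoted below: nextTuple()
ignores the boolean returned by File.renameTo, so when several spout executors
list the same directory they may all read the same file and emit its records
more than once. The following is a minimal sketch in plain Java (no Storm
dependency) of claiming a file only when the rename succeeds. The class name
FileClaimSketch and the method tryClaim are hypothetical, and the sketch
assumes all executors see the same local file system, where renaming a file
that was already moved away fails.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class FileClaimSketch {

    /**
     * Tries to claim a ".txt" file by renaming it to ".Reading".
     * Returns the renamed file if this caller won the claim, or null if the
     * file is not a ".txt" file or the rename did not happen.
     */
    public static File tryClaim(File source) {
        String name = source.getName();
        if (!name.endsWith(".txt")) {
            return null;
        }
        String base = name.substring(0, name.length() - ".txt".length());
        File target = new File(source.getParentFile(), base + ".Reading");
        // renameTo returns false when the rename did not happen, for example
        // because another executor already moved the file away.
        return source.renameTo(target) ? target : null;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical input file in a temporary directory.
        Path dir = Files.createTempDirectory("claim-sketch");
        File input = new File(dir.toFile(), "part-0.txt");
        Files.write(input.toPath(), "record\n".getBytes());

        File first = tryClaim(input);   // first caller renames the file
        File second = tryClaim(input);  // second caller finds it gone

        System.out.println("first claim succeeded: " + (first != null));
        System.out.println("second claim succeeded: " + (second != null));
    }
}
```

In the spout this would mean calling readFiles only when the claim returns a
non-null file. The File.renameTo documentation says the operation might not be
atomic on every platform, so this is only a sketch and not a guaranteed fix.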

Please help me.





> Date: Wed, 6 May 2015 13:57:45 +0200
> From: [email protected]
> To: [email protected]
> Subject: Re: Unable to Process Tuples In Storm cluster mode
> 
> If I get it right, you have multiple bolt tasks writing to the same
> output file. Because your data set is quite small, I would assume that
> some tasks receive data later than others and overwrite the results of
> previously finished tasks.
> 
> If you ensure that all 5 bolt tasks are located on different hosts, it
> should work.
> 
> Did you check the Storm UI to see
>  * whether multiple bolt tasks are deployed to the same node and
>  * how many tuples are processed by each bolt task?
> 
> 
> -Matthias
> 
> 
> On 05/06/2015 01:31 PM, prasad ch wrote:
> > Hi,
> > 
> > 
> > I designed a sample Storm topology which contains a spout and a bolt.
> > The spout reads from files (currently I have 10 files, each with
> > 1000 records) and passes the records to the bolt.
> > The bolt processes the received tuples by writing them into a file.
> > 
> > In my example the spout emits 10000 records in total, so my bolt should
> > also receive 10000 tuples and write them into the file.
> > The problem is that in local mode everything is fine, but when I run
> > in cluster mode with any number of workers,
> > my bolt receives or writes only 5000 or 7000 records (it varies randomly)
> > into the file. I am losing data, please help me!
> > 
> > 
> > Below is my application code.
> > ----------------------------------------------
> >    MainApplication .java
> > ......................................
> > public class MainApplication {
> >     public static void main(String[] args) throws Exception {
> >         Config con = new Config();
> >         con.setDebug(true);
> >         int i = Integer.parseInt(args[0]);
> >         con.put("fileLocation", args[1]);
> >         con.put("outFile", args[2]);
> >         con.put("ext", ".txt");
> >         con.setNumWorkers(Integer.parseInt(args[4]));
> >         TopologyBuilder builder = new TopologyBuilder();
> >         builder.setSpout("FileREADING-SPOUT", new FileReadingSpout(), i);
> >         builder.setBolt("TEST-BOLT", new TestBolt(), i)
> >                 .shuffleGrouping("FileREADING-SPOUT");
> >         StormSubmitter.submitTopology(args[3], con,
> >                 builder.createTopology());
> >     }
> > }
> > 
> > 
> > FileReadingSpout.java
> > ..................................
> > 
> > 
> > public class FileReadingSpout implements IRichSpout {
> >     static int count=0;
> >     long windowTime;
> >     SpoutOutputCollector collector;
> >     TopologyContext context;
> >     String fileLocation, ext;
> >        @Override
> >     public void open(Map conf, TopologyContext context,
> > SpoutOutputCollector collector) {
> >         try {
> > 
> >            
> >  
> > System.out.println("********************************************************************");
> >             System.out.println(" FileReadingSpout  ********* "+new
> > Date()+" acker"+conf.get("TOPOLOGY_MESSAGE_TIMEOUT_SECS"));
> >            
> >  
> > System.out.println("********************************************************************");
> >             
> >             this.context = context;
> >             this.collector = collector;
> >             this.fileLocation = conf.get("fileLocation").toString();
> >             this.ext = conf.get("ext").toString();
> >         } catch (Exception e) {
> >             e.printStackTrace();
> >         }
> >     }
> > 
> >     @Override
> >     public void close() {
> >     }
> > 
> >     @Override
> >     public void activate() {
> >     }
> > 
> >     @Override
> >     public void deactivate() {
> >     }
> > 
> >     @Override
> >     public void nextTuple() {
> >         String line;
> >         try {
> >             System.out.println(Thread.currentThread().getName()
> >                     + "in side ReadingSpout ...*" + new Date());
> >             String l = getFileNames(fileLocation, ext);
> >             if (l.length() > 0) {
> >                 System.out.println("** list of files  count:" + l);
> >                 File oldName = new File(l);
> >                 String file = l.replace(".txt", ".Reading");
> >                 File newName = new File(file);
> >                 oldName.renameTo(newName);
> >                 readFiles(file);
> >             }
> >         } catch (Exception e) {
> >             e.printStackTrace();
> >         }
> > 
> >     }
> > 
> >     @Override
> >     public void ack(Object msgId) {
> >         System.out.println(count+" In side ack ::* " + msgId);
> >     }
> > 
> >     @Override
> >     public void fail(Object msgId) {
> >     }
> > 
> >     @Override
> >     public void declareOutputFields(OutputFieldsDeclarer declarer) {
> > 
> >         declarer.declare(new Fields("line"));
> >         
> >     }
> > 
> >     @Override
> >     public Map<String, Object> getComponentConfiguration() {
> >         return null;
> >     }
> > 
> >     public static long stringToLong(String st) {
> >         long result = 0L;
> >         try {
> >             if (st != null) {
> >                 SimpleDateFormat sdf =
> >                         new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
> >                 Date d = sdf.parse(st);
> >                 result = d.getTime();
> >             }
> >         } catch (Exception e) {
> >             e.printStackTrace();
> >         }
> >         return result;
> >     }
> > 
> >     public void readFiles(String fileName) {
> >         String data = null;
> >         String arr[];
> >         BufferedReader reader = null;
> >         try {
> >             reader = new BufferedReader(new FileReader(fileName));
> >            
> > System.out.println("=====================*====================");
> >             System.out.println(" current reading fileName is  :" +
> > fileName);
> >             while ((data = reader.readLine()) != null && data.length()>0) {
> >                 count++;
> >                     collector.emit(new Values(data),data);
> >             }
> > 
> >         } catch (IOException e) {
> >             System.out.println(" file Not found :" + e);
> >         } finally {
> >             try {
> >                 if (reader != null) {
> >                     reader.close();
> >                     System.out.println(" reader is closed successfully !");
> >                 }
> >             } catch (IOException e) {
> >                 System.out.println("reader is not closed properly .." + e);
> >             }
> >         }
> > 
> >     }
> > 
> >     public String getFileNames(String folderpath, String extention) {
> >         List<String> fileNames = new ArrayList();
> > 
> >         File file = new File(folderpath);
> >         final String ext = extention;
> >         File[] files = file.listFiles(new FilenameFilter() {
> >             @Override
> >             public boolean accept(File dir, String name) {
> >                 if (name.toLowerCase().endsWith(ext) &&
> > !name.contains("ReadingComplete")) {
> >                     return true;
> >                 } else {
> >                     return false;
> >                 }
> >             }
> >         });
> >         Arrays.sort(files,
> > LastModifiedFileComparator.LASTMODIFIED_COMPARATOR);
> >         for (File f : files) {
> >             return f.getAbsolutePath();
> >         }
> >         return "";
> >     }
> > 
> > }
> > 
> > TestBolt.java
> > ..........................
> > 
> > public class TestBolt implements IRichBolt {
> >     OutputCollector collector;
> >     BufferedWriter writer;
> >     String filePath;
> > @Override
> >     public void prepare(Map stormConf, TopologyContext context,
> > OutputCollector collector) {
> >         this.collector = collector;
> >         try {
> >            
> > System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
> >             System.out.println(stormConf.get("outFile").toString()
> >                     + " TESTBOLT ********* " + new Date()
> >                     + " acker" + stormConf.get("topology.environment"));
> >            
> > System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
> > 
> >             writer = new BufferedWriter(new
> > FileWriter(stormConf.get("outFile").toString(),true));
> >         } catch (Exception e) {
> >            e.printStackTrace();
> >         }
> >     }
> > 
> >     @Override
> >     public void execute(Tuple input) {
> >         System.out.println("^^^^^^^^^^^^^^^^^^");
> >         System.out.println(" tuple data is :" + input.getString(0));
> >         try {
> >             collector.ack(input);
> >             collector.emit(new Values(input));
> >             writer.write(input.getString(0) + " \n");
> >             writer.flush();
> > 
> >         } catch (Exception e) {
> > e.printStackTrace();
> >         }
> > 
> >     }
> > @Override
> >     public void cleanup() {
> >         System.out.println(" in side clean up");
> >     }
> >     @Override
> >     public void declareOutputFields(OutputFieldsDeclarer declarer) {
> >         declarer.declare(new Fields("data"));
> >     }
> > @Override
> >     public Map<String, Object> getComponentConfiguration() {
> >         return null;
> >     }
> > }
> > 
> > 
> > 
> > 
> > In cluster mode I use the following command to execute the above app:
> > 
> > storm jar /apps/apache-storm-0.9.3/lib/storm-parallelism-ex.jar \
> >   com.jesus.storm.MainApplication 5 /FS/testFiles/stormfiles \
> >   /FS/testFiles/result/result.txt MAIN 2
> > 
> > 
> > In the above I used 5 executors and 2 workers. Even if I increase the
> > number of workers, nothing changes.
> > Please help me.
> > 
> > 
> > 
> > 
> > *THANK YOU FOR YOUR VALUABLE HELP*
> > 
> > Regards,
> > prasad.ch
> > 
> > 
> > 
> > 
> > 
> 
                                          
