Hi Matthias,

I checked the Storm UI: multiple bolt tasks are displayed, and each bolt task shows around 500 processed tuples, randomly distributed.

Normally, when I run in local mode with any number of executors, I get correct results. When I deploy in cluster mode with 2 executors for the bolt, 2 executors for the spout, and 2 workers, I also get correct results. But when I increase the executors for the bolt or the spout (or both), or increase the number of workers, duplicate tuples are processed, even with a small data set (10 files of 100 records each).

Should I get correct results with any number of executors for the spout component and any number of workers, yes or no? I believe the answer is yes; please correct me if I am wrong. And is there any relation between executors and workers, for example that we cannot use more workers than executors?

Q) Storm processes tuples at least once, meaning the same tuple can possibly be processed more than once, while Trident is said to provide exactly-once processing. Is that correct? (A small illustrative sketch of such a replay is at the end of this mail.)

Even when I submit the same example with 4 executors for the spout, 4 executors for the bolt, and only one worker, I do not get correct results; I get duplicate tuples. Please help me.

> Date: Wed, 6 May 2015 13:57:45 +0200
> From: [email protected]
> To: [email protected]
> Subject: Re: Unable to Process Tuples In Storm cluster mode
>
> If I get it right, you have multiple bolt tasks writing to the same
> output file. Because your data set is quite small, I would assume that
> some tasks receive data later than others, and overwrite the result of
> previously finished tasks.
>
> If you ensure that all 5 bolt tasks are located on different hosts, it
> should work.
>
> Did you check the Storm UI to see
> * if multiple bolt tasks are deployed to the same node, and
> * how many tuples are processed by each bolt task?
>
>
> -Matthias
>
>
> On 05/06/2015 01:31 PM, prasad ch wrote:
> > Hi,
> >
> > I designed a sample Storm topology, which contains a spout and a bolt.
> > The spout reads from files (currently I have 10 files, each having
> > 1000 records) and the tuples are processed by the bolt.
> > In the bolt, I process the received tuples by writing them into a file.
> >
> > In my example the spout emits 10000 records in total, and the bolt
> > should also receive 10000 tuples and write them into the file.
> > The problem is that in local mode everything is fine, but when I run
> > in cluster mode with any number of workers, my bolt receives/writes
> > only 5000 or 7000 (randomly) records into the file. I am losing data,
> > please help me!
> >
> > Below is my application code.
> > ----------------------------------------------
> > MainApplication.java
> > ......................................
> > public class MainApplication {
> >     public static void main(String[] args) throws Exception {
> >         Config con = new Config();
> >         con.setDebug(true);
> >         int i = Integer.parseInt(args[0]);
> >         con.put("fileLocation", args[1]);
> >         con.put("outFile", args[2]);
> >         con.put("ext", ".txt");
> >         con.setNumWorkers(Integer.parseInt(args[4]));
> >         TopologyBuilder builder = new TopologyBuilder();
> >         builder.setSpout("FileREADING-SPOUT", new FileReadingSpout(), i);
> >         builder.setBolt("TEST-BOLT", new TestBolt(), i)
> >                 .shuffleGrouping("FileREADING-SPOUT");
> >         StormSubmitter.submitTopology(args[3], con, builder.createTopology());
> >     }
> > }
> >
> > FileReadingSpout.java
> > ..................................
> > public class FileReadingSpout implements IRichSpout {
> >     static int count = 0;
> >     long windowTime;
> >     SpoutOutputCollector collector;
> >     TopologyContext context;
> >     String fileLocation, ext;
> >
> >     @Override
> >     public void open(Map conf, TopologyContext context,
> >             SpoutOutputCollector collector) {
> >         try {
> >             System.out.println("****************************************************");
> >             System.out.println(" FileReadingSpout ********* " + new Date()
> >                     + " acker" + conf.get("TOPOLOGY_MESSAGE_TIMEOUT_SECS"));
> >             System.out.println("****************************************************");
> >             this.context = context;
> >             this.collector = collector;
> >             this.fileLocation = conf.get("fileLocation").toString();
> >             this.ext = conf.get("ext").toString();
> >         } catch (Exception e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     @Override
> >     public void close() {
> >     }
> >
> >     @Override
> >     public void activate() {
> >     }
> >
> >     @Override
> >     public void deactivate() {
> >     }
> >
> >     @Override
> >     public void nextTuple() {
> >         String line;
> >         try {
> >             System.out.println(Thread.currentThread().getName()
> >                     + "in side ReadingSpout ...*" + new Date());
> >             String l = getFileNames(fileLocation, ext);
> >             if (l.length() > 0) {
> >                 System.out.println("** list of files count:" + l);
> >                 File oldName = new File(l);
> >                 String file = l.replace(".txt", ".Reading");
> >                 File newName = new File(file);
> >                 oldName.renameTo(newName);
> >                 readFiles(file);
> >             }
> >         } catch (Exception e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     @Override
> >     public void ack(Object msgId) {
> >         System.out.println(count + " In side ack ::* " + msgId);
> >     }
> >
> >     @Override
> >     public void fail(Object msgId) {
> >     }
> >
> >     @Override
> >     public void declareOutputFields(OutputFieldsDeclarer declarer) {
> >         declarer.declare(new Fields("line"));
> >     }
> >
> >     @Override
> >     public Map<String, Object> getComponentConfiguration() {
> >         return null;
> >     }
> >
> >     public static long stringToLong(String st) {
> >         long result = 0L;
> >         try {
> >             if (st != null) {
> >                 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
> >                 Date d = sdf.parse(st);
> >                 result = d.getTime();
> >             }
> >         } catch (Exception e) {
> >             e.printStackTrace();
> >         }
> >         return result;
> >     }
> >
> >     public void readFiles(String fileName) {
> >         String data = null;
> >         String arr[];
> >         BufferedReader reader = null;
> >         try {
> >             reader = new BufferedReader(new FileReader(fileName));
> >             System.out.println("=====================*====================");
> >             System.out.println(" current reading fileName is :" + fileName);
> >             while ((data = reader.readLine()) != null && data.length() > 0) {
> >                 count++;
> >                 collector.emit(new Values(data), data);
> >             }
> >         } catch (IOException e) {
> >             System.out.println(" file Not found :" + e);
> >         } finally {
> >             try {
> >                 if (reader != null) {
> >                     reader.close();
> >                     System.out.println(" reader is closed successfully !");
> >                 }
> >             } catch (IOException e) {
> >                 System.out.println("reader is not closed properly .."
> >                         + e);
> >             }
> >         }
> >     }
> >
> >     public String getFileNames(String folderpath, String extention) {
> >         List<String> fileNames = new ArrayList();
> >         File file = new File(folderpath);
> >         final String ext = extention;
> >         File[] files = file.listFiles(new FilenameFilter() {
> >             @Override
> >             public boolean accept(File dir, String name) {
> >                 if (name.toLowerCase().endsWith(ext)
> >                         && !name.contains("ReadingComplete")) {
> >                     return true;
> >                 } else {
> >                     return false;
> >                 }
> >             }
> >         });
> >         Arrays.sort(files, LastModifiedFileComparator.LASTMODIFIED_COMPARATOR);
> >         for (File f : files) {
> >             return f.getAbsolutePath();
> >         }
> >         return "";
> >     }
> > }
> >
> > TestBolt.java
> > ..........................
> > public class TestBolt implements IRichBolt {
> >     OutputCollector collector;
> >     BufferedWriter writer;
> >     String filePath;
> >
> >     @Override
> >     public void prepare(Map stormConf, TopologyContext context,
> >             OutputCollector collector) {
> >         this.collector = collector;
> >         try {
> >             System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++");
> >             System.out.println(stormConf.get("outFile").toString()
> >                     + " TESTBOLT ********* " + new Date()
> >                     + " acker" + stormConf.get("topology.environment"));
> >             System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++");
> >             writer = new BufferedWriter(new FileWriter(
> >                     stormConf.get("outFile").toString(), true));
> >         } catch (Exception e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     @Override
> >     public void execute(Tuple input) {
> >         System.out.println("^^^^^^^^^^^^^^^^^^");
> >         System.out.println(" tuple data is :" + input.getString(0));
> >         try {
> >             collector.ack(input);
> >             collector.emit(new Values(input));
> >             writer.write(input.getString(0) + " \n");
> >             writer.flush();
> >         } catch (Exception e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     @Override
> >     public void cleanup() {
> >         System.out.println(" in side clean up");
> >     }
> >
> >     @Override
> >     public void declareOutputFields(OutputFieldsDeclarer declarer) {
> >         declarer.declare(new Fields("data"));
> >     }
> >
> >     @Override
> >     public Map<String, Object> getComponentConfiguration() {
> >         return null;
> >     }
> > }
> >
> > For cluster mode I use the following command to run the above app:
> >
> > storm jar /apps/apache-storm-0.9.3/lib/storm-parallelism-ex.jar
> > com.jesus.storm.MainApplication 5 /FS/testFiles/stormfiles
> > /FS/testFiles/result/result.txt MAIN 2
> >
> > In the above I used 5 executors and 2 workers; even when I increase the
> > number of workers there is no change.
> > Please help me.
> >
> > THANK YOU FOR YOUR VALUABLE HELP!
> >
> > Regards,
> > prasad.ch
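To make the at-least-once question above concrete, here is a minimal, hypothetical sketch; it is not the FileReadingSpout from this thread, and the class name FileLineSpout, the emitLine helper, and the pending map are made up for illustration only. It shows how a reliable spout typically replays a tuple when fail() is called, which is exactly why a plain Storm bolt may see the same tuple more than once, whereas Trident adds the bookkeeping needed for exactly-once state updates:

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

// Hypothetical spout used only to illustrate at-least-once semantics:
// every emitted tuple carries a message id, and fail() re-emits it,
// so a downstream bolt may process the same line more than once.
public class FileLineSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    // lines emitted but not yet acked, keyed by their message id
    private final Map<String, String> pending = new HashMap<String, String>();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // real code would read the next line from its input here
        // and call emitLine(line)
    }

    protected void emitLine(String line) {
        pending.put(line, line);                 // remember it until acked
        collector.emit(new Values(line), line);  // emit with a message id
    }

    @Override
    public void ack(Object msgId) {
        pending.remove(msgId);                   // fully processed, never replayed
    }

    @Override
    public void fail(Object msgId) {
        // replay on failure/timeout: this is the "at least once" part --
        // the bolt may now see the same line a second time
        String line = pending.get(msgId);
        if (line != null) {
            collector.emit(new Values(line), msgId);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

Note that the FileReadingSpout above does emit with a message id (collector.emit(new Values(data), data)) but leaves fail() empty, so in that code a failed or timed-out tuple is simply dropped rather than replayed.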

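Following up on Matthias's point about multiple bolt tasks writing to the same output file, here is a minimal sketch of one possible variant of TestBolt; it is only an illustration of the idea, not a tested fix. It assumes the same "outFile" config key as above, derives a per-task file name from TopologyContext.getThisTaskId() (the "-task-<id>" suffix is an invented convention), writes before acking, and anchors the outgoing tuple to the input:

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.Map;

// Illustrative variant of TestBolt: each task writes to its own file,
// so tasks on the same host cannot interleave or overwrite each other.
public class PerTaskFileBolt extends BaseRichBolt {
    private OutputCollector collector;
    private BufferedWriter writer;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            // e.g. /FS/testFiles/result/result.txt-task-7 (suffix is an assumption)
            String path = stormConf.get("outFile").toString()
                    + "-task-" + context.getThisTaskId();
            writer = new BufferedWriter(new FileWriter(path, true));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void execute(Tuple input) {
        try {
            writer.write(input.getString(0) + "\n");
            writer.flush();
            // anchor the outgoing tuple to the input, and ack only after
            // the write succeeded; acking before writing can lose data silently
            collector.emit(input, new Values(input.getString(0)));
            collector.ack(input);
        } catch (Exception e) {
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("data"));
    }
}

With one file per task there is nothing to overwrite, but the per-task files have to be merged afterwards; an alternative would be a single writer bolt with parallelism 1 placed behind the processing bolts.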