Good point, but it shouldn't matter how many exclamation bolts there are. The number of spouts does matter, because they are all reading the same file, so every spout instance emits each line of it again. On May 6, 2014 9:37 AM, "padma priya chitturi" <[email protected]> wrote:
> Hi, > > The issue lies with the number of tasks/executors specified for the spout. Try > specifying 1 spout and see whether you still get duplicates. I suppose there > would be no duplicates when specifying 1 spout and 1 exclamation bolt. > > > On Tue, May 6, 2014 at 5:23 PM, Bilal Al Fartakh <[email protected] > > wrote: > >> Hi Nathan, and thank you for responding, I appreciate it! >> No, it wasn't; I just ran this topology for the first time >> >> >> >> >> >> >> public class Ex { >> >> public static class ExclamationBolt extends BaseRichBolt { >> OutputCollector _collector; >> >> @Override >> public void prepare(Map conf, TopologyContext context, >> OutputCollector collector) { >> _collector = collector; >> } >> >> @Override >> public void execute(Tuple tuple) { >> _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); >> _collector.ack(tuple); >> } >> >> @Override >> public void declareOutputFields(OutputFieldsDeclarer declarer) { >> declarer.declare(new Fields("word")); >> } >> >> >> } >> >> public static void main(String[] args) throws Exception { >> TopologyBuilder builder = new TopologyBuilder(); >> TryRead T = new TryRead(); >> PrinterBolty P = new PrinterBolty(); >> builder.setSpout("word", T, 10); >> builder.setBolt("exclaim1", new ExclamationBolt(), >> 3).shuffleGrouping("word"); >> builder.setBolt("exclaim2", P , 2).shuffleGrouping("exclaim1"); >> >> >> >> ------------------------------------------------------------------------------------- >> my spout TryRead >> >> >> public class TryRead extends BaseRichSpout { >> SpoutOutputCollector _collector; >> Random _rand; >> BufferedReader fileReader; >> FileSystem f; >> WatchService watcher ; >> String S; >> Path dir; >> >> @Override >> public void open(Map conf, TopologyContext context, >> SpoutOutputCollector collector) { >> _collector = collector; >> >> >> try{ >> f = FileSystems.getDefault(); >> watcher = f.newWatchService(); >> >> S="/root/src/storm-starter/src/jvm/storm/starter"; >> >> dir = 
f.getPath(S); >> >> dir.register(watcher, ENTRY_CREATE); >> >> } catch (IOException e) { >> >> e.printStackTrace(); >> } >> >> } >> } >> >> @Override >> public void nextTuple() { >> >> >> Utils.sleep(2000); >> for (;;) { >> >> // wait for key to be signaled >> WatchKey key; >> try { >> key = watcher.take(); >> } catch (InterruptedException x) { >> return; >> } >> >> for (WatchEvent<?> event: key.pollEvents()) { >> WatchEvent.Kind kind = event.kind(); >> if (kind == OVERFLOW) { >> continue; >> } >> >> >> WatchEvent<Path> ev = (WatchEvent<Path>)event; >> Path filename = ev.context(); >> >> System.out.format("Emailing file %s%n", filename); >> try { >> >> >> fileReader = new >> BufferedReader(new FileReader(new File(S+"/"+filename))); >> >> RandomAccessFile access = >> null; >> String line = null; >> try >> { >> while ((line = fileReader.readLine()) >> != null) >> { >> if (line !=null) >> { >> >> _collector.emit(new >> Values(line)); >> } >> } >> } catch (IOException e) { >> // TODO Auto-generated >> catch block >> e.printStackTrace(); >> } >> } catch (FileNotFoundException e) >> { >> // TODO Auto-generated >> catch block >> e.printStackTrace(); >> } >> } >> } >> >> boolean valid = key.reset(); >> if (!valid) { >> break; >> } >> } >> >> >> } >> >> @Override >> public void ack(Object id) { >> } >> >> @Override >> public void fail(Object id) { >> } >> >> @Override >> public void declareOutputFields(OutputFieldsDeclarer declarer) { >> declarer.declare(new Fields("word")); >> } >> >> >> >> 2014-05-06 12:35 GMT+01:00 Nathan Leung <[email protected]>: >> >> You are creating your file writer with append set to true. Is it >>> possible your topology was run more than once? 
>>> On May 6, 2014 6:39 AM, "Bilal Al Fartakh" <[email protected]> >>> wrote: >>> >>>> I'm using a bolt that receives tuples from another bolt (the exclamation >>>> bolt) and writes them to a file. The problem is that I get >>>> duplicated results four times: when I emit a word, I find the word >>>> written four times. Where could the problem be? >>>> >>>> >>>> >>>> >>>> public class PrinterBolty extends BaseBasicBolt { >>>> >>>> @Override >>>> public void execute(Tuple tuple, BasicOutputCollector collector) { >>>> >>>> try { >>>> >>>> BufferedWriter output; >>>> output = new BufferedWriter(new >>>> FileWriter("/root/src/storm-starter/hh.txt", true)); >>>> output.newLine(); >>>> output.append(tuple.getString(0)); >>>> output.close(); >>>> >>>> } catch (IOException e) { >>>> // TODO Auto-generated catch block >>>> e.printStackTrace(); >>>> } >>>> } >>>> >>>> @Override >>>> public void declareOutputFields(OutputFieldsDeclarer ofd) { >>>> } >>>> >>>> } >>>> >>>> >>>> -- >>>> *Al Fartakh Bilal* >>>> >>> >> >> >> -- >> *Al Fartakh Bilal* >> > >
