Thank you :D specifying 1 spout was the solution . thank you Padma and Nathan again for your help !
2014-05-06 14:40 GMT+01:00 Nathan Leung <[email protected]>: > Good point, but it shouldn't matter how many exclamation bolts there are. > The number of spouts does because they are all reading the same file. > On May 6, 2014 9:37 AM, "padma priya chitturi" <[email protected]> > wrote: > >> Hi, >> >> The issue lies with the number of tasks/executors specified for spout. >> Try specifying 1 spout and see if you could see duplicates. I suppose there >> would be no duplicates in specifying 1 spout and 1 exclamtion bolt. >> >> >> On Tue, May 6, 2014 at 5:23 PM, Bilal Al Fartakh < >> [email protected]> wrote: >> >>> HI ,Nathan and thank you for responding ,I appreciate it ! >>> no I'm not , I just run this topology for the first time >>> >>> >>> >>> >>> >>> >>> public class Ex { >>> >>> public static class ExclamationBolt extends BaseRichBolt { >>> OutputCollector _collector; >>> >>> @Override >>> public void prepare(Map conf, TopologyContext context, >>> OutputCollector collector) { >>> _collector = collector; >>> } >>> >>> @Override >>> public void execute(Tuple tuple) { >>> _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); >>> _collector.ack(tuple); >>> } >>> >>> @Override >>> public void declareOutputFields(OutputFieldsDeclarer declarer) { >>> declarer.declare(new Fields("word")); >>> } >>> >>> >>> } >>> >>> public static void main(String[] args) throws Exception { >>> TopologyBuilder builder = new TopologyBuilder(); >>> TryRead T = new TryRead(); >>> PrinterBolty P = new PrinterBolty(); >>> builder.setSpout("word", T, 10); >>> builder.setBolt("exclaim1", new ExclamationBolt(), >>> 3).shuffleGrouping("word"); >>> builder.setBolt("exclaim2", P , 2).shuffleGrouping("exclaim1"); >>> >>> >>> >>> ------------------------------------------------------------------------------------- >>> my spout Tryread >>> >>> >>> public class TryRead extends BaseRichSpout { >>> SpoutOutputCollector _collector; >>> Random _rand; >>> BufferedReader fileReader; >>> FileSystem f; >>> WatchService watcher ; >>> String S; >>> Path dir; >>> >>> @Override >>> public void open(Map conf, TopologyContext context, >>> SpoutOutputCollector collector) { >>> _collector = collector; >>> >>> >>> try{ >>> f = FileSystems.getDefault(); >>> watcher = f.newWatchService(); >>> >>> S="/root/src/storm-starter/src/jvm/storm/starter"; >>> >>> dir = f.getPath(S); >>> >>> dir.register(watcher, ENTRY_CREATE); >>> >>> } catch (IOException e) { >>> >>> e.printStackTrace(); >>> } >>> >>> } >>> } >>> >>> @Override >>> public void nextTuple() { >>> >>> >>> Utils.sleep(2000); >>> for (;;) { >>> >>> // wait for key to be signaled >>> WatchKey key; >>> try { >>> key = watcher.take(); >>> } catch (InterruptedException x) { >>> return; >>> } >>> >>> for (WatchEvent<?> event: key.pollEvents()) { >>> WatchEvent.Kind kind = event.kind(); >>> if (kind == OVERFLOW) { >>> continue; >>> } >>> >>> >>> WatchEvent<Path> ev = (WatchEvent<Path>)event; >>> Path filename = ev.context(); >>> >>> System.out.format("Emailing file %s%n", >>> filename); >>> try { >>> >>> >>> fileReader = new >>> BufferedReader(new FileReader(new File(S+"/"+filename))); >>> >>> RandomAccessFile access >>> = null; >>> String line = null; >>> try >>> { >>> while ((line = fileReader.readLine()) >>> != null) >>> { >>> if (line !=null) >>> { >>> >>> _collector.emit(new >>> Values(line)); >>> } >>> } >>> } catch (IOException e) { >>> // TODO Auto-generated >>> catch block >>> e.printStackTrace(); >>> } >>> } catch (FileNotFoundException >>> e) { >>> // TODO Auto-generated >>> catch block >>> e.printStackTrace(); >>> } >>> } >>> } >>> >>> boolean valid = key.reset(); >>> if (!valid) { >>> break; >>> } >>> } >>> >>> >>> } >>> >>> @Override >>> public void ack(Object id) { >>> } >>> >>> @Override >>> public void fail(Object id) { >>> } >>> >>> @Override >>> public void declareOutputFields(OutputFieldsDeclarer declarer) >>> { >>> declarer.declare(new Fields("word")); >>> } >>> >>> >>> >>> 2014-05-06 12:35 GMT+01:00 Nathan Leung <[email protected]>: >>> >>> You are creating your file writer with append set to true. It's it >>>> possible your topology was run more than once? >>>> On May 6, 2014 6:39 AM, "Bilal Al Fartakh" <[email protected]> >>>> wrote: >>>> >>>>> I'm using a bolt that receives tuples from another bolt (exclamation >>>>> bolt ) and writes it on a file , the problem I got is that I have >>>>> duplicated results four times , like when I emit a word , I found the word >>>>> Written four times . where's the problem possibly could be ? >>>>> >>>>> >>>>> >>>>> >>>>> public class PrinterBolty extends BaseBasicBolt { >>>>> >>>>> @Override >>>>> public void execute(Tuple tuple, BasicOutputCollector collector) { >>>>> >>>>> try { >>>>> >>>>> BufferedWriter output; >>>>> output = new BufferedWriter(new >>>>> FileWriter("/root/src/storm-starter/hh.txt", true)); >>>>> output.newLine(); >>>>> output.append(tuple.getString(0)); >>>>> output.close(); >>>>> >>>>> } catch (IOException e) { >>>>> // TODO Auto-generated catch block >>>>> e.printStackTrace(); >>>>> } >>>>> } >>>>> >>>>> @Override >>>>> public void declareOutputFields(OutputFieldsDeclarer ofd) { >>>>> } >>>>> >>>>> } >>>>> >>>>> >>>>> -- >>>>> *Al Fartakh Bilal* >>>>> >>>> >>> >>> >>> -- >>> *Al Fartakh Bilal* >>> >> >> -- *Al Fartakh Bilal*
