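Hi,

The issue lies with the number of tasks/executors specified for the spout. Your topology sets the spout parallelism to 10 (builder.setSpout("word", T, 10)), and every TryRead instance registers its own watcher on the same directory and reads the same files itself, so a line can be emitted once per spout instance that sees the file. Try specifying 1 spout and check whether you still see duplicates. I expect there will be no duplicates with 1 spout and 1 exclamation bolt.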
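To illustrate why the spout parallelism matters, here is a minimal sketch that uses no Storm classes at all. DuplicateEmitDemo and emitAll are hypothetical names made up for this example, and the instance count of 4 is picked only to mirror the four copies you reported; I cannot confirm from here how many of your spout executors actually saw the file.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the topology: no Storm classes are used here.
class DuplicateEmitDemo {

    // Simulates `spoutInstances` independent spout executors that each read
    // the same file contents and emit every line they read.
    static List<String> emitAll(int spoutInstances, List<String> fileLines) {
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < spoutInstances; i++) {
            // Each instance reads the whole file itself; nothing divides
            // the lines between the instances.
            emitted.addAll(fileLines);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello", "world");
        System.out.println("1 instance:  " + emitAll(1, lines).size() + " tuples");
        System.out.println("4 instances: " + emitAll(4, lines).size() + " tuples");
    }
}
```

In your topology the equivalent change would be the parallelism hint, i.e. builder.setSpout("word", T, 1) instead of 10.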
On Tue, May 6, 2014 at 5:23 PM, Bilal Al Fartakh <[email protected]> wrote:

> Hi Nathan, and thank you for responding, I appreciate it!
> No, I'm not; I just ran this topology for the first time.
>
> public class Ex {
>
>     public static class ExclamationBolt extends BaseRichBolt {
>         OutputCollector _collector;
>
>         @Override
>         public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
>             _collector = collector;
>         }
>
>         @Override
>         public void execute(Tuple tuple) {
>             _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
>             _collector.ack(tuple);
>         }
>
>         @Override
>         public void declareOutputFields(OutputFieldsDeclarer declarer) {
>             declarer.declare(new Fields("word"));
>         }
>
>     }
>
>     public static void main(String[] args) throws Exception {
>         TopologyBuilder builder = new TopologyBuilder();
>         TryRead T = new TryRead();
>         PrinterBolty P = new PrinterBolty();
>         builder.setSpout("word", T, 10);
>         builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
>         builder.setBolt("exclaim2", P , 2).shuffleGrouping("exclaim1");
>
> -------------------------------------------------------------------------------------
> My spout TryRead:
>
> public class TryRead extends BaseRichSpout {
>     SpoutOutputCollector _collector;
>     Random _rand;
>     BufferedReader fileReader;
>     FileSystem f;
>     WatchService watcher ;
>     String S;
>     Path dir;
>
>     @Override
>     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
>         _collector = collector;
>
>         try{
>             f = FileSystems.getDefault();
>             watcher = f.newWatchService();
>
>             S="/root/src/storm-starter/src/jvm/storm/starter";
>
>             dir = f.getPath(S);
>
>             dir.register(watcher, ENTRY_CREATE);
>
>         } catch (IOException e) {
>
>             e.printStackTrace();
>         }
>
>     }
>     }
>
>     @Override
>     public void nextTuple() {
>
>         Utils.sleep(2000);
>         for (;;) {
>
>             // wait for key to be signaled
>             WatchKey key;
>             try {
>                 key = watcher.take();
>             } catch (InterruptedException x) {
>                 return;
>             }
>
>             for (WatchEvent<?> event: key.pollEvents()) {
>                 WatchEvent.Kind kind = event.kind();
>                 if (kind == OVERFLOW) {
>                     continue;
>                 }
>
>                 WatchEvent<Path> ev = (WatchEvent<Path>)event;
>                 Path filename = ev.context();
>
>                 System.out.format("Emailing file %s%n", filename);
>                 try {
>
>                     fileReader = new BufferedReader(new FileReader(new File(S+"/"+filename)));
>
>                     RandomAccessFile access = null;
>                     String line = null;
>                     try
>                     {
>                         while ((line = fileReader.readLine()) != null)
>                         {
>                             if (line !=null)
>                             {
>
>                                 _collector.emit(new Values(line));
>                             }
>                         }
>                     } catch (IOException e) {
>                         // TODO Auto-generated catch block
>                         e.printStackTrace();
>                     }
>                 } catch (FileNotFoundException e) {
>                     // TODO Auto-generated catch block
>                     e.printStackTrace();
>                 }
>             }
>         }
>
>         boolean valid = key.reset();
>         if (!valid) {
>             break;
>         }
>     }
>
>
> }
>
>     @Override
>     public void ack(Object id) {
>     }
>
>     @Override
>     public void fail(Object id) {
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields("word"));
>     }
>
>
> 2014-05-06 12:35 GMT+01:00 Nathan Leung <[email protected]>:
>
>> You are creating your file writer with append set to true. Is it
>> possible your topology was run more than once?
>> On May 6, 2014 6:39 AM, "Bilal Al Fartakh" <[email protected]> wrote:
>>
>>> I'm using a bolt that receives tuples from another bolt (the
>>> exclamation bolt) and writes them to a file. The problem is that I get
>>> each result duplicated four times: when I emit a word, I find the word
>>> written four times. Where could the problem be?
>>>
>>> public class PrinterBolty extends BaseBasicBolt {
>>>
>>>     @Override
>>>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>
>>>         try {
>>>
>>>             BufferedWriter output;
>>>             output = new BufferedWriter(new FileWriter("/root/src/storm-starter/hh.txt", true));
>>>             output.newLine();
>>>             output.append(tuple.getString(0));
>>>             output.close();
>>>
>>>         } catch (IOException e) {
>>>             // TODO Auto-generated catch block
>>>             e.printStackTrace();
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>     }
>>>
>>> }
>>>
>>> --
>>> *Al Fartakh Bilal*
>>>
>
> --
> *Al Fartakh Bilal*
