Can you share the contents of your input and output files?
On May 6, 2014 7:54 AM, "Bilal Al Fartakh" <[email protected]> wrote:

> HI ,Nathan and thank you for responding ,I appreciate it  !
> no I'm not , I just run this topology for the first time
>
>
>
>
>
>
> public class Ex {
>
>   public static class ExclamationBolt extends BaseRichBolt {
>     OutputCollector _collector;
>
>     @Override
>     public void prepare(Map conf, TopologyContext context, OutputCollector
> collector) {
>       _collector = collector;
>     }
>
>     @Override
>     public void execute(Tuple tuple) {
>       _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
>       _collector.ack(tuple);
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>       declarer.declare(new Fields("word"));
>     }
>
>
>   }
>
>   public static void main(String[] args) throws Exception {
>     TopologyBuilder builder = new TopologyBuilder();
> TryRead T = new TryRead();
> PrinterBolty P = new PrinterBolty();
>     builder.setSpout("word", T, 10);
>     builder.setBolt("exclaim1", new ExclamationBolt(),
> 3).shuffleGrouping("word");
>     builder.setBolt("exclaim2", P , 2).shuffleGrouping("exclaim1");
>
>
>
> -------------------------------------------------------------------------------------
> my spout Tryread
>
>
> public class TryRead extends BaseRichSpout {
>           SpoutOutputCollector _collector;
>           Random _rand;
>           BufferedReader fileReader;
>           FileSystem f;
>           WatchService watcher ;
>           String S;
>           Path dir;
>
>           @Override
>           public void open(Map conf, TopologyContext context,
> SpoutOutputCollector collector) {
>                   _collector = collector;
>
>
> try{
>                         f = FileSystems.getDefault();
>                                  watcher = f.newWatchService();
>
>  S="/root/src/storm-starter/src/jvm/storm/starter";
>
>                                  dir = f.getPath(S);
>
>                                 dir.register(watcher, ENTRY_CREATE);
>
> } catch (IOException e) {
>
>                                                 e.printStackTrace();
>                                         }
>
>           }
>  }
>
>           @Override
>           public void nextTuple() {
>
>
>                   Utils.sleep(2000);
>                    for (;;) {
>
>                     // wait for key to be signaled
>                     WatchKey key;
>                     try {
>                         key = watcher.take();
>                     } catch (InterruptedException x) {
>                         return;
>                     }
>
>                     for (WatchEvent<?> event: key.pollEvents()) {
>                         WatchEvent.Kind kind = event.kind();
>  if (kind == OVERFLOW) {
>                             continue;
>                         }
>
>
>                         WatchEvent<Path> ev = (WatchEvent<Path>)event;
>                         Path filename = ev.context();
>
>                         System.out.format("Emailing file %s%n", filename);
>                         try {
>
>
>                                                  fileReader = new
> BufferedReader(new FileReader(new File(S+"/"+filename)));
>
>                                                 RandomAccessFile access =
> null;
>                             String line = null;
>                                try
>                                {
>                                    while ((line = fileReader.readLine())
> != null)
>                                    {
>                                        if (line !=null)
>                                        {
>
>                                            _collector.emit(new
> Values(line));
>                                        }
>                                    }
>                                } catch (IOException e) {
>                                                 // TODO Auto-generated
> catch block
>                                                 e.printStackTrace();
>                                         }
>                                         } catch (FileNotFoundException e) {
>                                                 // TODO Auto-generated
> catch block
>                                                 e.printStackTrace();
>                                         }
>                     }
>    }
>
>                     boolean valid = key.reset();
>                     if (!valid) {
>                             break;
>                     }
>                 }
>
>
>           }
>
>           @Override
>           public void ack(Object id) {
>           }
>
>           @Override
>           public void fail(Object id) {
>           }
>
>           @Override
>           public void declareOutputFields(OutputFieldsDeclarer declarer) {
>             declarer.declare(new Fields("word"));
>           }
>
>
>
> 2014-05-06 12:35 GMT+01:00 Nathan Leung <[email protected]>:
>
>> You are creating your file writer with append set to true. It's it
>> possible your topology was run more than once?
>> On May 6, 2014 6:39 AM, "Bilal Al Fartakh" <[email protected]>
>> wrote:
>>
>>> I'm using a bolt that receives tuples from another bolt (exclamation
>>> bolt ) and writes it on a file , the problem I got is that I have
>>> duplicated results four times , like when I emit a word , I found the word
>>> Written four times . where's the problem possibly could be ?
>>>
>>>
>>>
>>>
>>> public class PrinterBolty extends BaseBasicBolt {
>>>
>>>   @Override
>>>   public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>
>>> try {
>>>
>>>                         BufferedWriter output;
>>>                         output = new BufferedWriter(new
>>> FileWriter("/root/src/storm-starter/hh.txt", true));
>>>                         output.newLine();
>>>                         output.append(tuple.getString(0));
>>>                         output.close();
>>>
>>>                 } catch (IOException e) {
>>>                         // TODO Auto-generated catch block
>>>                         e.printStackTrace();
>>>                 }
>>>  }
>>>
>>>   @Override
>>>   public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>   }
>>>
>>> }
>>>
>>>
>>> --
>>> *Al Fartakh Bilal*
>>>
>>
>
>
> --
> *Al Fartakh Bilal*
>

Reply via email to