Can you share the contents of your input and output files?
On May 6, 2014 7:54 AM, "Bilal Al Fartakh" <[email protected]> wrote:
> HI ,Nathan and thank you for responding ,I appreciate it !
> no I'm not , I just run this topology for the first time
>
>
>
>
>
>
> public class Ex {
>
> public static class ExclamationBolt extends BaseRichBolt {
> OutputCollector _collector;
>
> @Override
> public void prepare(Map conf, TopologyContext context, OutputCollector
> collector) {
> _collector = collector;
> }
>
> @Override
> public void execute(Tuple tuple) {
> _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
> _collector.ack(tuple);
> }
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(new Fields("word"));
> }
>
>
> }
>
> public static void main(String[] args) throws Exception {
> TopologyBuilder builder = new TopologyBuilder();
> TryRead T = new TryRead();
> PrinterBolty P = new PrinterBolty();
> builder.setSpout("word", T, 10);
> builder.setBolt("exclaim1", new ExclamationBolt(),
> 3).shuffleGrouping("word");
> builder.setBolt("exclaim2", P , 2).shuffleGrouping("exclaim1");
>
>
>
> -------------------------------------------------------------------------------------
> my spout Tryread
>
>
> public class TryRead extends BaseRichSpout {
> SpoutOutputCollector _collector;
> Random _rand;
> BufferedReader fileReader;
> FileSystem f;
> WatchService watcher ;
> String S;
> Path dir;
>
> @Override
> public void open(Map conf, TopologyContext context,
> SpoutOutputCollector collector) {
> _collector = collector;
>
>
> try{
> f = FileSystems.getDefault();
> watcher = f.newWatchService();
>
> S="/root/src/storm-starter/src/jvm/storm/starter";
>
> dir = f.getPath(S);
>
> dir.register(watcher, ENTRY_CREATE);
>
> } catch (IOException e) {
>
> e.printStackTrace();
> }
>
> }
> }
>
> @Override
> public void nextTuple() {
>
>
> Utils.sleep(2000);
> for (;;) {
>
> // wait for key to be signaled
> WatchKey key;
> try {
> key = watcher.take();
> } catch (InterruptedException x) {
> return;
> }
>
> for (WatchEvent<?> event: key.pollEvents()) {
> WatchEvent.Kind kind = event.kind();
> if (kind == OVERFLOW) {
> continue;
> }
>
>
> WatchEvent<Path> ev = (WatchEvent<Path>)event;
> Path filename = ev.context();
>
> System.out.format("Emailing file %s%n", filename);
> try {
>
>
> fileReader = new
> BufferedReader(new FileReader(new File(S+"/"+filename)));
>
> RandomAccessFile access =
> null;
> String line = null;
> try
> {
> while ((line = fileReader.readLine())
> != null)
> {
> if (line !=null)
> {
>
> _collector.emit(new
> Values(line));
> }
> }
> } catch (IOException e) {
> // TODO Auto-generated
> catch block
> e.printStackTrace();
> }
> } catch (FileNotFoundException e) {
> // TODO Auto-generated
> catch block
> e.printStackTrace();
> }
> }
> }
>
> boolean valid = key.reset();
> if (!valid) {
> break;
> }
> }
>
>
> }
>
> @Override
> public void ack(Object id) {
> }
>
> @Override
> public void fail(Object id) {
> }
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(new Fields("word"));
> }
>
>
>
> 2014-05-06 12:35 GMT+01:00 Nathan Leung <[email protected]>:
>
>> You are creating your file writer with append set to true. It's it
>> possible your topology was run more than once?
>> On May 6, 2014 6:39 AM, "Bilal Al Fartakh" <[email protected]>
>> wrote:
>>
>>> I'm using a bolt that receives tuples from another bolt (exclamation
>>> bolt ) and writes it on a file , the problem I got is that I have
>>> duplicated results four times , like when I emit a word , I found the word
>>> Written four times . where's the problem possibly could be ?
>>>
>>>
>>>
>>>
>>> public class PrinterBolty extends BaseBasicBolt {
>>>
>>> @Override
>>> public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>
>>> try {
>>>
>>> BufferedWriter output;
>>> output = new BufferedWriter(new
>>> FileWriter("/root/src/storm-starter/hh.txt", true));
>>> output.newLine();
>>> output.append(tuple.getString(0));
>>> output.close();
>>>
>>> } catch (IOException e) {
>>> // TODO Auto-generated catch block
>>> e.printStackTrace();
>>> }
>>> }
>>>
>>> @Override
>>> public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>> }
>>>
>>> }
>>>
>>>
>>> --
>>> *Al Fartakh Bilal*
>>>
>>
>
>
> --
> *Al Fartakh Bilal*
>