Basically something like this:

open(...) {
  // reader must be a member variable, not a local one,
  // so that nextTuple() can see it
  reader = new BufferedReader(...);
}

nextTuple() {
  if(reader != null) {
    String line = reader.readLine();
    if(line != null) {
      collector.emit(new Values(line));
    } else {
      reader.close();
      reader = null;
    }
  }
}

There is no need to loop within nextTuple(), because Storm calls it in a
loop anyway. Furthermore, there is no need to sleep, because Storm
automatically applies a sleep penalty if .emit(...) is not called
within nextTuple() (see
https://storm.apache.org/2012/09/06/storm081-released.html)

You should also close the reader as soon as the file is processed. Closing
it in Spout.close() is not reliable, because Storm does not guarantee that
Spout.close() will be called.
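To make the pattern concrete, here is a minimal, Storm-free sketch of the same idea: the reader is created once, each nextTuple() call emits at most one line, blank lines are skipped, and the reader is closed eagerly at end of file. The class name and the List-based stand-in for SpoutOutputCollector are illustrative only, not Storm API:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

public class OneLinePerCallSpout {
    private BufferedReader reader;
    // stand-in for SpoutOutputCollector
    private final List<String> collector = new ArrayList<>();

    // analogous to Spout.open(): create the reader once, as a member variable
    public void open(Reader source) {
        reader = new BufferedReader(source);
    }

    // analogous to Spout.nextTuple(): emit at most one line per call
    public void nextTuple() {
        if (reader == null) {
            return; // file fully processed; Storm applies its own sleep penalty
        }
        try {
            String line = reader.readLine();
            if (line == null) {
                reader.close(); // close eagerly; do not rely on Spout.close()
                reader = null;
            } else if (!line.isEmpty()) { // skip blank lines, keep reading next call
                collector.add(line);
            }
        } catch (IOException e) {
            throw new RuntimeException("Error reading line", e);
        }
    }

    public List<String> emitted() {
        return collector;
    }

    public static void main(String[] args) {
        OneLinePerCallSpout spout = new OneLinePerCallSpout();
        spout.open(new StringReader("a,1\n\nb,2\n"));
        for (int i = 0; i < 10; i++) {
            spout.nextTuple(); // Storm would call this in a loop
        }
        System.out.println(spout.emitted()); // prints [a,1, b,2]
    }
}
```

Note that extra calls after end of file are harmless: the null check makes them no-ops, which matches how Storm keeps invoking nextTuple() forever.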


-Matthias


On 06/05/2015 02:04 PM, Michail Toutoudakis wrote:
> Sorry for my stupid question, but what do you mean by "return from each emit"?
> Should I add return; after emitting a tuple, or is it something else I haven't understood?
> Could you post a code sample?
> 
>> On 5 Ιουν 2015, at 14:44, Matthias J. Sax
>> <[email protected] <mailto:[email protected]>>
>> wrote:
>>
>> Hi,
>>
>> I don't see a big difference between the two Spout implementations... They
>> look basically the same... I would still recommend returning from
>> nextTuple() after each emit.
>>
>> If you don't want to return each time, another reason for missing values
>> might be the return statement in the while-loop:
>>
>>>                if (str.isEmpty()) {
>>>                    return;
>>>                }
>>
>> This should be "continue" -- otherwise you return from nextTuple()
>> without processing the whole file, if an empty line is in between.
>>
>>
>> -Matthias
>>
>> On 06/05/2015 11:16 AM, Michail Toutoudakis wrote:
>>> Sorry, wrong code posted; that was the bolt's code. The spout's code is:
>>>
>>> package tuc.LSH.storm.spouts;
>>>
>>> import backtype.storm.spout.SpoutOutputCollector;
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.IRichSpout;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseRichSpout;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Values;
>>> import backtype.storm.utils.Utils;
>>> import tuc.LSH.conf.Consts;
>>>
>>> import java.io.BufferedReader;
>>> import java.io.FileNotFoundException;
>>> import java.io.FileReader;
>>> import java.io.IOException;
>>> import java.util.Map;
>>>
>>> /**
>>> * Created by mixtou on 15/5/15.
>>> */
>>> //public class FileReaderSpout extends BaseRichSpout {
>>> public class FileReaderSpout implements IRichSpout {
>>>
>>>    private SpoutOutputCollector collector;
>>>    private FileReader fileReader;
>>>    private boolean completed;
>>>    private TopologyContext context;
>>>    private int spout_idx;
>>>    private int spout_id;
>>>    private Map config;
>>>    private int noOfFailedWords;
>>>    private int noOfAckedWords;
>>> //    private BufferedReader reader;
>>>
>>>
>>>    @Override
>>>    public void declareOutputFields(OutputFieldsDeclarer
>>> outputFieldsDeclarer) {
>>>        outputFieldsDeclarer.declareStream("data", new
>>> Fields("streamId", "timestamp", "value"));
>>>
>>>
>>>    }
>>>
>>>    @Override
>>>    public void open(Map config, TopologyContext topologyContext,
>>> SpoutOutputCollector spoutOutputCollector) {
>>>        this.context = topologyContext;
>>>        this.spout_idx = context.getThisTaskIndex();
>>>        this.spout_id = context.getThisTaskId();
>>>        this.collector = spoutOutputCollector;
>>>        this.config = config;
>>>        this.completed = false;
>>>        this.noOfFailedWords = 0;
>>>        this.noOfAckedWords = 0;
>>>
>>>        try {
>>>            System.err.println("Reading File: " +
>>> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>>>            this.fileReader = new
>>> FileReader(config.get(file_to_read()).toString());
>>> //            this.reader = new BufferedReader(fileReader);
>>>
>>>        } catch (FileNotFoundException e) {
>>>            throw new RuntimeException("Error reading file [" +
>>> config.get(file_to_read()) + "]");
>>>        }
>>>
>>>    }
>>>
>>>    @Override
>>>    public void nextTuple() {
>>>
>>>        if (this.completed) {
>>>            try {
>>>                Thread.sleep(1);
>>>            } catch (InterruptedException e) {
>>>                e.printStackTrace();
>>>            }
>>>        }
>>>
>>>        try {
>>>
>>>            String str;
>>>            BufferedReader reader = new BufferedReader(fileReader);
>>>
>>>            while ((str = reader.readLine()) != null) {
>>>
>>>                if (str.isEmpty()) {
>>>                    return;
>>>                }
>>>                String[] temp = str.split(",");
>>> //                    System.err.println("============== "+temp[0] +
>>> " + " + temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
>>>                collector.emit("data", new Values(temp[0], temp[2],
>>> temp[3]), temp[0]); // emit the data to the next bolt; the message id
>>> (temp[0]) anchors the tuple for guaranteed delivery
>>>
>>>            }
>>>
>>>        } catch (Exception e) {
>>> //            System.err.println("Error Reading Tuple: " + e);
>>>            throw new RuntimeException("Error Reading Tuple ", e);
>>>        } finally {
>>>            System.err.println("Finished Reading File. Message From
>>> Spout: " + spout_idx);
>>>            this.completed = true;
>>>
>>>        }
>>>
>>>    }
>>>
>>>    private String file_to_read() {
>>> //        this.spout_id = context.getThisTaskId();
>>>        if (Consts.NO_OF_SPOUTS > 1) {
>>>            int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>>>            return "data" + file_no;
>>>        } else {
>>>            return "data";
>>>        }
>>>    }
>>>
>>>    @Override
>>>    public void ack(Object msgId) {
>>> //        super.ack(msgId);
>>>        noOfAckedWords++;
>>> //        System.out.println("OK tuple acked from bolt: " + msgId+"
>>> no of acked word "+noOfAckedWords);
>>>    }
>>>
>>>    @Override
>>>    public void fail(Object msgId) {
>>> //        super.fail(msgId);
>>>        noOfFailedWords++;
>>>        System.err.println("ERROR: " + context.getThisComponentId() +
>>> " " + msgId + " no of words failed " + noOfFailedWords);
>>>
>>>    }
>>>
>>>    @Override
>>>    public Map<String, Object> getComponentConfiguration() {
>>>        return null;
>>>    }
>>>
>>>    @Override
>>>    public void close() {
>>>        try {
>>>            fileReader.close();
>>>        } catch (IOException e) {
>>>            e.printStackTrace();
>>>        }
>>>
>>>    }
>>>
>>>    @Override
>>>    public void activate() {
>>>
>>>    }
>>>
>>>    @Override
>>>    public void deactivate() {
>>>
>>>    }
>>> }
>>>
>>>
>>>
>>>
>>>> On 5 Ιουν 2015, at 12:12, Michail Toutoudakis <[email protected]
>>>> <mailto:[email protected]>
>>>> <mailto:[email protected]>> wrote:
>>>>
>>>> I tried your solution but nothing happened. I still lose
>>>> many values. However, I found a file reader spout implementation that
>>>> worked without losing a single value. Below is the code for anyone
>>>> who might have the same problem:
>>>>
>>>> package tuc.LSH.storm.bolts;
>>>>
>>>> import backtype.storm.task.OutputCollector;
>>>> import backtype.storm.task.TopologyContext;
>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>> import backtype.storm.topology.base.BaseRichBolt;
>>>> import backtype.storm.tuple.Fields;
>>>> import backtype.storm.tuple.Tuple;
>>>> import backtype.storm.tuple.Values;
>>>> import sun.jvm.hotspot.runtime.*;
>>>> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
>>>> import tuc.LSH.core.timeseries.UniversalBasicWindow;
>>>>
>>>>
>>>> import java.lang.Thread;
>>>> import java.util.*;
>>>>
>>>> /**
>>>> * Created by mixtou on 17/5/15.
>>>> */
>>>> public class LSHBolt extends BaseRichBolt {
>>>>    private int task_id;
>>>>    private OutputCollector collector;
>>>>    private UniversalBasicWindow universalBasicWindow;
>>>>
>>>>    private String streamId;
>>>>    private String time;
>>>>    private Float value;
>>>>
>>>>    @Override
>>>>    public void prepare(Map conf, TopologyContext topologyContext,
>>>> OutputCollector outputCollector) {
>>>>        this.task_id = topologyContext.getThisTaskIndex();
>>>>        this.collector = outputCollector;
>>>>        this.universalBasicWindow = new UniversalBasicWindow();
>>>>        streamId = null;
>>>>        time = null;
>>>>        value = 0f;
>>>>        System.err.println("New Bolt with id: " + task_id);
>>>>
>>>>    }
>>>>
>>>>    @Override
>>>>    public void execute(Tuple tuple) {
>>>>
>>>>        if (tuple.getSourceStreamId().equals("sync")) {
>>>>            System.out.println("Bolt task id " + task_id + " received
>>>> from " + tuple.getSourceComponent() + " message " + tuple.getString(0));
>>>>            System.out.println("Normalizing: Basic Window of Bolt " +
>>>> task_id);
>>>>            universalBasicWindow.normalize(); //fill the rest of the
>>>> streams with last received value to make them same size
>>>>            universalBasicWindow = null;
>>>>            universalBasicWindow = new UniversalBasicWindow();
>>>>
>>>>        }
>>>>
>>>>        if (tuple.getSourceStreamId().equals("data")) {
>>>>
>>>>            streamId = tuple.getStringByField("streamId");
>>>>            time = tuple.getStringByField("timestamp");
>>>>            value = Float.parseFloat(tuple.getStringByField("value"));
>>>>
>>>>            universalBasicWindow.pushStream(streamId, value);
>>>>
>>>>            if (universalBasicWindow.isFull(task_id)) { //check if
>>>> any stream of the window is full
>>>>
>>>> //                System.out.println("Univ. Basic Window of bolt " +
>>>> task_id + " is Filled Up");
>>>>
>>>>                collector.emit("bwFilled", new Values(task_id));
>>>>
>>>> //                universalBasicWindow.normalize();
>>>> //                universalBasicWindow = new UniversalBasicWindow();
>>>>
>>>> //                TODO:: Add basic window to sliding window and clear
>>>>
>>>>            }
>>>>
>>>>        }
>>>>
>>>>
>>>> //        System.err.println("SourceComponent:
>>>> "+tuple.getSourceComponent());
>>>> //        System.err.println("SourceStreamId:
>>>> "+tuple.getSourceStreamId());
>>>> //        System.err.println("Source Task: "+tuple.getSourceTask());
>>>> //        System.err.println("SourceGlobalStreamId:
>>>> "+tuple.getSourceGlobalStreamid());
>>>> //        System.err.println("MessageId: "+tuple.getMessageId());
>>>>
>>>>        collector.ack(tuple);
>>>>    }
>>>>
>>>>    @Override
>>>>    public void declareOutputFields(OutputFieldsDeclarer
>>>> outputFieldsDeclarer) {
>>>>        outputFieldsDeclarer.declareStream("bwFilled", new
>>>> Fields("task_id"));
>>>>
>>>>    }
>>>> }
>>>>
>>>>> On 4 Ιουν 2015, at 20:29, Matthias J. Sax
>>>>> <[email protected] <mailto:[email protected]>
>>>>> <mailto:[email protected]>> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> You are looping within "nextTuple()" to emit a tuple for each line of
>>>>> the whole file. This is bad practice because the spout is prevented
>>>>> from taking acks while "nextTuple()" is executing. I guess that is the
>>>>> reason why your tuples time out and fail.
>>>>>
>>>>> You should return from "nextTuple()" after emitting a single record.
>>>>> This should solve your problem, as the Spout can process incoming acks
>>>>> between two calls to "nextTuple()". Just instantiate your BufferedReader
>>>>> in "open()" and close it if "this.completed" is true.
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 06/04/2015 07:01 PM, Michail Toutoudakis wrote:
>>>>>> I am using a spout to read data from a text file and send it to a
>>>>>> bolt. However, I am losing too many values; they fail.
>>>>>> I am a newbie to Storm and I don't know where to look to debug this
>>>>>> issue. Any ideas??
>>>>>>
>>>>>> My Storm Spout code is:
>>>>>>
>>>>>> package tuc.LSH.storm.spouts;
>>>>>>
>>>>>> import backtype.storm.spout.SpoutOutputCollector;
>>>>>> import backtype.storm.task.TopologyContext;
>>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>>> import backtype.storm.topology.base.BaseRichSpout;
>>>>>> import backtype.storm.tuple.Fields;
>>>>>> import backtype.storm.tuple.Values;
>>>>>> import backtype.storm.utils.Utils;
>>>>>> import tuc.LSH.conf.Consts;
>>>>>>
>>>>>> import java.io.BufferedReader;
>>>>>> import java.io.FileNotFoundException;
>>>>>> import java.io.FileReader;
>>>>>> import java.util.Map;
>>>>>>
>>>>>> /**
>>>>>> * Created by mixtou on 15/5/15.
>>>>>> */
>>>>>> public class FileReaderSpout extends BaseRichSpout {
>>>>>>
>>>>>>   private SpoutOutputCollector collector;
>>>>>>   private FileReader fileReader;
>>>>>>   private boolean completed;
>>>>>>   private TopologyContext context;
>>>>>>   private int spout_idx;
>>>>>>   private int spout_id;
>>>>>>   private Map config;
>>>>>>   private int noOfFailedWords;
>>>>>>   private int noOfAckedWords;
>>>>>>
>>>>>>
>>>>>>   @Override
>>>>>>   public void declareOutputFields(OutputFieldsDeclarer
>>>>>> outputFieldsDeclarer) {
>>>>>>       outputFieldsDeclarer.declareStream("data",
>>>>>> new Fields("streamId", "timestamp", "value"));
>>>>>>
>>>>>>
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public void open(Map config, TopologyContext
>>>>>> topologyContext, SpoutOutputCollector spoutOutputCollector) {
>>>>>>       this.context = topologyContext;
>>>>>>       this.spout_idx = context.getThisTaskIndex();
>>>>>>       this.spout_id = context.getThisTaskId();
>>>>>>       this.collector = spoutOutputCollector;
>>>>>>       this.config = config;
>>>>>>       this.completed = false;
>>>>>>       this.noOfFailedWords = 0;
>>>>>>       this.noOfAckedWords = 0;
>>>>>>
>>>>>>       try {
>>>>>>           System.err.println("Reading File: " +
>>>>>> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>>>>>>
>>>>>> this.fileReader = new
>>>>>> FileReader(config.get(file_to_read()).toString());
>>>>>>       } catch (FileNotFoundException e) {
>>>>>>           throw new RuntimeException("Error reading file [" +
>>>>>> config.get(file_to_read()) + "]");
>>>>>>       }
>>>>>>
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public void nextTuple() {
>>>>>>
>>>>>>       if (this.completed) {
>>>>>>           return;
>>>>>>       }
>>>>>>
>>>>>>       try {
>>>>>>
>>>>>>           String str;
>>>>>>           BufferedReader reader = new BufferedReader(fileReader);
>>>>>>
>>>>>>           while ((str = reader.readLine()) != null) {
>>>>>>
>>>>>>               if (str.isEmpty()) {
>>>>>>                   return;
>>>>>>               }
>>>>>>               String[] temp = str.split(",");
>>>>>> //                    System.err.println("============== "+temp[0]
>>>>>> + " +
>>>>>> " + temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
>>>>>>
>>>>>>               collector.emit("data",
>>>>>> new Values(temp[0], temp[2], temp[3]), temp[0]); // emit the data to
>>>>>> the next bolt with guaranteed delivery
>>>>>>
>>>>>>           }
>>>>>>
>>>>>>       } catch (Exception e) {
>>>>>>           System.err.println("Error Reading Tuple: " + e);
>>>>>>           throw new RuntimeException("Error Reading Tuple ", e);
>>>>>>       } finally {
>>>>>>           System.err.println("Finished Reading File. Message From
>>>>>> Spout: " + spout_idx);
>>>>>>           this.completed = true;
>>>>>>
>>>>>>       }
>>>>>>
>>>>>>       Utils.sleep(100);
>>>>>>   }
>>>>>>
>>>>>>   private String file_to_read() {
>>>>>> //        this.spout_id = context.getThisTaskId();
>>>>>>       if (Consts.NO_OF_SPOUTS > 1) {
>>>>>>           int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>>>>>>           return "data" + file_no;
>>>>>>       } else {
>>>>>>           return "data";
>>>>>>       }
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public void ack(Object msgId) {
>>>>>>       super.ack(msgId);
>>>>>>       noOfAckedWords++;
>>>>>> //        System.out.println("OK tuple acked from bolt: " + msgId+" no
>>>>>> of acked word "+noOfAckedWords);
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public void fail(Object msgId) {
>>>>>>       super.fail(msgId);
>>>>>>       noOfFailedWords++;
>>>>>>       System.err.println("ERROR: " + context.getThisComponentId() + "
>>>>>> " + msgId + " no of words failed " + noOfFailedWords);
>>>>>>
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> And my bolt looks like this:
>>>>>>
>>>>>> package tuc.LSH.storm.bolts;
>>>>>>
>>>>>> import backtype.storm.task.OutputCollector;
>>>>>> import backtype.storm.task.TopologyContext;
>>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>>> import backtype.storm.topology.base.BaseRichBolt;
>>>>>> import backtype.storm.tuple.Fields;
>>>>>> import backtype.storm.tuple.Tuple;
>>>>>> import backtype.storm.tuple.Values;
>>>>>> import sun.jvm.hotspot.runtime.*;
>>>>>> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
>>>>>> import tuc.LSH.core.timeseries.UniversalBasicWindow;
>>>>>>
>>>>>>
>>>>>> import java.lang.Thread;
>>>>>> import java.util.*;
>>>>>>
>>>>>> /**
>>>>>> * Created by mixtou on 17/5/15.
>>>>>> */
>>>>>> public class LSHBolt extends BaseRichBolt {
>>>>>>   private int task_id;
>>>>>>   private OutputCollector collector;
>>>>>>   private UniversalBasicWindow universalBasicWindow;
>>>>>>
>>>>>>   private String streamId;
>>>>>>   private String time;
>>>>>>   private Float value;
>>>>>>
>>>>>>   @Override
>>>>>>   public void prepare(Map conf, TopologyContext topologyContext,
>>>>>> OutputCollector outputCollector) {
>>>>>>       this.task_id = topologyContext.getThisTaskIndex();
>>>>>>       this.collector = outputCollector;
>>>>>>       this.universalBasicWindow = new UniversalBasicWindow();
>>>>>>       streamId = null;
>>>>>>       time = null;
>>>>>>       value = 0f;
>>>>>>       System.err.println("New Bolt with id: " + task_id);
>>>>>>
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public void execute(Tuple tuple) {
>>>>>>
>>>>>>       if (tuple.getSourceStreamId().equals("sync")) {
>>>>>>           System.out.println("Bolt task id " + task_id + " received
>>>>>> from " + tuple.getSourceComponent() + " message " +
>>>>>> tuple.getString(0));
>>>>>>           System.out.println("Normalizing: Basic Window of Bolt " +
>>>>>> task_id);
>>>>>>           universalBasicWindow.normalize(); //fill the rest of the
>>>>>> streams with last received value to make them same size
>>>>>>           universalBasicWindow = null;
>>>>>>           universalBasicWindow = new UniversalBasicWindow();
>>>>>>
>>>>>>           collector.ack(tuple);
>>>>>>       }
>>>>>>
>>>>>>       if (tuple.getSourceStreamId().equals("data")) {
>>>>>>
>>>>>>           streamId = tuple.getStringByField("streamId");
>>>>>>           time = tuple.getStringByField("timestamp");
>>>>>>           value = Float.parseFloat(tuple.getStringByField("value"));
>>>>>>
>>>>>>           universalBasicWindow.pushStream(streamId, value);
>>>>>>
>>>>>>           collector.ack(tuple);
>>>>>>
>>>>>>           if (universalBasicWindow.isFull(task_id)) { //check if
>>>>>> any stream of the window is full
>>>>>>
>>>>>> //                System.out.println("Univ. Basic Window of bolt " +
>>>>>> task_id + " is Filled Up");
>>>>>>
>>>>>>               collector.emit("bwFilled", new Values(task_id));
>>>>>>
>>>>>> //                universalBasicWindow.normalize();
>>>>>> //                universalBasicWindow = new UniversalBasicWindow();
>>>>>>
>>>>>> //                TODO:: Add basic window to sliding window and clear
>>>>>>
>>>>>>           }
>>>>>>
>>>>>>       }
>>>>>>
>>>>>>
>>>>>> //        collector.ack(tuple);
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public void declareOutputFields(OutputFieldsDeclarer
>>>>>> outputFieldsDeclarer) {
>>>>>>       outputFieldsDeclarer.declareStream("bwFilled", new
>>>>>> Fields("task_id"));
>>>>>>
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> Any Ideas??
> 
