Sorry for my stupid question, but what do you mean by returning from each emit?
Do you mean adding return; after emitting a tuple, or something else I haven’t understood?
Could you post a code sample?

> On 5 Ιουν 2015, at 14:44, Matthias J. Sax <[email protected]> 
> wrote:
> 
> Hi,
> 
> I don't see big difference in both Spout implementations... They look
> basically the same... I would still recommend, to return from
> nextTuple() after each emit.
> 
> If you don't want to return each time, another reason for missing values
> might be the return statement in the while-loop:
> 
>>                if (str.isEmpty()) {
>>                    return;
>>                }
> 
> This should be "continue" -- otherwise you return from nextTuple()
> without processing the whole file, if an empty line is in between.
> 
> 
> -Matthias
> 
> On 06/05/2015 11:16 AM, Michail Toutoudakis wrote:
>> Sorry wrong code posted that was the bolts' code. The spout’ s code is:
>> 
>> package tuc.LSH.storm.spouts;
>> 
>> import backtype.storm.spout.SpoutOutputCollector;
>> import backtype.storm.task.TopologyContext;
>> import backtype.storm.topology.IRichSpout;
>> import backtype.storm.topology.OutputFieldsDeclarer;
>> import backtype.storm.topology.base.BaseRichSpout;
>> import backtype.storm.tuple.Fields;
>> import backtype.storm.tuple.Values;
>> import backtype.storm.utils.Utils;
>> import tuc.LSH.conf.Consts;
>> 
>> import java.io.BufferedReader;
>> import java.io.FileNotFoundException;
>> import java.io.FileReader;
>> import java.io.IOException;
>> import java.util.Map;
>> 
>> /**
>> * Created by mixtou on 15/5/15.
>> */
>> //public class FileReaderSpout extends BaseRichSpout {
>> public class FileReaderSpout implements IRichSpout {
>> 
>>    private SpoutOutputCollector collector;
>>    private FileReader fileReader;
>>    private boolean completed;
>>    private TopologyContext context;
>>    private int spout_idx;
>>    private int spout_id;
>>    private Map config;
>>    private int noOfFailedWords;
>>    private int noOfAckedWords;
>> //    private BufferedReader reader;
>> 
>> 
>>    @Override
>>    public void declareOutputFields(OutputFieldsDeclarer 
>> outputFieldsDeclarer) {
>>        outputFieldsDeclarer.declareStream("data", new Fields("streamId", 
>> "timestamp", "value"));
>> 
>> 
>>    }
>> 
>>    @Override
>>    public void open(Map config, TopologyContext topologyContext, 
>> SpoutOutputCollector spoutOutputCollector) {
>>        this.context = topologyContext;
>>        this.spout_idx = context.getThisTaskIndex();
>>        this.spout_id = context.getThisTaskId();
>>        this.collector = spoutOutputCollector;
>>        this.config = config;
>>        this.completed = false;
>>        this.noOfFailedWords = 0;
>>        this.noOfAckedWords = 0;
>> 
>>        try {
>>            System.err.println("Reading File: " + 
>> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>>            this.fileReader = new 
>> FileReader(config.get(file_to_read()).toString());
>> //            this.reader = new BufferedReader(fileReader);
>> 
>>        } catch (FileNotFoundException e) {
>>            throw new RuntimeException("Error reading file [" + 
>> config.get(file_to_read()) + "]");
>>        }
>> 
>>    }
>> 
>>    @Override
>>    public void nextTuple() {
>> 
>>        if (this.completed) {
>>            try {
>>                Thread.sleep(1);
>>            } catch (InterruptedException e) {
>>                e.printStackTrace();
>>            }
>>        }
>> 
>>        try {
>> 
>>            String str;
>>            BufferedReader reader = new BufferedReader(fileReader);
>> 
>>            while ((str = reader.readLine()) != null) {
>> 
>>                if (str.isEmpty()) {
>>                    return;
>>                }
>>                String[] temp = str.split(",");
>> //                    System.err.println("============== "+temp[0] + " + " + 
>> temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
>>                collector.emit("data", new Values(temp[0], temp[2], temp[3]), 
>> temp[0]); //emmit the correct data to next bolt without guarantee delivery
>> 
>>            }
>> 
>>        } catch (Exception e) {
>> //            System.err.println("Error Reading Tuple: " + e);
>>            throw new RuntimeException("Error Reading Tuple ", e);
>>        } finally {
>>            System.err.println("Finished Reading File. Message From Spout: " 
>> + spout_idx);
>>            this.completed = true;
>> 
>>        }
>> 
>>    }
>> 
>>    private String file_to_read() {
>> //        this.spout_id = context.getThisTaskId();
>>        if (Consts.NO_OF_SPOUTS > 1) {
>>            int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>>            return "data" + file_no;
>>        } else {
>>            return "data";
>>        }
>>    }
>> 
>>    @Override
>>    public void ack(Object msgId) {
>> //        super.ack(msgId);
>>        noOfAckedWords++;
>> //        System.out.println("OK tuple acked from bolt: " + msgId+" no of 
>> acked word "+noOfAckedWords);
>>    }
>> 
>>    @Override
>>    public void fail(Object msgId) {
>> //        super.fail(msgId);
>>        noOfFailedWords++;
>>        System.err.println("ERROR: " + context.getThisComponentId() + " " + 
>> msgId + " no of words failed " + noOfFailedWords);
>> 
>>    }
>> 
>>    @Override
>>    public Map<String, Object> getComponentConfiguration() {
>>        return null;
>>    }
>> 
>>    @Override
>>    public void close() {
>>        try {
>>            fileReader.close();
>>        } catch (IOException e) {
>>            e.printStackTrace();
>>        }
>> 
>>    }
>> 
>>    @Override
>>    public void activate() {
>> 
>>    }
>> 
>>    @Override
>>    public void deactivate() {
>> 
>>    }
>> }
>> 
>> 
>> 
>> 
>>> On 5 Ιουν 2015, at 12:12, Michail Toutoudakis <[email protected]
>>> <mailto:[email protected] <mailto:[email protected]>>> wrote:
>>> 
>>> I tried your solution but nothing happened. I still continue to loose
>>> many values. However i found a file reader spout implementation that
>>> worked without loosing a single value. Bellow is the code for anyone
>>> that might have the same problem:
>>> 
>>> package tuc.LSH.storm.bolts;
>>> 
>>> import backtype.storm.task.OutputCollector;
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseRichBolt;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Tuple;
>>> import backtype.storm.tuple.Values;
>>> import sun.jvm.hotspot.runtime.*;
>>> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
>>> import tuc.LSH.core.timeseries.UniversalBasicWindow;
>>> 
>>> 
>>> import java.lang.Thread;
>>> import java.util.*;
>>> 
>>> /**
>>> * Created by mixtou on 17/5/15.
>>> */
>>> public class LSHBolt extends BaseRichBolt {
>>>    private int task_id;
>>>    private OutputCollector collector;
>>>    private UniversalBasicWindow universalBasicWindow;
>>> 
>>>    private String streamId;
>>>    private String time;
>>>    private Float value;
>>> 
>>>    @Override
>>>    public void prepare(Map conf, TopologyContext topologyContext, 
>>> OutputCollector outputCollector) {
>>>        this.task_id = topologyContext.getThisTaskIndex();
>>>        this.collector = outputCollector;
>>>        this.universalBasicWindow = new UniversalBasicWindow();
>>>        streamId = null;
>>>        time = null;
>>>        value = 0f;
>>>        System.err.println("New Bolt with id: " + task_id);
>>> 
>>>    }
>>> 
>>>    @Override
>>>    public void execute(Tuple tuple) {
>>> 
>>>        if (tuple.getSourceStreamId().equals("sync")) {
>>>            System.out.println("Bolt task id " + task_id + " received from " 
>>> + tuple.getSourceComponent() + " message " + tuple.getString(0));
>>>            System.out.println("Normalizing: Basic Window of Bolt " + 
>>> task_id);
>>>            universalBasicWindow.normalize(); //fill the rest of the streams 
>>> with last received value to make them same size
>>>            universalBasicWindow = null;
>>>            universalBasicWindow = new UniversalBasicWindow();
>>> 
>>>        }
>>> 
>>>        if (tuple.getSourceStreamId().equals("data")) {
>>> 
>>>            streamId = tuple.getStringByField("streamId");
>>>            time = tuple.getStringByField("timestamp");
>>>            value = Float.parseFloat(tuple.getStringByField("value"));
>>> 
>>>            universalBasicWindow.pushStream(streamId, value);
>>> 
>>>            if (universalBasicWindow.isFull(task_id)) { //check if any 
>>> stream of the window is full
>>> 
>>> //                System.out.println("Univ. Basic Window of bolt " + 
>>> task_id + " is Filled Up");
>>> 
>>>                collector.emit("bwFilled", new Values(task_id));
>>> 
>>> //                universalBasicWindow.normalize();
>>> //                universalBasicWindow = new UniversalBasicWindow();
>>> 
>>> //                TODO:: Add basic window to sliding window and clear
>>> 
>>>            }
>>> 
>>>        }
>>> 
>>> 
>>> //        System.err.println("SourceComponent: 
>>> "+tuple.getSourceComponent());
>>> //        System.err.println("SourceStreamId: "+tuple.getSourceStreamId());
>>> //        System.err.println("Source Task: "+tuple.getSourceTask());
>>> //        System.err.println("SourceGlobalStreamId: 
>>> "+tuple.getSourceGlobalStreamid());
>>> //        System.err.println("MessageId: "+tuple.getMessageId());
>>> 
>>>        collector.ack(tuple);
>>>    }
>>> 
>>>    @Override
>>>    public void declareOutputFields(OutputFieldsDeclarer 
>>> outputFieldsDeclarer) {
>>>        outputFieldsDeclarer.declareStream("bwFilled", new 
>>> Fields("task_id"));
>>> 
>>>    }
>>> }
>>> 
>>>> On 4 Ιουν 2015, at 20:29, Matthias J. Sax
>>>> <[email protected] <mailto:[email protected]>
>>>> <mailto:[email protected] 
>>>> <mailto:[email protected]>>> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> You are looping within "nextTuple()" to emit a tuple for each lines for
>>>> the whole file. This is "bad practice" because the spout is prevented to
>>>> take "acks" while "nextTuple()" is executing. I guess, that is the
>>>> reason why your tuples time out and fail.
>>>> 
>>>> You should return from "nextTuple()" after emitting a single record.
>>>> This should solve your problem as the Spout can process incoming acks
>>>> between to calls to "nextTuple()". Just instantiate your BufferedReader
>>>> in "open()" and close it if "this.completed" is true.
>>>> 
>>>> Hope this helps.
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> 
>>>> On 06/04/2015 07:01 PM, Michail Toutoudakis wrote:
>>>>> I am using a bolt to read data from a text file, and send them to a
>>>>> bolt. However i am loosing too many values, the Fail.
>>>>> I am newbie to storm and i don’t know where to look to debug this issue.
>>>>> Any ideas??
>>>>> 
>>>>> My Storm Spout code is:
>>>>> 
>>>>> package tuc.LSH.storm.spouts;
>>>>> 
>>>>> import backtype.storm.spout.SpoutOutputCollector;
>>>>> import backtype.storm.task.TopologyContext;
>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>> import backtype.storm.topology.base.BaseRichSpout;
>>>>> import backtype.storm.tuple.Fields;
>>>>> import backtype.storm.tuple.Values;
>>>>> import backtype.storm.utils.Utils;
>>>>> import tuc.LSH.conf.Consts;
>>>>> 
>>>>> import java.io.BufferedReader;
>>>>> import java.io.FileNotFoundException;
>>>>> import java.io.FileReader;
>>>>> import java.util.Map;
>>>>> 
>>>>> /**
>>>>> * Created by mixtou on 15/5/15.
>>>>> */
>>>>> public class FileReaderSpout extends BaseRichSpout {
>>>>> 
>>>>>   private SpoutOutputCollector collector;
>>>>>   private FileReader fileReader;
>>>>>   private boolean completed;
>>>>>   private TopologyContext context;
>>>>>   private int spout_idx;
>>>>>   private int spout_id;
>>>>>   private Map config;
>>>>>   private int noOfFailedWords;
>>>>>   private int noOfAckedWords;
>>>>> 
>>>>> 
>>>>>   @Override
>>>>>   public void declareOutputFields(OutputFieldsDeclarer
>>>>> outputFieldsDeclarer) {
>>>>>       outputFieldsDeclarer.declareStream("data",
>>>>> new Fields("streamId", "timestamp", "value"));
>>>>> 
>>>>> 
>>>>>   }
>>>>> 
>>>>>   @Override
>>>>>   public void open(Map config, TopologyContext
>>>>> topologyContext, SpoutOutputCollector spoutOutputCollector) {
>>>>>       this.context = topologyContext;
>>>>>       this.spout_idx = context.getThisTaskIndex();
>>>>>       this.spout_id = context.getThisTaskId();
>>>>>       this.collector = spoutOutputCollector;
>>>>>       this.config = config;
>>>>>       this.completed = false;
>>>>>       this.noOfFailedWords = 0;
>>>>>       this.noOfAckedWords = 0;
>>>>> 
>>>>>       try {
>>>>>           System.err.println("Reading File: " +
>>>>> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>>>>> 
>>>>> this.fileReader = new FileReader(config.get(file_to_read()).toString());
>>>>>       } catch (FileNotFoundException e) {
>>>>>           throw new RuntimeException("Error reading file [" +
>>>>> config.get(file_to_read()) + "]");
>>>>>       }
>>>>> 
>>>>>   }
>>>>> 
>>>>>   @Override
>>>>>   public void nextTuple() {
>>>>> 
>>>>>       if (this.completed) {
>>>>>           return;
>>>>>       }
>>>>> 
>>>>>       try {
>>>>> 
>>>>>           String str;
>>>>>           BufferedReader reader = new BufferedReader(fileReader);
>>>>> 
>>>>>           while ((str = reader.readLine()) != null) {
>>>>> 
>>>>>               if (str.isEmpty()) {
>>>>>                   return;
>>>>>               }
>>>>>               String[] temp = str.split(",");
>>>>> //                    System.err.println("============== "+temp[0] + " +
>>>>> " + temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
>>>>> 
>>>>>               collector.emit("data",
>>>>> new Values(temp[0], temp[2], temp[3]), temp[0]); //emmit the correct
>>>>> data to next bolt with guarantee delivery
>>>>> 
>>>>>           }
>>>>> 
>>>>>       } catch (Exception e) {
>>>>>           System.err.println("Error Reading Tuple: " + e);
>>>>>           throw new RuntimeException("Error Reading Tuple ", e);
>>>>>       } finally {
>>>>>           System.err.println("Finished Reading File. Message From
>>>>> Spout: " + spout_idx);
>>>>>           this.completed = true;
>>>>> 
>>>>>       }
>>>>> 
>>>>>       Utils.sleep(100);
>>>>>   }
>>>>> 
>>>>>   private String file_to_read() {
>>>>> //        this.spout_id = context.getThisTaskId();
>>>>>       if (Consts.NO_OF_SPOUTS > 1) {
>>>>>           int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>>>>>           return "data" + file_no;
>>>>>       } else {
>>>>>           return "data";
>>>>>       }
>>>>>   }
>>>>> 
>>>>>   @Override
>>>>>   public void ack(Object msgId) {
>>>>>       super.ack(msgId);
>>>>>       noOfAckedWords++;
>>>>> //        System.out.println("OK tuple acked from bolt: " + msgId+" no
>>>>> of acked word "+noOfAckedWords);
>>>>>   }
>>>>> 
>>>>>   @Override
>>>>>   public void fail(Object msgId) {
>>>>>       super.fail(msgId);
>>>>>       noOfFailedWords++;
>>>>>       System.err.println("ERROR: " + context.getThisComponentId() + "
>>>>> " + msgId + " no of words failed " + noOfFailedWords);
>>>>> 
>>>>>   }
>>>>> }
>>>>> 
>>>>> And my bolt looks like this:
>>>>> 
>>>>> package tuc.LSH.storm.bolts;
>>>>> 
>>>>> import backtype.storm.task.OutputCollector;
>>>>> import backtype.storm.task.TopologyContext;
>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>> import backtype.storm.topology.base.BaseRichBolt;
>>>>> import backtype.storm.tuple.Fields;
>>>>> import backtype.storm.tuple.Tuple;
>>>>> import backtype.storm.tuple.Values;
>>>>> import sun.jvm.hotspot.runtime.*;
>>>>> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
>>>>> import tuc.LSH.core.timeseries.UniversalBasicWindow;
>>>>> 
>>>>> 
>>>>> import java.lang.Thread;
>>>>> import java.util.*;
>>>>> 
>>>>> /**
>>>>> * Created by mixtou on 17/5/15.
>>>>> */
>>>>> public class LSHBolt extends BaseRichBolt {
>>>>>   private int task_id;
>>>>>   private OutputCollector collector;
>>>>>   private UniversalBasicWindow universalBasicWindow;
>>>>> 
>>>>>   private String streamId;
>>>>>   private String time;
>>>>>   private Float value;
>>>>> 
>>>>>   @Override
>>>>>   public void prepare(Map conf, TopologyContext topologyContext,
>>>>> OutputCollector outputCollector) {
>>>>>       this.task_id = topologyContext.getThisTaskIndex();
>>>>>       this.collector = outputCollector;
>>>>>       this.universalBasicWindow = new UniversalBasicWindow();
>>>>>       streamId = null;
>>>>>       time = null;
>>>>>       value = 0f;
>>>>>       System.err.println("New Bolt with id: " + task_id);
>>>>> 
>>>>>   }
>>>>> 
>>>>>   @Override
>>>>>   public void execute(Tuple tuple) {
>>>>> 
>>>>>       if (tuple.getSourceStreamId().equals("sync")) {
>>>>>           System.out.println("Bolt task id " + task_id + " received
>>>>> from " + tuple.getSourceComponent() + " message " + tuple.getString(0));
>>>>>           System.out.println("Normalizing: Basic Window of Bolt " +
>>>>> task_id);
>>>>>           universalBasicWindow.normalize(); //fill the rest of the
>>>>> streams with last received value to make them same size
>>>>>           universalBasicWindow = null;
>>>>>           universalBasicWindow = new UniversalBasicWindow();
>>>>> 
>>>>>           collector.ack(tuple);
>>>>>       }
>>>>> 
>>>>>       if (tuple.getSourceStreamId().equals("data")) {
>>>>> 
>>>>>           streamId = tuple.getStringByField("streamId");
>>>>>           time = tuple.getStringByField("timestamp");
>>>>>           value = Float.parseFloat(tuple.getStringByField("value"));
>>>>> 
>>>>>           universalBasicWindow.pushStream(streamId, value);
>>>>> 
>>>>>           collector.ack(tuple);
>>>>> 
>>>>>           if (universalBasicWindow.isFull(task_id)) { //check if
>>>>> any stream of the window is full
>>>>> 
>>>>> //                System.out.println("Univ. Basic Window of bolt " +
>>>>> task_id + " is Filled Up");
>>>>> 
>>>>>               collector.emit("bwFilled", new Values(task_id));
>>>>> 
>>>>> //                universalBasicWindow.normalize();
>>>>> //                universalBasicWindow = new UniversalBasicWindow();
>>>>> 
>>>>> //                TODO:: Add basic window to sliding window and clear
>>>>> 
>>>>>           }
>>>>> 
>>>>>       }
>>>>> 
>>>>> 
>>>>> //        collector.ack(tuple);
>>>>>   }
>>>>> 
>>>>>   @Override
>>>>>   public void declareOutputFields(OutputFieldsDeclarer
>>>>> outputFieldsDeclarer) {
>>>>>       outputFieldsDeclarer.declareStream("bwFilled", new
>>>>> Fields("task_id"));
>>>>> 
>>>>>   }
>>>>> }
>>>>> 
>>>>> Any Ideas??

Reply via email to