Thanks a lot. The Scanner-based spout seems to work fine without failing any values.
Only one addition:
I had to put Utils.sleep(1);
after each tuple emit.
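For anyone who lands on this thread later, the combined pattern (Scanner created once in open(), one line per nextTuple() call, short sleep after each emit) can be sketched framework-free. Everything below is plain Java and illustrative only: the emitted list and Thread.sleep(1) stand in for Storm's collector.emit(...) and Utils.sleep(1), and the class name is made up.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

public class ScannerSpoutSketch {
    private final Scanner scanner;
    private final List<String> emitted = new ArrayList<>();

    // In the real spout this would happen in open(), over a FileInputStream.
    ScannerSpoutSketch(String contents) {
        this.scanner = new Scanner(
                new ByteArrayInputStream(contents.getBytes(StandardCharsets.UTF_8)));
    }

    // Emits at most one line per call, then returns; the framework calls it again.
    void nextTuple() {
        if (scanner.hasNextLine()) {
            emitted.add(scanner.nextLine()); // stand-in for collector.emit(...)
            try {
                Thread.sleep(1);             // stand-in for Utils.sleep(1)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Simplified version of the executor loop that drives a spout.
    static List<String> runToEnd(String contents, int calls) {
        ScannerSpoutSketch spout = new ScannerSpoutSketch(contents);
        for (int i = 0; i < calls; i++) {
            spout.nextTuple();
        }
        return spout.emitted;
    }

    public static void main(String[] args) {
        System.out.println(runToEnd("a,1,2\nb,3,4\n", 10)); // [a,1,2, b,3,4]
    }
}
```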


> On 5 Jun 2015, at 16:38, Mike Thomsen <[email protected]> wrote:
> 
> The nextTuple logic should be redone around a java.util.Scanner, not a 
> BufferedReader. Something like this:
> 
> Scanner scanner;
> 
> void open(...) {
>     //other code
>     scanner = new Scanner(new FileInputStream("location_here"));
> }
> 
> void nextTuple() {
>     if (scanner.hasNextLine()) {
>         collector.emit("data", new Values(scanner.nextLine()));
>     }
> }
> 
> On Fri, Jun 5, 2015 at 8:21 AM, Andrew Xor <[email protected] 
> <mailto:[email protected]>> wrote:
> Your nextTuple(), as it stands, does not return until the whole file has been 
> read, which goes against basically every Storm principle out there. It's not 
> scalable and will lead to a lot of issues, especially when running on a 
> cluster. What Matthias is suggesting (and what you should do) is that for each 
> tuple you emit you return from nextTuple() and let it be called again to emit 
> the next one. Basically there should be one call to nextTuple() per 
> batch/tuple that you emit, which in your current implementation means one per 
> tokenized line read.
> 
> Hope this helps.
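To make that contract concrete: the reader lives in the spout's state (created once, as it would be in open()), and each nextTuple() call emits at most one line and then returns. This is a plain-Java sketch, not the poster's actual code; the emitted list stands in for the collector and the class name is invented.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

public class OneTuplePerCallSketch {
    private final BufferedReader reader;           // created once, as in open()
    final List<String> emitted = new ArrayList<>();
    boolean completed = false;

    OneTuplePerCallSketch(String contents) {
        this.reader = new BufferedReader(new StringReader(contents));
    }

    // At most one emit per call; returning lets the framework deliver acks/fails.
    void nextTuple() {
        if (completed) {
            return;
        }
        String line;
        try {
            line = reader.readLine();
        } catch (IOException e) {
            throw new RuntimeException("Error reading tuple", e);
        }
        if (line == null) {
            completed = true;                      // EOF: stop emitting
            return;
        }
        emitted.add(line);                         // stand-in for collector.emit(...)
    }

    // Drives the spout until the file is exhausted, as the executor loop would.
    static List<String> drain(String contents) {
        OneTuplePerCallSketch spout = new OneTuplePerCallSketch(contents);
        while (!spout.completed) {
            spout.nextTuple();
        }
        return spout.emitted;
    }

    public static void main(String[] args) {
        System.out.println(drain("x\ny\n")); // [x, y]
    }
}
```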
> 
> 
> On Fri, Jun 5, 2015 at 3:04 PM, Michail Toutoudakis <[email protected] 
> <mailto:[email protected]>> wrote:
> Sorry for my stupid question, but what do you mean by returning from each emit?
> Do I add return; after emitting a tuple, or is it something else I haven't understood?
> Could you post a code sample?
> 
>> On 5 Jun 2015, at 14:44, Matthias J. Sax <[email protected] 
>> <mailto:[email protected]>> wrote:
>> 
>> Hi,
>> 
>> I don't see a big difference between the two Spout implementations... They look
>> basically the same... I would still recommend returning from
>> nextTuple() after each emit.
>> 
>> If you don't want to return each time, another reason for missing values
>> might be the return statement in the while-loop:
>> 
>>>                if (str.isEmpty()) {
>>>                    return;
>>>                }
>> 
>> This should be "continue" -- otherwise you return from nextTuple()
>> without processing the whole file whenever an empty line is in between.
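The difference is easy to demonstrate outside Storm. In the sketch below (plain Java, hypothetical method names), withReturn mirrors the current spout loop and drops everything after the first blank line, while withContinue only skips the blank line:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ContinueVsReturnSketch {
    // "return" aborts the loop at the first empty line, losing the rest.
    static List<String> withReturn(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String s : lines) {
            if (s.isEmpty()) {
                return out;      // drops everything after the blank line
            }
            out.add(s);
        }
        return out;
    }

    // "continue" skips only the empty line and keeps processing.
    static List<String> withContinue(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String s : lines) {
            if (s.isEmpty()) {
                continue;        // skips just the blank line
            }
            out.add(s);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "", "b");
        System.out.println(withReturn(lines));   // [a]
        System.out.println(withContinue(lines)); // [a, b]
    }
}
```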
>> 
>> 
>> -Matthias
>> 
>> On 06/05/2015 11:16 AM, Michail Toutoudakis wrote:
>>> Sorry, wrong code posted; that was the bolt's code. The spout's code is:
>>> 
>>> package tuc.LSH.storm.spouts;
>>> 
>>> import backtype.storm.spout.SpoutOutputCollector;
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.IRichSpout;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseRichSpout;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Values;
>>> import backtype.storm.utils.Utils;
>>> import tuc.LSH.conf.Consts;
>>> 
>>> import java.io.BufferedReader;
>>> import java.io.FileNotFoundException;
>>> import java.io.FileReader;
>>> import java.io.IOException;
>>> import java.util.Map;
>>> 
>>> /**
>>> * Created by mixtou on 15/5/15.
>>> */
>>> //public class FileReaderSpout extends BaseRichSpout {
>>> public class FileReaderSpout implements IRichSpout {
>>> 
>>>    private SpoutOutputCollector collector;
>>>    private FileReader fileReader;
>>>    private boolean completed;
>>>    private TopologyContext context;
>>>    private int spout_idx;
>>>    private int spout_id;
>>>    private Map config;
>>>    private int noOfFailedWords;
>>>    private int noOfAckedWords;
>>> //    private BufferedReader reader;
>>> 
>>> 
>>>    @Override
>>>    public void declareOutputFields(OutputFieldsDeclarer 
>>> outputFieldsDeclarer) {
>>>        outputFieldsDeclarer.declareStream("data", new Fields("streamId", 
>>> "timestamp", "value"));
>>> 
>>> 
>>>    }
>>> 
>>>    @Override
>>>    public void open(Map config, TopologyContext topologyContext, 
>>> SpoutOutputCollector spoutOutputCollector) {
>>>        this.context = topologyContext;
>>>        this.spout_idx = context.getThisTaskIndex();
>>>        this.spout_id = context.getThisTaskId();
>>>        this.collector = spoutOutputCollector;
>>>        this.config = config;
>>>        this.completed = false;
>>>        this.noOfFailedWords = 0;
>>>        this.noOfAckedWords = 0;
>>> 
>>>        try {
>>>            System.err.println("Reading File: " + 
>>> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>>>            this.fileReader = new 
>>> FileReader(config.get(file_to_read()).toString());
>>> //            this.reader = new BufferedReader(fileReader);
>>> 
>>>        } catch (FileNotFoundException e) {
>>>            throw new RuntimeException("Error reading file [" + 
>>> config.get(file_to_read()) + "]");
>>>        }
>>> 
>>>    }
>>> 
>>>    @Override
>>>    public void nextTuple() {
>>> 
>>>        if (this.completed) {
>>>            try {
>>>                Thread.sleep(1);
>>>            } catch (InterruptedException e) {
>>>                e.printStackTrace();
>>>            }
>>>        }
>>> 
>>>        try {
>>> 
>>>            String str;
>>>            BufferedReader reader = new BufferedReader(fileReader);
>>> 
>>>            while ((str = reader.readLine()) != null) {
>>> 
>>>                if (str.isEmpty()) {
>>>                    return;
>>>                }
>>>                String[] temp = str.split(",");
>>> //                    System.err.println("============== "+temp[0] + " + " 
>>> + temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
>>>                collector.emit("data", new Values(temp[0], temp[2], 
>>> temp[3]), temp[0]); //emit the correct data to the next bolt without 
>>> guaranteed delivery
>>> 
>>>            }
>>> 
>>>        } catch (Exception e) {
>>> //            System.err.println("Error Reading Tuple: " + e);
>>>            throw new RuntimeException("Error Reading Tuple ", e);
>>>        } finally {
>>>            System.err.println("Finished Reading File. Message From Spout: " 
>>> + spout_idx);
>>>            this.completed = true;
>>> 
>>>        }
>>> 
>>>    }
>>> 
>>>    private String file_to_read() {
>>> //        this.spout_id = context.getThisTaskId();
>>>        if (Consts.NO_OF_SPOUTS > 1) {
>>>            int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>>>            return "data" + file_no;
>>>        } else {
>>>            return "data";
>>>        }
>>>    }
>>> 
>>>    @Override
>>>    public void ack(Object msgId) {
>>> //        super.ack(msgId);
>>>        noOfAckedWords++;
>>> //        System.out.println("OK tuple acked from bolt: " + msgId+" no of 
>>> acked word "+noOfAckedWords);
>>>    }
>>> 
>>>    @Override
>>>    public void fail(Object msgId) {
>>> //        super.fail(msgId);
>>>        noOfFailedWords++;
>>>        System.err.println("ERROR: " + context.getThisComponentId() + " " + 
>>> msgId + " no of words failed " + noOfFailedWords);
>>> 
>>>    }
>>> 
>>>    @Override
>>>    public Map<String, Object> getComponentConfiguration() {
>>>        return null;
>>>    }
>>> 
>>>    @Override
>>>    public void close() {
>>>        try {
>>>            fileReader.close();
>>>        } catch (IOException e) {
>>>            e.printStackTrace();
>>>        }
>>> 
>>>    }
>>> 
>>>    @Override
>>>    public void activate() {
>>> 
>>>    }
>>> 
>>>    @Override
>>>    public void deactivate() {
>>> 
>>>    }
>>> }
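As a side note, the file_to_read() mapping above is just task-index modulo spout count: with several spouts, each task reads its own "dataN" file. The standalone sketch below copies that logic for illustration (it is not the class itself):

```java
public class FileToReadSketch {
    // Mirrors file_to_read(): with n > 1 spouts, task i reads "data(i % n)";
    // with a single spout, everything comes from "data".
    static String fileToRead(int spoutIdx, int noOfSpouts) {
        if (noOfSpouts > 1) {
            int fileNo = spoutIdx % noOfSpouts;
            return "data" + fileNo;
        }
        return "data";
    }

    public static void main(String[] args) {
        for (int idx = 0; idx < 4; idx++) {
            System.out.println("task " + idx + " -> " + fileToRead(idx, 4));
        }
    }
}
```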
>>> 
>>> 
>>> 
>>> 
>>>> On 5 Jun 2015, at 12:12, Michail Toutoudakis <[email protected] 
>>>> <mailto:[email protected]>
>>>> <mailto:[email protected] <mailto:[email protected]>>> wrote:
>>>> 
>>>> I tried your solution but nothing happened. I still continued to lose
>>>> many values. However, I found a file reader spout implementation that
>>>> worked without losing a single value. Below is the code for anyone
>>>> who might have the same problem:
>>>> 
>>>> package tuc.LSH.storm.bolts;
>>>> 
>>>> import backtype.storm.task.OutputCollector;
>>>> import backtype.storm.task.TopologyContext;
>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>> import backtype.storm.topology.base.BaseRichBolt;
>>>> import backtype.storm.tuple.Fields;
>>>> import backtype.storm.tuple.Tuple;
>>>> import backtype.storm.tuple.Values;
>>>> import sun.jvm.hotspot.runtime.*;
>>>> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
>>>> import tuc.LSH.core.timeseries.UniversalBasicWindow;
>>>> 
>>>> 
>>>> import java.lang.Thread;
>>>> import java.util.*;
>>>> 
>>>> /**
>>>> * Created by mixtou on 17/5/15.
>>>> */
>>>> public class LSHBolt extends BaseRichBolt {
>>>>    private int task_id;
>>>>    private OutputCollector collector;
>>>>    private UniversalBasicWindow universalBasicWindow;
>>>> 
>>>>    private String streamId;
>>>>    private String time;
>>>>    private Float value;
>>>> 
>>>>    @Override
>>>>    public void prepare(Map conf, TopologyContext topologyContext, 
>>>> OutputCollector outputCollector) {
>>>>        this.task_id = topologyContext.getThisTaskIndex();
>>>>        this.collector = outputCollector;
>>>>        this.universalBasicWindow = new UniversalBasicWindow();
>>>>        streamId = null;
>>>>        time = null;
>>>>        value = 0f;
>>>>        System.err.println("New Bolt with id: " + task_id);
>>>> 
>>>>    }
>>>> 
>>>>    @Override
>>>>    public void execute(Tuple tuple) {
>>>> 
>>>>        if (tuple.getSourceStreamId().equals("sync")) {
>>>>            System.out.println("Bolt task id " + task_id + " received from 
>>>> " + tuple.getSourceComponent() + " message " + tuple.getString(0));
>>>>            System.out.println("Normalizing: Basic Window of Bolt " + 
>>>> task_id);
>>>>            universalBasicWindow.normalize(); //fill the rest of the 
>>>> streams with last received value to make them same size
>>>>            universalBasicWindow = null;
>>>>            universalBasicWindow = new UniversalBasicWindow();
>>>> 
>>>>        }
>>>> 
>>>>        if (tuple.getSourceStreamId().equals("data")) {
>>>> 
>>>>            streamId = tuple.getStringByField("streamId");
>>>>            time = tuple.getStringByField("timestamp");
>>>>            value = Float.parseFloat(tuple.getStringByField("value"));
>>>> 
>>>>            universalBasicWindow.pushStream(streamId, value);
>>>> 
>>>>            if (universalBasicWindow.isFull(task_id)) { //check if any 
>>>> stream of the window is full
>>>> 
>>>> //                System.out.println("Univ. Basic Window of bolt " + 
>>>> task_id + " is Filled Up");
>>>> 
>>>>                collector.emit("bwFilled", new Values(task_id));
>>>> 
>>>> //                universalBasicWindow.normalize();
>>>> //                universalBasicWindow = new UniversalBasicWindow();
>>>> 
>>>> //                TODO:: Add basic window to sliding window and clear
>>>> 
>>>>            }
>>>> 
>>>>        }
>>>> 
>>>> 
>>>> //        System.err.println("SourceComponent: 
>>>> "+tuple.getSourceComponent());
>>>> //        System.err.println("SourceStreamId: "+tuple.getSourceStreamId());
>>>> //        System.err.println("Source Task: "+tuple.getSourceTask());
>>>> //        System.err.println("SourceGlobalStreamId: 
>>>> "+tuple.getSourceGlobalStreamid());
>>>> //        System.err.println("MessageId: "+tuple.getMessageId());
>>>> 
>>>>        collector.ack(tuple);
>>>>    }
>>>> 
>>>>    @Override
>>>>    public void declareOutputFields(OutputFieldsDeclarer 
>>>> outputFieldsDeclarer) {
>>>>        outputFieldsDeclarer.declareStream("bwFilled", new 
>>>> Fields("task_id"));
>>>> 
>>>>    }
>>>> }
>>>> 
>>>>> On 4 Jun 2015, at 20:29, Matthias J. Sax
>>>>> <[email protected] <mailto:[email protected]>
>>>>> <mailto:[email protected] 
>>>>> <mailto:[email protected]>>> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> You are looping within "nextTuple()" to emit a tuple for each line of
>>>>> the whole file. This is bad practice because the spout is prevented from
>>>>> processing "acks" while "nextTuple()" is executing. I guess that is the
>>>>> reason why your tuples time out and fail.
>>>>> 
>>>>> You should return from "nextTuple()" after emitting a single record.
>>>>> This should solve your problem, as the Spout can then process incoming acks
>>>>> between two calls to "nextTuple()". Just instantiate your BufferedReader
>>>>> in "open()" and close it once "this.completed" is true.
>>>>> 
>>>>> Hope this helps.
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> 
>>>>> On 06/04/2015 07:01 PM, Michail Toutoudakis wrote:
>>>>>> I am using a spout to read data from a text file and send it to a
>>>>>> bolt. However, I am losing too many values; they fail.
>>>>>> I am a newbie to Storm and I don't know where to look to debug this issue.
>>>>>> Any ideas?
>>>>>> 
>>>>>> My Storm Spout code is:
>>>>>> 
>>>>>> package tuc.LSH.storm.spouts;
>>>>>> 
>>>>>> import backtype.storm.spout.SpoutOutputCollector;
>>>>>> import backtype.storm.task.TopologyContext;
>>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>>> import backtype.storm.topology.base.BaseRichSpout;
>>>>>> import backtype.storm.tuple.Fields;
>>>>>> import backtype.storm.tuple.Values;
>>>>>> import backtype.storm.utils.Utils;
>>>>>> import tuc.LSH.conf.Consts;
>>>>>> 
>>>>>> import java.io.BufferedReader;
>>>>>> import java.io.FileNotFoundException;
>>>>>> import java.io.FileReader;
>>>>>> import java.util.Map;
>>>>>> 
>>>>>> /**
>>>>>> * Created by mixtou on 15/5/15.
>>>>>> */
>>>>>> public class FileReaderSpout extends BaseRichSpout {
>>>>>> 
>>>>>>   private SpoutOutputCollector collector;
>>>>>>   private FileReader fileReader;
>>>>>>   private boolean completed;
>>>>>>   private TopologyContext context;
>>>>>>   private int spout_idx;
>>>>>>   private int spout_id;
>>>>>>   private Map config;
>>>>>>   private int noOfFailedWords;
>>>>>>   private int noOfAckedWords;
>>>>>> 
>>>>>> 
>>>>>>   @Override
>>>>>>   public void declareOutputFields(OutputFieldsDeclarer
>>>>>> outputFieldsDeclarer) {
>>>>>>       outputFieldsDeclarer.declareStream("data",
>>>>>> new Fields("streamId", "timestamp", "value"));
>>>>>> 
>>>>>> 
>>>>>>   }
>>>>>> 
>>>>>>   @Override
>>>>>>   public void open(Map config, TopologyContext
>>>>>> topologyContext, SpoutOutputCollector spoutOutputCollector) {
>>>>>>       this.context = topologyContext;
>>>>>>       this.spout_idx = context.getThisTaskIndex();
>>>>>>       this.spout_id = context.getThisTaskId();
>>>>>>       this.collector = spoutOutputCollector;
>>>>>>       this.config = config;
>>>>>>       this.completed = false;
>>>>>>       this.noOfFailedWords = 0;
>>>>>>       this.noOfAckedWords = 0;
>>>>>> 
>>>>>>       try {
>>>>>>           System.err.println("Reading File: " +
>>>>>> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>>>>>> 
>>>>>> this.fileReader = new FileReader(config.get(file_to_read()).toString());
>>>>>>       } catch (FileNotFoundException e) {
>>>>>>           throw new RuntimeException("Error reading file [" +
>>>>>> config.get(file_to_read()) + "]");
>>>>>>       }
>>>>>> 
>>>>>>   }
>>>>>> 
>>>>>>   @Override
>>>>>>   public void nextTuple() {
>>>>>> 
>>>>>>       if (this.completed) {
>>>>>>           return;
>>>>>>       }
>>>>>> 
>>>>>>       try {
>>>>>> 
>>>>>>           String str;
>>>>>>           BufferedReader reader = new BufferedReader(fileReader);
>>>>>> 
>>>>>>           while ((str = reader.readLine()) != null) {
>>>>>> 
>>>>>>               if (str.isEmpty()) {
>>>>>>                   return;
>>>>>>               }
>>>>>>               String[] temp = str.split(",");
>>>>>> //                    System.err.println("============== "+temp[0] + " +
>>>>>> " + temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
>>>>>> 
>>>>>>               collector.emit("data",
>>>>>> new Values(temp[0], temp[2], temp[3]), temp[0]); //emit the correct
>>>>>> data to the next bolt with guaranteed delivery
>>>>>> 
>>>>>>           }
>>>>>> 
>>>>>>       } catch (Exception e) {
>>>>>>           System.err.println("Error Reading Tuple: " + e);
>>>>>>           throw new RuntimeException("Error Reading Tuple ", e);
>>>>>>       } finally {
>>>>>>           System.err.println("Finished Reading File. Message From
>>>>>> Spout: " + spout_idx);
>>>>>>           this.completed = true;
>>>>>> 
>>>>>>       }
>>>>>> 
>>>>>>       Utils.sleep(100);
>>>>>>   }
>>>>>> 
>>>>>>   private String file_to_read() {
>>>>>> //        this.spout_id = context.getThisTaskId();
>>>>>>       if (Consts.NO_OF_SPOUTS > 1) {
>>>>>>           int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>>>>>>           return "data" + file_no;
>>>>>>       } else {
>>>>>>           return "data";
>>>>>>       }
>>>>>>   }
>>>>>> 
>>>>>>   @Override
>>>>>>   public void ack(Object msgId) {
>>>>>>       super.ack(msgId);
>>>>>>       noOfAckedWords++;
>>>>>> //        System.out.println("OK tuple acked from bolt: " + msgId+" no
>>>>>> of acked word "+noOfAckedWords);
>>>>>>   }
>>>>>> 
>>>>>>   @Override
>>>>>>   public void fail(Object msgId) {
>>>>>>       super.fail(msgId);
>>>>>>       noOfFailedWords++;
>>>>>>       System.err.println("ERROR: " + context.getThisComponentId() + "
>>>>>> " + msgId + " no of words failed " + noOfFailedWords);
>>>>>> 
>>>>>>   }
>>>>>> }
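For reference, the spout's line parsing keeps fields 0, 2, and 3 of each comma-separated record (streamId, timestamp, value) and also uses field 0 as the message id. That slice of the logic, isolated as plain Java (the sample input is invented):

```java
import java.util.Arrays;

public class CsvFieldsSketch {
    // Splits a CSV line and keeps fields 0 (streamId), 2 (timestamp), 3 (value),
    // as the spout does before emitting.
    static String[] extract(String line) {
        String[] temp = line.split(",");
        return new String[] { temp[0], temp[2], temp[3] };
    }

    public static void main(String[] args) {
        String[] fields = extract("sensor7,unused,1433500000,42.5");
        System.out.println(Arrays.toString(fields)); // [sensor7, 1433500000, 42.5]
    }
}
```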
>>>>>> 
>>>>>> And my bolt looks like this:
>>>>>> 
>>>>>> package tuc.LSH.storm.bolts;
>>>>>> 
>>>>>> import backtype.storm.task.OutputCollector;
>>>>>> import backtype.storm.task.TopologyContext;
>>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>>> import backtype.storm.topology.base.BaseRichBolt;
>>>>>> import backtype.storm.tuple.Fields;
>>>>>> import backtype.storm.tuple.Tuple;
>>>>>> import backtype.storm.tuple.Values;
>>>>>> import sun.jvm.hotspot.runtime.*;
>>>>>> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
>>>>>> import tuc.LSH.core.timeseries.UniversalBasicWindow;
>>>>>> 
>>>>>> 
>>>>>> import java.lang.Thread;
>>>>>> import java.util.*;
>>>>>> 
>>>>>> /**
>>>>>> * Created by mixtou on 17/5/15.
>>>>>> */
>>>>>> public class LSHBolt extends BaseRichBolt {
>>>>>>   private int task_id;
>>>>>>   private OutputCollector collector;
>>>>>>   private UniversalBasicWindow universalBasicWindow;
>>>>>> 
>>>>>>   private String streamId;
>>>>>>   private String time;
>>>>>>   private Float value;
>>>>>> 
>>>>>>   @Override
>>>>>>   public void prepare(Map conf, TopologyContext topologyContext,
>>>>>> OutputCollector outputCollector) {
>>>>>>       this.task_id = topologyContext.getThisTaskIndex();
>>>>>>       this.collector = outputCollector;
>>>>>>       this.universalBasicWindow = new UniversalBasicWindow();
>>>>>>       streamId = null;
>>>>>>       time = null;
>>>>>>       value = 0f;
>>>>>>       System.err.println("New Bolt with id: " + task_id);
>>>>>> 
>>>>>>   }
>>>>>> 
>>>>>>   @Override
>>>>>>   public void execute(Tuple tuple) {
>>>>>> 
>>>>>>       if (tuple.getSourceStreamId().equals("sync")) {
>>>>>>           System.out.println("Bolt task id " + task_id + " received
>>>>>> from " + tuple.getSourceComponent() + " message " + tuple.getString(0));
>>>>>>           System.out.println("Normalizing: Basic Window of Bolt " +
>>>>>> task_id);
>>>>>>           universalBasicWindow.normalize(); //fill the rest of the
>>>>>> streams with last received value to make them same size
>>>>>>           universalBasicWindow = null;
>>>>>>           universalBasicWindow = new UniversalBasicWindow();
>>>>>> 
>>>>>>           collector.ack(tuple);
>>>>>>       }
>>>>>> 
>>>>>>       if (tuple.getSourceStreamId().equals("data")) {
>>>>>> 
>>>>>>           streamId = tuple.getStringByField("streamId");
>>>>>>           time = tuple.getStringByField("timestamp");
>>>>>>           value = Float.parseFloat(tuple.getStringByField("value"));
>>>>>> 
>>>>>>           universalBasicWindow.pushStream(streamId, value);
>>>>>> 
>>>>>>           collector.ack(tuple);
>>>>>> 
>>>>>>           if (universalBasicWindow.isFull(task_id)) { //check if
>>>>>> any stream of the window is full
>>>>>> 
>>>>>> //                System.out.println("Univ. Basic Window of bolt " +
>>>>>> task_id + " is Filled Up");
>>>>>> 
>>>>>>               collector.emit("bwFilled", new Values(task_id));
>>>>>> 
>>>>>> //                universalBasicWindow.normalize();
>>>>>> //                universalBasicWindow = new UniversalBasicWindow();
>>>>>> 
>>>>>> //                TODO:: Add basic window to sliding window and clear
>>>>>> 
>>>>>>           }
>>>>>> 
>>>>>>       }
>>>>>> 
>>>>>> 
>>>>>> //        collector.ack(tuple);
>>>>>>   }
>>>>>> 
>>>>>>   @Override
>>>>>>   public void declareOutputFields(OutputFieldsDeclarer
>>>>>> outputFieldsDeclarer) {
>>>>>>       outputFieldsDeclarer.declareStream("bwFilled", new
>>>>>> Fields("task_id"));
>>>>>> 
>>>>>>   }
>>>>>> }
>>>>>> 
>>>>>> Any Ideas??
> 
> 
> 
