Hi,

I don't see a big difference between the two Spout implementations... They
look basically the same... I would still recommend returning from
nextTuple() after each emit.
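To illustrate the one-emit-per-call pattern outside of Storm, here is a
minimal, self-contained sketch. The class name, the `emitted` list, and the
constructor are made up for illustration; they stand in for the real
SpoutOutputCollector/open() machinery, which is not shown:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a spout; not the real backtype.storm API.
public class OneEmitPerCall {
    private final BufferedReader reader;   // the real code would open this in open()
    final List<String> emitted = new ArrayList<>();
    private boolean completed = false;

    OneEmitPerCall(BufferedReader reader) {
        this.reader = reader;
    }

    // Emits at most ONE tuple per call, then returns, so the framework
    // can deliver acks/fails between calls instead of being blocked by
    // a loop that reads the whole file.
    void nextTuple() throws IOException {
        if (completed) {
            return;
        }
        String str = reader.readLine();
        if (str == null) {                 // end of file reached
            completed = true;
            reader.close();
            return;
        }
        if (!str.isEmpty()) {
            emitted.add(str);              // stand-in for collector.emit(...)
        }
    }

    public static void main(String[] args) throws IOException {
        OneEmitPerCall spout = new OneEmitPerCall(
                new BufferedReader(new StringReader("a,1\n\nb,2\n")));
        for (int i = 0; i < 5; i++) {
            spout.nextTuple();             // the framework calls this repeatedly
        }
        System.out.println(spout.emitted); // prints [a,1, b,2]
    }
}
```

The point of the design is that each nextTuple() invocation does a small,
bounded amount of work; the framework interleaves ack()/fail() calls between
invocations.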

If you don't want to return each time, another reason for missing values
might be the return statement in the while-loop:

>                 if (str.isEmpty()) {
>                     return;
>                 }

This should be "continue" -- otherwise you return from nextTuple()
without processing the whole file whenever an empty line occurs in between.
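The difference is easy to see in a small self-contained sketch (class and
method names here are made up for illustration):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

public class SkipEmptyLines {
    // Reads every line; a blank line is skipped with "continue" so the
    // rest of the file is still processed. With "return" in its place,
    // reading would stop at the first blank line.
    static List<String> readNonEmpty(BufferedReader reader) throws IOException {
        List<String> lines = new ArrayList<>();
        String str;
        while ((str = reader.readLine()) != null) {
            if (str.isEmpty()) {
                continue;                  // skip blank line, keep reading
            }
            lines.add(str);
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        BufferedReader r = new BufferedReader(new StringReader("a,1\n\nb,2\n"));
        System.out.println(readNonEmpty(r));  // prints [a,1, b,2]
    }
}
```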


-Matthias

On 06/05/2015 11:16 AM, Michail Toutoudakis wrote:
> Sorry, wrong code posted; that was the bolt's code. The spout's code is:
> 
> package tuc.LSH.storm.spouts;
> 
> import backtype.storm.spout.SpoutOutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.IRichSpout;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichSpout;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Values;
> import backtype.storm.utils.Utils;
> import tuc.LSH.conf.Consts;
> 
> import java.io.BufferedReader;
> import java.io.FileNotFoundException;
> import java.io.FileReader;
> import java.io.IOException;
> import java.util.Map;
> 
> /**
>  * Created by mixtou on 15/5/15.
>  */
> //public class FileReaderSpout extends BaseRichSpout {
> public class FileReaderSpout implements IRichSpout {
> 
>     private SpoutOutputCollector collector;
>     private FileReader fileReader;
>     private boolean completed;
>     private TopologyContext context;
>     private int spout_idx;
>     private int spout_id;
>     private Map config;
>     private int noOfFailedWords;
>     private int noOfAckedWords;
> //    private BufferedReader reader;
> 
> 
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer 
> outputFieldsDeclarer) {
>         outputFieldsDeclarer.declareStream("data", new Fields("streamId", 
> "timestamp", "value"));
> 
> 
>     }
> 
>     @Override
>     public void open(Map config, TopologyContext topologyContext, 
> SpoutOutputCollector spoutOutputCollector) {
>         this.context = topologyContext;
>         this.spout_idx = context.getThisTaskIndex();
>         this.spout_id = context.getThisTaskId();
>         this.collector = spoutOutputCollector;
>         this.config = config;
>         this.completed = false;
>         this.noOfFailedWords = 0;
>         this.noOfAckedWords = 0;
> 
>         try {
>             System.err.println("Reading File: " + 
> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>             this.fileReader = new 
> FileReader(config.get(file_to_read()).toString());
> //            this.reader = new BufferedReader(fileReader);
> 
>         } catch (FileNotFoundException e) {
>             throw new RuntimeException("Error reading file [" + 
> config.get(file_to_read()) + "]");
>         }
> 
>     }
> 
>     @Override
>     public void nextTuple() {
> 
>         if (this.completed) {
>             try {
>                 Thread.sleep(1);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>         }
> 
>         try {
> 
>             String str;
>             BufferedReader reader = new BufferedReader(fileReader);
> 
>             while ((str = reader.readLine()) != null) {
> 
>                 if (str.isEmpty()) {
>                     return;
>                 }
>                 String[] temp = str.split(",");
> //                    System.err.println("============== "+temp[0] + " + " + 
> temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
>         collector.emit("data", new Values(temp[0], temp[2], temp[3]), 
> temp[0]); // emit the data to the next bolt without guaranteed delivery
>                
>             }
> 
>         } catch (Exception e) {
> //            System.err.println("Error Reading Tuple: " + e);
>             throw new RuntimeException("Error Reading Tuple ", e);
>         } finally {
>             System.err.println("Finished Reading File. Message From Spout: " 
> + spout_idx);
>             this.completed = true;
> 
>         }
> 
>     }
> 
>     private String file_to_read() {
> //        this.spout_id = context.getThisTaskId();
>         if (Consts.NO_OF_SPOUTS > 1) {
>             int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>             return "data" + file_no;
>         } else {
>             return "data";
>         }
>     }
> 
>     @Override
>     public void ack(Object msgId) {
> //        super.ack(msgId);
>         noOfAckedWords++;
> //        System.out.println("OK tuple acked from bolt: " + msgId+" no of 
> acked word "+noOfAckedWords);
>     }
> 
>     @Override
>     public void fail(Object msgId) {
> //        super.fail(msgId);
>         noOfFailedWords++;
>         System.err.println("ERROR: " + context.getThisComponentId() + " " + 
> msgId + " no of words failed " + noOfFailedWords);
> 
>     }
> 
>     @Override
>     public Map<String, Object> getComponentConfiguration() {
>         return null;
>     }
> 
>     @Override
>     public void close() {
>         try {
>             fileReader.close();
>         } catch (IOException e) {
>             e.printStackTrace();
>         }
> 
>     }
> 
>     @Override
>     public void activate() {
> 
>     }
> 
>     @Override
>     public void deactivate() {
> 
>     }
> }
> 
> 
> 
> 
>> On 5 Ιουν 2015, at 12:12, Michail Toutoudakis <[email protected]
>> <mailto:[email protected]>> wrote:
>>
>> I tried your solution but nothing happened. I still continue to lose
>> many values. However, I found a file reader spout implementation that
>> worked without losing a single value. Below is the code for anyone
>> who might have the same problem:
>>
>> package tuc.LSH.storm.bolts;
>>
>> import backtype.storm.task.OutputCollector;
>> import backtype.storm.task.TopologyContext;
>> import backtype.storm.topology.OutputFieldsDeclarer;
>> import backtype.storm.topology.base.BaseRichBolt;
>> import backtype.storm.tuple.Fields;
>> import backtype.storm.tuple.Tuple;
>> import backtype.storm.tuple.Values;
>> import sun.jvm.hotspot.runtime.*;
>> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
>> import tuc.LSH.core.timeseries.UniversalBasicWindow;
>>
>>
>> import java.lang.Thread;
>> import java.util.*;
>>
>> /**
>>  * Created by mixtou on 17/5/15.
>>  */
>> public class LSHBolt extends BaseRichBolt {
>>     private int task_id;
>>     private OutputCollector collector;
>>     private UniversalBasicWindow universalBasicWindow;
>>
>>     private String streamId;
>>     private String time;
>>     private Float value;
>>
>>     @Override
>>     public void prepare(Map conf, TopologyContext topologyContext, 
>> OutputCollector outputCollector) {
>>         this.task_id = topologyContext.getThisTaskIndex();
>>         this.collector = outputCollector;
>>         this.universalBasicWindow = new UniversalBasicWindow();
>>         streamId = null;
>>         time = null;
>>         value = 0f;
>>         System.err.println("New Bolt with id: " + task_id);
>>
>>     }
>>
>>     @Override
>>     public void execute(Tuple tuple) {
>>
>>         if (tuple.getSourceStreamId().equals("sync")) {
>>             System.out.println("Bolt task id " + task_id + " received from " 
>> + tuple.getSourceComponent() + " message " + tuple.getString(0));
>>             System.out.println("Normalizing: Basic Window of Bolt " + 
>> task_id);
>>             universalBasicWindow.normalize(); //fill the rest of the streams 
>> with last received value to make them same size
>>             universalBasicWindow = null;
>>             universalBasicWindow = new UniversalBasicWindow();
>>
>>         }
>>
>>         if (tuple.getSourceStreamId().equals("data")) {
>>
>>             streamId = tuple.getStringByField("streamId");
>>             time = tuple.getStringByField("timestamp");
>>             value = Float.parseFloat(tuple.getStringByField("value"));
>>
>>             universalBasicWindow.pushStream(streamId, value);
>>
>>             if (universalBasicWindow.isFull(task_id)) { //check if any 
>> stream of the window is full
>>
>> //                System.out.println("Univ. Basic Window of bolt " + task_id 
>> + " is Filled Up");
>>
>>                 collector.emit("bwFilled", new Values(task_id));
>>
>> //                universalBasicWindow.normalize();
>> //                universalBasicWindow = new UniversalBasicWindow();
>>
>> //                TODO:: Add basic window to sliding window and clear
>>
>>             }
>>
>>         }
>>
>>
>> //        System.err.println("SourceComponent: "+tuple.getSourceComponent());
>> //        System.err.println("SourceStreamId: "+tuple.getSourceStreamId());
>> //        System.err.println("Source Task: "+tuple.getSourceTask());
>> //        System.err.println("SourceGlobalStreamId: 
>> "+tuple.getSourceGlobalStreamid());
>> //        System.err.println("MessageId: "+tuple.getMessageId());
>>
>>         collector.ack(tuple);
>>     }
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer 
>> outputFieldsDeclarer) {
>>         outputFieldsDeclarer.declareStream("bwFilled", new 
>> Fields("task_id"));
>>
>>     }
>> }
>>
>>> On 4 Ιουν 2015, at 20:29, Matthias J. Sax
>>> <[email protected]
>>> <mailto:[email protected]>> wrote:
>>>
>>> Hi,
>>>
>>> You are looping within "nextTuple()" to emit a tuple for each line of
>>> the whole file. This is bad practice because the spout is prevented from
>>> processing "acks" while "nextTuple()" is executing. I guess that is the
>>> reason why your tuples time out and fail.
>>>
>>> You should return from "nextTuple()" after emitting a single record.
>>> This should solve your problem, as the Spout can process incoming acks
>>> between two calls to "nextTuple()". Just instantiate your BufferedReader
>>> in "open()" and close it once "this.completed" is true.
>>>
>>> Hope this helps.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 06/04/2015 07:01 PM, Michail Toutoudakis wrote:
>>>> I am using a spout to read data from a text file and send the values
>>>> to a bolt. However, I am losing too many values; they fail.
>>>> I am a newbie to Storm and I don't know where to look to debug this issue.
>>>> Any ideas??
>>>>
>>>> My Storm Spout code is:
>>>>
>>>> package tuc.LSH.storm.spouts;
>>>>
>>>> import backtype.storm.spout.SpoutOutputCollector;
>>>> import backtype.storm.task.TopologyContext;
>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>> import backtype.storm.topology.base.BaseRichSpout;
>>>> import backtype.storm.tuple.Fields;
>>>> import backtype.storm.tuple.Values;
>>>> import backtype.storm.utils.Utils;
>>>> import tuc.LSH.conf.Consts;
>>>>
>>>> import java.io.BufferedReader;
>>>> import java.io.FileNotFoundException;
>>>> import java.io.FileReader;
>>>> import java.util.Map;
>>>>
>>>> /**
>>>> * Created by mixtou on 15/5/15.
>>>> */
>>>> public class FileReaderSpout extends BaseRichSpout {
>>>>
>>>>    private SpoutOutputCollector collector;
>>>>    private FileReader fileReader;
>>>>    private boolean completed;
>>>>    private TopologyContext context;
>>>>    private int spout_idx;
>>>>    private int spout_id;
>>>>    private Map config;
>>>>    private int noOfFailedWords;
>>>>    private int noOfAckedWords;
>>>>
>>>>
>>>>    @Override
>>>>    public void declareOutputFields(OutputFieldsDeclarer
>>>> outputFieldsDeclarer) {
>>>>        outputFieldsDeclarer.declareStream("data",
>>>> new Fields("streamId", "timestamp", "value"));
>>>>
>>>>
>>>>    }
>>>>
>>>>    @Override
>>>>    public void open(Map config, TopologyContext
>>>> topologyContext, SpoutOutputCollector spoutOutputCollector) {
>>>>        this.context = topologyContext;
>>>>        this.spout_idx = context.getThisTaskIndex();
>>>>        this.spout_id = context.getThisTaskId();
>>>>        this.collector = spoutOutputCollector;
>>>>        this.config = config;
>>>>        this.completed = false;
>>>>        this.noOfFailedWords = 0;
>>>>        this.noOfAckedWords = 0;
>>>>
>>>>        try {
>>>>            System.err.println("Reading File: " +
>>>> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>>>>
>>>> this.fileReader = new FileReader(config.get(file_to_read()).toString());
>>>>        } catch (FileNotFoundException e) {
>>>>            throw new RuntimeException("Error reading file [" +
>>>> config.get(file_to_read()) + "]");
>>>>        }
>>>>
>>>>    }
>>>>
>>>>    @Override
>>>>    public void nextTuple() {
>>>>
>>>>        if (this.completed) {
>>>>            return;
>>>>        }
>>>>
>>>>        try {
>>>>
>>>>            String str;
>>>>            BufferedReader reader = new BufferedReader(fileReader);
>>>>
>>>>            while ((str = reader.readLine()) != null) {
>>>>
>>>>                if (str.isEmpty()) {
>>>>                    return;
>>>>                }
>>>>                String[] temp = str.split(",");
>>>> //                    System.err.println("============== "+temp[0] + " +
>>>> " + temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
>>>>
>>>>                collector.emit("data",
>>>> new Values(temp[0], temp[2], temp[3]), temp[0]); // emit the
>>>> data to the next bolt with guaranteed delivery
>>>>
>>>>            }
>>>>
>>>>        } catch (Exception e) {
>>>>            System.err.println("Error Reading Tuple: " + e);
>>>>            throw new RuntimeException("Error Reading Tuple ", e);
>>>>        } finally {
>>>>            System.err.println("Finished Reading File. Message From
>>>> Spout: " + spout_idx);
>>>>            this.completed = true;
>>>>
>>>>        }
>>>>
>>>>        Utils.sleep(100);
>>>>    }
>>>>
>>>>    private String file_to_read() {
>>>> //        this.spout_id = context.getThisTaskId();
>>>>        if (Consts.NO_OF_SPOUTS > 1) {
>>>>            int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>>>>            return "data" + file_no;
>>>>        } else {
>>>>            return "data";
>>>>        }
>>>>    }
>>>>
>>>>    @Override
>>>>    public void ack(Object msgId) {
>>>>        super.ack(msgId);
>>>>        noOfAckedWords++;
>>>> //        System.out.println("OK tuple acked from bolt: " + msgId+" no
>>>> of acked word "+noOfAckedWords);
>>>>    }
>>>>
>>>>    @Override
>>>>    public void fail(Object msgId) {
>>>>        super.fail(msgId);
>>>>        noOfFailedWords++;
>>>>        System.err.println("ERROR: " + context.getThisComponentId() + "
>>>> " + msgId + " no of words failed " + noOfFailedWords);
>>>>
>>>>    }
>>>> }
>>>>
>>>> And my bolt looks like this:
>>>>
>>>> package tuc.LSH.storm.bolts;
>>>>
>>>> import backtype.storm.task.OutputCollector;
>>>> import backtype.storm.task.TopologyContext;
>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>> import backtype.storm.topology.base.BaseRichBolt;
>>>> import backtype.storm.tuple.Fields;
>>>> import backtype.storm.tuple.Tuple;
>>>> import backtype.storm.tuple.Values;
>>>> import sun.jvm.hotspot.runtime.*;
>>>> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
>>>> import tuc.LSH.core.timeseries.UniversalBasicWindow;
>>>>
>>>>
>>>> import java.lang.Thread;
>>>> import java.util.*;
>>>>
>>>> /**
>>>> * Created by mixtou on 17/5/15.
>>>> */
>>>> public class LSHBolt extends BaseRichBolt {
>>>>    private int task_id;
>>>>    private OutputCollector collector;
>>>>    private UniversalBasicWindow universalBasicWindow;
>>>>
>>>>    private String streamId;
>>>>    private String time;
>>>>    private Float value;
>>>>
>>>>    @Override
>>>>    public void prepare(Map conf, TopologyContext topologyContext,
>>>> OutputCollector outputCollector) {
>>>>        this.task_id = topologyContext.getThisTaskIndex();
>>>>        this.collector = outputCollector;
>>>>        this.universalBasicWindow = new UniversalBasicWindow();
>>>>        streamId = null;
>>>>        time = null;
>>>>        value = 0f;
>>>>        System.err.println("New Bolt with id: " + task_id);
>>>>
>>>>    }
>>>>
>>>>    @Override
>>>>    public void execute(Tuple tuple) {
>>>>
>>>>        if (tuple.getSourceStreamId().equals("sync")) {
>>>>            System.out.println("Bolt task id " + task_id + " received
>>>> from " + tuple.getSourceComponent() + " message " + tuple.getString(0));
>>>>            System.out.println("Normalizing: Basic Window of Bolt " +
>>>> task_id);
>>>>            universalBasicWindow.normalize(); //fill the rest of the
>>>> streams with last received value to make them same size
>>>>            universalBasicWindow = null;
>>>>            universalBasicWindow = new UniversalBasicWindow();
>>>>
>>>>            collector.ack(tuple);
>>>>        }
>>>>
>>>>        if (tuple.getSourceStreamId().equals("data")) {
>>>>
>>>>            streamId = tuple.getStringByField("streamId");
>>>>            time = tuple.getStringByField("timestamp");
>>>>            value = Float.parseFloat(tuple.getStringByField("value"));
>>>>
>>>>            universalBasicWindow.pushStream(streamId, value);
>>>>
>>>>            collector.ack(tuple);
>>>>
>>>>            if (universalBasicWindow.isFull(task_id)) { //check if
>>>> any stream of the window is full
>>>>
>>>> //                System.out.println("Univ. Basic Window of bolt " +
>>>> task_id + " is Filled Up");
>>>>
>>>>                collector.emit("bwFilled", new Values(task_id));
>>>>
>>>> //                universalBasicWindow.normalize();
>>>> //                universalBasicWindow = new UniversalBasicWindow();
>>>>
>>>> //                TODO:: Add basic window to sliding window and clear
>>>>
>>>>            }
>>>>
>>>>        }
>>>>
>>>>
>>>> //        collector.ack(tuple);
>>>>    }
>>>>
>>>>    @Override
>>>>    public void declareOutputFields(OutputFieldsDeclarer
>>>> outputFieldsDeclarer) {
>>>>        outputFieldsDeclarer.declareStream("bwFilled", new
>>>> Fields("task_id"));
>>>>
>>>>    }
>>>> }
>>>>
>>>> Any Ideas??
>>>>
>>>>
>>>
>>
> 
