Sorry, wrong code posted — that was the bolt's code. The spout's code is:

package tuc.LSH.storm.spouts;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import tuc.LSH.conf.Consts;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

/**
 * Created by mixtou on 15/5/15.
 */
//public class FileReaderSpout extends BaseRichSpout {
public class FileReaderSpout implements IRichSpout {

    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed;
    private TopologyContext context;
    private int spout_idx;
    private int spout_id;
    private Map config;
    private int noOfFailedWords;
    private int noOfAckedWords;
//    private BufferedReader reader;


    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("data", new Fields("streamId", 
"timestamp", "value"));


    }

    @Override
    public void open(Map config, TopologyContext topologyContext, 
SpoutOutputCollector spoutOutputCollector) {
        this.context = topologyContext;
        this.spout_idx = context.getThisTaskIndex();
        this.spout_id = context.getThisTaskId();
        this.collector = spoutOutputCollector;
        this.config = config;
        this.completed = false;
        this.noOfFailedWords = 0;
        this.noOfAckedWords = 0;

        try {
            System.err.println("Reading File: " + 
config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
            this.fileReader = new 
FileReader(config.get(file_to_read()).toString());
//            this.reader = new BufferedReader(fileReader);

        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file [" + 
config.get(file_to_read()) + "]");
        }

    }

    @Override
    public void nextTuple() {

        if (this.completed) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {

            String str;
            BufferedReader reader = new BufferedReader(fileReader);

            while ((str = reader.readLine()) != null) {

                if (str.isEmpty()) {
                    return;
                }
                String[] temp = str.split(",");
//                    System.err.println("============== "+temp[0] + " + " + 
temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
                collector.emit("data", new Values(temp[0], temp[2], temp[3]), 
temp[0]); //emmit the correct data to next bolt without guarantee delivery
               
            }

        } catch (Exception e) {
//            System.err.println("Error Reading Tuple: " + e);
            throw new RuntimeException("Error Reading Tuple ", e);
        } finally {
            System.err.println("Finished Reading File. Message From Spout: " + 
spout_idx);
            this.completed = true;

        }

    }

    private String file_to_read() {
//        this.spout_id = context.getThisTaskId();
        if (Consts.NO_OF_SPOUTS > 1) {
            int file_no = spout_idx % Consts.NO_OF_SPOUTS;
            return "data" + file_no;
        } else {
            return "data";
        }
    }

    @Override
    public void ack(Object msgId) {
//        super.ack(msgId);
        noOfAckedWords++;
//        System.out.println("OK tuple acked from bolt: " + msgId+" no of acked 
word "+noOfAckedWords);
    }

    @Override
    public void fail(Object msgId) {
//        super.fail(msgId);
        noOfFailedWords++;
        System.err.println("ERROR: " + context.getThisComponentId() + " " + 
msgId + " no of words failed " + noOfFailedWords);

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void close() {
        try {
            fileReader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void activate() {

    }

    @Override
    public void deactivate() {

    }
}



> On 5 Ιουν 2015, at 12:12, Michail Toutoudakis <[email protected]> wrote:
> 
> I tried your solution but nothing happened. I still continue to lose many
> values. However, I found a file reader spout implementation that worked
> without losing a single value. Below is the code for anyone that might have
> the same problem:
> 
> package tuc.LSH.storm.bolts;
> 
> import backtype.storm.task.OutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichBolt;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Tuple;
> import backtype.storm.tuple.Values;
> import sun.jvm.hotspot.runtime.*;
> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
> import tuc.LSH.core.timeseries.UniversalBasicWindow;
> 
> 
> import java.lang.Thread;
> import java.util.*;
> 
> /**
>  * Created by mixtou on 17/5/15.
>  */
> public class LSHBolt extends BaseRichBolt {
>     private int task_id;
>     private OutputCollector collector;
>     private UniversalBasicWindow universalBasicWindow;
> 
>     private String streamId;
>     private String time;
>     private Float value;
> 
>     @Override
>     public void prepare(Map conf, TopologyContext topologyContext, 
> OutputCollector outputCollector) {
>         this.task_id = topologyContext.getThisTaskIndex();
>         this.collector = outputCollector;
>         this.universalBasicWindow = new UniversalBasicWindow();
>         streamId = null;
>         time = null;
>         value = 0f;
>         System.err.println("New Bolt with id: " + task_id);
> 
>     }
> 
>     @Override
>     public void execute(Tuple tuple) {
> 
>         if (tuple.getSourceStreamId().equals("sync")) {
>             System.out.println("Bolt task id " + task_id + " received from " 
> + tuple.getSourceComponent() + " message " + tuple.getString(0));
>             System.out.println("Normalizing: Basic Window of Bolt " + 
> task_id);
>             universalBasicWindow.normalize(); //fill the rest of the streams 
> with last received value to make them same size
>             universalBasicWindow = null;
>             universalBasicWindow = new UniversalBasicWindow();
> 
>         }
> 
>         if (tuple.getSourceStreamId().equals("data")) {
> 
>             streamId = tuple.getStringByField("streamId");
>             time = tuple.getStringByField("timestamp");
>             value = Float.parseFloat(tuple.getStringByField("value"));
> 
>             universalBasicWindow.pushStream(streamId, value);
> 
>             if (universalBasicWindow.isFull(task_id)) { //check if any stream 
> of the window is full
> 
> //                System.out.println("Univ. Basic Window of bolt " + task_id 
> + " is Filled Up");
> 
>                 collector.emit("bwFilled", new Values(task_id));
> 
> //                universalBasicWindow.normalize();
> //                universalBasicWindow = new UniversalBasicWindow();
> 
> //                TODO:: Add basic window to sliding window and clear
> 
>             }
> 
>         }
> 
> 
> //        System.err.println("SourceComponent: "+tuple.getSourceComponent());
> //        System.err.println("SourceStreamId: "+tuple.getSourceStreamId());
> //        System.err.println("Source Task: "+tuple.getSourceTask());
> //        System.err.println("SourceGlobalStreamId: 
> "+tuple.getSourceGlobalStreamid());
> //        System.err.println("MessageId: "+tuple.getMessageId());
> 
>         collector.ack(tuple);
>     }
> 
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer 
> outputFieldsDeclarer) {
>         outputFieldsDeclarer.declareStream("bwFilled", new Fields("task_id"));
> 
>     }
> }
> 
>> On 4 Ιουν 2015, at 20:29, Matthias J. Sax <[email protected] 
>> <mailto:[email protected]>> wrote:
>> 
>> Hi,
>> 
>> You are looping within "nextTuple()" to emit a tuple for each line of
>> the whole file. This is "bad practice" because the spout is prevented
>> from taking "acks" while "nextTuple()" is executing. I guess that is the
>> reason why your tuples time out and fail.
>> 
>> You should return from "nextTuple()" after emitting a single record.
>> This should solve your problem, as the Spout can process incoming acks
>> between two calls to "nextTuple()". Just instantiate your BufferedReader
>> in "open()" and close it if "this.completed" is true.
>> 
>> Hope this helps.
>> 
>> 
>> -Matthias
>> 
>> 
>> On 06/04/2015 07:01 PM, Michail Toutoudakis wrote:
>>> I am using a spout to read data from a text file and send the values to a
>>> bolt. However, I am losing too many values; they fail.
>>> I am a newbie to Storm and I don’t know where to look to debug this issue.
>>> Any ideas??
>>> 
>>> My Storm Spout code is:
>>> 
>>> package tuc.LSH.storm.spouts;
>>> 
>>> import backtype.storm.spout.SpoutOutputCollector;
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseRichSpout;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Values;
>>> import backtype.storm.utils.Utils;
>>> import tuc.LSH.conf.Consts;
>>> 
>>> import java.io.BufferedReader;
>>> import java.io.FileNotFoundException;
>>> import java.io.FileReader;
>>> import java.util.Map;
>>> 
>>> /**
>>> * Created by mixtou on 15/5/15.
>>> */
>>> public class FileReaderSpout extends BaseRichSpout {
>>> 
>>>    private SpoutOutputCollector collector;
>>>    private FileReader fileReader;
>>>    private boolean completed;
>>>    private TopologyContext context;
>>>    private int spout_idx;
>>>    private int spout_id;
>>>    private Map config;
>>>    private int noOfFailedWords;
>>>    private int noOfAckedWords;
>>> 
>>> 
>>>    @Override
>>>    public void declareOutputFields(OutputFieldsDeclarer
>>> outputFieldsDeclarer) {
>>>        outputFieldsDeclarer.declareStream("data",
>>> new Fields("streamId", "timestamp", "value"));
>>> 
>>> 
>>>    }
>>> 
>>>    @Override
>>>    public void open(Map config, TopologyContext
>>> topologyContext, SpoutOutputCollector spoutOutputCollector) {
>>>        this.context = topologyContext;
>>>        this.spout_idx = context.getThisTaskIndex();
>>>        this.spout_id = context.getThisTaskId();
>>>        this.collector = spoutOutputCollector;
>>>        this.config = config;
>>>        this.completed = false;
>>>        this.noOfFailedWords = 0;
>>>        this.noOfAckedWords = 0;
>>> 
>>>        try {
>>>            System.err.println("Reading File: " +
>>> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>>> 
>>> this.fileReader = new FileReader(config.get(file_to_read()).toString());
>>>        } catch (FileNotFoundException e) {
>>>            throw new RuntimeException("Error reading file [" +
>>> config.get(file_to_read()) + "]");
>>>        }
>>> 
>>>    }
>>> 
>>>    @Override
>>>    public void nextTuple() {
>>> 
>>>        if (this.completed) {
>>>            return;
>>>        }
>>> 
>>>        try {
>>> 
>>>            String str;
>>>            BufferedReader reader = new BufferedReader(fileReader);
>>> 
>>>            while ((str = reader.readLine()) != null) {
>>> 
>>>                if (str.isEmpty()) {
>>>                    return;
>>>                }
>>>                String[] temp = str.split(",");
>>> //                    System.err.println("============== "+temp[0] + " +
>>> " + temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
>>> 
>>>                collector.emit("data",
>>> new Values(temp[0], temp[2], temp[3]), temp[0]); //emmit the correct
>>> data to next bolt with guarantee delivery
>>> 
>>>            }
>>> 
>>>        } catch (Exception e) {
>>>            System.err.println("Error Reading Tuple: " + e);
>>>            throw new RuntimeException("Error Reading Tuple ", e);
>>>        } finally {
>>>            System.err.println("Finished Reading File. Message From
>>> Spout: " + spout_idx);
>>>            this.completed = true;
>>> 
>>>        }
>>> 
>>>        Utils.sleep(100);
>>>    }
>>> 
>>>    private String file_to_read() {
>>> //        this.spout_id = context.getThisTaskId();
>>>        if (Consts.NO_OF_SPOUTS > 1) {
>>>            int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>>>            return "data" + file_no;
>>>        } else {
>>>            return "data";
>>>        }
>>>    }
>>> 
>>>    @Override
>>>    public void ack(Object msgId) {
>>>        super.ack(msgId);
>>>        noOfAckedWords++;
>>> //        System.out.println("OK tuple acked from bolt: " + msgId+" no
>>> of acked word "+noOfAckedWords);
>>>    }
>>> 
>>>    @Override
>>>    public void fail(Object msgId) {
>>>        super.fail(msgId);
>>>        noOfFailedWords++;
>>>        System.err.println("ERROR: " + context.getThisComponentId() + "
>>> " + msgId + " no of words failed " + noOfFailedWords);
>>> 
>>>    }
>>> }
>>> 
>>> And my bolt looks like this:
>>> 
>>> package tuc.LSH.storm.bolts;
>>> 
>>> import backtype.storm.task.OutputCollector;
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseRichBolt;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Tuple;
>>> import backtype.storm.tuple.Values;
>>> import sun.jvm.hotspot.runtime.*;
>>> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
>>> import tuc.LSH.core.timeseries.UniversalBasicWindow;
>>> 
>>> 
>>> import java.lang.Thread;
>>> import java.util.*;
>>> 
>>> /**
>>> * Created by mixtou on 17/5/15.
>>> */
>>> public class LSHBolt extends BaseRichBolt {
>>>    private int task_id;
>>>    private OutputCollector collector;
>>>    private UniversalBasicWindow universalBasicWindow;
>>> 
>>>    private String streamId;
>>>    private String time;
>>>    private Float value;
>>> 
>>>    @Override
>>>    public void prepare(Map conf, TopologyContext topologyContext, 
>>> OutputCollector outputCollector) {
>>>        this.task_id = topologyContext.getThisTaskIndex();
>>>        this.collector = outputCollector;
>>>        this.universalBasicWindow = new UniversalBasicWindow();
>>>        streamId = null;
>>>        time = null;
>>>        value = 0f;
>>>        System.err.println("New Bolt with id: " + task_id);
>>> 
>>>    }
>>> 
>>>    @Override
>>>    public void execute(Tuple tuple) {
>>> 
>>>        if (tuple.getSourceStreamId().equals("sync")) {
>>>            System.out.println("Bolt task id " + task_id + " received from " 
>>> + tuple.getSourceComponent() + " message " + tuple.getString(0));
>>>            System.out.println("Normalizing: Basic Window of Bolt " + 
>>> task_id);
>>>            universalBasicWindow.normalize(); //fill the rest of the streams 
>>> with last received value to make them same size
>>>            universalBasicWindow = null;
>>>            universalBasicWindow = new UniversalBasicWindow();
>>> 
>>>            collector.ack(tuple);
>>>        }
>>> 
>>>        if (tuple.getSourceStreamId().equals("data")) {
>>> 
>>>            streamId = tuple.getStringByField("streamId");
>>>            time = tuple.getStringByField("timestamp");
>>>            value = Float.parseFloat(tuple.getStringByField("value"));
>>> 
>>>            universalBasicWindow.pushStream(streamId, value);
>>> 
>>>            collector.ack(tuple);
>>> 
>>>            if (universalBasicWindow.isFull(task_id)) { //check if any 
>>> stream of the window is full
>>> 
>>> //                System.out.println("Univ. Basic Window of bolt " + 
>>> task_id + " is Filled Up");
>>> 
>>>                collector.emit("bwFilled", new Values(task_id));
>>> 
>>> //                universalBasicWindow.normalize();
>>> //                universalBasicWindow = new UniversalBasicWindow();
>>> 
>>> //                TODO:: Add basic window to sliding window and clear
>>> 
>>>            }
>>> 
>>>        }
>>> 
>>> 
>>> //        collector.ack(tuple);
>>>    }
>>> 
>>>    @Override
>>>    public void declareOutputFields(OutputFieldsDeclarer 
>>> outputFieldsDeclarer) {
>>>        outputFieldsDeclarer.declareStream("bwFilled", new 
>>> Fields("task_id"));
>>> 
>>>    }
>>> }
>>> 
>>> Any Ideas??
>>> 
>>> 
>> 
> 

Reply via email to