The nextTuple logic should be redone around a java.util.Scanner, not a
BufferedReader. Something like this:

Scanner scanner;

void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    // ... other initialization ...
    try {
        scanner = new Scanner(new FileInputStream("location_here"));
    } catch (FileNotFoundException e) {
        throw new RuntimeException("Could not open file", e);
    }
}

void nextTuple() {
    // Emit at most one line per call; Storm calls nextTuple() again for the next line.
    if (scanner.hasNextLine()) {
        collector.emit("data", new Values(scanner.nextLine()));
    }
}
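Outside Storm, the one-line-per-call behavior of this pattern can be sketched in plain Java (the collector is replaced by a List here, and the class and method names are illustrative, not Storm API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

// Plain-Java sketch of the Scanner-based spout loop: each nextTuple() call
// emits at most one line and then returns, so (in Storm) acks and fails can
// be processed between calls.
class ScannerSpoutSketch {
    private final Scanner scanner;
    private final List<String> emitted = new ArrayList<>(); // stands in for the collector

    ScannerSpoutSketch(String input) {
        this.scanner = new Scanner(input);
    }

    void nextTuple() {
        if (scanner.hasNextLine()) {
            emitted.add(scanner.nextLine()); // one emit per call
        }
        // if no line is left, do nothing; the framework keeps calling nextTuple()
    }

    List<String> emitted() {
        return emitted;
    }
}
```

Each call emits exactly one pending line, which is the behavior Storm expects from a spout.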

On Fri, Jun 5, 2015 at 8:21 AM, Andrew Xor <[email protected]>
wrote:

> Your function as it stands emits the whole file from a single nextTuple
> call, which goes against basically every Storm principle out there. It's
> not scalable and will lead to a lot of issues, especially when running
> on a cluster. What Matthias is suggesting (and what you should do) is to
> return from nextTuple after each tuple you emit and let Storm call it
> again to emit the next one. Basically, there should be one nextTuple
> call per batch/tuple that you emit, which in your current implementation
> means one per tokenized line read.
>
> Hope this helps.
>
>
> On Fri, Jun 5, 2015 at 3:04 PM, Michail Toutoudakis <[email protected]>
> wrote:
>
>> Sorry for my stupid question, but what do you mean by returning after
>> each emit? Add return; after emitting the tuple, or something else I
>> haven't understood? Could you post a code sample?
>>
>> On 5 Jun 2015, at 14:44, Matthias J. Sax <[email protected]>
>> wrote:
>>
>> Hi,
>>
>> I don't see a big difference between the two Spout implementations...
>> They look basically the same... I would still recommend returning from
>> nextTuple() after each emit.
>>
>> If you don't want to return each time, another reason for missing values
>> might be the return statement in the while-loop:
>>
>>                if (str.isEmpty()) {
>>                    return;
>>                }
>>
>>
>> This should be "continue" -- otherwise you return from nextTuple()
>> without processing the whole file if an empty line occurs in between.
>>
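The effect of continue vs. return on an input with an empty line can be checked in isolation (plain Java over an in-memory reader; the class and method names are illustrative):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

class SkipEmptyLines {
    // Reads all non-empty lines; "continue" skips an empty line but keeps
    // reading, whereas a "return" at the same spot would drop the rest of
    // the input.
    static List<String> readAll(String input) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new StringReader(input))) {
            String str;
            while ((str = reader.readLine()) != null) {
                if (str.isEmpty()) {
                    continue; // NOT return
                }
                lines.add(str);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return lines;
    }
}
```

With "return" instead of "continue", everything after the first empty line would be silently lost, which matches the missing-values symptom described in the thread.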
>>
>> -Matthias
>>
>> On 06/05/2015 11:16 AM, Michail Toutoudakis wrote:
>>
>> Sorry, wrong code posted; that was the bolt's code. The spout's code is:
>>
>> package tuc.LSH.storm.spouts;
>>
>> import backtype.storm.spout.SpoutOutputCollector;
>> import backtype.storm.task.TopologyContext;
>> import backtype.storm.topology.IRichSpout;
>> import backtype.storm.topology.OutputFieldsDeclarer;
>> import backtype.storm.topology.base.BaseRichSpout;
>> import backtype.storm.tuple.Fields;
>> import backtype.storm.tuple.Values;
>> import backtype.storm.utils.Utils;
>> import tuc.LSH.conf.Consts;
>>
>> import java.io.BufferedReader;
>> import java.io.FileNotFoundException;
>> import java.io.FileReader;
>> import java.io.IOException;
>> import java.util.Map;
>>
>> /**
>> * Created by mixtou on 15/5/15.
>> */
>> //public class FileReaderSpout extends BaseRichSpout {
>> public class FileReaderSpout implements IRichSpout {
>>
>>    private SpoutOutputCollector collector;
>>    private FileReader fileReader;
>>    private boolean completed;
>>    private TopologyContext context;
>>    private int spout_idx;
>>    private int spout_id;
>>    private Map config;
>>    private int noOfFailedWords;
>>    private int noOfAckedWords;
>> //    private BufferedReader reader;
>>
>>
>>    @Override
>>    public void declareOutputFields(OutputFieldsDeclarer
>> outputFieldsDeclarer) {
>>        outputFieldsDeclarer.declareStream("data", new Fields("streamId",
>> "timestamp", "value"));
>>
>>
>>    }
>>
>>    @Override
>>    public void open(Map config, TopologyContext topologyContext,
>> SpoutOutputCollector spoutOutputCollector) {
>>        this.context = topologyContext;
>>        this.spout_idx = context.getThisTaskIndex();
>>        this.spout_id = context.getThisTaskId();
>>        this.collector = spoutOutputCollector;
>>        this.config = config;
>>        this.completed = false;
>>        this.noOfFailedWords = 0;
>>        this.noOfAckedWords = 0;
>>
>>        try {
>>            System.err.println("Reading File: " +
>> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>>            this.fileReader = new
>> FileReader(config.get(file_to_read()).toString());
>> //            this.reader = new BufferedReader(fileReader);
>>
>>        } catch (FileNotFoundException e) {
>>            throw new RuntimeException("Error reading file [" +
>> config.get(file_to_read()) + "]");
>>        }
>>
>>    }
>>
>>    @Override
>>    public void nextTuple() {
>>
>>        if (this.completed) {
>>            try {
>>                Thread.sleep(1);
>>            } catch (InterruptedException e) {
>>                e.printStackTrace();
>>            }
>>        }
>>
>>        try {
>>
>>            String str;
>>            BufferedReader reader = new BufferedReader(fileReader);
>>
>>            while ((str = reader.readLine()) != null) {
>>
>>                if (str.isEmpty()) {
>>                    return;
>>                }
>>                String[] temp = str.split(",");
>> //                    System.err.println("============== "+temp[0] + " +
>> " + temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
>>                collector.emit("data", new Values(temp[0], temp[2],
>> temp[3]), temp[0]); //emmit the correct data to next bolt without guarantee
>> delivery
>>
>>            }
>>
>>        } catch (Exception e) {
>> //            System.err.println("Error Reading Tuple: " + e);
>>            throw new RuntimeException("Error Reading Tuple ", e);
>>        } finally {
>>            System.err.println("Finished Reading File. Message From Spout:
>> " + spout_idx);
>>            this.completed = true;
>>
>>        }
>>
>>    }
>>
>>    private String file_to_read() {
>> //        this.spout_id = context.getThisTaskId();
>>        if (Consts.NO_OF_SPOUTS > 1) {
>>            int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>>            return "data" + file_no;
>>        } else {
>>            return "data";
>>        }
>>    }
>>
>>    @Override
>>    public void ack(Object msgId) {
>> //        super.ack(msgId);
>>        noOfAckedWords++;
>> //        System.out.println("OK tuple acked from bolt: " + msgId+" no of
>> acked word "+noOfAckedWords);
>>    }
>>
>>    @Override
>>    public void fail(Object msgId) {
>> //        super.fail(msgId);
>>        noOfFailedWords++;
>>        System.err.println("ERROR: " + context.getThisComponentId() + " "
>> + msgId + " no of words failed " + noOfFailedWords);
>>
>>    }
>>
>>    @Override
>>    public Map<String, Object> getComponentConfiguration() {
>>        return null;
>>    }
>>
>>    @Override
>>    public void close() {
>>        try {
>>            fileReader.close();
>>        } catch (IOException e) {
>>            e.printStackTrace();
>>        }
>>
>>    }
>>
>>    @Override
>>    public void activate() {
>>
>>    }
>>
>>    @Override
>>    public void deactivate() {
>>
>>    }
>> }
>>
>>
>>
>>
>> On 5 Jun 2015, at 12:12, Michail Toutoudakis <[email protected]>
>> wrote:
>>
>> I tried your solution but nothing happened. I still continue to lose
>> many values. However, I found a file reader spout implementation that
>> worked without losing a single value. Below is the code for anyone
>> who might have the same problem:
>>
>> package tuc.LSH.storm.bolts;
>>
>> import backtype.storm.task.OutputCollector;
>> import backtype.storm.task.TopologyContext;
>> import backtype.storm.topology.OutputFieldsDeclarer;
>> import backtype.storm.topology.base.BaseRichBolt;
>> import backtype.storm.tuple.Fields;
>> import backtype.storm.tuple.Tuple;
>> import backtype.storm.tuple.Values;
>> import sun.jvm.hotspot.runtime.*;
>> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
>> import tuc.LSH.core.timeseries.UniversalBasicWindow;
>>
>>
>> import java.lang.Thread;
>> import java.util.*;
>>
>> /**
>> * Created by mixtou on 17/5/15.
>> */
>> public class LSHBolt extends BaseRichBolt {
>>    private int task_id;
>>    private OutputCollector collector;
>>    private UniversalBasicWindow universalBasicWindow;
>>
>>    private String streamId;
>>    private String time;
>>    private Float value;
>>
>>    @Override
>>    public void prepare(Map conf, TopologyContext topologyContext,
>> OutputCollector outputCollector) {
>>        this.task_id = topologyContext.getThisTaskIndex();
>>        this.collector = outputCollector;
>>        this.universalBasicWindow = new UniversalBasicWindow();
>>        streamId = null;
>>        time = null;
>>        value = 0f;
>>        System.err.println("New Bolt with id: " + task_id);
>>
>>    }
>>
>>    @Override
>>    public void execute(Tuple tuple) {
>>
>>        if (tuple.getSourceStreamId().equals("sync")) {
>>            System.out.println("Bolt task id " + task_id + " received from
>> " + tuple.getSourceComponent() + " message " + tuple.getString(0));
>>            System.out.println("Normalizing: Basic Window of Bolt " +
>> task_id);
>>            universalBasicWindow.normalize(); //fill the rest of the
>> streams with last received value to make them same size
>>            universalBasicWindow = null;
>>            universalBasicWindow = new UniversalBasicWindow();
>>
>>        }
>>
>>        if (tuple.getSourceStreamId().equals("data")) {
>>
>>            streamId = tuple.getStringByField("streamId");
>>            time = tuple.getStringByField("timestamp");
>>            value = Float.parseFloat(tuple.getStringByField("value"));
>>
>>            universalBasicWindow.pushStream(streamId, value);
>>
>>            if (universalBasicWindow.isFull(task_id)) { //check if any
>> stream of the window is full
>>
>> //                System.out.println("Univ. Basic Window of bolt " +
>> task_id + " is Filled Up");
>>
>>                collector.emit("bwFilled", new Values(task_id));
>>
>> //                universalBasicWindow.normalize();
>> //                universalBasicWindow = new UniversalBasicWindow();
>>
>> //                TODO:: Add basic window to sliding window and clear
>>
>>            }
>>
>>        }
>>
>>
>> //        System.err.println("SourceComponent:
>> "+tuple.getSourceComponent());
>> //        System.err.println("SourceStreamId:
>> "+tuple.getSourceStreamId());
>> //        System.err.println("Source Task: "+tuple.getSourceTask());
>> //        System.err.println("SourceGlobalStreamId:
>> "+tuple.getSourceGlobalStreamid());
>> //        System.err.println("MessageId: "+tuple.getMessageId());
>>
>>        collector.ack(tuple);
>>    }
>>
>>    @Override
>>    public void declareOutputFields(OutputFieldsDeclarer
>> outputFieldsDeclarer) {
>>        outputFieldsDeclarer.declareStream("bwFilled", new
>> Fields("task_id"));
>>
>>    }
>> }
>>
>> On 4 Jun 2015, at 20:29, Matthias J. Sax <[email protected]>
>> wrote:
>>
>> Hi,
>>
>> You are looping within "nextTuple()" to emit a tuple for each line of
>> the whole file. This is bad practice because the spout cannot process
>> acks while "nextTuple()" is executing. I guess that is the reason why
>> your tuples time out and fail.
>>
>> You should return from "nextTuple()" after emitting a single record.
>> This should solve your problem, as the Spout can then process incoming
>> acks between two calls to "nextTuple()". Just instantiate your
>> BufferedReader in "open()" and close it once "this.completed" is true.
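The suggested structure (reader created once as a field, one record emitted per call, reader closed when done) can be sketched in plain Java, with the collector replaced by a List and an in-memory reader standing in for the file; the names are illustrative:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Sketch of the recommended spout shape: the reader is a field (created once,
// as it would be in open()), nextTuple() emits exactly one record per call,
// and the reader is closed once the input is exhausted.
class OneRecordPerCall {
    private final BufferedReader reader;
    private boolean completed = false;
    private final List<String> emitted = new ArrayList<>(); // stands in for the collector

    OneRecordPerCall(String input) {
        this.reader = new BufferedReader(new StringReader(input));
    }

    void nextTuple() {
        if (completed) {
            return; // nothing left to do; the framework keeps calling anyway
        }
        try {
            String line = reader.readLine();
            if (line == null) {
                completed = true;
                reader.close();
                return;
            }
            emitted.add(line); // emit one record, then return
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    List<String> emitted() {
        return emitted;
    }
}
```

Repeated calls after the input is exhausted are harmless: the completed flag short-circuits them, mirroring how a real spout would simply emit nothing.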
>>
>> Hope this helps.
>>
>>
>> -Matthias
>>
>>
>> On 06/04/2015 07:01 PM, Michail Toutoudakis wrote:
>>
>> I am using a spout to read data from a text file and send the lines to
>> a bolt. However, I am losing too many values; they fail.
>> I am new to Storm and I don't know where to look to debug this issue.
>> Any ideas??
>>
>> My Storm Spout code is:
>>
>> package tuc.LSH.storm.spouts;
>>
>> import backtype.storm.spout.SpoutOutputCollector;
>> import backtype.storm.task.TopologyContext;
>> import backtype.storm.topology.OutputFieldsDeclarer;
>> import backtype.storm.topology.base.BaseRichSpout;
>> import backtype.storm.tuple.Fields;
>> import backtype.storm.tuple.Values;
>> import backtype.storm.utils.Utils;
>> import tuc.LSH.conf.Consts;
>>
>> import java.io.BufferedReader;
>> import java.io.FileNotFoundException;
>> import java.io.FileReader;
>> import java.util.Map;
>>
>> /**
>> * Created by mixtou on 15/5/15.
>> */
>> public class FileReaderSpout extends BaseRichSpout {
>>
>>   private SpoutOutputCollector collector;
>>   private FileReader fileReader;
>>   private boolean completed;
>>   private TopologyContext context;
>>   private int spout_idx;
>>   private int spout_id;
>>   private Map config;
>>   private int noOfFailedWords;
>>   private int noOfAckedWords;
>>
>>
>>   @Override
>>   public void declareOutputFields(OutputFieldsDeclarer
>> outputFieldsDeclarer) {
>>       outputFieldsDeclarer.declareStream("data",
>> new Fields("streamId", "timestamp", "value"));
>>
>>
>>   }
>>
>>   @Override
>>   public void open(Map config, TopologyContext
>> topologyContext, SpoutOutputCollector spoutOutputCollector) {
>>       this.context = topologyContext;
>>       this.spout_idx = context.getThisTaskIndex();
>>       this.spout_id = context.getThisTaskId();
>>       this.collector = spoutOutputCollector;
>>       this.config = config;
>>       this.completed = false;
>>       this.noOfFailedWords = 0;
>>       this.noOfAckedWords = 0;
>>
>>       try {
>>           System.err.println("Reading File: " +
>> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>>
>> this.fileReader = new FileReader(config.get(file_to_read()).toString());
>>       } catch (FileNotFoundException e) {
>>           throw new RuntimeException("Error reading file [" +
>> config.get(file_to_read()) + "]");
>>       }
>>
>>   }
>>
>>   @Override
>>   public void nextTuple() {
>>
>>       if (this.completed) {
>>           return;
>>       }
>>
>>       try {
>>
>>           String str;
>>           BufferedReader reader = new BufferedReader(fileReader);
>>
>>           while ((str = reader.readLine()) != null) {
>>
>>               if (str.isEmpty()) {
>>                   return;
>>               }
>>               String[] temp = str.split(",");
>> //                    System.err.println("============== "+temp[0] + " +
>> " + temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
>>
>>               collector.emit("data",
>> new Values(temp[0], temp[2], temp[3]), temp[0]); //emmit the correct
>> data to next bolt with guarantee delivery
>>
>>           }
>>
>>       } catch (Exception e) {
>>           System.err.println("Error Reading Tuple: " + e);
>>           throw new RuntimeException("Error Reading Tuple ", e);
>>       } finally {
>>           System.err.println("Finished Reading File. Message From
>> Spout: " + spout_idx);
>>           this.completed = true;
>>
>>       }
>>
>>       Utils.sleep(100);
>>   }
>>
>>   private String file_to_read() {
>> //        this.spout_id = context.getThisTaskId();
>>       if (Consts.NO_OF_SPOUTS > 1) {
>>           int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>>           return "data" + file_no;
>>       } else {
>>           return "data";
>>       }
>>   }
>>
>>   @Override
>>   public void ack(Object msgId) {
>>       super.ack(msgId);
>>       noOfAckedWords++;
>> //        System.out.println("OK tuple acked from bolt: " + msgId+" no
>> of acked word "+noOfAckedWords);
>>   }
>>
>>   @Override
>>   public void fail(Object msgId) {
>>       super.fail(msgId);
>>       noOfFailedWords++;
>>       System.err.println("ERROR: " + context.getThisComponentId() + "
>> " + msgId + " no of words failed " + noOfFailedWords);
>>
>>   }
>> }
>>
>> And my bolt looks like this:
>>
>> package tuc.LSH.storm.bolts;
>>
>> import backtype.storm.task.OutputCollector;
>> import backtype.storm.task.TopologyContext;
>> import backtype.storm.topology.OutputFieldsDeclarer;
>> import backtype.storm.topology.base.BaseRichBolt;
>> import backtype.storm.tuple.Fields;
>> import backtype.storm.tuple.Tuple;
>> import backtype.storm.tuple.Values;
>> import sun.jvm.hotspot.runtime.*;
>> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
>> import tuc.LSH.core.timeseries.UniversalBasicWindow;
>>
>>
>> import java.lang.Thread;
>> import java.util.*;
>>
>> /**
>> * Created by mixtou on 17/5/15.
>> */
>> public class LSHBolt extends BaseRichBolt {
>>   private int task_id;
>>   private OutputCollector collector;
>>   private UniversalBasicWindow universalBasicWindow;
>>
>>   private String streamId;
>>   private String time;
>>   private Float value;
>>
>>   @Override
>>   public void prepare(Map conf, TopologyContext topologyContext,
>> OutputCollector outputCollector) {
>>       this.task_id = topologyContext.getThisTaskIndex();
>>       this.collector = outputCollector;
>>       this.universalBasicWindow = new UniversalBasicWindow();
>>       streamId = null;
>>       time = null;
>>       value = 0f;
>>       System.err.println("New Bolt with id: " + task_id);
>>
>>   }
>>
>>   @Override
>>   public void execute(Tuple tuple) {
>>
>>       if (tuple.getSourceStreamId().equals("sync")) {
>>           System.out.println("Bolt task id " + task_id + " received
>> from " + tuple.getSourceComponent() + " message " + tuple.getString(0));
>>           System.out.println("Normalizing: Basic Window of Bolt " +
>> task_id);
>>           universalBasicWindow.normalize(); //fill the rest of the
>> streams with last received value to make them same size
>>           universalBasicWindow = null;
>>           universalBasicWindow = new UniversalBasicWindow();
>>
>>           collector.ack(tuple);
>>       }
>>
>>       if (tuple.getSourceStreamId().equals("data")) {
>>
>>           streamId = tuple.getStringByField("streamId");
>>           time = tuple.getStringByField("timestamp");
>>           value = Float.parseFloat(tuple.getStringByField("value"));
>>
>>           universalBasicWindow.pushStream(streamId, value);
>>
>>           collector.ack(tuple);
>>
>>           if (universalBasicWindow.isFull(task_id)) { //check if
>> any stream of the window is full
>>
>> //                System.out.println("Univ. Basic Window of bolt " +
>> task_id + " is Filled Up");
>>
>>               collector.emit("bwFilled", new Values(task_id));
>>
>> //                universalBasicWindow.normalize();
>> //                universalBasicWindow = new UniversalBasicWindow();
>>
>> //                TODO:: Add basic window to sliding window and clear
>>
>>           }
>>
>>       }
>>
>>
>> //        collector.ack(tuple);
>>   }
>>
>>   @Override
>>   public void declareOutputFields(OutputFieldsDeclarer
>> outputFieldsDeclarer) {
>>       outputFieldsDeclarer.declareStream("bwFilled", new
>> Fields("task_id"));
>>
>>   }
>> }
>>
>> Any Ideas??
>>
>>
>>
>
