Sorry for my stupid question, but what do you mean by "return from each emit"? Should I add `return;` after emitting a tuple, or is it something else I haven't understood? Could you post a code sample?
> On 5 Ιουν 2015, at 14:44, Matthias J. Sax <[email protected]> > wrote: > > Hi, > > I don't see big difference in both Spout implementations... They look > basically the same... I would still recommend, to return from > nextTuple() after each emit. > > If you don't want to return each time, another reason for missing values > might be the return statement in the while-loop: > >> if (str.isEmpty()) { >> return; >> } > > This should be "continue" -- otherwise you return from nextTuple() > without processing the whole file, if an empty line is in between. > > > -Matthias > > On 06/05/2015 11:16 AM, Michail Toutoudakis wrote: >> Sorry wrong code posted that was the bolts' code. The spout’ s code is: >> >> package tuc.LSH.storm.spouts; >> >> import backtype.storm.spout.SpoutOutputCollector; >> import backtype.storm.task.TopologyContext; >> import backtype.storm.topology.IRichSpout; >> import backtype.storm.topology.OutputFieldsDeclarer; >> import backtype.storm.topology.base.BaseRichSpout; >> import backtype.storm.tuple.Fields; >> import backtype.storm.tuple.Values; >> import backtype.storm.utils.Utils; >> import tuc.LSH.conf.Consts; >> >> import java.io.BufferedReader; >> import java.io.FileNotFoundException; >> import java.io.FileReader; >> import java.io.IOException; >> import java.util.Map; >> >> /** >> * Created by mixtou on 15/5/15. 
>> */ >> //public class FileReaderSpout extends BaseRichSpout { >> public class FileReaderSpout implements IRichSpout { >> >> private SpoutOutputCollector collector; >> private FileReader fileReader; >> private boolean completed; >> private TopologyContext context; >> private int spout_idx; >> private int spout_id; >> private Map config; >> private int noOfFailedWords; >> private int noOfAckedWords; >> // private BufferedReader reader; >> >> >> @Override >> public void declareOutputFields(OutputFieldsDeclarer >> outputFieldsDeclarer) { >> outputFieldsDeclarer.declareStream("data", new Fields("streamId", >> "timestamp", "value")); >> >> >> } >> >> @Override >> public void open(Map config, TopologyContext topologyContext, >> SpoutOutputCollector spoutOutputCollector) { >> this.context = topologyContext; >> this.spout_idx = context.getThisTaskIndex(); >> this.spout_id = context.getThisTaskId(); >> this.collector = spoutOutputCollector; >> this.config = config; >> this.completed = false; >> this.noOfFailedWords = 0; >> this.noOfAckedWords = 0; >> >> try { >> System.err.println("Reading File: " + >> config.get(file_to_read()).toString() + " Spout index: " + spout_idx); >> this.fileReader = new >> FileReader(config.get(file_to_read()).toString()); >> // this.reader = new BufferedReader(fileReader); >> >> } catch (FileNotFoundException e) { >> throw new RuntimeException("Error reading file [" + >> config.get(file_to_read()) + "]"); >> } >> >> } >> >> @Override >> public void nextTuple() { >> >> if (this.completed) { >> try { >> Thread.sleep(1); >> } catch (InterruptedException e) { >> e.printStackTrace(); >> } >> } >> >> try { >> >> String str; >> BufferedReader reader = new BufferedReader(fileReader); >> >> while ((str = reader.readLine()) != null) { >> >> if (str.isEmpty()) { >> return; >> } >> String[] temp = str.split(","); >> // System.err.println("============== "+temp[0] + " + " + >> temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value >> collector.emit("data", 
new Values(temp[0], temp[2], temp[3]), >> temp[0]); //emmit the correct data to next bolt without guarantee delivery >> >> } >> >> } catch (Exception e) { >> // System.err.println("Error Reading Tuple: " + e); >> throw new RuntimeException("Error Reading Tuple ", e); >> } finally { >> System.err.println("Finished Reading File. Message From Spout: " >> + spout_idx); >> this.completed = true; >> >> } >> >> } >> >> private String file_to_read() { >> // this.spout_id = context.getThisTaskId(); >> if (Consts.NO_OF_SPOUTS > 1) { >> int file_no = spout_idx % Consts.NO_OF_SPOUTS; >> return "data" + file_no; >> } else { >> return "data"; >> } >> } >> >> @Override >> public void ack(Object msgId) { >> // super.ack(msgId); >> noOfAckedWords++; >> // System.out.println("OK tuple acked from bolt: " + msgId+" no of >> acked word "+noOfAckedWords); >> } >> >> @Override >> public void fail(Object msgId) { >> // super.fail(msgId); >> noOfFailedWords++; >> System.err.println("ERROR: " + context.getThisComponentId() + " " + >> msgId + " no of words failed " + noOfFailedWords); >> >> } >> >> @Override >> public Map<String, Object> getComponentConfiguration() { >> return null; >> } >> >> @Override >> public void close() { >> try { >> fileReader.close(); >> } catch (IOException e) { >> e.printStackTrace(); >> } >> >> } >> >> @Override >> public void activate() { >> >> } >> >> @Override >> public void deactivate() { >> >> } >> } >> >> >> >> >>> On 5 Ιουν 2015, at 12:12, Michail Toutoudakis <[email protected] >>> <mailto:[email protected] <mailto:[email protected]>>> wrote: >>> >>> I tried your solution but nothing happened. I still continue to loose >>> many values. However i found a file reader spout implementation that >>> worked without loosing a single value. 
Bellow is the code for anyone >>> that might have the same problem: >>> >>> package tuc.LSH.storm.bolts; >>> >>> import backtype.storm.task.OutputCollector; >>> import backtype.storm.task.TopologyContext; >>> import backtype.storm.topology.OutputFieldsDeclarer; >>> import backtype.storm.topology.base.BaseRichBolt; >>> import backtype.storm.tuple.Fields; >>> import backtype.storm.tuple.Tuple; >>> import backtype.storm.tuple.Values; >>> import sun.jvm.hotspot.runtime.*; >>> import tuc.LSH.core.hashfunctions.HashFunctionsGen; >>> import tuc.LSH.core.timeseries.UniversalBasicWindow; >>> >>> >>> import java.lang.Thread; >>> import java.util.*; >>> >>> /** >>> * Created by mixtou on 17/5/15. >>> */ >>> public class LSHBolt extends BaseRichBolt { >>> private int task_id; >>> private OutputCollector collector; >>> private UniversalBasicWindow universalBasicWindow; >>> >>> private String streamId; >>> private String time; >>> private Float value; >>> >>> @Override >>> public void prepare(Map conf, TopologyContext topologyContext, >>> OutputCollector outputCollector) { >>> this.task_id = topologyContext.getThisTaskIndex(); >>> this.collector = outputCollector; >>> this.universalBasicWindow = new UniversalBasicWindow(); >>> streamId = null; >>> time = null; >>> value = 0f; >>> System.err.println("New Bolt with id: " + task_id); >>> >>> } >>> >>> @Override >>> public void execute(Tuple tuple) { >>> >>> if (tuple.getSourceStreamId().equals("sync")) { >>> System.out.println("Bolt task id " + task_id + " received from " >>> + tuple.getSourceComponent() + " message " + tuple.getString(0)); >>> System.out.println("Normalizing: Basic Window of Bolt " + >>> task_id); >>> universalBasicWindow.normalize(); //fill the rest of the streams >>> with last received value to make them same size >>> universalBasicWindow = null; >>> universalBasicWindow = new UniversalBasicWindow(); >>> >>> } >>> >>> if (tuple.getSourceStreamId().equals("data")) { >>> >>> streamId = 
tuple.getStringByField("streamId"); >>> time = tuple.getStringByField("timestamp"); >>> value = Float.parseFloat(tuple.getStringByField("value")); >>> >>> universalBasicWindow.pushStream(streamId, value); >>> >>> if (universalBasicWindow.isFull(task_id)) { //check if any >>> stream of the window is full >>> >>> // System.out.println("Univ. Basic Window of bolt " + >>> task_id + " is Filled Up"); >>> >>> collector.emit("bwFilled", new Values(task_id)); >>> >>> // universalBasicWindow.normalize(); >>> // universalBasicWindow = new UniversalBasicWindow(); >>> >>> // TODO:: Add basic window to sliding window and clear >>> >>> } >>> >>> } >>> >>> >>> // System.err.println("SourceComponent: >>> "+tuple.getSourceComponent()); >>> // System.err.println("SourceStreamId: "+tuple.getSourceStreamId()); >>> // System.err.println("Source Task: "+tuple.getSourceTask()); >>> // System.err.println("SourceGlobalStreamId: >>> "+tuple.getSourceGlobalStreamid()); >>> // System.err.println("MessageId: "+tuple.getMessageId()); >>> >>> collector.ack(tuple); >>> } >>> >>> @Override >>> public void declareOutputFields(OutputFieldsDeclarer >>> outputFieldsDeclarer) { >>> outputFieldsDeclarer.declareStream("bwFilled", new >>> Fields("task_id")); >>> >>> } >>> } >>> >>>> On 4 Ιουν 2015, at 20:29, Matthias J. Sax >>>> <[email protected] <mailto:[email protected]> >>>> <mailto:[email protected] >>>> <mailto:[email protected]>>> wrote: >>>> >>>> Hi, >>>> >>>> You are looping within "nextTuple()" to emit a tuple for each lines for >>>> the whole file. This is "bad practice" because the spout is prevented to >>>> take "acks" while "nextTuple()" is executing. I guess, that is the >>>> reason why your tuples time out and fail. >>>> >>>> You should return from "nextTuple()" after emitting a single record. >>>> This should solve your problem as the Spout can process incoming acks >>>> between to calls to "nextTuple()". 
Just instantiate your BufferedReader >>>> in "open()" and close it if "this.completed" is true. >>>> >>>> Hope this helps. >>>> >>>> >>>> -Matthias >>>> >>>> >>>> On 06/04/2015 07:01 PM, Michail Toutoudakis wrote: >>>>> I am using a bolt to read data from a text file, and send them to a >>>>> bolt. However i am loosing too many values, the Fail. >>>>> I am newbie to storm and i don’t know where to look to debug this issue. >>>>> Any ideas?? >>>>> >>>>> My Storm Spout code is: >>>>> >>>>> package tuc.LSH.storm.spouts; >>>>> >>>>> import backtype.storm.spout.SpoutOutputCollector; >>>>> import backtype.storm.task.TopologyContext; >>>>> import backtype.storm.topology.OutputFieldsDeclarer; >>>>> import backtype.storm.topology.base.BaseRichSpout; >>>>> import backtype.storm.tuple.Fields; >>>>> import backtype.storm.tuple.Values; >>>>> import backtype.storm.utils.Utils; >>>>> import tuc.LSH.conf.Consts; >>>>> >>>>> import java.io.BufferedReader; >>>>> import java.io.FileNotFoundException; >>>>> import java.io.FileReader; >>>>> import java.util.Map; >>>>> >>>>> /** >>>>> * Created by mixtou on 15/5/15. 
>>>>> */ >>>>> public class FileReaderSpout extends BaseRichSpout { >>>>> >>>>> private SpoutOutputCollector collector; >>>>> private FileReader fileReader; >>>>> private boolean completed; >>>>> private TopologyContext context; >>>>> private int spout_idx; >>>>> private int spout_id; >>>>> private Map config; >>>>> private int noOfFailedWords; >>>>> private int noOfAckedWords; >>>>> >>>>> >>>>> @Override >>>>> public void declareOutputFields(OutputFieldsDeclarer >>>>> outputFieldsDeclarer) { >>>>> outputFieldsDeclarer.declareStream("data", >>>>> new Fields("streamId", "timestamp", "value")); >>>>> >>>>> >>>>> } >>>>> >>>>> @Override >>>>> public void open(Map config, TopologyContext >>>>> topologyContext, SpoutOutputCollector spoutOutputCollector) { >>>>> this.context = topologyContext; >>>>> this.spout_idx = context.getThisTaskIndex(); >>>>> this.spout_id = context.getThisTaskId(); >>>>> this.collector = spoutOutputCollector; >>>>> this.config = config; >>>>> this.completed = false; >>>>> this.noOfFailedWords = 0; >>>>> this.noOfAckedWords = 0; >>>>> >>>>> try { >>>>> System.err.println("Reading File: " + >>>>> config.get(file_to_read()).toString() + " Spout index: " + spout_idx); >>>>> >>>>> this.fileReader = new FileReader(config.get(file_to_read()).toString()); >>>>> } catch (FileNotFoundException e) { >>>>> throw new RuntimeException("Error reading file [" + >>>>> config.get(file_to_read()) + "]"); >>>>> } >>>>> >>>>> } >>>>> >>>>> @Override >>>>> public void nextTuple() { >>>>> >>>>> if (this.completed) { >>>>> return; >>>>> } >>>>> >>>>> try { >>>>> >>>>> String str; >>>>> BufferedReader reader = new BufferedReader(fileReader); >>>>> >>>>> while ((str = reader.readLine()) != null) { >>>>> >>>>> if (str.isEmpty()) { >>>>> return; >>>>> } >>>>> String[] temp = str.split(","); >>>>> // System.err.println("============== "+temp[0] + " + >>>>> " + temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value >>>>> >>>>> collector.emit("data", >>>>> new Values(temp[0], 
temp[2], temp[3]), temp[0]); //emmit the correct >>>>> data to next bolt with guarantee delivery >>>>> >>>>> } >>>>> >>>>> } catch (Exception e) { >>>>> System.err.println("Error Reading Tuple: " + e); >>>>> throw new RuntimeException("Error Reading Tuple ", e); >>>>> } finally { >>>>> System.err.println("Finished Reading File. Message From >>>>> Spout: " + spout_idx); >>>>> this.completed = true; >>>>> >>>>> } >>>>> >>>>> Utils.sleep(100); >>>>> } >>>>> >>>>> private String file_to_read() { >>>>> // this.spout_id = context.getThisTaskId(); >>>>> if (Consts.NO_OF_SPOUTS > 1) { >>>>> int file_no = spout_idx % Consts.NO_OF_SPOUTS; >>>>> return "data" + file_no; >>>>> } else { >>>>> return "data"; >>>>> } >>>>> } >>>>> >>>>> @Override >>>>> public void ack(Object msgId) { >>>>> super.ack(msgId); >>>>> noOfAckedWords++; >>>>> // System.out.println("OK tuple acked from bolt: " + msgId+" no >>>>> of acked word "+noOfAckedWords); >>>>> } >>>>> >>>>> @Override >>>>> public void fail(Object msgId) { >>>>> super.fail(msgId); >>>>> noOfFailedWords++; >>>>> System.err.println("ERROR: " + context.getThisComponentId() + " >>>>> " + msgId + " no of words failed " + noOfFailedWords); >>>>> >>>>> } >>>>> } >>>>> >>>>> And my bolt looks like this: >>>>> >>>>> package tuc.LSH.storm.bolts; >>>>> >>>>> import backtype.storm.task.OutputCollector; >>>>> import backtype.storm.task.TopologyContext; >>>>> import backtype.storm.topology.OutputFieldsDeclarer; >>>>> import backtype.storm.topology.base.BaseRichBolt; >>>>> import backtype.storm.tuple.Fields; >>>>> import backtype.storm.tuple.Tuple; >>>>> import backtype.storm.tuple.Values; >>>>> import sun.jvm.hotspot.runtime.*; >>>>> import tuc.LSH.core.hashfunctions.HashFunctionsGen; >>>>> import tuc.LSH.core.timeseries.UniversalBasicWindow; >>>>> >>>>> >>>>> import java.lang.Thread; >>>>> import java.util.*; >>>>> >>>>> /** >>>>> * Created by mixtou on 17/5/15. 
>>>>> */ >>>>> public class LSHBolt extends BaseRichBolt { >>>>> private int task_id; >>>>> private OutputCollector collector; >>>>> private UniversalBasicWindow universalBasicWindow; >>>>> >>>>> private String streamId; >>>>> private String time; >>>>> private Float value; >>>>> >>>>> @Override >>>>> public void prepare(Map conf, TopologyContext topologyContext, >>>>> OutputCollector outputCollector) { >>>>> this.task_id = topologyContext.getThisTaskIndex(); >>>>> this.collector = outputCollector; >>>>> this.universalBasicWindow = new UniversalBasicWindow(); >>>>> streamId = null; >>>>> time = null; >>>>> value = 0f; >>>>> System.err.println("New Bolt with id: " + task_id); >>>>> >>>>> } >>>>> >>>>> @Override >>>>> public void execute(Tuple tuple) { >>>>> >>>>> if (tuple.getSourceStreamId().equals("sync")) { >>>>> System.out.println("Bolt task id " + task_id + " received >>>>> from " + tuple.getSourceComponent() + " message " + tuple.getString(0)); >>>>> System.out.println("Normalizing: Basic Window of Bolt " + >>>>> task_id); >>>>> universalBasicWindow.normalize(); //fill the rest of the >>>>> streams with last received value to make them same size >>>>> universalBasicWindow = null; >>>>> universalBasicWindow = new UniversalBasicWindow(); >>>>> >>>>> collector.ack(tuple); >>>>> } >>>>> >>>>> if (tuple.getSourceStreamId().equals("data")) { >>>>> >>>>> streamId = tuple.getStringByField("streamId"); >>>>> time = tuple.getStringByField("timestamp"); >>>>> value = Float.parseFloat(tuple.getStringByField("value")); >>>>> >>>>> universalBasicWindow.pushStream(streamId, value); >>>>> >>>>> collector.ack(tuple); >>>>> >>>>> if (universalBasicWindow.isFull(task_id)) { //check if >>>>> any stream of the window is full >>>>> >>>>> // System.out.println("Univ. 
Basic Window of bolt " + >>>>> task_id + " is Filled Up"); >>>>> >>>>> collector.emit("bwFilled", new Values(task_id)); >>>>> >>>>> // universalBasicWindow.normalize(); >>>>> // universalBasicWindow = new UniversalBasicWindow(); >>>>> >>>>> // TODO:: Add basic window to sliding window and clear >>>>> >>>>> } >>>>> >>>>> } >>>>> >>>>> >>>>> // collector.ack(tuple); >>>>> } >>>>> >>>>> @Override >>>>> public void declareOutputFields(OutputFieldsDeclarer >>>>> outputFieldsDeclarer) { >>>>> outputFieldsDeclarer.declareStream("bwFilled", new >>>>> Fields("task_id")); >>>>> >>>>> } >>>>> } >>>>> >>>>> Any Ideas??
