Thanks a lot — the Scanner seems to work fine without failing any values. Only one addition: I had to put Utils.sleep(1); after each tuple emit.
> On 5 Ιουν 2015, at 16:38, Mike Thomsen <[email protected]> wrote: > > The nextTuple logic should be redone around a java.util.Scanner, not a > BufferedReader. Something like this: > > Scanner scanner; > > void open(...) { > //other code > scanner = new Scanner(new FileInputStream("location_here")); > } > > void nextTuple() { > if (scanner.hasNextLine()) { > collector.emit("data", new Values(scanner.nextLine())); > } > } > > On Fri, Jun 5, 2015 at 8:21 AM, Andrew Xor <[email protected] > <mailto:[email protected]>> wrote: > Your function as it stands emits from nextTuple after the file has been read; > which is against basically every storm principle that's out there. It's not > scalable and will lead to a lot of issues, especially when running on a > cluster. What Matthias is suggesting (and you should do) is that for each > tuple you emit you need to return from nextTuple and call it again to emit > another one. Basically you should have one call for nextTuple per batch/tuple > that you emit, which as per your current implementation is one per tokenized > line read. > > Hope this helps. > > > On Fri, Jun 5, 2015 at 3:04 PM, Michail Toutoudakis <[email protected] > <mailto:[email protected]>> wrote: > Sorry for my stupid question, but what do you mean return from each emit? > Add return; after emiting tuple os something else i haven’t understood? > Could you post a code sample? > >> On 5 Ιουν 2015, at 14:44, Matthias J. Sax <[email protected] >> <mailto:[email protected]>> wrote: >> >> Hi, >> >> I don't see big difference in both Spout implementations... They look >> basically the same... I would still recommend, to return from >> nextTuple() after each emit. 
>> >> If you don't want to return each time, another reason for missing values >> might be the return statement in the while-loop: >> >>> if (str.isEmpty()) { >>> return; >>> } >> >> This should be "continue" -- otherwise you return from nextTuple() >> without processing the whole file, if an empty line is in between. >> >> >> -Matthias >> >> On 06/05/2015 11:16 AM, Michail Toutoudakis wrote: >>> Sorry wrong code posted that was the bolts' code. The spout’ s code is: >>> >>> package tuc.LSH.storm.spouts; >>> >>> import backtype.storm.spout.SpoutOutputCollector; >>> import backtype.storm.task.TopologyContext; >>> import backtype.storm.topology.IRichSpout; >>> import backtype.storm.topology.OutputFieldsDeclarer; >>> import backtype.storm.topology.base.BaseRichSpout; >>> import backtype.storm.tuple.Fields; >>> import backtype.storm.tuple.Values; >>> import backtype.storm.utils.Utils; >>> import tuc.LSH.conf.Consts; >>> >>> import java.io.BufferedReader; >>> import java.io.FileNotFoundException; >>> import java.io.FileReader; >>> import java.io.IOException; >>> import java.util.Map; >>> >>> /** >>> * Created by mixtou on 15/5/15. 
>>> */ >>> //public class FileReaderSpout extends BaseRichSpout { >>> public class FileReaderSpout implements IRichSpout { >>> >>> private SpoutOutputCollector collector; >>> private FileReader fileReader; >>> private boolean completed; >>> private TopologyContext context; >>> private int spout_idx; >>> private int spout_id; >>> private Map config; >>> private int noOfFailedWords; >>> private int noOfAckedWords; >>> // private BufferedReader reader; >>> >>> >>> @Override >>> public void declareOutputFields(OutputFieldsDeclarer >>> outputFieldsDeclarer) { >>> outputFieldsDeclarer.declareStream("data", new Fields("streamId", >>> "timestamp", "value")); >>> >>> >>> } >>> >>> @Override >>> public void open(Map config, TopologyContext topologyContext, >>> SpoutOutputCollector spoutOutputCollector) { >>> this.context = topologyContext; >>> this.spout_idx = context.getThisTaskIndex(); >>> this.spout_id = context.getThisTaskId(); >>> this.collector = spoutOutputCollector; >>> this.config = config; >>> this.completed = false; >>> this.noOfFailedWords = 0; >>> this.noOfAckedWords = 0; >>> >>> try { >>> System.err.println("Reading File: " + >>> config.get(file_to_read()).toString() + " Spout index: " + spout_idx); >>> this.fileReader = new >>> FileReader(config.get(file_to_read()).toString()); >>> // this.reader = new BufferedReader(fileReader); >>> >>> } catch (FileNotFoundException e) { >>> throw new RuntimeException("Error reading file [" + >>> config.get(file_to_read()) + "]"); >>> } >>> >>> } >>> >>> @Override >>> public void nextTuple() { >>> >>> if (this.completed) { >>> try { >>> Thread.sleep(1); >>> } catch (InterruptedException e) { >>> e.printStackTrace(); >>> } >>> } >>> >>> try { >>> >>> String str; >>> BufferedReader reader = new BufferedReader(fileReader); >>> >>> while ((str = reader.readLine()) != null) { >>> >>> if (str.isEmpty()) { >>> return; >>> } >>> String[] temp = str.split(","); >>> // System.err.println("============== "+temp[0] + " + " >>> + temp[2] 
+ " + " + temp[3]); //0-id,2-timestamp,3-value >>> collector.emit("data", new Values(temp[0], temp[2], >>> temp[3]), temp[0]); //emmit the correct data to next bolt without guarantee >>> delivery >>> >>> } >>> >>> } catch (Exception e) { >>> // System.err.println("Error Reading Tuple: " + e); >>> throw new RuntimeException("Error Reading Tuple ", e); >>> } finally { >>> System.err.println("Finished Reading File. Message From Spout: " >>> + spout_idx); >>> this.completed = true; >>> >>> } >>> >>> } >>> >>> private String file_to_read() { >>> // this.spout_id = context.getThisTaskId(); >>> if (Consts.NO_OF_SPOUTS > 1) { >>> int file_no = spout_idx % Consts.NO_OF_SPOUTS; >>> return "data" + file_no; >>> } else { >>> return "data"; >>> } >>> } >>> >>> @Override >>> public void ack(Object msgId) { >>> // super.ack(msgId); >>> noOfAckedWords++; >>> // System.out.println("OK tuple acked from bolt: " + msgId+" no of >>> acked word "+noOfAckedWords); >>> } >>> >>> @Override >>> public void fail(Object msgId) { >>> // super.fail(msgId); >>> noOfFailedWords++; >>> System.err.println("ERROR: " + context.getThisComponentId() + " " + >>> msgId + " no of words failed " + noOfFailedWords); >>> >>> } >>> >>> @Override >>> public Map<String, Object> getComponentConfiguration() { >>> return null; >>> } >>> >>> @Override >>> public void close() { >>> try { >>> fileReader.close(); >>> } catch (IOException e) { >>> e.printStackTrace(); >>> } >>> >>> } >>> >>> @Override >>> public void activate() { >>> >>> } >>> >>> @Override >>> public void deactivate() { >>> >>> } >>> } >>> >>> >>> >>> >>>> On 5 Ιουν 2015, at 12:12, Michail Toutoudakis <[email protected] >>>> <mailto:[email protected]> >>>> <mailto:[email protected] <mailto:[email protected]>>> wrote: >>>> >>>> I tried your solution but nothing happened. I still continue to loose >>>> many values. However i found a file reader spout implementation that >>>> worked without loosing a single value. 
Bellow is the code for anyone >>>> that might have the same problem: >>>> >>>> package tuc.LSH.storm.bolts; >>>> >>>> import backtype.storm.task.OutputCollector; >>>> import backtype.storm.task.TopologyContext; >>>> import backtype.storm.topology.OutputFieldsDeclarer; >>>> import backtype.storm.topology.base.BaseRichBolt; >>>> import backtype.storm.tuple.Fields; >>>> import backtype.storm.tuple.Tuple; >>>> import backtype.storm.tuple.Values; >>>> import sun.jvm.hotspot.runtime.*; >>>> import tuc.LSH.core.hashfunctions.HashFunctionsGen; >>>> import tuc.LSH.core.timeseries.UniversalBasicWindow; >>>> >>>> >>>> import java.lang.Thread; >>>> import java.util.*; >>>> >>>> /** >>>> * Created by mixtou on 17/5/15. >>>> */ >>>> public class LSHBolt extends BaseRichBolt { >>>> private int task_id; >>>> private OutputCollector collector; >>>> private UniversalBasicWindow universalBasicWindow; >>>> >>>> private String streamId; >>>> private String time; >>>> private Float value; >>>> >>>> @Override >>>> public void prepare(Map conf, TopologyContext topologyContext, >>>> OutputCollector outputCollector) { >>>> this.task_id = topologyContext.getThisTaskIndex(); >>>> this.collector = outputCollector; >>>> this.universalBasicWindow = new UniversalBasicWindow(); >>>> streamId = null; >>>> time = null; >>>> value = 0f; >>>> System.err.println("New Bolt with id: " + task_id); >>>> >>>> } >>>> >>>> @Override >>>> public void execute(Tuple tuple) { >>>> >>>> if (tuple.getSourceStreamId().equals("sync")) { >>>> System.out.println("Bolt task id " + task_id + " received from >>>> " + tuple.getSourceComponent() + " message " + tuple.getString(0)); >>>> System.out.println("Normalizing: Basic Window of Bolt " + >>>> task_id); >>>> universalBasicWindow.normalize(); //fill the rest of the >>>> streams with last received value to make them same size >>>> universalBasicWindow = null; >>>> universalBasicWindow = new UniversalBasicWindow(); >>>> >>>> } >>>> >>>> if 
(tuple.getSourceStreamId().equals("data")) { >>>> >>>> streamId = tuple.getStringByField("streamId"); >>>> time = tuple.getStringByField("timestamp"); >>>> value = Float.parseFloat(tuple.getStringByField("value")); >>>> >>>> universalBasicWindow.pushStream(streamId, value); >>>> >>>> if (universalBasicWindow.isFull(task_id)) { //check if any >>>> stream of the window is full >>>> >>>> // System.out.println("Univ. Basic Window of bolt " + >>>> task_id + " is Filled Up"); >>>> >>>> collector.emit("bwFilled", new Values(task_id)); >>>> >>>> // universalBasicWindow.normalize(); >>>> // universalBasicWindow = new UniversalBasicWindow(); >>>> >>>> // TODO:: Add basic window to sliding window and clear >>>> >>>> } >>>> >>>> } >>>> >>>> >>>> // System.err.println("SourceComponent: >>>> "+tuple.getSourceComponent()); >>>> // System.err.println("SourceStreamId: "+tuple.getSourceStreamId()); >>>> // System.err.println("Source Task: "+tuple.getSourceTask()); >>>> // System.err.println("SourceGlobalStreamId: >>>> "+tuple.getSourceGlobalStreamid()); >>>> // System.err.println("MessageId: "+tuple.getMessageId()); >>>> >>>> collector.ack(tuple); >>>> } >>>> >>>> @Override >>>> public void declareOutputFields(OutputFieldsDeclarer >>>> outputFieldsDeclarer) { >>>> outputFieldsDeclarer.declareStream("bwFilled", new >>>> Fields("task_id")); >>>> >>>> } >>>> } >>>> >>>>> On 4 Ιουν 2015, at 20:29, Matthias J. Sax >>>>> <[email protected] <mailto:[email protected]> >>>>> <mailto:[email protected] >>>>> <mailto:[email protected]>>> wrote: >>>>> >>>>> Hi, >>>>> >>>>> You are looping within "nextTuple()" to emit a tuple for each lines for >>>>> the whole file. This is "bad practice" because the spout is prevented to >>>>> take "acks" while "nextTuple()" is executing. I guess, that is the >>>>> reason why your tuples time out and fail. >>>>> >>>>> You should return from "nextTuple()" after emitting a single record. 
>>>>> This should solve your problem as the Spout can process incoming acks >>>>> between to calls to "nextTuple()". Just instantiate your BufferedReader >>>>> in "open()" and close it if "this.completed" is true. >>>>> >>>>> Hope this helps. >>>>> >>>>> >>>>> -Matthias >>>>> >>>>> >>>>> On 06/04/2015 07:01 PM, Michail Toutoudakis wrote: >>>>>> I am using a bolt to read data from a text file, and send them to a >>>>>> bolt. However i am loosing too many values, the Fail. >>>>>> I am newbie to storm and i don’t know where to look to debug this issue. >>>>>> Any ideas?? >>>>>> >>>>>> My Storm Spout code is: >>>>>> >>>>>> package tuc.LSH.storm.spouts; >>>>>> >>>>>> import backtype.storm.spout.SpoutOutputCollector; >>>>>> import backtype.storm.task.TopologyContext; >>>>>> import backtype.storm.topology.OutputFieldsDeclarer; >>>>>> import backtype.storm.topology.base.BaseRichSpout; >>>>>> import backtype.storm.tuple.Fields; >>>>>> import backtype.storm.tuple.Values; >>>>>> import backtype.storm.utils.Utils; >>>>>> import tuc.LSH.conf.Consts; >>>>>> >>>>>> import java.io.BufferedReader; >>>>>> import java.io.FileNotFoundException; >>>>>> import java.io.FileReader; >>>>>> import java.util.Map; >>>>>> >>>>>> /** >>>>>> * Created by mixtou on 15/5/15. 
>>>>>> */ >>>>>> public class FileReaderSpout extends BaseRichSpout { >>>>>> >>>>>> private SpoutOutputCollector collector; >>>>>> private FileReader fileReader; >>>>>> private boolean completed; >>>>>> private TopologyContext context; >>>>>> private int spout_idx; >>>>>> private int spout_id; >>>>>> private Map config; >>>>>> private int noOfFailedWords; >>>>>> private int noOfAckedWords; >>>>>> >>>>>> >>>>>> @Override >>>>>> public void declareOutputFields(OutputFieldsDeclarer >>>>>> outputFieldsDeclarer) { >>>>>> outputFieldsDeclarer.declareStream("data", >>>>>> new Fields("streamId", "timestamp", "value")); >>>>>> >>>>>> >>>>>> } >>>>>> >>>>>> @Override >>>>>> public void open(Map config, TopologyContext >>>>>> topologyContext, SpoutOutputCollector spoutOutputCollector) { >>>>>> this.context = topologyContext; >>>>>> this.spout_idx = context.getThisTaskIndex(); >>>>>> this.spout_id = context.getThisTaskId(); >>>>>> this.collector = spoutOutputCollector; >>>>>> this.config = config; >>>>>> this.completed = false; >>>>>> this.noOfFailedWords = 0; >>>>>> this.noOfAckedWords = 0; >>>>>> >>>>>> try { >>>>>> System.err.println("Reading File: " + >>>>>> config.get(file_to_read()).toString() + " Spout index: " + spout_idx); >>>>>> >>>>>> this.fileReader = new FileReader(config.get(file_to_read()).toString()); >>>>>> } catch (FileNotFoundException e) { >>>>>> throw new RuntimeException("Error reading file [" + >>>>>> config.get(file_to_read()) + "]"); >>>>>> } >>>>>> >>>>>> } >>>>>> >>>>>> @Override >>>>>> public void nextTuple() { >>>>>> >>>>>> if (this.completed) { >>>>>> return; >>>>>> } >>>>>> >>>>>> try { >>>>>> >>>>>> String str; >>>>>> BufferedReader reader = new BufferedReader(fileReader); >>>>>> >>>>>> while ((str = reader.readLine()) != null) { >>>>>> >>>>>> if (str.isEmpty()) { >>>>>> return; >>>>>> } >>>>>> String[] temp = str.split(","); >>>>>> // System.err.println("============== "+temp[0] + " + >>>>>> " + temp[2] + " + " + temp[3]); 
//0-id,2-timestamp,3-value >>>>>> >>>>>> collector.emit("data", >>>>>> new Values(temp[0], temp[2], temp[3]), temp[0]); //emmit the correct >>>>>> data to next bolt with guarantee delivery >>>>>> >>>>>> } >>>>>> >>>>>> } catch (Exception e) { >>>>>> System.err.println("Error Reading Tuple: " + e); >>>>>> throw new RuntimeException("Error Reading Tuple ", e); >>>>>> } finally { >>>>>> System.err.println("Finished Reading File. Message From >>>>>> Spout: " + spout_idx); >>>>>> this.completed = true; >>>>>> >>>>>> } >>>>>> >>>>>> Utils.sleep(100); >>>>>> } >>>>>> >>>>>> private String file_to_read() { >>>>>> // this.spout_id = context.getThisTaskId(); >>>>>> if (Consts.NO_OF_SPOUTS > 1) { >>>>>> int file_no = spout_idx % Consts.NO_OF_SPOUTS; >>>>>> return "data" + file_no; >>>>>> } else { >>>>>> return "data"; >>>>>> } >>>>>> } >>>>>> >>>>>> @Override >>>>>> public void ack(Object msgId) { >>>>>> super.ack(msgId); >>>>>> noOfAckedWords++; >>>>>> // System.out.println("OK tuple acked from bolt: " + msgId+" no >>>>>> of acked word "+noOfAckedWords); >>>>>> } >>>>>> >>>>>> @Override >>>>>> public void fail(Object msgId) { >>>>>> super.fail(msgId); >>>>>> noOfFailedWords++; >>>>>> System.err.println("ERROR: " + context.getThisComponentId() + " >>>>>> " + msgId + " no of words failed " + noOfFailedWords); >>>>>> >>>>>> } >>>>>> } >>>>>> >>>>>> And my bolt looks like this: >>>>>> >>>>>> package tuc.LSH.storm.bolts; >>>>>> >>>>>> import backtype.storm.task.OutputCollector; >>>>>> import backtype.storm.task.TopologyContext; >>>>>> import backtype.storm.topology.OutputFieldsDeclarer; >>>>>> import backtype.storm.topology.base.BaseRichBolt; >>>>>> import backtype.storm.tuple.Fields; >>>>>> import backtype.storm.tuple.Tuple; >>>>>> import backtype.storm.tuple.Values; >>>>>> import sun.jvm.hotspot.runtime.*; >>>>>> import tuc.LSH.core.hashfunctions.HashFunctionsGen; >>>>>> import tuc.LSH.core.timeseries.UniversalBasicWindow; >>>>>> >>>>>> >>>>>> import java.lang.Thread; >>>>>> 
import java.util.*; >>>>>> >>>>>> /** >>>>>> * Created by mixtou on 17/5/15. >>>>>> */ >>>>>> public class LSHBolt extends BaseRichBolt { >>>>>> private int task_id; >>>>>> private OutputCollector collector; >>>>>> private UniversalBasicWindow universalBasicWindow; >>>>>> >>>>>> private String streamId; >>>>>> private String time; >>>>>> private Float value; >>>>>> >>>>>> @Override >>>>>> public void prepare(Map conf, TopologyContext topologyContext, >>>>>> OutputCollector outputCollector) { >>>>>> this.task_id = topologyContext.getThisTaskIndex(); >>>>>> this.collector = outputCollector; >>>>>> this.universalBasicWindow = new UniversalBasicWindow(); >>>>>> streamId = null; >>>>>> time = null; >>>>>> value = 0f; >>>>>> System.err.println("New Bolt with id: " + task_id); >>>>>> >>>>>> } >>>>>> >>>>>> @Override >>>>>> public void execute(Tuple tuple) { >>>>>> >>>>>> if (tuple.getSourceStreamId().equals("sync")) { >>>>>> System.out.println("Bolt task id " + task_id + " received >>>>>> from " + tuple.getSourceComponent() + " message " + tuple.getString(0)); >>>>>> System.out.println("Normalizing: Basic Window of Bolt " + >>>>>> task_id); >>>>>> universalBasicWindow.normalize(); //fill the rest of the >>>>>> streams with last received value to make them same size >>>>>> universalBasicWindow = null; >>>>>> universalBasicWindow = new UniversalBasicWindow(); >>>>>> >>>>>> collector.ack(tuple); >>>>>> } >>>>>> >>>>>> if (tuple.getSourceStreamId().equals("data")) { >>>>>> >>>>>> streamId = tuple.getStringByField("streamId"); >>>>>> time = tuple.getStringByField("timestamp"); >>>>>> value = Float.parseFloat(tuple.getStringByField("value")); >>>>>> >>>>>> universalBasicWindow.pushStream(streamId, value); >>>>>> >>>>>> collector.ack(tuple); >>>>>> >>>>>> if (universalBasicWindow.isFull(task_id)) { //check if >>>>>> any stream of the window is full >>>>>> >>>>>> // System.out.println("Univ. 
Basic Window of bolt " + >>>>>> task_id + " is Filled Up"); >>>>>> >>>>>> collector.emit("bwFilled", new Values(task_id)); >>>>>> >>>>>> // universalBasicWindow.normalize(); >>>>>> // universalBasicWindow = new UniversalBasicWindow(); >>>>>> >>>>>> // TODO:: Add basic window to sliding window and clear >>>>>> >>>>>> } >>>>>> >>>>>> } >>>>>> >>>>>> >>>>>> // collector.ack(tuple); >>>>>> } >>>>>> >>>>>> @Override >>>>>> public void declareOutputFields(OutputFieldsDeclarer >>>>>> outputFieldsDeclarer) { >>>>>> outputFieldsDeclarer.declareStream("bwFilled", new >>>>>> Fields("task_id")); >>>>>> >>>>>> } >>>>>> } >>>>>> >>>>>> Any Ideas?? > > >
