Basically something like this:
open(...) {
BufferedReader reader = new BufferedReader(...)
}
nextTuple() {
if(reader != null) {
String line = reader.readLine();
if(line != null) {
collector.emit(line);
} else {
reader.close();
reader = null;
}
}
}
There is no need to loop within nextTuple() because Storm is calling it
infinitely anyway. Furthermore, there is no need to sleep, because Storm
will apply a sleep-penalty automatically, if .emit(...) is not called
within nextTuple() (see
https://storm.apache.org/2012/09/06/storm081-released.html)
You should also close the reader after the file is processed. Closing
in Spout.close() is not reliable, because Storm does not guarantee that
Spout.close() will be called.
-Matthias
On 06/05/2015 02:04 PM, Michail Toutoudakis wrote:
> Sorry for my stupid question, but what do you mean return from each emit?
> Add return; after emitting the tuple, or something else I haven’t understood?
> Could you post a code sample?
>
>> On 5 Ιουν 2015, at 14:44, Matthias J. Sax
>> <[email protected] <mailto:[email protected]>>
>> wrote:
>>
>> Hi,
>>
>> I don't see big difference in both Spout implementations... They look
>> basically the same... I would still recommend, to return from
>> nextTuple() after each emit.
>>
>> If you don't want to return each time, another reason for missing values
>> might be the return statement in the while-loop:
>>
>>> if (str.isEmpty()) {
>>> return;
>>> }
>>
>> This should be "continue" -- otherwise you return from nextTuple()
>> without processing the whole file, if an empty line is in between.
>>
>>
>> -Matthias
>>
>> On 06/05/2015 11:16 AM, Michail Toutoudakis wrote:
>>> Sorry wrong code posted that was the bolts' code. The spout’ s code is:
>>>
>>> package tuc.LSH.storm.spouts;
>>>
>>> import backtype.storm.spout.SpoutOutputCollector;
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.IRichSpout;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseRichSpout;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Values;
>>> import backtype.storm.utils.Utils;
>>> import tuc.LSH.conf.Consts;
>>>
>>> import java.io.BufferedReader;
>>> import java.io.FileNotFoundException;
>>> import java.io.FileReader;
>>> import java.io.IOException;
>>> import java.util.Map;
>>>
>>> /**
>>> * Created by mixtou on 15/5/15.
>>> */
>>> //public class FileReaderSpout extends BaseRichSpout {
>>> public class FileReaderSpout implements IRichSpout {
>>>
>>> private SpoutOutputCollector collector;
>>> private FileReader fileReader;
>>> private boolean completed;
>>> private TopologyContext context;
>>> private int spout_idx;
>>> private int spout_id;
>>> private Map config;
>>> private int noOfFailedWords;
>>> private int noOfAckedWords;
>>> // private BufferedReader reader;
>>>
>>>
>>> @Override
>>> public void declareOutputFields(OutputFieldsDeclarer
>>> outputFieldsDeclarer) {
>>> outputFieldsDeclarer.declareStream("data", new
>>> Fields("streamId", "timestamp", "value"));
>>>
>>>
>>> }
>>>
>>> @Override
>>> public void open(Map config, TopologyContext topologyContext,
>>> SpoutOutputCollector spoutOutputCollector) {
>>> this.context = topologyContext;
>>> this.spout_idx = context.getThisTaskIndex();
>>> this.spout_id = context.getThisTaskId();
>>> this.collector = spoutOutputCollector;
>>> this.config = config;
>>> this.completed = false;
>>> this.noOfFailedWords = 0;
>>> this.noOfAckedWords = 0;
>>>
>>> try {
>>> System.err.println("Reading File: " +
>>> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>>> this.fileReader = new
>>> FileReader(config.get(file_to_read()).toString());
>>> // this.reader = new BufferedReader(fileReader);
>>>
>>> } catch (FileNotFoundException e) {
>>> throw new RuntimeException("Error reading file [" +
>>> config.get(file_to_read()) + "]");
>>> }
>>>
>>> }
>>>
>>> @Override
>>> public void nextTuple() {
>>>
>>> if (this.completed) {
>>> try {
>>> Thread.sleep(1);
>>> } catch (InterruptedException e) {
>>> e.printStackTrace();
>>> }
>>> }
>>>
>>> try {
>>>
>>> String str;
>>> BufferedReader reader = new BufferedReader(fileReader);
>>>
>>> while ((str = reader.readLine()) != null) {
>>>
>>> if (str.isEmpty()) {
>>> return;
>>> }
>>> String[] temp = str.split(",");
>>> // System.err.println("============== "+temp[0] +
>>> " + " + temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
>>> collector.emit("data", new Values(temp[0], temp[2],
>>> temp[3]), temp[0]); //emmit the correct data to next bolt without
>>> guarantee delivery
>>>
>>> }
>>>
>>> } catch (Exception e) {
>>> // System.err.println("Error Reading Tuple: " + e);
>>> throw new RuntimeException("Error Reading Tuple ", e);
>>> } finally {
>>> System.err.println("Finished Reading File. Message From
>>> Spout: " + spout_idx);
>>> this.completed = true;
>>>
>>> }
>>>
>>> }
>>>
>>> private String file_to_read() {
>>> // this.spout_id = context.getThisTaskId();
>>> if (Consts.NO_OF_SPOUTS > 1) {
>>> int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>>> return "data" + file_no;
>>> } else {
>>> return "data";
>>> }
>>> }
>>>
>>> @Override
>>> public void ack(Object msgId) {
>>> // super.ack(msgId);
>>> noOfAckedWords++;
>>> // System.out.println("OK tuple acked from bolt: " + msgId+"
>>> no of acked word "+noOfAckedWords);
>>> }
>>>
>>> @Override
>>> public void fail(Object msgId) {
>>> // super.fail(msgId);
>>> noOfFailedWords++;
>>> System.err.println("ERROR: " + context.getThisComponentId() +
>>> " " + msgId + " no of words failed " + noOfFailedWords);
>>>
>>> }
>>>
>>> @Override
>>> public Map<String, Object> getComponentConfiguration() {
>>> return null;
>>> }
>>>
>>> @Override
>>> public void close() {
>>> try {
>>> fileReader.close();
>>> } catch (IOException e) {
>>> e.printStackTrace();
>>> }
>>>
>>> }
>>>
>>> @Override
>>> public void activate() {
>>>
>>> }
>>>
>>> @Override
>>> public void deactivate() {
>>>
>>> }
>>> }
>>>
>>>
>>>
>>>
>>>> On 5 Ιουν 2015, at 12:12, Michail Toutoudakis <[email protected]
>>>> <mailto:[email protected]>
>>>> <mailto:[email protected]>> wrote:
>>>>
>>>> I tried your solution but nothing happened. I still continue to lose
>>>> many values. However i found a file reader spout implementation that
>>>> worked without losing a single value. Below is the code for anyone
>>>> that might have the same problem:
>>>>
>>>> package tuc.LSH.storm.bolts;
>>>>
>>>> import backtype.storm.task.OutputCollector;
>>>> import backtype.storm.task.TopologyContext;
>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>> import backtype.storm.topology.base.BaseRichBolt;
>>>> import backtype.storm.tuple.Fields;
>>>> import backtype.storm.tuple.Tuple;
>>>> import backtype.storm.tuple.Values;
>>>> import sun.jvm.hotspot.runtime.*;
>>>> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
>>>> import tuc.LSH.core.timeseries.UniversalBasicWindow;
>>>>
>>>>
>>>> import java.lang.Thread;
>>>> import java.util.*;
>>>>
>>>> /**
>>>> * Created by mixtou on 17/5/15.
>>>> */
>>>> public class LSHBolt extends BaseRichBolt {
>>>> private int task_id;
>>>> private OutputCollector collector;
>>>> private UniversalBasicWindow universalBasicWindow;
>>>>
>>>> private String streamId;
>>>> private String time;
>>>> private Float value;
>>>>
>>>> @Override
>>>> public void prepare(Map conf, TopologyContext topologyContext,
>>>> OutputCollector outputCollector) {
>>>> this.task_id = topologyContext.getThisTaskIndex();
>>>> this.collector = outputCollector;
>>>> this.universalBasicWindow = new UniversalBasicWindow();
>>>> streamId = null;
>>>> time = null;
>>>> value = 0f;
>>>> System.err.println("New Bolt with id: " + task_id);
>>>>
>>>> }
>>>>
>>>> @Override
>>>> public void execute(Tuple tuple) {
>>>>
>>>> if (tuple.getSourceStreamId().equals("sync")) {
>>>> System.out.println("Bolt task id " + task_id + " received
>>>> from " + tuple.getSourceComponent() + " message " + tuple.getString(0));
>>>> System.out.println("Normalizing: Basic Window of Bolt " +
>>>> task_id);
>>>> universalBasicWindow.normalize(); //fill the rest of the
>>>> streams with last received value to make them same size
>>>> universalBasicWindow = null;
>>>> universalBasicWindow = new UniversalBasicWindow();
>>>>
>>>> }
>>>>
>>>> if (tuple.getSourceStreamId().equals("data")) {
>>>>
>>>> streamId = tuple.getStringByField("streamId");
>>>> time = tuple.getStringByField("timestamp");
>>>> value = Float.parseFloat(tuple.getStringByField("value"));
>>>>
>>>> universalBasicWindow.pushStream(streamId, value);
>>>>
>>>> if (universalBasicWindow.isFull(task_id)) { //check if
>>>> any stream of the window is full
>>>>
>>>> // System.out.println("Univ. Basic Window of bolt " +
>>>> task_id + " is Filled Up");
>>>>
>>>> collector.emit("bwFilled", new Values(task_id));
>>>>
>>>> // universalBasicWindow.normalize();
>>>> // universalBasicWindow = new UniversalBasicWindow();
>>>>
>>>> // TODO:: Add basic window to sliding window and clear
>>>>
>>>> }
>>>>
>>>> }
>>>>
>>>>
>>>> // System.err.println("SourceComponent:
>>>> "+tuple.getSourceComponent());
>>>> // System.err.println("SourceStreamId:
>>>> "+tuple.getSourceStreamId());
>>>> // System.err.println("Source Task: "+tuple.getSourceTask());
>>>> // System.err.println("SourceGlobalStreamId:
>>>> "+tuple.getSourceGlobalStreamid());
>>>> // System.err.println("MessageId: "+tuple.getMessageId());
>>>>
>>>> collector.ack(tuple);
>>>> }
>>>>
>>>> @Override
>>>> public void declareOutputFields(OutputFieldsDeclarer
>>>> outputFieldsDeclarer) {
>>>> outputFieldsDeclarer.declareStream("bwFilled", new
>>>> Fields("task_id"));
>>>>
>>>> }
>>>> }
>>>>
>>>>> On 4 Ιουν 2015, at 20:29, Matthias J. Sax
>>>>> <[email protected] <mailto:[email protected]>
>>>>> <mailto:[email protected]>> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> You are looping within "nextTuple()" to emit a tuple for each lines for
>>>>> the whole file. This is "bad practice" because the spout is
>>>>> prevented to
>>>>> take "acks" while "nextTuple()" is executing. I guess, that is the
>>>>> reason why your tuples time out and fail.
>>>>>
>>>>> You should return from "nextTuple()" after emitting a single record.
>>>>> This should solve your problem as the Spout can process incoming acks
>>>>> between to calls to "nextTuple()". Just instantiate your BufferedReader
>>>>> in "open()" and close it if "this.completed" is true.
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 06/04/2015 07:01 PM, Michail Toutoudakis wrote:
>>>>>> I am using a spout to read data from a text file, and send them to a
>>>>>> bolt. However i am losing too many values; they fail.
>>>>>> I am newbie to storm and i don’t know where to look to debug this
>>>>>> issue.
>>>>>> Any ideas??
>>>>>>
>>>>>> My Storm Spout code is:
>>>>>>
>>>>>> package tuc.LSH.storm.spouts;
>>>>>>
>>>>>> import backtype.storm.spout.SpoutOutputCollector;
>>>>>> import backtype.storm.task.TopologyContext;
>>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>>> import backtype.storm.topology.base.BaseRichSpout;
>>>>>> import backtype.storm.tuple.Fields;
>>>>>> import backtype.storm.tuple.Values;
>>>>>> import backtype.storm.utils.Utils;
>>>>>> import tuc.LSH.conf.Consts;
>>>>>>
>>>>>> import java.io.BufferedReader;
>>>>>> import java.io.FileNotFoundException;
>>>>>> import java.io.FileReader;
>>>>>> import java.util.Map;
>>>>>>
>>>>>> /**
>>>>>> * Created by mixtou on 15/5/15.
>>>>>> */
>>>>>> public class FileReaderSpout extends BaseRichSpout {
>>>>>>
>>>>>> private SpoutOutputCollector collector;
>>>>>> private FileReader fileReader;
>>>>>> private boolean completed;
>>>>>> private TopologyContext context;
>>>>>> private int spout_idx;
>>>>>> private int spout_id;
>>>>>> private Map config;
>>>>>> private int noOfFailedWords;
>>>>>> private int noOfAckedWords;
>>>>>>
>>>>>>
>>>>>> @Override
>>>>>> public void declareOutputFields(OutputFieldsDeclarer
>>>>>> outputFieldsDeclarer) {
>>>>>> outputFieldsDeclarer.declareStream("data",
>>>>>> new Fields("streamId", "timestamp", "value"));
>>>>>>
>>>>>>
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public void open(Map config, TopologyContext
>>>>>> topologyContext, SpoutOutputCollector spoutOutputCollector) {
>>>>>> this.context = topologyContext;
>>>>>> this.spout_idx = context.getThisTaskIndex();
>>>>>> this.spout_id = context.getThisTaskId();
>>>>>> this.collector = spoutOutputCollector;
>>>>>> this.config = config;
>>>>>> this.completed = false;
>>>>>> this.noOfFailedWords = 0;
>>>>>> this.noOfAckedWords = 0;
>>>>>>
>>>>>> try {
>>>>>> System.err.println("Reading File: " +
>>>>>> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>>>>>>
>>>>>> this.fileReader = new
>>>>>> FileReader(config.get(file_to_read()).toString());
>>>>>> } catch (FileNotFoundException e) {
>>>>>> throw new RuntimeException("Error reading file [" +
>>>>>> config.get(file_to_read()) + "]");
>>>>>> }
>>>>>>
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public void nextTuple() {
>>>>>>
>>>>>> if (this.completed) {
>>>>>> return;
>>>>>> }
>>>>>>
>>>>>> try {
>>>>>>
>>>>>> String str;
>>>>>> BufferedReader reader = new BufferedReader(fileReader);
>>>>>>
>>>>>> while ((str = reader.readLine()) != null) {
>>>>>>
>>>>>> if (str.isEmpty()) {
>>>>>> return;
>>>>>> }
>>>>>> String[] temp = str.split(",");
>>>>>> // System.err.println("============== "+temp[0]
>>>>>> + " +
>>>>>> " + temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
>>>>>>
>>>>>> collector.emit("data",
>>>>>> new Values(temp[0], temp[2], temp[3]), temp[0]); //emmit the correct
>>>>>> data to next bolt with guarantee delivery
>>>>>>
>>>>>> }
>>>>>>
>>>>>> } catch (Exception e) {
>>>>>> System.err.println("Error Reading Tuple: " + e);
>>>>>> throw new RuntimeException("Error Reading Tuple ", e);
>>>>>> } finally {
>>>>>> System.err.println("Finished Reading File. Message From
>>>>>> Spout: " + spout_idx);
>>>>>> this.completed = true;
>>>>>>
>>>>>> }
>>>>>>
>>>>>> Utils.sleep(100);
>>>>>> }
>>>>>>
>>>>>> private String file_to_read() {
>>>>>> // this.spout_id = context.getThisTaskId();
>>>>>> if (Consts.NO_OF_SPOUTS > 1) {
>>>>>> int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>>>>>> return "data" + file_no;
>>>>>> } else {
>>>>>> return "data";
>>>>>> }
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public void ack(Object msgId) {
>>>>>> super.ack(msgId);
>>>>>> noOfAckedWords++;
>>>>>> // System.out.println("OK tuple acked from bolt: " + msgId+" no
>>>>>> of acked word "+noOfAckedWords);
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public void fail(Object msgId) {
>>>>>> super.fail(msgId);
>>>>>> noOfFailedWords++;
>>>>>> System.err.println("ERROR: " + context.getThisComponentId() + "
>>>>>> " + msgId + " no of words failed " + noOfFailedWords);
>>>>>>
>>>>>> }
>>>>>> }
>>>>>>
>>>>>> And my bolt looks like this:
>>>>>>
>>>>>> package tuc.LSH.storm.bolts;
>>>>>>
>>>>>> import backtype.storm.task.OutputCollector;
>>>>>> import backtype.storm.task.TopologyContext;
>>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>>> import backtype.storm.topology.base.BaseRichBolt;
>>>>>> import backtype.storm.tuple.Fields;
>>>>>> import backtype.storm.tuple.Tuple;
>>>>>> import backtype.storm.tuple.Values;
>>>>>> import sun.jvm.hotspot.runtime.*;
>>>>>> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
>>>>>> import tuc.LSH.core.timeseries.UniversalBasicWindow;
>>>>>>
>>>>>>
>>>>>> import java.lang.Thread;
>>>>>> import java.util.*;
>>>>>>
>>>>>> /**
>>>>>> * Created by mixtou on 17/5/15.
>>>>>> */
>>>>>> public class LSHBolt extends BaseRichBolt {
>>>>>> private int task_id;
>>>>>> private OutputCollector collector;
>>>>>> private UniversalBasicWindow universalBasicWindow;
>>>>>>
>>>>>> private String streamId;
>>>>>> private String time;
>>>>>> private Float value;
>>>>>>
>>>>>> @Override
>>>>>> public void prepare(Map conf, TopologyContext topologyContext,
>>>>>> OutputCollector outputCollector) {
>>>>>> this.task_id = topologyContext.getThisTaskIndex();
>>>>>> this.collector = outputCollector;
>>>>>> this.universalBasicWindow = new UniversalBasicWindow();
>>>>>> streamId = null;
>>>>>> time = null;
>>>>>> value = 0f;
>>>>>> System.err.println("New Bolt with id: " + task_id);
>>>>>>
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public void execute(Tuple tuple) {
>>>>>>
>>>>>> if (tuple.getSourceStreamId().equals("sync")) {
>>>>>> System.out.println("Bolt task id " + task_id + " received
>>>>>> from " + tuple.getSourceComponent() + " message " +
>>>>>> tuple.getString(0));
>>>>>> System.out.println("Normalizing: Basic Window of Bolt " +
>>>>>> task_id);
>>>>>> universalBasicWindow.normalize(); //fill the rest of the
>>>>>> streams with last received value to make them same size
>>>>>> universalBasicWindow = null;
>>>>>> universalBasicWindow = new UniversalBasicWindow();
>>>>>>
>>>>>> collector.ack(tuple);
>>>>>> }
>>>>>>
>>>>>> if (tuple.getSourceStreamId().equals("data")) {
>>>>>>
>>>>>> streamId = tuple.getStringByField("streamId");
>>>>>> time = tuple.getStringByField("timestamp");
>>>>>> value = Float.parseFloat(tuple.getStringByField("value"));
>>>>>>
>>>>>> universalBasicWindow.pushStream(streamId, value);
>>>>>>
>>>>>> collector.ack(tuple);
>>>>>>
>>>>>> if (universalBasicWindow.isFull(task_id)) { //check if
>>>>>> any stream of the window is full
>>>>>>
>>>>>> // System.out.println("Univ. Basic Window of bolt " +
>>>>>> task_id + " is Filled Up");
>>>>>>
>>>>>> collector.emit("bwFilled", new Values(task_id));
>>>>>>
>>>>>> // universalBasicWindow.normalize();
>>>>>> // universalBasicWindow = new UniversalBasicWindow();
>>>>>>
>>>>>> // TODO:: Add basic window to sliding window and clear
>>>>>>
>>>>>> }
>>>>>>
>>>>>> }
>>>>>>
>>>>>>
>>>>>> // collector.ack(tuple);
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public void declareOutputFields(OutputFieldsDeclarer
>>>>>> outputFieldsDeclarer) {
>>>>>> outputFieldsDeclarer.declareStream("bwFilled", new
>>>>>> Fields("task_id"));
>>>>>>
>>>>>> }
>>>>>> }
>>>>>>
>>>>>> Any Ideas??
>
signature.asc
Description: OpenPGP digital signature
