Sorry, wrong code posted — that was the bolt's code. The spout's code is:
package tuc.LSH.storm.spouts;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import tuc.LSH.conf.Consts;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
/**
* Created by mixtou on 15/5/15.
*/
//public class FileReaderSpout extends BaseRichSpout {
public class FileReaderSpout implements IRichSpout {
private SpoutOutputCollector collector;
private FileReader fileReader;
private boolean completed;
private TopologyContext context;
private int spout_idx;
private int spout_id;
private Map config;
private int noOfFailedWords;
private int noOfAckedWords;
// private BufferedReader reader;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declareStream("data", new Fields("streamId",
"timestamp", "value"));
}
@Override
public void open(Map config, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
this.context = topologyContext;
this.spout_idx = context.getThisTaskIndex();
this.spout_id = context.getThisTaskId();
this.collector = spoutOutputCollector;
this.config = config;
this.completed = false;
this.noOfFailedWords = 0;
this.noOfAckedWords = 0;
try {
System.err.println("Reading File: " +
config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
this.fileReader = new
FileReader(config.get(file_to_read()).toString());
// this.reader = new BufferedReader(fileReader);
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file [" +
config.get(file_to_read()) + "]");
}
}
@Override
public void nextTuple() {
if (this.completed) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
String str;
BufferedReader reader = new BufferedReader(fileReader);
while ((str = reader.readLine()) != null) {
if (str.isEmpty()) {
return;
}
String[] temp = str.split(",");
// System.err.println("============== "+temp[0] + " + " +
temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
collector.emit("data", new Values(temp[0], temp[2], temp[3]),
temp[0]); //emmit the correct data to next bolt without guarantee delivery
}
} catch (Exception e) {
// System.err.println("Error Reading Tuple: " + e);
throw new RuntimeException("Error Reading Tuple ", e);
} finally {
System.err.println("Finished Reading File. Message From Spout: " +
spout_idx);
this.completed = true;
}
}
private String file_to_read() {
// this.spout_id = context.getThisTaskId();
if (Consts.NO_OF_SPOUTS > 1) {
int file_no = spout_idx % Consts.NO_OF_SPOUTS;
return "data" + file_no;
} else {
return "data";
}
}
@Override
public void ack(Object msgId) {
// super.ack(msgId);
noOfAckedWords++;
// System.out.println("OK tuple acked from bolt: " + msgId+" no of acked
word "+noOfAckedWords);
}
@Override
public void fail(Object msgId) {
// super.fail(msgId);
noOfFailedWords++;
System.err.println("ERROR: " + context.getThisComponentId() + " " +
msgId + " no of words failed " + noOfFailedWords);
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
@Override
public void close() {
try {
fileReader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void activate() {
}
@Override
public void deactivate() {
}
}
> On 5 Ιουν 2015, at 12:12, Michail Toutoudakis <[email protected]> wrote:
>
> I tried your solution but nothing happened. I still continue to lose many
> values. However, I found a file reader spout implementation that worked
> without losing a single value. Below is the code for anyone who might have
> the same problem:
>
> package tuc.LSH.storm.bolts;
>
> import backtype.storm.task.OutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichBolt;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Tuple;
> import backtype.storm.tuple.Values;
> import sun.jvm.hotspot.runtime.*;
> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
> import tuc.LSH.core.timeseries.UniversalBasicWindow;
>
>
> import java.lang.Thread;
> import java.util.*;
>
> /**
> * Created by mixtou on 17/5/15.
> */
> public class LSHBolt extends BaseRichBolt {
> private int task_id;
> private OutputCollector collector;
> private UniversalBasicWindow universalBasicWindow;
>
> private String streamId;
> private String time;
> private Float value;
>
> @Override
> public void prepare(Map conf, TopologyContext topologyContext,
> OutputCollector outputCollector) {
> this.task_id = topologyContext.getThisTaskIndex();
> this.collector = outputCollector;
> this.universalBasicWindow = new UniversalBasicWindow();
> streamId = null;
> time = null;
> value = 0f;
> System.err.println("New Bolt with id: " + task_id);
>
> }
>
> @Override
> public void execute(Tuple tuple) {
>
> if (tuple.getSourceStreamId().equals("sync")) {
> System.out.println("Bolt task id " + task_id + " received from "
> + tuple.getSourceComponent() + " message " + tuple.getString(0));
> System.out.println("Normalizing: Basic Window of Bolt " +
> task_id);
> universalBasicWindow.normalize(); //fill the rest of the streams
> with last received value to make them same size
> universalBasicWindow = null;
> universalBasicWindow = new UniversalBasicWindow();
>
> }
>
> if (tuple.getSourceStreamId().equals("data")) {
>
> streamId = tuple.getStringByField("streamId");
> time = tuple.getStringByField("timestamp");
> value = Float.parseFloat(tuple.getStringByField("value"));
>
> universalBasicWindow.pushStream(streamId, value);
>
> if (universalBasicWindow.isFull(task_id)) { //check if any stream
> of the window is full
>
> // System.out.println("Univ. Basic Window of bolt " + task_id
> + " is Filled Up");
>
> collector.emit("bwFilled", new Values(task_id));
>
> // universalBasicWindow.normalize();
> // universalBasicWindow = new UniversalBasicWindow();
>
> // TODO:: Add basic window to sliding window and clear
>
> }
>
> }
>
>
> // System.err.println("SourceComponent: "+tuple.getSourceComponent());
> // System.err.println("SourceStreamId: "+tuple.getSourceStreamId());
> // System.err.println("Source Task: "+tuple.getSourceTask());
> // System.err.println("SourceGlobalStreamId:
> "+tuple.getSourceGlobalStreamid());
> // System.err.println("MessageId: "+tuple.getMessageId());
>
> collector.ack(tuple);
> }
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer
> outputFieldsDeclarer) {
> outputFieldsDeclarer.declareStream("bwFilled", new Fields("task_id"));
>
> }
> }
>
>> On 4 Ιουν 2015, at 20:29, Matthias J. Sax <[email protected]
>> <mailto:[email protected]>> wrote:
>>
>> Hi,
>>
>> You are looping within "nextTuple()" to emit a tuple for each line of
>> the whole file. This is "bad practice" because the spout is prevented from
>> taking "acks" while "nextTuple()" is executing. I guess that is the
>> reason why your tuples time out and fail.
>>
>> You should return from "nextTuple()" after emitting a single record.
>> This should solve your problem, as the Spout can process incoming acks
>> between two calls to "nextTuple()". Just instantiate your BufferedReader
>> in "open()" and close it if "this.completed" is true.
>>
>> Hope this helps.
>>
>>
>> -Matthias
>>
>>
>> On 06/04/2015 07:01 PM, Michail Toutoudakis wrote:
>>> I am using a spout to read data from a text file and send the values to a
>>> bolt. However, I am losing too many values — they fail.
>>> I am a newbie to Storm and I don’t know where to look to debug this issue.
>>> Any ideas??
>>>
>>> My Storm Spout code is:
>>>
>>> package tuc.LSH.storm.spouts;
>>>
>>> import backtype.storm.spout.SpoutOutputCollector;
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseRichSpout;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Values;
>>> import backtype.storm.utils.Utils;
>>> import tuc.LSH.conf.Consts;
>>>
>>> import java.io.BufferedReader;
>>> import java.io.FileNotFoundException;
>>> import java.io.FileReader;
>>> import java.util.Map;
>>>
>>> /**
>>> * Created by mixtou on 15/5/15.
>>> */
>>> public class FileReaderSpout extends BaseRichSpout {
>>>
>>> private SpoutOutputCollector collector;
>>> private FileReader fileReader;
>>> private boolean completed;
>>> private TopologyContext context;
>>> private int spout_idx;
>>> private int spout_id;
>>> private Map config;
>>> private int noOfFailedWords;
>>> private int noOfAckedWords;
>>>
>>>
>>> @Override
>>> public void declareOutputFields(OutputFieldsDeclarer
>>> outputFieldsDeclarer) {
>>> outputFieldsDeclarer.declareStream("data",
>>> new Fields("streamId", "timestamp", "value"));
>>>
>>>
>>> }
>>>
>>> @Override
>>> public void open(Map config, TopologyContext
>>> topologyContext, SpoutOutputCollector spoutOutputCollector) {
>>> this.context = topologyContext;
>>> this.spout_idx = context.getThisTaskIndex();
>>> this.spout_id = context.getThisTaskId();
>>> this.collector = spoutOutputCollector;
>>> this.config = config;
>>> this.completed = false;
>>> this.noOfFailedWords = 0;
>>> this.noOfAckedWords = 0;
>>>
>>> try {
>>> System.err.println("Reading File: " +
>>> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>>>
>>> this.fileReader = new FileReader(config.get(file_to_read()).toString());
>>> } catch (FileNotFoundException e) {
>>> throw new RuntimeException("Error reading file [" +
>>> config.get(file_to_read()) + "]");
>>> }
>>>
>>> }
>>>
>>> @Override
>>> public void nextTuple() {
>>>
>>> if (this.completed) {
>>> return;
>>> }
>>>
>>> try {
>>>
>>> String str;
>>> BufferedReader reader = new BufferedReader(fileReader);
>>>
>>> while ((str = reader.readLine()) != null) {
>>>
>>> if (str.isEmpty()) {
>>> return;
>>> }
>>> String[] temp = str.split(",");
>>> // System.err.println("============== "+temp[0] + " +
>>> " + temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
>>>
>>> collector.emit("data",
>>> new Values(temp[0], temp[2], temp[3]), temp[0]); //emmit the correct
>>> data to next bolt with guarantee delivery
>>>
>>> }
>>>
>>> } catch (Exception e) {
>>> System.err.println("Error Reading Tuple: " + e);
>>> throw new RuntimeException("Error Reading Tuple ", e);
>>> } finally {
>>> System.err.println("Finished Reading File. Message From
>>> Spout: " + spout_idx);
>>> this.completed = true;
>>>
>>> }
>>>
>>> Utils.sleep(100);
>>> }
>>>
>>> private String file_to_read() {
>>> // this.spout_id = context.getThisTaskId();
>>> if (Consts.NO_OF_SPOUTS > 1) {
>>> int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>>> return "data" + file_no;
>>> } else {
>>> return "data";
>>> }
>>> }
>>>
>>> @Override
>>> public void ack(Object msgId) {
>>> super.ack(msgId);
>>> noOfAckedWords++;
>>> // System.out.println("OK tuple acked from bolt: " + msgId+" no
>>> of acked word "+noOfAckedWords);
>>> }
>>>
>>> @Override
>>> public void fail(Object msgId) {
>>> super.fail(msgId);
>>> noOfFailedWords++;
>>> System.err.println("ERROR: " + context.getThisComponentId() + "
>>> " + msgId + " no of words failed " + noOfFailedWords);
>>>
>>> }
>>> }
>>>
>>> And my bolt looks like this:
>>>
>>> package tuc.LSH.storm.bolts;
>>>
>>> import backtype.storm.task.OutputCollector;
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseRichBolt;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Tuple;
>>> import backtype.storm.tuple.Values;
>>> import sun.jvm.hotspot.runtime.*;
>>> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
>>> import tuc.LSH.core.timeseries.UniversalBasicWindow;
>>>
>>>
>>> import java.lang.Thread;
>>> import java.util.*;
>>>
>>> /**
>>> * Created by mixtou on 17/5/15.
>>> */
>>> public class LSHBolt extends BaseRichBolt {
>>> private int task_id;
>>> private OutputCollector collector;
>>> private UniversalBasicWindow universalBasicWindow;
>>>
>>> private String streamId;
>>> private String time;
>>> private Float value;
>>>
>>> @Override
>>> public void prepare(Map conf, TopologyContext topologyContext,
>>> OutputCollector outputCollector) {
>>> this.task_id = topologyContext.getThisTaskIndex();
>>> this.collector = outputCollector;
>>> this.universalBasicWindow = new UniversalBasicWindow();
>>> streamId = null;
>>> time = null;
>>> value = 0f;
>>> System.err.println("New Bolt with id: " + task_id);
>>>
>>> }
>>>
>>> @Override
>>> public void execute(Tuple tuple) {
>>>
>>> if (tuple.getSourceStreamId().equals("sync")) {
>>> System.out.println("Bolt task id " + task_id + " received from "
>>> + tuple.getSourceComponent() + " message " + tuple.getString(0));
>>> System.out.println("Normalizing: Basic Window of Bolt " +
>>> task_id);
>>> universalBasicWindow.normalize(); //fill the rest of the streams
>>> with last received value to make them same size
>>> universalBasicWindow = null;
>>> universalBasicWindow = new UniversalBasicWindow();
>>>
>>> collector.ack(tuple);
>>> }
>>>
>>> if (tuple.getSourceStreamId().equals("data")) {
>>>
>>> streamId = tuple.getStringByField("streamId");
>>> time = tuple.getStringByField("timestamp");
>>> value = Float.parseFloat(tuple.getStringByField("value"));
>>>
>>> universalBasicWindow.pushStream(streamId, value);
>>>
>>> collector.ack(tuple);
>>>
>>> if (universalBasicWindow.isFull(task_id)) { //check if any
>>> stream of the window is full
>>>
>>> // System.out.println("Univ. Basic Window of bolt " +
>>> task_id + " is Filled Up");
>>>
>>> collector.emit("bwFilled", new Values(task_id));
>>>
>>> // universalBasicWindow.normalize();
>>> // universalBasicWindow = new UniversalBasicWindow();
>>>
>>> // TODO:: Add basic window to sliding window and clear
>>>
>>> }
>>>
>>> }
>>>
>>>
>>> // collector.ack(tuple);
>>> }
>>>
>>> @Override
>>> public void declareOutputFields(OutputFieldsDeclarer
>>> outputFieldsDeclarer) {
>>> outputFieldsDeclarer.declareStream("bwFilled", new
>>> Fields("task_id"));
>>>
>>> }
>>> }
>>>
>>> Any Ideas??
>>>
>>>
>>
>