I am using a spout to read data from a text file and send each line to a bolt.
However I am losing too many tuples — they come back as fails at the spout.
I am a newbie to Storm and I don't know where to look to debug this issue. Any
ideas?
My Storm Spout code is:
package tuc.LSH.storm.spouts;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import tuc.LSH.conf.Consts;
/**
* Created by mixtou on 15/5/15.
*/
public class FileReaderSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private FileReader fileReader;
private boolean completed;
private TopologyContext context;
private int spout_idx;
private int spout_id;
private Map config;
private int noOfFailedWords;
private int noOfAckedWords;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declareStream("data", new Fields("streamId",
"timestamp", "value"));
}
@Override
public void open(Map config, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
this.context = topologyContext;
this.spout_idx = context.getThisTaskIndex();
this.spout_id = context.getThisTaskId();
this.collector = spoutOutputCollector;
this.config = config;
this.completed = false;
this.noOfFailedWords = 0;
this.noOfAckedWords = 0;
try {
System.err.println("Reading File: " +
config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
this.fileReader = new
FileReader(config.get(file_to_read()).toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file [" +
config.get(file_to_read()) + "]");
}
}
@Override
public void nextTuple() {
if (this.completed) {
return;
}
try {
String str;
BufferedReader reader = new BufferedReader(fileReader);
while ((str = reader.readLine()) != null) {
if (str.isEmpty()) {
return;
}
String[] temp = str.split(",");
// System.err.println("============== "+temp[0] + " + " +
temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
collector.emit("data", new Values(temp[0], temp[2], temp[3]),
temp[0]); //emmit the correct data to next bolt with guarantee delivery
}
} catch (Exception e) {
System.err.println("Error Reading Tuple: " + e);
throw new RuntimeException("Error Reading Tuple ", e);
} finally {
System.err.println("Finished Reading File. Message From Spout: " +
spout_idx);
this.completed = true;
}
Utils.sleep(100);
}
private String file_to_read() {
// this.spout_id = context.getThisTaskId();
if (Consts.NO_OF_SPOUTS > 1) {
int file_no = spout_idx % Consts.NO_OF_SPOUTS;
return "data" + file_no;
} else {
return "data";
}
}
@Override
public void ack(Object msgId) {
super.ack(msgId);
noOfAckedWords++;
// System.out.println("OK tuple acked from bolt: " + msgId+" no of acked
word "+noOfAckedWords);
}
@Override
public void fail(Object msgId) {
super.fail(msgId);
noOfFailedWords++;
System.err.println("ERROR: " + context.getThisComponentId() + " " +
msgId + " no of words failed " + noOfFailedWords);
}
}
And my bolt looks like this:
package tuc.LSH.storm.bolts;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import sun.jvm.hotspot.runtime.*;
import tuc.LSH.core.hashfunctions.HashFunctionsGen;
import tuc.LSH.core.timeseries.UniversalBasicWindow;
import java.lang.Thread;
import java.util.*;
/**
* Created by mixtou on 17/5/15.
*/
public class LSHBolt extends BaseRichBolt {
private int task_id;
private OutputCollector collector;
private UniversalBasicWindow universalBasicWindow;
private String streamId;
private String time;
private Float value;
@Override
public void prepare(Map conf, TopologyContext topologyContext,
OutputCollector outputCollector) {
this.task_id = topologyContext.getThisTaskIndex();
this.collector = outputCollector;
this.universalBasicWindow = new UniversalBasicWindow();
streamId = null;
time = null;
value = 0f;
System.err.println("New Bolt with id: " + task_id);
}
@Override
public void execute(Tuple tuple) {
if (tuple.getSourceStreamId().equals("sync")) {
System.out.println("Bolt task id " + task_id + " received from " +
tuple.getSourceComponent() + " message " + tuple.getString(0));
System.out.println("Normalizing: Basic Window of Bolt " + task_id);
universalBasicWindow.normalize(); //fill the rest of the streams
with last received value to make them same size
universalBasicWindow = null;
universalBasicWindow = new UniversalBasicWindow();
collector.ack(tuple);
}
if (tuple.getSourceStreamId().equals("data")) {
streamId = tuple.getStringByField("streamId");
time = tuple.getStringByField("timestamp");
value = Float.parseFloat(tuple.getStringByField("value"));
universalBasicWindow.pushStream(streamId, value);
collector.ack(tuple);
if (universalBasicWindow.isFull(task_id)) { //check if any stream
of the window is full
// System.out.println("Univ. Basic Window of bolt " + task_id +
" is Filled Up");
collector.emit("bwFilled", new Values(task_id));
// universalBasicWindow.normalize();
// universalBasicWindow = new UniversalBasicWindow();
// TODO:: Add basic window to sliding window and clear
}
}
// collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declareStream("bwFilled", new Fields("task_id"));
}
}
Any Ideas??