What is the best spout implementation for reading input data from a file? I have
implemented a spout that reads input data from a file using a Scanner, which
seems to perform better than a buffered file reader.
However, I still lose some values (not many this time, about 1%), but the problem
is that after a few minutes of running I get a Java out-of-memory exception, and I
believe it has to do with the buffering of values.
My spout implementation is:
package tuc.LSH.storm.spouts;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import tuc.LSH.conf.Consts;
import javax.rmi.CORBA.Util;
import java.io.*;
import java.util.Map;
import java.util.Scanner;
/**
* Created by mixtou on 15/5/15.
*/
public class FileReaderSpout extends BaseRichSpout {
//public class FileReaderSpout implements IRichSpout {
private SpoutOutputCollector collector;
private Scanner scanner;
private boolean completed;
private TopologyContext context;
private int spout_idx;
private int spout_id;
private Map config;
private int noOfFailedWords;
private int noOfAckedWords;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declareStream("data", new Fields("streamId",
"timestamp", "value"));
}
@Override
public void open(Map config, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
this.context = topologyContext;
this.spout_idx = context.getThisTaskIndex();
this.spout_id = context.getThisTaskId();
this.collector = spoutOutputCollector;
this.config = config;
this.completed = false;
this.noOfFailedWords = 0;
this.noOfAckedWords = 0;
try {
this.scanner = new Scanner(new
File(config.get(file_to_read()).toString()));
System.err.println("Scanner Reading File: " +
config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
@Override
public void nextTuple() {
if(!completed) {
if (scanner.hasNextLine()) {
String[] temp = scanner.nextLine().split(",");
// System.err.println("============== " + temp[0] + " + " + temp[2]
+ " + " + temp[3]); //0-id,2-timestamp,3-value
collector.emit("data", new Values(temp[0], temp[2], temp[3]),
temp[0]); //emmit the correct data to next bolt without guarantee delivery
Utils.sleep(1);
} else {
System.err.println("End of File Closing Reader");
scanner.close();
completed = true;
}
}
}
private String file_to_read() {
// this.spout_id = context.getThisTaskId();
if (Consts.NO_OF_SPOUTS > 1) {
int file_no = spout_idx % Consts.NO_OF_SPOUTS;
return "data" + file_no;
} else {
return "data";
}
}
@Override
public void ack(Object msgId) {
super.ack(msgId);
noOfAckedWords++;
// System.out.println("OK tuple acked from bolt: " + msgId + " no of
acked word " + noOfAckedWords);
}
@Override
public void fail(Object msgId) {
super.fail(msgId);
noOfFailedWords++;
System.err.println("ERROR: " + context.getThisComponentId() + " " +
msgId + " no of words failed " + noOfFailedWords);
}
}