You should emit with a message id and set topology.max.spout.pending; that
caps how many messages can be in flight simultaneously, which should
alleviate your out-of-memory condition.
On Jun 7, 2015 5:05 AM, "Michail Toutoudakis" <[email protected]> wrote:
> What is the best spout implementation for reading input data from a file? I
> have implemented a spout that reads input data from a file using a Scanner,
> which seems to perform better than a buffered file reader.
> However I still lose some values — not many this time, about 1% — but the
> problem is that after a few minutes of running I get a Java out-of-memory
> exception, and I believe it has to do with values buffering.
> My spout implementation is:
>
> package tuc.LSH.storm.spouts;
>
> import backtype.storm.spout.SpoutOutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.IRichSpout;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichSpout;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Values;
> import backtype.storm.utils.Utils;
> import tuc.LSH.conf.Consts;
>
> import javax.rmi.CORBA.Util;
> import java.io.*;
> import java.util.Map;
> import java.util.Scanner;
>
> /**
> * Created by mixtou on 15/5/15.
> */
> public class FileReaderSpout extends BaseRichSpout {
> //public class FileReaderSpout implements IRichSpout {
>
> private SpoutOutputCollector collector;
> private Scanner scanner;
> private boolean completed;
> private TopologyContext context;
> private int spout_idx;
> private int spout_id;
> private Map config;
> private int noOfFailedWords;
> private int noOfAckedWords;
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer
> outputFieldsDeclarer) {
> outputFieldsDeclarer.declareStream("data", new Fields("streamId",
> "timestamp", "value"));
>
>
> }
>
> @Override
> public void open(Map config, TopologyContext topologyContext,
> SpoutOutputCollector spoutOutputCollector) {
> this.context = topologyContext;
> this.spout_idx = context.getThisTaskIndex();
> this.spout_id = context.getThisTaskId();
> this.collector = spoutOutputCollector;
> this.config = config;
> this.completed = false;
> this.noOfFailedWords = 0;
> this.noOfAckedWords = 0;
>
> try {
> this.scanner = new Scanner(new
> File(config.get(file_to_read()).toString()));
> System.err.println("Scanner Reading File: " +
> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
> } catch (FileNotFoundException e) {
> e.printStackTrace();
> }
>
> }
>
> @Override
> public void nextTuple() {
>
> if(!completed) {
> if (scanner.hasNextLine()) {
> String[] temp = scanner.nextLine().split(",");
> // System.err.println("============== " + temp[0] + " + " +
> temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
> collector.emit("data", new Values(temp[0], temp[2], temp[3]),
> temp[0]); //emmit the correct data to next bolt without guarantee delivery
> Utils.sleep(1);
> } else {
> System.err.println("End of File Closing Reader");
> scanner.close();
> completed = true;
> }
> }
>
> }
>
> private String file_to_read() {
> // this.spout_id = context.getThisTaskId();
> if (Consts.NO_OF_SPOUTS > 1) {
> int file_no = spout_idx % Consts.NO_OF_SPOUTS;
> return "data" + file_no;
> } else {
> return "data";
> }
> }
>
> @Override
> public void ack(Object msgId) {
> super.ack(msgId);
> noOfAckedWords++;
> // System.out.println("OK tuple acked from bolt: " + msgId + " no of
> acked word " + noOfAckedWords);
> }
>
> @Override
> public void fail(Object msgId) {
> super.fail(msgId);
> noOfFailedWords++;
> System.err.println("ERROR: " + context.getThisComponentId() + " " +
> msgId + " no of words failed " + noOfFailedWords);
>
> }
>
> }
>
>
>