You should emit with a message id, which will prevent too many messages
from being in flight simultaneously, which will alleviate your out of
memory conditions.
On Jun 7, 2015 5:05 AM, "Michail Toutoudakis" <[email protected]> wrote:

> What is the best spout implementation for reading input data from file? I
> have implemented a spout for reading input data from file using a scanner
> which seems to perform better than buffered file reader.
> However I still lose some values, not many this time, about 1%, but the
> problem is that after a few minutes of running I get a Java out-of-memory
> exception, and I believe it has to do with value buffering.
> My spout implementation is:
>
> package tuc.LSH.storm.spouts;
>
> import backtype.storm.spout.SpoutOutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.IRichSpout;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichSpout;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Values;
> import backtype.storm.utils.Utils;
> import tuc.LSH.conf.Consts;
>
> import javax.rmi.CORBA.Util;
> import java.io.*;
> import java.util.Map;
> import java.util.Scanner;
>
> /**
>  * Created by mixtou on 15/5/15.
>  */
> public class FileReaderSpout extends BaseRichSpout {
> //public class FileReaderSpout implements IRichSpout {
>
>     private SpoutOutputCollector collector;
>     private Scanner scanner;
>     private boolean completed;
>     private TopologyContext context;
>     private int spout_idx;
>     private int spout_id;
>     private Map config;
>     private int noOfFailedWords;
>     private int noOfAckedWords;
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer 
> outputFieldsDeclarer) {
>         outputFieldsDeclarer.declareStream("data", new Fields("streamId", 
> "timestamp", "value"));
>
>
>     }
>
>     @Override
>     public void open(Map config, TopologyContext topologyContext, 
> SpoutOutputCollector spoutOutputCollector) {
>         this.context = topologyContext;
>         this.spout_idx = context.getThisTaskIndex();
>         this.spout_id = context.getThisTaskId();
>         this.collector = spoutOutputCollector;
>         this.config = config;
>         this.completed = false;
>         this.noOfFailedWords = 0;
>         this.noOfAckedWords = 0;
>
>         try {
>             this.scanner = new Scanner(new 
> File(config.get(file_to_read()).toString()));
>             System.err.println("Scanner Reading File: " + 
> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>         } catch (FileNotFoundException e) {
>             e.printStackTrace();
>         }
>
>     }
>
>     @Override
>     public void nextTuple() {
>
>         if(!completed) {
>             if (scanner.hasNextLine()) {
>                 String[] temp = scanner.nextLine().split(",");
> //            System.err.println("============== " + temp[0] + " + " + 
> temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
>                 collector.emit("data", new Values(temp[0], temp[2], temp[3]), 
> temp[0]); //emmit the correct data to next bolt without guarantee delivery
>                 Utils.sleep(1);
>             } else {
>                 System.err.println("End of File Closing Reader");
>                 scanner.close();
>                 completed = true;
>             }
>         }
>
>     }
>
>     private String file_to_read() {
> //        this.spout_id = context.getThisTaskId();
>         if (Consts.NO_OF_SPOUTS > 1) {
>             int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>             return "data" + file_no;
>         } else {
>             return "data";
>         }
>     }
>
>     @Override
>     public void ack(Object msgId) {
>         super.ack(msgId);
>         noOfAckedWords++;
> //        System.out.println("OK tuple acked from bolt: " + msgId + " no of 
> acked word " + noOfAckedWords);
>     }
>
>     @Override
>     public void fail(Object msgId) {
>         super.fail(msgId);
>         noOfFailedWords++;
>         System.err.println("ERROR: " + context.getThisComponentId() + " " + 
> msgId + " no of words failed " + noOfFailedWords);
>
>     }
>
> }
>
>
>

Reply via email to