Hi,

You are looping within "nextTuple()" to emit a tuple for each line of
the whole file. This is "bad practice" because the spout is prevented
from taking "acks" while "nextTuple()" is executing. I guess that is the
reason why your tuples time out and fail.

You should return from "nextTuple()" after emitting a single record.
This should solve your problem, as the Spout can process incoming acks
between two calls to "nextTuple()". Just instantiate your BufferedReader
in "open()" and close it if "this.completed" is true.

Hope this helps.


-Matthias


On 06/04/2015 07:01 PM, Michail Toutoudakis wrote:
> I am using a spout to read data from a text file, and send them to a
> bolt. However I am losing too many values; they fail.
> I am a newbie to Storm and I don’t know where to look to debug this issue.
> Any ideas??
> 
> My Storm Spout code is:
> 
> package tuc.LSH.storm.spouts;
> 
> import backtype.storm.spout.SpoutOutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichSpout;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Values;
> import backtype.storm.utils.Utils;
> import tuc.LSH.conf.Consts;
> 
> import java.io.BufferedReader;
> import java.io.FileNotFoundException;
> import java.io.FileReader;
> import java.util.Map;
> 
> /**
>  * Created by mixtou on 15/5/15.
>  */
> public class FileReaderSpout extends BaseRichSpout {
> 
>     private SpoutOutputCollector collector;
>     private FileReader fileReader;
>     private boolean completed;
>     private TopologyContext context;
>     private int spout_idx;
>     private int spout_id;
>     private Map config;
>     private int noOfFailedWords;
>     private int noOfAckedWords;
> 
> 
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer
> outputFieldsDeclarer) {
>         outputFieldsDeclarer.declareStream("data",
> new Fields("streamId", "timestamp", "value"));
> 
> 
>     }
> 
>     @Override
>     public void open(Map config, TopologyContext
> topologyContext, SpoutOutputCollector spoutOutputCollector) {
>         this.context = topologyContext;
>         this.spout_idx = context.getThisTaskIndex();
>         this.spout_id = context.getThisTaskId();
>         this.collector = spoutOutputCollector;
>         this.config = config;
>         this.completed = false;
>         this.noOfFailedWords = 0;
>         this.noOfAckedWords = 0;
> 
>         try {
>             System.err.println("Reading File: " +
> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>            
> this.fileReader = new FileReader(config.get(file_to_read()).toString());
>         } catch (FileNotFoundException e) {
>             throw new RuntimeException("Error reading file [" +
> config.get(file_to_read()) + "]");
>         }
> 
>     }
> 
>     @Override
>     public void nextTuple() {
> 
>         if (this.completed) {
>             return;
>         }
> 
>         try {
> 
>             String str;
>             BufferedReader reader = new BufferedReader(fileReader);
> 
>             while ((str = reader.readLine()) != null) {
> 
>                 if (str.isEmpty()) {
>                     return;
>                 }
>                 String[] temp = str.split(",");
> //                    System.err.println("============== "+temp[0] + " +
> " + temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value
> 
>                 collector.emit("data",
> new Values(temp[0], temp[2], temp[3]), temp[0]); //emmit the correct
> data to next bolt with guarantee delivery
> 
>             }
> 
>         } catch (Exception e) {
>             System.err.println("Error Reading Tuple: " + e);
>             throw new RuntimeException("Error Reading Tuple ", e);
>         } finally {
>             System.err.println("Finished Reading File. Message From
> Spout: " + spout_idx);
>             this.completed = true;
> 
>         }
> 
>         Utils.sleep(100);
>     }
> 
>     private String file_to_read() {
> //        this.spout_id = context.getThisTaskId();
>         if (Consts.NO_OF_SPOUTS > 1) {
>             int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>             return "data" + file_no;
>         } else {
>             return "data";
>         }
>     }
> 
>     @Override
>     public void ack(Object msgId) {
>         super.ack(msgId);
>         noOfAckedWords++;
> //        System.out.println("OK tuple acked from bolt: " + msgId+" no
> of acked word "+noOfAckedWords);
>     }
> 
>     @Override
>     public void fail(Object msgId) {
>         super.fail(msgId);
>         noOfFailedWords++;
>         System.err.println("ERROR: " + context.getThisComponentId() + "
> " + msgId + " no of words failed " + noOfFailedWords);
> 
>     }
> }
> 
> And my bolt looks like this:
> 
> package tuc.LSH.storm.bolts;
> 
> import backtype.storm.task.OutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichBolt;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Tuple;
> import backtype.storm.tuple.Values;
> import sun.jvm.hotspot.runtime.*;
> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
> import tuc.LSH.core.timeseries.UniversalBasicWindow;
> 
> 
> import java.lang.Thread;
> import java.util.*;
> 
> /**
>  * Created by mixtou on 17/5/15.
>  */
> public class LSHBolt extends BaseRichBolt {
>     private int task_id;
>     private OutputCollector collector;
>     private UniversalBasicWindow universalBasicWindow;
> 
>     private String streamId;
>     private String time;
>     private Float value;
> 
>     @Override
>     public void prepare(Map conf, TopologyContext topologyContext, 
> OutputCollector outputCollector) {
>         this.task_id = topologyContext.getThisTaskIndex();
>         this.collector = outputCollector;
>         this.universalBasicWindow = new UniversalBasicWindow();
>         streamId = null;
>         time = null;
>         value = 0f;
>         System.err.println("New Bolt with id: " + task_id);
> 
>     }
> 
>     @Override
>     public void execute(Tuple tuple) {
> 
>         if (tuple.getSourceStreamId().equals("sync")) {
>             System.out.println("Bolt task id " + task_id + " received from " 
> + tuple.getSourceComponent() + " message " + tuple.getString(0));
>             System.out.println("Normalizing: Basic Window of Bolt " + 
> task_id);
>             universalBasicWindow.normalize(); //fill the rest of the streams 
> with last received value to make them same size
>             universalBasicWindow = null;
>             universalBasicWindow = new UniversalBasicWindow();
> 
>             collector.ack(tuple);
>         }
> 
>         if (tuple.getSourceStreamId().equals("data")) {
> 
>             streamId = tuple.getStringByField("streamId");
>             time = tuple.getStringByField("timestamp");
>             value = Float.parseFloat(tuple.getStringByField("value"));
> 
>             universalBasicWindow.pushStream(streamId, value);
> 
>             collector.ack(tuple);
> 
>             if (universalBasicWindow.isFull(task_id)) { //check if any stream 
> of the window is full
> 
> //                System.out.println("Univ. Basic Window of bolt " + task_id 
> + " is Filled Up");
> 
>                 collector.emit("bwFilled", new Values(task_id));
> 
> //                universalBasicWindow.normalize();
> //                universalBasicWindow = new UniversalBasicWindow();
> 
> //                TODO:: Add basic window to sliding window and clear
> 
>             }
> 
>         }
> 
> 
> //        collector.ack(tuple);
>     }
> 
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer 
> outputFieldsDeclarer) {
>         outputFieldsDeclarer.declareStream("bwFilled", new Fields("task_id"));
> 
>     }
> }
> 
> Any Ideas??
> 
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to