I tried your solution but nothing happened; I still lose many values.
However, I found a file reader spout implementation that worked without
losing a single value. Below is the code, for anyone who has the same
problem:
package tuc.LSH.storm.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import tuc.LSH.core.hashfunctions.HashFunctionsGen;
import tuc.LSH.core.timeseries.UniversalBasicWindow;

import java.util.Map;

/**
 * Created by mixtou on 17/5/15.
 */
public class LSHBolt extends BaseRichBolt {
    private int task_id;
    private OutputCollector collector;
    private UniversalBasicWindow universalBasicWindow;
    private String streamId;
    private String time;
    private Float value;

    @Override
    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.task_id = topologyContext.getThisTaskIndex();
        this.collector = outputCollector;
        this.universalBasicWindow = new UniversalBasicWindow();
        streamId = null;
        time = null;
        value = 0f;
        System.err.println("New Bolt with id: " + task_id);
    }

    @Override
    public void execute(Tuple tuple) {
        if (tuple.getSourceStreamId().equals("sync")) {
            System.out.println("Bolt task id " + task_id + " received from "
                    + tuple.getSourceComponent() + " message " + tuple.getString(0));
            System.out.println("Normalizing: Basic Window of Bolt " + task_id);
            // fill the rest of the streams with the last received value to make them the same size
            universalBasicWindow.normalize();
            universalBasicWindow = new UniversalBasicWindow();
        }
        if (tuple.getSourceStreamId().equals("data")) {
            streamId = tuple.getStringByField("streamId");
            time = tuple.getStringByField("timestamp");
            value = Float.parseFloat(tuple.getStringByField("value"));
            universalBasicWindow.pushStream(streamId, value);
            if (universalBasicWindow.isFull(task_id)) { // check if any stream of the window is full
                // System.out.println("Univ. Basic Window of bolt " + task_id + " is Filled Up");
                collector.emit("bwFilled", new Values(task_id));
                // universalBasicWindow.normalize();
                // universalBasicWindow = new UniversalBasicWindow();
                // TODO:: Add basic window to sliding window and clear
            }
        }
        // System.err.println("SourceComponent: " + tuple.getSourceComponent());
        // System.err.println("SourceStreamId: " + tuple.getSourceStreamId());
        // System.err.println("Source Task: " + tuple.getSourceTask());
        // System.err.println("SourceGlobalStreamId: " + tuple.getSourceGlobalStreamid());
        // System.err.println("MessageId: " + tuple.getMessageId());
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("bwFilled", new Fields("task_id"));
    }
}
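
For reference, below is a minimal sketch of a spout restructured along the
lines Matthias describes in his reply: the BufferedReader is opened once in
open(), nextTuple() emits a single record and then returns so the spout can
process acks between calls, and the reader is closed at end of file. The
class name, the hardcoded file name, and the line-number suffix on the
message id are illustrative, not taken from the original code.

package tuc.LSH.storm.spouts;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

public class SingleEmitFileSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private BufferedReader reader;
    private boolean completed;
    private long lineNo; // counter used to build a unique message id per tuple

    @Override
    public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.completed = false;
        this.lineNo = 0;
        try {
            // Open the reader once here, not in nextTuple().
            this.reader = new BufferedReader(new FileReader("data")); // illustrative file name
        } catch (IOException e) {
            throw new RuntimeException("Error opening file", e);
        }
    }

    @Override
    public void nextTuple() {
        if (completed) {
            Utils.sleep(100); // nothing left to emit; avoid busy-spinning
            return;
        }
        try {
            String line = reader.readLine();
            if (line == null) {
                completed = true; // end of file: close the reader and stop emitting
                reader.close();
                return;
            }
            if (line.isEmpty()) {
                return;
            }
            String[] temp = line.split(","); // 0-id, 2-timestamp, 3-value, as in the original format
            // Emit ONE anchored tuple per call and return immediately, so acks
            // and fails can be processed between calls to nextTuple().
            collector.emit("data", new Values(temp[0], temp[2], temp[3]),
                    temp[0] + ":" + lineNo++);
        } catch (IOException e) {
            throw new RuntimeException("Error reading tuple", e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("data", new Fields("streamId", "timestamp", "value"));
    }
}

Note the message id: the original spout anchored every tuple with temp[0]
alone, which repeats across lines, so a fail() cannot tell which record
failed; the sketch appends a line counter to make each id unique.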
> On 4 Jun 2015, at 20:29, Matthias J. Sax <[email protected]>
> wrote:
>
> Hi,
>
> You are looping within "nextTuple()" to emit a tuple for each line of
> the whole file. This is "bad practice" because the spout is prevented
> from processing "acks" while "nextTuple()" is executing. I guess that is
> the reason why your tuples time out and fail.
>
> You should return from "nextTuple()" after emitting a single record.
> This should solve your problem, as the spout can process incoming acks
> between two calls to "nextTuple()". Just instantiate your BufferedReader
> in "open()" and close it once "this.completed" is true.
>
> Hope this helps.
>
>
> -Matthias
>
>
> On 06/04/2015 07:01 PM, Michail Toutoudakis wrote:
>> I am using a spout to read data from a text file and send the values
>> to a bolt. However, I am losing too many values; they fail.
>> I am a newbie to Storm and I don’t know where to look to debug this issue.
>> Any ideas??
>>
>> My Storm Spout code is:
>>
>> package tuc.LSH.storm.spouts;
>>
>> import backtype.storm.spout.SpoutOutputCollector;
>> import backtype.storm.task.TopologyContext;
>> import backtype.storm.topology.OutputFieldsDeclarer;
>> import backtype.storm.topology.base.BaseRichSpout;
>> import backtype.storm.tuple.Fields;
>> import backtype.storm.tuple.Values;
>> import backtype.storm.utils.Utils;
>> import tuc.LSH.conf.Consts;
>>
>> import java.io.BufferedReader;
>> import java.io.FileNotFoundException;
>> import java.io.FileReader;
>> import java.util.Map;
>>
>> /**
>> * Created by mixtou on 15/5/15.
>> */
>> public class FileReaderSpout extends BaseRichSpout {
>>
>> private SpoutOutputCollector collector;
>> private FileReader fileReader;
>> private boolean completed;
>> private TopologyContext context;
>> private int spout_idx;
>> private int spout_id;
>> private Map config;
>> private int noOfFailedWords;
>> private int noOfAckedWords;
>>
>>
>> @Override
>> public void declareOutputFields(OutputFieldsDeclarer
>> outputFieldsDeclarer) {
>> outputFieldsDeclarer.declareStream("data",
>> new Fields("streamId", "timestamp", "value"));
>>
>>
>> }
>>
>> @Override
>> public void open(Map config, TopologyContext
>> topologyContext, SpoutOutputCollector spoutOutputCollector) {
>> this.context = topologyContext;
>> this.spout_idx = context.getThisTaskIndex();
>> this.spout_id = context.getThisTaskId();
>> this.collector = spoutOutputCollector;
>> this.config = config;
>> this.completed = false;
>> this.noOfFailedWords = 0;
>> this.noOfAckedWords = 0;
>>
>> try {
>> System.err.println("Reading File: " +
>> config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
>>
>> this.fileReader = new FileReader(config.get(file_to_read()).toString());
>> } catch (FileNotFoundException e) {
>> throw new RuntimeException("Error reading file [" +
>> config.get(file_to_read()) + "]");
>> }
>>
>> }
>>
>> @Override
>> public void nextTuple() {
>>
>> if (this.completed) {
>> return;
>> }
>>
>> try {
>>
>> String str;
>> BufferedReader reader = new BufferedReader(fileReader);
>>
>> while ((str = reader.readLine()) != null) {
>>
>> if (str.isEmpty()) {
>> return;
>> }
>> String[] temp = str.split(",");
>> // System.err.println("============== " + temp[0] + " + " + temp[2] + " + " + temp[3]); // 0-id, 2-timestamp, 3-value
>>
>> collector.emit("data", new Values(temp[0], temp[2], temp[3]),
>> temp[0]); // emit the data to the next bolt with guaranteed delivery
>>
>> }
>>
>> } catch (Exception e) {
>> System.err.println("Error Reading Tuple: " + e);
>> throw new RuntimeException("Error Reading Tuple ", e);
>> } finally {
>> System.err.println("Finished Reading File. Message From
>> Spout: " + spout_idx);
>> this.completed = true;
>>
>> }
>>
>> Utils.sleep(100);
>> }
>>
>> private String file_to_read() {
>> // this.spout_id = context.getThisTaskId();
>> if (Consts.NO_OF_SPOUTS > 1) {
>> int file_no = spout_idx % Consts.NO_OF_SPOUTS;
>> return "data" + file_no;
>> } else {
>> return "data";
>> }
>> }
>>
>> @Override
>> public void ack(Object msgId) {
>> super.ack(msgId);
>> noOfAckedWords++;
>> // System.out.println("OK tuple acked from bolt: " + msgId+" no
>> of acked word "+noOfAckedWords);
>> }
>>
>> @Override
>> public void fail(Object msgId) {
>> super.fail(msgId);
>> noOfFailedWords++;
>> System.err.println("ERROR: " + context.getThisComponentId() + "
>> " + msgId + " no of words failed " + noOfFailedWords);
>>
>> }
>> }
>>
>> And my bolt looks like this:
>>
>> package tuc.LSH.storm.bolts;
>>
>> import backtype.storm.task.OutputCollector;
>> import backtype.storm.task.TopologyContext;
>> import backtype.storm.topology.OutputFieldsDeclarer;
>> import backtype.storm.topology.base.BaseRichBolt;
>> import backtype.storm.tuple.Fields;
>> import backtype.storm.tuple.Tuple;
>> import backtype.storm.tuple.Values;
>> import tuc.LSH.core.hashfunctions.HashFunctionsGen;
>> import tuc.LSH.core.timeseries.UniversalBasicWindow;
>>
>>
>> import java.lang.Thread;
>> import java.util.*;
>>
>> /**
>> * Created by mixtou on 17/5/15.
>> */
>> public class LSHBolt extends BaseRichBolt {
>> private int task_id;
>> private OutputCollector collector;
>> private UniversalBasicWindow universalBasicWindow;
>>
>> private String streamId;
>> private String time;
>> private Float value;
>>
>> @Override
>> public void prepare(Map conf, TopologyContext topologyContext,
>> OutputCollector outputCollector) {
>> this.task_id = topologyContext.getThisTaskIndex();
>> this.collector = outputCollector;
>> this.universalBasicWindow = new UniversalBasicWindow();
>> streamId = null;
>> time = null;
>> value = 0f;
>> System.err.println("New Bolt with id: " + task_id);
>>
>> }
>>
>> @Override
>> public void execute(Tuple tuple) {
>>
>> if (tuple.getSourceStreamId().equals("sync")) {
>> System.out.println("Bolt task id " + task_id + " received from "
>> + tuple.getSourceComponent() + " message " + tuple.getString(0));
>> System.out.println("Normalizing: Basic Window of Bolt " +
>> task_id);
>> universalBasicWindow.normalize(); // fill the rest of the streams with last received value to make them the same size
>> universalBasicWindow = null;
>> universalBasicWindow = new UniversalBasicWindow();
>>
>> collector.ack(tuple);
>> }
>>
>> if (tuple.getSourceStreamId().equals("data")) {
>>
>> streamId = tuple.getStringByField("streamId");
>> time = tuple.getStringByField("timestamp");
>> value = Float.parseFloat(tuple.getStringByField("value"));
>>
>> universalBasicWindow.pushStream(streamId, value);
>>
>> collector.ack(tuple);
>>
>> if (universalBasicWindow.isFull(task_id)) { // check if any stream of the window is full
>>
>> // System.out.println("Univ. Basic Window of bolt " + task_id
>> + " is Filled Up");
>>
>> collector.emit("bwFilled", new Values(task_id));
>>
>> // universalBasicWindow.normalize();
>> // universalBasicWindow = new UniversalBasicWindow();
>>
>> // TODO:: Add basic window to sliding window and clear
>>
>> }
>>
>> }
>>
>>
>> // collector.ack(tuple);
>> }
>>
>> @Override
>> public void declareOutputFields(OutputFieldsDeclarer
>> outputFieldsDeclarer) {
>> outputFieldsDeclarer.declareStream("bwFilled", new Fields("task_id"));
>>
>> }
>> }
>>
>> Any Ideas??
>>
>>
>