We are running the word count topology in a local cluster. Is there a way to
kill the topology programmatically? If so, please guide us. The word count
topology program is included below:
Main program:
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {
        // Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
               .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 2)
               .fieldsGrouping("word-normalizer", new Fields("word"));

        // Configuration
        Config conf = new Config();
        conf.put("wordsFile", "E:\\words.txt");
        conf.setDebug(false);

        // Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", conf,
                builder.createTopology());
        Thread.sleep(1000);
    }
}
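On the question of killing the topology from code: backtype.storm.LocalCluster
provides killTopology(String name) and shutdown(), so in TopologyMain the usual
pattern is to call cluster.killTopology("Getting-Started-Toplogie") and then
cluster.shutdown() once the sleep has finished. The sketch below shows only that
ordering. ClusterHandle and RecordingCluster are hypothetical stand-ins (they are
not Storm classes), used so the example runs without Storm on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

public class LocalRunSketch {

    /** Hypothetical stand-in for the three LocalCluster calls used here. */
    interface ClusterHandle {
        void submitTopology(String name);
        void killTopology(String name);
        void shutdown();
    }

    /** Records each call in order so the lifecycle can be inspected. */
    static class RecordingCluster implements ClusterHandle {
        final List<String> calls = new ArrayList<>();

        public void submitTopology(String name) { calls.add("submit:" + name); }
        public void killTopology(String name)   { calls.add("kill:" + name); }
        public void shutdown()                  { calls.add("shutdown"); }
    }

    /**
     * Submit the topology, let it run for runMillis, then kill it and shut
     * the cluster down. The finally block runs even if the sleep is interrupted.
     */
    static void runThenKill(ClusterHandle cluster, String name, long runMillis) {
        cluster.submitTopology(name);
        try {
            Thread.sleep(runMillis);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        } finally {
            cluster.killTopology(name);
            cluster.shutdown();
        }
    }

    public static void main(String[] args) {
        RecordingCluster cluster = new RecordingCluster();
        runThenKill(cluster, "Getting-Started-Toplogie", 100);
        System.out.println(cluster.calls);
    }
}
```

With the real LocalCluster, the sleep should be long enough for the spout to
finish reading the file. The counters are printed from WordCounter.cleanup(),
which runs when the topology is killed and the cluster shuts down.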
Spout program:
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.*;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader implements IRichSpout {
    private SpoutOutputCollector collector;
    Map<String, Object> count;
    private FileReader fileReader;
    private boolean completed = false;
    private TopologyContext context;

    public boolean isDistributed() {
        return false;
    }

    public void ack(Object msgId) {
        System.out.println("OK:" + msgId);
    }

    public void close() {}

    public void fail(Object msgId) {
        System.out.println("FAIL:" + msgId);
    }

    /**
     * The only thing this method does is emit each line of the file.
     */
    public void nextTuple() {
        /**
         * nextTuple is called forever, so once the file has been read
         * we wait and then return.
         */
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Do nothing
            }
            return;
        }
        String str;
        // Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines
            while ((str = reader.readLine()) != null) {
                /**
                 * For each line, emit a new value, using the line itself
                 * as the message ID.
                 */
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    /**
     * Open the file and keep the collector object.
     */
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.context = context;
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            // Use the same key as above ("wordsFile") in the error message.
            throw new RuntimeException("Error reading file["
                    + conf.get("wordsFile") + "]");
        }
        this.collector = collector;
    }

    /**
     * Declare the output field "line".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    public void deactivate() {}

    public void activate() {}

    public Map<String, Object> getComponentConfiguration() {
        return count;
    }
}
Bolt program - word normalizer:
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer implements IRichBolt {
    private OutputCollector collector;
    Map<String, Object> count;

    public void cleanup() {}

    /**
     * The bolt receives a line from the words file and normalizes it:
     * the line is split into words and each word is put in lower case.
     */
    public void execute(Tuple input) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                // Emit the word, anchored to the input tuple
                List a = new ArrayList();
                a.add(input);
                collector.emit(a, new Values(word));
            }
        }
        // Acknowledge the tuple
        collector.ack(input);
    }

    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * The bolt will only emit the field "word".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return count;
    }
}
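To check the normalization step without starting a cluster, the logic of
execute() can be pulled into a plain method. This is a minimal sketch:
NormalizeSketch and normalize are hypothetical names that are not part of the
topology, and the method returns the words in a list instead of emitting them
through the collector.

```java
import java.util.ArrayList;
import java.util.List;

public class NormalizeSketch {

    /**
     * Split a line on single spaces, trim each piece, skip empty pieces,
     * and lower-case the rest (the same steps as WordNormalizer.execute).
     */
    static List<String> normalize(String sentence) {
        List<String> words = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                words.add(word.toLowerCase());
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // Consecutive spaces produce empty pieces, which are skipped.
        System.out.println(normalize("Storm  is FAST "));
    }
}
```

Because every word is lower-cased before it is emitted, "FAST" and "fast" end up
with the same value in the "word" field, and the fieldsGrouping on that field
routes them to the same word-counter task.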
Bolt program - word counter:
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class WordCounter implements IRichBolt {
    Integer id;
    String name;
    Map<String, Integer> counters;
    Map<String, Object> count;
    private OutputCollector collector;

    /**
     * At the end of the spout (when the cluster is shut down)
     * we will show the word counters.
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter [" + name + "-" + id + "] --");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }

    /**
     * Count each incoming word.
     */
    @Override
    public void execute(Tuple input) {
        String str = input.getString(0);
        /**
         * If the word doesn't exist in the map we create it;
         * otherwise we add 1 to its count.
         */
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
        // Acknowledge the tuple
        collector.ack(input);
    }

    /**
     * On create
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    public void deactivate() {}

    public Map<String, Object> getComponentConfiguration() {
        return count;
    }
}

Date: Tue, 21 Jan 2014 19:37:10 +0530
Subject: Re: TOPOLOGY INFORMATION IS NOT APPEARING IN STORM UI
From: [email protected]
To: [email protected]
There are two things to check here:
1. Whether you are submitting the topology to the Storm cluster or to the local
cluster. If you submit the topology to the Storm cluster, you will be able to
find the topology information in the Storm UI.
2. Whether the Storm Nimbus and Storm UI parameters are correctly configured in
the program and in the storm.yaml file, respectively.
Thanks,
Richards Peter.