We are running the word count topology on a local cluster. Is there any procedure
to kill the topology through code? If so, please guide us. We are attaching the
word count topology program below:

Main program:
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {
        // Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
               .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 2)
               .fieldsGrouping("word-normalizer", new Fields("word"));

        // Configuration
        Config conf = new Config();
        conf.put("wordsFile", "E:\\words.txt");
        conf.setDebug(false);
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        // Topology run
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        Thread.sleep(1000);
    }
}
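To answer the question above: when running in a `LocalCluster`, the topology can be killed from code by calling `killTopology(name)` with the name it was submitted under, followed by `shutdown()` to stop the in-process cluster itself. The following is a minimal sketch of that pattern, assuming the same topology name as above; the class name and the 10-second run duration are illustrative, not from the original program:

```java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TopologyKillExample {
    static final String TOPOLOGY_NAME = "Getting-Started-Toplogie";

    public static void main(String[] args) throws InterruptedException {
        TopologyBuilder builder = new TopologyBuilder();
        // ... set the spout and bolts exactly as in TopologyMain ...

        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());

        // Let the topology run for a while, then kill it by name
        Thread.sleep(10000);
        cluster.killTopology(TOPOLOGY_NAME);

        // Shut down the in-process local cluster itself
        cluster.shutdown();
    }
}
```

Note that killing the topology (rather than simply letting the JVM exit) gives the bolts a chance to have `cleanup()` invoked, which is where `WordCounter` prints its counts.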
Spout program:

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader implements IRichSpout {

    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;
    private TopologyContext context;
    Map<String, Object> count;

    public boolean isDistributed() {
        return false;
    }

    public void ack(Object msgId) {
        System.out.println("OK:" + msgId);
    }

    public void close() {
    }

    public void fail(Object msgId) {
        System.out.println("FAIL:" + msgId);
    }

    /**
     * The only thing this method does is emit each file line.
     */
    public void nextTuple() {
        /*
         * nextTuple() is called forever, so once we have read the whole
         * file we wait a while and then return.
         */
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Do nothing
            }
            return;
        }
        String str;
        // Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines
            while ((str = reader.readLine()) != null) {
                // For each line, emit a new tuple with the line as its
                // value (and also as its message id)
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    /**
     * Open the file and get the collector object.
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            this.context = context;
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file [" + conf.get("wordsFile") + "]");
        }
        this.collector = collector;
    }

    /**
     * Declare the output field "line".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    public void deactivate() {
    }

    public void activate() {
    }

    public Map<String, Object> getComponentConfiguration() {
        return count;
    }
}
Bolt program - word normalizer:
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer implements IRichBolt {

    private OutputCollector collector;
    Map<String, Object> count;

    public void cleanup() {
    }

    /**
     * The bolt receives a line from the words file and normalizes it:
     * the line is lower-cased and split into its individual words.
     */
    public void execute(Tuple input) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                // Emit the word, anchored to the input tuple
                List<Tuple> anchors = new ArrayList<Tuple>();
                anchors.add(input);
                collector.emit(anchors, new Values(word));
            }
        }
        // Acknowledge the tuple
        collector.ack(input);
    }

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * The bolt only emits the field "word".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return count;
    }
}
Bolt program - word counter:
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class WordCounter implements IRichBolt {

    Integer id;
    String name;
    Map<String, Integer> counters;
    Map<String, Object> count;
    private OutputCollector collector;

    /**
     * At the end of the run (when the cluster is shut down)
     * we print the word counters.
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter [" + name + "-" + id + "] --");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }

    /**
     * Count each word we receive.
     */
    @Override
    public void execute(Tuple input) {
        String str = input.getString(0);
        /*
         * If the word doesn't exist in the map yet, create an entry
         * for it; otherwise add 1 to its count.
         */
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
        // Acknowledge the tuple
        collector.ack(input);
    }

    /**
     * On create.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    public void deactivate() {
    }

    public Map<String, Object> getComponentConfiguration() {
        return count;
    }
}

Date: Tue, 21 Jan 2014 19:37:10 +0530
Subject: Re: TOPOLOGY INFORMATION IS NOT APPEARING IN STORM UI
From: [email protected]
To: [email protected]

There are two things to check here:

1. Whether you are submitting the topology to the Storm cluster or to a local
cluster. Only if you submit the topology to the Storm cluster will you be able
to find the topology information in the Storm UI.


2. Whether the Storm nimbus and Storm UI parameters are correctly configured in
the program and in the storm.yaml file, respectively.
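For point 2, the relevant storm.yaml entries typically look like the sketch below. The host address and port are placeholder values for illustration, not taken from this thread; substitute the address of your own Nimbus node:

```yaml
# storm.yaml on the machine running the client / Storm UI
nimbus.host: "192.168.1.10"   # example: address of the Nimbus node
ui.port: 8080                 # example: port the Storm UI listens on
```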

Thanks,
Richards Peter.