Hi Priya, You can call me the below code from your main program to shutdown your topology.
cluster.shutdown(); Thanks Bijoy On Wed, Jan 22, 2014 at 10:06 AM, Priya Ganesan < [email protected]> wrote: > > We are running the word count topology in local cluster. Is there any > procedure to kill the topology through coding. If so please guide us. We > will attach the word count topology program with this: > > *Main program:* > > import backtype.storm.Config; > import backtype.storm.LocalCluster; > import backtype.storm.topology.TopologyBuilder; > import backtype.storm.tuple.Fields; > > public class TopologyMain { > public static void main(String[] args) throws InterruptedException { > //Topology definition > TopologyBuilder builder = new TopologyBuilder(); > builder.setSpout("word-reader",new WordReader()); > builder.setBolt("word-normalizer", new WordNormalizer()) > .shuffleGrouping("word-reader"); > builder.setBolt("word-counter", new WordCounter(),2) > .fieldsGrouping("word-normalizer", new Fields("word")); > //Configuration > Config conf = new Config(); > conf.put("wordsFile","E:\\words.txt"); > > conf.setDebug(false); > //Topology run > conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); > LocalCluster cluster = new LocalCluster(); > cluster.submitTopology("Getting-Started-Toplogie", conf, > builder.createTopology()); > Thread.sleep(1000); > > } > } > > *spout program:* > > > import java.io.BufferedReader; > import java.io.FileNotFoundException; > import java.io.FileReader; > import java.io.*; > import java.util.Map; > import backtype.storm.spout.SpoutOutputCollector; > import backtype.storm.task.TopologyContext; > import backtype.storm.topology.IRichSpout; > import backtype.storm.topology.OutputFieldsDeclarer; > import backtype.storm.tuple.Fields; > import backtype.storm.tuple.Values; > public class WordReader implements IRichSpout { > private SpoutOutputCollector collector; > Map<String, Object> count; > > private FileReader fileReader; > private boolean completed = false; > private TopologyContext context; > public boolean isDistributed() {return false;} > public void ack(Object msgId) { > System.out.println("OK:"+msgId); > } > public void close() {} > public void fail(Object msgId) { > System.out.println("FAIL:"+msgId); > } > /** > * The only thing that the methods will do It is emit each > * file line > */ > public void nextTuple() { > /** > * The nextuple it is called forever, so if we have been readed the file > * we will wait and then return > */ > if(completed){ > try { > Thread.sleep(1000); > } catch (InterruptedException e) { > //Do nothing > } > return; > } > String str; > //Open the reader > BufferedReader reader = new BufferedReader(fileReader); > try{ > //Read all lines > while((str = reader.readLine()) != null){ > /** > * By each line emmit a new value with the line as a their > */ > this.collector.emit(new Values(str),str); > } > }catch(Exception e){ > throw new RuntimeException("Error reading tuple",e); > }finally{ > completed = true; > } > } > /** > * We will create the file and get the collector object > */ > public void open(Map conf, TopologyContext context, > SpoutOutputCollector collector) { > try { > this.context = context; > this.fileReader = new FileReader(conf.get("wordsFile").toString()); > } catch (FileNotFoundException e) { > throw new RuntimeException("Error reading file["+conf.get("wordFile")+"]"); > } > this.collector = collector; > } > /** > * Declare the output field "word" > */ > public void declareOutputFields(OutputFieldsDeclarer declarer) { > declarer.declare(new Fields("line")); > } > public void deactivate(){} > public void activate(){} > public Map<String, Object> getComponentConfiguration(){return count;} > } > > *Bolt program - word normalizer:* > > import java.util.ArrayList; > import java.util.List; > import java.util.Map; > import backtype.storm.task.OutputCollector; > import backtype.storm.task.TopologyContext; > import backtype.storm.topology.IRichBolt; > import backtype.storm.topology.OutputFieldsDeclarer; > import backtype.storm.tuple.Fields; > import backtype.storm.tuple.Tuple; > import backtype.storm.tuple.Values; > public class WordNormalizer implements IRichBolt { > private OutputCollector collector; > Map<String, Object> count; > public void cleanup() {} > /** > * The bolt will receive the line from the > * words file and process it to Normalize this line > * > * The normalize will be put the words in lower case > * and split the line to get all words in this > */ > public void execute(Tuple input) { > String sentence = input.getString(0); > String[] words = sentence.split(" "); > for(String word : words){ > word = word.trim(); > if(!word.isEmpty()){ > word = word.toLowerCase(); > //Emit the word > List a = new ArrayList(); > a.add(input); > collector.emit(a,new Values(word)); > } > } > // Acknowledge the tuple > collector.ack(input); > } > public void prepare(Map stormConf, TopologyContext context, > OutputCollector collector) { > this.collector = collector; > } > /** > * The bolt will only emit the field "word" > */ > public void declareOutputFields(OutputFieldsDeclarer declarer) { > declarer.declare(new Fields("word")); > } > public Map<String, Object> getComponentConfiguration(){return count;} > } > > *Bolt program - word counter:* > > import java.util.HashMap; > import java.util.Map; > import backtype.storm.task.OutputCollector; > import backtype.storm.task.TopologyContext; > import backtype.storm.topology.IRichBolt; > import backtype.storm.topology.OutputFieldsDeclarer; > import backtype.storm.tuple.Tuple; > public class WordCounter implements IRichBolt { > Integer id; > String name; > Map<String, Integer> counters; > Map<String, Object> count; > private OutputCollector collector; > /** > * At the end of the spout (when the cluster is shutdown > * We will show the word counters > */ > @Override > public void cleanup() { > System.out.println("-- Word Counter ["+name+"-"+id+"] --"); > for(Map.Entry<String, Integer> entry : counters.entrySet()){ > System.out.println(entry.getKey()+": "+entry.getValue()); > } > } > /** > * On each word We will count > */ > @Override > public void execute(Tuple input) { > String str = input.getString(0); > /** > * If the word dosn't exist in the map we will create > * this, if not We will add 1 > */ > if(!counters.containsKey(str)){ > counters.put(str, 1); > }else{ > Integer c = counters.get(str) + 1; > counters.put(str, c); > } > //Set the tuple as Acknowledge > collector.ack(input); > } > /** > * On create > */ > @Override > public void prepare(Map stormConf, TopologyContext context, > OutputCollector collector) { > this.counters = new HashMap<String, Integer>(); > this.collector = collector; > this.name = context.getThisComponentId(); > this.id = context.getThisTaskId(); > } > @Override > public void declareOutputFields(OutputFieldsDeclarer declarer) {} > > public void deactivate(){} > > public Map<String, Object> getComponentConfiguration(){return count;} > > } > ------------------------------ > Date: Tue, 21 Jan 2014 19:37:10 +0530 > Subject: Re: TOPOLOGY INFORMATION IS NOT APPEARING IN STORM UI > From: [email protected] > To: [email protected] > > > There are two things to notice here: > > 1. Whether you are submitting the topology to the storm cluster or to the > local cluster. If you submit the topology to the storm cluster, you will be > able to find the topology information on storm UI. > > 2. Whether storm nimbus and storm ui parameters are correctly configured > in program and storm.yaml file respectively. > > Thanks, > Richards Peter. >
