Hi Priya,

You can call me the below code from your main program to shutdown your
topology.

cluster.shutdown();

Thanks
Bijoy


On Wed, Jan 22, 2014 at 10:06 AM, Priya Ganesan <
[email protected]> wrote:

>
> We are running the word count topology in local cluster. Is there any
> procedure to kill the topology through coding. If so please guide us. We
> will attach the word count topology program with this:
>
> *Main program:*
>
> import backtype.storm.Config;
> import backtype.storm.LocalCluster;
> import backtype.storm.topology.TopologyBuilder;
> import backtype.storm.tuple.Fields;
>
> public class TopologyMain {
> public static void main(String[] args) throws InterruptedException {
> //Topology definition
> TopologyBuilder builder = new TopologyBuilder();
> builder.setSpout("word-reader",new WordReader());
> builder.setBolt("word-normalizer", new WordNormalizer())
> .shuffleGrouping("word-reader");
> builder.setBolt("word-counter", new WordCounter(),2)
> .fieldsGrouping("word-normalizer", new Fields("word"));
> //Configuration
> Config conf = new Config();
> conf.put("wordsFile","E:\\words.txt");
>
> conf.setDebug(false);
> //Topology run
> conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
> LocalCluster cluster = new LocalCluster();
> cluster.submitTopology("Getting-Started-Toplogie", conf,
> builder.createTopology());
> Thread.sleep(1000);
>
> }
> }
>
> *spout program:*
>
>
> import java.io.BufferedReader;
> import java.io.FileNotFoundException;
> import java.io.FileReader;
> import java.io.*;
> import java.util.Map;
> import backtype.storm.spout.SpoutOutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.IRichSpout;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Values;
> public  class  WordReader implements IRichSpout {
> private SpoutOutputCollector collector;
> Map<String, Object> count;
>
> private FileReader fileReader;
> private boolean completed = false;
> private TopologyContext context;
> public boolean isDistributed() {return false;}
> public void ack(Object msgId) {
> System.out.println("OK:"+msgId);
> }
> public void close() {}
> public void fail(Object msgId) {
> System.out.println("FAIL:"+msgId);
> }
> /**
> * The only thing that the methods will do It is emit each
> * file line
> */
> public void nextTuple() {
> /**
> * The nextuple it is called forever, so if we have been readed the file
> * we will wait and then return
> */
> if(completed){
> try {
> Thread.sleep(1000);
> } catch (InterruptedException e) {
> //Do nothing
> }
> return;
> }
> String str;
> //Open the reader
> BufferedReader reader = new BufferedReader(fileReader);
> try{
> //Read all lines
> while((str = reader.readLine()) != null){
> /**
> * By each line emmit a new value with the line as a their
> */
> this.collector.emit(new Values(str),str);
> }
> }catch(Exception e){
> throw new RuntimeException("Error reading tuple",e);
> }finally{
> completed = true;
> }
> }
> /**
> * We will create the file and get the collector object
> */
> public void open(Map conf, TopologyContext context,
> SpoutOutputCollector collector) {
> try {
> this.context = context;
> this.fileReader = new FileReader(conf.get("wordsFile").toString());
> } catch (FileNotFoundException e) {
> throw new RuntimeException("Error reading file["+conf.get("wordFile")+"]");
> }
> this.collector = collector;
> }
> /**
> * Declare the output field "word"
> */
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(new Fields("line"));
> }
> public void deactivate(){}
> public void activate(){}
> public Map<String, Object> getComponentConfiguration(){return count;}
> }
>
> *Bolt program - word normalizer:*
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> import backtype.storm.task.OutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.IRichBolt;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Tuple;
> import backtype.storm.tuple.Values;
> public class WordNormalizer implements IRichBolt {
> private OutputCollector collector;
> Map<String, Object> count;
> public void cleanup() {}
> /**
> * The bolt will receive the line from the
> * words file and process it to Normalize this line
> *
> * The normalize will be put the words in lower case
> * and split the line to get all words in this
> */
> public void execute(Tuple input) {
> String sentence = input.getString(0);
> String[] words = sentence.split(" ");
> for(String word : words){
> word = word.trim();
> if(!word.isEmpty()){
> word = word.toLowerCase();
> //Emit the word
> List a = new ArrayList();
> a.add(input);
> collector.emit(a,new Values(word));
> }
> }
> // Acknowledge the tuple
> collector.ack(input);
> }
> public void prepare(Map stormConf, TopologyContext context,
> OutputCollector collector) {
> this.collector = collector;
> }
> /**
> * The bolt will only emit the field "word"
> */
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(new Fields("word"));
> }
> public Map<String, Object> getComponentConfiguration(){return count;}
> }
>
> *Bolt program - word counter:*
>
> import java.util.HashMap;
> import java.util.Map;
> import backtype.storm.task.OutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.IRichBolt;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.tuple.Tuple;
> public class  WordCounter implements IRichBolt {
> Integer id;
> String name;
> Map<String, Integer> counters;
> Map<String, Object> count;
> private OutputCollector collector;
> /**
> * At the end of the spout (when the cluster is shutdown
> * We will show the word counters
> */
> @Override
> public void cleanup() {
> System.out.println("-- Word Counter ["+name+"-"+id+"] --");
> for(Map.Entry<String, Integer> entry : counters.entrySet()){
> System.out.println(entry.getKey()+": "+entry.getValue());
> }
> }
> /**
> * On each word We will count
> */
> @Override
> public void execute(Tuple input) {
> String str = input.getString(0);
> /**
> * If the word dosn't exist in the map we will create
> * this, if not We will add 1
> */
> if(!counters.containsKey(str)){
> counters.put(str, 1);
> }else{
> Integer c = counters.get(str) + 1;
> counters.put(str, c);
> }
> //Set the tuple as Acknowledge
> collector.ack(input);
> }
> /**
> * On create
> */
> @Override
> public void prepare(Map stormConf, TopologyContext context,
> OutputCollector collector) {
> this.counters = new HashMap<String, Integer>();
> this.collector = collector;
> this.name = context.getThisComponentId();
> this.id = context.getThisTaskId();
> }
> @Override
> public void declareOutputFields(OutputFieldsDeclarer declarer) {}
>
> public void deactivate(){}
>
> public Map<String, Object> getComponentConfiguration(){return count;}
>
> }
> ------------------------------
> Date: Tue, 21 Jan 2014 19:37:10 +0530
> Subject: Re: TOPOLOGY INFORMATION IS NOT APPEARING IN STORM UI
> From: [email protected]
> To: [email protected]
>
>
> There are two things to notice here:
>
> 1. Whether you are submitting the topology to the storm cluster or to the
> local cluster. If you submit the topology to the storm cluster, you will be
> able to find the topology information on storm UI.
>
> 2. Whether storm nimbus and storm ui parameters are correctly configured
> in program and storm.yaml file respectively.
>
> Thanks,
> Richards Peter.
>

Reply via email to