Hi Damjan, Your topology is actually able to run locally and I can see see the output from your bolt: “The new sum is xxx ”.
Although (local)cluster.shutdown didn’t really work and keep printing out errors. I would like to take some time later to look into it. By the way, to upgrade from older version of topology, you just need to switch your dependency from storm-core to storm-client and recompile. Thanks, Ethan > On Jul 3, 2019, at 8:05 AM, damjan gjurovski <[email protected]> wrote: > > Hello, > I am trying to run the new version of Storm but for some reason when starting > it locally all of my output commands are not shown in the console leading me > to the conclusion that the storm program is not executed at all. The output > from Storm is enormous and I cannot see where is the problem. > > This is the content of my pom.xml file: > <groupId>com.example.exclamationTopologyEx</groupId> > <artifactId>ExclamationTopologtEx</artifactId> > <version>1.0-SNAPSHOT</version> > <properties> > <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> > </properties> > > <dependencies> > <dependency> > <groupId>org.apache.storm</groupId> > <artifactId>storm-client</artifactId> > <version>2.0.0</version> > <scope>provided</scope> > </dependency> > <dependency> > <groupId>org.apache.storm</groupId> > <artifactId>storm-server</artifactId> > <version>2.0.0</version> > </dependency> > </dependencies> > > <build> > <plugins> > <plugin> > <groupId>org.codehaus.mojo</groupId> > <artifactId>exec-maven-plugin</artifactId> > <version>1.5.0</version> > <executions> > <execution> > <goals> > <goal>exec</goal> > </goals> > </execution> > </executions> > <configuration> > <executable>java</executable> > <includeProjectDependencies>true</includeProjectDependencies> > <includePluginDependencies>false</includePluginDependencies> > <classpathScope>compile</classpathScope> > <mainClass>com.stormExample.SquareStormTopology</mainClass> > <cleanupDaemonThreads>false</cleanupDaemonThreads> > </configuration> > </plugin> > <plugin> > <groupId>org.apache.maven.plugins</groupId> > <artifactId>maven-compiler-plugin</artifactId> > <version>3.6.1</version> > <configuration> > <source>1.8</source> > <target>1.8</target> > </configuration> > </plugin> > </plugins> > </build> > > This is the main class for starting the topology: > public static void main(String[] args) throws Exception { > //used to build the toplogy > TopologyBuilder builder = new TopologyBuilder(); > //add the spout with name 'spout' and parallelism hint of 5 executors > builder.setSpout("spout", new DataSpout(),5); > //add the Emitter bolt with the name 'emitter' > builder.setBolt("emitter",new EmitterBolt(),8).shuffleGrouping("spout"); > > Config conf = new Config(); > //set to false to disable debug when running on production cluster > conf.setDebug(false); > //if there are arguments then we are running on a cluster > if(args!= null && args.length > 0){ > //parallelism hint to set the number of workers > conf.setNumWorkers(3); > //submit the toplogy > StormSubmitter.submitTopology(args[0], conf, > builder.createTopology()); > }else{//we are running locally > //the maximum number of executors > conf.setMaxTaskParallelism(3); > > //local cluster used to run locally > LocalCluster cluster = new LocalCluster(); > //submitting the topology > cluster.submitTopology("emitter-topology",conf, > builder.createTopology()); > //sleep > Thread.sleep(10000); > //shut down the cluster > cluster.shutdown(); > } > } > > Additionally, these are my spout and bolt implementations: > public class DataSpout extends BaseRichSpout { > > SpoutOutputCollector _collector; > int nextNumber; > > @Override > public void open(Map map, TopologyContext topologyContext, > SpoutOutputCollector spoutOutputCollector) { > _collector = spoutOutputCollector; > nextNumber = 2; > } > > @Override > public void nextTuple() { > if(nextNumber > 20){ > return; > }else{ > _collector.emit(new Values(nextNumber)); > > nextNumber = nextNumber + 2; > } > } > > @Override > public void declareOutputFields(OutputFieldsDeclarer > outputFieldsDeclarer) { > outputFieldsDeclarer.declare(new Fields("number")); > } > } > public class EmitterBolt extends BaseBasicBolt { > > int sumNumbers; > > public EmitterBolt(){ > > } > > @Override > public void prepare(Map stormConf, TopologyContext context) { > sumNumbers = 0; > System.out.println("com.example.exclamationTopologyEx.EmitterBolt has > been initialized"); > } > > @Override > public void execute(Tuple tuple, BasicOutputCollector > basicOutputCollector) { > int nextNumber = tuple.getIntegerByField("number"); > > sumNumbers+=nextNumber; > > System.out.println("The new sum is: " + sumNumbers); > } > > @Override > public void declareOutputFields(OutputFieldsDeclarer > outputFieldsDeclarer) { > > } > } > > I use the maven command: > mvn compile exec:java -Dstorm.topology=com.example.exclamationTopologyEx > for running the topology but as I said none of the print lines are shown in > the console. I have tried to search for a solution from the documentation > both on the website and in git but I couldn't find anything that can get this > simple topology to work. > > So, please if you could have a look at the code and help me get this starting > project to work. I think that the problem might be in the pom.xml file, I > might need some other dependencies or some wrong dependencies. I was > previously working with the older version of Storm where I only used the > storm-core dependency which as I saw is now split in the two new > dependencies. > > I am working on a Mac OS and using the latest version of Java and as IDE I am > using IntelliJ IDEA. > > Thank you in advance for your response. > > Regards, > Damjan > >
