Hello,
I am trying to run the new version of Storm (2.0.0), but when I start it
locally none of my print statements appear in the console, which leads me
to conclude that the topology is not being executed at all. Storm's own
output is enormous, and I cannot see where the problem is.

This is the content of my pom.xml file:

<groupId>com.example.exclamationTopologyEx</groupId>
<artifactId>ExclamationTopologtEx</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-client</artifactId>
        <version>2.0.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-server</artifactId>
        <version>2.0.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>1.5.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>exec</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <executable>java</executable>
                <includeProjectDependencies>true</includeProjectDependencies>
                <includePluginDependencies>false</includePluginDependencies>
                <classpathScope>compile</classpathScope>
                <mainClass>com.stormExample.SquareStormTopology</mainClass>
                <cleanupDaemonThreads>false</cleanupDaemonThreads>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>


This is the main class for starting the topology:

public static void main(String[] args) throws Exception {
    //used to build the topology
    TopologyBuilder builder = new TopologyBuilder();
    //add the spout with name 'spout' and parallelism hint of 5 executors
    builder.setSpout("spout", new DataSpout(),5);
    //add the Emitter bolt with the name 'emitter'
    builder.setBolt("emitter",new EmitterBolt(),8).shuffleGrouping("spout");

    Config conf = new Config();
    //set to false to disable debug when running on production cluster
    conf.setDebug(false);
    //if there are arguments then we are running on a cluster
    if(args!= null && args.length > 0){
        //parallelism hint to set the number of workers
        conf.setNumWorkers(3);
        //submit the topology
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }else{//we are running locally
        //the maximum number of executors
        conf.setMaxTaskParallelism(3);

        //local cluster used to run locally
        LocalCluster cluster = new LocalCluster();
        //submitting the topology
        cluster.submitTopology("emitter-topology", conf, builder.createTopology());
        //sleep
        Thread.sleep(10000);
        //shut down the cluster
        cluster.shutdown();
    }
}


Additionally, these are my spout and bolt implementations:

public class DataSpout extends BaseRichSpout {

    SpoutOutputCollector _collector;
    int nextNumber;

    @Override
    public void open(Map map, TopologyContext topologyContext,
                     SpoutOutputCollector spoutOutputCollector) {
        _collector = spoutOutputCollector;
        nextNumber = 2;
    }

    @Override
    public void nextTuple() {
        if(nextNumber > 20){
            return;
        }else{
            _collector.emit(new Values(nextNumber));

            nextNumber = nextNumber + 2;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("number"));
    }
}

public class EmitterBolt extends BaseBasicBolt {

    int sumNumbers;

    public EmitterBolt(){

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        sumNumbers = 0;
        System.out.println("com.example.exclamationTopologyEx.EmitterBolt has been initialized");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        int nextNumber = tuple.getIntegerByField("number");

        sumNumbers+=nextNumber;

        System.out.println("The new sum is: " + sumNumbers);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}
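
To make the expected console output concrete, here is a minimal sketch
without any Storm classes, assuming a single DataSpout instance feeding a
single EmitterBolt instance. The class ExpectedSums and its method
runningSums are hypothetical and not part of the project; they only mirror
the arithmetic in the spout and bolt above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the project: mimics one DataSpout
// feeding one EmitterBolt, without using any Storm classes.
class ExpectedSums {

    // Returns the running sums the bolt would print for 2, 4, ..., 20.
    static List<Integer> runningSums() {
        List<Integer> sums = new ArrayList<>();
        int sumNumbers = 0;
        for (int nextNumber = 2; nextNumber <= 20; nextNumber += 2) {
            sumNumbers += nextNumber;
            sums.add(sumNumbers);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Same message format as EmitterBolt.execute
        for (int sum : runningSums()) {
            System.out.println("The new sum is: " + sum);
        }
    }
}
```

With one spout and one bolt the last line would report a sum of 110. With
several spout and bolt tasks the tuples are spread across bolt instances by
the shuffle grouping, so the individual sums would differ, but at least some
"The new sum is:" lines should still appear.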


I run the topology with the Maven command:

    mvn compile exec:java -Dstorm.topology=com.example.exclamationTopologyEx

but, as I said, none of the print lines appear in the console. I have
searched the documentation, both on the website and in the Git repository,
but I could not find anything that gets this simple topology to work.

So, could you please have a look at the code and help me get this starter
project to work? I think the problem might be in the pom.xml file: I may be
missing some dependencies or have the wrong ones. I previously worked with
an older version of Storm, where I only used the storm-core dependency,
which, as far as I can see, has now been split into the two new
dependencies.

I am working on macOS with the latest version of Java, and my IDE is
IntelliJ IDEA.

Thank you in advance for your response.

Regards,
Damjan
