Hi,
I'm trying to implement unit tests for Storm bolts (Java). The code below works
fine and the test passes on Storm 1.0.3:
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.887 sec
However, when I use BaseRichParrotBolt instead of BaseBasicParrotBolt (in the
builder.setBolt call), the assertions never run and it ends with the following
exception:
13610 [main] ERROR o.a.s.testing4j - Error in cluster java.lang.AssertionError:
Test timed out (10000ms) (not (every? exhausted? (spout-objects spouts)))
If you step through it with a debugger you'll see that the bolt does receive
and emit tuples, but it seems like Testing.completeTopology never returns. I
find this really odd because the bolts are virtually the same. All my bolts
extend from BaseRichBolt so I'd really like to make it work for those as well.
Any ideas?
Code:
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.ILocalCluster;
import org.apache.storm.Testing;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.CompleteTopologyParam;
import org.apache.storm.testing.MkClusterParam;
import org.apache.storm.testing.MockedSources;
import org.apache.storm.testing.TestJob;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Arrays;
import java.util.List;
import static junit.framework.Assert.*;
import org.junit.Test;
public class StormTestExample {

    private final static String EVENT = "event";
    private final static String SPOUT_ID = "spout";
    private final static String BOLT_ID = "parrot";
    private final static List<String> COMPONENT_IDS = Arrays.asList(SPOUT_ID, BOLT_ID);

    @Test
    public void testBasicTopology() {
        final MkClusterParam mkClusterParam = new MkClusterParam();
        mkClusterParam.setSupervisors(4);
        final Config daemonConf = new Config();
        daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, false);
        mkClusterParam.setDaemonConf(daemonConf);

        Testing.withSimulatedTimeLocalCluster(mkClusterParam, new TestJob() {
            @Override
            public void run(ILocalCluster cluster) {
                TopologyBuilder builder = new TopologyBuilder();
                builder.setSpout(SPOUT_ID, new TestSpout());
                builder.setBolt(BOLT_ID, new BaseBasicParrotBolt()).shuffleGrouping(SPOUT_ID);
                StormTopology topology = builder.createTopology();

                MockedSources mockedSources = new MockedSources();
                mockedSources.addMockData(SPOUT_ID,
                        new Values("nathan"),
                        new Values("bob"),
                        new Values("joey"),
                        new Values("nathan"));

                final Config conf = new Config();
                conf.setNumWorkers(2);

                final CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
                completeTopologyParam.setMockedSources(mockedSources);
                completeTopologyParam.setStormConf(conf);

                final Map result = Testing.completeTopology(cluster, topology,
                        completeTopologyParam);

                final Values expected = new Values(
                        new Values("nathan"),
                        new Values("bob"),
                        new Values("joey"),
                        new Values("nathan"));

                for (String component : COMPONENT_IDS) {
                    assertTrue("Error in " + component + " output",
                            Testing.multiseteq(expected,
                                    Testing.readTuples(result, component)));
                }
            }
        });
    }

    private static class TestSpout extends BaseRichSpout {
        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {
            ofd.declare(new Fields(EVENT));
        }

        @Override
        public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
            // Must override, but not needed for test.
            throw new UnsupportedOperationException();
        }

        @Override
        public void nextTuple() {
            // Must override, but not needed for test.
            throw new UnsupportedOperationException();
        }
    }

    private static class BaseBasicParrotBolt extends BaseBasicBolt {
        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {
            ofd.declare(new Fields(EVENT));
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector boc) {
            boc.emit(new Values(tuple.getValue(0)));
        }
    }

    private static class BaseRichParrotBolt extends BaseRichBolt {
        private OutputCollector oc;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {
            ofd.declare(new Fields(EVENT));
        }

        @Override
        public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
            this.oc = oc;
        }

        @Override
        public void execute(Tuple tuple) {
            oc.emit(new Values(tuple.getValue(0)));
        }

        @Override
        public final Map<String, Object> getComponentConfiguration() {
            Config config = new Config();
            config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, null);
            return config;
        }
    }
}
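A possible explanation, stated as an assumption rather than a confirmed
diagnosis: BaseBasicBolt acks each input tuple automatically after execute
returns, while a BaseRichBolt has to ack it explicitly, and completeTopology
may keep waiting until every mocked spout tuple has been acked or failed. The
dependency-free sketch below models only that bookkeeping. AckSketch,
MockSpout, Bolt, NO_ACK, WITH_ACK, autoAck and runAll are hypothetical names
made up for illustration; none of them are Storm APIs.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, dependency-free model of ack bookkeeping. Not Storm code.
class AckSketch {

    /** Stand-in for a mocked spout: exhausted only when no emitted tuple is still pending. */
    static final class MockSpout {
        private final Set<Integer> pending = new HashSet<>();
        private int nextId = 0;

        /** Emits a tuple, marks it pending, and returns its id. */
        int emit() {
            int id = nextId++;
            pending.add(id);
            return id;
        }

        /** Marks a tuple as fully processed. */
        void ack(int id) {
            pending.remove(id);
        }

        boolean exhausted() {
            return pending.isEmpty();
        }
    }

    /** Stand-in for a bolt that processes one tuple. */
    interface Bolt {
        void execute(int tupleId, MockSpout spout);
    }

    /** Rich-style bolt that processes the tuple but never acks it. */
    static final Bolt NO_ACK = (tupleId, spout) -> { };

    /** Rich-style bolt that acks explicitly. */
    static final Bolt WITH_ACK = (tupleId, spout) -> spout.ack(tupleId);

    /** Basic-style wrapper: runs the inner bolt, then acks on its behalf. */
    static Bolt autoAck(Bolt inner) {
        return (tupleId, spout) -> {
            inner.execute(tupleId, spout);
            spout.ack(tupleId);
        };
    }

    /** Feeds tupleCount tuples through the bolt and reports whether the spout is exhausted. */
    static boolean runAll(Bolt bolt, int tupleCount) {
        MockSpout spout = new MockSpout();
        for (int i = 0; i < tupleCount; i++) {
            bolt.execute(spout.emit(), spout);
        }
        return spout.exhausted();
    }

    public static void main(String[] args) {
        System.out.println("rich, no ack:   exhausted=" + runAll(NO_ACK, 4));          // false
        System.out.println("rich, with ack: exhausted=" + runAll(WITH_ACK, 4));        // true
        System.out.println("basic wrapper:  exhausted=" + runAll(autoAck(NO_ACK), 4)); // true
    }
}
```

If that assumption holds, the Storm-side equivalent would be to call
oc.ack(tuple) in BaseRichParrotBolt.execute, and probably to emit anchored with
oc.emit(tuple, new Values(...)). I have not verified this against 1.0.3.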
I also posted this on Stack Overflow but haven't gotten any responses and only a few views.
http://stackoverflow.com/questions/42491555/unit-testing-in-apache-storm-timeout-with-baserichbolt-but-not-with-basebasicb
Thanks,
Jori