Hi,
I have wanted to use the
https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java
to write a test for a sample topology. I think configuration is not
affecting my test topology. I have added a serialiser for SampleModel class
in the Conf but when I ran it complained it could not find the serialiser.
Code:
package com.ws.storm;
import java.io.Serializable;import java.nio.ByteBuffer;import
java.nio.CharBuffer;import java.nio.charset.Charset;import
java.nio.charset.CharsetEncoder;import java.util.HashMap;import
java.util.Map;
import org.junit.Test;
import backtype.storm.Config;import
backtype.storm.ILocalCluster;import backtype.storm.Testing;import
backtype.storm.generated.StormTopology;import
backtype.storm.spout.SpoutOutputCollector;import
backtype.storm.task.TopologyContext;import
backtype.storm.testing.CompleteTopologyParam;import
backtype.storm.testing.MkClusterParam;import
backtype.storm.testing.MockedSources;import
backtype.storm.testing.TestJob;import
backtype.storm.topology.BasicOutputCollector;import
backtype.storm.topology.OutputFieldsDeclarer;import
backtype.storm.topology.TopologyBuilder;import
backtype.storm.topology.base.BaseBasicBolt;import
backtype.storm.topology.base.BaseRichSpout;import
backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import
backtype.storm.tuple.Values;import backtype.storm.utils.Utils;
import com.esotericsoftware.kryo.Kryo;import
com.esotericsoftware.kryo.Serializer;import
com.esotericsoftware.kryo.io.Input;import
com.esotericsoftware.kryo.io.Output;
public class StormTopologyTest implements Serializable {
private static final long serialVersionUID = 2767057917773290125L;
class SampleModel {
private ByteBuffer data;
public SampleModel() {
}
public SampleModel(ByteBuffer data) {
this.data = data;
}
public ByteBuffer getData() {
return data;
}
public void setData(ByteBuffer data) {
this.data = data;
}
}
final class SampleModelSerializer extends Serializer<SampleModel> {
@Override
public SampleModel read(Kryo theKryo, Input theInput,
Class<SampleModel> theMessage) {
long aDataBytesLength = theInput.readLong();
byte[] someData = theInput.readBytes((int) aDataBytesLength);
return new SampleModel(ByteBuffer.wrap(someData));
}
@Override
public void write(Kryo theKryo, Output theOutput, SampleModel
theMessage) {
ByteBuffer someData = theMessage.getData();
byte[] someDataBytes = someData.array();
long aDataBytesLength = someDataBytes.length;
theOutput.writeLong(aDataBytesLength);
theOutput.writeBytes(someDataBytes);
}
}
class SampleSpout1 extends BaseRichSpout {
boolean _isDistributed;
SpoutOutputCollector _collector;
public SampleSpout1() throws Exception {
this(true);
}
public SampleSpout1(boolean isDistributed) {
_isDistributed = isDistributed;
}
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
_collector = collector;
}
public void close() {
}
public void nextTuple() {
Utils.sleep(100);
try {
Charset charset = Charset.forName("UTF-8");
CharsetEncoder encoder = charset.newEncoder();
_collector.emit(new Values(new SampleModel(encoder
.encode(CharBuffer.wrap("This is for test")))));
} catch (Exception e) {
e.printStackTrace();
}
}
public void ack(Object msgId) {
}
public void fail(Object msgId) {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("record"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
if (!_isDistributed) {
Map<String, Object> ret = new HashMap<String, Object>();
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
return ret;
} else {
return null;
}
}
}
class SampleBolt extends BaseBasicBolt {
private static final long serialVersionUID = 6859091283261443785L;
public SampleBolt() {
}
@Override
public void execute(Tuple theTuple, BasicOutputCollector theCollector) {
SampleModel aSampleModel = (SampleModel) theTuple.getValue(0);
theCollector.emit(new Values(aSampleModel.getData()));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("out"));
}
}
@Test
public void testRealTimeTopology() {
MkClusterParam mkClusterParam = new MkClusterParam();
mkClusterParam.setSupervisors(4);
Config daemonConf = new Config();
daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, true);
mkClusterParam.setDaemonConf(daemonConf);
Testing.withLocalCluster(mkClusterParam, new TestJob() {
@Override
public void run(ILocalCluster cluster) throws Exception {
StormTopology aTopology = createTopology();
Config aConfig = createConfig();
Charset charset = Charset.forName("UTF-8");
CharsetEncoder encoder = charset.newEncoder();
MockedSources mockedSources = new MockedSources();
mockedSources.addMockData("spout",
new Values(new SampleModel(encoder
.encode(CharBuffer.wrap("This is for test")))));
CompleteTopologyParam aCompleteTopologyParam = new
CompleteTopologyParam();
aCompleteTopologyParam.setStormConf(aConfig);
aCompleteTopologyParam.setMockedSources(mockedSources);
Map result = Testing.completeTopology(cluster, aTopology,
aCompleteTopologyParam);
System.out.println("\n\nResult " + result + " \n\n");
}
});
}
private StormTopology createTopology() {
TopologyBuilder aBuilder = new TopologyBuilder();
aBuilder.setSpout("spout", new SampleSpout1(true), 2);
aBuilder.setBolt("aBolt", new SampleBolt(), 2)
.shuffleGrouping("spout");
return aBuilder.createTopology();
}
private static Config createConfig() {
Config conf = new Config();
conf.setNumAckers(1);
conf.setDebug(false);
conf.setNumWorkers(2);
conf.setMaxSpoutPending(10);
conf.registerSerialization(java.nio.ByteBuffer.class);
conf.registerSerialization(SampleModel.class,
SampleModelSerializer.class);
conf.setSkipMissingKryoRegistrations(true);
conf.setFallBackOnJavaSerialization(false);
return conf;
}
}
*Error:*
java.lang.RuntimeException: java.io.NotSerializableException:
com.ws.storm.StormTopologyTest$SampleModel
at
backtype.storm.serialization.DefaultSerializationDelegate.serialize(DefaultSerializationDelegate.java:43)
at backtype.storm.utils.Utils.serialize(Utils.java:85)
at backtype.storm.thrift$serialize_component_object.invoke(thrift.clj:164)
at backtype.storm.testing$complete_topology.doInvoke(testing.clj:478)
at clojure.lang.RestFn.invoke(RestFn.java:826)
at backtype.storm.testing4j$_completeTopology.invoke(testing4j.clj:61)
at backtype.storm.Testing.completeTopology(Unknown Source)
at com.ws.storm.StormTopologyTest$1.run(StormTopologyTest.java:193)
at backtype.storm.testing4j$_withLocalCluster.invoke(testing4j.clj:87)
at backtype.storm.Testing.withLocalCluster(Unknown Source)
at
com.ws.storm.StormTopologyTest.testRealTimeTopology(StormTopologyTest.java:175)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at
org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
at
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: java.io.NotSerializableException:
com.ws.storm.StormTopologyTest$SampleModel
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1165)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:329)
at java.util.ArrayList.writeObject(ArrayList.java:570)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:950)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1482)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1535)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:329)
at java.util.ArrayList.writeObject(ArrayList.java:570)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:950)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1482)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1535)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:329)
at
backtype.storm.serialization.DefaultSerializationDelegate.serialize(DefaultSerializationDelegate.java:39)
... 33 more
Best regards and thanks,
Asfak