Hello,
I am trying to pass an object (not a primitive type), an instance of
AESEncryption, to the Storm configuration. I created a custom Kryo serializer,
the AESSerializer class, and in the main topology I used:
Config conf = new Config();
conf.registerSerialization(AESEncryption.class, AESSerializer.class);
conf.setFallBackOnJavaSerialization(false);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("stormKafkaTopology", conf, builder.createTopology());

public class AESEncryption {
    public static final String cypherAlgorithm = "AES/CBC/PKCS5Padding";
    private SecretKey privateKey;
    private IvParameterSpec entrophy;
    private String encryptedMsg;
    public static final int bitLength = 256;
    // ... setter and getter methods
}

public class AESSerializer extends Serializer<AESEncryption> implements Serializable {
    public AESSerializer() {
    }

    @Override
    public void write(Kryo kryo, Output output, AESEncryption object) {
        kryo.writeClassAndObject(output, object);
    }

    @Override
    public AESEncryption read(Kryo kryo, Input input, Class<AESEncryption> type) {
        return (AESEncryption) kryo.readClassAndObject(input);
    }
}

However, even with Kryo serialization registered, I always get the same
exception:

Problem initializing cluster topology:
java.lang.IllegalArgumentException: Topology conf is not json-serializable
    at org.apache.storm.testing$submit_local_topology.invoke(testing.clj:308) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:49) ~[storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.LocalCluster.submitTopology(Unknown Source) ~[storm-core-1.1.0.jar:1.1.0]
    at com.jrtechnologies.ifg.bsp.kafka.storm.StormKafkaStarter.main(StormKafkaStarter.java:117) [classes/:?]

Any ideas what I am doing wrong?
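
For context on the exception: Storm requires the topology configuration itself to be JSON-serializable, and Kryo registration applies to tuple fields rather than to values stored in the conf. The code above does not show how the AESEncryption object reaches the conf, so the following is only a sketch of one possible workaround, assuming the key and IV can be carried as Base64 strings and rebuilt where they are needed (for example in a bolt's prepare method). The class AesConfigCodec and the keys "aes.key.base64" and "aes.iv.base64" are hypothetical names invented for this illustration, and a plain HashMap stands in for Storm's Config.

```java
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import javax.crypto.SecretKey;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper, not part of Storm: keeps AES key material in a
// config map as Base64 strings, which are JSON-serializable values.
final class AesConfigCodec {
    // Hypothetical config keys chosen for this sketch.
    static final String KEY_CONF = "aes.key.base64";
    static final String IV_CONF = "aes.iv.base64";

    // Stores the raw key bytes and IV bytes as plain strings.
    static void store(Map<String, Object> conf, SecretKey key, IvParameterSpec iv) {
        conf.put(KEY_CONF, Base64.getEncoder().encodeToString(key.getEncoded()));
        conf.put(IV_CONF, Base64.getEncoder().encodeToString(iv.getIV()));
    }

    // Rebuilds the AES key from the string stored in the map.
    static SecretKey loadKey(Map<String, Object> conf) {
        byte[] raw = Base64.getDecoder().decode((String) conf.get(KEY_CONF));
        return new SecretKeySpec(raw, "AES");
    }

    // Rebuilds the IV from the string stored in the map.
    static IvParameterSpec loadIv(Map<String, Object> conf) {
        byte[] raw = Base64.getDecoder().decode((String) conf.get(IV_CONF));
        return new IvParameterSpec(raw);
    }

    public static void main(String[] args) {
        SecureRandom random = new SecureRandom();
        byte[] keyBytes = new byte[32]; // 256-bit key, matching bitLength above
        byte[] ivBytes = new byte[16];  // AES block size
        random.nextBytes(keyBytes);
        random.nextBytes(ivBytes);

        // A plain map stands in for Storm's Config in this sketch.
        Map<String, Object> conf = new HashMap<>();
        store(conf, new SecretKeySpec(keyBytes, "AES"), new IvParameterSpec(ivBytes));

        // Both checks compare the round-tripped bytes with the originals.
        System.out.println(Arrays.equals(keyBytes, loadKey(conf).getEncoded()));
        System.out.println(Arrays.equals(ivBytes, loadIv(conf).getIV()));
    }
}
```

With this approach the conf holds only strings, so the JSON check should no longer be triggered by the key material. Note that the key then travels in plain text inside the topology configuration, which may or may not be acceptable depending on the deployment.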