Dear Community,
My Storm topology (2.2.0), deployed on Docker Swarm, sometimes crashes with:
java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.io.NotSerializableException:
org.apache.storm.tuple.TupleImpl
in a bolt extending BaseStatefulWindowedBolt (my "windowbolt").
Interestingly, this seems to happen only when a certain throughput level is
reached or when network latency / packet loss occurs (I inject such network
conditions into one of the Storm Docker containers for benchmarking purposes).
After one of this bolt's tasks has failed, the others usually fail as well after
some time, and the topology is not able to recover from the failure in a
stateful way.
As the exception does not seem to involve any custom classes, I am quite unsure
about the reason behind the failure.
Is this possibly an implementation / configuration problem on my side, or a
problem with (or maybe even normal behavior of) Apache Storm under these
conditions?
* Error fetched by the storm cli:
{"Comp-Errors":{"windowbolt":"java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.RuntimeException:
java.io.NotSerializableException: org.apache.storm.tuple.TupleImpl\n\tat
org.apache.storm.utils.Utils$1.run(Utils.java:409)\n\tat
java.base\/java.lang.Thread.run(Unknown Source)\nCaused by:
java.lang.RuntimeException: java.lang.RuntimeException:
java.io.NotSerializableException: org.apache.storm.tuple.TupleImpl\n\tat
org.apache.storm.executor.Executor.accept(Executor.java:290)\n\tat
org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:131)\n\tat
org.apache.storm.utils.JCQueue.consume(JCQueue.java:111)\n\tat
org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:172)\n\tat
org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:159)\n\tat
org.apache.storm.utils.Utils$1.run(Utils.java:394)\n\t... 1 more\nCaused by:
java.lang.RuntimeException: java.io.NotSerializableException:
org.apache.storm.tuple.TupleImpl\n\tat
org.apache.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:36)\n\tat
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)\n\tat
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)\n\tat
com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)\n\tat
com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534)\n\tat
org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38)\n\tat
org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40)\n\tat
org.apache.storm.daemon.worker.WorkerTransfer.tryTransferRemote(WorkerTransfer.java:116)\n\tat
org.apache.storm.daemon.worker.WorkerState.tryTransferRemote(WorkerState.java:524)\n\tat
org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:68)\n\tat
org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112)\n\tat
org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65)\n\tat
org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93)\n\tat
org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93)\n\tat
org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93)\n\tat
org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93)\n\tat
org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42)\n\tat
org.apache.storm.topology.WindowedBoltExecutor.execute(WindowedBoltExecutor.java:313)\n\tat
org.apache.storm.topology.PersistentWindowedBoltExecutor.execute(PersistentWindowedBoltExecutor.java:137)\n\tat
org.apache.storm.topology.StatefulBoltExecutor.doExecute(StatefulBoltExecutor.java:145)\n\tat
org.apache.storm.topology.StatefulBoltExecutor.handleTuple(StatefulBoltExecutor.java:137)\n\tat
org.apache.storm.topology.BaseStatefulBoltExecutor.execute(BaseStatefulBoltExecutor.java:71)\n\tat
org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:236)\n\tat
org.apache.storm.executor.Executor.accept(Executor.java:283)\n\t... 6
more\nCaused by: java.io.NotSerializableException:
org.apache.storm.tuple.TupleImpl\n\tat
java.base\/java.io.ObjectOutputStream.writeObject0(Unknown Source)\n\tat
java.base\/java.io.ObjectOutputStream.writeObject(Unknown Source)\n\tat
org.apache.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:33)\n\t...
29 more\n"},"Topology Name":"KafkaTopology"}
* "windowbolt" implementation:
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.StringJoiner;

import com.codahale.metrics.Counter;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseStatefulWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;
// (import for JSONObject omitted; it depends on the JSON library in use)

public class StatefulWindowBolt extends BaseStatefulWindowedBolt<KeyValueState<String, AvgState>> {
    private OutputCollector collector;
    private Counter counter;
    private Counter windowCounter;
    private KeyValueState<String, AvgState> state;

    @Override
    public void prepare(Map<String, Object> stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counter = context.registerCounter("WindowBolt_Executed");
        this.windowCounter = context.registerCounter("WindowBolt_WindowNumber");
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        long window_sum = 0;
        long window_length = 0;
        long max_ts = 0;
        long start_event_time = inputWindow.getStartTimestamp();
        long end_event_time = inputWindow.getEndTimestamp();
        long partition = -1;
        String note = "/";
        // per-city sum and count for this window
        Map<String, AvgState> map = new HashMap<String, AvgState>();

        Iterator<Tuple> it = inputWindow.getIter();
        while (it.hasNext()) {
            Tuple tuple = it.next();
            if (window_length == 0) {
                // same for whole window because of FieldsGrouping by partition
                partition = tuple.getIntegerByField("partition");
                note = tuple.getStringByField("note");
            }
            long sensordata = tuple.getLongByField("sensordata");
            window_sum += sensordata;
            long ts = tuple.getLongByField("timestamp");
            if (ts > max_ts) {
                max_ts = ts;
            }
            String city = tuple.getStringByField("city");
            AvgState cityState = map.get(city);
            if (cityState == null) {
                cityState = new AvgState(0, 0);
            }
            map.put(city, new AvgState(cityState.sum + sensordata, cityState.count + 1));
            counter.inc();
            window_length++;
        }

        // defensive guard: avoid a division by zero if the window is empty
        if (window_length == 0) {
            return;
        }
        long window_avg = window_sum / window_length;

        // emit the results
        JSONObject json_message = new JSONObject();
        json_message.put("window_avg", window_avg);
        json_message.put("start_event_time", start_event_time);
        json_message.put("end_event_time", end_event_time);
        json_message.put("window_size", window_length);
        json_message.put("last_event_ts", max_ts);
        json_message.put("count_per_city", print(map));
        json_message.put("partition", partition);
        json_message.put("note", note);
        String kafkaMessage = json_message.toString();
        String kafkaKey = "window_id: " + windowCounter.getCount();
        collector.emit(new Values(kafkaKey, kafkaMessage));
        windowCounter.inc();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("key", "message"));
    }

    // formats the per-city counts as "{city1=3, city2=5}"; yields "{}" for an empty map
    public String print(Map<String, AvgState> map) {
        StringJoiner mapAsString = new StringJoiner(", ", "{", "}");
        for (Map.Entry<String, AvgState> entry : map.entrySet()) {
            mapAsString.add(entry.getKey() + "=" + entry.getValue().count);
        }
        return mapAsString.toString();
    }

    @Override
    public void initState(KeyValueState<String, AvgState> state) {
        this.state = state;
    }
}
In my main method I set:
config.setFallBackOnJavaSerialization(true);
config.registerSerialization(AvgState.class);
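
For context on the fallback setting above: the trace shows the fallback path
(SerializableSerializer) handing the object to java.io.ObjectOutputStream, which
only accepts classes that implement java.io.Serializable. Below is a minimal,
Storm-free sketch of that behavior. AvgState here is only a hypothetical
stand-in (the real class is not shown in this mail; its two fields are assumed
from the usage state.sum / state.count in the bolt), and NotSerializableValue is
made up purely for illustration:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationFallbackSketch {

    // Hypothetical stand-in for the AvgState class used by the bolt (assumed shape).
    static class AvgState implements Serializable {
        private static final long serialVersionUID = 1L;
        final long sum;
        final long count;

        AvgState(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // Hypothetical class that does NOT implement Serializable.
    static class NotSerializableValue {
        final String payload = "x";
    }

    // Returns true if plain Java serialization accepts the object,
    // false if it is rejected with a NotSerializableException.
    static boolean javaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("AvgState: " + javaSerializable(new AvgState(10, 2)));
        System.out.println("NotSerializableValue: " + javaSerializable(new NotSerializableValue()));
    }
}
```

So as far as I understand it, the fallback only helps for value classes that
are Serializable themselves; a non-Serializable object inside the emitted
values would still fail in the way the trace shows for TupleImpl.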
Stackoverflow link for better formatting:
https://stackoverflow.com/questions/70070296/java-io-notserializableexception-org-apache-storm-tuple-tupleimpl-in-storm-base
Best Regards
Johannes Friedlein