Dear Community,

My Storm topology (2.2.0), deployed on Docker Swarm, sometimes crashes with:

java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.io.NotSerializableException:
org.apache.storm.tuple.TupleImpl


in a bolt extending BaseStatefulWindowedBolt (my "windowbolt").


Interestingly, this seems to happen only once a certain throughput level is
reached or when network latency / packet loss occurs (I inject such network
conditions into one of the Storm Docker containers for benchmarking purposes).

After one of this bolt's tasks fails, the others usually follow at some point,
and the topology is not able to recover from the failure in a stateful way.

As the exception does not seem to involve any of my custom classes, I am quite
unsure about the reason behind the failure.

Is this possibly an implementation / configuration problem on my side, or a
problem with / maybe even normal behavior of Apache Storm under these
conditions?

  *   Error fetched via the Storm CLI (topology "KafkaTopology", component
"windowbolt"); the stack trace, unescaped for readability:

java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.io.NotSerializableException: org.apache.storm.tuple.TupleImpl
    at org.apache.storm.utils.Utils$1.run(Utils.java:409)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.io.NotSerializableException: org.apache.storm.tuple.TupleImpl
    at org.apache.storm.executor.Executor.accept(Executor.java:290)
    at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:131)
    at org.apache.storm.utils.JCQueue.consume(JCQueue.java:111)
    at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:172)
    at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:159)
    at org.apache.storm.utils.Utils$1.run(Utils.java:394)
    ... 1 more
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: org.apache.storm.tuple.TupleImpl
    at org.apache.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:36)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534)
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38)
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40)
    at org.apache.storm.daemon.worker.WorkerTransfer.tryTransferRemote(WorkerTransfer.java:116)
    at org.apache.storm.daemon.worker.WorkerState.tryTransferRemote(WorkerState.java:524)
    at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:68)
    at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112)
    at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65)
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93)
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93)
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93)
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93)
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42)
    at org.apache.storm.topology.WindowedBoltExecutor.execute(WindowedBoltExecutor.java:313)
    at org.apache.storm.topology.PersistentWindowedBoltExecutor.execute(PersistentWindowedBoltExecutor.java:137)
    at org.apache.storm.topology.StatefulBoltExecutor.doExecute(StatefulBoltExecutor.java:145)
    at org.apache.storm.topology.StatefulBoltExecutor.handleTuple(StatefulBoltExecutor.java:137)
    at org.apache.storm.topology.BaseStatefulBoltExecutor.execute(BaseStatefulBoltExecutor.java:71)
    at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:236)
    at org.apache.storm.executor.Executor.accept(Executor.java:283)
    ... 6 more
Caused by: java.io.NotSerializableException: org.apache.storm.tuple.TupleImpl
    at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:33)
    ... 29 more


  *   "windowbolt" implementation:


public class StatefulWindowBolt extends 
BaseStatefulWindowedBolt<KeyValueState<String, AvgState>> {
    private OutputCollector collector;
    private Counter counter;
    private Counter windowCounter;
    private KeyValueState<String, AvgState> state;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
        this.counter = context.registerCounter("WindowBolt_Executed");
        this.windowCounter = context.registerCounter("WindowBolt_WindowNumber");
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        long window_sum = 0;
        long window_length = 0;
        long ts = 0;
        long max_ts = 0;
        long start_event_time = inputWindow.getStartTimestamp();
        long end_event_time = inputWindow.getEndTimestamp();
        long partition = -1;
        String note = "/";

        Map<String, AvgState> map = new HashMap<String, AvgState>();
        Iterator<Tuple> it = inputWindow.getIter();

        while (it.hasNext()) {
            Tuple tuple = it.next();
            if (window_length == 0){
                //same for whole window because of FieldsGrouping by partition
                partition = tuple.getIntegerByField("partition");
                note = tuple.getStringByField("note");
            }
            Long sensordata = tuple.getLongByField("sensordata");
            window_sum += sensordata;
            ts = tuple.getLongByField("timestamp");

            if (ts > max_ts) {
                max_ts = ts;
            }
            String city = tuple.getStringByField("city");
            AvgState state = map.get(city);
            if (state == null){
                state = new AvgState(0,0);
            }
            map.put(city, new AvgState(state.sum+sensordata, state.count + 1));
            counter.inc();
            window_length++;
        }

        // guard against an empty window to avoid division by zero
        long window_avg = window_length > 0 ? window_sum / window_length : 0;

        // emit the results
        JSONObject json_message = new JSONObject();
        json_message.put("window_avg", window_avg);
        json_message.put("start_event_time", start_event_time);
        json_message.put("end_event_time", end_event_time);
        json_message.put("window_size", window_length);
        json_message.put("last_event_ts", max_ts);
        json_message.put("count_per_city", print(map));
        json_message.put("partition", partition);
        json_message.put("note", note);
        String kafkaMessage = json_message.toString();
        String kafkaKey = "window_id: " + windowCounter.getCount();

        collector.emit(new Values(kafkaKey, kafkaMessage));
        windowCounter.inc();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("key", "message"));

    }
    public String print(Map<String, AvgState> map) {
        StringBuilder mapAsString = new StringBuilder("{");
        for (String key : map.keySet()) {
            AvgState state = map.get(key);
            mapAsString.append(key).append("=").append(state.count).append(", ");
        }
        if (mapAsString.length() > 1) {
            // drop the trailing ", " (only present if the map was not empty)
            mapAsString.delete(mapAsString.length() - 2, mapAsString.length());
        }
        return mapAsString.append("}").toString();
    }
    @Override
    public void initState(KeyValueState<String, AvgState> state) {
        this.state = state;
    }
}
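
For reference, AvgState (the window state value type, registered for
serialization below) is just a sum / count holder. A minimal sketch of it,
with public fields as accessed in the bolt (the actual class may differ
slightly):

import java.io.Serializable;

public class AvgState implements Serializable {
    public long sum;    // running sum of "sensordata" values
    public long count;  // number of tuples contributing to the sum

    public AvgState() {
        // no-arg constructor so Kryo's FieldSerializer can instantiate it
    }

    public AvgState(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }
}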



In my main method I set:

config.setFallBackOnJavaSerialization(true);
config.registerSerialization(AvgState.class);
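
For context, the bolt is wired into the topology roughly along these lines.
This is a simplified sketch, not my exact code: component names, parallelism,
the window size and the Kafka spout construction (buildKafkaSpout()) are
placeholders, and the state provider configuration is left out.

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;

public class Main {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Placeholder for the actual KafkaSpout setup; it emits the "partition",
        // "note", "sensordata", "timestamp" and "city" fields read by the bolt.
        builder.setSpout("kafkaspout", buildKafkaSpout(), 1);

        builder.setBolt("windowbolt",
                new StatefulWindowBolt()
                        .withPersistence()  // persistent windows (PersistentWindowedBoltExecutor in the trace)
                        .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10)),  // placeholder window size
                4)
               .fieldsGrouping("kafkaspout", new Fields("partition"));

        Config config = new Config();
        config.setFallBackOnJavaSerialization(true);
        config.registerSerialization(AvgState.class);

        StormSubmitter.submitTopology("KafkaTopology", config, builder.createTopology());
    }

    // Hypothetical helper; the real KafkaSpout / KafkaSpoutConfig setup is omitted here.
    private static IRichSpout buildKafkaSpout() {
        throw new UnsupportedOperationException("KafkaSpout setup omitted in this sketch");
    }
}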


Stack Overflow link for better formatting:
https://stackoverflow.com/questions/70070296/java-io-notserializableexception-org-apache-storm-tuple-tupleimpl-in-storm-base



Best Regards

Johannes Friedlein
