Jawad Tahir created STORM-4000:
----------------------------------

             Summary: Processing late tuples from BaseWindowedBolt results in 
serialization exception
                 Key: STORM-4000
                 URL: https://issues.apache.org/jira/browse/STORM-4000
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-client
    Affects Versions: 2.5.0
         Environment: Ubuntu 22.04, OpenJDK 11, Apache Storm 2.5.0
            Reporter: Jawad Tahir


I am developing an Apache Storm (v2.5.0) topology that reads events from a 
spout (_BaseRichSpout_), counts the number of events in tumbling windows 
(_BaseWindowedBolt_), and prints the count (_BaseRichBolt_). The topology 
works fine, but there are some out-of-order events in my dataset. 
_BaseWindowedBolt_ provides a _withLateTupleStream_ method to route late 
events to a separate stream. However, when I try to process late events, I get 
a serialization exception:
{code:java}
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap
Note: To register this class use: kryo.register(org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap.class);
Serialization trace:
defaultResources (org.apache.storm.task.WorkerTopologyContext)
context (org.apache.storm.tuple.TupleImpl)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.daemon.worker.WorkerState.checkSerialize(WorkerState.java:613) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.ExecutorTransfer.tryTransferLocal(ExecutorTransfer.java:101) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:66) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.LocalExecutor$1.tryTransfer(LocalExecutor.java:36) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.topology.WindowedBoltExecutor.execute(WindowedBoltExecutor.java:313) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
    ... 6 more{code}
I have defined my topology as below:
 
 
{code:java}
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.TupleImpl;

public class TestTopology {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        // Try to serialize every tuple, even for local transfers (testing aid).
        config.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
        config.registerSerialization(TupleImpl.class);
        config.registerSerialization(WorkerTopologyContext.class);
        config.registerSerialization(Fields.class);
        LocalCluster cluster = new LocalCluster();

        // Run the topology for 50 seconds, then shut the local cluster down.
        try (LocalCluster.LocalTopology topology =
                 cluster.submitTopology("testTopology", config, getTopology().createTopology())) {
            Thread.sleep(50000);
        }
        cluster.shutdown();
    }

    static TopologyBuilder getTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("eventSpout", new LateEventSpout());
        // 10-second tumbling windows on the "time" field; late tuples go to "lateEvents".
        builder.setBolt("windowBolt", new WindowBolt()
                        .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10))
                        .withTimestampField("time")
                        .withLateTupleStream("lateEvents"))
               .shuffleGrouping("eventSpout");
        builder.setBolt("latePrintBolt", new LatePrintBolt())
               .shuffleGrouping("windowBolt", "lateEvents");
        builder.setBolt("printBolt", new PrintBolt())
               .shuffleGrouping("windowBolt");
        return builder;
    }
}{code}
Where `LateEventSpout` is
{code:java}
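import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class LateEventSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private List<Long> eventTimes;
    private int currentTime = 0;
    private int id = 1;

    public LateEventSpout() {
        eventTimes = new ArrayList<>();
        for (int i = 1; i <= 61; i++) {
            eventTimes.add(Instant.EPOCH.plusSeconds(i).toEpochMilli());
        } // eventTimes = [epoch+1s, epoch+2s, ..., epoch+61s], in milliseconds
    }

    @Override
    public void open(Map<String, Object> conf, TopologyContext context,
                     SpoutOutputCollector collector) {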
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        int eventId = id++;
        Long eventTime = eventTimes.get(currentTime++);
        if (currentTime == eventTimes.size()){
            currentTime = 0;
        }
        collector.emit(new Values(eventId, eventTime));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "time"));
    }
} {code}
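To make the source of the out-of-order events explicit: the spout cycles through its timestamp list, so the tuple emitted right after epoch+61s carries epoch+1s again. The following is a minimal stand-alone sketch of that cycle, not part of the topology; the class `LateEventSequence` and its methods are hypothetical names introduced only for illustration, and it does not model how Storm decides that a tuple is late.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not in the report): replays the spout's timestamp cycle.
class LateEventSequence {

    // Same list the spout builds: epoch+1s .. epoch+61s, in milliseconds.
    static List<Long> eventTimes() {
        List<Long> times = new ArrayList<>();
        for (int i = 1; i <= 61; i++) {
            times.add(Instant.EPOCH.plusSeconds(i).toEpochMilli());
        }
        return times;
    }

    // Timestamp carried by the n-th emitted tuple (n starts at 1),
    // mirroring the index wrap-around in nextTuple().
    static long timestampOfEmission(int n) {
        List<Long> times = eventTimes();
        return times.get((n - 1) % times.size());
    }

    public static void main(String[] args) {
        System.out.println(timestampOfEmission(61)); // 61000
        System.out.println(timestampOfEmission(62)); // 1000
    }
}
```

The 62nd tuple therefore goes back 60 seconds in event time, which is presumably the first tuple routed to the late stream.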
And `WindowBolt` is:
{code:java}
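import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;

public class WindowBolt extends BaseWindowedBolt {

    OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        // Count the tuples in the current window.
        int sum = 0;
        for (Tuple event : inputWindow.get()) {
            sum++;
        }
        collector.emit(new Values(inputWindow.getStartTimestamp(),
                                  inputWindow.getEndTimestamp(), sum));
    }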

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("start", "end", "sum"));
    }
} {code}
 
And `PrintBolt` just prints the `windowBolt` output (`LatePrintBolt` is 
similar):
 
{code:java}
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class PrintBolt extends BaseRichBolt {
    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context,
                        OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
        // Print one line per window: start, end, and event count.
        System.out.println(String.format("Start: %d, End: %d, Sum:%d",
                input.getLongByField("start"), input.getLongByField("end"),
                input.getIntegerByField("sum")));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
} {code}
If I don't add `LatePrintBolt` to the `TopologyBuilder`, I get the correct 
results:
{code:java}
Start: 0, End: 10000, Sum:10
Start: 10000, End: 20000, Sum:10
Start: 20000, End: 30000, Sum:10
Start: 30000, End: 40000, Sum:10
Start: 40000, End: 50000, Sum:10
Start: 50000, End: 60000, Sum:10 {code}
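As a cross-check of these counts, here is a minimal stand-alone model of the 10-second tumbling windows that does not use Storm at all. It assumes each window is the interval (start, end], which is consistent with the output above but is an assumption about Storm's behaviour, not something taken from its source. `TumblingWindowModel` and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone model of a 10-second tumbling window (no Storm involved).
class TumblingWindowModel {

    static final long WINDOW_MS = 10_000L;

    // Start of the window containing ts, ASSUMING windows are (start, end].
    // Only meaningful for ts >= 1.
    static long windowStart(long ts) {
        return ((ts - 1) / WINDOW_MS) * WINDOW_MS;
    }

    // Counts events per window start for the timestamps epoch+1s .. epoch+61s.
    static Map<Long, Integer> countPerWindow() {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long s = 1; s <= 61; s++) {
            counts.merge(windowStart(s * 1000L), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        countPerWindow().forEach((start, sum) ->
            System.out.printf("Start: %d, End: %d, Sum:%d%n", start, start + WINDOW_MS, sum));
    }
}
```

Under that assumption the model yields 10 events for each of the six windows from 0 to 60000, matching the output above. It also lists a seventh, incomplete window holding only the epoch+61s event, which the topology never prints.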
 
However, when I also try to print the lateEvents stream, I get the same 
output, but the first late event triggers the above-mentioned exception.
 
I have debugged the issue. When 
[WindowedBoltExecutor|https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java#L313]
 receives a late tuple, it emits the late tuple, but 
[BoltOutputCollectorImpl|https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java#L110]
 rewraps it in a new Tuple. This new tuple contains a 
_WorkerTopologyContext_ (reached through the `context` field of `TupleImpl`, 
per the serialization trace), which is not serializable, hence the error.
 
I would like to know how I can process the late tuples.
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
