Hi Jawad,

I have encountered a serialization error before and resolved it by setting
Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION to true. However, I have not
encountered the situation you describe, so I won't be able to help you with
this one.

Regards,
Viraj
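For reference, a minimal sketch of the fallback setting Viraj mentions, placed next to the registrations already in Jawad's `main` method. Note that the Java fallback only applies to classes that implement `java.io.Serializable`, so it may not cover `WorkerTopologyContext`:

```
// Sketch: Jawad's existing config plus the fallback flag Viraj refers to.
// With the flag set, Kryo falls back to Java serialization for unregistered
// classes, but only if those classes implement java.io.Serializable.
Config config = new Config();
config.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
config.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, true);
config.registerSerialization(TupleImpl.class);
config.registerSerialization(Fields.class);
```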
On Mon, 13 Nov, 2023, 7:19 pm Jawad Tahir, <ranajawadta...@gmail.com> wrote:

> Hi Viraj,
>
> Thank you for your response. I have been through that page countless
> times, and as you can see in the code I provided, I am registering the
> required classes for serialization. The problem is that
> WorkerTopologyContext is not a serializable class. This question is geared
> towards the late tuple stream. WindowedBoltExecutor
> <https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java#L313>
> simply emits any late tuple, and BoltOutputCollectorImpl
> <https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java#L110>
> rewraps that tuple into another Tuple. Under normal circumstances, this
> collector receives Values and wraps them in a Tuple, but in this case it
> wraps a late tuple (which contains WorkerTopologyContext) in another tuple.
> Now, when it tries to serialize this tuple, I get the serialization error.
>
> Can you give me an example of the smallest possible working code for
> processing late tuples?
>
> Regards,
> Jawad Tahir.
>
> On Sat, Nov 11, 2023, 04:40 v.s kadu <virajkadu...@gmail.com> wrote:
>
>> Hi Jawad,
>> Go through the following page:
>>
>> https://storm.apache.org/releases/current/Serialization.html
>>
>> Regards,
>> Viraj Kadu
>>
>> On Sat, 11 Nov, 2023, 1:19 am Jawad Tahir, <ranajawadta...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I am developing an Apache Storm (v2.5.0) topology that reads events
>>> from a spout (`BaseRichSpout`), counts the number of events in tumbling
>>> windows (`BaseWindowedBolt`), and prints the count (`BaseRichBolt`).
>>> The topology works fine, but there are some out-of-order events in my
>>> dataset. BaseWindowedBolt provides the withLateTupleStream method to
>>> route late events to a separate stream. However, when I try to process
>>> late events, I get a serialization exception:
>>>
>>> ```
>>> Caused by: com.esotericsoftware.kryo.KryoException:
>>> java.lang.IllegalArgumentException: Class is not registered:
>>> org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap
>>> Note: To register this class use:
>>> kryo.register(org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap.class);
>>> Serialization trace:
>>> defaultResources (org.apache.storm.task.WorkerTopologyContext)
>>> context (org.apache.storm.tuple.TupleImpl)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-4.0.2.jar:?]
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
>>>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575) ~[kryo-4.0.2.jar:?]
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79) ~[kryo-4.0.2.jar:?]
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
>>>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
>>>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
>>>     at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.daemon.worker.WorkerState.checkSerialize(WorkerState.java:613) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.executor.ExecutorTransfer.tryTransferLocal(ExecutorTransfer.java:101) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:66) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.executor.LocalExecutor$1.tryTransfer(LocalExecutor.java:36) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.topology.WindowedBoltExecutor.execute(WindowedBoltExecutor.java:313) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
>>>     ... 6 more
>>> ```
>>>
>>> I have defined my topology as below:
>>>
>>> ```
>>> public class TestTopology {
>>>     public static void main(String[] args) throws Exception {
>>>         Config config = new Config();
>>>         config.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
>>>         config.registerSerialization(TupleImpl.class);
>>>         config.registerSerialization(Fields.class);
>>>
>>>         LocalCluster cluster = new LocalCluster();
>>>         try (LocalCluster.LocalTopology topology =
>>>                 cluster.submitTopology("testTopology", config,
>>>                         getTopology().createTopology())) {
>>>             Thread.sleep(50000);
>>>         }
>>>         cluster.shutdown();
>>>     }
>>>
>>>     static TopologyBuilder getTopology() {
>>>         TopologyBuilder builder = new TopologyBuilder();
>>>         builder.setSpout("eventSpout", new LateEventSpout());
>>>         builder.setBolt("windowBolt", new WindowBolt()
>>>                 .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10))
>>>                 .withTimestampField("time")
>>>                 .withLateTupleStream("lateEvents"))
>>>                 .shuffleGrouping("eventSpout");
>>>         builder.setBolt("latePrintBolt", new LatePrintBolt())
>>>                 .shuffleGrouping("windowBolt", "lateEvents");
>>>         builder.setBolt("printBolt", new PrintBolt())
>>>                 .shuffleGrouping("windowBolt");
>>>         return builder;
>>>     }
>>> }
>>> ```
>>>
>>> Where `LateEventSpout` is:
>>>
>>> ```
>>> public class LateEventSpout extends BaseRichSpout {
>>>     private SpoutOutputCollector collector;
>>>     private List<Long> eventTimes;
>>>     private int currentTime = 0;
>>>     private int id = 1;
>>>
>>>     public LateEventSpout() {
>>>         eventTimes = new ArrayList<>();
>>>         for (int i = 1; i <= 61; i++) {
>>>             eventTimes.add(Instant.EPOCH.plusSeconds(i).toEpochMilli());
>>>         } // [epoch+1, epoch+2, ..., epoch+61]
>>>     }
>>>
>>>     @Override
>>>     public void open(Map<String, Object> conf, TopologyContext context,
>>>             SpoutOutputCollector collector) {
>>>         this.collector = collector;
>>>     }
>>>
>>>     @Override
>>>     public void nextTuple() {
>>>         int eventId = id++;
>>>         Long eventTime = eventTimes.get(currentTime++);
>>>         if (currentTime == eventTimes.size()) {
>>>             currentTime = 0; // reset time to zero so we have OOO events
>>>         }
>>>         collector.emit(new Values(eventId, eventTime));
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         declarer.declare(new Fields("id", "time"));
>>>     }
>>> }
>>> ```
>>>
>>> And `WindowBolt` is:
>>>
>>> ```
>>> public class WindowBolt extends BaseWindowedBolt {
>>>     OutputCollector collector;
>>>
>>>     @Override
>>>     public void prepare(Map<String, Object> topoConf, TopologyContext context,
>>>             OutputCollector collector) {
>>>         this.collector = collector;
>>>     }
>>>
>>>     @Override
>>>     public void execute(TupleWindow inputWindow) {
>>>         int sum = 0;
>>>         for (Tuple event : inputWindow.get()) {
>>>             sum++;
>>>         }
>>>         collector.emit(new Values(inputWindow.getStartTimestamp(),
>>>                 inputWindow.getEndTimestamp(), sum));
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         declarer.declare(new Fields("start", "end", "sum"));
>>>     }
>>> }
>>> ```
>>>
>>> And `PrintBolt` just prints the `windowBolt` output (`LatePrintBolt` is
>>> similar):
>>>
>>> ```
>>> public class PrintBolt extends BaseRichBolt {
>>>     @Override
>>>     public void prepare(Map<String, Object> topoConf, TopologyContext context,
>>>             OutputCollector collector) {
>>>     }
>>>
>>>     @Override
>>>     public void execute(Tuple input) {
>>>         System.out.println(String.format("Start: %d, End: %d, Sum:%d",
>>>                 input.getLongByField("start"), input.getLongByField("end"),
>>>                 input.getIntegerByField("sum")));
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>     }
>>> }
>>> ```
>>>
>>> If I don't set the `LatePrintBolt` in `TopologyBuilder`, I get the
>>> correct results:
>>>
>>> ```
>>> Start: 0, End: 10000, Sum:10
>>> Start: 10000, End: 20000, Sum:10
>>> Start: 20000, End: 30000, Sum:10
>>> Start: 30000, End: 40000, Sum:10
>>> Start: 40000, End: 50000, Sum:10
>>> Start: 50000, End: 60000, Sum:10
>>> ```
>>>
>>> However, when I try to print the lateEvents stream, I get the same
>>> output, but on the first late event I get the above-mentioned exception.
>>>
>>> I have debugged the issue. When [WindowedBoltExecutor](
>>> https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java#L313)
>>> receives a late tuple, it emits the late tuple, but
>>> [BoltOutputCollectorImpl](
>>> https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java#L110)
>>> rewraps it in a new Tuple. This new tuple contains a
>>> `WorkerTopologyContext`, which is not serializable, hence the error.
>>>
>>> I would like to know how I can process the late tuples.
>>>
>>> Regards,
>>> Jawad Tahir.
>>>
>>
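On Jawad's last question, a minimal sketch of what a `LatePrintBolt` could look like, assuming (as the linked `WindowedBoltExecutor` line suggests) that the late-tuple stream carries the original spout tuple as its single value. This only illustrates unwrapping and reading that value; it does not by itself avoid the Kryo serialization check on the wrapping tuple discussed above:

```
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

// Hypothetical sketch of a late-tuple consumer for the "lateEvents" stream.
public class LatePrintBolt extends BaseRichBolt {
    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context,
            OutputCollector collector) {
        // No state needed; this bolt only prints.
    }

    @Override
    public void execute(Tuple input) {
        // Assumption: the late stream wraps the original spout tuple
        // ("id", "time") as its single value, so unwrap it first.
        Tuple late = (Tuple) input.getValue(0);
        System.out.println(String.format("Late event id: %d, time: %d",
                late.getIntegerByField("id"), late.getLongByField("time")));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: no output stream declared.
    }
}
```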