Hi Viraj,

This does not solve the problem, as the class is not serializable at all. :( It isn't a serialization issue to begin with, as I get correct results when there are no late tuples in the system. As soon as I receive my first late event, I get this exception.
Regards,
Jawad Tahir.

On Mon, 13 Nov 2023 at 15:22, v.s kadu <virajkadu...@gmail.com> wrote:
> Hi Jawad,
> I have encountered a serialization error, and I resolved it by setting
> Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION to true.
> However, I have not encountered a situation like yours, so I won't be
> able to help you with this.
>
> Regards,
> Viraj
>
> On Mon, 13 Nov, 2023, 7:19 pm Jawad Tahir, <ranajawadta...@gmail.com>
> wrote:
>
>> Hi Viraj,
>>
>> Thank you for your response. I have been through that page
>> countless times, and if you look at the code I provided, I am registering the
>> required classes for serialization. The problem is that
>> WorkerTopologyContext is not a serializable class. This question is specifically
>> about the late tuple stream. WindowedBoltExecutor
>> <https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java#L313>
>> simply emits any late tuple, and BoltOutputCollectorImpl
>> <https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java#L110>
>> rewraps that tuple into another Tuple. Under normal circumstances, this
>> collector receives Values and wraps them in a Tuple, but in this case, it
>> wraps a late tuple (which contains WorkerTopologyContext) in another tuple.
>> When it then tries to serialize this tuple, I get the serialization error.
>>
>> Could you give me a minimal working example of how
>> to process late tuples?
>>
>> Regards,
>> Jawad Tahir.
>>
>> On Sat, Nov 11, 2023, 04:40 v.s kadu <virajkadu...@gmail.com> wrote:
>>
>>> Hi Jawad,
>>> Please go through the following page:
>>>
>>> https://storm.apache.org/releases/current/Serialization.html
>>>
>>> Regards,
>>> Viraj Kadu
>>>
>>> On Sat, 11 Nov, 2023, 1:19 am Jawad Tahir, <ranajawadta...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am developing an Apache Storm (v2.5.0) topology that reads events
>>>> from a spout (`BaseRichSpout`), counts the number of events in tumbling
>>>> windows (`BaseWindowedBolt`), and prints the count (`BaseRichBolt`). The
>>>> topology works fine, but there are some out-of-order events in my dataset.
>>>> `BaseWindowedBolt` provides the `withLateTupleStream` method to route late
>>>> events to a separate stream. However, when I try to process late events, I
>>>> get a serialization exception:
>>>>
>>>> ```
>>>> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap
>>>> Note: To register this class use: kryo.register(org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap.class);
>>>> Serialization trace:
>>>> defaultResources (org.apache.storm.task.WorkerTopologyContext)
>>>> context (org.apache.storm.tuple.TupleImpl)
>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-4.0.2.jar:?]
>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
>>>>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575) ~[kryo-4.0.2.jar:?]
>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79) ~[kryo-4.0.2.jar:?]
>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
>>>>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
>>>>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
>>>>     at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.daemon.worker.WorkerState.checkSerialize(WorkerState.java:613) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.executor.ExecutorTransfer.tryTransferLocal(ExecutorTransfer.java:101) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:66) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.executor.LocalExecutor$1.tryTransfer(LocalExecutor.java:36) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.topology.WindowedBoltExecutor.execute(WindowedBoltExecutor.java:313) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     ... 6 more
>>>> ```
>>>>
>>>> I have defined my topology as below:
>>>>
>>>> ```
>>>> public class TestTopology {
>>>>     public static void main(String[] args) throws Exception {
>>>>         Config config = new Config();
>>>>         config.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
>>>>         config.registerSerialization(TupleImpl.class);
>>>>         config.registerSerialization(Fields.class);
>>>>
>>>>         LocalCluster cluster = new LocalCluster();
>>>>         try (LocalCluster.LocalTopology topology =
>>>>                  cluster.submitTopology("testTopology", config, getTopology().createTopology())) {
>>>>             Thread.sleep(50000);
>>>>         }
>>>>         cluster.shutdown();
>>>>     }
>>>>
>>>>     static TopologyBuilder getTopology() {
>>>>         TopologyBuilder builder = new TopologyBuilder();
>>>>         builder.setSpout("eventSpout", new LateEventSpout());
>>>>         builder.setBolt("windowBolt", new WindowBolt()
>>>>                         .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10))
>>>>                         .withTimestampField("time")
>>>>                         .withLateTupleStream("lateEvents"))
>>>>                .shuffleGrouping("eventSpout");
>>>>         builder.setBolt("latePrintBolt", new LatePrintBolt())
>>>>                .shuffleGrouping("windowBolt", "lateEvents");
>>>>         builder.setBolt("printBolt", new PrintBolt()).shuffleGrouping("windowBolt");
>>>>         return builder;
>>>>     }
>>>> }
>>>> ```
>>>>
>>>> where `LateEventSpout` is:
>>>>
>>>> ```
>>>> public class LateEventSpout extends BaseRichSpout {
>>>>     private SpoutOutputCollector collector;
>>>>     private List<Long> eventTimes;
>>>>     private int currentTime = 0;
>>>>     private int id = 1;
>>>>
>>>>     public LateEventSpout() {
>>>>         eventTimes = new ArrayList<>();
>>>>         for (int i = 1; i <= 61; i++) {
>>>>             eventTimes.add(Instant.EPOCH.plusSeconds(i).toEpochMilli());
>>>>         } // [epoch+1, epoch+2, .., epoch+61]
>>>>     }
>>>>
>>>>     @Override
>>>>     public void open(Map<String, Object> conf, TopologyContext context,
>>>>                      SpoutOutputCollector collector) {
>>>>         this.collector = collector;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void nextTuple() {
>>>>         int eventId = id++;
>>>>         Long eventTime = eventTimes.get(currentTime++);
>>>>         if (currentTime == eventTimes.size()) {
>>>>             currentTime = 0; // reset time to zero so we get out-of-order (OOO) events
>>>>         }
>>>>         collector.emit(new Values(eventId, eventTime));
>>>>     }
>>>>
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>         declarer.declare(new Fields("id", "time"));
>>>>     }
>>>> }
>>>> ```
>>>>
>>>> And `WindowBolt` is:
>>>>
>>>> ```
>>>> public class WindowBolt extends BaseWindowedBolt {
>>>>     OutputCollector collector;
>>>>
>>>>     @Override
>>>>     public void prepare(Map<String, Object> topoConf, TopologyContext context,
>>>>                         OutputCollector collector) {
>>>>         this.collector = collector;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void execute(TupleWindow inputWindow) {
>>>>         int sum = 0;
>>>>         for (Tuple event : inputWindow.get()) {
>>>>             sum++;
>>>>         }
>>>>         collector.emit(new Values(inputWindow.getStartTimestamp(),
>>>>                 inputWindow.getEndTimestamp(), sum));
>>>>     }
>>>>
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>         declarer.declare(new Fields("start", "end", "sum"));
>>>>     }
>>>> }
>>>> ```
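To see which events `LateEventSpout` delivers out of order, the sketch below replays the spout's timestamp list without Storm. `OutOfOrderSketch` and `outOfOrderIds` are hypothetical names; the helper flags an event when its timestamp is below the largest timestamp seen before it. This is only an approximation of what reaches the `lateEvents` stream: Storm judges lateness against its current watermark (see `withLag` and `withWatermarkInterval` on `BaseWindowedBolt`), not simply against the maximum timestamp.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class OutOfOrderSketch {
    // Rebuilds LateEventSpout's timestamp list: epoch+1s .. epoch+61s, in milliseconds.
    static List<Long> eventTimes() {
        List<Long> times = new ArrayList<>();
        for (int i = 1; i <= 61; i++) {
            times.add(Instant.EPOCH.plusSeconds(i).toEpochMilli());
        }
        return times;
    }

    // Hypothetical helper: replays the first `emitted` events in spout order and returns the
    // 1-based ids of events whose timestamp is below the largest timestamp seen before them.
    static List<Integer> outOfOrderIds(int emitted) {
        List<Long> times = eventTimes();
        List<Integer> ids = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (int n = 0; n < emitted; n++) {
            // The spout wraps its index back to 0 after the last timestamp.
            long ts = times.get(n % times.size());
            if (ts < maxSeen) {
                ids.add(n + 1); // the spout's ids start at 1
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(outOfOrderIds(65));
    }
}
```

Here `outOfOrderIds(65)` returns `[62, 63, 64, 65]`: event 62 wraps back to epoch+1s (1000 ms) right after event 61 carried epoch+61s (61000 ms), so it is the first candidate for the late stream.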
>>>> And `PrintBolt` just prints the `windowBolt` output (`LatePrintBolt`
>>>> is similar):
>>>>
>>>> ```
>>>> public class PrintBolt extends BaseRichBolt {
>>>>     @Override
>>>>     public void prepare(Map<String, Object> topoConf, TopologyContext context,
>>>>                         OutputCollector collector) {
>>>>     }
>>>>
>>>>     @Override
>>>>     public void execute(Tuple input) {
>>>>         System.out.println(String.format("Start: %d, End: %d, Sum:%d",
>>>>                 input.getLongByField("start"), input.getLongByField("end"),
>>>>                 input.getIntegerByField("sum")));
>>>>     }
>>>>
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>     }
>>>> }
>>>> ```
>>>>
>>>> If I don't set the `LatePrintBolt` in `TopologyBuilder`, I get the
>>>> correct results:
>>>>
>>>> ```
>>>> Start: 0, End: 10000, Sum:10
>>>> Start: 10000, End: 20000, Sum:10
>>>> Start: 20000, End: 30000, Sum:10
>>>> Start: 30000, End: 40000, Sum:10
>>>> Start: 40000, End: 50000, Sum:10
>>>> Start: 50000, End: 60000, Sum:10
>>>> ```
>>>>
>>>> However, when I try to print the lateEvents stream, I get the same output,
>>>> but on the first late event I get the above-mentioned exception.
>>>>
>>>> I have debugged the issue. When [WindowedBoltExecutor](https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java#L313)
>>>> receives a late tuple, it emits the late tuple, but
>>>> [BoltOutputCollectorImpl](https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java#L110)
>>>> rewraps it in a new Tuple. This new tuple contains
>>>> `WorkerTopologyContext`, which is not serializable; hence the error.
>>>>
>>>> I would like to know how I can process the late tuples.
>>>>
>>>> Regards,
>>>> Jawad Tahir.
>>>>
>>>
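The rewrapping problem described in this thread can be illustrated without Storm. The sketch below is a plain-Java analogy, not Storm's code: `FakeContext` and `FakeTuple` are hypothetical stand-ins for `WorkerTopologyContext` and `TupleImpl`, and `java.io` serialization replaces Kryo only to keep the example dependency-free. As in the stack trace above (`KryoValuesSerializer` writing a collection whose element is a `TupleImpl`), only the tuple's values are written: an ordinary event's values serialize fine, but when the single value is itself a tuple, the writer descends into its fields and fails on the context.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class LateTupleSerializationDemo {
    // Hypothetical stand-in for WorkerTopologyContext: deliberately NOT Serializable.
    static class FakeContext {
    }

    // Hypothetical stand-in for TupleImpl: holds its values plus a reference to the context.
    static class FakeTuple implements Serializable {
        final List<Object> values;
        final FakeContext context;

        FakeTuple(List<Object> values, FakeContext context) {
            this.values = values;
            this.context = context;
        }
    }

    // Like the values serializer in the stack trace, write only the tuple's values,
    // never the outer tuple object itself.
    static void serializeValues(FakeTuple tuple) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new ArrayList<>(tuple.values));
        }
    }

    static boolean canSerialize(FakeTuple tuple) {
        try {
            serializeValues(tuple);
            return true;
        } catch (NotSerializableException e) {
            // Thrown when the writer reaches a field whose class is not Serializable.
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        FakeContext context = new FakeContext();
        // A normal event: values are an Integer id and a Long timestamp.
        FakeTuple event = new FakeTuple(List.of(62, 1000L), context);
        // A late-stream tuple: its single value is the original tuple, context included.
        FakeTuple lateWrapper = new FakeTuple(List.of(event), context);
        System.out.println("plain event serializable: " + canSerialize(event));
        System.out.println("late wrapper serializable: " + canSerialize(lateWrapper));
    }
}
```

Running `main` prints `plain event serializable: true` followed by `late wrapper serializable: false`. The wrapper itself is never written; the failure comes entirely from the nested tuple's reference to the non-serializable context.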