Hi Jawad,
I encountered a serialization error and resolved it by setting
Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION to true.
However, I have not run into the situation you describe, so I won't be
able to help you with this.

Regards,
Viraj

On Mon, 13 Nov, 2023, 7:19 pm Jawad Tahir, <ranajawadta...@gmail.com> wrote:

> Hi Viraj,
>
> Thank you for your response. I have been through that page
> countless times, and as you can see in the code I provided, I am
> registering the required classes for serialization. The problem is that
> WorkerTopologyContext is not a serializable class. This question is geared
> towards the late tuple stream. WindowedBoltExecutor
> <https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java#L313>
> simply emits any late tuple, and BoltOutputCollectorImpl
> <https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java#L110>
> rewraps that tuple into another Tuple. Under normal circumstances, this
> collector receives Values and wraps them in a Tuple, but in this case, it
> wraps a late tuple (which contains WorkerTopologyContext) in another tuple.
> Now, when it tries to serialize this tuple, I get the serialization error.
>
> Can you give me a minimal working example of how to
> process late tuples?
>
> Regards,
> Jawad Tahir.
>
> On Sat, Nov 11, 2023, 04:40 v.s kadu <virajkadu...@gmail.com> wrote:
>
>> Hi Jawad,
>> Please go through the following page:
>>
>> https://storm.apache.org/releases/current/Serialization.html
>>
>> Regards,
>> Viraj Kadu
>>
>> On Sat, 11 Nov, 2023, 1:19 am Jawad Tahir, <ranajawadta...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I am developing an Apache Storm (v2.5.0) topology that reads events from
>>> a spout (`BaseRichSpout`), counts the number of events in tumbling windows
>>> (`BaseWindowedBolt`), and prints the count (`BaseRichBolt`). The topology
>>> works fine, but there are some out-of-order events in my dataset.
>>> `BaseWindowedBolt` provides the `withLateTupleStream` method to route late
>>> events to a separate stream. However, when I try to process late events, I
>>> get a serialization exception:
>>>
>>> ```
>>> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap
>>> Note: To register this class use: kryo.register(org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap.class);
>>> Serialization trace:
>>> defaultResources (org.apache.storm.task.WorkerTopologyContext)
>>> context (org.apache.storm.tuple.TupleImpl)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-4.0.2.jar:?]
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
>>>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575) ~[kryo-4.0.2.jar:?]
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79) ~[kryo-4.0.2.jar:?]
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
>>>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
>>>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
>>>     at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.daemon.worker.WorkerState.checkSerialize(WorkerState.java:613) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.executor.ExecutorTransfer.tryTransferLocal(ExecutorTransfer.java:101) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:66) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.executor.LocalExecutor$1.tryTransfer(LocalExecutor.java:36) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.topology.WindowedBoltExecutor.execute(WindowedBoltExecutor.java:313) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
>>>     at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
>>>     ... 6 more
>>> ```
>>>
>>> I have defined my topology as below:
>>>
>>> ```
>>> import org.apache.storm.Config;
>>> import org.apache.storm.LocalCluster;
>>> import org.apache.storm.topology.TopologyBuilder;
>>> import org.apache.storm.topology.base.BaseWindowedBolt;
>>> import org.apache.storm.tuple.Fields;
>>> import org.apache.storm.tuple.TupleImpl;
>>>
>>> public class TestTopology {
>>>     public static void main(String[] args) throws Exception {
>>>         Config config = new Config();
>>>         // Serialize every emitted tuple, even in local mode, to surface errors early.
>>>         config.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
>>>         config.registerSerialization(TupleImpl.class);
>>>         config.registerSerialization(Fields.class);
>>>
>>>         LocalCluster cluster = new LocalCluster();
>>>         try (LocalCluster.LocalTopology topology =
>>>                  cluster.submitTopology("testTopology", config, getTopology().createTopology())) {
>>>             Thread.sleep(50000); // let the topology run for 50 seconds
>>>         }
>>>         cluster.shutdown();
>>>     }
>>>
>>>     static TopologyBuilder getTopology() {
>>>         TopologyBuilder builder = new TopologyBuilder();
>>>         builder.setSpout("eventSpout", new LateEventSpout());
>>>         // 10-second tumbling event-time windows; late tuples go to "lateEvents".
>>>         builder.setBolt("windowBolt", new WindowBolt()
>>>                         .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10))
>>>                         .withTimestampField("time")
>>>                         .withLateTupleStream("lateEvents"))
>>>                .shuffleGrouping("eventSpout");
>>>         builder.setBolt("latePrintBolt", new LatePrintBolt())
>>>                .shuffleGrouping("windowBolt", "lateEvents");
>>>         builder.setBolt("printBolt", new PrintBolt()).shuffleGrouping("windowBolt");
>>>         return builder;
>>>     }
>>> }
>>> ```
>>> Here is `LateEventSpout`:
>>>
>>> ```
>>> import java.time.Instant;
>>> import java.util.ArrayList;
>>> import java.util.List;
>>> import java.util.Map;
>>> import org.apache.storm.spout.SpoutOutputCollector;
>>> import org.apache.storm.task.TopologyContext;
>>> import org.apache.storm.topology.OutputFieldsDeclarer;
>>> import org.apache.storm.topology.base.BaseRichSpout;
>>> import org.apache.storm.tuple.Fields;
>>> import org.apache.storm.tuple.Values;
>>>
>>> public class LateEventSpout extends BaseRichSpout {
>>>     private SpoutOutputCollector collector;
>>>     private List<Long> eventTimes;
>>>     private int currentTime = 0; // index into eventTimes
>>>     private int id = 1;
>>>
>>>     public LateEventSpout() {
>>>         eventTimes = new ArrayList<>();
>>>         for (int i = 1; i <= 61; i++) {
>>>             eventTimes.add(Instant.EPOCH.plusSeconds(i).toEpochMilli());
>>>         } // [epoch+1s, epoch+2s, .., epoch+61s] in milliseconds
>>>     }
>>>
>>>     @Override
>>>     public void open(Map<String, Object> conf, TopologyContext context,
>>>                      SpoutOutputCollector collector) {
>>>         this.collector = collector;
>>>     }
>>>
>>>     @Override
>>>     public void nextTuple() {
>>>         int eventId = id++;
>>>         Long eventTime = eventTimes.get(currentTime++);
>>>         if (currentTime == eventTimes.size()) {
>>>             currentTime = 0; // reset time to zero so we get out-of-order events
>>>         }
>>>         collector.emit(new Values(eventId, eventTime));
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         declarer.declare(new Fields("id", "time"));
>>>     }
>>> }
>>> ```
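The spout's reset to index 0 is what produces the late events. The sketch below shows why, using only the JDK. It relies on a simplified watermark rule that is an assumption here, not Storm's exact implementation: the watermark is the largest timestamp seen so far minus a lag, and a tuple is late if its timestamp falls below the watermark. `LatenessSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model, not Storm's implementation: the watermark is the largest
// timestamp seen so far minus a lag, and a tuple is late when its timestamp is
// below the watermark. Storm advances the watermark on a timer rather than per
// tuple, so which tuple it flags first can differ from this model.
final class LatenessSketch {

    // The spout's sequence: epoch+1s .. epoch+61s, in milliseconds.
    static List<Long> eventTimes() {
        List<Long> times = new ArrayList<>();
        for (int i = 1; i <= 61; i++) {
            times.add(i * 1000L);
        }
        return times;
    }

    // Two passes over the sequence, as the spout emits after resetting currentTime.
    static List<Long> twoPasses() {
        List<Long> emitted = new ArrayList<>(eventTimes());
        emitted.addAll(eventTimes());
        return emitted;
    }

    // Index (in emission order) of the first late timestamp, or -1 if none.
    static int firstLateIndex(List<Long> emitted, long lagMs) {
        boolean seenAny = false;
        long maxSeen = 0;
        for (int i = 0; i < emitted.size(); i++) {
            long ts = emitted.get(i);
            if (seenAny && ts < maxSeen - lagMs) {
                return i;
            }
            maxSeen = seenAny ? Math.max(maxSeen, ts) : ts;
            seenAny = true;
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Long> emitted = twoPasses();
        int idx = firstLateIndex(emitted, 0);
        System.out.println("first late event: index " + idx + ", time " + emitted.get(idx));
    }
}
```

Running `main` prints `first late event: index 61, time 1000`. In this model, a 60-second lag keeps every second-pass timestamp at or above the watermark. `BaseWindowedBolt.withLag` is the Storm setting intended for that kind of tolerance, although real behavior also depends on the watermark interval.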
>>> And `WindowBolt` is:
>>>
>>> ```
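>>> import java.util.Map;
>>> import org.apache.storm.task.OutputCollector;
>>> import org.apache.storm.task.TopologyContext;
>>> import org.apache.storm.topology.OutputFieldsDeclarer;
>>> import org.apache.storm.topology.base.BaseWindowedBolt;
>>> import org.apache.storm.tuple.Fields;
>>> import org.apache.storm.tuple.Tuple;
>>> import org.apache.storm.tuple.Values;
>>> import org.apache.storm.windowing.TupleWindow;
>>>
>>> public class WindowBolt extends BaseWindowedBolt {
>>>     private OutputCollector collector;
>>>
>>>     @Override
>>>     public void prepare(Map<String, Object> topoConf, TopologyContext context,
>>>                         OutputCollector collector) {
>>>         this.collector = collector;
>>>     }
>>>
>>>     @Override
>>>     public void execute(TupleWindow inputWindow) {
>>>         // Count the events in this window.
>>>         int sum = 0;
>>>         for (Tuple event : inputWindow.get()) {
>>>             sum++;
>>>         }
>>>         collector.emit(new Values(inputWindow.getStartTimestamp(),
>>>                                   inputWindow.getEndTimestamp(), sum));
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         declarer.declare(new Fields("start", "end", "sum"));
>>>     }
>>> }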
>>> ```
>>> And `PrintBolt` just prints the `windowBolt` output (`LatePrintBolt` is
>>> similar):
>>>
>>> ```
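>>> import java.util.Map;
>>> import org.apache.storm.task.OutputCollector;
>>> import org.apache.storm.task.TopologyContext;
>>> import org.apache.storm.topology.OutputFieldsDeclarer;
>>> import org.apache.storm.topology.base.BaseRichBolt;
>>> import org.apache.storm.tuple.Tuple;
>>>
>>> public class PrintBolt extends BaseRichBolt {
>>>     @Override
>>>     public void prepare(Map<String, Object> topoConf, TopologyContext context,
>>>                         OutputCollector collector) {
>>>         // Nothing to set up: this bolt only prints.
>>>     }
>>>
>>>     @Override
>>>     public void execute(Tuple input) {
>>>         System.out.println(String.format("Start: %d, End: %d, Sum:%d",
>>>                 input.getLongByField("start"), input.getLongByField("end"),
>>>                 input.getIntegerByField("sum")));
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         // Terminal bolt: no output fields.
>>>     }
>>> }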
>>> ```
>>> If I don't add `LatePrintBolt` to the `TopologyBuilder`, I get the
>>> correct results:
>>>
>>> ```
>>> Start: 0, End: 10000, Sum:10
>>> Start: 10000, End: 20000, Sum:10
>>> Start: 20000, End: 30000, Sum:10
>>> Start: 30000, End: 40000, Sum:10
>>> Start: 40000, End: 50000, Sum:10
>>> Start: 50000, End: 60000, Sum:10
>>> ```
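You can check the printed counts by hand. The sketch below uses only the JDK and assumes each window is (start, end], with the start exclusive and the end inclusive. That reading is inferred from the output: the first window reports 10 events, but only the nine timestamps from 1000 to 9000 ms fall in a half-open [0, 10000) range. `TumblingCountSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not Storm code: counts timestamps per tumbling window,
// treating each window as (start, end] with windows aligned at 0.
final class TumblingCountSketch {

    // The spout's first pass: epoch+1s .. epoch+61s, in milliseconds.
    static List<Long> spoutTimes() {
        List<Long> times = new ArrayList<>();
        for (int i = 1; i <= 61; i++) {
            times.add(i * 1000L);
        }
        return times;
    }

    // Number of timestamps in each of the first `windows` windows of width windowMs.
    static List<Integer> countPerWindow(List<Long> times, long windowMs, int windows) {
        List<Integer> counts = new ArrayList<>();
        for (int w = 0; w < windows; w++) {
            long start = w * windowMs;
            long end = start + windowMs;
            int n = 0;
            for (long t : times) {
                if (t > start && t <= end) { // start exclusive, end inclusive
                    n++;
                }
            }
            counts.add(n);
        }
        return counts;
    }

    public static void main(String[] args) {
        long windowMs = 10_000L;
        List<Integer> counts = countPerWindow(spoutTimes(), windowMs, 6);
        for (int w = 0; w < counts.size(); w++) {
            System.out.printf("Start: %d, End: %d, Sum:%d%n",
                    w * windowMs, (w + 1) * windowMs, counts.get(w));
        }
    }
}
```

`main` prints six lines matching the output above. The 61000 ms event would fall into a seventh window, (60000, 70000]. Later timestamps never go past 61000 ms, so that window plausibly never closes, which would explain why only six lines appear.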
>>> However, when I also attach `LatePrintBolt` to the `lateEvents` stream, I get the
>>> same output until the first late event, which triggers the exception above.
>>>
>>> I have debugged the issue. When
>>> [WindowedBoltExecutor](https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java#L313)
>>> receives a late tuple, it emits that tuple, but
>>> [BoltOutputCollectorImpl](https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java#L110)
>>> wraps it in a new tuple. This new tuple therefore contains the late tuple's
>>> `WorkerTopologyContext`, which is not serializable; hence the error.
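The wrapping problem can be reproduced in miniature without Storm. This is an analogy that uses JDK serialization (`ObjectOutputStream`) rather than Kryo. `FakeContext` and `FakeTuple` are hypothetical stand-ins for `WorkerTopologyContext` and `TupleImpl`. Plain values serialize fine. Once the tuple itself becomes a value, the serializer follows its context field and fails. In the real trace, Kryo fails differently: it hits an unregistered class reached through `TupleImpl.context`. The path it walks, however, has the same shape.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

// Analogy only: JDK serialization instead of Kryo, and hypothetical classes
// standing in for Storm's WorkerTopologyContext and TupleImpl.
final class WrappedTupleSketch {

    // Stand-in for a runtime context object that cannot be serialized.
    static final class FakeContext { }

    // Stand-in for a tuple: plain values plus a reference to the context.
    static final class FakeTuple implements Serializable {
        final ArrayList<Object> values;
        final FakeContext context; // the field the serializer trips over

        FakeTuple(ArrayList<Object> values, FakeContext context) {
            this.values = values;
            this.context = context;
        }
    }

    // Values of a normal emit: just an id and a timestamp.
    static ArrayList<Object> plainValues() {
        ArrayList<Object> values = new ArrayList<>();
        values.add(7);
        values.add(1000L);
        return values;
    }

    // Values of a late-stream emit: the whole late tuple is the single value.
    static ArrayList<Object> wrappedLateTuple() {
        ArrayList<Object> values = new ArrayList<>();
        values.add(new FakeTuple(plainValues(), new FakeContext()));
        return values;
    }

    // True if the object graph can be written with ObjectOutputStream.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("plain values: " + canSerialize(plainValues()));
        System.out.println("wrapped late tuple: " + canSerialize(wrappedLateTuple()));
    }
}
```

`main` prints `plain values: true` and `wrapped late tuple: false`. This also seems consistent with the trace above: registering `TupleImpl` alone does not help, because a field-by-field serializer still has to walk into the context object.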
>>>
>>> I would like to know how I can process the late tuples.
>>>
>>> Regards,
>>> Jawad Tahir.
>>>
>>
