Hi Viraj,

This does not solve the problem, as the class is not serializable at all. :(
It isn't a serialization issue to begin with, as I am getting correct
results when there are no late tuples in the system. As soon as I receive
my first late event, I get this exception.
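
To illustrate why falling back to Java serialization cannot help here, below is a minimal sketch in plain java.io. `FakeContext` and `FakeTuple` are hypothetical stand-ins I made up for `WorkerTopologyContext` and `TupleImpl`; they are not Storm classes, and the sketch assumes the real classes nest in the same way. It only models the shape of the problem: a tuple that drags a non-serializable context along fails once it is wrapped as a value, while its plain values serialize fine.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NestedTupleSerializationSketch {

    // Hypothetical stand-in for WorkerTopologyContext: deliberately NOT Serializable.
    static class FakeContext {
        final String componentId;

        FakeContext(String componentId) {
            this.componentId = componentId;
        }
    }

    // Hypothetical stand-in for TupleImpl: Serializable itself, but it keeps
    // a reference to the non-serializable context next to its values.
    static class FakeTuple implements Serializable {
        private static final long serialVersionUID = 1L;
        final FakeContext context;
        final ArrayList<Object> values;

        FakeTuple(FakeContext context, List<Object> values) {
            this.context = context;
            this.values = new ArrayList<>(values);
        }
    }

    // Returns true if Java serialization of the payload succeeds,
    // false if some object in the graph is not serializable.
    static boolean canSerialize(Object payload) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(payload);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // A "late tuple" carrying (id, time) plus its context.
        FakeTuple late = new FakeTuple(new FakeContext("windowBolt"),
                Arrays.<Object>asList(7, 1000L));

        // Emitting the late tuple itself as the single value of a new tuple.
        List<Object> wrapped = new ArrayList<>();
        wrapped.add(late);

        System.out.println("wrapped tuple serializes: " + canSerialize(wrapped));
        System.out.println("plain values serialize:   " + canSerialize(late.values));
    }
}
```

In this model the wrapped tuple fails and the plain values succeed. Whether Storm's Kryo path behaves the same way for the real classes is my assumption based on the stack trace, not something the sketch proves.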

Regards,
Jawad Tahir.


On Mon, 13 Nov 2023 at 15:22, v.s kadu <virajkadu...@gmail.com> wrote:

> Hi Jawad,
> I have encountered a serialization error before and resolved it by setting
> Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION to true.
> However, I have not encountered the situation you describe, so I won't be
> able to help you with this.
>
> Regards,
> Viraj
>
> On Mon, 13 Nov, 2023, 7:19 pm Jawad Tahir, <ranajawadta...@gmail.com>
> wrote:
>
>> Hi Viraj,
>>
>> Thank you for your response. I have been through that page
>> countless times, and if you see the code I provided, I am registering the
>> required classes for serialization. The problem is that
>> WorkerTopologyContext is not a serializable class. This question is geared
>> towards the late tuple stream. WindowedBoltExecutor
>> <https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java#L313>
>> simply emits any late tuple, and BoltOutputCollectorImpl
>> <https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java#L110>
>> rewraps that tuple into another Tuple. Under normal circumstances, this
>> collector receives Values and wraps them in a Tuple, but in this case, it
>> wraps a late tuple (which contains WorkerTopologyContext) in another tuple.
>> Now, when it tries to serialize this tuple, I get the serialization error.
>>
>> Can you give me an example of the smallest possible working code on how
>> to process late tuples?
>>
>> Regards,
>> Jawad Tahir.
>>
>> On Sat, Nov 11, 2023, 04:40 v.s kadu <virajkadu...@gmail.com> wrote:
>>
>>> Hi Jawad,
>>> Please go through the following page:
>>>
>>> https://storm.apache.org/releases/current/Serialization.html
>>>
>>> Regards,
>>> Viraj Kadu
>>>
>>> On Sat, 11 Nov, 2023, 1:19 am Jawad Tahir, <ranajawadta...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am developing an Apache Storm (v2.5.0) topology that reads events
>>>> from a spout (`BaseRichSpout`), counts the number of events in tumbling
>>>> windows (`BaseWindowedBolt`), and prints the count (`BaseRichBolt`). The
>>>> topology works fine, but there are some out-of-order events in my dataset.
>>>> `BaseWindowedBolt` provides a `withLateTupleStream` method to route late
>>>> events to a separate stream. However, when I try to process late events, I
>>>> get a serialization exception:
>>>>
>>>> ```
>>>> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap
>>>> Note: To register this class use: kryo.register(org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap.class);
>>>> Serialization trace:
>>>> defaultResources (org.apache.storm.task.WorkerTopologyContext)
>>>> context (org.apache.storm.tuple.TupleImpl)
>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-4.0.2.jar:?]
>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
>>>>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575) ~[kryo-4.0.2.jar:?]
>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79) ~[kryo-4.0.2.jar:?]
>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
>>>>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
>>>>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
>>>>     at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.daemon.worker.WorkerState.checkSerialize(WorkerState.java:613) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.executor.ExecutorTransfer.tryTransferLocal(ExecutorTransfer.java:101) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:66) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.executor.LocalExecutor$1.tryTransfer(LocalExecutor.java:36) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.topology.WindowedBoltExecutor.execute(WindowedBoltExecutor.java:313) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
>>>>     ... 6 more
>>>> ```
>>>>
>>>> I have defined my topology as below:
>>>>
>>>> ```
>>>> public class TestTopology {
>>>>     public static void main(String[] args) throws Exception {
>>>>         Config config = new Config();
>>>>         config.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
>>>>         config.registerSerialization(TupleImpl.class);
>>>>         config.registerSerialization(Fields.class);
>>>>
>>>>         LocalCluster cluster = new LocalCluster();
>>>>         try (LocalCluster.LocalTopology topology = cluster.submitTopology(
>>>>                 "testTopology", config, getTopology().createTopology())) {
>>>>             Thread.sleep(50000);
>>>>         }
>>>>         cluster.shutdown();
>>>>     }
>>>>
>>>>     static TopologyBuilder getTopology() {
>>>>         TopologyBuilder builder = new TopologyBuilder();
>>>>         builder.setSpout("eventSpout", new LateEventSpout());
>>>>         builder.setBolt("windowBolt", new WindowBolt()
>>>>                         .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10))
>>>>                         .withTimestampField("time")
>>>>                         .withLateTupleStream("lateEvents"))
>>>>                 .shuffleGrouping("eventSpout");
>>>>         builder.setBolt("latePrintBolt", new LatePrintBolt())
>>>>                 .shuffleGrouping("windowBolt", "lateEvents");
>>>>         builder.setBolt("printBolt", new PrintBolt())
>>>>                 .shuffleGrouping("windowBolt");
>>>>         return builder;
>>>>     }
>>>> }
>>>> ```
>>>> Where `LateEventSpout` is
>>>>
>>>> ```
>>>> public class LateEventSpout extends BaseRichSpout {
>>>>     private SpoutOutputCollector collector;
>>>>     private List<Long> eventTimes;
>>>>     private int currentTime = 0;
>>>>     private int id = 1;
>>>>     public LateEventSpout () {
>>>>         eventTimes = new ArrayList<>();
>>>>         for (int i = 1; i<= 61; i++) {
>>>>             eventTimes.add(Instant.EPOCH.plusSeconds(i).toEpochMilli());
>>>>         } // [epoch+1, epoch+2, .., epoch+61]
>>>>     }
>>>>     @Override
>>>>     public void open(Map<String, Object> conf, TopologyContext context,
>>>>                      SpoutOutputCollector collector) {
>>>>         this.collector = collector;
>>>>     }
>>>>     @Override
>>>>     public void nextTuple() {
>>>>         int eventId = id++;
>>>>         Long eventTime = eventTimes.get(currentTime++);
>>>>         if (currentTime == eventTimes.size()){
>>>>             currentTime = 0; // reset to the start so we get out-of-order events
>>>>         }
>>>>         collector.emit(new Values(eventId, eventTime));
>>>>     }
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>         declarer.declare(new Fields("id", "time"));
>>>>     }
>>>> }
>>>> ```
>>>> And `WindowBolt` is:
>>>>
>>>> ```
>>>> public class WindowBolt extends BaseWindowedBolt {
>>>>     OutputCollector collector;
>>>>     @Override
>>>>     public void prepare(Map<String, Object> topoConf, TopologyContext context,
>>>>                         OutputCollector collector) {
>>>>         this.collector = collector;
>>>>     }
>>>>     @Override
>>>>     public void execute(TupleWindow inputWindow) {
>>>>         int sum = 0;
>>>>         for (Tuple event : inputWindow.get()) {
>>>>             sum++;
>>>>         }
>>>>         collector.emit(new Values(inputWindow.getStartTimestamp(),
>>>>                 inputWindow.getEndTimestamp(), sum));
>>>>     }
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>         declarer.declare(new Fields("start", "end", "sum"));
>>>>     }
>>>> }
>>>> ```
>>>> And `PrintBolt` just prints the `windowBolt` output. (`LatePrintBolt`
>>>> is similar)
>>>>
>>>> ```
>>>> public class PrintBolt extends BaseRichBolt {
>>>>     @Override
>>>>     public void prepare(Map<String, Object> topoConf, TopologyContext context,
>>>>                         OutputCollector collector) {
>>>>     }
>>>>     @Override
>>>>     public void execute(Tuple input) {
>>>>         System.out.println(String.format("Start: %d, End: %d, Sum:%d",
>>>>                 input.getLongByField("start"), input.getLongByField("end"),
>>>>                 input.getIntegerByField("sum")));
>>>>     }
>>>>     @Override
>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>     }
>>>> }
>>>> ```
>>>> If I don't set the `LatePrintBolt` in `TopologyBuilder`, I get the
>>>> correct results.
>>>>
>>>> ```
>>>> Start: 0, End: 10000, Sum:10
>>>> Start: 10000, End: 20000, Sum:10
>>>> Start: 20000, End: 30000, Sum:10
>>>> Start: 30000, End: 40000, Sum:10
>>>> Start: 40000, End: 50000, Sum:10
>>>> Start: 50000, End: 60000, Sum:10
>>>> ```
>>>> However, when I try to print the `lateEvents` stream, I get the same
>>>> output, but the first late event triggers the exception shown above.
>>>>
>>>> I have debugged the issue. When [WindowedBoltExecutor](
>>>> https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java#L313)
>>>> receives a late tuple, it emits the late tuple but
>>>> [BoltOutputCollectorImpl](
>>>> https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java#L110)
>>>> rewraps it in a new Tuple. This new tuple contains
>>>> `WorkerTopologyContext`, which is not serializable, hence the error.
>>>>
>>>> I would like to know how I can process the late tuples.
>>>>
>>>> Regards,
>>>> Jawad Tahir.
>>>>
>>>
