Re: Not able to avoid Dynamic Class Loading

2021-09-22 Thread Kevin Lam
Sorry for the late reply here, I'm just returning to this now. Interesting re: the Avro version, we're using 1.10.0 in our application jar. But maybe this is somehow being clobbered when we try to move it into /lib vs. /usrlib to avoid dynamic class loading. Is it possible that's happening? On

Re: Not able to avoid Dynamic Class Loading

2021-08-27 Thread Arvid Heise
I guess the best option is to attach a debugger and set a breakpoint at the NotSerializableException. There definitely has to be a non-serializable component in that FlinkKafkaConsumer and it can only come from the DeserializationSchema or Properties. Maybe the consumer internally caches some

Re: Not able to avoid Dynamic Class Loading

2021-08-27 Thread Kevin Lam
There are no inner classes, and none of the fields of DebeziumAvroRegistryDeserializationSchema have an Avro schema, even when expanded, including KafkaClusterConfig. KafkaClusterConfig is just composed of Strings and Booleans. DebeziumAvroRegistryDeserializationSchema has a field that initializes

Re: Not able to avoid Dynamic Class Loading

2021-08-27 Thread Arvid Heise
Without a real stacktrace, everything is guesswork. So please provide it together with your Flink version. It might be that some transition of DebeziumAvroRegistryDeserializationSchema (let's say open) will cause an illegal state where it's not serializable. On Thu, Aug 26, 2021 at 9:35 PM

Re: Not able to avoid Dynamic Class Loading

2021-08-26 Thread Kevin Lam
I also tested serializing an instance of `OurSource` with `org.apache.commons.lang3.SerializationUtils.clone` and it worked fine. On Thu, Aug 26, 2021 at 3:27 PM Kevin Lam wrote: > Hi Arvid, > > Got it, we don't use Avro.schema inside of > DebeziumAvroRegistryDeserializationSchema, but I tried

Re: Not able to avoid Dynamic Class Loading

2021-08-26 Thread Kevin Lam
Hi Arvid, Got it, we don't use Avro.schema inside of DebeziumAvroRegistryDeserializationSchema, but I tried to test it with a unit test and `org.apache.commons.lang3.SerializationUtils.clone` runs successfully. I'm curious as to why things work (are serializable) when we use dynamic

Re: Not able to avoid Dynamic Class Loading

2021-08-26 Thread Arvid Heise
Hi Kevin, the consumer needs to be serializable. Apparently you are also serializing the Avro schema (probably as part of your DebeziumAvroRegistryDeserializationSchema) and that fails. You may want to copy our SerializableAvroSchema [1] Make sure that everything is serializable. You can check
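[Editor's sketch] One way to act on the "make sure that everything is serializable" advice is a plain Java serialization attempt that reports which class is rejected. The sketch below is not Flink code and not the thread's actual classes: `Decoder`, `EagerSchema`, and `LazySchema` are hypothetical stand-ins for a schema that holds a non-serializable helper, and `firstNonSerializableClass` is an illustrative helper name. It relies only on standard `java.io` behavior, where the message of a `NotSerializableException` is the name of the offending class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Optional;

public class SerializationCheck {

    /** Hypothetical stand-in for a helper that is NOT Serializable. */
    static class Decoder {
        String decode(byte[] bytes) {
            return new String(bytes);
        }
    }

    /** Holds the helper in a regular field, so Java serialization rejects it. */
    static class EagerSchema implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Decoder decoder = new Decoder();
    }

    /** Holds the helper in a transient field and rebuilds it lazily after shipping. */
    static class LazySchema implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient Decoder decoder;

        Decoder decoder() {
            if (decoder == null) {
                decoder = new Decoder();
            }
            return decoder;
        }
    }

    /**
     * Tries to serialize the candidate with plain Java serialization.
     * Returns the name of the first class that is rejected, or empty if
     * the whole object graph serializes.
     */
    static Optional<String> firstNonSerializableClass(Object candidate) {
        try {
            ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
            out.writeObject(candidate);
            return Optional.empty();
        } catch (NotSerializableException e) {
            // The exception message carries the offending class name.
            return Optional.ofNullable(e.getMessage());
        } catch (IOException e) {
            throw new IllegalStateException("Unexpected I/O failure on an in-memory stream", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("EagerSchema: " + firstNonSerializableClass(new EagerSchema()));
        System.out.println("LazySchema:  " + firstNonSerializableClass(new LazySchema()));
    }
}
```

Running the same check against the real consumer instance, in the same classpath layout as the failing job, would be the relevant test. Starting the JVM with `-Dsun.io.serialization.extendedDebugInfo=true` additionally makes the exception list the field path that leads to the rejected object.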

Re: Not able to avoid Dynamic Class Loading

2021-08-26 Thread Kevin Lam
Hi! We're using 1.13.1. We have a class in our user code that extends FlinkKafkaConsumer, that's built for reading Avro records from Kafka. However, it doesn't hold any Schema objects as fields so I'm a little confused. Something like this: ``` class OurSource[T <: ClassTag: TypeInformation:

Re: Not able to avoid Dynamic Class Loading

2021-08-25 Thread Caizhi Weng
Hi! What Flink version are you using? In the current Flink code base, FlinkKafkaConsumer does not contain fields related to Avro. Jars in usrlib have a higher priority to be loaded than jars in lib. So if there is another FlinkKafkaConsumer class in your user jar then it might affect class loading and
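[Editor's sketch] To see which copy of a class actually wins when the same class could come from more than one jar, it can help to print where the JVM loaded it from. The sketch below uses only standard JDK calls (`Class.getProtectionDomain()`, `CodeSource.getLocation()`, `Class.getClassLoader()`); the class name `ClassOrigin` and the method `describe` are illustrative, and passing the Kafka consumer's fully qualified class name as an argument assumes the Flink connector jar is on the classpath.

```java
import java.security.CodeSource;

public class ClassOrigin {

    /** Describes the jar (or directory) and class loader a class was loaded from. */
    static String describe(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        // JDK classes loaded by the bootstrap loader have no code source.
        String location = (source == null || source.getLocation() == null)
                ? "<bootstrap or unknown>"
                : source.getLocation().toString();

        ClassLoader loader = type.getClassLoader();
        String loaderName = (loader == null) ? "<bootstrap>" : loader.getClass().getName();

        return type.getName() + " loaded from " + location + " by " + loaderName;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass a fully qualified class name; defaults to a JDK class so the
        // sketch runs without extra jars on the classpath.
        String className = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(describe(Class.forName(className)));
    }
}
```

Logging `describe(...)` for the consumer class from inside the job's main method, once with the jar in /lib and once in /usrlib, would show whether a different copy or a different class loader is involved in each setup.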

Not able to avoid Dynamic Class Loading

2021-08-25 Thread Kevin Lam
Hi all, I'm trying to avoid dynamic class loading my user code [0] due to a suspected classloading leak, but when I put my application jar into /lib instead of /usrlib, I run into the following error: ``` The main method caused an error: The implementation of the FlinkKafkaConsumer is not