Hi Till,

Thanks for the reply! I see, yes, it does sound very much like FLINK-1390.
Please see my AvroDeserializationSchema implementation here:
http://pastebin.com/mK7SfBQ8

I think perhaps the problem is caused by this line:

    val readerSchema = SpecificData.get().getSchema(classTag[T].runtimeClass)

Looking at SpecificData, it contains a classCache which is a map of strings to
classes, similar to what's described in FLINK-1390.

I'm not sure how to change my AvroDeserializationSchema to prevent this from
happening though! Do you have any ideas?

Josh

On Wed, Jun 8, 2016 at 11:23 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Josh,
>
> the error message you've posted usually indicates that there is a class
> loader issue. When you first run your program the class
> com.me.avro.MyAvroType will be first loaded (by the user code class
> loader). I suspect that this class is now somewhere cached (e.g. the avro
> serializer) and when you run your program a second time, then there is a
> new user code class loader which has loaded the same class and now you want
> to convert an instance of the first class into the second class. However,
> these two classes are not identical since they were loaded by different
> class loaders.
>
> In order to find the culprit, it would be helpful to see the full stack
> trace of the ClassCastException and the code of the
> AvroDeserializationSchema. I suspect that something similar to
> https://issues.apache.org/jira/browse/FLINK-1390 is happening.
>
> Cheers,
> Till
>
> On Wed, Jun 8, 2016 at 10:38 AM, Josh <jof...@gmail.com> wrote:
>
>> Hi all,
>>
>> Currently I have to relaunch my Flink cluster every time I want to
>> upgrade/redeploy my Flink job, because otherwise I get a ClassCastException:
>>
>> java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to
>> com.me.avro.MyAvroType
>>
>> It's related to MyAvroType which is a class generated from an Avro
>> schema. The ClassCastException occurs every time I redeploy the job without
>> killing the Flink cluster (even if there have been no changes to the
>> job/jar).
>>
>> I wrote my own AvroDeserializationSchema in Scala which does something a
>> little strange to get the avro type information (see below), and I'm
>> wondering if that's causing the problem when the Flink job creates an
>> AvroDeserializationSchema[MyAvroType].
>>
>> Does anyone have any ideas?
>>
>> Thanks,
>> Josh
>>
>>
>>
>> class AvroDeserializationSchema[T <: SpecificRecordBase :ClassTag]
>> extends DeserializationSchema[T] {
>>
>> ...
>>
>> private val avroType = classTag[T].runtimeClass.asInstanceOf[Class[T]]
>>
>> private val typeInformation = TypeExtractor.getForClass(avroType)
>>
>> ...
>>
>> override def getProducedType: TypeInformation[T] = typeInformation
>>
>> ...
>>
>> }
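
For reference, the class loader behaviour described in the quoted reply can be reproduced without Flink or Avro. The following is only a minimal sketch using the JDK alone. `Payload`, `IsolatingLoader` and `ClassLoaderDemo` are hypothetical names made up for this illustration; `Payload` stands in for a generated class such as MyAvroType, and each `IsolatingLoader` roughly imitates a fresh user code class loader. It assumes Java 9+ (for `readAllBytes`) and that the compiled class file is reachable as a resource from the parent loader.

```scala
// Hypothetical stand-in for a generated class such as com.me.avro.MyAvroType.
class Payload

// Hypothetical loader that defines `target` itself from the class file bytes
// instead of delegating to its parent, so every instance of this loader
// produces its own Class object for the same fully qualified name.
class IsolatingLoader(parent: ClassLoader, target: String) extends ClassLoader(parent) {
  override protected def loadClass(name: String, resolve: Boolean): Class[_] =
    if (name == target) {
      val alreadyLoaded = findLoadedClass(name)
      if (alreadyLoaded != null) alreadyLoaded
      else {
        val in = parent.getResourceAsStream(name.replace('.', '/') + ".class")
        try {
          val bytes = in.readAllBytes() // requires Java 9 or later
          defineClass(name, bytes, 0, bytes.length)
        } finally in.close()
      }
    } else super.loadClass(name, resolve) // everything else goes to the parent
}

object ClassLoaderDemo {
  // Loads the same class twice, once per "job submission".
  def sameNameDifferentClass(): (Class[_], Class[_]) = {
    val parent = classOf[Payload].getClassLoader
    val name   = classOf[Payload].getName
    val first  = new IsolatingLoader(parent, name).loadClass(name)
    val second = new IsolatingLoader(parent, name).loadClass(name)
    (first, second)
  }

  def main(args: Array[String]): Unit = {
    val (first, second) = sameNameDifferentClass()
    println(first.getName == second.getName) // true: same fully qualified name
    println(first == second)                 // false: distinct Class objects
    val instance = first.getDeclaredConstructor().newInstance()
    println(second.isInstance(instance))     // false: a cast to `second` would fail
  }
}
```

A cache keyed by class name (a map of strings to classes) that outlives a single loader would hand back `first` to code that was itself loaded alongside `second`, which is the kind of situation that produces "X cannot be cast to X".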