Hi Josh,

The error message you've posted usually indicates a class loader issue.
When you first run your program, the class com.me.avro.MyAvroType is
loaded by the user code class loader. I suspect that this class is then
cached somewhere (e.g. in the Avro serializer). When you run your program
a second time, a new user code class loader loads the same class again,
and an instance of the first class ends up being cast to the second class.
The cast fails because the two classes are not identical: they were loaded
by different class loaders, even though they have the same name.
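
To illustrate the effect outside of Flink, here is a minimal sketch. Payload
is a hypothetical stand-in for com.me.avro.MyAvroType, and the two isolated
URLClassLoader instances stand in for the user code class loaders of two job
submissions. It assumes the compiled classes have a code source location (a
directory or jar), which may not hold in every environment such as a REPL.

```scala
import java.net.{URL, URLClassLoader}

// Hypothetical stand-in for a generated class such as com.me.avro.MyAvroType.
class Payload

object ClassLoaderDemo {
  // Loads Payload through a fresh, isolated loader. With parent = null only
  // the bootstrap loader is consulted first, so Payload is defined anew by
  // each loader instead of being shared with the application class loader.
  def loadIsolated(): Class[_] = {
    val location: URL =
      classOf[Payload].getProtectionDomain.getCodeSource.getLocation
    new URLClassLoader(Array(location), null)
      .loadClass(classOf[Payload].getName)
  }

  def main(args: Array[String]): Unit = {
    val first = loadIsolated()   // plays the role of the first job run
    val second = loadIsolated()  // plays the role of the redeployed job

    // Same fully qualified name, but two distinct Class objects.
    println(s"same name:  ${first.getName == second.getName}")
    println(s"same class: ${first == second}")

    // An instance created from the first class cannot be cast to the second.
    val instance = first.getDeclaredConstructor().newInstance()
    try second.cast(instance)
    catch {
      case e: ClassCastException => println(s"cast failed: ${e.getMessage}")
    }
  }
}
```

Anything that holds on to the first Class object (or to instances of it)
across job submissions can trigger the same failure.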

In order to find the culprit, it would be helpful to see the full stack
trace of the ClassCastException and the code of the
AvroDeserializationSchema. I suspect that something similar to
https://issues.apache.org/jira/browse/FLINK-1390 is happening.

Cheers,
Till

On Wed, Jun 8, 2016 at 10:38 AM, Josh <jof...@gmail.com> wrote:

> Hi all,
>
> Currently I have to relaunch my Flink cluster every time I want to
> upgrade/redeploy my Flink job, because otherwise I get a ClassCastException:
>
> java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to
> com.me.avro.MyAvroType
>
> It's related to MyAvroType which is a class generated from an Avro
> schema. The ClassCastException occurs every time I redeploy the job without
> killing the Flink cluster (even if there have been no changes to the
> job/jar).
>
> I wrote my own AvroDeserializationSchema in Scala which does something a
> little strange to get the avro type information (see below), and I'm
> wondering if that's causing the problem when the Flink job creates an
> AvroDeserializationSchema[MyAvroType].
>
> Does anyone have any ideas?
>
> Thanks,
> Josh
>
>
>
> class AvroDeserializationSchema[T <: SpecificRecordBase :ClassTag] extends
> DeserializationSchema[T] {
>
>   ...
>
>   private val avroType = classTag[T].runtimeClass.asInstanceOf[Class[T]]
>
>   private val typeInformation = TypeExtractor.getForClass(avroType)
>
>   ...
>
>   override def getProducedType: TypeInformation[T] = typeInformation
>
>   ...
>
> }
>
