Hi Till,

Thanks for the reply! I see - yes it does sound very much like FLINK-1390.

Please see my AvroDeserializationSchema implementation here:
http://pastebin.com/mK7SfBQ8

I think perhaps the problem is caused by this line:
val readerSchema = SpecificData.get().getSchema(classTag[T].runtimeClass)

Looking at SpecificData, it contains a classCache which is a map of strings
to classes, similar to what's described in FLINK-1390.

I'm not sure how to change my AvroDeserializationSchema to prevent this
from happening though! Do you have any ideas?

Josh



On Wed, Jun 8, 2016 at 11:23 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Josh,
>
> the error message you've posted usually indicates that there is a class
> loader issue. When you first run your program the class
> com.me.avro.MyAvroType will be first loaded (by the user code class
> loader). I suspect that this class is now somewhere cached (e.g. the avro
> serializer) and when you run your program a second time, then there is a
> new user code class loader which has loaded the same class and now you want
> to convert an instance of the first class into the second class. However,
> these two classes are not identical since they were loaded by different
> class loaders.
>
> In order to find the culprit, it would be helpful to see the full stack
> trace of the ClassCastException and the code of the
> AvroDeserializationSchema. I suspect that something similar to
> https://issues.apache.org/jira/browse/FLINK-1390 is happening.
>
> Cheers,
> Till
>
> On Wed, Jun 8, 2016 at 10:38 AM, Josh <jof...@gmail.com> wrote:
>
>> Hi all,
>>
>> Currently I have to relaunch my Flink cluster every time I want to
>> upgrade/redeploy my Flink job, because otherwise I get a ClassCastException:
>>
>> java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to
>> com.me.avro.MyAvroType
>>
>> It's related to MyAvroType which is an class generated from an Avro
>> schema. The ClassCastException occurs every time I redeploy the job without
>> killing the Flink cluster (even if there have been no changes to the
>> job/jar).
>>
>> I wrote my own AvroDeserializationSchema in Scala which does something a
>> little strange to get the avro type information (see below), and I'm
>> wondering if that's causing the problem when the Flink job creates an
>> AvroDeserializationSchema[MyAvroType].
>>
>> Does anyone have any ideas?
>>
>> Thanks,
>> Josh
>>
>>
>>
>> class AvroDeserializationSchema[T <: SpecificRecordBase :ClassTag]
>> extends DeserializationSchema[T] {
>>
>>   ...
>>
>>   private val avroType = classTag[T].runtimeClass.asInstanceOf[Class[T]]
>>
>>   private val typeInformation = TypeExtractor.getForClass(avroType)
>>
>>   ...
>>
>>   override def getProducedType: TypeInformation[T] = typeInformation
>>
>>   ...
>>
>> }
>>
>
>

Reply via email to