Great to hear :-)

On Wed, Jun 8, 2016 at 7:45 PM, Josh <jof...@gmail.com> wrote:

> Thanks Till, your suggestion worked!
>
> I actually just created a new SpecificData for each
> AvroDeserializationSchema instance, so I think it's still just as efficient.
>
> Josh
>
> On Wed, Jun 8, 2016 at 4:41 PM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> The only thing I could think of is to not use the SpecificData singleton
>> but instead creating a new SpecificData object for each SpecificDatumReader
>> (you can pass it as a third argument to the constructor). This, of course,
>> is not really efficient. But you could try it out to see whether it solves
>> your problem.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jun 8, 2016 at 4:24 PM, Josh <jof...@gmail.com> wrote:
>>
>>> Sorry - I forgot to include my stack trace too. Here it is:
>>>
>>> The program finished with the following exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>> execution failed: Job execution failed.
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
>>> at
>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:536)
>>> at com.me.flink.job.MyFlinkJob$.main(MyFlinkJob.scala:85)
>>> at com.me.flink.job.MyFlinkJob.main(MyFlinkJob.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>> at
>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>> at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>> execution failed.
>>> at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717)
>>> at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
>>> at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
>>> at
>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>> at
>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>> at
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.lang.Exception: Could not forward element to next
>>> operator
>>> at
>>> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.run(KinesisDataFetcher.java:150)
>>> at
>>> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:285)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.RuntimeException: Could not forward element to next
>>> operator
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>>> at
>>> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerThread.run(ShardConsumerThread.java:141)
>>> Caused by: java.lang.ClassCastException: com.me.avro.MyAvroType cannot
>>> be cast to com.me.avro.MyAvroType
>>> at com.me.flink.job.MyFlinkJob$$anonfun$1.apply(MyFlinkJob.scala:61)
>>> at
>>> org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:746)
>>> at
>>> org.apache.flink.streaming.api.functions.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:71)
>>> at
>>> org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:63)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>>> ... 3 more
>>>
>>> On Wed, Jun 8, 2016 at 3:19 PM, Josh <jof...@gmail.com> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> Thanks for the reply! I see - yes it does sound very much like
>>>> FLINK-1390.
>>>>
>>>> Please see my AvroDeserializationSchema implementation here:
>>>> http://pastebin.com/mK7SfBQ8
>>>>
>>>> I think perhaps the problem is caused by this line:
>>>> val readerSchema = SpecificData.get().getSchema(classTag[T].
>>>> runtimeClass)
>>>>
>>>> Looking at SpecificData, it contains a classCache which is a map of
>>>> strings to classes, similar to what's described in FLINK-1390.
>>>>
>>>> I'm not sure how to change my AvroDeserializationSchema to prevent this
>>>> from happening though! Do you have any ideas?
>>>>
>>>> Josh
>>>>
>>>>
>>>>
>>>> On Wed, Jun 8, 2016 at 11:23 AM, Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Josh,
>>>>>
>>>>> the error message you've posted usually indicates that there is a
>>>>> class loader issue. When you first run your program the class
>>>>> com.me.avro.MyAvroType will be first loaded (by the user code class
>>>>> loader). I suspect that this class is now somewhere cached (e.g. the avro
>>>>> serializer) and when you run your program a second time, then there is a
>>>>> new user code class loader which has loaded the same class and now you 
>>>>> want
>>>>> to convert an instance of the first class into the second class. However,
>>>>> these two classes are not identical since they were loaded by different
>>>>> class loaders.
>>>>>
>>>>> In order to find the culprit, it would be helpful to see the full
>>>>> stack trace of the ClassCastException and the code of the
>>>>> AvroDeserializationSchema. I suspect that something similar to
>>>>> https://issues.apache.org/jira/browse/FLINK-1390 is happening.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Jun 8, 2016 at 10:38 AM, Josh <jof...@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> Currently I have to relaunch my Flink cluster every time I want to
>>>>>> upgrade/redeploy my Flink job, because otherwise I get a 
>>>>>> ClassCastException:
>>>>>>
>>>>>> java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast
>>>>>> to com.me.avro.MyAvroType
>>>>>>
>>>>>> It's related to MyAvroType which is an class generated from an Avro
>>>>>> schema. The ClassCastException occurs every time I redeploy the job 
>>>>>> without
>>>>>> killing the Flink cluster (even if there have been no changes to the
>>>>>> job/jar).
>>>>>>
>>>>>> I wrote my own AvroDeserializationSchema in Scala which does
>>>>>> something a little strange to get the avro type information (see below),
>>>>>> and I'm wondering if that's causing the problem when the Flink job 
>>>>>> creates
>>>>>> an AvroDeserializationSchema[MyAvroType].
>>>>>>
>>>>>> Does anyone have any ideas?
>>>>>>
>>>>>> Thanks,
>>>>>> Josh
>>>>>>
>>>>>>
>>>>>>
>>>>>> class AvroDeserializationSchema[T <: SpecificRecordBase :ClassTag]
>>>>>> extends DeserializationSchema[T] {
>>>>>>
>>>>>>   ...
>>>>>>
>>>>>>   private val avroType =
>>>>>> classTag[T].runtimeClass.asInstanceOf[Class[T]]
>>>>>>
>>>>>>   private val typeInformation = TypeExtractor.getForClass(avroType)
>>>>>>
>>>>>>   ...
>>>>>>
>>>>>>   override def getProducedType: TypeInformation[T] = typeInformation
>>>>>>
>>>>>>   ...
>>>>>>
>>>>>> }
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to