@Giuliano: any updates? Very curious to figure out what's causing this. As Fabian said, this is most likely a class loading issue. Judging from the stack trace, you are not running on YARN but on a standalone cluster. Is that correct? With respect to class loading on standalone clusters, nothing changed between Flink 1.1 and Flink 1.2. Did you put any JARs into Flink's lib folder before submitting the job?
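To illustrate what such a clash looks like in isolation (nothing Flink-specific here; `ReloadingLoader` and `Payload` are made-up names for this sketch), here is a minimal Java program in which two classloaders each define the same class name, which is exactly what produces an "X cannot be cast to X" ClassCastException:

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

// Toy "child-first" loader: for one target class name it defines the class
// again from the same bytes, yielding a second Class object distinct from
// the one the application classloader already holds.
class ReloadingLoader extends ClassLoader {
    private final String target;

    ReloadingLoader(String target) {
        super(ReloadingLoader.class.getClassLoader());
        this.target = target;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        if (!name.equals(target)) {
            return super.loadClass(name, resolve); // everything else: normal delegation
        }
        try (InputStream in = getResourceAsStream(name.replace('.', '/') + ".class")) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            int b;
            while ((b = in.read()) != -1) {
                buf.write(b);
            }
            byte[] bytes = buf.toByteArray();
            // Defining the class here makes THIS loader its defining loader.
            return defineClass(name, bytes, 0, bytes.length);
        } catch (Exception e) {
            throw new ClassNotFoundException(name, e);
        }
    }
}

class CastDemo {
    // Stand-in for an Avro-generated record class such as WowTransaction.
    public static class Payload {
        public Payload() {}
    }

    public static void main(String[] args) throws Exception {
        String name = Payload.class.getName();
        Object fromOtherLoader =
                new ReloadingLoader(name).loadClass(name).getDeclaredConstructor().newInstance();
        try {
            Payload p = (Payload) fromOtherLoader; // same name, different defining loader
        } catch (ClassCastException e) {
            // Message reads along the lines of
            // "CastDemo$Payload cannot be cast to CastDemo$Payload"
            System.out.println(e.getMessage());
        }
    }
}
```

In the JVM a class's identity is the pair (class name, defining classloader), so the cast above fails even though both sides print the same name. The same symptom appears whenever a job's user code and some cached object end up bound to different classloaders.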
– Ufuk

On Thu, Jan 12, 2017 at 7:16 PM, Yury Ruchin <yuri.ruc...@gmail.com> wrote:
> Hi,
>
> I'd like to chime in, since I've faced the same issue running Flink 1.1.4. I
> have a long-running YARN session which I use to run multiple streaming jobs
> concurrently. Once, after cancelling and resubmitting a job, I saw the "X
> cannot be cast to X" ClassCastException in the logs. After I restarted the
> YARN session, the problem disappeared.
>
> The class that failed to be cast was generated by the Avro compiler. I know
> that Avro's Java binding caches schemas in some static WeakHashMap.
> I'm wondering whether that may interfere with Flink's classloading design.
>
> Anyway, I would be interested in watching the issue in Flink JIRA.
>
> Giuliano, could you provide the issue number?
>
> Thanks,
> Yury
>
> 2017-01-11 14:11 GMT+03:00 Fabian Hueske <fhue...@gmail.com>:
>>
>> Hi Giuliano,
>>
>> thanks for bringing up this issue.
>> A "ClassCastException: X cannot be cast to X" often points to a
>> classloader issue, so it might actually be a bug in Flink.
>>
>> I assume you submit the same application (same jar file) with the same
>> command, right?
>> Did you cancel the job before resubmitting?
>>
>> Can you create a JIRA issue [1] for this bug (hit the red CREATE button
>> at the top) and include the commit hash from which you built Flink?
>> It would be great if you could also provide a short example program and
>> instructions for how to reproduce the problem.
>>
>> Thank you very much,
>> Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK
>>
>> 2017-01-11 1:22 GMT+01:00 Giuliano Caliari <giuliano.cali...@gmail.com>:
>>>
>>> Hello,
>>>
>>> I need some guidance on how to report a bug.
>>>
>>> I'm testing version 1.2 on my local cluster. The first time I submit the
>>> job everything works, but whenever I re-submit the same job it fails with:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>>>     at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>>>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
>>>     at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
>>>     at au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
>>>     at au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
>>>     at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>>>     at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>>>     at scala.App$$anonfun$main$1.apply(App.scala:76)
>>>     at scala.collection.immutable.List.foreach(List.scala:381)
>>>     at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>>>     at scala.App$class.main(App.scala:76)
>>>     at au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
>>>     at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>>>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>>>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>>>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.lang.RuntimeException: Could not forward element to next operator
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:415)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:397)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:749)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.collectWithTimestamp(StreamSourceContexts.java:272)
>>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:261)
>>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:88)
>>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:255)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.ClassCastException: au.com.my.package.schema.p.WowTransaction cannot be cast to au.com.my.package.schema.p.WowTransaction
>>>     at au.com.my.package.pTraitor.OneTrait$$anonfun$execute$4.apply(Traitor.scala:132)
>>>     at org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:763)
>>>     at org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:72)
>>>     at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:65)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:412)
>>>     ... 14 more
>>>
>>> I'm running a Flink cluster built from the "release-1.2" branch on GitHub.
>>>
>>> How can I validate that this is a Flink bug?
>>> Where can I report it?
>>> What sort of information do I need to provide?
>>>
>>> Cheers,
>>> Giuliano Caliari
>>> --
>>> Giuliano Caliari (+55 11 984898464)
>>> +Google
>>> Twitter
>>>
>>> Master Software Engineer by Escola Politécnica da USP
>>> Bachelor in Computer Science by Instituto de Matemática e Estatística da USP
>>
>
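For anyone debugging the same symptom: one way to confirm a classloader clash is to log the defining loader on both sides of the failing cast before it happens. Below is a minimal, framework-agnostic sketch (`LoaderDiagnostics` and `describe` are illustrative names, not a Flink API):

```java
// Hypothetical diagnostic helper: compares the classloader of a runtime value
// against the classloader of the class you are about to cast it to. If the
// two loader identities differ, the cast will fail with "X cannot be cast to X".
final class LoaderDiagnostics {

    private LoaderDiagnostics() {}

    static String describe(Object value, Class<?> expected) {
        ClassLoader actual = value.getClass().getClassLoader();
        ClassLoader wanted = expected.getClassLoader();
        return String.format(
                "value class %s from loader %s; expected class %s from loader %s; same loader: %b",
                value.getClass().getName(), actual,
                expected.getName(), wanted,
                actual == wanted);
    }

    public static void main(String[] args) {
        // Trivial usage: both sides come from the bootstrap loader here,
        // so the report ends with "same loader: true".
        System.out.println(describe("hello", String.class));
    }
}
```

In a case like the one above, this could be called with the incoming record and `WowTransaction.class` just before the cast in the timestamp extractor; two different loader identities in the output would confirm the clash and are useful detail for the JIRA report.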