Hi Alex,

Is it possible that the data has been corrupted? Or have you confirmed that the Avro version is consistent across the different Flink versions?

Also, if you don't upgrade Flink and still use version 1.3.1, can it be recovered?

Thanks, vino.

2018-07-25 8:32 GMT+08:00 Alex Vinnik <[email protected]>:

> Vino,
>
> Upgraded Flink to Hadoop 2.8.1
>
> $ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep entrypoint | grep 'Hadoop version'
> 2018-07-25T00:19:46.142+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Hadoop version: 2.8.1
>
> but the job still fails to start.
>
> Ideas?
>
> Caused by: org.apache.flink.util.FlinkException: Failed to submit job d84cccd3bffcba1f243352a5e5ef99a9.
>     at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>     ... 4 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>     at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>     at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>     at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>     at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>     at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>     ... 21 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)': Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) failed: unread block data
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>     ... 26 more
> Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) failed: unread block data
>     at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>     ... 31 more
> Caused by: java.lang.IllegalStateException: unread block data
>     at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>     at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>     at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>     ... 32 more
>
>
> On Tue, Jul 24, 2018 at 10:32 AM vino yang <[email protected]> wrote:
>
>> Hi Alex,
>>
>> Based on your log information, the likely cause is the Hadoop version. To rule out an exception caused by mismatched Hadoop versions, I suggest you use the same Hadoop version on both sides.
>>
>> You can:
>>
>> 1. Upgrade the Hadoop version that the Flink cluster depends on. Flink's official website provides a binary bundled with Hadoop 2.8. [1]
>> 2. Downgrade the Hadoop client dependency in your fat jar to match the Hadoop version of the Flink cluster.
>>
>> [1]: http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz
>>
>> Thanks, vino.
>>
>> 2018-07-24 22:59 GMT+08:00 Alex Vinnik <[email protected]>:
>>
>>> Hi Till,
>>>
>>> Thanks for responding. Below are the entrypoint logs. One thing I noticed is "Hadoop version: 2.7.3". My fat jar is built with the Hadoop 2.8 client. Could that be the reason for the error? If so, how can I use the same Hadoop version 2.8 on the Flink server side? BTW, the job runs fine locally, reading from the same s3a buckets, when executed using createLocalEnvironment via java -jar my-fat.jar --input s3a://foo --output s3a://bar
>>>
>>> Regarding the Java version: the job is submitted via the Flink UI, so it should not be a problem.
>>>
>>> Thanks a lot in advance.
>>>
>>> 2018-07-24T12:09:38.083+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --------------------------------------------------------------------------------
>>> 2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
>>> 2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO OS current user: flink
>>> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Current Hadoop/Kerberos user: flink
>>> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
>>> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Maximum heap size: 1963 MiBytes
>>> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JAVA_HOME: /docker-java-home/jre
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Hadoop version: 2.7.3
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM Options:
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xms2048m
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xmx2048m
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Dcom.amazonaws.sdk.disableCertChecking
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
>>> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Program Arguments:
>>> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --configDir
>>> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO /opt/flink/conf
>>> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --executionMode
>>> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster
>>> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --host
>>> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster
>>> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Classpath: /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
>>> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --------------------------------------------------------------------------------
>>> 2018-07-24T12:09:38.854+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Registered UNIX signal handlers for [TERM, HUP, INT]
>>> 2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting StandaloneSessionClusterEntrypoint.
>>> 2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install default filesystem.
>>> 2018-07-24T12:09:38.927+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install security context.
>>> 2018-07-24T12:09:39.034+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Initializing cluster services.
>>> 2018-07-24T12:09:39.059+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Trying to start actor system at flink-jobmanager:6123
>>> 2018-07-24T12:09:40.335+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Actor system started at akka.tcp://flink@flink-jobmanager:6123
>>>
>>> On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <[email protected]> wrote:
>>>
>>>> Hi Alex,
>>>>
>>>> I'm not entirely sure what causes this problem because this is the first time I have seen it.
>>>>
>>>> The first question would be whether the problem also arises when using a different Hadoop version.
>>>>
>>>> Are you using the same Java version on the client as well as on the server?
>>>>
>>>> Could you provide us with the cluster entrypoint logs?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <[email protected]> wrote:
>>>>
>>>>> Trying to migrate an existing job that runs fine on Flink 1.3.1 to Flink 1.5 and getting a weird exception.
>>>>>
>>>>> The job reads JSON from s3a and writes Parquet files to s3a with an Avro model. The job is an uber jar built with hadoop-aws-2.8.0 in order to have access to the S3AFileSystem class.
>>>>>
>>>>> Fails here
>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
>>>>> with
>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
>>>>>
>>>>> To be exact, it fails right on this line:
>>>>> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>>>>>
>>>>> Not sure how to resolve this problem. Looking for advice. Let me know if more info is needed. The full stack trace is below. Thanks.
>>>>>
>>>>> org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
>>>>>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>>>>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>>>>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>>>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>>>     at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
>>>>>     at akka.dispatch.OnComplete.internal(Future.scala:258)
>>>>>     at akka.dispatch.OnComplete.internal(Future.scala:256)
>>>>>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>>>>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>>>>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>>     at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>>>>>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>>>>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>>>>     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>>>>>     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>>>>>     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>>>>>     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>>>>     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>>>>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>>>     at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>>>>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>>>>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>>>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>>>     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>>>     at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>>>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>     at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>>>>>     at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>>>>>     at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>>>>>     at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>>>>>     ... 29 more
>>>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
>>>>>     at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>>>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>     ... 4 more
>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>>>>>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>>>>>     at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>>>>>     at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>>>>>     at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>>>>>     at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>>>>>     at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>>>>>     ... 21 more
>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)': Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
>>>>>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>>>>>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>>>>>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>>>>>     ... 26 more
>>>>> Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
>>>>>     at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>>>>>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>>>>>     ... 31 more
>>>>> Caused by: java.lang.IllegalStateException: unread block data
>>>>>     at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>>>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>>>>>     at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>>>>     at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>>>>>     ... 32 more
>>>>>
>>>>>
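
One way to test the version-mismatch theory discussed in this thread is to compare, on the client and on the JobManager, which jar a suspect class is loaded from and what serialVersionUID it reports. The following is only a minimal sketch using the JDK's own reflection and serialization APIs. The class name ClasspathProbe and its helper methods are made up for illustration, and the default class list is an assumption about which classes are worth checking; none of this is part of Flink. A differing jar location or serialVersionUID between the two sides would be a hint, not proof, that the "unread block data" error comes from mismatched library versions.

```java
import java.io.ObjectStreamClass;
import java.security.CodeSource;

/** Hypothetical diagnostic helper: reports where a class comes from and its serialVersionUID. */
final class ClasspathProbe {

    /** Returns the jar or directory a class was loaded from, or a placeholder if the JVM does not expose it. */
    static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        return (src == null || src.getLocation() == null)
                ? "(bootstrap or unknown)"
                : src.getLocation().toString();
    }

    /** Returns the serialVersionUID Java serialization would use, or null if the class is not Serializable. */
    static Long serialVersionUidOf(Class<?> cls) {
        ObjectStreamClass desc = ObjectStreamClass.lookup(cls);
        return desc == null ? null : desc.getSerialVersionUID();
    }

    /** Loads the class without initializing it and summarizes it on one line. */
    static String describe(String className) {
        try {
            Class<?> cls = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return className + " | from=" + locationOf(cls)
                    + " | serialVersionUID=" + serialVersionUidOf(cls);
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " | not loadable: " + e;
        }
    }

    public static void main(String[] args) {
        // Assumed default list: the output format from the stack trace plus one Hadoop and one Avro class.
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat",
            "org.apache.hadoop.util.VersionInfo",
            "org.apache.avro.Schema"
        };
        for (String name : names) {
            System.out.println(describe(name));
        }
    }
}
```

To use it, run the class once with the fat jar on the classpath and once with the jars from the cluster's /opt/flink/lib directory on the classpath, then compare the printed lines. Classes that are missing on one side show up as "not loadable", which is itself useful information.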
