Hi Alex,

Based on your log, the likely cause is a Hadoop version mismatch: the exception comes from mixing different Hadoop versions. I suggest you make both sides use the same Hadoop version.
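For example, if your fat jar is built with Maven, pinning the Hadoop client dependencies to the cluster's version (2.7.3, as reported by your entrypoint log) might look like the following. This is an illustrative sketch, not your actual pom; the property name is arbitrary and you would apply the same version to every org.apache.hadoop artifact you pull in:

```xml
<properties>
  <!-- Match the Hadoop version reported by the Flink cluster entrypoint -->
  <hadoop.version>2.7.3</hadoop.version>
</properties>

<dependencies>
  <!-- hadoop-aws provides S3AFileSystem; keep it in line with the cluster -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
</dependencies>
```

You can verify which Hadoop versions actually end up in the fat jar with `mvn dependency:tree -Dincludes=org.apache.hadoop`.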
You can:

1. Upgrade the Hadoop version that the Flink cluster depends on; Flink's official website provides binaries bundled with Hadoop 2.8. [1]
2. Downgrade the Hadoop client dependency in your fat jar to match the Flink cluster's Hadoop version.

[1]: http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz

Thanks, vino.

2018-07-24 22:59 GMT+08:00 Alex Vinnik <[email protected]>:

> Hi Till,
>
> Thanks for responding. Below are the entrypoint logs. One thing I noticed
> is "Hadoop version: 2.7.3". My fat jar is built with the Hadoop 2.8 client.
> Could that be the reason for the error? If so, how can I use the same
> Hadoop version 2.8 on the Flink server side? BTW, the job runs fine
> locally, reading from the same s3a buckets, when executed using
> createLocalEnvironment via java -jar my-fat.jar --input s3a://foo --output s3a://bar
>
> Regarding the Java version: the job is submitted via the Flink UI, so that
> should not be a problem.
>
> Thanks a lot in advance.
>
> 2018-07-24T12:09:38.083+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --------------------------------------------------------------------------------
> 2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
> 2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO OS current user: flink
> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Current Hadoop/Kerberos user: flink
> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
> 2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Maximum heap size: 1963 MiBytes
> 2018-07-24T12:09:38.844+0000
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JAVA_HOME: /docker-java-home/jre
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Hadoop version: 2.7.3
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM Options:
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xms2048m
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xmx2048m
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Dcom.amazonaws.sdk.disableCertChecking
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> 2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Program Arguments:
> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --configDir
> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO /opt/flink/conf
> 2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --executionMode
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --host
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Classpath: /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
> 2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --------------------------------------------------------------------------------
> 2018-07-24T12:09:38.854+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Registered UNIX signal handlers for [TERM, HUP, INT]
> 2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting StandaloneSessionClusterEntrypoint.
> 2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install default filesystem.
> 2018-07-24T12:09:38.927+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install security context.
> 2018-07-24T12:09:39.034+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Initializing cluster services.
> 2018-07-24T12:09:39.059+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Trying to start actor system at flink-jobmanager:6123
> 2018-07-24T12:09:40.335+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Actor system started at akka.tcp://flink@flink-jobmanager:6123
>
> On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <[email protected]> wrote:
>
>> Hi Alex,
>>
>> I'm not entirely sure what causes this problem, because it is the first
>> time I have seen it.
>>
>> The first question would be whether the problem also arises with a
>> different Hadoop version.
>>
>> Are you using the same Java version on the client as on the server?
>>
>> Could you provide us with the cluster entrypoint logs?
>>
>> Cheers,
>> Till
>>
>> On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <[email protected]> wrote:
>>
>>> I'm trying to migrate an existing job that runs fine on Flink 1.3.1 to
>>> Flink 1.5, and I'm getting a weird exception.
>>>
>>> The job reads JSON from s3a and writes Parquet files to s3a using an
>>> Avro model. The job is an uber jar built with hadoop-aws-2.8.0 in order
>>> to have access to the S3AFileSystem class.
>>>
>>> It fails here:
>>> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
>>> with
>>> Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
>>>
>>> To be exact, it fails right on this line:
>>> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>>>
>>> Not sure how to resolve this problem.
>>> Looking for advice. Let me know if more info is needed. The full stack
>>> trace is below. Thanks.
>>>
>>> org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
>>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
>>>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
>>>     at akka.dispatch.OnComplete.internal(Future.scala:258)
>>>     at akka.dispatch.OnComplete.internal(Future.scala:256)
>>>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>     at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
>>>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>>     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
>>>     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
>>>     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
>>>     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>>     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>     at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>     at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
>>>     at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
>>>     at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
>>>     at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
>>>     at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
>>>     ... 29 more
>>> Caused by: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
>>>     at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>     ... 4 more
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
>>>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
>>>     at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
>>>     at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
>>>     at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
>>>     at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
>>>     at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
>>>     ... 21 more
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)': Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
>>>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
>>>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
>>>     at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
>>>     at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
>>>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
>>>     at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
>>>     ... 26 more
>>> Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
>>>     at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
>>>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
>>>     ... 31 more
>>> Caused by: java.lang.IllegalStateException: unread block data
>>>     at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
>>>     at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>>>     at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
>>>     ... 32 more
