Hi! This does indeed look like a class-loading issue: "RpcEngine" and "ProtobufRpcEngine" appear to be loaded via different classloaders.
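
To illustrate the mechanism (this is only a minimal sketch, not Flink's or Hadoop's actual code): when an interface and its implementation are defined by different classloaders, the JVM treats them as unrelated types even though the names match, and a cast fails. The names `ClassLoaderClashDemo`, `Engine`, `ProtoEngine` and `ChildFirstLoader` below are made up for the example; `Engine` and `ProtoEngine` merely stand in for "RpcEngine" and "ProtobufRpcEngine". The sketch assumes Java 9+ (for `InputStream.readAllBytes`) and that the compiled class files are readable as resources.

```java
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderClashDemo {

    /** Hypothetical stand-in for an interface such as RpcEngine. */
    public interface Engine { }

    /** Hypothetical stand-in for an implementation such as ProtobufRpcEngine. */
    public static class ProtoEngine implements Engine {
        public ProtoEngine() { }
    }

    /** Child-first loader: defines this demo's nested classes itself instead of asking its parent. */
    static final class ChildFirstLoader extends ClassLoader {
        private static final String PREFIX = ClassLoaderClashDemo.class.getName() + "$";
        private final ClassLoader source;

        ChildFirstLoader(ClassLoader parent) {
            super(parent);
            this.source = parent;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.startsWith(PREFIX)) {
                // Everything else (java.lang.Object etc.) is delegated parent-first as usual.
                return super.loadClass(name, resolve);
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    byte[] bytes = readClassBytes(name);
                    c = defineClass(name, bytes, 0, bytes.length);
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        /** Reads the class file bytes through the parent loader's resources. */
        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String path = name.replace('.', '/') + ".class";
            try (InputStream in = source.getResourceAsStream(path)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                return in.readAllBytes();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** True if the child loader yields a class with the same name but a different identity. */
    public static boolean sameNameDifferentClass() {
        try {
            ClassLoader child = new ChildFirstLoader(ClassLoaderClashDemo.class.getClassLoader());
            Class<?> fromChild = Class.forName(Engine.class.getName(), false, child);
            return fromChild.getName().equals(Engine.class.getName()) && fromChild != Engine.class;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** True if casting the child-loaded implementation to the parent-loaded interface fails. */
    public static boolean castFails() {
        try {
            ClassLoader child = new ChildFirstLoader(ClassLoaderClashDemo.class.getClassLoader());
            Object instance = Class.forName(ProtoEngine.class.getName(), true, child)
                    .getDeclaredConstructor().newInstance();
            try {
                Engine engine = (Engine) instance; // Engine here is the parent-loaded interface
                return engine == null;             // only reached if the cast succeeds
            } catch (ClassCastException expected) {
                return true;
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("same name, different class: " + sameNameDifferentClass());
        System.out.println("cast fails: " + castFails());
    }
}
```

In the sketch, the child-loaded `ProtoEngine` implements the child-loaded `Engine`, so it is not an instance of the `Engine` seen by the calling code. A parent-first resolve order avoids this split because both classes would then come from the same loader.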

Can you try the following:

  - In your flink-conf.yaml, set classloader.resolve-order: parent-first

If that fixes the issue, then we can look at a way to make this seamless...

On Wed, Jan 3, 2018 at 8:31 PM, Kyle Hamlin <hamlin...@gmail.com> wrote:

> Hello,
>
> After moving to Flink 1.4.0 I'm getting the following error. I can't find
> anything online that addresses it. Is it a Hadoop dependency issue? Here
> are my project dependencies:
>
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>   "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
>   "org.apache.flink" % "flink-metrics-core" % flinkVersion,
>   "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
>   "org.apache.kafka" %% "kafka" % "0.10.0.1",
>   "org.apache.avro" % "avro" % "1.7.7",
>   "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
>   "org.apache.parquet" % "parquet-avro" % "1.8.1",
>   "io.confluent" % "kafka-avro-serializer" % "3.2.0",
>   "org.apache.hadoop" % "hadoop-common" % "3.0.0"
> )
>
> *Stacktrace:*
> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8082
> Starting execution of program
> Submitting job with JobID: b6ed965410dad61f96f8dec73b614a9f. Waiting for job completion.
> Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
> 01/03/2018 14:20:52 Job execution switched to status RUNNING.
> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to RUNNING
> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to FAILED
> java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>     ... 9 more
> Caused by: java.lang.ClassCastException: org.apache.hadoop.ipc.ProtobufRpcEngine cannot be cast to org.apache.hadoop.ipc.RpcEngine
>     at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:207)
>     at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
>     at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
>     at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
>     at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
>     ... 13 more