Hi!

This does look like a class-loading issue: "RpcEngine" and
"ProtobufRpcEngine" appear to be loaded via different classloaders.
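
To check this on your side, a small helper along these lines can print which
classloader and code location a class came from. This is only a sketch:
LoaderInfo, describe and sameLoader are names I made up, and main uses JDK
classes so that it runs without Hadoop on the classpath. In the job you would
pass org.apache.hadoop.ipc.RpcEngine.class and the class of the engine
instance instead.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper, not part of Flink or Hadoop.
final class LoaderInfo {

    // Describes which classloader defined the class and where it was loaded from.
    static String describe(Class<?> type) {
        ClassLoader loader = type.getClassLoader();
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return type.getName()
            + " loader=" + (loader == null ? "bootstrap" : loader.toString())
            + " source=" + (source == null ? "n/a" : String.valueOf(source.getLocation()));
    }

    // True if both classes were defined by the same classloader instance.
    static boolean sameLoader(Class<?> a, Class<?> b) {
        return a.getClassLoader() == b.getClassLoader();
    }

    public static void main(String[] args) {
        // JDK classes are used here only so the sketch is self-contained.
        System.out.println(describe(String.class));
        System.out.println(describe(LoaderInfo.class));
        System.out.println(sameLoader(String.class, Integer.class));
    }
}
```

If the interface and the implementation report different loaders (for example
one from the Flink lib directory and one from the job JAR), that would match
the ClassCastException in the stack trace.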

Can you try the following:

  - In your flink-conf.yaml, set classloader.resolve-order: parent-first

If that fixes the issue, then we can look at a way to make this seamless...
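
For background, here is a toy illustration of why the resolve order can
matter. It is not Flink's actual classloader: ResolveOrderDemo, Engine,
EngineImpl and ToyLoader are hypothetical stand-ins (Engine for an interface
such as RpcEngine, EngineImpl for an implementation such as
ProtobufRpcEngine). It also assumes the compiled .class files are readable as
classpath resources, which is the case for a normal javac/java run.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

final class ResolveOrderDemo {

    /** Stand-in for an interface such as RpcEngine (hypothetical). */
    public interface Engine {}

    /** Stand-in for an implementation such as ProtobufRpcEngine (hypothetical). */
    public static class EngineImpl implements Engine {
        public EngineImpl() {}
    }

    /** Toy loader: child-first defines the demo classes itself, parent-first delegates. */
    static final class ToyLoader extends ClassLoader {
        private final boolean parentFirst;

        ToyLoader(ClassLoader parent, boolean parentFirst) {
            super(parent);
            this.parentFirst = parentFirst;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            boolean demoClass = name.startsWith(ResolveOrderDemo.class.getName() + "$Engine");
            if (parentFirst || !demoClass) {
                return super.loadClass(name, resolve); // normal parent delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    byte[] bytes = readClassBytes(name);
                    c = defineClass(name, bytes, 0, bytes.length); // second copy of the class
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String path = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(path)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** True if an EngineImpl loaded through the toy loader can be cast to the caller's Engine. */
    static boolean castSucceeds(boolean parentFirst) {
        ClassLoader loader = new ToyLoader(ResolveOrderDemo.class.getClassLoader(), parentFirst);
        try {
            Object impl = loader.loadClass(EngineImpl.class.getName())
                .getDeclaredConstructor().newInstance();
            Engine engine = (Engine) impl; // fails if Engine was defined twice
            return engine != null;
        } catch (ClassCastException e) {
            return false;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("child-first cast ok: " + castSucceeds(false));
        System.out.println("parent-first cast ok: " + castSucceeds(true));
    }
}
```

In the child-first case the toy loader defines its own copy of Engine, so the
cast to the caller's Engine fails; with parent-first both sides share the
single copy from the parent and the cast goes through.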

On Wed, Jan 3, 2018 at 8:31 PM, Kyle Hamlin <hamlin...@gmail.com> wrote:

> Hello,
>
> After moving to Flink 1.4.0 I'm getting the following error. I can't find
> anything online that addresses it. Is it a Hadoop dependency issue? Here
> are my project dependencies:
>
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>   "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
>   "org.apache.flink" % "flink-metrics-core" % flinkVersion,
>   "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
>   "org.apache.kafka" %% "kafka" % "0.10.0.1",
>   "org.apache.avro" % "avro" % "1.7.7",
>   "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
>   "org.apache.parquet" % "parquet-avro" % "1.8.1",
>   "io.confluent" % "kafka-avro-serializer" % "3.2.0",
>   "org.apache.hadoop" % "hadoop-common" % "3.0.0"
> )
>
> *Stacktrace:*
> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8082
> Starting execution of program
> Submitting job with JobID: b6ed965410dad61f96f8dec73b614a9f. Waiting for job completion.
> Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
> 01/03/2018 14:20:52 Job execution switched to status RUNNING.
> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to RUNNING
> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to FAILED
> java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
> at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
> ... 9 more
> Caused by: java.lang.ClassCastException: org.apache.hadoop.ipc.ProtobufRpcEngine cannot be cast to org.apache.hadoop.ipc.RpcEngine
> at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:207)
> at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
> at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
> at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
> at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
> at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
> at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
> ... 13 more
>
