Hi guys,

I ran into a weird classpath issue while running a streaming job on a YARN cluster. I have a relatively simple flow that reads data from Kafka, does a few manipulations and then indexes the results in Elasticsearch (2.3). I use the elasticsearch2 connector (1.1-SNAPSHOT) (bleeding edge, I know).

The stream works fine on a local Flink node (1.0.2), reading from remote Kafka and writing to remote ES. However, when deployed to the remote YARN cluster (again, Flink 1.0.2) the following exception is thrown:

```
04/26/2016 10:07:30 Source: Custom Source -> Flat Map -> Sink: Unnamed(1/8) switched to FAILED
java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
    at org.elasticsearch.threadpool.ThreadPool.<clinit>(ThreadPool.java:190)
    at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

04/26/2016 10:07:30 Job execution switched to status FAILING.
java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
    at org.elasticsearch.threadpool.ThreadPool.<clinit>(ThreadPool.java:190)
    at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

04/26/2016 10:07:30 Source: Custom Source -> Flat Map -> Sink: Unnamed(7/8) switched to FAILED
java.lang.NoClassDefFoundError: Could not initialize class org.elasticsearch.threadpool.ThreadPool
    at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
```

I rebuilt the fat jar (I use sbt) many
times, and it contains no trace of the old Guava `MoreExecutors` class that lacks the `directExecutor` method the transport client needs.

`lib/flink-dist_2.11-1.0.2.jar` unfortunately contains both: the newer class coming from Guava 18 and an old one, probably introduced by some ancient Hadoop dependency. For some reason the old version takes precedence.

In Spark, I used to configure `spark.driver.userClassPathFirst true` and those problems were usually dealt with. Is there anything similar in Flink?

Any ideas?

Thanks,
Aris
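
In case it helps anyone reproduce the diagnosis: below is a minimal sketch of how one could check which copies of `MoreExecutors` a class loader can see and which copy actually gets loaded. It assumes it runs with the same classpath as the failing task (for example, by calling the helpers from the sink's `open()`). The class name `ClasspathProbe` and its helper methods are hypothetical names made up for illustration; only standard JDK calls are used.

```java
import java.io.IOException;
import java.net.URL;
import java.security.CodeSource;
import java.util.Collections;
import java.util.List;

// Hypothetical diagnostic helper, not part of Flink, Elasticsearch or Guava.
class ClasspathProbe {

    /** Converts a class name to its resource path, e.g. "a.b.C" -> "a/b/C.class". */
    static String resourcePath(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Lists every location where the loader can find the class file, in the order the loader reports them. */
    static List<URL> locations(String className, ClassLoader loader) throws IOException {
        return Collections.list(loader.getResources(resourcePath(className)));
    }

    /** Returns the jar or directory a class was loaded from, or null when no code source is recorded (e.g. JDK bootstrap classes). */
    static URL loadedFrom(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        return src == null ? null : src.getLocation();
    }

    /** True if the class exposes a public no-argument method with this name (declared or inherited). */
    static boolean hasNoArgMethod(Class<?> cls, String methodName) {
        try {
            cls.getMethod(methodName);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // Defaults match the class and method named in the stack trace; override via args.
        String className = args.length > 0 ? args[0] : "com.google.common.util.concurrent.MoreExecutors";
        String methodName = args.length > 1 ? args[1] : "directExecutor";
        ClassLoader loader = Thread.currentThread().getContextClassLoader();

        for (URL url : locations(className, loader)) {
            System.out.println("available at: " + url);
        }
        try {
            // Load without initializing, then report where the winning copy came from.
            Class<?> cls = Class.forName(className, false, loader);
            System.out.println("loaded from:  " + loadedFrom(cls));
            System.out.println("has " + methodName + "(): " + hasNoArgMethod(cls, methodName));
        } catch (ClassNotFoundException e) {
            System.out.println(className + " is not on this classpath");
        }
    }
}
```

If more than one "available at" line is printed and "loaded from" points at the Flink dist jar while the method check reports `false`, that would be consistent with the old Guava copy shadowing the one bundled in the fat jar.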