Hey Guys,

I have a question about the latest Flink release, 1.4.1. I have a Flink
topology that was first written against Flink 1.2.0. Since then, we have
continuously tried to keep our libraries up to date, so we are now trying
to upgrade this topology from version 1.4.0 to 1.4.1.

To give a broader view of the topology: it reads events from Kafka and,
after some calculations, writes the output to an Elasticsearch sink. We
are using Kafka 1.0.0 and Elasticsearch 5.4.2.

After changing the library version from 1.4.0 to 1.4.1, I got some
compilation errors caused by the shaded Elasticsearch dependencies (I was
using JodaDate from the transitive dependency). After fixing the imports,
I built a new fat jar and deployed it on a Flink cluster running 1.4.1.
When I deploy the newly built fat jar, I get the following exception:

2018-03-01 17:24:45,873 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job roadrunner_matching (2c44f471297f94963df1c45785797d69) switched from state FAILING to FAILED.
java.lang.ClassNotFoundException: com.dilax.roadrunner.pcudata.matcher.topology.PCUDataMatcherTopology$2
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
        at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:521)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:745)

I checked the fat jar and I can see that the class exists there:

com/dilax/roadrunner/pcudata/matcher/topology/
com/dilax/roadrunner/pcudata/matcher/topology/PCUDataMatcherTopology$1.class
com/dilax/roadrunner/pcudata/matcher/topology/PCUDataMatcherTopology$2.class
com/dilax/roadrunner/pcudata/matcher/topology/PCUDataMatcherTopology.class
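
For anyone who wants to repeat this check on their own jar, here is a minimal sketch of how the entries can be listed programmatically. The class name JarClassCheck and the helper entriesWithPrefix are made up for illustration; only the standard java.util.jar API is used.

```java
import java.io.IOException;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the topology: lists jar entries under a prefix.
final class JarClassCheck {

    /** Returns the sorted names of all entries in the jar that start with the given prefix. */
    static List<String> entriesWithPrefix(String jarPath, String prefix) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.stream()
                    .map(JarEntry::getName)
                    .filter(name -> name.startsWith(prefix))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: JarClassCheck <path-to-jar> <entry-prefix>");
            return;
        }
        // Print every matching entry, one per line.
        entriesWithPrefix(args[0], args[1]).forEach(System.out::println);
    }
}
```

Running it with the path to the fat jar and the prefix com/dilax/roadrunner/pcudata/matcher/topology/ should print entries like the ones listed above. Note that this only confirms the class file is packaged; it says nothing about which class loader looks for it at runtime.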


Has anyone else reported a similar issue, or am I missing something? Also,
if I deploy my old jar with 1.4.0 on the same infrastructure, it works. I
literally used git diff to compare the two commits, and the only changes I
can see are the library version and the imports for JodaDate.
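
One thing I notice in the trace is that the lookup goes through sun.misc.Launcher$AppClassLoader. So the question may be which class loader is used during deserialization, not whether the class is in the jar. I am assuming here that the job classes from the fat jar are normally loaded by a separate user-code class loader; I have not verified that this is what goes wrong. To illustrate the mechanism, here is a minimal sketch of an ObjectInputStream that resolves classes against an explicitly supplied loader. It is not Flink's actual code, and the names LoaderAwareDeserializer and LoaderAwareInputStream are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

// Hypothetical illustration only; not taken from Flink.
final class LoaderAwareDeserializer {

    /** ObjectInputStream that resolves classes against the loader it is given. */
    static final class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // If 'loader' cannot see the class, this throws ClassNotFoundException
                // even when the class file is packaged in some jar on disk.
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default resolution (covers primitive types).
                return super.resolveClass(desc);
            }
        }
    }

    /** Serializes a value with standard Java serialization. */
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Deserializes a value, resolving its classes through the given loader. */
    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                 new LoaderAwareInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }
}
```

If the loader passed to deserialize is the application class loader while the class only exists in the user jar, the result would be a ClassNotFoundException of the kind shown in the trace, even though the class file is present in the jar.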

I will investigate further today and will post here if I find a solution.
Meanwhile, if anyone here has encountered a similar issue after upgrading,
please help.


Cheers, Ankit
