Hi All I am trying to run a flink scala application which reads from kafka apply some lookup transformations and then writes to kafka.
I am using Flink Version 1.12.1 I tested it in local and it works fine. But when I try to run it on cluster using native kubernetes integration I see weird errors like below. The cluster also looks fine, because I tried to run a wordcount application on the cluster and it worked fine. The exception is not clear and also the stacktrace shows the taskmanager stack trace and hence no idea where in the application the problem could be. Could this be a serialization issue? Is there a way to debug such issues and find the actual point in application code where there is a problem? ```org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate serializer. at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:216) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createStreamOutput(OperatorChain.java:664) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainOutputs(OperatorChain.java:250) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:160) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at java.lang.Thread.run(Unknown Source) [?:?] Caused by: java.io.IOException: unexpected exception type at java.io.ObjectStreamClass.throwMiscException(Unknown Source) ~[?:?] at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] ... 8 more Caused by: java.util.concurrent.ExecutionException: java.lang.ClassNotFoundException: __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$ at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3557) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2302) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2289) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown Source) ~[?:?] at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?] at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?] at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] ... 8 more Caused by: java.lang.ClassNotFoundException: __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$ at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:64) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?] at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?] at java.lang.Class.forName0(Native Method) ~[?:?] at java.lang.Class.forName(Unknown Source) ~[?:?] at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.compile(ToolBoxFactory.scala:261) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.$anonfun$compile$13(ToolBoxFactory.scala:433) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$withCompilerApi$.apply(ToolBoxFactory.scala:359) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.compile(ToolBoxFactory.scala:426) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.compileCbfInternal(TraversableSerializer.scala:230) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:220) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:216) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown Source) ~[?:?] at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?] at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?] at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?] at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT] ... 8 more```