[ 
https://issues.apache.org/jira/browse/FLINK-30072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-30072:
----------------------------
    Component/s: Runtime / Task
                     (was: Runtime / Coordination)

> Cannot assign instance of SerializedLambda to field 
> KeyGroupStreamPartitioner.keySelector
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-30072
>                 URL: https://issues.apache.org/jira/browse/FLINK-30072
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.16.0
>            Reporter: Nico Kruber
>            Priority: Major
>
> In application mode, if the {{usrlib}} directories of the JM and TM differ, 
> e.g. the same jars under different names, the job fails and throws this 
> cryptic exception on the JM:
> {code}
> 2022-11-17 09:55:12,968 INFO  
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Restarting 
> job.
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not 
> instantiate outputs in order.
>     at 
> org.apache.flink.streaming.api.graph.StreamConfig.getVertexNonChainedOutputs(StreamConfig.java:537)
>  ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1600)
>  ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1584)
>  ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:408)
>  ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:362)
>  ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:335)
>  ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:327)
>  ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:317)
>  ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
>     at 
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.<init>(SourceOperatorStreamTask.java:84)
>  ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
>     at 
> jdk.internal.reflect.GeneratedConstructorAccessor38.newInstance(Unknown 
> Source) ~[?:?]
>     at 
> jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown 
> Source) ~[?:?]
>     at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1589)
>  ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:714) 
> ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) 
> ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
>     at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector
>  of type org.apache.flink.api.java.functions.KeySelector in instance of 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
>     at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown 
> Source) ~[?:?]
>     at 
> java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(Unknown 
> Source) ~[?:?]
>     at java.io.ObjectStreamClass.checkObjFieldValueTypes(Unknown Source) 
> ~[?:?]
>     at java.io.ObjectInputStream.defaultCheckFieldValues(Unknown Source) 
> ~[?:?]
>     at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>     at java.util.ArrayList.readObject(Unknown Source) ~[?:?]
>     at jdk.internal.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) 
> ~[?:?]
>     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
> Source) ~[?:?]
>     at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>     at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>     at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>  ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
>     at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>  ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
>     at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>  ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
>     at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
>  ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
>     at 
> org.apache.flink.streaming.api.graph.StreamConfig.getVertexNonChainedOutputs(StreamConfig.java:533)
>  ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
>     ... 15 more
> {code}
> It seems that our normal exception reporting is not really working (maybe 
> also a problem for more common failures?). The TM logs do contain the actual 
> exception though:
> {code}
> 2022-11-17 10:11:43,551 WARN  org.apache.flink.runtime.taskmanager.Task       
>              [] - TumblingEventTimeWindows -> Sink: Print to Std. Out (1/1)#3 
> (cc8e6b0246079230c5ac1bc335c70163_c27dcf7b54ef6bfd6cff02ca8870b681_0_3) 
> switched from INITIALIZING to FAILED with failure cause: 
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load 
> user class: 
> com.immerok.cloud.examples.WindowAggregation$MetadataEnrichingWindowFunction
> ClassLoader info: URL ClassLoader:
>     file: 'usrlib/window-agg-2.jar' (missing)
> Class not resolvable through given classloader.
>     at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:397)
>     at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:162)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:681)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.ClassNotFoundException: 
> com.immerok.cloud.examples.WindowAggregation$MetadataEnrichingWindowFunction
>     at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>     at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>     at 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
>     at 
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>     at 
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
>     at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>     at 
> org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
>     at java.base/java.lang.Class.forName0(Native Method)
>     at java.base/java.lang.Class.forName(Unknown Source)
>     at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
>     at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
>     at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
>     at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
>     at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
>     at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>     at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>     at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>     at 
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
>     at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:383)
>     ... 9 more
> {code}
> h2. How to reproduce:
> # on JM: {{<flink>/usrlib/window-agg-1.jar}}
> # on TM: {{<flink>/usrlib/window-agg-2.jar}}
> then start the job in application mode



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to