[ https://issues.apache.org/jira/browse/FLINK-31255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694417#comment-17694417 ]

Zhipeng Zhang commented on FLINK-31255:
---------------------------------------

It seems that the bug comes from not unwrapping the StreamConfig of the wrapped operator in OperatorUtils#createWrappedOperatorConfig.

> OperatorUtils#createWrappedOperatorConfig fails to wrap operator config
> -----------------------------------------------------------------------
>
>                 Key: FLINK-31255
>                 URL: https://issues.apache.org/jira/browse/FLINK-31255
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / Machine Learning
>    Affects Versions: ml-2.0.0, ml-2.1.0, ml-2.2.0
>            Reporter: Zhipeng Zhang
>            Priority: Major
>
> Currently we use an operator wrapper so that normal operators can be used in
> iterations. However, the operatorConfig is not correctly unwrapped. For
> example, the following code fails because the wrong type serializer is used.
>
> {code:java}
> @Test
> public void testIterationWithMapPartition() throws Exception {
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStream<Long> input =
>             env.fromParallelCollection(new NumberSequenceIterator(0L, 5L), Types.LONG);
>     DataStreamList result =
>             Iterations.iterateBoundedStreamsUntilTermination(
>                     DataStreamList.of(input),
>                     ReplayableDataStreamList.notReplay(input),
>                     IterationConfig.newBuilder()
>                             .setOperatorLifeCycle(OperatorLifeCycle.PER_ROUND)
>                             .build(),
>                     new IterationBodyWithMapPartition());
>     List<Integer> counts =
>             IteratorUtils.toList(result.get(0).executeAndCollect());
>     System.out.println(counts.size());
> }
>
> private static class IterationBodyWithMapPartition implements IterationBody {
>     @Override
>     public IterationBodyResult process(
>             DataStreamList variableStreams, DataStreamList dataStreams) {
>         DataStream<Long> input = variableStreams.get(0);
>         DataStream<Long> mapPartitionResult =
>                 DataStreamUtils.mapPartition(
>                         input,
>                         new MapPartitionFunction<Long, Long>() {
>                             @Override
>                             public void mapPartition(
>                                     Iterable<Long> iterable, Collector<Long> collector)
>                                     throws Exception {
>                                 for (Long iter : iterable) {
>                                     collector.collect(iter);
>                                 }
>                             }
>                         });
>         DataStream<Integer> terminationCriteria =
>                 mapPartitionResult.<Long>flatMap(new TerminateOnMaxIter(2)).returns(Types.INT);
>         return new IterationBodyResult(
>                 DataStreamList.of(mapPartitionResult), variableStreams, terminationCriteria);
>     }
> }
> {code}
> The error stack is:
> Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.flink.iteration.IterationRecord
>     at org.apache.flink.iteration.typeinfo.IterationRecordSerializer.serialize(IterationRecordSerializer.java:34)
>     at org.apache.flink.iteration.datacache.nonkeyed.FileSegmentWriter.addRecord(FileSegmentWriter.java:79)
>     at org.apache.flink.iteration.datacache.nonkeyed.DataCacheWriter.addRecord(DataCacheWriter.java:107)
>     at org.apache.flink.iteration.datacache.nonkeyed.ListStateWithCache.add(ListStateWithCache.java:148)
>     at org.apache.flink.ml.common.datastream.DataStreamUtils$MapPartitionOperator.processElement(DataStreamUtils.java:445)
>     at org.apache.flink.iteration.operator.perround.OneInputPerRoundWrapperOperator.processElement(OneInputPerRoundWrapperOperator.java:69)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
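
For illustration only, the failure mode in the stack trace (a serializer for the wrapped record type being handed a raw Long by the inner operator) can be reproduced in miniature without Flink. This is a sketch under assumptions, not the Flink ML code: `WrappedRecord`, `Serializer`, `WrappedRecordSerializer`, `ElementCache` and `failsOnRawLong` are hypothetical stand-ins for `IterationRecord`, the type serializer, `IterationRecordSerializer`, the operator's cached list state, and the test respectively. It only shows why the inner operator needs the unwrapped serializer.

```java
import java.util.ArrayList;
import java.util.List;

public class WrappedSerializerSketch {

    /** Hypothetical stand-in for a record wrapped with its iteration round. */
    static final class WrappedRecord<T> {
        final int round;
        final T value;

        WrappedRecord(int round, T value) {
            this.round = round;
            this.value = value;
        }
    }

    /** Hypothetical minimal serializer interface (not Flink's TypeSerializer). */
    interface Serializer<T> {
        String serialize(T record);
    }

    static final class LongSerializer implements Serializer<Long> {
        @Override
        public String serialize(Long record) {
            return Long.toString(record);
        }
    }

    /** Serializes wrapped records and delegates the payload to an inner serializer. */
    static final class WrappedRecordSerializer<T> implements Serializer<WrappedRecord<T>> {
        private final Serializer<T> inner;

        WrappedRecordSerializer(Serializer<T> inner) {
            this.inner = inner;
        }

        /** The "unwrapped" serializer that the inner operator should be configured with. */
        Serializer<T> getInner() {
            return inner;
        }

        @Override
        public String serialize(WrappedRecord<T> record) {
            return record.round + ":" + inner.serialize(record.value);
        }
    }

    /** Hypothetical cache owned by the inner operator; it only ever sees raw elements. */
    static final class ElementCache {
        private final Serializer<Object> serializer;
        private final List<String> segments = new ArrayList<>();

        @SuppressWarnings("unchecked")
        ElementCache(Serializer<?> serializer) {
            // Unchecked on purpose: the serializer comes from configuration at runtime.
            this.serializer = (Serializer<Object>) serializer;
        }

        void add(Object element) {
            segments.add(serializer.serialize(element));
        }
    }

    /** Returns true if caching a raw Long with the given serializer throws ClassCastException. */
    static boolean failsOnRawLong(Serializer<?> configured) {
        try {
            new ElementCache(configured).add(1L);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        WrappedRecordSerializer<Long> wrapped =
                new WrappedRecordSerializer<>(new LongSerializer());
        System.out.println("wrapped serializer fails: " + failsOnRawLong(wrapped));
        System.out.println("unwrapped serializer fails: " + failsOnRawLong(wrapped.getInner()));
    }
}
```

In this sketch the wrapped serializer rejects the raw Long while the inner one accepts it, which mirrors the comment's suggestion that the config given to the wrapped operator should carry the unwrapped serializer.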