[ https://issues.apache.org/jira/browse/FLINK-1438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14298847#comment-14298847 ]
Till Rohrmann edited comment on FLINK-1438 at 1/30/15 4:48 PM:
---------------------------------------------------------------

This is not only happening in local mode but also on cloud-11 with jobs that contain the InputSplit class information in the user code jars. The result is that Akka cannot deserialize the object and terminates the transmission with the following error:

WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://fl...@cloud-22.dima.tu-berlin.de:53627] has failed, address is now gated for [5000] ms. Reason is: [invalid type code: 00].


was (Author: till.rohrmann):
This is not only happening in local mode but also on cloud-11 with jobs which contain the InputSplit class information in the user code jars.


> ClassCastException for Custom InputSplit in local mode
> ------------------------------------------------------
>
>                 Key: FLINK-1438
>                 URL: https://issues.apache.org/jira/browse/FLINK-1438
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>   Affects Versions: 0.8
>            Reporter: Fabian Hueske
>            Priority: Minor
>
> Jobs with custom InputSplits fail with a ClassCastException such as
> {{org.apache.flink.examples.java.misc.CustomSplitTestJob$TestFileInputSplit
> cannot be cast to
> org.apache.flink.examples.java.misc.CustomSplitTestJob$TestFileInputSplit}}
> if executed on a local setup.
> This issue is probably related to different ClassLoaders being used by the
> JobManager when InputSplits are generated and by the TaskManager when they
> are handed to the InputFormat. Moving the class of the custom InputSplit
> into the {{./lib}} folder and removing it from the job's jar makes the job work.
> To reproduce the bug, run the following job on a local setup.
> {code}
> public class CustomSplitTestJob {
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         DataSet<String> x = env.createInput(new TestFileInputFormat());
>         x.print();
>         env.execute();
>     }
>
>     public static class TestFileInputFormat implements InputFormat<String, TestFileInputSplit> {
>         @Override
>         public void configure(Configuration parameters) {
>         }
>
>         @Override
>         public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
>             return null;
>         }
>
>         @Override
>         public TestFileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
>             return new TestFileInputSplit[]{new TestFileInputSplit()};
>         }
>
>         @Override
>         public InputSplitAssigner getInputSplitAssigner(TestFileInputSplit[] inputSplits) {
>             return new LocatableInputSplitAssigner(inputSplits);
>         }
>
>         @Override
>         public void open(TestFileInputSplit split) throws IOException {
>         }
>
>         @Override
>         public boolean reachedEnd() throws IOException {
>             return false;
>         }
>
>         @Override
>         public String nextRecord(String reuse) throws IOException {
>             return null;
>         }
>
>         @Override
>         public void close() throws IOException {
>         }
>     }
>
>     public static class TestFileInputSplit extends FileInputSplit {
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
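
For illustration only, and not taken from Flink: the "X cannot be cast to X" message in the description is what the JVM reports when the same class file is defined by two different class loaders. The following is a minimal sketch of that effect in plain Java. {{DemoSplit}} and {{IsolatingLoader}} are hypothetical names that stand in for a custom InputSplit and for a user code class loader; the sketch assumes the compiled class file is readable as a resource from the application class path.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderCastDemo {

    /** Hypothetical stand-in for a custom InputSplit shipped in a user code jar. */
    public static class DemoSplit {
    }

    /** Hypothetical loader that defines one class itself instead of asking its parent. */
    static class IsolatingLoader extends ClassLoader {
        private final String isolatedName;

        IsolatingLoader(ClassLoader parent, String isolatedName) {
            super(parent);
            this.isolatedName = isolatedName;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(isolatedName)) {
                return super.loadClass(name, resolve); // normal parent-first delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    byte[] bytes = readClassBytes(name);
                    c = defineClass(name, bytes, 0, bytes.length);
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        /** Reads the class file bytes through the parent loader's resource lookup. */
        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String resource = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(resource)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Creates a DemoSplit instance whose class was defined by a fresh IsolatingLoader. */
    public static Object newIsolatedSplit() {
        try {
            String name = DemoSplit.class.getName();
            ClassLoader parent = ClassLoaderCastDemo.class.getClassLoader();
            Class<?> c = new IsolatingLoader(parent, name).loadClass(name);
            return c.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns the ClassCastException message, or null if the cast succeeds. */
    public static String tryCast(Class<?> target, Object value) {
        try {
            target.cast(value);
            return null;
        } catch (ClassCastException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        Object fromLoaderA = newIsolatedSplit();
        Object fromLoaderB = newIsolatedSplit();
        // Same fully qualified name on both sides ...
        System.out.println(fromLoaderA.getClass().getName().equals(fromLoaderB.getClass().getName()));
        // ... but two distinct Class objects, so the cast is rejected.
        System.out.println(fromLoaderA.getClass() == fromLoaderB.getClass());
        System.out.println(tryCast(fromLoaderA.getClass(), fromLoaderB));
    }
}
```

In this sketch both objects report the same class name, yet the cast fails because each loader defined its own copy of the class. This is consistent with the workaround in the description: once the class is only available from {{./lib}}, a single loader defines it for both sides.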