[ 
https://issues.apache.org/jira/browse/FLINK-1438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-1438.
---------------------------------
       Resolution: Fixed
    Fix Version/s: 0.9
         Assignee: Stephan Ewen

Fixed via a07d59d72fc059a600a3eb1f479b02964ca256f5

> ClassCastException for Custom InputSplit in local mode and invalid type code 
> in distributed mode
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-1438
>                 URL: https://issues.apache.org/jira/browse/FLINK-1438
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 0.8, 0.9
>            Reporter: Fabian Hueske
>            Assignee: Stephan Ewen
>            Priority: Minor
>             Fix For: 0.9
>
>
> Jobs with custom InputSplits fail with a ClassCastException such as 
> {{org.apache.flink.examples.java.misc.CustomSplitTestJob$TestFileInputSplit 
> cannot be cast to 
> org.apache.flink.examples.java.misc.CustomSplitTestJob$TestFileInputSplit}} 
> if executed on a local setup. 
> This issue is probably related to different ClassLoaders used by the 
> JobManager when InputSplits are generated and when they are handed to the 
> InputFormat by the TaskManager. Moving the class of the custom InputSplit 
> into the {{./lib}} folder and removing it from the job's jar makes the job work.
> To reproduce the bug, run the following job on a local setup. 
> {code}
> public class CustomSplitTestJob {
>       public static void main(String[] args) throws Exception {
>               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>               DataSet<String> x = env.createInput(new TestFileInputFormat());
>               x.print();
>               env.execute();
>       }
>       public static class TestFileInputFormat implements InputFormat<String,TestFileInputSplit> {
>               @Override
>               public void configure(Configuration parameters) {
>               }
>               @Override
>               public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
>                       return null;
>               }
>               @Override
>               public TestFileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
>                       return new TestFileInputSplit[]{new TestFileInputSplit()};
>               }
>               @Override
>               public InputSplitAssigner getInputSplitAssigner(TestFileInputSplit[] inputSplits) {
>                       return new LocatableInputSplitAssigner(inputSplits);
>               }
>               @Override
>               public void open(TestFileInputSplit split) throws IOException {
>               }
>               @Override
>               public boolean reachedEnd() throws IOException {
>                       return false;
>               }
>               @Override
>               public String nextRecord(String reuse) throws IOException {
>                       return null;
>               }
>               @Override
>               public void close() throws IOException {
>               }
>       }
>       public static class TestFileInputSplit extends FileInputSplit {
>       }
> }
> {code}
> The same happens in distributed mode, except that Akka terminates the 
> transmission of the input split with a meaningless {{invalid type code: 00}}.
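
The "X cannot be cast to X" message in the description is what the JVM reports when one class name has been defined by two different class loaders. The following is a minimal, Flink-independent sketch of that effect, not the code path touched by the fix. The names ClassLoaderCastDemo, MySplit and IsolatingLoader are hypothetical and exist only for illustration, and the sketch assumes the compiled .class files are reachable as resources on the classpath.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

class ClassLoaderCastDemo {

    /** Hypothetical stand-in for a user-defined InputSplit class. */
    public static class MySplit {
        public MySplit() {}
    }

    /** Defines one named class itself instead of delegating to its parent. */
    static class IsolatingLoader extends ClassLoader {
        private final String isolatedName;

        IsolatingLoader(ClassLoader parent, String isolatedName) {
            super(parent);
            this.isolatedName = isolatedName;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(isolatedName)) {
                return super.loadClass(name, resolve); // normal parent-first delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    byte[] bytes = readClassBytes(name);
                    c = defineClass(name, bytes, 0, bytes.length); // second definition of the same name
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        /** Reads the class file through the parent loader's resources. */
        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String resource = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(resource)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Creates a MySplit instance whose class was defined by a separate loader. */
    static Object newForeignSplit() {
        String name = MySplit.class.getName();
        ClassLoader loader = new IsolatingLoader(ClassLoaderCastDemo.class.getClassLoader(), name);
        try {
            return loader.loadClass(name).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** True if both classes share a name but are distinct Class objects. */
    static boolean sameNameDifferentClass() {
        Class<?> foreign = newForeignSplit().getClass();
        return foreign.getName().equals(MySplit.class.getName()) && foreign != MySplit.class;
    }

    /** True if casting the foreign instance to the locally loaded MySplit fails. */
    static boolean castFails() {
        Object foreign = newForeignSplit();
        try {
            MySplit split = (MySplit) foreign;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("same name, different class: " + sameNameDifferentClass());
        System.out.println("cast fails: " + castFails());
    }
}
```

This is consistent with the workaround noted in the description: once the split class lives only in {{./lib}}, there is presumably a single definition of it visible to both sides, so the cast succeeds.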



