Hi to all, I;m trying to read a file serialized with kryo but I get this exception (due to the fact that the createInputSplits creates 8 inputsplits, where just one is not empty..).
Caused by: java.io.IOException: Invalid argument at sun.nio.ch.FileChannelImpl.position0(Native Method) at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285) at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57) at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257) at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46) at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) ----------------------------------------------- My program is basically the following: public static void main(String[] args) throws Exception { ... //try-with-resources used to autoclose resources try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) { //serialise object Kryo kryo=new Kryo(); kryo.writeClassAndObject(output, myObj); } catch (FileNotFoundException ex) { LOG.error(ex.getMessage(), ex); } //deserialise object myObj=null; try (Input input = new Input( new FileInputStream("/tmp/KryoTest.ser"))){ Kryo kryo=new Kryo(); myObj =(MyClass)kryo.readClassAndObject(input); } catch (FileNotFoundException ex) { LOG.error(ex.getMessage(), ex); } final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class); Configuration configuration = new Configuration(); configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64*1024*1024); TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class); final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo); inputFormat.setFilePath("file:/tmp/KryoTest.ser"); inputFormat.configure(configuration); DataSet<MyClass> ds = env.createInput(inputFormat); ds.print(); } private static final class MyClassSerializer extends Serializer<MyClass> { @Override public void write(Kryo kryo, Output output, MyClass object) { kryo.writeClassAndObject(output, object); } @Override public MyClass read(Kryo kryo, Input input, Class<MyClass> type) { return (MyClass) kryo.readClassAndObject(input); } } Am I doing something wrong? Best, Flavio