Hi to all,
I;m trying to read a file serialized with kryo but I get this exception
(due to the fact that the createInputSplits creates 8 inputsplits, where
just one is not empty..).

Caused by: java.io.IOException: Invalid argument
at sun.nio.ch.FileChannelImpl.position0(Native Method)
at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
at
org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
at
org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
at
org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
at
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

-----------------------------------------------
My program is basically the following:

public static void main(String[] args) throws Exception {

...
//try-with-resources used to autoclose resources
try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser")))
{
//serialise object
Kryo kryo=new Kryo();
kryo.writeClassAndObject(output, myObj);
} catch (FileNotFoundException ex) {
LOG.error(ex.getMessage(), ex);
}

//deserialise object

myObj=null;

try (Input input = new Input( new FileInputStream("/tmp/KryoTest.ser"))){
    Kryo kryo=new Kryo();
    myObj =(MyClass)kryo.readClassAndObject(input);
} catch (FileNotFoundException ex) {
LOG.error(ex.getMessage(), ex);
}


final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
Configuration configuration = new Configuration();
configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY,
64*1024*1024);

TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
final BinaryInputFormat<MyClass> inputFormat = new
TypeSerializerInputFormat<>(typeInfo);
inputFormat.setFilePath("file:/tmp/KryoTest.ser");
inputFormat.configure(configuration);

DataSet<MyClass> ds = env.createInput(inputFormat);
ds.print();

}

private static final class MyClassSerializer extends Serializer<MyClass> {

@Override
public void write(Kryo kryo, Output output, MyClass object) {
kryo.writeClassAndObject(output, object);
}

@Override
public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
return (MyClass) kryo.readClassAndObject(input);
}
}

Am I doing something wrong?

Best,
Flavio

Reply via email to