I also tried with

    DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);

but I get the same error :(

Moreover, in this example I put exactly one object per file, so it should be
able to deserialize it, right?

On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> If you create your file by just sequentially writing all objects to the
> file using Kryo, you can only read it with a parallelism of 1.
> Writing binary files in a way that they can be read in parallel is a bit
> tricky (and not specific to Flink).
>
> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
>> Hi to all,
>> I'm trying to read a file serialized with Kryo but I get this exception
>> (due to the fact that createInputSplits creates 8 input splits, where
>> just one is not empty).
>>
>> Caused by: java.io.IOException: Invalid argument
>>     at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>     at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>     at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> -----------------------------------------------
>> My program is basically the following:
>>
>> public static void main(String[] args) throws Exception {
>>
>>     ...
>>     // try-with-resources used to auto-close resources
>>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>         // serialise object
>>         Kryo kryo = new Kryo();
>>         kryo.writeClassAndObject(output, myObj);
>>     } catch (FileNotFoundException ex) {
>>         LOG.error(ex.getMessage(), ex);
>>     }
>>
>>     // deserialise object
>>
>>     myObj = null;
>>
>>     try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>>         Kryo kryo = new Kryo();
>>         myObj = (MyClass) kryo.readClassAndObject(input);
>>     } catch (FileNotFoundException ex) {
>>         LOG.error(ex.getMessage(), ex);
>>     }
>>
>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>     Configuration configuration = new Configuration();
>>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64*1024*1024);
>>
>>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>     final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>     inputFormat.configure(configuration);
>>
>>     DataSet<MyClass> ds = env.createInput(inputFormat);
>>     ds.print();
>>
>> }
>>
>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>
>>     @Override
>>     public void write(Kryo kryo, Output output, MyClass object) {
>>         kryo.writeClassAndObject(output, object);
>>     }
>>
>>     @Override
>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>         return (MyClass) kryo.readClassAndObject(input);
>>     }
>> }
>>
>> Am I doing something wrong?
>>
>> Best,
>> Flavio
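
To illustrate Fabian's point about why a plain sequential dump cannot be read in parallel, here is a minimal sketch of a block-structured binary file written with plain JDK streams. It is not Flink's or Kryo's actual on-disk layout: the class name BlockedRecords, the 64-byte BLOCK_SIZE and the "record count header plus zero padding" layout are all assumptions made up for this example. The only idea it shows is that a reader can start at any block boundary if every block is self-describing, which a stream of back-to-back serialized objects is not.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BlockedRecords {
    // Hypothetical block size, kept tiny so the example is easy to follow.
    public static final int BLOCK_SIZE = 64;
    private static final int HEADER_BYTES = 4; // int: number of records in the block

    /** Packs length-prefixed records into fixed-size, zero-padded blocks. */
    public static byte[] writeBlocks(List<String> records) throws IOException {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        ByteArrayOutputStream block = new ByteArrayOutputStream();
        DataOutputStream blockOut = new DataOutputStream(block);
        int count = 0;
        for (String record : records) {
            byte[] payload = record.getBytes(StandardCharsets.UTF_8);
            int needed = 4 + payload.length; // length prefix + bytes
            if (HEADER_BYTES + needed > BLOCK_SIZE) {
                throw new IOException("record does not fit into a single block");
            }
            // Records never span two blocks: start a new block when this one is full.
            if (HEADER_BYTES + block.size() + needed > BLOCK_SIZE) {
                flushBlock(file, block, count);
                count = 0;
            }
            blockOut.writeInt(payload.length);
            blockOut.write(payload);
            count++;
        }
        if (count > 0) {
            flushBlock(file, block, count);
        }
        return file.toByteArray();
    }

    private static void flushBlock(ByteArrayOutputStream file,
                                   ByteArrayOutputStream block,
                                   int count) throws IOException {
        DataOutputStream out = new DataOutputStream(file);
        out.writeInt(count);                                   // block header
        block.writeTo(out);                                    // records
        out.write(new byte[BLOCK_SIZE - HEADER_BYTES - block.size()]); // padding
        block.reset();
    }

    /** Reads one block without touching any other part of the file. */
    public static List<String> readBlock(byte[] file, int blockIndex) throws IOException {
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(file, blockIndex * BLOCK_SIZE, BLOCK_SIZE));
        int count = in.readInt();
        List<String> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            result.add(new String(payload, StandardCharsets.UTF_8));
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            records.add("record-" + i);
        }
        byte[] file = writeBlocks(records);
        int blocks = file.length / BLOCK_SIZE;
        // Read the blocks in reverse order to show that each one stands alone.
        for (int b = blocks - 1; b >= 0; b--) {
            System.out.println("block " + b + ": " + readBlock(file, b));
        }
    }
}
```

A file produced by kryo.writeClassAndObject alone has no such block boundaries, so a reader that seeks to the start of an arbitrary split has nothing valid to decode there. That would be consistent with the failure in BinaryInputFormat.open above, although the exact cause of the "Invalid argument" from seek is not confirmed in this thread.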