Ah, I checked the code. The BinaryInputFormat expects metadata which is written by the BinaryOutputFormat. So you cannot use the BinaryInputFormat to read a file which does not provide that metadata.
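To illustrate why such metadata is needed for parallel reads, here is a simplified, stdlib-only sketch. It is NOT Flink's actual on-disk format: the class `BlockedRecordFile`, the length-prefixed record encoding and the 8-byte footer are all hypothetical. Records are written back to back, the byte stream is cut into fixed-size blocks, and every block ends with a footer telling a reader where the first record that begins in that block starts. A reader handed a single block can then skip a fragment left over from the previous block and read only the records that start in its own block. A file written directly with Kryo has no such footers, so a block-based reader has nothing to seek to.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified block layout (NOT Flink's file format):
// records are stored as [int length][UTF-8 bytes]; each block of blockSize bytes
// ends with a footer [int payloadLength][int firstRecordStart], where
// firstRecordStart is -1 if no record begins in that block.
class BlockedRecordFile {
    static final int FOOTER_SIZE = 2 * Integer.BYTES;

    static byte[] write(List<String> records, int blockSize) {
        int payload = blockSize - FOOTER_SIZE;
        if (payload <= 0) {
            throw new IllegalArgumentException("blockSize must exceed the footer size");
        }
        // Serialize all records into one logical stream, remembering where each starts.
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        List<Integer> starts = new ArrayList<>();
        for (String record : records) {
            byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
            starts.add(stream.size());
            stream.write(ByteBuffer.allocate(Integer.BYTES).putInt(bytes.length).array(), 0, Integer.BYTES);
            stream.write(bytes, 0, bytes.length);
        }
        byte[] all = stream.toByteArray();
        int blocks = Math.max(1, (all.length + payload - 1) / payload);
        ByteBuffer file = ByteBuffer.allocate(blocks * blockSize);
        int next = 0; // index of the first record not yet assigned to a block
        for (int b = 0; b < blocks; b++) {
            int from = b * payload;
            int len = Math.min(payload, all.length - from);
            int firstStart = -1;
            if (next < starts.size() && starts.get(next) < from + payload) {
                firstStart = starts.get(next) - from;
            }
            while (next < starts.size() && starts.get(next) < from + payload) {
                next++; // skip every record that begins inside this block
            }
            file.position(b * blockSize);
            file.put(all, from, len);                 // block payload (rest stays zero padding)
            file.position(b * blockSize + payload);
            file.putInt(len).putInt(firstStart);      // block footer ("metadata")
        }
        return file.array();
    }

    // Returns the records that START in the given block. A record may continue
    // into later blocks, so the reader reads past its own block boundary.
    static List<String> readBlock(byte[] file, int blockSize, int block) {
        int payload = blockSize - FOOTER_SIZE;
        ByteBuffer in = ByteBuffer.wrap(file);
        int footer = block * blockSize + payload;
        int ownLength = in.getInt(footer);
        int firstStart = in.getInt(footer + Integer.BYTES);
        List<String> result = new ArrayList<>();
        if (firstStart < 0) {
            return result; // no record begins in this block
        }
        // Concatenate this payload with the following ones (a real reader would
        // stop as soon as its last record is complete).
        ByteArrayOutputStream tail = new ByteArrayOutputStream();
        for (int b = block; b < file.length / blockSize; b++) {
            tail.write(file, b * blockSize, in.getInt(b * blockSize + payload));
        }
        ByteBuffer data = ByteBuffer.wrap(tail.toByteArray());
        int pos = firstStart;
        while (pos < ownLength) {
            int length = data.getInt(pos);
            result.add(new String(data.array(), pos + Integer.BYTES, length, StandardCharsets.UTF_8));
            pos += Integer.BYTES + length;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("alpha", "beta", "gamma", "delta", "epsilon");
        int blockSize = 16; // deliberately tiny so records straddle block boundaries
        byte[] file = write(records, blockSize);
        // Each block could be handed to a different parallel reader.
        for (int b = 0; b < file.length / blockSize; b++) {
            System.out.println("block " + b + ": " + readBlock(file, blockSize, b));
        }
    }
}
```

With a 16-byte block size the five records are spread over six blocks; every record is returned by exactly one block reader, and the last block, which only holds the tail of "epsilon", returns nothing. The sketch keeps the metadata in a per-block footer so that a reader can locate it by seeking relative to the block end, which is also why a file that lacks it cannot be read this way.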
2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> The file containing the serialized object is 7 bytes.
>
> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> This might be an issue with the blockSize parameter of the
>> BinaryInputFormat.
>> How large is the file with the single object?
>>
>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>
>>> I also tried with
>>>
>>> DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);
>>>
>>> but I get the same error :(
>>>
>>> Moreover, in this example I put exactly one object per file, so it should
>>> be able to deserialize it, right?
>>>
>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> If you create your file by just sequentially writing all objects to the
>>>> file using Kryo, you can only read it with a parallelism of 1.
>>>> Writing binary files in a way that they can be read in parallel is a
>>>> bit tricky (and not specific to Flink).
>>>>
>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> Hi to all,
>>>>> I'm trying to read a file serialized with Kryo but I get this
>>>>> exception (due to the fact that createInputSplits creates 8
>>>>> input splits, of which just one is not empty).
>>>>>
>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>     at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>     at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>     at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> -----------------------------------------------
>>>>> My program is basically the following:
>>>>>
>>>>> public static void main(String[] args) throws Exception {
>>>>>
>>>>>     ...
>>>>>     // try-with-resources used to autoclose resources
>>>>>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>         // serialise object
>>>>>         Kryo kryo = new Kryo();
>>>>>         kryo.writeClassAndObject(output, myObj);
>>>>>     } catch (FileNotFoundException ex) {
>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>     }
>>>>>
>>>>>     // deserialise object
>>>>>     myObj = null;
>>>>>
>>>>>     try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>>>>>         Kryo kryo = new Kryo();
>>>>>         myObj = (MyClass) kryo.readClassAndObject(input);
>>>>>     } catch (FileNotFoundException ex) {
>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>     }
>>>>>
>>>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>>>>     Configuration configuration = new Configuration();
>>>>>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64 * 1024 * 1024);
>>>>>
>>>>>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>>>>     final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>>>>>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>     inputFormat.configure(configuration);
>>>>>
>>>>>     DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>     ds.print();
>>>>> }
>>>>>
>>>>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>>>>
>>>>>     @Override
>>>>>     public void write(Kryo kryo, Output output, MyClass object) {
>>>>>         kryo.writeClassAndObject(output, object);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>>         return (MyClass) kryo.readClassAndObject(input);
>>>>>     }
>>>>> }
>>>>>
>>>>> Am I doing something wrong?
>>>>>
>>>>> Best,
>>>>> Flavio