The point is that you don't want Flink to infer the parameter automatically, because the default depends on the file system: writing a file to the local FS uses a different default block size than writing to HDFS. Just set the parameter explicitly to the same value, e.g. 64 MB, for both reading and writing.
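A rough sketch of what that could look like, based on the code from the thread below (untested; `targetDir`, `inputDir`, `ds`, `env`, and `tInfo` are placeholders from the original snippets, and the `BLOCK_SIZE_PARAMETER_KEY` constants are assumed from `BinaryOutputFormat` / `BinaryInputFormat`, which the TypeSerializer formats extend):

```java
import org.apache.flink.api.common.io.BinaryInputFormat;
import org.apache.flink.api.common.io.BinaryOutputFormat;
import org.apache.flink.api.java.io.TypeSerializerInputFormat;
import org.apache.flink.api.java.io.TypeSerializerOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;

// The same value must be used on both sides, so the reader splits the
// files at the same block boundaries the writer used.
final long blockSize = 64L * 1024 * 1024; // 64 MB

// --- writing ---
Configuration writeConf = new Configuration();
writeConf.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize);
TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat =
    new TypeSerializerOutputFormat<>();
outputFormat.setOutputFilePath(new Path(targetDir));
outputFormat.setWriteMode(WriteMode.OVERWRITE);
outputFormat.configure(writeConf);
ds.output(outputFormat);

// --- reading ---
Configuration readConf = new Configuration();
readConf.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize);
TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat =
    new TypeSerializerInputFormat<>(tInfo);
inputFormat.setFilePath(new Path(inputDir));
inputFormat.configure(readConf);
```

That way the block size no longer depends on which file system's default happens to be picked up at read time.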
2015-05-08 18:07 GMT+02:00 Flavio Pompermaier <[email protected]>:

> If I haven't set that param in the code of the job, do you think Flink
> automatically infers that param from somewhere in the Hadoop xxx-site.xml
> files or from the Hadoop cluster?
>
> On Fri, May 8, 2015 at 6:02 PM, Fabian Hueske <[email protected]> wrote:
>
>> The value of the parameter is not important for correctness, but it must
>> be the same when writing and reading.
>> Try setting it to 64 MB.
>>
>> 2015-05-08 17:52 GMT+02:00 Flavio Pompermaier <[email protected]>:
>>
>>> How can I retrieve the right one..? If I write with a block size
>>> different from the one of HDFS, can I still read it then..?
>>>
>>> On Fri, May 8, 2015 at 5:40 PM, Fabian Hueske <[email protected]> wrote:
>>>
>>>> Have you tried to explicitly set the blocksize parameter when writing
>>>> and reading?
>>>> The default value might be different when reading from local FS and
>>>> HDFS.
>>>>
>>>> 2015-05-08 17:34 GMT+02:00 Flavio Pompermaier <[email protected]>:
>>>>
>>>>> Hi to all,
>>>>>
>>>>> I've created a dataset of Tuple2<String, byte[]> and I saved it on my
>>>>> local fs (a folder with 8 files because I run the program with
>>>>> parallelism 8) with the following code:
>>>>>
>>>>> Configuration configuration = new Configuration();
>>>>> TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat =
>>>>>     new TypeSerializerOutputFormat<>();
>>>>> outputFormat.setOutputFilePath(new Path(targetDir));
>>>>> outputFormat.setWriteMode(WriteMode.OVERWRITE);
>>>>> outputFormat.configure(configuration);
>>>>> ds.output(outputFormat);
>>>>>
>>>>> Then, if I read such a folder from the local fs everything is fine;
>>>>> however, if I read it from HDFS I get the following exception:
>>>>>
>>>>> java.io.EOFException
>>>>>     at java.io.DataInputStream.readFully(DataInputStream.java:197)
>>>>>     at java.io.DataInputStream.readFully(DataInputStream.java:169)
>>>>>     at org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readFully(InputViewDataInputStreamWrapper.java:62)
>>>>>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
>>>>>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:87)
>>>>>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:30)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>     at org.apache.flink.api.java.io.TypeSerializerInputFormat.deserialize(TypeSerializerInputFormat.java:50)
>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
>>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>>>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>>>>>
>>>>> Could you help in understanding what's going on?
>>>>>
>>>>> The code I use to read the serialized ds is:
>>>>>
>>>>> TupleTypeInfo<Tuple2<String, byte[]>> tInfo =
>>>>>     new TupleTypeInfo<Tuple2<String, byte[]>>(
>>>>>         BasicTypeInfo.STRING_TYPE_INFO,
>>>>>         PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>>>>> TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat =
>>>>>     new TypeSerializerInputFormat<>(tInfo);
>>>>> inputFormat.setFilePath(new Path(inputDir));
>>>>> inputFormat.configure(conf);
>>>>> DataSet<Tuple6<String, String, String, String, String, String>> ret =
>>>>>     env.createInput(inputFormat).flatMap(XXX);
>>>>>
>>>>> Best,
>>>>> Flavio
