The value of the parameter is not important for correctness, but it must be the same when writing and reading. Try setting it to 64 MB.
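A minimal, self-contained sketch of the invariant described above — the same block size must be set on the write side and the read side. The key names `output.block_size` and `input.block_size` are assumptions based on the `BLOCK_SIZE_PARAMETER_KEY` constants of Flink's `BinaryOutputFormat` and `BinaryInputFormat` (which `TypeSerializerOutputFormat`/`TypeSerializerInputFormat` extend); in a real job you would set them via `configuration.setLong(...)` on Flink's `Configuration` before calling `configure()`, so verify the exact constants against your Flink version:

```java
import java.util.HashMap;
import java.util.Map;

public class BlockSizeSketch {
    public static void main(String[] args) {
        // 64 MB, as suggested; the exact value does not matter for
        // correctness, only that both sides agree on it.
        final long blockSize = 64L * 1024 * 1024;

        // Stand-in for Flink's Configuration: the same value goes in under
        // the write-side and the read-side key before configure() is called.
        Map<String, Long> writeConf = new HashMap<>();
        writeConf.put("output.block_size", blockSize); // assumed key (BinaryOutputFormat)

        Map<String, Long> readConf = new HashMap<>();
        readConf.put("input.block_size", blockSize);   // assumed key (BinaryInputFormat)

        // The invariant: mismatched block sizes misalign the block
        // boundaries the reader expects, which surfaces as an EOFException
        // during deserialization.
        if (!writeConf.get("output.block_size").equals(readConf.get("input.block_size"))) {
            throw new IllegalStateException("write and read block sizes differ");
        }
        System.out.println(blockSize);
    }
}
```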
2015-05-08 17:52 GMT+02:00 Flavio Pompermaier <[email protected]>:

> How can I retrieve the right one? If I write with a block size different
> from the one of HDFS, can I still read it then?
>
> On Fri, May 8, 2015 at 5:40 PM, Fabian Hueske <[email protected]> wrote:
>
>> Have you tried to explicitly set the blocksize parameter when writing
>> and reading?
>> The default value might be different when reading from the local FS and
>> from HDFS.
>>
>> 2015-05-08 17:34 GMT+02:00 Flavio Pompermaier <[email protected]>:
>>
>>> Hi to all,
>>>
>>> I've created a dataset of Tuple2<String, byte[]> and saved it on my
>>> local FS (a folder with 8 files because I run the program with
>>> parallelism 8) with the following code:
>>>
>>> Configuration configuration = new Configuration();
>>> TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat =
>>>     new TypeSerializerOutputFormat<>();
>>> outputFormat.setOutputFilePath(new Path(targetDir));
>>> outputFormat.setWriteMode(WriteMode.OVERWRITE);
>>> outputFormat.configure(configuration);
>>> ds.output(outputFormat);
>>>
>>> Reading that folder back from the local FS works fine, but reading it
>>> from HDFS fails with the following exception:
>>>
>>> java.io.EOFException
>>>   at java.io.DataInputStream.readFully(DataInputStream.java:197)
>>>   at java.io.DataInputStream.readFully(DataInputStream.java:169)
>>>   at org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readFully(InputViewDataInputStreamWrapper.java:62)
>>>   at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
>>>   at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:87)
>>>   at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:30)
>>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>   at org.apache.flink.api.java.io.TypeSerializerInputFormat.deserialize(TypeSerializerInputFormat.java:50)
>>>   at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
>>>   at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>>>   at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>>>
>>> Could you help me understand what's going on?
>>>
>>> The code I use to read the serialized dataset is:
>>>
>>> TupleTypeInfo<Tuple2<String, byte[]>> tInfo =
>>>     new TupleTypeInfo<Tuple2<String, byte[]>>(
>>>         BasicTypeInfo.STRING_TYPE_INFO,
>>>         PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>>> TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat =
>>>     new TypeSerializerInputFormat<>(tInfo);
>>> inputFormat.setFilePath(new Path(inputDir));
>>> inputFormat.configure(conf);
>>> DataSet<Tuple6<String, String, String, String, String, String>> ret =
>>>     env.createInput(inputFormat).flatMap(XXX);
>>>
>>> Best,
>>> Flavio
