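Have you tried explicitly setting the block size parameter both when writing and when reading? The default value may differ between the local FS and HDFS. As far as I know, when it is not set, the binary formats fall back to the default block size of the file system they run on. A file written with the local FS default and read with the HDFS default would then be cut into blocks at different offsets than the ones it was written with.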
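A minimal sketch of what I mean, reusing the formats from your code. It assumes the 0.9-era API, where BinaryOutputFormat and BinaryInputFormat read their block size from their respective BLOCK_SIZE_PARAMETER_KEY constants (two distinct keys, so one Configuration can carry both). The class and helper names (FixedBlockSize, blockSizeConfig, writer, reader) are hypothetical, and the 64 MB value is arbitrary; what matters is that the writer and the reader agree.

```java
import org.apache.flink.api.common.io.BinaryInputFormat;
import org.apache.flink.api.common.io.BinaryOutputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.java.io.TypeSerializerInputFormat;
import org.apache.flink.api.java.io.TypeSerializerOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;

// Hypothetical helper class: pins the same block size on the writer and the reader.
public class FixedBlockSize {

    // Arbitrary example value (64 MB); any positive value works if both sides use it.
    static final long BLOCK_SIZE = 64L * 1024 * 1024;

    // One Configuration serves both sides, because the output and input
    // formats look up their block size under different keys.
    static Configuration blockSizeConfig(long blockSize) {
        Configuration conf = new Configuration();
        conf.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize);
        conf.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize);
        return conf;
    }

    // Same writer as in your code, but configured with the explicit block size.
    static TypeSerializerOutputFormat<Tuple2<String, byte[]>> writer(String targetDir, Configuration conf) {
        TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat = new TypeSerializerOutputFormat<>();
        outputFormat.setOutputFilePath(new Path(targetDir));
        outputFormat.setWriteMode(WriteMode.OVERWRITE);
        outputFormat.configure(conf); // reads the output block size from conf
        return outputFormat;
    }

    // Same reader as in your code, configured with the matching block size.
    static TypeSerializerInputFormat<Tuple2<String, byte[]>> reader(String inputDir, Configuration conf) {
        TupleTypeInfo<Tuple2<String, byte[]>> tInfo = new TupleTypeInfo<Tuple2<String, byte[]>>(
                BasicTypeInfo.STRING_TYPE_INFO,
                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
        TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat = new TypeSerializerInputFormat<>(tInfo);
        inputFormat.setFilePath(new Path(inputDir));
        inputFormat.configure(conf); // reads the input block size from conf
        return inputFormat;
    }
}
```

You would then call ds.output(FixedBlockSize.writer(targetDir, conf)) when writing and env.createInput(FixedBlockSize.reader(inputDir, conf)) when reading. Files that were already written with the default would probably need to be rewritten, or read with a block size matching whatever the writer actually used.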
2015-05-08 17:34 GMT+02:00 Flavio Pompermaier:

> Hi to all,
>
> I've created a dataset of Tuple2<String,byte[]> and I saved it on my local
> fs (a folder with 8 files because I run the program with parallelism 8)
> with the following code:
>
> Configuration configuration = new Configuration();
> TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat = new TypeSerializerOutputFormat<>();
> outputFormat.setOutputFilePath(new Path(targetDir));
> outputFormat.setWriteMode(WriteMode.OVERWRITE);
> outputFormat.configure(configuration);
> ds.output(outputFormat);
>
> Then, if I read such a folder from the local fs everything is fine,
> otherwise if I read it from HDFS I get the following exception:
>
> java.io.EOFException
>     at java.io.DataInputStream.readFully(DataInputStream.java:197)
>     at java.io.DataInputStream.readFully(DataInputStream.java:169)
>     at org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readFully(InputViewDataInputStreamWrapper.java:62)
>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:87)
>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:30)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>     at org.apache.flink.api.java.io.TypeSerializerInputFormat.deserialize(TypeSerializerInputFormat.java:50)
>     at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>
> Could you help in understanding what's going on?
>
> The code I use to read the serialized ds is:
>
> TupleTypeInfo<Tuple2<String, byte[]>> tInfo = new TupleTypeInfo<Tuple2<String, byte[]>>(
>         BasicTypeInfo.STRING_TYPE_INFO,
>         PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
> TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat = new TypeSerializerInputFormat<>(tInfo);
> inputFormat.setFilePath(new Path(inputDir));
> inputFormat.configure(conf);
> DataSet<Tuple6<String, String, String, String, String, String>> ret =
>         env.createInput(inputFormat).flatMap(XXX);
>
>
> Best,
> Flavio
>
