How can I retrieve the right one? If I write with a block size different from the one used by HDFS, can I still read the data afterwards?
On Fri, May 8, 2015 at 5:40 PM, Fabian Hueske <[email protected]> wrote:

> Have you tried to explicitly set the blocksize parameter when writing and
> reading?
> The default value might be different when reading from local FS and HDFS.
>
> 2015-05-08 17:34 GMT+02:00 Flavio Pompermaier <[email protected]>:
>
>> Hi to all,
>>
>> I've created a dataset of Tuple2<String,byte[]> and I saved it on my
>> local fs (a folder with 8 files because I run the program with parallelism
>> 8) with the following code:
>>
>> Configuration configuration = new Configuration();
>> TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat = new TypeSerializerOutputFormat<>();
>> outputFormat.setOutputFilePath(new Path(targetDir));
>> outputFormat.setWriteMode(WriteMode.OVERWRITE);
>> outputFormat.configure(configuration);
>> ds.output(outputFormat);
>>
>> Then, if I read such a folder from the local fs everything is fine,
>> otherwise if I read it from HDFS I get the following exception:
>>
>> java.io.EOFException
>>     at java.io.DataInputStream.readFully(DataInputStream.java:197)
>>     at java.io.DataInputStream.readFully(DataInputStream.java:169)
>>     at org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readFully(InputViewDataInputStreamWrapper.java:62)
>>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
>>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:87)
>>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:30)
>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>     at org.apache.flink.api.java.io.TypeSerializerInputFormat.deserialize(TypeSerializerInputFormat.java:50)
>>     at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>>
>> Could you help in understanding what's going on?
>>
>> The code I use to read the serialized ds is:
>>
>> TupleTypeInfo<Tuple2<String, byte[]>> tInfo = new TupleTypeInfo<Tuple2<String, byte[]>>(
>>     BasicTypeInfo.STRING_TYPE_INFO,
>>     PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>> TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat = new TypeSerializerInputFormat<>(tInfo);
>> inputFormat.setFilePath(new Path(inputDir));
>> inputFormat.configure(conf);
>> DataSet<Tuple6<String, String, String, String, String, String>> ret =
>>     env.createInput(inputFormat).flatMap(XXX);
>>
>> Best,
>> Flavio
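To illustrate why a block size mismatch between writer and reader can surface as an EOFException like the one in the trace above, here is a minimal, self-contained sketch. It uses a deliberately simplified toy format (length-prefixed records, with the record count stored in the last four bytes of each block) and is not Flink's actual on-disk layout. The class BlockSizeDemo and its write/read methods are hypothetical names made up for this example only.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy block-structured format for illustration; NOT Flink's real file layout. */
final class BlockSizeDemo {

    /** Writes length-prefixed records into fixed-size blocks; the last 4 bytes of each block hold its record count. */
    static byte[] write(List<String> records, int blockSize) {
        int payload = blockSize - Integer.BYTES;           // bytes available for records per block
        List<ByteBuffer> blocks = new ArrayList<>();
        ByteBuffer block = ByteBuffer.allocate(blockSize); // zero-filled, so padding is all zeros
        int count = 0;
        for (String record : records) {
            byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
            int needed = Integer.BYTES + bytes.length;
            if (needed > payload) {
                throw new IllegalArgumentException("record larger than a block");
            }
            if (block.position() + needed > payload) {     // block is full: seal it, start a new one
                block.putInt(payload, count);
                blocks.add(block);
                block = ByteBuffer.allocate(blockSize);
                count = 0;
            }
            block.putInt(bytes.length).put(bytes);
            count++;
        }
        block.putInt(payload, count);                      // seal the final block
        blocks.add(block);

        ByteBuffer out = ByteBuffer.allocate(blocks.size() * blockSize);
        for (ByteBuffer b : blocks) {
            out.put(b.array());
        }
        return out.array();
    }

    /** Reads the file back, trusting blockSize to locate each block's record count. */
    static List<String> read(byte[] file, int blockSize) throws IOException {
        int payload = blockSize - Integer.BYTES;
        List<String> records = new ArrayList<>();
        for (int start = 0; start + blockSize <= file.length; start += blockSize) {
            // With the wrong blockSize this offset lands in the middle of record data.
            int count = ByteBuffer.wrap(file, start + payload, Integer.BYTES).getInt();
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(file, start, payload));
            for (int i = 0; i < count; i++) {
                byte[] bytes = new byte[in.readInt()];
                in.readFully(bytes);
                records.add(new String(bytes, StandardCharsets.UTF_8));
            }
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        byte[] file = write(Arrays.asList("alpha", "beta", "gamma"), 32);
        System.out.println(read(file, 32));                // reader uses the writer's block size
        try {
            read(file, 16);                                // reader assumes a different block size
        } catch (EOFException e) {
            System.out.println("mismatched block size: " + e);
        }
    }
}
```

In this toy model the reader with the wrong block size picks up record bytes as if they were the record count and then runs off the end of the block. For the real job, the approach Fabian suggests would be to put the same block size into the Configuration passed to configure() on both the output format and the input format, rather than relying on file-system defaults.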
