How can I retrieve the right one? If I write with a block size different
from the HDFS one, can I still read it afterwards?
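
To make the block-size question concrete, below is a toy model written with
the JDK only. It is a sketch under stated assumptions, not Flink's actual
on-disk layout: the class BlockSizeSketch, its methods, and the format
(length-prefixed records, a 4-byte record count at the end of every block)
are all hypothetical. It only illustrates why a reader that assumes a
different block size than the writer looks for block metadata in the wrong
place. In Flink itself, as far as I know, the block size is picked up in
configure() from the keys BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY and
BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, so setting both to the same value
is what "explicitly set the blocksize" would amount to.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy block format (hypothetical, NOT Flink's real layout): every block is
// blockSize bytes long, records are stored as [int length][bytes], and the
// last 4 bytes of a block hold the number of records in that block.
class BlockSizeSketch {
    static final int FOOTER = 4;

    static byte[] write(List<byte[]> records, int blockSize) throws IOException {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        ByteArrayOutputStream block = new ByteArrayOutputStream();
        DataOutputStream blockOut = new DataOutputStream(block);
        int count = 0;
        for (byte[] record : records) {
            int needed = 4 + record.length;
            if (needed > blockSize - FOOTER) {
                throw new IOException("record does not fit into one block");
            }
            // Records never span blocks: start a new block when this one is full.
            if (block.size() + needed > blockSize - FOOTER) {
                closeBlock(file, block, count, blockSize);
                count = 0;
            }
            blockOut.writeInt(record.length);
            blockOut.write(record);
            count++;
        }
        if (count > 0) {
            closeBlock(file, block, count, blockSize);
        }
        return file.toByteArray();
    }

    // Pads the block payload with zeros and appends the record-count footer.
    private static void closeBlock(ByteArrayOutputStream file, ByteArrayOutputStream block,
                                   int count, int blockSize) throws IOException {
        file.write(Arrays.copyOf(block.toByteArray(), blockSize - FOOTER));
        new DataOutputStream(file).writeInt(count);
        block.reset();
    }

    static List<byte[]> read(byte[] file, int blockSize) throws IOException {
        List<byte[]> out = new ArrayList<>();
        for (int start = 0; start < file.length; start += blockSize) {
            int end = Math.min(start + blockSize, file.length);
            if (end - start < FOOTER) {
                throw new EOFException("truncated block");
            }
            // The reader trusts its own block size to locate the footer.
            int count = new DataInputStream(
                    new ByteArrayInputStream(file, end - FOOTER, FOOTER)).readInt();
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(file, start, end - FOOTER - start));
            for (int i = 0; i < count; i++) {
                int len = in.readInt();
                if (len < 0 || len > in.available()) {
                    throw new EOFException("record runs past the end of the block");
                }
                byte[] record = new byte[len];
                in.readFully(record);
                out.add(record);
            }
        }
        return out;
    }

    // Three 20-byte records, each filled with the value 7.
    static List<byte[]> sampleRecords() {
        List<byte[]> records = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            byte[] r = new byte[20];
            Arrays.fill(r, (byte) 7);
            records.add(r);
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        byte[] file = write(sampleRecords(), 64);
        System.out.println("same block size: " + read(file, 64).size() + " records");
        System.out.println("larger block size: " + read(file, 128).size() + " records");
        try {
            read(file, 32);
        } catch (EOFException e) {
            System.out.println("smaller block size: " + e);
        }
    }
}
```

In this toy model, reading with the block size used for writing returns all
three records. Reading with a larger block size silently drops records, and
reading with a smaller one interprets record bytes as a footer and fails
with an EOFException. If the real format behaves similarly, the data stays
readable as long as the reader is configured with the block size the writer
used, regardless of the file system's own default.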

On Fri, May 8, 2015 at 5:40 PM, Fabian Hueske <[email protected]> wrote:

> Have you tried to explicitly set the blocksize parameter when writing and
> reading?
> The default value might be different when reading from local FS and HDFS.
>
> 2015-05-08 17:34 GMT+02:00 Flavio Pompermaier <[email protected]>:
>
>> Hi to all,
>>
>> I've created a dataset of Tuple2<String,byte[]> and I saved it on my
>> local fs (a folder with 8 files because I run the program with parallelism
>> 8) with the following code:
>>
>> Configuration configuration = new Configuration();
>> TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat = new
>> TypeSerializerOutputFormat<>();
>> outputFormat.setOutputFilePath(new Path(targetDir));
>> outputFormat.setWriteMode(WriteMode.OVERWRITE);
>> outputFormat.configure(configuration);
>> ds.output(outputFormat);
>>
>> Then, if I read such a folder from the local fs everything is fine,
>> otherwise if I read it from HDFS I get the following exception:
>>
>> java.io.EOFException
>> at java.io.DataInputStream.readFully(DataInputStream.java:197)
>> at java.io.DataInputStream.readFully(DataInputStream.java:169)
>> at org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readFully(InputViewDataInputStreamWrapper.java:62)
>> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
>> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:87)
>> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:30)
>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at org.apache.flink.api.java.io.TypeSerializerInputFormat.deserialize(TypeSerializerInputFormat.java:50)
>> at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>> at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>>
>> Could you help in understanding what's going on?
>>
>> The code I use to read the serialized ds is:
>>
>> TupleTypeInfo<Tuple2<String, byte[]>> tInfo = new
>> TupleTypeInfo<Tuple2<String, byte[]>>(
>> BasicTypeInfo.STRING_TYPE_INFO,
>> PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>> TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat = new
>> TypeSerializerInputFormat<>(tInfo);
>> inputFormat.setFilePath(new Path(inputDir));
>> inputFormat.configure(conf);
>> DataSet<Tuple6<String, String, String, String, String, String>> ret =
>> env.createInput(inputFormat).flatMap(XXX);
>>
>>
>> Best,
>> Flavio
>>
>
>
