The point is that you don't want Flink to automatically infer the parameter,
because the default depends on the file system: writing to the local FS uses
a different default block size than writing to HDFS.
Just set the parameter explicitly to the same value, e.g. 64 MB, for both
reading and writing.
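To make the failure mode concrete, here is a minimal, self-contained sketch of the idea behind the block-wise binary format (this is not Flink's actual BinaryInputFormat code, just an illustration of the same principle): records are packed into fixed-size blocks and the tail of each block is zero-padded, so a reader that assumes a different block size lands in the middle of a record and eventually hits an EOFException, much like the stack trace below.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BlockSizeDemo {

    // Pack length-prefixed records into fixed-size blocks; a record never
    // crosses a block boundary, so the tail of each block is zero-padded.
    static byte[] write(List<byte[]> records, int blockSize) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        int used = 0;
        for (byte[] r : records) {
            if (used + 4 + r.length > blockSize) {
                out.write(new byte[blockSize - used]); // zero-pad to the boundary
                used = 0;
            }
            out.writeInt(r.length);
            out.write(r);
            used += 4 + r.length;
        }
        if (used > 0) {
            out.write(new byte[blockSize - used]);     // pad the last block
        }
        return bytes.toByteArray();
    }

    // Read the records back, skipping the padding at the end of each block.
    // This only works if blockSize matches the value used for writing.
    static List<byte[]> read(byte[] data, int blockSize) throws IOException {
        List<byte[]> records = new ArrayList<>();
        for (int blockStart = 0; blockStart < data.length; blockStart += blockSize) {
            int blockEnd = Math.min(blockStart + blockSize, data.length);
            int pos = blockStart;
            while (pos + 4 <= blockEnd) {
                int len = ByteBuffer.wrap(data, pos, 4).getInt();
                if (len == 0) {
                    break; // reached the zero padding of this block
                }
                if (len < 0 || pos + 4 + len > data.length) {
                    throw new EOFException("record of length " + len + " runs past the data");
                }
                records.add(Arrays.copyOfRange(data, pos + 4, pos + 4 + len));
                pos += 4 + len;
            }
        }
        return records;
    }

    // Writes sample records with one block size and reads them back with
    // another; returns true iff all records are recovered intact.
    public static boolean roundTrip(int writeBlockSize, int readBlockSize) {
        try {
            List<byte[]> records = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                records.add(String.format("record-%02d", i).getBytes());
            }
            List<byte[]> back = read(write(records, writeBlockSize), readBlockSize);
            if (back.size() != records.size()) return false;
            for (int i = 0; i < records.size(); i++) {
                if (!Arrays.equals(records.get(i), back.get(i))) return false;
            }
            return true;
        } catch (IOException e) {
            return false; // e.g. EOFException on a block-size mismatch
        }
    }

    public static void main(String[] args) {
        System.out.println("same block size:      " + roundTrip(64, 64));
        System.out.println("different block size: " + roundTrip(64, 48));
    }
}
```

In the sketch, round-tripping with matching block sizes recovers every record, while a mismatched read fails. In Flink itself the block size is, if I remember correctly, passed via the BLOCK_SIZE_PARAMETER_KEY entry of the Configuration handed to configure() on the BinaryInputFormat/BinaryOutputFormat subclasses.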

2015-05-08 18:07 GMT+02:00 Flavio Pompermaier <[email protected]>:

> If I haven't set that param in the code of the job, do you think Flink
> automatically infers that param from somewhere in the hadoop xxx-site.xml
> files or from the hadoop cluster?
>
> On Fri, May 8, 2015 at 6:02 PM, Fabian Hueske <[email protected]> wrote:
>
>> The value of the parameter is not important for correctness, but it must
>> be the same when writing and reading.
>> Try setting it to 64 MB.
>>
>>
>> 2015-05-08 17:52 GMT+02:00 Flavio Pompermaier <[email protected]>:
>>
>>> How can I retrieve the right one? If I write with a block size different
>>> from the one of HDFS, can I still read it afterwards?
>>>
>>> On Fri, May 8, 2015 at 5:40 PM, Fabian Hueske <[email protected]> wrote:
>>>
>>>> Have you tried to explicitly set the blocksize parameter when writing
>>>> and reading?
>>>> The default value might be different when reading from local FS and
>>>> HDFS.
>>>>
>>>> 2015-05-08 17:34 GMT+02:00 Flavio Pompermaier <[email protected]>:
>>>>
>>>>> Hi to all,
>>>>>
>>>>> I've created a dataset of Tuple2<String,byte[]> and I saved it on my
>>>>> local fs (a folder with 8 files because I run the program with parallelism
>>>>> 8) with the following code:
>>>>>
>>>>> Configuration configuration = new Configuration();
>>>>> TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat =
>>>>>     new TypeSerializerOutputFormat<>();
>>>>> outputFormat.setOutputFilePath(new Path(targetDir));
>>>>> outputFormat.setWriteMode(WriteMode.OVERWRITE);
>>>>> outputFormat.configure(configuration);
>>>>> ds.output(outputFormat);
>>>>>
>>>>> Then, if I read that folder back from the local fs, everything is fine;
>>>>> however, if I read it from HDFS I get the following exception:
>>>>>
>>>>> java.io.EOFException
>>>>>     at java.io.DataInputStream.readFully(DataInputStream.java:197)
>>>>>     at java.io.DataInputStream.readFully(DataInputStream.java:169)
>>>>>     at org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readFully(InputViewDataInputStreamWrapper.java:62)
>>>>>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
>>>>>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:87)
>>>>>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:30)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>     at org.apache.flink.api.java.io.TypeSerializerInputFormat.deserialize(TypeSerializerInputFormat.java:50)
>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
>>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>>>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>>>>>
>>>>> Could you help in understanding what's going on?
>>>>>
>>>>> The code I use to read the serialized ds is:
>>>>>
>>>>> TupleTypeInfo<Tuple2<String, byte[]>> tInfo =
>>>>>     new TupleTypeInfo<Tuple2<String, byte[]>>(
>>>>>         BasicTypeInfo.STRING_TYPE_INFO,
>>>>>         PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>>>>> TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat =
>>>>>     new TypeSerializerInputFormat<>(tInfo);
>>>>> inputFormat.setFilePath(new Path(inputDir));
>>>>> inputFormat.configure(conf);
>>>>> DataSet<Tuple6<String, String, String, String, String, String>> ret =
>>>>>     env.createInput(inputFormat).flatMap(XXX);
>>>>>
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>
