I arrived at this solution; do you think it is OK (taking into account
that my local fs block size is 4096)?

blockSize = new org.apache.hadoop.conf.Configuration()
    .getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);

This way, if I find the Hadoop config files in the resources folder I use
the block size configured there, otherwise I fall back to 4096.
Even when I run the job locally I get a consistent setting, whether the
Hadoop config files are there or not.
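To make it concrete, here is a minimal sketch of what I mean, assuming that
Flink's BinaryInputFormat/BinaryOutputFormat pick the block size up from
their BLOCK_SIZE_PARAMETER_KEY configuration keys (please double-check the
exact constants against your Flink version), with outputFormat and
inputFormat being the TypeSerializer formats from my earlier code:

```java
import org.apache.flink.api.common.io.BinaryInputFormat;
import org.apache.flink.api.common.io.BinaryOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Resolve the block size once: take it from the Hadoop config files if
// they are on the classpath, otherwise fall back to the local fs value.
long blockSize = new org.apache.hadoop.conf.Configuration()
        .getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);

// Pass the SAME value to both the writing and the reading side, so that
// local FS and HDFS behave identically.
Configuration conf = new Configuration();
conf.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize);
conf.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize);

outputFormat.configure(conf); // when writing the dataset
inputFormat.configure(conf);  // when reading it back
```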

Now I have another problem: the byte[] of the Tuple2 is written using
Thrift 0.9.2, but the Thrift on the Flink dist is 0.6.1, and I think this
is the cause of errors like:

 java.lang.AbstractMethodError:
org.apache.thrift.TUnion.readValue(Lorg/apache/thrift/protocol/TProtocol;Lorg/apache/thrift/protocol/TField;)Ljava/lang/Object;
at org.apache.thrift.TUnion.read(TUnion.java:135)
at org.apache.jena.riot.thrift.BinRDF.apply(BinRDF.java:187)
at org.apache.jena.riot.thrift.BinRDF.applyVisitor(BinRDF.java:176)
at org.apache.jena.riot.thrift.BinRDF.protocolToStream(BinRDF.java:164)
at org.apache.jena.riot.thrift.BinRDF.inputStreamToStream(BinRDF.java:149)

What is the best way to fix this kind of version mismatch?
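The usual approach I know of (I haven't tried it on this exact job, so
treat it as a sketch) is to shade and relocate Thrift inside the job's fat
jar with the maven-shade-plugin, so the user code uses its own Thrift
0.9.2 classes instead of the 0.6.1 ones on the Flink classpath. The
shadedPattern name below is arbitrary:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>org.apache.thrift</pattern>
            <shadedPattern>shaded.org.apache.thrift</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

Note that jena-arq and its Thrift dependency must be bundled into the
shaded jar, so that the Jena classes' references to org.apache.thrift get
rewritten to the relocated package as well.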

On Fri, May 8, 2015 at 6:14 PM, Fabian Hueske <[email protected]> wrote:

> The point is that you don't want Flink to automatically infer the
> parameter because the default parameter depends on the file system.
> If you write a file to local FS this happens with a different default
> parameter than if you would write to HDFS.
> Just set the parameter to the same value, e.g. 64 MB, for both reading
> and writing.
>
> 2015-05-08 18:07 GMT+02:00 Flavio Pompermaier <[email protected]>:
>
>> If I haven't set that param in the code of the job, do you think Flink
>> automatically infers it from the Hadoop xxx-site.xml files or from the
>> Hadoop cluster?
>>
>> On Fri, May 8, 2015 at 6:02 PM, Fabian Hueske <[email protected]> wrote:
>>
>>> The value of the parameter is not important for correctness but it must
>>> be the same when writing and reading.
>>> Try setting it to 64 MB.
>>>
>>>
>>> 2015-05-08 17:52 GMT+02:00 Flavio Pompermaier <[email protected]>:
>>>
>>>> How can I retrieve the right one? If I write with a block size
>>>> different from the one of HDFS, can I still read it back?
>>>>
>>>> On Fri, May 8, 2015 at 5:40 PM, Fabian Hueske <[email protected]>
>>>> wrote:
>>>>
>>>>> Have you tried to explicitly set the blocksize parameter when writing
>>>>> and reading?
>>>>> The default value might be different when reading from local FS and
>>>>> HDFS.
>>>>>
>>>>> 2015-05-08 17:34 GMT+02:00 Flavio Pompermaier <[email protected]>:
>>>>>
>>>>>> Hi to all,
>>>>>>
>>>>>> I've created a DataSet of Tuple2<String, byte[]> and saved it to my
>>>>>> local fs (a folder with 8 files, because I run the program with
>>>>>> parallelism 8) with the following code:
>>>>>>
>>>>>> Configuration configuration = new Configuration();
>>>>>> TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat = new
>>>>>> TypeSerializerOutputFormat<>();
>>>>>> outputFormat.setOutputFilePath(new Path(targetDir));
>>>>>> outputFormat.setWriteMode(WriteMode.OVERWRITE);
>>>>>> outputFormat.configure(configuration);
>>>>>> ds.output(outputFormat);
>>>>>>
>>>>>> Then, if I read that folder from the local fs everything is fine,
>>>>>> but if I read it from HDFS I get the following exception:
>>>>>>
>>>>>> java.io.EOFException
>>>>>> at java.io.DataInputStream.readFully(DataInputStream.java:197)
>>>>>> at java.io.DataInputStream.readFully(DataInputStream.java:169)
>>>>>> at
>>>>>> org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readFully(InputViewDataInputStreamWrapper.java:62)
>>>>>> at
>>>>>> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
>>>>>> at
>>>>>> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:87)
>>>>>> at
>>>>>> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:30)
>>>>>> at
>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>>>>> at
>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>> at
>>>>>> org.apache.flink.api.java.io.TypeSerializerInputFormat.deserialize(TypeSerializerInputFormat.java:50)
>>>>>> at
>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>>>>>> at
>>>>>> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>>>>>>
>>>>>> Could you help in understanding what's going on?
>>>>>>
>>>>>> The code I use to read the serialized ds is:
>>>>>>
>>>>>> TupleTypeInfo<Tuple2<String, byte[]>> tInfo = new
>>>>>> TupleTypeInfo<Tuple2<String, byte[]>>(
>>>>>> BasicTypeInfo.STRING_TYPE_INFO,
>>>>>> PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>>>>>> TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat = new
>>>>>> TypeSerializerInputFormat<>(tInfo);
>>>>>> inputFormat.setFilePath(new Path(inputDir));
>>>>>> inputFormat.configure(conf);
>>>>>> DataSet<Tuple6<String, String, String, String, String, String>> ret =
>>>>>>  env.createInput(inputFormat).flatMap(XXX);
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>
>
