I arrived at this solution; do you think it is OK (taking into account that my local fs block size is 4096)?
blockSize = new org.apache.hadoop.conf.Configuration().getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);

This way, if I find the Hadoop config files in the resources folder I use that blockSize, otherwise 4096. It also means that if I run the job locally I get a consistent setting, whether the Hadoop config files are there or not.

Now I have another problem: the byte[] of the Tuple2 is written using Thrift 0.9.2, but the one in the Flink dist is 0.6.1, and I think this is the cause of errors like:

java.lang.AbstractMethodError: org.apache.thrift.TUnion.readValue(Lorg/apache/thrift/protocol/TProtocol;Lorg/apache/thrift/protocol/TField;)Ljava/lang/Object;
	at org.apache.thrift.TUnion.read(TUnion.java:135)
	at org.apache.jena.riot.thrift.BinRDF.apply(BinRDF.java:187)
	at org.apache.jena.riot.thrift.BinRDF.applyVisitor(BinRDF.java:176)
	at org.apache.jena.riot.thrift.BinRDF.protocolToStream(BinRDF.java:164)
	at org.apache.jena.riot.thrift.BinRDF.inputStreamToStream(BinRDF.java:149)

What is the best way to fix such version-mismatch problems?

On Fri, May 8, 2015 at 6:14 PM, Fabian Hueske <[email protected]> wrote:

> The point is that you don't want Flink to automatically infer the
> parameter, because the default parameter depends on the file system.
> If you write a file to the local FS, this happens with a different
> default parameter than if you wrote to HDFS.
> Just set the parameter to the same value, e.g. 64 MB, when reading and
> writing.
>
> 2015-05-08 18:07 GMT+02:00 Flavio Pompermaier <[email protected]>:
>
>> If I haven't set that param in the code of the job, do you think Flink
>> automatically infers that param from somewhere in the Hadoop
>> xxx-site.xml files or from the Hadoop cluster?
>>
>> On Fri, May 8, 2015 at 6:02 PM, Fabian Hueske <[email protected]> wrote:
>>
>>> The value of the parameter is not important for correctness, but it
>>> must be the same when writing and reading.
>>> Try setting it to 64 MB.
>>>
>>> 2015-05-08 17:52 GMT+02:00 Flavio Pompermaier <[email protected]>:
>>>
>>>> How can I retrieve the right one? If I write with a block size
>>>> different from the one of HDFS, can I still read the data back?
>>>>
>>>> On Fri, May 8, 2015 at 5:40 PM, Fabian Hueske <[email protected]>
>>>> wrote:
>>>>
>>>>> Have you tried to explicitly set the blocksize parameter when
>>>>> writing and reading?
>>>>> The default value might be different when reading from the local FS
>>>>> and HDFS.
>>>>>
>>>>> 2015-05-08 17:34 GMT+02:00 Flavio Pompermaier <[email protected]>:
>>>>>
>>>>>> Hi to all,
>>>>>>
>>>>>> I've created a dataset of Tuple2<String, byte[]> and I saved it on
>>>>>> my local fs (a folder with 8 files because I run the program with
>>>>>> parallelism 8) with the following code:
>>>>>>
>>>>>> Configuration configuration = new Configuration();
>>>>>> TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat =
>>>>>>     new TypeSerializerOutputFormat<>();
>>>>>> outputFormat.setOutputFilePath(new Path(targetDir));
>>>>>> outputFormat.setWriteMode(WriteMode.OVERWRITE);
>>>>>> outputFormat.configure(configuration);
>>>>>> ds.output(outputFormat);
>>>>>>
>>>>>> Then, if I read such a folder from the local fs everything is fine,
>>>>>> but if I read it from HDFS I get the following exception:
>>>>>>
>>>>>> java.io.EOFException
>>>>>> 	at java.io.DataInputStream.readFully(DataInputStream.java:197)
>>>>>> 	at java.io.DataInputStream.readFully(DataInputStream.java:169)
>>>>>> 	at org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readFully(InputViewDataInputStreamWrapper.java:62)
>>>>>> 	at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
>>>>>> 	at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:87)
>>>>>> 	at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:30)
>>>>>> 	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>>>>> 	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>> 	at org.apache.flink.api.java.io.TypeSerializerInputFormat.deserialize(TypeSerializerInputFormat.java:50)
>>>>>> 	at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
>>>>>> 	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>>>>>> 	at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>>>>>>
>>>>>> Could you help in understanding what's going on?
>>>>>>
>>>>>> The code I use to read the serialized ds is:
>>>>>>
>>>>>> TupleTypeInfo<Tuple2<String, byte[]>> tInfo =
>>>>>>     new TupleTypeInfo<Tuple2<String, byte[]>>(
>>>>>>         BasicTypeInfo.STRING_TYPE_INFO,
>>>>>>         PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>>>>>> TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat =
>>>>>>     new TypeSerializerInputFormat<>(tInfo);
>>>>>> inputFormat.setFilePath(new Path(inputDir));
>>>>>> inputFormat.configure(conf);
>>>>>> DataSet<Tuple6<String, String, String, String, String, String>> ret =
>>>>>>     env.createInput(inputFormat).flatMap(XXX);
>>>>>>
>>>>>> Best,
>>>>>> Flavio
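[Editor's note] The block-size fallback from the top of the thread could be sketched like this. It is a minimal sketch, not a tested fix; note that a plain Hadoop `Configuration` only loads `core-default.xml`/`core-site.xml` from the classpath, so `HdfsConfiguration` is used here to make sure `hdfs-site.xml` (where `dfs.blocksize` normally lives) is also picked up:

```java
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class BlockSizeFallback {

    /**
     * Returns dfs.blocksize from the Hadoop config files on the classpath,
     * or the given local-fs fallback (e.g. 4096) when no files are found.
     */
    static long resolveBlockSize(long localFsFallback) {
        // HdfsConfiguration also loads hdfs-default.xml / hdfs-site.xml,
        // which plain Configuration does not.
        return new HdfsConfiguration()
                .getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, localFsFallback);
    }

    public static void main(String[] args) {
        System.out.println("Using block size: " + resolveBlockSize(4096L));
    }
}
```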
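[Editor's note] Fabian's advice to pin the block size to the same value on both sides could look like the sketch below. It assumes the `TypeSerializer*Format`s of this Flink version read the block size from the `BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY` / `BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY` entries of the `Configuration` passed to `configure()`; treat the constant names as an assumption to verify against your Flink version:

```java
import org.apache.flink.api.common.io.BinaryInputFormat;
import org.apache.flink.api.common.io.BinaryOutputFormat;
import org.apache.flink.configuration.Configuration;

public class PinnedBlockSize {

    // Must be identical when writing and reading; 64 MB as suggested.
    static final long BLOCK_SIZE = 64L * 1024 * 1024;

    /** Configuration to pass to outputFormat.configure(...). */
    static Configuration writeConf() {
        Configuration conf = new Configuration();
        conf.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, BLOCK_SIZE);
        return conf;
    }

    /** Configuration to pass to inputFormat.configure(...). */
    static Configuration readConf() {
        Configuration conf = new Configuration();
        conf.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, BLOCK_SIZE);
        return conf;
    }
}
```

With both sides pinned, the result no longer depends on whether the files are later read from the local FS or from HDFS, which is what caused the `EOFException` above.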
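[Editor's note] For the Thrift 0.9.2 vs. 0.6.1 clash, one common general remedy (not confirmed in this thread) is to relocate the conflicting package inside the job jar with the Maven Shade Plugin, so the job and Jena use their own Thrift classes regardless of the 0.6.1 on Flink's classpath. A minimal pom.xml sketch; the `shaded.` prefix is an arbitrary choice:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <!-- Rewrite org.apache.thrift references in the job jar
               (and bundled deps like Jena) to a private namespace. -->
          <relocation>
            <pattern>org.apache.thrift</pattern>
            <shadedPattern>shaded.org.apache.thrift</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

This only helps if every class that calls Thrift (here, Jena's `BinRDF`) is bundled into the shaded jar, so the relocation rewrites their references too.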
