0.8,1 On Fri, Mar 20, 2015 at 6:11 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi Flavio! > > Is this on Flink 0.9-SNAPSHOT or 0.8.1 ? > > Stephan > > > On Fri, Mar 20, 2015 at 6:03 PM, Flavio Pompermaier <pomperma...@okkam.it> > wrote: > >> To make it work I had to clone the Flink repo, imporrt the Flink-java >> project and modify the >> HadoopOutputFormatBase in the open() and finalizeGlobal and call >> >> if(this.mapreduceOutputFormat instanceof Configurable){ >> ((Configurable)this.mapreduceOutputFormat).setConf(this.configuration); >> } >> otherwise the "mapred.output.dir" property was always null :( >> >> On Fri, Mar 20, 2015 at 10:27 AM, Flavio Pompermaier < >> pomperma...@okkam.it> wrote: >> >>> Hi guys, >>> >>> I was trying to insert into an HBase table with Flink 0.8.1 and it seems >>> to be not possible without creating a custom version of the HBase >>> TableOutputFormat that specialize Mutation with Put. >>> This is my code using the standard Flink APIs: >>> >>> myds.output(new HadoopOutputFormat<Text, Put>(new >>> TableOutputFormat<Text>(), job)); >>> >>> and this is the Exception I get: >>> >>> Exception in thread "main" >>> org.apache.flink.api.common.functions.InvalidTypesException: Interfaces and >>> abstract classes are not valid types: class >>> org.apache.hadoop.hbase.client.Mutation >>> at >>> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:885) >>> at >>> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:877) >>> .... >>> >>> So I had to copy the TableOutputFormat, rename it as >>> HBaseTableOutputFormat and change Mutation to Put as TableOutputFormat Type >>> argument. >>> However the table filed is not initialized because setConf is not >>> called. Is this a bug of the HadoopOutputFormat wrapper that does not check >>> is the outputFormat is an instance of Configurable and call setConf (as it >>> happens for the inputSlit)? >>> >>> Best, >>> Flavio >>> >> >> >> >>