Hi Stephan,

the problem shows up when you try to write into HBase with the HadoopOutputFormat. Unfortunately, the RecordWriter of the HBase TableOutputFormat requires a Table object that is instantiated in the setConf() method (otherwise you get a NullPointerException), and setConf() also sets other parameters in the passed conf. So I think that a Hadoop OutputFormat implementing Configurable should be initialized somewhere with a call to setConf(), as I tried to do.

Moreover, the Flink HadoopOutputFormat requires a property to be set in the configuration (mapred.output.dir), which is not possible to set right now. Or am I doing something wrong? Could you try to write some data to an HBase TableOutputFormat and verify this problem?
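
To make the point concrete, here is a minimal, self-contained sketch of the check I have in mind. Note that Conf, Configurable, TableLikeOutputFormat, WrapperSketch and the key "example.output.table" are simplified stand-ins made up for illustration, not the real Hadoop, HBase or Flink classes. Only the instanceof/setConf() pattern mirrors the patch quoted further down in this thread.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigurableSketch {
    // Hypothetical property name, used only in this sketch.
    static final String OUTPUT_TABLE_KEY = "example.output.table";

    // Simplified stand-in for a Hadoop-style configuration object.
    static final class Conf {
        private final Map<String, String> props = new HashMap<>();
        void set(String key, String value) { props.put(key, value); }
        String get(String key) { return props.get(key); }
    }

    // Simplified stand-in for a Configurable-style interface.
    interface Configurable {
        void setConf(Conf conf);
    }

    // Stand-in for an output format whose 'table' field is only set in setConf().
    static final class TableLikeOutputFormat implements Configurable {
        private String table;

        @Override
        public void setConf(Conf conf) {
            this.table = conf.get(OUTPUT_TABLE_KEY);
        }

        // Throws NullPointerException if setConf() was never called.
        String write(String rowKey) {
            return table.concat(":").concat(rowKey);
        }
    }

    // Stand-in for the wrapper: it configures the wrapped format in open().
    static final class WrapperSketch {
        private final Object outputFormat;
        private final Conf configuration;

        WrapperSketch(Object outputFormat, Conf configuration) {
            this.outputFormat = outputFormat;
            this.configuration = configuration;
        }

        void open() {
            // The proposed check: pass the configuration on if the format wants it.
            if (outputFormat instanceof Configurable) {
                ((Configurable) outputFormat).setConf(configuration);
            }
        }
    }

    public static void main(String[] args) {
        Conf conf = new Conf();
        conf.set(OUTPUT_TABLE_KEY, "mytable");

        TableLikeOutputFormat format = new TableLikeOutputFormat();
        new WrapperSketch(format, conf).open();

        System.out.println(format.write("row1")); // prints mytable:row1
    }
}
```

Without the open() call, write() in this sketch fails with a NullPointerException because the table field is still null, which is the same kind of failure I see with the real TableOutputFormat.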

Thanks again,
Flavio

On Sat, Mar 21, 2015 at 8:16 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Flavio!
>
> The issue that abstract classes and interfaces are not supported is
> definitely fixed in 0.9.
>
> Your other fix (adding the call for configuring the output format) - is
> that always needed, or just important in a special case? How has the output
> format worked before?
>
> If this is critical to the functionality, would you open a pull request
> with this patch?
>
> Greetings,
> Stephan
>
> On Fri, Mar 20, 2015 at 6:28 PM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> 0.8.1
>>
>> On Fri, Mar 20, 2015 at 6:11 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi Flavio!
>>>
>>> Is this on Flink 0.9-SNAPSHOT or 0.8.1?
>>>
>>> Stephan
>>>
>>> On Fri, Mar 20, 2015 at 6:03 PM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
>>>> To make it work I had to clone the Flink repo, import the flink-java
>>>> project and modify HadoopOutputFormatBase so that open() and
>>>> finalizeGlobal() call
>>>>
>>>> if (this.mapreduceOutputFormat instanceof Configurable) {
>>>>     ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
>>>> }
>>>>
>>>> otherwise the "mapred.output.dir" property was always null :(
>>>>
>>>> On Fri, Mar 20, 2015 at 10:27 AM, Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> I was trying to insert into an HBase table with Flink 0.8.1 and it
>>>>> seems not to be possible without creating a custom version of the HBase
>>>>> TableOutputFormat that specializes Mutation with Put.
>>>>> This is my code using the standard Flink APIs:
>>>>>
>>>>> myds.output(new HadoopOutputFormat<Text, Put>(new
>>>>> TableOutputFormat<Text>(), job));
>>>>>
>>>>> and this is the Exception I get:
>>>>>
>>>>> Exception in thread "main"
>>>>> org.apache.flink.api.common.functions.InvalidTypesException: Interfaces
>>>>> and abstract classes are not valid types: class
>>>>> org.apache.hadoop.hbase.client.Mutation
>>>>> at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:885)
>>>>> at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:877)
>>>>> ....
>>>>>
>>>>> So I had to copy the TableOutputFormat, rename it as
>>>>> HBaseTableOutputFormat and change Mutation to Put as the
>>>>> TableOutputFormat type argument.
>>>>> However, the table field is not initialized because setConf is not
>>>>> called. Is this a bug of the HadoopOutputFormat wrapper, which does not
>>>>> check if the outputFormat is an instance of Configurable and call
>>>>> setConf (as it happens for the inputSplit)?
>>>>>
>>>>> Best,
>>>>> Flavio