Hi,

It would be more efficient to configure the table and flush the commits once per partition instead of once per element in the RDD. As written, your code creates a new HBaseConfiguration, HBaseAdmin and HTable for every record. That works fine when you only have 4 elements, but it won't bode well for large data sets, IMO.
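Something along these lines is what I mean. It is only a rough sketch, not tested against your cluster. It assumes you first map your results to (rowKey, value) pairs, e.g. `values.map(res => (res.toKey(), res.totalCount + res.positiveCount))`, that toKey() returns a String, and that the counts are Longs. `HBasePartitionWriter`, `save` and `batchSize` are names I made up for illustration. It uses the same older client API as your code (HTable, Put.add); newer HBase versions would use ConnectionFactory, Table and Put.addColumn instead.

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import scala.collection.JavaConverters._

// Hypothetical helper: writes (rowKey, value) pairs to HBase, one table handle per partition.
object HBasePartitionWriter {

  def save(rows: RDD[(String, Long)], batchSize: Int = 1000): Unit = {
    rows.foreachPartition { partition =>
      // This block runs on the executor, once per partition (not once per record).
      val configuration = HBaseConfiguration.create()
      // Settings copied as-is from the original post.
      configuration.set("hbase.zookeeper.property.clientPort", "2181")
      configuration.set("hbase.zookeeper.quorum", "192.168.1.66")
      configuration.set("hbase.master", "192.168.1:60000")
      val table = new HTable(configuration, "ljh_test3")
      try {
        // Send the puts in batches so a large partition is never held in memory at once.
        partition.grouped(batchSize).foreach { batch =>
          val puts = batch.map { case (key, value) =>
            val put = new Put(Bytes.toBytes(key))
            put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(value))
            put
          }
          table.put(puts.asJava)
        }
        // Flush once for the whole partition.
        table.flushCommits()
      } finally {
        // Always release the table, even if a put fails.
        table.close()
      }
    }
  }
}
```

You would then call `HBasePartitionWriter.save(pairs)` on the mapped RDD. The batch size of 1000 is an arbitrary starting point, so tune it for your row size.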
Thanks,
Deng

On Tue, Oct 27, 2015 at 5:22 PM, jinhong lu <lujinho...@gmail.com> wrote:
>
> Hi,
>
> I write my result to hdfs, and it works well:
>
> val model =
>   lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new
>   TrainFeature())(seqOp, combOp).values
> model.map(a => (a.toKey() + "\t" + a.totalCount + "\t" +
>   a.positiveCount)).saveAsTextFile(modelDataPath);
>
> But when I want to write to hbase, the application hangs: no log, no
> response, it just stays there, and nothing is written to hbase:
>
> val model =
>   lines.map(pairFunction).groupByKey().flatMap(pairFlatMapFunction).aggregateByKey(new
>   TrainFeature())(seqOp, combOp).values.foreach({ res =>
>     val configuration = HBaseConfiguration.create();
>     configuration.set("hbase.zookeeper.property.clientPort", "2181");
>     configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>     configuration.set("hbase.master", "192.168.1:60000");
>     val hadmin = new HBaseAdmin(configuration);
>     val table = new HTable(configuration, "ljh_test3");
>     var put = new Put(Bytes.toBytes(res.toKey()));
>     put.add(Bytes.toBytes("f"), Bytes.toBytes("c"),
>       Bytes.toBytes(res.totalCount + res.positiveCount));
>     table.put(put);
>     table.flushCommits()
>   })
>
> And then I tried to write some simple data to hbase, and that works well too:
>
> sc.parallelize(Array(1,2,3,4)).foreach({ res =>
>   val configuration = HBaseConfiguration.create();
>   configuration.set("hbase.zookeeper.property.clientPort", "2181");
>   configuration.set("hbase.zookeeper.quorum", "192.168.1.66");
>   configuration.set("hbase.master", "192.168.1:60000");
>   val hadmin = new HBaseAdmin(configuration);
>   val table = new HTable(configuration, "ljh_test3");
>   var put = new Put(Bytes.toBytes(res));
>   put.add(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(res));
>   table.put(put);
>   table.flushCommits()
> })
>
> What is the problem with the second piece of code? Thanks a lot.
>