Hi, The phoenix-spark integration is a thin wrapper around the phoenix-mapreduce integration, which under the hood just uses Phoenix's 'UPSERT' functionality for saving. As far as I know, there's no provisions for checkAndPut functionality there, so if you require it, I suggest sticking to the HBase API for now.
Re: timestamp, the phoenix-spark API only supports using the native Phoenix time formats, i.e. DATE, TIMESTAMP. Good luck, Josh On Fri, Jul 17, 2015 at 11:40 PM, kaye ann <kayeann_po...@yahoo.com> wrote: > I am using Spark 1.3, HBase 1.1 and Phoenix 4.4. I have this in my code: > > val rdd = processedRdd.map(r => Row.fromSeq(r)) > val dataframe = sqlContext.createDataFrame(rdd, schema) > dataframe.save("org.apache.phoenix.spark", SaveMode.Overwrite, > Map("table" -> HTABLE, "zkUrl" -> zkQuorum)) > > This code works, but... > 1. How do I implement HBase's checkAndPut using Phoenix-Spark API? > CREATED_DATE is always set to DateTime.now() in the dataframe. > I don't want the field to be updated if the row already exists in HBase, > yet there's an update in other fields. > I can achieve it using HBase's checkAndPut: Put all the fields and use > checkAndPut on created_date field. > 2. How do I add an HBase Timestamp using Phoenix-Spark similiar to HBase > API: > Put(rowkey, timestamp.getMillis) > ----------------- > This is my code using HBase API that I am trying to convert to > Phoenix-Spark since I think Phoenix-Spark is more optimized: > > rdd.foreachPartition(p => { > val conf = HBaseConfiguration.create() > val hTable = new HTable(conf, HTABLE) > hTable.setAutoFlushTo(false) > > p.foreach(r => { > val hTimestamp = ... > val rowkey = ... > > val hRow = new Put(rowkey, hTimestamp.getMillis) > r.filter(...).foreach(tuple => > hRow.add(toBytes(tuple._1), toBytes(tuple._2), toBytes(tuple._3)) > ) > hTable.put(hRow) > > val CREATED_DATE_PUT = new Put(rowkey, hTimestamp.getMillis) > .add(toBytes(CF), toBytes(CREATED_DATE), toBytes(now)) > hTable.checkAndPut(rowkey, toBytes(CF), toBytes(CREATED_DATE), null, > CREATED_DATE_PUT) > > }) > hTable.flushCommits() > hTable.close() > }) > >