Hi Mich,

I'd encourage you to use the mechanism Josh mentioned: "Another option is to use Phoenix-JDBC from within Spark Streaming. I've got a toy example of using Spark streaming with Phoenix DataFrames, but it could just as easily be a batched JDBC upsert."

Trying to write directly to HBase in a Phoenix-compliant way is likely to be brittle, especially as Phoenix evolves. Josh's mechanism has the advantage of still going through the Phoenix APIs.

Thanks,
James
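As a rough illustration of the "batched JDBC upsert" route, here is a minimal sketch of upserting a stream of key/value pairs into Phoenix from Spark Streaming. It is not Josh's linked example; the table EVENTS, its columns, the batch size and the ZooKeeper quorum are placeholders invented for illustration.

import java.sql.DriverManager
import org.apache.spark.streaming.dstream.DStream

// Sketch only: batched UPSERTs through the Phoenix JDBC driver.
def upsertToPhoenix(stream: DStream[(String, String)]): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      // One Phoenix connection per partition; rely on commit() to flush batches.
      Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
      val conn = DriverManager.getConnection("jdbc:phoenix:zk1,zk2,zk3")
      conn.setAutoCommit(false)
      val stmt = conn.prepareStatement("UPSERT INTO EVENTS (ID, PAYLOAD) VALUES (?, ?)")
      try {
        partition.grouped(1000).foreach { batch =>
          batch.foreach { case (id, payload) =>
            stmt.setString(1, id)
            stmt.setString(2, payload)
            stmt.executeUpdate()   // buffered client-side until commit
          }
          conn.commit()            // each commit flushes one batch to the region servers
        }
      } finally {
        conn.close()
      }
    }
  }
}

The same pattern works from a plain batch job; only the foreachRDD wrapper is Streaming-specific.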
On Fri, Oct 7, 2016 at 9:24 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Thanks Josh, I will try your code as well.
>
> I wrote this simple program, based on some existing code, that directly
> creates and populates an HBase table called "new" from Spark 2:
>
> import org.apache.spark._
> import org.apache.spark.rdd.NewHadoopRDD
> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
> import org.apache.hadoop.hbase.client.HBaseAdmin
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.hbase.HColumnDescriptor
> import org.apache.hadoop.hbase.util.Bytes
> import org.apache.hadoop.hbase.client.Put
> import org.apache.hadoop.hbase.client.HTable
> import scala.util.Random
> import scala.math._
> import org.apache.spark.sql.functions._
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
> def randomString(chars: String, length: Int): String =
>   (0 until length).map(_ => chars(Random.nextInt(chars.length))).mkString
>
> val chars = ('a' to 'z') ++ ('A' to 'Z')
> val tableName = "new"
> val conf = HBaseConfiguration.create()
> // Add local HBase conf
> //conf.addResource(new Path("file:////usr/lib/hbase/conf/hbase-site.xml"))
> conf.set(TableInputFormat.INPUT_TABLE, tableName)
>
> // create this table with a column family
> val admin = new HBaseAdmin(conf)
> if (!admin.isTableAvailable(tableName)) {
>   println("Creating table " + tableName)
>   val tableDesc = new HTableDescriptor(tableName)
>   tableDesc.addFamily(new HColumnDescriptor("cf1".getBytes()))
>   admin.createTable(tableDesc)
> } else {
>   println("Table " + tableName + " already exists!!")
> }
>
> println("populating table")
> // put data into the table
> val myTable = new HTable(conf, tableName)
> for (i <- 0 to 99) {
>   val r = scala.util.Random.nextInt(100000000)
>   val c = randomString(chars.mkString(""), 1)
>   val key = c + r.toString
>   val data = randomString(chars.mkString(""), 50)
>   //var p = new Put()
>   var p = new Put(new String(key).getBytes())
>   p.add("cf1".getBytes(), "column-1".getBytes(), new String(data).getBytes())
>   myTable.put(p)
> }
> myTable.flushCommits()
>
> // create an RDD over the table
> val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
>   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>   classOf[org.apache.hadoop.hbase.client.Result])
> // get the row count
> val count = hBaseRDD.count()
> print("HBase RDD count:" + count + "\n")
> println("\nFinished at"); HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> System.exit(0)
>
> I am no hands-on programmer, but it seems to work in the Spark shell and as a jar file built with Maven :)
>
> hbase(main):265:0> scan 'new', 'LIMIT' => 5
> ROW          COLUMN+CELL
>  A10179499   column=cf1:column-1, timestamp=1475857020533, value=riEgIrLuHNKLUmMeEnWZwAWdIUMYqOTkDpqpxnKsnlccuDRvEE
>  A27318405   column=cf1:column-1, timestamp=1475857115678, value=zpQWDjvPXobFkPspBxfTOefULkCidPGTjeLOzuxgLEcfzecVef
>  A44949791   column=cf1:column-1, timestamp=1475856238280, value=kzeuRUCqWYBKXcbPRSWMZLqPpsrLvgkOMLjDArtdJkoOlPGKZs
>  A4682060    column=cf1:column-1, timestamp=1475857115666, value=MTXnucpYRxKbYSVmTVaFtPteWAtxZEUeTMXPntsVLIsMGDghcs
>  A54369308   column=cf1:column-1, timestamp=1475856238328, value=HGYCCAefvCTKWbSwlZxgEauInysLOjXHKauZevnEhZLCLvjDTz
> 5 row(s) in 0.0050 seconds
>
> Cheers
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
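For contrast with the raw HBase client calls above, the Phoenix-centric write that James recommends could look roughly like the sketch below, in spark-shell style (reusing the shell's sc) against the Spark 1.x phoenix-spark DataFrame API; per PHOENIX-3333 further down the thread, that integration did not yet support Spark 2.0 at the time. The table NEW_PHX, its columns and the zkUrl are placeholders, not details from the thread.

import org.apache.spark.sql.{SQLContext, SaveMode}

// Assumes a Phoenix table created up front, e.g. in sqlline:
//   CREATE TABLE NEW_PHX (ID VARCHAR PRIMARY KEY, DATA VARCHAR)
val sqlContext = new SQLContext(sc)

// Synthetic rows comparable to the random data above
val rows = (0 to 99).map(i => ("A" + i, scala.util.Random.alphanumeric.take(50).mkString))
val df = sqlContext.createDataFrame(rows).toDF("ID", "DATA")

df.write
  .format("org.apache.phoenix.spark")
  .mode(SaveMode.Overwrite)   // phoenix-spark performs upserts; Overwrite is the expected mode
  .options(Map("table" -> "NEW_PHX", "zkUrl" -> "localhost:2181"))
  .save()

Because the rows go through Phoenix, the primary key is encoded into the HBase row key for you, which sidesteps the row-format concerns Josh raises below.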
> On 7 October 2016 at 16:24, Josh Mahonin <jmaho...@gmail.com> wrote:
>
>> Hi Mich,
>>
>> You're correct that the rowkey is the primary key, but if you're writing
>> to HBase directly and bypassing Phoenix, you'll have to be careful to
>> construct your row keys so that they adhere to the Phoenix data types and
>> row format. I don't think it's very well documented, but you might have
>> some luck checking the data type implementations here:
>> https://github.com/apache/phoenix/tree/master/phoenix-core/src/main/java/org/apache/phoenix/schema/types
>>
>> Another option is to use Phoenix-JDBC from within Spark Streaming. I've
>> got a toy example of using Spark streaming with Phoenix DataFrames, but it
>> could just as easily be a batched JDBC upsert.
>> https://github.com/jmahonin/spark-streaming-phoenix/blob/master/src/main/scala/SparkStreamingPhoenix.scala
>>
>> Best of luck,
>>
>> Josh
>>
>> On Fri, Oct 7, 2016 at 10:28 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Thank you all, very helpful.
>>>
>>> I have not tried the method Ciureanu suggested, but will do so.
>>>
>>> I will now be using Spark Streaming to populate the HBase table. I was
>>> hoping to do this through Phoenix, but I managed to write a script to
>>> write to the HBase table from Spark 2 itself.
>>>
>>> Having worked with HBase, I take the row key to be the primary key, i.e.
>>> unique, much like in an RDBMS (Oracle). It sounds like Phoenix relies on
>>> that when creating a table on top of an HBase table. Is this assessment
>>> correct, please?
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
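As an aside on the row-key question: one way to see the row key / primary key relationship is to map the existing "new" table into Phoenix as a view. A rough sketch via Phoenix JDBC follows; it assumes the plain-string keys and values shown earlier map cleanly to VARCHAR, and the connection URL is a placeholder.

import java.sql.DriverManager

// Expose the existing HBase table "new" to Phoenix as a read-only view.
// The single PRIMARY KEY column maps directly onto the HBase row key.
val conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181")
val stmt = conn.createStatement()
stmt.execute("""CREATE VIEW "new" (ROWKEY VARCHAR PRIMARY KEY, "cf1"."column-1" VARCHAR)""")
val rs = stmt.executeQuery("""SELECT ROWKEY, "column-1" FROM "new" LIMIT 5""")
while (rs.next()) {
  println(rs.getString(1) + " -> " + rs.getString(2))
}
conn.close()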
>>> On 7 October 2016 at 14:30, Ciureanu Constantin <ciureanu.constan...@gmail.com> wrote:
>>>
>>>> In Spark 1.4 it worked via JDBC; I'm sure it would work in 1.6 / 2.0
>>>> without issues.
>>>>
>>>> Here's some sample code I used (it was fetching data in parallel, in 24
>>>> partitions):
>>>>
>>>> import org.apache.spark.SparkConf
>>>> import org.apache.spark.SparkContext
>>>> import org.apache.spark.rdd.JdbcRDD
>>>> import java.sql.{Connection, DriverManager, ResultSet}
>>>>
>>>> sc.addJar("/usr/lib/hbase/hbase-protocol.jar")
>>>> sc.addJar("phoenix-x.y.z-bin/phoenix-core-x.y.z.jar")
>>>> sc.addJar("phoenix-x.y.z-bin/phoenix-x.y.z-client.jar")
>>>>
>>>> def createConnection() = {
>>>>   Class.forName("org.apache.phoenix.jdbc.PhoenixDriver").newInstance();
>>>>   DriverManager.getConnection("jdbc:phoenix:hd101.lps.stage,hd102.lps.stage,hd103.lps.stage"); // the ZooKeeper quorum
>>>> }
>>>>
>>>> def extractValues(r: ResultSet) = {
>>>>   (r.getLong(1),   // datum
>>>>    r.getInt(2),    // pg
>>>>    r.getString(3), // HID
>>>>    ....
>>>>   )
>>>> }
>>>>
>>>> val data = new JdbcRDD(sc, createConnection,
>>>>   "SELECT DATUM, PG, HID, ..... WHERE DATUM >= ? * 1000 AND DATUM <= ? * 1000 and PG = <a value>",
>>>>   lowerBound = 1364774400, upperBound = 1384774400, numPartitions = 24,
>>>>   mapRow = extractValues)
>>>>
>>>> data.count()
>>>>
>>>> println(data.collect().toList)
>>>>
>>>> 2016-10-07 15:20 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:
>>>>
>>>>> JIRA on the HBase side: HBASE-16179
>>>>>
>>>>> FYI
>>>>>
>>>>> On Fri, Oct 7, 2016 at 6:07 AM, Josh Mahonin <jmaho...@gmail.com> wrote:
>>>>>
>>>>>> Hi Mich,
>>>>>>
>>>>>> There's an open ticket about this issue here:
>>>>>> https://issues.apache.org/jira/browse/PHOENIX-3333
>>>>>>
>>>>>> Long story short, Spark changed their API (again), breaking the
>>>>>> existing integration. I'm not sure of the level of effort needed to get
>>>>>> it working with Spark 2.0, but based on examples from other projects, it
>>>>>> looks like there's a fair bit of Maven module work to support both
>>>>>> Spark 1.x and Spark 2.x concurrently in the same project. Patches are
>>>>>> very welcome!
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Josh
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 8:33 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Has anyone managed to read a Phoenix table in Spark 2, by any chance?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> Dr Mich Talebzadeh
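To close the loop on the original question: until the phoenix-spark module tracked by PHOENIX-3333 supports Spark 2.0, the plain-JDBC route Ciureanu describes can also be driven from the Spark 2 DataFrame API through the generic JDBC source. A rough sketch follows; the table name and ZooKeeper quorum are placeholders, the Phoenix client jar still has to be on the driver and executor classpath, and behaviour may vary by Phoenix version.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("PhoenixJdbcRead").getOrCreate()

// Read a Phoenix table through Spark's generic JDBC data source,
// bypassing the phoenix-spark integration entirely.
val df = spark.read
  .format("jdbc")
  .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
  .option("url", "jdbc:phoenix:zk1,zk2,zk3")   // ZooKeeper quorum (placeholder)
  .option("dbtable", "MY_TABLE")               // Phoenix table name (placeholder)
  .load()

df.printSchema()
df.show(5)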