Hi Jeroen,

Rather than bundle the Phoenix client JAR with your app, are you able to put it in a static location and reference it either through SPARK_CLASSPATH, or through the conf values below? (I use SPARK_CLASSPATH myself, though it's deprecated.)

    spark.driver.extraClassPath
    spark.executor.extraClassPath
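For example, a rough sketch of the conf-based route against your submit command (untested on my side; the --jars list for the Kafka and other dependencies is omitted here for brevity):

    PHOENIX_JAR=/opt/mapr/phoenix/phoenix-4.4.0-HBase-0.98-bin/phoenix-4.4.0-HBase-0.98-client.jar

    /opt/mapr/spark/spark-1.3.1/bin/spark-submit \
        --conf spark.driver.extraClassPath=$PHOENIX_JAR \
        --conf spark.executor.extraClassPath=$PHOENIX_JAR \
        --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector \
        KafkaStreamConsumer.jar maprdemo:5181 0 topic jdbc:phoenix:maprdemo:5181 true

The same two keys can also be set once in conf/spark-defaults.conf. Note that extraClassPath entries are resolved locally on each node rather than shipped with the job, so the JAR has to exist at that path on the driver and on every worker.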
Josh

On Wed, Jun 10, 2015 at 4:11 AM, Jeroen Vlek <j.v...@anchormen.nl> wrote:
> Hi Josh,
>
> Thank you for your effort. Looking at your code, I feel that mine is
> semantically the same, except written in Java. The dependencies in the
> pom.xml all have the scope "provided". The job is submitted as follows:
>
> $ rm spark.log && MASTER=spark://maprdemo:7077 \
>     /opt/mapr/spark/spark-1.3.1/bin/spark-submit \
>     --jars /home/mapr/projects/customer/lib/spark-streaming-kafka_2.10-1.3.1.jar,/home/mapr/projects/customer/lib/kafka_2.10-0.8.1.1.jar,/home/mapr/projects/customer/lib/zkclient-0.3.jar,/home/mapr/projects/customer/lib/metrics-core-3.1.0.jar,/home/mapr/projects/customer/lib/metrics-core-2.2.0.jar,lib/spark-sql_2.10-1.3.1.jar,/opt/mapr/phoenix/phoenix-4.4.0-HBase-0.98-bin/phoenix-4.4.0-HBase-0.98-client.jar \
>     --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector \
>     KafkaStreamConsumer.jar maprdemo:5181 0 topic jdbc:phoenix:maprdemo:5181 true
>
> The spark-defaults.conf is reverted back to its defaults (i.e. no
> userClassPathFirst). In the catch block of the Phoenix connection buildup,
> the class path is printed by recursively iterating over the class loaders.
> The first one already prints the phoenix-client JAR [1]. It's also very
> unlikely to be a bug in Spark or Phoenix, if your proof-of-concept just
> works.
>
> So if the JAR that contains the offending class is known by the class
> loader, then that might indicate that there's a second JAR providing the
> same class but with a different version, right? Yet the only Phoenix JAR
> on the whole class path hierarchy is the aforementioned phoenix-client
> JAR. Furthermore, I googled the class in question,
> ClientRpcControllerFactory, and it really only exists in the Phoenix
> project. We're not talking about some low-level AOP Alliance stuff here ;)
>
> Maybe I'm missing some fundamental class loading knowledge; in that case
> I'd be very happy to be enlightened. This all seems very strange.
>
> Cheers,
> Jeroen
>
> [1] [file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./spark-streaming-kafka_2.10-1.3.1.jar,
> file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./kafka_2.10-0.8.1.1.jar,
> file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./zkclient-0.3.jar,
> file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./phoenix-4.4.0-HBase-0.98-client.jar,
> file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./spark-sql_2.10-1.3.1.jar,
> file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./metrics-core-3.1.0.jar,
> file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./KafkaStreamConsumer.jar,
> file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./metrics-core-2.2.0.jar]
>
> On Tuesday, June 09, 2015 11:18:08 AM Josh Mahonin wrote:
> > This may or may not be helpful for your classpath issues, but I wanted
> > to verify that basic functionality worked, so I made a sample app here:
> >
> > https://github.com/jmahonin/spark-streaming-phoenix
> >
> > This consumes events off a Kafka topic using Spark Streaming, and
> > writes out event counts to Phoenix using the new phoenix-spark
> > functionality: http://phoenix.apache.org/phoenix_spark.html
> >
> > It's definitely overkill, and it would probably be more efficient to
> > use the JDBC driver directly, but it serves as a proof of concept.
> >
> > I've only tested this in local mode.
> > To convert it to a full job JAR, I suspect that keeping all of the
> > Spark and Phoenix dependencies marked as 'provided', and including the
> > Phoenix client JAR in the Spark classpath, would work as well.
> >
> > Good luck,
> >
> > Josh
> >
> > On Tue, Jun 9, 2015 at 4:40 AM, Jeroen Vlek <j.v...@work.nl> wrote:
> > > Hi,
> > >
> > > I posted a question with regards to Phoenix and Spark Streaming on
> > > StackOverflow [1]. Please find a copy of the question in this email,
> > > below the first stack trace. I also already contacted the Phoenix
> > > mailing list and tried the suggestion of setting
> > > spark.driver.userClassPathFirst. Unfortunately that only pushed me
> > > further into dependency hell, which I tried to resolve until I hit a
> > > wall with an UnsatisfiedLinkError on Snappy.
> > >
> > > What I am trying to achieve: to save a stream from Kafka into
> > > Phoenix/HBase via Spark Streaming. I'm using MapR as a platform, and
> > > the original exception happens both on a 3-node cluster and on the
> > > MapR Sandbox (a VM for experimentation), in YARN and in stand-alone
> > > mode. Further experimentation (like the saveAsNewAPIHadoopFile below)
> > > was done only on the sandbox in stand-alone mode.
> > >
> > > Phoenix only supports Spark from 4.4.0 onwards, but I thought I could
> > > use a naive implementation in 4.3.1 that creates a new connection for
> > > every RDD from the DStream. This resulted in the
> > > ClassNotFoundException described in [1], so I switched to 4.4.0.
> > >
> > > Unfortunately the saveToPhoenix method is only available in Scala. So
> > > I did find the suggestion to try it via the saveAsNewAPIHadoopFile
> > > method [2] and an example implementation [3], which I adapted to my
> > > own needs.
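> > > In outline, that adaptation looks roughly like the sketch below.
> > > EventWritable is a placeholder name for a small class implementing
> > > Writable and DBWritable (like the StockWritable in [3]) that binds an
> > > Event's fields to the UPSERT parameters, and the column list is only
> > > illustrative:
> > >
> > > import org.apache.hadoop.conf.Configuration;
> > > import org.apache.hadoop.hbase.HBaseConfiguration;
> > > import org.apache.hadoop.io.NullWritable;
> > > import org.apache.hadoop.mapreduce.Job;
> > > import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
> > > import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
> > > import org.apache.spark.api.java.JavaPairRDD;
> > > import org.apache.spark.api.java.JavaRDD;
> > > import org.apache.spark.api.java.function.PairFunction;
> > > import scala.Tuple2;
> > >
> > > public static void writeToPhoenix(JavaRDD<Event> events) throws Exception {
> > >     Configuration conf = HBaseConfiguration.create();
> > >     Job job = Job.getInstance(conf);
> > >     // Point PhoenixOutputFormat at the target table and columns.
> > >     PhoenixMapReduceUtil.setOutput(job, "mail_events", "HOST,SUBJECT,EVENT_TIME");
> > >
> > >     // Wrap every event in a DBWritable; PhoenixOutputFormat ignores the key.
> > >     JavaPairRDD<NullWritable, EventWritable> pairs = events.mapToPair(
> > >             new PairFunction<Event, NullWritable, EventWritable>() {
> > >                 @Override
> > >                 public Tuple2<NullWritable, EventWritable> call(Event event) {
> > >                     return new Tuple2<>(NullWritable.get(), new EventWritable(event));
> > >                 }
> > >             });
> > >
> > >     // The path argument is required by the API but unused by PhoenixOutputFormat.
> > >     pairs.saveAsNewAPIHadoopFile("/tmp/ignored", NullWritable.class,
> > >             EventWritable.class, PhoenixOutputFormat.class, job.getConfiguration());
> > > }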
> > > However, 4.4.0 + saveAsNewAPIHadoopFile raises the same
> > > ClassNotFoundException, just with a slightly different stack trace:
> > >
> > > java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
> > >     at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:58)
> > >     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:995)
> > >     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
> > >     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> > >     at org.apache.spark.scheduler.Task.run(Task.scala:64)
> > >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> > >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > >     at java.lang.Thread.run(Thread.java:745)
> > > Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
> > >     at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:386)
> > >     at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145)
> > >     at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:288)
> > >     at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:171)
> > >     at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1881)
> > >     at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1860)
> > >     at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
> > >     at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1860)
> > >     at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
> > >     at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:131)
> > >     at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
> > >     at java.sql.DriverManager.getConnection(DriverManager.java:571)
> > >     at java.sql.DriverManager.getConnection(DriverManager.java:187)
> > >     at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:92)
> > >     at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:80)
> > >     at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:68)
> > >     at org.apache.phoenix.mapreduce.PhoenixRecordWriter.<init>(PhoenixRecordWriter.java:49)
> > >     at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:55)
> > >     ... 8 more
> > > Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
> > >     at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
> > >     at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
> > >     at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
> > >     at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:286)
> > >     ... 23 more
> > > Caused by: java.lang.reflect.InvocationTargetException
> > >     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> > >     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> > >     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > >     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> > >     at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
> > >     ... 26 more
> > > Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
> > >     at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
> > >     at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
> > >     at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
> > >     at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
> > >     ... 31 more
> > > Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
> > >     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > >     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > >     at java.security.AccessController.doPrivileged(Native Method)
> > >     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > >     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > >     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> > >     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > >     at java.lang.Class.forName0(Native Method)
> > >     at java.lang.Class.forName(Class.java:191)
> > >     at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
> > >     ... 34 more
> > >
> > > Driver stacktrace:
> > >     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
> > >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
> > >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
> > >     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > >     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > >     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
> > >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> > >     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
> > >     at scala.Option.foreach(Option.scala:236)
> > >     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
> > >     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
> > >     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
> > >     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> > >
> > > ====== Below is my question from StackOverflow ==========
> > >
> > > I'm trying to connect to Phoenix via Spark, and I keep getting the
> > > following exception when opening a connection via the JDBC driver
> > > (cut for brevity, full stack trace below):
> > >
> > > Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
> > >     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > >     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > >     at java.security.AccessController.doPrivileged(Native Method)
> > >
> > > The class in question is provided by the JAR called
> > > phoenix-core-4.3.1.jar (despite it being in the HBase package
> > > namespace; I guess they need it to integrate with HBase).
> > >
> > > There are numerous questions on SO about ClassNotFoundExceptions on
> > > Spark. I've tried the fat-jar approach (with both Maven's assembly
> > > and shade plugins; I've inspected the jars, they **do** contain
> > > ClientRpcControllerFactory), and I've tried a lean JAR while
> > > specifying the jars on the command line. For the latter, the command
> > > I used is as follows:
> > >
> > > /opt/mapr/spark/spark-1.3.1/bin/spark-submit \
> > >     --jars lib/spark-streaming-kafka_2.10-1.3.1.jar,lib/kafka_2.10-0.8.1.1.jar,lib/zkclient-0.3.jar,lib/metrics-core-3.1.0.jar,lib/metrics-core-2.2.0.jar,lib/phoenix-core-4.3.1.jar \
> > >     --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector \
> > >     KafkaStreamConsumer.jar node1:5181 0 topic jdbc:phoenix:node1:5181 true
> > >
> > > I've also done a classpath dump from within the code, and the first
> > > classloader in the hierarchy already knows the Phoenix JAR:
> > >
> > > 2015-06-04 10:52:34,323 [Executor task launch worker-1] INFO nl.work.kafkastreamconsumer.phoenix.LinePersister -
> > > [file:/home/work/projects/customer/KafkaStreamConsumer.jar,
> > > file:/home/work/projects/customer/lib/spark-streaming-kafka_2.10-1.3.1.jar,
> > > file:/home/work/projects/customer/lib/kafka_2.10-0.8.1.1.jar,
> > > file:/home/work/projects/customer/lib/zkclient-0.3.jar,
> > > file:/home/work/projects/customer/lib/metrics-core-3.1.0.jar,
> > > file:/home/work/projects/customer/lib/metrics-core-2.2.0.jar,
> > > file:/home/work/projects/customer/lib/phoenix-core-4.3.1.jar]
> > >
> > > So the question is: What am I missing here? Why can't Spark load the
> > > correct class? There should be only one version of the class flying
> > > around (namely the one from phoenix-core), so I doubt it's a
> > > versioning conflict.
> > >
> > > [Executor task launch worker-3] ERROR nl.work.kafkastreamconsumer.phoenix.LinePersister - Error while processing line
> > > java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
> > >     at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:41)
> > >     at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:40)
> > >     at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:32)
> > >     at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:999)
> > >     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> > >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > >     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> > >     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> > >     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> > >     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> > >     at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> > >     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> > >     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> > >     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> > >     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> > >     at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> > >     at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> > >     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
> > >     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
> > >     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> > >     at org.apache.spark.scheduler.Task.run(Task.scala:64)
> > >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> > >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > >     at java.lang.Thread.run(Thread.java:745)
> > > Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
> > >     at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:362)
> > >     at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:133)
> > >     at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:282)
> > >     at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:166)
> > >     at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1831)
> > >     at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1810)
> > >     at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
> > >     at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1810)
> > >     at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
> > >     at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:126)
> > >     at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
> > >     at java.sql.DriverManager.getConnection(DriverManager.java:571)
> > >     at java.sql.DriverManager.getConnection(DriverManager.java:233)
> > >     at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:39)
> > >     ... 25 more
> > > Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
> > >     at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
> > >     at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
> > >     at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
> > >     at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:280)
> > >     ... 36 more
> > > Caused by: java.lang.reflect.InvocationTargetException
> > >     at sun.reflect.GeneratedConstructorAccessor8.newInstance(Unknown Source)
> > >     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > >     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> > >     at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
> > >     ... 39 more
> > > Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
> > >     at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
> > >     at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
> > >     at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
> > >     at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
> > >     ... 43 more
> > > Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
> > >     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > >     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > >     at java.security.AccessController.doPrivileged(Native Method)
> > >     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > >     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > >     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> > >     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > >     at java.lang.Class.forName0(Native Method)
> > >     at java.lang.Class.forName(Class.java:191)
> > >     at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
> > >     ... 46 more
> > >
> > > **/edit**
> > >
> > > Unfortunately the issue remains with 4.4.0-HBase-0.98. Below are the
> > > classes in question. Since the saveToPhoenix() method is not yet
> > > available for the Java API, and since this is just a POC, my idea was
> > > to simply use the JDBC driver for each mini-batch.
> > > // Serializable wrapper around a Phoenix JDBC connection.
> > > public class PhoenixConnection implements AutoCloseable, Serializable {
> > >     private static final long serialVersionUID = -4491057264383873689L;
> > >     private static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
> > >
> > >     static {
> > >         // Make sure the Phoenix JDBC driver is registered.
> > >         try {
> > >             Class.forName(PHOENIX_DRIVER);
> > >         } catch (ClassNotFoundException e) {
> > >             throw new RuntimeException(e);
> > >         }
> > >     }
> > >
> > >     private Connection connection;
> > >
> > >     public PhoenixConnection(final String jdbcUri) {
> > >         try {
> > >             connection = DriverManager.getConnection(jdbcUri);
> > >         } catch (SQLException e) {
> > >             throw new RuntimeException(e);
> > >         }
> > >     }
> > >
> > >     public List<Map<String, Object>> executeQuery(final String sql) throws SQLException {
> > >         ArrayList<Map<String, Object>> resultList = new ArrayList<>();
> > >
> > >         try (PreparedStatement statement = connection.prepareStatement(sql);
> > >                 ResultSet resultSet = statement.executeQuery()) {
> > >             ResultSetMetaData metaData = resultSet.getMetaData();
> > >             while (resultSet.next()) {
> > >                 Map<String, Object> row = new HashMap<>(metaData.getColumnCount());
> > >                 // JDBC column indices are 1-based.
> > >                 for (int column = 1; column <= metaData.getColumnCount(); ++column) {
> > >                     final String columnLabel = metaData.getColumnLabel(column);
> > >                     row.put(columnLabel, resultSet.getObject(columnLabel));
> > >                 }
> > >                 resultList.add(row);
> > >             }
> > >         }
> > >         resultList.trimToSize();
> > >
> > >         return resultList;
> > >     }
> > >
> > >     @Override
> > >     public void close() {
> > >         try {
> > >             connection.close();
> > >         } catch (SQLException e) {
> > >             throw new RuntimeException(e);
> > >         }
> > >     }
> > > }
> > >
> > > // Writes every line of an RDD to Phoenix via JDBC.
> > > public class LinePersister implements Function<JavaRDD<String>, Void> {
> > >     private static final long serialVersionUID = -2529724617108874989L;
> > >     private static final Logger LOGGER = Logger.getLogger(LinePersister.class);
> > >     private static final String TABLE_NAME = "mail_events";
> > >
> > >     private final String jdbcUrl;
> > >
> > >     public LinePersister(String jdbcUrl) {
> > >         this.jdbcUrl = jdbcUrl;
> > >     }
> > >
> > >     @Override
> > >     public Void call(JavaRDD<String> dataSet) throws Exception {
> > >         LOGGER.info(String.format("Starting conversion on rdd with %d elements", dataSet.count()));
> > >
> > >         List<Void> collectResult = dataSet.map(new Function<String, Void>() {
> > >             private static final long serialVersionUID = -6651313541439109868L;
> > >
> > >             @Override
> > >             public Void call(String line) throws Exception {
> > >                 LOGGER.info("Writing line " + line);
> > >                 Event event = EventParser.parseLine(line);
> > >                 try (PhoenixConnection connection = new PhoenixConnection(jdbcUrl)) {
> > >                     connection.executeQuery(event.createUpsertStatement(TABLE_NAME));
> > >                 } catch (Exception e) {
> > >                     LOGGER.error("Error while processing line", e);
> > >                     dumpClasspath(this.getClass().getClassLoader());
> > >                 }
> > >                 return null;
> > >             }
> > >         }).collect();
> > >         LOGGER.info(String.format("Got %d results: ", collectResult.size()));
> > >
> > >         return null;
> > >     }
> > >
> > >     // Recursively logs the URLs known to each classloader in the hierarchy.
> > >     public static void dumpClasspath(ClassLoader loader) {
> > >         LOGGER.info("Classloader " + loader + ":");
> > >
> > >         if (loader instanceof URLClassLoader) {
> > >             URLClassLoader ucl = (URLClassLoader) loader;
> > >             LOGGER.info(Arrays.toString(ucl.getURLs()));
> > >         } else {
> > >             LOGGER.error("cannot display components as not a URLClassLoader");
> > >         }
> > >
> > >         if (loader.getParent() != null) {
> > >             dumpClasspath(loader.getParent());
> > >         }
> > >     }
> > > }
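> > > (As an aside: the naive version above opens a new PhoenixConnection
> > > for every single line, which is expensive. A variant that opens one
> > > connection per partition of each mini-batch would replace the
> > > map/collect inside call() with roughly the following sketch, reusing
> > > the same classes:)
> > >
> > > import java.util.Iterator;
> > > import org.apache.spark.api.java.function.VoidFunction;
> > >
> > > dataSet.foreachPartition(new VoidFunction<Iterator<String>>() {
> > >     private static final long serialVersionUID = 1L;
> > >
> > >     @Override
> > >     public void call(Iterator<String> lines) throws Exception {
> > >         // One connection (and one driver classloading attempt) per
> > >         // partition, instead of one per line.
> > >         try (PhoenixConnection connection = new PhoenixConnection(jdbcUrl)) {
> > >             while (lines.hasNext()) {
> > >                 Event event = EventParser.parseLine(lines.next());
> > >                 connection.executeQuery(event.createUpsertStatement(TABLE_NAME));
> > >             }
> > >         }
> > >     }
> > > });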
> > > <?xml version="1.0" encoding="UTF-8"?>
> > > <project xmlns="http://maven.apache.org/POM/4.0.0"
> > >          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> > >          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
> > >     <modelVersion>4.0.0</modelVersion>
> > >     <groupId>nl.work</groupId>
> > >     <artifactId>KafkaStreamConsumer</artifactId>
> > >     <version>1.0</version>
> > >     <packaging>jar</packaging>
> > >     <properties>
> > >         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
> > >         <maven.compiler.source>1.7</maven.compiler.source>
> > >         <maven.compiler.target>1.7</maven.compiler.target>
> > >         <spark.version>1.3.1</spark.version>
> > >         <hibernate.version>4.3.10.Final</hibernate.version>
> > >         <phoenix.version>4.4.0-HBase-0.98</phoenix.version>
> > >         <hbase.version>0.98.9-hadoop2</hbase.version>
> > >         <spark-hbase.version>0.0.2-clabs-spark-1.3.1</spark-hbase.version>
> > >     </properties>
> > >     <dependencies>
> > >         <dependency>
> > >             <groupId>org.apache.spark</groupId>
> > >             <artifactId>spark-core_2.10</artifactId>
> > >             <version>${spark.version}</version>
> > >             <scope>provided</scope>
> > >         </dependency>
> > >         <dependency>
> > >             <groupId>org.apache.spark</groupId>
> > >             <artifactId>spark-streaming_2.10</artifactId>
> > >             <version>${spark.version}</version>
> > >             <scope>provided</scope>
> > >         </dependency>
> > >         <dependency>
> > >             <groupId>org.apache.spark</groupId>
> > >             <artifactId>spark-streaming-kafka_2.10</artifactId>
> > >             <version>${spark.version}</version>
> > >             <scope>provided</scope>
> > >         </dependency>
> > >         <dependency>
> > >             <groupId>org.apache.phoenix</groupId>
> > >             <artifactId>phoenix-core</artifactId>
> > >             <version>${phoenix.version}</version>
> > >             <scope>provided</scope>
> > >         </dependency>
> > >         <dependency>
> > >             <groupId>org.apache.phoenix</groupId>
> > >             <artifactId>phoenix-spark</artifactId>
> > >             <version>${phoenix.version}</version>
> > >             <scope>provided</scope>
> > >         </dependency>
> > >         <dependency>
> > >             <groupId>org.apache.hbase</groupId>
> > >             <artifactId>hbase-client</artifactId>
> > >             <version>${hbase.version}</version>
> > >             <scope>provided</scope>
> > >         </dependency>
> > >         <dependency>
> > >             <groupId>com.cloudera</groupId>
> > >             <artifactId>spark-hbase</artifactId>
> > >             <version>${spark-hbase.version}</version>
> > >             <scope>provided</scope>
> > >         </dependency>
> > >         <dependency>
> > >             <groupId>junit</groupId>
> > >             <artifactId>junit</artifactId>
> > >             <version>4.10</version>
> > >             <scope>test</scope>
> > >         </dependency>
> > >     </dependencies>
> > >     <build>
> > >         <plugins>
> > >             <plugin>
> > >                 <groupId>org.apache.maven.plugins</groupId>
> > >                 <artifactId>maven-compiler-plugin</artifactId>
> > >                 <version>3.3</version>
> > >                 <configuration>
> > >                     <source>${maven.compiler.source}</source>
> > >                     <target>${maven.compiler.target}</target>
> > >                 </configuration>
> > >             </plugin>
> > >             <!--
> > >             <plugin>
> > >                 <groupId>org.apache.maven.plugins</groupId>
> > >                 <artifactId>maven-shade-plugin</artifactId>
> > >                 <version>2.3</version>
> > >                 <executions>
> > >                     <execution>
> > >                         <phase>package</phase>
> > >                         <goals>
> > >                             <goal>shade</goal>
> > >                         </goals>
> > >                         <configuration>
> > >                             <filters>
> > >                                 <filter>
> > >                                     <artifact>*:*</artifact>
> > >                                     <excludes>
> > >                                         <exclude>META-INF/*.SF</exclude>
> > >                                         <exclude>META-INF/*.DSA</exclude>
> > >                                         <exclude>META-INF/*.RSA</exclude>
> > >                                     </excludes>
> > >                                 </filter>
> > >                             </filters>
> > >                         </configuration>
> > >                     </execution>
> > >                 </executions>
> > >             </plugin>
> > >             -->
> > >         </plugins>
> > >     </build>
> > >     <repositories>
> > >         <repository>
> > >             <id>unknown-jars-temp-repo</id>
> > >             <name>A temporary repository created by NetBeans for libraries and jars it could not identify. Please replace the dependencies in this repository with correct ones and delete this repository.</name>
> > >             <url>file:${project.basedir}/lib</url>
> > >         </repository>
> > >     </repositories>
> > > </project>
> > >
> > > Cheers,
> > > Jeroen
> > >
> > > [1] http://stackoverflow.com/questions/30639659/apache-phoenix-4-3-1-and-4-4-0-hbase-0-98-on-spark-1-3-1-classnotfoundexceptio
> > > [2] https://groups.google.com/forum/#!topic/phoenix-hbase-user/pKnvE1pd_K8
> > > [3] https://gist.github.com/mravi/444afe7f49821819c987#file-phoenixsparkjob-java
> > >
> > > ---------------------------------------------------------------------
> > > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > > For additional commands, e-mail: user-h...@spark.apache.org