Hi Jeroen,

No problem. I think there's some magic involved with how the Spark
classloader(s) works, especially with regards to the HBase dependencies. I
know there's probably a more light-weight solution that doesn't require
customizing the Spark setup, but that's the most straight-forward way I've
found that works.

Looking again at the docs, I thought I had a PR that mentioned the
SPARK_CLASSPATH, but either I'm dreaming it or it got dropped on the floor.
I'll search around for it today.

Thanks for the StackOverflow heads up, but feel free to update your post
with the resolution, maybe with a GMane link to the thread?

Good luck,

Josh

On Thu, Jun 11, 2015 at 2:38 AM, Jeroen Vlek <j.v...@anchormen.nl> wrote:

> Hi Josh,
>
> That worked! Thank you so much! (I can't believe it was something so
> obvious
> ;) )
>
> If you care about such a thing you could answer my question here for
> bounty:
>
> http://stackoverflow.com/questions/30639659/apache-phoenix-4-3-1-and-4-4-0-hbase-0-98-on-spark-1-3-1-classnotfoundexceptio
>
> Have a great day!
>
> Cheers,
> Jeroen
>
> On Wednesday 10 June 2015 08:58:02 Josh Mahonin wrote:
> > Hi Jeroen,
> >
> > Rather than bundle the Phoenix client JAR with your app, are you able to
> > include it in a static location either in the SPARK_CLASSPATH, or set the
> > conf values below (I use SPARK_CLASSPATH myself, though it's deprecated):
> >
> >   spark.driver.extraClassPath
> >   spark.executor.extraClassPath
> >
> > Josh
> >
> > On Wed, Jun 10, 2015 at 4:11 AM, Jeroen Vlek <j.v...@anchormen.nl>
> wrote:
> > > Hi Josh,
> > >
> > > Thank you for your effort. Looking at your code, I feel that mine is
> > > semantically the same, except written in Java. The dependencies in the
> > > pom.xml
> > > all have the scope provided. The job is submitted as follows:
> > >
> > > $ rm spark.log && MASTER=spark://maprdemo:7077
> > > /opt/mapr/spark/spark-1.3.1/bin/spark-submit-jars
> > > /home/mapr/projects/customer/lib/spark-streaming-
> > >
> > >
> kafka_2.10-1.3.1.jar,/home/mapr/projects/customer/lib/kafka_2.10-0.8.1.1.j
> > >
> ar,/home/mapr/projects/customer/lib/zkclient-0.3.jar,/home/mapr/projects/c
> > > ustomer/lib/metrics-
> > > core-3.1.0.jar,/home/mapr/projects/customer/lib/metrics-
> > >
> > >
> core-2.2.0.jar,lib/spark-sql_2.10-1.3.1.jar,/opt/mapr/phoenix/phoenix-4.4.
> > > 0- HBase-0.98-bin/phoenix-4.4.0-HBase-0.98-client.jar --class
> > > nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector
> > > KafkaStreamConsumer.jar maprdemo:5181 0 topic
> jdbc:phoenix:maprdemo:5181
> > > true
> > >
> > > The spark-defaults.conf is reverted back to its defaults (i.e. no
> > > userClassPathFirst). In the catch-block of the Phoenix connection
> buildup
> > > the
> > > class path is printed by recursively iterating over the class loaders.
> The
> > > first one already prints the phoenix-client jar [1]. It's also very
> > > unlikely to
> > > be a bug in Spark or Phoenix, if your proof-of-concept just works.
> > >
> > > So if the JAR that contains the offending class is known by the class
> > > loader,
> > > then that might indicate that there's a second JAR providing the same
> > > class
> > > but with a different version, right?
> > > Yet, the only Phoenix JAR on the whole class path hierarchy is the
> > > aforementioned phoenix-client JAR. Furthermore, I googled the class in
> > > question, ClientRpcControllerFactory, and it really only exists in the
> > > Phoenix
> > > project. We're not talking about some low-level AOP Alliance stuff
> here ;)
> > >
> > > Maybe I'm missing some fundamental class loading knowledge, in that
> case
> > > I'd
> > > be very happy to be enlightened. This all seems very strange.
> > >
> > > Cheers,
> > > Jeroen
> > >
> > > [1]
> > >
> [file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./spark-
> > > streaming-kafka_2.10-1.3.1.jar,
> > >
> > >
> file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./kafka_2.1
> > > 0-0.8.1.1.jar,
> > >
> > >
> file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./zkclient-
> > > 0.3.jar,
> > >
> > >
> file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./phoenix-4
> > > .4.0- HBase-0.98-client.jar,
> > > file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./spark-
> > > sql_2.10-1.3.1.jar,
> > >
> file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./metrics-
> > > core-3.1.0.jar,
> > >
> > >
> file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./KafkaStre
> > > amConsumer.jar,
> > >
> file:/opt/mapr/spark/spark-1.3.1/tmp/app-20150610010512-0001/0/./metrics-
> > > core-2.2.0.jar]
> > >
> > > On Tuesday, June 09, 2015 11:18:08 AM Josh Mahonin wrote:
> > > > This may or may not be helpful for your classpath issues, but I
> wanted
> > > > to
> > > > verify that basic functionality worked, so I made a sample app here:
> > > >
> > > > https://github.com/jmahonin/spark-streaming-phoenix
> > > >
> > > > This consumes events off a Kafka topic using spark streaming, and
> writes
> > > > out event counts to Phoenix using the new phoenix-spark
> functionality:
> > > > http://phoenix.apache.org/phoenix_spark.html
> > > >
> > > > It's definitely overkill, and would probably be more efficient to use
> > > > the
> > > > JDBC driver directly, but it serves as a proof-of-concept.
> > > >
> > > > I've only tested this in local mode. To convert it to a full jobs
> JAR, I
> > > > suspect that keeping all of the spark and phoenix dependencies
> marked as
> > > > 'provided', and including the Phoenix client JAR in the Spark
> classpath
> > > > would work as well.
> > > >
> > > > Good luck,
> > > >
> > > > Josh
> > > >
> > > > On Tue, Jun 9, 2015 at 4:40 AM, Jeroen Vlek <j.v...@work.nl> wrote:
> > > > > Hi,
> > > > >
> > > > > I posted a question with regards to Phoenix and Spark Streaming on
> > > > > StackOverflow [1]. Please find a copy of the question to this email
> > >
> > > below
> > >
> > > > > the
> > > > > first stack trace. I also already contacted the Phoenix mailing
> list
> > >
> > > and
> > >
> > > > > tried
> > > > > the suggestion of setting spark.driver.userClassPathFirst.
> > >
> > > Unfortunately
> > >
> > > > > that
> > > > > only pushed me further into the dependency hell, which I tried to
> > >
> > > resolve
> > >
> > > > > until I hit a wall with an UnsatisfiedLinkError on Snappy.
> > > > >
> > > > > What I am trying to achieve: To save a stream from Kafka into
> > > > > Phoenix/Hbase
> > > > > via Spark Streaming. I'm using MapR as a platform and the original
> > > > > exception
> > > > > happens both on a 3-node cluster, as on the MapR Sandbox (a VM for
> > > > > experimentation), in YARN and stand-alone mode. Further
> > > > > experimentation
> > > > > (like
> > > > > the saveAsNewHadoopApiFile below), was done only on the sandbox in
> > > > > standalone
> > > > > mode.
> > > > >
> > > > > Phoenix only supports Spark from 4.4.0 onwards, but I thought I
> could
> > > > > use a naive implementation that creates a new connection for
> > > > > every RDD from the DStream in 4.3.1.  This resulted in the
> > > > > ClassNotFoundException described in [1], so I switched to 4.4.0.
> > > > >
> > > > > Unfortunately the saveToPhoenix method is only available in Scala.
> So
> > > > > I
> > > > > did
> > > > > find the suggestion to try it via the saveAsNewHadoopApiFile method
> > > > > [2]
> > > > > and an
> > > > > example implementation [3], which I adapted to my own needs.
> > > > >
> > > > > However, 4.4.0 + saveAsNewHadoopApiFile  raises the same
> > > > >
> > > > > ClassNotFoundExeption, just a slightly different stacktrace:
> > > > >   java.lang.RuntimeException: java.sql.SQLException: ERROR 103
> > > > >
> > > > > (08004): Unable to establish connection.
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOu
> > >
> > > > > tputFormat.java:58)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.s
> > >
> > > > > cala:995)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.s
> > >
> > > > > cala:979)>
> > > > >
> > > > >         at
> > > > >
> > > > > org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> > > > >
> > > > >         at org.apache.spark.scheduler.Task.run(Task.scala:64)
> > > > >         at
> > > > >
> > > > >
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> > > > >
> > > > >         at
> > >
> > >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:
> > > > > 1145)>
> > > > >
> > > > >         at
> > >
> > >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java
> > >
> > > > > :615)>
> > > > > :
> > > > >         at java.lang.Thread.run(Thread.java:745)
> > > > >
> > > > > Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to
> > > > > establish connection.
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLEx
> > >
> > > > > ceptionCode.java:386)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionI
> > >
> > > > > nfo.java:145)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(Connec
> > >
> > > > > tionQueryServicesImpl.java:288)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(Connection
> > >
> > > > > QueryServicesImpl.java:171)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQue
> > >
> > > > > ryServicesImpl.java:1881)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQue
> > >
> > > > > ryServicesImpl.java:1860)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor
> > >
> > > > > .java:77)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryS
> > >
> > > > > ervicesImpl.java:1860)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDr
> > >
> > > > > iver.java:162)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDrive
> > >
> > > > > r.java:131)>
> > > > >
> > > > >         at
> > > > >
> > > > >
> org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
> > > > >
> > > > >         at
> > > > >
> > > > > java.sql.DriverManager.getConnection(DriverManager.java:571)
> > > > >
> > > > >         at
> > > > >
> > > > > java.sql.DriverManager.getConnection(DriverManager.java:187)
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionU
> > >
> > > > > til.java:92)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(Conne
> > >
> > > > > ctionUtil.java:80)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(Conne
> > >
> > > > > ctionUtil.java:68)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.mapreduce.PhoenixRecordWriter.<init>(PhoenixRecordWrite
> > >
> > > > > r.java:49)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOu
> > >
> > > > > tputFormat.java:55)>
> > > > >
> > > > >         ... 8 more
> > > > >
> > > > > Caused by: java.io.IOException:
> > > > > java.lang.reflect.InvocationTargetException
> > > > >
> > > > >         at
> > >
> > >
> org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnec
> > >
> > > > > tionManager.java:457)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnec
> > >
> > > > > tionManager.java:350)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createC
> > >
> > > > > onnection(HConnectionFactory.java:47)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(Connec
> > >
> > > > > tionQueryServicesImpl.java:286)>
> > > > >
> > > > >         ... 23 more
> > > > >
> > > > > Caused by: java.lang.reflect.InvocationTargetException
> > > > >
> > > > >         at
> > > > >
> > > > > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > > > > Method)
> > > > >
> > > > >         at
> > >
> > >
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAcc
> > >
> > > > > essorImpl.java:57)>
> > > > >
> > > > >         at
> > >
> > >
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstr
> > >
> > > > > uctorAccessorImpl.java:45)>
> > > > >
> > > > >         at
> > > > >
> > > > > java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> > > > >
> > > > >         at
> > >
> > >
> org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnec
> > >
> > > > > tionManager.java:455)>
> > > > >
> > > > >         ... 26 more
> > > > >
> > > > > Caused by: java.lang.UnsupportedOperationException: Unable to find
> > > > > org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
> > > > >
> > > > >         at
> > >
> > >
> org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(Ref
> > >
> > > > > lectionUtils.java:36)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcController
> > >
> > > > > Factory.java:56)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementatio
> > >
> > > > > n.<init>(HConnectionManager.java:769)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementatio
> > >
> > > > > n.<init>(HConnectionManager.java:689)>
> > > > >
> > > > >         ... 31 more
> > > > >
> > > > > Caused by: java.lang.ClassNotFoundException:
> > > > > org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
> > > > >
> > > > >         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > > > >         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > > > >         at java.security.AccessController.doPrivileged(Native
> Method)
> > > > >         at
> java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > > > >         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > > > >         at
> > > > >
> > > > > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> > > > >
> > > > >         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > > > >         at java.lang.Class.forName0(Native Method)
> > > > >         at java.lang.Class.forName(Class.java:191)
> > > > >         at
> > >
> > >
> org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(Ref
> > >
> > > > > lectionUtils.java:32)>
> > > > >
> > > > >         ... 34 more
> > > > >
> > > > > Driver stacktrace:
> > > > >         at
> > > > >
> > > > > org.apache.spark.scheduler.DAGScheduler.org
> > >
> > >
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGSched
> > >
> > > > > uler.scala:1204)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGSch
> > >
> > > > > eduler.scala:1193)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGSch
> > >
> > > > > eduler.scala:1192)>
> > > > >
> > > > >         at
> > >
> > >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala
> > >
> > > > > :59)>
> > > > > :
> > > > >         at
> > > > >
> > > > > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > > > >
> > > > >         at
> > >
> > >
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192
> > >
> > > > > )
> > > > >
> > > > >         at
> > >
> > >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.app
> > >
> > > > > ly(DAGScheduler.scala:693)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.app
> > >
> > > > > ly(DAGScheduler.scala:693)>
> > > > >
> > > > >         at scala.Option.foreach(Option.scala:236)
> > > > >         at
> > >
> > >
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.s
> > >
> > > > > cala:693)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGSched
> > >
> > > > > uler.scala:1393)>
> > > > >
> > > > >         at
> > >
> > >
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGSched
> > >
> > > > > uler.scala:1354)>
> > > > >
> > > > >         at
> > > > >
> > > > > org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> > > > >
> > > > >
> > > > > ====== Below is my question from StackOverflow ==========
> > > > >
> > > > > I'm trying to connect to Phoenix via Spark and I keep getting the
> > > > > following exception when opening a connection via the JDBC driver
> (cut
> > > > >
> > > > > for brevity, full stacktrace below):
> > > > >     Caused by: java.lang.ClassNotFoundException:
> > > > > org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
> > > > >
> > > > >             at
> java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > > > >             at
> java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > > > >             at java.security.AccessController.doPrivileged(Native
> > >
> > > Method)
> > >
> > > > > The class in question is provided by the jar called phoenix-
> > > > > core-4.3.1.jar (despite it being in the HBase package namespace, I
> > > > > guess they need it to integrate with HBase).
> > > > >
> > > > > There are numerous questions on SO about ClassNotFoundExceptions
> > > > > on Spark and I've tried the fat-jar approach (both with Maven's
> > > > > assembly and shade plugins; I've inspected the jars, they **do**
> > > > > contain ClientRpcControllerFactory), and I've tried a lean jar
> while
> > > > > specifying the jars on the command line. For the latter, the
> command
> > > > >
> > > > > I used is as follows:
> > > > >     /opt/mapr/spark/spark-1.3.1/bin/spark-submit --jars lib/spark-
> > > > >
> > > > > streaming-
> > >
> > >
> kafka_10-1.3.1.jar,lib/kafka_2.10-0.8.1.1.jar,lib/zkclient-0.3.jar,lib/met
> > >
> > > > > rics-
> > >
> > > core-3.1.0.jar,lib/metrics-core-2.2.0.jar,lib/phoenix-core-4.3.1.jar
> > >
> > > > > -- class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector
> > > > > KafkaStreamConsumer.jar node1:5181 0 topic
> > > > > jdbc:phoenix:node1:5181 true
> > > > >
> > > > > I've also done a classpath dump from within the code and the first
> > > > >
> > > > > classloader in the hierarchy already knows the Phoenix jar:
> > > > >     2015-06-04 10:52:34,323 [Executor task launch worker-1] INFO
> > > > >
> > > > > nl.work.kafkastreamconsumer.phoenix.LinePersister -
> > > > > [file:/home/work/projects/customer/KafkaStreamConsumer.jar,
> > > > > file:/home/work/projects/customer/lib/spark-streaming-
> > > > > kafka_2.10-1.3.1.jar,
> > > > > file:/home/work/projects/customer/lib/kafka_2.10-0.8.1.1.jar,
> > > > > file:/home/work/projects/customer/lib/zkclient-0.3.jar,
> > > > > file:/home/work/projects/customer/lib/metrics-core-3.1.0.jar,
> > > > > file:/home/work/projects/customer/lib/metrics-core-2.2.0.jar,
> > > > > file:/home/work/projects/customer/lib/phoenix-core-4.3.1.jar]
> > > > >
> > > > > So the question is: What am I missing here? Why can't Spark load
> the
> > > > > correct class? There should be only one version of the class flying
> > > > > around (namely the one from phoenix-core), so I doubt it's a
> > > > > versioning conflict.
> > > > >
> > > > >     [Executor task launch worker-3] ERROR
> > > > >
> > > > > nl.work.kafkastreamconsumer.phoenix.LinePersister - Error while
> > > > > processing line
> > > > >
> > > > >     java.lang.RuntimeException: java.sql.SQLException: ERROR 103
> > > > >
> > > > > (08004): Unable to establish connection.
> > > > >
> > > > >             at
> > >
> > >
> nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnec
> > >
> > > > > tion.java:41)>
> > > > >
> > > > >             at
> > >
> > >
> nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.jav
> > >
> > > > > a:40)>
> > > > >
> > > > >             at
> > >
> > >
> nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.jav
> > >
> > > > > a:32)>
> > > > >
> > > > >             at
> > >
> > >
> org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(Jav
> > >
> > > > > aPairRDD.scala:999)>
> > > > >
> > > > >             at
> > >
> > > scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> > >
> > > > >             at
> > >
> > > scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > >
> > > > >             at
> > > > >
> > > > > scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > > > >
> > > > >             at scala.collection.generic.Growable$class.
> > > > >
> > > > > $plus$plus$eq(Growable.scala:48)
> > > > >
> > > > >             at scala.collection.mutable.ArrayBuffer.
> > > > >
> > > > > $plus$plus$eq(ArrayBuffer.scala:103)
> > > > >
> > > > >             at scala.collection.mutable.ArrayBuffer.
> > > > >
> > > > > $plus$plus$eq(ArrayBuffer.scala:47)
> > > > >
> > > > >             at
> > > > >
> > > > > scala.collection.TraversableOnce$class.to
> (TraversableOnce.scala:273)
> > > > >
> > > > >             at scala.collection.AbstractIterator.to
> > >
> > > (Iterator.scala:1157)
> > >
> > > > >             at
> > >
> > >
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> > >
> > > > >             at
> > > > >
> > > > > scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> > > > >
> > > > >             at
> > >
> > >
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> > >
> > > > >             at
> > > > >
> > > > > scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> > > > >
> > > > >             at
> > > > >
> > > > > org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> > > > >
> > > > >             at
> > > > >
> > > > > org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
> > > > >
> > > > >             at
> > >
> > >
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1
> > >
> > > > > 498)>
> > > > >
> > > > >             at
> > >
> > >
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1
> > >
> > > > > 498)>
> > > > >
> > > > >             at
> > > > >
> > > > > org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> > > > >
> > > > >             at org.apache.spark.scheduler.Task.run(Task.scala:64)
> > > > >             at
> > > > >
> > > > >
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> > > > >
> > > > >             at
> > >
> > >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:
> > > > > 1145)>
> > > > >
> > > > >             at
> > >
> > >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java
> > >
> > > > > :615)>
> > > > > :
> > > > >             at java.lang.Thread.run(Thread.java:745)
> > > > >
> > > > >     Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to
> > > > >
> > > > > establish connection.
> > > > >
> > > > >             at
> > >
> > >
> org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLEx
> > >
> > > > > ceptionCode.java:362)>
> > > > >
> > > > >             at
> > >
> > >
> org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionI
> > >
> > > > > nfo.java:133)>
> > > > >
> > > > >             at
> > >
> > >
> org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(Connec
> > >
> > > > > tionQueryServicesImpl.java:282)>
> > > > >
> > > > >             at
> > >
> > >
> org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(Connection
> > >
> > > > > QueryServicesImpl.java:166)>
> > > > >
> > > > >             at
> > >
> > >
> org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQue
> > >
> > > > > ryServicesImpl.java:1831)>
> > > > >
> > > > >             at
> > >
> > >
> org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQue
> > >
> > > > > ryServicesImpl.java:1810)>
> > > > >
> > > > >             at
> > >
> > >
> org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor
> > >
> > > > > .java:77)>
> > > > >
> > > > >             at
> > >
> > >
> org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryS
> > >
> > > > > ervicesImpl.java:1810)>
> > > > >
> > > > >             at
> > >
> > >
> org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDr
> > >
> > > > > iver.java:162)>
> > > > >
> > > > >             at
> > >
> > >
> org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDrive
> > >
> > > > > r.java:126)>
> > > > >
> > > > >             at
> > > > >
> > > > >
> org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
> > > > >
> > > > >             at
> > > > >
> > > > > java.sql.DriverManager.getConnection(DriverManager.java:571)
> > > > >
> > > > >             at
> > > > >
> > > > > java.sql.DriverManager.getConnection(DriverManager.java:233)
> > > > >
> > > > >             at
> > >
> > >
> nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnec
> > >
> > > > > tion.java:39)>
> > > > >
> > > > >             ... 25 more
> > > > >
> > > > >     Caused by: java.io.IOException:
> > > > > java.lang.reflect.InvocationTargetException
> > > > >
> > > > >             at
> > >
> > >
> org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnec
> > >
> > > > > tionManager.java:457)>
> > > > >
> > > > >             at
> > >
> > >
> org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnec
> > >
> > > > > tionManager.java:350)>
> > > > >
> > > > >             at
> > >
> > >
> org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createC
> > >
> > > > > onnection(HConnectionFactory.java:47)>
> > > > >
> > > > >             at
> > >
> > >
> org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(Connec
> > >
> > > > > tionQueryServicesImpl.java:280)>
> > > > >
> > > > >             ... 36 more
> > > > >
> > > > >     Caused by: java.lang.reflect.InvocationTargetException
> > > > >
> > > > >             at
> > > > >
> > > > > sun.reflect.GeneratedConstructorAccessor8.newInstance(Unknown
> > > > > Source)
> > > > >
> > > > >             at
> > >
> > >
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstr
> > >
> > > > > uctorAccessorImpl.java:45)>
> > > > >
> > > > >             at
> > > > >
> > > > > java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> > > > >
> > > > >             at
> > >
> > >
> org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnec
> > >
> > > > > tionManager.java:455)>
> > > > >
> > > > >             ... 39 more
> > > > >
> > > > >     Caused by: java.lang.UnsupportedOperationException: Unable to
> > > > >
> > > > > find
> > > > > org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
> > > > >
> > > > >             at
> > >
> > >
> org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(Ref
> > >
> > > > > lectionUtils.java:36)>
> > > > >
> > > > >             at
> > >
> > >
> org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcController
> > >
> > > > > Factory.java:56)>
> > > > >
> > > > >             at
> > >
> > >
> org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementatio
> > >
> > > > > n.<init>(HConnectionManager.java:769)>
> > > > >
> > > > >             at
> > >
> > >
> org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementatio
> > >
> > > > > n.<init>(HConnectionManager.java:689)>
> > > > >
> > > > >             ... 43 more
> > > > >
> > > > >     Caused by: java.lang.ClassNotFoundException:
> > > > > org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
> > > > >
> > > > >             at
> java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > > > >             at
> java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > > > >             at java.security.AccessController.doPrivileged(Native
> > >
> > > Method)
> > >
> > > > >             at
> > > > >
> > > > > java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > > > >
> > > > >             at
> java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > > > >             at
> > > > >
> > > > > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> > > > >
> > > > >             at
> java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > > > >             at java.lang.Class.forName0(Native Method)
> > > > >             at java.lang.Class.forName(Class.java:191)
> > > > >             at
> > >
> > >
> org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(Ref
> > >
> > > > > lectionUtils.java:32)>
> > > > >
> > > > >             ... 46 more
> > > > >
> > > > > **/edit**
> > > > >
> > > > > Unfortunately the issue remains with 4.4.0-HBase-0.98. Below are
> the
> > > > > classes in question. Since the saveToPhoenix() method is not yet
> > > > > available for the Java API and since this is just a POC, my idea
> was
> > > > > to
> > > > > simply use the JDBC driver for each mini-batch.
> > > > >
> > > > >     public class PhoenixConnection implements AutoCloseable,
> > > > >
> > > > > Serializable {
> > > > >
> > > > >         private static final long serialVersionUID =
> > > > >
> > > > > -4491057264383873689L;
> > > > >
> > > > >         private static final String PHOENIX_DRIVER =
> > > > >
> > > > > "org.apache.phoenix.jdbc.PhoenixDriver";
> > > > >
> > > > >         static {
> > > > >
> > > > >                 try {
> > > > >
> > > > >                         Class.forName(PHOENIX_DRIVER);
> > > > >
> > > > >                 } catch (ClassNotFoundException e) {
> > > > >
> > > > >                         throw new RuntimeException(e);
> > > > >
> > > > >                 }
> > > > >
> > > > >         }
> > > > >
> > > > >         private Connection connection;
> > > > >
> > > > >         public PhoenixConnection(final String jdbcUri) {
> > > > >
> > > > >                 try {
> > > > >
> > > > >                         connection =
> > >
> > > DriverManager.getConnection(jdbcUri);
> > >
> > > > >                 } catch (SQLException e) {
> > > > >
> > > > >                         throw new RuntimeException(e);
> > > > >
> > > > >                 }
> > > > >
> > > > >         }
> > > > >
> > > > >         public List<Map<String, Object>> executeQuery(final String
> > > > >         sql)
> > > > >
> > > > > throws SQLException {
> > > > >
> > > > >                 ArrayList<Map<String, Object>> resultList = new
> > > > >
> > > > > ArrayList<>();
> > > > >
> > > > >                 try (PreparedStatement statement =
> > > > >
> > > > > connection.prepareStatement(sql); ResultSet resultSet =
> > > > > statement.executeQuery() ) {
> > > > >
> > > > >                         ResultSetMetaData metaData =
> > > > >
> > > > > resultSet.getMetaData();
> > > > >
> > > > >                         while (resultSet.next()) {
> > > > >
> > > > >                                 Map<String, Object> row = new
> > > > >
> > > > > HashMap<>(metaData.getColumnCount());
> > > > >
> > > > >                                 for (int column = 0; column <
> > > > >
> > > > > metaData.getColumnCount(); ++column) {
> > > > >
> > > > >                                         final String columnLabel =
> > > > >
> > > > > metaData.getColumnLabel(column);
> > > > >
> > > > >                                         row.put(columnLabel,
> > > > >
> > > > > resultSet.getObject(columnLabel));
> > > > >
> > > > >                                 }
> > > > >
> > > > >                         }
> > > > >
> > > > >                 }
> > > > >                 resultList.trimToSize();
> > > > >
> > > > >                 return resultList;
> > > > >
> > > > >         }
> > > > >
> > > > >         @Override
> > > > >         public void close() {
> > > > >
> > > > >                 try {
> > > > >
> > > > >                         connection.close();
> > > > >
> > > > >                 } catch (SQLException e) {
> > > > >
> > > > >                         throw new RuntimeException(e);
> > > > >
> > > > >                 }
> > > > >
> > > > >         }
> > > > >
> > > > >     }
> > > > >
> > > > >     public class LinePersister implements Function<JavaRDD<String>,
> > > > >
> > > > > Void> {
> > > > >
> > > > >         private static final long serialVersionUID =
> > > > >
> > > > > -2529724617108874989L;
> > > > >
> > > > >         private static final Logger LOGGER =
> > > > >
> > > > > Logger.getLogger(LinePersister.class);
> > > > >
> > > > >         private static final String TABLE_NAME = "mail_events";
> > > > >
> > > > >         private final String jdbcUrl;
> > > > >
> > > > >         public LinePersister(String jdbcUrl) {
> > > > >
> > > > >                 this.jdbcUrl = jdbcUrl;
> > > > >
> > > > >         }
> > > > >
> > > > >
> > > > >
> > > > >         @Override
> > > > >         public Void call(JavaRDD<String> dataSet) throws Exception
> {
> > > > >
> > > > >                 LOGGER.info(String.format(
> > > > >
> > > > >                                 "Starting conversion on rdd with %d
> > > > >
> > > > > elements",
> > > > > dataSet.count()));
> > > > >
> > > > >                 List<Void> collectResult = dataSet.map(new
> > > > >
> > > > > Function<String, Void>() {
> > > > >
> > > > >                         private static final long serialVersionUID
> =
> > > > >
> > > > > -6651313541439109868L;
> > > > >
> > > > >                         @Override
> > > > >                         public Void call(String line) throws
> Exception
> > >
> > > {
> > >
> > > > >                                 LOGGER.info("Writing line " +
> line);
> > > > >                                 Event event =
> > >
> > > EventParser.parseLine(line);
> > >
> > > > >                                 try (PhoenixConnection connection =
> > > > >                                 new
> > > > >
> > > > > PhoenixConnection(
> > > > >
> > > > >                                                 jdbcUrl)) {
> > > > >
> > > > >
>  connection.executeQuery(event
> > > > >
> > > > > .createUpsertStatement(TABLE_NAME));
> > > > >
> > > > >                                 } catch (Exception e) {
> > > > >
> > > > >                                         LOGGER.error("Error while
> > > > >
> > > > > processing line",
> > > > > e);
> > > > >
> > > > > dumpClasspath(this.getClass().getClassLoader());
> > > > >
> > > > >                                 }
> > > > >                                 return null;
> > > > >
> > > > >                         }
> > > > >
> > > > >                 }).collect();
> > > > >
> > > > >                 LOGGER.info(String.format("Got %d results: ",
> > > > >
> > > > > collectResult.size()));
> > > > >
> > > > >                 return null;
> > > > >
> > > > >         }
> > > > >
> > > > >         public static void dumpClasspath(ClassLoader loader)
> > > > >         {
> > > > >
> > > > >             LOGGER.info("Classloader " + loader + ":");
> > > > >
> > > > >             if (loader instanceof URLClassLoader)
> > > > >             {
> > > > >
> > > > >                 URLClassLoader ucl = (URLClassLoader)loader;
> > > > >                 LOGGER.info(Arrays.toString(ucl.getURLs()));
> > > > >
> > > > >             }
> > > > >             else
> > > > >
> > > > >                 LOGGER.error("cannot display components as not a
> > > > >
> > > > > URLClassLoader)");
> > > > >
> > > > >             if (loader.getParent() != null)
> > > > >
> > > > >                 dumpClasspath(loader.getParent());
> > > > >
> > > > >         }
> > > > >
> > > > >     }
> > > > >
> > > > >     <?xml version="1.0" encoding="UTF-8"?>
> > > > >     <project xmlns="http://maven.apache.org/POM/4.0.0";
> > > > >
> > > > > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
> > > > >
> > > > >         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> > > > >
> > > > > http://maven.apache.org/xsd/maven-4.0.0.xsd";>
> > > > >
> > > > >         <modelVersion>4.0.0</modelVersion>
> > > > >         <groupId>nl.work</groupId>
> > > > >         <artifactId>KafkaStreamConsumer</artifactId>
> > > > >         <version>1.0</version>
> > > > >         <packaging>jar</packaging>
> > > > >         <properties>
> > > > >
> > > > > <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
> > > > >
> > > > >                 <maven.compiler.source>1.7</maven.compiler.source>
> > > > >                 <maven.compiler.target>1.7</maven.compiler.target>
> > > > >                 <spark.version>1.3.1</spark.version>
> > > > >                 <hibernate.version>4.3.10.Final</hibernate.version>
> > > > >                 <phoenix.version>4.4.0-HBase-0.98</phoenix.version>
> > > > >                 <hbase.version>0.98.9-hadoop2</hbase.version>
> > > > >
>  <spark-hbase.version>0.0.2-clabs-spark-1.3.1</spark-
> > > > >
> > > > > hbase.version>
> > > > >
> > > > >         </properties>
> > > > >         <dependencies>
> > > > >
> > > > >                 <dependency>
> > > > >
> > > > >                         <groupId>org.apache.spark</groupId>
> > > > >                         <artifactId>spark-core_2.10</artifactId>
> > > > >                         <version>${spark.version}</version>
> > > > >                         <scope>provided</scope>
> > > > >
> > > > >                 </dependency>
> > > > >                 <dependency>
> > > > >
> > > > >                         <groupId>org.apache.spark</groupId>
> > > > >
>  <artifactId>spark-streaming_2.10</artifactId>
> > > > >                         <version>${spark.version}</version>
> > > > >                         <scope>provided</scope>
> > > > >
> > > > >                 </dependency>
> > > > >                 <dependency>
> > > > >
> > > > >                         <groupId>org.apache.spark</groupId>
> > >
> > >  <artifactId>spark-streaming-kafka_2.10</artifactId
> > >
> > > > >                         <version>${spark.version}</version>
> > > > >                         <scope>provided</scope>
> > > > >
> > > > >                 </dependency>
> > > > >                 <dependency>
> > > > >
> > > > >                         <groupId>org.apache.phoenix</groupId>
> > > > >                         <artifactId>phoenix-core</artifactId>
> > > > >                         <version>${phoenix.version}</version>
> > > > >                         <scope>provided</scope>
> > > > >
> > > > >                 </dependency>
> > > > >                 <dependency>
> > > > >
> > > > >                         <groupId>org.apache.phoenix</groupId>
> > > > >                         <artifactId>phoenix-spark</artifactId>
> > > > >                         <version>${phoenix.version}</version>
> > > > >                         <scope>provided</scope>
> > > > >
> > > > >                 </dependency>
> > > > >                 <dependency>
> > > > >
> > > > >                         <groupId>org.apache.hbase</groupId>
> > > > >                         <artifactId>hbase-client</artifactId>
> > > > >                         <version>${hbase.version}</version>
> > > > >                         <scope>provided</scope>
> > > > >
> > > > >                 </dependency>
> > > > >                 <dependency>
> > > > >
> > > > >                         <groupId>com.cloudera</groupId>
> > > > >                         <artifactId>spark-hbase</artifactId>
> > > > >                         <version>${spark-hbase.version}</version>
> > > > >                         <scope>provided</scope>
> > > > >
> > > > >                 </dependency>
> > > > >                 <dependency>
> > > > >
> > > > >                         <groupId>junit</groupId>
> > > > >                         <artifactId>junit</artifactId>
> > > > >                         <version>4.10</version>
> > > > >                         <scope>test</scope>
> > > > >
> > > > >                 </dependency>
> > > > >
> > > > >         </dependencies>
> > > > >         <build>
> > > > >
> > > > >                 <plugins>
> > > > >
> > > > >                         <plugin>
> > >
> > >  <groupId>org.apache.maven.plugins</groupId
> > >
> > > > > <artifactId>maven-compiler-plugin</artifactId>
> > > > >
> > > > >                                 <version>3.3</version>
> > > > >                                 <configuration>
> > > > >
> > > > > <source>${maven.compiler.source}</source>
> > > > >
> > > > > <target>${maven.compiler.target}</target>
> > > > >
> > > > >                                 </configuration>
> > > > >
> > > > >                         </plugin>
> > > > >                         <!-- <plugin>
> > > > >
> > > > > <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-
> > > > > shade-plugin</artifactId>
> > > > >
> > > > >                                 <version>2.3</version> <executions>
> > > > >
> > > > > <execution> <phase>package</phase> <goals>
> > > > >
> > > > >                                 <goal>shade</goal> </goals>
> > > > >                                 <configuration>
> > > > >
> > > > > <filters> <filter> <artifact>*:*</artifact>
> > > > >
> > > > >                                 <excludes>
> > > > >                                 <exclude>META-INF/*.SF</exclude>
> > > > >
> > > > > <exclude>META-INF/*.DSA</exclude>
> > > > >
> > > > >                                 <exclude>META-INF/*.RSA</exclude>
> > > > >
> > > > > </excludes> </filter> </filters> </configuration>
> > > > >
> > > > >                                 </execution> </executions>
> </plugin>
> > >
> > > -->
> > >
> > > > >                 </plugins>
> > > > >
> > > > >         </build>
> > > > >         <repositories>
> > > > >
> > > > >                 <repository>
> > > > >
> > > > >                         <id>unknown-jars-temp-repo</id>
> > > > >                         <name>A temporary repository created by
> > >
> > > NetBeans
> > >
> > > > > for libraries and jars it could not identify. Please replace the
> > > > > dependencies in this repository with correct ones and delete this
> > > > > repository.</name>
> > > > >
> > > > >                         <url>file:${project.basedir}/lib</url>
> > > > >
> > > > >                 </repository>
> > > > >
> > > > >         </repositories>
> > > > >
> > > > >     </project>
> > > > >
> > > > > Cheers,
> > > > > Jeroen
> > > > >
> > > > >
> > > > > [1]
> > >
> > >
> http://stackoverflow.com/questions/30639659/apache-phoenix-4-3-1-and-4-4-0
> > >
> > > > > -hbase-0-98-on-spark-1-3-1-classnotfoundexceptio [2]
> > > > >
> https://groups.google.com/forum/#!topic/phoenix-hbase-user/pKnvE1pd_K8
> > > > > [3]
> > >
> > >
> https://gist.github.com/mravi/444afe7f49821819c987#file-phoenixsparkjob-ja
> > >
> > > > > va
> > > > >
> > > > >
> ---------------------------------------------------------------------
> > > > > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > > > > For additional commands, e-mail: user-h...@spark.apache.org
>

Reply via email to