Hi,

I posted a question about Phoenix and Spark Streaming on StackOverflow [1]. A copy of that question is included in this email, below the first stack trace. I have also contacted the Phoenix mailing list and tried their suggestion of setting spark.driver.userClassPathFirst. Unfortunately that only pushed me further into dependency hell, which I tried to resolve until I hit a wall with an UnsatisfiedLinkError on Snappy.

What I am trying to achieve: save a stream from Kafka into Phoenix/HBase via Spark Streaming. I'm using MapR as a platform, and the original exception happens both on a 3-node cluster and on the MapR Sandbox (a VM for experimentation), in YARN as well as in stand-alone mode. Further experimentation (such as the saveAsNewAPIHadoopFile attempt below) was done only on the sandbox in stand-alone mode.

Phoenix only supports Spark from 4.4.0 onwards, but I thought I could use a naive implementation in 4.3.1 that creates a new connection for every RDD from the DStream. This resulted in the ClassNotFoundException described in [1], so I switched to 4.4.0.

Unfortunately the saveToPhoenix method is only available in Scala. I did find a suggestion to try it via the saveAsNewAPIHadoopFile method [2] and an example implementation [3], which I adapted to my own needs. However, 4.4.0 + saveAsNewAPIHadoopFile raises the same ClassNotFoundException, just with a slightly different stack trace:

java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
    at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:58)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:995)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:979)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
    at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:386)
    at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:145)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:288)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:171)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1881)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl$12.call(ConnectionQueryServicesImpl.java:1860)
    at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1860)
    at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
    at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:131)
    at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
    at java.sql.DriverManager.getConnection(DriverManager.java:571)
    at java.sql.DriverManager.getConnection(DriverManager.java:187)
    at org.apache.phoenix.mapreduce.util.ConnectionUtil.getConnection(ConnectionUtil.java:92)
    at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:80)
    at org.apache.phoenix.mapreduce.util.ConnectionUtil.getOutputConnection(ConnectionUtil.java:68)
    at org.apache.phoenix.mapreduce.PhoenixRecordWriter.<init>(PhoenixRecordWriter.java:49)
    at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:55)
    ... 8 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
    at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
    at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
    at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:286)
    ... 23 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
    ... 26 more
Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
    at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
    at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
    ... 31 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:191)
    at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
    ... 34 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

====== Below is my question from StackOverflow ==========

I'm trying to connect to Phoenix
via Spark and I keep getting the following exception when opening a connection via the JDBC driver (cut for brevity, full stacktrace below):

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)

The class in question is provided by the jar called phoenix-core-4.3.1.jar (despite it being in the HBase package namespace; I guess they need it to integrate with HBase).

There are numerous questions on SO about ClassNotFoundExceptions on Spark. I've tried the fat-jar approach (both with Maven's assembly and shade plugins; I've inspected the jars, they **do** contain ClientRpcControllerFactory), and I've tried a lean jar while specifying the jars on the command line. For the latter, the command I used is as follows:

/opt/mapr/spark/spark-1.3.1/bin/spark-submit --jars lib/spark-streaming-kafka_2.10-1.3.1.jar,lib/kafka_2.10-0.8.1.1.jar,lib/zkclient-0.3.jar,lib/metrics-core-3.1.0.jar,lib/metrics-core-2.2.0.jar,lib/phoenix-core-4.3.1.jar --class nl.work.kafkastreamconsumer.phoenix.KafkaPhoenixConnector KafkaStreamConsumer.jar node1:5181 0 topic jdbc:phoenix:node1:5181 true

I've also done a classpath dump from within the code and the first classloader in the hierarchy already knows the Phoenix jar:

2015-06-04 10:52:34,323 [Executor task launch worker-1] INFO nl.work.kafkastreamconsumer.phoenix.LinePersister - [file:/home/work/projects/customer/KafkaStreamConsumer.jar, file:/home/work/projects/customer/lib/spark-streaming-kafka_2.10-1.3.1.jar, file:/home/work/projects/customer/lib/kafka_2.10-0.8.1.1.jar, file:/home/work/projects/customer/lib/zkclient-0.3.jar, file:/home/work/projects/customer/lib/metrics-core-3.1.0.jar, file:/home/work/projects/customer/lib/metrics-core-2.2.0.jar, file:/home/work/projects/customer/lib/phoenix-core-4.3.1.jar]
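
The dump above only lists the jars each loader knows about. A stand-alone probe along the following lines might narrow things down further. It is only a sketch and not part of the job: the class ClassLoaderProbe and its method names are made up for illustration. It walks a classloader hierarchy and records, per loader, whether that loader can resolve a given class name. This matters because Class.forName(String) resolves against the loader of the class that calls it, and the trace shows the lookup going through sun.misc.Launcher$AppClassLoader rather than the loader that lists the Phoenix jar.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original job.
final class ClassLoaderProbe {

    /** Returns true if the loader (including its parents, via delegation) can resolve the class. */
    static boolean canLoad(String className, ClassLoader loader) {
        try {
            // initialize=false: only resolve the class, do not run its static initializers
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Walks from the given loader up to the root and records whether each loader can resolve the class. */
    static List<String> report(String className, ClassLoader start) {
        List<String> lines = new ArrayList<>();
        for (ClassLoader cl = start; cl != null; cl = cl.getParent()) {
            lines.add(cl + " -> " + (canLoad(className, cl) ? "found" : "NOT found"));
        }
        return lines;
    }

    public static void main(String[] args) {
        // Default to the class named in the stack trace; any fully qualified name can be passed instead.
        String name = args.length > 0
                ? args[0]
                : "org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory";
        for (String line : report(name, Thread.currentThread().getContextClassLoader())) {
            System.out.println(line);
        }
    }
}
```

Calling report(...) from inside the task, next to the existing dumpClasspath call, would show whether the class is visible to the task's loader but not to the application classloader further up the chain.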

So the question is: What am I missing here? Why can't Spark load the correct class? There should be only one version of the class flying around (namely the one from phoenix-core), so I doubt it's a versioning conflict.

[Executor task launch worker-3] ERROR nl.work.kafkastreamconsumer.phoenix.LinePersister - Error while processing line
java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
    at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:41)
    at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:40)
    at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:32)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:999)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
    at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
    at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:362)
    at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:133)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:282)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:166)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1831)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1810)
    at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1810)
    at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
    at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:126)
    at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
    at java.sql.DriverManager.getConnection(DriverManager.java:571)
    at java.sql.DriverManager.getConnection(DriverManager.java:233)
    at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:39)
    ... 25 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
    at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
    at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
    at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
    at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:280)
    ... 36 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedConstructorAccessor8.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
    ... 39 more
Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
    at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
    at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
    ... 43 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:191)
    at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
    ... 46 more

**/edit**

Unfortunately the issue remains with 4.4.0-HBase-0.98. Below are the classes in question. Since the saveToPhoenix() method is not yet available for the Java API and since this is just a POC, my idea was to simply use the JDBC driver for each mini-batch.

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PhoenixConnection implements AutoCloseable, Serializable {
    private static final long serialVersionUID = -4491057264383873689L;
    private static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";

    static {
        // Make sure the Phoenix JDBC driver is registered before the first connection attempt.
        try {
            Class.forName(PHOENIX_DRIVER);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    private Connection connection;

    public PhoenixConnection(final String jdbcUri) {
        try {
            connection = DriverManager.getConnection(jdbcUri);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    public List<Map<String, Object>> executeQuery(final String sql) throws SQLException {
        ArrayList<Map<String, Object>> resultList = new ArrayList<>();
        try (PreparedStatement statement = connection.prepareStatement(sql);
             ResultSet resultSet = statement.executeQuery()) {
            ResultSetMetaData metaData = resultSet.getMetaData();
            while (resultSet.next()) {
                Map<String, Object> row = new HashMap<>(metaData.getColumnCount());
                // JDBC column indexes are 1-based.
                for (int column = 1; column <= metaData.getColumnCount(); ++column) {
                    final String columnLabel = metaData.getColumnLabel(column);
                    row.put(columnLabel, resultSet.getObject(columnLabel));
                }
                // Keep the row; without this the method always returns an empty list.
                resultList.add(row);
            }
        }
        resultList.trimToSize();
        return resultList;
    }

    @Override
    public void close() {
        try {
            connection.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}

import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.List;

import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

public class LinePersister implements Function<JavaRDD<String>, Void> {
    private static final long serialVersionUID = -2529724617108874989L;
    private static final Logger LOGGER = Logger.getLogger(LinePersister.class);
    private static final String TABLE_NAME = "mail_events";

    private final String jdbcUrl;

    public LinePersister(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    @Override
    public Void call(JavaRDD<String> dataSet) throws Exception {
        LOGGER.info(String.format(
                "Starting conversion on rdd with %d elements", dataSet.count()));
        List<Void> collectResult = dataSet.map(new Function<String, Void>() {
            private static final long serialVersionUID = -6651313541439109868L;

            @Override
            public Void call(String line) throws Exception {
                LOGGER.info("Writing line " + line);
                Event event = EventParser.parseLine(line);
                // One connection per line: naive, but this is only a POC.
                try (PhoenixConnection connection = new PhoenixConnection(jdbcUrl)) {
                    // Note: executeQuery() is meant for statements that return a ResultSet;
                    // an UPSERT would normally go through executeUpdate().
                    connection.executeQuery(event.createUpsertStatement(TABLE_NAME));
                } catch (Exception e) {
                    LOGGER.error("Error while processing line", e);
                    dumpClasspath(this.getClass().getClassLoader());
                }
                return null;
            }
        }).collect();
        LOGGER.info(String.format("Got %d results: ", collectResult.size()));
        return null;
    }

    /** Logs the URLs known to the given classloader and, recursively, to each of its parents. */
    public static void dumpClasspath(ClassLoader loader) {
        LOGGER.info("Classloader " + loader + ":");
        if (loader instanceof URLClassLoader) {
            URLClassLoader ucl = (URLClassLoader) loader;
            LOGGER.info(Arrays.toString(ucl.getURLs()));
        } else {
            LOGGER.error("(cannot display components as not a URLClassLoader)");
        }
        if (loader.getParent() != null) {
            dumpClasspath(loader.getParent());
        }
    }
}

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>nl.work</groupId>
  <artifactId>KafkaStreamConsumer</artifactId>
  <version>1.0</version>
  <packaging>jar</packaging>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <spark.version>1.3.1</spark.version>
    <hibernate.version>4.3.10.Final</hibernate.version>
    <phoenix.version>4.4.0-HBase-0.98</phoenix.version>
    <hbase.version>0.98.9-hadoop2</hbase.version>
    <spark-hbase.version>0.0.2-clabs-spark-1.3.1</spark-hbase.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.phoenix</groupId>
      <artifactId>phoenix-core</artifactId>
      <version>${phoenix.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.phoenix</groupId>
      <artifactId>phoenix-spark</artifactId>
      <version>${phoenix.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.cloudera</groupId>
      <artifactId>spark-hbase</artifactId>
      <version>${spark-hbase.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
          <source>${maven.compiler.source}</source>
          <target>${maven.compiler.target}</target>
        </configuration>
      </plugin>
      <!--
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
      -->
    </plugins>
  </build>
  <repositories>
    <repository>
      <id>unknown-jars-temp-repo</id>
      <name>A temporary repository created by NetBeans for libraries and jars it could not identify. Please replace the dependencies in this repository with correct ones and delete this repository.</name>
      <url>file:${project.basedir}/lib</url>
    </repository>
  </repositories>
</project>

Cheers,
Jeroen

[1] http://stackoverflow.com/questions/30639659/apache-phoenix-4-3-1-and-4-4-0-hbase-0-98-on-spark-1-3-1-classnotfoundexceptio
[2] https://groups.google.com/forum/#!topic/phoenix-hbase-user/pKnvE1pd_K8
[3] https://gist.github.com/mravi/444afe7f49821819c987#file-phoenixsparkjob-java