That's a good point, thanks. Is there a way to set up continuous, real-time streaming of data out of a database? If the data keeps changing, one way to implement extraction would be to keep track of something like the last-modified timestamp and write the query as 'where lastmodified > ?'.
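A minimal sketch of the last-modified approach in plain JDBC, without any Spark classes. It assumes a hypothetical EMPLOYEES table with a numeric ID column and a LASTMODIFIED timestamp column; those names are placeholders, not from the thread. The extractor keeps the newest timestamp it has seen as a watermark and binds it to the '?' on the next pass.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: pulls rows changed since the previous pass, keyed on a last-modified column.
class IncrementalExtractor {
    // EMPLOYEES, ID and LASTMODIFIED are assumed names; adjust them to the real schema.
    static final String SQL =
        "SELECT ID, LASTMODIFIED FROM EMPLOYEES WHERE LASTMODIFIED > ? ORDER BY LASTMODIFIED";

    private Timestamp watermark; // newest LASTMODIFIED value seen so far

    IncrementalExtractor(Timestamp initialWatermark) {
        this.watermark = initialWatermark;
    }

    Timestamp getWatermark() {
        return watermark;
    }

    // Moves the watermark forward to the newest timestamp in a batch; an empty batch leaves it unchanged.
    void advance(List<Timestamp> batch) {
        for (Timestamp ts : batch) {
            if (ts.after(watermark)) {
                watermark = ts;
            }
        }
    }

    // One extraction pass: returns the IDs of rows modified since the previous pass.
    List<Long> poll(Connection conn) throws SQLException {
        List<Long> ids = new ArrayList<Long>();
        List<Timestamp> seen = new ArrayList<Timestamp>();
        try (PreparedStatement ps = conn.prepareStatement(SQL)) {
            ps.setTimestamp(1, watermark);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    ids.add(rs.getLong(1));
                    seen.add(rs.getTimestamp(2));
                }
            }
        }
        advance(seen);
        return ids;
    }
}
```

Each call to poll is one incremental pass, so a scheduled job (or a loop in a long-running driver) could call it repeatedly. One caveat with a strict '>': a row that commits after a pass has read past its timestamp can be skipped, so it may be safer to re-read a small overlap window and de-duplicate by ID.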
That would imply running the Spark program repeatedly on a schedule. I wonder if it's possible to just continuously stream any updates out instead, using Spark.

On Thu, Feb 19, 2015 at 10:23 AM, Cody Koeninger <[email protected]> wrote:
> At the beginning of the code, do a query to find the current maximum ID.
>
> Don't just put in an arbitrarily large value, or all of your rows will end
> up in 1 spark partition at the beginning of the range.
>
> The question of keys is up to you... all that you need to be able to do is
> write a sql statement that takes 2 numbers to specify the bounds. Of
> course, a numeric primary key is going to be the most efficient way to do
> that.
>
> On Thu, Feb 19, 2015 at 8:57 AM, Dmitry Goldenberg <
> [email protected]> wrote:
>
>> Yup, I did see that. Good point though, Cody. The mismatch was happening
>> for me when I was trying to get the 'new JdbcRDD' approach going. Once I
>> switched to the 'create' method things are working just fine. Was just able
>> to refactor the 'get connection' logic into a 'DbConnection implements
>> JdbcRDD.ConnectionFactory' and my 'map row' class is still 'MapRow
>> implements org.apache.spark.api.java.function.Function<ResultSet, Row>'.
>>
>> This works fine and makes the driver program tighter. Of course, my next
>> question is, how to work with the lower and upper bound parameters. As in,
>> what if I don't know what the min and max ID values are and just want to
>> extract all data from the table, what should the params be, if that's even
>> supported. And furthermore, what if the primary key on the table is not
>> numeric, or if there's no primary key altogether?
>>
>> The method works fine with lowerBound=0 and upperBound=1000000, for
>> example. But there doesn't seem to be a way to say 'no upper bound' (-1
>> didn't work).
>>
>> On Wed, Feb 18, 2015 at 11:59 PM, Cody Koeninger <[email protected]>
>> wrote:
>>
>>> Look at the definition of JdbcRDD.create:
>>>
>>>   def create[T](
>>>       sc: JavaSparkContext,
>>>       connectionFactory: ConnectionFactory,
>>>       sql: String,
>>>       lowerBound: Long,
>>>       upperBound: Long,
>>>       numPartitions: Int,
>>>       mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {
>>>
>>> JFunction here is the interface org.apache.spark.api.java.function.Function,
>>> not scala Function0
>>>
>>> Likewise, ConnectionFactory is an interface defined inside JdbcRDD, not
>>> scala Function0
>>>
>>> On Wed, Feb 18, 2015 at 4:50 PM, Dmitry Goldenberg <
>>> [email protected]> wrote:
>>>
>>>> That's exactly what I was doing. However, I ran into runtime issues
>>>> with doing that. For instance, I had a
>>>>
>>>>   public class DbConnection extends AbstractFunction0<Connection>
>>>>       implements Serializable
>>>>
>>>> I got a runtime error from Spark complaining that DbConnection wasn't
>>>> an instance of scala.Function0.
>>>>
>>>> I also had a
>>>>
>>>>   public class MapRow extends
>>>>       scala.runtime.AbstractFunction1<java.sql.ResultSet, Row> implements
>>>>       Serializable
>>>>
>>>> with which I seemed to have more luck.
>>>>
>>>> On Wed, Feb 18, 2015 at 5:32 PM, Cody Koeninger <[email protected]>
>>>> wrote:
>>>>
>>>>> Can't you implement the
>>>>>
>>>>>   org.apache.spark.api.java.function.Function
>>>>>
>>>>> interface and pass an instance of that to JdbcRDD.create?
>>>>>
>>>>> On Wed, Feb 18, 2015 at 3:48 PM, Dmitry Goldenberg <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Cody, you were right, I had a copy and paste snag where I ended up
>>>>>> with a vanilla SparkContext rather than a Java one. I also had to *not*
>>>>>> use my function subclasses, rather just use anonymous inner classes for
>>>>>> the Function stuff, and that got things working. I'm following the
>>>>>> JdbcRDD.create approach from JavaJdbcRDDSuite.java basically verbatim.
>>>>>>
>>>>>> Is there a clean way to refactor out the custom Function classes, such
>>>>>> as the one for getting a db connection or mapping ResultSet data to your
>>>>>> own POJOs, rather than doing it all inline?
>>>>>>
>>>>>> On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Is sc there a SparkContext or a JavaSparkContext? The compilation
>>>>>>> error seems to indicate the former, but JdbcRDD.create expects the
>>>>>>> latter
>>>>>>>
>>>>>>> On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> I have tried that as well, I get a compile error --
>>>>>>>>
>>>>>>>> [ERROR] ...SparkProto.java:[105,39] error: no suitable method found
>>>>>>>> for create(SparkContext,<anonymous
>>>>>>>> ConnectionFactory>,String,int,int,int,<anonymous
>>>>>>>> Function<ResultSet,Integer>>)
>>>>>>>>
>>>>>>>> The code is a copy and paste:
>>>>>>>>
>>>>>>>>   JavaRDD<Integer> jdbcRDD = JdbcRDD.create(
>>>>>>>>     sc,
>>>>>>>>     new JdbcRDD.ConnectionFactory() {
>>>>>>>>       public Connection getConnection() throws SQLException {
>>>>>>>>         return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
>>>>>>>>       }
>>>>>>>>     },
>>>>>>>>     "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
>>>>>>>>     1, 100, 1,
>>>>>>>>     new Function<ResultSet, Integer>() {
>>>>>>>>       public Integer call(ResultSet r) throws Exception {
>>>>>>>>         return r.getInt(1);
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>   );
>>>>>>>>
>>>>>>>> The other thing I've tried was to define a static class locally for
>>>>>>>> GetConnection and use the JdbcRDD constructor. This got around the
>>>>>>>> compile issues but blew up at runtime with "NoClassDefFoundError:
>>>>>>>> scala/runtime/AbstractFunction0"!
>>>>>>>>
>>>>>>>>   JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
>>>>>>>>     sc,
>>>>>>>>     (AbstractFunction0<Connection>) new DbConn(), // had to cast, or got a compile error
>>>>>>>>     SQL_QUERY,
>>>>>>>>     0L,
>>>>>>>>     1000L,
>>>>>>>>     10,
>>>>>>>>     new MapRow(),
>>>>>>>>     ROW_CLASS_TAG);
>>>>>>>>   // DbConn is defined as public static class DbConn extends
>>>>>>>>   // AbstractFunction0<Connection> implements Serializable
>>>>>>>>
>>>>>>>> On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> That test I linked
>>>>>>>>>
>>>>>>>>> https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90
>>>>>>>>>
>>>>>>>>> is calling a static method JdbcRDD.create, not new JdbcRDD. Is
>>>>>>>>> that what you tried doing?
>>>>>>>>>
>>>>>>>>> On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks, Cody. Yes, I originally started off by looking at that
>>>>>>>>>> but I get a compile error if I try and use that approach: constructor
>>>>>>>>>> JdbcRDD in class JdbcRDD<T> cannot be applied to given types. Not to
>>>>>>>>>> mention that JavaJdbcRDDSuite somehow manages to not pass in the
>>>>>>>>>> class tag (the last argument).
>>>>>>>>>>
>>>>>>>>>> Wonder if it's a JDK version issue, I'm using 1.7.
>>>>>>>>>>
>>>>>>>>>> So I've got this, which doesn't compile:
>>>>>>>>>>
>>>>>>>>>>   JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
>>>>>>>>>>     new SparkContext(conf),
>>>>>>>>>>     new JdbcRDD.ConnectionFactory() {
>>>>>>>>>>       public Connection getConnection() throws SQLException {
>>>>>>>>>>         Connection conn = null;
>>>>>>>>>>         try {
>>>>>>>>>>           Class.forName(JDBC_DRIVER);
>>>>>>>>>>           conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
>>>>>>>>>>         } catch (ClassNotFoundException ex) {
>>>>>>>>>>           throw new RuntimeException("Error while loading JDBC driver.", ex);
>>>>>>>>>>         }
>>>>>>>>>>         return conn;
>>>>>>>>>>       }
>>>>>>>>>>     },
>>>>>>>>>>     "SELECT * FROM EMPLOYEES",
>>>>>>>>>>     0L,
>>>>>>>>>>     1000L,
>>>>>>>>>>     10,
>>>>>>>>>>     new Function<ResultSet, Row>() {
>>>>>>>>>>       public Row call(ResultSet r) throws Exception {
>>>>>>>>>>         return null; // have some actual logic here...
>>>>>>>>>>       }
>>>>>>>>>>     },
>>>>>>>>>>     scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class));
>>>>>>>>>>
>>>>>>>>>> The other approach was mimicking the DbConnection class from this
>>>>>>>>>> post:
>>>>>>>>>> http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/.
>>>>>>>>>> It got around the compilation issues, but then I got the runtime
>>>>>>>>>> error where Spark wouldn't recognize the db connection class as a
>>>>>>>>>> scala.Function0.
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Take a look at
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm reading data from a database using JdbcRDD, in Java, and I
>>>>>>>>>>>> have an implementation of Function0<Connection> whose instance I
>>>>>>>>>>>> supply as the 'getConnection' parameter into the JdbcRDD
>>>>>>>>>>>> constructor. Compiles fine.
>>>>>>>>>>>>
>>>>>>>>>>>> The definition of the class/function is as follows:
>>>>>>>>>>>>
>>>>>>>>>>>>   public class GetDbConnection extends AbstractFunction0<Connection>
>>>>>>>>>>>>       implements Serializable
>>>>>>>>>>>>
>>>>>>>>>>>> where scala.runtime.AbstractFunction0 extends scala.Function0.
>>>>>>>>>>>>
>>>>>>>>>>>> At runtime, I get an exception as below. Does anyone have an
>>>>>>>>>>>> idea as to how to resolve this/work around it? Thanks.
>>>>>>>>>>>>
>>>>>>>>>>>> I'm running Spark 1.2.1 built for Hadoop 2.4.
>>>>>>>>>>>>
>>>>>>>>>>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
>>>>>>>>>>>> to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure:
>>>>>>>>>>>> Lost task 3.0 in stage 0.0 (TID 3, localhost): java.lang.ClassCastException:
>>>>>>>>>>>> cannot assign instance of com.kona.motivis.spark.proto.GetDbConnection to
>>>>>>>>>>>> field org.apache.spark.rdd.JdbcRDD.org$apache$spark$rdd$JdbcRDD$$getConnection
>>>>>>>>>>>> of type scala.Function0 in instance of org.apache.spark.rdd.JdbcRDD
>>>>>>>>>>>>   at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
>>>>>>>>>>>>   at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
>>>>>>>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
>>>>>>>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>>>>>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>>>>>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>>>>>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>>>>>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>>>>>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>>>>>>>   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>>>>>>>>>>>>   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>>>>>>>>>>>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
>>>>>>>>>>>>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>>>>>>>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>>>>>>>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>>   at java.lang.Thread.run(Thread.java:744)
>>>>>>>>>>>>
>>>>>>>>>>>> Driver stacktrace:
>>>>>>>>>>>>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>>>>>>>>>>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>>>>>>>>>>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>>>>>>>>>>>>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>>>>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>>>>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>>>>>>>>>>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>>>>>>>>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>>>>>>>>>>   at scala.Option.foreach(Option.scala:236)
>>>>>>>>>>>>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>>>>>>>>>>>>   at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>>>>>>>>>>>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>>>>>>>>>   at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
>>>>>>>>>>>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>>>>>>>>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>>>>>>>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>>>>>>>>>>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>>>>>>>>>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>>>>>>>>>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>>>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/JdbcRDD-ClassCastException-with-scala-Function0-tp21707.html
>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
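To make the advice about bounds concrete, here is a minimal sketch (plain JDBC plus arithmetic, no Spark classes) that first asks the database for the real ID range and then splits that inclusive range across partitions. The EMPLOYEES table and ID column are hypothetical names, and the split formula is based on how JdbcRDD in Spark 1.2 appears to compute its partitions, so treat it as an approximation rather than the library's code.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

class JdbcBounds {
    // Asks the database for the actual ID range (EMPLOYEES and ID are hypothetical names).
    // On an empty table MIN/MAX return NULL, which getLong reports as 0.
    static long[] idRange(Connection conn) throws SQLException {
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT MIN(ID), MAX(ID) FROM EMPLOYEES")) {
            rs.next();
            return new long[] {rs.getLong(1), rs.getLong(2)};
        }
    }

    // Splits the inclusive range [lower, upper] into numPartitions contiguous sub-ranges.
    // Row i holds the {start, end} pair that would be bound to the two '?' placeholders
    // of a query such as "SELECT ... WHERE ? <= ID AND ID <= ?".
    static long[][] partitionBounds(long lower, long upper, int numPartitions) {
        long length = 1 + upper - lower; // +1 because both bounds are inclusive
        long[][] bounds = new long[numPartitions][2];
        for (int i = 0; i < numPartitions; i++) {
            bounds[i][0] = lower + (i * length) / numPartitions;
            bounds[i][1] = lower + ((i + 1) * length) / numPartitions - 1;
        }
        return bounds;
    }
}
```

If the real IDs run from 1 to 500 but upperBound is guessed as 1,000,000, partitionBounds(0, 1000000, 10) gives partition 0 the range 0 to 99999, so every row lands there and the other nine partitions come back empty. Using the queried range instead, partitionBounds(1, 500, 10) gives ten slices of 50 IDs (1 to 50, 51 to 100, and so on). The same arithmetic suggests why upperBound = -1 returned nothing: with lowerBound = 0 the length is 0, so every partition gets the empty range 0 to -1.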
