That's a good point, thanks. Is there a way to set up continuous, real-time
streaming of data out of a database? If the data keeps changing, one way to
implement extraction would be to keep track of something like a
last-modified timestamp and parameterize the query with
'WHERE lastmodified > ?'.

That would mean running the Spark program repeatedly on a schedule. I
wonder whether it's possible to continuously stream updates out with Spark
instead.
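A minimal sketch of the last-modified approach described above, for illustration only. It assumes a hypothetical `TableRow` type that carries a key and an epoch-millis `lastModified` value. It filters an in-memory list where a real job would bind the watermark to the `?` in `WHERE lastmodified > ?`. Each poll returns only rows newer than the stored watermark, then advances the watermark to the newest timestamp it saw.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one database row: a key plus its last-modified time.
final class TableRow {
    final long id;
    final long lastModified; // epoch millis; assumed to be set on every insert/update

    TableRow(long id, long lastModified) {
        this.id = id;
        this.lastModified = lastModified;
    }
}

// Hypothetical helper: tracks a high-water mark so each poll sees only rows changed since the last one.
final class IncrementalExtractor {
    private long watermark; // the value a real job would bind to 'WHERE lastmodified > ?'

    IncrementalExtractor(long initialWatermark) {
        this.watermark = initialWatermark;
    }

    long watermark() {
        return watermark;
    }

    // Simulates 'SELECT ... WHERE lastmodified > ?' against an in-memory "table".
    List<TableRow> poll(List<TableRow> table) {
        List<TableRow> changed = new ArrayList<>();
        long newest = watermark;
        for (TableRow r : table) {
            if (r.lastModified > watermark) {
                changed.add(r);
                newest = Math.max(newest, r.lastModified);
            }
        }
        watermark = newest; // advance only after the whole batch has been collected
        return changed;
    }
}
```

There is one caveat with a strict `>` comparison. A row can be stamped with a time at or before the current watermark but committed after the poll has run, and that row will be skipped. A production version might re-read a small overlap window and deduplicate by key.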

On Thu, Feb 19, 2015 at 10:23 AM, Cody Koeninger <[email protected]> wrote:

> At the beginning of the code, run a query to find the current maximum ID.
>
> Don't just put in an arbitrarily large value, or all of your rows will end
> up in 1 spark partition at the beginning of the range.
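The following sketch shows the skew concretely. It splits an inclusive `[lowerBound, upperBound]` range into `numPartitions` contiguous slices. The arithmetic is an assumption based on our reading of `JdbcRDD`'s partitioning in Spark 1.2.x, not a copy of Spark's code. `JdbcPartitionBounds`, `split` and `partitionOf` are hypothetical names. The example compares bounds guessed as 0..1,000,000 against bounds that match table IDs actually running from 1 to 1000.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits an inclusive [lower, upper] key range into one slice per partition,
// following (as an assumption) the arithmetic JdbcRDD uses in Spark 1.2.x.
final class JdbcPartitionBounds {

    // Inclusive range one partition would bind to the query's two '?' placeholders.
    static final class Range {
        final long start;
        final long end;

        Range(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    static List<Range> split(long lower, long upper, int numPartitions) {
        long length = 1 + upper - lower; // bounds are inclusive, hence the + 1
        List<Range> ranges = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            long start = lower + (i * length) / numPartitions;
            long end = lower + ((i + 1) * length) / numPartitions - 1;
            ranges.add(new Range(start, end));
        }
        return ranges;
    }

    // Index of the partition whose range contains id, or -1 if no partition covers it.
    static int partitionOf(List<Range> ranges, long id) {
        for (int i = 0; i < ranges.size(); i++) {
            Range r = ranges.get(i);
            if (r.start <= id && id <= r.end) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Actual IDs run 1..1000, but the bounds were guessed as 0..1,000,000.
        List<Range> guessed = split(0L, 1_000_000L, 10);
        System.out.println("guessed first slice " + guessed.get(0)
                + ", ID 1 -> partition " + partitionOf(guessed, 1L)
                + ", ID 1000 -> partition " + partitionOf(guessed, 1000L));
        // Bounds taken from a MIN(ID)/MAX(ID) query instead.
        System.out.println("exact slices " + split(1L, 1000L, 10));
    }
}
```

With the guessed bounds, the first slice is [0, 99999]. Both ID 1 and ID 1000 therefore land in partition 0, and the other nine partitions get nothing. Bounds of 1..1000 give ten slices of 100 IDs each. Under the same arithmetic, lowerBound = 0 with upperBound = -1 gives a length of 0, and every slice becomes the empty range [0, -1]. That would be consistent with -1 returning no rows.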
>
> The question of keys is up to you... all you need to be able to do is
> write a SQL statement that takes two numbers to specify the bounds. Of
> course, a numeric primary key is going to be the most efficient way to do
> that.
>
> On Thu, Feb 19, 2015 at 8:57 AM, Dmitry Goldenberg <[email protected]> wrote:
>
>> Yup, I did see that. Good point though, Cody. The mismatch was happening
>> when I was trying to get the 'new JdbcRDD' approach going. Once I
>> switched to the 'create' method, things started working fine. I was just
>> able to refactor the 'get connection' logic into a 'DbConnection implements
>> JdbcRDD.ConnectionFactory' class, and my 'map row' class is still 'MapRow
>> implements org.apache.spark.api.java.function.Function<ResultSet, Row>'.
>>
>> This works fine and makes the driver program tighter. My next question, of
>> course, is how to work with the lower and upper bound parameters. What if
>> I don't know the min and max ID values and just want to extract all data
>> from the table: what should the params be, if that's even supported? And
>> what if the primary key on the table is not numeric, or there's no primary
>> key at all?
>>
>> The method works fine with, for example, lowerBound=0 and
>> upperBound=1000000, but there doesn't seem to be a way to say 'no upper
>> bound' (-1 didn't work).
>>
>> On Wed, Feb 18, 2015 at 11:59 PM, Cody Koeninger <[email protected]> wrote:
>>
>>> Look at the definition of JdbcRDD.create:
>>>
>>>   def create[T](
>>>       sc: JavaSparkContext,
>>>       connectionFactory: ConnectionFactory,
>>>       sql: String,
>>>       lowerBound: Long,
>>>       upperBound: Long,
>>>       numPartitions: Int,
>>>       mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {
>>>
>>> JFunction here is the interface org.apache.spark.api.java.function.Function,
>>> not a Scala function type (scala.Function1).
>>>
>>> Likewise, ConnectionFactory is an interface defined inside JdbcRDD, not
>>> scala.Function0.
>>>
>>> On Wed, Feb 18, 2015 at 4:50 PM, Dmitry Goldenberg <[email protected]> wrote:
>>>
>>>> That's exactly what I was doing. However, I ran into runtime issues
>>>> doing that. For instance, I had a
>>>>
>>>>   public class DbConnection extends AbstractFunction0<Connection> implements Serializable
>>>>
>>>> I got a runtime error from Spark complaining that DbConnection wasn't
>>>> an instance of scala.Function0.
>>>>
>>>> I also had a
>>>>
>>>>   public class MapRow extends scala.runtime.AbstractFunction1<java.sql.ResultSet, Row> implements Serializable
>>>>
>>>> with which I seemed to have more luck.
>>>>
>>>> On Wed, Feb 18, 2015 at 5:32 PM, Cody Koeninger <[email protected]> wrote:
>>>>
>>>>> Can't you implement the
>>>>>
>>>>> org.apache.spark.api.java.function.Function
>>>>>
>>>>> interface and pass an instance of that to JdbcRDD.create?
>>>>>
>>>>> On Wed, Feb 18, 2015 at 3:48 PM, Dmitry Goldenberg <[email protected]> wrote:
>>>>>
>>>>>> Cody, you were right: I had a copy-and-paste snag where I ended up
>>>>>> with a plain SparkContext rather than a JavaSparkContext. I also had to
>>>>>> *not* use my function subclasses and instead use anonymous inner classes
>>>>>> for the Function implementations, and that got things working. I'm
>>>>>> following the JdbcRDD.create approach from JavaJdbcRDDSuite.java
>>>>>> basically verbatim.
>>>>>>
>>>>>> Is there a clean way to refactor out the custom Function classes, such
>>>>>> as the one that gets a DB connection or the one that maps ResultSet data
>>>>>> to your own POJOs, rather than doing it all inline?
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger <[email protected]> wrote:
>>>>>>
>>>>>>> Is sc there a SparkContext or a JavaSparkContext? The compilation
>>>>>>> error seems to indicate the former, but JdbcRDD.create expects the
>>>>>>> latter.
>>>>>>>
>>>>>>> On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg <[email protected]> wrote:
>>>>>>>
>>>>>>>> I have tried that as well, and I get a compile error:
>>>>>>>>
>>>>>>>> [ERROR] ...SparkProto.java:[105,39] error: no suitable method found for create(SparkContext,<anonymous ConnectionFactory>,String,int,int,int,<anonymous Function<ResultSet,Integer>>)
>>>>>>>>
>>>>>>>> The code is copied and pasted from the test:
>>>>>>>>
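>>>>>>>> import java.sql.Connection;
>>>>>>>> import java.sql.DriverManager;
>>>>>>>> import java.sql.ResultSet;
>>>>>>>> import java.sql.SQLException;
>>>>>>>> import org.apache.spark.api.java.JavaRDD;
>>>>>>>> import org.apache.spark.api.java.function.Function;
>>>>>>>> import org.apache.spark.rdd.JdbcRDD;
>>>>>>>>
>>>>>>>>     JavaRDD<Integer> jdbcRDD = JdbcRDD.create(
>>>>>>>>           sc, // must be a JavaSparkContext; a plain SparkContext gives the "no suitable method" error
>>>>>>>>           new JdbcRDD.ConnectionFactory() {
>>>>>>>>             public Connection getConnection() throws SQLException {
>>>>>>>>               return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
>>>>>>>>             }
>>>>>>>>           },
>>>>>>>>           "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
>>>>>>>>           1, 100, 1, // lowerBound, upperBound, numPartitions
>>>>>>>>           new Function<ResultSet, Integer>() {
>>>>>>>>             public Integer call(ResultSet r) throws Exception {
>>>>>>>>               return r.getInt(1);
>>>>>>>>             }
>>>>>>>>           }
>>>>>>>>         );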
>>>>>>>>
>>>>>>>> The other thing I tried was to define a local static class for
>>>>>>>> GetConnection and use the JdbcRDD constructor. This got around the
>>>>>>>> compile issues but blew up at runtime with "NoClassDefFoundError:
>>>>>>>> scala/runtime/AbstractFunction0"!
>>>>>>>>
>>>>>>>> JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
>>>>>>>>     sc,
>>>>>>>>     (AbstractFunction0<Connection>) new DbConn(), // had to cast, or it would not compile
>>>>>>>>     SQL_QUERY,
>>>>>>>>     0L,
>>>>>>>>     1000L,
>>>>>>>>     10,
>>>>>>>>     new MapRow(),
>>>>>>>>     ROW_CLASS_TAG);
>>>>>>>> // DbConn is defined as
>>>>>>>> //   public static class DbConn extends AbstractFunction0<Connection> implements Serializable
>>>>>>>>
>>>>>>>> On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> That test I linked,
>>>>>>>>>
>>>>>>>>> https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90
>>>>>>>>>
>>>>>>>>> is calling the static method JdbcRDD.create, not new JdbcRDD. Is
>>>>>>>>> that what you tried?
>>>>>>>>>
>>>>>>>>> On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks, Cody. Yes, I originally started by looking at that, but I
>>>>>>>>>> get a compile error if I try to use that approach: "constructor
>>>>>>>>>> JdbcRDD in class JdbcRDD<T> cannot be applied to given types". Not to
>>>>>>>>>> mention that JavaJdbcRDDSuite somehow manages not to pass in the
>>>>>>>>>> class tag (the last argument).
>>>>>>>>>>
>>>>>>>>>> I wonder if it's a JDK version issue; I'm using 1.7.
>>>>>>>>>>
>>>>>>>>>> So I've got this, which doesn't compile:
>>>>>>>>>>
>>>>>>>>>> JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
>>>>>>>>>>     new SparkContext(conf),
>>>>>>>>>>     // The constructor expects a scala.Function0<Connection> here, not a ConnectionFactory
>>>>>>>>>>     new JdbcRDD.ConnectionFactory() {
>>>>>>>>>>       public Connection getConnection() throws SQLException {
>>>>>>>>>>         Connection conn = null;
>>>>>>>>>>         try {
>>>>>>>>>>           Class.forName(JDBC_DRIVER);
>>>>>>>>>>           conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
>>>>>>>>>>         } catch (ClassNotFoundException ex) {
>>>>>>>>>>           throw new RuntimeException("Error while loading JDBC driver.", ex);
>>>>>>>>>>         }
>>>>>>>>>>         return conn;
>>>>>>>>>>       }
>>>>>>>>>>     },
>>>>>>>>>>     // JdbcRDD binds each partition's bounds to two '?' placeholders, so this query
>>>>>>>>>>     // needs them too, e.g. "... WHERE ? <= ID AND ID <= ?" (an ID column is assumed)
>>>>>>>>>>     "SELECT * FROM EMPLOYEES",
>>>>>>>>>>     0L,
>>>>>>>>>>     1000L,
>>>>>>>>>>     10,
>>>>>>>>>>     // ...and a scala.Function1<ResultSet, Row> here, not a Spark Java Function
>>>>>>>>>>     new Function<ResultSet, Row>() {
>>>>>>>>>>       public Row call(ResultSet r) throws Exception {
>>>>>>>>>>         return null; // have some actual logic here...
>>>>>>>>>>       }
>>>>>>>>>>     },
>>>>>>>>>>     scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class));
>>>>>>>>>>
>>>>>>>>>> The other approach was mimicking the DbConnection class from this
>>>>>>>>>> post:
>>>>>>>>>> http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/.
>>>>>>>>>> It got around the compilation issues, but then I got the runtime
>>>>>>>>>> error where Spark wouldn't recognize the DB connection class as a
>>>>>>>>>> scala.Function0.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Take a look at
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm reading data from a database using JdbcRDD, in Java, and I have an
>>>>>>>>>>>> implementation of Function0<Connection> whose instance I supply as the
>>>>>>>>>>>> 'getConnection' parameter to the JdbcRDD constructor. It compiles fine.
>>>>>>>>>>>>
>>>>>>>>>>>> The definition of the class/function is as follows:
>>>>>>>>>>>>
>>>>>>>>>>>>   public class GetDbConnection extends AbstractFunction0<Connection> implements Serializable
>>>>>>>>>>>>
>>>>>>>>>>>> where scala.runtime.AbstractFunction0 extends scala.Function0.
>>>>>>>>>>>>
>>>>>>>>>>>> At runtime, I get the exception below. Does anyone have an idea how
>>>>>>>>>>>> to resolve or work around this? Thanks.
>>>>>>>>>>>>
>>>>>>>>>>>> I'm running Spark 1.2.1 built for Hadoop 2.4.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, localhost): java.lang.ClassCastException: cannot assign instance of com.kona.motivis.spark.proto.GetDbConnection to field org.apache.spark.rdd.JdbcRDD.org$apache$spark$rdd$JdbcRDD$$getConnection of type scala.Function0 in instance of org.apache.spark.rdd.JdbcRDD
>>>>>>>>>>>>         at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
>>>>>>>>>>>>         at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
>>>>>>>>>>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
>>>>>>>>>>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>>>>>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>>>>>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>>>>>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>>>>>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>>>>>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>>>>>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>>>>>>>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>>>>>>>         at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>>>>>>>>>>>>         at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>>>>>>>>>>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
>>>>>>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>>>>>>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>>>>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:744)
>>>>>>>>>>>>
>>>>>>>>>>>> Driver stacktrace:
>>>>>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>>>>>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>>>>>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>>>>>>>>>>>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>>>>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>>>>>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>>>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>>>>>>>>>>         at scala.Option.foreach(Option.scala:236)
>>>>>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>>>>>>>>>>>>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>>>>>>>>>>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>>>>>>>>>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
>>>>>>>>>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>>>>>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>>>>>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>>>>>>>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>>>>>>>>>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>>>>>>>>>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>>>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>>>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>>>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/JdbcRDD-ClassCastException-with-scala-Function0-tp21707.html
>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>>> Nabble.com.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
