Hi Jerry, thanks for your answer. I'm using Spark Streaming from Java, and I only have a rudimentary knowledge of Scala. How could I recreate in Java the lazy creation of a singleton object that you propose for Scala? Maybe a static class member in Java holding the connection would be the solution?
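Something like this is what I have in mind (just a sketch; DbConnection is a placeholder for whatever client class the actual database driver provides):

public class DbConnectionHolder {
    // One connection per executor JVM, created lazily on first use
    private static DbConnection instance = null;

    public static synchronized DbConnection get() {
        if (instance == null) {
            // Runs on the worker the first time a task calls get()
            instance = new DbConnection();
        }
        return instance;
    }
}

Every task running in that JVM would then call DbConnectionHolder.get() instead of opening its own connection, so the connection would be created once per worker and reused across batches. Is that roughly equivalent to the lazy singleton object pattern you describe for Scala?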
Thanks again for your help,

Best Regards,

Juan


2014-07-08 11:44 GMT+02:00 Shao, Saisai <saisai.s...@intel.com>:

> I think you can maintain a connection pool, or keep the connection as a
> long-lived object on the executor side (like lazily creating a singleton
> object in an object { } in Scala), so that each task can get this
> connection when it executes instead of creating a new one. That would be
> good for your scenario, since creating a connection is quite expensive to
> do for each task.
>
> Thanks
> Jerry
>
> *From:* Juan Rodríguez Hortalá [mailto:juan.rodriguez.hort...@gmail.com]
> *Sent:* Tuesday, July 08, 2014 5:19 PM
> *To:* Tobias Pfeiffer
> *Cc:* user@spark.apache.org
> *Subject:* Re: Which is the best way to get a connection to an external
> database per task in Spark Streaming?
>
> Hi Tobias, thanks for your help. I understand that with that code we
> obtain a database connection per partition, but I also suspect that a new
> database connection is created on each execution of the function passed
> to mapPartitions(). That would be very inefficient, because a new object
> and a new database connection would be created for each batch of the
> DStream. But my knowledge of the lifecycle of Functions in Spark
> Streaming is very limited, so maybe I'm wrong. What do you think?
>
> Greetings,
>
> Juan
>
> 2014-07-08 3:30 GMT+02:00 Tobias Pfeiffer <t...@preferred.jp>:
>
> Juan,
>
> I am doing something similar, just not "insert into SQL database" but
> "issue some RPC call". I think mapPartitions() may be helpful to you. You
> could do something like
>
> dstream.mapPartitions(iter => {
>   val db = new DbConnection()
>   // maybe only do the above if !iter.isEmpty
>   iter.map(item => {
>     db.call(...)
>     // do some cleanup if !iter.hasNext here
>     item
>   })
> }).count() // force output
>
> Keep in mind though that the whole idea behind RDDs is that operations
> are idempotent and in theory could be run on multiple hosts (to take the
> result from the fastest server) or multiple times (to deal with
> failures/timeouts) etc., which may be something you want to account for
> in your SQL.
>
> Tobias
>
> On Tue, Jul 8, 2014 at 3:40 AM, Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
> Hi list,
>
> I'm writing a Spark Streaming program that reads from a Kafka topic,
> performs some transformations on the data, and then inserts each record
> into a database with foreachRDD. I was wondering which is the best way to
> handle the connection to the database, so that each worker, or even each
> task, uses a different connection to the database and the database
> inserts/updates are performed in parallel.
>
> - I understand that using a final variable in the driver code is not a
> good idea, because then the communication with the database would be
> performed in the driver code, which leads to a bottleneck, according to
> http://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/
> - I think creating a new connection in the call() method of the Function
> passed to foreachRDD is also a bad idea, because then I wouldn't be
> reusing the connection to the database for each batch RDD in the DStream.
> - I'm not sure that a broadcast variable holding the connection handler
> is a good idea in case the target database is distributed, because if the
> same handler is used for all the nodes of the Spark cluster then that
> could have a negative effect on the data locality of the connection to
> the database.
> - From
> http://apache-spark-user-list.1001560.n3.nabble.com/Database-connection-per-worker-td1280.html
> I understand that by using a static variable and referencing it in the
> call() method of the Function passed to foreachRDD, we get a different
> connection per Spark worker; I guess that's because there is a different
> JVM per worker. But then all the tasks in the same worker would share the
> same database handler object, am I right?
> - Another idea is using updateStateByKey() with the database handler as
> the state, but I guess that would only work for Serializable database
> handlers, and not, for example, for an
> org.apache.hadoop.hbase.client.HTable object.
>
> So my question is: which is the best way to get a connection to an
> external database per task in Spark Streaming? Or at least per worker. In
> http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-to-an-inmemory-database-from-Spark-td1343.html
> there is a partial solution to this question, but there the database
> handler object is missing. This other question,
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Shared-hashmaps-td3247.html,
> is closer to mine, but there is no answer for it yet.
>
> Thanks in advance,
>
> Greetings,
>
> Juan
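P.S. In case it is useful for the list, this is roughly how I would translate Tobias' mapPartitions() example to the Java API; an untested sketch, again with DbConnection as a placeholder for the real client class:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapFunction;

// dstream is a JavaDStream<String>
dstream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
    @Override
    public Iterable<String> call(Iterator<String> records) {
        // One connection per partition, reused for all its records
        DbConnection db = new DbConnection();
        List<String> out = new ArrayList<String>();
        while (records.hasNext()) {
            String record = records.next();
            db.call(record); // the insert / RPC call, as in the Scala version
            out.add(record);
        }
        return out;
    }
}).count(); // force output, as in the Scala version

But as I said above, I suspect this still creates a new connection for every partition of every batch, which is exactly what I am trying to avoid.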