Re: unserializable object in Spark Streaming context
I get TD's recommendation of sharing a connection among tasks. Now, is there a good way to determine when to close connections?

Gino B.
Re: unserializable object in Spark Streaming context
That's a good question. My first reaction is a timeout. Timing out after tens of seconds of inactivity should be sufficient. So there should be a timer in the singleton that checks every second when the singleton was last used, and closes the connections once the timeout has elapsed. Any later attempt to use the connection will create a new connection.

TD
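A minimal sketch of the idle-timeout idea described above, in plain Java with no Spark or JDBC dependency. The names `IdleConnection`, `ExpiringConnection`, `closeIfIdle` and `startReaper`, and the 30-second timeout, are assumptions made for illustration and are not from the thread. The connection is used under the same lock as the idle check, so the timer cannot close it in the middle of a write.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a JDBC connection.
class IdleConnection implements AutoCloseable {
    void insert(String row) { /* would write to the database */ }
    @Override public void close() { /* would release the socket */ }
}

// Holds at most one connection and closes it after a period of inactivity.
final class ExpiringConnection {
    private final long idleTimeoutMillis;
    private IdleConnection conn;   // null while no connection is open
    private long lastUsedMillis;
    private int openedCount;

    ExpiringConnection(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Opens a connection on demand, uses it, and records the time of use.
    synchronized void insert(String row, long nowMillis) {
        if (conn == null) {
            conn = new IdleConnection();
            openedCount++;
        }
        conn.insert(row);
        lastUsedMillis = nowMillis;
    }

    // Closes the connection if it has been unused for the whole timeout.
    synchronized boolean closeIfIdle(long nowMillis) {
        if (conn != null && nowMillis - lastUsedMillis >= idleTimeoutMillis) {
            conn.close();
            conn = null;
            return true;
        }
        return false;
    }

    synchronized int openedCount() { return openedCount; }

    // Runs the idle check once per second on a daemon thread.
    ScheduledExecutorService startReaper() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "connection-reaper");
            t.setDaemon(true);
            return t;
        });
        timer.scheduleAtFixedRate(
            () -> closeIfIdle(System.currentTimeMillis()), 1, 1, TimeUnit.SECONDS);
        return timer;
    }
}

class IdleTimeoutDemo {
    public static void main(String[] args) {
        ExpiringConnection shared = new ExpiringConnection(30_000); // assumed timeout
        shared.insert("a", 0);
        shared.closeIfIdle(10_000);   // still within the timeout: stays open
        shared.closeIfIdle(30_000);   // idle for the full timeout: closed
        shared.insert("b", 31_000);   // a new connection is opened on demand
        System.out.println("connections opened: " + shared.openedCount());
    }
}
```

The demo passes explicit clock values so the behaviour can be checked without waiting; in an executor one would call `startReaper()` once and pass `System.currentTimeMillis()` to `insert`.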
Re: unserializable object in Spark Streaming context
Actually, let me clarify further. There are a number of possibilities.

1. The easier, less efficient way is to create a connection object every time you do foreachPartition (as shown in the pseudocode earlier in the thread). For each partition, you create a connection, use it to push all the records in the partition, and then close it. You don't even need to create a singleton in that case. The cons of this method are:
   - You are not reusing connections across tasks and jobs, so you will be creating a lot of connections to the database, which may or may not be fine.
   - It will get worse if your partitions are tiny and pushing each partition takes a few hundred milliseconds or a few seconds (as is possible with Spark Streaming).

2. The slightly harder, but more efficient way is to use singletons, which can hold one connection or maintain a connection pool. Connections in the pool are created on demand but not explicitly closed at the end of the task, and are reused across tasks and jobs. In that case, closing the connections requires some kind of timeout mechanism, as I explained in my previous post. Care also needs to be taken over whether these connections are thread-safe.

Hope this helps!

TD
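The first option above (one connection per partition, closed when the partition is done) can be sketched in plain Java as follows. This is an illustration only: `PartitionConnection` and `PerPartitionDemo` are hypothetical names, the nested lists stand in for the partitions of one batch, and skipping empty partitions is an extra optimization that the thread does not mention.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a JDBC connection.
class PartitionConnection implements AutoCloseable {
    private boolean open = true;

    void insert(String row) {
        if (!open) throw new IllegalStateException("connection already closed");
        /* would write to the database */
    }

    @Override public void close() { open = false; }
}

class PerPartitionDemo {
    // Pushes every partition through its own short-lived connection and
    // returns how many connections were opened for the batch.
    static int pushAll(List<List<String>> partitions) {
        int connectionsOpened = 0;
        for (List<String> partition : partitions) {
            if (partition.isEmpty()) continue;  // assumption: no connection for empty partitions
            try (PartitionConnection conn = new PartitionConnection()) {
                connectionsOpened++;
                for (String row : partition) {
                    conn.insert(row);
                }
            } // the connection is closed here, even if an insert throws
        }
        return connectionsOpened;
    }

    public static void main(String[] args) {
        List<List<String>> batch = Arrays.asList(
            Arrays.asList("a", "b"),
            Collections.<String>emptyList(),
            Arrays.asList("c"));
        System.out.println("connections opened: " + pushAll(batch));
    }
}
```

The count grows with the number of non-empty partitions in every batch, which is the cost the first option pays for its simplicity.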
Re: unserializable object in Spark Streaming context
Could you share some code (or pseudo-code)?

Sounds like you're instantiating the JDBC connection in the driver and using it inside a closure that would run in a remote executor. That means the connection object would need to be serializable. If that sounds like what you're doing, it won't work.

On Thu, Jul 17, 2014 at 1:37 PM, Yan Fang yanfang...@gmail.com wrote:
> Hi guys,
>
> I need some help with this problem. In our use case, we need to continuously insert values into the database. So our approach is to create the jdbc object in the main method and then do the inserting in the DStream foreachRDD operation. Is this approach reasonable?
>
> Then the problem comes: since we are using com.mysql.jdbc.java, which is unserializable, we keep seeing the NotSerializableException. I think that is because Spark Streaming is trying to serialize and then checkpoint the whole class which contains the StreamingContext, not only the StreamingContext object, right? Or is there another reason that triggers the serialization? Is there any workaround for this (other than not using com.mysql.jdbc.java)?
>
> Thank you.
>
> Cheers,
> Fang, Yan

--
Marcelo
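To illustrate the point about closures, here is a small sketch that reproduces the failure with plain Java serialization and no Spark at all. `FakeDbHelper`, `SerializableAction` and `ClosureCaptureDemo` are hypothetical names invented for this example; the only assumption is that the function object is shipped with standard Java serialization.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a JDBC helper; like a real connection it is not Serializable.
class FakeDbHelper {
    void insert(String row) { /* would write to the database */ }
}

// A function type that can be shipped to another JVM.
interface SerializableAction extends Serializable {
    void apply(String row);
}

class ClosureCaptureDemo {
    // Returns true if Java serialization accepts the whole object graph.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // The helper is created outside the function, so the function captures it.
    static SerializableAction capturing() {
        final FakeDbHelper db = new FakeDbHelper();
        return row -> db.insert(row);
    }

    // The helper is created inside the function, so nothing is captured.
    static SerializableAction nonCapturing() {
        return row -> new FakeDbHelper().insert(row);
    }

    public static void main(String[] args) {
        System.out.println("capturing closure serializable:     " + canSerialize(capturing()));
        System.out.println("non-capturing closure serializable: " + canSerialize(nonCapturing()));
    }
}
```

Serializing the first function has to serialize the captured helper as well, which is what fails; the second function carries no state, so there is nothing non-serializable to ship.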
Re: unserializable object in Spark Streaming context
And if Marcelo's guess is correct, then the right way to do this would be to lazily / dynamically create the JDBC connection object as a singleton in the workers/executors and use that. Something like this:

    dstream.foreachRDD(rdd => {
      rdd.foreachPartition((iterator: Iterator[...]) => {
        // this will create the single JDBC connection object in the worker, if it does not exist
        val driver = JDBCDriver.getSingleton()
        // loop through the iterator to get the records in the partition
        // and use the driver to push them out to the DB
      })
    })

This will avoid the JDBC connection object being serialized as part of the closure / DStream checkpoint.

TD
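The `getSingleton()` call in the pseudocode above could look roughly like the following in Java. This is a sketch under assumptions: `DemoConnection`, `ConnectionHolder` and `LazySingletonDemo` are hypothetical names, and `pushPartition` merely stands in for the body of a foreachPartition function. Because the holder is a static member, there is one instance per JVM (one per executor), and nothing has to be serialized to reach it.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a JDBC connection; counts how many were created.
class DemoConnection {
    static final AtomicInteger opened = new AtomicInteger();

    DemoConnection() { opened.incrementAndGet(); }

    void insert(String row) { /* would write to the database */ }
}

// One instance per JVM, created on first use rather than on the driver.
final class ConnectionHolder {
    private static DemoConnection instance;

    private ConnectionHolder() {}

    static synchronized DemoConnection getSingleton() {
        if (instance == null) {
            instance = new DemoConnection();
        }
        return instance;
    }
}

class LazySingletonDemo {
    // Stand-in for the function passed to foreachPartition: it looks the
    // connection up where it runs instead of capturing it in the closure.
    static void pushPartition(Iterable<String> records) {
        DemoConnection conn = ConnectionHolder.getSingleton();
        for (String row : records) {
            conn.insert(row);
        }
    }

    public static void main(String[] args) {
        pushPartition(Arrays.asList("a", "b"));
        pushPartition(Arrays.asList("c"));
        System.out.println("connections opened: " + DemoConnection.opened.get());
    }
}
```

The `synchronized` accessor only guards creation. Whether concurrent tasks may safely share the one connection afterwards depends on the driver in use.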
Re: unserializable object in Spark Streaming context
Hi Marcelo and TD,

Thank you for the help. If I use TD's approach, it works and there is no exception. The only drawback is that it will create many connections to the DB, which I was trying to avoid.

Here is a snapshot of my code (the important parts were marked in red in the original message). What I was thinking is that if I call the collect() method, Spark Streaming will bring the data to the driver, and then the db object does not need to be sent to the executors. My observation is that, though exceptions are thrown, the insert function still works. Any thoughts about that? I have also pasted the log in case it helps: http://pastebin.com/T1bYvLWB

== code ==

    SparkConf sparkConf = new SparkConf().setAppName("balababala");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    final MySQLHelper db = new MySQLHelper(); // this class instantiates the JDBC driver

    /* ... a few DStream transformations ... */

    JavaPairDStream<String, MachineState> noiseState =
        machineIdNoise.updateStateByKey(getUpdateFunction());

    JavaPairDStream<String, Tuple2<MachineState, Integer>> noiseStateTemperature =
        noiseState.join(machineIdTemperature);

    noiseStateTemperature.foreachRDD(
        new Function<JavaPairRDD<String, Tuple2<MachineState, Integer>>, Void>() {
          @Override
          public Void call(JavaPairRDD<String, Tuple2<MachineState, Integer>> arg0)
              throws Exception {
            // collect() brings the batch back to the driver
            List<Tuple2<String, Tuple2<MachineState, Integer>>> list = arg0.collect();
            for (Tuple2<String, Tuple2<MachineState, Integer>> tuple : list) {
              String machineId = ...;     // right-hand side missing in the archived post
              String machineState = ...;  // right-hand side missing in the archived post
              db.insertAverages(machineId, machineState);
            }
            return null;
          }
        });

== end code ==

Thank you. If there is no other workaround, I may use TD's approach because it is the only option.

Best,
Fang, Yan
Re: unserializable object in Spark Streaming context
Hi Sean,

Thank you. I see your point. What I was thinking was to do the computation in a distributed fashion and do the storing from a single place. But you are right, having multiple DB connections is actually fine.

Thanks for answering my questions. That helps me understand the system.

Cheers,
Fang, Yan

On Thu, Jul 17, 2014 at 2:53 PM, Sean Owen so...@cloudera.com wrote:
> On Thu, Jul 17, 2014 at 10:39 PM, Yan Fang yanfang...@gmail.com wrote:
> > Thank you for the help. If I use TD's approach, it works and there is no exception. The only drawback is that it will create many connections to the DB, which I was trying to avoid.
>
> Connection-like objects aren't data that can be serialized. What would it mean to share one connection with N workers? That they all connect back to the driver, and go through one DB connection there? This defeats the purpose of distributed computing. You want multiple DB connections. You can limit the number of partitions if needed.
>
> > Here is a snapshot of my code. Mark as red for the important code. What I was thinking is that, if I call the collect() method, Spark Streaming will bring the data to the driver and then the db object does not need to be sent to executors.
>
> The Function you pass to foreachRDD() has a reference to db though. That's what is making it be serialized.
>
> > My observation is that, though exceptions are thrown, the insert function still works. Any thoughts about that? Also paste the log in case it helps: http://pastebin.com/T1bYvLWB
>
> Any executors that run locally might skip the serialization and succeed (?) but I don't think the remote executors can be succeeding.