Re: unserializable object in Spark Streaming context

2014-07-18 Thread Gino Bustelo
I get TD's recommendation of sharing a connection among tasks. Now, is there a 
good way to determine when to close connections? 

Gino B.



Re: unserializable object in Spark Streaming context

2014-07-18 Thread Tathagata Das
That's a good question. My first thought is a timeout. Timing out after tens
of seconds should be sufficient. So there should be a timer in the singleton
that runs a check every second on when the singleton was last used, and
closes the connection after the timeout. Any attempt to use the connection
again will create a new connection.
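
Something along these lines could work as a sketch (untested; the object name,
the 30-second timeout, and the check interval are just placeholders for the idea):

import java.sql.{Connection, DriverManager}
import java.util.{Timer, TimerTask}

object TimedConnection {
  private val idleTimeoutMs = 30 * 1000L   // "tens of seconds", tune as needed
  private var connection: Connection = null
  private var lastUsed = 0L

  // Tasks call this; it lazily (re)creates the connection if it was closed.
  def get(url: String): Connection = synchronized {
    if (connection == null || connection.isClosed) {
      connection = DriverManager.getConnection(url)
    }
    lastUsed = System.currentTimeMillis()
    connection
  }

  // Daemon timer: checks every second and closes the connection once it has
  // been idle longer than the timeout; the next get() recreates it.
  new Timer("jdbc-idle-check", true).scheduleAtFixedRate(new TimerTask {
    override def run(): Unit = TimedConnection.synchronized {
      if (connection != null && !connection.isClosed &&
          System.currentTimeMillis() - lastUsed > idleTimeoutMs) {
        connection.close()
      }
    }
  }, 1000, 1000)
}

Tasks would call TimedConnection.get(url) inside foreachPartition instead of
holding on to a connection themselves.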

TD



Re: unserializable object in Spark Streaming context

2014-07-18 Thread Tathagata Das
Actually, let me clarify further. There are a number of possibilities.

1. The easier, less efficient way is to create a connection object every
time you do foreachPartition (as shown in the pseudocode earlier in the
thread, and sketched again below). For each partition, you create a
connection, use it to push all the records in the partition, and then close
it. You don't even need to create a singleton in that case. The cons of this
method are:
   - You are not reusing connections across tasks and jobs, so you will be
creating a lot of connections to the database, which may or may not be fine.
   - It gets worse if your partitions are tiny and pushing each partition
takes a few hundred milliseconds or a few seconds (as is possible with Spark
Streaming).

2. The slightly harder but more efficient way would be to use a singleton,
which can hold one connection or maintain a connection pool. Connections in
the pool are created on demand, but not explicitly closed at the end of the
task, and are reused across tasks and jobs. In that case, closing a
connection requires some kind of timeout mechanism, as I explained in the
previous post. Care also needs to be taken over whether these connections
are thread-safe.
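
A rough sketch of approach 1 (the JDBC URL and the insert statement are
placeholders; real code would use a PreparedStatement):

import java.sql.DriverManager
import org.apache.spark.streaming.dstream.DStream

// Approach 1: create, use, and close one connection per partition, per batch.
def saveToDb(dstream: DStream[String]): Unit = {
  dstream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // A fresh connection for every partition of every batch: simple, but
      // costly when partitions are tiny, as described above.
      val connection = DriverManager.getConnection("jdbc:mysql://dbhost/db")
      val stmt = connection.createStatement()
      try {
        records.foreach { record =>
          stmt.executeUpdate(s"INSERT INTO averages VALUES ('$record')")
        }
      } finally {
        stmt.close()
        connection.close()
      }
    }
  }
}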

Hope this helps!

TD




Re: unserializable object in Spark Streaming context

2014-07-17 Thread Marcelo Vanzin
Could you share some code (or pseudo-code)?

Sounds like you're instantiating the JDBC connection in the driver,
and using it inside a closure that would be run in a remote executor.
That means that the connection object would need to be serializable.
If that sounds like what you're doing, it won't work.


On Thu, Jul 17, 2014 at 1:37 PM, Yan Fang yanfang...@gmail.com wrote:
 Hi guys,

 Need some help with this problem. In our use case, we need to continuously
 insert values into the database. So our approach is to create the jdbc
 object in the main method and then do the insert operations inside the
 DStream's foreachRDD. Is this approach reasonable?

 Then the problem comes: since we are using com.mysql.jdbc.java, which is
 unserializable, we keep seeing the NotSerializableException. I think that is
 because Spark Streaming is trying to serialize and then checkpoint the whole
 class which contains the StreamingContext, not only the StreamingContext
 object, right? Or is there some other reason that triggers the serialization?
 Is there any workaround for this (other than not using com.mysql.jdbc.java)?

 Thank you.

 Cheers,
 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108



-- 
Marcelo


Re: unserializable object in Spark Streaming context

2014-07-17 Thread Tathagata Das
And if Marcelo's guess is correct, then the right way to do this would be
to lazily  / dynamically create the jdbc connection server as a singleton
in the workers/executors and use that. Something like this.


dstream.foreachRDD(rdd => {
    rdd.foreachPartition((iterator: Iterator[...]) => {
        // this will create the single jdbc server in the worker, if it does not exist
        val driver = JDBCDriver.getSingleton()
        // loop through the iterator to get the records in the partition and
        // use the driver to push them out to the DB
    })
})

This will avoid the JDBC server being serialized as part of the closure /
DStream checkpoint.
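
For reference, JDBCDriver.getSingleton() could be implemented along these
lines (the object name and URL are placeholders):

import java.sql.{Connection, DriverManager}

object JDBCDriver {
  private var connection: Connection = null

  // One lazily created connection per executor JVM, reused across tasks.
  def getSingleton(): Connection = synchronized {
    if (connection == null || connection.isClosed) {
      connection = DriverManager.getConnection("jdbc:mysql://dbhost/db")
    }
    connection
  }
}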

TD


On Thu, Jul 17, 2014 at 1:42 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Could you share some code (or pseudo-code)?

 Sounds like you're instantiating the JDBC connection in the driver,
 and using it inside a closure that would be run in a remote executor.
 That means that the connection object would need to be serializable.
 If that sounds like what you're doing, it won't work.


 On Thu, Jul 17, 2014 at 1:37 PM, Yan Fang yanfang...@gmail.com wrote:
  Hi guys,
 
  need some help in this problem. In our use case, we need to continuously
  insert values into the database. So our approach is to create the jdbc
  object in the main method and then do the inserting operation in the
 DStream
  foreachRDD operation. Is this approach reasonable?
 
  Then the problem comes: since we are using com.mysql.jdbc.java, which is
  unserializable, we keep seeing the notSerializableException. I think
 that is
  because Spark Streaming is trying to serialize and then checkpoint the
 whole
  class which contains the StreamingContext, not only the StreamingContext
  object, right? Or other reason to trigger the serialize operation? Any
  workaround for this? (except not using the com.mysql.jdbc.java)
 
  Thank you.
 
  Cheers,
  Fang, Yan
  yanfang...@gmail.com
  +1 (206) 849-4108



 --
 Marcelo



Re: unserializable object in Spark Streaming context

2014-07-17 Thread Yan Fang
Hi Marcelo and TD,

Thank you for the help. If I use TD's approach, it works and there is no
exception. The only drawback is that it creates many connections to the DB,
which I was trying to avoid.

Here is a snapshot of my code (I marked the important code in red). What I
was thinking is that, if I call the collect() method, Spark Streaming will
bring the data to the driver and then the db object does not need to be
sent to executors. My observation is that, though exceptions are thrown,
the insert function still works. Any thoughts about that? I have also pasted
the log in case it helps: http://pastebin.com/T1bYvLWB

== code ==

    SparkConf sparkConf = new SparkConf().setAppName("balababala");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
            new Duration(2000));

    final MySQLHelper db = new MySQLHelper();  // this class instantiates the jdbc driver

    /*
     * a few DStream transformations
     */

    JavaPairDStream<String, MachineState> noiseState = machineIdNoise
            .updateStateByKey(getUpdateFunction());

    JavaPairDStream<String, Tuple2<MachineState, Integer>> noiseStateTemperature =
            noiseState.join(machineIdTemperature);

    noiseStateTemperature
            .foreachRDD(new Function<JavaPairRDD<String, Tuple2<MachineState, Integer>>, Void>() {
                @Override
                public Void call(JavaPairRDD<String, Tuple2<MachineState, Integer>> arg0)
                        throws Exception {
                    List<Tuple2<String, Tuple2<MachineState, Integer>>> list = arg0.collect();
                    for (Tuple2<String, Tuple2<MachineState, Integer>> tuple : list) {
                        String machineId
                        String machineState
                        db.insertAverages(machineId, machineState);
                    }
                    return null;
                }
            });

== end code ==

Thank you. If there is no other workaround, I may use TD's approach because
it is the only option.

Best,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108



Re: unserializable object in Spark Streaming context

2014-07-17 Thread Yan Fang
Hi Sean,

Thank you. I see your point. What I had in mind was to do the computation in
a distributed fashion and do the storing from a single place. But you are
right, having multiple DB connections is actually fine.

Thanks for answering my questions. That helps me understand the system.

Cheers,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Thu, Jul 17, 2014 at 2:53 PM, Sean Owen so...@cloudera.com wrote:

 On Thu, Jul 17, 2014 at 10:39 PM, Yan Fang yanfang...@gmail.com wrote:
  Thank you for the help. If I use TD's approach, it works and there is no
  exception. The only drawback is that it will create many connections to the DB,
  which I was trying to avoid.

 Connection-like objects aren't data that can be serialized. What would
 it mean to share one connection with N workers? That they all connect
 back to the driver and go through one DB connection there? This defeats
 the purpose of distributed computing. You want multiple DB
 connections. You can limit the number of partitions if needed.


  Here is a snapshot of my code (I marked the important code in red). What I
  was thinking is that, if I call the collect() method, Spark Streaming will
  bring the data to the driver and then the db object does not need to be sent

 The Function you pass to foreachRDD() has a reference to db though.
 That's what is making it be serialized.

  to executors. My observation is that, though exceptions are thrown, the
  insert function still works. Any thoughts about that? I have also pasted the
  log in case it helps: http://pastebin.com/T1bYvLWB

 Any executors that run locally might skip the serialization and
 succeed (?) but I don't think the remote executors can be succeeding.