Hi Marcelo and TD,

Thank you for the help. If I use TD's approach, it works and there is no
exception. The only drawback is that it creates many connections to the DB,
which I was trying to avoid.

Here is a snapshot of my code; I marked the important part in red. What I
was thinking is that, if I call the collect() method, Spark Streaming will
bring the data to the driver, so the db object should not need to be
sent to the executors. My observation is that, though exceptions are thrown,
the insert function still works. Any thoughts about that? I also pasted the
log in case it helps: http://pastebin.com/T1bYvLWB

================== code ==================

        SparkConf sparkConf = new SparkConf().setAppName("balababala");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
                new Duration(2000));

        final MySQLHelper db = new MySQLHelper();  // this class instantiates
                                                   // the JDBC driver

        /*
         * a few DStream transformations
         */

        JavaPairDStream<String, MachineState> noiseState = machineIdNoise
                .updateStateByKey(getUpdateFunction());

        JavaPairDStream<String, Tuple2<MachineState, Integer>> noiseStateTemperature =
                noiseState.join(machineIdTemperature);

        noiseStateTemperature
                .foreachRDD(new Function<JavaPairRDD<String, Tuple2<MachineState, Integer>>, Void>() {
                    @Override
                    public Void call(JavaPairRDD<String, Tuple2<MachineState, Integer>> arg0)
                            throws Exception {
                        List<Tuple2<String, Tuple2<MachineState, Integer>>> list = arg0
                                .collect();
                        for (Tuple2<String, Tuple2<MachineState, Integer>> tuple : list) {
                            String machineId = tuple._1();
                            String machineState = tuple._2()._1().toString();
                            db.insertAverages(machineId, machineState);
                        }
                        return null;
                    }
                });

============ end code ===============
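
For what it's worth, TD's singleton idea can be sketched in plain Java as
well. The ConnectionHolder class and the stubbed MySQLHelper below are
hypothetical stand-ins (the real helper would wrap the com.mysql.jdbc
connection); the point is that get() is only called from code that runs on
the executor, so the helper is constructed on the worker JVM and is never
captured in a closure that Spark has to serialize:

```java
// Sketch of the lazy-singleton pattern, with a stub in place of the real
// JDBC helper. A counter shows that repeated get() calls in one JVM
// (i.e. across partitions on one executor) reuse a single instance.
public class SingletonSketch {

    static class MySQLHelper {                 // stub; the real one opens a JDBC connection
        static int instancesCreated = 0;
        MySQLHelper() { instancesCreated++; }
        void insertAverages(String id, String state) {
            System.out.println("insert " + id + " -> " + state);
        }
    }

    static class ConnectionHolder {
        private static MySQLHelper instance;   // one per JVM, i.e. one per executor
        static synchronized MySQLHelper get() {
            if (instance == null) {
                instance = new MySQLHelper();  // created lazily on the worker
            }
            return instance;
        }
    }

    public static void main(String[] args) {
        // Simulate two partitions being processed in the same executor JVM:
        for (int partition = 0; partition < 2; partition++) {
            MySQLHelper db = ConnectionHolder.get();
            db.insertAverages("machine-" + partition, "state");
        }
        System.out.println("instances created: " + MySQLHelper.instancesCreated);
    }
}
```

In the real job, ConnectionHolder.get() would be called inside
foreachPartition, which gives one connection per executor JVM rather than
one per record, so it should also reduce the "many connections" problem.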

Thank you. If there is no other workaround, I will use TD's approach, since
it seems to be the only option.

Best,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Thu, Jul 17, 2014 at 1:54 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> And if Marcelo's guess is correct, then the right way to do this would be
> to lazily  / dynamically create the jdbc connection server as a singleton
> in the workers/executors and use that. Something like this.
>
>
> dstream.foreachRDD(rdd => {
>    rdd.foreachPartition((iterator: Iterator[...]) => {
>        val driver = JDBCDriver.getSingleton()   // this will create the
> single jdbc server in the worker, if it does not exist
>        // loop through iterator to get the records in the partition and
> use the driver to push them out to the DB
>    })
> })
>
> This will avoid the JDBC server being serialized as part of the closure /
> DStream checkpoint.
>
> TD
>
>
> On Thu, Jul 17, 2014 at 1:42 PM, Marcelo Vanzin <van...@cloudera.com>
> wrote:
>
>> Could you share some code (or pseudo-code)?
>>
>> Sounds like you're instantiating the JDBC connection in the driver,
>> and using it inside a closure that would be run in a remote executor.
>> That means that the connection object would need to be serializable.
>> If that sounds like what you're doing, it won't work.
>>
>>
>> On Thu, Jul 17, 2014 at 1:37 PM, Yan Fang <yanfang...@gmail.com> wrote:
>> > Hi guys,
>> >
>> > need some help in this problem. In our use case, we need to continuously
>> > insert values into the database. So our approach is to create the jdbc
>> > object in the main method and then do the inserting operation in the
>> DStream
>> > foreachRDD operation. Is this approach reasonable?
>> >
>> > Then the problem comes: since we are using com.mysql.jdbc.java, which is
>> > unserializable, we keep seeing the notSerializableException. I think
>> that is
>> > because Spark Streaming is trying to serialize and then checkpoint the
>> whole
>> > class which contains the StreamingContext, not only the StreamingContext
>> > object, right? Or other reason to trigger the serialize operation? Any
>> > workaround for this? (except not using the com.mysql.jdbc.java)
>> >
>> > Thank you.
>> >
>> > Cheers,
>> > Fang, Yan
>> > yanfang...@gmail.com
>> > +1 (206) 849-4108
>>
>>
>>
>> --
>> Marcelo
>>
>
>
