Hmm, even I don't understand why TheMain needs to be serializable. It might
be cleaner (as in, it would avoid such mysterious closure issues) to create
a separate sbt/maven project (instead of using the shell) and run the
streaming application from there.
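
For what it's worth, here is a minimal sketch of the kind of closure capture
that could produce a "Task not serializable" error like the one in this
thread. It is not the shell's actual wrapper code: Context, Job and
ClosureDemo are made-up names, and it uses only plain Java serialization,
with no Spark or HBase involved.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable handle such as StreamingContext.
class Context

// Hypothetical enclosing scope. It is marked Serializable, but its `ctx`
// field is not, so serializing a Job instance fails.
class Job extends Serializable {
  val ctx = new Context
  val sep = ","

  // Reads the field `sep`, so the function holds a reference to `this`
  // (the whole Job, including `ctx`).
  def capturing: String => Array[String] = line => line.split(sep)

  // Copies the field into a local first, so only that String is captured.
  def local: String => Array[String] = {
    val s = sep
    line => line.split(s)
  }
}

object ClosureDemo {
  // True if plain Java serialization accepts the object.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

With these definitions, ClosureDemo.isSerializable(new Job().capturing) is
false and ClosureDemo.isSerializable(new Job().local) is true. Whether this
is exactly what happens with TheMain in the shell is an assumption, not
something confirmed here.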

TD


On Thu, Sep 4, 2014 at 10:25 AM, kpeng1 <kpe...@gmail.com> wrote:

> Tathagata,
>
> Thanks for all the help.  It looks like the blah method doesn't need to be
> wrapped in a serializable object, but the main streaming calls do.  I
> am currently running everything from spark-shell, so I did not have a main
> function and object to wrap the streaming, map, and foreach calls.  After
> wrapping those calls in an object and making that object Serializable,
> everything seems to be working.
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.storage.StorageLevel
> import org.apache.hadoop.hbase.HBaseConfiguration
> import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get}
> import org.apache.hadoop.hbase.util.Bytes
>
> object Blaher {
>   def blah(row: Array[String]) {
>     val hConf = new HBaseConfiguration()
>     val hTable = new HTable(hConf, "table")
>     val thePut = new Put(Bytes.toBytes(row(0)))
>     thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0)))
>     hTable.put(thePut)
>   }
> }
>
> object TheMain extends Serializable {
>   def run() {
>     val ssc = new StreamingContext(sc, Seconds(1))
>     val lines = ssc.socketTextStream("localhost", 9977, StorageLevel.MEMORY_AND_DISK_SER)
>     val words = lines.map(_.split(","))
>     val store = words.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>     ssc.start()
>   }
> }
>
> TheMain.run()
>
> Though, I don't really understand why TheMain object needs to be
> Serializable, but the Blaher object doesn't.
>
>
>
>
> On Wed, Sep 3, 2014 at 7:59 PM, Tathagata Das [via Apache Spark User List]
> <[hidden email]> wrote:
>
>> This is an issue with how Scala computes closures. Here, because of the
>> function blah, it is trying to serialize the whole function that this code
>> is part of. Can you define the function blah outside the main function?  In
>> fact, you can try putting the function in a serializable object.
>>
>> object BlahFunction extends Serializable {
>>
>>    def blah(row: Array[Byte]) { .... }
>> }
>>
>> On a related note, opening a connection for every record in the RDD is
>> pretty inefficient. Use rdd.foreachPartition instead: open the connection,
>> write the whole partition, and then close the connection.
>>
>> TD
>>
>>
>> On Wed, Sep 3, 2014 at 4:24 PM, Kevin Peng <[hidden email]> wrote:
>>
>>> Ted,
>>>
>>> Here is the full stack trace coming from spark-shell:
>>>
>>> 14/09/03 16:21:03 ERROR scheduler.JobScheduler: Error running job streaming job 1409786463000 ms.0
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
>>>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>>>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>>>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>>>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
>>>   at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
>>>   at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
>>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>>
>>> Basically, on the terminal where I run nc -lk, I type in words
>>> separated by commas and hit enter, e.g. "bill,ted".
>>>
>>>
>>> On Wed, Sep 3, 2014 at 2:36 PM, Ted Yu <[hidden email]> wrote:
>>>
>>>> Adding back user@
>>>>
>>>> I am not familiar with the NotSerializableException. Can you show the
>>>> full stack trace?
>>>>
>>>> See SPARK-1297 for the changes you need to make so that Spark works with
>>>> HBase 0.98.
>>>>
>>>> Cheers
>>>>
>>>>
>>>> On Wed, Sep 3, 2014 at 2:33 PM, Kevin Peng <[hidden email]> wrote:
>>>>
>>>>> Ted,
>>>>>
>>>>> The hbase-site.xml is in the classpath (I had worse issues before,
>>>>> until I figured out that it wasn't in the path).
>>>>>
>>>>> I get the following error in the spark-shell:
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>> Task not serializable: java.io.NotSerializableException:
>>>>> org.apache.spark.streaming.StreamingContext
>>>>>         at org.apache.spark.scheduler.DAGScheduler.org
>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.sc
>>>>> ...
>>>>>
>>>>> I also double checked the hbase table, just in case, and nothing new
>>>>> is written in there.
>>>>>
>>>>> I am using HBase version 0.98.1-cdh5.1.0, the default one that ships
>>>>> with the CDH5.1.0 distro.
>>>>>
>>>>> Thank you for the help.
>>>>>
>>>>>
>>>>> On Wed, Sep 3, 2014 at 2:09 PM, Ted Yu <[hidden email]> wrote:
>>>>>
>>>>>> Is hbase-site.xml in the classpath?
>>>>>> Do you observe any exception from the code below or in the region
>>>>>> server log?
>>>>>>
>>>>>> Which HBase release are you using?
>>>>>>
>>>>>>
>>>>>> On Wed, Sep 3, 2014 at 2:05 PM, kpeng1 <[hidden email]> wrote:
>>>>>>
>>>>>>> I have been trying to understand how Spark Streaming and HBase
>>>>>>> connect, but have not been successful. What I am trying to do is,
>>>>>>> given a Spark stream, process that stream and store the results in
>>>>>>> an HBase table. So far this is what I have:
>>>>>>>
>>>>>>> import org.apache.spark.SparkConf
>>>>>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>>>>> import org.apache.spark.streaming.StreamingContext._
>>>>>>> import org.apache.spark.storage.StorageLevel
>>>>>>> import org.apache.hadoop.hbase.HBaseConfiguration
>>>>>>> import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get}
>>>>>>> import org.apache.hadoop.hbase.util.Bytes
>>>>>>>
>>>>>>> def blah(row: Array[String]) {
>>>>>>>   val hConf = new HBaseConfiguration()
>>>>>>>   val hTable = new HTable(hConf, "table")
>>>>>>>   val thePut = new Put(Bytes.toBytes(row(0)))
>>>>>>>   thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0)))
>>>>>>>   hTable.put(thePut)
>>>>>>> }
>>>>>>>
>>>>>>> val ssc = new StreamingContext(sc, Seconds(1))
>>>>>>> val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
>>>>>>> val words = lines.map(_.split(","))
>>>>>>> val store = words.foreachRDD(rdd => rdd.foreach(blah))
>>>>>>> ssc.start()
>>>>>>>
>>>>>>> I am currently running the above code in spark-shell. I am not sure
>>>>>>> what I am doing wrong.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-into-HBase-tp13378.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>> Nabble.com.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>>
>
>
> ------------------------------
> View this message in context: Re: Spark Streaming into HBase
> <http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-into-HBase-tp13378p13478.html>
> Sent from the Apache Spark User List mailing list archive
> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>
