Hmm, even I don't understand why TheMain needs to be serializable. It might be cleaner (as in, it would avoid such mysterious closure issues) to create a separate sbt/maven project (instead of using the shell) and run the streaming application from there.
TD

On Thu, Sep 4, 2014 at 10:25 AM, kpeng1 <kpe...@gmail.com> wrote:

> Tathagata,
>
> Thanks for all the help. It looks like the blah method doesn't need to be
> wrapped in a serializable object, but the main streaming calls do. I
> am currently running everything from spark-shell, so I did not have a main
> function and object to wrap the streaming, map, and foreach calls. After
> wrapping those calls in an object and making that object Serializable,
> everything seems to be working.
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.storage.StorageLevel
> import org.apache.hadoop.hbase.HBaseConfiguration
> import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get}
> import org.apache.hadoop.hbase.util.Bytes
>
> object Blaher {
>   def blah(row: Array[String]) {
>     val hConf = new HBaseConfiguration()
>     val hTable = new HTable(hConf, "table")
>     val thePut = new Put(Bytes.toBytes(row(0)))
>     thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)),
>       Bytes.toBytes(row(0)))
>     hTable.put(thePut)
>   }
> }
>
> object TheMain extends Serializable {
>   def run() {
>     val ssc = new StreamingContext(sc, Seconds(1))
>     val lines = ssc.socketTextStream("localhost", 9977,
>       StorageLevel.MEMORY_AND_DISK_SER)
>     val words = lines.map(_.split(","))
>     val store = words.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>     ssc.start()
>   }
> }
>
> TheMain.run()
>
> Though, I don't really understand why the TheMain object needs to be
> Serializable, but the Blaher object doesn't.
>
> On Wed, Sep 3, 2014 at 7:59 PM, Tathagata Das [via Apache Spark User List]
> <[hidden email]> wrote:
>
>> This is some issue with how Scala computes closures. Here, because of the
>> function blah, it is trying to serialize the whole function that this code
>> is part of.
>> Can you define the function blah outside the main function? In
>> fact, you can try putting the function in a serializable object.
>>
>> object BlahFunction extends Serializable {
>>
>>   def blah(row: Array[Byte]) { .... }
>> }
>>
>> On a related note, opening a connection for every record in the RDD is
>> pretty inefficient. Use rdd.foreachPartition instead - open the connection,
>> write the whole partition, and then close the connection.
>>
>> TD
>>
>> On Wed, Sep 3, 2014 at 4:24 PM, Kevin Peng <[hidden email]> wrote:
>>
>>> Ted,
>>>
>>> Here is the full stack trace coming from spark-shell:
>>>
>>> 14/09/03 16:21:03 ERROR scheduler.JobScheduler: Error running job
>>> streaming job 1409786463000 ms.0
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> not serializable: java.io.NotSerializableException:
>>> org.apache.spark.streaming.StreamingContext
>>>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>>>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>>>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>>>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
>>>   at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
>>>   at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
>>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> Basically, on the terminal where I run nc -lk, I type in words
>>> separated by commas and hit enter, e.g. "bill,ted".
>>>
>>> On Wed, Sep 3, 2014 at 2:36 PM, Ted Yu <[hidden email]> wrote:
>>>
>>>> Adding back user@
>>>>
>>>> I am not familiar with the NotSerializableException. Can you show the
>>>> full stack trace?
>>>>
>>>> See SPARK-1297 for changes you need to make so that Spark works with
>>>> hbase 0.98
>>>>
>>>> Cheers
>>>>
>>>> On Wed, Sep 3, 2014 at 2:33 PM, Kevin Peng <[hidden email]> wrote:
>>>>
>>>>> Ted,
>>>>>
>>>>> The hbase-site.xml is in the classpath (had worse issues before...
>>>>> until I figured that it wasn't in the path).
>>>>>
>>>>> I get the following error in the spark-shell:
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>> Task not serializable: java.io.NotSerializableException:
>>>>> org.apache.spark.streaming.StreamingContext
>>>>>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.sc
>>>>> ...
>>>>>
>>>>> I also double checked the hbase table, just in case, and nothing new
>>>>> is written in there.
>>>>>
>>>>> I am using hbase version: 0.98.1-cdh5.1.0 the default one with the
>>>>> CDH5.1.0 distro.
>>>>>
>>>>> Thank you for the help.
>>>>>
>>>>> On Wed, Sep 3, 2014 at 2:09 PM, Ted Yu <[hidden email]> wrote:
>>>>>
>>>>>> Is hbase-site.xml in the classpath ?
>>>>>> Do you observe any exception from the code below or in region server
>>>>>> log ?
>>>>>>
>>>>>> Which hbase release are you using ?
>>>>>>
>>>>>> On Wed, Sep 3, 2014 at 2:05 PM, kpeng1 <[hidden email]> wrote:
>>>>>>
>>>>>>> I have been trying to understand how spark streaming and hbase
>>>>>>> connect, but have not been successful. What I am trying to do is
>>>>>>> given a spark stream,
>>>>>>> process that stream and store the results in an hbase table.
>>>>>>> So far this is what I have:
>>>>>>>
>>>>>>> import org.apache.spark.SparkConf
>>>>>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>>>>> import org.apache.spark.streaming.StreamingContext._
>>>>>>> import org.apache.spark.storage.StorageLevel
>>>>>>> import org.apache.hadoop.hbase.HBaseConfiguration
>>>>>>> import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get}
>>>>>>> import org.apache.hadoop.hbase.util.Bytes
>>>>>>>
>>>>>>> def blah(row: Array[String]) {
>>>>>>>   val hConf = new HBaseConfiguration()
>>>>>>>   val hTable = new HTable(hConf, "table")
>>>>>>>   val thePut = new Put(Bytes.toBytes(row(0)))
>>>>>>>   thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)),
>>>>>>>     Bytes.toBytes(row(0)))
>>>>>>>   hTable.put(thePut)
>>>>>>> }
>>>>>>>
>>>>>>> val ssc = new StreamingContext(sc, Seconds(1))
>>>>>>> val lines = ssc.socketTextStream("localhost", 9999,
>>>>>>>   StorageLevel.MEMORY_AND_DISK_SER)
>>>>>>> val words = lines.map(_.split(","))
>>>>>>> val store = words.foreachRDD(rdd => rdd.foreach(blah))
>>>>>>> ssc.start()
>>>>>>>
>>>>>>> I am currently running the above code in spark-shell. I am not sure
>>>>>>> what I am doing wrong.
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-into-HBase-tp13378.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>> Nabble.com.
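For anyone reading this thread later: the sketch below illustrates the general mechanism behind "Task not serializable". It is not a diagnosis of what spark-shell does in this exact case. It uses plain Java serialization and no Spark at all. FakeContext, ShellLine and ClosureDemo are made-up names for illustration: FakeContext stands in for StreamingContext, and ShellLine stands in for whatever enclosing object ends up holding both the context and the function. It assumes Scala 2.12 or later, compiled as an ordinary source file rather than pasted into a REPL.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for StreamingContext: deliberately NOT Serializable.
class FakeContext

// Hypothetical stand-in for an enclosing scope that holds both the
// context and the function (this is an assumption, not spark-shell internals).
class ShellLine extends Serializable {
  val ctx = new FakeContext
  def blah(row: Array[String]): Unit = ()
  // Calling an instance method makes the closure capture `this`,
  // so serializing the closure also serializes `ctx`.
  def capturing: Array[String] => Unit = row => blah(row)
}

// A method on a top-level object is reached statically; nothing is captured.
object Blaher {
  def blah(row: Array[String]): Unit = ()
}

object ClosureDemo {
  val safe: Array[String] => Unit = row => Blaher.blah(row)

  // Returns the exception message if `f` cannot be Java-serialized, else None.
  def serializationError(f: AnyRef): Option[String] =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    }

  def main(args: Array[String]): Unit = {
    println("capturing closure: " + serializationError(new ShellLine().capturing))
    println("static closure:    " + serializationError(safe))
  }
}
```

Under those assumptions I would expect the first closure to fail with a message naming FakeContext and the second to serialize without error, which mirrors why moving blah into a standalone object helps.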
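A minimal sketch of the rdd.foreachPartition suggestion from earlier in the thread, assuming the same HBase 0.98 client API and the same "table" / "cf" names used in the original post. HBaseWriter, writePartition and store are hypothetical names introduced here, not anything from Spark or HBase. It opens one HTable per partition instead of one per record and closes it when the partition is done.

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object; living in a top-level object means the
// closure passed to Spark does not drag in any enclosing instance.
object HBaseWriter {

  // Writes every row of one partition through a single HTable.
  def writePartition(rows: Iterator[Array[String]]): Unit = {
    val hConf = HBaseConfiguration.create()
    val hTable = new HTable(hConf, "table") // table name taken from the original post
    try {
      rows.foreach { row =>
        val thePut = new Put(Bytes.toBytes(row(0)))
        thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0)))
        hTable.put(thePut)
      }
    } finally {
      hTable.close() // release the connection even if a put fails
    }
  }

  // Hooks the per-partition writer into the stream of split lines.
  def store(words: DStream[Array[String]]): Unit =
    words.foreachRDD(rdd => rdd.foreachPartition(writePartition))
}
```

With this in place, the foreachRDD line in the posted code would become HBaseWriter.store(words). The table is created inside writePartition on purpose: it runs on the executor, so the HTable itself never has to be serialized.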