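The code in a ForeachWriter runs on the executors, which means that you are
not allowed to use the SparkContext there: it exists only on the driver and
is not serializable. Because process() references `sc`, Spark has to
serialize the SparkContext along with the writer in order to ship it to the
executors. This is probably why you are seeing that exception.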

On Sun, Sep 25, 2016 at 3:20 PM, Jianshi <jszhao...@gmail.com> wrote:

> Dear all:
>
> I am trying out the newly released Structured Streaming feature in Spark
> 2.0. I use Structured Streaming to perform windowing by event time, and I
> can print the result to the console. I would like to write the result
> to a Cassandra database through the foreach sink option. I am trying to use
> the spark-cassandra-connector to save the result. The connector saves an RDD
> to Cassandra by calling rdd.saveToCassandra(), and this works fine if I
> execute the commands in spark-shell. For example:
> import com.datastax.spark.connector._
> val col = sc.parallelize(Seq(("of", 1200), ("the", "863")))
> col.saveToCassandra(keyspace, table)
>
> However, when I use sc.parallelize inside the foreach sink, it raises an
> error. The input files contain JSON messages, with each row like the
> following:
> {"id": text, "time":timestamp,"hr": int}
>
> Here is my code:
>
> object StructStream {
>   def main(args: Array[String]) {
>     val conf = new SparkConf(true).set("spark.cassandra.connection.host",
> "172.31.0.174")
>     val sc = new SparkContext(conf)
>     val sqlContext = new SQLContext(sc)
>     val spark =
> SparkSession.builder.appName("StructuredAverage").getOrCreate()
>     import spark.implicits._
>
>     val userSchema = new StructType().add("id", "string").add("hr",
> "integer").add("time","timestamp")
>     val jsonDF =
> spark.readStream.schema(userSchema).json("hdfs://ec2-
> 52-45-70-95.compute-1.amazonaws.com:9000/test3/")
>     val line_count = jsonDF.groupBy(window($"time","2 minutes","1
> minutes"),
> $"id").count().orderBy("window")
>
>     import org.apache.spark.sql.ForeachWriter
>
>     val writer = new ForeachWriter[org.apache.spark.sql.Row] {
>       override def open(partitionId: Long, version: Long) = true
>       override def process(value: org.apache.spark.sql.Row) = {
>         val toRemove = "[]".toSet
>         val v_str = value.toString().filterNot(toRemove).split(",")
>         val v_df =
> sc.parallelize(Seq(Stick(v_str(2),v_str(3).toInt,v_str(1),v_str(0))))
>         v_df.saveToCassandra("playground","sstest")
>         println(v_str(0),v_str(1),v_str(2),v_str(3))}
>       override def close(errorOrNull: Throwable) = ()
>     }
>
>     val query =
> line_count.writeStream.outputMode("complete").foreach(writer).start()
>
>     query.awaitTermination()
>
>   }
>
> }
>
> case class Stick(aid: String, bct:Int, cend: String, dst: String)
>
> *The error message looks like this:*
>
> Error:
> org.apache.spark.SparkException: Task not serializable
>         at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(
> ClosureCleaner.scala:298)
>         at
> org.apache.spark.util.ClosureCleaner$.org$apache$
> spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>         at org.apache.spark.util.ClosureCleaner$.clean(
> ClosureCleaner.scala:108)
>         at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
>         at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:882)
>         at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:881)
>         at
> org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
>         at
> org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:112)
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>         at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:881)
>         at
> org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.
> apply$mcV$sp(Dataset.scala:2117)
>         at
> org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.
> apply(Dataset.scala:2117)
>         at
> org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.
> apply(Dataset.scala:2117)
>         at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(
> SQLExecution.scala:57)
>         at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.
> scala:2532)
>         at org.apache.spark.sql.Dataset.foreachPartition(Dataset.
> scala:2116)
>         at
> org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(
> ForeachSink.scala:69)
>         at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$
> spark$sql$execution$streaming$StreamExecution$$runBatch(
> StreamExecution.scala:375)
>         at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$
> apache$spark$sql$execution$streaming$StreamExecution$$
> runBatches$1.apply$mcZ$sp(StreamExecution.scala:194)
>         at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.
> execute(TriggerExecutor.scala:43)
>         at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$
> spark$sql$execution$streaming$StreamExecution$$runBatches(
> StreamExecution.scala:184)
>         at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(
> StreamExecution.scala:120)
> Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
>
>
> *If I remove the sc.parallelize() call, then the code works fine and
> prints the following to the console:*
>
> (2016-10-19 01:16:00.0,2016-10-19 01:18:00.0,user_test,11)
> (2016-10-19 01:17:00.0,2016-10-19 01:19:00.0,user_test,11)
> (2016-10-19 01:18:00.0,2016-10-19 01:20:00.0,user_test,11)
> (2016-10-19 01:19:00.0,2016-10-19 01:21:00.0,user_test,11)
> (2016-10-19 01:20:00.0,2016-10-19 01:22:00.0,user_test,11)
>
>
> Does anyone have an idea of what the problem is? Or is there another way to
> save the result using the foreach sink?  Thanks very much.
>
> Best,
> Jianshi
>
>
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-2-0-Structured-Streaming-sc-
> parallelize-in-foreach-sink-cause-Task-not-serializable-error-tp27791.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

Reply via email to