Re: Spark 2.0 Structured Streaming: sc.parallelize in foreach sink causes Task not serializable error

2016-09-26 Thread Michael Armbrust
The code in ForeachWriter runs on the executors, which means that you are
not allowed to use the SparkContext.  This is probably why you are seeing
that exception.
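
One workaround is to do the write from inside the ForeachWriter itself, for
example via the connector's CassandraConnector (which is serializable),
instead of building an RDD on the executor. A rough, untested sketch; the
keyspace, table, and column mapping below are guessed from the schema in the
post, not taken from it:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{ForeachWriter, Row}
import com.datastax.spark.connector.cql.CassandraConnector

// Sketch only: keyspace, table, and column mapping are assumptions.
class CassandraSinkWriter(conf: SparkConf) extends ForeachWriter[Row] {

  // CassandraConnector is serializable, so it can be created on the driver
  // and shipped to the executors together with this writer.
  private val connector = CassandraConnector(conf)

  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(row: Row): Unit = {
    val window = row.getAs[Row]("window")  // struct<start: timestamp, end: timestamp>
    connector.withSessionDo { session =>
      session.execute(
        "INSERT INTO playground.sstest (aid, bct, cend, dst) VALUES (?, ?, ?, ?)",
        row.getAs[String]("id"),
        Int.box(row.getAs[Long]("count").toInt),          // count() yields a Long
        window.getAs[java.sql.Timestamp]("end").toString,
        window.getAs[java.sql.Timestamp]("start").toString)
    }
  }

  override def close(errorOrNull: Throwable): Unit = ()
}

// Used in place of the anonymous writer in the program below, e.g.:
// line_count.writeStream.outputMode("complete").foreach(new CassandraSinkWriter(conf)).start()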

On Sun, Sep 25, 2016 at 3:20 PM, Jianshi  wrote:

> Dear all:
>
> I am trying out the newly released Structured Streaming feature in Spark
> 2.0. I am using Structured Streaming to perform windowing by event time,
> and I can print the result to the console. I would now like to write the
> result to a Cassandra database through the foreach sink option, using the
> spark-cassandra-connector. The connector saves an RDD to Cassandra by
> calling rdd.saveToCassandra(), and this works fine if I execute the
> commands in spark-shell. For example:
>
> import com.datastax.spark.connector._
> val col = sc.parallelize(Seq(("of", 1200), ("the", 863)))
> col.saveToCassandra(keyspace, table)
>
> However, when I use sc.parallelize inside the foreach sink, it raises an
> error. The input file contains JSON messages, one per row, of the form:
> {"id": text, "time": timestamp, "hr": int}
>
> Here is my code:
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.{ForeachWriter, Row, SQLContext, SparkSession}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types.StructType
> import com.datastax.spark.connector._
>
> object StructStream {
>   def main(args: Array[String]) {
>     val conf = new SparkConf(true)
>       .set("spark.cassandra.connection.host", "172.31.0.174")
>     val sc = new SparkContext(conf)
>     val sqlContext = new SQLContext(sc)
>     val spark = SparkSession.builder.appName("StructuredAverage").getOrCreate()
>     import spark.implicits._
>
>     // Schema of the incoming JSON messages.
>     val userSchema = new StructType()
>       .add("id", "string").add("hr", "integer").add("time", "timestamp")
>     val jsonDF = spark.readStream.schema(userSchema)
>       .json("hdfs://ec2-52-45-70-95.compute-1.amazonaws.com:9000/test3/")
>
>     // Sliding window count per id over event time.
>     val line_count = jsonDF
>       .groupBy(window($"time", "2 minutes", "1 minute"), $"id")
>       .count()
>       .orderBy("window")
>
>     val writer = new ForeachWriter[Row] {
>       override def open(partitionId: Long, version: Long) = true
>       override def process(value: Row) = {
>         // Parse the Row's string form "[window, id, count]" into fields.
>         val toRemove = "[]".toSet
>         val v_str = value.toString().filterNot(toRemove).split(",")
>         // This sc.parallelize call is what raises the error below.
>         val v_df = sc.parallelize(Seq(Stick(v_str(2), v_str(3).toInt, v_str(1), v_str(0))))
>         v_df.saveToCassandra("playground", "sstest")
>         println(v_str(0), v_str(1), v_str(2), v_str(3))
>       }
>       override def close(errorOrNull: Throwable) = ()
>     }
>
>     val query = line_count.writeStream.outputMode("complete").foreach(writer).start()
>
>     query.awaitTermination()
>   }
> }
>
> case class Stick(aid: String, bct: Int, cend: String, dst: String)
>
> The error message looks like this:
>
> Error:
> org.apache.spark.SparkException: Task not serializable
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
>   at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:882)
>   at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:881)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>   at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:881)
>   at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2117)
>   at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2117)
>   at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2117)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>   at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
>   at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2116)
>   at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:69)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:375)
>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:194)
>   at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:184)
>   at org.a

Re: Spark 2.0+ Structured Streaming

2016-04-28 Thread Tathagata Das
Hello Benjamin,

Have you taken a look at the slides of my talk at Strata San Jose?
http://www.slideshare.net/databricks/taking-spark-streaming-to-the-next-level-with-datasets-and-dataframes
Unfortunately there is no video, as Strata does not upload videos for
everyone. I presented the same talk at Kafka Summit a couple of days back;
that recording will probably be uploaded soon.
Let me know if you have any more questions.
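
On the append-vs-upsert part of the question: the streaming result is treated
as a continuously updated table, and what a sink receives on each trigger is
governed by the query's output mode rather than by in-place updates of stored
data. A minimal illustration (not from the talk; the appName, schema, and
input path below are placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder.appName("OutputModes").getOrCreate()
import spark.implicits._

// Placeholder schema and input path for a streaming JSON source.
val schema = new StructType().add("id", "string").add("time", "timestamp")
val events = spark.readStream.schema(schema).json("hdfs:///events/")

// "append": the sink only ever receives newly arrived rows, so the output is
// append-only; suited to queries without aggregations.
val raw = events.writeStream.outputMode("append").format("console").start()

// "complete": the entire aggregated result table is re-emitted on every
// trigger, so the sink always sees the latest counts rather than per-row upserts.
val counts = events.groupBy($"id").count()
val agg = counts.writeStream.outputMode("complete").format("console").start()

spark.streams.awaitAnyTermination()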

On Thu, Apr 28, 2016 at 5:19 AM, Benjamin Kim  wrote:

> Can someone explain to me how the new Structured Streaming works in the
> upcoming Spark 2.0+? I'm a little hazy on how data will be stored and
> referenced if it can be queried and/or batch-processed directly from
> streams, and on whether the data will be append-only or there will be some
> sort of upsert capability available. This sounds similar to what AWS
> Kinesis is trying to achieve, but Kinesis can only store the data for 24
> hours. Am I close?
>
> Thanks,
> Ben