We are building a wrapper that makes it possible to use reactive streams (i.e. `Observable`, see reactivex.io) as input to Spark Streaming. To do so, we tried to create a custom receiver for Spark. However, the `Observable` lives in the driver program and is generally not serializable.
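To illustrate what we think is going on, here is a minimal sketch that uses neither Spark nor Rx. It assumes the receiver is shipped to its destination using plain Java serialization. `DriverOnlyStream`, `FakeReceiver`, and `SerializationDemo` are hypothetical names made up for this example, not part of any library.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for an Observable: a plain class that is NOT Serializable.
class DriverOnlyStream(val name: String)

// Hypothetical stand-in for a receiver: Serializable itself, but it holds
// the non-serializable stream as a field.
class FakeReceiver(val source: DriverOnlyStream) extends Serializable

object SerializationDemo {
  // Tries to Java-serialize a value. Returns Right(number of bytes written)
  // on success, or Left(exception message) on NotSerializableException.
  def trySerialize(value: AnyRef): Either[String, Int] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try {
      out.writeObject(value)
      out.flush()
      Right(buffer.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    }
  }

  def main(args: Array[String]): Unit = {
    val receiver = new FakeReceiver(new DriverOnlyStream("clock"))
    // Fails: the serializable wrapper drags the non-serializable field along.
    println(trySerialize(receiver))
    // Succeeds: String is serializable.
    println(trySerialize("plain string"))
  }
}
```

The first call comes back as a `Left`, because serializing the wrapper also means serializing the field it holds. That appears to be the same situation our receiver is in when it keeps a reference to the driver-side `Observable`.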
Is it possible to create a receiver that runs next to the driver program and therefore does not need to be serialized?

We tried the following, which gives a `NotSerializableException`:

    object Main {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("Clock")
        val ssc = new StreamingContext(conf, Seconds(1))

        val data = // A non-serializable stream of data
        val stream = RxUtils.createStream(ssc, data)

        // Mapping, filtering, etc

        ssc.start()
        ssc.awaitTermination()
      }
    }

with the `createStream` method something like the following:

    object RxUtils {
      def createStream[T: ClassTag](scc_ : StreamingContext, observable: Observable[T]): ReceiverInputDStream[T] = {
        new RxInputDStream[T](scc_, observable, StorageLevel.MEMORY_AND_DISK_SER_2)
      }
    }

    class RxInputDStream[T: ClassTag](ssc_ : StreamingContext, observable: Observable[T], storageLevel: StorageLevel)
      extends ReceiverInputDStream[T](ssc_) {

      override def getReceiver(): Receiver[T] = {
        new RxReceiver(observable, storageLevel)
      }
    }

    class RxReceiver[T](observable: Observable[T], storageLevel: StorageLevel)
      extends Receiver[T](storageLevel) with Logging {

      var subscription: Option[Subscription] = None

      override def onStart(): Unit = {
        // NOTE: 'observable' is a reference to a variable in the driver program
        subscription = Some(
          observable
            .asInstanceOf[Observable[T]]
            .subscribe(x => store(x))
        )
      }

      // Receiver also requires onStop; release the subscription here.
      override def onStop(): Unit = {
        subscription.foreach(_.unsubscribe())
        subscription = None
      }
    }

The `NOTE` comment in `onStart` marks the reference that causes the `NotSerializableException`: the receiver holds on to `observable`, so serializing the receiver also tries to serialize the `Observable`.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Receive-on-driver-program-without-serializing-tp22291.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.