Hi all,

Is there a "best practice" for subscribing to JMS with Spark Streaming?  I
have searched but not found anything conclusive.

In the absence of a standard practice the solution I was thinking of was to
use Akka + Camel (akka.camel.Consumer) to create a subscription for a Spark
Streaming Custom Receiver.  So the actor would look something like this:

class JmsReceiver(jmsURI: String) extends NetworkReceiver[String] with
Consumer {
  //e.g. "jms:sonicmq://localhost:2506/queue?destination=SampleQ1"
  def endpointUri = jmsURI
  lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY_SER)

  protected override def onStart() {
    blockGenerator.start
  }

  def receive = {
    case msg: CamelMessage => { blockGenerator += msg.body }
    case _                 => { /* ... */ }
  }

  protected override def onStop() {
    blockGenerator.stop
  }
}

And then in the main application create receivers like this:

val ssc = new StreamingContext(...)
object tascQueue extends JmsReceiver[String](ssc) {
override def getReceiver():JmsReceiver[String] = {
new JmsReceiver("jms:sonicmq://localhost:2506/queue?destination=TascQueue")
}
}
ssc.registerInputStream(tascQueue)

Is this the best way to go?

Best regards,
Patrick

Reply via email to