Re: Spark Streaming + Actors

2014-09-26 Thread Madabhattula Rajesh Kumar
Hi Team,

Could you please respond to my request below?

Regards,
Rajesh



On Thu, Sep 25, 2014 at 11:38 PM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi Team,
>
> Can I use Actors in Spark Streaming based on events type? Could you please
> review below Test program and let me know if any thing I need to change
> with respect to best practices
>
> import akka.actor.Actor
> import akka.actor.{ActorRef, Props}
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.StreamingContext
> import org.apache.spark.streaming.Seconds
> import akka.actor.ActorSystem
>
> case class one(r: org.apache.spark.rdd.RDD[String])
> case class two(s: org.apache.spark.rdd.RDD[String])
>
> class Events extends Actor
> {
>   def receive = {
> // Based on event type - Invoke respective methods asynchronously
> case one(r) => println("ONE COUNT" + r.count) // Invoke respective
> functions
> case two(s) => println("TWO COUNT" + s.count) // Invoke respective
> functions
>   }
> }
>
> object Test {
>
> def main(args: Array[String]) {
> val system = ActorSystem("System")
> val event: ActorRef = system.actorOf(Props[Events], "events")
> val sparkConf = new SparkConf() setAppName("AlertsLinesCount")
> setMaster("local")
> val ssc = new StreamingContext(sparkConf, Seconds(30))
> val lines = ssc
> textFileStream("hdfs://localhost:9000/user/rajesh/EventsDirectory/")
> lines foreachRDD(x => {
>   event ! one(x)
>   event ! two(x)
> })
> ssc.start
> ssc.awaitTermination
> }
> }
>
> Regards,
> Rajesh
>


Spark Streaming + Actors

2014-09-25 Thread Madabhattula Rajesh Kumar
Hi Team,

Can I use Actors in Spark Streaming to handle events based on their type? Could you please
review the Test program below and let me know if there is anything I need to change
with respect to best practices?

import akka.actor.Actor
import akka.actor.{ActorRef, Props}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import akka.actor.ActorSystem

/** Message of the first event type; wraps the RDD of lines to be processed. */
case class one(
  r: org.apache.spark.rdd.RDD[String]
)

/** Message of the second event type; wraps the RDD of lines to be processed. */
case class two(
  s: org.apache.spark.rdd.RDD[String]
)

/**
 * Actor that dispatches on the type of the incoming event message and prints
 * the element count of the RDD carried by that message.
 *
 * Messages are delivered through the actor's mailbox, so the counting happens
 * when the actor processes the message, not when the sender enqueues it.
 */
class Events extends Actor {

  /** Handles `one` and `two` messages; any other message is left unhandled. */
  def receive: Receive = {
    // Based on event type - invoke the respective function.
    // `count()` is an RDD action, so it is written with parentheses.
    case one(r) => println("ONE COUNT" + r.count())
    case two(s) => println("TWO COUNT" + s.count())
  }
}

/**
 * Driver program: watches an HDFS directory with Spark Streaming and, for
 * every batch, forwards the batch RDD to the `Events` actor once per event type.
 */
object Test {

  /**
   * Creates the actor system and the streaming context, wires each batch RDD
   * to the `events` actor, then starts the stream and waits for termination.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("System")
    val event: ActorRef = system.actorOf(Props[Events], "events")

    // Dot-notation keeps the chained setters in a single expression; with
    // infix notation a line break turns the second call into its own statement.
    val sparkConf = new SparkConf()
      .setAppName("AlertsLinesCount")
      .setMaster("local")

    // One batch every 30 seconds.
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    val lines = ssc.textFileStream("hdfs://localhost:9000/user/rajesh/EventsDirectory/")

    // The same batch RDD is sent twice, once wrapped in each event type.
    // NOTE(review): the actor processes these messages after this closure
    // returns - confirm the batch RDD is still usable at that point.
    lines.foreachRDD { rdd =>
      event ! one(rdd)
      event ! two(rdd)
    }

    ssc.start()
    ssc.awaitTermination()
    // NOTE(review): the ActorSystem is never shut down here - confirm whether
    // it should be stopped once the streaming context terminates.
  }
}

Regards,
Rajesh