Re: [akka-user] org.apache.spark.SparkException: Task not serializable

2017-08-04 Thread Patrik Nordwall
Perhaps try to define the props factory method in a companion object; see the
recommended practices in the docs:
http://doc.akka.io/docs/akka/current/scala/actors.html#creating-actors
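
A minimal sketch of what that looks like for the actor in the quoted code (assuming
akka.actor.Props and the StreamingContext type are in scope):

object UserActConsumerActor {
  // Props factory in the companion object: nothing from an enclosing instance is captured
  def props(ssc: StreamingContext): Props = Props(new UserActConsumerActor(ssc))
}

// in WebServer.main, instead of Props(new UserActConsumerActor(ssc)):
// val userActConsumerActor =
//   system.actorOf(UserActConsumerActor.props(ssc), name = "UserActConsumerActor")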

/Patrik
On Sat, 5 Aug 2017 at 08:07, 周梦想 wrote:

> Thank you. It is caused by the Spark Streaming closure: if it references a
> member variable or method, the whole object and all its members have to be
> serialized. But I don't know how to solve the problem.
>
> On Friday, August 4, 2017 at 8:35:26 PM UTC+8, Justin du coeur wrote:
>>
>> I don't know Spark, so I don't know quite what this is trying to do, but
>> Actors typically are not serializable -- you send the ActorRef for the
>> Actor, not the Actor itself.  I'm not sure it even makes any sense
>> semantically to try to serialize and send an Actor...
>>
> On Thu, Aug 3, 2017 at 11:49 PM, 周梦想  wrote:
>>
>
>>> my main file
>>> /**
>>>  * Author: zhouhh
>>>  * Date  : 2017-06-26
>>>  *
>>>  * main file,start web server, init akka ,spark streaming
>>>  */
>>> object WebServer extends Directives {
>>>
>>>   val log = Logger(LoggerFactory.getLogger("WebServer"))
>>>   log.info("==enter WebServer init")
>>>
>>>   //init ActorSystem
>>>   implicit val system = ActorSystem("recommed_System")
>>>   implicit val materializer = ActorMaterializer()
>>>   implicit val executionContext: ExecutionContextExecutor =
>>> system.dispatcher
>>>
>>>   //init spark streaming
>>>   val sparkConf: SparkConf = new
>>> SparkConf().setMaster(Config.sparkMaster).setAppName("WebUserActionConsumer")
>>>   val ssc: StreamingContext = new StreamingContext(sparkConf,
>>> Seconds(Config.batchDuration))
>>>
>>>   //init routes
>>>   val routes: Route = BaseRoutes.baseRoutes ~
>>> new RcmdRoutes().rcmdRoutes ~
>>> SimpleRoutes.simpleRoutes
>>>
>>>   def main(args: Array[String]) {
>>>
>>> log.debug("==enter WebServer beginning")
>>>
>>>
>>> val userActConsumerActor = system.actorOf(Props(new
>>> UserActConsumerActor(ssc)), name = "UserActConsumerActor")
>>>
>>> userActConsumerActor ! UserActMsgConsumer
>>>
>>>
>>> val bindingFuture = Http().bindAndHandle(routes, Config.host,
>>> Config.port)
>>>
>>> log.info(s"Server online at ${Config.host}:${Config.port}/\nPress
>>> RETURN to stop...")
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>> log.info(s"Server online at ${Config.host}:${Config.port}/\nPress
>>> RETURN to stop...")
>>>
>>> StdIn.readLine() // let it run until user presses return
>>> bindingFuture
>>>   .flatMap(_.unbind()) // trigger unbinding from the port
>>>   .onComplete(_ => system.terminate()) // and shutdown when done
>>>
>>>   }
>>>
>>>
>>> my spark streaming kafka consumer file
>>>
>>> class UserActConsumerActor(ssc: StreamingContext) extends Actor {
>>>   val logger = Logger(LoggerFactory.getLogger("UserActConsumerActor"))
>>>
>>>   val userActMsgParseActor: ActorRef =
>>> context.actorOf(Props[UserActMsgParseActor], name = "UserActMsgParseActor")
>>>
>>>   def receive = {
>>>
>>> case UserActMsgConsumer => parse()
>>> case e => logger.info(s"error receive of user act msg:  $e")
>>>   }
>>>
>>>   val stream: InputDStream[ConsumerRecord[String, String]] =
>>> KafkaUtils.createDirectStream(
>>> ssc,
>>> PreferConsistent,
>>> Subscribe[String, String](Config.useracttopics.split(","),
>>> Config.kafkaParams)
>>>   )
>>>
>>>   def parse(): Unit = {
>>>
>>> stream.foreachRDD(rdd => rdd.foreach(x => {
>>>   logger.info("==user act value:\n")
>>>   logger.info(x.value()) //value is String
>>>
>>>   Try {
>>> x.value().parseJson.convertTo[UserActMsg]
>>>   } match {
>>> case Success(msg) => userActMsgParseActor ! msg
>>> //case Success(msg) => context.parent ! msg
>>> case Failure(x) => logger.error(s"$x") //println(s"$x") //
>>> //case Success(msg) => println(msg)
>>>
>>>   }
>>>
>>> }))
>>>   }
>>>
>>> }
>>>
>>>
>>> When I run the web server, it repeatedly reports the error in the title.
>>>
>>> 17/08/04 11:40:15 ERROR JobScheduler: Error running job streaming job
>>> 1501818015000 ms.0
>>> org.apache.spark.SparkException: Task not serializable
>>> at
>>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>>> at
>>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
>>> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
>>> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>>> at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
>>> at
>>> co

Re: [akka-user] Akka and Mesos: Will it blend?

2017-08-04 Thread Konrad “ktoso” Malawski
This is great, thanks for the initiative :-)
We’ll review shortly!

—
Konrad `kto.so` Malawski
Akka @ Lightbend

On 4 August 2017 at 22:27:50, Dragos Dascalita Haut (ddrag...@gmail.com)
wrote:

FWIW, I'm reviving this thread to share that I've opened a PR with a
working example with Mesos in akka-samples [1].

I was excited to see that it's actually quite straightforward to get this
working in Mesos, thanks to its Marathon framework, and the APIs
facilitating auto-discovery of the seed-nodes.

For whoever wants to try it, the PR also contains a docker-compose for
spinning a Mesos environment locally, using docker-machine.

HTH,
dragos

[1] - https://github.com/akka/akka-samples/pull/37

On Tuesday, June 23, 2015 at 12:10:04 PM UTC+3, √ wrote:
>
> Worth noting is that ConductR also solves the aforementioned problem (of
> seeding containerized Akka Clusters).
>
> On Mon, Jun 22, 2015 at 11:40 PM, Michael Frank 
> wrote:
>
>> perhaps the Cluster Usage documentation page should be updated with a
>> section containing links to external libraries implementing this glue
>> functionality, similar to how external serializers are mentioned on the
>> Serialization page: http://doc.akka.io/docs/akka/
>> 2.3.11/scala/serialization.html#External_Akka_Serializers
>>
>> The Cluster Usage page is kind of a monster already tho...
>>
>> -Michael
>>
>>
>> On 06/22/15 03:30, i...@virgon.eu wrote:
>>
>> Hello,
>>
>> My team and I have successfully managed to bootstrap Akka Cluster with
>> Docker and Mesos, and we're actually running automated performance tests
>> against ephemeral clusters using this combination - so it's working *that*
>> well. The thing that really made it tick (and quickly so - since this part
>> of the implementation took mere minutes) was Marathon - it's exporting
>> Mesos slave's external hostname/ip and port and from there it's really
>> typesafe config. Of course that might not be relevant to your situation,
>> but I'd still suggest that you look at how they've done it. If you're
>> interested in more details, I could prioritise writing a blog post
>> describing the setup in more detail.
>>
>> Cheers
>>
>> On Sunday, June 21, 2015 at 9:33:40 AM UTC+2, rkuhn wrote:
>>>
>>> Hi Brian & Greg,
>>>
>>> the bind-host capability does indeed not solve the whole problem, but it
>>> is what is needed from Akka to enable you to do so (as you have done,
>>> thanks for sharing!). It might make sense to create small libraries that
>>> simplify the process for the various clouds out there, but in the end such
>>> a library would not be Akka-specific: it is just needed to obtain a
>>> host:port combination from a container service so that it can then be
>>> passed to another library that makes use of the network (like
>>> akka-cluster). What I’m getting at is that the akka/akka repository does
>>> not necessarily need to contain all the code, it would be enough (or maybe
>>> even better) to document how to pass in these pieces of information
>>> obtained via other means. Of course if large benefits are to be had by
>>> closely integrating then we’ll discuss this on a case by case basis.
>>>
>>> Regards,
>>>
>>> Roland
>>>
>>> On 20 Jun 2015 at 23:16, Brian Topping wrote:
>>>
>>> Interesting responses, thanks!
>>>
>>> As a question to the core team, should the problems Greg raised be
>>> reported as issues? What would be appropriate release target if so?
>>>
>>> Greg, I'm going to be busy with a Java-land client for the next couple
>>> of weeks, but I will do my best to clone your project and build some PRs if
>>> I can.
>>>
>>> Cheers, Brian
>>>
>>> On Jun 18, 2015, at 3:04 PM, tigerfoot  wrote:
>>>
>>> Actually that's not quite right... it's not nearly that simple.  These
>>> "oceans" like Mesos need to auto-assign the ports, so while, yes, 2.4 gives
>>> dual binding capabilities, this is only useful when you know the ports!
>>>
>>> So how do you do that?  That's much more complex.
>>>
>>> I've got a hello-world remote project in github that shows how to do
>>> this on Amazon's container service ECS, using some AWS-specific
>>> capabilities, namely to call an api from within a running Docker to find
>>> the IP of the host it's running on (not the Docker-internal IP).  From there
>>> other magic is possible.  I can successfully launch a bunch of Dockers into
>>> ECS and it will figure out what's-what and bind to the dynamic ports ECS
>>> assigns, and from there I can talk to them.
>>>
>>> See here for info: https://github.com/gzoller/docker-exp/tree/remoting
>>>
>>> (NOTE: The clustering branch of this project is still in-work, and will
>>> not function properly yet, but soon will, using the same principles.  My
>>> expectation is that a cluster in an ocean will need to use pub-sub or a
>>> similar mechanism to identify actors "somewhere" to talk to, but we'll
>>> see...)
>>>
>>> You're going to have to adapt this for Mesos.  If you do, shoot a pull
>>> request to my project so I've got 

Re: [akka-user] org.apache.spark.SparkException: Task not serializable

2017-08-04 Thread 周梦想
Thank you. It is caused by the Spark Streaming closure: if it references a
member variable or method, the whole object and all its members have to be
serialized. But I don't know how to solve the problem.
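
One restructuring that avoids the capture (just a sketch, using the same imports and
names as the code quoted below; collect() is only reasonable if the per-batch volume
is small enough to bring back to the driver):

  def parse(): Unit = {
    // the foreachRDD function itself runs on the driver, so actor members are safe here;
    // only the inner RDD closure is shipped to executors, and it must not touch the actor
    stream.foreachRDD { rdd =>
      val msgs = rdd
        .flatMap(record => Try(record.value().parseJson.convertTo[UserActMsg]).toOption)
        .collect()
      msgs.foreach(msg => userActMsgParseActor ! msg)
    }
  }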

On Friday, August 4, 2017 at 8:35:26 PM UTC+8, Justin du coeur wrote:
>
> I don't know Spark, so I don't know quite what this is trying to do, but 
> Actors typically are not serializable -- you send the ActorRef for the 
> Actor, not the Actor itself.  I'm not sure it even makes any sense 
> semantically to try to serialize and send an Actor...
>
> On Thu, Aug 3, 2017 at 11:49 PM, 周梦想 wrote:
>
>>
>> my main file
>> /**
>>  * Author: zhouhh
>>  * Date  : 2017-06-26
>>  *
>>  * main file,start web server, init akka ,spark streaming
>>  */
>> object WebServer extends Directives {
>>
>>   val log = Logger(LoggerFactory.getLogger("WebServer"))
>>   log.info("==enter WebServer init")
>>
>>   //init ActorSystem
>>   implicit val system = ActorSystem("recommed_System")
>>   implicit val materializer = ActorMaterializer()
>>   implicit val executionContext: ExecutionContextExecutor = 
>> system.dispatcher
>>
>>   //init spark streaming
>>   val sparkConf: SparkConf = new 
>> SparkConf().setMaster(Config.sparkMaster).setAppName("WebUserActionConsumer")
>>   val ssc: StreamingContext = new StreamingContext(sparkConf, 
>> Seconds(Config.batchDuration))
>>
>>   //init routes
>>   val routes: Route = BaseRoutes.baseRoutes ~
>> new RcmdRoutes().rcmdRoutes ~
>> SimpleRoutes.simpleRoutes
>>
>>   def main(args: Array[String]) {
>>
>> log.debug("==enter WebServer beginning")
>>
>>
>> val userActConsumerActor = system.actorOf(Props(new 
>> UserActConsumerActor(ssc)), name = "UserActConsumerActor")
>>
>> userActConsumerActor ! UserActMsgConsumer
>>
>>
>> val bindingFuture = Http().bindAndHandle(routes, Config.host, 
>> Config.port)
>>
>> log.info(s"Server online at ${Config.host}:${Config.port}/\nPress 
>> RETURN to stop...")
>>
>> ssc.start()
>> ssc.awaitTermination()
>>
>> log.info(s"Server online at ${Config.host}:${Config.port}/\nPress 
>> RETURN to stop...")
>>
>> StdIn.readLine() // let it run until user presses return
>> bindingFuture
>>   .flatMap(_.unbind()) // trigger unbinding from the port
>>   .onComplete(_ => system.terminate()) // and shutdown when done
>>
>>   }
>>
>>
>> my spark streaming kafka consumer file
>>
>> class UserActConsumerActor(ssc: StreamingContext) extends Actor {
>>   val logger = Logger(LoggerFactory.getLogger("UserActConsumerActor"))
>>
>>   val userActMsgParseActor: ActorRef = 
>> context.actorOf(Props[UserActMsgParseActor], name = "UserActMsgParseActor")
>>
>>   def receive = {
>>
>> case UserActMsgConsumer => parse()
>> case e => logger.info(s"error receive of user act msg:  $e")
>>   }
>>
>>   val stream: InputDStream[ConsumerRecord[String, String]] = 
>> KafkaUtils.createDirectStream(
>> ssc,
>> PreferConsistent,
>> Subscribe[String, String](Config.useracttopics.split(","), 
>> Config.kafkaParams)
>>   )
>>
>>   def parse(): Unit = {
>>
>> stream.foreachRDD(rdd => rdd.foreach(x => {
>>   logger.info("==user act value:\n")
>>   logger.info(x.value()) //value is String
>>
>>   Try {
>> x.value().parseJson.convertTo[UserActMsg]
>>   } match {
>> case Success(msg) => userActMsgParseActor ! msg
>> //case Success(msg) => context.parent ! msg
>> case Failure(x) => logger.error(s"$x") //println(s"$x") //
>> //case Success(msg) => println(msg)
>>
>>   }
>>
>> }))
>>   }
>>
>> }
>>
>>
>> When I run the web server, it repeatedly reports the error in the title.
>>
>> 17/08/04 11:40:15 ERROR JobScheduler: Error running job streaming job 
>> 1501818015000 ms.0
>> org.apache.spark.SparkException: Task not serializable
>> at 
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>> at 
>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
>> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
>> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
>> at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>> at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
>> at 
>> com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1.apply(UserActConsumerActor.scala:46)
>> at 
>> com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1.apply(UserActConsumerActor.scala:46)
>> at 
>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
>> at 
>> or

[akka-user] Akka Typed and Futures - is there a pipe-to alternative?

2017-08-04 Thread Qux
Hi, 
I have something like this:

Actor.immutable[Msg]((ctx, msg) =>
 msg match {
 case DoUpdate =>
  val update: Future[Update] = other ? (GetUpdate(_))

  Actor.same
 case Update => 
  //...
  Actor.same
 case _ => 
  //...
  Actor.same
 })
with

trait Msg
case object DoUpdate extends Msg
case object Update extends Msg


What's the best way to pipe the Update message on completion? Or should I use
another pattern in this case?
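
The closest I have come up with so far is to complete the Future back into my own
mailbox, roughly like this (just a sketch, with scala.util.{Success, Failure} imported;
UpdateFailed is a hypothetical extra message, and I'm assuming ctx.executionContext can
serve as the ExecutionContext):

 case DoUpdate =>
  val update: Future[Update.type] = other ? (GetUpdate(_))
  // turn the completed Future into an ordinary message to myself
  update.onComplete {
   case Success(_)  => ctx.self ! Update
   case Failure(ex) => ctx.self ! UpdateFailed(ex) // hypothetical: case class UpdateFailed(ex: Throwable) extends Msg
  }(ctx.executionContext)
  Actor.same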


Thanks!



Re: [akka-user] Akka and Mesos: Will it blend?

2017-08-04 Thread Dragos Dascalita Haut
FWIW, I'm reviving this thread to share that I've opened a PR with a 
working example with Mesos in akka-samples [1].

I was excited to see that it's actually quite straightforward to get this 
working in Mesos, thanks to its Marathon framework, and the APIs 
facilitating auto-discovery of the seed-nodes. 
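
The gist of it is roughly this (a sketch, not the code from the PR; HOST and PORT0 are
the environment variables I believe Marathon exports, and the Marathon API lookup of the
other seed nodes is elided):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

val host = sys.env("HOST")          // external address of the agent, as exported by Marathon
val port = sys.env("PORT0").toInt   // externally mapped port for this task

val config = ConfigFactory.parseString(
  s"""
     |akka.remote.netty.tcp.hostname = "$host"
     |akka.remote.netty.tcp.port = $port
     |akka.remote.netty.tcp.bind-hostname = "0.0.0.0"  # address bound inside the container
     |akka.remote.netty.tcp.bind-port = 2552           # container-internal port (assumption)
     |akka.cluster.seed-nodes = [ "akka.tcp://ClusterSystem@$host:$port" ]
   """.stripMargin).withFallback(ConfigFactory.load())

val system = ActorSystem("ClusterSystem", config)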

For whoever wants to try it, the PR also contains a docker-compose for 
spinning a Mesos environment locally, using docker-machine. 

HTH,
dragos

[1] - https://github.com/akka/akka-samples/pull/37 

On Tuesday, June 23, 2015 at 12:10:04 PM UTC+3, √ wrote:
>
> Worth noting is that ConductR also solves the aforementioned problem (of 
> seeding containerized Akka Clusters).
>
>> On Mon, Jun 22, 2015 at 11:40 PM, Michael Frank wrote:
>
>> perhaps the Cluster Usage documentation page should be updated with a 
>> section containing links to external libraries implementing this glue 
>> functionality, similar to how external serializers are mentioned on the 
>> Serialization page: 
>> http://doc.akka.io/docs/akka/2.3.11/scala/serialization.html#External_Akka_Serializers
>>
>> The Cluster Usage page is kind of a monster already tho...
>>
>> -Michael
>>
>>
>> On 06/22/15 03:30, i...@virgon.eu  wrote:
>>
>> Hello, 
>>
>> My team and I have successfully managed to bootstrap Akka Cluster with 
>> Docker and Mesos, and we're actually running automated performance tests 
>> against ephemeral clusters using this combination - so it's working *that* 
>> well. The thing that really made it tick (and quickly so - since this part 
>> of the implementation took mere minutes) was Marathon - it's exporting 
>> Mesos slave's external hostname/ip and port and from there it's really 
>> typesafe config. Of course that might not be relevant to your situation, 
>> but I'd still suggest that you look at how they've done it. If you're 
>> interested in more details, I could prioritise writing a blog post 
>> describing the setup in more detail.
>>
>> Cheers
>>
>> On Sunday, June 21, 2015 at 9:33:40 AM UTC+2, rkuhn wrote: 
>>>
>>> Hi Brian & Greg, 
>>>
>>> the bind-host capability does indeed not solve the whole problem, but it 
>>> is what is needed from Akka to enable you to do so (as you have done, 
>>> thanks for sharing!). It might make sense to create small libraries that 
>>> simplify the process for the various clouds out there, but in the end such 
>>> a library would not be Akka-specific: it is just needed to obtain a 
>>> host:port combination from a container service so that it can then be 
>>> passed to another library that makes use of the network (like 
>>> akka-cluster). What I’m getting at is that the akka/akka repository does 
>>> not necessarily need to contain all the code, it would be enough (or maybe 
>>> even better) to document how to pass in these pieces of information 
>>> obtained via other means. Of course if large benefits are to be had by 
>>> closely integrating then we’ll discuss this on a case by case basis.
>>>
>>> Regards,
>>>
>>> Roland
>>>
>>> On 20 Jun 2015 at 23:16, Brian Topping wrote:
>>>
>>> Interesting responses, thanks! 
>>>
>>> As a question to the core team, should the problems Greg raised be 
>>> reported as issues? What would be appropriate release target if so? 
>>>
>>> Greg, I'm going to be busy with a Java-land client for the next couple 
>>> of weeks, but I will do my best to clone your project and build some PRs if 
>>> I can.
>>>
>>> Cheers, Brian
>>>
>>> On Jun 18, 2015, at 3:04 PM, tigerfoot  wrote:
>>>
>>> Actually that's not quite right... it's not nearly that simple.  These 
>>> "oceans" like Mesos need to auto-assign the ports, so while, yes, 2.4 gives 
>>> dual binding capabilities, this is only useful when you know the ports! 
>>>
>>> So how do you do that?  That's much more complex.  
>>>
>>> I've got a hello-world remote project in github that shows how to do 
>>> this on Amazon's container service ECS, using some AWS-specific 
>>> capabilities, namely to call an api from within a running Docker to find 
>>> the IP of the host it's running on (not the Docker-internal IP).  From there 
>>> other magic is possible.  I can successfully launch a bunch of Dockers into 
>>> ECS and it will figure out what's-what and bind to the dynamic ports ECS 
>>> assigns, and from there I can talk to them. 
>>>
>>> See here for info: https://github.com/gzoller/docker-exp/tree/remoting
>>>
>>> (NOTE: The clustering branch of this project is still in-work, and will 
>>> not function properly yet, but soon will, using the same principles.  My 
>>> expectation is that a cluster in an ocean will need to use pub-sub or a 
>>> similar mechanism to identify actors "somewhere" to talk to, but we'll 
>>> see...)
>>>
>>> You're going to have to adapt this for Mesos.  If you do, shoot a pull 
>>> request to my project so I've got it. :-)
>>>
>>> Remoting in oceans is ok, but I suspect what you're really shooting for 
>>> will be the clustering: fire a bunch of nodes wrapped in Dockers out 
>>> there a

Re: [akka-user] org.apache.spark.SparkException: Task not serializable

2017-08-04 Thread Justin du coeur
I don't know Spark, so I don't know quite what this is trying to do, but
Actors typically are not serializable -- you send the ActorRef for the
Actor, not the Actor itself.  I'm not sure it even makes any sense
semantically to try to serialize and send an Actor...
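
In code that usually looks like this (a generic sketch, unrelated to the Spark specifics
quoted below):

import akka.actor.ActorRef

// messages carry an ActorRef -- a small, location-transparent handle --
// never the Actor instance itself
case class RegisterListener(listener: ActorRef)

// e.g. from inside some actor:
//   coordinator ! RegisterListener(self)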

On Thu, Aug 3, 2017 at 11:49 PM, 周梦想  wrote:

>
> my main file
> /**
>  * Author: zhouhh
>  * Date  : 2017-06-26
>  *
>  * main file,start web server, init akka ,spark streaming
>  */
> object WebServer extends Directives {
>
>   val log = Logger(LoggerFactory.getLogger("WebServer"))
>   log.info("==enter WebServer init")
>
>   //init ActorSystem
>   implicit val system = ActorSystem("recommed_System")
>   implicit val materializer = ActorMaterializer()
>   implicit val executionContext: ExecutionContextExecutor =
> system.dispatcher
>
>   //init spark streaming
>   val sparkConf: SparkConf = new SparkConf().setMaster(Config.
> sparkMaster).setAppName("WebUserActionConsumer")
>   val ssc: StreamingContext = new StreamingContext(sparkConf,
> Seconds(Config.batchDuration))
>
>   //init routes
>   val routes: Route = BaseRoutes.baseRoutes ~
> new RcmdRoutes().rcmdRoutes ~
> SimpleRoutes.simpleRoutes
>
>   def main(args: Array[String]) {
>
> log.debug("==enter WebServer beginning")
>
>
> val userActConsumerActor = system.actorOf(Props(new
> UserActConsumerActor(ssc)), name = "UserActConsumerActor")
>
> userActConsumerActor ! UserActMsgConsumer
>
>
> val bindingFuture = Http().bindAndHandle(routes, Config.host,
> Config.port)
>
> log.info(s"Server online at ${Config.host}:${Config.port}/\nPress
> RETURN to stop...")
>
> ssc.start()
> ssc.awaitTermination()
>
> log.info(s"Server online at ${Config.host}:${Config.port}/\nPress
> RETURN to stop...")
>
> StdIn.readLine() // let it run until user presses return
> bindingFuture
>   .flatMap(_.unbind()) // trigger unbinding from the port
>   .onComplete(_ => system.terminate()) // and shutdown when done
>
>   }
>
>
> my spark streaming kafka consumer file
>
> class UserActConsumerActor(ssc: StreamingContext) extends Actor {
>   val logger = Logger(LoggerFactory.getLogger("UserActConsumerActor"))
>
>   val userActMsgParseActor: ActorRef = 
> context.actorOf(Props[UserActMsgParseActor],
> name = "UserActMsgParseActor")
>
>   def receive = {
>
> case UserActMsgConsumer => parse()
> case e => logger.info(s"error receive of user act msg:  $e")
>   }
>
>   val stream: InputDStream[ConsumerRecord[String, String]] =
> KafkaUtils.createDirectStream(
> ssc,
> PreferConsistent,
> Subscribe[String, String](Config.useracttopics.split(","),
> Config.kafkaParams)
>   )
>
>   def parse(): Unit = {
>
> stream.foreachRDD(rdd => rdd.foreach(x => {
>   logger.info("==user act value:\n")
>   logger.info(x.value()) //value is String
>
>   Try {
> x.value().parseJson.convertTo[UserActMsg]
>   } match {
> case Success(msg) => userActMsgParseActor ! msg
> //case Success(msg) => context.parent ! msg
> case Failure(x) => logger.error(s"$x") //println(s"$x") //
> //case Success(msg) => println(msg)
>
>   }
>
> }))
>   }
>
> }
>
>
> When I run the web server, it repeatedly reports the error in the title.
>
> 17/08/04 11:40:15 ERROR JobScheduler: Error running job streaming job
> 1501818015000 ms.0
> org.apache.spark.SparkException: Task not serializable
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(
> ClosureCleaner.scala:298)
> at org.apache.spark.util.ClosureCleaner$.org$apache$
> spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
> at com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1.apply(
> UserActConsumerActor.scala:46)
> at com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1.apply(
> UserActConsumerActor.scala:46)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$
> foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$
> foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
> at org.apache.spark.streaming.dstream.ForEachDStream$$
> anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
> at org.apache.spark.streaming.dstream.ForEachDStream$$
> anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> at org.apache.spark.streaming.dstream.ForEa

[akka-user] Akka event bus with typed actor Ask Question

2017-08-04 Thread Politrons


I'm looking on Google without success. Does anybody know if it's possible to use 
the Akka Event Bus to subscribe and send events to typed actors?

And if you wonder, yes typed actors are mandatory :(
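
For reference, the classic (untyped) version of what I want looks like the sketch below;
I'm after the equivalent where the subscriber is a typed actor (names are placeholders):

import akka.actor.{Actor, ActorSystem, Props}

case class MyEvent(payload: String) // placeholder event type

class UntypedListener extends Actor {
  def receive = { case MyEvent(p) => println(s"got $p") }
}

val system = ActorSystem("events")
val listener = system.actorOf(Props[UntypedListener], "listener")

// classic event bus: subscribe by message class, then publish
system.eventStream.subscribe(listener, classOf[MyEvent])
system.eventStream.publish(MyEvent("hello"))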

Regards.
