Re: [akka-user] Threads Processing

2017-06-22 Thread Patrik Nordwall
I'm not sure I understand what you are trying to do, but shutting down the
ActorSystem is probably wrong. Perhaps you intended to stop the actor? If
you want at most 12 jobs in parallel, you could use a pool router with 12
routees (and don't stop them). Each actor will process its messages
sequentially.
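
A minimal sketch of the pool-router suggestion, assuming Akka 2.4 or later
(where `terminate()` replaces the older `shutdown()`). The names
`ProcessMeter`, `MeterWorker` and `PoolRouterSketch` are hypothetical and not
from the original code, and the latch is only one simple way to wait for
completion inside a single JVM:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

import akka.actor.{Actor, ActorSystem, Props}
import akka.routing.RoundRobinPool

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical message: one job per meter, plus a latch so the caller can
// tell when every job has finished (only valid inside a single JVM).
final case class ProcessMeter(name: String, done: CountDownLatch)

class MeterWorker extends Actor {
  def receive: Receive = {
    case ProcessMeter(name, done) =>
      try println(s"Processing meter: $name") // business logic would go here
      finally done.countDown()                // report completion; no shutdown here
  }
}

object PoolRouterSketch {
  // Returns true if every job completed within the timeout.
  def run(meters: Seq[String]): Boolean = {
    val system = ActorSystem("MeterSystem")
    val done   = new CountDownLatch(meters.size)
    // 12 routees, each handling one message at a time: at most 12 jobs at once.
    val router =
      system.actorOf(RoundRobinPool(12).props(Props[MeterWorker]()), "meter-router")
    meters.foreach(name => router ! ProcessMeter(name, done))
    val finished = done.await(1, TimeUnit.MINUTES)
    // Terminate the system only after all jobs have reported back.
    Await.ready(system.terminate(), 10.seconds)
    finished
  }
}
```

Calling `PoolRouterSketch.run((1 to 100).map(i => s"meter-$i"))` queues all
100 jobs up front; the router spreads them over the 12 routees, and the
remaining jobs wait in the routees' mailboxes rather than being dropped.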

/Patrik

On Thu, Jun 22, 2017 at 1:21 PM, James Matlik 
wrote:

> I suspect the issue is related to how you call
> 'context.system.shutdown()'. Make sure it is called only after all your
> messages have been processed to completion.  As written, it appears to be
> called after the first message is handled. I'm no expert with actors, but I
> would guess the reason it completes 12 messages is due to a race condition
> between the actor logic (parallelism set to 12) and the shutdown.
>
--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups
"Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Re: [akka-user] Threads Processing

2017-06-22 Thread James Matlik
I suspect the issue is related to how you call 'context.system.shutdown()'.
Make sure it is called only after all your messages have been processed to
completion. As written, it appears to be called after the first message is
handled. I'm no expert with actors, but I would guess the reason it
completes 12 messages is due to a race condition between the actor logic
(parallelism set to 12) and the shutdown.
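
One way to make sure the shutdown happens only after every message has been
handled is to let a coordinating actor count completions. This is only a
sketch, assuming Akka 2.4 or later (`terminate()` and `whenTerminated`; on
Akka 2.3 `shutdown()` and `awaitTermination()` play the same role). `Job`,
`JobDone`, `Worker`, `Coordinator` and `ShutdownWhenDone` are hypothetical
names, not part of the original code:

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.routing.RoundRobinPool

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical protocol between the coordinator and its workers.
final case class Job(meterName: String)
case object JobDone

class Worker extends Actor {
  def receive: Receive = {
    case Job(name) =>
      println(s"Processing meter: $name") // business logic would go here
      sender() ! JobDone                  // report back instead of shutting down
  }
}

class Coordinator(meters: Seq[String]) extends Actor {
  // Pool of 12 routees, so at most 12 jobs are processed at the same time.
  private val workers =
    context.actorOf(RoundRobinPool(12).props(Props[Worker]()), "workers")
  private var remaining = meters.size

  override def preStart(): Unit =
    if (meters.isEmpty) context.system.terminate()
    else meters.foreach(name => workers ! Job(name))

  def receive: Receive = {
    case JobDone =>
      remaining -= 1
      // Shut down only once every job has reported completion.
      if (remaining == 0) context.system.terminate()
  }
}

object ShutdownWhenDone {
  def run(meters: Seq[String]): Unit = {
    val system = ActorSystem("MeterSystem")
    system.actorOf(Props(new Coordinator(meters)), "coordinator")
    // Block the calling thread until the coordinator has terminated the system.
    Await.result(system.whenTerminated, 1.minute)
  }
}
```

Because the coordinator sends the jobs from inside an actor, each routee sees
the coordinator as `sender()`, so the `JobDone` replies arrive in the
coordinator's own mailbox and the counter needs no synchronization.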


[akka-user] Threads Processing

2017-06-22 Thread Abhishek G
Hi Akka Team,

I have 100 threads and need to process only 12 of them at a time, no more
than that. After these threads complete, the next 12 have to be processed,
and so on, but only the first set of 12 threads is processed and then it
terminates.
Here is my logic:

class AkkaProcessing extends Actor {
  def receive = {
    case message: List[Any] =>
      // It contains only 12 threads as a set; it processes them and
      // terminates. I am unable to get the remaining threads.
      var meterName = message(0)
      println("The Akka Meter is :" + meterName)
      val sqlContext = message(1).asInstanceOf[SQLContext]
      val FlagDF = message(2).asInstanceOf[DataFrame]

      // All the business logic here

      context.system.shutdown()
  }
}

object Processing {
  def main(args: Array[String]) = {
    val rawBuff = new ArrayBuffer[Any]()
    val actorSystem = ActorSystem("ActorSystem") // Creating ActorSystem
    val actor = actorSystem.actorOf(
      Props[AkkaProcessing].withRouter(RoundRobinPool(200)), "my-Actor")
    implicit val executionContext =
      actorSystem.dispatchers.lookup("akka.actor.my-dispatcher")

    for (i <- 0 until meter_list.length) {
      var meterName = meter_list(i) // All 100 Meters here
      rawBuff.append(meterName, sqlContext, FlagDF)
      actor ! rawBuff.toList
    }
  }
}




application.conf

ActorSystem {
  akka {
    log-config-on-start = on
    log-dead-letters-during-shutdown = off

    actor {
      timeouts {
        verySmallAskDuration = 5000
      }

      # Frequency with which stopping actors are prodded in case they had to be
      # removed from their parents
      reaper-interval = 7s

      provider = "akka.actor.LocalActorRefProvider"
      my-dispatcher {
        type = Dispatcher
        executor = "fork-join-executor"
        fork-join-executor {
          # Throughput defines the maximum number of messages to be
          # processed per actor before the thread jumps to the next actor.
          # Set to 1 for as fair as possible.
          throughput = 1000
          # Min number of threads to cap factor-based parallelism number to
          parallelism-min = 2
          # Parallelism (threads) ... ceil(available processors * factor)
          parallelism-factor = 3
          # Max number of threads to cap factor-based parallelism number to
          parallelism-max = 12
        }
      }
    }
  }

  akka.actor.deployment {
    /my-Actor {
      dispatcher = my-dispatcher
    }
  }
}

Any help is highly appreciated.


Thanks
Abhishek
