Re: [akka-user] Cancel Actor job

2018-01-25 Thread Konrad “ktoso” Malawski
> Actors may not be that cheap.

They are ~400 bytes per Actor, plus whatever you put inside one, of course;
but that's your state, not the Actor's fault ;)

> I tried to create a worker actor every time the master actor received a
> start job message and found out that it created a new thread to run it.
> It seems to me that the default dispatcher is an unbounded thread pool.
> That doesn't look good to me -- doesn't it fall back to the
> thread-per-request model?

That's not true. Actors share a thread pool; the pool may decide that more
threads would be a good idea ;-)

If you are doing blocking operations, please follow the advice I already
posted in the previous email and read about managing blocking:

https://doc.akka.io/docs/akka/current/dispatchers.html#blocking-needs-careful-management

> Since the jobs are CPU intensive, I have to control the number of
> concurrent jobs.

Sure, which is why I suggested a master actor that can control how many
workers are active at any given time.

You can limit the number of threads by following the dispatcher docs above.

> Here's an update of the code. It works for me. I just want to confirm
> whether this is the right way to do it.
>
> // When the master actor receives a StartJob message, it creates a new worker
> // actor with a predefined dispatcher backed by a fixed thread pool, to
> // control the number of concurrent jobs.
> ActorRef worker = getContext().actorOf(
>     Props.create(Worker.class).withDispatcher("my-blocking-dispatcher"));
> worker.tell(job, self());


Yes, that's the biggest part of it; read the docs above to understand why.
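
For reference, a sketch of what such a dispatcher could look like in
application.conf, following the dispatcher docs linked above (the pool size
here is only illustrative):

my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    # number of threads, and therefore the maximum number of concurrent blocking jobs
    fixed-pool-size = 4
  }
  # hand the thread back after each message so the workers are scheduled fairly
  throughput = 1
}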

-- 
Cheers,
Konrad 'ktoso' Malawski
Akka @ Lightbend



Re: [akka-user] Cancel Actor job

2018-01-25 Thread Richard Gong
Actors may not be that cheap. I tried to create a worker actor every time
the master actor received a start job message and found out that it created
a new thread to run it. It seems to me that the default dispatcher is an
unbounded thread pool. That doesn't look good to me -- doesn't it fall back
to the thread-per-request model?

Since the jobs are CPU intensive, I have to control the number of concurrent
jobs. Here's an update of the code. It works for me. I just want to confirm
whether this is the right way to do it.

// When the master actor receives a StartJob message, it creates a new worker
// actor with a predefined dispatcher backed by a fixed thread pool, to control
// the number of concurrent jobs.
ActorRef worker = getContext().actorOf(
        Props.create(Worker.class).withDispatcher("my-blocking-dispatcher"));
worker.tell(job, self());





Re: [akka-user] Cancel Actor job

2018-01-25 Thread Konrad Malawski
Actors are cheap, so you can start one actor per job (make them child actors
of some actor that manages the work), and then you don't have that
multiplexing issue at all.

Remember to put the blocking things onto a dedicated thread-pool dispatcher;
read the docs about dealing with blocking to see how.

-- 
Konrad Malawski

On January 26, 2018 at 3:56:05, Richard Gong (gong...@gmail.com) wrote:

> I'm a newbie in Akka. Here's one thing I'm trying to achieve: a simple job
> manager. There are jobs involving SQL queries and CPU-intensive computation.
> The jobs should be cancellable while they're running. Here's my code to
> simulate it. The worker actor sends a "continue" message to itself if there
> is no cancellation signal. However, the master actor may send another job to
> it while the current job is still running. The question is how to guarantee
> that the jobs don't step on each other? This may not be the way Actors are
> meant to be used. Any suggestions are very welcome.

[akka-user] validating a https server from client using private certificate authorities

2018-01-25 Thread Philippe Derome
Do you have configuration that allows an Akka HTTP client application to
specify the host and port of a certificate authority in addition to the
public Internet ones? Failing that, can you provide some guidance on custom
code that would avoid much of the need to drop down to the javax.net.ssl
interfaces, probably around the TrustManager classes?

I need to verify HTTPS certificates on an enterprise private network. The
external public Internet certificate authorities are known to be blocked by
corporate firewalls.
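
For concreteness, this is roughly the kind of javax/TrustManager plumbing I
would like to avoid or at least simplify (a minimal sketch; the trust-store
file name, password, and internal URL are placeholders):

import java.io.FileInputStream
import java.security.KeyStore
import javax.net.ssl.{SSLContext, TrustManagerFactory}

import akka.actor.ActorSystem
import akka.http.scaladsl.{ConnectionContext, Http}
import akka.http.scaladsl.model.HttpRequest
import akka.stream.ActorMaterializer

object PrivateCaClient extends App {
  implicit val system = ActorSystem("private-ca-client")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  // Trust store holding the private CA certificate (placeholder file/password).
  val trustStore = KeyStore.getInstance("JKS")
  trustStore.load(new FileInputStream("private-ca-truststore.jks"), "changeit".toCharArray)

  val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
  tmf.init(trustStore)

  val sslContext = SSLContext.getInstance("TLS")
  sslContext.init(null, tmf.getTrustManagers, null) // default key managers and secure random

  // An HTTPS context that trusts the private CA; pass it per request so the
  // default context keeps serving calls to the public internet.
  val privateCaContext = ConnectionContext.https(sslContext)

  Http()
    .singleRequest(HttpRequest(uri = "https://internal.example.corp/health"),
      connectionContext = privateCaContext)
    .foreach(response => println(response.status))
}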

Regards,

Phil



[akka-user] Cancel Actor job

2018-01-25 Thread Richard Gong
I'm a newbie in Akka. Here's one thing I'm trying to achieve: a simple job
manager. There are jobs involving SQL queries and CPU-intensive computation.
The jobs should be cancellable while they're running. Here's my code to
simulate it. The worker actor sends a "continue" message to itself if there is
no cancellation signal. However, the master actor may send another job to it
while the current job is still running. The question is how to guarantee that
the jobs don't step on each other? This may not be the way Actors are meant to
be used. Any suggestions are very welcome.

import java.io.IOException;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.routing.RoundRobinPool;

class StartJob {
    private final String id;
    private final int time;

    public StartJob(String id, int time) {
        this.id = id;
        this.time = time;
    }

    public String getId() {
        return id;
    }

    public int getTime() {
        return time;
    }
}

class CancelJob {
    private final String id;

    public CancelJob(String id) {
        this.id = id;
    }

    public String getId() {
        return id;
    }
}

class Worker extends AbstractActor {

    private String jobId;
    private boolean cancelling;
    private int time;
    private int i;

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(StartJob.class, job -> {
            jobId = job.getId();
            cancelling = false;
            i = 0;
            time = job.getTime();
            System.out.printf("Worker %s starts with id=%s\n", self(), job.getId());
            self().tell("continue", self());
        }).matchEquals("continue", p -> {
            if (cancelling) {
                System.out.printf("Job %s cancelled.\n", jobId);
                return;
            }
            System.out.printf("Job %s progress %.0f%%\n", jobId, i * 100.0 / time);
            Thread.sleep(1000); // simulate a SQL query (blocking)
            for (int j = 0; j < 100; j++); // simulate a CPU-intensive job
            i++;
            if (i < time) {
                self().tell("continue", self());
            } else {
                System.out.printf("Worker %s finished.\n", self());
            }
        }).match(CancelJob.class, job -> job.getId().equals(jobId), job -> {
            System.out.printf("Job %s cancelling.\n", job.getId());
            cancelling = true;
        }).build();
    }
}

class Master extends AbstractActor {

    private final ActorRef workerRouter;

    public Master(int numWorkers) {
        workerRouter = getContext().actorOf(
                Props.create(Worker.class).withRouter(new RoundRobinPool(numWorkers)),
                "workerRouter");
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(StartJob.class, job -> {
            System.out.printf("Submitting job %s\n", job.getId());
            workerRouter.tell(job, self());
        }).match(CancelJob.class, job -> {
            System.out.printf("Cancelling job %s\n", job.getId());
            // Send the cancellation to all routees; only the one running this job id reacts.
            getContext().actorSelection("workerRouter/*").tell(job, self());
        }).build();
    }
}

public class AnalyticsApp {
    public static void main(String[] args) throws IOException, InterruptedException {
        ActorSystem system = ActorSystem.create("JobManager");

        ActorRef masterRef = system.actorOf(Props.create(Master.class, 2), "job-manager");
        System.out.println("Master: " + masterRef);

        for (int i = 0; i < 4; i++) {
            masterRef.tell(new StartJob(Integer.toString(i), 5), ActorRef.noSender());
        }

        Thread.sleep(1000);
        masterRef.tell(new CancelJob("1"), ActorRef.noSender());

        System.out.println(">>> Press ENTER to exit <<<");
        try {
            System.in.read();
        } finally {
            system.terminate();
        }
    }
}



[akka-user] Re: [Akka-Streams] Want to always receive latest element in Sink

2018-01-25 Thread 'Johannes Rudolph' via Akka User List
Hi,

In akka-stream, processing usually runs in a fused fashion, i.e. without
further configuration one stream runs inside a single actor, so all operations
run sequentially. In such a synchronous scenario there is little room for
elements to ever get dropped, because the actorRef stage basically always has
to wait for the consumer stage to finish before it can do its own work, and at
that point the `foreach` stage can already process the next element. Fused
processing also means that `Thread.sleep` is a bad thing to do, as it blocks
the stream infrastructure and dispatcher threads from doing their work.

Try using `mapAsync()` with `akka.pattern.after` to wait (or actually do the
processing) without blocking the stream infrastructure, and it will probably
start to work.
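
As a rough sketch of that suggestion (the element type, rates, and buffer size
here are only illustrative, not taken from the original code):

import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}

object LatestElementSketch extends App {
  implicit val system = ActorSystem("latest-element")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  // A 1-element buffer with dropHead keeps only the newest element while the
  // consumer is busy; everything older is discarded.
  val consumerRef = Source.actorRef[Int](bufferSize = 1, OverflowStrategy.dropHead)
    // mapAsync(1) + akka.pattern.after waits one second per element without
    // blocking the stream's dispatcher threads (unlike Thread.sleep).
    .mapAsync(1) { data =>
      after(1.second, system.scheduler)(Future {
        println(s"data received: $data")
        data
      })
    }
    .to(Sink.ignore)
    .run() // materializes the ActorRef of the Source.actorRef stage

  // Produce an element every 10 milliseconds; most are dropped, and whenever
  // the consumer becomes ready it sees the latest element that arrived.
  Source(1 to 500)
    .throttle(1, 10.millis, 1, ThrottleMode.Shaping)
    .runForeach(consumerRef ! _)
}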

Johannes


On Thursday, January 25, 2018 at 8:03:11 AM UTC+1, sal...@thoughtworks.com 
wrote:
>
> Hello,
>
> We have a requirement that if the consumer is slower than the producer, all
> elements that cannot be consumed should be discarded, and whenever the
> consumer becomes ready it should be fed the latest element from the producer.
>
> We tried an approach as follows:
>
> Source.actorRef(0, OverflowStrategy.dropHead) // the actor receives data every 10 milliseconds
>   .runWith(Sink.foreach { data =>
>     println("data received")
>     Thread.sleep(1000) // mimic a consumer processing data every 1 second
>   })
>
>
> We shrank the buffer size to 1 (the minimum possible) with the following settings:
>
> private val actorMaterializerSettings =
>   ActorMaterializerSettings(actorSystem).withInputBuffer(1, 1)
>
>
> With this buffer size, at initialization the Sink pulls data 1 to consume and
> data 2 to put into the buffer.
>
> While data 1 is being processed, we are dropping data from the producer.
>
> When data 1 has been processed after 1000 milliseconds (1 second), ideally I
> should receive data 10 (and drop 2-9, since the consumer is slow), but instead
> I receive data 2 from the buffer. In our domain data 2 is extremely useless
> because it is stale.
>
> Is there a way to disable the buffer at the Sink entirely and always pull the
> latest data from the Source?
>
>
>



[akka-user] Re: Kafka flow shutting down unexpectedly when brokers fail

2018-01-25 Thread 'Michal Borowiecki' via Akka User List
I use RestartSource.withBackoff to recover from broker outages.

https://doc.akka.io/docs/akka/current/stream/stream-error.html#delayed-restarts-with-a-backoff-stage

Hope that helps.
Michał
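
For illustration, a sketch of how that could wrap the stream from the question
below (the backoff values are made up; consumerSettings, requestTopic,
parallelism and messageProcessor are the ones from the original code):

import scala.concurrent.duration._

import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.{RestartSource, Sink}

// Rebuild the whole pipeline with backoff whenever it fails (e.g. on a broker
// outage) instead of letting the failure terminate the stream for good.
val restartingStream = RestartSource.withBackoff(
  minBackoff = 3.seconds,
  maxBackoff = 30.seconds,
  randomFactor = 0.2 // adds jitter so restarts do not synchronize
) { () =>
  Consumer.committableSource(consumerSettings, Subscriptions.topics(requestTopic))
    .mapAsync(parallelism)(messageProcessor.processMessage)
  // ... the rest of the original pipeline (Producer.flow, batching, commit)
  // goes inside this factory as well, so it is rebuilt on every restart.
}

restartingStream.runWith(Sink.ignore)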

On Friday, 12 January 2018 22:41:04 UTC, Sean Rohead wrote:
>
> I am using akka-stream-kafka 0.18. I have a flow that reads from one 
> kafka topic, does some processing and then writes to a different kafka 
> topic. The flow has been shutting down intermittently when kafka brokers 
> fail. 
>
> Sometimes the brokers will fail repeatedly over a long period and the flow 
> does not shut down and other times it shuts down as soon as the broker 
> fails the first time. In the logs below, once the message 'Closing the 
> Kafka producer' appears, we no longer receive any messages from the Kafka 
> topic.
>
> Here is the code:
>
>   private val consumerSettings =
>     ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
>       .withBootstrapServers(bootstrapServers)
>       .withGroupId(requestGroup)
>       .withProperty(AUTO_OFFSET_RESET_CONFIG, "earliest")
>
>   private val producerSettings =
>     ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
>       .withBootstrapServers(bootstrapServers)
>
>   private def decider(throwable: Throwable): Supervision.Directive = {
>     logger.error("Received error in request consumer - restarting", throwable)
>     Supervision.Restart
>   }
>
>   private implicit val materializer: Materializer = ActorMaterializer()
>
>   private val parallelism: Int = parallelismFactor * getRuntime.availableProcessors
>
>   private val source = Consumer.committableSource(consumerSettings, Subscriptions.topics(requestTopic))
>     .mapAsync(parallelism)(messageProcessor.processMessage)
>     .withAttributes(supervisionStrategy(decider))
>     .via(Producer.flow(producerSettings).withAttributes(supervisionStrategy(decider)))
>     .map(_.message.passThrough)
>     .groupedWithin(batchSize, DurationUtils.toFiniteDuration(batchDelay))
>     .map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
>     .mapAsync(parallelism)(_.commitScaladsl())
>
>   source.runWith(Sink.ignore)
>
> Am I missing something in the code that is necessary to keep the flow 
> running when errors occur?
>
> Here's the config:
>
> akka.kafka.consumer {
>   wakeup-timeout = 10s
>   max-wakeups = 8640
>   kafka-clients {
> reconnect.backoff.ms = 1000
> reconnect.backoff.max.ms = 6
> enable.auto.commit = false
>   }
> }
>
> Here's the logs just before things stop working:
>
> 2018-01-12 16:01:55.899 kafka-coordinator-heartbeat-thread | sherlock INFO 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator Marking the 
> coordinator XXX.XXX.XXX.159:6667 (id: 1253353944 rack: null) dead for group 
> sherlock
> 2018-01-12 16:02:02.815 application-akka.kafka.default-dispatcher-23 INFO 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator Discovered 
> coordinator XXX.XXX.XXX.159:6667 (id: 1253353944 rack: null) for group 
> sherlock.
> 2018-01-12 16:02:02.839 application-akka.kafka.default-dispatcher-23 INFO 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Revoking 
> previously assigned partitions [dlp_request-9, dlp_request-11, 
> dlp_request-10] for group sherlock
> 2018-01-12 16:02:02.839 application-akka.kafka.default-dispatcher-23 INFO 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
> (Re-)joining group sherlock
> 2018-01-12 16:02:03.015 application-akka.kafka.default-dispatcher-23 INFO 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
> (Re-)joining group sherlock
> 2018-01-12 16:02:04.864 application-akka.actor.default-dispatcher-308541 
> INFO org.apache.kafka.clients.producer.KafkaProducer Closing the Kafka 
> producer with timeoutMillis = 6 ms.
> 2018-01-12 16:02:12.831 application-akka.actor.default-dispatcher-308634 
> WARN akka.kafka.KafkaConsumerActor Consumer interrupted with 
> WakeupException after timeout. Message: null. Current value of 
> akka.kafka.consumer.wakeup-timeout is 1 milliseconds
> 2018-01-12 16:02:12.831 application-akka.actor.default-dispatcher-308634 
> WARN akka.kafka.KafkaConsumerActor Kafka commit took longer than 
> `commit-time-warning`: 22991676910 ms
> 2018-01-12 16:02:12.832 application-akka.actor.default-dispatcher-308541 
> WARN akka.kafka.KafkaConsumerActor Kafka commit took longer than 
> `commit-time-warning`: 10016185009 ms
> 2018-01-12 16:02:12.919 application-akka.kafka.default-dispatcher-23 INFO 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
> Successfully joined group sherlock with generation 257
> 2018-01-12 16:02:12.920 application-akka.kafka.default-dispatcher-23 INFO 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Setting 
> newly assigned partitions [dlp_request-9, dlp_request-11, dlp_request-10] 
> for group sherlock
>
>


[akka-user] Recommendations on a PoC for system migration from VB.NET to Scala/Akka stack

2018-01-25 Thread Som Shankar Bhattacharyya


I want some suggestions/recommendations on the following.


Today we have a legacy .NET system that gathers information from customers'
environments and reports back to our backend.

This system is built the following way:

The client is a VB.NET application (usually running outside our network).

The back end is a combination of REST services and traditional web services,
all in ASP. These services make their own database calls, etc.


*Motivations:*


i. Migrating the legacy framework is essential simply because of maintenance
cost.

ii. Our services are going to be hit very heavily this coming year, with a big
increase in clients and shorter intervals between client polls, and I want to
showcase how the Scala/Akka stack can provide better performance in this
regard.

Now I want to demo the advantages of having this work done on the Scala/Akka
stack.

I am wondering what approach I should take to best highlight the advantages of
adopting this stack.

I already have a small REST service implemented that connects to a database
and gets/sets some data. I have implemented it as an Akka HTTP REST service.

But I haven't mimicked the client or the other web services in question.

With my limited knowledge of this stack I might still be able to implement a
client and a web service, but I want a focused approach on what to highlight
and how to do so.


*Initial thoughts:*

i. Write a Scala program with Akka actors. I need actors to abstract the
different pieces of work the client needs to do in parallel. Today we do those
using a few Windows services.

ii. Host the already implemented REST service on a separate machine.

iii. I need to write at least one more web service (would it be a better idea
to write all services as REST?).

iv. Figure out a way to save data to the database from the services at
lightning speed. Today we have to write to the file system first and then a
different service slurps it up and persists it to the database. Maybe a
message broker could be used here?


I need someone to discuss this with and come to a focused work item.


Many thanks!

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.