[akka-user] MergePreferred with different priorities on all inlets

2017-03-20 Thread Sergey Sopin
Hi,

Is there any way to set different priorities on different inlets in 
MergePreferred if it has more than two inlets? 

Thanks!

Regards,
Sergey

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] How to stop unstoppable actor?

2017-01-24 Thread Sergey Sopin
Hi Konrad,

Thanks for the tip! But it seems that the postStop function executes only after 
the flow has finished. How can I deal with that? 
Thanks!

Regards, 
Sergey

On Tuesday, January 24, 2017 at 22:31:41 UTC+2, Konrad Malawski wrote:
>
> Add an Akka Streams KillSwitch to your stream and trigger it from the 
> postStop of the actor.
> Read about those concepts in the docs.
>
> -- 
> Konrad Malawski
>
>



[akka-user] How to stop unstoppable actor?

2017-01-24 Thread Sergey Sopin
Hi,

I am trying to solve a problem with an actor that runs for too long. In my 
case a "heavy" actor runs an akka-stream that calculates some values; sometimes 
this calculation takes too much time and I want to stop it. I assumed that 
it would be stopped automatically when the parent actor throws 
AskTimeoutException, but I was wrong. Could you please tell me how to stop 
this calculation in the most elegant way? 

Thank you in advance!

Regards,
Sergey



Re: [akka-user] Re: [Akka-streams] Cannot push port twice

2017-01-12 Thread Sergey Sopin
Hi Rafał,

Thank you very much!

Cheers,
Sergey

On Thursday, January 12, 2017 at 12:51:23 UTC+2, Rafał Krzewski wrote:
>
> Hi,
>
> here's a more robust implementation of a stream splitter, based on the Partition 
> stage: https://gist.github.com/rkrzewski/a0fc5d0b47d9a3e0b2c81435adef3fe7
>
> cheers,
> Rafał
>

Re: [akka-user] Re: [Akka-streams] Cannot push port twice

2017-01-11 Thread Sergey Sopin
Hi again,

Rafał, could you please give me an example of how to implement waiting?

Thanks!

- Sergey
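
A rough idea of what the waiting could look like: remember which outlets have pulled, and pull the inlet only once both of them are waiting. The sketch below is plain Java rather than a GraphStage. GatedSplitterModel, its fields and the even/odd routing rule are assumptions made for illustration; they are not Akka API and not the code from this thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/** Plain-Java model (hypothetical, not Akka API) of a 1-inlet/2-outlet stage. */
final class GatedSplitterModel {
    private final Deque<Integer> upstream;          // elements upstream can still deliver
    private final Predicate<Integer> routeToOut0;   // routing decision made per element
    private boolean demand0;                        // out0 has pulled and is waiting
    private boolean demand1;                        // out1 has pulled and is waiting
    private boolean inletPulled;                    // a pull on the inlet is pending
    final List<Integer> out0 = new ArrayList<>();
    final List<Integer> out1 = new ArrayList<>();

    GatedSplitterModel(List<Integer> elements, Predicate<Integer> routeToOut0) {
        this.upstream = new ArrayDeque<>(elements);
        this.routeToOut0 = routeToOut0;
    }

    /** An outlet signals demand; the inlet is pulled only when BOTH outlets wait. */
    void onPull(int outlet) {
        if (outlet == 0) { demand0 = true; } else { demand1 = true; }
        if (demand0 && demand1 && !inletPulled && !upstream.isEmpty()) {
            inletPulled = true;
        }
    }

    /** Upstream answers the pending pull. Returns the outlet served, or -1 if none. */
    int deliver() {
        if (!inletPulled) { return -1; }
        int element = upstream.remove();
        inletPulled = false;
        if (routeToOut0.test(element)) {
            out0.add(element);   // safe: out0 is known to be waiting
            demand0 = false;
            return 0;
        }
        out1.add(element);       // safe: out1 is known to be waiting
        demand1 = false;
        return 1;
    }

    /** Drives the model with downstreams that pull again after each element. */
    static GatedSplitterModel run(List<Integer> elements) {
        GatedSplitterModel stage = new GatedSplitterModel(elements, n -> n % 2 == 0);
        stage.onPull(0);
        stage.onPull(1);
        int served;
        while ((served = stage.deliver()) >= 0) {
            stage.onPull(served);
        }
        return stage;
    }

    public static void main(String[] args) {
        GatedSplitterModel stage = run(Arrays.asList(1, 2, 3, 4));
        System.out.println(stage.out0 + " " + stage.out1); // prints "[2, 4] [1, 3]"
    }
}
```

Because the inlet is pulled only while both outlets are waiting, whichever way the routing decision goes, the chosen outlet can accept the element.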

On Wednesday, January 11, 2017 at 18:56:00 UTC+2, Rafał Krzewski wrote:
>
> The process should look like the following:
>
> 1. Wait for both outlets to pull
> 2. Pull from the inlet
> 3. Wait for the inlet to push an element, make the decision and push it to 
> the appropriate outlet
> 4. Go to 1
>
> This way you only ever pull the inlet once, and once the element is available 
> you can always push it out, since both outlets are available.
>
> Cheers,
> Rafał
>

Re: [akka-user] Re: [Akka-streams] Cannot push port twice

2017-01-11 Thread Sergey Sopin
Hi Rafał,

Yeah, you are right! Thank you! I understood.

Cheers,
Sergey

On Wednesday, January 11, 2017 at 18:56:00 UTC+2, Rafał Krzewski wrote:
>
> The process should look like the following:
>
> 1. Wait for both outlets to pull
> 2. Pull from the inlet
> 3. Wait for the inlet to push an element, make the decision and push it to 
> the appropriate outlet
> 4. Go to 1
>
> This way you only ever pull the inlet once, and once the element is available 
> you can always push it out, since both outlets are available.
>
> Cheers,
> Rafał
>

Re: [akka-user] Re: [Akka-streams] Cannot push port twice

2017-01-11 Thread Sergey Sopin
Hi Rafał,

Could you please tell me how to achieve it? The code is listed in the 
initial message. 
When onPull is called for the out1 outlet, onPull for out0 has 
already been executed and the inlet is already in the pulled state, so hasBeenPulled 
returns true for it and it is not possible to pull the inlet.

Regards,
Sergey

On Wednesday, January 11, 2017 at 18:56:00 UTC+2, Rafał Krzewski wrote:
>
> The process should look like the following:
>
> 1. Wait for both outlets to pull
> 2. Pull from the inlet
> 3. Wait for the inlet to push an element, make the decision and push it to 
> the appropriate outlet
> 4. Go to 1
>
> This way you only ever pull the inlet once, and once the element is available 
> you can always push it out, since both outlets are available.
>
> Cheers,
> Rafał
>

Re: [akka-user] Re: [Akka-streams] Cannot push port twice

2017-01-10 Thread Sergey Sopin
So, does it make sense? It seems it would be better to allow the inlet to be 
pulled once per outlet, not just once overall. 
Maybe there is an existing solution for the problem above? 

 - Sergey

On Monday, January 9, 2017 at 17:03:43 UTC+2, Sergey Sopin wrote:
>
> Hi Endre,
>
> I am not sure if I need it. But it seems that sometimes my inlet is being 
> pulled by outlet_0 when, according to the conditions, the message should be pushed 
> to outlet_1. In this case my flow gets stuck, because it waits for outlet_1 
> to pull, and that never happens (because the port has already been pulled by 
> outlet_0).
>
> My flow looks like the following: 
>
>
>
> <https://lh3.googleusercontent.com/-91i8qjm-MXM/WHOla3zvGeI/BQA/t1sEPAi_q6oeMdPWsz4DIlsZdrB8s06QACLcB/s1600/Flow.png>
>
>
> The custom logic of the element (FanOutShape2) can be found in the initial 
> message. Please let me know if you see anything wrong there.
>
> Thanks!
>
> Regards,
> Sergey
>

Re: [akka-user] Re: [Akka-streams] Cannot push port twice

2017-01-09 Thread Sergey Sopin


Hi Endre,

I am not sure if I need it. But it seems that sometimes my inlet is being 
pulled by outlet_0 when, according to the conditions, the message should be pushed 
to outlet_1. In this case my flow gets stuck, because it waits for outlet_1 
to pull, and that never happens (because the port has already been pulled by 
outlet_0).

My flow looks like the following: 


<https://lh3.googleusercontent.com/-91i8qjm-MXM/WHOla3zvGeI/BQA/t1sEPAi_q6oeMdPWsz4DIlsZdrB8s06QACLcB/s1600/Flow.png>


The custom logic of the element (FanOutShape2) can be found in the initial 
message. Please let me know if you see anything wrong there.

Thanks!

Regards,
Sergey

On Monday, January 9, 2017 at 16:10:14 UTC+2, drewhk wrote:
>
>
>
> On Mon, Jan 9, 2017 at 3:03 PM, Sergey Sopin <sopi...@gmail.com> wrote:
>
>> Hi again,
>>
>> In my case I have 1 inlet and 2 outlets. It seems that the inlet port can be 
>> pulled only once.
>>
>
> Yes, this is clearly documented here: 
> http://doc.akka.io/docs/akka/2.4/scala/stream/stream-customize.html#Port_states__InHandler_and_OutHandler
>
> Please look at the state charts of the ports.
>  
>
>> So when two outlets try to pull the same inlet one after the other, as in the 
>> example above, the second attempt will fail, because hasBeenPulled(inlet) 
>> will return true. Could you please help me figure out how to 
>> deal with it?
>>
>
> Why do you want to pull again if you have already pulled?
>
> -Endre
>  
>
>> Thank you in advance!
>>
>> Cheers,
>> Sergey
>>
>>

[akka-user] Re: [Akka-streams] Cannot push port twice

2017-01-09 Thread Sergey Sopin
Hi again,

In my case I have 1 inlet and 2 outlets. It seems that the inlet port can be 
pulled only once. So when two outlets try to pull the same inlet one after the 
other, as in the example above, the second attempt will fail, because 
hasBeenPulled(inlet) will return true. Could you please help me 
figure out how to deal with it?
Thank you in advance!

Cheers,
Sergey
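
One way to read the behaviour described here: the inlet holds at most one pending request, so the second outlet's demand is not lost; it is covered by the request that is already in flight. A minimal plain-Java model of that state follows. SharedInletModel and its method names are hypothetical and are not Akka API; only the guard mirrors the `!hasBeenPulled(in)` check from the stage in this thread.

```java
/** Plain-Java model (hypothetical, not Akka API) of an inlet shared by two outlets. */
final class SharedInletModel {
    private boolean pulled;   // true between a pull and the matching upstream push
    private int pullCount;    // how many pulls were actually issued

    boolean hasBeenPulled() { return pulled; }

    int pullCount() { return pullCount; }

    /** Same guard as in both onPull handlers: pull only if no pull is pending. */
    void pullIfIdle() {
        if (!pulled) {
            pulled = true;
            pullCount++;
        }
    }

    /** Upstream delivers an element; afterwards the inlet may be pulled again. */
    void upstreamPush() {
        if (!pulled) {
            throw new IllegalStateException("push without a pending pull");
        }
        pulled = false;
    }

    public static void main(String[] args) {
        SharedInletModel in = new SharedInletModel();
        in.pullIfIdle();                          // out0 signals demand
        in.pullIfIdle();                          // out1 signals demand, no second pull
        System.out.println(in.pullCount());       // prints 1
        in.upstreamPush();
        System.out.println(in.hasBeenPulled());   // prints false
    }
}
```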


On Monday, January 9, 2017 at 14:55:22 UTC+2, Julian Howarth wrote:
>
> As per 
> http://doc.akka.io/docs/akka/2.4.16/java/stream/stream-customize.html#Port_states__AbstractInHandler_and_AbstractOutHandler
>  
> you can only push to a port if downstream has pulled (i.e. signalled that it 
> is ready for data). So, in addition to checking isClosed(outx), you also 
> need to check isAvailable(outx).
>
> Alternatively, you can use emit(outx), which takes care of this 
> automatically.
>
> Julian
>
>
> On Monday, January 9, 2017 at 12:46:43 PM UTC, Sergey Sopin wrote:
>>
>> Hi,
>>
>> I have created a FanOutShape2 shape with custom logic: 
>>
>> @Override
>> public GraphStageLogic createLogic(Attributes inheritedAttributes) {
>>     return new GraphStageLogic(shape) {
>>         {
>>             setHandler(in, new AbstractInHandler() {
>>                 @Override
>>                 public void onPush() throws Exception {
>>                     Object result = process(grab(in), materializer());
>>
>>                     if (result instanceof ProcessingResponse) {
>>                         ProcessingResponse response = (ProcessingResponse) result;
>>                         if (!isClosed(out1)) {
>>                             push(out1, response); // This is FAndLShape.java:46
>>                         }
>>                     } else if (result != null && result instanceof FinderData) {
>>                         FinderData response = (FinderData) result;
>>                         if (!isClosed(out0)) {
>>                             push(out0, response);
>>                         }
>>                     }
>>                 }
>>             });
>>
>>             setHandler(out0, new AbstractOutHandler() {
>>                 @Override
>>                 public void onPull() throws Exception {
>>                     if ((!hasBeenPulled(in)) && (!isClosed(in))) {
>>                         pull(in);
>>                     }
>>                 }
>>             });
>>
>>             setHandler(out1, new AbstractOutHandler() {
>>                 @Override
>>                 public void onPull() throws Exception {
>>                     if ((!hasBeenPulled(in)) && (!isClosed(in))) {
>>                         pull(in);
>>                     }
>>                 }
>>             });
>>         }
>>     };
>> }
>>
>> And sometimes I get the following error message: 
>>
>> [error] a.a.RepointableActorRef - Error in stage 
>> [kernel.modeller.workers.streamFinder.finderShapes.FAndLShape@4efd96f7]: 
>> requirement failed: Cannot push port (out1) twice
>> java.lang.IllegalArgumentException: requirement failed: Cannot push port (out1) twice
>>     at scala.Predef$.require(Predef.scala:219)
>>     at akka.stream.stage.GraphStageLogic.push(GraphStage.scala:439)
>>     at kernel.modeller.workers.streamFinder.finderShapes.FAndLShape$1$1.onPush(FAndLShape.java:46)
>>     at akka.stream.impl.fusing.GraphInterpreter.processElement$1(GraphInterpreter.scala:582)
>>     at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:593)
>>     at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:535)
>>     at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:443)
>>     at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:387)
>>     at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:547)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
>>
>> Could you tell me what is wrong here? 
>> Thanks! 
>>
>> Cheers,
>> Sergey
>>
>
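
The failure in the trace above can be reproduced in miniature. What follows is a plain-Java model, not Akka's implementation: OutletModel and its methods are hypothetical names, and the exception text is copied from the trace only to make the parallel visible. It shows why checking that the port is open is not enough, and what an availability guard (the role isAvailable(out) plays in Julian's reply) changes.

```java
/** Plain-Java model (hypothetical, not Akka API) of a single outlet port. */
final class OutletModel {
    private final String name;
    private boolean available;   // set by a downstream pull, cleared by a push
    private int pushed;          // number of elements successfully pushed

    OutletModel(String name) {
        this.name = name;
    }

    /** Downstream signals that it is ready for exactly one element. */
    void downstreamPull() { available = true; }

    boolean isAvailable() { return available; }

    int pushedCount() { return pushed; }

    /** Unguarded push: legal only when downstream has pulled. */
    void push(Object element) {
        if (!available) {
            throw new IllegalArgumentException("Cannot push port (" + name + ") twice");
        }
        available = false;
        pushed++;
    }

    /** Guarded push: reports failure instead of throwing. */
    boolean pushIfAvailable(Object element) {
        if (!available) {
            return false;   // caller must keep the element or avoid pulling too early
        }
        push(element);
        return true;
    }

    public static void main(String[] args) {
        OutletModel out1 = new OutletModel("out1");
        out1.downstreamPull();
        System.out.println(out1.pushIfAvailable("first"));   // prints true
        System.out.println(out1.pushIfAvailable("second"));  // prints false
    }
}
```

Note that a guard on its own only turns the exception into a refused push; the refused element still has to go somewhere, which is why the demand on the outlets has to be taken into account before the inlet is pulled.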



[akka-user] Re: [Akka-streams] Cannot push port twice

2017-01-09 Thread Sergey Sopin
Hi Julian,

Thank you very much! 

On Monday, January 9, 2017 at 14:55:22 UTC+2, Julian Howarth wrote:
>
> As per 
> http://doc.akka.io/docs/akka/2.4.16/java/stream/stream-customize.html#Port_states__AbstractInHandler_and_AbstractOutHandler
>  
> you can only push to a port if downstream has pulled (i.e. signalled that it 
> is ready for data). So, in addition to checking isClosed(outx), you also 
> need to check isAvailable(outx).
>
> Alternatively, you can use emit(outx), which takes care of this 
> automatically.
>
> Julian
>
>
>
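
To make Julian's point concrete, here is a minimal plain-Java model of an
outlet's port state. It is not Akka API: PortStateDemo, ModelOutlet,
pushOrBuffer and onPull are hypothetical names made up for this sketch, and
the only rule carried over from the thread is that one pull pays for exactly
one push. In a real GraphStageLogic the corresponding guard would be
isAvailable(out) before push(out, elem), or emit(out, elem), which does the
bookkeeping itself.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class PortStateDemo {

    /** Toy outlet: a push is legal only while a pull is outstanding. */
    static final class ModelOutlet {
        private boolean pulled = false;   // set by pull(), cleared by push()
        final List<String> delivered = new ArrayList<>();

        boolean isAvailable() { return pulled; }

        void pull() { pulled = true; }    // downstream signals demand

        void push(String elem) {
            if (!pulled) {
                throw new IllegalArgumentException("Cannot push port twice");
            }
            pulled = false;               // the demand is used up by this push
            delivered.add(elem);
        }
    }

    /** Guarded variant: push if there is demand, otherwise park the element. */
    static void pushOrBuffer(ModelOutlet out, Queue<String> pending, String elem) {
        if (out.isAvailable()) {
            out.push(elem);
        } else {
            pending.add(elem);
        }
    }

    /** Called when the outlet is pulled: serve a parked element first. */
    static void onPull(ModelOutlet out, Queue<String> pending) {
        out.pull();
        if (!pending.isEmpty()) {
            out.push(pending.remove());
        }
    }

    public static void main(String[] args) {
        ModelOutlet out1 = new ModelOutlet();
        Queue<String> pending = new ArrayDeque<>();

        out1.pull();
        out1.push("a");                     // fine: demand was signalled
        try {
            out1.push("b");                 // no new pull yet, so this throws
        } catch (IllegalArgumentException e) {
            System.out.println("unguarded push failed: " + e.getMessage());
        }

        pushOrBuffer(out1, pending, "b");   // parked instead of failing
        onPull(out1, pending);              // the next pull delivers it
        System.out.println(out1.delivered);
    }
}
```

In the stage from the question the same situation can arise when out0 is
pulled, the stage pulls in, and the element that arrives is routed to out1,
which has not been pulled (or has already been pushed to).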



[akka-user] [Akka-streams] Cannot push port twice

2017-01-09 Thread Sergey Sopin
Hi,

I have created a FanOutShape2 shape with custom logic: 

@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
    return new GraphStageLogic(shape) {
        {
            setHandler(in, new AbstractInHandler() {
                @Override
                public void onPush() throws Exception {
                    Object result = process(grab(in), materializer());

                    if (result instanceof ProcessingResponse) {
                        ProcessingResponse response = (ProcessingResponse) result;
                        if (!isClosed(out1)) {
                            push(out1, response); //This is FAndLShape.java:46
                        }
                    } else if (result != null && result instanceof FinderData) {
                        FinderData response = (FinderData) result;
                        if (!isClosed(out0)) {
                            push(out0, response);
                        }
                    }
                }
            });

            setHandler(out0, new AbstractOutHandler() {
                @Override
                public void onPull() throws Exception {
                    if ((!hasBeenPulled(in)) && (!isClosed(in))) {
                        pull(in);
                    }
                }
            });

            setHandler(out1, new AbstractOutHandler() {
                @Override
                public void onPull() throws Exception {
                    if ((!hasBeenPulled(in)) && (!isClosed(in))) {
                        pull(in);
                    }
                }
            });
        }

    };
}

And sometimes I get the following error message: 

[error] a.a.RepointableActorRef - Error in stage 
[kernel.modeller.workers.streamFinder.finderShapes.FAndLShape@4efd96f7]: 
requirement failed: Cannot push port (out1) twice
java.lang.IllegalArgumentException: requirement failed: Cannot push port 
(out1) twice
at scala.Predef$.require(Predef.scala:219)
at akka.stream.stage.GraphStageLogic.push(GraphStage.scala:439)
at 
kernel.modeller.workers.streamFinder.finderShapes.FAndLShape$1$1.onPush(FAndLShape.java:46)
at 
akka.stream.impl.fusing.GraphInterpreter.processElement$1(GraphInterpreter.scala:582)
at 
akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:593)
at 
akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:535)
at 
akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:443)
at 
akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:387)
at 
akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:547)
at akka.actor.Actor$class.aroundReceive(Actor.scala:484)

Could you tell me what is wrong here? 
Thanks! 

Cheers,
Sergey



Re: [akka-user] Akka stream never completes

2016-12-27 Thread Sergey Sopin
Yep, thanks a lot! I completely forgot about it.

On Tuesday, December 27, 2016 at 19:53:45 UTC+2, √ wrote:
>
> eagerClose on the Merge?
>
> -- 
> Cheers,
> √
>
> On Dec 27, 2016 6:30 PM, "Sergey Sopin" <sopi...@gmail.com > 
> wrote:
>
>> Hi all,
>>
>>
>> I am trying to build a flow which will return a CompletionStage with the 
>> List inside. It looks like this:
>>
>>
>> private CompletionStage<List> buildAndRunGraph(ArrayList 
>> sourceList) {
>> Source<Data, NotUsed> source = Source.from(sourceList);
>> Materializer materializer = ActorMaterializer.create(context());
>> return source.via(balancer(buildFinderFlow(), sourceList.size(), 
>> false)).runWith(Sink.seq(), materializer);
>> }
>>
>>
>> The flow itself looks like this:
>>
>>
>> <https://lh3.googleusercontent.com/-xcZRmr1fRjA/WGKiaWar2pI/BPo/qydmGNfqarUtHZticT_AELKraHEhBdoLwCLcB/s1600/Flow.png>
>>
>> First two elements in the flow with multiple inlets and outlets are Merge 
>> and Partition. Third one (with two outlets) is custom FanOutShape2 which 
>> redirects message to one of two outlets based on condition.
>> The flow itself works: I added logging to it, and if the Source contains 55 
>> elements, then 55 results arrive in the Sink, but the resulting CompletionStage 
>> never completes. It waits for some miracle which never happens. 
>> Could you please tell me how to fix that?
>> Thank you in advance!
>>
>> Cheers,
>> Sergey
>>
>>
>
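
The hint above is about the merge's completion policy. By default a merge
completes only after all of its inputs have completed; with eager completion
it completes as soon as any one input does. Assuming the graph has a feedback
edge into the Merge, that edge cannot complete before the merge itself does,
so the default policy leaves the stream (and the CompletionStage from
Sink.seq) hanging. The plain-Java model below only illustrates that rule;
MergeCompletionDemo and ModelMerge are hypothetical names, not Akka API. In
the Akka Java DSL the corresponding switch is, to my knowledge, the second
argument of Merge.create(inputPorts, eagerComplete).

```java
class MergeCompletionDemo {

    /** Toy merge that only tracks completion of its inputs. */
    static final class ModelMerge {
        private int openInputs;
        private final boolean eagerComplete;
        private boolean completed = false;

        ModelMerge(int inputs, boolean eagerComplete) {
            this.openInputs = inputs;
            this.eagerComplete = eagerComplete;
        }

        /** One of the inputs has signalled completion. */
        void onUpstreamFinish() {
            openInputs--;
            if (eagerComplete || openInputs == 0) {
                completed = true;
            }
        }

        boolean isCompleted() { return completed; }
    }

    public static void main(String[] args) {
        // Two inputs: the real source and a feedback edge that never finishes.
        ModelMerge lazy = new ModelMerge(2, false);
        ModelMerge eager = new ModelMerge(2, true);

        lazy.onUpstreamFinish();   // the source is exhausted
        eager.onUpstreamFinish();

        System.out.println("default merge completed: " + lazy.isCompleted());
        System.out.println("eager merge completed:   " + eager.isCompleted());
    }
}
```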



[akka-user] Akka stream never completes

2016-12-27 Thread Sergey Sopin


Hi all,


I am trying to build a flow which will return a CompletionStage with the 
List inside. It looks like this:


private CompletionStage<List> buildAndRunGraph(ArrayList sourceList) {
    Source<Data, NotUsed> source = Source.from(sourceList);
    Materializer materializer = ActorMaterializer.create(context());
    return source.via(balancer(buildFinderFlow(), sourceList.size(), false))
            .runWith(Sink.seq(), materializer);
}


The flow itself looks like this:



First two elements in the flow with multiple inlets and outlets are Merge 
and Partition. Third one (with two outlets) is custom FanOutShape2 which 
redirects message to one of two outlets based on condition.
The flow itself works: I added logging to it, and if the Source contains 55 
elements, then 55 results arrive in the Sink, but the resulting CompletionStage 
never completes. It waits for some miracle which never happens. 
Could you please tell me how to fix that?
Thank you in advance!

Cheers,
Sergey



[akka-user] [akka-streams] Implementing custom graph stage logic for FanOut shapes

2016-11-27 Thread Sergey Sopin
Hi,

I am trying to implement custom logic for UniformFanOutShape, but it seems 
I am doing something wrong.

public class SPMessageSplitter extends
        GraphStage<UniformFanOutShape<SPFinderData, SPFinderData>> {

    //Inlet
    public final Inlet<SPFinderData> in = Inlet.create("Inlet.in");

    //Outlets
    public final Outlet<SPFinderData> flOut = Outlet.create("FL.out");
    public final Outlet<SPFinderData> sOut = Outlet.create("SD.out");
    public final Outlet<SPFinderData> cOut = Outlet.create("CD.out");

    private Outlet[] outlets = {flOut, sOut, cOut};

    //Shape
    private final UniformFanOutShape<SPFinderData, SPFinderData> shape = new
            UniformFanOutShape<>(in, (Outlet[])outlets);

    @Override
    public UniformFanOutShape<SPFinderData, SPFinderData> shape() {
        return shape;
    }

    @Override
    public GraphStageLogic createLogic(Attributes inheritedAttributes) {
        return new GraphStageLogic(shape) {
            IntReplacement pendingCount = new IntReplacement(3);
            IntReplacement downstreamRunning = new IntReplacement(3);

            BoolReplacement pending0 = BoolReplacement.TRUE;
            BoolReplacement pending1 = BoolReplacement.TRUE;
            BoolReplacement pending2 = BoolReplacement.TRUE;

            //In handler
            {
                setHandler(in, new AbstractInHandler() {
                    @Override
                    public void onPush() throws Exception {
                        SPFinderData elem = grab(in);
                        System.out.print("Splitter: ON PUSH!");
                        push(findOutlet(elem), elem);
                        pendingCount.setInteger(downstreamRunning.getInteger());
                    }
                });
            }

            //Handler for FL.out outlet
            {
                setHandler(flOut, new AbstractOutHandler() {
                    @Override
                    public void onPull() throws Exception {
                        {
                            pendingCount.setInteger(pendingCount.getInteger() - 1);
                            pending0.setValue(false);
                            if (pendingCount.getInteger() == 0) pull(in);
                        }
                    }

                    @Override
                    public void onDownstreamFinish() {
                        downstreamRunning.setInteger(downstreamRunning.getInteger() - 1);
                        if (downstreamRunning.getInteger() == 0)
                            completeStage();
                        else {
                            if (pending0.getBoolValue())
                                pendingCount.setInteger(pendingCount.getInteger() - 1);
                            if (pendingCount.getInteger() == 0 && !hasBeenPulled(in)) pull(in);
                        }
                    }
                });
            }
            //Handler for SD.out outlet
            {
                setHandler(sOut, new AbstractOutHandler() {
                    @Override
                    public void onPull() throws Exception {
                        {
                            pendingCount.setInteger(pendingCount.getInteger() - 1);
                            pending1.setValue(false);
                            if (pendingCount.getInteger() == 0) pull(in);
                        }
                    }

                    @Override
                    public void onDownstreamFinish() {
                        downstreamRunning.setInteger(downstreamRunning.getInteger() - 1);
                        if (downstreamRunning.getInteger() == 0)
                            completeStage();
                        else {
                            if (pending1.getBoolValue())
                                pendingCount.setInteger(pendingCount.getInteger() - 1);
                            if (pendingCount.getInteger() == 0 && !hasBeenPulled(in)) pull(in);
                        }
                    }
                });
            }
            //Handler for CDir.out outlet
            {
                setHandler(cOut, new AbstractOutHandler() {
                    @Override
                    public void onPull() throws Exception {
                        {
                            pendingCount.setInteger(pendingCount.getInteger() - 1);
                            pending2.setValue(false);
                            if (pendingCount.getInteger() == 0) pull(in);
                        }
                    }

                    @Override
                    public void onDownstreamFinish() {
                        downstreamRunning.setInteger(downstreamRunning.getInteger() - 1);
                        if (downstreamRunning.getInteger() == 0)
                            completeStage();
                        else {
                            if (pending2.getBoolValue())
                                pendingCount.setInteger(pendingCount.getInteger() - 1);
                            if (pendingCount.getInteger() == 0 && !hasBeenPulled(in)) pull(in);
                        }
                    }
                });
            }

        };
    }

private Outlet findOutlet(SPFinderData elem) 

Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-25 Thread Sergey Sopin
Hi Rafał,

Thanks a lot for your comments! 
You are right, it is time to test them separately.
I've read those blog posts, but I will read them again carefully.
Thanks again.
 
Regards,
Sergey

On Tuesday, October 25, 2016 at 12:54:15 UTC+3, Rafał Krzewski wrote:
>
> Sergey, I spent a few minutes looking at your code, and I see two problems:
> - inletNumberToPullFrom is not scoped properly. It should be local to the 
> GraphStageLogic implementation. In the current form, it is shared between all 
> ManagementFlow instances that get materialized, which is definitely not what 
> you want.
> - you expect that elements will be pushed to the inlets before the findInlet() 
> method is first called. To the best of my knowledge this is not how 
> Akka Streams work. If you don't see the exception in the default case of 
> switch(inletNumberToPullFrom) thrown, that would mean that the demand token 
> never reaches ManagementFlow inlet 0, possibly because it is not propagated 
> correctly in the FL graph.
>
> My recommendation is still to test FL, S, C and ManagementFlow in 
> isolation (ie connecting them only to TestSource, TestSink, Source.single, 
> Sink.ignore etc) and only then proceed to wiring them together.
>
> BTW, have you read the blog posts [1] and [2]? I find them quite 
> informative.
>
> Cheers,
> Rafał
>
> [1] http://blog.akka.io/streams/2016/07/30/mastering-graph-stage-part-1 
> [2] 
> http://blog.akka.io/integrations/2016/08/25/simple-sink-source-with-graphstage
>
> On Monday, October 24, 2016 at 23:38:54 UTC+2, Sergey Sopin wrote:
>>
>> Hi Rafał,
>>
>>> - sink requests data from you
>>> - OutletHandler.onPull is invoked on the outlet the sink is 
>>> connected to
>>> - you propagate demand upstream by calling pull on any (or all) of your 
>>> Inlets, depending on your logic
>>> - eventually data becomes available upstream
>>> - InletHandler.onPush is invoked on the inlet you pulled previously, 
>>> with the incoming element
>>>
>>
>> I tried to add logging into the onPull functions, but it didn't help. I see 
>> only messages from the beginning of the flow ("Message_1"), but not from my 
>> custom shape. 
>>
>>> OK, but the actual number of workers should not be greater than the 
>>> number of available CPUs, because otherwise Akka will interleave their 
>>> execution anyway. Spawning 1000s of worker flows will only waste memory. Of 
>>> course I understand that the input of fixed collection of data is artif
>>>
>>
>> I tried removing the balancer altogether and working with a single worker. It 
>> didn't help.
>>
>>> I usually prefer to debug a single problem at a time rather than a number of 
>>> possibly interrelated problems at once.
>>>
>>
>> Me too and I will try, but I am sure that it will give me nothing in this 
>> case.
>> The problem is in the shape itself. 
>>
>> Regards,
>> Sergey
>>
>
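
Rafał's first point (state declared on the stage is shared by every
materialization, state declared inside the logic is not) can be shown without
Akka. The sketch below is a plain-Java analogy: StageStateDemo,
SharedStateStage and LocalStateStage are hypothetical names, and createLogic
here merely stands in for GraphStage.createLogic, which is called once per
materialization on the same stage object.

```java
import java.util.function.IntSupplier;

class StageStateDemo {

    /** Blueprint with a field: every logic created from it mutates the same counter. */
    static final class SharedStateStage {
        private int counter = 0;          // lives on the blueprint, shared by all logics

        IntSupplier createLogic() {
            return () -> ++counter;
        }
    }

    /** Blueprint whose mutable state is created inside createLogic. */
    static final class LocalStateStage {
        IntSupplier createLogic() {
            int[] counter = {0};          // fresh state for each created logic
            return () -> ++counter[0];
        }
    }

    public static void main(String[] args) {
        SharedStateStage shared = new SharedStateStage();
        IntSupplier a = shared.createLogic();
        IntSupplier b = shared.createLogic();
        System.out.println("shared: " + a.getAsInt() + ", " + b.getAsInt());

        LocalStateStage local = new LocalStateStage();
        IntSupplier c = local.createLogic();
        IntSupplier d = local.createLogic();
        System.out.println("local:  " + c.getAsInt() + ", " + d.getAsInt());
    }
}
```

With the balancer adding the same worker flow several times, a field such as
inletNumberToPullFrom on the stage behaves like the shared counter: each
worker overwrites the value the others rely on.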



Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-24 Thread Sergey Sopin
Hi Rafał,

> - sink requests data from you
> - OutletHandler.onPull is invoked on the outlet the sink is 
> connected to
> - you propagate demand upstream by calling pull on any (or all) of your 
> Inlets, depending on your logic
> - eventually data becomes available upstream
> - InletHandler.onPush is invoked on the inlet you pulled previously, with 
> the incoming element
>

I tried to add logging into the onPull functions, but it didn't help. I see 
only messages from the beginning of the flow ("Message_1"), but not from my 
custom shape. 

> OK, but the actual number of workers should not be greater than the number 
> of available CPUs, because otherwise Akka will interleave their execution 
> anyway. Spawning 1000s of worker flows will only waste memory. Of course I 
> understand that the input of fixed collection of data is artif
>

I tried removing the balancer altogether and working with a single worker. It 
didn't help.

> I usually prefer to debug a single problem at a time rather than a number of 
> possibly interrelated problems at once.
>

Me too and I will try, but I am sure that it will give me nothing in this 
case.
The problem is in the shape itself. 

Regards,
Sergey



Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-20 Thread Sergey Sopin
Rafał,

- If I understand the onPull() function correctly, it is allowed only for 
outlets, so in order to pull something you first need to push it to this 
outlet.

- Balancer has been taken from the docs: 
Balancing_jobs_to_a_fixed_pool_of_workers 

 

- I don't think that testing will help me, because based on the first 
statement I would see something in the log. So, messages do not go inside 
the shape and get stuck somewhere between flow and managementFlow. 

Regards,
Sergey

On Thursday, October 20, 2016 at 13:52:58 UTC+3, Rafał Krzewski wrote:
>
> Sergey,
> I have a few remarks after cursory reading of your code:
>
> - Akka Streams (and Reactive Streams) are pull based. As the messages 
> travel downstream, virtual demand tokens travel upstream. Each graph 
> element is allowed to push elements downstream only when demand is 
> signaled. This means that you must keep track of demand carefully. In a 
> GraphStage that is flow-shaped (has both inlets and outlets), flow of data 
> is initiated by a pull on its outlets. If you fail to propagate such a pull 
> to some (or all) of your GraphStage's inlets, things are going to stall. In 
> your test code, you put debug statements in onPush methods, but you should 
> also monitor onPull.
>
> - There's something strange with your balancer component. Either I'm 
> misreading things, or you removed some important details while editing the 
> code for publication, but it seems to me that each Data input element will 
> be processed in a dedicated, parallel finderFlow / each finderFlow will 
> ever see only a single element. This also could be a reason of the "clog" 
> you experience.
>
> - In general I would suggest building your flow processing "outwards": 
> first try to validate that managementFlow works in isolation (unit tests 
> with Stream Test Kit would be recommended here) and once you have this 
> settled, build and test large flows progressively.
>
> Hope that helps,
> Rafał
>
>
>
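
The order of events Rafał describes can be traced with a small plain-Java
simulation of source -> stage -> sink for a pass-through stage. It does not
use Akka; DemandOrderDemo and its event strings are invented for this
illustration. It shows that the pull reaches the stage first and the push
follows as the answer to it, and that a stage which does not forward the pull
stalls everything behind it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class DemandOrderDemo {

    /**
     * Simulates a pass-through stage between a source and a sink and records
     * the callbacks in the order they happen. If forwardPull is false, the
     * stage swallows the demand instead of pulling its inlet.
     */
    static List<String> run(List<String> source, boolean forwardPull) {
        List<String> events = new ArrayList<>();
        Iterator<String> upstream = source.iterator();
        while (true) {
            events.add("sink pulls");
            events.add("stage.onPull");
            if (!forwardPull) {
                events.add("stalled: pull not forwarded");
                break;                       // demand never reaches the source
            }
            events.add("stage pulls in");
            if (!upstream.hasNext()) {
                events.add("upstream completes");
                break;
            }
            String elem = upstream.next();   // the source answers the pull
            events.add("stage.onPush(" + elem + ")");
            events.add("sink receives " + elem);
        }
        return events;
    }

    public static void main(String[] args) {
        run(Arrays.asList("a", "b"), true).forEach(System.out::println);
        System.out.println("---");
        run(Arrays.asList("a", "b"), false).forEach(System.out::println);
    }
}
```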



Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-19 Thread Sergey Sopin
Hi Rafał,

Code that uses my shape is following: 

private CompletionStage buildAndRunGraph(ArrayList sourceList) {  //First function
    Source source = Source.from(sourceList);
    Materializer materializer = ActorMaterializer.create(context());
    System.out.println("Running flow.");
    return source.map(i -> {System.out.println("Message_1!"); return i;})
            .via(balancer(buildFinderFlow(), sourceList.size(), false))
            .map(i -> {System.out.println("Message_2!"); return i;})
            .runWith(Sink.seq(), materializer);
}

/**
 * This procedure build main processing flow.
 * Flow design is following:
 *
 * +---+
 * |   +-(0)--++   |
 * |   |   +---+  | FL |-(1)---|>
 * |   +-->|   |-(0)->++   |
 * >---|-->|Management |  +-+  |
 * |   +-->|  flow |-(1)->| S   |+ |
 * |   |+->|   |  +-+| |
 * |   ||  +---+-(2)->+---+  | |
 * |   |+-|C  |  | |
 * |   |  +---+  | |
 * |   +-+ |
 * +---+
 *
 * @return main processing flow.
 */
private static Flow buildFinderFlow() {
return Flow.fromGraph(GraphDSL.create(builder -> {
Graph, NotUsed> managementFlow = 
new ManagementFlow();   //Extends my new UniformFanShape. Code listed below
UniformFanShape managementShape = 
builder.add(managementFlow);

Graph, NotUsed> fl = new 
FLShape(); //FLShape extends FanOutShape2
FanOutShape2 flShape = builder.add(fl);

FlowShape s = builder.add(new SDShape()); //SDShape 
extends FlowShape
FlowShape c = builder.add(new CDShape()); //CDShape 
extends FlowShape

builder.from(managementShape.out(0)).toInlet(flShape.in());
builder.from(managementShape.out(1)).toInlet(s.in());
builder.from(managementShape.out(2)).toInlet(c.in());

builder.from(flShape.out0()).toInlet(managementShape.in(0));

builder.from(s.out()).toInlet(managementShape.in(2));

builder.from(c.out()).toInlet(managementShape.in(3));

return new FlowShape<>(managementShape.in(1), flShape.out1());

})
);
}

/**
 * This procedure returns a Flow which contains a set of sub-Flows to be run 
 * asynchronously.
 *
 * @param worker Flow which contains the processing logic and is to be run 
 * asynchronously
 * @param workerCount number of asynchronous processes
 * @param <In> Type of input
 * @param <Out> Type of output
 * @return Flow which contains the set of asynchronous processes
 */
private static <In, Out> Flow<In, Out, NotUsed> balancer(
        Flow<In, Out, NotUsed> worker, int workerCount, boolean waitForAllDownstreams) {
    return Flow.fromGraph(GraphDSL.create(b -> {
        final UniformFanOutShape<In, In> balance =
                b.add(Balance.<In>create(workerCount, waitForAllDownstreams));
        final UniformFanInShape<Out, Out> merge =
                b.add(Merge.<Out>create(workerCount));

        for (int i = 0; i < workerCount; i++) {
            b.from(balance.out(i)).via(b.add(worker.async())).toInlet(merge.in(i));
        }

        return FlowShape.of(balance.in(), merge.out());
    }));
}


And here is the code of ManagementFlow class which extends UniformFanShape:

package kernel.modeller.workers.streamFinder.subPathFinderShapes;

import akka.stream.Attributes;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import kernel.modeller.data.Data;
import kernel.modeller.workers.streamFinder.generic.UniformFanShape;

public final class ManagementFlow extends 
GraphStage> {
//Inlets
public final Inlet startIn = Inlet.create("Start.in");
public final Inlet flIn = Inlet.create("FL.in");
public final Inlet sIn = Inlet.create("sDir.in");
public final Inlet cIn = Inlet.create("cDir.in");
//Outlets
public final Outlet flOut = Outlet.create("FL.out");
public final Outlet sOut = Outlet.create("sDir.out");
public final Outlet cOut = Outlet.create("cDir.out");

private Inlet[] inlets = { flIn, startIn, sIn, cIn};
private Outlet[] outlets = {flOut, sOut, cOut};

private byte inletNumberToPullFrom = -1;

//Shape
private final UniformFanShape shape = new 

Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-19 Thread Sergey Sopin
Yep, I was trying to make something like that, but my code doesn't work. 
Messages get stuck somewhere and I don't know how to fix it due to my lack of 
Scala knowledge. You can find the code in my initial message here. 
I took UniformFanOut as an example and unsuccessfully tried to replace the 
single Inlet with a set of them.
If you could help me to fix it I would be more than happy :)
Thanks!

Regards,
Sergey

On Wednesday, October 19, 2016 at 11:32:05 UTC+3, Konrad Malawski wrote:
>
> Shapes don't need separate java or scala api, it's shared.
>
> You can just subclass a shape and make a class that directly represents 
> your shape.
> If you want AmorphousShape then sure, but please note that its purpose is 
> to "forget about the types of those".
>
> If you want a well typed one simply extend Shape and fill in the abstract 
> methods - see FlowShape etc for examples how to do this.
>
> -- 
> Konrad `ktoso` Malawski
> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>
> On 19 October 2016 at 08:03:16, Sergey Sopin (sopi...@gmail.com 
> ) wrote:
>
> Hi, 
>
> Yes, but it seems that I need to create Java API for it, because my app is 
> in Java. 
> I used Inkscape app. to draw the diagram.
>
> Cheers,
> Sergey
>
> On Wednesday, October 19, 2016 at 0:46:00 UTC+3, Rafał Krzewski wrote:
>>
>> A custom GraphStage [1] using AmorphousShape is probably the way to go in 
>> this case. 
>>
>> That's a really neat diagram, BTW! What software did you use to create it?
>>
>> Cheers,
>> Rafał
>>
>> [1] 
>> http://doc.akka.io/docs/akka/2.4/scala/stream/stream-customize.html#Custom_processing_with_GraphStage
>>
>> On Tuesday, October 18, 2016 at 22:12:07 UTC+2, Sergey Sopin wrote:
>>>
>>> Hi again,
>>>
>>> I have a very specific case. My flow looks like this one: 
>>>
>>>
>>> <https://lh3.googleusercontent.com/-qrP4yHkVYI8/WAaAO1-q67I/Abw/acUT-YaG48k0lo7MkePGv9QVVRkH5L_BACLcB/s1600/Flow.png>
>>>
>>> The idea of multi input/output shape was to redirect messages to a right 
>>> output based on the message data.
>>>
>>> I am just learning streams, so maybe you can suggest a better solution?
>>>
>>> Thanks!
>>>
>>>
>>> Cheers, 
>>>
>>> Sergey
>>>
>>>
>>>
>>> On Tuesday, October 18, 2016 at 18:34:22 UTC+3, Rafał Krzewski wrote:
>>>>
>>>> It's not clear to me, what are you trying to accomplish. It looks like 
>>>> you are trying to implement AmorphousShape (ie. arbitrary number of open 
>>>> inlets and outlets) on your own, and then a specific variant of it, that 
>>>> has all inlets sharing the same type, and all outlets sharing another 
>>>> type. 
>>>> The "Fan" fragment in the names you used is a bit misleading, since in 
>>>> Akka 
>>>> Stream's own usage of it names like FanIn / FanOut shape mean that such 
>>>> graph has many inlets and single outlet / single inlet many outlets. The 
>>>> analogy is to a Chinese-style hand held fan, rather than ceiling fan with 
>>>> many blades :) I am wondering what use case you have in mind for your 
>>>> AmorphousShape because the graphs that can be materialized and executed 
>>>> must ultimately have a ClosedShape. You could use such multi-outlet graphs 
>>>> for reusing pieces of functionality, but anything more complex than a 
>>>> BidiShape seems rather unwieldy to me. 
>>>>
>>>> My understanding is that Graph's shape should not interfere with 
>>>> message flow, because it's just a canvas with contact points on the 
>>>> perimeter. What matters are the components that you plug into it. Akka 
>>>> just 
>>>> makes sure that you don't leave any of the contact points dangling. This 
>>>> makes me think that the problems with messages getting "stuck" was caused 
>>>> somewhere other than graph shape construction site.
>>>>
>>>> Have you tried inserting probes along the lines of 
>>>> Flow.alsoTo(Sink.foreach(_ 
>>>> => println("beep!"))) (shooting from the hip here, apologies if it 
>>>> does not compile straight away) into your graph? That could help you 
>>>> locate 
>>>> where the messages are stuck / discarded.
>>>>
>>>> Cheers,
>>>> Rafał
>>>>
>>>> W dniu 

[akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-19 Thread Sergey Sopin
Hi,

Yes, but it seems that I need to create Java API for it, because my app is 
in Java. 
I used Inkscape app. to draw the diagram.

Cheers,
Sergey

On Wednesday, October 19, 2016 at 0:46:00 UTC+3, Rafał Krzewski wrote:
>
> A custom GraphStage [1] using AmorphousShape is probably the way to go in 
> this case.
>
> That's a really neat diagram, BTW! What software did you use to create it?
>
> Cheers,
> Rafał
>
> [1] 
> http://doc.akka.io/docs/akka/2.4/scala/stream/stream-customize.html#Custom_processing_with_GraphStage
>
> On Tuesday, October 18, 2016 at 22:12:07 UTC+2, Sergey Sopin wrote:
>>
>> Hi again,
>>
>> I have a very specific case. My flow looks like this one:
>>
>>
>> <https://lh3.googleusercontent.com/-qrP4yHkVYI8/WAaAO1-q67I/Abw/acUT-YaG48k0lo7MkePGv9QVVRkH5L_BACLcB/s1600/Flow.png>
>>
>> The idea of multi input/output shape was to redirect messages to a right 
>> output based on the message data.
>>
>> I am just learning streams, so maybe you can suggest a better solution?
>>
>> Thanks!
>>
>>
>> Cheers, 
>>
>> Sergey
>>
>>
>>
>> On Tuesday, October 18, 2016 at 18:34:22 UTC+3, Rafał Krzewski wrote:
>>>
>>> It's not clear to me, what are you trying to accomplish. It looks like 
>>> you are trying to implement AmorphousShape (ie. arbitrary number of open 
>>> inlets and outlets) on your own, and then a specific variant of it, that 
>>> has all inlets sharing the same type, and all outlets sharing another type. 
>>> The "Fan" fragment in the names you used is a bit misleading, since in Akka 
>>> Stream's own usage of it names like FanIn / FanOut shape mean that such 
>>> graph has many inlets and single outlet / single inlet many outlets. The 
>>> analogy is to a Chinese-style hand held fan, rather than ceiling fan with 
>>> many blades :) I am wondering what use case you have in mind for your 
>>> AmorphousShape because the graphs that can be materialized and executed 
>>> must ultimately have a ClosedShape. You could use such multi-outlet graphs 
>>> for reusing pieces of functionality, but anything more complex than a 
>>> BidiShape seems rather unwieldy to me.
>>>
>>> My understanding is that Graph's shape should not interfere with message 
>>> flow, because it's just a canvas with contact points on the perimeter. What 
>>> matters are the components that you plug into it. Akka just makes sure that 
>>> you don't leave any of the contact points dangling. This makes me think 
>>> that the problems with messages getting "stuck" was caused somewhere other 
>>> than graph shape construction site.
>>>
>>> Have you tried inserting probes along the lines of 
>>> Flow.alsoTo(Sink.foreach(_ 
>>> => println("beep!"))) (shooting from the hip here, apologies if it does 
>>> not compile straight away) into your graph? That could help you locate 
>>> where the messages are stuck / discarded.
>>>
>>> Cheers,
>>> Rafał
>>>
>>> On Monday, October 17, 2016 at 20:22:43 UTC+2, Sergey Sopin wrote:
>>>>
>>>> Hi,
>>>>
>>>> I am trying to create my own akka streams shape with several Inlets and 
>>>> Outlets. I have written following code: 
>>>>
>>>> package kernel.modeller.workers.streamFinder.generic
>>>>
>>>> import akka.stream.{Shape, Outlet, Inlet}
>>>> import scala.annotation.unchecked.uncheckedVariance
>>>> import scala.collection.immutable
>>>>
>>>> object FanShape {
>>>>   sealed trait Init[_] {
>>>> def inlets: immutable.Seq[Inlet[_]]
>>>> def outlets: immutable.Seq[Outlet[_]]
>>>> def name: String
>>>>   }
>>>>   final case class Name[_](override val name: String) extends Init[Any] {
>>>> override def inlets: immutable.Seq[Inlet[_]] = Nil
>>>> override def outlets: immutable.Seq[Outlet[_]] = Nil
>>>>   }
>>>>   final case class Ports[_](override val inlets: immutable.Seq[Inlet[_]], 
>>>> override val outlets: immutable.Seq[Outlet[_]]) extends Init[Any] {
>>>> override def name: String = "FanShape"
>>>>   }
>>>> }
>>>>
>>>> abstract class FanShape[_] private (_in: Iterator[Inlet[_]], _out: 
>>>> Iterator[Outlet[_]], _name: String) extend

[akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-18 Thread Sergey Sopin
Hi again,

I have a very specific case. My flow looks like this:

<https://lh3.googleusercontent.com/-qrP4yHkVYI8/WAaAO1-q67I/Abw/acUT-YaG48k0lo7MkePGv9QVVRkH5L_BACLcB/s1600/Flow.png>

The idea of the multi input/output shape was to route each message to the 
right output based on the message data.

I am still learning streams, so maybe you can suggest a better solution?

Thanks!


Cheers, 

Sergey
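
Editor's sketch: routing each message to an output chosen from its data can often be done with the built-in Merge and Partition junctions instead of a hand-written shape. The example below is only an illustration under assumptions: the Msg type, its kind field, the two sources and the three sinks are hypothetical, and it targets the Akka 2.4-era scaladsl API (ActorMaterializer).

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{GraphDSL, Merge, Partition, RunnableGraph, Sink, Source}

object PartitionSketch {
  // Hypothetical message type; `kind` stands in for "the message data".
  final case class Msg(kind: Int, payload: String)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("partition-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // Two hypothetical inputs feeding the same routing logic.
      val inA = Source(List(Msg(0, "a"), Msg(1, "b")))
      val inB = Source(List(Msg(2, "c"), Msg(0, "d")))

      val merge = b.add(Merge[Msg](2))
      // The partitioner must return an index in [0, outputPorts).
      val route = b.add(Partition[Msg](3, _.kind))

      inA ~> merge.in(0)
      inB ~> merge.in(1)
      merge.out ~> route.in
      route.out(0) ~> Sink.foreach[Msg](m => println(s"out0: $m"))
      route.out(1) ~> Sink.foreach[Msg](m => println(s"out1: $m"))
      route.out(2) ~> Sink.foreach[Msg](m => println(s"out2: $m"))
      ClosedShape
    })

    graph.run()
    // The ActorSystem is left running here; call system.terminate() when done.
  }
}
```

The design choice is to keep the routing decision in a plain function and let the stock junctions deal with back-pressure and completion.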




[akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-18 Thread Sergey Sopin
Hi, Rafał! 

Thanks a lot! You gave me everything I need :) I was looking 
for AmorphousShape!

Thanks again, I will stop reinventing the wheel!

Regards,
Sergey



[akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-18 Thread Sergey Sopin
People say that the following article may help: 

http://degoes.net/articles/insufficiently-polymorphic 

However, I still don't understand what's wrong with it. Any help will be 
appreciated.
Thanks!

Regards,
Sergey



[akka-user] [akka-stream] Problems with the shape creation

2016-10-17 Thread Sergey Sopin
Hi,

I am trying to create my own Akka Streams shape with several Inlets and 
Outlets. I have written the following code: 

package kernel.modeller.workers.streamFinder.generic

import akka.stream.{Shape, Outlet, Inlet}
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable

object FanShape {
  // Describes how a shape is initialised: either by name only or from existing ports.
  sealed trait Init[_] {
    def inlets: immutable.Seq[Inlet[_]]
    def outlets: immutable.Seq[Outlet[_]]
    def name: String
  }
  final case class Name[_](override val name: String) extends Init[Any] {
    override def inlets: immutable.Seq[Inlet[_]] = Nil
    override def outlets: immutable.Seq[Outlet[_]] = Nil
  }
  final case class Ports[_](override val inlets: immutable.Seq[Inlet[_]],
                            override val outlets: immutable.Seq[Outlet[_]]) extends Init[Any] {
    override def name: String = "FanShape"
  }
}

abstract class FanShape[_] private (_in: Iterator[Inlet[_]], _out: Iterator[Outlet[_]], _name: String)
  extends Shape {

  import FanShape._

  def this(init: FanShape.Init[_]) = this(init.inlets.iterator, init.outlets.iterator, init.name)

  final override def outlets: immutable.Seq[Outlet[_]] = _outlets
  final override def inlets: immutable.Seq[Inlet[_]] = _inlets

  private var _outlets: Vector[Outlet[_]] = Vector.empty
  private var _inlets: Vector[Inlet[_]] = Vector.empty

  // Reuses the next pre-registered outlet if there is one, otherwise creates a new one.
  protected def newOutlet[T](name: String): Outlet[T] = {
    val p = if (_out.hasNext) _out.next().asInstanceOf[Outlet[T]] else Outlet[T](s"${_name}.$name")
    _outlets :+= p
    p
  }

  // Reuses the next pre-registered inlet if there is one, otherwise creates a new one.
  protected def newInlet[T](name: String): Inlet[T] = {
    val p = if (_in.hasNext) _in.next().asInstanceOf[Inlet[T]] else Inlet[T](s"${_name}.$name")
    _inlets :+= p
    p
  }

  protected def construct(init: Init[_]): FanShape[_]

  def deepCopy(): FanShape[_] =
    construct(Ports(inlets.map(_.carbonCopy()), outlets.map(_.carbonCopy())))

  final def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): FanShape[_] = {
    require(outlets.size == _outlets.size, s"proposed outlets [${outlets.mkString(", ")}] do not fit FanShape")
    require(inlets.size == _inlets.size, s"proposed inlets [${inlets.mkString(", ")}] do not fit FanShape")
    construct(Ports(inlets, outlets))
  }
}

object UniformFanShape {
  def apply[I, O](inlets: Array[Inlet[I]], outlets: Outlet[O]*): UniformFanShape[I, O] =
    new UniformFanShape(inlets.size, outlets.size, FanShape.Ports(inlets.toList, outlets.toList))
}

// n inlets of type I and m outlets of type O.
class UniformFanShape[-I, +O](n: Int, m: Int, _init: FanShape.Init[_]) extends FanShape(_init) {
  def this(n: Int, m: Int) = this(n, m, FanShape.Name("UniformFan"))
  def this(n: Int, m: Int, name: String) = this(n, m, FanShape.Name(name))
  def this(inlets: Array[Inlet[I]], outlets: Array[Outlet[O]]) =
    this(inlets.size, outlets.size, FanShape.Ports(inlets.toList, outlets.toList))

  override protected def construct(init: FanShape.Init[_]): FanShape[_] = new UniformFanShape(n, m, init)
  override def deepCopy(): UniformFanShape[I, O] = super.deepCopy().asInstanceOf[UniformFanShape[I, O]]

  val inArray: Array[Inlet[I @uncheckedVariance]] = Array.tabulate(n)(i ⇒ newInlet[I](s"in$i"))
  def in(n: Int): Inlet[I @uncheckedVariance] = inArray(n)

  val outArray: Array[Outlet[O @uncheckedVariance]] = Array.tabulate(m)(j ⇒ newOutlet[O](s"out$j"))
  def out(m: Int): Outlet[O @uncheckedVariance] = outArray(m)
}


This code lets me create a graph, but the graph does not process messages. 
The message handlers are never called; the messages get stuck somewhere. 
Could you please help me fix it? 

PS: I am not an expert in Scala.

Thank you in advance!

Regards,
Sergey
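
Editor's sketch: one way to find where elements stop is to tap the stream at a few points, in the spirit of the Flow.alsoTo(Sink.foreach(...)) probe suggested in the replies. The probe helper and its labels below are hypothetical names used for illustration, and the code assumes the Akka 2.4-era scaladsl API.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

object ProbeSketch {
  // Hypothetical helper: passes elements through unchanged and prints each one.
  def probe[T](label: String): Flow[T, T, NotUsed] =
    Flow[T].alsoTo(Sink.foreach[T](e => println(s"[$label] saw $e")))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("probe-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    Source(1 to 3)
      .via(probe[Int]("after source"))
      .map(_ * 2)                       // stand-in for the stage under suspicion
      .via(probe[Int]("after map"))
      .runWith(Sink.ignore)
      .onComplete(_ => system.terminate())(system.dispatcher)
  }
}
```

If the first probe prints and the second does not, the elements are being held or dropped by the stage in between.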



Re: [akka-user] Re: [akka-streams] Debug of custom stream processing logic

2016-10-11 Thread Sergey Sopin
Yep, thanks! 



Re: [akka-user] Re: [akka-streams] Debug of custom stream processing logic

2016-10-11 Thread Sergey Sopin
It may work; however, it will not give me the full picture of what is going on.

Thanks.



Re: [akka-user] Re: [akka-streams] Debug of custom stream processing logic

2016-10-11 Thread Sergey Sopin
I agree, but I need to confirm that this logic is being called. I do not 
need to debug the whole flow, only one function inside one shape.



[akka-user] Re: [akka-streams] Debug of custom stream processing logic

2016-10-11 Thread Sergey Sopin
Hi,

The question is still valid :)

Thanks.



[akka-user] [akka-streams] AbstractOutHandler in the shape with several Inlets

2016-10-01 Thread Sergey Sopin
Hi,

I am trying to define custom logic for a shape with several inlets (for 
example, a custom shape with 2 or more inlets). 
I tried to avoid defining an out handler for the Outlet, but that throws an 
error, so I suppose there is no default behaviour in the shape class. 
The problem is that the *pull* procedure takes an Inlet as a parameter, but 
in my case I don't know which inlet to use, because any of them could push 
an element into the Outlet. 
Could you please tell me whether there is a best practice for this? I could 
not find such an example in the documentation.
It seems that I need to introduce some flag that tells me which inlet to 
use.

Thank you in advance!

Regards,
Sergey
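
Editor's sketch: a common answer to "which inlet do I pull?" is to not choose at all. Pull every inlet once when the stage starts, remember which inlets have an element waiting, and let the out handler serve from that queue. The stage below is an illustration only, similar in spirit to the built-in Merge; the class name SimpleMerge and its fields are hypothetical, and it assumes the Akka 2.4-era GraphStage API.

```scala
import akka.stream.{Attributes, Inlet, Outlet, UniformFanInShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.{immutable, mutable}

final class SimpleMerge[T](inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] {
  require(inputPorts > 0, "need at least one inlet")

  val ins: immutable.IndexedSeq[Inlet[T]] =
    Vector.tabulate(inputPorts)(i => Inlet[T](s"SimpleMerge.in$i"))
  val out: Outlet[T] = Outlet[T]("SimpleMerge.out")
  override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, ins: _*)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Inlets that have pushed an element which has not been grabbed yet.
      private val pending = mutable.Queue.empty[Inlet[T]]
      private var running = inputPorts

      // Signal demand on every inlet up front; no flag is needed.
      override def preStart(): Unit = ins.foreach(in => tryPull(in))

      ins.foreach { in =>
        setHandler(in, new InHandler {
          override def onPush(): Unit =
            if (isAvailable(out)) {
              // Downstream is waiting: forward at once and ask this inlet for more.
              push(out, grab(in))
              tryPull(in)
            } else pending.enqueue(in) // keep the element parked on the inlet

          override def onUpstreamFinish(): Unit = {
            running -= 1
            if (running == 0 && pending.isEmpty) completeStage()
          }
        })
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit =
          if (pending.nonEmpty) {
            val in = pending.dequeue()
            push(out, grab(in))
            if (running == 0 && pending.isEmpty) completeStage()
            else tryPull(in) // no-op if this inlet is already closed
          }
      })
    }
}
```

When onPull finds nothing pending it simply returns; the next onPush sees that the outlet is available and forwards the element directly.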



[akka-user] Re: [akka-streams] I have problems with adding Filter to a graph

2016-09-20 Thread Sergey Sopin
The problem was the return type: I was trying to assign a FlowShape to a 
UniformFanInShape.
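
Editor's sketch of the same point in the Scala DSL: builder.add returns the shape of whatever graph it is given, so adding a Flow yields a FlowShape. The CoverRequest and CoverSetterData case classes below are hypothetical stand-ins for the types in this thread, and the code assumes the Akka 2.4-era scaladsl API.

```scala
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL}

object FlowShapeSketch {
  // Hypothetical stand-ins for the request and result types.
  final case class CoverRequest(id: Int)
  final case class CoverSetterData(request: CoverRequest)

  val converter: Flow[CoverRequest, CoverSetterData, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      // Adding a Flow gives back a FlowShape with one inlet and one outlet.
      val toData: FlowShape[CoverRequest, CoverSetterData] =
        b.add(Flow[CoverRequest].map(req => CoverSetterData(req)))

      FlowShape(toData.in, toData.out)
    })
}
```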



[akka-user] Re: [akka-streams] I have problems with adding Filter to a graph

2016-09-20 Thread Sergey Sopin
Small addition. The error itself looks like this: 

[error] D:\Work\Development\Projects\Myzir\trunk\app\kernel\modeller\workers\streamFinder\generic\SubPathProcessingAbstractShape.java:36: inference variable S has incompatible bounds
[error] equality constraints: akka.stream.FlowShape
[error] upper bounds: akka.stream.UniformFanInShape,akka.stream.Shape
[error] final UniformFanInShape A = builder.add(flow);




Re: [akka-user] [akka-streams] It is not possible to create Graph with the custom shapes

2016-09-20 Thread Sergey Sopin
Hi Roland,

Yeah, you are right, I forgot about toInlet() and from(Inlet[]) functions. 
Everything is good now.
Thanks!

Regards,
Sergey
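
Editor's sketch: once a stage has been added with builder.add, each of its ports can be wired individually, so the DSL does not need to know the shape type. The example uses the stock BidiShape (two inlets, two outlets) as a stand-in for a custom multi-port shape; the sources and sinks are hypothetical, and the code assumes the Akka 2.4-era scaladsl API.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{BidiFlow, GraphDSL, RunnableGraph, Sink, Source}

object PortWiringSketch {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("port-wiring-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // A shape with several ports; a custom shape is added the same way.
      val bidi = b.add(BidiFlow.fromFunctions((i: Int) => i.toString, (s: String) => s.length))

      // Wire port by port: an Outlet or Source on the left, an Inlet or Sink on the right.
      Source(1 to 3)         ~> bidi.in1
      bidi.out1              ~> Sink.foreach[String](s => println(s"out1: $s"))
      Source(List("a", "bb")) ~> bidi.in2
      bidi.out2              ~> Sink.foreach[Int](n => println(s"out2: $n"))
      ClosedShape
    })

    graph.run()
    // The ActorSystem is left running here; call system.terminate() when done.
  }
}
```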


On Tuesday, 20 September 2016 at 1:01:30 UTC+3, rkuhn wrote:
>
> Hi Sergey,
>
> Just add your custom stage using builder.add() and then wire the ports of 
> the resulting Shape using the normal GraphDSL methods. There is no need to 
> make the wiring aware of your special shape.
>
> Regards, Roland 
>
> Sent from my iPhone



[akka-user] [akka-streams] It is not possible to create Graph with the custom shapes

2016-09-19 Thread Sergey Sopin
Hi,

For performance reasons I decided to create my own custom Shape with 
several inlets and several outlets (5 in and 3 out in my case). I then 
tried to build a Graph with the new element and concluded that it is not 
possible without rewriting the Graph object. To create my own shape I had 
to create a new class that extends Shape. Then I created the custom logic 
in a new class that extends GraphStage<MyNewShape<...>>, and everything 
was good. 
While creating the graph I realized that my class has more than one input 
and more than one output, so neither UniformFanOutShape nor 
UniformFanInShape can be used in the graph. Because of that I created my 
own BiDirFanShape, which has more than one inlet and more than one outlet. 
That allowed me to add my new shape to a graph. But then I realized that I 
am not able to connect my shape to the others, because there are no "via" 
or "to" functions in the Builder that take such arguments. So now I have 
to rewrite the GraphDSL object in order to add such functions. 
It seems a bit painful. I probably missed something, but I think that if 
you allow creating custom shapes, there should be some generic mechanism 
for working with the newly created elements. I cannot even override the 
ForwardOps and ReverseOps classes because they are final... Could you 
please let me know if I missed something and there is a better solution?

Thank you in advance!

Best regards,
Sergey



[akka-user] [akka-streams] I have problems with adding Filter to a graph

2016-09-18 Thread Sergey Sopin
Hi,

I am trying to create a filter and add it to a graph. This filter receives 
a request and returns a new object of another data type.
I tried to do it like this:

final UniformFanInShape A = 
builder.add(Flow.of(CoverRequest.class).map(mes -> new CoverSetterData(mes)));


However, in this case Java is unable to infer the lambda's return type. 
My next attempt was to create the Flow first and then add it to the graph:

Flow flow = 
Flow.of(CoverRequest.class).map((CoverRequest mes) -> new CoverSetterData(mes));
final UniformFanInShape A = builder.add(flow);


But in this case it says that the builder.add(...) function doesn't take 
such arguments. 

Could you please help me with this issue? 
Thanks a lot!

Regards,
Sergey



Re: [akka-user] [akka-stream] Multi input/output flow

2016-08-25 Thread Sergey Sopin
Thanks a lot! It is exactly what I need, but can I do the same in Java? 

Regards,
Sergey
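
Editor's sketch, in Scala rather than Java: a GraphStage whose shape has one inlet and two outlets and which routes each element by a predicate, following the grab/test/push pseudocode quoted below. It is an illustration under assumptions: the Router name and the predicate parameter are hypothetical, the stock FanOutShape2 stands in for a hand-written Shape, and the code targets the Akka 2.4-era GraphStage API. From Java the same stage API is used with AbstractInHandler and AbstractOutHandler.

```scala
import akka.stream.{Attributes, FanOutShape2}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

final class Router[A](p: A => Boolean) extends GraphStage[FanOutShape2[A, A, A]] {
  // One inlet (`in`) and two outlets (`out0`, `out1`).
  override val shape: FanOutShape2[A, A, A] = new FanOutShape2[A, A, A]("Router")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private val in = shape.in
      private val out0 = shape.out0
      private val out1 = shape.out1

      // Ask upstream for an element only when either outlet could accept it.
      private def pullIfBothReady(): Unit =
        if (isAvailable(out0) && isAvailable(out1) && !hasBeenPulled(in)) pull(in)

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          if (p(elem)) push(out0, elem) else push(out1, elem)
        }
      })

      // One handler instance serves both outlets; the stage itself has one shape.
      private val outHandler = new OutHandler {
        override def onPull(): Unit = pullIfBothReady()
      }
      setHandler(out0, outHandler)
      setHandler(out1, outHandler)
    }
}
```

Waiting for demand on both outlets keeps the sketch free of buffers, at the cost of stalling when one downstream is slow. For several inlets, a Merge in front of such a stage is one option.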

On Thursday, 25 August 2016 at 21:16:44 UTC+3, Konrad Malawski wrote:
>
> If it has more than 1 input and 1 output it's not a Flow anymore, so let's 
> not refer to it as a Flow.
> It's a Graph of a given Shape.
>
> There's nothing special about Shape, so if you want a custom one, you 
> simply extend Shape, as is documented in the reference docs:
>
> http://doc.akka.io/docs/akka/2.4.9/scala/stream/stream-graphs.html#building-reusable-graph-components
>  
>
> Notice that it's still fully typesafe for users of the shape!
>
> -- 
> Konrad `ktoso` Malawski
> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>
> On 25 August 2016 at 20:07:02, Sergey Sopin (sopi...@gmail.com 
> ) wrote:
>
> Thanks a lot for your response.  
>
> However it is still not clear.  
> I am talking about FlowShape with several Inlets and Outlets. As I 
> understood from: 
> http://doc.akka.io/docs/akka/2.4/java/stream/stream-customize.html#Custom_linear_processing_stages_using_GraphStage
> I have to create Shape by myself as a pair of Inlet and Outlet, but in my 
> case I either need to create all possible combinations of Inlets & Outlets 
> and return one of them from the function 
>   @Override
>   public SourceShape shape() {
> return shape;
>   }
> or invent something new. 
>
> First option will not work in my case, so I need something new here.
>
> Could you please tell me how to create such multi inlet/outlet shape? Or 
> please explain me if I don't understand something. 
> Thanks a lot!
>
>
> On Thursday, 25 August 2016 at 17:50:00 UTC+3, Konrad Malawski wrote: 
>>
>> Handlers have no shape. The GraphStage does.
>>
>> So your stage has some shape.
>>
>> And then you set as many handlers as you need.
>>
>> Still simple sources and sinks, but we just published a blog post 
>> explaining it a bit more: 
>> http://blog.akka.io/integrations/2016/08/25/simple-sink-source-with-graphstage
>>
>> -- 
>> Konrad `ktoso` Malawski
>> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>>
>> On 25 August 2016 at 15:55:14, Sergey Sopin (sopi...@gmail.com) wrote:
>>
>> Hi, 
>>
>> I am trying to create multi input/output flow. It should obtain messages 
>> from different Inlets and emit to different Outlets based on parameter 
>> inside the message. 
>>
>> Something like following:
>>
>>    ...
>>    A elem = grab(in1);
>>    if (p.test(elem)) {
>>      push(out1, elem);
>>    } else {
>>      push(out2, elem);
>>    }
>>    ...
>>
>>
>> But, usual case is when I am creating shape and setting handler for it. 
>> It seems that in my case I need different shapes inside one handler.
>>
>> Thanks a lot!
>>
>> Regards,
>> Sergey
>>

-- 
>>>>>>>>>>  Read the docs: http://akka.io/docs/
>>>>>>>>>>  Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] [akka-stream] Multi input/output flow

2016-08-25 Thread Sergey Sopin
Thanks a lot for your response. 

However, it is still not clear to me. 
I am talking about a FlowShape with several Inlets and Outlets. As I 
understand from 
http://doc.akka.io/docs/akka/2.4/java/stream/stream-customize.html#Custom_linear_processing_stages_using_GraphStage
I have to create the Shape myself as a pair of an Inlet and an Outlet. In my 
case I would either need to create all possible combinations of Inlets and 
Outlets and return one of them from the function 
  @Override
  public SourceShape shape() {
    return shape;
  }
or invent something new. 

The first option will not work in my case, so I need something new here.

Could you please tell me how to create such a multi-inlet/outlet shape? Or 
please explain what I am misunderstanding. 
Thanks a lot!


On Thursday, 25 August 2016 at 17:50:00 UTC+3, Konrad Malawski wrote:
>
> Handlers have no shape. The GraphStage does.
>
> So your stage has some shape.
>
> And then you set as many handlers as you need.
>
> Still simple sources and sinks, but we just published a blog post 
> explaining it a bit more: 
> http://blog.akka.io/integrations/2016/08/25/simple-sink-source-with-graphstage
>
> -- 
> Konrad `ktoso` Malawski
> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>
> On 25 August 2016 at 15:55:14, Sergey Sopin (sopi...@gmail.com 
> ) wrote:
>
> Hi, 
>
> I am trying to create multi input/output flow. It should obtain messages 
> from different Inlets and emit to different Outlets based on parameter 
> inside the message. 
>
> Something like following:
>
>    ...
>    A elem = grab(in1);
>    if (p.test(elem)) {
>      push(out1, elem);
>    } else {
>      push(out2, elem);
>    }
>    ...
>
>
> But, usual case is when I am creating shape and setting handler for it. It 
> seems that in my case I need different shapes inside one handler.
>
> Thanks a lot!
>
> Regards,
> Sergey
>
>  


[akka-user] [akka-stream] Multi input/output flow

2016-08-25 Thread Sergey Sopin
Hi,

I am trying to create a multi-input/output flow. It should receive messages 
from different Inlets and emit them to different Outlets based on a parameter 
inside the message. 

Something like the following:

   ...
   A elem = grab(in1);
   if (p.test(elem)) {
     push(out1, elem);
   } else {
     push(out2, elem);
   }
   ...


But the usual case is that I create one shape and set a handler for it. It 
seems that in my case I need different shapes inside one handler.
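
Setting Akka's ports and demand signalling aside, the routing decision in the pseudo-code can be sketched in plain Java. This is only an illustration under assumptions: PredicateRouterSketch, onElement, and the two queues standing in for out1 and out2 are hypothetical names, not Akka API, and a real GraphStage may only push to an outlet that has been pulled, which this sketch does not model.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical plain-Java stand-in for the routing logic; not an Akka GraphStage.
final class PredicateRouterSketch<A> {
    private final Predicate<A> p;
    // Two queues play the role of the outlets out1 and out2.
    final Queue<A> out1 = new ArrayDeque<>();
    final Queue<A> out2 = new ArrayDeque<>();

    PredicateRouterSketch(Predicate<A> p) {
        this.p = p;
    }

    // Called for an element from any input; the same rule applies to all of them.
    void onElement(A elem) {
        if (p.test(elem)) {
            out1.add(elem);
        } else {
            out2.add(elem);
        }
    }

    public static void main(String[] args) {
        PredicateRouterSketch<Integer> router =
            new PredicateRouterSketch<>(n -> n % 2 == 0);
        // Two lists play the role of two inlets feeding the same router.
        List<Integer> in1 = List.of(1, 2, 3);
        List<Integer> in2 = List.of(4, 5);
        in1.forEach(router::onElement);
        in2.forEach(router::onElement);
        System.out.println("out1 = " + router.out1);
        System.out.println("out2 = " + router.out2);
    }
}
```

The point of the sketch is that one piece of logic can serve several inputs and outputs; how those ports are declared in Akka Streams is a separate question about the stage's shape.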

Thanks a lot!

Regards,
Sergey

 



Re: [akka-user] Akka Streams Graph Editor tool

2016-08-22 Thread Sergey Sopin
Thanks! I will try.

On Monday, 22 August 2016 at 14:32:53 UTC+3, Konrad Malawski wrote:
>
> Sorry, I haven't used Windows in years, hm.
>
> I know Endre used raw Inkscape to do some of our drawings for our docs.
>
>
> -- 
> Konrad `ktoso` Malawski
> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>
> On 22 August 2016 at 13:14:17, Sergey Sopin (sopi...@gmail.com 
> ) wrote:
>
> Hi again, 
>
> OmniGraffle is a very good program, but I use Windows :(
> Can you recommend something for Windows? 
>
> Thanks!
>
> Regards,
> Sergey
>
> On Sunday, 17 July 2016 at 16:05:59 UTC+3, Konrad Malawski wrote: 
>>
>> You may enjoy these Omnigraffle stencils: 
>> https://www.graffletopia.com/stencils/1493 by Derek Wyatt
>>
>> -- 
>> Konrad `ktoso` Malawski
>> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>>
>> On 16 July 2016 at 17:44:18, Sergey Sopin (sopi...@gmail.com) wrote:
>>
>> Hi, 
>>
>> Is there any tool that allows drawing graphs like the ones shown in the Akka 
>> docs? I mean one that offers the same blocks and elements.
>>
>> Thanks a lot!
>>
>> Best regards,
>> Sergey


Re: [akka-user] Akka Streams Graph Editor tool

2016-08-22 Thread Sergey Sopin
Hi again,

OmniGraffle is a very good program, but I use Windows :(
Can you recommend something for Windows? 

Thanks!

Regards,
Sergey

On Sunday, 17 July 2016 at 16:05:59 UTC+3, Konrad Malawski wrote:
>
> You may enjoy these Omnigraffle stencils: 
> https://www.graffletopia.com/stencils/1493 by Derek Wyatt
>
> -- 
> Konrad `ktoso` Malawski
> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>
> On 16 July 2016 at 17:44:18, Sergey Sopin (sopi...@gmail.com ) 
> wrote:
>
> Hi, 
>
> Is there any tool that allows drawing graphs like the ones shown in the Akka 
> docs? I mean one that offers the same blocks and elements.
>
> Thanks a lot!
>
> Best regards,
> Sergey


[akka-user] Akka Streams Graph Editor tool

2016-07-16 Thread Sergey Sopin
Hi,

Is there any tool that allows drawing graphs like the ones shown in the Akka 
docs? I mean one that offers the same blocks and elements.

Thanks a lot!

Best regards,
Sergey



Re: [akka-user] Patterns.Ask replacement

2016-07-11 Thread Sergey Sopin
Hi, 

Yes, I was looking for something like that. But now I realize that I was 
only solving part of the task. I can fully replace a set of actors in my app 
with a complex graph in order to obtain the desired result.

Thanks!

On Monday, 11 July 2016 at 22:39:20 UTC+3, Justin du coeur wrote:
>
> Ah -- yeah, if you're looking for a more Actor-compatible way to do single 
> request/response interactions, check out Requester 
> <https://github.com/jducoeur/Requester>, which was designed for exactly 
> this.  It's intended specifically as a replacement for ask(), in a way that 
> is relatively safe and composable...
>
> On Mon, Jul 11, 2016 at 9:00 AM, Akka Team <akka.o...@gmail.com 
> > wrote:
>
>> Hi Sergey,
>>
>> What are you trying to achieve?
>> In general Streams are geared more for processing pipelines than "get one 
>> element"-request-response type of interaction.
>>
>> --
>> Johan
>>
>> On Tue, Jul 5, 2016 at 9:22 PM, Sergey Sopin <sopi...@gmail.com 
>> > wrote:
>>
>>> Hi,
>>>
>>> Is there any possibility to replace akka.pattern.ask / pipe 
>>> communication with akka-streams? I was looking for at least a small 
>>> example, but could not find one. 
>>> I suppose that it can be done with BidiFlow, but I am not sure 
>>> how. I would be very grateful for such an example.
>>>
>>> Thank you in advance!
>>>
>>> Regards,
>>> Sergey
>>>


[akka-user] Patterns.Ask replacement

2016-07-05 Thread Sergey Sopin
Hi,

Is there any possibility to replace akka.pattern.ask / pipe communication 
with akka-streams? I was looking for at least a small example, but could not 
find one. 
I suppose that it can be done with BidiFlow, but I am not sure 
how. I would be very grateful for such an example.

Thank you in advance!

Regards,
Sergey



[akka-user] Bad example in documentation

2016-07-01 Thread Sergey Sopin
Hi,

It seems that one of the examples in the documentation contains a bug, namely: 
http://doc.akka.io/docs/akka/2.4.7/java/stream/stream-graphs.html


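import akka.NotUsed;
import akka.stream.ClosedShape;
import akka.stream.Outlet;
import akka.stream.UniformFanInShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.*;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;

final Source<Integer, NotUsed> in = Source.from(Arrays.asList(1, 2, 3, 4, 5));
final Sink<List<String>, CompletionStage<List<String>>> sink = Sink.head();
final Sink<List<Integer>, CompletionStage<List<Integer>>> sink2 = Sink.head();
final Flow<Integer, Integer, NotUsed> f1 = Flow.of(Integer.class).map(elem -> elem + 10);
final Flow<Integer, Integer, NotUsed> f2 = Flow.of(Integer.class).map(elem -> elem + 20);
final Flow<Integer, String, NotUsed> f3 = Flow.of(Integer.class).map(elem -> elem.toString());
final Flow<Integer, Integer, NotUsed> f4 = Flow.of(Integer.class).map(elem -> elem + 30);

final RunnableGraph<CompletionStage<List<String>>> result =
  RunnableGraph.<CompletionStage<List<String>>>fromGraph(
    GraphDSL
      .create(
        sink,
        (builder, out) -> {
          // Fan-out and fan-in junctions, two ports each
          final UniformFanOutShape<Integer, Integer> bcast = builder.add(Broadcast.create(2));
          final UniformFanInShape<Integer, Integer> merge = builder.add(Merge.create(2));

          final Outlet<Integer> source = builder.add(in).out();
          // Main path: in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
          builder.from(source).via(builder.add(f1))
            .viaFanOut(bcast).via(builder.add(f2)).viaFanIn(merge)
            .via(builder.add(f3.grouped(1000))).to(out);
          // Second branch: bcast ~> f4 ~> merge
          builder.from(bcast).via(builder.add(f4)).toFanIn(merge);
          return ClosedShape.getInstance();
        }));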


Here we define the 'sink2' variable but never use it. We use the 'out' 
variable instead, which is undefined.
Please fix this confusing example when possible.

Thanks,
Sergey



Re: [akka-user] How to get parent of router from routee?

2016-03-27 Thread Sergey Sopin
Hi all, 

Thanks for your tips. I have implemented the approach with the creator, 
thanks a lot!

Regards,
Sergey

On Sunday, 27 March 2016 at 22:52:35 UTC+3, Guido Medina wrote:
>
> It is also possible to pass the grandpa actor via creator, that way it 
> doesn't matter what member of the family you pass, it is always part of 
> your creator and accessible to the actor.
>
> On Sunday, March 27, 2016 at 6:06:22 PM UTC+1, Konrad Malawski wrote:
>>
>> If it were me, I'd instead make this explicit part of the protocol - i.e. 
>> introduce this Actor via a message to the worker.
>>
>> If you just rely on "parent of my parent" then you lock yourself into 
>> this structure and make it harder to refactor if you change
>> your mind about the hierarchy you need.
>>
>> Hierarchy should be mostly used for supervision and creating "islands" 
>> that fail together etc. (read up on supervision hierarchies).
>>
>> Hope this helps.
>>
>> -- 
>> Cheers,
>> Konrad 'ktoso' Malawski
>> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
>>
>> On 27 March 2016 at 13:27:04, Sergey Sopin (sopi...@gmail.com) wrote:
>>
>> Hi, 
>>
>> Is it possible to get the parent of a router from a routee? If I use 
>> context().parent(), it returns the router itself, but I need to find the 
>> parent of the parent in this case.
>> Thanks.
>>
>> Best regards,
>> Sergey Sopin


[akka-user] How to get parent of router from routee?

2016-03-27 Thread Sergey Sopin
Hi,

Is it possible to get the parent of a router from a routee? If I use 
context().parent(), it returns the router itself, but I need to find the 
parent of the parent in this case.
Thanks.

Best regards,
Sergey Sopin



[akka-user] Error while creating router from config

2016-03-04 Thread Sergey Sopin
Hi,

I am trying to create a router from configuration, but I am getting a 
ConfigurationException. My code is the following: 

ActorRef subPathFinderRouter = context().actorOf(
    FromConfig.getInstance().props(Props.create(SubPathFinderActor.class)),
    "subPathFinderRouter");


Config: 

"/*/subPathFinderRouter" {
  #router = smallest-mailbox-pool
  router = round-robin-pool
  resizer {
    enabled = off
  }

  optimal-size-exploring-resizer {
    enabled = on
    lower-bound = 10
    #upper-bound = 10
    chance-of-ramping-down-when-full = 0.2
    downsize-after-underutilized-for = 1h
    explore-step-size = 0.2
    chance-of-exploration = 0.4
    downsize-ratio = 0.8
    optimization-range = 50
    weight-of-latest-metric = 0.5
  }
}


When it tries to create this router, it throws the following error: 

[ERROR] [03/04/2016 19:47:45.466] 
[ModellerSystem-akka.actor.deployment.single-highload-dispatcher-24] 
[akka://ModellerSystem/user/localProcessor/$a/$b/$b/$a/$a] configuration 
problem while creating 
[akka://ModellerSystem/user/localProcessor/$a/$b/$b/$a/$a/subPathFinderRouter] 
with router dispatcher [akka.actor.default-dispatcher] and mailbox 
[akka.actor.default-mailbox] and routee dispatcher 
[akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
akka.ConfigurationException: configuration problem while creating 
[akka://ModellerSystem/user/localProcessor/$a/$b/$b/$a/$a/subPathFinderRouter] 
with router dispatcher [akka.actor.default-dispatcher] and mailbox 
[akka.actor.default-mailbox] and routee dispatcher 
[akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:753)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:206)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:37)
at akka.actor.ActorCell.actorOf(ActorCell.scala:369)
at kernel.modeller.managers.SubPathProcessingSupervisor.createRouter(SubPathProcessingSupervisor.java:44)
at kernel.modeller.managers.SubPathProcessingSupervisor.lambda$new$0(SubPathProcessingSupervisor.java:54)
at kernel.modeller.managers.SubPathProcessingSupervisor$$Lambda$102/1078115745.apply(Unknown Source)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:47)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.ConfigurationException: Configuration missing for router 
[akka://ModellerSystem/user/localProcessor/$a/$b/$b/$a/$a/subPathFinderRouter] 
in 'akka.actor.deployment' section.
at akka.routing.FromConfig.verifyConfig(RouterConfig.scala:297)
at akka.routing.RoutedActorRef.<init>(RoutedActorRef.scala:40)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:751)
... 22 more
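
The "Caused by" line above says that no entry in 'akka.actor.deployment' matched the router's path. One possible reading, offered as an assumption because the full configuration is not shown: deployment keys are matched against the actor path below /user, and each "*" stands for exactly one path element, so "/*/subPathFinderRouter" would only cover a router created directly under a top-level actor. In the log the router sits five levels below localProcessor. A minimal sketch of a key with one wildcard per level, with the depth read off the logged path:

```hocon
akka.actor.deployment {
  # Assumed from the log:
  #   /user/localProcessor/$a/$b/$b/$a/$a/subPathFinderRouter
  # One "*" per intermediate actor between localProcessor and the router.
  "/localProcessor/*/*/*/*/*/subPathFinderRouter" {
    router = round-robin-pool
    # the resizer settings from the block above would go here unchanged
  }
}
```

If the nesting depth varies at runtime, a fixed-depth key like this will not match every router, so the hierarchy or the way the router is created may need to change instead.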

[ERROR] [03/04/2016 19:47:45.466] 
[ModellerSystem-akka.actor.deployment.single-highload-dispatcher-25] 
[akka://ModellerSystem/user/localProcessor/$a/$b/$b/$b/$a] configuration 
problem while creating 
[akka://ModellerSystem/user/localProcessor/$a/$b/$b/$b/$a/subPathFinderRouter] 
with router dispatcher [akka.actor.default-dispatcher] and mailbox 
[akka.actor.default-mailbox] and routee dispatcher 
[akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
akka.ConfigurationException: configuration problem while creating 
[akka://ModellerSystem/user/localProcessor/$a/$b/$b/$b/$a/subPathFinderRouter] 
with router dispatcher [akka.actor.default-dispatcher] and mailbox 
[akka.actor.default-mailbox] and routee dispatcher 
[akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:753)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:206)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:37)
at akka.actor.ActorCell.actorOf(ActorCell.scala:369)
at kernel.modeller.managers.SubPathProcessingSupervisor.createRouter(SubPathProcessingSupervisor.java:44)
at kernel.modeller.managers.SubPathProcessingSupervisor.lambda$new$0(SubPathProcessingSupervisor.java:54)
at 

Re: [akka-user] Re: Akka behaviour

2015-09-24 Thread Sergey Sopin
In my case the system does not stop working after the timeout, so I just 
want to return it to its initial state so that it can process other 
requests.

Best regards,
Sergey

On Thursday, 24 September 2015 at 21:31:37 UTC+3, Patrik Nordwall wrote:
>
>
>
> On Tue, Sep 22, 2015 at 7:07 PM, Sergey Sopin <sopi...@gmail.com 
> > wrote:
>
>> Hi Patrik,
>>
>> Thank you for the clarification. 
>> I had been using onComplete() to check the response 
>> status, but I have now replaced it with Future.map(). As a result of the 
>> map() operation I get a new, transformed Future. 
>> In my case all child actors send their response to the caller actor using 
>> the following operation: 
>> Patterns.pipe(result, ec).to(sender);
>>
>> And each caller actor has the following code: 
>>
>> Future<Object> f1 = Patterns.ask(childActorRef, new Message(param), 
>>     askDuration);
>>
>> Future<Result> result = f1.map(new Mapper<Object, Result>() {
>>   @Override
>>   public Result apply(Object o) {
>>     Result result = (Result) o;
>>     result.add(something);
>>     return result;
>>   }
>> }, ec);
>>
>>
>> Patterns.pipe(result, ec).to(sender);
>>
>>
>> Could you explain how to change this behaviour in order to 
>> avoid the situations mentioned previously?
>>
>
> That code looks good, but be aware that it will send an 
> akka.actor.Status.Failure 
> message to the sender if you get a timeout.
>
> I don't know what you want to do. Why would you need to restart something 
> because of an ask timeout?
>
> /Patrik
>  
>
>>
>> Best regards,
>> Sergey
>>
>> On Tuesday, 22 September 2015 at 11:29:03 UTC+3, Patrik Nordwall wrote:
>>>
>>> The supervisor strategy comes into play when a child actor throws an 
>>> exception. That is not something that happens automatically because of an 
>>> ask timeout. Normally you use ask together with pipeTo, and then you will 
>>> receive an akka.actor.Status.Failure message with the exception cause. Note 
>>> that this is a message, and you have to actively throw the cause to trigger 
>>> a supervision action.
>>>
>>> Regards,
>>> Patrik
>>>
>>> On Sat, Sep 12, 2015 at 10:32 PM, Sergey Sopin <sopi...@gmail.com> 
>>> wrote:
>>>
>>>> The workflow is as follows:
>>>>
>>>>
>>>> <https://lh3.googleusercontent.com/-2NQ3uKBGhc4/VfSL1PLrg0I/AXk/O2bNmf2pVkk/s1600/Flow.jpg>
>>>>
>>>
>>>
>>>
>>> -- 
>>>
>>> Patrik Nordwall
>>> Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
>>> Twitter: @patriknw
>>>
>
>
>
> -- 
>
> Patrik Nordwall
> Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
> Twitter: @patriknw
>
>



Re: [akka-user] Re: Akka behaviour

2015-09-22 Thread Sergey Sopin
Hi Patrik,

Thank you for the clarification. 
I had been using onComplete() to check the response status, 
but I have now replaced it with Future.map(). As a result of the map() 
operation I get a new, transformed Future. 
In my case all child actors send their response to the caller actor using 
the following operation: 
Patterns.pipe(result, ec).to(sender);

And each caller actor has the following code: 

Future<Object> f1 = Patterns.ask(childActorRef, new Message(param), 
    askDuration);

Future<Result> result = f1.map(new Mapper<Object, Result>() {
  @Override
  public Result apply(Object o) {
    Result result = (Result) o;
    result.add(something);
    return result;
  }
}, ec);


Patterns.pipe(result, ec).to(sender);


Could you explain how to change this behaviour in order to avoid the 
situations mentioned previously?

Best regards,
Sergey
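
A minimal sketch of the point made in Patrik's reply quoted below: an ask 
timeout arrives as an ordinary failure message, and nothing is escalated 
unless the receiving actor throws the cause itself. To keep the snippet 
runnable with only the JDK (Java 9+), it does not use Akka. The Failure 
class, askAndPipe and onReceive are hypothetical stand-ins for 
akka.actor.Status.Failure, the ask/pipe pair and an actor's receive method, 
and CompletableFuture.orTimeout models the ask timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class AskTimeoutSketch {

    /** Hypothetical stand-in for akka.actor.Status.Failure: the error travels as a message. */
    static final class Failure {
        final Throwable cause;

        Failure(Throwable cause) {
            this.cause = cause;
        }
    }

    /** Stand-in for ask + pipe: a reply that does not arrive in time becomes a Failure value. */
    static Object askAndPipe(CompletableFuture<Object> reply, long timeoutMillis) {
        return reply
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .handle((value, error) -> error == null ? value : new Failure(error))
                .join(); // blocking is acceptable only because this is a sketch
    }

    /** Stand-in for the caller's receive method. */
    static String onReceive(Object message) throws Exception {
        if (message instanceof Failure) {
            Throwable cause = ((Failure) message).cause;
            // Rethrowing is the step that would hand the problem to the supervisor.
            throw new Exception("child did not answer in time", cause);
        }
        return "processed " + message;
    }

    public static void main(String[] args) {
        try {
            Object ok = askAndPipe(CompletableFuture.<Object>completedFuture("result"), 100);
            System.out.println(onReceive(ok));

            // This future is never completed, so the timeout fires.
            Object late = askAndPipe(new CompletableFuture<Object>(), 100);
            onReceive(late);
        } catch (Exception e) {
            System.out.println("escalated: " + e.getMessage());
        }
    }
}
```

In a real actor the throw would happen inside the receive method, which is 
where the parent's supervisor strategy gets the chance to restart or 
escalate.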

On Tuesday, 22 September 2015 at 11:29:03 UTC+3, Patrik Nordwall wrote:
>
> The supervisor strategy comes into play when a child actor throws an 
> exception. That is not something that happens automatically because of an 
> ask timeout. Normally you use ask together with pipeTo and then you will 
> receive an akka.actor.Status.Failure message with the exception cause. Note 
> that this is a message, and you have to actively throw the cause to trigger 
> supervision action.
>
> Regards,
> Patrik
>
> On Sat, Sep 12, 2015 at 10:32 PM, Sergey Sopin <sopi...@gmail.com 
> > wrote:
>
>> Workflow is following:
>>
>>
>> <https://lh3.googleusercontent.com/-2NQ3uKBGhc4/VfSL1PLrg0I/AXk/O2bNmf2pVkk/s1600/Flow.jpg>
>>
> -- 
>
> Patrik Nordwall
> Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
> Twitter: @patriknw
>
>



[akka-user] Akka behaviour

2015-09-12 Thread Sergey Sopin
Hi all,

I have problems with how Akka behaves on errors, specifically on ask 
timeouts. I expected the system to stop processing and restart itself when 
such an error happens. I have a supervisor actor with the following 
supervision strategy: 

private static SupervisorStrategy strategy =
    new OneForOneStrategy(5, Duration.create("1 minute"), DeciderBuilder.
        match(NullPointerException.class, e -> SupervisorStrategy.escalate()).
        matchAny(o -> SupervisorStrategy.restart()).build());

@Override
public SupervisorStrategy supervisorStrategy() {
    return strategy;
}


All other actors are its children. When an ask timeout occurs I see the 
following: 

[ERROR] [09/12/2015 19:19:56.291] 
[application-akka.actor.default-dispatcher-6] 
[akka://application/user/localProcessor/$b/$b/$b/$a] Path models 
aggregation failed with following message: Ask timed out on 
[Actor[akka://application/user/localProcessor/$b/$b/$b/$a/$a#447300449]] 
after [2 ms]
[ERROR] [09/12/2015 19:19:56.294] 
[application-akka.actor.default-dispatcher-6] 
[akka://application/user/localProcessor/$b/$b] GtObject processing failed. 
Message: Ask timed out on 
[Actor[akka://application/user/localProcessor/$b/$b/$b/$a/$a#447300449]] 
after [2 ms]
[INFO] [09/12/2015 19:19:56.565] 
[application-akka.actor.default-dispatcher-10] 
[akka://application/deadLetters] Message 
[kernel.modeller.data.messages.SubPathProcessingResponse] from 
Actor[akka://application/deadLetters] to 
Actor[akka://application/deadLetters] was not delivered. [1] dead letters 
encountered. This logging can be turned off or adjusted with configuration 
settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [09/12/2015 19:19:56.565] 
[application-akka.actor.default-dispatcher-10] 
[akka://application/deadLetters] Message 
[kernel.modeller.data.messages.SubPathProcessingResponse] from 
Actor[akka://application/deadLetters] to 
Actor[akka://application/deadLetters] was not delivered. [2] dead letters 
encountered. This logging can be turned off or adjusted with configuration 
settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [09/12/2015 19:19:56.565] 
[application-akka.actor.default-dispatcher-10] 
[akka://application/deadLetters] Message 
[kernel.modeller.data.messages.SubPathProcessingResponse] from 
Actor[akka://application/deadLetters] to 
Actor[akka://application/deadLetters] was not delivered. [3] dead letters 
encountered. This logging can be turned off or adjusted with configuration 
settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [09/12/2015 19:19:56.565] 
[application-akka.actor.default-dispatcher-10] 
[akka://application/deadLetters] Message 
[kernel.modeller.data.messages.SubPathProcessingResponse] from 
Actor[akka://application/deadLetters] to 
Actor[akka://application/deadLetters] was not delivered. [4] dead letters 
encountered. This logging can be turned off or adjusted with configuration 
settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [09/12/2015 19:19:56.935] 
[application-akka.actor.default-dispatcher-11] 
[akka://application/user/localProcessor/$b/$b/$b/$a/$a] Message 
[kernel.modeller.data.messages.SubPathProcessingRequest] from 
Actor[akka://application/temp/$a] to 
Actor[akka://application/user/localProcessor/$b/$b/$b/$a/$a#447300449] was 
not delivered. [5] dead letters encountered. This logging can be turned off 
or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [09/12/2015 19:19:56.935] 
[application-akka.actor.default-dispatcher-11] 
[akka://application/user/localProcessor/$b/$b/$b/$a/$a] Message 
[kernel.modeller.data.messages.SubPathProcessingRequest] from 
Actor[akka://application/temp/$b] to 
Actor[akka://application/user/localProcessor/$b/$b/$b/$a/$a#447300449] was 
not delivered. [6] dead letters encountered. This logging can be turned off 
or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [09/12/2015 19:19:56.935] 
[application-akka.actor.default-dispatcher-11] 
[akka://application/user/localProcessor/$b/$b/$b/$a/$a] Message 
[kernel.modeller.data.messages.SubPathProcessingRequest] from 
Actor[akka://application/temp/$c] to 
Actor[akka://application/user/localProcessor/$b/$b/$b/$a/$a#447300449] was 
not delivered. [7] dead letters encountered. This logging can be turned off 
or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [09/12/2015 19:19:56.935] 
[application-akka.actor.default-dispatcher-11] 
[akka://application/user/localProcessor/$b/$b/$b/$a/$a] Message 
[kernel.modeller.data.messages.SubPathProcessingRequest] from 
Actor[akka://application/temp/$d] to 
Actor[akka://application/user/localProcessor/$b/$b/$b/$a/$a#447300449] was 
not delivered. [8] dead letters 

[akka-user] Re: Akka behaviour

2015-09-12 Thread Sergey Sopin


The workflow is as follows:





[akka-user] Re: Akka messages delivery and memory issue

2015-08-17 Thread Sergey Sopin
It seems I have a deadlock on my data, because I am passing the same object 
to several actors in parallel. 
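
If the shared object is mutable, one common way to rule out this kind of 
interference is to give every actor its own immutable snapshot instead of 
the live object. The sketch below uses only the JDK; PathSnapshot and its 
nodes field are hypothetical names, not types from the code in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical immutable message: safe to hand to several receivers at once. */
final class PathSnapshot {
    private final List<Integer> nodes;

    PathSnapshot(List<Integer> nodes) {
        // Defensive copy, wrapped read-only: later changes to the caller's
        // list do not leak in, and receivers cannot modify the snapshot.
        this.nodes = Collections.unmodifiableList(new ArrayList<>(nodes));
    }

    List<Integer> nodes() {
        return nodes;
    }

    public static void main(String[] args) {
        List<Integer> working = new ArrayList<>(Arrays.asList(1, 2, 3));
        PathSnapshot message = new PathSnapshot(working);

        working.add(4); // the sender keeps mutating its own list

        System.out.println(message.nodes()); // prints [1, 2, 3]
    }
}
```

Because nothing in the snapshot can change after construction, no locking 
is needed when several receivers read it concurrently.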




[akka-user] Akka messages delivery and memory issue

2015-08-16 Thread Sergey Sopin
Hello, 

I have some problems with Akka and memory; I hope you can help me. 
I have the following code: 

// CallerActor procedure

private Iterable<Future<Object>> process(ConfiguredPathProcessingRequest request) {
    Collection<Future<Object>> result = new ArrayList<>();
    for (int ceSize = request.minCoverElementSize;
            ceSize <= request.path.getMaxCoverElementSize(); ceSize += 2) {
        for (int segSize = request.minCoverElementSize; segSize <= ceSize; segSize += 2) {
            Future<Object> future = Patterns.ask(subPathProcessingSupervisor,
                    new SubPathProcessingRequest(request.path, ceSize, segSize),
                    waitingDuration);
            result.add(future);
        }
    }

    return result;
}


// WorkerSupervisor actor:

public class SubPathProcessingSupervisor extends NotificationActor {

    {
        receive(ReceiveBuilder.match(SubPathProcessingRequest.class, o -> {
            log().debug("SubPathProcessingSupervisor : Message received...");
            ActorRef subPathProcessorActor =
                    context().actorOf(Props.create(SubPathFinderActor.class));
            subPathProcessorActor.forward(o, context());
        }).matchAny(o -> unhandled(o)).build());
    }
}

// SubPathFinderActor:

{
    receive(ReceiveBuilder.match(SubPathProcessingRequest.class, mes -> {
        log().debug("SubPathFinder actor : Message received...");
        first = true;
        last = false;
        findSubPaths(mes.path, mes.coverElementSize, mes.segmentSize);
    }).matchAny(message -> unhandled(message)).build());
}


Here the caller actor creates a single instance of the worker supervisor 
and sends requests to it. The supervisor creates a new worker instance for 
each request and forwards the message to it. As you can see, I log when a 
message is received, but the following record never appears: 
"SubPathFinder actor : Message received..." 
What is wrong here? 

PS: The processor is busy all the time; the load is 100%. Memory 
consumption in debug mode is 1.2 GB.

Thanks!



Re: [akka-user] Message was not delivered issue

2015-05-06 Thread Sergey Sopin
Hi Endre,

I tried to reproduce my problem, but could not. Everything works 
correctly in my test 
project: https://github.com/SScare/PilotsAndTests/tree/master/PlayAkka
I am even more confused than before...

Regards,
Sergey 

On Tuesday, 5 May 2015 at 14:25:23 UTC+3, Akka Team wrote:

 Hi Sergey,

 The code looks OK, but I think it is not enough to see where the issue is. 
 Try to put together a minimized, self-contained code example that shows the 
 symptoms. The example should contain only minimal functionality that is 
 enough to replicate the problem.

 -Endre

 -- 
 Akka Team
 Typesafe - Reactive apps on the JVM
 Blog: letitcrash.com
 Twitter: @akkateam
  


Re: [akka-user] Message was not delivered issue

2015-05-06 Thread Sergey Sopin
It seems my problem is solved: I was stopping the root actor before the 
messages had been delivered to its children.
Sorry for the disturbance, and thank you.

Regards,
Sergey
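
The fix described above comes down to ordering: collect every pending reply 
first and stop afterwards. A rough JDK-only analogy might look like the 
sketch below. No Akka is involved; the thread pool stands in for the child 
actors, and StopAfterRepliesSketch and processThenStop are made-up names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class StopAfterRepliesSketch {

    /** Sends one request per size and stops the workers only after every reply is in. */
    static List<String> processThenStop(List<Integer> sizes) {
        ExecutorService workers = Executors.newFixedThreadPool(2); // stand-in for child actors
        try {
            List<CompletableFuture<String>> replies = new ArrayList<>();
            for (int size : sizes) {
                replies.add(CompletableFuture.supplyAsync(() -> "sub-path " + size, workers));
            }

            // Wait until every outstanding reply has arrived ...
            CompletableFuture.allOf(replies.toArray(new CompletableFuture[0])).join();

            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> reply : replies) {
                results.add(reply.join());
            }
            return results;
        } finally {
            // ... and only then stop the workers.
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(processThenStop(Arrays.asList(2, 4, 6)));
        // prints [sub-path 2, sub-path 4, sub-path 6]
    }
}
```

Blocking with join() keeps the sketch short; inside an actor the stop would 
more naturally be triggered from a completion callback or a message sent to 
self once all replies are in.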


[akka-user] Message was not delivered issue

2015-05-04 Thread Sergey Sopin
Hi, I'm developing a system based on Akka and I got stuck on a situation 
that is completely unclear to me. 
I have three actors with the following hierarchy: Root actor -> 
SubPathProcessingSupervisor -> SubPathFinderActor.
The root actor creates a SubPathProcessingSupervisor during preStart and 
then tries to send messages to it using the ask pattern. 

// Root actor

@Override
public void preStart() throws Exception {
    super.preStart();
    this.subPathProcessingSupervisor =
            context().actorOf(Props.create(SubPathProcessingSupervisor.class));
    this.pathModelsAggregatorSupervisor =
            context().actorOf(Props.create(PathModelsAggregatorSupervisor.class));
    this.waitingDuration = (int) readConfig("akka.actor.timeouts",
            "subPathFindingAskDuration", 5000);
}

//...

private Iterable<Future<Object>> process(ConfiguredPathProcessingRequest request) {
    Collection<Future<Object>> result = new ArrayList<>();
    for (int ceSize = request.minCoverElementSize;
            ceSize <= request.path.getMaxCoverElementSize(); ceSize += 2) {
        for (int segSize = request.minCoverElementSize; segSize <= ceSize; segSize += 2) {
            Future<Object> future = Patterns.ask(subPathProcessingSupervisor,
                    new SubPathProcessingRequest(request.path, ceSize, segSize),
                    waitingDuration);
            result.add(future);
        }
    }

    return result;
}

SubPathProcessingSupervisor has the following structure: 

public class SubPathProcessingSupervisor extends LoggingActor {

    @Override
    public void onReceive(Object o) throws Exception {
        logger.info("SubPathProcessingSupervisor : Message received...");
        if (o instanceof SubPathProcessingRequest) {
            ActorRef subPathProcessorActor =
                    context().actorOf(Props.create(SubPathFinderActor.class));
            subPathProcessorActor.forward(o, context());
        } else {
            unhandled(o);
        }
    }
}

From time to time, when I debug my application, I see the following 
messages:

Either: 

 [INFO] [05/03/2015 13:43:48.413] 
 [application-akka.actor.default-dispatcher-9] 
 [akka://application/user/localProcessor/$a/$a/$b/$a/$a/$a] Message 
 [kernel.modeller.data.messages.SubPathProcessingRequest] from 
 Actor[akka://application/temp/$a] to 
 Actor[akka://application/user/localProcessor/$a/$a/$b/$a/$a/$a#342530440] 
 was not delivered. [1] dead letters encountered. This logging can be turned 
 off or adjusted with configuration settings 'akka.log-dead-letters' and 
 'akka.log-dead-letters-during-shutdown'.


Or: 

  [INFO] [05/03/2015 13:44:18.962] 
 [application-akka.actor.default-dispatcher-2] 
 [akka://application/user/localProcessor/$b/$a/$b/$b/$a] Message 
 [kernel.modeller.data.messages.SubPathProcessingRequest] from 
 Actor[akka://application/temp/$c] to 
 Actor[akka://application/user/localProcessor/$b/$a/$b/$b/$a#-285620982] was 
 not delivered. [3] dead letters encountered. This logging can be turned off 
 or adjusted with configuration settings 'akka.log-dead-letters' and 
 'akka.log-dead-letters-during-shutdown'.


I am really confused by this issue because some attempts do succeed. But in 
most cases messages are not delivered to SubPathProcessingSupervisor, and 
very rarely they are lost between the supervisor and SubPathFinderActor. 
Each attempt may give a different result for the same input. Could you 
please help me understand what is going on here? 
