Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-25 Thread Sergey Sopin
Hi Rafał,

Thanks a lot for your comments! 
You are right, it is time to test them separately.
I've read those blog posts, but I will read them again carefully.
Thanks again.
 
Regards,
Sergey


-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-25 Thread Rafał Krzewski
Sergey, I spent a few minutes looking at your code, and I see two problems:
- inletNumberToPullFrom is not scoped properly. It should be local to the 
GraphStageLogic implementation. In its current form, it is shared between all 
materializations of a given ManagementFlow instance, which is definitely not 
what you want.
- you expect that elements will be pushed to the inlets before the findInelt() 
method is first called. To the best of my knowledge, this is not how 
Akka Streams works. If you don't see the exception in the default case of 
switch(inletNumberToPullFrom) being thrown, that would mean that the demand 
token never reaches ManagementFlow inlet 0, possibly because it is not 
propagated correctly in the FL graph.

My recommendation is still to test FL, S, C and ManagementFlow in isolation 
(i.e. connecting them only to TestSource, TestSink, Source.single, 
Sink.ignore etc.) and only then proceed to wiring them together.
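
The scoping problem in the first point can be illustrated with a minimal 
plain-Java sketch. SharedStateStage, LocalStateStage and Logic are 
hypothetical stand-ins for a GraphStage and its GraphStageLogic, not Akka 
classes. The only assumption carried over from Akka is that createLogic is 
invoked once per materialization while the stage object itself is reused.

```java
// Toy stand-ins for GraphStage / GraphStageLogic; these are NOT Akka classes.
final class SharedStateStage {
    // Field on the stage object: one copy, visible to every logic instance.
    private int inletNumberToPullFrom = -1;

    final class Logic {
        int next() { return ++inletNumberToPullFrom; }
    }

    // Called once per "materialization" in this toy model.
    Logic createLogic() { return new Logic(); }
}

final class LocalStateStage {
    static final class Logic {
        // Field on the logic object: a fresh copy per materialization.
        private int inletNumberToPullFrom = -1;

        int next() { return ++inletNumberToPullFrom; }
    }

    Logic createLogic() { return new Logic(); }
}
```

With the field on the stage, two logic instances created from the same stage 
see each other's updates. With the field inside the logic, each instance 
starts from -1 on its own.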

BTW, have you read the blog posts [1] and [2]? I find them quite 
informative.

Cheers,
Rafał

[1] http://blog.akka.io/streams/2016/07/30/mastering-graph-stage-part-1
[2] 
http://blog.akka.io/integrations/2016/08/25/simple-sink-source-with-graphstage




Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-24 Thread Sergey Sopin
Hi Rafał,

> - sink requests data from you
> - OutHandler.onPull is invoked on the outlet where the sink is 
> connected to
> - you propagate demand upstream by calling pull on any (or all) of your 
> Inlets, depending on your logic
> - eventually data becomes available upstream
> - InHandler.onPush is invoked on the inlet you pulled previously, with 
> the incoming element

I tried adding logging to the onPull functions, but it didn't help. I see 
only messages from the beginning of the flow ("Message_1"), but none from my 
custom shape. 

> OK, but the actual number of workers should not be greater than the number 
> of available CPUs, because otherwise Akka will interleave their execution 
> anyway. Spawning 1000s of worker flows will only waste memory. Of course I 
> understand that the input of fixed collection of data is artif

I tried removing the balancer altogether and working with a single worker. It 
didn't help.

> I usually prefer to debug single problem at a time than a number of 
> possibly interrelated problems at once.

Me too, and I will try, but I am sure that it will give me nothing in this 
case.
The problem is in the shape itself. 

Regards,
Sergey



Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-20 Thread Rafał Krzewski
On Thursday, 20 October 2016 20:28:26 UTC+2, Sergey Sopin wrote:
>
> Rafał,
>
> - If I understand the onPull() function correctly, it is allowed only for 
> outlets, so in order to pull something you need to push it into this outlet 
> first.
>
>
Here's how I understand this to work (and if I'm wrong, I'd really 
appreciate comments / corrections!)

The initiating component in the system is the Sink. It pulls data out of 
its upstream component. So, if there is a sink attached to your custom 
flow-shaped GraphStage, the sequence of events is as follows:

- the sink requests data from you
- OutHandler.onPull is invoked on the outlet that the sink is connected 
to
- you propagate demand upstream by calling pull on any (or all) of your 
Inlets, depending on your logic
- eventually data becomes available upstream
- InHandler.onPush is invoked on the inlet you pulled previously, with 
the incoming element
- you do your processing and call push on the appropriate outlet, which 
means any outlet that signaled demand beforehand. This also means that you 
might want to postpone pulling from upstream until *all* of your outlets 
have signaled demand. Otherwise you might end up with a processed element 
that you can't emit downstream because its intended outlet is backpressuring 
you (i.e. not requesting an element at the moment). 

GraphStages can also react to asynchronous events other than pulls and 
pushes: timers, cancellation requests etc., but that's a whole other story.
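
The advice in the last step (pull from upstream only once all outlets have 
signaled demand) can be sketched as a small plain-Java toy model. ToyRouter 
and its methods are hypothetical names that only mimic the onPull / pull / 
onPush / push vocabulary; this is not Akka API, and the even/odd routing rule 
is an arbitrary assumption for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Akka API: one inlet, two outlets. The stage requests an
// element from upstream only once every outlet has signalled demand.
final class ToyRouter {
    private final boolean[] demand = new boolean[2];
    private boolean pulledUpstream = false;
    final List<String> events = new ArrayList<>();

    // Downstream on the given outlet signals demand (analogous to onPull).
    void onPull(int outlet) {
        demand[outlet] = true;
        events.add("onPull(out" + outlet + ")");
        if (demand[0] && demand[1] && !pulledUpstream) {
            pulledUpstream = true;
            events.add("pull(in)");
        }
    }

    // Upstream delivers an element (analogous to onPush); returns the outlet used.
    int onPush(int element) {
        if (!pulledUpstream) {
            throw new IllegalStateException("element arrived without demand");
        }
        pulledUpstream = false;
        int outlet = (element % 2 == 0) ? 0 : 1; // arbitrary routing rule
        demand[outlet] = false;                  // that outlet's demand is consumed
        events.add("push(out" + outlet + ", " + element + ")");
        return outlet;
    }

    boolean hasPulledUpstream() { return pulledUpstream; }
}
```

Because the router waits for demand on both outlets, whichever outlet the 
routing rule picks is guaranteed to be ready to accept the element.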

 

> - The balancer was taken from the docs: 
> Balancing_jobs_to_a_fixed_pool_of_workers 
>
OK, but the actual number of workers should not be greater than the number 
of available CPUs, because otherwise Akka will interleave their execution 
anyway. Spawning 1000s of worker flows will only waste memory. Of course I 
understand that a fixed collection of input data is an artificial 
example. If the amount of data were fixed and small enough to fit into RAM, 
using Akka Streams to process it would be overkill.
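
One way to apply this rule of thumb is to cap the worker count at the number 
of available processors instead of passing sourceList.size() directly. The 
helper below is a hypothetical sketch (WorkerCount and forInput are made-up 
names); it uses only the standard Runtime.availableProcessors() call.

```java
// Hypothetical helper: cap the number of parallel workers at the CPU count.
final class WorkerCount {
    private WorkerCount() {}

    // Pure variant, convenient for testing with a fixed CPU count.
    static int forInput(int inputSize, int availableCpus) {
        if (inputSize <= 0) {
            return 1; // keep at least one worker even for empty input
        }
        return Math.max(1, Math.min(inputSize, availableCpus));
    }

    // Variant that asks the JVM how many processors it can use.
    static int forInput(int inputSize) {
        return forInput(inputSize, Runtime.getRuntime().availableProcessors());
    }
}
```

In the code from this thread, the result would replace the second argument 
of the balancer call, e.g. balancer(buildFinderFlow(), 
WorkerCount.forInput(sourceList.size()), false).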
 

> - I don't think that testing will help me, because based on the first 
> statement I would see something in the log. So, messages do not go inside 
> the shape and get stuck somewhere between flow and managementFlow. 
>
>
Whatever floats your boat, man :) I usually prefer to debug a single problem 
at a time rather than a number of possibly interrelated problems at once. 
Also, I still believe that the messages are not pulled into managementFlow 
because demand is somehow lost. The few messages that you see leaving the 
source are probably pulled into the buffers at the async boundary created 
with worker.async(). But that's just my guess.

Cheers,
Rafał
 



Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-20 Thread Sergey Sopin
Rafał,

- If I understand the onPull() function correctly, it is allowed only for 
outlets, so in order to pull something you need to push it into this outlet 
first.

- The balancer was taken from the docs: 
Balancing_jobs_to_a_fixed_pool_of_workers 

- I don't think that testing will help me, because based on the first 
statement I would see something in the log. So, messages do not go inside 
the shape and get stuck somewhere between flow and managementFlow. 

Regards,
Sergey




Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-20 Thread Rafał Krzewski
Sergey,
I have a few remarks after a cursory reading of your code:

- Akka Streams (and Reactive Streams) are pull-based. As the messages 
travel downstream, virtual demand tokens travel upstream. Each graph 
element is allowed to push elements downstream only when demand is 
signaled. This means that you must keep track of demand carefully. In a 
GraphStage that is flow-shaped (has both inlets and outlets), the flow of 
data is initiated by a pull on its outlets. If you fail to propagate such a 
pull to some (or all) of your GraphStage's inlets, things are going to stall. 
In your test code, you put debug statements in the onPush methods, but you 
should also monitor onPull.

- There's something strange with your balancer component. Either I'm 
misreading things, or you removed some important details while editing the 
code for publication, but it seems to me that each Data input element will 
be processed in a dedicated, parallel finderFlow / each finderFlow will 
only ever see a single element. This could also be a reason for the "clog" 
you experience.

- In general I would suggest building your flow processing "outwards": 
first try to validate that managementFlow works in isolation (unit tests 
with Stream TestKit would be recommended here) and once you have this 
settled, build and test larger flows progressively.
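
The stall described in the first remark can be reproduced with a plain-Java 
toy model of a source, one stage and a sink. ToyPipeline and its members are 
hypothetical names, not Akka API; the propagatePull flag stands in for a 
stage that does or does not forward demand to its inlet.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model, NOT Akka API: source -> stage -> sink, driven by demand from the sink.
final class ToyPipeline {
    private final Deque<Integer> source = new ArrayDeque<>();
    private final boolean propagatePull;
    final List<String> log = new ArrayList<>();
    final List<Integer> received = new ArrayList<>();

    ToyPipeline(List<Integer> elements, boolean propagatePull) {
        source.addAll(elements);
        this.propagatePull = propagatePull;
    }

    // The sink signals demand `requests` times, one element per signal.
    void run(int requests) {
        for (int i = 0; i < requests; i++) {
            stageOnPull();
        }
    }

    private void stageOnPull() {
        log.add("stage.onPull");
        if (!propagatePull) {
            return; // demand is lost here: upstream never hears about it
        }
        if (source.isEmpty()) {
            return; // nothing left upstream
        }
        stageOnPush(source.poll());
    }

    private void stageOnPush(int element) {
        log.add("stage.onPush(" + element + ")");
        received.add(element); // push downstream to the sink
    }
}
```

With propagatePull set to false, the log contains only stage.onPull entries 
and the sink receives nothing, which is why logging onPush alone cannot show 
where demand gets lost.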

Hope that helps,
Rafał




Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-19 Thread Sergey Sopin
Hi Rafał,

The code that uses my shape is as follows: 

private CompletionStage> buildAndRunGraph(ArrayList sourceList) {  //First function
    Source source = Source.from(sourceList);
    Materializer materializer = ActorMaterializer.create(context());
    System.out.println("Running flow.");
    return source
        .map(i -> {System.out.println("Message_1!"); return i;})
        .via(balancer(buildFinderFlow(), sourceList.size(), false))
        .map(i -> {System.out.println("Message_2!"); return i;})
        .runWith(Sink.seq(), materializer);
}

/**
 * This procedure build main processing flow.
 * Flow design is following:
 *
 * +---+
 * |   +-(0)--++   |
 * |   |   +---+  | FL |-(1)---|>
 * |   +-->|   |-(0)->++   |
 * >---|-->|Management |  +-+  |
 * |   +-->|  flow |-(1)->| S   |+ |
 * |   |+->|   |  +-+| |
 * |   ||  +---+-(2)->+---+  | |
 * |   |+-|C  |  | |
 * |   |  +---+  | |
 * |   +-+ |
 * +---+
 *
 * @return main processing flow.
 */
private static Flow buildFinderFlow() {
    return Flow.fromGraph(GraphDSL.create(builder -> {
        Graph, NotUsed> managementFlow = new ManagementFlow();   //Extends my new UniformFanShape. Code listed below
        UniformFanShape managementShape = builder.add(managementFlow);

        Graph, NotUsed> fl = new FLShape(); //FLShape extends FanOutShape2
        FanOutShape2 flShape = builder.add(fl);

        FlowShape s = builder.add(new SDShape()); //SDShape extends FlowShape
        FlowShape c = builder.add(new CDShape()); //CDShape extends FlowShape

        builder.from(managementShape.out(0)).toInlet(flShape.in());
        builder.from(managementShape.out(1)).toInlet(s.in());
        builder.from(managementShape.out(2)).toInlet(c.in());

        builder.from(flShape.out0()).toInlet(managementShape.in(0));

        builder.from(s.out()).toInlet(managementShape.in(2));

        builder.from(c.out()).toInlet(managementShape.in(3));

        return new FlowShape<>(managementShape.in(1), flShape.out1());
    }));
}

/**
 * This procedure returns a Flow which contains a set of sub-Flows to be run asynchronously.
 *
 * @param worker Flow which contains the processing logic and is to be run asynchronously
 * @param workerCount number of asynchronous processes
 * @param <In> Type of input
 * @param <Out> Type of output
 * @return Flow which contains a set of asynchronous processes
 */
private static <In, Out> Flow<In, Out, NotUsed> balancer(
        Flow<In, Out, NotUsed> worker, int workerCount, boolean waitForAllDownstreams) {
    return Flow.fromGraph(GraphDSL.create(b -> {
        final UniformFanOutShape<In, In> balance =
            b.add(Balance.create(workerCount, waitForAllDownstreams));
        final UniformFanInShape<Out, Out> merge =
            b.add(Merge.create(workerCount));

        for (int i = 0; i < workerCount; i++) {
            b.from(balance.out(i)).via(b.add(worker.async())).toInlet(merge.in(i));
        }

        return FlowShape.of(balance.in(), merge.out());
    }));
}


And here is the code of ManagementFlow class which extends UniformFanShape:

package kernel.modeller.workers.streamFinder.subPathFinderShapes;

import akka.stream.Attributes;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import kernel.modeller.data.Data;
import kernel.modeller.workers.streamFinder.generic.UniformFanShape;

public final class ManagementFlow extends GraphStage> {
    //Inlets
    public final Inlet startIn = Inlet.create("Start.in");
    public final Inlet flIn = Inlet.create("FL.in");
    public final Inlet sIn = Inlet.create("sDir.in");
    public final Inlet cIn = Inlet.create("cDir.in");
    //Outlets
    public final Outlet flOut = Outlet.create("FL.out");
    public final Outlet sOut = Outlet.create("sDir.out");
    public final Outlet cOut = Outlet.create("cDir.out");

    private Inlet[] inlets = { flIn, startIn, sIn, cIn};
    private Outlet[] outlets = {flOut, sOut, cOut};

    private byte inletNumberToPullFrom = -1;

    //Shape
    private final UniformFanShape shape = new UniformFanShape((Inlet[])inlets, (Outlet[])outlets);

    @Override
    public UniformFanShape shape() {
        return shape;
    }

    @Override
    public GraphStageLogic createLogic(Attributes inheritedAttributes) {
        return new GraphStageLogic(shape) {
            //Handler for Start.in Inlet
            {

Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-19 Thread Rafał Krzewski
Sergey, you haven't shown any code related to passing messages yet, so it's 
impossible to guess what's happening at this point.
As I said in my previous message, the critical things are the actual 
components you put *inside* your graph. The shapes just determine the 
external connectors.

Cheers,
Rafał

Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-19 Thread Sergey Sopin
Yep, I was trying to make something like that, but my code doesn't work. 
Messages get stuck somewhere and I don't know how to fix it due to my lack of 
Scala knowledge. You can find the code in my initial message here. 
I took UniformFanOut as an example and unsuccessfully tried to replace the 
single Inlet with a set of them.
If you could help me fix it, I would be more than happy :)
Thanks!

Regards,
Sergey

>>> On Tuesday, 18 October 2016, 18:34:22 UTC+3, Rafał Krzewski wrote: 
>>>>
>>>> It's not clear to me what you are trying to accomplish. It looks like 
>>>> you are trying to implement AmorphousShape (i.e. an arbitrary number of 
>>>> open inlets and outlets) on your own, and then a specific variant of it 
>>>> that has all inlets sharing the same type, and all outlets sharing 
>>>> another type. 
>>>> The "Fan" fragment in the names you used is a bit misleading, since in 
>>>> Akka Streams' own usage, names like FanIn / FanOut shape mean that such a 
>>>> graph has many inlets and a single outlet / a single inlet and many 
>>>> outlets. The analogy is to a Chinese-style hand-held fan, rather than a 
>>>> ceiling fan with many blades :) I am wondering what use case you have in 
>>>> mind for your AmorphousShape, because the graphs that can be materialized 
>>>> and executed must ultimately have a ClosedShape. You could use such 
>>>> multi-outlet graphs for reusing pieces of functionality, but anything 
>>>> more complex than a BidiShape seems rather unwieldy to me. 
>>>>
>>>> My understanding is that a Graph's shape should not interfere with 
>>>> message flow, because it's just a canvas with contact points on the 
>>>> perimeter. What matters are the components that you plug into it. Akka 
>>>> just makes sure that you don't leave any of the contact points dangling. 
>>>> This makes me think that the problem with messages getting "stuck" was 
>>>> caused somewhere other than the graph shape construction site.
>>>>
>>>> Have you tried inserting probes along the lines of 
>>>> Flow.alsoTo(Sink.foreach(_ => println("beep!"))) (shooting from the hip 
>>>> here, apologies if it does not compile straight away) into your graph? 
>>>> That could help you locate where the messages are stuck / discarded.
>>>>
>>>> Cheers,
>>>> Rafał
>>>>
>>>> On Monday, 17 October 2016 20:22:43 UTC+2, Sergey Sopin wrote: 
>>>>>
>>>>> Hi, 
>>>>>
>>>>> I am trying to create my own Akka Streams shape with several Inlets 
>>>>> and Outlets. I have written the following code: 
>>>>>
>>>>> package kernel.modeller.workers.streamFinder.generic
>>>>>
>>>>> import akka.stream.{Shape, Outlet, Inlet}
>>>>> import scala.annotation.unchecked.uncheckedVariance
>>>>> import scala.collection.immutable
>>>>>
>>>>> object FanShape {
>>>>>   sealed trait Init[_] {
>>>>>     def inlets: immutable.Seq[Inlet[_]]
>>>>>     def outlets: immutable.Seq[Outlet[_]]
>>>>>     def name: String
>>>>>   }
>>>>>   final case class Name[_](override val name: String) extends Init[Any] {
>>>>>     override def inlets: immutable.Seq[Inlet[_]] = Nil
>>>>>     override def outlets: immutable.Seq[Outlet[_]] = Nil
>>>>>   }
>>>>>   final case cla

Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-19 Thread Konrad Malawski
Shapes don't need a separate Java or Scala API; it's shared.

You can just subclass a shape and make a class that directly represents
your shape.
If you want AmorphousShape then sure, but please note that its purpose is
to "forget about the types of those".

If you want a well-typed one, simply extend Shape and fill in the abstract
methods - see FlowShape etc. for examples of how to do this.
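
As a rough sketch of "extend Shape and fill in the abstract methods": the
RouterShape name and its three ports below are hypothetical, chosen only for
illustration. It assumes Akka 2.4.x as used in this thread, where Shape also
declares copyFromPorts. That method is written without `override` so that the
sketch should still compile on later versions, which as far as I know no
longer declare it.

```scala
import akka.stream.{Inlet, Outlet, Shape}
import scala.collection.immutable

// Hypothetical well-typed shape: one inlet and two differently typed outlets.
final case class RouterShape[In, A, B](
    in: Inlet[In],
    outA: Outlet[A],
    outB: Outlet[B]) extends Shape {

  // Shape requires the ports to be listed in a stable order.
  override val inlets: immutable.Seq[Inlet[_]] = List[Inlet[_]](in)
  override val outlets: immutable.Seq[Outlet[_]] = List[Outlet[_]](outA, outB)

  // A copy must have fresh port instances, hence carbonCopy().
  override def deepCopy(): RouterShape[In, A, B] =
    RouterShape(in.carbonCopy(), outA.carbonCopy(), outB.carbonCopy())

  // Declared abstract on Shape in Akka 2.4.x (assumption about the version in use).
  def copyFromPorts(
      inlets: immutable.Seq[Inlet[_]],
      outlets: immutable.Seq[Outlet[_]]): RouterShape[In, A, B] = {
    require(inlets.size == 1, s"expected 1 inlet, got ${inlets.size}")
    require(outlets.size == 2, s"expected 2 outlets, got ${outlets.size}")
    RouterShape(
      inlets(0).asInstanceOf[Inlet[In]],
      outlets(0).asInstanceOf[Outlet[A]],
      outlets(1).asInstanceOf[Outlet[B]])
  }
}
```

Because the class is plain Scala with no implicit parameters, it should be
usable from Java code as an ordinary class, which is the point about the API
being shared.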

-- 
Konrad `ktoso` Malawski
Akka  @ Lightbend 

On 19 October 2016 at 08:03:16, Sergey Sopin (sopin1...@gmail.com) wrote:

Hi,

Yes, but it seems that I need to create a Java API for it, because my app is
in Java.
I used the Inkscape app to draw the diagram.

Cheers,
Sergey

[akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-18 Thread Sergey Sopin
Hi,

Yes, but it seems that I need to create a Java API for it, because my app is 
in Java. 
I used the Inkscape app to draw the diagram.

Cheers,
Sergey

[akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-18 Thread Rafał Krzewski
A custom GraphStage [1] using AmorphousShape is probably the way to go in 
this case.
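
For the narrower case of a single input whose elements are routed to one of
several outputs based on their content, the built-in Partition stage may
already be enough, without a custom GraphStage. This is a swapped-in
alternative, not the AmorphousShape approach mentioned above. A minimal
sketch, assuming Akka 2.4.x (hence the explicit ActorMaterializer); the
PartitionExample object and the even/odd routing rule are made up for
illustration:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{GraphDSL, Keep, Partition, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PartitionExample {
  // Routes 1..6 to two sinks: even numbers to output 0, odd numbers to output 1.
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("partition-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val graph = RunnableGraph.fromGraph(
        GraphDSL.create(Sink.seq[Int], Sink.seq[Int])(Keep.both) {
          implicit b => (evenSink, oddSink) =>
            import GraphDSL.Implicits._
            // The partitioner returns the index of the outlet to use.
            val partition =
              b.add(Partition[Int](2, (n: Int) => if (n % 2 == 0) 0 else 1))
            Source(1 to 6) ~> partition.in
            partition.out(0) ~> evenSink
            partition.out(1) ~> oddSink
            // Every port is connected, so the graph is closed and runnable.
            ClosedShape
        })
      val (evens, odds) = graph.run()
      (Await.result(evens, 5.seconds), Await.result(odds, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```

With several inputs, a Merge placed in front of the Partition would be one
way to keep using built-in stages; whether that fits depends on the diagram.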

That's a really neat diagram, BTW! What software did you use to create it?

Cheers,
Rafał

[1] 
http://doc.akka.io/docs/akka/2.4/scala/stream/stream-customize.html#Custom_processing_with_GraphStage

[akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-18 Thread Sergey Sopin
Hi again,

I have a very specific case. My flow looks like this one:



The idea of the multi input/output shape was to redirect messages to the right 
output based on the message data.

I am just learning streams, so maybe you can suggest a better solution?

Thanks!


Cheers, 

Sergey



[akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-18 Thread Sergey Sopin
Hi, Rafał! 

Thanks a lot! You gave me everything I need :) I was looking for 
AmorphousShape!

Thanks again, I will not be reinventing the wheel anymore!

Regards,
Sergey


[akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-18 Thread Rafał Krzewski
It's not clear to me what you are trying to accomplish. It looks like you 
are trying to implement AmorphousShape (i.e. an arbitrary number of open 
inlets and outlets) on your own, and then a specific variant of it that has 
all inlets sharing one type and all outlets sharing another. The "Fan" 
fragment in the names you used is a bit misleading, since in Akka Streams' 
own usage, names like FanIn / FanOut shape mean that such a graph has many 
inlets and a single outlet, or a single inlet and many outlets. The analogy 
is to a Chinese-style hand-held fan, rather than a ceiling fan with many 
blades :) I am wondering what use case you have in mind for your 
AmorphousShape, because the graphs that can be materialized and executed 
must ultimately have a ClosedShape. You could use such multi-outlet graphs 
for reusing pieces of functionality, but anything more complex than a 
BidiShape seems rather unwieldy to me.

My understanding is that a Graph's shape should not interfere with message 
flow, because it's just a canvas with contact points on the perimeter. What 
matters are the components that you plug into it. Akka just makes sure that 
you don't leave any of the contact points dangling. This makes me think 
that the problems with messages getting "stuck" were caused somewhere other 
than the graph shape construction site.

Have you tried inserting probes along the lines of 
Flow.alsoTo(Sink.foreach(_ => println("beep!"))) (shooting from the hip 
here, apologies if it does not compile straight away) into your graph? That 
could help you locate where the messages are stuck / discarded.
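
A compilable version of the probe idea might look roughly like this. It is a
sketch assuming Akka 2.4.x (hence the explicit ActorMaterializer); the probe
helper, its labels and the ProbeExample object are hypothetical names, and
the small Source/map pipeline only stands in for the real graph:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ProbeExample {
  // Pass-through flow that also prints every element it sees.
  def probe[T](label: String): Flow[T, T, NotUsed] =
    Flow[T].alsoTo(Sink.foreach[T](elem => println(s"[$label] $elem")))

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("probe-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val result = Source(1 to 3)
        .via(probe[Int]("before map")) // shows what enters the stage under test
        .map(_ * 2)
        .via(probe[Int]("after map"))  // shows what leaves it
        .runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

If a label stops printing at some point in the graph, the elements are being
held or dropped upstream of that probe. Note that alsoTo propagates
backpressure from the side sink, which is harmless here because
Sink.foreach consumes immediately.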

Cheers,
Rafał

On Monday, 17 October 2016 at 20:22:43 UTC+2, Sergey Sopin wrote:
>
> Hi,
>
> I am trying to create my own akka streams shape with several Inlets and 
> Outlets. I have written following code: 
>
> package kernel.modeller.workers.streamFinder.generic
>
> import akka.stream.{Shape, Outlet, Inlet}
> import scala.annotation.unchecked.uncheckedVariance
> import scala.collection.immutable
>
> object FanShape {
>   sealed trait Init[_] {
>     def inlets: immutable.Seq[Inlet[_]]
>     def outlets: immutable.Seq[Outlet[_]]
>     def name: String
>   }
>   final case class Name[_](override val name: String) extends Init[Any] {
>     override def inlets: immutable.Seq[Inlet[_]] = Nil
>     override def outlets: immutable.Seq[Outlet[_]] = Nil
>   }
>   final case class Ports[_](override val inlets: immutable.Seq[Inlet[_]],
>       override val outlets: immutable.Seq[Outlet[_]]) extends Init[Any] {
>     override def name: String = "FanShape"
>   }
> }
>
> abstract class FanShape[_] private (_in: Iterator[Inlet[_]], _out:
>     Iterator[Outlet[_]], _name: String) extends Shape {
>
>   import FanShape._
>
>   def this(init: FanShape.Init[_]) = this(init.inlets.iterator,
>     init.outlets.iterator, init.name)
>
>   final override def outlets: immutable.Seq[Outlet[_]] = _outlets
>   final override def inlets: immutable.Seq[Inlet[_]] = _inlets
>
>   private var _outlets: Vector[Outlet[_]] = Vector.empty
>   private var _inlets: Vector[Inlet[_]] = Vector.empty
>
>   protected def newOutlet[T](name: String): Outlet[T] = {
>     val p = if (_out.hasNext) _out.next().asInstanceOf[Outlet[T]] else
>       Outlet[T](s"${_name}.$name")
>     _outlets :+= p
>     p
>   }
>
>   protected def newInlet[T](name: String): Inlet[T] = {
>     val p = if (_in.hasNext) _in.next().asInstanceOf[Inlet[T]] else
>       Inlet[T](s"${_name}.$name")
>     _inlets :+= p
>     p
>   }
>
>   protected def construct(init: Init[_]): FanShape[_]
>
>   def deepCopy(): FanShape[_] = construct(Ports(inlets.map(_.carbonCopy()),
>     outlets.map(_.carbonCopy())))
>   final def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets:
>       immutable.Seq[Outlet[_]]): FanShape[_] = {
>     require(outlets.size == _outlets.size,
>       s"proposed outlets [${outlets.mkString(", ")}] do not fit FanShape")
>     require(inlets.size == _inlets.size,
>       s"proposed inlets [${inlets.mkString(", ")}] do not fit FanShape")
>     construct(Ports(inlets, outlets))
>   }
> }
>
> object UniformFanShape {
>   def apply[I, O](inlets: Array[Inlet[I]], outlets: Outlet[O]*):
>       UniformFanShape[I, O] =
>     new UniformFanShape(inlets.size, outlets.size,
>       FanShape.Ports(inlets.toList, outlets.toList))
> }
>
> class UniformFanShape[-I, +O](n: Int, m: Int, _init: FanShape.Init[_])
>     extends FanShape(_init) {
>   def this(n: Int, m: Int) = this(n, m, FanShape.Name("UniformFan"))
>   def this(n: Int, m: Int, name: String) = this(n, m, FanShape.Name(name))
>   def this(inlets: Array[Inlet[I]], outlets: Array[Outlet[O]]) =
>     this(inlets.size, outlets.size, FanShape.Ports(inlets.toList, outlets.toList))
>   override protected def construct(init: FanShape.Init[_]): FanShape[_] =
>     new UniformFanShape(n, m, init)
>   override def deepCopy(): UniformFanShape[I, O] =
>     super.deepCopy().asInstanceOf[UniformFanShape[I, O]]
>
>   val inArray: Array[Inlet[I @uncheckedVariance]] = Array.tabulate(n

[akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-18 Thread Sergey Sopin
People say that the following article may help: 

http://degoes.net/articles/insufficiently-polymorphic 

However, I still don't understand what's wrong with it. Any help will be 
appreciated.
Thanks!

Regards,
Sergey
