Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-19 Thread Sergey Sopin
Hi Rafał,

Code that uses my shape is following: 

private CompletionStage buildAndRunGraph(ArrayList 
sourceList) {  //First function
Source source = Source.from(sourceList);
Materializer materializer = ActorMaterializer.create(context());
System.out.println("Running flow.");
return source.map(i -> {System.out.println("Message_1!"); return 
i;}).via(balancer(buildFinderFlow(), sourceList.size(), false)).
   map(i -> 
{System.out.println("Message_2!"); return i;}).runWith(Sink.seq(), 
materializer);

}

/**
 * This procedure build main processing flow.
 * Flow design is following:
 *
 * +---+
 * |   +-(0)--++   |
 * |   |   +---+  | FL |-(1)---|>
 * |   +-->|   |-(0)->++   |
 * >---|-->|Management |  +-+  |
 * |   +-->|  flow |-(1)->| S   |+ |
 * |   |+->|   |  +-+| |
 * |   ||  +---+-(2)->+---+  | |
 * |   |+-|C  |  | |
 * |   |  +---+  | |
 * |   +-+ |
 * +---+
 *
 * @return main processing flow.
 */
private static Flow buildFinderFlow() {
return Flow.fromGraph(GraphDSL.create(builder -> {
Graph, NotUsed> managementFlow = 
new ManagementFlow();   //Extends my new UniformFanShape. Code listed below
UniformFanShape managementShape = 
builder.add(managementFlow);

Graph, NotUsed> fl = new 
FLShape(); //FLShape extends FanOutShape2
FanOutShape2 flShape = builder.add(fl);

FlowShape s = builder.add(new SDShape()); //SDShape 
extends FlowShape
FlowShape c = builder.add(new CDShape()); //CDShape 
extends FlowShape

builder.from(managementShape.out(0)).toInlet(flShape.in());
builder.from(managementShape.out(1)).toInlet(s.in());
builder.from(managementShape.out(2)).toInlet(c.in());

builder.from(flShape.out0()).toInlet(managementShape.in(0));

builder.from(s.out()).toInlet(managementShape.in(2));

builder.from(c.out()).toInlet(managementShape.in(3));

return new FlowShape<>(managementShape.in(1), flShape.out1());

})
);
}

/**
 * This procedure returns Flow which contains set of sub Flows to be run 
asynchronously.
 *
 * @param worker Flow which contains processing logic and to be run 
asynchronously
 * @param workerCount amount of asycnhronous processes
 * @param  Type of input
 * @param  Type of output
 * @return Flow which contains set of asynchronous processes
 */
private static  Flow balancer(
Flow worker, int workerCount, boolean 
waitForAllDownstreams) {
return Flow.fromGraph(GraphDSL.create(b -> {
final UniformFanOutShape balance =
b.add(Balance.create(workerCount, waitForAllDownstreams));
final UniformFanInShape merge =
b.add(Merge.create(workerCount));

for (int i = 0; i < workerCount; i++) {

b.from(balance.out(i)).via(b.add(worker.async())).toInlet(merge.in(i));
}

return FlowShape.of(balance.in(), merge.out());
}));
}


And here is the code of ManagementFlow class which extends UniformFanShape:

package kernel.modeller.workers.streamFinder.subPathFinderShapes;

import akka.stream.Attributes;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import kernel.modeller.data.Data;
import kernel.modeller.workers.streamFinder.generic.UniformFanShape;

public final class ManagementFlow extends 
GraphStage> {
//Inlets
public final Inlet startIn = Inlet.create("Start.in");
public final Inlet flIn = Inlet.create("FL.in");
public final Inlet sIn = Inlet.create("sDir.in");
public final Inlet cIn = Inlet.create("cDir.in");
//Outlets
public final Outlet flOut = Outlet.create("FL.out");
public final Outlet sOut = Outlet.create("sDir.out");
public final Outlet cOut = Outlet.create("cDir.out");

private Inlet[] inlets = { flIn, startIn, sIn, cIn};
private Outlet[] outlets = {flOut, sOut, cOut};

private byte inletNumberToPullFrom = -1;

//Shape
private final UniformFanShape shape = new 

[akka-user] Re: Simple beginner questions: Accessing values in flows

2016-10-19 Thread mnielsen894
So, rightly or wrongly, I did this, which seems to work:

 val flow = builder.add(wsl.mapMaterializedValue(f => {

  f map {

u => if (u.response.status == StatusCodes.SwitchingProtocols) {

  log.info("Switched protocols")

}

else

  throw new IllegalStateException("Did not switch")

  }
})

The key bit is the mapMaterializedValue inside the builder.add.

Whether this is how one is *intended* to do it is another matter, but so 
far the result has proven satisfactory.


On Wednesday, October 19, 2016 at 1:08:37 AM UTC-4, Evgeny Shepelyuk wrote:
>
> Hi, could you please spare final code ?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Is contents of GraphDSL.create() {..} thread-safe for BidiFlow?

2016-10-19 Thread Alexey Shuksto
среда, 19 октября 2016 г., 15:47:31 UTC+3 пользователь √ написал:
>
> Ok, so a sort of "correlator"-stage?
>

Exactly.
 

> Yes, so the BidiFlow you create has shared state tied to the intance, not 
> the mateiralization.
> I think you'll need to create a custom GraphStage with a BidiShape.
>

Yep, thanks for your help -- I've implement such correlator as GraphStage, 
looks better now.

One minor question (docs aren't clear enough there): is it safe to drop 
`pull(in)` in `in` `onPush()` handler if such `pull(in)` happen in `out` 
`onPull()` handler?

In documentation for custom stages `in` pulled twice (both in `onPush()` 
and `onPull()`) and in code it is only pulled once.
 

>  
>
>>
>> среда, 19 октября 2016 г., 15:23:04 UTC+3 пользователь √ написал:
>>
>>>
>>>
>>> On Wed, Oct 19, 2016 at 2:18 PM, Alexey Shuksto  
>>> wrote:
>>>
 2 Konrad: Yep, in original question I meant not 'DSL construction time' 
 but 'execution time' thread-safety. Thanks for clarification.

 2 Victor: Use case is simple: outgoing flow need to store `Promise` of 
 future remote response in some shared state which then would be completed 
 by incoming flow. 

>>>
>>> So it's a bidirectional buffer of Promises and Futures?
>>>
>>>
>>>  
>>>
 There could be as many promises as there were outgoing messages, but 
 the order of responses are not guaranteed and there could be additional 
 messages in incoming flow.

 What do you meant by 'actively prevents multiple materializations'?

>>>
>>> What happens when you materialize that bidiflow N times?
>>>  
>>>

 среда, 19 октября 2016 г., 14:29:14 UTC+3 пользователь √ написал:
>
> Hi Alexey,
>
> Not only is it not thread-safe, but it also actively prevents multiple 
> materializations.
>
> Perhaps if you state your use-case we can suggest an alternative?
>
> On Wed, Oct 19, 2016 at 1:24 PM, Alexey Shuksto  
> wrote:
>
>> Hello hAkkers,
>>
>> Simple example:
>> val zipper = BidiFlow.fromGraph(GraphDSL.create() { b =>
>>   var counter = 0
>>   
>>   val outbound = b.add(Flow[String].map { str =>
>> counter += 1
>> str -> counter
>>   })
>>   val inbound = b.add(Flow[(String, Int)].map { pair =>
>> counter -= 1
>> pair._1
>>   })
>>   
>>   BidiShape.fromFlows(outbound, inbound)
>> })
>>
>> Can I presume that contents of 'build block' is thread-safe or I need 
>> to guard `counter` somehow (use `AtomicInt` and such)?
>>
>> Also, do BidiFlow support 'duplex' mode or they process 
>> incoming/outgoing messages one at time?
>>
>> -- 
>> >> Read the docs: http://akka.io/docs/
>> >> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >> Search the archives: 
>> https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google 
>> Groups "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, 
>> send an email to akka-user+...@googlegroups.com.
>> To post to this group, send email to akka...@googlegroups.com.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
> Cheers,
> √
>
 -- 
 >> Read the docs: http://akka.io/docs/
 >> Check the FAQ: 
 http://doc.akka.io/docs/akka/current/additional/faq.html
 >> Search the archives: 
 https://groups.google.com/group/akka-user
 --- 
 You received this message because you are subscribed to the Google 
 Groups "Akka User List" group.
 To unsubscribe from this group and stop receiving emails from it, send 
 an email to akka-user+...@googlegroups.com.
 To post to this group, send email to akka...@googlegroups.com.
 Visit this group at https://groups.google.com/group/akka-user.
 For more options, visit https://groups.google.com/d/optout.

>>>
>>>
>>>
>>> -- 
>>> Cheers,
>>> √
>>>
>> -- 
>> >> Read the docs: http://akka.io/docs/
>> >> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com .
>> To post to this group, send email to akka...@googlegroups.com 
>> .
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
> Cheers,
> √
>

-- 
>>  Read the docs: http://akka.io/docs/

Re: [akka-user] Re: Fetching remote file via akka-http singleRequest results in extremely high allocation rate

2016-10-19 Thread vladyslav . chaban
Hi,

there's really not too much to add to the gist from the first post: this 
method is called once, almost right in the main class, to fetch some 
resource. For now I managed to work this issue around, but I thought that 
working on rope-like ByteStrings should be faster than on array-backed Java 
strings? Is it possible to split incoming ByteString chunk directly without 
reallocation?

Thanks

On Wednesday, October 19, 2016 at 4:32:09 PM UTC+2, Konrad Malawski wrote:
>
> Hi Vladyslav,
> firstly, no reason to get desperate :-)
> Glad you did some profiling, it would be super helpful next time around if 
> you share more details about both your code an profile you're observing.
> It's hard to give advice what can be improved when given a blank canvas 
> (no idea what your code looks like).
>
> Of course things can be improved, and in fact just a few days ago we 
> merged a significant performance improvement that affects the Framing 
> stages:
> https://github.com/akka/akka/pull/21539
>
> Keep an eye out for 2.4.12.
>
> -- 
> Konrad `ktoso` Malawski
> Akka  @ Lightbend 
>
> On 19 October 2016 at 16:11:32, vladysla...@rtsmunity.com  (
> vladysla...@rtsmunity.com ) wrote:
>
>
> After some playing with JMC and async boundaries it turned out the hottest 
> methods are related to ByteString splitting and delimiting (for example, 
> scala.collection.IndexedSeqOptimized$class.sameElements(IndexedSeqOptimized, 
> GenIterable) takes 73-75% of processor time). The input file is 45M lines, 
> 2-3-characters each. Is it at all possible to make this work?
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: 
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups 
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to akka-user+...@googlegroups.com .
> To post to this group, send email to akka...@googlegroups.com 
> .
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Fetching remote file via akka-http singleRequest results in extremely high allocation rate

2016-10-19 Thread johannes . rudolph
Using mapConcat instead may even be faster ;)

On Wednesday, October 19, 2016 at 5:05:22 PM UTC+2, 
vladysla...@rtsmunity.com wrote:
>
> Okay, so the issue was really in Framing performance. Changing Framing 
> stage for
>
> flatMapConcat(chunk -> Source.from(Arrays.asList(chunk.split("\n".
>
> actually made it reasonably fast.
>
>
> On Wednesday, October 19, 2016 at 3:25:50 PM UTC+2, 
> vladysla...@rtsmunity.com wrote:
>>
>> Hello,
>>
>> I have the following problem. I'm trying to fetch a file via HTTP in 
>> streaming fashion, processing it line by line (code: 
>> https://gist.github.com/anonymous/c737fa388b55181dcdab9fa6cb8cb2bc), but 
>> it is extremely slow (about a minute to fetch a 150MB file to t2.micro 
>> instance from S3). I guess, something is reallocated for each input line 
>> (input files have very short lines), resulting in chainsaw-like heap size 
>> graph and extremely high allocation rate (JMC screenshot: 
>> http://prnt.sc/cw8si6). What am I doing wrong?
>>
>> Thanks
>>
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Fetching remote file via akka-http singleRequest results in extremely high allocation rate

2016-10-19 Thread vladyslav . chaban
Okay, so the issue was really in Framing performance. Changing Framing 
stage for

flatMapConcat(chunk -> Source.from(Arrays.asList(chunk.split("\n".

actually made it reasonably fast.


On Wednesday, October 19, 2016 at 3:25:50 PM UTC+2, 
vladysla...@rtsmunity.com wrote:
>
> Hello,
>
> I have the following problem. I'm trying to fetch a file via HTTP in 
> streaming fashion, processing it line by line (code: 
> https://gist.github.com/anonymous/c737fa388b55181dcdab9fa6cb8cb2bc), but 
> it is extremely slow (about a minute to fetch a 150MB file to t2.micro 
> instance from S3). I guess, something is reallocated for each input line 
> (input files have very short lines), resulting in chainsaw-like heap size 
> graph and extremely high allocation rate (JMC screenshot: 
> http://prnt.sc/cw8si6). What am I doing wrong?
>
> Thanks
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Fetching remote file via akka-http singleRequest results in extremely high allocation rate

2016-10-19 Thread johannes . rudolph
Hi Vladyslav,

this sounds like a worst-case scenario for the Framing stages: 45M lines 
and each line 2-3 characters long will put a lot of pressure on the streams 
infrastructure and the framing stage. It might still make sense to 
benchmark and optimize the Framing stage. One optimization could be to 
collect a few elements in one go (the current implementation only looks for 
one match at a time).

Johannes


On Wednesday, October 19, 2016 at 4:11:29 PM UTC+2, 
vladysla...@rtsmunity.com wrote:
>
>
> After some playing with JMC and async boundaries it turned out the hottest 
> methods are related to ByteString splitting and delimiting (for example, 
> scala.collection.IndexedSeqOptimized$class.sameElements(IndexedSeqOptimized, 
> GenIterable) takes 73-75% of processor time). The input file is 45M lines, 
> 2-3-characters each. Is it at all possible to make this work?
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: Fetching remote file via akka-http singleRequest results in extremely high allocation rate

2016-10-19 Thread Konrad Malawski
Hi Vladyslav,
firstly, no reason to get desperate :-)
Glad you did some profiling, it would be super helpful next time around if
you share more details about both your code an profile you're observing.
It's hard to give advice what can be improved when given a blank canvas (no
idea what your code looks like).

Of course things can be improved, and in fact just a few days ago we merged
a significant performance improvement that affects the Framing stages:
https://github.com/akka/akka/pull/21539

Keep an eye out for 2.4.12.

-- 
Konrad `ktoso` Malawski
Akka  @ Lightbend 

On 19 October 2016 at 16:11:32, vladyslav.cha...@rtsmunity.com (
vladyslav.cha...@rtsmunity.com) wrote:


After some playing with JMC and async boundaries it turned out the hottest
methods are related to ByteString splitting and delimiting (for example,
scala.collection.IndexedSeqOptimized$class.sameElements(IndexedSeqOptimized,
GenIterable) takes 73-75% of processor time). The input file is 45M lines,
2-3-characters each. Is it at all possible to make this work?
--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups
"Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Fetching remote file via akka-http singleRequest results in extremely high allocation rate

2016-10-19 Thread vladyslav . chaban

After some playing with JMC and async boundaries it turned out the hottest 
methods are related to ByteString splitting and delimiting (for example, 
scala.collection.IndexedSeqOptimized$class.sameElements(IndexedSeqOptimized, 
GenIterable) takes 73-75% of processor time). The input file is 45M lines, 
2-3-characters each. Is it at all possible to make this work?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: Fetching remote file via akka-http singleRequest results in extremely high allocation rate

2016-10-19 Thread vladyslav . chaban
Akka and Akka-HTTP version is 2.4.11.
On Wednesday, October 19, 2016 at 3:25:50 PM UTC+2, 
vladysla...@rtsmunity.com wrote:
>
> Hello,
>
> I have the following problem. I'm trying to fetch a file via HTTP in 
> streaming fashion, processing it line by line (code: 
> https://gist.github.com/anonymous/c737fa388b55181dcdab9fa6cb8cb2bc), but 
> it is extremely slow (about a minute to fetch a 150MB file to t2.micro 
> instance from S3). I guess, something is reallocated for each input line 
> (input files have very short lines), resulting in chainsaw-like heap size 
> graph and extremely high allocation rate (JMC screenshot: 
> http://prnt.sc/cw8si6). What am I doing wrong?
>
> Thanks
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Fetching remote file via akka-http singleRequest results in extremely high allocation rate

2016-10-19 Thread vladyslav . chaban
Hello,

I have the following problem. I'm trying to fetch a file via HTTP in 
streaming fashion, processing it line by line (code: 
https://gist.github.com/anonymous/c737fa388b55181dcdab9fa6cb8cb2bc), but it 
is extremely slow (about a minute to fetch a 150MB file to t2.micro 
instance from S3). I guess, something is reallocated for each input line 
(input files have very short lines), resulting in chainsaw-like heap size 
graph and extremely high allocation rate (JMC screenshot: 
http://prnt.sc/cw8si6). What am I doing wrong?

Thanks

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Is contents of GraphDSL.create() {..} thread-safe for BidiFlow?

2016-10-19 Thread Viktor Klang
On Wed, Oct 19, 2016 at 2:43 PM, Alexey Shuksto  wrote:

> 1. Flow itself is a bidi-codec from ByteString to our own Request/Response
> entities. Each Request has Promise[Response] attribute. Shared state is
> more like Map[Request.Id, Promise[Response]] -- because order of Responses
> are not guarantied.
>

Ok, so a sort of "correlator"-stage?


>
> 2. It should have state shared only "inside" this one materialized flow.
>

Yes, so the BidiFlow you create has shared state tied to the intance, not
the mateiralization.
I think you'll need to create a custom GraphStage with a BidiShape.


>
> среда, 19 октября 2016 г., 15:23:04 UTC+3 пользователь √ написал:
>
>>
>>
>> On Wed, Oct 19, 2016 at 2:18 PM, Alexey Shuksto  wrote:
>>
>>> 2 Konrad: Yep, in original question I meant not 'DSL construction time'
>>> but 'execution time' thread-safety. Thanks for clarification.
>>>
>>> 2 Victor: Use case is simple: outgoing flow need to store `Promise` of
>>> future remote response in some shared state which then would be completed
>>> by incoming flow.
>>>
>>
>> So it's a bidirectional buffer of Promises and Futures?
>>
>>
>>
>>
>>> There could be as many promises as there were outgoing messages, but the
>>> order of responses are not guaranteed and there could be additional
>>> messages in incoming flow.
>>>
>>> What do you meant by 'actively prevents multiple materializations'?
>>>
>>
>> What happens when you materialize that bidiflow N times?
>>
>>
>>>
>>> среда, 19 октября 2016 г., 14:29:14 UTC+3 пользователь √ написал:

 Hi Alexey,

 Not only is it not thread-safe, but it also actively prevents multiple
 materializations.

 Perhaps if you state your use-case we can suggest an alternative?

 On Wed, Oct 19, 2016 at 1:24 PM, Alexey Shuksto 
 wrote:

> Hello hAkkers,
>
> Simple example:
> val zipper = BidiFlow.fromGraph(GraphDSL.create() { b =>
>   var counter = 0
>
>   val outbound = b.add(Flow[String].map { str =>
> counter += 1
> str -> counter
>   })
>   val inbound = b.add(Flow[(String, Int)].map { pair =>
> counter -= 1
> pair._1
>   })
>
>   BidiShape.fromFlows(outbound, inbound)
> })
>
> Can I presume that contents of 'build block' is thread-safe or I need
> to guard `counter` somehow (use `AtomicInt` and such)?
>
> Also, do BidiFlow support 'duplex' mode or they process
> incoming/outgoing messages one at time?
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/c
> urrent/additional/faq.html
> >> Search the archives: https://groups.google.com/grou
> p/akka-user
> ---
> You received this message because you are subscribed to the Google
> Groups "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to akka-user+...@googlegroups.com.
> To post to this group, send email to akka...@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



 --
 Cheers,
 √

>>> --
>>> >> Read the docs: http://akka.io/docs/
>>> >> Check the FAQ: http://doc.akka.io/docs/akka/c
>>> urrent/additional/faq.html
>>> >> Search the archives: https://groups.google.com/grou
>>> p/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka-user+...@googlegroups.com.
>>> To post to this group, send email to akka...@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>
>>
>> --
>> Cheers,
>> √
>>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
√

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google 

Re: [akka-user] Is contents of GraphDSL.create() {..} thread-safe for BidiFlow?

2016-10-19 Thread Alexey Shuksto
1. Flow itself is a bidi-codec from ByteString to our own Request/Response 
entities. Each Request has Promise[Response] attribute. Shared state is 
more like Map[Request.Id, Promise[Response]] -- because order of Responses 
are not guarantied.

2. It should have state shared only "inside" this one materialized flow.

среда, 19 октября 2016 г., 15:23:04 UTC+3 пользователь √ написал:
>
>
>
> On Wed, Oct 19, 2016 at 2:18 PM, Alexey Shuksto  > wrote:
>
>> 2 Konrad: Yep, in original question I meant not 'DSL construction time' 
>> but 'execution time' thread-safety. Thanks for clarification.
>>
>> 2 Victor: Use case is simple: outgoing flow need to store `Promise` of 
>> future remote response in some shared state which then would be completed 
>> by incoming flow. 
>>
>
> So it's a bidirectional buffer of Promises and Futures?
>
>
>  
>
>> There could be as many promises as there were outgoing messages, but the 
>> order of responses are not guaranteed and there could be additional 
>> messages in incoming flow.
>>
>> What do you meant by 'actively prevents multiple materializations'?
>>
>
> What happens when you materialize that bidiflow N times?
>  
>
>>
>> среда, 19 октября 2016 г., 14:29:14 UTC+3 пользователь √ написал:
>>>
>>> Hi Alexey,
>>>
>>> Not only is it not thread-safe, but it also actively prevents multiple 
>>> materializations.
>>>
>>> Perhaps if you state your use-case we can suggest an alternative?
>>>
>>> On Wed, Oct 19, 2016 at 1:24 PM, Alexey Shuksto  
>>> wrote:
>>>
 Hello hAkkers,

 Simple example:
 val zipper = BidiFlow.fromGraph(GraphDSL.create() { b =>
   var counter = 0
   
   val outbound = b.add(Flow[String].map { str =>
 counter += 1
 str -> counter
   })
   val inbound = b.add(Flow[(String, Int)].map { pair =>
 counter -= 1
 pair._1
   })
   
   BidiShape.fromFlows(outbound, inbound)
 })

 Can I presume that contents of 'build block' is thread-safe or I need 
 to guard `counter` somehow (use `AtomicInt` and such)?

 Also, do BidiFlow support 'duplex' mode or they process 
 incoming/outgoing messages one at time?

 -- 
 >> Read the docs: http://akka.io/docs/
 >> Check the FAQ: 
 http://doc.akka.io/docs/akka/current/additional/faq.html
 >> Search the archives: 
 https://groups.google.com/group/akka-user
 --- 
 You received this message because you are subscribed to the Google 
 Groups "Akka User List" group.
 To unsubscribe from this group and stop receiving emails from it, send 
 an email to akka-user+...@googlegroups.com.
 To post to this group, send email to akka...@googlegroups.com.
 Visit this group at https://groups.google.com/group/akka-user.
 For more options, visit https://groups.google.com/d/optout.

>>>
>>>
>>>
>>> -- 
>>> Cheers,
>>> √
>>>
>> -- 
>> >> Read the docs: http://akka.io/docs/
>> >> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com .
>> To post to this group, send email to akka...@googlegroups.com 
>> .
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
> Cheers,
> √
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Is contents of GraphDSL.create() {..} thread-safe for BidiFlow?

2016-10-19 Thread Viktor Klang
On Wed, Oct 19, 2016 at 2:18 PM, Alexey Shuksto  wrote:

> 2 Konrad: Yep, in original question I meant not 'DSL construction time'
> but 'execution time' thread-safety. Thanks for clarification.
>
> 2 Victor: Use case is simple: outgoing flow need to store `Promise` of
> future remote response in some shared state which then would be completed
> by incoming flow.
>

So it's a bidirectional buffer of Promises and Futures?




> There could be as many promises as there were outgoing messages, but the
> order of responses are not guaranteed and there could be additional
> messages in incoming flow.
>
> What do you meant by 'actively prevents multiple materializations'?
>

What happens when you materialize that bidiflow N times?


>
> среда, 19 октября 2016 г., 14:29:14 UTC+3 пользователь √ написал:
>>
>> Hi Alexey,
>>
>> Not only is it not thread-safe, but it also actively prevents multiple
>> materializations.
>>
>> Perhaps if you state your use-case we can suggest an alternative?
>>
>> On Wed, Oct 19, 2016 at 1:24 PM, Alexey Shuksto  wrote:
>>
>>> Hello hAkkers,
>>>
>>> Simple example:
>>> val zipper = BidiFlow.fromGraph(GraphDSL.create() { b =>
>>>   var counter = 0
>>>
>>>   val outbound = b.add(Flow[String].map { str =>
>>> counter += 1
>>> str -> counter
>>>   })
>>>   val inbound = b.add(Flow[(String, Int)].map { pair =>
>>> counter -= 1
>>> pair._1
>>>   })
>>>
>>>   BidiShape.fromFlows(outbound, inbound)
>>> })
>>>
>>> Can I presume that contents of 'build block' is thread-safe or I need to
>>> guard `counter` somehow (use `AtomicInt` and such)?
>>>
>>> Also, do BidiFlow support 'duplex' mode or they process
>>> incoming/outgoing messages one at time?
>>>
>>> --
>>> >> Read the docs: http://akka.io/docs/
>>> >> Check the FAQ: http://doc.akka.io/docs/akka/c
>>> urrent/additional/faq.html
>>> >> Search the archives: https://groups.google.com/grou
>>> p/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to akka-user+...@googlegroups.com.
>>> To post to this group, send email to akka...@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>
>>
>> --
>> Cheers,
>> √
>>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
√

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Is contents of GraphDSL.create() {..} thread-safe for BidiFlow?

2016-10-19 Thread Alexey Shuksto
2 Konrad: Yep, in original question I meant not 'DSL construction time' but 
'execution time' thread-safety. Thanks for clarification.

2 Victor: Use case is simple: outgoing flow need to store `Promise` of 
future remote response in some shared state which then would be completed 
by incoming flow. There could be as many promises as there were outgoing 
messages, but the order of responses are not guaranteed and there could be 
additional messages in incoming flow.

What do you meant by 'actively prevents multiple materializations'?

среда, 19 октября 2016 г., 14:29:14 UTC+3 пользователь √ написал:
>
> Hi Alexey,
>
> Not only is it not thread-safe, but it also actively prevents multiple 
> materializations.
>
> Perhaps if you state your use-case we can suggest an alternative?
>
> On Wed, Oct 19, 2016 at 1:24 PM, Alexey Shuksto  > wrote:
>
>> Hello hAkkers,
>>
>> Simple example:
>> val zipper = BidiFlow.fromGraph(GraphDSL.create() { b =>
>>   var counter = 0
>>   
>>   val outbound = b.add(Flow[String].map { str =>
>> counter += 1
>> str -> counter
>>   })
>>   val inbound = b.add(Flow[(String, Int)].map { pair =>
>> counter -= 1
>> pair._1
>>   })
>>   
>>   BidiShape.fromFlows(outbound, inbound)
>> })
>>
>> Can I presume that contents of 'build block' is thread-safe or I need to 
>> guard `counter` somehow (use `AtomicInt` and such)?
>>
>> Also, do BidiFlow support 'duplex' mode or they process incoming/outgoing 
>> messages one at time?
>>
>> -- 
>> >> Read the docs: http://akka.io/docs/
>> >> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com .
>> To post to this group, send email to akka...@googlegroups.com 
>> .
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
> Cheers,
> √
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-19 Thread Rafał Krzewski
Sergey, you haven't shown any code related to passing messages yet, so it's 
impossible to guess what's happening at this point.
As I said in previous message, the critical things are the actual 
components you put *inside* of your graph. The shapes just determine 
external connectors.

Cheers,
Rafał

W dniu środa, 19 października 2016 12:11:20 UTC+2 użytkownik Sergey Sopin 
napisał:
>
> Yep, I was trying to make something like that, but my code doesn't work. 
> Messages stuck somewhere and I don't know how to fix it due to lack of 
> Scala knowledge. You can find the code in my initial message here. 
> I took UniformFanOut as example and unsuccessfully tried to replace single 
> Inlet with the set of them.
> If you could help me to fix it I would be more than happy :)
> Thanks!
>
> Regards,
> Sergey
>
> среда, 19 октября 2016 г., 11:32:05 UTC+3 пользователь Konrad Malawski 
> написал:
>>
>> Shapes don't need separate java or scala api, it's shared.
>>
>> You can just subclass a shape and make a class that directly represents 
>> your shape.
>> If you want AmorphousShape then sure, but please note that it's purpose 
>> is to "forget about the types of those".
>>
>> If you want a well typed one simply extend Shape and fill in the abstract 
>> methods - see FlowShape etc for examples how to do this.
>>
>> -- 
>> Konrad `ktoso` Malawski
>> Akka  @ Lightbend 
>>
>> On 19 October 2016 at 08:03:16, Sergey Sopin (sopi...@gmail.com) wrote:
>>
>> Hi, 
>>
>> Yes, but it seems that I need to create Java API for it, because my app 
>> is in Java. 
>> I used Inkscape app. to draw the diagram.
>>
>> Cheers,
>> Sergey
>>
>> среда, 19 октября 2016 г., 0:46:00 UTC+3 пользователь Rafał Krzewski 
>> написал: 
>>>
>>> A custom GraphStage [1] using AmorphousShape is probably the way to go 
>>> in this case. 
>>>
>>> That's a really neat diagram, BTW! What software did you us to create it?
>>>
>>> Cheers,
>>> Rafał
>>>
>>> [1] 
>>> http://doc.akka.io/docs/akka/2.4/scala/stream/stream-customize.html#Custom_processing_with_GraphStage
>>>
>>> W dniu wtorek, 18 października 2016 22:12:07 UTC+2 użytkownik Sergey 
>>> Sopin napisał: 

 Hi again,

 I have a very specific case. My flow looks like this one: 


 

 The idea of multi input/output shape was to redirect messages to a 
 right output based on the message data.

 I just learn streams, so maybe you can suggest a better solution?

 Thanks!


 Cheers, 

 Sergey



 вторник, 18 октября 2016 г., 18:34:22 UTC+3 пользователь Rafał Krzewski 
 написал: 
>
> It's not clear to me, what are you trying to accomplish. It looks like 
> you are trying to implement AmorphousShape (ie. arbitrary number of open 
> inlets and outlets) on your own, and then a specific variant of it, that 
> has all inlets sharing the same type, and all outlets sharing another 
> type. 
> The "Fan" fragment in the names you used is a bit misleading, since in 
> Akka 
> Stream's own usage of it names like FanIn / FanOut shape mean that such 
> grap has many inlets and single outlet / single inlet many outlets. The 
> analogy is to a Chinese-style hand held fan, rather than ceiling fan with 
> many blades :) I am wondering what use case you have in mind for your 
> AmorphousShape because the graphs that can be materialized and executed 
> must ultimately have a ClosedShape. You could use such multi-outlet 
> graphs 
> for reusing pieces of functionality, but anything more complex than a 
> BidiShape  seems  rather unwieldy to me. 
>
> My understanding is that Graph's shape should not interfere with 
> message flow, because it's just a canvas with contact points on the 
> perimeter. What matters are the components that you plug into it. Akka 
> just 
> makes sure that you don't leave any of the contact points dangling. This 
> makes me think that the problems with messages getting "stuck" was caused 
> somewhere other than graph shape construction site.
>
> Have you tried inserting probes alon the lines of 
> Flow.alsoTo(Sink.foreach(_ 
> => println("beep!"))) (shooting from the hip here, apologies if it 
> does not compile straight away) into your graph? That could help you 
> locate 
> where the messages are stuck / discarded.
>
> Cheers,
> Rafał
>
> W dniu poniedziałek, 17 października 2016 20:22:43 UTC+2 użytkownik 
> Sergey Sopin napisał: 
>>
>> Hi, 
>>
>> I am trying to create my own akka streams shape with several Inlets 
>> and Outlets. I have written following code: 
>>
>> package kernel.modeller.workers.streamFinder.generic
>>
>> import akka.stream.{Shape, Outlet, 

Re: [akka-user] Is contents of GraphDSL.create() {..} thread-safe for BidiFlow?

2016-10-19 Thread Viktor Klang
Hi Alexey,

Not only is it not thread-safe, but it also actively prevents multiple
materializations.

Perhaps if you state your use-case we can suggest an alternative?

On Wed, Oct 19, 2016 at 1:24 PM, Alexey Shuksto  wrote:

> Hello hAkkers,
>
> Simple example:
> val zipper = BidiFlow.fromGraph(GraphDSL.create() { b =>
>   var counter = 0
>
>   val outbound = b.add(Flow[String].map { str =>
> counter += 1
> str -> counter
>   })
>   val inbound = b.add(Flow[(String, Int)].map { pair =>
> counter -= 1
> pair._1
>   })
>
>   BidiShape.fromFlows(outbound, inbound)
> })
>
> Can I presume that contents of 'build block' is thread-safe or I need to
> guard `counter` somehow (use `AtomicInt` and such)?
>
> Also, do BidiFlow support 'duplex' mode or they process incoming/outgoing
> messages one at time?
>
> --
> >> Read the docs: http://akka.io/docs/
> >> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
√

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Is contents of GraphDSL.create() {..} thread-safe for BidiFlow?

2016-10-19 Thread Konrad Malawski
This is not safe, outbound and inbound flows could be executing on
different threads.
It's not a question about the the DSL being safe - that's fine as it's only
constructing stuff,
but the graph you constructed is accessing shared state from (potentially)
different threads - thus it is not safe.

-- 
Konrad `ktoso` Malawski
Akka  @ Lightbend 

On 19 October 2016 at 13:24:50, Alexey Shuksto (seig...@gmail.com) wrote:

Hello hAkkers,

Simple example:
val zipper = BidiFlow.fromGraph(GraphDSL.create() { b =>
  var counter = 0

  val outbound = b.add(Flow[String].map { str =>
counter += 1
str -> counter
  })
  val inbound = b.add(Flow[(String, Int)].map { pair =>
counter -= 1
pair._1
  })

  BidiShape.fromFlows(outbound, inbound)
})

Can I presume that contents of 'build block' is thread-safe or I need to
guard `counter` somehow (use `AtomicInt` and such)?

Also, do BidiFlow support 'duplex' mode or they process incoming/outgoing
messages one at time?
--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups
"Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Is contents of GraphDSL.create() {..} thread-safe for BidiFlow?

2016-10-19 Thread Alexey Shuksto
Hello hAkkers,

Simple example:
val zipper = BidiFlow.fromGraph(GraphDSL.create() { b =>
  var counter = 0
  
  val outbound = b.add(Flow[String].map { str =>
counter += 1
str -> counter
  })
  val inbound = b.add(Flow[(String, Int)].map { pair =>
counter -= 1
pair._1
  })
  
  BidiShape.fromFlows(outbound, inbound)
})

Can I presume that contents of 'build block' is thread-safe or I need to 
guard `counter` somehow (use `AtomicInt` and such)?

Also, do BidiFlow support 'duplex' mode or they process incoming/outgoing 
messages one at time?

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-19 Thread Sergey Sopin
Yep, I was trying to make something like that, but my code doesn't work. 
Messages stuck somewhere and I don't know how to fix it due to lack of 
Scala knowledge. You can find the code in my initial message here. 
I took UniformFanOut as example and unsuccessfully tried to replace single 
Inlet with the set of them.
If you could help me to fix it I would be more than happy :)
Thanks!

Regards,
Sergey

среда, 19 октября 2016 г., 11:32:05 UTC+3 пользователь Konrad Malawski 
написал:
>
> Shapes don't need separate java or scala api, it's shared.
>
> You can just subclass a shape and make a class that directly represents 
> your shape.
> If you want AmorphousShape then sure, but please note that it's purpose is 
> to "forget about the types of those".
>
> If you want a well typed one simply extend Shape and fill in the abstract 
> methods - see FlowShape etc for examples how to do this.
>
> -- 
> Konrad `ktoso` Malawski
> Akka  @ Lightbend 
>
> On 19 October 2016 at 08:03:16, Sergey Sopin (sopi...@gmail.com 
> ) wrote:
>
> Hi, 
>
> Yes, but it seems that I need to create Java API for it, because my app is 
> in Java. 
> I used Inkscape app. to draw the diagram.
>
> Cheers,
> Sergey
>
> среда, 19 октября 2016 г., 0:46:00 UTC+3 пользователь Rafał Krzewski 
> написал: 
>>
>> A custom GraphStage [1] using AmorphousShape is probably the way to go in 
>> this case. 
>>
>> That's a really neat diagram, BTW! What software did you us to create it?
>>
>> Cheers,
>> Rafał
>>
>> [1] 
>> http://doc.akka.io/docs/akka/2.4/scala/stream/stream-customize.html#Custom_processing_with_GraphStage
>>
>> W dniu wtorek, 18 października 2016 22:12:07 UTC+2 użytkownik Sergey 
>> Sopin napisał: 
>>>
>>> Hi again,
>>>
>>> I have a very specific case. My flow looks like this one: 
>>>
>>>
>>> 
>>>
>>> The idea of multi input/output shape was to redirect messages to a right 
>>> output based on the message data.
>>>
>>> I just learn streams, so maybe you can suggest a better solution?
>>>
>>> Thanks!
>>>
>>>
>>> Cheers, 
>>>
>>> Sergey
>>>
>>>
>>>
>>> вторник, 18 октября 2016 г., 18:34:22 UTC+3 пользователь Rafał Krzewski 
>>> написал: 

 It's not clear to me, what are you trying to accomplish. It looks like 
 you are trying to implement AmorphousShape (ie. arbitrary number of open 
 inlets and outlets) on your own, and then a specific variant of it, that 
 has all inlets sharing the same type, and all outlets sharing another 
 type. 
 The "Fan" fragment in the names you used is a bit misleading, since in 
 Akka 
 Stream's own usage of it names like FanIn / FanOut shape mean that such 
 grap has many inlets and single outlet / single inlet many outlets. The 
 analogy is to a Chinese-style hand held fan, rather than ceiling fan with 
 many blades :) I am wondering what use case you have in mind for your 
 AmorphousShape because the graphs that can be materialized and executed 
 must ultimately have a ClosedShape. You could use such multi-outlet graphs 
 for reusing pieces of functionality, but anything more complex than a 
 BidiShape  seems  rather unwieldy to me. 

 My understanding is that Graph's shape should not interfere with 
 message flow, because it's just a canvas with contact points on the 
 perimeter. What matters are the components that you plug into it. Akka 
 just 
 makes sure that you don't leave any of the contact points dangling. This 
 makes me think that the problems with messages getting "stuck" was caused 
 somewhere other than graph shape construction site.

 Have you tried inserting probes alon the lines of 
 Flow.alsoTo(Sink.foreach(_ 
 => println("beep!"))) (shooting from the hip here, apologies if it 
 does not compile straight away) into your graph? That could help you 
 locate 
 where the messages are stuck / discarded.

 Cheers,
 Rafał

 W dniu poniedziałek, 17 października 2016 20:22:43 UTC+2 użytkownik 
 Sergey Sopin napisał: 
>
> Hi, 
>
> I am trying to create my own akka streams shape with several Inlets 
> and Outlets. I have written following code: 
>
> package kernel.modeller.workers.streamFinder.generic
>
> import akka.stream.{Shape, Outlet, Inlet}
> import scala.annotation.unchecked.uncheckedVariance
> import scala.collection.immutable
>
> object FanShape {
>   sealed trait Init[_] {
> def inlets: immutable.Seq[Inlet[_]]
> def outlets: immutable.Seq[Outlet[_]]
> def name: String
>   }
>   final case class Name[_](override val name: String) extends Init[Any] {
> override def inlets: immutable.Seq[Inlet[_]] = Nil
> override def outlets: immutable.Seq[Outlet[_]] = Nil
>   }
>   final case 

Re: [akka-user] Re: ANNOUNCE: Akka HTTP 3.0.0-RC1

2016-10-19 Thread Konrad Malawski
Thanks for reporting.
We spotted this and have fixed the issue already:
https://github.com/akka/akka-http/pull/409
The fixed artifacts are right now on their way to maven-central.

Happy hakking.

-- 
Konrad 'ktoso’ Malawski
Akka  @ Lightbend 
java.pl / geecon.org / krakowscala.pl / lambdakrk.pl /
sckrk.com

On 19 October 2016 at 11:04:21, Rafał Krzewski (rafal.krzew...@gmail.com)
wrote:

Hi,

I took a quick look at the new Akka HTTP artifacts published on Maven
Central, and I've noticed that artifact names don't have Scala version
suffix in them. Is this intentional?

Cheers,
Rafał

W dniu wtorek, 18 października 2016 00:22:17 UTC+2 użytkownik Konrad
'ktoso' Malawski napisał:
>
> Dear hakkers,
>
> We are proud to announce the first Release Candidate of the Akka HTTP's
> "fully stable" release–the only missing, bit was the Routing DSLs, which we
> now deem stable enough to support for an extended period of time.
>
>
> This release marks the first of the 3.0.0 series of this project and
> signifies a large step in terms of confidence in the library, as well as
> the move of Akka HTTP into its own repository. From now on Akka HTTP will
> be versioned separately from Akka “core”. This has been discussed at large
> with the community on akka-meta , and
> the akka-http  repositories on
> github. Thank you very much for your input!
>
> For more background why this move, please read “Akka HTTP - stable,
> growing and tons of opportunity
> ” on akka-meta. While
> preparing In the meantime we have delivered a Proof-of-Concept of HTTP/2
> for Akka HTTP and plan to continue this work later this year–community help
> is very much welcome on this front as well.
>
> The documentation from now on will be available here:
>
> Some noteworthy changes in the *3.0.0-RC1* (since it's move out from
> 2.4.11) release are:
>
>
>-
>
>New lightbend/paradox powered documentation
>-
>   -
>
>   This will allow us to aggregate it together with Akka and other
>   documentation, as well as link more easily to ScalaDoc pages
>   -
>
>   Akka HTTP documentation will from now on live here:
>   http://doc.akka.io/docs/akka-http/current/index.html
>   
>   -
>
>   We’ll work on a better theme for it very soon.
>   -
>
>Multipart is now correctly Binary MediaType (instead of
>WithOpenCharset) #398 
>-
>
>A new designated mailing-list and page for any critical security
>issues that might come up has been created:
>http://doc.akka.io/docs/akka-http/current/security.html
>
>-
>   -
>
>   Please follow the linked mailing list if you have production Akka
>   systems, so you’ll be the first to know in case a security issue is 
> found
>   and fixed in Akka.
>
>
> The plan regarding releasing a stable 3.0.0 is to wait a little bit for
> community feedback on the release candidates, and call a stable one no
> longer than a few weeks from now. We’re eagerly awaiting your feedback and
> can’t wait to ship the stable version of all of Akka HTTP’s modules!
>
> Credits
>
> A total 15 issues were closed since 2.4.11, most of the work was moving
> source code, documentation and issues to their new places.
>
> The complete list of closed issues can be found on the 3.0.0-RC1
>  milestone on
> github.
>
> For this release we had the help of 14 committers – thank you!
>
> A special thanks to Jonas Fonseca  who did a
> tremendously awesome job at migrating all the docs from sphinx
> (restructuredtext) to paradox (markdown), contributing features that the
> Akka docs needed to upstream Paradox–thanks a lot!
>
> Credits:
>
> commits added removed
>
>   10   22489   24696 Jonas Fonseca
>
>   101927 256 Johannes Rudolph
>
>   10 849 412 Konrad Malawski
>
>4 448 136 Robert Budźko
>
>2  37  37 Bernard Leach
>
>2 107   7 Richard Imaoka
>
>2  26  24 Jakub Kozłowski
>
>1 145 101 Jan @gosubpl
>
>1 108 114 Derek Wyatt
>
>1  45  33 Wojciech Langiewicz
>
>1  49   0 @2beaucoup
>
>1   6   6 Markus Hauck
>
>1   1   1 Ian Forsey
>
>1   1   1 Johan Andrén
>
>
> Happy hakking!
>
> – The Akka Team
>
--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the
Google Groups "Akka User List" group.
To unsubscribe from this 

[akka-user] Re: ANNOUNCE: Akka HTTP 3.0.0-RC1

2016-10-19 Thread Rafał Krzewski
Hi,

I took a quick look at the new Akka HTTP artifacts published on Maven 
Central, and I've noticed that artifact names don't have Scala version 
suffix in them. Is this intentional?

Cheers,
Rafał

W dniu wtorek, 18 października 2016 00:22:17 UTC+2 użytkownik Konrad 
'ktoso' Malawski napisał:
>
> Dear hakkers,
>
> We are proud to announce the first Release Candidate of the Akka HTTP's 
> "fully stable" release–the only missing, bit was the Routing DSLs, which we 
> now deem stable enough to support for an extended period of time.
>
>
> This release marks the first of the 3.0.0 series of this project and 
> signifies a large step in terms of confidence in the library, as well as 
> the move of Akka HTTP into its own repository. From now on Akka HTTP will 
> be versioned separately from Akka “core”. This has been discussed at large 
> with the community on akka-meta , and 
> the akka-http  repositories on 
> github. Thank you very much for your input!
>
> For more background why this move, please read “Akka HTTP - stable, 
> growing and tons of opportunity 
> ” on akka-meta. While 
> preparing In the meantime we have delivered a Proof-of-Concept of HTTP/2 
> for Akka HTTP and plan to continue this work later this year–community help 
> is very much welcome on this front as well.
>
> The documentation from now on will be available here: 
>
> Some noteworthy changes in the *3.0.0-RC1* (since it's move out from 
> 2.4.11) release are:
>
>
>- 
>
>New lightbend/paradox powered documentation
>- 
>   
>   This will allow us to aggregate it together with Akka and other 
>   documentation, as well as link more easily to ScalaDoc pages
>   - 
>   
>   Akka HTTP documentation will from now on live here: 
>   http://doc.akka.io/docs/akka-http/current/index.html
>   - 
>   
>   We’ll work on a better theme for it very soon.
>   - 
>
>Multipart is now correctly Binary MediaType (instead of 
>WithOpenCharset) #398 
>- 
>
>A new designated mailing-list and page for any critical security 
>issues that might come up has been created: 
>http://doc.akka.io/docs/akka-http/current/security.html 
>- 
>   
>   Please follow the linked mailing list if you have production Akka 
>   systems, so you’ll be the first to know in case a security issue is 
> found 
>   and fixed in Akka.
>   
>
> The plan regarding releasing a stable 3.0.0 is to wait a little bit for 
> community feedback on the release candidates, and call a stable one no 
> longer than a few weeks from now. We’re eagerly awaiting your feedback and 
> can’t wait to ship the stable version of all of Akka HTTP’s modules!
>
> Credits
>
> A total 15 issues were closed since 2.4.11, most of the work was moving 
> source code, documentation and issues to their new places.
>
> The complete list of closed issues can be found on the 3.0.0-RC1 
>  milestone on 
> github.
>
> For this release we had the help of 14 committers – thank you!
>
> A special thanks to Jonas Fonseca  who did a 
> tremendously awesome job at migrating all the docs from sphinx 
> (restructuredtext) to paradox (markdown), contributing features that the 
> Akka docs needed to upstream Paradox–thanks a lot!
>
> Credits:
>
> commits added removed
>
>   10   22489   24696 Jonas Fonseca
>
>   101927 256 Johannes Rudolph
>
>   10 849 412 Konrad Malawski
>
>4 448 136 Robert Budźko
>
>2  37  37 Bernard Leach
>
>2 107   7 Richard Imaoka
>
>2  26  24 Jakub Kozłowski
>
>1 145 101 Jan @gosubpl
>
>1 108 114 Derek Wyatt
>
>1  45  33 Wojciech Langiewicz
>
>1  49   0 @2beaucoup
>
>1   6   6 Markus Hauck
>
>1   1   1 Ian Forsey
>
>1   1   1 Johan Andrén
>
>
> Happy hakking!
>
> – The Akka Team
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


Re: [akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-19 Thread Konrad Malawski
Shapes don't need separate java or scala api, it's shared.

You can just subclass a shape and make a class that directly represents
your shape.
If you want AmorphousShape then sure, but please note that it's purpose is
to "forget about the types of those".

If you want a well typed one simply extend Shape and fill in the abstract
methods - see FlowShape etc for examples how to do this.

-- 
Konrad `ktoso` Malawski
Akka  @ Lightbend 

On 19 October 2016 at 08:03:16, Sergey Sopin (sopin1...@gmail.com) wrote:

Hi,

Yes, but it seems that I need to create Java API for it, because my app is
in Java.
I used Inkscape app. to draw the diagram.

Cheers,
Sergey

среда, 19 октября 2016 г., 0:46:00 UTC+3 пользователь Rafał Krzewski
написал:
>
> A custom GraphStage [1] using AmorphousShape is probably the way to go in
> this case.
>
> That's a really neat diagram, BTW! What software did you us to create it?
>
> Cheers,
> Rafał
>
> [1] http://doc.akka.io/docs/akka/2.4/scala/stream/stream-
> customize.html#Custom_processing_with_GraphStage
>
> W dniu wtorek, 18 października 2016 22:12:07 UTC+2 użytkownik Sergey Sopin
> napisał:
>>
>> Hi again,
>>
>> I have a very specific case. My flow looks like this one:
>>
>>
>> 
>>
>> The idea of multi input/output shape was to redirect messages to a right
>> output based on the message data.
>>
>> I just learn streams, so maybe you can suggest a better solution?
>>
>> Thanks!
>>
>>
>> Cheers,
>>
>> Sergey
>>
>>
>>
>> вторник, 18 октября 2016 г., 18:34:22 UTC+3 пользователь Rafał Krzewski
>> написал:
>>>
>>> It's not clear to me, what are you trying to accomplish. It looks like
>>> you are trying to implement AmorphousShape (ie. arbitrary number of open
>>> inlets and outlets) on your own, and then a specific variant of it, that
>>> has all inlets sharing the same type, and all outlets sharing another type.
>>> The "Fan" fragment in the names you used is a bit misleading, since in Akka
>>> Stream's own usage of it names like FanIn / FanOut shape mean that such
>>> grap has many inlets and single outlet / single inlet many outlets. The
>>> analogy is to a Chinese-style hand held fan, rather than ceiling fan with
>>> many blades :) I am wondering what use case you have in mind for your
>>> AmorphousShape because the graphs that can be materialized and executed
>>> must ultimately have a ClosedShape. You could use such multi-outlet graphs
>>> for reusing pieces of functionality, but anything more complex than a
>>> BidiShape  seems  rather unwieldy to me.
>>>
>>> My understanding is that Graph's shape should not interfere with message
>>> flow, because it's just a canvas with contact points on the perimeter. What
>>> matters are the components that you plug into it. Akka just makes sure that
>>> you don't leave any of the contact points dangling. This makes me think
>>> that the problems with messages getting "stuck" was caused somewhere other
>>> than graph shape construction site.
>>>
>>> Have you tried inserting probes alon the lines of Flow.alsoTo(Sink.foreach(_
>>> => println("beep!"))) (shooting from the hip here, apologies if it does
>>> not compile straight away) into your graph? That could help you locate
>>> where the messages are stuck / discarded.
>>>
>>> Cheers,
>>> Rafał
>>>
>>> W dniu poniedziałek, 17 października 2016 20:22:43 UTC+2 użytkownik
>>> Sergey Sopin napisał:

 Hi,

 I am trying to create my own akka streams shape with several Inlets and
 Outlets. I have written following code:

 package kernel.modeller.workers.streamFinder.generic

 import akka.stream.{Shape, Outlet, Inlet}
 import scala.annotation.unchecked.uncheckedVariance
 import scala.collection.immutable

 object FanShape {
   sealed trait Init[_] {
 def inlets: immutable.Seq[Inlet[_]]
 def outlets: immutable.Seq[Outlet[_]]
 def name: String
   }
   final case class Name[_](override val name: String) extends Init[Any] {
 override def inlets: immutable.Seq[Inlet[_]] = Nil
 override def outlets: immutable.Seq[Outlet[_]] = Nil
   }
   final case class Ports[_](override val inlets: immutable.Seq[Inlet[_]], 
 override val outlets: immutable.Seq[Outlet[_]]) extends Init[Any] {
 override def name: String = "FanShape"
   }
 }

 abstract class FanShape[_] private (_in: Iterator[Inlet[_]], _out: 
 Iterator[Outlet[_]], _name: String) extends Shape {

   import FanShape._

   def this(init: FanShape.Init[_]) = this(init.inlets.iterator, 
 init.outlets.iterator, init.name)

   final override def outlets: immutable.Seq[Outlet[_]] = _outlets
   final override def inlets: immutable.Seq[Inlet[_]] = _inlets

   private var _outlets: Vector[Outlet[_]] = Vector.empty
   private 

[akka-user] Re: [akka-stream] Problems with the shape creation

2016-10-19 Thread Sergey Sopin
Hi,

Yes, but it seems that I need to create Java API for it, because my app is 
in Java. 
I used Inkscape app. to draw the diagram.

Cheers,
Sergey

среда, 19 октября 2016 г., 0:46:00 UTC+3 пользователь Rafał Krzewski 
написал:
>
> A custom GraphStage [1] using AmorphousShape is probably the way to go in 
> this case.
>
> That's a really neat diagram, BTW! What software did you us to create it?
>
> Cheers,
> Rafał
>
> [1] 
> http://doc.akka.io/docs/akka/2.4/scala/stream/stream-customize.html#Custom_processing_with_GraphStage
>
> W dniu wtorek, 18 października 2016 22:12:07 UTC+2 użytkownik Sergey Sopin 
> napisał:
>>
>> Hi again,
>>
>> I have a very specific case. My flow looks like this one:
>>
>>
>> 
>>
>> The idea of multi input/output shape was to redirect messages to a right 
>> output based on the message data.
>>
>> I just learn streams, so maybe you can suggest a better solution?
>>
>> Thanks!
>>
>>
>> Cheers, 
>>
>> Sergey
>>
>>
>>
>> вторник, 18 октября 2016 г., 18:34:22 UTC+3 пользователь Rafał Krzewski 
>> написал:
>>>
>>> It's not clear to me, what are you trying to accomplish. It looks like 
>>> you are trying to implement AmorphousShape (ie. arbitrary number of open 
>>> inlets and outlets) on your own, and then a specific variant of it, that 
>>> has all inlets sharing the same type, and all outlets sharing another type. 
>>> The "Fan" fragment in the names you used is a bit misleading, since in Akka 
>>> Stream's own usage of it names like FanIn / FanOut shape mean that such 
>>> grap has many inlets and single outlet / single inlet many outlets. The 
>>> analogy is to a Chinese-style hand held fan, rather than ceiling fan with 
>>> many blades :) I am wondering what use case you have in mind for your 
>>> AmorphousShape because the graphs that can be materialized and executed 
>>> must ultimately have a ClosedShape. You could use such multi-outlet graphs 
>>> for reusing pieces of functionality, but anything more complex than a 
>>> BidiShape  seems  rather unwieldy to me.
>>>
>>> My understanding is that Graph's shape should not interfere with message 
>>> flow, because it's just a canvas with contact points on the perimeter. What 
>>> matters are the components that you plug into it. Akka just makes sure that 
>>> you don't leave any of the contact points dangling. This makes me think 
>>> that the problems with messages getting "stuck" was caused somewhere other 
>>> than graph shape construction site.
>>>
>>> Have you tried inserting probes alon the lines of 
>>> Flow.alsoTo(Sink.foreach(_ 
>>> => println("beep!"))) (shooting from the hip here, apologies if it does 
>>> not compile straight away) into your graph? That could help you locate 
>>> where the messages are stuck / discarded.
>>>
>>> Cheers,
>>> Rafał
>>>
>>> W dniu poniedziałek, 17 października 2016 20:22:43 UTC+2 użytkownik 
>>> Sergey Sopin napisał:

 Hi,

 I am trying to create my own akka streams shape with several Inlets and 
 Outlets. I have written following code: 

 package kernel.modeller.workers.streamFinder.generic

 import akka.stream.{Shape, Outlet, Inlet}
 import scala.annotation.unchecked.uncheckedVariance
 import scala.collection.immutable

 object FanShape {
   sealed trait Init[_] {
 def inlets: immutable.Seq[Inlet[_]]
 def outlets: immutable.Seq[Outlet[_]]
 def name: String
   }
   final case class Name[_](override val name: String) extends Init[Any] {
 override def inlets: immutable.Seq[Inlet[_]] = Nil
 override def outlets: immutable.Seq[Outlet[_]] = Nil
   }
   final case class Ports[_](override val inlets: immutable.Seq[Inlet[_]], 
 override val outlets: immutable.Seq[Outlet[_]]) extends Init[Any] {
 override def name: String = "FanShape"
   }
 }

 abstract class FanShape[_] private (_in: Iterator[Inlet[_]], _out: 
 Iterator[Outlet[_]], _name: String) extends Shape {
   
   import FanShape._

   def this(init: FanShape.Init[_]) = this(init.inlets.iterator, 
 init.outlets.iterator, init.name)

   final override def outlets: immutable.Seq[Outlet[_]] = _outlets
   final override def inlets: immutable.Seq[Inlet[_]] = _inlets

   private var _outlets: Vector[Outlet[_]] = Vector.empty
   private var _inlets: Vector[Inlet[_]] = Vector.empty

   protected def newOutlet[T](name: String): Outlet[T] = {
 val p = if (_out.hasNext) _out.next().asInstanceOf[Outlet[T]] else 
 Outlet[T](s"${_name}.$name")
 _outlets :+= p
 p
   }

   protected def newInlet[T](name: String): Inlet[T] = {
 val p = if (_in.hasNext) _in.next().asInstanceOf[Inlet[T]] else 
 Inlet[T](s"${_name}.$name")
 _inlets :+= p
 p
   }

   protected def