[akka-user] Re: streams: unroll -> sequence for flow -> reroll -- howto?

2017-05-18 Thread ben fleis
Posting my own discovery for future readers/searchers, especially since 
it's so simple :)

The short answer: splitWhen(ign => true)

The code is probably not idiomatic, this being my second short foray into 
Scala. It does compile and run on my setup. Any general thoughts/improvements 
are appreciated :)

package spss

import scala.concurrent._
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._

object BatchSplitWhen {
  val system = ActorSystem.create("batch2")
  implicit val materializer = ActorMaterializer.create(system)

  def main(args: Array[String]): Unit = {
    val batchesStr = "a word or two\nseparated by newlines\nmakes for a simple\nstreaming batch test."
    val batches = Source(batchesStr.split("\n").to[collection.immutable.Seq])
    // key step 1: start a new substream at every element (one per line)
    val substreams = batches.splitWhen(_ => true)
    val toWords = substreams.mapConcat(_.split(" ").to[collection.immutable.Seq])
    val toUpper = toWords.map(_.toUpperCase)
    val toLine = toUpper.reduce(_ + "-" + _)
    // key step 2: merge the per-line results back into a single stream
    val toLines = toLine.mergeSubstreams.runFold("")(_ + "\n" + _)

    val res = Await.result(toLines, 5.seconds)
    println(res)
    system.terminate()
  }
}

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


[akka-user] Re: streams: unroll -> sequence for flow -> reroll -- howto?

2017-05-11 Thread ben fleis
Thanks for the follow up -- the idea you posted is related, but not quite 
it.

I wrote a self-contained working example below. What I do in this example 
is manually materialize a graph, and wrap it as a future, making it usable 
by mapAsync.

*Ideal: to define an entire graph such that the materialized subgraphs in 
lineRewriter are instantiated and managed by akka itself.*

Can the code below be converted to do such a thing? (Scala responses are 
also great, but this happens to be Java.)

b

---


// materialize entire subgraph as an async func
public static CompletionStage<String> lineRewriter(Materializer mat, String line) {
    return Source
        .from(Arrays.asList(line.split(" ")))        // unroll
        .via(Flow.fromFunction(String::toUpperCase)) // per-item processing
        .runReduce((acc, w) -> acc + " " + w, mat);  // rollup
}

// `system` and `materializer` are not shown in this excerpt; they are
// assumed to be fields of the enclosing class.
public static void main(String[] args) {
    final String batchesStr = "a word or two\nseparated by newlines\nmakes for a simple\nstreaming batch test.";
    final Source<String, NotUsed> batches =
        Source.from(Arrays.asList(batchesStr.split("\n")));
    final Sink<String, CompletionStage<Done>> printResp =
        Sink.foreach(System.out::println);

    batches.mapAsync(1, line -> lineRewriter(materializer, line))
        .toMat(printResp, Keep.right())
        .run(materializer)
        .thenApply(d -> { system.terminate(); return d; })
        .exceptionally(t -> { system.terminate(); return Done.getInstance(); });
}



On Wednesday, May 10, 2017 at 1:53:43 PM UTC+2, Julian Howarth wrote:
>
> I may have misunderstood what you're trying to do but I think you can 
> probably use expand for this. In builder pseudocode, something like:
>
> Flow ~> Unzip ~>  ~> Zip
>   ~> Flow.expand(Iterator.continually(_))~> 
>




[akka-user] streams: unroll -> sequence for flow -> reroll -- howto?

2017-05-09 Thread ben fleis
Hi,

I am new to akka streams, and working on a project having both real-time 
and batch needs. In the batch cases, I need to take context (e.g., 
requestID), from the initial request, and apply it at the end of batch-item 
processing. The batch-item subflow is by itself rather simple: (parse, 
mapConcat, transform). The item transforms come from the real-time 
context.

It is the folding into an output format that is less obvious. That code 
needs the request ID and other context from the original request to 
generate an output byte stream.

What I want to do is treat the context and input bytes as a tuple, unzip 
them at the beginning, and zip them again at the fold stage. I need to 
understand how to map each input byte stream into a sequence of stream 
elements in a sub-flow, and fold each subsequence in its natural grouping.

mapConcat flattens, thus dropping the boundaries between adjacent batch 
requests.
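
To make the boundary problem concrete, here is a minimal plain-JDK sketch. It uses java.util.stream rather than Akka Streams, so it only models the shape of the data, not the streaming behavior. The class name BatchBoundaries, the helper req, and the "req-1"/"req-2" IDs are made up for illustration and stand in for the request context described above.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BatchBoundaries {
    // Hypothetical helper: pairs a request ID (the context) with its payload.
    static Map.Entry<String, String> req(String id, String payload) {
        return new SimpleImmutableEntry<>(id, payload);
    }

    // Flattening (the effect mapConcat has on a stream): all items end up in
    // one sequence, so nothing records which request an item came from.
    static List<String> flattened(List<Map.Entry<String, String>> requests) {
        return requests.stream()
                .flatMap(r -> Arrays.stream(r.getValue().split(" ")))
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    // Desired shape: unroll one request, process its items, fold them back
    // together, and only then attach that request's context to the result.
    static List<String> perRequest(List<Map.Entry<String, String>> requests) {
        return requests.stream()
                .map(r -> {
                    String folded = Arrays.stream(r.getValue().split(" "))
                            .map(String::toUpperCase)
                            .collect(Collectors.joining(" "));
                    return r.getKey() + ": " + folded;
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> requests = Arrays.asList(
                req("req-1", "a word or two"),
                req("req-2", "separated by newlines"));
        System.out.println(flattened(requests));  // one flat list of words
        System.out.println(perRequest(requests)); // one folded line per request
    }
}
```

The first method returns seven words with no trace of the two requests; the second returns one folded string per request, each still tagged with its ID. The second shape is what the stream version needs to reproduce.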

I could use map, and materialize a new stream within each batch handler for 
its items, and send the folded result as the output of the flow... but that 
seems strange, and such functionality is likely to be available already. 
(Especially since GroupBy and SubFlow seem closely related...)

Does this exist?

Thanks,

b
