>
> I use Either but I unmarshal all text to Left and JSON to Right
def mapPlain(entity: HttpEntity): Future[Left[String, Nothing]] = {
Unmarshal(entity).to[String].map(Left(_))
}
def mapChecking(entity: HttpEntity): Future[Right[String, AnyRef]] = {
) => {
val successCallback = getAsyncCallback{
(_: Unit) => {
if(mustFinish) completeStage() else pull(in)
waitForHandler = false
}
}
successCallback.invoke(rs)
}
Gary Struthers
Inspired by Jonas Bonér's *Reactive Microservices Architecture* but for
Akka Streams with Kafka, Avro, Cassandra, Akka HTTP, Algebird, & Actors
with Typesafe Config and Logging.
Custom stream stages post-process calls that return errors or throw
exceptions, some use Akka Streams Supervision.
Thanks, It fires now. This was a MockSource just for working out how to do
error handling. The problem was the tests completed before the timer fired.
Gary
On Saturday, August 27, 2016 at 12:23:59 AM UTC-7, drewhk wrote:
>
>
>
>>
> Is this a Source? You omitted the shape... Anyway, it might be
Thanks Konrad, but I posted because I tried scheduleOnce and got no delay
scheduleOnce(logger.debug("1 currentTimeMillis {}",
System.currentTimeMillis()),
FiniteDuration(100, MICROSECONDS))
scheduleOnce(logger.debug("2 currentTimeMillis {}",
System.currentTimeMillis()),
FiniteDuration(100,
Hi,
I'm handling exceptions in a custom GraphStage, with some exceptions I want
to retry after a delay. Is there a preferred way to do this? Do I just call
Thread.sleep?
Gary
--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ:
>>
I think I understand. The other part of my question is when a stream is
within an actor. If the stream has an error where I want the enclosing
actor to stop how do I do that? Also, any advice on testing error handling?
--
>> Read the docs: http://akka.io/docs/
>>
Thanks Konrad, when I skimmed that page I read it as supervision didn't
work with GraphStage, which I use a lot but reading slowly I see it's
GraphStage junction that's not supported and I don't use that. This gives
me what I need.
--
>> Read the docs: http://akka.io/docs/
If an Actor contains a Stream what happens when the stream throws an
exception and there is no stream Decider to handle it? Can the Actor's
supervisor handle it and Resume, Restart, and Stop the Actor with the
stream?
Gary
--
>> Read the docs: http://akka.io/docs/
>>
My Sink receives a Future. I don't want to do anything with it so I use
Sink.ignore but then I don't know when it's completed and that I need to
know. Should I write a custom Sink or does the API already have a solution
for this?
Gary
--
>> Read the docs: http://akka.io/docs/
I'm thinking about a Source that gets external data having a feedback input
to tell it when to get more data. Is there a way to make a shape with an
input and still work as a Source?
Thanks,
Gary
--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ:
>>
> outside of the stream. If your application's design is such that the stream
> is owned by an Actor and the Actor needs to restart itself in case of
> Stream's failure you need to "connect the wires" yourself.
>
> Cheers,
> Rafał
>
> W dniu niedziela, 6 marca 2016 21:48:59 UTC+1
Stream supervision is similar but different to Actor supervision. I don't
see what I'm supposed to do when a stream triggers a Supervision.Stop. What
does it mean that a stream "completes with an error"? Where is the error?
If I create a stream in an Actor is the Actor the stream's supervisor?
I'm calling a 3rd party Java library that returns a Java Future from my
GraphStageLogic onPush(). I want the future to complete before calling
pull(in). Simply blocking with Java Future's get(...) works. So do I really
need to do this within a blocking-dispatcher? If so, how do I setup
The reference doc shows custom Sources and Flows but not Sinks. I wrote a
custom Sink with an InHandler that overrides onPush but it is never called.
I assumed that calling runWith would cause the SinkShape to pull by
default. An upstream custom Flow doesn't receive either onPush or onPull.
Thanks Konrad,
Knowing it wasn't your change was enough, now everything compiles. I was
missing an unrelated dependency and that messed up the build.
On Thursday, February 18, 2016 at 1:06:22 AM UTC-8, Konrad Malawski wrote:
>
> I've migrated and everything compiles except the http route test.
I've migrated and everything compiles except the http route test. Tests
like the one shown don't compile, "route" isn't found.
it should "respond with handled = false for partial path" in {
Get(saPath) ~> route ~> check {
handled shouldEqual false
}
}
I don't see how to
I'm testing a Flow with TestSource and TestSink
"TestSource/TestSink Example" should {
"work" in {
val (pub, sub) = TestSource.probe[MyClass]
.via(myFlow[String])
.toMat(TestSink.probe[String])(Keep.both)
.run()
sub.request(1)
pub.sendNext(myData)
val response =
p.out0 ~> Sink.ignore
> unzip.out1 ~> Sink.ignore
> unzip.out2 ~> Sink.ignore
> ClosedShape
> }).run()
>
> Can you please explain what was the source of the confusion so we can
> improve our docs?
>
> -Endre
>
> On Fri, Jan 22, 2016 at 4:
I want to create a FanOutShape with generic types that takes a function (A)
=> (B, B). I'm getting lost with the UnzipWith, UnzipWith2,
UnzipWithCreator2, and UnzipWithApply. I can't find explanations of how to
use this. Can someone explain how this is supposed to be used?
Thanks,
Gary
--
I want to write a custom Source, like NumberSource in the reference doc and
I want to feed it with a Queue or a BlockingQueue. How should I handle the
case when onPull is received but the queue is empty? I saw an example where
nothing is pushed. I also see there are isAvailable methods but it
clarify your question?
>
> On Sunday, November 15, 2015 at 8:21:07 PM UTC-7, Gary Struthers wrote:
>>
>> 2.0 M1 doc "Note: This is not yet implemented as stated here, this
>> document illustrates intent."
>> OK, things will change but how do I bind an object r
2.0 M1 doc "Note: This is not yet implemented as stated here, this document
illustrates intent."
OK, things will change but how do I bind an object reference to a Flow or
PushStage at materialization time in the current implementation?
Gary
--
>> Read the docs:
I have a flow graph that needs initialization and post processing. I'd like
to do this within an actor. The actor receives a message, the flow graph is
initialized, then the message is passed to it, then its result is sent to
different actors. I don't see this use case in the docs so I don't
>
> The 2.0M-1 docs have a TODO for GraphStage. In the meantime is there
> anything out there to help start using them?
>
Gary
--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>
I have an actor that makes a call returning a Future and it works
def receive = {
case GetAccountBalances(id: Long) ⇒ {
try {
val f = requestCheckingBalances(id, CheckingBalancesClient.
configBaseUrl(hostConfig))
f pipeTo sender
} catch {
case e:
I have a simple scaladsl server and it works with this ScalatestRouteTest
test
it should return existing Checking Account Balances in {
Post(s/account/balances/checking, GetAccountBalances(1L)) ~ routes ~
check {
status shouldBe OK
contentType shouldBe `application/json`
I have a simple scaladsl server and it works with this ScalatestRouteTest
test
it should return existing Checking Account Balances in {
Post(s/account/balances/checking, GetAccountBalances(1L)) ~ routes ~
check {
status shouldBe OK
contentType shouldBe `application/json`
After updating I can't find *akka-http-spray-json-experimental *for RC4 and
get this error
[error] missing or invalid dependency detected while loading class file
'SprayJsonSupport.class'.
[error] Could not access type FlowMaterializer in package akka.stream,
[error] because it (or its
http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-RC3/#akka.http.ConnectionPoolSetup
The first create and constructor arg is options: Traversable
http://www.scala-lang.org/api/2.10.5/index.html#scala.collection.immutable.Traversable
[SocketOption]
The linked to source file
Pardon my myopia, IDE build path was org.parboiled should be
aka-parsing-experimental.
Now it works in ide.
Gary
--
Read the docs: http://akka.io/docs/
Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
Search the archives:
31 matches
Mail list logo