Hi Chris


On Sun, Jan 16, 2022 at 7:22 PM Chris Furlong
<chris.furl...@anduin.ai.invalid> wrote:

> Hey all,
>
> I'm using the RedisAggregationRepository for my aggregator, and it
> directly supports recovery via RecoverableAggregationRepository. I
> noticed that if I'm using a completionPredicate, and the incoming exchange
> passes that predicate immediately, that the exchange is never added to the
> recovery repository.
>
> I assume this is done for performance reasons, but it also means that in
> the case of a downstream error, recovery is not an option.
>
> Consider the following case:
>
>    1. Message 1 arrives from GCP Pub/Sub
>    2. Message 1 arrives at the aggregator, and Pub/Sub receives an ACK
>    3. Message 1 immediately passes the predicate, so it's not placed in
>    the aggregation repository
>    4. Message 1 fails to process, perhaps because the JVM was terminated
>
> Outcome: Message 1 is lost
> Expected Outcome: Message 1 is recovered from the aggregation repository
>
> I found the code that handles this case and it seems to be explicit, but I
> don't know if it was designed w/ recovery aggregation repositories in mind.
> Unfortunately, I don't see a way to override the behavior since the method
> is private.
>
>
Thanks for bringing this up and providing an analysis of the problem.

Yes it seems like that if the completion predicate causes an immediate
completion (without any previous exchanges partly aggregated) then there is
this corner case with the exchange would not have been placed in the
repository
before and therfore cannot be recovered.

You are welcome to create a JIRA.

And if you want to try to fix or help fix this then you are welcome to send
PRs or suggestions.
Also if you can build an unit test / example that demonstrate this then
that would be good. Especially if we can make an unit test so we have that
to test in the future, and guard against regressions.




> [image: image.png]
>
> On a related note, it'd be nice to add another completion condition (or
> provide the ability for users to extend completion conditions), that uses
> completionTimeout and completionPredicate together with AND logic instead
> of OR logic. That way, we can say at least 2 seconds has to have gone by
> without receiving a correlated message, and at that time, we want to check
> the completionPredicate to see if it passes (basically add logic to check
> the completionPredicate when another completion condition is met). This
> would prevent me from having to chain aggregators together w/ a loop.
>
>
Hmm yeah maybe its just that the combinataions of different completions is
already high. But we can potentially do this. You are welcome to create
another JIRA ticket


> If there's a workaround, I'd really appreciate some advice.
>
> Thanks,
> Chris
>
> --
> *Chris Furlong*
> Co-founder, CTO
> Direct: (843) 345-3884
> Email: chris.furl...@anduin.ai
>
>

-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2

Reply via email to