Hey all,

I'm using the RedisAggregationRepository for my aggregator, which supports
recovery via RecoverableAggregationRepository. I noticed that when I use a
completionPredicate and the incoming exchange satisfies that predicate
immediately, the exchange is never added to the recovery repository.

I assume this is done for performance reasons, but it also means that in
the case of a downstream error, recovery is not an option.

Consider the following case:

   1. Message 1 arrives from GCP Pub/Sub
   2. Message 1 arrives at the aggregator, and Pub/Sub receives an ACK
   3. Message 1 immediately passes the predicate, so it's not placed in the
   aggregation repository
   4. Message 1 fails to process, perhaps because the JVM was terminated

Outcome: Message 1 is lost
Expected Outcome: Message 1 is recovered from the aggregation repository
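
To make the sequence above concrete, here's a toy model in plain Java. It is
not Camel's code: ToyAggregator, store, and persistBeforeComplete are
hypothetical names, and the map only stands in for a recoverable repository.
It just shows the difference between skipping the repository when the
predicate passes right away and persisting first.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Toy model of the scenario. NOT Camel's implementation; all names are hypothetical. */
final class ToyAggregator {
    // Stands in for a recoverable aggregation repository that survives a crash.
    private final Map<String, String> store = new HashMap<>();
    private final Predicate<String> completionPredicate;
    private final boolean persistBeforeComplete;

    ToyAggregator(Predicate<String> completionPredicate, boolean persistBeforeComplete) {
        this.completionPredicate = completionPredicate;
        this.persistBeforeComplete = persistBeforeComplete;
    }

    /** Accepts a message (the broker has been ACKed by now); returns true if the group completed. */
    boolean onMessage(String key, String body) {
        boolean complete = completionPredicate.test(body);
        // Observed behavior: an immediately complete exchange skips the store.
        // Expected behavior (persistBeforeComplete = true): store it anyway.
        if (!complete || persistBeforeComplete) {
            store.put(key, body);
        }
        return complete;
    }

    /** Called only after downstream processing succeeds. */
    void confirm(String key) {
        store.remove(key);
    }

    /** What a restart could still find if the JVM died before confirm(). */
    boolean recoverable(String key) {
        return store.containsKey(key);
    }
}
```

With persistBeforeComplete set to false, a message that passes the predicate
on arrival is not recoverable if the JVM dies before confirm() is called. With
it set to true, the message stays in the store until downstream succeeds.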

I found the code that handles this case, and the behavior seems to be
deliberate, but I don't know if it was designed with recoverable aggregation
repositories in mind. Unfortunately, I don't see a way to override the
behavior since the method is private.

[image: image.png]

On a related note, it'd be nice to add another completion condition (or
let users extend the completion conditions) that combines
completionTimeout and completionPredicate with AND logic instead of OR
logic. That way, we could say that at least 2 seconds must have passed
without receiving a correlated message, and only then check the
completionPredicate to see if it passes (basically, evaluate the
completionPredicate whenever another completion condition is met). This
would save me from having to chain aggregators together with a loop.
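
Here's roughly the AND semantics I have in mind, as a plain Java sketch.
QuietPeriodCompletion and its methods are made-up names, not an existing
Camel API, and the caller passes the clock value in explicitly to keep the
sketch simple.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Sketch of "timeout AND predicate" completion. Hypothetical; not a Camel API. */
final class QuietPeriodCompletion<T> {
    private final long quietMillis;
    private final Predicate<List<T>> predicate;
    private final List<T> group = new ArrayList<>();
    private long lastArrivalMillis;

    QuietPeriodCompletion(long quietMillis, Predicate<List<T>> predicate) {
        this.quietMillis = quietMillis;
        this.predicate = predicate;
    }

    /** Records a correlated message and restarts the quiet period. */
    void add(T item, long nowMillis) {
        group.add(item);
        lastArrivalMillis = nowMillis;
    }

    /** Complete only if the quiet period has elapsed AND the predicate passes. */
    boolean isComplete(long nowMillis) {
        boolean quiet = !group.isEmpty() && nowMillis - lastArrivalMillis >= quietMillis;
        return quiet && predicate.test(group);
    }
}
```

For example, with a 2000 ms quiet period and a predicate that needs two
messages, one message followed by 5 seconds of silence does not complete
the group. After the second message arrives, the group completes once
another 2 seconds pass with nothing new.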

If there's a workaround, I'd really appreciate some advice.

Thanks,
Chris

-- 
*Chris Furlong*
Co-founder, CTO
Direct: (843) 345-3884
Email: chris.furl...@anduin.ai
