I think you need to set blockWhenFull on the producer side, i.e. on the endpoint URI in your to() call.
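
For context, the "Queue full" in the trace comes from java.util.AbstractQueue.add, which fails immediately on a bounded queue instead of waiting for space. As far as I know, blockWhenFull=true on the seda producer URI (for example seda://part2?blockWhenFull=true, and likewise on whatever URI your ProducerTemplate sends to) makes the producer wait instead. Below is a plain-JDK sketch of that difference, with no Camel involved. The class and method names are made up for illustration, and this is not Camel's actual code:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueFullDemo {

    // Fills a bounded queue, then calls add() once more.
    // Returns true if that extra add() threw IllegalStateException("Queue full").
    static boolean addThrowsWhenFull(int capacity) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.add(i);
        }
        try {
            queue.add(capacity); // non-blocking: fails right away when full
            return false;
        } catch (IllegalStateException e) {
            return "Queue full".equals(e.getMessage());
        }
    }

    // Sends (capacity + extra) items with put() while a slow consumer drains
    // the queue. put() waits for free space, so every item is accepted.
    // Returns the number of items the producer managed to enqueue.
    static int putWaitsForSpace(int capacity, int extra) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        int total = capacity + extra;
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    queue.take();
                    TimeUnit.MILLISECONDS.sleep(1); // simulate slow downstream work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        int accepted = 0;
        try {
            for (int i = 0; i < total; i++) {
                queue.put(i); // blocks while the queue is full instead of throwing
                accepted++;
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("add() on a full queue throws: " + addThrowsWhenFull(3));
        System.out.println("items accepted via put(): " + putWaitsForSpace(3, 20));
    }
}
```

The trade-off is that a blocking producer slows the sender down to the consumer's pace, which is usually what you want for a bulk sync like this.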

On Thu, Feb 3, 2022 at 10:33 AM Craig Taylor <ctalk...@ctalkobt.net> wrote:

> I've got a scenario where I need to iterate through all users within a
> system and send them to a remote as part of a routine "sync" to ensure that
> both systems are in agreement.
>
> User imports are also supported, so there is some initial aggregation
> on id in an attempt to minimize updates to the 3rd party system.  It's OK
> to send duplicates over, just not ideal, so the initial aggregation
> completes on either a size or a timeout.
>
> The 3rd party system (2nd step) is intended to aggregate and send those
> updates in fixed size batches.   Changing it to a simple "mock:blah"
> doesn't change behavior at this point.
>
> I'm running into issues when sending to the initial seda queue (see
> attached github example) and getting a queue full.  This occurs even when
> I'm attempting to use blockWhenFull and other parameters.
>
> Github at https://github.com/CTalkobt/issues .  (ref structure - Only
> specific to this issue at this point; Murphy will expand it in the future.)
>
> Route is :
>
> >         from("seda://incoming")
> >                 .routeId("incoming")
> >                 .aggregate(simple("${body.id}"),
> >                         AggregationStrategies.flexible().accumulateInCollection(ArrayList.class))
> >                 .completionSize(100)
> >                 .completionTimeout(500)
> >                 .process(exch -> logger.info("incoming 1:" + exch.getIn().getBody()))
> >                 .to("seda://part2");
>
>
>         from("seda://part2")
>                 .aggregate(constant(1),
>                         AggregationStrategies.flexible().accumulateInCollection(ArrayList.class))
>                 .completionTimeout(500)
>                 .completionSize(500)
>                 .process(exch -> logger.info("Got : " + exch.getIn().getBody(List.class).size()));
>     }
>
> Stack trace:
>
> > org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[99618F8E0720C3E-00000000000003E8]
> > at org.apache.camel.CamelExecutionException.wrapCamelExecutionException(CamelExecutionException.java:45) ~[camel-api-3.14.1.jar:3.14.1]
> > at org.apache.camel.support.ExchangeHelper.extractResultBody(ExchangeHelper.java:687) ~[camel-support-3.14.1.jar:3.14.1]
> > at org.apache.camel.impl.engine.DefaultProducerTemplate.extractResultBody(DefaultProducerTemplate.java:591) ~[camel-base-engine-3.14.1.jar:3.14.1]
> > ....
> > Caused by: java.lang.IllegalStateException: Queue full
> > at java.util.AbstractQueue.add(AbstractQueue.java:98) ~[na:1.8.0_312]
> > at org.apache.camel.component.seda.SedaProducer.addToQueue(SedaProducer.java:251) ~[camel-seda-3.14.1.jar:3.14.1]
> > at org.apache.camel.component.seda.SedaProducer.process(SedaProducer.java:149) ~[camel-seda-3.14.1.jar:3.14.1]
>
>
>
> Thanks,
> --
> -------------------------------------------
> Craig Taylor
> ctalk...@ctalkobt.net
>
