Hi,
I'm trying to use the integration of Camel with reactive streams in a
Spring project, but I'm stuck.
This is my RouterBuilder:

@Component
public class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
    from("direct:pluto").to("reactive-streams:paperino");
  }
}

and this is my RestController:

@GetMapping(path = "/hello2")
public Flux<String> hello2() {

Publisher<String> paperinoPublisher = camel.fromStream("paperino",
String.class);
Mono<String> bodyToMono = wcBuilder.baseUrl("http://httpbin.org
").build().get().uri("/get").retrieve().bodyToMono(String.class);

Flux<String> flux = Flux.from(paperinoPublisher).doOnNext(s -> log.info("Hello
from flux: {}",s) )
.flatMap(s -> bodyToMono)
.doOnNext(b -> log.info("Response: {}",b));

producer.sendBody("direct:pluto","ciao");
return  flux;
}

When Rest endpoint is called, I got this exception raised by the call to
"sendBody":

Caused by: java.lang.IllegalStateException: The stream has no active
subscriptions
at
org.apache.camel.component.reactive.streams.engine.CamelPublisher.publish(CamelPublisher.java:110)
~[camel-reactive-streams-3.5.0.jar:3.5.0]
at
org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService.sendCamelExchange(DefaultCamelReactiveStreamsService.java:151)
~[camel-reactive-streams-3.5.0.jar:3.5.0]
at
org.apache.camel.component.reactive.streams.ReactiveStreamsProducer.process(ReactiveStreamsProducer.java:52)
~[camel-reactive-streams-3.5.0.jar:3.5.0]
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:169)
~[camel-base-3.5.0.jar:3.5.0]
at
org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:404)
~[camel-base-3.5.0.jar:3.5.0]
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
~[camel-base-3.5.0.jar:3.5.0]
at
org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60)
~[camel-base-3.5.0.jar:3.5.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:147)
~[camel-base-3.5.0.jar:3.5.0]
at
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:287)
~[camel-base-3.5.0.jar:3.5.0]
at
org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:67)
~[camel-direct-3.5.0.jar:3.5.0]
at
org.apache.camel.processor.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:217)
~[camel-base-3.5.0.jar:3.5.0]
at
org.apache.camel.processor.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:111)
~[camel-base-3.5.0.jar:3.5.0]
at
org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
~[camel-base-3.5.0.jar:3.5.0]
at
org.apache.camel.processor.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:108)
~[camel-base-3.5.0.jar:3.5.0]
at
org.apache.camel.impl.engine.DefaultProducerCache.send(DefaultProducerCache.java:189)
~[camel-base-3.5.0.jar:3.5.0]
at
org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:175)
~[camel-base-3.5.0.jar:3.5.0]
at
org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:171)
~[camel-base-3.5.0.jar:3.5.0]
at
org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:152)
~[camel-base-3.5.0.jar:3.5.0]
at
org.apache.camel.impl.engine.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:186)
~[camel-base-3.5.0.jar:3.5.0]
at
org.apache.camel.impl.engine.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:194)
~[camel-base-3.5.0.jar:3.5.0]
at
eu.tasgroup.camel.CamelRestController.hello2(CamelRestController.java:70)
~[classes/:na]

Obviously the subscription must not be done by my code, because it's Spring
Webflux which subscribes to the returned flux.
Thanks in advance.

Reply via email to