Hi, I'm trying to use the integration of Camel with reactive streams in a Spring project, but I'm stuck. This is my RouterBuilder:
@Component public class MyRouteBuilder extends RouteBuilder { @Override public void configure() throws Exception { from("direct:pluto").to("reactive-streams:paperino"); } } and this is my RestController: @GetMapping(path = "/hello2") public Flux<String> hello2() { Publisher<String> paperinoPublisher = camel.fromStream("paperino", String.class); Mono<String> bodyToMono = wcBuilder.baseUrl("http://httpbin.org ").build().get().uri("/get").retrieve().bodyToMono(String.class); Flux<String> flux = Flux.from(paperinoPublisher).doOnNext(s -> log.info("Hello from flux: {}",s) ) .flatMap(s -> bodyToMono) .doOnNext(b -> log.info("Response: {}",b)); producer.sendBody("direct:pluto","ciao"); return flux; } When Rest endpoint is called, I got this exception raised by the call to "sendBody": Caused by: java.lang.IllegalStateException: The stream has no active subscriptions at org.apache.camel.component.reactive.streams.engine.CamelPublisher.publish(CamelPublisher.java:110) ~[camel-reactive-streams-3.5.0.jar:3.5.0] at org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService.sendCamelExchange(DefaultCamelReactiveStreamsService.java:151) ~[camel-reactive-streams-3.5.0.jar:3.5.0] at org.apache.camel.component.reactive.streams.ReactiveStreamsProducer.process(ReactiveStreamsProducer.java:52) ~[camel-reactive-streams-3.5.0.jar:3.5.0] at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:169) ~[camel-base-3.5.0.jar:3.5.0] at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:404) ~[camel-base-3.5.0.jar:3.5.0] at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148) ~[camel-base-3.5.0.jar:3.5.0] at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:60) ~[camel-base-3.5.0.jar:3.5.0] at org.apache.camel.processor.Pipeline.process(Pipeline.java:147) ~[camel-base-3.5.0.jar:3.5.0] at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:287) ~[camel-base-3.5.0.jar:3.5.0] at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:67) ~[camel-direct-3.5.0.jar:3.5.0] at org.apache.camel.processor.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:217) ~[camel-base-3.5.0.jar:3.5.0] at org.apache.camel.processor.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:111) ~[camel-base-3.5.0.jar:3.5.0] at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[camel-base-3.5.0.jar:3.5.0] at org.apache.camel.processor.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:108) ~[camel-base-3.5.0.jar:3.5.0] at org.apache.camel.impl.engine.DefaultProducerCache.send(DefaultProducerCache.java:189) ~[camel-base-3.5.0.jar:3.5.0] at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:175) ~[camel-base-3.5.0.jar:3.5.0] at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:171) ~[camel-base-3.5.0.jar:3.5.0] at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:152) ~[camel-base-3.5.0.jar:3.5.0] at org.apache.camel.impl.engine.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:186) ~[camel-base-3.5.0.jar:3.5.0] at org.apache.camel.impl.engine.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:194) ~[camel-base-3.5.0.jar:3.5.0] at eu.tasgroup.camel.CamelRestController.hello2(CamelRestController.java:70) ~[classes/:na] Obviously the subscription must not be done by my code, because it's Spring Webflux which subscribes to the returned flux. Thanks in advance.