> On September 20, 2018 at 7:30 AM Claus Ibsen <claus.ib...@gmail.com> wrote:
>
> Hi
>
> You can use the splitter and aggregator to do something like that.
>
> The aggregator can group the splitted messages together based on that
> frame, and it can do this in out of order.
> And the output of the aggregator is routed separated from the input (async)
>
> Maybe try to build a simple use-case / sample / unit test with just
> the splitter and aggregator and see if you can build something.
> And if not then maybe post a bit here again with what you have done,
> so we can better help / understand your use-case.

Hi, Claus.

Thank you for your response.

As it turned out, I didn't need a Splitter or Aggregator at all to implement my "distributed splitter/aggregator". All I needed was a ProducerTemplate and some simple Processor classes. It really ended up being a thing of beauty...

I'll include the sanitized code below for anyone who is interested.

[ First I list the "before" SENDER / RECEIVER, which doesn't have data chunking, and then the "after" SENDER / RECEIVER which includes a "DataChunker" to do the Splitting and a "DataChunkProcessor" to do the Aggregating. ]

Thanks again.

Ron

p.s. And if anyone wants to help me figure out how to get @Autowired to work for ProducerTemplate, that would be great. Otherwise, creating and holding onto the first ProducerTemplate I need is working just fine. From googling, the best I can determine is that somehow my Processor @Component classes are being processed before the CamelContext is fully up (and so there's no default ProducerTemplate to inject). Yes...?

--------------------------------------------------------------------------------
--------------------------------------------------------------------------------

BEFORE, without data chunking (splitting/aggregating).

SENDER:

@Component
public class Route extends RouteBuilder {

    @Override
    public void configure () throws Exception {

        restConfiguration()
            .host("localhost").port(8080)
            .bindingMode(RestBindingMode.json);

        getContext().setStreamCaching(true);

        // Poll the REST service on a timer and hand the response to the UDP route.
        from("timer:autos?period={{timer.period}}")
            .streamCaching()
            .to("rest:get:auto/list")
            .to("direct:udp");

        // Send the whole response as a single UDP datagram.
        from("direct:udp")
            .convertBodyTo(String.class, "UTF-8")
            .to("netty4:udp://localhost:{{receiver.port}}?udpConnectionlessSending=true&sync=false");
    }
}

--------------------------------------------------------------------------------

RECEIVER:

@Component
public class AdapterRouteBuilder extends RouteBuilder {

    @Override
    public void configure () throws Exception {

        JacksonDataFormat autoListJson = new ListJacksonDataFormat(Auto.class);

        getContext().setStreamCaching(true);

        from("netty4:udp://localhost:{{receiver.port}}?sync=false")
            .to("direct:autolist");

        from("direct:autolist")
            .unmarshal(autoListJson)
            .process(new AutoListProcessor()) // creates a protobuf, sets it in the Exchange
            .to("direct:protobuf");

        from("direct:protobuf")
            .marshal().protobuf()
            .to("rabbitmq:{{rabbitmq.exch}}?connectionFactory=#rabbitmq&routingKey={{rabbitmq.rkey}}&exchangeType=topic&durable=false&autoDelete=true");
    }
}

--------------------------------------------------------------------------------
--------------------------------------------------------------------------------

AFTER, with data chunking (splitting/aggregating).

SENDER:

@Component
public class Route extends RouteBuilder {

    @Override
    public void configure () throws Exception {

        JacksonDataFormat autoListJson = new ListJacksonDataFormat(Auto.class);
        JacksonDataFormat dataChunkJson = new JacksonDataFormat(DataChunk.class);

        restConfiguration()
            .host("localhost").port(8080)
            .bindingMode(RestBindingMode.json);

        getContext().setStreamCaching(true);

        from("timer:autos?period={{timer.period}}")
            .to("rest:get:auto/list")
            .to("direct:datachunker");

        from("direct:datachunker")
            .unmarshal(autoListJson)
            .process(new DataChunker()); // creates DataChunk; sends directly to "direct:udp"

        // Each DataChunk sent by the DataChunker becomes one UDP datagram.
        from("direct:udp")
            .marshal(dataChunkJson)
            .convertBodyTo(String.class, "UTF-8")
            .to("netty4:udp://localhost:{{receiver.port}}?udpConnectionlessSending=true&sync=false");
    }
}

--------------------------------------------------------------------------------

RECEIVER:

@Component
public class AdapterRouteBuilder extends RouteBuilder {

    @Override
    public void configure () throws Exception {

        JacksonDataFormat dataChunkJson = new JacksonDataFormat(DataChunk.class);

        getContext().setStreamCaching(true);

        from("netty4:udp://localhost:{{receiver.port}}?sync=false")
            .to("direct:datachunk");

        from("direct:datachunk")
            .unmarshal(dataChunkJson)
            .process(new DataChunkProcessor()); // sends directly to "direct:protobuf"

        from("direct:protobuf")
            .marshal().protobuf()
            .to("rabbitmq:{{rabbitmq.exch}}?connectionFactory=#rabbitmq&routingKey={{rabbitmq.rkey}}&exchangeType=topic&durable=false&autoDelete=true");
    }
}

--------------------------------------------------------------------------------
--------------------------------------------------------------------------------

import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;

public class DataChunker implements Processor {

    // @Autowired
    // private ProducerTemplate producerTemplate; // XXX: can't get this to work

    private ProducerTemplate producerTemplate = null;

    private static int frame = 0;

    @Override
    public void process (Exchange exchange) throws Exception {

        // Lazily create one ProducerTemplate and reuse it for every exchange.
        if ( producerTemplate == null )
            producerTemplate = exchange.getContext().createProducerTemplate();

        List<Auto> autos = null;

        try {
            autos = exchange.getIn().getBody(List.class);
        } catch (Exception e) {
            System.out.println("*** ERROR: DataChunker: process: " + e.getMessage());
        }

        // Nothing to chunk if the body could not be read as a List.
        if ( autos == null )
            return;

        ++frame;
        int packet = 0;

        // One DataChunk per Auto, each carrying the frame #, packet # and total.
        for ( Auto auto : autos ) {
            DataChunk chunk = new DataChunk();
            chunk.setFrame(frame);
            chunk.setPacket(++packet);
            chunk.setTotalPackets(autos.size());
            chunk.getList().add(auto);
            producerTemplate.sendBody("direct:udp", chunk);
        }

        // XXX: That's it. Don't need to do: exchange.getIn().setBody(...);
    }
}

--------------------------------------------------------------------------------

import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;

public class DataChunkProcessor implements Processor {

    private static class DataFrame {
        public int frame = 0;
        public int size = 0; // expected # packets, from a DataChunk header
        public boolean written = false;
        public Set<Integer> packets = new HashSet<Integer>();
        public List<Auto> autos = new ArrayList<>();
        // XXX: ... stuff left out
    }

    // @Autowired
    // private ProducerTemplate producerTemplate; // XXX: can't get this to work

    private ProducerTemplate producerTemplate = null;

    private static DataFrame dataFrame = new DataFrame();

    @Override
    public void process (Exchange exchange) throws Exception {

        DataChunk chunk = null;

        try {
            chunk = exchange.getIn().getBody(DataChunk.class);
        } catch (Exception e) {
            System.out.println("*** ERROR: DataChunkProcessor: process: " + e.getMessage());
        }

        // Nothing to aggregate if the body could not be read as a DataChunk.
        if ( chunk == null )
            return;

        // XXX: process DataChunk, store stuff in 'dataFrame', determine if it is time to write, etc.

        if ( it_is_time_to_write ) {
            buildAndWrite(exchange);
        }

        // XXX: don't have to do anything to the Exchange; the route has effectively ended in RouteBuilder
    }

    public void buildAndWrite (Exchange exchange) throws ParseException {

        if ( producerTemplate == null )
            producerTemplate = exchange.getContext().createProducerTemplate();

        MyProtobuf proto = buildProtobuf(); // XXX: loops on dataFrame.autos and builds custom protobuf object

        producerTemplate.sendBody("direct:protobuf", proto);

        dataFrame.written = true;
    }
}

> On Thu, Sep 20, 2018 at 7:15 AM Ron Cecchini <roncecch...@comcast.net> wrote:
> >
> > So, I have a situation where I need something like a Splitter and an Aggregator.
> > But as far as I can tell from reading and googling, maybe my situation is nonstandard?
> > From what I can tell, a Splitter and Aggregator are used together within a single route.
> > In my case, I need the Splitter and Aggregator separated into a sender and receiver, resp.
> >
> > I'm just looking for someone to tell me if the following fits squarely within the Splitter
> > and Aggregator patterns - if so, I'll dig in and figure it out - or if there's another pattern
> > or something else to try.
> >
> > Thank you in advance for your guidance, and sorry for being so verbose again (just trying to be clear).
> >
> > -----
> >
> > On the Splitter side, per usual, I need to split a big message into individual messages.
> > However, I can't just split and let each individual message continue on the route.
> > Instead, I need to "wrap" each individual message and stick some header information on it
> >
> > The situation is very much like the following, which is very UDP-like:
> >
> > Big messages come in, and they get split into "packages" of a preset size.
> > All the individual "packages" can be said to belong to a "frame" of data.
> > The header of the individual messages contain the Frame # and Package # and the Total #
> > of packages in the frame so the receiver knows when it has received a full frame of data.
> >
> > Message: 1
> >     Frame: 1 - Package: 1 - Total: 3
> >     Frame: 1 - Package: 2 - Total: 3
> >     Frame: 1 - Package: 3 - Total: 3
> > Message: 2
> >     Frame: 2 - Package: 1 - Total: 2
> >     Frame: 2 - Package: 2 - Total: 2
> >
> > Etc.
> >
> > If I can't accomplish this with a split() of some kind, how could I do it with a regular Processor?
> > Having a Processor manually split and bundle the data into "packages" is trivial.
> > But how does the Processor then write the individual messages back to a "direct:processPackage" route point?
> > Can a Processor invoke (write data to) a route, at some point in the middle of that route?
> >
> > -----
> >
> > The Aggregator, as you would expect, needs to do the opposite of the above:
> >
> > It needs to aggregate "packages" of data until it determines it has a full "frame".
> > Then it bundles all the package payloads into a single, big message.
> > When a frame is not full, data does not flow to the rest of the route.
> > When the frame is full, the data is written to some route mid-point; e.g. "direct:translateMessage".
> >
> > So, can this sort of "asynchronous" aggregating be done?
> > Can an aggregating Processor basically maintain state, and decide to write or not write to a route?
> >
> > Thank you again for any pointers.
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2
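
For anyone who wants to experiment with the frame / package / total bookkeeping described in this thread without a Camel route, here is a minimal sketch in plain Java. It is not the code elided from DataChunkProcessor above. The class names Chunk, Chunker and FrameAssembler are hypothetical, a String payload stands in for an Auto, and the sketch assumes one item per package. It shows a splitter that numbers packages within a frame, and an assembler that holds packages (in any arrival order) until the frame is complete.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical stand-in for a DataChunk: a header plus a one-item payload. */
final class Chunk {
    final int frame;         // which big message this chunk belongs to
    final int packet;        // 1-based position within the frame
    final int totalPackets;  // how many chunks make up the frame
    final String payload;

    Chunk(int frame, int packet, int totalPackets, String payload) {
        this.frame = frame;
        this.packet = packet;
        this.totalPackets = totalPackets;
        this.payload = payload;
    }
}

/** Splits a list into one chunk per item, numbering packets from 1. */
final class Chunker {
    private int frame = 0;

    List<Chunk> split(List<String> items) {
        frame++; // every call starts a new frame
        List<Chunk> chunks = new ArrayList<>();
        int packet = 0;
        for (String item : items) {
            chunks.add(new Chunk(frame, ++packet, items.size(), item));
        }
        return chunks;
    }
}

/** Collects chunks per frame and releases a frame once all packets arrived. */
final class FrameAssembler {
    // frame number -> (packet number -> payload); TreeMap restores packet order
    private final Map<Integer, TreeMap<Integer, String>> pending = new HashMap<>();

    Optional<List<String>> accept(Chunk chunk) {
        TreeMap<Integer, String> packets =
                pending.computeIfAbsent(chunk.frame, f -> new TreeMap<>());
        packets.put(chunk.packet, chunk.payload); // a duplicate packet overwrites itself
        if (packets.size() < chunk.totalPackets) {
            return Optional.empty(); // frame incomplete: nothing flows downstream
        }
        pending.remove(chunk.frame);
        return Optional.of(new ArrayList<>(packets.values()));
    }
}
```

In a Processor, an empty Optional would correspond to "don't write to the next route", and a present one to the point where the full frame is sent on with a ProducerTemplate. Note that this sketch never discards an incomplete frame, so a lost UDP datagram leaves its frame in the map forever; a real receiver would need some timeout or eviction policy.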