> On September 20, 2018 at 7:30 AM Claus Ibsen <[email protected]> wrote:
>
>
> Hi
>
> You can use the splitter and aggregator to do something like that.
>
> The aggregator can group the split messages together based on that
> frame, and it can do this out of order.
> And the output of the aggregator is routed separately from the input (async)
>
> Maybe try to build a simple use-case / sample / unit test with just
> the splitter and aggregator and see if you can build something.
> And if not then maybe post a bit here again with what you have done,
> so we can better help / understand your use-case.
Hi, Claus.
Thank you for your response.
As it turned out, I didn't need a Splitter or Aggregator at all to implement my
"distributed splitter/aggregator".
All I needed was a ProducerTemplate and some simple Processor classes.
It really ended up being a thing of beauty...
I'll include the sanitized code below for anyone who is interested.
[ First I list the "before" SENDER / RECEIVER, which doesn't have data chunking,
and then the "after" SENDER / RECEIVER, which includes a "DataChunker" to do
the splitting and a "DataChunkProcessor" to do the aggregating. ]
Thanks again.
Ron
p.s. And if anyone wants to help me figure out how to get @Autowired to work
for ProducerTemplate, that would be great.
Otherwise, creating and holding onto the first ProducerTemplate I need is
working just fine.
From googling, the best I can determine is that somehow my Processor
@Component classes are being processed before the CamelContext is fully up
(and so there's no default ProducerTemplate to inject). Yes...?
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
BEFORE, without data chunking (splitting/aggregating).
SENDER:
@Component
public class Route extends RouteBuilder
{
    @Override
    public void configure () throws Exception
    {
        restConfiguration()
            .host("localhost").port(8080)
            .bindingMode(RestBindingMode.json);

        getContext().setStreamCaching(true);

        from("timer:autos?period={{timer.period}}")
            .streamCaching()
            .to("rest:get:auto/list")
            .to("direct:udp");

        from("direct:udp")
            .convertBodyTo(String.class, "UTF-8")
            .to("netty4:udp://localhost:{{receiver.port}}?udpConnectionlessSending=true&sync=false");
    }
}
--------------------------------------------------------------------------------
RECEIVER:
@Component
public class AdapterRouteBuilder extends RouteBuilder
{
    @Override
    public void configure () throws Exception
    {
        JacksonDataFormat autoListJson = new ListJacksonDataFormat(Auto.class);

        getContext().setStreamCaching(true);

        from("netty4:udp://localhost:{{receiver.port}}?sync=false")
            .to("direct:autolist");

        from("direct:autolist")
            .unmarshal(autoListJson)
            .process(new AutoListProcessor())  // creates a protobuf, sets it in the Exchange
            .to("direct:protobuf");

        from("direct:protobuf")
            .marshal().protobuf()
            .to("rabbitmq:{{rabbitmq.exch}}?connectionFactory=#rabbitmq&routingKey={{rabbitmq.rkey}}&exchangeType=topic&durable=false&autoDelete=true");
    }
}
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
AFTER, with data chunking (splitting/aggregating).
SENDER:
@Component
public class Route extends RouteBuilder
{
    @Override
    public void configure () throws Exception
    {
        JacksonDataFormat autoListJson = new ListJacksonDataFormat(Auto.class);
        JacksonDataFormat dataChunkJson = new JacksonDataFormat(DataChunk.class);

        restConfiguration()
            .host("localhost").port(8080)
            .bindingMode(RestBindingMode.json);

        getContext().setStreamCaching(true);

        from("timer:autos?period={{timer.period}}")
            .to("rest:get:auto/list")
            .to("direct:datachunker");

        from("direct:datachunker")
            .unmarshal(autoListJson)
            .process(new DataChunker());  // creates DataChunk; sends directly to "direct:udp"

        from("direct:udp")
            .marshal(dataChunkJson)
            .convertBodyTo(String.class, "UTF-8")
            .to("netty4:udp://localhost:{{receiver.port}}?udpConnectionlessSending=true&sync=false");
    }
}
--------------------------------------------------------------------------------
RECEIVER:
@Component
public class AdapterRouteBuilder extends RouteBuilder
{
    @Override
    public void configure () throws Exception
    {
        JacksonDataFormat dataChunkJson = new JacksonDataFormat(DataChunk.class);

        getContext().setStreamCaching(true);

        from("netty4:udp://localhost:{{receiver.port}}?sync=false")
            .to("direct:datachunk");

        from("direct:datachunk")
            .unmarshal(dataChunkJson)
            .process(new DataChunkProcessor());  // sends directly to "direct:protobuf"

        from("direct:protobuf")
            .marshal().protobuf()
            .to("rabbitmq:{{rabbitmq.exch}}?connectionFactory=#rabbitmq&routingKey={{rabbitmq.rkey}}&exchangeType=topic&durable=false&autoDelete=true");
    }
}
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
public class DataChunker implements Processor
{
    // @Autowired
    // private ProducerTemplate producerTemplate;  // XXX: can't get this to work
    private ProducerTemplate producerTemplate = null;

    private static int frame = 0;

    public void process (Exchange exchange) throws Exception
    {
        // Lazily create one ProducerTemplate and reuse it for every exchange.
        if ( producerTemplate == null )
            producerTemplate = exchange.getContext().createProducerTemplate();

        List<Auto> autos = null;
        try {
            autos = exchange.getIn().getBody(List.class);
        } catch (Exception e) {
            System.out.println("*** ERROR: DataChunker: process: " + e.getMessage());
        }
        if ( autos == null ) return;  // nothing to chunk; avoids an NPE in the loop below

        ++frame;
        int packet = 0;
        for ( Auto auto : autos )
        {
            // One Auto per chunk, tagged with frame #, packet # and total # of packets.
            DataChunk chunk = new DataChunk();
            chunk.setFrame(frame);
            chunk.setPacket(++packet);
            chunk.setTotalPackets(autos.size());
            chunk.getList().add(auto);
            producerTemplate.sendBody("direct:udp", chunk);
        }
        // XXX: That's it. Don't need to do: exchange.getIn().setBody(...);
    }
}
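
For anyone who wants to play with the chunking step outside of Camel, here is a minimal plain-Java sketch of the same idea: tag every piece of a big list with a frame #, a packet # and the total # of packets. Everything in it is an assumption for illustration only: the Chunk and FrameSplitter names are hypothetical (Chunk stands in for the DataChunk class, which isn't shown in this post), and the chunkSize parameter is an addition so that a chunk can carry more than one item.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for DataChunk (the real class is not shown in the post). */
final class Chunk<T> {
    final int frame;         // which big message this chunk belongs to
    final int packet;        // 1-based position of this chunk within the frame
    final int totalPackets;  // how many chunks the frame was split into
    final List<T> items;     // payload carried by this chunk

    Chunk(int frame, int packet, int totalPackets, List<T> items) {
        this.frame = frame;
        this.packet = packet;
        this.totalPackets = totalPackets;
        this.items = items;
    }
}

/** Hypothetical splitter: every call to split() starts a new frame. */
final class FrameSplitter {
    private int frame = 0;  // incremented once per split() call

    /** Splits items into chunks of at most chunkSize elements, all tagged with the same new frame #. */
    synchronized <T> List<Chunk<T>> split(List<T> items, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be >= 1");
        }
        int total = (items.size() + chunkSize - 1) / chunkSize;  // ceiling division
        int currentFrame = ++frame;
        List<Chunk<T>> chunks = new ArrayList<>(total);
        for (int i = 0; i < total; i++) {
            int from = i * chunkSize;
            int to = Math.min(from + chunkSize, items.size());
            // Copy the sub-list so each chunk owns its payload.
            chunks.add(new Chunk<>(currentFrame, i + 1, total,
                                   new ArrayList<>(items.subList(from, to))));
        }
        return chunks;
    }
}
```

With a chunkSize of 1 this should behave like the DataChunker above (one item per chunk). In a Camel Processor, each returned chunk would then be handed to something like producerTemplate.sendBody("direct:udp", chunk).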
--------------------------------------------------------------------------------
public class DataChunkProcessor implements Processor
{
    private static class DataFrame
    {
        public int frame = 0;
        public int size = 0;  // expected # packets, from a DataChunk header
        public boolean written = false;
        public Set<Integer> packets = new HashSet<Integer>();
        public List<Auto> autos = new ArrayList<>();
        // XXX: ... stuff left out
    }

    // @Autowired
    // private ProducerTemplate producerTemplate;  // XXX: can't get this to work
    private ProducerTemplate producerTemplate = null;

    private static DataFrame dataFrame = new DataFrame();

    public void process (Exchange exchange) throws Exception
    {
        DataChunk chunk = null;
        try {
            chunk = exchange.getIn().getBody(DataChunk.class);
        } catch (Exception e) {
            System.out.println("*** ERROR: DataChunkProcessor: process: " + e.getMessage());
        }

        // XXX: process DataChunk, store stuff in 'dataFrame', determine if it
        //      is time to write, etc.

        if ( it_is_time_to_write ) {
            buildAndWrite(exchange);
        }

        // XXX: don't have to do anything to the Exchange; the route has
        //      effectively ended in RouteBuilder
    }

    public void buildAndWrite (Exchange exchange) throws ParseException
    {
        // Lazily create one ProducerTemplate and reuse it for every write.
        if ( producerTemplate == null )
            producerTemplate = exchange.getContext().createProducerTemplate();

        MyProtobuf proto = buildProtobuf();  // XXX: loops on dataFrame.autos and builds custom protobuf object
        producerTemplate.sendBody("direct:protobuf", proto);
        dataFrame.written = true;
    }
}
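
The "store stuff in 'dataFrame', determine if it is time to write" part is left out above. As a rough idea of what that bookkeeping could look like, here is a plain-Java sketch with no Camel in it. It is not the actual code: FrameAssembler and accept(...) are hypothetical names, and it assumes only one frame is in flight at a time, so a chunk carrying a different frame # simply discards whatever was collected for the previous frame.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical reassembler: collects chunks of one frame until all packets are present. */
final class FrameAssembler<T> {
    private Integer currentFrame = null;  // frame # being collected, null before the first chunk
    private int expectedPackets = 0;      // total # of packets, taken from the chunk header
    private boolean written = false;      // true once the current frame has been emitted
    private final TreeMap<Integer, List<T>> received = new TreeMap<>();  // packet # -> payload, kept sorted

    /**
     * Accepts one chunk, in any order. Returns the whole frame's payload in packet
     * order when this chunk completes the frame, and an empty Optional otherwise.
     */
    synchronized Optional<List<T>> accept(int frame, int packet, int totalPackets, List<T> items) {
        if (currentFrame == null || currentFrame != frame) {
            // Assumption: one frame in flight; a different frame # starts over.
            currentFrame = frame;
            expectedPackets = totalPackets;
            written = false;
            received.clear();
        }
        if (written) {
            return Optional.empty();  // late duplicate of a frame that was already emitted
        }
        received.putIfAbsent(packet, new ArrayList<>(items));  // duplicates of a packet are ignored
        if (received.size() < expectedPackets) {
            return Optional.empty();  // frame not full yet, nothing flows downstream
        }
        List<T> all = new ArrayList<>();
        for (List<T> part : received.values()) {
            all.addAll(part);  // TreeMap iterates in ascending packet order
        }
        received.clear();
        written = true;
        return Optional.of(all);
    }
}
```

In the Processor, a non-empty result would be the "time to write" signal, i.e. the point where buildAndWrite(exchange) gets called. Since UDP can drop packets, a real version would probably also want some kind of timeout for frames that never complete; that is not covered here.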
> On Thu, Sep 20, 2018 at 7:15 AM Ron Cecchini <[email protected]> wrote:
> >
> >
> > So, I have a situation where I need something like a Splitter and an
> > Aggregator.
> > But as far as I can tell from reading and googling, maybe my situation is
> > nonstandard?
> > From what I can tell, a Splitter and Aggregator are used together within a
> > single route.
> > In my case, I need the Splitter and Aggregator separated into a sender and
> > receiver, resp.
> >
> >
> > I'm just looking for someone to tell me if the following fits squarely
> > within the Splitter
> > and Aggregator patterns - if so, I'll dig in and figure it out - or if
> > there's another pattern
> > or something else to try.
> >
> >
> > Thank you in advance for your guidance, and sorry for being so verbose
> > again (just trying to be clear).
> >
> >
> > -----
> >
> >
> > On the Splitter side, per usual, I need to split a big message into
> > individual messages.
> > However, I can't just split and let each individual message continue on the
> > route.
> > Instead, I need to "wrap" each individual message and stick some header
> > information on it
> >
> >
> > The situation is very much like the following, which is very UDP-like:
> >
> >
> > Big messages come in, and they get split into "packages" of a preset size.
> > All the individual "packages" can be said to belong to a "frame" of data.
> > The header of each individual message contains the Frame # and Package # and
> > the Total #
> > of packages in the frame so the receiver knows when it has received a full
> > frame of data.
> >
> > Message: 1
> > Frame: 1 - Package: 1 - Total: 3
> > Frame: 1 - Package: 2 - Total: 3
> > Frame: 1 - Package: 3 - Total: 3
> > Message: 2
> > Frame: 2 - Package: 1 - Total: 2
> > Frame: 2 - Package: 2 - Total: 2
> >
> >
> > Etc.
> >
> >
> > If I can't accomplish this with a split() of some kind, how could I do it
> > with a regular Processor?
> > Having a Processor manually split and bundle the data into "packages" is
> > trivial.
> > But how does the Processor then write the individual messages back to a
> > "direct:processPackage" route point?
> > Can a Processor invoke (write data to) a route, at some point in the middle
> > of that route?
> >
> >
> > -----
> >
> >
> > The Aggregator, as you would expect, needs to do the opposite of the above:
> >
> >
> > It needs to aggregate "packages" of data until it determines it has a full
> > "frame".
> > Then it bundles all the package payloads into a single, big message.
> > When a frame is not full, data does not flow to the rest of the route.
> > When the frame is full, the data is written to some route mid-point; e.g.
> > "direct:translateMessage".
> >
> >
> > So, can this sort of "asynchronous" aggregating be done?
> > Can an aggregating Processor basically maintain state, and decide to write
> > or not write to a route?
> >
> >
> > Thank you again for any pointers.
>
>
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2