With Trident, the fields going into an each() never go away, so you don't
have to output them again.
tridentTopology
    .newDRPCStream("crmc", null)
    .each(new Fields("args"), new Fn1(), new Fields("B"))
    .each(new Fields("args", "B"), new Fn2(), new Fields("C"))
    .each(new Fields("args", "C"), new Fn3(), new Fields("D"));
Removing "args" from the output fields of Fn2() should make it work for you.
I know it is a bit odd, but that is how it works.
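
To make the field bookkeeping concrete, here is a minimal plain-Java sketch of the rule described above. It is a toy model, not Storm code: FieldModel and its methods are hypothetical names, and the checks are my assumption of the behaviour rather than Trident's actual implementation. It only shows that each() appends its output fields to the ones already on the tuple, that re-declaring an existing name such as "args" is rejected, and that a projection step narrows the result to D.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of how field names accumulate across each() calls.
// Hypothetical class for illustration only; it does not use Storm.
final class FieldModel {
    private final List<String> fields;

    FieldModel(List<String> fields) {
        this.fields = List.copyOf(fields);
    }

    List<String> fields() {
        return fields;
    }

    // Models each(inputFields, fn, functionFields): existing fields are kept
    // and the function's output fields are appended after them.
    FieldModel each(List<String> inputFields, List<String> functionFields) {
        for (String in : inputFields) {
            if (!fields.contains(in)) {
                throw new IllegalArgumentException("unknown input field: " + in);
            }
        }
        for (String out : functionFields) {
            // Assumption: an output name that is already present is an error,
            // which mirrors the "duplicated args" exception in the question.
            if (fields.contains(out)) {
                throw new IllegalArgumentException("duplicate field: " + out);
            }
        }
        List<String> next = new ArrayList<>(fields);
        next.addAll(functionFields);
        return new FieldModel(next);
    }

    // Models a projection: keep only the named fields, drop everything else.
    FieldModel project(List<String> keepFields) {
        for (String keep : keepFields) {
            if (!fields.contains(keep)) {
                throw new IllegalArgumentException("unknown field: " + keep);
            }
        }
        return new FieldModel(keepFields);
    }

    public static void main(String[] args) {
        FieldModel stream = new FieldModel(List.of("args"))
            .each(List.of("args"), List.of("B"))
            .each(List.of("args", "B"), List.of("C"))
            .each(List.of("args", "C"), List.of("D"));

        System.out.println(stream.fields());                       // [args, B, C, D]
        System.out.println(stream.project(List.of("D")).fields()); // [D]
    }
}
```

In Trident itself the narrowing step corresponds to calling project(new Fields("D")) on the stream after the last each(), which should leave only D in the output.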
- Bobby
On Thursday, August 3, 2017, 10:32:17 AM CDT, J.R. Pauley <[email protected]>
wrote:
I'm scratching my head trying to produce a simple drpc stream that does what I
want. Logically I think I want this:
tridentTopology
    .newDRPCStream("crmc", null)
    .each(new Fields("args"), new Fn1(), new Fields("B"))
    .each(new Fields("args", "B"), new Fn2(), new Fields("args", "C"))
    .each(new Fields("args", "C"), new Fn3(), new Fields("D"));
However I get a runtime exception for duplicated args, so I renamed the args to
give them all unique names, like so:
tridentTopology
    .newDRPCStream("crmc", null)
    .each(new Fields("args"), new NormalizeFn(), new Fields("dpc"))
    .each(new Fields("args2", "B"), new Fn2(), new Fields("args3", "C"))
    .each(new Fields("args3", "C"), new Fn3(), new Fields("D"));
This works (sort of). The problem is that I end up with an output stream
containing lots of duplicate args. I don't want the original "args" emitted in
the output stream at all. But if I emit D, I still see args2, args3, D in the
output.
All I am trying to do is:
1) make args available to all 3 named functions,
2) supply additional arg B as input to Fn2, and
3) supply additional arg C as input to Fn3, which outputs D and only D as the
   result.
I've read that I don't need to define new "args" as they are passed to all
functions. However, if I try to access tuple.getString(1) in Fn2 I get an
ArrayOutOfBounds unless I am explicitly passing in 2 named parameters.
So I'm really confused as to how best to define this topology. Any help
appreciated.