I'm scratching my head trying to produce a simple DRPC stream that does
what I want. Logically, I think I want this:
tridentTopology
.newDRPCStream("crmc", null)
.each(new Fields("args"), new Fn1(), new Fields("B"))
.each(new Fields("args","B"), new Fn2(), new Fields("args", "C"))
.each(new Fields("args","C"), new Fn3(), new Fields("D"));
However, I get a runtime exception about the duplicated "args" field, so I
renamed the args to give them all unique names, like so:
tridentTopology
.newDRPCStream("crmc", null)
.each(new Fields("args"), new Fn1(), new Fields("args2", "B"))
.each(new Fields("args2","B"), new Fn2(), new Fields("args3", "C"))
.each(new Fields("args3","C"), new Fn3(), new Fields("D"));
This works (sort of). The problem is that I end up with an output stream
full of duplicate args. I don't want the original "args" emitted in the
output stream at all. But if I emit D, I still see args2, args3, D in the
output.
All I am trying to do is:
1) make args available to all 3 named functions, and
2) supply additional arg B as input to Fn2, and
3) supply additional arg C as input to Fn3, which outputs D and D only as
the result
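To make those three goals concrete, here is a rough plain-Java model of the field flow I have in mind. It is only a sketch and does not use Storm at all: FieldFlowSketch, each, project and run are made-up names, and the function bodies are placeholders standing in for Fn1, Fn2 and Fn3. It assumes each step appends its output fields to the tuple under new names, and that a final projection step keeps D only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the field handling described above.
// FieldFlowSketch, each, project and run are illustrative names, not Storm APIs.
final class FieldFlowSketch {

    // Models each(): the function receives only the selected input fields,
    // and its outputs are appended to the tuple under names that must be new.
    static Map<String, Object> each(Map<String, Object> tuple,
                                    List<String> inputFields,
                                    Function<List<Object>, List<Object>> fn,
                                    List<String> outputFields) {
        List<Object> inputs = new ArrayList<>();
        for (String name : inputFields) {
            if (!tuple.containsKey(name)) {
                throw new IllegalArgumentException("unknown field: " + name);
            }
            inputs.add(tuple.get(name));
        }
        List<Object> outputs = fn.apply(inputs);
        if (outputs.size() != outputFields.size()) {
            throw new IllegalArgumentException("output count does not match declared fields");
        }
        Map<String, Object> result = new LinkedHashMap<>(tuple);
        for (int i = 0; i < outputFields.size(); i++) {
            String name = outputFields.get(i);
            if (result.containsKey(name)) {
                throw new IllegalArgumentException("duplicate field: " + name);
            }
            result.put(name, outputs.get(i));
        }
        return result;
    }

    // Models a projection step: keep only the named fields, drop everything else.
    static Map<String, Object> project(Map<String, Object> tuple, List<String> keep) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (String name : keep) {
            if (!tuple.containsKey(name)) {
                throw new IllegalArgumentException("unknown field: " + name);
            }
            result.put(name, tuple.get(name));
        }
        return result;
    }

    // Runs the three-step chain; the lambdas are placeholders for Fn1, Fn2, Fn3.
    static Map<String, Object> run(String args) {
        Map<String, Object> t = new LinkedHashMap<>();
        t.put("args", args);
        // Step 1: args -> B
        t = each(t, Arrays.asList("args"),
                in -> Arrays.<Object>asList("B(" + in.get(0) + ")"),
                Arrays.asList("B"));
        // Step 2: args, B -> C (args is read again, not re-emitted)
        t = each(t, Arrays.asList("args", "B"),
                in -> Arrays.<Object>asList("C(" + in.get(0) + "," + in.get(1) + ")"),
                Arrays.asList("C"));
        // Step 3: args, C -> D
        t = each(t, Arrays.asList("args", "C"),
                in -> Arrays.<Object>asList("D(" + in.get(0) + "," + in.get(1) + ")"),
                Arrays.asList("D"));
        // Final step: drop args, B and C so that only D remains.
        return project(t, Arrays.asList("D"));
    }
}
```

In this model no step re-emits "args", so no renaming is needed, and re-declaring an existing field name as an output is rejected. If Trident's project operation on a stream behaves like the project method here, ending the real chain with a projection onto "D" would be the equivalent last step.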
I've read that I don't need to define new "args" as they are passed to all
functions. However, if I try to access tuple.getString(1) in Fn2, I get an
ArrayOutOfBounds exception unless I explicitly pass in 2 named parameters.
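The behavior I am seeing would be consistent with a function only ever receiving the fields named as its inputs, indexed in the order they are listed. A minimal plain-Java sketch of that assumption follows; InputViewSketch and inputView are hypothetical names, not part of Storm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of how a function's input tuple might be built.
// InputViewSketch and inputView are illustrative names, not Storm APIs.
final class InputViewSketch {

    // Returns only the selected fields, in the order they were requested.
    // Index 0 is the first requested field, index 1 the second, and so on.
    static List<Object> inputView(Map<String, Object> tuple, List<String> inputFields) {
        List<Object> view = new ArrayList<>();
        for (String name : inputFields) {
            if (!tuple.containsKey(name)) {
                throw new IllegalArgumentException("unknown field: " + name);
            }
            view.add(tuple.get(name));
        }
        return view;
    }
}
```

Under this assumption, selecting only "args" gives a one-element view, so index 1 is out of range, while selecting "args" and "B" makes index 1 refer to B.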
So I'm really confused as to how best to define this topology. Any help
appreciated.