Did you see my updated topology? I only removed "args" from the output fields of Fn2(). I left it in the input fields of Fn2 and Fn3.
- Bobby

On Thursday, August 3, 2017, 10:55:04 AM CDT, J.R. Pauley <[email protected]> wrote:

Bobby, sorry if I am misreading you, but if I don't explicitly declare "args" then Fn2's execute receives only the single field "B" (tuple.getString(0)) as its input. I am not able to call tuple.getString(1), and tuple.size() is 1, not 2. So the topology below can't be right. How do I access "args" in Fn2?

tridentTopology
    .newDRPCStream("crmc", null)
    .each(new Fields("args"), new Fn1(), new Fields("B"))
    .each(new Fields("B"), new Fn2(), new Fields("C"))
    .each(new Fields("C"), new Fn3(), new Fields("D"));

On Thu, Aug 3, 2017 at 11:36 AM, Bobby Evans <[email protected]> wrote:

With Trident, the original args going into the each never go away, so you don't have to output them.

tridentTopology
    .newDRPCStream("crmc", null)
    .each(new Fields("args"), new Fn1(), new Fields("B"))
    .each(new Fields("args","B"), new Fn2(), new Fields("C"))
    .each(new Fields("args","C"), new Fn3(), new Fields("D"));

By removing "args" from the output of Fn2() it should work for you. I know it is a bit odd, but that is how it works.

- Bobby

On Thursday, August 3, 2017, 10:32:17 AM CDT, J.R. Pauley <[email protected]> wrote:

I'm scratching my head trying to produce a simple DRPC stream that does what I want. Logically I think I want this:

tridentTopology
    .newDRPCStream("crmc", null)
    .each(new Fields("args"), new Fn1(), new Fields("B"))
    .each(new Fields("args","B"), new Fn2(), new Fields("args", "C"))
    .each(new Fields("args","C"), new Fn3(), new Fields("D"));

However, I get a runtime exception for duplicated args, so I renamed the args to give them all unique names, like so:

tridentTopology
    .newDRPCStream("crmc", null)
    .each(new Fields("args"), new NormalizeFn(), new Fields("dpc"))
    .each(new Fields("args2","B"), new Fn2(), new Fields("args3", "C"))
    .each(new Fields("args3","C"), new Fn3(), new Fields("D"));

This works (sort of). The problem is that I end up with an output stream containing lots of duplicate args.
I don't want the original "args" emitted in the output stream at all. But if I emit D I still see args2, args3, D in the output. All I am trying to do is:

1) make args available to all 3 named functions, and
2) supply additional arg B as input to Fn2, and
3) supply additional arg C as input to Fn3, which outputs D and only D as the result.

I've read that I don't need to define new "args" as they are passed to all functions. However, if I try to access tuple.getString(1) in Fn2 I get an ArrayOutOfBounds unless I am explicitly passing in 2 named parameters. So I'm really confused as to how best to define this topology. Any help appreciated.
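
For readers following the thread, here is a minimal sketch of the field behavior being discussed, written in plain Java with no Storm dependency. It is only a model under stated assumptions, not Trident's implementation: the names FieldsSketch, each, project and run are hypothetical, and the three lambdas stand in for Fn1, Fn2 and Fn3. It illustrates three points from the messages above: a function sees only the input fields it is given (so the tuple size is 2 only when both "args" and "B" are listed), its output fields are appended to the fields already on the stream, and re-emitting an existing name such as "args" is rejected as a duplicate. In Trident itself the duplicate check is presumably applied when the topology is built rather than per tuple, and dropping everything except "D" would likely be done with Stream.project(new Fields("D")).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of each()/project() field handling. All names are hypothetical.
class FieldsSketch {

    // Models stream.each(inputFields, fn, outputFields): the function receives only
    // the selected input fields, in the order listed, and the values it emits are
    // appended to the existing tuple under the output field names.
    static Map<String, Object> each(Map<String, Object> tuple, List<String> inputFields,
            Function<List<Object>, List<Object>> fn, List<String> outputFields) {
        List<Object> view = new ArrayList<>();
        for (String name : inputFields) {
            if (!tuple.containsKey(name)) {
                throw new IllegalArgumentException("unknown input field: " + name);
            }
            view.add(tuple.get(name));
        }
        List<Object> emitted = fn.apply(view);
        if (emitted.size() != outputFields.size()) {
            throw new IllegalArgumentException("emitted value count does not match output fields");
        }
        Map<String, Object> result = new LinkedHashMap<>(tuple);
        for (int i = 0; i < outputFields.size(); i++) {
            String name = outputFields.get(i);
            // Existing fields never go away, so reusing a name is a duplicate.
            if (result.containsKey(name)) {
                throw new IllegalArgumentException("duplicate field: " + name);
            }
            result.put(name, emitted.get(i));
        }
        return result;
    }

    // Models a projection: keep only the named fields and drop the rest.
    static Map<String, Object> project(Map<String, Object> tuple, List<String> keep) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (String name : keep) {
            result.put(name, tuple.get(name));
        }
        return result;
    }

    // Mirrors the corrected topology from the thread, then keeps only "D".
    static Map<String, Object> run(String args) {
        Map<String, Object> t = new LinkedHashMap<>();
        t.put("args", args);
        // Stand-in for Fn1: sees [args], emits B.
        t = each(t, Arrays.asList("args"),
                v -> Arrays.<Object>asList("b(" + v.get(0) + ")"), Arrays.asList("B"));
        // Stand-in for Fn2: sees [args, B] (size 2), emits C only.
        t = each(t, Arrays.asList("args", "B"),
                v -> Arrays.<Object>asList("c(" + v.get(0) + "," + v.get(1) + ")"), Arrays.asList("C"));
        // Stand-in for Fn3: sees [args, C], emits D only.
        t = each(t, Arrays.asList("args", "C"),
                v -> Arrays.<Object>asList("d(" + v.get(0) + "," + v.get(1) + ")"), Arrays.asList("D"));
        // At this point the tuple holds args, B, C and D; keep D alone.
        return project(t, Arrays.asList("D"));
    }

    public static void main(String[] unused) {
        System.out.println(run("x"));
    }
}
```

In this model, listing "args" again as an output field of the second step throws the duplicate-field error, which matches the exception described in the first message, while listing it only as an input field gives the function both values.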
