Hi Grant, I think the fluent API is tripping you up a bit, but what you are trying to do is possible.

    Stream rootStream = …;

    Stream streamA = rootStream.each(new Fields(…), filterA);
    streamA.localOrShuffle()
        .each(new Fields(…), eachA, new Fields(…));

    Stream streamB = rootStream.each(new Fields(…), new Negate(filterA));
    streamB.localOrShuffle()
        .each(new Fields(…), eachB, new Fields(…));

In streamB I used the Negate filter; all that does is invert an existing filter. That way you don’t have to write two filters that are just the opposite of one another.

-Taylor

> On Sep 24, 2015, at 12:35 PM, Grant Overby (groverby) <grove...@cisco.com> wrote:
>
> I have a trident topology where a portion of the DAG looks like this:
>
>                  partition - - - filterA - - - eachA
>                /
> stream - - - -
>                \
>                  partition - - - filterB - - - eachB
>
> I believe with the above DAG, each tuple will be sent down both sides of the
> fork. Approximately half will be filtered out by each filter. Thus, forcing
> half my tuples to cross the partition only to get dropped afterwards.
>
> Can the DAG be constructed like the following. If so, how do I define the
> topology?
>
>                  filterA - - - partition - - - eachA
>                /
> stream - - - -
>                \
>                  filterB - - - partition - - - eachB
>
> The following doesn’t appear to work. filterA and filterB don’t receive
> tuples.
>
> Stream stream = …;
>
> stream.parallelismHint(N);
>
> stream
>     .each(new Fields(…), filterA)
>     .localOrShuffle()
>     .each(new Fields(…), eachA, new Fields(…))
> ;
>
> stream
>     .each(new Fields(…), filterB)
>     .localOrShuffle()
>     .each(new Fields(…), eachB, new Fields(…))
> ;
>
> Grant Overby
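
For anyone reading this thread later, here is a minimal plain-Java sketch of the idea behind pairing a filter with its negation: every item from the root ends up in exactly one of the two branches. This is only an analogy and not Trident code. `BranchSketch`, `Branches`, and `split` are hypothetical names made up for illustration, and `Predicate.negate()` from the JDK stands in for the role `new Negate(filterA)` plays in the snippet above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java analogy only; none of these names belong to Storm or Trident.
public class BranchSketch {

    /** Holds the two complementary branches produced from one root input. */
    public static final class Branches<T> {
        public final List<T> matched = new ArrayList<>();
        public final List<T> rejected = new ArrayList<>();
    }

    /** Applies a filter and its negation to the same root, like streamA and streamB. */
    public static <T> Branches<T> split(List<T> root, Predicate<T> filterA) {
        // Stand-in for new Negate(filterA): the inverse is derived, not written by hand.
        Predicate<T> notA = filterA.negate();
        Branches<T> out = new Branches<>();
        for (T item : root) {
            if (filterA.test(item)) {
                out.matched.add(item);   // branch A keeps what filterA accepts
            }
            if (notA.test(item)) {
                out.rejected.add(item);  // branch B keeps everything else
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Branches<Integer> b = split(Arrays.asList(1, 2, 3, 4, 5, 6), n -> n % 2 == 0);
        System.out.println("branch A: " + b.matched);
        System.out.println("branch B: " + b.rejected);
    }
}
```

Because the second predicate is derived from the first, the two branches cannot drift apart if the filter logic changes later, which is the same reason for using Negate rather than a hand-written opposite filter.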