Hi Grant, I think the fluent API is tripping you up a bit, but what you are trying to do is possible.
Stream rootStream = …;
Stream streamA = rootStream.each(new Fields(…), filterA);
streamA.localOrShuffle()
.each(new Fields(…), eachA, new Fields(…));
Stream streamB = rootStream.each(new Fields(…), new Negate(filterA));
streamB.localOrShuffle()
.each(new Fields(…), eachB, new Fields(…));
In streamB I used the Negate filter — all that does is invert an existing
filter. That way you don’t have to write two filters that are just the opposite
of one another.
-Taylor
> On Sep 24, 2015, at 12:35 PM, Grant Overby (groverby) <[email protected]>
> wrote:
>
> I have a trident topology where a portion of the DAG looks like this:
>
>
> partition - - - filterA - - - eachA
> /
> stream - - - -
> \
> partition - - - filterB - - - eachB
>
>
>
> I believe with the above DAG, each tuple will be sent down both sides of the
> fork. Approximately half will be filtered out by each filter. Thus, forcing
> half my tuples to cross the partition only to get dropped afterwards.
>
> Can the DAG be constructed like the following. If so, how do I define the
> topology?
>
> filterA - - - partition - - - eachA
> /
> stream - - - -
> \
> filterB - - - partition - - - eachB
>
>
>
>
> The following doesn’t appear to work. filterA and filterB don’t receive
> tuples.
>
> Stream stream = …;
>
> stream.parallelismHint(N);
>
> stream
> .each(new Fields(…), filterA)
> .localOrShuffle()
> .each(new Fields(…), eachA, new Fields(…))
> ;
>
> stream
> .each(new Fields(…), filterB)
> .localOrShuffle()
> .each(new Fields(…), eachB, new Fields(…))
> ;
>
>
>
>
>
>
>
> Grant Overby
> Software Engineer
> Cisco.com <http://www.cisco.com/>
> [email protected] <mailto:[email protected]>
> Mobile: 865 724 4910
>
>
> Think before you print.
> This email may contain confidential and privileged material for the sole use
> of the intended recipient. Any review, use, distribution or disclosure by
> others is strictly prohibited. If you are not the intended recipient (or
> authorized to receive for the recipient), please contact the sender by reply
> email and delete all copies of this message.
>
> Please click here
> <http://www.cisco.com/web/about/doing_business/legal/cri/index.html> for
> Company Registration Information.
>
>
>
>
signature.asc
Description: Message signed with OpenPGP using GPGMail
