Hi Grant,

I think the fluent API is tripping you up a bit, but what you are trying to do 
is possible.

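Stream rootStream = …;

// Branch A: filter first, then repartition only the tuples that pass filterA.
Stream streamA = rootStream.each(new Fields(…), filterA);
streamA.localOrShuffle()
       .each(new Fields(…), eachA, new Fields(…));

// Branch B: Negate(filterA) keeps the tuples that filterA drops.
Stream streamB = rootStream.each(new Fields(…), new Negate(filterA));
streamB.localOrShuffle()
       .each(new Fields(…), eachB, new Fields(…));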


In streamB I used the built-in Negate filter, which simply inverts the result 
of an existing filter. That way you don’t have to write two filters that are 
exact opposites of one another.
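
To make the inversion concrete, here is a minimal stand-alone sketch in plain Java. It does not use Storm: TupleFilter, Negated, and apply are hypothetical stand-ins I made up for illustration (only the isKeep method name mirrors Trident's Filter), and the integer "tuples" are invented sample data. It only shows that a filter and its negation split the input into two disjoint halves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NegateSketch {
    /** Hypothetical stand-in for a Trident-style filter: true means "keep the tuple". */
    public interface TupleFilter<T> {
        boolean isKeep(T tuple);
    }

    /** Wraps an existing filter and inverts its decision (illustrative, not Storm's class). */
    public static final class Negated<T> implements TupleFilter<T> {
        private final TupleFilter<T> delegate;

        public Negated(TupleFilter<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean isKeep(T tuple) {
            return !delegate.isKeep(tuple);
        }
    }

    /** Returns the tuples the filter keeps, preserving input order. */
    public static <T> List<T> apply(List<T> tuples, TupleFilter<T> filter) {
        List<T> kept = new ArrayList<>();
        for (T tuple : tuples) {
            if (filter.isKeep(tuple)) {
                kept.add(tuple);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Integer> tuples = Arrays.asList(1, 2, 3, 4, 5, 6);
        TupleFilter<Integer> filterA = n -> n % 2 == 0; // assumed example predicate

        List<Integer> branchA = apply(tuples, filterA);
        List<Integer> branchB = apply(tuples, new Negated<>(filterA));

        // prints "[2, 4, 6] [1, 3, 5]"
        System.out.println(branchA + " " + branchB);
    }
}
```

Every input tuple ends up in exactly one branch, which is the property you want before the repartition step.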

-Taylor


> On Sep 24, 2015, at 12:35 PM, Grant Overby (groverby) <grove...@cisco.com> 
> wrote:
> 
> I have a trident topology where a portion of the DAG looks like this:
> 
> 
>                   partition - - - filterA - - - eachA
>                 /
> stream - - - -
>                 \
>                   partition - - - filterB - - - eachB
> 
> 
> 
> I believe with the above DAG, each tuple will be sent down both sides of the 
> fork, and approximately half will be filtered out by each filter. This forces 
> half my tuples to cross the partition only to get dropped afterwards.
> 
> Can the DAG be constructed like the following? If so, how do I define the 
> topology?
> 
>                   filterA - - - partition - - - eachA
>                 /
> stream - - - -
>                 \
>                   filterB - - - partition - - - eachB
> 
> 
> 
> 
> The following doesn’t appear to work. filterA and filterB don’t receive 
> tuples.
> 
> Stream stream = …;
> 
> stream.parallelismHint(N);
> 
> stream
> .each(new Fields(…), filterA)
> .localOrShuffle()
> .each(new Fields(…), eachA, new Fields(…))
> ;
> 
> stream
> .each(new Fields(…), filterB)
> .localOrShuffle()
> .each(new Fields(…), eachB, new Fields(…))
> ;
> 
> 
> 
> 
> 
> 
> 
> Grant Overby
> Software Engineer
> Cisco.com <http://www.cisco.com/>
> grove...@cisco.com <mailto:grove...@cisco.com>
> Mobile: 865 724 4910
> 
> 
