Thanks Luke. I changed the pipeline structure as shown below. I can see
that this flow should solve my original issue, but now, when I build the
pipeline, I get the following error:

java.lang.IllegalArgumentException: Type of @Element must match the DoFn type CustomTx/Filter A/Values/Map/ParMultiDo(Anonymous).output [PCollection]

I could not find much about this error in the docs or on other forums.
I'd appreciate your help.

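// K and V stand in for the actual key and value types
class CustomTx<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<CustomObj>> {

  // Flag passed in as a side input
  private final PCollectionView<String> s;

  CustomTx(PCollectionView<String> s) {
    this.s = s;
  }

  @Override
  public PCollection<CustomObj> expand(PCollection<KV<K, V>> input) {

    // Tuple Tags, one per branch
    final TupleTag<KV<K, V>> tagA = new TupleTag<KV<K, V>>() {};
    final TupleTag<KV<K, V>> tagB = new TupleTag<KV<K, V>>() {};

    // Decider: routes each element to tagA or tagB based on the side input
    PCollectionTuple tuple = input.apply(ParDo.of(new DoFn<KV<K, V>, KV<K, V>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        String str = c.sideInput(s);
        if ("A".equals(str)) {
          c.output(tagA, c.element());
        } else {
          c.output(tagB, c.element());
        }
      }
    }).withSideInputs(s)
      .withOutputTags(tagA, TupleTagList.of(tagB)));

    // A Flow
    PCollection<CustomObj> outputA = tuple.get(tagA).setCoder(KvCoder.of(...))
        .apply("Filter A", Values.create())
        .apply(ParDo.of(new DoFnA()))
        ...

    // B Flow
    PCollection<CustomObj> outputB = tuple.get(tagB).setCoder(KvCoder.of(...))
        .apply("Filter B", Values.create())
        .apply(ParDo.of(new DoFnB()))
        ...

    // Merge both flows into the single returned PCollection
    return PCollectionList.of(outputA).and(outputB).apply(Flatten.pCollections());
  }
}

class DoFnA extends DoFn<...> {

}

class DoFnB extends DoFn<...> {

}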
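A minimal, self-contained sketch of the same decider-plus-two-branches structure, assuming the Beam Java SDK with the Direct Runner (and JUnit/Hamcrest, which PAssert relies on) on the classpath. The String keys and values, the "A"/"B" flag values, and the names DeciderExample, branch, runAndCheck and KV_CODER are hypothetical. Because the error message says the @Element type must match the DoFn type, each branch DoFn here is declared as DoFn<String, String> and takes an @Element String, which is exactly the element type Values.create() produces from a KV<String, String>. The two branch outputs are merged with Flatten.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class DeciderExample {

  static final KvCoder<String, String> KV_CODER =
      KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  // Branch DoFns (hypothetical): the @Element type matches the DoFn input type,
  // which is the value type emitted by Values.create().
  static class DoFnA extends DoFn<String, String> {
    @ProcessElement
    public void processElement(@Element String value, OutputReceiver<String> out) {
      out.output("A:" + value);
    }
  }

  static class DoFnB extends DoFn<String, String> {
    @ProcessElement
    public void processElement(@Element String value, OutputReceiver<String> out) {
      out.output("B:" + value);
    }
  }

  // Decider -> (A branch | B branch) -> Flatten. Both branches are always in the graph.
  static PCollection<String> branch(
      PCollection<KV<String, String>> input, PCollectionView<String> flag) {
    final TupleTag<KV<String, String>> tagA = new TupleTag<KV<String, String>>() {};
    final TupleTag<KV<String, String>> tagB = new TupleTag<KV<String, String>>() {};

    PCollectionTuple tuple = input.apply("Decider",
        ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // The side input value is read at run time, per element.
            if ("A".equals(c.sideInput(flag))) {
              c.output(tagA, c.element());
            } else {
              c.output(tagB, c.element());
            }
          }
        }).withSideInputs(flag).withOutputTags(tagA, TupleTagList.of(tagB)));

    PCollection<String> outA = tuple.get(tagA).setCoder(KV_CODER)
        .apply("Filter A", Values.<String>create())
        .apply("DoFnA", ParDo.of(new DoFnA()));
    PCollection<String> outB = tuple.get(tagB).setCoder(KV_CODER)
        .apply("Filter B", Values.<String>create())
        .apply("DoFnB", ParDo.of(new DoFnB()));

    return PCollectionList.of(outA).and(outB).apply(Flatten.<String>pCollections());
  }

  // Builds and runs a pipeline; PAssert makes the run fail on unexpected output.
  static void runAndCheck(String flagValue, String... expected) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollectionView<String> flag =
        p.apply("Flag", Create.of(flagValue)).apply(View.<String>asSingleton());
    PCollection<KV<String, String>> input = p.apply("Input",
        Create.of(KV.of("k1", "x"), KV.of("k2", "y")).withCoder(KV_CODER));
    PAssert.that(branch(input, flag)).containsInAnyOrder(expected);
    p.run().waitUntilFinish();
  }

  public static void main(String[] args) {
    runAndCheck("A", "A:x", "A:y");
    runAndCheck("B", "B:x", "B:y");
  }
}
```

Running main exercises both flag values; the branch that receives no elements simply produces an empty PCollection, and Flatten merges it with the other one.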

On Tue, Jul 7, 2020 at 7:39 PM Luke Cwik <[email protected]> wrote:

> Have both DoFns A and B in the graph at the same time and instead use
> another decider DoFn that outputs to either the PCollection that goes to
> DoFn A or DoFn B based upon the contents of the side input. Graph would
> look something like:
>
> PCollectionView<Decision> -\
> PCollection<Data> -> ParDo(Decider) -outA-> PCollection<Data> -> ParDo(DoFnA)
>                                     \outB-> PCollection<Data> -> ParDo(DoFnB)
>
> See [1] for how to create a DoFn with multiple outputs.
>
> 1:
> https://beam.apache.org/documentation/pipelines/design-your-pipeline/#a-single-transform-that-produces-multiple-outputs
>
> On Tue, Jul 7, 2020 at 7:31 PM Praveen K Viswanathan <
> [email protected]> wrote:
>
>> Hello Everyone,
>>
>> Apache Beam allows conditional branching at pipeline construction time,
>> but I need to decide whether to execute DoFn A or DoFn B at run time
>> (based on a flag held in a PCollection).
>>
>> My DoFns A and B are inside a custom transform class, and I pass my
>> flag to that class as a PCollectionView. However, Beam does not wait for
>> the actual value of the PCollectionView; it decides which DoFn to call
>> while preparing the DAG (it always takes the else branch).
>>
>> class CustomTx {
>>   public CustomTx(flag) {
>>     this.flag = flag;
>>   }
>>
>>   public expand {
>>     if (flag)
>>       DoFn A
>>     else
>>       DoFn B
>>   }
>> }
>>
>> class DoFn A {
>> }
>>
>> class DoFn B {
>> }
>>
>> If I put a DoFn inside my transform's expand method and pass the flag to
>> it as a side input, it sees the correct value, but then I cannot call a
>> DoFn from inside a DoFn. I'd appreciate any pointers on the best way to
>> approach this branching case.
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>
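The construction-time branching mentioned in the original question works when the flag is an ordinary Java value that is known before the pipeline is built. A minimal sketch, assuming the Beam Java SDK with the Direct Runner (and JUnit/Hamcrest for PAssert) on the classpath; ConstructionTimeBranch, CustomTx, DoFnA, DoFnB, useA and runAndCheck are hypothetical names. The if/else in expand() runs exactly once, while the graph is being built, so only one of the two DoFns ever appears in the pipeline. A PCollectionView cannot drive this kind of if, because its value only exists once the pipeline is running and can only be read inside a DoFn via sideInput().

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class ConstructionTimeBranch {

  static class DoFnA extends DoFn<String, String> {
    @ProcessElement
    public void processElement(@Element String value, OutputReceiver<String> out) {
      out.output("A:" + value);
    }
  }

  static class DoFnB extends DoFn<String, String> {
    @ProcessElement
    public void processElement(@Element String value, OutputReceiver<String> out) {
      out.output("B:" + value);
    }
  }

  // useA is a plain boolean, so the if/else in expand() is evaluated once,
  // during graph construction; the resulting graph contains only one DoFn.
  static class CustomTx extends PTransform<PCollection<String>, PCollection<String>> {
    private final boolean useA;

    CustomTx(boolean useA) {
      this.useA = useA;
    }

    @Override
    public PCollection<String> expand(PCollection<String> input) {
      if (useA) {
        return input.apply("DoFnA", ParDo.of(new DoFnA()));
      } else {
        return input.apply("DoFnB", ParDo.of(new DoFnB()));
      }
    }
  }

  // Builds and runs a pipeline; PAssert makes the run fail on unexpected output.
  static void runAndCheck(boolean useA, String... expected) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<String> out = p.apply(Create.of("x", "y")).apply(new CustomTx(useA));
    PAssert.that(out).containsInAnyOrder(expected);
    p.run().waitUntilFinish();
  }

  public static void main(String[] args) {
    runAndCheck(true, "A:x", "A:y");
    runAndCheck(false, "B:x", "B:y");
  }
}
```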

-- 
Thanks,
Praveen K Viswanathan
