No worries, the error was due to the DoFn element not being defined as an
instance of SchemaCoder.

On Tue, Jul 7, 2020 at 10:46 PM Praveen K Viswanathan wrote:

> Thanks Luke. I changed the pipeline structure as shown below. This flow
> should solve my original issue, but the build now fails with
> "java.lang.IllegalArgumentException: Type of @Element must match the DoFn
> type CustomTx/Filter A/Values/Map/ParMultiDo(Anonymous).output
> [PCollection]". I could not find much about this error in the docs or in
> other forums. Appreciate your help.
>
> CustomTx {
>
> public PCollection<CustomObject> expand(PCollection<KV<>> input){
>
> // Tuple Tags
> final TupleTag<KV<>> tagA = new TupleTag<KV<>>(){};
> final TupleTag<KV<>> tagB = new TupleTag<KV<>>(){};
>
> PCollectionTuple tuple = input.apply(ParDo.of(new DoFn<KV<>, KV<>>() {
>    @ProcessElement
>    public void processElement(ProcessContext c) {
>       String str = c.sideInput(s);
>       if ("A".equals(str))
>          c.output(tagA, c.element());
>       else
>          c.output(tagB, c.element());
>    }
> }).withSideInputs(s)
>   .withOutputTags(tagA, TupleTagList.of(tagB)));
>
> // Output PCollection
> PCollection<CustomObject> output;
>
> // A Flow
> output = tuple.get(tagA).setCoder(KvCoder.of(...))
>              .apply("Filter A", Values.create())
>              .apply(ParDo.of(new DoFnA()))
>              ...
>
> // B Flow
> output = tuple.get(tagB).setCoder(KvCoder.of(...))
>              .apply("Filter B", Values.create())
>              .apply(ParDo.of(new DoFnB()))
>              ...
>
> return output;
> }
> }
>
> class DoFnA {
>
> }
>
> class DoFnB {
>
> }
>
> On Tue, Jul 7, 2020 at 7:39 PM Luke Cwik wrote:
>
>> Have both DoFns A and B in the graph at the same time and instead use
>> another decider DoFn that outputs to either the PCollection that goes to
>> DoFn A or DoFn B based upon the contents of the side input. Graph would
>> look something like:
>>
>> PCollectionView<Decision> -\
>> PCollection<Data> -> ParDo(Decider) -outA-> PCollection<Data> -> ParDo(DoFnA)
>>                                     \outB-> PCollection<Data> -> ParDo(DoFnB)
>>
>> See [1] for how to create a DoFn with multiple outputs.
>>
>> 1:
>> https://beam.apache.org/documentation/pipelines/design-your-pipeline/#a-single-transform-that-produces-multiple-outputs
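
The decider idea above can be modelled without the Beam SDK. The sketch
below is plain Java, not Beam code: the class name, the string tags, and
the decide method are hypothetical stand-ins for a decider DoFn, its
TupleTags, and the side-input value. It only shows that both branches
exist up front and that each element is routed when it is processed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Beam-free model of a decider step that routes elements to one of two tagged outputs. */
final class DeciderSketch {

    // Hypothetical output tags, playing the role of TupleTag instances.
    static final String TAG_A = "outA";
    static final String TAG_B = "outB";

    // 'decision' plays the role of the side-input value read for each element.
    static Map<String, List<String>> decide(List<String> input, String decision) {
        Map<String, List<String>> outputs = new HashMap<>();
        // Both outputs always exist, like both branches of the graph.
        outputs.put(TAG_A, new ArrayList<>());
        outputs.put(TAG_B, new ArrayList<>());
        for (String element : input) {
            String tag = "A".equals(decision) ? TAG_A : TAG_B;
            outputs.get(tag).add(element);
        }
        return outputs;
    }

    public static void main(String[] args) {
        Map<String, List<String>> out = decide(Arrays.asList("x", "y"), "A");
        System.out.println(out.get(TAG_A)); // prints [x, y]
        System.out.println(out.get(TAG_B)); // prints []
    }
}
```

In a real pipeline the two lists would be the PCollections obtained from
the tagged outputs, each followed by its own ParDo.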
>>
>> On Tue, Jul 7, 2020 at 7:31 PM Praveen K Viswanathan wrote:
>>
>>> Hello Everyone,
>>>
>>> Apache Beam allows conditional branching at pipeline construction time,
>>> but I have to decide whether to execute DoFn A or DoFn B at run time
>>> (based upon a PCollection flag).
>>>
>>> My DoFns A and B are inside a custom transformation class, and I am
>>> passing my flag as a PCollectionView to the transformation class. However,
>>> Beam does not wait for the actual value of the PCollectionView; it decides
>>> which DoFn to call during DAG preparation itself (it always takes the
>>> else branch).
>>>
>>> class CustomTx {
>>>    public CustomTx(flag) {
>>>     this.flag = flag;
>>>    }
>>>
>>>  public expand {
>>>   if (flag)
>>>      DoFn A
>>>   else
>>>      DoFn B
>>>   }
>>> }
>>>
>>> class DoFn A {
>>> }
>>>
>>> class DoFn B {
>>> }
>>>
>>> If I have a DoFn inside my transformation's expand method and pass the
>>> flag as a side input, it gives the correct value, but then I cannot call
>>> a DoFn inside a DoFn. Appreciate any pointers on the best way to approach
>>> this branching case.
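
The timing problem described above can be illustrated with a small
plain-Java model (no Beam SDK involved; every name in it is hypothetical).
It assumes the flag is still false while the step is being built, which
mirrors the "always goes to else part" behaviour, and contrasts that with
a choice made per element.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Function;

/** Beam-free model: a choice made while building a step vs. one made per element. */
final class BranchTimingSketch {

    // Hypothetical stand-ins for DoFn A and DoFn B.
    static String doFnA(String in) { return "A:" + in; }
    static String doFnB(String in) { return "B:" + in; }

    // The 'if' runs once, while the step is built (like code in expand()).
    static Function<String, String> chooseAtConstruction(boolean flag) {
        if (flag) {
            return BranchTimingSketch::doFnA;
        }
        return BranchTimingSketch::doFnB;
    }

    // The 'if' runs for every element, so it sees the value known at run time.
    static Function<String, String> chooseAtRuntime(BooleanSupplier flag) {
        return in -> flag.getAsBoolean() ? doFnA(in) : doFnB(in);
    }

    public static void main(String[] args) {
        boolean[] flag = {false};                        // not yet known at build time
        Function<String, String> fixed = chooseAtConstruction(flag[0]);
        Function<String, String> late = chooseAtRuntime(() -> flag[0]);
        flag[0] = true;                                  // the real value arrives later
        System.out.println(fixed.apply("x"));            // prints B:x
        System.out.println(late.apply("x"));             // prints A:x
    }
}
```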
>>>
>>> --
>>> Thanks,
>>> Praveen K Viswanathan
>>>
>>
>
> --
> Thanks,
> Praveen K Viswanathan
>


-- 
Thanks,
Praveen K Viswanathan
