Thanks Luke. I changed the pipeline structure as shown below. This flow
should solve my original issue, but the build now fails with the following
error:

java.lang.IllegalArgumentException: Type of @Element must match the DoFn
type CustomTx/Filter A/Values/Map/ParMultiDo(Anonymous).output [PCollection]

I could not find much about this error in the docs or on other forums. I
would appreciate your help.

class CustomTx {
  public PCollection<CustomObject> expand(PCollection<KV<>> input) {
    // Tuple tags for the two branches
    final TupleTag<KV<>> tagA = new TupleTag<KV<>>(){};
    final TupleTag<KV<>> tagB = new TupleTag<KV<>>(){};

    PCollectionTuple tuple = input.apply(ParDo.of(new DoFn<KV<>, KV<>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // s is the side input (PCollectionView) that holds the flag
        String str = c.sideInput(s);
        if ("A".equals(str))
          c.output(tagA, c.element());
        else
          c.output(tagB, c.element());
      }
    }).withSideInputs(s)
      .withOutputTags(tagA, TupleTagList.of(tagB)));

    // Output PCollection
    PCollection<CustomObject> output;

    // A Flow
    output = tuple.get(tagA).setCoder(KvCoder.of(...))
        .apply("Filter A", Values.create())
        .apply(ParDo.of(new DoFnA()))
        ...

    // B Flow
    output = tuple.get(tagB).setCoder(KvCoder.of(...))
        .apply("Filter B", Values.create())
        .apply(ParDo.of(new DoFnB()))
        ...

    return output;
  }
}

class DoFnA {
}

class DoFnB {
}
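
One thing I noticed in the A and B flows as posted: output is assigned in
both, so only the last assignment would be returned. Since the flag is only
known at run time, both branches stay in the graph and their results would
probably need to be merged before returning (in Beam, something like
Flatten.pCollections() applied to a PCollectionList of the two branch
outputs). Below is a plain-Java model of that routing, not Beam code:
RuntimeBranchSketch, Routed, decide and run are hypothetical names, lists
stand in for PCollections, and the flag string stands in for the side input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class RuntimeBranchSketch {

    /** Holds the two tagged outputs of the decider (stand-in for a PCollectionTuple). */
    static final class Routed<T> {
        final List<T> outA = new ArrayList<>();
        final List<T> outB = new ArrayList<>();
    }

    /** Decider: routes every element to outA when the run-time flag is "A", else to outB. */
    static <T> Routed<T> decide(List<T> input, String flag) {
        Routed<T> routed = new Routed<>();
        for (T element : input) {
            if ("A".equals(flag)) {
                routed.outA.add(element);
            } else {
                routed.outB.add(element);
            }
        }
        return routed;
    }

    /** Runs both branches and merges their results so neither output is dropped. */
    static <T, R> List<R> run(List<T> input, String flag,
                              Function<T, R> fnA, Function<T, R> fnB) {
        Routed<T> routed = decide(input, flag);
        List<R> merged = new ArrayList<>();
        for (T element : routed.outA) {
            merged.add(fnA.apply(element)); // branch A (stand-in for DoFnA)
        }
        for (T element : routed.outB) {
            merged.add(fnB.apply(element)); // branch B (stand-in for DoFnB)
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("x", "y");
        List<String> whenA = run(data, "A", s -> "A:" + s, s -> "B:" + s);
        List<String> whenB = run(data, "B", s -> "A:" + s, s -> "B:" + s);
        System.out.println(whenA); // prints [A:x, A:y]
        System.out.println(whenB); // prints [B:x, B:y]
    }
}
```

Both branches always exist; the flag only decides which one receives
elements, and the empty branch simply contributes nothing to the merge.
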
On Tue, Jul 7, 2020 at 7:39 PM Luke Cwik wrote:
> Have both DoFns A and B in the graph at the same time and instead use
> another decider DoFn that outputs to either the PCollection that goes to
> DoFn A or DoFn B based upon the contents of the side input. Graph would
> look something like:
>
> PCollectionView<Decision> -\
> PCollection<Data> -> ParDo(Decider) -outA-> PCollection<Data> -> ParDo(DoFnA)
>                                     \outB-> PCollection<Data> -> ParDo(DoFnB)
>
> See [1] for how to create a DoFn with multiple outputs.
>
> 1:
> https://beam.apache.org/documentation/pipelines/design-your-pipeline/#a-single-transform-that-produces-multiple-outputs
>
> On Tue, Jul 7, 2020 at 7:31 PM Praveen K Viswanathan wrote:
>
>> Hello Everyone,
>>
>> Apache Beam allows conditional branching at pipeline construction time,
>> but I have to decide whether to execute DoFn A or DoFn B at run time
>> (based on a PCollection flag).
>>
>> My DoFns A and B are inside a custom transform class, and I pass my flag
>> to that class as a PCollectionView. However, Beam does not wait for the
>> actual value of the PCollectionView; it decides which DoFn to call during
>> DAG preparation itself (it always takes the else branch).
>>
>> class CustomTx {
>>   public CustomTx(flag) {
>>     this.flag = flag;
>>   }
>>
>>   public expand {
>>     if (flag)
>>       DoFn A
>>     else
>>       DoFn B
>>   }
>> }
>>
>> class DoFn A {
>> }
>>
>> class DoFn B {
>> }
>>
>> If I put a DoFn inside my transform's expand method and pass the flag as
>> a side input, it gets the correct value, but then I cannot call a DoFn
>> from inside a DoFn. I would appreciate any pointers on the best way to
>> approach this branching case.
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>
--
Thanks,
Praveen K Viswanathan