No worries, the error was caused by the DoFn's element type not being defined with a SchemaCoder.
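For anyone hitting the same `IllegalArgumentException`, here is a minimal sketch of one way to give an element type a SchemaCoder. It assumes the Beam Java SDK and the DirectRunner are on the classpath, and that the custom element is a plain Java class. `SchemaCoderExample`, `build`, and the `CustomObj` fields `id` and `value` are hypothetical. This is not necessarily the exact change made in this thread. Annotating the class with `@DefaultSchema(JavaFieldSchema.class)` registers a schema derived from its public fields. Setting the matching `SchemaCoder` on the `PCollection` makes it report `hasSchema()`.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class SchemaCoderExample {

  // Hypothetical element type. JavaFieldSchema derives the schema from the public fields
  // and uses the public no-arg constructor to build instances.
  @DefaultSchema(JavaFieldSchema.class)
  public static class CustomObj {
    public String id;
    public long value;

    public CustomObj() {}

    public CustomObj(String id, long value) {
      this.id = id;
      this.value = value;
    }
  }

  // Builds a PCollection<CustomObj> whose coder is the SchemaCoder registered via @DefaultSchema.
  static PCollection<CustomObj> build(Pipeline p) {
    SchemaCoder<CustomObj> coder;
    try {
      coder = p.getSchemaRegistry().getSchemaCoder(CustomObj.class);
    } catch (NoSuchSchemaException e) {
      throw new IllegalStateException("CustomObj has no registered schema", e);
    }
    return p.apply(
        Create.of(new CustomObj("a", 1L), new CustomObj("b", 2L)).withCoder(coder));
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<CustomObj> objs = build(p);
    // Prints whether the PCollection carries a schema (i.e. uses a SchemaCoder).
    System.out.println(objs.hasSchema());
  }
}
```

When the input `PCollection` has a schema, Beam can also convert it to a different `@Element` parameter type that has a compatible schema. Without a schema, the `@Element` type has to match the `PCollection` element type exactly.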
On Tue, Jul 7, 2020 at 10:46 PM Praveen K Viswanathan <[email protected]> wrote:

> Thanks Luke. I changed the pipeline structure as shown below. I can see
> that this flow would work for my original issue, but now, while building, I
> am getting a "java.lang.IllegalArgumentException: Type of @Element must
> match the DoFn type CustomTx/Filter A/Values/Map/ParMultiDo(Anonymous).output [PCollection]"
> error. I could not find much about this error in the docs or in other
> forums. I'd appreciate your help.
>
> CustomTx {
>
>   public PCollection<CustomObj> expand(PCollection<KV<>> input) {
>
>     // Tuple tags
>     final TupleTag<KV<>> tagA = new TupleTag<KV<>>(){};
>     final TupleTag<KV<>> tagB = new TupleTag<KV<>>(){};
>
>     PCollectionTuple tuple = input.apply(ParDo.of(new DoFn<KV<>, KV<>>() {
>       @ProcessElement
>       public void processElement(ProcessContext c) {
>         String str = c.sideInput(s);
>         if ("A".equals(str))
>           c.output(tagA, c.element());
>         else
>           c.output(tagB, c.element());
>       }
>     }).withSideInputs(s)
>       .withOutputTags(tagA, TupleTagList.of(tagB)));
>
>     // Output PCollection
>     PCollection<CustomObj> output;
>
>     // A flow
>     output = tuple.get(tagA).setCoder(KvCoder.of(...))
>         .apply("Filter A", Values.create())
>         .apply(ParDo.of(new DoFnA()))
>         ...
>
>     // B flow
>     output = tuple.get(tagB).setCoder(KvCoder.of(...))
>         .apply("Filter B", Values.create())
>         .apply(ParDo.of(new DoFnB()))
>         ...
>
>     return output;
>   }
> }
>
> class DoFnA {
>
> }
>
> class DoFnB {
>
> }
>
> On Tue, Jul 7, 2020 at 7:39 PM Luke Cwik <[email protected]> wrote:
>
>> Have both DoFns A and B in the graph at the same time, and instead use
>> another decider DoFn that outputs to either the PCollection that goes to
>> DoFn A or the one that goes to DoFn B, based upon the contents of the side
>> input. The graph would look something like:
>>
>> PCollectionView<Decision> -\
>> PCollection<Data> -> ParDo(Decider) -outA-> PCollection<Data> -> ParDo(DoFnA)
>>                                     \outB-> PCollection<Data> -> ParDo(DoFnB)
>>
>> See [1] for how to create a DoFn with multiple outputs.
>>
>> 1:
>> https://beam.apache.org/documentation/pipelines/design-your-pipeline/#a-single-transform-that-produces-multiple-outputs
>>
>> On Tue, Jul 7, 2020 at 7:31 PM Praveen K Viswanathan <[email protected]> wrote:
>>
>>> Hello Everyone,
>>>
>>> Apache Beam allows conditional branching at pipeline construction
>>> time, but I have to decide whether to execute DoFn A or DoFn B at run
>>> time (based upon a PCollection flag).
>>>
>>> My DoFns A and B are inside a custom transformation class, and I am
>>> passing my flag as a PCollectionView to the transformation class. However,
>>> Beam does not wait for the actual value of the PCollectionView and decides
>>> which DoFn to call during DAG preparation itself (it always goes to the
>>> else branch).
>>>
>>> class CustomTx {
>>>   public CustomTx(flag) {
>>>     this.flag = flag;
>>>   }
>>>
>>>   public expand {
>>>     if (flag)
>>>       DoFn A
>>>     else
>>>       DoFn B
>>>   }
>>> }
>>>
>>> class DoFn A {
>>> }
>>>
>>> class DoFn B {
>>> }
>>>
>>> If I put a DoFn inside my transformation's expand method and pass the
>>> flag as a side input, it gets the correct value, but then I cannot call a
>>> DoFn inside a DoFn. I'd appreciate any pointers on the best way to
>>> approach this branching case.
>>>
>>> --
>>> Thanks,
>>> Praveen K Viswanathan
>>>
>>
>
> --
> Thanks,
> Praveen K Viswanathan
>

--
Thanks,
Praveen K Viswanathan
