Hi All - I am facing an issue while using *side-input*.

*What am I doing:*
From my main program, I call a custom PTransform with a PCollectionView as
a parameter. Inside the custom PTransform, I pass the PCollectionView as a
side input to a DoFn.

*Issue:*
When I run the pipeline, I expect the log statement inside my DoFn's
processElement to execute, but nothing is logged. If I remove the side
input from the DoFn, the log is printed. I suspect this is related to
windowing/execution order, or to my side input somehow being empty. I
would appreciate it if you could clarify what is going wrong here.

*Code Structure:*


*Main Program:*

PCollectionTuple tuple = input.apply(new FirstTx());

// Get two tuple tags from the first transformation
PCollection1 = tuple.get(tag1).setCoder(...);
PCollection2 = tuple.get(tag2).setCoder(...);

// Convert PCollection1 to a PCollectionView to use as a side input.
// Note: I need to introduce a global window here because my source is
// unbounded, and View.asList() does a GroupByKey internally, which in
// turn demands a window.
PView = PCollection1
    .apply(Window.<KV<String, CustomObject>>into(new GlobalWindows()) // Everything into global window.
        .triggering(Repeatedly.forever(DefaultTrigger.of()))
        .discardingFiredPanes())
    .apply(Values.create())
    .apply(View.asList());

// Pass PCollectionView to SecondTx as a param
PCollection3 = PCollection2.apply(new SecondTx(PView));

*SecondTx:*
Inside SecondTx, I get the PView from the constructor (this.PView = PView)
and call a DoFn:

public PCollection<CustomObject> expand(
        PCollection<KV<String, KV<String, CustomObject>>> input) {
    input.apply(ParDo.of(new UpdateFn()).withSideInput("SideInput", PView));
    ...
}

// DoFn
class UpdateFn extends DoFn<Map<String, Map<String, Map<String, String>>>, CustomObject> {
    @ProcessElement
    public void processElement(
            @Element Map<String, Map<String, Map<String, String>>> input,
            OutputReceiver<CustomObject> out) {
        Log.of("UpdateFn " + input); // <-- this is the log statement that is not getting printed
        out.output(new CustomObject());
    }
}

-- 
Thanks,
Praveen K Viswanathan
