Hi Team,


I am running my Beam pipeline on the Flink runner and trying to consume a side 
input whose records come from RDS.



1. Side Input Generation :



PCollectionView<JdbcIO.Read<EventSet>> eventSet = pipeline
       .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5L)))
       .apply(
           Window.<Long>into(new GlobalWindows())
               .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
               .discardingFiredPanes())
       .apply("Read from RDS", ParDo.of(new DataSource(runtimeConfig.getConfig("db"))))
       .apply(View.asSingleton());
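
A possible cause, not confirmed by anything above: the view is typed as JdbcIO.Read<EventSet>, and JdbcIO.Read is a transform that describes a read rather than a record, so it exposes no POJO fields. The sketch below assumes the "Read from RDS" DoFn can run its query with plain JDBC and emit the rows themselves. The EventSet stand-in, its "id" and "name" fields, and the EventSetRows helper are hypothetical names used only for illustration.

```java
import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Stand-in for the EventSet POJO; the fields "id" and "name" are hypothetical.
// Serializable so the records can be shipped between workers.
class EventSet implements Serializable {
  private final long id;
  private final String name;

  EventSet(long id, String name) {
    this.id = id;
    this.name = name;
  }

  long getId() { return id; }
  String getName() { return name; }
}

// Hypothetical helper that the "Read from RDS" DoFn could call after running its query.
final class EventSetRows {
  private EventSetRows() {}

  // Copies every remaining row into a POJO. The caller owns and closes the ResultSet.
  static List<EventSet> fromResultSet(ResultSet rs) {
    List<EventSet> rows = new ArrayList<>();
    try {
      while (rs.next()) {
        rows.add(new EventSet(rs.getLong("id"), rs.getString("name")));
      }
    } catch (SQLException e) {
      // Rethrow unchecked so callers do not need a throws clause.
      throw new IllegalStateException("Could not read EventSet rows", e);
    }
    return rows;
  }
}
```

If the DoFn emits one List<EventSet> per firing, the view would be declared as PCollectionView<List<EventSet>>, and View.asSingleton() would hand the consumer the whole list.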


2. Side Input Consumption :



        pipeline.apply("Display Event", ParDo.of(new PrintRecord(eventSet)).withSideInputs(eventSet))



3. Accessing records :



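class PrintRecord extends DoFn<String, String> {

  // Side input view, passed in via new PrintRecord(eventSet)
  private final PCollectionView<JdbcIO.Read<EventSet>> sideInput;

  PrintRecord(PCollectionView<JdbcIO.Read<EventSet>> sideInput) {
    this.sideInput = sideInput;
  }

  @ProcessElement
  public void processElement(ProcessContext context) {
    JdbcIO.Read<EventSet> event = context.sideInput(sideInput);

    event. ?  // can’t read event object here, it’s a POJO
  }
}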
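
For comparison, here is a minimal sketch of what the access could look like if the side input held the records themselves, for example as a PCollectionView<List<EventSet>>, instead of a JdbcIO.Read<EventSet>. It is plain Java so it can be run without a pipeline. The EventSet stand-in, its "name" field, and the PrintRecordLogic helper are hypothetical.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Stand-in for the EventSet POJO; the "name" field is hypothetical.
class EventSet implements Serializable {
  private final String name;

  EventSet(String name) {
    this.name = name;
  }

  String getName() { return name; }
}

// Hypothetical helper holding the per-element logic of PrintRecord.
final class PrintRecordLogic {
  private PrintRecordLogic() {}

  // Pairs the main-input element with the name of each side-input record.
  // Inside processElement this could be called as
  //   lines(context.element(), context.sideInput(sideInput))
  // assuming sideInput is a PCollectionView<List<EventSet>>.
  static List<String> lines(String element, List<EventSet> events) {
    List<String> out = new ArrayList<>();
    for (EventSet event : events) {
      out.add(element + " -> " + event.getName());
    }
    return out;
  }
}
```

Keeping the formatting in a static helper is only a convenience for testing; the same loop could sit directly in processElement and pass each line to context.output.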





Thanks,

Julius




