Hi Team,
I am running my Beam pipeline on the Flink runner and trying to consume a side
input whose contents are records read from RDS.
1. Side Input Generation:
PCollectionView<JdbcIO.Read<EventSet>> eventSet = pipeline
    .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5L)))
    .apply(
        Window.<Long>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
    .apply("Read from RDS", ParDo.of(new DataSource(runtimeConfig.getConfig("db"))))
    .apply(View.asSingleton());
2. Side Input Consumption:
pipeline.apply("Display Event",
    ParDo.of(new PrintRecord(eventSet)).withSideInputs(eventSet));
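3. Accessing records:
class PrintRecord extends DoFn<String, String> {
    // View handed over in step 2 via new PrintRecord(eventSet)
    private final PCollectionView<JdbcIO.Read<EventSet>> sideInput;

    PrintRecord(PCollectionView<JdbcIO.Read<EventSet>> sideInput) {
        this.sideInput = sideInput;
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
        JdbcIO.Read<EventSet> event = context.sideInput(sideInput);
        event. ? // can't read the event object here; EventSet is a POJO
    }
}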
Thanks,
Julius