Hi, I think it should be working. At least from the top of my head I do not see any reason why it shouldn't be working.
Just make sure that you are proxying all relevant methods, not only those defined in `SourceFunction`. For example `FlinkKafkaConsumer` is implementing/extending: `RichParallelSourceFunction`, `CheckpointListener`, `CheckpointedFunction` and `ResultTypeQueryable<T>`, so if you want to wrap `FlinkKafkaConsumer`, you would need to proxy all of those interfaces/calls from your `WrappingSourceFunction` to the `innerSourceFunction`. Best, Piotrek śr., 14 kwi 2021 o 11:36 Schneider, Jochen <j.schnei...@here.com> napisał(a): > Hi! > > To work around FLINK-2491 > <https://issues.apache.org/jira/browse/FLINK-2491> which causes > checkpointing issues for us I am trying to chain SourceFunctions so that > the first one never quits. The basic idea is as follows: > > class WrappingSourceFunction(innerSourceFunction: SourceFunction[Inner]) > extends SourceFunction[Outer] { > > override def run(outerCtx: SourceContext[Outer]): Unit = { > > outerCtx.collect(...) > > > > > > val innerCtx: SourceContext[Inner] = new SourceContextWrapper(outerCtx) > > innerSourceFunction.run(innerCtx) > > } > > > > override def cancel() = innerSourceFunction.cancel() > > } > > Is it ok to call run() of a different SourceFunction inside of run() and > implement my own SourceContext delegating to another one? It works for a > small test running on a local Flink environment, but I am wondering if > there could be any issues doing that on production. > > Thanks, > > Jochen >