Hi,

I think it should be working. At least from the top of my head I do not see
any reason why it shouldn't be working.

Just make sure that you are proxying all relevant methods, not only those
defined in `SourceFunction`. For example `FlinkKafkaConsumer` is
implementing/extending: `RichParallelSourceFunction`,
`CheckpointListener`, `CheckpointedFunction` and  `ResultTypeQueryable<T>`,
so if you want to wrap `FlinkKafkaConsumer`, you would need to proxy all of
those interfaces/calls from your `WrappingSourceFunction` to the
`innerSourceFunction`.

Best,
Piotrek

śr., 14 kwi 2021 o 11:36 Schneider, Jochen <j.schnei...@here.com>
napisał(a):

> Hi!
>
> To work around FLINK-2491
> <https://issues.apache.org/jira/browse/FLINK-2491> which causes
> checkpointing issues for us I am trying to chain SourceFunctions so that
> the first one never quits. The basic idea is as follows:
>
> class WrappingSourceFunction(innerSourceFunction: SourceFunction[Inner]) 
> extends SourceFunction[Outer] {
>
>   override def run(outerCtx: SourceContext[Outer]): Unit = {
>
>     outerCtx.collect(...)
>
>
>
>
>
>     val innerCtx: SourceContext[Inner] = new SourceContextWrapper(outerCtx)
>
>     innerSourceFunction.run(innerCtx)
>
>   }
>
>
>
>   override def cancel() = innerSourceFunction.cancel()
>
> }
>
> Is it ok to call run() of a different SourceFunction inside of run() and
> implement my own SourceContext delegating to another one? It works for a
> small test running on a local Flink environment, but I am wondering if
> there could be any issues doing that on production.
>
> Thanks,
>
>                 Jochen
>

Reply via email to