Hi!

To work around FLINK-2491 <https://issues.apache.org/jira/browse/FLINK-2491>,
which causes checkpointing issues for us, I am trying to chain SourceFunctions
so that the first one never quits. The basic idea is as follows:

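import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

class WrappingSourceFunction(innerSourceFunction: SourceFunction[Inner])
    extends SourceFunction[Outer] {

  override def run(outerCtx: SourceContext[Outer]): Unit = {
    // Emit the first source's elements.
    outerCtx.collect(...)

    // Hand over to the inner source through a context that delegates to outerCtx.
    val innerCtx: SourceContext[Inner] = new SourceContextWrapper(outerCtx)
    innerSourceFunction.run(innerCtx)
  }

  // Cancelling the outer source cancels the inner one.
  override def cancel(): Unit = innerSourceFunction.cancel()
}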

Is it OK to call run() of a different SourceFunction inside run(), and to
implement my own SourceContext that delegates to another one? It works in a
small test running in a local Flink environment, but I am wondering whether
this could cause any issues in production.
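
For illustration, here is a minimal sketch of what such a delegating context
could look like. It is not the exact code: the SourceContext trait below is a
simplified stand-in for Flink's interface that keeps only collect and
getCheckpointLock (the real interface has further members, such as the
timestamp and watermark methods, which would need the same delegation), and the
convert parameter is a hypothetical addition to show the Inner-to-Outer mapping.

```scala
// Simplified stand-in for Flink's SourceFunction.SourceContext.
// Assumption: only the two members needed for this sketch are modelled.
trait SourceContext[T] {
  def collect(element: T): Unit
  def getCheckpointLock: AnyRef
}

// Hypothetical wrapper: maps each Inner element to Outer and forwards it.
class SourceContextWrapper[Inner, Outer](
    outerCtx: SourceContext[Outer],
    convert: Inner => Outer // assumption: not part of the original snippet
) extends SourceContext[Inner] {

  override def collect(element: Inner): Unit =
    outerCtx.collect(convert(element))

  // Return the outer lock so both sources synchronize on the same object.
  override def getCheckpointLock: AnyRef = outerCtx.getCheckpointLock
}

object WrapperDemo {
  def main(args: Array[String]): Unit = {
    val lock = new Object
    val seen = scala.collection.mutable.Buffer[String]()

    // Recording context that plays the role of outerCtx.
    val outer = new SourceContext[String] {
      override def collect(element: String): Unit = { seen += element; () }
      override def getCheckpointLock: AnyRef = lock
    }

    val inner = new SourceContextWrapper[Int, String](outer, i => s"value-$i")
    inner.collect(1)

    assert(seen == Seq("value-1"))
    assert(inner.getCheckpointLock eq lock)
  }
}
```

The point of the sketch is that the wrapper hands out the outer context's
checkpoint lock rather than creating its own, so an inner source that
synchronizes on the lock uses the same object as the outer one.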

Thanks,

                Jochen
