Hey,

First, I appreciate everyone's help! Thank you!

I wrote several wrappers to try to debug this, including one that is an
exact copy of `InputFormatSourceFunction`, and that one failed too. They all
failed with the same error I detailed in my original message (quoted below).
I'll post two of them below.
They all extended `RichParallelSourceFunction` and, as far as I could tell,
were properly initialized (though I may have missed something!).
Additionally, for the two below, if I change `extends
RichParallelSourceFunction` to `extends
InputFormatSourceFunction(...,...)`, I no longer receive the exception.
This is what led me to believe the source of the issue was casting and how
I found the line of code where the stream graph is given the input format.

Quick explanation of the wrappers:
1. `WrappedInputFormat` does a basic wrap around
`InputFormatSourceFunction` and delegates all methods to the underlying
`InputFormatSourceFunction`
2. `ClonedInputFormatSourceFunction` is a near-exact copy of the
`InputFormatSourceFunction` source.
3. They're being used in a test which looks vaguely like:
`DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new
InputFormatSourceFunction[String](source,
implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`

/**
 * Thin delegating wrapper around an `InputFormatSourceFunction`.
 *
 * Every source and lifecycle callback (`run`, `cancel`, `open`, `close`, and the
 * runtime-context accessors) is forwarded unchanged to the wrapped function, so
 * the wrapper adds no behavior of its own.
 *
 * NOTE(review): per the surrounding discussion, the stream graph appears to
 * register an input format only when the source itself is an
 * `InputFormatSourceFunction`; this class extends `RichParallelSourceFunction`
 * and would therefore not be recognized by such a check -- confirm against the
 * Flink version in use.
 *
 * @param inputFormat the wrapped source function that performs the actual work
 * @param typeInfo    type information for `A`; not used by the wrapper itself
 * @tparam A type of the elements emitted by the source
 */
class WrappedInputFormat[A](
  private val inputFormat: InputFormatSourceFunction[A]
)(
  implicit typeInfo: TypeInformation[A]
) extends RichParallelSourceFunction[A] {

  /** Forwards to the wrapped function, which emits the records. */
  override def run(sourceContext: SourceFunction.SourceContext[A]): Unit =
    inputFormat.run(sourceContext)

  /** Stores the runtime context on the wrapped function only. */
  override def setRuntimeContext(t: RuntimeContext): Unit =
    inputFormat.setRuntimeContext(t)

  /**
   * Two wrappers are equal when the functions they wrap are equal.
   *
   * Forwarding `equals` straight to the wrapped function would compare that
   * function with the wrapper itself, so `w.equals(w)` would be false whenever
   * the wrapped function uses identity equality; comparing wrapped-to-wrapped
   * keeps the relation reflexive and symmetric.
   */
  override def equals(obj: scala.Any): Boolean = obj match {
    case that: WrappedInputFormat[_] => inputFormat == that.inputFormat
    case _                           => false
  }

  /** Consistent with `equals`: equal wrapped functions give equal hash codes. */
  override def hashCode(): Int = inputFormat.hashCode()

  override def toString: String = inputFormat.toString

  /** Returns the context held by the wrapped function (see `setRuntimeContext`). */
  override def getRuntimeContext(): RuntimeContext = inputFormat.getRuntimeContext

  override def getIterationRuntimeContext = inputFormat.getIterationRuntimeContext

  /** Forwards to the wrapped function's `open`. */
  override def open(parameters: Configuration): Unit =
    inputFormat.open(parameters)

  /** Forwards the cancellation request to the wrapped function. */
  override def cancel(): Unit =
    inputFormat.cancel()

  /** Forwards to the wrapped function's `close`. */
  override def close(): Unit =
    inputFormat.close()
}

And the other one:

/**
 * Parallel source that reads every input split handed out by the runtime's
 * `InputSplitProvider` through the given `InputFormat` and emits the records.
 *
 * Life cycle as implemented here: `open` configures the format and fetches the
 * split provider, `run` iterates over the splits and records, `cancel` asks the
 * loop to stop, and `close` releases the format.
 *
 * @param format   input format used to open splits and read records
 * @param typeInfo type information used to create the serializer for `A`
 * @tparam A type of the emitted records
 */
class ClonedInputFormatSourceFunction[A](
  val format: InputFormat[A, InputSplit],
  val typeInfo: TypeInformation[A]
) extends RichParallelSourceFunction[A] {

  @transient private var provider: InputSplitProvider = _
  @transient private var serializer: TypeSerializer[A] = _
  @transient private var splitIterator: Iterator[InputSplit] = _

  // Written by cancel() and read by the loop in run(). cancel() is presumably
  // invoked from a different thread than run(), so the flag is volatile to make
  // the stop request visible to the reading loop.
  @volatile private var isRunning: Boolean = _

  /**
   * Configures the format, obtains the split provider and serializer, and
   * decides whether there is anything to read at all.
   *
   * @param parameters configuration passed on to `format.configure`
   */
  override def open(parameters: Configuration): Unit = {
    val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
    format match {
      case rich: RichInputFormat[_, _] => rich.setRuntimeContext(context)
      case _                           => // plain formats take no runtime context
    }
    format.configure(parameters)

    provider = context.getInputSplitProvider
    serializer = typeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
    splitIterator = getInputSplits()
    isRunning = splitIterator.hasNext
  }

  /**
   * Reads all splits, emitting each non-null record to `sourceContext`.
   *
   * For every split the format is opened, drained until it reports the end
   * (or returns a null record, or the source is cancelled), and then closed
   * exactly once before the next split is requested.
   *
   * @param sourceContext context used to emit the records
   */
  override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
    if (isRunning) {
      format match {
        case rich: RichInputFormat[_, _] => rich.openInputFormat()
        case _                           =>
      }
    }

    var nextElement: A = serializer.createInstance()
    try {
      while (isRunning) {
        format.open(splitIterator.next())

        // Scala has no `break` statement; a flag ends the per-split loop when
        // the format hands back a null record.
        var splitDrained = false
        while (isRunning && !splitDrained && !format.reachedEnd()) {
          nextElement = format.nextRecord(nextElement)
          if (nextElement != null) {
            sourceContext.collect(nextElement)
          } else {
            splitDrained = true
          }
        }

        // Close once per split, after all of its records were read, and only
        // then ask for the next split.
        format.close()
        if (isRunning) {
          isRunning = splitIterator.hasNext
        }
      }
    } finally {
      closeFormat()
      isRunning = false
    }
  }

  /** Requests the loop in `run` to stop after the current record. */
  override def cancel(): Unit = {
    isRunning = false
  }

  /** Closes the format and, for rich formats, the format-wide resources. */
  override def close(): Unit = closeFormat()

  /** Shared tear-down used by both `run`'s `finally` block and `close`. */
  private def closeFormat(): Unit = {
    format.close()
    format match {
      case rich: RichInputFormat[_, _] => rich.closeInputFormat()
      case _                           =>
    }
  }

  /**
   * Builds an iterator that pulls splits from `provider` one at a time.
   *
   * `hasNext` fetches and buffers the next split; once the provider returns
   * null the iterator is marked exhausted and never asks again.
   */
  private def getInputSplits(): Iterator[InputSplit] = {
    new Iterator[InputSplit] {
      private var nextSplit: InputSplit = _
      private var exhausted: Boolean = _

      override def hasNext: Boolean = {
        if (exhausted) {
          false
        } else if (nextSplit != null) {
          true
        } else {
          val split: InputSplit =
            try {
              provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
            } catch {
              case e: InputSplitProviderException =>
                throw new RuntimeException("No InputSplit Provider", e)
            }

          if (split != null) {
            nextSplit = split
            true
          } else {
            exhausted = true
            false
          }
        }
      }

      override def next(): InputSplit = {
        if (nextSplit == null && !hasNext) {
          throw new NoSuchElementException()
        }
        // Hand out the buffered split and clear the buffer so the following
        // hasNext call fetches a fresh one.
        val tmp: InputSplit = nextSplit
        nextSplit = null
        tmp
      }
    }
  }
}

Best,

Aaron Levin

On Wed, Oct 24, 2018 at 8:00 AM, Kien Truong <duckientru...@gmail.com>
wrote:

> Hi,
>
> Since InputFormatSourceFunction is a subclass of
> RichParallelSourceFunction, your wrapper should also extend this class.
>
> In addition, remember to override the methods defined in the
> AbstractRichFunction interface and proxy the calls to the underlying
> InputFormatSourceFunction, in order to initialize the underlying source
> correctly.
>
>
> Best regards,
>
> Kien
>
>
> On 10/20/2018 1:06 AM, Aaron Levin wrote:
>
> Hi,
>
> I'm writing a custom `SourceFunction` which wraps an underlying
> `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a
> stream (via `env.addSource` and a subsequent sink) I get errors related to
> the `InputSplitAssigner` not being initialized for a particular vertex ID.
> Full error here[1].
>
> I believe the underlying error is related to this[0] call to `instanceof
> InputFormatSourceFunction`.
>
> *My questions*:
>
> 1. how can I wrap an `InputFormatSourceFunction` in a way that avoids this
> error? Am I missing a chunk of the API covering this?
> 2. is the error I'm experiencing related to that casting call? If so, would
> y'all be open to a PR which adds an interface one can extend which will set
> the input format in the stream graph? Or is there a preferred way of
> achieving this?
>
> Thanks!
>
> Aaron Levin
>
> [0] https://github.com/apache/flink/blob/release-1.6/flink-
> streaming-java/src/main/java/org/apache/flink/streaming/api/graph/
> StreamGraphGenerator.java#L480
> [1]
> java.lang.RuntimeException: Could not retrieve next input split.
>     at org.apache.flink.streaming.api.functions.source.
> InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
>     at org.apache.flink.streaming.api.functions.source.
> InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
>     at REDACTED
>     at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:424)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:290)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
> Requesting the next input split failed.
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.
> getNextInputSplit(RpcInputSplitProvider.java:69)
>     at org.apache.flink.streaming.api.functions.source.
> InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
>     ... 8 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception:
> No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>     at java.util.concurrent.CompletableFuture.reportGet(
> CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(
> CompletableFuture.java:1915)
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.
> getNextInputSplit(RpcInputSplitProvider.java:61)
>     ... 9 more
> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID
> cbc357ccb763df2852fee8c4fc7d55f2
>     at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(
> JobMaster.java:575)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
> AkkaRpcActor.java:247)
> ...
>
>

Reply via email to