Hey Friends! Last ping and I'll move this over to a ticket. If anyone can provide any insight or advice, that would be helpful!
Thanks again. Best, Aaron Levin On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin <aaronle...@stripe.com> wrote: > Hey, > > Not sure how convo threading works on this list, so in case the folks CC'd > missed my other response, here's some more info: > > First, I appreciate everyone's help! Thank you! > > I wrote several wrappers to try and debug this, including one which is an > exact copy of `InputFormatSourceFunction` which also failed. They all > failed with the same error I detail above. I'll post two of them below. > They all extended `RichParallelSourceFunction` and, as far as I could tell, > were properly initialized (though I may have missed something!). > Additionally, for the two below, if I change `extends > RichParallelSourceFunction` to `extends InputFormatSourceFunction(...,...)`, > I no longer receive the exception. This is what led me to believe the > source of the issue was casting and how I found the line of code where the > stream graph is given the input format. > > Quick explanation of the wrappers: > 1. `WrappedInputFormat` does a basic wrap around > `InputFormatSourceFunction` and delegates all methods to the underlying > `InputFormatSourceFunction` > 2. `ClonedInputFormatSourceFunction` is a ~exact copy of the > `InputFormatSourceFunction` source. > 3. 
They're being used in a test which looks vaguely like: > `DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new > InputFormatSourceFunction[String](source, implicitly[TypeInformation[Str > ing]]))).javaStream).asScala.toSeq` > > class WrappedInputFormat[A]( > inputFormat: InputFormatSourceFunction[A] > )( > implicit typeInfo: TypeInformation[A] > ) extends RichParallelSourceFunction[A] { > > override def run(sourceContext: SourceFunction.SourceContext[A]): Unit > = { > inputFormat.run(sourceContext) > } > override def setRuntimeContext(t: RuntimeContext): Unit = { > inputFormat.setRuntimeContext(t) > } > override def equals(obj: scala.Any) = { > inputFormat.equals(obj) > } > override def hashCode() = { inputFormat.hashCode() } > override def toString = { inputFormat.toString } > override def getRuntimeContext(): RuntimeContext = { > inputFormat.getRuntimeContext } > override def getIterationRuntimeContext = { > inputFormat.getIterationRuntimeContext } > override def open(parameters: Configuration): Unit = { > inputFormat.open(parameters) > } > override def cancel(): Unit = { > inputFormat.cancel() > } > override def close(): Unit = { > inputFormat.close() > } > } > > And the other one: > > class ClonedInputFormatSourceFunction[A](val format: InputFormat[A, > InputSplit], val typeInfo: TypeInformation[A]) extends > RichParallelSourceFunction[A] { > > @transient private var provider: InputSplitProvider = _ > @transient private var serializer: TypeSerializer[A] = _ > @transient private var splitIterator: Iterator[InputSplit] = _ > private var isRunning: Boolean = _ > > override def open(parameters: Configuration): Unit = { > val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext] > if(format.isInstanceOf[RichInputFormat[_,_]]) { > format.asInstanceOf[RichInputFormat[_,_]].setRuntimeContext(context) > } > format.configure(parameters) > > provider = context.getInputSplitProvider > serializer = typeInfo.createSerializer(getR > 
untimeContext.getExecutionConfig) > splitIterator = getInputSplits() > isRunning = splitIterator.hasNext > } > > override def run(sourceContext: SourceFunction.SourceContext[A]): Unit > = { > if(isRunning && format.isInstanceOf[RichInputFormat[_,_]]) { > format.asInstanceOf[RichInputFormat[_,_]].openInputFormat() > } > > var nextElement: A = serializer.createInstance() > try { > while (isRunning) { > format.open(splitIterator.next()) > while (isRunning && !format.reachedEnd()) { > nextElement = format.nextRecord(nextElement) > if (nextElement != null) { > sourceContext.collect(nextElement) > } else { > break > } > format.close() > if (isRunning) { > isRunning = splitIterator.hasNext > } > } > } > } finally { > > format.close() > if (format.isInstanceOf[RichInputFormat[_,_]]) { > format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat() > } > isRunning = false > } > } > > override def cancel(): Unit = { > isRunning = false > } > > override def close(): Unit = { > format.close() > if(format.isInstanceOf[RichInputFormat[_,_]]) { > format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat() > } > } > > private def getInputSplits(): Iterator[InputSplit] = { > new Iterator[InputSplit] { > private var nextSplit: InputSplit = _ > private var exhausted: Boolean = _ > > override def hasNext: Boolean = { > if(exhausted) { return false } > if(nextSplit != null) { return true } > var split: InputSplit = null > > try { > split = provider.getNextInputSplit(get > RuntimeContext.getUserCodeClassLoader) > } catch { > case e: InputSplitProviderException => > throw new RuntimeException("No InputSplit Provider", e) > } > > if(split != null) { > nextSplit = split > true > } else { > exhausted = true > false > } > } > > override def next(): InputSplit = { > if(nextSplit == null && !hasNext) { > throw new NoSuchElementException() > } > val tmp: InputSplit = nextSplit > nextSplit = null > tmp > } > > } > } > } > > > On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz 
<dwysakow...@apache.org> > wrote: > >> Hi Aaron, >> >> Could you share the code of your custom function? >> >> I am also adding Aljoscha and Kostas to cc, who should be more helpful on >> that topic. >> >> Best, >> >> Dawid >> On 19/10/2018 20:06, Aaron Levin wrote: >> >> Hi, >> >> I'm writing a custom `SourceFunction` which wraps an underlying >> `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a >> stream (via `env.addSource` and a subsequent sink) I get errors related to >> the `InputSplitAssigner` not being initialized for a particular vertex ID. >> Full error here[1]. >> >> I believe the underlying error is related to this[0] call to `instanceof >> InputFormatSourceFunction`. >> >> *My questions*: >> >> 1. How can I wrap an `InputFormatSourceFunction` in a way that avoids this error? >> Am I missing a chunk of the API covering this? >> 2. Is the error I'm experiencing related to that casting call? If so, would >> y'all be open to a PR which adds an interface one can extend which will set >> the input format in the stream graph? Or is there a preferred way of >> achieving this? >> >> Thanks! >> >> Aaron Levin >> >> [0] https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480 >> [1] >> java.lang.RuntimeException: Could not retrieve next input split. 
>> at org.apache.flink.streaming.api.functions.source.InputFormatS >> ourceFunction$1.hasNext(InputFormatSourceFunction.java:157) >> at org.apache.flink.streaming.api.functions.source.InputFormatS >> ourceFunction.open(InputFormatSourceFunction.java:71) >> at REDACTED >> at org.apache.flink.api.common.functions.util.FunctionUtils.ope >> nFunction(FunctionUtils.java:36) >> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOp >> erator.open(AbstractUdfStreamOperator.java:102) >> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllO >> perators(StreamTask.java:424) >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke( >> StreamTask.java:290) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) >> at java.lang.Thread.run(Thread.java:748) >> Caused by: >> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: >> Requesting the next input split failed. >> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvi >> der.getNextInputSplit(RpcInputSplitProvider.java:69) >> at org.apache.flink.streaming.api.functions.source.InputFormatS >> ourceFunction$1.hasNext(InputFormatSourceFunction.java:155) >> ... 8 more >> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: >> No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2 >> at java.util.concurrent.CompletableFuture.reportGet(Completable >> Future.java:357) >> at java.util.concurrent.CompletableFuture.get(CompletableFuture >> .java:1915) >> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvi >> der.getNextInputSplit(RpcInputSplitProvider.java:61) >> ... 
9 more >> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID >> cbc357ccb763df2852fee8c4fc7d55f2 >> at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInpu >> tSplit(JobMaster.java:575) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce >> ssorImpl.java:62) >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe >> thodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvo >> cation(AkkaRpcActor.java:247) >> ... >> >> >