Hey Friends! Last ping and I'll move this over to a ticket. If anyone can
provide any insight or advice, that would be helpful!

Thanks again.

Best,

Aaron Levin

On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin <aaronle...@stripe.com> wrote:

> Hey,
>
> Not sure how convo threading works on this list, so in case the folks CC'd
> missed my other response, here's some more info:
>
> First, I appreciate everyone's help! Thank you!
>
> I wrote several wrappers to try and debug this, including one which is an
> exact copy of `InputFormatSourceFunction` which also failed. They all
> failed with the same error I detail above. I'll post two of them below.
> They all extended `RichParallelSourceFunction` and, as far as I could tell,
> were properly initialized (though I may have missed something!).
> Additionally, for the two below, if I change `extends
> RichParallelSourceFunction` to `extends InputFormatSourceFunction(...,...)`,
> I no longer receive the exception. This is what led me to believe the
> source of the issue was casting and how I found the line of code where the
> stream graph is given the input format.
>
> Quick explanation of the wrappers:
> 1. `WrappedInputFormat` does a basic wrap around
> `InputFormatSourceFunction` and delegates all methods to the underlying
> `InputFormatSourceFunction`
> 2. `ClonedInputFormatSourceFunction` is a ~exact copy of the
> `InputFormatSourceFunction` source.
> 3. They're being used in a test which looks roughly like:
> `DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new InputFormatSourceFunction[String](source, implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`
>
> /**
>  * Source function that forwards every call (lifecycle, runtime-context
>  * access, equals/hashCode/toString) to a wrapped `InputFormatSourceFunction`.
>  *
>  * NOTE(review): per the discussion in this thread, the stream graph only
>  * registers the input format when the source passed to `env.addSource` is
>  * itself an `InputFormatSourceFunction` (an `instanceof` check), so this
>  * wrapper still fails at runtime with "No InputSplitAssigner for vertex ID ..."
>  * once the inner function requests its first split. Delegation alone cannot
>  * avoid that.
>  *
>  * @param inputFormat source function every call is delegated to
>  * @param typeInfo    type information for `A`; not read by this class
>  */
> class WrappedInputFormat[A](
>   private val inputFormat: InputFormatSourceFunction[A]
> )(
>   implicit typeInfo: TypeInformation[A]
> ) extends RichParallelSourceFunction[A] {
>
>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit =
>     inputFormat.run(sourceContext)
>
>   // The runtime context is stored on the inner function so that its own
>   // open()/run() see the context the runtime hands to this wrapper.
>   override def setRuntimeContext(t: RuntimeContext): Unit =
>     inputFormat.setRuntimeContext(t)
>   override def getRuntimeContext(): RuntimeContext =
>     inputFormat.getRuntimeContext
>   override def getIterationRuntimeContext =
>     inputFormat.getIterationRuntimeContext
>
>   // Two wrappers are equal when they wrap equal source functions. The former
>   // `inputFormat.equals(obj)` compared the inner function with the wrapper
>   // itself, so `w.equals(w)` was false unless the inner function's equals
>   // accepted a wrapper, which broke reflexivity.
>   override def equals(obj: Any): Boolean = obj match {
>     case other: WrappedInputFormat[_] => inputFormat == other.inputFormat
>     case _                            => false
>   }
>   override def hashCode(): Int = inputFormat.hashCode()
>   override def toString: String = inputFormat.toString
>
>   override def open(parameters: Configuration): Unit =
>     inputFormat.open(parameters)
>   override def cancel(): Unit = inputFormat.cancel()
>   override def close(): Unit = inputFormat.close()
> }
>
> And the other one:
>
> /**
>  * Scala port of Flink's `InputFormatSourceFunction`: pulls input splits from
>  * the runtime's `InputSplitProvider`, reads each one through `format`, and
>  * emits every non-null record to the source context.
>  *
>  * NOTE(review): per the discussion in this thread, because this class extends
>  * `RichParallelSourceFunction` rather than `InputFormatSourceFunction`, the
>  * stream graph does not register `format` for this vertex, and split requests
>  * fail with "No InputSplitAssigner for vertex ID ..." regardless of this body.
>  *
>  * @param format   input format used to open and read each split
>  * @param typeInfo type information used to create the record serializer
>  */
> class ClonedInputFormatSourceFunction[A](
>   val format: InputFormat[A, InputSplit],
>   val typeInfo: TypeInformation[A]
> ) extends RichParallelSourceFunction[A] {
>
>   @transient private var provider: InputSplitProvider = _
>   @transient private var serializer: TypeSerializer[A] = _
>   @transient private var splitIterator: Iterator[InputSplit] = _
>   // Written by cancel() and read by the run() loop, which may execute on
>   // different threads; @volatile makes a cancellation visible to the loop.
>   @volatile private var isRunning: Boolean = _
>
>   /**
>    * Gives a rich format the runtime context, configures the format, and
>    * probes for a first split so `isRunning` is false when there is none.
>    */
>   override def open(parameters: Configuration): Unit = {
>     val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
>     richFormat.foreach(_.setRuntimeContext(context))
>     format.configure(parameters)
>
>     provider = context.getInputSplitProvider
>     serializer = typeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
>     splitIterator = getInputSplits()
>     isRunning = splitIterator.hasNext
>   }
>
>   /**
>    * Reads splits one after another until they run out or cancel() is called.
>    * The format is always closed (and a rich format's input format closed)
>    * on exit, including on failure.
>    */
>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
>     try {
>       if (isRunning) {
>         richFormat.foreach(_.openInputFormat())
>       }
>
>       var nextElement: A = serializer.createInstance()
>       while (isRunning) {
>         format.open(splitIterator.next())
>
>         // Drain the current split. A null record ends the split early; Scala
>         // has no built-in `break`, so `keepReading` plays that role.
>         var keepReading = true
>         while (isRunning && keepReading && !format.reachedEnd()) {
>           nextElement = format.nextRecord(nextElement)
>           if (nextElement != null) {
>             sourceContext.collect(nextElement)
>           } else {
>             keepReading = false
>           }
>         }
>
>         // Close the split only once it has been fully read, then advance.
>         // (Previously both happened after every record, inside the loop.)
>         format.close()
>         if (isRunning) {
>           isRunning = splitIterator.hasNext
>         }
>       }
>     } finally {
>       format.close()
>       richFormat.foreach(_.closeInputFormat())
>       isRunning = false
>     }
>   }
>
>   /** Signals the run() loop to stop after the current record. */
>   override def cancel(): Unit = {
>     isRunning = false
>   }
>
>   override def close(): Unit = {
>     format.close()
>     richFormat.foreach(_.closeInputFormat())
>   }
>
>   /** `format` viewed as a `RichInputFormat`, when it is one. */
>   private def richFormat: Option[RichInputFormat[_, _]] = format match {
>     case rich: RichInputFormat[_, _] => Some(rich)
>     case _                           => None
>   }
>
>   /**
>    * Iterator over the splits handed out by `provider`. `hasNext` requests
>    * the next split and caches it until `next()` consumes it; once the
>    * provider returns null the iterator stays exhausted.
>    *
>    * @throws RuntimeException wrapping an `InputSplitProviderException`
>    *                          when requesting a split fails
>    */
>   private def getInputSplits(): Iterator[InputSplit] = {
>     new Iterator[InputSplit] {
>       private var nextSplit: InputSplit = _
>       private var exhausted: Boolean = false
>
>       override def hasNext: Boolean = {
>         if (exhausted) {
>           false
>         } else if (nextSplit != null) {
>           true
>         } else {
>           val split: InputSplit =
>             try {
>               provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
>             } catch {
>               case e: InputSplitProviderException =>
>                 throw new RuntimeException("Could not retrieve next input split.", e)
>             }
>
>           if (split != null) {
>             nextSplit = split
>             true
>           } else {
>             exhausted = true
>             false
>           }
>         }
>       }
>
>       override def next(): InputSplit = {
>         if (nextSplit == null && !hasNext) {
>           throw new NoSuchElementException()
>         }
>         val current: InputSplit = nextSplit
>         nextSplit = null
>         current
>       }
>     }
>   }
> }
>
>
> On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi Aaron,
>>
>> Could you share the code of your custom function?
>>
>> I am also adding Aljosha and Kostas to cc, who should be more helpful on
>> that topic.
>>
>> Best,
>>
>> Dawid
>> On 19/10/2018 20:06, Aaron Levin wrote:
>>
>> Hi,
>>
>> I'm writing a custom `SourceFunction` which wraps an underlying
>> `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a
>> stream (via `env.addSource` and a subsequent sink) I get errors related to
>> the `InputSplitAssigner` not being initialized for a particular vertex ID.
>> Full error here[1].
>>
>> I believe the underlying error is related to this[0] call to `instanceof
>> InputFormatSourceFunction`.
>>
>> *My questions*:
>>
>> 1. How can I wrap an `InputFormatSourceFunction` in a way that avoids this
>> error? Am I missing a chunk of the API covering this?
>> 2. Is the error I'm experiencing related to that `instanceof` check? If so,
>> would y'all be open to a PR which adds an interface one can extend which will
>> set the input format in the stream graph? Or is there a preferred way of
>> achieving this?
>>
>> Thanks!
>>
>> Aaron Levin
>>
>> [0] https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
>> [1]
>> java.lang.RuntimeException: Could not retrieve next input split.
>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
>>     at REDACTED
>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
>>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
>>     ... 8 more
>> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>>     ... 9 more
>> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>>     at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>> ...
>>
>>
>

Reply via email to