[ https://issues.apache.org/jira/browse/FLINK-9149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chesnay Schepler closed FLINK-9149.
-----------------------------------
    Resolution: Duplicate

> The creation of the ExecutionPlan fails when you combine a SideOutput with a
> SplitStream.
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9149
>                 URL: https://issues.apache.org/jira/browse/FLINK-9149
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.2
>            Reporter: Fred Teunissen
>            Priority: Minor
>
> The creation of the ExecutionPlan fails when you combine a SideOutput with a
> SplitStream.
> Code:
> {code:java}
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.util.Collector
> object SideOutputTest {
>   def main(args: Array[String]) {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setParallelism(3)
>     val inputLongStream = env.generateSequence(0L, 100L)
>     val filteredLongStream = inputLongStream.process(new LogFilterFunction)
>     val splittedLongStream = filteredLongStream.split(l => Seq((l % 4).toString))
>     // workaround
>     // val splittedLongStream = filteredLongStream.map(x => x).split(l => Seq((l % 4).toString))
>     val oneStream = splittedLongStream.select("1").map(l => l + 1000)
>     val twoStream = splittedLongStream.select("2").map(l => l + 2000)
>     val threeStream = splittedLongStream.select("3").map(l => l + 3000)
>     oneStream.union(twoStream, threeStream).print()
>     val loggingStream = filteredLongStream.getSideOutput(loggingOutputTag)
>     loggingStream.print()
>     println(env.getExecutionPlan)
>     env.execute()
>   }
>   val loggingOutputTag = OutputTag[String]("loggingStream")
> }
> class LogFilterFunction extends ProcessFunction[Long, Long] {
>   override def processElement(value: Long, ctx: ProcessFunction[Long, Long]#Context, out: Collector[Long]): Unit = {
>     if (value % 4 == 0) {
>       ctx.output(SideOutputTest.loggingOutputTag, s"LogFilterFunction logging for $value")
>     } else {
>       out.collect(value)
>     }
>   }
> }
> {code}
> Exception:
> {noformat}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException
> at org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:74)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:331)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:346)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> {noformat}
> Workaround:
> add a redundant *map(x => x)* before the *split* call, as sketched below.
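> For reference, a minimal sketch of the workaround applied to the reproducer above (same LogFilterFunction and loggingOutputTag as in the code block; the identity map is the only change):
> {code:java}
> // Workaround sketch: insert a redundant identity map between the process
> // function (which emits the side output) and split, as in the commented-out
> // line of the reproducer above.
> val filteredLongStream = inputLongStream.process(new LogFilterFunction)
> val splittedLongStream = filteredLongStream
>   .map(x => x)                          // redundant identity map
>   .split(l => Seq((l % 4).toString))
> // the side output is still read from the original process stream
> val loggingStream = filteredLongStream.getSideOutput(loggingOutputTag)
> {code}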



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
