[ https://issues.apache.org/jira/browse/FLINK-9149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chesnay Schepler closed FLINK-9149. ----------------------------------- Resolution: Duplicate > The creation of the ExecutionPlan fails when you combine a SideOutput with a > SplitStream. > ------------------------------------------------------------------------------------------ > > Key: FLINK-9149 > URL: https://issues.apache.org/jira/browse/FLINK-9149 > Project: Flink > Issue Type: Bug > Components: DataStream API > Affects Versions: 1.4.2 > Reporter: Fred Teunissen > Priority: Minor > > The creation of the ExecutionPlan fails when you combine a SideOutput with a > SplitStream. > Code: > {code:java} > import org.apache.flink.streaming.api.functions.ProcessFunction > import org.apache.flink.streaming.api.scala._ > import org.apache.flink.util.Collector > object SideOutputTest { > def main(args: Array[String]) { > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.setParallelism(3) > val inputLongStream = env.generateSequence(0L, 100L) > val filteredLongStream = inputLongStream.process(new LogFilterFunction) > val splittedLongStream = filteredLongStream.split(l => > Seq((l%4).toString)) > // workaround > // val splittedLongStream = filteredLongStream.map(x=>x).split(l => > Seq((l%4).toString)) > val oneStream = splittedLongStream.select("1").map(l => l+1000) > val twoStream = splittedLongStream.select("2").map(l => l+2000) > val threeStream = splittedLongStream.select("3").map(l => l+3000) > oneStream.union(twoStream, threeStream).print() > val loggingStream = filteredLongStream.getSideOutput(loggingOutputTag) > loggingStream.print() > println(env.getExecutionPlan) > env.execute() > } > val loggingOutputTag = OutputTag[String]("loggingStream") > } > class LogFilterFunction extends ProcessFunction[Long, Long] { > override def processElement(value: Long, ctx: ProcessFunction[Long, > Long]#Context, out: Collector[Long]): Unit = { > if (value % 4 == 0) { > ctx.output(SideOutputTest.loggingOutputTag, s"LogFilterFunction logging > for $value") > } else 
{ > out.collect(value) > } > } > } > {code} > Exception: > {noformat} > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.NullPointerException > at > org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:74) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:331) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:346) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at 
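java.lang.Thread.run(Thread.java:748) > {noformat} > Workaround: > insert a redundant identity *map(x=>x)* between the *process* and *split* calls, i.e. call *filteredLongStream.map(x=>x).split(...)* instead of *filteredLongStream.split(...)*. -- This message was sent by Atlassian JIRA (v7.6.3#76005)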