Hi all,

I was trying to implement a join similar to what was laid out in the Flink
Forward talk Joining Infinity: Windowless Stream Processing with
Flink<https://www.youtube.com/watch?v=UrRQYzux5L0&feature=youtu.be&list=PLDX4T_cnKjD2E_lSDcxOXED59GvXHVKR->
and I have been running into some issues. I am running on 1.2-SNAPSHOT compiled
for Scala 2.11 and suspect this may be a regression, so I am including the dev
mailing list. When initializing the value state I receive a
NullPointerException:

java.lang.NullPointerException
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:109)
        at com.mediamath.reporting.streaming.FlatMapper$.open(StreamingPipeline.scala:21)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:154)
        at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:49)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:368)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:593)
        at java.lang.Thread.run(Thread.java:745)


Below is a minimum failing example:

import org.apache.flink.api.scala._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {

  val buffer = ArrayBuffer.empty[Long]

  @transient var state: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    state = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("state-descriptor", classOf[String], ""))
  }

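  // Second input ("build" side): store the latest string in keyed state.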
  override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = {
    state.update(value)
  }

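  // First input ("probe" side): buffer elements until the state is populated,
  // then flush the buffer as join results.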
  override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = {
    buffer += value

    if (state.value() != "") {
      for (elem ← buffer) {
        out.collect((elem, state.value()))
      }

      buffer.clear()
    }
  }
}

object StreamingPipeline {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(30 * 1000, CheckpointingMode.EXACTLY_ONCE)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(new MemoryStateBackend())

    val pipeline1 = env.generateSequence(0, 1000)

    val pipeline2 = env.fromElements("even", "odd")

    pipeline1.connect(pipeline2)
      .keyBy(
        elem ⇒ elem % 2 == 0,
        elem ⇒ elem == "even"
      ).flatMap(FlatMapper)
      .print()

    env.execute("Example")
  }
}
I also attempted retrieving the state each time I needed it:

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

object FlatMapper extends RichCoFlatMapFunction[Long, String, (Long, String)] {
  val descriptor = new ValueStateDescriptor[String](
    "state-descriptor", classOf[String], "")

  val buffer = ArrayBuffer.empty[Long]

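  // Same join logic as above, but the state handle is fetched on every call
  // instead of being cached in open().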
  override def flatMap2(value: String, out: Collector[(Long, String)]): Unit = {
    getRuntimeContext.getState(descriptor).update(value)
  }

  override def flatMap1(value: Long, out: Collector[(Long, String)]): Unit = {
    buffer += value

    val state = getRuntimeContext.getState(descriptor)

    if (state.value() != "") {
      for (elem ← buffer) {
        out.collect((elem, state.value()))
      }

      buffer.clear()
    }
  }
}

object StreamingPipeline {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(30 * 1000, CheckpointingMode.EXACTLY_ONCE)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(new MemoryStateBackend())

    val pipeline1 = env.generateSequence(0, 1000)

    val pipeline2 = env.fromElements("even", "odd")

    pipeline1.connect(pipeline2)
      .keyBy(
        elem ⇒ elem % 2 == 0,
        elem ⇒ elem == "even"
      ).flatMap(FlatMapper)
      .print()

    env.execute("Example")
  }

}

but this results in this precondition failing<https://github.com/apache/flink/blob/6f0faf9bb35e7cac3a38ed792cdabd6400fc4c79/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java#L88> on updates.
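
For reference, here is a minimal sketch of the keyed ValueState pattern I am
following, shown on a plain (non-connected) keyed stream; the class name, state
name, and input data below are illustrative only:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

// Illustrative only: emits each element together with the previously seen
// element for the same key, stored in keyed ValueState.
class LastSeen extends RichFlatMapFunction[String, (String, String)] {

  @transient private var last: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    // Same descriptor-based initialization as FlatMapper.open above.
    last = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("last-seen", classOf[String], ""))
  }

  override def flatMap(value: String, out: Collector[(String, String)]): Unit = {
    out.collect((value, last.value()))
    last.update(value)
  }
}

object LastSeenPipeline {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements("even", "odd", "even", "odd")
      .keyBy(elem ⇒ elem)
      .flatMap(new LastSeen)
      .print()

    env.execute("LastSeen example")
  }
}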

Seth Wiesman
