What are the standard approaches for testing a streaming algorithm? So far I have
come up with the approach below, in which I:

1) create a data source that emits events in bursts with fixed timestamps, so that I
know which events will fall into the same window, and
2) end the stream with a `mapWithState` whose state checks that the expected
elements arrive in the expected order.

This does not seem like the most robust way to do this. Any suggestions?


import java.io.{FileWriter, StringWriter}
import java.util.{Calendar, Date, Properties}

import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer09, 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.slf4j.{Logger, LoggerFactory}

import scala.math.max
import scala.util.{Random, hashing}
import java.time

object SessionizePageviewsTT {

  /** Logger for this job, looked up by an explicit name because `classOf` doesn't work on an object. */
  val logger: Logger = LoggerFactory.getLogger("SessionizePageviewsTT")

  def get_now_ms(): Long = {

  def main(args: Array[String]) {

    val env: StreamExecutionEnvironment = 

    val xs = 1 to 30
    def sourceFF(scx: SourceContext[Int]): Unit = {
      var cur = 1
      var now: Long = get_now_ms()
      while (cur < 31) {
        // every 10 wait 10 seconds and then burst a bunch
        if (cur % 10 == 0) {
          now = get_now_ms()
        println("emiting: " + cur + ", " + now)
        scx.collectWithTimestamp(cur, now)
        cur += 1

    val x: DataStream[Int] = env.addSource(sourceFF _)

    val vals = List(45, 145);

    def checkFF(xy: (Int, Int), s: Option[Int]): ((Int, Int), Option[Int]) = {
      val idx = if (s.isDefined) s.get else 0
      if (idx < vals.length) {
        if (xy._1 == vals(idx)) {
          println("all ok")
        } else {
          println("error error")

      (xy, Some(idx + 1))

    x.map(x => (x,1)).keyBy(1).timeWindow(Time.seconds(10)).sum(0).keyBy(x => 


