Re: Testing a streaming data flow

2016-10-17 Thread Stefan Richter
Hi,

I think there are a few things that could help with testing your algorithm.
Off the top of my head, the first is that you could test in a more
"unit-testing" style, i.e. write small drivers that inject records into your
UDFs and check that the output is as expected.
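
For illustration, a minimal sketch of that style (DoubleEvens is just a
made-up stand-in for one of your UDFs, and the stub Collector simply records
the output in memory):

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

// Hypothetical UDF under test: keeps even numbers and doubles them.
class DoubleEvens extends FlatMapFunction[Int, Int] {
  override def flatMap(value: Int, out: Collector[Int]): Unit =
    if (value % 2 == 0) out.collect(value * 2)
}

object DoubleEvensTest {
  def main(args: Array[String]): Unit = {
    val collected = ListBuffer[Int]()
    // Stub collector that captures everything the UDF emits.
    val out = new Collector[Int] {
      override def collect(record: Int): Unit = collected += record
      override def close(): Unit = ()
    }
    val udf = new DoubleEvens
    (1 to 4).foreach(udf.flatMap(_, out))
    // No cluster, no timing: just assert on the captured output.
    assert(collected == ListBuffer(4, 8), s"unexpected output: $collected")
  }
}

This exercises the function as plain code, so no Flink cluster and no timing
are involved at all.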

Other than that, you should use event time instead of processing time to make
tests reproducible. With event time, your sources do not need to depend on any
external timing (such as system time), and there is no need to introduce
sleeps into your code. You can find examples of this in several places in
Flink's tests. In particular, if you are interested in simulating sessions,
you could take a look at
org.apache.flink.test.windowing.sessionwindows.SessionWindowITCase and reuse
some parts of it.
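
As a rough sketch of what a deterministic event-time test can look like
(timestamps and the watermark are hard-coded, so nothing depends on the wall
clock or on sleeps; the numbers are chosen to mirror your example):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time

object DeterministicEventTimeTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Windows are driven by the timestamps we attach, not by system time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source = new SourceFunction[Int] {
      override def run(ctx: SourceContext[Int]): Unit = {
        // Two bursts with fixed timestamps: 1..9 at t=0s, 10..19 at t=10s,
        // so they fall into two known 10-second windows.
        for (i <- 1 to 9) ctx.collectWithTimestamp(i, 0L)
        for (i <- 10 to 19) ctx.collectWithTimestamp(i, 10000L)
        // Advance the watermark past both windows so they fire.
        ctx.emitWatermark(new Watermark(20000L))
      }
      override def cancel(): Unit = ()
    }

    env.addSource(source)
      .map(x => (1, x))
      .keyBy(0)
      .timeWindow(Time.seconds(10))
      .sum(1)
      .print() // deterministically prints (1,45) and (1,145)

    env.execute("deterministic-event-time-test")
  }
}

Since the output is deterministic, you can assert on the expected sums
instead of printing them.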

Best,
Stefan


> On 17.10.2016, at 15:21, bart.kasterm...@kpn.com wrote:
> 
> What are the standard approaches for testing a streaming algorithm? I have
> been able to come up with the approach below, in which I
> 
> 1) create a data source that emits events in bunches with set times so that I
> know the events will be in the same window,
> 2) end the stream with a mapWithState where the state checks if the expected
> elements pass by in the expected order.
> 
> This does not seem like the most robust way to do this. Suggestions?
> 
> Best,
> Bart
> 
> import java.io.{FileWriter, StringWriter}
> import java.util.{Calendar, Date, Properties}
>
> import com.fasterxml.jackson.annotation.JsonIgnoreProperties
> import com.fasterxml.jackson.databind.ObjectMapper
> import com.fasterxml.jackson.module.scala.DefaultScalaModule
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> import org.apache.flink.streaming.api.functions.source.SourceFunction
> import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer09, FlinkKafkaProducer09}
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
> import org.slf4j.{Logger, LoggerFactory}
>
> import scala.math.max
> import scala.util.{Random, hashing}
> import java.time
>
>
> object SessionizePageviewsTT {
>
>   val logger: Logger = LoggerFactory.getLogger("SessionizePageviewsTT") // classOf doesn't work on an object
>
>   def get_now_ms(): Long = {
>     System.currentTimeMillis()
>   }
>
>   def main(args: Array[String]) {
>
>     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>
>     def sourceFF(scx: SourceContext[Int]): Unit = {
>       var cur = 1
>       var now: Long = get_now_ms()
>       while (cur < 31) {
>         // every 10 elements, wait 10 seconds and then burst the next bunch
>         if (cur % 10 == 0) {
>           Thread.sleep(10000)
>           now = get_now_ms()
>         }
>         println("emitting: " + cur + ", " + now)
>         scx.collectWithTimestamp(cur, now)
>         cur += 1
>       }
>     }
>
>     val x: DataStream[Int] = env.addSource(sourceFF _)
>
>     // expected window sums: 1+...+9 = 45 and 10+...+19 = 145
>     val vals = List(45, 145)
>
>     // stateful check that the expected sums pass by in the expected order
>     def checkFF(xy: (Int, Int), s: Option[Int]): ((Int, Int), Option[Int]) = {
>       val idx = s.getOrElse(0)
>       if (idx < vals.length) {
>         if (xy._1 == vals(idx)) {
>           println("all ok")
>         } else {
>           println("error error")
>         }
>       }
>
>       (xy, Some(idx + 1))
>     }
>
>     x.map(x => (x, 1))
>       .keyBy(1)
>       .timeWindow(Time.seconds(10))
>       .sum(0)
>       .keyBy(x => 1)
>       .mapWithState(checkFF)
>       .print
>
>     env.execute("SessionizePageviewsTT")
>   }
> }



Testing a streaming data flow

2016-10-17 Thread bart.kastermans
What are the standard approaches for testing a streaming algorithm? I have
been able to come up with the approach below, in which I

1) create a data source that emits events in bunches with set times so that I
know the events will be in the same window,
2) end the stream with a mapWithState where the state checks if the expected
elements pass by in the expected order.

This does not seem like the most robust way to do this. Suggestions?

Best,
Bart


import java.io.{FileWriter, StringWriter}
import java.util.{Calendar, Date, Properties}

import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer09, FlinkKafkaProducer09}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.slf4j.{Logger, LoggerFactory}

import scala.math.max
import scala.util.{Random, hashing}
import java.time


object SessionizePageviewsTT {

  val logger: Logger = LoggerFactory.getLogger("SessionizePageviewsTT") // classOf doesn't work on an object

  def get_now_ms(): Long = {
    System.currentTimeMillis()
  }

  def main(args: Array[String]) {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    def sourceFF(scx: SourceContext[Int]): Unit = {
      var cur = 1
      var now: Long = get_now_ms()
      while (cur < 31) {
        // every 10 elements, wait 10 seconds and then burst the next bunch
        if (cur % 10 == 0) {
          Thread.sleep(10000)
          now = get_now_ms()
        }
        println("emitting: " + cur + ", " + now)
        scx.collectWithTimestamp(cur, now)
        cur += 1
      }
    }

    val x: DataStream[Int] = env.addSource(sourceFF _)

    // expected window sums: 1+...+9 = 45 and 10+...+19 = 145
    val vals = List(45, 145)

    // stateful check that the expected sums pass by in the expected order
    def checkFF(xy: (Int, Int), s: Option[Int]): ((Int, Int), Option[Int]) = {
      val idx = s.getOrElse(0)
      if (idx < vals.length) {
        if (xy._1 == vals(idx)) {
          println("all ok")
        } else {
          println("error error")
        }
      }

      (xy, Some(idx + 1))
    }

    x.map(x => (x, 1))
      .keyBy(1)
      .timeWindow(Time.seconds(10))
      .sum(0)
      .keyBy(x => 1)
      .mapWithState(checkFF)
      .print

    env.execute("SessionizePageviewsTT")
  }
}