Hey everyone,

I have the following example that creates a windowed DStream, creates another window on top of the first, and then prints out the results. When I run the test back to back, some runs give the expected results, but in other runs I get no output at all from the second-level streams.

Is there a specific way to structure the windowed streams so that they always give the expected results? Or what would cause the second-level streams to be completely empty in some runs but not others? Right now it looks to me like some kind of timing bug.


package com.chris

import scala.collection.mutable.SynchronizedQueue

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{DStream, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.scalatest.FunSuite

class StreamingTestSuite extends FunSuite {

  // Window `root` and register a print action so each batch of the
  // windowed stream shows up on the console.
  def mkSubStream(name: String, root: DStream[Int], windowDuration: Int, slideInterval: Int): DStream[Int] = {
    val w = root.window(Seconds(windowDuration), Seconds(slideInterval))
    w.foreach(rdd => {
      println("New " + name)
      rdd.foreach(k => println("\t" + name + "\t" + k)) // runs on the worker, but prints locally in local mode
    })
    w
  }

  test("Test streaming") {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.WARN)

val sc = new StreamingContext("local", "Watcher", Seconds(1), null, null, null)
    val rddQueue = new SynchronizedQueue[RDD[Int]]()

    // Create the QueueInputDStream and use it to do some processing
    val inputStream = sc.queueStream(rddQueue)
    val w5Stream = mkSubStream("base", inputStream, 5, 5)

    // Create two new windowed streams based on the first and print their results.
    // NOTE: sometimes the correct values are printed, other times nothing at all;
    // I don't know why.
    mkSubStream("5+05", w5Stream, 5, 10)
    mkSubStream("10+05", w5Stream, 10, 10)

    sc.start()
    // Queue up 300 single-element RDDs (1, 2, ..., 300) all at once
    for (i <- 1 to 300) {
      rddQueue += sc.sparkContext.makeRDD(i to i, 1)
    }
    readLine() // keep the streaming context running until Enter is pressed
    sc.stop()
  }
}
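
For what it's worth, I queue all 300 RDDs up front, so everything is available in the very first batches. A paced version of the push loop would look like the sketch below (the Thread.sleep matching the Seconds(1) batch interval is my guess at the right pacing, and I believe queueStream's default oneAtATime = true only dequeues one RDD per batch anyway). Would pacing the pushes like this change anything?

    for (i <- 1 to 300) {
      rddQueue += sc.sparkContext.makeRDD(i to i, 1)
      Thread.sleep(1000) // push one RDD per 1-second batch interval
    }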

Thanks,
Chris Regnier
-------------------------
Visualization Developer
Oculus Info Inc.
