Hey everyone,
I have the following example that creates a windowed DStream, creates a
second window on top of the first, and then prints out the results.
Running the test back to back, some runs produce the expected output,
but in others the second-level streams print nothing at all. Is there a
specific way to structure nested windowed streams so that they always
give the expected results? And what would cause the second-level
streams to be completely empty on some runs but not others? It looks to
me like some kind of timing bug.
package com.chris
import org.apache.spark.SparkContext
import org.apache.spark.streaming.DStream
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.Seconds
import org.apache.log4j.Logger
import org.apache.log4j.Level
import scala.collection.mutable.SynchronizedQueue
import org.apache.spark.rdd.RDD
import org.scalatest.FunSuite
class StreamingTestSuite extends FunSuite {
  // Helper: window the given stream at the requested duration/slide (in
  // seconds) and print the contents of each windowed RDD as it arrives.
  def mkSubStream(name: String, root: DStream[Int],
                  windowDuration: Int, slideInterval: Int): DStream[Int] = {
    val w = root.window(Seconds(windowDuration), Seconds(slideInterval))
    w.foreach(rdd => {
      println("New " + name)
      rdd.foreach(k => println("\t" + name + "\t" + k))
    })
    w
  }
test("Test streaming") {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.WARN)
val sc = new StreamingContext("local", "Watcher", Seconds(1), null,
null, null)
val rddQueue = new SynchronizedQueue[RDD[Int]]()
// Create the QueueInputDStream and use it do some processing
val inputStream = sc.queueStream(rddQueue)
val w5Stream = mkSubStream("base", inputStream, 5, 5)
//create 2 new windowed streams based on the other, and print out
the results
//NOTE: sometimes the correct values are printed, other times
nothing is printed... don't know why?
mkSubStream("5+05", w5Stream, 5, 10)
mkSubStream("10+05", w5Stream, 10, 10)
sc.start()
for (i <- 1 to 300) {
rddQueue += sc.sparkContext.makeRDD(i to i, 1)
}
readLine()
sc.stop()
}
}
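As far as I can tell the durations themselves are legal: the second-level
windows use window/slide durations of 5 and 10 seconds, both multiples of
the base window's 5-second slide, and Spark throws an exception when a
window duration isn't a multiple of its parent's slide duration. So my
best guess is still timing. For reference, here's the kind of variant
I've been sketching to test that theory: the same nested-window
structure, but with the queue pushes paced at the batch interval instead
of enqueued in a tight loop, and with collect() before printing so the
output definitely comes from the driver thread. The pacing, the
collect(), and the smaller push count are just my guesses at variables
worth controlling, not a confirmed fix (same imports as above):

  test("Test streaming, paced pushes") {
    val sc = new StreamingContext("local", "Watcher", Seconds(1), null, null, null)
    val rddQueue = new SynchronizedQueue[RDD[Int]]()
    val inputStream = sc.queueStream(rddQueue)

    // Same structure as above: a 5s window on the input, then a second
    // window built on top of it.
    val base = inputStream.window(Seconds(5), Seconds(5))
    base.foreach(rdd => println("base: " + rdd.collect().mkString(",")))

    val second = base.window(Seconds(10), Seconds(10))
    second.foreach(rdd => println("second: " + rdd.collect().mkString(",")))

    sc.start()
    for (i <- 1 to 60) {
      rddQueue += sc.sparkContext.makeRDD(Seq(i), 1)
      Thread.sleep(1000) // pace pushes at the 1-second batch interval (my guess, not a confirmed fix)
    }
    sc.stop()
  }

Does pacing the pushes like that sound like a reasonable way to drive
queueStream in a test, or is there a recommended pattern for this?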
Thanks,
Chris Regnier
-------------------------
Visualization Developer
Oculus Info Inc.