I am having a heck of a time writing a simple JUnit test for my Spark Streaming code. The best code example I have been able to find (http://mkuthan.github.io/blog/2015/03/01/spark-unit-testing/) is unfortunately written in Spock and Scala, and I am having trouble figuring out how to get it to work in Java.
It seems that the key to the Scala example is the ClockWrapper class (https://github.com/mkuthan/example-spark/blob/master/src/test/scala/org/apache/spark/ClockWrapper.scala). I have not figured out how to get something like that to work in Java. My first unit test is very simple: public class ParseTest { public static JavaStreamingContext javaStreamingContext= null; public static SparkContext sparkContext = null; public static JavaSparkContext javaSparkContext = null; public static SQLContext sqlContext = null; public static volatile boolean runningTestSuite = false; @BeforeClass public static void setUpBeforeClass() throws Exception { if (!runningTestSuite) { SparkConf conf = new SparkConf().setMaster(master).setAppName(appName); sparkContext = new SparkContext(conf); javaSparkContext = new JavaSparkContext(sparkContext); sqlContext = new org.apache.spark.sql.SQLContext(sparkContext); Duration batchInterval = Durations.seconds(batchIntervalInSec); javaStreamingContext = new JavaStreamingContext(javaSparkContext, batchInterval); String checkpointDir = Files.createTempDirectory(appName).toString(); javaStreamingContext.checkpoint(checkpointDir); runningTestSuite = true; } } @AfterClass public static void tearDownAfterClass() throws Exception { if (runningTestSuite && javaSparkContext != null) { javaSparkContext.stop(); javaSparkContext = null; sqlContext = null; sparkContext = null; runningTestSuite = false; } } @Test public void test() { // 1) create a list of pojo objects ... 
// 2) convert list of pojo objects to RDD JavaRDD<MyPojo> pojoRDD = javaSparkContext.parallelize(listOfPojos); // 3) create a QueueInputDStream Queue<JavaRDD<MyPojo>> rddQueue = new LinkedList<JavaRDD<MyPojo>>(); rddQueue.add(pojoRDD); // convert to DStream JavaDStream<Status> tweets = javaStreamingContext.queueStream(rddQueue); javaStreamingContext.start(); //javaStreamingContext.awaitTermination(); Thread.sleep(3 * 1000); // looks like this would not be needed if ClockWrapper worked
The sample Scala/Spock code has asserts after start(). In Java, I am not able to work with the JavaDStreams after start(); doing so fails with:
java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:220) at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64) at org.apache.spark.streaming.dstream.ForEachDStream.<init>(ForEachDStream.scala:26) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$2.apply(DStream.scala:642) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$2.apply(DStream.scala:642) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.SparkContext.withScope(SparkContext.scala:709) at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:266) at org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:638) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply$mcV$sp(DStream.scala:631) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.SparkContext.withScope(SparkContext.scala:709) at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:266) at org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:629) at org.apache.spark.streaming.api.java.JavaDStreamLike$class.foreachRDD(JavaDStreamLike.scala:315) at org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:43) at com.pws.fantasySport.spark.streaming.util.JavaDStreamCount.hack(JavaDStreamCount.java:20) at com.pws.fantasySport.ml.ParseTweetsTest.test(ParseTweetsTest.java:105) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50) at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) } }
