I am having a heck of a time writing a simple JUnit test for my Spark
Streaming code. The best code example I have been able to find is
http://mkuthan.github.io/blog/2015/03/01/spark-unit-testing/ — unfortunately
it is written in Spock and Scala, and I am having trouble figuring out how
to get it to work in Java.

It seems like the key to the Scala example is the ClockWrapper class
https://github.com/mkuthan/example-spark/blob/master/src/test/scala/org/apac
he/spark/ClockWrapper.scala . I have not figured out how to get something
like that to work in Java.

My first unit test is very simple



public class ParseTest {

    public static JavaStreamingContext javaStreamingContext= null;

    public static SparkContext sparkContext = null;

    public static JavaSparkContext javaSparkContext = null;

    public static SQLContext sqlContext = null;

    public static volatile boolean runningTestSuite = false;



    @BeforeClass

    public static void setUpBeforeClass() throws Exception {

        if (!runningTestSuite) {

            SparkConf conf = new
SparkConf().setMaster(master).setAppName(appName);



            sparkContext = new SparkContext(conf);

            javaSparkContext = new JavaSparkContext(sparkContext);

            sqlContext = new org.apache.spark.sql.SQLContext(sparkContext);

            

            Duration batchInterval = Durations.seconds(batchIntervalInSec);

            javaStreamingContext = new
JavaStreamingContext(javaSparkContext, batchInterval);

            

            String checkpointDir =
Files.createTempDirectory(appName).toString();

            javaStreamingContext.checkpoint(checkpointDir);

            

            runningTestSuite = true;

        }

    }



    @AfterClass

    public static void tearDownAfterClass() throws Exception {

        if (runningTestSuite && javaSparkContext != null) {

            javaSparkContext.stop();

            javaSparkContext = null;

            sqlContext = null;

            sparkContext = null;

            runningTestSuite = false;

        }

    }

    

    @Test

    public void test() {

     // 1) create a list of POJO objects

        ...

        

     // 2) convert list of pojo objects to RDD

        JavaRDD<MyPojo> pojoRDD = javaSparkContext.parallelize(listOfPojos);

        

      // 3) create a QueueInputDStream

        Queue<JavaRDD<MyPojo>> rddQueue = new LinkedList<JavaRDD<MyPojo>>();

        rddQueue.add(pojoRDD);

        

        // convert to DStream

        JavaDStream<Status> tweets =
javaStreamingContext.queueStream(rddQueue);

        

        javaStreamingContext.start();

        //javaStreamingContext.awaitTermination();

        

        Thread.sleep(3 * 1000); // it looks like this would not be needed if
ClockWrapper worked



The sample Scala Spock code has asserts after start(). In Java I am not able
to work with the JavaDStreams after start():

        java.lang.IllegalStateException: Adding new inputs, transformations,
and output operations after starting a context is not supported

at 
org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:220)

at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)

at 
org.apache.spark.streaming.dstream.ForEachDStream.<init>(ForEachDStream.scal
a:26)

at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$2.apply(DStre
am.scala:642)

at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$2.apply(DStre
am.scala:642)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:14
7)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:10
8)

at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)

at 
org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala
:266)

at org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:638)

at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply$mcV$s
p(DStream.scala:631)

at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStre
am.scala:629)

at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStre
am.scala:629)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:14
7)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:10
8)

at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)

at 
org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala
:266)

at org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:629)

at 
org.apache.spark.streaming.api.java.JavaDStreamLike$class.foreachRDD(JavaDSt
reamLike.scala:315)

at 
org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaD
StreamLike.scala:43)

at 
com.pws.fantasySport.spark.streaming.util.JavaDStreamCount.hack(JavaDStreamC
ount.java:20)

at com.pws.fantasySport.ml.ParseTweetsTest.test(ParseTweetsTest.java:105)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62
)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl
.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.
java:50)

at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.j
ava:12)

at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.ja
va:47)

at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.jav
a:17)

at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26
)

at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)

at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)

at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.jav
a:78)

at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.jav
a:57)

at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)

at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)

at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)

at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)

at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)

at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26
)

at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)

at org.junit.runners.ParentRunner.run(ParentRunner.java:363)

at 
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestRef
erence.java:50)

at 
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:3
8)

at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRu
nner.java:459)

at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRu
nner.java:675)

at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.
java:382)

at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner
.java:192)



    }

}


Reply via email to