Have you taken a look
at streaming/src/test//java/org/apache/spark/streaming/JavaAPISuite.java ?
JavaDStream<Integer> stream = ssc.queueStream(rdds);
JavaTestUtils.attachTestOutputStream(stream);
FYI
On Tue, Dec 15, 2015 at 6:36 PM, Andy Davidson <
[email protected]> wrote:
> I am having a heck of a time writing a simple JUnit test for my spark
> streaming code. The best code example I have been able to find is
> http://mkuthan.github.io/blog/2015/03/01/spark-unit-testing/ unfortunately
> it is written in Spock and Scala. I am having trouble figuring out how to
> get it to work in Java
>
> Seems like the key to the scala example is the ClockWrapper class
> https://github.com/mkuthan/example-spark/blob/master/src/test/scala/org/apache/spark/ClockWrapper.scala
> .
> I have not figured out how to get something like that to work in Java.
>
> My first unit test is very simple
>
>
>
> public class ParseTest {
>
> public static JavaStreamingContext javaStreamingContext= null;
>
> public static SparkContext sparkContext = null;
>
> public static JavaSparkContext javaSparkContext = null;
>
> public static SQLContext sqlContext = null;
>
> public static volatile boolean runningTestSuite = false;
>
>
> @BeforeClass
>
> public static void setUpBeforeClass() throws Exception {
>
> if (!runningTestSuite) {
>
> SparkConf conf = new SparkConf().setMaster(master).setAppName(
> appName);
>
>
> sparkContext = new SparkContext(conf);
>
> javaSparkContext = new JavaSparkContext(sparkContext);
>
> sqlContext = new org.apache.spark.sql.SQLContext(sparkContext
> );
>
>
>
> Duration batchInterval = Durations.seconds(batchIntervalInSec
> );
>
> javaStreamingContext = new JavaStreamingContext(
> javaSparkContext, batchInterval);
>
>
>
> String checkpointDir = Files.createTempDirectory(appName
> ).toString();
>
> javaStreamingContext.checkpoint(checkpointDir);
>
>
>
> runningTestSuite = true;
>
> }
>
> }
>
>
> @AfterClass
>
> public static void tearDownAfterClass() throws Exception {
>
> if (runningTestSuite && javaSparkContext != null) {
>
> javaSparkContext.stop();
>
> javaSparkContext = null;
>
> sqlContext = null;
>
> sparkContext = null;
>
> runningTestSuite = false;
>
> }
>
> }
>
>
>
> @Test
>
> public void test() {
>
> // 1) create a list of pojo objects
>
> ...
>
>
>
> // 2) convert list of pojo objects to RDD
>
> JavaRDD<MyPojo> pojoRDD = javaSparkContext.parallelize(listOfPojos
> );
>
>
>
> // 3) create a QueueInputDStream
>
> Queue<JavaRDD<MyPojo>> rddQueue = new LinkedList<JavaRDD<MyPojo
> >>();
>
> rddQueue.add(pojoRDD);
>
>
>
> // convert to DStream
>
> JavaDStream<Status> tweets =
> javaStreamingContext.queueStream(rddQueue);
>
>
>
> javaStreamingContext.start();
>
> //javaStreamingContext.awaitTermination();
>
>
>
> Thread.sleep(3 * 1000); // looks like this would not be needed if
> ClockWrapper
> worked
>
>
> Sample scala spock code has asserts after start(). In java I am not able
> to work with the JavaDStreams after start
>
> java.lang.IllegalStateException: Adding new inputs,
> transformations, and output operations after starting a context is not
> supported
>
> at
> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:220)
>
> at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream.<init>(ForEachDStream.scala:26)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$2.apply(DStream.scala:642)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$2.apply(DStream.scala:642)
>
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>
> at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)
>
> at
> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:266)
>
> at org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:638)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply$mcV$sp(DStream.scala:631)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629)
>
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>
> at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)
>
> at
> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:266)
>
> at org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:629)
>
> at
> org.apache.spark.streaming.api.java.JavaDStreamLike$class.foreachRDD(JavaDStreamLike.scala:315)
>
> at
> org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:43)
>
> at
> com.pws.fantasySport.spark.streaming.util.JavaDStreamCount.hack(JavaDStreamCount.java:20)
>
> at com.pws.fantasySport.ml.ParseTweetsTest.test(ParseTweetsTest.java:105)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:497)
>
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>
> at
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>
> at
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>
> at
> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>
> at
> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
>
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
>
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
>
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
>
>
> }
>
> }
>