Hi Ted

I added the following hack to my gradle project. I am now able to run spark
streaming unit tests in my project.

Hopefully others will find this helpful

andy

dependencies {

        provided    group: 'commons-cli',      name: 'commons-cli',
version: '1.3+'

        provided    group: 'org.apache.spark', name: 'spark-sql_2.10',
version: '1.5.2'

provided    group: 'org.apache.spark', name: 'spark-streaming_2.10',
version: '1.5.2'

        provided    group: 'org.apache.spark', name:
'spark-streaming-twitter_2.10', version: '1.5.2'

    

        testCompile 'org.apache.spark:spark-streaming_2.10:1.5.2'

        testCompile group: 'org.scalatest',    name: 'scalatest_2.10',
version: '2.2.1'

        

        // gradle can not find  dependencies that end in '-tests.jar' E.G.
spark-streaming_2.10-1.5.2-tests.jar

        // as a work around we checked the binary into git

   

        File sparkStreamingTestJar =
file('src/test/resources/sparkTestJarFiles/spark-streaming_2.10-1.5.2-tests.
jar')

        testCompile files(sparkStreamingTestJar)

       

        File sparkCoreTestJar =
file('src/test/resources/sparkTestJarFiles/spark-core_2.10-1.5.2-tests.jar')

        testCompile files(sparkCoreTestJar)

}



From:  Andrew Davidson <a...@santacruzintegration.com>
Date:  Wednesday, December 16, 2015 at 5:37 PM
To:  Ted Yu <yuzhih...@gmail.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: looking for Spark streaming unit example written in Java

> Hi Ted
> 
> I am having a heck of a time trying use the JavaAPISuite code in my project.
> 
> I basically copied testTransform() into my local project. I can compile it
> using java 8, however I can not seem to get it to run.
> 
> com.pws.fantasy.ml.SparkStreamingTransformerTest > testTransform FAILED
> 
>     java.lang.NoClassDefFoundError at SparkStreamingTransformerTest.java:73
> 
>         Caused by: java.lang.ClassNotFoundException at
> SparkStreamingTransformerTest.java:73
> 
> 
> 
> Line 73 is
> 
>       JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc,
> inputData, 1);
> 
> 
> 
> 
> 
> This class is defined in
> spark-1.5.2/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.s
> cala
> 
> 
> 
> Do you know if the test jars are published? As a workaround I downloaded the
> spark source and ran 'mvn install'
> 
> 
> 
> $ ls -l 
> ~/.m2/repository/org/apache/spark/spark-streaming_2.10/1.5.2/spark-streaming_2
> .10-1.5.2-tests.jar
> 
> -rw-r--r--  1 ~  staff   1.8M Dec 16 16:37
> ~/.m2/repository/org/apache/spark/spark-streaming_2.10/1.5.2/spark-streaming_2
> .10-1.5.2-tests.jar
> 
> $ 
> 
> 
> $ jar -tf 
> ~/.m2/repository/org/apache/spark/spark-streaming_2.10/1.5.2/spark-streaming_2
> .10-1.5.2-tests.jar | grep JavaTestUtils
> 
> org/apache/spark/streaming/JavaTestUtils$.class
> 
> org/apache/spark/streaming/JavaTestUtils.class
> 
> $ 
> 
> 
> 
> My local project builds using gradle (I have never used gradle before). I tried
> lots of hacks.  Not sure why I can not use JavaTestUtils
> 
> 
> 
> dependencies {
> 
>         provided    group: 'commons-cli',      name: 'commons-cli',
> version: '1.3+'
> 
>         provided    group: 'org.apache.spark', name: 'spark-sql_2.10',
> version: '1.5.2'
> 
> // provided    group: 'org.apache.spark', name: 'spark-streaming_2.10',
> version: '1.5.2'
> 
>         provided 'org.apache.spark:spark-streaming_2.10:1.5.2'
> 
>  
> 
>         // strange: works a little better if testCompile is declared twice
> 
>         // testCompile 'org.apache.spark:spark-streaming_2.10:1.5.2'
> 
>         // testCompile 'org.apache.spark:spark-streaming_2.10:1.5.2-tests'
> 
>         testCompile
> files('~/.m2/repository/org/apache/spark/spark-streaming_2.10/1.5.2/spark-stre
> aming_2.10-1.5.2-tests.jar')
> 
> 
> 
>         testCompile
> files('~/.m2/repository/org/apache/spark/spark-streaming_2.10/1.5.2/spark-stre
> aming_2.10-1.5.2-tests.jar')
> 
>         testRuntime
> files('~/.m2/repository/org/apache/spark/spark-streaming_2.10/1.5.2/spark-stre
> aming_2.10-1.5.2-tests.jar')
> 
>  }
> 
> 
> 
> Any suggestions would be greatly appreciated.
> 
> 
> 
> Andy
> 
> 
> 
> $ javap JavaTestUtils.class
> 
> Compiled from "JavaTestUtils.scala"
> 
> public final class org.apache.spark.streaming.JavaTestUtils {
> 
>   public static org.scalatest.Status run(scala.Option<java.lang.String>,
> org.scalatest.Args);
> 
>   public static org.scalatest.Status runTest(java.lang.String,
> org.scalatest.Args);
> 
>   public static void after(scala.Function0<java.lang.Object>);
> 
>   public static void before(scala.Function0<java.lang.Object>);
> 
>   public static <U, V> int setupStreams$default$3();
> 
>   public static <U, V> boolean testOperation$default$4();
> 
>   public static <U, V, W> void
> testOperation(scala.collection.Seq<scala.collection.Seq<U>>,
> scala.collection.Seq<scala.collection.Seq<V>>,
> scala.Function2<org.apache.spark.streaming.dstream.DStream<U>,
> org.apache.spark.streaming.dstream.DStream<V>,
> org.apache.spark.streaming.dstream.DStream<W>>,
> scala.collection.Seq<scala.collection.Seq<W>>, int, boolean,
> scala.reflect.ClassTag<U>, scala.reflect.ClassTag<V>,
> scala.reflect.ClassTag<W>);
> 
>   public static <U, V, W> void
> testOperation(scala.collection.Seq<scala.collection.Seq<U>>,
> scala.collection.Seq<scala.collection.Seq<V>>,
> scala.Function2<org.apache.spark.streaming.dstream.DStream<U>,
> org.apache.spark.streaming.dstream.DStream<V>,
> org.apache.spark.streaming.dstream.DStream<W>>,
> scala.collection.Seq<scala.collection.Seq<W>>, boolean,
> scala.reflect.ClassTag<U>, scala.reflect.ClassTag<V>,
> scala.reflect.ClassTag<W>);
> 
>   public static <U, V> void
> testOperation(scala.collection.Seq<scala.collection.Seq<U>>,
> scala.Function1<org.apache.spark.streaming.dstream.DStream<U>,
> org.apache.spark.streaming.dstream.DStream<V>>,
> scala.collection.Seq<scala.collection.Seq<V>>, int, boolean,
> scala.reflect.ClassTag<U>, scala.reflect.ClassTag<V>);
> 
>   public static <U, V> void
> testOperation(scala.collection.Seq<scala.collection.Seq<U>>,
> scala.Function1<org.apache.spark.streaming.dstream.DStream<U>,
> org.apache.spark.streaming.dstream.DStream<V>>,
> scala.collection.Seq<scala.collection.Seq<V>>, boolean,
> scala.reflect.ClassTag<U>, scala.reflect.ClassTag<V>);
> 
>   public static <V> void
> verifyOutput(scala.collection.Seq<scala.collection.Seq<V>>,
> scala.collection.Seq<scala.collection.Seq<V>>, boolean,
> scala.reflect.ClassTag<V>);
> 
>   public static <V>
> scala.collection.Seq<scala.collection.Seq<scala.collection.Seq<V>>>
> runStreamsWithPartitions(org.apache.spark.streaming.StreamingContext, int,
> int, scala.reflect.ClassTag<V>);
> 
>   public static <V> scala.collection.Seq<scala.collection.Seq<V>>
> runStreams(org.apache.spark.streaming.StreamingContext, int, int,
> scala.reflect.ClassTag<V>);
> 
>   public static <U, V, W> org.apache.spark.streaming.StreamingContext
> setupStreams(scala.collection.Seq<scala.collection.Seq<U>>,
> scala.collection.Seq<scala.collection.Seq<V>>,
> scala.Function2<org.apache.spark.streaming.dstream.DStream<U>,
> org.apache.spark.streaming.dstream.DStream<V>,
> org.apache.spark.streaming.dstream.DStream<W>>, scala.reflect.ClassTag<U>,
> scala.reflect.ClassTag<V>, scala.reflect.ClassTag<W>);
> 
>   public static <U, V> org.apache.spark.streaming.StreamingContext
> setupStreams(scala.collection.Seq<scala.collection.Seq<U>>,
> scala.Function1<org.apache.spark.streaming.dstream.DStream<U>,
> org.apache.spark.streaming.dstream.DStream<V>>, int,
> scala.reflect.ClassTag<U>, scala.reflect.ClassTag<V>);
> 
>   public static <R> R withTestServer(org.apache.spark.streaming.TestServer,
> scala.Function1<org.apache.spark.streaming.TestServer, R>);
> 
>   public static <R> R
> withStreamingContext(org.apache.spark.streaming.StreamingContext,
> scala.Function1<org.apache.spark.streaming.StreamingContext, R>);
> 
>   public static void afterFunction();
> 
>   public static void beforeFunction();
> 
>   public static boolean actuallyWait();
> 
>   public static boolean useManualClock();
> 
>   public static int numInputPartitions();
> 
>   public static org.apache.spark.streaming.Duration batchDuration();
> 
>   public static java.lang.String master();
> 
>   public static java.lang.String framework();
> 
>   public static void
> org$apache$spark$streaming$TestSuiteBase$_setter_$eventuallyTimeout_$eq(org.sc
> alatest.concurrent.PatienceConfiguration$Timeout);
> 
>   public static void
> org$apache$spark$streaming$TestSuiteBase$_setter_$conf_$eq(org.apache.spark.Sp
> arkConf);
> 
>   public static org.scalatest.concurrent.PatienceConfiguration$Timeout
> eventuallyTimeout();
> 
>   public static org.apache.spark.SparkConf conf();
> 
>   public static java.lang.String checkpointDir();
> 
>   public static <V> java.util.List<java.util.List<java.util.List<V>>>
> runStreamsWithPartitions(org.apache.spark.streaming.api.java.JavaStreamingCont
> ext, int, int);
> 
>   public static <V> java.util.List<java.util.List<V>>
> runStreams(org.apache.spark.streaming.api.java.JavaStreamingContext, int,
> int);
> 
>   public static <T, This extends
> org.apache.spark.streaming.api.java.JavaDStreamLike<T, This, R>, R extends
> org.apache.spark.api.java.JavaRDDLike<T, R>>
> org.apache.spark.streaming.dstream.DStream<scala.runtime.BoxedUnit>
> attachTestOutputStream(org.apache.spark.streaming.api.java.JavaDStreamLike<T,
> This, R>);
> 
>   public static <T> org.apache.spark.streaming.api.java.JavaDStream<T>
> attachTestInputStream(org.apache.spark.streaming.api.java.JavaStreamingContext
> , java.util.List<java.util.List<T>>, int);
> 
>   public static int maxWaitTimeMillis();
> 
>   public static boolean isTraceEnabled();
> 
>   public static void logError(scala.Function0<java.lang.String>,
> java.lang.Throwable);
> 
>   public static void logWarning(scala.Function0<java.lang.String>,
> java.lang.Throwable);
> 
>   public static void logTrace(scala.Function0<java.lang.String>,
> java.lang.Throwable);
> 
>   public static void logDebug(scala.Function0<java.lang.String>,
> java.lang.Throwable);
> 
>   public static void logInfo(scala.Function0<java.lang.String>,
> java.lang.Throwable);
> 
>   public static void logError(scala.Function0<java.lang.String>);
> 
>   public static void logWarning(scala.Function0<java.lang.String>);
> 
>   public static void logTrace(scala.Function0<java.lang.String>);
> 
>   public static void logDebug(scala.Function0<java.lang.String>);
> 
>   public static void logInfo(scala.Function0<java.lang.String>);
> 
>   public static org.slf4j.Logger log();
> 
>   public static java.lang.String logName();
> 
>   public static <T>
> org.scalactic.TripleEqualsSupport$TripleEqualsInvocationOnSpread<T>
> $bang$eq$eq(org.scalactic.TripleEqualsSupport$Spread<T>);
> 
>   public static <T>
> org.scalactic.TripleEqualsSupport$TripleEqualsInvocationOnSpread<T>
> $eq$eq$eq(org.scalactic.TripleEqualsSupport$Spread<T>);
> 
>   public static 
> org.scalactic.TripleEqualsSupport$TripleEqualsInvocation<scala.runtime.Null$>
> $bang$eq$eq(scala.runtime.Null$);
> 
>   public static 
> org.scalactic.TripleEqualsSupport$TripleEqualsInvocation<scala.runtime.Null$>
> $eq$eq$eq(scala.runtime.Null$);
> 
>   public static <T>
> org.scalactic.TripleEqualsSupport$TripleEqualsInvocation<T> $bang$eq$eq(T);
> 
>   public static <T>
> org.scalactic.TripleEqualsSupport$TripleEqualsInvocation<T> $eq$eq$eq(T);
> 
>   public static <A> org.scalactic.Equality<A> defaultEquality();
> 
>   public static <A, B> org.scalactic.Constraint<A, B>
> convertEquivalenceToBToAConversionConstraint(org.scalactic.Equivalence<A>,
> scala.Function1<B, A>);
> 
>   public static <A, B> org.scalactic.Constraint<A, B>
> conversionCheckedConstraint(org.scalactic.Equivalence<A>, scala.Function1<B,
> A>);
> 
>   public static <A, B> org.scalactic.Constraint<A, B>
> convertEquivalenceToAToBConversionConstraint(org.scalactic.Equivalence<B>,
> scala.Function1<A, B>);
> 
>   public static <A, B> org.scalactic.Constraint<A, B>
> lowPriorityConversionCheckedConstraint(org.scalactic.Equivalence<B>,
> scala.Function1<A, B>);
> 
>   public static <A, B> org.scalactic.Constraint<A, B>
> convertEquivalenceToBToAConstraint(org.scalactic.Equivalence<A>,
> scala.Predef$$less$colon$less<B, A>);
> 
>   public static <A, B> org.scalactic.Constraint<A, B>
> typeCheckedConstraint(org.scalactic.Equivalence<A>,
> scala.Predef$$less$colon$less<B, A>);
> 
>   public static <A, B> org.scalactic.Constraint<A, B>
> convertEquivalenceToAToBConstraint(org.scalactic.Equivalence<B>,
> scala.Predef$$less$colon$less<A, B>);
> 
>   public static <A, B> org.scalactic.Constraint<A, B>
> lowPriorityTypeCheckedConstraint(org.scalactic.Equivalence<B>,
> scala.Predef$$less$colon$less<A, B>);
> 
>   public static <A, B> org.scalactic.Constraint<A, B>
> unconstrainedEquality(org.scalactic.Equality<A>);
> 
>   public static <T>
> org.scalactic.TripleEqualsSupport$LegacyCheckingEqualizer<T>
> convertToLegacyCheckingEqualizer(T);
> 
>   public static <T> org.scalactic.TripleEqualsSupport$LegacyEqualizer<T>
> convertToLegacyEqualizer(T);
> 
>   public static <T> org.scalactic.TripleEqualsSupport$CheckingEqualizer<T>
> convertToCheckingEqualizer(T);
> 
>   public static <T> org.scalactic.TripleEqualsSupport$Equalizer<T>
> convertToEqualizer(T);
> 
>   public static <T> T withClue(java.lang.Object, scala.Function0<T>);
> 
>   public static scala.runtime.Nothing$ cancel(java.lang.Throwable);
> 
>   public static scala.runtime.Nothing$ cancel(java.lang.String,
> java.lang.Throwable);
> 
>   public static scala.runtime.Nothing$ cancel(java.lang.String);
> 
>   public static scala.runtime.Nothing$ cancel();
> 
>   public static scala.runtime.Nothing$ fail(java.lang.Throwable);
> 
>   public static scala.runtime.Nothing$ fail(java.lang.String,
> java.lang.Throwable);
> 
>   public static scala.runtime.Nothing$ fail(java.lang.String);
> 
>   public static scala.runtime.Nothing$ fail();
> 
>   public static void expect(java.lang.Object, java.lang.Object);
> 
>   public static void expectResult(java.lang.Object, java.lang.Object);
> 
>   public static void assertResult(java.lang.Object, java.lang.Object);
> 
>   public static void expect(java.lang.Object, java.lang.Object,
> java.lang.Object);
> 
>   public static void expectResult(java.lang.Object, java.lang.Object,
> java.lang.Object);
> 
>   public static void assertResult(java.lang.Object, java.lang.Object,
> java.lang.Object);
> 
>   public static <T> java.lang.Throwable trap(scala.Function0<T>);
> 
>   public static <T> T intercept(scala.Function0<java.lang.Object>,
> scala.reflect.Manifest<T>);
> 
>   public static void assume(scala.Option<java.lang.String>);
> 
>   public static void assume(scala.Option<java.lang.String>, java.lang.Object);
> 
>   public static void assert(scala.Option<java.lang.String>);
> 
>   public static void assert(scala.Option<java.lang.String>, java.lang.Object);
> 
>   public static void
> org$scalatest$Assertions$_setter_$assertionsHelper_$eq(org.scalatest.Assertion
> s$AssertionsHelper);
> 
>   public static org.scalatest.Assertions$AssertionsHelper assertionsHelper();
> 
>   public static org.scalatest.Status run(scala.Option<java.lang.String>,
> org.scalatest.Reporter, org.scalatest.Stopper, org.scalatest.Filter,
> scala.collection.immutable.Map<java.lang.String, java.lang.Object>,
> scala.Option<org.scalatest.Distributor>, org.scalatest.Tracker);
> 
>   public static boolean execute$default$7();
> 
>   public static boolean execute$default$6();
> 
>   public static boolean execute$default$5();
> 
>   public static boolean execute$default$4();
> 
>   public static boolean execute$default$3();
> 
>   public static org.scalatest.ConfigMap execute$default$2();
> 
>   public static java.lang.String execute$default$1();
> 
>   public static scala.Option<java.lang.String> rerunner();
> 
>   public static int expectedTestCount(org.scalatest.Filter);
> 
>   public static void
> pendingUntilFixed(scala.Function0<scala.runtime.BoxedUnit>);
> 
>   public static org.scalatest.PendingNothing pending();
> 
>   public static java.lang.String suiteId();
> 
>   public static java.lang.String suiteName();
> 
>   public static org.scalatest.Status runNestedSuites(org.scalatest.Args);
> 
>   public static org.scalatest.Outcome
> withFixture(org.scalatest.Suite$NoArgTest);
> 
>   public static void execute();
> 
>   public static void execute(java.lang.String, org.scalatest.ConfigMap,
> boolean, boolean, boolean, boolean, boolean);
> 
>   public static scala.collection.immutable.IndexedSeq<org.scalatest.Suite>
> nestedSuites();
> 
>   public static void
> org$scalatest$Suite$_setter_$styleName_$eq(java.lang.String);
> 
>   public static org.scalatest.ConfigMap testDataFor$default$2();
> 
>   public static org.scalatest.TestData testDataFor(java.lang.String,
> org.scalatest.ConfigMap);
> 
>   public static void testsFor(scala.runtime.BoxedUnit);
> 
>   public static org.scalatest.Status runTests(scala.Option<java.lang.String>,
> org.scalatest.Args);
> 
>   public static scala.collection.immutable.Map<java.lang.String,
> scala.collection.immutable.Set<java.lang.String>> tags();
> 
>   public static scala.collection.immutable.Set<java.lang.String> testNames();
> 
>   public static void ignore(java.lang.String,
> scala.collection.Seq<org.scalatest.Tag>,
> scala.Function0<scala.runtime.BoxedUnit>);
> 
>   public static void test(java.lang.String,
> scala.collection.Seq<org.scalatest.Tag>,
> scala.Function0<scala.runtime.BoxedUnit>);
> 
>   public static void registerIgnoredTest(java.lang.String,
> scala.collection.Seq<org.scalatest.Tag>,
> scala.Function0<scala.runtime.BoxedUnit>);
> 
>   public static void registerTest(java.lang.String,
> scala.collection.Seq<org.scalatest.Tag>,
> scala.Function0<scala.runtime.BoxedUnit>);
> 
>   public static org.scalatest.Documenter markup();
> 
>   public static org.scalatest.Alerter alert();
> 
>   public static org.scalatest.Notifier note();
> 
>   public static org.scalatest.Informer info();
> 
>   public static void
> org$scalatest$FunSuiteLike$_setter_$styleName_$eq(java.lang.String);
> 
>   public static java.lang.String styleName();
> 
>   public static java.lang.String toString();
> 
> }
> 
> $ 
> 
> 
> 
> From:  Ted Yu <yuzhih...@gmail.com>
> Date:  Tuesday, December 15, 2015 at 7:09 PM
> To:  Andrew Davidson <a...@santacruzintegration.com>
> Cc:  "user @spark" <user@spark.apache.org>
> Subject:  Re: looking for Spark streaming unit example written in Java
> 
>> Have you taken a look at
>> streaming/src/test//java/org/apache/spark/streaming/JavaAPISuite.java ?
>> 
>>     JavaDStream<Integer> stream = ssc.queueStream(rdds);
>>     JavaTestUtils.attachTestOutputStream(stream);
>> 
>> FYI
>> 
>> On Tue, Dec 15, 2015 at 6:36 PM, Andy Davidson
>> <a...@santacruzintegration.com> wrote:
>>> I am having a heck of a time writing a simple Junit test for my spark
>>> streaming code. The best code example I have been able to find is
>>> http://mkuthan.github.io/blog/2015/03/01/spark-unit-testing/ unfortunately
>>> it is written in Spock and Scala. I am having trouble figuring out how to
>>> get it to work in Java
>>> 
>>> Seems like the key to the scala example is the ClockWrapper class
>>> https://github.com/mkuthan/example-spark/blob/master/src/test/scala/org/apac
>>> he/spark/ClockWrapper.scala . I have not figured out how to get something
>>> like that to work in Java.
>>> 
>>> My first unit test is very simple
>>> 
>>> 
>>> 
>>> public class ParseTest {
>>> 
>>>     public static JavaStreamingContext javaStreamingContext= null;
>>> 
>>>     public static SparkContext sparkContext = null;
>>> 
>>>     public static JavaSparkContext javaSparkContext = null;
>>> 
>>>     public static SQLContext sqlContext = null;
>>> 
>>>     public static volatile boolean runningTestSuite = false;
>>> 
>>> 
>>> 
>>>     @BeforeClass
>>> 
>>>     public static void setUpBeforeClass() throws Exception {
>>> 
>>>         if (!runningTestSuite) {
>>> 
>>>             SparkConf conf = new
>>> SparkConf().setMaster(master).setAppName(appName);
>>> 
>>> 
>>> 
>>>             sparkContext = new SparkContext(conf);
>>> 
>>>             javaSparkContext = new JavaSparkContext(sparkContext);
>>> 
>>>             sqlContext = new org.apache.spark.sql.SQLContext(sparkContext);
>>> 
>>>             
>>> 
>>>             Duration batchInterval = Durations.seconds(batchIntervalInSec);
>>> 
>>>             javaStreamingContext = new
>>> JavaStreamingContext(javaSparkContext, batchInterval);
>>> 
>>>             
>>> 
>>>             String checkpointDir =
>>> Files.createTempDirectory(appName).toString();
>>> 
>>>             javaStreamingContext.checkpoint(checkpointDir);
>>> 
>>>             
>>> 
>>>             runningTestSuite = true;
>>> 
>>>         }
>>> 
>>>     }
>>> 
>>> 
>>> 
>>>     @AfterClass
>>> 
>>>     public static void tearDownAfterClass() throws Exception {
>>> 
>>>         if (runningTestSuite && javaSparkContext != null) {
>>> 
>>>             javaSparkContext.stop();
>>> 
>>>             javaSparkContext = null;
>>> 
>>>             sqlContext = null;
>>> 
>>>             sparkContext = null;
>>> 
>>>             runningTestSuite = false;
>>> 
>>>         }
>>> 
>>>     }
>>> 
>>>     
>>> 
>>>     @Test
>>> 
>>>     public void test() {
>>> 
>>>      // 1) create a list of of pojo objects
>>> 
>>>         ...
>>> 
>>>         
>>> 
>>>      // 2) convert list of pojo objects to RDD
>>> 
>>>         JavaRDD<MyPojo> pojoRDD = javaSparkContext.parallelize(listOfPojos);
>>> 
>>>         
>>> 
>>>       // 3) create a  QueueInputDStream
>>> 
>>>         Queue<JavaRDD<MyPojo>> rddQueue = new LinkedList<JavaRDD<MyPojo>>();
>>> 
>>>         rddQueue.add(pojoRDD);
>>> 
>>>         
>>> 
>>>         // convert to DStream
>>> 
>>>         JavaDStream<Status> tweets =
>>> javaStreamingContext.queueStream(rddQueue);
>>> 
>>>         
>>> 
>>>         javaStreamingContext.start();
>>> 
>>>         //javaStreamingContext.awaitTermination();
>>> 
>>>         
>>> 
>>>         Thread.sleep(3 * 1000); // looks like this would not be needed if
>>> ClockWrapper worked
>>> 
>>> 
>>> 
>>> Sample scala spock code has asserts after start(). In java I am not able to
>>> work with the JavaDStreams after start
>>> 
>>>         java.lang.IllegalStateException: Adding new inputs, transformations,
>>> and output operations after starting a context is not supported
>>> 
>>> at 
>>> org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:220)
>>> 
>>> at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>> 
>>> at 
>>> org.apache.spark.streaming.dstream.ForEachDStream.<init>(ForEachDStream.scal
>>> a:26)
>>> 
>>> at 
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$2.apply(DStre
>>> am.scala:642)
>>> 
>>> at 
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$2.apply(DStre
>>> am.scala:642)
>>> 
>>> at 
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:14
>>> 7)
>>> 
>>> at 
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:10
>>> 8)
>>> 
>>> at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)
>>> 
>>> at 
>>> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala
>>> :266)
>>> 
>>> at org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:638)
>>> 
>>> at 
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply$mcV$s
>>> p(DStream.scala:631)
>>> 
>>> at 
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStre
>>> am.scala:629)
>>> 
>>> at 
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStre
>>> am.scala:629)
>>> 
>>> at 
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:14
>>> 7)
>>> 
>>> at 
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:10
>>> 8)
>>> 
>>> at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)
>>> 
>>> at 
>>> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala
>>> :266)
>>> 
>>> at org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:629)
>>> 
>>> at 
>>> org.apache.spark.streaming.api.java.JavaDStreamLike$class.foreachRDD(JavaDSt
>>> reamLike.scala:315)
>>> 
>>> at 
>>> org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaD
>>> StreamLike.scala:43)
>>> 
>>> at 
>>> com.pws.fantasySport.spark.streaming.util.JavaDStreamCount.hack(JavaDStreamC
>>> ount.java:20)
>>> 
>>> at com.pws.fantasySport.ml.ParseTweetsTest.test(ParseTweetsTest.java:105)
>>> 
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> 
>>> at 
>>> 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62>>>
)
>>> 
>>> at 
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl
>>> .java:43)
>>> 
>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>> 
>>> at 
>>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.
>>> java:50)
>>> 
>>> at 
>>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.j
>>> ava:12)
>>> 
>>> at 
>>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.ja
>>> va:47)
>>> 
>>> at 
>>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.jav
>>> a:17)
>>> 
>>> at 
>>> 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26>>>
)
>>> 
>>> at 
>>> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>> 
>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>>> 
>>> at 
>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.jav
>>> a:78)
>>> 
>>> at 
>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.jav
>>> a:57)
>>> 
>>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>> 
>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>> 
>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>> 
>>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>> 
>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>> 
>>> at 
>>> 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26>>>
)
>>> 
>>> at 
>>> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>> 
>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>> 
>>> at 
>>> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestRef
>>> erence.java:50)
>>> 
>>> at 
>>> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:3
>>> 8)
>>> 
>>> at 
>>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRu
>>> nner.java:459)
>>> 
>>> at 
>>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRu
>>> nner.java:675)
>>> 
>>> at 
>>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.
>>> java:382)
>>> 
>>> at 
>>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner
>>> .java:192)
>>> 
>>> 
>>> 
>>>     }
>>> 
>>> }
>> 


Reply via email to