Well, the receiver always runs as part of an executor. When running locally (that is, spark-submit without --master), the executor is in the same process as the driver, so you see the printlns. If you are running with --master spark://cluster, then the executors are running in different processes, and possibly on different nodes, so you don't see the printlns in the output of the driver process. If you look at the output of the executors in the Spark UI, you should find those prints.
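To make the process boundaries concrete, here is a minimal sketch against the Spark 1.2 Java API (the driver class name WherePrintsGo is made up, and it reuses the TestReceiver from the code quoted below): the function passed to map executes on the executors, while the function passed to foreachRDD executes in the driver process, so only the "Total Count" line is guaranteed to appear in the driver's console.

package test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class WherePrintsGo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("WherePrintsGo");
        JavaStreamingContext ssc =
            new JavaStreamingContext(conf, new Duration(1000));
        JavaReceiverInputDStream<String> stream =
            ssc.receiverStream(new TestReceiver());

        stream.map(new Function<String, String>() {
            @Override
            public String call(String s) {
                // Runs on an executor: with --master spark://..., this line
                // goes to the executor's stdout log (linked from the Spark
                // UI), not to the driver console.
                System.out.println("Received: " + s);
                return s;
            }
        }).foreachRDD(new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> rdd) {
                // The foreachRDD function itself runs on the driver, so this
                // line appears in the driver console in both local and
                // cluster mode (count() still triggers a distributed job).
                System.out.println("Total Count: " + rdd.count());
                return null;
            }
        });

        ssc.start();
        ssc.awaitTermination();
    }
}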
TD

On Mon, Apr 20, 2015 at 5:16 AM, Ankit Patel <patel7...@hotmail.com> wrote:

> The code I've written is simple: it just starts a thread and calls the
> store method on the Receiver class.
>
> I see this code, with its printlns, working fine when I try:
>
>     spark-submit --jars <jar> --class test.TestCustomReceiver <jar>
>
> However, it does not work when I try the same command with --master
> spark://masterURL:
>
>     spark-submit --master spark://masterURL --jars <jar> --class
>     test.TestCustomReceiver <jar>
>
> I also tried setting the master in the conf that I created, but that
> does not work either. I do not see the onStart println being printed
> when I use the --master option. Please advise.
>
> Also, the master I am attaching to has multiple workers across hosts
> with many threads available to it.
>
> The code is pasted below (classes: TestReceiver, TestCustomReceiver):
>
> package test;
>
> import org.apache.spark.storage.StorageLevel;
> import org.apache.spark.streaming.receiver.Receiver;
>
> public class TestReceiver extends Receiver<String> {
>
>     public TestReceiver() {
>         super(StorageLevel.MEMORY_ONLY());
>         System.out.println("Ankit: Created TestReceiver");
>     }
>
>     @Override
>     public void onStart() {
>         System.out.println("Start TestReceiver");
>         new TestThread().start();
>     }
>
>     public void onStop() {}
>
>     @SuppressWarnings("unused")
>     private class TestThread extends Thread {
>         @Override
>         public void run() {
>             while (true) {
>                 try {
>                     sleep((long) (Math.random() * 3000));
>                 } catch (Exception e) {
>                     e.printStackTrace();
>                 }
>                 store("Time: " + System.currentTimeMillis());
>             }
>         }
>     }
> }
>
> package test;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> public class TestCustomReceiver {
>     public static void main(String[] args) {
>         SparkConf conf = new SparkConf();
>         JavaStreamingContext ssc =
>             new JavaStreamingContext(conf, new Duration(1000));
>         TestReceiver receiver = new TestReceiver();
>         JavaReceiverInputDStream<String> stream =
>             ssc.receiverStream(receiver);
>         stream.map(new Function() {
>             @Override
>             public Object call(Object arg0) throws Exception {
>                 System.out.println("Received: " + arg0);
>                 return arg0;
>             }
>         }).foreachRDD(new Function() {
>             @Override
>             public Object call(Object arg0) throws Exception {
>                 System.out.println("Total Count: " +
>                     ((org.apache.spark.api.java.JavaRDD) arg0).count());
>                 return arg0;
>             }
>         });
>         ssc.start();
>         ssc.awaitTermination();
>     }
> }
>
> Thanks,
> Ankit
>
> ------------------------------
> Date: Mon, 20 Apr 2015 12:22:03 +0530
> Subject: Re: SparkStreaming onStart not being invoked on CustomReceiver
> attached to master with multiple workers
> From: ak...@sigmoidanalytics.com
> To: patel7...@hotmail.com
> CC: user@spark.apache.org
>
> It would be good if you can paste your custom receiver code and the code
> that you used to invoke it.
>
> Thanks
> Best Regards
>
> On Mon, Apr 20, 2015 at 9:43 AM, Ankit Patel <patel7...@hotmail.com>
> wrote:
>
> > I am experiencing a problem with Spark Streaming (Spark 1.2.0): the
> > onStart method is never called on my CustomReceiver when calling
> > spark-submit against a master node with multiple workers. However,
> > Spark Streaming works fine with no master node set. Has anyone noticed
> > this issue?
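Following up on TD's pointer at the top of the thread, here is a sketch of the quoted receiver (the class name LoggingTestReceiver is hypothetical, not from the thread) that logs through log4j instead of System.out, so its lifecycle shows up timestamped in the executor logs, and that checks isStopped() so the worker thread exits when the receiver is stopped:

package test;

import org.apache.log4j.Logger;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

// Sketch only: same structure as the quoted TestReceiver, but routed
// through log4j so the messages carry a timestamp and class name in the
// executor logs that the Spark UI links to.
public class LoggingTestReceiver extends Receiver<String> {

    private static final Logger log =
        Logger.getLogger(LoggingTestReceiver.class);

    public LoggingTestReceiver() {
        super(StorageLevel.MEMORY_ONLY());
    }

    @Override
    public void onStart() {
        // Runs on an executor in cluster mode, so this message lands in
        // that executor's log, not in the driver console.
        log.info("LoggingTestReceiver.onStart invoked");
        new Thread(new Runnable() {
            @Override
            public void run() {
                // Stop generating data once the receiver is stopped,
                // instead of looping forever.
                while (!isStopped()) {
                    try {
                        Thread.sleep((long) (Math.random() * 3000));
                    } catch (InterruptedException e) {
                        log.warn("receiver thread interrupted", e);
                    }
                    store("Time: " + System.currentTimeMillis());
                }
            }
        }).start();
    }

    @Override
    public void onStop() {}
}

Under Spark's default log4j configuration these messages typically end up in the executor's stderr log, which is reachable from the worker and executor pages of the UI in standalone mode.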