Re: Weird worker usage
Nikunj,

No, I'm not calling setMaster at all. This ended up being a foolish configuration problem with my slaves file.

Regards,
Bryan Jeffrey

On Fri, Sep 25, 2015 at 11:20 PM, N B wrote:
> Bryan,
>
> By any chance, are you calling SparkConf.setMaster("local[*]") inside your
> application code?
>
> Nikunj
Re: Weird worker usage
That means only a single receiver is doing all the work, so the data is local to your N1 machine and all the tasks are executed there. To get the data to N2, you need to either do a .repartition or use one of the MEMORY*_2 storage levels, where the _2 suffix enables data replication. I guess that will solve your problem.

Thanks
Best Regards

On Sun, Sep 27, 2015 at 12:50 AM, Akhil Das wrote:
> That means only
>
> Thanks
> Best Regards
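The following is a minimal sketch of the two remedies suggested above. It assumes a push-based Flume receiver created with FlumeUtils.createStream from the Spark 1.x spark-streaming-flume artifact. The object name, host, port, and the target of 56 partitions (two per core across the 12 + 16 cores described in this thread) are hypothetical placeholders. In Spark 1.x, FlumeUtils.createStream already defaults to StorageLevel.MEMORY_AND_DISK_SER_2, so the storage level is passed explicitly here only to make the replication choice visible.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

object SpreadReceivedData {
  // Hypothetical address the Flume Avro sink pushes to (the receiver binds here).
  val FlumeHost = "n1"
  val FlumePort = 41414
  // Hypothetical target: two partitions per core across 28 cores.
  val TargetPartitions = 56

  // A *_2 level keeps a second copy of every received block on another executor.
  val ReceiverStorage: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2

  def buildInput(ssc: StreamingContext): DStream[SparkFlumeEvent] = {
    val raw = FlumeUtils.createStream(ssc, FlumeHost, FlumePort, ReceiverStorage)
    // repartition shuffles every batch across all executors before the heavy stages.
    raw.repartition(TargetPartitions)
  }

  def main(args: Array[String]): Unit = {
    // 30-second batches, as described in this thread.
    val ssc = new StreamingContext(new SparkConf().setAppName("FlumeAnalytics"), Seconds(30))
    buildInput(ssc).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Replication gives the scheduler a second executor on which each block is local. In Spark 1.x, however, the replica is not placed by host, so with two executors on N1 it may land on the other N1 executor. Repartition, by contrast, costs a shuffle per batch, but it spreads the work regardless of where the receiver runs.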
Re: Weird worker usage
That means only

Thanks
Best Regards

On Sun, Sep 27, 2015 at 12:07 AM, N B wrote:
> Hello,
>
> Does anyone have an insight into what could be the issue here?
>
> Thanks
> Nikunj
Re: Weird worker usage
Hello,

Does anyone have any insight into what could be the issue here?

Thanks
Nikunj

On Fri, Sep 25, 2015 at 10:44 AM, N B wrote:
> Hi Akhil,
>
> I do have 25 partitions being created. [...]
Re: Weird worker usage
Bryan,

By any chance, are you calling SparkConf.setMaster("local[*]") inside your application code?

Nikunj

On Fri, Sep 25, 2015 at 9:56 AM, Bryan Jeffrey wrote:
> Looking at this further, it appears that my Spark Context is not correctly
> setting the Master name. [...]
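The following is a minimal sketch of why a hard-coded setMaster would produce exactly the log in question. It relies on new SparkConf() copying every spark.* JVM system property into the conf, which is how a -Dspark.master=... flag on the driver's launch command reaches the application. A later setMaster call simply overwrites that value. The object and method names are hypothetical. setIfMissing is shown as one way to keep a local fallback without overriding the submitted master.

```scala
import org.apache.spark.SparkConf

object MasterPrecedence {
  // Hard-coding the master overwrites whatever the launcher passed via -Dspark.master.
  def hardCodedConf(): SparkConf =
    new SparkConf().setAppName("MainClass").setMaster("local[*]")

  // setIfMissing applies the fallback only when no master was supplied.
  def fallbackConf(): SparkConf =
    new SparkConf().setAppName("MainClass").setIfMissing("spark.master", "local[*]")

  def main(args: Array[String]): Unit = {
    // Simulate the -Dspark.master flag from the DriverRunner launch command.
    System.setProperty("spark.master", "spark://sparkserver:7077")
    println(hardCodedConf().get("spark.master")) // local[*]
    println(fallbackConf().get("spark.master"))  // spark://sparkserver:7077
  }
}
```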
Re: Weird worker usage
Hi Akhil,

I do have 25 partitions being created. I have set the spark.default.parallelism property to 25. The batch size is 30 seconds and the block interval is 1200 ms, which also gives us roughly 25 partitions from the input stream. I can see the 25 partitions being created and used in the Spark UI as well. It's just that those tasks wait for cores on N1 to free up before being scheduled, while N2 sits idle.

The cluster configuration is:

N1: 2 workers, 6 cores and 16 GB memory each => 12 cores on the first node.
N2: 2 workers, 8 cores and 16 GB memory each => 16 cores on the second node.

That is a grand total of 28 cores. Yet it still does most of the processing on N1 (divided between the 2 workers running there) and almost completely disregards N2 until the final stage, where data is written out to Elasticsearch. I don't understand why it doesn't distribute more partitions to N2 from the start and use it effectively. Since there are only 12 cores on N1 and 25 total partitions, shouldn't it send some of those partitions to N2 as well?

Thanks
Nikunj

On Fri, Sep 25, 2015 at 5:28 AM, Akhil Das wrote:
> Parallel tasks totally depends on the # of partitions that you are having,
> if you are not receiving sufficient partitions (partitions > total # cores)
> then try to do a .repartition.
>
> Thanks
> Best Regards
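The arithmetic in this message can be checked with a small pure-Scala sketch. It makes three assumptions:

* there is one receiver;
* each received block becomes one partition, so partitions per batch are roughly batch interval / block interval per receiver;
* each task holds one core.

The names are hypothetical. The sketch also counts scheduling "waves", meaning how many rounds are needed when the 25 tasks can only use N1's 12 cores versus all 28.

```scala
object BatchPartitionMath {
  // One block per block interval per receiver; each block becomes one partition.
  def partitionsPerBatch(batchMs: Long, blockMs: Long, receivers: Int): Long =
    (batchMs / blockMs) * receivers

  // Cores contributed by a node = workers on the node * cores per worker.
  def nodeCores(workers: Int, coresPerWorker: Int): Int = workers * coresPerWorker

  // Rounds of tasks needed when each task holds one core (ceiling division).
  def waves(tasks: Long, cores: Int): Long = (tasks + cores - 1) / cores

  def main(args: Array[String]): Unit = {
    val partitions = partitionsPerBatch(batchMs = 30000, blockMs = 1200, receivers = 1)
    val n1 = nodeCores(workers = 2, coresPerWorker = 6)
    val n2 = nodeCores(workers = 2, coresPerWorker = 8)
    println(s"partitions per batch = $partitions")                 // 25
    println(s"cores: N1 = $n1, N2 = $n2, total = ${n1 + n2}")      // 12, 16, 28
    println(s"waves on N1 only = ${waves(partitions, n1)}")        // 3
    println(s"waves on all cores = ${waves(partitions, n1 + n2)}") // 1
  }
}
```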
Re: Weird worker usage
Looking at this further, it appears that my Spark Context is not correctly setting the Master name. I see the following in logs:

    15/09/25 16:45:42 INFO DriverRunner: Launch Command: "/usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java" "-cp" "/spark/spark-1.4.1/sbin/../conf/:/spark/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.2.0.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-rdbms-3.2.9.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-core-3.2.10.jar" "-Xms512M" "-Xmx512M" "-Dakka.loglevel=WARNING" "-Dspark.default.parallelism=6" "-Dspark.rpc.askTimeout=10" "-Dspark.app.name=MainClass" "-Dspark.master=spark://sparkserver:7077" "-Dspark.driver.supervise=true" "-Dspark.logConf=true" "-Dspark.jars=file:/tmp/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar" "-Dspark.streaming.receiver.maxRate=500" "-XX:MaxPermSize=256m" "org.apache.spark.deploy.worker.DriverWrapper" "akka.tcp://sparkWorker@10.0.0.6:48077/user/Worker" "/spark/spark-1.4.1/work/driver-20150925164617-/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar" "MainClass" "--checkpoint" "/tmp/sparkcheckpoint" "--broker" "kafkaBroker:9092" "--topic" "test" "--numStreams" "9" "--threadParallelism" "9"
    15/09/25 16:45:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    15/09/25 16:45:43 INFO SecurityManager: Changing view acls to: root
    15/09/25 16:45:43 INFO SecurityManager: Changing modify acls to: root
    15/09/25 16:45:43 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
    15/09/25 16:45:44 INFO Slf4jLogger: Slf4jLogger started
    15/09/25 16:45:45 INFO Utils: Successfully started service 'Driver' on port 59670.
    15/09/25 16:45:45 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@10.0.0.6:48077/user/Worker
    15/09/25 16:45:45 INFO MainClass: MainClass - Setup Logger
    15/09/25 16:45:45 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@10.0.0.6:48077/user/Worker
    15/09/25 16:45:45 INFO Checkpoint: Checkpoint directory /tmp/sparkcheckpoint does not exist
    15/09/25 16:45:45 INFO MainClass: Setting up streaming context with configuration: org.apache.spark.SparkConf@56057cbf and time window 2000 ms
    15/09/25 16:45:45 INFO SparkContext: Running Spark version 1.4.1
    15/09/25 16:45:45 INFO SparkContext: Spark configuration:
    spark.app.name=MainClass
    spark.default.parallelism=6
    spark.driver.supervise=true
    spark.jars=file:/tmp/OinkSpark-1.0-SNAPSHOT-jar-with-dependencies.jar
    spark.logConf=true
    spark.master=local[*]
    spark.rpc.askTimeout=10
    spark.streaming.receiver.maxRate=500

As you can see, despite -Dspark.master=spark://sparkserver:7077, the streaming context still registers the master as local[*]. Any idea why?

Thank you,

Bryan Jeffrey
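One way to catch this kind of misconfiguration early is to check the master the driver actually resolved, right after the context is created. The guard below is hypothetical. It assumes that a driver submitted to the standalone cluster should never run with a local master, and it is plain Scala so it can be tried without a cluster.

```scala
object MasterGuard {
  // Local masters look like "local", "local[4]", "local[*]" or "local-cluster[...]".
  def isLocal(master: String): Boolean = master.startsWith("local")

  // Throws IllegalArgumentException when the resolved master is a local one.
  def requireClusterMaster(master: String): Unit =
    require(!isLocal(master),
      s"driver resolved spark.master=$master; expected a cluster URL such as spark://host:7077")

  def main(args: Array[String]): Unit = {
    requireClusterMaster("spark://sparkserver:7077") // passes silently
    try requireClusterMaster("local[*]")
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```

In a driver, this would be called as MasterGuard.requireClusterMaster(sc.master), where sc is the application's SparkContext (or ssc.sparkContext for a StreamingContext).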
Re: Weird worker usage
I am seeing a similar issue when reading from Kafka. I have a single Kafka broker with 1 topic and 10 partitions on a separate machine. I have a three-node Spark cluster and have verified that all workers are registered with the master. I'm initializing Kafka using a method similar to the one in this article: http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/. I create 3 InputDStreams and union them together into a single stream. I then repartition it to 6 partitions:

    val streams = Range(0, configuration.numStreams)
      .map(x => {
        logger.info("Starting Setup of Kafka Stream #" + x + ": \n\tZookeepers: " + zookeepersToUse.mkString(",") +
          "\n\tBrokers: " + brokersToUse.mkString(",") + "\n\tTopics: " + topicsToUse.mkString(","))
        KafkaStreamFactory.createKafkaStream(ssc, brokersToUse, zookeepersToUse, topicsToUse)
      }).toArray

    val unionStream = ssc.union(streams)
    // repartition returns a new DStream, so return it from the if/else;
    // an if without an else would silently discard the repartitioned stream.
    if (configuration.threadParallelism > 0) {
      unionStream.repartition(configuration.threadParallelism)
    } else {
      unionStream
    }

I am submitting the job to Spark using the following options:

    /spark/spark-1.4.1/bin/spark-submit --deploy-mode client --supervise \
      --master "spark://sparkserver:7077" \
      --conf spark.logConf=true --conf spark.default.parallelism=6 \
      --conf spark.streaming.receiver.maxRate=500 \
      --class MainClass "/tmp/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar" \
      --checkpoint /tmp/sparkcheckpoint --broker kafkaBroker:9092 --topic test \
      --numStreams 9 --threadParallelism 9

Even when I put a long-running job in the queue, all of the other nodes stay idle. Am I missing something obvious?

Regards,
Bryan Jeffrey

On Fri, Sep 25, 2015 at 8:28 AM, Akhil Das wrote:
> Parallel tasks totally depends on the # of partitions that you are having,
> if you are not receiving sufficient partitions (partitions > total # cores)
> then try to do a .repartition.
>
> Thanks
> Best Regards
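DStream transformations such as repartition return a new DStream and leave the one they are called on unchanged. A call whose result is neither returned nor assigned therefore has no effect on the job. The pure-Scala sketch below models that pitfall. It uses a hypothetical immutable FakeStream class as a stand-in for a DStream, so it runs without Spark.

```scala
object DiscardedRepartition {
  // Hypothetical stand-in for a DStream: transformations return a new value.
  final case class FakeStream(partitions: Int) {
    def repartition(n: Int): FakeStream = copy(partitions = n)
  }

  // An if without an else has type Unit, so the repartitioned stream is dropped.
  def discarded(union: FakeStream, threadParallelism: Int): FakeStream = {
    if (threadParallelism > 0) {
      union.repartition(threadParallelism)
    }
    union
  }

  // Returning the if/else expression keeps the repartitioned stream.
  def returned(union: FakeStream, threadParallelism: Int): FakeStream =
    if (threadParallelism > 0) union.repartition(threadParallelism) else union

  def main(args: Array[String]): Unit = {
    val union = FakeStream(partitions = 3)
    println(discarded(union, 9).partitions) // 3
    println(returned(union, 9).partitions)  // 9
  }
}
```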
Re: Weird worker usage
The number of parallel tasks depends entirely on the number of partitions you have. If you are not receiving enough partitions (you want more partitions than total cores), try doing a .repartition.

Thanks
Best Regards

On Fri, Sep 25, 2015 at 1:44 PM, N B wrote:
> Hello all,
>
> I have a Spark Streaming application that reads from a Flume stream, does
> quite a few maps/filters in addition to a few reduceByKeyAndWindow and join
> operations before writing the analyzed output to Elasticsearch inside a
> foreachRDD()...
>
> I recently started to run this on a 2-node cluster (Standalone) with the
> driver program directly submitting to the Spark master on the same host. The
> way I have divided the resources is as follows:
>
> N1: Spark master + driver + Flume + 2 Spark workers (16 GB + 6 cores each
> worker)
> N2: 2 Spark workers (16 GB + 8 cores each worker)
>
> The application works just fine, but it is underusing N2 completely. It
> seems to use N1 (note that both executors on N1 get used) for all the
> analytics, but when it comes to writing to Elasticsearch, it does distribute
> the data across all 4 executors, which then write to ES on a separate host.
>
> I am puzzled as to why the data is not being distributed evenly across all
> 4 executors from the get-go, and why it would only do so in the final step
> of the pipeline, which seems counterproductive as well.
>
> CPU usage on N1 is near peak, while on N2 it is < 10% of overall capacity.
>
> Any help in getting the resources more evenly utilized on N1 and N2 is
> welcome.
>
> Thanks in advance,
> Nikunj
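The rule of thumb in this reply (more partitions than total cores, otherwise repartition) can be written as a small check. This is a sketch, and the function name is hypothetical. The default of two partitions per core is an assumption taken from the Spark tuning guide's general recommendation of 2-3 tasks per CPU core, not from this thread. Applied to the figures reported elsewhere in this thread (25 partitions, 28 cores), the check suggests repartitioning.

```scala
object RepartitionCheck {
  // Some(target) when partitions do not exceed total cores; None when parallelism is already enough.
  // tasksPerCore = 2 is an assumed default, not a Spark setting.
  def repartitionTarget(partitions: Int, totalCores: Int, tasksPerCore: Int = 2): Option[Int] =
    if (partitions > totalCores) None else Some(totalCores * tasksPerCore)

  def main(args: Array[String]): Unit = {
    println(repartitionTarget(partitions = 25, totalCores = 28)) // Some(56)
    println(repartitionTarget(partitions = 60, totalCores = 28)) // None
  }
}
```

The resulting target would then be passed to DStream.repartition(n) before the heavier transformations.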