Re: Weird worker usage

2015-09-28 Thread Bryan Jeffrey
Nikunj,

No, I'm not calling setMaster at all.  This ended up being a foolish
configuration problem with my slaves file.
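
As a side note for anyone hitting the same thing: in standalone mode the
cluster scripts read conf/slaves, which is simply one worker hostname per line.
A minimal sketch with hypothetical hostnames (sbin/start-slaves.sh starts a
worker on every host listed):

# conf/slaves - one worker host per line; lines starting with '#' are comments
sparkserver
worker-node-1
worker-node-2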

Regards,

Bryan Jeffrey

On Fri, Sep 25, 2015 at 11:20 PM, N B  wrote:

> Bryan,
>
> By any chance, are you calling SparkConf.setMaster("local[*]") inside your
> application code?
>
> Nikunj
>
> On Fri, Sep 25, 2015 at 9:56 AM, Bryan Jeffrey 
> wrote:
>
>> Looking at this further, it appears that my Spark Context is not
>> correctly setting the Master name.  I see the following in logs:
>>
>> 15/09/25 16:45:42 INFO DriverRunner: Launch Command:
>> "/usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java" "-cp"
>> "/spark/spark-1.4.1/sbin/../conf/:/spark/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.2.0.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-rdbms-3.2.9.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-core-3.2.10.jar"
>> "-Xms512M" "-Xmx512M" "-Dakka.loglevel=WARNING"
>> "-Dspark.default.parallelism=6" "-Dspark.rpc.askTimeout=10" "-
>> Dspark.app.name=MainClass" "-Dspark.master=spark://sparkserver:7077"
>> "-Dspark.driver.supervise=true" "-Dspark.logConf=true"
>> "-Dspark.jars=file:/tmp/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
>> "-Dspark.streaming.receiver.maxRate=500" "-XX:MaxPermSize=256m"
>> "org.apache.spark.deploy.worker.DriverWrapper" "akka.tcp://
>> sparkWorker@10.0.0.6:48077/user/Worker"
>> "/spark/spark-1.4.1/work/driver-20150925164617-/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
>> "MainClass" "--checkpoint" "/tmp/sparkcheckpoint" "--broker"
>> "kafkaBroker:9092" "--topic" "test" "--numStreams" "9"
>> "--threadParallelism" "9"
>> 15/09/25 16:45:43 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/09/25 16:45:43 INFO SecurityManager: Changing view acls to: root
>> 15/09/25 16:45:43 INFO SecurityManager: Changing modify acls to: root
>> 15/09/25 16:45:43 INFO SecurityManager: SecurityManager: authentication
>> disabled; ui acls disabled; users with view permissions: Set(root); users
>> with modify permissions: Set(root)
>> 15/09/25 16:45:44 INFO Slf4jLogger: Slf4jLogger started
>> 15/09/25 16:45:45 INFO Utils: Successfully started service 'Driver' on
>> port 59670.
>> 15/09/25 16:45:45 INFO WorkerWatcher: Connecting to worker akka.tcp://
>> sparkWorker@10.0.0.6:48077/user/Worker
>> 15/09/25 16:45:45 INFO MainClass: MainClass - Setup Logger
>> 15/09/25 16:45:45 INFO WorkerWatcher: Successfully connected to
>> akka.tcp://sparkWorker@10.0.0.6:48077/user/Worker
>> 15/09/25 16:45:45 INFO Checkpoint: Checkpoint directory
>> /tmp/sparkcheckpoint does not exist
>> 15/09/25 16:45:45 INFO MainClass: Setting up streaming context with
>> configuration: org.apache.spark.SparkConf@56057cbf and time window 2000
>> ms
>> 15/09/25 16:45:45 INFO SparkContext: Running Spark version 1.4.1
>> 15/09/25 16:45:45 INFO SparkContext: Spark configuration:
>> spark.app.name=MainClass
>> spark.default.parallelism=6
>> spark.driver.supervise=true
>> spark.jars=file:/tmp/OinkSpark-1.0-SNAPSHOT-jar-with-dependencies.jar
>> spark.logConf=true
>> spark.master=local[*]
>> spark.rpc.askTimeout=10
>> spark.streaming.receiver.maxRate=500
>>
>> As you can see, despite -Dspark.master=spark://sparkserver:7077, the streaming
>> context still registers the master as local[*].  Any idea why?
>>
>> Thank you,
>>
>> Bryan Jeffrey
>>
>>
>>
>


Re: Weird worker usage

2015-09-26 Thread Akhil Das
That means only a single receiver is doing all the work, so the data is local
to your N1 machine and hence all tasks are executed there. To get the data
over to N2, you need to either do a .repartition or set the StorageLevel to
one of the MEMORY*_2 variants, where the _2 suffix enables data replication.
I guess that will solve your problem.
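
A minimal sketch of both options, assuming a Flume receiver like the one
described below in the thread (host name, port, and app name are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val conf = new SparkConf().setAppName("FlumeSpreadSketch")
val ssc = new StreamingContext(conf, Seconds(30))

// Option 1: receive with a *_2 storage level so every block is replicated to a
// second executor and tasks can be scheduled against either copy.
val flumeStream = FlumeUtils.createStream(ssc, "flumeHost", 4141,
  StorageLevel.MEMORY_AND_DISK_SER_2)

// Option 2: explicitly shuffle the received blocks across the whole cluster
// before the heavy map/filter/window work.
val spread = flumeStream.repartition(28) // roughly the total cores on N1 + N2

spread.count().print()
ssc.start()
ssc.awaitTermination()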

Thanks
Best Regards

On Sun, Sep 27, 2015 at 12:50 AM, Akhil Das 
wrote:

> That means only
>
> Thanks
> Best Regards
>
> On Sun, Sep 27, 2015 at 12:07 AM, N B  wrote:
>
>> Hello,
>>
>> Does anyone have an insight into what could be the issue here?
>>
>> Thanks
>> Nikunj
>>
>>
>> On Fri, Sep 25, 2015 at 10:44 AM, N B  wrote:
>>
>>> Hi Akhil,
>>>
>>> I do have 25 partitions being created. I have set
>>> the spark.default.parallelism property to 25. Batch size is 30 seconds and
>>> block interval is 1200 ms which also gives us roughly 25 partitions from
>>> the input stream. I can see 25 partitions being created and used in the
>>> Spark UI also. It's just that those tasks are waiting for cores on N1 to get
>>> free before being scheduled while N2 is sitting idle.
>>>
>>> The cluster configuration is:
>>>
>>> N1: 2 workers, 6 cores and 16gb memory => 12 cores on the first node.
>>> N2: 2 workers, 8 cores and 16 gb memory => 16 cores on the second node.
>>>
>>> That's a grand total of 28 cores. But it still does most of the processing
>>> on N1 (divided among the 2 workers running) while almost completely
>>> disregarding N2 until the final stage, where data is being written out
>>> to Elasticsearch. I am not sure I understand the reason behind it not
>>> distributing more partitions to N2 to begin with and using it effectively.
>>> Since there are only 12 cores on N1 and 25 total partitions, shouldn't it
>>> send some of those partitions to N2 as well?
>>>
>>> Thanks
>>> Nikunj
>>>
>>>
>>> On Fri, Sep 25, 2015 at 5:28 AM, Akhil Das 
>>> wrote:
>>>
 Parallel tasks depend entirely on the number of partitions you have; if you
 are not receiving enough partitions (partitions > total # of cores), then try
 a .repartition.

 Thanks
 Best Regards

 On Fri, Sep 25, 2015 at 1:44 PM, N B  wrote:

> Hello all,
>
> I have a Spark streaming application that reads from a Flume Stream,
> does quite a few maps/filters in addition to a few reduceByKeyAndWindow 
> and
> join operations before writing the analyzed output to ElasticSearch inside
> a foreachRDD()...
>
> I recently started to run this on a 2 node cluster (Standalone) with
> the driver program directly submitting to Spark master on the same host.
> The way I have divided the resources is as follows:
>
> N1: spark Master + driver + flume + 2 spark workers (16gb + 6 cores
> each worker)
> N2: 2 spark workers (16 gb + 8 cores each worker).
>
> The application works just fine but it is underusing N2 completely. It
> seems to use N1 (note that both executors on N1 get used) for all the
> analytics but when it comes to writing to Elasticsearch, it does divide 
> the
> data around into all 4 executors which then write to ES on a separate 
> host.
>
> I am puzzled as to why the data is not being distributed evenly from
> the get-go into all 4 executors, and why it would only do so in the final
> step of the pipeline, which seems counterproductive as well.
>
> CPU usage on N1 is near the peak while on N2 is < 10% of overall
> capacity.
>
> Any help in getting the resources more evenly utilized on N1 and N2 is
> welcome.
>
> Thanks in advance,
> Nikunj
>
>

>>>
>>
>


Re: Weird worker usage

2015-09-26 Thread Akhil Das
That means only

Thanks
Best Regards

On Sun, Sep 27, 2015 at 12:07 AM, N B  wrote:

> Hello,
>
> Does anyone have an insight into what could be the issue here?
>
> Thanks
> Nikunj
>
>
> On Fri, Sep 25, 2015 at 10:44 AM, N B  wrote:
>
>> Hi Akhil,
>>
>> I do have 25 partitions being created. I have set
>> the spark.default.parallelism property to 25. Batch size is 30 seconds and
>> block interval is 1200 ms which also gives us roughly 25 partitions from
>> the input stream. I can see 25 partitions being created and used in the
>> Spark UI also. It's just that those tasks are waiting for cores on N1 to get
>> free before being scheduled while N2 is sitting idle.
>>
>> The cluster configuration is:
>>
>> N1: 2 workers, 6 cores and 16gb memory => 12 cores on the first node.
>> N2: 2 workers, 8 cores and 16 gb memory => 16 cores on the second node.
>>
>> That's a grand total of 28 cores. But it still does most of the processing
>> on N1 (divided among the 2 workers running) while almost completely
>> disregarding N2 until the final stage, where data is being written out
>> to Elasticsearch. I am not sure I understand the reason behind it not
>> distributing more partitions to N2 to begin with and using it effectively.
>> Since there are only 12 cores on N1 and 25 total partitions, shouldn't it
>> send some of those partitions to N2 as well?
>>
>> Thanks
>> Nikunj
>>
>>
>> On Fri, Sep 25, 2015 at 5:28 AM, Akhil Das 
>> wrote:
>>
>>> Parallel tasks depend entirely on the number of partitions you have; if
>>> you are not receiving enough partitions (partitions > total # of cores),
>>> then try a .repartition.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Fri, Sep 25, 2015 at 1:44 PM, N B  wrote:
>>>
 Hello all,

 I have a Spark streaming application that reads from a Flume Stream,
 does quite a few maps/filters in addition to a few reduceByKeyAndWindow and
 join operations before writing the analyzed output to ElasticSearch inside
 a foreachRDD()...

 I recently started to run this on a 2 node cluster (Standalone) with
 the driver program directly submitting to Spark master on the same host.
 The way I have divided the resources is as follows:

 N1: spark Master + driver + flume + 2 spark workers (16gb + 6 cores
 each worker)
 N2: 2 spark workers (16 gb + 8 cores each worker).

 The application works just fine but it is underusing N2 completely. It
 seems to use N1 (note that both executors on N1 get used) for all the
 analytics but when it comes to writing to Elasticsearch, it does divide the
 data around into all 4 executors which then write to ES on a separate host.

 I am puzzled as to why the data is not being distributed evenly from
 the get-go into all 4 executors, and why it would only do so in the final
 step of the pipeline, which seems counterproductive as well.

 CPU usage on N1 is near the peak while on N2 is < 10% of overall
 capacity.

 Any help in getting the resources more evenly utilized on N1 and N2 is
 welcome.

 Thanks in advance,
 Nikunj


>>>
>>
>


Re: Weird worker usage

2015-09-26 Thread N B
Hello,

Does anyone have an insight into what could be the issue here?

Thanks
Nikunj


On Fri, Sep 25, 2015 at 10:44 AM, N B  wrote:

> Hi Akhil,
>
> I do have 25 partitions being created. I have set
> the spark.default.parallelism property to 25. Batch size is 30 seconds and
> block interval is 1200 ms which also gives us roughly 25 partitions from
> the input stream. I can see 25 partitions being created and used in the
> Spark UI also. It's just that those tasks are waiting for cores on N1 to get
> free before being scheduled while N2 is sitting idle.
>
> The cluster configuration is:
>
> N1: 2 workers, 6 cores and 16gb memory => 12 cores on the first node.
> N2: 2 workers, 8 cores and 16 gb memory => 16 cores on the second node.
>
> That's a grand total of 28 cores. But it still does most of the processing on
> N1 (divided among the 2 workers running) while almost completely disregarding
> N2 until the final stage, where data is being written out to
> Elasticsearch. I am not sure I understand the reason behind it not
> distributing more partitions to N2 to begin with and using it effectively.
> Since there are only 12 cores on N1 and 25 total partitions, shouldn't it
> send some of those partitions to N2 as well?
>
> Thanks
> Nikunj
>
>
> On Fri, Sep 25, 2015 at 5:28 AM, Akhil Das 
> wrote:
>
>> Parallel tasks depend entirely on the number of partitions you have; if you
>> are not receiving enough partitions (partitions > total # of cores), then
>> try a .repartition.
>>
>> Thanks
>> Best Regards
>>
>> On Fri, Sep 25, 2015 at 1:44 PM, N B  wrote:
>>
>>> Hello all,
>>>
>>> I have a Spark streaming application that reads from a Flume Stream,
>>> does quite a few maps/filters in addition to a few reduceByKeyAndWindow and
>>> join operations before writing the analyzed output to ElasticSearch inside
>>> a foreachRDD()...
>>>
>>> I recently started to run this on a 2 node cluster (Standalone) with the
>>> driver program directly submitting to Spark master on the same host. The
>>> way I have divided the resources is as follows:
>>>
>>> N1: spark Master + driver + flume + 2 spark workers (16gb + 6 cores each
>>> worker)
>>> N2: 2 spark workers (16 gb + 8 cores each worker).
>>>
>>> The application works just fine but it is underusing N2 completely. It
>>> seems to use N1 (note that both executors on N1 get used) for all the
>>> analytics but when it comes to writing to Elasticsearch, it does divide the
>>> data around into all 4 executors which then write to ES on a separate host.
>>>
>>> I am puzzled as to why the data is not being distributed evenly from the
>>> get-go into all 4 executors, and why it would only do so in the final step
>>> of the pipeline, which seems counterproductive as well.
>>>
>>> CPU usage on N1 is near the peak while on N2 is < 10% of overall
>>> capacity.
>>>
>>> Any help in getting the resources more evenly utilized on N1 and N2 is
>>> welcome.
>>>
>>> Thanks in advance,
>>> Nikunj
>>>
>>>
>>
>


Re: Weird worker usage

2015-09-25 Thread N B
Bryan,

By any chance, are you calling SparkConf.setMaster("local[*]") inside your
application code?
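
For reference, a minimal sketch of why this matters: a master hard-coded in the
application overrides the spark.master value that spark-submit / -Dspark.master
passes in, because explicit SparkConf setters win over values loaded from
system properties.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// new SparkConf() picks up spark.master from the submitted system properties...
val conf = new SparkConf()
  .setAppName("MainClass")
  .setMaster("local[*]") // ...but this call then overwrites it with local[*]

val ssc = new StreamingContext(conf, Seconds(2))
// Dropping the setMaster call (or only using it for local testing) lets the
// submitted spark.master=spark://sparkserver:7077 take effect.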

Nikunj

On Fri, Sep 25, 2015 at 9:56 AM, Bryan Jeffrey 
wrote:

> Looking at this further, it appears that my Spark Context is not correctly
> setting the Master name.  I see the following in logs:
>
> 15/09/25 16:45:42 INFO DriverRunner: Launch Command:
> "/usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java" "-cp"
> "/spark/spark-1.4.1/sbin/../conf/:/spark/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.2.0.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-rdbms-3.2.9.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-core-3.2.10.jar"
> "-Xms512M" "-Xmx512M" "-Dakka.loglevel=WARNING"
> "-Dspark.default.parallelism=6" "-Dspark.rpc.askTimeout=10" "-
> Dspark.app.name=MainClass" "-Dspark.master=spark://sparkserver:7077"
> "-Dspark.driver.supervise=true" "-Dspark.logConf=true"
> "-Dspark.jars=file:/tmp/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
> "-Dspark.streaming.receiver.maxRate=500" "-XX:MaxPermSize=256m"
> "org.apache.spark.deploy.worker.DriverWrapper" "akka.tcp://
> sparkWorker@10.0.0.6:48077/user/Worker"
> "/spark/spark-1.4.1/work/driver-20150925164617-/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
> "MainClass" "--checkpoint" "/tmp/sparkcheckpoint" "--broker"
> "kafkaBroker:9092" "--topic" "test" "--numStreams" "9"
> "--threadParallelism" "9"
> 15/09/25 16:45:43 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/09/25 16:45:43 INFO SecurityManager: Changing view acls to: root
> 15/09/25 16:45:43 INFO SecurityManager: Changing modify acls to: root
> 15/09/25 16:45:43 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(root); users
> with modify permissions: Set(root)
> 15/09/25 16:45:44 INFO Slf4jLogger: Slf4jLogger started
> 15/09/25 16:45:45 INFO Utils: Successfully started service 'Driver' on
> port 59670.
> 15/09/25 16:45:45 INFO WorkerWatcher: Connecting to worker akka.tcp://
> sparkWorker@10.0.0.6:48077/user/Worker
> 15/09/25 16:45:45 INFO MainClass: MainClass - Setup Logger
> 15/09/25 16:45:45 INFO WorkerWatcher: Successfully connected to akka.tcp://
> sparkWorker@10.0.0.6:48077/user/Worker
> 15/09/25 16:45:45 INFO Checkpoint: Checkpoint directory
> /tmp/sparkcheckpoint does not exist
> 15/09/25 16:45:45 INFO MainClass: Setting up streaming context with
> configuration: org.apache.spark.SparkConf@56057cbf and time window 2000 ms
> 15/09/25 16:45:45 INFO SparkContext: Running Spark version 1.4.1
> 15/09/25 16:45:45 INFO SparkContext: Spark configuration:
> spark.app.name=MainClass
> spark.default.parallelism=6
> spark.driver.supervise=true
> spark.jars=file:/tmp/OinkSpark-1.0-SNAPSHOT-jar-with-dependencies.jar
> spark.logConf=true
> spark.master=local[*]
> spark.rpc.askTimeout=10
> spark.streaming.receiver.maxRate=500
>
> As you can see, despite -Dspark.master=spark://sparkserver:7077, the streaming
> context still registers the master as local[*].  Any idea why?
>
> Thank you,
>
> Bryan Jeffrey
>
>
>


Re: Weird worker usage

2015-09-25 Thread N B
Hi Akhil,

I do have 25 partitions being created. I have set
the spark.default.parallelism property to 25. Batch size is 30 seconds and
block interval is 1200 ms which also gives us roughly 25 partitions from
the input stream. I can see 25 partitions being created and used in the
Spark UI also. It's just that those tasks are waiting for cores on N1 to get
free before being scheduled while N2 is sitting idle.
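
A quick sketch of that arithmetic (the values are the ones above, and
spark.streaming.blockInterval is the real setting name): with receiver-based
streams each block interval of received data becomes one partition of the
batch RDD, so:

val batchIntervalMs = 30000 // 30 second batches
val blockIntervalMs = 1200  // spark.streaming.blockInterval = 1200ms
val partitionsPerBatch = batchIntervalMs / blockIntervalMs // = 25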

The cluster configuration is:

N1: 2 workers, 6 cores and 16gb memory => 12 cores on the first node.
N2: 2 workers, 8 cores and 16 gb memory => 16 cores on the second node.

That's a grand total of 28 cores. But it still does most of the processing on
N1 (divided among the 2 workers running) while almost completely disregarding
N2 until the final stage, where data is being written out to
Elasticsearch. I am not sure I understand the reason behind it not
distributing more partitions to N2 to begin with and using it effectively.
Since there are only 12 cores on N1 and 25 total partitions, shouldn't it
send some of those partitions to N2 as well?

Thanks
Nikunj


On Fri, Sep 25, 2015 at 5:28 AM, Akhil Das 
wrote:

> Parallel tasks depend entirely on the number of partitions you have; if you
> are not receiving enough partitions (partitions > total # of cores), then try
> a .repartition.
>
> Thanks
> Best Regards
>
> On Fri, Sep 25, 2015 at 1:44 PM, N B  wrote:
>
>> Hello all,
>>
>> I have a Spark streaming application that reads from a Flume Stream, does
>> quite a few maps/filters in addition to a few reduceByKeyAndWindow and join
>> operations before writing the analyzed output to ElasticSearch inside a
>> foreachRDD()...
>>
>> I recently started to run this on a 2 node cluster (Standalone) with the
>> driver program directly submitting to Spark master on the same host. The
>> way I have divided the resources is as follows:
>>
>> N1: spark Master + driver + flume + 2 spark workers (16gb + 6 cores each
>> worker)
>> N2: 2 spark workers (16 gb + 8 cores each worker).
>>
>> The application works just fine but it is underusing N2 completely. It
>> seems to use N1 (note that both executors on N1 get used) for all the
>> analytics but when it comes to writing to Elasticsearch, it does divide the
>> data around into all 4 executors which then write to ES on a separate host.
>>
>> I am puzzled as to why the data is not being distributed evenly from the
>> get-go into all 4 executors, and why it would only do so in the final step
>> of the pipeline, which seems counterproductive as well.
>>
>> CPU usage on N1 is near the peak while on N2 is < 10% of overall capacity.
>>
>> Any help in getting the resources more evenly utilized on N1 and N2 is
>> welcome.
>>
>> Thanks in advance,
>> Nikunj
>>
>>
>


Re: Weird worker usage

2015-09-25 Thread Bryan Jeffrey
Looking at this further, it appears that my Spark Context is not correctly
setting the Master name.  I see the following in logs:

15/09/25 16:45:42 INFO DriverRunner: Launch Command:
"/usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java" "-cp"
"/spark/spark-1.4.1/sbin/../conf/:/spark/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.2.0.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-rdbms-3.2.9.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-core-3.2.10.jar"
"-Xms512M" "-Xmx512M" "-Dakka.loglevel=WARNING"
"-Dspark.default.parallelism=6" "-Dspark.rpc.askTimeout=10" "-
Dspark.app.name=MainClass" "-Dspark.master=spark://sparkserver:7077"
"-Dspark.driver.supervise=true" "-Dspark.logConf=true"
"-Dspark.jars=file:/tmp/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
"-Dspark.streaming.receiver.maxRate=500" "-XX:MaxPermSize=256m"
"org.apache.spark.deploy.worker.DriverWrapper" "akka.tcp://
sparkWorker@10.0.0.6:48077/user/Worker"
"/spark/spark-1.4.1/work/driver-20150925164617-/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
"MainClass" "--checkpoint" "/tmp/sparkcheckpoint" "--broker"
"kafkaBroker:9092" "--topic" "test" "--numStreams" "9"
"--threadParallelism" "9"
15/09/25 16:45:43 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/09/25 16:45:43 INFO SecurityManager: Changing view acls to: root
15/09/25 16:45:43 INFO SecurityManager: Changing modify acls to: root
15/09/25 16:45:43 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root); users
with modify permissions: Set(root)
15/09/25 16:45:44 INFO Slf4jLogger: Slf4jLogger started
15/09/25 16:45:45 INFO Utils: Successfully started service 'Driver' on port
59670.
15/09/25 16:45:45 INFO WorkerWatcher: Connecting to worker akka.tcp://
sparkWorker@10.0.0.6:48077/user/Worker
15/09/25 16:45:45 INFO MainClass: MainClass - Setup Logger
15/09/25 16:45:45 INFO WorkerWatcher: Successfully connected to akka.tcp://
sparkWorker@10.0.0.6:48077/user/Worker
15/09/25 16:45:45 INFO Checkpoint: Checkpoint directory
/tmp/sparkcheckpoint does not exist
15/09/25 16:45:45 INFO MainClass: Setting up streaming context with
configuration: org.apache.spark.SparkConf@56057cbf and time window 2000 ms
15/09/25 16:45:45 INFO SparkContext: Running Spark version 1.4.1
15/09/25 16:45:45 INFO SparkContext: Spark configuration:
spark.app.name=MainClass
spark.default.parallelism=6
spark.driver.supervise=true
spark.jars=file:/tmp/OinkSpark-1.0-SNAPSHOT-jar-with-dependencies.jar
spark.logConf=true
spark.master=local[*]
spark.rpc.askTimeout=10
spark.streaming.receiver.maxRate=500

As you can see, despite -Dspark.master=spark://sparkserver:7077, the streaming
context still registers the master as local[*].  Any idea why?

Thank you,

Bryan Jeffrey


Re: Weird worker usage

2015-09-25 Thread Bryan Jeffrey
I am seeing a similar issue when reading from Kafka.  I have a single Kafka
broker with 1 topic and 10 partitions on a separate machine.  I have a
three-node spark cluster, and verified that all workers are registered with
the master.  I'm initializing Kafka using a similar method to this article:
http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/.
I create 3 InputDStreams and union them together to provide a unified
context. I then repartition this to 6 partitions:

val streams = Range(0, configuration.numStreams)
  .map(x => {
    logger.info("Starting Setup of Kafka Stream #" + x +
      ": \n\tZookeepers: " + zookeepersToUse.mkString(",") +
      "\n\tBrokers: " + brokersToUse.mkString(",") +
      "\n\tTopics: " + topicsToUse.mkString(","))
    KafkaStreamFactory.createKafkaStream(ssc, brokersToUse, zookeepersToUse, topicsToUse)
  }).toArray
val unionStream = ssc.union(streams)
if (configuration.threadParallelism > 0) {
  unionStream.repartition(configuration.threadParallelism)
}
unionStream
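
One hedged note on the snippet above: DStream.repartition returns a new DStream
rather than repartitioning unionStream in place, so for the downstream work to
actually see threadParallelism partitions the result has to be captured. A
minimal sketch, with streamToUse as an assumed name:

val streamToUse =
  if (configuration.threadParallelism > 0)
    unionStream.repartition(configuration.threadParallelism)
  else
    unionStream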


I am submitting the job to Spark using the following options:

/spark/spark-1.4.1/bin/spark-submit --deploy-mode client --supervise
--master "spark://sparkserver:7077" --conf spark.logConf=true --conf
spark.default.parallelism=6 --conf spark.streaming.receiver.maxRate=500
--class MainClass "/tmp/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
--checkpoint /tmp/sparkcheckpoint --broker kafkaBroker:9092 --topic test
--numStreams 9 --threadParallelism 9

Even when I put a long-running job in the queue, none of the other nodes
are anything but idle.

Am I missing something obvious?

Regards,

Bryan Jeffrey





On Fri, Sep 25, 2015 at 8:28 AM, Akhil Das 
wrote:

> Parallel tasks depend entirely on the number of partitions you have; if you
> are not receiving enough partitions (partitions > total # of cores), then try
> a .repartition.
>
> Thanks
> Best Regards
>
> On Fri, Sep 25, 2015 at 1:44 PM, N B  wrote:
>
>> Hello all,
>>
>> I have a Spark streaming application that reads from a Flume Stream, does
>> quite a few maps/filters in addition to a few reduceByKeyAndWindow and join
>> operations before writing the analyzed output to ElasticSearch inside a
>> foreachRDD()...
>>
>> I recently started to run this on a 2 node cluster (Standalone) with the
>> driver program directly submitting to Spark master on the same host. The
>> way I have divided the resources is as follows:
>>
>> N1: spark Master + driver + flume + 2 spark workers (16gb + 6 cores each
>> worker)
>> N2: 2 spark workers (16 gb + 8 cores each worker).
>>
>> The application works just fine but it is underusing N2 completely. It
>> seems to use N1 (note that both executors on N1 get used) for all the
>> analytics but when it comes to writing to Elasticsearch, it does divide the
>> data around into all 4 executors which then write to ES on a separate host.
>>
>> I am puzzled as to why the data is not being distributed evenly from the
>> get-go into all 4 executors, and why it would only do so in the final step
>> of the pipeline, which seems counterproductive as well.
>>
>> CPU usage on N1 is near the peak while on N2 is < 10% of overall capacity.
>>
>> Any help in getting the resources more evenly utilized on N1 and N2 is
>> welcome.
>>
>> Thanks in advance,
>> Nikunj
>>
>>
>


Re: Weird worker usage

2015-09-25 Thread Akhil Das
Parallel tasks depend entirely on the number of partitions you have; if you
are not receiving enough partitions (partitions > total # of cores), then try
a .repartition.
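
A minimal sketch of that suggestion, assuming a simple socket-based input
stream (host, port, and app name are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("RepartitionSketch")
val ssc = new StreamingContext(conf, Seconds(30))

// A receiver-based DStream lands its blocks on the receiver's node, so
// repartition to at least the total core count to let every worker get tasks.
val lines = ssc.socketTextStream("someHost", 9999)
val spread = lines.repartition(28) // >= total # of cores in the cluster

spread.count().print()
ssc.start()
ssc.awaitTermination()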

Thanks
Best Regards

On Fri, Sep 25, 2015 at 1:44 PM, N B  wrote:

> Hello all,
>
> I have a Spark streaming application that reads from a Flume Stream, does
> quite a few maps/filters in addition to a few reduceByKeyAndWindow and join
> operations before writing the analyzed output to ElasticSearch inside a
> foreachRDD()...
>
> I recently started to run this on a 2 node cluster (Standalone) with the
> driver program directly submitting to Spark master on the same host. The
> way I have divided the resources is as follows:
>
> N1: spark Master + driver + flume + 2 spark workers (16gb + 6 cores each
> worker)
> N2: 2 spark workers (16 gb + 8 cores each worker).
>
> The application works just fine but it is underusing N2 completely. It
> seems to use N1 (note that both executors on N1 get used) for all the
> analytics but when it comes to writing to Elasticsearch, it does divide the
> data around into all 4 executors which then write to ES on a separate host.
>
> I am puzzled as to why the data is not being distributed evenly from the
> get-go into all 4 executors, and why it would only do so in the final step
> of the pipeline, which seems counterproductive as well.
>
> CPU usage on N1 is near the peak while on N2 is < 10% of overall capacity.
>
> Any help in getting the resources more evenly utilized on N1 and N2 is
> welcome.
>
> Thanks in advance,
> Nikunj
>
>