What are you passing as parameters to spark-submit?
${SPARK_HOME}/bin/spark-submit \
  --executor-cores=12 \

Also check http://spark.apache.org/docs/latest/configuration.html under Execution Behavior / spark.executor.cores.

HTH

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 2 June 2016 at 17:29, Andres M Jimenez T <ad...@hotmail.com> wrote:

> Hi,
>
> I am working with Spark 1.6.1, using Kafka direct connect for streaming
> data, with the Spark scheduler and 3 slaves. The Kafka topic is
> partitioned with a value of 10.
>
> The problem I have is that only one thread per executor is running my
> function (logic implementation).
>
> Can anybody tell me how I can increase the number of threads per executor
> to make better use of the CPUs?
>
> Thanks
>
> Here is the code I have implemented:
>
> *Driver*:
>
> JavaStreamingContext ssc =
>     new JavaStreamingContext(conf, new Duration(10000));
>
> // prepare streaming from Kafka
> Set<String> topicsSet = new HashSet<>(
>     Arrays.asList("stage1-in,stage1-retry".split(",")));
> Map<String, String> kafkaParams = new HashMap<>();
> kafkaParams.put("metadata.broker.list", kafkaBrokers);
> kafkaParams.put("group.id", SparkStreamingImpl.class.getName());
>
> JavaPairInputDStream<String, String> inputMessages =
>     KafkaUtils.createDirectStream(
>         ssc,
>         String.class,
>         String.class,
>         StringDecoder.class,
>         StringDecoder.class,
>         kafkaParams,
>         topicsSet
>     );
>
> inputMessages.foreachRDD(new ForeachRDDFunction());
>
> *ForeachFunction*:
>
> class ForeachFunction implements VoidFunction<Tuple2<String, String>> {
>
>     private static final Counter foreachConcurrent =
>         ProcessingMetrics.metrics.counter("foreach-concurrency");
>
>     public ForeachFunction() {
>         LOG.info("Creating a new ForeachFunction");
>     }
>
>     public void call(Tuple2<String, String> t) throws Exception {
>         foreachConcurrent.inc();
>         LOG.info("processing message [" + t._1() + "]");
>         try {
>             Thread.sleep(1000);
>         } catch (Exception e) { }
>         foreachConcurrent.dec();
>     }
> }
>
> *ForeachRDDFunction*:
>
> class ForeachRDDFunction implements VoidFunction<JavaPairRDD<String, String>> {
>
>     private static final Counter foreachRDDConcurrent =
>         ProcessingMetrics.metrics.counter("foreachRDD-concurrency");
>
>     private ForeachFunction foreachFunction = new ForeachFunction();
>
>     public ForeachRDDFunction() {
>         LOG.info("Creating a new ForeachRDDFunction");
>     }
>
>     public void call(JavaPairRDD<String, String> t) throws Exception {
>         foreachRDDConcurrent.inc();
>         LOG.info("call from inputMessages.foreachRDD with [" +
>             t.partitions().size() + "] partitions");
>         for (Partition p : t.partitions()) {
>             if (p instanceof KafkaRDDPartition) {
>                 LOG.info("partition [" + p.index() + "] with count [" +
>                     ((KafkaRDDPartition) p).count() + "]");
>             }
>         }
>         t.foreachAsync(foreachFunction);
>         foreachRDDConcurrent.dec();
>     }
> }
>
> *The log from the driver that tells me my RDD is partitioned to process
> in parallel*:
>
> [Stage 70:> (3 + 3) / 20][Stage 71:> (0 + 0) / 20][Stage 72:> (0 + 0) / 20]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: call from inputMessages.foreachRDD with [20] partitions
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [0] with count [24]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [1] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [2] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [3] with count [19]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [4] with count [19]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [5] with count [20]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [6] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [7] with count [23]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [8] with count [21]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [9] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [10] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [11] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [12] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [13] with count [26]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [14] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [15] with count [27]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [16] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [17] with count [16]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [18] with count [15]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [19] with count [0]
>
> *The log from one of the executors, showing that exactly one message per
> second was processed (only by one thread)*:
>
> 16/06/02 08:32:46 INFO SparkStreamingImpl: processing message [f2b22bb9-3bd8-4e5b-b9fb-afa7e8c4deb8]
> 16/06/02 08:32:47 INFO SparkStreamingImpl: processing message [e267cde2-ffea-4f7a-9934-f32a3b7218cc]
> 16/06/02 08:32:48 INFO SparkStreamingImpl: processing message [f055fe3c-0f72-4f41-9a31-df544f1e1cd3]
> 16/06/02 08:32:49 INFO SparkStreamingImpl: processing message [854faaa5-0abe-49a2-b13a-c290a3720b0e]
> 16/06/02 08:32:50 INFO SparkStreamingImpl: processing message [1bc0a141-b910-45fe-9881-e2066928fbc6]
> 16/06/02 08:32:51 INFO SparkStreamingImpl: processing message [67fb99c6-1ca1-4dfb-bffe-43b927fdec07]
> 16/06/02 08:32:52 INFO SparkStreamingImpl: processing message [de7d5934-bab2-4019-917e-c339d864ba18]
> 16/06/02 08:32:53 INFO SparkStreamingImpl: processing message [e63d7a7e-de32-4527-b8f1-641cfcc8869c]
> 16/06/02 08:32:54 INFO SparkStreamingImpl: processing message [1ce931ee-b8b1-4645-8a51-2c697bf1513b]
> 16/06/02 08:32:55 INFO SparkStreamingImpl: processing message [5367f3c1-d66c-4647-bb44-f5eab719031d]
>
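To make the executor-cores suggestion concrete: with the Kafka direct stream, each RDD partition becomes one task, and an executor generally runs at most spark.executor.cores / spark.task.cpus (default 1) tasks at once, so per-executor "threads" come from cores, not from foreachAsync. The driver's "(3 + 3) / 20" progress bar is consistent with 3 slaves each running a single task. Here is a minimal, self-contained sketch of that arithmetic (plain Java, no Spark dependency; the class and method names are ours, not Spark's, and the cores/cpus rule of thumb is an assumption about the default scheduler):

```java
// Rough model of Spark task concurrency; names are hypothetical, not Spark API.
public class ConcurrencyEstimate {

    // Assumed rule of thumb: tasks running at once on one executor
    // = spark.executor.cores / spark.task.cpus (integer division).
    static int concurrentTasksPerExecutor(int executorCores, int taskCpus) {
        return executorCores / taskCpus;
    }

    // Cluster-wide concurrency is further capped by the partition count,
    // since the direct stream creates one task per Kafka partition.
    static int clusterConcurrency(int executors, int executorCores,
                                  int taskCpus, int partitions) {
        return Math.min(executors * concurrentTasksPerExecutor(executorCores, taskCpus),
                        partitions);
    }

    public static void main(String[] args) {
        // 3 slaves at 1 core per executor, 20 partitions (two topics x 10):
        System.out.println(clusterConcurrency(3, 1, 1, 20));   // 3
        // 3 slaves with --executor-cores=12:
        System.out.println(clusterConcurrency(3, 12, 1, 20));  // 20
    }
}
```

Because each call() above sleeps for one second per message, throughput is roughly one message per second per concurrent task, which matches the executor log: raising executor cores raises the number of tasks (and hence partitions) processed in parallel.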