Spark streaming job not able to launch more executors

2020-09-18 Thread Vibhor Banga ( Engineering - VS)
Hi all,

We have a Spark Streaming job which reads from two Kafka topics with 10
partitions each, and we are running the streaming job with 3 concurrent
microbatches (so 20 partitions in total and a concurrency of 3).

We have the following question:

In our processing DAG, we do an rdd.persist() at one stage, after which we
fork the DAG into two branches. Each fork ends in an action (foreach). In
this case, we are observing that the number of executors does not exceed the
number of input Kafka partitions: the job does not spawn more than 60
executors (2*10*3). And we see that the tasks from the two actions and the 3
concurrent microbatches compete with each other for resources, so even though
the maximum processing time of a single task is 'x', the overall processing
time of the stage is much greater than 'x'.

Is there a way to ensure that the two forks of the DAG get processed in
parallel by spawning more executors?
(We have not put any cap on maxExecutors.)

Following are the job configurations:
spark.dynamicAllocation.enabled: true
spark.dynamicAllocation.minExecutors: NOT_SET
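
For illustration, a minimal, hypothetical sketch (not the actual job) of one way to let the two forked actions run concurrently instead of back to back: submit each action from its own driver thread, optionally with the FAIR scheduler so both jobs share the executors. The RDD contents, the sink methods and the parallelism numbers below are placeholders.

import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class ForkedActionsSketch {

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setAppName("ForkedActionsSketch")
                // Mirrors the thread's setup: executors are requested on demand.
                .set("spark.dynamicAllocation.enabled", "true")
                // Lets concurrently submitted jobs share executors instead of queuing FIFO.
                .set("spark.scheduler.mode", "FAIR");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Placeholder for the shared stage the real job builds from the Kafka input.
        JavaRDD<String> processed = sc.parallelize(Arrays.asList("a", "b", "c"), 20);
        processed.persist(StorageLevel.MEMORY_AND_DISK());

        // Submit each fork's action from its own thread so the scheduler can run
        // tasks from both jobs at the same time.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> processed.foreach(record -> sinkA(record)));
        pool.submit(() -> processed.foreach(record -> sinkB(record)));
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);

        processed.unpersist();
        sc.stop();
    }

    // Hypothetical sinks standing in for the two real foreach bodies.
    private static void sinkA(String record) { }
    private static void sinkB(String record) { }
}

With the default FIFO scheduling, the second job's tasks only start once the first job leaves capacity free; a FAIR pool lets both forks make progress at the same time.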

Please let us know if you have any ideas that can be useful here.

Thanks,
-Vibhor


Writing data to HBase using Spark

2014-06-10 Thread Vibhor Banga
Hi,

I am reading data from an HBase table into an RDD, and then, using foreach on
that RDD, I am doing some processing on every Result of the HBase table. After
this processing I want to store the processed data back into another HBase table.

How can I do that? If I use the standard Hadoop and HBase classes to write
data to HBase, I run into serialization issues.

How should I write data to HBase in this case?
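
For illustration, a minimal sketch of the pattern that usually avoids this: create the HBase client objects inside foreachPartition, on the executors, so the non-serializable Hadoop/HBase classes never have to be shipped from the driver. The table name and the resultToPut() conversion are hypothetical placeholders, and the sketch assumes the old HTable client API (newer HBase versions use ConnectionFactory and Table instead).

import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.spark.api.java.JavaRDD;

public class HBaseWriteSketch {

    public static void writeBack(JavaRDD<Result> processedRDD) {
        processedRDD.foreachPartition((Iterator<Result> rows) -> {
            // Built on the executor, once per partition, so the non-serializable
            // Hadoop/HBase client classes never travel through a closure.
            Configuration conf = HBaseConfiguration.create();
            HTable table = new HTable(conf, "output_table"); // hypothetical table name
            try {
                while (rows.hasNext()) {
                    table.put(resultToPut(rows.next()));
                }
            } finally {
                table.close();
            }
        });
    }

    // Hypothetical conversion from a processed Result to a Put for the target table.
    private static Put resultToPut(Result source) {
        Put put = new Put(source.getRow());
        // Real code would add the processed column values here.
        return put;
    }
}

Depending on the Spark version, another option is to write through TableOutputFormat with saveAsNewAPIHadoopDataset on a JavaPairRDD, which keeps the write path inside the Hadoop OutputFormat machinery.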

Thanks,
-Vibhor


Re: Using Spark on Data size larger than Memory size

2014-06-07 Thread Vibhor Banga
Aaron, thank you for your response and for clarifying things.

-Vibhor


On Sun, Jun 1, 2014 at 11:40 AM, Aaron Davidson ilike...@gmail.com wrote:

 There is no fundamental issue if you're running on data that is larger
 than cluster memory size. Many operations can stream data through, and thus
 memory usage is independent of input data size. Certain operations require
 an entire *partition* (not dataset) to fit in memory, but there are not
 many instances of this left (sorting comes to mind, and this is being
 worked on).

 In general, one problem with Spark today is that you *can* OOM under
 certain configurations, and it's possible you'll need to change from the
 default configuration if you're doing very memory-intensive jobs.
 However, there are very few cases where Spark would simply fail as a matter
 of course -- for instance, you can always increase the number of
 partitions to decrease the size of any given one, or repartition data to
 eliminate skew.

 Regarding impact on performance, as Mayur said, there may absolutely be an
 impact depending on your jobs. If you're doing a join on a very large
 amount of data with few partitions, then we'll have to spill to disk. If
 you can't cache your working set of data in memory, you will also see a
 performance degradation. Spark enables the use of memory to make things
 fast, but if you just don't have enough memory, it won't be terribly fast.
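
As a small, hypothetical illustration of the point above about increasing the number of partitions (the input path and the partition count are arbitrary):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RepartitionSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("RepartitionSketch"));

        // Hypothetical input that is much larger than cluster memory.
        JavaRDD<String> records = sc.textFile("hdfs:///data/large-input");

        // Spreading the same data over more partitions keeps each partition small
        // enough to be processed (and spilled) independently; 400 is arbitrary.
        JavaRDD<String> resized = records.repartition(400);

        System.out.println("partitions after repartition: " + resized.getNumPartitions());
        sc.stop();
    }
}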


 On Sat, May 31, 2014 at 12:14 AM, Mayur Rustagi mayur.rust...@gmail.com
 wrote:

 Clearly there will be an impact on performance, but frankly it depends on what
 you are trying to achieve with the dataset.

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Sat, May 31, 2014 at 11:45 AM, Vibhor Banga vibhorba...@gmail.com
 wrote:

 Some inputs will be really helpful.

 Thanks,
 -Vibhor


 On Fri, May 30, 2014 at 7:51 PM, Vibhor Banga vibhorba...@gmail.com
 wrote:

 Hi all,

 I am planning to use Spark with HBase, where I generate an RDD by reading
 data from an HBase table.

 I want to know: in the case when the size of the HBase table grows
 larger than the size of the RAM available in the cluster, will the
 application fail, or will there be an impact on performance?

 Any thoughts in this direction will be helpful and are welcome.

 Thanks,
 -Vibhor




 --
 Vibhor Banga
 Software Development Engineer
 Flipkart Internet Pvt. Ltd., Bangalore






Serialization problem in Spark

2014-06-05 Thread Vibhor Banga
Hi,

I am trying to do something like the following in Spark:

JavaPairRDD<byte[], MyObject> eventRDD = hBaseRDD.map(
        new PairFunction<Tuple2<ImmutableBytesWritable, Result>, byte[], MyObject>() {
            @Override
            public Tuple2<byte[], MyObject> call(
                    Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2)
                    throws Exception {
                return new Tuple2<byte[], MyObject>(
                        immutableBytesWritableResultTuple2._1.get(),
                        MyClass.get(immutableBytesWritableResultTuple2._2));
            }
        });

eventRDD.foreach(new VoidFunction<Tuple2<byte[], Event>>() {
    @Override
    public void call(Tuple2<byte[], Event> eventTuple2) throws Exception {
        processForEvent(eventTuple2._2);
    }
});


The processForEvent() flow does some processing and ultimately writes to an
HBase table. But I am getting serialisation issues with the built-in Hadoop
and HBase classes. How do I solve this? Does using Kryo serialisation help in
this case?
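
For what it is worth, here is a minimal, hypothetical sketch of turning Kryo on (the inner classes stand in for the application types above). Note that Kryo only changes how task data is serialized; it does not make Hadoop or HBase client objects captured in a closure serializable, so those are usually created inside foreachPartition on the executors instead. registerKryoClasses exists in newer Spark releases; older ones set spark.kryo.registrator instead.

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class KryoConfigSketch {

    // Stand-ins for the application types used in the snippet above.
    public static class MyObject implements Serializable { }
    public static class Event implements Serializable { }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("KryoConfigSketch")
                // Switch the data serializer to Kryo and register the classes that
                // flow through the RDDs so they are serialized compactly.
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(new Class<?>[]{MyObject.class, Event.class});

        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... build and run the job as before ...
        sc.stop();
    }
}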

Thanks,
-Vibhor


Re: Serialization problem in Spark

2014-06-05 Thread Vibhor Banga
Any inputs on this will be helpful.

Thanks,
-Vibhor


On Thu, Jun 5, 2014 at 3:41 PM, Vibhor Banga vibhorba...@gmail.com wrote:

 Hi,

 I am trying to do something like the following in Spark:

 JavaPairRDD<byte[], MyObject> eventRDD = hBaseRDD.map(
         new PairFunction<Tuple2<ImmutableBytesWritable, Result>, byte[], MyObject>() {
             @Override
             public Tuple2<byte[], MyObject> call(
                     Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2)
                     throws Exception {
                 return new Tuple2<byte[], MyObject>(
                         immutableBytesWritableResultTuple2._1.get(),
                         MyClass.get(immutableBytesWritableResultTuple2._2));
             }
         });

 eventRDD.foreach(new VoidFunction<Tuple2<byte[], Event>>() {
     @Override
     public void call(Tuple2<byte[], Event> eventTuple2) throws Exception {
         processForEvent(eventTuple2._2);
     }
 });


 The processForEvent() flow does some processing and ultimately writes to an
 HBase table. But I am getting serialisation issues with the built-in Hadoop
 and HBase classes. How do I solve this? Does using Kryo serialisation help in
 this case?

 Thanks,
 -Vibhor




-- 
Vibhor Banga
Software Development Engineer
Flipkart Internet Pvt. Ltd., Bangalore


Re: Using Spark on Data size larger than Memory size

2014-05-31 Thread Vibhor Banga
Some inputs will be really helpful.

Thanks,
-Vibhor


On Fri, May 30, 2014 at 7:51 PM, Vibhor Banga vibhorba...@gmail.com wrote:

 Hi all,

 I am planning to use Spark with HBase, where I generate an RDD by reading
 data from an HBase table.

 I want to know: in the case when the size of the HBase table grows larger
 than the size of the RAM available in the cluster, will the application fail,
 or will there be an impact on performance?

 Any thoughts in this direction will be helpful and are welcome.

 Thanks,
 -Vibhor




-- 
Vibhor Banga
Software Development Engineer
Flipkart Internet Pvt. Ltd., Bangalore


Using Spark on Data size larger than Memory size

2014-05-30 Thread Vibhor Banga
Hi all,

I am planning to use Spark with HBase, where I generate an RDD by reading data
from an HBase table.

I want to know: in the case when the size of the HBase table grows larger
than the size of the RAM available in the cluster, will the application fail,
or will there be an impact on performance?

Any thoughts in this direction will be helpful and are welcome.

Thanks,
-Vibhor


Re: Problem using Spark with Hbase

2014-05-30 Thread Vibhor Banga
Thanks Mayur for the reply.

Actually the issue was that I was running the Spark application on hadoop-2.2.0,
and the HBase version there was 0.95.2.

But Spark by default gets built against an older HBase version. So I had to
build Spark again with the HBase version set to 0.95.2 in the Spark build file.
And it worked.

Thanks,
-Vibhor


On Wed, May 28, 2014 at 11:34 PM, Mayur Rustagi mayur.rust...@gmail.com
wrote:

 Try this..

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Wed, May 28, 2014 at 7:40 PM, Vibhor Banga vibhorba...@gmail.com
 wrote:

 Anyone who has used Spark this way or has faced a similar issue, please
 help.

 Thanks,
 -Vibhor


 On Wed, May 28, 2014 at 6:03 PM, Vibhor Banga vibhorba...@gmail.com
 wrote:

 Hi all,

 I am facing issues while using Spark with HBase. I am getting a
 NullPointerException at org.apache.hadoop.hbase.TableName.valueOf
 (TableName.java:288).

 Can someone please help to resolve this issue? What am I missing?


 I am using the following snippet of code -

 Configuration config = HBaseConfiguration.create();

 config.set("hbase.zookeeper.znode.parent", "hostname1");
 config.set("hbase.zookeeper.quorum", "hostname1");
 config.set("hbase.zookeeper.property.clientPort", "2181");
 config.set("hbase.master", "hostname1:
 config.set("fs.defaultFS", "hdfs://hostname1/");
 config.set("dfs.namenode.rpc-address", "hostname1:8020");

 config.set(TableInputFormat.INPUT_TABLE, tableName);

 JavaSparkContext ctx = new JavaSparkContext(args[0], "Simple",
         System.getenv("sparkHome"),
         JavaSparkContext.jarOfClass(Simple.class));

 JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD
         = ctx.newAPIHadoopRDD(config, TableInputFormat.class,
         ImmutableBytesWritable.class, Result.class);

 Map<ImmutableBytesWritable, Result> rddMap = hBaseRDD.collectAsMap();


 But when I go to the spark cluster and check the logs, I see following
 error -

 INFO NewHadoopRDD: Input split: w3-target1.nm.flipkart.com:,
 14/05/28 16:48:51 ERROR TableInputFormat: java.lang.NullPointerException
 at org.apache.hadoop.hbase.TableName.valueOf(TableName.java:288)
 at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:154)
 at 
 org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:99)
 at 
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:92)
 at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
 at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
 at org.apache.spark.scheduler.Task.run(Task.scala:53)
 at 
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
 at 
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
 at 
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at 
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
 at 
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 Thanks,

 -Vibhor








-- 
Vibhor Banga
Software Development Engineer
Flipkart Internet Pvt. Ltd., Bangalore


Problem using Spark with Hbase

2014-05-28 Thread Vibhor Banga
Hi all,

I am facing issues while using Spark with HBase. I am getting a
NullPointerException at org.apache.hadoop.hbase.TableName.valueOf
(TableName.java:288).

Can someone please help to resolve this issue? What am I missing?


I am using the following snippet of code -

Configuration config = HBaseConfiguration.create();

config.set("hbase.zookeeper.znode.parent", "hostname1");
config.set("hbase.zookeeper.quorum", "hostname1");
config.set("hbase.zookeeper.property.clientPort", "2181");
config.set("hbase.master", "hostname1:
config.set("fs.defaultFS", "hdfs://hostname1/");
config.set("dfs.namenode.rpc-address", "hostname1:8020");

config.set(TableInputFormat.INPUT_TABLE, tableName);

JavaSparkContext ctx = new JavaSparkContext(args[0], "Simple",
        System.getenv("sparkHome"),
        JavaSparkContext.jarOfClass(Simple.class));

JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD
        = ctx.newAPIHadoopRDD(config, TableInputFormat.class,
        ImmutableBytesWritable.class, Result.class);

Map<ImmutableBytesWritable, Result> rddMap = hBaseRDD.collectAsMap();


But when I go to the spark cluster and check the logs, I see following
error -

INFO NewHadoopRDD: Input split: w3-target1.nm.flipkart.com:,
14/05/28 16:48:51 ERROR TableInputFormat: java.lang.NullPointerException
at org.apache.hadoop.hbase.TableName.valueOf(TableName.java:288)
at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:154)
at 
org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:99)
at 
org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:92)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Thanks,

-Vibhor


Re: Problem using Spark with Hbase

2014-05-28 Thread Vibhor Banga
Anyone who has used Spark this way or has faced a similar issue, please help.

Thanks,
-Vibhor

On Wed, May 28, 2014 at 6:03 PM, Vibhor Banga vibhorba...@gmail.com wrote:

 Hi all,

 I am facing issues while using Spark with HBase. I am getting a
 NullPointerException at org.apache.hadoop.hbase.TableName.valueOf
 (TableName.java:288).

 Can someone please help to resolve this issue? What am I missing?


 I am using the following snippet of code -

 Configuration config = HBaseConfiguration.create();

 config.set("hbase.zookeeper.znode.parent", "hostname1");
 config.set("hbase.zookeeper.quorum", "hostname1");
 config.set("hbase.zookeeper.property.clientPort", "2181");
 config.set("hbase.master", "hostname1:
 config.set("fs.defaultFS", "hdfs://hostname1/");
 config.set("dfs.namenode.rpc-address", "hostname1:8020");

 config.set(TableInputFormat.INPUT_TABLE, tableName);

 JavaSparkContext ctx = new JavaSparkContext(args[0], "Simple",
         System.getenv("sparkHome"),
         JavaSparkContext.jarOfClass(Simple.class));

 JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD
         = ctx.newAPIHadoopRDD(config, TableInputFormat.class,
         ImmutableBytesWritable.class, Result.class);

 Map<ImmutableBytesWritable, Result> rddMap = hBaseRDD.collectAsMap();


 But when I go to the spark cluster and check the logs, I see following
 error -

 INFO NewHadoopRDD: Input split: w3-target1.nm.flipkart.com:,
 14/05/28 16:48:51 ERROR TableInputFormat: java.lang.NullPointerException
   at org.apache.hadoop.hbase.TableName.valueOf(TableName.java:288)
   at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:154)
   at 
 org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:99)
   at 
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:92)
   at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:84)
   at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:48)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
   at org.apache.spark.scheduler.Task.run(Task.scala:53)
   at 
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
   at 
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
   at 
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
   at java.security.AccessController.doPrivileged(Native Method)
   at javax.security.auth.Subject.doAs(Subject.java:415)
   at 
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
   at 
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)

 Thanks,

 -Vibhor