RE: Spark Tasks on second node never return in Yarn when I have more than 1 task node

2015-11-24 Thread Shuai Zheng
Hi All,

 

Just an update on this case.

 

I tried many different combinations of settings (and I just upgraded to the latest EMR 
4.2.0, with Spark 1.5.2).

 

I just found out that the problem comes from this submit command:

 

spark-submit --deploy-mode client --executor-cores=24 --driver-memory=5G 
--executor-memory=45G

 

If I set --executor-cores=20 (or anything less than 20), there is no issue.

 

This is quite an interesting case, because the instance type (c3.8xlarge) has 32 
virtual cores and runs without any issue when there is only one task node.
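As a sanity check on the memory side, here is the container size that submit command asks YARN for, as a quick sketch assuming Spark 1.5's default spark.yarn.executor.memoryOverhead of max(384 MB, 10% of executor memory) (I have not checked the actual YARN memory limit configured on the c3.8xlarge nodes):

```shell
# Requested YARN container size for --executor-memory=45G under
# Spark 1.5 defaults: executor memory plus
# max(384 MB, 10% of executor memory) of off-heap overhead.
EXEC_MB=$((45 * 1024))             # 46080 MB of executor heap
OVERHEAD_MB=$(( EXEC_MB / 10 ))    # 10% overhead = 4608 MB
if [ "$OVERHEAD_MB" -lt 384 ]; then OVERHEAD_MB=384; fi
TOTAL_MB=$(( EXEC_MB + OVERHEAD_MB ))
echo "$TOTAL_MB"                   # 50688
```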

 

So I guess the issue comes from one of:

1. A connection limit from the EC2 instances on EMR to S3 (this reason doesn't make 
enough sense to me; I will contact EMR support to clarify).

2. Some library packed in the jar causing this limit (also not very likely).
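If reason 1 turns out to be real, one thing I may try is raising the EMRFS connection cap per submit. This is only a sketch: fs.s3.maxConnections is my assumption for the EMRFS property name (and my-job.jar is a placeholder), so I will confirm both with EMR support before relying on it:

```shell
# Sketch: raise the assumed EMRFS S3 connection limit for one job.
# fs.s3.maxConnections is an assumed property name; my-job.jar is a
# placeholder for the actual application jar.
spark-submit --deploy-mode client \
  --executor-cores=24 --driver-memory=5G --executor-memory=45G \
  --conf spark.hadoop.fs.s3.maxConnections=200 \
  my-job.jar
```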

 

Reporting this here in case anyone faces a similar issue.

 

Regards,

 

Shuai

 

From: Jonathan Kelly [mailto:jonathaka...@gmail.com] 
Sent: Thursday, November 19, 2015 6:54 PM
To: Shuai Zheng
Cc: user
Subject: Re: Spark Tasks on second node never return in Yarn when I have more 
than 1 task node

 

I don't know if this actually has anything to do with why your job is hanging, 
but since you are using EMR you should probably not set those fs.s3 properties 
but rather let it use EMRFS, EMR's optimized Hadoop FileSystem implementation 
for interacting with S3. One benefit is that it will automatically pick up your 
AWS credentials from your EC2 instance role rather than you having to configure 
them manually (since doing so is insecure because you have to get the secret 
access key onto your instance).

 

If simply making that change does not fix the issue, a jstack of the hung 
process would help you figure out what it is doing. You should also look at the 
YARN container logs (which automatically get uploaded to your S3 logs bucket if 
you have this enabled).
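Concretely, that diagnosis might look something like the following sketch (the pid and application ID are placeholders; on EMR the executor JVM typically runs as the yarn user, and `yarn logs` only returns aggregated logs once the application has finished or log aggregation has run):

```shell
# On the node whose tasks never return:

# 1) Find the executor JVM(s); Spark executors run as
#    CoarseGrainedExecutorBackend.
jps -lm | grep -i coarsegrainedexecutorbackend

# 2) Dump the thread stacks of the hung executor (replace <pid>).
sudo -u yarn jstack <pid> > executor-jstack.txt

# 3) Pull the aggregated YARN container logs (replace the app id).
yarn logs -applicationId application_1448000000000_0001 > app-logs.txt
```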

 

~ Jonathan

 

On Thu, Nov 19, 2015 at 1:32 PM, Shuai Zheng <szheng.c...@gmail.com> wrote:

Hi All,

 

I am facing a very weird case. I have simplified the scenario as much as possible so 
that everyone can reproduce it.

 

My env:

 

AWS EMR 4.1.0, Spark 1.5

 

My code runs without any problem in local mode, and it also has no problem on an EMR 
cluster with one master and one task node.

 

But when I run it on a cluster with multiple task nodes (more than 1 task node, i.e., 
a 3-node cluster), the tasks on one of the nodes never return.

 

The log is as below:

 

15/11/19 21:19:07 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
ip-10-165-121-188.ec2.internal, PROCESS_LOCAL, 2241 bytes)

15/11/19 21:19:07 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 
ip-10-155-160-147.ec2.internal, PROCESS_LOCAL, 2241 bytes)

15/11/19 21:19:07 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 
ip-10-165-121-188.ec2.internal, PROCESS_LOCAL, 2241 bytes)

15/11/19 21:19:07 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, 
ip-10-155-160-147.ec2.internal, PROCESS_LOCAL, 2241 bytes)

 

So you can see the tasks are submitted alternately to two instances: 
ip-10-165-121-188 and ip-10-155-160-147.

And later, only the tasks that run on ip-10-165-121-188 finish; the tasks on 
ip-10-155-160-147 just wait there and never return.

 

The data and code have been tested in local mode and in single-node cluster mode, so 
it should not be an issue with the logic or the data.

 

I have attached my test case here (I believe it is simple enough; no business logic 
is involved):

 

public void createSiteGridExposure2() {
    JavaSparkContext ctx = this.createSparkContextTest("Test");
    ctx.textFile(siteEncodeLocation).flatMapToPair(new PairFlatMapFunction<String, String, String>() {
        @Override
        public Iterable<Tuple2<String, String>> call(String line) throws Exception {
            List<Tuple2<String, String>> res = new ArrayList<Tuple2<String, String>>();
            return res;
        }
    }).collectAsMap();
    ctx.stop();
}

 

protected JavaSparkContext createSparkContextTest(String appName) {
    SparkConf sparkConf = new SparkConf().setAppName(appName);

    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    Configuration hadoopConf = ctx.hadoopConfiguration();
    if (awsAccessKeyId != null) {
        hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        hadoopConf.set("fs.s3.awsAccessKeyId", awsAccessKeyId);
        hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey);

        hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
        hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey);
    }
    return ctx;
}

Does anyone have any idea why this happens? I am a bit lost, because the code works 
in local mode and on a 2-node (1 master, 1 task) cluster, but when it moves to a 
cluster with multiple task nodes I hit this issue. No error, no exception, not even 
a timeout (I waited for more than an hour).

Regards,

Shuai
