Re: Autoscaling of Spark YARN cluster

2015-12-14 Thread Mingyu Kim
Cool. Using Ambari to monitor and scale up/down the cluster sounds
promising. Thanks for the pointer!

Mingyu

From:  Deepak Sharma <deepakmc...@gmail.com>
Date:  Monday, December 14, 2015 at 1:53 AM
To:  cs user <acldstk...@gmail.com>
Cc:  Mingyu Kim <m...@palantir.com>, "user@spark.apache.org"
<user@spark.apache.org>
Subject:  Re: Autoscaling of Spark YARN cluster

An approach I can think of is using the Ambari Metrics Service (AMS).
Using these metrics, you can decide whether the cluster is low on
resources.
If so, call the Ambari management API to add a node to the cluster.
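
For illustration, a rough sketch (Scala, JDK-only HTTP) of what such an Ambari
call might look like. The server address, credentials, cluster name and host
name are placeholders, and the exact endpoint and payload should be checked
against the Ambari API docs linked in the reply below.

  import java.net.{HttpURLConnection, URL}
  import java.nio.charset.StandardCharsets
  import java.util.Base64

  object AddYarnNode {
    def main(args: Array[String]): Unit = {
      // Hypothetical call: POST /api/v1/clusters/<cluster>/hosts/<new-host>
      // registers a host that already runs an Ambari agent; components are
      // then added to it in follow-up requests.
      val url = new URL(
        "http://ambari.example.com:8080/api/v1/clusters/mycluster/hosts/new-node.example.com")
      val conn = url.openConnection().asInstanceOf[HttpURLConnection]
      conn.setRequestMethod("POST")
      conn.setRequestProperty("X-Requested-By", "ambari")
      val auth = Base64.getEncoder.encodeToString(
        "admin:admin".getBytes(StandardCharsets.UTF_8))
      conn.setRequestProperty("Authorization", s"Basic $auth")
      conn.setDoOutput(true)
      conn.getOutputStream.close() // empty body for this sketch
      println(s"Ambari responded with HTTP ${conn.getResponseCode}")
      conn.disconnect()
    }
  }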

Thanks
Deepak

On Mon, Dec 14, 2015 at 2:48 PM, cs user <acldstk...@gmail.com> wrote:
> Hi Mingyu, 
> 
> I'd be interested in hearing about anything else you find which might meet
> your needs for this.
> 
> Perhaps one way this could be done is with Ambari. Ambari comes with a
> nice API which you can use to add additional nodes to a cluster:
> 
> https://github.com/apache/ambari/blob/trunk/ambari-server/docs/api/v1/index.md
> 
> Once the node has been built and the Ambari agent installed, you can then call
> back to the management node via the API, tell it what you want the new node to
> be, and it will connect, configure your new node and add it to the cluster.
> 
> You could create a host group within the cluster blueprint with the minimal
> components you need to install for it to operate as a YARN node.
> 
> As for the decision to scale, that is outside the remit of Ambari. I guess
> you could look into using AWS autoscaling, or you could look into a product
> called Scalr, which has an open-source version. We are using this to install an
> Ambari cluster, with Chef configuring the nodes up until the point it hands
> over to Ambari. 
> 
> Scalr allows you to write custom scaling metrics which you could use to query
> the number of applications queued and resources available, and add nodes
> when required. 
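
For illustration, a rough sketch of the kind of check such a custom scaling
metric could run: poll the YARN ResourceManager REST API (/ws/v1/cluster/metrics)
and flag the cluster as under-provisioned when applications are pending and
little memory is free. The ResourceManager address and the thresholds are
placeholders, and the crude regex parsing only keeps the sketch dependency-free.

  import scala.io.Source

  object YarnScaleCheck {
    def main(args: Array[String]): Unit = {
      val metricsUrl = "http://resourcemanager.example.com:8088/ws/v1/cluster/metrics"
      val json = Source.fromURL(metricsUrl).mkString

      // Pull a numeric field out of the metrics JSON; a real implementation
      // would use a JSON library instead of a regex.
      def field(name: String): Long =
        ("\"" + name + "\"\\s*:\\s*(\\d+)").r
          .findFirstMatchIn(json).map(_.group(1).toLong).getOrElse(0L)

      val appsPending = field("appsPending")
      val availableMB = field("availableMB")
      val addNode = appsPending > 0 && availableMB < 4096

      println(s"appsPending=$appsPending availableMB=$availableMB addNode=$addNode")
    }
  }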
> 
> Cheers!
> 
> On Mon, Dec 14, 2015 at 8:57 AM, Mingyu Kim <m...@palantir.com> wrote:
>> Hi all,
>> 
>> Has anyone tried out autoscaling a Spark YARN cluster on a public cloud (e.g.
>> EC2) based on workload? To be clear, I'm interested in scaling the cluster
>> itself up and down by adding and removing YARN nodes based on the cluster
>> resource utilization (e.g. # of applications queued, # of resources
>> available), as opposed to scaling resources assigned to Spark applications,
>> which is natively supported by Spark's dynamic resource scheduling. I've
>> found that Cloudbreak
>> <http://sequenceiq.com/cloudbreak-docs/latest/periscope/#how-it-works> has a
>> similar feature, but it's in "technical preview", and I didn't find much
>> else from my search.
>> 
>> This might be a general YARN question, but wanted to check if there's a
>> solution popular in the Spark community. Any sharing of experience around
>> autoscaling will be helpful!
>> 
>> Thanks,
>> Mingyu
> 



-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net






Autoscaling of Spark YARN cluster

2015-12-14 Thread Mingyu Kim
Hi all,

Has anyone tried out autoscaling a Spark YARN cluster on a public cloud (e.g.
EC2) based on workload? To be clear, I'm interested in scaling the cluster
itself up and down by adding and removing YARN nodes based on the cluster
resource utilization (e.g. # of applications queued, # of resources
available), as opposed to scaling resources assigned to Spark applications,
which is natively supported by Spark's dynamic resource scheduling. I've
found that Cloudbreak
<http://sequenceiq.com/cloudbreak-docs/latest/periscope/#how-it-works> has
a similar feature, but it's in "technical preview", and I didn't find much
else from my search.

This might be a general YARN question, but wanted to check if there's a
solution popular in the Spark community. Any sharing of experience around
autoscaling will be helpful!

Thanks,
Mingyu






Re: compatibility issue with Jersey2

2015-10-13 Thread Mingyu Kim
Hi all,

I filed https://issues.apache.org/jira/browse/SPARK-11081. Since Jersey’s 
surface area is relatively small and it seems to be used only for the Spark UI and 
the JSON API, shading the dependency might make sense, similar to what’s done for 
the Jetty dependencies in https://issues.apache.org/jira/browse/SPARK-3996. Would 
this be reasonable?

Mingyu







On 10/7/15, 11:26 AM, "Marcelo Vanzin"  wrote:

>Seems like you might be running into
>https://issues.apache.org/jira/browse/SPARK-10910 . I've been busy with
>other things but plan to take a look at that one when I find time...
>right now I don't really have a solution, other than making sure your
>application's jars do not include those classes the exception is
>complaining about.
>
>On Wed, Oct 7, 2015 at 10:23 AM, Gary Ogden  wrote:
>> What you suggested seems to have worked for unit tests. But now it throws
>> this at run time on mesos with spark-submit:
>>
>> Exception in thread "main" java.lang.LinkageError: loader constraint
>> violation: when resolving method
>> "org.slf4j.impl.StaticLoggerBinder.getLoggerFactory()Lorg/slf4j/ILoggerFactory;"
>> the class loader (instance of
>> org/apache/spark/util/ChildFirstURLClassLoader) of the current class,
>> org/slf4j/LoggerFactory, and the class loader (instance of
>> sun/misc/Launcher$AppClassLoader) for resolved class,
>> org/slf4j/impl/StaticLoggerBinder, have different Class objects for the type
>> LoggerFactory; used in the signature
>>  at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:336)
>>  at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:284)
>>  at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:305)
>>  at com.company.spark.utils.SparkJob.(SparkJob.java:41)
>>  at java.lang.Class.forName0(Native Method)
>>  at java.lang.Class.forName(Unknown Source)
>>  at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:634)
>>  at 
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>>  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>>  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>>  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>> On 6 October 2015 at 16:20, Marcelo Vanzin  wrote:
>>>
>>> On Tue, Oct 6, 2015 at 12:04 PM, Gary Ogden  wrote:
>>> > But we run unit tests differently in our build environment, which is
>>> > throwing the error. It's setup like this:
>>> >
>>> > I suspect this is what you were referring to when you said I have a
>>> > problem?
>>>
>>> Yes, that is what I was referring to. But, in your test environment,
>>> you might be able to work around the problem by setting
>>> "spark.ui.enabled=false"; that should disable all the code that uses
>>> Jersey, so you can use your newer version in your unit tests.
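
For anyone hitting the same conflict, a minimal sketch of what that test-only
setup might look like; the master, app name and object name are placeholders
rather than anything from the original exchange.

  import org.apache.spark.{SparkConf, SparkContext}

  // Test-only context with the web UI disabled so that Spark's bundled
  // Jersey 1.x never gets initialized, letting tests use a newer Jersey.
  object UiDisabledTestContext {
    def create(): SparkContext = {
      val conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("unit-tests")
        .set("spark.ui.enabled", "false")
      new SparkContext(conf)
    }
  }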
>>>
>>>
>>> --
>>> Marcelo
>>
>>
>
>
>
>-- 
>Marcelo
>
>



Re: Which OutputCommitter to use for S3?

2015-02-23 Thread Mingyu Kim
Cool, we will start from there. Thanks Aaron and Josh!

Darin, it's likely a Hadoop 1 vs. Hadoop 2 mismatch between how the
DirectOutputCommitter was compiled and what it's running against:
org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it
became an interface in Hadoop 2.

Mingyu





On 2/23/15, 11:52 AM, Darin McBeath ddmcbe...@yahoo.com.INVALID wrote:

Aaron.  Thanks for the class. Since I'm currently writing Java based
Spark applications, I tried converting your class to Java (it seemed
pretty straightforward).

I set up the use of the class as follows:

SparkConf conf = new SparkConf()
.set("spark.hadoop.mapred.output.committer.class",
"com.elsevier.common.DirectOutputCommitter");

And I then try and save a file to S3 (which I believe should use the old
Hadoop APIs).

JavaPairRDD<Text, Text> newBaselineRDDWritable =
reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
Text.class, Text.class, SequenceFileOutputFormat.class,
org.apache.hadoop.io.compress.GzipCodec.class);

But, I get the following error message.

Exception in thread "main" java.lang.IncompatibleClassChangeError: Found
class org.apache.hadoop.mapred.JobContext, but interface was expected
at 
com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.
java:68)
at 
org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions
.scala:1075)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
ala:940)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
ala:902)
at 
org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:7
71)
at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)

In my class, JobContext refers to the interface
org.apache.hadoop.mapred.JobContext.

Is there something obvious that I might be doing wrong (or messed up in
the translation from Scala to Java), or something I should look into? I'm
using Spark 1.2 with Hadoop 2.4.


Thanks.

Darin.





From: Aaron Davidson ilike...@gmail.com
To: Andrew Ash and...@andrewash.com
Cc: Josh Rosen rosenvi...@gmail.com; Mingyu Kim m...@palantir.com;
user@spark.apache.org user@spark.apache.org; Aaron Davidson
aa...@databricks.com
Sent: Saturday, February 21, 2015 7:01 PM
Subject: Re: Which OutputCommitter to use for S3?



Here is the class:
https://gist.github.com/aarondav/c513916e72101bbe14ec

You can use it by setting mapred.output.committer.class in the Hadoop
configuration (or spark.hadoop.mapred.output.committer.class in the
Spark configuration). Note that this only works for the old Hadoop APIs;
I believe the new Hadoop APIs strongly tie the committer to the output format
(so FileOutputFormat always uses FileOutputCommitter), which makes this fix
more difficult to apply.
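
For reference, a minimal Scala sketch of the Spark-side route described above;
the committer class name is a placeholder for whatever implementation is used.

  import org.apache.spark.{SparkConf, SparkContext}

  object S3OutputJob {
    def main(args: Array[String]): Unit = {
      // Routes the old-API Hadoop committer setting through Spark's configuration.
      val conf = new SparkConf()
        .setAppName("s3-output")
        .set("spark.hadoop.mapred.output.committer.class",
             "com.example.DirectOutputCommitter")
      val sc = new SparkContext(conf)
      // ... build pair RDDs and write them with the old saveAsHadoopFile API ...
      sc.stop()
    }
  }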




On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash and...@andrewash.com wrote:

Josh is that class something you guys would consider open sourcing, or
would you rather the community step up and create an OutputCommitter
implementation optimized for S3?


On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen rosenvi...@gmail.com wrote:

We (Databricks) use our own DirectOutputCommitter implementation, which
is a couple tens of lines of Scala code.  The class would almost
entirely be a no-op except we took some care to properly handle the
_SUCCESS file.
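
A rough sketch of what such a committer might look like against the old
org.apache.hadoop.mapred API. This is an illustration only, not the Databricks
class; the _SUCCESS handling is simplified and error handling is omitted.

  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.mapred.{FileOutputFormat, JobContext, OutputCommitter, TaskAttemptContext}

  // A "direct" committer: tasks write straight to the final location, so
  // there is nothing to move or rename at commit time.
  class DirectOutputCommitter extends OutputCommitter {
    override def setupJob(jobContext: JobContext): Unit = ()
    override def setupTask(taskContext: TaskAttemptContext): Unit = ()
    override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
    override def commitTask(taskContext: TaskAttemptContext): Unit = ()
    override def abortTask(taskContext: TaskAttemptContext): Unit = ()

    override def commitJob(jobContext: JobContext): Unit = {
      // Drop a _SUCCESS marker in the output directory, if one is configured.
      val conf = jobContext.getJobConf
      val outputPath = FileOutputFormat.getOutputPath(conf)
      if (outputPath != null) {
        val fs = outputPath.getFileSystem(conf)
        fs.create(new Path(outputPath, "_SUCCESS")).close()
      }
    }
  }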


On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim m...@palantir.com wrote:

I didn't get any response. It'd be really appreciated if anyone using a
special OutputCommitter for S3 can comment on this!


Thanks,
Mingyu


From: Mingyu Kim m...@palantir.com
Date: Monday, February 16, 2015 at 1:15 AM
To: user@spark.apache.org user@spark.apache.org
Subject: Which OutputCommitter to use for S3?



Hi all,


The default OutputCommitter used by RDD, which is FileOutputCommitter,
seems to require moving files at the commit step, which is not a
constant operation in S3, as discussed in
http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3C543E33FA.2000802%40entropy.be%3E .
People seem to develop their own NullOutputCommitter implementation or
use DirectFileOutputCommitter (as mentioned in SPARK-3595), but I
wanted to check if there is a de facto standard, publicly available
OutputCommitter to use for S3 in conjunction with Spark.


Thanks,
Mingyu

Re: Which OutputCommitter to use for S3?

2015-02-20 Thread Mingyu Kim
I didn’t get any response. It’d be really appreciated if anyone using a special 
OutputCommitter for S3 can comment on this!

Thanks,
Mingyu

From: Mingyu Kim <m...@palantir.com>
Date: Monday, February 16, 2015 at 1:15 AM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: Which OutputCommitter to use for S3?

Hi all,

The default OutputCommitter used by RDD, which is FileOutputCommitter, seems to 
require moving files at the commit step, which is not a constant operation in 
S3, as discussed in 
http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3C543E33FA.2000802%40entropy.be%3E.
 People seem to develop their own NullOutputCommitter implementation or use 
DirectFileOutputCommitter (as mentioned in 
SPARK-3595 <https://issues.apache.org/jira/browse/SPARK-3595>),
 but I wanted to check if there is a de facto standard, publicly available 
OutputCommitter to use for S3 in conjunction with Spark.

Thanks,
Mingyu


Which OutputCommitter to use for S3?

2015-02-16 Thread Mingyu Kim
Hi all,

The default OutputCommitter used by RDD, which is FileOutputCommitter, seems to 
require moving files at the commit step, which is not a constant operation in 
S3, as discussed in 
http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3C543E33FA.2000802%40entropy.be%3E.
 People seem to develop their own NullOutputCommitter implementation or use 
DirectFileOutputCommitter (as mentioned in 
SPARK-3595 <https://issues.apache.org/jira/browse/SPARK-3595>), but I wanted to 
check if there is a de facto standard, publicly available OutputCommitter to 
use for S3 in conjunction with Spark.

Thanks,
Mingyu


Re: How does Spark speculation prevent duplicated work?

2014-07-16 Thread Mingyu Kim
That makes sense. Thanks everyone for the explanations!

Mingyu

From:  Matei Zaharia matei.zaha...@gmail.com
Reply-To:  user@spark.apache.org user@spark.apache.org
Date:  Tuesday, July 15, 2014 at 3:00 PM
To:  user@spark.apache.org user@spark.apache.org
Subject:  Re: How does Spark speculation prevent duplicated work?

Yeah, this is handled by the commit call of the FileOutputFormat. In
general Hadoop OutputFormats have a concept called committing the output,
which you should do only once per partition. In the file ones it does an
atomic rename to make sure that the final output is a complete file.

Matei

On Jul 15, 2014, at 2:49 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 The way the HDFS file writing works at a high level is that each attempt to
 write a partition to a file starts writing to a unique temporary file (say,
 something like targetDirectory/_temp/part-X_attempt-). If the writing
 into the file successfully completes, then the temporary file is moved to the
 final location (say, targetDirectory/part-X). If, due to speculative
 execution, the file already exists in the final intended location, then move
 is avoided. Or it's overwritten; I forget the implementation. Either way, all
 attempts to write the same partition will always write the same data to the
 temp file (assuming the spark transformation generating the data is
 deterministic and idempotent). And once one attempt is successful, the final
 file will have the same data. Hence, writing to HDFS / S3 is idempotent.
 
 Now this logic is already implemented within the Hadoop's MapReduce logic, and
 Spark just uses it directly.
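
A toy Scala rendering of the attempt-file pattern described above, using the
Hadoop FileSystem API; the paths, attempt handling and overwrite policy are
simplified placeholders rather than Spark's or Hadoop's actual committer code.

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  object AttemptThenRename {
    def writePartition(lines: Seq[String], targetDir: String, part: Int, attempt: Int): Unit = {
      val fs = FileSystem.get(new Configuration())

      // Each attempt writes to its own temporary file.
      val tempFile = new Path(s"$targetDir/_temp/part-${part}_attempt-$attempt")
      val out = fs.create(tempFile)
      lines.foreach(l => out.write((l + "\n").getBytes("UTF-8")))
      out.close()

      // Only one attempt ends up at the final location; later attempts see it
      // already there and simply discard their temporary file.
      val finalFile = new Path(s"$targetDir/part-$part")
      if (!fs.exists(finalFile)) fs.rename(tempFile, finalFile)
      else fs.delete(tempFile, false)
    }
  }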
 
 TD
 
 
 On Tue, Jul 15, 2014 at 2:33 PM, Mingyu Kim m...@palantir.com wrote:
 Thanks for the explanation, guys.
 
 I looked into the saveAsHadoopFile implementation a little bit. If you see
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
 at line 843, the HDFS
 write happens at per-partition processing, not at the result handling, so I
 have a feeling that it might be writing multiple times. This may be fine if
 both tasks for the same partition completes because it will simply overwrite
 the output partition with the same content, but this could be an issue if one
 of the tasks completes and the other is in the middle of writing the
 partition by the time the entire stage completes. Can someone explain this?
 
 Bertrand, I'm slightly confused about your comment. So, is it the case that
 HDFS will handle the writes as a temp file write followed by an atomic move,
 so the concern I had above is handled at the HDFS level?
 
 Mingyu
 
 From: Bertrand Dechoux decho...@gmail.com
 Reply-To: user@spark.apache.org user@spark.apache.org
 Date: Tuesday, July 15, 2014 at 1:22 PM
 To: user@spark.apache.org user@spark.apache.org
 Subject: Re: How does Spark speculation prevent duplicated work?
 
 I haven't looked at the implementation, but what you would do with any
 filesystem is write to a file inside the workspace directory of the task. And
 then only the attempt of the task that should be kept will perform a move to
 the final path. The other attempts are simply discarded. For most filesystem
 (and that's the case for HDFS), a 'move' is a very simple and fast action
 because only the full path/name of the file change but not its content or
 where this content is physically stored.
 
 Speculative execution happens in Hadoop MapReduce. Spark has the same
 concept. As long as you apply functions with no side effect (i.e. the only
 impact is the returned results), then you just need to not take into account
 results from additional attempts of the same task/operator.
 
 Bertrand Dechoux
 
 
 On Tue, Jul 15, 2014 at 9:34 PM, Andrew Ash and...@andrewash.com wrote:
 Hi Nan, 
 
 Great digging in -- that makes sense to me for when a job is producing some
 output handled by Spark like a .count or .distinct or similar.
 
 For the other part of the question, I'm also interested in side effects like
 an HDFS disk write.  If one task is writing to an HDFS path and another task
 starts up, wouldn't it also attempt to write to the same path?  How is that
 de-conflicted?
 
 
 On Tue, Jul 15, 2014 at 3:02 PM, Nan Zhu zhunanmcg...@gmail.com wrote:
 Hi Mingyu, 
 
 According to my understanding,
 
 Spark processes the result generated from each partition by passing them to
 resultHandler (SparkContext.scala L1056)
 
 This resultHandler usually just puts the result in a driver-side array,
 the length of which is always partitions.size
 
 this design effectively ensures that the actions are idempotent

How does Spark speculation prevent duplicated work?

2014-07-15 Thread Mingyu Kim
Hi all,

I was curious about the details of Spark speculation. So, my understanding
is that, when ³speculated² tasks are newly scheduled on other machines, the
original tasks are still running until the entire stage completes. This
seems to leave some room for duplicated work because some spark actions are
not idempotent. For example, it may be counting a partition twice in case of
RDD.count or may be writing a partition to HDFS twice in case of
RDD.save*(). How does it prevent this kind of duplicated work?

Mingyu






JavaRDD.mapToPair throws NPE

2014-06-24 Thread Mingyu Kim
Hi all,

I'm trying to use JavaRDD.mapToPair(), but it fails with an NPE on the
executor. The PairFunction used in the call is null for some reason. Any
comments/help would be appreciated!

My setup is,
* Java 7
* Spark 1.0.0
* Hadoop 2.0.0-mr1-cdh4.6.0
Here's the code snippet.

 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
 
 import scala.Tuple2;
 
 public class Test {
 
     public static void main(String[] args) {
         SparkConf conf = new SparkConf()
             .setMaster("spark://mymaster")
             .setAppName("MyApp")
             .setSparkHome("/my/spark/home");
 
         JavaSparkContext sc = new JavaSparkContext(conf);
         sc.addJar("/path/to/jar"); // ship the jar of this class
         JavaRDD<String> rdd = sc.textFile("/path/to/nums.csv"); // nums.csv simply has one integer per line
         JavaPairRDD<Integer, Integer> pairRdd = rdd.mapToPair(new MyPairFunction());
 
         System.out.println(pairRdd.collect());
     }
 
     private static final class MyPairFunction implements PairFunction<String, Integer, Integer> {
 
         private static final long serialVersionUID = 1L;
 
         @Override
         public Tuple2<Integer, Integer> call(String s) throws Exception {
             return new Tuple2<Integer, Integer>(Integer.parseInt(s), Integer.parseInt(s));
         }
     }
 }
 
 
Here's the stack trace.
 
 Exception in thread "main" 14/06/24 14:39:01 INFO scheduler.TaskSchedulerImpl:
 Removed TaskSet 0.0, whose tasks have all completed, from pool
 
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0
 failed 4 times, most recent failure: Exception failure in TID 6 on host
 10.160.24.216: java.lang.NullPointerException
 
 
 org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaP
 airRDD.scala:750)
 
 
 org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaP
 airRDD.scala:750)
 
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 
 scala.collection.Iterator$class.foreach(Iterator.scala:727)
 
 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 
 
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 
 
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 
 
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 
 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 
 scala.collection.AbstractIterator.to(Iterator.scala:1157)
 
 
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 
 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
 
 
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 
 scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 
 org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
 
 org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
 
 
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
 
 
 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
 
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 
 org.apache.spark.scheduler.Task.run(Task.scala:51)
 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
 
 
 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145
)
 
 
 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615
)
 
 java.lang.Thread.run(Thread.java:722)
 
 Driver stacktrace:
 
 at 
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGSchedule
 r$$failJobAndIndependentStages(DAGScheduler.scala:1033)
 
 at 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGSchedul
 er.scala:1017)
 
 at 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGSchedul
 er.scala:1015)
 
 at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 
 at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
 
 at 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(D
 AGScheduler.scala:633)
 
 at 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(D
 AGScheduler.scala:633)
 
 at scala.Option.foreach(Option.scala:236)
 
 at 
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala
 :633)
 
 at 
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.ap
 plyOrElse(DAGScheduler.scala:1207)
 
 at 

1.0.1 release plan

2014-06-19 Thread Mingyu Kim
Hi all,

Is there any plan for 1.0.1 release?

Mingyu






Re: Union of 2 RDD's only returns the first one

2014-04-30 Thread Mingyu Kim
Yes, that’s what I meant. Sure, the numbers might not be actually sorted,
but the order of rows is semantically kept throughout non-shuffling
transforms. I’m on board with you on union as well.

Back to the original question, then, why is it important to coalesce to a
single partition? When you union two RDDs, for example, rdd1 = [“a, b,
c”], rdd2 = [“1, 2, 3”, “4, 5, 6”], then
rdd1.union(rdd2).saveAsTextFile(…) should’ve resulted in a file with three
lines “a, b, c”, “1, 2, 3”, and “4, 5, 6” because the partitions from the
two RDDs are concatenated.

Mingyu




On 4/29/14, 10:55 PM, Patrick Wendell pwend...@gmail.com wrote:

If you call map() on an RDD it will retain the ordering it had before,
but that is not necessarily a correct sort order for the new RDD.

var rdd = sc.parallelize([2, 1, 3]);
var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]

Note that mapped is no longer sorted.
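
(For reference, a runnable Scala rendering of that example; sortByKey() stands
in for the pseudo-code sort(), and the local master and app name are
placeholders.)

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.SparkContext._

  object OrderingExample {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("ordering"))
      val rdd = sc.parallelize(Seq(2, 1, 3))
      val sorted = rdd.map(x => (x, x)).sortByKey() // keys in order 1, 2, 3
      val mapped = sorted.mapValues(x => 3 - x)     // values become 2, 1, 0
      println(mapped.collect().toSeq)               // order is retained, but values are no longer sorted
      sc.stop()
    }
  }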

When you union two RDD's together it will effectively concatenate the
two orderings, which is also not a valid sorted order on the new RDD:

rdd1 = [1,2,3]
rdd2 = [1,4,5]

rdd1.union(rdd2) = [1,2,3,1,4,5]

On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim m...@palantir.com wrote:
 Thanks for the quick response!

 To better understand it, the reason sorted RDD has a well-defined
ordering
 is because sortedRDD.getPartitions() returns the partitions in the right
 order and each partition internally is properly sorted. So, if you have

 var rdd = sc.parallelize([2, 1, 3]);
 var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
 var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]

 Since mapValues doesn’t change the order of partitions nor the
 order of rows within the partitions, I think “mapped” should have the
 exact same order as “sorted”. Sure, if a transform involves shuffling, the
 order will change. Am I mistaken? Is there an extra detail in sortedRDD
 that guarantees a well-defined ordering?

 If it’s true that the order of partitions returned by
RDD.getPartitions()
 and the row orders within the partitions determine the row order, I’m
not
 sure why union doesn’t respect the order because union operation simply
 concatenates the two lists of partitions from the two RDDs.

 Mingyu




 On 4/29/14, 10:25 PM, Patrick Wendell pwend...@gmail.com wrote:

You are right, once you sort() the RDD, then yes it has a well defined
ordering.

But that ordering is lost as soon as you transform the RDD, including
if you union it with another RDD.

On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim m...@palantir.com wrote:
 Hi Patrick,

 I'm a little confused about your comment that RDDs are not ordered. As
far
 as I know, RDDs keep list of partitions that are ordered and this is
why I
 can call RDD.take() and get the same first k rows every time I call it
and
 RDD.take() returns the same entries as RDD.map(…).take() because map
 preserves the partition order. RDD order is also what allows me to get
the
 top k out of RDD by doing RDD.sort().take().

 Am I misunderstanding it? Or, is it just when RDD is written to disk
that
 the order is not well preserved? Thanks in advance!

 Mingyu




 On 1/22/14, 4:46 PM, Patrick Wendell pwend...@gmail.com wrote:

Ah somehow after all this time I've never seen that!

On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia
buendia...@gmail.com
wrote:



 On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell
pwend...@gmail.com
 wrote:

 What is the ++ operator here? Is this something you defined?


 No, it's an alias for union defined in RDD.scala:

 def ++(other: RDD[T]): RDD[T] = this.union(other)



 Another issue is that RDD's are not ordered, so when you union two
 together it doesn't have a well defined ordering.

 If you do want to do this you could coalesce into one partition,
then
 call MapPartitions and return an iterator that first adds your
header
 and then the rest of the file, then call saveAsTextFile. Keep in
mind
 this will only work if you coalesce into a single partition.


 Thanks! I'll give this a try.



 myRdd.coalesce(1)
 .map(_.mkString(","))
 .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
 .saveAsTextFile("out.csv")

 - Patrick

 On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
 buendia...@gmail.com wrote:
  Hi,
 
  I'm trying to find a way to create a csv header when using
  saveAsTextFile,
  and I came up with this:
 
  (sc.makeRDD(Array("col1,col2,col3"), 1) ++
  myRdd.coalesce(1).map(_.mkString(",")))
.saveAsTextFile("out.csv")
 
  But it only saves the header part. Why is it that the union method does not
  return both RDD's?
  return both RDD's?






Re: Union of 2 RDD's only returns the first one

2014-04-30 Thread Mingyu Kim
Okay, that makes sense. It’d be great if this can be better documented at
some point, because the only way to find out about the resulting RDD row
order is by looking at the code.

Thanks for the discussion!

Mingyu




On 4/29/14, 11:59 PM, Patrick Wendell pwend...@gmail.com wrote:

I don't think we guarantee anywhere that union(A, B) will behave by
concatenating the partitions, it just happens to be an artifact of the
current implementation.

rdd1 = [1,2,3]
rdd2 = [1,4,5]

rdd1.union(rdd2) = [1,2,3,1,4,5] // how it is now
rdd1.union(rdd2) = [1,4,5,1,2,3] // some day it could be like this, it
wouldn't violate the contract of union

AFAIK the only guarantee is that the resulting RDD will contain all elements.

- Patrick

On Tue, Apr 29, 2014 at 11:26 PM, Mingyu Kim m...@palantir.com wrote:
 Yes, that’s what I meant. Sure, the numbers might not be actually
sorted,
 but the order of rows is semantically kept throughout non-shuffling
 transforms. I’m on board with you on union as well.

 Back to the original question, then, why is it important to coalesce to
a
 single partition? When you union two RDDs, for example, rdd1 = [“a, b,
 c”], rdd2 = [“1, 2, 3”, “4, 5, 6”], then
 rdd1.union(rdd2).saveAsTextFile(…) should’ve resulted in a file with
three
 lines “a, b, c”, “1, 2, 3”, and “4, 5, 6” because the partitions from
the
 two RDDs are concatenated.

 Mingyu




 On 4/29/14, 10:55 PM, Patrick Wendell pwend...@gmail.com wrote:

If you call map() on an RDD it will retain the ordering it had before,
but that is not necessarily a correct sort order for the new RDD.

var rdd = sc.parallelize([2, 1, 3]);
var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]

Note that mapped is no longer sorted.

When you union two RDD's together it will effectively concatenate the
two orderings, which is also not a valid sorted order on the new RDD:

rdd1 = [1,2,3]
rdd2 = [1,4,5]

rdd1.union(rdd2) = [1,2,3,1,4,5]

On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim m...@palantir.com wrote:
 Thanks for the quick response!

 To better understand it, the reason sorted RDD has a well-defined
ordering
 is because sortedRDD.getPartitions() returns the partitions in the
right
 order and each partition internally is properly sorted. So, if you
have

 var rdd = sc.parallelize([2, 1, 3]);
 var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
 var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]

 Since mapValues doesn’t change the order of partitions nor the
 order of rows within the partitions, I think “mapped” should have the
 exact same order as “sorted”. Sure, if a transform involves shuffling,
the
 order will change. Am I mistaken? Is there an extra detail in
sortedRDD
 that guarantees a well-defined ordering?

 If it’s true that the order of partitions returned by
RDD.getPartitions()
 and the row orders within the partitions determine the row order, I’m
not
 sure why union doesn’t respect the order because union operation
simply
 concatenates the two lists of partitions from the two RDDs.

 Mingyu




 On 4/29/14, 10:25 PM, Patrick Wendell pwend...@gmail.com wrote:

You are right, once you sort() the RDD, then yes it has a well defined
ordering.

But that ordering is lost as soon as you transform the RDD, including
if you union it with another RDD.

On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim m...@palantir.com
wrote:
 Hi Patrick,

 I'm a little confused about your comment that RDDs are not ordered.
As
far
 as I know, RDDs keep list of partitions that are ordered and this is
why I
 can call RDD.take() and get the same first k rows every time I call
it
and
 RDD.take() returns the same entries as RDD.map(…).take() because map
 preserves the partition order. RDD order is also what allows me to
get
the
 top k out of RDD by doing RDD.sort().take().

 Am I misunderstanding it? Or, is it just when RDD is written to disk
that
 the order is not well preserved? Thanks in advance!

 Mingyu




 On 1/22/14, 4:46 PM, Patrick Wendell pwend...@gmail.com wrote:

Ah somehow after all this time I've never seen that!

On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia
buendia...@gmail.com
wrote:



 On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell
pwend...@gmail.com
 wrote:

 What is the ++ operator here? Is this something you defined?


 No, it's an alias for union defined in RDD.scala:

 def ++(other: RDD[T]): RDD[T] = this.union(other)



 Another issue is that RDD's are not ordered, so when you union
two
 together it doesn't have a well defined ordering.

 If you do want to do this you could coalesce into one partition,
then
 call MapPartitions and return an iterator that first adds your
header
 and then the rest of the file, then call saveAsTextFile. Keep in
mind
 this will only work if you coalesce into a single partition.


 Thanks! I'll give this a try.



  myRdd.coalesce(1)
  .map(_.mkString(","))
  .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator

Re: Union of 2 RDD's only returns the first one

2014-04-30 Thread Mingyu Kim
I agree with you in general that as an API user, I shouldn’t be relying on
code. However, without looking at the code, there is no way for me to find
out even whether map() keeps the row order. Without the knowledge at all,
I’d need to do “sort” every time I need certain things in a certain order.
(and, sort is really expensive.) On the other hand, if I can assume, say,
“filter” or “map” doesn’t shuffle the rows around, I can do the sort once
and assume that the order is retained throughout such operations saving a
lot of time from doing unnecessary sorts.
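
A small illustration of that pattern (sort once, then rely on transforms that
do not shuffle), under the assumption discussed in this thread that map and
filter keep row order; the data, master and app name are placeholders.

  import org.apache.spark.{SparkConf, SparkContext}

  object SortOnce {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sort-once"))
      val byScore = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3))).sortBy(_._2)
      // filter and map are narrow transforms, so the sorted order carries
      // through without paying for another sort.
      val labeled = byScore.filter(_._2 > 1).map { case (k, v) => s"$k=$v" }
      println(labeled.collect().toSeq) // the remaining rows, still in score order: b=2, c=3
      sc.stop()
    }
  }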

Mingyu

From:  Mark Hamstra m...@clearstorydata.com
Reply-To:  user@spark.apache.org user@spark.apache.org
Date:  Wednesday, April 30, 2014 at 11:36 AM
To:  user@spark.apache.org user@spark.apache.org
Subject:  Re: Union of 2 RDD's only returns the first one

Which is what you shouldn't be doing as an API user, since that
implementation code might change.  The documentation doesn't mention a row
ordering guarantee, so none should be assumed.

It is hard enough for us to correctly document all of the things that the
API does do.  We really shouldn't be forced into the expectation that we
will also fully document everything that the API doesn't do.


On Wed, Apr 30, 2014 at 11:13 AM, Mingyu Kim m...@palantir.com wrote:
 Okay, that makes sense. It’d be great if this can be better documented at
 some point, because the only way to find out about the resulting RDD row
 order is by looking at the code.
 
 Thanks for the discussion!
 
 Mingyu
 
 
 
 
 On 4/29/14, 11:59 PM, Patrick Wendell pwend...@gmail.com wrote:
 
 I don't think we guarantee anywhere that union(A, B) will behave by
 concatenating the partitions, it just happens to be an artifact of the
 current implementation.
 
 rdd1 = [1,2,3]
 rdd2 = [1,4,5]
 
 rdd1.union(rdd2) = [1,2,3,1,4,5] // how it is now
 rdd1.union(rdd2) = [1,4,5,1,2,3] // some day it could be like this, it
 wouldn't violate the contract of union
 
  AFAIK the only guarantee is that the resulting RDD will contain all elements.
 
 - Patrick
 
 On Tue, Apr 29, 2014 at 11:26 PM, Mingyu Kim m...@palantir.com wrote:
  Yes, that’s what I meant. Sure, the numbers might not be actually
 sorted,
  but the order of rows is semantically kept throughout non-shuffling
  transforms. I’m on board with you on union as well.
 
  Back to the original question, then, why is it important to coalesce to
 a
  single partition? When you union two RDDs, for example, rdd1 = [“a, b,
  c”], rdd2 = [“1, 2, 3”, “4, 5, 6”], then
  rdd1.union(rdd2).saveAsTextFile(…) should’ve resulted in a file with
 three
  lines “a, b, c”, “1, 2, 3”, and “4, 5, 6” because the partitions from
 the
   two RDDs are concatenated.
 
  Mingyu
 
 
 
 
  On 4/29/14, 10:55 PM, Patrick Wendell pwend...@gmail.com wrote:
 
 If you call map() on an RDD it will retain the ordering it had before,
 but that is not necessarily a correct sort order for the new RDD.
 
 var rdd = sc.parallelize([2, 1, 3]);
  var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
  var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]
 
 Note that mapped is no longer sorted.
 
 When you union two RDD's together it will effectively concatenate the
 two orderings, which is also not a valid sorted order on the new RDD:
 
 rdd1 = [1,2,3]
 rdd2 = [1,4,5]
 
 rdd1.union(rdd2) = [1,2,3,1,4,5]
 
 On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim m...@palantir.com wrote:
  Thanks for the quick response!
 
  To better understand it, the reason sorted RDD has a well-defined
 ordering
  is because sortedRDD.getPartitions() returns the partitions in the
 right
  order and each partition internally is properly sorted. So, if you
 have
 
  var rdd = sc.parallelize([2, 1, 3]);
   var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
   var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]
 
   Since mapValues doesn’t change the order of partitions nor the
  order of rows within the partitions, I think “mapped” should have the
  exact same order as “sorted”. Sure, if a transform involves
 shuffling,
 the
  order will change. Am I mistaken? Is there an extra detail in
 sortedRDD
  that guarantees a well-defined ordering?
 
  If it’s true that the order of partitions returned by
 RDD.getPartitions()
  and the row orders within the partitions determine the row order, I’m
 not
  sure why union doesn’t respect the order because union operation
 simply
  concatenates the two lists of partitions from the two RDDs.
 
  Mingyu
 
 
 
 
  On 4/29/14, 10:25 PM, Patrick Wendell pwend...@gmail.com wrote:
 
 You are right, once you sort() the RDD, then yes it has a well
 defined
 ordering.
 
 But that ordering is lost as soon as you transform the RDD,
 including
 if you union it with another RDD.
 
 On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim m...@palantir.com
 wrote:
  Hi Patrick,
 
   I'm a little confused about your comment that RDDs are not
 ordered.
 As
 far
  as I know, RDDs keep list of partitions

Re: Union of 2 RDD's only returns the first one

2014-04-29 Thread Mingyu Kim
Hi Patrick,

I'm a little confused about your comment that RDDs are not ordered. As far
as I know, RDDs keep list of partitions that are ordered and this is why I
can call RDD.take() and get the same first k rows every time I call it and
RDD.take() returns the same entries as RDD.map(…).take() because map
preserves the partition order. RDD order is also what allows me to get the
top k out of RDD by doing RDD.sort().take().

Am I misunderstanding it? Or, is it just when RDD is written to disk that
the order is not well preserved? Thanks in advance!

Mingyu




On 1/22/14, 4:46 PM, Patrick Wendell pwend...@gmail.com wrote:

Ah somehow after all this time I've never seen that!

On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia buendia...@gmail.com
wrote:



 On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell pwend...@gmail.com
 wrote:

 What is the ++ operator here? Is this something you defined?


 No, it's an alias for union defined in RDD.scala:

 def ++(other: RDD[T]): RDD[T] = this.union(other)



 Another issue is that RDD's are not ordered, so when you union two
 together it doesn't have a well defined ordering.

 If you do want to do this you could coalesce into one partition, then
 call MapPartitions and return an iterator that first adds your header
 and then the rest of the file, then call saveAsTextFile. Keep in mind
 this will only work if you coalesce into a single partition.


 Thanks! I'll give this a try.



  myRdd.coalesce(1)
  .map(_.mkString(","))
  .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
  .saveAsTextFile("out.csv")

 - Patrick

 On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
 buendia...@gmail.com wrote:
  Hi,
 
  I'm trying to find a way to create a csv header when using
  saveAsTextFile,
  and I came up with this:
 
   (sc.makeRDD(Array("col1,col2,col3"), 1) ++
   myRdd.coalesce(1).map(_.mkString(",")))
 .saveAsTextFile("out.csv")
 
   But it only saves the header part. Why is it that the union method does not
   return both RDD's?



