Re: high GC in the Kmeans algorithm

2015-02-17 Thread lihu
Thanks for your answer. Yes, I cached the data; I can see from the
web UI that all of the data is cached in memory.

What I worry about is the dimension, not the total size.

Sean Owen once told me that Broadcast supports a maximum array
size of 2GB, so is 10^7 a little too large?
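
To put rough numbers on the dimension concern above, here is a back-of-the-envelope sketch in Java. It assumes dense vectors of 8-byte doubles and ignores JVM object headers; the class and method names are hypothetical, and the 50GB executor, 48 tasks per machine and 2GB limit are simply the figures quoted in this thread.

```java
// Back-of-the-envelope memory arithmetic for dense double vectors.
// Assumption: vectors are dense (8 bytes per dimension); JVM object and
// array headers are ignored. All names here are illustrative only.
final class DenseVectorFootprint {
    static final long GIB = 1024L * 1024L * 1024L;

    /** Bytes needed for one dense vector of the given dimension. */
    static long vectorBytes(long dimension) {
        return dimension * Double.BYTES;
    }

    /** How many whole dense vectors fit in the given number of bytes. */
    static long vectorsThatFit(long availableBytes, long dimension) {
        return availableBytes / vectorBytes(dimension);
    }

    /** Heap share per task if an executor heap is split evenly. */
    static long heapPerTask(long executorBytes, int concurrentTasks) {
        return executorBytes / concurrentTasks;
    }

    public static void main(String[] args) {
        long dim = 10_000_000L;
        long perTask = heapPerTask(50 * GIB, 48);
        System.out.println("one vector:        " + vectorBytes(dim) + " bytes");
        System.out.println("heap per task:     " + perTask + " bytes");
        System.out.println("vectors per task:  " + vectorsThatFit(perTask, dim));
        System.out.println("vectors under 2GB: " + vectorsThatFit(2 * GIB, dim));
    }
}
```

Under these assumptions a single 10^7-dimensional vector is about 80MB, so a task's share of a 50GB heap holds only about a dozen of them, whereas at 10^4 dimensions each vector is about 80KB. That would be consistent with GC pressure growing with the dimension, though only profiling can confirm it.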

On Wed, Feb 18, 2015 at 5:43 AM, Xiangrui Meng men...@gmail.com wrote:

 Did you cache the data? Was it fully cached? The k-means
 implementation doesn't create many temporary objects. I guess you need
 more RAM so that GC is not triggered so frequently. Please monitor the
 memory usage using YourKit or VisualVM. -Xiangrui

 On Wed, Feb 11, 2015 at 1:35 AM, lihu lihu...@gmail.com wrote:
  I just want to make the best use of the CPU and test the performance of
  Spark when there are a lot of tasks on a single node.
 
  On Wed, Feb 11, 2015 at 5:29 PM, Sean Owen so...@cloudera.com wrote:
 
  Good, worth double-checking that's what you got. That's barely 1GB per
  task though. Why run 48 if you have 24 cores?
 
  On Wed, Feb 11, 2015 at 9:03 AM, lihu lihu...@gmail.com wrote:
   I give 50GB to the executor, so it seems there is no reason for the
   memory to be insufficient.
  
   On Wed, Feb 11, 2015 at 4:50 PM, Sean Owen so...@cloudera.com
 wrote:
  
   Meaning, you have 128GB per machine but how much memory are you
   giving the executors?
  
   On Wed, Feb 11, 2015 at 8:49 AM, lihu lihu...@gmail.com wrote:
What do you mean? Yes, I can see in the web UI that some data is put
in memory.
   
On Wed, Feb 11, 2015 at 4:25 PM, Sean Owen so...@cloudera.com
wrote:
   
Are you actually using that memory for executors?
   
On Wed, Feb 11, 2015 at 8:17 AM, lihu lihu...@gmail.com wrote:
 Hi,
 I run k-means (MLlib) on a cluster with 12 workers. Every worker has
 128GB of RAM and 24 cores. I run 48 tasks on one machine. The total data
 is just 40GB.

 When the dimension of the data set is about 10^7, the duration of every
 task is about 30s, but the GC cost is about 20s.

 When I reduce the dimension to 10^4, the GC cost is small.

 So why is the GC cost so high when the dimension is larger? Or is this
 caused by MLlib?




   
   
   
   
--
Best Wishes!
   
Li Hu(李浒) | Graduate Student
Institute for Interdisciplinary Information Sciences(IIIS)
Tsinghua University, China
   
Email: lihu...@gmail.com
Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
   
   
  
  
  
  
 
 



Re: Lost task - connection closed

2015-02-17 Thread Tianshuo Deng
Hi, thanks for the response.
I discovered my problem was that some of the executors ran out of memory
(OOM); tracing through the executor logs helped uncover the problem. The
driver log usually does not reflect the OOM error, which causes confusion
among users.

This is just what I found on my side; I am not sure whether the OP was
having the same problem, though.

On Wed, Feb 11, 2015 at 12:03 AM, Arush Kharbanda 
ar...@sigmoidanalytics.com wrote:

 Hi

 Can you share the code you are trying to run.

 Thanks
 Arush

 On Wed, Feb 11, 2015 at 9:12 AM, Tianshuo Deng td...@twitter.com.invalid
 wrote:

 I have seen the same problem. It causes some tasks to fail, but not the
 whole job.
 I hope someone can shed some light on what the cause might be.

 On Mon, Jan 26, 2015 at 9:49 AM, Aaron Davidson ilike...@gmail.com
 wrote:

 It looks like something weird is going on with your object
 serialization, perhaps a funny form of self-reference which is not detected
 by ObjectOutputStream's typical loop avoidance. That, or you have some data
 structure like a linked list with a parent pointer and you have many
 thousand elements.

 Assuming the stack trace is coming from an executor, it is probably a
 problem with the objects you're sending back as results, so I would
 carefully examine these and maybe try serializing some using
 ObjectOutputStream manually.

 If your program looks like
 foo.map { row => doComplexOperation(row) }.take(10)

 you can also try changing it to
 foo.map { row => doComplexOperation(row); 1 }.take(10)

 to avoid serializing the result of that complex operation, which should
 help narrow down where exactly the problematic objects are coming from.
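
The deep-recursion scenario described above (a long linked structure overflowing the stack during Java serialization) can be reproduced outside Spark. The following is a minimal sketch using only the JDK; ChainNode and DeepSerializationCheck are hypothetical names, and the chain length is chosen to be far deeper than a default thread stack is expected to handle.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical singly linked node. Default Java serialization follows
// `next` recursively, so a long chain means deep recursion.
final class ChainNode implements Serializable {
    private static final long serialVersionUID = 1L;
    final int value;
    ChainNode next;

    ChainNode(int value) {
        this.value = value;
    }
}

final class DeepSerializationCheck {
    /** Builds a chain of `length` nodes iteratively (no recursion here). */
    static ChainNode chain(int length) {
        ChainNode head = null;
        for (int i = 0; i < length; i++) {
            ChainNode node = new ChainNode(i);
            node.next = head;
            head = node;
        }
        return head;
    }

    /** Returns true if the object serializes, false if the stack overflows. */
    static boolean serializes(Object o) {
        try {
            ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
            out.writeObject(o);
            return true;
        } catch (StackOverflowError e) {
            return false; // recursion through `next` went too deep
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("short chain serializes: " + serializes(chain(10)));
        System.out.println("long chain serializes:  " + serializes(chain(1_000_000)));
    }
}
```

Running a check like this against a sample of the objects a task returns is one way to try the "serialize some manually" suggestion; it is a diagnostic sketch, not a fix.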

 On Mon, Jan 26, 2015 at 8:31 AM, octavian.ganea 
 octavian.ga...@inf.ethz.ch wrote:

 Here is the first error I get at the executors:

 15/01/26 17:27:04 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[handle-message-executor-16,5,main]
 java.lang.StackOverflowError
 at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
 at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1840)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1533)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 

Re: How do you get the partitioner for an RDD in Java?

2015-02-17 Thread Darin McBeath


Thanks Imran.  That's exactly what I needed to know.

Darin.


From: Imran Rashid iras...@cloudera.com
To: Darin McBeath ddmcbe...@yahoo.com 
Cc: User user@spark.apache.org 
Sent: Tuesday, February 17, 2015 8:35 PM
Subject: Re: How do you get the partitioner for an RDD in Java?



A JavaRDD is just a wrapper around a normal RDD defined in Scala, which is 
stored in the rdd field.  You can access everything that way.  The JavaRDD 
wrappers just provide some interfaces that are a bit easier to work with in 
Java.

If this is at all convincing, here's me demonstrating it inside the spark-shell 
(yes, it's Scala, but I'm using the Java API):

scala> val jsc = new JavaSparkContext(sc)
jsc: org.apache.spark.api.java.JavaSparkContext = org.apache.spark.api.java.JavaSparkContext@7d365529

scala> val data = jsc.parallelize(java.util.Arrays.asList(Array("a", "b", "c")))
data: org.apache.spark.api.java.JavaRDD[Array[String]] = ParallelCollectionRDD[0] at parallelize at <console>:15

scala> data.rdd.partitioner
res0: Option[org.apache.spark.Partitioner] = None




On Tue, Feb 17, 2015 at 3:44 PM, Darin McBeath ddmcbe...@yahoo.com.invalid 
wrote:

In an 'early release' of the Learning Spark book, there is the following 
reference:

In Scala and Java, you can determine how an RDD is partitioned using its 
partitioner property (or partitioner() method in Java)

However, I don't see the mentioned 'partitioner()' method in Spark 1.2 or a 
way of getting this information.

I'm curious if anyone has any suggestions for how I might go about finding how 
an RDD is partitioned in a Java program.

Thanks.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






RangePartitioner in Spark 1.2.1

2015-02-17 Thread java8964
Hi, Sparkers:

I was searching Google for something related to Spark's RangePartitioner
and found an old thread on this mailing list:
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-and-Partition-td991.html

I followed the code example from that thread:

scala> import org.apache.spark.RangePartitioner
import org.apache.spark.RangePartitioner

scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:13

scala> rdd.keyBy(s => s(0).toUpper)
res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:16

scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
res1: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

scala> res1.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)

The example makes the meaning of the RangePartitioner clear to me, but to
my surprise I got the following result:

(0,apple)
(0,Ball)
(1,cat)
(2,dog)
(3,Elephant)
(4,fox)
(5,gas)
(6,horse)
(7,index)
(8,jet)
(9,kitsch)
(10,long)
(11,moon)
(12,Neptune)
(13,ooze)
(14,Pen)
(15,quiet)
(16,rose)
(17,sun)
(18,talk)
(19,umbrella)
(20,voice)
(21,Walrus)
(22,xeon)
(23,Yam)
(24,zebra)

instead of the perfect range of indexes from 0 to 25 shown in the old
thread. Why is that? Is this a bug, or some new behavior I don't
understand?

BTW, I tested this on Spark 1.2.1 with the Hadoop 2.4 binary release.

Thanks
Yong
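
One way to reason about the output above is with a simplified model of range lookup, sketched here in Java. This is not Spark's RangePartitioner code: the class name, the helper methods and the chosen bounds are all hypothetical. In this model, N sorted upper bounds define N + 1 partitions, and a key goes to the first range whose upper bound is not smaller than the key.

```java
// Simplified model of range partition lookup (not Spark's implementation).
// Assumption: `upperBounds` is sorted ascending; N bounds give N + 1 ranges.
final class RangeLookupSketch {
    /** Index of the first range whose upper bound is >= key, else bounds.length. */
    static int partitionFor(char key, char[] upperBounds) {
        int p = 0;
        while (p < upperBounds.length && key > upperBounds[p]) {
            p++;
        }
        return p;
    }

    /** Inclusive run of consecutive characters, e.g. bounds('B', 'D') is {B, C, D}. */
    static char[] bounds(char first, char last) {
        char[] b = new char[last - first + 1];
        for (int i = 0; i < b.length; i++) {
            b[i] = (char) (first + i);
        }
        return b;
    }

    public static void main(String[] args) {
        char[] full = bounds('A', 'Y');   // 25 bounds, 26 partitions
        char[] fewer = bounds('B', 'Y');  // 24 bounds, 25 partitions
        for (char key : new char[] {'A', 'B', 'C', 'Z'}) {
            System.out.println(key + ": " + partitionFor(key, full)
                    + " with 25 bounds, " + partitionFor(key, fewer) + " with 24 bounds");
        }
    }
}
```

With the hypothetical 24 bounds, 'A' and 'B' share partition 0 and 'Z' lands in partition 24, which has the same shape as the result reported above. Whether Spark 1.2.1 actually chose such bounds for this data is an assumption; the sketch only shows that one fewer bound is enough to produce this pattern.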

JsonRDD to parquet -- data loss

2015-02-17 Thread Vasu C
Hi,

I am running a Spark batch processing job using the spark-submit command, and
below is my code snippet. Basically, it converts a JsonRDD to Parquet and
stores it in an HDFS location.

The problem I am facing is that if multiple jobs are triggered in parallel,
even though each job executes properly (as I can see in the Spark web UI), a
Parquet file is not always created in the HDFS path. If 5 jobs are executed
in parallel, then only 3 Parquet files get created.

Is this a data loss scenario, or am I missing something here? Please
help me with this.

Here tableName is unique, with a timestamp appended to it.


val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val jsonRdd  = sqlContext.jsonRDD(results)

val parquetTable = sqlContext.parquetFile(parquetFilePath)

parquetTable.registerTempTable(tableName)

jsonRdd.insertInto(tableName)


Regards,

  Vasu C


Berlin Apache Spark Meetup

2015-02-17 Thread Ralph Bergmann | the4thFloor.eu
Hi,


there is a small Spark Meetup group in Berlin, Germany :-)
http://www.meetup.com/Berlin-Apache-Spark-Meetup/

Please add this group to the Meetups list at
https://spark.apache.org/community.html


Ralph




Re: Berlin Apache Spark Meetup

2015-02-17 Thread Matei Zaharia
Thanks! I've added you.

Matei

 On Feb 17, 2015, at 4:06 PM, Ralph Bergmann | the4thFloor.eu 
 ra...@the4thfloor.eu wrote:
 
 Hi,
 
 
 there is a small Spark Meetup group in Berlin, Germany :-)
 http://www.meetup.com/Berlin-Apache-Spark-Meetup/
 
 Please add this group to the Meetups list at
 https://spark.apache.org/community.html
 
 
 Ralph
 
 





Re: MapValues and Shuffle Reads

2015-02-17 Thread Imran Rashid
Hi Darin,

You are asking for something near & dear to me:
https://issues.apache.org/jira/browse/SPARK-1061

There is a PR attached there as well.  Note that you could do everything in
that PR in your own user code; you don't need to wait for it to get merged,
*except* for the change to HadoopRDD so that it sorts the input partitions.
 (Though of course, you could always just have your own implementation of
HadoopRDD as well ...)

You could also vote for the issue & watch it to encourage some
progress on it :)

On Tue, Feb 17, 2015 at 2:56 PM, Darin McBeath ddmcbe...@yahoo.com wrote:

 Thanks Imran.

 I think you are probably correct.  I was a bit surprised that there was no
 shuffle read in the initial hash partition step.  I will adjust the code as
 you suggest to prove that is the case.

 I have a slightly different question.  If I save an RDD to S3 (or some
 equivalent) and this RDD was hash partitioned at the time, do I still need
 to hash partition the RDD again when I read it in?  Is there a way that I
 could prevent all of the shuffling (such as providing a hint)?  (My parts
 for the RDD will be gzipped, so they would not be splittable.)  In reality,
 that's what I would really want to do in the first place.

 Thanks again for your insights.

 Darin.

   --
  *From:* Imran Rashid iras...@cloudera.com
 *To:* Darin McBeath ddmcbe...@yahoo.com
 *Cc:* User user@spark.apache.org
 *Sent:* Tuesday, February 17, 2015 3:29 PM
 *Subject:* Re: MapValues and Shuffle Reads

 Hi Darin,

 When you say you see 400GB of shuffle writes from the first code
 snippet, what do you mean?  There is no action in that first set, so it
 won't do anything.  By itself, it won't do any shuffle writing, or anything
 else for that matter.

 Most likely, the .count() on your second code snippet is actually causing
 the execution of some of the first snippet as well.  The .partitionBy will
 result in both shuffle writes and shuffle reads, but they aren't set in
 motion until the .count further down the line.  It's confusing because the
 stage boundaries don't line up exactly with your RDD variables here.
 hsfBaselinePairRDD spans 2 stages, and baselinePairRDD actually gets merged
 into the stage above it.

 If you do a hsfBaselinePairRDD.count after your first code snippet, and
 then run the second code snippet afterwards, is it more like what you
 expect?

 Imran
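
The point above, that transformations do nothing until an action runs, can be illustrated with an analogy outside Spark. The sketch below uses plain Java streams, which are also lazy; it is only an analogy and does not model Spark stages or shuffles, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: Java streams, like RDD transformations, are lazy.
// The map function runs when a terminal operation (the "action") is called.
final class LazyPipelineAnalogy {
    /** Returns {calls made before the terminal op, calls made after it}. */
    static int[] callsBeforeAndAfter(List<String> records) {
        AtomicInteger calls = new AtomicInteger();

        // "Transformation": declared here, but nothing is executed yet.
        Stream<Integer> lengths = records.stream().map(s -> {
            calls.incrementAndGet();
            return s.length();
        });
        int before = calls.get();

        // "Action": collecting the results forces the whole pipeline to run.
        List<Integer> collected = lengths.collect(Collectors.toList());
        int after = calls.get();

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] calls = callsBeforeAndAfter(List.of("a", "bb", "ccc"));
        System.out.println("calls before action: " + calls[0]);
        System.out.println("calls after action:  " + calls[1]);
    }
}
```

In the same way, the cost of an earlier partitionBy shows up under whichever later action first forces it, which is why the metrics can appear attached to an unexpected step.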



 On Tue, Feb 17, 2015 at 1:52 PM, Darin McBeath 
 ddmcbe...@yahoo.com.invalid wrote:

 In the following code, I read in a large sequence file from S3 (1TB)
 spread across 1024 partitions.  When I look at the job/stage summary, I see
 about 400GB of shuffle writes which seems to make sense as I'm doing a hash
 partition on this file.

 // Get the baseline input file
 JavaPairRDD<Text, Text> hsfBaselinePairRDDReadable =
 sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class,
 Text.class, Text.class);

 JavaPairRDD<String, String> hsfBaselinePairRDD =
 hsfBaselinePairRDDReadable.mapToPair(new
 ConvertFromWritableTypes()).partitionBy(new
 HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS)).persist(StorageLevel.MEMORY_AND_DISK_SER());

 I then execute the following code (with a count to force execution) and
 what I find very strange is that when I look at the job/stage summary, I
 see more than 340GB of shuffle read.  Why would there be any shuffle read
 in this step?  I would expect there to be little (if any) shuffle reads in
 this step.

 // Use 'substring' to extract the epoch value from each record.
 JavaPairRDD<String, Long> baselinePairRDD =
 hsfBaselinePairRDD.mapValues(new
 ExtractEpoch(accumBadBaselineRecords)).persist(StorageLevel.MEMORY_AND_DISK_SER());

 log.info("Number of baseline records: " + baselinePairRDD.count());

 Both hsfBaselinePairRDD and baselinePairRDD have 1024 partitions.

 Any insights would be appreciated.

 I'm using Spark 1.2.0 in a stand-alone cluster.


 Darin.








Re: How do you get the partitioner for an RDD in Java?

2015-02-17 Thread Imran Rashid
A JavaRDD is just a wrapper around a normal RDD defined in Scala, which is
stored in the rdd field.  You can access everything that way.  The
JavaRDD wrappers just provide some interfaces that are a bit easier to work
with in Java.

If this is at all convincing, here's me demonstrating it inside the
spark-shell (yes, it's Scala, but I'm using the Java API):

scala> val jsc = new JavaSparkContext(sc)
jsc: org.apache.spark.api.java.JavaSparkContext = org.apache.spark.api.java.JavaSparkContext@7d365529

scala> val data = jsc.parallelize(java.util.Arrays.asList(Array("a", "b", "c")))
data: org.apache.spark.api.java.JavaRDD[Array[String]] = ParallelCollectionRDD[0] at parallelize at <console>:15

scala> data.rdd.partitioner
res0: Option[org.apache.spark.Partitioner] = None


On Tue, Feb 17, 2015 at 3:44 PM, Darin McBeath ddmcbe...@yahoo.com.invalid
wrote:

 In an 'early release' of the Learning Spark book, there is the following
 reference:

 In Scala and Java, you can determine how an RDD is partitioned using its
 partitioner property (or partitioner() method in Java)

 However, I don't see the mentioned 'partitioner()' method in Spark 1.2 or
 a way of getting this information.

 I'm curious if anyone has any suggestions for how I might go about finding
 how an RDD is partitioned in a Java program.

 Thanks.

 Darin.





Spark Streaming output cannot be used as input?

2015-02-17 Thread Jose Fernandez
Hello folks,

Our intended use case is:

-  Spark Streaming app #1 reads from RabbitMQ and outputs to HDFS

-  Spark Streaming app #2 reads #1's output and stores the data in 
Elasticsearch

The idea behind this architecture is that if Elasticsearch is down due to an 
upgrade or system error we don't have to stop reading messages from the queue. 
We could also scale each process separately as needed.

After a few hours of research, my understanding is that Spark Streaming outputs 
files in a *directory* for which you provide the prefix and suffix. This is 
despite the ScalaDoc for DStream saveAsObjectFiles suggesting otherwise:

  /**
   * Save each RDD in this DStream as a Sequence file of serialized objects.
   * The file name at each batch interval is generated based on `prefix` and
   * `suffix`: prefix-TIME_IN_MS.suffix.
   */

Spark Streaming can monitor an HDFS directory for files but subfolders are not 
supported. So as far as I can tell, it is not possible to use Spark Streaming 
output as input for a different Spark Streaming app without somehow performing 
a separate operation in the middle.

Am I missing something obvious? I've read some suggestions like using Hadoop to 
merge the directories (whose names I don't see how you would know) and to 
reduce the partitions to 1 (which wouldn't help).

Any other suggestions? What is the expected pattern a developer would follow 
that would make Spark Streaming's output format usable?
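
The naming pattern quoted from the ScalaDoc above (prefix-TIME_IN_MS.suffix) can be sketched as a pair of small helpers in Java. These are hypothetical helpers, not part of Spark, and they assume the output names follow that pattern exactly with a non-empty suffix; a job sitting between the two apps could use something like this to recognise batch directories and order them by batch time.

```java
// Hypothetical helpers for the "prefix-TIME_IN_MS.suffix" naming pattern.
// Assumption: names follow the pattern exactly and the suffix is non-empty.
final class BatchOutputName {
    /** Builds the name expected for one batch. */
    static String nameFor(String prefix, String suffix, long batchTimeMs) {
        return prefix + "-" + batchTimeMs + "." + suffix;
    }

    /** Extracts the batch time from a matching name, or returns -1 if it does not match. */
    static long batchTimeOf(String name, String prefix, String suffix) {
        String head = prefix + "-";
        String tail = "." + suffix;
        if (!name.startsWith(head) || !name.endsWith(tail)
                || name.length() <= head.length() + tail.length()) {
            return -1L;
        }
        String middle = name.substring(head.length(), name.length() - tail.length());
        try {
            return Long.parseLong(middle);
        } catch (NumberFormatException e) {
            return -1L; // the part between prefix and suffix is not a timestamp
        }
    }

    public static void main(String[] args) {
        String name = nameFor("events", "seq", 1424217600000L);
        System.out.println(name);
        System.out.println(batchTimeOf(name, "events", "seq"));
    }
}
```

This does not remove the need for a step between the two streaming apps; it only shows that the directory names are predictable enough to be matched and sorted.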



spark-core in a servlet

2015-02-17 Thread Ralph Bergmann | the4thFloor.eu
Hi,


I want to use spark-core inside of a HttpServlet. I use Maven for the
build task but I have a dependency problem :-(

I get this error message:

ClassCastException:
com.sun.jersey.server.impl.container.servlet.JerseyServletContainerInitializer
cannot be cast to javax.servlet.ServletContainerInitializer

When I add these exclusions it builds, but then other classes are
not found at runtime:

  <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.11</artifactId>
     <version>1.2.1</version>
     <exclusions>
        <exclusion>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-client</artifactId>
        </exclusion>
        <exclusion>
           <groupId>org.eclipse.jetty</groupId>
           <artifactId>*</artifactId>
        </exclusion>
     </exclusions>
  </dependency>


What can I do?


Thanks a lot!,

Ralph

-- 

Ralph Bergmann

iOS and Android app developer


www  http://www.the4thFloor.eu

mail ra...@the4thfloor.eu
skypedasralph

google+  https://plus.google.com/+RalphBergmann
xing https://www.xing.com/profile/Ralph_Bergmann3
linkedin https://www.linkedin.com/in/ralphbergmann
gulp https://www.gulp.de/Profil/RalphBergmann.html
github   https://github.com/the4thfloor


pgp key id   0x421F9B78
pgp fingerprint  CEE3 7AE9 07BE 98DF CD5A E69C F131 4A8E 421F 9B78
<project
   xmlns="http://maven.apache.org/POM/4.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>

   <name>SparkWordCount</name>
   <version>1.0-SNAPSHOT</version>
   <groupId>eu.the4thfloor</groupId>
   <artifactId>SparkWordCount</artifactId>
   <packaging>war</packaging>
   <url>http://maven.apache.org</url>

   <properties>
      <jdk.version>1.7</jdk.version>
   </properties>

   <dependencies>
      <dependency>
         <groupId>junit</groupId>
         <artifactId>junit</artifactId>
         <version>4.11</version>
         <scope>test</scope>
      </dependency>
      <dependency>
         <groupId>javax.servlet</groupId>
         <artifactId>javax.servlet-api</artifactId>
         <version>3.1.0</version>
         <scope>provided</scope>
      </dependency>
      <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-core_2.11</artifactId>
         <version>1.2.1</version>
         <exclusions>
            <exclusion>
               <groupId>org.apache.hadoop</groupId>
               <artifactId>hadoop-client</artifactId>
            </exclusion>
            <exclusion>
               <groupId>org.eclipse.jetty</groupId>
               <artifactId>*</artifactId>
            </exclusion>
         </exclusions>
      </dependency>
   </dependencies>

   <build>
      <finalName>SparkWordCount</finalName>
      <plugins>
         <!-- Eclipse project -->
         <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-eclipse-plugin</artifactId>
            <version>2.9</version>
            <configuration>
               <!-- Always download and attach dependencies source code -->
               <downloadSources>true</downloadSources>
               <downloadJavadocs>false</downloadJavadocs>
               <!-- Avoid type mvn eclipse:eclipse -Dwtpversion=2.0 -->
               <wtpversion>2.0</wtpversion>
            </configuration>
         </plugin>
         <!-- Set JDK Compiler Level -->
         <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
               <source>${jdk.version}</source>
               <target>${jdk.version}</target>
            </configuration>
         </plugin>
         <!-- For Maven Tomcat Plugin -->
         <plugin>
            <groupId>org.apache.tomcat.maven</groupId>
            <artifactId>tomcat7-maven-plugin</artifactId>
            <version>2.2</version>
            <configuration>
               <path>/SparkWordCount</path>
            </configuration>
         </plugin>
      </plugins>
   </build>
</project>




Re: JsonRDD to parquet -- data loss

2015-02-17 Thread Arush Kharbanda
I am not sure if this is the easiest way to solve your problem, but you can
connect to the Hive metastore (through Derby) and find the HDFS path from
there.

On Wed, Feb 18, 2015 at 9:31 AM, Vasu C vasuc.bigd...@gmail.com wrote:

 Hi,

 I am running a Spark batch processing job using the spark-submit command, and
 below is my code snippet. Basically, it converts a JsonRDD to Parquet and
 stores it in an HDFS location.

 The problem I am facing is that if multiple jobs are triggered in parallel,
 even though each job executes properly (as I can see in the Spark web UI), a
 Parquet file is not always created in the HDFS path. If 5 jobs are executed
 in parallel, then only 3 Parquet files get created.

 Is this a data loss scenario, or am I missing something here? Please
 help me with this.

 Here tableName is unique with timestamp appended to it.


 val sqlContext = new org.apache.spark.sql.SQLContext(sc)

 val jsonRdd  = sqlContext.jsonRDD(results)

 val parquetTable = sqlContext.parquetFile(parquetFilePath)

 parquetTable.registerTempTable(tableName)

 jsonRdd.insertInto(tableName)


 Regards,

   Vasu C




-- 


*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-17 Thread Tom Walwyn
Thanks for the reply, I'll try your suggestions.

Apologies, in my previous post I was mistaken. rdd is actually a PairRDD
of (Int, Int). I'm doing the self-join so I can count two things. First, I
can count the number of times a value appears in the data set. Second, I can
count the number of times values occur with the same key. For example, if I
have the following:

(1,2)
(1,3)
(4,3)

Then joining with itself I get:

(1,(2,2)) -> map -> ((2,2),1) -> reduceByKey -> ((2,2),1)
(1,(2,3)) -> map -> ((2,3),1) -> reduceByKey -> ((2,3),1)
(1,(3,2)) -> map -> ((3,2),1) -> reduceByKey -> ((3,2),1)
(1,(3,3)) -> map -> ((3,3),1) -> reduceByKey -> ((3,3),2)
(4,(3,3)) -> map -> ((3,3),1) _|

Note that I want to keep the duplicates (2,2) and reflections.
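
The worked example above can be checked with a small in-memory sketch in plain Java (no Spark). The class and method names are hypothetical; the sketch groups values by key, emits every ordered pair within a key (keeping duplicates and reflections), and counts the pairs. It also computes how many records the self-join produces, which is the sum over keys of the squared group size.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of: rdd.join(rdd).map(pair -> (pair, 1)).reduceByKey(_ + _)
// Each record is {key, value}. Illustrative only; not Spark code.
final class SelfJoinPairCounts {
    /** Groups the values of all records by their key. */
    static Map<Integer, List<Integer>> groupByKey(int[][] records) {
        Map<Integer, List<Integer>> byKey = new HashMap<>();
        for (int[] r : records) {
            byKey.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        }
        return byKey;
    }

    /** Counts every ordered value pair that shares a key. */
    static Map<List<Integer>, Integer> countPairs(int[][] records) {
        Map<List<Integer>, Integer> counts = new HashMap<>();
        for (List<Integer> values : groupByKey(records).values()) {
            for (int a : values) {
                for (int b : values) {
                    counts.merge(Arrays.asList(a, b), 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    /** Number of records the self-join yields: sum of n^2 over key groups. */
    static long joinedSize(int[][] records) {
        long total = 0;
        for (List<Integer> values : groupByKey(records).values()) {
            total += (long) values.size() * values.size();
        }
        return total;
    }

    public static void main(String[] args) {
        int[][] data = {{1, 2}, {1, 3}, {4, 3}};
        System.out.println("pair counts: " + countPairs(data));
        System.out.println("joined size: " + joinedSize(data));
    }
}
```

The squared growth in joinedSize is the reason a few heavily repeated keys can inflate a modest input into a very large joined data set, as with the record counts quoted in this thread.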

Rgds

On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com wrote:

 Why are you joining the rdd with itself?

 You can try these things:

 - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or
 MEMORY_AND_DISK_SER, so that it doesn't need to keep everything in memory.

 - Set your default Serializer to Kryo (.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))

 - Enable rdd compression (.set("spark.rdd.compress", "true"))


 Thanks
 Best Regards

 On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote:

 Hi All,

 I'm a new Spark (and Hadoop) user and I want to find out if the cluster
 resources I am using are feasible for my use-case. The following is a
 snippet of code that is causing an OOM exception in the executor after about
 125/1000 tasks during the map stage.

  val rdd2 = rdd.join(rdd, numPartitions=1000)
    .map(fp => ((fp._2._1, fp._2._2), 1))
    .reduceByKey((x, y) => x + y)
  rdd2.count()

 Which errors with a stack trace like:

  15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
 stage 2.0 (TID 498)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.immutable.List.foreach(List.scala:318)

 rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
 co-occuring values by key in the dataset, i.e. 'These two numbers occurred
 with the same key n times'. I intentionally don't want to filter out
 duplicates and reflections. rdd is about 3.6 million records, which has a
 size in memory of about 120MB, and results in a 'joined' RDD (before the
 reduceByKey stage) of around 460 million records, with a size in memory of
 about 35GB.

 My cluster setup is as follows. I have 3 nodes, where each node has 2
 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
 executors are allowed 1280m each and the job has 5 executors and 1 driver.
 Additionally, I have set spark.storage.memoryFraction to 0.06, and
 spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
 the issue. I've also tried increasing the number of partitions after the
 join dramatically (up to 15000). Nothing has been effective. Thus, I'm
 beginning to suspect I don't have enough resources for the job.

 Does anyone have a feeling about what the resource requirements would be
 for a use-case like this? I could scale the cluster up if necessary, but
 would like to avoid it. I'm willing to accept longer computation times if
 that is an option.

 Warm Regards,
 Thomas





Class loading issue, spark.files.userClassPathFirst doesn't seem to be working

2015-02-17 Thread dgoldenberg
I'm getting the below error when running spark-submit on my class. This class
has a transitive dependency on HttpClient v.4.3.1 since I'm calling SolrJ
4.10.3 from within the class.

This is in conflict with the older version, HttpClient 3.1 that's a
dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4).

I've tried setting spark.files.userClassPathFirst to true in SparkConf in my
program, and also setting it to true in $SPARK_HOME/conf/spark-defaults.conf as

spark.files.userClassPathFirst true

No go, I'm still getting the error, as below. Is there anything else I can
try? Are there any plans in Spark to support multiple class loaders?

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
    at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
    at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
    at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
    at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
    at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
    at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
    at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:168)
    at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:141)
    ...





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Class-loading-issue-spark-files-userClassPathFirst-doesn-t-seem-to-be-working-tp21693.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-17 Thread Tom Walwyn
Hi All,

I'm a new Spark (and Hadoop) user and I want to find out if the cluster
resources I am using are feasible for my use-case. The following is a
snippet of code that is causing an OOM exception in the executor after about
125/1000 tasks during the map stage.

 val rdd2 = rdd.join(rdd, numPartitions=1000)
   .map(fp => ((fp._2._1, fp._2._2), 1))
   .reduceByKey((x, y) => x + y)
 rdd2.count()

Which errors with a stack trace like:

 15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in stage 2.0 (TID 498)
 java.lang.OutOfMemoryError: GC overhead limit exceeded
     at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
     at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
     at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
     at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
     at scala.collection.immutable.List.foreach(List.scala:318)

rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
co-occurring values by key in the dataset, i.e. 'These two numbers occurred
with the same key n times'. I intentionally don't want to filter out
duplicates and reflections. rdd is about 3.6 million records, which has a
size in memory of about 120MB, and results in a 'joined' RDD (before the
reduceByKey stage) of around 460 million records, with a size in memory of
about 35GB.

My cluster setup is as follows. I have 3 nodes, where each node has 2 cores
and about 7.5GB of memory. I'm running Spark on YARN. The driver and
executors are allowed 1280m each and the job has 5 executors and 1 driver.
Additionally, I have set spark.storage.memoryFraction to 0.06, and
spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
the issue. I've also tried increasing the number of partitions after the
join dramatically (up to 15000). Nothing has been effective. Thus, I'm
beginning to suspect I don't have enough resources for the job.

Does anyone have a feeling about what the resource requirements would be
for a use-case like this? I could scale the cluster up if necessary, but
would like to avoid it. I'm willing to accept longer computation times if
that is an option.

Warm Regards,
Thomas


RE: Percentile example

2015-02-17 Thread SiMaYunRui
Thanks Imran for very detailed explanations and options. I think for now 
T-Digest is what I want. 

From: iras...@cloudera.com
Date: Tue, 17 Feb 2015 08:39:48 -0600
Subject: Re: Percentile example
To: myl...@hotmail.com
CC: user@spark.apache.org

(trying to repost to the list w/out URLs -- rejected as spam earlier)
Hi,
Using take() is not a good idea; as you have noted, it will pull a lot of data
down to the driver, so it's not scalable.  Here are some more scalable
alternatives:

1. Approximate solutions

1a. Sample the data.  Just sample some of the data to the driver, sort that
data in memory, and take the 66th percentile of that sample.

1b.  Make a histogram with pre-determined buckets.  Eg., if you know your data
ranges from 0 to 1 and is uniform-ish, you could make buckets every 0.01.  Then
count how many data points go into each bucket.  Or if you only care about
relative error and you have integers (often the case if your data is counts),
then you can span the full range of integers with a relatively small number of
buckets.  Eg., you only need 200 buckets for 5% error.  See the Histogram class
in Twitter's Ostrich library.

The problem is, if you have no idea what the distribution of your data is, it's
very hard to come up with good buckets; you could have an arbitrary amount of
data going to one bucket, and thus tons of error.

1c.  Use a TDigest, a compact, scalable data structure for approximating
distributions that performs reasonably across a wide range of distributions.
You would make one TDigest for each partition (with mapPartitions), and then
merge all of the TDigests together.  I wrote up a little more detail on this
earlier; you can search the spark-user list on Nabble for "tdigest".

2. Exact solutions.  There are also a few options here, but I'll give one that
is a variant of what you suggested.  Start out by doing a sortByKey.  Then
figure out how many records you have in each partition (with mapPartitions).
Figure out which partition the 66th percentile would be in.  Then just read the
one partition you want, and go down to the Nth record in that partition.

To read the one partition you want, you can either (a) use
mapPartitionsWithIndex, and just ignore every partition that isn't the one you
want, or (b) use PartitionPruningRDD.  PartitionPruningRDD will avoid launching
empty tasks on the other partitions, so it will be slightly more efficient, but
it's also a developer API, so perhaps not worth going to that level of detail.

Note that internally, sortByKey will sample your data to get an approximate
distribution, to figure out what data to put in each partition.  However, you're
still getting an exact answer this way -- the approximation is only important
for distributing work among all executors.  Even if the approximation is
inaccurate, you'll still correct for it, you will just have unequal partitions.
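
A minimal sketch of the bookkeeping in the exact approach, in plain Scala with no Spark dependency. It assumes the data has already been sorted and that the record count of each partition has been collected to the driver. The names PercentileLocator and locate, and the nearest-rank definition of a percentile, are assumptions of this sketch rather than anything prescribed above. Given the counts, it returns the index of the partition holding the requested percentile and the 0-based offset of the record inside that partition:

```scala
object PercentileLocator {
  /** Locates the record at `percentile` (0 to 100) in a sorted data set.
    *
    * @param partitionCounts number of records in each partition, in partition order
    * @return (partition index, 0-based offset of the record within that partition)
    */
  def locate(partitionCounts: Seq[Long], percentile: Double): (Int, Long) = {
    require(percentile >= 0.0 && percentile <= 100.0, "percentile must be in [0, 100]")
    val total = partitionCounts.sum
    require(total > 0, "data set is empty")
    // Nearest-rank definition: 1-based rank = ceil(p / 100 * N), clamped to [1, N]
    val rank = math.min(total, math.max(1L, math.ceil(percentile / 100.0 * total).toLong))
    val target = rank - 1 // 0-based global index of the wanted record
    // starts(i) = number of records in all partitions before partition i
    val starts = partitionCounts.scanLeft(0L)(_ + _)
    // first partition whose cumulative end lies beyond the target index
    val idx = starts.tail.indexWhere(end => target < end)
    (idx, target - starts(idx))
  }
}
```

For three partitions of 10 records each, locate(Seq(10L, 10L, 10L), 66) returns (1, 9): the 20th of 30 records, which is the 10th record of the second partition. Only that one partition then needs to be read.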
Imran

On Sun, Feb 15, 2015 at 9:37 AM, SiMaYunRui myl...@hotmail.com wrote:



hello, 
I am a newbie to Spark and am trying to figure out how to compute a percentile
over a big data set. I googled this topic but did not find any very useful code
example or explanation. It seems that I can use the sortByKey transformation to
put my data set in order, but I am not sure how to get the value of, for example,
the 66th percentile.
Should I use take() to pick up the value of the 66th percentile? I don't believe
any machine can load my data set in memory. I believe there must be more
efficient approaches.
Can anyone shed some light on this problem? 
Regards
  



  

Re: OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-17 Thread Akhil Das
Why are you joining the rdd with itself?

You can try these things:

- Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or
MEMORY_AND_DISK_SER, so that it doesn't need to keep everything in memory.

- Set your default Serializer to Kryo (.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer"))

- Enable rdd compression (.set("spark.rdd.compress", "true"))


Thanks
Best Regards

On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote:

 Hi All,

 I'm a new Spark (and Hadoop) user and I want to find out if the cluster
 resources I am using are feasible for my use-case. The following is a
 snippet of code that is causing an OOM exception in the executor after about
 125/1000 tasks during the map stage.

  val rdd2 = rdd.join(rdd, numPartitions=1000)
    .map(fp => ((fp._2._1, fp._2._2), 1))
    .reduceByKey((x, y) => x + y)
  rdd2.count()

 Which errors with a stack trace like:

  15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
 stage 2.0 (TID 498)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.immutable.List.foreach(List.scala:318)

 rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
 co-occuring values by key in the dataset, i.e. 'These two numbers occurred
 with the same key n times'. I intentionally don't want to filter out
 duplicates and reflections. rdd is about 3.6 million records, which has a
 size in memory of about 120MB, and results in a 'joined' RDD (before the
 reduceByKey stage) of around 460 million records, with a size in memory of
 about 35GB.

 My cluster setup is as follows. I have 3 nodes, where each node has 2
 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
 executors are allowed 1280m each and the job has 5 executors and 1 driver.
 Additionally, I have set spark.storage.memoryFraction to 0.06, and
 spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
 the issue. I've also tried increasing the number of partitions after the
 join dramatically (up to 15000). Nothing has been effective. Thus, I'm
 beginning to suspect I don't have enough resources for the job.

 Does anyone have a feeling about what the resource requirements would be
 for a use-case like this? I could scale the cluster up if necessary, but
 would like to avoid it. I'm willing to accept longer computation times if
 that is an option.

 Warm Regards,
 Thomas




Re: spark-core in a servlet

2015-02-17 Thread Arush Kharbanda
I am not sure whether this is causing the issue, but Spark is compatible
with Scala 2.10.
Instead of spark-core_2.11, you might want to try spark-core_2.10.

On Wed, Feb 18, 2015 at 5:44 AM, Ralph Bergmann | the4thFloor.eu 
ra...@the4thfloor.eu wrote:

 Hi,


 I want to use spark-core inside of a HttpServlet. I use Maven for the
 build task but I have a dependency problem :-(

 I get this error message:

 ClassCastException:

 com.sun.jersey.server.impl.container.servlet.JerseyServletContainerInitializer
 cannot be cast to javax.servlet.ServletContainerInitializer

 When I add these exclusions it builds, but then there are other classes
 not found at runtime:

   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.11</artifactId>
     <version>1.2.1</version>
     <exclusions>
       <exclusion>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-client</artifactId>
       </exclusion>
       <exclusion>
         <groupId>org.eclipse.jetty</groupId>
         <artifactId>*</artifactId>
       </exclusion>
     </exclusions>
   </dependency>


 What can I do?


 Thanks a lot!,

 Ralph

 --

 Ralph Bergmann

 iOS and Android app developer


 www  http://www.the4thFloor.eu

 mail ra...@the4thfloor.eu
 skypedasralph

 google+  https://plus.google.com/+RalphBergmann
 xing https://www.xing.com/profile/Ralph_Bergmann3
 linkedin https://www.linkedin.com/in/ralphbergmann
 gulp https://www.gulp.de/Profil/RalphBergmann.html
 github   https://github.com/the4thfloor


 pgp key id   0x421F9B78
 pgp fingerprint  CEE3 7AE9 07BE 98DF CD5A E69C F131 4A8E 421F 9B78




-- 


*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


RE: Percentile example

2015-02-17 Thread SiMaYunRui
Thanks Kohler, that's a very interesting approach. I have never used Spark SQL
and am not sure whether my cluster is configured well for it, but I will
definitely give it a try.

From: c.koh...@elsevier.com
To: myl...@hotmail.com; user@spark.apache.org
Subject: Re: Percentile example
Date: Tue, 17 Feb 2015 17:41:53 +













The best approach I’ve found to calculate Percentiles in Spark is to leverage 
SparkSQL.  If you use the Hive Query Language support, you can use the UDAFs 
for percentiles (as of Spark 1.2)



Something like this (Note: syntax not guaranteed to run but should give you the 
gist of what you need to do):  




JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaHiveContext hsc = new JavaHiveContext(sc);

// Get your data into a SchemaRDD and register the table

// Query it
String hql = "SELECT FIELD1, FIELD2, percentile(FIELD3, 0.05) AS ptile5 "
           + "FROM TABLE-NAME GROUP BY FIELD1, FIELD2";

JavaSchemaRDD result = hsc.hql(hql);
List<Row> grp = result.collect();

// One Row per (FIELD1, FIELD2) group; columns from index 2 on hold the percentiles
for (Row row : grp) {
  for (int z = 2; z < row.length(); z++) {
    // Do something with the results
  }
}




Curt









From: SiMaYunRui myl...@hotmail.com

Date: Sunday, February 15, 2015 at 10:37 AM

To: user@spark.apache.org user@spark.apache.org

Subject: Percentile example







hello, 



I am a newbie to Spark and am trying to figure out how to compute a percentile
over a big data set. I googled this topic but did not find any very useful code
example or explanation. It seems that I can use the sortByKey transformation to
put my data set in order, but I am not sure how to get the value of, for example,
the 66th percentile.

Should I use take() to pick up the value of the 66th percentile? I don't believe
any machine can load my data set in memory. I believe there must be more
efficient approaches.



Can anyone shed some light on this problem? 



Regards






  

Re: How to pass parameters to a spark-jobserver Scala class?

2015-02-17 Thread Vasu C
Hi Sasi,

To pass parameters to spark-jobserver, use curl -d "input.string = a b c
a b see" when submitting the job, and in the job server class use
config.getString("input.string").

You can pass multiple parameters like starttime,endtime etc and use
config.getString() to get the values.

The examples are shown here
https://github.com/spark-jobserver/spark-jobserver


Regards,
   Vasu C



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-parameters-to-a-spark-jobserver-Scala-class-tp21671p21692.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: RangePartitioner in Spark 1.2.1

2015-02-17 Thread Aaron Davidson
RangePartitioner does not actually provide a guarantee that all partitions
will be equal sized (that is hard), and instead uses sampling to
approximate equal buckets. Thus, it is possible that a bucket is left empty.

If you want the specified behavior, you should define your own partitioner.
It would look something like this (untested):
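import org.apache.spark.Partitioner

class AlphabetPartitioner extends Partitioner {
  def numPartitions = 26
  // Maps a key to 0..25 by its first letter; assumes keys start with A-Z or a-z
  def getPartition(key: Any): Int = key match {
    case s: String => s(0).toUpper - 'A'
    case c: Char => c.toUpper - 'A' // the keyBy example below produces Char keys
  }
  override def equals(other: Any): Boolean =
    other.isInstanceOf[AlphabetPartitioner]
  override def hashCode: Int = 0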
}
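
To sanity-check the bucket arithmetic used in getPartition without a cluster, the mapping can be tried in plain Scala. This is a sketch only: AlphabetBuckets and bucketOf are hypothetical names, and rejecting keys that do not start with an ASCII letter is an assumption added here (the partitioner above would compute an out-of-range index for such keys).

```scala
object AlphabetBuckets {
  val NumBuckets = 26

  // Returns 0 for words starting with A/a, 1 for B/b, ... 25 for Z/z.
  def bucketOf(word: String): Int = {
    require(word.nonEmpty, "empty key")
    val first = word.charAt(0).toUpper
    require(first >= 'A' && first <= 'Z', s"key must start with an ASCII letter: $word")
    first - 'A' // Char minus Char yields an Int offset from 'A'
  }
}
```

With this mapping, "apple" goes to bucket 0, "Ball" to 1 and "zebra" to 25, which is the 0-to-25 layout the original poster expected.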

On Tue, Feb 17, 2015 at 7:05 PM, java8964 java8...@hotmail.com wrote:

 Hi, Sparkers:

 I just happened to search in google for something related to the
 RangePartitioner of spark, and found an old thread in this email list as
 here:


 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-and-Partition-td991.html

 I followed the code example mentioned in that email thread as following:

 scala> import org.apache.spark.RangePartitioner
 import org.apache.spark.RangePartitioner

 scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog",
 "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long",
 "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk",
 "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
 rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
 parallelize at <console>:13

 scala> rdd.keyBy(s => s(0).toUpper)
 res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at
 <console>:16

 scala> res0.partitionBy(new RangePartitioner[Char, String](26,
 res0)).values
 res1: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at
 <console>:18

 scala> res1.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx,
 s))).collect.foreach(println)

 The above example is clear for me to understand the meaning of the
 RangePartitioner, but to my surprise, I got the following result:

 *(0,apple)*
 *(0,Ball)*
 (1,cat)
 (2,dog)
 (3,Elephant)
 (4,fox)
 (5,gas)
 (6,horse)
 (7,index)
 (8,jet)
 (9,kitsch)
 (10,long)
 (11,moon)
 (12,Neptune)
 (13,ooze)
 (14,Pen)
 (15,quiet)
 (16,rose)
 (17,sun)
 (18,talk)
 (19,umbrella)
 (20,voice)
 (21,Walrus)
 (22,xeon)
 (23,Yam)
 (24,zebra)

 instead of a perfect range index from 0 to 25 in old email thread. Why is
 that? Is this a bug, or some new feature I don't understand?

 BTW, the above environment I tested is in Spark 1.2.1 with Hadoop 2.4
 binary release.

 Thanks

 Yong



Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working

2015-02-17 Thread Arush Kharbanda
Hi

Did you try making Maven pick the latest version?

http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Management

That way SolrJ won't cause any issue. You can try this and check whether the
part of your code where you access HDFS works fine.



On Wed, Feb 18, 2015 at 10:23 AM, dgoldenberg dgoldenberg...@gmail.com
wrote:

 I'm getting the below error when running spark-submit on my class. This
 class
 has a transitive dependency on HttpClient v.4.3.1 since I'm calling SolrJ
 4.10.3 from within the class.

 This is in conflict with the older version, HttpClient 3.1 that's a
 dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4).

 I've tried setting spark.files.userClassPathFirst to true in SparkConf in
 my
 program, also setting it to true in  $SPARK-HOME/conf/spark-defaults.conf
 as

 spark.files.userClassPathFirst true

 No go, I'm still getting the error, as below. Is there anything else I can
 try? Are there any plans in Spark to support multiple class loaders?

 Exception in thread "main" java.lang.NoSuchMethodError: org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
     at org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
     at org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
     at org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
     at org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
     at org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
     at org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
     at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:168)
     at org.apache.solr.client.solrj.impl.HttpSolrServer.<init>(HttpSolrServer.java:141)
     ...









-- 


*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Identify the performance bottleneck from hardware prospective

2015-02-17 Thread Akhil Das
It would be good if you could share the piece of code that you are using, so
people can suggest how to optimize it further.
Also, since you have 20GB of memory and ~30GB of data, you can try
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
or .persist(StorageLevel.MEMORY_AND_DISK_2). By default, ~12GB of the 20GB will
be usable for storage; you can increase that by setting
spark.storage.memoryFraction.

Thanks
Best Regards

On Tue, Feb 17, 2015 at 4:06 PM, Julaiti Alafate jalaf...@eng.ucsd.edu
wrote:

 Thank you very much for your reply!

 My task is to count the number of word pairs in a document. If w1 and w2
 occur together in one sentence, the number of occurrence of word pair (w1,
 w2) adds 1. So the computational part of this algorithm is simply a
 two-level for-loop.
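
The two-level loop described above might look like the following plain-Scala sketch (no Spark). The names WordPairs and count are hypothetical, and two choices are assumptions, since the message does not specify them: pairs are treated as unordered, and a word repeated within a sentence is paired once per position.

```scala
object WordPairs {
  // Counts co-occurring word pairs; each sentence is a sequence of words.
  def count(sentences: Seq[Seq[String]]): Map[(String, String), Int] = {
    val pairs: Seq[(String, String)] = for {
      words <- sentences
      i <- words.indices                 // outer loop: first word of the pair
      j <- (i + 1) until words.length    // inner loop: every later word
    } yield {
      val a = words(i)
      val b = words(j)
      // normalise the order so (w1, w2) and (w2, w1) count as the same pair
      if (a.compareTo(b) <= 0) (a, b) else (b, a)
    }
    pairs.groupBy(identity).map { case (pair, occurrences) => pair -> occurrences.size }
  }
}
```

In a Spark job, the body of the loops would typically run per sentence inside flatMap, followed by reduceByKey to sum the counts. Note that the inner loop is quadratic in sentence length.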

 Since the cluster is monitored by Ganglia, I can easily see that neither
 CPU nor network IO is under pressure. The only parameter left is memory. In
 the executor tab of the Spark Web UI, I can see a column named "Memory Used".
 It showed that only 6GB of the 20GB of memory is used. I understand this
 measures the size of the RDDs that persist in memory. So can I at least assume
 that the data/objects I use in my program are not exceeding the memory limit?

 My confusion here is, why can't my program run faster while there is still
 sufficient memory, CPU time and network bandwidth it can utilize?

 Best regards,
 Julaiti


 On Tue, Feb 17, 2015 at 12:53 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 What application are you running? Here are a few things:

 - You will hit a bottleneck on CPU if you are doing some complex
 computation (like parsing JSON etc.)
 - You will hit a bottleneck on memory if the data/objects used in your
 program are large (like working with HashMaps etc. inside your map*
 operations). Here you can set spark.executor.memory to a higher number, and
 you can also change spark.storage.memoryFraction, whose default value is
 0.6 of your executor memory.
 - Network will be a bottleneck if data is not available locally on one of
 the workers and hence has to be collected from others, which means a lot of
 serialization and data transfer across your cluster.

 Thanks
 Best Regards

 On Tue, Feb 17, 2015 at 11:20 AM, Julaiti Alafate jalaf...@eng.ucsd.edu
 wrote:

 Hi there,

 I am trying to scale up the data size that my application is handling.
 This application is running on a cluster with 16 slave nodes. Each slave
 node has 60GB of memory. It is running in standalone mode. The data comes
 from HDFS, which is on the same local network.

 To understand how my program is running, I also
 installed Ganglia on the cluster. From a previous run, I know the stage
 that takes the longest time is counting word pairs (my RDD consists of
 sentences from a corpus). My goal is to identify the bottleneck of my
 application, then modify my program or hardware configuration accordingly.

 Unfortunately, I didn't find much information on Spark monitoring
 and optimization. Reynold Xin gave a great talk at Spark Summit 2014
 on application tuning from the tasks' perspective. Basically, his focus is on
 tasks that are oddly slower than the average. However, it didn't solve my
 problem because in my case no tasks run much slower than the others.

 So I tried to identify the bottleneck from the hardware perspective. I want
 to know what the limitation of the cluster is. I think if the executors are
 running hard, either CPU, memory or network bandwidth (or maybe a
 combination) is hitting the roof. But Ganglia reports that the CPU utilization
 of the cluster is no more than 50%, and network utilization is high for several
 seconds at the beginning, then drops close to 0. From the Spark UI, I can see
 that the node with the maximum memory usage is consuming around 6GB, while
 spark.executor.memory is set to 20GB.

 I am very confused that the program is not running fast enough while
 hardware resources are not in short supply. Could you please give me some hints
 about what determines the performance of a Spark application from the hardware
 perspective?

 Thanks!

 Julaiti






Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-17 Thread Charles Feduke
Emre,

As you are keeping the properties file external to the JAR you need to make
sure to submit the properties file as an additional --files (or whatever
the necessary CLI switch is) so all the executors get a copy of the file
along with the JAR.

If you know you are going to just put the properties file on HDFS then why
don't you define a custom system setting like properties.url and pass it
along:

(this is for Spark shell, the only CLI string I have available at the
moment:)

spark-shell --jars $JAR_NAME \
--conf 'properties.url=hdfs://config/stuff.properties' \
--conf 'spark.executor.extraJavaOptions=-Dproperties.url=hdfs://config/stuff.properties'

... then load the properties file during initialization by examining the
properties.url system setting.
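
A rough sketch of that initialization step, using only the JDK from Scala. JobProperties, load and fromSystemProperty are hypothetical names. It assumes the value of properties.url is a URL that java.net.URL can open (for example file: or http:); an hdfs: URL generally needs Hadoop's FileSystem API or a registered URL handler instead, which is not shown here.

```scala
import java.io.{InputStreamReader, Reader}
import java.net.URL
import java.util.Properties

object JobProperties {
  // Reads key=value pairs from the reader and always closes it afterwards.
  def load(reader: Reader): Properties = {
    val props = new Properties()
    try props.load(reader) finally reader.close()
    props
  }

  // Returns None when the system property (e.g. -Dproperties.url=...) is not set.
  def fromSystemProperty(name: String = "properties.url"): Option[Properties] =
    Option(System.getProperty(name)).map { location =>
      load(new InputStreamReader(new URL(location).openStream(), "UTF-8"))
    }
}
```

Returning an Option makes the missing-setting case explicit, so the caller can fall back to defaults rather than hit a null.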

I'd still strongly recommend Typesafe Config as it makes this a lot less
painful, and I know for certain you can place your *.conf at a URL (using
the -Dconfig.url=) though it probably won't work with an HDFS URL.


On Tue Feb 17 2015 at 9:53:08 AM Gerard Maas gerard.m...@gmail.com wrote:

 +1 for TypeSafe config
 Our practice is to include all spark properties under a 'spark' entry in
 the config file alongside job-specific configuration:

 A config file would look like:
 spark {
  master = 
  cleaner.ttl = 123456
  ...
 }
 job {
 context {
 src = foo
 action = barAction
 }
 prop1 = val1
 }

 Then, to create our Spark context, we transparently pass the spark section
 to a SparkConf instance.
 This idiom will instantiate the context with the spark specific
 configuration:


 sparkConfig.setAll(configToStringSeq(config.getConfig("spark").atPath("spark")))

 And we can make use of the config object everywhere else.

 We use the override model of the typesafe config: reasonable defaults go
 in the reference.conf (within the jar). Environment-specific overrides go
 in the application.conf (alongside the job jar) and hacks are passed with
 -Dprop=value :-)


 -kr, Gerard.


 On Tue, Feb 17, 2015 at 1:45 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 I've decided to try

   spark-submit ... --conf
 spark.driver.extraJavaOptions=-DpropertiesFile=/home/emre/data/myModule.properties

 But when I try to retrieve the value of propertiesFile via

    System.err.println("propertiesFile : " +
 System.getProperty("propertiesFile"));

 I get NULL:

propertiesFile : null

 Interestingly, when I run spark-submit with --verbose, I see that it
 prints:

   spark.driver.extraJavaOptions ->
 -DpropertiesFile=/home/emre/data/belga/schemavalidator.properties

 I couldn't understand why I can't get the value of propertiesFile
 using the standard System.getProperty method. (I can use new
 SparkConf().get("spark.driver.extraJavaOptions") and manually parse it
 to retrieve the value, but I'd like to know why I cannot retrieve that
 value using System.getProperty.)

 Any ideas?

 If I can achieve what I've described above properly, I plan to pass a
 properties file that resides on HDFS, so that it will be available to my
 driver program wherever that program runs.

 --
 Emre




 On Mon, Feb 16, 2015 at 4:41 PM, Charles Feduke charles.fed...@gmail.com
  wrote:

 I haven't actually tried mixing non-Spark settings into the Spark
 properties. Instead I package my properties into the jar and use the
 Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala
 specific) to get at my properties:

 Properties file: src/main/resources/integration.conf

 (below $ENV might be set to either integration or prod[3])

 ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \
 --conf 'config.resource=$ENV.conf' \
 --conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf'

 Since the properties file is packaged up with the JAR I don't have to
 worry about sending the file separately to all of the slave nodes. Typesafe
 Config is written in Java so it will work if you're not using Scala. (The
 Typesafe Config also has the advantage of being extremely easy to integrate
 with code that is using Java Properties today.)

 If you instead want to send the file separately from the JAR and you use
 the Typesafe Config library, you can specify config.file instead of
 .resource; though I'd point you to [3] below if you want to make your
 development life easier.

 1. https://github.com/typesafehub/config
 2. https://github.com/ceedubs/ficus
 3.
 http://deploymentzone.com/2015/01/27/spark-ec2-and-easy-spark-shell-deployment/



 On Mon Feb 16 2015 at 10:27:01 AM Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I'm using Spark 1.2.1 and have a module.properties file, and in it I
 have non-Spark properties, as well as Spark properties, e.g.:

job.output.dir=file:///home/emre/data/mymodule/out

 I'm trying to pass it to spark-submit via:

spark-submit --class com.myModule --master local[4] --deploy-mode
 client --verbose --properties-file /home/emre/data/mymodule.properties
 mymodule.jar

 And I 

Re: Percentile example

2015-02-17 Thread Imran Rashid
(trying to repost to the list w/out URLs -- rejected as spam earlier)

Hi,

Using take() is not a good idea; as you noted, it pulls a lot of data down
to the driver, so it's not scalable.  Here are some more scalable
alternatives:

1. Approximate solutions

1a. Sample the data.  Just sample some of the data to the driver, sort that
data in memory, and take the 66th percentile of that sample.

1b.  Make a histogram with pre-determined buckets.  E.g., if you know your
data ranges from 0 to 1 and is uniform-ish, you could make buckets every
0.01.  Then count how many data points go into each bucket.  Or if you only
care about relative error and you have integers (often the case if your
data is counts), then you can span the full range of integers with a
relatively small number of buckets.  E.g., you only need 200 buckets for 5%
error.  See the Histogram class in Twitter's Ostrich library.

The problem is, if you have no idea what the distribution of your data is,
it's very hard to come up with good buckets; you could have an arbitrary
amount of data going to one bucket, and thus tons of error.
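
A minimal sketch of option 1b in plain Java (no Spark), assuming fixed-width
buckets over a known [min, max] range. The class and method names are
hypothetical; in a Spark job the per-bucket counts would come from a
distributed count, which is not shown here.

```java
import java.util.Arrays;

// Hypothetical helper: approximate percentile from fixed-width bucket counts.
final class BucketPercentile {
    // Count values into `numBuckets` equal-width buckets over [min, max].
    static long[] histogram(double[] values, double min, double max, int numBuckets) {
        long[] counts = new long[numBuckets];
        double width = (max - min) / numBuckets;
        for (double v : values) {
            int b = (int) ((v - min) / width);
            // Clamp so that v == max (or out-of-range input) lands in an edge bucket.
            b = Math.max(0, Math.min(numBuckets - 1, b));
            counts[b]++;
        }
        return counts;
    }

    // Return the upper edge of the first bucket whose cumulative count reaches
    // the requested fraction of all points (e.g. 0.66 for the 66th percentile).
    static double approxPercentile(long[] counts, double min, double max, double fraction) {
        long total = Arrays.stream(counts).sum();
        long target = (long) Math.ceil(fraction * total);
        double width = (max - min) / counts.length;
        long seen = 0;
        for (int b = 0; b < counts.length; b++) {
            seen += counts[b];
            if (seen >= target) {
                return min + (b + 1) * width;
            }
        }
        return max;
    }
}
```

The answer is only as precise as the bucket width, which is the weakness
described above when the data piles up in a single bucket.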

1c.  Use a TDigest, a compact, scalable data structure for approximating
distributions that performs reasonably across a wide range of
distributions.  You would make one TDigest for each partition (with
mapPartitions), and then merge all of the TDigests together.  I wrote up a
little more detail on this earlier; you can search the spark-user list on
Nabble for "tdigest".

2. Exact solutions.  There are also a few options here, but I'll give one
that is a variant of what you suggested.  Start out by doing a sortByKey.
Then figure out how many records you have in each partition (with
mapPartitions).  Figure out which partition the 66th percentile would be
in.  Then just read the one partition you want, and go down to the Nth
record in that partition.
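
The bookkeeping in step 2 can be sketched in plain Java. This assumes you
have already collected the per-partition record counts of the sorted RDD to
the driver; the class name, method names and the nearest-rank percentile
definition are illustrative choices, not part of Spark.

```java
// Hypothetical helper: given per-partition record counts of a sorted RDD,
// find which partition holds the record at a global rank and its offset there.
final class PercentileLocator {
    // Returns {partitionIndex, offsetWithinPartition} for a 0-based global rank.
    static long[] locate(long[] partitionCounts, long globalRank) {
        long skipped = 0;
        for (int p = 0; p < partitionCounts.length; p++) {
            if (globalRank < skipped + partitionCounts[p]) {
                return new long[] {p, globalRank - skipped};
            }
            skipped += partitionCounts[p];
        }
        throw new IllegalArgumentException("rank " + globalRank + " is past the last record");
    }

    // 0-based rank of the given percentile using the nearest-rank definition.
    static long rankOf(long totalRecords, int percentile) {
        long oneBased = (percentile * totalRecords + 99) / 100; // ceil(p * n / 100)
        return Math.max(oneBased, 1) - 1;
    }
}
```

For partition counts {40, 10, 30, 20} and the 66th percentile, the rank is 65
(0-based), which falls in partition 2 at offset 15.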

To read the one partition you want, you can either (a) use
mapPartitionsWithIndex, and just ignore every partition that isn't the one
you want or (b) use PartitionPruningRDD.  PartitionPruningRDD will avoid
launching empty tasks on the other partitions, so it will be slightly more
efficient, but it's also a developer API, so perhaps not worth going to that
level of detail.

Note that internally, sortByKey will sample your data to get an approximate
distribution, to figure out what data to put in each partition.  However,
you're still getting an exact answer this way -- the approximation is only
important for distributing work among all executors.  Even if the
approximation is inaccurate, you'll still correct for it, you will just
have unequal partitions.

Imran


 On Sun, Feb 15, 2015 at 9:37 AM, SiMaYunRui myl...@hotmail.com wrote:

 hello,

 I am a newbie to Spark and am trying to figure out how to get a percentile
 against a big data set. I googled this topic but did not find any
 very useful code example or explanation. It seems that I can use the
 sortByKey transformation to get my data set in order, but I am not sure how
 to get the value of, for example, percentile 66.

 Should I use take() to pick up the value of percentile 66? I don't
 believe any machine can load my data set in memory. I believe there must be
 more efficient approaches.

 Can anyone shed some light on this problem?

 *Regards*





Processing graphs

2015-02-17 Thread Vijayasarathy Kannan
Hi,

I am working on a Spark application that processes graphs and I am trying
to do the following.

- group the vertices (key - vertex, value - set of its outgoing edges)
- distribute each key to separate processes and process them (like mapper)
- reduce the results back at the main process

Does the groupBy functionality do the distribution by default?
Do we have to explicitly use RDDs to enable automatic distribution?

It'd be great if you could help me understand these and how to go about
with the problem.

Thanks.


Re: Tuning number of partitions per CPU

2015-02-17 Thread Sean Owen
More tasks means a little *more* total CPU time is required, not less,
because of the overhead of handling tasks. However, more tasks can
actually mean less wall-clock time.

This is because tasks vary in how long they take. If you have 1 task
per core, the job takes as long as the slowest task and at the end all
other cores are idling.

Splitting up the tasks makes the distribution of time taken across
cores more even on average. That is, the variance of task completion
times is higher than the variance of the sum of N task completion
times each of which is 1/N the size.

Whether this is actually better depends on the per-task overhead and
variance in execution time. With high overhead and low variance, one
task per core is probably optimal.
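
To make the trade-off concrete, here is a small simulation in plain Java. It
assumes a greedy scheduler where each task goes to whichever core frees up
first and a fixed per-task overhead; both are simplifications, and all names
are made up for illustration.

```java
import java.util.PriorityQueue;

final class TaskSplitDemo {
    // Greedy scheduling: each task goes to whichever core frees up first.
    // Returns the wall-clock time at which the last core finishes.
    static double wallClock(double[] taskDurations, int cores, double perTaskOverhead) {
        PriorityQueue<Double> coreFreeAt = new PriorityQueue<>();
        for (int c = 0; c < cores; c++) {
            coreFreeAt.add(0.0);
        }
        double finish = 0.0;
        for (double d : taskDurations) {
            double start = coreFreeAt.poll();          // earliest available core
            double end = start + d + perTaskOverhead;  // overhead is paid once per task
            coreFreeAt.add(end);
            finish = Math.max(finish, end);
        }
        return finish;
    }
}
```

With two cores and skewed tasks of 10 and 2 time units, one task per core
takes 10 units of wall-clock time; splitting each in half ({5, 5, 1, 1})
takes 6. With equal tasks {6, 6} and an overhead of 3 per task, splitting
into {3, 3, 3, 3} raises wall-clock time from 9 to 12.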

On Tue, Feb 17, 2015 at 3:38 PM, Igor Petrov igorpetrov...@gmail.com wrote:
 Hello,

 thanks for your replies. The question is actually about the recommendation in
 the Spark docs: "Typically you want 2-4 partitions for each CPU in your cluster."

 Why is having several partitions per CPU better than one partition per CPU?
 How can one CPU handle several tasks faster than one task?

 Thank You


 On Fri, Feb 13, 2015 at 2:44 PM, Puneet Kumar Ojha
 puneet.ku...@pubmatic.com wrote:

 Use the configuration below if you are using version 1.2:

 SET spark.shuffle.consolidateFiles=true;
 SET spark.rdd.compress=true;
 SET spark.default.parallelism=1000;
 SET spark.deploy.defaultCores=54;

 Thanks
 Puneet.

 -Original Message-
 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: Friday, February 13, 2015 4:46 PM
 To: Igor Petrov
 Cc: user@spark.apache.org
 Subject: Re: Tuning number of partitions per CPU

 18 cores or 36? It probably doesn't matter.
 For this case where you have some overhead per partition of setting up the
 DB connection, it may indeed not help to chop up the data more finely than
 your total parallelism. Although that would imply quite an overhead. Are you
 doing any other expensive initialization per partition in your code?
 You might check some other basic things, like, are you bottlenecked on the
 DB (probably not) and are there task stragglers drawing out the completion
 time.

 On Fri, Feb 13, 2015 at 11:06 AM, Igor Petrov igorpetrov...@gmail.com
 wrote:
  Hello,
 
  In Spark programming guide
  (http://spark.apache.org/docs/1.2.0/programming-guide.html) there is a
  recommendation:
  Typically you want 2-4 partitions for each CPU in your cluster.
 
  We have a Spark Master and two Spark workers each with 18 cores and 18
  GB of RAM.
  In our application we use JdbcRDD to load data from a DB and then cache
  it.
  We load entities from a single table, now we have 76 million of
  entities (entity size in memory is about 160 bytes). We call count()
  during application startup to force entities loading. Here are our
  measurements for
  count() operation (cores x partitions = time):
  36x36 = 6.5 min
  36x72 = 7.7 min
  36x108 = 9.4 min
 
  So despite recommendations the most efficient setup is one partition
  per core. What is the reason for above recommendation?
 
  Java 8, Apache Spark 1.1.0
 
 
 
 
  --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/Tuning-number-of-partitions-per-CPU-tp21642.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
  additional commands, e-mail: user-h...@spark.apache.org
 







RE: SparkSQL + Tableau Connector

2015-02-17 Thread Andrew Lee
Hi Todd,
When I see /data/json in your log, I suspect it comes from the default
hive.metastore.warehouse.dir in hive-site.xml, where the value is /data/.
Could you check that property and see if you can point it to the correct
Hive table HDFS directory?
Another thing to look at is the Hive metastore MySQL database, if you are
using MySQL as the DB for the metastore.
Date: Wed, 11 Feb 2015 19:53:35 -0500
Subject: Re: SparkSQL + Tableau Connector
From: tsind...@gmail.com
To: alee...@hotmail.com
CC: ar...@sigmoidanalytics.com; user@spark.apache.org

First sorry for the long post.  So back to tableau and Spark SQL, I'm still 
missing something.  
TL;DR
To get the Spark SQL Temp table associated with the metastore are there 
additional steps required beyond doing the below?

Initial SQL on connection:








create temporary table test 
using org.apache.spark.sql.json  
options (path '/data/json/*');
cache table test;

I feel like I'm missing a step of associating the Spark SQL table with the 
metastore, do I need to actually save it in some fashion?   I'm trying to avoid 
saving to hive if possible.
Details:
I configured the hive-site.xml and placed it in the $SPARK_HOME/conf.  It looks 
like this, thanks Andrew and Arush for the assistance:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.semantic.analyzer.factory.impl</name>
    <value>org.apache.hcatalog.cli.HCatSemanticAnalyzerFactory</value>
  </property>
  <property>
    <name>hive.metastore.sasl.enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.server2.authentication</name>
    <value>NONE</value>
  </property>
  <property>
    <name>hive.server2.enable.doAs</name>
    <value>true</value>
  </property>
  <!--
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
    <description>IP address (or fully-qualified domain name) and port of the metastore host</description>
  </property>
  -->
  <property>
    <name>hive.warehouse.subdir.inherit.perms</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/metastore_db?createDatabaseIfNotExist=true</value>
    <description>metadata is stored in a MySQL server</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>MySQL JDBC driver class</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hiveuser</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hiveuser</value>
  </property>
</configuration>
When I start the server it looks fine:
$ ./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10001 --hiveconf hive.server2.thrift.bind.host radtech.io --master spark://radtech.io:7077 --driver-class-path /usr/local/spark/lib/mysql-connector-java-5.1.34-bin.jar
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /usr/local/spark-1.2.1-bin-hadoop2.4/logs/spark-tnist-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-radtech.io.out
radtech:spark tnist$ tail -f logs/spark-tnist-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-radtech.io.out
15/02/11 19:15:24 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150211191524-0008/1 on hostPort 192.168.1.2:50851 with 2 cores, 512.0 MB RAM
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: app-20150211191524-0008/0 is now LOADING
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: app-20150211191524-0008/1 is now LOADING
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: app-20150211191524-0008/0 is now RUNNING
15/02/11 19:15:24 INFO AppClient$ClientActor: Executor updated: app-20150211191524-0008/1 is now RUNNING
15/02/11 19:15:24 INFO NettyBlockTransferService: Server created on 50938
15/02/11 19:15:24 INFO BlockManagerMaster: Trying to register BlockManager
15/02/11 19:15:24 INFO BlockManagerMasterActor: Registering block manager 192.168.1.2:50938 with 265.1 MB RAM, BlockManagerId(driver, 192.168.1.2, 50938)
15/02/11 19:15:24 INFO BlockManagerMaster: Registered BlockManager
15/02/11 19:15:25 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
15/02/11 19:15:25 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/02/11 19:15:25 INFO ObjectStore: ObjectStore, initialize called
15/02/11 19:15:26 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/02/11 19:15:26 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
15/02/11 19:15:26 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/02/11 19:15:26 WARN Connection: BoneCP 

RE: Spark sql failed in yarn-cluster mode when connecting to non-default hive database

2015-02-17 Thread Andrew Lee
HI All,
Just want to give everyone an update on what worked for me. Thanks for
Cheng's comment and other people's help.
What I misunderstood was --driver-class-path and how it relates to
--files.  I put /etc/hive/hive-site.xml in both --files and
--driver-class-path when I started in yarn-cluster mode.
./bin/spark-submit --verbose --queue research --driver-java-options 
-XX:MaxPermSize=8192M --files /etc/hive/hive-site.xml --driver-class-path 
/etc/hive/hive-site.xml --master yarn --deploy-mode cluster 
The problem here is that --files only looks for local files and distributes
them onto HDFS. The --driver-class-path value is what gets added to the
CLASSPATH at runtime, and as you can see, it tries to look at
/etc/hive/hive-site.xml on the container on the remote nodes, where it
doesn't exist.  For some people it may work fine because they deploy the
Hive configuration and JARs across their entire cluster, so every node looks
the same. But this wasn't my case in a multi-tenant environment or a
restricted, secured cluster. So my parameters look like this when I launch it.








./bin/spark-submit --verbose --queue research --driver-java-options 
-XX:MaxPermSize=8192M --files /etc/hive/hive-site.xml --driver-class-path 
hive-site.xml --master yarn --deploy-mode cluster 
So --driver-class-path here will only look at ./hive-site.xml on the remote
container, which was already deployed there by --files.
This worked for me, and I can use the HiveContext API to talk to the Hive
metastore, and vice versa. Thanks.


Date: Thu, 5 Feb 2015 16:59:12 -0800
From: lian.cs@gmail.com
To: linlin200...@gmail.com; huaiyin@gmail.com
CC: user@spark.apache.org
Subject: Re: Spark sql failed in yarn-cluster mode when connecting to non-default hive database

Hi Jenny,

You may try to use --files $SPARK_HOME/conf/hive-site.xml
--driver-class-path hive-site.xml when submitting your application. The
problem is that when running in cluster mode, the driver is actually
running in a random container directory on a random executor node. By using
--files, you upload hive-site.xml to the container directory, by using
--driver-class-path hive-site.xml, you add the file to classpath (the path
is relative to the container directory).

When running in cluster mode, have you tried to check the tables inside the
default database? If my guess is right, this should be an empty default
database inside the default Derby metastore created by HiveContext when the
hive-site.xml is missing.

Best,
Cheng

On 8/12/14 5:38 PM, Jenny Zhao wrote:

Hi Yin,

hive-site.xml was copied to spark/conf and the same as the one under
$HIVE_HOME/conf.

through hive cli, I don't see any problem. but for spark on yarn-cluster
mode, I am not able to switch to a database other than the default one, for
Yarn-client mode, it works fine.

Thanks!

Jenny


On Tue, Aug 12, 2014 at 12:53 PM, Yin Huai huaiyin@gmail.com wrote:

Hi Jenny,

Have you copied hive-site.xml to spark/conf directory? If not, can you put
it in conf/ and try again?

Thanks,

Yin

On Mon, Aug 11, 2014 at 8:57 PM, Jenny Zhao linlin200...@gmail.com wrote:

Thanks Yin!

here is my hive-site.xml,  which I copied from $HIVE_HOME/conf, didn't
experience problem connecting to the metastore through hive. which uses DB2
as metastore database.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl"

Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-17 Thread Gerard Maas
+1 for TypeSafe config
Our practice is to include all spark properties under a 'spark' entry in
the config file alongside job-specific configuration:

A config file would look like:
spark {
 master = 
 cleaner.ttl = 123456
 ...
}
job {
context {
src = foo
action = barAction
}
prop1 = val1
}

Then, to create our Spark context, we transparently pass the spark section
to a SparkConf instance.
This idiom will instantiate the context with the spark specific
configuration:

sparkConfig.setAll(configToStringSeq(config.getConfig("spark").atPath("spark")))

And we can make use of the config object everywhere else.

We use the override model of the typesafe config: reasonable defaults go in
the reference.conf (within the jar). Environment-specific overrides go in
the application.conf (alongside the job jar) and hacks are passed with
-Dprop=value :-)
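
The layering idea can be illustrated without Typesafe Config, using only
java.util.Properties. This is a sketch of the precedence order described
above, not the library's API; the class and method names are hypothetical,
and the keys are taken from the sample config.

```java
import java.util.Properties;

final class LayeredConfig {
    // Later layers win: reference defaults < application overrides < -D system properties.
    static Properties layer(Properties reference, Properties application, Properties system) {
        Properties merged = new Properties();
        merged.putAll(reference);
        merged.putAll(application);
        merged.putAll(system);
        return merged;
    }

    // Extract the entries under "spark." so they can be handed to a SparkConf.
    static Properties sparkSection(Properties merged) {
        Properties spark = new Properties();
        for (String key : merged.stringPropertyNames()) {
            if (key.startsWith("spark.")) {
                spark.setProperty(key, merged.getProperty(key));
            }
        }
        return spark;
    }
}
```

With spark.cleaner.ttl=123456 and job.prop1=val1 as defaults, an application
layer that sets job.prop1 and a system layer that sets spark.cleaner.ttl
each replace only their own key, and sparkSection returns just the
spark.cleaner.ttl entry.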


-kr, Gerard.


On Tue, Feb 17, 2015 at 1:45 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 I've decided to try

   spark-submit ... --conf
 spark.driver.extraJavaOptions=-DpropertiesFile=/home/emre/data/myModule.properties

 But when I try to retrieve the value of propertiesFile via

System.err.println("propertiesFile : " +
 System.getProperty("propertiesFile"));

 I get NULL:

propertiesFile : null

 Interestingly, when I run spark-submit with --verbose, I see that it
 prints:

   spark.driver.extraJavaOptions -
 -DpropertiesFile=/home/emre/data/belga/schemavalidator.properties

 I couldn't understand why I couldn't get to the value of propertiesFile
 by using standard System.getProperty method. (I can use new
 SparkConf().get("spark.driver.extraJavaOptions")  and manually parse it,
 and retrieve the value, but I'd like to know why I cannot retrieve that
 value using System.getProperty method).
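
The manual-parsing workaround mentioned above could look roughly like this
in plain Java. It is a sketch that assumes options are separated by
whitespace and that values contain no spaces or quotes; the class name is
hypothetical. In the driver, the input string would be the value of
spark.driver.extraJavaOptions read from SparkConf.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class JavaOptionsParser {
    // Collect -Dkey=value pairs from a whitespace-separated JVM options string.
    // Values containing spaces or quotes are not handled in this sketch.
    static Map<String, String> systemProperties(String javaOptions) {
        Map<String, String> props = new LinkedHashMap<>();
        for (String token : javaOptions.trim().split("\\s+")) {
            if (!token.startsWith("-D")) {
                continue; // not a system property option, e.g. -Xmx2g
            }
            int eq = token.indexOf('=');
            if (eq > 2) {
                props.put(token.substring(2, eq), token.substring(eq + 1));
            } else if (eq < 0 && token.length() > 2) {
                props.put(token.substring(2), ""); // -Dflag with no value
            }
        }
        return props;
    }
}
```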

 Any ideas?

 If I can achieve what I've described above properly, I plan to pass a
 properties file that resides on HDFS, so that it will be available to my
 driver program wherever that program runs.

 --
 Emre




 On Mon, Feb 16, 2015 at 4:41 PM, Charles Feduke charles.fed...@gmail.com
 wrote:

 I haven't actually tried mixing non-Spark settings into the Spark
 properties. Instead I package my properties into the jar and use the
 Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala
 specific) to get at my properties:

 Properties file: src/main/resources/integration.conf

 (below $ENV might be set to either integration or prod[3])

 ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \
 --conf 'config.resource=$ENV.conf' \
 --conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf'

 Since the properties file is packaged up with the JAR I don't have to
 worry about sending the file separately to all of the slave nodes. Typesafe
 Config is written in Java so it will work if you're not using Scala. (The
 Typesafe Config also has the advantage of being extremely easy to integrate
 with code that is using Java Properties today.)

 If you instead want to send the file separately from the JAR and you use
 the Typesafe Config library, you can specify config.file instead of
 .resource; though I'd point you to [3] below if you want to make your
 development life easier.

 1. https://github.com/typesafehub/config
 2. https://github.com/ceedubs/ficus
 3.
 http://deploymentzone.com/2015/01/27/spark-ec2-and-easy-spark-shell-deployment/



 On Mon Feb 16 2015 at 10:27:01 AM Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I'm using Spark 1.2.1 and have a module.properties file, and in it I
 have non-Spark properties, as well as Spark properties, e.g.:

job.output.dir=file:///home/emre/data/mymodule/out

 I'm trying to pass it to spark-submit via:

spark-submit --class com.myModule --master local[4] --deploy-mode
 client --verbose --properties-file /home/emre/data/mymodule.properties
 mymodule.jar

 And I thought I could read the value of my non-Spark property, namely,
 job.output.dir by using:

 SparkConf sparkConf = new SparkConf();
 final String validatedJSONoutputDir =
 sparkConf.get("job.output.dir");

 But it gives me an exception:

 Exception in thread "main" java.util.NoSuchElementException:
 job.output.dir

 Is it not possible to mix Spark and non-Spark properties in a single
 .properties file, then pass it via --properties-file and then get the
 values of those non-Spark properties via SparkConf?

 Or is there another object / method to retrieve the values for those
 non-Spark properties?


 --
 Emre Sevinç




 --
 Emre Sevinc



Re: Implementing FIRST_VALUE, LEAD, LAG in Spark

2015-02-17 Thread Dmitry Tolpeko
I ended up with the following:

def firstValue(items: Iterable[String]) = for { i <- items
} yield (i, items.head)

data.groupByKey().map{case(a, b)=>(a, firstValue(b))}.collect

More details:
http://dmtolpeko.com/2015/02/17/first_value-last_value-lead-and-lag-in-spark/
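
For comparison, here is the same FIRST_VALUE logic on an in-memory list in
plain Java, with no Spark involved. "First" here means first in input order;
as far as I know, Spark does not guarantee the order of values within a
group after groupByKey, so a real job would likely need an explicit sort
key. Names are illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FirstValueDemo {
    // For each (key, value) row, emit {key, value, firstValueOfThatKey},
    // where "first" means first in input order.
    static List<String[]> firstValue(List<String[]> rows) {
        Map<String, String> firstByKey = new LinkedHashMap<>();
        for (String[] row : rows) {
            firstByKey.putIfAbsent(row[0], row[1]); // keeps the earliest value per key
        }
        List<String[]> out = new ArrayList<>();
        for (String[] row : rows) {
            out.add(new String[] {row[0], row[1], firstByKey.get(row[0])});
        }
        return out;
    }
}
```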

I would appreciate any feedback.

Dmitry

On Fri, Feb 13, 2015 at 11:54 AM, Dmitry Tolpeko dmtolp...@gmail.com
wrote:

 Hello,

 To convert existing Map Reduce jobs to Spark, I need to implement window
 functions such as FIRST_VALUE, LEAD, LAG and so on. For example,
 FIRST_VALUE function:

 Source (1st column is key):

 A, A1
 A, A2
 A, A3
 B, B1
 B, B2
 C, C1

 and the result should be

 A, A1, A1
 A, A2, A1
 A, A3, A1
 B, B1, B1
 B, B2, B1
 C, C1, C1

 You can see that the first value in a group is repeated in each row.

 My current Spark/Scala code:

 def firstValue(b: Iterable[String]) : List[(String, String)] = {
   val c = scala.collection.mutable.MutableList[(String, String)]()
   var f = ""
   b.foreach(d => { if(f.isEmpty()) f = d; c += d -> f})
   c.toList
 }

 val data=sc.parallelize(List(
("A", "A1"),
("A", "A2"),
("A", "A3"),
("B", "B1"),
("B", "B2"),
("C", "C1")))

 data.groupByKey().map{case(a, b)=>(a, firstValue(b))}.collect

 So I create a new list after groupByKey. Is this the right approach in
 Spark? Are there any other options? Please point out any drawbacks.

 Thanks,

 Dmitry




Large Similarity Job failing

2015-02-17 Thread Debasish Das
Hi,

I am running brute force similarity from RowMatrix on a job with a 5M x 1.5M
sparse matrix with 800M entries. With 200M entries the job runs fine, but
with 800M I am getting exceptions like "too many files open" and "no space
left on device"...

Seems like I need more nodes or to use DIMSUM sampling?

I am running on 10 nodes where ulimit on each node is set at 65K...Memory
is not an issue since I can cache the dataset before similarity computation
starts.

I tested the same job on YARN with Spark 1.1 and Spark 1.2 stable. Both the
jobs failed with FetchFailed msgs.

Thanks.
Deb


Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-17 Thread Imran Rashid
Hi Emre,

there shouldn't be any difference in which files get processed w/ print()
vs. foreachRDD().  In fact, if you look at the definition of print(), it is
just calling foreachRDD() underneath.  So there is something else going on
here.

We need a little more information to figure out exactly what is going on.
 (I think Sean was getting at the same thing ...)

(a) how do you know that when you use foreachRDD, all 20 files get
processed?

(b) How do you know that only 16 files get processed when you print()? Do
you know the other files are being skipped, or maybe they are just stuck
somewhere?  E.g., suppose you start w/ 20 files, and you see 16 get
processed ... what happens after you add a few more files to the
directory?  Are they processed immediately, or are they never processed
either?

(c) Can you share any more code of what you are doing to the dstreams
*before* the print() / foreachRDD()?  That might give us more details about
what the difference is.

I can't see how .count.print() would be different from just print(),
but maybe I am missing something also.

Imran

On Mon, Feb 16, 2015 at 7:49 AM, Emre Sevinc emre.sev...@gmail.com wrote:

 Sean,

 In this case, I've been testing the code on my local machine and using
 Spark locally, so all the log output was available on my terminal. And
 I've used the .print() method to have an output operation, just to force
 Spark to execute.

 And I was not using foreachRDD, I was only using print() method on a
 JavaDStream object, and it was working fine for a few files, up to 16 (and
 without print() it did not do anything because there were no output
 operations).

 To sum it up, in my case:

  - Initially, use .print() and no foreachRDD: processes up to 16 files and
 does not do anything for the remaining 4.
  - Remove .print() and use foreachRDD: processes all of the 20 files.

 Maybe, as in Akhil Das's suggestion, using .count.print() might also have
 fixed my problem, but I'm satisfied with foreachRDD approach for now.
 (Though it is still a mystery to me why using .print() had a difference,
 maybe my mental model of Spark is wrong, I thought no matter what output
 operation I used, the number of files processed by Spark would be
 independent of that because the processing is done in a different method,
 .print() is only used to force Spark execute that processing, am I wrong?).

 --
 Emre


 On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen so...@cloudera.com wrote:

 Materialization shouldn't be relevant. The collect by itself doesn't let
 you detect whether it happened. Print should print some results to the
 console but on different machines, so may not be a reliable way to see what
 happened.

 Yes I understand your real process uses foreachRDD and that's what you
 should use. It sounds like that works. But you must always have been using
 that right? What do you mean that you changed to use it?

 Basically I'm not clear on what the real code does and what about the
 output of that code tells you only 16 files were processed.
 On Feb 16, 2015 1:18 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello Sean,

 I did not understand your question very well, but what I do is checking
 the output directory (and I have various logger outputs at various stages
 showing the contents of an input file being processed, the response from
 the web service, etc.).

 By the way, I've already solved my problem by using foreachRDD instead
 of print (see my second message in this thread). Apparently forcing Spark
 to materialize DAG via print() is not the way to go. (My interpretation
 might be wrong, but this is what I've just seen in my case).

 --
 Emre




 On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen so...@cloudera.com wrote:

 How are you deciding whether files are processed or not? It doesn't
 seem possible from this code. Maybe it just seems so.
 On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 I've managed to solve this, but I still don't know exactly why my
 solution works:

 In my code I was trying to force Spark to produce output via:

   jsonIn.print();

 jsonIn being a JavaDStream<String>.

 When I removed the code above and added the code below to force the
 output operation, and hence the execution:

 jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
   @Override
   public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
     stringJavaRDD.collect();
     return null;
   }
 });

 It works as I expect, processing all of the 20 files I give to it,
 instead of stopping at 16.

 --
 Emre


 On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I have an application in Java that uses Spark Streaming 1.2.1 in the
 following manner:

  - Listen to the input directory.
  - If a new file is copied to that input directory process it.
  - Process: contact a RESTful web service (running also locally and
 responsive), send the contents of the file, receive the response from the
 web 

MapValues and Shuffle Reads

2015-02-17 Thread Darin McBeath
In the following code, I read in a large sequence file from S3 (1TB) spread 
across 1024 partitions.  When I look at the job/stage summary, I see about 
400GB of shuffle writes which seems to make sense as I'm doing a hash partition 
on this file.

// Get the baseline input file
JavaPairRDD<Text, Text> hsfBaselinePairRDDReadable =
    sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class,
        Text.class, Text.class);

JavaPairRDD<String, String> hsfBaselinePairRDD =
    hsfBaselinePairRDDReadable.mapToPair(new ConvertFromWritableTypes())
        .partitionBy(new HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS))
        .persist(StorageLevel.MEMORY_AND_DISK_SER());

I then execute the following code (with a count to force execution) and what I 
find very strange is that when I look at the job/stage summary, I see more than 
340GB of shuffle read.  Why would there be any shuffle read in this step?  I 
would expect there to be little (if any) shuffle reads in this step.

// Use 'substring' to extract the epoch value from each record.
JavaPairRDD<String, Long> baselinePairRDD =
    hsfBaselinePairRDD.mapValues(new ExtractEpoch(accumBadBaselineRecords))
        .persist(StorageLevel.MEMORY_AND_DISK_SER());

log.info("Number of baseline records: " + baselinePairRDD.count());

Both hsfBaselinePairRDD and baselinePairRDD have 1024 partitions.

Any insights would be appreciated.

I'm using Spark 1.2.0 in a stand-alone cluster.


Darin.




Re: OOM error

2015-02-17 Thread Harshvardhan Chauhan
Thanks for the pointer. It led me to
http://spark.apache.org/docs/1.2.0/tuning.html; increasing parallelism
resolved the issue.



On Mon, Feb 16, 2015 at 11:57 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Increase your executor memory, Also you can play around with increasing
 the number of partitions/parallelism etc.

 Thanks
 Best Regards

 On Tue, Feb 17, 2015 at 3:39 AM, Harshvardhan Chauhan ha...@gumgum.com
 wrote:

 Hi All,


 I need some help with Out Of Memory errors in my application. I am using
 Spark 1.1.0 and my application is using the Java API. I am running my app on
 25 EC2 m3.xlarge (4 cores, 15GB memory) instances. The app only fails
 sometimes. Lots of mapToPair tasks are failing.  My app is configured to run
 120 executors and executor memory is 2G.

 These are the various errors I see in my logs.

 15/02/16 10:53:48 INFO storage.MemoryStore: Block broadcast_1 of size 4680 dropped from memory (free 257277829)
 15/02/16 10:53:49 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x6e0138a3, /10.61.192.194:35196 => /10.164.164.228:49445] EXCEPTION: java.lang.OutOfMemoryError: Java heap space)
 java.lang.OutOfMemoryError: Java heap space
   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
   at org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
   at org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
   at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
   at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
   at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
   at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
   at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
   at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
   at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
   at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
   at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
 15/02/16 10:53:49 WARN channel.DefaultChannelPipeline: An exception was 
 thrown by a user handler while handling an exception event ([id: 0x2d0c1db1, 
 /10.169.226.254:55790 => /10.164.164.228:49445] EXCEPTION: 
 java.lang.OutOfMemoryError: Java heap space)
 java.lang.OutOfMemoryError: Java heap space
  at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
  at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
  at 
 org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
  at 
 org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
  at 
 org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
  at 
 org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
  at 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
  at 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
  at 
 org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
  at 
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
  at 
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
  at 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
  at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
  at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
 15/02/16 10:53:50 WARN channel.DefaultChannelPipeline: An exception was 
 thrown by a user handler while handling an exception event ([id: 0xd4211985, 
 /10.181.125.52:60959 => /10.164.164.228:49445] EXCEPTION: 
 java.lang.OutOfMemoryError: Java heap space)
 java.lang.OutOfMemoryError: Java heap space
  at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
  at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
  at 
 

Re: MapValues and Shuffle Reads

2015-02-17 Thread Imran Rashid
Hi Darin,

When you say you see 400GB of shuffle writes from the first code snippet,
what do you mean?  There is no action in that first set, so it won't do
anything.  By itself, it won't do any shuffle writing, or anything else for
that matter.

Most likely, the .count() on your second code snippet is actually causing
the execution of some of the first snippet as well.  The .partitionBy will
result in both shuffle writes and shuffle reads, but they aren't set in
motion until the .count further down the line.  It's confusing because the
stage boundaries don't line up exactly with your RDD variables here.
hsfBaselinePairRDD spans 2 stages, and baselinePairRDD actually gets merged
into the stage above it.

If you do a hsfBaselinePairRDD.count after your first code snippet, and
then run the second code snippet afterwards, is it more like what you
expect?
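
The lazy behaviour described above (nothing runs until an action such as .count() is called) can be illustrated without Spark. The sketch below is only an analogy: it uses java.util.stream, whose intermediate operations are also deferred until a terminal operation, and the class and method names (LazyPipelineDemo, buildPipeline) are made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: java.util.stream is not Spark, but it defers work in a similar way.
class LazyPipelineDemo {
    // Counts how many times the mapping step actually ran.
    static final AtomicInteger calls = new AtomicInteger();

    // Builds the pipeline without running it (comparable to chaining
    // transformations on an RDD without calling an action).
    static Stream<String> buildPipeline(List<String> input) {
        return input.stream().map(s -> {
            calls.incrementAndGet();
            return s.toUpperCase();
        });
    }

    public static void main(String[] args) {
        Stream<String> pipeline = buildPipeline(Arrays.asList("a", "b", "c"));
        System.out.println("calls after building: " + calls.get());
        // The terminal operation plays the role of an action such as count().
        List<String> out = pipeline.collect(Collectors.toList());
        System.out.println("calls after collect: " + calls.get() + ", result: " + out);
    }
}
```

In the same way, the work defined by the first snippet is charged to whichever later job first forces it to run.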

Imran

On Tue, Feb 17, 2015 at 1:52 PM, Darin McBeath ddmcbe...@yahoo.com.invalid
wrote:

 In the following code, I read in a large sequence file from S3 (1TB)
 spread across 1024 partitions.  When I look at the job/stage summary, I see
 about 400GB of shuffle writes which seems to make sense as I'm doing a hash
 partition on this file.

 // Get the baseline input file
 JavaPairRDD<Text, Text> hsfBaselinePairRDDReadable =
 sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class,
 Text.class, Text.class);

 JavaPairRDD<String, String> hsfBaselinePairRDD =
 hsfBaselinePairRDDReadable.mapToPair(new
 ConvertFromWritableTypes()).partitionBy(new
 HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS)).persist(StorageLevel.MEMORY_AND_DISK_SER());

 I then execute the following code (with a count to force execution) and
 what I find very strange is that when I look at the job/stage summary, I
 see more than 340GB of shuffle read.  Why would there be any shuffle read
 in this step?  I would expect there to be little (if any) shuffle reads in
 this step.

 // Use 'substring' to extract the epoch value from each record.
 JavaPairRDD<String, Long> baselinePairRDD =
 hsfBaselinePairRDD.mapValues(new
 ExtractEpoch(accumBadBaselineRecords)).persist(StorageLevel.MEMORY_AND_DISK_SER());

 log.info("Number of baseline records: " + baselinePairRDD.count());

 Both hsfBaselinePairRDD and baselinePairRDD have 1024 partitions.

 Any insights would be appreciated.

 I'm using Spark 1.2.0 in a stand-alone cluster.


 Darin.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Percentile example

2015-02-17 Thread Kohler, Curt E (ELS-STL)


The best approach I've found to calculate percentiles in Spark is to leverage 
Spark SQL.  If you use the Hive Query Language support, you can use the UDAFs 
for percentiles (as of Spark 1.2).

Something like this (note: the syntax is not guaranteed to run, but it should 
give you the gist of what you need to do):


JavaSparkContext sc = new JavaSparkContext(sparkConf);

JavaHiveContext hsc = new JavaHiveContext(sc);

//Get your Data into a SchemaRDD and register the Table


// Query it

String hql = "SELECT FIELD1, FIELD2, percentile(FIELD3, 0.05) AS ptile5 FROM "
    + "TABLE-NAME GROUP BY FIELD1, FIELD2";

JavaSchemaRDD result = hsc.hql(hql);

List<Row> grp = result.collect();

for (Row row : grp) {
  // Columns 0 and 1 are FIELD1 and FIELD2; the percentile starts at column 2
  for (int z = 2; z < row.length(); z++) {

    // Do something with the results

  }
}
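
To make the meaning of a percentile concrete, here is a minimal plain-Java sketch of the nearest-rank definition on an in-memory array. It is an illustration only and is not what the Hive UDAF does internally (Hive may interpolate between neighbouring values and return a slightly different number). The class and method names are made up for this example.

```java
import java.util.Arrays;

class NearestRankPercentile {
    // Returns the p-th percentile (0 < p <= 100) using the nearest-rank rule:
    // the value at 1-based rank ceil(p * n / 100) in the sorted data.
    static double percentile(double[] values, double p) {
        if (values.length == 0 || p <= 0 || p > 100) {
            throw new IllegalArgumentException("need non-empty data and 0 < p <= 100");
        }
        double[] sorted = values.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.length / 100.0);
        return sorted[rank - 1];
    }

    public static void main(String[] args) {
        double[] data = {9, 1, 8, 2, 7, 3, 6, 4, 5, 10};
        System.out.println("66th percentile: " + percentile(data, 66));
    }
}
```

On a distributed data set the same index arithmetic applies after a global sort; only the lookup of the element at that rank has to be done without collecting everything to one machine.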

Curt


From: SiMaYunRui myl...@hotmail.com
Date: Sunday, February 15, 2015 at 10:37 AM
To: user@spark.apache.org
Subject: Percentile example

hello,

I am a newbie to Spark and am trying to figure out how to compute a percentile 
over a big data set. I googled this topic but did not find any very useful code 
example or explanation. It seems that I can use the sortByKey transformation to 
put my data set in order, but I am not sure how to get the value of, for 
example, the 66th percentile.

Should I use take() to pick up the value of the 66th percentile? I don't believe 
any machine can load my data set in memory. I believe there must be more 
efficient approaches.

Can anyone shed some light on this problem?

Regards



ERROR actor.OneForOneStrategy: org.apache.hadoop.conf.Configuration

2015-02-17 Thread Jadhav Shweta
Hi,

I am running a streaming word count program on a Spark standalone cluster 
with four machines.

public final class JavaKafkaStreamingWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");
    static transient Configuration conf;

    private JavaKafkaStreamingWordCount() {
    }

    public static void main(String[] args) {
        if (args.length < 4) {
            System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }

        StreamingExamples.setStreamingLogLevels();
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1));

        jssc.checkpoint("hdfs://172.17.199.229:8020/spark/wordcountKafkaCheckpoint");

        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = args[2].split("//,");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        JavaDStream<String> lines = messages.map(
                new Function<Tuple2<String, String>, String>() {
                    @Override
                    public String call(Tuple2<String, String> tuple2) {
                        return tuple2._2();
                    }
                });

        JavaDStream<String> words = lines.flatMap(
                new FlatMapFunction<String, String>() {
                    @Override
                    public Iterable<String> call(String x) {
                        return Lists.newArrayList(SPACE.split(x));
                    }
                });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                });

        Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
                new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                    @Override
                    public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
                        Integer newSum = 0;
                        if (state.isPresent()) {
                            if (values.size() != 0) {
                                newSum = state.get();
                                for (int temp : values) {
                                    newSum += temp;
                                }
                            } else {
                                newSum = state.get();
                            }
                        } else {
                            if (values.size() != 0) {
                                for (int temp : values) {
                                    newSum += 1;
                                }
                            }
                        }

                        return Optional.of(newSum);
                    }
                };
        JavaPairDStream<String, Integer> runningCounts =
                pairs.updateStateByKey(updateFunction);
        conf = new Configuration();

        runningCounts.saveAsNewAPIHadoopFiles(
                "hdfs://172.17.199.229:8020/spark/wordCountOutput/word",
                "stream", Text.class, Text.class,
                (Class<? extends org.apache.hadoop.mapreduce.OutputFormat<?, ?>>) TextOutputFormat.class,
                conf);
        //jssc.sparkContext().hadoopConfiguration();
        jssc.start();
        jssc.awaitTermination();
    }
}
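
As a side note on the state update in the listing above: because every value produced by the mapToPair step is 1, both branches of the update function compute the previous count plus the number of new values. A minimal sketch of that logic follows. It is an illustration only: it uses java.util.Optional so that it runs without Spark on the classpath (the Spark 1.x Java API expects Guava's Optional instead), and the class name RunningCount is made up for this example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class RunningCount {
    // New state = previous count (0 if absent) plus the sum of this batch's values.
    static Optional<Integer> update(List<Integer> values, Optional<Integer> state) {
        int sum = state.orElse(0);
        for (int v : values) {
            sum += v;
        }
        return Optional.of(sum);
    }

    public static void main(String[] args) {
        Optional<Integer> first = update(Arrays.asList(1, 1, 1), Optional.<Integer>empty());
        Optional<Integer> second = update(Collections.<Integer>emptyList(), first);
        System.out.println(first.get() + " then " + second.get());
    }
}
```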

This works fine on a one-node cluster, but it gives the following error when I 
try to run the same program on the full cluster.

15/02/17 12:57:10 ERROR actor.OneForOneStrategy: 
org.apache.hadoop.conf.Configuration
java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
at 

Re: Identify the performance bottleneck from hardware prospective

2015-02-17 Thread Julaiti Alafate
The raw data is ~30 GB. It consists of 250 million sentences. The total
length of the documents (i.e. the sum of the lengths of all sentences) is 11
billion. I also ran a simple algorithm to roughly count the maximum number
of word pairs by summing up d * (d - 1) over all sentences, where d is the
length of the sentence. It is about 63 billion.
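
For reference, the d * (d - 1) upper bound mentioned above can be written as a few lines of plain Java. This is a sketch on an in-memory array of sentence lengths, not the job that produced the numbers above; the class and method names are made up for this example.

```java
class WordPairBound {
    // Sums d * (d - 1) over all sentence lengths d, i.e. the number of ordered
    // pairs of distinct word positions within each sentence.
    static long maxWordPairs(int[] sentenceLengths) {
        long total = 0L;
        for (int d : sentenceLengths) {
            total += (long) d * (d - 1); // widen to long before multiplying
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(maxWordPairs(new int[] {3, 1, 4}));
    }
}
```

The cast to long matters at this scale, since the total quickly exceeds the int range.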

Thanks,
Julaiti


On Tue, Feb 17, 2015 at 2:44 AM, Arush Kharbanda ar...@sigmoidanalytics.com
 wrote:

 Hi

 How big is your dataset?

 Thanks
 Arush

 On Tue, Feb 17, 2015 at 4:06 PM, Julaiti Alafate jalaf...@eng.ucsd.edu
 wrote:

 Thank you very much for your reply!

 My task is to count the number of word pairs in a document. If w1 and w2
 occur together in one sentence, the count for word pair (w1, w2) is
 incremented by 1. So the computational part of this algorithm is simply a
 two-level for-loop.

 Since the cluster is monitored by Ganglia, I can easily see that neither
 CPU nor network IO is under pressure. The only parameter left is memory. In
 the executor tab of the Spark Web UI, I can see a column named "memory used".
 It showed that only 6GB of 20GB memory is used. I understand this is
 measuring the size of the RDDs persisted in memory. So can I at least assume
 the data/objects I use in my program are not exceeding the memory limit?

 My confusion here is: why can't my program run faster while there is
 still sufficient memory, CPU time and network bandwidth it can utilize?

 Best regards,
 Julaiti


 On Tue, Feb 17, 2015 at 12:53 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 What application are you running? Here are a few things:

 - You will hit a bottleneck on CPU if you are doing some complex
 computation (like parsing JSON etc.)
 - You will hit a bottleneck on memory if the data/objects used in your
 program are large (like building and manipulating HashMaps etc. inside your
 map* operations). Here you can set spark.executor.memory to a higher number,
 and you can also change spark.storage.memoryFraction, whose default value is
 0.6 of your executor memory.
 - Network will be a bottleneck if data is not available locally on one
 of the workers and hence has to be collected from others, which involves a
 lot of serialization and data transfer across your cluster.

 Thanks
 Best Regards

 On Tue, Feb 17, 2015 at 11:20 AM, Julaiti Alafate jalaf...@eng.ucsd.edu
  wrote:

 Hi there,

 I am trying to scale up the data size that my application is handling.
 This application is running on a cluster with 16 slave nodes. Each slave
 node has 60GB memory. It is running in standalone mode. The data is coming
 from HDFS, which is also in the same local network.

 In order to understand how my program is running, I also
 have Ganglia installed on the cluster. From a previous run, I know the stage
 that takes the longest time to run is counting word pairs (my RDD consists of
 sentences from a corpus). My goal is to identify the bottleneck of my
 application, then modify my program or hardware configuration
 accordingly.

 Unfortunately, I didn't find much information on Spark monitoring
 and optimization topics. Reynold Xin gave a great talk at Spark Summit 2014
 on application tuning from the task perspective. Basically, his focus is on
 tasks that are oddly slower than the average. However, it didn't solve my
 problem because no tasks run much slower than the others in my
 case.

 So I tried to identify the bottleneck from the hardware perspective. I want
 to know what the limitation of the cluster is. I think if the executors are
 running hard, either CPU, memory or network bandwidth (or maybe a
 combination) is hitting the roof. But Ganglia reports that the CPU utilization
 of the cluster is no more than 50%, and network utilization is high for several
 seconds at the beginning, then drops close to 0. From the Spark UI, I can see
 the node with the maximum memory usage is consuming around 6GB, while
 spark.executor.memory is set to 20GB.

 I am very confused that the program is not running fast enough while
 hardware resources are not in shortage. Could you please give me some hints
 about what determines the performance of a Spark application from the hardware
 perspective?

 Thanks!

 Julaiti






 --


 *Arush Kharbanda* || Technical Teamlead

 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com



Hive, Spark, Cassandra, Tableau, BI, etc.

2015-02-17 Thread Ashic Mahtab
Hi,

I've seen a few articles where they use CqlStorageHandler to create Hive tables 
referencing Cassandra data using the thriftserver. Is there a secret to getting 
this to work? I've basically got Spark built with Hive, and a Cassandra 
cluster. Is there a way to get the Hive server to talk to Cassandra? I've seen 
Calliope from Tuplejump, and it looks good, but was wondering if there's a more 
direct approach. The version updates of various things mean that if I could go 
direct even with a bit of hassle, I'd prefer that.

I've also managed to connect the Simba ODBC driver for Spark. Is there a way 
to get ODBC clients to use that to get to Cassandra data? Would I need to build 
a Spark application and pull in Cassandra data (or define RDDs) for ODBC 
clients to make use of it? I can't really use the Simba ODBC connector for 
Cassandra as it's missing a few things currently (like understanding UDTs + 
collection columns).

What I'm really after is somehow giving Tableau + Excel users access to 
Cassandra data. I'd love to be able to use DataStax Enterprise, but as of now, 
the client is firmly in Apache Cassandra + Apache Spark territory.

Thanks,
Ashic.

Re: MapValues and Shuffle Reads

2015-02-17 Thread Darin McBeath
Thanks Imran.

I think you are probably correct. I was a bit surprised that there was no 
shuffle read in the initial hash partition step. I will adjust the code as you 
suggest to prove that is the case. 

I have a slightly different question. If I save an RDD to S3 (or some 
equivalent) and this RDD was hash partitioned at the time, do I still need to 
hash partition the RDD again when I read it in? Is there a way that I could 
prevent all of the shuffling (such as providing a hint)? My parts for the RDD 
will be gzipped (so they would not be splittable).  In reality, that's what I 
would really want to do in the first place.

Thanks again for your insights.

Darin.




From: Imran Rashid iras...@cloudera.com
To: Darin McBeath ddmcbe...@yahoo.com 
Cc: User user@spark.apache.org 
Sent: Tuesday, February 17, 2015 3:29 PM
Subject: Re: MapValues and Shuffle Reads






Re: Stepsize with Linear Regression

2015-02-17 Thread Xiangrui Meng
The best step size depends on the condition number of the problem. You
can try some conditioning heuristics first, e.g., normalizing the
columns, and then try a common step size like 0.01. We should
implement line search for linear regression in the future, as in
LogisticRegressionWithLBFGS. Line search can determine the step size
automatically for you. -Xiangrui
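
As an illustration of the "normalizing the columns" heuristic mentioned above, here is a minimal plain-Java sketch that rescales each column to zero mean and unit standard deviation. It is a sketch on an in-memory array, not MLlib code, and the class and method names are made up for this example.

```java
class ColumnStandardizer {
    // Rescales each column to zero mean and unit (population) standard deviation.
    // Constant columns are only centered, to avoid dividing by zero.
    static double[][] standardize(double[][] rows) {
        int n = rows.length;
        int d = rows[0].length;
        double[][] out = new double[n][d];
        for (int j = 0; j < d; j++) {
            double mean = 0.0;
            for (double[] r : rows) {
                mean += r[j];
            }
            mean /= n;
            double var = 0.0;
            for (double[] r : rows) {
                var += (r[j] - mean) * (r[j] - mean);
            }
            double std = Math.sqrt(var / n);
            for (int i = 0; i < n; i++) {
                out[i][j] = std > 0 ? (rows[i][j] - mean) / std : rows[i][j] - mean;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        double[][] scaled = standardize(new double[][] {{1, 1000}, {2, 2000}, {3, 3000}});
        System.out.println(java.util.Arrays.deepToString(scaled));
    }
}
```

After this step the two columns in the example have identical values even though they originally differed by three orders of magnitude, which is the kind of conditioning that makes a single step size such as 0.01 more likely to work for every coordinate.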

On Tue, Feb 10, 2015 at 8:56 AM, Rishi Yadav ri...@infoobjects.com wrote:
 Are there any rules of thumb for setting the step size with gradient descent?
 I am using it for Linear Regression, but I am sure it applies in general to
 gradient descent.

 At present I am deriving a number that fits closest to the training data set
 response variable values. I am sure there is a better way to do it.

 Thanks and Regards,
 Rishi
 @meditativesoul




Re: high GC in the Kmeans algorithm

2015-02-17 Thread Xiangrui Meng
Did you cache the data? Was it fully cached? The k-means
implementation doesn't create many temporary objects. I guess you need
more RAM to avoid triggering GC frequently. Please monitor the memory
usage using YourKit or VisualVM. -Xiangrui

On Wed, Feb 11, 2015 at 1:35 AM, lihu lihu...@gmail.com wrote:
 I just want to make the best use of the CPU and test the performance of Spark
 when there are a lot of tasks on a single node.

 On Wed, Feb 11, 2015 at 5:29 PM, Sean Owen so...@cloudera.com wrote:

 Good, worth double-checking that's what you got. That's barely 1GB per
 task though. Why run 48 if you have 24 cores?

 On Wed, Feb 11, 2015 at 9:03 AM, lihu lihu...@gmail.com wrote:
  I gave 50GB to the executor, so it seems there is no reason for the
  memory not to be enough.
 
  On Wed, Feb 11, 2015 at 4:50 PM, Sean Owen so...@cloudera.com wrote:
 
  Meaning, you have 128GB per machine but how much memory are you giving
  the executors?
 
  On Wed, Feb 11, 2015 at 8:49 AM, lihu lihu...@gmail.com wrote:
   What do you mean?  Yes, I can see in the web UI that some data is put
   in memory.
  
   On Wed, Feb 11, 2015 at 4:25 PM, Sean Owen so...@cloudera.com
   wrote:
  
   Are you actually using that memory for executors?
  
   On Wed, Feb 11, 2015 at 8:17 AM, lihu lihu...@gmail.com wrote:
Hi,
I run k-means (MLlib) on a cluster with 12 workers. Every worker has
128GB RAM and 24 cores. I run 48 tasks on one machine. The total data is
just 40GB.

When the dimension of the data set is about 10^7, the duration of every
task is about 30s, but the cost of GC is about 20s.

When I reduce the dimension to 10^4, the GC cost is small.

So why is GC so high when the dimension is larger? Or is this
caused by MLlib?
   
   
   
   
  
  
  
  
   --
   Best Wishes!
  
   Li Hu(李浒) | Graduate Student
   Institute for Interdisciplinary Information Sciences(IIIS)
   Tsinghua University, China
  
   Email: lihu...@gmail.com
   Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
  
  
 
 
 
 





Re: feeding DataFrames into predictive algorithms

2015-02-17 Thread Xiangrui Meng
Hey Sandy,

The work should be done by a VectorAssembler, which combines multiple
columns (double/int/vector) into a vector column, which becomes the
features column for regression. We are going to create JIRAs for each
of these standard feature transformers. It would be great if you could
help implement some of them.

Best,
Xiangrui

On Wed, Feb 11, 2015 at 7:55 PM, Patrick Wendell pwend...@gmail.com wrote:
 I think there is a minor error here in that the first example needs a
 tail after the seq:

 df.map { row =>
   (row.getDouble(0), row.toSeq.tail.map(_.asInstanceOf[Double]))
 }.toDataFrame("label", "features")

 On Wed, Feb 11, 2015 at 7:46 PM, Michael Armbrust
 mich...@databricks.com wrote:
 It sounds like you probably want to do a standard Spark map, that results in
 a tuple with the structure you are looking for.  You can then just assign
 names to turn it back into a dataframe.

 Assuming the first column is your label and the rest are features you can do
 something like this:

 val df = sc.parallelize(
   (1.0, 2.3, 2.4) ::
   (1.2, 3.4, 1.2) ::
   (1.2, 2.3, 1.2) :: Nil).toDataFrame("a", "b", "c")

 df.map { row =>
   (row.getDouble(0), row.toSeq.map(_.asInstanceOf[Double]))
 }.toDataFrame("label", "features")

 df: org.apache.spark.sql.DataFrame = [label: double, features:
 array<double>]

 If you'd prefer to stick closer to SQL you can define a UDF:

 val createArray = udf((a: Double, b: Double) => Seq(a, b))
 df.select('a as 'label, createArray('b,'c) as 'features)

 df: org.apache.spark.sql.DataFrame = [label: double, features:
 array<double>]

 We'll add createArray as a first class member of the DSL.

 Michael

 On Wed, Feb 11, 2015 at 6:37 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hey All,

 I've been playing around with the new DataFrame and ML pipelines APIs and
 am having trouble accomplishing what seems like should be a fairly basic
 task.

 I have a DataFrame where each column is a Double.  I'd like to turn this
 into a DataFrame with a features column and a label column that I can feed
 into a regression.

 So far all the paths I've gone down have led me to internal APIs or
 convoluted casting in and out of RDD[Row] and DataFrame.  Is there a simple
 way of accomplishing this?

 any assistance (lookin' at you Xiangrui) much appreciated,
 Sandy






Re: MLib usage on Spark Streaming

2015-02-17 Thread Xiangrui Meng
JavaDStream.foreachRDD
(https://spark.apache.org/docs/1.2.1/api/java/org/apache/spark/streaming/api/java/JavaDStreamLike.html#foreachRDD(org.apache.spark.api.java.function.Function))
and Statistics.corr
(https://spark.apache.org/docs/1.2.1/api/java/org/apache/spark/mllib/stat/Statistics.html#corr(org.apache.spark.rdd.RDD))
should be good starting points. -Xiangrui
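
For readers unfamiliar with what the correlation step computes, here is a minimal plain-Java sketch of the Pearson correlation between a windowed series and a fixed pattern of the same length. It is an illustration on in-memory arrays, not the MLlib implementation, and the class and method names are made up for this example.

```java
class PearsonCorrelation {
    // Pearson correlation of two equally long series. Returns NaN when either
    // series is constant, because the denominator is then zero.
    static double corr(double[] x, double[] y) {
        if (x.length != y.length || x.length < 2) {
            throw new IllegalArgumentException("series must have equal length >= 2");
        }
        int n = x.length;
        double meanX = 0.0;
        double meanY = 0.0;
        for (int i = 0; i < n; i++) {
            meanX += x[i];
            meanY += y[i];
        }
        meanX /= n;
        meanY /= n;
        double sxy = 0.0;
        double sxx = 0.0;
        double syy = 0.0;
        for (int i = 0; i < n; i++) {
            double dx = x[i] - meanX;
            double dy = y[i] - meanY;
            sxy += dx * dy;
            sxx += dx * dx;
            syy += dy * dy;
        }
        return sxy / Math.sqrt(sxx * syy);
    }

    public static void main(String[] args) {
        double[] window = {1, 2, 3, 4};
        double[] pattern = {2, 4, 6, 8};
        System.out.println(corr(window, pattern));
    }
}
```

The result would then be compared against the threshold (for example 0.9) before emitting a message for the key.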

On Mon, Feb 16, 2015 at 6:39 AM, Spico Florin spicoflo...@gmail.com wrote:
 Hello!
   I'm a newbie to Spark and I have the following case study:
 1. A client sends the following data every 100ms:
   {uniqueId, timestamp, measure1, measure2 }
 2. Every 30 seconds I would like to correlate the data collected in the
 window with a predefined double vector pattern for each given key. The
 predefined pattern has 300 records. The data should also be sorted by
 timestamp.
 3. When the correlation is greater than a predefined threshold (e.g. 0.9) I
 would like to emit a new message containing {uniqueId,
 doubleCorrelationValue}
 4. For the correlation I would like to use MLlib
 5. As a programming language I would like to use Java 7.

 Can you please give me some suggestions on how to create the skeleton for
 the above scenario?

 Thanks.
  Regards,
  Florin





Re: Large Similarity Job failing

2015-02-17 Thread Xiangrui Meng
The complexity of DIMSUM is independent of the number of rows but
still has a quadratic dependency on the number of columns. 1.5M columns
may be too large to use DIMSUM. Try to increase the threshold and see
whether it helps. -Xiangrui
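
To give a sense of scale for the quadratic dependency on columns, the sketch below counts the unordered column pairs for a given number of columns. This is a worst-case figure only; sparsity and the similarity threshold reduce the work actually done. The class and method names are made up for this example.

```java
class ColumnPairCount {
    // Number of unordered pairs among n columns: n * (n - 1) / 2.
    static long pairs(long n) {
        return n * (n - 1) / 2;
    }

    public static void main(String[] args) {
        System.out.println(pairs(1_500_000L));
    }
}
```

For 1.5M columns this is on the order of 10^12 candidate pairs.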

On Tue, Feb 17, 2015 at 6:28 AM, Debasish Das debasish.da...@gmail.com wrote:
 Hi,

 I am running brute force similarity from RowMatrix on a job with a 5M x 1.5M
 sparse matrix with 800M entries. With 200M entries the job runs fine, but with
 800M I am getting exceptions like "too many open files" and "no space left on
 device"...

 It seems like I need more nodes, or should I use DIMSUM sampling?

 I am running on 10 nodes where ulimit on each node is set at 65K...Memory is
 not an issue since I can cache the dataset before similarity computation
 starts.

 I tested the same job on YARN with Spark 1.1 and Spark 1.2 stable. Both the
 jobs failed with FetchFailed msgs.

 Thanks.
 Deb




Re: [POWERED BY] Radius Intelligence

2015-02-17 Thread Xiangrui Meng
Thanks! I added Radius to
https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark.
-Xiangrui

On Tue, Feb 10, 2015 at 12:02 AM, Alexis Roos alexis.r...@gmail.com wrote:
 Also long due given our usage of Spark ..

 Radius Intelligence:
 URL: radius.com

 Description:
 Spark, MLLib
 Using Scala, Spark and MLLib for Radius Marketing and Sales intelligence
 platform including data aggregation, data processing, data clustering, data
 analysis and predictive modeling of all US businesses.




Re: Unknown sample in Naive Baye's

2015-02-17 Thread Xiangrui Meng
If there exists a sample that doesn't belong to A/B/C, it means
that there exists another class D or Unknown besides A/B/C. You should
have some of these samples in the training set in order to let naive
Bayes learn the priors. -Xiangrui
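
For reference, the per-class threshold rule described in the quoted question can be sketched in plain Java as follows. It only illustrates the rule itself; it does not solve the problem raised above, since a model that never saw an "unknown" class can still assign a high posterior to one of A/B/C. The class and method names are made up, and -1 is an arbitrary marker for "unknown".

```java
class ThresholdedLabel {
    // Returns the index of the most probable class, or -1 ("unknown") when that
    // class's posterior is below the threshold defined for that class.
    static int classify(double[] posteriors, double[] thresholds) {
        int best = 0;
        for (int i = 1; i < posteriors.length; i++) {
            if (posteriors[i] > posteriors[best]) {
                best = i;
            }
        }
        return posteriors[best] >= thresholds[best] ? best : -1;
    }

    public static void main(String[] args) {
        double[] thresholds = {0.35, 0.32, 0.33}; // classes A, B, C from the question
        System.out.println(classify(new double[] {0.34, 0.33, 0.33}, thresholds));
        System.out.println(classify(new double[] {0.20, 0.70, 0.10}, thresholds));
    }
}
```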

On Tue, Feb 10, 2015 at 10:44 PM, jatinpreet jatinpr...@gmail.com wrote:
 Hi,

 I am using MLlib's Naive Bayes classifier to classify textual data. I am
 accessing the posterior probabilities through a hack for each class.

 Once I have trained the model, I want to remove documents whose confidence
 of classification is low. Say for a document, if the highest class
 probability is less than a pre-defined threshold (separate for each class),
 categorize this document as 'unknown'.

 Say there are three classes A, B and C with thresholds 0.35, 0.32 and 0.33
 respectively, defined after training and testing. If I score a sample that
 belongs to none of the three categories, I wish to classify it as
 'unknown'. But the issue is that I can get a probability higher than these
 thresholds for a document that doesn't belong to the trained categories.

 Is there any technique which I can apply to segregate documents that belong
 to untrained classes with a certain degree of confidence?

 Thanks






Re: Naive Bayes model fails after a few predictions

2015-02-17 Thread Xiangrui Meng
Could you share the error log? What do you mean by "500 instead of
200"? If this is the number of files, try to use `repartition` before
calling naive Bayes, which works best when the number of
partitions matches the number of cores, or is even smaller. -Xiangrui

On Tue, Feb 10, 2015 at 10:34 PM, rkgurram rkgur...@gmail.com wrote:
 Further, I have tried HttpBroadcast but that does not work either.

 It is almost as if there is a memory leak, because if I increase the input
 files to 500 instead of 200 the system crashes earlier.


 The code is as follows
 

 logger.info("Training the model Fold:[" + fold + "]")
 logger.info("Step 1: Split the input into Training and Testing sets")
 val splits = labeledPointRDD.randomSplit(Array(0.6, 0.4), seed = 11L)
 logger.info("Step 1: splits successful...")

 val training = splits(0)
 val test = splits(1)
 status = ModelStatus.IN_TRAINING
 //logger.info("Fold:[" + fold + "] Training count: " + training.count()
 //  + " Testing/Verification count:" + test.count())

 logger.info("Step 2: Train the NB classifier")
 model = NaiveBayes.train(training, lambda = 1.0)
 logger.info("Step 2: NB model training complete Fold:[" + fold + "]")

 logger.info("Step 3: Testing/Verification of the model")
 status = ModelStatus.IN_VERIFICATION
 val predictionAndLabel = test.map(p => (model.predict(p.features),
 p.label))
 val arry = predictionAndLabel.filter(x => x._1 == x._2)
 val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 ==
 x._2).count() / test.count()
 logger.info("Step 3: Testing complete")
 status = ModelStatus.INITIALIZED
 logger.info("Fold[" + fold + "] Accuracy:[" + accuracy + "] Model " +
 "Status:[" + status + "]")




 -Ravi



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Naive-Bayes-model-fails-after-a-few-predictions-tp21592p21593.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






How do you get the partitioner for an RDD in Java?

2015-02-17 Thread Darin McBeath
In an 'early release' of the Learning Spark book, there is the following 
reference:

In Scala and Java, you can determine how an RDD is partitioned using its 
partitioner property (or partitioner() method in Java)

However, I don't see the mentioned 'partitioner()' method in Spark 1.2 or a way 
of getting this information.

I'm curious if anyone has any suggestions for how I might go about finding how 
an RDD is partitioned in a Java program.

Thanks.

Darin.




Re: WARN from Similarity Calculation

2015-02-17 Thread Xiangrui Meng
It may be caused by GC pause. Did you check the GC time in the Spark
UI? -Xiangrui

On Sun, Feb 15, 2015 at 8:10 PM, Debasish Das debasish.da...@gmail.com wrote:
 Hi,

 I am sometimes getting WARN from running Similarity calculation:

 15/02/15 23:07:55 WARN BlockManagerMasterActor: Removing BlockManager
 BlockManagerId(7, abc.com, 48419, 0) with no recent heart beats: 66435ms
 exceeds 45000ms

 Do I need to increase the default 45 s to larger values for cases where we
 are doing blocked operation or long compute in the mapPartitions ?

 Thanks.
 Deb




Re: Processing graphs

2015-02-17 Thread Yifan LI
Hi Kannan,

I am not sure I have understood exactly what your question is, but maybe the 
reduceByKey or reduceByKeyLocally functionality better suits your needs.
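
To make the idea concrete without a cluster, here is a plain-Java analogue
(not Spark code) of a per-key reduce: values for the same key are merged
pairwise with a combining function instead of being grouped first. The edge
list and the class and method names are made up for illustration; in Spark
the same role would be played by reduceByKey on a pair RDD of
(vertex, value).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ReducePerKeySketch {
    // Each int[] is one directed edge {sourceVertex, targetVertex}.
    // Returns the out-degree of every vertex that has at least one edge.
    static Map<Integer, Integer> outDegrees(List<int[]> edges) {
        Map<Integer, Integer> degrees = new TreeMap<>();
        for (int[] edge : edges) {
            // Merge a contribution of 1 into the running value for the
            // source vertex; Integer::sum is the combining function.
            degrees.merge(edge[0], 1, Integer::sum);
        }
        return degrees;
    }

    public static void main(String[] args) {
        // Hypothetical three-edge graph: 1->2, 1->3, 2->3
        List<int[]> edges = Arrays.asList(
            new int[] {1, 2}, new int[] {1, 3}, new int[] {2, 3});
        System.out.println(outDegrees(edges));
    }
}
```

Because the combining function is associative, partial results can be
merged in any order, which is what lets a per-key reduce combine values
locally before anything is sent back to the main process.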



Best,
Yifan LI





 On 17 Feb 2015, at 17:37, Vijayasarathy Kannan kvi...@vt.edu wrote:
 
 Hi,
 
 I am working on a Spark application that processes graphs and I am trying to 
 do the following.
 
 - group the vertices (key - vertex, value - set of its outgoing edges)
 - distribute each key to separate processes and process them (like mapper)
 - reduce the results back at the main process
 
 Does the groupBy functionality do the distribution by default?
 Do we have to explicitly use RDDs to enable automatic distribution?
 
 It'd be great if you could help me understand these and how to go about with 
 the problem.
 
 Thanks.



RE: Use of nscala-time within spark-shell

2015-02-17 Thread Hammam CHAMSI
I can use nscala-time with scala, but my issue is that I can't use it within 
the spark-shell console! It gives me the error below.

Thanks

From: kevin...@apache.org
Date: Tue, 17 Feb 2015 08:50:04 +
Subject: Re: Use of nscala-time within spark-shell
To: hscha...@hotmail.com; kevin...@apache.org; user@spark.apache.org

Great, or you can just use nscala-time with scala 2.10!

On Tue Feb 17 2015 at 5:41:53 PM Hammam CHAMSI hscha...@hotmail.com wrote:



Thanks Kevin for your reply,

I downloaded the pre_built version and as you said the default spark scala 
version is 2.10. I'm now building spark 1.2.1 with scala 2.11. I'll share the 
results here.

Regards,
From: kevin...@apache.org
Date: Tue, 17 Feb 2015 01:10:09 +
Subject: Re: Use of nscala-time within spark-shell
To: hscha...@hotmail.com; user@spark.apache.org

What is your scala version used to build Spark? 
It seems your nscala-time library scala version is 2.11,
and default Spark scala version is 2.10. 

On Tue Feb 17 2015 at 1:51:47 AM Hammam CHAMSI hscha...@hotmail.com wrote:



Hi All,

Thanks in advance for your help. I have a timestamp which I need to convert
to a datetime using Scala. A folder contains the three needed jar files:
joda-convert-1.5.jar, joda-time-2.4.jar, nscala-time_2.11-1.8.0.jar

Using the Scala REPL and adding the jars: scala -classpath *.jar

I can use nscala-time as follows:

scala> import com.github.nscala_time.time.Imports._
import com.github.nscala_time.time.Imports._

scala> import org.joda._
import org.joda._

scala> DateTime.now
res0: org.joda.time.DateTime = 2015-02-12T15:51:46.928+01:00

But when I try to use spark-shell:

ADD_JARS=/home/scala_test_class/nscala-time_2.11-1.8.0.jar,/home/scala_test_class/joda-time-2.4.jar,/home/scala_test_class/joda-convert-1.5.jar \
  /usr/local/spark/bin/spark-shell --master local --driver-memory 2g \
  --executor-memory 2g --executor-cores 1

It successfully imports the jars:

scala> import com.github.nscala_time.time.Imports._
import com.github.nscala_time.time.Imports._

scala> import org.joda._
import org.joda._

but fails when using them:

scala> DateTime.now
java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
    at com.github.nscala_time.time.LowPriorityOrderingImplicits$class.ReadableInstantOrdering(Implicits.scala:69)
    at com.github.nscala_time.time.Imports$.ReadableInstantOrdering(Imports.scala:20)
    at com.github.nscala_time.time.OrderingImplicits$class.$init$(Implicits.scala:61)
    at com.github.nscala_time.time.Imports$.<init>(Imports.scala:20)
    at com.github.nscala_time.time.Imports$.<clinit>(Imports.scala)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:17)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:22)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
    at $iwC$$iwC$$iwC.<init>(<console>:30)
    at $iwC$$iwC.<init>(<console>:32)
    at $iwC.<init>(<console>:34)
    at <init>(<console>:36)
    at .<init>(<console>:40)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at

RE: Use of nscala-time within spark-shell

2015-02-17 Thread Hammam CHAMSI
Thanks Kevin for your reply,

I downloaded the pre_built version and as you said the default spark scala 
version is 2.10. I'm now building spark 1.2.1 with scala 2.11. I'll share the 
results here.

Regards,


Re: Use of nscala-time within spark-shell

2015-02-17 Thread Kevin (Sangwoo) Kim
Great, or you can just use nscala-time with scala 2.10!


Re: Identify the performance bottleneck from hardware prospective

2015-02-17 Thread Akhil Das
What application are you running? Here are a few things:

- You will hit a CPU bottleneck if you are doing some complex computation
(like parsing JSON).
- You will hit a memory bottleneck if the data/objects used in the program
are large (like defining or manipulating HashMaps inside your map*
operations). Here you can set spark.executor.memory to a higher number, and
you can also change spark.storage.memoryFraction, whose default value is
0.6 of your executor memory.
- Network will be a bottleneck if data is not available locally on one of
the workers and it has to be fetched from the others, which means a lot of
serialization and data transfer across your cluster.
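
As a rough worked example of the memory point above (a sketch, not an exact
accounting): in Spark 1.x the space available for cached blocks is
approximately executor heap x spark.storage.memoryFraction x
spark.storage.safetyFraction. The safety fraction and its default of 0.9 are
not mentioned in this thread and are an assumption here, as are the class
and method names.

```java
final class StorageMemoryEstimate {
    // Default of spark.storage.memoryFraction, as stated in the message above.
    static final double MEMORY_FRACTION = 0.6;
    // Assumed default of spark.storage.safetyFraction in Spark 1.x.
    static final double SAFETY_FRACTION = 0.9;

    // Approximate memory (in GB) an executor can use for cached blocks.
    static double storageGb(double executorHeapGb, double memoryFraction) {
        return executorHeapGb * memoryFraction * SAFETY_FRACTION;
    }

    public static void main(String[] args) {
        // spark.executor.memory = 20GB, the value reported in the question
        double gb = storageGb(20.0, MEMORY_FRACTION);
        System.out.printf("approx. storage memory per executor: %.1f GB%n", gb);
    }
}
```

Under these assumptions a 20GB executor has roughly 10.8GB for cached data,
so memory usage reported for storage will never approach the full 20GB.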

Thanks
Best Regards

On Tue, Feb 17, 2015 at 11:20 AM, Julaiti Alafate jalaf...@eng.ucsd.edu
wrote:

 Hi there,

 I am trying to scale up the data size that my application is handling.
 This application is running on a cluster with 16 slave nodes. Each slave
 node has 60GB memory. It is running in standalone mode. The data is coming
 from HDFS, which is also on the same local network.

 In order to understand how my program is running, I also have Ganglia
 installed on the cluster. From a previous run, I know the stage that takes
 the longest time to run is counting word pairs (my RDD consists of
 sentences from a corpus). My goal is to identify the bottleneck of my
 application, then modify my program or hardware configuration accordingly.

 Unfortunately, I didn't find much information on Spark monitoring and
 optimization topics. Reynold Xin gave a great talk at Spark Summit 2014 on
 application tuning from the tasks perspective. Basically, his focus is on
 tasks that are oddly slower than the average. However, it didn't solve my
 problem because no tasks run much slower than the others in my case.

 So I tried to identify the bottleneck from the hardware perspective. I want
 to know what the limitation of the cluster is. I think if the executors are
 running hard, either CPU, memory or network bandwidth (or maybe a
 combination) is hitting the roof. But Ganglia reports that the CPU
 utilization of the cluster is no more than 50%, and network utilization is
 high for several seconds at the beginning, then drops close to 0. From the
 Spark UI, I can see the node with maximum memory usage is consuming around
 6GB, while spark.executor.memory is set to 20GB.

 I am very confused that the program is not running fast enough, while
 hardware resources are not in shortage. Could you please give me some hints
 about what decides the performance of a Spark application from the hardware
 perspective?

 Thanks!

 Julaiti




Re: Use of nscala-time within spark-shell

2015-02-17 Thread Kevin (Sangwoo) Kim
Then, why don't you use nscala-time_2.10-1.8.0.jar, not
nscala-time_2.11-1.8.0.jar ?
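
The mismatch is visible in the jar name itself: Scala libraries
conventionally carry the Scala binary version as an _2.10 or _2.11 suffix.
A minimal sketch of that check, assuming the usual
<artifact>_<scalaVersion>-<version>.jar naming; the class and method names
are made up for illustration, and jars without a suffix (plain Java
libraries such as joda-time) are treated as compatible.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ScalaSuffixCheck {
    // Captures the "2.11" in names like nscala-time_2.11-1.8.0.jar
    private static final Pattern SUFFIX =
        Pattern.compile("_(\\d+\\.\\d+)-[^_]*\\.jar$");

    // Scala binary version encoded in the jar name, if there is one.
    static Optional<String> scalaBinaryVersion(String jarName) {
        Matcher m = SUFFIX.matcher(jarName);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // True when the jar has no Scala suffix or the suffix matches Spark's.
    static boolean compatible(String jarName, String sparkScalaVersion) {
        return scalaBinaryVersion(jarName)
            .map(sparkScalaVersion::equals)
            .orElse(true);
    }

    public static void main(String[] args) {
        String sparkScala = "2.10"; // Scala version of the pre-built Spark
        String[] jars = {
            "nscala-time_2.11-1.8.0.jar",
            "nscala-time_2.10-1.8.0.jar",
            "joda-time-2.4.jar",
            "joda-convert-1.5.jar"
        };
        for (String jar : jars) {
            String verdict = compatible(jar, sparkScala)
                ? "ok" : "built for a different Scala version";
            System.out.println(jar + ": " + verdict);
        }
    }
}
```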


RE: How do you get the partitioner for an RDD in Java?

2015-02-17 Thread Mohammed Guller
Where did you look?

BTW, it is defined in the RDD class as a val:

val  partitioner: Option[Partitioner] 
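
For the Java side, a minimal sketch (not from the thread) of one way to
read that val. It assumes Spark 1.2 with its Java API on the classpath and
goes through rdd() to the underlying Scala RDD, where the val appears as a
zero-argument partitioner() method returning a scala.Option. The class
name, the local[2] master and the sample pairs are placeholders for
illustration.

```java
import java.util.Arrays;

import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Option;
import scala.Tuple2;

final class PartitionerFromJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setMaster("local[2]")              // placeholder master
            .setAppName("partitioner-check");   // placeholder app name
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("a", 1),
                new Tuple2<String, Integer>("b", 2)));

            // rdd() exposes the Scala RDD; its partitioner val is an Option
            Option<Partitioner> before = pairs.rdd().partitioner();
            System.out.println("has partitioner before partitionBy: "
                + before.isDefined());

            // After partitionBy the Option should hold the HashPartitioner
            JavaPairRDD<String, Integer> hashed =
                pairs.partitionBy(new HashPartitioner(4));
            Option<Partitioner> after = hashed.rdd().partitioner();
            if (after.isDefined()) {
                System.out.println("partitions after partitionBy: "
                    + after.get().numPartitions());
            }
        } finally {
            sc.stop();
        }
    }
}
```

The same rdd().partitioner() call should work on a plain JavaRDD as well,
since it only relies on the underlying RDD.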


Mohammed




RE: Use of nscala-time within spark-shell

2015-02-17 Thread Hammam CHAMSI
My fault, I didn't notice the 11 in the jar name. It is working now with 
nscala-time_2.10-1.8.0.jar

Thanks Kevin


Re: Configration Problem? (need help to get Spark job executed)

2015-02-17 Thread Arush Kharbanda
Hi

It could be due to a connectivity issue between the master and the slaves.

I have seen this issue occur for the following reasons:

1. Configuration not synced between the Spark master and the slaves.
2. Network connectivity issues between the master and the slaves.

Are the slaves visible in the Spark UI? And how much memory is allocated to
the executors?

Thanks
Arush

On Sat, Feb 14, 2015 at 3:07 PM, NORD SC jan.algermis...@nordsc.com wrote:

 Hi all,

 I am new to spark and seem to have hit a common newbie obstacle.

 I have a pretty simple setup and job but I am unable to get past this
 error when executing a job:

 "TaskSchedulerImpl: Initial job has not accepted any resources; check your
 cluster UI to ensure that workers are registered and have sufficient memory"

 I have so far gained a basic understanding of worker/executor/driver
 memory, but have run out of ideas what to try next - maybe someone has a
 clue.


 My setup:

 Three node standalone cluster with C* and spark on each node and the
 Datastax C*/Spark connector JAR placed on each node.

 On the master I have the slaves configured in conf/slaves and I am using
 sbin/start-all.sh to start the whole cluster.

 On each node I have this in conf/spark-defauls.conf

 spark.masterspark://devpeng-db-cassandra-1:7077
 spark.eventLog.enabled   true
 spark.serializer org.apache.spark.serializer.KryoSerializer

 spark.executor.extraClassPath
 /opt/spark-cassandra-connector-assembly-1.2.0-alpha1.jar

 and this in conf/spart-env.sh

 SPARK_WORKER_MEMORY=6g



 My App looks like this

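 import org.apache.spark.{SparkConf, SparkContext}
 import com.datastax.spark.connector._ // adds cassandraTable to SparkContext

 object TestApp extends App {
   // Point the Cassandra connector at the first node.
   val conf = new SparkConf(true).set("spark.cassandra.connection.host",
     "devpeng-db-cassandra-1.")
   // Connect to the standalone master under the application name "testApp".
   val sc = new SparkContext("spark://devpeng-db-cassandra-1:7077",
     "testApp", conf)
   val rdd = sc.cassandraTable("test", "kv")
   println("Count: " + String.valueOf(rdd.count))
   println(rdd.first)
 }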

 Any kind of idea what to check next would help me at this point, I think.

 Jan

 Log of the application start:

 [info] Loading project definition from
 /Users/jan/projects/gkh/jump/workspace/gkh-spark-example/project
 [info] Set current project to csconnect (in build
 file:/Users/jan/projects/gkh/jump/workspace/gkh-spark-example/)
 [info] Compiling 1 Scala source to
 /Users/jan/projects/gkh/jump/workspace/gkh-spark-example/target/scala-2.10/classes...
 [info] Running jump.TestApp
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 15/02/14 10:30:11 INFO SecurityManager: Changing view acls to: jan
 15/02/14 10:30:11 INFO SecurityManager: Changing modify acls to: jan
 15/02/14 10:30:11 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(jan); users
 with modify permissions: Set(jan)
 15/02/14 10:30:11 INFO Slf4jLogger: Slf4jLogger started
 15/02/14 10:30:11 INFO Remoting: Starting remoting
 15/02/14 10:30:12 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkDriver@xx:58197]
 15/02/14 10:30:12 INFO Utils: Successfully started service 'sparkDriver'
 on port 58197.
 15/02/14 10:30:12 INFO SparkEnv: Registering MapOutputTracker
 15/02/14 10:30:12 INFO SparkEnv: Registering BlockManagerMaster
 15/02/14 10:30:12 INFO DiskBlockManager: Created local directory at
 /var/folders/vr/w3whx92d0356g5nj1p6s59grgn/T/spark-local-20150214103012-5b53
 15/02/14 10:30:12 INFO MemoryStore: MemoryStore started with capacity
 530.3 MB
 2015-02-14 10:30:12.304 java[24999:3b07] Unable to load realm info from
 SCDynamicStore
 15/02/14 10:30:12 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 15/02/14 10:30:12 INFO HttpFileServer: HTTP File server directory is
 /var/folders/vr/w3whx92d0356g5nj1p6s59grgn/T/spark-48459a22-c1ff-42d5-8b8e-cc89fe84933d
 15/02/14 10:30:12 INFO HttpServer: Starting HTTP Server
 15/02/14 10:30:12 INFO Utils: Successfully started service 'HTTP file
 server' on port 58198.
 15/02/14 10:30:12 INFO Utils: Successfully started service 'SparkUI' on
 port 4040.
 15/02/14 10:30:12 INFO SparkUI: Started SparkUI at http://xx:4040
 15/02/14 10:30:12 INFO AppClient$ClientActor: Connecting to master
 spark://devpeng-db-cassandra-1:7077...
 15/02/14 10:30:13 INFO SparkDeploySchedulerBackend: Connected to Spark
 cluster with app ID app-20150214103013-0001
 15/02/14 10:30:13 INFO AppClient$ClientActor: Executor added:
 app-20150214103013-0001/0 on
 worker-20150214102534-devpeng-db-cassandra-2.devpeng
 (devpeng-db-cassandra-2.devpeng.x:57563) with 8 cores
 15/02/14 10:30:13 INFO SparkDeploySchedulerBackend: Granted executor ID
 app-20150214103013-0001/0 on hostPort
 devpeng-db-cassandra-2.devpeng.:57563 with 8 cores, 512.0 MB RAM
 15/02/14 10:30:13 INFO AppClient$ClientActor: Executor added:
 app-20150214103013-0001/1 on
 worker-20150214102534-devpeng-db-cassandra-3.devpeng.-38773
 

Re: Identify the performance bottleneck from hardware prospective

2015-02-17 Thread Julaiti Alafate
Thank you very much for your reply!

My task is to count the number of word pairs in a document. If w1 and w2
occur together in one sentence, the count for the word pair (w1, w2)
increases by 1. So the computational part of this algorithm is simply a
two-level for-loop.
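
To make the two-level loop concrete, here is a minimal plain-Java sketch (no
Spark involved). The class name WordPairCounter and the method countPairs are
made up for illustration, and the sketch assumes that sentences are already
split into words and that (w1, w2) and (w2, w1) count as the same pair; the
real job may differ on both points.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class WordPairCounter {
    // Counts co-occurring word pairs, one sentence at a time.
    static Map<String, Integer> countPairs(List<List<String>> sentences) {
        Map<String, Integer> counts = new HashMap<>();
        for (List<String> sentence : sentences) {
            // The two-level loop: every position i paired with every later position j.
            for (int i = 0; i < sentence.size(); i++) {
                for (int j = i + 1; j < sentence.size(); j++) {
                    String w1 = sentence.get(i);
                    String w2 = sentence.get(j);
                    // Order the pair so (w1, w2) and (w2, w1) share one key (an assumption).
                    String key = w1.compareTo(w2) <= 0 ? w1 + "\t" + w2 : w2 + "\t" + w1;
                    counts.merge(key, 1, Integer::sum);
                }
            }
        }
        return counts;
    }
}
```

A sentence with n words produces n(n-1)/2 pairs, so the work grows
quadratically with sentence length.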

Since the cluster is monitored by Ganglia, I can easily see that neither
CPU nor network IO is under pressure. The only parameter left is memory. In
the Executors tab of the Spark Web UI, I can see a column named "Memory Used".
It showed that only 6GB of 20GB memory is used. I understand this is
measuring the size of the RDDs that persist in memory. So can I at least
assume the data/objects I use in my program are not exceeding the memory limit?

My confusion here is, why can't my program run faster while there is still
sufficient memory, CPU time and network bandwidth it can utilize?

Best regards,
Julaiti


On Tue, Feb 17, 2015 at 12:53 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 What application are you running? Here are a few things:

 - You will hit a bottleneck on CPU if you are doing some complex computation
 (like parsing JSON etc.)
 - You will hit a bottleneck on memory if the data/objects used in your
 program are large (like building and manipulating HashMaps etc. inside your
 map* operations). Here you can set spark.executor.memory to a higher number,
 and you can also change spark.storage.memoryFraction, whose default value is
 0.6 of your executor memory.
 - Network will be a bottleneck if data is not available locally on one of
 the workers and hence has to be collected from the others, which means a lot
 of serialization and data transfer across your cluster.
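
 As a rough back-of-the-envelope check on the memory settings mentioned
 above, the sketch below estimates how much of an executor heap is available
 for cached RDD blocks. It assumes the Spark 1.x behaviour of multiplying the
 heap by spark.storage.memoryFraction and by a safety margin
 (spark.storage.safetyFraction, taken here as 0.9). The class and method
 names are hypothetical, and the JVM usually reports somewhat less usable
 heap than the configured value.

```java
final class StorageMemoryEstimate {
    // Estimated bytes available for cached RDD blocks on one executor.
    // safetyFraction is an assumption about Spark 1.x, not taken from this thread.
    static double storageBytes(double heapBytes, double memoryFraction, double safetyFraction) {
        return heapBytes * memoryFraction * safetyFraction;
    }

    public static void main(String[] args) {
        double gb = 1024.0 * 1024.0 * 1024.0;
        // 20GB executor heap, the 0.6 default quoted above, assumed 0.9 safety margin.
        double bytes = storageBytes(20 * gb, 0.6, 0.9);
        System.out.printf("approx. storage capacity: %.1f GB%n", bytes / gb);
    }
}
```

 Under these assumptions a 20GB executor has roughly 10.8GB for cached data,
 so 6GB of cached RDDs would not fill the storage region.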

 Thanks
 Best Regards

 On Tue, Feb 17, 2015 at 11:20 AM, Julaiti Alafate jalaf...@eng.ucsd.edu
 wrote:

 Hi there,

 I am trying to scale up the data size that my application is handling.
 This application is running on a cluster with 16 slave nodes. Each slave
 node has 60GB memory. It is running in standalone mode. The data is coming
 from HDFS, which is also in the same local network.

 In order to understand how my program is running, I also had Ganglia
 installed on the cluster. From a previous run, I know the stage that takes
 the longest time to run is counting word pairs (my RDD consists of
 sentences from a corpus). My goal is to identify the bottleneck of my
 application, then modify my program or hardware configuration accordingly.

 Unfortunately, I didn't find much information on Spark monitoring and
 optimization topics. Reynold Xin gave a great talk at Spark Summit 2014 on
 application tuning from the tasks perspective. Basically, his focus is on
 tasks that are oddly slower than the average. However, it didn't solve my
 problem because there are no tasks that run much slower than others in my
 case.

 So I tried to identify the bottleneck from the hardware perspective. I want
 to know what the limitation of the cluster is. I think if the executors are
 running hard, either CPU, memory or network bandwidth (or maybe a
 combination) is hitting the roof. But Ganglia reports that the CPU
 utilization of the cluster is no more than 50%, and network utilization is
 high for several seconds at the beginning, then drops close to 0. From the
 Spark UI, I can see that the node with maximum memory usage is consuming
 around 6GB, while spark.executor.memory is set to 20GB.

 I am very confused that the program is not running fast enough, while
 hardware resources are not in shortage. Could you please give me some hints
 about what decides the performance of a Spark application from the hardware
 perspective?

 Thanks!

 Julaiti





Re: ERROR actor.OneForOneStrategy: org.apache.hadoop.conf.Configuration

2015-02-17 Thread Sean Owen
Tip: to debug where the unserializable reference comes from, run with

-Dsun.io.serialization.extendedDebugInfo=true
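
For anyone who wants to see the failure in isolation, here is a small
plain-Java sketch that reproduces a NotSerializableException without Spark or
Hadoop. FakeConfiguration is a hypothetical stand-in for
org.apache.hadoop.conf.Configuration, and TaskState stands in for whatever
object ends up holding the reference; neither name comes from the original
report.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class SerializationCheck {
    // Hypothetical stand-in for a class that does not implement Serializable.
    static final class FakeConfiguration { }

    // Serializable holder that drags a non-serializable field along.
    static final class TaskState implements Serializable {
        private static final long serialVersionUID = 1L;
        final FakeConfiguration conf = new FakeConfiguration();
    }

    // Returns null if obj serializes cleanly, otherwise the exception message.
    static String trySerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The message names the offending class; with the JVM flag above it
        // should also describe the field path that led to it.
        System.out.println(trySerialize(new TaskState()));
    }
}
```

Running it once with and once without the flag shows the difference in the
exception message.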

On Tue, Feb 17, 2015 at 10:20 AM, Jadhav Shweta jadhav.shw...@tcs.com wrote:
 15/02/17 12:57:10 ERROR actor.OneForOneStrategy:
 org.apache.hadoop.conf.Configuration
 java.io.NotSerializableException: org.apache.hadoop.conf.Configuration



Re: Identify the performance bottleneck from hardware prospective

2015-02-17 Thread Arush Kharbanda
Hi

How big is your dataset?

Thanks
Arush

On Tue, Feb 17, 2015 at 4:06 PM, Julaiti Alafate jalaf...@eng.ucsd.edu
wrote:

 Thank you very much for your reply!

 My task is to count the number of word pairs in a document. If w1 and w2
 occur together in one sentence, the count for the word pair (w1, w2)
 increases by 1. So the computational part of this algorithm is simply a
 two-level for-loop.

 Since the cluster is monitored by Ganglia, I can easily see that neither
 CPU nor network IO is under pressure. The only parameter left is memory. In
 the Executors tab of the Spark Web UI, I can see a column named "Memory Used".
 It showed that only 6GB of 20GB memory is used. I understand this is
 measuring the size of the RDDs that persist in memory. So can I at least
 assume the data/objects I use in my program are not exceeding the memory limit?

 My confusion here is, why can't my program run faster while there is still
 sufficient memory, CPU time and network bandwidth it can utilize?

 Best regards,
 Julaiti


 On Tue, Feb 17, 2015 at 12:53 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 What application are you running? Here are a few things:

 - You will hit a bottleneck on CPU if you are doing some complex
 computation (like parsing JSON etc.)
 - You will hit a bottleneck on memory if the data/objects used in your
 program are large (like building and manipulating HashMaps etc. inside your
 map* operations). Here you can set spark.executor.memory to a higher number,
 and you can also change spark.storage.memoryFraction, whose default value is
 0.6 of your executor memory.
 - Network will be a bottleneck if data is not available locally on one of
 the workers and hence has to be collected from the others, which means a lot
 of serialization and data transfer across your cluster.

 Thanks
 Best Regards

 On Tue, Feb 17, 2015 at 11:20 AM, Julaiti Alafate jalaf...@eng.ucsd.edu
 wrote:

 Hi there,

 I am trying to scale up the data size that my application is handling.
 This application is running on a cluster with 16 slave nodes. Each slave
 node has 60GB memory. It is running in standalone mode. The data is coming
 from HDFS, which is also in the same local network.

 In order to understand how my program is running, I also had Ganglia
 installed on the cluster. From a previous run, I know the stage that takes
 the longest time to run is counting word pairs (my RDD consists of
 sentences from a corpus). My goal is to identify the bottleneck of my
 application, then modify my program or hardware configuration accordingly.

 Unfortunately, I didn't find much information on Spark monitoring and
 optimization topics. Reynold Xin gave a great talk at Spark Summit 2014 on
 application tuning from the tasks perspective. Basically, his focus is on
 tasks that are oddly slower than the average. However, it didn't solve my
 problem because there are no tasks that run much slower than others in my
 case.

 So I tried to identify the bottleneck from the hardware perspective. I want
 to know what the limitation of the cluster is. I think if the executors are
 running hard, either CPU, memory or network bandwidth (or maybe a
 combination) is hitting the roof. But Ganglia reports that the CPU
 utilization of the cluster is no more than 50%, and network utilization is
 high for several seconds at the beginning, then drops close to 0. From the
 Spark UI, I can see that the node with maximum memory usage is consuming
 around 6GB, while spark.executor.memory is set to 20GB.

 I am very confused that the program is not running fast enough, while
 hardware resources are not in shortage. Could you please give me some hints
 about what decides the performance of a Spark application from the hardware
 perspective?

 Thanks!

 Julaiti






-- 

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Cleanup Questions

2015-02-17 Thread Ashic Mahtab
Two questions regarding worker cleanup:
1) Is the best place to enable worker cleanup setting
export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=30"
in conf/spark-env.sh for each worker? Or is there a better place?
2) I see this has a default TTL of 7 days. I've got some Spark Streaming jobs
that run for more than 7 days. Would the Spark cleanup honour currently running
applications and not discard the data, or will it obliterate everything that's
older than 7 days? If the latter, what's a good approach to clean up,
considering my streaming app will be running for longer than the TTL period?

Thanks,
Ashic.

Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-17 Thread Emre Sevinc
I've decided to try

  spark-submit ... --conf
spark.driver.extraJavaOptions=-DpropertiesFile=/home/emre/data/myModule.properties

But when I try to retrieve the value of propertiesFile via

   System.err.println("propertiesFile : " +
System.getProperty("propertiesFile"));

I get NULL:

   propertiesFile : null

Interestingly, when I run spark-submit with --verbose, I see that it prints:

  spark.driver.extraJavaOptions ->
-DpropertiesFile=/home/emre/data/belga/schemavalidator.properties

I couldn't understand why I couldn't get to the value of propertiesFile
by using the standard System.getProperty method. (I can use new
SparkConf().get("spark.driver.extraJavaOptions") and manually parse it
to retrieve the value, but I'd like to know why I cannot retrieve that
value using the System.getProperty method.)
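
The manual-parsing fallback mentioned above could look roughly like the
following sketch. JavaOptionsParser and parseSystemProperties are made-up
names, and the sketch assumes the options are separated by whitespace and
that values contain no quoted spaces.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class JavaOptionsParser {
    // Extracts -Dkey=value pairs from a whitespace-separated options string.
    static Map<String, String> parseSystemProperties(String javaOptions) {
        Map<String, String> props = new LinkedHashMap<>();
        if (javaOptions == null) {
            return props;
        }
        for (String token : javaOptions.trim().split("\\s+")) {
            if (!token.startsWith("-D") || token.length() == 2) {
                continue; // not a system property definition
            }
            int eq = token.indexOf('=');
            if (eq < 0) {
                props.put(token.substring(2), ""); // -Dflag with no value
            } else {
                props.put(token.substring(2, eq), token.substring(eq + 1));
            }
        }
        return props;
    }
}
```

It would be called as
JavaOptionsParser.parseSystemProperties(new SparkConf().get("spark.driver.extraJavaOptions")).get("propertiesFile").
This only works around the problem; it does not explain why
System.getProperty returns null.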

Any ideas?

If I can achieve what I've described above properly, I plan to pass a
properties file that resides on HDFS, so that it will be available to my
driver program wherever that program runs.

--
Emre




On Mon, Feb 16, 2015 at 4:41 PM, Charles Feduke charles.fed...@gmail.com
wrote:

 I haven't actually tried mixing non-Spark settings into the Spark
 properties. Instead I package my properties into the jar and use the
 Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala
 specific) to get at my properties:

 Properties file: src/main/resources/integration.conf

 (below $ENV might be set to either integration or prod[3])

 ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \
 --conf 'config.resource=$ENV.conf' \
 --conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf'

 Since the properties file is packaged up with the JAR I don't have to
 worry about sending the file separately to all of the slave nodes. Typesafe
 Config is written in Java so it will work if you're not using Scala. (The
 Typesafe Config also has the advantage of being extremely easy to integrate
 with code that is using Java Properties today.)

 If you instead want to send the file separately from the JAR and you use
 the Typesafe Config library, you can specify config.file instead of
 .resource; though I'd point you to [3] below if you want to make your
 development life easier.

 1. https://github.com/typesafehub/config
 2. https://github.com/ceedubs/ficus
 3.
 http://deploymentzone.com/2015/01/27/spark-ec2-and-easy-spark-shell-deployment/



 On Mon Feb 16 2015 at 10:27:01 AM Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I'm using Spark 1.2.1 and have a module.properties file, and in it I have
 non-Spark properties, as well as Spark properties, e.g.:

job.output.dir=file:///home/emre/data/mymodule/out

 I'm trying to pass it to spark-submit via:

spark-submit --class com.myModule --master local[4] --deploy-mode
 client --verbose --properties-file /home/emre/data/mymodule.properties
 mymodule.jar

 And I thought I could read the value of my non-Spark property, namely,
 job.output.dir by using:

 SparkConf sparkConf = new SparkConf();
 final String validatedJSONoutputDir = sparkConf.get("job.output.dir");

 But it gives me an exception:

 Exception in thread "main" java.util.NoSuchElementException:
 job.output.dir

 Is it not possible to mix Spark and non-Spark properties in a single
 .properties file, then pass it via --properties-file and then get the
 values of those non-Spark properties via SparkConf?

 Or is there another object / method to retrieve the values for those
 non-Spark properties?
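
 One likely explanation is that SparkConf only picks up keys that start with
 "spark.", so job.output.dir never reaches it. A possible workaround,
 sketched here with plain java.util.Properties, is to read the same file
 directly in the driver. ModuleProperties, read and load are hypothetical
 names, and the sketch assumes the file is readable from wherever the driver
 runs.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

final class ModuleProperties {
    // Parses .properties content from any reader, Spark-related keys or not.
    static Properties read(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Loads a .properties file from the local file system (UTF-8 assumed).
    static Properties load(String path) {
        try (Reader reader = Files.newBufferedReader(Paths.get(path), StandardCharsets.UTF_8)) {
            return read(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

 With this, ModuleProperties.load("/home/emre/data/mymodule.properties")
 .getProperty("job.output.dir") would return the value, independently of
 SparkConf.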


 --
 Emre Sevinç




-- 
Emre Sevinc