Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-19 Thread Cody Koeninger
Is there a reason not to just use scala?  It's not a lot of code... and
it'll be even less code in scala ;)

On Wed, Aug 19, 2015 at 4:14 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 To correct myself - KafkaRDD[K, V, U, T, R] is a subclass of RDD[R], but
 Option[KafkaRDD[K, V, U, T, R]] is not a subclass of Option[RDD[R]];

 In Scala, C[T'] is a subclass of C[T] when T' is a subclass of T and C is
 covariant, as per https://twitter.github.io/scala_school/type-basics.html,
 but this is not allowed in Java.
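
 A minimal sketch of the variance difference (illustrative types only, not
 Spark classes; the point is that Option is declared as Option[+A] in Scala):

 class Base
 class Sub extends Base

 abstract class Parent {
   def compute(): Option[Base]
 }

 class Child extends Parent {
   // Compiles in Scala: Option[Sub] <: Option[Base] because Option is covariant.
   override def compute(): Option[Sub] = Some(new Sub)
 }

 The equivalent override is rejected by javac because Java generics are
 invariant, which is exactly the compile error quoted further down this thread.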

 So is there any workaround to achieve this in Java for overriding
 DirectKafkaInputDStream?


 On Wed, Aug 19, 2015 at 12:45 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 But KafkaRDD[K, V, U, T, R] is not a subclass of RDD[R] as per Java; generic
 covariance is not supported, so a derived class cannot return a differently
 parameterized generic subclass from an overridden method.

 On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Option is covariant and KafkaRDD is a subclass of RDD

 On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Is it that in Scala a derived class is allowed to have a covariant return
 type?

  And since the streaming jar is originally written in Scala, it is allowed for
 DirectKafkaInputDStream to return Option[KafkaRDD[K, V, U, T, R]] from its
 compute method?

 On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 looking at source code of
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream

 override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
   val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
   val rdd = KafkaRDD[K, V, U, T, R](
     context.sparkContext, kafkaParams, currentOffsets, untilOffsets,
     messageHandler)

   currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
   Some(rdd)
 }


 But in DStream it is def compute(validTime: Time): Option[RDD[T]],

 So what should be the return type of a custom DStream that extends
 DirectKafkaInputDStream?
 I want the behaviour to be the same as DirectKafkaInputDStream in normal
 scenarios, and to return None in a specific scenario.

 And why did the same error not come up when DirectKafkaInputDStream itself
 extends InputDStream? Since the new return type Option[KafkaRDD[K,
 V, U, T, R]] is not a subclass of Option[RDD[T]], it should have failed in
 the same way?




 On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger c...@koeninger.org
 wrote:

 The superclass method in DStream is defined as returning an
 Option[RDD[T]]

 On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Getting compilation error while overriding compute method of
 DirectKafkaInputDStream.


 [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
 compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
 cannot override compute(org.apache.spark.streaming.Time) in
 org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
 return type

 [ERROR] found   :
 scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>

 [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>


 class :

 public class CustomDirectKafkaInputDstream extends
 DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
 kafka.serializer.DefaultDecoder, byte[][]> {

 @Override
 public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(
     Time validTime) {

   int processed = processedCounter.value();
   int failed = failedProcessingsCounter.value();
   if (processed == failed) {
     System.out.println("backing off since its 100 % failure");
     return Option.empty();
   } else {
     System.out.println("starting the stream ");
     return super.compute(validTime);
   }
 }
 }


 What should be the return type of the compute method? The superclass is
 returning Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
 byte[][]>> but it is expecting
 scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class. Is
 there something wrong with the code?

 On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Look at the definitions of the java-specific
 KafkaUtils.createDirectStream methods (the ones that take a
 JavaStreamingContext)

 On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 How to create a ClassTag in Java? Also, the constructor
 of DirectKafkaInputDStream takes a Function1, not a Function, but
 KafkaUtils.createDirectStream allows a Function.

 I have below as overriden DirectKafkaInputDStream.


 public class CustomDirectKafkaInputDstream extends
 DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
 kafka.serializer.DefaultDecoder, byte[][]> {

 public CustomDirectKafkaInputDstream(
     StreamingContext ssc_,
     Map<String, String> kafkaParams,
     Map<TopicAndPartition, Object> fromOffsets,
     Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
     ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
 

Re: What's the best practice for developing new features for spark ?

2015-08-19 Thread Zoltán Zvara
I personally build with SBT and run Spark on YARN with IntelliJ. You need
to connect to the remote JVMs with a remote debugger. You also need to do
something similar if you use Python, because it will launch a JVM on the
driver as well.

On Wed, Aug 19, 2015 at 2:10 PM canan chen ccn...@gmail.com wrote:

 Thanks Ted.  I notice another thread about running spark programmatically
 (client mode for standalone and yarn). Would it be much easier to debug
 spark if this is possible? Hasn't anyone thought about it?

 On Wed, Aug 19, 2015 at 5:50 PM, Ted Yu yuzhih...@gmail.com wrote:

 See this thread:


 http://search-hadoop.com/m/q3RTtdZv0d1btRHl/Spark+build+modulesubj=Building+Spark+Building+just+one+module+



  On Aug 19, 2015, at 1:44 AM, canan chen ccn...@gmail.com wrote:
 
  I want to work on one JIRA, but it is not easy to unit test because
 it involves different components, especially the UI. Building Spark is pretty
 slow, and I don't want to rebuild it each time I test a code change. I am
 wondering how other people do this? Is there any experience you can share? Thanks
 
 





RE: Scala: How to match a java object????

2015-08-19 Thread Saif.A.Ellafi
Hi, thank you all for the assistance.

It is odd, it works when creating a new java.math.BigDecimal object, but not if
I work directly with

scala> 5 match { case x: java.math.BigDecimal => 2 }
<console>:23: error: scrutinee is incompatible with pattern type;
 found   : java.math.BigDecimal
 required: Int
       5 match { case x: java.math.BigDecimal => 2 }

I will try and see how it works for my Seq[Any] and see. Thanks for the work 
arounds.
Saif

From: Sujit Pal [mailto:sujitatgt...@gmail.com]
Sent: Tuesday, August 18, 2015 6:25 PM
To: Ellafi, Saif A.
Cc: wrbri...@gmail.com; user
Subject: Re: Scala: How to match a java object

Hi Saif,

Would this work?

import scala.collection.JavaConversions._

new java.math.BigDecimal(5) match { case x: java.math.BigDecimal =>
x.doubleValue }

It gives me this on the Scala console:

res9: Double = 5.0

Assuming you had a stream of BigDecimals, you could just call map on it.

myBigDecimals.map(_.doubleValue)

to get your Seq of Doubles. You will need the JavaConversions._ import to allow 
Java Doubles to be treated by Scala as Scala Doubles.

-sujit

On Tue, Aug 18, 2015 at 12:59 PM, 
saif.a.ell...@wellsfargo.commailto:saif.a.ell...@wellsfargo.com wrote:
Hi, thank you for further assistance

you can reproduce this by simply running

5 match { case java.math.BigDecimal => 2 }

In my personal case, I am applying a map action to a Seq[Any], so the elements
inside are of type Any, to which I need to apply a proper
.asInstanceOf[WhoYouShouldBe].

Saif

From: William Briggs [mailto:wrbri...@gmail.commailto:wrbri...@gmail.com]
Sent: Tuesday, August 18, 2015 4:46 PM
To: Ellafi, Saif A.; user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Scala: How to match a java object


Could you share your pattern matching expression that is failing?

On Tue, Aug 18, 2015, 3:38 PM  
saif.a.ell...@wellsfargo.commailto:saif.a.ell...@wellsfargo.com wrote:
Hi all,

I am trying to run a spark job, in which I receive java.math.BigDecimal 
objects, instead of the scala equivalents, and I am trying to convert them into 
Doubles.
If I try to match-case this object class, I get: “error: object 
java.math.BigDecimal is not a value”

How could I get around matching Java objects? I would like to avoid multiple
try-catches on ClassCastException for all my checks.

Thank you,
Saif
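
A minimal sketch of the type-test approach discussed above, applied to a
Seq[Any] (the values and the Int case are illustrative assumptions):

import java.math.{BigDecimal => JBigDecimal}

val mixed: Seq[Any] = Seq(new JBigDecimal("5"), 3, "ignore me")

val doubles: Seq[Double] = mixed.collect {
  // a type test (`case b: JBigDecimal`) works; matching on the class name as a
  // value (`case java.math.BigDecimal`) is what produces the "not a value" error
  case b: JBigDecimal => b.doubleValue
  case i: Int         => i.toDouble
}
// doubles == Seq(5.0, 3.0)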




RE: Scala: How to match a java object????

2015-08-19 Thread Saif.A.Ellafi
It is okay. this methodology works very well for mapping objects of my Seq[Any].

It is indeed very cool :-)

Saif

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Wednesday, August 19, 2015 10:47 AM
To: Ellafi, Saif A.
Cc: sujitatgt...@gmail.com; wrbri...@gmail.com; user@spark.apache.org
Subject: Re: Scala: How to match a java object

Saif:
In your example below, the error was due to there being no automatic conversion
from Int to BigDecimal.

Cheers


On Aug 19, 2015, at 6:40 AM, 
saif.a.ell...@wellsfargo.commailto:saif.a.ell...@wellsfargo.com 
saif.a.ell...@wellsfargo.commailto:saif.a.ell...@wellsfargo.com wrote:
Hi, thank you all for the assistance.

It is odd, it works when creating a new java.math.BigDecimal object, but not if
I work directly with

scala> 5 match { case x: java.math.BigDecimal => 2 }
<console>:23: error: scrutinee is incompatible with pattern type;
 found   : java.math.BigDecimal
 required: Int
       5 match { case x: java.math.BigDecimal => 2 }

I will try and see how it works for my Seq[Any] and see. Thanks for the work 
arounds.
Saif

From: Sujit Pal [mailto:sujitatgt...@gmail.com]
Sent: Tuesday, August 18, 2015 6:25 PM
To: Ellafi, Saif A.
Cc: wrbri...@gmail.commailto:wrbri...@gmail.com; user
Subject: Re: Scala: How to match a java object

Hi Saif,

Would this work?

import scala.collection.JavaConversions._

new java.math.BigDecimal(5) match { case x: java.math.BigDecimal =>
x.doubleValue }

It gives me this on the Scala console:

res9: Double = 5.0

Assuming you had a stream of BigDecimals, you could just call map on it.

myBigDecimals.map(_.doubleValue)

to get your Seq of Doubles. You will need the JavaConversions._ import to allow 
Java Doubles to be treated by Scala as Scala Doubles.

-sujit

On Tue, Aug 18, 2015 at 12:59 PM, 
saif.a.ell...@wellsfargo.commailto:saif.a.ell...@wellsfargo.com wrote:
Hi, thank you for further assistance

you can reproduce this by simply running

5 match { case java.math.BigDecimal => 2 }

In my personal case, I am applying a map action to a Seq[Any], so the elements
inside are of type Any, to which I need to apply a proper
.asInstanceOf[WhoYouShouldBe].

Saif

From: William Briggs [mailto:wrbri...@gmail.commailto:wrbri...@gmail.com]
Sent: Tuesday, August 18, 2015 4:46 PM
To: Ellafi, Saif A.; user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Scala: How to match a java object


Could you share your pattern matching expression that is failing?

On Tue, Aug 18, 2015, 3:38 PM  
saif.a.ell...@wellsfargo.commailto:saif.a.ell...@wellsfargo.com wrote:
Hi all,

I am trying to run a spark job, in which I receive java.math.BigDecimal 
objects, instead of the scala equivalents, and I am trying to convert them into 
Doubles.
If I try to match-case this object class, I get: “error: object 
java.math.BigDecimal is not a value”

How could I get around matching Java objects? I would like to avoid multiple
try-catches on ClassCastException for all my checks.

Thank you,
Saif




Re: What is the reason for ExecutorLostFailure?

2015-08-19 Thread VIJAYAKUMAR JAWAHARLAL
Hints are good. Thanks Corey. I will try to find out more from the logs.

 On Aug 18, 2015, at 7:23 PM, Corey Nolet cjno...@gmail.com wrote:
 
 Usually more information as to the cause of this will be found down in your 
 logs. I generally see this happen when an out of memory exception has 
 occurred for one reason or another on an executor. It's possible your memory 
 settings are too small per executor, or the concurrent number of tasks you are 
 running is too large for some of the executors. Other times, it's possible 
 using RDD functions like groupBy() that collect an unbounded amount of items 
 into memory could be causing it. 
 
 Either way, the logs for the executors should be able to give you some 
 insight, have you looked at those yet?
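
 A minimal sketch of the groupBy point above (keys and values are illustrative;
 `sc` is assumed to be an existing SparkContext, e.g. in spark-shell):

 val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

 // groupByKey collects every value for a key on one executor before summing:
 val viaGroup = pairs.groupByKey().mapValues(_.sum)

 // reduceByKey combines partial sums map-side, so far less data is shuffled
 // and held in memory at once:
 val viaReduce = pairs.reduceByKey(_ + _)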
 
 On Tue, Aug 18, 2015 at 6:26 PM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io 
 mailto:sparkh...@data2o.io wrote:
 Hi All
 
 Why am I getting ExecutorLostFailure, and why are executors completely lost for 
 the rest of the processing? Eventually it makes the job fail. One thing is for 
 sure: a lot of shuffling happens across executors in my program. 
 
 Is there a way to understand and debug ExecutorLostFailure? Any pointers 
 regarding “ExecutorLostFailure” would help me a lot.
 
 Thanks
 Vijay
 



Re: What's the best practice for developing new features for spark ?

2015-08-19 Thread canan chen
Thanks Ted.  I notice another thread about running spark programmatically
(client mode for standalone and yarn). Would it be much easier to debug
spark if this is possible? Hasn't anyone thought about it?

On Wed, Aug 19, 2015 at 5:50 PM, Ted Yu yuzhih...@gmail.com wrote:

 See this thread:


 http://search-hadoop.com/m/q3RTtdZv0d1btRHl/Spark+build+modulesubj=Building+Spark+Building+just+one+module+



  On Aug 19, 2015, at 1:44 AM, canan chen ccn...@gmail.com wrote:
 
  I want to work on one JIRA, but it is not easy to unit test because
 it involves different components, especially the UI. Building Spark is pretty
 slow, and I don't want to rebuild it each time I test a code change. I am
 wondering how other people do this? Is there any experience you can share? Thanks
 
 



Re: Scala: How to match a java object????

2015-08-19 Thread Ted Yu
Saif:
In your example below, the error was due to there being no automatic conversion 
from Int to BigDecimal. 

Cheers



 On Aug 19, 2015, at 6:40 AM, saif.a.ell...@wellsfargo.com 
 saif.a.ell...@wellsfargo.com wrote:
 
 Hi, thank you all for the assistance.
  
 It is odd, it works when creating a new java.math.BigDecimal object, but not 
 if I work directly with
  
 scala> 5 match { case x: java.math.BigDecimal => 2 }
 <console>:23: error: scrutinee is incompatible with pattern type;
  found   : java.math.BigDecimal
  required: Int
        5 match { case x: java.math.BigDecimal => 2 }
  
 I will try and see how it works for my Seq[Any] and see. Thanks for the work 
 arounds.
 Saif
  
 From: Sujit Pal [mailto:sujitatgt...@gmail.com] 
 Sent: Tuesday, August 18, 2015 6:25 PM
 To: Ellafi, Saif A.
 Cc: wrbri...@gmail.com; user
 Subject: Re: Scala: How to match a java object
  
 Hi Saif,
  
 Would this work?
 import scala.collection.JavaConversions._
 
 new java.math.BigDecimal(5) match { case x: java.math.BigDecimal =>
 x.doubleValue }
 
 It gives me this on the Scala console:
 
 res9: Double = 5.0
 
 Assuming you had a stream of BigDecimals, you could just call map on it.
 
 myBigDecimals.map(_.doubleValue)
 
 to get your Seq of Doubles. You will need the JavaConversions._ import to 
 allow Java Doubles to be treated by Scala as Scala Doubles.
 
 -sujit
 
  
 On Tue, Aug 18, 2015 at 12:59 PM, saif.a.ell...@wellsfargo.com wrote:
 Hi, thank you for further assistance
  
 you can reproduce this by simply running
  
 5 match { case java.math.BigDecimal => 2 }
  
 In my personal case, I am applying a map action to a Seq[Any], so the elements 
 inside are of type Any, to which I need to apply a proper 
 .asInstanceOf[WhoYouShouldBe].
  
 Saif
  
 From: William Briggs [mailto:wrbri...@gmail.com] 
 Sent: Tuesday, August 18, 2015 4:46 PM
 To: Ellafi, Saif A.; user@spark.apache.org
 Subject: Re: Scala: How to match a java object
  
 Could you share your pattern matching expression that is failing?
 
  
 On Tue, Aug 18, 2015, 3:38 PM  saif.a.ell...@wellsfargo.com wrote:
 Hi all,
  
 I am trying to run a spark job, in which I receive java.math.BigDecimal 
 objects, instead of the scala equivalents, and I am trying to convert them 
 into Doubles.
 If I try to match-case this object class, I get: “error: object 
 java.math.BigDecimal is not a value”
  
 How could I get around matching Java objects? I would like to avoid 
 multiple try-catches on ClassCastException for all my checks.
  
 Thank you,
 Saif
  
  


Re: Why use spark.history.fs.logDirectory instead of spark.eventLog.dir

2015-08-19 Thread canan chen
Anyone know about this ? Or do I miss something here ?

On Fri, Aug 7, 2015 at 4:20 PM, canan chen ccn...@gmail.com wrote:

 Is there any reason that historyserver use another property for the event
 log dir ? Thanks



Re: Difference between Sort based and Hash based shuffle

2015-08-19 Thread Muhammad Haseeb Javed
Thanks Andrew for a detailed response,

So the reason why key-value pairs with the same key are always found in a
single bucket in hash-based shuffle but not in sort-based shuffle is that in
sort-based shuffle each mapper writes a single partitioned file, and it is up
to the reducer to fetch the correct partitions from the files?

On Wed, Aug 19, 2015 at 2:13 AM, Andrew Or and...@databricks.com wrote:

 Hi Muhammad,

 On a high level, in hash-based shuffle each mapper M writes R shuffle
 files, one for each reducer where R is the number of reduce partitions.
 This results in M * R shuffle files. Since it is not uncommon for M and R
 to be O(1000), this quickly becomes expensive. An optimization with
 hash-based shuffle is consolidation, where all mappers running on the same
 core C write one file per reducer, resulting in C * R files. This is a strict
 improvement, but it is still relatively expensive.

 Instead, in sort-based shuffle each mapper writes a single partitioned
 file. This allows a particular reducer to request a specific portion of
 each mapper's single output file. In more detail, the mapper first fills up
 an internal buffer in memory and continually spills the contents of the
 buffer to disk, then finally merges all the spilled files together to form
 one final output file. This places much less stress on the file system and
 requires much fewer I/O operations especially on the read side.
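
 As a rough worked example using the formulas above (numbers are illustrative):
 with M = 1,000 map tasks and R = 1,000 reduce partitions, hash-based shuffle
 writes M * R = 1,000,000 files; consolidation over C = 16 cores brings that
 down to C * R = 16,000 files; sort-based shuffle writes one partitioned output
 file per mapper, i.e. on the order of M = 1,000 files.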

 -Andrew



 2015-08-16 11:08 GMT-07:00 Muhammad Haseeb Javed 
 11besemja...@seecs.edu.pk:

 I did check it out, and although I did get a general understanding of the
 various classes used to implement sort and hash shuffles, the slides lack
 details as to how they are implemented and why sort generally
 has better performance than hash.

 On Sun, Aug 16, 2015 at 4:31 AM, Ravi Kiran ravikiranmag...@gmail.com
 wrote:

 Have a look at this presentation.
 http://www.slideshare.net/colorant/spark-shuffle-introduction . Can be
 of help to you.

 On Sat, Aug 15, 2015 at 1:42 PM, Muhammad Haseeb Javed 
 11besemja...@seecs.edu.pk wrote:

 What are the major differences between how sort-based and hash-based
 shuffle operate, and what is it that causes sort shuffle to perform better
 than hash?
 Any talks that discuss both shuffles in detail, how they are
 implemented and the performance gains?







Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell

2015-08-19 Thread Igor Berman
any differences in number of cores, memory settings for executors?


On 19 August 2015 at 09:49, Rick Moritz rah...@gmail.com wrote:

 Dear list,

 I am observing a very strange difference in behaviour between a Spark
 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark 1.4.0 zeppelin
 interpreter (compiled with Java 6 and sourced from maven central).

 The workflow loads data from Hive, applies a number of transformations
 (including quite a lot of shuffle operations) and then presents an enriched
 dataset. The code (and resulting DAGs) are identical in each case.

 The following particularities are noted:
 Importing the HiveRDD and caching it yields identical results on both
 platforms.
 Applying case classes, leads to a 2-2.5MB increase in dataset size per
 partition (excepting empty partitions).

 Writing shuffles shows this much more significant result:

 Zeppelin:
 Total Time Across All Tasks:  2.6 min
 Input Size / Records:  2.4 GB / 7314771
 Shuffle Write:  673.5 MB / 7314771

 vs

 Spark-shell:
 Total Time Across All Tasks:  28 min
 Input Size / Records:  3.6 GB / 7314771
 Shuffle Write:  9.0 GB / 7314771

 This is one of the early stages, which reads from a cached partition and
 then feeds into a join-stage. The latter stages show similar behaviour in
 producing excessive shuffle spills.

 Quite often the excessive shuffle volume will lead to massive shuffle
 spills which ultimately kill not only performance, but the actual executors
 as well.

 I have examined the Environment tab in the SparkUI and identified no
 notable difference besides FAIR (Zeppelin) vs FIFO (spark-shell) scheduling
 mode. I fail to see how this would impact shuffle writes in such a drastic
 way, since it should be on the inter-job level, while this happens at the
 inter-stage level.

 I was somewhat suspicious of maybe compression or serialization playing a
 role, but the SparkConf points to those being set to the default. Also
 Zeppelin's interpreter adds no relevant additional default parameters.
 I performed a diff between rc4 (which was later released) and 1.4.0 and as
 expected there were no differences, besides a single class (remarkably, a
 shuffle-relevant class:
 /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class )
 differing in its binary representation due to being compiled with Java 7
 instead of Java 6. The decompiled sources of those two are again identical.

 I may attempt as a next step to simply replace that file in the packaged
 jar, to ascertain that indeed there is no difference between the two
 versions, but would consider this to be a major bug, if a simple compiler
 change leads to this kind of issue.

 I am also open to any other ideas, in particular to verify that the same
 compression/serialization is indeed happening, and regarding ways to
 determine what exactly is written into these shuffles -- currently I only
 know that the tuples are bigger (or smaller) than they ought to be. The
 Zeppelin-obtained results do appear to be consistent at least, thus the
 suspicion is, that there is an issue with the process launched from
 spark-shell. I will also attempt to build a spark job and spark-submit it
 using different spark-binaries to further explore the issue.

 Best Regards,

 Rick Moritz

 PS: I already tried to send this mail yesterday, but it never made it onto
 the list, as far as I can tell -- I apologize should anyone receive this as
 a second copy.




Re: Spark return key value pair

2015-08-19 Thread Dawid Wysakowicz
I am not 100% sure but probably flatMap unwinds the tuples. Try with map
instead.

2015-08-19 13:10 GMT+02:00 Jerry OELoo oylje...@gmail.com:

 Hi.
 I want to parse a file and return a key-value pair with PySpark, but the
 result is strange to me.
 test.sql is a big file and each line is a username and password, with
 "#" between them. I use the mapper2 below to map the data, and in my
 understanding, i in words.take(10) should be a tuple, but the result
 is that i is a username or a password. This is strange for me to
 understand. Thanks for your help.

 def mapper2(line):
     words = line.split('#')
     return (words[0].strip(), words[1].strip())

 def main2(sc):
     lines = sc.textFile("hdfs://master:9000/spark/test.sql")
     words = lines.flatMap(mapper2)

     for i in words.take(10):
         msg = i + ":" + "\n"


 --
 Rejoice,I Desire!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to automatically relaunch a Driver program after crashes?

2015-08-19 Thread William Briggs
When submitting to YARN, you can specify two different operation modes for
the driver with the --master parameter: yarn-client or yarn-cluster. For
more information on submitting to YARN, see this page in the Spark docs:
http://spark.apache.org/docs/latest/running-on-yarn.html

yarn-cluster mode will run the driver inside of the Application Master,
which will be retried on failure. The number of retries is dependent on the
yarn.resourcemanager.am.max-attempts configuration setting for the YARN
ResourceManager.

Regards,
Will

On Wed, Aug 19, 2015 at 2:55 AM, Spark Enthusiast sparkenthusi...@yahoo.in
wrote:

 Folks,

 As I see, the Driver program is a single point of failure. Now, I have
 seen ways as to how to make it recover from failures on a restart (using
 Checkpointing) but I have not seen anything as to how to restart it
 automatically if it crashes.

 Will running the Driver as a Hadoop Yarn Application do it? Can someone
 educate me as to how?



Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell

2015-08-19 Thread Igor Berman
I would compare Spark UI metrics for both cases and see any
differences (number of partitions, number of spills etc).
Why can't you make the REPL consistent with the Zeppelin Spark version?
The RC might have issues...




On 19 August 2015 at 14:42, Rick Moritz rah...@gmail.com wrote:

 No, the setup is one driver with 32g of memory, and three executors each
 with 8g of memory in both cases. No core-number has been specified, thus it
 should default to single-core (though I've seen the yarn-owned jvms
 wrapping the executors take up to 3 cores in top). That is, unless, as I
 suggested, there are different defaults for the two means of job submission
 that come into play in a non-transparent fashion (i.e. not visible in
 SparkConf).

 On Wed, Aug 19, 2015 at 1:36 PM, Igor Berman igor.ber...@gmail.com
 wrote:

 any differences in number of cores, memory settings for executors?


 On 19 August 2015 at 09:49, Rick Moritz rah...@gmail.com wrote:

 Dear list,

 I am observing a very strange difference in behaviour between a Spark
 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark 1.4.0 zeppelin
 interpreter (compiled with Java 6 and sourced from maven central).

 The workflow loads data from Hive, applies a number of transformations
 (including quite a lot of shuffle operations) and then presents an enriched
 dataset. The code (and resulting DAGs) are identical in each case.

 The following particularities are noted:
 Importing the HiveRDD and caching it yields identical results on both
 platforms.
 Applying case classes, leads to a 2-2.5MB increase in dataset size per
 partition (excepting empty partitions).

 Writing shuffles shows this much more significant result:

 Zeppelin:
 Total Time Across All Tasks:  2.6 min
 Input Size / Records:  2.4 GB / 7314771
 Shuffle Write:  673.5 MB / 7314771

 vs

 Spark-shell:
 Total Time Across All Tasks:  28 min
 Input Size / Records:  3.6 GB / 7314771
 Shuffle Write:  9.0 GB / 7314771

 This is one of the early stages, which reads from a cached partition and
 then feeds into a join-stage. The latter stages show similar behaviour in
 producing excessive shuffle spills.

 Quite often the excessive shuffle volume will lead to massive shuffle
 spills which ultimately kill not only performance, but the actual executors
 as well.

 I have examined the Environment tab in the SparkUI and identified no
 notable difference besides FAIR (Zeppelin) vs FIFO (spark-shell) scheduling
 mode. I fail to see how this would impact shuffle writes in such a drastic
 way, since it should be on the inter-job level, while this happens at the
 inter-stage level.

 I was somewhat suspicious of maybe compression or serialization playing a
 role, but the SparkConf points to those being set to the default. Also
 Zeppelin's interpreter adds no relevant additional default parameters.
 I performed a diff between rc4 (which was later released) and 1.4.0 and
 as expected there were no differences, besides a single class (remarkably,
 a shuffle-relevant class:
 /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class )
 differing in its binary representation due to being compiled with Java 7
 instead of Java 6. The decompiled sources of those two are again identical.

 I may attempt as a next step to simply replace that file in the packaged
 jar, to ascertain that indeed there is no difference between the two
 versions, but would consider this to be a major bug, if a simple compiler
 change leads to this kind of issue.

 I am also open to any other ideas, in particular to verify that the same
 compression/serialization is indeed happening, and regarding ways to
 determine what exactly is written into these shuffles -- currently I only
 know that the tuples are bigger (or smaller) than they ought to be. The
 Zeppelin-obtained results do appear to be consistent at least, thus the
 suspicion is, that there is an issue with the process launched from
 spark-shell. I will also attempt to build a spark job and spark-submit it
 using different spark-binaries to further explore the issue.

 Best Regards,

 Rick Moritz

 PS: I already tried to send this mail yesterday, but it never made it
 onto the list, as far as I can tell -- I apologize should anyone receive
 this as a second copy.






Fwd: Strange shuffle behaviour difference between Zeppelin and Spark-shell

2015-08-19 Thread Rick Moritz
oops, forgot to reply-all on this thread.
-- Forwarded message --
From: Rick Moritz rah...@gmail.com
Date: Wed, Aug 19, 2015 at 2:46 PM
Subject: Re: Strange shuffle behaviour difference between Zeppelin and
Spark-shell
To: Igor Berman igor.ber...@gmail.com


Those values are not explicitly set, and attempting to read their values
results in 'java.util.NoSuchElementException: spark.shuffle.spill.compress'.
What I mean by the volume per element being larger is illustrated in my
original post: for each case the number of elements is identical, but the
volume of data required to obtain/manage these elements is many times
greater.

The only difference used to be that Zeppelin had FAIR scheduling over FIFO
scheduling for spark-shell. I just verified that spark-shell with FAIR
scheduling makes no difference. The only other difference in the
environment lies in some class-path variables which should only affect
method availability, not actual usage.

Another fact to note: Spark assembly (1.4.0-rc4) was built with provided
hadoop dependencies (build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0
-Phadoop-provided -Phive -Phive-thriftserver -Psparkr -DskipTests clean
package) for 2.6.0 from Hortonworks, while Zeppelin was built with
dependencies against 2.6.0 from Maven central.

On Wed, Aug 19, 2015 at 2:08 PM, Igor Berman igor.ber...@gmail.com wrote:

 So what is your case for version differences?
 What do you mean by "in spark-shell the volume per element is much larger"?
 Can you verify that the configuration in the Spark UI (under the Environment
 tab) is the same?
 If you suspect compression, then check the following properties:
 spark.shuffle.compress
 spark.shuffle.spill.compress
 spark.io.compression.codec
 spark.rdd.compress
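
 A minimal sketch of checking these keys with explicit fallbacks, so that unset
 keys do not throw the NoSuchElementException mentioned above (assumes `sc` is
 the SparkContext available in spark-shell/Zeppelin; "<not set>" is just a
 placeholder fallback, not the real default value):

 val conf = sc.getConf
 Seq("spark.shuffle.compress",
     "spark.shuffle.spill.compress",
     "spark.io.compression.codec",
     "spark.rdd.compress").foreach { key =>
   println(key + " = " + conf.get(key, "<not set>"))
 }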



 On 19 August 2015 at 15:03, Rick Moritz rah...@gmail.com wrote:

 Number of partitions and even size look relatively similar - except in
 spark-shell the volume per element is much larger, especially in later
 stages. That's when shuffles start to spill. Zeppelin creates almost no
 spills at all. The number of elements per partition are the same for both
 setups, but with very different data volume in/out. Almost as though
 compression was used in one case, and not in another, or as though
 shuffling is somehow less specific, and more nodes get data that they
 ultimately don't process at all. The same shuffling algorithm appears to be
 at work in each case, if the partitioning of the number of elements is
 anything to go by.

 On Wed, Aug 19, 2015 at 1:58 PM, Igor Berman igor.ber...@gmail.com
 wrote:

 I would compare Spark UI metrics for both cases and see any
 differences (number of partitions, number of spills etc).
 Why can't you make the REPL consistent with the Zeppelin Spark version?
 The RC might have issues...




 On 19 August 2015 at 14:42, Rick Moritz rah...@gmail.com wrote:

 No, the setup is one driver with 32g of memory, and three executors
 each with 8g of memory in both cases. No core-number has been specified,
 thus it should default to single-core (though I've seen the yarn-owned jvms
 wrapping the executors take up to 3 cores in top). That is, unless, as I
 suggested, there are different defaults for the two means of job submission
 that come into play in a non-transparent fashion (i.e. not visible in
 SparkConf).

 On Wed, Aug 19, 2015 at 1:36 PM, Igor Berman igor.ber...@gmail.com
 wrote:

 any differences in number of cores, memory settings for executors?


 On 19 August 2015 at 09:49, Rick Moritz rah...@gmail.com wrote:

 Dear list,

 I am observing a very strange difference in behaviour between a Spark
 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark 1.4.0 zeppelin
 interpreter (compiled with Java 6 and sourced from maven central).

 The workflow loads data from Hive, applies a number of
 transformations (including quite a lot of shuffle operations) and then
 presents an enriched dataset. The code (and resulting DAGs) are identical in
 each case.

 The following particularities are noted:
 Importing the HiveRDD and caching it yields identical results on both
 platforms.
 Applying case classes, leads to a 2-2.5MB increase in dataset size
 per partition (excepting empty partitions).

 Writing shuffles shows this much more significant result:

 Zeppelin:
 Total Time Across All Tasks:  2.6 min
 Input Size / Records:  2.4 GB / 7314771
 Shuffle Write:  673.5 MB / 7314771

 vs

 Spark-shell:
 Total Time Across All Tasks:  28 min
 Input Size / Records:  3.6 GB / 7314771
 Shuffle Write:  9.0 GB / 7314771

 This is one of the early stages, which reads from a cached partition
 and then feeds into a join-stage. The latter stages show similar 
 behaviour
 in producing excessive shuffle spills.

 Quite often the excessive shuffle volume will lead to massive shuffle
 spills which ultimately kill not only performance, but the actual 
 executors
 as well.

 I have examined the Environment tab in the SparkUI and identified no
 notable difference besides FAIR 

RE: Failed to fetch block error

2015-08-19 Thread java8964
From the log, it looks like the OS user who is running Spark cannot open any
more files.
Check your ulimit setting for that user:
ulimit -a
open files                      (-n) 65536

 Date: Tue, 18 Aug 2015 22:06:04 -0700
 From: swethakasire...@gmail.com
 To: user@spark.apache.org
 Subject: Failed to fetch block  error
 
 Hi,
 
 I see the following error in my Spark Job even after using like 100 cores
 and 16G memory. Did any of you experience the same problem earlier?
 
 15/08/18 21:51:23 ERROR shuffle.RetryingBlockFetcher: Failed to fetch block
 input-0-1439959114400, and will not retry (0 retries)
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /data1/spark/spark-aed30958-2ee1-4eb7-984e-6402fb0a0503/blockmgr-ded36b52-ccc7-48dc-ba05-65bb21fc4136/34/input-0-1439959114400
 (Too many open files)
   at java.io.RandomAccessFile.open(Native Method)
   at java.io.RandomAccessFile.init(RandomAccessFile.java:241)
   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:110)
   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
   at 
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
   at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
   at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
   at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
   at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Failed-to-fetch-block-error-tp24335.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
  

Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell

2015-08-19 Thread Rick Moritz
No, the setup is one driver with 32g of memory, and three executors each
with 8g of memory in both cases. No core-number has been specified, thus it
should default to single-core (though I've seen the yarn-owned jvms
wrapping the executors take up to 3 cores in top). That is, unless, as I
suggested, there are different defaults for the two means of job submission
that come into play in a non-transparent fashion (i.e. not visible in
SparkConf).

On Wed, Aug 19, 2015 at 1:36 PM, Igor Berman igor.ber...@gmail.com wrote:

 any differences in number of cores, memory settings for executors?


 On 19 August 2015 at 09:49, Rick Moritz rah...@gmail.com wrote:

 Dear list,

 I am observing a very strange difference in behaviour between a Spark
 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark 1.4.0 zeppelin
 interpreter (compiled with Java 6 and sourced from maven central).

 The workflow loads data from Hive, applies a number of transformations
 (including quite a lot of shuffle operations) and then presents an enriched
 dataset. The code (and resulting DAGs) are identical in each case.

 The following particularities are noted:
 Importing the HiveRDD and caching it yields identical results on both
 platforms.
 Applying case classes, leads to a 2-2.5MB increase in dataset size per
 partition (excepting empty partitions).

 Writing shuffles shows this much more significant result:

 Zeppelin:
 Total Time Across All Tasks:  2.6 min
 Input Size / Records:  2.4 GB / 7314771
 Shuffle Write:  673.5 MB / 7314771

 vs

 Spark-shell:
 Total Time Across All Tasks:  28 min
 Input Size / Records:  3.6 GB / 7314771
 Shuffle Write:  9.0 GB / 7314771

 This is one of the early stages, which reads from a cached partition and
 then feeds into a join-stage. The latter stages show similar behaviour in
 producing excessive shuffle spills.

 Quite often the excessive shuffle volume will lead to massive shuffle
 spills which ultimately kill not only performance, but the actual executors
 as well.

 I have examined the Environment tab in the SparkUI and identified no
 notable difference besides FAIR (Zeppelin) vs FIFO (spark-shell) scheduling
 mode. I fail to see how this would impact shuffle writes in such a drastic
 way, since it should be on the inter-job level, while this happens at the
 inter-stage level.

 I was somewhat suspicious of maybe compression or serialization playing a
 role, but the SparkConf points to those being set to the default. Also
 Zeppelin's interpreter adds no relevant additional default parameters.
 I performed a diff between rc4 (which was later released) and 1.4.0 and
 as expected there were no differences, besides a single class (remarkably,
 a shuffle-relevant class:
 /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class )
 differing in its binary representation due to being compiled with Java 7
 instead of Java 6. The decompiled sources of those two are again identical.

 I may attempt as a next step to simply replace that file in the packaged
 jar, to ascertain that indeed there is no difference between the two
 versions, but would consider this to be a major bug, if a simple compiler
 change leads to this kind of issue.

 I am also open to any other ideas, in particular to verify that the same
 compression/serialization is indeed happening, and regarding ways to
 determine what exactly is written into these shuffles -- currently I only
 know that the tuples are bigger (or smaller) than they ought to be. The
 Zeppelin-obtained results do appear to be consistent at least, thus the
 suspicion is, that there is an issue with the process launched from
 spark-shell. I will also attempt to build a spark job and spark-submit it
 using different spark-binaries to further explore the issue.

 Best Regards,

 Rick Moritz

 PS: I already tried to send this mail yesterday, but it never made it
 onto the list, as far as I can tell -- I apologize should anyone receive
 this as a second copy.





blogs/articles/videos on how to analyse spark performance

2015-08-19 Thread Todd
Hi,
I would ask if there are some blogs/articles/videos on how to analyse spark 
performance during runtime,eg, tools that can be used or something related.


Re: how do I execute a job on a single worker node in standalone mode

2015-08-19 Thread Axel Dahl
That worked great, thanks Andrew.

On Tue, Aug 18, 2015 at 1:39 PM, Andrew Or and...@databricks.com wrote:

 Hi Axel,

 You can try setting `spark.deploy.spreadOut` to false (through your
 conf/spark-defaults.conf file). What this does is essentially try to
 schedule as many cores on one worker as possible before spilling over to
 other workers. Note that you *must* restart the cluster through the sbin
 scripts.

 For more information see:
 http://spark.apache.org/docs/latest/spark-standalone.html.

 Feel free to let me know whether it works,
 -Andrew


 2015-08-18 4:49 GMT-07:00 Igor Berman igor.ber...@gmail.com:

 By default, standalone mode creates 1 executor on every worker machine per
 application.
 The number of overall cores is configured with --total-executor-cores,
 so in general if you specify --total-executor-cores=1 then there would
 be only 1 core on some executor and you'll get what you want.

 On the other hand, if your application needs all cores of your cluster and
 only some specific job should run on a single executor, there are a few methods
 to achieve this,
 e.g. coalesce(1) or dummyRddWithOnePartitionOnly.foreachPartition (see the
 sketch below).
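
 A minimal sketch of that coalesce(1) approach (the RDD contents and the
 println are illustrative placeholders; `sc` is an existing SparkContext):

 val someRdd = sc.parallelize(1 to 100)

 someRdd.coalesce(1).foreachPartition { iter =>
   // this whole block runs as a single task, i.e. on one executor
   iter.foreach(record => println(record))
 }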


 On 18 August 2015 at 01:36, Axel Dahl a...@whisperstream.com wrote:

 I have a 4 node cluster and have been playing around with the
 num-executors parameters, executor-memory and executor-cores

 I set the following:
 --executor-memory=10G
 --num-executors=1
 --executor-cores=8

 But when I run the job, I see that each worker is running one executor
 which has 2 cores and 2.5G memory.

 What I'd like to do instead is have Spark just allocate the job to a
 single worker node?

 Is that possible in standalone mode or do I need a job/resource
 scheduler like Yarn to do that?

 Thanks in advance,

 -Axel







Re: SQLContext Create Table Problem

2015-08-19 Thread Yin Huai
Can you try to use HiveContext instead of SQLContext? Your query is trying
to create a table and persist the metadata of the table in metastore, which
is only supported by HiveContext.
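
A minimal sketch of that suggestion against the Spark 1.x API (same DDL as in
the original message below; only the context type changes):

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext()
val sqlContext = new HiveContext(sc)  // instead of new SQLContext(sc)

sqlContext.sql("""
  create table if not exists landing (
    date string,
    referrer string
  )
  partitioned by (partnerid string, dt string)
  row format delimited fields terminated by '\t' lines terminated by '\n'
  STORED AS TEXTFILE LOCATION 's3n://...'
""")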

On Wed, Aug 19, 2015 at 8:44 AM, Yusuf Can Gürkan yu...@useinsider.com
wrote:

 Hello,

 I’m trying to create a table with sqlContext.sql method as below:

 val sc = new SparkContext()
 val sqlContext = new SQLContext(sc)

 import sqlContext.implicits._

 sqlContext.sql(s"""
 create table if not exists landing (
 date string,
 referrer string
 )
 partitioned by (partnerid string,dt string)
 row format delimited fields terminated by '\t' lines terminated by '\n'
 STORED AS TEXTFILE LOCATION 's3n://...'
   """)


 It gives an error on spark-submit:

 Exception in thread "main" java.lang.RuntimeException: [2.1] failure:
 ``with'' expected but identifier create found

 create external table if not exists landing (

 ^
  at scala.sys.package$.error(package.scala:27)
  at
 org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
  at
 org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)



 What can be the reason??



Spark Streaming: Some issues (Could not compute split, block —— not found) and questions

2015-08-19 Thread jlg
Some background on what we're trying to do:

We have four Kinesis receivers with varying amounts of data coming through
them. Ultimately we work on a unioned stream that is getting about 11
MB/second of data. We use a batch size of 5 seconds. 

We create four distinct DStreams from this data that have different
aggregation computations (various combinations of
map/flatMap/reduceByKeyAndWindow and then finishing by serializing the
records to JSON strings and writing them to S3). We want to do 30 minute
windows of computations on this data, to get a better compression rate for
the aggregates (there are a lot of repeated keys across this time frame, and
we want to combine them all -- we do this using reduceByKeyAndWindow). 

But even when trying to do 5 minute windows, we have issues with "Could not
compute split, block —— not found". This is being run on a YARN cluster and
it seems like the executors are getting killed even though they should have
plenty of memory. 

Also, it seems like no computation actually takes place until the end of the
window duration. This seems inefficient if there is a lot of data that you
know is going to be needed for the computation. Is there any good way around
this?

There are some of the configuration settings we are using for Spark:

spark.executor.memory=26000M,\
spark.executor.cores=4,\
spark.executor.instances=5,\
spark.driver.cores=4,\
spark.driver.memory=24000M,\
spark.default.parallelism=128,\
spark.streaming.blockInterval=100ms,\
spark.streaming.receiver.maxRate=2,\
spark.akka.timeout=300,\
spark.storage.memoryFraction=0.6,\
spark.rdd.compress=true,\
spark.executor.instances=16,\
spark.serializer=org.apache.spark.serializer.KryoSerializer,\
spark.kryoserializer.buffer.max=2047m,\


Is this the correct way to do this, and how can I further debug to figure
out this issue? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Some-issues-Could-not-compute-split-block-not-found-and-questions-tp24342.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SQLContext Create Table Problem

2015-08-19 Thread Yusuf Can Gürkan
Hey Yin,

Thanks for the answer. I thought that this could be the problem, but I cannot create
a HiveContext because I cannot import org.apache.spark.sql.hive.HiveContext. It
does not see this package.

I read that I should build Spark with -Phive, but I'm running on Amazon EMR
1.4.1, and in spark-shell I can import the hive package but cannot do the same with
spark-submit. Do you have any idea why? Because if it's related to building with
-Phive, how can I import it in spark-shell?

 On 19 Aug 2015, at 18:59, Yin Huai yh...@databricks.com wrote:
 
 Can you try to use HiveContext instead of SQLContext? Your query is trying to 
 create a table and persist the metadata of the table in metastore, which is 
 only supported by HiveContext.
 
 On Wed, Aug 19, 2015 at 8:44 AM, Yusuf Can Gürkan yu...@useinsider.com 
 mailto:yu...@useinsider.com wrote:
 Hello,
 
 I’m trying to create a table with sqlContext.sql method as below:
 
 val sc = new SparkContext()
 val sqlContext = new SQLContext(sc)
 
 import sqlContext.implicits._
 
 sqlContext.sql(s"""
 create table if not exists landing (
 date string,
 referrer string
 )
 partitioned by (partnerid string,dt string)
 row format delimited fields terminated by '\t' lines terminated by '\n'
 STORED AS TEXTFILE LOCATION 's3n://...' 
   """)
 
 
 It gives error on spark-submit:
 
 Exception in thread "main" java.lang.RuntimeException: [2.1] failure: 
 ``with'' expected but identifier create found
 
 create external table if not exists landing (
 
 ^
   at scala.sys.package$.error(package.scala:27)
   at 
 org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
   at 
 org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)
 
 
 
 What can be the reason??
 



How to overwrite partition when writing Parquet?

2015-08-19 Thread Romi Kuntsman
Hello,

I have a DataFrame, with a date column which I want to use as a partition.
Each day I want to write the data for the same date in Parquet, and then
read a dataframe for a date range.

I'm using:
myDataframe.write().partitionBy("date").mode(SaveMode.Overwrite).parquet(parquetDir);

If I use SaveMode.Append, then writing data for the same partition adds the
same data there again.
If I use SaveMode.Overwrite, then writing data for a single partition
removes all the data for all partitions.

How can I overwrite only a given partition or manually remove a partition
before writing?

Many thanks!
Romi K.


Re: blogs/articles/videos on how to analyse spark performance

2015-08-19 Thread Gourav Sengupta
Excellent resource: http://www.oreilly.com/pub/e/3330

And more amazing is the fact that the presenter actually responds to your
questions.

Regards,
Gourav Sengupta

On Wed, Aug 19, 2015 at 4:12 PM, Todd bit1...@163.com wrote:

 Hi,
 I would ask if there are some blogs/articles/videos on how to analyse
 spark performance during runtime,eg, tools that can be used or something
 related.



Re: how do I execute a job on a single worker node in standalone mode

2015-08-19 Thread Axel Dahl
hmm maybe I spoke too soon.

I have an Apache Zeppelin instance running and have configured it to use 48
cores (each node only has 16 cores), so I figured that setting it to 48
would mean Spark grabs 3 nodes. What happens instead, though, is
that Spark reports that 48 cores are being used, but only executes
everything on 1 node; it looks like it's not grabbing the extra nodes.

On Wed, Aug 19, 2015 at 8:43 AM, Axel Dahl a...@whisperstream.com wrote:

 That worked great, thanks Andrew.

 On Tue, Aug 18, 2015 at 1:39 PM, Andrew Or and...@databricks.com wrote:

 Hi Axel,

 You can try setting `spark.deploy.spreadOut` to false (through your
 conf/spark-defaults.conf file). What this does is essentially try to
 schedule as many cores on one worker as possible before spilling over to
 other workers. Note that you *must* restart the cluster through the sbin
 scripts.

 For more information see:
 http://spark.apache.org/docs/latest/spark-standalone.html.

 Feel free to let me know whether it works,
 -Andrew


 2015-08-18 4:49 GMT-07:00 Igor Berman igor.ber...@gmail.com:

 By default, standalone mode creates 1 executor on every worker machine per
 application.
 The number of overall cores is configured with --total-executor-cores,
 so in general if you specify --total-executor-cores=1 then there
 would be only 1 core on some executor and you'll get what you want.

 On the other hand, if your application needs all cores of your cluster
 and only some specific job should run on a single executor, there are a few
 methods to achieve this,

 e.g. coalesce(1) or dummyRddWithOnePartitionOnly.foreachPartition


 On 18 August 2015 at 01:36, Axel Dahl a...@whisperstream.com wrote:

 I have a 4 node cluster and have been playing around with the
 num-executors parameters, executor-memory and executor-cores

 I set the following:
 --executor-memory=10G
 --num-executors=1
 --executor-cores=8

 But when I run the job, I see that each worker is running one executor
 which has 2 cores and 2.5G memory.

 What I'd like to do instead is have Spark just allocate the job to a
 single worker node?

 Is that possible in standalone mode or do I need a job/resource
 scheduler like Yarn to do that?

 Thanks in advance,

 -Axel








Re: blogs/articles/videos on how to analyse spark performance

2015-08-19 Thread Igor Berman
You don't need to register; search YouTube for this video...

On 19 August 2015 at 18:34, Gourav Sengupta gourav.sengu...@gmail.com
wrote:

 Excellent resource: http://www.oreilly.com/pub/e/3330

 And more amazing is the fact that the presenter actually responds to your
 questions.

 Regards,
 Gourav Sengupta

 On Wed, Aug 19, 2015 at 4:12 PM, Todd bit1...@163.com wrote:

 Hi,
 I would ask if there are some blogs/articles/videos on how to analyse
 spark performance during runtime,eg, tools that can be used or something
 related.





Re: Too many files/dirs in hdfs

2015-08-19 Thread Mohit Anchlia
My question was how to do this in Hadoop? Could somebody point me to some
examples?

On Tue, Aug 18, 2015 at 10:43 PM, UMESH CHAUDHARY umesh9...@gmail.com
wrote:

 Of course, Java or Scala can do that:
 1) Create a FileWriter with append or roll over option
 2) For each RDD create a StringBuilder after applying your filters
 3) Write this StringBuilder to File when you want to write (The duration
 can be defined as a condition)

 On Tue, Aug 18, 2015 at 11:05 PM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

 Is there a way to store all the results in one file and keep the file
 roll over separate than the spark streaming batch interval?

 On Mon, Aug 17, 2015 at 2:39 AM, UMESH CHAUDHARY umesh9...@gmail.com
 wrote:

 In Spark Streaming you can simply check whether your RDD contains any
 records or not and if records are there you can save them using
 FIleOutputStream:

 dstream.foreachRDD(t => { val count = t.count(); if (count > 0) { /* SAVE
 YOUR STUFF */ } })

 This will not create unnecessary files of 0 bytes.

 On Mon, Aug 17, 2015 at 2:51 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Currently, spark streaming would create a new directory for every batch
 and store the data to it (whether it has anything or not). There is no
 direct append call as of now, but you can achieve this either with
 FileUtil.copyMerge
 http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-td21124.html#a21167
 or have a separate program which will do the clean up for you.

 Thanks
 Best Regards
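
 A minimal sketch of the FileUtil.copyMerge approach linked above (this is the
 Hadoop 2.x signature; the paths are illustrative and the final null is the
 optional per-file separator string):

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

 val hadoopConf = new Configuration()
 val fs = FileSystem.get(hadoopConf)

 // Merge the many small part files of one batch directory into a single file,
 // deleting the source directory afterwards.
 FileUtil.copyMerge(
   fs, new Path("/spark/output/batch-1439959114400"),
   fs, new Path("/spark/output/merged/batch-1439959114400.txt"),
   true, hadoopConf, null)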

 On Sat, Aug 15, 2015 at 5:20 AM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

 Spark Streaming seems to be creating 0-byte files even when there is no
 data. Also, I have 2 concerns here:

 1) Extra unnecessary files are being created from the output
 2) Hadoop doesn't work really well with too many files, and I see that
 it is creating a directory with a timestamp every 1 second. Is there a
 better way of writing a file, maybe using some kind of append mechanism
 where one doesn't have to change the batch interval?








SQLContext load. Filtering files

2015-08-19 Thread Masf
Hi.

I'd like to read Avro files using this library
https://github.com/databricks/spark-avro

I need to load several files from a folder, not all files. Is there some
functionality to filter the files to load?

And... Is is possible to know the name of the files loaded from a folder?

My problem is that I have a folder where an external process is inserting
files every X minutes and I need process these files once, and I can't
move, rename or copy the source files.


Thanks
-- 

Regards
Miguel Ángel


Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-19 Thread Marcin Kuthan
I'm glad that I could help :)
On 19 Aug 2015 at 8:52 AM, Shenghua(Daniel) Wan wansheng...@gmail.com
wrote:

 +1

 I wish I have read this blog earlier. I am using Java and have just
 implemented a singleton producer per executor/JVM during the day.
 Yes, I did see that NonSerializableException when I was debugging the code
 ...

 Thanks for sharing.

 On Tue, Aug 18, 2015 at 10:59 PM, Tathagata Das t...@databricks.com
 wrote:

 Its a cool blog post! Tweeted it!
 Broadcasting the configuration necessary for lazily instantiating the
 producer is a good idea.

 Nitpick: The first code example has an extra `}` ;)

 On Tue, Aug 18, 2015 at 10:49 PM, Marcin Kuthan marcin.kut...@gmail.com
 wrote:

 As long as the Kafka producer is thread-safe you don't need any pool at
 all. Just share a single producer on every executor. Please look at my blog
 post for more details: http://allegro.tech/spark-kafka-integration.html
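
 A minimal sketch of that per-executor singleton pattern (the Kafka property
 values and topic name are illustrative assumptions; the object's lazy val is
 initialized once per executor JVM instead of being broadcast from the driver):

 import java.util.Properties
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.spark.streaming.dstream.DStream

 object ProducerHolder {
   lazy val producer: KafkaProducer[String, String] = {
     val props = new Properties()
     props.put("bootstrap.servers", "broker1:9092")
     props.put("key.serializer",
       "org.apache.kafka.common.serialization.StringSerializer")
     props.put("value.serializer",
       "org.apache.kafka.common.serialization.StringSerializer")
     new KafkaProducer[String, String](props)
   }
 }

 def writeToKafka(dstream: DStream[String]): Unit = {
   dstream.foreachRDD { rdd =>
     rdd.foreachPartition { records =>
       records.foreach { r =>
         ProducerHolder.producer.send(
           new ProducerRecord[String, String]("my-topic", r))
       }
     }
   }
 }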
 On 19 Aug 2015 at 2:00 AM, Shenghua(Daniel) Wan wansheng...@gmail.com
 wrote:

 All of you are right.

 I was trying to create too many producers. My idea was to create a
 pool (for now the pool contains only one producer) shared by all the
 executors.
 After I realized it was related to the serialization issues (though I
 did not find clear clues in the source code to indicate that the broadcast
 template type parameter must implement Serializable), I followed the spark
 cassandra connector design and created a singleton of Kafka producer pools.
 No exception is noticed now.

 Thanks for all your comments.


 On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das t...@databricks.com
 wrote:

 Why are you even trying to broadcast a producer? A broadcast variable
 is some immutable piece of serializable DATA that can be used for
 processing on the executors. A Kafka producer is neither DATA nor
 immutable, and definitely not serializable.
 The right way to do this is to create the producer in the executors.
 Please see the discussion in the programming guide

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

 On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I wouldn't expect a kafka producer to be serializable at all... among
 other things, it has a background thread

 On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan 
 wansheng...@gmail.com wrote:

 Hi,
 Did anyone see java.util.ConcurrentModificationException when using
 broadcast variables?
 I encountered this exception when wrapping a Kafka producer like
 this in the spark streaming driver.

 Here is what I did.
 KafkaProducer<String, String> producer = new KafkaProducer<String,
 String>(properties);
 final Broadcast<KafkaDataProducer> bCastProducer
 = streamingContext.sparkContext().broadcast(producer);

 Then within a closure called by a foreachRDD, I was trying to get
 the wrapped producer, i.e.
  KafkaProducer<String, String> p = bCastProducer.value();

 after rebuilding and rerunning, I got the stack trace like this

 Exception in thread main com.esotericsoftware.kryo.KryoException:
 java.util.ConcurrentModificationException
 Serialization trace:
 classes (sun.misc.Launcher$AppClassLoader)
 classloader (java.security.ProtectionDomain)
 context (java.security.AccessControlContext)
 acc (org.apache.spark.util.MutableURLClassLoader)
 contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
 ioThread (org.apache.kafka.clients.producer.KafkaProducer)
 producer (my driver)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 

Re: Spark Streaming failing on YARN Cluster

2015-08-19 Thread Ramkumar V
I'm getting a Spark exception. Please look at this log trace (
*http://pastebin.com/xL9jaRUa* ).

*Thanks*,
https://in.linkedin.com/in/ramkumarcs31


On Wed, Aug 19, 2015 at 10:20 PM, Hari Shreedharan 
hshreedha...@cloudera.com wrote:

 It looks like you are having issues with the files getting distributed to
 the cluster. What is the exception you are getting now?


 On Wednesday, August 19, 2015, Ramkumar V ramkumar.c...@gmail.com wrote:

 Thanks a lot for your suggestion. I modified HADOOP_CONF_DIR in
 spark-env.sh so that core-site.xml is under HADOOP_CONF_DIR. I can now
 see logs like the ones you had shown above. The job now runs for 3
 minutes and stores results every minute. After some time, there is
 an exception. How do I fix this exception? And can you please explain where
 it is going wrong?

 *Log Link : http://pastebin.com/xL9jaRUa http://pastebin.com/xL9jaRUa *


 *Thanks*,
 https://in.linkedin.com/in/ramkumarcs31


 On Wed, Aug 19, 2015 at 1:54 PM, Jeff Zhang zjf...@gmail.com wrote:

 HADOOP_CONF_DIR is the environment variable pointing to the hadoop conf
 directory. Not sure how CDH organizes that; make sure core-site.xml is
 under HADOOP_CONF_DIR.
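
 For example, a single line like the following in conf/spark-env.sh (path
 illustrative; CDH usually ships its client configs under /etc/hadoop/conf):

     export HADOOP_CONF_DIR=/etc/hadoop/conf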

 On Wed, Aug 19, 2015 at 4:06 PM, Ramkumar V ramkumar.c...@gmail.com
 wrote:

 We are using Cloudera 5.3.1. Since it is one of the earlier versions of
 CDH, it doesn't support the latest version of Spark, so I installed
 spark-1.4.1 separately on my machine. I couldn't do spark-submit in
 cluster mode. How do I put core-site.xml under the classpath? It will be very
 helpful if you could explain in detail how to solve this issue.

 *Thanks*,
 https://in.linkedin.com/in/ramkumarcs31


 On Fri, Aug 14, 2015 at 8:25 AM, Jeff Zhang zjf...@gmail.com wrote:


1. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying

 file:/home/hdfs/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.5.0-cdh5.3.5.jar
2. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying

 file:/home/hdfs/spark-1.4.1/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.4.1.jar
3. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying
file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip
4. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying
file:/home/hdfs/spark-1.4.1/python/lib/py4j-0.8.2.1-src.zip
5. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying
file:/home/hdfs/spark-1.4.1/examples/src/main/python/streaming/kyt.py
6.


1. diagnostics: Application application_1437639737006_3808 failed
2 times due to AM Container for appattempt_1437639737006_3808_02 
 exited
with  exitCode: -1000 due to: File
file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip does not exist
2. .Failing this attempt.. Failing the application.



 The machine where you run spark-submit is the client machine, while the yarn AM is
 running on another machine, and the yarn AM complains that the files are
 not found, as your logs show.
 From the logs, it seems that these files are not copied to HDFS
 as local resources. I suspect that you didn't put core-site.xml on your
 classpath, so Spark cannot detect your remote file system and won't
 copy the files to hdfs as local resources. Usually in yarn-cluster mode,
 you should be able to see logs like the following.

  15/08/14 10:48:49 INFO yarn.Client: Preparing resources for our AM
 container
  15/08/14 10:48:49 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
 - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
  15/08/14 10:48:50 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/spark.py - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark.py
  15/08/14 10:48:50 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/python/lib/pyspark.zip - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/pyspark.zip

 On Thu, Aug 13, 2015 at 2:50 PM, Ramkumar V ramkumar.c...@gmail.com
 wrote:

 Hi,

 I have a cluster of 1 master and 2 slaves. I'm running a Spark
 Streaming job on the master and I want to utilize all nodes in my cluster. I
 specified some parameters like driver memory and executor memory in my
 code. When I give --deploy-mode cluster --master yarn-cluster in my
 spark-submit, it gives the following error.

 Log link : *http://pastebin.com/kfyVWDGR
 http://pastebin.com/kfyVWDGR*

 How do I fix this issue? Please help me if I'm doing something wrong.


 *Thanks*,
 Ramkumar V




 --
 Best Regards

 Jeff Zhang





 --
 Best Regards

 Jeff Zhang




 --

 Thanks,
 Hari




Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-19 Thread Shushant Arora
Will try Scala.
The only reason for not using Scala is that I have never worked with it.

On Wed, Aug 19, 2015 at 7:34 PM, Cody Koeninger c...@koeninger.org wrote:

 Is there a reason not to just use scala?  It's not a lot of code... and
 it'll be even less code in scala ;)

 On Wed, Aug 19, 2015 at 4:14 AM, Shushant Arora shushantaror...@gmail.com
  wrote:

 To correct myself - KafkaRDD[K, V, U, T, R] is a subclass of RDD[R], but
 Option[KafkaRDD[K, V, U, T, R]] is not a subclass of Option[RDD[R]];

 In scala C[T’] is a subclass of C[T] as per
 https://twitter.github.io/scala_school/type-basics.html
 but this is not allowed in java.

 So is there any workaround to achieve this in java for overriding 
 DirectKafkaInputDStream
 ?
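
 For reference, a tiny Scala snippet illustrating the covariance point (plain
 classes standing in for RDD/KafkaRDD); this is exactly what Java's invariant
 generics do not allow:

     // Option[+A] is covariant, so Option[Sub] is a subtype of Option[Base].
     class Base
     class Sub extends Base

     val o: Option[Base] = Some(new Sub)   // compiles in Scala

     // The Java equivalent does not compile: an Option<Sub> cannot be
     // assigned to an Option<Base>, because Java generics are invariant.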


 On Wed, Aug 19, 2015 at 12:45 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 But KafkaRDD[K, V, U, T, R] is not a subclass of RDD[R] as per Java: generic
 inheritance is not supported, so a derived class cannot return
 a differently parameterized generic subclass from an overridden method.

 On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Option is covariant and KafkaRDD is a subclass of RDD

 On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Is it that in scala its allowed for derived class to have any return
 type ?

  And streaming jar is originally created in scala so its allowed for
 DirectKafkaInputDStream  to return Option[KafkaRDD[K, V, U, T, R]]
 compute method ?

 On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 looking at source code of
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream

 override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
   val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
   val rdd = KafkaRDD[K, V, U, T, R](
     context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

   currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
   Some(rdd)
 }


 But in DStream it is def compute(validTime: Time): Option[RDD[T]],

 So what should  be the return type of custom DStream extends
 DirectKafkaInputDStream .
 Since I want the behaviour to be same as of DirectKafkaInputDStream
  in normal scenarios and return none in specific scenario.

 And why the same error did not come while extending
 DirectKafkaInputDStream from InputDStream ? Since new return type 
 Option[KafkaRDD[K,
 V, U, T, R]] is not subclass of Option[RDD[T] so it should have been
 failed?




 On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger c...@koeninger.org
 wrote:

 The superclass method in DStream is defined as returning an
 Option[RDD[T]]

 On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Getting compilation error while overriding compute method of
 DirectKafkaInputDStream.


 [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
 compute(org.apache.spark.streaming.Time) in 
 CustomDirectKafkaInputDstream
 cannot override compute(org.apache.spark.streaming.Time) in
 org.apache.spark.streaming.dstream.DStream; attempting to use 
 incompatible
 return type

 [ERROR] found   :
 scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>

 [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>


 class :

 public class CustomDirectKafkaInputDstream extends
 DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
 kafka.serializer.DefaultDecoder, byte[][]> {

 @Override
 public Option<KafkaRDD<byte[], byte[], DefaultDecoder,
 DefaultDecoder, byte[][]>> compute(
     Time validTime) {

   int processed = processedCounter.value();
   int failed = failedProcessingsCounter.value();
   if (processed == failed) {
     System.out.println("backing off since its 100 % failure");
     return Option.empty();
   } else {
     System.out.println("starting the stream ");
     return super.compute(validTime);
   }
 }
 }


 What should be the return type of the compute method? The super class is
 returning Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
 byte[][]>> but the compiler expects
 scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class. Is
 there something wrong with the code?

 On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org
  wrote:

 Look at the definitions of the java-specific
 KafkaUtils.createDirectStream methods (the ones that take a
 JavaStreamingContext)

 On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 How to create a ClassTag in Java? Also, the constructor
 of DirectKafkaInputDStream takes a Function1, not a Function, but
 KafkaUtils.createDirectStream allows a Function.

 I have below as overriden DirectKafkaInputDStream.


 public class CustomDirectKafkaInputDstream extends
 DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
 kafka.serializer.DefaultDecoder, byte[][]> {

 public CustomDirectKafkaInputDstream(
     StreamingContext ssc_,
     Map<String, String> kafkaParams,
 

PySpark on Mesos - Scaling

2015-08-19 Thread scox
I'm running a pyspark program on a Mesos cluster and seeing behavior where:
* The first three stages run with multiple (10-15) tasks.
* The fourth stage runs with only one task.
* It is using 10 cpus, which is 5 machines in this configuration
* It is very slow

I would like it to use more resources and more are available on the cluster.

I've tried setting spark.driver.cores and spark.executor.memory to no avail.

Can someone suggest (a) how I go about debugging why this is happening in
the first place (b) how to configure it to use more resources?

If this is answered elsewhere, I was unable to find it and would appreciate
a link.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-on-Mesos-Scaling-tp24347.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how do I execute a job on a single worker node in standalone mode

2015-08-19 Thread Andrew Or
Hi Axel, what spark version are you using? Also, what do your
configurations look like for the following?

- spark.cores.max (also --total-executor-cores)
- spark.executor.cores (also --executor-cores)


2015-08-19 9:27 GMT-07:00 Axel Dahl a...@whisperstream.com:

 Hmm, maybe I spoke too soon.

 I have an Apache Zeppelin instance running and have configured it to use
 48 cores (each node only has 16 cores), so I figured that setting it to 48
 would mean that Spark would grab 3 nodes. What happens instead, though, is
 that Spark reports that 48 cores are being used, but only executes
 everything on 1 node; it looks like it's not grabbing the extra nodes.

 On Wed, Aug 19, 2015 at 8:43 AM, Axel Dahl a...@whisperstream.com wrote:

 That worked great, thanks Andrew.

 On Tue, Aug 18, 2015 at 1:39 PM, Andrew Or and...@databricks.com wrote:

 Hi Axel,

 You can try setting `spark.deploy.spreadOut` to false (through your
 conf/spark-defaults.conf file). What this does is essentially try to
 schedule as many cores on one worker as possible before spilling over to
 other workers. Note that you *must* restart the cluster through the sbin
 scripts.

 For more information see:
 http://spark.apache.org/docs/latest/spark-standalone.html.
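
 For reference, a minimal sketch of that entry in conf/spark-defaults.conf on
 the standalone master (value illustrative; as noted above, the cluster must
 be restarted through the sbin scripts for it to take effect):

     spark.deploy.spreadOut  false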

 Feel free to let me know whether it works,
 -Andrew


 2015-08-18 4:49 GMT-07:00 Igor Berman igor.ber...@gmail.com:

 By default, standalone mode creates 1 executor on every worker machine per
 application.
 The number of overall cores is configured with --total-executor-cores,
 so in general if you specify --total-executor-cores=1 then there
 would be only 1 core on some executor and you'll get what you want.

 On the other hand, if your application needs all cores of your cluster
 and only some specific job should run on a single executor, there are a few
 methods to achieve this,
 e.g. coalesce(1) or dummyRddWithOnePartitionOnly.foreachPartition.


 On 18 August 2015 at 01:36, Axel Dahl a...@whisperstream.com wrote:

 I have a 4 node cluster and have been playing around with the
 num-executors parameters, executor-memory and executor-cores

 I set the following:
 --executor-memory=10G
 --num-executors=1
 --executor-cores=8

 But when I run the job, I see that each worker is running one
 executor which has 2 cores and 2.5G memory.

 What I'd like to do instead is have Spark just allocate the job to a
 single worker node?

 Is that possible in standalone mode or do I need a job/resource
 scheduler like Yarn to do that?

 Thanks in advance,

 -Axel









Re: How to minimize shuffling on Spark dataframe Join?

2015-08-19 Thread Romi Kuntsman
If you create a PairRDD from the DataFrame, using
dataFrame.toRDD().mapToPair(), then you can call
partitionBy(someCustomPartitioner) which will partition the RDD by the key
(of the pair).
Then the operations on it (like joining with another RDD) will consider
this partitioning.
I'm not sure that DataFrames already support this.
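
For illustration, a minimal Scala sketch of that idea (the column name, the
partition count and the two DataFrames are assumptions for the example; note
that partitionBy itself shuffles each side once, but the join afterwards is a
narrow dependency):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.sql.Row

    val partitioner = new HashPartitioner(100)

    val studentByKey = studentDf.rdd
      .map((r: Row) => (r.getAs[String]("studentid"), r))
      .partitionBy(partitioner)

    val resultByKey = resultDf.rdd
      .map((r: Row) => (r.getAs[String]("studentid"), r))
      .partitionBy(partitioner)

    val joined = studentByKey.join(resultByKey)   // co-partitioned: no extra shuffle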

On Wed, Aug 12, 2015 at 11:16 AM Abdullah Anwar 
abdullah.ibn.an...@gmail.com wrote:

 Hi Hemant,

 Thank you for your replay.

 I think the source of my dataframe is not partitioned on the key; it's an Avro
 file where 'id' is a field, but I don't know how to read a file and at
 the same time configure a partition key. I couldn't find anything on
 SQLContext.read.load, or on DataFrame, where you can set a partition key. If it
 could partition on the specified key, will Spark put the same partition range
 on the same machine for two different dataframes?

 What are the overall tips to join faster?

 Best Regards,
 Abdullah




 On Wed, Aug 12, 2015 at 11:02 AM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Is the source of your dataframe partitioned on key? As per your mail, it
 looks like it is not. If that is the case,  for partitioning the data, you
 will have to shuffle the data anyway.

 Another part of your question is - how to co-group data from two
 dataframes based on a key? I think for RDD's cogroup in PairRDDFunctions is
 a way. I am not sure if something similar is available for DataFrames.

 Hemant





 On Tue, Aug 11, 2015 at 2:14 PM, Abdullah Anwar 
 abdullah.ibn.an...@gmail.com wrote:



 I have two dataframes like this

   student_rdf = (studentid, name, ...)
   student_result_rdf = (studentid, gpa, ...)

 We need to join these two dataframes. We are now doing it like this:

  student_rdf.join(student_result_rdf, student_result_rdf["studentid"] ==
  student_rdf["studentid"])

 So it is simple, but it creates a lot of data shuffling across worker
 nodes. Since the join key is the same on both sides, if the dataframes could
 be partitioned using that key (studentid), there should not be
 any shuffling at all, as similar data (based on the partition
 key) would reside on the same node. Is it possible to hint Spark to do this?

 So, I am looking for a way to partition data based on a column while I
 read a dataframe from input. And if it is possible for Spark to
 understand that the partition keys of two dataframes are the same, then how?




 --
 Abdullah





 --
 Abdullah



Re: Spark return key value pair

2015-08-19 Thread Robin East
Dawid is right, if you did words.count it would be twice the number of input 
lines. You can use map like this:

words = lines.map(mapper2)

    for i in words.take(10):
        msg = i[0] + ":" + i[1] + "\n"

---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/malak/ http://www.manning.com/malak/


 On 19 Aug 2015, at 12:19, Dawid Wysakowicz wysakowicz.da...@gmail.com wrote:
 
 I am not 100% sure but probably flatMap unwinds the tuples. Try with map 
 instead.
 
 2015-08-19 13:10 GMT+02:00 Jerry OELoo oylje...@gmail.com 
 mailto:oylje...@gmail.com:
 Hi.
 I want to parse a file and return key-value pairs with pySpark, but the
 result is strange to me.
 test.sql is a big file and each line is a username and password, with
 '#' between them. I use the mapper2 below to map the data, and in my
 understanding, i in words.take(10) should be a tuple, but the result
 is that i is a username or a password, which is strange for me to
 understand. Thanks for your help.
 
 def mapper2(line):
     words = line.split('#')
     return (words[0].strip(), words[1].strip())

 def main2(sc):
     lines = sc.textFile("hdfs://master:9000/spark/test.sql")
     words = lines.flatMap(mapper2)

     for i in words.take(10):
         msg = i + ":" + "\n"
 
 
 --
 Rejoice,I Desire!
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 mailto:user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org 
 mailto:user-h...@spark.apache.org
 
 



Re: Issues with S3 paths that contain colons

2015-08-19 Thread Romi Kuntsman
I had the exact same issue, and overcame it by overriding
NativeS3FileSystem with my own class, where I replaced the implementation
of globStatus. It's a hack but it works.
Then I set the hadoop config fs.myschema.impl to my class name, and
accessed the files through myschema:// instead of s3n://

@Override
public FileStatus[] globStatus(final Path pathPattern, final PathFilter filter)
throws IOException {
  final FileStatus[] statusList = super.listStatus(pathPattern);
  final List<FileStatus> result = Lists.newLinkedList();
  for (FileStatus fileStatus : statusList) {
if (filter.accept(fileStatus.getPath())) {
  result.add(fileStatus);
}
  }
  return result.toArray(new FileStatus[] {});
}
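
For completeness, wiring the custom class in looks roughly like this (the class
name is hypothetical; only the fs.myschema.impl key is taken from the above):

    sc.hadoopConfiguration.set("fs.myschema.impl", "com.example.ColonTolerantS3FileSystem")
    val rdd = sc.textFile("myschema://my-bucket/path/with:colons/*")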



On Wed, Aug 19, 2015 at 9:14 PM Steve Loughran ste...@hortonworks.com
wrote:

 you might want to think about filing a JIRA on issues.apache.org against
 HADOOP here, component being fs/s3. That doesn't mean it is fixable, only
 known.

 Every FS has its own set of forbidden characters and filenames; Unix doesn't
 allow files named "."; Windows doesn't allow files called COM1, ..., so hitting
 some filesystem rule is sometimes a problem. Here, though, you've got the
 file in S3, the listing finds it, but other bits of the codepath are
 failing - which implies that it is something in the Hadoop libs.


  On 18 Aug 2015, at 08:20, Brian Stempin brian.stem...@gmail.com wrote:
 
  Hi,
  I'm running Spark on Amazon EMR (Spark 1.4.1, Hadoop 2.6.0).  I'm seeing
 the
  exception below when encountering file names that contain colons.  Any
 idea
  on how to get around this?
 
  scala val files = sc.textFile(s3a://redactedbucketname/*)
 
  2015-08-18 04:38:34,567 INFO  [main] storage.MemoryStore
  (Logging.scala:logInfo(59)) - ensureFreeSpace(242224) called with
  curMem=669367, maxMem=285203496
 
  2015-08-18 04:38:34,568 INFO  [main] storage.MemoryStore
  (Logging.scala:logInfo(59)) - Block broadcast_3 stored as values in
 memory
  (estimated size 236.5 KB, free 271.1 MB)
 
  2015-08-18 04:38:34,663 INFO  [main] storage.MemoryStore
  (Logging.scala:logInfo(59)) - ensureFreeSpace(21533) called with
  curMem=911591, maxMem=285203496
 
  2015-08-18 04:38:34,664 INFO  [main] storage.MemoryStore
  (Logging.scala:logInfo(59)) - Block broadcast_3_piece0 stored as bytes in
  memory (estimated size 21.0 KB, free 271.1 MB)
 
  2015-08-18 04:38:34,665 INFO
 [sparkDriver-akka.actor.default-dispatcher-19]
  storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added
  broadcast_3_piece0 in memory on 10.182.184.26:60338 (size: 21.0 KB,
 free:
  271.9 MB)
 
  2015-08-18 04:38:34,667 INFO  [main] spark.SparkContext
  (Logging.scala:logInfo(59)) - Created broadcast 3 from textFile at
  console:21
 
  files: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at
 textFile at
  console:21
 
 
  scala files.count
 
  2015-08-18 04:38:37,262 INFO  [main] s3a.S3AFileSystem
  (S3AFileSystem.java:listStatus(533)) - List status for path:
  s3a://redactedbucketname/
 
  2015-08-18 04:38:37,262 INFO  [main] s3a.S3AFileSystem
  (S3AFileSystem.java:getFileStatus(684)) - Getting path status for
  s3a://redactedbucketname/ ()
 
  java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative
  path in absolute URI:
 
 [922-212-4438]-[119]-[1]-[2015-08-13T15:43:12.346193%5D-%5B2015-01-01T00:00:00%5D-redacted.csv
 
  at org.apache.hadoop.fs.Path.initialize(Path.java:206)
 
  at org.apache.hadoop.fs.Path.init(Path.java:172)
 
  at org.apache.hadoop.fs.Path.init(Path.java:94)
 
  at org.apache.hadoop.fs.Globber.glob(Globber.java:240)
 
  at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1700)
 
  at
 
 org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:229)
 
  at
 
 org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:200)
 
  at
 
 org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:279)
 
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
 
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219)
 
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217)
 
  at scala.Option.getOrElse(Option.scala:120)
 
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
 
  at
 
 org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
 
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219)
 
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217)
 
  at scala.Option.getOrElse(Option.scala:120)
 
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
 
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
 
  at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
 
  at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:24)
 
  at $iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:29)
 
  at $iwC$iwC$iwC$iwC$iwC$iwC.init(console:31)
 
  at $iwC$iwC$iwC$iwC$iwC.init(console:33)
 
  at $iwC$iwC$iwC$iwC.init(console:35)
 
  at $iwC$iwC$iwC.init(console:37)
 
  at 

Re: Why use spark.history.fs.logDirectory instead of spark.eventLog.dir

2015-08-19 Thread Andrew Or
Hi Canan,

The event log dir is a per-application setting whereas the history server
is an independent service that serves history UIs from many applications.
If you use history server locally then the `spark.history.fs.logDirectory`
will happen to point to `spark.eventLog.dir`, but the use case it provides
is broader than that.

-Andrew
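
For reference, a minimal sketch of the two settings side by side in
conf/spark-defaults.conf (the HDFS path is illustrative; the event log settings
are read by each application, spark.history.fs.logDirectory by the history
server):

    spark.eventLog.enabled           true
    spark.eventLog.dir               hdfs:///spark-events
    spark.history.fs.logDirectory    hdfs:///spark-events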

2015-08-19 5:13 GMT-07:00 canan chen ccn...@gmail.com:

 Anyone know about this ? Or do I miss something here ?

 On Fri, Aug 7, 2015 at 4:20 PM, canan chen ccn...@gmail.com wrote:

 Is there any reason that historyserver use another property for the event
 log dir ? Thanks





Re: Difference between Sort based and Hash based shuffle

2015-08-19 Thread Andrew Or
Yes, in other words, a bucket is a single file in hash-based shuffle (no
consolidation), but a segment of a partitioned file in sort-based shuffle.

2015-08-19 5:52 GMT-07:00 Muhammad Haseeb Javed 11besemja...@seecs.edu.pk:

 Thanks Andrew for a detailed response,

 So the reason why key-value pairs with the same key are always found in a
 single bucket in hash-based shuffle but not in sort-based shuffle is that in
 sort-shuffle each mapper writes a single partitioned file, and it is up to
 the reducer to fetch the correct partitions from the files?

 On Wed, Aug 19, 2015 at 2:13 AM, Andrew Or and...@databricks.com wrote:

 Hi Muhammad,

 On a high level, in hash-based shuffle each mapper M writes R shuffle
 files, one for each reducer where R is the number of reduce partitions.
 This results in M * R shuffle files. Since it is not uncommon for M and R
 to be O(1000), this quickly becomes expensive. An optimization with
 hash-based shuffle is consolidation, where all mappers that run on the same core
 C write one file per reducer, resulting in C * R files. This is a strict
 improvement, but it is still relatively expensive.

 Instead, in sort-based shuffle each mapper writes a single partitioned
 file. This allows a particular reducer to request a specific portion of
 each mapper's single output file. In more detail, the mapper first fills up
 an internal buffer in memory and continually spills the contents of the
 buffer to disk, then finally merges all the spilled files together to form
 one final output file. This places much less stress on the file system and
 requires much fewer I/O operations especially on the read side.

 -Andrew
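
 As a back-of-the-envelope illustration of those file counts (numbers are made
 up; M map tasks, R reduce partitions, C concurrently running cores):

     val m = 1000                              // M: map tasks
     val r = 1000                              // R: reduce partitions
     val c = 64                                // C: cores writing concurrently

     val hashShuffleFiles   = m.toLong * r     // 1,000,000 files
     val consolidatedFiles  = c.toLong * r     //    64,000 files
     val sortShuffleFiles   = m.toLong         //     1,000 data files (plus an index file each)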



 2015-08-16 11:08 GMT-07:00 Muhammad Haseeb Javed 
 11besemja...@seecs.edu.pk:

 I did check it out, and although I got a general understanding of the
 various classes used to implement sort and hash shuffles, these
 slides lack details as to how they are implemented and why sort generally
 has better performance than hash.

 On Sun, Aug 16, 2015 at 4:31 AM, Ravi Kiran ravikiranmag...@gmail.com
 wrote:

 Have a look at this presentation.
 http://www.slideshare.net/colorant/spark-shuffle-introduction . Can be
 of help to you.

 On Sat, Aug 15, 2015 at 1:42 PM, Muhammad Haseeb Javed 
 11besemja...@seecs.edu.pk wrote:

 What are the major differences between how Sort based and Hash based
 shuffle operate and what is it that cause Sort Shuffle to perform better
 than Hash?
 Any talks that discuss both shuffles in detail, how they are
 implemented and the performance gains ?








Re: issue Running Spark Job on Yarn Cluster

2015-08-19 Thread stark_summer
Please look at more of the hadoop logs, such as: yarn logs -applicationId
xxx
and attach more logs to this topic.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21779p24350.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to set the number of executors and tasks in a Spark Streaming job in Mesos

2015-08-19 Thread swetha
Hi,

How to set the number of executors and tasks in a Spark Streaming job in
Mesos? I have the following settings but my job still shows me 11 active
tasks and 11 executors. Any idea as to why this is happening?

 sparkConf.set("spark.mesos.coarse", "true")
  sparkConf.set("spark.cores.max", "128")
  sparkConf.set("spark.default.parallelism", "100")
  //sparkConf.set("spark.locality.wait", "0")
  sparkConf.set("spark.executor.memory", "32g")
  sparkConf.set("spark.streaming.unpersist", "true")
  sparkConf.set("spark.shuffle.io.numConnectionsPerPeer", "1")
  sparkConf.set("spark.rdd.compress", "true")
  sparkConf.set("spark.shuffle.memoryFraction", ".6")
  sparkConf.set("spark.storage.memoryFraction", ".2")
  sparkConf.set("spark.shuffle.spill", "true")
  sparkConf.set("spark.shuffle.spill.compress", "true")
  sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  sparkConf.set("spark.streaming.blockInterval", "400")



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-the-number-of-executors-and-tasks-in-a-Spark-Streaming-job-in-Mesos-tp24348.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Why use spark.history.fs.logDirectory instead of spark.eventLog.dir

2015-08-19 Thread canan chen
Thanks Andrew, make sense.

On Thu, Aug 20, 2015 at 4:40 AM, Andrew Or and...@databricks.com wrote:

 Hi Canan,

 The event log dir is a per-application setting whereas the history server
 is an independent service that serves history UIs from many applications.
 If you use history server locally then the `spark.history.fs.logDirectory`
 will happen to point to `spark.eventLog.dir`, but the use case it provides
 is broader than that.

 -Andrew

 2015-08-19 5:13 GMT-07:00 canan chen ccn...@gmail.com:

 Anyone know about this ? Or do I miss something here ?

 On Fri, Aug 7, 2015 at 4:20 PM, canan chen ccn...@gmail.com wrote:

 Is there any reason that historyserver use another property for the
 event log dir ? Thanks






Re:Re: How to automatically relaunch a Driver program after crashes?

2015-08-19 Thread Todd


I think Yarn ResourceManager has the mechanism to relaunch the driver on 
failure. But I am uncertain.
Could someone help on this? Thanks.






At 2015-08-19 16:37:32, Spark Enthusiast sparkenthusi...@yahoo.in wrote:

Thanks for the reply.


Are Standalone or Mesos the only options? Is there a way to auto relaunch if 
driver runs as a Hadoop Yarn Application?





On Wednesday, 19 August 2015 12:49 PM, Todd bit1...@163.com wrote:




There is an option for the spark-submit (Spark standalone or Mesos with cluster 
deploy mode only)
  --supervise If given, restarts the driver on failure.







At 2015-08-19 14:55:39, Spark Enthusiast sparkenthusi...@yahoo.in wrote:

Folks,


As I see, the Driver program is a single point of failure. Now, I have seen 
ways as to how to make it recover from failures on a restart (using 
Checkpointing) but I have not seen anything as to how to restart it 
automatically if it crashes.


Will running the Driver as a Hadoop Yarn Application do it? Can someone educate 
me as to how?




Spark return key value pair

2015-08-19 Thread Jerry OELoo
Hi.
I want to parse a file and return key-value pairs with pySpark, but the
result is strange to me.
test.sql is a big file and each line is a username and password, with
'#' between them. I use the mapper2 below to map the data, and in my
understanding, i in words.take(10) should be a tuple, but the result
is that i is a username or a password, which is strange for me to
understand. Thanks for your help.

def mapper2(line):
    words = line.split('#')
    return (words[0].strip(), words[1].strip())

def main2(sc):
    lines = sc.textFile("hdfs://master:9000/spark/test.sql")
    words = lines.flatMap(mapper2)

    for i in words.take(10):
        msg = i + ":" + "\n"


-- 
Rejoice,I Desire!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark UI returning error 500 in yarn-client mode

2015-08-19 Thread mosheeshel
Running Spark on YARN in yarn-client mode, everything appears to be working
just fine except I can't access Spark UI via the Application Master link in
YARN UI or directly at http://driverip:4040/jobs I get error 500, and the
driver log shows the error pasted below:

When running the same job using spark master mode, everything is fine.

Any help is appreciated. Error stacktrace as it appears in the driver:
WARN  [2015-08-12 14:05:46,856]
org.spark-project.jetty.server.AbstractHttpConnection: header full:
java.lang.RuntimeException: Header6144
WARN  [2015-08-12 14:05:46,857] org.spark-project.jetty.server.Response:
Committed before 500 null
WARN  [2015-08-12 14:05:46,862]
org.spark-project.jetty.server.AbstractHttpConnection:

RE: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-19 Thread Cheng, Hao
Yes, you can try setting spark.sql.sources.partitionDiscovery.enabled to false.
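
For example, per application (a sketch; assumes a sqlContext in scope, and the
flag can also go into spark-defaults.conf):

    sqlContext.setConf("spark.sql.sources.partitionDiscovery.enabled", "false")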

BTW, which version are you using?

Hao

From: Jerrick Hoang [mailto:jerrickho...@gmail.com]
Sent: Thursday, August 20, 2015 12:16 PM
To: Philip Weaver
Cc: user
Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions

I guess the question is why does spark have to do partition discovery with all 
partitions when the query only needs to look at one partition? Is there a conf 
flag to turn this off?

On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver 
philip.wea...@gmail.commailto:philip.wea...@gmail.com wrote:
I've had the same problem. It turns out that Spark (specifically parquet) is 
very slow at partition discovery. It got better in 1.5 (not yet released), but 
was still unacceptably slow. Sadly, we ended up reading parquet files manually 
in Python (via C++) and had to abandon Spark SQL because of this problem.

On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang 
jerrickho...@gmail.commailto:jerrickho...@gmail.com wrote:
Hi all,

I did a simple experiment with Spark SQL. I created a partitioned parquet table 
with only one partition (date=20140701). A simple `select count(*) from table 
where date=20140701` would run very fast (0.1 seconds). However, as I added 
more partitions the query takes longer and longer. When I added about 10,000 
partitions, the query took way too long. I feel like querying for a single 
partition should not be affected by having more partitions. Is this a known 
behaviour? What does spark try to do here?

Thanks,
Jerrick




Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-19 Thread Jerrick Hoang
I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of CLs
trying to speed up spark sql with tables with a huge number of partitions,
I've made sure that those CLs are included but it's still very slow

On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com wrote:

 Yes, you can try setting spark.sql.sources.partitionDiscovery.enabled to
 false.



 BTW, which version are you using?



 Hao



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 12:16 PM
 *To:* Philip Weaver
 *Cc:* user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I guess the question is why does spark have to do partition discovery with
 all partitions when the query only needs to look at one partition? Is there
 a conf flag to turn this off?



 On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 I've had the same problem. It turns out that Spark (specifically parquet)
 is very slow at partition discovery. It got better in 1.5 (not yet
 released), but was still unacceptably slow. Sadly, we ended up reading
 parquet files manually in Python (via C++) and had to abandon Spark SQL
 because of this problem.



 On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 Hi all,



 I did a simple experiment with Spark SQL. I created a partitioned parquet
 table with only one partition (date=20140701). A simple `select count(*)
 from table where date=20140701` would run very fast (0.1 seconds). However,
 as I added more partitions the query takes longer and longer. When I added
 about 10,000 partitions, the query took way too long. I feel like querying
 for a single partition should not be affected by having more partitions. Is
 this a known behaviour? What does spark try to do here?



 Thanks,

 Jerrick







creating data warehouse with Spark and running query with Hive

2015-08-19 Thread Jeetendra Gangele
Hi All,

I have data in an HDFS partition layout of year/month/date/event_type, and I am
creating Hive tables with this data. The data is in JSON, so I am using a
JSON SerDe and creating Hive tables.
 Below is the code:
  val jsonFile =
hiveContext.read.json("hdfs://localhost:9000/housing/events_real/category=Impressions/date=1007465766/*")
jsonFile.toDF().printSchema()
jsonFile.write.saveAsTable("JsonFileTable")
jsonFile.toDF().printSchema()
val events = hiveContext.sql("SELECT category, uid FROM JsonFileTable")
events.map(e => "Event: " + e).collect().foreach(println)

saveAsTable is failing with an error saying MKDir failed to create the directory.
Anybody has any idea?


RE: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-19 Thread Cheng, Hao
Can you do some more profiling? I am wondering if the driver is busy with
scanning HDFS / S3.
For example: jstack <pid of driver process>

And also, it’s will be great if you can paste the physical plan for the simple 
query.

From: Jerrick Hoang [mailto:jerrickho...@gmail.com]
Sent: Thursday, August 20, 2015 1:46 PM
To: Cheng, Hao
Cc: Philip Weaver; user
Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions

I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of CLs 
trying to speed up spark sql with tables with a huge number of partitions, I've 
made sure that those CLs are included but it's still very slow

On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao 
hao.ch...@intel.commailto:hao.ch...@intel.com wrote:
Yes, you can try setting spark.sql.sources.partitionDiscovery.enabled to false.

BTW, which version are you using?

Hao

From: Jerrick Hoang 
[mailto:jerrickho...@gmail.commailto:jerrickho...@gmail.com]
Sent: Thursday, August 20, 2015 12:16 PM
To: Philip Weaver
Cc: user
Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions

I guess the question is why does spark have to do partition discovery with all 
partitions when the query only needs to look at one partition? Is there a conf 
flag to turn this off?

On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver 
philip.wea...@gmail.commailto:philip.wea...@gmail.com wrote:
I've had the same problem. It turns out that Spark (specifically parquet) is 
very slow at partition discovery. It got better in 1.5 (not yet released), but 
was still unacceptably slow. Sadly, we ended up reading parquet files manually 
in Python (via C++) and had to abandon Spark SQL because of this problem.

On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang 
jerrickho...@gmail.commailto:jerrickho...@gmail.com wrote:
Hi all,

I did a simple experiment with Spark SQL. I created a partitioned parquet table 
with only one partition (date=20140701). A simple `select count(*) from table 
where date=20140701` would run very fast (0.1 seconds). However, as I added 
more partitions the query takes longer and longer. When I added about 10,000 
partitions, the query took way too long. I feel like querying for a single 
partition should not be affected by having more partitions. Is this a known 
behaviour? What does spark try to do here?

Thanks,
Jerrick





Re: Spark Interview Questions

2015-08-19 Thread Sandeep Giri
Thank you All. I have updated it to a little better version.


Regards,
Sandeep Giri,
+1 347 781 4573 (US)
+91-953-899-8962 (IN)

www.KnowBigData.com. http://KnowBigData.com.
Phone: +1-253-397-1945 (Office)



On Mon, Aug 17, 2015 at 7:10 PM, Sandeep Giri sand...@knowbigdata.com
wrote:

 This statement is from Spark's website itself.


 Regards,
 Sandeep Giri,
 +1 347 781 4573 (US)
 +91-953-899-8962 (IN)

 www.KnowBigData.com. http://KnowBigData.com.
 Phone: +1-253-397-1945 (Office)



 On Wed, Aug 12, 2015 at 10:42 PM, Peyman Mohajerian mohaj...@gmail.com
 wrote:

 I think this statement is inaccurate:
 Q7: What are Actions? A: An action brings back the data from the RDD to
 the local machine -

 Also, I wouldn't say Spark is 100x faster than Hadoop and that it is memory
 based. This is the kind of statement that will not get you the job. When it
 comes to shuffle it has to write to disk; it is faster in many cases, but
 100x is just a marketing statement that holds only in very narrow use cases.






 On Thu, Jul 30, 2015 at 4:55 AM, Sandeep Giri sand...@knowbigdata.com
 wrote:

 I have prepared some interview questions:
 http://www.knowbigdata.com/blog/interview-questions-apache-spark-part-1
 http://www.knowbigdata.com/blog/interview-questions-apache-spark-part-2

 please provide your feedback.

 On Wed, Jul 29, 2015, 23:43 Pedro Rodriguez ski.rodrig...@gmail.com
 wrote:

 You might look at the edx course on Apache Spark or ML with Spark.
 There are probably some homework problems or quiz questions that might be
 relevant. I haven't looked at the course myself, but thats where I would go
 first.


 https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x

 https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x

 --
 Pedro Rodriguez
 PhD Student in Distributed Machine Learning | CU Boulder
 UC Berkeley AMPLab Alumni

 ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703
 Github: github.com/EntilZha | LinkedIn:
 https://www.linkedin.com/in/pedrorodriguezscience






Strange shuffle behaviour difference between Zeppelin and Spark-shell

2015-08-19 Thread Rick Moritz
Dear list,

I am observing a very strange difference in behaviour between a Spark
1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark 1.4.0 zeppelin
interpreter (compiled with Java 6 and sourced from maven central).

The workflow loads data from Hive, applies a number of transformations
(including quite a lot of shuffle operations) and then presents an enriched
dataset. The code (and resulting DAGs) is identical in each case.

The following particularities are noted:
Importing the HiveRDD and caching it yields identical results on both
platforms.
Applying case classes leads to a 2-2.5MB increase in dataset size per
partition (excepting empty partitions).

Writing shuffles shows this much more significant result:

Zeppelin:
*Total Time Across All Tasks: * 2,6 min
*Input Size / Records: * 2.4 GB / 7314771
*Shuffle Write: * 673.5 MB / 7314771

vs

Spark-shell:
*Total Time Across All Tasks: * 28 min
*Input Size / Records: * 3.6 GB / 7314771
*Shuffle Write: * 9.0 GB / 7314771

This is one of the early stages, which reads from a cached partition and
then feeds into a join-stage. The latter stages show similar behaviour in
producing excessive shuffle spills.

Quite often the excessive shuffle volume will lead to massive shuffle
spills which ultimately kill not only performance, but the actual executors
as well.

I have examined the Environment tab in the Spark UI and identified no
notable difference besides FAIR (Zeppelin) vs FIFO (spark-shell) scheduling
mode. I fail to see how this would impact shuffle writes in such a drastic
way, since it should be on the inter-job level, while this happens at the
inter-stage level.

I was somewhat supicious of maybe compression or serialization playing a
role, but the SparkConf points to those being set to the default. Also
Zeppelin's interpreter adds no relevant additional default parameters.
I performed a diff between rc4 (which was later released) and 1.4.0 and as
expected there were no differences, besides a single class (remarkably, a
shuffle-relevant class:
/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class )
differing in its binary representation due to being compiled with Java 7
instead of Java 6. The decompiled sources of those two are again identical.

I may attempt as a next step to simply replace that file in the packaged
jar, to ascertain that indeed there is no difference between the two
versions, but would consider this to be a major bug if a simple compiler
change leads to this kind of issue.

I am also open to any other ideas, in particular to verify that the same
compression/serialization is indeed happening, and regarding ways to
determine what exactly is written into these shuffles -- currently I only
know that the tuples are bigger (or smaller) than they ought to be. The
Zeppelin-obtained results do appear to be consistent at least, thus the
suspicion is, that there is an issue with the process launched from
spark-shell. I will also attempt to build a spark job and spark-submit it
using different spark-binaries to further explore the issue.

Best Regards,

Rick Moritz

PS: I already tried to send this mail yesterday, but it never made it onto
the list, as far as I can tell -- I apologize should anyone receive this as
a second copy.


Re:How to automatically relaunch a Driver program after crashes?

2015-08-19 Thread Todd
There is an option for the spark-submit (Spark standalone or Mesos with cluster 
deploy mode only)
  --supervise If given, restarts the driver on failure.







At 2015-08-19 14:55:39, Spark Enthusiast sparkenthusi...@yahoo.in wrote:

Folks,


As I see, the Driver program is a single point of failure. Now, I have seen 
ways as to how to make it recover from failures on a restart (using 
Checkpointing) but I have not seen anything as to how to restart it 
automatically if it crashes.


Will running the Driver as a Hadoop Yarn Application do it? Can someone educate 
me as to how?

Re: What am I missing that's preventing javac from finding the libraries (CLASSPATH is setup...)?

2015-08-19 Thread UMESH CHAUDHARY
Just add spark_1.4.1_yarn_shuffle.jar to the classpath, or create a new Maven
project using the dependencies below:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.4.1</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>1.4.1</version>
</dependency>





On Tue, Aug 18, 2015 at 11:51 PM, Jerry jerry.c...@gmail.com wrote:

 So from what I understand, those usually pull dependencies for a given
 project? I'm able to run the spark shell so I'd assume I have everything.
 What am I missing from the big picture and what directory do I run maven on?

 Thanks,
 Jerry

 On Tue, Aug 18, 2015 at 11:15 AM, Ted Yu yuzhih...@gmail.com wrote:

 Normally people would establish maven project with Spark dependencies or,
 use sbt.

 Can you go with either approach ?

 Cheers

 On Tue, Aug 18, 2015 at 10:28 AM, Jerry jerry.c...@gmail.com wrote:

 Hello,

 So I setup Spark to run on my local machine to see if I can reproduce
 the issue I'm having with data frames, but I'm running into issues with the
 compiler.

 Here's what I got:

 $ echo $CLASSPATH

 /usr/lib/jvm/java-6-oracle/lib:/home/adminz/dev/spark/spark-1.4.1/lib/spark-assembly-1.4.1-hadoop2.6.0.jar


 javac Test.java
 Test.java:1: package org.apache.spark.sql.api.java does not exist
 import org.apache.spark.sql.api.java.*;
 ^
 Test.java:6: package org.apache.spark.sql does not exist
 import org.apache.spark.sql.*;
 ^
 Test.java:7: package org.apache.spark.sql.hive does not exist
 import org.apache.spark.sql.hive.*;
 


 Let me know what I'm doing wrong.

 Thanks,
 Jerry






SaveAsTable changes the order of rows

2015-08-19 Thread Kevin Jung
I have a simple RDD with Key/Value and do

val partitioned = rdd.partitionBy(new HashPartitioner(400))
val row = partitioned.first

I can get a key G2726 from a returned row. This first row is located in
partition #0 because "G2726".hashCode is 67114000 and 67114000 % 400 is 0. But
the order of keys is changed when I save the rdd to a table by doing saveAsTable.
After doing this and calling sqlContext.table, the key from the first row is
G265. Does the DataFrame forget its parent's partitioner, or does the Parquet format
always rearrange the order of the original data? In my case, the order is not important,
but some users may want to keep their keys ordered.


Kevin





Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-19 Thread Shenghua(Daniel) Wan
+1

I wish I had read this blog earlier. I am using Java and have just
implemented a singleton producer per executor/JVM during the day.
Yes, I did see that NonSerializableException when I was debugging the code
...

Thanks for sharing.

On Tue, Aug 18, 2015 at 10:59 PM, Tathagata Das t...@databricks.com wrote:

 It's a cool blog post! Tweeted it!
 Broadcasting the configuration necessary for lazily instantiating the
 producer is a good idea.

 Nitpick: The first code example has an extra `}` ;)

 On Tue, Aug 18, 2015 at 10:49 PM, Marcin Kuthan marcin.kut...@gmail.com
 wrote:

 As long as the Kafka producer is thread-safe you don't need any pool at all.
 Just share a single producer on every executor. Please look at my blog post
 for more details. http://allegro.tech/spark-kafka-integration.html
 On 19 Aug 2015 at 2:00 AM, Shenghua(Daniel) Wan wansheng...@gmail.com
 wrote:

 All of you are right.

 I was trying to create too many producers. My idea was to create a
 pool(for now the pool contains only one producer) shared by all the
 executors.
 After I realized it was related to the serializable issues (though I did
 not find clear clues in the source code to indicate that the broadcast template
 type parameter must implement Serializable), I followed the spark cassandra
 connector design and created a singleton of Kafka producer pools. There is
 no exception noticed.

 Thanks for all your comments.


 On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das t...@databricks.com
 wrote:

 Why are you even trying to broadcast a producer? A broadcast variable
 is some immutable piece of serializable DATA that can be used for
 processing on the executors. A Kafka producer is neither DATA nor
 immutable, and definitely not serializable.
 The right way to do this is to create the producer in the executors.
 Please see the discussion in the programming guide

 http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

 On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I wouldn't expect a kafka producer to be serializable at all... among
 other things, it has a background thread

 On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan 
 wansheng...@gmail.com wrote:

 Hi,
 Did anyone see java.util.ConcurrentModificationException when using
 broadcast variables?
 I encountered this exception when wrapping a Kafka producer like this
 in the spark streaming driver.

 Here is what I did.
 KafkaProducer<String, String> producer = new KafkaProducer<String,
 String>(properties);
 final Broadcast<KafkaDataProducer> bCastProducer
 = streamingContext.sparkContext().broadcast(producer);

 Then within a closure called by a foreachRDD, I was trying to get
 the wrapped producer, i.e.
  KafkaProducer<String, String> p = bCastProducer.value();

 after rebuilding and rerunning, I got the stack trace like this

 Exception in thread main com.esotericsoftware.kryo.KryoException:
 java.util.ConcurrentModificationException
 Serialization trace:
 classes (sun.misc.Launcher$AppClassLoader)
 classloader (java.security.ProtectionDomain)
 context (java.security.AccessControlContext)
 acc (org.apache.spark.util.MutableURLClassLoader)
 contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
 ioThread (org.apache.kafka.clients.producer.KafkaProducer)
 producer (my driver)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at
 

Does spark sql support column indexing

2015-08-19 Thread Todd
I can't find any discussion of whether Spark SQL supports column indexing. If it
does, is there a guide on how to do it? Thanks.


How to automatically relaunch a Driver program after crashes?

2015-08-19 Thread Spark Enthusiast
Folks,
As I see it, the Driver program is a single point of failure. I have seen ways 
to make it recover from failures on a restart (using Checkpointing), but I have 
not seen anything on how to restart it automatically if it crashes.
Will running the Driver as a Hadoop YARN application do it? Can someone educate 
me as to how?

Re: What's the best practice for developing new features for spark ?

2015-08-19 Thread Ted Yu
See this thread:

http://search-hadoop.com/m/q3RTtdZv0d1btRHl/Spark+build+modulesubj=Building+Spark+Building+just+one+module+
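
That thread is about building just one module; a hedged sketch of the usual
commands from the build docs of that era (the module artifact name is an
assumption, so double-check it against your checkout):

# Build and install a single module after one initial full build, skipping tests
build/mvn -pl :spark-streaming_2.10 -DskipTests clean install

# Or keep an sbt session open and recompile incrementally on file changes
build/sbt ~compile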



 On Aug 19, 2015, at 1:44 AM, canan chen ccn...@gmail.com wrote:
 
 I want to work on one JIRA, but it is not easy to unit test because it 
 involves different components, especially the UI. Building Spark is pretty slow, 
 and I don't want to rebuild it each time to test my code change. I am wondering 
 how other people do this? Is there any experience you can share? Thanks
 
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark UI returning error 500 in yarn-client mode

2015-08-19 Thread Moshe Eshel
Running Spark on YARN in yarn-client mode, everything appears to be working
just fine except that I can't access the Spark UI via the Application Master link
in the YARN UI or directly at http://driverip:4040/jobs; I get error 500, and the
driver log shows the error pasted below.

When running the same job using spark master mode, everything is fine.

Any help is appreciated. Error stacktrace as it appears in the driver:
WARN  [2015-08-12 14:05:46,856]
org.spark-project.jetty.server.AbstractHttpConnection: header full:
java.lang.RuntimeException: Header>6144
WARN  [2015-08-12 14:05:46,857] org.spark-project.jetty.server.Response:
Committed before 500 null
WARN  [2015-08-12 14:05:46,862]
org.spark-project.jetty.server.AbstractHttpConnection:

Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-19 Thread Shushant Arora
To correct myself - KafkaRDD[K, V, U, T, R] is a subclass of RDD[R], but
Option[KafkaRDD[K, V, U, T, R]] is not a subclass of Option[RDD[R]];

In Scala C[T’] can be a subclass of C[T] (covariance), as per
https://twitter.github.io/scala_school/type-basics.html
but this is not allowed in Java.

So is there any workaround to achieve this in Java when overriding
DirectKafkaInputDStream?


On Wed, Aug 19, 2015 at 12:45 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 But KafkaRDD[K, V, U, T, R] is not a subclass of RDD[R] as far as Java is
 concerned: generic inheritance like this is not supported, so a derived class
 cannot return a differently parameterized generic subclass from an overridden method.

 On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Option is covariant and KafkaRDD is a subclass of RDD

 On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Is it that in Scala a derived class is allowed to have any return
 type?

  And since the streaming jar is originally written in Scala, is
 DirectKafkaInputDStream allowed to return Option[KafkaRDD[K, V, U, T, R]]
 from its compute method?

 On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 looking at source code of
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream

 override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
   val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
   val rdd = KafkaRDD[K, V, U, T, R](
     context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

   currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
   Some(rdd)
 }


 But in DStream it is def compute(validTime: Time): Option[RDD[T]],

 So what should be the return type of a custom DStream that extends
 DirectKafkaInputDStream?
 I want the behaviour to be the same as DirectKafkaInputDStream in
 normal scenarios and to return None in a specific scenario.

 And why did the same error not come up while extending
 DirectKafkaInputDStream from InputDStream? Since the new return type
 Option[KafkaRDD[K, V, U, T, R]] is not a subclass of Option[RDD[T]], it
 should have failed there too?




 On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger c...@koeninger.org
 wrote:

 The superclass method in DStream is defined as returning an
 Option[RDD[T]]

 On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Getting compilation error while overriding compute method of
 DirectKafkaInputDStream.


 [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
 compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
 cannot override compute(org.apache.spark.streaming.Time) in
 org.apache.spark.streaming.dstream.DStream; attempting to use 
 incompatible
 return type

 [ERROR] found   :
 scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>

 [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>


 class :

 public class CustomDirectKafkaInputDstream extends
 DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
 kafka.serializer.DefaultDecoder, byte[][]> {

 @Override
 public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(
 Time validTime) {

 int processed = processedCounter.value();
 int failed = failedProcessingsCounter.value();
 if (processed == failed) {
 System.out.println("backing off since its 100 % failure");
 return Option.empty();
 } else {
 System.out.println("starting the stream ");
 return super.compute(validTime);
 }
 }
 }


 What should be the return type of the compute method? The super class is
 returning Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
 byte[][]>> but it is expecting
 scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class. Is
 there something wrong with the code?

 On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Look at the definitions of the java-specific
 KafkaUtils.createDirectStream methods (the ones that take a
 JavaStreamingContext)

 On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 How to create a ClassTag in Java? Also, the constructor
 of DirectKafkaInputDStream takes a Function1, not a Function, but
 KafkaUtils.createDirectStream allows a Function.

 I have below as overriden DirectKafkaInputDStream.


 public class CustomDirectKafkaInputDstream extends
 DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
 kafka.serializer.DefaultDecoder, byte[][]> {

 public CustomDirectKafkaInputDstream(
 StreamingContext ssc_,
 Map<String, String> kafkaParams,
 Map<TopicAndPartition, Object> fromOffsets,
 Function1<MessageAndMetadata<byte[], byte[]>, byte[][]>
 messageHandler,
 ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
 ClassTag<DefaultDecoder> evidence$3,
 ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5)
 {
 super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
 evidence$2,
 evidence$3, evidence$4, 
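
 On the ClassTag part of the question, a hedged Java sketch (the holder class is
 an assumption; the pattern uses the scala.reflect.ClassTag companion object):

 import kafka.serializer.DefaultDecoder;
 import scala.reflect.ClassTag;
 import scala.reflect.ClassTag$;

 public final class ClassTags {  // hypothetical holder for the evidence parameters
   static final ClassTag<byte[]> BYTES = ClassTag$.MODULE$.apply(byte[].class);
   static final ClassTag<byte[][]> RECORD = ClassTag$.MODULE$.apply(byte[][].class);
   static final ClassTag<DefaultDecoder> DECODER = ClassTag$.MODULE$.apply(DefaultDecoder.class);

   private ClassTags() {}
 }

 These can be passed as the evidence$1..evidence$5 arguments above. For the
 Function1-versus-Function mismatch, the Java-specific
 KafkaUtils.createDirectStream overloads (the ones taking a JavaStreamingContext)
 show how that conversion is handled.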

Re: Spark Streaming failing on YARN Cluster

2015-08-19 Thread Ramkumar V
Thanks a lot for your suggestion. I modified HADOOP_CONF_DIR in
spark-env.sh so that core-site.xml is under HADOOP_CONF_DIR, and I can now see
logs like the ones you showed above. The job now runs for about 3
minutes and stores results every minute, but after some time there is
an exception. How do I fix this exception, and can you please explain where
it is going wrong?

*Log Link : http://pastebin.com/xL9jaRUa *


*Thanks*,
https://in.linkedin.com/in/ramkumarcs31


On Wed, Aug 19, 2015 at 1:54 PM, Jeff Zhang zjf...@gmail.com wrote:

 HADOOP_CONF_DIR is the environment variable pointing to the Hadoop conf
 directory. Not sure how CDH organizes that; make sure core-site.xml is
 under HADOOP_CONF_DIR.
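
 For reference, a typical spark-env.sh entry looks like the sketch below; the
 path is an assumption and should point at wherever your distribution keeps
 core-site.xml and yarn-site.xml.

 # conf/spark-env.sh
 export HADOOP_CONF_DIR=/etc/hadoop/conf
 export YARN_CONF_DIR=/etc/hadoop/conf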

 On Wed, Aug 19, 2015 at 4:06 PM, Ramkumar V ramkumar.c...@gmail.com
 wrote:

 We are using Cloudera 5.3.1. Since it is one of the earlier versions of
 CDH, it doesn't support the latest version of Spark, so I installed
 spark-1.4.1 separately on my machine. I couldn't do spark-submit in
 cluster mode. How do I put core-site.xml under the classpath? It would be very
 helpful if you could explain in detail how to solve this issue.

 *Thanks*,
 https://in.linkedin.com/in/ramkumarcs31


 On Fri, Aug 14, 2015 at 8:25 AM, Jeff Zhang zjf...@gmail.com wrote:


1. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying

 file:/home/hdfs/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.5.0-cdh5.3.5.jar
2. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying

 file:/home/hdfs/spark-1.4.1/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.4.1.jar
3. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying
file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip
4. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying
file:/home/hdfs/spark-1.4.1/python/lib/py4j-0.8.2.1-src.zip
5. 15/08/12 13:24:49 INFO Client: Source and destination file
systems are the same. Not copying
file:/home/hdfs/spark-1.4.1/examples/src/main/python/streaming/kyt.py
6.


1. diagnostics: Application application_1437639737006_3808 failed 2
times due to AM Container for appattempt_1437639737006_3808_02 exited
with  exitCode: -1000 due to: File
file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip does not exist
2. .Failing this attempt.. Failing the application.



  The machine you run spark on is the client machine, while the yarn AM is
 running on another machine, and the yarn AM complains that the files are
 not found, as your logs show.
  From the logs, it seems that these files are not copied to HDFS as
 local resources. I suspect that you didn't put core-site.xml under your
 classpath, so spark cannot detect your remote file system and won't
 copy the files to hdfs as local resources. Usually in yarn-cluster mode,
 you should be able to see logs like the following.

  15/08/14 10:48:49 INFO yarn.Client: Preparing resources for our AM
 container
  15/08/14 10:48:49 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
 - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
  15/08/14 10:48:50 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/spark.py - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark.py
  15/08/14 10:48:50 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/python/lib/pyspark.zip - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/pyspark.zip

 On Thu, Aug 13, 2015 at 2:50 PM, Ramkumar V ramkumar.c...@gmail.com
 wrote:

 Hi,

  I have a cluster of 1 master and 2 slaves. I'm running a spark
 streaming job on the master and I want to utilize all nodes in my cluster. I
 specified some parameters like driver memory and executor memory in my
 code. When I give --deploy-mode cluster --master yarn-cluster in my
 spark-submit, it gives the following error.

  Log link : *http://pastebin.com/kfyVWDGR*

  How do I fix this issue? Please help me if I'm doing something wrong.


 *Thanks*,
 Ramkumar V




 --
 Best Regards

 Jeff Zhang





 --
 Best Regards

 Jeff Zhang



Re: Does spark sql support column indexing

2015-08-19 Thread prosp4300

The answer is simply no,
but I hope someone could give deeper insight or a meaningful reference.
On 2015-08-19 15:21, Todd wrote:
I can't find any discussion of whether Spark SQL supports column indexing. If it 
does, is there a guide on how to do it? Thanks.


Re: Spark Streaming failing on YARN Cluster

2015-08-19 Thread Ramkumar V
We are using Cloudera 5.3.1. Since it is one of the earlier versions of CDH,
it doesn't support the latest version of Spark, so I installed spark-1.4.1
separately on my machine. I couldn't do spark-submit in cluster
mode. How do I put core-site.xml under the classpath? It would be very helpful if
you could explain in detail how to solve this issue.

*Thanks*,
https://in.linkedin.com/in/ramkumarcs31


On Fri, Aug 14, 2015 at 8:25 AM, Jeff Zhang zjf...@gmail.com wrote:


1. 15/08/12 13:24:49 INFO Client: Source and destination file systems
are the same. Not copying

 file:/home/hdfs/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.5.0-cdh5.3.5.jar
2. 15/08/12 13:24:49 INFO Client: Source and destination file systems
are the same. Not copying

 file:/home/hdfs/spark-1.4.1/external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.4.1.jar
3. 15/08/12 13:24:49 INFO Client: Source and destination file systems
are the same. Not copying 
 file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip
4. 15/08/12 13:24:49 INFO Client: Source and destination file systems
are the same. Not copying
file:/home/hdfs/spark-1.4.1/python/lib/py4j-0.8.2.1-src.zip
5. 15/08/12 13:24:49 INFO Client: Source and destination file systems
are the same. Not copying
file:/home/hdfs/spark-1.4.1/examples/src/main/python/streaming/kyt.py
6.


1. diagnostics: Application application_1437639737006_3808 failed 2
times due to AM Container for appattempt_1437639737006_3808_02 exited
with  exitCode: -1000 due to: File
file:/home/hdfs/spark-1.4.1/python/lib/pyspark.zip does not exist
2. .Failing this attempt.. Failing the application.



  The machine you run spark on is the client machine, while the yarn AM is
 running on another machine, and the yarn AM complains that the files are
 not found, as your logs show.
  From the logs, it seems that these files are not copied to HDFS as
 local resources. I suspect that you didn't put core-site.xml under your
 classpath, so spark cannot detect your remote file system and won't
 copy the files to hdfs as local resources. Usually in yarn-cluster mode,
 you should be able to see logs like the following.

  15/08/14 10:48:49 INFO yarn.Client: Preparing resources for our AM
 container
  15/08/14 10:48:49 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
 - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
  15/08/14 10:48:50 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/spark.py - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/spark.py
  15/08/14 10:48:50 INFO yarn.Client: Uploading resource
 file:/Users/abc/github/spark/python/lib/pyspark.zip - hdfs://
 0.0.0.0:9000/user/abc/.sparkStaging/application_1439432662178_0019/pyspark.zip

 On Thu, Aug 13, 2015 at 2:50 PM, Ramkumar V ramkumar.c...@gmail.com
 wrote:

 Hi,

  I have a cluster of 1 master and 2 slaves. I'm running a spark streaming
 job on the master and I want to utilize all nodes in my cluster. I specified
 some parameters like driver memory and executor memory in my code. When I
 give --deploy-mode cluster --master yarn-cluster in my spark-submit, it
 gives the following error.

  Log link : *http://pastebin.com/kfyVWDGR*

  How do I fix this issue? Please help me if I'm doing something wrong.


 *Thanks*,
 Ramkumar V




 --
 Best Regards

 Jeff Zhang



What's the best practice for developing new features for spark ?

2015-08-19 Thread canan chen
I want to work on one JIRA, but it is not easy to unit test because it
involves different components, especially the UI. Building Spark is pretty slow,
and I don't want to rebuild it each time to test my code change. I am wondering
how other people do this? Is there any experience you can share? Thanks


Re: Programmatically create SparkContext on YARN

2015-08-19 Thread Andreas Fritzler
Hi Andrew,

Thanks a lot for your response. I am aware of the '--master' flag in the
spark-submit command. However, I would like to create the SparkContext
inside my code.

Maybe I should elaborate a little further: I would like to reuse, e.g.,
the result of any Spark computation inside my code.

Here is the SparkPi example:

String[] jars = new String[1];

   jars[0] = System.getProperty("user.dir") +
 "/target/SparkPi-1.0-SNAPSHOT.jar";

   SparkConf conf = new SparkConf()
   .setAppName("JavaSparkPi")
   .setMaster("spark://SPARK_HOST:7077")
   .setJars(jars);

   JavaSparkContext sc = new JavaSparkContext(conf);

   int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;

   int n = 100 * slices;

   List<Integer> l = new ArrayList<Integer>(n);

   for (int i = 0; i < n; i++) {
 l.add(i);
   }

   JavaRDD<Integer> dataSet = sc.parallelize(l, slices);

   int *count* = dataSet.map(new Function<Integer, Integer>() {
 @Override
 public Integer call(Integer integer) {
   double x = Math.random() * 2 - 1;
   double y = Math.random() * 2 - 1;
   return (x * x + y * y < 1) ? 1 : 0;
 }
   }).reduce(new Function2<Integer, Integer, Integer>() {
 @Override
 public Integer call(Integer integer, Integer integer2) {
   return integer + integer2;
 }
   });

   System.out.println("Pi is roughly " + 4.0 * *count* / n);

   sc.stop();

As you can see, I can reuse the result (count) directly in my code.

So my goal would be to reuse this kind of implementation in YARN mode
(client/cluster mode). However, I didn't really find a solution for how to do
that, since I always have to submit my Spark code via spark-submit.

What if I want to run this code as part of a web application which renders
the result as web page?

-- Andreas

On Tue, Aug 18, 2015 at 10:50 PM, Andrew Or and...@databricks.com wrote:

 Hi Andreas,

 I believe the distinction is not between standalone and YARN mode, but
 between client and cluster mode.

 In client mode, your Spark submit JVM runs your driver code. In cluster
 mode, one of the workers (or NodeManagers if you're using YARN) in the
 cluster runs your driver code. In the latter case, it doesn't really make
 sense to call `setMaster` in your driver because Spark needs to know which
 cluster you're submitting the application to.

 Instead, the recommended way is to set the master through the `--master`
 flag in the command line, e.g.

  $ bin/spark-submit \
  --master spark://1.2.3.4:7077 \
  --class some.user.Clazz \
  --name "My app name" \
  --jars lib1.jar,lib2.jar \
  --deploy-mode cluster \
  app.jar

 Both YARN and standalone modes support client and cluster modes, and the
 spark-submit script is the common interface through which you can launch
 your application. In other words, you shouldn't have to do anything more
 than providing a different value to `--master` to use YARN.

 -Andrew

 2015-08-17 0:34 GMT-07:00 Andreas Fritzler andreas.fritz...@gmail.com:

 Hi all,

 when runnig the Spark cluster in standalone mode I am able to create the
 Spark context from Java via the following code snippet:

 SparkConf conf = new SparkConf()
.setAppName("MySparkApp")
.setMaster("spark://SPARK_MASTER:7077")
.setJars(jars);
 JavaSparkContext sc = new JavaSparkContext(conf);


 As soon as I'm done with my processing, I can just close it via

 sc.stop();

 Now my question: Is the same also possible when running Spark on YARN? I
 currently don't see how this should be possible without submitting your
 application as a packaged jar file. Is there a way to get this kind of
 interactivity from within your Scala/Java code?

 Regards,
 Andrea
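
 For the programmatic YARN case, a hedged sketch follows. It assumes Spark's
 YARN support and the Hadoop client configuration (HADOOP_CONF_DIR /
 YARN_CONF_DIR) are available on the web application's classpath; the class name
 and jar path are placeholders. In yarn-client mode the driver stays in your own
 JVM, so the result can be reused directly, whereas yarn-cluster mode still
 expects spark-submit to place the driver inside the cluster.

 import java.util.Arrays;

 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;

 public class YarnClientLauncher {  // hypothetical class name
   public static void main(String[] args) {
     SparkConf conf = new SparkConf()
         .setAppName("JavaSparkPi")
         .setMaster("yarn-client")  // client mode: the driver runs in this JVM
         .setJars(new String[] { "/path/to/SparkPi-1.0-SNAPSHOT.jar" });  // placeholder jar path
     JavaSparkContext sc = new JavaSparkContext(conf);
     try {
       // Any result computed on the cluster is available right here.
       long count = sc.parallelize(Arrays.asList(1, 2, 3, 4)).count();
       System.out.println("count = " + count);
     } finally {
       sc.stop();
     }
   }
 }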





Re: How to automatically relaunch a Driver program after crashes?

2015-08-19 Thread Spark Enthusiast
Thanks for the reply.
Are Standalone or Mesos the only options? Is there a way to auto-relaunch if 
the driver runs as a Hadoop YARN application? 


 On Wednesday, 19 August 2015 12:49 PM, Todd bit1...@163.com wrote:
   

 There is an option for spark-submit (Spark standalone or Mesos with 
cluster deploy mode only):
   --supervise    If given, restarts the driver on failure.




At 2015-08-19 14:55:39, Spark Enthusiast sparkenthusi...@yahoo.in wrote:
 
Folks,
As I see it, the Driver program is a single point of failure. I have seen ways 
to make it recover from failures on a restart (using Checkpointing), but I have 
not seen anything on how to restart it automatically if it crashes.
Will running the Driver as a Hadoop YARN application do it? Can someone educate 
me as to how?
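
A hedged sketch of both options (class and jar names are placeholders; the YARN
property should be double-checked against your version's running-on-YARN docs):

# Standalone / Mesos cluster deploy mode: the master restarts a failed driver
bin/spark-submit --master spark://MASTER:7077 --deploy-mode cluster --supervise \
  --class my.streaming.App app.jar

# YARN cluster mode: the driver runs in the ApplicationMaster, which YARN can re-attempt
bin/spark-submit --master yarn-cluster \
  --conf spark.yarn.maxAppAttempts=4 \
  --class my.streaming.App app.jar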


  

Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-19 Thread Jerrick Hoang
I guess the question is why does spark have to do partition discovery with
all partitions when the query only needs to look at one partition? Is there
a conf flag to turn this off?

On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver philip.wea...@gmail.com
wrote:

 I've had the same problem. It turns out that Spark (specifically parquet)
 is very slow at partition discovery. It got better in 1.5 (not yet
 released), but was still unacceptably slow. Sadly, we ended up reading
 parquet files manually in Python (via C++) and had to abandon Spark SQL
 because of this problem.

 On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 Hi all,

 I did a simple experiment with Spark SQL. I created a partitioned parquet
 table with only one partition (date=20140701). A simple `select count(*)
 from table where date=20140701` would run very fast (0.1 seconds). However,
 as I added more partitions the query takes longer and longer. When I added
 about 10,000 partitions, the query took way too long. I feel like querying
 for a single partition should not be affected by having more partitions. Is
 this a known behaviour? What does spark try to do here?

 Thanks,
 Jerrick





Spark Sql behaves strangely with tables with a lot of partitions

2015-08-19 Thread Jerrick Hoang
Hi all,

I did a simple experiment with Spark SQL. I created a partitioned parquet
table with only one partition (date=20140701). A simple `select count(*)
from table where date=20140701` would run very fast (0.1 seconds). However,
as I added more partitions the query takes longer and longer. When I added
about 10,000 partitions, the query took way too long. I feel like querying
for a single partition should not be affected by having more partitions. Is
this a known behaviour? What does spark try to do here?

Thanks,
Jerrick


Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-19 Thread Philip Weaver
I've had the same problem. It turns out that Spark (specifically parquet)
is very slow at partition discovery. It got better in 1.5 (not yet
released), but was still unacceptably slow. Sadly, we ended up reading
parquet files manually in Python (via C++) and had to abandon Spark SQL
because of this problem.

On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com
wrote:

 Hi all,

 I did a simple experiment with Spark SQL. I created a partitioned parquet
 table with only one partition (date=20140701). A simple `select count(*)
 from table where date=20140701` would run very fast (0.1 seconds). However,
 as I added more partitions the query takes longer and longer. When I added
 about 10,000 partitions, the query took way too long. I feel like querying
 for a single partition should not be affected by having more partitions. Is
 this a known behaviour? What does spark try to do here?

 Thanks,
 Jerrick