Re: can spark take advantage of ordered data?

2017-03-10 Thread Jonathan Coveney
While I was at Two Sigma I ended up implementing something similar to what
Koert described... you can check it out here:
https://github.com/twosigma/flint/blob/master/src/main/scala/com/twosigma/flint/rdd/OrderedRDD.scala.
They've built a lot more on top of this (including support for dataframes
etc).
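To make the "already sorted" win concrete, here is the core trick in plain Python (a sketch of the streaming idea only — not the flint/OrderedRDD API, and the data is made up): because records arrive sorted on the key, a fold per key is a single pass with no re-sort and no shuffle.

```python
from itertools import groupby
from operator import itemgetter

# Records already sorted by (primary_key, secondary_key); a single
# streaming pass folds each group without any re-sort or shuffle.
records = [
    ("a", 1, 10), ("a", 2, 20), ("a", 3, 5),
    ("b", 1, 7),  ("b", 2, 3),
]

def fold_by_primary(sorted_records):
    # groupby only works here because the input is pre-sorted on the key
    return {
        key: sum(value for _, _, value in group)
        for key, group in groupby(sorted_records, key=itemgetter(0))
    }

print(fold_by_primary(records))  # {'a': 35, 'b': 10}
```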

2017-03-10 9:45 GMT-05:00 Koert Kuipers :

> this shouldn't be too hard. adding something to spark-sorted or to the
> dataframe/dataset logical plan that says "trust me, i am already
> partitioned and sorted" seems doable. however you most likely need a custom
> hash partitioner, and you have to be careful to read the data in without
> file splitting.
>
> On Mar 10, 2017 9:10 AM, "sourabh chaki"  wrote:
>
>> My use case is also quite similar. I have 2 feeds: one 3TB and another
>> 100GB. Both feeds are generated by a hadoop reduce operation and
>> partitioned by the hadoop HashPartitioner. The 3TB feed has 10K partitions
>> whereas the 100GB feed has 200 partitions.
>>
>> Now when I do a join between these two feeds using Spark, Spark shuffles
>> both the RDDs and it takes a long time to complete. Can we do something so
>> that Spark can recognise the existing partitions of the 3TB feed and
>> shuffle only the 100GB feed?
>> Could it be a map-side scan for the bigger RDD and a shuffle read from the
>> smaller RDD?
>>
>> I have looked at spark-sorted project, but that project does not utilise
>> the pre-existing partitions in the feed.
>> Any pointer will be helpful.
>>
>> Thanks
>> Sourabh
>>
>> On Thu, Mar 12, 2015 at 6:35 AM, Imran Rashid 
>> wrote:
>>
>>> Hi Jonathan,
>>>
>>> you might be interested in
>>> https://issues.apache.org/jira/browse/SPARK-3655 (not yet available) and
>>> https://github.com/tresata/spark-sorted (not part of spark, but it is
>>> available right now). Hopefully that's what you are looking for. To the
>>> best of my knowledge that covers what is available now / what is being
>>> worked on.
>>>
>>> Imran
>>>
>>> On Wed, Mar 11, 2015 at 4:38 PM, Jonathan Coveney 
>>> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I am wondering if spark already has support for optimizations on sorted
>>>> data and/or if such support could be added (I am comfortable dropping to a
>>>> lower level if necessary to implement this, but I'm not sure if it is
>>>> possible at all).
>>>>
>>>> Context: we have a number of data sets which are essentially already
>>>> sorted on a key. With our current systems, we can take advantage of this to
>>>> do a lot of analysis in a very efficient fashion...merges and joins, for
>>>> example, can be done very efficiently, as can folds on a secondary key and
>>>> so on.
>>>>
>>>> I was wondering if spark would be a fit for implementing these sorts of
>>>> optimizations? Obviously it is sort of a niche case, but would this be
>>>> achievable? Any pointers on where I should look?
>>>>
>>>
>>>
>>


Re: simultaneous actions

2016-01-15 Thread Jonathan Coveney
SparkContext is thread safe, and RDDs just describe operations.

While I generally agree that you want to model as much as possible as
transformations, this is not always possible, and in that case you have no
option but to use threads.

Spark's designers should have made all actions return Futures, but alas...
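For what it's worth, the thread pattern looks like this (plain Python futures standing in for Spark actions here — in real code each task would call an action like rdd.count() on the shared, thread-safe SparkContext):

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-ins for two independent Spark actions on the same data; each
# would normally be something like rdd.count() / rdd.reduce(...).
def action_count(data):
    return len(data)

def action_sum(data):
    return sum(data)

data = list(range(100))

# Submit both "actions" at once; the caller blocks only when the
# results are actually needed, much like actions returning futures.
with ThreadPoolExecutor(max_workers=2) as pool:
    count_f = pool.submit(action_count, data)
    sum_f = pool.submit(action_sum, data)
    print(count_f.result(), sum_f.result())  # 100 4950
```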

On Friday, January 15, 2016, Jakob Odersky  wrote:

> I don't think RDDs are threadsafe.
> More fundamentally however, why would you want to run RDD actions in
> parallel? The idea behind RDDs is to provide you with an abstraction for
> computing parallel operations on distributed data. Even if you were to call
> actions from several threads at once, the individual executors of your
> spark environment would still have to perform operations sequentially.
>
> As an alternative, I would suggest to restructure your RDD transformations
> to compute the required results in one single operation.
>
> On 15 January 2016 at 06:18, Jonathan Coveney  > wrote:
>
>> Threads
>>
>>
>> On Friday, January 15, 2016, Kira wrote:
>>
>>> Hi,
>>>
>>> Can we run *simultaneous* actions on the *same RDD*? If yes, how can
>>> this be done?
>>>
>>> Thank you,
>>> Regards
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/simultaneous-actions-tp25977.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>


Re: simultaneous actions

2016-01-15 Thread Jonathan Coveney
Threads

On Friday, January 15, 2016, Kira  wrote:

> Hi,
>
> Can we run *simultaneous* actions on the *same RDD*? If yes, how can this
> be done?
>
> Thank you,
> Regards
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/simultaneous-actions-tp25977.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> For additional commands, e-mail: user-h...@spark.apache.org 
>
>


Re: Is there a way to delete task history besides using a ttl?

2015-11-17 Thread Jonathan Coveney
Reading the code, is there any reason why setting
spark.cleaner.ttl.MAP_OUTPUT_TRACKER directly won't get picked up?

2015-11-17 14:45 GMT-05:00 Jonathan Coveney :

> so I have the following...
>
> broadcast some stuff
> cache an rdd
> do a bunch of stuff, eventually calling actions which reduce it to an
> acceptable size
>
> I'm getting an OOM on the driver (well, GC is getting out of control),
> largely because I have a lot of partitions and it looks like the job
> history is getting too large. ttl is an option, but the downside is that
> it will also delete the rdd... this isn't really what I want. What I want
> is to keep my in-memory data structures (the rdd, broadcast variable, etc)
> but get rid of the old metadata that I don't need anymore (i.e. tasks that
> have executed).
>
> Is there a way to achieve this?
>


Is there a way to delete task history besides using a ttl?

2015-11-17 Thread Jonathan Coveney
so I have the following...

broadcast some stuff
cache an rdd
do a bunch of stuff, eventually calling actions which reduce it to an
acceptable size

I'm getting an OOM on the driver (well, GC is getting out of control),
largely because I have a lot of partitions and it looks like the job
history is getting too large. ttl is an option, but the downside is that it
will also delete the rdd... this isn't really what I want. What I want is
to keep my in-memory data structures (the rdd, broadcast variable, etc) but
get rid of the old metadata that I don't need anymore (i.e. tasks that have
executed).

Is there a way to achieve this?
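(For reference, the ttl I mean is the global cleaner setting — a hypothetical spark-defaults.conf sketch, which also shows why it is too blunt an instrument:)

```
# spark-defaults.conf (sketch)
# Ages out *all* metadata and data older than one hour: finished-task
# and stage bookkeeping, but also cached RDDs and broadcast variables --
# which is exactly what I don't want.
spark.cleaner.ttl   3600
```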


Re: Getting ClassNotFoundException: scala.Some on Spark 1.5.x

2015-11-02 Thread Jonathan Coveney
My guess, and it's just a guess, is that there is some change between
versions which you got bit by, as it changed the class path.

On Monday, November 2, 2015, Babar Tareen  wrote:

> I am using *'sbt run'* to execute the code. Detailed sbt output is here (
> https://drive.google.com/open?id=0B2dlA_DzEohVakpValRjRS1zVG8).
>
> I had scala 2.11.7 installed on my machine. But even after uninstalling
> it, I am still getting the exception with 2.11.6.
>
> Changing the scala version to 2.11.7 in build.sbt fixes the exception as
> you suggested. I am unclear as to why it works with 2.11.7 and not 2.11.6.
>
> Thanks,
> Babar
>
> On Mon, Nov 2, 2015 at 2:10 PM Jonathan Coveney  > wrote:
>
>> Caused by: java.lang.ClassNotFoundException: scala.Some
>>
>> indicates that you don't have the scala libs present. How are you
>> executing this? My guess is the issue is a conflict between scala 2.11.6 in
>> your build and 2.11.7? Not sure...try setting your scala to 2.11.7?
>>
>> But really, first it'd be good to see what command you're using to invoke
>> this.
>>
>> 2015-11-02 14:48 GMT-05:00 Babar Tareen > >:
>>
>>> Resending, haven't found a workaround. Any help is highly appreciated.
>>>
>>> -- Forwarded message --
>>> From: Babar Tareen >> >
>>> Date: Thu, Oct 22, 2015 at 2:47 PM
>>> Subject: Getting ClassNotFoundException: scala.Some on Spark 1.5.x
>>> To: user@spark.apache.org
>>> 
>>>
>>>
>>> Hi,
>>>
>>> I am getting the following exception when submitting a job to Spark
>>> 1.5.x from Scala. The same code works with Spark 1.4.1. Any clues as to
>>> what might be causing the exception?
>>>
>>>
>>>
>>> *Code: App.scala*
>>>
>>> import org.apache.spark.SparkContext
>>>
>>> object App {
>>>   def main(args: Array[String]) = {
>>> val l = List(1,2,3,4,5,6,7,8,9,0)
>>> val sc = new SparkContext("local[4]", "spark-test")
>>> val rdd = sc.parallelize(l)
>>> rdd.foreach(println)
>>> println(rdd.collect())
>>>   }
>>> }
>>>
>>> *build.sbt*
>>> lazy val sparkjob = (project in file("."))
>>>   .settings(
>>> name := "SparkJob",
>>> version := "1.0",
>>> scalaVersion := "2.11.6",
>>> libraryDependencies := libs
>>> )
>>>
>>> lazy val libs = Seq(
>>>   "org.apache.spark" %% "spark-core" % "1.5.1"
>>> )
>>>
>>>
>>> *Exception:*
>>>
>>> 15/10/22 14:32:42 INFO DAGScheduler: Job 0 failed: foreach
>>> at app.scala:9, took 0.689832 s
>>> [error] (run-main-0) org.apache.spark.SparkException: Job aborted due to
>>> stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure:
>>> Lost task 2.0 in stage 0.0 (TID 2, localhost): java.io.IOException:
>>> java.lang.ClassNotFoundException: scala.Some
>>> [error] at
>>> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
>>> [error] at
>>> org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
>>> [error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>>> Method)
>>> [error] at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> [error] at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> [error] at java.lang.reflect.Method.invoke(Method.java:497)
>>> [error] at
>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>> [error] at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
>>> [error] at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>> [error] at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>> [error] at
>>> java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
>>> [error] at
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> [error] at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>> [error] at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>> [error] at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:19

Re: Getting ClassNotFoundException: scala.Some on Spark 1.5.x

2015-11-02 Thread Jonathan Coveney
Caused by: java.lang.ClassNotFoundException: scala.Some

indicates that you don't have the scala libs present. How are you executing
this? My guess is the issue is a conflict between scala 2.11.6 in your
build and 2.11.7? Not sure...try setting your scala to 2.11.7?

But really, first it'd be good to see what command you're using to invoke
this.
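For anyone hitting this later: the fix reported back in this thread was pinning the build's Scala to the installed release. A build.sbt sketch:

```scala
// build.sbt (sketch) -- align scalaVersion with the Scala release
// actually installed on the machine; the 2.11.6-build vs 2.11.7-installed
// mismatch is what surfaced as ClassNotFoundException: scala.Some
scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1"
```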

2015-11-02 14:48 GMT-05:00 Babar Tareen :

> Resending, haven't found a workaround. Any help is highly appreciated.
>
> -- Forwarded message --
> From: Babar Tareen 
> Date: Thu, Oct 22, 2015 at 2:47 PM
> Subject: Getting ClassNotFoundException: scala.Some on Spark 1.5.x
> To: user@spark.apache.org
>
>
> Hi,
>
> I am getting the following exception when submitting a job to Spark 1.5.x
> from Scala. The same code works with Spark 1.4.1. Any clues as to what
> might be causing the exception?
>
>
>
> *Code: App.scala*
>
> import org.apache.spark.SparkContext
>
> object App {
>   def main(args: Array[String]) = {
> val l = List(1,2,3,4,5,6,7,8,9,0)
> val sc = new SparkContext("local[4]", "spark-test")
> val rdd = sc.parallelize(l)
> rdd.foreach(println)
> println(rdd.collect())
>   }
> }
>
> *build.sbt*
> lazy val sparkjob = (project in file("."))
>   .settings(
> name := "SparkJob",
> version := "1.0",
> scalaVersion := "2.11.6",
> libraryDependencies := libs
> )
>
> lazy val libs = Seq(
>   "org.apache.spark" %% "spark-core" % "1.5.1"
> )
>
>
> *Exception:*
>
> 15/10/22 14:32:42 INFO DAGScheduler: Job 0 failed: foreach at
> app.scala:9, took 0.689832 s
> [error] (run-main-0) org.apache.spark.SparkException: Job aborted due to
> stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure:
> Lost task 2.0 in stage 0.0 (TID 2, localhost): java.io.IOException:
> java.lang.ClassNotFoundException: scala.Some
> [error] at
> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
> [error] at
> org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
> [error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [error] at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> [error] at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> [error] at java.lang.reflect.Method.invoke(Method.java:497)
> [error] at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> [error] at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
> [error] at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> [error] at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> [error] at
> java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
> [error] at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> [error] at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> [error] at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> [error] at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> [error] at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> [error] at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> [error] at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> [error] at
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> [error] at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
> [error] at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
> [error] at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
> [error] at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [error] at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [error] at java.lang.Thread.run(Thread.java:745)
> [error] Caused by: java.lang.ClassNotFoundException: scala.Some
> [error] at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> [error] at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> [error] at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> [error] at java.lang.Class.forName0(Native Method)
> [error] at java.lang.Class.forName(Class.java:348)
> [error] at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
> [error] at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> [error] at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> [error] at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> [error] at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> [error] at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStr

Re: Sort Merge Join

2015-11-02 Thread Jonathan Coveney
Additionally, I'm curious whether there are any JIRAs around making
DataFrames support ordering better. There are a lot of operations that can
be optimized if you know that you have a total ordering on your data. Are
there any plans, or at least JIRAs, around having the Catalyst optimizer
handle this case?
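For context on what SortMergeJoin buys you once both sides are sorted on the join key, here is the core merge in plain Python (a sketch of the algorithm with made-up data, not Spark's implementation):

```python
def merge_join(left, right):
    """Inner join of two lists of (key, value) pairs, both pre-sorted
    by key; runs in a single linear pass instead of hashing each side."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # emit the full run of equal keys on the right, so duplicate
            # join keys produce the cross product, as SQL semantics require
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                out.append((lk, left[i][1], right[j_end][1]))
                j_end += 1
            # j is NOT advanced: the next equal left key re-scans this run
            i += 1
    return out

left = [(1, "a"), (2, "b"), (2, "c"), (4, "d")]
right = [(2, "x"), (2, "y"), (3, "z")]
print(merge_join(left, right))
# [(2, 'b', 'x'), (2, 'b', 'y'), (2, 'c', 'x'), (2, 'c', 'y')]
```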

2015-11-02 9:39 GMT-05:00 Alex Nastetsky :

> Thanks for the response.
>
> Taking the file system based data source as “UnknownPartitioning”, will
> be a simple and SAFE way for JOIN, as it’s hard to guarantee the records
> from different data sets with the identical join keys will be loaded by the
> same node/task , since lots of factors need to be considered, like task
> pool size, cluster size, source format, storage, data locality etc.,.
>
> I’ll agree it’s worth to optimize it for performance concerns, and
> actually in Hive, it is called bucket join. I am not sure will that happens
> soon in Spark SQL.
>
>
> Yes, this is supported in
>
>- Hive with bucket join
>- Pig with USING "merge"
>
>- MR with CompositeInputFormat
>
> But I guess it's not supported in Spark?
>
> On Mon, Nov 2, 2015 at 12:32 AM, Cheng, Hao  wrote:
>
>> 1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
>> example, in the code below, the two datasets have different number of
>> partitions, but it still does a SortMerge join after a "hashpartitioning".
>>
>>
>>
>> [Hao:] A distributed JOIN operation (either HashBased or SortBased Join)
>> requires the records with the identical join keys MUST BE shuffled to the
>> same “reducer” node / task, hashpartitioning is just a strategy to tell
>> spark shuffle service how to achieve that, in theory, we even can use the
>> `RangePartitioning` instead (but it’s less efficient, that’s why we don’t
>> choose it for JOIN). So conceptually the JOIN operator doesn’t care so much
>> about the shuffle strategy so much if it satisfies the demand on data
>> distribution.
>>
>>
>>
>> 2) If both datasets have already been previously partitioned/sorted the
>> same and stored on the file system (e.g. in a previous job), is there a way
>> to tell Spark this so that it won't want to do a "hashpartitioning" on
>> them? It looks like Spark just considers datasets that have been just read
>> from the the file system to have UnknownPartitioning. In the example below,
>> I try to join a dataframe to itself, and it still wants to hash repartition.
>>
>>
>>
>> [Hao:] Take this as example:
>>
>>
>>
>> EXPLAIN SELECT a.value, b.value, c.value FROM src a JOIN src b ON
>> a.key=b.key JOIN src c ON b.key=c.key
>>
>>
>>
>> == Physical Plan ==
>>
>> TungstenProject [value#20,value#22,value#24]
>>
>> SortMergeJoin [key#21], [key#23]
>>
>>   TungstenSort [key#21 ASC], false, 0
>>
>>TungstenProject [key#21,value#22,value#20]
>>
>> SortMergeJoin [key#19], [key#21]
>>
>>  TungstenSort [key#19 ASC], false, 0
>>
>>   TungstenExchange hashpartitioning(key#19,200)
>>
>>ConvertToUnsafe
>>
>> HiveTableScan [key#19,value#20], (MetastoreRelation default, src,
>> Some(a))
>>
>>  TungstenSort [key#21 ASC], false, 0
>>
>>   TungstenExchange hashpartitioning(key#21,200)
>>
>>ConvertToUnsafe
>>
>> HiveTableScan [key#21,value#22], (MetastoreRelation default, src,
>> Some(b))
>>
>>   TungstenSort [key#23 ASC], false, 0
>>
>>TungstenExchange hashpartitioning(key#23,200)
>>
>> ConvertToUnsafe
>>
>>  HiveTableScan [key#23,value#24], (MetastoreRelation default, src,
>> Some(c))
>>
>>
>>
>> There is no hashpartitioning anymore for the RESULT of “FROM src a JOIN
>> src b ON a.key=b.key”, as we didn’t change the data distribution after
>> it, so we can join another table “JOIN src c ON b.key=c.key” directly,
>> which only require the table “c” for repartitioning on “key”.
>>
>>
>>
>> Taking the file system based data source as “UnknownPartitioning”, will
>> be a simple and SAFE way for JOIN, as it’s hard to guarantee the records
>> from different data sets with the identical join keys will be loaded by the
>> same node/task , since lots of factors need to be considered, like task
>> pool size, cluster size, source format, storage, data locality etc.,.
>>
>> I’ll agree it’s worth to optimize it for performance concerns, and
>> actually in Hive, it is called bucket join. I am not sure will that happens
>> soon in Spark SQL.
>>
>>
>>
>> Hao
>>
>>
>>
>> *From:* Alex Nastetsky [mailto:alex.nastet...@vervemobile.com]
>> *Sent:* Monday, November 2, 2015 11:29 AM
>> *To:* user
>> *Subject:* Sort Merge Join
>>
>>
>>
>> Hi,
>>
>>
>>
>> I'm trying to understand SortMergeJoin (SPARK-2213).
>>
>>
>>
>> 1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
>> example, in the code below, the two datasets have different number of
>> partitions, but it still does a SortMerge join after a "hashpartitioning".
>>
>>
>>
>> CODE:
>>
>>val sparkConf = new SparkConf()
>>
>>   .setApp

Re: Cannot start REPL shell since 1.4.0

2015-10-23 Thread Jonathan Coveney
do you have JAVA_HOME set to a java 7 jdk?
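A quick way to check (a shell sketch; it assumes the usual `java version "1.x.y_z"` banner format, so adjust the pattern for your JVM):

```shell
# Extract the major version ("1.7", "1.8", ...) from a `java -version`
# banner line, then compare against what Spark expects.
java_major() {
  printf '%s\n' "$1" | sed -n 's/.*"\(1\.[0-9][0-9]*\)\..*/\1/p'
}

# Normally: banner=$("$JAVA_HOME/bin/java" -version 2>&1 | head -n1)
banner='java version "1.7.0_80"'
if [ "$(java_major "$banner")" = "1.7" ]; then
  echo "Java 7 detected"
else
  echo "not Java 7 -- point JAVA_HOME at a JDK 7 install"
fi
```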

2015-10-23 7:12 GMT-04:00 emlyn :

> xjlin0 wrote
> > I cannot enter REPL shell in 1.4.0/1.4.1/1.5.0/1.5.1(with pre-built with
> > or without Hadoop or home compiled with ant or maven).  There was no
> error
> > message in v1.4.x, system prompt nothing.  On v1.5.x, once I enter
> > $SPARK_HOME/bin/pyspark or spark-shell, I got
> >
> > Error: Could not find or load main class org.apache.spark.launcher.Main
>
> I have the same problem (on MacOS X Yosemite, all spark versions since 1.4,
> installed both with homebrew and downloaded manually). I've been trying to
> start the pyspark shell, but it also fails in the same way for spark-shell
> and spark-sql and spark-submit. I've narrowed it down to the following line
> in the spark-class script:
>
> done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main
> "$@")
>
> (where $RUNNER is "java" and $LAUNCH_CLASSPATH is
>
> "/usr/local/Cellar/apache-spark/1.5.1/libexec/lib/spark-assembly-1.5.1-hadoop2.6.0.jar",
> which does exist and does contain the org.apache.spark.launcher.Main class,
> despite the message that it can't be found)
>
> If I run it manually, using:
>
> SPARK_HOME=/usr/local/Cellar/apache-spark/1.5.1/libexec java -cp
>
> /usr/local/Cellar/apache-spark/1.5.1/libexec/lib/spark-assembly-1.5.1-hadoop2.6.0.jar
> org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit
> pyspark-shell-main --name PySparkShell
>
> It runs without that error, and instead prints out (where "\0" is a nul
> character):
>
> env\0PYSPARK_SUBMIT_ARGS="--name" "PySparkShell" "pyspark-shell"\0python\0
>
> I'm not really sure what to try next, maybe with this extra information
> someone has an idea what's going wrong, and how to fix it.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-start-REPL-shell-since-1-4-0-tp24921p25176.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark performance non-linear response

2015-10-07 Thread Jonathan Coveney
I've noticed this as well and am curious if there is anything more people
can say.

My theory is that it is just communication overhead. If you only have a
couple of gigabytes (a tiny dataset), then splitting that across 50 nodes
means you'll have a ton of tiny partitions all finishing very quickly, and
thus creating a lot of communication overhead per amount of data processed.
Just a theory, though.
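A toy model of that theory (hypothetical numbers, Amdahl-style: a fixed serial fraction for scheduling and result collection caps the speedup, which bends the curve over as nodes are added):

```python
# Amdahl-style model: a fixed fraction of the job (task scheduling,
# collecting results at the driver) does not parallelize, so speedup
# flattens out -- a power-law-looking curve like the plot described.
def speedup(nodes, serial_fraction=0.05):
    # serial_fraction=0.05 is a made-up illustrative constant;
    # it caps speedup at 1/0.05 = 20x no matter the cluster size
    return 1.0 / (serial_fraction + (1.0 - serial_fraction) / nodes)

for n in (1, 5, 10, 25, 50):
    print(n, round(speedup(n), 1))
```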

On Wednesday, October 7, 2015, Yadid Ayzenberg  wrote:

> Additional missing relevant information:
>
> I'm running a transformation; there are no shuffles occurring, and at the
> end I'm performing a lookup of 4 partitions on the driver.
>
>
>
> On 10/7/15 11:26 AM, Yadid Ayzenberg wrote:
>
> Hi All,
>
> I'm using Spark 1.4.1 to analyze a largish data set (several gigabytes
> of data). The RDD is partitioned into 2048 partitions which are more or
> less equal and entirely cached in RAM.
> I evaluated the performance on several cluster sizes, and am witnessing a
> non-linear (power-law) performance improvement as the cluster size
> increases (plot below). Each node has 4 cores and each worker is
> configured to use 10GB of RAM.
>
> [image: Spark performance]
>
> I would expect a more linear response given the number of partitions and
> the fact that all of the data is cached.
> Can anyone suggest what I should tweak in order to improve the performance?
> Or perhaps provide an explanation as to the behavior I'm witnessing?
>
> Yadid
>
>
>


Re: laziness in textFile reading from HDFS?

2015-10-06 Thread Jonathan Coveney
LZO files are not splittable by default, but there are projects with input
and output formats that make splittable LZO files. Check out Twitter's
elephant-bird on GitHub.
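On the laziness question raised in the quoted thread below — the on-demand behavior can be sketched with a plain Python generator (no Spark here, just the evaluation model): partitions materialize only when something consumes them, rather than all up front.

```python
# Partitions materialize only when an "action" consumes them, much like
# RDD partitions are computed per-task rather than eagerly up front.
materialized = []

def read_partitions(n):
    for i in range(n):
        materialized.append(i)   # side effect: record what was actually read
        yield f"partition-{i}"

parts = read_partitions(10)      # nothing has been read yet
first_two = [next(parts), next(parts)]
print(first_two, materialized)   # ['partition-0', 'partition-1'] [0, 1]
```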

On Wednesday, October 7, 2015, Mohammed Guller  wrote:

> It is not uncommon to process datasets larger than available memory with
> Spark.
>
> I don't remember whether LZO files are splittable. Perhaps, in your case
> Spark is running into issues while decompressing a large LZO file.
>
> See if this helps:
>
> http://stackoverflow.com/questions/25248170/spark-hadoop-throws-exception-for-large-lzo-files
>
>
> Mohammed
>
>
> -Original Message-
> From: Matt Narrell [mailto:matt.narr...@gmail.com ]
> Sent: Tuesday, October 6, 2015 4:08 PM
> To: Mohammed Guller
> Cc: davidkl; user@spark.apache.org 
> Subject: Re: laziness in textFile reading from HDFS?
>
> Agreed. This is spark 1.2 on CDH5.x. How do you mitigate where the data
> sets are larger than available memory?
>
> My jobs stall and gc/heap issues all over the place.
>
> ..via mobile
>
> > On Oct 6, 2015, at 4:44 PM, Mohammed Guller  > wrote:
> >
> > I have not used LZO compressed files from Spark, so not sure why it
> stalls without caching.
> >
> > In general, if you are going to make just one pass over the data, there
> is not much benefit in caching it. The data gets read anyway only after the
> first action is called. If you are calling just a map operation and then a
> save operation, I don't see how caching would help.
> >
> > Mohammed
> >
> >
> > -Original Message-
> > From: Matt Narrell [mailto:matt.narr...@gmail.com ]
> > Sent: Tuesday, October 6, 2015 3:32 PM
> > To: Mohammed Guller
> > Cc: davidkl; user@spark.apache.org 
> > Subject: Re: laziness in textFile reading from HDFS?
> >
> > One.
> >
> > I read in LZO compressed files from HDFS, perform a map operation, cache
> > the results of this map operation, and call saveAsHadoopFile to write
> > LZO back to HDFS.
> >
> > Without the cache, the job will stall.
> >
> > mn
> >
> >> On Oct 5, 2015, at 7:25 PM, Mohammed Guller  > wrote:
> >>
> >> Is there any specific reason for caching the RDD? How many passes you
> make over the dataset?
> >>
> >> Mohammed
> >>
> >> -Original Message-
> >> From: Matt Narrell [mailto:matt.narr...@gmail.com ]
> >> Sent: Saturday, October 3, 2015 9:50 PM
> >> To: Mohammed Guller
> >> Cc: davidkl; user@spark.apache.org 
> >> Subject: Re: laziness in textFile reading from HDFS?
> >>
> >> Is there any more information or best practices here?  I have the exact
> same issues when reading large data sets from HDFS (larger than available
> RAM) and I cannot run without setting the RDD persistence level to
> MEMORY_AND_DISK_SER, and using nearly all the cluster resources.
> >>
> >> Should I repartition this RDD to be equal to the number of cores?
> >>
> >> I notice that the job duration on the YARN UI is about 30 minutes
> longer than the Spark UI.  When the job initially starts, there is no tasks
> shown in the Spark UI..?
> >>
> >> All I'm doing is reading records from HDFS text files with sc.textFile,
> and rewriting them back to HDFS grouped by a timestamp.
> >>
> >> Thanks,
> >> mn
> >>
> >>> On Sep 29, 2015, at 8:06 PM, Mohammed Guller  > wrote:
> >>>
> >>> 1) It is not required to have the same amount of memory as data.
> >>> 2) By default the # of partitions are equal to the number of HDFS
> >>> blocks
> >>> 3) Yes, the read operation is lazy
> >>> 4) It is okay to have more number of partitions than number of cores.
> >>>
> >>> Mohammed
> >>>
> >>> -Original Message-
> >>> From: davidkl [mailto:davidkl...@hotmail.com ]
> >>> Sent: Monday, September 28, 2015 1:40 AM
> >>> To: user@spark.apache.org 
> >>> Subject: laziness in textFile reading from HDFS?
> >>>
> >>> Hello,
> >>>
> >>> I need to process a significant amount of data every day, about 4TB.
> This will be processed in batches of about 140GB. The cluster this will be
> running on doesn't have enough memory to hold the dataset at once, so I am
> trying to understand how this works internally.
> >>>
> >>> When using textFile to read an HDFS folder (containing multiple
> files), I understand that the number of partitions created are equal to the
> number of HDFS blocks, correct? Are those created in a lazy way? I mean, if
> the number of blocks/partitions is larger than the number of cores/threads
> the Spark driver was launched with (N), are N partitions created initially
> and then the rest when required? Or are all those partitions created up
> front?
> >>>
> >>> I want to avoid reading the whole data into memory just to spill it
> >>> out to disk if there is not enough memory.
> >>>
> >>> Thanks!
> >>>
> >>>
> >>>
> >>> --
> >>> View this message in context:
> >>> http://apache-spark-user-list.1001560.n3.nabble.com/laziness-in-textFile-reading-from-HDFS-tp24837.html
> >>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>>
> >>> --

Re: laziness in textFile reading from HDFS?

2015-10-06 Thread Jonathan Coveney
LZO files are not splittable by default but there are projects with Input
and Output formats to make splittable LZO files. Check out twitter's
elephantbird on GitHub

El miércoles, 7 de octubre de 2015, Mohammed Guller 
escribió:

> It is not uncommon to process datasets larger than available memory with
> Spark.
>
> I don't remember whether LZO files are splittable. Perhaps, in your case
> Spark is running into issues while decompressing a large LZO file.
>
> See if this helps:
>
> http://stackoverflow.com/questions/25248170/spark-hadoop-throws-exception-for-large-lzo-files
>
>
> Mohammed
>
>
> -Original Message-
> From: Matt Narrell [mailto:matt.narr...@gmail.com ]
> Sent: Tuesday, October 6, 2015 4:08 PM
> To: Mohammed Guller
> Cc: davidkl; user@spark.apache.org 
> Subject: Re: laziness in textFile reading from HDFS?
>
> Agreed. This is spark 1.2 on CDH5.x. How do you mitigate where the data
> sets are larger than available memory?
>
> My jobs stall and gc/heap issues all over the place.
>
> ..via mobile
>
> > On Oct 6, 2015, at 4:44 PM, Mohammed Guller  > wrote:
> >
> > I have not used LZO compressed files from Spark, so not sure why it
> stalls without caching.
> >
> > In general, if you are going to make just one pass over the data, there
> is not much benefit in caching it. The data gets read anyway only after the
> first action is called. If you are calling just a map operation and then a
> save operation, I don't see how caching would help.
> >
> > Mohammed
> >
> >
> > -Original Message-
> > From: Matt Narrell [mailto:matt.narr...@gmail.com ]
> > Sent: Tuesday, October 6, 2015 3:32 PM
> > To: Mohammed Guller
> > Cc: davidkl; user@spark.apache.org 
> > Subject: Re: laziness in textFile reading from HDFS?
> >
> > One.
> >
> > I read in LZO compressed files from HDFS Perform a map operation cache
> the results of this map operation call saveAsHadoopFile to write LZO back
> to HDFS.
> >
> > Without the cache, the job will stall.
> >
> > mn
> >
> >> On Oct 5, 2015, at 7:25 PM, Mohammed Guller  > wrote:
> >>
> >> Is there any specific reason for caching the RDD? How many passes you
> make over the dataset?
> >>
> >> Mohammed
> >>
> >> -Original Message-
> >> From: Matt Narrell [mailto:matt.narr...@gmail.com ]
> >> Sent: Saturday, October 3, 2015 9:50 PM
> >> To: Mohammed Guller
> >> Cc: davidkl; user@spark.apache.org 
> >> Subject: Re: laziness in textFile reading from HDFS?
> >>
> >> Is there any more information or best practices here?  I have the exact
> same issues when reading large data sets from HDFS (larger than available
> RAM) and I cannot run without setting the RDD persistence level to
> MEMORY_AND_DISK_SER, and using nearly all the cluster resources.
> >>
> >> Should I repartition this RDD to be equal to the number of cores?
> >>
> >> I notice that the job duration on the YARN UI is about 30 minutes
> longer than the Spark UI.  When the job initially starts, there are no tasks
> >> shown in the Spark UI..?
> >>
> >> All I'm doing is reading records from HDFS text files with sc.textFile,
> and rewriting them back to HDFS grouped by a timestamp.
> >>
> >> Thanks,
> >> mn
> >>
> >>> On Sep 29, 2015, at 8:06 PM, Mohammed Guller  > wrote:
> >>>
> >>> 1) It is not required to have the same amount of memory as data.
> >>> 2) By default the # of partitions is equal to the number of HDFS
> >>> blocks
> >>> 3) Yes, the read operation is lazy
> >>> 4) It is okay to have more number of partitions than number of cores.
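
A minimal sketch of the lazy behaviour described in the answers above (the path is made up for illustration):

```scala
val rdd = sc.textFile("hdfs:///data/batch-2015-09-28")  // no I/O happens here
val lengths = rdd.map(_.length)                         // still lazy
val n = lengths.count()  // first action: tasks are scheduled, and each task
                         // reads only its own HDFS-block-sized partition
```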
> >>>
> >>> Mohammed
> >>>
> >>> -Original Message-
> >>> From: davidkl [mailto:davidkl...@hotmail.com ]
> >>> Sent: Monday, September 28, 2015 1:40 AM
> >>> To: user@spark.apache.org 
> >>> Subject: laziness in textFile reading from HDFS?
> >>>
> >>> Hello,
> >>>
> >>> I need to process a significant amount of data every day, about 4TB.
> This will be processed in batches of about 140GB. The cluster this will be
> running on doesn't have enough memory to hold the dataset at once, so I am
> trying to understand how this works internally.
> >>>
> >>> When using textFile to read an HDFS folder (containing multiple
> files), I understand that the number of partitions created are equal to the
> number of HDFS blocks, correct? Are those created in a lazy way? I mean, if
> the number of blocks/partitions is larger than the number of cores/threads
> the Spark driver was launched with (N), are N partitions created initially
> and then the rest when required? Or are all those partitions created up
> front?
> >>>
> >>> I want to avoid reading the whole data into memory just to spill it
> out to disk if there is not enough memory.
> >>>
> >>> Thanks!
> >>>
> >>>
> >>>
> >>> --
> >>> View this message in context:
> >>> http://apache-spark-user-list.1001560.n3.nabble.com/laziness-in-textFile-reading-from-HDFS-tp24837.html
> >>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>>
> >>> --

Re: RDD of ImmutableList

2015-10-06 Thread Jonathan Coveney
Nobody is saying not to use immutable data structures, only that guava's
aren't natively supported.

Scala's default collections library is all immutable. list, Vector, Map.
This is what people generally use, especially in scala code!
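
If you do need to keep the guava types, registering a kryo serializer for them is one way out. A hedged sketch, assuming the third-party de.javakaffee:kryo-serializers library (which ships Guava collection serializers) is on the classpath:

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
// from de.javakaffee:kryo-serializers (an assumption, not part of Spark)
import de.javakaffee.kryoserializers.guava.ImmutableListSerializer

class GuavaRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // installs a serializer that rebuilds the list via ImmutableList.copyOf
    // instead of calling the unsupported add() during deserialization
    ImmutableListSerializer.registerSerializers(kryo)
  }
}
// then pass:
//   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
//   --conf spark.kryo.registrator=GuavaRegistrator
```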

On Tuesday, October 6, 2015, Jakub Dubovsky <
spark.dubovsky.ja...@seznam.cz> wrote:

> Thank you for quick reaction.
>
> I have to say this is very surprising to me. I have never received advice to
> stop using an immutable approach. Whole RDD is designed to be immutable
> (which is sort of sabotaged by not being able to (de)serialize immutable
> classes properly). I will ask on dev list if this is to be changed or not.
>
> Ok, I have let go of my initial feelings; now let's be pragmatic. And this is
> still for everyone not just Igor:
>
> I use a class from a library which is immutable. Now I want to use this
> class to represent my data in RDD because this saves me a huge amount of
> work. The class uses ImmutableList as one of its fields. That's why it
> fails. But isn't there a way to work around this? I ask this because I
> have exactly zero knowledge about kryo and the way how it works. So for
> example would some of these two work?
>
> 1) Change the external class so that it implements writeObject, readObject
> methods (it's java). Will these methods be used by kryo? (I can ask the
> maintainers of a library to change the class if the change is reasonable.
> Adding these methods would be while dropping immutability certainly
> wouldn't)
>
> 2) Wrap the class to scala class which would translate the data during
> (de)serialization?
>
>   Thanks!
>   Jakub Dubovsky
>
> -- Original message --
> From: Igor Berman  >
> To: Jakub Dubovsky  >
> Date: 5. 10. 2015 20:11:35
> Subject: Re: RDD of ImmutableList
>
> kryo doesn't support guava's collections by default
> I remember encountering a project on github that fixes this (not sure though).
> I've ended up avoiding guava collections wherever spark RDDs are
> concerned.
>
> On 5 October 2015 at 21:04, Jakub Dubovsky  > wrote:
>
> Hi all,
>
>   I would like some advice on how to use ImmutableList with an RDD. Here is a small
> demonstration of the essence of my problem in spark-shell with the guava jar
> added:
>
> scala> import com.google.common.collect.ImmutableList
> import com.google.common.collect.ImmutableList
>
> scala> val arr = Array(ImmutableList.of(1,2), ImmutableList.of(2,4),
> ImmutableList.of(3,6))
> arr: Array[com.google.common.collect.ImmutableList[Int]] = Array([1, 2],
> [2, 4], [3, 6])
>
> scala> val rdd = sc.parallelize(arr)
> rdd:
> org.apache.spark.rdd.RDD[com.google.common.collect.ImmutableList[Int]] =
> ParallelCollectionRDD[0] at parallelize at :24
>
> scala> rdd.count
>
>  This results in a kryo exception saying that it cannot add a new element to
> a list instance during deserialization:
>
> java.io.IOException: java.lang.UnsupportedOperationException
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
> at
> org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
> ...
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.UnsupportedOperationException
> at
> com.google.common.collect.ImmutableCollection.add(ImmutableCollection.java:91)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
> ...
>
>   It somehow makes sense. But I cannot think of a workaround and I do not
> believe that using ImmutableList with an RDD is impossible. How is this
> solved?
>
>   Thank you in advance!
>
>Jakub Dubovsky
>
>
>


Re: does KafkaCluster can be public ?

2015-10-06 Thread Jonathan Coveney
You can put a class in the org.apache.spark namespace to access anything
that is private[spark]. You can then make enrichments there to access
whatever you need. Just beware upgrade pain :)
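
A hedged sketch of the package trick, assuming the Spark 1.x internals where KafkaCluster is `private[spark]` in `org.apache.spark.streaming.kafka` and takes the kafka params in its constructor (names may need adjusting against your Spark version):

```scala
// Compiled inside Spark's own package, so it can see private[spark] members.
package org.apache.spark.streaming.kafka

object KafkaClusterAccessor {
  // expose construction of the otherwise-private class to your own code
  def create(kafkaParams: Map[String, String]): KafkaCluster =
    new KafkaCluster(kafkaParams)
}
```

Anything compiled into this package sees the private[spark] API, but it will break silently across Spark upgrades, hence the warning above.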

On Tuesday, October 6, 2015, Erwan ALLAIN 
wrote:

> Hello,
>
> I'm currently testing spark streaming with kafka.
> I'm creating DirectStream with KafkaUtils and everything's fine. However I
> would like to use the signature where I can specify my own message handler
> (to play with partition and offset). In this case, I need to manage
> offset/partition by myself to fill fromOffsets argument.
> I have found a Jira for this use case
> https://issues.apache.org/jira/browse/SPARK-6714 but it has been closed
> telling that it's too specific.
> I'm aware that it can be done using kafka api (TopicMetaDataRequest and
> OffsetRequest) but what I have to do is almost the same as the KafkaCluster
> which is private.
>
> is it possible to :
>  - add another signature in KafkaUtils ?
>  - make KafkaCluster public ?
>
> or do you have any other smart solution where I don't need to copy/paste
> KafkaCluster ?
>
> Thanks.
>
> Regards,
> Erwan ALLAIN
>


Re: Apache Spark job in local[*] is slower than regular 1-thread Python program

2015-09-22 Thread Jonathan Coveney
It's entirely conceivable to beat spark in performance on tiny
data sets like this. That's not really what it has been optimized for.

On Tuesday, September 22, 2015, juljoin 
wrote:

> Hello,
>
> I am trying to figure Spark out and I still have some problems with its
> speed, I can't figure them out. In short, I wrote two programs that loop
> through a 3.8Gb file and filter each line depending of if a certain word is
> present.
>
> I wrote a one-thread python program doing the job and I obtain:
> - for the 3.8Gb file:
> / lines found: 82100
>  in: *10.54 seconds*/
>  - no filter, just looping through the file:
> / in: 01.65 seconds/
>
> The Spark app doing the same and executed on 8 threads gives:
>  - for the 3.8Gb file:
> / lines found: 82100
>  in: *18.27 seconds*/
>  - for a 38Mb file:
> /lines found: 821
> in: 2.53 seconds/
>
> I must be doing something wrong to obtain a result twice as slow on 8 threads
> as on 1 thread.
>
> 1. First, I thought it might be because of the setting-up cost of Spark.
> But
> for smaller files it only takes 2 seconds which makes this option
> improbable.
> 2. Looping through the file takes up 1.65 seconds (thank you SSD ^_^ ),
> processing takes up the other 9seconds (for the python app).
> -> This is why I thought splitting it up across the different processes would
> definitely speed it up.
>
> Note: Increasing the number of threads in Spark improves the speed (from 57
> seconds with 1 thread to 18 seconds with 8 threads). But still, there is a
> big difference in performance between simple python and Spark, it must be
> my
> doing!
>
> Can someone point me out on what I am doing wrong? That would be greatly
> appreciated :) I am new with all this big data stuff.
>
>
>
> *Here is the code for the Spark app:*
>
>
>
>
>
> *And the python code:*
>
>
>
> Thank you for reading up to this point :)
>
> Have a nice day!
>
>
> - Julien
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-job-in-local-is-slower-than-regular-1-thread-Python-program-tp24771.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> For additional commands, e-mail: user-h...@spark.apache.org 
>
>


Re: spark-avro takes a lot time to load thousands of files

2015-09-22 Thread Jonathan Coveney
having a file per record is pretty inefficient on almost any file system

On Tuesday, September 22, 2015, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> We are trying to load around 10k avro files (each file holds only one
> record) using spark-avro but it takes over 15 minutes to load.
> It seems that most of the work is being done at the driver, where it
> creates a broadcast variable for each file.
>
> Any idea why is it behaving that way ?
> Thank you.
> Daniel
>


Re: Java vs. Scala for Spark

2015-09-08 Thread Jonathan Coveney
It worked for Twitter!

Seriously though: scala is much much more pleasant. And scala has a great
story for using Java libs. And since spark is kind of framework-y (use its
scripts to submit, start up repl, etc) the projects tend to be leaf
projects, so even in a big company that uses Java the cost of scala is low
and fairly isolated. If you need to write large amounts of supporting
libraries, you are free to use Java or scala as you see fit.

On Tuesday, September 8, 2015, Bryan Jeffrey 
wrote:

> All,
>
> We're looking at language choice in developing a simple streaming
> processing application in spark.  We've got a small set of example code
> built in Scala.  Articles like the following:
> http://www.bigdatatidbits.cc/2015/02/navigating-from-scala-to-spark-for.html
> would seem to indicate that Scala is great for use in distributed
> programming (including Spark).  However, there is a large group of folks
> that seem to feel that interoperability with other Java libraries is much
> to be desired, and that the cost of learning (yet another) language is
> quite high.
>
> Has anyone looked at Scala for Spark dev in an enterprise environment?
> What was the outcome?
>
> Regards,
>
> Bryan Jeffrey
>


Re: Spark 1.4 RDD to DF fails with toDF()

2015-09-07 Thread Jonathan Coveney
How are you building and running it?

On Monday, September 7, 2015, Gheorghe Postelnicu <
gheorghe.posteln...@gmail.com> wrote:

> Interesting idea. Tried that, didn't work. Here is my new SBT file:
>
> name := """testMain"""
>
> scalaVersion := "2.11.6"
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % "1.4.1" % "provided",
>   "org.apache.spark" %% "spark-sql" % "1.4.1" % "provided",
>   "org.scala-lang" % "scala-reflect" % "2.11.6"
> )
>
>
> On Mon, Sep 7, 2015 at 9:55 PM, Jonathan Coveney  > wrote:
>
>> Try adding the following to your build.sbt
>>
>> libraryDependencies += "org.scala-lang" % "scala-reflect" % "2.11.6"
>>
>>
>> I believe that spark shades the scala library, and this is a library that it 
>> looks like you need in an unshaded way.
>>
>>
>> 2015-09-07 16:48 GMT-04:00 Gheorghe Postelnicu <
>> gheorghe.posteln...@gmail.com
>> >:
>>
>>> Hi,
>>>
>>> The following code fails when compiled from SBT:
>>>
>>> package main.scala
>>>
>>> import org.apache.spark.SparkContext
>>> import org.apache.spark.sql.SQLContext
>>>
>>> object TestMain {
>>>   def main(args: Array[String]): Unit = {
>>> implicit val sparkContext = new SparkContext()
>>> val sqlContext = new SQLContext(sparkContext)
>>> import sqlContext.implicits._
>>> sparkContext.parallelize(1 to 10).map(i => (i,
>>> i.toString)).toDF("intCol", "strCol")
>>>   }
>>> }
>>>
>>> with the following error:
>>>
>>> 15/09/07 21:39:21 INFO BlockManagerMaster: Registered BlockManager
>>> Exception in thread "main" java.lang.NoSuchMethodError:
>>> scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
>>> at main.scala.Bof$.main(Bof.scala:14)
>>> at main.scala.Bof.main(Bof.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> 15/09/07 21:39:22 INFO SparkContext: Invoking stop() from shutdown hook
>>>
>>> whereas the code above works in a spark shell.
>>>
>>> The code is compiled using Scala 2.11.6 and precompiled Spark 1.4.1
>>>
>>> Any suggestion on how to fix this would be much appreciated.
>>>
>>> Best,
>>> Gheorghe
>>>
>>>
>>
>


Re: Spark 1.4 RDD to DF fails with toDF()

2015-09-07 Thread Jonathan Coveney
Try adding the following to your build.sbt

libraryDependencies += "org.scala-lang" % "scala-reflect" % "2.11.6"


I believe that spark shades the scala library, and this is a library
that it looks like you need in an unshaded way.


2015-09-07 16:48 GMT-04:00 Gheorghe Postelnicu <
gheorghe.posteln...@gmail.com>:

> Hi,
>
> The following code fails when compiled from SBT:
>
> package main.scala
>
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.SQLContext
>
> object TestMain {
>   def main(args: Array[String]): Unit = {
> implicit val sparkContext = new SparkContext()
> val sqlContext = new SQLContext(sparkContext)
> import sqlContext.implicits._
> sparkContext.parallelize(1 to 10).map(i => (i,
> i.toString)).toDF("intCol", "strCol")
>   }
> }
>
> with the following error:
>
> 15/09/07 21:39:21 INFO BlockManagerMaster: Registered BlockManager
> Exception in thread "main" java.lang.NoSuchMethodError:
> scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
> at main.scala.Bof$.main(Bof.scala:14)
> at main.scala.Bof.main(Bof.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 15/09/07 21:39:22 INFO SparkContext: Invoking stop() from shutdown hook
>
> whereas the code above works in a spark shell.
>
> The code is compiled using Scala 2.11.6 and precompiled Spark 1.4.1
>
> Any suggestion on how to fix this would be much appreciated.
>
> Best,
> Gheorghe
>
>


Re: extracting file path using dataframes

2015-09-01 Thread Jonathan Coveney
You can make a Hadoop input format which passes through the name of the
file. I generally find it easier to just hit Hadoop, get the file names,
and construct the RDDs though
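
A hedged sketch of the "list the paths, then build one RDD per customer" approach; the glob pattern, helper names, and output tuple shape are assumptions based on the question below, not a tested recipe:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{BytesWritable, Text}

val fs = FileSystem.get(sc.hadoopConfiguration)
// one RDD per customer directory, each tagged with its customer id
val perCustomer = fs.globStatus(new Path("s3://data/customer=*")).map { status =>
  val dir = status.getPath.toString
  val customerId = dir.substring(dir.indexOf("customer=") + "customer=".length)
  sc.sequenceFile(dir + "/*", classOf[BytesWritable], classOf[Text])
    .map { case (_, text) => (customerId, text.toString) }
}
// union them back into one RDD that carries the customer id per record
val withCustomerId = sc.union(perCustomer.toSeq)
```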

On Tuesday, September 1, 2015, Matt K  wrote:

> Just want to add - I'm looking to partition the resulting Parquet files by
> customer-id, which is why I'm looking to extract the customer-id from the
> path.
>
> On Tue, Sep 1, 2015 at 7:00 PM, Matt K  > wrote:
>
>> Hi all,
>>
>> TL;DR - is there a way to extract the source path from an RDD via the
>> Scala API?
>>
>> I have sequence files on S3 that look something like this:
>> s3://data/customer=123/...
>> s3://data/customer=456/...
>>
>> I am using Spark Dataframes to convert these sequence files to Parquet.
>> As part of the processing, I actually need to know the customer-id. I'm
>> doing something like this:
>>
>> val rdd = sql.sparkContext.sequenceFile("s3://data/customer=*/*", 
>> classOf[BytesWritable],
>> classOf[Text])
>>
>> val rowRdd = rdd.map(x => convertTextRowToTypedRdd(x._2, schema,
>> delimiter))
>>
>> val dataFrame = sql.createDataFrame(rowRdd, schema)
>>
>>
>> What I am trying to figure out is how to get the customer-id, which is
>> part of the path. I am not sure if there's a way to extract the source path
>> from the resulting HadoopRDD. Do I need to create one RDD per customer to
>> get around this?
>>
>>
>> Thanks,
>>
>> -Matt
>>
>
>
>
> --
> www.calcmachine.com - easy online calculator.
>


Re: types allowed for saveasobjectfile?

2015-08-27 Thread Jonathan Coveney
Array[String] doesn't pretty-print by default. Use .mkString(","), for
example.
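
A quick illustration of the difference:

```scala
val arr: Array[String] = Array("a", "b", "c")
println(arr)               // prints something like [Ljava.lang.String;@1b6d3586
println(arr.mkString(",")) // prints a,b,c
```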

On Thursday, August 27, 2015, Arun Luthra 
wrote:

> What types of RDD can saveAsObjectFile(path) handle? I tried a naive test
> with an RDD[Array[String]], but when I tried to read back the result with
> sc.objectFile(path).take(5).foreach(println), I got a non-promising output
> looking like:
>
> [Ljava.lang.String;@46123a
> [Ljava.lang.String;@76123b
> [Ljava.lang.String;@13144c
> [Ljava.lang.String;@75146d
> [Ljava.lang.String;@79118f
>
>
> Arun
>


Re: spark and scala-2.11

2015-08-24 Thread Jonathan Coveney
I've used the instructions and it worked fine.

Can you post exactly what you're doing, and what it fails with? Or are you
just trying to understand how it works?

2015-08-24 15:48 GMT-04:00 Lanny Ripple :

> Hello,
>
> The instructions for building spark against scala-2.11 indicate using
> -Dspark-2.11.  When I look in the pom.xml I find a profile named
> 'spark-2.11' but nothing that would indicate I should set a property.  The
> sbt build seems to need the -Dscala-2.11 property set.  Finally build/mvn
> does a simple grep of scala.version (which doesn't change after running dev/
> change-version-to-2.11.sh) so the build seems to be grabbing the 2.10.4
> scala library.
>
> Anyone know (from having done it and used it in production) if the build
> instructions for spark-1.4.1 against Scala-2.11 are correct?
>
> Thanks.
>   -Lanny
>


Re: How to set log level in spark-submit ?

2015-07-29 Thread Jonathan Coveney
Put a log4j.properties file in conf/. You can copy
log4j.properties.template as a good base

On Wednesday, July 29, 2015, canan chen  wrote:

> Anyone know how to set log level in spark-submit ? Thanks
>


Re: broadcast variable question

2015-07-28 Thread Jonathan Coveney
That's great! Thanks

On Tuesday, July 28, 2015, Ted Yu  wrote:

> If I understand correctly, there would be one value in the executor.
>
> Cheers
>
> On Tue, Jul 28, 2015 at 4:23 PM, Jonathan Coveney  > wrote:
>
>> i am running in coarse grained mode, let's say with 8 cores per executor.
>>
>> If I use a broadcast variable, will all of the tasks in that executor
>> share the same value? Or will each task broadcast its own value ie in this
>> case, would there be one value in the executor shared by the 8 tasks, or
>> would there be eight copies, 1 per task?
>>
>
>
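
A minimal sketch of what the thread above describes: one broadcast, read from many tasks, fetched and deserialized once per executor:

```scala
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
val rdd = sc.parallelize(Seq("a", "b", "c"))
val mapped = rdd.map { k =>
  // lookup.value returns the single per-executor copy; the 8 tasks on an
  // executor share it rather than each holding their own copy
  lookup.value.getOrElse(k, 0)
}
```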


broadcast variable question

2015-07-28 Thread Jonathan Coveney
i am running in coarse grained mode, let's say with 8 cores per executor.

If I use a broadcast variable, will all of the tasks in that executor share
the same value? Or will each task broadcast its own value ie in this case,
would there be one value in the executor shared by the 8 tasks, or would
there be eight copies, 1 per task?


--jars not working?

2015-06-12 Thread Jonathan Coveney
Spark version is 1.3.0 (will upgrade as soon as we upgrade past mesos
0.19.0)...

Regardless, I'm running into a really weird situation where when I pass
--jars to bin/spark-shell I can't reference those classes on the repl. Is
this expected? The logs even tell me that my jars have been added, and yet
the classes inside of them are not available.

Am I missing something obvious?


Re: Partition Case Class RDD without ParRDDFunctions

2015-05-07 Thread Jonathan Coveney
what about .groupBy doesn't work for you?

2015-05-07 8:17 GMT-04:00 Night Wolf :

> MyClass is a basic scala case class (using Spark 1.3.1);
>
> case class Result(crn: Long, pid: Int, promoWk: Int, windowKey: Int, ipi: 
> Double) {
>   override def hashCode(): Int = crn.hashCode()
> }
>
>
> On Wed, May 6, 2015 at 8:09 PM, ayan guha  wrote:
>
>> How does your MyClqss looks like? I was experimenting with Row class in
>> python and apparently partitionby automatically takes first column as key.
>> However, I am not sure how you can access a part of an object without
>> deserializing it (either explicitly or Spark doing it for you)
>>
>> On Wed, May 6, 2015 at 7:14 PM, Night Wolf 
>> wrote:
>>
>>> Hi,
>>>
>>> If I have an RDD[MyClass] and I want to partition it by the hash code of
>>> MyClass for performance reasons, is there any way to do this without
>>> converting it into a PairRDD RDD[(K,V)] and calling partitionBy???
>>>
>>> Mapping it to a tuple2 seems like a waste of space/computation.
>>>
>>> It looks like the PairRDDFunctions..partitionBy() uses a
>>> ShuffleRDD[K,V,C] requires K,V,C? Could I create a new
>>> ShuffleRDD[MyClass,MyClass,MyClass](caseClassRdd, new HashParitioner)?
>>>
>>> Cheers,
>>> N
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
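
For reference, the tuple-based route discussed above can be sketched like this (`resultRdd` is assumed to be the `RDD[Result]` from the thread):

```scala
import org.apache.spark.HashPartitioner

val partitioned = resultRdd
  .keyBy(_.crn)                        // RDD[(Long, Result)]
  .partitionBy(new HashPartitioner(200))
  .values                              // back to RDD[Result], now partitioned
```

This does materialize a key per record, which is exactly the overhead the question is trying to avoid, but it is the supported path without writing a custom ShuffledRDD.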


Re: Avro to Parquet ?

2015-05-07 Thread Jonathan Coveney
A helpful example of how to convert:
http://blog.cloudera.com/blog/2014/05/how-to-convert-existing-data-into-parquet/

As far as performance, that depends on your data. If you have a lot of
columns and use all of them, parquet deserialization is expensive. If you
have many columns and only need a few (or have some filters you can push down),
the savings can be huge.
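
A hedged sketch against the Spark 1.x DataFrame API; the path and column names are made up for illustration:

```scala
val df = sqlContext.read.parquet("hdfs:///data/events.parquet")
// Only the referenced columns are materialized, and the predicate can be
// pushed down into the parquet reader, skipping row groups by their statistics.
val recent = df.select("userId", "ts").filter(df("ts") > 1430000000L)
```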

2015-05-07 11:29 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :

> 1) What is the best way to convert data from Avro to Parquet so that it
> can be later read and processed ?
>
> 2) Will the performance of processing (join, reduceByKey) be better if
> both datasets are in Parquet format when compared to Avro + Sequence ?
>
> --
> Deepak
>
>


Re: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0

2015-05-06 Thread Jonathan Coveney
Can you check your local and remote logs?

2015-05-06 16:24 GMT-04:00 Wang, Ningjun (LNG-NPV) <
ningjun.w...@lexisnexis.com>:

>  This problem happens in Spark 1.3.1.  It happens when two jobs are running
> simultaneously, each in its own Spark Context.
>
>
>
> I don’t remember seeing this bug in Spark 1.2.0. Is it a new bug
> introduced in Spark 1.3.1?
>
>
>
> Ningjun
>
>
>
> From: Ted Yu [mailto:yuzhih...@gmail.com]
> Sent: Wednesday, May 06, 2015 11:32 AM
> To: Wang, Ningjun (LNG-NPV)
> Cc: user@spark.apache.org
> Subject: Re: java.io.IOException: org.apache.spark.SparkException:
> Failed to get broadcast_2_piece0
>
>
>
> Which release of Spark are you using ?
>
>
>
> Thanks
>
>
> On May 6, 2015, at 8:03 AM, Wang, Ningjun (LNG-NPV) <
> ningjun.w...@lexisnexis.com> wrote:
>
>  I ran a job on a spark standalone cluster and got the exception below
>
>
>
> Here is the line of code that cause problem
>
>
>
> val myRdd: RDD[(String, String, String)] = … // RDD of (docid,
> category, path)
>
>
> myRdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
>
> val cats: Array[String] = myRdd.map(t => t._2).distinct().collect()  //
> This line causes the exception
>
>
>
>
>
> 15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> LAB4-WIN03.pcc.lexisnexis.com): java.io.IOException: 
> org.apache.spark.SparkException:
> Failed to get broadcast_2_piece0 of broadcast_2
>
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
>
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: org.apache.spark.SparkException: Failed to get
> broadcast_2_piece0 of broadcast_2
>
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBr
>
> oadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBr
>
> oadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
>
> at scala.Option.getOrElse(Option.scala:120)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBr
>
> oadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBr
>
> oadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBr
>
> oadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
>
> at scala.collection.immutable.List.foreach(List.scala:318)
>
> at org.apache.spark.broadcast.TorrentBroadcast.org
> $apache$spark$broadcast$TorrentBroadcast$$
>
> readBlocks(TorrentBroadcast.scala:119)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBr
>
> oadcast.scala:174)
>
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153)
>
> ... 12 more
>
>
>
>
>
> Any idea what causes the problem and how to avoid it?
>
>
>
> Thanks
>
> Ningjun
>
>
>
>


Re: Number of files to load

2015-05-05 Thread Jonathan Coveney
You should check out parquet.

If you can avoid 5minute log files, you can have an hourly (or daily!) MR
job that compacts these. Another nice thing about parquet is it has filter
push down so if you want a smaller range of time you can avoid
deserializing most of the other data
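
The compaction job itself can also be written in Spark rather than MR; a hedged sketch, with made-up paths and an assumed JSON input format:

```scala
// collapse an hour of 5-minute files into a few larger parquet files
val hourly = sqlContext.read.json("/analytics/2015/05/02/partition-2015-05-02-13-*")
hourly.coalesce(4) // fewer, larger output files
  .write.parquet("/analytics-compacted/2015/05/02/13")
```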

On Tuesday, May 5, 2015, Rendy Bambang Junior 
wrote:

> Thanks, I'm not aware of splittable file formats.
>
> If that is the case, does the number of files affect spark performance? Maybe
> because of overhead when opening files?  And is that problem solved by having
> big files in a splittable file format?
>
> Any suggestion from your experience how to organize data in splittable
> file format on HDFS for Spark?
>
> Rendy
> On May 6, 2015 1:03 AM, "Jonathan Coveney"  > wrote:
>
>> "As per my understanding, storing 5minutes file means we could not
>> create RDD more granular than 5minutes."
>>
>> This depends on the file format. Many file formats are splittable (like
>> parquet), meaning that you can seek into various points of the file.
>>
>> 2015-05-05 12:45 GMT-04:00 Rendy Bambang Junior > >:
>>
>>> Let say I am storing my data in HDFS with folder structure and file
>>> partitioning as per below:
>>> /analytics/2015/05/02/partition-2015-05-02-13-50-
>>> Note that new file is created every 5 minutes.
>>>
>>> As per my understanding, storing 5minutes file means we could not create
>>> RDD more granular than 5minutes.
>>>
> >>> On the other hand, when we want to aggregate monthly data, the number of
> >>> files will be enormous (around 84000 files).
>>>
> >>> My question is, what are the considerations to say that the number of
> >>> files to be loaded into an RDD is just 'too many'? Is 84000 'too many' files?
>>>
> >>> One thing that comes to my mind is overhead when spark tries to open files,
> >>> however I'm not sure whether it is a valid concern.
>>>
>>> Rendy
>>>
>>
>>


Re: Number of files to load

2015-05-05 Thread Jonathan Coveney
"As per my understanding, storing 5minutes file means we could not create
RDD more granular than 5minutes."

This depends on the file format. Many file formats are splittable (like
parquet), meaning that you can seek into various points of the file.
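
With the directory layout from the question, you can also narrow the input with a glob instead of loading a whole month of 5-minute files at once (paths follow the example below):

```scala
// just one hour's worth of files
val oneHour = sc.textFile("/analytics/2015/05/02/partition-2015-05-02-13-*")
// or one day
val oneDay  = sc.textFile("/analytics/2015/05/02/*")
```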

2015-05-05 12:45 GMT-04:00 Rendy Bambang Junior :

> Let say I am storing my data in HDFS with folder structure and file
> partitioning as per below:
> /analytics/2015/05/02/partition-2015-05-02-13-50-
> Note that new file is created every 5 minutes.
>
> As per my understanding, storing 5minutes file means we could not create
> RDD more granular than 5minutes.
>
> On the other hand, when we want to aggregate monthly data, the number of files
> will be enormous (around 84000 files).
>
> My question is, what are the considerations to say that the number of files
> to be loaded into an RDD is just 'too many'? Is 84000 'too many' files?
>
> One thing that comes to my mind is overhead when spark tries to open files,
> however I'm not sure whether it is a valid concern.
>
> Rendy
>


Re: Configuring amount of disk space available to spark executors in mesos?

2015-04-13 Thread Jonathan Coveney
Nothing so complicated... we are seeing mesos kill off our executors
immediately. When I reroute logging to an NFS directory we have available,
the executors survive fine. As such I am wondering if the spark workers are
getting killed by mesos for exceeding their disk quota (which atm is 0).
This could be a red herring, however.

2015-04-13 15:41 GMT-04:00 Patrick Wendell :

> Hey Jonathan,
>
> Are you referring to disk space used for storing persisted RDD's? For
> that, Spark does not bound the amount of data persisted to disk. It's
> a similar story to how Spark's shuffle disk output works (and also
> Hadoop and other frameworks make this assumption as well for their
> shuffle data, AFAIK).
>
> We could (in theory) add a storage level that bounds the amount of
> data persisted to disk and forces re-computation if the partition did
> not fit. I'd be interested to hear more about a workload where that's
> relevant though, before going that route. Maybe if people are using
> SSD's that would make sense.
>
> - Patrick
>
> On Mon, Apr 13, 2015 at 8:19 AM, Jonathan Coveney 
> wrote:
> > I'm surprised that I haven't been able to find this via google, but I
> > haven't...
> >
> > What is the setting that requests some amount of disk space for the
> > executors? Maybe I'm misunderstanding how this is configured...
> >
> > Thanks for any help!
>


Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-13 Thread Jonathan Coveney
I'm not 100% sure of Spark's implementation, but in the MR frameworks it
would have a much larger shuffle write size because that node is dealing
with a lot more data and as a result has a lot more to shuffle.

2015-04-13 13:20 GMT-04:00 java8964 :

> If it is really due to data skew, wouldn't the hanging task have a much
> bigger shuffle write size in this case?
>
> Here, the shuffle write size for that task is 0, and the rest of the IO of
> this task is not much larger than that of the quickly finished tasks. Is
> that normal?
>
> I am also interested in this case: from the statistics in the UI, how can
> one tell that a task might have skewed data?
>
> Yong
>
> --
> Date: Mon, 13 Apr 2015 12:58:12 -0400
> Subject: Re: Equi Join is taking for ever. 1 Task is Running while other
> 199 are complete
> From: jcove...@gmail.com
> To: deepuj...@gmail.com
> CC: user@spark.apache.org
>
>
> I can promise you that this is also a problem in the Pig world :) Not sure
> why it's not a problem for this data set, though... are you sure that the
> two are running exactly the same code?
>
> You should inspect your source data. Make a histogram for each side and see
> what the data distribution looks like. If there is a value or bucket with a
> disproportionate share of the records, you know you have an issue.
>
> 2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :
>
> You mean there is a tuple in either RDD that has itemID = 0 or null?
> And what is a catch-all?
>
> That implies it is a good idea to run a filter on each RDD first? We do
> not do this using Pig on M/R. Is it required in the Spark world?
>
> On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney 
> wrote:
>
> My guess would be data skew. Do you know if there is some item id that is
> a catch-all? Can it be null? Item id 0? Lots of data sets have this sort
> of value, and it always kills joins.
>
> 2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :
>
> Code:
>
> val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
> Long))] = lstgItem.join(viEvents).map {
>   case (itemId, (listing, viDetail)) =>
> val viSummary = new VISummary
> viSummary.leafCategoryId = listing.getLeafCategId().toInt
> viSummary.itemSiteId = listing.getItemSiteId().toInt
> viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
> viSummary.sellerCountryId = listing.getSlrCntryId().toInt
> viSummary.buyerSegment = "0"
> viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue()
> > 0) 1 else 0)
> val sellerId = listing.getSlrId.toLong
> (sellerId, (viDetail, viSummary, itemId))
> }
>
> Running Tasks:
> Task 0 (ID 216, attempt 0, RUNNING, PROCESS_LOCAL, executor 181 /
> phxaishdc9dn0474.phx.ebay.com, launched 2015/04/13 06:43:53): duration
> 1.7 h, GC time 13 min, shuffle read 3.0 GB / 56964921 records, shuffle
> write 0.0 B / 0 records, shuffle spill 21.2 GB memory / 1902.6 MB disk
> Task 2 (ID 218, attempt 0, SUCCESS, PROCESS_LOCAL, executor 582 /
> phxaishdc9dn0235.phx.ebay.com, launched 2015/04/13 06:43:53): duration
> 15 min, GC time 31 s, shuffle read 2.2 GB / 1666851 records, write time
> 0.1 s, shuffle write 3.0 MB / 2062 records, shuffle spill 54.8 GB memory /
> 1924.5 MB disk
> Task 1 (ID 217, attempt 0, SUCCESS, PROCESS_LOCAL, executor 202 /
> phxdpehdc9dn2683.stratus.phx.ebay.com, launched 2015/04/13 06:43:53):
> duration 19 min, GC time 1.3 min, shuffle read 2.2 GB / 1687086 records,
> write time 75 ms, shuffle write 3.9 MB / 2692 records, shuffle spill
> 33.7 GB memory / 1960.4 MB disk
> Task 4 (ID 220, attempt 0, SUCCESS, PROCESS_LOCAL, executor 218 /
> phxaishdc9dn0855.phx.ebay.com, launched 2015/04/13 06:43:53): duration
> 15 min, GC time 56 s, shuffle read 2.2 GB / 1675654 records, write time
> 40 ms, shuffle write 3.3 MB / 2260 records, shuffle spill 26.2 GB memory /
> 1928.4 MB disk
>
>
>
> Command:
> ./bin/spark-submit -v --master yarn-cluster --driver-class-path
> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
> --jars
> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
>  --num-executors 3000 --driver-memory 12g --driver-java-options
> "-XX:MaxPermSize=6G" --executor-memory 12g --executor-cores 1 --queue
> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
> /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
> startDate=2015-04-6 endDate=2015-04-7
> input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
> maxbuffersize=1068 maxResultSize=2G
>
>
> What do I do? I killed the job twice and it's stuck again. Where is it
> stuck?
>
> --
> Deepak
>
>
>
>
>
> --
> Deepak
>
>
>


Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-13 Thread Jonathan Coveney
I can promise you that this is also a problem in the Pig world :) Not sure
why it's not a problem for this data set, though... are you sure that the
two are running exactly the same code?

You should inspect your source data. Make a histogram for each side and see
what the data distribution looks like. If there is a value or bucket with a
disproportionate share of the records, you know you have an issue.
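A concrete sketch of that check, in plain Scala standing in for something like `rdd.map(_._1).countByValue()` on a sample of each side (all names and data here are illustrative):

```scala
// Key-frequency histogram over a sample of join keys, sorted by count
// descending, to spot a catch-all key before running the join.
def keyHistogram[K](keys: Seq[K]): List[(K, Int)] =
  keys.groupBy(identity)
    .toList
    .map { case (k, vs) => (k, vs.size) }
    .sortBy { case (_, count) => -count }

// A sample where key 0 (a typical "catch-all") dominates:
val sample = Seq[Long](0, 0, 0, 0, 0, 42, 42, 7)
val hist   = keyHistogram(sample)
// hist.head is (0L, 5): one key holds most of the sample — a skew red flag
```

If the top entry dwarfs the rest, the join task that receives that key will do most of the work, which matches the one-straggler-out-of-200 pattern described in this thread.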

2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :

> You mean there is a tuple in either RDD that has itemID = 0 or null?
> And what is a catch-all?
>
> That implies it is a good idea to run a filter on each RDD first? We do
> not do this using Pig on M/R. Is it required in the Spark world?
>
> On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney 
> wrote:
>
>> My guess would be data skew. Do you know if there is some item id that is
>> a catch-all? Can it be null? Item id 0? Lots of data sets have this sort
>> of value, and it always kills joins.
>>
>> 2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :
>>
>> Code:
>>>
>>> val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
>>> Long))] = lstgItem.join(viEvents).map {
>>>   case (itemId, (listing, viDetail)) =>
>>> val viSummary = new VISummary
>>> viSummary.leafCategoryId = listing.getLeafCategId().toInt
>>> viSummary.itemSiteId = listing.getItemSiteId().toInt
>>> viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
>>> viSummary.sellerCountryId = listing.getSlrCntryId().toInt
>>> viSummary.buyerSegment = "0"
>>> viSummary.isBin = (if
>>> (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
>>> val sellerId = listing.getSlrId.toLong
>>> (sellerId, (viDetail, viSummary, itemId))
>>> }
>>>
>>> Running Tasks:
>>> Task 0 (ID 216, attempt 0, RUNNING, PROCESS_LOCAL, executor 181 /
>>> phxaishdc9dn0474.phx.ebay.com, launched 2015/04/13 06:43:53): duration
>>> 1.7 h, GC time 13 min, shuffle read 3.0 GB / 56964921 records, shuffle
>>> write 0.0 B / 0 records, shuffle spill 21.2 GB memory / 1902.6 MB disk
>>> Task 2 (ID 218, attempt 0, SUCCESS, PROCESS_LOCAL, executor 582 /
>>> phxaishdc9dn0235.phx.ebay.com, launched 2015/04/13 06:43:53): duration
>>> 15 min, GC time 31 s, shuffle read 2.2 GB / 1666851 records, write time
>>> 0.1 s, shuffle write 3.0 MB / 2062 records, shuffle spill 54.8 GB
>>> memory / 1924.5 MB disk
>>> Task 1 (ID 217, attempt 0, SUCCESS, PROCESS_LOCAL, executor 202 /
>>> phxdpehdc9dn2683.stratus.phx.ebay.com, launched 2015/04/13 06:43:53):
>>> duration 19 min, GC time 1.3 min, shuffle read 2.2 GB / 1687086 records,
>>> write time 75 ms, shuffle write 3.9 MB / 2692 records, shuffle spill
>>> 33.7 GB memory / 1960.4 MB disk
>>> Task 4 (ID 220, attempt 0, SUCCESS, PROCESS_LOCAL, executor 218 /
>>> phxaishdc9dn0855.phx.ebay.com, launched 2015/04/13 06:43:53): duration
>>> 15 min, GC time 56 s, shuffle read 2.2 GB / 1675654 records, write time
>>> 40 ms, shuffle write 3.3 MB / 2260 records, shuffle spill 26.2 GB
>>> memory / 1928.4 MB disk
>>>
>>>
>>>
>>> Command:
>>> ./bin/spark-submit -v --master yarn-cluster --driver-class-path
>>> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
>>> --jars
>>> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
>>>  --num-executors 3000 --driver-memory 12g --driver-java-options
>>> "-XX:MaxPermSize=6G" --executor-memory 12g --executor-cores 1 --queue
>>> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
>>> /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
>>> startDate=2015-04-6 endDate=2015-04-7
>>> input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
>>> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
>>> maxbuffersize=1068 maxResultSize=2G
>>>
>>>
>>> What do I do? I killed the job twice and it's stuck again. Where is it
>>> stuck?
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>
>
> --
> Deepak
>
>


Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-13 Thread Jonathan Coveney
My guess would be data skew. Do you know if there is some item id that is a
catch-all? Can it be null? Item id 0? Lots of data sets have this sort of
value, and it always kills joins.
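If such a catch-all key is confirmed, one standard mitigation (not something the code in this thread does; everything below is an illustrative, pure-Scala sketch) is key salting: split the hot key into N sub-keys on the large side, replicate the small side under all N salts, join on the salted key, then drop the salt:

```scala
val n = 4 // salt fan-out; illustrative

// Large, skewed side: deterministically spread records across n sub-keys.
def saltLarge[V](records: Seq[(Long, V)]): Seq[((Long, Int), V)] =
  records.zipWithIndex.map { case ((k, v), i) => ((k, i % n), v) }

// Small side: replicate every record under each of the n salt values.
def explodeSmall[V](records: Seq[(Long, V)]): Seq[((Long, Int), V)] =
  for ((k, v) <- records; s <- 0 until n) yield ((k, s), v)

// Join on the salted key, then strip the salt from the output key.
def joinSalted[A, B](l: Seq[((Long, Int), A)],
                     r: Seq[((Long, Int), B)]): Seq[(Long, (A, B))] = {
  val byKey = r.groupBy { case (k, _) => k }
  for {
    (k, a) <- l
    (_, b) <- byKey.getOrElse(k, Nil)
  } yield (k._1, (a, b))
}

val big   = saltLarge(Seq.tabulate(8)(i => (0L, s"view$i"))) // hot key 0
val small = explodeSmall(Seq((0L, "listing")))
val out   = joinSalted(big, small)
// 8 joined rows, but the hot key's work is now spread over 4 sub-keys
// instead of landing on a single task
```

In Spark the same idea applies to the two RDDs being joined: salt `lstgItem` or `viEvents` on the large side, explode the other, and the one straggler task splits into n parallel ones.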

2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :

> Code:
>
> val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
> Long))] = lstgItem.join(viEvents).map {
>   case (itemId, (listing, viDetail)) =>
> val viSummary = new VISummary
> viSummary.leafCategoryId = listing.getLeafCategId().toInt
> viSummary.itemSiteId = listing.getItemSiteId().toInt
> viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
> viSummary.sellerCountryId = listing.getSlrCntryId().toInt
> viSummary.buyerSegment = "0"
> viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue()
> > 0) 1 else 0)
> val sellerId = listing.getSlrId.toLong
> (sellerId, (viDetail, viSummary, itemId))
> }
>
> Running Tasks:
> Task 0 (ID 216, attempt 0, RUNNING, PROCESS_LOCAL, executor 181 /
> phxaishdc9dn0474.phx.ebay.com, launched 2015/04/13 06:43:53): duration
> 1.7 h, GC time 13 min, shuffle read 3.0 GB / 56964921 records, shuffle
> write 0.0 B / 0 records, shuffle spill 21.2 GB memory / 1902.6 MB disk
> Task 2 (ID 218, attempt 0, SUCCESS, PROCESS_LOCAL, executor 582 /
> phxaishdc9dn0235.phx.ebay.com, launched 2015/04/13 06:43:53): duration
> 15 min, GC time 31 s, shuffle read 2.2 GB / 1666851 records, write time
> 0.1 s, shuffle write 3.0 MB / 2062 records, shuffle spill 54.8 GB memory /
> 1924.5 MB disk
> Task 1 (ID 217, attempt 0, SUCCESS, PROCESS_LOCAL, executor 202 /
> phxdpehdc9dn2683.stratus.phx.ebay.com, launched 2015/04/13 06:43:53):
> duration 19 min, GC time 1.3 min, shuffle read 2.2 GB / 1687086 records,
> write time 75 ms, shuffle write 3.9 MB / 2692 records, shuffle spill
> 33.7 GB memory / 1960.4 MB disk
> Task 4 (ID 220, attempt 0, SUCCESS, PROCESS_LOCAL, executor 218 /
> phxaishdc9dn0855.phx.ebay.com, launched 2015/04/13 06:43:53): duration
> 15 min, GC time 56 s, shuffle read 2.2 GB / 1675654 records, write time
> 40 ms, shuffle write 3.3 MB / 2260 records, shuffle spill 26.2 GB memory /
> 1928.4 MB disk
>
>
>
> Command:
> ./bin/spark-submit -v --master yarn-cluster --driver-class-path
> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
> --jars
> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
>  --num-executors 3000 --driver-memory 12g --driver-java-options
> "-XX:MaxPermSize=6G" --executor-memory 12g --executor-cores 1 --queue
> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
> /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar
> startDate=2015-04-6 endDate=2015-04-7
> input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
> maxbuffersize=1068 maxResultSize=2G
>
>
> What do I do? I killed the job twice and it's stuck again. Where is it
> stuck?
>
> --
> Deepak
>
>


What's the cleanest way to make spark aware of my custom scheduler?

2015-04-13 Thread Jonathan Coveney
I need to have my own scheduler to point to a proprietary remote execution
framework.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L2152

I'm looking at where it decides on the backend, and it doesn't look like
there is a hook. Of course I can extend SparkContext and add my own, but
that seems sort of lame. Wondering if people know of a better way (or maybe
I'm just missing something obvious).


Configuring amount of disk space available to spark executors in mesos?

2015-04-13 Thread Jonathan Coveney
I'm surprised that I haven't been able to find this via google, but I
haven't...

What is the setting that requests some amount of disk space for the
executors? Maybe I'm misunderstanding how this is configured...

Thanks for any help!


Re: RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Jonathan Coveney
This is just a deficiency of the API, imo. I agree: mapValues could
definitely be a function (K, V) => V1. The option isn't set by the
function; it's set on the RDD. So you could look at the code and do this:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala

 def mapValues[U](f: V => U): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
  (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
  preservesPartitioning = true)
  }

What you want:

 def mapValues[U](f: (K, V) => U): RDD[(K, U)] = {
    val cleanF = self.context.clean(f)
    new MapPartitionsRDD[(K, U), (K, V)](self,
      // f sees the key, but the emitted key is unchanged, so it is still
      // safe to claim preservesPartitioning = true
      (context, pid, iter) => iter.map { case t@(k, _) => (k, cleanF(t)) },
      preservesPartitioning = true)
  }

One of the nice things about spark is that making such new operators is
very easy :)
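The core of that variant can be exercised without Spark at all. The point is that the function receives the key but the emitted key is unchanged, which is exactly what makes `preservesPartitioning = true` safe. A plain-Scala sketch with illustrative names:

```scala
// Iterator transformation mirroring the proposed mapValues variant: the
// function receives (key, value) but may only change the value.
def mapValuesWithKey[K, V, U](iter: Iterator[(K, V)])(f: ((K, V)) => U): Iterator[(K, U)] =
  iter.map { case t @ (k, _) => (k, f(t)) }

val out = mapValuesWithKey(Iterator("a" -> 1, "b" -> 2)) {
  case (k, v) => s"$k=$v"
}.toList
// keys are untouched, values are rebuilt from (key, value)
```

Because every output key equals its input key, records stay in whatever partition the upstream partitioner placed them in, so no re-shuffle is needed downstream.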

2015-03-26 17:54 GMT-04:00 Zhan Zhang :

>  Thanks Jonathan. You are right regarding rewrite the example.
>
>  I mean providing such an option to developers so that it is controllable.
> The example may seem silly, and I don’t know the use cases.
>
> But, for example, if I also want to operate on both the key and the value
> to generate some new value while keeping the key untouched, then mapValues
> may not be able to do this.
>
>  Changing the code to allow this is trivial, but I don’t know whether
> there is some special reason behind this.
>
>  Thanks.
>
>  Zhan Zhang
>
>
>
>
>  On Mar 26, 2015, at 2:49 PM, Jonathan Coveney  wrote:
>
>  I believe if you do the following:
>
>
> sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString
>
>  (8) MapPartitionsRDD[34] at reduceByKey at <console>:23 []
>  |  MapPartitionsRDD[33] at mapValues at <console>:23 []
>  |  ShuffledRDD[32] at reduceByKey at <console>:23 []
>  +-(8) MapPartitionsRDD[31] at map at <console>:23 []
>     |  ParallelCollectionRDD[30] at parallelize at <console>:23 []
>
>  The difference is that Spark has no way to know that your map closure
> doesn't change the key; if you only use mapValues, it does. Pretty cool
> that they optimized that :)
>
> 2015-03-26 17:44 GMT-04:00 Zhan Zhang :
>
>> Hi Folks,
>>
>> Does anybody know the reason for not allowing preservesPartitioning
>> in RDD.map? Am I missing something here?
>>
>> The following example involves two shuffles. I think if
>> preservesPartitioning were allowed, we could avoid the second one, right?
>>
>>  val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
>>  val r2 = r1.map((_, 1))
>>  val r3 = r2.reduceByKey(_+_)
>>  val r4 = r3.map(x=>(x._1, x._2 + 1))
>>  val r5 = r4.reduceByKey(_+_)
>>  r5.collect.foreach(println)
>>
>> scala> r5.toDebugString
>> res2: String =
>> (8) ShuffledRDD[4] at reduceByKey at <console>:29 []
>>  +-(8) MapPartitionsRDD[3] at map at <console>:27 []
>>     |  ShuffledRDD[2] at reduceByKey at <console>:25 []
>>     +-(8) MapPartitionsRDD[1] at map at <console>:23 []
>>        |  ParallelCollectionRDD[0] at parallelize at <console>:21 []
>>
>> Thanks.
>>
>> Zhan Zhang
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


Re: RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Jonathan Coveney
I believe if you do the following:

sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString

(8) MapPartitionsRDD[34] at reduceByKey at <console>:23 []
 |  MapPartitionsRDD[33] at mapValues at <console>:23 []
 |  ShuffledRDD[32] at reduceByKey at <console>:23 []
 +-(8) MapPartitionsRDD[31] at map at <console>:23 []
    |  ParallelCollectionRDD[30] at parallelize at <console>:23 []

The difference is that Spark has no way to know that your map closure
doesn't change the key; if you only use mapValues, it does. Pretty cool
that they optimized that :)

2015-03-26 17:44 GMT-04:00 Zhan Zhang :

> Hi Folks,
>
> Does anybody know the reason for not allowing preservesPartitioning in
> RDD.map? Am I missing something here?
>
> The following example involves two shuffles. I think if
> preservesPartitioning were allowed, we could avoid the second one, right?
>
>  val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
>  val r2 = r1.map((_, 1))
>  val r3 = r2.reduceByKey(_+_)
>  val r4 = r3.map(x=>(x._1, x._2 + 1))
>  val r5 = r4.reduceByKey(_+_)
>  r5.collect.foreach(println)
>
> scala> r5.toDebugString
> res2: String =
> (8) ShuffledRDD[4] at reduceByKey at <console>:29 []
>  +-(8) MapPartitionsRDD[3] at map at <console>:27 []
>     |  ShuffledRDD[2] at reduceByKey at <console>:25 []
>     +-(8) MapPartitionsRDD[1] at map at <console>:23 []
>        |  ParallelCollectionRDD[0] at parallelize at <console>:21 []
>
> Thanks.
>
> Zhan Zhang
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


can spark take advantage of ordered data?

2015-03-11 Thread Jonathan Coveney
Hello all,

I am wondering if Spark already has support for optimizations on sorted
data, and/or whether such support could be added (I am comfortable dropping
to a lower level if necessary to implement this, but I'm not sure whether
it is possible at all).

Context: we have a number of data sets which are essentially already sorted
on a key. With our current systems, we can take advantage of this to do a
lot of analysis in a very efficient fashion: merges and joins, for example,
can be done very cheaply, as can folds on a secondary key, and so on.
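For a sense of what those systems exploit, here is a merge join over two key-sorted sequences in plain Scala (no Spark; a sketch that assumes keys are unique on each side): no shuffle and no hash table, just one linear pass over both inputs.

```scala
// Merge join over two key-sorted sequences: one linear pass, no shuffle,
// no hash table. Pure Scala sketch; assumes keys are unique on each side.
def sortedJoin[K: Ordering, A, B](left: Seq[(K, A)],
                                  right: Seq[(K, B)]): Seq[(K, (A, B))] = {
  val ord = implicitly[Ordering[K]]
  val out = scala.collection.mutable.ArrayBuffer.empty[(K, (A, B))]
  var i = 0
  var j = 0
  while (i < left.length && j < right.length) {
    val c = ord.compare(left(i)._1, right(j)._1)
    if (c < 0) i += 1        // left key too small: advance left
    else if (c > 0) j += 1   // right key too small: advance right
    else {                   // keys match: emit the joined pair
      out += ((left(i)._1, (left(i)._2, right(j)._2)))
      i += 1
      j += 1
    }
  }
  out.toSeq
}

val joined = sortedJoin(Seq(1 -> "a", 3 -> "c"),
                        Seq(1 -> "x", 2 -> "y", 3 -> "z"))
```

This is O(n + m) with no extra memory proportional to the inputs, which is why skipping the shuffle for already-sorted, co-partitioned data is such a large win.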

I was wondering whether Spark would be a fit for implementing these sorts
of optimizations? Obviously it is something of a niche case, but would it
be achievable? Any pointers on where I should look?