Re: OOM with groupBy + saveAsTextFile

2014-11-01 Thread Reynold Xin
None of your tuning will help here because the problem is actually the way
you are saving the output. If you take a look at the stacktrace, it is
trying to build a single string that is too large for the VM to allocate.
The VM is not actually running out of memory; rather, the JVM cannot
support a single String that large.

I suspect this is because the value in your key-value pair after the
groupBy is too long (maybe it concatenates every single record). Do
you really want to save the key-value output this way as a text file?
Maybe you can write them out as multiple strings rather than a single super
giant string.
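For instance, a minimal sketch of that approach, assuming the grouped data is an RDD[(String, Iterable[String])] (the sample contents and output path below are placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

val sc = new SparkContext(new SparkConf().setAppName("save-grouped"))  // placeholder app name

// Stand-in for the real grouped data.
val grouped = sc.parallelize(Seq(("a", "x1"), ("a", "x2"), ("b", "y1"))).groupByKey()

// Emit one output line per value instead of relying on Tuple2.toString, which
// concatenates the whole Iterable into one giant String (the allocation that fails above).
val lines = grouped.flatMap { case (key, values) =>
  values.iterator.map(value => s"$key\t$value")
}
lines.saveAsTextFile("hdfs:///output/grouped-as-lines")  // placeholder path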




On Sat, Nov 1, 2014 at 9:52 PM, arthur.hk.c...@gmail.com <
arthur.hk.c...@gmail.com> wrote:

>
> Hi,
>
> FYI as follows.  Could you post your heap size settings as well your Spark
> app code?
>
> Regards
> Arthur
>
> 3.1.3 Detail Message: Requested array size exceeds VM limit
>
> The detail message Requested array size exceeds VM limit indicates that the
> application (or APIs used by that application) attempted to allocate an
> array that is larger than the heap size. For example, if an application
> attempts to allocate an array of 512MB but the maximum heap size is 256MB,
> then OutOfMemoryError will be thrown with the reason Requested array size
> exceeds VM limit. In most cases the problem is either a configuration
> issue (heap size too small), or a bug that results in an application
> attempting to create a huge array, for example, when the number of elements
> in the array is computed using an algorithm that computes an incorrect size.
>
>
>
>
> On 2 Nov, 2014, at 12:25 pm, Bharath Ravi Kumar 
> wrote:
>
> Resurfacing the thread. OOM shouldn't be the norm for a common groupBy /
> sort use case in a framework that is leading in sorting benchmarks? Or is
> there something fundamentally wrong in the usage?
> On 02-Nov-2014 1:06 am, "Bharath Ravi Kumar"  wrote:
>
>> Hi,
>>
>> I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD
>> of count ~ 100 million. The data size is 20GB and groupBy results in an RDD
>> of 1061 keys with values being Iterable> String>>. The job runs on 3 hosts in a standalone setup with each host's
>> executor having 100G RAM and 24 cores dedicated to it. While the groupBy
>> stage completes successfully with ~24GB of shuffle write, the
>> saveAsTextFile fails after repeated retries with each attempt failing due
>> to an out of memory error *[1]*. I understand that a few partitions may
>> be overloaded as a result of the groupBy and I've tried the following
>> config combinations unsuccessfully:
>>
>> 1) Repartition the initial rdd (44 input partitions but 1061 keys) across
> 1061 partitions and have max cores = 3 so that each key is a "logical"
>> partition (though many partitions will end up on very few hosts), and each
>> host likely runs saveAsTextFile on a single key at a time due to max cores
>> = 3 with 3 hosts in the cluster. The level of parallelism is unspecified.
>>
>> 2) Leave max cores unspecified, set the level of parallelism to 72, and
>> leave number of partitions unspecified (in which case the # input
>> partitions was used, which is 44)
>> Since I do not intend to cache RDD's, I have set
>> spark.storage.memoryFraction=0.2 in both cases.
>>
>> My understanding is that if each host is processing a single logical
>> partition to saveAsTextFile and is reading from other hosts to write out
>> the RDD, it is unlikely that it would run out of memory. My interpretation
>> of the spark tuning guide is that the degree of parallelism has little
>> impact in case (1) above since max cores = number of hosts. Can someone
>> explain why there are still OOM's with 100G being available? On a related
>> note, intuitively (though I haven't read the source), it appears that an
>> entire key-value pair needn't fit into memory of a single host for
>> saveAsTextFile since a single shuffle read from a remote can be written to
>> HDFS before the next remote read is carried out. This way, not all data
>> needs to be collected at the same time.
>>
>> Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the
>> tuning guide and even as per Datastax's spark introduction), there may need
>> to be more documentation around the internals of spark to help users take
>> better informed tuning decisions with parallelism, max cores, number
>> partitions and other tunables. Is there any ongoing effort on that front?
>>
>> Thanks,
>> Bharath
>>
>>
>> *[1]* OOM stack trace and logs
>> 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID
>> 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array
>> size exceeds VM limit
>> java.util.Arrays.copyOf(Arrays.java:3326)
>>
>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>>
>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>>
>> java.lang.AbstractStringBuilder.append(Abstract

Re: How to correctly estimate the number of partitions of a graph in GraphX

2014-11-01 Thread James
Hello,

We have a graph with 100B edges, nearly 800GB in gz format.
We have 80 machines, each with 60GB of memory.
I have never seen the program run to completion.

Alcaid

2014-11-02 14:06 GMT+08:00 Ankur Dave :

> How large is your graph, and how much memory does your cluster have?
>
> We don't have a good way to determine the *optimal* number of partitions
> aside from trial and error, but to get the job to at least run to
> completion, it might help to use the MEMORY_AND_DISK storage level and a
> large number of partitions.
>
> Ankur 
>
> On Sat, Nov 1, 2014 at 10:57 PM, James  wrote:
>
>> Hello,
>>
>> I am trying to run the Connected Components algorithm on a very big graph. In
>> practice I found that a small number of partitions leads to OOM,
>> while a large number causes various timeout exceptions. So I wonder:
>> how should I estimate the number of partitions for a graph in GraphX?
>>
>> Alcaid
>>
>
>


Re: How to correctly estimate the number of partitions of a graph in GraphX

2014-11-01 Thread Ankur Dave
How large is your graph, and how much memory does your cluster have?

We don't have a good way to determine the *optimal* number of partitions
aside from trial and error, but to get the job to at least run to
completion, it might help to use the MEMORY_AND_DISK storage level and a
large number of partitions.
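A rough sketch of that advice (the input path and partition count are placeholders; as far as I recall, the storage-level parameters of GraphLoader.edgeListFile are available from Spark 1.1 onwards):

import org.apache.spark.graphx.GraphLoader
import org.apache.spark.storage.StorageLevel

// Placeholder path and partition count; the two storage levels let edge and
// vertex data spill to disk instead of failing with OOM.
val graph = GraphLoader.edgeListFile(
  sc, "hdfs:///data/edges",
  minEdgePartitions = 8000,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)

val components = graph.connectedComponents().vertices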

Ankur 

On Sat, Nov 1, 2014 at 10:57 PM, James  wrote:

> Hello,
>
> I am trying to run the Connected Components algorithm on a very big graph. In
> practice I found that a small number of partitions leads to OOM,
> while a large number causes various timeout exceptions. So I wonder:
> how should I estimate the number of partitions for a graph in GraphX?
>
> Alcaid
>


How to correctly estimate the number of partitions of a graph in GraphX

2014-11-01 Thread James
Hello,

I am trying to run the Connected Components algorithm on a very big graph. In
practice I found that a small number of partitions leads to OOM,
while a large number causes various timeout exceptions. So I wonder:
how should I estimate the number of partitions for a graph in GraphX?

Alcaid


Re: OOM with groupBy + saveAsTextFile

2014-11-01 Thread arthur.hk.c...@gmail.com

Hi,

FYI as follows.  Could you post your heap size settings as well your Spark app 
code?

Regards
Arthur

3.1.3 Detail Message: Requested array size exceeds VM limit

The detail message Requested array size exceeds VM limit indicates that the 
application (or APIs used by that application) attempted to allocate an array 
that is larger than the heap size. For example, if an application attempts to 
allocate an array of 512MB but the maximum heap size is 256MB then 
OutOfMemoryError will be thrown with the reason Requested array size exceeds VM 
limit. In most cases the problem is either a configuration issue (heap size too 
small), or a bug that results in an application attempting to create a huge 
array, for example, when the number of elements in the array is computed using
an algorithm that computes an incorrect size.




On 2 Nov, 2014, at 12:25 pm, Bharath Ravi Kumar  wrote:

> Resurfacing the thread. OOM shouldn't be the norm for a common groupBy / sort
> use case in a framework that is leading in sorting benchmarks? Or is there
> something fundamentally wrong in the usage?
> 
> On 02-Nov-2014 1:06 am, "Bharath Ravi Kumar"  wrote:
> Hi,
> 
> I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD of 
> count ~ 100 million. The data size is 20GB and groupBy results in an RDD of 
> 1061 keys with values being Iterable String>>. The job runs on 3 hosts in a standalone setup with each host's 
> executor having 100G RAM and 24 cores dedicated to it. While the groupBy 
> stage completes successfully with ~24GB of shuffle write, the saveAsTextFile 
> fails after repeated retries with each attempt failing due to an out of 
> memory error [1]. I understand that a few partitions may be overloaded as a 
> result of the groupBy and I've tried the following config combinations 
> unsuccessfully:
> 
> 1) Repartition the initial rdd (44 input partitions but 1061 keys) across 
> 1061 partitions and have max cores = 3 so that each key is a "logical"
> partition (though many partitions will end up on very few hosts), and each 
> host likely runs saveAsTextFile on a single key at a time due to max cores = 
> 3 with 3 hosts in the cluster. The level of parallelism is unspecified.
> 
> 2) Leave max cores unspecified, set the level of parallelism to 72, and leave 
> number of partitions unspecified (in which case the # input partitions was 
> used, which is 44)
> Since I do not intend to cache RDD's, I have set 
> spark.storage.memoryFraction=0.2 in both cases.
> 
> My understanding is that if each host is processing a single logical 
> partition to saveAsTextFile and is reading from other hosts to write out the 
> RDD, it is unlikely that it would run out of memory. My interpretation of the 
> spark tuning guide is that the degree of parallelism has little impact in 
> case (1) above since max cores = number of hosts. Can someone explain why 
> there are still OOM's with 100G being available? On a related note, 
> intuitively (though I haven't read the source), it appears that an entire 
> key-value pair needn't fit into memory of a single host for saveAsTextFile 
> since a single shuffle read from a remote can be written to HDFS before the 
> next remote read is carried out. This way, not all data needs to be collected 
> at the same time. 
> 
> Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the 
> tuning guide and even as per Datastax's spark introduction), there may need 
> to be more documentation around the internals of spark to help users take 
> better informed tuning decisions with parallelism, max cores, number 
> partitions and other tunables. Is there any ongoing effort on that front?
> 
> Thanks,
> Bharath
> 
> 
> [1] OOM stack trace and logs
> 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID 
> 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array size 
> exceeds VM limit
> java.util.Arrays.copyOf(Arrays.java:3326)
> 
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
> 
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
> java.lang.StringBuilder.append(StringBuilder.java:136)
> scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
> scala.Tuple2.toString(Tuple2.scala:22)
> 
> org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
> 
> org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
> 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> org.apache.spark.scheduler.

Re: OOM with groupBy + saveAsTextFile

2014-11-01 Thread Bharath Ravi Kumar
Resurfacing the thread. OOM shouldn't be the norm for a common groupBy /
sort use case in a framework that is leading in sorting benchmarks? Or is
there something fundamentally wrong in the usage?
On 02-Nov-2014 1:06 am, "Bharath Ravi Kumar"  wrote:

> Hi,
>
> I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD
> of count ~ 100 million. The data size is 20GB and groupBy results in an RDD
> of 1061 keys with values being Iterable String>>. The job runs on 3 hosts in a standalone setup with each host's
> executor having 100G RAM and 24 cores dedicated to it. While the groupBy
> stage completes successfully with ~24GB of shuffle write, the
> saveAsTextFile fails after repeated retries with each attempt failing due
> to an out of memory error *[1]*. I understand that a few partitions may
> be overloaded as a result of the groupBy and I've tried the following
> config combinations unsuccessfully:
>
> 1) Repartition the initial rdd (44 input partitions but 1061 keys) across
> 1061 partitions and have max cores = 3 so that each key is a "logical"
> partition (though many partitions will end up on very few hosts), and each
> host likely runs saveAsTextFile on a single key at a time due to max cores
> = 3 with 3 hosts in the cluster. The level of parallelism is unspecified.
>
> 2) Leave max cores unspecified, set the level of parallelism to 72, and
> leave number of partitions unspecified (in which case the # input
> partitions was used, which is 44)
> Since I do not intend to cache RDD's, I have set
> spark.storage.memoryFraction=0.2 in both cases.
>
> My understanding is that if each host is processing a single logical
> partition to saveAsTextFile and is reading from other hosts to write out
> the RDD, it is unlikely that it would run out of memory. My interpretation
> of the spark tuning guide is that the degree of parallelism has little
> impact in case (1) above since max cores = number of hosts. Can someone
> explain why there are still OOM's with 100G being available? On a related
> note, intuitively (though I haven't read the source), it appears that an
> entire key-value pair needn't fit into memory of a single host for
> saveAsTextFile since a single shuffle read from a remote can be written to
> HDFS before the next remote read is carried out. This way, not all data
> needs to be collected at the same time.
>
> Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the
> tuning guide and even as per Datastax's spark introduction), there may need
> to be more documentation around the internals of spark to help users take
> better informed tuning decisions with parallelism, max cores, number
> partitions and other tunables. Is there any ongoing effort on that front?
>
> Thanks,
> Bharath
>
>
> *[1]* OOM stack trace and logs
> 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID
> 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array
> size exceeds VM limit
> java.util.Arrays.copyOf(Arrays.java:3326)
>
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>
> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
> java.lang.StringBuilder.append(StringBuilder.java:136)
>
> scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
> scala.Tuple2.toString(Tuple2.scala:22)
>
> org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
>
> org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> org.apache.spark.scheduler.Task.run(Task.scala:54)
>
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> 14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID
> 1295, proc1.foo.bar.com): FetchFailed(BlockManagerId(1, proc1.foo.bar.com,
> 43704, 0), shuffleId=0, mapId=13, reduceId=92)
> 14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at
> ModelTrainer.java:141) as failed due to a fetch failure from Stage 37
> (groupBy at ModelTrainer.java:133)
> 14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at
> ModelTrainer.java:141) failed in 55.259 s
> 14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at
> ModelTrainer.java:133) and Stage 36 (saveAsTextFile at
> ModelTrainer.java:141) due to fetch failure
>
>
>
>
>


Re: SparkSQL + Hive Cached Table Exception

2014-11-01 Thread Cheng Lian
Just submitted a PR to fix this https://github.com/apache/spark/pull/3059

On Sun, Nov 2, 2014 at 12:36 AM, Jean-Pascal Billaud 
wrote:

> Great! Thanks.
>
> Sent from my iPad
>
> On Nov 1, 2014, at 8:35 AM, Cheng Lian  wrote:
>
> Hi Jean,
>
> Thanks for reporting this. This is indeed a bug: for some column types
> (Binary, Array, Map and Struct, and unfortunately, for some reason,
> Boolean), a NoopColumnStats is used to collect column statistics, which
> causes this issue. Filed SPARK-4182 to track this issue; will fix this ASAP.
>
> Cheng
>
> On Fri, Oct 31, 2014 at 7:04 AM, Jean-Pascal Billaud 
> wrote:
>
>> Hi,
>>
>> While testing SparkSQL on top of our Hive metastore, I am getting
>> some java.lang.ArrayIndexOutOfBoundsException while reusing a cached RDD
>> table.
>>
>> Basically, I have a table "mtable" partitioned by some "date" field in
>> hive and below is the scala code I am running in spark-shell:
>>
>> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
>> val rdd_mtable = sqlContext.sql("select * from mtable where
>> date=20141028");
>> rdd_mtable.registerTempTable("rdd_mtable");
>> sqlContext.cacheTable("rdd_mtable");
>> sqlContext.sql("select count(*) from rdd_mtable").collect(); <-- OK
>> sqlContext.sql("select count(*) from rdd_mtable").collect(); <-- Exception
>>
>> So the first collect() is working just fine, however running the second
>> collect() which I expect use the cached RDD throws some
>> java.lang.ArrayIndexOutOfBoundsException, see the backtrace at the end of
>> this email. It seems the columnar traversal is crashing for some reasons.
>> FYI, I am using spark ToT (234de9232bcfa212317a8073c4a82c3863b36b14).
>>
>> java.lang.ArrayIndexOutOfBoundsException: 14
>> at
>> org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
>> at
>> org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
>> at
>> org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:108)
>> at org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:89)
>> at
>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$computeSizeInBytes$1.apply(InMemoryColumnarTableScan.scala:66)
>> at
>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$computeSizeInBytes$1.apply(InMemoryColumnarTableScan.scala:66)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>> at
>> org.apache.spark.sql.columnar.InMemoryRelation.computeSizeInBytes(InMemoryColumnarTableScan.scala:66)
>> at
>> org.apache.spark.sql.columnar.InMemoryRelation.statistics(InMemoryColumnarTableScan.scala:87)
>> at
>> org.apache.spark.sql.columnar.InMemoryRelation.statisticsToBePropagated(InMemoryColumnarTableScan.scala:73)
>> at
>> org.apache.spark.sql.columnar.InMemoryRelation.withOutput(InMemoryColumnarTableScan.scala:147)
>> at
>> org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1$$anonfun$applyOrElse$1.apply(CacheManager.scala:122)
>> at
>> org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1$$anonfun$applyOrElse$1.apply(CacheManager.scala:122)
>> at scala.Option.map(Option.scala:145)
>> at
>> org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1.applyOrElse(CacheManager.scala:122)
>> at
>> org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1.applyOrElse(CacheManager.scala:119)
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> at
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>> at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>> at
>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>> at
>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
>> at
>> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
>> at
>

Re: Spark SQL : how to find element where a field is in a given set

2014-11-01 Thread abhinav chowdary
I have the same requirement of passing a list of values to an in clause. When
I try to do so,

I get the error below:

scala> val longList = Seq[Expression]("a", "b")
<console>:11: error: type mismatch;
 found   : String("a")
 required: org.apache.spark.sql.catalyst.expressions.Expression
       val longList = Seq[Expression]("a", "b")

Thanks
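For what it's worth, a guess (not verified against this Spark version): the implicit conversion from a plain String to a Literal expression is provided by Spark SQL's DSL, so those implicits have to be in scope before Seq[Expression]("a", "b") will compile. In the shell that would look roughly like:

import org.apache.spark.sql.catalyst.expressions.Expression
import sqlContext._  // assumption: brings the expression/DSL implicits into scope

val longList = Seq[Expression]("a", "b")
table("src").where('key in (longList: _*))  // as in Michael's example quoted below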


On Fri, Aug 29, 2014 at 3:52 PM, Michael Armbrust 
wrote:

> This feature was not part of that version.  It will be in 1.1.
>
>
> On Fri, Aug 29, 2014 at 12:33 PM, Jaonary Rabarisoa 
> wrote:
>
>>
>> 1.0.2
>>
>>
>> On Friday, August 29, 2014, Michael Armbrust 
>> wrote:
>>
>>> What version are you using?
>>>
>>>
>>>
>>> On Fri, Aug 29, 2014 at 2:22 AM, Jaonary Rabarisoa 
>>> wrote:
>>>
 Still not working for me. I got a compilation error: *value in is not
 a member of Symbol.* Any ideas?


 On Fri, Aug 29, 2014 at 9:46 AM, Michael Armbrust <
 mich...@databricks.com> wrote:

> To pass a list to a variadic function you can use the type ascription
> :_*
>
> For example:
>
> val longList = Seq[Expression]("a", "b", ...)
> table("src").where('key in (longList: _*))
>
> Also, note that I had to explicitly specify Expression as the type
> parameter of Seq to ensure that the compiler converts "a" and "b" into
> Spark SQL expressions.
>
>
>
>
> On Thu, Aug 28, 2014 at 11:52 PM, Jaonary Rabarisoa  > wrote:
>
>> ok, but what if I have a long list? Do I need to hard-code every element of
>> my list like this, or is there a function that translates a list into a
>> tuple?
>>
>>
>> On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> You don't need the Seq, as in is a variadic function.
>>>
>>> personTable.where('name in ("foo", "bar"))
>>>
>>>
>>>
>>> On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa <
>>> jaon...@gmail.com> wrote:
>>>
 Hi all,

 What is the expression that I should use with spark sql DSL if I
 need to retreive
 data with a field in a given set.
 For example :

 I have the following schema

 case class Person(name: String, age: Int)

 And I need to do something like :

 personTable.where('name in Seq("foo", "bar")) ?


 Cheers.


 Jaonary

>>>
>>>
>>
>

>>>
>>
>
>


-- 
Warm Regards
Abhinav Chowdary


Re: union of SchemaRDDs

2014-11-01 Thread Matei Zaharia
It does generalize types, but only on the intersection of the columns it seems. 
There might be a way to get the union of the columns too using HiveQL. Types 
generalize up with string being the "most general".

Matei

> On Nov 1, 2014, at 6:22 PM, Daniel Mahler  wrote:
> 
> Thanks Matei. What does unionAll do if the input RDD schemas are not 100%
> compatible? Does it take the union of the columns and generalize the types?
> 
> thanks
> Daniel
> 
> On Sat, Nov 1, 2014 at 6:08 PM, Matei Zaharia  > wrote:
> Try unionAll, which is a special method on SchemaRDDs that keeps the schema 
> on the results.
> 
> Matei
> 
> > On Nov 1, 2014, at 3:57 PM, Daniel Mahler  > > wrote:
> >
> > I would like to combine 2 parquet tables I have created.
> > I tried:
> >
> >   sc.union(sqx.parquetFile("fileA"), sqx.parquetFile("fileB"))
> >
> > but that just returns RDD[Row].
> > How do I combine them to get a SchemaRDD[Row]?
> >
> > thanks
> > Daniel
> 
> 



org.apache.hadoop.security.UserGroupInformation.doAs Issue

2014-11-01 Thread TJ Klein
Hi there,

I am trying to run the example code pi.py on a cluster; however, I have only
gotten it working on localhost. When trying to run in standalone mode,

./bin/spark-submit \
  --master spark://[mymaster]:7077 \
  examples/src/main/python/pi.py \

I get warnings about resources and memory (the workstation actually has
192 GB of memory and 32 cores).

14/11/01 21:37:05 WARN scheduler.TaskSchedulerImpl: Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient memory
14/11/01 21:37:05 INFO client.AppClient$ClientActor: Executor updated:
app-20141101213420-/4 is now EXITED (Command exited with code 1)
14/11/01 21:37:05 INFO cluster.SparkDeploySchedulerBackend: Executor
app-20141101213420-/4 removed: Command exited with code 1
14/11/01 21:37:05 INFO client.AppClient$ClientActor: Executor added:
app-20141101213420-/5 on worker-20141101213345-localhost-33525
(localhost:33525) with 32 cores
14/11/01 21:37:05 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20141101213420-/5 on hostPort localhost:33525 with 32 cores,
1024.0 MB RAM
14/11/01 21:37:05 INFO client.AppClient$ClientActor: Executor updated:
app-20141101213420-/5 is now RUNNING
14/11/01 21:37:20 WARN scheduler.TaskSchedulerImpl: Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient memory
14/11/01 21:37:35 WARN scheduler.TaskSchedulerImpl: Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient memory
14/11/01 21:37:38 INFO client.AppClient$ClientActor: Executor updated:
app-20141101213420-/5 is now EXITED (Command exited with code 1)
14/11/01 21:37:38 INFO cluster.SparkDeploySchedulerBackend: Executor
app-20141101213420-/5 removed: Command exited with code 1
14/11/01 21:37:38 INFO client.AppClient$ClientActor: Executor added:
app-20141101213420-/6 on worker-20141101213345-localhost-33525
(localhost:33525) with 32 cores
14/11/01 21:37:38 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20141101213420-/6 on hostPort localhost:33525 with 32 cores,
1024.0 MB RAM
14/11/01 21:37:38 INFO client.AppClient$ClientActor: Executor updated:
app-20141101213420-/6 is now RUNNING
14/11/01 21:37:50 WARN scheduler.TaskSchedulerImpl: Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient memory
14/11/01 21:38:05 WARN scheduler.TaskSchedulerImpl: Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient memory
14/11/01 21:38:11 INFO client.AppClient$ClientActor: Executor updated:
app-20141101213420-/6 is now EXITED (Command exited with code 1)
14/11/01 21:38:11 INFO cluster.SparkDeploySchedulerBackend: Executor
app-20141101213420-/6 removed: Command exited with code 1
14/11/01 21:38:11 INFO client.AppClient$ClientActor: Executor added:
app-20141101213420-/7 on worker-20141101213345-localhost-33525
(localhost:33525) with 32 cores
14/11/01 21:38:11 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20141101213420-/7 on hostPort localhost:33525 with 32 cores,
1024.0 MB RAM
14/11/01 21:38:11 INFO client.AppClient$ClientActor: Executor updated:
app-20141101213420-/7 is now RUNNING
[..]



The worker is connected successfully to the master and tries to run the
code: 

14/11/01 21:39:17 INFO worker.Worker: Asked to launch executor
app-20141101213420-/9 for PythonPi
14/11/01 21:39:17 WARN worker.CommandUtils: SPARK_JAVA_OPTS was set on the
worker. It is deprecated in Spark 1.0.
14/11/01 21:39:17 WARN worker.CommandUtils: Set SPARK_LOCAL_DIRS for
node-specific storage locations.
14/11/01 21:39:17 INFO worker.ExecutorRunner: Launch command:
"/usr/lib/jvm/java-6-openjdk-amd64/jre/bin/java" "-cp"
"::/etc/hadoop/spark/spark-1.1.0/conf:/etc/hadoop/spark/spark-1.1.0/assembly/target/scala-2.10/spark-assembly-1.1.0-hadoop2.5.1.jar:/etc/hadoop/conf"
"-XX:MaxPermSize=128m" "-verbose:gc" "-XX:+PrintGCDetails"
"-XX:+PrintGCTimeStamps" "-Dspark.akka.frameSize=32"
"-Dspark.driver.port=47509" "-verbose:gc" "-XX:+PrintGCDetails"
"-XX:+PrintGCTimeStamps" "-Xms1024M" "-Xmx1024M"
"org.apache.spark.executor.CoarseGrainedExecutorBackend"
"akka.tcp://sparkDriver@localhost:47509/user/CoarseGrainedScheduler" "9"
"localhost" "32" "akka.tcp://sparkWorker@localhost:33525/user/Worker"
"app-20141101213420-"
14/11/01 21:39:50 INFO worker.Worker: Executor app-20141101213420-/9
finished with state EXITED message Command exited with code 1 exitStatus 1


Looking at the worker thread log file in
/spark-1.1.0/work/app-20141101213420-/[..]/stderr

14/11/01 21:38:46 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://driverPropsFetcher@localhost:52163]
14/11/01 21:38:46 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://driverPropsFetche

Re: union of SchemaRDDs

2014-11-01 Thread Daniel Mahler
Thanks Matei. What does unionAll do if the input RDD schemas are not 100%
compatible? Does it take the union of the columns and generalize the types?

thanks
Daniel

On Sat, Nov 1, 2014 at 6:08 PM, Matei Zaharia 
wrote:

> Try unionAll, which is a special method on SchemaRDDs that keeps the
> schema on the results.
>
> Matei
>
> > On Nov 1, 2014, at 3:57 PM, Daniel Mahler  wrote:
> >
> > I would like to combine 2 parquet tables I have created.
> > I tried:
> >
> >   sc.union(sqx.parquetFile("fileA"), sqx.parquetFile("fileB"))
> >
> > but that just returns RDD[Row].
> > How do I combine them to get a SchemaRDD[Row]?
> >
> > thanks
> > Daniel
>
>


Re: union of SchemaRDDs

2014-11-01 Thread Matei Zaharia
Try unionAll, which is a special method on SchemaRDDs that keeps the schema on 
the results.
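A minimal sketch against the parquet tables from your message (sqx being your SQLContext; the registered table name is a placeholder):

val a = sqx.parquetFile("fileA")
val b = sqx.parquetFile("fileB")
val combined = a.unionAll(b)  // SchemaRDD with the schema kept, unlike sc.union, which returns RDD[Row]
combined.registerTempTable("combined")  // placeholder name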

Matei

> On Nov 1, 2014, at 3:57 PM, Daniel Mahler  wrote:
> 
> I would like to combine 2 parquet tables I have created.
> I tried:
> 
>   sc.union(sqx.parquetFile("fileA"), sqx.parquetFile("fileB"))
> 
> but that just returns RDD[Row].
> How do I combine them to get a SchemaRDD[Row]?
> 
> thanks
> Daniel





union of SchemaRDDs

2014-11-01 Thread Daniel Mahler
I would like to combine 2 parquet tables I have created.
I tried:

  sc.union(sqx.parquetFile("fileA"), sqx.parquetFile("fileB"))

but that just returns RDD[Row].
How do I combine them to get a SchemaRDD[Row]?

thanks
Daniel


Re: Spark speed performance

2014-11-01 Thread Aaron Davidson
coalesce() is a streaming operation if used without the second parameter;
it does not put all the data in RAM. If used with the second parameter
(shuffle = true), then it performs a shuffle, but still does not put all
the data in RAM.
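For reference, the two variants look like this in Scala (the PySpark calls are analogous; the path is a placeholder):

// Without shuffle: partitions are merged as the data streams through.
val narrow = sc.textFile("s3n://bucket/json/*").coalesce(10)

// With shuffle = true: a shuffle is performed, but, as noted above, the data
// still does not all have to sit in RAM at once.
val shuffled = sc.textFile("s3n://bucket/json/*").coalesce(10, shuffle = true)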

On Sat, Nov 1, 2014 at 12:09 PM,  wrote:

> Now I am running into problems using:
>
> distData = sc.textFile(sys.argv[2]).coalesce(10)
>
>
>
> The problem is that it seems Spark is trying to put all the data into RAM
> first and then perform the coalesce. Do you know of something that would
> coalesce on the fly with, for example, a fixed partition size?
> Do you think something like this is possible? Unfortunately I am not
> able to find anything like this in the Spark documentation.
>
> Thank you in advance for any advices or suggestions.
>
> Best regards,
> Jan
>
> __
>
>
> Thank you very much. Lots of very small json files was exactly the speed
> performance problem; using coalesce makes my Spark program on a single
> node run only twice as slow (even including starting Spark) as the single-node
> Python program, which is acceptable.
>
> Jan
> __
>
> Because of the overhead between the JVM and Python, a single task will be
> slower than your local Python scripts, but it's very easy to scale to
> many CPUs.
>
> Even on one CPU, it's not common for PySpark to be 100 times slower. You
> have many small files; each file will be processed by a task, which
> has about 100ms of overhead (scheduling and execution), but a small
> file can be processed by your single-threaded Python script in less than
> 1ms.
>
> You could pack your json files into larger ones, or you could try to
> merge the small tasks into larger ones with coalesce(N), such as:
>
> distData = sc.textFile(sys.argv[2]).coalesce(10)  # which will have 10
> partitions (tasks)
>
> Davies
>
> On Sat, Oct 18, 2014 at 12:07 PM,   wrote:
> > Hi,
> >
> > I have a program that I wrote for a single computer (in Python) and also
> > implemented the same for Spark. This program basically only reads .json
> > files, takes one field from them, and saves it back. Using Spark my program
> > runs approximately 100 times slower on 1 master and 1 slave. So I would like
> > to ask where the problem might be.
> >
> > My Spark program looks like:
> >
> >
> >
> > sc = SparkContext(appName="Json data preprocessor")
> >
> > distData = sc.textFile(sys.argv[2])
> >
> > json_extractor = JsonExtractor(sys.argv[1])
> >
> > cleanedData = distData.flatMap(json_extractor.extract_json)
> >
> > cleanedData.saveAsTextFile(sys.argv[3])
> >
> > JsonExtractor only selects the data from field that is given by
> sys.argv[1].
> >
> >
> >
> > My data is basically many small json files, with one json object per line.
> >
> > I have tried both, reading and writing the data from/to Amazon S3, local
> > disc on all the machines.
> >
> > I would like to ask if there is something that I am missing, or if Spark is
> > supposed to be this slow in comparison with the local, non-parallelized
> > single-node program.
> >
> >
> >
> > Thank you in advance for any suggestions or hints.
> >
> >
> >
>
>
>


Re: OOM with groupBy + saveAsTextFile

2014-11-01 Thread Bharath Ravi Kumar
Minor clarification: I'm running spark 1.1.0 on JDK 1.8, Linux 64 bit.

On Sun, Nov 2, 2014 at 1:06 AM, Bharath Ravi Kumar 
wrote:

> Hi,
>
> I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD
> of count ~ 100 million. The data size is 20GB and groupBy results in an RDD
> of 1061 keys with values being Iterable String>>. The job runs on 3 hosts in a standalone setup with each host's
> executor having 100G RAM and 24 cores dedicated to it. While the groupBy
> stage completes successfully with ~24GB of shuffle write, the
> saveAsTextFile fails after repeated retries with each attempt failing due
> to an out of memory error *[1]*. I understand that a few partitions may
> be overloaded as a result of the groupBy and I've tried the following
> config combinations unsuccessfully:
>
> 1) Repartition the initial rdd (44 input partitions but 1061 keys) across
> 1061 partitions and have max cores = 3 so that each key is a "logical"
> partition (though many partitions will end up on very few hosts), and each
> host likely runs saveAsTextFile on a single key at a time due to max cores
> = 3 with 3 hosts in the cluster. The level of parallelism is unspecified.
>
> 2) Leave max cores unspecified, set the level of parallelism to 72, and
> leave number of partitions unspecified (in which case the # input
> partitions was used, which is 44)
> Since I do not intend to cache RDD's, I have set
> spark.storage.memoryFraction=0.2 in both cases.
>
> My understanding is that if each host is processing a single logical
> partition to saveAsTextFile and is reading from other hosts to write out
> the RDD, it is unlikely that it would run out of memory. My interpretation
> of the spark tuning guide is that the degree of parallelism has little
> impact in case (1) above since max cores = number of hosts. Can someone
> explain why there are still OOM's with 100G being available? On a related
> note, intuitively (though I haven't read the source), it appears that an
> entire key-value pair needn't fit into memory of a single host for
> saveAsTextFile since a single shuffle read from a remote can be written to
> HDFS before the next remote read is carried out. This way, not all data
> needs to be collected at the same time.
>
> Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the
> tuning guide and even as per Datastax's spark introduction), there may need
> to be more documentation around the internals of spark to help users take
> better informed tuning decisions with parallelism, max cores, number
> partitions and other tunables. Is there any ongoing effort on that front?
>
> Thanks,
> Bharath
>
>
> *[1]* OOM stack trace and logs
> 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID
> 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array
> size exceeds VM limit
> java.util.Arrays.copyOf(Arrays.java:3326)
>
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>
> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
> java.lang.StringBuilder.append(StringBuilder.java:136)
>
> scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
> scala.Tuple2.toString(Tuple2.scala:22)
>
> org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
>
> org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> org.apache.spark.scheduler.Task.run(Task.scala:54)
>
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> 14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID
> 1295, proc1.foo.bar.com): FetchFailed(BlockManagerId(1, proc1.foo.bar.com,
> 43704, 0), shuffleId=0, mapId=13, reduceId=92)
> 14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at
> ModelTrainer.java:141) as failed due to a fetch failure from Stage 37
> (groupBy at ModelTrainer.java:133)
> 14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at
> ModelTrainer.java:141) failed in 55.259 s
> 14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at
> ModelTrainer.java:133) and Stage 36 (saveAsTextFile at
> ModelTrainer.java:141) due to fetch failure
>
>
>
>
>


OOM with groupBy + saveAsTextFile

2014-11-01 Thread Bharath Ravi Kumar
Hi,

I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD of
count ~ 100 million. The data size is 20GB and groupBy results in an RDD of
1061 keys with values being Iterable>. The job runs on 3 hosts in a standalone setup with each host's
executor having 100G RAM and 24 cores dedicated to it. While the groupBy
stage completes successfully with ~24GB of shuffle write, the
saveAsTextFile fails after repeated retries with each attempt failing due
to an out of memory error *[1]*. I understand that a few partitions may be
overloaded as a result of the groupBy and I've tried the following config
combinations unsuccessfully:

1) Repartition the initial rdd (44 input partitions but 1061 keys) across
1061 partitions and have max cores = 3 so that each key is a "logical"
partition (though many partitions will end up on very few hosts), and each
host likely runs saveAsTextFile on a single key at a time due to max cores
= 3 with 3 hosts in the cluster. The level of parallelism is unspecified.

2) Leave max cores unspecified, set the level of parallelism to 72, and
leave number of partitions unspecified (in which case the # input
partitions was used, which is 44)
Since I do not intend to cache RDD's, I have set
spark.storage.memoryFraction=0.2 in both cases.
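(For reference, a sketch of how option (2) above maps onto configuration keys; the app name is taken from the logs below, and the property names are the standard Spark 1.1 keys:)

import org.apache.spark.{SparkConf, SparkContext}

// Option (2): level of parallelism = 72, no explicit max cores, reduced storage fraction.
val conf = new SparkConf()
  .setAppName("ModelTrainer")
  .set("spark.default.parallelism", "72")      // "level of parallelism"
  .set("spark.storage.memoryFraction", "0.2")  // since no RDDs are cached
// Option (1) would instead set "spark.cores.max" to "3" and repartition the RDD to 1061 partitions.
val sc = new SparkContext(conf)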

My understanding is that if each host is processing a single logical
partition to saveAsTextFile and is reading from other hosts to write out
the RDD, it is unlikely that it would run out of memory. My interpretation
of the spark tuning guide is that the degree of parallelism has little
impact in case (1) above since max cores = number of hosts. Can someone
explain why there are still OOM's with 100G being available? On a related
note, intuitively (though I haven't read the source), it appears that an
entire key-value pair needn't fit into memory of a single host for
saveAsTextFile since a single shuffle read from a remote can be written to
HDFS before the next remote read is carried out. This way, not all data
needs to be collected at the same time.

Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the
tuning guide and even as per Datastax's spark introduction), there may need
to be more documentation around the internals of spark to help users take
better informed tuning decisions with parallelism, max cores, number
partitions and other tunables. Is there any ongoing effort on that front?

Thanks,
Bharath


*[1]* OOM stack trace and logs
14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID
1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array size
exceeds VM limit
java.util.Arrays.copyOf(Arrays.java:3326)

java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)

java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)

java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
java.lang.StringBuilder.append(StringBuilder.java:136)

scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
scala.Tuple2.toString(Tuple2.scala:22)

org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)

org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)

org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID
1295, proc1.foo.bar.com): FetchFailed(BlockManagerId(1, proc1.foo.bar.com,
43704, 0), shuffleId=0, mapId=13, reduceId=92)
14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at
ModelTrainer.java:141) as failed due to a fetch failure from Stage 37
(groupBy at ModelTrainer.java:133)
14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at
ModelTrainer.java:141) failed in 55.259 s
14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at
ModelTrainer.java:133) and Stage 36 (saveAsTextFile at
ModelTrainer.java:141) due to fetch failure


Re: Spark speed performance

2014-11-01 Thread jan.zikes

Now I am running into problems using:

distData = sc.textFile(sys.argv[2]).coalesce(10)
 
The problem is that it seems Spark is trying to put all the data into RAM
first and then perform the coalesce. Do you know of something that would
coalesce on the fly with, for example, a fixed partition size? Do you think
something like this is possible? Unfortunately I am not able to find
anything like this in the Spark documentation.

Thank you in advance for any advices or suggestions.

Best regards,
Jan 
__



Thank you very much. Lots of very small json files was exactly the speed
performance problem; using coalesce makes my Spark program on a single
node run only twice as slow (even including starting Spark) as the single-node
Python program, which is acceptable.

Jan 
__

Because of the overhead between the JVM and Python, a single task will be
slower than your local Python scripts, but it's very easy to scale to
many CPUs.

Even on one CPU, it's not common for PySpark to be 100 times slower. You
have many small files; each file will be processed by a task, which
has about 100ms of overhead (scheduling and execution), but a small
file can be processed by your single-threaded Python script in less than
1ms.

You could pack your json files into larger ones, or you could try to
merge the small tasks into larger ones with coalesce(N), such as:

distData = sc.textFile(sys.argv[2]).coalesce(10)  # which will have 10
partitions (tasks)

Davies

On Sat, Oct 18, 2014 at 12:07 PM,   wrote:

Hi,

I have a program that I wrote for a single computer (in Python) and also
implemented the same for Spark. This program basically only reads .json files,
takes one field from them, and saves it back. Using Spark my program runs
approximately 100 times slower on 1 master and 1 slave. So I would like to
ask where the problem might be.

My Spark program looks like:



sc = SparkContext(appName="Json data preprocessor")

distData = sc.textFile(sys.argv[2])

json_extractor = JsonExtractor(sys.argv[1])

cleanedData = distData.flatMap(json_extractor.extract_json)

cleanedData.saveAsTextFile(sys.argv[3])

JsonExtractor only selects the data from field that is given by sys.argv[1].



My data is basically many small json files, with one json object per line.

I have tried both, reading and writing the data from/to Amazon S3, local
disc on all the machines.

I would like to ask if there is something that I am missing, or if Spark is
supposed to be this slow in comparison with the local, non-parallelized
single-node program.



Thank you in advance for any suggestions or hints.




Re: stage failure: java.lang.IllegalStateException: unread block data

2014-11-01 Thread TJ Klein
Hi,

I get exactly the same error. It runs on my local machine but not on the
cluster. I am running the pi.py example.

Best,
 Tassilo



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/stage-failure-java-lang-IllegalStateException-unread-block-data-tp17751p17889.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: A Spark Design Problem

2014-11-01 Thread Steve Lewis
Join seems to me the proper approach, followed by keying the fits by KeyID
and using combineByKey to choose the best.
I am implementing that now and will report on performance.
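A generic sketch of that pattern, with hypothetical stand-in types for the bins, keys and fits (the real domain objects are not shown in the thread):

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Hypothetical stand-ins for the domain types.
case class Bin(binId: Int, data: String)
case class Key(binId: Int, keyId: Long, data: String)
case class Fit(keyId: Long, score: Double)

// Join bins and keys on bin id, score every (bin, key) pair, then keep the
// best-scoring fit per key id with combineByKey.
def bestFits(bins: RDD[Bin], keys: RDD[Key], score: (Bin, Key) => Double): RDD[(Long, Fit)] = {
  bins.keyBy(_.binId)
    .join(keys.keyBy(_.binId))
    .map { case (_, (b, k)) => (k.keyId, Fit(k.keyId, score(b, k))) }
    .combineByKey[Fit](
      (f: Fit) => f,
      (best: Fit, f: Fit) => if (f.score > best.score) f else best,
      (a: Fit, b: Fit) => if (a.score >= b.score) a else b)
}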

On Fri, Oct 31, 2014 at 11:56 AM, Sonal Goyal  wrote:

> Does the following help?
>
> JavaPairRDD join with JavaPairRDD
>
> If you partition both RDDs by the bin id, I think you should be able to
> get what you want.
>
> Best Regards,
> Sonal
> Nube Technologies 
>
> 
>
>
>>
>> On Fri, Oct 31, 2014 at 5:44 PM, Steve Lewis 
>> wrote:
>>
>>>
>>>  The original problem is in biology but the following captures the CS
>>> issues, Assume ...
>>>
>>


Re: SparkSQL + Hive Cached Table Exception

2014-11-01 Thread Jean-Pascal Billaud
Great! Thanks.

Sent from my iPad

> On Nov 1, 2014, at 8:35 AM, Cheng Lian  wrote:
> 
> Hi Jean,
> 
> Thanks for reporting this. This is indeed a bug: for some column types (Binary,
> Array, Map and Struct, and unfortunately, for some reason, Boolean), a
> NoopColumnStats is used to collect column statistics, which causes this
> issue. Filed SPARK-4182 to track this issue; will fix this ASAP.
> 
> Cheng
> 
>> On Fri, Oct 31, 2014 at 7:04 AM, Jean-Pascal Billaud  
>> wrote:
>> Hi,
>> 
>> While testing SparkSQL on top of our Hive metastore, I am getting some 
>> java.lang.ArrayIndexOutOfBoundsException while reusing a cached RDD table.
>> 
>> Basically, I have a table "mtable" partitioned by some "date" field in hive 
>> and below is the scala code I am running in spark-shell:
>> 
>> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
>> val rdd_mtable = sqlContext.sql("select * from mtable where date=20141028");
>> rdd_mtable.registerTempTable("rdd_mtable");
>> sqlContext.cacheTable("rdd_mtable");
>> sqlContext.sql("select count(*) from rdd_mtable").collect(); <-- OK
>> sqlContext.sql("select count(*) from rdd_mtable").collect(); <-- Exception
>> 
>> So the first collect() is working just fine, however running the second 
>> collect() which I expect use the cached RDD throws some 
>> java.lang.ArrayIndexOutOfBoundsException, see the backtrace at the end of 
>> this email. It seems the columnar traversal is crashing for some reasons. 
>> FYI, I am using spark ToT (234de9232bcfa212317a8073c4a82c3863b36b14).
>> 
>> java.lang.ArrayIndexOutOfBoundsException: 14
>>  at 
>> org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:108)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:89)
>>  at 
>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$computeSizeInBytes$1.apply(InMemoryColumnarTableScan.scala:66)
>>  at 
>> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$computeSizeInBytes$1.apply(InMemoryColumnarTableScan.scala:66)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>  at 
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>  at 
>> org.apache.spark.sql.columnar.InMemoryRelation.computeSizeInBytes(InMemoryColumnarTableScan.scala:66)
>>  at 
>> org.apache.spark.sql.columnar.InMemoryRelation.statistics(InMemoryColumnarTableScan.scala:87)
>>  at 
>> org.apache.spark.sql.columnar.InMemoryRelation.statisticsToBePropagated(InMemoryColumnarTableScan.scala:73)
>>  at 
>> org.apache.spark.sql.columnar.InMemoryRelation.withOutput(InMemoryColumnarTableScan.scala:147)
>>  at 
>> org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1$$anonfun$applyOrElse$1.apply(CacheManager.scala:122)
>>  at 
>> org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1$$anonfun$applyOrElse$1.apply(CacheManager.scala:122)
>>  at scala.Option.map(Option.scala:145)
>>  at 
>> org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1.applyOrElse(CacheManager.scala:122)
>>  at 
>> org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1.applyOrElse(CacheManager.scala:119)
>>  at 
>> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
>>  at 
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
>>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>  at 
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>  at 
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>  at 
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>  at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>  at 
>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>  at 
>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>  at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>  at 
>> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
>>  at 
>> or

Re: SparkSQL + Hive Cached Table Exception

2014-11-01 Thread Cheng Lian
Hi Jean,

Thanks for reporting this. This is indeed a bug: for some column types (Binary,
Array, Map and Struct, and unfortunately, for some reason, Boolean), a
NoopColumnStats is used to collect column statistics, which causes this
issue. Filed SPARK-4182 to track this issue; will fix this ASAP.

Cheng

On Fri, Oct 31, 2014 at 7:04 AM, Jean-Pascal Billaud 
wrote:

> Hi,
>
> While testing SparkSQL on top of our Hive metastore, I am getting
> some java.lang.ArrayIndexOutOfBoundsException while reusing a cached RDD
> table.
>
> Basically, I have a table "mtable" partitioned by some "date" field in
> hive and below is the scala code I am running in spark-shell:
>
> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
> val rdd_mtable = sqlContext.sql("select * from mtable where
> date=20141028");
> rdd_mtable.registerTempTable("rdd_mtable");
> sqlContext.cacheTable("rdd_mtable");
> sqlContext.sql("select count(*) from rdd_mtable").collect(); <-- OK
> sqlContext.sql("select count(*) from rdd_mtable").collect(); <-- Exception
>
> So the first collect() is working just fine, however running the second
> collect() which I expect use the cached RDD throws some
> java.lang.ArrayIndexOutOfBoundsException, see the backtrace at the end of
> this email. It seems the columnar traversal is crashing for some reasons.
> FYI, I am using spark ToT (234de9232bcfa212317a8073c4a82c3863b36b14).
>
> java.lang.ArrayIndexOutOfBoundsException: 14
> at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
> at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
> at org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:108)
> at org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:89)
> at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$computeSizeInBytes$1.apply(InMemoryColumnarTableScan.scala:66)
> at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$computeSizeInBytes$1.apply(InMemoryColumnarTableScan.scala:66)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at org.apache.spark.sql.columnar.InMemoryRelation.computeSizeInBytes(InMemoryColumnarTableScan.scala:66)
> at org.apache.spark.sql.columnar.InMemoryRelation.statistics(InMemoryColumnarTableScan.scala:87)
> at org.apache.spark.sql.columnar.InMemoryRelation.statisticsToBePropagated(InMemoryColumnarTableScan.scala:73)
> at org.apache.spark.sql.columnar.InMemoryRelation.withOutput(InMemoryColumnarTableScan.scala:147)
> at org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1$$anonfun$applyOrElse$1.apply(CacheManager.scala:122)
> at org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1$$anonfun$applyOrElse$1.apply(CacheManager.scala:122)
> at scala.Option.map(Option.scala:145)
> at org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1.applyOrElse(CacheManager.scala:122)
> at org.apache.spark.sql.CacheManager$$anonfun$useCachedData$1.applyOrElse(CacheManager.scala:119)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
> at org.apache.spark.sql.CacheManager$class.useCachedData(CacheManager.scala:119)
> at org.apache.spark.sql.SQLContext.useCachedData(SQLContext.scala:49)
> at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:376)
> at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:3

Re: use additional ebs volumes for hdfs storage with spark-ec2

2014-11-01 Thread Marius Soutier
Are these /vols formatted? You typically need to format and define a mount 
point in /mnt for attached EBS volumes.

I’m not using the ec2 script, so I don’t know what is installed, but there’s 
usually an HDFS info service running on port 50070. After changing 
hdfs-site.xml, you have to restart the HDFS service. The Cloudera distribution 
supports this in the UI; otherwise, depending on your version, there should be 
scripts in /usr/local/hadoop, /usr/lib/hadoop-hdfs, or somewhere similar.
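
For example, a rough sketch of formatting and mounting one attached volume (the device
name is an assumption; check lsblk or fdisk -l to see how the EBS volumes actually show
up on your instances):

mkfs -t ext4 /dev/xvdf              # format the attached EBS volume
mkdir -p /vol0                      # create the mount point referenced in dfs.data.dir
mount /dev/xvdf /vol0               # mount it
echo '/dev/xvdf /vol0 ext4 defaults,noatime 0 0' >> /etc/fstab   # keep it mounted across reboots

Repeat per volume, then restart HDFS so the datanodes pick up the new directories.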

On 31.10.2014, at 05:56, Daniel Mahler  wrote:

> Thanks Akhil. I tried changing /root/ephemeral-hdfs/conf/hdfs-site.xml to have
> 
>   
> dfs.data.dir
> 
> /vol,/vol0,/vol1,/vol2,/vol3,/vol4,/vol5,/vol6,/vol7,/mnt/ephemeral-hdfs/data,/mnt2/ephemeral-hdfs/data
>   
> 
> and then running
> 
> /root/ephemeral-hdfs/bin/stop-all.sh
> copy-dir  /root/ephemeral-hdfs/conf/
> /root/ephemeral-hdfs/bin/start-all.sh
> 
> to make sure the new configuration takes effect on the entire cluster.
> I then ran Spark to write to the local HDFS.
> It failed after filling the original /mnt* mounted drives,
> without writing anything to the attached /vol* drives.
> 
> I also tried completely stopping and restarting the cluster,
> but restarting resets /root/ephemeral-hdfs/conf/hdfs-site.xml to the default 
> state.
> 
> thanks
> Daniel
> 
> 
> 
> On Thu, Oct 30, 2014 at 1:56 AM, Akhil Das  wrote:
> I think you can check the core-site.xml or hdfs-site.xml file under 
> /root/ephemeral-hdfs/etc/hadoop/, where you will see the data node dir property 
> (dfs.data.dir), which is a comma-separated list of volumes. 
> 
> Thanks
> Best Regards
> 
> On Thu, Oct 30, 2014 at 5:21 AM, Daniel Mahler  wrote:
> I started my ec2 spark cluster with 
> 
> ./ec2/spark-ec2 --ebs-vol-{size=100,num=8,type=gp2} -t m3.xlarge -s 10 launch mycluster
> 
> I see the additional volumes attached, but they do not seem to be set up for 
> HDFS.
> How can I check whether they are being utilized on all workers,
> and how can I get all workers to use the extra volumes for HDFS?
> I do not have experience using hadoop directly, only through spark.
> 
> thanks
> Daniel
> 
> 



--executor-cores cannot change vcores in yarn?

2014-11-01 Thread Gen
Hi,

Maybe it is a stupid question, but I am running Spark on YARN. I request the
resources with the following command:
{code}
./spark-submit --master yarn-client --num-executors <number of workers>
--executor-cores <number of cores> ...
{code}
However, after launching the task, I use "yarn node -status <node ID>" to monitor
the situation of the cluster. It shows that the number of vcores used for each
container is always 1, no matter what number I pass via --executor-cores.
Any ideas how to solve this problem? Thanks a lot in advance for your help.
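
In case it is relevant: if the cluster runs the CapacityScheduler with its default
resource calculator, CPU is not considered when sizing containers, and each container
tends to be reported with a single vcore regardless of --executor-cores. A minimal
sketch of the usual change (Hadoop 2.x property names; verify against your
distribution) in capacity-scheduler.xml:
{code}
<!-- Sketch only: use a calculator that accounts for both memory and CPU,
     so requested executor cores are reflected as allocated vcores. -->
<property>
  <name>yarn.scheduler.capacity.resource-calculator</name>
  <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>
{code}
This typically requires restarting the ResourceManager to take effect.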

Cheers
Gen




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/executor-cores-cannot-change-vcores-in-yarn-tp17883.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: "CANNOT FIND ADDRESS"

2014-11-01 Thread Akhil Das
Try this:

spark.storage.memoryFraction 0.9
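
For reference, a minimal sketch of how this could be set (assuming Spark 1.x; the
class and jar names below are placeholders):

# either in conf/spark-defaults.conf on the driver:
#   spark.storage.memoryFraction   0.9
# or per job on the command line:
./bin/spark-submit --conf spark.storage.memoryFraction=0.9 --class MyApp my-app.jar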
On 31 Oct 2014 20:27, "akhandeshi"  wrote:

> Thanks for the pointers! I did try them, but they didn't seem to help...
>
> In my latest attempt, I am running spark-submit in local mode,
>
> but I still see the same message in the Spark app UI (4040):
> localhost   CANNOT FIND ADDRESS
>
> In the logs, I see a lot of in-memory maps being spilled to disk.  I don't understand why
> that is the case.  There should be over 35 GB of RAM available, and the input is
> not significantly large.  It may be linked to the performance issues that I
> am seeing; I have another post seeking advice on that.  It seems I am
> not able to tune Spark sufficiently to execute my process successfully.
>
> 14/10/31 13:45:12 INFO ExternalAppendOnlyMap: Thread 206 spilling in-memory map of 1777 MB to disk (2 times so far)
> 14/10/31 13:45:12 INFO ExternalAppendOnlyMap: Thread 228 spilling in-memory map of 393 MB to disk (1 time so far)
> 14/10/31 13:45:12 INFO ExternalAppendOnlyMap: Thread 259 spilling in-memory map of 392 MB to disk (2 times so far)
> 14/10/31 13:45:14 INFO ExternalAppendOnlyMap: Thread 230 spilling in-memory map of 554 MB to disk (2 times so far)
> 14/10/31 13:45:15 INFO ExternalAppendOnlyMap: Thread 235 spilling in-memory map of 3990 MB to disk (1 time so far)
> 14/10/31 13:45:15 INFO ExternalAppendOnlyMap: Thread 236 spilling in-memory map of 2667 MB to disk (1 time so far)
> 14/10/31 13:45:17 INFO ExternalAppendOnlyMap: Thread 259 spilling in-memory map of 825 MB to disk (3 times so far)
> 14/10/31 13:45:24 INFO ExternalAppendOnlyMap: Thread 228 spilling in-memory map of 4618 MB to disk (2 times so far)
> 14/10/31 13:45:26 INFO ExternalAppendOnlyMap: Thread 233 spilling in-memory map of 15869 MB to disk (1 time so far)
> 14/10/31 13:45:37 INFO ExternalAppendOnlyMap: Thread 206 spilling in-memory map of 3026 MB to disk (3 times so far)
> 14/10/31 13:45:48 INFO ExternalAppendOnlyMap: Thread 228 spilling in-memory map of 401 MB to disk (3 times so far)
> 14/10/31 13:45:48 INFO ExternalAppendOnlyMap: Thread 259 spilling in-memory map of 392 MB to disk (4 times so
>
> My spark properties are:
>
> Name                                     Value
> spark.akka.frameSize                     50
> spark.akka.timeout                       900
> spark.app.name                           Simple File Merge Application
> spark.core.connection.ack.wait.timeout   900
> spark.default.parallelism                10
> spark.driver.host                        spark-single.c.fi-mdd-poc.internal
> spark.driver.memory                      35g
> spark.driver.port                        40255
> spark.eventLog.enabled                   true
> spark.fileserver.uri                     http://10.240.106.135:59255
> spark.jars                               file:/home/ami_khandeshi_gmail_com/SparkExample-1.0.jar
> spark.master                             local[16]
> spark.scheduler.mode                     FIFO
> spark.shuffle.consolidateFiles           true
> spark.storage.memoryFraction             0.3
> spark.tachyonStore.baseDir               /tmp
> spark.tachyonStore.folderName            spark-21ad0fd2-2177-48ce-9242-8dbb33f2a1f1
> spark.tachyonStore.url                   tachyon://mdd:19998
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/CANNOT-FIND-ADDRESS-tp17637p17824.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>