Re: Best way to import data from Oracle to Spark?

2015-09-09 Thread Reynold Xin
Using the JDBC data source is probably the best way.
http://spark.apache.org/docs/1.4.1/sql-programming-guide.html#jdbc-to-other-databases
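
For reference, a minimal sketch against the 1.4 JDBC source (the URL, table
name, and driver class below are hypothetical placeholders, and the Oracle
JDBC driver jar has to be on both the driver and executor classpaths):

val oracleDF = sqlContext.read
  .format("jdbc")
  .options(Map(
    "url" -> "jdbc:oracle:thin:@//dbhost:1521/ORCL",   // hypothetical host/service
    "dbtable" -> "MYSCHEMA.MYTABLE",                   // hypothetical table
    "driver" -> "oracle.jdbc.OracleDriver"))
  .load()
oracleDF.show()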

On Tue, Sep 8, 2015 at 10:11 AM, Cui Lin  wrote:

> What's the best way to import data from Oracle to Spark? Thanks!
>
>
> --
> Best regards!
>
> Lin,Cui
>


[ANNOUNCE] Announcing Spark 1.5.0

2015-09-09 Thread Reynold Xin
Hi All,

Spark 1.5.0 is the sixth release on the 1.x line. This release represents
1400+ patches from 230+ contributors and 80+ institutions. To download
Spark 1.5.0 visit the downloads page.

A huge thanks goes to all of the individuals and organizations involved in the
development and testing of this release.

Visit the release notes [1] to read about the new features, or download [2]
the release today.

For errata in the contributions or release notes, please e-mail me
*directly* (not on-list).

Thanks to everyone who helped work on this release!

[1] http://spark.apache.org/releases/spark-release-1-5-0.html
[2] http://spark.apache.org/downloads.html


Re: Driver OOM after upgrading to 1.5

2015-09-09 Thread Reynold Xin
Java 7 / 8?

On Wed, Sep 9, 2015 at 10:10 AM, Sandy Ryza  wrote:

> I just upgraded the spark-timeseries
>  project to run on top of
> 1.5, and I'm noticing that tests are failing with OOMEs.
>
> I ran a jmap -histo on the process and discovered the top heap items to be:
>1:163428   22236064  
>2:163428   21112648  
>3: 12638   14459192  
>4: 12638   13455904  
>5: 105397642528  
>
> Not sure whether this is suspicious.  Any ideas?
>
> -Sandy
>


Re: Driver OOM after upgrading to 1.5

2015-09-09 Thread Reynold Xin
It's likely that with codegen, you need a bigger PermGen space. I've also
found that Java 7 doesn't do very well w.r.t. flushing the code cache. As a
result, Spark SQL and DataFrames now run much better under Java 8, which
flushes the code cache better.
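
One way to give it more room on Java 7 (a sketch; the 256m value is only
illustrative): set the JVM options before the processes start, e.g. in
spark-defaults.conf or via spark-submit --conf:

spark.driver.extraJavaOptions    -XX:MaxPermSize=256m
spark.executor.extraJavaOptions  -XX:MaxPermSize=256m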


On Wed, Sep 9, 2015 at 2:12 PM, Sandy Ryza <sandy.r...@cloudera.com> wrote:

> Java 7.
>
> FWIW I was just able to get it to work by increasing MaxPermSize to 256m.
>
> -Sandy
>
> On Wed, Sep 9, 2015 at 11:37 AM, Reynold Xin <r...@databricks.com> wrote:
>
>> Java 7 / 8?
>>
>> On Wed, Sep 9, 2015 at 10:10 AM, Sandy Ryza <sandy.r...@cloudera.com>
>> wrote:
>>
>>> I just upgraded the spark-timeseries
>>> <https://github.com/cloudera/spark-timeseries> project to run on top of
>>> 1.5, and I'm noticing that tests are failing with OOMEs.
>>>
>>> I ran a jmap -histo on the process and discovered the top heap items to
>>> be:
>>>1:163428   22236064  
>>>2:163428   21112648  
>>>3: 12638   14459192  
>>>4: 12638   13455904  
>>>5: 105397642528  
>>>
>>> Not sure whether this is suspicious.  Any ideas?
>>>
>>> -Sandy
>>>
>>
>>
>


Re: Problems with Tungsten in Spark 1.5.0-rc2

2015-09-07 Thread Reynold Xin
On Wed, Sep 2, 2015 at 12:03 AM, Anders Arpteg  wrote:

>
> BTW, is it possible (or will it be) to use Tungsten with dynamic
> allocation and the external shuffle manager?
>
>
Yes - I think this already works. There isn't anything specific here
related to Tungsten.
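
For reference, the usual pair of settings for this (nothing Tungsten-specific);
on YARN the external shuffle service additionally has to be installed on each
NodeManager:

spark.dynamicAllocation.enabled  true
spark.shuffle.service.enabled    true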


Re: How to avoid shuffle errors for a large join ?

2015-09-05 Thread Reynold Xin
Try increasing the shuffle memory fraction (by default it is only 16%).
Again, if you run Spark 1.5, this will probably run a lot faster,
especially if you increase the shuffle memory fraction ...
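
A sketch of both knobs for 1.4 (the 0.4 value is illustrative, not a tuned
recommendation; the conf has to be built before the SparkContext is created):

val conf = new org.apache.spark.SparkConf()
  .set("spark.shuffle.memoryFraction", "0.4")  // default 0.2, i.e. ~16% of the heap after the safety fraction
val sc = new org.apache.spark.SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")  // 1.4: sort-merge join is off by default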

On Tue, Sep 1, 2015 at 8:13 AM, Thomas Dudziak <tom...@gmail.com> wrote:

> While it works with sort-merge-join, it takes about 12h to finish (with
> 1 shuffle partitions). My hunch is that the reason for that is this:
>
> INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB to
> disk (62 times so far)
>
> (and lots more where this comes from).
>
> On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin <r...@databricks.com> wrote:
>
>> Can you try 1.5? This should work much, much better in 1.5 out of the box.
>>
>> For 1.4, I think you'd want to turn on sort-merge-join, which is off by
>> default. However, the sort-merge join in 1.4 can still trigger a lot of
>> garbage, making it slower. SMJ performance is probably 5x - 1000x better in
>> 1.5 for your case.
>>
>>
>> On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak <tom...@gmail.com> wrote:
>>
>>> I'm getting errors like "Removing executor with no recent heartbeats" &
>>> "Missing an output location for shuffle" errors for a large SparkSql join
>>> (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
>>> configure the job to avoid them.
>>>
>>> The initial stage completes fine with some 30k tasks on a cluster with
>>> 70 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
>>> the shuffle stage first waits 30min in the scheduling phase according to
>>> the UI, and then dies with the mentioned errors.
>>>
>>> I can see in the GC logs that the executors reach their memory limits
>>> (32g per executor, 2 workers per machine) and can't allocate any more stuff
>>> in the heap. Fwiw, the top 10 in the memory use histogram are:
>>>
>>> num #instances #bytes  class name
>>> --
>>>1: 24913959511958700560
>>>  scala.collection.immutable.HashMap$HashMap1
>>>2: 251085327 8034730464  scala.Tuple2
>>>3: 243694737 5848673688  java.lang.Float
>>>4: 231198778 5548770672  java.lang.Integer
>>>5:  72191585 4298521576  [Lscala.collection.immutable.HashMap;
>>>6:  72191582 2310130624
>>>  scala.collection.immutable.HashMap$HashTrieMap
>>>7:  74114058 1778737392  java.lang.Long
>>>8:   6059103  779203840  [Ljava.lang.Object;
>>>9:   5461096  174755072  scala.collection.mutable.ArrayBuffer
>>>   10: 34749   70122104  [B
>>>
>>> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>>>
>>> spark.core.connection.ack.wait.timeout 600
>>> spark.executor.heartbeatInterval   60s
>>> spark.executor.memory  32g
>>> spark.mesos.coarse false
>>> spark.network.timeout  600s
>>> spark.shuffle.blockTransferService netty
>>> spark.shuffle.consolidateFiles true
>>> spark.shuffle.file.buffer  1m
>>> spark.shuffle.io.maxRetries6
>>> spark.shuffle.manager  sort
>>>
>>> The join is currently configured with spark.sql.shuffle.partitions=1000
>>> but that doesn't seem to help. Would increasing the partitions help ? Is
>>> there a formula to determine an approximate partitions number value for a
>>> join ?
>>> Any help with this job would be appreciated !
>>>
>>> cheers,
>>> Tom
>>>
>>
>>
>


Re: How to avoid shuffle errors for a large join ?

2015-08-29 Thread Reynold Xin
Can you try 1.5? This should work much, much better in 1.5 out of the box.

For 1.4, I think you'd want to turn on sort-merge-join, which is off by
default. However, the sort-merge join in 1.4 can still trigger a lot of
garbage, making it slower. SMJ performance is probably 5x - 1000x better in
1.5 for your case.


On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak tom...@gmail.com wrote:

 I'm getting errors like "Removing executor with no recent heartbeats" &
 "Missing an output location for shuffle" errors for a large SparkSql join
 (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
 configure the job to avoid them.

 The initial stage completes fine with some 30k tasks on a cluster with 70
 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
 the shuffle stage first waits 30min in the scheduling phase according to
 the UI, and then dies with the mentioned errors.

 I can see in the GC logs that the executors reach their memory limits (32g
 per executor, 2 workers per machine) and can't allocate any more stuff in
 the heap. Fwiw, the top 10 in the memory use histogram are:

 num #instances #bytes  class name
 --
1: 24913959511958700560
  scala.collection.immutable.HashMap$HashMap1
2: 251085327 8034730464  scala.Tuple2
3: 243694737 5848673688  java.lang.Float
4: 231198778 5548770672  java.lang.Integer
5:  72191585 4298521576  [Lscala.collection.immutable.HashMap;
6:  72191582 2310130624
  scala.collection.immutable.HashMap$HashTrieMap
7:  74114058 1778737392  java.lang.Long
8:   6059103  779203840  [Ljava.lang.Object;
9:   5461096  174755072  scala.collection.mutable.ArrayBuffer
   10: 34749   70122104  [B

 Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

 spark.core.connection.ack.wait.timeout 600
 spark.executor.heartbeatInterval   60s
 spark.executor.memory  32g
 spark.mesos.coarse false
 spark.network.timeout  600s
 spark.shuffle.blockTransferService netty
 spark.shuffle.consolidateFiles true
 spark.shuffle.file.buffer  1m
 spark.shuffle.io.maxRetries6
 spark.shuffle.manager  sort

 The join is currently configured with spark.sql.shuffle.partitions=1000
 but that doesn't seem to help. Would increasing the partitions help ? Is
 there a formula to determine an approximate partitions number value for a
 join ?
 Any help with this job would be appreciated !

 cheers,
 Tom



Re: DataFrame. SparkPlan / Project serialization issue: ArrayIndexOutOfBounds.

2015-08-21 Thread Reynold Xin
You've probably hit this bug:
https://issues.apache.org/jira/browse/SPARK-7180

It's fixed in Spark 1.4.1+. Try setting spark.serializer.extraDebugInfo to
false and see if it goes away.
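
A sketch of the workaround (set before the SparkContext is created; the app
name and master here are only illustrative):

val conf = new org.apache.spark.SparkConf()
  .setAppName("parquet-save-test")
  .setMaster("local[*]")
  .set("spark.serializer.extraDebugInfo", "false")  // skips the SerializationDebugger path hitting SPARK-7180
val sc = new org.apache.spark.SparkContext(conf)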


On Fri, Aug 21, 2015 at 3:37 AM, Eugene Morozov evgeny.a.moro...@gmail.com
wrote:

 Hi,

 I'm using spark 1.3.1 built against hadoop 1.0.4 and java 1.7 and I'm
 trying to save my data frame to parquet.
 The issue I'm stuck looks like serialization tries to do pretty weird
 thing: tries to write to an empty array.

 The last (through stack trace) line of spark code that leads to exception
 is in method SerializationDebugger.visitSerializable(o: Object, stack:
 List[String]): List[String].
 desc.getObjFieldValues(finalObj, objFieldValues)

 The reason it does so, is because finalObj is
 org.apache.spark.sql.execution.Project and objFieldValues is an empty
 array! As a result there are two fields to read from the Project instance
 object (happens in java.io.ObjectStreamClass), but there is an empty array
 to read into.

 A little bit of code with debug info:
 private def visitSerializable(o: Object, stack: List[String]):
 List[String] = {
 val (finalObj, desc) = findObjectAndDescriptor(o) //finalObj: Project,
 desc: org.apache.spark.sql.execution.Project
   val slotDescs = desc.getSlotDescs //java.io.ObjectStreamClass[2] [0:
 SparkPlan, 1: Project]
   var i = 0 //i: 0
   while (i < slotDescs.length) {
 val slotDesc = slotDescs(i) //slotDesc:
 org.apache.spark.sql.execution.SparkPlan
 if (slotDesc.hasWriteObjectMethod) {
   // TODO: Handle classes that specify writeObject method.
 } else {
   val fields: Array[ObjectStreamField] = slotDesc.getFields
 //fields: java.io.ObjectStreamField[1] [0: Z codegenEnabled]
   val objFieldValues: Array[Object] = new
 Array[Object](slotDesc.getNumObjFields) //objFieldValues:
 java.lang.Object[0]
   val numPrims = fields.length - objFieldValues.length //numPrims:
 1
   desc.getObjFieldValues(finalObj, objFieldValues) //leads to
 exception

 So it looks like it gets objFieldValues array from the SparkPlan object,
 but uses it to receive values from Project object.

 Here is the schema of my data frame
 root
  |-- Id: long (nullable = true)
  |-- explodes: struct (nullable = true)
  ||-- Identifiers: array (nullable = true)
  |||-- element: struct (containsNull = true)
  ||||-- Type: array (nullable = true)
  |||||-- element: string (containsNull = true)
  |-- Identifiers: struct (nullable = true)
  ||-- Type: array (nullable = true)
  |||-- element: string (containsNull = true)
  |-- Type2: string (nullable = true)
  |-- Type: string (nullable = true)

 Actual stack trace is:
 org.apache.spark.SparkException: Task not serializable
 at
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
 at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
 at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:40)
 at
 org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
 at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)
 at
 com.reltio.analytics.data.application.DataAccessTest.testEntities_NestedAttribute(DataAccessTest.java:199)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at
 org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
 at
 org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at
 org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
 at
 org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at
 org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
 at
 org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
 at
 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
 at
 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
 at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
 at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
 at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
 at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
 at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
 Caused by: 

Re: Memory allocation error with Spark 1.5

2015-08-05 Thread Reynold Xin
In Spark 1.5, we have a new way to manage memory (part of Project
Tungsten). The default unit of memory allocation is 64MB, which is way too
high when you have 1G of memory allocated in total and have more than 4
threads.

We will reduce the default page size before releasing 1.5.  For now, you
can just set the spark.buffer.pageSize variable to a lower value (e.g. 16m).

https://github.com/apache/spark/blob/702aa9d7fb16c98a50e046edfd76b8a7861d0391/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala#L125
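
A sketch of setting that knob before the context is created (this is a
pre-release 1.5 setting and may be renamed before the final release):

val conf = new org.apache.spark.SparkConf()
  .set("spark.buffer.pageSize", "16m")  // pre-release default is 64m
val sc = new org.apache.spark.SparkContext(conf)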

On Wed, Aug 5, 2015 at 9:25 AM, Alexis Seigneurin aseigneu...@ippon.fr
wrote:

 Hi,

 I'm receiving a memory allocation error with a recent build of Spark 1.5:

 java.io.IOException: Unable to acquire 67108864 bytes of memory
 at
 org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:348)
 at
 org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:398)
 at
 org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:92)
 at
 org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:174)
 at
 org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:146)
 at
 org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:126)


 The issue appears when joining 2 datasets. One with 6084 records, the
 other one with 200 records. I'm expecting to receive 200 records in the
 result.

 I'm using a homemade build prepared from branch-1.5 with commit ID
 eedb996. I have run mvn -DskipTests clean install to generate that
 build.

 Apart from that, I'm using Java 1.7.0_51 and Maven 3.3.3.

 I've prepared a test case that can be built and executed very easily (data
 files are included in the repo):
 https://github.com/aseigneurin/spark-testcase

 One thing to note is that the issue arises when the master is set to
 local[*] but not when set to local. Both options work without problem
 with Spark 1.4, though.

 Any help will be greatly appreciated!

 Many thanks,
 Alexis



Re: Grouping runs of elements in a RDD

2015-06-30 Thread Reynold Xin
Try mapPartitions, which gives you an iterator, and you can produce an
iterator back.
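
A sketch of that approach, assuming a hypothetical user-supplied predicate
startsNewRun that decides where one run ends and the next begins. Note that
runs which straddle partition boundaries get split; repartition or sort first
if that matters:

def groupRuns[T](rdd: org.apache.spark.rdd.RDD[T])
                (startsNewRun: (T, T) => Boolean): org.apache.spark.rdd.RDD[List[T]] = {
  rdd.mapPartitions { iter =>
    val buffered = iter.buffered
    // Produce an iterator of completed runs without materializing the whole partition.
    new Iterator[List[T]] {
      def hasNext: Boolean = buffered.hasNext
      def next(): List[T] = {
        val builder = List.newBuilder[T]
        var prev = buffered.next()
        builder += prev
        while (buffered.hasNext && !startsNewRun(prev, buffered.head)) {
          prev = buffered.next()
          builder += prev
        }
        builder.result()
      }
    }
  }
}

// e.g. start a new run whenever the next element is smaller than the previous one:
// groupRuns(sc.parallelize(Seq(1, 2, 5, 3, 4, 1)))((prev, next) => next < prev).collect()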


On Tue, Jun 30, 2015 at 11:01 AM, RJ Nowling rnowl...@gmail.com wrote:

 Hi all,

 I have a problem where I have a RDD of elements:

 Item1 Item2 Item3 Item4 Item5 Item6 ...

 and I want to run a function over them to decide which runs of elements to
 group together:

 [Item1 Item2] [Item3] [Item4 Item5 Item6] ...

 Technically, I could use aggregate to do this, but I would have to use a
 List of List of T which would produce a very large collection in memory.

 Is there an easy way to accomplish this?  e.g.,, it would be nice to have
 a version of aggregate where the combination function can return a complete
 group that is added to the new RDD and an incomplete group which is passed
 to the next call of the reduce function.

 Thanks,
 RJ



Re: Building scaladoc using build/sbt unidoc failure

2015-06-13 Thread Reynold Xin
Try build/sbt clean first.


On Tue, May 26, 2015 at 4:45 PM, Justin Yip yipjus...@prediction.io wrote:

 Hello,

 I am trying to build scala doc from the 1.4 branch. But it failed due to
 [error] (sql/compile:compile) java.lang.AssertionError: assertion failed:
 List(object package$DebugNode, object package$DebugNode)

 I followed the instruction on github
 https://github.com/apache/spark/tree/branch-1.4/docs and used the
 following command:

 $ build/sbt unidoc

 Please see attachment for detailed error. Did I miss anything?

 Thanks.

 Justin


 *unidoc_error.txt* (30K) Download Attachment
 http://apache-spark-user-list.1001560.n3.nabble.com/attachment/23044/0/unidoc_error.txt

 --
 View this message in context: Building scaladoc using build/sbt unidoc
 failure
 http://apache-spark-user-list.1001560.n3.nabble.com/Building-scaladoc-using-build-sbt-unidoc-failure-tp23044.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



Re: Exception when using CLUSTER BY or ORDER BY

2015-06-12 Thread Reynold Xin
Tom,

Can you file a JIRA and attach a small reproducible test case if possible?


On Tue, May 19, 2015 at 1:50 PM, Thomas Dudziak tom...@gmail.com wrote:

 Under certain circumstances that I haven't yet been able to isolate, I get
 the following error when doing a HQL query using HiveContext (Spark 1.3.1
 on Mesos, fine-grained mode). Is this a known problem or should I file a
 JIRA for it ?


 org.apache.spark.SparkException: Can only zip RDDs with same number of 
 elements in each partition
   at 
 org.apache.spark.rdd.RDD$$anonfun$zip$1$$anon$1.hasNext(RDD.scala:746)
   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
   at 
 org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:56)
   at 
 org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:259)
   at 
 org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:257)
   at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
   at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
   at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
   at org.apache.spark.scheduler.Task.run(Task.scala:64)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)




Re: rdd.sample() methods very slow

2015-05-22 Thread Reynold Xin
You can do something like this:

val myRdd = ...

val rddSampledByPartition = PartitionPruningRDD.create(myRdd, i =>
Random.nextDouble() < 0.1)  // this samples 10% of the partitions

rddSampledByPartition.mapPartitions { iter => iter.take(10) }  // take the
first 10 elements out of each partition



On Thu, May 21, 2015 at 11:36 AM, Sean Owen so...@cloudera.com wrote:

 If sampling whole partitions is sufficient (or a part of a partition),
 sure you could mapPartitionsWithIndex and decide whether to process a
 partition at all based on its # and skip the rest. That's much faster.

 On Thu, May 21, 2015 at 7:07 PM, Wang, Ningjun (LNG-NPV)
 ningjun.w...@lexisnexis.com wrote:
  I don't need to be 100% randome. How about randomly pick a few
 partitions and return all docs in those partitions? Is
  rdd.mapPartitionsWithIndex() the right method to use to just process a
 small portion of partitions?
 
  Ningjun

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: DataFrame Column Alias problem

2015-05-22 Thread Reynold Xin
In 1.4 it actually shows col1 by default.

In 1.3, you can add col1 to the output, i.e.

df.groupBy($"col1").agg($"col1", count($"col1").as("c")).show()


On Thu, May 21, 2015 at 11:22 PM, SLiZn Liu sliznmail...@gmail.com wrote:

 However this returns a single column of c, without showing the original
 col1.
 ​

 On Thu, May 21, 2015 at 11:25 PM Ram Sriharsha sriharsha@gmail.com
 wrote:

 df.groupBy($"col1").agg(count($"col1").as("c")).show

 On Thu, May 21, 2015 at 3:09 AM, SLiZn Liu sliznmail...@gmail.com
 wrote:

 Hi Spark Users Group,

 I’m doing groupby operations on my DataFrame *df* as following, to get
 count for each value of col1:

  df.groupBy("col1").agg("col1" -> "count").show // I don't know if I
  should write like this.
 col1   COUNT(col1#347)
 aaa2
 bbb4
 ccc4
 ...
 and more...

 As I ‘d like to sort by the resulting count, with
 .sort(COUNT(col1#347)), but the column name of the count result
 obviously cannot be retrieved in advance. Intuitively one might consider
 acquire column name by column index in a fashion of R’s DataFrame, except
 Spark doesn’t support. I have Googled *spark agg alias* and so forth,
 and checked DataFrame.as in Spark API, neither helped on this. Am I the
 only one who had ever got stuck on this issue or anything I have missed?

 REGARDS,
 Todd Leo
 ​





Re: Why is RDD to PairRDDFunctions only via implicits?

2015-05-22 Thread Reynold Xin
I'm not sure if it is possible to overload the map function twice, once for
just KV pairs, and another for K and V separately.
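
For reference, a sketch of how the implicit works today: an RDD[(K, V)] picks
up the pair methods (reduceByKey, join, ...) through an implicit conversion to
PairRDDFunctions. In 1.3+ (SPARK-4397) the conversion lives in the RDD
companion object, so no import is needed; before that it came from
SparkContext:

import org.apache.spark.SparkContext._  // only needed on pre-1.3 versions

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val counts = pairs.reduceByKey(_ + _)   // resolves via rddToPairRDDFunctions
counts.collect()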


On Fri, May 22, 2015 at 10:26 AM, Justin Pihony justin.pih...@gmail.com
wrote:

 This ticket https://issues.apache.org/jira/browse/SPARK-4397 improved
 the RDD API, but it could be even more discoverable if made available via
 the API directly. I assume this was originally an omission that now needs
 to be kept for backwards compatibility, but would any of the repo owners be
 open to making this more discoverable to the point of API docs and tab
 completion (while keeping both binary and source compatibility)?


 class PairRDD extends RDD{
   pair methods
 }

 RDD{
  def map[K: ClassTag, V: ClassTag](f: T => (K,V)): PairRDD[K,V]
 }

 As long as the implicits remain, then compatibility remains, but now it is
 explicit in the docs on how to get a PairRDD and in tab completion.

 Thoughts?

 Justin Pihony



Re: Is the AMP lab done next February?

2015-05-11 Thread Reynold Xin
Relaying an answer from AMP director Mike Franklin:

One year into the lab we got a 5 yr Expeditions in Computing Award as part
of the White House Big Data initiative in 2012, so we extended the lab for a
year.   We intend to start winding it down at the end of 2016, while
supporting existing projects and students who will be finishing up.   The
AMPLab faculty are starting discussions this summer about what research
challenges we'd like to tackle next, and how best to organize to do so.

An interesting thing to note is that the Spark project started at about
this point in the AMPLab predecessor project (RADLab) so we have a track
record of being able to make these transitions.


On Sat, May 9, 2015 at 8:43 PM, Justin Pihony justin.pih...@gmail.com
wrote:

 From  my StackOverflow question
 
 https://stackoverflow.com/questions/29593139/is-the-amp-lab-done-next-february
 
 :

 Is there a way to track whether Berkeley's AMP lab will indeed shutdown
 next
 year?

 From their about site:

 The AMPLab is a five-year collaborative effort at UC Berkeley and it
 was
 started in February 2011.

 So, I was curious if this was a hard date, or if it will be extended (or
 has
 already been extended?)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-the-AMP-lab-done-next-February-tp22832.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue

2015-05-11 Thread Reynold Xin
Looks like it is spending a lot of time doing hash probing. It could be one
(or more) of the following:

1. hash probing itself is inherently expensive compared with rest of your
workload

2. murmur3 doesn't work well with this key distribution

3. quadratic probing (triangular sequence) with a power-of-2 hash table
works really badly for this workload.

One way to test this is to instrument the changeValue function to store the
total number of probes, and then log it. We added this probing
capability to the new BytesToBytesMap we built. We should consider
having it reported as a built-in metric to facilitate
debugging.

https://github.com/apache/spark/blob/b83091ae4589feea78b056827bc3b7659d271e41/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java#L214
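
An illustrative sketch of what such instrumentation could look like (this is
not Spark's actual AppendOnlyMap, just a minimal open-addressing map with
triangular-number probing over a power-of-2 table and a probe counter that
could be logged):

class ProbeCountingMap[K, V](capacity: Int = 1 << 16) {
  require((capacity & (capacity - 1)) == 0, "capacity must be a power of 2")
  private val keys = new Array[Any](capacity)
  private val values = new Array[Any](capacity)
  private val mask = capacity - 1
  var totalProbes = 0L   // exposed so callers can log probes per update

  // No growing or null-key handling, for brevity.
  def changeValue(key: K, default: => V)(update: V => V): V = {
    var pos = key.## & mask   // stand-in for the murmur3-based hash in the real code
    var delta = 1
    while (true) {
      totalProbes += 1
      val k = keys(pos)
      if (k == null) {                  // empty slot: insert the default
        keys(pos) = key
        values(pos) = default
        return values(pos).asInstanceOf[V]
      } else if (k == key) {            // existing key: apply the update
        values(pos) = update(values(pos).asInstanceOf[V])
        return values(pos).asInstanceOf[V]
      } else {                          // collision: triangular (quadratic) probing
        pos = (pos + delta) & mask
        delta += 1
      }
    }
    throw new IllegalStateException("unreachable")
  }
}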






On Mon, May 11, 2015 at 4:21 AM, Michal Haris michal.ha...@visualdna.com
wrote:

 This is the stack trace of the worker thread:


 org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)

 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)

 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
 org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)

 org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
 org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
 org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 org.apache.spark.scheduler.Task.run(Task.scala:64)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)

 On 8 May 2015 at 22:12, Josh Rosen rosenvi...@gmail.com wrote:

 Do you have any more specific profiling data that you can share?  I'm
 curious to know where AppendOnlyMap.changeValue is being called from.

 On Fri, May 8, 2015 at 1:26 PM, Michal Haris michal.ha...@visualdna.com
 wrote:

 +dev
 On 6 May 2015 10:45, Michal Haris michal.ha...@visualdna.com wrote:

  Just wanted to check if somebody has seen similar behaviour or knows
 what
  we might be doing wrong. We have a relatively complex spark application
  which processes half a terabyte of data at various stages. We have
 profiled
  it in several ways and everything seems to point to one place where
 90% of
  the time is spent:  AppendOnlyMap.changeValue. The job scales and is
  relatively faster than its map-reduce alternative but it still feels
 slower
  than it should be. I am suspecting too much spill but I haven't seen
 any
  improvement by increasing number of partitions to 10k. Any idea would
 be
  appreciated.
 
  --
  Michal Haris
  Technical Architect
  direct line: +44 (0) 207 749 0229
  www.visualdna.com | t: +44 (0) 207 734 7033,
 





 --
 Michal Haris
 Technical Architect
 direct line: +44 (0) 207 749 0229
 www.visualdna.com | t: +44 (0) 207 734 7033,



Re: [SparkSQL 1.4.0] groupBy columns are always nullable?

2015-05-11 Thread Reynold Xin
Thanks for catching this. I didn't read carefully enough.

It'd make sense to have the udaf result be non-nullable, if the exprs are
indeed non-nullable.

On Mon, May 11, 2015 at 1:32 PM, Olivier Girardot ssab...@gmail.com wrote:

 Hi Haopu,
 actually here `key` is nullable because this is your input's schema :

 scala result.printSchema
 root
 |-- key: string (nullable = true)
 |-- SUM(value): long (nullable = true)

 scala df.printSchema
 root
 |-- key: string (nullable = true)
 |-- value: long (nullable = false)

 I tried it with a schema where the key is not flagged as nullable, and the
 schema is actually respected. What you can argue however is that SUM(value)
 should also be not nullable since value is not nullable.

 @rxin do you think it would be reasonable to flag the Sum aggregation
 function as nullable (or not) depending on the input expression's schema ?

 Regards,

 Olivier.
 On Mon, May 11, 2015 at 22:07, Reynold Xin r...@databricks.com wrote:

 Not by design. Would you be interested in submitting a pull request?

 On Mon, May 11, 2015 at 1:48 AM, Haopu Wang hw...@qilinsoft.com wrote:

 I try to get the result schema of aggregate functions using DataFrame
 API.

 However, I find the result field of groupBy columns are always nullable
 even the source field is not nullable.

 I want to know if this is by design, thank you! Below is the simple code
 to show the issue.

 ==

   import sqlContext.implicits._
   import org.apache.spark.sql.functions._
   case class Test(key: String, value: Long)
   val df = sc.makeRDD(Seq(Test("k1",2),Test("k1",1))).toDF

   val result = df.groupBy("key").agg($"key", sum("value"))

   // From the output, you can see the key column is nullable, why??
   result.printSchema
 //root
 // |-- key: string (nullable = true)
 // |-- SUM(value): long (nullable = true)


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: [SparkSQL 1.4.0] groupBy columns are always nullable?

2015-05-11 Thread Reynold Xin
Not by design. Would you be interested in submitting a pull request?

On Mon, May 11, 2015 at 1:48 AM, Haopu Wang hw...@qilinsoft.com wrote:

 I try to get the result schema of aggregate functions using DataFrame
 API.

 However, I find the result field of groupBy columns are always nullable
 even the source field is not nullable.

 I want to know if this is by design, thank you! Below is the simple code
 to show the issue.

 ==

   import sqlContext.implicits._
   import org.apache.spark.sql.functions._
   case class Test(key: String, value: Long)
   val df = sc.makeRDD(Seq(Test("k1",2),Test("k1",1))).toDF

   val result = df.groupBy("key").agg($"key", sum("value"))

   // From the output, you can see the key column is nullable, why??
   result.printSchema
 //root
 // |-- key: string (nullable = true)
 // |-- SUM(value): long (nullable = true)


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




[ANNOUNCE] Ending Java 6 support in Spark 1.5 (Sep 2015)

2015-05-05 Thread Reynold Xin
Hi all,

We will drop support for Java 6 starting with Spark 1.5, tentatively scheduled to
be released in Sep 2015. Spark 1.4, scheduled to be released in June 2015,
will be the last minor release that supports Java 6. That is to say:

Spark 1.4.x (~ Jun 2015): will work with Java 6, 7, 8.

Spark 1.5+ (~ Sep 2015): will NOT work with Java 6, but work with Java 7, 8.


PS: Oracle ended Java 6 updates in Feb 2013.


Re: How to distribute Spark computation recipes

2015-04-27 Thread Reynold Xin
The code itself is the recipe, no?


On Mon, Apr 27, 2015 at 2:49 AM, Olivier Girardot 
o.girar...@lateral-thoughts.com wrote:

 Hi everyone,
 I know that any RDD is related to its SparkContext and the associated
 variables (broadcast, accumulators), but I'm looking for a way to
 serialize/deserialize full RDD computations ?

 @rxin Spark SQL is, in a way, already doing this but the parsers are
 private[sql], is there any way to reuse this work to get Logical/Physical
 Plans in  out of Spark ?

 Regards,

 Olivier.



Re: how to make a spark cluster ?

2015-04-21 Thread Reynold Xin
Actually if you only have one machine, just use the Spark local mode.

Just download the Spark tarball, untar it, set master to local[N], where N
= number of cores. You are good to go. There is no job tracker or Hadoop
setup involved.
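
A sketch of the whole "setup" for a single machine (the core count here is
illustrative):

val conf = new org.apache.spark.SparkConf()
  .setAppName("local-test")
  .setMaster("local[16]")   // 16 = number of cores to use; local[*] uses them all
val sc = new org.apache.spark.SparkContext(conf)
sc.parallelize(1 to 1000000).map(_ * 2).count()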


On Mon, Apr 20, 2015 at 3:21 PM, haihar nahak harihar1...@gmail.com wrote:

 Thank you :)

 On Mon, Apr 20, 2015 at 4:46 PM, Jörn Franke jornfra...@gmail.com wrote:

 Hi, If you have just one physical machine then I would try out Docker
 instead of a full VM (would be waste of memory and CPU).

 Best regards
  On Apr 20, 2015 00:11, hnahak harihar1...@gmail.com wrote:

 Hi All,

 I've big physical machine with 16 CPUs , 256 GB RAM, 20 TB Hard disk. I
 just
 need to know what should be the best solution to make a spark cluster?

 If I need to process TBs of data then
 1. Only one machine, which contain driver, executor, job tracker and task
 tracker everything.
 2. create 4 VMs and each VM should consist 4 CPUs , 64 GB RAM
 3. create 8 VMs and each VM should consist 2 CPUs , 32 GB RAM each

 please give me your views/suggestions



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/how-to-make-a-spark-cluster-tp22563.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --
 {{{H2N}}}-(@:



Re: Updating a Column in a DataFrame

2015-04-21 Thread Reynold Xin
You can use

df.withColumn("a", df("b"))

to make column "a" have the same value as column "b".


On Mon, Apr 20, 2015 at 3:38 PM, ARose ashley.r...@telarix.com wrote:

 In my Java application, I want to update the values of a Column in a given
 DataFrame. However, I realize DataFrames are immutable, and therefore
 cannot
 be updated by conventional means. Is there a workaround for this sort of
 transformation? If so, can someone provide an example?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Updating-a-Column-in-a-DataFrame-tp22578.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Column renaming after DataFrame.groupBy

2015-04-21 Thread Reynold Xin
You can use the more verbose syntax:

d.groupBy("_1").agg(d("_1"), sum("_1").as("sum_1"), sum("_2").as("sum_2"))

On Tue, Apr 21, 2015 at 1:06 AM, Justin Yip yipjus...@prediction.io wrote:

 Hello,

 I would like rename a column after aggregation. In the following code, the
 column name is SUM(_1#179), is there a way to rename it to a more
 friendly name?

  scala> val d = sqlContext.createDataFrame(Seq((1, 2), (1, 3), (2, 10)))
  scala> d.groupBy("_1").sum().printSchema
 root
  |-- _1: integer (nullable = false)
  |-- SUM(_1#179): long (nullable = true)
  |-- SUM(_2#180): long (nullable = true)

 Thanks.

 Justin

 --
 View this message in context: Column renaming after DataFrame.groupBy
 http://apache-spark-user-list.1001560.n3.nabble.com/Column-renaming-after-DataFrame-groupBy-tp22586.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



Re: Why does the HDFS parquet file generated by Spark SQL have different size with those on Tachyon?

2015-04-17 Thread Reynold Xin
It's because you did a repartition -- which rearranges all the data.

Parquet uses all kinds of compression techniques such as dictionary
encoding and run-length encoding, which would result in the size difference
when the data is ordered differently.
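
A sketch of the effect: sorting on a low-cardinality column before writing
usually makes dictionary/run-length encoding work better than writing in
shuffled order ("someColumn" and the output path are hypothetical; the output
partition count then follows spark.sql.shuffle.partitions):

tfs.sort("someColumn").saveAsParquetFile("/user/zhangxf/adClick-parquet-sorted")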

On Fri, Apr 17, 2015 at 4:51 AM, zhangxiongfei zhangxiongfei0...@163.com
wrote:

 Hi,
 I did some tests on Parquet Files with Spark SQL DataFrame API.
 I generated 36 gzip compressed parquet files by Spark SQL and stored them
 on Tachyon,The size of each file is about  222M.Then read them with below
 code.
 val tfs
 =sqlContext.parquetFile(tachyon://datanode8.bitauto.dmp:19998/apps/tachyon/adClick);
 Next,I just save this DataFrame onto HDFS with below code.It will generate
 36 parquet files too,but the size of each file is about 265M

 tfs.repartition(36).saveAsParquetFile(/user/zhangxf/adClick-parquet-tachyon);
 My question is Why the files on HDFS has different size with those on
 Tachyon even though they come from the same original data?


 Thanks
 Zhang Xiongfei




Re: Expected behavior for DataFrame.unionAll

2015-04-14 Thread Reynold Xin
I think what happened was applying the tightest common type. Type
widening is required, and as a result, the common type of a string
and an int is string.

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala#L144


On Tue, Apr 7, 2015 at 5:00 PM, Justin Yip yipjus...@prediction.io wrote:

 Hello,

 I am experimenting with DataFrame. I tried to construct two DataFrames
 with:
 1. case class A(a: Int, b: String)
 scala adf.printSchema()
 root
  |-- a: integer (nullable = false)
  |-- b: string (nullable = true)

 2. case class B(a: String, c: Int)
 scala bdf.printSchema()
 root
  |-- a: string (nullable = true)
  |-- c: integer (nullable = false)


 Then I unioned these two DataFrames with the unionAll function, and I
 get the following schema. It is kind of a mixture of A and B.

 scala val udf = adf.unionAll(bdf)
 scala udf.printSchema()
 root
  |-- a: string (nullable = false)
  |-- b: string (nullable = true)

 The unionAll documentation says it behaves like the SQL UNION ALL
 function. However, unioning incompatible types is not well defined for SQL.
 Is there any expected behavior for unioning incompatible data frames?

 Thanks.

 Justin



Re: [Spark1.3] UDF registration issue

2015-04-14 Thread Reynold Xin
You can do this:

val strLen = udf((s: String) => s.length())
cleanProcessDF.withColumn("dii", strLen(col("di")))

(You might need to play with the type signature a little bit to get it to
compile)


On Fri, Apr 10, 2015 at 11:30 AM, Yana Kadiyska yana.kadiy...@gmail.com
wrote:

 Hi, I'm running into some trouble trying to register a UDF:

 scala> sqlContext.udf.register("strLen", (s: String) => s.length())
 res22: org.apache.spark.sql.UserDefinedFunction = 
 UserDefinedFunction(<function1>,IntegerType)

 scala> cleanProcessDF.withColumn("dii", strLen(col("di")))
 <console>:33: error: not found: value strLen
   cleanProcessDF.withColumn("dii", strLen(col("di")))

 ​

 Where cleanProcessDF is a dataframe
 Is my syntax wrong? Or am I missing an import of some sort?



Manning looking for a co-author for the GraphX in Action book

2015-04-13 Thread Reynold Xin
Hi all,

Manning (the publisher) is looking for a co-author for the GraphX in Action
book. The book currently has one author (Michael Malak), but they are
looking for a co-author to work closely with Michael and improve the
writings and make it more consumable.

Early access page for the book: http://www.manning.com/malak/

Let me know if you are interested in that. Cheers.


Re: ArrayBuffer within a DataFrame

2015-04-03 Thread Reynold Xin
There is already an explode function on DataFrame btw

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L712

I think something like this would work. You might need to play with the
type.

df.explode("arrayBufferColumn") { x => x }



On Fri, Apr 3, 2015 at 6:43 AM, Denny Lee denny.g@gmail.com wrote:

 Thanks Dean - fun hack :)

 On Fri, Apr 3, 2015 at 6:11 AM Dean Wampler deanwamp...@gmail.com wrote:

 A hack workaround is to use flatMap:

 rdd.flatMap{ case (date, array) => for (x <- array) yield (date, x) }

 For those of you who don't know Scala, the for comprehension iterates
 through the ArrayBuffer, named array and yields new tuples with the date
 and each element. The case expression to the left of the => pattern matches
 on the input tuples.

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Apr 2, 2015 at 10:45 PM, Denny Lee denny.g@gmail.com wrote:

 Thanks Michael - that was it!  I was drawing a blank on this one for
 some reason - much appreciated!


 On Thu, Apr 2, 2015 at 8:27 PM Michael Armbrust mich...@databricks.com
 wrote:

 A lateral view explode using HiveQL.  I'm hopping to add explode
 shorthand directly to the df API in 1.4.

 On Thu, Apr 2, 2015 at 7:10 PM, Denny Lee denny.g@gmail.com
 wrote:

 Quick question - the output of a dataframe is in the format of:

 [2015-04, ArrayBuffer(A, B, C, D)]

 and I'd like to return it as:

 2015-04, A
 2015-04, B
 2015-04, C
 2015-04, D

 What's the best way to do this?

 Thanks in advance!







Re: Can I call aggregate UDF in DataFrame?

2015-04-01 Thread Reynold Xin
You totally can.

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L792

There is also an attempt at adding stddev here already:
https://github.com/apache/spark/pull/5228



On Thu, Mar 26, 2015 at 12:37 AM, Haopu Wang hw...@qilinsoft.com wrote:

 Specifically there are only 5 aggregate functions in class
 org.apache.spark.sql.GroupedData: sum/max/min/mean/count.

 Can I plugin a function to calculate stddev?

 Thank you!


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Build fails on 1.3 Branch

2015-03-29 Thread Reynold Xin
I pushed a hotfix to the branch. Should work now.


On Sun, Mar 29, 2015 at 9:23 AM, Marty Bower sp...@mjhb.com wrote:

 Yes, that worked - thank you very much.



 On Sun, Mar 29, 2015 at 9:05 AM Ted Yu yuzhih...@gmail.com wrote:

 Jenkins build failed too:


 https://amplab.cs.berkeley.edu/jenkins/view/Spark/job/Spark-1.3-Maven-with-YARN/HADOOP_PROFILE=hadoop-2.4,label=centos/326/consoleFull

 For the moment, you can apply the following change:

 diff --git
 a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
 b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpression
 index a53ae97..7ae4b38 100644
 ---
 a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
 +++
 b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
 @@ -18,7 +18,7 @@
  package org.apache.spark.sql

  import org.apache.spark.sql.catalyst.expressions.NamedExpression
 -import org.apache.spark.sql.catalyst.plans.logical.{Project, NoRelation}
 +import org.apache.spark.sql.catalyst.plans.logical.{Project}
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.test.TestSQLContext
  import org.apache.spark.sql.test.TestSQLContext.implicits._

 Cheers

 On Sun, Mar 29, 2015 at 8:48 AM, mjhb sp...@mjhb.com wrote:

 I tried pulling the source and building for the first time, but cannot
 get
 past the object NoRelation is not a member of package
 org.apache.spark.sql.catalyst.plans.logical error below on the 1.3
 branch.
 I can build the 1.2 branch.

 I have tried with both -Dscala-2.11 and 2.10 (after running the
 appropriate
 change-version-to-2.1#.sh), and with different combinations of hadoop
 flags.

 Relevant excerpts below - full build output available at
 http://mjhb.com/tmp/build-spark-1.3.out or
 http://mjhb.com/tmp/build-spark-1.3.out.gz

 $ git branch
 * branch-1.3
 $ build/mvn -e -X -DskipTests clean package
 Apache Maven 3.0.5
 Maven home: /usr/share/maven
 Java version: 1.7.0_76, vendor: Oracle Corporation
 Java home: /usr/lib/jvm/java-7-oracle/jre
 Default locale: en_US, platform encoding: UTF-8
 OS name: linux, version: 3.16.0-33-generic, arch: amd64, family:
 unix

 snipped...

 [error]

 /home/marty/work/spark-1.3-maint/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala:21:
 object NoRelation is not a member of package
 org.apache.spark.sql.catalyst.plans.logical
 [error] import org.apache.spark.sql.catalyst.plans.logical.{Project,
 NoRelation}
 [error]^
 [error] one error found
 [debug] Compilation failed (CompilerInterface)
 [error] Compile failed at Mar 29, 2015 7:52:54 AM [5.793s]




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Build-fails-on-1-3-Branch-tp22275.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: spark disk-to-disk

2015-03-23 Thread Reynold Xin
Maybe implement a very simple function that uses the Hadoop API to read in
based on file names (i.e. parts)?

On Mon, Mar 23, 2015 at 10:55 AM, Koert Kuipers ko...@tresata.com wrote:

 there is a way to reinstate the partitioner, but that requires
 sc.objectFile to read exactly what i wrote, which means sc.objectFile
 should never split files on reading (a feature of hadoop file inputformat
 that gets in the way here).

 On Mon, Mar 23, 2015 at 1:39 PM, Koert Kuipers ko...@tresata.com wrote:

 i just realized the major limitation is that i lose partitioning info...

 On Mon, Mar 23, 2015 at 1:34 AM, Reynold Xin r...@databricks.com wrote:


 On Sun, Mar 22, 2015 at 6:03 PM, Koert Kuipers ko...@tresata.com
 wrote:

 so finally i can resort to:
 rdd.saveAsObjectFile(...)
 sc.objectFile(...)
 but that seems like a rather broken abstraction.


 This seems like a fine solution to me.






Re: spark disk-to-disk

2015-03-22 Thread Reynold Xin
On Sun, Mar 22, 2015 at 6:03 PM, Koert Kuipers ko...@tresata.com wrote:

 so finally i can resort to:
 rdd.saveAsObjectFile(...)
 sc.objectFile(...)
 but that seems like a rather broken abstraction.


This seems like a fine solution to me.


Re: SchemaRDD: SQL Queries vs Language Integrated Queries

2015-03-10 Thread Reynold Xin
They should have the same performance, as they are compiled down to the
same execution plan.

Note that starting in Spark 1.3, SchemaRDD is renamed DataFrame:

https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
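
A small sketch showing the two styles produce the same physical plan (the
Person class and data here are only illustrative):

import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.parallelize(Seq(Person("a", 30), Person("b", 15))).toDF()
people.registerTempTable("people")

sqlContext.sql("SELECT name FROM people WHERE age > 21").explain()
people.where(people("age") > 21).select(people("name")).explain()
// The two printed plans should match.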



On Tue, Mar 10, 2015 at 2:13 PM, Cesar Flores ces...@gmail.com wrote:


 I am new to the SchemaRDD class, and I am trying to decide between using SQL
 queries and Language Integrated Queries (
 https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD
 ).

 Can someone tell me what is the main difference between the two
 approaches, besides using different syntax? Are they interchangeable? Which
 one has better performance?


 Thanks a lot
 --
 Cesar Flores



Help vote for Spark talks at the Hadoop Summit

2015-02-24 Thread Reynold Xin
Hi all,

The Hadoop Summit uses community choice voting to decide which talks to
feature. It would be great if the community could help vote for Spark talks
so that Spark has a good showing at this event. You can make three votes on
each track. Below I've listed 3 talks that are important to Spark's
roadmap. Please give 3 votes to each of the following talks.

Committer Track: Lessons from Running Ultra Large Scale Spark Workloads on
Hadoop
https://hadoopsummit.uservoice.com/forums/283260-committer-track/suggestions/7074016

Data Science track: DataFrames: large-scale data science on Hadoop data
with Spark
https://hadoopsummit.uservoice.com/forums/283261-data-science-and-hadoop/suggestions/7074147

Future of Hadoop track: Online Approximate OLAP in SparkSQL
https://hadoopsummit.uservoice.com/forums/283266-the-future-of-apache-hadoop/suggestions/7074424


Thanks!


Re: Spark 1.3 dataframe documentation

2015-02-24 Thread Reynold Xin
The official documentation will be posted when 1.3 is released (early
March).

Right now, you can build the docs yourself by running "jekyll build" in the
docs directory. Alternatively, just look at dataframe.py as Ted pointed out.


On Tue, Feb 24, 2015 at 6:56 AM, Ted Yu yuzhih...@gmail.com wrote:

 Have you looked at python/pyspark/sql/dataframe.py ?

 Cheers

 On Tue, Feb 24, 2015 at 6:12 AM, poiuytrez guilla...@databerries.com
 wrote:

 Hello,

 I have built Spark 1.3. I can successfully use the dataframe api.
 However, I
 am not able to find its api documentation in Python. Do you know when the
 documentation will be available?

 Best Regards,
 poiuytrez



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-dataframe-documentation-tp21789.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: New guide on how to write a Spark job in Clojure

2015-02-24 Thread Reynold Xin
Thanks for sharing, Chris.

On Tue, Feb 24, 2015 at 4:39 AM, Christian Betz 
christian.b...@performance-media.de wrote:

  Hi all,

  Maybe some of you are interested: I wrote a new guide on how to start
 using Spark from Clojure. The tutorial covers

- setting up a project,
- doing REPL- or Test Driven Development of Spark jobs
- Running Spark jobs locally.

 Just read it on
 https://gorillalabs.github.io/sparkling/articles/tfidf_guide.html.

  Comments (and Pull requests) are very welcome.

  Sincerly

  Chris




Re: How to retreive the value from sql.row by column name

2015-02-16 Thread Reynold Xin
BTW we merged this today: https://github.com/apache/spark/pull/4640

This should allow us in the future to address column by name in a Row.
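
Until then, a workaround sketch for current releases: compute the ordinal from
the schema once, then index into each Row with it ("ran_id" is the field name
from the schema in the thread below):

val ranIdIndex = caper.schema.fieldNames.indexOf("ran_id")
val kv = caper.map(r => (r.getString(ranIdIndex), r))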


On Mon, Feb 16, 2015 at 11:39 AM, Michael Armbrust mich...@databricks.com
wrote:

 I can unpack the code snippet a bit:

 caper.select('ran_id) is the same as saying SELECT ran_id FROM table in
 SQL.  Its always a good idea to explicitly request the columns you need
 right before using them.  That way you are tolerant of any changes to the
 schema that might happen upstream.

 The next part .map { case Row(ranId: String) => ... } is doing an
 extraction to pull out the values of the row into typed variables.  This is
 the same as doing .map(row => row(0).asInstanceOf[String]) or .map(row =>
 row.getString(0)), but I find this syntax easier to read since it lines
 up nicely with the select clause that comes right before it.  It's also
 less verbose especially when pulling out a bunch of columns.

 Regarding the differences between python and java/scala, part of this is
 just due to the nature of these language.  Since java/scala are statically
 typed, you will always have to explicitly say the type of the column you
 are extracting (the bonus here is they are much faster than python due to
 optimizations this strictness allows).  However, since its already a little
 more verbose, we decided not to have the more expensive ability to look up
 columns in a row by name, and instead go with a faster ordinal based API.
 We could revisit this, but its not currently something we are planning to
 change.

 Michael

 On Mon, Feb 16, 2015 at 11:04 AM, Eric Bell e...@ericjbell.com wrote:

  I am just learning scala so I don't actually understand what your code
 snippet is doing but thank you, I will learn more so I can figure it out.

 I am new to all of this and still trying to make the mental shift from
 normal programming to distributed programming, but it seems to me that the
 row object would know its own schema object that it came from and be able
 to ask its schema to transform a name to a column number. Am I missing
 something or is this just a matter of time constraints and this one just
 hasn't gotten into the queue yet?

 Baring that, do the schema classes provide methods for doing this? I've
 looked and didn't see anything.

 I've just discovered that the python implementation for SchemaRDD does in
 fact allow for referencing by name and column. Why is this provided in the
 python implementation but not scala or java implementations?

 Thanks,

 --eric



 On 02/16/2015 10:46 AM, Michael Armbrust wrote:

 For efficiency the row objects don't contain the schema so you can't get
 the column by name directly.  I usually do a select followed by pattern
 matching. Something like the following:

  caper.select('ran_id).map { case Row(ranId: String) => ... }

 On Mon, Feb 16, 2015 at 8:54 AM, Eric Bell e...@ericjbell.com wrote:

 Is it possible to reference a column from a SchemaRDD using the column's
 name instead of its number?

 For example, let's say I've created a SchemaRDD from an avro file:

 val sqlContext = new SQLContext(sc)
 import sqlContext._
 val caper=sqlContext.avroFile(hdfs://localhost:9000/sma/raw_avro/caper)
 caper.registerTempTable(caper)

 scala caper
 res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at
 SchemaRDD.scala:108
 == Query Plan ==
 == Physical Plan ==
 PhysicalRDD
 [ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...
 scala

 Now I want to access fields, and of course the normal thing to do is to
 use a field name, not a field number.

 scala> val kv = caper.map(r => (r.ran_id, r))
 <console>:23: error: value ran_id is not a member of
 org.apache.spark.sql.Row
val kv = caper.map(r => (r.ran_id, r))

 How do I do this?

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Spark ML pipeline

2015-02-11 Thread Reynold Xin
Yes. Next release (Spark 1.3) is coming out end of Feb / early Mar.

On Wed, Feb 11, 2015 at 7:22 AM, Jianguo Li flyingfromch...@gmail.com
wrote:

 Hi,

 I really like the pipeline in the spark.ml in Spark1.2 release. Will
 there be more machine learning algorithms implemented for the pipeline
 framework in the next major release? Any idea when the next major release
 comes out?

 Thanks,

 Jianguo



Re: Which version to use for shuffle service if I'm going to run multiple versions of Spark

2015-02-10 Thread Reynold Xin
I think we made the binary protocol compatible across all versions, so you
should be fine with using any one of them. 1.2.1 is probably the best since
it is the most recent stable release.

On Tue, Feb 10, 2015 at 8:43 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi,

 I need to use branch-1.2 and sometimes master builds of Spark for my
 project. However the officially supported Spark version by our Hadoop admin
 is only 1.2.0.

 So, my question is which version/build of spark-yarn-shuffle.jar should I
 use that works for all four versions? (1.2.0, 1.2.1, 1.2,2, 1.3.0)

 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/



Re: 2GB limit for partitions?

2015-02-03 Thread Reynold Xin
cc dev list


How are you saving the data? There are two relevant 2GB limits:

1. Caching

2. Shuffle


For caching, a partition is turned into a single block.

For shuffle, each map partition is partitioned into R blocks, where R =
number of reduce tasks. It is unlikely a shuffle block > 2G, although it
can still happen.

I think the 2nd problem is easier to fix than the 1st, because we can
handle that in the network transport layer. It'd require us to divide the
transfer of a very large block into multiple smaller blocks.
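
In the meantime, the usual workaround is to use enough partitions that no single
cached partition (or shuffle block) comes anywhere near 2GB. A rough sketch,
where bigRdd stands in for your own RDD and the partition count is only an
illustration (aim for partitions of a few hundred MB each):

  import org.apache.spark.storage.StorageLevel

  // e.g. for a few hundred GB of data, ~2000 partitions keeps each one well under 2GB
  val repartitioned = bigRdd.repartition(2000)
  repartitioned.persist(StorageLevel.MEMORY_AND_DISK_SER)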



On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:

 Michael,

 you are right, there is definitely some limit at 2GB.  Here is a trivial
 example to demonstrate it:

 import org.apache.spark.storage.StorageLevel
 val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new
 Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
 d.count()

 It gives the same error you are observing.  I was under the same
 impression as Sean about the limits only being on blocks, not partitions --
 but clearly that isn't the case here.

 I don't know the whole story yet, but I just wanted to at least let you
 know you aren't crazy :)
 At the very least this suggests that you might need to make smaller
 partitions for now.

 Imran


 On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
 m_albert...@yahoo.com.invalid wrote:

 Greetings!

 Thanks for the response.

 Below is an example of the exception I saw.
 I'd rather not post code at the moment, so I realize it is completely
 unreasonable to ask for a diagnosis.
 However, I will say that adding a partitionBy() was the last change
 before this error was created.


 Thanks for your time and any thoughts you might have.

 Sincerely,
  Mike



 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
 failure: Lost task 4.3 in stage 5.0 (TID 6012,
 ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
 at
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


   --
  *From:* Sean Owen so...@cloudera.com
 *To:* Michael Albert m_albert...@yahoo.com
 *Cc:* user@spark.apache.org user@spark.apache.org
 *Sent:* Monday, February 2, 2015 10:13 PM
 *Subject:* Re: 2GB limit for partitions?

 The limit is on blocks, not partitions. Partitions have many blocks.

 It sounds like you are creating very large values in memory, but I'm
 not sure given your description. You will run into problems if a
 single object is more than 2GB, of course. More of the stack trace
 might show what is mapping that much memory.

 If you simply want data into 1000 files it's a lot simpler. Just
 repartition into 1000 partitions and save the data. If you need more
 control over what goes into which partition, use a Partitioner, yes.



 On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
 m_albert...@yahoo.com.invalid wrote:
  Greetings!
 
  SPARK-1476 says that there is a 2G limit for blocks.
  Is this the same as a 2G limit for partitions (or approximately so?)?
 
 
  What I had been attempting to do is the following.
  1) Start with a moderately large data set (currently about 100GB, but
  growing).
  2) Create about 1,000 files (yes, files) each representing a subset of
 the
  data.
 
  The current attempt I am working on is something like this.
  1) Do a map whose output key indicates which of the 1,000 files it
 will go
  into and whose value is what I will want to stick into the file.
  2) Partition the data and use the body of mapPartition to open a file
 and
  save the data.
 
  My apologies, this is actually embedded in a bigger mess, so I won't
 post
  it.
 
  However, I get errors telling me that there is an
 IllegalArgumentException:
  Size exceeds Integer.MAX_VALUE, with 

Re: How to access OpenHashSet in my standalone program?

2015-01-14 Thread Reynold Xin
The reason is fairly simple actually - we don't want to commit to
maintaining the specific APIs exposed. If we expose OpenHashSet, we will
have to always keep that in Spark and not change the API.
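
If you do copy it out, note that it also leans on a small BitSet class from the
same package, so you will probably need to bring that file along as well. Once
the copies live under your own package (with the private[spark] modifiers
removed; the names below are placeholders for your project), usage looks like
this:

  import com.mycompany.util.OpenHashSet   // your vendored copy

  val set = new OpenHashSet[Long]()
  set.add(42L)
  println(set.contains(42L))   // true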

On Tue, Jan 13, 2015 at 12:39 PM, Tae-Hyuk Ahn ahn@gmail.com wrote:

 Thanks, Josh and Reynold. Yes, I can incorporate it into my package and
 use it. But I am still wondering why you designed such useful
 functions as private.

 On Tue, Jan 13, 2015 at 3:33 PM, Reynold Xin r...@databricks.com wrote:
  It is not meant to be a public API. If you want to use it, maybe copy the
  code out of the package and put it in your own project.
 
  On Fri, Jan 9, 2015 at 7:19 AM, Tae-Hyuk Ahn ahn@gmail.com wrote:
 
  Hi,
 
  I would like to use OpenHashSet
  (org.apache.spark.util.collection.OpenHashSet) in my standalone
 program. I
  can import it without error as:
 
  import org.apache.spark.util.collection.OpenHashSet
 
  However, when I try to access it, I am getting an error as:
 
  object OpenHashSet in package collection cannot be accessed in package
  org.apache.spark.util.collection
 
  I suspect this error is caused by private object. I am wondering how I
 can
  use this object in my standalone program.
 
  Thanks,
 
  Ted
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-OpenHashSet-in-my-standalone-program-tp21065.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 
 



Re: saveAsTextFile just uses toString and Row@37f108

2015-01-13 Thread Reynold Xin
It is just calling RDD's saveAsTextFile. I guess we should really override
the saveAsTextFile in SchemaRDD (or make Row.toString comma separated).

Do you mind filing a JIRA ticket and copy me?
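
In the meantime, a simple workaround is to format each Row yourself before
writing it out. In the Scala API a Row behaves like a Seq of values, so
something like this works (schemaRdd and the output path below are placeholders
for your own query result and destination):

  // render each row as a comma-separated line instead of relying on toString
  schemaRdd.map(_.mkString(",")).saveAsTextFile("hdfs://path/to/output")

For the Java API you can do the equivalent with a map function that builds the
line from row.get(i) for each column.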


On Tue, Jan 13, 2015 at 12:03 AM, Kevin Burton bur...@spinn3r.com wrote:

 This is almost funny.

 I want to dump a computation to the filesystem.  It’s just the result of a
 Spark SQL call reading the data from Cassandra.

 The problem is that it looks like it’s just calling toString() which is
 useless.

 The example is below.

 I assume this is just a (bad) bug.

 org.apache.spark.sql.api.java.Row@37f108

 org.apache.spark.sql.api.java.Row@d0426773

 org.apache.spark.sql.api.java.Row@38c9d3


 --

 Founder/CEO Spinn3r.com
 Location: *San Francisco, CA*
 blog: http://burtonator.wordpress.com
 … or check out my Google+ profile
 https://plus.google.com/102718274791889610666/posts
 http://spinn3r.com




Re: Creating RDD from only few columns of a Parquet file

2015-01-13 Thread Reynold Xin
What query did you run? Parquet should have predicate and column pushdown,
i.e. if your query only needs to read 3 columns, then only 3 will be read.
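
For example, with the SQL interface, a query like the one below should only
touch the three referenced columns on disk (table and column names here are
placeholders):

  parquetFile.registerTempTable("people")
  val slim = sqlContext.sql("SELECT name, age, city FROM people")
  slim.count()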

On Mon, Jan 12, 2015 at 10:20 PM, Ajay Srivastava 
a_k_srivast...@yahoo.com.invalid wrote:

 Hi,
 I am trying to read a parquet file using -

 val parquetFile = sqlContext.parquetFile("people.parquet")

 There is no way to specify that I am interested in reading only some columns 
 from disk. For example, If the parquet file has 10 columns and want to read 
 only 3 columns from disk.

 We have done an experiment -
 Table1 - Parquet file containing 10 columns
 Table2 - Parquet file containing only 3 columns which were used in query

 The time taken by query on table1 and table2 shows huge difference. Query on 
 Table1 takes more than double of time taken on table2 which makes me think 
 that spark is reading all the columns from disk in case of table1 when it 
 needs only 3 columns.

 How should I make sure that it reads only 3 of 10 columns from disk ?


 Regards,
 Ajay




Re: How to access OpenHashSet in my standalone program?

2015-01-13 Thread Reynold Xin
It is not meant to be a public API. If you want to use it, maybe copy the
code out of the package and put it in your own project.

On Fri, Jan 9, 2015 at 7:19 AM, Tae-Hyuk Ahn ahn@gmail.com wrote:

 Hi,

 I would like to use OpenHashSet
 (org.apache.spark.util.collection.OpenHashSet) in my standalone program. I
 can import it without error as:

 import org.apache.spark.util.collection.OpenHashSet

 However, when I try to access it, I am getting an error as:

 object OpenHashSet in package collection cannot be accessed in package
 org.apache.spark.util.collection

 I suspect this error is caused by private object. I am wondering how I can
 use this object in my standalone program.

 Thanks,

 Ted



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-OpenHashSet-in-my-standalone-program-tp21065.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Spark on teradata?

2015-01-08 Thread Reynold Xin
Depending on your use cases. If the use case is to extract small amount of
data out of teradata, then you can use the JdbcRDD and soon a jdbc input
source based on the new Spark SQL external data source API.
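
A rough sketch of what the JdbcRDD route looks like. The connection details,
table and bounds below are placeholders, and the Teradata JDBC driver jar has
to be on the executor classpath:

  import java.sql.{DriverManager, ResultSet}
  import org.apache.spark.rdd.JdbcRDD

  val sales = new JdbcRDD(
    sc,
    () => {
      Class.forName("com.teradata.jdbc.TeraDriver")
      DriverManager.getConnection("jdbc:teradata://td-host/DATABASE=mydb", "user", "pass")
    },
    // the two '?' placeholders are bound to the lower/upper bounds of each partition
    "SELECT id, amount FROM sales WHERE id >= ? AND id <= ?",
    1L, 1000000L, 10,
    (rs: ResultSet) => (rs.getLong(1), rs.getDouble(2)))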



On Wed, Jan 7, 2015 at 7:14 AM, gen tang gen.tan...@gmail.com wrote:

 Hi,

 I have a stupid question:
  Is it possible to use Spark on a Teradata data warehouse, please? I read
  some news on the internet which says yes. However, I didn't find any example
  about this issue.

 Thanks in advance.

 Cheers
 Gen




Re: Confused why I'm losing workers/executors when writing a large file to S3

2014-11-13 Thread Reynold Xin
Darin,

You might want to increase these config options also:

spark.akka.timeout 300
spark.storage.blockManagerSlaveTimeoutMs 30

On Thu, Nov 13, 2014 at 11:31 AM, Darin McBeath ddmcbe...@yahoo.com.invalid
 wrote:

 For one of my Spark jobs, my workers/executors are dying and leaving the
 cluster.

 On the master, I see something like the following in the log file.  I'm
 surprised to see the '60' seconds in the master log below because I
 explicitly set it to '600' (or so I thought) in my spark job (see below).
 This is happening at the end of my job when I'm trying to persist a large
 RDD (probably around 300+GB) back to S3 (in 256 partitions).  My cluster
 consists of 6 r3.8xlarge machines.  The job successfully works when I'm
 outputting 100GB or 200GB.

 If  you have any thoughts/insights, it would be appreciated.

 Thanks.

 Darin.

 Here is where I'm setting the 'timeout' in my spark job.

 SparkConf conf = new SparkConf()
 .setAppName("SparkSync Application")
 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
 .set("spark.rdd.compress","true")
 .set("spark.core.connection.ack.wait.timeout","600");
 ​
 On the master, I see the following in the log file.

 4/11/13 17:20:39 WARN master.Master: Removing
 worker-20141113134801-ip-10-35-184-232.ec2.internal-51877 because we got no
 heartbeat in 60 seconds
 14/11/13 17:20:39 INFO master.Master: Removing worker
 worker-20141113134801-ip-10-35-184-232.ec2.internal-51877 on
 ip-10-35-184-232.ec2.internal:51877
 14/11/13 17:20:39 INFO master.Master: Telling app of lost executor: 2

 On a worker, I see something like the following in the log file.

 14/11/13 17:20:58 WARN util.AkkaUtils: Error sending message in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
   at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
   at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
   at scala.concurrent.Await$.result(package.scala:107)
   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)
   at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:362)
 14/11/13 17:21:11 INFO httpclient.HttpMethodDirector: I/O exception
 (java.net.SocketException) caught when processing request: Broken pipe
 14/11/13 17:21:11 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:32 INFO httpclient.HttpMethodDirector: I/O exception
 (java.net.SocketException) caught when processing request: Broken pipe
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
 (java.io.IOException) caught when processing request: Resetting to invalid
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
 (java.io.IOException) caught when processing request: Resetting to invalid
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
 (java.io.IOException) caught when processing request: Resetting to invalid
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
 (java.io.IOException) caught when processing request: Resetting to invalid
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
 (java.io.IOException) caught when processing request: Resetting to invalid
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
 (java.io.IOException) caught when processing request: Resetting to invalid
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
 (java.io.IOException) caught when processing request: Resetting to invalid
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception
 (java.io.IOException) caught when processing request: Resetting to invalid
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:58 WARN utils.RestUtils: Retried connection 6 times, which
 exceeds the maximum retry count of 5
 14/11/13 17:21:58 WARN utils.RestUtils: Retried connection 6 times, which
 exceeds the maximum retry count of 5
 14/11/13 17:22:57 WARN util.AkkaUtils: Error sending message in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]




Re: Breaking the previous large-scale sort record with Spark

2014-11-05 Thread Reynold Xin
Hi all,

We are excited to announce that the benchmark entry has been reviewed by
the Sort Benchmark committee and Spark has officially won the Daytona
GraySort contest in sorting 100TB of data.

Our entry tied with a UCSD research team building high performance systems
and we jointly set a new world record. This is an important milestone for
the project, as it validates the amount of engineering work put into Spark
by the community.

As Matei said, For an engine to scale from these multi-hour petabyte batch
jobs down to 100-millisecond streaming and interactive queries is quite
uncommon, and it's thanks to all of you folks that we are able to make this
happen.

Updated blog post:
http://databricks.com/blog/2014/11/05/spark-officially-sets-a-new-record-in-large-scale-sorting.html




On Fri, Oct 10, 2014 at 7:54 AM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Hi folks,

 I interrupt your regularly scheduled user / dev list to bring you some
 pretty cool news for the project, which is that we've been able to use
 Spark to break MapReduce's 100 TB and 1 PB sort records, sorting data 3x
 faster on 10x fewer nodes. There's a detailed writeup at
 http://databricks.com/blog/2014/10/10/spark-breaks-previous-large-scale-sort-record.html.
 Summary: while Hadoop MapReduce held last year's 100 TB world record by
 sorting 100 TB in 72 minutes on 2100 nodes, we sorted it in 23 minutes on
 206 nodes; and we also scaled up to sort 1 PB in 234 minutes.

 I want to thank Reynold Xin for leading this effort over the past few
 weeks, along with Parviz Deyhim, Xiangrui Meng, Aaron Davidson and Ali
 Ghodsi. In addition, we'd really like to thank Amazon's EC2 team for
 providing the machines to make this possible. Finally, this result would of
 course not be possible without the many many other contributions, testing
 and feature requests from throughout the community.

 For an engine to scale from these multi-hour petabyte batch jobs down to
 100-millisecond streaming and interactive queries is quite uncommon, and
 it's thanks to all of you folks that we are able to make this happen.

 Matei




Re: OOM with groupBy + saveAsTextFile

2014-11-02 Thread Reynold Xin
None of your tuning will help here because the problem is actually the way
you are saving the output. If you take a look at the stacktrace, it is
trying to build a single string that is too large for the VM to allocate
memory. The VM is actually not running out of memory, but rather, JVM
cannot support a single String so large.

I suspect this is due to the fact that the value in your key, value pair
after group by is too long (maybe it concatenates every single record). Do
you really want to save the key, value output this way using a text file?
Maybe you can write them out as multiple strings rather than a single super
giant string.
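
For example, instead of letting each (key, giant Iterable) pair become one
line, you can flatten the group back out so that every element becomes its own
line. A sketch, where grouped stands in for the RDD produced by your groupBy
and the output path is a placeholder:

  grouped.flatMap { case (key, values) =>
    values.map(v => key + "\t" + v)
  }.saveAsTextFile("hdfs://path/to/output")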




On Sat, Nov 1, 2014 at 9:52 PM, arthur.hk.c...@gmail.com 
arthur.hk.c...@gmail.com wrote:


 Hi,

  FYI as follows. Could you post your heap size settings as well as your Spark
  app code?

 Regards
 Arthur

  3.1.3 Detail Message: Requested array size exceeds VM limit. The detail
  message Requested array size exceeds VM limit indicates that the
 application (or APIs used by that application) attempted to allocate an
 array that is larger than the heap size. For example, if an application
 attempts to allocate an array of 512MB but the maximum heap size is 256MB
 then OutOfMemoryError will be thrown with the reason Requested array size
 exceeds VM limit. In most cases the problem is either a configuration
 issue (heap size too small), or a bug that results in an application
 attempting to create a huge array, for example, when the number of elements
 in the array are computed using an algorithm that computes an incorrect
 size.”




 On 2 Nov, 2014, at 12:25 pm, Bharath Ravi Kumar reachb...@gmail.com
 wrote:

  Resurfacing the thread. OOM shouldn't be the norm for a common groupBy /
  sort use case in a framework that is leading in sorting benchmarks? Or is
 there something fundamentally wrong in the usage?
 On 02-Nov-2014 1:06 am, Bharath Ravi Kumar reachb...@gmail.com wrote:

 Hi,

 I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD
 of count ~ 100 million. The data size is 20GB and groupBy results in an RDD
  of 1061 keys with values being Iterable<Tuple4<String, Integer, Double,
  String>>. The job runs on 3 hosts in a standalone setup with each host's
 executor having 100G RAM and 24 cores dedicated to it. While the groupBy
 stage completes successfully with ~24GB of shuffle write, the
 saveAsTextFile fails after repeated retries with each attempt failing due
 to an out of memory error *[1]*. I understand that a few partitions may
 be overloaded as a result of the groupBy and I've tried the following
 config combinations unsuccessfully:

 1) Repartition the initial rdd (44 input partitions but 1061 keys) across
 1061 paritions and have max cores = 3 so that each key is a logical
 partition (though many partitions will end up on very few hosts), and each
 host likely runs saveAsTextFile on a single key at a time due to max cores
 = 3 with 3 hosts in the cluster. The level of parallelism is unspecified.

 2) Leave max cores unspecified, set the level of parallelism to 72, and
 leave number of partitions unspecified (in which case the # input
 partitions was used, which is 44)
 Since I do not intend to cache RDD's, I have set
 spark.storage.memoryFraction=0.2 in both cases.

 My understanding is that if each host is processing a single logical
 partition to saveAsTextFile and is reading from other hosts to write out
 the RDD, it is unlikely that it would run out of memory. My interpretation
 of the spark tuning guide is that the degree of parallelism has little
 impact in case (1) above since max cores = number of hosts. Can someone
 explain why there are still OOM's with 100G being available? On a related
 note, intuitively (though I haven't read the source), it appears that an
 entire key-value pair needn't fit into memory of a single host for
 saveAsTextFile since a single shuffle read from a remote can be written to
 HDFS before the next remote read is carried out. This way, not all data
 needs to be collected at the same time.

 Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the
 tuning guide and even as per Datastax's spark introduction), there may need
 to be more documentation around the internals of spark to help users take
 better informed tuning decisions with parallelism, max cores, number
 partitions and other tunables. Is there any ongoing effort on that front?

 Thanks,
 Bharath


 *[1]* OOM stack trace and logs
 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID
 1264, proc1.foo.bar.com): java.lang.OutOfMemoryError: Requested array
 size exceeds VM limit
 java.util.Arrays.copyOf(Arrays.java:3326)

 java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)

 java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)

 java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
 

Re: something about rdd.collect

2014-10-14 Thread Reynold Xin
Hi Randy,

collect essentially transfers all the data to the driver node. You definitely 
wouldn’t want to collect 200 million words. It is a pretty large number and you 
can run out of memory on your driver with that much data.
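
If you just want to inspect a few results, or keep the full result, it is
usually better not to bring everything back to the driver. For example (the
output path is a placeholder):

  val counts = documents.flatMap { case words => words.map(w => (w, 1)) }.reduceByKey(_ + _)
  counts.take(20).foreach(println)           // peek at a small sample on the driver
  counts.saveAsTextFile("hdfs://path/out")   // or write the full result out to storage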

-- 
Reynold Xin


On October 14, 2014 at 9:26:13 PM, randylu (randyl...@gmail.com) wrote:

My code is as follows:  
*documents.flatMap { case words => words.map(w => (w, 1)) }.reduceByKey(_ +
_).collect()*
In driver's log, reduceByKey() is finished, but collect() seems to keep
running and never finishes.
In addition, there are about 200,000,000 words that need to be collected. Is
it too large for collect()? But when words decreases to 1,000,000, it's
okay!  
Anyone can explain it? Thanks a lot.  



--  
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/something-about-rdd-collect-tp16451.html
  
Sent from the Apache Spark User List mailing list archive at Nabble.com.  




Re: SQL queries fail in 1.2.0-SNAPSHOT

2014-09-29 Thread Reynold Xin
Hi Daoyuan,

Do you mind applying this patch and look at the exception again?

https://github.com/apache/spark/pull/2580


It has also been merged in master so if you pull from master, you should
have that.


On Mon, Sep 29, 2014 at 1:17 AM, Wang, Daoyuan daoyuan.w...@intel.com
wrote:

  Hi all,



 I had some of my queries run on 1.1.0-SANPSHOT at commit b1b20301(Aug 24),
 but in current master branch, my queries would not work. I looked into the
 stderr file in executor, and find the following lines:



 14/09/26 16:52:46 ERROR nio.NioBlockTransferService: Exception handling
 buffer message

 java.io.IOException: Channel not open for writing - cannot extend file to
 required size

 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:868)

 at
 org.apache.spark.network.FileSegmentManagedBuffer.nioByteBuffer(ManagedBuffer.scala:73)

 at
 org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:203)

 at org.apache.spark.network.nio.NioBlockTransferService.org
 $apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:179)

 at
 org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:149)

 at
 org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:149)

 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

 at scala.collection.Iterator$class.foreach(Iterator.scala:727)

 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

 at
 scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

 at
 org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)

 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

 at
 org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)

 at org.apache.spark.network.nio.NioBlockTransferService.org
 $apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:149)

 at
 org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:68)

 at
 org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:68)

 at org.apache.spark.network.nio.ConnectionManager.org
 $apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:677)

 at
 org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:515)

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

 at java.lang.Thread.run(Thread.java:745)



  Shuffle compress was turned off, because I encountered parsing_error with
  shuffle compress enabled. Even after I set the native library path, I got
  errors when uncompressing in snappy. With shuffle compress turned off, I
  still get the message above on some of my nodes, and the others have a
  message saying the ack was not received after 60s. Does anyone have some
  ideas? Thanks for your help!



 Thanks,

 Daoyuan Wang



Spark meetup on Oct 15 in NYC

2014-09-28 Thread Reynold Xin
Hi Spark users and developers,

Some of the most active Spark developers (including Matei Zaharia, Michael
Armbrust, Joseph Bradley, TD, Paco Nathan, and me) will be in NYC for
Strata NYC. We are working with the Spark NYC meetup group and Bloomberg to
host a meetup event. This might be the event with the highest committer to
user ratio in the history of user meetups. Look forward to meeting more
users in NYC.

You can sign up for that here:
http://www.meetup.com/Spark-NYC/events/209271842/

Cheers.


Re: driver memory management

2014-09-28 Thread Reynold Xin
The storage fraction only limits the amount of memory used for storage. It
doesn't actually limit anything else, i.e. you can use all the memory you want
in collect.

On Sunday, September 28, 2014, Brad Miller bmill...@eecs.berkeley.edu
wrote:

 Hi All,

 I am interested to collect() a large RDD so that I can run a learning
 algorithm on it.  I've noticed that when I don't increase
 SPARK_DRIVER_MEMORY I can run out of memory. I've also noticed that it
 looks like the same fraction of memory is reserved for storage on the
 driver as on the worker nodes, and that the web UI doesn't show any storage
 usage on the driver.  Since that memory is reserved for storage, it seems
 possible that it is not being used towards the collection of my RDD.

 Is there a way to configure the memory management (
 spark.storage.memoryFraction, spark.shuffle.memoryFraction) for the
 driver separately from the workers?

 Is there any reason to leave space for shuffle or storage on the driver?
 It seems like I never see either of these used on the web UI, although I
 may not be interpreting the UI correctly or my jobs may not trigger the use
 case.

 For context, I am using PySpark (so much of my processing happens outside
 of the allocated memory in Java) and running the Spark 1.1.0 release
 binaries.

 best,
 -Brad



Re: collect on hadoopFile RDD returns wrong results

2014-09-18 Thread Reynold Xin
This is due to the HadoopRDD (and also the underlying Hadoop InputFormat)
reusing objects to avoid allocation. It is sort of tricky to fix. However, in
most cases you can clone the records to make sure you are not collecting
the same object over and over again.
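
For example, when reading with hadoopFile you can materialize the reused
Writables into plain Scala values before collecting. A sketch for a text input
(the path is a placeholder):

  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapred.TextInputFormat

  val raw = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://path/in")
  // .get and .toString copy the data out of the reused Writable objects
  val safe = raw.map { case (offset, line) => (offset.get, line.toString) }
  safe.collect()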

https://issues.apache.org/jira/browse/SPARK-1018

http://mail-archives.apache.org/mod_mbox/spark-user/201308.mbox/%3ccaf_kkpzrq4otyqvwcoc6plaz9x9_sfo33u4ysatki5ptqoy...@mail.gmail.com%3E


On Thu, Sep 18, 2014 at 12:43 AM, vasiliy zadonsk...@gmail.com wrote:

 I posted an example in the previous post. Tested on Spark 1.0.2, 1.2.0-SNAPSHOT
 and 1.1.0 for Hadoop 2.4.0 on Windows and Linux servers with Hortonworks
 Hadoop 2.4 in local[4] mode. Any ideas about this Spark behavior?


 Akhil Das-2 wrote
  Can you dump out a small piece of data? while doing rdd.collect and
  rdd.foreach(println)
 
  Thanks
  Best Regards
 
  On Wed, Sep 17, 2014 at 12:26 PM, vasiliy zadonskiyd@ wrote:
 
  it also appears in streaming hdfs fileStream
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/collect-on-hadoopFile-RDD-returns-wrong-results-tp14368p14425.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 

 
 





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/collect-on-hadoopFile-RDD-returns-wrong-results-tp14368p14527.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Comparative study

2014-07-08 Thread Reynold Xin
Not sure exactly what is happening but perhaps there are ways to
restructure your program for it to work better. Spark is definitely able to
handle much, much larger workloads.

I've personally run a workload that shuffled 300 TB of data. I've also run
something that shuffled 5TB/node and stuffed my disks so full that the file
system was close to breaking.

We can definitely do a better job in Spark to make it output more meaningful
diagnostics and to be more robust with partitions of data that don't fit in
memory, though. A lot of the work in the next few releases will be on that.



On Tue, Jul 8, 2014 at 10:04 AM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 I'll respond for Dan.

 Our test dataset was a total of 10 GB of input data (full production
 dataset for this particular dataflow would be 60 GB roughly).

 I'm not sure what the size of the final output data was but I think it was
 on the order of 20 GBs for the given 10 GB of input data. Also, I can say
 that when we were experimenting with persist(DISK_ONLY), the size of all
 RDDs on disk was around 200 GB, which gives a sense of overall transient
 memory usage with no persistence.

 In terms of our test cluster, we had 15 nodes. Each node had 24 cores and
 2 workers each. Each executor got 14 GB of memory.

 -Suren



 On Tue, Jul 8, 2014 at 12:06 PM, Kevin Markey kevin.mar...@oracle.com
 wrote:

  When you say large data sets, how large?
 Thanks


 On 07/07/2014 01:39 PM, Daniel Siegmann wrote:

  From a development perspective, I vastly prefer Spark to MapReduce. The
 MapReduce API is very constrained; Spark's API feels much more natural to
 me. Testing and local development is also very easy - creating a local
 Spark context is trivial and it reads local files. For your unit tests you
 can just have them create a local context and execute your flow with some
 test data. Even better, you can do ad-hoc work in the Spark shell and if
 you want that in your production code it will look exactly the same.

  Unfortunately, the picture isn't so rosy when it gets to production. In
 my experience, Spark simply doesn't scale to the volumes that MapReduce
 will handle. Not with a Standalone cluster anyway - maybe Mesos or YARN
 would be better, but I haven't had the opportunity to try them. I find jobs
 tend to just hang forever for no apparent reason on large data sets (but
 smaller than what I push through MapReduce).

  I am hopeful the situation will improve - Spark is developing quickly -
 but if you have large amounts of data you should proceed with caution.

  Keep in mind there are some frameworks for Hadoop which can hide the
 ugly MapReduce with something very similar in form to Spark's API; e.g.
 Apache Crunch. So you might consider those as well.

  (Note: the above is with Spark 1.0.0.)



 On Mon, Jul 7, 2014 at 11:07 AM, santosh.viswanat...@accenture.com
 wrote:

  Hello Experts,



 I am doing some comparative study on the below:



 Spark vs Impala

 Spark vs MapReduce. Is it worth migrating from an existing MR
 implementation to Spark?





 Please share your thoughts and expertise.





 Thanks,
 Santosh

 --

 This message is for the designated recipient only and may contain
 privileged, proprietary, or otherwise confidential information. If you have
 received it in error, please notify the sender immediately and delete the
 original. Any other use of the e-mail by you is prohibited. Where allowed
 by local law, electronic communications with Accenture and its affiliates,
 including e-mail and instant messaging (including content), may be scanned
 by our systems for the purposes of information security and assessment of
 internal compliance with Accenture policy.

 __

 www.accenture.com




 --
  Daniel Siegmann, Software Developer
 Velos
  Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
 E: daniel.siegm...@velos.io W: www.velos.io





 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io




Re: Powered By Spark: Can you please add our org?

2014-07-08 Thread Reynold Xin
I added you to the list. Cheers.



On Mon, Jul 7, 2014 at 6:19 PM, Alex Gaudio adgau...@gmail.com wrote:

 Hi,

 Sailthru is also using Spark.  Could you please add us to the Powered By
 Spark https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark 
 page
 when you have a chance?

 Organization Name: Sailthru
 URL: www.sailthru.com
 Short Description: Our data science platform uses Spark to build
 predictive models and recommendation systems for marketing automation and
 personalization


 Thank you!
 Alex



openstack swift integration with Spark

2014-06-13 Thread Reynold Xin
If you are interested in openstack/swift integration with Spark, please
drop me a line. We are looking into improving the integration.

Thanks.


<    1   2