Re: Linkage error - duplicate class definition

2015-01-20 Thread Hafiz Mujadid
Have you solved this problem?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Linkage-error-duplicate-class-definition-tp9482p21260.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



IF statement doesn't work in Spark-SQL?

2015-01-20 Thread Xuelin Cao
Hi,

  I'm trying to migrate some Hive scripts to Spark SQL. However, I
found that some statements are incompatible with Spark SQL.

  Here is my SQL. The same SQL works fine in the Hive environment.

SELECT
  if(ad_user_id>1000, 1000, ad_user_id) as user_id
FROM
  ad_search_keywords

 What I found is that the parser reports an error on the "if" statement:

No function to evaluate expression. type: AttributeReference, tree:
ad_user_id#4


 Does anyone have any idea about this?


RE: IF statement doesn't work in Spark-SQL?

2015-01-20 Thread Wang, Daoyuan
Hi Xuelin,

What version of Spark are you using?

Thanks,
Daoyuan





Re: Does Spark automatically run different stages concurrently when possible?

2015-01-20 Thread Sean Owen
You can persist the RDD in (2) right after it is created. It will not
cause it to be persisted immediately, but rather the first time it is
materialized. If you persist after (3) is calculated, then it will be
re-calculated (and persisted) after (4) is calculated.
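
A minimal sketch of the advice above, not taken from the thread: the in-memory input, the "A,"/"B," prefixes and the name PersistSketch are hypothetical stand-ins for the OP's dataset. It marks the filtered RDD for persistence right after it is defined, then submits the two actions from separate driver threads, since each action blocks the thread that calls it.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PersistSketch {
  // Returns (number of A rows, number of B rows).
  def countAandB(sc: SparkContext): (Long, Long) = {
    // (1) Hypothetical in-memory stand-in for sc.textFile(...)
    val lines = sc.parallelize(Seq("A,1", "B,2", "C,3", "A,4"))

    // (2) Keep only A and B rows. persist() only marks the RDD here;
    // nothing is computed or cached until an action materializes it.
    val abOnly = lines
      .filter(l => l.startsWith("A,") || l.startsWith("B,"))
      .persist(StorageLevel.MEMORY_ONLY)

    // (3) and (4) Two actions submitted from separate driver threads.
    val countA = Future { abOnly.filter(_.startsWith("A,")).count() }
    val countB = Future { abOnly.filter(_.startsWith("B,")).count() }

    val result = (Await.result(countA, 1.minute), Await.result(countB, 1.minute))
    abOnly.unpersist()
    result
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("persist-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(countAandB(sc)) finally sc.stop()
  }
}
```

Running the two actions one after the other instead of in Futures would give the same counts; the persisted RDD would simply be filled by the first action and reused by the second.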

On Tue, Jan 20, 2015 at 3:38 AM, Ashish  wrote:
> Sean,
>
> A related question. When to persist the RDD after step 2 or after Step
> 3 (nothing would happen before step 3 I assume)?
>
> On Mon, Jan 19, 2015 at 5:17 PM, Sean Owen  wrote:
>> From the OP:
>>
>> (1) val lines = Import full dataset using sc.textFile
>> (2) val ABonly = Filter out all rows from "lines" that are not of type A or B
>> (3) val processA = Process only the A rows from ABonly
>> (4) val processB = Process only the B rows from ABonly
>>
>> I assume that 3 and 4 are actions, or else nothing happens here at all.
>>
>> When 3 is invoked, it will compute 1, then 2, then 3. 4 will happen
>> after 3, and may even cause 1 and 2 to happen again if nothing is
>> persisted.
>>
>> You can invoke 3 and 4 in parallel on the driver if you like. That's
>> fine. But actions are blocking in the driver.
>>
>>
>>
>> On Mon, Jan 19, 2015 at 8:21 AM, davidkl  wrote:
>>> Hi Jon, I am looking for an answer for a similar question in the doc now, so
>>> far no clue.
>>>
>>> I would need to know what is spark behaviour in a situation like the example
>>> you provided, but taking into account also that there are multiple
>>> partitions/workers.
>>>
>>> I could imagine it's possible that different spark workers are not
>>> synchronized in terms of waiting for each other to progress to the next
>>> step/stage for the partitions of data they get assigned, while I believe in
>>> streaming they would wait for the current batch to complete before they
>>> start working on a new one.
>>>
>>> In the code I am working on, I need to make sure a particular step is
>>> completed (in all workers, for all partitions) before next transformation is
>>> applied.
>>>
>>> Would be great if someone could clarify or point to these issues in the doc!
>>> :-)
>
>
>
> --
> thanks
> ashish
>
> Blog: http://www.ashishpaliwal.com/blog
> My Photo Galleries: http://www.pbase.com/ashishpaliwal




Re: IF statement doesn't work in Spark-SQL?

2015-01-20 Thread Xuelin Cao
Hi, I'm using Spark 1.2




Re: Does Spark automatically run different stages concurrently when possible?

2015-01-20 Thread Ashish
Thanks Sean !




-- 
thanks
ashish

Blog: http://www.ashishpaliwal.com/blog
My Photo Galleries: http://www.pbase.com/ashishpaliwal




Spark on YARN: java.lang.ClassCastException SerializedLambda to org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1

2015-01-20 Thread thanhtien522
Hi, I am trying to run Spark on a YARN cluster by setting the master to
yarn-client in Java code. It works fine with a count task but does not work
with other commands. It throws a ClassCastException:

java.lang.ClassCastException: cannot assign instance of
java.lang.invoke.SerializedLambda to field
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.fun$2 of
type org.apache.spark.api.java.function.Function2 in instance of
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1999)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:60)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I'm using Hadoop 2.5.1 and the Spark 1.2.0 assembly for Hadoop 2.4.0,
and I submit the job locally.

Any idea about this issue?

Thanks!
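
One commonly reported cause of this SerializedLambda ClassCastException is that the executors cannot load the application classes that define the lambdas. That would also fit count() working while operations that take user functions fail. This diagnosis is an assumption and is not confirmed anywhere in this thread. A minimal sketch of shipping the application jar when the master is set in code follows. It is written in Scala, but SparkConf offers a setJars call to Java as well. The jar path and the name YarnClientConf are hypothetical.

```scala
import org.apache.spark.SparkConf

object YarnClientConf {
  // Builds a configuration that lists the application jar so it is
  // distributed to the executors (stored under the "spark.jars" key).
  def build(appJar: String): SparkConf =
    new SparkConf()
      .setAppName("yarn-client-sketch")
      .setMaster("yarn-client")
      .setJars(Seq(appJar)) // hypothetical path to the jar containing the lambdas
}
```

Launching the same jar through spark-submit is another way to get the application classes onto the executors without calling setJars in code.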



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-YARN-java-lang-ClassCastException-SerializedLambda-to-org-apache-spark-api-java-function-Fu1-tp21261.html



spark streaming kinesis issue

2015-01-20 Thread Hafiz Mujadid
Hi experts!

I am using Spark Streaming with Kinesis and get this exception while
running the program:

 java.lang.LinkageError: loader (instance of
org/apache/spark/executor/ChildExecutorURLClassLoader$userClassLoader$):
attempted  duplicate class definition for name:
"com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitialPositionInStream"
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)



Is there any solution to this problem?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kinesis-issue-tp21262.html



Re: IF statement doesn't work in Spark-SQL?

2015-01-20 Thread DEVAN M.S.
Which context are you using, HiveContext or SQLContext? Can you try
with HiveContext?


Devan M.S. | Research Associate | Cyber Security | AMRITA VISHWA
VIDYAPEETHAM | Amritapuri | Cell +919946535290 |




spark streaming with checkpoint

2015-01-20 Thread balu.naren
I am a beginner with Spark Streaming, so I have a basic question about
checkpoints. My use case is to calculate the number of unique users per day.
I am using reduce by key and window for this, where the window duration is 24
hours and the slide duration is 5 minutes. I write the processed record to
MongoDB, currently replacing the existing record each time. But I see that
memory usage slowly increases over time and the process is killed after about
1.5 hours (on an AWS small instance). The DB write after the restart clears
all the old data. So I understand that checkpointing is the solution for this.
My questions are:

 What should my checkpoint duration be? The documentation says 5-10 times the
slide duration. But I need the data for the entire day, so is it OK to keep it
at 24 hours?
 Where should the checkpoint ideally be: when I first receive the stream, just
before the window operation, or after the data reduction has taken place?

Appreciate your help.
Thank you
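
A hedged sketch of one way to set this up, not a confirmed answer from the list: it counts distinct users over a 24-hour window with the incremental (inverse-function) form of reduceByKeyAndWindow and checkpoints the windowed stream every 25 minutes, which is 5 times the slide duration. As far as I understand, the checkpoint interval only controls how often state is written out, while the window duration controls how much data is retained, so the two do not need to match. The socket source, host, port, batch interval and the name UniqueUsersSketch are all hypothetical.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair DStream implicits on Spark 1.2
import org.apache.spark.streaming.dstream.DStream

object UniqueUsersSketch {
  val window: Duration = Minutes(24 * 60) // keep one day of data
  val slide: Duration = Minutes(5)        // emit a result every 5 minutes
  val checkpointEvery: Duration = slide * 5

  // Number of distinct user ids seen within the window.
  def uniqueUsers(userIds: DStream[String]): DStream[Long] = {
    val perUser = userIds
      .map(id => (id, 1L))
      .reduceByKeyAndWindow(
        (a: Long, b: Long) => a + b,          // add counts entering the window
        (a: Long, b: Long) => a - b,          // subtract counts leaving the window
        window,
        slide,
        userIds.context.sparkContext.defaultParallelism,
        (kv: (String, Long)) => kv._2 > 0)    // drop users with no events left
    perUser.checkpoint(checkpointEvery)       // checkpoint the windowed state
    perUser.count()
  }

  def main(args: Array[String]): Unit = {
    val checkpointDir = args(0) // e.g. an HDFS or S3 path
    val conf = new SparkConf().setAppName("unique-users-sketch")
    val ssc = new StreamingContext(conf, Seconds(60))
    ssc.checkpoint(checkpointDir) // required by the inverse-function window
    val userIds = ssc.socketTextStream("localhost", 9999) // hypothetical source
    uniqueUsers(userIds).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```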



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-with-checkpoint-tp21263.html

Re: IF statement doesn't work in Spark-SQL?

2015-01-20 Thread DEVAN M.S.
Add one more library:

libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.2.0"


val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

Replace sqlContext with hiveContext. It works for me when using HiveContext.





Re: IF statement doesn't work in Spark-SQL?

2015-01-20 Thread Xuelin Cao
Hi,

 Yes, this is what I'm doing. I'm using hiveContext.hql() to run my
query.

  But, the problem still happens.





Scala Spark SQL row object Ordinal Method Call Aliasing

2015-01-20 Thread Night Wolf
In Spark SQL we have Row objects which contain a list of fields that make
up a row. A Row has ordinal accessors such as .getInt(0) or .getString(2).

Say ordinal 0 = ID and ordinal 1 = Name. It becomes hard to remember which
ordinal is which, making the code confusing.

Say, for example, I have the following code:

def doStuff(row: Row) = {
  // extract some items from the row into a tuple
  (row.getInt(0), row.getString(1)) // tuple of (ID, Name)
}

The question becomes: how could I create aliases for these fields in a Row
object?

I was thinking I could create methods which take an implicit Row object:

def id(implicit row: Row) = row.getInt(0)
def name(implicit row: Row) = row.getString(1)

I could then rewrite the above as:

def doStuff(implicit row: Row) = {
  // extract some items from the row into a tuple
  (id, name) // tuple of (ID, Name)
}

Is there a better/neater approach?


Cheers,

~NW
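
Two possible alternatives, sketched under the assumption that ordinal 0 is an Int ID and ordinal 1 is a String Name as in the question. The first wraps Row in an implicit class so the aliases read like fields; the second destructures the row with a pattern match. The names RowAliases and PersonRow are hypothetical.

```scala
import org.apache.spark.sql.Row

object RowAliases {
  // Hypothetical wrapper that gives names to ordinals 0 (ID) and 1 (Name).
  implicit class PersonRow(val row: Row) extends AnyVal {
    def id: Int = row.getInt(0)
    def name: String = row.getString(1)
  }

  // Uses the aliases; no implicit parameter has to be threaded through.
  def doStuff(row: Row): (Int, String) = (row.id, row.name)

  // Alternative: bind the fields by position once, then use plain names.
  def doStuffByPattern(row: Row): (Int, String) = row match {
    case Row(id: Int, name: String) => (id, name)
  }
}
```

The pattern-match version throws a MatchError if a row has a different shape or a null in either position, so the wrapper may be the safer choice for sparse data.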


Can multiple streaming apps use the same checkpoint directory?

2015-01-20 Thread Ashic Mahtab
Hi,
For client-mode spark-submit of applications, we can do the following:

def createStreamingContext() = {
...
 val sc = new SparkContext(conf)
 // Create a StreamingContext with a 1 second batch size
 val ssc = new StreamingContext(sc, Seconds(1))
 ssc // return the context so that getOrCreate can use it
}
...
val ssc = StreamingContext.getOrCreate(checkPointdir, createStreamingContext _)

If the driver goes down and we restart it, it'll pick up where it left off. My
question is: if multiple streaming apps are submitted from the same machine,
can they share the same checkpoint directory, or does each have to have its own?

Thanks,
Ashic.
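
As far as I know, StreamingContext.getOrCreate rebuilds the context from whatever checkpoint data it finds in the given directory, so the cautious assumption is one directory per application. That is my reading, not something confirmed in this thread. A minimal sketch that derives a separate sub-directory from the application name follows. The directory layout and the name CheckpointPerApp are hypothetical.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointPerApp {
  // Hypothetical layout: <baseDir>/<appName>, one sub-directory per application.
  def checkpointDirFor(baseDir: String, appName: String): String =
    s"${baseDir.stripSuffix("/")}/$appName"

  def getOrCreateContext(baseDir: String, appName: String): StreamingContext = {
    val dir = checkpointDirFor(baseDir, appName)

    // Called only when no checkpoint exists yet in this application's directory.
    def create(): StreamingContext = {
      val conf = new SparkConf().setAppName(appName)
      val ssc = new StreamingContext(conf, Seconds(1))
      ssc.checkpoint(dir)
      // ... define the DStream operations here ...
      ssc
    }

    StreamingContext.getOrCreate(dir, create _)
  }
}
```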
  

Re: IF statement doesn't work in Spark-SQL?

2015-01-20 Thread DEVAN M.S.
Can you share your code ?




RE: Does Spark automatically run different stages concurrently when possible?

2015-01-20 Thread Bob Tiernay
I found the following to be a good discussion of the same topic:
http://apache-spark-user-list.1001560.n3.nabble.com/The-concurrent-model-of-spark-job-stage-task-td13083.html
 


  

Re: Issues with constants in Spark HiveQL queries

2015-01-20 Thread yana
I run Spark 1.2 and do not have this issue. I don't believe the Hive version
would matter (I run Spark 1.2 with the Hive12 profile), but that would be a good test.
The last version I tried for you was a CDH4.2 Spark 1.2 prebuilt, without
pointing to an external Hive install (in fact, I tried it on a machine with no
other Hadoop/Hive jars). So download, unzip and run the Spark shell. Personally, I don't
believe it's a bug. When you say typo, do you mean there was indeed a Plus
token in your string? If you remove that token, what stack trace do you get?



Original message
From: Pala M Muthaia
Date: 01/19/2015 8:26 PM (GMT-05:00)
To: Yana Kadiyska
Cc: "Cheng, Hao", user@spark.apache.org
Subject: Re: Issues with constants in Spark HiveQL queries

Yes we tried the master branch (sometime last week) and there was no 
issue, but the above repro is for branch 1.2 and Hive 0.13. Isn't that the 
final release branch for Spark 1.2? 

If so, a patch needs to be created or back-ported from master?

(Yes the obvious typo in the column name was introduced in this email only, so 
is irrelevant to the error).

On Wed, Jan 14, 2015 at 5:52 PM, Yana Kadiyska  wrote:
yeah, that makes sense. Pala, are you on a prebuild version of Spark -- I just 
tried the CDH4 prebuilt...Here is what I get for the = token:



The literal type shows as 290, not 291, and 290 is numeric. According to this 
http://grepcode.com/file/repo1.maven.org/maven2/org.apache.hive/hive-exec/0.13.1/org/apache/hadoop/hive/ql/parse/HiveParser.java#HiveParser
 291 is token PLUS which is really weird...


On Wed, Jan 14, 2015 at 7:47 PM, Cheng, Hao  wrote:
The log showed it failed in parsing, so the typo stuff shouldn’t be the root 
cause. BUT I couldn’t reproduce that with master branch.

 

I did the test as follow:

 

sbt/sbt -Phadoop-2.3.0 -Phadoop-2.3 -Phive -Phive-0.13.1 hive/console

scala> sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")



sbt/sbt -Phadoop-2.3.0 -Phadoop-2.3 -Phive -Phive-0.12.0 hive/console

scala> sql("SELECT user_id FROM actions where conversion_aciton_id=20141210")

 

 

From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com] 
Sent: Wednesday, January 14, 2015 11:12 PM
To: Pala M Muthaia
Cc: user@spark.apache.org
Subject: Re: Issues with constants in Spark HiveQL queries

 

Just a guess but what is the type of conversion_aciton_id? I do queries over an 
epoch all the time with no issues(where epoch's type is bigint). You can see 
the source here 
https://github.com/apache/spark/blob/v1.2.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
 -- not sure what ASTNode type: 291 but it sounds like it's not considered 
numeric? If it's a string it should be conversion_aciton_id='20141210' (single 
quotes around the string)

 

On Tue, Jan 13, 2015 at 5:25 PM, Pala M Muthaia  
wrote:

Hi,

 

We are testing Spark SQL-Hive QL, on Spark 1.2.0. We have run some simple 
queries successfully, but we hit the following issue whenever we attempt to use 
a constant in the query predicate.

 

It seems like an issue with parsing constant.

 

Query: SELECT user_id FROM actions where conversion_aciton_id=20141210

 

Error:

scala.NotImplementedError: No parse rules for ASTNode type: 291, text: 20141210 
:

20141210

 

Any ideas? This seems very basic, so we may be missing something basic, but i 
haven't figured out what it is.

 

---

 

Full shell output below:

 

scala> sqlContext.sql("SELECT user_id FROM actions where 
conversion_aciton_id=20141210")

15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM 
actions where conversion_aciton_id=20141210

15/01/13 16:55:54 INFO ParseDriver: Parse Completed

15/01/13 16:55:54 INFO ParseDriver: Parsing command: SELECT user_id FROM 
actions where conversion_aciton_id=20141210

15/01/13 16:55:54 INFO ParseDriver: Parse Completed

java.lang.RuntimeException:

Unsupported language features in query: SELECT user_id FROM actions where 
conversion_aciton_id=20141210

TOK_QUERY

  TOK_FROM

TOK_TABREF

  TOK_TABNAME

actions

  TOK_INSERT

TOK_DESTINATION

  TOK_DIR

TOK_TMP_FILE

TOK_SELECT

  TOK_SELEXPR

TOK_TABLE_OR_COL

  user_id

TOK_WHERE

  =

TOK_TABLE_OR_COL

  conversion_aciton_id

20141210

 

scala.NotImplementedError: No parse rules for ASTNode type: 291, text: 20141210 
:

20141210

" +

 

org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1110)

 

at scala.sys.package$.error(package.scala:27)

at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:251)

at 
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50)

at 
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49)

at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)

at scala.util.parsing.combinator.Parsers

Saving a mllib model in Spark SQL

2015-01-20 Thread Divyansh Jain
Hey people,

I have run into some issues regarding saving the k-means mllib model in
Spark SQL by converting to a schema RDD. This is what I am doing:

case class Model(id: String, model:
org.apache.spark.mllib.clustering.KMeansModel)
import sqlContext.createSchemaRDD
val rowRdd = sc.makeRDD(Seq("id", model)).map(p => Model("id", model))

This is the error that I get :

scala.MatchError: org.apache.spark.mllib.classification.ClassificationModel
(of class scala.reflect.internal.Types$TypeRef$$anon$6)
  at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:53)
  at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:64)
  at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:62)
  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:62)
  at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:50)
  at
org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:44)
  at
org.apache.spark.sql.execution.ExistingRdd$.fromProductRdd(basicOperators.scala:229)
  at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:94)

Any help would be appreciated. Thanks!







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Saving-a-mllib-model-in-Spark-SQL-tp21264.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark Streaming checkpoint recovery causes IO re-execution

2015-01-20 Thread RodrigoB
Hi Hannes,

Good to know I'm not alone in this boat. Sorry about not posting back; I
haven't visited the user list in a while.

It's on my agenda to get past this issue, which will be very important for our
recovery implementation. I have done an internal proof of concept, but
without any conclusions so far.

The main approach is to have full control over offsets, meaning that after each
processed batch we need to persist the last processed event (I'm using
Kafka, by the way) and keep the offset somewhere, so that upon recovery we only
start streaming from the last processed one. This somewhat conflicts with
the new ReliableReceiver implementation, where that control is
taken away from the processing layer...
When recovering Spark Streaming, we need to control the recovered batches so
that only internal state gets updated and no IO gets executed. For this we
need to make internal changes to Spark Streaming.
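
To make the offset idea above a bit more concrete, here is a minimal sketch in plain Python 3 (no Spark or Kafka involved). The OffsetStore class, the JSON file layout and process_batch are hypothetical names used only for illustration; they are not part of Spark Streaming or of any Kafka client.

```python
import json
import os
import tempfile


class OffsetStore(object):
    """Hypothetical helper: persists the last processed offset to a local file."""

    def __init__(self, path):
        self.path = path

    def load(self):
        # Return -1 when nothing has been processed yet.
        if not os.path.exists(self.path):
            return -1
        with open(self.path) as f:
            return json.load(f)["last_offset"]

    def save(self, offset):
        # Write to a temp file and rename, so a crash never leaves a half-written file.
        tmp = self.path + ".tmp"
        with open(tmp, "w") as f:
            json.dump({"last_offset": offset}, f)
        os.replace(tmp, self.path)


def process_batch(events, store, sink):
    """Process (offset, payload) events, skipping anything already handled."""
    last = store.load()
    for offset, payload in events:
        if offset <= last:
            continue  # handled before the restart: no IO re-execution
        sink.append(payload)  # stand-in for the real external write
        last = offset
    store.save(last)


store = OffsetStore(os.path.join(tempfile.mkdtemp(), "offsets.json"))
sink = []
process_batch([(0, "a"), (1, "b")], store, sink)
# Simulate a restart that replays offsets 0..2: only offset 2 is new.
process_batch([(0, "a"), (1, "b"), (2, "c")], store, sink)
```

Note that the offset is saved once per batch here, so a crash in the middle of a batch would still replay that batch's writes; this only narrows the window, it does not give exactly-once output.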

I exposed a function that identifies how many batches are being recovered.
Then I passed that info upfront to the workers, and with a counter they are
aware of how many batches were recomputed, thus avoiding IO re-execution.
This is still at a very early stage, so I can't actually help you much
yet...
This is the function I've created inside the JobGenerator class to access the
recovered batches:

def getDownTimes(): Seq[Time] = {
  println("123")
  if (ssc.isCheckpointPresent) {
    val batchDuration = ssc.graph.batchDuration

    // Batches when the master was down, that is,
    // between the checkpoint and current restart time
    val checkpointTime = ssc.initialCheckpoint.checkpointTime
    val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
    val downTimes = checkpointTime.until(restartTime, batchDuration)
    logInfo("Batches during down time (" + downTimes.size + " batches): "
      + downTimes.mkString(", "))

    downTimes
  } else {
    Seq[Time]()
  }
}
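
The counter idea (workers skip IO for the first N recovered batches) could look roughly like the sketch below. It is plain Python 3 rather than Spark code, and RecoveryGate is a hypothetical name; the real change would live inside Spark Streaming as described above.

```python
class RecoveryGate(object):
    """Hypothetical helper: suppresses output for the first N (recovered) batches."""

    def __init__(self, recovered_batches):
        self.remaining = recovered_batches

    def should_write(self):
        # While recovered batches remain, update state only and skip external IO.
        if self.remaining > 0:
            self.remaining -= 1
            return False
        return True


state = 0
written = []
# In the real setup this count would be the size of what getDownTimes() returns.
gate = RecoveryGate(recovered_batches=2)
for batch in [10, 20, 30, 40]:
    state += batch              # internal state is always rebuilt
    if gate.should_write():
        written.append(state)   # stand-in for the real IO
```

Here the first two batches only rebuild the running total, and output resumes with the third batch.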

It has been a while since I last looked at this issue, so I'm probably not able to
give you many details right now, but I expect to have a concrete
solution that I could ultimately push as a proposal to the Spark dev team.

I will definitely notify people on this thread at least.

Thanks,
Rod




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-tp12568p21265.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: spark streaming with checkpoint

2015-01-20 Thread Shao, Saisai
Hi,

It seems you have a very large window (24 hours), so the memory growth
is expected: the window operation caches the RDDs within
this window in memory. For your requirement, memory must be large enough to hold
24 hours of data.

I don't think checkpointing in Spark Streaming can alleviate this problem, because
checkpoints are mainly for fault tolerance.
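
As a rough back-of-the-envelope illustration of the sizes involved (assuming, purely for the arithmetic, that the batch interval equals the 5 minute slide interval, which the original question does not state):

```python
window_minutes = 24 * 60   # window duration from the question
slide_minutes = 5          # slide duration from the question

# Assumption: the batch interval equals the slide interval.
batches_in_window = window_minutes // slide_minutes

# The process was reported killed after about one and a half hours.
batches_before_kill = 90 // slide_minutes

print(batches_in_window, batches_before_kill)
```

Under that assumption a full window spans 288 intervals, while the process died after only about 18 of them, so the instance would need far more headroom than it currently has.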

Thanks
Jerry

From: balu.naren [mailto:balu.na...@gmail.com]
Sent: Tuesday, January 20, 2015 7:17 PM
To: user@spark.apache.org
Subject: spark streaming with checkpoint


I am a beginner with Spark Streaming, so I have a basic question regarding
checkpoints. My use case is to calculate the number of unique users by day. I am
using reduce by key and window for this, where my window duration is 24 hours
and the slide duration is 5 minutes. I am writing the processed records to MongoDB.
Currently I replace the existing record each time. But I see that memory
slowly increases over time and the process is killed after about one and a half hours (on an AWS
small instance). The DB write after the restart clears all the old data. So I
understand that checkpointing is the solution for this. But my questions are:

  *   What should my checkpoint duration be? The documentation says
5-10 times the slide duration. But I need the data for the entire day, so is it OK to
keep it at 24 hours?
  *   Where ideally should the checkpoint be: initially when I receive the
stream, just before the window operation, or after the data reduction has
taken place?

Appreciate your help.
Thank you




getting started writing unit tests for my app

2015-01-20 Thread Matthew Cornell
Hi Folks,

I'm writing a GraphX app and I need to do some test-driven development.
I've got Spark running on our little cluster and have built and run some
hello world apps, so that's all good.

I've looked through the test source and found lots of helpful examples that
use SharedSparkContext, and to a greater extent, LocalSparkContext, but I
don't have those classes available to my app, which has a library set up to
use the lib/spark-assembly-1.2.0-hadoop2.0.0-mr1-cdh4.2.0.jar that came
with the compiled version I downloaded. (I don't plan on using Spark with
our CDH4 Hadoop, but I picked that download anyway.)

So I've downloaded the source code, found LocalSparkContext (two actually -
one each in core/ and graphx/ ), but now I'm stuck figuring out how to
build a jar (say lib/1.2.0-hadoop2.0.0-mr1-cdh4.2.0-TESTS.jar ) that I can
include in my tests' library. I'm not very familiar with Maven or SBT, so
doing the build is frankly daunting. I could really use your advice here!

Thank you,

matt


spark-submit --py-files remote: "Only local additional python files are supported"

2015-01-20 Thread Vladimir Grigor
Hi all!

I found this problem when I tried running python application on Amazon's
EMR yarn cluster.

It is possible to run bundled example applications on EMR but I cannot
figure out how to run a little bit more complex python application which
depends on some other python scripts. I tried adding those files with
'--py-files' and it works fine in local mode but it fails and gives me
following message when run in EMR:
"Error: Only local python files are supported:
s3://pathtomybucket/mylibrary.py".

Simplest way to reproduce in local:
bin/spark-submit --py-files s3://whatever.path.com/library.py main.py

Actual commands to run it in EMR
#launch cluster
aws emr create-cluster --name SparkCluster --ami-version 3.3.1
--instance-type m1.medium --instance-count 2  --ec2-attributes
KeyName=key20141114 --log-uri s3://pathtomybucket/cluster_logs
--enable-debugging --use-default-roles  --bootstrap-action
Name=Spark,Path=s3://pathtomybucket/bootstrap-actions/spark/install-spark,Args=["-s","
http://pathtomybucket/bootstrap-actions/spark
","-l","WARN","-v","1.2","-b","2014121700","-x"]
#{
#   "ClusterId": "j-2Y58DME79MPQJ"
#}

#run application
aws emr add-steps --cluster-id "j-2Y58DME79MPQJ" --steps
ActionOnFailure=CONTINUE,Name=SparkPy,Jar=s3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/home/hadoop/spark/bin/spark-submit,--deploy-mode,cluster,--master,yarn-cluster,--py-files,s3://pathtomybucket/tasks/demo/main.py,main.py]
#{
#"StepIds": [
#"s-2UP4PP75YX0KU"
#]
#}
And in stderr of that step I get "Error: Only local python files are
supported: s3://pathtomybucket/tasks/demo/main.py".

What is the workaround or correct way to do it? Using hadoop's distcp to
copy dependency files from s3 to nodes as another pre-step?

Regards, Vladimir


Re: Error for first run from iPython Notebook

2015-01-20 Thread Dave
Not sure if anyone who can help has seen this. Any suggestions would be
appreciated, thanks!

On Mon Jan 19 2015 at 1:50:43 PM Dave  wrote:

> Hi,
>
> I've setup my first spark cluster (1 master, 2 workers) and an iPython
> notebook server that I'm trying to setup to access the cluster. I'm running
> the workers from Anaconda to make sure the python setup is correct on each
> box. The iPy notebook server appears to have everything setup correctly,
> and I'm able to initialize Spark and push a job out. However, the job is
> failing, and I'm not sure how to troubleshoot. Here's the code:
>
> from pyspark import SparkContext
> CLUSTER_URL = 'spark://192.168.1.20:7077'
> sc = SparkContext( CLUSTER_URL, 'pyspark')
> def sample(p):
>     x, y = random(), random()
>     return 1 if x*x + y*y < 1 else 0
>
> count = sc.parallelize(xrange(0, 20)).map(sample).reduce(lambda a, b: a +
> b)
> print "Pi is roughly %f" % (4.0 * count / 20)
>
>
> And here's the error:
>
> Py4JJavaError                             Traceback (most recent call last)
>  in ()
>       3     return 1 if x*x + y*y < 1 else 0
>       4
> ----> 5 count = sc.parallelize(xrange(0, 20)).map(sample).reduce(lambda a, b: a + b)
>       6 print "Pi is roughly %f" % (4.0 * count / 20)
>
> /opt/spark-1.2.0/python/pyspark/rdd.pyc in reduce(self, f)
>     713             yield reduce(f, iterator, initial)
>     714
> --> 715         vals = self.mapPartitions(func).collect()
>     716         if vals:
>     717             return reduce(f, vals)
>
> /opt/spark-1.2.0/python/pyspark/rdd.pyc in collect(self)
>     674         """
>     675         with SCCallSiteSync(self.context) as css:
> --> 676             bytesInJava = self._jrdd.collect().iterator()
>     677         return list(self._collect_iterator_through_file(bytesInJava))
>     678
>
> /opt/spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
>     536         answer = self.gateway_client.send_command(command)
>     537         return_value = get_return_value(answer, self.gateway_client,
> --> 538                 self.target_id, self.name)
>     539
>     540         for temp_arg in temp_args:
>
> /opt/spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                     'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                     format(target_id, '.', name), value)
>     301             else:
>     302                 raise Py4JError(
>
> Py4JJavaError: An error occurred while calling o28.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 31 
> in stage 0.0 failed 4 times, most recent failure: Lost task 31.3 in stage 0.0 
> (TID 72, 192.168.1.21): org.apache.spark.api.python.PythonException: 
> Traceback (most recent call last):
>   File "/opt/spark-1.2.0/python/pyspark/worker.py", line 107, in main
> process()
>   File "/opt/spark-1.2.0/python/pyspark/worker.py", line 98, in process
> serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/opt/spark-1.2.0/python/pyspark/serializers.py", line 227, in 
> dump_stream
> vs = list(itertools.islice(iterator, batch))
>   File "/opt/spark-1.2.0/python/pyspark/rdd.py", line 710, in func
> initial = next(iterator)
>   File "", line 2, in sample
> TypeError: 'module' object is not callable
>
>   at 
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
>   at 
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>   at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGSch
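
A possible explanation for the "TypeError: 'module' object is not callable" in the worker traceback above: the imports are not shown in the quoted code, so this is only a guess, but the error is what Python raises if the notebook ran "import random" and then called random() directly. The small sketch below (plain Python 3, no Spark; sample_broken and sample_fixed are names made up for illustration) reproduces that situation and one way around it.

```python
import random


def sample_broken(p):
    # `random` here is the module, so calling it raises TypeError.
    x, y = random(), random()
    return 1 if x * x + y * y < 1 else 0


def sample_fixed(p):
    # Call the function inside the module instead.
    x, y = random.random(), random.random()
    return 1 if x * x + y * y < 1 else 0


try:
    sample_broken(0)
    error = None
except TypeError as e:
    error = str(e)

print(error)
print(sample_fixed(0))
```

If that is indeed the cause, using "from random import random" in the notebook would also work.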

Re: Scala Spark SQL row object Ordinal Method Call Aliasing

2015-01-20 Thread Sunita Arvind
The below is not exactly a solution to your question, but this is what we
are doing. The first time, we do end up calling row.getString(), and we
immediately pass the result through a map function that aligns it to either a
case class or a StructType. Then we register it as a table and use just
column names. The Spark SQL wiki has good examples of this. It looks
easier to manage to me than your solution below.

I agree with you that when there are a lot of columns,
row.getString() even once is not convenient.
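
The general shape of what we do (touch the ordinals exactly once, then use names everywhere else) can be sketched in plain Python 3 without Spark. Person and to_person are hypothetical names for illustration, and plain tuples stand in for Row objects:

```python
from collections import namedtuple

# Hypothetical record type; the field order mirrors the ordinals in the question
# (0 = ID, 1 = Name).
Person = namedtuple("Person", ["id", "name"])


def to_person(row):
    # The only place in the code that knows about ordinals.
    return Person(id=row[0], name=row[1])


rows = [(1, "alice"), (2, "bob")]   # stand-ins for Spark SQL Row objects
people = [to_person(r) for r in rows]
names = [p.name for p in people]
```

After the single mapping step, the rest of the code refers to p.id and p.name and never to positions.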

Regards

Sunita

On Tuesday, January 20, 2015, Night Wolf  wrote:

> In Spark SQL we have Row objects which contain a list of fields that make
> up a row. A Row has ordinal accessors such as .getInt(0) or .getString(2).
>
> Say ordinal 0 = ID and ordinal 1 = Name. It becomes hard to remember what
> ordinal is what, making the code confusing.
>
> Say for example I have the following code
>
> def doStuff(row: Row) = {
>   //extract some items from the row into a tuple;
>   (row.getInt(0), row.getString(1)) //tuple of ID, Name
> }
>
> The question becomes how could I create aliases for these fields in a Row
> object?
>
> I was thinking I could create methods which take an implicit Row object;
>
> def id(implicit row: Row) = row.getInt(0)
> def name(implicit row: Row) = row.getString(1)
>
> I could then rewrite the above as;
>
> def doStuff(implicit row: Row) = {
>   //extract some items from the row into a tuple;
>   (id, name) //tuple of ID, Name
> }
>
> Is there a better/neater approach?
>
>
> Cheers,
>
> ~NW
>


Re: Why custom parquet format hive table execute "ParquetTableScan" physical plan, not "HiveTableScan"?

2015-01-20 Thread Yana Kadiyska
Hm, you might want to ask on the dev list if you don't get a good answer
here. I'm also trying to decipher this part of the code as I'm having
issues with predicate pushes. I can see (in master branch) that the SQL
codepath (which is taken if you don't convert the
metastore) 
C:\spark-master\sql\core\src\main\scala\org\apache\spark\sql\parquet\ParquetTableOperations.scala
around line 107 pushed the parquet filters into a hadoop configuration
object . Spark1.2 has similar code in the same file, via method
ParquetInputFormat.setFilterPredicate. But I think in the case where you go
through HiveTableScan you'd go through
C:\spark-master\sql\hive\src\main\scala\org\apache\spark\sql\hive\TableReader.scala
and I don't see anything happening with the filters there. But I'm not a
dev on this project -- mostly I'm really interested in the answer. Please
do update if you figure this out!

On Mon, Jan 19, 2015 at 8:02 PM, Xiaoyu Wang  wrote:

> spark.sql.parquet.filterPushdown=true has been turned on. But when
> spark.sql.hive.convertMetastoreParquet is set to false, the first
> parameter has no effect!
>
> 2015-01-20 6:52 GMT+08:00 Yana Kadiyska :
>
>> If you're talking about filter pushdowns for parquet files this also has
>> to be turned on explicitly. Try spark.sql.parquet.filterPushdown=true.
>> It's off by default.
>>
>> On Mon, Jan 19, 2015 at 3:46 AM, Xiaoyu Wang  wrote:
>>
>>> Yes, it works!
>>> But the filter can't be pushed down!
>>>
>>> Is implementing the data source API the only option for a custom Parquet input format?
>>>
>>>
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
>>>
>>> 2015-01-16 21:51 GMT+08:00 Xiaoyu Wang :
>>>
 Thanks yana!
 I will try it!

 On January 16, 2015, at 20:51, yana wrote:

 I think you might need to set
 spark.sql.hive.convertMetastoreParquet to false if I understand that
 flag correctly

 Sent on the new Sprint Network from my Samsung Galaxy S®4.


  Original message 
 From: Xiaoyu Wang
 Date:01/16/2015 5:09 AM (GMT-05:00)
 To: user@spark.apache.org
 Subject: Why custom parquet format hive table execute
 "ParquetTableScan" physical plan, not "HiveTableScan"?

 Hi all!

 In Spark SQL 1.2.0,
 I created a Hive table with a custom Parquet input format and output format,
 like this:
 CREATE TABLE test(
   id string,
   msg string)
 CLUSTERED BY (
   id)
 SORTED BY (
   id ASC)
 INTO 10 BUCKETS
 ROW FORMAT SERDE
   'com.a.MyParquetHiveSerDe'
 STORED AS INPUTFORMAT
   'com.a.MyParquetInputFormat'
 OUTPUTFORMAT
   'com.a.MyParquetOutputFormat';

 And in the Spark shell, the plan of "select * from test" is:

 [== Physical Plan ==]
 [!OutputFaker [id#5,msg#6]]
 [ ParquetTableScan [id#12,msg#13], (ParquetRelation
 hdfs://hadoop/user/hive/warehouse/test.db/test, Some(Configuration:
 core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml,
 yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml),
 org.apache.spark.sql.hive.HiveContext@6d15a113, []), []]

 Not HiveTableScan!
 So it doesn't execute my custom input format!
 Why? How can I make it execute my custom input format?

 Thanks!



>>>
>>
>


RE: Can I save RDD to local file system and then read it back on spark cluster with multiple nodes?

2015-01-20 Thread Wang, Ningjun (LNG-NPV)
Can anybody answer this? Do I have to have hdfs to achieve this?

Regards,

Ningjun Wang
Consulting Software Engineer
LexisNexis
121 Chanlon Road
New Providence, NJ 07974-1541

From: Wang, Ningjun (LNG-NPV) [mailto:ningjun.w...@lexisnexis.com]
Sent: Friday, January 16, 2015 1:15 PM
To: Imran Rashid
Cc: user@spark.apache.org
Subject: RE: Can I save RDD to local file system and then read it back on spark 
cluster with multiple nodes?

I need to save an RDD to the file system and then restore it from the file system
in the future. I don't have an HDFS file system and don't want to go through the
hassle of setting one up. So how can I achieve this? The application
needs to run on a cluster with multiple nodes.

Regards,

Ningjun

From: imranra...@gmail.com 
[mailto:imranra...@gmail.com] On Behalf Of Imran Rashid
Sent: Friday, January 16, 2015 12:14 PM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: Can I save RDD to local file system and then read it back on spark 
cluster with multiple nodes?


I'm not positive, but I think this is very unlikely to work.

First, when you call sc.objectFile(...),  I think the *driver* will need to 
know something about the file, eg to know how many tasks to create.  But it 
won't even be able to see the file, since it only lives on the local filesystem 
of the cluster nodes.

If you really wanted to, you could probably write out some small metadata about 
the files and write your own version of objectFile that uses it.  But I think 
there is a bigger conceptual issue.  You might not in general be sure that you 
are running on the same nodes when you save the file, as when you read it back 
in.  So the file might not be present on the local filesystem for the active 
executors.  You might be able to guarantee it for the specific cluster setup 
you have now, but it might limit you down the road.

What are you trying to achieve?  There might be a better way.  I believe 
writing to hdfs will usually write one local copy, so you'd still be doing a 
local read when you reload the data.

Imran
On Jan 16, 2015 6:19 AM, "Wang, Ningjun (LNG-NPV)"
<ningjun.w...@lexisnexis.com> wrote:
I have asked this question before but got no answer. Asking again.

Can I save RDD to the local file system and then read it back on a spark 
cluster with multiple nodes?

rdd.saveAsObjectFile("file:///home/data/rdd1")

val rdd2 = sc.objectFile("file:///home/data/rdd1")

This works if the cluster has only one node. But my cluster has 3 nodes
and each node has a local dir called /home/data. Is the RDD saved to the local dir
across all 3 nodes? If so, is sc.objectFile(...) smart enough to read the local dir
on all 3 nodes and merge them into a single RDD?

Ningjun



Apply function to all elements along each key

2015-01-20 Thread Luis Guerra
Hi all,

I would like to apply a function over all elements for each key (assuming
key-value RDD). For instance, imagine I have:

import numpy as np
a = np.array([[1, 'hola', 'adios'],[2, 'hi', 'bye'],[2, 'hello',
'goodbye']])
a = sc.parallelize(a)

Then I want to create a key-value RDD, using the first element of each []
as key:

b = a.groupBy(lambda x: x[0])

And finally, I want to filter only those values where the second element is
equal along each key (or there is only one element). So, for key 1, there
is only one element ('hola'), whereas there are 2 different elements for
key 2 ('hi', 'hello'). Therefore, only values associated to key 1 must be
returned:

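def test(group):
    # Return the whole group if every row has the same second element,
    # otherwise return an empty list so flatMap drops the group.
    x = group[0][1]
    for g in group[1:]:
        y = g[1]
        if x != y:
            return []
        else:
            x = y
    return group

c = b.flatMap(lambda (x, y): test(y.data))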

Is there a more efficient way to do this?
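
For reference, the filtering rule itself can be written without materialising and scanning each group as a list. The sketch below is plain Python 3 with no Spark, and it uses tuples instead of a numpy array so that the keys stay integers; a pair-RDD version built from map, reduceByKey and a join or filter might follow the same two steps, but that is untested here.

```python
rows = [(1, 'hola', 'adios'), (2, 'hi', 'bye'), (2, 'hello', 'goodbye')]

# Step 1: per key, collect the distinct second elements (a small set per key).
distinct_by_key = {}
for key, second, _ in rows:
    distinct_by_key.setdefault(key, set()).add(second)

# Step 2: keep only rows whose key has exactly one distinct second element.
kept = [r for r in rows if len(distinct_by_key[r[0]]) == 1]
print(kept)
```

Only the row for key 1 survives, which matches the expected result described above.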

Many thanks in advance,

Best


NPE in Parquet

2015-01-20 Thread Alessandro Baretta
All,

I strongly suspect this might be caused by a glitch in the communication
with Google Cloud Storage where my job is writing to, as this NPE exception
shows up fairly randomly. Any ideas?

Exception in thread "Thread-126" java.lang.NullPointerException
at
scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:114)
at
scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:114)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:32)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at
scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$.readMetaData(ParquetTypes.scala:447)
at
org.apache.spark.sql.parquet.ParquetTypesConverter$.readSchemaFromFile(ParquetTypes.scala:485)
at
org.apache.spark.sql.parquet.ParquetRelation.<init>(ParquetRelation.scala:65)
at org.apache.spark.sql.SQLContext.parquetFile(SQLContext.scala:190)
at
Truven$Stats$anonfun$save_to_parquet$3$anonfun$21$anon$7.run(Truven.scala:957)


Alex


PySpark Client

2015-01-20 Thread Chris Beavers
Hey all,

Is there any notion of a lightweight python client for submitting jobs to a
Spark cluster remotely? If I essentially install Spark on the client
machine, and that machine has the same OS, same version of Python, etc.,
then I'm able to communicate with the cluster just fine. But if Python
versions differ slightly, then I start to see a lot of opaque errors that
often bubble up as EOFExceptions. Furthermore, this just seems like a very
heavy weight way to set up a client.

Does anyone have any suggestions for setting up a thin pyspark client on a
node which doesn't necessarily conform to the homogeneity of the target
Spark cluster?

Best,
Chris


Re: NPE in Parquet

2015-01-20 Thread Iulian Dragoș
It’s an array.length, where the array is null. Looking through the code, it
looks like the type converter assumes that FileSystem.globStatus never
returns null, but that is not the case. Digging through the Hadoop
codebase, inside Globber.glob, here’s what I found:

/*
 * When the input pattern "looks" like just a simple filename, and we
 * can't find it, we return null rather than an empty array.
 * This is a special case which the shell relies on.
 *
 * To be more precise: if there were no results, AND there were no
 * groupings (aka brackets), and no wildcards in the input (aka stars),
 * we return null.
 */

 if ((!sawWildcard) && results.isEmpty() &&
(flattenedPatterns.size() <= 1)) {
  return null;
}

So, if your file is a concrete filename, without wildcards, you might get a
null back. Seems like a bug in ParquetTypesConverter.
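
To illustrate the failure mode and the kind of guard that would avoid it, here is a small analogy in plain Python 3. The glob_status function below is a made-up stand-in that only mimics the "null for a plain, missing filename" behaviour quoted above; it is not the Hadoop API, and list_children is likewise hypothetical.

```python
import fnmatch


def glob_status(pattern, existing):
    """Hypothetical stand-in: None for a plain filename that does not exist,
    otherwise a (possibly empty) list of matches."""
    has_wildcard = any(c in pattern for c in "*?[")
    matches = [p for p in existing if fnmatch.fnmatchcase(p, pattern)]
    if not matches and not has_wildcard:
        return None
    return matches


def list_children(pattern, existing):
    # Guard against the None case before iterating over the result.
    statuses = glob_status(pattern, existing)
    return [] if statuses is None else statuses


files = ["/data/part-0.parquet"]
missing = glob_status("/data/missing.parquet", files)
safe = list_children("/data/missing.parquet", files)
found = list_children("/data/*.parquet", files)
```

A caller that iterates over the raw result would fail on the missing file, whereas the guarded version simply sees an empty list.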

iulian
​

On Tue, Jan 20, 2015 at 5:29 PM, Alessandro Baretta 
wrote:

> All,
>
> I strongly suspect this might be caused by a glitch in the communication
> with Google Cloud Storage where my job is writing to, as this NPE exception
> shows up fairly randomly. Any ideas?
>
> Exception in thread "Thread-126" java.lang.NullPointerException
> at
> scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:114)
> at
> scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:114)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:32)
> at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at
> scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$.readMetaData(ParquetTypes.scala:447)
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$.readSchemaFromFile(ParquetTypes.scala:485)
> at
> org.apache.spark.sql.parquet.ParquetRelation.<init>(ParquetRelation.scala:65)
> at
> org.apache.spark.sql.SQLContext.parquetFile(SQLContext.scala:190)
> at
> Truven$Stats$anonfun$save_to_parquet$3$anonfun$21$anon$7.run(Truven.scala:957)
>
>
> Alex
>



-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Aggregate order semantics when spilling

2015-01-20 Thread Justin Uang
Hi,

I am trying to aggregate a key based on some timestamp, and I believe that
spilling to disk is changing the order of the data fed into the combiner.

I have some timeseries data that is of the form: ("key", "date", "other
data")

Partition 1
("A", 2, ...)
("B", 4, ...)
("A", 1, ...)
("A", 3, ...)
("B", 6, ...)

which I then partition by key, then sort within the partition:

Partition 1
("A", 1, ...)
("A", 2, ...)
("A", 3, ...)
("A", 4, ...)

Partition 2
("B", 4, ...)
("B", 6, ...)

If I run a combineByKey with the same partitioner, then the items for each
key will be fed into the ExternalAppendOnlyMap in the correct order.
However, if I spill, then the time slices are spilled to disk as multiple
partial combiners. When it's time to merge the spilled combiners for each
key, the combiners are combined in the wrong order.

For example, if during a groupByKey, [("A", 1, ...), ("A", 2...)] and
[("A", 3, ...), ("A", 4, ...)] are spilled separately, it's possible that
the combiners can be combined in the wrong order, like [("A", 3, ...),
("A", 4, ...), ("A", 1, ...), ("A", 2, ...)], which invalidates the
invariant that all the values for A are passed in order to the combiners.

I'm not an expert, but I suspect that this is because we use a heap ordered
by key when iterating, which doesn't retain the order of the spilled
combiners. Perhaps we can order our mergeHeap by (hash_key, spill_index),
where spill_index is incremented each time we spill? This would mean that
we would pop and merge the combiners of each key in order, resulting in
[("A", 1, ...), ("A", 2, ...), ("A", 3, ...), ("A", 4, ...)].
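
To show what I mean by ordering on (hash_key, spill_index), here is a toy sketch in plain Python 3 using heapq. It is not the ExternalAppendOnlyMap code; merge_spills is a made-up function, and the key itself stands in for hash_key to keep the example deterministic.

```python
import heapq


def merge_spills(spills):
    """Merge spilled partial combiners. spills[i] is a list of (key, values)
    pairs, and a lower i means it was spilled earlier."""
    heap = []
    for spill_index, spill in enumerate(spills):
        for key, values in spill:
            # Ties on the key are broken by spill_index, so earlier spills pop first.
            heapq.heappush(heap, (key, spill_index, values))
    merged = {}
    while heap:
        key, _, values = heapq.heappop(heap)
        merged.setdefault(key, []).extend(values)
    return merged


spills = [
    [("A", [1, 2])],                 # first spill
    [("A", [3, 4]), ("B", [4, 6])],  # second spill
]
merged = merge_spills(spills)
print(merged)
```

With the spill index as the tie-breaker, the partial combiners for "A" are always merged in the order they were spilled.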

Thanks in advance for the help! If there is a way to do this already in
Spark 1.2, can someone point it out to me?

Best,

Justin


Re: How to output to S3 and keep the order

2015-01-20 Thread Anny Chen
Thanks Aniket! It is working now.

Anny

On Mon, Jan 19, 2015 at 5:56 PM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> When you repartition, ordering can get lost. You would need to sort after
> repartitioning.
>
> Aniket
>
> On Tue, Jan 20, 2015, 7:08 AM anny9699  wrote:
>
>> Hi,
>>
>> I am using Spark on AWS and want to write the output to S3. It is a
>> relatively small file and I don't want them to output as multiple parts.
>> So
>> I use
>>
>> result.repartition(1).saveAsTextFile("s3://...")
>>
>> However as long as I am using the saveAsTextFile method, the output
>> doesn't
>> keep the original order. But if I use BufferedWriter in Java to write the
>> output, I could only write to the master machine instead of S3 directly.
>> Is
>> there a way that I could write to S3 and the same time keep the order?
>>
>> Thanks a lot!
>> Anny


Re: PySpark Client

2015-01-20 Thread Andrew Or
Hi Chris,

Short answer is no, not yet.

Longer answer is that PySpark only supports client mode, which means your
driver runs on the same machine as your submission client. By corollary
this means your submission client must currently depend on all of Spark and
its dependencies. There is a patch that supports this for *cluster* mode
(as opposed to client mode), which would be the first step towards what you
want.

-Andrew

2015-01-20 8:36 GMT-08:00 Chris Beavers :

> Hey all,
>
> Is there any notion of a lightweight python client for submitting jobs to
> a Spark cluster remotely? If I essentially install Spark on the client
> machine, and that machine has the same OS, same version of Python, etc.,
> then I'm able to communicate with the cluster just fine. But if Python
> versions differ slightly, then I start to see a lot of opaque errors that
> often bubble up as EOFExceptions. Furthermore, this just seems like a very
> heavy weight way to set up a client.
>
> Does anyone have any suggestions for setting up a thin pyspark client on a
> node which doesn't necessarily conform to the homogeneity of the target
> Spark cluster?
>
> Best,
> Chris
>


Re: spark-submit --py-files remote: "Only local additional python files are supported"

2015-01-20 Thread Andrew Or
Hi Vladimir,

Yes, as the error message suggests, PySpark currently only supports local
files. This does not mean it only runs in local mode, however; you can
still run PySpark on any cluster manager (though only in client mode). All
this means is that your Python files must be on your local file system.
Until remote files are supported, the straightforward workaround is to
copy the files to your local machine.

-Andrew

2015-01-20 7:38 GMT-08:00 Vladimir Grigor :

> Hi all!
>
> I found this problem when I tried running python application on Amazon's
> EMR yarn cluster.
>
> It is possible to run bundled example applications on EMR but I cannot
> figure out how to run a little bit more complex python application which
> depends on some other python scripts. I tried adding those files with
> '--py-files' and it works fine in local mode but it fails and gives me
> following message when run in EMR:
> "Error: Only local python files are supported:
> s3://pathtomybucket/mylibrary.py".
>
> Simplest way to reproduce in local:
> bin/spark-submit --py-files s3://whatever.path.com/library.py main.py
>
> Actual commands to run it in EMR
> #launch cluster
> aws emr create-cluster --name SparkCluster --ami-version 3.3.1
> --instance-type m1.medium --instance-count 2  --ec2-attributes
> KeyName=key20141114 --log-uri s3://pathtomybucket/cluster_logs
> --enable-debugging --use-default-roles  --bootstrap-action
> Name=Spark,Path=s3://pathtomybucket/bootstrap-actions/spark/install-spark,Args=["-s","
> http://pathtomybucket/bootstrap-actions/spark
> ","-l","WARN","-v","1.2","-b","2014121700","-x"]
> #{
> #   "ClusterId": "j-2Y58DME79MPQJ"
> #}
>
> #run application
> aws emr add-steps --cluster-id "j-2Y58DME79MPQJ" --steps
> ActionOnFailure=CONTINUE,Name=SparkPy,Jar=s3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/home/hadoop/spark/bin/spark-submit,--deploy-mode,cluster,--master,yarn-cluster,--py-files,s3://pathtomybucket/tasks/demo/main.py,main.py]
> #{
> #"StepIds": [
> #"s-2UP4PP75YX0KU"
> #]
> #}
> And in stderr of that step I get "Error: Only local python files are
> supported: s3://pathtomybucket/tasks/demo/main.py".
>
> What is the workaround or correct way to do it? Using hadoop's distcp to
> copy dependency files from s3 to nodes as another pre-step?
>
> Regards, Vladimir
>


Re: Does Spark automatically run different stages concurrently when possible?

2015-01-20 Thread Kane Kim
Related question: is the execution of different stages optimized? I.e.,
will a map followed by a filter require two loops, or will they be combined
into a single one?

On Tue, Jan 20, 2015 at 4:33 AM, Bob Tiernay  wrote:
> I found the following to be a good discussion of the same topic:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/The-concurrent-model-of-spark-job-stage-task-td13083.html
>
>
>> From: so...@cloudera.com
>> Date: Tue, 20 Jan 2015 10:02:20 +
>> Subject: Re: Does Spark automatically run different stages concurrently
>> when possible?
>> To: paliwalash...@gmail.com
>> CC: davidkl...@hotmail.com; user@spark.apache.org
>
>>
>> You can persist the RDD in (2) right after it is created. It will not
>> cause it to be persisted immediately, but rather the first time it is
>> materialized. If you persist after (3) is calculated, then it will be
>> re-calculated (and persisted) after (4) is calculated.
>>
>> On Tue, Jan 20, 2015 at 3:38 AM, Ashish  wrote:
>> > Sean,
>> >
>> > A related question. When to persist the RDD after step 2 or after Step
>> > 3 (nothing would happen before step 3 I assume)?
>> >
>> > On Mon, Jan 19, 2015 at 5:17 PM, Sean Owen  wrote:
>> >> From the OP:
>> >>
>> >> (1) val lines = Import full dataset using sc.textFile
>> >> (2) val ABonly = Filter out all rows from "lines" that are not of type
>> >> A or B
>> >> (3) val processA = Process only the A rows from ABonly
>> >> (4) val processB = Process only the B rows from ABonly
>> >>
>> >> I assume that 3 and 4 are actions, or else nothing happens here at all.
>> >>
>> >> When 3 is invoked, it will compute 1, then 2, then 3. 4 will happen
>> >> after 3, and may even cause 1 and 2 to happen again if nothing is
>> >> persisted.
>> >>
>> >> You can invoke 3 and 4 in parallel on the driver if you like. That's
>> >> fine. But actions are blocking in the driver.
>> >>
>> >>
>> >>
>> >> On Mon, Jan 19, 2015 at 8:21 AM, davidkl 
>> >> wrote:
>> >>> Hi Jon, I am looking for an answer to a similar question in the doc
>> >>> now; so far no clue.
>> >>>
>> >>> I would need to know what Spark's behaviour is in a situation like the
>> >>> example you provided, but also taking into account that there are
>> >>> multiple partitions/workers.
>> >>>
>> >>> I could imagine it's possible that different Spark workers are not
>> >>> synchronized in terms of waiting for each other to progress to the
>> >>> next step/stage for the partitions of data they get assigned, while I
>> >>> believe in streaming they would wait for the current batch to complete
>> >>> before they start working on a new one.
>> >>>
>> >>> In the code I am working on, I need to make sure a particular step is
>> >>> completed (in all workers, for all partitions) before the next
>> >>> transformation is applied.
>> >>>
>> >>> It would be great if someone could clarify or point to these issues in
>> >>> the doc! :-)
>> >
>> >
>> >
>> > --
>> > thanks
>> > ashish
>> >
>> > Blog: http://www.ashishpaliwal.com/blog
>> > My Photo Galleries: http://www.pbase.com/ashishpaliwal



Re: Aggregate order semantics when spilling

2015-01-20 Thread Andrew Or
Hi Justin,

I believe the intended semantics of groupByKey or cogroup is that the
ordering *within a key* is not preserved if you spill. In fact, the test
cases for the ExternalAppendOnlyMap only assert that the Set representation
of the results is as expected (see this line).
This is because these Spark primitives literally just group the values by
key and do not provide any ordering guarantees.

However, if ordering within a key is a requirement for your application,
then you may need to write your own PairRDDFunction that calls
combineByKey. You can model your method after groupByKey, but change the
combiner function slightly to take ordering into account. This may add some
overhead to your application since you need to insert every value in the
appropriate place, but since you're spilling anyway the overhead will
likely be dwarfed by disk I/O.
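As a rough illustration of an order-aware combiner, here is a minimal sketch in plain Python rather than Spark's API. The names create_combiner, merge_value and merge_combiners simply mirror the three functions that combineByKey takes, and treating every value as a (timestamp, payload) tuple is an assumption made for the example.

```python
import bisect
import heapq

def create_combiner(value):
    # Start a sorted list with the first (timestamp, payload) value.
    return [value]

def merge_value(combiner, value):
    # Insert the value at its sorted position (binary search + list insert).
    bisect.insort(combiner, value)
    return combiner

def merge_combiners(left, right):
    # Merge two already-sorted partial combiners, regardless of arrival order.
    return list(heapq.merge(left, right))

first = merge_value(create_combiner((1, "a")), (2, "b"))
second = merge_value(create_combiner((4, "d")), (3, "c"))
# The partial combiners arrive in the "wrong" order, yet the result is sorted.
print(merge_combiners(second, first))
```

Since every partial combiner stays sorted, the order in which spilled combiners are merged no longer matters.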

Let me know if that works.
-Andrew


2015-01-20 9:18 GMT-08:00 Justin Uang :

> Hi,
>
> I am trying to aggregate a key based on some timestamp, and I believe that
> spilling to disk is changing the order of the data fed into the combiner.
>
> I have some timeseries data that is of the form: ("key", "date", "other
> data")
>
> Partition 1
> ("A", 2, ...)
> ("B", 4, ...)
> ("A", 1, ...)
> ("A", 3, ...)
> ("B", 6, ...)
>
> which I then partition by key, then sort within the partition:
>
> Partition 1
> ("A", 1, ...)
> ("A", 2, ...)
> ("A", 3, ...)
> ("A", 4, ...)
>
> Partition 2
> ("B", 4, ...)
> ("B", 6, ...)
>
> If I run a combineByKey with the same partitioner, then the items for each
> key will be fed into the ExternalAppendOnlyMap in the correct order.
> However, if I spill, then the time slices are spilled to disk as multiple
> partial combiners. When it's time to merge the spilled combiners for each
> key, the combiners are combined in the wrong order.
>
> For example, if during a groupByKey, [("A", 1, ...), ("A", 2...)] and
> [("A", 3, ...), ("A", 4, ...)] are spilled separately, it's possible that
> the combiners can be combined in the wrong order, like [("A", 3, ...),
> ("A", 4, ...), ("A", 1, ...), ("A", 2, ...)], which invalidates the
> invariant that all the values for A are passed in order to the combiners.
>
> I'm not an expert, but I suspect that this is because we use a heap
> ordered by key when iterating, which doesn't retain the order of the spilled
> combiners. Perhaps we can order our mergeHeap by (hash_key, spill_index),
> where spill_index is incremented each time we spill? This would mean that
> we would pop and merge the combiners of each key in order, resulting in
> [("A", 1, ...), ("A", 2, ...), ("A", 3, ...), ("A", 4, ...)].
>
> Thanks in advance for the help! If there is a way to do this already in
> Spark 1.2, can someone point it out to me?
>
> Best,
>
> Justin
>


dynamically change receiver for a spark stream

2015-01-20 Thread jamborta
Hi all,

We have been trying to set up a stream using a custom receiver that would
pick up data from SQL databases. We'd like to keep that stream context
running and dynamically change the streams on demand, adding and removing
streams as needed. Alternatively, if a stream is fixed, is it possible
to stop a stream, change the config and start it again?

thanks,






Re: Is there any way to support multiple users executing SQL on thrift server?

2015-01-20 Thread Cheng Lian

Hey Yi,

I'm quite unfamiliar with Hadoop/HDFS auth mechanisms for now, but would 
like to investigate this issue later. Would you please open a JIRA for 
it? Thanks!


Cheng

On 1/19/15 1:00 AM, Yi Tian wrote:


Is there any way to support multiple users executing SQL on one thrift 
server?


I think there are some problems for spark 1.2.0, for example:

 1. Start thrift server with user A
 2. Connect to thrift server via beeline with user B
 3. Execute “insert into table dest select … from table src”

then we found these items on hdfs:

|drwxr-xr-x   - B supergroup  0 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1
drwxr-xr-x   - B supergroup  0 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary
drwxr-xr-x   - B supergroup  0 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0
drwxr-xr-x   - A supergroup  0 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0/_temporary
drwxr-xr-x   - A supergroup  0 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0/task_201501161642_0022_m_00
-rw-r--r--   3 A supergroup   2671 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0/task_201501161642_0022_m_00/part-0
|

You can see that all the temporary paths created on the driver side (thrift 
server side) are owned by user B (which is what we expected).


But all the output data created on the executor side is owned by user A 
(which is NOT what we expected).
The wrong owner of the output data causes an 
|org.apache.hadoop.security.AccessControlException| when the driver 
side moves the output data into the |dest| table.


Does anyone know how to resolve this problem?





Re: Why custom parquet format hive table execute "ParquetTableScan" physical plan, not "HiveTableScan"?

2015-01-20 Thread Cheng Lian
|spark.sql.parquet.filterPushdown| defaults to |false| because there’s a 
bug in Parquet which may cause an NPE. Please refer to 
http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration


This bug hasn’t been fixed in Parquet master. We’ll turn this on once 
the bug is fixed.


Cheng

On 1/19/15 5:02 PM, Xiaoyu Wang wrote:

spark.sql.parquet.filterPushdown=true has been turned on. But when I 
set spark.sql.hive.convertMetastoreParquet to false, the first 
parameter has no effect!


2015-01-20 6:52 GMT+08:00 Yana Kadiyska:


If you're talking about filter pushdowns for parquet files this
also has to be turned on explicitly. Try
spark.sql.parquet.filterPushdown=true. It's off by default.

On Mon, Jan 19, 2015 at 3:46 AM, Xiaoyu Wang <wangxy...@gmail.com> wrote:

Yes it works!
But the filter can't be pushed down!

Is implementing the data source API the only option for a custom
ParquetInputFormat?


https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

2015-01-16 21:51 GMT+08:00 Xiaoyu Wang <wangxy...@gmail.com>:

Thanks yana!
I will try it!


On January 16, 2015, at 20:51, yana <yana.kadiy...@gmail.com> wrote:

I think you might need to set
spark.sql.hive.convertMetastoreParquet to false if I
understand that flag correctly


 Original message 
From: Xiaoyu Wang
Date: 01/16/2015 5:09 AM (GMT-05:00)
To: user@spark.apache.org
Subject: Why custom parquet format hive table execute
"ParquetTableScan" physical plan, not "HiveTableScan"?

Hi all!

In Spark SQL 1.2.0, I created a Hive table with a custom Parquet
input format and output format, like this:
CREATE TABLE test(
  id string,
  msg string)
CLUSTERED BY (
  id)
SORTED BY (
  id ASC)
INTO 10 BUCKETS
ROW FORMAT SERDE
  'com.a.MyParquetHiveSerDe'
STORED AS INPUTFORMAT
  'com.a.MyParquetInputFormat'
OUTPUTFORMAT
  'com.a.MyParquetOutputFormat';

In the Spark shell, the plan of "select * from test" is:

[== Physical Plan ==]
[!OutputFaker [id#5,msg#6]]
[ ParquetTableScan [id#12,msg#13], (ParquetRelation
hdfs://hadoop/user/hive/warehouse/test.db/test,
Some(Configuration: core-default.xml, core-site.xml,
mapred-default.xml, mapred-site.xml, yarn-default.xml,
yarn-site.xml, hdfs-default.xml, hdfs-site.xml),
org.apache.spark.sql.hive.HiveContext@6d15a113, []), []]

It is not HiveTableScan!
So it doesn't execute my custom input format!
Why? How can I make it execute my custom input format?

Thanks!



Re: Why custom parquet format hive table execute "ParquetTableScan" physical plan, not "HiveTableScan"?

2015-01-20 Thread Cheng Lian
In Spark SQL, Parquet filter pushdown doesn’t cover |HiveTableScan| for 
now. May I ask why do you prefer |HiveTableScan| rather than 
|ParquetTableScan|?


Cheng



Re: dynamically change receiver for a spark stream

2015-01-20 Thread Akhil Das
Can you not do it with RDDs?

Thanks
Best Regards

On Wed, Jan 21, 2015 at 12:38 AM, jamborta  wrote:

> Hi all,
>
> We have been trying to set up a stream using a custom receiver that would
> pick up data from SQL databases. We'd like to keep that stream context
> running and dynamically change the streams on demand, adding and removing
> streams as needed. Alternatively, if a stream is fixed, is it possible
> to stop a stream, change the config and start it again?
>
> thanks,


Re: Scala Spark SQL row object Ordinal Method Call Aliasing

2015-01-20 Thread Cheng Lian
I had once worked on a named row feature but haven’t got time to finish 
it. It looks like this:


|sql("...").named.map { row: NamedRow =>
  row[Int]('key) -> row[String]('value)
}
|

Basically the |named| method generates a field name to ordinal map for 
each RDD partition. This map is then shared by all |NamedRow| 
instances within a partition. Not exactly what you want, but might be 
helpful.
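The idea of one shared name-to-ordinal map can be sketched in plain Python. This is only an illustration of the design, not the unfinished Spark feature: the NamedRow class and the named function below are hypothetical stand-ins, and a list of tuples stands in for one RDD partition.

```python
class NamedRow:
    """A row whose fields can be read by name through a shared ordinal map."""

    __slots__ = ("_ordinals", "_values")

    def __init__(self, ordinals, values):
        self._ordinals = ordinals  # shared dict: field name -> ordinal
        self._values = values

    def __getitem__(self, name):
        return self._values[self._ordinals[name]]

def named(field_names, rows):
    # Build the name -> ordinal map once and share it across all rows.
    ordinals = {name: i for i, name in enumerate(field_names)}
    return [NamedRow(ordinals, row) for row in rows]

rows = named(["key", "value"], [(1, "a"), (2, "b")])
print([(row["key"], row["value"]) for row in rows])
```

Each row only holds a reference to the map, so the per-row cost of name-based access is one dictionary lookup.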


Cheng

On 1/20/15 3:39 AM, Night Wolf wrote:

In Spark SQL we have |Row| objects which contain a list of fields that 
make up a row. A |Row| has ordinal accessors such 
as |.getInt(0)| or |getString(2)|.


Say ordinal 0 = ID and ordinal 1 = Name. It becomes hard to remember 
what ordinal is what, making the code confusing.


Say for example I have the following code

|def doStuff(row: Row) = {
  // extract some items from the row into a tuple
  (row.getInt(0), row.getString(1)) // tuple of ID, Name
}|

The question becomes how could I create aliases for these fields in a 
Row object?


I was thinking I could create methods which take an implicit Row object:

|def id(implicit row: Row) = row.getInt(0)
def name(implicit row: Row) = row.getString(1)|

I could then rewrite the above as:

|def doStuff(implicit row: Row) = {
  // extract some items from the row into a tuple
  (id, name) // tuple of ID, Name
}|

Is there a better/neater approach?


Cheers,

~NW


​


Re: Saving a mllib model in Spark SQL

2015-01-20 Thread Cheng Lian
This is because |KMeansModel| is neither a built-in type nor a user 
defined type recognized by Spark SQL. I think you can write your own UDT 
version of |KMeansModel| in this case. You may refer to 
|o.a.s.mllib.linalg.Vector| and |o.a.s.mllib.linalg.VectorUDT| as an 
example.


Cheng

On 1/20/15 5:34 AM, Divyansh Jain wrote:


Hey people,

I have run into some issues regarding saving the k-means mllib model in
Spark SQL by converting to a schema RDD. This is what I am doing:

case class Model(id: String, model:
org.apache.spark.mllib.clustering.KMeansModel)
import sqlContext.createSchemaRDD
val rowRdd = sc.makeRDD(Seq("id", model)).map(p => Model("id", model))

This is the error that I get :

scala.MatchError: org.apache.spark.mllib.classification.ClassificationModel
(of class scala.reflect.internal.Types$TypeRef$anon$6)
  at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:53)
  at
org.apache.spark.sql.catalyst.ScalaReflection$anonfun$schemaFor$1.apply(ScalaReflection.scala:64)
  at
org.apache.spark.sql.catalyst.ScalaReflection$anonfun$schemaFor$1.apply(ScalaReflection.scala:62)
  at
scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:244)
  at
scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:62)
  at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:50)
  at
org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:44)
  at
org.apache.spark.sql.execution.ExistingRdd$.fromProductRdd(basicOperators.scala:229)
  at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:94)

Any help would be appreciated. Thanks!









Confused about shuffle read and shuffle write

2015-01-20 Thread Darin McBeath
I have the following code in a Spark Job.

// Get the baseline input file(s)
JavaPairRDD hsfBaselinePairRDDReadable = sc.hadoopFile(baselineInputBucketFile,
    SequenceFileInputFormat.class, Text.class, Text.class);
JavaPairRDD hsfBaselinePairRDD = hsfBaselinePairRDDReadable
    .mapToPair(new ConvertFromWritableTypes())
    .partitionBy(new HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS))
    .persist(StorageLevel.MEMORY_ONLY_SER());

// Use 'substring' to extract epoch values.
JavaPairRDD baselinePairRDD = hsfBaselinePairRDD
    .mapValues(new ExtractEpoch(accumBadBaselineRecords))
    .persist(StorageLevel.MEMORY_ONLY_SER());

When looking at the STAGE information for my job, I notice the following:

To construct hsfBaselinePairRDD, it takes about 7.5 minutes, with 931GB of
input (from S3) and 377GB of shuffle write (presumably because of the hash
partitioning).  This all makes sense.

To construct the baselinePairRDD, it also takes about 7.5 minutes.  I thought
that was a bit odd.  But what I thought was really odd is why there was also
330GB of shuffle read in this stage.  I would have thought there should be 0
shuffle read in this stage.

What I'm confused about is why there is even any 'shuffle read' when
constructing the baselinePairRDD.  If anyone could shed some light on this it
would be appreciated.

Thanks.
Darin.

Re: IF statement doesn't work in Spark-SQL?

2015-01-20 Thread Cheng Lian
|IF| is implemented as a generic UDF in Hive (|GenericUDFIf|). It seems 
that this function can’t be properly resolved. Could you provide a 
minimal code snippet that reproduces this issue?


Cheng

On 1/20/15 1:22 AM, Xuelin Cao wrote:



Hi,

  I'm trying to migrate some hive scripts to Spark-SQL. However, I 
found some statement is incompatible in Spark-sql.


Here is my SQL. And the same SQL works fine in HIVE environment.

SELECT
  if(ad_user_id>1000, 1000, ad_user_id) as user_id
FROM
  ad_search_keywords

 What I found is, the parser reports error on the "if" statement:

No function to evaluate expression. type: AttributeReference, tree: ad_user_id#4


 Anyone have any idea about this?



​


Re: Scala Spark SQL row object Ordinal Method Call Aliasing

2015-01-20 Thread Michael Armbrust
I use extractors:

sql("SELECT name, age FROM people").map {
  Row(name: String, age: Int) =>
...
}

On Tue, Jan 20, 2015 at 6:48 AM, Sunita Arvind 
wrote:

> The below is not exactly a solution to your question, but this is what we
> are doing. The first time, we do end up calling row.getString(), and we
> immediately pass it through a map function which aligns it to either a
> case class or a StructType. Then we register it as a table and use just
> column names. The Spark SQL wiki has good examples for this. It looks
> easier to manage to me than your solution below.
>
> I agree with you that when there are a lot of columns,
> row.getString() even once is not convenient.
>
> Regards
>
> Sunita
>
>
> On Tuesday, January 20, 2015, Night Wolf  wrote:
>
>> In Spark SQL we have Row objects which contain a list of fields that
>> make up a row. A Row has ordinal accessors such as .getInt(0) or
>> getString(2).
>>
>> Say ordinal 0 = ID and ordinal 1 = Name. It becomes hard to remember what
>> ordinal is what, making the code confusing.
>>
>> Say for example I have the following code
>>
>> def doStuff(row: Row) = {
>>   // extract some items from the row into a tuple
>>   (row.getInt(0), row.getString(1)) // tuple of ID, Name
>> }
>>
>> The question becomes how could I create aliases for these fields in a Row
>> object?
>>
>> I was thinking I could create methods which take an implicit Row object:
>>
>> def id(implicit row: Row) = row.getInt(0)
>> def name(implicit row: Row) = row.getString(1)
>>
>> I could then rewrite the above as:
>>
>> def doStuff(implicit row: Row) = {
>>   // extract some items from the row into a tuple
>>   (id, name) // tuple of ID, Name
>> }
>>
>> Is there a better/neater approach?
>>
>>
>> Cheers,
>>
>> ~NW
>>
>


Re: Spark SQL: Assigning several aliases to the output (several return values) of an UDF

2015-01-20 Thread Cheng Lian
Guess this can be helpful: 
http://stackoverflow.com/questions/14252615/stack-function-in-hive-how-to-specify-multiple-aliases


On 1/19/15 8:26 AM, mucks17 wrote:

Hello



I use Hive on Spark and have an issue with assigning several aliases to the
output (several return values) of a UDF. I ran into several issues and ended
up with a workaround (described at the end of this message).

- Is assigning several aliases to the output of a UDF not supported by
Spark SQL yet?

- Is there a smarter solution than the one I finally ended up with - see
(3)?



1)

The query with the following syntax is rejected due to the assignment of
multiple aliases.

Query

SELECT my_function(param_one, param_two) AS (return_one, return_two,
return_three)
FROM my_table;

Error

Unsupported language features in query: SELECT my_function(param_one,
param_two) AS (return_one, return_two, return_three)
FROM my_table;

TOK_QUERY
  TOK_FROM
    TOK_TABREF
      TOK_TABNAME
        my_table
  TOK_SELECT
    TOK_SELEXPR
      TOK_FUNCTION
        my_function
        TOK_TABLE_OR_COL
          param_one
        TOK_TABLE_OR_COL
          param_two
      return_one
      return_two
      return_three



2)

Because of this error I searched for a way to avoid assigning multiple
aliases to the UDF. I ended up having the following query and encountered
another error/issue.
Note: This error only occurs when having "c_0" in select clause. Only
selecting "c_1" and "c_2" works fine.

Query

SELECT return.c_0 AS return_one, return.c_1 AS return_two, return.c_2 AS
return_three FROM (SELECT my_function(param_one, param_two) FROM my_table)
return;

Error

java.lang.RuntimeException: Couldn't find c_0#504 in
[c_0#521L,c_1#522,c_2#523]



3)

My final (working) workaround is wrapping the actual query (the one with the
UDF) with an additional select statement.

Query

SELECT result.c_0 AS return_one, result.c_1 AS return_two, result.c_2 AS
return_three FROM(SELECT * FROM (SELECT my_function(param_one, param_two)
FROM my_table) return) result;

Error

No error :)



Kind regards
Max





Re: MLib: How to set preferences for ALS implicit feedback in Collaborative Filtering?

2015-01-20 Thread Xiangrui Meng
The assumption of the implicit feedback model is that the unobserved
ratings are more likely to be negative. So you may want to add some
negatives for evaluation. Otherwise, the input ratings are all 1 and
the test ratings are all 1 as well. The baseline predictor, which uses
the average rating (that is 1), could easily give you an RMSE of 0.0.
-Xiangrui
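
The arithmetic behind that remark can be checked with a small sketch in plain Python. The rating lists are made-up toy data and the rmse helper is written here for illustration; it is not MLlib's evaluation code.

```python
import math

def rmse(predictions, actuals):
    # Root-mean-square error between two equally long sequences.
    squared = [(p - a) ** 2 for p, a in zip(predictions, actuals)]
    return math.sqrt(sum(squared) / len(squared))

train_ratings = [1.0, 1.0, 1.0, 1.0]   # implicit feedback: every observed rating is 1
test_ratings = [1.0, 1.0, 1.0]

baseline = sum(train_ratings) / len(train_ratings)   # average rating = 1.0
rmse_all_ones = rmse([baseline] * len(test_ratings), test_ratings)
print(rmse_all_ones)   # 0.0

# With some unobserved pairs added to the test set as negatives (rating 0),
# the constant baseline no longer looks perfect.
test_with_negatives = test_ratings + [0.0, 0.0, 0.0]
rmse_with_negatives = rmse([baseline] * len(test_with_negatives), test_with_negatives)
print(rmse_with_negatives)
```

The second value is strictly positive, which is why evaluating only on observed pairs says little about the model.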




Re: How to create distributed matrixes from hive tables.

2015-01-20 Thread Xiangrui Meng
You can get a SchemaRDD from the Hive table, map it into an RDD of
Vectors, and then construct a RowMatrix. The transformations are lazy,
so there is no external storage requirement for intermediate data.
-Xiangrui

On Sun, Jan 18, 2015 at 4:07 AM, guxiaobo1982  wrote:
> Hi,
>
> We have large datasets in the data format for a Spark MLlib matrix, but they
> are pre-computed by Hive and stored inside Hive. My question is: can we
> create a distributed matrix such as IndexedRowMatrix directly from Hive
> tables, avoiding reading data from Hive tables and feeding them into an empty
> Matrix?
>
> Regards
>
>




Re: [SQL] Using HashPartitioner to distribute by column

2015-01-20 Thread Cheng Lian
First of all, even if the underlying dataset is partitioned as expected, 
a shuffle can’t be avoided, because Spark SQL knows nothing about the 
underlying data distribution. However, this does reduce network IO.


You can prepare your data like this (say |CustomerCode| is a string 
field with ordinal 1):


|val schemaRdd = sql(...)
val schema = schemaRdd.schema
val prepared = schemaRdd.keyBy(_.getString(1)).partitionBy(new HashPartitioner(n)).values.applySchema(schema)
|

|n| should be equal to |spark.sql.shuffle.partitions|.
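
The co-location idea behind hash partitioning can be sketched in plain Python. This is not Spark code and says nothing about Spark SQL's planner (which, as noted above, still shuffles); the row layout, the CRC32 hash and the helper names are all assumptions chosen for illustration.

```python
import zlib
from collections import defaultdict

def partition_for(key, num_partitions):
    """Deterministic hash partitioning: equal keys map to the same partition."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def partition_by_key(rows, key_index, num_partitions):
    """Distribute rows into partitions by the hash of one column."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[partition_for(row[key_index], num_partitions)].append(row)
    return dict(partitions)

def sum_cost_per_partition(partitions, key_index, cost_index):
    """GROUP BY key within each partition; no cross-partition merge is needed."""
    totals = {}
    for rows in partitions.values():
        local = defaultdict(float)
        for row in rows:
            local[row[key_index]] += row[cost_index]
        # A key can only appear in one partition, so this never overwrites.
        totals.update(local)
    return totals

# Hypothetical Orders rows: (OrderId, CustomerCode, Cost)
orders = [(1, "C1", 10.0), (2, "C2", 5.0), (3, "C1", 2.5), (4, "C3", 1.0), (5, "C2", 4.0)]
parts = partition_by_key(orders, key_index=1, num_partitions=4)
totals = sum_cost_per_partition(parts, key_index=1, cost_index=2)
print(totals)
```

Because every row for a given CustomerCode lands in one partition, the per-partition sums are already the final sums; that is the property that cuts network IO when the shuffle does run.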

Cheng

On 1/19/15 7:44 AM, Mick Davies wrote:


Is it possible to use a HashPartitioner or something similar to distribute a
SchemaRDD's data by the hash of a particular column or set of columns?

Having done this I would then hope that GROUP BY could avoid a shuffle.

E.g. set up a HashPartitioner on the CustomerCode field so that

SELECT CustomerCode, SUM(Cost)
FROM Orders
GROUP BY CustomerCode

would not need to shuffle.

Cheers
Mick





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SQL-Using-HashPartitioner-to-distribute-by-column-tp21237.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




Re: Saving a mllib model in Spark SQL

2015-01-20 Thread Xiangrui Meng
You can save the cluster centers as a SchemaRDD of two columns (id:
Int, center: Array[Double]). When you load it back, you can construct
the k-means model from its cluster centers. -Xiangrui
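
A rough sketch of that round trip in plain Python, with JSON standing in for the two-column SchemaRDD. The helper names and the toy centers are hypothetical; only the idea of storing (id, center) rows and rebuilding the model from them comes from the message above.

```python
import json
import math

def centers_to_rows(centers):
    """Flatten a model into plain (id, center) rows that any tabular store can hold."""
    return [{"id": i, "center": list(c)} for i, c in enumerate(centers)]

def rows_to_centers(rows):
    """Rebuild the ordered list of centers from stored rows."""
    return [row["center"] for row in sorted(rows, key=lambda r: r["id"])]

def predict(centers, point):
    """Index of the closest center by Euclidean distance."""
    distances = [math.dist(c, point) for c in centers]
    return distances.index(min(distances))

centers = [[0.0, 0.0], [10.0, 10.0]]
stored = json.dumps(centers_to_rows(centers))
restored = rows_to_centers(json.loads(stored))
print(predict(restored, [9.0, 8.5]))
```

Since a k-means model is fully described by its centers, nothing else needs to be persisted for prediction.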

On Tue, Jan 20, 2015 at 11:55 AM, Cheng Lian  wrote:
> This is because KMeansModel is neither a built-in type nor a user-defined
> type recognized by Spark SQL. I think you can write your own UDT version of
> KMeansModel in this case. You may refer to o.a.s.mllib.linalg.Vector and
> o.a.s.mllib.linalg.VectorUDT as an example.
>
> Cheng
>
> On 1/20/15 5:34 AM, Divyansh Jain wrote:
>
> Hey people,
>
> I have run into some issues regarding saving the k-means mllib model in
> Spark SQL by converting to a schema RDD. This is what I am doing:
>
> case class Model(id: String, model:
> org.apache.spark.mllib.clustering.KMeansModel)
> import sqlContext.createSchemaRDD
> val rowRdd = sc.makeRDD(Seq("id", model)).map(p => Model("id", model))
>
> This is the error that I get :
>
> scala.MatchError: org.apache.spark.mllib.classification.ClassificationModel
> (of class scala.reflect.internal.Types$TypeRef$anon$6)
>   at
> org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:53)
>   at
> org.apache.spark.sql.catalyst.ScalaReflection$anonfun$schemaFor$1.apply(ScalaReflection.scala:64)
>   at
> org.apache.spark.sql.catalyst.ScalaReflection$anonfun$schemaFor$1.apply(ScalaReflection.scala:62)
>   at
> scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:244)
>   at
> scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at
> org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:62)
>   at
> org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:50)
>   at
> org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:44)
>   at
> org.apache.spark.sql.execution.ExistingRdd$.fromProductRdd(basicOperators.scala:229)
>   at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:94)
>
> Any help would be appreciated. Thanks!
>
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Saving-a-mllib-model-in-Spark-SQL-tp21264.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkSQL schemaRDD & MapPartitions calls - performance issues - columnar formats?

2015-01-20 Thread Cheng Lian

On 1/15/15 11:26 PM, Nathan McCarthy wrote:


Thanks Cheng!

Is there any API I can get access to (e.g. ParquetTableScan) which 
would allow me to load up the low-level/base RDD of just RDD[Row] so I 
could avoid the defensive copy (and maybe lose out on columnar storage etc.)?


We have parts of our pipeline using SparkSQL/SchemaRDDs and others 
using the core RDD api (mapPartitions etc.). Any tips?


(Michael has already answered this)



Out of curiosity, a lot of SparkSQL functions seem to run in a 
mapPartitions (e.g. Distinct). Does a defensive copy happen there too?


Only if necessary. For example, Sort does a defensive copy as it needs 
to cache rows for sorting.




Keen to get the best performance and the best blend of SparkSQL and 
functional Spark.


Cheers,
Nathan

From: Cheng Lian <lian.cs@gmail.com>
Date: Monday, 12 January 2015 1:21 am
To: Nathan, Michael Armbrust <mich...@databricks.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance 
issues - columnar formats?



On 1/11/15 1:40 PM, Nathan McCarthy wrote:

Thanks Cheng & Michael! Makes sense. Appreciate the tips!

Idiomatic scala isn't performant. I’ll definitely start using while 
loops or tail recursive methods. I have noticed this in the spark 
code base.


I might try turning off columnar compression (via 
spark.sql.inMemoryColumnarStorage.compressed=false, correct?) and 
see how performance compares to the primitive objects. Would you 
expect to see similar runtimes vs the primitive objects? We do have 
the luxury of lots of memory at the moment so this might give us an 
additional performance boost.

Turning off compression should be faster, but still slower than 
directly using primitive objects, because Spark SQL also serializes 
all objects within a column into byte buffers in a compact format. 
However, this radically reduces the number of Java objects in the heap and 
is more GC friendly. When running large queries, the cost introduced by GC 
can be significant.


Regarding the defensive copying of row objects: can we switch this 
off and just be aware of the risks? Is mapPartitions on SchemaRDDs 
and operating on the Row object the most performant way to be 
flipping between SQL & Scala user code? Is there anything else I 
could be doing?

This can be very dangerous and error prone. Whenever an operator tries 
to cache row objects, turning off defensive copying can produce 
wrong query results. For example, sort-based shuffle caches rows to do 
sorting. In some cases, the sample operator may also cache row objects. 
This is very implementation specific and may change between versions.
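
A small plain-Python sketch of why caching rows without a copy can go wrong. It assumes the upstream iterator reuses one mutable row object and overwrites it in place, which is the situation a defensive copy protects against; the iterator and the data are made up for illustration and are not Spark internals.

```python
def reusing_row_iterator(records):
    """Yield the SAME mutable list each time, overwritten in place (no copy)."""
    row = [None, None]
    for hour, sales in records:
        row[0], row[1] = hour, sales
        yield row

records = [(9, 10.0), (10, 20.0), (11, 30.0)]

# An operator that caches rows without copying ends up with N references
# to one object, all showing the last record.
cached_without_copy = list(reusing_row_iterator(records))

# A defensive copy per row costs an allocation but keeps each row's values.
cached_with_copy = [list(row) for row in reusing_row_iterator(records)]

print(cached_without_copy)
print(cached_with_copy)
```

Operators that only stream over rows never notice the reuse, which is why the copy is needed only where rows are held on to.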


Cheers,
~N

From: Michael Armbrust
Date: Saturday, 10 January 2015 3:41 am
To: Cheng Lian <lian.cs@gmail.com>
Cc: Nathan, "user@spark.apache.org"
Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance 
issues - columnar formats?


The other thing to note here is that Spark SQL defensively copies 
rows when we switch into user code.  This probably explains the 
difference between 1 & 2.


The difference between 1 & 3 is likely the cost of decompressing the 
column buffers vs. accessing a bunch of uncompressed primitive objects.


On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian wrote:


Hey Nathan,

Thanks for sharing, this is a very interesting post :) My
comments are inlined below.

Cheng

On 1/7/15 11:53 AM, Nathan McCarthy wrote:

Hi,

I’m trying to use a combination of SparkSQL and ‘normal'
Spark/Scala via rdd.mapPartitions(…). Using the latest release
1.2.0.

Simple example; load up some sample data from parquet on HDFS
(about 380m rows, 10 columns) on a 7 node cluster.

val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
t.registerTempTable("test1")
sqlC.cacheTable("test1")

Now let's do some operations on it; I want the total sales &
quantities sold for each hour in the day so I choose 3 out of
the 10 possible columns...

sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group
by Hour").collect().foreach(println)

After the table has been 100% cached in memory, this takes
around 11 seconds.

Let's do the same thing but via a mapPartitions call (this isn’t
production-ready code but gets the job done).

val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
rddPC.mapPartitions { case hrs =>
  val qtySum = new Array[Double](24)
  val salesSum = new Array[Double](24)

  for (r <- hrs) {
    val hr = r.getInt(0)
    qtySum(hr) += r.getDouble(1)
    salesSum(hr) += r.getDouble(2)
  }
  (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator

Re: Spark Sql reading whole table from cache instead of required coulmns

2015-01-20 Thread Cheng Lian

Hey Surbhit,

In this case, the web UI stats are not accurate. Please refer to this 
thread for an explanation: 
https://www.mail-archive.com/user@spark.apache.org/msg18919.html


Cheng

On 1/13/15 1:46 AM, Surbhit wrote:

Hi,

I am using spark 1.1.0.
I am using the spark-sql shell to run all the below queries.

I have created an external parquet table using the below SQL:

create external table daily (<15 column names>)
ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
LOCATION '';

Then I cache the table using the following set of commands:

set spark.sql.inMemoryColumnarStorage.compressed=true;
cache table daily;
select count(*) from daily;

The in-memory size of this table after caching is ~40 G. Complete table gets
cached in memory.

Now when I run a simple query which involves only one of the 15 columns of
this table, the whole table(~40 G) is read from the cache instead of just
one column as shown by the spark web UI. A sample query that I fired after
caching the table is:

select count(distinct col1) from daily;

I expect that only the required column should be read from the cache as the
data is stored in columnar format in cache.

Can someone please tell me if my expectation is correct? And if yes, then
what am I missing here: any configuration or something which will give me
the desired result?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Sql-reading-whole-table-from-cache-instead-of-required-coulmns-tp21113.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Saving a mllib model in Spark SQL

2015-01-20 Thread Cheng Lian
Yeah, as Michael said, I forgot that UDT is not a public API. Xiangrui's 
suggestion makes more sense.


Cheng

On 1/20/15 12:49 PM, Xiangrui Meng wrote:

You can save the cluster centers as a SchemaRDD of two columns (id:
Int, center: Array[Double]). When you load it back, you can construct
the k-means model from its cluster centers. -Xiangrui

On Tue, Jan 20, 2015 at 11:55 AM, Cheng Lian  wrote:

This is because KMeansModel is neither a built-in type nor a user-defined
type recognized by Spark SQL. I think you can write your own UDT version of
KMeansModel in this case. You may refer to o.a.s.mllib.linalg.Vector and
o.a.s.mllib.linalg.VectorUDT as an example.

Cheng

On 1/20/15 5:34 AM, Divyansh Jain wrote:

Hey people,

I have run into some issues regarding saving the k-means mllib model in
Spark SQL by converting to a schema RDD. This is what I am doing:

case class Model(id: String, model:
org.apache.spark.mllib.clustering.KMeansModel)
import sqlContext.createSchemaRDD
val rowRdd = sc.makeRDD(Seq("id", model)).map(p => Model("id", model))

This is the error that I get :

scala.MatchError: org.apache.spark.mllib.classification.ClassificationModel
(of class scala.reflect.internal.Types$TypeRef$anon$6)
  at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:53)
  at
org.apache.spark.sql.catalyst.ScalaReflection$anonfun$schemaFor$1.apply(ScalaReflection.scala:64)
  at
org.apache.spark.sql.catalyst.ScalaReflection$anonfun$schemaFor$1.apply(ScalaReflection.scala:62)
  at
scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:244)
  at
scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:62)
  at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:50)
  at
org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:44)
  at
org.apache.spark.sql.execution.ExistingRdd$.fromProductRdd(basicOperators.scala:229)
  at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:94)

Any help would be appreciated. Thanks!







--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Saving-a-mllib-model-in-Spark-SQL-tp21264.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Support for SQL on unions of tables (merge tables?)

2015-01-20 Thread Cheng Lian
I think you can resort to a Hive table partitioned by date 
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables
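
To show what a date-partitioned layout buys for a date-range query, here is a plain-Python sketch. The directory names, the "bytes" field and the helper functions are hypothetical; the sketch only mimics partition pruning and is not Hive or Spark SQL code.

```python
from datetime import date

# Hypothetical layout: one directory per day, named like a Hive partition.
partitions = {
    "dt=2015-01-09": [{"bytes": 120}, {"bytes": 30}],
    "dt=2015-01-10": [{"bytes": 200}],
    "dt=2015-01-11": [{"bytes": 50}, {"bytes": 25}],
    "dt=2015-01-12": [{"bytes": 75}],
}

def partition_date(name):
    """Parse 'dt=YYYY-MM-DD' into a date."""
    return date.fromisoformat(name.split("=", 1)[1])

def sum_field(partitions, field, start, end):
    """Sum a field over partitions whose date is in [start, end], skipping the rest."""
    selected = [n for n in partitions if start <= partition_date(n) <= end]
    total = sum(rec[field] for n in selected for rec in partitions[n])
    return sorted(selected), total

scanned, total = sum_field(partitions, "bytes", date(2015, 1, 10), date(2015, 1, 11))
print(scanned, total)
```

Adding a new day is then just adding a new partition, which covers the "update once per day" part of the merge-table workflow described below.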


On 1/11/15 9:51 PM, Paul Wais wrote:


Dear List,

What are common approaches for addressing over a union of tables / 
RDDs?  E.g. suppose I have a collection of log files in HDFS, one log 
file per day, and I want to compute the sum of some field over a date 
range in SQL.  Using log schema, I can read each as a distinct 
SchemaRDD, but I want to union them all and query against one 'table'.


If this data were in MySQL, I could have a table for each day of data 
and use a MyISAM merge table to union these tables together and just 
query against the merge table.  What's nice here is that MySQL 
persists the merge table, and the merge table is r/w, so one can just 
update the merge table once per day.  (What's not nice is that merge 
tables scale poorly, backup admin is a pain, and oh hey I'd like to 
use Spark not MySQL).


One naive and untested idea (that achieves implicit persistence): scan 
an HDFS directory for log files, create one RDD per file, union() the 
RDDs, then create a Schema RDD from that union().


A few specific questions:
 * Any good approaches to a merge / union table? (Other than the naive 
idea above).  Preferably with some way to persist that table / RDD 
between Spark runs.  (How does Impala approach this problem?)


 * Has anybody tried joining against such a union of tables / RDDs on 
a very large amount of data?  When I've tried (non-spark-sql) 
union()ing Sequence Files, and then join()ing them against another 
RDD, Spark seems to try to compute the full union before doing any 
join() computation (and eventually OOMs the cluster because the union 
of Sequence Files is so big). I haven't tried something similar with 
Spark SQL.


 * Are there any plans related to this in the Spark roadmap?  (This 
feature would be a nice complement to, say, persistent RDD indices for 
interactive querying).


 * Related question: are there plans to use Parquet Index Pages to 
make Spark SQL faster?  E.g. log indices over date ranges would be 
relevant here.


All the best,
-Paul





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Aggregate order semantics when spilling

2015-01-20 Thread Justin Uang
Hi Andrew,

Thanks for your response! For our use case, we aren't actually grouping,
but rather updating running aggregates. I just picked grouping because it
made the example easier to write out. However, when we merge combiners, the
combiners have to have data that are adjacent to each other in the original
partition.

I feel that requiring groupByKey/cogroup to insert values into the correct
place is quite expensive, and may not be possible for combiners that are
trying to collapse down the data while assuming order. Would it be really
expensive or perilous to the API if we just had combineByKey merge
combiners in the same order as the data slices they represent? I have a
very simple prototype that adds this additional semantic (it's only a 16
line diff):

https://github.com/justinuang/spark/commit/b92ee6a6dbf70207eca68296289cb62c3cea76b8

It looks like the additional comparison has a trivial runtime cost, and this
doesn't break backward compatibility.
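
The ordering idea can be sketched in plain Python. This is not the linked diff and not ExternalAppendOnlyMap; the function name, the dict-per-spill representation and the use of the key itself (rather than its hash) are assumptions made to keep the example small.

```python
import heapq

def merge_spills(spills):
    """Merge per-spill {key: [values]} maps, keeping spill order within each key.

    Each heap entry is (key, spill_index), so for equal keys the combiner
    from the earlier spill is always popped and merged first.
    """
    heap = [(key, idx) for idx, spill in enumerate(spills) for key in spill]
    heapq.heapify(heap)
    merged = {}
    while heap:
        key, idx = heapq.heappop(heap)
        merged.setdefault(key, []).extend(spills[idx][key])
    return merged

# Spill 0 holds the earlier time slices, spill 1 the later ones.
spills = [
    {"A": [1, 2], "B": [4]},
    {"A": [3, 4], "B": [6]},
]
print(merge_spills(spills))
```

The only extra work compared with ordering by key alone is the tie-break on the spill index, which matches the claim that the added comparison is cheap.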

Thanks,

Justin

On Tue, Jan 20, 2015 at 8:03 PM, Andrew Or  wrote:

> Hi Justin,
>
> I believe the intended semantics of groupByKey or cogroup is that the
> ordering *within a key* is not preserved if you spill. In fact, the test
> cases for the ExternalAppendOnlyMap only assert that the Set representation
> of the results is as expected (see this line).
> This is because these Spark primitives literally just group the values by
> key but do not provide any ordering guarantees.
>
> However, if ordering within a key is a requirement for your application,
> then you may need to write your own PairRDDFunction that calls
> combineByKey. You can model your method after groupByKey, but change the
> combiner function slightly to take ordering into account. This may add some
> overhead to your application since you need to insert every value in the
> appropriate place, but since you're spilling anyway the overhead will
> likely be shadowed by disk I/O.
>
> Let me know if that works.
> -Andrew
>
>
> 2015-01-20 9:18 GMT-08:00 Justin Uang :
>
> Hi,
>>
>> I am trying to aggregate a key based on some timestamp, and I believe
>> that spilling to disk is changing the order of the data fed into the
>> combiner.
>>
>> I have some timeseries data that is of the form: ("key", "date", "other
>> data")
>>
>> Partition 1
>> ("A", 2, ...)
>> ("B", 4, ...)
>> ("A", 1, ...)
>> ("A", 3, ...)
>> ("A", 4, ...)
>> ("B", 6, ...)
>>
>> which I then partition by key, then sort within the partition:
>>
>> Partition 1
>> ("A", 1, ...)
>> ("A", 2, ...)
>> ("A", 3, ...)
>> ("A", 4, ...)
>>
>> Partition 2
>> ("B", 4, ...)
>> ("B", 6, ...)
>>
>> If I run a combineByKey with the same partitioner, then the items for
>> each key will be fed into the ExternalAppendOnlyMap in the correct order.
>> However, if I spill, then the time slices are spilled to disk as multiple
>> partial combiners. When it's time to merge the spilled combiners for each
>> key, the combiners are combined in the wrong order.
>>
>> For example, if during a groupByKey, [("A", 1, ...), ("A", 2...)] and
>> [("A", 3, ...), ("A", 4, ...)] are spilled separately, it's possible that
>> the combiners can be combined in the wrong order, like [("A", 3, ...),
>> ("A", 4, ...), ("A", 1, ...), ("A", 2, ...)], which invalidates the
>> invariant that all the values for A are passed in order to the combiners.
>>
>> I'm not an expert, but I suspect that this is because we use a heap
>> ordered by key when iterating, which doesn't retain the order of the spilled
>> combiners. Perhaps we can order our mergeHeap by (hash_key, spill_index),
>> where spill_index is incremented each time we spill? This would mean that
>> we would pop and merge the combiners of each key in order, resulting in
>> [("A", 1, ...), ("A", 2, ...), ("A", 3, ...), ("A", 4, ...)].
>>
>> Thanks in advance for the help! If there is a way to do this already in
>> Spark 1.2, can someone point it out to me?
>>
>> Best,
>>
>> Justin
>>
>
>


Re: Error for first run from iPython Notebook

2015-01-20 Thread Felix C
+1. I can confirm this. It says collect fails in Py4J
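
The Python traceback quoted below ends in "TypeError: 'module' object is not callable" inside sample(). One possible cause, offered as an assumption since the notebook's import line is not shown in the post, is that the notebook ran "import random" and then called random() as if it were the function. A minimal sketch without Spark (written for Python 3, whereas the post uses Python 2):

```python
import random as random_module   # the module itself, not the function
from random import random        # the function the sample needs

def sample(_):
    """Return 1 if a random point in the unit square falls inside the unit circle."""
    x, y = random(), random()
    return 1 if x * x + y * y < 1 else 0

def estimate_pi(n):
    """Monte Carlo estimate of pi from n samples."""
    count = sum(sample(i) for i in range(n))
    return 4.0 * count / n

# Calling the module instead of the function raises the same kind of TypeError.
try:
    random_module()
    module_call_error = None
except TypeError as exc:
    module_call_error = str(exc)

print(module_call_error)
print("Pi is roughly %f" % estimate_pi(100000))
```

If that is the cause, importing the function explicitly in the notebook before defining sample() should be enough, because the function is pickled and shipped to the workers with whatever name "random" was bound to.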

--- Original Message ---

From: "Dave" 
Sent: January 20, 2015 6:49 AM
To: user@spark.apache.org
Subject: Re: Error for first run from iPython Notebook

Not sure if anyone who can help has seen this. Any suggestions would be
appreciated, thanks!

On Mon Jan 19 2015 at 1:50:43 PM Dave  wrote:

> Hi,
>
> I've setup my first spark cluster (1 master, 2 workers) and an iPython
> notebook server that I'm trying to setup to access the cluster. I'm running
> the workers from Anaconda to make sure the python setup is correct on each
> box. The iPy notebook server appears to have everything setup correctly,
> and I'm able to initialize Spark and push a job out. However, the job is
> failing, and I'm not sure how to troubleshoot. Here's the code:
>
> from pyspark import SparkContext
> CLUSTER_URL = 'spark://192.168.1.20:7077'
> sc = SparkContext( CLUSTER_URL, 'pyspark')
> def sample(p):
>     x, y = random(), random()
>     return 1 if x*x + y*y < 1 else 0
>
> count = sc.parallelize(xrange(0, 20)).map(sample).reduce(lambda a, b: a +
> b)
> print "Pi is roughly %f" % (4.0 * count / 20)
>
>
> And here's the error:
>
> Py4JJavaError                             Traceback (most recent call last)
> in ()
>       3     return 1 if x*x + y*y < 1 else 0
>       4
> ----> 5 count = sc.parallelize(xrange(0, 20)).map(sample).reduce(lambda a, b: a + b)
>       6 print "Pi is roughly %f" % (4.0 * count / 20)
>
> /opt/spark-1.2.0/python/pyspark/rdd.pyc in reduce(self, f)
>     713             yield reduce(f, iterator, initial)
>     714
> --> 715         vals = self.mapPartitions(func).collect()
>     716         if vals:
>     717             return reduce(f, vals)
>
> /opt/spark-1.2.0/python/pyspark/rdd.pyc in collect(self)
>     674         """
>     675         with SCCallSiteSync(self.context) as css:
> --> 676             bytesInJava = self._jrdd.collect().iterator()
>     677         return list(self._collect_iterator_through_file(bytesInJava))
>     678
>
> /opt/spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
>     536         answer = self.gateway_client.send_command(command)
>     537         return_value = get_return_value(answer, self.gateway_client,
> --> 538                 self.target_id, self.name)
>     539
>     540         for temp_arg in temp_args:
>
> /opt/spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                     'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                     format(target_id, '.', name), value)
>     301             else:
>     302                 raise Py4JError(
>
> Py4JJavaError: An error occurred while calling o28.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 31 
> in stage 0.0 failed 4 times, most recent failure: Lost task 31.3 in stage 0.0 
> (TID 72, 192.168.1.21): org.apache.spark.api.python.PythonException: 
> Traceback (most recent call last):
>   File "/opt/spark-1.2.0/python/pyspark/worker.py", line 107, in main
> process()
>   File "/opt/spark-1.2.0/python/pyspark/worker.py", line 98, in process
> serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/opt/spark-1.2.0/python/pyspark/serializers.py", line 227, in 
> dump_stream
> vs = list(itertools.islice(iterator, batch))
>   File "/opt/spark-1.2.0/python/pyspark/rdd.py", line 710, in func
> initial = next(iterator)
>   File "", line 2, in sample
> TypeError: 'module' object is not callable
>
>   at 
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
>   at 
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>   at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(A

RE: Can I save RDD to local file system and then read it back on spark cluster with multiple nodes?

2015-01-20 Thread Mohammed Guller
I don’t think it will work without HDFS.

Mohammed

From: Wang, Ningjun (LNG-NPV) [mailto:ningjun.w...@lexisnexis.com]
Sent: Tuesday, January 20, 2015 7:55 AM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: RE: Can I save RDD to local file system and then read it back on spark 
cluster with multiple nodes?

Can anybody answer this? Do I have to have hdfs to achieve this?

Regards,

Ningjun Wang
Consulting Software Engineer
LexisNexis
121 Chanlon Road
New Providence, NJ 07974-1541

From: Wang, Ningjun (LNG-NPV) [mailto:ningjun.w...@lexisnexis.com]
Sent: Friday, January 16, 2015 1:15 PM
To: Imran Rashid
Cc: user@spark.apache.org
Subject: RE: Can I save RDD to local file system and then read it back on spark 
cluster with multiple nodes?

I need to save an RDD to the file system and then restore my RDD from the file system 
in the future. I don’t have an HDFS file system and don’t want to go through the 
hassle of setting one up. So how can I achieve this? The application 
needs to run on a cluster with multiple nodes.

Regards,

Ningjun

From: imranra...@gmail.com 
[mailto:imranra...@gmail.com] On Behalf Of Imran Rashid
Sent: Friday, January 16, 2015 12:14 PM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: Can I save RDD to local file system and then read it back on spark 
cluster with multiple nodes?


I'm not positive, but I think this is very unlikely to work.

First, when you call sc.objectFile(...), I think the *driver* will need to 
know something about the file, eg to know how many tasks to create.  But it 
won't even be able to see the file, since it only lives on the local filesystem 
of the cluster nodes.

If you really wanted to, you could probably write out some small metadata about 
the files and write your own version of objectFile that uses it.  But I think 
there is a bigger conceptual issue.  You might not in general be sure that you 
are running on the same nodes when you save the file, as when you read it back 
in.  So the file might not be present on the local filesystem for the active 
executors.  You might be able to guarantee it for the specific cluster setup 
you have now, but it might limit you down the road.

What are you trying to achieve?  There might be a better way.  I believe 
writing to hdfs will usually write one local copy, so you'd still be doing a 
local read when you reload the data.

Imran
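
The concern above can be pictured with a small plain-Python simulation. It does not use Spark: the three "nodes" are just separate temporary directories, and the assignment of partitions to nodes is made up. It only illustrates that a file:// path means a different directory on each machine, so no single process sees all the part files.

```python
import os
import tempfile

def save_partitions(node_dirs, partitions_by_node, name):
    """Each simulated node writes only its own partitions under its local dir."""
    for node, parts in partitions_by_node.items():
        out = os.path.join(node_dirs[node], name)
        os.makedirs(out, exist_ok=True)
        for index, records in parts.items():
            with open(os.path.join(out, "part-%05d" % index), "w") as f:
                f.write("\n".join(records))

def visible_parts(node_dir, name):
    """Part files a process on one node can see at that local path."""
    path = os.path.join(node_dir, name)
    return sorted(os.listdir(path)) if os.path.isdir(path) else []

with tempfile.TemporaryDirectory() as root:
    # Three hypothetical nodes, each with its own private /home/data.
    node_dirs = {n: os.path.join(root, n, "home", "data") for n in ("node1", "node2", "node3")}
    save_partitions(
        node_dirs,
        {"node1": {0: ["a"]}, "node2": {1: ["b"], 2: ["c"]}, "node3": {}},
        "rdd1",
    )
    seen = {n: visible_parts(d, "rdd1") for n, d in node_dirs.items()}

print(seen)
```

A shared filesystem (HDFS, or a network mount visible at the same path on every node) removes the problem because all part files then live under one path that the driver and every executor can read.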
On Jan 16, 2015 6:19 AM, "Wang, Ningjun (LNG-NPV)" 
<ningjun.w...@lexisnexis.com> wrote:
I have asked this question before but got no answer. Asking again.

Can I save RDD to the local file system and then read it back on a spark 
cluster with multiple nodes?

rdd.saveAsObjectFile("file:///home/data/rdd1")

val rdd2 = sc.objectFile("file:///home/data/rdd1")

This works if the cluster has only one node. But my cluster has 3 nodes 
and each node has a local dir called /home/data. Is the RDD saved to the local dir 
across all 3 nodes? If so, is sc.objectFile(…) smart enough to read the local dir 
on all 3 nodes and merge them into a single RDD?

Ningjun



Spark 1.2 - com/google/common/base/Preconditions java.lang.NoClassDefFoundErro

2015-01-20 Thread Shailesh Birari
Hello,

I recently upgraded my setup from Spark 1.1 to Spark 1.2.
My existing applications are working fine on the Ubuntu cluster.
But when I try to execute a Spark MLlib application from Eclipse (on a Windows
node), it throws java.lang.NoClassDefFoundError:
com/google/common/base/Preconditions.

Note:
   1. With Spark 1.1 this was working fine.
   2. The Spark 1.2 jar files are linked in the Eclipse project.
   3. I checked the jar -tf output and found that com.google.common.base
is not present.

-Exception
log:

Exception in thread "main" java.lang.NoClassDefFoundError:
com/google/common/base/Preconditions
at
org.apache.spark.network.client.TransportClientFactory.<init>(TransportClientFactory.java:94)
at
org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:77)
at
org.apache.spark.network.netty.NettyBlockTransferService.init(NettyBlockTransferService.scala:62)
at 
org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:194)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:340)
at
org.apache.spark.examples.mllib.TallSkinnySVD$.main(TallSkinnySVD.scala:74)
at 
org.apache.spark.examples.mllib.TallSkinnySVD.main(TallSkinnySVD.scala)
Caused by: java.lang.ClassNotFoundException:
com.google.common.base.Preconditions
at java.net.URLClassLoader$1.run(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
... 7 more

-

jar -tf output:


consb2@CONSB2A
/cygdrive/c/SB/spark-1.2.0-bin-hadoop2.4/spark-1.2.0-bin-hadoop2.4/lib
$ jar -tf spark-assembly-1.2.0-hadoop2.4.0.jar | grep Preconditions
org/spark-project/guava/common/base/Preconditions.class
org/spark-project/guava/common/math/MathPreconditions.class
com/clearspring/analytics/util/Preconditions.class
parquet/Preconditions.class
com/google/inject/internal/util/$Preconditions.class

---

Please help me in resolving this.

Thanks,
  Shailesh




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-com-google-common-base-Preconditions-java-lang-NoClassDefFoundErro-tp21271.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.2 - com/google/common/base/Preconditions java.lang.NoClassDefFoundErro

2015-01-20 Thread Sean Owen
Guava is shaded in Spark 1.2+. It looks like you are mixing versions
of Spark then, with some that still refer to unshaded Guava. Make sure
you are not packaging Spark with your app and that you don't have
other versions lying around.
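
A quick way to tell shaded from unshaded Guava in a jar is to look at its entry names, which is what the jar -tf output in the original post does. The sketch below does the same with Python's zipfile module on a tiny in-memory jar; the helper names are hypothetical and the entries are copied from that jar -tf output, so this is an illustration of the check rather than a diagnosis of the poster's classpath.

```python
import io
import zipfile

UNSHADED = "com/google/common/base/Preconditions.class"
SHADED = "org/spark-project/guava/common/base/Preconditions.class"

def guava_status(jar_bytes):
    """Report whether a jar carries unshaded and/or shaded Guava Preconditions."""
    with zipfile.ZipFile(io.BytesIO(jar_bytes)) as jar:
        names = set(jar.namelist())
    return {"unshaded": UNSHADED in names, "shaded": SHADED in names}

def make_jar(entries):
    """Build a tiny in-memory jar (a zip) with empty class files, for the demo."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as jar:
        for name in entries:
            jar.writestr(name, b"")
    return buffer.getvalue()

# Entries taken from the jar -tf | grep Preconditions output in the post.
assembly_like = make_jar([
    SHADED,
    "org/spark-project/guava/common/math/MathPreconditions.class",
    "parquet/Preconditions.class",
])
print(guava_status(assembly_like))
```

Running the same check over every jar on the Eclipse build path would show whether some other jar still expects the unshaded com.google.common package.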

On Tue, Jan 20, 2015 at 11:55 PM, Shailesh Birari  wrote:
> Hello,
>
> I recently upgraded my setup from Spark 1.1 to Spark 1.2.
> My existing applications are working fine on ubuntu cluster.
> But, when I try to execute Spark MLlib application from Eclipse (Windows
> node) it gives java.lang.NoClassDefFoundError:
> com/google/common/base/Preconditions exception.
>
> Note,
>1. With Spark 1.1 this was working fine.
>2. The Spark 1.2 jar files are linked in Eclipse project.
>3. Checked the jar -tf output and found the above com.google.common.base
> is not present.
>
> -Exception
> log:
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> com/google/common/base/Preconditions
> at
> org.apache.spark.network.client.TransportClientFactory.<init>(TransportClientFactory.java:94)
> at
> org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:77)
> at
> org.apache.spark.network.netty.NettyBlockTransferService.init(NettyBlockTransferService.scala:62)
> at 
> org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:194)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:340)
> at
> org.apache.spark.examples.mllib.TallSkinnySVD$.main(TallSkinnySVD.scala:74)
> at 
> org.apache.spark.examples.mllib.TallSkinnySVD.main(TallSkinnySVD.scala)
> Caused by: java.lang.ClassNotFoundException:
> com.google.common.base.Preconditions
> at java.net.URLClassLoader$1.run(Unknown Source)
> at java.net.URLClassLoader$1.run(Unknown Source)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(Unknown Source)
> at java.lang.ClassLoader.loadClass(Unknown Source)
> at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
> at java.lang.ClassLoader.loadClass(Unknown Source)
> ... 7 more
>
> -
>
> jar -tf output:
>
>
> consb2@CONSB2A
> /cygdrive/c/SB/spark-1.2.0-bin-hadoop2.4/spark-1.2.0-bin-hadoop2.4/lib
> $ jar -tf spark-assembly-1.2.0-hadoop2.4.0.jar | grep Preconditions
> org/spark-project/guava/common/base/Preconditions.class
> org/spark-project/guava/common/math/MathPreconditions.class
> com/clearspring/analytics/util/Preconditions.class
> parquet/Preconditions.class
> com/google/inject/internal/util/$Preconditions.class
>
> ---
>
> Please help me in resolving this.
>
> Thanks,
>   Shailesh
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-com-google-common-base-Preconditions-java-lang-NoClassDefFoundErro-tp21271.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>




Re: Spark 1.2 - com/google/common/base/Preconditions java.lang.NoClassDefFoundErro

2015-01-20 Thread Ted Yu
Please also see this thread:
http://search-hadoop.com/m/JW1q5De7pU1

On Tue, Jan 20, 2015 at 3:58 PM, Sean Owen  wrote:

> Guava is shaded in Spark 1.2+. It looks like you are mixing versions
> of Spark then, with some that still refer to unshaded Guava. Make sure
> you are not packaging Spark with your app and that you don't have
> other versions lying around.


Re: Can I save RDD to local file system and then read it back on spark cluster with multiple nodes?

2015-01-20 Thread Davies Liu
If the dataset is not huge (a few GB), you can set up NFS instead of
HDFS (which is much harder to set up):

1. export a directory on the master (or any node in the cluster)
2. mount it at the same path on all slaves
3. read/write from it via file:///path/to/mountpoint
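
The steps above can be sketched from the PySpark side as follows. This is a rough illustration only: it assumes the share is mounted at the same path on every node, and the names shared_uri and save_and_reload are hypothetical helpers. saveAsPickleFile and pickleFile are the PySpark counterparts of the saveAsObjectFile/objectFile calls quoted below.

```python
from pathlib import PurePosixPath


def shared_uri(mount_point, name):
    """Build a file:// URI for a dataset stored under the shared mount point."""
    # as_uri() raises ValueError for relative paths, which catches a
    # mistyped mount point before any Spark job is started.
    return PurePosixPath(mount_point, name).as_uri()


def save_and_reload(sc, rdd, uri):
    """Write rdd under uri, then read it back through the same URI.

    sc is an existing SparkContext. Every node must see the same
    directory behind uri, which is what the NFS mount provides.
    """
    rdd.saveAsPickleFile(uri)
    return sc.pickleFile(uri)
```

For example, shared_uri("/mnt/shared", "rdd1") gives file:///mnt/shared/rdd1, where /mnt/shared stands in for whatever mount point you chose.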

On Tue, Jan 20, 2015 at 7:55 AM, Wang, Ningjun (LNG-NPV)
 wrote:
> Can anybody answer this? Do I have to have hdfs to achieve this?
>
>
>
> Regards,
>
>
>
> Ningjun Wang
>
> Consulting Software Engineer
>
> LexisNexis
>
> 121 Chanlon Road
>
> New Providence, NJ 07974-1541
>
>
>
> From: Wang, Ningjun (LNG-NPV) [mailto:ningjun.w...@lexisnexis.com]
> Sent: Friday, January 16, 2015 1:15 PM
> To: Imran Rashid
> Cc: user@spark.apache.org
> Subject: RE: Can I save RDD to local file system and then read it back on
> spark cluster with multiple nodes?
>
>
>
> I need to save an RDD to the file system and then restore it from the file
> system in the future. I don’t have an HDFS file system and don’t want to go
> through the hassle of setting one up. So how can I achieve this? The
> application needs to run on a cluster with multiple nodes.
>
>
>
> Regards,
>
>
>
> Ningjun
>
>
>
> From: imranra...@gmail.com [mailto:imranra...@gmail.com] On Behalf Of Imran
> Rashid
> Sent: Friday, January 16, 2015 12:14 PM
> To: Wang, Ningjun (LNG-NPV)
> Cc: user@spark.apache.org
> Subject: Re: Can I save RDD to local file system and then read it back on
> spark cluster with multiple nodes?
>
>
>
> I'm not positive, but I think this is very unlikely to work.
>
> First, when you call sc.objectFile(...),  I think the *driver* will need to
> know something about the file, eg to know how many tasks to create.  But it
> won't even be able to see the file, since it only lives on the local
> filesystem of the cluster nodes.
>
> If you really wanted to, you could probably write out some small metadata
> about the files and write your own version of objectFile that uses it.  But
> I think there is a bigger conceptual issue.  You might not in general be
> sure that you are running on the same nodes when you save the file, as when
> you read it back in.  So the file might not be present on the local
> filesystem for the active executors.  You might be able to guarantee it for
> the specific cluster setup you have now, but it might limit you down the
> road.
>
> What are you trying to achieve?  There might be a better way.  I believe
> writing to hdfs will usually write one local copy, so you'd still be doing a
> local read when you reload the data.
>
> Imran
>
> On Jan 16, 2015 6:19 AM, "Wang, Ningjun (LNG-NPV)"
>  wrote:
>
> I have asked this question before but got no answer. Asking again.
>
>
>
> Can I save RDD to the local file system and then read it back on a spark
> cluster with multiple nodes?
>
>
>
> rdd.saveAsObjectFile("file:///home/data/rdd1")
>
> val rdd2 = sc.objectFile("file:///home/data/rdd1")
>
> This will work if the cluster has only one node. But my cluster has 3 nodes
> and each node has a local dir called /home/data. Is the RDD saved to the local
> dir across all 3 nodes? If so, is sc.objectFile(…) smart enough to read the
> local dir on all 3 nodes and merge them into a single RDD?
>
>
>
> Ningjun
>
>




Spark 1.1.0 - spark-submit failed

2015-01-20 Thread ey-chih chow
Hi,

I issued the following command in an EC2 cluster launched using spark-ec2:

~/spark/bin/spark-submit --class com.crowdstar.cluster.etl.ParseAndClean
--master spark://ec2-54-185-107-113.us-west-2.compute.amazonaws.com:7077
--deploy-mode cluster --total-executor-cores 4
file:///tmp/etl-admin/jar/spark-etl-0.0.1-SNAPSHOT.jar
/ETL/input/2015/01/10/12/10Jan2015.avro
file:///tmp/etl-admin/vertica/VERTICA.avdl
file:///tmp/etl-admin/vertica/extras.json
file:///tmp/etl-admin/jar/spark-etl-0.0.1-SNAPSHOT.jar

The command failed with the following error logs in Spark-UI.  Is there any
suggestion on how to fix the problem?  Thanks.

Ey-Chih Chow

==

Launch Command: "/usr/lib/jvm/java-1.7.0/bin/java" "-cp"
"/root/spark/work/driver-20150120200843-/spark-etl-0.0.1-SNAPSHOT.jar/root/ephemeral-hdfs/conf:/root/spark/conf:/root/spark/lib/spark-assembly-1.1.0-hadoop1.0.4.jar:/root/spark/lib/datanucleus-api-jdo-3.2.1.jar:/root/spark/lib/datanucleus-core-3.2.2.jar:/root/spark/lib/datanucleus-rdbms-3.2.1.jar"
"-XX:MaxPermSize=128m"
"-Dspark.executor.extraLibraryPath=/root/ephemeral-hdfs/lib/native/"
"-Dspark.executor.memory=13000m" "-Dspark.akka.askTimeout=10"
"-Dspark.cores.max=4"
"-Dspark.app.name=com.crowdstar.cluster.etl.ParseAndClean"
"-Dspark.jars=file:///tmp/etl-admin/jar/spark-etl-0.0.1-SNAPSHOT.jar"
"-Dspark.executor.extraClassPath=/root/ephemeral-hdfs/conf"
"-Dspark.master=spark://ec2-54-203-58-2.us-west-2.compute.amazonaws.com:7077"
"-Dakka.loglevel=WARNING" "-Xms512M" "-Xmx512M"
"org.apache.spark.deploy.worker.DriverWrapper"
"akka.tcp://sparkwor...@ip-10-33-140-157.us-west-2.compute.internal:47585/user/Worker"
"com.crowdstar.cluster.etl.ParseAndClean"
"/ETL/input/2015/01/10/12/10Jan2015.avro"
"file:///tmp/etl-admin/vertica/VERTICA.avdl"
"file:///tmp/etl-admin/vertica/extras.json"
"file:///tmp/etl-admin/jar/spark-etl-0.0.1-SNAPSHOT.jar"


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/root/spark/work/driver-20150120200843-/spark-etl-0.0.1-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/root/spark/lib/spark-assembly-1.1.0-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/01/20 20:08:45 INFO spark.SecurityManager: Changing view acls to: root,
15/01/20 20:08:45 INFO spark.SecurityManager: Changing modify acls to: root,
15/01/20 20:08:45 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(root, ); users with modify permissions: Set(root, )
15/01/20 20:08:45 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/01/20 20:08:45 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [Driver-akka.actor.default-dispatcher-3] shutting down ActorSystem
[Driver]
java.lang.NoSuchMethodError: org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(Ljava/util/concurrent/Executor;I)V
    at akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:282)
    at akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:239)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
    at scala.util.Success.flatMap(Try.scala:200)
    at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
    at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)
    at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)
    at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
    at akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)
    at akka.remote.EndpointManager

Re: Does Spark automatically run different stages concurrently when possible?

2015-01-20 Thread Mark Hamstra
A map followed by a filter will not be two stages, but rather one stage that 
pipelines the map and filter.
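
The pipelining described above can be sketched in plain Python. This is only an analogy, not Spark's scheduler code: it shows that chaining a map and a filter needs a single pass over the records of a partition, because each record flows through both functions before the next one is read. The names pipelined and tracked are hypothetical.

```python
def pipelined(records, map_fn, filter_fn):
    """Apply map_fn and then filter_fn to each record in one pass."""
    for record in records:
        mapped = map_fn(record)
        if filter_fn(mapped):
            yield mapped


reads = []


def tracked(values):
    """Yield values while recording each read, to show the data is scanned once."""
    for value in values:
        reads.append(value)
        yield value


# Every input record is read exactly once, even though two
# transformations (a map and a filter) are applied to it.
result = list(pipelined(tracked([1, 2, 3, 4]), lambda x: x * 10, lambda x: x > 20))
```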


> On Jan 20, 2015, at 10:26 AM, Kane Kim  wrote:
> 
> Related question - is execution of different stages optimized? I.e.
> map followed by a filter will require 2 loops or they will be combined
> into single one?
> 
>> On Tue, Jan 20, 2015 at 4:33 AM, Bob Tiernay  wrote:
>> I found the following to be a good discussion of the same topic:
>> 
>> http://apache-spark-user-list.1001560.n3.nabble.com/The-concurrent-model-of-spark-job-stage-task-td13083.html
>> 
>> 
>>> From: so...@cloudera.com
>>> Date: Tue, 20 Jan 2015 10:02:20 +
>>> Subject: Re: Does Spark automatically run different stages concurrently
>>> when possible?
>>> To: paliwalash...@gmail.com
>>> CC: davidkl...@hotmail.com; user@spark.apache.org
>> 
>>> 
>>> You can persist the RDD in (2) right after it is created. It will not
>>> cause it to be persisted immediately, but rather the first time it is
>>> materialized. If you persist after (3) is calculated, then it will be
>>> re-calculated (and persisted) after (4) is calculated.
>>> 
>>>> On Tue, Jan 20, 2015 at 3:38 AM, Ashish  wrote:
>>>> Sean,
>>>>
>>>> A related question. When to persist the RDD after step 2 or after Step
>>>> 3 (nothing would happen before step 3 I assume)?
>>>>
>>>>> On Mon, Jan 19, 2015 at 5:17 PM, Sean Owen  wrote:
>>>>> From the OP:
>>>>>
>>>>> (1) val lines = Import full dataset using sc.textFile
>>>>> (2) val ABonly = Filter out all rows from "lines" that are not of type
>>>>> A or B
>>>>> (3) val processA = Process only the A rows from ABonly
>>>>> (4) val processB = Process only the B rows from ABonly
>>>>>
>>>>> I assume that 3 and 4 are actions, or else nothing happens here at all.
>>>>>
>>>>> When 3 is invoked, it will compute 1, then 2, then 3. 4 will happen
>>>>> after 3, and may even cause 1 and 2 to happen again if nothing is
>>>>> persisted.
>>>>>
>>>>> You can invoke 3 and 4 in parallel on the driver if you like. That's
>>>>> fine. But actions are blocking in the driver.
>>>>>
>>>>> On Mon, Jan 19, 2015 at 8:21 AM, davidkl  wrote:
>>>>>> Hi Jon, I am looking for an answer for a similar question in the doc
>>>>>> now, so far no clue.
>>>>>>
>>>>>> I would need to know what is spark behaviour in a situation like the
>>>>>> example you provided, but taking into account also that there are
>>>>>> multiple partitions/workers.
>>>>>>
>>>>>> I could imagine it's possible that different spark workers are not
>>>>>> synchronized in terms of waiting for each other to progress to the
>>>>>> next step/stage for the partitions of data they get assigned, while I
>>>>>> believe in streaming they would wait for the current batch to complete
>>>>>> before they start working on a new one.
>>>>>>
>>>>>> In the code I am working on, I need to make sure a particular step is
>>>>>> completed (in all workers, for all partitions) before next
>>>>>> transformation is applied.
>>>>>>
>>>>>> Would be great if someone could clarify or point to these issues in
>>>>>> the doc! :-)
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-automatically-run-different-stages-concurrently-when-possible-tp21075p21227.html
>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> --
>>>> thanks
>>>> ashish
>>>>
>>>> Blog: http://www.ashishpaliwal.com/blog
>>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal



Re: Spark 1.1.0 - spark-submit failed

2015-01-20 Thread Ted Yu
Please check which netty jar(s) are on the classpath.

NioWorkerPool(Executor workerExecutor, int workerCount) was added in netty
3.5.4
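
A minimal sketch of one way to check this, assuming you have the classpath string from the launch command. The JVM application class loader searches classpath entries in order, so the first entry that contains the class is the one that gets loaded. first_provider is a hypothetical helper; it only reports which entry wins and does not inspect the constructor signature or model parent class loaders, so the netty version inside the winning jar still has to be checked by hand.

```python
import os
import zipfile


def first_provider(classpath, class_name, sep=os.pathsep):
    """Return the first classpath entry that contains class_name, or None."""
    entry_name = class_name.replace(".", "/") + ".class"
    for element in classpath.split(sep):
        if element.endswith(".jar") and os.path.isfile(element):
            # A jar is a zip archive; look for the class entry inside it.
            with zipfile.ZipFile(element) as jar:
                if entry_name in jar.namelist():
                    return element
        elif os.path.isfile(os.path.join(element, entry_name)):
            # A directory on the classpath holding loose .class files.
            return element
    return None
```

Calling it with the class name org.jboss.netty.channel.socket.nio.NioWorkerPool shows whether the application jar or the Spark assembly supplies netty.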

Cheers

On Tue, Jan 20, 2015 at 4:15 PM, ey-chih chow  wrote:

> Hi,
>
> I issued the following command in a ec2 cluster launched using spark-ec2:
>
> ~/spark/bin/spark-submit --class com.crowdstar.cluster.etl.ParseAndClean
> --master spark://ec2-54-185-107-113.us-west-2.compute.amazonaws.com:7077
> --deploy-mode cluster --total-executor-cores 4
> file:///tmp/etl-admin/jar/spark-etl-0.0.1-SNAPSHOT.jar
> /ETL/input/2015/01/10/12/10Jan2015.avro
> file:///tmp/etl-admin/vertica/VERTICA.avdl
> file:///tmp/etl-admin/vertica/extras.json
> file:///tmp/etl-admin/jar/spark-etl-0.0.1-SNAPSHOT.jar
>
> The command failed with the following error logs in Spark-UI.  Is there any
> suggestion on how to fix the problem?  Thanks.
>
> Ey-Chih Chow

Python connector for spark-cassandra

2015-01-20 Thread Nishant Sinha
Hello everyone,

Is there a Python connector for Spark and Cassandra, as there is one for
Java? I found a Java connector by DataStax on GitHub:

https://github.com/datastax/spark-cassandra-connector

I am looking for something similar in Python.

Thanks


Re: Spark 1.2 - com/google/common/base/Preconditions java.lang.NoClassDefFoundErro

2015-01-20 Thread Shailesh Birari
Hello,

I double checked the libraries. I am linking only with Spark 1.2.
Along with Spark 1.2 jars I have Scala 2.10 jars and JRE 7 jars linked and
nothing else.

Thanks,
   Shailesh

On Wed, Jan 21, 2015 at 12:58 PM, Sean Owen  wrote:

> Guava is shaded in Spark 1.2+. It looks like you are mixing versions
> of Spark then, with some that still refer to unshaded Guava. Make sure
> you are not packaging Spark with your app and that you don't have
> other versions lying around.


Re: S3 Bucket Access

2015-01-20 Thread bbailey
Hi sranga,

Were you ever able to get authentication working with the temporary IAM
credentials (id, secret, & token)? I am in the same situation and it would
be great if we could document a solution so others can benefit from it.

Thanks!


sranga wrote
> Thanks Rishi. That is exactly what I am trying to do now :)
> 
> On Tue, Oct 14, 2014 at 2:41 PM, Rishi Pidva <rpidva@> wrote:
> 
>>
>> As per EMR documentation:
>> http://docs.amazonaws.cn/en_us/ElasticMapReduce/latest/DeveloperGuide/emr-iam-roles.html
>> Access AWS Resources Using IAM Roles
>>
>> If you've launched your cluster with an IAM role, applications running on
>> the EC2 instances of that cluster can use the IAM role to obtain temporary
>> account credentials to use when calling services in AWS.
>>
>> The version of Hadoop available on AMI 2.3.0 and later has already been
>> updated to make use of IAM roles. If your application runs strictly on top
>> of the Hadoop architecture, and does not directly call any service in AWS,
>> it should work with IAM roles with no modification.
>>
>> If your application calls services in AWS directly, you'll need to update
>> it to take advantage of IAM roles. This means that instead of obtaining
>> account credentials from /home/hadoop/conf/core-site.xml on the EC2
>> instances in the cluster, your application will now either use an SDK to
>> access the resources using IAM roles, or call the EC2 instance metadata to
>> obtain the temporary credentials.
>> --
>>
>> Maybe you can use AWS SDK in your application to provide AWS credentials?
>>
>> https://github.com/seratch/AWScala
>>
>>
>> On Oct 14, 2014, at 11:10 AM, Ranga <sranga@> wrote:





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/S3-Bucket-Access-tp16303p21273.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark 1.2 - com/google/common/base/Preconditions java.lang.NoClassDefFoundErro

2015-01-20 Thread Frank Austin Nothaft
Shailesh,

To add, are you packaging Hadoop in your app? Hadoop will pull in Guava. I'm
not sure whether you are using Maven (or something else) to build, but if you
can pull up your build's dependency tree, you will likely find
com.google.guava being brought in by one of your dependencies.

Regards,

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

On Jan 20, 2015, at 5:13 PM, Shailesh Birari  wrote:

> Hello,
> 
> I double checked the libraries. I am linking only with Spark 1.2.
> Along with Spark 1.2 jars I have Scala 2.10 jars and JRE 7 jars linked and 
> nothing else.
> 
> Thanks,
>Shailesh



What will happen if the Driver exits abnormally?

2015-01-20 Thread personal_email0
As the title says: is there some mechanism to recover so that the job can
still be completed?

Any comments would be much appreciated.

Best Regards,
Anzhsoft






MapType in spark-sql

2015-01-20 Thread Kevin Jung
Hi all,
How can I add MapType and ArrayType to a schema when I create a StructType
programmatically?

val schema =
  StructType(
    schemaString.split(" ").map(fieldName =>
      StructField(fieldName, StringType, true)))

The code above, taken from the Spark documentation, works fine, but if I
change StringType to MapType or ArrayType, it does not compile. Thanks in
advance.

Kevin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MapType-in-spark-sql-tp21274.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: [SparkSQL] Try2: Parquet predicate pushdown troubles

2015-01-20 Thread Cheng Lian

Hey Yana,

Sorry for the late reply, I missed this important thread somehow. And many 
thanks for reporting this. It turned out to be a bug: filter pushdown 
is only enabled when using client side metadata, which is not expected, 
because the task side metadata code path is more performant. I guess 
that the reason why setting parquet.task.side.metadata to false 
didn't reduce the input size for you is that you set the configuration 
through the Spark API, or put it into spark-defaults.conf. This configuration 
goes to the Hadoop Configuration, and Spark only merges properties whose 
names start with spark.hadoop into Hadoop Configuration instances. 
You may try putting the parquet.task.side.metadata config into Hadoop's 
core-site.xml and then re-running the query. I can see significant 
differences by doing so.
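
As a rough illustration of the spark.hadoop prefix mentioned above, here is a minimal sketch that sets the Parquet property through SparkConf. It assumes that Spark strips the spark.hadoop. prefix and copies the rest of the key into the Hadoop Configuration; the app name and the local master are placeholders for illustration only, and nothing in this thread confirms that this alone changes pushdown behaviour on the affected version.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Placeholder app name and master; adjust for a real cluster.
val conf = new SparkConf()
  .setAppName("parquet-pushdown-check")
  .setMaster("local[2]")
  .set("spark.sql.parquet.filterPushdown", "true")
  // Keys prefixed with "spark.hadoop." are meant to be merged into the
  // Hadoop Configuration with the prefix removed.
  .set("spark.hadoop.parquet.task.side.metadata", "false")

val sc = new SparkContext(conf)

// Inspect what actually reached the Hadoop Configuration.
val effective = sc.hadoopConfiguration.get("parquet.task.side.metadata")
println(s"parquet.task.side.metadata = $effective")

sc.stop()
```

Printing the effective value is a cheap way to check whether the property reached the Hadoop side before re-running the query.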


I’ll open a JIRA and deliver a fix for this ASAP. Thanks again for 
reporting all the details!


Cheng

On 1/13/15 12:56 PM, Yana Kadiyska wrote:

Attempting to bump this up in case someone can help out after all. I 
spent a few good hours stepping through the code today, so I'll 
summarize my observations both in hope I get some help and to help 
others that might be looking into this:


1. I am setting spark.sql.parquet.filterPushdown=true
2. I can see by stepping through the driver debugger that 
ParquetTableOperations.execute sets the filters via 
ParquetInputFormat.setFilterPredicate (I checked the conf object, 
things appear OK there)
3. In FilteringParquetRowInputFormat, I get through the codepath for 
getTaskSideSplits. It seems that the codepath for getClientSideSplits 
would try to drop rowGroups but I don't see anything similar in 
getTaskSideSplits.


Does anyone have pointers on where to look after this? Where is 
rowgroup filtering happening in the case of getTaskSideSplits? I can 
attach to the executor but am not quite sure what code related to 
Parquet gets called executor side...also don't see any messages in the 
executor logs related to Filtering predicates.
For comparison, I went through the getClientSideSplits and can see 
that predicate pushdown works OK:

sc.hadoopConfiguration.set("parquet.task.side.metadata","false")

15/01/13 20:04:49 INFO FilteringParquetRowInputFormat: Using Client Side Metadata Split Strategy
15/01/13 20:05:13 INFO FilterCompat: Filtering using predicate: eq(epoch, 1417384800)
15/01/13 20:06:45 INFO FilteringParquetRowInputFormat: Dropping 572 row groups that do not pass filter predicate (28 %) !

Is it possible that this is just a UI bug? I can see Input=4G when 
using ("parquet.task.side.metadata","false") and Input=140G when using 
("parquet.task.side.metadata","true") but the runtimes are very 
comparable?


Inline image 1


JobId 4 is the ClientSide split, JobId 5 is the TaskSide split.



On Fri, Jan 9, 2015 at 2:56 PM, Yana Kadiyska wrote:


I am running the following (connecting to an external Hive Metastore)

 /a/shark/spark/bin/spark-shell --master spark://ip:7077 --conf spark.sql.parquet.filterPushdown=true

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

and then ran two queries:

sqlContext.sql("select count(*) from table where partition='blah' ")

and

sqlContext.sql("select count(*) from table where partition='blah' and epoch=1415561604")


According to the Input tab in the UI both scan about 140G of data
which is the size of my whole partition. So I have two questions --

1. is there a way to tell from the plan if a predicate pushdown is
supposed to happen?
I see this for the second query

res0: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L]
 Exchange SinglePartition
  Aggregate true, [], [COUNT(1) AS PartialCount#49L]
   OutputFaker []
    Project []
     ParquetTableScan [epoch#139L], (ParquetRelation 

2. am I doing something obviously wrong that this is not working?
(I'm guessing it's not working because the input size for the second
query shows unchanged and the execution time is almost 2x as long)

thanks in advance for any insights



​


Re: How to compute RDD[(String, Set[String])] that include large Set

2015-01-20 Thread jagaximo
Kevin (Sangwoo) Kim wrote
> If keys are not too many, 
> You can do like this:
> 
> val data = List(
>   ("A", Set(1,2,3)),
>   ("A", Set(1,2,4)),
>   ("B", Set(1,2,3))
> )
> val rdd = sc.parallelize(data)
> rdd.persist()
> 
> rdd.filter(_._1 == "A").flatMap(_._2).distinct.count
> rdd.filter(_._1 == "B").flatMap(_._2).distinct.count
> rdd.unpersist()
> 
> ==
> data: List[(String, scala.collection.mutable.Set[Int])] = List((A,Set(1,
> 2, 3)), (A,Set(1, 2, 4)), (B,Set(1, 2, 3)))
> rdd: org.apache.spark.rdd.RDD[(String, scala.collection.mutable.Set[Int])]
> = ParallelCollectionRDD[6940] at parallelize at <console>:66
> res332: rdd.type = ParallelCollectionRDD[6940] at parallelize at <console>:66
> res334: Long = 4
> res335: Long = 3
> res336: rdd.type = ParallelCollectionRDD[6940] at parallelize at <console>:66
> 
> Regards,
> Kevin

Wow, got it! Good solution.
Fortunately, I know which keys have a large Set, so I was able to adopt this
approach.

thanks!
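
For readers who do not know the heavy keys in advance, the per-key filter above can be generalised into a single job. This is only a sketch under assumptions: it flattens every Set into (key, element) pairs, de-duplicates the pairs, and counts per key on the driver, which is reasonable only when the number of distinct keys is small. The local master and app name are placeholders.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // pair-RDD implicits, needed on Spark 1.2 and earlier

// Placeholder local context for illustration.
val sc = new SparkContext("local[2]", "distinct-per-key")

val rdd = sc.parallelize(Seq(
  ("A", Set(1, 2, 3)),
  ("A", Set(1, 2, 4)),
  ("B", Set(1, 2, 3))
))

// One (key, element) pair per set member, de-duplicated across the cluster,
// then counted per key. countByKey returns a Map on the driver.
val counts = rdd
  .flatMapValues(set => set)
  .distinct()
  .countByKey()

println(counts) // A -> 4 and B -> 3 for the sample data above

sc.stop()
```

If the number of keys is itself large, replacing countByKey with mapValues(_ => 1L).reduceByKey(_ + _) keeps the result distributed instead of collecting it.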




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-compute-RDD-String-Set-String-that-include-large-Set-tp21248p21275.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark 1.2 - com/google/common/base/Preconditions java.lang.NoClassDefFoundErro

2015-01-20 Thread Shailesh Birari
Hi Frank,

It's a normal Eclipse project where I added the Scala and Spark libraries as
user libraries.
Though I am not attaching any Hadoop libraries, my application code has the
following line:

  System.setProperty("hadoop.home.dir", "C:\\SB\\HadoopWin")

This Hadoop home dir contains only "winutils.exe".

I don't think that is the issue.

Please suggest.

Thanks,
  Shailesh


On Wed, Jan 21, 2015 at 2:19 PM, Frank Austin Nothaft  wrote:

> Shailesh,
>
> To add, are you packaging Hadoop in your app? Hadoop will pull in Guava.
> Not sure if you are using Maven (or what) to build, but if you can pull up
> your builds dependency tree, you will likely find com.google.guava being
> brought in by one of your dependencies.
>
> Regards,
>
> Frank Austin Nothaft
> fnoth...@berkeley.edu
> fnoth...@eecs.berkeley.edu
> 202-340-0466
>
> On Jan 20, 2015, at 5:13 PM, Shailesh Birari  wrote:
>
> Hello,
>
> I double checked the libraries. I am linking only with Spark 1.2.
> Along with Spark 1.2 jars I have Scala 2.10 jars and JRE 7 jars linked and
> nothing else.
>
> Thanks,
>Shailesh
>
> On Wed, Jan 21, 2015 at 12:58 PM, Sean Owen  wrote:
>
>> Guava is shaded in Spark 1.2+. It looks like you are mixing versions
>> of Spark then, with some that still refer to unshaded Guava. Make sure
>> you are not packaging Spark with your app and that you don't have
>> other versions lying around.
>>
>> On Tue, Jan 20, 2015 at 11:55 PM, Shailesh Birari 
>> wrote:
>> > Hello,
>> >
>> > I recently upgraded my setup from Spark 1.1 to Spark 1.2.
>> > My existing applications are working fine on ubuntu cluster.
>> > But, when I try to execute Spark MLlib application from Eclipse (Windows
>> > node) it gives java.lang.NoClassDefFoundError:
>> > com/google/common/base/Preconditions exception.
>> >
>> > Note,
>> >1. With Spark 1.1 this was working fine.
>> >2. The Spark 1.2 jar files are linked in Eclipse project.
>> >3. Checked the jar -tf output and found the above
>> com.google.common.base
>> > is not present.
>> >
>> >
>> -Exception
>> > log:
>> >
>> > Exception in thread "main" java.lang.NoClassDefFoundError:
>> > com/google/common/base/Preconditions
>> > at
>> >
>> org.apache.spark.network.client.TransportClientFactory.(TransportClientFactory.java:94)
>> > at
>> >
>> org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:77)
>> > at
>> >
>> org.apache.spark.network.netty.NettyBlockTransferService.init(NettyBlockTransferService.scala:62)
>> > at
>> org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:194)
>> > at org.apache.spark.SparkContext.(SparkContext.scala:340)
>> > at
>> >
>> org.apache.spark.examples.mllib.TallSkinnySVD$.main(TallSkinnySVD.scala:74)
>> > at
>> org.apache.spark.examples.mllib.TallSkinnySVD.main(TallSkinnySVD.scala)
>> > Caused by: java.lang.ClassNotFoundException:
>> > com.google.common.base.Preconditions
>> > at java.net.URLClassLoader$1.run(Unknown Source)
>> > at java.net.URLClassLoader$1.run(Unknown Source)
>> > at java.security.AccessController.doPrivileged(Native Method)
>> > at java.net.URLClassLoader.findClass(Unknown Source)
>> > at java.lang.ClassLoader.loadClass(Unknown Source)
>> > at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
>> > at java.lang.ClassLoader.loadClass(Unknown Source)
>> > ... 7 more
>> >
>> >
>> -
>> >
>> > jar -tf output:
>> >
>> >
>> > consb2@CONSB2A
>> > /cygdrive/c/SB/spark-1.2.0-bin-hadoop2.4/spark-1.2.0-bin-hadoop2.4/lib
>> > $ jar -tf spark-assembly-1.2.0-hadoop2.4.0.jar | grep Preconditions
>> > org/spark-project/guava/common/base/Preconditions.class
>> > org/spark-project/guava/common/math/MathPreconditions.class
>> > com/clearspring/analytics/util/Preconditions.class
>> > parquet/Preconditions.class
>> > com/google/inject/internal/util/$Preconditions.class
>> >
>> >
>> ---
>> >
>> > Please help me in resolving this.
>> >
>> > Thanks,
>> >   Shailesh
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-com-google-common-base-Preconditions-java-lang-NoClassDefFoundErro-tp21271.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com
>> .
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>
>
>


Re: Spark Streaming with Kafka

2015-01-20 Thread firemonk9
Hi,

   I am having similar issues. Have you found any resolution ?

Thank you



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Kafka-tp21222p21276.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: How to compute RDD[(String, Set[String])] that include large Set

2015-01-20 Thread Kevin (Sangwoo) Kim
Great to hear you got a solution!
Cheers!

Kevin

On Wed Jan 21 2015 at 11:13:44 AM jagaximo 
wrote:

> Kevin (Sangwoo) Kim wrote
> > If keys are not too many,
> > You can do like this:
> >
> > val data = List(
> >   ("A", Set(1,2,3)),
> >   ("A", Set(1,2,4)),
> >   ("B", Set(1,2,3))
> > )
> > val rdd = sc.parallelize(data)
> > rdd.persist()
> >
> > rdd.filter(_._1 == "A").flatMap(_._2).distinct.count
> > rdd.filter(_._1 == "B").flatMap(_._2).distinct.count
> > rdd.unpersist()
> >
> > ==
> > data: List[(String, scala.collection.mutable.Set[Int])] = List((A,Set(1,
> > 2, 3)), (A,Set(1, 2, 4)), (B,Set(1, 2, 3)))
> > rdd: org.apache.spark.rdd.RDD[(String, scala.collection.mutable.Set[Int])]
> > = ParallelCollectionRDD[6940] at parallelize at <console>:66
> > res332: rdd.type = ParallelCollectionRDD[6940] at parallelize at <console>:66
> > res334: Long = 4
> > res335: Long = 3
> > res336: rdd.type = ParallelCollectionRDD[6940] at parallelize at <console>:66
> >
> > Regards,
> > Kevin
>
> Wow, Got it! good solution
> Fortunately, I know what keys have large size Set, I was able to adopt this
> approach.
>
> thanks!
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/How-to-compute-RDD-String-Set-String-
> that-include-large-Set-tp21248p21275.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: MapType in spark-sql

2015-01-20 Thread Cheng Lian
You need to provide the key type and value type for a map type, the element 
type for an array type, and whether they contain null:


StructType(Array(
  StructField("map_field", MapType(keyType = IntegerType, valueType = StringType,
    valueContainsNull = true), nullable = true),
  StructField("array_field", ArrayType(elementType = DoubleType,
    containsNull = true), nullable = true)))
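
The compile error in the original question most likely comes from passing the bare names MapType or ArrayType, which refer to companion objects rather than to DataType instances. The sketch below shows one way to keep the split-and-map style of the documentation example while mixing field types. The "name:type" spec format and the toDataType helper are hypothetical, invented for illustration. The import assumes Spark 1.3 or later; on Spark 1.2 the same type names should be available via import org.apache.spark.sql._ instead.

```scala
import org.apache.spark.sql.types._

// Hypothetical schema spec: space-separated "fieldName:typeTag" entries.
val schemaString = "name:string tags:array attrs:map"

// Hypothetical helper mapping a tag to a fully constructed DataType.
def toDataType(tag: String): DataType = tag match {
  case "string" => StringType
  case "array"  => ArrayType(StringType, containsNull = true)
  case "map"    => MapType(StringType, StringType, valueContainsNull = true)
  case other    => throw new IllegalArgumentException(s"Unknown type tag: $other")
}

val schema = StructType(schemaString.split(" ").map { spec =>
  val Array(fieldName, tag) = spec.split(":")
  StructField(fieldName, toDataType(tag), nullable = true)
})

println(schema)
```

The point of the helper is only that every field receives a constructed type such as ArrayType(StringType, containsNull = true), never the bare type name.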

Cheng

On 1/20/15 5:50 PM, Kevin Jung wrote:


Hi all
How can I add MapType and ArrayType to schema when I create StructType
programmatically?
val schema =
   StructType(
 schemaString.split(" ").map(fieldName => StructField(fieldName,
StringType, true)))
above code from spark document works fine but if I change StringType to
MapType or ArrayType , it isn't compiled. Thanks in advance.

Kevin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MapType-in-spark-sql-tp21274.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark 1.2 - com/google/common/base/Preconditions java.lang.NoClassDefFoundErro

2015-01-20 Thread Aaron Davidson
Spark's network-common package depends on guava as a "provided" dependency
in order to avoid conflicting with other libraries (e.g., Hadoop) that
depend on specific versions. com/google/common/base/Preconditions has been
present in Guava since version 2, so this is likely a "dependency not
found" rather than "wrong version of dependency" issue.

To resolve this, please depend on some version of Guava (14.0.1 is
guaranteed to work, as should any other version from the past few years).
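
If the project were built with sbt rather than with Eclipse user libraries (an assumption; the thread does not say which build tool is in use), the dependency could be declared roughly as follows. The coordinates are Guava's standard Maven coordinates and 14.0.1 is the version named above.

```scala
// build.sbt fragment: put Guava on the application classpath explicitly,
// since Spark's network-common declares it only as a "provided" dependency.
libraryDependencies += "com.google.guava" % "guava" % "14.0.1"
```

In a plain Eclipse setup the equivalent step is adding the Guava jar to the project's build path.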

On Tue, Jan 20, 2015 at 6:16 PM, Shailesh Birari 
wrote:

> Hi Frank,
>
> Its a normal eclipse project where I added Scala and Spark libraries as
> user libraries.
> Though, I am not attaching any hadoop libraries, in my application code I
> have following line.
>
>   System.setProperty("hadoop.home.dir", "C:\\SB\\HadoopWin")
>
> This Hadoop home dir contains "winutils.exe" only.
>
> Don't think that its an issue.
>
> Please suggest.
>
> Thanks,
>   Shailesh
>
>
> On Wed, Jan 21, 2015 at 2:19 PM, Frank Austin Nothaft <
> fnoth...@berkeley.edu> wrote:
>
>> Shailesh,
>>
>> To add, are you packaging Hadoop in your app? Hadoop will pull in Guava.
>> Not sure if you are using Maven (or what) to build, but if you can pull up
>> your builds dependency tree, you will likely find com.google.guava being
>> brought in by one of your dependencies.
>>
>> Regards,
>>
>> Frank Austin Nothaft
>> fnoth...@berkeley.edu
>> fnoth...@eecs.berkeley.edu
>> 202-340-0466
>>
>> On Jan 20, 2015, at 5:13 PM, Shailesh Birari  wrote:
>>
>> Hello,
>>
>> I double checked the libraries. I am linking only with Spark 1.2.
>> Along with Spark 1.2 jars I have Scala 2.10 jars and JRE 7 jars linked
>> and nothing else.
>>
>> Thanks,
>>Shailesh
>>
>> On Wed, Jan 21, 2015 at 12:58 PM, Sean Owen  wrote:
>>
>>> Guava is shaded in Spark 1.2+. It looks like you are mixing versions
>>> of Spark then, with some that still refer to unshaded Guava. Make sure
>>> you are not packaging Spark with your app and that you don't have
>>> other versions lying around.
>>>
>>> On Tue, Jan 20, 2015 at 11:55 PM, Shailesh Birari 
>>> wrote:
>>> > Hello,
>>> >
>>> > I recently upgraded my setup from Spark 1.1 to Spark 1.2.
>>> > My existing applications are working fine on ubuntu cluster.
>>> > But, when I try to execute Spark MLlib application from Eclipse
>>> (Windows
>>> > node) it gives java.lang.NoClassDefFoundError:
>>> > com/google/common/base/Preconditions exception.
>>> >
>>> > Note,
>>> >1. With Spark 1.1 this was working fine.
>>> >2. The Spark 1.2 jar files are linked in Eclipse project.
>>> >3. Checked the jar -tf output and found the above
>>> com.google.common.base
>>> > is not present.
>>> >
>>> >
>>> -Exception
>>> > log:
>>> >
>>> > Exception in thread "main" java.lang.NoClassDefFoundError:
>>> > com/google/common/base/Preconditions
>>> > at
>>> >
>>> org.apache.spark.network.client.TransportClientFactory.(TransportClientFactory.java:94)
>>> > at
>>> >
>>> org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:77)
>>> > at
>>> >
>>> org.apache.spark.network.netty.NettyBlockTransferService.init(NettyBlockTransferService.scala:62)
>>> > at
>>> org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:194)
>>> > at org.apache.spark.SparkContext.(SparkContext.scala:340)
>>> > at
>>> >
>>> org.apache.spark.examples.mllib.TallSkinnySVD$.main(TallSkinnySVD.scala:74)
>>> > at
>>> org.apache.spark.examples.mllib.TallSkinnySVD.main(TallSkinnySVD.scala)
>>> > Caused by: java.lang.ClassNotFoundException:
>>> > com.google.common.base.Preconditions
>>> > at java.net.URLClassLoader$1.run(Unknown Source)
>>> > at java.net.URLClassLoader$1.run(Unknown Source)
>>> > at java.security.AccessController.doPrivileged(Native Method)
>>> > at java.net.URLClassLoader.findClass(Unknown Source)
>>> > at java.lang.ClassLoader.loadClass(Unknown Source)
>>> > at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
>>> > at java.lang.ClassLoader.loadClass(Unknown Source)
>>> > ... 7 more
>>> >
>>> >
>>> -
>>> >
>>> > jar -tf output:
>>> >
>>> >
>>> > consb2@CONSB2A
>>> > /cygdrive/c/SB/spark-1.2.0-bin-hadoop2.4/spark-1.2.0-bin-hadoop2.4/lib
>>> > $ jar -tf spark-assembly-1.2.0-hadoop2.4.0.jar | grep Preconditions
>>> > org/spark-project/guava/common/base/Preconditions.class
>>> > org/spark-project/guava/common/math/MathPreconditions.class
>>> > com/clearspring/analytics/util/Preconditions.class
>>> > parquet/Preconditions.class
>>> > com/google/inject/internal/util/$Preconditions.class
>>> >
>>> >
>>> 

Task result deserialization error (1.1.0)

2015-01-20 Thread Dmitriy Lyubimov
Hi,

I am getting task result deserialization error (kryo is enabled). Is it
some sort of `chill` registration issue at front end?

This is an application that lists Spark as a Maven dependency (so it gets the
correct Hadoop and chill dependencies on the classpath, I checked).

Thanks in advance.

15/01/20 18:21:51 ERROR TaskResultGetter: Exception while getting task
result
java.lang.ArrayStoreException: scala.Tuple1
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at


RE: Spark 1.2 - com/google/common/base/Preconditions java.lang.NoClassDefFoundErro

2015-01-20 Thread Bob Tiernay
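If using Maven, one can simply use whatever Guava version they prefer and 
shade it in the artifact at build time, using something like:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.google.common</pattern>
            <shadedPattern>org.shaded.google.common</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
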

Of course this won't help during development if there are conflicts.

From: ilike...@gmail.com
Date: Tue, 20 Jan 2015 18:26:32 -0800
Subject: Re: Spark 1.2 - com/google/common/base/Preconditions 
java.lang.NoClassDefFoundErro
To: sbirar...@gmail.com
CC: fnoth...@berkeley.edu; so...@cloudera.com; user@spark.apache.org

Spark's network-common package depends on guava as a "provided" dependency in 
order to avoid conflicting with other libraries (e.g., Hadoop) that depend on 
specific versions. com/google/common/base/Preconditions has been present in 
Guava since version 2, so this is likely a "dependency not found" rather than 
"wrong version of dependency" issue.
To resolve this, please depend on some version of Guava (14.0.1 is guaranteed 
to work, as should any other version from the past few years).
On Tue, Jan 20, 2015 at 6:16 PM, Shailesh Birari  wrote:
Hi Frank,
Its a normal eclipse project where I added Scala and Spark libraries as user 
libraries. Though, I am not attaching any hadoop libraries, in my application 
code I have following line.
  System.setProperty("hadoop.home.dir", "C:\\SB\\HadoopWin")

This Hadoop home dir contains "winutils.exe" only.
Don't think that its an issue.
Please suggest.
Thanks,  Shailesh

On Wed, Jan 21, 2015 at 2:19 PM, Frank Austin Nothaft  
wrote:
Shailesh,
To add, are you packaging Hadoop in your app? Hadoop will pull in Guava. Not 
sure if you are using Maven (or what) to build, but if you can pull up your 
builds dependency tree, you will likely find com.google.guava being brought in 
by one of your dependencies.
Regards,

Frank Austin Nothaft
fnothaft@berkeley.edu
fnothaft@eecs.berkeley.edu
202-340-0466



On Jan 20, 2015, at 5:13 PM, Shailesh Birari  wrote:
Hello,
I double checked the libraries. I am linking only with Spark 1.2. Along with 
Spark 1.2 jars I have Scala 2.10 jars and JRE 7 jars linked and nothing else.
Thanks,   Shailesh
On Wed, Jan 21, 2015 at 12:58 PM, Sean Owen  wrote:
Guava is shaded in Spark 1.2+. It looks like you are mixing versions

of Spark then, with some that still refer to unshaded Guava. Make sure

you are not packaging Spark with your app and that you don't have

other versions lying around.



On Tue, Jan 20, 2015 at 11:55 PM, Shailesh Birari  wrote:

> Hello,

>

> I recently upgraded my setup from Spark 1.1 to Spark 1.2.

> My existing applications are working fine on ubuntu cluster.

> But, when I try to execute Spark MLlib application from Eclipse (Windows

> node) it gives java.lang.NoClassDefFoundError:

> com/google/common/base/Preconditions exception.

>

> Note,

>1. With Spark 1.1 this was working fine.

>2. The Spark 1.2 jar files are linked in Eclipse project.

>3. Checked the jar -tf output and found the above com.google.common.base

> is not present.

>

> -Exception

> log:

>

> Exception in thread "main" java.lang.NoClassDefFoundError:

> com/google/common/base/Preconditions

> at

> org.apache.spark.network.client.TransportClientFactory.(TransportClientFactory.java:94)

> at

> org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:77)

> at

> org.apache.spark.network.netty.NettyBlockTransferService.init(NettyBlockTransferService.scala:62)

> at 
> org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:194)

> at org.apache.spark.SparkContext.(SparkContext.scala:340)

> at

> org.apache.spark.examples.mllib.TallSkinnySVD$.main(TallSkinnySVD.scala:74)

> at 
> org.apache.spark.examples.mllib.TallSkinnySVD.main(TallSkinnySVD.scala)

> Caused by: java.lang.ClassNotFoundException:

> com.google.common.base.Preconditions

> at java.net.URLClassLoader$1.run(Unknown Source)

> at java.net.URLClassLoader$1.run(Unknown Source)

> at java.security.AccessController.doPrivileged(Native Method)

> at java.net.URLClassLoader.findClass(Unknown Source)

> at java.lang.ClassLoader.loadClass(Unknown Source)

> at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)

> at java.lang.ClassLoader.loadClass(Unknown Source)

> ... 7 more

>

> -

>

> jar -tf output:

>

>

> consb2@CONSB2A

> /cygdrive/c/SB/spark-1.2.0-bin-hadoop2.4/spark-1.2.0-bin-hadoop2.4/lib

> $ jar -tf spark

Re: Spark 1.2 - com/google/common/base/Preconditions java.lang.NoClassDefFoundErro

2015-01-20 Thread Shailesh Birari
Thanks Aaron.

Adding Guava jar resolves the issue.

Shailesh

On Wed, Jan 21, 2015 at 3:26 PM, Aaron Davidson  wrote:

> Spark's network-common package depends on guava as a "provided" dependency
> in order to avoid conflicting with other libraries (e.g., Hadoop) that
> depend on specific versions. com/google/common/base/Preconditions has been
> present in Guava since version 2, so this is likely a "dependency not
> found" rather than "wrong version of dependency" issue.
>
> To resolve this, please depend on some version of Guava (14.0.1 is
> guaranteed to work, as should any other version from the past few years).
>
> On Tue, Jan 20, 2015 at 6:16 PM, Shailesh Birari 
> wrote:
>
>> Hi Frank,
>>
>> Its a normal eclipse project where I added Scala and Spark libraries as
>> user libraries.
>> Though, I am not attaching any hadoop libraries, in my application code I
>> have following line.
>>
>>   System.setProperty("hadoop.home.dir", "C:\\SB\\HadoopWin")
>>
>> This Hadoop home dir contains "winutils.exe" only.
>>
>> Don't think that its an issue.
>>
>> Please suggest.
>>
>> Thanks,
>>   Shailesh
>>
>>
>> On Wed, Jan 21, 2015 at 2:19 PM, Frank Austin Nothaft <
>> fnoth...@berkeley.edu> wrote:
>>
>>> Shailesh,
>>>
>>> To add, are you packaging Hadoop in your app? Hadoop will pull in Guava.
>>> Not sure if you are using Maven (or what) to build, but if you can pull up
>>> your builds dependency tree, you will likely find com.google.guava being
>>> brought in by one of your dependencies.
>>>
>>> Regards,
>>>
>>> Frank Austin Nothaft
>>> fnoth...@berkeley.edu
>>> fnoth...@eecs.berkeley.edu
>>> 202-340-0466
>>>
>>> On Jan 20, 2015, at 5:13 PM, Shailesh Birari 
>>> wrote:
>>>
>>> Hello,
>>>
>>> I double checked the libraries. I am linking only with Spark 1.2.
>>> Along with Spark 1.2 jars I have Scala 2.10 jars and JRE 7 jars linked
>>> and nothing else.
>>>
>>> Thanks,
>>>Shailesh
>>>
>>> On Wed, Jan 21, 2015 at 12:58 PM, Sean Owen  wrote:
>>>
 Guava is shaded in Spark 1.2+. It looks like you are mixing versions
 of Spark then, with some that still refer to unshaded Guava. Make sure
 you are not packaging Spark with your app and that you don't have
 other versions lying around.

 On Tue, Jan 20, 2015 at 11:55 PM, Shailesh Birari 
 wrote:
 > Hello,
 >
 > I recently upgraded my setup from Spark 1.1 to Spark 1.2.
 > My existing applications are working fine on ubuntu cluster.
 > But, when I try to execute Spark MLlib application from Eclipse
 (Windows
 > node) it gives java.lang.NoClassDefFoundError:
 > com/google/common/base/Preconditions exception.
 >
 > Note,
 >1. With Spark 1.1 this was working fine.
 >2. The Spark 1.2 jar files are linked in Eclipse project.
 >3. Checked the jar -tf output and found the above
 com.google.common.base
 > is not present.
 >
 >
 -Exception
 > log:
 >
 > Exception in thread "main" java.lang.NoClassDefFoundError:
 > com/google/common/base/Preconditions
 > at
 >
 org.apache.spark.network.client.TransportClientFactory.(TransportClientFactory.java:94)
 > at
 >
 org.apache.spark.network.TransportContext.createClientFactory(TransportContext.java:77)
 > at
 >
 org.apache.spark.network.netty.NettyBlockTransferService.init(NettyBlockTransferService.scala:62)
 > at
 org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:194)
 > at
 org.apache.spark.SparkContext.(SparkContext.scala:340)
 > at
 >
 org.apache.spark.examples.mllib.TallSkinnySVD$.main(TallSkinnySVD.scala:74)
 > at
 org.apache.spark.examples.mllib.TallSkinnySVD.main(TallSkinnySVD.scala)
 > Caused by: java.lang.ClassNotFoundException:
 > com.google.common.base.Preconditions
 > at java.net.URLClassLoader$1.run(Unknown Source)
 > at java.net.URLClassLoader$1.run(Unknown Source)
 > at java.security.AccessController.doPrivileged(Native Method)
 > at java.net.URLClassLoader.findClass(Unknown Source)
 > at java.lang.ClassLoader.loadClass(Unknown Source)
 > at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
 > at java.lang.ClassLoader.loadClass(Unknown Source)
 > ... 7 more
 >
 >
 -
 >
 > jar -tf output:
 >
 >
 > consb2@CONSB2A
 > /cygdrive/c/SB/spark-1.2.0-bin-hadoop2.4/spark-1.2.0-bin-hadoop2.4/lib
 > $ jar -tf spark-assembly-1.2.0-hadoop2.4.0.jar | grep Preconditions
 > org/spark-project/guava/common/base/Preconditions.class
 > org/spark-project/guava/common/math/MathPreconditions.class
>>>

RE: dynamically change receiver for a spark stream

2015-01-20 Thread Shao, Saisai
Hi,

I don't think the current Spark Streaming supports this feature; the whole 
DStream lineage is fixed once the context is started.

Stopping a single stream is not supported either; currently you need to stop 
the whole streaming context to get what you want.
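
A rough sketch of that stop-and-recreate approach follows. The restartStreams helper is hypothetical, not part of Spark, and the 10-second batch interval is an arbitrary assumption. It relies on StreamingContext.stop(stopSparkContext, stopGracefully) to keep the underlying SparkContext alive, and on the fact that a stopped StreamingContext cannot be started again, so a new one is built each time.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical helper: stop the previous StreamingContext (if any) without
// stopping the SparkContext, then build and start a new one.
def restartStreams(sc: SparkContext, previous: Option[StreamingContext])
                  (define: StreamingContext => Unit): StreamingContext = {
  // Finish processing received data, but keep the SparkContext for reuse.
  previous.foreach(_.stop(stopSparkContext = false, stopGracefully = true))

  val ssc = new StreamingContext(sc, Seconds(10)) // batch interval is an assumption
  define(ssc) // must register at least one output operation, e.g. print()
  ssc.start()
  ssc
}

// Usage idea (MySqlReceiver and newConfig are placeholders for a custom receiver):
//   val next = restartStreams(sc, Some(current)) { s =>
//     s.receiverStream(new MySqlReceiver(newConfig)).print()
//   }
```

State that has to survive a restart would need to be kept outside the streaming context, since the new context starts with a fresh DStream graph.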

Thanks
Saisai

-Original Message-
From: jamborta [mailto:jambo...@gmail.com] 
Sent: Wednesday, January 21, 2015 3:09 AM
To: user@spark.apache.org
Subject: dynamically change receiver for a spark stream

Hi all,

we have been trying to set up a stream using a custom receiver that would pick 
up data from SQL databases. We'd like to keep that stream context running and 
dynamically change the streams, adding and removing streams on demand. 
Alternatively, if a stream is fixed, is it possible to stop a stream, 
change the config and start it again? 

thanks,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/dynamically-change-receiver-for-a-spark-stream-tp21268.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: pyspark sc.textFile uses only 4 out of 32 threads per node

2015-01-20 Thread Nicholas Chammas
Are the gz files roughly equal in size? Do you know that your partitions
are roughly balanced? Perhaps some cores get assigned tasks that end very
quickly, while others get most of the work.

On Sat Jan 17 2015 at 2:02:49 AM Gautham Anil 
wrote:

> Hi,
>
> Thanks for getting back to me. Sorry for the delay. I am still having
> this issue.
>
> @sun: To clarify, The machine actually has 16 usable threads and the
> job has more than 100 gzip files. So, there are enough partitions to
> use all threads.
>
> @nicholas: The number of partitions match the number of files: > 100.
>
> @Sebastian: I understand the lazy loading behavior. For this reason, I
> usually use a .count() to force the transformation (.first() will not
> be enough). Still, during the transformation, only 4 cores are used
> for processing the input files.
>
> I don't know if this issue is noticed by other people. Can anyone
> reproduce it with v1.1?
>
>
> On Wed, Dec 17, 2014 at 2:14 AM, Nicholas Chammas
>  wrote:
> > Rui is correct.
> >
> > Check how many partitions your RDD has after loading the gzipped files.
> e.g.
> > rdd.getNumPartitions().
> >
> > If that number is way less than the number of cores in your cluster (in
> your
> > case I suspect the number is 4), then explicitly repartition the RDD to
> > match the number of cores in your cluster, or some multiple thereof.
> >
> > For example:
> >
> > new_rdd = rdd.repartition(sc.defaultParallelism * 3)
> >
> > Operations on new_rdd should utilize all the cores in your cluster.
> >
> > Nick
> >
> >
> > On Wed Dec 17 2014 at 1:42:16 AM Sun, Rui  wrote:
> >>
> >> Gautham,
> >>
> >> How many gz files do you have? Maybe the reason is that a gz file is
> >> compressed in a way that cannot be split for processing by MapReduce. A
> >> single gz file can only be processed by a single mapper, so the CPU
> >> threads cannot be fully utilized.
> >>
> >> -Original Message-
> >> From: Gautham [mailto:gautham.a...@gmail.com]
> >> Sent: Wednesday, December 10, 2014 3:00 AM
> >> To: u...@spark.incubator.apache.org
> >> Subject: pyspark sc.textFile uses only 4 out of 32 threads per node
> >>
> >> I am having an issue with pyspark launched in ec2 (using spark-ec2) with 5
> >> r3.4xlarge machines where each has 32 threads and 240GB of RAM. When I do
> >> sc.textFile to load data from a number of gz files, it does not progress as
> >> fast as expected. When I log in to a child node and run top, I see only 4
> >> threads at 100% CPU. All remaining 28 cores were idle. This is not an issue
> >> when processing the strings after loading, when all the cores are used to
> >> process the data.
> >>
> >> Can you please help me with this? What setting can be changed to get the
> >> CPU usage back up to full?
> >>
> >>
> >>
> >
>
>
>
> --
> Gautham Anil
>
> "The first principle is that you must not fool yourself. And you are
> the easiest person to fool" - Richard P. Feynman
>


Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-20 Thread TJ Klein
Hi,

I recently tried to migrate from Spark 1.1 to Spark 1.2 using PySpark.
Initially I was super glad to notice that Spark 1.2 is way faster than
Spark 1.1. However, the initial joy faded quickly when I noticed that my
jobs no longer terminate successfully. With Spark 1.1 everything still
works perfectly fine.
Specifically, the execution just freezes without any error output at one
point, when calling a combined map() and collect() statement (after it has
been called successfully many times before in a loop).

Any clue? Or do I have to wait for the next version?

Best,
 Tassilo



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RangePartitioner

2015-01-20 Thread Rishi Yadav
I am joining two tables as shown below. The program stalls at the following
log line and never proceeds.
What might be the issue, and what is a possible solution?


>>> INFO SparkContext: Starting job: RangePartitioner at Exchange.scala:79


Table 1 has 450 columns.
Table 2 has 100 columns.


Both tables have a few million rows.




            val table1 = myTable1.as('table1)
            val table2 = myTable2.as('table2)
            val results = table1.join(table2, LeftOuter,
              Some("table1.Id".attr === "table2.id".attr))

            println(results.count())

Thanks and Regards,
Rishi
@meditativesoul

Fwd: [Spark Streaming] The FileInputDStream newFilesOnly=false does not work in 1.2 since

2015-01-20 Thread Terry Hole
Hi,

I am trying to move from 1.1 to 1.2 and found that newFilesOnly=false
(intended to include old files) does not work anymore. It worked great in
1.1, so this was probably introduced by the last change to this class.



Did the behavior of this flag change, or is it a regression?

The issue seems to be caused by this code, from line 157 in
FileInputDStream.scala:

    val modTimeIgnoreThreshold = math.max(
      // initial threshold based on newFilesOnly setting
      initialModTimeIgnoreThreshold,
      // trailing end of the remember window
      currentTime - durationToRemember.milliseconds
    )
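
To make the effect of that `math.max` concrete, here is a small worked example in plain Python (no Spark involved). It is only a sketch under two assumptions that are not confirmed in this thread: that the initial threshold is 0 when newFilesOnly=false, and that a file is skipped when its modification time is at or below the threshold. The function names are made up for illustration.

```python
def mod_time_ignore_threshold(initial_threshold_ms, current_time_ms, remember_ms):
    # Mirrors the quoted math.max(...) expression.
    return max(initial_threshold_ms, current_time_ms - remember_ms)


def is_ignored(file_mod_time_ms, threshold_ms):
    # Assumption: files at or below the threshold are skipped.
    return file_mod_time_ms <= threshold_ms


# newFilesOnly=false is assumed to give an initial threshold of 0.
threshold = mod_time_ignore_threshold(0, 1000000, 60000)
old_file_skipped = is_ignored(500000, threshold)
recent_file_skipped = is_ignored(990000, threshold)
```

Under these assumptions the second argument always wins, so any file older than the remember window is skipped regardless of the newFilesOnly setting.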


Regards

- Terry


KNN for large data set

2015-01-20 Thread DEVAN M.S.
Hi all,

Please help me find the best way to run K-nearest neighbor with Spark on
large data sets.


Re: How to share a NonSerializable variable among tasks in the same worker node?

2015-01-20 Thread Fengyun RAO
We are currently migrating from 1.1 to 1.2 and found our program 3x slower,
maybe due to the singleton hack?

Could you explain in detail why or how "The singleton hack works very
different in spark 1.2.0"?

thanks!

2015-01-18 20:56 GMT+08:00 octavian.ganea :

> The singleton hack works very different in spark 1.2.0 (it does not work if
> the program has multiple map-reduce jobs in the same program). I guess
> there
> should be an official documentation on how to have each machine/node do an
> init step locally before executing any other instructions (e.g. loading
> locally a very big object once at the beginning that can be used in all
> further map jobs that will be assigned to that worker).
>
>
>
>
>


Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-20 Thread Davies Liu
Could you provide a short script to reproduce this issue?

On Tue, Jan 20, 2015 at 9:00 PM, TJ Klein  wrote:
> Hi,
>
> I just recently tried to migrate from Spark 1.1 to Spark 1.2 - using
> PySpark. Initially, I was super glad, noticing that Spark 1.2 is way faster
> than Spark 1.1. However, the initial joy faded quickly when I noticed that
> all my stuff didn't successfully terminate operations anymore. Using Spark
> 1.1 it still works perfectly fine, though.
> Specifically, the execution just freezes without any error output at one
> point, when calling a joint map() and collect() statement (after having it
> called many times successfully before in a loop).
>
> Any clue? Or do I have to wait for the next version?
>
> Best,
>  Tassilo
>
>
>



Re: [Spark Streaming] The FileInputDStream newFilesOnly=false does not work in 1.2 since

2015-01-20 Thread Sean Owen
See also SPARK-3276 and SPARK-3553. Can you say more about the
problem? What are the file timestamps, what happens when you run, and
which log messages, if any, are relevant? I do not expect there was any
intended behavior change.

On Wed, Jan 21, 2015 at 5:17 AM, Terry Hole  wrote:
> Hi,
>
> I am trying to move from 1.1 to 1.2 and found that the newFilesOnly=false
> (Intend to include old files) does not work anymore. It works great in 1.1,
> this should be introduced by the last change of this class.
>
>
>
> Does this flag behavior change or is it a regression?
>
> Issue should be caused by this code:
> From line 157 in FileInputDStream.scala
> val modTimeIgnoreThreshold = math.max(
> initialModTimeIgnoreThreshold,   // initial threshold based on
> newFilesOnly setting
> currentTime - durationToRemember.milliseconds  // trailing end of
> the remember window
>   )
>
>
> Regards
>
> - Terry
>
>




spark 1.2 three times slower than spark 1.1, why?

2015-01-20 Thread Fengyun RAO
We are currently migrating from Spark 1.1 to Spark 1.2, but found the
program 3x slower, with nothing else changed.
Note: our program on Spark 1.1 has successfully processed a whole year of
data and has been quite stable.

The main script is as below:

sc.textFile(inputPath)
  .flatMap(line => LogParser.parseLine(line))
  .groupByKey(new HashPartitioner(numPartitions))
  .mapPartitionsWithIndex(...)
  .foreach(_ => {})

where LogParser is a singleton which may take some time to initialize and
is shared across the executor.

the flatMap stage is 3x slower.

We tried changing spark.shuffle.manager back to hash and
spark.shuffle.blockTransferService back to nio, but it didn’t help.

Could somebody explain possible causes, or what we should test or change to
find out?


Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-20 Thread Davies Liu
Maybe some change related to serializing the closure means LogParser is
no longer a singleton, so it is initialized for every task.

Could you change it to a Broadcast?

On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO  wrote:
> Currently we are migrating from spark 1.1 to spark 1.2, but found the
> program 3x slower, with nothing else changed.
> note: our program in spark 1.1 has successfully processed a whole year data,
> quite stable.
>
> the main script is as below
>
> sc.textFile(inputPath)
> .flatMap(line => LogParser.parseLine(line))
> .groupByKey(new HashPartitioner(numPartitions))
> .mapPartitionsWithIndex(...)
> .foreach(_ => {})
>
> where LogParser is a singleton which may take some time to initialized and
> is shared across the execuator.
>
> the flatMap stage is 3x slower.
>
> We tried to change spark.shuffle.manager back to hash, and
> spark.shuffle.blockTransferService back to nio, but didn’t help.
>
> May somebody explain possible causes, or what should we test or change to
> find it out




Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-20 Thread Tassilo Klein
Hi,

It's a fairly long script that runs some deep learning training, so it is
a bit hard to condense.

Essentially I have a loop in which a gradient is computed on each
node and collected (this is where it freezes at some point).

 grads = zipped_trainData.map(distributed_gradient_computation).collect()


The distributed_gradient_computation mainly contains a Theano-derived
function. The Theano function itself is a broadcast variable.

Let me know if you need more information.

Best,
 Tassilo

On Wed, Jan 21, 2015 at 1:17 AM, Davies Liu  wrote:

> Could you provide a short script to reproduce this issue?
>
> On Tue, Jan 20, 2015 at 9:00 PM, TJ Klein  wrote:
> > Hi,
> >
> > I just recently tried to migrate from Spark 1.1 to Spark 1.2 - using
> > PySpark. Initially, I was super glad, noticing that Spark 1.2 is way
> > faster than Spark 1.1. However, the initial joy faded quickly when I
> > noticed that all my stuff didn't successfully terminate operations
> > anymore. Using Spark 1.1 it still works perfectly fine, though.
> > Specifically, the execution just freezes without any error output at one
> > point, when calling a joint map() and collect() statement (after having
> > it called many times successfully before in a loop).
> > Any clue? Or do I have to wait for the next version?
> >
> > Best,
> >  Tassilo
> >
> >
> >
> >
>


Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-20 Thread Fengyun RAO
The LogParser instance is not serializable, and thus cannot be a broadcast.

What’s worse, it contains an LRU cache, which is essential to the
performance, and which we would like to share among all the tasks on the
same node.

If that is the case, what’s the recommended way to share a variable among
all the tasks within the same executor?

2015-01-21 15:04 GMT+08:00 Davies Liu :

> Maybe some change related to serialize the closure cause LogParser is
> not a singleton any more, then it is initialized for every task.
>
> Could you change it to a Broadcast?
>
> On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO 
> wrote:
> > Currently we are migrating from spark 1.1 to spark 1.2, but found the
> > program 3x slower, with nothing else changed.
> > note: our program in spark 1.1 has successfully processed a whole year
> > data, quite stable.
> >
> > the main script is as below
> >
> > sc.textFile(inputPath)
> > .flatMap(line => LogParser.parseLine(line))
> > .groupByKey(new HashPartitioner(numPartitions))
> > .mapPartitionsWithIndex(...)
> > .foreach(_ => {})
> >
> > where LogParser is a singleton which may take some time to initialize
> > and is shared across the executor.
> >
> > the flatMap stage is 3x slower.
> >
> > We tried to change spark.shuffle.manager back to hash, and
> > spark.shuffle.blockTransferService back to nio, but didn’t help.
> >
> > May somebody explain possible causes, or what should we test or change to
> > find it out
>


Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-20 Thread Davies Liu
Could you try disabling the new worker reuse feature by setting:
spark.python.worker.reuse = false
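
One way to pass that setting is on the spark-submit command line with `--conf`. The helper below is just a sketch that builds such a command in plain Python; the function name `spark_submit_args` and the script name `train.py` are made up for illustration.

```python
def spark_submit_args(script, conf):
    """Build a spark-submit argument list with one --conf per setting."""
    args = ["spark-submit"]
    for key in sorted(conf):
        args += ["--conf", "%s=%s" % (key, conf[key])]
    args.append(script)
    return args


# "train.py" is a hypothetical script name.
cmd = spark_submit_args("train.py", {"spark.python.worker.reuse": "false"})
```

The resulting list can be handed to `subprocess.call(cmd)` or joined with spaces and run from a shell.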

On Tue, Jan 20, 2015 at 11:12 PM, Tassilo Klein  wrote:
> Hi,
>
> It's a bit of a longer script that runs some deep learning training.
> Therefore it is a bit hard to wrap up easily.
>
> Essentially I am having a loop, in which a gradient is computed on each node
> and collected (this is where it freezes at some point).
>
>  grads = zipped_trainData.map(distributed_gradient_computation).collect()
>
>
> The distributed_gradient_computation mainly contains a Theano derived
> function. The theano function itself is a broadcast variable.
>
> Let me know if you need more information.
>
> Best,
>  Tassilo
>
> On Wed, Jan 21, 2015 at 1:17 AM, Davies Liu  wrote:
>>
>> Could you provide a short script to reproduce this issue?
>>
>> On Tue, Jan 20, 2015 at 9:00 PM, TJ Klein  wrote:
>> > Hi,
>> >
>> > I just recently tried to migrate from Spark 1.1 to Spark 1.2 - using
>> > PySpark. Initially, I was super glad, noticing that Spark 1.2 is way
>> > faster than Spark 1.1. However, the initial joy faded quickly when I
>> > noticed that all my stuff didn't successfully terminate operations
>> > anymore. Using Spark 1.1 it still works perfectly fine, though.
>> > Specifically, the execution just freezes without any error output at one
>> > point, when calling a joint map() and collect() statement (after having
>> > it called many times successfully before in a loop).
>> >
>> > Any clue? Or do I have to wait for the next version?
>> >
>> > Best,
>> >  Tassilo
>> >
>> >
>> >
>> >
>
>




Closing over a var with changing value in Streaming application

2015-01-20 Thread Tobias Pfeiffer
Hi,

I am developing a Spark Streaming application where I want every item in my
stream to be assigned a unique, strictly increasing Long. My input data
already has RDD-local integers (from 0 to N-1) assigned, so I am doing the
following:

  var totalNumberOfItems = 0L
  // update the keys of the stream data
  val globallyIndexedItems = inputStream.map(keyVal =>
    (keyVal._1 + totalNumberOfItems, keyVal._2))
  // increase the number of total seen items
  inputStream.foreachRDD(rdd => {
    totalNumberOfItems += rdd.count
  })

Now this works on my local[*] Spark instance, but I was wondering if this
is actually an ok thing to do. I don't want this to break when going to a
YARN cluster...

The function increasing totalNumberOfItems is closing over a var and
running in the driver, so I think this is ok. Here is my concern: What
about the function in the inputStream.map(...) block? This one is closing
over a var that has a different value in every interval. Will the closure
be serialized with that new value in every interval? Or only once with the
initial value and this will always be 0 during the runtime of the program?

As I said, it works locally, but I was wondering if I can really assume
that the closure is serialized with a new value in every interval.
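
To illustrate what I would like to be guaranteed, here is a plain Python simulation (no Spark involved) in which the offset is snapshotted explicitly once per interval instead of relying on when a closure gets serialized. All names are hypothetical. In Spark Streaming the per-interval snapshot would presumably have to be taken inside something like DStream.transform, whose function is, as far as I understand, evaluated on the driver for every batch; that part is an assumption.

```python
def make_indexer(offset):
    """Return a function with `offset` frozen at creation time."""
    def add_offset(key_val):
        return (key_val[0] + offset, key_val[1])
    return add_offset


def index_batches(batches):
    """Simulate the driver loop: one snapshot of the counter per batch."""
    total = 0
    indexed = []
    for batch in batches:
        indexer = make_indexer(total)   # explicit per-interval snapshot
        indexed.append([indexer(kv) for kv in batch])
        total += len(batch)             # same role as totalNumberOfItems
    return indexed


result = index_batches([[(0, "a"), (1, "b"), (2, "c")],
                        [(0, "d"), (1, "e")]])
```

With the snapshot made explicit, the keys of the second batch continue where the first batch ended, independently of any closure serialization behaviour.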

Thanks,
Tobias


Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-20 Thread Sean Owen
I don't know of any reason to think the singleton pattern doesn't work or
works differently. I wonder if, for example, task scheduling is different
in 1.2 and you have more partitions across more workers and so are loading
more copies more slowly into your singletons.
On Jan 21, 2015 7:13 AM, "Fengyun RAO"  wrote:

> the LogParser instance is not serializable, and thus cannot be a
> broadcast,
>
> what’s worse, it contains an LRU cache, which is essential to the
> performance, and we would like to share among all the tasks on the same
> node.
>
> If it is the case, what’s the recommended way to share a variable among
> all the tasks within the same executor.
> ​
>
> 2015-01-21 15:04 GMT+08:00 Davies Liu :
>
>> Maybe some change related to serialize the closure cause LogParser is
>> not a singleton any more, then it is initialized for every task.
>>
>> Could you change it to a Broadcast?
>>
>> On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO 
>> wrote:
>> > Currently we are migrating from spark 1.1 to spark 1.2, but found the
>> > program 3x slower, with nothing else changed.
>> > note: our program in spark 1.1 has successfully processed a whole year
>> > data, quite stable.
>> >
>> > the main script is as below
>> >
>> > sc.textFile(inputPath)
>> > .flatMap(line => LogParser.parseLine(line))
>> > .groupByKey(new HashPartitioner(numPartitions))
>> > .mapPartitionsWithIndex(...)
>> > .foreach(_ => {})
>> >
>> > where LogParser is a singleton which may take some time to initialize
>> > and is shared across the executor.
>> >
>> > the flatMap stage is 3x slower.
>> >
>> > We tried to change spark.shuffle.manager back to hash, and
>> > spark.shuffle.blockTransferService back to nio, but didn’t help.
>> >
>> > May somebody explain possible causes, or what should we test or change
>> > to find it out
>>
>
>


Re: Closing over a var with changing value in Streaming application

2015-01-20 Thread Akhil Das
How about using accumulators?

Thanks
Best Regards

On Wed, Jan 21, 2015 at 12:53 PM, Tobias Pfeiffer  wrote:

> Hi,
>
> I am developing a Spark Streaming application where I want every item in
> my stream to be assigned a unique, strictly increasing Long. My input data
> already has RDD-local integers (from 0 to N-1) assigned, so I am doing the
> following:
>
>   var totalNumberOfItems = 0L
>   // update the keys of the stream data
>   val globallyIndexedItems = inputStream.map(keyVal =>
>   (keyVal._1 + totalNumberOfItems, keyVal._2))
>   // increase the number of total seen items
>   inputStream.foreachRDD(rdd => {
> totalNumberOfItems += rdd.count
>   })
>
> Now this works on my local[*] Spark instance, but I was wondering if this
> is actually an ok thing to do. I don't want this to break when going to a
> YARN cluster...
>
> The function increasing totalNumberOfItems is closing over a var and
> running in the driver, so I think this is ok. Here is my concern: What
> about the function in the inputStream.map(...) block? This one is closing
> over a var that has a different value in every interval. Will the closure
> be serialized with that new value in every interval? Or only once with the
> initial value and this will always be 0 during the runtime of the program?
>
> As I said, it works locally, but I was wondering if I can really assume
> that the closure is serialized with a new value in every interval.
>
> Thanks,
> Tobias
>
>


Re: Closing over a var with changing value in Streaming application

2015-01-20 Thread Tobias Pfeiffer
Hi,

On Wed, Jan 21, 2015 at 4:46 PM, Akhil Das 
wrote:

> How about using accumulators?
>

As far as I understand, they solve the part of the problem that I am not
worried about, namely increasing the counter. I was more worried about
getting that counter/accumulator value back to the executors.

Thanks
Tobias

