Re: how to use DataFrame.na.fill based on condition

2015-11-23 Thread Davies Liu
DataFrame.replace(to_replace, value, subset=None)

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace
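For reference, a minimal PySpark sketch of that replace call (the data and values are made up for illustration; note that replace swaps one concrete value for another, which is not quite the null-filling the question asks for, as the follow-up later in this thread points out):

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)  # assumes an existing SparkContext `sc`

# Hypothetical data: replace the placeholder value "unknown" with "value1",
# but only in column1.
df = sqlContext.createDataFrame([("unknown", "A"), ("x", "B")], ["column1", "column2"])
df.replace("unknown", "value1", subset=["column1"]).show()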

On Mon, Nov 23, 2015 at 11:05 AM, Vishnu Viswanath
 wrote:
> Hi
>
> Can someone tell me if there is a way I can use the fill method in
> DataFrameNaFunctions based on some condition.
>
> e.g., df.na.fill("value1","column1","condition1")
> df.na.fill("value2","column1","condition2")
>
> i want to fill nulls in column1 with values - either value 1 or value 2,
> based on some condition.
>
> Thanks,

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



how to use DataFrame.na.fill based on condition

2015-11-23 Thread Vishnu Viswanath
Hi

Can someone tell me if there is a way I can use the fill method
in DataFrameNaFunctions based on some condition?

e.g., df.na.fill("value1","column1","condition1")
df.na.fill("value2","column1","condition2")

I want to fill nulls in column1 with either value1 or value2,
based on some condition.

Thanks,


Re: Please add us to the Powered by Spark page

2015-11-23 Thread Sujit Pal
Sorry to be a nag; I realize folks with edit rights on the Powered by Spark
page are very busy people, but it's been 10 days since my original request,
and I was wondering if maybe it just fell through the cracks. If I should submit
via some other channel that will make sure it is looked at (or, better yet,
a self-service option), please let me know and I will do so.

Here is the information again.

Organization Name: Elsevier Labs
URL: http://labs.elsevier.com
Spark components: Spark Core, Spark SQL, MLLib, GraphX.
Use Case: Building Machine Reading Pipeline, Knowledge Graphs, Content as a
Service, Content and Event Analytics, Content/Event based Predictive Models
and Big Data Processing. We use Scala and Python over Databricks Notebooks
for most of our work.

Thanks very much,
Sujit

On Fri, Nov 13, 2015 at 9:21 AM, Sujit Pal  wrote:

> Hello,
>
> We have been using Spark at Elsevier Labs for a while now. Would love to
> be added to the “Powered By Spark” page.
>
> Organization Name: Elsevier Labs
> URL: http://labs.elsevier.com
> Spark components: Spark Core, Spark SQL, MLLib, GraphX.
> Use Case: Building Machine Reading Pipeline, Knowledge Graphs, Content as
> a Service, Content and Event Analytics, Content/Event based Predictive
> Models and Big Data Processing. We use Scala and Python over Databricks
> Notebooks for most of our work.
>
> Thanks very much,
>
> Sujit Pal
> Technical Research Director
> Elsevier Labs
> sujit@elsevier.com
>
>
>


Re: how to use DataFrame.na.fill based on condition

2015-11-23 Thread Vishnu Viswanath
Thanks for the reply Davies

I think replace replaces one value with another value, but what I want to do
is fill in the null values of a column (I don't have a to_replace here).

Regards,
Vishnu

On Mon, Nov 23, 2015 at 1:37 PM, Davies Liu  wrote:

> DataFrame.replace(to_replace, value, subset=None)
>
>
> http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace
>
> On Mon, Nov 23, 2015 at 11:05 AM, Vishnu Viswanath
>  wrote:
> > Hi
> >
> > Can someone tell me if there is a way I can use the fill method in
> > DataFrameNaFunctions based on some condition.
> >
> > e.g., df.na.fill("value1","column1","condition1")
> > df.na.fill("value2","column1","condition2")
> >
> > i want to fill nulls in column1 with values - either value 1 or value 2,
> > based on some condition.
> >
> > Thanks,
>



-- 
Thanks and Regards,
Vishnu Viswanath
+1 309 550 2311
*www.vishnuviswanath.com *


Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-23 Thread Andy Davidson
Hi Sabarish

I am but a simple padawan :-) I do not understand your answer. Why would
Spark be creating so many empty partitions? My real problem is that my
application is very slow. I happened to notice thousands of empty files
being created, and I thought this might be a hint as to why my app is slow.

My program calls sample(0.01).filter(not null).saveAsTextFile(). This takes
about 35 min to scan 500,000 JSON strings and write 5000 to disk. The total
data written is 38 MB.

The data is read from HDFS. My understanding is that Spark cannot know in
advance how HDFS partitioned the data. Spark knows I have a master and 3
slave machines. It knows how many workers/executors are assigned to my job. I
would expect Spark to be smart enough not to create more partitions than I
have worker machines.

Also, given I am not using any key/value operations like join() or doing
multiple scans, I would assume my app would not benefit from partitioning.


Kind regards

Andy


From:  Sabarish Sasidharan 
Date:  Saturday, November 21, 2015 at 7:20 PM
To:  Andrew Davidson 
Cc:  "user @spark" 
Subject:  Re: newbie : why are thousands of empty files being created on
HDFS?

> 
> Those are empty partitions. I don't see the number of partitions specified in
> code. That then implies the default parallelism config is being used and is
> set to a very high number, the sum of empty + non empty files.
> 
> Regards
> Sab
> 
> On 21-Nov-2015 11:59 pm, "Andy Davidson" 
> wrote:
>> I start working on a very simple ETL pipeline for a POC. It reads a in a data
>> set of tweets stored as JSON strings on in HDFS and randomly selects 1% of
>> the observations and writes them to HDFS. It seems to run very slowly. E.G.
>> To write 4720 observations takes 1:06:46.577795. I
>> Also noticed that RDD saveAsTextFile is creating thousands of empty files.
>> 
>> I assume creating all these empty files must be slowing down the system. Any
>> idea why this is happening? Do I have write a script to periodical remove
>> empty files?
>> 
>> 
>> Kind regards
>> 
>> Andy
>> 
>> tweetStrings = sc.textFile(inputDataURL)
>> 
>> 
>> def removeEmptyLines(line) :
>> if line:
>> return True
>> else :
>> emptyLineCount.add(1);
>> return False
>> 
>> emptyLineCount = sc.accumulator(0)
>> sample = (tweetStrings.filter(removeEmptyLines)
>>   .sample(withReplacement=False, fraction=0.01, seed=345678))
>> 
>> startTime = datetime.datetime.now()
>> sample.saveAsTextFile(saveDataURL)
>> 
>> endTime = datetime.datetime.now()
>> print("elapsed time:%s" % (datetime.datetime.now() - startTime))
>> 
>> elapsed time:1:06:46.577795
>> 
>> 
>> Total number of empty files
>> $ hadoop fs -du {saveDataURL} | grep '^0' | wc -l
>> 223515
>> Total number of files with data
>> $ hadoop fs -du {saveDataURL} | grep -v '^0' | wc -l
>> 4642
>> 
>> 
>> 
>> I randomly pick a part file. Its size is 9251




spark-csv on Amazon EMR

2015-11-23 Thread Daniel Lopes
Hi,

Does someone know how to use spark-csv in the create-cluster statement of the
Amazon EMR CLI?

Best,

-- 
*Daniel Lopes, B.Eng*
Data Scientist - BankFacil
CREA/SP 5069410560

Mob +55 (18) 99764-2733 
Ph +55 (11) 3522-8009
http://about.me/dannyeuu

Av. Nova Independência, 956, São Paulo, SP
Bairro Brooklin Paulista
CEP 04570-001
https://www.bankfacil.com.br


Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-23 Thread Xiao Li
In your case, maybe you can try to call the function coalesce?

Good luck,

Xiao Li
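A minimal sketch of what that could look like in Andy's PySpark pipeline (the partition count of 30 is only an illustration and would need tuning):

# Shrink the number of output partitions before writing, so saveAsTextFile
# produces far fewer (and far fewer empty) part files.
sample = (tweetStrings.filter(removeEmptyLines)
          .sample(withReplacement=False, fraction=0.01, seed=345678))

# coalesce(30) merges existing partitions down to 30 without a full shuffle;
# repartition(30) would do the same with a shuffle if rebalancing is needed.
sample.coalesce(30).saveAsTextFile(saveDataURL)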

2015-11-23 12:15 GMT-08:00 Andy Davidson :

> Hi Sabarish
>
> I am but a simple padawan :-) I do not understand your answer. Why would
> Spark be creating so many empty partitions? My real problem is my
> application is very slow. I happened to notice thousands of empty files
> being created. I thought this is a hint to why my app is slow.
>
> My program calls sample( 0.01).filter(not null).saveAsTextFile(). This
> takes about 35 min, to scan 500,000 JSON strings and write 5000 to disk.
> The total data writing in 38M.
>
> The data is read from HDFS. My understanding is Spark can not know in
> advance how HDFS partitioned the data. Spark knows I have a master and 3
> slaves machines. It knows how many works/executors are assigned to my Job.
> I would expect spark would be smart enough not create more partitions than
> I have worker machines?
>
> Also given I am not using any key/value operations like Join() or doing
> multiple scans I would assume my app would not benefit from partitioning.
>
>
> Kind regards
>
> Andy
>
>
> From: Sabarish Sasidharan 
> Date: Saturday, November 21, 2015 at 7:20 PM
> To: Andrew Davidson 
> Cc: "user @spark" 
> Subject: Re: newbie : why are thousands of empty files being created on
> HDFS?
>
> Those are empty partitions. I don't see the number of partitions specified
> in code. That then implies the default parallelism config is being used and
> is set to a very high number, the sum of empty + non empty files.
>
> Regards
> Sab
> On 21-Nov-2015 11:59 pm, "Andy Davidson" 
> wrote:
>
>> I start working on a very simple ETL pipeline for a POC. It reads a in a
>> data set of tweets stored as JSON strings on in HDFS and randomly selects
>> 1% of the observations and writes them to HDFS. It seems to run very
>> slowly. E.G. To write 4720 observations takes 1:06:46.577795. I
>> Also noticed that RDD saveAsTextFile is creating thousands of empty
>> files.
>>
>> I assume creating all these empty files must be slowing down the system. Any
>> idea why this is happening? Do I have write a script to periodical remove
>> empty files?
>>
>>
>> Kind regards
>>
>> Andy
>>
>> tweetStrings = sc.textFile(inputDataURL)
>>
>> def removeEmptyLines(line) :
>> if line:
>> return True
>> else :
>> emptyLineCount.add(1);
>> return False
>>
>> emptyLineCount = sc.accumulator(0)
>> sample = (tweetStrings.filter(removeEmptyLines)
>>   .sample(withReplacement=False, fraction=0.01, seed=345678))
>>
>> startTime = datetime.datetime.now()
>> sample.saveAsTextFile(saveDataURL)
>>
>> endTime = datetime.datetime.now()
>> print("elapsed time:%s" % (datetime.datetime.now() - startTime))
>>
>> elapsed time:1:06:46.577795
>>
>>
>>
>> *Total number of empty files*
>>
>> $ hadoop fs -du {saveDataURL} | grep '^0' | wc -l
>>
>> 223515
>>
>> *Total number of files with data*
>>
>> $ hadoop fs -du {saveDataURL} | grep -v '^0' | wc -l
>>
>> 4642
>>
>>
>> I randomly pick a part file. It’s size is 9251
>>
>>


Re: SparkR DataFrame , Out of memory exception for very small file.

2015-11-23 Thread Vipul Rai
Hi Jeff,

This is only part of the actual code.

My questions are mentioned in comments near the code.

SALES<- SparkR::sql(hiveContext, "select * from sales")
PRICING<- SparkR::sql(hiveContext, "select * from pricing")


## renaming of columns ##
#sales file#

# Is this right? Do we have to create a new DF for every column addition
# to the original DF?

# And if we do that, then what about the older DFs? Will they also take
# memory?

names(SALES)[which(names(SALES)=="div_no")]<-"DIV_NO"
names(SALES)[which(names(SALES)=="store_no")]<-"STORE_NO"

#pricing file#
names(PRICING)[which(names(PRICING)=="price_type_cd")]<-"PRICE_TYPE"
names(PRICING)[which(names(PRICING)=="price_amt")]<-"PRICE_AMT"

registerTempTable(SALES,"sales")
registerTempTable(PRICING,"pricing")

#merging sales and pricing file#
merg_sales_pricing<- SparkR::sql(hiveContext,"select .")

head(merg_sales_pricing)


​Thanks,
Vipul​


On 23 November 2015 at 14:52, Jeff Zhang  wrote:

> If possible, could you share your code ? What kind of operation are you
> doing on the dataframe ?
>
> On Mon, Nov 23, 2015 at 5:10 PM, Vipul Rai  wrote:
>
>> Hi Zeff,
>>
>> Thanks for the reply, but could you tell me why is it taking so much time.
>> What could be wrong , also when I remove the DataFrame from memory using
>> rm().
>> It does not clear the memory but the object is deleted.
>>
>> Also , What about the R functions which are not supported in SparkR.
>> Like ddply ??
>>
>> How to access the nth ROW of SparkR DataFrame.
>>
>> ​Regards,
>> Vipul​
>>
>> On 23 November 2015 at 14:25, Jeff Zhang  wrote:
>>
>>> >>> Do I need to create a new DataFrame for every update to the
>>> DataFrame like
>>> addition of new column or  need to update the original sales DataFrame.
>>>
>>> Yes, DataFrame is immutable, and every mutation of DataFrame will
>>> produce a new DataFrame.
>>>
>>>
>>>
>>> On Mon, Nov 23, 2015 at 4:44 PM, Vipul Rai 
>>> wrote:
>>>
 Hello Rui,

 Sorry , What I meant was the resultant of the original dataframe to
 which a new column was added gives a new DataFrame.

 Please check this for more

 https://spark.apache.org/docs/1.5.1/api/R/index.html

 Check for
 WithColumn


 Thanks,
 Vipul


 On 23 November 2015 at 12:42, Sun, Rui  wrote:

> Vipul,
>
> Not sure if I understand your question. DataFrame is immutable. You
> can't update a DataFrame.
>
> Could you paste some log info for the OOM error?
>
> -Original Message-
> From: vipulrai [mailto:vipulrai8...@gmail.com]
> Sent: Friday, November 20, 2015 12:11 PM
> To: user@spark.apache.org
> Subject: SparkR DataFrame , Out of memory exception for very small
> file.
>
> Hi Users,
>
> I have a general doubt regarding DataFrames in SparkR.
>
> I am trying to read a file from Hive and it gets created as DataFrame.
>
> sqlContext <- sparkRHive.init(sc)
>
> #DF
> sales <- read.df(sqlContext, "hdfs://sample.csv", header ='true',
>  source = "com.databricks.spark.csv",
> inferSchema='true')
>
> registerTempTable(sales,"Sales")
>
> Do I need to create a new DataFrame for every update to the DataFrame
> like addition of new column or  need to update the original sales 
> DataFrame.
>
> sales1<- SparkR::sql(sqlContext,"Select a.* , 607 as C1 from Sales as
> a")
>
>
> Please help me with this , as the orignal file is only 20MB but it
> throws out of memory exception on a cluster of 4GB Master and Two workers
> of 4GB each.
>
> Also, what is the logic with DataFrame do I need to register and drop
> tempTable after every update??
>
> Thanks,
> Vipul
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-DataFrame-Out-of-memory-exception-for-very-small-file-tp25435.html
> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
> additional commands, e-mail: user-h...@spark.apache.org
>
>


 --
 Regards,
 Vipul Rai
 www.vipulrai.me
 +91-8892598819
 

>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>>
>>
>> --
>> Regards,
>> Vipul Rai
>> www.vipulrai.me
>> +91-8892598819
>> 
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>



-- 
Regards,
Vipul Rai
www.vipulrai.me
+91-8892598819



Re: how to use sc.hadoopConfiguration from pyspark

2015-11-23 Thread Eike von Seggern
2015-11-23 10:26 GMT+01:00 Tamas Szuromi :
> Hello Eike,
>
> Thanks! Yes I'm using it with Hadoop 2.6 so I'll give a try to the 2.4
> build.
> Have you tried it with 1.6 Snapshot or do you know JIRA tickets for this
> missing libraries issues?

I've not tried 1.6.

https://issues.apache.org/jira/browse/SPARK-7442 is the only ticket
I've found in my browser's history.

Best

Eike

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkR DataFrame , Out of memory exception for very small file.

2015-11-23 Thread Jeff Zhang
>>> Do I need to create a new DataFrame for every update to the DataFrame
like
addition of new column or  need to update the original sales DataFrame.

Yes, DataFrame is immutable, and every mutation of DataFrame will produce a
new DataFrame.
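As a small illustration of that immutability (sketched in PySpark rather than SparkR, but withColumn behaves the same way in both APIs), each column addition returns a new DataFrame and the previous one simply becomes an unreferenced object:

from pyspark.sql import SQLContext, functions as F

sqlContext = SQLContext(sc)  # assumes an existing SparkContext `sc`
sales = sqlContext.createDataFrame([(1, 10.0), (2, 20.0)], ["id", "price_amt"])

# Each call returns a brand-new DataFrame; `sales` itself never changes.
sales1 = sales.withColumn("C1", F.lit(607))
sales2 = sales1.withColumnRenamed("price_amt", "PRICE_AMT")
sales2.show()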



On Mon, Nov 23, 2015 at 4:44 PM, Vipul Rai  wrote:

> Hello Rui,
>
> Sorry , What I meant was the resultant of the original dataframe to which
> a new column was added gives a new DataFrame.
>
> Please check this for more
>
> https://spark.apache.org/docs/1.5.1/api/R/index.html
>
> Check for
> WithColumn
>
>
> Thanks,
> Vipul
>
>
> On 23 November 2015 at 12:42, Sun, Rui  wrote:
>
>> Vipul,
>>
>> Not sure if I understand your question. DataFrame is immutable. You can't
>> update a DataFrame.
>>
>> Could you paste some log info for the OOM error?
>>
>> -Original Message-
>> From: vipulrai [mailto:vipulrai8...@gmail.com]
>> Sent: Friday, November 20, 2015 12:11 PM
>> To: user@spark.apache.org
>> Subject: SparkR DataFrame , Out of memory exception for very small file.
>>
>> Hi Users,
>>
>> I have a general doubt regarding DataFrames in SparkR.
>>
>> I am trying to read a file from Hive and it gets created as DataFrame.
>>
>> sqlContext <- sparkRHive.init(sc)
>>
>> #DF
>> sales <- read.df(sqlContext, "hdfs://sample.csv", header ='true',
>>  source = "com.databricks.spark.csv", inferSchema='true')
>>
>> registerTempTable(sales,"Sales")
>>
>> Do I need to create a new DataFrame for every update to the DataFrame
>> like addition of new column or  need to update the original sales DataFrame.
>>
>> sales1<- SparkR::sql(sqlContext,"Select a.* , 607 as C1 from Sales as a")
>>
>>
>> Please help me with this , as the orignal file is only 20MB but it throws
>> out of memory exception on a cluster of 4GB Master and Two workers of 4GB
>> each.
>>
>> Also, what is the logic with DataFrame do I need to register and drop
>> tempTable after every update??
>>
>> Thanks,
>> Vipul
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-DataFrame-Out-of-memory-exception-for-very-small-file-tp25435.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
>> commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> Regards,
> Vipul Rai
> www.vipulrai.me
> +91-8892598819
> 
>



-- 
Best Regards

Jeff Zhang


Re: A Problem About Running Spark 1.5 on YARN with Dynamic Alloction

2015-11-23 Thread Saisai Shao
Hi Tingwen,

Would you mind sharing your changes in
ExecutorAllocationManager#addExecutors()?

From my understanding and testing, dynamic allocation works when you
set the min and max number of executors to the same number.

Please check your Spark and YARN logs to make sure the executors are
started correctly; the warning log means there are currently not enough
resources to submit tasks.

Thanks
Saisai
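For reference, a sketch of the configuration under discussion (the original job was submitted via spark-submit in yarn-cluster mode; the property names below are the standard dynamic-allocation settings and the values are only illustrative):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("dynamic-allocation-example")
        # min == max, as in the report above.
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.dynamicAllocation.minExecutors", "50")
        .set("spark.dynamicAllocation.maxExecutors", "50")
        # The external shuffle service must be enabled for dynamic allocation on YARN.
        .set("spark.shuffle.service.enabled", "true"))

sc = SparkContext(conf=conf)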


On Mon, Nov 23, 2015 at 8:41 PM, 谢廷稳  wrote:

> Hi all,
> I ran a SparkPi on YARN with Dynamic Allocation enabled and set 
> spark.dynamicAllocation.maxExecutors
> equals
> spark.dynamicAllocation.minExecutors,then I submit an application using:
> ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
> yarn-cluster --driver-memory 4g --executor-memory 8g
> lib/spark-examples*.jar 200
>
> then, this application was submitted successfully, but the AppMaster
> always saying “15/11/23 20:13:08 WARN cluster.YarnClusterScheduler:
> Initial job has not accepted any resources; check your cluster UI to ensure
> that workers are registered and have sufficient resources”
> and when I open DEBUG,I found “15/11/23 20:24:00 DEBUG
> ExecutorAllocationManager: Not adding executors because our current target
> total is already 50 (limit 50)” in the console.
>
> I have fixed it by modifying code in
> ExecutorAllocationManager.addExecutors,Does this a bug or it was designed
> that we can’t set maxExecutors equals minExecutors?
>
> Thanks,
> Weber
>


Re:Re: RE: Error not found value sqlContext

2015-11-23 Thread prosp4300


So it is actually a compile-time error in Eclipse. Instead of generating the jar
from Eclipse, you can try to use sbt to assemble your jar; it looks like your
Eclipse does not recognize the Scala syntax properly.



At 2015-11-20 21:36:55, "satish chandra j"  wrote:

HI All,
I am getting this error while generating executable Jar file itself in Eclipse, 
if the Spark Application code has "import sqlContext.implicits._" line in it. 
Spark Applicaiton code  works fine if the above mentioned line does not exist 
as I have tested by fetching data from an RDBMS by implementing JDBCRDD


I tried couple of DataFrame related methods for which most of them errors 
stating that method has been overloaded


Please let me know if any further inputs needed to analyze it


Regards,
Satish Chandra


On Fri, Nov 20, 2015 at 5:46 PM, prosp4300  wrote:


Looks like a classpath problem, if you can provide the command you used to run 
your application and environment variable SPARK_HOME, it will help others to 
identify the root problem




On 2015-11-20 at 18:59, Satish wrote:
Hi Michael,
As my current Spark version is 1.4.0 than why it error out as "error: not 
found: value sqlContext" when I have "import sqlContext.implicits._" in my 
Spark Job

Regards
Satish Chandra
From: Michael Armbrust
Sent: ‎20-‎11-‎2015 01:36
To: satish chandra j
Cc: user; hari krishna
Subject: Re: Error not found value sqlContext


http://spark.apache.org/docs/latest/sql-programming-guide.html#upgrading-from-spark-sql-10-12-to-13



On Thu, Nov 19, 2015 at 4:19 AM, satish chandra j  
wrote:

HI All,
we have recently migrated from Spark 1.2.1 to Spark 1.4.0, I am fetching data 
from an RDBMS using JDBCRDD and register it as temp table to perform SQL query


Below approach is working fine in Spark 1.2.1:


JDBCRDD --> apply map using Case Class --> apply createSchemaRDD --> 
registerTempTable --> perform SQL Query


but now as createSchemaRDD is not supported in Spark 1.4.0



JDBCRDD --> apply map using Case Class with .toDF() --> registerTempTable --> 
perform SQL query on temptable




JDBCRDD --> apply map using Case Class --> RDD.toDF().registerTempTable --> 
perform SQL query on temptable



Only solution I get everywhere is to  use "import sqlContext.implicits._" after 
val SQLContext = new org.apache.spark.sql.SQLContext(sc)


But it errors with the two generic errors


1. error: not found: value sqlContext


2. value toDF is not a member of org.apache.spark.rdd.RDD


















Re: SparkR DataFrame , Out of memory exception for very small file.

2015-11-23 Thread Vipul Rai
Hello Rui,

Sorry, what I meant was that the result of adding a new column to the original
DataFrame is a new DataFrame.

Please check this for more

https://spark.apache.org/docs/1.5.1/api/R/index.html

Check for
WithColumn


Thanks,
Vipul


On 23 November 2015 at 12:42, Sun, Rui  wrote:

> Vipul,
>
> Not sure if I understand your question. DataFrame is immutable. You can't
> update a DataFrame.
>
> Could you paste some log info for the OOM error?
>
> -Original Message-
> From: vipulrai [mailto:vipulrai8...@gmail.com]
> Sent: Friday, November 20, 2015 12:11 PM
> To: user@spark.apache.org
> Subject: SparkR DataFrame , Out of memory exception for very small file.
>
> Hi Users,
>
> I have a general doubt regarding DataFrames in SparkR.
>
> I am trying to read a file from Hive and it gets created as DataFrame.
>
> sqlContext <- sparkRHive.init(sc)
>
> #DF
> sales <- read.df(sqlContext, "hdfs://sample.csv", header ='true',
>  source = "com.databricks.spark.csv", inferSchema='true')
>
> registerTempTable(sales,"Sales")
>
> Do I need to create a new DataFrame for every update to the DataFrame like
> addition of new column or  need to update the original sales DataFrame.
>
> sales1<- SparkR::sql(sqlContext,"Select a.* , 607 as C1 from Sales as a")
>
>
> Please help me with this , as the orignal file is only 20MB but it throws
> out of memory exception on a cluster of 4GB Master and Two workers of 4GB
> each.
>
> Also, what is the logic with DataFrame do I need to register and drop
> tempTable after every update??
>
> Thanks,
> Vipul
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-DataFrame-Out-of-memory-exception-for-very-small-file-tp25435.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
> commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Regards,
Vipul Rai
www.vipulrai.me
+91-8892598819



Re: how to use sc.hadoopConfiguration from pyspark

2015-11-23 Thread Tamas Szuromi
Hello Eike,

Thanks! Yes, I'm using it with Hadoop 2.6, so I'll give the 2.4 build a try.
Have you tried it with a 1.6 snapshot, or do you know of JIRA tickets for these
missing-library issues?

Tamas





On 23 November 2015 at 10:21, Eike von Seggern 
wrote:

> Hello Tamas,
>
> 2015-11-20 17:23 GMT+01:00 Tamas Szuromi :
> >
> > Hello,
> >
> > I've just wanted to use sc._jsc.hadoopConfiguration().set('key','value')
> in pyspark 1.5.2 but I got set method not exists error.
>
>
> For me it's working with Spark 1.5.2 binary distribution built against
> Hadoop 2.4 (spark-1.5.2-bin-hadoop2.4):
>
>   1: sc._jsc.hadoopConfiguration().set
>   => 
>   2: sc._jsc.hadoopConfiguration().set("foo", "bar")
>
> Are you using the version built against Hadoop 2.6. I remember there
> were problems with missing libraries (or similar).
>
> Best
>
> Eike
>


Re: SparkR DataFrame , Out of memory exception for very small file.

2015-11-23 Thread Vipul Rai
Hi Jeff,

Thanks for the reply, but could you tell me why it is taking so much time?
What could be wrong? Also, when I remove the DataFrame from memory using
rm(), it does not clear the memory, although the object is deleted.

Also, what about the R functions which are not supported in SparkR,
like ddply?

How do I access the nth row of a SparkR DataFrame?

Regards,
Vipul

On 23 November 2015 at 14:25, Jeff Zhang  wrote:

> >>> Do I need to create a new DataFrame for every update to the DataFrame
> like
> addition of new column or  need to update the original sales DataFrame.
>
> Yes, DataFrame is immutable, and every mutation of DataFrame will produce
> a new DataFrame.
>
>
>
> On Mon, Nov 23, 2015 at 4:44 PM, Vipul Rai  wrote:
>
>> Hello Rui,
>>
>> Sorry , What I meant was the resultant of the original dataframe to which
>> a new column was added gives a new DataFrame.
>>
>> Please check this for more
>>
>> https://spark.apache.org/docs/1.5.1/api/R/index.html
>>
>> Check for
>> WithColumn
>>
>>
>> Thanks,
>> Vipul
>>
>>
>> On 23 November 2015 at 12:42, Sun, Rui  wrote:
>>
>>> Vipul,
>>>
>>> Not sure if I understand your question. DataFrame is immutable. You
>>> can't update a DataFrame.
>>>
>>> Could you paste some log info for the OOM error?
>>>
>>> -Original Message-
>>> From: vipulrai [mailto:vipulrai8...@gmail.com]
>>> Sent: Friday, November 20, 2015 12:11 PM
>>> To: user@spark.apache.org
>>> Subject: SparkR DataFrame , Out of memory exception for very small file.
>>>
>>> Hi Users,
>>>
>>> I have a general doubt regarding DataFrames in SparkR.
>>>
>>> I am trying to read a file from Hive and it gets created as DataFrame.
>>>
>>> sqlContext <- sparkRHive.init(sc)
>>>
>>> #DF
>>> sales <- read.df(sqlContext, "hdfs://sample.csv", header ='true',
>>>  source = "com.databricks.spark.csv", inferSchema='true')
>>>
>>> registerTempTable(sales,"Sales")
>>>
>>> Do I need to create a new DataFrame for every update to the DataFrame
>>> like addition of new column or  need to update the original sales DataFrame.
>>>
>>> sales1<- SparkR::sql(sqlContext,"Select a.* , 607 as C1 from Sales as a")
>>>
>>>
>>> Please help me with this , as the orignal file is only 20MB but it
>>> throws out of memory exception on a cluster of 4GB Master and Two workers
>>> of 4GB each.
>>>
>>> Also, what is the logic with DataFrame do I need to register and drop
>>> tempTable after every update??
>>>
>>> Thanks,
>>> Vipul
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-DataFrame-Out-of-memory-exception-for-very-small-file-tp25435.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
>>> additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> Regards,
>> Vipul Rai
>> www.vipulrai.me
>> +91-8892598819
>> 
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>



-- 
Regards,
Vipul Rai
www.vipulrai.me
+91-8892598819



Re: how to use sc.hadoopConfiguration from pyspark

2015-11-23 Thread Eike von Seggern
Hello Tamas,

2015-11-20 17:23 GMT+01:00 Tamas Szuromi :
>
> Hello,
>
> I've just wanted to use sc._jsc.hadoopConfiguration().set('key','value') in 
> pyspark 1.5.2 but I got set method not exists error.


For me it's working with Spark 1.5.2 binary distribution built against
Hadoop 2.4 (spark-1.5.2-bin-hadoop2.4):

  1: sc._jsc.hadoopConfiguration().set
  => 
  2: sc._jsc.hadoopConfiguration().set("foo", "bar")

Are you using the version built against Hadoop 2.6? I remember there
were problems with missing libraries (or similar).

Best

Eike

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkR DataFrame , Out of memory exception for very small file.

2015-11-23 Thread Jeff Zhang
If possible, could you share your code? What kind of operation are you
doing on the DataFrame?

On Mon, Nov 23, 2015 at 5:10 PM, Vipul Rai  wrote:

> Hi Zeff,
>
> Thanks for the reply, but could you tell me why is it taking so much time.
> What could be wrong , also when I remove the DataFrame from memory using
> rm().
> It does not clear the memory but the object is deleted.
>
> Also , What about the R functions which are not supported in SparkR.
> Like ddply ??
>
> How to access the nth ROW of SparkR DataFrame.
>
> ​Regards,
> Vipul​
>
> On 23 November 2015 at 14:25, Jeff Zhang  wrote:
>
>> >>> Do I need to create a new DataFrame for every update to the
>> DataFrame like
>> addition of new column or  need to update the original sales DataFrame.
>>
>> Yes, DataFrame is immutable, and every mutation of DataFrame will produce
>> a new DataFrame.
>>
>>
>>
>> On Mon, Nov 23, 2015 at 4:44 PM, Vipul Rai 
>> wrote:
>>
>>> Hello Rui,
>>>
>>> Sorry , What I meant was the resultant of the original dataframe to
>>> which a new column was added gives a new DataFrame.
>>>
>>> Please check this for more
>>>
>>> https://spark.apache.org/docs/1.5.1/api/R/index.html
>>>
>>> Check for
>>> WithColumn
>>>
>>>
>>> Thanks,
>>> Vipul
>>>
>>>
>>> On 23 November 2015 at 12:42, Sun, Rui  wrote:
>>>
 Vipul,

 Not sure if I understand your question. DataFrame is immutable. You
 can't update a DataFrame.

 Could you paste some log info for the OOM error?

 -Original Message-
 From: vipulrai [mailto:vipulrai8...@gmail.com]
 Sent: Friday, November 20, 2015 12:11 PM
 To: user@spark.apache.org
 Subject: SparkR DataFrame , Out of memory exception for very small file.

 Hi Users,

 I have a general doubt regarding DataFrames in SparkR.

 I am trying to read a file from Hive and it gets created as DataFrame.

 sqlContext <- sparkRHive.init(sc)

 #DF
 sales <- read.df(sqlContext, "hdfs://sample.csv", header ='true',
  source = "com.databricks.spark.csv",
 inferSchema='true')

 registerTempTable(sales,"Sales")

 Do I need to create a new DataFrame for every update to the DataFrame
 like addition of new column or  need to update the original sales 
 DataFrame.

 sales1<- SparkR::sql(sqlContext,"Select a.* , 607 as C1 from Sales as
 a")


 Please help me with this , as the orignal file is only 20MB but it
 throws out of memory exception on a cluster of 4GB Master and Two workers
 of 4GB each.

 Also, what is the logic with DataFrame do I need to register and drop
 tempTable after every update??

 Thanks,
 Vipul



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-DataFrame-Out-of-memory-exception-for-very-small-file-tp25435.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
 additional commands, e-mail: user-h...@spark.apache.org


>>>
>>>
>>> --
>>> Regards,
>>> Vipul Rai
>>> www.vipulrai.me
>>> +91-8892598819
>>> 
>>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
>
> --
> Regards,
> Vipul Rai
> www.vipulrai.me
> +91-8892598819
> 
>



-- 
Best Regards

Jeff Zhang


DateTime Support - Hive Parquet

2015-11-23 Thread Bryan Jeffrey
All,

I am attempting to write objects that include DateTime properties to a
persistent table using Spark 1.5.2 / HiveContext. In 1.4.1 I was forced to
convert the DateTime properties to Timestamp properties. I was under the
impression that this issue was fixed in the default Hive supported with
1.5.2 - however, I am still seeing the associated errors.

Is there a bug I can follow to determine when DateTime will be supported
for Parquet?

Regards,

Bryan Jeffrey


Re: Re: RE: Error not found value sqlContext

2015-11-23 Thread satish chandra j
Thanks for all the support.
It was a code issue which I had overlooked.

Regards,
Satish Chandra

On Mon, Nov 23, 2015 at 3:49 PM, satish chandra j 
wrote:

> Sorry, just to understand my issue.if Eclipse could not understand
> Scala syntax properly than it should error for the other Spark Job which is
> fetching data from a RDBM and printing the output in console. I think there
> is some dependencies which are missing due to which "import
> sqlContext.implicits._" is not recognized during compile time
>
> Please let me know if any further inputs needed to fix the same
>
> Regards,
> Satish Chandra
>
> On Mon, Nov 23, 2015 at 3:29 PM, prosp4300  wrote:
>
>>
>>
>> So it is actually a compile time error in Eclipse, instead of jar
>> generation from Eclipse, you can try to use sbt to assembly your jar, looks
>> like your Eclipse does not recognize the Scala syntax properly.
>>
>>
>>
>> At 2015-11-20 21:36:55, "satish chandra j" 
>> wrote:
>>
>> HI All,
>> I am getting this error while generating executable Jar file itself in
>> Eclipse, if the Spark Application code has "import sqlContext.implicits._"
>> line in it. Spark Applicaiton code  works fine if the above mentioned line
>> does not exist as I have tested by fetching data from an RDBMS by
>> implementing JDBCRDD
>>
>> I tried couple of DataFrame related methods for which most of them errors
>> stating that method has been overloaded
>>
>> Please let me know if any further inputs needed to analyze it
>>
>> Regards,
>> Satish Chandra
>>
>> On Fri, Nov 20, 2015 at 5:46 PM, prosp4300  wrote:
>>
>>>
>>> Looks like a classpath problem, if you can provide the command you used
>>> to run your application and environment variable SPARK_HOME, it will help
>>> others to identify the root problem
>>>
>>>
>>> On 2015-11-20 at 18:59, Satish wrote:
>>>
>>> Hi Michael,
>>> As my current Spark version is 1.4.0 than why it error out as "error:
>>> not found: value sqlContext" when I have "import sqlContext.implicits._" in
>>> my Spark Job
>>>
>>> Regards
>>> Satish Chandra
>>> --
>>> From: Michael Armbrust 
>>> Sent: ‎20-‎11-‎2015 01:36
>>> To: satish chandra j 
>>> Cc: user ; hari krishna 
>>> Subject: Re: Error not found value sqlContext
>>>
>>>
>>> http://spark.apache.org/docs/latest/sql-programming-guide.html#upgrading-from-spark-sql-10-12-to-13
>>>
>>> On Thu, Nov 19, 2015 at 4:19 AM, satish chandra j <
>>> jsatishchan...@gmail.com> wrote:
>>>
 HI All,
 we have recently migrated from Spark 1.2.1 to Spark 1.4.0, I am
 fetching data from an RDBMS using JDBCRDD and register it as temp table to
 perform SQL query

 Below approach is working fine in Spark 1.2.1:

 JDBCRDD --> apply map using Case Class --> apply createSchemaRDD -->
 registerTempTable --> perform SQL Query

 but now as createSchemaRDD is not supported in Spark 1.4.0

 JDBCRDD --> apply map using Case Class with* .toDF()* -->
 registerTempTable --> perform SQL query on temptable


 JDBCRDD --> apply map using Case Class --> RDD*.toDF()*.registerTempTable
 --> perform SQL query on temptable

 Only solution I get everywhere is to  use "import
 sqlContext.implicits._" after val SQLContext = new
 org.apache.spark.sql.SQLContext(sc)

 But it errors with the two generic errors

 *1. error: not found: value sqlContext*

 *2. value toDF is not a member of org.apache.spark.rdd.RDD*






>>>
>>>
>>>
>>
>>
>>
>>
>
>


Re: Data in one partition after reduceByKey

2015-11-23 Thread Patrick McGloin
I will answer my own question, since I figured it out.  Here is my answer
in case anyone else has the same issue.

My DateTimes were all without seconds and milliseconds, since I wanted to
group data belonging to the same minute. The difference between the hashCode()
values of Joda DateTimes which are one minute apart is a constant:

scala> val now = DateTime.now
now: org.joda.time.DateTime = 2015-11-23T11:14:17.088Z

scala> now.withSecondOfMinute(0).withMillisOfSecond(0).hashCode -
now.minusMinutes(1).withSecondOfMinute(0).withMillisOfSecond(0).hashCode
res42: Int = 6

As can be seen by this example, if the hashCode values are similarly
spaced, they can end up in the same partition:

scala> val nums = for(i <- 0 to 100) yield ((i*20 % 1000), i)
nums: scala.collection.immutable.IndexedSeq[(Int, Int)] =
Vector((0,0), (20,1), (40,2), (60,3), (80,4), (100,5), (120,6),
(140,7), (160,8), (180,9), (200,10), (220,11), (240,12), (260,13),
(280,14), (300,15), (320,16), (340,17), (360,18), (380,19), (400,20),
(420,21), (440,22), (460,23), (480,24), (500,25), (520,26), (540,27),
(560,28), (580,29), (600,30), (620,31), (640,32), (660,33), (680,34),
(700,35), (720,36), (740,37), (760,38), (780,39), (800,40), (820,41),
(840,42), (860,43), (880,44), (900,45), (920,46), (940,47), (960,48),
(980,49), (0,50), (20,51), (40,52), (60,53), (80,54), (100,55),
(120,56), (140,57), (160,58), (180,59), (200,60), (220,61), (240,62),
(260,63), (280,64), (300,65), (320,66), (340,67), (360,68), (380,69),
(400,70), (420,71), (440,72), (460,73), (480,74), (500...

scala> val rddNum = sc.parallelize(nums)
rddNum: org.apache.spark.rdd.RDD[(Int, Int)] =
ParallelCollectionRDD[0] at parallelize at :23

scala> val reducedNum = rddNum.reduceByKey(_+_)
reducedNum: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at
reduceByKey at :25

scala> reducedNum.mapPartitions(iter => Array(iter.size).iterator,
true).collect.toList

res2: List[Int] = List(50, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0)

To distribute my data more evenly across the partitions I created my own
custom Partitioner:

class JodaPartitioner(rddNumPartitions: Int) extends Partitioner {
  def numPartitions: Int = rddNumPartitions
  def getPartition(key: Any): Int = {
key match {
  case dateTime: DateTime =>
val sum = dateTime.getYear + dateTime.getMonthOfYear +
dateTime.getDayOfMonth + dateTime.getMinuteOfDay  +
dateTime.getSecondOfDay
sum % numPartitions
  case _ => 0
}
  }
}


On 20 November 2015 at 17:17, Patrick McGloin 
wrote:

> Hi,
>
> I have Spark application which contains the following segment:
>
> val reparitioned = rdd.repartition(16)
> val filtered: RDD[(MyKey, myData)] = MyUtils.filter(reparitioned, startDate, 
> endDate)
> val mapped: RDD[(DateTime, myData)] = filtered.map(kv=(kv._1.processingTime, 
> kv._2))
> val reduced: RDD[(DateTime, myData)] = mapped.reduceByKey(_+_)
>
> When I run this with some logging this is what I see:
>
> reparitioned ==> [List(2536, 2529, 2526, 2520, 2519, 2514, 2512, 2508, 
> 2504, 2501, 2496, 2490, 2551, 2547, 2543, 2537)]
> filtered ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, 
> 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
> mapped ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, 
> 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
> reduced ==> [List(0, 0, 0, 0, 0, 0, 922, 0, 0, 0, 0, 0, 0, 0, 0, 0)]
>
> My logging is done using these two lines:
>
> val sizes: RDD[Int] = rdd.mapPartitions(iter => Array(iter.size).iterator, 
> true)log.info(s"rdd ==> [${sizes.collect.toList}]")
>
> My question is why does my data end up in one partition after the
> reduceByKey? After the filter it can be seen that the data is evenly
> distributed, but the reduceByKey results in data in only one partition.
>
> Thanks,
>
> Patrick
>


Add Data Science Serbia meetup

2015-11-23 Thread Darko Marjanovic
Please add the Data Science Serbia meetup group to the list on the website.

http://www.meetup.com/Data-Science-Serbia/

Thank you.

Best,

Darko Marjanovic
CEO & Co-Founder
Things Solver 
*M: +381637052054*
*E: da...@thingsolver.com *


Any workaround for Kafka couldn't find leaders for set?

2015-11-23 Thread Hudong Wang
Hi folks,
We have a 10-node cluster with several topics. Each topic has about 256
partitions with a replication factor of 3. Now we have run into an issue where,
in some topics, a few partitions (< 10) have a leader of -1 and each of them has
only one in-sync replica.
Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leaders for Set([xxx,251], [xxx,253], [xxx,53], [xxx,161], [xxx,71], [xxx,163], [xxx,73])
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:416)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:416)
    at scala.util.Either.fold(Either.scala:97)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:415)
Is there any workaround so I can still call createDirectStream with these bad
partitions in the system?

Many thanks!
Tony

  

Re: how to use DataFrame.na.fill based on condition

2015-11-23 Thread Davies Liu
You could create a new column based on the expression: IF (condition1,
value1, old_column_value)
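A minimal PySpark sketch of that suggestion, using when/otherwise (the column names and conditions are made up for illustration):

from pyspark.sql import functions as F

# Keep non-null values of column1 as-is; fill nulls with "value1" when
# condition1 holds (here: column2 == "A"), otherwise with "value2".
filled = df.withColumn(
    "column1",
    F.when(F.col("column1").isNotNull(), F.col("column1"))
     .when(F.col("column2") == "A", F.lit("value1"))
     .otherwise(F.lit("value2")))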

On Mon, Nov 23, 2015 at 11:57 AM, Vishnu Viswanath
 wrote:
> Thanks for the reply Davies
>
> I think replace, replaces a value with another value. But what I want to do
> is fill in the null value of a column.( I don't have a to_replace here )
>
> Regards,
> Vishnu
>
> On Mon, Nov 23, 2015 at 1:37 PM, Davies Liu  wrote:
>>
>> DataFrame.replace(to_replace, value, subset=None)
>>
>>
>> http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace
>>
>> On Mon, Nov 23, 2015 at 11:05 AM, Vishnu Viswanath
>>  wrote:
>> > Hi
>> >
>> > Can someone tell me if there is a way I can use the fill method in
>> > DataFrameNaFunctions based on some condition.
>> >
>> > e.g., df.na.fill("value1","column1","condition1")
>> > df.na.fill("value2","column1","condition2")
>> >
>> > i want to fill nulls in column1 with values - either value 1 or value 2,
>> > based on some condition.
>> >
>> > Thanks,
>
>
>
>
> --
> Thanks and Regards,
> Vishnu Viswanath
> +1 309 550 2311
> www.vishnuviswanath.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-23 Thread Andy Davidson
Hi Xiao and Sabarish

Using the Stage tab in the UI, it turns out you can see how many
partitions there are. If I did nothing I would have 228155 partitions
(this confirms what Sabarish said). I tried coalesce(3), but RDD.count()
fails. I thought that, given I have 3 workers and 1/3 of the data would easily
fit into memory, this would be a good choice.

If I use coalesce(30), count works. However, it still seems slow: it took
2.42 min to read 4720 records. My total data set size is 34 MB.

Any suggestions on how to choose the number of partitions?

 ('spark.executor.memory', '2G') ('spark.driver.memory', '2G')


The data was originally collected using Spark Streaming. I noticed that the
number of default partitions == the number of files created on HDFS. I bet
each file is one Spark Streaming mini-batch. I suspect that if I concatenate
these into a small number of files things will run much faster, and that
I would not need to call coalesce(), which is taking a lot of time.
Any suggestions on how to choose the number of files?

Kind regards

Andy


From:  Xiao Li 
Date:  Monday, November 23, 2015 at 12:21 PM
To:  Andrew Davidson 
Cc:  Sabarish Sasidharan , "user @spark"

Subject:  Re: newbie : why are thousands of empty files being created on
HDFS?


>In your case, maybe you can try to call the function coalesce?
>Good luck, 
>
>Xiao Li
>
>2015-11-23 12:15 GMT-08:00 Andy Davidson :
>
>Hi Sabarish
>
>I am but a simple padawan :-) I do not understand your answer. Why would
>Spark be creating so many empty partitions? My real problem is my
>application is very slow. I happened to notice thousands of empty files
>being created. I thought this is a hint to why my app is slow.
>
>My program calls sample( 0.01).filter(not null).saveAsTextFile(). This
>takes about 35 min, to scan 500,000 JSON strings and write 5000 to disk.
>The total data writing in 38M.
>
>The data is read from HDFS. My understanding is Spark can not know in
>advance how HDFS partitioned the data. Spark knows I have a master and 3
>slaves machines. It knows how many works/executors are assigned to my
>Job. I would expect spark would be smart enough not create more
>partitions than I have worker machines?
>
>Also given I am not using any key/value operations like Join() or doing
>multiple scans I would assume my app would not benefit from partitioning.
>
>
>Kind regards
>
>Andy
>
>
>From:  Sabarish Sasidharan 
>Date:  Saturday, November 21, 2015 at 7:20 PM
>To:  Andrew Davidson 
>Cc:  "user @spark" 
>Subject:  Re: newbie : why are thousands of empty files being created on
>HDFS?
>
>
>
>Those are empty partitions. I don't see the number of partitions
>specified in code. That then implies the default parallelism config is
>being used and is set to a very high number, the sum of empty + non empty
>files.
>Regards
>Sab
>On 21-Nov-2015 11:59 pm, "Andy Davidson" 
>wrote:
>
>I start working on a very simple ETL pipeline for a POC. It reads a in a
>data set of tweets stored as JSON strings on in HDFS and randomly selects
>1% of the observations and writes them to HDFS. It seems to run very
>slowly. E.G. To write 4720 observations takes 1:06:46.577795. I
>Also noticed that RDD saveAsTextFile is creating thousands of empty
>files. 
>
>I assume creating all these empty files must be slowing down the system.
>Any idea why this is happening? Do I have write a script to periodical
>remove empty files?
>
>
>Kind regards
>
>Andy
>
>tweetStrings = sc.textFile(inputDataURL)
>
>
>def removeEmptyLines(line) :
>if line:
>return True
>else :
>emptyLineCount.add(1);
>return False
>
>emptyLineCount = sc.accumulator(0)
>sample = (tweetStrings.filter(removeEmptyLines)
>  .sample(withReplacement=False, fraction=0.01, seed=345678))
>
>
>startTime = datetime.datetime.now()
>sample.saveAsTextFile(saveDataURL)
>
>endTime = datetime.datetime.now()
>print("elapsed time:%s" % (datetime.datetime.now() - startTime))
>
>
>elapsed time:1:06:46.577795
>
>Total number of empty files
>$ hadoop fs -du {saveDataURL} | grep '^0' | wc -l
>223515
>Total number of files with data
>$ hadoop fs -du {saveDataURL} | grep -v '^0' | wc -l
>4642
>
>I randomly pick a part file. Its size is 9251
>
>
>
>
>
>
>
>
>
>
>
>
>
>



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Need Help Diagnosing/operating/tuning

2015-11-23 Thread Igor Berman
You should check why the executor is killed; as soon as it's killed you can get
all kinds of strange exceptions...
Either give your executors more memory (4G is rather small for Spark), or try to
decrease your input, or maybe split it into more partitions in the input format.
23G in LZO might expand to several times that in memory - it depends on your format.

In general, each executor has 4G of memory, of which only part is used for
caching/shuffling (see the Spark configuration for the different fraction params).
You should then divide this memory by the number of cores in each executor, and
then you can understand approximately what your partition size is... You can also
do this arithmetic the opposite way, from the size of a partition to the memory
needed by each executor.

There is no point in making 300 retries... there is no magic in Spark... if it
fails after 3 retries, it will fail...

UI metrics can give you hints regarding partition size, etc.
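A sketch of the kind of settings being referred to (all property names are standard Spark 1.4-era configuration keys; the values and the input path are illustrative only and would need tuning for the 23GB LZO input):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("tuning-example")
        # More memory per executor and fewer concurrent cores reduce per-task pressure.
        .set("spark.executor.memory", "8g")
        .set("spark.executor.cores", "2")
        # Fractions that control how executor memory is split between
        # caching and shuffle buffers (pre-1.6 memory model).
        .set("spark.storage.memoryFraction", "0.4")
        .set("spark.shuffle.memoryFraction", "0.4"))

sc = SparkContext(conf=conf)

# Splitting the input into more partitions shrinks each task's working set.
rdd = sc.textFile("hdfs:///path/to/lzo/input", minPartitions=2000)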

On 23 November 2015 at 03:30, Jeremy Davis  wrote:

> It seems like the problem is related to —executor-cores. Is there possibly
> some sort of race condition when using multiple cores per executor?
>
>
> On Nov 22, 2015, at 12:38 PM, Jeremy Davis  wrote:
>
>
> Hello,
> I’m at a loss trying to diagnose why my spark job is failing. (works fine
> on small data)
> It is failing during the repartition, or on the subsequent steps.. which
> then seem to fail and fall back to repartitioning..
> I’ve tried adjusting every parameter I can find, but have had no success.
> Input is only 23GB of LZO )probably 8x compression), and I’ve verified all
> files are valid (not corrupted).
> I’ve tried more and less of : memory, partitions, executors, cores...
> I’ve set maxFailures up to 300.
> Setting 4GB heap usually makes it through repartitioning, but fails on
> subsequent steps (Sometimes being killed from running past memory limits).
> Larger Heaps usually don’t even make it through the first repartition due
> to all kinds of weird errors that look like read errors...
>
> I’m at a loss on how to debug this thing.
> Is there a tutorial somewhere?
>
> ——
>
>
> Spark 1.4.1
> Java 7
> Cluster has 3TB of memory, and 400 cores.
>
>
> Here are a collection of exceptions
>
>
> java.io.FileNotFoundException: 
> /var/storage/sdd3/nm-local/usercache/jeremy/appcache/application_1447722466442_1649/blockmgr-9ed5583f-cac1-4701-9f70-810c215b954f/13/shuffle_0_5_0.data
>  (No such file or directory)
>   at java.io.FileOutputStream.open(Native Method)
>   at java.io.FileOutputStream.(FileOutputStream.java:221)
>   at 
> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:128)
>   at 
> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:215)
>   at 
> org.apache.spark.util.collection.ChainedBuffer.read(ChainedBuffer.scala:56)
>   at 
> org.apache.spark.util.collection.PartitionedSerializedPairBuffer$$anon$2.writeNext(PartitionedSerializedPairBuffer.scala:137)
>   at 
> org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:757)
>   at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:744)
>
>
>
>
> java.lang.InternalError: lzo1x_decompress_safe returned: -6
>   at 
> com.hadoop.compression.lzo.LzoDecompressor.decompressBytesDirect(Native 
> Method)
>   at 
> com.hadoop.compression.lzo.LzoDecompressor.decompress(LzoDecompressor.java:315)
>   at 
> com.hadoop.compression.lzo.LzopDecompressor.decompress(LzopDecompressor.java:122)
>   at 
> com.hadoop.compression.lzo.LzopInputStream.decompress(LzopInputStream.java:247)
>   at 
> org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
>   at java.io.InputStream.read(InputStream.java:101)
>   at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
>   at 
> org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
>   at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>   at 
> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
>   at 
> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
>   at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:248)
>   at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:216)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>   at 
> 

Spark Streaming idempotent writes to HDFS

2015-11-23 Thread Michael
Hi all,

I'm working on a project with Spark Streaming; the goal is to process log
files from S3 and save them on Hadoop to later analyze them with
Spark SQL.
Everything works well except when I kill the Spark application and
restart it: it picks up from the latest processed batch and reprocesses
it, which results in duplicate data on HDFS.

How can I make the writing step on HDFS idempotent? I couldn't find any
way to control, for example, the filenames of the Parquet files being
written; the idea would be to include the batch time so that the same batch
always gets written to the same path.
I've also tried mode("overwrite"), but it looks like each batch gets
written to the same file every time.
Any help would be greatly appreciated.

Thanks,
Michael

--

def save_rdd(batch_time, rdd):
    sqlContext = SQLContext(rdd.context)
    df = sqlContext.createDataFrame(rdd, log_schema)
    df.write.mode("append").partitionBy("log_date").parquet(hdfs_dest_directory)

def create_ssc(checkpoint_dir, spark_master):
    sc = SparkContext(spark_master, app_name)
    ssc = StreamingContext(sc, batch_interval)
    ssc.checkpoint(checkpoint_dir)

    # dstream is created from the S3 log source (not shown in this excerpt)
    parsed = dstream.map(lambda line: log_parser(line))
    parsed.foreachRDD(lambda batch_time, rdd: save_rdd(batch_time, rdd))

    return ssc

ssc = StreamingContext.getOrCreate(checkpoint_dir, lambda:
    create_ssc(checkpoint_dir, spark_master))
ssc.start()
ssc.awaitTermination()

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: A Problem About Running Spark 1.5 on YARN with Dynamic Alloction

2015-11-23 Thread Saisai Shao
I don't think it is a bug; maybe something is wrong with your Spark / YARN
configurations.

On Tue, Nov 24, 2015 at 12:13 PM, 谢廷稳  wrote:

> OK,the YARN cluster was used by myself,it have 6 node witch can run over
> 100 executor, and the YARN RM logs showed that the Spark application did
> not requested resource from it.
>
> Is this a bug? Should I create a JIRA for this problem?
>
> 2015-11-24 12:00 GMT+08:00 Saisai Shao :
>
>> OK, so this looks like your Yarn cluster  does not allocate containers
>> which you supposed should be 50. Does the yarn cluster have enough resource
>> after allocating AM container, if not, that is the problem.
>>
>> The problem not lies in dynamic allocation from my guess of your
>> description. I said I'm OK with min and max executors to the same number.
>>
>> On Tue, Nov 24, 2015 at 11:54 AM, 谢廷稳  wrote:
>>
>>> Hi Saisai,
>>> I'm sorry for did not describe it clearly,YARN debug log said I have 50
>>> executors,but ResourceManager showed that I only have 1 container for the
>>> AppMaster.
>>>
>>> I have checked YARN RM logs,after AppMaster changed state from ACCEPTED
>>> to RUNNING,it did not have log about this job any more.So,the problem is I
>>> did not have any executor but ExecutorAllocationManager think I have.Would
>>> you minding having a test in your cluster environment?
>>> Thanks,
>>> Weber
>>>
>>> 2015-11-24 11:00 GMT+08:00 Saisai Shao :
>>>
 I think this behavior is expected, since you already have 50 executors
 launched, so no need to acquire additional executors. You change is not
 solid, it is just hiding the log.

 Again I think you should check the logs of Yarn and Spark to see if
 executors are started correctly. Why resource is still not enough where you
 already have 50 executors.

 On Tue, Nov 24, 2015 at 10:48 AM, 谢廷稳  wrote:

> Hi SaiSai,
> I have changed  "if (numExecutorsTarget >= maxNumExecutors)"  to "if
> (numExecutorsTarget > maxNumExecutors)" of the first line in the
> ExecutorAllocationManager#addExecutors() and it rans well.
> In my opinion,when I was set minExecutors equals maxExecutors,when the
> first time to add Executors,numExecutorsTarget equals maxNumExecutors and
> it repeat printe "DEBUG ExecutorAllocationManager: Not adding
> executors because our current target total is already 50 (limit 50)".
> Thanks
> Weber
>
> 2015-11-23 21:00 GMT+08:00 Saisai Shao :
>
>> Hi Tingwen,
>>
>> Would you minding sharing your changes in
>> ExecutorAllocationManager#addExecutors().
>>
>> From my understanding and test, dynamic allocation can be worked when
>> you set the min to max number of executors to the same number.
>>
>> Please check your Spark and Yarn log to make sure the executors are
>> correctly started, the warning log means currently resource is not enough
>> to submit tasks.
>>
>> Thanks
>> Saisai
>>
>>
>> On Mon, Nov 23, 2015 at 8:41 PM, 谢廷稳  wrote:
>>
>>> Hi all,
>>> I ran a SparkPi on YARN with Dynamic Allocation enabled and set 
>>> spark.dynamicAllocation.maxExecutors
>>> equals
>>> spark.dynamicAllocation.minExecutors,then I submit an application
>>> using:
>>> ./bin/spark-submit --class org.apache.spark.examples.SparkPi
>>> --master yarn-cluster --driver-memory 4g --executor-memory 8g
>>> lib/spark-examples*.jar 200
>>>
>>> then, this application was submitted successfully, but the AppMaster
>>> always saying “15/11/23 20:13:08 WARN cluster.YarnClusterScheduler:
>>> Initial job has not accepted any resources; check your cluster UI to 
>>> ensure
>>> that workers are registered and have sufficient resources”
>>> and when I open DEBUG,I found “15/11/23 20:24:00 DEBUG
>>> ExecutorAllocationManager: Not adding executors because our current 
>>> target
>>> total is already 50 (limit 50)” in the console.
>>>
>>> I have fixed it by modifying code in
>>> ExecutorAllocationManager.addExecutors,Does this a bug or it was 
>>> designed
>>> that we can’t set maxExecutors equals minExecutors?
>>>
>>> Thanks,
>>> Weber
>>>
>>
>>
>

>>>
>>
>


Re: load multiple directory using dataframe load

2015-11-23 Thread Fengdong Yu
hiveContext.read.format("orc").load("mypath/*")
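
If you need specific directories rather than a glob, a sketch (assuming a Spark
version where DataFrameReader.load takes a single path; the paths below are the
placeholders from the question) is to load each directory and union the results:

val paths = Seq("mypath/3660", "mypath/3661")
val df = paths
  .map(p => hiveContext.read.format("orc").load(p))  // one DataFrame per directory
  .reduce(_ unionAll _)                              // combine into a single DataFrame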



> On Nov 24, 2015, at 1:07 PM, Renu Yadav  wrote:
> 
> Hi ,
> 
> I am using dataframe and want to load orc file using multiple directory
> like this:
> hiveContext.read.format.load("mypath/3660,myPath/3661")
> 
> but it is not working.
> 
> Please suggest how to achieve this
> 
> Thanks & Regards,
> Renu Yadav


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming idempotent writes to HDFS

2015-11-23 Thread Burak Yavuz
Not sure if it would be the most efficient, but maybe you can think of the
filesystem as a key value store, and write each batch to a sub-directory,
where the directory name is the batch time. If the directory already
exists, then you shouldn't write it. Then you may have a following batch
job that will coalesce files, in order to "close the day".
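
A minimal Scala sketch of that directory-per-batch idea (assuming a DStream
named parsed analogous to the one in the quoted code below; hdfsDest is a
placeholder for the destination path):

import org.apache.hadoop.fs.{FileSystem, Path}

parsed.foreachRDD { (rdd, time) =>
  // Name the output directory after the batch time so a re-run is detectable.
  val batchDir = s"$hdfsDest/batch_time=${time.milliseconds}"
  val fs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
  if (!fs.exists(new Path(batchDir))) {  // skip batches already written before a restart
    rdd.saveAsTextFile(batchDir)
  }
}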

Burak

On Mon, Nov 23, 2015 at 8:58 PM, Michael  wrote:

> Hi all,
>
> I'm working on a project with Spark Streaming; the goal is to process log
> files from S3 and save them on Hadoop to later analyze them with
> Spark SQL.
> Everything works well except when I kill the Spark application and
> restart it: it picks up from the latest processed batch and reprocesses
> it, which results in duplicate data on HDFS.
>
> How can I make the writing step on hdfs idempotent ? I couldn't find any
> way to control for example the filenames of the parquet files being
> written, the idea being to include the batch time so that the same batch
> gets written always on the same path.
> I've also tried with mode("overwrite") but it looks like each batch gets
> written to the same file every time.
> Any help would be greatly appreciated.
>
> Thanks,
> Michael
>
> --
>
> def save_rdd(batch_time, rdd):
> sqlContext = SQLContext(rdd.context)
> df = sqlContext.createDataFrame(rdd, log_schema)
>
> df.write.mode("append").partitionBy("log_date").parquet(hdfs_dest_directory)
>
> def create_ssc(checkpoint_dir, spark_master):
>
> sc = SparkContext(spark_master, app_name)
> ssc = StreamingContext(sc, batch_interval)
> ssc.checkpoint(checkpoint_dir)
>
> parsed = dstream.map(lambda line: log_parser(line))
> parsed.foreachRDD(lambda batch_time, rdd: save_rdd(batch_time, rdd))
>
> return ssc
>
> ssc = StreamingContext.getOrCreate(checkpoint_dir, lambda:
> create_ssc(checkpoint_dir, spark_master))
> ssc.start()
> ssc.awaitTermination()
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Relation between RDDs, DataFrames and Project Tungsten

2015-11-23 Thread Mark Hamstra
>
> In the near future, I guess GUI interfaces of Spark will be available
> soon. Spark users (e.g, CEOs) might not need to know what are RDDs at all.
> They can analyze their data by clicking a few buttons, instead of writing
> the programs. : )


That's not in the future.  :)

On Mon, Nov 23, 2015 at 7:30 PM, Xiao Li  wrote:

> Let me share my understanding.
>
> If we view Spark as analytics OS, RDD APIs are like OS system calls. These
> low-level system calls can be called in the program languages like C.
> DataFrame and Dataset APIs are like higher-level programming languages.
> They hide the low level complexity and the compiler (i.e., Catalyst) will
> optimize your programs. For most users, the SQL, DataFrame and Dataset APIs
> are good enough to satisfy most requirements.
>
> In the near future, I guess GUI interfaces of Spark will be available
> soon. Spark users (e.g, CEOs) might not need to know what are RDDs at all.
> They can analyze their data by clicking a few buttons, instead of writing
> the programs. : )
>
> Wish Spark will be the most popular analytics OS in the world! : )
>
> Have a good holiday everyone!
>
> Xiao Li
>
>
>
> 2015-11-23 17:56 GMT-08:00 Jakob Odersky :
>
>> Thanks Michael, that helped me a lot!
>>
>> On 23 November 2015 at 17:47, Michael Armbrust 
>> wrote:
>>
>>> Here is how I view the relationship between the various components of
>>> Spark:
>>>
>>>  - *RDDs - *a low level API for expressing DAGs that will be executed
>>> in parallel by Spark workers
>>>  - *Catalyst -* an internal library for expressing trees that we use to
>>> build relational algebra and expression evaluation.  There's also an
>>> optimizer and query planner than turns these into logical concepts into RDD
>>> actions.
>>>  - *Tungsten -* an internal optimized execution engine that can compile
>>> catalyst expressions into efficient java bytecode that operates directly on
>>> serialized binary data.  It also has nice low level data structures /
>>> algorithms like hash tables and sorting that operate directly on serialized
>>> data.  These are used by the physical nodes that are produced by the query
>>> planner (and run inside of RDD operation on workers).
>>>  - *DataFrames - *a user facing API that is similar to SQL/LINQ for
>>> constructing dataflows that are backed by catalyst logical plans
>>>  - *Datasets* - a user facing API that is similar to the RDD API for
>>> constructing dataflows that are backed by catalyst logical plans
>>>
>>> So everything is still operating on RDDs but I anticipate most users
>>> will eventually migrate to the higher level APIs for convenience and
>>> automatic optimization
>>>
>>> On Mon, Nov 23, 2015 at 4:18 PM, Jakob Odersky 
>>> wrote:
>>>
 Hi everyone,

 I'm doing some reading-up on all the newer features of Spark such as
 DataFrames, DataSets and Project Tungsten. This got me a bit confused on
 the relation between all these concepts.

 When starting to learn Spark, I read a book and the original paper on
 RDDs, this lead me to basically think "Spark == RDDs".
 Now, looking into DataFrames, I read that they are basically
 (distributed) collections with an associated schema, thus enabling
 declarative queries and optimization (through Catalyst). I am uncertain how
 DataFrames relate to RDDs: are DataFrames transformed to operations on RDDs
 once they have been optimized? Or are they completely different concepts?
 In case of the latter, do DataFrames still use the Spark scheduler and get
 broken down into a DAG of stages and tasks?

 Regarding project Tungsten, where does it fit in? To my understanding
 it is used to efficiently cache data in memory and may also be used to
 generate query code for specialized hardware. This sounds as though it
 would work on Spark's worker nodes, however it would also only work with
 schema-associated data (aka DataFrames), thus leading me to the conclusion
 that RDDs and DataFrames do not share a common backend which in turn
 contradicts my conception of "Spark == RDDs".

 Maybe I missed the obvious as these questions seem pretty basic,
 however I was unable to find clear answers in Spark documentation or
 related papers and talks. I would greatly appreciate any clarifications.

 thanks,
 --Jakob

>>>
>>>
>>
>


Re: How to have Kafka Direct Consumers show up in Kafka Consumer reporting?

2015-11-23 Thread swetha kasireddy
OK. I see the following for querying the offsets. In our Kafka stream, the
offsets are stored in ZooKeeper and I am not updating offsets in ZooKeeper.

How does Kafka Direct know which offsets to query? Does it work out
automatically which offsets to query? I have "auto.offset.reset" ->
"largest".

It looks like it is trying to query from the latest offset each time, and
those offsets are not available in the Kafka stream anymore. Other
consumers that use the same stream and a ZooKeeper quorum seem to be
working fine.


 // Hold a reference to the current offset ranges, so it can be used downstream
 var offsetRanges = Array[OffsetRange]()

 directKafkaStream.transform { rdd =>
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
 }.map {
   ...
 }.foreachRDD { rdd =>
   for (o <- offsetRanges) {
 println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
   }
   ...
 }


On Mon, Nov 23, 2015 at 6:31 PM, swetha kasireddy  wrote:

> Also, does Kafka direct query the offsets from the zookeeper directly?
> From where does it get the offsets? There is data in those offsets, but
> somehow Kafka Direct does not seem to pick it up?
>
> On Mon, Nov 23, 2015 at 6:18 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> I mean to show the Spark Kafka Direct consumers in Kafka Stream UI.
>> Usually we create a consumer and the consumer gets shown in the Kafka
>> Stream UI. How do I log the offsets in the Spark Job?
>>
>> On Mon, Nov 23, 2015 at 6:11 PM, Cody Koeninger 
>> wrote:
>>
>>> What exactly do you mean by kafka consumer reporting?
>>>
>>> I'd log the offsets in your spark job and try running
>>>
>>> kafka-simple-consumer-shell.sh --partition $yourbadpartition
>>> --print-offsets
>>>
>>> at the same time your spark job is running
>>>
>>> On Mon, Nov 23, 2015 at 7:37 PM, swetha 
>>> wrote:
>>>
 Hi,

 We see a bunch of issues like the following in Our Spark Kafka Direct.
 Any
 idea  as to how make Kafka Direct Consumers show up in Kafka Consumer
 reporting to debug this issue?


 Job aborted due to stage failure: Task 47 in stage 336.0 failed 4 times,
 most recent failure: Lost task 47.3 in stage 336.0 (TID 5283,
 10.227.64.52):
 java.lang.AssertionError: assertion failed: Ran out of messages before
 reaching ending offset 225474235 for topic hubble_stream partition 55
 start
 225467496. This should not happen, and indicates that messages may have
 been
 lost



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-have-Kafka-Direct-Consumers-show-up-in-Kafka-Consumer-reporting-tp25457.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>


load multiple directory using dataframe load

2015-11-23 Thread Renu Yadav
Hi,

I am using DataFrames and want to load ORC files from multiple directories,
like this:
hiveContext.read.format.load("mypath/3660,myPath/3661")

but it is not working.

Please suggest how to achieve this

Thanks & Regards,
Renu Yadav


Re: Spark Kafka Direct Error

2015-11-23 Thread swetha kasireddy
Does Kafka Direct query the offsets from ZooKeeper directly? From where
does it get the offsets? There is data at those offsets, but somehow Kafka
Direct does not seem to pick it up. Other consumers that use the ZooKeeper
quorum of that stream seem to be fine; only Kafka Direct seems to have
issues. How does Kafka Direct know which offsets to query after getting the
initial batches from "auto.offset.reset" -> "largest"?

On Mon, Nov 23, 2015 at 11:01 AM, Cody Koeninger  wrote:

> No, that means that at the time the batch was scheduled, the kafka leader
> reported the ending offset was 221572238, but during processing, kafka
> stopped returning messages before reaching that ending offset.
>
> That probably means something got screwed up with Kafka - e.g. you lost a
> leader and lost messages in the process.
>
> On Mon, Nov 23, 2015 at 12:57 PM, swetha 
> wrote:
>
>> Hi,
>>
>> I see the following error in my Spark Kafka Direct. Would this mean that
>> Kafka Direct is not able to catch up with the messages and is failing?
>>
>> Job aborted due to stage failure: Task 20 in stage 117.0 failed 4 times,
>> most recent failure: Lost task 20.3 in stage 117.0 (TID 2114,
>> 10.227.64.52):
>> java.lang.AssertionError: assertion failed: Ran out of messages before
>> reaching ending offset 221572238 for topic hubble_stream partition 88
>> start
>> 221563725. This should not happen, and indicates that messages may have
>> been
>> lost
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kafka-Direct-Error-tp25454.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: A Problem About Running Spark 1.5 on YARN with Dynamic Alloction

2015-11-23 Thread 谢廷稳
Hi Saisai,
Would you mind giving me some tips about this problem? After checking the
YARN RM logs, I think the Spark application didn't request resources from it,
so I guess this problem is not YARN's business. The Spark conf of my
cluster is listed below:

spark.shuffle.service.enabled true
spark.dynamicAllocation.enabled true
spark.shuffle.service.port 7337
spark.dynamicAllocation.maxExecutors 50
spark.dynamicAllocation.minExecutors 50

If I change spark.dynamicAllocation.minExecutors from 50 to 49 or anything
less than 50, it works. So I think even if it isn't a bug in dynamic
allocation, something else may be wrong.
And you said that you are OK with min and max executors set to the same
number; could you tell me about your test cluster environment?

Thanks

2015-11-24 13:10 GMT+08:00 Saisai Shao :

> I don't think it is a bug, maybe something wrong with your Spark / Yarn
> configurations.
>
> On Tue, Nov 24, 2015 at 12:13 PM, 谢廷稳  wrote:
>
>> OK,the YARN cluster was used by myself,it have 6 node witch can run over
>> 100 executor, and the YARN RM logs showed that the Spark application did
>> not requested resource from it.
>>
>> Is this a bug? Should I create a JIRA for this problem?
>>
>> 2015-11-24 12:00 GMT+08:00 Saisai Shao :
>>
>>> OK, so this looks like your Yarn cluster  does not allocate containers
>>> which you supposed should be 50. Does the yarn cluster have enough resource
>>> after allocating AM container, if not, that is the problem.
>>>
>>> The problem not lies in dynamic allocation from my guess of your
>>> description. I said I'm OK with min and max executors to the same number.
>>>
>>> On Tue, Nov 24, 2015 at 11:54 AM, 谢廷稳  wrote:
>>>
 Hi Saisai,
 I'm sorry for did not describe it clearly,YARN debug log said I have 50
 executors,but ResourceManager showed that I only have 1 container for the
 AppMaster.

 I have checked YARN RM logs,after AppMaster changed state from ACCEPTED
 to RUNNING,it did not have log about this job any more.So,the problem is I
 did not have any executor but ExecutorAllocationManager think I have.Would
 you minding having a test in your cluster environment?
 Thanks,
 Weber

 2015-11-24 11:00 GMT+08:00 Saisai Shao :

> I think this behavior is expected, since you already have 50 executors
> launched, so no need to acquire additional executors. You change is not
> solid, it is just hiding the log.
>
> Again I think you should check the logs of Yarn and Spark to see if
> executors are started correctly. Why resource is still not enough where 
> you
> already have 50 executors.
>
> On Tue, Nov 24, 2015 at 10:48 AM, 谢廷稳  wrote:
>
>> Hi SaiSai,
>> I have changed  "if (numExecutorsTarget >= maxNumExecutors)"  to "if
>> (numExecutorsTarget > maxNumExecutors)" of the first line in the
>> ExecutorAllocationManager#addExecutors() and it rans well.
>> In my opinion,when I was set minExecutors equals maxExecutors,when
>> the first time to add Executors,numExecutorsTarget equals maxNumExecutors
>> and it repeat printe "DEBUG ExecutorAllocationManager: Not adding
>> executors because our current target total is already 50 (limit 50)".
>> Thanks
>> Weber
>>
>> 2015-11-23 21:00 GMT+08:00 Saisai Shao :
>>
>>> Hi Tingwen,
>>>
>>> Would you minding sharing your changes in
>>> ExecutorAllocationManager#addExecutors().
>>>
>>> From my understanding and test, dynamic allocation can be worked
>>> when you set the min to max number of executors to the same number.
>>>
>>> Please check your Spark and Yarn log to make sure the executors are
>>> correctly started, the warning log means currently resource is not 
>>> enough
>>> to submit tasks.
>>>
>>> Thanks
>>> Saisai
>>>
>>>
>>> On Mon, Nov 23, 2015 at 8:41 PM, 谢廷稳  wrote:
>>>
 Hi all,
 I ran a SparkPi on YARN with Dynamic Allocation enabled and set 
 spark.dynamicAllocation.maxExecutors
 equals
 spark.dynamicAllocation.minExecutors,then I submit an application
 using:
 ./bin/spark-submit --class org.apache.spark.examples.SparkPi
 --master yarn-cluster --driver-memory 4g --executor-memory 8g
 lib/spark-examples*.jar 200

 then, this application was submitted successfully, but the
 AppMaster always saying “15/11/23 20:13:08 WARN
 cluster.YarnClusterScheduler: Initial job has not accepted any 
 resources;
 check your cluster UI to ensure that workers are registered and have
 sufficient resources”
 and when I open DEBUG,I found “15/11/23 20:24:00 DEBUG
 ExecutorAllocationManager: Not 

Re: Re: A Problem About Running Spark 1.5 on YARN with Dynamic Alloction

2015-11-23 Thread cherrywayb...@gmail.com
Can you show your parameter values in your env?
yarn.nodemanager.resource.cpu-vcores 
yarn.nodemanager.resource.memory-mb



cherrywayb...@gmail.com
 
From: 谢廷稳
Date: 2015-11-24 12:13
To: Saisai Shao
CC: spark users
Subject: Re: A Problem About Running Spark 1.5 on YARN with Dynamic Alloction
OK,the YARN cluster was used by myself,it have 6 node witch can run over 100 
executor, and the YARN RM logs showed that the Spark application did not 
requested resource from it.

Is this a bug? Should I create a JIRA for this problem?

2015-11-24 12:00 GMT+08:00 Saisai Shao :
OK, so this looks like your Yarn cluster  does not allocate containers which 
you supposed should be 50. Does the yarn cluster have enough resource after 
allocating AM container, if not, that is the problem.

The problem not lies in dynamic allocation from my guess of your description. I 
said I'm OK with min and max executors to the same number.

On Tue, Nov 24, 2015 at 11:54 AM, 谢廷稳  wrote:
Hi Saisai,
I'm sorry for did not describe it clearly,YARN debug log said I have 50 
executors,but ResourceManager showed that I only have 1 container for the 
AppMaster.

I have checked YARN RM logs,after AppMaster changed state from ACCEPTED to 
RUNNING,it did not have log about this job any more.So,the problem is I did not 
have any executor but ExecutorAllocationManager think I have.Would you minding 
having a test in your cluster environment?
Thanks,
Weber

2015-11-24 11:00 GMT+08:00 Saisai Shao :
I think this behavior is expected, since you already have 50 executors 
launched, so no need to acquire additional executors. You change is not solid, 
it is just hiding the log.

Again I think you should check the logs of Yarn and Spark to see if executors 
are started correctly. Why resource is still not enough where you already have 
50 executors.

On Tue, Nov 24, 2015 at 10:48 AM, 谢廷稳  wrote:
Hi SaiSai,
I have changed  "if (numExecutorsTarget >= maxNumExecutors)"  to "if 
(numExecutorsTarget > maxNumExecutors)" of the first line in the 
ExecutorAllocationManager#addExecutors() and it rans well.
In my opinion,when I was set minExecutors equals maxExecutors,when the first 
time to add Executors,numExecutorsTarget equals maxNumExecutors and it repeat 
printe "DEBUG ExecutorAllocationManager: Not adding executors because our 
current target total is already 50 (limit 50)".
Thanks
Weber

2015-11-23 21:00 GMT+08:00 Saisai Shao :
Hi Tingwen,

Would you minding sharing your changes in 
ExecutorAllocationManager#addExecutors().

From my understanding and test, dynamic allocation can be worked when you set 
the min to max number of executors to the same number.

Please check your Spark and Yarn log to make sure the executors are correctly 
started, the warning log means currently resource is not enough to submit tasks.

Thanks
Saisai


On Mon, Nov 23, 2015 at 8:41 PM, 谢廷稳  wrote:
Hi all,
I ran a SparkPi on YARN with Dynamic Allocation enabled and set 
spark.dynamicAllocation.maxExecutors equals
spark.dynamicAllocation.minExecutors,then I submit an application using:
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master 
yarn-cluster --driver-memory 4g --executor-memory 8g lib/spark-examples*.jar 200

then, this application was submitted successfully, but the AppMaster always 
saying “15/11/23 20:13:08 WARN cluster.YarnClusterScheduler: Initial job has 
not accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient resources” 
and when I open DEBUG,I found “15/11/23 20:24:00 DEBUG 
ExecutorAllocationManager: Not adding executors because our current target 
total is already 50 (limit 50)” in the console.

I have fixed it by modifying code in 
ExecutorAllocationManager.addExecutors,Does this a bug or it was designed that 
we can’t set maxExecutors equals minExecutors?

Thanks,
Weber








Re: DateTime Support - Hive Parquet

2015-11-23 Thread Cheng Lian

Hey Bryan,

What do you mean by "DateTime properties"? Hive and Spark SQL both 
support DATE and TIMESTAMP types, but there's no DATETIME type. So I 
assume you are referring to Java class DateTime (possibly the one in 
joda)? Could you please provide a sample snippet that illustrates your 
requirement?


Cheng

On 11/23/15 9:40 PM, Bryan Jeffrey wrote:

All,

I am attempting to write objects that include DateTime properties to
a persistent table using Spark 1.5.2 / HiveContext.  In 1.4.1 I was
forced to convert the DateTime properties to Timestamp properties.  I 
was under the impression that this issue was fixed in the default Hive 
supported with 1.5.2 - however, I am still seeing the associated errors.
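
A minimal sketch of that Timestamp conversion (assuming a joda-time DateTime
field; EventRow and toRow are hypothetical names used only for illustration):

import java.sql.Timestamp
import org.joda.time.DateTime

case class EventRow(id: Long, eventTime: Timestamp)

// Map the joda DateTime to java.sql.Timestamp before building the DataFrame.
def toRow(id: Long, dt: DateTime): EventRow =
  EventRow(id, new Timestamp(dt.getMillis))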


Is there a bug I can follow to determine when DateTime will be 
supported for Parquet?


Regards,

Bryan Jeffrey



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Relation between RDDs, DataFrames and Project Tungsten

2015-11-23 Thread Jakob Odersky
Hi everyone,

I'm doing some reading-up on all the newer features of Spark such as
DataFrames, DataSets and Project Tungsten. This got me a bit confused on
the relation between all these concepts.

When starting to learn Spark, I read a book and the original paper on RDDs;
this led me to basically think "Spark == RDDs".
Now, looking into DataFrames, I read that they are basically (distributed)
collections with an associated schema, thus enabling declarative queries
and optimization (through Catalyst). I am uncertain how DataFrames relate
to RDDs: are DataFrames transformed to operations on RDDs once they have
been optimized? Or are they completely different concepts? In case of the
latter, do DataFrames still use the Spark scheduler and get broken down
into a DAG of stages and tasks?

Regarding project Tungsten, where does it fit in? To my understanding it is
used to efficiently cache data in memory and may also be used to generate
query code for specialized hardware. This sounds as though it would work on
Spark's worker nodes, however it would also only work with
schema-associated data (aka DataFrames), thus leading me to the conclusion
that RDDs and DataFrames do not share a common backend which in turn
contradicts my conception of "Spark == RDDs".

Maybe I missed the obvious as these questions seem pretty basic, however I
was unable to find clear answers in Spark documentation or related papers
and talks. I would greatly appreciate any clarifications.

thanks,
--Jakob


spark-ec2 script to launch cluster running Spark 1.5.2 built with HIVE?

2015-11-23 Thread Jeff Schecter
Hi all,

As far as I can tell, the bundled spark-ec2 script provides no way to
launch a cluster running Spark 1.5.2 pre-built with HIVE.

That is to say, all of the pre-built versions of Spark 1.5.2 in the s3 bucket
spark-related-packages are missing HIVE.

aws s3 ls s3://spark-related-packages/ | grep 1.5.2


Am I missing something here? I'd rather avoid resorting to whipping up
hacky patching scripts that might break with the next Spark point release
if at all possible.


How to have Kafka Direct Consumers show up in Kafka Consumer reporting?

2015-11-23 Thread swetha
Hi,

We see a bunch of issues like the following in our Spark Kafka Direct stream.
Any idea how to make Kafka Direct consumers show up in Kafka consumer
reporting so we can debug this issue?


Job aborted due to stage failure: Task 47 in stage 336.0 failed 4 times,
most recent failure: Lost task 47.3 in stage 336.0 (TID 5283, 10.227.64.52):
java.lang.AssertionError: assertion failed: Ran out of messages before
reaching ending offset 225474235 for topic hubble_stream partition 55 start
225467496. This should not happen, and indicates that messages may have been
lost



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-have-Kafka-Direct-Consumers-show-up-in-Kafka-Consumer-reporting-tp25457.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Dataframe constructor

2015-11-23 Thread Fengdong Yu
It's as simple as:

val df = sqlContext.sql("select * from table")
or

val df = sqlContext.read.json("hdfs_path")




> On Nov 24, 2015, at 3:09 AM, spark_user_2015  wrote:
> 
> Dear all,
> 
> is the following usage of the Dataframe constructor correct or does it
> trigger any side effects that I should be aware of?
> 
> My goal is to keep track of my dataframe's state and allow custom
> transformations accordingly.
> 
>  val df: Dataframe = ...some dataframe...
>  val newDf = new DF(df.sqlContext, df.queryExecution.logical) with StateA 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Dataframe-constructor-tp25455.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to have Kafka Direct Consumers show up in Kafka Consumer reporting?

2015-11-23 Thread Cody Koeninger
What exactly do you mean by kafka consumer reporting?

I'd log the offsets in your spark job and try running

kafka-simple-consumer-shell.sh --partition $yourbadpartition --print-offsets

at the same time your spark job is running

On Mon, Nov 23, 2015 at 7:37 PM, swetha  wrote:

> Hi,
>
> We see a bunch of issues like the following in Our Spark Kafka Direct. Any
> idea  as to how make Kafka Direct Consumers show up in Kafka Consumer
> reporting to debug this issue?
>
>
> Job aborted due to stage failure: Task 47 in stage 336.0 failed 4 times,
> most recent failure: Lost task 47.3 in stage 336.0 (TID 5283,
> 10.227.64.52):
> java.lang.AssertionError: assertion failed: Ran out of messages before
> reaching ending offset 225474235 for topic hubble_stream partition 55 start
> 225467496. This should not happen, and indicates that messages may have
> been
> lost
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-have-Kafka-Direct-Consumers-show-up-in-Kafka-Consumer-reporting-tp25457.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Port Control for YARN-Aware Spark

2015-11-23 Thread gpriestley
Hello Community, 

I have what I hope to be a couple of quick questions regarding port control
on Spark which is Yarn-aware (cluster & client modes).  I'm aware that I can
control port configurations by setting driver.port, executor.port, etc to
use specified ports, but I'm not sure how/if that correlates to Spark when
it's executed in Yarn-Cluster mode and/or Yarn-Client mode.

Questions I have are:
1) How does the spark.yarn.am.port relate to defined ports within Spark
(driver, executor, block manager, etc.)?
2) Does the spark.yarn.am.port parameter only relate to the Spark
driver.port?
3) Is the spark.yarn.am.port applicable to Yarn-Cluster or Yarn-Client modes
or both?

Ultimately, I'm trying to remove a lot of the randomness of ports to avoid
potential conflicts.  This may be controlled either via a specified port
or via a range of ports.

Cheers,
Grant



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Port-Control-for-YARN-Aware-Spark-tp25458.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Apache Cassandra Docker Images?

2015-11-23 Thread Renato Perini

Hello,
Is there any planned support for official Docker images?
It would be great to have some images using the cluster manager of choice
(Standalone, YARN, Mesos) with the latest Apache Spark distribution
(ideally based on CentOS 7.x) for containers that can be clustered.


Regards,
Renato Perini.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: A Problem About Running Spark 1.5 on YARN with Dynamic Alloction

2015-11-23 Thread 谢廷稳
Hi Saisai,
I'm sorry I did not describe it clearly: the YARN debug log said I have 50
executors, but the ResourceManager showed that I only have 1 container, for
the AppMaster.

I have checked the YARN RM logs; after the AppMaster changed state from
ACCEPTED to RUNNING, there were no more logs about this job. So the problem
is that I did not have any executors, but ExecutorAllocationManager thinks I
do. Would you mind running a test in your cluster environment?
Thanks,
Weber

2015-11-24 11:00 GMT+08:00 Saisai Shao :

> I think this behavior is expected, since you already have 50 executors
> launched, so no need to acquire additional executors. You change is not
> solid, it is just hiding the log.
>
> Again I think you should check the logs of Yarn and Spark to see if
> executors are started correctly. Why resource is still not enough where you
> already have 50 executors.
>
> On Tue, Nov 24, 2015 at 10:48 AM, 谢廷稳  wrote:
>
>> Hi SaiSai,
>> I have changed  "if (numExecutorsTarget >= maxNumExecutors)"  to "if
>> (numExecutorsTarget > maxNumExecutors)" of the first line in the
>> ExecutorAllocationManager#addExecutors() and it rans well.
>> In my opinion,when I was set minExecutors equals maxExecutors,when the
>> first time to add Executors,numExecutorsTarget equals maxNumExecutors and
>> it repeat printe "DEBUG ExecutorAllocationManager: Not adding executors
>> because our current target total is already 50 (limit 50)".
>> Thanks
>> Weber
>>
>> 2015-11-23 21:00 GMT+08:00 Saisai Shao :
>>
>>> Hi Tingwen,
>>>
>>> Would you minding sharing your changes in
>>> ExecutorAllocationManager#addExecutors().
>>>
>>> From my understanding and test, dynamic allocation can be worked when
>>> you set the min to max number of executors to the same number.
>>>
>>> Please check your Spark and Yarn log to make sure the executors are
>>> correctly started, the warning log means currently resource is not enough
>>> to submit tasks.
>>>
>>> Thanks
>>> Saisai
>>>
>>>
>>> On Mon, Nov 23, 2015 at 8:41 PM, 谢廷稳  wrote:
>>>
 Hi all,
 I ran a SparkPi on YARN with Dynamic Allocation enabled and set 
 spark.dynamicAllocation.maxExecutors
 equals
 spark.dynamicAllocation.minExecutors,then I submit an application using:
 ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
 yarn-cluster --driver-memory 4g --executor-memory 8g
 lib/spark-examples*.jar 200

 then, this application was submitted successfully, but the AppMaster
 always saying “15/11/23 20:13:08 WARN cluster.YarnClusterScheduler:
 Initial job has not accepted any resources; check your cluster UI to ensure
 that workers are registered and have sufficient resources”
 and when I open DEBUG,I found “15/11/23 20:24:00 DEBUG
 ExecutorAllocationManager: Not adding executors because our current target
 total is already 50 (limit 50)” in the console.

 I have fixed it by modifying code in
 ExecutorAllocationManager.addExecutors,Does this a bug or it was designed
 that we can’t set maxExecutors equals minExecutors?

 Thanks,
 Weber

>>>
>>>
>>
>


Re: Relation between RDDs, DataFrames and Project Tungsten

2015-11-23 Thread Michael Armbrust
Here is how I view the relationship between the various components of Spark:

 - *RDDs - *a low level API for expressing DAGs that will be executed in
parallel by Spark workers
 - *Catalyst -* an internal library for expressing trees that we use to
build relational algebra and expression evaluation.  There's also an
optimizer and query planner that turns these logical concepts into RDD
actions.
 - *Tungsten -* an internal optimized execution engine that can compile
catalyst expressions into efficient java bytecode that operates directly on
serialized binary data.  It also has nice low level data structures /
algorithms like hash tables and sorting that operate directly on serialized
data.  These are used by the physical nodes that are produced by the query
planner (and run inside of RDD operation on workers).
 - *DataFrames - *a user facing API that is similar to SQL/LINQ for
constructing dataflows that are backed by catalyst logical plans
 - *Datasets* - a user facing API that is similar to the RDD API for
constructing dataflows that are backed by catalyst logical plans

So everything is still operating on RDDs, but I anticipate most users will
eventually migrate to the higher level APIs for convenience and automatic
optimization.
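
A small illustration of that layering (a sketch, assuming a Spark 1.5
SQLContext named sqlContext): the DataFrame is just a Catalyst plan until an
action runs it, explain() shows what the planner produced, and .rdd exposes
the RDD the physical plan ultimately executes on.

val df = sqlContext.range(0, 1000).filter("id % 2 = 0")
df.explain(true)   // parsed/analyzed/optimized logical plans plus the physical plan
val rows = df.rdd  // the underlying RDD[Row] built by the query planner
println(rows.partitions.length)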

On Mon, Nov 23, 2015 at 4:18 PM, Jakob Odersky  wrote:

> Hi everyone,
>
> I'm doing some reading-up on all the newer features of Spark such as
> DataFrames, DataSets and Project Tungsten. This got me a bit confused on
> the relation between all these concepts.
>
> When starting to learn Spark, I read a book and the original paper on
> RDDs, this lead me to basically think "Spark == RDDs".
> Now, looking into DataFrames, I read that they are basically (distributed)
> collections with an associated schema, thus enabling declarative queries
> and optimization (through Catalyst). I am uncertain how DataFrames relate
> to RDDs: are DataFrames transformed to operations on RDDs once they have
> been optimized? Or are they completely different concepts? In case of the
> latter, do DataFrames still use the Spark scheduler and get broken down
> into a DAG of stages and tasks?
>
> Regarding project Tungsten, where does it fit in? To my understanding it
> is used to efficiently cache data in memory and may also be used to
> generate query code for specialized hardware. This sounds as though it
> would work on Spark's worker nodes, however it would also only work with
> schema-associated data (aka DataFrames), thus leading me to the conclusion
> that RDDs and DataFrames do not share a common backend which in turn
> contradicts my conception of "Spark == RDDs".
>
> Maybe I missed the obvious as these questions seem pretty basic, however I
> was unable to find clear answers in Spark documentation or related papers
> and talks. I would greatly appreciate any clarifications.
>
> thanks,
> --Jakob
>


Re: How to have Kafka Direct Consumers show up in Kafka Consumer reporting?

2015-11-23 Thread swetha kasireddy
I mean showing the Spark Kafka Direct consumers in the Kafka stream UI.
Usually we create a consumer and the consumer gets shown in the Kafka stream
UI. How do I log the offsets in the Spark job?

On Mon, Nov 23, 2015 at 6:11 PM, Cody Koeninger  wrote:

> What exactly do you mean by kafka consumer reporting?
>
> I'd log the offsets in your spark job and try running
>
> kafka-simple-consumer-shell.sh --partition $yourbadpartition
> --print-offsets
>
> at the same time your spark job is running
>
> On Mon, Nov 23, 2015 at 7:37 PM, swetha  wrote:
>
>> Hi,
>>
>> We see a bunch of issues like the following in Our Spark Kafka Direct. Any
>> idea  as to how make Kafka Direct Consumers show up in Kafka Consumer
>> reporting to debug this issue?
>>
>>
>> Job aborted due to stage failure: Task 47 in stage 336.0 failed 4 times,
>> most recent failure: Lost task 47.3 in stage 336.0 (TID 5283,
>> 10.227.64.52):
>> java.lang.AssertionError: assertion failed: Ran out of messages before
>> reaching ending offset 225474235 for topic hubble_stream partition 55
>> start
>> 225467496. This should not happen, and indicates that messages may have
>> been
>> lost
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-have-Kafka-Direct-Consumers-show-up-in-Kafka-Consumer-reporting-tp25457.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: How to have Kafka Direct Consumers show up in Kafka Consumer reporting?

2015-11-23 Thread swetha kasireddy
Also, does Kafka Direct query the offsets from ZooKeeper directly? From
where does it get the offsets? There is data at those offsets, but somehow
Kafka Direct does not seem to pick it up.

On Mon, Nov 23, 2015 at 6:18 PM, swetha kasireddy  wrote:

> I mean to show the Spark Kafka Direct consumers in Kafka Stream UI.
> Usually we create a consumer and the consumer gets shown in the Kafka
> Stream UI. How do I log the offsets in the Spark Job?
>
> On Mon, Nov 23, 2015 at 6:11 PM, Cody Koeninger 
> wrote:
>
>> What exactly do you mean by kafka consumer reporting?
>>
>> I'd log the offsets in your spark job and try running
>>
>> kafka-simple-consumer-shell.sh --partition $yourbadpartition
>> --print-offsets
>>
>> at the same time your spark job is running
>>
>> On Mon, Nov 23, 2015 at 7:37 PM, swetha 
>> wrote:
>>
>>> Hi,
>>>
>>> We see a bunch of issues like the following in Our Spark Kafka Direct.
>>> Any
>>> idea  as to how make Kafka Direct Consumers show up in Kafka Consumer
>>> reporting to debug this issue?
>>>
>>>
>>> Job aborted due to stage failure: Task 47 in stage 336.0 failed 4 times,
>>> most recent failure: Lost task 47.3 in stage 336.0 (TID 5283,
>>> 10.227.64.52):
>>> java.lang.AssertionError: assertion failed: Ran out of messages before
>>> reaching ending offset 225474235 for topic hubble_stream partition 55
>>> start
>>> 225467496. This should not happen, and indicates that messages may have
>>> been
>>> lost
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-have-Kafka-Direct-Consumers-show-up-in-Kafka-Consumer-reporting-tp25457.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: A Problem About Running Spark 1.5 on YARN with Dynamic Alloction

2015-11-23 Thread 谢廷稳
Hi SaiSai,
I have changed "if (numExecutorsTarget >= maxNumExecutors)" to "if
(numExecutorsTarget > maxNumExecutors)" in the first line of
ExecutorAllocationManager#addExecutors() and it ran well.
In my opinion, when I set minExecutors equal to maxExecutors, the first time
executors are added, numExecutorsTarget equals maxNumExecutors and it
repeatedly prints "DEBUG ExecutorAllocationManager: Not adding executors
because our current target total is already 50 (limit 50)".
Thanks
Weber

2015-11-23 21:00 GMT+08:00 Saisai Shao :

> Hi Tingwen,
>
> Would you minding sharing your changes in
> ExecutorAllocationManager#addExecutors().
>
> From my understanding and test, dynamic allocation can be worked when you
> set the min to max number of executors to the same number.
>
> Please check your Spark and Yarn log to make sure the executors are
> correctly started, the warning log means currently resource is not enough
> to submit tasks.
>
> Thanks
> Saisai
>
>
> On Mon, Nov 23, 2015 at 8:41 PM, 谢廷稳  wrote:
>
>> Hi all,
>> I ran a SparkPi on YARN with Dynamic Allocation enabled and set 
>> spark.dynamicAllocation.maxExecutors
>> equals
>> spark.dynamicAllocation.minExecutors,then I submit an application using:
>> ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
>> yarn-cluster --driver-memory 4g --executor-memory 8g
>> lib/spark-examples*.jar 200
>>
>> then, this application was submitted successfully, but the AppMaster
>> always saying “15/11/23 20:13:08 WARN cluster.YarnClusterScheduler:
>> Initial job has not accepted any resources; check your cluster UI to ensure
>> that workers are registered and have sufficient resources”
>> and when I open DEBUG,I found “15/11/23 20:24:00 DEBUG
>> ExecutorAllocationManager: Not adding executors because our current target
>> total is already 50 (limit 50)” in the console.
>>
>> I have fixed it by modifying code in
>> ExecutorAllocationManager.addExecutors,Does this a bug or it was designed
>> that we can’t set maxExecutors equals minExecutors?
>>
>> Thanks,
>> Weber
>>
>
>


Re: Relation between RDDs, DataFrames and Project Tungsten

2015-11-23 Thread Xiao Li
Let me share my understanding.

If we view Spark as an analytics OS, RDD APIs are like OS system calls. These
low-level system calls can be invoked from programming languages like C.
DataFrame and Dataset APIs are like higher-level programming languages.
They hide the low level complexity and the compiler (i.e., Catalyst) will
optimize your programs. For most users, the SQL, DataFrame and Dataset APIs
are good enough to satisfy most requirements.

I guess GUI interfaces for Spark will be available soon.
Spark users (e.g., CEOs) might not need to know what RDDs are at all. They
can analyze their data by clicking a few buttons, instead of writing the
programs. : )

Wish Spark will be the most popular analytics OS in the world! : )

Have a good holiday everyone!

Xiao Li



2015-11-23 17:56 GMT-08:00 Jakob Odersky :

> Thanks Michael, that helped me a lot!
>
> On 23 November 2015 at 17:47, Michael Armbrust 
> wrote:
>
>> Here is how I view the relationship between the various components of
>> Spark:
>>
>>  - *RDDs - *a low level API for expressing DAGs that will be executed in
>> parallel by Spark workers
>>  - *Catalyst -* an internal library for expressing trees that we use to
>> build relational algebra and expression evaluation.  There's also an
>> optimizer and query planner than turns these into logical concepts into RDD
>> actions.
>>  - *Tungsten -* an internal optimized execution engine that can compile
>> catalyst expressions into efficient java bytecode that operates directly on
>> serialized binary data.  It also has nice low level data structures /
>> algorithms like hash tables and sorting that operate directly on serialized
>> data.  These are used by the physical nodes that are produced by the query
>> planner (and run inside of RDD operation on workers).
>>  - *DataFrames - *a user facing API that is similar to SQL/LINQ for
>> constructing dataflows that are backed by catalyst logical plans
>>  - *Datasets* - a user facing API that is similar to the RDD API for
>> constructing dataflows that are backed by catalyst logical plans
>>
>> So everything is still operating on RDDs but I anticipate most users will
>> eventually migrate to the higher level APIs for convenience and automatic
>> optimization
>>
>> On Mon, Nov 23, 2015 at 4:18 PM, Jakob Odersky 
>> wrote:
>>
>>> Hi everyone,
>>>
>>> I'm doing some reading-up on all the newer features of Spark such as
>>> DataFrames, DataSets and Project Tungsten. This got me a bit confused on
>>> the relation between all these concepts.
>>>
>>> When starting to learn Spark, I read a book and the original paper on
>>> RDDs, this lead me to basically think "Spark == RDDs".
>>> Now, looking into DataFrames, I read that they are basically
>>> (distributed) collections with an associated schema, thus enabling
>>> declarative queries and optimization (through Catalyst). I am uncertain how
>>> DataFrames relate to RDDs: are DataFrames transformed to operations on RDDs
>>> once they have been optimized? Or are they completely different concepts?
>>> In case of the latter, do DataFrames still use the Spark scheduler and get
>>> broken down into a DAG of stages and tasks?
>>>
>>> Regarding project Tungsten, where does it fit in? To my understanding it
>>> is used to efficiently cache data in memory and may also be used to
>>> generate query code for specialized hardware. This sounds as though it
>>> would work on Spark's worker nodes, however it would also only work with
>>> schema-associated data (aka DataFrames), thus leading me to the conclusion
>>> that RDDs and DataFrames do not share a common backend which in turn
>>> contradicts my conception of "Spark == RDDs".
>>>
>>> Maybe I missed the obvious as these questions seem pretty basic, however
>>> I was unable to find clear answers in Spark documentation or related papers
>>> and talks. I would greatly appreciate any clarifications.
>>>
>>> thanks,
>>> --Jakob
>>>
>>
>>
>


Re: Relation between RDDs, DataFrames and Project Tungsten

2015-11-23 Thread Jakob Odersky
Thanks Michael, that helped me a lot!

On 23 November 2015 at 17:47, Michael Armbrust 
wrote:

> Here is how I view the relationship between the various components of
> Spark:
>
>  - *RDDs - *a low level API for expressing DAGs that will be executed in
> parallel by Spark workers
>  - *Catalyst -* an internal library for expressing trees that we use to
> build relational algebra and expression evaluation.  There's also an
> optimizer and query planner than turns these into logical concepts into RDD
> actions.
>  - *Tungsten -* an internal optimized execution engine that can compile
> catalyst expressions into efficient java bytecode that operates directly on
> serialized binary data.  It also has nice low level data structures /
> algorithms like hash tables and sorting that operate directly on serialized
> data.  These are used by the physical nodes that are produced by the query
> planner (and run inside of RDD operation on workers).
>  - *DataFrames - *a user facing API that is similar to SQL/LINQ for
> constructing dataflows that are backed by catalyst logical plans
>  - *Datasets* - a user facing API that is similar to the RDD API for
> constructing dataflows that are backed by catalyst logical plans
>
> So everything is still operating on RDDs but I anticipate most users will
> eventually migrate to the higher level APIs for convenience and automatic
> optimization
>
> On Mon, Nov 23, 2015 at 4:18 PM, Jakob Odersky  wrote:
>
>> Hi everyone,
>>
>> I'm doing some reading-up on all the newer features of Spark such as
>> DataFrames, DataSets and Project Tungsten. This got me a bit confused on
>> the relation between all these concepts.
>>
>> When starting to learn Spark, I read a book and the original paper on
>> RDDs, this lead me to basically think "Spark == RDDs".
>> Now, looking into DataFrames, I read that they are basically
>> (distributed) collections with an associated schema, thus enabling
>> declarative queries and optimization (through Catalyst). I am uncertain how
>> DataFrames relate to RDDs: are DataFrames transformed to operations on RDDs
>> once they have been optimized? Or are they completely different concepts?
>> In case of the latter, do DataFrames still use the Spark scheduler and get
>> broken down into a DAG of stages and tasks?
>>
>> Regarding project Tungsten, where does it fit in? To my understanding it
>> is used to efficiently cache data in memory and may also be used to
>> generate query code for specialized hardware. This sounds as though it
>> would work on Spark's worker nodes, however it would also only work with
>> schema-associated data (aka DataFrames), thus leading me to the conclusion
>> that RDDs and DataFrames do not share a common backend which in turn
>> contradicts my conception of "Spark == RDDs".
>>
>> Maybe I missed the obvious as these questions seem pretty basic, however
>> I was unable to find clear answers in Spark documentation or related papers
>> and talks. I would greatly appreciate any clarifications.
>>
>> thanks,
>> --Jakob
>>
>
>


Re: Any workaround for Kafka couldn't find leaders for set?

2015-11-23 Thread Cody Koeninger
If you really want to just not process the bad topicpartitions, you can use
the version of createDirectStream that takes

fromOffsets: Map[TopicAndPartition, Long]

and exclude the broken topicpartitions from the map.
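
A rough sketch of that approach (assuming the Kafka 0.8 direct API from
spark-streaming-kafka; ssc, kafkaParams, the bad partitions and the offset
lookup are placeholders you would fill in from your own environment):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val badPartitions = Set(TopicAndPartition("xxx", 251), TopicAndPartition("xxx", 253))
val allOffsets: Map[TopicAndPartition, Long] = ???  // e.g. read from ZK or your own offset store
val fromOffsets = allOffsets.filter { case (tp, _) => !badPartitions.contains(tp) }

// Start the stream only from the partitions that still have a leader.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))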

On Mon, Nov 23, 2015 at 4:54 PM, Hudong Wang  wrote:

> Hi folks,
>
> We have a 10 node cluster and have several topics. Each topic has about
> 256 partitions with a replication factor of 3. Now we run into an issue
> that in some topics, a few partitions (< 10) have a leader of -1 and all of
> them have only one in-sync replica.
>
> Exception in thread "main" org.apache.spark.SparkException:
> org.apache.spark.SparkException: Couldn't find leaders for Set([xxx,251],
> [xxx,253], [xxx,53], [xxx,161], [xxx,71], [xxx,163], [xxx,73])
> at
> org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:416)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:416)
> at scala.util.Either.fold(Either.scala:97)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:415)
>
> Is there any workaround so I can still createDirectStream with bad sets in
> system?
>
> Many thanks!
> Tony
>


Re: spark-ec2 script to launch cluster running Spark 1.5.2 built with HIVE?

2015-11-23 Thread Nicholas Chammas
Don't the Hadoop builds include Hive already? Like
spark-1.5.2-bin-hadoop2.6.tgz?

On Mon, Nov 23, 2015 at 7:49 PM Jeff Schecter  wrote:

> Hi all,
>
> As far as I can tell, the bundled spark-ec2 script provides no way to
> launch a cluster running Spark 1.5.2 pre-built with HIVE.
>
> That is to say, all of the pre-build versions of Spark 1.5.2 in the s3 bin
> spark-related-packages are missing HIVE.
>
> aws s3 ls s3://spark-related-packages/ | grep 1.5.2
>
>
> Am I missing something here? I'd rather avoid resorting to whipping up
> hacky patching scripts that might break with the next Spark point release
> if at all possible.
>


Re: A Problem About Running Spark 1.5 on YARN with Dynamic Alloction

2015-11-23 Thread Saisai Shao
I think this behavior is expected: since you already have 50 executors
launched, there is no need to acquire additional executors. Your change is
not solid, it is just hiding the log.

Again, I think you should check the logs of Yarn and Spark to see if the
executors are started correctly, and why resources are still not enough when
you already have 50 executors.

On Tue, Nov 24, 2015 at 10:48 AM, 谢廷稳  wrote:

> Hi SaiSai,
> I have changed  "if (numExecutorsTarget >= maxNumExecutors)"  to "if
> (numExecutorsTarget > maxNumExecutors)" of the first line in the
> ExecutorAllocationManager#addExecutors() and it rans well.
> In my opinion,when I was set minExecutors equals maxExecutors,when the
> first time to add Executors,numExecutorsTarget equals maxNumExecutors and
> it repeat printe "DEBUG ExecutorAllocationManager: Not adding executors
> because our current target total is already 50 (limit 50)".
> Thanks
> Weber
>
> 2015-11-23 21:00 GMT+08:00 Saisai Shao :
>
>> Hi Tingwen,
>>
>> Would you minding sharing your changes in
>> ExecutorAllocationManager#addExecutors().
>>
>> From my understanding and test, dynamic allocation can be worked when you
>> set the min to max number of executors to the same number.
>>
>> Please check your Spark and Yarn log to make sure the executors are
>> correctly started, the warning log means currently resource is not enough
>> to submit tasks.
>>
>> Thanks
>> Saisai
>>
>>
>> On Mon, Nov 23, 2015 at 8:41 PM, 谢廷稳  wrote:
>>
>>> Hi all,
>>> I ran a SparkPi on YARN with Dynamic Allocation enabled and set 
>>> spark.dynamicAllocation.maxExecutors
>>> equals
>>> spark.dynamicAllocation.minExecutors,then I submit an application using:
>>> ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
>>> yarn-cluster --driver-memory 4g --executor-memory 8g
>>> lib/spark-examples*.jar 200
>>>
>>> then, this application was submitted successfully, but the AppMaster
>>> always saying “15/11/23 20:13:08 WARN cluster.YarnClusterScheduler:
>>> Initial job has not accepted any resources; check your cluster UI to ensure
>>> that workers are registered and have sufficient resources”
>>> and when I open DEBUG,I found “15/11/23 20:24:00 DEBUG
>>> ExecutorAllocationManager: Not adding executors because our current target
>>> total is already 50 (limit 50)” in the console.
>>>
>>> I have fixed it by modifying code in
>>> ExecutorAllocationManager.addExecutors,Does this a bug or it was designed
>>> that we can’t set maxExecutors equals minExecutors?
>>>
>>> Thanks,
>>> Weber
>>>
>>
>>
>


Re: Port Control for YARN-Aware Spark

2015-11-23 Thread Marcelo Vanzin
On Mon, Nov 23, 2015 at 6:24 PM, gpriestley  wrote:
> Questions I have are:
> 1) How does the spark.yarn.am.port relate to defined ports within Spark
> (driver, executor, block manager, etc.)?
> 2) Doe the spark.yarn.am.port parameter only relate to the spark
> driver.port?
> 3) Is the spark.yarn.am.port applicable to Yarn-Cluster or Yarn-Client modes
> or both?

All the "yarn.am" options are specific to the client-mode application
master (so they don't affect cluster mode), and are unrelated to any
other Spark service (such as the block manager).
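
For the ports Grant asked about, a sketch of pinning them through SparkConf
(the port numbers are arbitrary examples; these keys exist in Spark 1.x, and
spark.yarn.am.port only matters for the client-mode AM as noted above):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.port", "40000")        // driver RPC endpoint
  .set("spark.blockManager.port", "40010")  // block manager on driver and executors
  .set("spark.executor.port", "40020")      // executor RPC endpoint (Akka-based in 1.x)
  .set("spark.yarn.am.port", "40030")       // yarn-client mode application master only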

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-23 Thread Don Drake
I'm seeing similar slowness in saveAsTextFile(), but only in Python.

I'm sorting data in a DataFrame, then transforming it to get an RDD, and then
calling coalesce(1).saveAsTextFile().

I converted the Python to Scala and the run-times were similar, except for
the saveAsTextFile() stage.  The Scala version was much faster.

When looking at the executor logs during that stage, I see the following
when running the Scala code:

15/11/23 20:51:26 INFO storage.ShuffleBlockFetcherIterator: Getting 600
non-empty blocks out of 600 blocks

15/11/23 20:51:26 INFO storage.ShuffleBlockFetcherIterator: Started 184
remote fetches in 64 ms

15/11/23 20:51:30 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (0  time so far)

15/11/23 20:51:35 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (1  time so far)

15/11/23 20:51:40 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (2  times so far)

15/11/23 20:51:45 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (3  times so far)

15/11/23 20:51:50 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (4  times so far)

15/11/23 20:51:54 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (5  times so far)

15/11/23 20:51:59 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (6  times so far)

15/11/23 20:52:04 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (7  times so far)

15/11/23 20:52:09 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (8  times so far)



When running the Python version during the saveAsTextFile() stage, I see:

15/11/23 21:04:03 INFO python.PythonRunner: Times: total = 16190, boot = 5,
init = 144, finish = 16041

15/11/23 21:04:03 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:03 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 82 ms

15/11/23 21:04:15 INFO python.PythonRunner: Times: total = 12180, boot =
-415, init = 447, finish = 12148

15/11/23 21:04:15 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:15 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 129 ms

15/11/23 21:04:27 INFO python.PythonRunner: Times: total = 11450, boot =
-372, init = 398, finish = 11424

15/11/23 21:04:27 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:27 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 70 ms

15/11/23 21:04:42 INFO python.PythonRunner: Times: total = 14480, boot =
-378, init = 403, finish = 14455

15/11/23 21:04:42 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:42 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 62 ms

15/11/23 21:04:54 INFO python.PythonRunner: Times: total = 11868, boot =
-366, init = 381, finish = 11853

15/11/23 21:04:54 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:54 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 59 ms

15/11/23 21:05:10 INFO python.PythonRunner: Times: total = 15375, boot =
-392, init = 403, finish = 15364

15/11/23 21:05:10 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:05:10 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 48 ms


The Python version is approximately 10 times slower than the Scala
version.  Any ideas why?


-Don

On Mon, Nov 23, 2015 at 4:31 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi Xiao and Sabarish
>
> Using the Stage tab in the UI, it turns out you can see how many
> partitions there are. If I did nothing I would have 228155 partitions.
> (This confirms what Sabarish said.) I tried coalesce(3), but RDD.count()
> fails. I thought that, given I have 3 workers and 1/3 of the data would
> easily fit into memory, this would be a good choice.
>
> If I use coalesce(30), count works. However, it still seems slow: it took
> 2.42 min to read 4720 records. My total data set size is 34M.
>
> Any suggestions on how to choose the number of partitions?
>
>  ('spark.executor.memory', '2G') ('spark.driver.memory', '2G')
>
>
> The data was originally collected using Spark Streaming. I noticed that the
> number of default partitions == the number of files created on HDFS. I bet
> each file is one Spark Streaming mini-batch. I suspect that if I concatenate
> these into a small number of files things will run much faster, and that I
> would not need to call coalesce(); I suspect coalesce() is taking a lot of
> the time. Any suggestions on how to choose the number of files?
>
> Kind regards
>
> Andy
>
>
> From:  Xiao Li 
> Date:  Monday, November 23, 2015 at 

Re: RDD partition after calling mapToPair

2015-11-23 Thread Thúy Hằng Lê
Thanks Cody,

I still have concerns about this.
What do you mean by saying the Spark direct stream doesn't have a default
partitioner? Could you please explain a bit more?

When I assign 20 cores to 20 Kafka partitions, I am expecting each core to
work on one partition. Is that correct?

I still couldn't figure out how the RDD will be partitioned after the
mapToPair function. It would be great if you could briefly explain (or point
me to some documentation, I couldn't find any) how the shuffle works with
the mapToPair function.

Thank you very much.
On Nov 23, 2015 12:26 AM, "Cody Koeninger"  wrote:

> Spark direct stream doesn't have a default partitioner.
>
> If you know that you want to do an operation on keys that are already
> partitioned by kafka, just use mapPartitions or foreachPartition to avoid a
> shuffle.
>
> On Sat, Nov 21, 2015 at 11:46 AM, trung kien  wrote:
>
>> Hi all,
>>
>> I am having a problem understanding how the RDD will be partitioned after
>> calling the mapToPair function.
>> Could anyone give me more information about partitioning in this function?
>>
>> I have a simple application doing following job:
>>
>> JavaPairInputDStream messages =
>> KafkaUtils.createDirectStream(...)
>>
>> JavaPairDStream stats = messages.mapToPair(JSON_DECODE)
>>
>> .reduceByKey(SUM);
>>
>> saveToDB(stats)
>>
>> I set up 2 workers (each dedicating 20 cores) for this job.
>> My Kafka topic has 40 partitions (I want each core to handle a partition),
>> and the messages sent to the queue are partitioned by the same key as the
>> mapToPair function.
>> I'm using the default partitioner of both Kafka and Spark.
>>
>> Ideally, I shouldn't see data shuffled between cores in the mapToPair
>> stage, right?
>> However, in my Spark UI, I see that the "Locality Level" for this stage
>> is "ANY", which means data needs to be transferred.
>> Any comments on this?
>>
>> --
>> Thanks
>> Kien
>>
>
>


A question about sql clustering

2015-11-23 Thread Cesar Flores
Let's assume that I have a code like the following:

val sqlQuery = "select * from whse.table_a cluster by user_id"
val df = hc.sql(sqlQuery)

My understanding is that the cluster by clause will partition the data frame
by user_id and also sort within each partition (something very useful for
performing joins later). Is that true?
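
In other words, I expect it to behave like the longer form below (a sketch on
the same table):

val expandedQuery =
  "select * from whse.table_a distribute by user_id sort by user_id"
val expandedDf = hc.sql(expandedQuery)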

And a second question: if I save *df* into a Hive table just after the query,
will Spark remember the partitioning when I reload that table from Hive?

I am using Spark version 1.3.1 at the moment.

Thanks
-- 
Cesar Flores


Spark Kafka Direct Error

2015-11-23 Thread swetha
Hi,

I see the following error in my Spark Kafka Direct. Would this mean that
Kafka Direct is not able to catch up with the messages and is failing?

Job aborted due to stage failure: Task 20 in stage 117.0 failed 4 times,
most recent failure: Lost task 20.3 in stage 117.0 (TID 2114, 10.227.64.52):
java.lang.AssertionError: assertion failed: Ran out of messages before
reaching ending offset 221572238 for topic hubble_stream partition 88 start
221563725. This should not happen, and indicates that messages may have been
lost

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kafka-Direct-Error-tp25454.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Kafka Direct Error

2015-11-23 Thread Cody Koeninger
No, that means that at the time the batch was scheduled, the Kafka leader
reported the ending offset was 221572238, but during processing, Kafka
stopped returning messages before reaching that ending offset.

That probably means something got screwed up with Kafka - e.g. you lost a
leader and lost messages in the process.
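
As a sanity check (a sketch against the 1.x direct stream API, assuming
`stream` is your direct stream), you can log what each batch was scheduled to
read and compare it with what Kafka actually serves:

import org.apache.spark.streaming.kafka.HasOffsetRanges

stream.foreachRDD { rdd =>
  // offset ranges Spark planned to read for this batch, per topic/partition
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    println(s"${r.topic} ${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
  }
}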

On Mon, Nov 23, 2015 at 12:57 PM, swetha  wrote:

> Hi,
>
> I see the following error in my Spark Kafka Direct. Would this mean that
> Kafka Direct is not able to catch up with the messages and is failing?
>
> Job aborted due to stage failure: Task 20 in stage 117.0 failed 4 times,
> most recent failure: Lost task 20.3 in stage 117.0 (TID 2114,
> 10.227.64.52):
> java.lang.AssertionError: assertion failed: Ran out of messages before
> reaching ending offset 221572238 for topic hubble_stream partition 88 start
> 221563725. This should not happen, and indicates that messages may have
> been
> lost
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kafka-Direct-Error-tp25454.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Dataframe constructor

2015-11-23 Thread spark_user_2015
Dear all,

Is the following usage of the DataFrame constructor correct, or does it
trigger any side effects that I should be aware of?

My goal is to keep track of my dataframe's state and allow custom
transformations accordingly.

  val df: DataFrame = ...some dataframe...
  val newDf = new DataFrame(df.sqlContext, df.queryExecution.logical) with StateA



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Dataframe-constructor-tp25455.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Getting different DESCRIBE results between SparkSQL and Hive

2015-11-23 Thread YaoPau
Example below.  The partition columns show up as regular columns.

I'll note that SHOW PARTITIONS works correctly in Spark SQL, so it's aware
of the partitions but it does not show them in DESCRIBE.



In Hive: "DESCRIBE pub.inventory_daily"

[['effective_date', 'string', ''],
 ['listing_skey', 'int', ''],
 ['car_durable_key', 'int', ''],
 ['car_id', 'int', ''],
 ['# Partition Information', 'NULL', 'NULL'],
 ['# col_name', 'data_type   ', 'comment '],
 ['', 'NULL', 'NULL'],
 ['year', 'smallint', ''],
 ['month', 'smallint', ''],
 ['day', 'smallint', '']]

In SparkSQL: hc.sql("DESCRIBE pub.inventory_daily").collect()

[Row(col_name=u'effective_date', data_type=u'string', comment=u''),
 Row(col_name=u'listing_skey', data_type=u'int', comment=u''),
 Row(col_name=u'car_durable_key', data_type=u'int', comment=u''),
 Row(col_name=u'car_id', data_type=u'int', comment=u''),
 Row(col_name=u'year', data_type=u'smallint', comment=u''),
 Row(col_name=u'month', data_type=u'smallint', comment=u''),
 Row(col_name=u'day', data_type=u'smallint', comment=u'')]





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Getting-different-DESCRIBE-results-between-SparkSQL-and-Hive-tp25452.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD partition after calling mapToPair

2015-11-23 Thread Cody Koeninger
Partitioner is an optional field when defining an RDD.  KafkaRDD doesn't
define one, so you can't really assume anything about the way it's
partitioned, because Spark doesn't know anything about the way it's
partitioned.  If you want to rely on some property of how things were
partitioned as they were being produced into Kafka, you need to do
foreachPartition or mapPartitions yourself.  Otherwise, Spark will do a
shuffle for any operation that would ordinarily require a shuffle, even if
keys are already in the "right" place.
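
For example, if your keys really are already co-located by the producer's
partitioning, a per-partition aggregate avoids the shuffle entirely.  A Scala
sketch, where decodeAmount and saveToDB are stand-ins for your own JSON
decode and DB write:

stream.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // every record in this iterator came from a single Kafka partition
    val sums = scala.collection.mutable.Map.empty[String, Double]
    iter.foreach { case (key, value) =>
      sums(key) = sums.getOrElse(key, 0.0) + decodeAmount(value) // stand-in decode
    }
    saveToDB(sums) // stand-in sink, called once per partition
  }
}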

Regarding the assignment of cores to partitions, that's not really
accurate.  Each Kafka partition will correspond to a Spark partition.  If
you do an operation that shuffles, that relationship no longer holds true.
Even if you're doing a straight map operation without a shuffle, you will
probably get 1 executor core working on 1 partition, but there's no
guarantee the scheduler will do that, and no guarantee it'll be the same
core / partition relationship for the next batch.


On Mon, Nov 23, 2015 at 9:01 AM, Thúy Hằng Lê  wrote:

> Thanks Cody,
>
> I still have concerns about this.
> What do you mean by saying the Spark direct stream doesn't have a default
> partitioner? Could you please explain a bit more?
>
> When I assign 20 cores to 20 Kafka partitions, I am expecting each core to
> work on one partition. Is that correct?
>
> I still couldn't figure out how the RDD will be partitioned after the
> mapToPair function. It would be great if you could briefly explain (or point
> me to some documentation, I couldn't find any) how the shuffle works with
> the mapToPair function.
>
> Thank you very much.
> On Nov 23, 2015 12:26 AM, "Cody Koeninger"  wrote:
>
>> Spark direct stream doesn't have a default partitioner.
>>
>> If you know that you want to do an operation on keys that are already
>> partitioned by kafka, just use mapPartitions or foreachPartition to avoid a
>> shuffle.
>>
>> On Sat, Nov 21, 2015 at 11:46 AM, trung kien  wrote:
>>
>>> Hi all,
>>>
>>> I am having a problem understanding how the RDD will be partitioned after
>>> calling the mapToPair function.
>>> Could anyone give me more information about partitioning in this
>>> function?
>>>
>>> I have a simple application doing following job:
>>>
>>> JavaPairInputDStream messages =
>>> KafkaUtils.createDirectStream(...)
>>>
>>> JavaPairDStream stats = messages.mapToPair(JSON_DECODE)
>>>
>>> .reduceByKey(SUM);
>>>
>>> saveToDB(stats)
>>>
>>> I set up 2 workers (each dedicating 20 cores) for this job.
>>> My Kafka topic has 40 partitions (I want each core to handle a partition),
>>> and the messages sent to the queue are partitioned by the same key as the
>>> mapToPair function.
>>> I'm using the default partitioner of both Kafka and Spark.
>>>
>>> Ideally, I shouldn't see data shuffled between cores in the mapToPair
>>> stage, right?
>>> However, in my Spark UI, I see that the "Locality Level" for this stage
>>> is "ANY", which means data needs to be transferred.
>>> Any comments on this?
>>>
>>> --
>>> Thanks
>>> Kien
>>>
>>
>>


spark 1.4.1 to oracle 11g write to an existing table

2015-11-23 Thread Siva Gudavalli
Hi,

I am trying to write a DataFrame from Spark 1.4.1 to Oracle 11g.

I am using

dataframe.write.mode(SaveMode.Append).jdbc(url,tablename, properties)

This always tries to create a new table.

I would like to insert records into an existing table instead of creating a
new one every single time. Please help.
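
In other words, the effect I am after is what the older (now deprecated)
insertIntoJDBC call does, i.e. insert into a table that is assumed to already
exist; a sketch, with an illustrative URL and table name:

dataframe.insertIntoJDBC(
  "jdbc:oracle:thin:user/pass@//dbhost:1521/orcl",  // illustrative URL
  "MY_EXISTING_TABLE",                              // table must already exist
  false)                                            // overwrite = false, just append rows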

Let me know if you need any other details.

Regards
Shiv