Best way to store Avro Objects as Parquet using SPARK

2016-03-20 Thread Manivannan Selvadurai
Hi All,

  In my current project there is a requirement to store Avro data
(JSON format) as Parquet files.
I was able to use AvroParquetWriter separately to create the Parquet
files. Along with the data, those Parquet files also had the Avro schema
stored in their footer.

   But when I tried using Spark Streaming I could not find a way to
store the data with the Avro schema information. The closest that I got was
to create a DataFrame from the JSON RDDs and store it as Parquet. Here
the Parquet files had a Spark-specific schema in their footer.

  Is this the right approach, or is there a better one? Please guide me.


We are using Spark 1.4.1.

Thanks In Advance!!
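A minimal sketch of one way to keep the Avro schema in the Parquet footer from a
streaming job is to keep using AvroParquetWriter, but inside foreachRDD /
foreachPartition. This is only a sketch: package names differ between parquet-mr
releases (parquet.avro vs org.apache.parquet.avro), the output layout below is
illustrative, and one file per partition per micro-batch will produce many small files.

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.spark.TaskContext
import org.apache.spark.streaming.dstream.DStream
import parquet.avro.AvroParquetWriter  // org.apache.parquet.avro in newer parquet-mr

// Pass the schema as a JSON string; the Avro Schema object itself may not be serializable.
def writeAvroAsParquet(stream: DStream[GenericRecord], schemaJson: String, baseDir: String): Unit = {
  stream.foreachRDD { (rdd, time) =>
    rdd.foreachPartition { records =>
      val schema = new Schema.Parser().parse(schemaJson)
      val path = new Path(
        s"$baseDir/batch=${time.milliseconds}/part-${TaskContext.getPartitionId()}.parquet")
      // Same writer as in the standalone case, so the Avro schema lands in the footer.
      val writer = new AvroParquetWriter[GenericRecord](path, schema)
      try records.foreach(r => writer.write(r)) finally writer.close()
    }
  }
}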


Re: best practices: running multi user jupyter notebook server

2016-03-20 Thread charles li
Hi Andy, I think you can do that with some open-source packages/libs
built for IPython and Spark.

here is one : https://github.com/litaotao/IPython-Dashboard

On Thu, Mar 17, 2016 at 1:36 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> We are considering deploying a notebook server for use by two kinds of
> users
>
>
>1. interactive dashboard.
>   1. I.e. Forms allow users to select data sets and visualizations
>   2. Review real time graphs of data captured by our spark streams
>2. General notebooks for Data Scientists
>
>
> My concern is that interactive Spark jobs can consume a lot of cluster
> resources and many users may be sloppy/lazy, i.e., just kill their browsers
> instead of shutting down their notebooks cleanly.
>
> What are best practices?
>
>
> Kind regards
>
> Andy
>



-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: Error using collectAsMap() in scala

2016-03-20 Thread Shishir Anshuman
I have stored the contents of two csv files in separate RDDs.

file1.csv format*: (column1,column2,column3)*
file2.csv format*: (column1, column2)*

*column1 of file1* and *column2 of file2* contain similar data. I want to
compare the two columns and, if a match is found:

   - Replace the data at *column1 (file1)* with *column1 (file2)*


For this reason, I am not using a normal RDD.

I am still new to apache spark, so any suggestion will be greatly
appreciated.
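A minimal sketch of what the replies below are pointing at, assuming both files are
plain comma-separated text: turn file2 into a pair RDD keyed on its column2 so that
collectAsMap() becomes available, broadcast the map, then rewrite file1. The file
paths and the naive split-on-comma parsing are assumptions.

// Lookup map: file2's column2 -> file2's column1
val dict: org.apache.spark.rdd.RDD[(String, String)] =
  sc.textFile("path/to/file2.csv").map { line =>
    val cols = line.split(",")
    (cols(1).trim, cols(0).trim)
  }
val dictBroadcast = sc.broadcast(dict.collectAsMap())

// Replace column1 of file1 with file2's column1 whenever it matches file2's column2
val file1Replaced = sc.textFile("path/to/file1.csv").map { line =>
  val cols = line.split(",")
  val newCol1 = dictBroadcast.value.getOrElse(cols(0).trim, cols(0))
  (newCol1 +: cols.tail).mkString(",")
}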

On Mon, Mar 21, 2016 at 10:09 AM, Prem Sure  wrote:

> Any specific reason you would like to use collectAsMap only? You could
> probably move to a normal RDD instead of a pair RDD.
>
>
> On Monday, March 21, 2016, Mark Hamstra  wrote:
>
>> You're not getting what Ted is telling you.  Your `dict` is an
>> RDD[String]  -- i.e. it is a collection of a single value type, String.
>> But `collectAsMap` is only defined for PairRDDs that have key-value pairs
>> for their data elements.  Both a key and a value are needed to collect into
>> a Map[K, V].
>>
>> On Sun, Mar 20, 2016 at 8:19 PM, Shishir Anshuman <
>> shishiranshu...@gmail.com> wrote:
>>
>>> Yes, I have included that class in my code.
>>> I guess it's something to do with the RDD format. Not able to figure out
>>> the exact reason.
>>>
>>> On Fri, Mar 18, 2016 at 9:27 AM, Ted Yu  wrote:
>>>
 It is defined in:
 core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

 On Thu, Mar 17, 2016 at 8:55 PM, Shishir Anshuman <
 shishiranshu...@gmail.com> wrote:

> I am using following code snippet in scala:
>
>
> *val dict: RDD[String] = sc.textFile("path/to/csv/file")*
> *val dict_broadcast=sc.broadcast(dict.collectAsMap())*
>
> On compiling It generates this error:
>
> *scala:42: value collectAsMap is not a member of
> org.apache.spark.rdd.RDD[String]*
>
>
> *val dict_broadcast=sc.broadcast(dict.collectAsMap())
> ^*
>


>>>
>>


Re: What is the most efficient and scalable way to get all the recommendation results from ALS model ?

2016-03-20 Thread Dhaval Modi
+1
On Mar 21, 2016 09:52, "Hiroyuki Yamada"  wrote:

> Could anyone give me some advices or recommendations or usual ways to do
> this ?
>
> I am trying to get all (probably top 100) product recommendations for each
> user from a model (MatrixFactorizationModel),
> but I haven't figured out yet to do it efficiently.
>
> So far,
> calling predict (predictAll in pyspark) method with user-product matrix
> uses too much memory and couldn't complete due to a lack of memory,
> and
> calling predict for each user (or for each batch of users, say 100 users or so)
> takes too much time to get all the recommendations.
>
> I am using spark 1.4.1 and running 5-node cluster with 8GB RAM each.
> I only use small-sized data set so far, like about 5 users and 5000
> products with only about 10 ratings.
>
> Thanks.
>
>
> On Sat, Mar 19, 2016 at 7:58 PM, Hiroyuki Yamada 
> wrote:
>
>> Hi,
>>
>> I'm testing Collaborative Filtering with MLlib.
>> Making a model by ALS.trainImplicit (or train) seems scalable as far as I
>> have tested,
>> but I'm wondering how I can get all the recommendation results
>> efficiently.
>>
>> The predictAll method can get all the results,
>> but it needs the whole user-product matrix in memory as an input.
>> So if there are 1 million users and 1 million products, then the number
>> of elements is too large (1 million x 1 million)
>> and the amount of memory to hold them is more than a few TB even when the
>> element size is only 4 bytes,
>> which is not a realistic amount of memory even now.
>>
>> # (1,000,000 * 1,000,000) * 4 bytes / 10^12 ≈ 4 TB
>>
>> We can, of course, use predict method per user,
>> but, as far as I tried, it is very slow to get 1 million users' results.
>>
>> Do I miss something ?
>> Are there any other better ways to get all the recommendation results in
>> scalable and efficient way ?
>>
>> Best regards,
>> Hiro
>>
>>
>>
>


Re: Error using collectAsMap() in scala

2016-03-20 Thread Prem Sure
Any specific reason you would like to use collectAsMap only? You could
probably move to a normal RDD instead of a pair RDD.


On Monday, March 21, 2016, Mark Hamstra  wrote:

> You're not getting what Ted is telling you.  Your `dict` is an RDD[String]
>  -- i.e. it is a collection of a single value type, String.  But
> `collectAsMap` is only defined for PairRDDs that have key-value pairs for
> their data elements.  Both a key and a value are needed to collect into a
> Map[K, V].
>
> On Sun, Mar 20, 2016 at 8:19 PM, Shishir Anshuman <
> shishiranshu...@gmail.com
> > wrote:
>
>> Yes, I have included that class in my code.
>> I guess it's something to do with the RDD format. Not able to figure out
>> the exact reason.
>>
>> On Fri, Mar 18, 2016 at 9:27 AM, Ted Yu > > wrote:
>>
>>> It is defined in:
>>> core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
>>>
>>> On Thu, Mar 17, 2016 at 8:55 PM, Shishir Anshuman <
>>> shishiranshu...@gmail.com
>>> > wrote:
>>>
 I am using following code snippet in scala:


 *val dict: RDD[String] = sc.textFile("path/to/csv/file")*
 *val dict_broadcast=sc.broadcast(dict.collectAsMap())*

 On compiling It generates this error:

 *scala:42: value collectAsMap is not a member of
 org.apache.spark.rdd.RDD[String]*


 *val dict_broadcast=sc.broadcast(dict.collectAsMap())
   ^*

>>>
>>>
>>
>


Re: Error using collectAsMap() in scala

2016-03-20 Thread Mark Hamstra
You're not getting what Ted is telling you.  Your `dict` is an RDD[String]
 -- i.e. it is a collection of a single value type, String.  But
`collectAsMap` is only defined for PairRDDs that have key-value pairs for
their data elements.  Both a key and a value are needed to collect into a
Map[K, V].

On Sun, Mar 20, 2016 at 8:19 PM, Shishir Anshuman  wrote:

> Yes, I have included that class in my code.
> I guess it's something to do with the RDD format. Not able to figure out
> the exact reason.
>
> On Fri, Mar 18, 2016 at 9:27 AM, Ted Yu  wrote:
>
>> It is defined in:
>> core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
>>
>> On Thu, Mar 17, 2016 at 8:55 PM, Shishir Anshuman <
>> shishiranshu...@gmail.com> wrote:
>>
>>> I am using following code snippet in scala:
>>>
>>>
>>> *val dict: RDD[String] = sc.textFile("path/to/csv/file")*
>>> *val dict_broadcast=sc.broadcast(dict.collectAsMap())*
>>>
>>> On compiling It generates this error:
>>>
>>> *scala:42: value collectAsMap is not a member of
>>> org.apache.spark.rdd.RDD[String]*
>>>
>>>
>>> *val dict_broadcast=sc.broadcast(dict.collectAsMap())
>>>   ^*
>>>
>>
>>
>


Re: What is the most efficient and scalable way to get all the recommendation results from ALS model ?

2016-03-20 Thread Hiroyuki Yamada
Could anyone give me some advices or recommendations or usual ways to do
this ?

I am trying to get all (probably top 100) product recommendations for each
user from a model (MatrixFactorizationModel),
but I haven't figured out yet to do it efficiently.

So far,
calling predict (predictAll in pyspark) method with user-product matrix
uses too much memory and couldn't complete due to a lack of memory,
and
calling predict for each user (or for each batch of users, say 100 users or so)
takes too much time to get all the recommendations.

I am using spark 1.4.1 and running 5-node cluster with 8GB RAM each.
I only use small-sized data set so far, like about 5 users and 5000
products with only about 10 ratings.

Thanks.
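If your release exposes it, the Scala API has a batched alternative that avoids
materialising the full user x product matrix:
MatrixFactorizationModel.recommendProductsForUsers(k) (it appeared around Spark 1.4
in Scala; the Python binding arrived later, so check your version). A minimal sketch:

import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

// One distributed job returning, per user, the top-k products sorted by predicted score.
def topKForAllUsers(model: MatrixFactorizationModel, k: Int = 100): RDD[(Int, Array[Rating])] =
  model.recommendProductsForUsers(k)

// e.g. keep only the product ids per user:
// val top100 = topKForAllUsers(model).mapValues(_.map(_.product))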


On Sat, Mar 19, 2016 at 7:58 PM, Hiroyuki Yamada  wrote:

> Hi,
>
> I'm testing Collaborative Filtering with MLlib.
> Making a model by ALS.trainImplicit (or train) seems scalable as far as I
> have tested,
> but I'm wondering how I can get all the recommendation results efficiently.
>
> The predictAll method can get all the results,
> but it needs the whole user-product matrix in memory as an input.
> So if there are 1 million users and 1 million products, then the number of
> elements is too large (1 million x 1 million)
> and the amount of memory to hold them is more than a few TB even when the
> element size is only 4 bytes,
> which is not a realistic amount of memory even now.
>
> # (1,000,000 * 1,000,000) * 4 bytes / 10^12 ≈ 4 TB
>
> We can, of course, use predict method per user,
> but, as far as I tried, it is very slow to get 1 million users' results.
>
> Do I miss something ?
> Are there any other better ways to get all the recommendation results in
> scalable and efficient way ?
>
> Best regards,
> Hiro
>
>
>


Re: Get the number of days dynamically in with Column

2016-03-20 Thread Silvio Fiorito
I’m not entirely sure if this is what you’re asking, but you could just use the 
datediff function:

val df2 = df.withColumn("ID", datediff($"end", $"start"))

If you want it formatted as {n}D then:

val df2 = df.withColumn("ID", concat(datediff($"end", $"start"),lit("D")))
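If the ID really has to come from the time stamping table rather than being
synthesised with concat, a join on the day difference is another option. A sketch,
assuming the lookup table has columns "NoOfDays" and "ID" and the main DataFrame has
"start" and "end":

import org.apache.spark.sql.functions._
// import sqlContext.implicits._ if you are not in the shell

val withDays = df.withColumn("days", datediff($"end", $"start"))
val df2 = withDays
  .join(dfTimeStamping, withDays("days") === dfTimeStamping("NoOfDays"), "left_outer")
  .drop("days")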

Thanks,
Silvio

From: Divya Gehlot >
Date: Sunday, March 20, 2016 at 11:42 PM
To: "user @spark" >
Subject: Get the number of days dynamically in with Column

I have a time stamping table which has data like:

No of Days    ID
1             1D
2             2D
...

and so on till 30 days.

I have another DataFrame with a start date and an end date.
I need to get the difference between these two dates, look up the ID in the
time stamping table, and add it with withColumn.

The tedious solution is


val dfTimeStamping = df.withColumn("ID",when(Diff between Start date and 
Enddate ,"1D").when(Diff between Start date and Enddate ,"2D")).. have to do 
till 30 days .

How can I do it dynamically ?


Thanks,
Divya





Get the number of days dynamically in with Column

2016-03-20 Thread Divya Gehlot
I have a time stamping table which has data like:

No of Days    ID
1             1D
2             2D
...

and so on till 30 days.

I have another DataFrame with a start date and an end date.
I need to get the difference between these two dates, look up the ID in the
time stamping table, and add it with withColumn.

The tedious solution is


val dfTimeStamping = df.withColumn("ID",when(Diff between Start date and
Enddate ,"1D").when(Diff between Start date and Enddate ,"2D")).. have to
do till 30 days .

How can I do it dynamically ?


Thanks,
Divya


Spark: The build-in indexes in ORC file do not work.

2016-03-20 Thread Joseph
Hi all,

Has anyone used ORC indexes in Spark SQL? Does Spark SQL support ORC indexes
completely?

I use the shell script "${SPARK_HOME}/bin/spark-sql" to run the Spark SQL REPL and
execute my query statements.

The following is my test in the Spark SQL REPL:

spark-sql> set spark.sql.orc.filterPushdown=true;
spark-sql> select count(*) from gprs where terminal_type=25080;
Time taken: about 5 seconds
spark-sql> select * from gprs where terminal_type=25080;
Time taken: about 107 seconds

The values of column terminal_type are in [0, 25066] in my data.
Neither query should scan the whole data (if the file stats were used), so why
was the time gap so large?

spark-sql> set spark.sql.orc.filterPushdown=false;
spark-sql> select count(*) from gprs where terminal_type=25080;
Time taken: about 5 seconds
spark-sql> select * from gprs where terminal_type=25080;
Time taken: about 107 seconds

So when I disabled spark.sql.orc.filterPushdown, the time taken (for the
select * query) was no different from when it was enabled.

I have tried the explain extended command, but it did not show any information
indicating that the query had used the ORC stats.
Is there any way to check the use of stats?
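One way to check (a sketch for spark-shell rather than the spark-sql CLI; the exact
plan text varies between releases) is to read the ORC files through the data source
API and look for a PushedFilters entry in the physical plan. The HDFS path below is
a placeholder:

sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
val gprs = sqlContext.read.orc("/path/to/gprs")   // placeholder path
// explain(true) prints the extended/physical plan; a "PushedFilters: [...]" entry
// mentioning terminal_type suggests the predicate was handed to the ORC reader.
gprs.filter(gprs("terminal_type") === 25080).explain(true)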

Appendix:
Cluster environment:
Hadoop 2.7.2, Spark 1.6.1, 3 nodes, 3 workers per node, 8 cores per worker,
16 GB per worker, 16 GB per executor, block size 256 MB, 3 replicas per block,
4 disks per datanode.

Data size:
800 ORC files in total, each about 51 MB; 560,000,000 rows, 57 columns;
only one table, named gprs (ORC format).

Thanks!


Joseph
 
 
 
 


Re: Building spark submodule source code

2016-03-20 Thread Ted Yu
To speed up the build process, take a look at install_zinc() in build/mvn,
around line 83.

And the following around line 137:

# Now that zinc is ensured to be installed, check its status and, if its
# not running or just installed, start it

FYI

On Sun, Mar 20, 2016 at 7:44 PM, Tenghuan He  wrote:

> Hi everyone,
>
> I am trying to add a new method to spark RDD. After changing the code
> of RDD.scala and running the following command
> mvn -pl :spark-core_2.10 -DskipTests clean install
> The build reports BUILD SUCCESS; however, when starting bin\spark-shell, my
> method cannot be found.
> Do I have to rebuild the whole Spark project instead of just the spark-core
> submodule to make the changes work?
> Rebuilding the whole project is too time-consuming; is there any better
> choice?
>
>
> Thanks & Best Regards
>
> Tenghuan He
>
>


Building spark submodule source code

2016-03-20 Thread Tenghuan He
Hi everyone,

I am trying to add a new method to spark RDD. After changing the code
of RDD.scala and running the following command
mvn -pl :spark-core_2.10 -DskipTests clean install
The build reports BUILD SUCCESS; however, when starting bin\spark-shell, my method
cannot be found.
Do I have to rebuild the whole Spark project instead of just the spark-core
submodule to make the changes work?
Rebuilding the whole project is too time-consuming; is there any better
choice?


Thanks & Best Regards

Tenghuan He


support vector machine question

2016-03-20 Thread prem09
Hi, 
I created a dataset of 100 points, ranging from X=1.0 to X=100.0. I let
the y variable be 0.0 if X < 51.0 and 1.0 otherwise. I then fit an
SVMWithSGD. When I predict the y values for the same values of X as in the
sample, I get back 1.0 for each predicted y!

Incidentally, I don't get perfect separation when I replace SVMWithSGD with
LogisticRegressionWithSGD or NaiveBayes.

Here's the code: 

 
import sys 
from pyspark import SparkContext 
from pyspark.mllib.classification import LogisticRegressionWithSGD, LogisticRegressionModel
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel 
from pyspark.mllib.classification import SVMWithSGD, SVMModel 
from pyspark.mllib.regression import LabeledPoint 
import numpy as np 

# Load a text file and convert each line to a tuple. 
sc=SparkContext(appName="Prem") 

# Load and parse the data 
def parsePoint(line):
    values = [float(x) for x in line.split('\t')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("c:/python27/classifier.txt") 
parsedData = data.map(parsePoint) 
print parsedData 

# Build the model 
model = SVMWithSGD.train(parsedData, iterations=100) 
model.setThreshold(0.5) 
print model 

### Build the model 
##model = LogisticRegressionWithSGD.train(parsedData, iterations=100, intercept=True)
##print model 

### Build the model 
##model = NaiveBayes.train(parsedData) 
##print model 

for i in range(100):
    print i+1, model.predict(np.array([float(i+1)]))

= 

Incidentally, the weights I observe in MLlib are 0.8949991, while if I run
it using the scikit-learn library version of support vector machine, I get
0.05417109. Is this indicative of the problem? 
Can you please let me know what I am doing wrong? 

Thanks, 
Prem



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/support-vector-machine-question-tp26543.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: reading csv file, operation on column or columns

2016-03-20 Thread Mich Talebzadeh
Apologies. Good point

def convertColumn(df: org.apache.spark.sql.DataFrame, name: String, newType: String) = {
  val df_1 = df.withColumnRenamed(name, "ConvertColumn")
  df_1.withColumn(name, df_1.col("ConvertColumn").cast(newType)).drop("ConvertColumn")
}
val df_3 = convertColumn(df_2, "InvoiceNumber","Integer")
df_3: org.apache.spark.sql.DataFrame = [Payment date: string, Net: string,
VAT: string, InvoiceNumber: int]
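For what it is worth, a simpler equivalent should work if withColumn replaces an
existing column when the name matches (it does in recent releases, but check your
version), so the rename/drop dance is not needed:

def convertColumn(df: org.apache.spark.sql.DataFrame, name: String, newType: String) =
  df.withColumn(name, df.col(name).cast(newType))

// val df_3 = convertColumn(df_2, "InvoiceNumber", "Integer")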

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 20 March 2016 at 22:48, Ted Yu  wrote:

> Mich:
> Looks like convertColumn() is method of your own - I don't see it in Spark
> code base.
>
> On Sun, Mar 20, 2016 at 3:38 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Pretty straight forward as pointed out by Ted.
>>
>> --read csv file into a df
>> val df =
>> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
>> "true").option("header", "true").load("/data/stg/table2")
>>
>> scala> df.printSchema
>> root
>>  |-- Invoice Number: string (nullable = true)
>>  |-- Payment date: string (nullable = true)
>>  |-- Net: string (nullable = true)
>>  |-- VAT: string (nullable = true)
>>  |-- Total: string (nullable = true)
>> --
>> --rename the first column as InvoiceNumber getting rid of space
>> --
>> scala> val df_1 = df.withColumnRenamed("Invoice Number","InvoiceNumber")
>> df_1: org.apache.spark.sql.DataFrame = [InvoiceNumber: string, Payment
>> date: string, Net: string, VAT: string, Total: string]
>> --
>> --drop column Total
>> --
>> scala> val df_2 = df_1.drop("Total")
>> df_2: org.apache.spark.sql.DataFrame = [InvoiceNumber: string, Payment
>> date: string, Net: string, VAT: string]
>> --
>> -- Change InvoiceNumber from String to Integer
>> --
>> scala> val df_3 = convertColumn(df_2, "InvoiceNumber","Integer")
>> df_3: org.apache.spark.sql.DataFrame = [Payment date: string, Net:
>> string, VAT: string, InvoiceNumber: int]
>>
>>
>> HTH
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 20 March 2016 at 22:15, Ted Yu  wrote:
>>
>>> Please refer to the following methods of DataFrame:
>>>
>>>   def withColumn(colName: String, col: Column): DataFrame = {
>>>
>>>   def drop(colName: String): DataFrame = {
>>>
>>> On Sun, Mar 20, 2016 at 2:47 PM, Ashok Kumar <
>>> ashok34...@yahoo.com.invalid> wrote:
>>>
 Gurus,

 I would like to read a csv file into a Data Frame but able to rename
 the column name, change a column type from String to Integer or drop the
 column from further analysis before saving data as parquet file?

 Thanks

>>>
>>>
>>
>


Re: reading csv file, operation on column or columns

2016-03-20 Thread Ted Yu
Mich:
Looks like convertColumn() is a method of your own - I don't see it in the Spark
code base.

On Sun, Mar 20, 2016 at 3:38 PM, Mich Talebzadeh 
wrote:

> Pretty straight forward as pointed out by Ted.
>
> --read csv file into a df
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load("/data/stg/table2")
>
> scala> df.printSchema
> root
>  |-- Invoice Number: string (nullable = true)
>  |-- Payment date: string (nullable = true)
>  |-- Net: string (nullable = true)
>  |-- VAT: string (nullable = true)
>  |-- Total: string (nullable = true)
> --
> --rename the first column as InvoiceNumber getting rid of space
> --
> scala> val df_1 = df.withColumnRenamed("Invoice Number","InvoiceNumber")
> df_1: org.apache.spark.sql.DataFrame = [InvoiceNumber: string, Payment
> date: string, Net: string, VAT: string, Total: string]
> --
> --drop column Total
> --
> scala> val df_2 = df_1.drop("Total")
> df_2: org.apache.spark.sql.DataFrame = [InvoiceNumber: string, Payment
> date: string, Net: string, VAT: string]
> --
> -- Change InvoiceNumber from String to Integer
> --
> scala> val df_3 = convertColumn(df_2, "InvoiceNumber","Integer")
> df_3: org.apache.spark.sql.DataFrame = [Payment date: string, Net: string,
> VAT: string, InvoiceNumber: int]
>
>
> HTH
>
>
>
>
>
>
>
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 20 March 2016 at 22:15, Ted Yu  wrote:
>
>> Please refer to the following methods of DataFrame:
>>
>>   def withColumn(colName: String, col: Column): DataFrame = {
>>
>>   def drop(colName: String): DataFrame = {
>>
>> On Sun, Mar 20, 2016 at 2:47 PM, Ashok Kumar <
>> ashok34...@yahoo.com.invalid> wrote:
>>
>>> Gurus,
>>>
>>> I would like to read a csv file into a Data Frame but able to rename the
>>> column name, change a column type from String to Integer or drop the column
>>> from further analysis before saving data as parquet file?
>>>
>>> Thanks
>>>
>>
>>
>


Re: reading csv file, operation on column or columns

2016-03-20 Thread Mich Talebzadeh
Pretty straight forward as pointed out by Ted.

--read csv file into a df
val df =
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header", "true").load("/data/stg/table2")

scala> df.printSchema
root
 |-- Invoice Number: string (nullable = true)
 |-- Payment date: string (nullable = true)
 |-- Net: string (nullable = true)
 |-- VAT: string (nullable = true)
 |-- Total: string (nullable = true)
--
--rename the first column as InvoiceNumber getting rid of space
--
scala> val df_1 = df.withColumnRenamed("Invoice Number","InvoiceNumber")
df_1: org.apache.spark.sql.DataFrame = [InvoiceNumber: string, Payment
date: string, Net: string, VAT: string, Total: string]
--
--drop column Total
--
scala> val df_2 = df_1.drop("Total")
df_2: org.apache.spark.sql.DataFrame = [InvoiceNumber: string, Payment
date: string, Net: string, VAT: string]
--
-- Change InvoiceNumber from String to Integer
--
scala> val df_3 = convertColumn(df_2, "InvoiceNumber","Integer")
df_3: org.apache.spark.sql.DataFrame = [Payment date: string, Net: string,
VAT: string, InvoiceNumber: int]


HTH













Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 20 March 2016 at 22:15, Ted Yu  wrote:

> Please refer to the following methods of DataFrame:
>
>   def withColumn(colName: String, col: Column): DataFrame = {
>
>   def drop(colName: String): DataFrame = {
>
> On Sun, Mar 20, 2016 at 2:47 PM, Ashok Kumar  > wrote:
>
>> Gurus,
>>
>> I would like to read a csv file into a Data Frame but able to rename the
>> column name, change a column type from String to Integer or drop the column
>> from further analysis before saving data as parquet file?
>>
>> Thanks
>>
>
>


Re: reading csv file, operation on column or columns

2016-03-20 Thread Ted Yu
Please refer to the following methods of DataFrame:

  def withColumn(colName: String, col: Column): DataFrame = {

  def drop(colName: String): DataFrame = {

On Sun, Mar 20, 2016 at 2:47 PM, Ashok Kumar 
wrote:

> Gurus,
>
> I would like to read a csv file into a Data Frame but able to rename the
> column name, change a column type from String to Integer or drop the column
> from further analysis before saving data as parquet file?
>
> Thanks
>


reading csv file, operation on column or columns

2016-03-20 Thread Ashok Kumar
Gurus,
I would like to read a CSV file into a DataFrame, but be able to rename a
column, change a column type from String to Integer, or drop a column from
further analysis before saving the data as a Parquet file. How can I do this?
Thanks

Re: Flume with Spark Streaming Sink

2016-03-20 Thread Luciano Resende
You should use it as described in the documentation and pass it as a
package:

 ./bin/spark-submit --packages
org.apache.spark:spark-streaming-flume_2.10:1.6.1 ...




On Sun, Mar 20, 2016 at 9:22 AM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> I'm trying to use the Spark Sink with Flume but it seems I'm missing some
> of the dependencies.
> I'm running the following code:
>
> ./bin/spark-shell --master yarn --jars
> /home/impact/flumeStreaming/spark-streaming-flume_2.10-1.6.1.jar,/home/impact/flumeStreaming/flume-ng-core-1.6.0.jar,/home/impact/flumeStreaming/flume-ng-sdk-1.6.0.jar
>
>
> import org.apache.spark.streaming.flume._
>
> import org.apache.spark.streaming._
>
> val ssc = new StreamingContext(sc, Seconds(60))
> val flumeStream = FlumeUtils.createPollingStream(ssc, "impact1", )
>
> flumeStream.print
> ssc.start
>
>
> And I am getting this exception.
>
> 16/03/20 18:17:17 INFO scheduler.ReceiverTracker: Registered receiver for
> stream 0 from impact3.indigo.co.il:51581
> 16/03/20 18:17:17 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
> 4.0 (TID 76, impact3.indigo.co.il): java.lang.NoClassDefFoundError:
> org/apache/spark/streaming/flume/sink/SparkFlumeProtocol$Callback
> at
> org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:84)
>
>
> What deps am I missing ?
>
> Thank you.
> Daniel
>



-- 
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/


Re: Flume with Spark Streaming Sink

2016-03-20 Thread Ted Yu
$ jar tvf
./external/flume-sink/target/spark-streaming-flume-sink_2.10-1.6.1.jar |
grep SparkFlumeProtocol
   841 Thu Mar 03 11:09:36 PST 2016
org/apache/spark/streaming/flume/sink/SparkFlumeProtocol$Callback.class
  2363 Thu Mar 03 11:09:36 PST 2016
org/apache/spark/streaming/flume/sink/SparkFlumeProtocol.class

On Sun, Mar 20, 2016 at 9:22 AM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> I'm trying to use the Spark Sink with Flume but it seems I'm missing some
> of the dependencies.
> I'm running the following code:
>
> ./bin/spark-shell --master yarn --jars
> /home/impact/flumeStreaming/spark-streaming-flume_2.10-1.6.1.jar,/home/impact/flumeStreaming/flume-ng-core-1.6.0.jar,/home/impact/flumeStreaming/flume-ng-sdk-1.6.0.jar
>
>
> import org.apache.spark.streaming.flume._
>
> import org.apache.spark.streaming._
>
> val ssc = new StreamingContext(sc, Seconds(60))
> val flumeStream = FlumeUtils.createPollingStream(ssc, "impact1", )
>
> flumeStream.print
> ssc.start
>
>
> And I am getting this exception.
>
> 16/03/20 18:17:17 INFO scheduler.ReceiverTracker: Registered receiver for
> stream 0 from impact3.indigo.co.il:51581
> 16/03/20 18:17:17 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
> 4.0 (TID 76, impact3.indigo.co.il): java.lang.NoClassDefFoundError:
> org/apache/spark/streaming/flume/sink/SparkFlumeProtocol$Callback
> at
> org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:84)
>
>
> What deps am I missing ?
>
> Thank you.
> Daniel
>


Flume with Spark Streaming Sink

2016-03-20 Thread Daniel Haviv
Hi,
I'm trying to use the Spark Sink with Flume but it seems I'm missing some
of the dependencies.
I'm running the following code:

./bin/spark-shell --master yarn --jars
/home/impact/flumeStreaming/spark-streaming-flume_2.10-1.6.1.jar,/home/impact/flumeStreaming/flume-ng-core-1.6.0.jar,/home/impact/flumeStreaming/flume-ng-sdk-1.6.0.jar


import org.apache.spark.streaming.flume._

import org.apache.spark.streaming._

val ssc = new StreamingContext(sc, Seconds(60))
val flumeStream = FlumeUtils.createPollingStream(ssc, "impact1", )

flumeStream.print
ssc.start


And I am getting this exception.

16/03/20 18:17:17 INFO scheduler.ReceiverTracker: Registered receiver for
stream 0 from impact3.indigo.co.il:51581
16/03/20 18:17:17 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 4.0
(TID 76, impact3.indigo.co.il): java.lang.NoClassDefFoundError:
org/apache/spark/streaming/flume/sink/SparkFlumeProtocol$Callback
at
org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:84)


What deps am I missing ?

Thank you.
Daniel


Re: Spark 2.0 Shell -csv package weirdness

2016-03-20 Thread Marco Mistroni
Hi
I'll try the same settings as you tomorrow to see if I can reproduce the same issues.
Will report back once done.
Thanks
On 20 Mar 2016 3:50 pm, "Vincent Ohprecio"  wrote:

> Thanks Mich and Marco for your help. I have created a ticket to look into
> it on dev channel.
> Here is the issue https://issues.apache.org/jira/browse/SPARK-14031
>
> On Sun, Mar 20, 2016 at 2:57 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi Vincent,
>>
> I downloaded the CSV file and did the test.
>>
>> Spark version 1.5.2
>>
>> The full code as follows. Minor changes to delete
>> yearAndCancelled.parquet and output.csv files if they are already created
>>
>> //$SPARK_HOME/bin/spark-shell --packages
>> com.databricks:spark-csv_2.11:1.3.0
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at"); sqlContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>> ").collect.foreach(println)
>> def TimeTaken[R](block: => R): R = {
>> val t0 = System.nanoTime()
>> val result = block// call-by-name
>> val t1 = System.nanoTime()
>> println("Elapsed time: " + (t1 - t0) + "ns")
>> result
>> }
>> //
>> // Get a DF first based on Databricks CSV libraries
>> //
>> val df =
>> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
>> "true").option("header", "true").load("/data/stg2/2008.csv.bz2")
>> val df_1 = df.withColumnRenamed("Year","oldYear")
>> val df_2 =
>> df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
>> def convertColumn(df: org.apache.spark.sql.DataFrame, name:String,
>> newType:String) = {
>>   val df_1 = df.withColumnRenamed(name, "swap")
>>   df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
>> }
>> val df_3 = convertColumn(df_2, "ArrDelay", "int")
>> val df_4 = convertColumn(df_2, "DepDelay", "int")
>> // clean up the files in HDFS directory first if exist
>> val hadoopConf = new org.apache.hadoop.conf.Configuration()
>> val hdfs = org.apache.hadoop.fs.FileSystem.get(new
>> java.net.URI("hdfs://rhes564:9000"), hadoopConf)
>> val output1 = "hdfs://rhes564:9000/user/hduser/yearAndCancelled.parquet"
>> try { hdfs.delete(new org.apache.hadoop.fs.Path(output1), true) } catch {
>> case _ : Throwable => { } }
>> val output2 = "hdfs://rhes564:9000/user/hduser/output.csv"
>> try { hdfs.delete(new org.apache.hadoop.fs.Path(output2), true) } catch {
>> case _ : Throwable => { } }
>> // test write to parquet is fast
>> df_4.select("Year",
>> "Cancelled").write.format("parquet").save("yearAndCancelled.parquet")
>> val selectedData = df_4.select("Year", "Cancelled")
>> val howLong =
>> TimeTaken(selectedData.write.format("com.databricks.spark.csv").option("header",
>> "true").save("output.csv"))
>> println ("\nFinished at"); sqlContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>> ").collect.foreach(println)
>> sys.exit()
>>
>>
>> My results are as follows
>>
>> Started at
>> [20/03/2016 10:02:02.02]
>>
>> Elapsed time: 63984582000ns
>> howLong: Unit = ()
>> Finished at
>> [20/03/2016 10:04:59.59]
>>
>> So the whole job finished just under 3 minutes. The elapsed time for
>> saving output.csv took 63 seconds. That CSV file has 7,009,728 rows
>>
>> HTH
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 19 March 2016 at 22:36, Vincent Ohprecio  wrote:
>>
>>> Parquet works super fast but the write to CSV took an hour. Another test
>>> with 1.5 was fast. I'm going to try a few more setups; I'm testing 2.0.
>>>
>>> thanks for your help
>>>
>>> Sent from my iDevice
>>>
>>> On Mar 19, 2016, at 3:30 PM, Mich Talebzadeh 
>>> wrote:
>>>
>>> Hi Vince,
>>>
>>> We had a similar case a while back. I tried two solutions in both Spark
>>> on Hive metastore and Hive on Spark engine.
>>>
>>> Hive version 2
>>> Spark as Hive engine 1.3.1
>>>
>>> Basically
>>>
>>> --1 Move .CSV data into HDFS:
>>> --2 Create an external table (all columns as string)
>>> --3 Create the ORC table (majority Int)
>>> --4 Insert the data from the external table to the Hive ORC table
>>> compressed as zlib
>>>
>>> ORC seems to be in this case a good candidate as a simple insert/select
>>> from external table to ORC takes no time. I bucketed ORC table and marked
>>> it as transactional in case one needs to make a correction to it (not
>>> really needed).
>>>
>>> The whole process was time stamped and it took 5 minutes to complete and
>>> there were 7,009,728 rows in total.
>>>
>>>
>>> +-+--+
>>> |starttime|
>>> +-+--+
>>> | 19/03/2016 22:21:19.19  |
>>> +-+--+
>>>
>>> 

Re: ERROR ArrayBuffer(java.nio.channels.ClosedChannelException

2016-03-20 Thread Surendra , Manchikanti
Hi,

Can you check Kafka topic replication ? And leader information?

Regards,
Surendra M



-- Surendra Manchikanti

On Thu, Mar 17, 2016 at 7:28 PM, Ascot Moss  wrote:

> Hi,
>
> I have a SparkStream (with Kafka) job, after running several days, it
> failed with following errors:
> ERROR DirectKafkaInputDStream:
> ArrayBuffer(java.nio.channels.ClosedChannelException)
>
> Any idea what would be wrong? will it be SparkStreaming buffer overflow
> issue?
>
>
>
> Regards
>
>
>
>
>
>
> *** from the log ***
>
> 16/03/18 09:15:18 INFO VerifiableProperties: Property zookeeper.connect is
> overridden to
>
> 16/03/17 12:13:51 ERROR DirectKafkaInputDStream:
> ArrayBuffer(java.nio.channels.ClosedChannelException)
>
> 16/03/17 12:13:52 INFO SimpleConsumer: Reconnect due to socket error:
> java.nio.channels.ClosedChannelException
>
> 16/03/17 12:13:52 ERROR JobScheduler: Error generating jobs for time
> 1458188031800 ms
>
> org.apache.spark.SparkException:
> ArrayBuffer(java.nio.channels.ClosedChannelException)
>
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
>
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>
> at scala.Option.orElse(Option.scala:257)
>
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>
> at
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
>
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
>
> at scala.util.Try$.apply(Try.scala:161)
>
> at
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
>
> at org.apache.spark.streaming.scheduler.JobGenerator.org
> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
>
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
>
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
>
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> Exception in thread "main" org.apache.spark.SparkException:
> ArrayBuffer(java.nio.channels.ClosedChannelException)
>
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
>
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>
> at
> 

Re: Spark 2.0 Shell -csv package weirdness

2016-03-20 Thread Vincent Ohprecio
Thanks Mich and Marco for your help. I have created a ticket to look into
it on dev channel.
Here is the issue https://issues.apache.org/jira/browse/SPARK-14031

On Sun, Mar 20, 2016 at 2:57 AM, Mich Talebzadeh 
wrote:

> Hi Vincent,
>
> I downloaded the CSV file and did the test.
>
> Spark version 1.5.2
>
> The full code as follows. Minor changes to delete yearAndCancelled.parquet
> and output.csv files if they are already created
>
> //$SPARK_HOME/bin/spark-shell --packages
> com.databricks:spark-csv_2.11:1.3.0
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); sqlContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> def TimeTaken[R](block: => R): R = {
> val t0 = System.nanoTime()
> val result = block// call-by-name
> val t1 = System.nanoTime()
> println("Elapsed time: " + (t1 - t0) + "ns")
> result
> }
> //
> // Get a DF first based on Databricks CSV libraries
> //
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load("/data/stg2/2008.csv.bz2")
> val df_1 = df.withColumnRenamed("Year","oldYear")
> val df_2 =
> df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
> def convertColumn(df: org.apache.spark.sql.DataFrame, name:String,
> newType:String) = {
>   val df_1 = df.withColumnRenamed(name, "swap")
>   df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
> }
> val df_3 = convertColumn(df_2, "ArrDelay", "int")
> val df_4 = convertColumn(df_2, "DepDelay", "int")
> // clean up the files in HDFS directory first if exist
> val hadoopConf = new org.apache.hadoop.conf.Configuration()
> val hdfs = org.apache.hadoop.fs.FileSystem.get(new
> java.net.URI("hdfs://rhes564:9000"), hadoopConf)
> val output1 = "hdfs://rhes564:9000/user/hduser/yearAndCancelled.parquet"
> try { hdfs.delete(new org.apache.hadoop.fs.Path(output1), true) } catch {
> case _ : Throwable => { } }
> val output2 = "hdfs://rhes564:9000/user/hduser/output.csv"
> try { hdfs.delete(new org.apache.hadoop.fs.Path(output2), true) } catch {
> case _ : Throwable => { } }
> // test write to parquet is fast
> df_4.select("Year",
> "Cancelled").write.format("parquet").save("yearAndCancelled.parquet")
> val selectedData = df_4.select("Year", "Cancelled")
> val howLong =
> TimeTaken(selectedData.write.format("com.databricks.spark.csv").option("header",
> "true").save("output.csv"))
> println ("\nFinished at"); sqlContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit()
>
>
> My results are as follows
>
> Started at
> [20/03/2016 10:02:02.02]
>
> Elapsed time: 63984582000ns
> howLong: Unit = ()
> Finished at
> [20/03/2016 10:04:59.59]
>
> So the whole job finished just under 3 minutes. The elapsed time for
> saving output.csv took 63 seconds. That CSV file has 7,009,728 rows
>
> HTH
>
>
>
>
>
>
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 19 March 2016 at 22:36, Vincent Ohprecio  wrote:
>
>> Parquet works super fast but the write to CSV took an hour. Another test
>> with 1.5 was fast. I'm going to try a few more setups; I'm testing 2.0.
>>
>> thanks for your help
>>
>> Sent from my iDevice
>>
>> On Mar 19, 2016, at 3:30 PM, Mich Talebzadeh 
>> wrote:
>>
>> Hi Vince,
>>
>> We had a similar case a while back. I tried two solutions in both Spark
>> on Hive metastore and Hive on Spark engine.
>>
>> Hive version 2
>> Spark as Hive engine 1.3.1
>>
>> Basically
>>
>> --1 Move .CSV data into HDFS:
>> --2 Create an external table (all columns as string)
>> --3 Create the ORC table (majority Int)
>> --4 Insert the data from the external table to the Hive ORC table
>> compressed as zlib
>>
>> ORC seems to be in this case a good candidate as a simple insert/select
>> from external table to ORC takes no time. I bucketed ORC table and marked
>> it as transactional in case one needs to make a correction to it (not
>> really needed).
>>
>> The whole process was time stamped and it took 5 minutes to complete and
>> there were 7,009,728 rows in total.
>>
>>
>> +-+--+
>> |starttime|
>> +-+--+
>> | 19/03/2016 22:21:19.19  |
>> +-+--+
>>
>> +-+--+
>> | endtime |
>> +-+--+
>> | 19/03/2016 22:26:12.12  |
>> +-+--+
>>
>>
>>
>> This is the code. I will try spark code later with parquet
>>
>> select from_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss') AS
>> StartTime;
>> set hive.exec.reducers.max=256;

Re: Limit pyspark.daemon threads

2016-03-20 Thread Ted Yu
I took a look at docs/configuration.md.
Though I didn't find an answer to your first question, I think the following
pertains to your second question:


  spark.python.worker.memory (default: 512m)
  Amount of memory to use per python worker process during aggregation, in the
  same format as JVM memory strings (e.g. 512m, 2g). If the memory used during
  aggregation goes above this amount, it will spill the data into disks.


On Thu, Mar 17, 2016 at 7:43 AM, Carlile, Ken 
wrote:

> Hello,
>
> We have an HPC cluster that we run Spark jobs on using standalone mode and
> a number of scripts I’ve built up to dynamically schedule and start spark
> clusters within the Grid Engine framework. Nodes in the cluster have 16
> cores and 128GB of RAM.
>
> My users use pyspark heavily. We’ve been having a number of problems with
> nodes going offline with extraordinarily high load. I was able to look at
> one of those nodes today before it went truly sideways, and I discovered
> that the user was running 50 pyspark.daemon threads (remember, this is a 16
> core box), and the load was somewhere around 25 or so, with all CPUs maxed
> out at 100%.
>
> So while the spark worker is aware it’s only got 16 cores and behaves
> accordingly, pyspark seems to be happy to overrun everything like crazy. Is
> there a global parameter I can use to limit pyspark threads to a sane
> number, say 15 or 16? It would also be interesting to set a memory limit,
> which leads to another question.
>
> How is memory managed when pyspark is used? I have the spark worker memory
> set to 90GB, and there is 8GB of system overhead (GPFS caching), so if
> pyspark operates outside of the JVM memory pool, that leaves it at most
> 30GB to play with, assuming there is no overhead outside the JVM’s 90GB
> heap (ha ha.)
>
> Thanks,
> Ken Carlile
> Sr. Unix Engineer
> HHMI/Janelia Research Campus
> 571-209-4363
>
>


MLPC model can not be saved

2016-03-20 Thread HanPan
Hi Guys,

 

 I built an ML pipeline that includes a multilayer perceptron
classifier. I got the following error message when I tried to save the
pipeline model. It seems the MLPC model cannot be saved, which means I have
no way to save the trained model. Is there any way to save the model so that I
can use it for future prediction?
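A rough workaround sketch until MultilayerPerceptronClassificationModel implements
Writable: fall back to plain Java serialization of the fitted PipelineModel via
saveAsObjectFile. This is not portable across Spark versions, and the path and
variable names below are hypothetical, but it is usually enough to reload the model
for prediction later.

import org.apache.spark.ml.PipelineModel

val modelPath = "hdfs:///models/mlpc-pipeline"   // hypothetical path

// Save: serialize the fitted pipeline as a one-element RDD.
sc.parallelize(Seq(pipelineModel), numSlices = 1).saveAsObjectFile(modelPath)

// Load it back later and reuse it for prediction.
val restored = sc.objectFile[PipelineModel](modelPath).first()
val predictions = restored.transform(testData)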

 

 Exception in thread "main" java.lang.UnsupportedOperationException:
Pipeline write will fail on this Pipeline because it contains a stage which
does not implement Writable. Non-Writable stage: mlpc_2d8b74f6da60 of type
class
org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel

 at
org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$validateStages$1.apply
(Pipeline.scala:218)

 at
org.apache.spark.ml.Pipeline$SharedReadWrite$$anonfun$validateStages$1.apply
(Pipeline.scala:215)

 at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala
:33)

 at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

 at
org.apache.spark.ml.Pipeline$SharedReadWrite$.validateStages(Pipeline.scala:
215)

 at
org.apache.spark.ml.PipelineModel$PipelineModelWriter.<init>(Pipeline.scala:
325)

 at org.apache.spark.ml.PipelineModel.write(Pipeline.scala:309)

 at
org.apache.spark.ml.util.MLWritable$class.save(ReadWrite.scala:130)

 at org.apache.spark.ml.PipelineModel.save(Pipeline.scala:280)

 at
cn.thinkingdata.nlp.spamclassifier.FFNNSpamClassifierPipeLine.main(FFNNSpamC
lassifierPipeLine.java:76)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62
)

 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl
.java:43)

 at java.lang.reflect.Method.invoke(Method.java:497)

 at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$ru
nMain(SparkSubmit.scala:731)

 at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)

 at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)

 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)

 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 

Thanks

Pan



Re: Stanford CoreNLP sentiment extraction: lost executor

2016-03-20 Thread tundo
*SOLVED:*
Unfortunately, stderr log in Hadoop's Resource Manager UI was not useful
since it just reported "... Lost executor XX on workerYYY...". Therefore, I
dumped locally the whole app-related logs: /yarn logs -applicationId
application_1458320004153_0343   > ~/application_1458320004153_0343.txt/.
That log is split by container, and I noticed that each "workerYYY" had a
WARN and a sudden crash after a bit.

Some "Untokenizable" chars in the /email.body/ made CoreNLP throw a WARN at
first and then crash/loose the executor after a few seconds. This happened
systematically and I think it is a bug in CoreNLP 3.4.1. 
The solution was just sanitise the input fed to the Annotation constructor.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Stanford-CoreNLP-sentiment-extraction-lost-executor-tp26536p26541.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark-submit reset JVM

2016-03-20 Thread Ted Yu
Not that I know of.

Can you be a little more specific on which JVM(s) you want restarted
(assuming spark-submit is used to start a second job) ?

Thanks

On Sun, Mar 20, 2016 at 6:20 AM, Udo Fholl  wrote:

> Hi all,
>
> Is there a way for spark-submit to restart the JVM in the worker machines?
>
> Thanks.
>
> Udo.
>


ClassNotFoundException in RDD.map

2016-03-20 Thread Dirceu Semighini Filho
Hello,
I found a strange behavior after executing a prediction with MLlib.
My code returns an RDD[(Any, Double)] where Any is the id of my dataset,
which is a BigDecimal, and Double is the prediction for that line.
When I run
myRdd.take(10) it returns ok
res16: Array[_ >: (Double, Double) <: (Any, Double)] =
Array((1921821857196754403.00,0.1690292052496703),
(454575632374427.00,0.16902820241892452),
(989198096568001939.00,0.16903432789699502),
(14284129652106187990.00,0.16903517653451386),
(17980228074225252497.00,0.16903151028332508),
(3861345958263692781.00,0.16903056986183976),
(17558198701997383205.00,0.1690295450319745),
(10651576092054552310.00,0.1690286445174418),
(4534494349035056215.00,0.16903303401862327),
(5551671513234217935.00,0.16902303368995966))
But when I try to run some map on it:
myRdd.map(_._1).take(10)
It throws a ClassNotFoundException:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 72.0 failed 4 times, most recent failure: Lost task 0.3 in stage
72.0 (TID 1774, 172.31.23.208): java.lang.ClassNotFoundException:
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:278)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at 

spark-submit reset JVM

2016-03-20 Thread Udo Fholl
Hi all,

Is there a way for spark-submit to restart the JVM in the worker machines?

Thanks.

Udo.


Re: Can't zip RDDs with unequal numbers of partitions

2016-03-20 Thread Jakob Odersky
Can you share a snippet that reproduces the error? What was
spark.sql.autoBroadcastJoinThreshold before your last change?

On Thu, Mar 17, 2016 at 10:03 AM, Jiří Syrový  wrote:
> Hi,
>
> any idea what could be causing this issue? It started appearing after
> changing parameter
>
> spark.sql.autoBroadcastJoinThreshold to 10
>
>
> Caused by: java.lang.IllegalArgumentException: Can't zip RDDs with unequal
> numbers of partitions
> at
> org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at
> org.apache.spark.rdd.PartitionCoalescer.(CoalescedRDD.scala:172)
> at
> org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:85)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
> at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
> at
> org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:220)
> at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254)
> at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248)
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
> ... 28 more
>




Re: Subquery performance

2016-03-20 Thread Michael Armbrust
If you encode the data in something like Parquet, we usually have more
information and will try to broadcast.
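
For reference, a broadcast can also be requested explicitly without caching, assuming a Spark 1.5+ build where org.apache.spark.sql.functions.broadcast is available; a minimal sketch against the shell's sqlContext (table and column names are illustrative, mirroring the query below):

import org.apache.spark.sql.functions.broadcast
import sqlContext.implicits._

// Mark the aggregated subquery as broadcastable so the planner picks a
// broadcast join instead of shuffling both sides.
val sub = sqlContext.table("tab2").groupBy("col2").count()
val joined = sqlContext.table("tab1")
  .join(broadcast(sub), $"col1" === $"col2")
  .groupBy("col1").count()

// Compare the physical plans with and without the hint.
joined.explain()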

On Thu, Mar 17, 2016 at 7:27 PM, Younes Naguib <
younes.nag...@tritondigital.com> wrote:

> Any way to cache the subquery or force a broadcast join without persisting
> it?
>
>
>
> y
>
>
>
> *From:* Michael Armbrust [mailto:mich...@databricks.com]
> *Sent:* March-17-16 8:59 PM
> *To:* Younes Naguib
> *Cc:* user@spark.apache.org
> *Subject:* Re: Subquery performance
>
>
>
> Try running EXPLAIN on both versions of the query.
>
>
>
> Likely when you cache the subquery we know that it's going to be small, so
> we use a broadcast join instead of shuffling the data.
>
>
>
> On Thu, Mar 17, 2016 at 5:53 PM, Younes Naguib <
> younes.nag...@tritondigital.com> wrote:
>
> Hi all,
>
>
>
> I’m running a query that looks like the following:
>
> Select col1, count(1)
>
> From (Select col2, count(1) from tab2 group by col2)
>
> Inner join tab1 on (col1=col2)
>
> Group by col1
>
>
>
> This creates a very large shuffle, 10 times the data size, as if the
> subquery was executed for each row.
>
> Can anything be done to help tune this?
>
> When the subquery is persisted, it runs much faster, and the shuffle is 50
> times smaller!
>
>
>
> *Thanks,*
>
> *Younes*
>
>
>


[no subject]

2016-03-20 Thread Vinay Varma



Re: PySpark Issue: "org.apache.spark.shuffle.FetchFailedException: Failed to connect to..."

2016-03-20 Thread craigiggy
Also, this is the command I use to submit the Spark application:

**

where *recommendation_engine-0.1-py2.7.egg* is a Python egg of a library
I've written for this application, and *'file'* and
*'/home/spark/enigma_analytics/tests/msg-epims0730_small.json'* are input
arguments for the application.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Issue-org-apache-spark-shuffle-FetchFailedException-Failed-to-connect-to-tp26511p26532.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark 2.0 Shell -csv package weirdness

2016-03-20 Thread Mich Talebzadeh
Hi Vincent,

I downloaded the CSV file and did the test.

Spark version 1.5.2

The full code is as follows, with minor changes to delete the yearAndCancelled.parquet
and output.csv files if they already exist.

//$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
def TimeTaken[R](block: => R): R = {
val t0 = System.nanoTime()
val result = block // call-by-name
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")
result
}
//
// Get a DF first based on Databricks CSV libraries
//
val df =
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header", "true").load("/data/stg2/2008.csv.bz2")
val df_1 = df.withColumnRenamed("Year","oldYear")
val df_2 =
df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
def convertColumn(df: org.apache.spark.sql.DataFrame, name:String,
newType:String) = {
  val df_1 = df.withColumnRenamed(name, "swap")
  df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
}
val df_3 = convertColumn(df_2, "ArrDelay", "int")
val df_4 = convertColumn(df_3, "DepDelay", "int")  // chain from df_3 so both the ArrDelay and DepDelay casts are kept
// clean up the files in HDFS directory first if exist
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new
java.net.URI("hdfs://rhes564:9000"), hadoopConf)
val output1 = "hdfs://rhes564:9000/user/hduser/yearAndCancelled.parquet"
try { hdfs.delete(new org.apache.hadoop.fs.Path(output1), true) } catch {
case _ : Throwable => { } }
val output2 = "hdfs://rhes564:9000/user/hduser/output.csv"
try { hdfs.delete(new org.apache.hadoop.fs.Path(output2), true) } catch {
case _ : Throwable => { } }
// test write to parquet is fast
df_4.select("Year",
"Cancelled").write.format("parquet").save("yearAndCancelled.parquet")
val selectedData = df_4.select("Year", "Cancelled")
val howLong =
TimeTaken(selectedData.write.format("com.databricks.spark.csv").option("header",
"true").save("output.csv"))
println("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
sys.exit()


My results are as follows

Started at
[20/03/2016 10:02:02.02]

Elapsed time: 63984582000ns
howLong: Unit = ()
Finished at
[20/03/2016 10:04:59.59]

So the whole job finished in just under 3 minutes. The elapsed time for
saving output.csv was roughly 64 seconds (63,984,582,000 ns). That CSV file has 7,009,728 rows.

HTH

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

On 19 March 2016 at 22:36, Vincent Ohprecio  wrote:

> Parquet works super fast, but writing to CSV took an hour. Another user tested
> with 1.5 and it was fast. I'm going to try a few more setups; I'm
> testing 2.0.
>
> thanks for your help
>
> Sent from my iDevice
>
> On Mar 19, 2016, at 3:30 PM, Mich Talebzadeh 
> wrote:
>
> Hi Vince,
>
> We had a similar case a while back. I tried two solutions: Spark on the
> Hive metastore, and Hive on the Spark engine.
>
> Hive version 2
> Spark as Hive engine 1.3.1
>
> Basically
>
> --1 Move .CSV data into HDFS:
> --2 Create an external table (all columns as string)
> --3 Create the ORC table (majority Int)
> --4 Insert the data from the external table to the Hive ORC table
> compressed as zlib
>
> ORC seems to be a good candidate in this case, as a simple insert/select
> from the external table to ORC takes no time. I bucketed the ORC table and
> marked it as transactional in case one needs to make a correction to it (not
> really needed).
>
> The whole process was timestamped; it took 5 minutes to complete, and
> there were 7,009,728 rows in total.
>
>
> +-------------------------+
> | starttime               |
> +-------------------------+
> | 19/03/2016 22:21:19.19  |
> +-------------------------+
>
> +-------------------------+
> | endtime                 |
> +-------------------------+
> | 19/03/2016 22:26:12.12  |
> +-------------------------+
>
>
>
> This is the code. I will try spark code later with parquet
>
> select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS StartTime;
> set hive.exec.reducers.max=256;
> use test;
> --set hive.execution.engine=mr;
> --2)
> DROP TABLE IF EXISTS stg_t2;
> CREATE EXTERNAL TABLE stg_t2 (
>    Year              string
> ,  Month             string
> ,  DayofMonth        string
> ,  DayOfWeek         string
> ,  DepTime           string
> ,  CRSDepTime        string
> ,  ArrTime           string
> ,  CRSArrTime        string
> ,  UniqueCarrier     string
> ,  FlightNum         string
> ,  TailNum           string
> ,  ActualElapsedTime string
> ,  CRSElapsedTime    string
> ,

Re: spark launching range is 10 mins

2016-03-20 Thread Enrico Rotundo
You might want to try assigning more cores to the driver.

Sent from my iPhone

> On 20 Mar 2016, at 07:34, Jialin Liu  wrote:
> 
> Hi,
> I have set the partitions to 6000 and requested 100 nodes, with 32
> cores per node,
> and the number of executors is 32 per node.
> 
> spark-submit --master $SPARKURL --executor-cores 32 --driver-memory
> 20G --executor-memory 80G single-file-test.py
> 
> 
> And I'm reading a 2.2 TB file; the code just has two simple steps:
> rdd=sc.read
> rdd.count
> Then I checked the log file and the history server; they show that the
> count stage has a really large task launch range, e.g.,
> 
> 16/03/19 22:40:17
> 16/03/19 22:30:56
> 
> which is about 10 minutes.
> Has anyone experienced this before?
> Could you please let me know the reason and the Spark internals relating
> to this issue,
> and how to resolve it? Thanks much.
> 
> Best,
> Jialin
> 
> 




Re: ClassNotFoundException in RDD.map

2016-03-20 Thread Jakob Odersky
The error is very strange indeed; however, without code that reproduces
it, we can't really provide much help beyond speculation.

One thing that stood out to me immediately is that you say you have an
RDD of Any where every Any should be a BigDecimal, so why not specify
that type information?
When using Any, a whole class of errors that the typechecker could
normally catch can slip through.
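
A minimal sketch of what keeping the type information could look like (illustrative only, using values taken from the take(10) output quoted below rather than the original code):

import org.apache.spark.rdd.RDD

// Keep the id typed as BigDecimal end-to-end instead of widening to Any, so a
// type mismatch surfaces at compile time rather than inside a closure at runtime.
val scored: RDD[(BigDecimal, Double)] = sc.parallelize(Seq(
  (BigDecimal("1921821857196754403.00"), 0.1690292052496703),
  (BigDecimal("454575632374427.00"),     0.16902820241892452)
))

// The same kind of map that failed in the thread, now fully typed.
scored.map(_._1).take(2).foreach(println)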

On Thu, Mar 17, 2016 at 10:25 AM, Dirceu Semighini Filho
 wrote:
> Hi Ted, thanks for answering.
> The map is just that; whenever I try anything inside the map it throws this
> ClassNotFoundException, even if I do map(f => f) it throws the exception.
> What is bothering me is that when I do a take or a first it returns the
> result, which makes me conclude that the previous code isn't wrong.
>
> Kind Regards,
> Dirceu
>
>
> 2016-03-17 12:50 GMT-03:00 Ted Yu :
>>
>> bq. $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>>
>> Do you mind showing more of your code involving the map() ?
>>
>> On Thu, Mar 17, 2016 at 8:32 AM, Dirceu Semighini Filho
>>  wrote:
>>>
>>> Hello,
>>> I found a strange behavior after executing a prediction with MLlib.
>>> My code returns an RDD[(Any, Double)] where Any is the id of my dataset,
>>> which is BigDecimal, and Double is the prediction for that line.
>>> When I run
>>> myRdd.take(10) it returns ok
>>> res16: Array[_ >: (Double, Double) <: (Any, Double)] =
>>> Array((1921821857196754403.00,0.1690292052496703),
>>> (454575632374427.00,0.16902820241892452),
>>> (989198096568001939.00,0.16903432789699502),
>>> (14284129652106187990.00,0.16903517653451386),
>>> (17980228074225252497.00,0.16903151028332508),
>>> (3861345958263692781.00,0.16903056986183976),
>>> (17558198701997383205.00,0.1690295450319745),
>>> (10651576092054552310.00,0.1690286445174418),
>>> (4534494349035056215.00,0.16903303401862327),
>>> (5551671513234217935.00,0.16902303368995966))
>>> But when I try to run some map on it:
>>> myRdd.map(_._1).take(10)
>>> It throws a ClassNotFoundException:
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
>>> in stage 72.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>>> 72.0 (TID 1774, 172.31.23.208): java.lang.ClassNotFoundException:
>>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>> at java.lang.Class.forName0(Native Method)
>>> at java.lang.Class.forName(Class.java:278)
>>> at
>>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
>>> at
>>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>> at
>>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
>>> at
>>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> 

Re: best way to do deep learning on spark ?

2016-03-20 Thread James Hammerton
In the meantime there is also deeplearning4j which integrates with Spark
(for both Java and Scala): http://deeplearning4j.org/

Regards,

James

On 17 March 2016 at 02:32, Ulanov, Alexander 
wrote:

> Hi Charles,
>
>
>
> There is an implementation of multilayer perceptron in Spark (since 1.5):
>
>
> https://spark.apache.org/docs/latest/ml-classification-regression.html#multilayer-perceptron-classifier
>
>
>
> Other features such as autoencoder, convolutional layers, etc. are
> currently under development. Please refer to
> https://issues.apache.org/jira/browse/SPARK-5575
>
>
>
> Best regards, Alexander
>
>
>
> *From:* charles li [mailto:charles.up...@gmail.com]
> *Sent:* Wednesday, March 16, 2016 7:01 PM
> *To:* user 
> *Subject:* best way to do deep learning on spark ?
>
>
>
>
>
> Hi guys, I'm new to MLlib on Spark. After reading the documentation, it seems
> that MLlib does not support deep learning. I want to know: is there any way
> to implement deep learning on Spark?
>
>
>
> *Do I have to use a third-party package like Caffe or TensorFlow?*
>
>
>
> or
>
>
>
> *Is a deep learning module on the MLlib development plan?*
>
>
>
>
> great thanks
>
>
>
> --
>
> *--*
>
> a spark lover, a quant, a developer and a good man.
>
>
>
> http://github.com/litaotao
>
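
As a pointer, a minimal sketch of the multilayer perceptron classifier referenced above, assuming Spark 1.6 and the sample LibSVM file that ships under data/mllib (path, layer sizes and seed are illustrative only):

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Load the bundled multiclass sample and split it for training/evaluation.
val data = sqlContext.read.format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")
val Array(train, test) = data.randomSplit(Array(0.6, 0.4), seed = 1234L)

// 4 input features, two hidden layers of 5 and 4 units, 3 output classes.
val trainer = new MultilayerPerceptronClassifier()
  .setLayers(Array(4, 5, 4, 3))
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100)

val model = trainer.fit(train)
val predictions = model.transform(test)
val evaluator = new MulticlassClassificationEvaluator().setMetricName("precision")
println("Accuracy: " + evaluator.evaluate(predictions.select("prediction", "label")))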


Re: df.dtypes -> pyspark.sql.types

2016-03-20 Thread Reynold Xin
We probably should have the alias. Is this still a problem on the master
branch?

On Wed, Mar 16, 2016 at 9:40 AM, Ruslan Dautkhanov 
wrote:

> Running the following:
>
> # fix schema for gaid which should not be Double
>> from pyspark.sql.types import *
>> customSchema = StructType()
>> for (col, typ) in tsp_orig.dtypes:
>>     if col == 'Agility_GAID':
>>         typ = 'string'
>>     customSchema.add(col, typ, True)
>
>
> Getting
>
>   ValueError: Could not parse datatype: bigint
>
>
> Looks like pyspark.sql.types doesn't know anything about bigint..
> Should it be aliased to LongType in pyspark.sql.types?
>
> Thanks
>
>
> On Wed, Mar 16, 2016 at 10:18 AM, Ruslan Dautkhanov 
> wrote:
>
>> Hello,
>>
>> Looking at
>>
>> https://spark.apache.org/docs/1.5.1/api/python/_modules/pyspark/sql/types.html
>>
>> and can't wrap my head around how to convert string data type names to
>> actual
>> pyspark.sql.types data types?
>>
>> Does pyspark.sql.types have an interface to return StringType() for
>> "string",
>> IntegerType() for "integer", etc.? If it doesn't exist, it would be great to
>> have such a
>> mapping function.
>>
>> Thank you.
>>
>>
>> ps. I have a data frame, and use its dtypes to loop through all columns
>> to fix a few
>> columns' data types as a workaround for SPARK-13866.
>>
>>
>> --
>> Ruslan Dautkhanov
>>
>
>


Restarting an executor during execution causes it to lose AWS credentials (anyone seen this?)

2016-03-20 Thread Allen George
Hi guys,

I'm having a problem where respawning a failed executor during a job that
reads/writes parquet on S3 causes subsequent tasks to fail because of
missing AWS keys.

Setup:

I'm using Spark 1.5.2 with Hadoop 2.7 and running experiments on a simple
standalone cluster:

1 master
2 workers

My application is co-located on the master machine, while the two workers
are on two other machines (one worker per machine). All machines are
running in EC2. I've configured my setup so that my application executes
its task on two executors (one executor per worker).

Application:

My application reads and writes parquet files on S3. I set the AWS keys on
the SparkContext by doing:

val sc = new SparkContext()
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3n.awsAccessKeyId", "SOME_KEY")
hadoopConf.set("fs.s3n.awsSecretAccessKey", "SOME_SECRET")

At this point I'm done, and I go ahead and use "sc".

Issue:

I can read and write parquet files without a problem with this setup. *BUT*
if an executor dies during a job and is respawned by a worker, tasks fail
with the following error:

"Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and
Secret Access Key must be specified as the username or password
(respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or
fs.s3n.awsSecretAccessKey properties (respectively)."

I've tried adding the AWS keys to core-site.xml, placing it in
"/etc/hadoop-conf", and setting HADOOP_CONF_DIR in spark-env.sh on the
master/worker machines, but that doesn't seem to help. I tried setting
AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in the worker environment, but
that didn't work either. It seems that somehow the AWS keys aren't being
picked up by a newly spawned executor. Has anyone seen this before? Is there a
problem with my configuration that's causing this?
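
One workaround that is sometimes suggested for this kind of setup (untested here, so treat it as a sketch) is to pass the keys as spark.hadoop.* properties on the SparkConf, since Spark copies those entries into the Hadoop configuration it builds for each executor, rather than setting them only on the driver-side sc.hadoopConfiguration:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: same placeholder keys as above.
val conf = new SparkConf()
  .setAppName("parquet-on-s3")
  .set("spark.hadoop.fs.s3n.awsAccessKeyId", "SOME_KEY")
  .set("spark.hadoop.fs.s3n.awsSecretAccessKey", "SOME_SECRET")
val sc = new SparkContext(conf)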

Thanks!
Allen

Terminal Musings: http://www.allengeorge.com/
Raft in Java: https://github.com/allengeorge/libraft/
Twitter: https://twitter.com/allenageorge/


Re: spark launching range is 10 mins

2016-03-20 Thread Jialin Liu
Hi,
I have set the partitions to 6000 and requested 100 nodes, with 32
cores per node,
and the number of executors is 32 per node.

spark-submit --master $SPARKURL --executor-cores 32 --driver-memory
20G --executor-memory 80G single-file-test.py


And I'm reading a 2.2 TB file; the code just has two simple steps:
rdd=sc.read
rdd.count
Then I checked the log file and the history server; they show that the
count stage has a really large task launch range, e.g.,

16/03/19 22:40:17
16/03/19 22:30:56

which is about 10 minutes.
Has anyone experienced this before?
Could you please let me know the reason and the Spark internals relating
to this issue,
and how to resolve it? Thanks much.

Best,
Jialin




bug spark should not use java.sql.timestamp was: sql timestamp timezone bug

2016-03-20 Thread Andy Davidson
Here is a nice analysis of the issue from the Cassandra mailing list. (DataStax
is the Databricks for Cassandra.)

Should I file a bug?

Kind regards

Andy

http://stackoverflow.com/questions/2305973/java-util-date-vs-java-sql-date
and this one 

On Fri, Mar 18, 2016 at 11:35 AM Russell Spitzer 
wrote:
> Unfortunately this is part of Spark SQL. They have based their type on
> java.sql.Timestamp (and Date), which adjusts to the client timezone when
> displaying and storing.
> See discussion:
> http://stackoverflow.com/questions/9202857/timezones-in-sql-date-vs-java-sql-date
> and code:
> https://github.com/apache/spark/blob/bb3b3627ac3fcd18be7fb07b6d0ba5eae0342fc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala#L81-L93
> 
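
A small, self-contained illustration of the point quoted above; the epoch value is chosen to match the 2016-03-12 00:30:00 UTC boundary used in the queries below:

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.TimeZone

// A java.sql.Timestamp only holds an instant (epoch millis); how it reads as a
// string depends entirely on the time zone used to render or parse it.
val ts = new Timestamp(1457742600000L) // 2016-03-12 00:30:00 UTC

val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
println(fmt.format(ts)) // 2016-03-12 00:30:00

fmt.setTimeZone(TimeZone.getTimeZone("America/Los_Angeles"))
println(fmt.format(ts)) // 2016-03-11 16:30:00, the PST value that makes the counts line up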

From:  Andrew Davidson 
Date:  Thursday, March 17, 2016 at 3:25 PM
To:  Andrew Davidson , "user @spark"

Subject:  Re: sql timestamp timezone bug

> 
> For completeness: clearly Spark SQL returned a different data set.
> 
> In [4]:
> rawDF.selectExpr("count(row_key) as num_samples",
> "sum(count) as total",
> "max(count) as max ").show()
> +-----------+-----+---+
> |num_samples|total|max|
> +-----------+-----+---+
> |       2037| 3867| 67|
> +-----------+-----+---+
> 
> 
> From:  Andrew Davidson 
> Date:  Thursday, March 17, 2016 at 3:02 PM
> To:  "user @spark" 
> Subject:  sql timestamp timezone bug
> 
>> I am using pyspark 1.6.0 and
>> datastax:spark-cassandra-connector:1.6.0-M1-s_2.10 to analyze time series
>> data
>> 
>> The data is originally captured by a spark streaming app and written to
>> Cassandra. The value of the timestamp comes from
>> 
>> Rdd.foreachRDD(new VoidFunction2()
>> …});
>> 
>> I am confident the time stamp is stored correctly in cassandra and that
>> the clocks on the machines in my cluster are set correctly
>> 
>> I noticed that if I used Cassandra CQLSH to select a data set between two
>> points in time, the row count did not match the row count I got when I did
>> the same select in Spark using SQL. It appears that Spark SQL assumes all
>> timestamp strings are in the local time zone.
>> 
>> 
>> Here is what I expect. (this is what is returned by CQLSH)
>> cqlsh> select
>>... count(row_key) as num_samples, sum(count) as total, max(count)
>> as max
>>... from
>>... notification.json_timeseries
>>... where
>>... row_key in ('red', 'blue')
>>... and created > '2016-03-12 00:30:00+'
>>... and created <= '2016-03-12 04:30:00+'
>>... allow filtering;
>> 
>>  num_samples | total | max
>> -------------+-------+-----
>>         3242 | 11277 |  17
>> 
>> 
>> Here is my pyspark select statement. Notice the 'created' column encodes
>> the timezone. I am running this on my local Mac (in the PST timezone) and
>> connecting to my data center (which runs on UTC) over a VPN.
>> 
>> rawDF = sqlContext.read\
>> .format("org.apache.spark.sql.cassandra")\
>> .options(table="json_timeseries", keyspace="notification")\
>> .load() 
>> 
>> 
>> rawDF.registerTempTable(tmpTableName)
>> 
>> 
>> 
>> stmnt = "select \
>> row_key, created, count, unix_timestamp(created) as unixTimeStamp, \
>> unix_timestamp(created, 'yyyy-MM-dd HH:mm:ss.z') as hack, \
>> to_utc_timestamp(created, 'gmt') as gmt \
>> from \
>> rawTable \
>> where \
>> (created > '{0}') and (created <= '{1}') \
>> and \
>> (row_key = 'red' or row_key = 'blue') \
>> )".format('2016-03-12 00:30:00+', '2016-03-12 04:30:00+')
>> 
>> rawDF = sqlCtx.sql(stmnt).cache()
>> 
>> 
>> 
>> 
>> I get different values for row count, max, …
>> 
>> If I convert the UTC timestamp string to my local timezone, the row count
>> matches the count returned by cqlsh.
>> 
>> # pst works, matches cassandra cqlsh
>> # .format('2016-03-11 16:30:00+', '2016-03-11 20:30:00+')
>> 
>> Am I doing something wrong in my pyspark code?
>> 
>> 
>> Kind regards
>> 
>> Andy
>> 
>> 
>> 
>> 
>>