Spark SQL 1.6.1 issue

2016-08-17 Thread thbeh
Running the query below, I have been hitting a "local class incompatible"
exception. Does anyone know the cause?

val rdd = csc.cassandraSql("""
    select *, concat('Q', d_qoy) as qoy
    from store_sales
    join date_dim on ss_sold_date_sk = d_date_sk
    join item on ss_item_sk = i_item_sk""")
  .groupBy("i_category")
  .pivot("qoy")
  .agg(round(sum("ss_sales_price") / 100, 2))

The source data is from TPCDS test data and I am running in Zeppelin.


INFO [2016-08-18 03:15:58,429] ({task-result-getter-2}
Logging.scala[logInfo]:58) - Lost task 3.0 in stage 3.0 (TID 52) on executor
ceph5.example.my: java.io.InvalidClassException
(org.apache.spark.sql.catalyst.expressions.Literal; local class
incompatible: stream classdesc serialVersionUID = 3305180847846277455, local
class serialVersionUID = -4259705229845269663) [duplicate 1]
 INFO [2016-08-18 03:15:58,429] ({task-result-getter-3}
Logging.scala[logInfo]:58) - Lost task 2.0 in stage 3.0 (TID 51) on executor
ceph5.example.my: java.io.InvalidClassException
(org.apache.spark.sql.catalyst.expressions.Literal; local class
incompatible: stream classdesc serialVersionUID = 3305180847846277455, local
class serialVersionUID = -4259705229845269663) [duplicate 2]
 INFO [2016-08-18 03:15:58,430] ({task-result-getter-3}
Logging.scala[logInfo]:58) - Lost task 6.0 in stage 3.0 (TID 55) on executor
ceph5.example.my: java.io.InvalidClassException
(org.apache.spark.sql.catalyst.expressions.Literal; local class
incompatible: stream classdesc serialVersionUID = 3305180847846277455, local
class serialVersionUID = -4259705229845269663) [duplicate 3]

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-6-1-issue-tp27554.html



Re: Getting a TreeNode Exception while saving into Hadoop

2016-08-17 Thread Divya Gehlot
Can you please check the column order of all the datasets in the unionAll
operations.

Are they in the same order?
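
For what it's worth, a minimal sketch of what "same order" means in practice
(column names are made up, and df1/df2/df3 stand in for the DataFrames being
unioned): unionAll in Spark 1.6 resolves columns by position, so selecting the
columns explicitly before the union keeps them aligned.

import org.apache.spark.sql.functions.col

// df1, df2, df3 are assumed to share the same columns, possibly in different orders.
val cols = Seq("mId", "name", "city")   // illustrative column names
val aligned = Seq(df1, df2, df3).map(_.select(cols.map(col): _*))
val unioned = aligned.reduce((a, b) => a.unionAll(b))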

On 9 August 2016 at 02:47, max square  wrote:

> Hey guys,
>
> I'm trying to save Dataframe in CSV format after performing unionAll
> operations on it.
> But I get this exception -
>
> Exception in thread "main" 
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
> execute, tree:
> TungstenExchange hashpartitioning(mId#430,200)
>
> I'm saving it by
>
> df.write.format("com.databricks.spark.csv").options(Map("mode" ->
> "DROPMALFORMED", "delimiter" -> "\t", "header" -> "true")).save(bakDir +
> latest)
>
> It works perfectly if I don't do the unionAll operation.
> I see that the format isn't different by printing the part of the results.
>
> Any help regarding this would be appreciated.
>
>


RE: pyspark.sql.functions.last not working as expected

2016-08-17 Thread Alexander Peletz
So here is the test case from the commit adding the first/last methods here: 
https://github.com/apache/spark/pull/10957/commits/defcc02a8885e884d5140b11705b764a51753162



+  test("last/first with ignoreNulls") {


+val nullStr: String = null


+val df = Seq(


+  ("a", 0, nullStr),


+  ("a", 1, "x"),


+  ("a", 2, "y"),


+  ("a", 3, "z"),


+  ("a", 4, nullStr),


+  ("b", 1, nullStr),


+  ("b", 2, nullStr)).


+  toDF("key", "order", "value")


+val window = Window.partitionBy($"key").orderBy($"order")


+checkAnswer(


+  df.select(


+$"key",


+$"order",


+first($"value").over(window),


+first($"value", ignoreNulls = false).over(window),


+first($"value", ignoreNulls = true).over(window),


+last($"value").over(window),


+last($"value", ignoreNulls = false).over(window),


+last($"value", ignoreNulls = true).over(window)),


+  Seq(


+Row("a", 0, null, null, null, null, null, null),


+Row("a", 1, null, null, "x", "x", "x", "x"),


+Row("a", 2, null, null, "x", "y", "y", "y"),


+Row("a", 3, null, null, "x", "z", "z", "z"),


+Row("a", 4, null, null, "x", null, null, "z"),


+Row("b", 1, null, null, null, null, null, null),


+Row("b", 2, null, null, null, null, null, null)))


+  }



I would expect the correct results to be as follows instead of what is used
above. Shouldn't we always return the first or last value in the partition
based on the ordering? It looks like something else is going on... can someone
explain?

      Seq(
        Row("a", 0, null, null, "x", null, null, "z"),
        Row("a", 1, null, null, "x", null, null, "z"),
        Row("a", 2, null, null, "x", null, null, "z"),
        Row("a", 3, null, null, "x", null, null, "z"),
        Row("a", 4, null, null, "x", null, null, "z"),
        Row("b", 1, null, null, null, null, null, null),
        Row("b", 2, null, null, null, null, null, null)))



From: Alexander Peletz [mailto:alexand...@slalom.com]
Sent: Wednesday, August 17, 2016 11:57 AM
To: user 
Subject: pyspark.sql.functions.last not working as expected

Hi,

I am using Spark 2.0 and I am getting unexpected results using the last()
method. Has anyone else experienced this? I get the sense that last() is
working correctly within a given data partition but not across the entire RDD.
first() seems to work as expected, so I can work around this by using a window
in reverse order and calling first() instead of last(), but it would be great
if last() actually worked.


Thanks,
Alexander


Alexander Peletz
Consultant

slalom

Fortune 100 Best Companies to Work For 2016
Glassdoor Best Places to Work 2016
Consulting Magazine Best Firms to Work For 2015

316 Stuart Street, Suite 300
Boston, MA 02116
706.614.5033 cell | 617.316.5400 office
alexand...@slalom.com



Re: How to combine two DStreams(pyspark)?

2016-08-17 Thread ayan guha
Wondering why you are creating separate DStreams? You should apply the
logic directly on the input DStream.
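
For illustration, a minimal sketch of that idea in Scala (the PySpark API is
analogous); the queueStream below is only a stand-in for the Kafka direct
stream, and the per-value computation is made up:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

val conf = new SparkConf().setAppName("combine-sketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// Stand-in for the Kafka direct stream of (key, value) pairs.
val queue = mutable.Queue(ssc.sparkContext.parallelize(Seq(("k1", "a b"), ("k2", "c d"))))
val kvs = ssc.queueStream(queue)

// Transform the pairs in place so each result stays attached to its key,
// instead of splitting the stream into separate key and value DStreams.
val result = kvs.map { case (key, value) =>
  val doubled = value.split(" ").flatMap(w => Seq(w, w))   // example per-value computation
  (key, doubled.mkString(" "))
}
result.print()

ssc.start()
ssc.awaitTerminationOrTimeout(5000)
ssc.stop()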
On 18 Aug 2016 06:40, "vidhan"  wrote:

> I have a Kafka stream coming in with some input topic.
> This is the code I wrote for accepting the Kafka stream.
>
> >>> conf = SparkConf().setAppName(appname)
> >>> sc = SparkContext(conf=conf)
> >>> ssc = StreamingContext(sc)
> >>> kvs = KafkaUtils.createDirectStream(ssc, topics,\
> {"metadata.broker.list": brokers})
>
> Then I create two DStreams of the keys and values of the original stream.
>
> >>> keys = kvs.map(lambda x: x[0].split(" "))
> >>> values = kvs.map(lambda x: x[1].split(" "))
>
> Then I perform some computation in the values DStream.
> For example,
> >>> val = values.flatMap(lambda x: x*2)
>
> Now, I need to combine the keys and the val DStreams and return the
> result in the form of a Kafka stream.
>
> How do I combine val with the corresponding key?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/How-to-combine-two-DStreams-pyspark-tp27552.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Getting a TreeNode Exception while saving into Hadoop

2016-08-17 Thread max square
Thanks Harsh for the reply.

When I change the code to something like this -

  def saveAsLatest(df: DataFrame, fileSystem: FileSystem, bakDir: String) = {

    fileSystem.rename(new Path(bakDir + latest),
      new Path(bakDir + "/" + ScalaUtil.currentDateTimeString))

    fileSystem.create(new Path(bakDir + latest))

    df.write.format("com.databricks.spark.csv")
      .mode("Overwrite")
      .options(Map("mode" -> "DROPMALFORMED", "delimiter" -> "\t", "header" -> "true"))
      .save(bakDir + latest)

  }

It still shows the same error. Also, if I don't do the unionAll operation,
the operation is successful with the previous code.

Anything else that I should be checking?




On Wed, Aug 17, 2016 at 1:32 PM, HARSH TAKKAR  wrote:

> Hi
>
> I can see that the exception is caused by the following; can you check where in
> your code you are using this path:
>
> Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path
> does not exist: hdfs://testcluster:8020/experiments/vol/spark_chomp_
> data/bak/restaurants-bak/latest
>
> On Wed, 17 Aug 2016, 10:57 p.m. max square, 
> wrote:
>
>> /bump
>>
>> It'd be great if someone can point me to the correct direction.
>>
>> On Mon, Aug 8, 2016 at 5:07 PM, max square 
>> wrote:
>>
>>> Here's the complete stacktrace - https://gist.github.com/rohann/
>>> 649b0fcc9d5062ef792eddebf5a315c1
>>>
>>> For reference, here's the complete function -
>>>
>>>   def saveAsLatest(df: DataFrame, fileSystem: FileSystem, bakDir:
>>> String) = {
>>>
>>> fileSystem.rename(new Path(bakDir + latest), new Path(bakDir + "/"
>>> + ScalaUtil.currentDateTimeString))
>>>
>>> df.write.format("com.databricks.spark.csv").options(Map("mode" ->
>>> "DROPMALFORMED", "delimiter" -> "\t", "header" -> "true")).save(bakDir
>>> + latest)
>>>
>>>   }
>>>
>>> On Mon, Aug 8, 2016 at 3:41 PM, Ted Yu  wrote:
>>>
 Mind showing the complete stack trace ?

 Thanks

 On Mon, Aug 8, 2016 at 12:30 PM, max square 
 wrote:

> Thanks Ted for the prompt reply.
>
> There are three or four DFs that are coming from various sources and
> I'm doing a unionAll on them.
>
> val placesProcessed = placesUnchanged.unionAll(place
> sAddedWithMerchantId).unionAll(placesUpdatedFromHotelsWithMerchantId
> ).unionAll(placesUpdatedFromRestaurantsWithMerchantId).unionAll(
> placesChanged)
>
> I'm using Spark 1.6.2.
>
> On Mon, Aug 8, 2016 at 3:11 PM, Ted Yu  wrote:
>
>> Can you show the code snippet for unionAll operation ?
>>
>> Which Spark release do you use ?
>>
>> BTW please use user@spark.apache.org in the future.
>>
>> On Mon, Aug 8, 2016 at 11:47 AM, max square 
>> wrote:
>>
>>> Hey guys,
>>>
>>> I'm trying to save Dataframe in CSV format after performing unionAll
>>> operations on it.
>>> But I get this exception -
>>>
>>> Exception in thread "main" org.apache.spark.sql.catalyst.
>>> errors.package$TreeNodeException: execute, tree:
>>> TungstenExchange hashpartitioning(mId#430,200)
>>>
>>> I'm saving it by
>>>
>>> df.write.format("com.databricks.spark.csv").options(Map("mode" ->
>>> "DROPMALFORMED", "delimiter" -> "\t", "header" -> "true")).save(bakDir
>>> + latest)
>>>
>>> It works perfectly if I don't do the unionAll operation.
>>> I see that the format isn't different by printing the part of the
>>> results.
>>>
>>> Any help regarding this would be appreciated.
>>>
>>>
>>
>

>>>
>>


How to combine two DStreams(pyspark)?

2016-08-17 Thread vidhan
I have a Kafka stream coming in with some input topic.
This is the code I wrote for accepting the Kafka stream.

>>> conf = SparkConf().setAppName(appname)
>>> sc = SparkContext(conf=conf)
>>> ssc = StreamingContext(sc)
>>> kvs = KafkaUtils.createDirectStream(ssc, topics,\
{"metadata.broker.list": brokers})

Then I create two DStreams of the keys and values of the original stream.

>>> keys = kvs.map(lambda x: x[0].split(" "))
>>> values = kvs.map(lambda x: x[1].split(" "))

Then I perform some computation in the values DStream.
For example,
>>> val = values.flatMap(lambda x: x*2)

Now, I need to combine the keys and the val DStreams and return the
result in the form of a Kafka stream.

How do I combine val with the corresponding key?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-combine-two-DStreams-pyspark-tp27552.html



Re: VectorUDT with spark.ml.linalg.Vector

2016-08-17 Thread Michał Zieliński
I'm using Spark 1.6.2 for a Vector-based UDAF and this works:

def inputSchema: StructType = new StructType().add("input", new VectorUDT())

Maybe it was made private in 2.0.
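
For context, a minimal sketch of such a 1.6-style UDAF skeleton; the
aggregation itself (summing every element of every input vector) is only an
illustration — the point is the VectorUDT inputSchema:

import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class VectorElementSum extends UserDefinedAggregateFunction {
  def inputSchema: StructType = new StructType().add("input", new VectorUDT())
  def bufferSchema: StructType = new StructType().add("sum", DoubleType)
  def dataType: DataType = DoubleType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.0
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val v = input.getAs[Vector](0)
    if (v != null) buffer(0) = buffer.getDouble(0) + v.toArray.sum
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
  }

  def evaluate(buffer: Row): Any = buffer.getDouble(0)
}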

On 17 August 2016 at 05:31, Alexey Svyatkovskiy 
wrote:

> Hi Yanbo,
>
> Thanks for your reply. I will keep an eye on that pull request.
> For now, I decided to just put my code inside org.apache.spark.ml to be
> able to access private classes.
>
> Thanks,
> Alexey
>
> On Tue, Aug 16, 2016 at 11:13 PM, Yanbo Liang  wrote:
>
>> It seems that VectorUDT is private and cannot be accessed outside of Spark
>> currently. It should be public, but we need to do some refactoring before making
>> it public. You can refer to the discussion at https://github.com/apache/s
>> park/pull/12259 .
>>
>> Thanks
>> Yanbo
>>
>> 2016-08-16 9:48 GMT-07:00 alexeys :
>>
>>> I am writing an UDAF to be applied to a data frame column of type Vector
>>> (spark.ml.linalg.Vector). I rely on spark/ml/linalg so that I do not
>>> have to
>>> go back and forth between dataframe and RDD.
>>>
>>> Inside the UDAF, I have to specify a data type for the input, buffer, and
>>> output (as usual). VectorUDT is what I would use with
>>> spark.mllib.linalg.Vector:
>>> https://github.com/apache/spark/blob/master/mllib/src/main/s
>>> cala/org/apache/spark/mllib/linalg/Vectors.scala
>>>
>>> However, when I try to import it from spark.ml instead: import
>>> org.apache.spark.ml.linalg.VectorUDT
>>> I get a runtime error (no errors during the build):
>>>
>>> class VectorUDT in package linalg cannot be accessed in package
>>> org.apache.spark.ml.linalg
>>>
>>> Is it expected/can you suggest a workaround?
>>>
>>> I am using Spark 2.0.0
>>>
>>> Thanks,
>>> Alexey
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.
>>> 1001560.n3.nabble.com/VectorUDT-with-spark-ml-linalg-Vector-tp27542.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


Re: Spark ML : One hot Encoding for multiple columns

2016-08-17 Thread Nisha Muktewar
The OneHotEncoder does *not* accept multiple columns.

You can use Michal's suggestion, where he uses a Pipeline to set the stages
and then executes them.

The other option is to write a function that performs one-hot encoding on a
single column and returns a dataframe with the encoded column, and then call it
for each of the remaining columns, e.g.:
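
(A minimal sketch of that helper; column names are made up and inputDF stands
in for your starting DataFrame.)

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.sql.DataFrame

// Index one string column and one-hot encode the index, returning the augmented DataFrame.
def encodeColumn(df: DataFrame, colName: String): DataFrame = {
  val indexed = new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(colName + "Index")
    .fit(df)
    .transform(df)
  new OneHotEncoder()
    .setInputCol(colName + "Index")
    .setOutputCol(colName + "Encoded")
    .transform(indexed)
}

// Call it once per categorical column.
val encoded = Seq("category", "newone").foldLeft(inputDF)((d, c) => encodeColumn(d, c))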




On Wed, Aug 17, 2016 at 10:59 AM, janardhan shetty 
wrote:

> I had already tried this way :
>
> scala> val featureCols = Array("category","newone")
> featureCols: Array[String] = Array(category, newone)
>
> scala>  val indexer = new StringIndexer().setInputCol(
> featureCols).setOutputCol("categoryIndex").fit(df1)
> :29: error: type mismatch;
>  found   : Array[String]
>  required: String
> val indexer = new StringIndexer().setInputCol(
> featureCols).setOutputCol("categoryIndex").fit(df1)
>
>
> On Wed, Aug 17, 2016 at 10:56 AM, Nisha Muktewar 
> wrote:
>
>> I don't think it does. From the documentation:
>> https://spark.apache.org/docs/2.0.0-preview/ml-features.html
>> #onehotencoder, I see that it still accepts one column at a time.
>>
>> On Wed, Aug 17, 2016 at 10:18 AM, janardhan shetty <
>> janardhan...@gmail.com> wrote:
>>
>>> 2.0:
>>>
>>> One hot encoding currently accepts single input column is there a way to
>>> include multiple columns ?
>>>
>>
>>
>


Re: Undefined function json_array_to_map

2016-08-17 Thread vr spark
Hi Ted/All,
I did the below to get the full stack trace (see below); I am not able to
understand the root cause.

except Exception as error:

traceback.print_exc()

and this is what I get...


 File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/context.py",
line 580, in sql

return DataFrame(self._ssql_ctx.sql(sqlQuery), self)

  File "/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py",
line 813, in __call__

answer, self.gateway_client, self.target_id, self.name)

  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
51, in deco

raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: u'undefined function json_array_to_map; line 28 pos 73'

On Wed, Aug 17, 2016 at 8:59 AM, vr spark  wrote:

> spark 1.6.1
> python
>
> I0817 08:51:59.099356 15189 detector.cpp:481] A new leading master (UPID=
> master@10.224.167.25:5050) is detected
> I0817 08:51:59.099735 15188 sched.cpp:262] New master detected at
> master@x.y.17.25:4550
> I0817 08:51:59.100888 15188 sched.cpp:272] No credentials provided.
> Attempting to register without authentication
> I0817 08:51:59.326017 15190 sched.cpp:641] Framework registered with
> b859f266-9984-482d-8c0d-35bd88c1ad0a-6996
> 16/08/17 08:52:06 WARN ObjectStore: Version information not found in
> metastore. hive.metastore.schema.verification is not enabled so recording
> the schema version 1.2.0
> 16/08/17 08:52:06 WARN ObjectStore: Failed to get database default,
> returning NoSuchObjectException
> Traceback (most recent call last):
>   File "/data1/home/vttrich/spk/orig_qryhubb.py", line 17, in 
> res=sqlcont.sql("select parti_date FROM log_data WHERE parti_date  >=
> 408910 limit 10")
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/context.py",
> line 580, in sql
>   File "/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py",
> line 813, in __call__
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
> line 51, in deco
> pyspark.sql.utils.AnalysisException: u'undefined function
> json_array_to_map; line 28 pos 73'
> I0817 08:52:12.840224 15600 sched.cpp:1771] Asked to stop the driver
> I0817 08:52:12.841198 15189 sched.cpp:1040] Stopping framework
> 'b859f2f3-7484-482d-8c0d-35bd91c1ad0a-6326'
>
>
> On Wed, Aug 17, 2016 at 8:50 AM, Ted Yu  wrote:
>
>> Can you show the complete stack trace ?
>>
>> Which version of Spark are you using ?
>>
>> Thanks
>>
>> On Wed, Aug 17, 2016 at 8:46 AM, vr spark  wrote:
>>
>>> Hi,
>>> I am getting error on below scenario. Please suggest.
>>>
>>> i have  a virtual view in hive
>>>
>>> view name log_data
>>> it has 2 columns
>>>
>>> query_map   map
>>>
>>> parti_date int
>>>
>>>
>>> Here is my snippet for the spark data frame
>>>
>>> my dataframe
>>>
>>> res=sqlcont.sql("select parti_date FROM log_data WHERE parti_date  >=
>>> 408910 limit 10")
>>>
>>> df=res.collect()
>>>
>>> print 'after collect'
>>>
>>> print df
>>>
>>>
>>> * File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
>>> line 51, in deco*
>>>
>>> *pyspark.sql.utils.AnalysisException: u'undefined function
>>> json_array_to_map; line 28 pos 73'*
>>>
>>>
>>>
>>>
>>>
>>
>


Re: Spark ML : One hot Encoding for multiple columns

2016-08-17 Thread janardhan shetty
I had already tried this way :

scala> val featureCols = Array("category","newone")
featureCols: Array[String] = Array(category, newone)

scala>  val indexer = new
StringIndexer().setInputCol(featureCols).setOutputCol("categoryIndex").fit(df1)
:29: error: type mismatch;
 found   : Array[String]
 required: String
val indexer = new
StringIndexer().setInputCol(featureCols).setOutputCol("categoryIndex").fit(df1)


On Wed, Aug 17, 2016 at 10:56 AM, Nisha Muktewar  wrote:

> I don't think it does. From the documentation:
> https://spark.apache.org/docs/2.0.0-preview/ml-features.html#onehotencoder,
> I see that it still accepts one column at a time.
>
> On Wed, Aug 17, 2016 at 10:18 AM, janardhan shetty  > wrote:
>
>> 2.0:
>>
>> One hot encoding currently accepts single input column is there a way to
>> include multiple columns ?
>>
>
>


Re: Spark ML : One hot Encoding for multiple columns

2016-08-17 Thread Nisha Muktewar
I don't think it does. From the documentation:
https://spark.apache.org/docs/2.0.0-preview/ml-features.html#onehotencoder,
I see that it still accepts one column at a time.

On Wed, Aug 17, 2016 at 10:18 AM, janardhan shetty 
wrote:

> 2.0:
>
> One hot encoding currently accepts single input column is there a way to
> include multiple columns ?
>


Re: Spark ML : One hot Encoding for multiple columns

2016-08-17 Thread Michał Zieliński
You can just map over your columns and create a pipeline:

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.OneHotEncoder

val columns = Array("colA", "colB", "colC")
val transformers: Array[PipelineStage] = columns.map {
  x => new OneHotEncoder().setInputCol(x).setOutputCol(x + "Encoded")
}
val pipeline = new Pipeline()
  .setStages(transformers)
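
A possible follow-up, assuming the columns already hold numeric category
indices (for example produced by a StringIndexer), since OneHotEncoder expects
numeric input; df stands in for that input DataFrame:

val encoded = pipeline.fit(df).transform(df)
encoded.select("colAEncoded", "colBEncoded", "colCEncoded").show()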



On 17 August 2016 at 18:18, janardhan shetty  wrote:

> 2.0:
>
> One hot encoding currently accepts single input column is there a way to
> include multiple columns ?
>


Re: Getting a TreeNode Exception while saving into Hadoop

2016-08-17 Thread HARSH TAKKAR
Hi

I can see that the exception is caused by the following; can you check where in
your code you are using this path:

Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does
not exist:
hdfs://testcluster:8020/experiments/vol/spark_chomp_data/bak/restaurants-bak/latest

On Wed, 17 Aug 2016, 10:57 p.m. max square,  wrote:

> /bump
>
> It'd be great if someone can point me to the correct direction.
>
> On Mon, Aug 8, 2016 at 5:07 PM, max square 
> wrote:
>
>> Here's the complete stacktrace -
>> https://gist.github.com/rohann/649b0fcc9d5062ef792eddebf5a315c1
>>
>> For reference, here's the complete function -
>>
>>   def saveAsLatest(df: DataFrame, fileSystem: FileSystem, bakDir:
>> String) = {
>>
>> fileSystem.rename(new Path(bakDir + latest), new Path(bakDir + "/" +
>> ScalaUtil.currentDateTimeString))
>>
>> df.write.format("com.databricks.spark.csv").options(Map("mode" ->
>> "DROPMALFORMED", "delimiter" -> "\t", "header" -> "true")).save(bakDir +
>> latest)
>>
>>   }
>>
>> On Mon, Aug 8, 2016 at 3:41 PM, Ted Yu  wrote:
>>
>>> Mind showing the complete stack trace ?
>>>
>>> Thanks
>>>
>>> On Mon, Aug 8, 2016 at 12:30 PM, max square 
>>> wrote:
>>>
 Thanks Ted for the prompt reply.

 There are three or four DFs that are coming from various sources and
 I'm doing a unionAll on them.

 val placesProcessed = placesUnchanged.unionAll(
 placesAddedWithMerchantId).unionAll(
 placesUpdatedFromHotelsWithMerchantId).unionAll(
 placesUpdatedFromRestaurantsWithMerchantId).unionAll(placesChanged)

 I'm using Spark 1.6.2.

 On Mon, Aug 8, 2016 at 3:11 PM, Ted Yu  wrote:

> Can you show the code snippet for unionAll operation ?
>
> Which Spark release do you use ?
>
> BTW please use user@spark.apache.org in the future.
>
> On Mon, Aug 8, 2016 at 11:47 AM, max square 
> wrote:
>
>> Hey guys,
>>
>> I'm trying to save Dataframe in CSV format after performing unionAll
>> operations on it.
>> But I get this exception -
>>
>> Exception in thread "main"
>> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute,
>> tree:
>> TungstenExchange hashpartitioning(mId#430,200)
>>
>> I'm saving it by
>>
>> df.write.format("com.databricks.spark.csv").options(Map("mode" ->
>> "DROPMALFORMED", "delimiter" -> "\t", "header" -> "true")).save(bakDir
>> + latest)
>>
>> It works perfectly if I don't do the unionAll operation.
>> I see that the format isn't different by printing the part of the
>> results.
>>
>> Any help regarding this would be appreciated.
>>
>>
>

>>>
>>
>


Re: Getting a TreeNode Exception while saving into Hadoop

2016-08-17 Thread max square
/bump

It'd be great if someone can point me to the correct direction.

On Mon, Aug 8, 2016 at 5:07 PM, max square  wrote:

> Here's the complete stacktrace - https://gist.github.com/rohann/
> 649b0fcc9d5062ef792eddebf5a315c1
>
> For reference, here's the complete function -
>
>   def saveAsLatest(df: DataFrame, fileSystem: FileSystem, bakDir: String)
> = {
>
> fileSystem.rename(new Path(bakDir + latest), new Path(bakDir + "/" +
> ScalaUtil.currentDateTimeString))
>
> df.write.format("com.databricks.spark.csv").options(Map("mode" ->
> "DROPMALFORMED", "delimiter" -> "\t", "header" -> "true")).save(bakDir +
> latest)
>
>   }
>
> On Mon, Aug 8, 2016 at 3:41 PM, Ted Yu  wrote:
>
>> Mind showing the complete stack trace ?
>>
>> Thanks
>>
>> On Mon, Aug 8, 2016 at 12:30 PM, max square 
>> wrote:
>>
>>> Thanks Ted for the prompt reply.
>>>
>>> There are three or four DFs that are coming from various sources and I'm
>>> doing a unionAll on them.
>>>
>>> val placesProcessed = placesUnchanged.unionAll(placesAddedWithMerchantId
>>> ).unionAll(placesUpdatedFromHotelsWithMerchantId).unionAll(placesUpdat
>>> edFromRestaurantsWithMerchantId).unionAll(placesChanged)
>>>
>>> I'm using Spark 1.6.2.
>>>
>>> On Mon, Aug 8, 2016 at 3:11 PM, Ted Yu  wrote:
>>>
 Can you show the code snippet for unionAll operation ?

 Which Spark release do you use ?

 BTW please use user@spark.apache.org in the future.

 On Mon, Aug 8, 2016 at 11:47 AM, max square 
 wrote:

> Hey guys,
>
> I'm trying to save Dataframe in CSV format after performing unionAll
> operations on it.
> But I get this exception -
>
> Exception in thread "main" org.apache.spark.sql.catalyst.
> errors.package$TreeNodeException: execute, tree:
> TungstenExchange hashpartitioning(mId#430,200)
>
> I'm saving it by
>
> df.write.format("com.databricks.spark.csv").options(Map("mode" ->
> "DROPMALFORMED", "delimiter" -> "\t", "header" -> "true")).save(bakDir
> + latest)
>
> It works perfectly if I don't do the unionAll operation.
> I see that the format isn't different by printing the part of the
> results.
>
> Any help regarding this would be appreciated.
>
>

>>>
>>
>


error when running spark from oozie launcher

2016-08-17 Thread tkg_cangkul

Hi, I am trying to submit a Spark job with Oozie, but I have one problem here:
when I submit the same job, sometimes it succeeds and sometimes it fails.


I get this error message when the job fails:

org.apache.spark.launcher.CommandBuilderUtils.addPermGenSizeOpt(Ljava/util/List;)V

Can anyone help me solve this? I have tried setting -XX:MaxPermSize=512m
-XX:PermSize=256m in the spark.driver.extraJavaOptions property, but this has
not helped enough for me.


Extract year from string format of date

2016-08-17 Thread Selvam Raman
Spark Version : 1.5.0


Record:
01-Jan-16

Expected Result:
2016

I used the below code, which was shared in the user group:

from_unixtime(unix_timestamp($"Creation Date","dd-MMM-yy"),"yyyy")

Is this the right approach, or do we have any other approach?

NOTE:
I tried the year() function but it gives only null values for the string; the
same goes for the to_date() function.
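
For comparison, a small sketch of both routes (assuming a DataFrame df with the
string column "Creation Date" and the SQL implicits imported for $); as far as
I know, year() on the raw string comes back null because the implicit cast to a
date only understands yyyy-MM-dd-style strings, so the value has to be parsed
first:

import org.apache.spark.sql.functions.{from_unixtime, unix_timestamp, year}

// Route 1: format the parsed timestamp directly as a year string ("2016").
val yearStr = from_unixtime(unix_timestamp($"Creation Date", "dd-MMM-yy"), "yyyy")

// Route 2: cast the parsed epoch seconds to a timestamp and use year(),
// which returns an integer (2016) instead of a string.
val yearInt = year(unix_timestamp($"Creation Date", "dd-MMM-yy").cast("timestamp"))

df.select(yearStr.as("year_str"), yearInt.as("year_int")).show()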
-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Spark ML : One hot Encoding for multiple columns

2016-08-17 Thread janardhan shetty
2.0:

One-hot encoding currently accepts a single input column; is there a way to
include multiple columns?


[Community] Python support added to Spark Job Server

2016-08-17 Thread Evan Chan
Hi folks,

Just a friendly message that we have added Python support to the REST
Spark Job Server project.   If you are a Python user looking for a
RESTful way to manage your Spark jobs, please come have a look at our
project!


https://github.com/spark-jobserver/spark-jobserver

-Evan




Re: UDF in SparkR

2016-08-17 Thread Yann-Aël Le Borgne
I experienced very slow execution times:

http://stackoverflow.com/questions/38803546/spark-r-2-0-dapply-very-slow

and I am wondering why...

On Wed, Aug 17, 2016 at 1:12 PM, Felix Cheung 
wrote:

> This is supported in Spark 2.0.0 as dapply and gapply. Please see the API
> doc:
> https://spark.apache.org/docs/2.0.0/api/R/
>
> Feedback welcome and appreciated!
>
>
> _
> From: Yogesh Vyas 
> Sent: Tuesday, August 16, 2016 11:39 PM
> Subject: UDF in SparkR
> To: user 
>
>
>
> Hi,
>
> Is there is any way of using UDF in SparkR ?
>
> Regards,
> Yogesh
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
>


-- 
=
Yann-Aël Le Borgne
Machine Learning Group
Université Libre de Bruxelles

http://mlg.ulb.ac.be
http://www.ulb.ac.be/di/map/yleborgn
=


Re: Attempting to accept an unknown offer

2016-08-17 Thread vr spark
My code is very simple; if I use other Hive tables, my code works fine. This
particular table (virtual view) is huge and might have more metadata.

It has only two columns.

The virtual view name is: cluster_table

# col_name    data_type

 ln   string

 parti  int


Here is a snippet:

from pyspark import SparkConf, SparkContext

from pyspark.sql import HiveContext

import pyspark.sql

import json

myconf=SparkConf().setAppName("sql")

spcont=SparkContext(conf=myconf)

sqlcont=HiveContext(spcont)

res=sqlcont.sql("select  parti FROM h.cluster_table WHERE parti > 408910
and parti <408911  limit 10")

print res.printSchema()

print 'res'

print res

df=res.collect()

print 'after collect'

print df


Here is the output after I submit the job:

I0817 09:18:40.606465 31409 sched.cpp:262] New master detected at
master@x.y.17.56:6750

I0817 09:18:40.607461 31409 sched.cpp:272] No credentials provided.
Attempting to register without authentication

I0817 09:18:40.612763 31409 sched.cpp:641] Framework registered with
b859f2f3-7484-482d-8c0d-35bd91c1ad0a-6336

16/08/17 09:18:57 WARN ObjectStore: Version information not found in
metastore. hive.metastore.schema.verification is not enabled so recording
the schema version 1.2.0

16/08/17 09:18:57 WARN ObjectStore: Failed to get database default,
returning NoSuchObjectException

root

 |-- parti: integer (nullable = true)


None

res

DataFrame[partition_epoch_hourtenth: int]

2016-08-17 09:19:20,648:31315(0x7fafebfb1700):ZOO_WARN@zookeeper_interest@1557:
Exceeded deadline by 19ms

2016-08-17 09:19:30,662:31315(0x7fafebfb1700):ZOO_WARN@zookeeper_interest@1557:
Exceeded deadline by 13ms

W0817 09:20:01.715824 31412 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676564

W0817 09:20:01.716455 31412 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676630

W0817 09:20:01.716645 31412 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676713

W0817 09:20:01.724409 31412 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676554

W0817 09:20:01.724728 31412 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676555

W0817 09:20:01.724936 31412 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676556

W0817 09:20:01.725126 31412 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676557

W0817 09:20:01.725309 31412 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676558.

and many more lines like this on the screen, with a similar message.



On Wed, Aug 17, 2016 at 9:08 AM, Ted Yu  wrote:

> Please include user@ in your reply.
>
> Can you reveal the snippet of hive sql ?
>
> On Wed, Aug 17, 2016 at 9:04 AM, vr spark  wrote:
>
>> spark 1.6.1
>> mesos
>> job is running for like 10-15 minutes and giving this message and i
>> killed it.
>>
>> In this job, i am creating data frame from a hive sql. There are other
>> similar jobs which work fine
>>
>> On Wed, Aug 17, 2016 at 8:52 AM, Ted Yu  wrote:
>>
>>> Can you provide more information ?
>>>
>>> Were you running on YARN ?
>>> Which version of Spark are you using ?
>>>
>>> Was your job failing ?
>>>
>>> Thanks
>>>
>>> On Wed, Aug 17, 2016 at 8:46 AM, vr spark  wrote:
>>>

 W0816 23:17:01.984846 16360 sched.cpp:1195] Attempting to accept an
 unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910492

 W0816 23:17:01.984987 16360 sched.cpp:1195] Attempting to accept an
 unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910493

 W0816 23:17:01.985124 16360 sched.cpp:1195] Attempting to accept an
 unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910494

 W0816 23:17:01.985339 16360 sched.cpp:1195] Attempting to accept an
 unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910495

 W0816 23:17:01.985508 16360 sched.cpp:1195] Attempting to accept an
 unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910496

 W0816 23:17:01.985651 16360 sched.cpp:1195] Attempting to accept an
 unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910497

 W0816 23:17:01.985801 16360 sched.cpp:1195] Attempting to accept an
 unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910498

 W0816 23:17:01.985961 16360 sched.cpp:1195] Attempting to accept an
 unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910499

 W0816 23:17:01.986121 16360 sched.cpp:1195] Attempting to accept an
 unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910500

 2016-08-16 23:18:41,877:16226(0x7f71271b6700):ZOO_WARN@zookeeper_intere
 st@1557: Exceeded 

Re: Attempting to accept an unknown offer

2016-08-17 Thread Ted Yu
Please include user@ in your reply.

Can you reveal the snippet of hive sql ?

On Wed, Aug 17, 2016 at 9:04 AM, vr spark  wrote:

> spark 1.6.1
> mesos
> job is running for like 10-15 minutes and giving this message and i killed
> it.
>
> In this job, i am creating data frame from a hive sql. There are other
> similar jobs which work fine
>
> On Wed, Aug 17, 2016 at 8:52 AM, Ted Yu  wrote:
>
>> Can you provide more information ?
>>
>> Were you running on YARN ?
>> Which version of Spark are you using ?
>>
>> Was your job failing ?
>>
>> Thanks
>>
>> On Wed, Aug 17, 2016 at 8:46 AM, vr spark  wrote:
>>
>>>
>>> W0816 23:17:01.984846 16360 sched.cpp:1195] Attempting to accept an
>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910492
>>>
>>> W0816 23:17:01.984987 16360 sched.cpp:1195] Attempting to accept an
>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910493
>>>
>>> W0816 23:17:01.985124 16360 sched.cpp:1195] Attempting to accept an
>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910494
>>>
>>> W0816 23:17:01.985339 16360 sched.cpp:1195] Attempting to accept an
>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910495
>>>
>>> W0816 23:17:01.985508 16360 sched.cpp:1195] Attempting to accept an
>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910496
>>>
>>> W0816 23:17:01.985651 16360 sched.cpp:1195] Attempting to accept an
>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910497
>>>
>>> W0816 23:17:01.985801 16360 sched.cpp:1195] Attempting to accept an
>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910498
>>>
>>> W0816 23:17:01.985961 16360 sched.cpp:1195] Attempting to accept an
>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910499
>>>
>>> W0816 23:17:01.986121 16360 sched.cpp:1195] Attempting to accept an
>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910500
>>>
>>> 2016-08-16 23:18:41,877:16226(0x7f71271b6700):ZOO_WARN@zookeeper_intere
>>> st@1557: Exceeded deadline by 13ms
>>>
>>> 2016-08-16 23:21:12,007:16226(0x7f71271b6700):ZOO_WARN@zookeeper_intere
>>> st@1557: Exceeded deadline by 11ms
>>>
>>>
>>>
>>>
>>
>


pyspark.sql.functions.last not working as expected

2016-08-17 Thread Alexander Peletz
Hi,

I am using Spark 2.0 and I am getting unexpected results using the last()
method. Has anyone else experienced this? I get the sense that last() is
working correctly within a given data partition but not across the entire RDD.
first() seems to work as expected, so I can work around this by using a window
in reverse order and calling first() instead of last(), but it would be great
if last() actually worked.


Thanks,
Alexander


Alexander Peletz
Consultant

slalom

Fortune 100 Best Companies to Work For 2016
Glassdoor Best Places to Work 2016
Consulting Magazine Best Firms to Work For 2015

316 Stuart Street, Suite 300
Boston, MA 02116
706.614.5033 cell | 617.316.5400 office
alexand...@slalom.com



Re: Spark DF CacheTable method. Will it save data to disk?

2016-08-17 Thread neil90
From the Spark documentation
(http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence),
yes, you can use persist on a dataframe instead of cache. All cache is, is a
shorthand for the default persist storage level "MEMORY_ONLY". If you want
to persist the dataframe to disk you should do
dataframe.persist(StorageLevel.DISK_ONLY).
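
A minimal sketch of those storage-level choices (Scala API; df is an assumed
existing DataFrame — note that, unlike RDD.cache(), DataFrame.cache() actually
defaults to MEMORY_AND_DISK in recent Spark versions):

import org.apache.spark.storage.StorageLevel

// Pick one storage level per DataFrame (the lines below are alternatives, not a sequence):
df.persist(StorageLevel.DISK_ONLY)            // keep cached partitions on disk only
// df.persist(StorageLevel.MEMORY_AND_DISK)   // memory first, spill the rest to disk
// df.cache()                                 // default storage level

df.count()      // caching is lazy; an action materializes it
df.unpersist()  // drop the cached data when it is no longer needed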

IMO, if reads are expensive against the DB and you're afraid of failure, why not
just save the data as Parquet on your cluster in Hive and read from there?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DF-CacheTable-method-Will-it-save-data-to-disk-tp27533p27551.html



Attempting to accept an unknown offer

2016-08-17 Thread vr spark
W0816 23:17:01.984846 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910492

W0816 23:17:01.984987 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910493

W0816 23:17:01.985124 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910494

W0816 23:17:01.985339 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910495

W0816 23:17:01.985508 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910496

W0816 23:17:01.985651 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910497

W0816 23:17:01.985801 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910498

W0816 23:17:01.985961 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910499

W0816 23:17:01.986121 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910500

2016-08-16 23:18:41,877:16226(0x7f71271b6700):ZOO_WARN@
zookeeper_interest@1557: Exceeded deadline by 13ms

2016-08-16 23:21:12,007:16226(0x7f71271b6700):ZOO_WARN@
zookeeper_interest@1557: Exceeded deadline by 11ms


Re: Attempting to accept an unknown offer

2016-08-17 Thread Ted Yu
Can you provide more information ?

Were you running on YARN ?
Which version of Spark are you using ?

Was your job failing ?

Thanks

On Wed, Aug 17, 2016 at 8:46 AM, vr spark  wrote:

>
> W0816 23:17:01.984846 16360 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910492
>
> W0816 23:17:01.984987 16360 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910493
>
> W0816 23:17:01.985124 16360 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910494
>
> W0816 23:17:01.985339 16360 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910495
>
> W0816 23:17:01.985508 16360 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910496
>
> W0816 23:17:01.985651 16360 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910497
>
> W0816 23:17:01.985801 16360 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910498
>
> W0816 23:17:01.985961 16360 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910499
>
> W0816 23:17:01.986121 16360 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910500
>
> 2016-08-16 23:18:41,877:16226(0x7f71271b6700):ZOO_WARN@zookeeper_
> interest@1557: Exceeded deadline by 13ms
>
> 2016-08-16 23:21:12,007:16226(0x7f71271b6700):ZOO_WARN@zookeeper_
> interest@1557: Exceeded deadline by 11ms
>
>
>
>


Re: Undefined function json_array_to_map

2016-08-17 Thread Ted Yu
Can you show the complete stack trace ?

Which version of Spark are you using ?

Thanks

On Wed, Aug 17, 2016 at 8:46 AM, vr spark  wrote:

> Hi,
> I am getting error on below scenario. Please suggest.
>
> i have  a virtual view in hive
>
> view name log_data
> it has 2 columns
>
> query_map   map
>
> parti_date int
>
>
> Here is my snippet for the spark data frame
>
> my dataframe
>
> res=sqlcont.sql("select parti_date FROM log_data WHERE parti_date  >=
> 408910 limit 10")
>
> df=res.collect()
>
> print 'after collect'
>
> print df
>
>
> * File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
> line 51, in deco*
>
> *pyspark.sql.utils.AnalysisException: u'undefined function
> json_array_to_map; line 28 pos 73'*
>
>
>
>
>


Undefined function json_array_to_map

2016-08-17 Thread vr spark
Hi,
I am getting error on below scenario. Please suggest.

i have  a virtual view in hive

view name log_data
it has 2 columns

query_map   map

parti_date int


Here is my snippet for the spark data frame

my dataframe

res=sqlcont.sql("select parti_date FROM log_data WHERE parti_date  >=
408910 limit 10")

df=res.collect()

print 'after collect'

print df


* File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
51, in deco*

*pyspark.sql.utils.AnalysisException: u'undefined function
json_array_to_map; line 28 pos 73'*


Aggregations with scala pairs

2016-08-17 Thread Andrés Ivaldi
Hello, I'd like to report incorrect behavior in the Dataset API; I don't know
how to do that, as my JIRA account doesn't allow me to file an issue.

I'm using Spark 2.0.0, but the problem has existed since at least version 1.4
(per the docs, since 1.3).

The problem is simple to reproduce, and so is the workaround: if we apply agg
over a Dataset with Scala pairs that target the same column, only one aggregation
over that column is actually used. This is because the toMap reduces the pair
values for the same key to one, overwriting the value.
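
A small repro and the Column-based workaround (illustrative data; assumes a
spark-shell with the SQL implicits in scope):

import org.apache.spark.sql.functions.{avg, max}

val df = Seq(("a", 1), ("a", 3), ("b", 2)).toDF("key", "value")

// Pair-based agg: both pairs target "value", but the toMap keeps only the last one,
// so only avg(value) appears in the result.
df.groupBy("key").agg("value" -> "max", "value" -> "avg").show()

// Column-based agg keeps both aggregates and works as a workaround.
df.groupBy("key").agg(max($"value"), avg($"value")).show()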

class
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala


 def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
> = {
> agg((aggExpr +: aggExprs).toMap)
>   }


Rewritten as something like this, it should work:

 def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
   toDF((aggExpr +: aggExprs).map { pairExpr =>
     strToExpr(pairExpr._2)(df(pairExpr._1).expr)
   }.toSeq)
 }


regards
-- 
Ing. Ivaldi Andres


Re: UDF in SparkR

2016-08-17 Thread Felix Cheung
This is supported in Spark 2.0.0 as dapply and gapply. Please see the API doc:
https://spark.apache.org/docs/2.0.0/api/R/

Feedback welcome and appreciated!


_
From: Yogesh Vyas >
Sent: Tuesday, August 16, 2016 11:39 PM
Subject: UDF in SparkR
To: user >


Hi,

Is there is any way of using UDF in SparkR ?

Regards,
Yogesh

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org





How to implement a InputDStream like the twitter stream in Spark?

2016-08-17 Thread Xi Shen
Hi,

First, I am not sure if I should inherit from InputDStream or
ReceiverInputDStream. For ReceiverInputDStream, why would I want to run a
receiver on each worker node?

If I want to inherit InputDStream, what should I do in the compute() method?
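
For what it's worth, a minimal sketch of the InputDStream route as I understand
it (no receiver: compute() is called on the driver once per batch interval and
returns that batch's RDD); the class and its data are made up for illustration:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

class MyInputDStream(_ssc: StreamingContext, numbers: Seq[Int])
  extends InputDStream[Int](_ssc) {

  override def start(): Unit = {}   // set up connections or state here
  override def stop(): Unit = {}    // tear them down here

  override def compute(validTime: Time): Option[RDD[Int]] = {
    // Build this batch's RDD on the driver, e.g. by polling an external source;
    // returning None means "no data for this batch".
    Some(_ssc.sparkContext.parallelize(numbers))
  }
}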

-- 


Thanks,
David S.


Spark standalone or Yarn for resourcing

2016-08-17 Thread Ashok Kumar
Hi,
For small to medium size clusters I think Spark Standalone mode is a good
choice.
We are contemplating moving to YARN as our cluster grows.
What are the pros and cons of using either, please? Which one offers the best
Thanking you

Spark MLlib question: load model failed with exception:org.json4s.package$MappingException: Did not find value which can be converted into java.lang.String

2016-08-17 Thread luohui20001
Hello guys: I have a problem loading a recommendation model. I have 2 models;
one is good (able to get recommendation results) and the other is not working.
I checked these 2 models, and both are MatrixFactorizationModel objects. But in
the metadata, one is a PipelineModel and the other is a MatrixFactorizationModel.
Is the below exception caused by this?

Here is my stack trace:

Exception in thread "main" org.json4s.package$MappingException: Did not find
value which can be converted into java.lang.String
at org.json4s.reflect.package$.fail(package.scala:96)
at org.json4s.Extraction$.convert(Extraction.scala:554)
at org.json4s.Extraction$.extract(Extraction.scala:331)
at org.json4s.Extraction$.extract(Extraction.scala:42)
at 
org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)
at 
org.apache.spark.mllib.util.Loader$.loadMetadata(modelSaveLoad.scala:131)
at 
org.apache.spark.mllib.recommendation.MatrixFactorizationModel$.load(MatrixFactorizationModel.scala:330)
at 
org.brave.spark.ml.RecommandForMultiUsers$.main(RecommandForMultiUsers.scala:55)
at 
org.brave.spark.ml.RecommandForMultiUsers.main(RecommandForMultiUsers.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The attached files are my code, FYI.
RecommandForMultiUsers.scala:55 is:

val model = MatrixFactorizationModel.load(sc, modelpath)



 

Thanks
Best regards!
San.Luo


codes to generate the bad model.scala
Description: Binary data


codes to generate the good model.scala
Description: Binary data


codes to load the model and generate the recommend result.scala
Description: Binary data


Re: Change nullable property in Dataset schema

2016-08-17 Thread Kazuaki Ishizaki
Thank you for your comments

> You should just Seq(...).toDS
I tried this, however the result is not changed.

>> val ds2 = ds1.map(e => e)
> Why are you e => e (since it's identity) and does nothing?
Yes, e => e does nothing. For the sake of simplicity of the example, I used
the simplest expression in map(). In current Spark, an expression in map()
does not change the schema of its output.

>   .as(RowEncoder(new StructType()
>  .add("value", ArrayType(IntegerType, false), nullable = 
false)))
Sorry, this was my mistake. It did not work for my purpose. It actually 
does nothing.

Kazuaki Ishizaki



From:   Jacek Laskowski 
To: Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc: user 
Date:   2016/08/15 04:56
Subject:Re: Change nullable property in Dataset schema



On Wed, Aug 10, 2016 at 12:04 AM, Kazuaki Ishizaki  
wrote:

>   import testImplicits._
>   test("test") {
> val ds1 = sparkContext.parallelize(Seq(Array(1, 1), Array(2, 2),
> Array(3, 3)), 1).toDS

You should just Seq(...).toDS

> val ds2 = ds1.map(e => e)

Why are you e => e (since it's identity) and does nothing?

>   .as(RowEncoder(new StructType()
>  .add("value", ArrayType(IntegerType, false), nullable = 
false)))

I didn't know it's possible but looks like it's toDF where you could
replace the schema too (in a less involved way).

I learnt quite a lot from just a single email. Thanks!

Pozdrawiam,
Jacek

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org






UDF in SparkR

2016-08-17 Thread Yogesh Vyas
Hi,

Is there is any way of using UDF in SparkR ?

Regards,
Yogesh




Re: Change nullable property in Dataset schema

2016-08-17 Thread Kazuaki Ishizaki
My motivation is to simplify the Java code generated by Tungsten's code
generator.

Here is a dump of the code generated from the program:
https://gist.github.com/kiszk/402bd8bc45a14be29acb3674ebc4df24

If we can succeed in letting Catalyst know that the result of map() is never
null, we can eliminate conditional branches.
For example, in the above URL, we can say the condition at line 45 is always
false, since the result of map() is never null according to our schema. As a
result, we can eliminate the assignments at lines 52 and 56, and the
conditional branches at lines 55 and 61.

Kazuaki Ishizaki



From:   Koert Kuipers 
To: Kazuaki Ishizaki/Japan/IBM@IBMJP
Cc: "user@spark.apache.org" 
Date:   2016/08/16 04:35
Subject:Re: Change nullable property in Dataset schema



why do you want the array to have nullable = false? what is the benefit?

On Wed, Aug 3, 2016 at 10:45 AM, Kazuaki Ishizaki  
wrote:
Dear all,
Would it be possible to let me know how to change nullable property in 
Dataset?

When I looked for how to change nullable property in Dataframe schema, I 
found the following approaches.
http://stackoverflow.com/questions/33193958/change-nullable-property-of-column-in-spark-dataframe

https://github.com/apache/spark/pull/13873 (not merged yet)

However, I cannot find how to change nullable property in Dataset schema. 
Even when I wrote the following program, nullable property for "value: 
array" in ds2.schema is not changed.
If my understanding is correct, current Spark 2.0 uses an 
ExpressionEncoder that is generated based on Dataset[T] at 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L46


class Test extends QueryTest with SharedSQLContext {
  import testImplicits._
  test("test") {
    val ds1 = sparkContext.parallelize(
      Seq(Array(1, 1), Array(2, 2), Array(3, 3)), 1).toDS
    val schema = new StructType()
      .add("array", ArrayType(IntegerType, false), false)
    val inputObject = BoundReference(0,
      ScalaReflection.dataTypeFor[Array[Int]], false)
    val encoder = new ExpressionEncoder[Array[Int]](schema, true,
      ScalaReflection.serializerFor[Array[Int]](inputObject).flatten,
      ScalaReflection.deserializerFor[Array[Int]],
      ClassTag[Array[Int]](classOf[Array[Int]]))
    val ds2 = ds1.map(e => e)(encoder)
    ds1.printSchema
    ds2.printSchema
  }
}

root
 |-- value: array (nullable = true)
 |    |-- element: integer (containsNull = false)

root
 |-- value: array (nullable = true)              // Expected (nullable = false)
 |    |-- element: integer (containsNull = false)


Kazuaki Ishizaki





Re: [SQL] Why does spark.read.csv.cache give me a WARN about cache but not text?!

2016-08-17 Thread Jacek Laskowski
Hi Michael,

Thanks a lot for your help. See below explains for csv and text. Do
you see anything worth investigating?

scala> spark.read.csv("people.csv").cache.explain(extended = true)
== Parsed Logical Plan ==
Relation[_c0#39,_c1#40,_c2#41,_c3#42] csv

== Analyzed Logical Plan ==
_c0: string, _c1: string, _c2: string, _c3: string
Relation[_c0#39,_c1#40,_c2#41,_c3#42] csv

== Optimized Logical Plan ==
InMemoryRelation [_c0#39, _c1#40, _c2#41, _c3#42], true, 1,
StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *FileScan csv [_c0#39,_c1#40,_c2#41,_c3#42] Batched: false,
Format: CSV, InputPaths: file:/Users/jacek/dev/oss/spark/people.csv,
PartitionFilters: [], PushedFilters: [], ReadSchema:
struct<_c0:string,_c1:string,_c2:string,_c3:string>

== Physical Plan ==
InMemoryTableScan [_c0#39, _c1#40, _c2#41, _c3#42]
   +- InMemoryRelation [_c0#39, _c1#40, _c2#41, _c3#42], true, 1,
StorageLevel(disk, memory, deserialized, 1 replicas)
 +- *FileScan csv [_c0#39,_c1#40,_c2#41,_c3#42] Batched:
false, Format: CSV, InputPaths:
file:/Users/jacek/dev/oss/spark/people.csv, PartitionFilters: [],
PushedFilters: [], ReadSchema:
struct<_c0:string,_c1:string,_c2:string,_c3:string>


scala> spark.read.text("people.csv").cache.explain(extended = true)
== Parsed Logical Plan ==
Relation[value#24] text

== Analyzed Logical Plan ==
value: string
Relation[value#24] text

== Optimized Logical Plan ==
InMemoryRelation [value#24], true, 1, StorageLevel(disk, memory,
deserialized, 1 replicas)
   +- *FileScan text [value#24] Batched: false, Format:
org.apache.spark.sql.execution.datasources.text.TextFileFormat@262e2c8c,
InputPaths: file:/Users/jacek/dev/oss/spark/people.csv,
PartitionFilters: [], PushedFilters: [], ReadSchema:
struct

== Physical Plan ==
InMemoryTableScan [value#24]
   +- InMemoryRelation [value#24], true, 1, StorageLevel(disk,
memory, deserialized, 1 replicas)
 +- *FileScan text [value#24] Batched: false, Format:
org.apache.spark.sql.execution.datasources.text.TextFileFormat@262e2c8c,
InputPaths: file:/Users/jacek/dev/oss/spark/people.csv,
PartitionFilters: [], PushedFilters: [], ReadSchema:
struct

The only thing I could find "interesting" is that TextFileFormat does
not print TEXT as CSV does. Anything special you see?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Aug 16, 2016 at 7:24 PM, Michael Armbrust
 wrote:
> try running explain on each of these.  my guess would be caching is broken
> in some cases.
>
> On Tue, Aug 16, 2016 at 6:05 PM, Jacek Laskowski  wrote:
>>
>> Hi,
>>
>> Can anyone explain why spark.read.csv("people.csv").cache.show ends up
>> with a WARN while spark.read.text("people.csv").cache.show does not?
>> It happens in 2.0 and today's build.
>>
>> scala> sc.version
>> res5: String = 2.1.0-SNAPSHOT
>>
>> scala> spark.read.csv("people.csv").cache.show
>> +---------+---------+-------+----+
>> |      _c0|      _c1|    _c2| _c3|
>> +---------+---------+-------+----+
>> |kolumna 1|kolumna 2|kolumn3|size|
>> |    Jacek| Warszawa| Polska|  40|
>> +---------+---------+-------+----+
>>
>> scala> spark.read.csv("people.csv").cache.show
>> 16/08/16 18:01:52 WARN CacheManager: Asked to cache already cached data.
>> +---------+---------+-------+----+
>> |      _c0|      _c1|    _c2| _c3|
>> +---------+---------+-------+----+
>> |kolumna 1|kolumna 2|kolumn3|size|
>> |    Jacek| Warszawa| Polska|  40|
>> +---------+---------+-------+----+
>>
>> scala> spark.read.text("people.csv").cache.show
>> +--------------------+
>> |               value|
>> +--------------------+
>> |kolumna 1,kolumna...|
>> |Jacek,Warszawa,Po...|
>> +--------------------+
>>
>> scala> spark.read.text("people.csv").cache.show
>> +--------------------+
>> |               value|
>> +--------------------+
>> |kolumna 1,kolumna...|
>> |Jacek,Warszawa,Po...|
>> +--------------------+
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>
