Authorization Support (on all operations, not only DDL) in Spark SQL

2016-04-26 Thread our...@cnsuning.com
hi rxin,
Will Spark SQL support authorization on all operations, not only DDL?
 In my use case, a Hive table was granted read access to userA, and other
users don't have permission to read it, but userB can still read this Hive table
using Spark SQL.

Ricky  Ou







spark-sql [1.4.0] not compatible with hive sql when using IN with date_sub or regexp_replace

2016-01-25 Thread our...@cnsuning.com
hi all,
when migrating Hive SQL to Spark SQL I encountered an incompatibility
problem. Please give me some suggestions.


The Hive table description and data format are as follows:
use spark;
drop table spark.test_or1;
CREATE TABLE `spark.test_or1`( 
`statis_date` string, 
`lbl_nm` string)  row format delimited fields terminated by ',';

Example data (data.txt):
20160110 , item_XX_tab_buy03
20160114  , item_XX_tab_buy01   
20160115 , item_XX_tab_buy11
20160118 , item_XX_tab_buy01
20160101 , item_XX_tab_buy01
20160102 , item_XX_tab_buy03
20160103 , item_XX_tab_buy04

load data local inpath 'data.txt' into table  spark.test_or1; 


When executing this Hive command in spark-sql (1.4.0), it encounters
UnresolvedException: Invalid call to dataType on unresolved object, tree:
'ta.lbl_nm. However, this command can be executed successfully in hive-shell:
select case when ta.lbl_nm in ('item_XX_tab_buy03','item_XX_gmq_buy01') 
then 1 
else 0 
end as  defaultVaule
from spark.test_or1 ta 
where ta.statis_date <= '20160118' 
and ta.statis_date > 
regexp_replace(date_sub(from_unixtime(to_unix_timestamp('20160118', 
'yyyyMMdd'), 'yyyy-MM-dd'), 20), '-', '')  limit 10 ;  



This command also has the same error: UnresolvedException: Invalid call to dataType on 
unresolved object, tree: 'ta.lbl_nm. However, this command can be executed 
successfully in hive-shell:
select case when ta.lbl_nm in ('item_XX_tab_buy03','item_XX_gmq_buy01') 
then 1 
else 0 
end as  defaultVaule
from spark.test_or1 ta 
where ta.statis_date <= '20160118' 
and ta.statis_date > date_sub(from_unixtime(to_unix_timestamp('20160118', 
'yyyyMMdd'), 'yyyyMMdd'), 20)  limit 10 ;  


This command also has the same error: UnresolvedException: Invalid call to dataType on 
unresolved object, tree: 'ta.lbl_nm. However, this command can be executed 
successfully in hive-shell:
select case when ta.lbl_nm in ('item_XX_tab_buy03','item_XX_gmq_buy01') 
then 1 
else 0 
end as  defaultVaule
from spark.test_or1 ta 
where ta.statis_date <= '20160118' 
and ta.statis_date > regexp_replace(2016-01-18, '-', '')  limit 10 ;   


However, after changing `in` to equality (`=`), this command can be executed 
successfully in both hive-shell and spark-sql:
select case when ta.lbl_nm='item_XX_tab_buy03' and ta.lbl_nm = 
'item_XX_gmq_buy01' 
then 1 
else 0 
end as  defaultVaule
from spark.test_or1 ta 
where ta.statis_date <= '20160118' 
and ta.statis_date > 
regexp_replace(date_sub(from_unixtime(to_unix_timestamp('20160118', 
'yyyyMMdd'), 'yyyy-MM-dd'), 20), '-', '')  limit 10 ;  


spark-sql> 
> 
> select case when ta.lbl_nm='item_XX_tab_buy03' and ta.lbl_nm = 
> 'item_XX_gmq_buy01' 
> then 1 
> else 0 
> end as defaultVaule 
> from spark.test_or1 ta 
> where ta.statis_date <= '20160118' 
> and ta.statis_date > 
> regexp_replace(date_sub(from_unixtime(to_unix_timestamp('20160118', 
> 'yyyyMMdd'), 'yyyy-MM-dd'), 20), '-', '') limit 10 ; 
0 
0 
0 
0 
0 
0 
Time taken: 3.725 seconds, Fetched 6 row(s)
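
Note that rewriting `in` with two equality tests joined by `and` changes the
semantics: a single lbl_nm value can never equal both literals, so every row
yields 0, which is what the output above shows. A minimal workaround sketch
that keeps the original IN semantics, assuming a spark-shell session with a
HiveContext against the same table (the `or` form sidesteps the InConversion
rule that raises the UnresolvedException in the log below):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// Rewrite the IN predicate as a disjunction of equality tests; this keeps the
// original meaning (match either label) while avoiding the IN analysis path.
val result = hiveContext.sql("""
  select case when ta.lbl_nm = 'item_XX_tab_buy03'
                or ta.lbl_nm = 'item_XX_gmq_buy01'
              then 1 else 0 end as defaultVaule
  from spark.test_or1 ta
  where ta.statis_date <= '20160118'
    and ta.statis_date >
        regexp_replace(date_sub(from_unixtime(to_unix_timestamp('20160118',
          'yyyyMMdd'), 'yyyy-MM-dd'), 20), '-', '')
  limit 10
""")
result.show()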


Detailed error log:
16/01/26 11:10:36 INFO ParseDriver: Parse Completed 
16/01/26 11:10:36 INFO HiveMetaStore: 0: get_table : db=spark tbl=test_or1 
16/01/26 11:10:36 INFO audit: ugi=spark ip=unknown-ip-addr cmd=get_table : 
db=spark tbl=test_or1 
16/01/26 11:10:37 ERROR SparkSQLDriver: Failed in [ select case when ta.lbl_nm 
in ('item_XX_tab_buy03','item_XX_gmq_buy01') 
then 1 
else 0 
end as defaultVaule 
from spark.test_or1 ta 
where ta.statis_date <= '20160118' 
and ta.statis_date > 
regexp_replace(date_sub(from_unixtime(to_unix_timestamp('20160118', 
'yyyyMMdd'), 'yyyy-MM-dd'), 20), '-', '') limit 10 ] 
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to 
dataType on unresolved object, tree: 'ta.lbl_nm 
at 
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:59)
 
at 
org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$InConversion$$anonfun$apply$5$$anonfun$applyOrElse$15.apply(HiveTypeCoercion.scala:299)
 
at 
org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$InConversion$$anonfun$apply$5$$anonfun$applyOrElse$15.apply(HiveTypeCoercion.scala:299)
 
at 
scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:80) 
at scala.collection.immutable.List.exists(List.scala:84) 
at 
org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$InConversion$$anonfun$apply$5.applyOrElse(HiveTypeCoercion.scala:299)
 
at 
org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$InConversion$$anonfun$apply$5.applyOrElse(HiveTypeCoercion.scala:298)
 
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
 
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
 
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
 
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:221) 
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$7.apply(TreeNode.scala:261)
 
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(Trav

Re: Re: --driver-java-options not support multiple JVM configuration ?

2016-01-20 Thread our...@cnsuning.com
Marcelo, 
the error also exists with quotes around "$sparkdriverextraJavaOptions":
Unrecognized VM option 
'newsize=2096m,-XX:MaxPermSize=512m,-XX:+PrintGCDetails,-XX:+PrintGCTimeStamps,-XX:+UseParNewGC,-XX:+UseConcMarkSweepGC,-XX:CMSInitiatingOccupancyFraction=80,-XX:GCTimeLimit=5,-XX:GCHeapFreeLimit=95'


 
From: Marcelo Vanzin
Date: 2016-01-21 12:09
To: our...@cnsuning.com
CC: user
Subject: Re: --driver-java-options not support multiple JVM configuration ?
On Wed, Jan 20, 2016 at 7:38 PM, our...@cnsuning.com
 wrote:
> --driver-java-options $sparkdriverextraJavaOptions \
 
You need quotes around "$sparkdriverextraJavaOptions".
 
-- 
Marcelo
 
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
 


--driver-java-options not support multiple JVM configuration ?

2016-01-20 Thread our...@cnsuning.com
hi all,
 --driver-java-options does not support multiple JVM options.

The submit command is as follows:

Cores=16 
sparkdriverextraJavaOptions="-XX:newsize=2096m -XX:MaxPermSize=512m 
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseParNewGC 
-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=80 
-XX:GCTimeLimit=5 -XX:GCHeapFreeLimit=95" 
main1=com.suning.spark.streaming.ppsc.RecommendBasedShoppingCart 
spark-submit --deploy-mode cluster \ 
--total-executor-cores $Cores \ 
--executor-memory 8g \ 
--driver-memory 16g \ 
--conf spark.driver.cores=4 \ 
--driver-java-options $sparkdriverextraJavaOptions \ 
--class $main1 \ 
hdfs:///user/bdapp/$appjars 

error:
  Error: Unrecognized option '-XX:MaxPermSize=512m'


When changed to:

sparkdriverextraJavaOptions="-XX:newsize=2096m,-XX:MaxPermSize=512m,-XX:+PrintGCDetails,-XX:+PrintGCTimeStamps,-XX:+UseParNewGC,-XX:+UseConcMarkSweepGC,-XX:CMSInitiatingOccupancyFraction=80,-XX:GCTimeLimit=5,-XX:GCHeapFreeLimit=95"

the driver error is:
Unrecognized VM option 
'newsize=2096m,-XX:MaxPermSize=512m,-XX:+PrintGCDetails,-XX:+PrintGCTimeStamps,-XX:+UseParNewGC,-XX:+UseConcMarkSweepGC,-XX:CMSInitiatingOccupancyFraction=80,-XX:GCTimeLimit=5,-XX:GCHeapFreeLimit=95


Re: Re: spark streaming updateStateByKey state is nonsupport other type except ClassTag such as list?

2015-12-22 Thread our...@cnsuning.com
Dean,
  the following code test passed. Thank you again.

 if (args.length < 2) {
   System.err.println("Usage: StatefulNetworkWordBiggest3Vaules <hostname> <port>")
   System.exit(1)
 }


 val updateFunc = (key:String,values: Seq[Seq[Int]], state: Option[Seq[Int]]) 
=> {
   if(values.length>0){
 Some(values(0))
   }else {
 Some(state.getOrElse(Seq(0)))
   }
 }

 val newUpdateFunc = (iterator: Iterator[(String, Seq[Seq[Int]], 
Option[Seq[Int]])]) => {
   iterator.flatMap(t => updateFunc(t._1,t._2, t._3).map(s => (t._1, s)))
 }

 val sparkConf = new SparkConf().setAppName("StatefulNetworkWordBiggest3Vaules")
 // Create the context with a 1 second batch size
 val ssc = new StreamingContext(sparkConf, Seconds(1))
 ssc.checkpoint("/user/spark/StatefulNetworkWordBiggest3Vaules1")

 // Initial RDD input to updateStateByKey
 val initialRDD = ssc.sparkContext.parallelize(List(("hello", Seq(0)), 
("world", Seq(0))))

 // Create a ReceiverInputDStream on target ip:port and count the
 // words in input stream of \n delimited test (eg. generated by 'nc')
 val lines = ssc.socketTextStream(args(0), args(1).toInt)
 val words = lines.flatMap(_.split(" "))
 val wordDstream = words.map(x => {
   val v= scala.util.Random.nextInt(1000)
   (x,Seq(v))
 })
 // Update the cumulative count using updateStateByKey
 // This will give a Dstream made of state (which is the cumulative count of 
the words)
// wordDstream.updateStateByKey(newUpdateFunc, new HashPartitioner 
(ssc.sparkContext.defaultParallelism),true,initialRDD)
 val stateDstream = wordDstream.updateStateByKey[Seq[Int]](newUpdateFunc, new 
HashPartitioner (ssc.sparkContext.defaultParallelism),true, initialRDD)
 stateDstream.print()
 ssc.start()
 ssc.awaitTermination()

Ricky  Ou(欧   锐)


 
From: our...@cnsuning.com
Date: 2015-12-23 14:19
To: Dean Wampler
CC: user; t...@databricks.com
Subject: Re: Re: spark streaming updateStateByKey state is nonsupport other 
type except ClassTag such as list?


 The following code is modified from StatefulNetworkWordCount in the examples package:
if (args.length < 2) {
  System.err.println("Usage: StatefulNetworkWordBiggest3Vaules <hostname> <port>")
  System.exit(1)
}
/**
 * state is min(max(3))
 */

val updateFunc = (key:String,values: Seq[Seq[Int]], state: Seq[Int]) => {
  values(0)
}

val newUpdateFunc = (iterator: Iterator[(String, Seq[Seq[Int]], Seq[Int])]) => {
  iterator.flatMap(t => updateFunc(t._1,t._2, t._3).map(s => (t._1, s)))
}

val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(".")

// Initial RDD input to updateStateByKey
val initialRDD = ssc.sparkContext.parallelize(List(("hello", Seq(1)), ("world", 
Seq(1))))

// Create a ReceiverInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
val lines = ssc.socketTextStream(args(0), args(1).toInt)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, scala.util.Random.nextInt(1000)))
// Update the cumulative count using updateStateByKey
// This will give a Dstream made of state (which is the cumulative count of the 
words)
val stateDstream = wordDstream.updateStateByKey[Seq[Int]](newUpdateFunc,
  new HashPartitioner (ssc.sparkContext.defaultParallelism))
stateDstream.print()
ssc.start()
ssc.awaitTermination() 

the compile error:
Error:(77, 52) overloaded method value updateStateByKey with alternatives: 
(updateFunc: Iterator[(String, Seq[Int], Option[Seq[Int]])] => 
Iterator[(String, Seq[Int])],partitioner: 
org.apache.spark.Partitioner,rememberPartitioner: Boolean,initialRDD: 
org.apache.spark.rdd.RDD[(String, Seq[Int])])(implicit evidence$7: 
scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String,
 Seq[Int])]  
(updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]],partitioner: 
org.apache.spark.Partitioner,initialRDD: org.apache.spark.rdd.RDD[(String, 
Seq[Int])])(implicit evidence$6: 
scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String,
 Seq[Int])]  
(updateFunc: Iterator[(String, Seq[Int], Option[Seq[Int]])] => 
Iterator[(String, Seq[Int])],partitioner: 
org.apache.spark.Partitioner,rememberPartitioner: Boolean)(implicit evidence$5: 
scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String,
 Seq[Int])]  
(updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]],partitioner: 
org.apache.spark.Partitioner)(implicit evidence$4: 
scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String,
 Seq[Int])]  
(updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]],numPartitions: 
Int)(implicit evidence$3: 
scala.reflect.ClassTa

Re: Re: spark streaming updateStateByKey state is nonsupport other type except ClassTag such as list?

2015-12-22 Thread our...@cnsuning.com


 The following code is modified from StatefulNetworkWordCount in the examples package:
if (args.length < 2) {
  System.err.println("Usage: StatefulNetworkWordBiggest3Vaules <hostname> <port>")
  System.exit(1)
}
/**
 * state is min(max(3))
 */

val updateFunc = (key:String,values: Seq[Seq[Int]], state: Seq[Int]) => {
  values(0)
}

val newUpdateFunc = (iterator: Iterator[(String, Seq[Seq[Int]], Seq[Int])]) => {
  iterator.flatMap(t => updateFunc(t._1,t._2, t._3).map(s => (t._1, s)))
}

val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(".")

// Initial RDD input to updateStateByKey
val initialRDD = ssc.sparkContext.parallelize(List(("hello", Seq(1)), ("world", 
Seq(1))))

// Create a ReceiverInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
val lines = ssc.socketTextStream(args(0), args(1).toInt)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, scala.util.Random.nextInt(1000)))
// Update the cumulative count using updateStateByKey
// This will give a Dstream made of state (which is the cumulative count of the 
words)
val stateDstream = wordDstream.updateStateByKey[Seq[Int]](newUpdateFunc,
  new HashPartitioner (ssc.sparkContext.defaultParallelism))
stateDstream.print()
ssc.start()
ssc.awaitTermination() 

the compile error:
Error:(77, 52) overloaded method value updateStateByKey with alternatives: 
(updateFunc: Iterator[(String, Seq[Int], Option[Seq[Int]])] => 
Iterator[(String, Seq[Int])],partitioner: 
org.apache.spark.Partitioner,rememberPartitioner: Boolean,initialRDD: 
org.apache.spark.rdd.RDD[(String, Seq[Int])])(implicit evidence$7: 
scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String,
 Seq[Int])]  
(updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]],partitioner: 
org.apache.spark.Partitioner,initialRDD: org.apache.spark.rdd.RDD[(String, 
Seq[Int])])(implicit evidence$6: 
scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String,
 Seq[Int])]  
(updateFunc: Iterator[(String, Seq[Int], Option[Seq[Int]])] => 
Iterator[(String, Seq[Int])],partitioner: 
org.apache.spark.Partitioner,rememberPartitioner: Boolean)(implicit evidence$5: 
scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String,
 Seq[Int])]  
(updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]],partitioner: 
org.apache.spark.Partitioner)(implicit evidence$4: 
scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String,
 Seq[Int])]  
(updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]],numPartitions: 
Int)(implicit evidence$3: 
scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String,
 Seq[Int])]  
(updateFunc: (Seq[Int], Option[Seq[Int]]) => Option[Seq[Int]])(implicit 
evidence$2: 
scala.reflect.ClassTag[Seq[Int]])org.apache.spark.streaming.dstream.DStream[(String,
 Seq[Int])] 
cannot be applied to (Iterator[(String, Seq[Seq[Int]], Seq[Int])] => 
Iterator[(String, Int)], org.apache.spark.HashPartitioner, Boolean, 
org.apache.spark.rdd.RDD[(String, Seq[Int])]) 
val stateDstream = wordDstream.updateStateByKey[Seq[Int]](newUpdateFunc, 
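
The mismatch is visible at the end of the error: the supplied function has type
Iterator[(String, Seq[Seq[Int]], Seq[Int])] => Iterator[(String, Int)], while the
overloads expect the state as Option[Seq[Int]] and a result of
Iterator[(String, Seq[Int])]. A minimal sketch of update functions with matching
types (the same shape as the code in the reply above that passed testing):

// State is Option[Seq[Int]]; updateFunc returns Option[Seq[Int]], so flatMapping
// over the Option yields the required Iterator[(String, Seq[Int])].
val updateFunc = (key: String, values: Seq[Seq[Int]], state: Option[Seq[Int]]) =>
  if (values.nonEmpty) Some(values(0)) else Some(state.getOrElse(Seq(0)))

val newUpdateFunc = (iterator: Iterator[(String, Seq[Seq[Int]], Option[Seq[Int]])]) =>
  iterator.flatMap { case (k, vs, st) => updateFunc(k, vs, st).map(s => (k, s)) }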
 
Ricky  Ou(欧   锐)

Department: Suning Commerce, IT HQ Technology Support R&D Center,
Big Data Center, Data Platform Development
tel: 18551600418
email: our...@cnsuning.com 

 
From: Dean Wampler
Date: 2015-12-23 00:46
To: our...@cnsuning.com
CC: user; t...@databricks.com
Subject: Re: spark streaming updateStateByKey state is nonsupport other type 
except ClassTag such as list?
There are ClassTags for Array, List, and Map, as well as for Int, etc. that you 
might have inside those collections. What do you mean by sql? Could you post 
more of your code?

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly)
Typesafe
@deanwampler
http://polyglotprogramming.com

On Mon, Dec 21, 2015 at 8:51 PM, our...@cnsuning.com  
wrote:

Spark Streaming updateStateByKey state does not support the Array type without a ClassTag? 
 How can I solve the problem?

def updateStateByKey[S: ClassTag]( 
updateFunc: (Seq[V], Option[S]) => Option[S] 
): DStream[(K, S)] = ssc.withScope { 
updateStateByKey(updateFunc, defaultPartitioner()) 
}

ClassTag does not support other types, e.g. hashmap, list, sql.


My use case is as follows:
 save the latest three click logs of collected goods from different 
topics with the same member ID; the system will then recommend related products 
according to these latest three click logs of collected goods.
I want to use updateStateByKey state to save them; however, updateStateByKey 
state does not seem to support types other than ClassTag-backed ones, such as list.
 
Ricky  Ou(欧   锐)






Re: Re: spark streaming updateStateByKey state is nonsupport other type except ClassTag such as list?

2015-12-22 Thread our...@cnsuning.com
So sorry, it should be Seq, not sql. Thanks for your help.





Ricky  Ou(欧   锐)


 
From: Dean Wampler
Date: 2015-12-23 00:46
To: our...@cnsuning.com
CC: user; t...@databricks.com
Subject: Re: spark streaming updateStateByKey state is nonsupport other type 
except ClassTag such as list?
There are ClassTags for Array, List, and Map, as well as for Int, etc. that you 
might have inside those collections. What do you mean by sql? Could you post 
more of your code?

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly)
Typesafe
@deanwampler
http://polyglotprogramming.com

On Mon, Dec 21, 2015 at 8:51 PM, our...@cnsuning.com  
wrote:

Spark Streaming updateStateByKey state does not support the Array type without a ClassTag? 
 How can I solve the problem?

def updateStateByKey[S: ClassTag]( 
updateFunc: (Seq[V], Option[S]) => Option[S] 
): DStream[(K, S)] = ssc.withScope { 
updateStateByKey(updateFunc, defaultPartitioner()) 
}

ClassTag does not support other types, e.g. hashmap, list, sql.


My use case is as follows:
 save the latest three click logs of collected goods from different 
topics with the same member ID; the system will then recommend related products 
according to these latest three click logs of collected goods.
I want to use updateStateByKey state to save them; however, updateStateByKey 
state does not seem to support types other than ClassTag-backed ones, such as list.
 
Ricky  Ou(欧   锐)






spark streaming updateStateByKey state is nonsupport other type except ClassTag such as list?

2015-12-21 Thread our...@cnsuning.com

Spark Streaming updateStateByKey state does not support the Array type without a ClassTag? 
 How can I solve the problem?

def updateStateByKey[S: ClassTag]( 
updateFunc: (Seq[V], Option[S]) => Option[S] 
): DStream[(K, S)] = ssc.withScope { 
updateStateByKey(updateFunc, defaultPartitioner()) 
}

ClassTag does not support other types, e.g. hashmap, list, sql.


My use case is as follows:
 save the latest three click logs of collected goods from different 
topics with the same member ID; the system will then recommend related products 
according to these latest three click logs of collected goods.
I want to use updateStateByKey state to save them; however, updateStateByKey 
state does not seem to support types other than ClassTag-backed ones, such as list.
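
A minimal sketch, runnable in a Scala REPL, showing that a ClassTag can in fact
be materialized for collection types, so a state of type Seq or List satisfies the
[S: ClassTag] bound of updateStateByKey (the concrete key/value types below are
illustrative assumptions, not part of the original code):

import scala.reflect.ClassTag

// The compiler can materialize ClassTags for collection types, not only primitives.
val seqTag  = implicitly[ClassTag[Seq[Int]]]
val listTag = implicitly[ClassTag[List[(String, Int)]]]

// An update function that keeps the latest three values as List state; this shape
// satisfies updateStateByKey[S: ClassTag] with S = List[Int].
def keepLatestThree(values: Seq[Int], state: Option[List[Int]]): Option[List[Int]] =
  Some((values.toList ++ state.getOrElse(Nil)).take(3))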
 



thanks for 
your help
Ricky  Ou(欧   锐)





spark sql throw java.lang.ArrayIndexOutOfBoundsException when use table.*

2015-11-29 Thread our...@cnsuning.com
hi all,
 A java.lang.ArrayIndexOutOfBoundsException is thrown when I use the following
Spark SQL on Spark standalone or YARN.
   The SQL:
select ta.* 
from bi_td.dm_price_seg_td tb 
join bi_sor.sor_ord_detail_tf ta 
on 1 = 1 
where ta.sale_dt = '20140514' 
and ta.sale_price >= tb.pri_from 
and ta.sale_price < tb.pri_to limit 10 ; 

But the result is correct when not using *, as follows:
select ta.sale_dt 
from bi_td.dm_price_seg_td tb 
join bi_sor.sor_ord_detail_tf ta 
on 1 = 1 
where ta.sale_dt = '20140514' 
and ta.sale_price >= tb.pri_from 
and ta.sale_price < tb.pri_to limit 10 ; 
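
As a hedged workaround sketch while ta.* fails: enumerate the needed columns
explicitly, as in the single-column form above (sale_price is added here only
because the predicates already reference it), e.g. from spark-shell with a
HiveContext:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// Same join as above, but with an explicit column list instead of ta.*
hiveContext.sql("""
  select ta.sale_dt, ta.sale_price
  from bi_td.dm_price_seg_td tb
  join bi_sor.sor_ord_detail_tf ta
  on 1 = 1
  where ta.sale_dt = '20140514'
    and ta.sale_price >= tb.pri_from
    and ta.sale_price < tb.pri_to
  limit 10
""").show()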

The standalone version is 1.4.0 and the Spark-on-YARN version is 1.5.2.
error log:
15/11/30 14:19:59 ERROR SparkSQLDriver: Failed in [select ta.* 
from bi_td.dm_price_seg_td tb 
join bi_sor.sor_ord_detail_tf ta 
on 1 = 1 
where ta.sale_dt = '20140514' 
and ta.sale_price >= tb.pri_from 
and ta.sale_price < tb.pri_to limit 10 ] 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 
3, namenode2-sit.cnsuning.com): java.lang.ArrayIndexOutOfBoundsException 

Driver stacktrace: 
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
 
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
 
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
 
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270) 
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
 
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
 
at scala.Option.foreach(Option.scala:236) 
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
 
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
 
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
 
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
 
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850) 
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215) 
at 
org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207) 
at 
org.apache.spark.sql.hive.HiveContext$QueryExecution.stringResult(HiveContext.scala:587)
 
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
 
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:308)
 
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376) 
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:311) 
at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:409) 
at org.apache.hadoop.hive.cli.CliDriver.processFile(CliDriver.java:425) 
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:166)
 
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 
at java.lang.reflect.Method.invoke(Method.java:606) 
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
 
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.ArrayIndexOutOfBoundsException 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 
3, namenode2-sit.cnsuning.com): java.lang.ArrayIndexOutOfBoundsException 

Driver stacktrace: 
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
 
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
 
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
 
at 
sca

Re: Re: --jars option using hdfs jars cannot effect when spark standlone deploymode with cluster

2015-11-03 Thread our...@cnsuning.com
Akhil,

 If placed locally, every node must have the same jar, because the driver will be
assigned to a random node; otherwise the driver log will report that no jar was
found.
Ricky  Ou(欧   锐)


 
From: Akhil Das
Date: 2015-11-02 17:59
To: our...@cnsuning.com
CC: user; 494165115
Subject: Re: --jars option using hdfs jars cannot effect when spark standlone 
deploymode with cluster
Can you give a try putting the jar locally without hdfs?

Thanks
Best Regards

On Wed, Oct 28, 2015 at 8:40 AM, our...@cnsuning.com  
wrote:
hi all,
   when using the command:
spark-submit --deploy-mode cluster --jars hdfs:///user/spark/cypher.jar 
--class com.suning.spark.jdbc.MysqlJdbcTest hdfs:///user/spark/MysqlJdbcTest.jar
the program throws an exception that it cannot find a class in cypher.jar, and the driver 
log shows that --jars were not downloaded in cluster mode. Is a fat jar the only option?
  
Ricky  Ou(欧   锐)





--jars option using hdfs jars cannot effect when spark standlone deploymode with cluster

2015-10-27 Thread our...@cnsuning.com
hi all,
   when using the command:
spark-submit --deploy-mode cluster --jars hdfs:///user/spark/cypher.jar 
--class com.suning.spark.jdbc.MysqlJdbcTest hdfs:///user/spark/MysqlJdbcTest.jar
the program throws an exception that it cannot find a class in cypher.jar, and the driver 
log shows that --jars were not downloaded in cluster mode. Is a fat jar the only option?
  
Ricky  Ou(欧   锐)




Re: sometimes No event logs found for application using same JavaSparkSQL example

2015-09-25 Thread our...@cnsuning.com
https://issues.apache.org/jira/browse/SPARK-10832






 
From: our...@cnsuning.com
Date: 2015-09-25 20:36
To: user
CC: 494165115
Subject: sometimes No event logs found for application using same JavaSparkSQL 
example
hi all,
   When using the JavaSparkSQL example, the code was submitted many times as follows:
/home/spark/software/spark/bin/spark-submit --deploy-mode cluster --class 
org.apache.spark.examples.sql.JavaSparkSQL 
hdfs://SuningHadoop2/user/spark/lib/spark-examples-1.4.0-hadoop2.4.0.jar

Unfortunately, the completed-applications web page sometimes shows "No event 
logs found for application", but the majority of the same applications are normal. 
Detailed information is given in JIRA SPARK-10832.



sometimes No event logs found for application using same JavaSparkSQL example

2015-09-25 Thread our...@cnsuning.com
hi all,
   When using the JavaSparkSQL example, the code was submitted many times as follows:
/home/spark/software/spark/bin/spark-submit --deploy-mode cluster --class 
org.apache.spark.examples.sql.JavaSparkSQL 
hdfs://SuningHadoop2/user/spark/lib/spark-examples-1.4.0-hadoop2.4.0.jar

Unfortunately, the completed-applications web page sometimes shows "No event 
logs found for application", but the majority of the same applications are normal. 
Detailed information is given in JIRA SPARK-10832.



Re: Re: Job aborted due to stage failure: java.lang.StringIndexOutOfBoundsException: String index out of range: 18

2015-08-28 Thread our...@cnsuning.com
Terry:
Unfortunately, the error remains after using your advice, but it has changed; the 
error is now java.lang.ArrayIndexOutOfBoundsException: 71.
  The error log is as follows:
15/08/28 19:13:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have 
all completed, from pool 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 
9, 10.104.74.7): java.lang.ArrayIndexOutOfBoundsException: 71 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(:23) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(:23) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at 
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130)
 
at 
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
 
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
 
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
at org.apache.spark.scheduler.Task.run(Task.scala:70) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
 
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
 
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
 
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) 
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
 
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
 
at scala.Option.foreach(Option.scala:236) 
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
 
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
 
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
 
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
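
ArrayIndexOutOfBoundsException: 71 inside the closure usually means that some
input lines split into fewer than 72 tab-separated fields. A hedged sketch,
assuming the same input path and that the schema above has 72 columns (the exact
count is an assumption, since the schema string is truncated here), which drops
short lines before building Rows:

import org.apache.spark.sql.Row

val expectedFields = 72  // assumption: one field per column of the schema above
val rowRDD = sc.textFile("/user/spark/short_model")
  .map(_.split("\\t", -1))             // limit -1 keeps trailing empty fields
  .filter(_.length >= expectedFields)  // skip malformed short lines instead of failing
  .map(fields => Row.fromSeq(fields.take(expectedFields).toSeq))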

From: Terry Hole
Date: 2015-08-28 17:22
To: our...@cnsuning.com
CC: user; hao.cheng; Huang, Jie
Subject: Re: Job aborted due to stage failure: 
java.lang.StringIndexOutOfBoundsException: String index out of range: 18
Ricky,


You may need to use map instead of flatMap in your case:
val rowRDD = sc.textFile("/user/spark/short_model").map(_.split("\\t")).map(p => 
Row(...))
Thanks!
-Terry

On Fri, Aug 28, 2015 at 5:08 PM, our...@cnsuning.com  
wrote:
hi all,
 
When using Spark SQL, a problem has been bothering me.

The code is as follows:
 val schemaString = 
"visitor_id,cust_num,gds_id,l1_gds_group_cd,l4_gds_group_cd,pc_gds_addcart,pc_gds_collect_num,pc_gds_four_page_pv,pc_gds_four_page_time,pc_gds_four_page_fromsearch_pv,pc_gds_four_page_fromlist_pv,pc_gds_four_page_fromrec_pv,pc_gds_four_page_fromcuxiao_pv,pc_four_page_num,pc_group_gds_addcart,pc_group_gds_collect,pc_group_fourpage_pv,pc_group_fourpage_time,pc_visitor_pv,pc_search_pv,pc_list_pv,pc_is_view1,pc_is_view,pc_view_cycle_days,pc_view_days,wap_gds_addcart,wap_gds_collect_num,wap_gds_four_page_pv,wap_gds_four_page_time,wap_gds_four_page_fromsearch_pv,wap_gds_four_page_fromlist_pv,wap_gds_four_page_fromrec_pv,wap_gds_four_page_fromcuxiao_pv,wap_four_page_num,wap_group_gds_addcart,wap_group_gds_collect,wap_group_fourpage_pv,wap_group_fourpage_time,wap_visitor_pv,wap_search_pv,wap_list_pv,wap_is_view1,wap_is_view,wap_view_cycle_days,wap_view_days,app_gds_addcart,app_gds_collect_num,app_gds_four_page_pv,app_gds_four_page_time,app_gds_four_page_fromsearch_pv,app_gds_four_page_fromlist_pv,app_gds_four_page_fromrec_pv,app_gds_four_page_fro

Job aborted due to stage failure: java.lang.StringIndexOutOfBoundsException: String index out of range: 18

2015-08-28 Thread our...@cnsuning.com
ng.StringIndexOutOfBoundsException (String index 
out of range: 18) [duplicate 2]
15/08/28 17:00:54 INFO TaskSetManager: Starting task 45.1 in stage 9.0 (TID 74, 
10.104.74.6, NODE_LOCAL, 1415 bytes)
15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.2 in stage 9.0 (TID 73) on 
executor 10.104.74.8: java.lang.StringIndexOutOfBoundsException (String index 
out of range: 18) [duplicate 3]
15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.3 in stage 9.0 (TID 75, 
10.104.74.8, NODE_LOCAL, 1415 bytes)
15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.3 in stage 9.0 (TID 75) on 
executor 10.104.74.8: java.lang.StringIndexOutOfBoundsException (String index 
out of range: 18) [duplicate 4]
15/08/28 17:00:54 ERROR TaskSetManager: Task 56 in stage 9.0 failed 4 times; 
aborting job
15/08/28 17:00:54 INFO TaskSetManager: Lost task 45.1 in stage 9.0 (TID 74) on 
executor 10.104.74.6: java.lang.StringIndexOutOfBoundsException (String index 
out of range: 18) [duplicate 5]
15/08/28 17:00:54 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have 
all completed, from pool
15/08/28 17:00:54 INFO TaskSchedulerImpl: Cancelling stage 9
15/08/28 17:00:54 INFO DAGScheduler: ShuffleMapStage 9 (collect at 
:31) failed in 0.206 s
15/08/28 17:00:54 INFO DAGScheduler: Job 6 failed: collect at :31, 
took 0.293903 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 56 in 
stage 9.0 failed 4 times, most recent failure: Lost task 56.3 in stage 9.0 (TID 
75, 10.104.74.8): java.lang.StringIndexOutOfBoundsException: String index out 
of range: 18
at java.lang.String.charAt(String.java:658)
at scala.collection.immutable.StringOps$.apply$extension(StringOps.scala:39)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(:26)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(:26)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at 
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130)
at 
org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
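
The StringOps.apply / String.charAt frames above indicate the closure reads
character position 18 of a field that is shorter than 19 characters. A small
defensive sketch (charAtOrElse is a hypothetical helper, not part of the
original code):

// Hypothetical helper: return the character at position i, or a default when the
// string is null or too short, instead of throwing StringIndexOutOfBoundsException.
def charAtOrElse(s: String, i: Int, default: Char = '0'): Char =
  if (s != null && s.length > i) s.charAt(i) else default

// e.g. charAtOrElse(fields(5), 18) inside the map, where fields(5) is illustrative.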

Ricky  Ou(欧   锐)

Department: Suning Commerce, IT HQ Technology Support R&D Center,
Big Data Center, Data Platform Development

email  : our...@cnsuning.com