Re: Upgrading from Spark SQL 3.2 to 3.3 failed

2023-02-15 Thread lk_spark
If I wrap the computed expression with a cast function, then explaining the
SQL works, for example:

cast(a.Split_Amt * b.percent / 100 as decimal(20,8)) as split_amt



I don't know why. Is there a config property that provides compatibility with
Spark 3.2?
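Below is a hedged sketch of the workaround described above, applied to a full statement; the table and join names are illustrative, not from the original query. Only the explicit cast around the computed expression matters.

val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()
spark.sql("""
  INSERT INTO target_table
  SELECT cast(a.Split_Amt * b.percent / 100 as decimal(20,8)) as split_amt
  FROM table_a a
  JOIN table_b b ON a.id = b.id
""")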










At 2023-02-16 13:47:25, "lk_spark"  wrote:

hi, all:
I have a SQL statement which runs on Spark 3.2.1 but not on Spark 3.3.1.
When I try to explain it, I get an error with the message:
org.apache.spark.sql.catalyst.expressions.Literal cannot be cast to 
org.apache.spark.sql.catalyst.expressions.AnsiCast


When executing the SQL, the error stack is:


ERROR ExecuteStatement: Error operating ExecuteStatement: 
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.Literal 
cannot be cast to org.apache.spark.sql.catalyst.expressions.AnsiCast
at 
org.apache.spark.sql.catalyst.expressions.CheckOverflowInTableInsert.withNewChildInternal(Cast.scala:2362)
at 
org.apache.spark.sql.catalyst.expressions.CheckOverflowInTableInsert.withNewChildInternal(Cast.scala:2360)
at 
org.apache.spark.sql.catalyst.trees.UnaryLike.$anonfun$mapChildren$5(TreeNode.scala:1233)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at 
org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1232)
at 
org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1227)
at 
org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:498)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:635)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:635)
at 
org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1228)
at 
org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1227)
at 
org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:498)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:635)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:188)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:200)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:200)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:211)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:216)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:216)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:221)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:427)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:221)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUpWithPruning(QueryPlan.scala:188)
at 
org.apache.spark.sql.catalyst.optimizer.NullPropagation$$anonfun$apply$19.applyOrElse(expressions.scala:804)
at 
org.apache.spark.sql.catalyst.optimizer.NullPropagation$$anonfun$apply$19.applyOrElse(expressions.scala:801)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode

Upgrading from Spark SQL 3.2 to 3.3 failed

2023-02-15 Thread lk_spark
hi, all:
I have a SQL statement which runs on Spark 3.2.1 but not on Spark 3.3.1.
When I try to explain it, I get an error with the message:
org.apache.spark.sql.catalyst.expressions.Literal cannot be cast to 
org.apache.spark.sql.catalyst.expressions.AnsiCast


When executing the SQL, the error stack is:


ERROR ExecuteStatement: Error operating ExecuteStatement: 
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.Literal 
cannot be cast to org.apache.spark.sql.catalyst.expressions.AnsiCast
at 
org.apache.spark.sql.catalyst.expressions.CheckOverflowInTableInsert.withNewChildInternal(Cast.scala:2362)
at 
org.apache.spark.sql.catalyst.expressions.CheckOverflowInTableInsert.withNewChildInternal(Cast.scala:2360)
at 
org.apache.spark.sql.catalyst.trees.UnaryLike.$anonfun$mapChildren$5(TreeNode.scala:1233)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at 
org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1232)
at 
org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1227)
at 
org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:498)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:635)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:635)
at 
org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1228)
at 
org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1227)
at 
org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:498)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:635)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:188)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:200)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:200)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:211)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:216)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:216)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:221)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:427)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:221)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUpWithPruning(QueryPlan.scala:188)
at 
org.apache.spark.sql.catalyst.optimizer.NullPropagation$$anonfun$apply$19.applyOrElse(expressions.scala:804)
at 
org.apache.spark.sql.catalyst.optimizer.NullPropagation$$anonfun$apply$19.applyOrElse(expressions.scala:801)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:589)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at 

Is 'Stage cancelled because SparkContext was shut down' an error

2022-09-28 Thread lk_spark
hi, all:
  When I try to merge an Iceberg table with Spark, I can see a failed job on the
Spark UI, but the Spark application's final state is SUCCEEDED.
  I submitted an issue: https://github.com/apache/iceberg/issues/5876
  I want to know whether this is a real error. Thanks.

Re: NoSuchMethodError: org.apache.spark.sql.execution.command.CreateViewCommand.copy

2022-03-21 Thread lk_spark
Sorry, it was a problem with my environment.
















At 2022-03-21 14:00:01, "lk_spark"  wrote:

hi, all:
I got a strange error:


bin/spark-shell --deploy-mode client
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
22/03/21 13:51:39 WARN util.Utils: spark.executor.instances less than 
spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please 
update your configs.
22/03/21 13:51:46 WARN util.Utils: spark.executor.instances less than 
spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please 
update your configs.
22/03/21 13:51:46 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: 
Attempted to request executors before the AM has registered!
Spark context Web UI available at http://client-10-0-161-29:4040
Spark context available as 'sc' (master = yarn, app id = 
application_1644825367082_16937).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.1
      /_/
 
Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_281)
Type in expressions to have them evaluated.
Type :help for more information.


scala>  val parqfile = 
spark.read.parquet("/tmp/datax/tmp/python/ods_io_install/ods_io_install/")
parqfile: org.apache.spark.sql.DataFrame = [spid: string, region_rule: string 
... 7 more fields]


scala> parqfile.printSchema
root
 |-- spid: string (nullable = true)
 |-- region_rule: string (nullable = true)
 |-- app_version: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- is_install: string (nullable = true)
 |-- last_install_time: string (nullable = true)
 |-- last_uninstall_time: string (nullable = true)
 |-- last_use_time: string (nullable = true)
 |-- pdate: integer (nullable = true)




scala> parqfile.show(2)
+-----+--------------------------------------+-----------+--------------------+----------+-----------------+-------------------+--------------------+--------+
| spid|                           region_rule|app_version|           device_id|is_install|last_install_time|last_uninstall_time|       last_use_time|   pdate|
+-----+--------------------------------------+-----------+--------------------+----------+-----------------+-------------------+--------------------+--------+
|13025|  北京市房屋建筑与装饰工程预算定额计...|   1.0.29.2|ea68f0cc-7038-43a...|         1|             null|               null|2021-06-05 11:49:...|20220320|
|13025|  山东省建筑工程消耗量定额计算规则(...|   1.0.31.0|c16e1260-5700-4a4...|         1|             null|               null|2022-01-08 17:55:...|20220320|
+-----+--------------------------------------+-----------+--------------------+----------+-----------------+-------------------+--------------------+--------+
only showing top 2 rows




scala> parqfile.createOrReplaceTempView("ods_io_install_temp")
22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function mask 
replaced a previously registered function.
22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function mask_hash 
replaced a previously registered function.
22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function 
mask_first_n replaced a previously registered function.
22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function 
mask_last_n replaced a previously registered function.
22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function 
mask_show_last_n replaced a previously registered function.
22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function 
mask_show_first_n replaced a previously registered function.
java.lang.NoSuchMethodError: 
org.apache.spark.sql.execution.command.CreateViewCommand.copy(Lorg/apache/spark/sql/catalyst/TableIdentifier;Lscala/collection/Seq;Lscala/Option;Lscala/collection/immutable/Map;Lscala/Option;Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;ZZLorg/apache/spark/sql/catalyst/analysis/ViewType;Z)Lorg/apache/spark/sql/execution/command/CreateViewCommand;
  at 
org.apache.spark.sql.catalyst.optimizer.SubmarineRowFilterExtension.apply(SubmarineRowFilterExtension.scala:125)
  at 
org.apache.spark.sql.catalyst.optimizer.SubmarineRowFilterExtension.apply(SubmarineRowFilterExtension.scala:41)
  at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
  at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
  at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
  at scala.collection.immutable.List.foldLeft(List.scala:91)
  at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
  at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
  at scala.collection.immutable.List.foreach(List.scala:431)
  at 
org.apache.spark.sql.cataly

NoSuchMethodError: org.apache.spark.sql.execution.command.CreateViewCommand.copy

2022-03-21 Thread lk_spark
hi, all:
I got a strange error:


bin/spark-shell --deploy-mode client
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
22/03/21 13:51:39 WARN util.Utils: spark.executor.instances less than 
spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please 
update your configs.
22/03/21 13:51:46 WARN util.Utils: spark.executor.instances less than 
spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please 
update your configs.
22/03/21 13:51:46 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: 
Attempted to request executors before the AM has registered!
Spark context Web UI available at http://client-10-0-161-29:4040
Spark context available as 'sc' (master = yarn, app id = 
application_1644825367082_16937).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.1
      /_/
 
Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_281)
Type in expressions to have them evaluated.
Type :help for more information.


scala>  val parqfile = 
spark.read.parquet("/tmp/datax/tmp/python/ods_io_install/ods_io_install/")
parqfile: org.apache.spark.sql.DataFrame = [spid: string, region_rule: string 
... 7 more fields]


scala> parqfile.printSchema
root
 |-- spid: string (nullable = true)
 |-- region_rule: string (nullable = true)
 |-- app_version: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- is_install: string (nullable = true)
 |-- last_install_time: string (nullable = true)
 |-- last_uninstall_time: string (nullable = true)
 |-- last_use_time: string (nullable = true)
 |-- pdate: integer (nullable = true)




scala> parqfile.show(2)
+-----+--------------------------------------+-----------+--------------------+----------+-----------------+-------------------+--------------------+--------+
| spid|                           region_rule|app_version|           device_id|is_install|last_install_time|last_uninstall_time|       last_use_time|   pdate|
+-----+--------------------------------------+-----------+--------------------+----------+-----------------+-------------------+--------------------+--------+
|13025|  北京市房屋建筑与装饰工程预算定额计...|   1.0.29.2|ea68f0cc-7038-43a...|         1|             null|               null|2021-06-05 11:49:...|20220320|
|13025|  山东省建筑工程消耗量定额计算规则(...|   1.0.31.0|c16e1260-5700-4a4...|         1|             null|               null|2022-01-08 17:55:...|20220320|
+-----+--------------------------------------+-----------+--------------------+----------+-----------------+-------------------+--------------------+--------+
only showing top 2 rows




scala> parqfile.createOrReplaceTempView("ods_io_install_temp")
22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function mask 
replaced a previously registered function.
22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function mask_hash 
replaced a previously registered function.
22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function 
mask_first_n replaced a previously registered function.
22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function 
mask_last_n replaced a previously registered function.
22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function 
mask_show_last_n replaced a previously registered function.
22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function 
mask_show_first_n replaced a previously registered function.
java.lang.NoSuchMethodError: 
org.apache.spark.sql.execution.command.CreateViewCommand.copy(Lorg/apache/spark/sql/catalyst/TableIdentifier;Lscala/collection/Seq;Lscala/Option;Lscala/collection/immutable/Map;Lscala/Option;Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;ZZLorg/apache/spark/sql/catalyst/analysis/ViewType;Z)Lorg/apache/spark/sql/execution/command/CreateViewCommand;
  at 
org.apache.spark.sql.catalyst.optimizer.SubmarineRowFilterExtension.apply(SubmarineRowFilterExtension.scala:125)
  at 
org.apache.spark.sql.catalyst.optimizer.SubmarineRowFilterExtension.apply(SubmarineRowFilterExtension.scala:41)
  at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
  at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
  at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
  at scala.collection.immutable.List.foldLeft(List.scala:91)
  at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
  at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
  at scala.collection.immutable.List.foreach(List.scala:431)
  at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
  at 

Why does an NPE happen with multi-threading in cluster mode but not client mode

2020-12-02 Thread lk_spark
hi, all:
   I'm using Spark 2.4 and trying to use the SparkContext from multiple threads. I
found an example:
https://hadoopist.wordpress.com/2017/02/03/how-to-use-threads-in-spark-job-to-achieve-parallel-read-and-writes/
   Some of the code looks like this:
   for (a <- 0 until 4) {
     val thread = new Thread {
       override def run() {
         sc.parallelize(Array("ddd", "eee", "fff")).toDF()
           .write.format("parquet").mode("overWrite").save("/tmp/vgiri/file2")
   .
   
   When I run the code in local or client mode, it works, but in cluster mode I hit
the error described in SPARK-29046:
https://issues.apache.org/jira/browse/SPARK-29046
   Finally, I used a java.util.concurrent.Semaphore to make the main thread wait for
all the sub-threads to finish before stopping the SparkContext, and then the code
worked in cluster mode, as sketched below.
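Here is a hedged sketch of that Semaphore approach, assuming `sc` and `spark` come from the running application (as in a spark-shell session); the data is the same as above, and the per-thread output path is illustrative:

import java.util.concurrent.Semaphore
import spark.implicits._

val numThreads = 4
val done = new Semaphore(0)
for (a <- 0 until numThreads) {
  new Thread {
    override def run(): Unit = {
      try {
        // each thread writes its own output path (path is illustrative)
        sc.parallelize(Array("ddd", "eee", "fff")).toDF()
          .write.format("parquet").mode("overwrite").save(s"/tmp/vgiri/file$a")
      } finally {
        done.release()   // signal completion even if the write fails
      }
    }
  }.start()
}
done.acquire(numThreads)   // main thread blocks until all worker threads finish
sc.stop()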


   But I don't understand why the code also works in local or client mode even
though I didn't use a Semaphore.
   



Re: how spark structured streaming writes to kudu

2019-11-25 Thread lk_spark
I found that _sqlContext is null. How can I resolve it?

2019-11-25 

lk_spark 



发件人:"lk_spark"
发送时间:2019-11-25 16:00
主题:how spark structrued stream write to kudu
收件人:"user.spark"
抄送:

hi, all:
   I'm using Spark 2.4.4 to read streaming data from Kafka and want to write it to
Kudu 1.7.0. My code looks like this:

val kuduContext = new KuduContext("master:7051", spark.sparkContext)

val console = cnew.select("*").as[CstoreNew]
  .writeStream
  .option("checkpointLocation", "/tmp/t3/")
  .trigger(Trigger.Once())
  .foreach(new ForeachWriter[CstoreNew] {
    override def open(partitionId: Long, version: Long): Boolean = {
      true
    }
    override def process(value: CstoreNew): Unit = {
      val spark = SparkSessionSingleton.getInstance(sparkConf)
      val valueDF = Seq(value).toDF()   // GET WRONG
      kuduContext.upsertRows(valueDF, "impala::test.cstore_bury_event_data")
    }
    override def close(errorOrNull: Throwable): Unit = {
    }
  })
val query = console.start()
query.awaitTermination()

When execution reaches val valueDF = Seq(value).toDF(), I get this error message:
Caused by: java.lang.NullPointerException
 at 
org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:228)
 at 
com.gaojihealth.spark.kafkaconsumer.CstoreNew2KUDU$$anon$1.process(CstoreNew2KUDU.scala:122)
...

and SQLImplicits.scala:228 is:

227:   implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
228:     DatasetHolder(_sqlContext.createDataset(s))
229:   }

can anyone give me some help?
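A hedged alternative sketch, not from the thread: on Spark 2.4 the stream can be written with foreachBatch, which hands each micro-batch to the driver as a Dataset, so kuduContext.upsertRows can be called on the whole batch instead of converting single rows with Seq(value).toDF() (which is where the null _sqlContext shows up). The names reuse the snippet above; a typed function value is used to keep overload resolution unambiguous.

val upsertBatch: (org.apache.spark.sql.Dataset[CstoreNew], Long) => Unit =
  (batchDS, batchId) =>
    // runs on the driver; upsert the whole micro-batch at once
    kuduContext.upsertRows(batchDS.toDF(), "impala::test.cstore_bury_event_data")

val query = cnew.select("*").as[CstoreNew]
  .writeStream
  .option("checkpointLocation", "/tmp/t3/")
  .trigger(Trigger.Once())
  .foreachBatch(upsertBatch)
  .start()
query.awaitTermination()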
2019-11-25


lk_spark 

how spark structured streaming writes to kudu

2019-11-25 Thread lk_spark
hi, all:
   I'm using Spark 2.4.4 to read streaming data from Kafka and want to write it to
Kudu 1.7.0. My code looks like this:

val kuduContext = new KuduContext("master:7051", spark.sparkContext)

val console = cnew.select("*").as[CstoreNew]
  .writeStream
  .option("checkpointLocation", "/tmp/t3/")
  .trigger(Trigger.Once())
  .foreach(new ForeachWriter[CstoreNew] {
    override def open(partitionId: Long, version: Long): Boolean = {
      true
    }
    override def process(value: CstoreNew): Unit = {
      val spark = SparkSessionSingleton.getInstance(sparkConf)
      val valueDF = Seq(value).toDF()   // GET WRONG
      kuduContext.upsertRows(valueDF, "impala::test.cstore_bury_event_data")
    }
    override def close(errorOrNull: Throwable): Unit = {
    }
  })
val query = console.start()
query.awaitTermination()

When execution reaches val valueDF = Seq(value).toDF(), I get this error message:
Caused by: java.lang.NullPointerException
 at 
org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:228)
 at 
com.gaojihealth.spark.kafkaconsumer.CstoreNew2KUDU$$anon$1.process(CstoreNew2KUDU.scala:122)
...

and SQLImplicits.scala:228 is:

227:   implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
228:     DatasetHolder(_sqlContext.createDataset(s))
229:   }

can anyone give me some help?
2019-11-25


lk_spark 

how to limit the number of tasks when reading Hive ORC tables

2019-11-11 Thread lk_spark
hi, all:
I have a Hive table STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'. Many of its files are very
small, and when I use Spark to read it, thousands of tasks start. How can I
limit the number of tasks?
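A hedged sketch of one possible approach, assuming the table can be read through spark.table(); coalesce merges the many small input splits into fewer partitions without a shuffle, so the scan stage runs with at most the requested number of tasks. The table name, partition count, and output path are illustrative.

val df = spark.table("my_orc_table").coalesce(200)          // cap the scan at ~200 tasks
df.write.mode("overwrite").parquet("/tmp/compacted_copy")   // or any downstream action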

2019-11-12


lk_spark 

Re: Re: how can I dynamically parse JSON from Kafka when using Structured Streaming

2019-09-17 Thread lk_spark
I want to parse the structure of the data dynamically and then write the data to
Delta Lake; I think it can merge the schema automatically.

2019-09-17 

lk_spark 



From: Tathagata Das
Sent: 2019-09-17 16:13
Subject: Re: how can I dynamically parse JSON from Kafka when using Structured Streaming
To: "lk_spark"
Cc: "user.spark"

You can use from_json built-in SQL function to parse json. 
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.Column-
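For reference, a hedged sketch of the from_json approach, assuming the SparkSession is in scope as `spark` and the schema is known up front (the field names below are illustrative); it reuses the `lines` Dataset[String] from the question:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import spark.implicits._

val schema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)

// parse the JSON string column into a struct, then flatten it
val parsed = lines.select(from_json($"value", schema).as("data")).select("data.*")

Note that from_json needs a fixed schema, which is why a fully dynamic schema (as asked below) is harder.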



On Mon, Sep 16, 2019 at 7:39 PM lk_spark  wrote:

hi, all:
I'm using Structured Streaming to read from Kafka; the data is a JSON string.
I want to parse it and convert it to a DataFrame, but my code doesn't compile
and I don't know how to fix it:

val lines = messages.selectExpr("CAST(value AS STRING) as value").as[String]

val words = lines.map(line => {
  var json: JValue = null
  try {
json = parse(line)
  } catch {
case ex: Exception => { println(ex.getMessage + " " + line) }
  }
  //var result: scala.collection.mutable.Map[String,String] = 
scala.collection.mutable.Map()
  val jsonObj = json.values.asInstanceOf[Map[String, _]]
  val valuse = jsonObj.values.toArray
  val schema = StructType(List())
  for ((k, v) <- jsonObj){
//result += (k -> jsonObj.get(k).toString())

if(v.isInstanceOf[String]){
  schema.add(k,StringType)
}else if (v.isInstanceOf[Int]){
  schema.add(k,IntegerType)
}/*else if (v.isInstanceOf[Array[String]]){
  schema.add(k,ArrayType(StringType))
}else if (v.isInstanceOf[Map[String,String]]){
  schema.add(k,MapType(StringType,StringType))
}*/
  }
  val row =  new GenericRowWithSchema(valuse,schema)
  row
})

Error:(45, 26) Unable to find encoder for type 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema. An implicit 
Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema] is 
needed to store org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 
instances in a Dataset. Primitive types (Int, String, etc) and Product types 
(case classes) are supported by importing spark.implicits._  Support for 
serializing other types will be added in future releases.
val words = lines.map(line => {

Error:(45, 26) not enough arguments for method map: (implicit evidence$6: 
org.apache.spark.sql.Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema])org.apache.spark.sql.Dataset[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema].
Unspecified value parameter evidence$6.
val words = lines.map(line => {



2019-09-17


lk_spark 

how can I dynamically parse JSON from Kafka when using Structured Streaming

2019-09-16 Thread lk_spark
hi, all:
I'm using Structured Streaming to read from Kafka; the data is a JSON string.
I want to parse it and convert it to a DataFrame, but my code doesn't compile
and I don't know how to fix it:

val lines = messages.selectExpr("CAST(value AS STRING) as value").as[String]

val words = lines.map(line => {
  var json: JValue = null
  try {
json = parse(line)
  } catch {
case ex: Exception => { println(ex.getMessage + " " + line) }
  }
  //var result: scala.collection.mutable.Map[String,String] = 
scala.collection.mutable.Map()
  val jsonObj = json.values.asInstanceOf[Map[String, _]]
  val valuse = jsonObj.values.toArray
  val schema = StructType(List())
  for ((k, v) <- jsonObj){
//result += (k -> jsonObj.get(k).toString())

if(v.isInstanceOf[String]){
  schema.add(k,StringType)
}else if (v.isInstanceOf[Int]){
  schema.add(k,IntegerType)
}/*else if (v.isInstanceOf[Array[String]]){
  schema.add(k,ArrayType(StringType))
}else if (v.isInstanceOf[Map[String,String]]){
  schema.add(k,MapType(StringType,StringType))
}*/
  }
  val row =  new GenericRowWithSchema(valuse,schema)
  row
})

Error:(45, 26) Unable to find encoder for type 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema. An implicit 
Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema] is 
needed to store org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 
instances in a Dataset. Primitive types (Int, String, etc) and Product types 
(case classes) are supported by importing spark.implicits._  Support for 
serializing other types will be added in future releases.
val words = lines.map(line => {

Error:(45, 26) not enough arguments for method map: (implicit evidence$6: 
org.apache.spark.sql.Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema])org.apache.spark.sql.Dataset[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema].
Unspecified value parameter evidence$6.
val words = lines.map(line => {



2019-09-17


lk_spark 

how to get spark-sql lineage

2019-05-15 Thread lk_spark
hi, all:
When I use Spark to run some SQL for ETL, how can I get lineage
info? I found that CDH Spark has some configs about lineage:
spark.lineage.enabled=true
spark.lineage.log.dir=/var/log/spark2/lineage
Do they also work for Apache Spark?

2019-05-16


lk_spark 

Re: Re: how to generate a large dataset in parallel

2018-12-14 Thread lk_spark
Sorry; for now, what I can do is something like this:

var df5 = spark.read.parquet("/user/devuser/testdata/df1").coalesce(1)
df5 = df5.union(df5).union(df5).union(df5).union(df5)
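A hedged alternative sketch, not from the thread: spark.range generates the rows directly on the executors, so one hundred million rows can be produced in parallel across the cluster instead of repeatedly unioning a small DataFrame. The column name and output path are illustrative.

import org.apache.spark.sql.functions.{col, concat, lit}

val big = spark.range(0L, 100000000L, 1L, 200)                            // 1e8 rows in 200 partitions
  .withColumn("payload", concat(lit("row-"), col("id").cast("string")))   // add a derived test column
big.write.mode("overwrite").parquet("/user/devuser/testdata/df_big")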

2018-12-14 

lk_spark 



From: 15313776907 <15313776...@163.com>
Sent: 2018-12-14 16:39
Subject: Re: how to generate a large dataset in parallel
To: "lk_sp...@163.com"
Cc: "user@spark.apache.org"



I also have this problem and hope it can be solved here. Thank you.
On 12/14/2018 10:38, lk_spark wrote:
hi, all:
I want to generate some test data containing about one hundred
million rows.
I created a dataset with ten rows and did df.union operations in a 'for' loop,
but this causes the operation to happen only on the driver node.
How can I do it across the whole cluster?

2018-12-14


lk_spark 

Re: Re: how to generate a large dataset in parallel

2018-12-13 Thread lk_spark
I want to generate some data in Spark.

2018-12-14 

lk_spark 



From: Jean Georges Perrin
Sent: 2018-12-14 11:10
Subject: Re: how to generate a large dataset in parallel
To: "lk_spark"
Cc: "user.spark"

You just want to generate some data in Spark or ingest a large dataset outside 
of Spark? What’s the ultimate goal you’re pursuing?


jg



On Dec 13, 2018, at 21:38, lk_spark  wrote:


hi, all:
I want to generate some test data containing about one hundred
million rows.
I created a dataset with ten rows and did df.union operations in a 'for' loop,
but this causes the operation to happen only on the driver node.
How can I do it across the whole cluster?

2018-12-14


lk_spark 

how to generate a large dataset in parallel

2018-12-13 Thread lk_spark
hi, all:
I want to generate some test data containing about one hundred
million rows.
I created a dataset with ten rows and did df.union operations in a 'for' loop,
but this causes the operation to happen only on the driver node.
How can I do it across the whole cluster?

2018-12-14


lk_spark 

Re: about LIVY-424

2018-11-11 Thread lk_spark
I'm using livy-0.5.0 with spark2.3.0,I started  a  session with 4GB mem for 
Driver, And I run code server times :
 var tmp1 = spark.sql("use tpcds_bin_partitioned_orc_2");var tmp2 = 
spark.sql("select count(1) from tpcds_bin_partitioned_orc_2.store_sales").show
the table have 5760749 rows data.
after run about 10 times , the Driver physical memory will beyond 4.5GB and 
killed by yarn.
I saw the old generation memory  keep growing and can not release by gc.

2018-11-12 

lk_spark 



发件人:"lk_hadoop"
发送时间:2018-11-12 09:37
主题:about LIVY-424
收件人:"user"
抄送:

hi, all:
I'm hitting this issue: https://issues.apache.org/jira/browse/LIVY-424
Does anybody know how to resolve it?
2018-11-12


lk_hadoop 

Re: spark2.3 on kubernetes

2018-04-07 Thread lk_spark
Resolved. I needed to add "kubernetes.default.svc" to the k8s API server's TLS config.

2018-04-08 



lk_spark 



发件人:"lk_spark"<lk_sp...@163.com>
发送时间:2018-04-08 11:15
主题:spark2.3 on kubernets
收件人:"user"<user@spark.apache.org>
抄送:

hi, all:
   I am trying Spark on k8s with the Pi sample.
   I got an error from the driver:

   2018-04-08 03:08:40 INFO  SparkContext:54 - Successfully stopped SparkContext
Exception in thread "main" org.apache.spark.SparkException: External scheduler 
cannot be instantiated
 at 
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)
 at org.apache.spark.SparkContext.<init>(SparkContext.scala:492)
 at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)
 at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)
 at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)
 at scala.Option.getOrElse(Option.scala:121)
 at 
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
 at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:31)
 at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: 
[get]  for kind: [Pod]  with name: 
[spark-pi-d439505012c5362ea6d26ea19df58bc1-driver]  in namespace: [default]  
failed.
 at 
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)
 at 
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)
 at 
io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)
 at 
io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)
 at 
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.<init>(KubernetesClusterSchedulerBackend.scala:70)
 at 
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)
 at 
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)
 ... 8 more
Caused by: javax.net.ssl.SSLPeerUnverifiedException: Hostname 
kubernetes.default.svc not verified:
certificate: sha256/Buyjy7p4sLKA7rLJqT0N/Cr3sM82fyi3pmx+eSht0eI=
DN: CN=kube-apiserver, OU=Kubernetes-manual, O=Kubernetes, L=Taipei, 
ST=Taipei, C=TW
subjectAltNames: [10.96.0.1, 192.168.0.140, 127.0.0.1, kubernetes.default]
 at 
okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:290)
 at 
okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:251)
 at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:151)
 at 
okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:195)
 at 
okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
 at 
okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
 at 
okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
 at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
 at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
 at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
 at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
 at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
 at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
 at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
 at 
okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
 at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
 at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
 at 
io.fabric8.kubernetes.client.utils.HttpClientUtils$2.intercept(HttpClientUtils.java:93)
 at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
 at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
 at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
 at okhttp3.RealCall.execute(RealCall.java:69)
 at 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:377)
 at 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:343)
 at 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleGet(OperationSupport.java:312)
 at 
io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleGet(OperationSupport.java:295)
 at 
io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleGet(BaseOperation.java:783)
 at 
io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:217)
 ... 12 more

I

spark2.3 on kubernetes

2018-04-07 Thread lk_spark
es.SparkPi \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.node.selector.spark=true \
--conf 
spark.kubernetes.authenticate.driver.clientKeyFile=/etc/kubernetes/pki/apiserver.pem
 \
--conf spark.kubernetes.container.image=dpaas/apache/spark:2.3.0-v1 \
local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar

2018-04-08


lk_spark 

Re: Re: Re: spark2.1 kafka0.10

2017-06-22 Thread lk_spark
Thank you Kumar, I will try it later.

2017-06-22 

lk_spark 



From: Pralabh Kumar <pralabhku...@gmail.com>
Sent: 2017-06-22 20:20
Subject: Re: Re: spark2.1 kafka0.10
To: "lk_spark"<lk_sp...@163.com>
Cc: "user.spark"<user@spark.apache.org>

It looks like the replicas for your partition are failing. If you have more
brokers, can you try increasing the replicas, just to make sure at least one leader
is always available?


On Thu, Jun 22, 2017 at 10:34 AM, lk_spark <lk_sp...@163.com> wrote:

Each topic has 5 partitions and 2 replicas.

2017-06-22 

lk_spark 



From: Pralabh Kumar <pralabhku...@gmail.com>
Sent: 2017-06-22 17:23
Subject: Re: spark2.1 kafka0.10
To: "lk_spark"<lk_sp...@163.com>
Cc: "user.spark"<user@spark.apache.org>

How many replicas do you have for this topic?


On Thu, Jun 22, 2017 at 9:19 AM, lk_spark <lk_sp...@163.com> wrote:

java.lang.IllegalStateException: No current assignment for partition pages-2
 at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:264)
 at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:336)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1236)
 at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
 at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
 at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
 at scala.Option.orElse(Option.scala:289)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
 at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
 at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
 at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
 at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
 at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
 at scala.util.Try$.apply(Try.scala:192)
 at 
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
 at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


2017-06-22 

lk_spark 



发件人:"lk_spark"<lk_sp...@163.com>
发送时间:2017-06-22 11:13
主题:spark2.1 kafka0.10
收件人:"user.spark"<user@spark.apache.org>
抄送:

hi, all:
When I run the streaming application for a few minutes, I get this error:

17/06/22 10:34:56 INFO ConsumerCoordinator: Revoking previously assigned 
partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, 
weibocomment-0, bizs-2, pages-0, bizs-4, pages-2, weibo-0, pages-4, weibo-4, 
clicks-1, comment-1, weibo-2, clicks-3, weibocomment-4, weibocomment-2, 
profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, bizs-3, pages-1, 
weibo-1, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, 
clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: (Re-)joining group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: Successfully joined group youedata1 
with generation 3
17/06/22 10:34:56 INFO ConsumerCoor

Re: Re: spark2.1 kafka0.10

2017-06-21 Thread lk_spark
Each topic has 5 partitions and 2 replicas.

2017-06-22 

lk_spark 



From: Pralabh Kumar <pralabhku...@gmail.com>
Sent: 2017-06-22 17:23
Subject: Re: spark2.1 kafka0.10
To: "lk_spark"<lk_sp...@163.com>
Cc: "user.spark"<user@spark.apache.org>

How many replicas do you have for this topic?


On Thu, Jun 22, 2017 at 9:19 AM, lk_spark <lk_sp...@163.com> wrote:

java.lang.IllegalStateException: No current assignment for partition pages-2
 at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:264)
 at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:336)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1236)
 at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
 at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
 at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
 at scala.Option.orElse(Option.scala:289)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
 at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
 at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
 at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
 at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
 at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
 at scala.util.Try$.apply(Try.scala:192)
 at 
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
 at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


2017-06-22 

lk_spark 



发件人:"lk_spark"<lk_sp...@163.com>
发送时间:2017-06-22 11:13
主题:spark2.1 kafka0.10
收件人:"user.spark"<user@spark.apache.org>
抄送:

hi, all:
When I run the streaming application for a few minutes, I get this error:

17/06/22 10:34:56 INFO ConsumerCoordinator: Revoking previously assigned 
partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, 
weibocomment-0, bizs-2, pages-0, bizs-4, pages-2, weibo-0, pages-4, weibo-4, 
clicks-1, comment-1, weibo-2, clicks-3, weibocomment-4, weibocomment-2, 
profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, bizs-3, pages-1, 
weibo-1, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, 
clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: (Re-)joining group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: Successfully joined group youedata1 
with generation 3
17/06/22 10:34:56 INFO ConsumerCoordinator: Setting newly assigned partitions 
[comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, weibocomment-0, bizs-2, 
bizs-4, pages-4, weibo-4, clicks-1, comment-1, clicks-3, weibocomment-4, 
weibocomment-2, profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, 
bizs-3, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, 
clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 ERROR JobScheduler: Error generating jobs for time 
1498098896000 ms
java.lang.IllegalStateException: No current assignment for partition pages-2

I don't know why.

2017-06-22


lk_spark 

Re: spark2.1 kafka0.10

2017-06-21 Thread lk_spark
java.lang.IllegalStateException: No current assignment for partition pages-2
 at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:264)
 at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:336)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1236)
 at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
 at 
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
 at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
 at scala.Option.orElse(Option.scala:289)
 at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
 at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
 at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
 at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
 at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
 at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
 at scala.util.Try$.apply(Try.scala:192)
 at 
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
 at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
 at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)


2017-06-22 

lk_spark 



发件人:"lk_spark"<lk_sp...@163.com>
发送时间:2017-06-22 11:13
主题:spark2.1 kafka0.10
收件人:"user.spark"<user@spark.apache.org>
抄送:

hi, all:
When I run the streaming application for a few minutes, I get this error:

17/06/22 10:34:56 INFO ConsumerCoordinator: Revoking previously assigned 
partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, 
weibocomment-0, bizs-2, pages-0, bizs-4, pages-2, weibo-0, pages-4, weibo-4, 
clicks-1, comment-1, weibo-2, clicks-3, weibocomment-4, weibocomment-2, 
profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, bizs-3, pages-1, 
weibo-1, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, 
clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: (Re-)joining group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: Successfully joined group youedata1 
with generation 3
17/06/22 10:34:56 INFO ConsumerCoordinator: Setting newly assigned partitions 
[comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, weibocomment-0, bizs-2, 
bizs-4, pages-4, weibo-4, clicks-1, comment-1, clicks-3, weibocomment-4, 
weibocomment-2, profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, 
bizs-3, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, 
clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 ERROR JobScheduler: Error generating jobs for time 
1498098896000 ms
java.lang.IllegalStateException: No current assignment for partition pages-2

I don't know why.

2017-06-22


lk_spark 

spark2.1 kafka0.10

2017-06-21 Thread lk_spark
hi, all:
When I run the streaming application for a few minutes, I get this error:

17/06/22 10:34:56 INFO ConsumerCoordinator: Revoking previously assigned 
partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, 
weibocomment-0, bizs-2, pages-0, bizs-4, pages-2, weibo-0, pages-4, weibo-4, 
clicks-1, comment-1, weibo-2, clicks-3, weibocomment-4, weibocomment-2, 
profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, bizs-3, pages-1, 
weibo-1, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, 
clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: (Re-)joining group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: Successfully joined group youedata1 
with generation 3
17/06/22 10:34:56 INFO ConsumerCoordinator: Setting newly assigned partitions 
[comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, weibocomment-0, bizs-2, 
bizs-4, pages-4, weibo-4, clicks-1, comment-1, clicks-3, weibocomment-4, 
weibocomment-2, profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, 
bizs-3, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, 
clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 ERROR JobScheduler: Error generating jobs for time 
1498098896000 ms
java.lang.IllegalStateException: No current assignment for partition pages-2

I don't know why.

2017-06-22


lk_spark 

spark2.1 and kafka0.10

2017-06-20 Thread lk_spark
hi, all:
https://issues.apache.org/jira/browse/SPARK-19680

Is there any way to patch this issue? I hit the same problem.

2017-06-20


lk_spark 

Re: Re: Re: how to call udf with parameters

2017-06-15 Thread lk_spark
Thanks Kumar, that is really helpful!!


2017-06-16 

lk_spark 



From: Pralabh Kumar <pralabhku...@gmail.com>
Sent: 2017-06-16 18:30
Subject: Re: Re: how to call udf with parameters
To: "lk_spark"<lk_sp...@163.com>
Cc: "user.spark"<user@spark.apache.org>

val getlength=udf((idx1:Int,idx2:Int, data : String)=> 
data.substring(idx1,idx2))



data.select(getlength(lit(1),lit(2),data("col1"))).collect
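Applying the same lit() pattern to the ssplit2 UDF from the original question gives, as a hedged sketch (assuming spark.implicits._ is in scope for the $ syntax):

import org.apache.spark.sql.functions.lit

val output = input.select(ssplit2($"text", lit(true), lit(true), lit(2)).as("words"))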



On Fri, Jun 16, 2017 at 10:22 AM, Pralabh Kumar <pralabhku...@gmail.com> wrote:

Use lit; give me some time and I'll provide an example.


On 16-Jun-2017 10:15 AM, "lk_spark" <lk_sp...@163.com> wrote:

Thanks Kumar. I want to know how to call a udf with multiple parameters, for
example a udf that implements a substring function. How can I pass parameters for
the begin and end index? I tried it and got errors. Can udf parameters only be of
Column type?

2017-06-16 

lk_spark 



From: Pralabh Kumar <pralabhku...@gmail.com>
Sent: 2017-06-16 17:49
Subject: Re: how to call udf with parameters
To: "lk_spark"<lk_sp...@163.com>
Cc: "user.spark"<user@spark.apache.org>

sample UDF
val getlength=udf((data:String)=>data.length())

data.select(getlength(data("col1")))



On Fri, Jun 16, 2017 at 9:21 AM, lk_spark <lk_sp...@163.com> wrote:

hi, all
 I defined a udf with multiple parameters, but I don't know how to call it
with a DataFrame

UDF:

def ssplit2 = udf { (sentence: String, delNum: Boolean, delEn: Boolean, 
minTermLen: Int) =>
val terms = HanLP.segment(sentence).asScala
.

Call :

scala> val output = input.select(ssplit2($"text",true,true,2).as('words))
:40: error: type mismatch;
 found   : Boolean(true)
 required: org.apache.spark.sql.Column
   val output = input.select(ssplit2($"text",true,true,2).as('words))
 ^
:40: error: type mismatch;
 found   : Boolean(true)
 required: org.apache.spark.sql.Column
   val output = input.select(ssplit2($"text",true,true,2).as('words))
  ^
:40: error: type mismatch;
 found   : Int(2)
 required: org.apache.spark.sql.Column
   val output = input.select(ssplit2($"text",true,true,2).as('words))
   ^

scala> val output = 
input.select(ssplit2($"text",$"true",$"true",$"2").as('words))
org.apache.spark.sql.AnalysisException: cannot resolve '`true`' given input 
columns: [id, text];;
'Project [UDF(text#6, 'true, 'true, '2) AS words#16]
+- Project [_1#2 AS id#5, _2#3 AS text#6]
   +- LocalRelation [_1#2, _2#3]


I need help!!


2017-06-16


lk_spark 

Re: Re: how to call udf with parameters

2017-06-15 Thread lk_spark
Thanks Kumar. I want to know how to call a udf with multiple parameters, for
example a udf that implements a substring function. How can I pass parameters for
the begin and end index? I tried it and got errors. Can udf parameters only be of
Column type?

2017-06-16 

lk_spark 



From: Pralabh Kumar <pralabhku...@gmail.com>
Sent: 2017-06-16 17:49
Subject: Re: how to call udf with parameters
To: "lk_spark"<lk_sp...@163.com>
Cc: "user.spark"<user@spark.apache.org>

sample UDF
val getlength=udf((data:String)=>data.length())

data.select(getlength(data("col1")))



On Fri, Jun 16, 2017 at 9:21 AM, lk_spark <lk_sp...@163.com> wrote:

hi, all
 I defined a udf with multiple parameters, but I don't know how to call it
with a DataFrame

UDF:

def ssplit2 = udf { (sentence: String, delNum: Boolean, delEn: Boolean, 
minTermLen: Int) =>
val terms = HanLP.segment(sentence).asScala
.

Call :

scala> val output = input.select(ssplit2($"text",true,true,2).as('words))
:40: error: type mismatch;
 found   : Boolean(true)
 required: org.apache.spark.sql.Column
   val output = input.select(ssplit2($"text",true,true,2).as('words))
 ^
:40: error: type mismatch;
 found   : Boolean(true)
 required: org.apache.spark.sql.Column
   val output = input.select(ssplit2($"text",true,true,2).as('words))
  ^
:40: error: type mismatch;
 found   : Int(2)
 required: org.apache.spark.sql.Column
   val output = input.select(ssplit2($"text",true,true,2).as('words))
   ^

scala> val output = 
input.select(ssplit2($"text",$"true",$"true",$"2").as('words))
org.apache.spark.sql.AnalysisException: cannot resolve '`true`' given input 
columns: [id, text];;
'Project [UDF(text#6, 'true, 'true, '2) AS words#16]
+- Project [_1#2 AS id#5, _2#3 AS text#6]
   +- LocalRelation [_1#2, _2#3]


I need help!!


2017-06-16


lk_spark 

how to call udf with parameters

2017-06-15 Thread lk_spark
hi,all
 I defined a udf with multiple parameters, but I don't know how to call it with a DataFrame.
 
UDF:

def ssplit2 = udf { (sentence: String, delNum: Boolean, delEn: Boolean, 
minTermLen: Int) =>
val terms = HanLP.segment(sentence).asScala
.

Call :

scala> val output = input.select(ssplit2($"text",true,true,2).as('words))
:40: error: type mismatch;
 found   : Boolean(true)
 required: org.apache.spark.sql.Column
   val output = input.select(ssplit2($"text",true,true,2).as('words))
 ^
:40: error: type mismatch;
 found   : Boolean(true)
 required: org.apache.spark.sql.Column
   val output = input.select(ssplit2($"text",true,true,2).as('words))
  ^
:40: error: type mismatch;
 found   : Int(2)
 required: org.apache.spark.sql.Column
   val output = input.select(ssplit2($"text",true,true,2).as('words))
   ^

scala> val output = 
input.select(ssplit2($"text",$"true",$"true",$"2").as('words))
org.apache.spark.sql.AnalysisException: cannot resolve '`true`' given input 
columns: [id, text];;
'Project [UDF(text#6, 'true, 'true, '2) AS words#16]
+- Project [_1#2 AS id#5, _2#3 AS text#6]
   +- LocalRelation [_1#2, _2#3]


I need help!!


2017-06-16


lk_spark 

spark on yarn cluster mode can't use saveAsTable?

2017-05-15 Thread lk_spark
hi,all:
I have a test under spark2.1.0 which reads txt files as a DataFrame and saves them to Hive. When I submit the app jar in yarn client mode it works well, but if I submit in cluster mode it does not create the table or write the data, and I didn't find any error log... can anybody give me some clue?

2017-05-15


lk_spark 

Re: Re: Re: how to call recommend method from ml.recommendation.ALS

2017-03-15 Thread lk_spark
Thank you, that's what I wanted to confirm.

2017-03-16 

lk_spark 



From: Yuhao Yang <hhb...@gmail.com>
Sent: 2017-03-16 13:05
Subject: Re: Re: how to call recommend method from ml.recommendation.ALS
To: "lk_spark"<lk_sp...@163.com>
Cc: "任弘迪"<ryan.hd@gmail.com>,"user.spark"<user@spark.apache.org>

This is something that was just added to ML and will probably be released with 
2.2. For now you can try to copy from the master code: 
https://github.com/apache/spark/blob/70f9d7f71c63d2b1fdfed75cb7a59285c272a62b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L352
 and give it a try.
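
For later readers, a rough sketch of what that API looks like once it is available (Spark 2.2+). The ratings DataFrame and its user/item/rating column names are illustrative, not from this thread.

import org.apache.spark.ml.recommendation.ALS

// ratings: DataFrame with columns "user", "item", "rating" (illustrative)
val als = new ALS()
  .setUserCol("user")
  .setItemCol("item")
  .setRatingCol("rating")
val model = als.fit(ratings)

// Spark 2.2+: top-10 items for every user, and top-10 users for every item.
val userRecs = model.recommendForAllUsers(10)
val itemRecs = model.recommendForAllItems(10)
userRecs.show(false)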


Yuhao


2017-03-15 21:39 GMT-07:00 lk_spark <lk_sp...@163.com>:

Thanks for your reply. What I exactly want to know is:
in the package mllib.recommendation, MatrixFactorizationModel has methods like recommendProducts, but I didn't find them in the package ml.recommendation.
How can I do the same thing with ml as I did with mllib?
2017-03-16 

lk_spark 



From: 任弘迪 <ryan.hd@gmail.com>
Sent: 2017-03-16 10:46
Subject: Re: how to call recommend method from ml.recommendation.ALS
To: "lk_spark"<lk_sp...@163.com>
Cc: "user.spark"<user@spark.apache.org>

If the number of user-item pairs to predict isn't too large, say millions, you could transform the target dataframe and save the result to a Hive table, then build a cache based on that table for online services.


If that's not the case (such as billions of user-item pairs to predict), you have to start a service with the model loaded, send the user to the service, first match several hundred candidate items from all available items (which could itself be another service or a cache), then transform this user and all of those items with the model to get predictions, and return the items ordered by prediction.


On Thu, Mar 16, 2017 at 9:32 AM, lk_spark <lk_sp...@163.com> wrote:

hi,all:
   Under spark2.0, I would like to know: after I have trained an ml.recommendation.ALSModel, how can I perform the recommend step?

   I tried to save the model and load it with MatrixFactorizationModel but got an error.

2017-03-16


lk_spark 

Re: Re: how to call recommend method from ml.recommendation.ALS

2017-03-15 Thread lk_spark
Thanks for your reply. What I exactly want to know is:
in the package mllib.recommendation, MatrixFactorizationModel has methods like recommendProducts, but I didn't find them in the package ml.recommendation.
How can I do the same thing with ml as I did with mllib?
2017-03-16 

lk_spark 



From: 任弘迪 <ryan.hd@gmail.com>
Sent: 2017-03-16 10:46
Subject: Re: how to call recommend method from ml.recommendation.ALS
To: "lk_spark"<lk_sp...@163.com>
Cc: "user.spark"<user@spark.apache.org>

If the number of user-item pairs to predict isn't too large, say millions, you could transform the target dataframe and save the result to a Hive table, then build a cache based on that table for online services.


If that's not the case (such as billions of user-item pairs to predict), you have to start a service with the model loaded, send the user to the service, first match several hundred candidate items from all available items (which could itself be another service or a cache), then transform this user and all of those items with the model to get predictions, and return the items ordered by prediction.


On Thu, Mar 16, 2017 at 9:32 AM, lk_spark <lk_sp...@163.com> wrote:

hi,all:
   Under spark2.0, I would like to know: after I have trained an ml.recommendation.ALSModel, how can I perform the recommend step?

   I tried to save the model and load it with MatrixFactorizationModel but got an error.

2017-03-16


lk_spark 

how to call recommend method from ml.recommendation.ALS

2017-03-15 Thread lk_spark
hi,all:
   Under spark2.0, I would like to know: after I have trained an ml.recommendation.ALSModel, how can I perform the recommend step?

   I tried to save the model and load it with MatrixFactorizationModel but got an error.

2017-03-16


lk_spark 

java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext

2017-02-26 Thread lk_spark
hi,all:
   I want to extract some info from Kafka using Spark Streaming. My code is like this:
val keyword = ""
val system = "dmp"
val datetime_idx = 0
val datetime_length = 23
val logLevelBeginIdx = datetime_length + 2 - 1
val logLevelMaxLenght = 5
   
val lines = messages.filter(record => 
record.value().matches("\\d{4}.*")).map(record => {
  val assembly = record.topic()
  val value = record.value
  val datatime = value.substring(datetime_idx, datetime_length - 1)
  val level = value.substring(logLevelBeginIdx, logLevelBeginIdx + 
logLevelMaxLenght - 1)
  (assembly,value,datatime,level)
})

I will get error :
Caused by: java.io.NotSerializableException: 
org.apache.spark.streaming.StreamingContext
Serialization stack:
 - object not serializable (class: org.apache.spark.streaming.StreamingContext, 
value: org.apache.spark.streaming.StreamingContext@5a457aa1)
 - field (class: $iw, name: streamingContext, type: class 
org.apache.spark.streaming.StreamingContext)
 - object (class $iw, $iw@38eb2140)
 - field (class: $iw, name: $iw, type: class $iw)
 - object (class $iw, $iw@2a3ced3d)
 - field (class: $iw, name: $iw, type: class $iw)
 - object (class $iw, $iw@7c5dbca5)

==
  If I change the parameters to constants, I will not get the error:
  val lines = messages.filter(record => 
record.value().matches("\\d{4}.*")).map(record => {
  val assembly = record.topic()
  val value = record.value
  val datatime = value.substring(0, 22)
  val level = value.substring(24, 27)
  (assembly,value,datatime,level)
  
})

How can I pass parameters to the map function?
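
A common workaround, sketched under the assumption that this runs in spark-shell: copy the constants into vals that are local to a block, so the map closure captures only those plain values instead of the REPL wrapper object that also holds the StreamingContext.

// Local copies inside a block: the closure captures dtIdx/dtLen/lvlIdx/lvlLen,
// not the outer object that references the StreamingContext.
val lines = {
  val dtIdx  = datetime_idx
  val dtLen  = datetime_length
  val lvlIdx = logLevelBeginIdx
  val lvlLen = logLevelMaxLenght

  messages
    .filter(record => record.value().matches("\\d{4}.*"))
    .map { record =>
      val assembly = record.topic()
      val value    = record.value
      val datatime = value.substring(dtIdx, dtLen - 1)
      val level    = value.substring(lvlIdx, lvlIdx + lvlLen - 1)
      (assembly, value, datatime, level)
    }
}

Passing the values as parameters of a regular function that builds the stream achieves the same effect.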

2017-02-27


lk_spark 

help, I want to call spark-submit from java shell

2017-01-20 Thread lk_spark
hi,all:
   Under spark2.0 with hadoop 2.7.2, my code is like this:
   
   String c1 = "/bin/sh";
   String c2 = "-c";
   StringBuilder sb = new StringBuilder("cd 
/home/hadoop/dmp/spark-2.0.2-bin-hadoop2.7/bin;spark-submit --class 
com.hua.spark.dataload.DataLoadFromBase64JSON --master yarn --deploy-mode 
client  /home/hadoop/dmp/dataload-1.0-SNAPSHOT-jar-with-dependencies.jar ");
   Process pro = Runtime.getRuntime().exec(new String[]{c1,c2,sb.toString()});
   pro.waitFor();

  On the same node, I can execute the command successfully from a terminal, but in Java I get the error below:


17/01/20 06:39:05 ERROR TransportChannelHandler: Connection to 
/192.168.0.136:51197 has been quiet for 12 ms while there are outstanding 
requests. Assuming connection is dead; please adjust spark.network.timeout if 
this is wrong.
17/01/20 06:39:05 ERROR TransportResponseHandler: Still have 1 requests 
outstanding when connection from /192.168.0.136:51197 is closed
17/01/20 06:39:05 WARN NettyRpcEnv: Ignored failure: java.io.IOException: 
Connection from /192.168.0.136:51197 closed
17/01/20 06:39:05 ERROR CoarseGrainedExecutorBackend: Cannot register with 
driver: spark://CoarseGrainedScheduler@192.168.0.136:51197
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 
seconds. This timeout is controlled by spark.rpc.askTimeout
 at 
org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
 at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
 at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
 at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
 at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
 at scala.util.Try$.apply(Try.scala:192)
 at scala.util.Failure.recover(Try.scala:216)
 at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
 at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at 
org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
 at 
scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
 at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 at scala.concurrent.Promise$class.complete(Promise.scala:55)
 at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
 at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
 at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at 
scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
 at 
scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
 at 
scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
 at 
scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
 at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
 at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
 at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
 at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
 at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
 at 
org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205)
 at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 
120 seconds
 ... 8 more
17/01/20 06:39:05 ERROR CoarseGrainedExecutorBackend: Driver 
192.168.0.136:51197 disassociated! Shutting down.
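
This was never resolved in the archive; for reference, here is a sketch that launches the job through org.apache.spark.launcher.SparkLauncher (available since Spark 1.4) instead of shelling out with /bin/sh. The paths and class names are copied from the post above; the stream handling is deliberately simplified, and real code should drain stdout and stderr concurrently.

import org.apache.spark.launcher.SparkLauncher
import scala.io.Source

val proc = new SparkLauncher()
  .setSparkHome("/home/hadoop/dmp/spark-2.0.2-bin-hadoop2.7")
  .setAppResource("/home/hadoop/dmp/dataload-1.0-SNAPSHOT-jar-with-dependencies.jar")
  .setMainClass("com.hua.spark.dataload.DataLoadFromBase64JSON")
  .setMaster("yarn")
  .setDeployMode("client")
  .launch()

// Drain the child's output so it cannot block on a full pipe buffer.
Source.fromInputStream(proc.getInputStream).getLines().foreach(println)
Source.fromInputStream(proc.getErrorStream).getLines().foreach(println)
proc.waitFor()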

   

2017-01-20


lk_spark 

how to dynamic partition dataframe

2017-01-17 Thread lk_spark
hi,all:
I want to partition data by reading a config file that tells me how to partition the current input data.
DataFrameWriter has a method partitionBy(colNames: String*): DataFrameWriter[T].
Why can't I pass the parameter as a Seq[String] or an Array[String]?
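
A sketch of the usual answer: partitionBy takes Scala varargs, and a Seq or Array can be expanded into varargs with ": _*". The DataFrame, column names and output path below are made up for illustration.

// Partition columns read from some configuration (illustrative values).
val partitionCols: Seq[String] = Seq("year", "month", "day")

// partitionBy(colNames: String*) is a varargs method, so expand the Seq with ": _*".
df.write
  .partitionBy(partitionCols: _*)
  .parquet("/tmp/partitioned_output")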

2017-01-18


lk_spark 

how to use newAPIHadoopFile

2017-01-16 Thread lk_spark
hi,all
I have a test with spark 2.0:

I have a test file with \t as the field delimiter:
kevin 30 2016
shen 30 2016
kai 33 2016
wei 30 2016
after using:
var datas: RDD[(LongWritable, String)] = 
sc.newAPIHadoopFile(inputPath+filename, classOf[TextInputFormat], 
classOf[LongWritable], classOf[Text], hadoopConf).map { case (key, value) =>
(key, new String(value.getBytes, decode))
}
and I save RDD to hdfs I got this:
(0,kevin 30 2016)
(14,shen 30 20166)
(27,kai 33 201666)
(39,wei 30 201666)
It looks like the reader did not clear its buffer after reading each line, or something like that?
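
A likely explanation, offered here as an assumption rather than something confirmed in the thread: the Hadoop record reader reuses the same Text (and LongWritable) objects, and Text.getBytes returns the whole backing buffer, which may be longer than the current line. Copying only getLength bytes (or calling toString) avoids the trailing characters; a sketch:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.rdd.RDD

val hadoopConf = new Configuration()
val decode = "UTF-8"   // illustrative charset

val datas: RDD[(Long, String)] =
  sc.newAPIHadoopFile(inputPath + filename, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], hadoopConf)
    .map { case (key, value) =>
      // copy the reused key, and only the valid part of the reused Text buffer
      (key.get, new String(value.getBytes, 0, value.getLength, decode))
    }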

2017-01-17


lk_spark 

Re: Re: Re: how to change datatype by using StructType

2017-01-12 Thread lk_spark
A better way to answer my question: use GenericRow instead of Row.

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRow

val rows: RDD[Row] = spark.sparkContext.textFile("/sourcedata/test/test1").map { line =>
  val attributes: Array[String] = line.split(",")
  val ab = ArrayBuffer[Any]()

  // schemaType holds the configured type name for each column
  for (i <- 0 until schemaType.length) {
    if (schemaType(i).equalsIgnoreCase("int")) {
      ab += attributes(i).toInt
    } else if (schemaType(i).equalsIgnoreCase("long")) {
      ab += attributes(i).toLong
    } else {
      ab += attributes(i)
    }
  }

  new GenericRow(ab.toArray)
}
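
For reference, an equivalent that stays on the public API: Row.fromSeq builds a Row with one field per element of a Seq[Any], so the catalyst GenericRow class is not needed. This sketch reuses the schemaType and schema values from the surrounding messages.

import org.apache.spark.sql.Row

val rowRDD = spark.sparkContext.textFile("/sourcedata/test/test1").map { line =>
  val attributes = line.split(",")
  val converted: Seq[Any] = schemaType.indices.map { i =>
    schemaType(i).toLowerCase match {
      case "int"  => attributes(i).toInt
      case "long" => attributes(i).toLong
      case _      => attributes(i)
    }
  }
  // Row.fromSeq makes one field per element; Row(seq) would wrap the whole Seq in a single field.
  Row.fromSeq(converted)
}

val peopleDF = spark.createDataFrame(rowRDD, schema)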

2017-01-13 

lk_spark 



发件人:"lk_spark"<lk_sp...@163.com>
发送时间:2017-01-13 09:49
主题:Re: Re: Re: how to change datatype by useing StructType
收件人:"Nicholas Hakobian"<nicholas.hakob...@rallyhealth.com>
抄送:"user.spark"<user@spark.apache.org>

Thank you Nicholas. If the source data is in CSV format, the CSV reader works well.

2017-01-13 

lk_spark 



From: Nicholas Hakobian <nicholas.hakob...@rallyhealth.com>
Sent: 2017-01-13 08:35
Subject: Re: Re: Re: how to change datatype by using StructType
To: "lk_spark"<lk_sp...@163.com>
Cc: "ayan guha"<guha.a...@gmail.com>,"user.spark"<user@spark.apache.org>

Have you tried the native CSV reader (in Spark 2) or the Databricks CSV reader (in 1.6)?


If your data is in a CSV-like format, it will load directly into a DataFrame. It's possible you have some rows where the types are inconsistent.


Nicholas Szandor Hakobian, Ph.D. 
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com




On Thu, Jan 12, 2017 at 1:52 AM, lk_spark <lk_sp...@163.com> wrote:

I have tried it like this:
  
  val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
  val rowRDD = peopleRDD.map(_.split(",")).map(attributes => {
  val ab = ArrayBuffer[Any]()
  for (i <- 0 until schemaType.length) {
if (schemaType(i).equalsIgnoreCase("int")) {
  ab += attributes(i).toInt
} else if (schemaType(i).equalsIgnoreCase("long")) {
  ab += attributes(i).toLong
} else {
  ab += attributes(i)
}
  }
  Row(ab.toArray)
})

val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF .show

I got error:
 Caused by: java.lang.RuntimeException: [Ljava.lang.Object; is not a valid 
external type for schema of string
  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown
 Source)
  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
  at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)

All the fields are Any; what should I do?



2017-01-12 

lk_spark 



发件人:"lk_spark"<lk_sp...@163.com>
发送时间:2017-01-12 14:38
主题:Re: Re: how to change datatype by useing StructType
收件人:"ayan guha"<guha.a...@gmail.com>,"user.spark"<user@spark.apache.org>
抄送:

yes, field year is in my data:

data:
  kevin,30,2016
  shen,30,2016
  kai,33,2016
  wei,30,2016

this will not work 
   val rowRDD = peopleRDD.map(_.split(",")).map(attributes => 
Row(attributes(0),attributes(1),attributes(2)))
but I need to read the data in a configurable way.
2017-01-12 

lk_spark 



From: ayan guha <guha.a...@gmail.com>
Sent: 2017-01-12 14:34
Subject: Re: how to change datatype by using StructType
To: "lk_spark"<lk_sp...@163.com>,"user.spark"<user@spark.apache.org>
Cc:

Do you have year in your data?

On Thu, 12 Jan 2017 at 5:24 pm, lk_spark <lk_sp...@163.com> wrote:

hi,all

I have a txt file, and I want to process it as a dataframe:

data like this:
   name1,30
   name2,18

val schemaString = "name age year"
val xMap=new scala.collection.mutable.HashMap[String,DataType]()
xMap.put("name", StringType)
xMap.put("age", IntegerType)
xMap.put("year", IntegerType)

val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, xMap.get(fieldName).get, nullable = true))
val schema = StructType(fields)

val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
//spark.read.schema(schema).text("/sourcedata/test/test*")

val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1)))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

but when I write it to a table or show it I will get an error:

   Caused by: java.lang.

Re: Re: Re: how to change datatype by using StructType

2017-01-12 Thread lk_spark
Thank you Nicholas. If the source data is in CSV format, the CSV reader works well.

2017-01-13 

lk_spark 



From: Nicholas Hakobian <nicholas.hakob...@rallyhealth.com>
Sent: 2017-01-13 08:35
Subject: Re: Re: Re: how to change datatype by using StructType
To: "lk_spark"<lk_sp...@163.com>
Cc: "ayan guha"<guha.a...@gmail.com>,"user.spark"<user@spark.apache.org>

Have you tried the native CSV reader (in Spark 2) or the Databricks CSV reader (in 1.6)?


If your data is in a CSV-like format, it will load directly into a DataFrame. It's possible you have some rows where the types are inconsistent.


Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com




On Thu, Jan 12, 2017 at 1:52 AM, lk_spark <lk_sp...@163.com> wrote:

I have tried it like this:
  
  val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
  val rowRDD = peopleRDD.map(_.split(",")).map(attributes => {
  val ab = ArrayBuffer[Any]()
  for (i <- 0 until schemaType.length) {
if (schemaType(i).equalsIgnoreCase("int")) {
  ab += attributes(i).toInt
} else if (schemaType(i).equalsIgnoreCase("long")) {
  ab += attributes(i).toLong
} else {
  ab += attributes(i)
}
  }
  Row(ab.toArray)
})

val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF .show

I got error:
 Caused by: java.lang.RuntimeException: [Ljava.lang.Object; is not a valid 
external type for schema of string
  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown
 Source)
  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
  at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)

All the fields are Any; what should I do?



2017-01-12 

lk_spark 



发件人:"lk_spark"<lk_sp...@163.com>
发送时间:2017-01-12 14:38
主题:Re: Re: how to change datatype by useing StructType
收件人:"ayan guha"<guha.a...@gmail.com>,"user.spark"<user@spark.apache.org>
抄送:

yes, field year is in my data:

data:
  kevin,30,2016
  shen,30,2016
  kai,33,2016
  wei,30,2016

this will not work 
   val rowRDD = peopleRDD.map(_.split(",")).map(attributes => 
Row(attributes(0),attributes(1),attributes(2)))
but I need to read the data in a configurable way.
2017-01-12 

lk_spark 



From: ayan guha <guha.a...@gmail.com>
Sent: 2017-01-12 14:34
Subject: Re: how to change datatype by using StructType
To: "lk_spark"<lk_sp...@163.com>,"user.spark"<user@spark.apache.org>
Cc:

Do you have year in your data?

On Thu, 12 Jan 2017 at 5:24 pm, lk_spark <lk_sp...@163.com> wrote:

hi,all

I have a txt file, and I want to process it as a dataframe:

data like this:
   name1,30
   name2,18

val schemaString = "name age year"
val xMap=new scala.collection.mutable.HashMap[String,DataType]()
xMap.put("name", StringType)
xMap.put("age", IntegerType)
xMap.put("year", IntegerType)

val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, xMap.get(fieldName).get, nullable = true))
val schema = StructType(fields)

val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
//spark.read.schema(schema).text("/sourcedata/test/test*")

val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1)))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

but when I write it to a table or show it I will get an error:

   Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#1
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)

   if I change my code it will work:
   val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1).toInt))
   but this is not a good idea.

2017-01-12


lk_spark

Re: Re: Re: how to change datatype by using StructType

2017-01-12 Thread lk_spark
I have tried it like this:
  
  val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
  val rowRDD = peopleRDD.map(_.split(",")).map(attributes => {
  val ab = ArrayBuffer[Any]()
  for (i <- 0 until schemaType.length) {
if (schemaType(i).equalsIgnoreCase("int")) {
  ab += attributes(i).toInt
} else if (schemaType(i).equalsIgnoreCase("long")) {
  ab += attributes(i).toLong
} else {
  ab += attributes(i)
}
  }
  Row(ab.toArray)
})

val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF .show

I got error:
 Caused by: java.lang.RuntimeException: [Ljava.lang.Object; is not a valid 
external type for schema of string
  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown
 Source)
  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
  at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)

All the fields are Any; what should I do?



2017-01-12 

lk_spark 



发件人:"lk_spark"<lk_sp...@163.com>
发送时间:2017-01-12 14:38
主题:Re: Re: how to change datatype by useing StructType
收件人:"ayan guha"<guha.a...@gmail.com>,"user.spark"<user@spark.apache.org>
抄送:

yes, field year is in my data:

data:
  kevin,30,2016
  shen,30,2016
  kai,33,2016
  wei,30,2016

this will not work 
   val rowRDD = peopleRDD.map(_.split(",")).map(attributes => 
Row(attributes(0),attributes(1),attributes(2)))
but I need to read the data in a configurable way.
2017-01-12 

lk_spark 



From: ayan guha <guha.a...@gmail.com>
Sent: 2017-01-12 14:34
Subject: Re: how to change datatype by using StructType
To: "lk_spark"<lk_sp...@163.com>,"user.spark"<user@spark.apache.org>
Cc:

Do you have year in your data?

On Thu, 12 Jan 2017 at 5:24 pm, lk_spark <lk_sp...@163.com> wrote:

hi,all

I have a txt file, and I want to process it as a dataframe:

data like this:
   name1,30
   name2,18

val schemaString = "name age year"
val xMap=new scala.collection.mutable.HashMap[String,DataType]()
xMap.put("name", StringType)
xMap.put("age", IntegerType)
xMap.put("year", IntegerType)

val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, xMap.get(fieldName).get, nullable = true))
val schema = StructType(fields)

val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
//spark.read.schema(schema).text("/sourcedata/test/test*")

val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1)))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

but when I write it to a table or show it I will get an error:

   Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#1
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)

   if I change my code it will work:
   val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1).toInt))
   but this is not a good idea.

2017-01-12


lk_spark

Re: Re: how to change datatype by using StructType

2017-01-11 Thread lk_spark
yes, field year is in my data:

data:
  kevin,30,2016
  shen,30,2016
  kai,33,2016
  wei,30,2016

this will not work 
   val rowRDD = peopleRDD.map(_.split(",")).map(attributes => 
Row(attributes(0),attributes(1),attributes(2)))
but I need to read the data in a configurable way.
2017-01-12 

lk_spark 



From: ayan guha <guha.a...@gmail.com>
Sent: 2017-01-12 14:34
Subject: Re: how to change datatype by using StructType
To: "lk_spark"<lk_sp...@163.com>,"user.spark"<user@spark.apache.org>
Cc:

Do you have year in your data?

On Thu, 12 Jan 2017 at 5:24 pm, lk_spark <lk_sp...@163.com> wrote:

hi,all

I have a txt file, and I want to process it as a dataframe:

data like this:
   name1,30
   name2,18

val schemaString = "name age year"
val xMap=new scala.collection.mutable.HashMap[String,DataType]()
xMap.put("name", StringType)
xMap.put("age", IntegerType)
xMap.put("year", IntegerType)

val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, xMap.get(fieldName).get, nullable = true))
val schema = StructType(fields)

val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
//spark.read.schema(schema).text("/sourcedata/test/test*")

val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1)))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

but when I write it to a table or show it I will get an error:

   Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true) AS name#1
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)

   if I change my code it will work:
   val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1).toInt))
   but this is not a good idea.

2017-01-12


lk_spark

how to change datatype by using StructType

2017-01-11 Thread lk_spark
hi,all
I have a txt file, and I want to process it as a dataframe:

data like this:
   name1,30
   name2,18

val schemaString = "name age year"
val xMap=new scala.collection.mutable.HashMap[String,DataType]()
xMap.put("name", StringType)
xMap.put("age", IntegerType)
xMap.put("year", IntegerType)

val fields = schemaString.split(" ").map(fieldName => 
StructField(fieldName, xMap.get(fieldName).get, nullable = true))
val schema = StructType(fields)

val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
//spark.read.schema(schema).text("/sourcedata/test/test*")

    val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1)))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)  
 
but when I write it to a table or show it I will get an error:


   Caused by: java.lang.RuntimeException: Error while encoding: 
java.lang.RuntimeException: java.lang.String is not a valid external type for 
schema of int
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row 
object).isNullAt) null else staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), 
true) AS name#1
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row 
object).isNullAt) null else staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), 
true)

   if I change my code it will work:
   val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1).toInt))
   but this is not a good idea.

2017-01-12


lk_spark 

Re: Re: Re: how to add column to dataframe

2016-12-06 Thread lk_spark
I now know the right way to do it:
val df = spark.read.parquet("/parquetdata/weixin/page/month=201607")
val df2 = 
df.withColumn("pa_bid",when(isnull($"url"),"".split("#")(0)).otherwise(split(split(col("url"),"_biz=")(1),
 "")(1)))
scala> df2.select("pa_bid","url").show
+----------------+----------------+
|          pa_bid|             url|
+----------------+----------------+
|MjM5MjEyNTk2MA==|http://mp.weixin|
|MzAxODIwMDcwNA==|http://mp.weixin|
|MzIzMjQ4NzQwOA==|http://mp.weixin|
|MzAwOTIxMTcyMQ==|http://mp.weixin|
|MzA3OTAyNzY2OQ==|http://mp.weixin|
|MjM5NDAzMDAwMA==|http://mp.weixin|
|MzAwMjE4MzU0Nw==|http://mp.weixin|
|MzA4NzcyNjI0Mw==|http://mp.weixin|
|MzI5OTE5Nzc5Ng==|http://mp.weixin|
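
For readers who hit the null result shown further down in this thread: when(cond, value) without otherwise(...) yields null for every row where the condition is false, and the original condition was $"url".isNull, so every row with a non-null url became null. A small sketch of the usual pattern, with an illustrative default value:

import org.apache.spark.sql.functions.{col, substring, when}

val df2 = df.withColumn(
  "pa_bid",
  when(col("url").isNull, "unknown")            // value for null urls (illustrative)
    .otherwise(substring(col("url"), 3, 5))     // value for all other rows
)
df2.select("pa_bid", "url").show(false)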


2016-12-06 

lk_spark 



发件人:"lk_spark"<lk_sp...@163.com>
发送时间:2016-12-06 17:44
主题:Re: Re: how to add colum to dataframe
收件人:"Pankaj Wahane"<pankajwah...@live.com>,"user.spark"<user@spark.apache.org>
抄送:

Thanks for the reply. I will look at how to use na.fill. But I still don't know how to get the value of the column and apply an operation to it like substr or split.

2016-12-06 

lk_spark 



From: Pankaj Wahane <pankajwah...@live.com>
Sent: 2016-12-06 17:39
Subject: Re: how to add column to dataframe
To: "lk_spark"<lk_sp...@163.com>,"user.spark"<user@spark.apache.org>
Cc:

You may want to try using df2.na.fill(…)
 
From: lk_spark <lk_sp...@163.com>
Date: Tuesday, 6 December 2016 at 3:05 PM
To: "user.spark" <user@spark.apache.org>
Subject: how to add column to dataframe
 
hi,all:
   my spark version is 2.0
   I have a parquet file with one column named url whose type is string. I want to get a substring from the url and add it to the dataframe:
   val df = spark.read.parquet("/parquetdata/weixin/page/month=201607")
   val df2 = df.withColumn("pa_bid",when($"url".isNull,col("url").substr(3, 5)))
   df2.select("pa_bid","url").show
+------+----------------+
|pa_bid|             url|
+------+----------------+
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
 
Why is what I got null?
 
2016-12-06



lk_spark 

Re: Re: how to add column to dataframe

2016-12-06 Thread lk_spark
Thanks for the reply. I will look at how to use na.fill. But I still don't know how to get the value of the column and apply an operation to it like substr or split.

2016-12-06 

lk_spark 



发件人:Pankaj Wahane <pankajwah...@live.com>
发送时间:2016-12-06 17:39
主题:Re: how to add colum to dataframe
收件人:"lk_spark"<lk_sp...@163.com>,"user.spark"<user@spark.apache.org>
抄送:

You may want to try using df2.na.fill(…)
 
From: lk_spark <lk_sp...@163.com>
Date: Tuesday, 6 December 2016 at 3:05 PM
To: "user.spark" <user@spark.apache.org>
Subject: how to add column to dataframe
 
hi,all:
   my spark version is 2.0
   I have a parquet file with one column named url whose type is string. I want to get a substring from the url and add it to the dataframe:
   val df = spark.read.parquet("/parquetdata/weixin/page/month=201607")
   val df2 = df.withColumn("pa_bid",when($"url".isNull,col("url").substr(3, 5)))
   df2.select("pa_bid","url").show
+------+----------------+
|pa_bid|             url|
+------+----------------+
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
 
Why is what I got null?
 
2016-12-06



lk_spark 

how to add column to dataframe

2016-12-06 Thread lk_spark
hi,all:
   my spark version is 2.0
   I have a parquet file with one column named url whose type is string. I want to get a substring from the url and add it to the dataframe:
   val df = spark.read.parquet("/parquetdata/weixin/page/month=201607")
   val df2 = df.withColumn("pa_bid",when($"url".isNull,col("url").substr(3, 5)))
   df2.select("pa_bid","url").show
+------+----------------+
|pa_bid|             url|
+------+----------------+
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|

Why is what I got null?


2016-12-06


lk_spark 

Re:RE: how to merge dataframe write output files

2016-11-10 Thread lk_spark
Thank you for the reply, Shreya:
It's because the files are too small and HDFS doesn't like small files.
For your question: yes, I want to create an ExternalTable on the parquet file folder. And how would I use fragmented files as you mention?


The test case is as below:
bin/spark-shell --master yarn --deploy-mode client --driver-memory 6g 
--executor-memory 8g --executor-cores 2 --num-executors 4
val df = spark.read.parquet("/parquetdata/weixin/biz-tag-relation/")
df.createOrReplaceTempView("biztag") #almost 70M   3673411 rows
val df2 = spark.read.parquet("/parquetdata/weixin/biz/month=201608")
df2.createOrReplaceTempView("biz1608")#almost 90M   381700  rows
for(i <- 1 to 61) {
  val dft = spark.sql(s"select biz1608.*,biztag.tag_id from biz1608 left 
join biztag on biz1608.bid = biztag.biz_id where biztag.tag_id = ${i}")
  dft.coalesce(1).write.parquet(s"/parquetdata/weixin/biztags/biztag${i}")
 }





At 2016-11-10 15:47:02, "Shreya Agarwal" <shrey...@microsoft.com> wrote:


Is there a reason you want to merge the files? The reason you are getting 
errors (afaik) is because when you try to coalesce and then write, you are 
forcing all the content to reside on one executor, and the size of data is 
exceeding the memory you have for storage in your executor, hence causing the 
container to be killed. We can confirm this if you provide the specs of your 
cluster. The whole purpose of multiple files is so that each executor can write 
its partition out in parallel, without having to collect the data in one place.

 

Not to mention that it’ll make your write incredibly slow and also it’ll take 
away all the speed of reading in the data from a parquet as there won’t be any 
parallelism at the time of input (if you try to input this parquet).

 

Again, the important question is – Why do you need it to be one file? Are you 
planning to use it externally? If yes, can you not use fragmented files there? 
If the data is too big for the Spark executor, it’ll most certainly be too much 
for JRE or any other runtime  to load in memory on a single box.
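
A middle ground that is sometimes used, not proposed in this thread: instead of coalesce(1), repartition to a small number of partitions chosen from the data size, which reduces the file count while keeping the write parallel. The dft value and output path come from the test case quoted above; the partition count is illustrative.

// Pick a small partition count based on data size / desired file size (illustrative).
val targetFiles = 8

dft
  .repartition(targetFiles)   // full shuffle; the work stays spread across executors
  .write
  .parquet(s"/parquetdata/weixin/biztags/biztag${i}")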

 

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Wednesday, November 9, 2016 11:29 PM
To: user.spark <user@spark.apache.org>
Subject: how to merge dataframe write output files

 

hi,all:

When I call the api df.write.parquet, there are a lot of small files. How can I merge them into one file? I tried df.coalesce(1).write.parquet, but it sometimes gets an error:

Container exited with a non-zero exit code 143

more and more...

-rw-r--r--   2 hadoop supergroup 14.5 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00167-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.2 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00168-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00169-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00170-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00171-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00172-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.0 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00173-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00174-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.0 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00175-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00176-0f61afe4-23e8-40bb-b30b-09652ca677bc

more and more...

2016-11-10

lk_spark

how to merge dataframe write output files

2016-11-09 Thread lk_spark
hi,all:
When I call the api df.write.parquet, there are a lot of small files. How can I merge them into one file? I tried df.coalesce(1).write.parquet, but it sometimes gets an error:
Container exited with a non-zero exit code 143
more and more...
-rw-r--r--   2 hadoop supergroup 14.5 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00167-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.2 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00168-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00169-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00170-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00171-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00172-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.0 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00173-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00174-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.0 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00175-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00176-0f61afe4-23e8-40bb-b30b-09652ca677bc
more and more...
2016-11-10


lk_spark 

Re: Re: How to iterate the element of an array in DataFrame?

2016-10-21 Thread lk_spark
How about changing the schema from
root
 |-- category.firstCategory: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- category: string (nullable = true)
 |||-- weight: string (nullable = true)

to:

root
 |-- category: string (nullable = true)
 |-- weight: string (nullable = true)

2016-10-21 

lk_spark 



From: 颜发才(Yan Facai) <yaf...@gmail.com>
Sent: 2016-10-21 15:35
Subject: Re: How to iterate the element of an array in DataFrame?
To: "user.spark"<user@spark.apache.org>
Cc:

I don't know how to construct `array<struct<category:string,weight:string>>`.
Could anyone help me?


I try to get the array by :
scala> mblog_tags.map(_.getSeq[(String, String)](0))

while the result is:
res40: org.apache.spark.sql.Dataset[Seq[(String, String)]] = [value: 
array<struct<_1:string,_2:string>>]




How to express `struct<string, string>` ?






On Thu, Oct 20, 2016 at 4:34 PM, 颜发才(Yan Facai) <yaf...@gmail.com> wrote:

Hi, I want to extract the attribute `weight` of an array, and combine them to 
construct a sparse vector. 



### My data is like this:

scala> mblog_tags.printSchema
root
 |-- category.firstCategory: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- category: string (nullable = true)
 |||-- weight: string (nullable = true)


scala> mblog_tags.show(false)
+------------------------------------------------+
|category.firstCategory                          |
+------------------------------------------------+
|[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
|[[tagCategory_029, 0.9]]                        |
|[[tagCategory_029, 0.8]]                        |
+------------------------------------------------+



### And expected:
Vectors.sparse(100, Array(60, 29),  Array(0.8, 0.7))
Vectors.sparse(100, Array(29),  Array(0.9))
Vectors.sparse(100, Array(29),  Array(0.8))


How to iterate an array in DataFrame?

Thanks.
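
Not part of the original exchange, but one common way to do this is a UDF over the array column: elements of an array<struct<...>> arrive in Scala as a Seq[Row], so the weight and an index can be pulled out per element. The sketch below assumes the index is the numeric suffix of the category name and the vector size is 100, as in the example above.

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

val toSparse = udf { (tags: Seq[Row]) =>
  val pairs = tags.map { t =>
    val idx    = t.getAs[String]("category").split("_").last.toInt   // "tagCategory_060" -> 60
    val weight = t.getAs[String]("weight").toDouble
    (idx, weight)
  }.sortBy(_._1)                         // SparseVector expects increasing indices
  Vectors.sparse(100, pairs.map(_._1).toArray, pairs.map(_._2).toArray)
}

// The column name contains a dot, so quote it with backticks.
val features = mblog_tags.select(toSparse(col("`category.firstCategory`")).as("features"))
features.show(false)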

Spark ExternalTable doesn't recognize subdir

2016-10-19 Thread lk_spark
hi,all
   My issue is that every day I receive some json data files, and I want to convert them to parquet files and save them to hdfs.
   The folder will look like this:
   /my_table_base_floder
   /my_table_base_floder/day_2
   /my_table_base_floder/day_3

where the parquet files of "day_1" were stored in /my_table_base_floder.
Then I run:
sqlContext.createExternalTable("tpc1.customer","hdfs://master1:9000/my_table_base_floder","parquet")
But when I save parquet files to a subdir, for example /my_table_base_floder/day_2, and refresh the metadata,
spark doesn't recognize the data in the subdir. How can I do it?
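
No answer is recorded in the archive. One common approach, stated here as an assumption rather than a confirmed fix, is to lay the daily folders out as Hive-style partitions (day=1, day=2, ...) and have the catalog pick up new partitions after each load; recoverPartitions needs Spark 2.1+, and MSCK REPAIR TABLE needs Hive support. The dailyDF below is illustrative.

// Write each day's data into a Hive-style partition directory under the base path.
// dailyDF is illustrative and must contain the "day" column.
dailyDF.write
  .mode("append")
  .partitionBy("day")
  .parquet("hdfs://master1:9000/my_table_base_floder")

// Register the base path once as an external table (names from the post).
spark.catalog.createExternalTable("tpc1.customer",
  "hdfs://master1:9000/my_table_base_floder", "parquet")

// After new day=... folders arrive, refresh the partition list.
spark.catalog.recoverPartitions("tpc1.customer")        // Spark 2.1+
// or, with Hive support enabled: spark.sql("MSCK REPAIR TABLE tpc1.customer")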


2016-10-20


lk_spark 

Re: Re: how to extract arraytype data to file

2016-10-18 Thread lk_spark
Thank you, all of you. explode() is helpful:

df.selectExpr("explode(bizs) as e").select("e.*").show()
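
For reference, the same thing with the function API instead of selectExpr:

import org.apache.spark.sql.functions.{col, explode}

// One output row per element of the bizs array; "e" is a struct with the bid/code fields.
df.select(explode(col("bizs")).as("e"))
  .select("e.*")
  .show()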

2016-10-19 

lk_spark 



From: Hyukjin Kwon <gurwls...@gmail.com>
Sent: 2016-10-19 13:16
Subject: Re: how to extract arraytype data to file
To: "Divya Gehlot"<divya.htco...@gmail.com>
Cc: "lk_spark"<lk_sp...@163.com>,"user.spark"<user@spark.apache.org>

This reminds me of 
https://github.com/databricks/spark-xml/issues/141#issuecomment-234835577

Maybe using explode() would be helpful.


Thanks!


2016-10-19 14:05 GMT+09:00 Divya Gehlot <divya.htco...@gmail.com>:

http://stackoverflow.com/questions/33864389/how-can-i-create-a-spark-dataframe-from-a-nested-array-of-struct-element



Hope this helps 




Thanks,
Divya 


On 19 October 2016 at 11:35, lk_spark <lk_sp...@163.com> wrote:

hi,all:
I want to read a json file and search it by sql .
the data struct should be :
bid: string (nullable = true)
code: string (nullable = true)
and the json file data should be like :
 {bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"}
 {"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"}
but in fact my json file data is :
{"bizs":[ 
{bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"},{"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"}]}
{"bizs":[ 
{bid":"MzI4MTI5Mzcy00==","code":"罗甸网警"},{"bid":"MzI3MzQ5Nzc201==","code":"西早君"}]}

I load it by spark ,data schema shows like this :
root
 |-- bizs: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- bid: string (nullable = true)
 |||-- code: string (nullable = true)

I can select columns by: df.select("bizs.id","bizs.name")
but the column values are of array type:
+--------------------+--------------------+
|                  id|                code|
+--------------------+--------------------+
|[4938200, 4938201...|[罗甸网警, 室内设计师杨焰红, ...|
|[4938300, 4938301...|[SDCS十全九美, 旅梦长大, ...|
|[4938400, 4938401...|[日重重工液压行走回转, 氧老家,...|
|[4938500, 4938501...|[PABXSLZ, 陈少燕, 笑蜜...|
|[4938600, 4938601...|[税海微云, 西域美农云家店, 福...|
+--------------------+--------------------+

What I want is to read the columns as normal rows. How can I do it?

2016-10-19


lk_spark 

how to extract arraytype data to file

2016-10-18 Thread lk_spark
hi,all:
I want to read a json file and search it by sql .
the data struct should be :
bid: string (nullable = true)
code: string (nullable = true)
and the json file data should be like :
 {bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"}
 {"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"}
but in fact my json file data is :
{"bizs":[ 
{bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"},{"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"}]}
{"bizs":[ 
{bid":"MzI4MTI5Mzcy00==","code":"罗甸网警"},{"bid":"MzI3MzQ5Nzc201==","code":"西早君"}]}

I load it by spark ,data schema shows like this :
root
 |-- bizs: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- bid: string (nullable = true)
 |||-- code: string (nullable = true)

I can select columns by: df.select("bizs.id","bizs.name")
but the column values are of array type:
+--------------------+--------------------+
|                  id|                code|
+--------------------+--------------------+
|[4938200, 4938201...|[罗甸网警, 室内设计师杨焰红, ...|
|[4938300, 4938301...|[SDCS十全九美, 旅梦长大, ...|
|[4938400, 4938401...|[日重重工液压行走回转, 氧老家,...|
|[4938500, 4938501...|[PABXSLZ, 陈少燕, 笑蜜...|
|[4938600, 4938601...|[税海微云, 西域美农云家店, 福...|
+--------------------+--------------------+

What I want is to read the columns as normal rows. How can I do it?

2016-10-19


lk_spark