Re: Upgrading from Spark SQL 3.2 to 3.3 failed
If I use the cast function to wrap the computed expression, then EXPLAIN on the SQL succeeds, for example: cast(a.Split_Amt * b.percent / 100 as decimal(20,8)) as split_amt. I don't know why. Is there a config property that restores compatibility with Spark 3.2?

At 2023-02-16 13:47:25, "lk_spark" wrote:

hi, all: I have a SQL statement which can be run on Spark 3.2.1 but not on Spark 3.3.1. When I try to explain it, I get an error with the message: org.apache.spark.sql.catalyst.expressions.Literal cannot be cast to org.apache.spark.sql.catalyst.expressions.AnsiCast. Executing the SQL, the error stack is:

ERROR ExecuteStatement: Error operating ExecuteStatement: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.Literal cannot be cast to org.apache.spark.sql.catalyst.expressions.AnsiCast at org.apache.spark.sql.catalyst.expressions.CheckOverflowInTableInsert.withNewChildInternal(Cast.scala:2362) at org.apache.spark.sql.catalyst.expressions.CheckOverflowInTableInsert.withNewChildInternal(Cast.scala:2360) at org.apache.spark.sql.catalyst.trees.UnaryLike.$anonfun$mapChildren$5(TreeNode.scala:1233) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176) at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1232) at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1227) at org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:498) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:635) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:635) at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1228) at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1227) at org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:498) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:635) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:188) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:200) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:200) at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:211) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:216) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:216) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:221) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:427) at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:221) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUpWithPruning(QueryPlan.scala:188) at org.apache.spark.sql.catalyst.optimizer.NullPropagation$$anonfun$apply$19.applyOrElse(expressions.scala:804) at org.apache.spark.sql.catalyst.optimizer.NullPropagation$$anonfun$apply$19.applyOrElse(expressions.scala:801) at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode
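The workaround described in the reply above can be sketched as plain SQL. This is only an illustration of the pattern: the target and source table names (target_table, amounts a, percents b) and the join key are assumptions, since the original statement is not shown; only the expression and alias are taken from the post.

```sql
-- Wrapping the computed decimal expression in an explicit CAST, which made
-- EXPLAIN succeed on Spark 3.3.1 according to the reply above.
-- Table names and join condition below are hypothetical.
INSERT INTO target_table
SELECT CAST(a.Split_Amt * b.percent / 100 AS DECIMAL(20,8)) AS split_amt
FROM   amounts  a
JOIN   percents b ON a.id = b.id;
```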
Upgrading from Spark SQL 3.2 to 3.3 failed
hi, all: I have a SQL statement which can be run on Spark 3.2.1 but not on Spark 3.3.1. When I try to explain it, I get an error with the message: org.apache.spark.sql.catalyst.expressions.Literal cannot be cast to org.apache.spark.sql.catalyst.expressions.AnsiCast. Executing the SQL, the error stack is:

ERROR ExecuteStatement: Error operating ExecuteStatement: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.Literal cannot be cast to org.apache.spark.sql.catalyst.expressions.AnsiCast at org.apache.spark.sql.catalyst.expressions.CheckOverflowInTableInsert.withNewChildInternal(Cast.scala:2362) at org.apache.spark.sql.catalyst.expressions.CheckOverflowInTableInsert.withNewChildInternal(Cast.scala:2360) at org.apache.spark.sql.catalyst.trees.UnaryLike.$anonfun$mapChildren$5(TreeNode.scala:1233) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176) at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1232) at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1227) at org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:498) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:635) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:635) at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1228) at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1227) at org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:498) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:635) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:188) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:200) at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:200) at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:211) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:216) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:216) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:221) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:427) at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:221) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUpWithPruning(QueryPlan.scala:188) at org.apache.spark.sql.catalyst.optimizer.NullPropagation$$anonfun$apply$19.applyOrElse(expressions.scala:804) at org.apache.spark.sql.catalyst.optimizer.NullPropagation$$anonfun$apply$19.applyOrElse(expressions.scala:801) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584) at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:589) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at
Is 'Stage cancelled because SparkContext was shut down' an error?
hi, all: when I try to merge an Iceberg table with Spark, I can see a failed job on the Spark UI, but the Spark application's final state is SUCCEEDED. I submitted an issue: https://github.com/apache/iceberg/issues/5876. I would like to know whether this is a real error. Thanks.
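One way to make the distinction visible is a SparkListener: the application's final state reflects whether the main method exited normally, while individual jobs (including ones that failed and were retried) are reported per job. A minimal sketch, assuming a live SparkSession named spark:

```scala
import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerJobEnd}

// Log every job that ends in failure, even if the application as a whole
// later finishes with state SUCCEEDED.
spark.sparkContext.addSparkListener(new SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = jobEnd.jobResult match {
    case JobFailed(e) => println(s"Job ${jobEnd.jobId} failed: ${e.getMessage}")
    case _            => // job succeeded; nothing to report
  }
})
```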
Re: NoSuchMethodError: org.apache.spark.sql.execution.command.CreateViewCommand.copy
sorry, it's my env problem. At 2022-03-21 14:00:01, "lk_spark" wrote: hi, all : I got a strange error: bin/spark-shell --deploy-mode client Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 22/03/21 13:51:39 WARN util.Utils: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs. 22/03/21 13:51:46 WARN util.Utils: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs. 22/03/21 13:51:46 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered! Spark context Web UI available at http://client-10-0-161-29:4040 Spark context available as 'sc' (master = yarn, app id = application_1644825367082_16937). Spark session available as 'spark'. Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 3.2.1 /_/ Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_281) Type in expressions to have them evaluated. Type :help for more information. scala> val parqfile = spark.read.parquet("/tmp/datax/tmp/python/ods_io_install/ods_io_install/") parqfile: org.apache.spark.sql.DataFrame = [spid: string, region_rule: string ... 
7 more fields] scala> parqfile.printSchema root |-- spid: string (nullable = true) |-- region_rule: string (nullable = true) |-- app_version: string (nullable = true) |-- device_id: string (nullable = true) |-- is_install: string (nullable = true) |-- last_install_time: string (nullable = true) |-- last_uninstall_time: string (nullable = true) |-- last_use_time: string (nullable = true) |-- pdate: integer (nullable = true) scala> parqfile.show(2) +-+-+---++--+-+---+++ | spid| region_rule|app_version| device_id|is_install|last_install_time|last_uninstall_time| last_use_time| pdate| +-+-+---++--+-+---+++ |13025|北京市房屋建筑与装饰工程预算定额计...| 1.0.29.2|ea68f0cc-7038-43a...| 1| null| null|2021-06-05 11:49:...|20220320| |13025| 山东省建筑工程消耗量定额计算规则(...| 1.0.31.0|c16e1260-5700-4a4...| 1| null| null|2022-01-08 17:55:...|20220320| +-+-+---++--+-+---+++ only showing top 2 rows scala> parqfile.createOrReplaceTempView("ods_io_install_temp") 22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function mask replaced a previously registered function. 22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function mask_hash replaced a previously registered function. 22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function mask_first_n replaced a previously registered function. 22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function mask_last_n replaced a previously registered function. 22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function mask_show_last_n replaced a previously registered function. 22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function mask_show_first_n replaced a previously registered function. 
java.lang.NoSuchMethodError: org.apache.spark.sql.execution.command.CreateViewCommand.copy(Lorg/apache/spark/sql/catalyst/TableIdentifier;Lscala/collection/Seq;Lscala/Option;Lscala/collection/immutable/Map;Lscala/Option;Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;ZZLorg/apache/spark/sql/catalyst/analysis/ViewType;Z)Lorg/apache/spark/sql/execution/command/CreateViewCommand; at org.apache.spark.sql.catalyst.optimizer.SubmarineRowFilterExtension.apply(SubmarineRowFilterExtension.scala:125) at org.apache.spark.sql.catalyst.optimizer.SubmarineRowFilterExtension.apply(SubmarineRowFilterExtension.scala:41) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211) at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126) at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) at scala.collection.immutable.List.foldLeft(List.scala:91) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200) at scala.collection.immutable.List.foreach(List.scala:431) at org.apache.spark.sql.cataly
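A NoSuchMethodError like the one above typically means the extension (here, the Submarine SubmarineRowFilterExtension) was compiled against a Spark version whose CreateViewCommand.copy signature differs from the one on the cluster. One way to diagnose such an environment mismatch is to inspect the signature actually shipped with the cluster's Spark; this is a sketch, and the jar path is an assumption that must be adjusted to your installation:

```shell
# Print the copy(...) signature of CreateViewCommand in the cluster's
# spark-catalyst jar and compare it with the signature in the error message.
javap -classpath /opt/spark/jars/spark-catalyst_2.12-3.2.1.jar \
  org.apache.spark.sql.execution.command.CreateViewCommand | grep copy
```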
NoSuchMethodError: org.apache.spark.sql.execution.command.CreateViewCommand.copy
hi, all : I got a strange error: bin/spark-shell --deploy-mode client Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 22/03/21 13:51:39 WARN util.Utils: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs. 22/03/21 13:51:46 WARN util.Utils: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs. 22/03/21 13:51:46 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered! Spark context Web UI available at http://client-10-0-161-29:4040 Spark context available as 'sc' (master = yarn, app id = application_1644825367082_16937). Spark session available as 'spark'. Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 3.2.1 /_/ Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_281) Type in expressions to have them evaluated. Type :help for more information. scala> val parqfile = spark.read.parquet("/tmp/datax/tmp/python/ods_io_install/ods_io_install/") parqfile: org.apache.spark.sql.DataFrame = [spid: string, region_rule: string ... 
7 more fields] scala> parqfile.printSchema root |-- spid: string (nullable = true) |-- region_rule: string (nullable = true) |-- app_version: string (nullable = true) |-- device_id: string (nullable = true) |-- is_install: string (nullable = true) |-- last_install_time: string (nullable = true) |-- last_uninstall_time: string (nullable = true) |-- last_use_time: string (nullable = true) |-- pdate: integer (nullable = true) scala> parqfile.show(2) +-+-+---++--+-+---+++ | spid| region_rule|app_version| device_id|is_install|last_install_time|last_uninstall_time| last_use_time| pdate| +-+-+---++--+-+---+++ |13025|北京市房屋建筑与装饰工程预算定额计...| 1.0.29.2|ea68f0cc-7038-43a...| 1| null| null|2021-06-05 11:49:...|20220320| |13025| 山东省建筑工程消耗量定额计算规则(...| 1.0.31.0|c16e1260-5700-4a4...| 1| null| null|2022-01-08 17:55:...|20220320| +-+-+---++--+-+---+++ only showing top 2 rows scala> parqfile.createOrReplaceTempView("ods_io_install_temp") 22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function mask replaced a previously registered function. 22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function mask_hash replaced a previously registered function. 22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function mask_first_n replaced a previously registered function. 22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function mask_last_n replaced a previously registered function. 22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function mask_show_last_n replaced a previously registered function. 22/03/21 13:54:38 WARN analysis.SimpleFunctionRegistry: The function mask_show_first_n replaced a previously registered function. 
java.lang.NoSuchMethodError: org.apache.spark.sql.execution.command.CreateViewCommand.copy(Lorg/apache/spark/sql/catalyst/TableIdentifier;Lscala/collection/Seq;Lscala/Option;Lscala/collection/immutable/Map;Lscala/Option;Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;ZZLorg/apache/spark/sql/catalyst/analysis/ViewType;Z)Lorg/apache/spark/sql/execution/command/CreateViewCommand; at org.apache.spark.sql.catalyst.optimizer.SubmarineRowFilterExtension.apply(SubmarineRowFilterExtension.scala:125) at org.apache.spark.sql.catalyst.optimizer.SubmarineRowFilterExtension.apply(SubmarineRowFilterExtension.scala:41) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211) at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126) at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) at scala.collection.immutable.List.foldLeft(List.scala:91) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200) at scala.collection.immutable.List.foreach(List.scala:431) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200) at
Why does an NPE happen with multithreading in cluster mode but not in client mode?
hi, all: I'm using Spark 2.4 and trying to use the SparkContext from multiple threads. I found an example: https://hadoopist.wordpress.com/2017/02/03/how-to-use-threads-in-spark-job-to-achieve-parallel-read-and-writes/ with code like this:

for (a <- 0 until 4) {
  val thread = new Thread {
    override def run() {
      sc.parallelize(Array("ddd", "eee", "fff")).toDF().write.format("parquet").mode("overWrite").save("/tmp/vgiri/file2")

When I run the code in local or client mode, it works. But in cluster mode I hit the error described in SPARK-29046 (https://issues.apache.org/jira/browse/SPARK-29046). Finally, I used a java.util.concurrent.Semaphore to wait for all the worker threads to finish before the main thread stops the SparkContext, and then the code worked in cluster mode too. But I don't understand why, in local or client mode, the code also works even without a Semaphore?
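The synchronization pattern the post arrives at can be sketched as follows. The Spark write is left as a comment (a stand-in for the sc.parallelize(...).write calls from the example); what matters is that the main thread blocks on the Semaphore until every worker thread has released its permit, and only then stops the SparkContext:

```scala
import java.util.concurrent.Semaphore

val workers = 4
val done = new Semaphore(0) // starts with zero permits

for (i <- 0 until workers) {
  new Thread {
    override def run(): Unit = {
      try {
        // sc.parallelize(Array("ddd", "eee", "fff")).toDF()
        //   .write.format("parquet").mode("overwrite").save(s"/tmp/vgiri/file$i")
      } finally {
        done.release() // release even if the job throws, so the main thread never hangs
      }
    }
  }.start()
}

done.acquire(workers) // block until all worker threads have released
// only now is it safe to call sc.stop()
```

In local and client mode the driver JVM is the user's own process, so the main thread tends to outlive the jobs anyway; in cluster mode the application's main method returning triggers shutdown, which is why the missing join/Semaphore only bites there.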
Re: How can Spark Structured Streaming write to Kudu
I found that _sqlContext is null. How can I resolve it?

2019-11-25 lk_spark

From: "lk_spark"
Sent: 2019-11-25 16:00
Subject: How can Spark Structured Streaming write to Kudu
To: "user.spark"
Cc:

hi, all: I'm using Spark 2.4.4 to read streaming data from Kafka and want to write it to Kudu 1.7.0. My code looks like this:

val kuduContext = new KuduContext("master:7051", spark.sparkContext)
val console = cnew.select("*").as[CstoreNew]
  .writeStream
  .option("checkpointLocation", "/tmp/t3/")
  .trigger(Trigger.Once())
  .foreach(new ForeachWriter[CstoreNew] {
    override def open(partitionId: Long, version: Long): Boolean = { true }
    override def process(value: CstoreNew): Unit = {
      val spark = SparkSessionSingleton.getInstance(sparkConf)
      val valueDF = Seq(value).toDF() // GET WRONG
      kuduContext.upsertRows(valueDF, "impala::test.cstore_bury_event_data")
    }
    override def close(errorOrNull: Throwable): Unit = { }
  })
val query = console.start()
query.awaitTermination()

When execution reaches val valueDF = Seq(value).toDF() I get this error:

Caused by: java.lang.NullPointerException at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:228) at com.gaojihealth.spark.kafkaconsumer.CstoreNew2KUDU$$anon$1.process(CstoreNew2KUDU.scala:122) ...

and SQLImplicits.scala line 228 is:

227: implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
228:   DatasetHolder(_sqlContext.createDataset(s))
229: }

Can anyone give me some help?

2019-11-25 lk_spark
How can Spark Structured Streaming write to Kudu
hi, all: I'm using Spark 2.4.4 to read streaming data from Kafka and want to write it to Kudu 1.7.0. My code looks like this:

val kuduContext = new KuduContext("master:7051", spark.sparkContext)
val console = cnew.select("*").as[CstoreNew]
  .writeStream
  .option("checkpointLocation", "/tmp/t3/")
  .trigger(Trigger.Once())
  .foreach(new ForeachWriter[CstoreNew] {
    override def open(partitionId: Long, version: Long): Boolean = { true }
    override def process(value: CstoreNew): Unit = {
      val spark = SparkSessionSingleton.getInstance(sparkConf)
      val valueDF = Seq(value).toDF() // GET WRONG
      kuduContext.upsertRows(valueDF, "impala::test.cstore_bury_event_data")
    }
    override def close(errorOrNull: Throwable): Unit = { }
  })
val query = console.start()
query.awaitTermination()

When execution reaches val valueDF = Seq(value).toDF() I get this error:

Caused by: java.lang.NullPointerException at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:228) at com.gaojihealth.spark.kafkaconsumer.CstoreNew2KUDU$$anon$1.process(CstoreNew2KUDU.scala:122) ...

and SQLImplicits.scala line 228 is:

227: implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
228:   DatasetHolder(_sqlContext.createDataset(s))
229: }

Can anyone give me some help?

2019-11-25 lk_spark
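The NPE at SQLImplicits line 228 arises because toDF() relies on an implicit _sqlContext captured on the driver, which is null inside process() on an executor. One sketch of a workaround on Spark 2.4, assuming the same kuduContext, cnew stream, and Kudu table from the post: use foreachBatch, where each micro-batch already arrives as a driver-side DataFrame, so no per-record Seq(value).toDF() is needed.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// Each micro-batch is handed over as a ready-made DataFrame, which
// KuduContext.upsertRows can consume directly.
val query = cnew.select("*")
  .writeStream
  .option("checkpointLocation", "/tmp/t3/")
  .trigger(Trigger.Once())
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    kuduContext.upsertRows(batch, "impala::test.cstore_bury_event_data")
  }
  .start()
query.awaitTermination()
```

This also amortizes the Kudu upsert over a whole batch instead of issuing one call per record.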
How to limit the number of tasks when reading a Hive table stored as ORC
hi, all: I have a Hive table STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat', and many of its files are very small. When I read it with Spark, thousands of tasks start. How can I limit the number of tasks?

2019-11-12 lk_spark
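A common approach (a sketch, not a definitive answer to the thread) is to let Spark read the table through its native ORC reader, which packs many small files into each partition up to a size limit. The table name and the values below are illustrative assumptions, not recommendations:

```scala
// Route metastore ORC tables through Spark's native file source,
// so the file-packing configs below take effect.
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
// Upper bound on bytes packed into a single partition (task).
spark.conf.set("spark.sql.files.maxPartitionBytes", 256L * 1024 * 1024)
// Estimated cost of opening a file; raising it packs more small files per task.
spark.conf.set("spark.sql.files.openCostInBytes", 8L * 1024 * 1024)

val df = spark.table("my_orc_table") // hypothetical table name
// If the task count is still too high, coalesce caps it without a shuffle.
val capped = df.coalesce(200)
```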
Re: Re: How can I dynamically parse JSON from Kafka when using Structured Streaming
I want to parse the structure of the data dynamically and then write the data to Delta Lake; I think it can merge the schema automatically.

2019-09-17 lk_spark

From: Tathagata Das
Sent: 2019-09-17 16:13
Subject: Re: How can I dynamically parse JSON from Kafka when using Structured Streaming
To: "lk_spark"
Cc: "user.spark"

You can use the from_json built-in SQL function to parse JSON. https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.Column-

On Mon, Sep 16, 2019 at 7:39 PM lk_spark wrote:

hi, all: I'm using Structured Streaming to read from Kafka. The data is a JSON string, and I want to parse it and convert it to a DataFrame. My code doesn't compile and I don't know how to fix it:

val lines = messages.selectExpr("CAST(value AS STRING) as value").as[String]
val words = lines.map(line => {
  var json: JValue = null
  try {
    json = parse(line)
  } catch {
    case ex: Exception => println(ex.getMessage + " " + line)
  }
  //var result: scala.collection.mutable.Map[String,String] = scala.collection.mutable.Map()
  val jsonObj = json.values.asInstanceOf[Map[String, _]]
  val valuse = jsonObj.values.toArray
  val schema = StructType(List())
  for ((k, v) <- jsonObj) {
    //result += (k -> jsonObj.get(k).toString())
    if (v.isInstanceOf[String]) {
      schema.add(k, StringType)
    } else if (v.isInstanceOf[Int]) {
      schema.add(k, IntegerType)
    }/*else if (v.isInstanceOf[Array[String]]) {
      schema.add(k, ArrayType(StringType))
    } else if (v.isInstanceOf[Map[String,String]]) {
      schema.add(k, MapType(StringType, StringType))
    }*/
  }
  val row = new GenericRowWithSchema(valuse, schema)
  row
})

Error:(45, 26) Unable to find encoder for type org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema. An implicit Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema] is needed to store org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
val words = lines.map(line => {

Error:(45, 26) not enough arguments for method map: (implicit evidence$6: org.apache.spark.sql.Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema])org.apache.spark.sql.Dataset[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema]. Unspecified value parameter evidence$6.
val words = lines.map(line => {

2019-09-17 lk_spark
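The from_json approach suggested above can be sketched like this. The schema must be declared up front (from_json does not infer it per record); the two example fields here are assumptions for illustration, and messages is the Kafka source DataFrame from the original post:

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Declared schema of the JSON payload (example fields, adjust to your data).
val schema = new StructType()
  .add("name", StringType)
  .add("count", IntegerType)

val parsed = messages
  .selectExpr("CAST(value AS STRING) AS value")
  .select(from_json(col("value"), schema).as("data"))
  .select("data.*") // flatten the struct into top-level columns
```

Records that do not match the schema come back as nulls rather than failing the query, which is often what a streaming pipeline wants.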
How can I dynamically parse JSON from Kafka when using Structured Streaming
hi, all: I'm using Structured Streaming to read from Kafka. The data is a JSON string, and I want to parse it and convert it to a DataFrame. My code doesn't compile and I don't know how to fix it:

val lines = messages.selectExpr("CAST(value AS STRING) as value").as[String]
val words = lines.map(line => {
  var json: JValue = null
  try {
    json = parse(line)
  } catch {
    case ex: Exception => println(ex.getMessage + " " + line)
  }
  //var result: scala.collection.mutable.Map[String,String] = scala.collection.mutable.Map()
  val jsonObj = json.values.asInstanceOf[Map[String, _]]
  val valuse = jsonObj.values.toArray
  val schema = StructType(List())
  for ((k, v) <- jsonObj) {
    //result += (k -> jsonObj.get(k).toString())
    if (v.isInstanceOf[String]) {
      schema.add(k, StringType)
    } else if (v.isInstanceOf[Int]) {
      schema.add(k, IntegerType)
    }/*else if (v.isInstanceOf[Array[String]]) {
      schema.add(k, ArrayType(StringType))
    } else if (v.isInstanceOf[Map[String,String]]) {
      schema.add(k, MapType(StringType, StringType))
    }*/
  }
  val row = new GenericRowWithSchema(valuse, schema)
  row
})

Error:(45, 26) Unable to find encoder for type org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema. An implicit Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema] is needed to store org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
val words = lines.map(line => {

Error:(45, 26) not enough arguments for method map: (implicit evidence$6: org.apache.spark.sql.Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema])org.apache.spark.sql.Dataset[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema]. Unspecified value parameter evidence$6.
val words = lines.map(line => {

2019-09-17 lk_spark
How to get Spark SQL lineage
hi, all: When I use Spark and run some SQL for ETL, how can I get lineage info? I found that CDH Spark has some lineage configs: spark.lineage.enabled=true and spark.lineage.log.dir=/var/log/spark2/lineage. Do they also work for Apache Spark?

2019-05-16 lk_spark
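As far as I know, spark.lineage.* is a Cloudera-specific integration (the Spark lineage/navigator connector) and is not honored by upstream Apache Spark. One sketch of a do-it-yourself substitute is a QueryExecutionListener that records each successful query's analyzed plan, from which input and output tables can be extracted; this only logs the plan and is not a full lineage system:

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Print the analyzed logical plan of every successful query; a real lineage
// collector would walk the plan for relations instead of printing it.
spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"[lineage] $funcName:\n${qe.analyzed.treeString}")

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    () // failed queries produced no output; skip
})
```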
Re: Re: How to generate a large dataset in parallel
Sorry, for now what I can do is this:

var df5 = spark.read.parquet("/user/devuser/testdata/df1").coalesce(1)
df5 = df5.union(df5).union(df5).union(df5).union(df5)

2018-12-14 lk_spark

From: 15313776907 <15313776...@163.com>
Sent: 2018-12-14 16:39
Subject: Re: How to generate a large dataset in parallel
To: "lk_sp...@163.com"
Cc: "user@spark.apache.org"

I also have this problem; I hope it can be solved here. Thank you.

On 12/14/2018 10:38, lk_spark wrote:

hi, all: I want to generate some test data containing about one hundred million rows. I created a dataset with ten rows, and I do df.union in a 'for' loop, but this causes the operation to happen only on the driver node. How can I do it on the whole cluster?

2018-12-14 lk_spark
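Rather than unioning a tiny DataFrame on the driver, spark.range generates rows directly across the cluster. A sketch, where the column expressions and the output path are illustrative assumptions:

```scala
import org.apache.spark.sql.functions.{col, concat, lit, rand}

// 1e8 rows generated in 200 partitions, i.e. in parallel on the executors.
val big = spark.range(0L, 100000000L, 1L, 200)
  .withColumn("value", rand())
  .withColumn("key", concat(lit("k_"), (col("id") % 1000).cast("string")))

big.write.mode("overwrite").parquet("/user/devuser/testdata/big") // hypothetical path
```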
Re: Re: How to generate a large dataset in parallel
I just want to generate some data in Spark.

2018-12-14 lk_spark

From: Jean Georges Perrin
Sent: 2018-12-14 11:10
Subject: Re: How to generate a large dataset in parallel
To: "lk_spark"
Cc: "user.spark"

Do you just want to generate some data in Spark, or ingest a large dataset from outside Spark? What's the ultimate goal you're pursuing? jg

On Dec 13, 2018, at 21:38, lk_spark wrote:

hi, all: I want to generate some test data containing about one hundred million rows. I created a dataset with ten rows, and I do df.union in a 'for' loop, but this causes the operation to happen only on the driver node. How can I do it on the whole cluster?

2018-12-14 lk_spark
How to generate a large dataset in parallel
hi, all: I want to generate some test data containing about one hundred million rows. I created a dataset with ten rows, and I do df.union in a 'for' loop, but this causes the operation to happen only on the driver node. How can I do it on the whole cluster?

2018-12-14 lk_spark
Re: about LIVY-424
I'm using Livy 0.5.0 with Spark 2.3.0. I started a session with 4 GB of memory for the driver, and I ran this code several times:

var tmp1 = spark.sql("use tpcds_bin_partitioned_orc_2"); var tmp2 = spark.sql("select count(1) from tpcds_bin_partitioned_orc_2.store_sales").show

The table has 5,760,749 rows of data. After about 10 runs, the driver's physical memory grows beyond 4.5 GB and it is killed by YARN. I saw the old-generation memory keep growing and it cannot be released by GC.

2018-11-12 lk_spark

From: "lk_hadoop"
Sent: 2018-11-12 09:37
Subject: about LIVY-424
To: "user"
Cc:

hi, all: I hit this issue: https://issues.apache.org/jira/browse/LIVY-424. Does anybody know how to resolve it?

2018-11-12 lk_hadoop
Re: Spark 2.3 on Kubernetes
Resolved: I needed to add "kubernetes.default.svc" to the Kubernetes API server's TLS config.

2018-04-08 lk_spark

From: "lk_spark" <lk_sp...@163.com>
Sent: 2018-04-08 11:15
Subject: Spark 2.3 on Kubernetes
To: "user" <user@spark.apache.org>
Cc:

hi, all: I am trying Spark on K8s with the Pi sample. I got this error from the driver:

2018-04-08 03:08:40 INFO SparkContext:54 - Successfully stopped SparkContext
Exception in thread "main" org.apache.spark.SparkException: External scheduler cannot be instantiated at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747) at org.apache.spark.SparkContext.(SparkContext.scala:492) at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486) at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930) at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921) at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:31) at org.apache.spark.examples.SparkPi.main(SparkPi.scala) Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get] for kind: [Pod] with name: [spark-pi-d439505012c5362ea6d26ea19df58bc1-driver] in namespace: [default] failed. 
at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62) at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71) at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228) at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184) at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.(KubernetesClusterSchedulerBackend.scala:70) at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120) at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741) ... 8 more Caused by: javax.net.ssl.SSLPeerUnverifiedException: Hostname kubernetes.default.svc not verified: certificate: sha256/Buyjy7p4sLKA7rLJqT0N/Cr3sM82fyi3pmx+eSht0eI= DN: CN=kube-apiserver, OU=Kubernetes-manual, O=Kubernetes, L=Taipei, ST=Taipei, C=TW subjectAltNames: [10.96.0.1, 192.168.0.140, 127.0.0.1, kubernetes.default] at okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:290) at okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:251) at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:151) at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:195) at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121) at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100) at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93) at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) at io.fabric8.kubernetes.client.utils.HttpClientUtils$2.intercept(HttpClientUtils.java:93) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185) at okhttp3.RealCall.execute(RealCall.java:69) at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:377) at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:343) at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleGet(OperationSupport.java:312) at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleGet(OperationSupport.java:295) at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleGet(BaseOperation.java:783) at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:217) ... 12 more I
spark2.3 on kubernetes
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.node.selector.spark=true \
--conf spark.kubernetes.authenticate.driver.clientKeyFile=/etc/kubernetes/pki/apiserver.pem \
--conf spark.kubernetes.container.image=dpaas/apache/spark:2.3.0-v1 \
local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar

2018-04-08
lk_spark
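For reference, the failing handshake above shows that the apiserver certificate's SANs ([10.96.0.1, 192.168.0.140, 127.0.0.1, kubernetes.default]) lack kubernetes.default.svc, which is exactly the hostname okhttp verifies. A hedged sketch of what a correct SAN list looks like and how to inspect it with openssl (requires OpenSSL 1.1.1+ for `-addext`; this generates a throwaway cert, since regenerating the real apiserver cert is cluster-specific and the paths here are placeholders):

```shell
# Generate a test cert with the SAN list the fabric8 client expects;
# on a kubeadm cluster the real cert usually lives under /etc/kubernetes/pki.
openssl req -x509 -newkey rsa:2048 -nodes -days 1 \
  -keyout /tmp/apiserver-test.key -out /tmp/apiserver-test.crt \
  -subj "/CN=kube-apiserver" \
  -addext "subjectAltName=DNS:kubernetes,DNS:kubernetes.default,DNS:kubernetes.default.svc,DNS:kubernetes.default.svc.cluster.local,IP:10.96.0.1"

# Verify the SAN list now contains the name the in-cluster client connects with:
openssl x509 -in /tmp/apiserver-test.crt -noout -text | grep -A1 "Subject Alternative Name"
```

The same inspection command against the real apiserver cert shows whether kubernetes.default.svc is missing, which reproduces the SSLPeerUnverifiedException above.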
Re: Re: Re: spark2.1 kafka0.10
Thank you Kumar, I will try it later.
2017-06-22
lk_spark

From: Pralabh Kumar <pralabhku...@gmail.com>
Sent: 2017-06-22 20:20
Subject: Re: Re: spark2.1 kafka0.10
To: "lk_spark"<lk_sp...@163.com>
Cc: "user.spark"<user@spark.apache.org>

It looks like the replicas for your partitions are failing. If you have more brokers, can you try increasing the replica count, just to make sure at least one leader is always available.

On Thu, Jun 22, 2017 at 10:34 AM, lk_spark <lk_sp...@163.com> wrote:
Each topic has 5 partitions and 2 replicas.
2017-06-22
lk_spark

From: Pralabh Kumar <pralabhku...@gmail.com>
Sent: 2017-06-22 17:23
Subject: Re: spark2.1 kafka0.10
To: "lk_spark"<lk_sp...@163.com>
Cc: "user.spark"<user@spark.apache.org>

How many replicas do you have for this topic?

On Thu, Jun 22, 2017 at 9:19 AM, lk_spark <lk_sp...@163.com> wrote:
java.lang.IllegalStateException: No current assignment for partition pages-2
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:264)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:336)
at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1236)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
2017-06-22
lk_spark

From: "lk_spark"<lk_sp...@163.com>
Sent: 2017-06-22 11:13
Subject: spark2.1 kafka0.10
To: "user.spark"<user@spark.apache.org>
Cc:

hi, all:
When I run the streaming application for a few minutes, I get this error:
17/06/22 10:34:56 INFO ConsumerCoordinator: Revoking previously assigned partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, weibocomment-0, bizs-2, pages-0, bizs-4, pages-2, weibo-0, pages-4, weibo-4, clicks-1, comment-1, weibo-2, clicks-3, weibocomment-4, weibocomment-2, profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, bizs-3, pages-1, weibo-1, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: (Re-)joining group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: Successfully joined group youedata1 with generation 3
17/06/22 10:34:56 INFO ConsumerCoor
Re: Re: spark2.1 kafka0.10
Each topic has 5 partitions and 2 replicas.
2017-06-22
lk_spark

From: Pralabh Kumar <pralabhku...@gmail.com>
Sent: 2017-06-22 17:23
Subject: Re: spark2.1 kafka0.10
To: "lk_spark"<lk_sp...@163.com>
Cc: "user.spark"<user@spark.apache.org>

How many replicas do you have for this topic?

On Thu, Jun 22, 2017 at 9:19 AM, lk_spark <lk_sp...@163.com> wrote:
java.lang.IllegalStateException: No current assignment for partition pages-2
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:264)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:336)
at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1236)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
2017-06-22
lk_spark

From: "lk_spark"<lk_sp...@163.com>
Sent: 2017-06-22 11:13
Subject: spark2.1 kafka0.10
To: "user.spark"<user@spark.apache.org>
Cc:

hi, all:
When I run the streaming application for a few minutes, I get this error:
17/06/22 10:34:56 INFO ConsumerCoordinator: Revoking previously assigned partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, weibocomment-0, bizs-2, pages-0, bizs-4, pages-2, weibo-0, pages-4, weibo-4, clicks-1, comment-1, weibo-2, clicks-3, weibocomment-4, weibocomment-2, profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0,
bizs-0, bizs-3, pages-1, weibo-1, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, clicks-0, weibocomment-1] for group youedata1 17/06/22 10:34:56 INFO AbstractCoordinator: (Re-)joining group youedata1 17/06/22 10:34:56 INFO AbstractCoordinator: Successfully joined group youedata1 with generation 3 17/06/22 10:34:56 INFO ConsumerCoordinator: Setting newly assigned partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, weibocomment-0, bizs-2, bizs-4, pages-4, weibo-4, clicks-1, comment-1, clicks-3, weibocomment-4, weibocomment-2, profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, bizs-3, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, clicks-0, weibocomment-1] for group youedata1 17/06/22 10:34:56 ERROR JobScheduler: Error generating jobs for time 1498098896000 ms java.lang.IllegalStateException: No current assignment for partition pages-2 I don't know why ? 2017-06-22 lk_spark
Re: spark2.1 kafka0.10
java.lang.IllegalStateException: No current assignment for partition pages-2 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:264) at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:336) at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1236) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) at scala.Option.orElse(Option.scala:289) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
2017-06-22
lk_spark

From: "lk_spark"<lk_sp...@163.com>
Sent: 2017-06-22 11:13
Subject: spark2.1 kafka0.10
To: "user.spark"<user@spark.apache.org>
Cc:

hi, all:
When I run the streaming application for a few minutes, I get this error:
17/06/22 10:34:56 INFO ConsumerCoordinator: Revoking previously assigned partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, weibocomment-0, bizs-2, pages-0, bizs-4, pages-2, weibo-0, pages-4, weibo-4, clicks-1, comment-1, weibo-2, clicks-3, weibocomment-4, weibocomment-2, profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, bizs-3, pages-1, weibo-1, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: (Re-)joining group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: Successfully joined group youedata1 with generation 3
17/06/22 10:34:56 INFO
ConsumerCoordinator: Setting newly assigned partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, weibocomment-0, bizs-2, bizs-4, pages-4, weibo-4, clicks-1, comment-1, clicks-3, weibocomment-4, weibocomment-2, profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, bizs-3, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, clicks-0, weibocomment-1] for group youedata1 17/06/22 10:34:56 ERROR JobScheduler: Error generating jobs for time 1498098896000 ms java.lang.IllegalStateException: No current assignment for partition pages-2 I don't know why ? 2017-06-22 lk_spark
spark2.1 kafka0.10
hi, all:
When I run the streaming application for a few minutes, I get this error:
17/06/22 10:34:56 INFO ConsumerCoordinator: Revoking previously assigned partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, weibocomment-0, bizs-2, pages-0, bizs-4, pages-2, weibo-0, pages-4, weibo-4, clicks-1, comment-1, weibo-2, clicks-3, weibocomment-4, weibocomment-2, profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, bizs-3, pages-1, weibo-1, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: (Re-)joining group youedata1
17/06/22 10:34:56 INFO AbstractCoordinator: Successfully joined group youedata1 with generation 3
17/06/22 10:34:56 INFO ConsumerCoordinator: Setting newly assigned partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1, weibocomment-0, bizs-2, bizs-4, pages-4, weibo-4, clicks-1, comment-1, clicks-3, weibocomment-4, weibocomment-2, profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0, bizs-0, bizs-3, pages-3, clicks-2, weibo-3, clicks-4, comment-2, weibocomment-3, clicks-0, weibocomment-1] for group youedata1
17/06/22 10:34:56 ERROR JobScheduler: Error generating jobs for time 1498098896000 ms
java.lang.IllegalStateException: No current assignment for partition pages-2
I don't know why?
2017-06-22
lk_spark
spark2.1 and kafka0.10
hi, all:
https://issues.apache.org/jira/browse/SPARK-19680
Is there any way to patch this issue? I have met the same problem.
2017-06-20
lk_spark
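SPARK-19680 surfaces as "No current assignment for partition" when the stream's consumer group rebalances underneath Spark, for example when another consumer (or a leftover instance of the same app) joins group youedata1 and partitions get revoked mid-batch, as the Revoking/Re-joining log lines above show. Until a patch, the usual workaround is to give each application run a consumer group that nothing else shares. A sketch of the kafkaParams side of that; the helper names are mine, not Spark API:

```scala
import java.util.UUID

// Derive a group id that is stable per app name but unique per run, so a
// restart racing its old executors, or a second instance, never shares the
// group and never triggers a rebalance that steals this stream's partitions.
def uniqueGroup(appName: String): String =
  s"$appName-${UUID.randomUUID().toString.take(8)}"

def kafkaParams(brokers: String, appName: String): Map[String, Object] = Map(
  "bootstrap.servers"  -> brokers,
  "key.deserializer"   -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id"           -> uniqueGroup(appName),
  "auto.offset.reset"  -> "latest",
  // the direct stream manages offsets itself; don't let the consumer auto-commit
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
```

This map would then be passed to KafkaUtils.createDirectStream as usual; note a fresh group id means offsets do not carry across restarts unless you checkpoint or store them yourself.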
Re: Re: Re: how to call udf with parameters
Thanks Kumar, that is really helpful!
2017-06-16
lk_spark

From: Pralabh Kumar <pralabhku...@gmail.com>
Sent: 2017-06-16 18:30
Subject: Re: Re: how to call udf with parameters
To: "lk_spark"<lk_sp...@163.com>
Cc: "user.spark"<user@spark.apache.org>

val getlength = udf((idx1: Int, idx2: Int, data: String) => data.substring(idx1, idx2))
data.select(getlength(lit(1), lit(2), data("col1"))).collect

On Fri, Jun 16, 2017 at 10:22 AM, Pralabh Kumar <pralabhku...@gmail.com> wrote:
Use lit. Give me some time, I'll provide an example.

On 16-Jun-2017 10:15 AM, "lk_spark" <lk_sp...@163.com> wrote:
Thanks Kumar. I want to know how to call a udf with multiple parameters, say a udf implementing a substr function: how can I pass the begin and end indexes as parameters? I tried and got errors. Can udf parameters only be of Column type?
2017-06-16
lk_spark

From: Pralabh Kumar <pralabhku...@gmail.com>
Sent: 2017-06-16 17:49
Subject: Re: how to call udf with parameters
To: "lk_spark"<lk_sp...@163.com>
Cc: "user.spark"<user@spark.apache.org>

Sample UDF:
val getlength = udf((data: String) => data.length())
data.select(getlength(data("col1")))

On Fri, Jun 16, 2017 at 9:21 AM, lk_spark <lk_sp...@163.com> wrote:
hi, all:
I define a udf with multiple parameters, but I don't know how to call it with a DataFrame.
UDF:
def ssplit2 = udf { (sentence: String, delNum: Boolean, delEn: Boolean, minTermLen: Int) =>
  val terms = HanLP.segment(sentence).asScala .
Call:
scala> val output = input.select(ssplit2($"text",true,true,2).as('words))
<console>:40: error: type mismatch;
 found   : Boolean(true)
 required: org.apache.spark.sql.Column
       val output = input.select(ssplit2($"text",true,true,2).as('words))
                                                 ^
<console>:40: error: type mismatch;
 found   : Boolean(true)
 required: org.apache.spark.sql.Column
       val output = input.select(ssplit2($"text",true,true,2).as('words))
                                                      ^
<console>:40: error: type mismatch;
 found   : Int(2)
 required: org.apache.spark.sql.Column
       val output = input.select(ssplit2($"text",true,true,2).as('words))
                                                           ^
scala> val output = input.select(ssplit2($"text",$"true",$"true",$"2").as('words))
org.apache.spark.sql.AnalysisException: cannot resolve '`true`' given input columns: [id, text];;
'Project [UDF(text#6, 'true, 'true, '2) AS words#16]
+- Project [_1#2 AS id#5, _2#3 AS text#6]
   +- LocalRelation [_1#2, _2#3]

I need help!
2017-06-16
lk_spark
Re: Re: how to call udf with parameters
Thanks Kumar. I want to know how to call a udf with multiple parameters, say a udf implementing a substr function: how can I pass the begin and end indexes as parameters? I tried and got errors. Can udf parameters only be of Column type?
2017-06-16
lk_spark

From: Pralabh Kumar <pralabhku...@gmail.com>
Sent: 2017-06-16 17:49
Subject: Re: how to call udf with parameters
To: "lk_spark"<lk_sp...@163.com>
Cc: "user.spark"<user@spark.apache.org>

Sample UDF:
val getlength = udf((data: String) => data.length())
data.select(getlength(data("col1")))

On Fri, Jun 16, 2017 at 9:21 AM, lk_spark <lk_sp...@163.com> wrote:
hi, all:
I define a udf with multiple parameters, but I don't know how to call it with a DataFrame.
UDF:
def ssplit2 = udf { (sentence: String, delNum: Boolean, delEn: Boolean, minTermLen: Int) =>
  val terms = HanLP.segment(sentence).asScala .
Call:
scala> val output = input.select(ssplit2($"text",true,true,2).as('words))
<console>:40: error: type mismatch;
 found   : Boolean(true)
 required: org.apache.spark.sql.Column
       val output = input.select(ssplit2($"text",true,true,2).as('words))
                                                 ^
<console>:40: error: type mismatch;
 found   : Boolean(true)
 required: org.apache.spark.sql.Column
       val output = input.select(ssplit2($"text",true,true,2).as('words))
                                                      ^
<console>:40: error: type mismatch;
 found   : Int(2)
 required: org.apache.spark.sql.Column
       val output = input.select(ssplit2($"text",true,true,2).as('words))
                                                           ^
scala> val output = input.select(ssplit2($"text",$"true",$"true",$"2").as('words))
org.apache.spark.sql.AnalysisException: cannot resolve '`true`' given input columns: [id, text];;
'Project [UDF(text#6, 'true, 'true, '2) AS words#16]
+- Project [_1#2 AS id#5, _2#3 AS text#6]
   +- LocalRelation [_1#2, _2#3]

I need help!
2017-06-16
lk_spark
how to call udf with parameters
hi, all:
I define a udf with multiple parameters, but I don't know how to call it with a DataFrame.
UDF:
def ssplit2 = udf { (sentence: String, delNum: Boolean, delEn: Boolean, minTermLen: Int) =>
  val terms = HanLP.segment(sentence).asScala .
Call:
scala> val output = input.select(ssplit2($"text",true,true,2).as('words))
<console>:40: error: type mismatch;
 found   : Boolean(true)
 required: org.apache.spark.sql.Column
       val output = input.select(ssplit2($"text",true,true,2).as('words))
                                                 ^
<console>:40: error: type mismatch;
 found   : Boolean(true)
 required: org.apache.spark.sql.Column
       val output = input.select(ssplit2($"text",true,true,2).as('words))
                                                      ^
<console>:40: error: type mismatch;
 found   : Int(2)
 required: org.apache.spark.sql.Column
       val output = input.select(ssplit2($"text",true,true,2).as('words))
                                                           ^
scala> val output = input.select(ssplit2($"text",$"true",$"true",$"2").as('words))
org.apache.spark.sql.AnalysisException: cannot resolve '`true`' given input columns: [id, text];;
'Project [UDF(text#6, 'true, 'true, '2) AS words#16]
+- Project [_1#2 AS id#5, _2#3 AS text#6]
   +- LocalRelation [_1#2, _2#3]

I need help!
2017-06-16
lk_spark
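Besides wrapping every constant in lit(), the scalar parameters can be closed over so the UDF itself takes only the one Column. A sketch of that pattern: makeSplitter is a hypothetical factory, and the tokenization is a crude whitespace/ASCII stand-in for HanLP.segment, which isn't available here.

```scala
// Factory that bakes the scalar parameters into an ordinary function;
// the returned String => Seq[String] is what gets wrapped as a UDF.
def makeSplitter(delNum: Boolean, delEn: Boolean, minTermLen: Int): String => Seq[String] = {
  sentence =>
    val terms = sentence.split("\\s+").toSeq  // stand-in for HanLP.segment(sentence).asScala
    terms
      .filter(t => !(delNum && t.nonEmpty && t.forall(_.isDigit)))                      // drop pure numbers
      .filter(t => !(delEn && t.nonEmpty && t.forall(c => c.isLetter && c <= 'z')))     // drop pure-ASCII words
      .filter(_.length >= minTermLen)                                                   // drop short terms
}

// With Spark in scope this becomes a one-Column UDF, no lit() needed:
//   val ssplit2 = udf(makeSplitter(delNum = true, delEn = true, minTermLen = 2))
//   input.select(ssplit2($"text").as("words"))
```

The trade-off versus lit(): the parameters are fixed when the UDF is built, which is exactly what this use case wants, and the call site stays clean.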
spark on yarn cluster mode can't use saveAsTable?
hi, all:
I have a test under spark2.1.0 which reads txt files as a DataFrame and saves them to Hive. When I submit the app jar in yarn client mode it works well, but if I submit in cluster mode, it does not create the table or write any data, and I didn't find any error log ... can anybody give me a clue?
2017-05-15
lk_spark
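A common cause of this client/cluster difference: in client mode the driver runs on the submitting gateway where hive-site.xml is on the classpath, while in cluster mode it runs on an arbitrary NodeManager; without the Hive config, Spark silently falls back to a local embedded metastore, so the table never appears in the real Hive warehouse and no error is logged. A hedged sketch of a submit command that ships the Hive config with the job (class name and paths are placeholders, and the session must also be built with enableHiveSupport()):

```shell
# Ship hive-site.xml to the cluster-mode driver container so it can reach
# the real metastore (placeholders throughout):
spark-submit \
  --class com.example.TxtToHive \
  --master yarn \
  --deploy-mode cluster \
  --files /etc/hive/conf/hive-site.xml \
  /path/to/app-with-dependencies.jar
```

Checking the driver container's logs on YARN for a Derby/metastore warning is a quick way to confirm this is what happened.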
Re: Re: Re: how to call recommend method from ml.recommendation.ALS
Thank you, that's what I wanted to confirm.
2017-03-16
lk_spark

From: Yuhao Yang <hhb...@gmail.com>
Sent: 2017-03-16 13:05
Subject: Re: Re: how to call recommend method from ml.recommendation.ALS
To: "lk_spark"<lk_sp...@163.com>
Cc: "任弘迪"<ryan.hd@gmail.com>, "user.spark"<user@spark.apache.org>

This is something that was just added to ML and will probably be released with 2.2. For now you can try to copy from the master code: https://github.com/apache/spark/blob/70f9d7f71c63d2b1fdfed75cb7a59285c272a62b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L352 and give it a try.
Yuhao

2017-03-15 21:39 GMT-07:00 lk_spark <lk_sp...@163.com>:
Thanks for your reply. What I exactly want to know is: in package mllib.recommendation, MatrixFactorizationModel has methods like recommendProducts, but I didn't find them in package ml.recommendation. How can I do the same thing with ml as I did with mllib?
2017-03-16
lk_spark

From: 任弘迪 <ryan.hd@gmail.com>
Sent: 2017-03-16 10:46
Subject: Re: how to call recommend method from ml.recommendation.ALS
To: "lk_spark"<lk_sp...@163.com>
Cc: "user.spark"<user@spark.apache.org>

If the number of user-item pairs to predict isn't too large, say millions, you could transform the target dataframe and save the result to a Hive table, then build a cache based on that table for online services. If that's not the case (such as billions of user-item pairs to predict), you have to start a service with the model loaded, send the user to the service, first match several hundred items out of all available items (which could itself be another service or cache), then transform this user and those items using the model to get predictions, and return the items ordered by prediction.

On Thu, Mar 16, 2017 at 9:32 AM, lk_spark <lk_sp...@163.com> wrote:
hi, all:
Under spark2.0, I want to know: after training an ml.recommendation.ALSModel, how can I perform the recommend action? I tried to save the model and load it with MatrixFactorizationModel but got an error.
2017-03-16
lk_spark
Re: Re: how to call recommend method from ml.recommendation.ALS
Thanks for your reply. What I exactly want to know is: in package mllib.recommendation, MatrixFactorizationModel has methods like recommendProducts, but I didn't find them in package ml.recommendation. How can I do the same thing with ml as I did with mllib?
2017-03-16
lk_spark

From: 任弘迪 <ryan.hd@gmail.com>
Sent: 2017-03-16 10:46
Subject: Re: how to call recommend method from ml.recommendation.ALS
To: "lk_spark"<lk_sp...@163.com>
Cc: "user.spark"<user@spark.apache.org>

If the number of user-item pairs to predict isn't too large, say millions, you could transform the target dataframe and save the result to a Hive table, then build a cache based on that table for online services. If that's not the case (such as billions of user-item pairs to predict), you have to start a service with the model loaded, send the user to the service, first match several hundred items out of all available items (which could itself be another service or cache), then transform this user and those items using the model to get predictions, and return the items ordered by prediction.

On Thu, Mar 16, 2017 at 9:32 AM, lk_spark <lk_sp...@163.com> wrote:
hi, all:
Under spark2.0, I want to know: after training an ml.recommendation.ALSModel, how can I perform the recommend action? I tried to save the model and load it with MatrixFactorizationModel but got an error.
2017-03-16
lk_spark
how to call recommend method from ml.recommendation.ALS
hi, all:
Under spark2.0, I want to know: after training an ml.recommendation.ALSModel, how can I perform the recommend action? I tried to save the model and load it with MatrixFactorizationModel but got an error.
2017-03-16
lk_spark
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
hi, all:
I want to extract some info from Kafka using Spark Streaming. My code is like:

val keyword = ""
val system = "dmp"
val datetime_idx = 0
val datetime_length = 23
val logLevelBeginIdx = datetime_length + 2 - 1
val logLevelMaxLenght = 5
val lines = messages.filter(record => record.value().matches("\\d{4}.*")).map(record => {
  val assembly = record.topic()
  val value = record.value
  val datatime = value.substring(datetime_idx, datetime_length - 1)
  val level = value.substring(logLevelBeginIdx, logLevelBeginIdx + logLevelMaxLenght - 1)
  (assembly, value, datatime, level)
})

I get this error:

Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Serialization stack:
- object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@5a457aa1)
- field (class: $iw, name: streamingContext, type: class org.apache.spark.streaming.StreamingContext)
- object (class $iw, $iw@38eb2140)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2a3ced3d)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7c5dbca5)

If I change the parameters to constants I do not get the error:

val lines = messages.filter(record => record.value().matches("\\d{4}.*")).map(record => {
  val assembly = record.topic()
  val value = record.value
  val datatime = value.substring(0, 22)
  val level = value.substring(24, 27)
  (assembly, value, datatime, level)
})

How can I pass parameters to the map function?
2017-02-27
lk_spark
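The cause is that `value.substring(datetime_idx, ...)` compiles to `value.substring(this.datetime_idx, ...)`, so the closure captures the enclosing REPL line object, which also holds the StreamingContext; with literal constants nothing outer is captured. The standard fix is to copy the fields into local vals just before the map. A sketch reproducing both behaviors with plain Java serialization, no Spark needed; Driver below is a stand-in for the REPL wrapper object:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

class NotSerHandle            // stand-in for StreamingContext: not Serializable
class Driver {
  val handle = new NotSerHandle
  val datetimeIdx = 0
  val datetimeLength = 23

  // BAD: the lambda reads this.datetimeIdx / this.datetimeLength, so it
  // captures the whole unserializable Driver; Spark would fail to ship it.
  def badParser: String => String =
    value => value.substring(datetimeIdx, datetimeLength - 1)

  // GOOD: copy the fields into local vals first; the closure then captures
  // only two Ints and serializes cleanly.
  def goodParser: String => String = {
    val idx = datetimeIdx
    val len = datetimeLength
    value => value.substring(idx, len - 1)
  }
}

// Roughly the check Spark performs before sending a closure to executors:
def isSerializable(a: AnyRef): Boolean =
  try { new ObjectOutputStream(new ByteArrayOutputStream).writeObject(a); true }
  catch { case _: NotSerializableException => false }
```

So in the streaming job, assigning `val idx = datetime_idx; val len = datetime_length` immediately before building the map closure (or moving the constants into a small serializable object) avoids the NotSerializableException.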
Help: I want to call spark-submit from Java
hi, all:
Under spark2.0 with hadoop 2.7.2, my code is like this:

String c1 = "/bin/sh";
String c2 = "-c";
StringBuilder sb = new StringBuilder("cd /home/hadoop/dmp/spark-2.0.2-bin-hadoop2.7/bin;spark-submit --class com.hua.spark.dataload.DataLoadFromBase64JSON --master yarn --deploy-mode client /home/hadoop/dmp/dataload-1.0-SNAPSHOT-jar-with-dependencies.jar ");
Process pro = Runtime.getRuntime().exec(new String[]{c1, c2, sb.toString()});
pro.waitFor();

On the same node I can execute the command successfully from a terminal, but from Java I get this error:

17/01/20 06:39:05 ERROR TransportChannelHandler: Connection to /192.168.0.136:51197 has been quiet for 12 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
17/01/20 06:39:05 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /192.168.0.136:51197 is closed
17/01/20 06:39:05 WARN NettyRpcEnv: Ignored failure: java.io.IOException: Connection from /192.168.0.136:51197 closed
17/01/20 06:39:05 ERROR CoarseGrainedExecutorBackend: Cannot register with driver: spark://CoarseGrainedScheduler@192.168.0.136:51197
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds.
This timeout is controlled by spark.rpc.askTimeout at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216) at scala.util.Try$.apply(Try.scala:192) at scala.util.Failure.recover(Try.scala:216) at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326) at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293) at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at scala.concurrent.Promise$class.complete(Promise.scala:55) at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63) at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78) at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55) at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at 
scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at scala.concurrent.Promise$class.tryFailure(Promise.scala:112) at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153) at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205) at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds ... 8 more 17/01/20 06:39:05 ERROR CoarseGrainedExecutorBackend: Driver 192.168.0.136:51197 disassociated! Shutting down. 2017-01-20 lk_spark
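When spark-submit is launched from Java like this, one easy-to-miss failure mode is that the child process's stdout and stderr are never drained: the OS pipe buffer fills up and the launch stalls at waitFor(), while the driver can also fail for unrelated environment reasons (PATH, SPARK_HOME, network). A minimal sketch of launching a shell command while capturing both streams, using scala.sys.process; the spark-submit command line itself is replaced with a placeholder echo here:

```scala
import scala.sys.process._

// Launch an external command while continuously draining its output.
// If stdout/stderr are never read, the OS pipe buffer can fill and the
// child process blocks, which looks like a hang in waitFor().
// This command is a stand-in for the real spark-submit line.
val cmd = Seq("/bin/sh", "-c", "echo launcher-ok")
val out = new StringBuilder
val err = new StringBuilder
val exitCode = cmd ! ProcessLogger(line => out.append(line).append('\n'),
                                   line => err.append(line).append('\n'))
println(s"exit=$exitCode, stdout=${out.toString.trim}")
```

Capturing stderr this way also surfaces whatever spark-submit itself prints, which is usually more informative than the executor-side timeout alone.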
how to dynamically partition a dataframe
hi, all: I want to partition data according to a config file that tells me how to partition the current input data. DataFrameWriter has a method named partitionBy(colNames: String*): DataFrameWriter[T]. Why can't I pass the column names as a Seq[String] or Array[String]? 2017-01-18 lk_spark
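You can pass a dynamic list: a Scala varargs parameter accepts a Seq through the `: _*` type ascription. A minimal sketch with a stand-in method (partitionByDemo is hypothetical; the real DataFrameWriter call is shown as a comment):

```scala
// Scala expands a Seq into a varargs parameter with the `: _*` type ascription.
// A hypothetical varargs method, standing in for DataFrameWriter.partitionBy:
def partitionByDemo(colNames: String*): String = colNames.mkString(",")

val colsFromConfig: Seq[String] = Seq("year", "month") // e.g. read from a config file
val joined = partitionByDemo(colsFromConfig: _*)       // splat the Seq into varargs

// With a real DataFrameWriter the call has the same shape:
// df.write.partitionBy(colsFromConfig: _*).parquet("/some/path")
println(joined)
```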
how to use newAPIHadoopFile
hi, all: I have a test with Spark 2.0. I have a test file with \t as the field delimiter:
kevin 30 2016
shen 30 2016
kai 33 2016
wei 30 2016
After using:
var datas: RDD[(LongWritable, String)] = sc.newAPIHadoopFile(inputPath+filename, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], hadoopConf).map { case (key, value) => (key, new String(value.getBytes, decode)) }
and saving the RDD to HDFS, I got this:
(0,kevin 30 2016)
(14,shen 30 20166)
(27,kai 33 201666)
(39,wei 30 201666)
It looks like the reader didn't clear its buffer, or something similar, after reading each line? 2017-01-17 lk_spark
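A likely cause is that Hadoop's Text object is reused between records: getBytes returns the whole internal buffer, and only the first getLength bytes belong to the current line, so shorter lines pick up stale trailing bytes from longer earlier ones. The effect and the fix can be simulated with plain byte arrays (the Text-based fix in the comment is an assumption, not run here):

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Simulate a reused buffer: it still holds leftover bytes from a longer
// previous record, but only the first `length` bytes are the current record.
val buffer = "wei 30 201666".getBytes(UTF_8)      // stale "66" left at the end
val length = "wei 30 2016".getBytes(UTF_8).length // what Text.getLength would report

val wrong = new String(buffer, UTF_8)            // picks up the stale trailing bytes
val right = new String(buffer, 0, length, UTF_8) // honors the reported length

// With Hadoop's Text the equivalent fix would be:
//   new String(value.getBytes, 0, value.getLength, decode)
// or value.copyBytes() / value.toString (assumption based on the Text API).
println(s"$wrong -> $right")
```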
Re: Re: Re: how to change datatype by using StructType
a better way to answer my question: use GenericRow instead of Row
val rows: RDD[Row] = spark.sparkContext.textFile("/sourcedata/test/test1").map { line => { val attributes: Array[String] = line.split(",") val ab = ArrayBuffer[Any]() for (i <- 0 until schemaType.length) { if (schemaType(i).equalsIgnoreCase("int")) { ab += attributes(i).toInt } else if (schemaType(i).equalsIgnoreCase("long")) { ab += attributes(i).toLong } else { ab += attributes(i) } } new GenericRow(ab.toArray) } }
2017-01-13 lk_spark
From: "lk_spark"<lk_sp...@163.com> Sent: 2017-01-13 09:49 Subject: Re: Re: Re: how to change datatype by using StructType To: "Nicholas Hakobian"<nicholas.hakob...@rallyhealth.com> Cc: "user.spark"<user@spark.apache.org>
Thank you Nicholas, if the source data is in CSV format the CSV reader works well. 2017-01-13 lk_spark
From: Nicholas Hakobian <nicholas.hakob...@rallyhealth.com> Sent: 2017-01-13 08:35 Subject: Re: Re: Re: how to change datatype by using StructType To: "lk_spark"<lk_sp...@163.com> Cc: "ayan guha"<guha.a...@gmail.com>, "user.spark"<user@spark.apache.org>
Have you tried the native CSV reader (in Spark 2) or the Databricks CSV reader (in 1.6)? If your format is CSV-like it will load directly into a DataFrame. It's possible you have some rows where the types are inconsistent. Nicholas Szandor Hakobian, Ph.D.
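The Row(ab.toArray) attempt further down this thread fails because it builds a one-column Row whose single value is the whole Array[Any]; spreading the values across columns fixes it, which is also what GenericRow achieves. The conversion loop itself is pure Scala and can be sketched without Spark (the Row.fromSeq fix is shown only as a comment):

```scala
// Convert string fields to typed values according to a configurable schema,
// reproducing the conversion loop from this thread as a pure function.
def convertFields(schemaType: Seq[String], attributes: Seq[String]): Seq[Any] =
  schemaType.zip(attributes).map { case (t, v) =>
    t.toLowerCase match {
      case "int"  => v.toInt
      case "long" => v.toLong
      case _      => v
    }
  }

val typed = convertFields(Seq("string", "int", "int"), "kevin,30,2016".split(",").toSeq)

// The earlier error came from wrapping the whole array in one column:
//   Row(ab.toArray)       // a 1-column Row whose value is an Array[Any]
// Spreading the values fixes it (Spark API, shown as a comment here):
//   Row.fromSeq(ab)       // or Row(ab.toArray: _*)
println(typed)
```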
Senior Data Scientist Rally Health nicholas.hakob...@rallyhealth.com
On Thu, Jan 12, 2017 at 1:52 AM, lk_spark <lk_sp...@163.com> wrote: I have tried this:
val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*") val rowRDD = peopleRDD.map(_.split(",")).map(attributes => { val ab = ArrayBuffer[Any]() for (i <- 0 until schemaType.length) { if (schemaType(i).equalsIgnoreCase("int")) { ab += attributes(i).toInt } else if (schemaType(i).equalsIgnoreCase("long")) { ab += attributes(i).toLong } else { ab += attributes(i) } } Row(ab.toArray) }) val peopleDF = spark.createDataFrame(rowRDD, schema) peopleDF.show
I got this error: Caused by: java.lang.RuntimeException: [Ljava.lang.Object; is not a valid external type for schema of string at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
All the fields are Any, what should I do? 2017-01-12 lk_spark
From: "lk_spark"<lk_sp...@163.com> Sent: 2017-01-12 14:38 Subject: Re: Re: how to change datatype by using StructType To: "ayan guha"<guha.a...@gmail.com>, "user.spark"<user@spark.apache.org> Cc:
Yes, field year is in my data: kevin,30,2016 shen,30,2016 kai,33,2016 wei,30,2016
This will not work for me: val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1), attributes(2))) because I need to read the data configurably. 2017-01-12 lk_spark
From: ayan guha <guha.a...@gmail.com> Sent: 2017-01-12 14:34 Subject: Re: how to change datatype by using StructType To: "lk_spark"<lk_sp...@163.com>, "user.spark"<user@spark.apache.org> Cc:
Do you have year in your data?
On Thu, 12 Jan 2017 at 5:24 pm, lk_spark <lk_sp...@163.com> wrote: hi, all: I have a txt file and I want to process it as a dataframe. The data looks like this: name1,30 name2,18
val schemaString = "name age year" val xMap = new scala.collection.mutable.HashMap[String,DataType]() xMap.put("name", StringType) xMap.put("age", IntegerType) xMap.put("year", IntegerType) val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, xMap.get(fieldName).get, nullable = true)) val schema = StructType(fields) val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*") //spark.read.schema(schema).text("/sourcedata/test/test*") val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1))) // Apply the schema to the RDD val peopleDF = spark.createDataFrame(rowRDD, schema)
But when I write it to a table or show it, I get an error: Caused by: java.lang.
Re: Re: Re: how to change datatype by using StructType
Thank you Nicholas, if the source data is in CSV format the CSV reader works well. 2017-01-13 lk_spark
From: Nicholas Hakobian <nicholas.hakob...@rallyhealth.com> Sent: 2017-01-13 08:35 Subject: Re: Re: Re: how to change datatype by using StructType To: "lk_spark"<lk_sp...@163.com> Cc: "ayan guha"<guha.a...@gmail.com>, "user.spark"<user@spark.apache.org>
Have you tried the native CSV reader (in Spark 2) or the Databricks CSV reader (in 1.6)? If your format is CSV-like it will load directly into a DataFrame. It's possible you have some rows where the types are inconsistent. Nicholas Szandor Hakobian, Ph.D. Senior Data Scientist Rally Health nicholas.hakob...@rallyhealth.com
On Thu, Jan 12, 2017 at 1:52 AM, lk_spark <lk_sp...@163.com> wrote: I have tried this:
val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*") val rowRDD = peopleRDD.map(_.split(",")).map(attributes => { val ab = ArrayBuffer[Any]() for (i <- 0 until schemaType.length) { if (schemaType(i).equalsIgnoreCase("int")) { ab += attributes(i).toInt } else if (schemaType(i).equalsIgnoreCase("long")) { ab += attributes(i).toLong } else { ab += attributes(i) } } Row(ab.toArray) }) val peopleDF = spark.createDataFrame(rowRDD, schema) peopleDF.show
I got this error: Caused by: java.lang.RuntimeException: [Ljava.lang.Object; is not a valid external type for schema of string at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
All the fields are Any, what should I do?
2017-01-12 lk_spark
From: "lk_spark"<lk_sp...@163.com> Sent: 2017-01-12 14:38 Subject: Re: Re: how to change datatype by using StructType To: "ayan guha"<guha.a...@gmail.com>, "user.spark"<user@spark.apache.org> Cc:
Yes, field year is in my data: kevin,30,2016 shen,30,2016 kai,33,2016 wei,30,2016
This will not work for me: val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1), attributes(2))) because I need to read the data configurably. 2017-01-12 lk_spark
From: ayan guha <guha.a...@gmail.com> Sent: 2017-01-12 14:34 Subject: Re: how to change datatype by using StructType To: "lk_spark"<lk_sp...@163.com>, "user.spark"<user@spark.apache.org> Cc:
Do you have year in your data? On Thu, 12 Jan 2017 at 5:24 pm, lk_spark <lk_sp...@163.com> wrote: hi, all: I have a txt file and I want to process it as a dataframe. The data looks like this: name1,30 name2,18
val schemaString = "name age year" val xMap = new scala.collection.mutable.HashMap[String,DataType]() xMap.put("name", StringType) xMap.put("age", IntegerType) xMap.put("year", IntegerType) val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, xMap.get(fieldName).get, nullable = true)) val schema = StructType(fields) val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*") //spark.read.schema(schema).text("/sourcedata/test/test*") val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1))) // Apply the schema to the RDD val peopleDF = spark.createDataFrame(rowRDD, schema)
But when I write it to a table or show it, I get an error: Caused by: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true],
top level row object), 0, name), StringType), true) AS name#1 +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, name), StringType), true)
If I change my code it will work: val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).toInt)) but this is not a good idea. 2017-01-12 lk_spark
Re: Re: Re: how to add column to dataframe
I have found the right way to do it:
val df = spark.read.parquet("/parquetdata/weixin/page/month=201607") val df2 = df.withColumn("pa_bid",when(isnull($"url"),"".split("#")(0)).otherwise(split(split(col("url"),"_biz=")(1), "")(1)))
scala> df2.select("pa_bid","url").show
| pa_bid| url|
|MjM5MjEyNTk2MA==|http://mp.weixin|
|MzAxODIwMDcwNA==|http://mp.weixin|
|MzIzMjQ4NzQwOA==|http://mp.weixin|
|MzAwOTIxMTcyMQ==|http://mp.weixin|
|MzA3OTAyNzY2OQ==|http://mp.weixin|
|MjM5NDAzMDAwMA==|http://mp.weixin|
|MzAwMjE4MzU0Nw==|http://mp.weixin|
|MzA4NzcyNjI0Mw==|http://mp.weixin|
|MzI5OTE5Nzc5Ng==|http://mp.weixin|
2016-12-06 lk_spark
From: "lk_spark"<lk_sp...@163.com> Sent: 2016-12-06 17:44 Subject: Re: Re: how to add column to dataframe To: "Pankaj Wahane"<pankajwah...@live.com>, "user.spark"<user@spark.apache.org> Cc:
Thanks for the reply. I will look into how to use na.fill. And I don't know how to get the value of the column and do operations on it like substr or split. 2016-12-06 lk_spark
From: Pankaj Wahane <pankajwah...@live.com> Sent: 2016-12-06 17:39 Subject: Re: how to add column to dataframe To: "lk_spark"<lk_sp...@163.com>, "user.spark"<user@spark.apache.org> Cc:
You may want to try using df2.na.fill(…)
From: lk_spark <lk_sp...@163.com> Date: Tuesday, 6 December 2016 at 3:05 PM To: "user.spark" <user@spark.apache.org> Subject: how to add column to dataframe
hi, all: my Spark version is 2.0. I have a parquet file with a column named url of type string. I want to take a substring of the url and add it to the dataframe:
val df = spark.read.parquet("/parquetdata/weixin/page/month=201607") val df2 = df.withColumn("pa_bid",when($"url".isNull,col("url").substr(3, 5))) df2.select("pa_bid","url").show
|pa_bid| url|
| null|http://mp.weixin|
| null|http://mp.weixin|
| null|http://mp.weixin|
| null|http://mp.weixin|
| null|http://mp.weixin|
| null|http://mp.weixin|
| null|http://mp.weixin|
| null|http://mp.weixin|
Why is what I got null? 2016-12-06 lk_spark
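As an alternative to the nested split calls, a single regular expression can pull the parameter out of the URL (regexp_extract on the Spark side). The pattern can be checked in plain Scala; the parameter name __biz and the sample URL are assumptions based on the thread's data:

```scala
// Extract a query parameter from a URL with one regex instead of nested splits.
// The parameter name (__biz) and sample URL are assumptions for illustration.
val bizPattern = "__biz=([^&#]+)".r

def extractBiz(url: String): String =
  bizPattern.findFirstMatchIn(url).map(_.group(1)).getOrElse("")

val sample = "http://mp.weixin.qq.com/s?__biz=MjM5MjEyNTk2MA==&mid=1"
val bid = extractBiz(sample)

// Equivalent Spark expression (API sketch, not run here):
//   df.withColumn("pa_bid", regexp_extract(col("url"), "__biz=([^&#]+)", 1))
println(bid)
```

regexp_extract returns an empty string when the pattern does not match, so it also handles the null/missing-url rows without a separate when() branch.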
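The all-null column in the original when() example has a simple cause: when(cond, value) with no .otherwise(...) produces null whenever the condition is false, and the condition tested isNull rather than isNotNull. The semantics can be modeled with Option in plain Scala (the Spark-side fix is shown as a comment):

```scala
// Model Spark's when(cond, value): with no .otherwise, a false condition
// yields null (here: None).
def whenCond[A](cond: Boolean, value: => A): Option[A] = if (cond) Some(value) else None

val url: String = "http://mp.weixin"
val urlIsNull = url == null

// Original shape: value only when url IS null, so every real url maps to None/null.
val buggy = whenCond(urlIsNull, url.substring(2, 7))
// Fixed shape: test isNotNull and supply a fallback.
val fixed = whenCond(!urlIsNull, url.substring(2, 7)).getOrElse("")

// Spark equivalent (sketch):
//   when($"url".isNotNull, $"url".substr(3, 5)).otherwise(lit(""))
println(s"$buggy / $fixed")
```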
Re:RE: how to merge dataframe write output files
Thank you for the reply, Shreya: it's because the files are too small, and HDFS doesn't like small files. As for your question: yes, I want to create an ExternalTable on the parquet-file folder. And how would I use fragmented files as you mention? The test case is as below:
bin/spark-shell --master yarn --deploy-mode client --driver-memory 6g --executor-memory 8g --executor-cores 2 --num-executors 4
val df = spark.read.parquet("/parquetdata/weixin/biz-tag-relation/") df.createOrReplaceTempView("biztag") #almost 70M 3673411 rows
val df2 = spark.read.parquet("/parquetdata/weixin/biz/month=201608") df2.createOrReplaceTempView("biz1608") #almost 90M 381700 rows
for(i <- 1 to 61) { val dft = spark.sql(s"select biz1608.*,biztag.tag_id from biz1608 left join biztag on biz1608.bid = biztag.biz_id where biztag.tag_id = ${i}") dft.coalesce(1).write.parquet(s"/parquetdata/weixin/biztags/biztag${i}") }
At 2016-11-10 15:47:02, "Shreya Agarwal" <shrey...@microsoft.com> wrote: Is there a reason you want to merge the files? The reason you are getting errors (afaik) is that when you try to coalesce and then write, you are forcing all the content to reside on one executor, and the size of the data exceeds the memory you have for storage in your executor, causing the container to be killed. We can confirm this if you provide the specs of your cluster. The whole purpose of multiple files is that each executor can write its partition out in parallel, without having to collect the data in one place. Not to mention that it will make your write incredibly slow, and it will also take away all the speed of reading the data in from parquet, as there won't be any parallelism at input time (if you try to read this parquet back). Again, the important question is: why do you need it to be one file? Are you planning to use it externally? If yes, can you not use fragmented files there?
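Shreya's point about coalesce(1) can be turned into a rule of thumb: choose the output file count from the data size instead of forcing one file. The sizing arithmetic is plain Scala; the repartition call is the Spark-side sketch, and the 128 MB target is an assumption (roughly one HDFS block):

```scala
// Choose a partition (output-file) count from estimated data size, instead of
// forcing everything through one task with coalesce(1).
def targetPartitions(totalBytes: Long, targetFileBytes: Long = 128L * 1024 * 1024): Int =
  math.max(1, math.ceil(totalBytes.toDouble / targetFileBytes).toInt)

// ~70 MB of output with a 128 MB target -> 1 file; ~1 GB -> 8 files.
val small = targetPartitions(70L * 1024 * 1024)
val large = targetPartitions(1024L * 1024 * 1024)

// Spark usage (sketch): dft.repartition(targetPartitions(estimatedBytes))
//                          .write.parquet(path)
// repartition shuffles, but it spreads the write across executors and avoids
// the single-executor memory pressure that can kill a coalesce(1) container.
println(s"$small $large")
```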
If the data is too big for the Spark executor, it’ll most certainly be too much for JRE or any other runtime to load in memory on a single box. From: lk_spark [mailto:lk_sp...@163.com] Sent: Wednesday, November 9, 2016 11:29 PM To: user.spark <user@spark.apache.org> Subject: how to merge dataframe write output files hi,all: when I call api df.write.parquet ,there is alot of small files : how can I merge then into on file ? I tried df.coalesce(1).write.parquet ,but it will get error some times Container exited with a non-zero exit code 143 more an more... -rw-r--r-- 2 hadoop supergroup 14.5 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet -rw-r--r-- 2 hadoop supergroup 16.4 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet -rw-r--r-- 2 hadoop supergroup 17.1 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00167-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet -rw-r--r-- 2 hadoop supergroup 14.2 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00168-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet -rw-r--r-- 2 hadoop supergroup 15.7 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00169-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet -rw-r--r-- 2 hadoop supergroup 14.4 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00170-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet -rw-r--r-- 2 hadoop supergroup 17.1 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00171-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet -rw-r--r-- 2 hadoop supergroup 15.7 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00172-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet -rw-r--r-- 2 hadoop supergroup 16.0 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00173-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet -rw-r--r-- 2 hadoop supergroup 
17.1 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00174-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet -rw-r--r-- 2 hadoop supergroup 14.0 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00175-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet -rw-r--r-- 2 hadoop supergroup 15.7 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00176-0f61afe4-23e8-40bb-b30b-09652ca677bc more an more... 2016-11-10 lk_spark
how to merge dataframe write output files
hi, all: when I call the API df.write.parquet, there are a lot of small files; how can I merge them into one file? I tried df.coalesce(1).write.parquet, but it sometimes gets an error: Container exited with a non-zero exit code 143
more and more...
-rw-r--r-- 2 hadoop supergroup 14.5 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 16.4 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 17.1 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00167-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 14.2 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00168-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 15.7 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00169-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 14.4 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00170-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 17.1 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00171-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 15.7 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00172-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 16.0 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00173-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 17.1 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00174-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 14.0 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00175-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 15.7 K
2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00176-0f61afe4-23e8-40bb-b30b-09652ca677bc more and more... 2016-11-10 lk_spark
Re: Re: How to iterate the element of an array in DataFrame?
how about changing the schema from:
root
|-- category.firstCategory: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- category: string (nullable = true)
|    |    |-- weight: string (nullable = true)
to:
root
|-- category: string (nullable = true)
|-- weight: string (nullable = true)
2016-10-21 lk_spark
From: 颜发才(Yan Facai) <yaf...@gmail.com> Sent: 2016-10-21 15:35 Subject: Re: How to iterate the element of an array in DataFrame? To: "user.spark"<user@spark.apache.org> Cc:
I don't know how to construct `array<struct<category:string,weight:string>>`. Could anyone help me? I tried to get the array with: scala> mblog_tags.map(_.getSeq[(String, String)](0)) but the result is: res40: org.apache.spark.sql.Dataset[Seq[(String, String)]] = [value: array<struct<_1:string,_2:string>>] How do I express `struct<string, string>`?
On Thu, Oct 20, 2016 at 4:34 PM, 颜发才(Yan Facai) <yaf...@gmail.com> wrote: Hi, I want to extract the attribute `weight` of an array, and combine them to construct a sparse vector.
### My data is like this:
scala> mblog_tags.printSchema
root
|-- category.firstCategory: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- category: string (nullable = true)
|    |    |-- weight: string (nullable = true)
scala> mblog_tags.show(false)
|category.firstCategory|
|[[tagCategory_060, 0.8], [tagCategory_029, 0.7]]|
|[[tagCategory_029, 0.9]]|
|[[tagCategory_029, 0.8]]|
### And expected:
Vectors.sparse(100, Array(60, 29), Array(0.8, 0.7))
Vectors.sparse(100, Array(29), Array(0.9))
Vectors.sparse(100, Array(29), Array(0.8))
How to iterate an array in DataFrame? Thanks.
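The mapping from [[tagCategory_060, 0.8], ...] to Vectors.sparse(100, Array(60, 29), Array(0.8, 0.7)) is mostly string work, which can be sketched without Spark. Note that MLlib's sparse constructor expects indices in increasing order, so the sketch sorts them; the tagCategory_ prefix convention is taken from the sample data:

```scala
// Turn [(category, weight)] pairs like ("tagCategory_060", "0.8") into the
// (indices, values) arrays that Vectors.sparse(size, indices, values) expects.
def toSparse(tags: Seq[(String, String)]): (Array[Int], Array[Double]) = {
  val pairs = tags.map { case (cat, w) =>
    (cat.stripPrefix("tagCategory_").toInt, w.toDouble) // "tagCategory_060" -> 60
  }.sortBy(_._1)                                         // sparse vectors want sorted indices
  (pairs.map(_._1).toArray, pairs.map(_._2).toArray)
}

val (idx, vals) = toSparse(Seq(("tagCategory_060", "0.8"), ("tagCategory_029", "0.7")))

// With Spark MLlib (sketch): Vectors.sparse(100, idx, vals)
println(idx.mkString(",") + " / " + vals.mkString(","))
```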
Spark ExternalTable doesn't recognize subdir
hi, all: my issue is that every day I will receive some JSON data files. I want to convert them to parquet files and save them to HDFS; the folder will look like this:
/my_table_base_floder
/my_table_base_floder/day_2
/my_table_base_floder/day_3
where the parquet files for "day_1" are stored directly in /my_table_base_floder. Then I run:
sqlContext.createExternalTable("tpc1.customer","hdfs://master1:9000/my_table_base_floder","parquet")
But when I save parquet files to a subdir, for example /my_table_base_floder/day_2, and refresh the metadata, Spark doesn't recognize the data in the subdir. How can I do it? 2016-10-20 lk_spark
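Two approaches commonly suggested for this layout, shown as a hedged sketch: the DDL column list is elided, the config keys are the standard Hive/Hadoop ones, and the folder spelling is kept exactly as it appears in the message. Neither is run here, since both need a live cluster.

```scala
// 1) Lay the folders out as partitions (day=day_2 instead of day_2) and
//    register each one, so the table tracks subdirectories explicitly:
// sqlContext.sql("CREATE EXTERNAL TABLE tpc1.customer (/* columns */) " +
//   "PARTITIONED BY (day STRING) STORED AS PARQUET " +
//   "LOCATION 'hdfs://master1:9000/my_table_base_floder'")
// sqlContext.sql("ALTER TABLE tpc1.customer ADD PARTITION (day='day_2') " +
//   "LOCATION 'hdfs://master1:9000/my_table_base_floder/day_2'")

// 2) Or ask the input side to descend into subdirectories (Hive/Hadoop
//    settings; exact applicability depends on the Spark version):
// sqlContext.setConf("mapreduce.input.fileinputformat.input.dir.recursive", "true")
// sqlContext.setConf("hive.mapred.supports.subdirectories", "true")
```

The partitioned-table route has the extra benefit that a predicate like day='day_2' prunes to a single folder.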
Re: Re: how to extract arraytype data to file
Thank you, all of you. explode() is helpful: df.selectExpr("explode(bizs) as e").select("e.*").show() 2016-10-19 lk_spark
From: Hyukjin Kwon <gurwls...@gmail.com> Sent: 2016-10-19 13:16 Subject: Re: how to extract arraytype data to file To: "Divya Gehlot"<divya.htco...@gmail.com> Cc: "lk_spark"<lk_sp...@163.com>, "user.spark"<user@spark.apache.org>
This reminds me of https://github.com/databricks/spark-xml/issues/141#issuecomment-234835577 Maybe using explode() would be helpful. Thanks!
2016-10-19 14:05 GMT+09:00 Divya Gehlot <divya.htco...@gmail.com>: http://stackoverflow.com/questions/33864389/how-can-i-create-a-spark-dataframe-from-a-nested-array-of-struct-element Hope this helps. Thanks, Divya
On 19 October 2016 at 11:35, lk_spark <lk_sp...@163.com> wrote: hi, all: I want to read a JSON file and search it with SQL. The data struct should be: bid: string (nullable = true) code: string (nullable = true) and the JSON file data should look like: {bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"} {"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"} but in fact my JSON file data is: {"bizs":[ {bid":"MzI4MTI5MzcyNw==","code":"罗甸网警"},{"bid":"MzI3MzQ5Nzc2Nw==","code":"西早君"}]} {"bizs":[ {bid":"MzI4MTI5Mzcy00==","code":"罗甸网警"},{"bid":"MzI3MzQ5Nzc201==","code":"西早君"}]} I load it with Spark, and the data schema shows like this:
root
|-- bizs: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- bid: string (nullable = true)
|    |    |-- code: string (nullable = true)
I can select columns with: df.select("bizs.id","bizs.name") but the column values are arrays:
| id|code|
|[4938200, 4938201...|[罗甸网警, 室内设计师杨焰红, ...|
|[4938300, 4938301...|[SDCS十全九美, 旅梦长大, ...|
|[4938400, 4938401...|[日重重工液压行走回转, 氧老家,...|
|[4938500, 4938501...|[PABXSLZ, 陈少燕, 笑蜜...|
|[4938600, 4938601...|[税海微云, 西域美农云家店, 福...|
What I want is to read the columns as normal rows. How can I do it? 2016-10-19 lk_spark