2018yinjian commented on issue #6384: URL: https://github.com/apache/kyuubi/issues/6384#issuecomment-2105581067
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# spark, src_configs, tar_configs, src_process_time, tar_tab and the dimension
# DataFrames df_coop / df_eng / df_gate are created elsewhere in the job.

# Parse the Canal change records from Kafka.
df1 = spark.readStream \
    .format("kafka") \
    .options(**src_configs) \
    .load() \
    .selectExpr("offset",
                "get_json_object(cast(value as string), '$.database') as database_name",
                "get_json_object(cast(value as string), '$.table') as table_name",
                "get_json_object(cast(value as string), '$.type') as operation_type",
                "cast(get_json_object(cast(value as string), '$.isDdl') as boolean) as is_ddl",
                "get_json_object(cast(value as string), '$.data') as new_data",
                "cast(from_unixtime(cast(get_json_object(cast(value as string), '$.es') as bigint)/1000) as timestamp) as canal_create_time")

# Drop deletes and DDL, and explode the 'data' array into one row per changed record.
df = df1.filter("operation_type != 'DELETE' and is_ddl = false") \
    .selectExpr("offset", "database_name", "table_name", "canal_create_time",
                "explode(from_json(new_data, 'array<string>')) AS new_data")


def process_row(df, epoch_id):
    # foreachBatch passes the micro-batch DataFrame and the batch id,
    # so the callback must accept two arguments.
    df_extend1 = df.filter(
        "database_name = 'base_serv_work' and rlike(table_name, 'serv_work_extend_[0,1,2,3,4,5,6,7]')") \
        .selectExpr("offset",
                    "cast(get_json_object(new_data, '$.serv_work_id') as bigint) as id",
                    "cast(get_json_object(new_data, '$.source_plat') as int) as plat_source",
                    "cast(get_json_object(new_data, '$.work_plat') as int) as plat",
                    "cast(get_json_object(new_data, '$.biz_type') as int) as serv_cluster_type",
                    "cast(get_json_object(new_data, '$.engineer_supervisor_id') as int) as engineer_sup_id",
                    "cast(get_json_object(new_data, '$.liability_engineer_id') as int) as perf_engineer_id",
                    "cast(get_json_object(new_data, '$.source_cooperation_id') as int) as cooperation_id",
                    "cast(get_json_object(new_data, '$.duplicate') as int) as duplicate",
                    "cast(get_json_object(new_data, '$.test') as int) as test",
                    "cast(get_json_object(new_data, '$.receive_entrance_id') as int) as receive_entrance",
                    "cast(get_json_object(new_data, '$.exam_area') as int) as exam_area",
                    "cast(get_json_object(new_data, '$.deleted') as int) as deleted") \
        .withColumn("num", F.expr("row_number() over(partition by id order by offset desc)"))
    # Keep only the latest change per id within the micro-batch (highest offset).
    df_extend2 = df_extend1.filter("num = 1 and deleted = 1").na.fill(0)
    # Join the cooperation dimension table for cooperation and channel info
    # (excluding accounts that have been disabled), the engineer dimension table
    # for the engineer name, and the entrance dimension table for the entrance type.
    if not df_extend2.rdd.isEmpty():
        df_extend = df_extend2.alias("m") \
            .join(df_coop.alias("co1"), col("m.cooperation_id") == col("co1.cooperate_id"), "left") \
            .join(df_eng.alias("e"), col("m.perf_engineer_id") == col("e.engineer_id"), "left") \
            .join(df_gate.alias("g"), col("m.receive_entrance") == col("g.gate_id"), "left") \
            .selectExpr("m.*",
                        # extra fields filled in from the source merge
                        "co1.channel_one_id", "co1.channel_two_id", "co1.channel_thr_id",
                        "co1.cooperate_name as cooperation_name",
                        "co1.cooperate_one_id as cooperation_one_id",
                        "co1.cooperate_manage_dept_one_id as cooperation_dept_one_id",
                        "co1.cooperate_manage_dept_two_id as cooperation_dept_two_id",
                        "co1.cooperate_manage_dept_thr_id as cooperation_dept_thr_id",
                        "co1.cooperation_type",
                        "co1.coop_brand_type as brand_type",
                        "co1.coop_brand_id as brand_id",
                        "e.real_name as perf_engineer_name",
                        "cast(g.gate_type as int) as receive_entrance_type",
                        "cast(now() as bigint) as data_update_time")
        resultDF = df_extend.selectExpr(
            "id", "plat_source", "plat", "serv_cluster_type", "engineer_sup_id",
            "perf_engineer_id", "perf_engineer_name", "duplicate", "test",
            "receive_entrance", "receive_entrance_type", "cooperation_id",
            "cooperation_name", "cooperation_one_id", "cooperation_dept_one_id",
            "cooperation_dept_two_id", "cooperation_dept_thr_id", "cooperation_type",
            "brand_type", "brand_id", "channel_one_id", "channel_two_id",
            "channel_thr_id", "exam_area", "data_update_time")
        resultDF.write.format('starrocks').options(**tar_configs) \
            .option('starrocks.write.properties.partial_update', 'true') \
            .option('starrocks.columns',
                    'id,plat_source,plat,serv_cluster_type,engineer_sup_id,perf_engineer_id,'
                    'perf_engineer_name,duplicate,test,receive_entrance,receive_entrance_type,'
                    'cooperation_id,cooperation_name,cooperation_one_id,cooperation_dept_one_id,'
                    'cooperation_dept_two_id,cooperation_dept_thr_id,cooperation_type,brand_type,'
                    'brand_id,channel_one_id,channel_two_id,channel_thr_id,exam_area,data_update_time') \
            .mode('append').save()


query = df.writeStream.queryName("ads-work-real-starrocks") \
    .foreachBatch(process_row) \
    .trigger(processingTime=str(src_process_time) + ' seconds') \
    .option("checkpointLocation", f"/tmp/checkpoint/{tar_tab}") \
    .start()
query.awaitTermination()
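
For context on the parsing above: the job assumes each Kafka value is a Canal flat-message JSON document. A rough sketch of such a record is below; the field names come from the get_json_object calls, while the values are made up for illustration.

# Illustrative only: the shape of a Canal change record this job expects.
sample_value = {
    "database": "base_serv_work",
    "table": "serv_work_extend_3",
    "type": "UPDATE",            # INSERT / UPDATE / DELETE
    "isDdl": False,
    "es": 1715354768000,         # source execution time, milliseconds
    "data": [                    # one JSON object per changed row
        {"serv_work_id": "1001", "source_plat": "1", "deleted": "1"}
    ],
}
# '$.data' is extracted as a JSON array string, which is why the stream uses
# explode(from_json(new_data, 'array<string>')) to get one row per element.
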
,"cooperation_dept_one_id","cooperation_dept_two_id","cooperation_dept_thr_id","cooperation_type","brand_type","brand_id" ,"channel_one_id","channel_two_id","channel_thr_id","exam_area","data_update_time") resultDF.write.format('starrocks').options(**tar_configs) \ .option('starrocks.write.properties.partial_update','true') \ .option('starrocks.columns','''id,plat_source,plat,serv_cluster_type,engineer_sup_id,perf_engineer_id,perf_engineer_name,duplicate,test ,receive_entrance,receive_entrance_type,cooperation_id,cooperation_name,cooperation_one_id,cooperation_dept_one_id,cooperation_dept_two_id ,cooperation_dept_thr_id,cooperation_type,brand_type,brand_id,channel_one_id,channel_two_id,channel_thr_id,exam_area,data_update_time''') \ .mode('append').save() 通过structuredStreaming实时写starrocks频繁出现警告信息,警告日志如下 24/05/10 23:26:08 WARN SparkSQLLineageParseHelper: Extract Statement[11677] columns lineage failed. java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:529) ~[scala-library-2.12.15.jar:?] at scala.None$.get(Option.scala:527) ~[scala-library-2.12.15.jar:?] at org.apache.kyuubi.plugin.lineage.helper.LineageParser.getV2TableName(SparkSQLLineageParseHelper.scala:493) ~[kyuubi-spark-lineage_2.12-1.8.1-jar-with-dependencies.jar:?] at org.apache.kyuubi.plugin.lineage.helper.LineageParser.extractColumnsLineage(SparkSQLLineageParseHelper.scala:304) ~[kyuubi-spark-lineage_2.12-1.8.1-jar-with-dependencies.jar:?] at org.apache.kyuubi.plugin.lineage.helper.LineageParser.parse(SparkSQLLineageParseHelper.scala:54) ~[kyuubi-spark-lineage_2.12-1.8.1-jar-with-dependencies.jar:?] at org.apache.kyuubi.plugin.lineage.helper.LineageParser.parse$(SparkSQLLineageParseHelper.scala:52) ~[kyuubi-spark-lineage_2.12-1.8.1-jar-with-dependencies.jar:?] at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.parse(SparkSQLLineageParseHelper.scala:510) ~[kyuubi-spark-lineage_2.12-1.8.1-jar-with-dependencies.jar:?] at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.$anonfun$transformToLineage$1(SparkSQLLineageParseHelper.scala:516) ~[kyuubi-spark-lineage_2.12-1.8.1-jar-with-dependencies.jar:?] at scala.util.Try$.apply(Try.scala:213) ~[scala-library-2.12.15.jar:?] at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.transformToLineage(SparkSQLLineageParseHelper.scala:516) ~[kyuubi-spark-lineage_2.12-1.8.1-jar-with-dependencies.jar:?] at org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener.onSuccess(SparkOperationLineageQueryExecutionListener.scala:34) ~[kyuubi-spark-lineage_2.12-1.8.1-jar-with-dependencies.jar:?] 
    at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:165) ~[spark-sql_2.12-3.3.1.jar:3.3.1]
    at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:135) ~[spark-sql_2.12-3.3.1.jar:3.3.1]
    at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117) ~[spark-core_2.12-3.3.1.jar:3.3.1]
    at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101) ~[spark-core_2.12-3.3.1.jar:3.3.1]
    at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:135) ~[spark-sql_2.12-3.3.1.jar:3.3.1]
    at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:147) ~[spark-sql_2.12-3.3.1.jar:3.3.1]
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100) ~[spark-core_2.12-3.3.1.jar:3.3.1]
    at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28) ~[spark-core_2.12-3.3.1.jar:3.3.1]
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core_2.12-3.3.1.jar:3.3.1]
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core_2.12-3.3.1.jar:3.3.1]
    at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117) ~[spark-core_2.12-3.3.1.jar:3.3.1]
    at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101) ~[spark-core_2.12-3.3.1.jar:3.3.1]
    at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105) ~[spark-core_2.12-3.3.1.jar:3.3.1]
    at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105) ~[spark-core_2.12-3.3.1.jar:3.3.1]
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23) ~[scala-library-2.12.15.jar:?]
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) ~[scala-library-2.12.15.jar:?]
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100) ~[spark-core_2.12-3.3.1.jar:3.3.1]
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96) ~[spark-core_2.12-3.3.1.jar:3.3.1]
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1446) ~[spark-core_2.12-3.3.1.jar:3.3.1]
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96) ~[spark-core_2.12-3.3.1.jar:3.3.1]
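
The warning is raised by the Kyuubi lineage listener shown in the trace (SparkOperationLineageQueryExecutionListener), which is normally attached through spark.sql.queryExecutionListeners. If column lineage is not needed for this particular streaming job, one possible stop-gap, sketched below, is to start the session without that listener; this only hides the warning and does not address the lineage extraction failure itself.

from pyspark.sql import SparkSession

# Possible stop-gap (assumption: lineage is not required for this job):
# leave spark.sql.queryExecutionListeners empty instead of registering
# org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener,
# here or in spark-defaults.conf / the spark-submit --conf.
spark = (
    SparkSession.builder
    .appName("ads-work-real-starrocks")              # illustrative app name
    .config("spark.sql.queryExecutionListeners", "")  # no lineage listener
    .getOrCreate()
)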