2018yinjian commented on issue #6384:
URL: https://github.com/apache/kyuubi/issues/6384#issuecomment-2105581067

   from pyspark.sql import functions as F
   from pyspark.sql.functions import col

   df1 = spark.readStream \
        .format("kafka") \
        .options(**src_configs) \
        .load() \
        .selectExpr("offset",
                    "get_json_object(cast(value as string), '$.database') as database_name",
                    "get_json_object(cast(value as string), '$.table') as table_name",
                    "get_json_object(cast(value as string), '$.type') as operation_type",
                    "cast(get_json_object(cast(value as string), '$.isDdl') as boolean) as is_ddl",
                    "get_json_object(cast(value as string), '$.data') as new_data",
                    "cast(from_unixtime(cast(get_json_object(cast(value as string), '$.es') as bigint)/1000) as timestamp) as canal_create_time"
                    )
   df = df1.filter("operation_type != 'DELETE' and is_ddl = false") \
        .selectExpr("offset", "database_name", "table_name", "canal_create_time",
                    "explode(from_json(new_data, 'array<string>')) AS new_data"
                    )
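   For context, the Kafka value is a Canal binlog JSON payload. The snippet below is purely illustrative: it only shows the shape implied by the get_json_object paths used above, and the values are made up.

   # Illustrative only: an assumed Canal-style record shape matching the fields extracted above.
   sample_value = '''{
       "database": "base_serv_work",
       "table": "serv_work_extend_0",
       "type": "INSERT",
       "isDdl": false,
       "es": 1715352368000,
       "data": [{"serv_work_id": "1", "source_plat": "1", "deleted": "1"}]
   }'''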
   
   
   query = df.writeStream.queryName("ads-work-real-starrocks") \
        .foreachBatch(process_row) \
        .trigger(processingTime=str(src_process_time) + ' seconds') \
        .option("checkpointLocation", f"/tmp/checkpoint/{tar_tab}") \
        .start()
   query.awaitTermination()
        
   def process_row(df, epoch_id):  # foreachBatch passes the micro-batch DataFrame and the batch id
       df_extend1 = df.filter(
           "database_name = 'base_serv_work' and rlike(table_name, 'serv_work_extend_[0,1,2,3,4,5,6,7]')") \
           .selectExpr("offset", "cast(get_json_object(new_data, '$.serv_work_id') as bigint) as id",
                       "cast(get_json_object(new_data, '$.source_plat') as int) as plat_source",
                       "cast(get_json_object(new_data, '$.work_plat') as int) as plat",
                       "cast(get_json_object(new_data, '$.biz_type') as int) as serv_cluster_type",
                       "cast(get_json_object(new_data, '$.engineer_supervisor_id') as int) as engineer_sup_id",
                       "cast(get_json_object(new_data, '$.liability_engineer_id') as int) as perf_engineer_id",
                       "cast(get_json_object(new_data, '$.source_cooperation_id') as int) as cooperation_id",
                       "cast(get_json_object(new_data, '$.duplicate') as int) as duplicate",
                       "cast(get_json_object(new_data, '$.test') as int) as test",
                       "cast(get_json_object(new_data, '$.receive_entrance_id') as int) as receive_entrance",
                       "cast(get_json_object(new_data, '$.exam_area') as int) as exam_area",
                       "cast(get_json_object(new_data, '$.deleted') as int) as deleted"
                       ) \
           .withColumn("num", F.expr("row_number() over(partition by id order by offset desc)"))
       df_extend2 = df_extend1.filter("num=1 and deleted=1").na.fill(0)
       # Join the source cooperation id against the cooperation dimension table for cooperation and
       # channel info (excluding deactivated accounts), the engineer dimension table for the engineer
       # name, and the entrance dimension table for the entrance type.
       if not df_extend2.rdd.isEmpty():
           df_extend = df_extend2.alias("m").join(df_coop.alias("co1"), col("m.cooperation_id") == col("co1.cooperate_id"), "left") \
               .join(df_eng.alias("e"), col("m.perf_engineer_id") == col("e.engineer_id"), "left") \
               .join(df_gate.alias("g"), col("m.receive_entrance") == col("g.gate_id"), "left") \
               .selectExpr("m.*", "co1.channel_one_id", "co1.channel_two_id", "co1.channel_thr_id",
                           # supplementary fields from the source-cooperation join
                           "co1.cooperate_name as cooperation_name", "co1.cooperate_one_id as cooperation_one_id",
                           "co1.cooperate_manage_dept_one_id as cooperation_dept_one_id",
                           "co1.cooperate_manage_dept_two_id as cooperation_dept_two_id",
                           "co1.cooperate_manage_dept_thr_id as cooperation_dept_thr_id",
                           "co1.cooperation_type", "co1.coop_brand_type as brand_type", "co1.coop_brand_id as brand_id",
                           "e.real_name as perf_engineer_name", "cast(g.gate_type as int) as receive_entrance_type",
                           "cast(now() as bigint) as data_update_time"
                           )
           resultDF = df_extend.selectExpr("id", "plat_source", "plat", "serv_cluster_type", "engineer_sup_id", "perf_engineer_id", "perf_engineer_name",
                                           "duplicate", "test", "receive_entrance", "receive_entrance_type", "cooperation_id", "cooperation_name", "cooperation_one_id",
                                           "cooperation_dept_one_id", "cooperation_dept_two_id", "cooperation_dept_thr_id", "cooperation_type", "brand_type", "brand_id",
                                           "channel_one_id", "channel_two_id", "channel_thr_id", "exam_area", "data_update_time")
           resultDF.write.format('starrocks').options(**tar_configs) \
               .option('starrocks.write.properties.partial_update', 'true') \
               .option('starrocks.columns',
                       'id,plat_source,plat,serv_cluster_type,engineer_sup_id,perf_engineer_id,perf_engineer_name,duplicate,test'
                       ',receive_entrance,receive_entrance_type,cooperation_id,cooperation_name,cooperation_one_id,cooperation_dept_one_id,cooperation_dept_two_id'
                       ',cooperation_dept_thr_id,cooperation_type,brand_type,brand_id,channel_one_id,channel_two_id,channel_thr_id,exam_area,data_update_time') \
               .mode('append').save()
   When writing to StarRocks in real time with Structured Streaming as above, the following warning shows up frequently; the warning log is:
   24/05/10 23:26:08 WARN SparkSQLLineageParseHelper: Extract Statement[11677] columns lineage failed.
   java.util.NoSuchElementException: None.get
        at scala.None$.get(Option.scala:529) ~[scala-library-2.12.15.jar:?]
        at scala.None$.get(Option.scala:527) ~[scala-library-2.12.15.jar:?]
        at org.apache.kyuubi.plugin.lineage.helper.LineageParser.getV2TableName(SparkSQLLineageParseHelper.scala:493) ~[kyuubi-spark-lineage_2.12-1.8.1-jar-with-dependencies.jar:?]
        at org.apache.kyuubi.plugin.lineage.helper.LineageParser.extractColumnsLineage(SparkSQLLineageParseHelper.scala:304) ~[kyuubi-spark-lineage_2.12-1.8.1-jar-with-dependencies.jar:?]
        at org.apache.kyuubi.plugin.lineage.helper.LineageParser.parse(SparkSQLLineageParseHelper.scala:54) ~[kyuubi-spark-lineage_2.12-1.8.1-jar-with-dependencies.jar:?]
        at org.apache.kyuubi.plugin.lineage.helper.LineageParser.parse$(SparkSQLLineageParseHelper.scala:52) ~[kyuubi-spark-lineage_2.12-1.8.1-jar-with-dependencies.jar:?]
        at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.parse(SparkSQLLineageParseHelper.scala:510) ~[kyuubi-spark-lineage_2.12-1.8.1-jar-with-dependencies.jar:?]
        at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.$anonfun$transformToLineage$1(SparkSQLLineageParseHelper.scala:516) ~[kyuubi-spark-lineage_2.12-1.8.1-jar-with-dependencies.jar:?]
        at scala.util.Try$.apply(Try.scala:213) ~[scala-library-2.12.15.jar:?]
        at org.apache.kyuubi.plugin.lineage.helper.SparkSQLLineageParseHelper.transformToLineage(SparkSQLLineageParseHelper.scala:516) ~[kyuubi-spark-lineage_2.12-1.8.1-jar-with-dependencies.jar:?]
        at org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener.onSuccess(SparkOperationLineageQueryExecutionListener.scala:34) ~[kyuubi-spark-lineage_2.12-1.8.1-jar-with-dependencies.jar:?]
        at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:165) ~[spark-sql_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.sql.util.ExecutionListenerBus.doPostEvent(QueryExecutionListener.scala:135) ~[spark-sql_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.sql.util.ExecutionListenerBus.postToAll(QueryExecutionListener.scala:135) ~[spark-sql_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.sql.util.ExecutionListenerBus.onOtherEvent(QueryExecutionListener.scala:147) ~[spark-sql_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:100) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23) ~[scala-library-2.12.15.jar:?]
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) ~[scala-library-2.12.15.jar:?]
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1446) ~[spark-core_2.12-3.3.1.jar:3.3.1]
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96) ~[spark-core_2.12-3.3.1.jar:3.3.1]
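
   Per the stack trace, the warning comes from the Kyuubi lineage listener (SparkOperationLineageQueryExecutionListener), which is a QueryExecutionListener on the session and therefore runs after every micro-batch written by foreachBatch. For reference, a minimal sketch of how the plugin is registered, assuming it was enabled through spark.sql.queryExecutionListeners (the builder details below are assumed, not copied from the job); dropping that listener for this streaming job would avoid the warning, at the cost of losing lineage for it:

   # Minimal sketch (assumed setup): the lineage plugin is registered as a QueryExecutionListener,
   # so it is invoked for the plan of every foreachBatch write to StarRocks.
   from pyspark.sql import SparkSession

   spark = (SparkSession.builder
            .appName("ads-work-real-starrocks")
            .config("spark.sql.queryExecutionListeners",
                    "org.apache.kyuubi.plugin.lineage.SparkOperationLineageQueryExecutionListener")
            .getOrCreate())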

