您好,我使用pyflink时的代码如下,有如下两个问题:
1.
source = st_env.from_path('source')
#st_env是StreamTableEnvironment,source是kafka源端
main_table = source.select(".......")
sub_table = source.select(".......")
main_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
最终发现只insert了main_table,请问遇见这种情况该怎么处理,就是如何插入多个表?
2.
for i in range(1,20):
sub_table = source.select("...%s...%d...."
%(str(i),i))#我的select语句是一个动态拼接的sql,在一个for循环中,获取多个sub_table
sub_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
#这种方式是否合理,因为也是测试无法插入多个表,有没有什么解决方案
以上两个问题希望您们能够给予解答!感谢!