回复:打印不同流怎么进行区分

2023-04-13 文章 17610775726
Hi



Print 方法是可以传入一个参数的,用来标识某个流,比如 print(“a”); print(“b");


Best
JasonLee


 回复的原邮件 
| 发件人 | 小昌同学 |
| 发送日期 | 2023年04月14日 09:46 |
| 收件人 | user-zh |
| 主题 | 打印不同流怎么进行区分 |
你好,请问一下再一个程序中,有流与流之间的转换,比如说流A转换为流B,那我想看看流A,也想看看流B,请问我该怎么实现,直接print的话,再控制面板会乱掉


| |
小昌同学
|
|
ccc0606fight...@163.com
|

打印不同流怎么进行区分

2023-04-13 文章 小昌同学
你好,请问一下再一个程序中,有流与流之间的转换,比如说流A转换为流B,那我想看看流A,也想看看流B,请问我该怎么实现,直接print的话,再控制面板会乱掉


| |
小昌同学
|
|
ccc0606fight...@163.com
|

Re: 退订

2023-04-13 文章 Shammon FY
退订请发送任意邮件到 user-zh-unsubscr...@flink.apache.org
,可以参考[1]

[1]
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list

On Thu, Apr 13, 2023 at 9:53 PM lei-tian  wrote:

> 退订
>
>
>
> | |
> totorobabyf...@163.com
> |
> |
> 邮箱:totorobabyf...@163.com
> |


退订

2023-04-13 文章 lei-tian
退订



| |
totorobabyf...@163.com
|
|
邮箱:totorobabyf...@163.com
|

ogg json格式不支持database include和table include参数

2023-04-13 文章 casel.chen
多张oracle表变更同步到同一个kafka topic,现在实时flinlk作业需要消费其中一张oracle表,查了一下没看到类似canal json格式中 

canal-json.database.include 和 canal-json.table.include 参数,只在available 
metadata中看到 table 字段,这意味着我需要在select语句中按table字段进行过滤吗?


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/ogg/
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/canal/

转发:

2023-04-13 文章 孟令平


发件人: Henry meng (孟令平)
发送时间: 2023年4月13日 15:27
收件人: 'user-zh-i...@flink.apache.org' 
主题:


public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
//env.enableCheckpointing(6);
  SingleOutputStreamOperator dataStream = env.addSource(new 
FlinkRocketMQConsumer("10.164.15.31:9876","flink_data"))
.uid("source-id");
  dataStream .print("**");
  DataStream datStream =dataStream.process(new MyFunction());
  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  datStream.print("=>");
  Table inputTable = 
tableEnv.fromDataStream(datStream,"devId,identifier,dataValue,dataTime,tenantId");//"devId,identifier,dataValue,dataTime,tenantId"
  tableEnv.createTemporaryView("InputTable", inputTable);
  Table resultTable = tableEnv.sqlQuery(
"select devId,'DAY_POWER_GENERATION' as identify, 
(MAX(dataValue)-Min(dataValue)) value1,MAX(dataTime) datatime, " +
  "tenantId from InputTable where 
identifier='TOTAL_POWER_GENERATION' group by devId,tenantId");
//resultTable.print();
  DataStream resultStream = tableEnv.toChangelogStream(resultTable);
  resultStream.print(">");
//tableEnv.createTemporaryView("resultTable", resultTable);

  tableEnv.executeSql("CREATE TABLE print_table (`DEV_ID` 
BIGINT,`IDENTIFIER` String," +
"`DATA_VALUE` Decimal(16,4),`DATA_TIME` TIMESTAMP,`TENANT_ID` 
BIGINT) " +
"WITH ('connector' = 'print')");
//tableEnv.executeSql("insert into print_table select 
devId,identifier,dataValue,dataTime,tenantId from resultTable");
  inputTable.executeInsert("print_table");
  env.execute("FlinkRocketMQConsumerDemo");}

如上代码所示,当我添加上env.execute()方法后,发现print_table不打印数据了这是什么原因?
StreamExecutionEnvironment.execute()和StreamTableEnvironment.executeSql()同时执行有什么问题吗?