我接入了一个 RocketMQ 的流作为输入。
DataStream<Tuple3<Integer, String, String>> ds = env.addSource(new
RocketMQSource(
........
System.out.println(res);
return res;
}
});
tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id,
pick_list_no, sku_code");
TableSink<Row> csvSink = new CsvTableSink("D:\\data\\flink",",");
String[] fieldNames = {"num"};
TypeInformation[] fieldTypes = {Types.INT};
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes,
csvSink);
tableEnv.sqlUpdate(
"INSERT INTO RubberOrders SELECT pick_task_id FROM
t_pick_task");
[email protected]
Sender: Alec Chen
Send Time: 2019-08-08 21:01
Receiver: user-zh
Subject: Re: CsvTableSink 目录没有写入具体的数据
完整代码发一下
[email protected] <[email protected]> 于2019年8月8日周四 下午7:37写道:
>
> 我按官网上的
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> 例子写的代码
> 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?
>
>
>
> [email protected]
>