Re: Re: CsvTableSink 目录没有写入具体的数据

2019-08-08 文章 wangl...@geekplus.com.cn

抱歉,是我搞错了。
实际上是写入数据的。我在 windows 下做测试,刷新下文件的大小始终是 0 , 只有编辑看下那个文件显示的文件大小才会变更。



wangl...@geekplus.com.cn
 
Sender: Alec Chen
Send Time: 2019-08-09 10:17
Receiver: user-zh
Subject: Re: Re: CsvTableSink 目录没有写入具体的数据
没数据是因为没有trigger执行,  参考sample code from doc(
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html
)
 
// get a StreamTableEnvironment, works for BatchTableEnvironment
equivalentlyStreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
// create a TableSinkTableSink sink = new
CsvTableSink("/path/to/file", fieldDelim = "|");
// register the TableSink with a specific schemaString[] fieldNames =
{"a", "b", "c"};TypeInformation[] fieldTypes = {Types.INT,
Types.STRING, Types.LONG};tableEnv.registerTableSink("CsvSinkTable",
fieldNames, fieldTypes, sink);
// compute a result Table using Table API operators and/or SQL
queriesTable result = ...// emit the result Table to the registered
TableSinkresult.insertInto("CsvSinkTable");
// execute the program
 
加上 tableEnv.execute();
 
 
wangl...@geekplus.com.cn  于2019年8月9日周五 上午9:42写道:
 
>
> 我接入了一个 RocketMQ 的流作为输入。
>
>
>  DataStream> ds = env.addSource(new
> RocketMQSource(
> 
> System.out.println(res);
> return res;
> }
> });
>
>
> tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id,
> pick_list_no, sku_code");
>
> TableSink csvSink = new CsvTableSink("D:\\data\\flink",",");
> String[] fieldNames = {"num"};
> TypeInformation[] fieldTypes = {Types.INT};
> tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes,
> csvSink);
> tableEnv.sqlUpdate(
>     "INSERT INTO RubberOrders SELECT pick_task_id FROM
> t_pick_task");
>
>
>
> wangl...@geekplus.com.cn
>
> Sender: Alec Chen
> Send Time: 2019-08-08 21:01
> Receiver: user-zh
> Subject: Re: CsvTableSink 目录没有写入具体的数据
> 完整代码发一下
>
> wangl...@geekplus.com.cn  于2019年8月8日周四 下午7:37写道:
>
> >
> > 我按官网上的
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> > 例子写的代码
> > 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?
> >
> >
> >
> > wangl...@geekplus.com.cn
> >
>


Re: Re: CsvTableSink 目录没有写入具体的数据

2019-08-08 文章 Alec Chen
没数据是因为没有trigger执行,  参考sample code from doc(
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html
)

// get a StreamTableEnvironment, works for BatchTableEnvironment
equivalentlyStreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);
// create a TableSinkTableSink sink = new
CsvTableSink("/path/to/file", fieldDelim = "|");
// register the TableSink with a specific schemaString[] fieldNames =
{"a", "b", "c"};TypeInformation[] fieldTypes = {Types.INT,
Types.STRING, Types.LONG};tableEnv.registerTableSink("CsvSinkTable",
fieldNames, fieldTypes, sink);
// compute a result Table using Table API operators and/or SQL
queriesTable result = ...// emit the result Table to the registered
TableSinkresult.insertInto("CsvSinkTable");
// execute the program

加上 tableEnv.execute();


wangl...@geekplus.com.cn  于2019年8月9日周五 上午9:42写道:

>
> 我接入了一个 RocketMQ 的流作为输入。
>
>
>  DataStream> ds = env.addSource(new
> RocketMQSource(
> 
> System.out.println(res);
> return res;
> }
> });
>
>
> tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id,
> pick_list_no, sku_code");
>
> TableSink csvSink = new CsvTableSink("D:\\data\\flink",",");
> String[] fieldNames = {"num"};
> TypeInformation[] fieldTypes = {Types.INT};
> tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes,
> csvSink);
> tableEnv.sqlUpdate(
>     "INSERT INTO RubberOrders SELECT pick_task_id FROM
> t_pick_task");
>
>
>
> wangl...@geekplus.com.cn
>
> Sender: Alec Chen
> Send Time: 2019-08-08 21:01
> Receiver: user-zh
> Subject: Re: CsvTableSink 目录没有写入具体的数据
> 完整代码发一下
>
> wangl...@geekplus.com.cn  于2019年8月8日周四 下午7:37写道:
>
> >
> > 我按官网上的
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> > 例子写的代码
> > 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?
> >
> >
> >
> > wangl...@geekplus.com.cn
> >
>


Re: Re: CsvTableSink 目录没有写入具体的数据

2019-08-08 文章 wangl...@geekplus.com.cn
   
我接入了一个 RocketMQ 的流作为输入。


 DataStream> ds = env.addSource(new 
RocketMQSource(

System.out.println(res);
return res;
}
});


tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id, 
pick_list_no, sku_code");

TableSink csvSink = new CsvTableSink("D:\\data\\flink",",");
String[] fieldNames = {"num"};
TypeInformation[] fieldTypes = {Types.INT};
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, 
csvSink);
tableEnv.sqlUpdate(
"INSERT INTO RubberOrders SELECT pick_task_id FROM 
t_pick_task");



wangl...@geekplus.com.cn
 
Sender: Alec Chen
Send Time: 2019-08-08 21:01
Receiver: user-zh
Subject: Re: CsvTableSink 目录没有写入具体的数据
完整代码发一下
 
wangl...@geekplus.com.cn  于2019年8月8日周四 下午7:37写道:
 
>
> 我按官网上的
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> 例子写的代码
> 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?
>
>
>
> wangl...@geekplus.com.cn
>


Re: CsvTableSink 目录没有写入具体的数据

2019-08-08 文章 Alec Chen
完整代码发一下

wangl...@geekplus.com.cn  于2019年8月8日周四 下午7:37写道:

>
> 我按官网上的
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> 例子写的代码
> 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?
>
>
>
> wangl...@geekplus.com.cn
>