你的SQL语句语法有误,请参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/elasticsearch.html
希望能帮助到你!
发件人: 小墨鱼
发送时间: 2020-12-11 14:46
收件人: user-zh
主题: flink无法写入数据到ES中
我在使用Flink写入数据到ES中,程序可以执行成功但是ES中没有数据,而且没有任何报错信息我首先创建了一个sink的es表String sql =
"CREATE TABLE es_sink (\n" + "uid INT,\n" +
"appid INT,\n" + "prepage_id INT,\n" +
"page_id INT,\n" + "action_id STRING,\n" +
"page_name STRING,\n" + "action_name STRING,\n" +
"prepage_name STRING,\n" + "stat_time BIGINT,\n" +
"dt DATE,\n" + "PRIMARY KEY (uid) NOT ENFORCED\n" +
") WITH (\n" + "'connector.type' = 'elasticsearch',\n" +
"'connector.version' = '6',\n" + "'connector.hosts' =
'http://localhost:9200',\n" + "'connector.index' =
'mytest',\n" + "'connector.document-type' = 'user_action',\n"
+ "'update-mode' = 'append',\n" +
"'connector.key-null-literal' = 'n/a',\n" +
"'connector.bulk-flush.max-actions' = '1',\n" +
"'format.type' = 'json'\n" + ")";并通过下面查询出数据String sql =
"select 1 as uid,2 as appid,3 as prepage_id,4 as page_id,'5' as
action_id,'6' as page_name,'7' as action_name,'8' as prepage_name,cast(9 as
bigint) as stat_time, cast('2020-11-11' as date) as dt from student limit
1";我的flink版本是1.11.1,es版本是6.2.2有遇到的朋友可以帮助我看一下
--
Sent from: http://apache-flink.147419.n8.nabble.com/