Hi, fulin

As Yangze said, this is a bug introduced by the new ES6 connector. You can work around it by using the old connector syntax, i.e. 'connector.type' = 'xx', so that the code still goes through the previous code path, or you can use the ES7 connector instead.
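
For example, a legacy-syntax DDL roughly like the one below should still take the old code path. The property names follow the 1.10-era connector options as I remember them, and the host/index values are only placeholders taken from this thread, so please double-check everything against the 1.10 documentation:

CREATE TABLE es_sink_test_no_pk (
  idx INTEGER,
  firstx VARCHAR
) WITH (
  'connector.type' = 'elasticsearch',
  'connector.version' = '6',
  'connector.hosts' = 'http://168.61.113.171:9200',
  'connector.index' = 'es_sink_test_no_pk',
  'connector.document-type' = 'default',
  'update-mode' = 'append',
  'connector.bulk-flush.interval' = '1000',
  'connector.failure-handler' = 'fail',
  'format.type' = 'json'
)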

Best,
Leonard Xu

> On Jul 13, 2020, at 17:19, Yangze Guo <[email protected]> wrote:
>
> I verified this, and it is indeed a bug; the cause is this line [1]. I will open a ticket for it and try to get the fix into 1.11.1.
>
> [1] https://github.com/apache/flink/blob/0fbea46ac0271dd84fa8acd7f99f449a9a0d458c/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java#L285
>
> Best,
> Yangze Guo
>
> On Mon, Jul 13, 2020 at 3:44 PM sunfulin <[email protected]> wrote:
>>
>> hi, Yangze, Leonard,
>> I added a test class that reproduces the problem; you can run it and take a look. You can clearly see that the sink with a PK writes correctly, while the one without a PK ends up with only one record (its id is the index name).
>>
>> import org.apache.flink.api.common.typeinfo.Types;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.StatementSet;
>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>> import org.apache.flink.types.Row;
>>
>> import static org.apache.flink.table.api.Expressions.$;
>>
>> public class ESNewJobTest {
>>
>>     // build the StreamExecutionEnvironment
>>     public static final StreamExecutionEnvironment env =
>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>     // build the EnvironmentSettings with the Blink planner
>>     private static final EnvironmentSettings bsSettings =
>>             EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>
>>     // build the StreamTableEnvironment
>>     public static final StreamTableEnvironment tEnv =
>>             StreamTableEnvironment.create(env, bsSettings);
>>
>>     // DDL statements
>>     public static final String ES_SINK_DDL_NO_PK = "CREATE TABLE es_sink_test_no_pk (\n" +
>>             "  idx integer,\n" +
>>             "  firstx varchar\n" +
>>             ") WITH (\n" +
>>             "  'connector' = 'elasticsearch-6',\n" +
>>             "  'hosts' = '168.61.113.171:9200',\n" +
>>             "  'index' = 'es_sink_test_no_pk',\n" +
>>             "  'document-type' = 'default',\n" +
>>             "  'document-id.key-delimiter' = '$',\n" +
>>             "  'sink.bulk-flush.interval' = '1000',\n" +
>>             "  'failure-handler' = 'fail',\n" +
>>             "  'format' = 'json'\n" +
>>             ")";
>>
>>     public static final String ES_SINK_DDL_WITH_PK = "CREATE TABLE es_sink_test_with_pk (\n" +
>>             "  idx integer,\n" +
>>             "  firstx varchar,\n" +
>>             "  primary key (idx, firstx) not enforced\n" +
>>             ") WITH (\n" +
>>             "  'connector' = 'elasticsearch-6',\n" +
>>             "  'hosts' = '168.61.113.171:9200',\n" +
>>             "  'index' = 'es_sink_test_with_pk',\n" +
>>             "  'document-type' = 'default',\n" +
>>             "  'document-id.key-delimiter' = '$',\n" +
>>             "  'sink.bulk-flush.interval' = '1000',\n" +
>>             "  'failure-handler' = 'fail',\n" +
>>             "  'format' = 'json'\n" +
>>             ")";
>>
>>     public static String getCharAndNumr(int length) {
>>         StringBuffer valSb = new StringBuffer();
>>         for (int i = 0; i < length; i++) {
>>             // decide whether to emit a letter or a digit
>>             String charOrNum = Math.round(Math.random()) % 2 == 0 ? "char" : "num";
>>             if ("char".equalsIgnoreCase(charOrNum)) {
>>                 // letter: pick upper case (65) or lower case (97) as the base
>>                 int choice = Math.round(Math.random()) % 2 == 0 ? 65 : 97;
>>                 valSb.append((char) (choice + Math.round(Math.random() * 25)));
>>             } else if ("num".equalsIgnoreCase(charOrNum)) {
>>                 // digit
>>                 valSb.append(String.valueOf(Math.round(Math.random() * 9)));
>>             }
>>         }
>>         return valSb.toString();
>>     }
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         DataStream<Row> ds = env.addSource(new RichParallelSourceFunction<Row>() {
>>
>>             volatile boolean flag = true;
>>
>>             @Override
>>             public void run(SourceContext<Row> ctx) throws Exception {
>>                 while (flag) {
>>                     Row row = new Row(2);
>>                     row.setField(0, 2207);
>>                     row.setField(1, getCharAndNumr(4));
>>                     ctx.collect(row);
>>                     Thread.sleep(1000);
>>                 }
>>             }
>>
>>             @Override
>>             public void cancel() {
>>                 flag = false;
>>             }
>>         }).setParallelism(1).returns(Types.ROW(Types.INT, Types.STRING));
>>
>>         // ES sink test DDLs
>>         tEnv.executeSql(ES_SINK_DDL_NO_PK);
>>         tEnv.executeSql(ES_SINK_DDL_WITH_PK);
>>
>>         // register the source stream as a table
>>         tEnv.createTemporaryView("test", ds, $("f0").as("idx"), $("f1").as("firstx"), $("p").proctime());
>>
>>         // write to both sinks
>>         StatementSet ss = tEnv.createStatementSet();
>>         ss.addInsertSql("insert into es_sink_test_no_pk select idx, firstx from test");
>>         ss.addInsertSql("insert into es_sink_test_with_pk select idx, firstx from test");
>>         ss.execute();
>>     }
>> }
>>
>> On 2020-07-13 14:03:21, "Yangze Guo" <[email protected]> wrote:
>>> INSERT does go through the processUpsert method; when no PK is specified, the generated key will be null, and then an IndexRequest is created.
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Mon, Jul 13, 2020 at 2:00 PM sunfulin <[email protected]> wrote:
>>>>
>>>> hi, Leonard
>>>> I defined a DDL and a DML, shown below. The DDL declares no PK, and what I observe is that the id generated in the ES sink result is the index name, so only one record remains.
>>>> When I switch the DDL to the previous version's WITH options (declaring update-mode = 'upsert') instead of the new 1.11 options, the sink result looks correct. I'm not sure whether something in my configuration is wrong, or whether I'm using it incorrectly.
>>>>
>>>> @[email protected]
>>>> I looked at the source code you referenced; it seems to handle the upsert case. If no PK is declared, wouldn't it be processInsert instead?
>>>>
>>>> CREATE TABLE ES6_SENSORDATA_SERVER_API (
>>>>   event varchar,
>>>>   user_id varchar,
>>>>   distinct_id varchar,
>>>>   _date varchar,
>>>>   _event_time varchar,
>>>>   recv_time varchar,
>>>>   code varchar,
>>>>   _current_project varchar,
>>>>   api varchar,
>>>>   elapsed int,
>>>>   `start` bigint,
>>>>   is_err int
>>>> ) WITH (
>>>>   'connector' = 'elasticsearch-6',
>>>>   'hosts' = '<ES_YUNTU.SERVERS>',
>>>>   'index' = 'flink_sensordata_server_api',
>>>>   'document-type' = 'default',
>>>>   'document-id.key-delimiter' = '$',
>>>>   'sink.bulk-flush.interval' = '1000',
>>>>   'failure-handler' = 'fail',
>>>>   'format' = 'json'
>>>> )
>>>>
>>>> INSERT INTO ES6_SENSORDATA_SERVER_API
>>>> SELECT event,
>>>>        user_id,
>>>>        distinct_id,
>>>>        ts2Date(`time`, 'yyyy-MM-dd') as _date,
>>>>        ts2Date(`time`, 'yyyy-MM-dd HH:mm:ss.SSS') as _event_time,
>>>>        ts2Date(recv_time, false, false) as recv_time,
>>>>        properties.code as code,
>>>>        properties.`project` as _current_project,
>>>>        properties.api as api,
>>>>        properties.elapsed as elapsed,
>>>>        properties.`start` as `start`,
>>>>        case when properties.code = '0' then 0 else 1 end as is_err
>>>> FROM KafkaEventTopic
>>>> WHERE `type` in ('track') and event in ('serverApiReqEvt')
>>>>
>>>> On 2020-07-13 13:44:29, "Leonard Xu" <[email protected]> wrote:
>>>>> Hello, fulin
>>>>>
>>>>> Could you provide a snippet of code that reproduces the problem?
>>>>>
>>>>> Best,
>>>>> Leonard Xu
>>>>>
>>>>>> On Jul 13, 2020, at 09:50, Yangze Guo <[email protected]> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> If no primary key is defined, the ES connector sets _id to null [1], and the ES Java client will then set _id to a random value [2], so the situation you describe should not happen. Do you have request logs or similar on your ES side? It would be good to check what the requests sent by Flink look like.
>>>>>>
>>>>>> [1] https://github.com/apache/flink/blob/f0eeaec530e001ab02cb889dfe217e25913660c4/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java#L102
>>>>>> [2] https://github.com/elastic/elasticsearch/blob/977230a0ce89a55515dc6ef6452e9f059d9356a2/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java#L509
>>>>>>
>>>>>> Best,
>>>>>> Yangze Guo
>>>>>>
>>>>>> On Sat, Jul 11, 2020 at 11:33 PM sunfulin <[email protected]> wrote:
>>>>>>>
>>>>>>> hi,
>>>>>>> According to the documentation [1], if no primary key is declared in the DDL, the 1.11 ES SQL connector sinks data in append mode and uses the id generated by ES itself as the document_id. In my test, however, when my DDL defines no primary key, the document_id is not generated correctly on write; instead the index name is used as the id, so only the latest record remains. My DDL is below.
>>>>>>> I'm not sure whether I'm configuring or using it incorrectly, or whether this is really a bug.
>>>>>>>
>>>>>>> CREATE TABLE ES6_SENSORDATA_OUTPUT (
>>>>>>>   event varchar,
>>>>>>>   user_id varchar,
>>>>>>>   distinct_id varchar,
>>>>>>>   _date varchar,
>>>>>>>   _event_time varchar,
>>>>>>>   recv_time varchar,
>>>>>>>   _browser_version varchar,
>>>>>>>   path_name varchar,
>>>>>>>   _search varchar,
>>>>>>>   event_type varchar,
>>>>>>>   _current_project varchar,
>>>>>>>   message varchar,
>>>>>>>   stack varchar,
>>>>>>>   component_stack varchar,
>>>>>>>   _screen_width varchar,
>>>>>>>   _screen_height varchar
>>>>>>> ) WITH (
>>>>>>>   'connector' = 'elasticsearch-6',
>>>>>>>   'hosts' = '<ES_YUNTU.SERVERS>',
>>>>>>>   'index' = 'flink_sensordata_target_event',
>>>>>>>   'document-type' = 'default',
>>>>>>>   'document-id.key-delimiter' = '$',
>>>>>>>   'sink.bulk-flush.interval' = '1000',
>>>>>>>   'failure-handler' = 'fail',
>>>>>>>   'format' = 'json'
>>>>>>> )
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html#key-handling
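
PS: below is a minimal standalone sketch of the _id behavior discussed in this thread. It is not the Flink source code; it only assumes the Elasticsearch 6.x client jar on the classpath and reuses the index, type, and field names from the repro job. With no id set, the server assigns a random _id per request; with a constant id (here the index name, matching the observed symptom), every write hits the same document.

import java.nio.charset.StandardCharsets;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

public class EsIdBehaviorSketch {

    public static void main(String[] args) {
        byte[] doc = "{\"idx\":2207,\"firstx\":\"ab12\"}".getBytes(StandardCharsets.UTF_8);

        // id left unset: Elasticsearch auto-generates a random _id for each request,
        // so every row becomes a new document (the expected no-PK / append behavior).
        IndexRequest autoId = new IndexRequest("es_sink_test_no_pk", "default")
                .source(doc, XContentType.JSON);

        // id fixed to a constant (here the index name, matching the symptom in this
        // thread): every request targets the same document, so only one record survives.
        IndexRequest fixedId = new IndexRequest("es_sink_test_no_pk", "default", "es_sink_test_no_pk")
                .source(doc, XContentType.JSON);

        System.out.println("auto id  -> " + autoId.id());   // null until the server assigns one
        System.out.println("fixed id -> " + fixedId.id());  // always "es_sink_test_no_pk"
    }
}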
