HI,fulin

如 Yangze所说,这是es6 new connector 引入的一个bug,  你可以使用用old 
connector的语法绕过,就是connector.type=’xx’ ,这样代码路径还走之前的代码, 或者使用es7 nconnector。

祝好,
Leonard Xu

> 在 2020年7月13日,17:19,Yangze Guo <[email protected]> 写道:
> 
> 验证了一下,这确实是一个bug,原因出在这一行[1]。我会提一个ticket来解决它,争取在1.11.1修复。
> 
> [1] 
> https://github.com/apache/flink/blob/0fbea46ac0271dd84fa8acd7f99f449a9a0d458c/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java#L285
> 
> Best,
> Yangze Guo
> 
> On Mon, Jul 13, 2020 at 3:44 PM sunfulin <[email protected]> wrote:
>> 
>> hi,YangZe,Leonard,
>> 我增加了一个可以复现问题的测试类,可以执行下看看。可以明显观察到,两个sink在有PK时写入正常,在没有PK时只有一条记录(id是索引名)。
>> 
>> import org.apache.flink.api.common.typeinfo.Types;
>> 
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> 
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> 
>> import 
>> org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>> 
>> import org.apache.flink.table.api.EnvironmentSettings;
>> 
>> import org.apache.flink.table.api.StatementSet;
>> 
>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>> 
>> import org.apache.flink.types.Row;
>> 
>> 
>> import static org.apache.flink.table.api.Expressions.$;
>> 
>> 
>> public class ESNewJobTest {
>> 
>> 
>>    //构建StreamExecutionEnvironment
>> 
>>    public static final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>> 
>>    //构建EnvironmentSettings 并指定Blink Planner
>> 
>>    private static final EnvironmentSettings bsSettings = 
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> 
>> 
>>    //构建StreamTableEnvironment
>> 
>>    public static final StreamTableEnvironment tEnv = 
>> StreamTableEnvironment.create(env, bsSettings);
>> 
>> 
>>    //DDL语句
>> 
>>    public static final String ES_SINK_DDL_NO_PK = "CREATE TABLE 
>> es_sink_test_no_pk (\n" +
>> 
>>            "  idx integer,\n" +
>> 
>>            "  firstx varchar\n" +
>> 
>>            ") WITH (\n" +
>> 
>>            "'connector' = 'elasticsearch-6',\n" +
>> 
>>            "'hosts' = '168.61.113.171:9200',\n" +
>> 
>>            "'index' = 'es_sink_test_no_pk',\n" +
>> 
>>            "'document-type' = 'default',\n" +
>> 
>>            "'document-id.key-delimiter' = '$',\n" +
>> 
>>            "'sink.bulk-flush.interval' = '1000',\n" +
>> 
>>            "'failure-handler' = 'fail',\n" +
>> 
>>            "'format' = 'json'\n" +
>> 
>>            ")";
>> 
>>    public static final String ES_SINK_DDL_WITH_PK = "CREATE TABLE 
>> es_sink_test_with_pk (\n" +
>> 
>>            "  idx integer,\n" +
>> 
>>            "  firstx varchar,\n" +
>> 
>>            "  primary key (idx, firstx) not enforced\n" +
>> 
>>            ") WITH (\n" +
>> 
>>            "'connector' = 'elasticsearch-6',\n" +
>> 
>>            "'hosts' = '168.61.113.171:9200',\n" +
>> 
>>            "'index' = 'es_sink_test_with_pk',\n" +
>> 
>>            "'document-type' = 'default',\n" +
>> 
>>            "'document-id.key-delimiter' = '$',\n" +
>> 
>>            "'sink.bulk-flush.interval' = '1000',\n" +
>> 
>>            "'failure-handler' = 'fail',\n" +
>> 
>>            "'format' = 'json'\n" +
>> 
>>            ")";
>> 
>> 
>>    public static String getCharAndNumr(int length) {
>> 
>>        StringBuffer valSb = new StringBuffer();
>> 
>>        for (int i = 0; i < length; i++) {
>> 
>>            String charOrNum = Math.round(Math.random()) % 2 == 0 ? "char" : 
>> "num"; // 输出字母还是数字
>> 
>>            if ("char".equalsIgnoreCase(charOrNum)) {
>> 
>>                // 字符串
>> 
>>                int choice = Math.round(Math.random()) % 2 == 0 ? 65 : 97;  
>> // 取得大写字母还是小写字母
>> 
>>                valSb.append((char) (choice + Math.round(Math.random()*25)));
>> 
>>            } else if ("num".equalsIgnoreCase(charOrNum)) {
>> 
>>                // 数字
>> 
>>                valSb.append(String.valueOf(Math.round(Math.random()*9)));
>> 
>>            }
>> 
>>        }
>> 
>>        return valSb.toString();
>> 
>> 
>>    }
>> 
>> 
>>    public static void main(String[] args) throws Exception {
>> 
>> 
>>        DataStream<Row> ds = env.addSource(new 
>> RichParallelSourceFunction<Row>() {
>> 
>> 
>>            volatile boolean flag = true;
>> 
>> 
>>            @Override
>> 
>>            public void run(SourceContext<Row> ctx) throws Exception {
>> 
>>                while (flag) {
>> 
>>                    Row row = new Row(2);
>> 
>>                    row.setField(0, 2207);
>> 
>>                    row.setField(1, getCharAndNumr(4));
>> 
>>                    ctx.collect(row);
>> 
>>                    Thread.sleep(1000);
>> 
>>                }
>> 
>> 
>>            }
>> 
>> 
>>            @Override
>> 
>>            public void cancel() {
>> 
>>                flag = false;
>> 
>>            }
>> 
>>        }).setParallelism(1).returns(Types.ROW(Types.INT, Types.STRING));
>> 
>> 
>> 
>>        //ES sink测试ddl
>> 
>>        tEnv.executeSql(ES_SINK_DDL_NO_PK);
>> 
>>        tEnv.executeSql(ES_SINK_DDL_WITH_PK);
>> 
>> 
>>        //source注册成表
>> 
>>        tEnv.createTemporaryView("test", ds, $("f0").as("idx"), 
>> $("f1").as("firstx"), $("p").proctime());
>> 
>> 
>>        //sink写入
>> 
>>        StatementSet ss = tEnv.createStatementSet();
>> 
>>        ss.addInsertSql("insert into es_sink_test_no_pk select idx, firstx 
>> from test");
>> 
>>        ss.addInsertSql("insert into es_sink_test_with_pk select idx, firstx 
>> from test");
>> 
>>        ss.execute();
>> 
>>    }
>> 
>> }
>> 
>> 
>> 
>> 在 2020-07-13 14:03:21,"Yangze Guo" <[email protected]> 写道:
>>> INSERT走的就是processUpsert这个方法,当不指定PK时,生成的key会是null,然后创建一个IndexRequest。
>>> 
>>> Best,
>>> Yangze Guo
>>> 
>>> On Mon, Jul 13, 2020 at 2:00 PM sunfulin <[email protected]> wrote:
>>>> 
>>>> 
>>>> hi, Leonard
>>>> 我定义了一个ddl和一个dml,sql如下。ddl中没有定义PK,我观察到的现象是:这样在sink到es结果中,结果生成的id是index名,导致只有一条记录。
>>>> 我将DDL更换为之前版本的with参数(声明使用update-mode = 
>>>> ‘upsert’),不使用1.11最新的with参数,观察到sink结果就正常了。不确定是不是我哪边配置的方式不太对,还是说使用方式有问题。
>>>> 
>>>> @[email protected]  
>>>> 我看了下给的源代码,貌似这个是处理upsert的情况,如果不声明pk的话,是不是会是processInsert?
>>>> 
>>>> CREATE TABLE ES6_SENSORDATA_SERVER_API (
>>>>  event varchar,
>>>>  user_id varchar,
>>>>  distinct_id varchar,
>>>>  _date varchar,
>>>>  _event_time varchar,
>>>>  recv_time varchar,
>>>>  code varchar,
>>>>  _current_project varchar,
>>>>  api varchar,
>>>>  elapsed int ,
>>>>  `start` bigint,
>>>>  is_err int
>>>> ) WITH (
>>>> 'connector' = 'elasticsearch-6',
>>>> 'hosts' = '<ES_YUNTU.SERVERS>',
>>>> 'index' = 'flink_sensordata_server_api',
>>>> 'document-type' = 'default',
>>>> 'document-id.key-delimiter' = '$',
>>>> 'sink.bulk-flush.interval' = '1000',
>>>> 'failure-handler' = 'fail',
>>>> 'format' = 'json'
>>>> )
>>>> 
>>>> 
>>>> 
>>>> INSERT INTO ES6_SENSORDATA_SERVER_API
>>>> 
>>>> SELECT event,
>>>> 
>>>>       user_id,
>>>> 
>>>>       distinct_id,
>>>> 
>>>>       ts2Date(`time`, 'yyyy-MM-dd') as _date,
>>>> 
>>>>       ts2Date(`time`, 'yyyy-MM-dd HH:mm:ss.SSS') as _event_time,
>>>> 
>>>>       ts2Date(recv_time, false, false) as recv_time,
>>>> 
>>>>       properties.code as code,
>>>> 
>>>>       properties.`project` as _current_project,
>>>> 
>>>>       properties.api as api,
>>>> 
>>>>       properties.elapsed as elapsed,
>>>> 
>>>>       properties.`start` as `start`,
>>>> 
>>>>       case when properties.code = '0' then 0 else 1 end as is_err
>>>> 
>>>> FROM KafkaEventTopic
>>>> 
>>>> where `type` in ('track') and event in ('serverApiReqEvt')
>>>> 
>>>> 
>>>> 在 2020-07-13 13:44:29,"Leonard Xu" <[email protected]> 写道:
>>>>> Hello, fulin
>>>>> 
>>>>> 这个问题能提供段可以复现的代码吗?
>>>>> 
>>>>> 祝好,
>>>>> Leonard Xu
>>>>> 
>>>>> 
>>>>>> 在 2020年7月13日,09:50,Yangze Guo <[email protected]> 写道:
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> 如果没有定义主键,ES connector 会把 _id设为null[1],这样ES的Java Client会将_id设为一个随机值[2].
>>>>>> 所以应该不会出现您说的这种情况。您那里的ES有没有请求日志之类的,看一下Flink发过来的请求是什么样的。
>>>>>> 
>>>>>> [1] 
>>>>>> https://github.com/apache/flink/blob/f0eeaec530e001ab02cb889dfe217e25913660c4/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java#L102
>>>>>> [2] 
>>>>>> https://github.com/elastic/elasticsearch/blob/977230a0ce89a55515dc6ef6452e9f059d9356a2/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java#L509
>>>>>> 
>>>>>> Best,
>>>>>> Yangze Guo
>>>>>> 
>>>>>> On Sat, Jul 11, 2020 at 11:33 PM sunfulin <[email protected]> wrote:
>>>>>>> 
>>>>>>> hi,
>>>>>>> 根据文档[1]的描述,1.11的es sql connector如果在ddl里没有声明primary 
>>>>>>> key,将会使用append模式sink数据,并使用es本身生成的id作为document_id。但是我在测试时发现,如果我的ddl里没有定义primary
>>>>>>>  key,写入时没有正确生成document_id,反而是将index作为id生成了。导致只有最新的一条记录。下面是我的ddl定义:
>>>>>>> 不确定是我配置使用的方式不对,还是确实存在bug。。
>>>>>>> 
>>>>>>> 
>>>>>>> CREATE TABLE ES6_SENSORDATA_OUTPUT (
>>>>>>> event varchar,
>>>>>>> user_id varchar,
>>>>>>> distinct_id varchar,
>>>>>>> _date varchar,
>>>>>>> _event_time varchar,
>>>>>>> recv_time varchar,
>>>>>>> _browser_version varchar,
>>>>>>> path_name varchar,
>>>>>>> _search varchar,
>>>>>>> event_type varchar,
>>>>>>> _current_project varchar,
>>>>>>> message varchar,
>>>>>>> stack varchar,
>>>>>>> component_stack varchar,
>>>>>>> _screen_width varchar,
>>>>>>> _screen_height varchar
>>>>>>> ) WITH (
>>>>>>> 'connector' = 'elasticsearch-6',
>>>>>>> 'hosts' = '<ES_YUNTU.SERVERS>',
>>>>>>> 'index' = 'flink_sensordata_target_event',
>>>>>>> 'document-type' = 'default',
>>>>>>> 'document-id.key-delimiter' = '$',
>>>>>>> 'sink.bulk-flush.interval' = '1000',
>>>>>>> 'failure-handler' = 'fail',
>>>>>>> 'format' = 'json'
>>>>>>> )
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html#key-handling
>>>> 
>>>> 
>>>> 
>>>> 
>> 
>> 
>> 
>> 

回复