INSERT goes through the processUpsert method. When no PK is declared, the generated key is null, and an IndexRequest is created.
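That branch can be sketched as follows. This is a minimal Python sketch with hypothetical names (create_request is not Flink's actual Java API); it only models the key-handling described above: with no declared PRIMARY KEY the extracted key is null, an IndexRequest is emitted, and the ES client auto-generates a random _id per document, so records should not overwrite one another.

```python
import uuid

def create_request(key):
    """Minimal sketch (hypothetical function, not Flink's API) of the
    key-handling branch in RowElasticsearchSinkFunction.processUpsert."""
    if key is None:
        # No PRIMARY KEY declared: append mode. An IndexRequest is created
        # with a null id, and the ES client fills in a random _id.
        return ("IndexRequest", uuid.uuid4().hex)
    # PRIMARY KEY declared: upsert mode. The document id is derived from
    # the key fields, so rows with the same key overwrite each other.
    return ("UpdateRequest", key)

kind1, id1 = create_request(None)
kind2, id2 = create_request(None)
print(kind1, id1 != id2)  # IndexRequest True: every append gets a fresh _id
```

If the observed behavior were instead "id equals the index name", that would point at the request sent to ES, not this branch, which is why the request logs are the next thing to check.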

Best,
Yangze Guo

On Mon, Jul 13, 2020 at 2:00 PM sunfulin <[email protected]> wrote:
>
>
> hi, Leonard
> 我定义了一个ddl和一个dml,sql如下。ddl中没有定义PK,我观察到的现象是:这样在sink到es结果中,结果生成的id是index名,导致只有一条记录。
> 我将DDL更换为之前版本的with参数(声明使用update-mode = 
> ‘upsert’),不使用1.11最新的with参数,观察到sink结果就正常了。不确定是不是我哪边配置的方式不太对,还是说使用方式有问题。
>
>  @[email protected]  我看了下给的源代码,貌似这个是处理upsert的情况,如果不声明pk的话,是不是会是processInsert?
>
> CREATE TABLE ES6_SENSORDATA_SERVER_API (
>   event varchar,
>   user_id varchar,
>   distinct_id varchar,
>   _date varchar,
>   _event_time varchar,
>   recv_time varchar,
>   code varchar,
>   _current_project varchar,
>   api varchar,
>   elapsed int ,
>   `start` bigint,
>   is_err int
> ) WITH (
> 'connector' = 'elasticsearch-6',
> 'hosts' = '<ES_YUNTU.SERVERS>',
> 'index' = 'flink_sensordata_server_api',
> 'document-type' = 'default',
> 'document-id.key-delimiter' = '$',
> 'sink.bulk-flush.interval' = '1000',
> 'failure-handler' = 'fail',
> 'format' = 'json'
> )
>
>
>
> INSERT INTO ES6_SENSORDATA_SERVER_API
> SELECT event,
>        user_id,
>        distinct_id,
>        ts2Date(`time`, 'yyyy-MM-dd') as _date,
>        ts2Date(`time`, 'yyyy-MM-dd HH:mm:ss.SSS') as _event_time,
>        ts2Date(recv_time, false, false) as recv_time,
>        properties.code as code,
>        properties.`project` as _current_project,
>        properties.api as api,
>        properties.elapsed as elapsed,
>        properties.`start` as `start`,
>        case when properties.code = '0' then 0 else 1 end as is_err
> FROM KafkaEventTopic
> where `type` in ('track') and event in ('serverApiReqEvt')
>
>
> On 2020-07-13 13:44:29, "Leonard Xu" <[email protected]> wrote:
> >Hello, fulin
> >
> >Could you share a snippet of code that reproduces this issue?
> >
> >Best,
> >Leonard Xu
> >
> >
> >> On Jul 13, 2020, at 09:50, Yangze Guo <[email protected]> wrote:
> >>
> >> Hi,
> >>
> >> If no primary key is defined, the ES connector sets _id to null[1], and the ES Java client then assigns a random _id[2].
> >> So the situation you describe should not occur. Do you have request logs on the ES side? It would help to see what requests Flink is sending.
> >>
> >> [1] 
> >> https://github.com/apache/flink/blob/f0eeaec530e001ab02cb889dfe217e25913660c4/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java#L102
> >> [2] 
> >> https://github.com/elastic/elasticsearch/blob/977230a0ce89a55515dc6ef6452e9f059d9356a2/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java#L509
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Sat, Jul 11, 2020 at 11:33 PM sunfulin <[email protected]> wrote:
> >>>
> >>> hi,
> >>> According to the docs[1], if the DDL for the 1.11 ES SQL connector declares no primary
> >>> key, data is sinked in append mode and the document_id is generated by ES itself. But in my tests, when the DDL has no primary
> >>> key, no document_id is generated correctly; instead, the index name is used as the id, so only the latest record remains. My DDL is below.
> >>> I'm not sure whether my configuration is wrong or this is actually a bug.
> >>>
> >>>
> >>> CREATE TABLE ES6_SENSORDATA_OUTPUT (
> >>> event varchar,
> >>> user_id varchar,
> >>> distinct_id varchar,
> >>> _date varchar,
> >>> _event_time varchar,
> >>> recv_time varchar,
> >>> _browser_version varchar,
> >>> path_name varchar,
> >>> _search varchar,
> >>> event_type varchar,
> >>> _current_project varchar,
> >>> message varchar,
> >>> stack varchar,
> >>> component_stack varchar,
> >>> _screen_width varchar,
> >>> _screen_height varchar
> >>> ) WITH (
> >>> 'connector' = 'elasticsearch-6',
> >>> 'hosts' = '<ES_YUNTU.SERVERS>',
> >>> 'index' = 'flink_sensordata_target_event',
> >>> 'document-type' = 'default',
> >>> 'document-id.key-delimiter' = '$',
> >>> 'sink.bulk-flush.interval' = '1000',
> >>> 'failure-handler' = 'fail',
> >>> 'format' = 'json'
> >>> )
> >>>
> >>>
> >>>
> >>>
> >>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html#key-handling
