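Hi all,
       Recently, while using JdbcDynamicTableSink, I found that rows written to MySQL were not being updated according to the primary key I specified. While tracing the problem I came across the getUpsertStatement method in the org.apache.flink.connector.jdbc.dialect.MySQLDialect class: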


/**
 * Mysql upsert query use DUPLICATE KEY UPDATE.
 *
 * <p>NOTE: It requires Mysql's primary key to be consistent with pkFields.
 *
 * <p>We don't use REPLACE INTO, if there are other fields, we can keep their previous values.
 */
@Override
public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
    String updateClause = Arrays.stream(fieldNames)
        .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
        .collect(Collectors.joining(", "));
    return Optional.of(getInsertIntoStatement(tableName, fieldNames) +
        " ON DUPLICATE KEY UPDATE " + updateClause
    );
}

This method takes a uniqueKeyFields parameter but never uses it, and I think updateClause should be generated from uniqueKeyFields. I would change the code to:

String updateClause = Arrays.stream(uniqueKeyFields)
   .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
   .collect(Collectors.joining(", "));
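To make the difference between the two variants concrete, here is a minimal standalone sketch that prints the SQL each one would generate. The class name UpsertSketch, the table t and the columns id, name and cnt are hypothetical, and quoteIdentifier and insertInto are simplified stand-ins (backtick quoting, `?` placeholders) rather than the actual Flink implementations. One thing to keep in mind when reading the output: in MySQL, whether a row counts as a duplicate is decided by the table's own PRIMARY KEY or UNIQUE indexes, and the list after ON DUPLICATE KEY UPDATE only names the columns that get overwritten when such a conflict occurs.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class UpsertSketch {

    // Assumption: MySQL-style backtick quoting, standing in for the dialect's quoteIdentifier.
    static String quoteIdentifier(String identifier) {
        return "`" + identifier + "`";
    }

    // Simplified stand-in for getInsertIntoStatement, using plain JDBC '?' placeholders.
    static String insertInto(String tableName, String[] fieldNames) {
        String columns = Arrays.stream(fieldNames)
            .map(UpsertSketch::quoteIdentifier)
            .collect(Collectors.joining(", "));
        String placeholders = Arrays.stream(fieldNames)
            .map(f -> "?")
            .collect(Collectors.joining(", "));
        return "INSERT INTO " + quoteIdentifier(tableName)
            + "(" + columns + ") VALUES (" + placeholders + ")";
    }

    // Builds the upsert statement; updateFields selects which columns appear in the UPDATE clause.
    static String upsert(String tableName, String[] fieldNames, String[] updateFields) {
        String updateClause = Arrays.stream(updateFields)
            .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
            .collect(Collectors.joining(", "));
        return insertInto(tableName, fieldNames) + " ON DUPLICATE KEY UPDATE " + updateClause;
    }

    public static void main(String[] args) {
        String[] fieldNames = {"id", "name", "cnt"};   // hypothetical columns
        String[] uniqueKeyFields = {"id"};             // hypothetical key

        // Current behaviour: update clause built from all fieldNames.
        System.out.println(upsert("t", fieldNames, fieldNames));
        // Proposed change: update clause built from uniqueKeyFields only.
        System.out.println(upsert("t", fieldNames, uniqueKeyFields));
    }
}
```

With the first variant every column is assigned from the incoming row on a key conflict; with the second, only the key column `id` appears in the update clause, so `name` and `cnt` would keep their previous values.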

Could someone please confirm whether this is correct?
