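Hi all,
I have recently been using JdbcDynamicTableSink and noticed that, when writing to MySQL, rows are not updated according to the primary key I specified. While tracing this I came across the getUpsertStatement method in org.apache.flink.connector.jdbc.dialect.MySQLDialect: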
    /**
     * Mysql upsert query use DUPLICATE KEY UPDATE.
     *
     * <p>NOTE: It requires Mysql's primary key to be consistent with pkFields.
     *
     * <p>We don't use REPLACE INTO, if there are other fields, we can keep their previous values.
     */
    @Override
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String updateClause = Arrays.stream(fieldNames)
                .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
                .collect(Collectors.joining(", "));
        return Optional.of(getInsertIntoStatement(tableName, fieldNames)
                + " ON DUPLICATE KEY UPDATE " + updateClause);
    }
The method takes a uniqueKeyFields parameter but never uses it, and I believe updateClause should be built from uniqueKeyFields instead. That is, the code would become:
    String updateClause = Arrays.stream(uniqueKeyFields)
            .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
            .collect(Collectors.joining(", "));
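
To make the difference concrete, here is a minimal standalone sketch that prints the SQL each variant would produce. It does not call Flink. The class UpsertSketch, the helper insertInto (a hypothetical stand-in for getInsertIntoStatement), the backtick quoting, the `?` placeholders, and the sample table `t` with columns id, name, score are all assumptions made for illustration, so the exact text may differ from what a given Flink version emits.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class UpsertSketch {

    // Assumed MySQL-style quoting with backticks.
    static String quoteIdentifier(String identifier) {
        return "`" + identifier + "`";
    }

    // Hypothetical stand-in for the dialect's getInsertIntoStatement.
    static String insertInto(String tableName, String[] fieldNames) {
        String columns = Arrays.stream(fieldNames)
                .map(UpsertSketch::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String placeholders = Arrays.stream(fieldNames)
                .map(f -> "?")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + quoteIdentifier(tableName)
                + "(" + columns + ") VALUES (" + placeholders + ")";
    }

    // Builds the upsert with the update clause derived from updateColumns.
    static String upsert(String tableName, String[] fieldNames, String[] updateColumns) {
        String updateClause = Arrays.stream(updateColumns)
                .map(f -> quoteIdentifier(f) + "=VALUES(" + quoteIdentifier(f) + ")")
                .collect(Collectors.joining(", "));
        return insertInto(tableName, fieldNames) + " ON DUPLICATE KEY UPDATE " + updateClause;
    }

    public static void main(String[] args) {
        String[] fieldNames = {"id", "name", "score"};
        String[] uniqueKeyFields = {"id"};

        // Existing behaviour: update clause built from fieldNames.
        System.out.println(upsert("t", fieldNames, fieldNames));
        // Proposed change: update clause built from uniqueKeyFields.
        System.out.println(upsert("t", fieldNames, uniqueKeyFields));
    }
}
```

With the first call every column is reassigned on conflict. With the second only `id` is reassigned, so `name` and `score` would keep their stored values. Note that in MySQL the conflict itself is detected by the table's own PRIMARY KEY or UNIQUE indexes; the ON DUPLICATE KEY UPDATE clause only lists the columns to overwrite.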
Could someone please confirm whether this is right?