sandynz commented on code in PR #24440:
URL: https://github.com/apache/shardingsphere/pull/24440#discussion_r1136928007
##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/DataSourceImporter.java:
##########
@@ -98,19 +128,63 @@ public void write(final Record record) throws Exception {
}
}
- private Optional<String> buildSQL(final Record record) {
+ private String buildKey(final String schema, final String tableName) {
+ return schema.isEmpty() ? tableName : String.join(".", schema,
tableName);
+ }
+
+ private StandardTableMetaData loadTableMetaData(final String schema, final
String tableName) {
+ StandardTableMetaData result = tableMetaDataMap.get(buildKey(schema,
tableName));
+ if (null != result) {
+ return result;
+ }
+ result = loader.getTableMetaData(Strings.emptyToNull(schema),
tableName);
+ tableMetaDataMap.put(buildKey(schema, tableName), result);
+ return result;
+ }
+
+ private Optional<String> buildSQL(final Record record, final List<String>
uniqueKeyNames) {
switch (record.getDataChangeType()) {
case INSERT:
- return Optional.ofNullable(sqlBuilder.buildInsertSQL(record));
+ return Optional.ofNullable(sqlBuilder.buildInsertSQL(record,
uniqueKeyNames));
case UPDATE:
- return Optional.ofNullable(sqlBuilder.buildUpdateSQL(record));
+ return Optional.ofNullable(sqlBuilder.buildUpdateSQL(record,
uniqueKeyNames));
case DELETE:
- return Optional.ofNullable(sqlBuilder.buildDeleteSQL(record));
+ return Optional.ofNullable(sqlBuilder.buildDeleteSQL(record,
uniqueKeyNames));
default:
return Optional.empty();
}
}
+ private Object convertValueFromAny(final StandardTableMetaData
tableMetaData, final TableColumn tableColumn) {
+ StandardColumnMetaData columnMetaData =
tableMetaData.getColumnMetaData(tableColumn.getName());
+ Object object;
+ try {
+ object = AnyValueConvert.convertToObject(tableColumn.getValue());
Review Comment:
`AnyValueConvert` could be renamed to `ProtobufAnyValueConverter`
##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/DataSourceImporter.java:
##########
@@ -98,19 +128,63 @@ public void write(final Record record) throws Exception {
}
}
- private Optional<String> buildSQL(final Record record) {
+ private String buildKey(final String schema, final String tableName) {
+ return schema.isEmpty() ? tableName : String.join(".", schema,
tableName);
+ }
+
+ private StandardTableMetaData loadTableMetaData(final String schema, final
String tableName) {
+ StandardTableMetaData result = tableMetaDataMap.get(buildKey(schema,
tableName));
+ if (null != result) {
+ return result;
+ }
+ result = loader.getTableMetaData(Strings.emptyToNull(schema),
tableName);
+ tableMetaDataMap.put(buildKey(schema, tableName), result);
+ return result;
+ }
+
+ private Optional<String> buildSQL(final Record record, final List<String>
uniqueKeyNames) {
switch (record.getDataChangeType()) {
case INSERT:
- return Optional.ofNullable(sqlBuilder.buildInsertSQL(record));
+ return Optional.ofNullable(sqlBuilder.buildInsertSQL(record,
uniqueKeyNames));
case UPDATE:
- return Optional.ofNullable(sqlBuilder.buildUpdateSQL(record));
+ return Optional.ofNullable(sqlBuilder.buildUpdateSQL(record,
uniqueKeyNames));
case DELETE:
- return Optional.ofNullable(sqlBuilder.buildDeleteSQL(record));
+ return Optional.ofNullable(sqlBuilder.buildDeleteSQL(record,
uniqueKeyNames));
default:
return Optional.empty();
}
}
+ private Object convertValueFromAny(final StandardTableMetaData
tableMetaData, final TableColumn tableColumn) {
+ StandardColumnMetaData columnMetaData =
tableMetaData.getColumnMetaData(tableColumn.getName());
+ Object object;
Review Comment:
The local variable `object` could be renamed to `result`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]