This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1f6ab4ce6e9 Refactor IngestDataChangeType to enum (#29519)
1f6ab4ce6e9 is described below
commit 1f6ab4ce6e96fc29ea3b85b99c57e7e123e75a61
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 23 22:12:26 2023 +0800
Refactor IngestDataChangeType to enum (#29519)
---
.../core/importer/sink/PipelineDataSourceSink.java | 8 +++----
.../pipeline/core/ingest/IngestDataChangeType.java | 21 ++---------------
.../memory/MultiplexMemoryPipelineChannel.java | 2 +-
.../pipeline/core/ingest/record/DataRecord.java | 6 ++---
.../ingest/record/group/DataRecordGroupEngine.java | 4 ++--
.../data/pipeline/core/job/JobOperationType.java | 2 +-
.../mysql/ingest/MySQLIncrementalDumper.java | 4 ++--
.../ingest/wal/decode/MppdbDecodingPlugin.java | 14 +++++++----
.../postgresql/ingest/wal/WALEventConverter.java | 2 +-
.../ingest/wal/decode/TestDecodingPlugin.java | 14 +++++++----
.../cdc/util/DataRecordResultConvertUtils.java | 27 +++++++++++++---------
.../cdc/util/DataRecordResultConvertUtilsTest.java | 3 ++-
.../core/importer/PipelineDataSourceSinkTest.java | 26 ++++++++++-----------
13 files changed, 67 insertions(+), 66 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index 3fdc0c82e73..b2b875c55c3 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -92,7 +92,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
}
int insertRecordNumber = 0;
for (DataRecord each : dataRecords) {
- if (IngestDataChangeType.INSERT.equals(each.getType())) {
+ if (IngestDataChangeType.INSERT == each.getType()) {
insertRecordNumber++;
}
}
@@ -136,19 +136,19 @@ public final class PipelineDataSourceSink implements
PipelineSink {
connection.setAutoCommit(false);
}
switch (buffer.get(0).getType()) {
- case IngestDataChangeType.INSERT:
+ case INSERT:
if (null != rateLimitAlgorithm) {
rateLimitAlgorithm.intercept(JobOperationType.INSERT,
1);
}
executeBatchInsert(connection, buffer);
break;
- case IngestDataChangeType.UPDATE:
+ case UPDATE:
if (null != rateLimitAlgorithm) {
rateLimitAlgorithm.intercept(JobOperationType.UPDATE,
1);
}
executeUpdate(connection, buffer);
break;
- case IngestDataChangeType.DELETE:
+ case DELETE:
if (null != rateLimitAlgorithm) {
rateLimitAlgorithm.intercept(JobOperationType.DELETE,
1);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/IngestDataChangeType.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/IngestDataChangeType.java
index 849a7829121..05c9ae3ca99 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/IngestDataChangeType.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/IngestDataChangeType.java
@@ -17,27 +17,10 @@
package org.apache.shardingsphere.data.pipeline.core.ingest;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-
/**
* Ingest data change type.
*/
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class IngestDataChangeType {
-
- /**
- * Insert flag.
- */
- public static final String INSERT = "INSERT";
-
- /**
- * Update flag.
- */
- public static final String UPDATE = "UPDATE";
+public enum IngestDataChangeType {
- /**
- * Delete flag.
- */
- public static final String DELETE = "DELETE";
+ INSERT, UPDATE, DELETE
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
index da247ee2b18..a4ccfea4e94 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
@@ -57,7 +57,7 @@ public final class MultiplexMemoryPipelineChannel implements
PipelineChannel {
pushRecord(firstRecord);
return;
}
- long insertDataRecordsCount =
records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).filter(each
-> IngestDataChangeType.INSERT.equals(each.getType())).count();
+ long insertDataRecordsCount =
records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).filter(each
-> IngestDataChangeType.INSERT == each.getType()).count();
if (insertDataRecordsCount == records.size()) {
channels.get(Math.abs(firstRecord.hashCode() %
channelNumber)).pushRecords(records);
return;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
index c347c8921e0..05168f929e8 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
@@ -39,7 +39,7 @@ import java.util.List;
@ToString
public final class DataRecord extends Record {
- private final String type;
+ private final IngestDataChangeType type;
private final String schemaName;
@@ -55,11 +55,11 @@ public final class DataRecord extends Record {
private Long csn;
- public DataRecord(final String type, final String tableName, final
IngestPosition position, final int columnCount) {
+ public DataRecord(final IngestDataChangeType type, final String tableName,
final IngestPosition position, final int columnCount) {
this(type, null, tableName, position, columnCount);
}
- public DataRecord(final String type, final String schemaName, final String
tableName, final IngestPosition position, final int columnCount) {
+ public DataRecord(final IngestDataChangeType type, final String
schemaName, final String tableName, final IngestPosition position, final int
columnCount) {
super(position);
this.type = type;
this.schemaName = schemaName;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
index cfc2cefc70b..f06abca65c9 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
@@ -46,7 +46,7 @@ public final class DataRecordGroupEngine {
Map<Key, Boolean> duplicateKeys = getDuplicateKeys(records);
Collection<String> tableNames = new LinkedHashSet<>();
Map<String, List<DataRecord>> nonBatchRecords = new LinkedHashMap<>();
- Map<String, Map<String, List<DataRecord>>> batchDataRecords = new
LinkedHashMap<>();
+ Map<String, Map<IngestDataChangeType, List<DataRecord>>>
batchDataRecords = new LinkedHashMap<>();
for (DataRecord each : records) {
tableNames.add(each.getTableName());
if (duplicateKeys.getOrDefault(each.getKey(), false)) {
@@ -68,7 +68,7 @@ public final class DataRecordGroupEngine {
return result;
}
- private GroupedDataRecord getGroupedDataRecord(final String tableName,
final Map<String, List<DataRecord>> batchRecords, final List<DataRecord>
nonBatchRecords) {
+ private GroupedDataRecord getGroupedDataRecord(final String tableName,
final Map<IngestDataChangeType, List<DataRecord>> batchRecords, final
List<DataRecord> nonBatchRecords) {
return new GroupedDataRecord(tableName,
batchRecords.getOrDefault(IngestDataChangeType.INSERT, Collections.emptyList()),
batchRecords.getOrDefault(IngestDataChangeType.UPDATE,
Collections.emptyList()),
batchRecords.getOrDefault(IngestDataChangeType.DELETE,
Collections.emptyList()), nonBatchRecords);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/JobOperationType.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/JobOperationType.java
index 78b1eabd0db..c01d6c51d4f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/JobOperationType.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/JobOperationType.java
@@ -22,5 +22,5 @@ package org.apache.shardingsphere.data.pipeline.core.job;
*/
public enum JobOperationType {
- INSERT, DELETE, UPDATE, SELECT
+ INSERT, UPDATE, DELETE, SELECT
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index bb80230684d..32c7d96cd81 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -214,12 +214,12 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
return dataTypeHandler.isPresent() ?
dataTypeHandler.get().handle(value) : value;
}
- private DataRecord createDataRecord(final String type, final
AbstractRowsEvent rowsEvent, final int columnCount) {
+ private DataRecord createDataRecord(final IngestDataChangeType type, final
AbstractRowsEvent rowsEvent, final int columnCount) {
String tableName =
dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).toString();
IngestPosition position = new BinlogPosition(rowsEvent.getFileName(),
rowsEvent.getPosition(), rowsEvent.getServerId());
DataRecord result = new DataRecord(type, tableName, position,
columnCount);
result.setActualTableName(rowsEvent.getTableName());
- result.setCommitTime(rowsEvent.getTimestamp() * 1000);
+ result.setCommitTime(rowsEvent.getTimestamp() * 1000L);
return result;
}
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
index 34dbef71f30..eee5462af81 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
@@ -99,14 +99,20 @@ public final class MppdbDecodingPlugin implements
DecodingPlugin {
mppTableData = JsonUtils.fromJsonString(mppData, MppTableData.class);
AbstractRowEvent result;
String rowEventType = mppTableData.getOpType();
- switch (rowEventType) {
- case IngestDataChangeType.INSERT:
+ IngestDataChangeType type;
+ try {
+ type = IngestDataChangeType.valueOf(rowEventType);
+ } catch (final IllegalArgumentException ex) {
+ throw new IngestException("Unknown rowEventType: " + rowEventType);
+ }
+ switch (type) {
+ case INSERT:
result = readWriteRowEvent(mppTableData);
break;
- case IngestDataChangeType.UPDATE:
+ case UPDATE:
result = readUpdateRowEvent(mppTableData);
break;
- case IngestDataChangeType.DELETE:
+ case DELETE:
result = readDeleteRowEvent(mppTableData);
break;
default:
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index 7be2b111705..3dd023f14a0 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -117,7 +117,7 @@ public final class WALEventConverter {
return result;
}
- private DataRecord createDataRecord(final String type, final
AbstractRowEvent rowsEvent, final int columnCount) {
+ private DataRecord createDataRecord(final IngestDataChangeType type, final
AbstractRowEvent rowsEvent, final int columnCount) {
String tableName =
dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).toString();
DataRecord result = new DataRecord(type, rowsEvent.getSchemaName(),
tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
result.setActualTableName(rowsEvent.getTableName());
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
index 12f62f55d14..6bafcc2df23 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
@@ -68,14 +68,20 @@ public final class TestDecodingPlugin implements
DecodingPlugin {
AbstractRowEvent result;
String tableName = readTableName(data);
String rowEventType = readRowEventType(data);
- switch (rowEventType) {
- case IngestDataChangeType.INSERT:
+ IngestDataChangeType type;
+ try {
+ type = IngestDataChangeType.valueOf(rowEventType);
+ } catch (final IllegalArgumentException ex) {
+ throw new IngestException("Unknown rowEventType: " + rowEventType);
+ }
+ switch (type) {
+ case INSERT:
result = readWriteRowEvent(data);
break;
- case IngestDataChangeType.UPDATE:
+ case UPDATE:
result = readUpdateRowEvent(data);
break;
- case IngestDataChangeType.DELETE:
+ case DELETE:
result = readDeleteRowEvent(data);
break;
default:
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtils.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtils.java
index f7de6cbaadd..bf6d8e38e3f 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtils.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtils.java
@@ -21,14 +21,14 @@ import com.google.common.base.Strings;
import com.google.protobuf.Any;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.DataChangeType;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.MetaData;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.TableColumn;
import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import java.util.LinkedList;
import java.util.List;
@@ -55,15 +55,20 @@ public final class DataRecordResultConvertUtils {
after.add(TableColumn.newBuilder().setName(column.getName()).setValue(Any.pack(ColumnValueConvertUtils.convertToProtobufMessage(column.getValue()))).build());
}
MetaData metaData =
MetaData.newBuilder().setDatabase(database).setSchema(Strings.nullToEmpty(schema)).setTable(dataRecord.getTableName()).build();
- DataChangeType dataChangeType = DataChangeType.UNKNOWN;
- if (IngestDataChangeType.INSERT.equals(dataRecord.getType())) {
- dataChangeType = DataChangeType.INSERT;
- } else if (IngestDataChangeType.UPDATE.equals(dataRecord.getType())) {
- dataChangeType = DataChangeType.UPDATE;
- } else if (IngestDataChangeType.DELETE.equals(dataRecord.getType())) {
- dataChangeType = DataChangeType.DELETE;
- }
return
DataRecordResult.Record.newBuilder().setMetaData(metaData).addAllBefore(before).addAllAfter(after).setTransactionCommitMillis(dataRecord.getCommitTime())
- .setDataChangeType(dataChangeType).build();
+
.setDataChangeType(getDataChangeType(dataRecord.getType())).build();
+ }
+
+ private static DataChangeType getDataChangeType(final IngestDataChangeType
dataChangeType) {
+ switch (dataChangeType) {
+ case INSERT:
+ return DataChangeType.INSERT;
+ case UPDATE:
+ return DataChangeType.UPDATE;
+ case DELETE:
+ return DataChangeType.DELETE;
+ default:
+ return DataChangeType.UNKNOWN;
+ }
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
index 3f15c9e6938..6d83fd4b865 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
@@ -23,6 +23,7 @@ import com.google.protobuf.TimestampProto;
import com.google.protobuf.TypeRegistry;
import com.google.protobuf.WrappersProto;
import com.google.protobuf.util.JsonFormat;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
@@ -54,7 +55,7 @@ class DataRecordResultConvertUtilsTest {
@Test
void assertConvertDataRecordToRecord() throws
InvalidProtocolBufferException, SQLException {
- DataRecord dataRecord = new DataRecord("INSERT", "t_order", new
IntegerPrimaryKeyIngestPosition(0, 1), 2);
+ DataRecord dataRecord = new DataRecord(IngestDataChangeType.INSERT,
"t_order", new IntegerPrimaryKeyIngestPosition(0, 1), 2);
dataRecord.addColumn(new Column("order_id", BigInteger.ONE, false,
true));
dataRecord.addColumn(new Column("price", BigDecimal.valueOf(123),
false, false));
dataRecord.addColumn(new Column("user_id", Long.MAX_VALUE, false,
false));
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index 3fd011d49c2..6d76bb39f43 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -95,19 +95,19 @@ class PipelineDataSourceSinkTest {
@Test
void assertWriteInsertDataRecord() throws SQLException {
- DataRecord insertRecord = getDataRecord("INSERT");
+ DataRecord insertRecord = getDataRecord(IngestDataChangeType.INSERT);
when(connection.prepareStatement(any())).thenReturn(preparedStatement);
when(channel.fetchRecords(anyInt(), anyLong(),
any())).thenReturn(mockRecords(insertRecord));
importer.run();
verify(preparedStatement).setObject(1, 1);
verify(preparedStatement).setObject(2, 10);
- verify(preparedStatement).setObject(3, "INSERT");
+ verify(preparedStatement).setObject(3, IngestDataChangeType.INSERT);
verify(preparedStatement).addBatch();
}
@Test
void assertDeleteDataRecord() throws SQLException {
- DataRecord deleteRecord = getDataRecord("DELETE");
+ DataRecord deleteRecord = getDataRecord(IngestDataChangeType.DELETE);
when(connection.prepareStatement(any())).thenReturn(preparedStatement);
when(channel.fetchRecords(anyInt(), anyLong(),
any())).thenReturn(mockRecords(deleteRecord));
when(preparedStatement.executeBatch()).thenReturn(new int[]{1});
@@ -119,12 +119,12 @@ class PipelineDataSourceSinkTest {
@Test
void assertUpdateDataRecord() throws SQLException {
- DataRecord updateRecord = getDataRecord("UPDATE");
+ DataRecord updateRecord = getDataRecord(IngestDataChangeType.UPDATE);
when(connection.prepareStatement(any())).thenReturn(preparedStatement);
when(channel.fetchRecords(anyInt(), anyLong(),
any())).thenReturn(mockRecords(updateRecord));
importer.run();
verify(preparedStatement).setObject(1, 20);
- verify(preparedStatement).setObject(2, "UPDATE");
+ verify(preparedStatement).setObject(2, IngestDataChangeType.UPDATE);
verify(preparedStatement).setObject(3, 1);
verify(preparedStatement).setObject(4, 10);
verify(preparedStatement).executeUpdate();
@@ -139,7 +139,7 @@ class PipelineDataSourceSinkTest {
InOrder inOrder = inOrder(preparedStatement);
inOrder.verify(preparedStatement).setObject(1, 2);
inOrder.verify(preparedStatement).setObject(2, 10);
- inOrder.verify(preparedStatement).setObject(3, "UPDATE");
+ inOrder.verify(preparedStatement).setObject(3,
IngestDataChangeType.UPDATE);
inOrder.verify(preparedStatement).setObject(4, 1);
inOrder.verify(preparedStatement).setObject(5, 0);
inOrder.verify(preparedStatement).executeUpdate();
@@ -149,7 +149,7 @@ class PipelineDataSourceSinkTest {
DataRecord result = new DataRecord(IngestDataChangeType.UPDATE,
TABLE_NAME, new IngestPlaceholderPosition(), 3);
result.addColumn(new Column("id", 1, 2, true, true));
result.addColumn(new Column("user", 0, 10, true, false));
- result.addColumn(new Column("status", null, "UPDATE", true, false));
+ result.addColumn(new Column("status", null,
IngestDataChangeType.UPDATE, true, false));
return result;
}
@@ -160,26 +160,26 @@ class PipelineDataSourceSinkTest {
return result;
}
- private DataRecord getDataRecord(final String recordType) {
+ private DataRecord getDataRecord(final IngestDataChangeType recordType) {
Integer idOldValue = null;
Integer userOldValue = null;
Integer idValue = null;
Integer userValue = null;
- String statusOldValue = null;
- String statusValue = null;
- if ("INSERT".equals(recordType)) {
+ IngestDataChangeType statusOldValue = null;
+ IngestDataChangeType statusValue = null;
+ if (IngestDataChangeType.INSERT == recordType) {
idValue = 1;
userValue = 10;
statusValue = recordType;
}
- if ("UPDATE".equals(recordType)) {
+ if (IngestDataChangeType.UPDATE == recordType) {
idOldValue = 1;
idValue = idOldValue;
userOldValue = 10;
userValue = 20;
statusValue = recordType;
}
- if ("DELETE".equals(recordType)) {
+ if (IngestDataChangeType.DELETE == recordType) {
idOldValue = 1;
userOldValue = 10;
statusOldValue = recordType;