This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f6ab4ce6e9 Refactor IngestDataChangeType to enum (#29519)
1f6ab4ce6e9 is described below

commit 1f6ab4ce6e96fc29ea3b85b99c57e7e123e75a61
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 23 22:12:26 2023 +0800

    Refactor IngestDataChangeType to enum (#29519)
---
 .../core/importer/sink/PipelineDataSourceSink.java |  8 +++----
 .../pipeline/core/ingest/IngestDataChangeType.java | 21 ++---------------
 .../memory/MultiplexMemoryPipelineChannel.java     |  2 +-
 .../pipeline/core/ingest/record/DataRecord.java    |  6 ++---
 .../ingest/record/group/DataRecordGroupEngine.java |  4 ++--
 .../data/pipeline/core/job/JobOperationType.java   |  2 +-
 .../mysql/ingest/MySQLIncrementalDumper.java       |  4 ++--
 .../ingest/wal/decode/MppdbDecodingPlugin.java     | 14 +++++++----
 .../postgresql/ingest/wal/WALEventConverter.java   |  2 +-
 .../ingest/wal/decode/TestDecodingPlugin.java      | 14 +++++++----
 .../cdc/util/DataRecordResultConvertUtils.java     | 27 +++++++++++++---------
 .../cdc/util/DataRecordResultConvertUtilsTest.java |  3 ++-
 .../core/importer/PipelineDataSourceSinkTest.java  | 26 ++++++++++-----------
 13 files changed, 67 insertions(+), 66 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index 3fdc0c82e73..b2b875c55c3 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -92,7 +92,7 @@ public final class PipelineDataSourceSink implements PipelineSink {
         }
         int insertRecordNumber = 0;
         for (DataRecord each : dataRecords) {
-            if (IngestDataChangeType.INSERT.equals(each.getType())) {
+            if (IngestDataChangeType.INSERT == each.getType()) {
                 insertRecordNumber++;
             }
         }
@@ -136,19 +136,19 @@ public final class PipelineDataSourceSink implements PipelineSink {
                 connection.setAutoCommit(false);
             }
             switch (buffer.get(0).getType()) {
-                case IngestDataChangeType.INSERT:
+                case INSERT:
                     if (null != rateLimitAlgorithm) {
                         rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
                     }
                     executeBatchInsert(connection, buffer);
                     break;
-                case IngestDataChangeType.UPDATE:
+                case UPDATE:
                     if (null != rateLimitAlgorithm) {
                         rateLimitAlgorithm.intercept(JobOperationType.UPDATE, 1);
                     }
                     executeUpdate(connection, buffer);
                     break;
-                case IngestDataChangeType.DELETE:
+                case DELETE:
                     if (null != rateLimitAlgorithm) {
                         rateLimitAlgorithm.intercept(JobOperationType.DELETE, 1);
                     }
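
The enum change alters two call-site idioms above: identity comparison (==) replaces equals(), and switch case labels use bare constant names, since Java forbids qualified enum constants in case labels. A minimal, self-contained sketch of the new pattern (class name and record list are illustrative, not from the commit):

    import java.util.Arrays;
    import java.util.List;
    
    public final class EnumCallSiteSketch {
        
        private enum IngestDataChangeType {
            INSERT, UPDATE, DELETE
        }
        
        public static void main(final String[] args) {
            List<IngestDataChangeType> buffer = Arrays.asList(IngestDataChangeType.INSERT, IngestDataChangeType.DELETE);
            int insertRecordNumber = 0;
            for (IngestDataChangeType each : buffer) {
                // Enum constants are singletons, so == is correct and cannot throw NPE.
                if (IngestDataChangeType.INSERT == each) {
                    insertRecordNumber++;
                }
            }
            switch (buffer.get(0)) {
                case INSERT:   // bare constant name; "case IngestDataChangeType.INSERT" would not compile
                    System.out.println("insert count: " + insertRecordNumber);
                    break;
                default:
                    break;
            }
        }
    }
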
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/IngestDataChangeType.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/IngestDataChangeType.java
index 849a7829121..05c9ae3ca99 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/IngestDataChangeType.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/IngestDataChangeType.java
@@ -17,27 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.core.ingest;
 
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-
 /**
  * Ingest data change type.
  */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class IngestDataChangeType {
-    
-    /**
-     * Insert flag.
-     */
-    public static final String INSERT = "INSERT";
-    
-    /**
-     * Update flag.
-     */
-    public static final String UPDATE = "UPDATE";
+public enum IngestDataChangeType {
     
-    /**
-     * Delete flag.
-     */
-    public static final String DELETE = "DELETE";
+    INSERT, UPDATE, DELETE
 }
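
Because enum types cannot be instantiated, the Lombok @NoArgsConstructor(access = AccessLevel.PRIVATE) guard that protected the old constant-holder class is simply dropped. The constant names also keep the exact strings the old fields carried, so anything built on name()/valueOf() sees the same values. A quick round-trip sketch (the enum is redeclared locally so the example runs standalone):

    public final class EnumNameRoundTripSketch {
        
        private enum IngestDataChangeType {
            INSERT, UPDATE, DELETE
        }
        
        public static void main(final String[] args) {
            // name() yields "INSERT", the exact value of the old String constant;
            // valueOf() parses it back to the same singleton instance.
            String wireValue = IngestDataChangeType.INSERT.name();
            System.out.println("INSERT".equals(wireValue) && IngestDataChangeType.valueOf(wireValue) == IngestDataChangeType.INSERT);
        }
    }
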
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
index da247ee2b18..a4ccfea4e94 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
@@ -57,7 +57,7 @@ public final class MultiplexMemoryPipelineChannel implements PipelineChannel {
             pushRecord(firstRecord);
             return;
         }
-        long insertDataRecordsCount = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).filter(each -> IngestDataChangeType.INSERT.equals(each.getType())).count();
+        long insertDataRecordsCount = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).filter(each -> IngestDataChangeType.INSERT == each.getType()).count();
         if (insertDataRecordsCount == records.size()) {
             channels.get(Math.abs(firstRecord.hashCode() % channelNumber)).pushRecords(records);
             return;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
index c347c8921e0..05168f929e8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
@@ -39,7 +39,7 @@ import java.util.List;
 @ToString
 public final class DataRecord extends Record {
     
-    private final String type;
+    private final IngestDataChangeType type;
     
     private final String schemaName;
     
@@ -55,11 +55,11 @@ public final class DataRecord extends Record {
     
     private Long csn;
     
-    public DataRecord(final String type, final String tableName, final IngestPosition position, final int columnCount) {
+    public DataRecord(final IngestDataChangeType type, final String tableName, final IngestPosition position, final int columnCount) {
         this(type, null, tableName, position, columnCount);
     }
     
-    public DataRecord(final String type, final String schemaName, final String tableName, final IngestPosition position, final int columnCount) {
+    public DataRecord(final IngestDataChangeType type, final String schemaName, final String tableName, final IngestPosition position, final int columnCount) {
         super(position);
         this.type = type;
         this.schemaName = schemaName;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
index cfc2cefc70b..f06abca65c9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngine.java
@@ -46,7 +46,7 @@ public final class DataRecordGroupEngine {
         Map<Key, Boolean> duplicateKeys = getDuplicateKeys(records);
         Collection<String> tableNames = new LinkedHashSet<>();
         Map<String, List<DataRecord>> nonBatchRecords = new LinkedHashMap<>();
-        Map<String, Map<String, List<DataRecord>>> batchDataRecords = new LinkedHashMap<>();
+        Map<String, Map<IngestDataChangeType, List<DataRecord>>> batchDataRecords = new LinkedHashMap<>();
         for (DataRecord each : records) {
             tableNames.add(each.getTableName());
             if (duplicateKeys.getOrDefault(each.getKey(), false)) {
@@ -68,7 +68,7 @@ public final class DataRecordGroupEngine {
         return result;
     }
     
-    private GroupedDataRecord getGroupedDataRecord(final String tableName, final Map<String, List<DataRecord>> batchRecords, final List<DataRecord> nonBatchRecords) {
+    private GroupedDataRecord getGroupedDataRecord(final String tableName, final Map<IngestDataChangeType, List<DataRecord>> batchRecords, final List<DataRecord> nonBatchRecords) {
         return new GroupedDataRecord(tableName, batchRecords.getOrDefault(IngestDataChangeType.INSERT, Collections.emptyList()),
                 batchRecords.getOrDefault(IngestDataChangeType.UPDATE, Collections.emptyList()), batchRecords.getOrDefault(IngestDataChangeType.DELETE, Collections.emptyList()), nonBatchRecords);
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/JobOperationType.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/JobOperationType.java
index 78b1eabd0db..c01d6c51d4f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/JobOperationType.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/JobOperationType.java
@@ -22,5 +22,5 @@ package org.apache.shardingsphere.data.pipeline.core.job;
  */
 public enum JobOperationType {
     
-    INSERT, DELETE, UPDATE, SELECT
+    INSERT, UPDATE, DELETE, SELECT
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index bb80230684d..32c7d96cd81 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -214,12 +214,12 @@ public final class MySQLIncrementalDumper extends AbstractPipelineLifecycleRunna
         return dataTypeHandler.isPresent() ? dataTypeHandler.get().handle(value) : value;
     }
     
-    private DataRecord createDataRecord(final String type, final AbstractRowsEvent rowsEvent, final int columnCount) {
+    private DataRecord createDataRecord(final IngestDataChangeType type, final AbstractRowsEvent rowsEvent, final int columnCount) {
         String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).toString();
         IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId());
         DataRecord result = new DataRecord(type, tableName, position, columnCount);
         result.setActualTableName(rowsEvent.getTableName());
-        result.setCommitTime(rowsEvent.getTimestamp() * 1000);
+        result.setCommitTime(rowsEvent.getTimestamp() * 1000L);
         return result;
     }
     
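
A side note on the * 1000 to * 1000L change above: if rowsEvent.getTimestamp() returns an int (an assumption; the accessor's signature is not shown in this hunk), the old expression multiplied in 32-bit arithmetic and overflowed before widening to long. A minimal illustration:

    public final class MillisOverflowSketch {
        
        public static void main(final String[] args) {
            int timestampSeconds = 1703340746;          // a late-2023 Unix timestamp in seconds
            long overflowed = timestampSeconds * 1000;  // multiplied as int: overflows, then widens
            long correct = timestampSeconds * 1000L;    // widened first: multiplied as long
            System.out.println(overflowed);             // prints a wrapped-around negative value
            System.out.println(correct);                // prints 1703340746000
        }
    }
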
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
index 34dbef71f30..eee5462af81 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
@@ -99,14 +99,20 @@ public final class MppdbDecodingPlugin implements DecodingPlugin {
         mppTableData = JsonUtils.fromJsonString(mppData, MppTableData.class);
         AbstractRowEvent result;
         String rowEventType = mppTableData.getOpType();
-        switch (rowEventType) {
-            case IngestDataChangeType.INSERT:
+        IngestDataChangeType type;
+        try {
+            type = IngestDataChangeType.valueOf(rowEventType);
+        } catch (final IllegalArgumentException ex) {
+            throw new IngestException("Unknown rowEventType: " + rowEventType);
+        }
+        switch (type) {
+            case INSERT:
                 result = readWriteRowEvent(mppTableData);
                 break;
-            case IngestDataChangeType.UPDATE:
+            case UPDATE:
                 result = readUpdateRowEvent(mppTableData);
                 break;
-            case IngestDataChangeType.DELETE:
+            case DELETE:
                 result = readDeleteRowEvent(mppTableData);
                 break;
             default:
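
Enum.valueOf is strict: it throws IllegalArgumentException for any name that does not exactly match a declared constant, including case mismatches, which is why the plugin wraps the parse and rethrows a domain-specific IngestException. A sketch of that guard pattern (RuntimeException stands in for IngestException so the example is self-contained):

    public final class ValueOfGuardSketch {
        
        private enum IngestDataChangeType {
            INSERT, UPDATE, DELETE
        }
        
        private static IngestDataChangeType parse(final String rowEventType) {
            try {
                return IngestDataChangeType.valueOf(rowEventType);
            } catch (final IllegalArgumentException ex) {
                // Translate the generic JDK exception into a domain error, as the plugin does.
                throw new RuntimeException("Unknown rowEventType: " + rowEventType, ex);
            }
        }
        
        public static void main(final String[] args) {
            System.out.println(parse("INSERT"));   // prints INSERT
            System.out.println(parse("insert"));   // throws: valueOf is case-sensitive
        }
    }
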
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index 7be2b111705..3dd023f14a0 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -117,7 +117,7 @@ public final class WALEventConverter {
         return result;
     }
     
-    private DataRecord createDataRecord(final String type, final AbstractRowEvent rowsEvent, final int columnCount) {
+    private DataRecord createDataRecord(final IngestDataChangeType type, final AbstractRowEvent rowsEvent, final int columnCount) {
         String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).toString();
         DataRecord result = new DataRecord(type, rowsEvent.getSchemaName(), tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
         result.setActualTableName(rowsEvent.getTableName());
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
index 12f62f55d14..6bafcc2df23 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
@@ -68,14 +68,20 @@ public final class TestDecodingPlugin implements DecodingPlugin {
         AbstractRowEvent result;
         String tableName = readTableName(data);
         String rowEventType = readRowEventType(data);
-        switch (rowEventType) {
-            case IngestDataChangeType.INSERT:
+        IngestDataChangeType type;
+        try {
+            type = IngestDataChangeType.valueOf(rowEventType);
+        } catch (final IllegalArgumentException ex) {
+            throw new IngestException("Unknown rowEventType: " + rowEventType);
+        }
+        switch (type) {
+            case INSERT:
                 result = readWriteRowEvent(data);
                 break;
-            case IngestDataChangeType.UPDATE:
+            case UPDATE:
                 result = readUpdateRowEvent(data);
                 break;
-            case IngestDataChangeType.DELETE:
+            case DELETE:
                 result = readDeleteRowEvent(data);
                 break;
             default:
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtils.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtils.java
index f7de6cbaadd..bf6d8e38e3f 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtils.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtils.java
@@ -21,14 +21,14 @@ import com.google.common.base.Strings;
 import com.google.protobuf.Any;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.DataChangeType;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.MetaData;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.TableColumn;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 
 import java.util.LinkedList;
 import java.util.List;
@@ -55,15 +55,20 @@ public final class DataRecordResultConvertUtils {
             after.add(TableColumn.newBuilder().setName(column.getName()).setValue(Any.pack(ColumnValueConvertUtils.convertToProtobufMessage(column.getValue()))).build());
         }
         MetaData metaData = MetaData.newBuilder().setDatabase(database).setSchema(Strings.nullToEmpty(schema)).setTable(dataRecord.getTableName()).build();
-        DataChangeType dataChangeType = DataChangeType.UNKNOWN;
-        if (IngestDataChangeType.INSERT.equals(dataRecord.getType())) {
-            dataChangeType = DataChangeType.INSERT;
-        } else if (IngestDataChangeType.UPDATE.equals(dataRecord.getType())) {
-            dataChangeType = DataChangeType.UPDATE;
-        } else if (IngestDataChangeType.DELETE.equals(dataRecord.getType())) {
-            dataChangeType = DataChangeType.DELETE;
-        }
         return DataRecordResult.Record.newBuilder().setMetaData(metaData).addAllBefore(before).addAllAfter(after).setTransactionCommitMillis(dataRecord.getCommitTime())
-                .setDataChangeType(dataChangeType).build();
+                .setDataChangeType(getDataChangeType(dataRecord.getType())).build();
+    }
+    
+    private static DataChangeType getDataChangeType(final IngestDataChangeType dataChangeType) {
+        switch (dataChangeType) {
+            case INSERT:
+                return DataChangeType.INSERT;
+            case UPDATE:
+                return DataChangeType.UPDATE;
+            case DELETE:
+                return DataChangeType.DELETE;
+            default:
+                return DataChangeType.UNKNOWN;
+        }
     }
 }
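
The replaced if/else chain becomes a switch over the enum, and the default branch preserves the old DataChangeType.UNKNOWN fallback, so a constant added to IngestDataChangeType later degrades gracefully on the CDC wire instead of failing. A standalone sketch of the same mapping shape (both enums are local stand-ins for the real types):

    public final class EnumMappingSketch {
        
        private enum IngestDataChangeType {
            INSERT, UPDATE, DELETE
        }
        
        private enum DataChangeType {
            UNKNOWN, INSERT, UPDATE, DELETE
        }
        
        private static DataChangeType getDataChangeType(final IngestDataChangeType dataChangeType) {
            switch (dataChangeType) {
                case INSERT:
                    return DataChangeType.INSERT;
                case UPDATE:
                    return DataChangeType.UPDATE;
                case DELETE:
                    return DataChangeType.DELETE;
                default:
                    // Keeps wire compatibility if new ingest types appear.
                    return DataChangeType.UNKNOWN;
            }
        }
        
        public static void main(final String[] args) {
            System.out.println(getDataChangeType(IngestDataChangeType.UPDATE));   // prints UPDATE
        }
    }
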
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
index 3f15c9e6938..6d83fd4b865 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
@@ -23,6 +23,7 @@ import com.google.protobuf.TimestampProto;
 import com.google.protobuf.TypeRegistry;
 import com.google.protobuf.WrappersProto;
 import com.google.protobuf.util.JsonFormat;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
@@ -54,7 +55,7 @@ class DataRecordResultConvertUtilsTest {
     
     @Test
     void assertConvertDataRecordToRecord() throws InvalidProtocolBufferException, SQLException {
-        DataRecord dataRecord = new DataRecord("INSERT", "t_order", new IntegerPrimaryKeyIngestPosition(0, 1), 2);
+        DataRecord dataRecord = new DataRecord(IngestDataChangeType.INSERT, "t_order", new IntegerPrimaryKeyIngestPosition(0, 1), 2);
         dataRecord.addColumn(new Column("order_id", BigInteger.ONE, false, 
true));
         dataRecord.addColumn(new Column("price", BigDecimal.valueOf(123), 
false, false));
         dataRecord.addColumn(new Column("user_id", Long.MAX_VALUE, false, 
false));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index 3fd011d49c2..6d76bb39f43 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -95,19 +95,19 @@ class PipelineDataSourceSinkTest {
     
     @Test
     void assertWriteInsertDataRecord() throws SQLException {
-        DataRecord insertRecord = getDataRecord("INSERT");
+        DataRecord insertRecord = getDataRecord(IngestDataChangeType.INSERT);
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
         when(channel.fetchRecords(anyInt(), anyLong(), any())).thenReturn(mockRecords(insertRecord));
         importer.run();
         verify(preparedStatement).setObject(1, 1);
         verify(preparedStatement).setObject(2, 10);
-        verify(preparedStatement).setObject(3, "INSERT");
+        verify(preparedStatement).setObject(3, IngestDataChangeType.INSERT);
         verify(preparedStatement).addBatch();
     }
     
     @Test
     void assertDeleteDataRecord() throws SQLException {
-        DataRecord deleteRecord = getDataRecord("DELETE");
+        DataRecord deleteRecord = getDataRecord(IngestDataChangeType.DELETE);
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
         when(channel.fetchRecords(anyInt(), anyLong(), any())).thenReturn(mockRecords(deleteRecord));
         when(preparedStatement.executeBatch()).thenReturn(new int[]{1});
@@ -119,12 +119,12 @@ class PipelineDataSourceSinkTest {
     
     @Test
     void assertUpdateDataRecord() throws SQLException {
-        DataRecord updateRecord = getDataRecord("UPDATE");
+        DataRecord updateRecord = getDataRecord(IngestDataChangeType.UPDATE);
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
         when(channel.fetchRecords(anyInt(), anyLong(), any())).thenReturn(mockRecords(updateRecord));
         importer.run();
         verify(preparedStatement).setObject(1, 20);
-        verify(preparedStatement).setObject(2, "UPDATE");
+        verify(preparedStatement).setObject(2, IngestDataChangeType.UPDATE);
         verify(preparedStatement).setObject(3, 1);
         verify(preparedStatement).setObject(4, 10);
         verify(preparedStatement).executeUpdate();
@@ -139,7 +139,7 @@ class PipelineDataSourceSinkTest {
         InOrder inOrder = inOrder(preparedStatement);
         inOrder.verify(preparedStatement).setObject(1, 2);
         inOrder.verify(preparedStatement).setObject(2, 10);
-        inOrder.verify(preparedStatement).setObject(3, "UPDATE");
+        inOrder.verify(preparedStatement).setObject(3, IngestDataChangeType.UPDATE);
         inOrder.verify(preparedStatement).setObject(4, 1);
         inOrder.verify(preparedStatement).setObject(5, 0);
         inOrder.verify(preparedStatement).executeUpdate();
@@ -149,7 +149,7 @@ class PipelineDataSourceSinkTest {
         DataRecord result = new DataRecord(IngestDataChangeType.UPDATE, TABLE_NAME, new IngestPlaceholderPosition(), 3);
         result.addColumn(new Column("id", 1, 2, true, true));
         result.addColumn(new Column("user", 0, 10, true, false));
-        result.addColumn(new Column("status", null, "UPDATE", true, false));
+        result.addColumn(new Column("status", null, 
IngestDataChangeType.UPDATE, true, false));
         return result;
     }
     
@@ -160,26 +160,26 @@ class PipelineDataSourceSinkTest {
         return result;
     }
     
-    private DataRecord getDataRecord(final String recordType) {
+    private DataRecord getDataRecord(final IngestDataChangeType recordType) {
         Integer idOldValue = null;
         Integer userOldValue = null;
         Integer idValue = null;
         Integer userValue = null;
-        String statusOldValue = null;
-        String statusValue = null;
-        if ("INSERT".equals(recordType)) {
+        IngestDataChangeType statusOldValue = null;
+        IngestDataChangeType statusValue = null;
+        if (IngestDataChangeType.INSERT == recordType) {
             idValue = 1;
             userValue = 10;
             statusValue = recordType;
         }
-        if ("UPDATE".equals(recordType)) {
+        if (IngestDataChangeType.UPDATE == recordType) {
             idOldValue = 1;
             idValue = idOldValue;
             userOldValue = 10;
             userValue = 20;
             statusValue = recordType;
         }
-        if ("DELETE".equals(recordType)) {
+        if (IngestDataChangeType.DELETE == recordType) {
             idOldValue = 1;
             userOldValue = 10;
             statusOldValue = recordType;
