This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f42a96069a1 Add Optional process logic in LocalDataQueryResultRow (#30112)
f42a96069a1 is described below

commit f42a96069a13217ebadc22e02f186eb427cc2126
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Feb 13 21:28:28 2024 +0800

    Add Optional process logic in LocalDataQueryResultRow (#30112)
---
 .../merge/result/impl/local/LocalDataQueryResultRow.java  |  4 ++++
 .../result/impl/local/LocalDataQueryResultRowTest.java    | 10 ++++++++++
 .../handler/query/ShowStreamingJobStatusExecutor.java     | 15 ++++++++++-----
 3 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/result/impl/local/LocalDataQueryResultRow.java b/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/result/impl/local/LocalDataQueryResultRow.java
index 0178b3393c1..d122a8732ec 100644
--- a/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/result/impl/local/LocalDataQueryResultRow.java
+++ b/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/result/impl/local/LocalDataQueryResultRow.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.infra.util.json.JsonUtils;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -42,6 +43,9 @@ public final class LocalDataQueryResultRow {
         if (null == data) {
             return "";
         }
+        if (data instanceof Optional) {
+            return ((Optional<?>) data).isPresent() ? convert(((Optional<?>) data).get()) : "";
+        }
         if (data instanceof Boolean || data instanceof Integer || data instanceof Long) {
             return data.toString();
         }
diff --git a/infra/merge/src/test/java/org/apache/shardingsphere/infra/merge/result/impl/local/LocalDataQueryResultRowTest.java b/infra/merge/src/test/java/org/apache/shardingsphere/infra/merge/result/impl/local/LocalDataQueryResultRowTest.java
index 8831c92c02f..b7bdd98908f 100644
--- a/infra/merge/src/test/java/org/apache/shardingsphere/infra/merge/result/impl/local/LocalDataQueryResultRowTest.java
+++ b/infra/merge/src/test/java/org/apache/shardingsphere/infra/merge/result/impl/local/LocalDataQueryResultRowTest.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.test.util.PropertiesBuilder;
 import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
 import org.junit.jupiter.api.Test;
 
+import java.util.Optional;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -83,6 +84,15 @@ class LocalDataQueryResultRowTest {
         assertThat(actual.getCell(2), is("{\"foo\":\"bar\"}"));
     }
     
+    @Test
+    void assertGetCellWithOptional() {
+        LocalDataQueryResultRow actual = new LocalDataQueryResultRow(Optional.empty(), Optional.of("foo"), Optional.of(1), Optional.of(PropertiesBuilder.build(new Property("foo", "bar"))));
+        assertThat(actual.getCell(1), is(""));
+        assertThat(actual.getCell(2), is("foo"));
+        assertThat(actual.getCell(3), is("1"));
+        assertThat(actual.getCell(4), is("{\"foo\":\"bar\"}"));
+    }
+    
     private enum FixtureEnum {
         FOO, BAR
     }
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
index 24ea3f6ee76..06bc06d99d2 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.mode.manager.ContextManager;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -59,14 +60,18 @@ public final class ShowStreamingJobStatusExecutor implements DistSQLQueryExecuto
         if (null == jobItemProgress) {
             return new LocalDataQueryResultRow(transmissionJobItemInfo.getShardingItem(), "", "", "", "", "", "", "", "", transmissionJobItemInfo.getErrorMessage());
         }
-        String incrementalIdleSeconds = "";
+        return new LocalDataQueryResultRow(transmissionJobItemInfo.getShardingItem(), jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(), jobItemProgress.isActive(),
+                jobItemProgress.getProcessedRecordsCount(), transmissionJobItemInfo.getInventoryFinishedPercentage(),
+                getIncrementalIdleSeconds(jobItemProgress, transmissionJobItemInfo, currentTimeMillis), cdcJobItemInfo.getConfirmedPosition(), cdcJobItemInfo.getCurrentPosition(),
+                transmissionJobItemInfo.getErrorMessage());
+    }
+    
+    private static Optional<Long> getIncrementalIdleSeconds(final TransmissionJobItemProgress jobItemProgress, final TransmissionJobItemInfo transmissionJobItemInfo, final long currentTimeMillis) {
         if (jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis() > 0) {
             long latestActiveTimeMillis = Math.max(transmissionJobItemInfo.getStartTimeMillis(), jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
-            incrementalIdleSeconds = String.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis));
+            return Optional.of(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis));
         }
-        return new LocalDataQueryResultRow(transmissionJobItemInfo.getShardingItem(), jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(),
-                jobItemProgress.isActive(), jobItemProgress.getProcessedRecordsCount(), transmissionJobItemInfo.getInventoryFinishedPercentage(),
-                incrementalIdleSeconds, cdcJobItemInfo.getConfirmedPosition(), cdcJobItemInfo.getCurrentPosition(), transmissionJobItemInfo.getErrorMessage());
+        return Optional.empty();
     }
     
     @Override

Reply via email to