This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f42a96069a1 Add Optional process logic in LocalDataQueryResultRow (#30112)
f42a96069a1 is described below
commit f42a96069a13217ebadc22e02f186eb427cc2126
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Feb 13 21:28:28 2024 +0800
Add Optional process logic in LocalDataQueryResultRow (#30112)
---
.../merge/result/impl/local/LocalDataQueryResultRow.java | 4 ++++
.../result/impl/local/LocalDataQueryResultRowTest.java | 10 ++++++++++
.../handler/query/ShowStreamingJobStatusExecutor.java | 15 ++++++++++-----
3 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/result/impl/local/LocalDataQueryResultRow.java b/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/result/impl/local/LocalDataQueryResultRow.java
index 0178b3393c1..d122a8732ec 100644
--- a/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/result/impl/local/LocalDataQueryResultRow.java
+++ b/infra/merge/src/main/java/org/apache/shardingsphere/infra/merge/result/impl/local/LocalDataQueryResultRow.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.infra.util.json.JsonUtils;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -42,6 +43,9 @@ public final class LocalDataQueryResultRow {
if (null == data) {
return "";
}
+ if (data instanceof Optional) {
+ return ((Optional<?>) data).isPresent() ? convert(((Optional<?>) data).get()) : "";
+ }
if (data instanceof Boolean || data instanceof Integer || data instanceof Long) {
return data.toString();
}
diff --git a/infra/merge/src/test/java/org/apache/shardingsphere/infra/merge/result/impl/local/LocalDataQueryResultRowTest.java b/infra/merge/src/test/java/org/apache/shardingsphere/infra/merge/result/impl/local/LocalDataQueryResultRowTest.java
index 8831c92c02f..b7bdd98908f 100644
--- a/infra/merge/src/test/java/org/apache/shardingsphere/infra/merge/result/impl/local/LocalDataQueryResultRowTest.java
+++ b/infra/merge/src/test/java/org/apache/shardingsphere/infra/merge/result/impl/local/LocalDataQueryResultRowTest.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.test.util.PropertiesBuilder;
import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
import org.junit.jupiter.api.Test;
+import java.util.Optional;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
@@ -83,6 +84,15 @@ class LocalDataQueryResultRowTest {
assertThat(actual.getCell(2), is("{\"foo\":\"bar\"}"));
}
+ @Test
+ void assertGetCellWithOptional() {
+ LocalDataQueryResultRow actual = new LocalDataQueryResultRow(Optional.empty(), Optional.of("foo"), Optional.of(1), Optional.of(PropertiesBuilder.build(new Property("foo", "bar"))));
+ assertThat(actual.getCell(1), is(""));
+ assertThat(actual.getCell(2), is("foo"));
+ assertThat(actual.getCell(3), is("1"));
+ assertThat(actual.getCell(4), is("{\"foo\":\"bar\"}"));
+ }
+
private enum FixtureEnum {
FOO, BAR
}
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
index 24ea3f6ee76..06bc06d99d2 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.mode.manager.ContextManager;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -59,14 +60,18 @@ public final class ShowStreamingJobStatusExecutor implements DistSQLQueryExecuto
if (null == jobItemProgress) {
return new LocalDataQueryResultRow(transmissionJobItemInfo.getShardingItem(), "", "", "", "", "", "", "", "", transmissionJobItemInfo.getErrorMessage());
}
- String incrementalIdleSeconds = "";
+ return new LocalDataQueryResultRow(transmissionJobItemInfo.getShardingItem(), jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(), jobItemProgress.isActive(),
+ jobItemProgress.getProcessedRecordsCount(), transmissionJobItemInfo.getInventoryFinishedPercentage(),
+ getIncrementalIdleSeconds(jobItemProgress, transmissionJobItemInfo, currentTimeMillis), cdcJobItemInfo.getConfirmedPosition(), cdcJobItemInfo.getCurrentPosition(),
+ transmissionJobItemInfo.getErrorMessage());
+ }
+
+ private static Optional<Long> getIncrementalIdleSeconds(final TransmissionJobItemProgress jobItemProgress, final TransmissionJobItemInfo transmissionJobItemInfo, final long currentTimeMillis) {
if (jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis() > 0) {
long latestActiveTimeMillis = Math.max(transmissionJobItemInfo.getStartTimeMillis(), jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
- incrementalIdleSeconds = String.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis));
+ return Optional.of(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis));
}
- return new LocalDataQueryResultRow(transmissionJobItemInfo.getShardingItem(), jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(),
- jobItemProgress.isActive(), jobItemProgress.getProcessedRecordsCount(), transmissionJobItemInfo.getInventoryFinishedPercentage(),
- incrementalIdleSeconds, cdcJobItemInfo.getConfirmedPosition(), cdcJobItemInfo.getCurrentPosition(), transmissionJobItemInfo.getErrorMessage());
+ return Optional.empty();
}
@Override