This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4ef338f add unit test for shardingsphere-scaling (#6963)
4ef338f is described below
commit 4ef338fcb2f7b6974441787903f36b1ae3f1a705
Author: 邱鹿 Lucas <[email protected]>
AuthorDate: Fri Aug 21 17:33:53 2020 +0800
add unit test for shardingsphere-scaling (#6963)
* add unit test for shardingsphere-scaling-core
* add generic for ResponseContent
* add unit test
* add final keyword
Co-authored-by: qiulu3 <Lucas209910>
---
.../scaling/utils/ResponseContentUtil.java | 10 +-
.../scaling/utils/ReflectionUtil.java | 71 ++++++++++++++
.../scaling/utils/ResponseContentUtilTest.java | 47 +++++++++
.../scaling/web/HttpServerHandlerTest.java | 50 ++++++----
.../scaling/web/HttpServerInitializerTest.java | 54 ++++++++++
.../engine/ShardingScalingExecuteEngine.java | 9 +-
.../core/execute/executor/channel/Channel.java | 2 +-
.../execute/executor/channel/MemoryChannel.java | 7 +-
.../executor/dumper/AbstractJDBCDumper.java | 8 +-
...osition.java => FinishedInventoryPosition.java} | 4 +-
...tion.java => PlaceholderInventoryPosition.java} | 4 +-
.../resume/AbstractResumeBreakPointManager.java | 6 +-
.../splitter/InventoryDataTaskSplitter.java | 4 +-
.../scaling/core/metadata/JdbcUri.java | 2 +-
.../scaling/core/utils/InventoryPositionUtil.java | 4 +-
.../ThreadUtil.java} | 27 +++--
.../engine/ShardingScalingExecuteEngineTest.java | 33 ++++++-
.../executor/channel/DistributionChannelTest.java | 109 +++++++++++++++++++++
.../core/fixture/FixtureH2ScalingEntry.java | 2 +-
.../scaling/core/fixture/FixtureNopManager.java | 53 ++++++++++
.../position/PlaceholderIncrementalPosition.java} | 4 +-
.../AbstractResumeBreakPointManagerTest.java | 56 +++++++----
.../scaling/mysql/MySQLJdbcDumper.java | 26 ++---
.../scaling/mysql/BinlogPositionTest.java | 7 ++
.../scaling/mysql/MySQLBinlogDumperTest.java} | 19 +---
.../scaling/mysql/MySQLDataSourceCheckerTest.java | 29 +++++-
.../scaling/mysql/MySQLImporterTest.java | 56 +++++++++++
.../scaling/mysql/MySQLJdbcDumperTest.java | 107 ++++++++++++++++++++
.../scaling/mysql/MySQLPositionManagerTest.java | 12 ++-
.../postgresql/PostgreSQLPositionManager.java | 2 +-
.../scaling/postgresql/PostgreSQLWalDumper.java | 10 +-
.../postgresql/PostgreSQLPositionManagerTest.java | 26 +++--
.../postgresql/PostgreSQLSqlBuilderTest.java | 44 +++++++++
33 files changed, 773 insertions(+), 131 deletions(-)
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/utils/ResponseContentUtil.java
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/utils/ResponseContentUtil.java
index 290b8c8..6ce7ea3 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/utils/ResponseContentUtil.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/utils/ResponseContentUtil.java
@@ -33,7 +33,7 @@ public final class ResponseContentUtil {
*
* @return response result
*/
- public static ResponseContent success() {
+ public static ResponseContent<?> success() {
return build(null);
}
@@ -57,8 +57,8 @@ public final class ResponseContentUtil {
* @param errorMsg error message
* @return response result
*/
- public static ResponseContent handleBadRequest(final String errorMsg) {
- ResponseContent result = new ResponseContent<>();
+ public static ResponseContent<?> handleBadRequest(final String errorMsg) {
+ ResponseContent<?> result = new ResponseContent<>();
result.setSuccess(false);
result.setErrorCode(ResponseCode.BAD_REQUEST);
result.setErrorMsg(errorMsg);
@@ -71,8 +71,8 @@ public final class ResponseContentUtil {
* @param errorMsg error message
* @return response result
*/
- public static ResponseContent handleException(final String errorMsg) {
- ResponseContent result = new ResponseContent<>();
+ public static ResponseContent<?> handleException(final String errorMsg) {
+ ResponseContent<?> result = new ResponseContent<>();
result.setSuccess(false);
result.setErrorCode(ResponseCode.SERVER_ERROR);
result.setErrorMsg(errorMsg);
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/utils/ReflectionUtil.java
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/utils/ReflectionUtil.java
new file mode 100644
index 0000000..1bb1e69
--- /dev/null
+++
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/utils/ReflectionUtil.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.utils;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.lang.reflect.Field;
+
+/**
+ * Reflection utility for reading fields reflectively in unit tests.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ReflectionUtil {
+
+    /**
+     * Get field from class and make it accessible.
+     *
+     * @param targetClass target class
+     * @param fieldName field name
+     * @param isDeclared {@code true} to use {@link Class#getDeclaredField(String)}, {@code false} to use {@link Class#getField(String)}
+     * @return {@link Field} with accessibility already enabled
+     * @throws NoSuchFieldException no such field exception
+     */
+    public static Field getFieldFromClass(final Class<?> targetClass, final String fieldName, final boolean isDeclared) throws NoSuchFieldException {
+        Field result = isDeclared ? targetClass.getDeclaredField(fieldName) : targetClass.getField(fieldName);
+        // Enable access up front so callers can read non-public fields without further setup.
+        result.setAccessible(true);
+        return result;
+    }
+
+    /**
+     * Get field value from instance target object.
+     *
+     * @param target target object
+     * @param fieldName name of a field declared on the runtime class of {@code target}
+     * @param valueClass expected value class
+     * @param <T> expected value type
+     * @return target field value, or {@code null} if the field holds {@code null}
+     * @throws NoSuchFieldException no such field exception
+     * @throws IllegalAccessException illegal access exception
+     * @throws ClassCastException if the field value is not an instance of {@code valueClass}
+     */
+    public static <T> T getFieldValueFromClass(final Object target, final String fieldName, final Class<T> valueClass) throws NoSuchFieldException, IllegalAccessException {
+        Field field = getFieldFromClass(target.getClass(), fieldName, true);
+        Object value = field.get(target);
+        if (null == value) {
+            return null;
+        }
+        // Check against the expected class; comparing the value's class with itself always passed.
+        if (!valueClass.isInstance(value)) {
+            throw new ClassCastException("field " + fieldName + " is " + value.getClass().getName() + ", can not cast to " + valueClass.getName());
+        }
+        return valueClass.cast(value);
+    }
+}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/utils/ResponseContentUtilTest.java
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/utils/ResponseContentUtilTest.java
new file mode 100644
index 0000000..2652107
--- /dev/null
+++
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/utils/ResponseContentUtilTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.utils;
+
+import org.apache.shardingsphere.scaling.web.entity.ResponseCode;
+import org.apache.shardingsphere.scaling.web.entity.ResponseContent;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+public final class ResponseContentUtilTest {
+
+    public static final String ERROR_MESSAGE = "error message.";
+
+    @Test
+    public void assertHandleBadRequest() {
+        ResponseContent<?> actual = ResponseContentUtil.handleBadRequest(ERROR_MESSAGE);
+        assertThat(actual.getErrorMsg(), is(ERROR_MESSAGE));
+        assertThat(actual.getErrorCode(), is(ResponseCode.BAD_REQUEST));
+        assertNull(actual.getModel());
+    }
+
+    @Test
+    public void assertHandleException() {
+        ResponseContent<?> actual = ResponseContentUtil.handleException(ERROR_MESSAGE);
+        assertThat(actual.getErrorMsg(), is(ERROR_MESSAGE));
+        assertThat(actual.getErrorCode(), is(ResponseCode.SERVER_ERROR));
+        assertNull(actual.getModel());
+    }
+}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
index d4e4915..8cbc268 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
@@ -45,6 +45,7 @@ import java.util.Map;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
@@ -70,6 +71,14 @@ public final class HttpServerHandlerTest {
@Test
public void assertChannelReadStartSuccess() {
+ startScalingJob();
+ ArgumentCaptor<FullHttpResponse> argumentCaptor =
ArgumentCaptor.forClass(FullHttpResponse.class);
+ verify(channelHandlerContext).writeAndFlush(argumentCaptor.capture());
+ FullHttpResponse fullHttpResponse = argumentCaptor.getValue();
+
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("{\"success\":true"));
+ }
+
+ private void startScalingJob() {
scalingConfiguration.getRuleConfiguration().setSourceDatasource("ds_0:
!!" + YamlDataSourceConfiguration.class.getName() + "\n "
+ "dataSourceClassName: com.zaxxer.hikari.HikariDataSource\n
props:\n "
+ "jdbcUrl:
jdbc:h2:mem:test_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL\n
username: root\n password: 'password'\n connectionTimeout: 30000\n "
@@ -80,20 +89,27 @@ public final class HttpServerHandlerTest {
ByteBuf byteBuf =
Unpooled.copiedBuffer(GSON.toJson(scalingConfiguration), CharsetUtil.UTF_8);
fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.POST, "/scaling/job/start", byteBuf);
httpServerHandler.channelRead0(channelHandlerContext, fullHttpRequest);
- ArgumentCaptor argumentCaptor =
ArgumentCaptor.forClass(FullHttpResponse.class);
- verify(channelHandlerContext).writeAndFlush(argumentCaptor.capture());
- FullHttpResponse fullHttpResponse = (FullHttpResponse)
argumentCaptor.getValue();
-
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("{\"success\":true"));
}
@Test
- public void assertChannelReadProgress() {
- fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/scaling/job/progress/2");
+ public void assertChannelReadProgressFail() {
+ fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/scaling/job/progress/9");
httpServerHandler.channelRead0(channelHandlerContext, fullHttpRequest);
- ArgumentCaptor argumentCaptor =
ArgumentCaptor.forClass(FullHttpResponse.class);
+ ArgumentCaptor<FullHttpResponse> argumentCaptor =
ArgumentCaptor.forClass(FullHttpResponse.class);
verify(channelHandlerContext).writeAndFlush(argumentCaptor.capture());
- FullHttpResponse fullHttpResponse = (FullHttpResponse)
argumentCaptor.getValue();
-
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("Can't
find scaling job id 2"));
+ FullHttpResponse fullHttpResponse = argumentCaptor.getValue();
+
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("Can't
find scaling job id 9"));
+ }
+
+ @Test
+ public void assertChannelReadProgressSuccess() {
+ startScalingJob();
+ fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/scaling/job/progress/1");
+ httpServerHandler.channelRead0(channelHandlerContext, fullHttpRequest);
+ ArgumentCaptor<FullHttpResponse> argumentCaptor =
ArgumentCaptor.forClass(FullHttpResponse.class);
+ verify(channelHandlerContext,
times(2)).writeAndFlush(argumentCaptor.capture());
+ FullHttpResponse fullHttpResponse = argumentCaptor.getValue();
+
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("{\"success\":true"));
}
@Test
@@ -103,9 +119,9 @@ public final class HttpServerHandlerTest {
ByteBuf byteBuf = Unpooled.copiedBuffer(GSON.toJson(map),
CharsetUtil.UTF_8);
fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.POST, "/scaling/job/stop", byteBuf);
httpServerHandler.channelRead0(channelHandlerContext, fullHttpRequest);
- ArgumentCaptor argumentCaptor =
ArgumentCaptor.forClass(FullHttpResponse.class);
+ ArgumentCaptor<FullHttpResponse> argumentCaptor =
ArgumentCaptor.forClass(FullHttpResponse.class);
verify(channelHandlerContext).writeAndFlush(argumentCaptor.capture());
- FullHttpResponse fullHttpResponse = (FullHttpResponse)
argumentCaptor.getValue();
+ FullHttpResponse fullHttpResponse = argumentCaptor.getValue();
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("{\"success\":true"));
}
@@ -113,9 +129,9 @@ public final class HttpServerHandlerTest {
public void assertChannelReadList() {
fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.GET, "/scaling/job/list");
httpServerHandler.channelRead0(channelHandlerContext, fullHttpRequest);
- ArgumentCaptor argumentCaptor =
ArgumentCaptor.forClass(FullHttpResponse.class);
+ ArgumentCaptor<FullHttpResponse> argumentCaptor =
ArgumentCaptor.forClass(FullHttpResponse.class);
verify(channelHandlerContext).writeAndFlush(argumentCaptor.capture());
- FullHttpResponse fullHttpResponse = (FullHttpResponse)
argumentCaptor.getValue();
+ FullHttpResponse fullHttpResponse = argumentCaptor.getValue();
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("{\"success\":true"));
}
@@ -123,9 +139,9 @@ public final class HttpServerHandlerTest {
public void assertChannelReadUnsupportedUrl() {
fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.DELETE, "/scaling/1");
httpServerHandler.channelRead0(channelHandlerContext, fullHttpRequest);
- ArgumentCaptor argumentCaptor =
ArgumentCaptor.forClass(FullHttpResponse.class);
+ ArgumentCaptor<FullHttpResponse> argumentCaptor =
ArgumentCaptor.forClass(FullHttpResponse.class);
verify(channelHandlerContext).writeAndFlush(argumentCaptor.capture());
- FullHttpResponse fullHttpResponse = (FullHttpResponse)
argumentCaptor.getValue();
+ FullHttpResponse fullHttpResponse = argumentCaptor.getValue();
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("Not
support request!"));
}
@@ -133,9 +149,9 @@ public final class HttpServerHandlerTest {
public void assertChannelReadUnsupportedMethod() {
fullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.DELETE, "/scaling/job/stop");
httpServerHandler.channelRead0(channelHandlerContext, fullHttpRequest);
- ArgumentCaptor argumentCaptor =
ArgumentCaptor.forClass(FullHttpResponse.class);
+ ArgumentCaptor<FullHttpResponse> argumentCaptor =
ArgumentCaptor.forClass(FullHttpResponse.class);
verify(channelHandlerContext).writeAndFlush(argumentCaptor.capture());
- FullHttpResponse fullHttpResponse = (FullHttpResponse)
argumentCaptor.getValue();
+ FullHttpResponse fullHttpResponse = argumentCaptor.getValue();
assertTrue(fullHttpResponse.content().toString(CharsetUtil.UTF_8).contains("Not
support request!"));
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java
new file mode 100644
index 0000000..0f87fe4
--- /dev/null
+++
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.web;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class HttpServerInitializerTest {
+
+    @Mock
+    private SocketChannel channel;
+
+    @Mock
+    private ChannelPipeline pipeline;
+
+    @Before
+    public void setUp() {
+        when(channel.pipeline()).thenReturn(pipeline);
+    }
+
+    @Test
+    public void assertInitChannel() {
+        new HttpServerInitializer().initChannel(channel);
+        // The test pins the number of addLast(ChannelHandler) calls made on the mocked pipeline to three.
+        verify(pipeline, times(3)).addLast(any(ChannelHandler.class));
+    }
+}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngine.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngine.java
index 261aba7..bba6b04 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngine.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngine.java
@@ -28,6 +28,7 @@ import
org.apache.shardingsphere.scaling.core.execute.executor.ShardingScalingEx
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -73,13 +74,13 @@ public final class ShardingScalingExecuteEngine {
* @return execute future of all
*/
public Future<?> submitAll(final Collection<? extends
ShardingScalingExecutor> shardingScalingExecutors, final ExecuteCallback
executeCallback) {
- Collection<ListenableFuture<Object>> listenableFutures = new
ArrayList<>(shardingScalingExecutors.size());
+ Collection<ListenableFuture<?>> listenableFutures = new
ArrayList<>(shardingScalingExecutors.size());
for (ShardingScalingExecutor each : shardingScalingExecutors) {
- ListenableFuture listenableFuture = executorService.submit(each);
+ ListenableFuture<?> listenableFuture =
executorService.submit(each);
listenableFutures.add(listenableFuture);
}
- ListenableFuture result = Futures.allAsList(listenableFutures);
- Futures.addCallback(result, new
ExecuteFutureCallback<Collection<Object>>(executeCallback), executorService);
+ ListenableFuture<List<Object>> result =
Futures.allAsList(listenableFutures);
+ Futures.addCallback(result, new
ExecuteFutureCallback<Collection<?>>(executeCallback), executorService);
return result;
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/Channel.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/Channel.java
index 1b38825..6c31d2e 100755
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/Channel.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/Channel.java
@@ -38,7 +38,7 @@ public interface Channel {
* fetch {@code Record} from channel, if the timeout also returns the
record.
*
* @param batchSize record batch size
- * @param timeout value
+ * @param timeout timeout(seconds)
* @return record
*/
List<Record> fetchRecords(int batchSize, int timeout);
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/MemoryChannel.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/MemoryChannel.java
index 72dd930..da20ad1 100755
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/MemoryChannel.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/MemoryChannel.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.scaling.core.execute.executor.channel;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.core.utils.ThreadUtil;
import java.util.ArrayList;
import java.util.LinkedList;
@@ -54,11 +55,7 @@ public final class MemoryChannel implements Channel {
if (timeout * 1000L <= System.currentTimeMillis() - start) {
break;
}
- try {
- Thread.sleep(100L);
- } catch (final InterruptedException ignored) {
- break;
- }
+ ThreadUtil.sleep(100L);
}
queue.drainTo(records, batchSize);
toBeAcknowledgeRecords.addAll(records);
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
index a47bec2..30da411 100755
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
@@ -32,10 +32,10 @@ import
org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
-import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
+import
org.apache.shardingsphere.scaling.core.job.position.FinishedInventoryPosition;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import
org.apache.shardingsphere.scaling.core.job.position.PlaceholderInventoryPosition;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import org.apache.shardingsphere.scaling.core.metadata.MetaDataManager;
import org.apache.shardingsphere.scaling.core.utils.RdbmsConfigurationUtil;
@@ -98,7 +98,7 @@ public abstract class AbstractJDBCDumper extends
AbstractShardingScalingExecutor
}
pushRecord(record);
}
- pushRecord(new FinishedRecord(new FinishedPosition()));
+ pushRecord(new FinishedRecord(new FinishedInventoryPosition()));
} catch (final SQLException ex) {
stop();
channel.close();
@@ -110,7 +110,7 @@ public abstract class AbstractJDBCDumper extends
AbstractShardingScalingExecutor
private InventoryPosition newInventoryPosition(final ResultSet rs) throws
SQLException {
if (null == inventoryDumperConfiguration.getPrimaryKey()) {
- return new PlaceholderPosition();
+ return new PlaceholderInventoryPosition();
}
return new
PrimaryKeyPosition(rs.getLong(inventoryDumperConfiguration.getPrimaryKey()),
((PrimaryKeyPosition)
inventoryDumperConfiguration.getPositionManager().getPosition()).getEndValue());
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedInventoryPosition.java
similarity index 91%
copy from
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
copy to
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedInventoryPosition.java
index 212f168..a094116 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedInventoryPosition.java
@@ -21,9 +21,9 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
/**
- * Finished position.
+ * Finished inventory position.
*/
-public final class FinishedPosition implements InventoryPosition {
+public final class FinishedInventoryPosition implements InventoryPosition {
@Override
public JsonElement toJson() {
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderInventoryPosition.java
similarity index 90%
rename from
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
rename to
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderInventoryPosition.java
index 416f594..e3d17e1 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderInventoryPosition.java
@@ -21,9 +21,9 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
/**
- * Placeholder position.
+ * Placeholder inventory position.
*/
-public final class PlaceholderPosition implements InventoryPosition {
+public final class PlaceholderInventoryPosition implements InventoryPosition {
@Override
public JsonElement toJson() {
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
index d312c7e..6e26122 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
@@ -26,7 +26,7 @@ import com.google.gson.JsonParser;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
+import
org.apache.shardingsphere.scaling.core.job.position.FinishedInventoryPosition;
import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import
org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
@@ -83,7 +83,7 @@ public abstract class AbstractResumeBreakPointManager
implements ResumeBreakPoin
inventoryPositionManagerMap.put(entry.getKey(), new
InventoryPositionManager<>(entry.getValue()));
}
for (String each : inventoryPositions.getFinished()) {
- inventoryPositionManagerMap.put(each, new
InventoryPositionManager<>(new FinishedPosition()));
+ inventoryPositionManagerMap.put(each, new
InventoryPositionManager<>(new FinishedInventoryPosition()));
}
}
@@ -103,7 +103,7 @@ public abstract class AbstractResumeBreakPointManager
implements ResumeBreakPoin
JsonObject unfinished = new JsonObject();
Set<String> finished = Sets.newHashSet();
for (Entry<String, PositionManager<InventoryPosition>> entry :
inventoryPositionManagerMap.entrySet()) {
- if (entry.getValue().getPosition() instanceof FinishedPosition) {
+ if (entry.getValue().getPosition() instanceof
FinishedInventoryPosition) {
finished.add(entry.getKey());
continue;
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
index afb75fb..3823eaf 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
@@ -25,7 +25,7 @@ import
org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import
org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import
org.apache.shardingsphere.scaling.core.job.position.PlaceholderInventoryPosition;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import org.apache.shardingsphere.scaling.core.job.task.DefaultSyncTaskFactory;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
@@ -85,7 +85,7 @@ public final class InventoryDataTaskSplitter {
for (String each : dumperConfiguration.getTableNameMap().keySet()) {
InventoryDumperConfiguration dumperConfig = new
InventoryDumperConfiguration(dumperConfiguration);
dumperConfig.setTableName(each);
- dumperConfig.setPositionManager(new InventoryPositionManager<>(new
PlaceholderPosition()));
+ dumperConfig.setPositionManager(new InventoryPositionManager<>(new
PlaceholderInventoryPosition()));
result.add(dumperConfig);
}
return result;
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/metadata/JdbcUri.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/metadata/JdbcUri.java
index 14ab58b..0c88238 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/metadata/JdbcUri.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/metadata/JdbcUri.java
@@ -75,7 +75,7 @@ public final class JdbcUri {
* @return database name
*/
public String getDatabase() {
- return jdbcUri.getPath().replaceFirst("/", "");
+ return null == jdbcUri.getPath() ? "" :
jdbcUri.getPath().replaceFirst("/", "");
}
/**
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/InventoryPositionUtil.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/InventoryPositionUtil.java
index c4db6c1..20a9558 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/InventoryPositionUtil.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/InventoryPositionUtil.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.utils;
import com.google.gson.Gson;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import
org.apache.shardingsphere.scaling.core.job.position.PlaceholderInventoryPosition;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import java.util.List;
@@ -42,6 +42,6 @@ public final class InventoryPositionUtil {
if (2 == values.size()) {
return new PrimaryKeyPosition(values.get(0).longValue(),
values.get(1).longValue());
}
- return new PlaceholderPosition();
+ return new PlaceholderInventoryPosition();
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ThreadUtil.java
similarity index 67%
copy from
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
copy to
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ThreadUtil.java
index 212f168..fda741c 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ThreadUtil.java
@@ -15,23 +15,22 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.job.position;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
+package org.apache.shardingsphere.scaling.core.utils;
/**
- * Finished position.
+ * Thread util.
*/
-public final class FinishedPosition implements InventoryPosition {
-
- @Override
- public JsonElement toJson() {
- return new JsonObject();
- }
+public final class ThreadUtil {
- @Override
- public int compareTo(final Position o) {
- return 0;
+ /**
+ * Sleep for the given time, ignoring any InterruptedException.
+ *
+ * @param millis sleep time in milliseconds
+ */
+ public static void sleep(final long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException ignored) {
+ }
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngineTest.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngineTest.java
index a464d77..2153813 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngineTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngineTest.java
@@ -20,8 +20,11 @@ package
org.apache.shardingsphere.scaling.core.execute.engine;
import
org.apache.shardingsphere.scaling.core.execute.executor.ShardingScalingExecutor;
import org.junit.Test;
+import java.util.Collections;
+import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
public final class ShardingScalingExecuteEngineTest {
@@ -31,13 +34,41 @@ public final class ShardingScalingExecuteEngineTest {
ShardingScalingExecuteEngine executeEngine = new
ShardingScalingExecuteEngine(2);
try {
for (int i = 0; i < 5; i++) {
- executeEngine.submit(mockShardingScalingExecutor());
+ Future<?> submit =
executeEngine.submit(mockShardingScalingExecutor());
+ assertFalse(submit.isCancelled());
}
} catch (final RejectedExecutionException ex) {
fail();
}
}
+ @Test
+ public void assertSubmitAllMoreThanMaxWorkerNumber() {
+ ShardingScalingExecuteEngine executeEngine = new
ShardingScalingExecuteEngine(2);
+ try {
+ for (int i = 0; i < 5; i++) {
+ Future<?> submit =
executeEngine.submitAll(Collections.singletonList(mockShardingScalingExecutor()),
mockExecuteCallback());
+ assertFalse(submit.isCancelled());
+ }
+ } catch (final RejectedExecutionException ex) {
+ fail();
+ }
+ }
+
+ private ExecuteCallback mockExecuteCallback() {
+ return new ExecuteCallback() {
+ @Override
+ public void onSuccess() {
+
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+
+ }
+ };
+ }
+
private ShardingScalingExecutor mockShardingScalingExecutor() {
return new ShardingScalingExecutor() {
@Override
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/DistributionChannelTest.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/DistributionChannelTest.java
new file mode 100644
index 0000000..9efc85d
--- /dev/null
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/DistributionChannelTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.executor.channel;
+
+import com.google.gson.JsonElement;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import
org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
+import
org.apache.shardingsphere.scaling.core.execute.executor.record.PlaceholderRecord;
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
+import org.apache.shardingsphere.scaling.core.utils.ThreadUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public final class DistributionChannelTest {
+
+ private DistributionChannel distributionChannel;
+
+ @Before
+ public void setUp() {
+ ScalingContext.getInstance().init(new ServerConfiguration());
+ }
+
+ @Test
+ @SneakyThrows(InterruptedException.class)
+ public void assertAckCallbackResultSortable() {
+ distributionChannel = new DistributionChannel(2, records -> {
+ assertThat(records.size(), is(2));
+ assertTrue(((IntPosition) records.get(0).getPosition()).getId() <
((IntPosition) records.get(1).getPosition()).getId());
+ });
+ distributionChannel.pushRecord(new PlaceholderRecord(new
IntPosition(1)));
+ distributionChannel.pushRecord(new PlaceholderRecord(new
IntPosition(2)));
+ fetchRecordsAndSleep(0);
+ fetchRecordsAndSleep(1);
+ invokeAckRecords0();
+ }
+
+ private void fetchRecordsAndSleep(final int millis) {
+ new Thread(() -> {
+ distributionChannel.fetchRecords(1, 0);
+ if (millis > 0) {
+ ThreadUtil.sleep(millis);
+ }
+ distributionChannel.ack();
+ }).start();
+ }
+
+ @Test
+ @SneakyThrows(InterruptedException.class)
+ public void assertBroadcastFinishedRecord() {
+ distributionChannel = new DistributionChannel(2, records ->
assertThat(records.size(), is(2)));
+ distributionChannel.pushRecord(new FinishedRecord(new NopPosition()));
+ }
+
+ @SneakyThrows(ReflectiveOperationException.class)
+ private void invokeAckRecords0() {
+ Method ackRecords0 =
DistributionChannel.class.getDeclaredMethod("ackRecords0");
+ ackRecords0.setAccessible(true);
+ ackRecords0.invoke(distributionChannel);
+ }
+
+ @After
+ public void tearDown() {
+ distributionChannel.close();
+ }
+
+ @AllArgsConstructor
+ @Getter
+ private static class IntPosition implements Position {
+
+ private final int id;
+
+ @Override
+ public int compareTo(final Position position) {
+ return id - ((IntPosition) position).id;
+ }
+
+ @Override
+ public JsonElement toJson() {
+ return null;
+ }
+ }
+}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
index b856a2d..b1f32c5 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
@@ -39,7 +39,7 @@ public final class FixtureH2ScalingEntry implements
ScalingEntry {
@Override
public Class<? extends PositionManager<IncrementalPosition>>
getPositionManager() {
- return null;
+ return FixtureNopManager.class;
}
@Override
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureNopManager.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureNopManager.java
new file mode 100644
index 0000000..e71e175
--- /dev/null
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureNopManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.fixture;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+
+import javax.sql.DataSource;
+
+@RequiredArgsConstructor
+public final class FixtureNopManager extends
BasePositionManager<IncrementalPosition> implements
PositionManager<IncrementalPosition> {
+
+ private DataSource dataSource;
+
+ public FixtureNopManager(final String position) {
+ }
+
+ @Override
+ public IncrementalPosition getPosition() {
+
+ return new IncrementalPosition() {
+ @Override
+ public int compareTo(final Position o) {
+ return 0;
+ }
+
+ @Override
+ public JsonElement toJson() {
+ return new JsonObject();
+ }
+ };
+ }
+}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderIncrementalPosition.java
similarity index 90%
copy from
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
copy to
shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderIncrementalPosition.java
index 212f168..03b5b66 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderIncrementalPosition.java
@@ -21,9 +21,9 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
/**
- * Finished position.
+ * Placeholder incremental position.
*/
-public final class FinishedPosition implements InventoryPosition {
+public final class PlaceholderIncrementalPosition implements
IncrementalPosition {
@Override
public JsonElement toJson() {
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
similarity index 77%
rename from
shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
rename to
shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
index 766e99d..ba9866f 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
@@ -18,15 +18,16 @@
package org.apache.shardingsphere.scaling.core.job.position.resume;
import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
+import
org.apache.shardingsphere.scaling.core.job.position.FinishedInventoryPosition;
import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import
org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import
org.apache.shardingsphere.scaling.core.job.position.PlaceholderIncrementalPosition;
+import
org.apache.shardingsphere.scaling.core.job.position.PlaceholderInventoryPosition;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
-import org.apache.shardingsphere.scaling.utils.ReflectionUtil;
+import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -39,7 +40,7 @@ public final class AbstractResumeBreakPointManagerTest {
private AbstractResumeBreakPointManager resumeBreakPointManager;
- private final String incrementalPosition =
"{\"ds0\":{\"filename\":\"mysql-bin.000001\",\"position\":4},\"ds1\":{\"filename\":\"mysql-bin.000002\",\"position\":4}}";
+ private final String incrementalPosition = "{\"ds0\":{},\"ds1\":{}}";
private final String inventoryPosition =
"{\"unfinished\":{\"ds0.t_order_1#0\":[0,100],\"ds0.t_order_2\":[],\"ds1.t_order_1#0\":[0,200]},\"finished\":[\"ds0.t_order_1#1\"]}";
@@ -47,8 +48,8 @@ public final class AbstractResumeBreakPointManagerTest {
public void setUp() throws NoSuchFieldException, IllegalAccessException {
resumeBreakPointManager = new AbstractResumeBreakPointManager() {
};
- resumeBreakPointManager.setDatabaseType("MySQL");
- resumeBreakPointManager.setTaskPath("/scalingTest/item-0");
+ resumeBreakPointManager.setDatabaseType("H2");
+ resumeBreakPointManager.setTaskPath("/");
ReflectionUtil.getFieldFromClass(AbstractResumeBreakPointManager.class,
"inventoryPositionManagerMap", true)
.set(resumeBreakPointManager, new TreeMap<String,
PositionManager<InventoryPosition>>());
ReflectionUtil.getFieldFromClass(AbstractResumeBreakPointManager.class,
"incrementalPositionManagerMap", true)
@@ -57,20 +58,24 @@ public final class AbstractResumeBreakPointManagerTest {
@Test
public void assertResumeIncrementalPosition() {
- resumeBreakPointManager.resumeIncrementalPosition(incrementalPosition);
-
assertThat(resumeBreakPointManager.getIncrementalPositionManagerMap().size(),
is(2));
+ resumeBreakPointManager.resumeInventoryPosition("");
+
assertThat(resumeBreakPointManager.getInventoryPositionManagerMap().size(),
is(0));
+ resumeBreakPointManager.resumeInventoryPosition(inventoryPosition);
+
assertThat(resumeBreakPointManager.getInventoryPositionManagerMap().size(),
is(4));
}
@Test
public void assertResumeInventoryPosition() {
- resumeBreakPointManager.resumeInventoryPosition(inventoryPosition);
-
assertThat(resumeBreakPointManager.getInventoryPositionManagerMap().size(),
is(4));
+ resumeBreakPointManager.resumeIncrementalPosition("");
+
assertThat(resumeBreakPointManager.getIncrementalPositionManagerMap().size(),
is(0));
+ resumeBreakPointManager.resumeIncrementalPosition(incrementalPosition);
+
assertThat(resumeBreakPointManager.getIncrementalPositionManagerMap().size(),
is(2));
}
@Test
public void assertGetIncrementalPositionData() {
- resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0",
new BasePositionManager<>(new BinlogPosition("mysql-bin.000001", 4L)));
- resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds1",
new BasePositionManager<>(new BinlogPosition("mysql-bin.000002", 4L)));
+ resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0",
new BasePositionManager<>(new PlaceholderIncrementalPosition()));
+ resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds1",
new BasePositionManager<>(new PlaceholderIncrementalPosition()));
assertThat(resumeBreakPointManager.getIncrementalPositionData(),
is(incrementalPosition));
}
@@ -82,22 +87,39 @@ public final class AbstractResumeBreakPointManagerTest {
@Test
public void assertPlaceholderPositionJson() {
-
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0",
new InventoryPositionManager<>(new PlaceholderPosition()));
+
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0",
new InventoryPositionManager<>(new PlaceholderInventoryPosition()));
assertThat(resumeBreakPointManager.getInventoryPositionData(),
is("{\"unfinished\":{\"ds0.t_order_1#0\":[]},\"finished\":[]}"));
+ assertThat(new PlaceholderInventoryPosition().toJson().toString(),
is("[]"));
}
@Test
public void assertFinishedPositionJson() {
-
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0",
new InventoryPositionManager<>(new FinishedPosition()));
+
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0",
new InventoryPositionManager<>(new FinishedInventoryPosition()));
assertThat(resumeBreakPointManager.getInventoryPositionData(),
is("{\"unfinished\":{},\"finished\":[\"ds0.t_order_1#0\"]}"));
+ assertThat(new FinishedInventoryPosition().toJson().toString(),
is("{}"));
}
@Test
public void assertGetInventoryPositionData() {
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0",
new InventoryPositionManager<>(new PrimaryKeyPosition(0L, 100L)));
-
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#1",
new InventoryPositionManager<>(new FinishedPosition()));
-
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_2",
new InventoryPositionManager<>(new PlaceholderPosition()));
+
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#1",
new InventoryPositionManager<>(new FinishedInventoryPosition()));
+
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_2",
new InventoryPositionManager<>(new PlaceholderInventoryPosition()));
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds1.t_order_1#0",
new InventoryPositionManager<>(new PrimaryKeyPosition(0L, 200L)));
assertThat(resumeBreakPointManager.getInventoryPositionData(),
is(inventoryPosition));
}
+
+ @Test
+ public void assertGetDatabaseType() {
+ assertThat(resumeBreakPointManager.getDatabaseType(), is("H2"));
+ }
+
+ @Test
+ public void assertGetTaskPath() {
+ assertThat(resumeBreakPointManager.getTaskPath(), is("/"));
+ }
+
+ @After
+ public void tearDown() {
+ resumeBreakPointManager.close();
+ }
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumper.java
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumper.java
index 5dda1d3..baba817 100755
---
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumper.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumper.java
@@ -42,6 +42,18 @@ public final class MySQLJdbcDumper extends
AbstractJDBCDumper {
jdbcDataSourceConfiguration.setJdbcUrl(fixMySQLUrl(jdbcDataSourceConfiguration.getJdbcUrl()));
}
+ private String fixMySQLUrl(final String url) {
+ JdbcUri uri = new JdbcUri(url);
+ return String.format("jdbc:%s://%s/%s?%s", uri.getScheme(),
uri.getHost(), uri.getDatabase(), fixMySQLParams(uri.getParameters()));
+ }
+
+ private String fixMySQLParams(final Map<String, String> parameters) {
+ if (!parameters.containsKey("yearIsDateType")) {
+ parameters.put("yearIsDateType", "false");
+ }
+ return formatMySQLParams(parameters);
+ }
+
private String formatMySQLParams(final Map<String, String> params) {
StringBuilder result = new StringBuilder();
for (Entry<String, String> entry : params.entrySet()) {
@@ -55,18 +67,6 @@ public final class MySQLJdbcDumper extends
AbstractJDBCDumper {
return result.toString();
}
- private String fixMySQLUrl(final String url) {
- JdbcUri uri = new JdbcUri(url);
- return String.format("jdbc:%s://%s/%s?%s", uri.getScheme(),
uri.getHost(), uri.getDatabase(), fixMySQLParams(uri.getParameters()));
- }
-
- private String fixMySQLParams(final Map<String, String> parameters) {
- if (!parameters.containsKey("yearIsDateType")) {
- parameters.put("yearIsDateType", "false");
- }
- return formatMySQLParams(parameters);
- }
-
@Override
public Object readValue(final ResultSet resultSet, final int index) throws
SQLException {
if (isDateTimeValue(resultSet.getMetaData().getColumnType(index))) {
@@ -83,7 +83,7 @@ public final class MySQLJdbcDumper extends AbstractJDBCDumper
{
@Override
protected PreparedStatement createPreparedStatement(final Connection conn,
final String sql) throws SQLException {
PreparedStatement result = conn.prepareStatement(sql,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- result.setFetchSize(Integer.MIN_VALUE);
+ result.setFetchSize(100);
return result;
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/BinlogPositionTest.java
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/BinlogPositionTest.java
index b4a118c..5123396 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/BinlogPositionTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/BinlogPositionTest.java
@@ -37,5 +37,12 @@ public final class BinlogPositionTest {
assertThat(binlogPosition.compareTo(new BinlogPosition(fileName, 10)),
is(0));
assertThat(binlogPosition.compareTo(new BinlogPosition(fileName, 9)),
is(1));
assertThat(binlogPosition.compareTo(new BinlogPosition(fileName, 11)),
is(-1));
+ assertThat(binlogPosition.compareTo(null), is(1));
+ }
+
+ @Test
+ public void assertToJson() {
+ BinlogPosition binlogPosition = new BinlogPosition("mysql-bin.000001",
4);
+ assertThat(binlogPosition.toJson().toString(),
is("{\"filename\":\"mysql-bin.000001\",\"position\":4}"));
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumperTest.java
similarity index 67%
rename from
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
rename to
shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumperTest.java
index 212f168..47570e4 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumperTest.java
@@ -15,23 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.job.position;
+package org.apache.shardingsphere.scaling.mysql;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
+import org.junit.Test;
-/**
- * Finished position.
- */
-public final class FinishedPosition implements InventoryPosition {
+public final class MySQLBinlogDumperTest {
- @Override
- public JsonElement toJson() {
- return new JsonObject();
- }
+ @Test
+ public void assertStart() {
- @Override
- public int compareTo(final Position o) {
- return 0;
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java
index ddd4d01..b417dfe 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java
@@ -82,9 +82,9 @@ public final class MySQLDataSourceCheckerTest {
dataSourceChecker.checkPrivilege(dataSources);
verify(preparedStatement, Mockito.times(1)).executeQuery();
}
-
+
@Test
- public void assertCheckPrivilegeFailure() throws SQLException {
+ public void assertCheckPrivilegeLackPrivileges() throws SQLException {
when(resultSet.next()).thenReturn(false);
try {
dataSourceChecker.checkPrivilege(dataSources);
@@ -94,6 +94,16 @@ public final class MySQLDataSourceCheckerTest {
}
@Test
+ public void assertCheckPrivilegeFailure() throws SQLException {
+ when(resultSet.next()).thenThrow(new SQLException());
+ try {
+ dataSourceChecker.checkPrivilege(dataSources);
+ } catch (final PrepareFailedException ex) {
+ assertThat(ex.getMessage(), is("Source datasource check privileges
failed."));
+ }
+ }
+
+ @Test
public void assertCheckVariableSuccess() throws SQLException {
when(resultSet.next()).thenReturn(true, true);
when(resultSet.getString(2)).thenReturn("ON", "ROW");
@@ -102,14 +112,23 @@ public final class MySQLDataSourceCheckerTest {
}
@Test
- public void assertCheckVariableFailure() throws SQLException {
+ public void assertCheckVariableWithWrongVariable() throws SQLException {
when(resultSet.next()).thenReturn(true, true);
when(resultSet.getString(2)).thenReturn("OFF", "ROW");
try {
dataSourceChecker.checkVariable(dataSources);
- } catch (final PrepareFailedException checkFailedEx) {
- assertThat(checkFailedEx.getMessage(), is("Source datasource
required LOG_BIN = ON, now is OFF"));
+ } catch (final PrepareFailedException ex) {
+ assertThat(ex.getMessage(), is("Source datasource required LOG_BIN
= ON, now is OFF"));
}
}
+ @Test
+ public void assertCheckVariableFailure() throws SQLException {
+ when(resultSet.next()).thenThrow(new SQLException());
+ try {
+ dataSourceChecker.checkVariable(dataSources);
+ } catch (final PrepareFailedException ex) {
+ assertThat(ex.getMessage(), is("Source datasource check variables
failed."));
+ }
+ }
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java
new file mode 100644
index 0000000..b048129
--- /dev/null
+++
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.mysql;
+
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
+import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class MySQLImporterTest {
+
+ @Mock
+ private ImporterConfiguration importerConfiguration;
+
+ @Mock
+ private DataSourceManager dataSourceManager;
+
+ @Test
+ public void assertCreateSqlBuilder() {
+ // The builder obtained from the importer must quote identifiers with backticks.
+ MySQLImporter mySQLImporter = new MySQLImporter(importerConfiguration, dataSourceManager);
+ String insertSQL = mySQLImporter.createSqlBuilder().buildInsertSQL(mockDataRecord());
+ assertThat(insertSQL, is("INSERT INTO `t_order`(`id`,`name`) VALUES(?,?)"));
+ }
+
+ // Builds a two-column record (id, name) for table t_order.
+ private DataRecord mockDataRecord() {
+ DataRecord result = new DataRecord(new BinlogPosition("binlog-000001", 4), 2);
+ result.setTableName("t_order");
+ result.addColumn(new Column("id", 1, true, true));
+ result.addColumn(new Column("name", "", true, false));
+ return result;
+ }
+}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java
new file mode 100644
index 0000000..506170e
--- /dev/null
+++
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumperTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.mysql;
+
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import
org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
+import
org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class MySQLJdbcDumperTest {
+
+ private DataSourceManager dataSourceManager;
+
+ private MySQLJdbcDumper mySQLJdbcDumper;
+
+ @Before
+ public void setUp() {
+ // The manager has to exist first: building the configuration seeds the table through it.
+ dataSourceManager = new DataSourceManager();
+ InventoryDumperConfiguration inventoryDumperConfig = mockInventoryDumperConfiguration();
+ mySQLJdbcDumper = new MySQLJdbcDumper(inventoryDumperConfig, dataSourceManager);
+ }
+
+ // Creates the inventory configuration for t_order after populating the backing table.
+ private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
+ DumperConfiguration dumperConfig = mockDumperConfiguration();
+ initTableData(dumperConfig);
+ InventoryDumperConfiguration result = new InventoryDumperConfiguration(dumperConfig);
+ result.setTableName("t_order");
+ return result;
+ }
+
+ // Creates a dumper configuration that points at the in-memory H2 database.
+ private DumperConfiguration mockDumperConfiguration() {
+ DumperConfiguration result = new DumperConfiguration();
+ JDBCDataSourceConfiguration h2Config = new JDBCDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root");
+ result.setDataSourceConfiguration(h2Config);
+ return result;
+ }
+
+ // Recreates t_order and inserts two rows so every test starts from the same data.
+ @SneakyThrows(SQLException.class)
+ private void initTableData(final DumperConfiguration dumperConfig) {
+ String[] setupSQLs = {
+ "DROP TABLE IF EXISTS t_order",
+ "CREATE TABLE t_order (id INT PRIMARY KEY, user_id VARCHAR(12))",
+ "INSERT INTO t_order (id, user_id) VALUES (1, 'xxx'), (999, 'yyy')",
+ };
+ DataSource h2DataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration());
+ try (Connection connection = h2DataSource.getConnection();
+ Statement statement = connection.createStatement()) {
+ for (String each : setupSQLs) {
+ statement.execute(each);
+ }
+ }
+ }
+
+ @Test
+ @SneakyThrows(SQLException.class)
+ public void assertReadValue() {
+ ResultSetMetaData metaData = mock(ResultSetMetaData.class);
+ when(metaData.getColumnType(1)).thenReturn(Types.TIMESTAMP);
+ when(metaData.getColumnType(2)).thenReturn(Types.VARCHAR);
+ ResultSet resultSet = mock(ResultSet.class);
+ when(resultSet.getMetaData()).thenReturn(metaData);
+ mySQLJdbcDumper.readValue(resultSet, 1);
+ mySQLJdbcDumper.readValue(resultSet, 2);
+ // The TIMESTAMP column must be fetched with getString, the VARCHAR column with getObject.
+ verify(resultSet).getString(1);
+ verify(resultSet).getObject(2);
+ }
+
+ @Test
+ @SneakyThrows(SQLException.class)
+ public void assertCreatePreparedStatement() {
+ DumperConfiguration dumperConfig = mockDumperConfiguration();
+ DataSource h2DataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration());
+ try (Connection connection = h2DataSource.getConnection();
+ PreparedStatement preparedStatement = mySQLJdbcDumper.createPreparedStatement(connection, "SELECT * FROM t_order")) {
+ assertThat(preparedStatement.getFetchSize(), is(100));
+ }
+ }
+}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
index 9ce18cc..d0bfcca 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.mysql;
+import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
import org.junit.Before;
import org.junit.Test;
@@ -51,7 +52,8 @@ public final class MySQLPositionManagerTest {
private Connection connection;
@Before
- public void setUp() throws Exception {
+ @SneakyThrows(SQLException.class)
+ public void setUp() {
when(dataSource.getConnection()).thenReturn(connection);
PreparedStatement positionStatement = mockPositionStatement();
when(connection.prepareStatement("SHOW MASTER
STATUS")).thenReturn(positionStatement);
@@ -69,6 +71,14 @@ public final class MySQLPositionManagerTest {
}
@Test
+ public void assertInitPositionByJson() {
+ // Serialize a known position and hand the JSON text to the string constructor.
+ String positionJson = new BinlogPosition(LOG_FILE_NAME, LOG_POSITION).toJson().toString();
+ BinlogPosition restored = new MySQLPositionManager(positionJson).getPosition();
+ assertThat(restored.getFilename(), is(LOG_FILE_NAME));
+ assertThat(restored.getPosition(), is(LOG_POSITION));
+ }
+
+ @Test
public void assertUpdateCurrentPosition() {
MySQLPositionManager mysqlPositionManager = new
MySQLPositionManager(dataSource);
BinlogPosition expected = new BinlogPosition(LOG_FILE_NAME,
LOG_POSITION, SERVER_ID);
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
index 3d9d525..d5023bf 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
@@ -47,7 +47,7 @@ public final class PostgreSQLPositionManager extends
BasePositionManager<WalPosi
}
public PostgreSQLPositionManager(final String position) {
- setPosition(new WalPosition(LogSequenceNumber.valueOf(position)));
+ // The textual position is parsed as a long before it is converted to a log sequence number.
+ long sequenceNumber = Long.parseLong(position);
+ setPosition(new WalPosition(LogSequenceNumber.valueOf(sequenceNumber)));
}
@Override
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
index 09840e1..8965bc5 100755
---
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
@@ -26,6 +26,7 @@ import
org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
import
org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.job.position.Position;
+import org.apache.shardingsphere.scaling.core.utils.ThreadUtil;
import org.apache.shardingsphere.scaling.postgresql.wal.LogicalReplication;
import org.apache.shardingsphere.scaling.postgresql.wal.WalEventConverter;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
@@ -80,7 +81,7 @@ public final class PostgreSQLWalDumper extends
AbstractShardingScalingExecutor<W
while (isRunning()) {
ByteBuffer msg = stream.readPending();
if (msg == null) {
- sleep();
+ ThreadUtil.sleep(10L);
continue;
}
AbstractWalEvent event = decodingPlugin.decode(msg,
stream.getLastReceiveLSN());
@@ -91,13 +92,6 @@ public final class PostgreSQLWalDumper extends
AbstractShardingScalingExecutor<W
}
}
- private void sleep() {
- try {
- Thread.sleep(10L);
- } catch (final InterruptedException ignored) {
- }
- }
-
private void pushRecord(final Record record) {
try {
channel.pushRecord(record);
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
index f74c487..111b1c1 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.postgresql;
+import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
import org.junit.Before;
import org.junit.Test;
@@ -54,7 +55,8 @@ public final class PostgreSQLPositionManagerTest {
private DatabaseMetaData databaseMetaData;
@Before
- public void setUp() throws Exception {
+ @SneakyThrows(SQLException.class)
+ public void setUp() {
when(dataSource.getConnection()).thenReturn(connection);
when(connection.getMetaData()).thenReturn(databaseMetaData);
PreparedStatement postgreSQL96LsnPs = mockPostgreSQL96Lsn();
@@ -66,7 +68,15 @@ public final class PostgreSQLPositionManagerTest {
}
@Test
- public void assertGetCurrentPositionOnPostgreSQL96() throws SQLException {
+ public void assertInitPositionByJson() {
+ // The text "100" must yield the same sequence number as the numeric value 100.
+ long expected = LogSequenceNumber.valueOf(100L).asLong();
+ WalPosition actual = new PostgreSQLPositionManager("100").getPosition();
+ assertThat(actual.getLogSequenceNumber().asLong(), is(expected));
+ }
+
+ @Test
+ @SneakyThrows(SQLException.class)
+ public void assertGetCurrentPositionOnPostgreSQL96() {
PostgreSQLPositionManager postgreSQLPositionManager = new
PostgreSQLPositionManager(dataSource);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
@@ -75,7 +85,8 @@ public final class PostgreSQLPositionManagerTest {
}
@Test
- public void assertGetCurrentPositionOnPostgreSQL10() throws SQLException {
+ @SneakyThrows(SQLException.class)
+ public void assertGetCurrentPositionOnPostgreSQL10() {
PostgreSQLPositionManager postgreSQLPositionManager = new
PostgreSQLPositionManager(dataSource);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(10);
WalPosition actual = postgreSQLPositionManager.getPosition();
@@ -83,7 +94,8 @@ public final class PostgreSQLPositionManagerTest {
}
@Test(expected = RuntimeException.class)
- public void assertGetCurrentPositionThrowException() throws SQLException {
+ @SneakyThrows(SQLException.class)
+ public void assertGetCurrentPositionThrowException() {
PostgreSQLPositionManager postgreSQLPositionManager = new
PostgreSQLPositionManager(dataSource);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(4);
@@ -98,7 +110,8 @@ public final class PostgreSQLPositionManagerTest {
assertThat(postgreSQLPositionManager.getPosition(), is(expected));
}
- private PreparedStatement mockPostgreSQL96Lsn() throws SQLException {
+ @SneakyThrows(SQLException.class)
+ private PreparedStatement mockPostgreSQL96Lsn() {
PreparedStatement result = mock(PreparedStatement.class);
ResultSet resultSet = mock(ResultSet.class);
when(result.executeQuery()).thenReturn(resultSet);
@@ -107,7 +120,8 @@ public final class PostgreSQLPositionManagerTest {
return result;
}
- private PreparedStatement mockPostgreSQL10Lsn() throws SQLException {
+ @SneakyThrows(SQLException.class)
+ private PreparedStatement mockPostgreSQL10Lsn() {
PreparedStatement result = mock(PreparedStatement.class);
ResultSet resultSet = mock(ResultSet.class);
when(result.executeQuery()).thenReturn(resultSet);
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
new file mode 100644
index 0000000..c13ace9
--- /dev/null
+++
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql;
+
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
+import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
+import org.junit.Test;
+import org.postgresql.replication.LogSequenceNumber;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class PostgreSQLSqlBuilderTest {
+
+ @Test
+ public void assertBuildInsertSQL() {
+ // Identifiers are expected in double quotes, followed by the ON CONFLICT clause.
+ PostgreSQLSqlBuilder sqlBuilder = new PostgreSQLSqlBuilder();
+ String expected = "INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING";
+ assertThat(sqlBuilder.buildInsertSQL(mockDataRecord()), is(expected));
+ }
+
+ // Builds a two-column record (id, name) for table t_order.
+ private DataRecord mockDataRecord() {
+ WalPosition position = new WalPosition(LogSequenceNumber.valueOf(100L));
+ DataRecord record = new DataRecord(position, 2);
+ record.setTableName("t_order");
+ record.addColumn(new Column("id", 1, true, true));
+ record.addColumn(new Column("name", "", true, false));
+ return record;
+ }
+}