chengjianyun commented on a change in pull request #4079:
URL: https://github.com/apache/iotdb/pull/4079#discussion_r733288235
##########
File path: .github/workflows/e2e.yml
##########
@@ -16,6 +16,8 @@ on:
- master
- 'rel/*'
- cluster_new
+ #remove me when cluster- branch is merged
Review comment:
ditto
##########
File path: .github/workflows/sonar-coveralls.yml
##########
@@ -15,6 +15,8 @@ on:
- master
- "rel/*"
- cluster_new
+ #remove me when cluster- branch is merged
Review comment:
ditto
##########
File path: .github/workflows/main-unix.yml
##########
@@ -16,6 +16,8 @@ on:
- master
- 'rel/*'
- cluster_new
+ #remove me when cluster- branch is merged
Review comment:
ditto
##########
File path: .github/workflows/main-win.yml
##########
@@ -15,6 +15,8 @@ on:
- master
- 'rel/*'
- cluster_new
+ #remove me when cluster- branch is merged
Review comment:
ditto
##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster;
+
+// we do not inherent IoTDB instance, as it may break the singleton mode of
IoTDB.
+public interface ClusterIoTDBMBean {
+ /** @return true only if the log degree is DEBUG and the report is enabled */
+ boolean startRaftInfoReport();
+
Review comment:
The interface is registered into JMX framework. User can access the
method via some JMX console.
##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster;
+
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.ClientManager;
+import org.apache.iotdb.cluster.client.IClientManager;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
+import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.metadata.CMManager;
+import org.apache.iotdb.cluster.metadata.MetaPuller;
+import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.ClusterRPCService;
+import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
+import org.apache.iotdb.cluster.server.HardLinkCleaner;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.DataRaftService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftService;
+import org.apache.iotdb.cluster.server.service.DataGroupEngine;
+import org.apache.iotdb.cluster.server.service.DataGroupServiceImpls;
+import org.apache.iotdb.cluster.server.service.MetaAsyncService;
+import org.apache.iotdb.cluster.server.service.MetaSyncService;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfigCheck;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.ConfigurationException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.RegisterManager;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+import org.apache.iotdb.db.utils.TestOnly;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
+import static org.apache.iotdb.cluster.utils.ClusterUtils.UNKNOWN_CLIENT_IP;
+
+// we do not inherent IoTDB instance, as it may break the singleton mode of
IoTDB.
+public class ClusterIoTDB implements ClusterIoTDBMBean {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ClusterIoTDB.class);
+ private final String mbeanName =
+ String.format(
+ "%s:%s=%s", "org.apache.iotdb.cluster.service",
IoTDBConstant.JMX_TYPE, "ClusterIoTDB");
+
+ // TODO fix me: better to throw exception if the client can not be get. Then
we can remove this
+ // field.
+ public static boolean printClientConnectionErrorStack = false;
+
+ // establish the cluster as a seed
+ private static final String MODE_START = "-s";
+ // join an established cluster
+ private static final String MODE_ADD = "-a";
+ // send a request to remove a node, more arguments: ip-of-removed-node
+ // metaport-of-removed-node
+ private static final String MODE_REMOVE = "-r";
+
+ private MetaGroupMember metaGroupEngine;
Review comment:
This work is tracked in issue: https://github.com/apache/iotdb/issues/3881
##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBMBean.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster;
+
+// we do not inherent IoTDB instance, as it may break the singleton mode of
IoTDB.
+public interface ClusterIoTDBMBean {
+ /** @return true only if the log degree is DEBUG and the report is enabled */
+ boolean startRaftInfoReport();
+
Review comment:
Fixed.
##########
File path:
cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class ClientPoolFactoryTest {
+ private ClusterConfig clusterConfig =
ClusterDescriptor.getInstance().getConfig();
+
+ private long mockMaxWaitTimeoutMs = 10 * 1000L;
+ private int mockMaxClientPerMember = 10;
+
+ private int maxClientPerNodePerMember =
clusterConfig.getMaxClientPerNodePerMember();
+ private long waitClientTimeoutMS = clusterConfig.getWaitClientTimeoutMS();
+
+ private ClientPoolFactory clientPoolFactory;
+ private MockClientManager mockClientManager;
+
+ @Before
+ public void setUp() {
+ clusterConfig.setMaxClientPerNodePerMember(mockMaxClientPerMember);
+ clusterConfig.setWaitClientTimeoutMS(mockMaxWaitTimeoutMs);
+ clientPoolFactory = new ClientPoolFactory();
+ mockClientManager =
+ new MockClientManager() {
+ @Override
+ public void returnAsyncClient(
+ RaftService.AsyncClient client, Node node, ClientCategory
category) {
+ assert (client == asyncClient);
Review comment:
Done.
##########
File path:
cluster/src/test/java/org/apache/iotdb/cluster/client/ClientManagerTest.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class ClientManagerTest extends BaseClientTest {
+
+ @Before
+ public void setUp() throws IOException {
+ startDataServer();
+ startMetaServer();
+ startDataHeartbeatServer();
+ startMetaHeartbeatServer();
+ }
+
+ @After
+ public void tearDown() throws IOException, InterruptedException {
+ stopDataServer();
+ stopMetaServer();
+ stopDataHeartbeatServer();
+ stopMetaHeartbeatServer();
+ }
+
+ @Test
+ public void syncClientManagersTest() throws Exception {
+ // ---------Sync cluster clients manager test------------
+ ClientManager clusterManager =
+ new ClientManager(false, ClientManager.Type.RequestForwardClient);
+ RaftService.Client syncClusterClient =
+ clusterManager.borrowSyncClient(defaultNode, ClientCategory.DATA);
+
+ Assert.assertNotNull(syncClusterClient);
+ Assert.assertTrue(syncClusterClient instanceof SyncDataClient);
+ Assert.assertEquals(((SyncDataClient) syncClusterClient).getNode(),
defaultNode);
+
Assert.assertTrue(syncClusterClient.getInputProtocol().getTransport().isOpen());
+ ((SyncDataClient) syncClusterClient).returnSelf();
+
+ // cluster test
+ Assert.assertNull(clusterManager.borrowSyncClient(defaultNode,
ClientCategory.DATA_HEARTBEAT));
+ Assert.assertNull(clusterManager.borrowSyncClient(defaultNode,
ClientCategory.META));
+ Assert.assertNull(clusterManager.borrowSyncClient(defaultNode,
ClientCategory.META_HEARTBEAT));
+
+ Assert.assertNull(clusterManager.borrowAsyncClient(defaultNode,
ClientCategory.DATA));
+ Assert.assertNull(clusterManager.borrowAsyncClient(defaultNode,
ClientCategory.DATA_HEARTBEAT));
+ Assert.assertNull(clusterManager.borrowAsyncClient(defaultNode,
ClientCategory.META));
+ Assert.assertNull(clusterManager.borrowAsyncClient(defaultNode,
ClientCategory.META_HEARTBEAT));
+
+ // ---------Sync meta(meta heartbeat) clients manager test------------
+ ClientManager metaManager = new ClientManager(false,
ClientManager.Type.MetaGroupClient);
+ RaftService.Client metaClient = metaManager.borrowSyncClient(defaultNode,
ClientCategory.META);
+ Assert.assertNotNull(metaClient);
+ Assert.assertTrue(metaClient instanceof SyncMetaClient);
+ Assert.assertEquals(((SyncMetaClient) metaClient).getNode(), defaultNode);
+ Assert.assertTrue(metaClient.getInputProtocol().getTransport().isOpen());
+ ((SyncMetaClient) metaClient).returnSelf();
+
+ RaftService.Client metaHeartClient =
+ metaManager.borrowSyncClient(defaultNode,
ClientCategory.META_HEARTBEAT);
+ Assert.assertNotNull(metaHeartClient);
+ Assert.assertTrue(metaHeartClient instanceof SyncMetaClient);
+ Assert.assertEquals(((SyncMetaClient) metaHeartClient).getNode(),
defaultNode);
+
Assert.assertTrue(metaHeartClient.getInputProtocol().getTransport().isOpen());
+ ((SyncMetaClient) metaHeartClient).returnSelf();
+
+ // cluster test
+ Assert.assertNull(metaManager.borrowSyncClient(defaultNode,
ClientCategory.DATA));
+ Assert.assertNull(metaManager.borrowSyncClient(defaultNode,
ClientCategory.DATA_HEARTBEAT));
+
+ Assert.assertNull(metaManager.borrowAsyncClient(defaultNode,
ClientCategory.DATA));
+ Assert.assertNull(metaManager.borrowAsyncClient(defaultNode,
ClientCategory.DATA_HEARTBEAT));
+ Assert.assertNull(metaManager.borrowAsyncClient(defaultNode,
ClientCategory.META));
+ Assert.assertNull(metaManager.borrowAsyncClient(defaultNode,
ClientCategory.META_HEARTBEAT));
+
+ // ---------Sync data(data heartbeat) clients manager test------------
+ ClientManager dataManager = new ClientManager(false,
ClientManager.Type.DataGroupClient);
+
+ RaftService.Client dataClient = dataManager.borrowSyncClient(defaultNode,
ClientCategory.DATA);
+ Assert.assertNotNull(dataClient);
+ Assert.assertTrue(dataClient instanceof SyncDataClient);
+ Assert.assertEquals(((SyncDataClient) dataClient).getNode(), defaultNode);
+ Assert.assertTrue(dataClient.getInputProtocol().getTransport().isOpen());
+ ((SyncDataClient) dataClient).returnSelf();
+
+ RaftService.Client dataHeartClient =
+ dataManager.borrowSyncClient(defaultNode,
ClientCategory.DATA_HEARTBEAT);
+ Assert.assertNotNull(dataHeartClient);
+ Assert.assertTrue(dataHeartClient instanceof SyncDataClient);
+ Assert.assertEquals(((SyncDataClient) dataHeartClient).getNode(),
defaultNode);
+
Assert.assertTrue(dataHeartClient.getInputProtocol().getTransport().isOpen());
+ ((SyncDataClient) dataHeartClient).returnSelf();
+
+ // cluster test
+ Assert.assertNull(dataManager.borrowSyncClient(defaultNode,
ClientCategory.META));
+ Assert.assertNull(dataManager.borrowSyncClient(defaultNode,
ClientCategory.META_HEARTBEAT));
+
+ Assert.assertNull(dataManager.borrowAsyncClient(defaultNode,
ClientCategory.DATA));
+ Assert.assertNull(dataManager.borrowAsyncClient(defaultNode,
ClientCategory.DATA_HEARTBEAT));
+ Assert.assertNull(dataManager.borrowAsyncClient(defaultNode,
ClientCategory.META));
+ Assert.assertNull(dataManager.borrowAsyncClient(defaultNode,
ClientCategory.META_HEARTBEAT));
+ }
+
+ @Test
+ public void asyncClientManagersTest() throws Exception {
+ // ---------async cluster clients manager test------------
+ ClientManager clusterManager = new ClientManager(true,
ClientManager.Type.RequestForwardClient);
+ RaftService.AsyncClient clusterClient =
+ clusterManager.borrowAsyncClient(defaultNode, ClientCategory.DATA);
+
+ Assert.assertNotNull(clusterClient);
+ Assert.assertTrue(clusterClient instanceof AsyncDataClient);
+ Assert.assertEquals(((AsyncDataClient) clusterClient).getNode(),
defaultNode);
+ Assert.assertTrue(((AsyncDataClient) clusterClient).isValid());
+ Assert.assertTrue(((AsyncDataClient) clusterClient).isReady());
Review comment:
Sure, let me try to do it.
##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster;
+
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.ClientManager;
+import org.apache.iotdb.cluster.client.IClientManager;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
+import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.metadata.CMManager;
+import org.apache.iotdb.cluster.metadata.MetaPuller;
+import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.ClusterRPCService;
+import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
+import org.apache.iotdb.cluster.server.HardLinkCleaner;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.DataRaftService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftService;
+import org.apache.iotdb.cluster.server.service.DataGroupEngine;
+import org.apache.iotdb.cluster.server.service.DataGroupServiceImpls;
+import org.apache.iotdb.cluster.server.service.MetaAsyncService;
+import org.apache.iotdb.cluster.server.service.MetaSyncService;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfigCheck;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.ConfigurationException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.RegisterManager;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+import org.apache.iotdb.db.utils.TestOnly;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
+import static org.apache.iotdb.cluster.utils.ClusterUtils.UNKNOWN_CLIENT_IP;
+
+// we do not inherent IoTDB instance, as it may break the singleton mode of
IoTDB.
+public class ClusterIoTDB implements ClusterIoTDBMBean {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ClusterIoTDB.class);
+ private final String mbeanName =
+ String.format(
+ "%s:%s=%s", "org.apache.iotdb.cluster.service",
IoTDBConstant.JMX_TYPE, "ClusterIoTDB");
+
+ // TODO fix me: better to throw exception if the client can not be get. Then
we can remove this
+ // field.
+ public static boolean printClientConnectionErrorStack = false;
+
+ // establish the cluster as a seed
+ private static final String MODE_START = "-s";
+ // join an established cluster
+ private static final String MODE_ADD = "-a";
+ // send a request to remove a node, more arguments: ip-of-removed-node
+ // metaport-of-removed-node
+ private static final String MODE_REMOVE = "-r";
+
+ private MetaGroupMember metaGroupEngine;
+
+ // split DataGroupServiceImpls into engine and impls
+ private DataGroupEngine dataGroupEngine;
+
+ private Node thisNode;
+ private Coordinator coordinator;
+
+ private final IoTDB iotdb = IoTDB.getInstance();
+
+ // Cluster IoTDB uses a individual registerManager with its parent.
+ private RegisterManager registerManager = new RegisterManager();
+
+ /**
+ * a single thread pool, every "REPORT_INTERVAL_SEC" seconds, "reportThread"
will print the status
+ * of all raft members in this node
+ */
+ private ScheduledExecutorService reportThread;
+
+ private boolean allowReport = true;
+
+ /** hardLinkCleaner will periodically clean expired hardlinks created during
snapshots */
+ private ScheduledExecutorService hardLinkCleanerThread;
+
+ // currently, clientManager is only used for those instances who do not
belong to any
+ // DataGroup..
+ private IClientManager clientManager;
+
+ private ClusterIoTDB() {
+ // we do not init anything here, so that we can re-initialize the instance
in IT.
+ }
+
+ public void initLocalEngines() {
+ ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+ thisNode = new Node();
+ // set internal rpc ip and ports
+ thisNode.setInternalIp(config.getInternalIp());
+ thisNode.setMetaPort(config.getInternalMetaPort());
+ thisNode.setDataPort(config.getInternalDataPort());
+ // set client rpc ip and ports
+ thisNode.setClientPort(config.getClusterRpcPort());
+
thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
+ coordinator = new Coordinator();
+ // local engine
+ TProtocolFactory protocolFactory =
+ ThriftServiceThread.getProtocolFactory(
+
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
+ metaGroupEngine = new MetaGroupMember(protocolFactory, thisNode,
coordinator);
+ IoTDB.setClusterMode();
+ IoTDB.setMetaManager(CMManager.getInstance());
+ ((CMManager) IoTDB.metaManager).setMetaGroupMember(metaGroupEngine);
+ ((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
+ MetaPuller.getInstance().init(metaGroupEngine);
+
+ // from the scope of the DataGroupEngine,it should be singleton pattern
+ // the way of setting MetaGroupMember in DataGroupEngine may need a better
modification in
+ // future commit.
+ DataGroupEngine.setProtocolFactory(protocolFactory);
+ DataGroupEngine.setMetaGroupMember(metaGroupEngine);
+ dataGroupEngine = DataGroupEngine.getInstance();
+ clientManager =
+ new ClientManager(
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+ ClientManager.Type.RequestForwardClient);
+ initTasks();
+ try {
+ // we need to check config after initLocalEngines.
+ startServerCheck();
+ } catch (StartupException e) {
+ logger.error("Failed to check cluster config.", e);
+ stop();
Review comment:
Fixed.
##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster;
+
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.ClientManager;
+import org.apache.iotdb.cluster.client.IClientManager;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
+import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.metadata.CMManager;
+import org.apache.iotdb.cluster.metadata.MetaPuller;
+import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.ClusterRPCService;
+import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
+import org.apache.iotdb.cluster.server.HardLinkCleaner;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.DataRaftService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftService;
+import org.apache.iotdb.cluster.server.service.DataGroupEngine;
+import org.apache.iotdb.cluster.server.service.DataGroupServiceImpls;
+import org.apache.iotdb.cluster.server.service.MetaAsyncService;
+import org.apache.iotdb.cluster.server.service.MetaSyncService;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfigCheck;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.ConfigurationException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.RegisterManager;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+import org.apache.iotdb.db.utils.TestOnly;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
+import static org.apache.iotdb.cluster.utils.ClusterUtils.UNKNOWN_CLIENT_IP;
+
+// we do not inherit the IoTDB instance, as it may break the singleton mode of
IoTDB.
+public class ClusterIoTDB implements ClusterIoTDBMBean {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ClusterIoTDB.class);
+ private final String mbeanName =
+ String.format(
+ "%s:%s=%s", "org.apache.iotdb.cluster.service",
IoTDBConstant.JMX_TYPE, "ClusterIoTDB");
+
+ // TODO fix me: better to throw exception if the client can not be get. Then
we can remove this
+ // field.
+ public static boolean printClientConnectionErrorStack = false;
+
+ // establish the cluster as a seed
+ private static final String MODE_START = "-s";
+ // join an established cluster
+ private static final String MODE_ADD = "-a";
+ // send a request to remove a node, more arguments: ip-of-removed-node
+ // metaport-of-removed-node
+ private static final String MODE_REMOVE = "-r";
+
+ private MetaGroupMember metaGroupEngine;
+
+ // split DataGroupServiceImpls into engine and impls
+ private DataGroupEngine dataGroupEngine;
+
+ private Node thisNode;
+ private Coordinator coordinator;
+
+ private final IoTDB iotdb = IoTDB.getInstance();
+
+ // Cluster IoTDB uses an individual registerManager, separate from its parent's.
+ private RegisterManager registerManager = new RegisterManager();
+
+ /**
+ * a single thread pool, every "REPORT_INTERVAL_SEC" seconds, "reportThread"
will print the status
+ * of all raft members in this node
+ */
+ private ScheduledExecutorService reportThread;
+
+ private boolean allowReport = true;
+
+ /** hardLinkCleaner will periodically clean expired hardlinks created during
snapshots */
+ private ScheduledExecutorService hardLinkCleanerThread;
+
+ // currently, clientManager is only used for those instances who do not
belong to any
+ // DataGroup..
+ private IClientManager clientManager;
+
+ private ClusterIoTDB() {
+ // we do not init anything here, so that we can re-initialize the instance
in IT.
+ }
+
+ public void initLocalEngines() {
+ ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+ thisNode = new Node();
+ // set internal rpc ip and ports
+ thisNode.setInternalIp(config.getInternalIp());
+ thisNode.setMetaPort(config.getInternalMetaPort());
+ thisNode.setDataPort(config.getInternalDataPort());
+ // set client rpc ip and ports
+ thisNode.setClientPort(config.getClusterRpcPort());
+
thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
+ coordinator = new Coordinator();
+ // local engine
+ TProtocolFactory protocolFactory =
+ ThriftServiceThread.getProtocolFactory(
+
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
+ metaGroupEngine = new MetaGroupMember(protocolFactory, thisNode,
coordinator);
+ IoTDB.setClusterMode();
+ IoTDB.setMetaManager(CMManager.getInstance());
+ ((CMManager) IoTDB.metaManager).setMetaGroupMember(metaGroupEngine);
+ ((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
+ MetaPuller.getInstance().init(metaGroupEngine);
+
+ // from the scope of the DataGroupEngine,it should be singleton pattern
+ // the way of setting MetaGroupMember in DataGroupEngine may need a better
modification in
+ // future commit.
+ DataGroupEngine.setProtocolFactory(protocolFactory);
+ DataGroupEngine.setMetaGroupMember(metaGroupEngine);
+ dataGroupEngine = DataGroupEngine.getInstance();
+ clientManager =
+ new ClientManager(
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+ ClientManager.Type.RequestForwardClient);
+ initTasks();
+ try {
+ // we need to check config after initLocalEngines.
+ startServerCheck();
+ } catch (StartupException e) {
+ logger.error("Failed to check cluster config.", e);
+ stop();
+ }
+ JMXService.registerMBean(metaGroupEngine, metaGroupEngine.getMBeanName());
+ }
+
+ private void initTasks() {
+ reportThread =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
+ reportThread.scheduleAtFixedRate(
+ this::generateNodeReport,
+ ClusterConstant.REPORT_INTERVAL_SEC,
+ ClusterConstant.REPORT_INTERVAL_SEC,
+ TimeUnit.SECONDS);
+ hardLinkCleanerThread =
+
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("HardLinkCleaner");
+ hardLinkCleanerThread.scheduleAtFixedRate(
+ new HardLinkCleaner(),
+ ClusterConstant.CLEAN_HARDLINK_INTERVAL_SEC,
+ ClusterConstant.CLEAN_HARDLINK_INTERVAL_SEC,
+ TimeUnit.SECONDS);
+ }
+
+ /**
+ * Generate a report containing the status of both MetaGroupMember and
DataGroupMembers of this
+ * node. This will help to see if the node is in a consistent and right
state during debugging.
+ */
+ private void generateNodeReport() {
+ if (logger.isDebugEnabled() && allowReport) {
+ try {
+ NodeReport report = new NodeReport(thisNode);
+ report.setMetaMemberReport(metaGroupEngine.genMemberReport());
+ report.setDataMemberReportList(dataGroupEngine.genMemberReports());
+ logger.debug(report.toString());
+ } catch (Exception e) {
+ logger.error("exception occurred when generating node report", e);
+ }
+ }
+ }
+
+ public static void main(String[] args) {
+ if (args.length < 1) {
+ logger.error(
+ "Usage: <-s|-a|-r> "
+ + "[-D{} <configure folder>] \n"
+ + "-s: start the node as a seed\n"
+ + "-a: start the node as a new node\n"
+ + "-r: remove the node out of the cluster\n",
+ IoTDBConstant.IOTDB_CONF);
+ return;
+ }
+
+ ClusterIoTDB cluster = ClusterIoTDBHolder.INSTANCE;
+ // check config of iotdb,and set some configs in cluster mode
+ try {
+ if (!cluster.serverCheckAndInit()) {
+ return;
+ }
+ } catch (ConfigurationException | IOException e) {
+ logger.error("meet error when doing start checking", e);
+ return;
+ }
+ String mode = args[0];
+ logger.info("Running mode {}", mode);
+
+ // initialize the current node and its services
+ cluster.initLocalEngines();
+
+ // we start IoTDB kernel first. then we start the cluster module.
+ if (MODE_START.equals(mode)) {
+ cluster.activeStartNodeMode();
+ } else if (MODE_ADD.equals(mode)) {
+ cluster.activeAddNodeMode();
+ } else if (MODE_REMOVE.equals(mode)) {
+ try {
+ cluster.doRemoveNode(args);
+ } catch (IOException e) {
+ logger.error("Fail to remove node in cluster", e);
+ }
+ } else {
+ logger.error("Unrecognized mode {}", mode);
+ }
+ }
+
+ private boolean serverCheckAndInit() throws ConfigurationException,
IOException {
+ IoTDBConfigCheck.getInstance().checkConfig();
+ // init server's configuration first, because the cluster configuration
may read settings from
+ // the server's configuration.
+ IoTDBDescriptor.getInstance().getConfig().setSyncEnable(false);
+ // auto create schema is took over by cluster module, so we disable it in
the server module.
+
IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
+ // check cluster config
+ String checkResult = clusterConfigCheck();
+ if (checkResult != null) {
+ logger.error(checkResult);
+ return false;
+ }
+ return true;
+ }
+
+ private String clusterConfigCheck() {
+ try {
+ ClusterDescriptor.getInstance().replaceHostnameWithIp();
+ } catch (Exception e) {
+ return String.format("replace hostname with ip failed, %s",
e.getMessage());
+ }
+ ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+ // check the initial replicateNum and refuse to start when the
replicateNum <= 0
+ if (config.getReplicationNum() <= 0) {
+ return String.format(
+ "ReplicateNum should be greater than 0 instead of %d.",
config.getReplicationNum());
+ }
+ // check the initial cluster size and refuse to start when the size <
quorum
+ int quorum = config.getReplicationNum() / 2 + 1;
+ if (config.getSeedNodeUrls().size() < quorum) {
+ return String.format(
+ "Seed number less than quorum, seed number: %s, quorum: " + "%s.",
+ config.getSeedNodeUrls().size(), quorum);
+ }
+ // TODO duplicate code,consider to solve it later
+ Set<Node> seedNodes = new HashSet<>();
+ for (String url : config.getSeedNodeUrls()) {
+ Node node = ClusterUtils.parseNode(url);
+ if (seedNodes.contains(node)) {
+ return String.format(
+ "SeedNodes must not repeat each other. SeedNodes: %s",
config.getSeedNodeUrls());
+ }
+ seedNodes.add(node);
+ }
+ return null;
+ }
+
+ public void activeStartNodeMode() {
+ try {
+ // start iotdb server first
+ IoTDB.getInstance().active();
+ // some work about cluster
+ preInitCluster();
+ // try to build cluster
+ metaGroupEngine.buildCluster();
+ // register service after cluster build
+ postInitCluster();
+ // init ServiceImpl to handle request of client
+ startClientRPC();
+ } catch (StartupException
+ | StartUpCheckFailureException
+ | ConfigInconsistentException
+ | QueryProcessException e) {
+ logger.error("Fail to start server", e);
+ stop();
+ }
+ }
+
+ private void preInitCluster() throws StartupException {
+ stopRaftInfoReport();
+ JMXService.registerMBean(this, mbeanName);
+ // register MetaGroupMember. MetaGroupMember has the same position with
"StorageEngine" in the
Review comment:
In my opinion, the interfaces in ClusterIoTDBMBean only provide users a
chance to look into the system status. They can be enabled via the interface provided by
JMX. Is that right? @jixuan1989
##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster;
+
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.ClientManager;
+import org.apache.iotdb.cluster.client.IClientManager;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
+import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.metadata.CMManager;
+import org.apache.iotdb.cluster.metadata.MetaPuller;
+import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.ClusterRPCService;
+import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
+import org.apache.iotdb.cluster.server.HardLinkCleaner;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.DataRaftService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftService;
+import org.apache.iotdb.cluster.server.service.DataGroupEngine;
+import org.apache.iotdb.cluster.server.service.DataGroupServiceImpls;
+import org.apache.iotdb.cluster.server.service.MetaAsyncService;
+import org.apache.iotdb.cluster.server.service.MetaSyncService;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfigCheck;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.ConfigurationException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.RegisterManager;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+import org.apache.iotdb.db.utils.TestOnly;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
+import static org.apache.iotdb.cluster.utils.ClusterUtils.UNKNOWN_CLIENT_IP;
+
+// we do not inherit the IoTDB instance, as it may break the singleton mode of
IoTDB.
+public class ClusterIoTDB implements ClusterIoTDBMBean {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ClusterIoTDB.class);
+ private final String mbeanName =
+ String.format(
+ "%s:%s=%s", "org.apache.iotdb.cluster.service",
IoTDBConstant.JMX_TYPE, "ClusterIoTDB");
+
+ // TODO fix me: better to throw exception if the client can not be get. Then
we can remove this
+ // field.
+ public static boolean printClientConnectionErrorStack = false;
+
+ // establish the cluster as a seed
+ private static final String MODE_START = "-s";
+ // join an established cluster
+ private static final String MODE_ADD = "-a";
+ // send a request to remove a node, more arguments: ip-of-removed-node
+ // metaport-of-removed-node
+ private static final String MODE_REMOVE = "-r";
+
+ private MetaGroupMember metaGroupEngine;
+
+ // split DataGroupServiceImpls into engine and impls
+ private DataGroupEngine dataGroupEngine;
+
+ private Node thisNode;
+ private Coordinator coordinator;
+
+ private final IoTDB iotdb = IoTDB.getInstance();
+
+ // Cluster IoTDB uses an individual registerManager, separate from its parent's.
+ private RegisterManager registerManager = new RegisterManager();
+
+ /**
+ * a single thread pool, every "REPORT_INTERVAL_SEC" seconds, "reportThread"
will print the status
+ * of all raft members in this node
+ */
+ private ScheduledExecutorService reportThread;
+
+ private boolean allowReport = true;
+
+ /** hardLinkCleaner will periodically clean expired hardlinks created during
snapshots */
+ private ScheduledExecutorService hardLinkCleanerThread;
+
+ // currently, clientManager is only used for those instances who do not
belong to any
+ // DataGroup..
+ private IClientManager clientManager;
+
+ private ClusterIoTDB() {
+ // we do not init anything here, so that we can re-initialize the instance
in IT.
+ }
+
+ public void initLocalEngines() {
+ ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+ thisNode = new Node();
+ // set internal rpc ip and ports
+ thisNode.setInternalIp(config.getInternalIp());
+ thisNode.setMetaPort(config.getInternalMetaPort());
+ thisNode.setDataPort(config.getInternalDataPort());
+ // set client rpc ip and ports
+ thisNode.setClientPort(config.getClusterRpcPort());
+
thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
+ coordinator = new Coordinator();
+ // local engine
+ TProtocolFactory protocolFactory =
+ ThriftServiceThread.getProtocolFactory(
+
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
+ metaGroupEngine = new MetaGroupMember(protocolFactory, thisNode,
coordinator);
+ IoTDB.setClusterMode();
+ IoTDB.setMetaManager(CMManager.getInstance());
+ ((CMManager) IoTDB.metaManager).setMetaGroupMember(metaGroupEngine);
+ ((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
+ MetaPuller.getInstance().init(metaGroupEngine);
+
+ // from the scope of the DataGroupEngine,it should be singleton pattern
+ // the way of setting MetaGroupMember in DataGroupEngine may need a better
modification in
+ // future commit.
+ DataGroupEngine.setProtocolFactory(protocolFactory);
+ DataGroupEngine.setMetaGroupMember(metaGroupEngine);
+ dataGroupEngine = DataGroupEngine.getInstance();
+ clientManager =
+ new ClientManager(
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+ ClientManager.Type.RequestForwardClient);
+ initTasks();
+ try {
+ // we need to check config after initLocalEngines.
+ startServerCheck();
+ } catch (StartupException e) {
+ logger.error("Failed to check cluster config.", e);
+ stop();
Review comment:
Moving `JMXService.registerMBean(metaGroupEngine,
metaGroupEngine.getMBeanName());` into the `try` block looks clearer.
What do you think?
##########
File path:
cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class ClientPoolFactoryTest {
+ private ClusterConfig clusterConfig =
ClusterDescriptor.getInstance().getConfig();
+
+ private long mockMaxWaitTimeoutMs = 10 * 1000L;
+ private int mockMaxClientPerMember = 10;
+
+ private int maxClientPerNodePerMember =
clusterConfig.getMaxClientPerNodePerMember();
+ private long waitClientTimeoutMS = clusterConfig.getWaitClientTimeoutMS();
+
+ private ClientPoolFactory clientPoolFactory;
+ private MockClientManager mockClientManager;
+
+ @Before
+ public void setUp() {
+ clusterConfig.setMaxClientPerNodePerMember(mockMaxClientPerMember);
+ clusterConfig.setWaitClientTimeoutMS(mockMaxWaitTimeoutMs);
+ clientPoolFactory = new ClientPoolFactory();
+ mockClientManager =
+ new MockClientManager() {
+ @Override
+ public void returnAsyncClient(
+ RaftService.AsyncClient client, Node node, ClientCategory
category) {
+ assert (client == asyncClient);
+ }
+
+ @Override
+ public void returnSyncClient(
+ RaftService.Client client, Node node, ClientCategory category) {
+ Assert.assertTrue(client == syncClient);
+ }
+ };
+ clientPoolFactory.setClientManager(mockClientManager);
+ }
+
+ @After
+ public void tearDown() {
+ clusterConfig.setMaxClientPerNodePerMember(maxClientPerNodePerMember);
+ clusterConfig.setWaitClientTimeoutMS(waitClientTimeoutMS);
+ }
+
+ @Test
+ public void poolConfigTest() throws Exception {
+ GenericKeyedObjectPool<Node, RaftService.AsyncClient> pool =
+ clientPoolFactory.createAsyncDataPool(ClientCategory.DATA);
+ Node node = constructDefaultNode();
+
+ for (int i = 0; i < mockMaxClientPerMember; i++) {
+ RaftService.AsyncClient client = pool.borrowObject(node);
+ Assert.assertNotNull(client);
+ }
+
+ long timeStart = System.currentTimeMillis();
+ try {
+ pool.borrowObject(node);
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof NoSuchElementException);
+ } finally {
+ Assert.assertTrue(System.currentTimeMillis() - timeStart + 10 >
mockMaxWaitTimeoutMs);
+ }
+ }
+
+ @Test
+ public void poolRecycleTest() throws Exception {
+ GenericKeyedObjectPool<Node, RaftService.AsyncClient> pool =
+ clientPoolFactory.createAsyncDataPool(ClientCategory.DATA);
+
+ Node node = constructDefaultNode();
+ List<RaftService.AsyncClient> clientList = new ArrayList<>();
+ for (int i = 0; i < pool.getMaxIdlePerKey(); i++) {
+ RaftService.AsyncClient client = pool.borrowObject(node);
+ Assert.assertNotNull(client);
+ clientList.add(client);
+ }
+
+ for (RaftService.AsyncClient client : clientList) {
+ pool.returnObject(node, client);
+ }
+
+ for (int i = 0; i < pool.getMaxIdlePerKey(); i++) {
+ RaftService.AsyncClient client = pool.borrowObject(node);
+ Assert.assertNotNull(client);
+ Assert.assertTrue(clientList.contains(client));
+ }
+ }
+
+ @Test
+ public void createAsyncDataClientTest() throws Exception {
+ GenericKeyedObjectPool<Node, RaftService.AsyncClient> pool =
+ clientPoolFactory.createAsyncDataPool(ClientCategory.DATA);
+
+ Assert.assertEquals(pool.getMaxTotalPerKey(), mockMaxClientPerMember);
+ Assert.assertEquals(pool.getMaxWaitDuration(),
Duration.ofMillis(mockMaxWaitTimeoutMs));
+
+ RaftService.AsyncClient asyncClient = null;
+
+ Node node = constructDefaultNode();
+
+ asyncClient = pool.borrowObject(node);
+ mockClientManager.setAsyncClient(asyncClient);
+ Assert.assertNotNull(asyncClient);
+ Assert.assertTrue(asyncClient instanceof AsyncDataClient);
Review comment:
Useless code. Remove it.
##########
File path:
cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java
##########
@@ -4,123 +4,107 @@
package org.apache.iotdb.cluster.client.sync;
-import org.apache.iotdb.cluster.client.sync.SyncDataClient.FactorySync;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
-import org.apache.iotdb.rpc.TSocketWrapper;
+import org.apache.iotdb.cluster.client.BaseClientTest;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
-import java.net.ServerSocket;
+import java.net.SocketException;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-public class SyncDataClientTest {
+public class SyncDataClientTest extends BaseClientTest {
- @Test
- public void test() throws IOException, InterruptedException {
- Node node = new Node();
-
node.setDataPort(40010).setInternalIp("localhost").setClientIp("localhost");
- ServerSocket serverSocket = new ServerSocket(node.getDataPort());
- Thread listenThread =
- new Thread(
- () -> {
- while (!Thread.interrupted()) {
- try {
- serverSocket.accept();
- } catch (IOException e) {
- return;
- }
- }
- });
- listenThread.start();
+ private TProtocolFactory protocolFactory;
+
+ @Before
+ public void setUp() {
+ protocolFactory =
+
ClusterDescriptor.getInstance().getConfig().isRpcThriftCompressionEnabled()
+ ? new TCompactProtocol.Factory()
+ : new TBinaryProtocol.Factory();
+ }
+ @Test
+ public void testDataClient() throws IOException, InterruptedException,
TTransportException {
try {
- SyncClientPool syncClientPool = new SyncClientPool(new FactorySync(new
Factory()));
- SyncDataClient client;
- client = (SyncDataClient) syncClientPool.getClient(node);
+ startDataServer();
+ SyncDataClient dataClient =
+ new SyncDataClient(protocolFactory, defaultNode,
ClientCategory.DATA);
- assertEquals(node, client.getNode());
+ assertEquals(
+ "SyncDataClient{node=Node(internalIp:localhost, metaPort:9003,
nodeIdentifier:0, "
+ + "dataPort:40010, clientPort:0,
clientIp:localhost),port=40010}",
+ dataClient.toString());
- client.setTimeout(1000);
- assertEquals(1000, client.getTimeout());
+ assertCheck(dataClient);
- client.putBack();
- Client newClient = syncClientPool.getClient(node);
- assertEquals(client, newClient);
- assertTrue(client.getInputProtocol().getTransport().isOpen());
+ dataClient =
+ new SyncDataClient.SyncDataClientFactory(protocolFactory,
ClientCategory.DATA)
+ .makeObject(defaultNode)
+ .getObject();
assertEquals(
- "DataClient{node=ClusterNode{ internalIp='localhost', metaPort=0,
nodeIdentifier=0,"
- + " dataPort=40010, clientPort=0, clientIp='localhost'}}",
- client.toString());
-
- client =
- new SyncDataClient(
- new TBinaryProtocol(TSocketWrapper.wrap(node.getInternalIp(),
node.getDataPort())));
- // client without a belong pool will be closed after putBack()
- client.putBack();
- assertFalse(client.getInputProtocol().getTransport().isOpen());
+ "SyncDataClient{node=Node(internalIp:localhost, metaPort:9003,
nodeIdentifier:0, "
+ + "dataPort:40010, clientPort:0,
clientIp:localhost),port=40010}",
+ dataClient.toString());
+
+ assertCheck(dataClient);
+ } catch (Exception e) {
+ e.printStackTrace();
Review comment:
added condition check.
##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster;
+
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.ClientManager;
+import org.apache.iotdb.cluster.client.IClientManager;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
+import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.metadata.CMManager;
+import org.apache.iotdb.cluster.metadata.MetaPuller;
+import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.ClusterRPCService;
+import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
+import org.apache.iotdb.cluster.server.HardLinkCleaner;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.DataRaftService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftService;
+import org.apache.iotdb.cluster.server.service.DataGroupEngine;
+import org.apache.iotdb.cluster.server.service.DataGroupServiceImpls;
+import org.apache.iotdb.cluster.server.service.MetaAsyncService;
+import org.apache.iotdb.cluster.server.service.MetaSyncService;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfigCheck;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.ConfigurationException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.RegisterManager;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+import org.apache.iotdb.db.utils.TestOnly;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
+import static org.apache.iotdb.cluster.utils.ClusterUtils.UNKNOWN_CLIENT_IP;
+
+// we do not inherent IoTDB instance, as it may break the singleton mode of
IoTDB.
+public class ClusterIoTDB implements ClusterIoTDBMBean {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ClusterIoTDB.class);
+ private final String mbeanName =
+ String.format(
+ "%s:%s=%s", "org.apache.iotdb.cluster.service",
IoTDBConstant.JMX_TYPE, "ClusterIoTDB");
+
+ // TODO fix me: better to throw exception if the client can not be get. Then
we can remove this
+ // field.
+ public static boolean printClientConnectionErrorStack = false;
+
+ // establish the cluster as a seed
+ private static final String MODE_START = "-s";
+ // join an established cluster
+ private static final String MODE_ADD = "-a";
+ // send a request to remove a node, more arguments: ip-of-removed-node
+ // metaport-of-removed-node
+ private static final String MODE_REMOVE = "-r";
+
+ private MetaGroupMember metaGroupEngine;
+
+ // split DataGroupServiceImpls into engine and impls
+ private DataGroupEngine dataGroupEngine;
+
+ private Node thisNode;
+ private Coordinator coordinator;
+
+ private final IoTDB iotdb = IoTDB.getInstance();
+
+ // Cluster IoTDB uses a individual registerManager with its parent.
+ private RegisterManager registerManager = new RegisterManager();
+
+ /**
+ * a single thread pool, every "REPORT_INTERVAL_SEC" seconds, "reportThread"
will print the status
+ * of all raft members in this node
+ */
+ private ScheduledExecutorService reportThread;
+
+ private boolean allowReport = true;
+
+ /** hardLinkCleaner will periodically clean expired hardlinks created during
snapshots */
+ private ScheduledExecutorService hardLinkCleanerThread;
+
+ // currently, clientManager is only used for those instances who do not
belong to any
+ // DataGroup..
+ private IClientManager clientManager;
+
+ private ClusterIoTDB() {
+ // we do not init anything here, so that we can re-initialize the instance
in IT.
+ }
+
+ public void initLocalEngines() {
+ ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+ thisNode = new Node();
+ // set internal rpc ip and ports
+ thisNode.setInternalIp(config.getInternalIp());
+ thisNode.setMetaPort(config.getInternalMetaPort());
+ thisNode.setDataPort(config.getInternalDataPort());
+ // set client rpc ip and ports
+ thisNode.setClientPort(config.getClusterRpcPort());
+
thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
+ coordinator = new Coordinator();
+ // local engine
+ TProtocolFactory protocolFactory =
+ ThriftServiceThread.getProtocolFactory(
+
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
+ metaGroupEngine = new MetaGroupMember(protocolFactory, thisNode,
coordinator);
+ IoTDB.setClusterMode();
+ IoTDB.setMetaManager(CMManager.getInstance());
+ ((CMManager) IoTDB.metaManager).setMetaGroupMember(metaGroupEngine);
+ ((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
+ MetaPuller.getInstance().init(metaGroupEngine);
+
+ // from the scope of the DataGroupEngine,it should be singleton pattern
+ // the way of setting MetaGroupMember in DataGroupEngine may need a better
modification in
+ // future commit.
+ DataGroupEngine.setProtocolFactory(protocolFactory);
+ DataGroupEngine.setMetaGroupMember(metaGroupEngine);
+ dataGroupEngine = DataGroupEngine.getInstance();
+ clientManager =
+ new ClientManager(
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+ ClientManager.Type.RequestForwardClient);
+ initTasks();
+ try {
+ // we need to check config after initLocalEngines.
+ startServerCheck();
+ } catch (StartupException e) {
+ logger.error("Failed to check cluster config.", e);
+ stop();
+ }
+ JMXService.registerMBean(metaGroupEngine, metaGroupEngine.getMBeanName());
+ }
+
+ private void initTasks() {
+ reportThread =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
+ reportThread.scheduleAtFixedRate(
+ this::generateNodeReport,
+ ClusterConstant.REPORT_INTERVAL_SEC,
+ ClusterConstant.REPORT_INTERVAL_SEC,
+ TimeUnit.SECONDS);
+ hardLinkCleanerThread =
+
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("HardLinkCleaner");
+ hardLinkCleanerThread.scheduleAtFixedRate(
+ new HardLinkCleaner(),
+ ClusterConstant.CLEAN_HARDLINK_INTERVAL_SEC,
+ ClusterConstant.CLEAN_HARDLINK_INTERVAL_SEC,
+ TimeUnit.SECONDS);
+ }
+
+ /**
+ * Generate a report containing the status of both MetaGroupMember and
DataGroupMembers of this
+ * node. This will help to see if the node is in a consistent and right
state during debugging.
+ */
+ private void generateNodeReport() {
+ if (logger.isDebugEnabled() && allowReport) {
+ try {
+ NodeReport report = new NodeReport(thisNode);
+ report.setMetaMemberReport(metaGroupEngine.genMemberReport());
+ report.setDataMemberReportList(dataGroupEngine.genMemberReports());
+ logger.debug(report.toString());
+ } catch (Exception e) {
+ logger.error("exception occurred when generating node report", e);
+ }
+ }
+ }
+
+ public static void main(String[] args) {
+ if (args.length < 1) {
+ logger.error(
+ "Usage: <-s|-a|-r> "
+ + "[-D{} <configure folder>] \n"
+ + "-s: start the node as a seed\n"
+ + "-a: start the node as a new node\n"
+ + "-r: remove the node out of the cluster\n",
+ IoTDBConstant.IOTDB_CONF);
+ return;
+ }
+
+ ClusterIoTDB cluster = ClusterIoTDBHolder.INSTANCE;
+ // check config of iotdb,and set some configs in cluster mode
+ try {
+ if (!cluster.serverCheckAndInit()) {
+ return;
+ }
+ } catch (ConfigurationException | IOException e) {
+ logger.error("meet error when doing start checking", e);
+ return;
+ }
+ String mode = args[0];
+ logger.info("Running mode {}", mode);
+
+ // initialize the current node and its services
+ cluster.initLocalEngines();
+
Review comment:
Good idea.
##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster;
+
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.ClientManager;
+import org.apache.iotdb.cluster.client.IClientManager;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
+import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.metadata.CMManager;
+import org.apache.iotdb.cluster.metadata.MetaPuller;
+import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.ClusterRPCService;
+import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
+import org.apache.iotdb.cluster.server.HardLinkCleaner;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.DataRaftService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftService;
+import org.apache.iotdb.cluster.server.service.DataGroupEngine;
+import org.apache.iotdb.cluster.server.service.DataGroupServiceImpls;
+import org.apache.iotdb.cluster.server.service.MetaAsyncService;
+import org.apache.iotdb.cluster.server.service.MetaSyncService;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfigCheck;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.ConfigurationException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.RegisterManager;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+import org.apache.iotdb.db.utils.TestOnly;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
+import static org.apache.iotdb.cluster.utils.ClusterUtils.UNKNOWN_CLIENT_IP;
+
+// we do not inherent IoTDB instance, as it may break the singleton mode of
IoTDB.
+public class ClusterIoTDB implements ClusterIoTDBMBean {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ClusterIoTDB.class);
+ private final String mbeanName =
+ String.format(
+ "%s:%s=%s", "org.apache.iotdb.cluster.service",
IoTDBConstant.JMX_TYPE, "ClusterIoTDB");
+
+ // TODO fix me: better to throw exception if the client can not be get. Then
we can remove this
+ // field.
+ public static boolean printClientConnectionErrorStack = false;
+
+ // establish the cluster as a seed
+ private static final String MODE_START = "-s";
+ // join an established cluster
+ private static final String MODE_ADD = "-a";
+ // send a request to remove a node, more arguments: ip-of-removed-node
+ // metaport-of-removed-node
+ private static final String MODE_REMOVE = "-r";
+
+ private MetaGroupMember metaGroupEngine;
Review comment:
Good catch. We didn't rename the class in this PR to avoid too many
conflicts when merging with master. We'll start a new PR for the rename work.
##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster;
+
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.ClientManager;
+import org.apache.iotdb.cluster.client.IClientManager;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
+import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.metadata.CMManager;
+import org.apache.iotdb.cluster.metadata.MetaPuller;
+import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.ClusterRPCService;
+import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
+import org.apache.iotdb.cluster.server.HardLinkCleaner;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.DataRaftService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftService;
+import org.apache.iotdb.cluster.server.service.DataGroupEngine;
+import org.apache.iotdb.cluster.server.service.DataGroupServiceImpls;
+import org.apache.iotdb.cluster.server.service.MetaAsyncService;
+import org.apache.iotdb.cluster.server.service.MetaSyncService;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfigCheck;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.ConfigurationException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.RegisterManager;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+import org.apache.iotdb.db.utils.TestOnly;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
+import static org.apache.iotdb.cluster.utils.ClusterUtils.UNKNOWN_CLIENT_IP;
+
+// we do not inherent IoTDB instance, as it may break the singleton mode of
IoTDB.
+public class ClusterIoTDB implements ClusterIoTDBMBean {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ClusterIoTDB.class);
+ private final String mbeanName =
+ String.format(
+ "%s:%s=%s", "org.apache.iotdb.cluster.service",
IoTDBConstant.JMX_TYPE, "ClusterIoTDB");
+
+ // TODO fix me: better to throw exception if the client can not be get. Then
we can remove this
+ // field.
+ public static boolean printClientConnectionErrorStack = false;
+
+ // establish the cluster as a seed
+ private static final String MODE_START = "-s";
+ // join an established cluster
+ private static final String MODE_ADD = "-a";
+ // send a request to remove a node, more arguments: ip-of-removed-node
+ // metaport-of-removed-node
+ private static final String MODE_REMOVE = "-r";
+
+ private MetaGroupMember metaGroupEngine;
+
+ // split DataGroupServiceImpls into engine and impls
+ private DataGroupEngine dataGroupEngine;
+
+ private Node thisNode;
+ private Coordinator coordinator;
+
+ private final IoTDB iotdb = IoTDB.getInstance();
+
+ // Cluster IoTDB uses a individual registerManager with its parent.
+ private RegisterManager registerManager = new RegisterManager();
+
+ /**
+ * a single thread pool, every "REPORT_INTERVAL_SEC" seconds, "reportThread"
will print the status
+ * of all raft members in this node
+ */
+ private ScheduledExecutorService reportThread;
+
+ private boolean allowReport = true;
+
+ /** hardLinkCleaner will periodically clean expired hardlinks created during
snapshots */
+ private ScheduledExecutorService hardLinkCleanerThread;
+
+ // currently, clientManager is only used for those instances who do not
belong to any
+ // DataGroup..
+ private IClientManager clientManager;
+
+ private ClusterIoTDB() {
+ // we do not init anything here, so that we can re-initialize the instance
in IT.
+ }
+
+ public void initLocalEngines() {
+ ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+ thisNode = new Node();
+ // set internal rpc ip and ports
+ thisNode.setInternalIp(config.getInternalIp());
+ thisNode.setMetaPort(config.getInternalMetaPort());
+ thisNode.setDataPort(config.getInternalDataPort());
+ // set client rpc ip and ports
+ thisNode.setClientPort(config.getClusterRpcPort());
+
thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
+ coordinator = new Coordinator();
+ // local engine
+ TProtocolFactory protocolFactory =
+ ThriftServiceThread.getProtocolFactory(
+
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
+ metaGroupEngine = new MetaGroupMember(protocolFactory, thisNode,
coordinator);
+ IoTDB.setClusterMode();
+ IoTDB.setMetaManager(CMManager.getInstance());
+ ((CMManager) IoTDB.metaManager).setMetaGroupMember(metaGroupEngine);
+ ((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
+ MetaPuller.getInstance().init(metaGroupEngine);
+
+ // from the scope of the DataGroupEngine,it should be singleton pattern
+ // the way of setting MetaGroupMember in DataGroupEngine may need a better
modification in
+ // future commit.
+ DataGroupEngine.setProtocolFactory(protocolFactory);
+ DataGroupEngine.setMetaGroupMember(metaGroupEngine);
+ dataGroupEngine = DataGroupEngine.getInstance();
+ clientManager =
+ new ClientManager(
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+ ClientManager.Type.RequestForwardClient);
+ initTasks();
+ try {
+ // we need to check config after initLocalEngines.
+ startServerCheck();
+ } catch (StartupException e) {
+ logger.error("Failed to check cluster config.", e);
+ stop();
+ }
+ JMXService.registerMBean(metaGroupEngine, metaGroupEngine.getMBeanName());
+ }
+
+ private void initTasks() {
+ reportThread =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
+ reportThread.scheduleAtFixedRate(
+ this::generateNodeReport,
+ ClusterConstant.REPORT_INTERVAL_SEC,
+ ClusterConstant.REPORT_INTERVAL_SEC,
+ TimeUnit.SECONDS);
Review comment:
Yes. I'd like to check `logger.isDebugEnabled()` here to decide whether
we need to create the thread. `allowReport` could be changed dynamically, so we
won't check it when starting the thread. @jixuan1989
##########
File path:
cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class ClientPoolFactoryTest {
+ private ClusterConfig clusterConfig =
ClusterDescriptor.getInstance().getConfig();
+
+ private long mockMaxWaitTimeoutMs = 10 * 1000L;
+ private int mockMaxClientPerMember = 10;
+
+ private int maxClientPerNodePerMember =
clusterConfig.getMaxClientPerNodePerMember();
+ private long waitClientTimeoutMS = clusterConfig.getWaitClientTimeoutMS();
+
+ private ClientPoolFactory clientPoolFactory;
+ private MockClientManager mockClientManager;
+
+ @Before
+ public void setUp() {
+ clusterConfig.setMaxClientPerNodePerMember(mockMaxClientPerMember);
+ clusterConfig.setWaitClientTimeoutMS(mockMaxWaitTimeoutMs);
+ clientPoolFactory = new ClientPoolFactory();
+ mockClientManager =
+ new MockClientManager() {
+ @Override
+ public void returnAsyncClient(
+ RaftService.AsyncClient client, Node node, ClientCategory
category) {
+ assert (client == asyncClient);
+ }
+
+ @Override
+ public void returnSyncClient(
+ RaftService.Client client, Node node, ClientCategory category) {
+ Assert.assertTrue(client == syncClient);
+ }
+ };
+ clientPoolFactory.setClientManager(mockClientManager);
+ }
+
+ @After
+ public void tearDown() {
+ clusterConfig.setMaxClientPerNodePerMember(maxClientPerNodePerMember);
+ clusterConfig.setWaitClientTimeoutMS(waitClientTimeoutMS);
+ }
+
+ @Test
+ public void poolConfigTest() throws Exception {
+ GenericKeyedObjectPool<Node, RaftService.AsyncClient> pool =
+ clientPoolFactory.createAsyncDataPool(ClientCategory.DATA);
+ Node node = constructDefaultNode();
+
+ for (int i = 0; i < mockMaxClientPerMember; i++) {
+ RaftService.AsyncClient client = pool.borrowObject(node);
+ Assert.assertNotNull(client);
+ }
+
+ long timeStart = System.currentTimeMillis();
+ try {
+ pool.borrowObject(node);
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof NoSuchElementException);
+ } finally {
+ Assert.assertTrue(System.currentTimeMillis() - timeStart + 10 >
mockMaxWaitTimeoutMs);
+ }
Review comment:
Add null check for borrowed object.
##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster;
+
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.ClientManager;
+import org.apache.iotdb.cluster.client.IClientManager;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
+import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.metadata.CMManager;
+import org.apache.iotdb.cluster.metadata.MetaPuller;
+import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.ClusterRPCService;
+import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
+import org.apache.iotdb.cluster.server.HardLinkCleaner;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.DataRaftService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftService;
+import org.apache.iotdb.cluster.server.service.DataGroupEngine;
+import org.apache.iotdb.cluster.server.service.DataGroupServiceImpls;
+import org.apache.iotdb.cluster.server.service.MetaAsyncService;
+import org.apache.iotdb.cluster.server.service.MetaSyncService;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfigCheck;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.ConfigurationException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.RegisterManager;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+import org.apache.iotdb.db.utils.TestOnly;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
+import static org.apache.iotdb.cluster.utils.ClusterUtils.UNKNOWN_CLIENT_IP;
+
+// we do not inherit the IoTDB instance, as it may break the singleton mode of
IoTDB.
+public class ClusterIoTDB implements ClusterIoTDBMBean {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ClusterIoTDB.class);
+ private final String mbeanName =
+ String.format(
+ "%s:%s=%s", "org.apache.iotdb.cluster.service",
IoTDBConstant.JMX_TYPE, "ClusterIoTDB");
+
+ // TODO fix me: better to throw exception if the client can not be get. Then
we can remove this
+ // field.
+ public static boolean printClientConnectionErrorStack = false;
+
+ // establish the cluster as a seed
+ private static final String MODE_START = "-s";
+ // join an established cluster
+ private static final String MODE_ADD = "-a";
+ // send a request to remove a node, more arguments: ip-of-removed-node
+ // metaport-of-removed-node
+ private static final String MODE_REMOVE = "-r";
+
+ private MetaGroupMember metaGroupEngine;
+
+ // split DataGroupServiceImpls into engine and impls
+ private DataGroupEngine dataGroupEngine;
+
+ private Node thisNode;
+ private Coordinator coordinator;
+
+ private final IoTDB iotdb = IoTDB.getInstance();
+
+ // Cluster IoTDB uses an individual registerManager, separate from its parent's.
+ private RegisterManager registerManager = new RegisterManager();
+
+ /**
+ * a single thread pool, every "REPORT_INTERVAL_SEC" seconds, "reportThread"
will print the status
+ * of all raft members in this node
+ */
+ private ScheduledExecutorService reportThread;
+
+ private boolean allowReport = true;
+
+ /** hardLinkCleaner will periodically clean expired hardlinks created during
snapshots */
+ private ScheduledExecutorService hardLinkCleanerThread;
+
+ // currently, clientManager is only used for those instances that do not
belong to any
+ // DataGroup.
+ private IClientManager clientManager;
+
+ private ClusterIoTDB() {
+ // we do not init anything here, so that we can re-initialize the instance
in IT.
+ }
+
+ public void initLocalEngines() {
+ ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+ thisNode = new Node();
+ // set internal rpc ip and ports
+ thisNode.setInternalIp(config.getInternalIp());
+ thisNode.setMetaPort(config.getInternalMetaPort());
+ thisNode.setDataPort(config.getInternalDataPort());
+ // set client rpc ip and ports
+ thisNode.setClientPort(config.getClusterRpcPort());
+
thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
+ coordinator = new Coordinator();
+ // local engine
+ TProtocolFactory protocolFactory =
+ ThriftServiceThread.getProtocolFactory(
+
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
+ metaGroupEngine = new MetaGroupMember(protocolFactory, thisNode,
coordinator);
+ IoTDB.setClusterMode();
+ IoTDB.setMetaManager(CMManager.getInstance());
+ ((CMManager) IoTDB.metaManager).setMetaGroupMember(metaGroupEngine);
+ ((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
+ MetaPuller.getInstance().init(metaGroupEngine);
+
+ // from the scope of the DataGroupEngine, it should follow the singleton pattern
+ // the way of setting MetaGroupMember in DataGroupEngine may need a better
modification in
+ // future commit.
+ DataGroupEngine.setProtocolFactory(protocolFactory);
+ DataGroupEngine.setMetaGroupMember(metaGroupEngine);
+ dataGroupEngine = DataGroupEngine.getInstance();
+ clientManager =
+ new ClientManager(
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+ ClientManager.Type.RequestForwardClient);
+ initTasks();
+ try {
+ // we need to check config after initLocalEngines.
+ startServerCheck();
+ } catch (StartupException e) {
+ logger.error("Failed to check cluster config.", e);
+ stop();
+ }
+ JMXService.registerMBean(metaGroupEngine, metaGroupEngine.getMBeanName());
+ }
+
+ private void initTasks() {
+ reportThread =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
+ reportThread.scheduleAtFixedRate(
+ this::generateNodeReport,
+ ClusterConstant.REPORT_INTERVAL_SEC,
+ ClusterConstant.REPORT_INTERVAL_SEC,
+ TimeUnit.SECONDS);
+ hardLinkCleanerThread =
+
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("HardLinkCleaner");
+ hardLinkCleanerThread.scheduleAtFixedRate(
+ new HardLinkCleaner(),
+ ClusterConstant.CLEAN_HARDLINK_INTERVAL_SEC,
+ ClusterConstant.CLEAN_HARDLINK_INTERVAL_SEC,
+ TimeUnit.SECONDS);
+ }
+
+ /**
+ * Generate a report containing the status of both MetaGroupMember and
DataGroupMembers of this
+ * node. This will help to see if the node is in a consistent and right
state during debugging.
+ */
+ private void generateNodeReport() {
+ if (logger.isDebugEnabled() && allowReport) {
+ try {
+ NodeReport report = new NodeReport(thisNode);
+ report.setMetaMemberReport(metaGroupEngine.genMemberReport());
+ report.setDataMemberReportList(dataGroupEngine.genMemberReports());
+ logger.debug(report.toString());
+ } catch (Exception e) {
+ logger.error("exception occurred when generating node report", e);
+ }
+ }
+ }
+
+ public static void main(String[] args) {
+ if (args.length < 1) {
+ logger.error(
+ "Usage: <-s|-a|-r> "
+ + "[-D{} <configure folder>] \n"
+ + "-s: start the node as a seed\n"
+ + "-a: start the node as a new node\n"
+ + "-r: remove the node out of the cluster\n",
+ IoTDBConstant.IOTDB_CONF);
+ return;
+ }
+
+ ClusterIoTDB cluster = ClusterIoTDBHolder.INSTANCE;
+ // check config of iotdb,and set some configs in cluster mode
+ try {
+ if (!cluster.serverCheckAndInit()) {
+ return;
+ }
+ } catch (ConfigurationException | IOException e) {
+ logger.error("meet error when doing start checking", e);
+ return;
+ }
+ String mode = args[0];
+ logger.info("Running mode {}", mode);
+
+ // initialize the current node and its services
+ cluster.initLocalEngines();
+
+ // we start IoTDB kernel first. then we start the cluster module.
+ if (MODE_START.equals(mode)) {
+ cluster.activeStartNodeMode();
+ } else if (MODE_ADD.equals(mode)) {
+ cluster.activeAddNodeMode();
+ } else if (MODE_REMOVE.equals(mode)) {
+ try {
+ cluster.doRemoveNode(args);
+ } catch (IOException e) {
+ logger.error("Fail to remove node in cluster", e);
+ }
+ } else {
+ logger.error("Unrecognized mode {}", mode);
+ }
+ }
+
+ private boolean serverCheckAndInit() throws ConfigurationException,
IOException {
+ IoTDBConfigCheck.getInstance().checkConfig();
+ // init server's configuration first, because the cluster configuration
may read settings from
+ // the server's configuration.
+ IoTDBDescriptor.getInstance().getConfig().setSyncEnable(false);
+ // auto create schema is took over by cluster module, so we disable it in
the server module.
+
IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
+ // check cluster config
+ String checkResult = clusterConfigCheck();
+ if (checkResult != null) {
+ logger.error(checkResult);
+ return false;
+ }
+ return true;
+ }
+
+ private String clusterConfigCheck() {
+ try {
+ ClusterDescriptor.getInstance().replaceHostnameWithIp();
+ } catch (Exception e) {
+ return String.format("replace hostname with ip failed, %s",
e.getMessage());
+ }
+ ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+ // check the initial replicateNum and refuse to start when the
replicateNum <= 0
+ if (config.getReplicationNum() <= 0) {
+ return String.format(
+ "ReplicateNum should be greater than 0 instead of %d.",
config.getReplicationNum());
+ }
+ // check the initial cluster size and refuse to start when the size <
quorum
+ int quorum = config.getReplicationNum() / 2 + 1;
+ if (config.getSeedNodeUrls().size() < quorum) {
+ return String.format(
+ "Seed number less than quorum, seed number: %s, quorum: " + "%s.",
+ config.getSeedNodeUrls().size(), quorum);
+ }
+ // TODO duplicate code,consider to solve it later
+ Set<Node> seedNodes = new HashSet<>();
+ for (String url : config.getSeedNodeUrls()) {
+ Node node = ClusterUtils.parseNode(url);
+ if (seedNodes.contains(node)) {
+ return String.format(
+ "SeedNodes must not repeat each other. SeedNodes: %s",
config.getSeedNodeUrls());
+ }
+ seedNodes.add(node);
+ }
+ return null;
+ }
+
+ public void activeStartNodeMode() {
+ try {
+ // start iotdb server first
+ IoTDB.getInstance().active();
+ // some work about cluster
+ preInitCluster();
+ // try to build cluster
+ metaGroupEngine.buildCluster();
+ // register service after cluster build
+ postInitCluster();
+ // init ServiceImpl to handle request of client
+ startClientRPC();
+ } catch (StartupException
+ | StartUpCheckFailureException
+ | ConfigInconsistentException
+ | QueryProcessException e) {
+ logger.error("Fail to start server", e);
+ stop();
+ }
+ }
+
+ private void preInitCluster() throws StartupException {
+ stopRaftInfoReport();
+ JMXService.registerMBean(this, mbeanName);
+ // register MetaGroupMember. MetaGroupMember has the same position with
"StorageEngine" in the
Review comment:
@jixuan1989 , any idea?
##########
File path: .github/workflows/client-go.yml
##########
@@ -13,6 +13,8 @@ on:
branches:
- master
- 'rel/*'
+ #remove me when cluster- branch is merged
Review comment:
remove?
##########
File path: .github/workflows/client.yml
##########
@@ -14,6 +14,8 @@ on:
branches:
- master
- "rel/*"
+ #remove me when cluster- branch is merged
Review comment:
ditto
##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster;
+
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.ClientManager;
+import org.apache.iotdb.cluster.client.IClientManager;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
+import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.metadata.CMManager;
+import org.apache.iotdb.cluster.metadata.MetaPuller;
+import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.ClusterRPCService;
+import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
+import org.apache.iotdb.cluster.server.HardLinkCleaner;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.DataRaftService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftService;
+import org.apache.iotdb.cluster.server.service.DataGroupEngine;
+import org.apache.iotdb.cluster.server.service.DataGroupServiceImpls;
+import org.apache.iotdb.cluster.server.service.MetaAsyncService;
+import org.apache.iotdb.cluster.server.service.MetaSyncService;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfigCheck;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.ConfigurationException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.RegisterManager;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+import org.apache.iotdb.db.utils.TestOnly;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
+import static org.apache.iotdb.cluster.utils.ClusterUtils.UNKNOWN_CLIENT_IP;
+
+// we do not inherit the IoTDB instance, as it may break the singleton mode of
IoTDB.
+public class ClusterIoTDB implements ClusterIoTDBMBean {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ClusterIoTDB.class);
+ private final String mbeanName =
+ String.format(
+ "%s:%s=%s", "org.apache.iotdb.cluster.service",
IoTDBConstant.JMX_TYPE, "ClusterIoTDB");
+
+ // TODO fix me: better to throw exception if the client can not be get. Then
we can remove this
+ // field.
+ public static boolean printClientConnectionErrorStack = false;
+
+ // establish the cluster as a seed
+ private static final String MODE_START = "-s";
+ // join an established cluster
+ private static final String MODE_ADD = "-a";
+ // send a request to remove a node, more arguments: ip-of-removed-node
+ // metaport-of-removed-node
+ private static final String MODE_REMOVE = "-r";
+
+ private MetaGroupMember metaGroupEngine;
+
+ // split DataGroupServiceImpls into engine and impls
+ private DataGroupEngine dataGroupEngine;
+
+ private Node thisNode;
+ private Coordinator coordinator;
+
+ private final IoTDB iotdb = IoTDB.getInstance();
+
+ // Cluster IoTDB uses an individual registerManager, separate from its parent's.
+ private RegisterManager registerManager = new RegisterManager();
+
+ /**
+ * a single thread pool, every "REPORT_INTERVAL_SEC" seconds, "reportThread"
will print the status
+ * of all raft members in this node
+ */
+ private ScheduledExecutorService reportThread;
+
+ private boolean allowReport = true;
+
+ /** hardLinkCleaner will periodically clean expired hardlinks created during
snapshots */
+ private ScheduledExecutorService hardLinkCleanerThread;
+
+ // currently, clientManager is only used for those instances that do not
belong to any
+ // DataGroup.
+ private IClientManager clientManager;
+
+ private ClusterIoTDB() {
+ // we do not init anything here, so that we can re-initialize the instance
in IT.
+ }
+
+ public void initLocalEngines() {
+ ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+ thisNode = new Node();
+ // set internal rpc ip and ports
+ thisNode.setInternalIp(config.getInternalIp());
+ thisNode.setMetaPort(config.getInternalMetaPort());
+ thisNode.setDataPort(config.getInternalDataPort());
+ // set client rpc ip and ports
+ thisNode.setClientPort(config.getClusterRpcPort());
+
thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
+ coordinator = new Coordinator();
+ // local engine
+ TProtocolFactory protocolFactory =
+ ThriftServiceThread.getProtocolFactory(
+
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
+ metaGroupEngine = new MetaGroupMember(protocolFactory, thisNode,
coordinator);
+ IoTDB.setClusterMode();
+ IoTDB.setMetaManager(CMManager.getInstance());
+ ((CMManager) IoTDB.metaManager).setMetaGroupMember(metaGroupEngine);
+ ((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
+ MetaPuller.getInstance().init(metaGroupEngine);
+
+ // from the scope of the DataGroupEngine, it should follow the singleton pattern
+ // the way of setting MetaGroupMember in DataGroupEngine may need a better
modification in
+ // future commit.
+ DataGroupEngine.setProtocolFactory(protocolFactory);
+ DataGroupEngine.setMetaGroupMember(metaGroupEngine);
+ dataGroupEngine = DataGroupEngine.getInstance();
+ clientManager =
+ new ClientManager(
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+ ClientManager.Type.RequestForwardClient);
+ initTasks();
+ try {
+ // we need to check config after initLocalEngines.
+ startServerCheck();
+ } catch (StartupException e) {
+ logger.error("Failed to check cluster config.", e);
+ stop();
+ }
+ JMXService.registerMBean(metaGroupEngine, metaGroupEngine.getMBeanName());
+ }
+
+ private void initTasks() {
+ reportThread =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
+ reportThread.scheduleAtFixedRate(
+ this::generateNodeReport,
+ ClusterConstant.REPORT_INTERVAL_SEC,
+ ClusterConstant.REPORT_INTERVAL_SEC,
+ TimeUnit.SECONDS);
Review comment:
After discussion, `logger.isDebugEnabled()` could also be updated via the JMX
interface. I think we won't fix this one.
##########
File path:
cluster/src/test/java/org/apache/iotdb/cluster/client/ClientManagerTest.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class ClientManagerTest extends BaseClientTest {
+
+ @Before
+ public void setUp() throws IOException {
+ startDataServer();
+ startMetaServer();
+ startDataHeartbeatServer();
+ startMetaHeartbeatServer();
+ }
+
+ @After
+ public void tearDown() throws IOException, InterruptedException {
+ stopDataServer();
+ stopMetaServer();
+ stopDataHeartbeatServer();
+ stopMetaHeartbeatServer();
+ }
+
+ @Test
+ public void syncClientManagersTest() throws Exception {
+ // ---------Sync cluster clients manager test------------
+ ClientManager clusterManager =
+ new ClientManager(false, ClientManager.Type.RequestForwardClient);
+ RaftService.Client syncClusterClient =
+ clusterManager.borrowSyncClient(defaultNode, ClientCategory.DATA);
+
+ Assert.assertNotNull(syncClusterClient);
+ Assert.assertTrue(syncClusterClient instanceof SyncDataClient);
+ Assert.assertEquals(((SyncDataClient) syncClusterClient).getNode(),
defaultNode);
+
Assert.assertTrue(syncClusterClient.getInputProtocol().getTransport().isOpen());
+ ((SyncDataClient) syncClusterClient).returnSelf();
+
+ // cluster test
+ Assert.assertNull(clusterManager.borrowSyncClient(defaultNode,
ClientCategory.DATA_HEARTBEAT));
+ Assert.assertNull(clusterManager.borrowSyncClient(defaultNode,
ClientCategory.META));
+ Assert.assertNull(clusterManager.borrowSyncClient(defaultNode,
ClientCategory.META_HEARTBEAT));
+
+ Assert.assertNull(clusterManager.borrowAsyncClient(defaultNode,
ClientCategory.DATA));
+ Assert.assertNull(clusterManager.borrowAsyncClient(defaultNode,
ClientCategory.DATA_HEARTBEAT));
+ Assert.assertNull(clusterManager.borrowAsyncClient(defaultNode,
ClientCategory.META));
+ Assert.assertNull(clusterManager.borrowAsyncClient(defaultNode,
ClientCategory.META_HEARTBEAT));
+
+ // ---------Sync meta(meta heartbeat) clients manager test------------
+ ClientManager metaManager = new ClientManager(false,
ClientManager.Type.MetaGroupClient);
+ RaftService.Client metaClient = metaManager.borrowSyncClient(defaultNode,
ClientCategory.META);
+ Assert.assertNotNull(metaClient);
+ Assert.assertTrue(metaClient instanceof SyncMetaClient);
+ Assert.assertEquals(((SyncMetaClient) metaClient).getNode(), defaultNode);
+ Assert.assertTrue(metaClient.getInputProtocol().getTransport().isOpen());
+ ((SyncMetaClient) metaClient).returnSelf();
+
+ RaftService.Client metaHeartClient =
+ metaManager.borrowSyncClient(defaultNode,
ClientCategory.META_HEARTBEAT);
+ Assert.assertNotNull(metaHeartClient);
+ Assert.assertTrue(metaHeartClient instanceof SyncMetaClient);
+ Assert.assertEquals(((SyncMetaClient) metaHeartClient).getNode(),
defaultNode);
+
Assert.assertTrue(metaHeartClient.getInputProtocol().getTransport().isOpen());
+ ((SyncMetaClient) metaHeartClient).returnSelf();
+
+ // cluster test
+ Assert.assertNull(metaManager.borrowSyncClient(defaultNode,
ClientCategory.DATA));
+ Assert.assertNull(metaManager.borrowSyncClient(defaultNode,
ClientCategory.DATA_HEARTBEAT));
+
+ Assert.assertNull(metaManager.borrowAsyncClient(defaultNode,
ClientCategory.DATA));
+ Assert.assertNull(metaManager.borrowAsyncClient(defaultNode,
ClientCategory.DATA_HEARTBEAT));
+ Assert.assertNull(metaManager.borrowAsyncClient(defaultNode,
ClientCategory.META));
+ Assert.assertNull(metaManager.borrowAsyncClient(defaultNode,
ClientCategory.META_HEARTBEAT));
+
+ // ---------Sync data(data heartbeat) clients manager test------------
+ ClientManager dataManager = new ClientManager(false,
ClientManager.Type.DataGroupClient);
+
+ RaftService.Client dataClient = dataManager.borrowSyncClient(defaultNode,
ClientCategory.DATA);
+ Assert.assertNotNull(dataClient);
+ Assert.assertTrue(dataClient instanceof SyncDataClient);
+ Assert.assertEquals(((SyncDataClient) dataClient).getNode(), defaultNode);
+ Assert.assertTrue(dataClient.getInputProtocol().getTransport().isOpen());
+ ((SyncDataClient) dataClient).returnSelf();
+
+ RaftService.Client dataHeartClient =
+ dataManager.borrowSyncClient(defaultNode,
ClientCategory.DATA_HEARTBEAT);
+ Assert.assertNotNull(dataHeartClient);
+ Assert.assertTrue(dataHeartClient instanceof SyncDataClient);
+ Assert.assertEquals(((SyncDataClient) dataHeartClient).getNode(),
defaultNode);
+
Assert.assertTrue(dataHeartClient.getInputProtocol().getTransport().isOpen());
+ ((SyncDataClient) dataHeartClient).returnSelf();
+
+ // cluster test
+ Assert.assertNull(dataManager.borrowSyncClient(defaultNode,
ClientCategory.META));
+ Assert.assertNull(dataManager.borrowSyncClient(defaultNode,
ClientCategory.META_HEARTBEAT));
+
+ Assert.assertNull(dataManager.borrowAsyncClient(defaultNode,
ClientCategory.DATA));
+ Assert.assertNull(dataManager.borrowAsyncClient(defaultNode,
ClientCategory.DATA_HEARTBEAT));
+ Assert.assertNull(dataManager.borrowAsyncClient(defaultNode,
ClientCategory.META));
+ Assert.assertNull(dataManager.borrowAsyncClient(defaultNode,
ClientCategory.META_HEARTBEAT));
+ }
+
+ @Test
+ public void asyncClientManagersTest() throws Exception {
+ // ---------async cluster clients manager test------------
+ ClientManager clusterManager = new ClientManager(true,
ClientManager.Type.RequestForwardClient);
+ RaftService.AsyncClient clusterClient =
+ clusterManager.borrowAsyncClient(defaultNode, ClientCategory.DATA);
+
+ Assert.assertNotNull(clusterClient);
+ Assert.assertTrue(clusterClient instanceof AsyncDataClient);
+ Assert.assertEquals(((AsyncDataClient) clusterClient).getNode(),
defaultNode);
+ Assert.assertTrue(((AsyncDataClient) clusterClient).isValid());
+ Assert.assertTrue(((AsyncDataClient) clusterClient).isReady());
Review comment:
There is no interface to invalidate an async client, and I think the case
is covered by verifying that all clients obtained from the manager are valid. Could we
skip the branch test? What do you think?
##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster;
+
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.ClientManager;
+import org.apache.iotdb.cluster.client.IClientManager;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
+import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.metadata.CMManager;
+import org.apache.iotdb.cluster.metadata.MetaPuller;
+import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.ClusterRPCService;
+import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
+import org.apache.iotdb.cluster.server.HardLinkCleaner;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.DataRaftService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftService;
+import org.apache.iotdb.cluster.server.service.DataGroupEngine;
+import org.apache.iotdb.cluster.server.service.DataGroupServiceImpls;
+import org.apache.iotdb.cluster.server.service.MetaAsyncService;
+import org.apache.iotdb.cluster.server.service.MetaSyncService;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfigCheck;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.ConfigurationException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.RegisterManager;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+import org.apache.iotdb.db.utils.TestOnly;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
+import static org.apache.iotdb.cluster.utils.ClusterUtils.UNKNOWN_CLIENT_IP;
+
+// we do not inherit the IoTDB instance, as doing so could break IoTDB's
singleton pattern.
+public class ClusterIoTDB implements ClusterIoTDBMBean {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ClusterIoTDB.class);
+ private final String mbeanName =
+ String.format(
+ "%s:%s=%s", "org.apache.iotdb.cluster.service",
IoTDBConstant.JMX_TYPE, "ClusterIoTDB");
+
+ // TODO fix me: better to throw exception if the client can not be get. Then
we can remove this
+ // field.
+ public static boolean printClientConnectionErrorStack = false;
+
+ // establish the cluster as a seed
+ private static final String MODE_START = "-s";
+ // join an established cluster
+ private static final String MODE_ADD = "-a";
+ // send a request to remove a node, more arguments: ip-of-removed-node
+ // metaport-of-removed-node
+ private static final String MODE_REMOVE = "-r";
+
+ private MetaGroupMember metaGroupEngine;
+
+ // split DataGroupServiceImpls into engine and impls
+ private DataGroupEngine dataGroupEngine;
+
+ private Node thisNode;
+ private Coordinator coordinator;
+
+ private final IoTDB iotdb = IoTDB.getInstance();
+
+ // Cluster IoTDB uses an individual registerManager, separate from its parent's.
+ private RegisterManager registerManager = new RegisterManager();
+
+ /**
+ * a single thread pool, every "REPORT_INTERVAL_SEC" seconds, "reportThread"
will print the status
+ * of all raft members in this node
+ */
+ private ScheduledExecutorService reportThread;
+
+ private boolean allowReport = true;
+
+ /** hardLinkCleaner will periodically clean expired hardlinks created during
snapshots */
+ private ScheduledExecutorService hardLinkCleanerThread;
+
+ // currently, clientManager is only used for those instances who do not
belong to any
+ // DataGroup..
+ private IClientManager clientManager;
+
+ private ClusterIoTDB() {
+ // Private singleton constructor (see ClusterIoTDBHolder.INSTANCE). Kept
+ // deliberately empty so integration tests can re-initialize the instance.
+ }
+
+ public void initLocalEngines() {
+ ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+ thisNode = new Node();
+ // set internal rpc ip and ports
+ thisNode.setInternalIp(config.getInternalIp());
+ thisNode.setMetaPort(config.getInternalMetaPort());
+ thisNode.setDataPort(config.getInternalDataPort());
+ // set client rpc ip and ports
+ thisNode.setClientPort(config.getClusterRpcPort());
+
thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
+ coordinator = new Coordinator();
+ // local engine
+ TProtocolFactory protocolFactory =
+ ThriftServiceThread.getProtocolFactory(
+
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
+ metaGroupEngine = new MetaGroupMember(protocolFactory, thisNode,
coordinator);
+ IoTDB.setClusterMode();
+ IoTDB.setMetaManager(CMManager.getInstance());
+ ((CMManager) IoTDB.metaManager).setMetaGroupMember(metaGroupEngine);
+ ((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
+ MetaPuller.getInstance().init(metaGroupEngine);
+
+ // from the scope of the DataGroupEngine,it should be singleton pattern
+ // the way of setting MetaGroupMember in DataGroupEngine may need a better
modification in
+ // future commit.
+ DataGroupEngine.setProtocolFactory(protocolFactory);
+ DataGroupEngine.setMetaGroupMember(metaGroupEngine);
+ dataGroupEngine = DataGroupEngine.getInstance();
+ clientManager =
+ new ClientManager(
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+ ClientManager.Type.RequestForwardClient);
+ initTasks();
+ try {
+ // we need to check config after initLocalEngines.
+ startServerCheck();
+ } catch (StartupException e) {
+ logger.error("Failed to check cluster config.", e);
+ stop();
+ }
+ JMXService.registerMBean(metaGroupEngine, metaGroupEngine.getMBeanName());
+ }
+
+ private void initTasks() {
+ reportThread =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
+ reportThread.scheduleAtFixedRate(
+ this::generateNodeReport,
+ ClusterConstant.REPORT_INTERVAL_SEC,
+ ClusterConstant.REPORT_INTERVAL_SEC,
+ TimeUnit.SECONDS);
+ hardLinkCleanerThread =
+
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("HardLinkCleaner");
+ hardLinkCleanerThread.scheduleAtFixedRate(
+ new HardLinkCleaner(),
+ ClusterConstant.CLEAN_HARDLINK_INTERVAL_SEC,
+ ClusterConstant.CLEAN_HARDLINK_INTERVAL_SEC,
+ TimeUnit.SECONDS);
+ }
+
+ /**
+ * Generate a report containing the status of both MetaGroupMember and
DataGroupMembers of this
+ * node. This will help to see if the node is in a consistent and right
state during debugging.
+ */
+ private void generateNodeReport() {
+ if (logger.isDebugEnabled() && allowReport) {
+ try {
+ NodeReport report = new NodeReport(thisNode);
+ report.setMetaMemberReport(metaGroupEngine.genMemberReport());
+ report.setDataMemberReportList(dataGroupEngine.genMemberReports());
+ logger.debug(report.toString());
+ } catch (Exception e) {
+ logger.error("exception occurred when generating node report", e);
+ }
+ }
+ }
+
+ public static void main(String[] args) {
+ if (args.length < 1) {
+ logger.error(
+ "Usage: <-s|-a|-r> "
+ + "[-D{} <configure folder>] \n"
+ + "-s: start the node as a seed\n"
+ + "-a: start the node as a new node\n"
+ + "-r: remove the node out of the cluster\n",
+ IoTDBConstant.IOTDB_CONF);
+ return;
+ }
+
+ ClusterIoTDB cluster = ClusterIoTDBHolder.INSTANCE;
+ // check config of iotdb,and set some configs in cluster mode
+ try {
+ if (!cluster.serverCheckAndInit()) {
+ return;
+ }
+ } catch (ConfigurationException | IOException e) {
+ logger.error("meet error when doing start checking", e);
+ return;
+ }
+ String mode = args[0];
+ logger.info("Running mode {}", mode);
+
+ // initialize the current node and its services
+ cluster.initLocalEngines();
+
+ // we start IoTDB kernel first. then we start the cluster module.
+ if (MODE_START.equals(mode)) {
+ cluster.activeStartNodeMode();
+ } else if (MODE_ADD.equals(mode)) {
+ cluster.activeAddNodeMode();
+ } else if (MODE_REMOVE.equals(mode)) {
+ try {
+ cluster.doRemoveNode(args);
+ } catch (IOException e) {
+ logger.error("Fail to remove node in cluster", e);
+ }
+ } else {
+ logger.error("Unrecognized mode {}", mode);
+ }
+ }
+
+ private boolean serverCheckAndInit() throws ConfigurationException,
IOException {
+ IoTDBConfigCheck.getInstance().checkConfig();
+ // init server's configuration first, because the cluster configuration
may read settings from
+ // the server's configuration.
+ IoTDBDescriptor.getInstance().getConfig().setSyncEnable(false);
+ // auto create schema is took over by cluster module, so we disable it in
the server module.
+
IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
+ // check cluster config
+ String checkResult = clusterConfigCheck();
+ if (checkResult != null) {
+ logger.error(checkResult);
+ return false;
+ }
+ return true;
+ }
+
+ private String clusterConfigCheck() {
+ try {
+ ClusterDescriptor.getInstance().replaceHostnameWithIp();
+ } catch (Exception e) {
+ return String.format("replace hostname with ip failed, %s",
e.getMessage());
+ }
+ ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+ // check the initial replicateNum and refuse to start when the
replicateNum <= 0
+ if (config.getReplicationNum() <= 0) {
+ return String.format(
+ "ReplicateNum should be greater than 0 instead of %d.",
config.getReplicationNum());
+ }
+ // check the initial cluster size and refuse to start when the size <
quorum
+ int quorum = config.getReplicationNum() / 2 + 1;
+ if (config.getSeedNodeUrls().size() < quorum) {
+ return String.format(
+ "Seed number less than quorum, seed number: %s, quorum: " + "%s.",
+ config.getSeedNodeUrls().size(), quorum);
+ }
+ // TODO duplicate code,consider to solve it later
+ Set<Node> seedNodes = new HashSet<>();
+ for (String url : config.getSeedNodeUrls()) {
+ Node node = ClusterUtils.parseNode(url);
+ if (seedNodes.contains(node)) {
+ return String.format(
+ "SeedNodes must not repeat each other. SeedNodes: %s",
config.getSeedNodeUrls());
+ }
+ seedNodes.add(node);
+ }
+ return null;
+ }
+
+ public void activeStartNodeMode() {
+ try {
+ // start iotdb server first
+ IoTDB.getInstance().active();
+ // some work about cluster
+ preInitCluster();
+ // try to build cluster
+ metaGroupEngine.buildCluster();
+ // register service after cluster build
+ postInitCluster();
+ // init ServiceImpl to handle request of client
+ startClientRPC();
+ } catch (StartupException
+ | StartUpCheckFailureException
+ | ConfigInconsistentException
+ | QueryProcessException e) {
+ logger.error("Fail to start server", e);
+ stop();
+ }
+ }
+
+ private void preInitCluster() throws StartupException {
+ stopRaftInfoReport();
+ JMXService.registerMBean(this, mbeanName);
+ // register MetaGroupMember. MetaGroupMember has the same position with
"StorageEngine" in the
Review comment:
Won't fix.
##########
File path:
cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class ClientPoolFactoryTest {
+ private ClusterConfig clusterConfig =
ClusterDescriptor.getInstance().getConfig();
+
+ private long mockMaxWaitTimeoutMs = 10 * 1000L;
+ private int mockMaxClientPerMember = 10;
+
+ private int maxClientPerNodePerMember =
clusterConfig.getMaxClientPerNodePerMember();
+ private long waitClientTimeoutMS = clusterConfig.getWaitClientTimeoutMS();
+
+ private ClientPoolFactory clientPoolFactory;
+ private MockClientManager mockClientManager;
+
+ @Before
+ public void setUp() {
+ clusterConfig.setMaxClientPerNodePerMember(mockMaxClientPerMember);
+ clusterConfig.setWaitClientTimeoutMS(mockMaxWaitTimeoutMs);
+ clientPoolFactory = new ClientPoolFactory();
+ mockClientManager =
+ new MockClientManager() {
+ @Override
+ public void returnAsyncClient(
+ RaftService.AsyncClient client, Node node, ClientCategory
category) {
+ assert (client == asyncClient);
+ }
+
+ @Override
+ public void returnSyncClient(
+ RaftService.Client client, Node node, ClientCategory category) {
+ Assert.assertTrue(client == syncClient);
+ }
+ };
+ clientPoolFactory.setClientManager(mockClientManager);
+ }
+
+ @After
+ public void tearDown() {
+ clusterConfig.setMaxClientPerNodePerMember(maxClientPerNodePerMember);
+ clusterConfig.setWaitClientTimeoutMS(waitClientTimeoutMS);
+ }
+
+ @Test
+ public void poolConfigTest() throws Exception {
+ GenericKeyedObjectPool<Node, RaftService.AsyncClient> pool =
+ clientPoolFactory.createAsyncDataPool(ClientCategory.DATA);
+ Node node = constructDefaultNode();
+
+ for (int i = 0; i < mockMaxClientPerMember; i++) {
+ RaftService.AsyncClient client = pool.borrowObject(node);
+ Assert.assertNotNull(client);
+ }
+
+ long timeStart = System.currentTimeMillis();
+ try {
+ pool.borrowObject(node);
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof NoSuchElementException);
+ } finally {
+ Assert.assertTrue(System.currentTimeMillis() - timeStart + 10 >
mockMaxWaitTimeoutMs);
+ }
+ }
+
+ @Test
+ public void poolRecycleTest() throws Exception {
+ GenericKeyedObjectPool<Node, RaftService.AsyncClient> pool =
+ clientPoolFactory.createAsyncDataPool(ClientCategory.DATA);
+
+ Node node = constructDefaultNode();
+ List<RaftService.AsyncClient> clientList = new ArrayList<>();
+ for (int i = 0; i < pool.getMaxIdlePerKey(); i++) {
+ RaftService.AsyncClient client = pool.borrowObject(node);
+ Assert.assertNotNull(client);
+ clientList.add(client);
+ }
+
+ for (RaftService.AsyncClient client : clientList) {
+ pool.returnObject(node, client);
+ }
+
+ for (int i = 0; i < pool.getMaxIdlePerKey(); i++) {
+ RaftService.AsyncClient client = pool.borrowObject(node);
+ Assert.assertNotNull(client);
+ Assert.assertTrue(clientList.contains(client));
+ }
+ }
Review comment:
Yes, the idle ones will be destroyed by the eviction thread periodically.
##########
File path:
cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
##########
@@ -44,23 +45,27 @@
@Before
public void setUp() throws Exception {
initConfigs();
- metaServer = new MetaClusterServer();
- metaServer.start();
- metaServer.buildCluster();
+ daemon = ClusterIoTDB.getInstance();
+ daemon.initLocalEngines();
+ DataGroupEngine.getInstance().resetFactory();
+ daemon.activeStartNodeMode();
}
@After
public void tearDown() throws Exception {
- metaServer.stop();
+ // TODO fixme
+ daemon.stop();
Review comment:
Removed the comment; it was only left during development as a marker. We
can now pass all UT cases, so it is no longer needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]