fanhualta commented on a change in pull request #3191:
URL: https://github.com/apache/iotdb/pull/3191#discussion_r648496396
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
##########
@@ -304,6 +311,74 @@ private TSStatus forwardPlan(List<PartitionGroup>
partitionGroups, PhysicalPlan
return status;
}
+ public void sendLogToAllDataGroups(Log log) throws ChangeMembershipException
{
+ if (logger.isDebugEnabled()) {
+ logger.debug("Send log {} to all data groups: start", log);
+ }
+
+ Map<PhysicalPlan, PartitionGroup> planGroupMap =
router.splitAndRouteChangeMembershipLog(log);
+ List<String> errorCodePartitionGroups = new CopyOnWriteArrayList<>();
+ CountDownLatch counter = new CountDownLatch(planGroupMap.size());
+ for (Map.Entry<PhysicalPlan, PartitionGroup> entry :
planGroupMap.entrySet()) {
+ metaGroupMember
+ .getAppendLogThreadPool()
+ .submit(() -> forwardChangeMembershipPlan(log, entry,
errorCodePartitionGroups, counter));
+ }
+ try {
+ counter.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ChangeMembershipException(
+ String.format("Can not wait all data groups to apply %s", log));
+ }
+ if (!errorCodePartitionGroups.isEmpty()) {
+ throw new ChangeMembershipException(
+ String.format("Apply %s failed with status {%s}", log,
errorCodePartitionGroups));
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Send log {} to all data groups: end", log);
+ }
+ }
+
+ private void forwardChangeMembershipPlan(
+ Log log,
+ Map.Entry<PhysicalPlan, PartitionGroup> entry,
+ List<String> errorCodePartitionGroups,
+ CountDownLatch counter) {
+ int retryTime = 0;
+ try {
+ while (true) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Send change membership log {} to data group {}, retry time: {}",
+ log,
+ entry.getValue(),
+ retryTime);
+ }
+ try {
+ TSStatus status = forwardToSingleGroup(entry);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode())
{
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Success to send change membership log {} to data group {}",
+ log,
+ entry.getValue());
+ }
+ return;
+ }
+ Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ errorCodePartitionGroups.add(e.getMessage());
+ return;
+ }
+ retryTime++;
+ }
+ } finally {
+ counter.countDown();
+ }
+ }
Review comment:
Good suggestion. I have added timeout logic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]