This is an automated email from the ASF dual-hosted git repository.
xyao pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new 138d33e HDDS-3188 Add failover proxy for SCM block location. (#1340)
138d33e is described below
commit 138d33ec0c6a0385f6a59f42fadaaec079ddb6c0
Author: Li Cheng <[email protected]>
AuthorDate: Thu Oct 1 00:27:09 2020 +0800
HDDS-3188 Add failover proxy for SCM block location. (#1340)
---
.../hadoop/hdds/scm/exceptions/SCMException.java | 3 +-
...lockLocationProtocolClientSideTranslatorPB.java | 25 +-
.../SCMBlockLocationFailoverProxyProvider.java | 280 +++++++++++++++++++++
.../hadoop/hdds/scm/proxy/SCMClientConfig.java | 103 ++++++++
.../apache/hadoop/hdds/scm/proxy/SCMProxyInfo.java | 73 ++++++
.../apache/hadoop/hdds/scm/proxy/package-info.java | 22 ++
.../src/main/proto/ScmServerProtocol.proto | 3 +
...lockLocationProtocolServerSideTranslatorPB.java | 18 ++
.../hdds/scm/server/SCMBlockProtocolServer.java | 4 +
.../hdds/scm/server/StorageContainerManager.java | 19 ++
.../org/apache/hadoop/ozone/om/OzoneManager.java | 11 +-
11 files changed, 545 insertions(+), 16 deletions(-)
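
For readers following along, here is a minimal, illustrative sketch (not part of the commit) of how the pieces added below fit together on the client side. The class and method names come from the diff; the wrapper class and its name are made up for the example, and OzoneManager does the equivalent wiring internally.

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;

/** Hypothetical wiring sketch; not shipped with this change. */
public final class ScmBlockClientWiringSketch {
  public static void main(String[] args) throws Exception {
    OzoneConfiguration conf = new OzoneConfiguration();
    // The provider reads the SCM addresses (ozone.scm.names / block client
    // address) from the configuration and builds one proxy slot per SCM node.
    SCMBlockLocationFailoverProxyProvider proxyProvider =
        new SCMBlockLocationFailoverProxyProvider(conf);
    // The translator wraps the provider in a RetryProxy, so calls fail over
    // to another SCM (or to the reported leader) on SCM_NOT_LEADER.
    ScmBlockLocationProtocolClientSideTranslatorPB scmBlockClient =
        new ScmBlockLocationProtocolClientSideTranslatorPB(proxyProvider);
    // ... use scmBlockClient ...
    scmBlockClient.close(); // closes all cached RPC proxies via the provider
  }
}
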
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
index db1f82a..11b7b3c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
@@ -122,6 +122,7 @@ public class SCMException extends IOException {
FAILED_TO_FIND_ACTIVE_PIPELINE,
FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY,
FAILED_TO_ALLOCATE_ENOUGH_BLOCKS,
- INTERNAL_ERROR
+ INTERNAL_ERROR,
+ SCM_NOT_LEADER
}
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index e86ee81..12c51f6 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Type;
@@ -45,10 +46,11 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
-import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
@@ -73,15 +75,21 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
private static final RpcController NULL_RPC_CONTROLLER = null;
private final ScmBlockLocationProtocolPB rpcProxy;
+ private SCMBlockLocationFailoverProxyProvider failoverProxyProvider;
/**
* Creates a new StorageContainerLocationProtocolClientSideTranslatorPB.
*
- * @param rpcProxy {@link StorageContainerLocationProtocolPB} RPC proxy
+ * @param proxyProvider {@link SCMBlockLocationFailoverProxyProvider}
+ * failover proxy provider.
*/
public ScmBlockLocationProtocolClientSideTranslatorPB(
- ScmBlockLocationProtocolPB rpcProxy) {
- this.rpcProxy = rpcProxy;
+ SCMBlockLocationFailoverProxyProvider proxyProvider) {
+ Preconditions.checkState(proxyProvider != null);
+ this.failoverProxyProvider = proxyProvider;
+ this.rpcProxy = (ScmBlockLocationProtocolPB) RetryProxy.create(
+ ScmBlockLocationProtocolPB.class, failoverProxyProvider,
+ failoverProxyProvider.getSCMBlockLocationRetryPolicy(null));
}
/**
@@ -105,6 +113,11 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
try {
SCMBlockLocationResponse response =
rpcProxy.send(NULL_RPC_CONTROLLER, req);
+ if (response.getStatus() ==
+ ScmBlockLocationProtocolProtos.Status.SCM_NOT_LEADER) {
+ failoverProxyProvider
+ .performFailoverToAssignedLeader(response.getLeaderSCMNodeId());
+ }
return response;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
@@ -267,7 +280,7 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
}
@Override
- public void close() {
- RPC.stopProxy(rpcProxy);
+ public void close() throws IOException {
+ failoverProxyProvider.close();
}
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
new file mode 100644
index 0000000..1beb69e
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.proxy;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY;
+import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
+import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
+import static org.apache.hadoop.hdds.HddsUtils.getHostName;
+
+/**
+ * Failover proxy provider for SCM.
+ */
+public class SCMBlockLocationFailoverProxyProvider implements
+ FailoverProxyProvider<ScmBlockLocationProtocolPB>, Closeable {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(SCMBlockLocationFailoverProxyProvider.class);
+
+ private Map<String, ProxyInfo<ScmBlockLocationProtocolPB>> scmProxies;
+ private Map<String, SCMProxyInfo> scmProxyInfoMap;
+ private List<String> scmNodeIDList;
+
+ private String currentProxySCMNodeId;
+ private int currentProxyIndex;
+
+ private final ConfigurationSource conf;
+ private final long scmVersion;
+
+ private final String scmServiceId;
+
+ private String lastAttemptedLeader;
+
+ private final int maxRetryCount;
+ private final long retryInterval;
+
+ public static final String SCM_DUMMY_NODEID_PREFIX = "scm";
+
+ public SCMBlockLocationFailoverProxyProvider(ConfigurationSource conf) {
+ this.conf = conf;
+ this.scmVersion = RPC.getProtocolVersion(ScmBlockLocationProtocol.class);
+ this.scmServiceId = conf.getTrimmed(OZONE_SCM_SERVICE_IDS_KEY);
+ this.scmProxies = new HashMap<>();
+ this.scmProxyInfoMap = new HashMap<>();
+ this.scmNodeIDList = new ArrayList<>();
+ loadConfigs();
+
+
+ this.currentProxyIndex = 0;
+ currentProxySCMNodeId = scmNodeIDList.get(currentProxyIndex);
+
+ SCMClientConfig config = conf.getObject(SCMClientConfig.class);
+ this.maxRetryCount = config.getRetryCount();
+ this.retryInterval = config.getRetryInterval();
+ }
+
+ @VisibleForTesting
+ protected Collection<InetSocketAddress> getSCMAddressList() {
+ Collection<String> scmAddressList =
+ conf.getTrimmedStringCollection(OZONE_SCM_NAMES);
+ Collection<InetSocketAddress> resultList = new ArrayList<>();
+ if (!scmAddressList.isEmpty()) {
+ final int port = getPortNumberFromConfigKeys(conf,
+ ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY)
+ .orElse(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT);
+ for (String scmAddress : scmAddressList) {
+ LOG.info("SCM Address for proxy is {}", scmAddress);
+
+ Optional<String> hostname = getHostName(scmAddress);
+ if (hostname.isPresent()) {
+ resultList.add(NetUtils.createSocketAddr(
+ hostname.get() + ":" + port));
+ }
+ }
+ }
+ if (resultList.isEmpty()) {
+ // fall back
+ resultList.add(getScmAddressForBlockClients(conf));
+ }
+ return resultList;
+ }
+
+ private void loadConfigs() {
+ Collection<InetSocketAddress> scmAddressList = getSCMAddressList();
+ int scmNodeIndex = 1;
+ for (InetSocketAddress scmAddress : scmAddressList) {
+ String nodeId = SCM_DUMMY_NODEID_PREFIX + scmNodeIndex;
+ if (scmAddress == null) {
+ LOG.error("Failed to create SCM proxy for {}.", nodeId);
+ continue;
+ }
+ scmNodeIndex++;
+ SCMProxyInfo scmProxyInfo = new SCMProxyInfo(
+ scmServiceId, nodeId, scmAddress);
+ ProxyInfo<ScmBlockLocationProtocolPB> proxy = new ProxyInfo<>(
+ null, scmProxyInfo.toString());
+ scmProxies.put(nodeId, proxy);
+ scmProxyInfoMap.put(nodeId, scmProxyInfo);
+ scmNodeIDList.add(nodeId);
+ }
+
+ if (scmProxies.isEmpty()) {
+ throw new IllegalArgumentException("Could not find any configured " +
+ "addresses for SCM. Please configure the system with "
+ + OZONE_SCM_NAMES);
+ }
+ }
+
+ @VisibleForTesting
+ public synchronized String getCurrentProxyOMNodeId() {
+ return currentProxySCMNodeId;
+ }
+
+ @Override
+ public synchronized ProxyInfo getProxy() {
+ ProxyInfo currentProxyInfo = scmProxies.get(currentProxySCMNodeId);
+ createSCMProxyIfNeeded(currentProxyInfo, currentProxySCMNodeId);
+ return currentProxyInfo;
+ }
+
+ @Override
+ public void performFailover(ScmBlockLocationProtocolPB newLeader) {
+ // Should do nothing here.
+ LOG.debug("Failing over to next proxy. {}", getCurrentProxyOMNodeId());
+ }
+
+ public void performFailoverToAssignedLeader(String newLeader) {
+ if (newLeader == null) {
+ // If newLeader is not assigned, it will fail over to next proxy.
+ nextProxyIndex();
+ } else {
+ if (!assignLeaderToNode(newLeader)) {
+ LOG.debug("Failing over OM proxy to nodeId: {}", newLeader);
+ nextProxyIndex();
+ }
+ }
+ }
+
+ @Override
+ public Class<ScmBlockLocationProtocolPB> getInterface() {
+ return ScmBlockLocationProtocolPB.class;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ for (ProxyInfo<ScmBlockLocationProtocolPB> proxy : scmProxies.values()) {
+ ScmBlockLocationProtocolPB scmProxy = proxy.proxy;
+ if (scmProxy != null) {
+ RPC.stopProxy(scmProxy);
+ }
+ }
+ }
+
+ public RetryAction getRetryAction(int failovers) {
+ if (failovers < maxRetryCount) {
+ return new RetryAction(RetryAction.RetryDecision.FAILOVER_AND_RETRY,
+ getRetryInterval());
+ } else {
+ return RetryAction.FAIL;
+ }
+ }
+
+ private synchronized long getRetryInterval() {
+ // TODO add exponential backup
+ return retryInterval;
+ }
+
+ private synchronized int nextProxyIndex() {
+ lastAttemptedLeader = currentProxySCMNodeId;
+
+ // round robin the next proxy
+ currentProxyIndex = (currentProxyIndex + 1) % scmProxies.size();
+ currentProxySCMNodeId = scmNodeIDList.get(currentProxyIndex);
+ return currentProxyIndex;
+ }
+
+ private synchronized boolean assignLeaderToNode(String newLeaderNodeId) {
+ if (!currentProxySCMNodeId.equals(newLeaderNodeId)) {
+ if (scmProxies.containsKey(newLeaderNodeId)) {
+ lastAttemptedLeader = currentProxySCMNodeId;
+ currentProxySCMNodeId = newLeaderNodeId;
+ currentProxyIndex = scmNodeIDList.indexOf(currentProxySCMNodeId);
+ return true;
+ }
+ } else {
+ lastAttemptedLeader = currentProxySCMNodeId;
+ }
+ return false;
+ }
+
+ /**
+ * Creates proxy object if it does not already exist.
+ */
+ private void createSCMProxyIfNeeded(ProxyInfo proxyInfo,
+ String nodeId) {
+ if (proxyInfo.proxy == null) {
+ InetSocketAddress address = scmProxyInfoMap.get(nodeId).getAddress();
+ try {
+ ScmBlockLocationProtocolPB proxy = createSCMProxy(address);
+ try {
+ proxyInfo.proxy = proxy;
+ } catch (IllegalAccessError iae) {
+ scmProxies.put(nodeId,
+ new ProxyInfo<>(proxy, proxyInfo.proxyInfo));
+ }
+ } catch (IOException ioe) {
+ LOG.error("{} Failed to create RPC proxy to SCM at {}",
+ this.getClass().getSimpleName(), address, ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+ }
+
+ private ScmBlockLocationProtocolPB createSCMProxy(
+ InetSocketAddress scmAddress) throws IOException {
+ Configuration hadoopConf =
+ LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
+ RPC.setProtocolEngine(hadoopConf, ScmBlockLocationProtocol.class,
+ ProtobufRpcEngine.class);
+ return RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
+ scmAddress, UserGroupInformation.getCurrentUser(), hadoopConf,
+ NetUtils.getDefaultSocketFactory(hadoopConf),
+ (int)conf.getObject(SCMClientConfig.class).getRpcTimeOut());
+ }
+
+ public RetryPolicy getSCMBlockLocationRetryPolicy(String newLeader) {
+ RetryPolicy retryPolicy = new RetryPolicy() {
+ @Override
+ public RetryAction shouldRetry(Exception e, int retry,
+ int failover, boolean b) {
+ performFailoverToAssignedLeader(newLeader);
+ return getRetryAction(failover);
+ }
+ };
+ return retryPolicy;
+ }
+}
+
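
As a quick reference, the retry policy built by getSCMBlockLocationRetryPolicy above boils down to "fail over and retry with a fixed delay until the failover budget is used up". A simplified, standalone sketch of the same decision logic follows; the class name and parameters are illustrative only, and the real values come from SCMClientConfig at runtime.

import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;

/** Illustrative equivalent of the policy returned by getSCMBlockLocationRetryPolicy. */
public final class ScmFailoverRetryPolicySketch {
  public static RetryPolicy build(final int maxRetryCount, final long retryIntervalMs) {
    return new RetryPolicy() {
      @Override
      public RetryAction shouldRetry(Exception e, int retries, int failovers,
          boolean isIdempotentOrAtMostOnce) {
        // Keep failing over (round robin, or straight to the leader named in a
        // SCM_NOT_LEADER response) until the failover budget is exhausted.
        if (failovers < maxRetryCount) {
          return new RetryAction(
              RetryAction.RetryDecision.FAILOVER_AND_RETRY, retryIntervalMs);
        }
        return RetryAction.FAIL;
      }
    };
  }
}
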
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMClientConfig.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMClientConfig.java
new file mode 100644
index 0000000..99dc446
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMClientConfig.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.proxy;
+
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigType;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.conf.ConfigTag.CLIENT;
+import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
+
+/**
+ * Config for SCM Block Client.
+ */
+@ConfigGroup(prefix = "hdds.scmclient")
+public class SCMClientConfig {
+ public static final String SCM_CLIENT_RPC_TIME_OUT = "rpc.timeout";
+ public static final String SCM_CLIENT_FAILOVER_MAX_RETRY =
+ "failover.max.retry";
+ public static final String SCM_CLIENT_RETRY_INTERVAL =
+ "failover.retry.interval";
+
+ @Config(key = SCM_CLIENT_RPC_TIME_OUT,
+ defaultValue = "15m",
+ type = ConfigType.TIME,
+ tags = {OZONE, SCM, CLIENT},
+ timeUnit = TimeUnit.MILLISECONDS,
+ description = "RpcClient timeout on waiting for the response from " +
+ "SCM. The default value is set to 15 minutes. " +
+ "If ipc.client.ping is set to true and this rpc-timeout " +
+ "is greater than the value of ipc.ping.interval, the effective " +
+ "value of the rpc-timeout is rounded up to multiple of " +
+ "ipc.ping.interval."
+ )
+ private long rpcTimeOut = 15 * 60 * 1000;
+
+ @Config(key = SCM_CLIENT_FAILOVER_MAX_RETRY,
+ defaultValue = "15",
+ type = ConfigType.INT,
+ tags = {OZONE, SCM, CLIENT},
+ description = "Max retry count for SCM Client when failover happens."
+ )
+ private int retryCount = 15;
+
+ @Config(key = SCM_CLIENT_RETRY_INTERVAL,
+ defaultValue = "2s",
+ type = ConfigType.TIME,
+ tags = {OZONE, SCM, CLIENT},
+ timeUnit = TimeUnit.MILLISECONDS,
+ description = "SCM Client timeout on waiting for the next connection " +
+ "retry to other SCM IP. The default value is set to 2 minutes. "
+ )
+ private long retryInterval = 2 * 1000;
+
+ public long getRpcTimeOut() {
+ return rpcTimeOut;
+ }
+
+ public void setRpcTimeOut(long timeOut) {
+ // As at the end this value should not exceed MAX_VALUE, as underlying
+ // Rpc layer SocketTimeout parameter is int.
+ if (rpcTimeOut > Integer.MAX_VALUE) {
+ this.rpcTimeOut = Integer.MAX_VALUE;
+ }
+ this.rpcTimeOut = timeOut;
+ }
+
+ public int getRetryCount() {
+ return retryCount;
+ }
+
+ public void setRetryCount(int retryCount) {
+ this.retryCount = retryCount;
+ }
+
+ public long getRetryInterval() {
+ return retryInterval;
+ }
+
+ public void setRetryInterval(long retryInterval) {
+ this.retryInterval = retryInterval;
+ }
+}
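
Since SCMClientConfig is annotation driven, the effective configuration keys should be the @ConfigGroup prefix plus each @Config key, i.e. hdds.scmclient.rpc.timeout, hdds.scmclient.failover.max.retry and hdds.scmclient.failover.retry.interval (key composition inferred from the annotations above). A small, hypothetical sketch of overriding them and reading the typed config back, using the same conf.getObject call the provider uses:

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.proxy.SCMClientConfig;

/** Illustrative only; the values and class name are made up for the example. */
public final class ScmClientConfigSketch {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set("hdds.scmclient.failover.max.retry", "30");
    conf.set("hdds.scmclient.failover.retry.interval", "5s");
    SCMClientConfig clientConfig = conf.getObject(SCMClientConfig.class);
    System.out.println("rpc timeout (ms): " + clientConfig.getRpcTimeOut());
    System.out.println("max retries: " + clientConfig.getRetryCount());
    System.out.println("retry interval (ms): " + clientConfig.getRetryInterval());
  }
}
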
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMProxyInfo.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMProxyInfo.java
new file mode 100644
index 0000000..ec2a5b0
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMProxyInfo.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.proxy;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Class to store SCM proxy info.
+ */
+public class SCMProxyInfo {
+ private String serviceId;
+ private String nodeId;
+ private String rpcAddrStr;
+ private InetSocketAddress rpcAddr;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMProxyInfo.class);
+
+ public SCMProxyInfo(String serviceID, String nodeID,
+ InetSocketAddress rpcAddress) {
+ Preconditions.checkNotNull(rpcAddress);
+ this.serviceId = serviceID;
+ this.nodeId = nodeID;
+ this.rpcAddrStr = rpcAddress.toString();
+ this.rpcAddr = rpcAddress;
+ if (rpcAddr.isUnresolved()) {
+ LOG.warn("SCM address {} for serviceID {} remains unresolved " +
+ "for node ID {} Check your ozone-site.xml file to ensure scm " +
+ "addresses are configured properly.",
+ rpcAddress, serviceId, nodeId);
+ }
+ }
+
+ public String toString() {
+ return new StringBuilder()
+ .append("nodeId=")
+ .append(nodeId)
+ .append(",nodeAddress=")
+ .append(rpcAddrStr).toString();
+ }
+
+ public InetSocketAddress getAddress() {
+ return rpcAddr;
+ }
+
+ public String getServiceId() {
+ return serviceId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/package-info.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/package-info.java
new file mode 100644
index 0000000..e3bb058
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.proxy;
+
+/**
+ * This package contains classes related to scm proxy.
+ */
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
index fc7a598..06f9c31 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
@@ -70,6 +70,8 @@ message SCMBlockLocationResponse {
optional string leaderOMNodeId = 6;
+ optional string leaderSCMNodeId = 7;
+
optional AllocateScmBlockResponseProto allocateScmBlockResponse = 11;
optional DeleteScmKeyBlocksResponseProto deleteScmKeyBlocksResponse = 12;
optional hadoop.hdds.GetScmInfoResponseProto getScmInfoResponse = 13;
@@ -114,6 +116,7 @@ enum Status {
FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY = 26;
FAILED_TO_ALLOCATE_ENOUGH_BLOCKS = 27;
INTERNAL_ERROR = 29;
+ SCM_NOT_LEADER = 30;
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index fb07351..eec0718 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.hdds.scm.server.SCMBlockProtocolServer;
import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
@@ -94,9 +95,26 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
.setTraceID(traceID);
}
+ private boolean isLeader() throws ServiceException {
+ if (!(impl instanceof SCMBlockProtocolServer)) {
+ throw new ServiceException("Should be SCMBlockProtocolServer");
+ } else {
+ return ((SCMBlockProtocolServer) impl).getScm().checkLeader();
+ }
+ }
+
@Override
public SCMBlockLocationResponse send(RpcController controller,
SCMBlockLocationRequest request) throws ServiceException {
+ if (!isLeader()) {
+ SCMBlockLocationResponse.Builder response = createSCMBlockResponse(
+ request.getCmdType(),
+ request.getTraceID());
+ response.setSuccess(false);
+ response.setStatus(Status.SCM_NOT_LEADER);
+ response.setLeaderSCMNodeId(null);
+ return response.build();
+ }
return dispatcher.processRequest(
request,
this::processMessage,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 99f873f..e334b73 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -297,6 +297,10 @@ public class SCMBlockProtocolServer implements
}
}
+ public StorageContainerManager getScm() {
+ return scm;
+ }
+
@Override
public List<DatanodeDetails> sortDatanodes(List<String> nodes,
String clientMachine) throws IOException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 768ca09..44e133a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -1027,6 +1027,25 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
return replicationManager;
}
+ /**
+ * Check if the current scm is the leader.
+ * @return - if the current scm is the leader.
+ */
+ public boolean checkLeader() {
+ return scmHAManager.isLeader();
+ }
+
+ /**
+ * Get suggested leader from Raft.
+ * @return - suggested leader address.
+ */
+ public String getSuggestedLeader() {
+ if (scmHAManager.getSuggestedLeader() == null) {
+ return null;
+ }
+ return scmHAManager.getSuggestedLeader().getAddress();
+ }
+
public void checkAdminAccess(String remoteUser) throws IOException {
if (remoteUser != null && !scmAdminUsernames.contains(remoteUser)) {
throw new IOException(
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 01340cd..3129dee 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideT
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.security.x509.certificate.client.OMCertificateClient;
@@ -186,7 +187,6 @@ import org.apache.commons.lang3.StringUtils;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
-import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
import static org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
@@ -807,16 +807,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
OzoneConfiguration conf) throws IOException {
RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
ProtobufRpcEngine.class);
- long scmVersion =
- RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
- InetSocketAddress scmBlockAddress =
- getScmAddressForBlockClients(conf);
ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
new ScmBlockLocationProtocolClientSideTranslatorPB(
- RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
- scmBlockAddress, UserGroupInformation.getCurrentUser(), conf,
- NetUtils.getDefaultSocketFactory(conf),
- Client.getRpcTimeout(conf)));
+ new SCMBlockLocationFailoverProxyProvider(conf));
return TracingUtil
.createProxy(scmBlockLocationClient, ScmBlockLocationProtocol.class,
conf);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]