Repository: hadoop
Updated Branches:
refs/heads/hdfs-7240 [created] 312de
http://git-wip-us.apache.org/repos/asf/hadoop/blob/312d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
--
diff --cc
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index da09b0e,29bcd79..c93a362
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@@ -2816,30 -2633,14 +2831,30 @@@ public class DataNode extends Reconfigu
}
/**
-* Convenience method, which unwraps RemoteException.
-* @throws IOException not a RemoteException.
-*/
- * Update replica with the new generation stamp and length.
++ * Convenience method, which unwraps RemoteException.
++ * @throws IOException not a RemoteException.
++ */
+ private static ReplicaRecoveryInfo callInitReplicaRecovery(
+ InterDatanodeProtocol datanode,
+ RecoveringBlock rBlock) throws IOException {
+try {
+ return datanode.initReplicaRecovery(rBlock);
- } catch(RemoteException re) {
++} catch (RemoteException re) {
+ throw re.unwrapRemoteException();
+}
+ }
+
+ /**
-* Update replica with the new generation stamp and length.
++ * Update replica with the new generation stamp and length.
*/
@Override // InterDatanodeProtocol
public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
-- final long recoveryId, final long newBlockId, final long newLength)
++ final long recoveryId, final long
newBlockId, final long newLength)
throws IOException {
-final String storageID = data.updateReplicaUnderRecovery(oldBlock,
-recoveryId, newBlockId, newLength);
+final FsDatasetSpi dataset =
+(FsDatasetSpi) getDataset(oldBlock.getBlockPoolId());
+final String storageID = dataset.updateReplicaUnderRecovery(
+oldBlock, recoveryId, newBlockId, newLength);
// Notify the namenode of the updated block info. This is important
// for HA, since otherwise the standby node may lose track of the
// block locations until the next block report.
@@@ -2851,234 -2652,6 +2866,244 @@@
return storageID;
}
- /** A convenient class used in block recovery */
- static class BlockRecord {
++ /**
++ * A convenient class used in block recovery
++ */
++ static class BlockRecord {
+final DatanodeID id;
+final InterDatanodeProtocol datanode;
+final ReplicaRecoveryInfo rInfo;
-
+private String storageID;
+
+BlockRecord(DatanodeID id,
+InterDatanodeProtocol datanode,
+ReplicaRecoveryInfo rInfo) {
+ this.id = id;
+ this.datanode = datanode;
+ this.rInfo = rInfo;
+}
+
+void updateReplicaUnderRecovery(String bpid, long recoveryId,
+long newBlockId, long newLength)
+throws IOException {
+ final ExtendedBlock b = new ExtendedBlock(bpid, rInfo);
+ storageID = datanode.updateReplicaUnderRecovery(b, recoveryId,
newBlockId,
+ newLength);
+}
+
+@Override
+public String toString() {
+ return "block:" + rInfo + " node:" + id;
+}
+ }
+
- /** Recover a block */
++
++ /**
++ * Recover a block
++ */
+ private void recoverBlock(RecoveringBlock rBlock) throws IOException {
+ExtendedBlock block = rBlock.getBlock();
+String blookPoolId = block.getBlockPoolId();
+DatanodeID[] datanodeids = rBlock.getLocations();
+List syncList = new
ArrayList(datanodeids.length);
+int errorCount = 0;
+
+//check generation stamps
- for(DatanodeID id : datanodeids) {
++for (DatanodeID id : datanodeids) {
+ try {
+BPOfferService bpos = blockPoolManager.get(blookPoolId);
+DatanodeRegistration bpReg = bpos.bpRegistration;
- InterDatanodeProtocol datanode = bpReg.equals(id)?
- this: DataNode.createInterDataNodeProtocolProxy(id, getConf(),
- dnConf.socketTimeout, dnConf.connectToDnViaHostname);
++InterDatanodeProtocol datanode = bpReg.equals(id) ?
++this : DataNode.createInterDataNodeProtocolProxy(id, getConf(),
++dnConf.socketTimeout, dnConf.connectToDnViaHostname);
+ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
+if (info != null &&
+info.getGenerationStamp() >= block.getGenerationStamp() &&
+info.getNumBytes() > 0) {
+ syncList.add(new BlockRecord(id, datanode, info));
+}
+ } catch (RecoveryInProgressException ripE) {
+InterDatanodeProtocol.LOG.warn(
+"Recovery