This is an automated email from the ASF dual-hosted git repository. arp pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new cdc36fe HDDS-1649. On installSnapshot notification from OM leader, download checkpoint and reload OM state (#948) cdc36fe is described below commit cdc36fe286708b5ff12675599da8c7650744f064 Author: Hanisha Koneru <hanishakon...@apache.org> AuthorDate: Mon Jul 22 12:06:55 2019 -0700 HDDS-1649. On installSnapshot notification from OM leader, download checkpoint and reload OM state (#948) --- .../java/org/apache/hadoop/ozone/OzoneConsts.java | 1 + .../common/src/main/resources/ozone-default.xml | 8 + .../org/apache/hadoop/ozone/om/OMConfigKeys.java | 3 + .../hadoop/ozone/om/exceptions/OMException.java | 3 +- .../ozone/om/protocol/OzoneManagerHAProtocol.java | 3 +- .../src/main/proto/OzoneManagerProtocol.proto | 2 + .../org/apache/hadoop/ozone/MiniOzoneCluster.java | 6 + .../hadoop/ozone/MiniOzoneHAClusterImpl.java | 49 ++- .../hadoop/ozone/om/TestOMRatisSnapshots.java | 189 +++++++++++ .../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 7 +- .../hadoop/ozone/om/OMDBCheckpointServlet.java | 2 +- .../java/org/apache/hadoop/ozone/om/OMMetrics.java | 9 +- .../org/apache/hadoop/ozone/om/OzoneManager.java | 359 ++++++++++++++++----- .../ozone/om/ratis/OzoneManagerRatisServer.java | 15 +- .../ozone/om/ratis/OzoneManagerStateMachine.java | 81 ++++- .../om/snapshot/OzoneManagerSnapshotProvider.java | 2 +- 16 files changed, 637 insertions(+), 102 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index d28e477..67bd22d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -119,6 +119,7 @@ public final class OzoneConsts { public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX; public static final String DELETED_BLOCK_DB = "deletedBlock.db"; public 
static final String OM_DB_NAME = "om.db"; + public static final String OM_DB_BACKUP_PREFIX = "om.db.backup."; public static final String OM_DB_CHECKPOINTS_DIR_NAME = "om.db.checkpoints"; public static final String OZONE_MANAGER_TOKEN_DB_NAME = "om-token.db"; public static final String SCM_DB_NAME = "scm.db"; diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 30cf386..b2f820b 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1630,6 +1630,14 @@ <description>Byte limit for Raft's Log Worker queue. </description> </property> + <property> + <name>ozone.om.ratis.log.purge.gap</name> + <value>1000000</value> + <tag>OZONE, OM, RATIS</tag> + <description>The minimum gap between log indices for Raft server to purge + its log segments after taking snapshot. + </description> + </property> <property> <name>ozone.om.ratis.snapshot.auto.trigger.threshold</name> diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index 14b6783..35431fa 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -123,6 +123,9 @@ public final class OMConfigKeys { "ozone.om.ratis.log.appender.queue.byte-limit"; public static final String OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB"; + public static final String OZONE_OM_RATIS_LOG_PURGE_GAP = + "ozone.om.ratis.log.purge.gap"; + public static final int OZONE_OM_RATIS_LOG_PURGE_GAP_DEFAULT = 1000000; // OM Snapshot configurations public static final String OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java index 66ce1cc..78bdb21 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java @@ -203,7 +203,8 @@ public class OMException extends IOException { PREFIX_NOT_FOUND, - S3_BUCKET_INVALID_LENGTH + S3_BUCKET_INVALID_LENGTH, + RATIS_ERROR // Error in Ratis server } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java index 1434dca..675c814 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java @@ -29,9 +29,10 @@ public interface OzoneManagerHAProtocol { /** * Store the snapshot index i.e. the raft log index, corresponding to the * last transaction applied to the OM RocksDB, in OM metadata dir on disk. + * @param flush flush the OM DB to disk if true * @return the snapshot index * @throws IOException */ - long saveRatisSnapshot() throws IOException; + long saveRatisSnapshot(boolean flush) throws IOException; } diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index 7007d98..5dd2b55 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -278,6 +278,8 @@ enum Status { PREFIX_NOT_FOUND=50; S3_BUCKET_INVALID_LENGTH = 51; // s3 bucket invalid length. 
+ + RATIS_ERROR = 52; } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index 4927da1..1139a65 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -241,6 +241,7 @@ public interface MiniOzoneCluster { protected String clusterId; protected String omServiceId; protected int numOfOMs; + protected int numOfActiveOMs; protected Optional<Boolean> enableTrace = Optional.of(false); protected Optional<Integer> hbInterval = Optional.empty(); @@ -440,6 +441,11 @@ public interface MiniOzoneCluster { return this; } + public Builder setNumOfActiveOMs(int numActiveOMs) { + this.numOfActiveOMs = numActiveOMs; + return this; + } + public Builder setStreamBufferSizeUnit(StorageUnit unit) { this.streamBufferSizeUnit = Optional.of(unit); return this; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java index 4d00710..1d9a99e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java @@ -53,6 +53,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl { private Map<String, OzoneManager> ozoneManagerMap; private List<OzoneManager> ozoneManagers; + // Active OMs denote OMs which are up and running + private List<OzoneManager> activeOMs; + private List<OzoneManager> inactiveOMs; + private static final Random RANDOM = new Random(); private static final int RATIS_LEADER_ELECTION_TIMEOUT = 1000; // 1 seconds @@ -67,11 +71,15 @@ public final class MiniOzoneHAClusterImpl extends 
MiniOzoneClusterImpl { private MiniOzoneHAClusterImpl( OzoneConfiguration conf, Map<String, OzoneManager> omMap, + List<OzoneManager> activeOMList, + List<OzoneManager> inactiveOMList, StorageContainerManager scm, List<HddsDatanodeService> hddsDatanodes) { super(conf, scm, hddsDatanodes); this.ozoneManagerMap = omMap; this.ozoneManagers = new ArrayList<>(omMap.values()); + this.activeOMs = activeOMList; + this.inactiveOMs = inactiveOMList; } /** @@ -83,6 +91,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl { return this.ozoneManagers.get(0); } + public boolean isOMActive(String omNodeId) { + return activeOMs.contains(ozoneManagerMap.get(omNodeId)); + } + public OzoneManager getOzoneManager(int index) { return this.ozoneManagers.get(index); } @@ -91,6 +103,20 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl { return this.ozoneManagerMap.get(omNodeId); } + /** + * Start a previously inactive OM. + */ + public void startInactiveOM(String omNodeID) throws IOException { + OzoneManager ozoneManager = ozoneManagerMap.get(omNodeID); + if (!inactiveOMs.contains(ozoneManager)) { + throw new IOException("OM is already active."); + } else { + ozoneManager.start(); + activeOMs.add(ozoneManager); + inactiveOMs.remove(ozoneManager); + } + } + @Override public void restartOzoneManager() throws IOException { for (OzoneManager ozoneManager : ozoneManagers) { @@ -125,6 +151,8 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl { public static class Builder extends MiniOzoneClusterImpl.Builder { private final String nodeIdBaseStr = "omNode-"; + private List<OzoneManager> activeOMs = new ArrayList<>(); + private List<OzoneManager> inactiveOMs = new ArrayList<>(); /** * Creates a new Builder. 
@@ -137,6 +165,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl { @Override public MiniOzoneCluster build() throws IOException { + if (numOfActiveOMs > numOfOMs) { + throw new IllegalArgumentException("Number of active OMs cannot be " + + "more than the total number of OMs"); + } DefaultMetricsSystem.setMiniClusterMode(true); initializeConfiguration(); StorageContainerManager scm; @@ -150,8 +182,8 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl { } final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm); - MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omMap, - scm, hddsDatanodes); + MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl( + conf, omMap, activeOMs, inactiveOMs, scm, hddsDatanodes); if (startDataNodes) { cluster.startHddsDatanodes(); } @@ -215,9 +247,16 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl { om.setCertClient(certClient); omMap.put(nodeId, om); - om.start(); - LOG.info("Started OzoneManager RPC server at " + - om.getOmRpcServerAddr()); + if (i <= numOfActiveOMs) { + om.start(); + activeOMs.add(om); + LOG.info("Started OzoneManager RPC server at " + + om.getOmRpcServerAddr()); + } else { + inactiveOMs.add(om); + LOG.info("Intialized OzoneManager at " + om.getOmRpcServerAddr() + + ". This OM is currently inactive (not running)."); + } } // Set default OM address to point to the first OM. Clients would diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java new file mode 100644 index 0000000..6ac28c3 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.VolumeArgs; +import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.utils.db.DBCheckpoint; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.Timeout; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static org.apache.hadoop.ozone.om.TestOzoneManagerHA.createKey; + +/** + * Tests the Ratis snapshots feature in OM.
+ */ +public class TestOMRatisSnapshots { + + private MiniOzoneHAClusterImpl cluster = null; + private ObjectStore objectStore; + private OzoneConfiguration conf; + private String clusterId; + private String scmId; + private int numOfOMs = 3; + private static final long SNAPSHOT_THRESHOLD = 50; + private static final int LOG_PURGE_GAP = 50; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Rule + public Timeout timeout = new Timeout(500_000); + + /** + * Create a MiniOzoneCluster for testing. The cluster initially has one + * inactive OM. So at the start of the cluster, there will be 2 active and 1 + * inactive OM. + * + * @throws Exception + */ + @Before + public void init() throws Exception { + conf = new OzoneConfiguration(); + clusterId = UUID.randomUUID().toString(); + scmId = UUID.randomUUID().toString(); + conf.setLong( + OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY, + SNAPSHOT_THRESHOLD); + conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP); + cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf) + .setClusterId(clusterId) + .setScmId(scmId) + .setOMServiceId("om-service-test1") + .setNumOfOzoneManagers(numOfOMs) + .setNumOfActiveOMs(2) + .build(); + cluster.waitForClusterToBeReady(); + objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore(); + } + + /** + * Shut down the MiniOzoneCluster.
+ */ + @After + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testInstallSnapshot() throws Exception { + // Get the leader OM + String leaderOMNodeId = objectStore.getClientProxy().getOMProxyProvider() + .getCurrentProxyOMNodeId(); + OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId); + OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer(); + + // Find the inactive OM + String followerNodeId = leaderOM.getPeerNodes().get(0).getOMNodeId(); + if (cluster.isOMActive(followerNodeId)) { + followerNodeId = leaderOM.getPeerNodes().get(1).getOMNodeId(); + } + OzoneManager followerOM = cluster.getOzoneManager(followerNodeId); + + // Do some transactions so that the log index increases + String userName = "user" + RandomStringUtils.randomNumeric(5); + String adminName = "admin" + RandomStringUtils.randomNumeric(5); + String volumeName = "volume" + RandomStringUtils.randomNumeric(5); + String bucketName = "bucket" + RandomStringUtils.randomNumeric(5); + + VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() + .setOwner(userName) + .setAdmin(adminName) + .build(); + + objectStore.createVolume(volumeName, createVolumeArgs); + OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); + + retVolumeinfo.createBucket(bucketName); + OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName); + + long leaderOMappliedLogIndex = + leaderRatisServer.getStateMachineLastAppliedIndex(); + leaderOM.getOmRatisServer().getStateMachineLastAppliedIndex(); + + List<String> keys = new ArrayList<>(); + while (leaderOMappliedLogIndex < 2000) { + keys.add(createKey(ozoneBucket)); + leaderOMappliedLogIndex = + leaderRatisServer.getStateMachineLastAppliedIndex(); + } + + // Get the latest db checkpoint from the leader OM. 
+ long leaderOMSnaphsotIndex = leaderOM.saveRatisSnapshot(true); + DBCheckpoint leaderDbCheckpoint = + leaderOM.getMetadataManager().getStore().getCheckpoint(false); + + // Start the inactive OM + cluster.startInactiveOM(followerNodeId); + + // The recently started OM should be lagging behind the leader OM. + long followerOMLastAppliedIndex = + followerOM.getOmRatisServer().getStateMachineLastAppliedIndex(); + Assert.assertTrue( + followerOMLastAppliedIndex < leaderOMSnaphsotIndex); + + // Install leader OM's db checkpoint on the lagging OM. + followerOM.getOmRatisServer().getOmStateMachine().pause(); + followerOM.getMetadataManager().getStore().close(); + followerOM.replaceOMDBWithCheckpoint( + leaderOMSnaphsotIndex, leaderDbCheckpoint.getCheckpointLocation()); + + // Reload the follower OM with new DB checkpoint from the leader OM. + followerOM.reloadOMState(leaderOMSnaphsotIndex); + followerOM.getOmRatisServer().getOmStateMachine().unpause( + leaderOMSnaphsotIndex); + + // After the new checkpoint is loaded and state machine is unpaused, the + // follower OM lastAppliedIndex must match the snapshot index of the + // checkpoint. + followerOMLastAppliedIndex = followerOM.getOmRatisServer() + .getStateMachineLastAppliedIndex(); + Assert.assertEquals(leaderOMSnaphsotIndex, followerOMLastAppliedIndex); + + // Verify that the follower OM's DB contains the transactions which were + // made while it was inactive. 
+ OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager(); + Assert.assertNotNull(followerOMMetaMngr.getVolumeTable().get( + followerOMMetaMngr.getVolumeKey(volumeName))); + Assert.assertNotNull(followerOMMetaMngr.getBucketTable().get( + followerOMMetaMngr.getBucketKey(volumeName, bucketName))); + for (String key : keys) { + Assert.assertNotNull(followerOMMetaMngr.getKeyTable().get( + followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key))); + } + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java index 05c53b3..92fc263 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java @@ -829,7 +829,11 @@ public class TestOzoneManagerHA { } - private void createKey(OzoneBucket ozoneBucket) throws IOException { + /** + * Create a key in the bucket. + * @return the key name. 
+ */ + static String createKey(OzoneBucket ozoneBucket) throws IOException { String keyName = "key" + RandomStringUtils.randomNumeric(5); String data = "data" + RandomStringUtils.randomNumeric(5); OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName, @@ -837,5 +841,6 @@ public class TestOzoneManagerHA { ReplicationFactor.ONE, new HashMap<>()); ozoneOutputStream.write(data.getBytes(), 0, data.length()); ozoneOutputStream.close(); + return keyName; } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java index d54e121..b36a128 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java @@ -126,7 +126,7 @@ public class OMDBCheckpointServlet extends HttpServlet { // ratis snapshot first. This step also included flushing the OM DB. // Hence, we can set flush to false. 
flush = false; - ratisSnapshotIndex = om.saveRatisSnapshot(); + ratisSnapshotIndex = om.saveRatisSnapshot(true); } else { ratisSnapshotIndex = om.loadRatisSnapshotIndex(); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java index 46fdabd..dbadf68 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java @@ -196,15 +196,18 @@ public class OMMetrics { } public void setNumVolumes(long val) { - this.numVolumes.incr(val); + long oldVal = this.numVolumes.value(); + this.numVolumes.incr(val - oldVal); } public void setNumBuckets(long val) { - this.numBuckets.incr(val); + long oldVal = this.numBuckets.value(); + this.numBuckets.incr(val - oldVal); } public void setNumKeys(long val) { - this.numKeys.incr(val); + long oldVal = this.numKeys.value(); + this.numKeys.incr(val- oldVal); } public long getNumVolumes() { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 4312516..0267350 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import com.google.protobuf.BlockingService; import java.net.InetAddress; +import java.nio.file.Path; import java.security.PrivateKey; import java.security.PublicKey; import java.security.KeyPair; @@ -143,6 +144,10 @@ import org.apache.hadoop.util.KMSUtil; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.utils.RetriableTask; +import org.apache.hadoop.utils.db.DBCheckpoint; +import 
org.apache.hadoop.utils.db.DBStore; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.LifeCycle; import org.bouncycastle.pkcs.PKCS10CertificationRequest; import org.slf4j.Logger; @@ -236,18 +241,20 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private RPC.Server omRpcServer; private InetSocketAddress omRpcAddress; private String omId; - private final OMMetadataManager metadataManager; - private final VolumeManager volumeManager; - private final BucketManager bucketManager; - private final KeyManager keyManager; - private final PrefixManagerImpl prefixManager; + + private OMMetadataManager metadataManager; + private VolumeManager volumeManager; + private BucketManager bucketManager; + private KeyManager keyManager; + private PrefixManagerImpl prefixManager; + private S3BucketManager s3BucketManager; + private final OMMetrics metrics; private OzoneManagerHttpServer httpServer; private final OMStorage omStorage; private final ScmBlockLocationProtocol scmBlockClient; private final StorageContainerLocationProtocol scmContainerClient; private ObjectName omInfoBeanName; - private final S3BucketManager s3BucketManager; private Timer metricsTimer; private ScheduleOMMetricsWriteTask scheduleOMMetricsWriteTask; private static final ObjectWriter WRITER = @@ -258,7 +265,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private final Runnable shutdownHook; private final File omMetaDir; private final boolean isAclEnabled; - private final IAccessAuthorizer accessAuthorizer; + private IAccessAuthorizer accessAuthorizer; private JvmPauseMonitor jvmPauseMonitor; private final SecurityConfig secConfig; private S3SecretManager s3SecretManager; @@ -308,12 +315,37 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl throw new OMException("OM not initialized.", ResultCodes.OM_NOT_INITIALIZED); } + + // Read configuration and set values. 
+ ozAdmins = conf.getTrimmedStringCollection(OZONE_ADMINISTRATORS); + omMetaDir = OmUtils.getOmDbDir(configuration); + this.isAclEnabled = conf.getBoolean(OZONE_ACL_ENABLED, + OZONE_ACL_ENABLED_DEFAULT); + this.scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE, + OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES); + this.preallocateBlocksMax = conf.getInt( + OZONE_KEY_PREALLOCATION_BLOCKS_MAX, + OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT); + this.grpcBlockTokenEnabled = conf.getBoolean(HDDS_BLOCK_TOKEN_ENABLED, + HDDS_BLOCK_TOKEN_ENABLED_DEFAULT); + this.useRatisForReplication = conf.getBoolean( + DFS_CONTAINER_RATIS_ENABLED_KEY, DFS_CONTAINER_RATIS_ENABLED_DEFAULT); + // TODO: This is a temporary check. Once fully implemented, all OM state + // change should go through Ratis - be it standalone (for non-HA) or + // replicated (for HA). + isRatisEnabled = configuration.getBoolean( + OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, + OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT); + // Load HA related configurations loadOMHAConfigs(configuration); + InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress(); + omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString()); scmContainerClient = getScmContainerClient(configuration); // verifies that the SCM info in the OM Version file is correct. scmBlockClient = getScmBlockClient(configuration); + this.scmClient = new ScmClient(scmBlockClient, scmContainerClient); // For testing purpose only, not hit scm from om as Hadoop UGI can't login // two principals in the same JVM. 
@@ -329,16 +361,32 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class, ProtobufRpcEngine.class); - metadataManager = new OmMetadataManagerImpl(configuration); + secConfig = new SecurityConfig(configuration); + // Create the KMS Key Provider + try { + kmsProvider = createKeyProviderExt(configuration); + } catch (IOException ioe) { + kmsProvider = null; + LOG.error("Fail to create Key Provider"); + } + if (secConfig.isSecurityEnabled()) { + omComponent = OM_DAEMON + "-" + omId; + if(omStorage.getOmCertSerialId() == null) { + throw new RuntimeException("OzoneManager started in secure mode but " + + "doesn't have SCM signed certificate."); + } + certClient = new OMCertificateClient(new SecurityConfig(conf), + omStorage.getOmCertSerialId()); + } + if (secConfig.isBlockTokenEnabled()) { + blockTokenMgr = createBlockTokenSecretManager(configuration); + } + + instantiateServices(); + + initializeRatisServer(); + initializeRatisClient(); - // This is a temporary check. Once fully implemented, all OM state change - // should go through Ratis - be it standalone (for non-HA) or replicated - // (for HA). 
- isRatisEnabled = configuration.getBoolean( - OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, - OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT); - startRatisServer(); - startRatisClient(); if (isRatisEnabled) { // Create Ratis storage dir String omRatisDirectory = OmUtils.getOMRatisDirectory(configuration); @@ -361,59 +409,44 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl OM_RATIS_SNAPSHOT_INDEX); this.snapshotIndex = loadRatisSnapshotIndex(); - InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress(); - omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString()); - secConfig = new SecurityConfig(configuration); - volumeManager = new VolumeManagerImpl(metadataManager, configuration); + metrics = OMMetrics.create(); - // Create the KMS Key Provider - try { - kmsProvider = createKeyProviderExt(configuration); - } catch (IOException ioe) { - kmsProvider = null; - LOG.error("Fail to create Key Provider"); - } + // Start Om Rpc Server. + omRpcServer = getRpcServer(conf); + omRpcAddress = updateRPCListenAddress(configuration, + OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer); + + shutdownHook = () -> { + saveOmMetrics(); + }; + ShutdownHookManager.get().addShutdownHook(shutdownHook, + SHUTDOWN_HOOK_PRIORITY); + } + + /** + * Instantiate services which are dependent on the OM DB state. + * When OM state is reloaded, these services are re-initialized with the + * new OM state. 
+ */ + private void instantiateServices() throws IOException { + metadataManager = new OmMetadataManagerImpl(configuration); + volumeManager = new VolumeManagerImpl(metadataManager, configuration); bucketManager = new BucketManagerImpl(metadataManager, getKmsProvider(), isRatisEnabled); - metrics = OMMetrics.create(); - s3BucketManager = new S3BucketManagerImpl(configuration, metadataManager, volumeManager, bucketManager); if (secConfig.isSecurityEnabled()) { - omComponent = OM_DAEMON + "-" + omId; - if(omStorage.getOmCertSerialId() == null) { - throw new RuntimeException("OzoneManager started in secure mode but " + - "doesn't have SCM signed certificate."); - } - certClient = new OMCertificateClient(new SecurityConfig(conf), - omStorage.getOmCertSerialId()); s3SecretManager = new S3SecretManagerImpl(configuration, metadataManager); delegationTokenMgr = createDelegationTokenSecretManager(configuration); } - if (secConfig.isBlockTokenEnabled()) { - blockTokenMgr = createBlockTokenSecretManager(configuration); - } - - omRpcServer = getRpcServer(conf); - omRpcAddress = updateRPCListenAddress(configuration, - OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer); - - this.scmClient = new ScmClient(scmBlockClient, scmContainerClient); prefixManager = new PrefixManagerImpl(metadataManager); keyManager = new KeyManagerImpl(this, scmClient, configuration, omStorage.getOmId()); - shutdownHook = () -> { - saveOmMetrics(); - }; - ShutdownHookManager.get().addShutdownHook(shutdownHook, - SHUTDOWN_HOOK_PRIORITY); - isAclEnabled = conf.getBoolean(OZONE_ACL_ENABLED, - OZONE_ACL_ENABLED_DEFAULT); if (isAclEnabled) { - accessAuthorizer = getACLAuthorizerInstance(conf); + accessAuthorizer = getACLAuthorizerInstance(configuration); if (accessAuthorizer instanceof OzoneNativeAuthorizer) { OzoneNativeAuthorizer authorizer = (OzoneNativeAuthorizer) accessAuthorizer; @@ -425,17 +458,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl } else { accessAuthorizer = null; } - 
ozAdmins = conf.getTrimmedStringCollection(OZONE_ADMINISTRATORS); - omMetaDir = OmUtils.getOmDbDir(configuration); - this.scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE, - OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES); - this.preallocateBlocksMax = conf.getInt( - OZONE_KEY_PREALLOCATION_BLOCKS_MAX, - OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT); - this.grpcBlockTokenEnabled = conf.getBoolean(HDDS_BLOCK_TOKEN_ENABLED, - HDDS_BLOCK_TOKEN_ENABLED_DEFAULT); - this.useRatisForReplication = conf.getBoolean( - DFS_CONTAINER_RATIS_ENABLED_KEY, DFS_CONTAINER_RATIS_ENABLED_DEFAULT); } /** @@ -1235,6 +1257,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl DefaultMetricsSystem.initialize("OzoneManager"); + // Start Ratis services + if (omRatisServer != null) { + omRatisServer.start(); + } + if (omRatisClient != null) { + omRatisClient.connect(); + } + metadataManager.start(configuration); startSecretManagerIfNecessary(); @@ -1305,8 +1335,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl omRpcServer.start(); isOmRpcServerRunning = true; - startRatisServer(); - startRatisClient(); + initializeRatisServer(); + if (omRatisServer != null) { + omRatisServer.start(); + } + initializeRatisClient(); + if (omRatisClient != null) { + omRatisClient.connect(); + } try { httpServer = new OzoneManagerHttpServer(configuration, this); @@ -1353,15 +1389,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl /** * Creates an instance of ratis server. 
*/ - private void startRatisServer() throws IOException { + private void initializeRatisServer() throws IOException { if (isRatisEnabled) { if (omRatisServer == null) { omRatisServer = OzoneManagerRatisServer.newOMRatisServer( configuration, this, omNodeDetails, peerNodes); } - omRatisServer.start(); - - LOG.info("OzoneManager Ratis server started at port {}", + LOG.info("OzoneManager Ratis server initialized at port {}", omRatisServer.getServerPort()); } else { omRatisServer = null; @@ -1371,14 +1405,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl /** * Creates an instance of ratis client. */ - private void startRatisClient() throws IOException { + private void initializeRatisClient() throws IOException { if (isRatisEnabled) { if (omRatisClient == null) { omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient( omNodeDetails.getOMNodeId(), omRatisServer.getRaftGroup(), configuration); } - omRatisClient.connect(); } else { omRatisClient = null; } @@ -1398,11 +1431,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl } @Override - public long saveRatisSnapshot() throws IOException { + public long saveRatisSnapshot(boolean flush) throws IOException { snapshotIndex = omRatisServer.getStateMachineLastAppliedIndex(); - // Flush the OM state to disk - getMetadataManager().getStore().flush(); + if (flush) { + // Flush the OM state to disk + metadataManager.getStore().flush(); + } PersistentLongFile.writeFile(ratisSnapshotFile, snapshotIndex); LOG.info("Saved Ratis Snapshot on the OM with snapshotIndex {}", @@ -2697,7 +2732,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl } } } - @Override public OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException { @@ -3069,6 +3103,179 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl } } + /** + * Download and install latest checkpoint from leader OM. 
+ * If the download checkpoints snapshot index is greater than this OM's + * last applied transaction index, then re-initialize the OM state via this + * checkpoint. Before re-initializing OM state, the OM Ratis server should + * be stopped so that no new transactions can be applied. + * @param leaderId peerNodeID of the leader OM + * @return If checkpoint is installed, return the corresponding termIndex. + * Otherwise, return null. + */ + public TermIndex installSnapshot(String leaderId) { + if (omSnapshotProvider == null) { + LOG.error("OM Snapshot Provider is not configured as there are no peer " + + "nodes."); + return null; + } + + DBCheckpoint omDBcheckpoint = getDBCheckpointFromLeader(leaderId); + Path newDBlocation = omDBcheckpoint.getCheckpointLocation(); + + // Check if current ratis log index is smaller than the downloaded + // snapshot index. If yes, proceed by stopping the ratis server so that + // the OM state can be re-initialized. If no, then do not proceed with + // installSnapshot. + long lastAppliedIndex = omRatisServer.getStateMachineLastAppliedIndex(); + long checkpointSnapshotIndex = omDBcheckpoint.getRatisSnapshotIndex(); + if (checkpointSnapshotIndex <= lastAppliedIndex) { + LOG.error("Failed to install checkpoint from OM leader: {}. The last " + + "applied index: {} is greater than or equal to the checkpoint's " + + "snapshot index: {}. Deleting the downloaded checkpoint {}", leaderId, + lastAppliedIndex, checkpointSnapshotIndex, + newDBlocation); + try { + FileUtils.deleteFully(newDBlocation); + } catch (IOException e) { + LOG.error("Failed to fully delete the downloaded DB checkpoint {} " + + "from OM leader {}.", newDBlocation, + leaderId, e); + } + return null; + } + + // Pause the State Machine so that no new transactions can be applied. + // This action also clears the OM Double Buffer so that if there are any + // pending transactions in the buffer, they are discarded. + // TODO: The Ratis server should also be paused here. 
This is required + // because a leader election might happen while the snapshot + // installation is in progress and the new leader might start sending + // append log entries to the ratis server. + omRatisServer.getOmStateMachine().pause(); + + File dbBackup; + try { + dbBackup = replaceOMDBWithCheckpoint(lastAppliedIndex, newDBlocation); + } catch (Exception e) { + LOG.error("OM DB checkpoint replacement with new downloaded checkpoint " + + "failed.", e); + return null; + } + + // Reload the OM DB store with the new checkpoint. + // Restart (unpause) the state machine and update its last applied index + // to the installed checkpoint's snapshot index. + try { + reloadOMState(checkpointSnapshotIndex); + omRatisServer.getOmStateMachine().unpause(checkpointSnapshotIndex); + } catch (IOException e) { + LOG.error("Failed to reload OM state with new DB checkpoint.", e); + return null; + } + + // Delete the backup DB + try { + FileUtils.deleteFully(dbBackup); + } catch (IOException e) { + LOG.error("Failed to delete the backup of the original DB {}", dbBackup); + } + + // TODO: We should only return the snpashotIndex to the leader. + // Should be fixed after RATIS-586 + TermIndex newTermIndex = TermIndex.newTermIndex(0, + checkpointSnapshotIndex); + + return newTermIndex; + } + + /** + * Download the latest OM DB checkpoint from the leader OM. + * @param leaderId OMNodeID of the leader OM node. + * @return latest DB checkpoint from leader OM. + */ + private DBCheckpoint getDBCheckpointFromLeader(String leaderId) { + LOG.info("Downloading checkpoint from leader OM {} and reloading state " + + "from the checkpoint.", leaderId); + + try { + return omSnapshotProvider.getOzoneManagerDBSnapshot(leaderId); + } catch (IOException e) { + LOG.error("Failed to download checkpoint from OM leader {}", leaderId, e); + } + return null; + } + + /** + * Replace the current OM DB with the new DB checkpoint. + * @param lastAppliedIndex the last applied index in the current OM DB. 
+ * @param checkpointPath path to the new DB checkpoint + * @return location of the backup of the original DB + * @throws Exception + */ + File replaceOMDBWithCheckpoint(long lastAppliedIndex, Path checkpointPath) + throws Exception { + // Stop the DB first + DBStore store = metadataManager.getStore(); + store.close(); + + // Take a backup of the current DB + File db = store.getDbLocation(); + String dbBackupName = OzoneConsts.OM_DB_BACKUP_PREFIX + + lastAppliedIndex + "_" + System.currentTimeMillis(); + File dbBackup = new File(db.getParentFile(), dbBackupName); + + try { + Files.move(db.toPath(), dbBackup.toPath()); + } catch (IOException e) { + LOG.error("Failed to create a backup of the current DB. Aborting " + + "snapshot installation."); + throw e; + } + + // Move the new DB checkpoint into the om metadata dir + try { + Files.move(checkpointPath, db.toPath()); + } catch (IOException e) { + LOG.error("Failed to move downloaded DB checkpoint {} to metadata " + + "directory {}. Resetting to original DB.", checkpointPath, + db.toPath()); + Files.move(dbBackup.toPath(), db.toPath()); + throw e; + } + return dbBackup; + } + + /** + * Re-instantiate MetadataManager with new DB checkpoint. + * All the classes which use/ store MetadataManager should also be updated + * with the new MetadataManager instance. 
+ */ + void reloadOMState(long newSnapshotIndex) throws IOException { + + instantiateServices(); + + // Restart required services + metadataManager.start(configuration); + keyManager.start(configuration); + + // Set metrics and start metrics back ground thread + metrics.setNumVolumes(metadataManager.countRowsInTable(metadataManager + .getVolumeTable())); + metrics.setNumBuckets(metadataManager.countRowsInTable(metadataManager + .getBucketTable())); + + // Delete the omMetrics file if it exists and save the a new metrics file + // with new data + Files.deleteIfExists(getMetricsStorageFile().toPath()); + saveOmMetrics(); + + // Update OM snapshot index with the new snapshot index (from the new OM + // DB state) and save the snapshot index to disk + this.snapshotIndex = newSnapshotIndex; + saveRatisSnapshot(false); + } + public static Logger getLogger() { return LOG; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java index 49a84da..1e51273 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java @@ -308,10 +308,15 @@ public final class OzoneManagerRatisServer { } /** - * Returns OzoneManager StateMachine. + * Initializes and returns OzoneManager StateMachine. 
*/ private OzoneManagerStateMachine getStateMachine() { - return new OzoneManagerStateMachine(this); + return new OzoneManagerStateMachine(this); + } + + @VisibleForTesting + public OzoneManagerStateMachine getOmStateMachine() { + return omStateMachine; } public OzoneManager getOzoneManager() { @@ -387,6 +392,12 @@ public final class OzoneManagerRatisServer { SizeInBytes.valueOf(logAppenderQueueByteLimit)); RaftServerConfigKeys.Log.setPreallocatedSize(properties, SizeInBytes.valueOf(raftSegmentPreallocatedSize)); + RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties, + false); + final int logPurgeGap = conf.getInt( + OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, + OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP_DEFAULT); + RaftServerConfigKeys.Log.setPurgeGap(properties, logPurgeGap); // For grpc set the maximum message size // TODO: calculate the optimal max message size diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index 31c467d..c51323e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.ozone.container.common.transport.server.ratis .ContainerStateMachine; import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .OMRequest; @@ -43,12 +44,15 @@ import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftGroupId; +import 
org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.statemachine.impl.BaseStateMachine; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.util.LifeCycle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,8 +72,9 @@ public class OzoneManagerStateMachine extends BaseStateMachine { private OzoneManagerHARequestHandler handler; private RaftGroupId raftGroupId; private long lastAppliedIndex = 0; - private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer; + private OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer; private final ExecutorService executorService; + private final ExecutorService installSnapshotExecutor; public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) { this.omRatisServer = ratisServer; @@ -82,19 +87,20 @@ public class OzoneManagerStateMachine extends BaseStateMachine { ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("OM StateMachine ApplyTransaction Thread - %d").build(); this.executorService = HadoopExecutors.newSingleThreadExecutor(build); + this.installSnapshotExecutor = HadoopExecutors.newSingleThreadExecutor(); } /** * Initializes the State Machine with the given server, group and storage. - * TODO: Load the latest snapshot from the file system. 
*/ @Override - public void initialize( - RaftServer server, RaftGroupId id, RaftStorage raftStorage) - throws IOException { - super.initialize(server, id, raftStorage); - this.raftGroupId = id; - storage.init(raftStorage); + public void initialize(RaftServer server, RaftGroupId id, + RaftStorage raftStorage) throws IOException { + lifeCycle.startAndTransition(() -> { + super.initialize(server, id, raftStorage); + this.raftGroupId = id; + storage.init(raftStorage); + }); } /** @@ -185,6 +191,27 @@ public class OzoneManagerStateMachine extends BaseStateMachine { } } + @Override + public void pause() { + lifeCycle.transition(LifeCycle.State.PAUSING); + lifeCycle.transition(LifeCycle.State.PAUSED); + ozoneManagerDoubleBuffer.stop(); + } + + /** + * Unpause the StateMachine, re-initialize the DoubleBuffer and update the + * lastAppliedIndex. This should be done after uploading new state to the + * StateMachine. + */ + public void unpause(long newLastAppliedSnaphsotIndex) { + lifeCycle.startAndTransition(() -> { + this.ozoneManagerDoubleBuffer = + new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager(), + this::updateLastAppliedIndex); + this.updateLastAppliedIndex(newLastAppliedSnaphsotIndex); + }); + } + /** * Take OM Ratis snapshot. Write the snapshot index to file. Snapshot index * is the log index corresponding to the last applied transaction on the OM @@ -197,12 +224,45 @@ public class OzoneManagerStateMachine extends BaseStateMachine { public long takeSnapshot() throws IOException { LOG.info("Saving Ratis snapshot on the OM."); if (ozoneManager != null) { - return ozoneManager.saveRatisSnapshot(); + return ozoneManager.saveRatisSnapshot(true); } return 0; } /** + * Leader OM has purged entries from its log. To catch up, OM must download + * the latest checkpoint from the leader OM and install it. + * @param roleInfoProto the leader node information + * @param firstTermIndexInLog TermIndex of the first append entry available + * in the Leader's log. 
+ * @return the last term index included in the installed snapshot. + */ + @Override + public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader( + RaftProtos.RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) { + + String leaderNodeId = RaftPeerId.valueOf(roleInfoProto.getSelf().getId()) + .toString(); + + LOG.info("Received install snapshot notificaiton form OM leader: {} with " + + "term index: {}", leaderNodeId, firstTermIndexInLog); + + if (!roleInfoProto.getRole().equals(RaftProtos.RaftPeerRole.LEADER)) { + // A non-leader Ratis server should not send this notification. + LOG.error("Received Install Snapshot notification from non-leader OM " + + "node: {}. Ignoring the notification.", leaderNodeId); + return completeExceptionally(new OMException("Received notification to " + + "install snaphost from non-leader OM node", + OMException.ResultCodes.RATIS_ERROR)); + } + + CompletableFuture<TermIndex> future = CompletableFuture.supplyAsync( + () -> ozoneManager.installSnapshot(leaderNodeId), + installSnapshotExecutor); + return future; + } + + /** * Notifies the state machine that the raft peer is no longer leader. 
*/ @Override @@ -276,10 +336,9 @@ public class OzoneManagerStateMachine extends BaseStateMachine { this.raftGroupId = raftGroupId; } - public void stop() { ozoneManagerDoubleBuffer.stop(); HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS); + HadoopExecutors.shutdown(installSnapshotExecutor, LOG, 5, TimeUnit.SECONDS); } - } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java index e1d4889..87446db 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java @@ -149,7 +149,7 @@ public class OzoneManagerSnapshotProvider { * @param leaderOMNodeID leader OM Node ID. * @return the DB checkpoint (including the ratis snapshot index) */ - protected DBCheckpoint getOzoneManagerDBSnapshot(String leaderOMNodeID) + public DBCheckpoint getOzoneManagerDBSnapshot(String leaderOMNodeID) throws IOException { String snapshotFileName = OM_SNAPSHOT_DB + "_" + System.currentTimeMillis(); File targetFile = new File(omSnapshotDir, snapshotFileName + ".tar.gz"); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org