http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java new file mode 100644 index 0000000..5dfb356 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafemodeService.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.util.Time.now; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service to periodically check if the {@link org.apache.hadoop.hdfs.server. + * federation.store.StateStoreService StateStoreService} cached information in + * the {@link Router} is up to date. This is for performance and removes the + * {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService + * StateStoreService} from the critical path in common operations. + */ +public class RouterSafemodeService extends PeriodicService { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterSafemodeService.class); + + /** Router to manage safe mode. */ + private final Router router; + + /** Interval in ms to wait post startup before allowing RPC requests. */ + private long startupInterval; + /** Interval in ms after which the State Store cache is too stale. */ + private long staleInterval; + /** Start time in ms of this service. */ + private long startupTime; + + /** The time the Router enters safe mode in milliseconds. */ + private long enterSafeModeTime = now(); + + + /** + * Create a new Cache update service. + * + * @param router Router containing the cache. + */ + public RouterSafemodeService(Router router) { + super(RouterSafemodeService.class.getSimpleName()); + this.router = router; + } + + /** + * Enter safe mode. + */ + private void enter() { + LOG.info("Entering safe mode"); + enterSafeModeTime = now(); + RouterRpcServer rpcServer = router.getRpcServer(); + rpcServer.setSafeMode(true); + router.updateRouterState(RouterServiceState.SAFEMODE); + } + + /** + * Leave safe mode. + */ + private void leave() { + // Cache recently updated, leave safemode + long timeInSafemode = now() - enterSafeModeTime; + LOG.info("Leaving safe mode after {} milliseconds", timeInSafemode); + RouterMetrics routerMetrics = router.getRouterMetrics(); + if (routerMetrics == null) { + LOG.error("The Router metrics are not enabled"); + } else { + routerMetrics.setSafeModeTime(timeInSafemode); + } + RouterRpcServer rpcServer = router.getRpcServer(); + rpcServer.setSafeMode(false); + router.updateRouterState(RouterServiceState.RUNNING); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + + // Use same interval as cache update service + this.setIntervalMs(conf.getTimeDuration( + RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, + RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT, + TimeUnit.MILLISECONDS)); + + this.startupInterval = conf.getTimeDuration( + RBFConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION, + RBFConfigKeys.DFS_ROUTER_SAFEMODE_EXTENSION_DEFAULT, + TimeUnit.MILLISECONDS); + LOG.info("Leave startup safe mode after {} ms", this.startupInterval); + + this.staleInterval = conf.getTimeDuration( + RBFConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION, + RBFConfigKeys.DFS_ROUTER_SAFEMODE_EXPIRATION_DEFAULT, + TimeUnit.MILLISECONDS); + LOG.info("Enter safe mode after {} ms without reaching the State Store", + this.staleInterval); + + this.startupTime = Time.now(); + + // Initializing the RPC server in safe mode, it will disable it later + enter(); + + super.serviceInit(conf); + } + + @Override + public void periodicInvoke() { + long now = Time.now(); + long delta = now - startupTime; + if (delta < startupInterval) { + LOG.info("Delaying safemode exit for {} milliseconds...", + this.startupInterval - delta); + return; + } + RouterRpcServer rpcServer = router.getRpcServer(); + StateStoreService stateStore = router.getStateStore(); + long cacheUpdateTime = stateStore.getCacheUpdateTime(); + boolean isCacheStale = (now - cacheUpdateTime) > this.staleInterval; + + // Always update to indicate our cache was updated + if (isCacheStale) { + if (!rpcServer.isInSafeMode()) { + enter(); + } + } else if (rpcServer.isInSafeMode()) { + // Cache recently updated, leave safe mode + leave(); + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java new file mode 100644 index 0000000..3accbe9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +/** + * States of the Router. + */ +public enum RouterServiceState { + UNINITIALIZED, + INITIALIZING, + SAFEMODE, + RUNNING, + STOPPING, + SHUTDOWN, + EXPIRED; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateManager.java new file mode 100644 index 0000000..527600c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateManager.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.server.federation.store.protocol.EnterSafeModeRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.EnterSafeModeResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse; + +/** + * Interface of managing the Router state. + */ +public interface RouterStateManager { + /** + * Enter safe mode and change Router state to RouterServiceState#SAFEMODE. + */ + EnterSafeModeResponse enterSafeMode(EnterSafeModeRequest request) + throws IOException; + + /** + * Leave safe mode and change Router state to RouterServiceState#RUNNING. + */ + LeaveSafeModeResponse leaveSafeMode(LeaveSafeModeRequest request) + throws IOException; + + /** + * Verify if current Router state is safe mode. + */ + GetSafeModeResponse getSafeMode(GetSafeModeRequest request) + throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/package-info.java new file mode 100644 index 0000000..327f39b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/package-info.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The router package includes the core services for a HDFS federation router. + * The {@link Router} acts as a transparent proxy in front of a cluster of + * multiple NameNodes and nameservices. The {@link RouterRpcServer} exposes the + * NameNode clientProtocol and is the primary contact point for DFS clients in a + * federated cluster. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java new file mode 100644 index 0000000..cdd4449 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Record store that takes care of caching the records in memory. + * + * @param <R> Record to store by this interface. + */ +public abstract class CachedRecordStore<R extends BaseRecord> + extends RecordStore<R> implements StateStoreCache { + + private static final Logger LOG = + LoggerFactory.getLogger(CachedRecordStore.class); + + + /** Prevent loading the cache more than once every 500 ms. */ + private static final long MIN_UPDATE_MS = 500; + + + /** Cached entries. */ + private List<R> records = new ArrayList<>(); + + /** Time stamp of the cached entries. */ + private long timestamp = -1; + + /** If the cache is initialized. */ + private boolean initialized = false; + + /** Last time the cache was updated. */ + private long lastUpdate = -1; + + /** Lock to access the memory cache. */ + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + + /** If it should override the expired values when loading the cache. */ + private boolean override = false; + + + /** + * Create a new cached record store. + * + * @param clazz Class of the record to store. + * @param driver State Store driver. + */ + protected CachedRecordStore(Class<R> clazz, StateStoreDriver driver) { + this(clazz, driver, false); + } + + /** + * Create a new cached record store. + * + * @param clazz Class of the record to store. + * @param driver State Store driver. + * @param over If the entries should be override if they expire + */ + protected CachedRecordStore( + Class<R> clazz, StateStoreDriver driver, boolean over) { + super(clazz, driver); + + this.override = over; + } + + /** + * Check that the cache of the State Store information is available. + * + * @throws StateStoreUnavailableException If the cache is not initialized. + */ + private void checkCacheAvailable() throws StateStoreUnavailableException { + if (!this.initialized) { + throw new StateStoreUnavailableException( + "Cached State Store not initialized, " + + getRecordClass().getSimpleName() + " records not valid"); + } + } + + @Override + public boolean loadCache(boolean force) throws IOException { + // Prevent loading the cache too frequently + if (force || isUpdateTime()) { + List<R> newRecords = null; + long t = -1; + try { + QueryResult<R> result = getDriver().get(getRecordClass()); + newRecords = result.getRecords(); + t = result.getTimestamp(); + + // If we have any expired record, update the State Store + if (this.override) { + overrideExpiredRecords(result); + } + } catch (IOException e) { + LOG.error("Cannot get \"{}\" records from the State Store", + getRecordClass().getSimpleName()); + this.initialized = false; + return false; + } + + // Update cache atomically + writeLock.lock(); + try { + this.records.clear(); + this.records.addAll(newRecords); + this.timestamp = t; + this.initialized = true; + } finally { + writeLock.unlock(); + } + + // Update the metrics for the cache State Store size + StateStoreMetrics metrics = getDriver().getMetrics(); + if (metrics != null) { + String recordName = getRecordClass().getSimpleName(); + metrics.setCacheSize(recordName, this.records.size()); + } + + lastUpdate = Time.monotonicNow(); + } + return true; + } + + /** + * Check if it's time to update the cache. Update it it was never updated. + * + * @return If it's time to update this cache. + */ + private boolean isUpdateTime() { + return Time.monotonicNow() - lastUpdate > MIN_UPDATE_MS; + } + + /** + * Updates the state store with any record overrides we detected, such as an + * expired state. + * + * @param query RecordQueryResult containing the data to be inspected. + * @throws IOException + */ + public void overrideExpiredRecords(QueryResult<R> query) throws IOException { + List<R> commitRecords = new ArrayList<>(); + List<R> newRecords = query.getRecords(); + long currentDriverTime = query.getTimestamp(); + if (newRecords == null || currentDriverTime <= 0) { + LOG.error("Cannot check overrides for record"); + return; + } + for (R record : newRecords) { + if (record.checkExpired(currentDriverTime)) { + String recordName = StateStoreUtils.getRecordName(record.getClass()); + LOG.info("Override State Store record {}: {}", recordName, record); + commitRecords.add(record); + } + } + if (commitRecords.size() > 0) { + getDriver().putAll(commitRecords, true, false); + } + } + + /** + * Updates the state store with any record overrides we detected, such as an + * expired state. + * + * @param record Record record to be updated. + * @throws IOException + */ + public void overrideExpiredRecord(R record) throws IOException { + List<R> newRecords = Collections.singletonList(record); + long time = getDriver().getTime(); + QueryResult<R> query = new QueryResult<>(newRecords, time); + overrideExpiredRecords(query); + } + + /** + * Get all the cached records. + * + * @return Copy of the cached records. + * @throws StateStoreUnavailableException If the State store is not available. + */ + public List<R> getCachedRecords() throws StateStoreUnavailableException { + checkCacheAvailable(); + + List<R> ret = new LinkedList<R>(); + this.readLock.lock(); + try { + ret.addAll(this.records); + } finally { + this.readLock.unlock(); + } + return ret; + } + + /** + * Get all the cached records and the time stamp of the cache. + * + * @return Copy of the cached records and the time stamp. + * @throws StateStoreUnavailableException If the State store is not available. + */ + protected QueryResult<R> getCachedRecordsAndTimeStamp() + throws StateStoreUnavailableException { + checkCacheAvailable(); + + this.readLock.lock(); + try { + return new QueryResult<R>(this.records, this.timestamp); + } finally { + this.readLock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java new file mode 100644 index 0000000..3e8ba6b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MembershipStore.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; + +/** + * Management API for NameNode registrations stored in + * {@link org.apache.hadoop.hdfs.server.federation.store.records.MembershipState + * MembershipState} records. The {@link org.apache.hadoop.hdfs.server. + * federation.router.RouterHeartbeatService RouterHeartbeatService} periodically + * polls each NN to update the NameNode metadata(addresses, operational) and HA + * state(active, standby). Each NameNode may be polled by multiple + * {@link org.apache.hadoop.hdfs.server.federation.router.Router Router} + * instances. + * <p> + * Once fetched from the + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver + * StateStoreDriver}, NameNode registrations are cached until the next query. + * The fetched registration data is aggregated using a quorum to determine the + * best/most accurate state for each NameNode. The cache is periodically updated + * by the @{link StateStoreCacheUpdateService}. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class MembershipStore + extends CachedRecordStore<MembershipState> { + + protected MembershipStore(StateStoreDriver driver) { + super(MembershipState.class, driver, true); + } + + /** + * Inserts or updates a namenode membership entry into the table. + * + * @param request Fully populated NamenodeHeartbeatRequest request. + * @return True if successful, false otherwise. + * @throws StateStoreUnavailableException Throws exception if the data store + * is not initialized. + * @throws IOException if the data store could not be queried or the query is + * invalid. + */ + public abstract NamenodeHeartbeatResponse namenodeHeartbeat( + NamenodeHeartbeatRequest request) throws IOException; + + /** + * Queries for a single cached registration entry matching the given + * parameters. Possible keys are the names of data structure elements Possible + * values are matching SQL "LIKE" targets. + * + * @param request Fully populated GetNamenodeRegistrationsRequest request. + * @return Single matching FederationMembershipStateEntry or null if not found + * or more than one entry matches. + * @throws StateStoreUnavailableException Throws exception if the data store + * is not initialized. + * @throws IOException if the data store could not be queried or the query is + * invalid. + */ + public abstract GetNamenodeRegistrationsResponse getNamenodeRegistrations( + GetNamenodeRegistrationsRequest request) throws IOException; + + /** + * Get the expired registrations from the registration cache. + * + * @return Expired registrations or zero-length list if none are found. + * @throws StateStoreUnavailableException Throws exception if the data store + * is not initialized. + * @throws IOException if the data store could not be queried or the query is + * invalid. + */ + public abstract GetNamenodeRegistrationsResponse + getExpiredNamenodeRegistrations(GetNamenodeRegistrationsRequest request) + throws IOException; + + /** + * Retrieves a list of registered nameservices and their associated info. + * + * @param request + * @return Collection of information for each registered nameservice. + * @throws IOException if the data store could not be queried or the query is + * invalid. + */ + public abstract GetNamespaceInfoResponse getNamespaceInfo( + GetNamespaceInfoRequest request) throws IOException; + + /** + * Overrides a cached namenode state with an updated state. + * + * @param request Fully populated OverrideNamenodeRegistrationRequest request. + * @return OverrideNamenodeRegistrationResponse + * @throws StateStoreUnavailableException if the data store is not + * initialized. + * @throws IOException if the data store could not be queried or the query is + * invalid. + */ + public abstract UpdateNamenodeRegistrationResponse updateNamenodeRegistration( + UpdateNamenodeRegistrationRequest request) throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java new file mode 100644 index 0000000..b439659 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/MountTableStore.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; + +/** + * Management API for the HDFS mount table information stored in + * {@link org.apache.hadoop.hdfs.server.federation.store.records.MountTable + * MountTable} records. The mount table contains entries that map a particular + * global namespace path one or more HDFS nameservices (NN) + target path. It is + * possible to map mount locations for root folders, directories or individual + * files. + * <p> + * Once fetched from the + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver + * StateStoreDriver}, MountTable records are cached in a tree for faster access. + * Each path in the global namespace is mapped to a nameserivce ID and local + * path upon request. The cache is periodically updated by the @{link + * StateStoreCacheUpdateService}. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class MountTableStore extends CachedRecordStore<MountTable> + implements MountTableManager { + + public MountTableStore(StateStoreDriver driver) { + super(MountTable.class, driver); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java new file mode 100644 index 0000000..53a8b82 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RecordStore.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import java.lang.reflect.Constructor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; + +/** + * Store records in the State Store. Subclasses provide interfaces to operate on + * those records. + * + * @param <R> Record to store by this interface. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class RecordStore<R extends BaseRecord> { + + private static final Log LOG = LogFactory.getLog(RecordStore.class); + + + /** Class of the record stored in this State Store. */ + private final Class<R> recordClass; + + /** State store driver backed by persistent storage. */ + private final StateStoreDriver driver; + + + /** + * Create a new store for records. + * + * @param clazz Class of the record to store. + * @param stateStoreDriver Driver for the State Store. + */ + protected RecordStore(Class<R> clazz, StateStoreDriver stateStoreDriver) { + this.recordClass = clazz; + this.driver = stateStoreDriver; + } + + /** + * Report a required record to the data store. The data store uses this to + * create/maintain storage for the record. + * + * @return The class of the required record or null if no record is required + * for this interface. + */ + public Class<R> getRecordClass() { + return this.recordClass; + } + + /** + * Get the State Store driver. + * + * @return State Store driver. + */ + protected StateStoreDriver getDriver() { + return this.driver; + } + + /** + * Build a state store API implementation interface. + * + * @param clazz The specific interface implementation to create + * @param driver The {@link StateStoreDriver} implementation in use. + * @return An initialized instance of the specified state store API + * implementation. + */ + public static <T extends RecordStore<?>> T newInstance( + final Class<T> clazz, final StateStoreDriver driver) { + + try { + Constructor<T> constructor = clazz.getConstructor(StateStoreDriver.class); + T recordStore = constructor.newInstance(driver); + return recordStore; + } catch (Exception e) { + LOG.error("Cannot create new instance for " + clazz, e); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RouterStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RouterStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RouterStore.java new file mode 100644 index 0000000..c6a0dad --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/RouterStore.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; + +/** + * Management API for + * {@link org.apache.hadoop.hdfs.server.federation.store.records.RouterState + * RouterState} records in the state store. Accesses the data store via the + * {@link org.apache.hadoop.hdfs.server.federation.store.driver. + * StateStoreDriver StateStoreDriver} interface. No data is cached. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class RouterStore extends CachedRecordStore<RouterState> { + + public RouterStore(StateStoreDriver driver) { + super(RouterState.class, driver, true); + } + + /** + * Fetches the current router state object. + * + * @param request Fully populated request object. + * @return The matching router record or null if none exists. + * @throws IOException Throws exception if unable to query the data store or + * if more than one matching record is found. + */ + public abstract GetRouterRegistrationResponse getRouterRegistration( + GetRouterRegistrationRequest request) throws IOException; + + /** + * Fetches all router status objects. + * + * @param request Fully populated request object. + * @return List of Router records present in the data store. + * @throws IOException Throws exception if unable to query the data store + */ + public abstract GetRouterRegistrationsResponse getRouterRegistrations( + GetRouterRegistrationsRequest request) throws IOException; + + /** + * Update the state of this router in the State Store. + * + * @param request Fully populated request object. + * @return True if the update was successfully recorded, false otherwise. + * @throws IOException Throws exception if unable to query the data store + */ + public abstract RouterHeartbeatResponse routerHeartbeat( + RouterHeartbeatRequest request) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java new file mode 100644 index 0000000..83fc501 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCache.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import java.io.IOException; + +/** + * Interface for a cached copy of the State Store. + */ +public interface StateStoreCache { + + /** + * Load the cache from the State Store. Called by the cache update service + * when the data has been reloaded. + * + * @param force If we force the load. + * @return If the cache was loaded successfully. + * @throws IOException If there was an error loading the cache. + */ + boolean loadCache(boolean force) throws IOException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java new file mode 100644 index 0000000..6a4dc76 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreCacheUpdateService.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.PeriodicService; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service to periodically update the {@link StateStoreService} + * cached information in the + * {@link org.apache.hadoop.hdfs.server.federation.router.Router Router}. + * This is for performance and removes the State Store from the critical path + * in common operations. + */ +public class StateStoreCacheUpdateService extends PeriodicService { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreCacheUpdateService.class); + + /** The service that manages the State Store connection. */ + private final StateStoreService stateStore; + + + /** + * Create a new Cache update service. + * + * @param stateStore Implementation of the state store + */ + public StateStoreCacheUpdateService(StateStoreService stateStore) { + super(StateStoreCacheUpdateService.class.getSimpleName()); + this.stateStore = stateStore; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + + this.setIntervalMs(conf.getTimeDuration( + RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, + RBFConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT, + TimeUnit.MILLISECONDS)); + + super.serviceInit(conf); + } + + @Override + public void periodicInvoke() { + LOG.debug("Updating State Store cache"); + stateStore.refreshCaches(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java new file mode 100644 index 0000000..8ebf4b8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.PeriodicService; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service to periodically monitor the connection of the StateStore + * {@link StateStoreService} data store and to re-open the connection + * to the data store if required. + */ +public class StateStoreConnectionMonitorService extends PeriodicService { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreConnectionMonitorService.class); + + /** Service that maintains the State Store connection. */ + private final StateStoreService stateStore; + + + /** + * Create a new service to monitor the connectivity of the state store driver. + * + * @param store Instance of the state store to be monitored. + */ + public StateStoreConnectionMonitorService(StateStoreService store) { + super(StateStoreConnectionMonitorService.class.getSimpleName()); + this.stateStore = store; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.setIntervalMs(conf.getLong( + RBFConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS, + RBFConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT)); + + super.serviceInit(conf); + } + + @Override + public void periodicInvoke() { + LOG.debug("Checking state store connection"); + if (!stateStore.isDriverReady()) { + LOG.info("Attempting to open state store driver."); + stateStore.loadDriver(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java new file mode 100644 index 0000000..ccbde09 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java @@ -0,0 +1,450 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.StandardMBean; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMBean; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl; +import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl; +import org.apache.hadoop.hdfs.server.federation.store.impl.RouterStoreImpl; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.metrics2.MetricsException; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * A service to initialize a + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver + * StateStoreDriver} and maintain the connection to the data store. There are + * multiple state store driver connections supported: + * <ul> + * <li>File + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl. + * StateStoreFileImpl StateStoreFileImpl} + * <li>ZooKeeper + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl. + * StateStoreZooKeeperImpl StateStoreZooKeeperImpl} + * </ul> + * <p> + * The service also supports the dynamic registration of record stores like: + * <ul> + * <li>{@link MembershipStore}: state of the Namenodes in the + * federation. + * <li>{@link MountTableStore}: Mount table between to subclusters. + * See {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}. + * <li>{@link RebalancerStore}: Log of the rebalancing operations. + * <li>{@link RouterStore}: Router state in the federation. + * <li>{@link TokenStore}: Tokens in the federation. + * </ul> + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class StateStoreService extends CompositeService { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreService.class); + + + /** State Store configuration. */ + private Configuration conf; + + /** Identifier for the service. */ + private String identifier; + + /** Driver for the back end connection. */ + private StateStoreDriver driver; + + /** Service to maintain data store connection. */ + private StateStoreConnectionMonitorService monitorService; + + /** StateStore metrics. */ + private StateStoreMetrics metrics; + + /** Supported record stores. */ + private final Map< + Class<? extends BaseRecord>, RecordStore<? extends BaseRecord>> + recordStores; + + /** Service to maintain State Store caches. */ + private StateStoreCacheUpdateService cacheUpdater; + /** Time the cache was last successfully updated. */ + private long cacheLastUpdateTime; + /** List of internal caches to update. */ + private final List<StateStoreCache> cachesToUpdateInternal; + /** List of external caches to update. */ + private final List<StateStoreCache> cachesToUpdateExternal; + + + public StateStoreService() { + super(StateStoreService.class.getName()); + + // Records and stores supported by this implementation + this.recordStores = new HashMap<>(); + + // Caches to maintain + this.cachesToUpdateInternal = new ArrayList<>(); + this.cachesToUpdateExternal = new ArrayList<>(); + } + + /** + * Initialize the State Store and the connection to the backend. + * + * @param config Configuration for the State Store. + * @throws IOException + */ + @Override + protected void serviceInit(Configuration config) throws Exception { + this.conf = config; + + // Create implementation of State Store + Class<? extends StateStoreDriver> driverClass = this.conf.getClass( + RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS, + RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT, + StateStoreDriver.class); + this.driver = ReflectionUtils.newInstance(driverClass, this.conf); + + if (this.driver == null) { + throw new IOException("Cannot create driver for the State Store"); + } + + // Add supported record stores + addRecordStore(MembershipStoreImpl.class); + addRecordStore(MountTableStoreImpl.class); + addRecordStore(RouterStoreImpl.class); + + // Check the connection to the State Store periodically + this.monitorService = new StateStoreConnectionMonitorService(this); + this.addService(monitorService); + + // Set expirations intervals for each record + MembershipState.setExpirationMs(conf.getLong( + RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, + RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT)); + + RouterState.setExpirationMs(conf.getTimeDuration( + RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS, + RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT, + TimeUnit.MILLISECONDS)); + + // Cache update service + this.cacheUpdater = new StateStoreCacheUpdateService(this); + addService(this.cacheUpdater); + + // Create metrics for the State Store + this.metrics = StateStoreMetrics.create(conf); + + // Adding JMX interface + try { + StandardMBean bean = new StandardMBean(metrics, StateStoreMBean.class); + ObjectName registeredObject = + MBeans.register("Router", "StateStore", bean); + LOG.info("Registered StateStoreMBean: {}", registeredObject); + } catch (NotCompliantMBeanException e) { + throw new RuntimeException("Bad StateStoreMBean setup", e); + } catch (MetricsException e) { + LOG.info("Failed to register State Store bean {}", e.getMessage()); + } + + super.serviceInit(this.conf); + } + + @Override + protected void serviceStart() throws Exception { + loadDriver(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + closeDriver(); + + if (metrics != null) { + metrics.shutdown(); + metrics = null; + } + + super.serviceStop(); + } + + /** + * Add a record store to the State Store. It includes adding the store, the + * supported record and the cache management. + * + * @param clazz Class of the record store to track. + * @return New record store. + * @throws ReflectiveOperationException + */ + private <T extends RecordStore<?>> void addRecordStore( + final Class<T> clazz) throws ReflectiveOperationException { + + assert this.getServiceState() == STATE.INITED : + "Cannot add record to the State Store once started"; + + T recordStore = RecordStore.newInstance(clazz, this.getDriver()); + Class<? extends BaseRecord> recordClass = recordStore.getRecordClass(); + this.recordStores.put(recordClass, recordStore); + + // Subscribe for cache updates + if (recordStore instanceof StateStoreCache) { + StateStoreCache cachedRecordStore = (StateStoreCache) recordStore; + this.cachesToUpdateInternal.add(cachedRecordStore); + } + } + + /** + * Get the record store in this State Store for a given interface. + * + * @param recordStoreClass Class of the record store. + * @return Registered record store or null if not found. + */ + public <T extends RecordStore<?>> T getRegisteredRecordStore( + final Class<T> recordStoreClass) { + for (RecordStore<? extends BaseRecord> recordStore : + this.recordStores.values()) { + if (recordStoreClass.isInstance(recordStore)) { + @SuppressWarnings("unchecked") + T recordStoreChecked = (T) recordStore; + return recordStoreChecked; + } + } + return null; + } + + /** + * List of records supported by this State Store. + * + * @return List of supported record classes. + */ + public Collection<Class<? extends BaseRecord>> getSupportedRecords() { + return this.recordStores.keySet(); + } + + /** + * Load the State Store driver. If successful, refresh cached data tables. + */ + public void loadDriver() { + synchronized (this.driver) { + if (!isDriverReady()) { + String driverName = this.driver.getClass().getSimpleName(); + if (this.driver.init( + conf, getIdentifier(), getSupportedRecords(), metrics)) { + LOG.info("Connection to the State Store driver {} is open and ready", + driverName); + this.refreshCaches(); + } else { + LOG.error("Cannot initialize State Store driver {}", driverName); + } + } + } + } + + /** + * Check if the driver is ready to be used. + * + * @return If the driver is ready. + */ + public boolean isDriverReady() { + return this.driver.isDriverReady(); + } + + /** + * Manually shuts down the driver. + * + * @throws Exception If the driver cannot be closed. + */ + @VisibleForTesting + public void closeDriver() throws Exception { + if (this.driver != null) { + this.driver.close(); + } + } + + /** + * Get the state store driver. + * + * @return State store driver. + */ + public StateStoreDriver getDriver() { + return this.driver; + } + + /** + * Fetch a unique identifier for this state store instance. Typically it is + * the address of the router. + * + * @return Unique identifier for this store. + */ + public String getIdentifier() { + return this.identifier; + } + + /** + * Set a unique synchronization identifier for this store. + * + * @param id Unique identifier, typically the router's RPC address. + */ + public void setIdentifier(String id) { + this.identifier = id; + } + + // + // Cached state store data + // + /** + * The last time the state store cache was fully updated. + * + * @return Timestamp. + */ + public long getCacheUpdateTime() { + return this.cacheLastUpdateTime; + } + + /** + * Stops the cache update service. + */ + @VisibleForTesting + public void stopCacheUpdateService() { + if (this.cacheUpdater != null) { + this.cacheUpdater.stop(); + removeService(this.cacheUpdater); + this.cacheUpdater = null; + } + } + + /** + * Register a cached record store for automatic periodic cache updates. + * + * @param client Client to the state store. + */ + public void registerCacheExternal(StateStoreCache client) { + this.cachesToUpdateExternal.add(client); + } + + /** + * Refresh the cache with information from the State Store. Called + * periodically by the CacheUpdateService to maintain data caches and + * versions. + */ + public void refreshCaches() { + refreshCaches(false); + } + + /** + * Refresh the cache with information from the State Store. Called + * periodically by the CacheUpdateService to maintain data caches and + * versions. + * @param force If we force the refresh. + */ + public void refreshCaches(boolean force) { + boolean success = true; + if (isDriverReady()) { + List<StateStoreCache> cachesToUpdate = new LinkedList<>(); + cachesToUpdate.addAll(cachesToUpdateInternal); + cachesToUpdate.addAll(cachesToUpdateExternal); + for (StateStoreCache cachedStore : cachesToUpdate) { + String cacheName = cachedStore.getClass().getSimpleName(); + boolean result = false; + try { + result = cachedStore.loadCache(force); + } catch (IOException e) { + LOG.error("Error updating cache for {}", cacheName, e); + result = false; + } + if (!result) { + success = false; + LOG.error("Cache update failed for cache {}", cacheName); + } + } + } else { + success = false; + LOG.info("Skipping State Store cache update, driver is not ready."); + } + if (success) { + // Uses local time, not driver time. + this.cacheLastUpdateTime = Time.now(); + } + } + + /** + * Update the cache for a specific record store. + * + * @param clazz Class of the record store. + * @return If the cached was loaded. + * @throws IOException if the cache update failed. + */ + public boolean loadCache(final Class<?> clazz) throws IOException { + return loadCache(clazz, false); + } + + /** + * Update the cache for a specific record store. + * + * @param clazz Class of the record store. + * @param force Force the update ignoring cached periods. + * @return If the cached was loaded. + * @throws IOException if the cache update failed. + */ + public boolean loadCache(Class<?> clazz, boolean force) throws IOException { + List<StateStoreCache> cachesToUpdate = + new LinkedList<StateStoreCache>(); + cachesToUpdate.addAll(this.cachesToUpdateInternal); + cachesToUpdate.addAll(this.cachesToUpdateExternal); + for (StateStoreCache cachedStore : cachesToUpdate) { + if (clazz.isInstance(cachedStore)) { + return cachedStore.loadCache(force); + } + } + throw new IOException("Registered cache was not found for " + clazz); + } + + /** + * Get the metrics for the State Store. + * + * @return State Store metrics. + */ + public StateStoreMetrics getMetrics() { + return metrics; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUnavailableException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUnavailableException.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUnavailableException.java new file mode 100644 index 0000000..4e6f8c8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUnavailableException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import java.io.IOException; + +/** + * Thrown when the state store is not reachable or available. Cached APIs and + * queries may succeed. Client should retry again later. + */ +public class StateStoreUnavailableException extends IOException { + + private static final long serialVersionUID = 1L; + + public StateStoreUnavailableException(String msg) { + super(msg); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java new file mode 100644 index 0000000..0a36619 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Set of utility functions used to work with the State Store. + */ +public final class StateStoreUtils { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreUtils.class); + + + private StateStoreUtils() { + // Utility class + } + + /** + * Get the base class for a record class. If we get an implementation of a + * record we will return the real parent record class. + * + * @param clazz Class of the data record to check. + * @return Base class for the record. + */ + @SuppressWarnings("unchecked") + public static <T extends BaseRecord> + Class<? extends BaseRecord> getRecordClass(final Class<T> clazz) { + + // We ignore the Impl classes and go to the super class + Class<? extends BaseRecord> actualClazz = clazz; + while (actualClazz.getSimpleName().endsWith("Impl")) { + actualClazz = (Class<? extends BaseRecord>) actualClazz.getSuperclass(); + } + + // Check if we went too far + if (actualClazz.equals(BaseRecord.class)) { + LOG.error("We went too far ({}) with {}", actualClazz, clazz); + actualClazz = clazz; + } + return actualClazz; + } + + /** + * Get the base class for a record. If we get an implementation of a record we + * will return the real parent record class. + * + * @param record Record to check its main class. + * @return Base class for the record. + */ + public static <T extends BaseRecord> + Class<? extends BaseRecord> getRecordClass(final T record) { + return getRecordClass(record.getClass()); + } + + /** + * Get the base class name for a record. If we get an implementation of a + * record we will return the real parent record class. + * + * @param clazz Class of the data record to check. + * @return Name of the base class for the record. + */ + public static <T extends BaseRecord> String getRecordName( + final Class<T> clazz) { + return getRecordClass(clazz).getSimpleName(); + } + + /** + * Filters a list of records to find all records matching the query. + * + * @param query Map of field names and objects to use to filter results. + * @param records List of data records to filter. + * @return List of all records matching the query (or empty list if none + * match), null if the data set could not be filtered. + */ + public static <T extends BaseRecord> List<T> filterMultiple( + final Query<T> query, final Iterable<T> records) { + + List<T> matchingList = new ArrayList<>(); + for (T record : records) { + if (query.matches(record)) { + matchingList.add(record); + } + } + return matchingList; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java new file mode 100644 index 0000000..d595a97 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import java.net.InetAddress; +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Driver class for an implementation of a {@link StateStoreService} + * provider. Driver implementations will extend this class and implement some of + * the default methods. + */ +public abstract class StateStoreDriver implements StateStoreRecordOperations { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreDriver.class); + + + /** State Store configuration. */ + private Configuration conf; + + /** Identifier for the driver. */ + private String identifier; + + /** State Store metrics. */ + private StateStoreMetrics metrics; + + + /** + * Initialize the state store connection. + * + * @param config Configuration for the driver. + * @param id Identifier for the driver. + * @param records Records that are supported. + * @return If initialized and ready, false if failed to initialize driver. + */ + public boolean init(final Configuration config, final String id, + final Collection<Class<? extends BaseRecord>> records, + final StateStoreMetrics stateStoreMetrics) { + + this.conf = config; + this.identifier = id; + this.metrics = stateStoreMetrics; + + if (this.identifier == null) { + LOG.warn("The identifier for the State Store connection is not set"); + } + + boolean success = initDriver(); + if (!success) { + LOG.error("Cannot initialize driver for {}", getDriverName()); + return false; + } + + for (Class<? extends BaseRecord> cls : records) { + String recordString = StateStoreUtils.getRecordName(cls); + if (!initRecordStorage(recordString, cls)) { + LOG.error("Cannot initialize record store for {}", cls.getSimpleName()); + return false; + } + } + return true; + } + + /** + * Get the State Store configuration. + * + * @return Configuration for the State Store. + */ + protected Configuration getConf() { + return this.conf; + } + + /** + * Gets a unique identifier for the running task/process. Typically the + * router address. + * + * @return Unique identifier for the running task. + */ + public String getIdentifier() { + return this.identifier; + } + + /** + * Get the metrics for the State Store. + * + * @return State Store metrics. + */ + public StateStoreMetrics getMetrics() { + return this.metrics; + } + + /** + * Prepare the driver to access data storage. + * + * @return True if the driver was successfully initialized. If false is + * returned, the state store will periodically attempt to + * re-initialize the driver and the router will remain in safe mode + * until the driver is initialized. + */ + public abstract boolean initDriver(); + + /** + * Initialize storage for a single record class. + * + * @param className String reference of the record class to initialize, + * used to construct paths and file names for the record. + * Determined by configuration settings for the specific + * driver. + * @param clazz Record type corresponding to the provided name. + * @return True if successful, false otherwise. + */ + public abstract <T extends BaseRecord> boolean initRecordStorage( + String className, Class<T> clazz); + + /** + * Check if the driver is currently running and the data store connection is + * valid. + * + * @return True if the driver is initialized and the data store is ready. + */ + public abstract boolean isDriverReady(); + + /** + * Check if the driver is ready to be used and throw an exception otherwise. + * + * @throws StateStoreUnavailableException If the driver is not ready. + */ + public void verifyDriverReady() throws StateStoreUnavailableException { + if (!isDriverReady()) { + String driverName = getDriverName(); + String hostname = getHostname(); + throw new StateStoreUnavailableException("State Store driver " + + driverName + " in " + hostname + " is not ready."); + } + } + + /** + * Close the State Store driver connection. + */ + public abstract void close() throws Exception; + + /** + * Returns the current time synchronization from the underlying store. + * Override for stores that supply a current date. The data store driver is + * responsible for maintaining the official synchronization time/date for all + * distributed components. + * + * @return Current time stamp, used for all synchronization dates. + */ + public long getTime() { + return Time.now(); + } + + /** + * Get the name of the driver implementation for debugging. + * + * @return Name of the driver implementation. + */ + private String getDriverName() { + return this.getClass().getSimpleName(); + } + + /** + * Get the host name of the machine running the driver for debugging. + * + * @return Host name of the machine running the driver. + */ + private String getHostname() { + String hostname = "Unknown"; + try { + hostname = InetAddress.getLocalHost().getHostName(); + } catch (Exception e) { + LOG.error("Cannot get local address", e); + } + return hostname; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java new file mode 100644 index 0000000..443d46e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.apache.hadoop.io.retry.AtMostOnce; +import org.apache.hadoop.io.retry.Idempotent; + +/** + * Operations for a driver to manage records in the State Store. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface StateStoreRecordOperations { + + /** + * Get all records of the requested record class from the data store. To use + * the default implementations in this class, getAll must return new instances + * of the records on each call. It is recommended to override the default + * implementations for better performance. + * + * @param clazz Class of record to fetch. + * @return List of all records that match the clazz. + * @throws IOException Throws exception if unable to query the data store. + */ + @Idempotent + <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException; + + /** + * Get a single record from the store that matches the query. + * + * @param clazz Class of record to fetch. + * @param query Query to filter results. + * @return A single record matching the query. Null if there are no matching + * records or more than one matching record in the store. + * @throws IOException If multiple records match or if the data store cannot + * be queried. + */ + @Idempotent + <T extends BaseRecord> T get(Class<T> clazz, Query<T> query) + throws IOException; + + /** + * Get multiple records from the store that match a query. This method + * assumes the underlying driver does not support filtering. If the driver + * supports filtering it should overwrite this method. + * + * @param clazz Class of record to fetch. + * @param query Query to filter results. + * @return Records of type clazz that match the query or empty list if none + * are found. + * @throws IOException Throws exception if unable to query the data store. + */ + @Idempotent + <T extends BaseRecord> List<T> getMultiple( + Class<T> clazz, Query<T> query) throws IOException; + + /** + * Creates a single record. Optionally updates an existing record with same + * primary key. + * + * @param record The record to insert or update. + * @param allowUpdate True if update of exiting record is allowed. + * @param errorIfExists True if an error should be returned when inserting + * an existing record. Only used if allowUpdate = false. + * @return True if the operation was successful. + * + * @throws IOException Throws exception if unable to query the data store. + */ + @AtMostOnce + <T extends BaseRecord> boolean put( + T record, boolean allowUpdate, boolean errorIfExists) throws IOException; + + /** + * Creates multiple records. Optionally updates existing records that have + * the same primary key. + * + * @param records List of data records to update or create. All records must + * be of class clazz. + * @param clazz Record class of records. + * @param allowUpdate True if update of exiting record is allowed. + * @param errorIfExists True if an error should be returned when inserting + * an existing record. Only used if allowUpdate = false. + * @return true if all operations were successful. + * + * @throws IOException Throws exception if unable to query the data store. + */ + @AtMostOnce + <T extends BaseRecord> boolean putAll( + List<T> records, boolean allowUpdate, boolean errorIfExists) + throws IOException; + + /** + * Remove a single record. + * + * @param record Record to be removed. + * @return true If the record was successfully removed. False if the record + * could not be removed or not stored. + * @throws IOException Throws exception if unable to query the data store. + */ + @AtMostOnce + <T extends BaseRecord> boolean remove(T record) throws IOException; + + /** + * Remove all records of this class from the store. + * + * @param clazz Class of records to remove. + * @return True if successful. + * @throws IOException Throws exception if unable to query the data store. + */ + @AtMostOnce + <T extends BaseRecord> boolean removeAll(Class<T> clazz) throws IOException; + + /** + * Remove multiple records of a specific class that match a query. Requires + * the getAll implementation to fetch fresh records on each call. + * + * @param query Query to filter what to remove. + * @return The number of records removed. + * @throws IOException Throws exception if unable to query the data store. + */ + @AtMostOnce + <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query) + throws IOException; + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java new file mode 100644 index 0000000..666712f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreSerializer.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * Serializer to store and retrieve data in the State Store. + */ +public abstract class StateStoreSerializer { + + /** Singleton for the serializer instance. */ + private static StateStoreSerializer defaultSerializer; + + /** + * Get the default serializer based. + * @return Singleton serializer. + */ + public static StateStoreSerializer getSerializer() { + return getSerializer(null); + } + + /** + * Get a serializer based on the provided configuration. + * @param conf Configuration. Default if null. + * @return Singleton serializer. + */ + public static StateStoreSerializer getSerializer(Configuration conf) { + if (conf == null) { + synchronized (StateStoreSerializer.class) { + if (defaultSerializer == null) { + conf = new Configuration(); + defaultSerializer = newSerializer(conf); + } + } + return defaultSerializer; + } else { + return newSerializer(conf); + } + } + + private static StateStoreSerializer newSerializer(final Configuration conf) { + Class<? extends StateStoreSerializer> serializerName = conf.getClass( + RBFConfigKeys.FEDERATION_STORE_SERIALIZER_CLASS, + RBFConfigKeys.FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT, + StateStoreSerializer.class); + return ReflectionUtils.newInstance(serializerName, conf); + } + + /** + * Create a new record. + * @param clazz Class of the new record. + * @return New record. + */ + public static <T> T newRecord(Class<T> clazz) { + return getSerializer(null).newRecordInstance(clazz); + } + + /** + * Create a new record. + * @param clazz Class of the new record. + * @return New record. + */ + public abstract <T> T newRecordInstance(Class<T> clazz); + + /** + * Serialize a record into a byte array. + * @param record Record to serialize. + * @return Byte array with the serialized record. + */ + public abstract byte[] serialize(BaseRecord record); + + /** + * Serialize a record into a string. + * @param record Record to serialize. + * @return String with the serialized record. + */ + public abstract String serializeString(BaseRecord record); + + /** + * Deserialize a bytes array into a record. + * @param byteArray Byte array to deserialize. + * @param clazz Class of the record. + * @return New record. + * @throws IOException If it cannot deserialize the record. + */ + public abstract <T extends BaseRecord> T deserialize( + byte[] byteArray, Class<T> clazz) throws IOException; + + /** + * Deserialize a string into a record. + * @param data String with the data to deserialize. + * @param clazz Class of the record. + * @return New record. + * @throws IOException If it cannot deserialize the record. + */ + public abstract <T extends BaseRecord> T deserialize( + String data, Class<T> clazz) throws IOException; +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org