http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java new file mode 100644 index 0000000..2d04452 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java @@ -0,0 +1,459 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.ozone.om;

import com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;

/**
 * This class is for maintaining Ozone Manager statistics.
 *
 * <p>Three kinds of counters are kept:
 * <ul>
 *   <li>per-request-type totals ({@code numVolumeOps}, {@code numBucketOps},
 *       {@code numKeyOps});</li>
 *   <li>per-operation counters, whose {@code incNum*} methods also bump the
 *       matching per-request-type total (block allocation and service-list
 *       calls have no such total);</li>
 *   <li>per-operation failure counters, whose {@code incNum*Fails} methods
 *       touch only the failure counter.</li>
 * </ul>
 *
 * <p>Instances meant for use should be obtained through {@link #create()},
 * which registers the source with the default metrics system.
 * NOTE(review): the {@code @Metric} fields are never assigned in this class;
 * presumably the metrics system instantiates them during registration --
 * confirm before using an instance built directly with the constructor.
 */
@InterfaceAudience.Private
@Metrics(about="Ozone Manager Metrics", context="dfs")
public class OMMetrics {
  private static final String SOURCE_NAME =
      OMMetrics.class.getSimpleName();

  // OM request type op metrics
  private @Metric MutableCounterLong numVolumeOps;
  private @Metric MutableCounterLong numBucketOps;
  private @Metric MutableCounterLong numKeyOps;

  // OM op metrics
  private @Metric MutableCounterLong numVolumeCreates;
  private @Metric MutableCounterLong numVolumeUpdates;
  private @Metric MutableCounterLong numVolumeInfos;
  private @Metric MutableCounterLong numVolumeCheckAccesses;
  private @Metric MutableCounterLong numBucketCreates;
  private @Metric MutableCounterLong numVolumeDeletes;
  private @Metric MutableCounterLong numBucketInfos;
  private @Metric MutableCounterLong numBucketUpdates;
  private @Metric MutableCounterLong numBucketDeletes;
  private @Metric MutableCounterLong numKeyAllocate;
  private @Metric MutableCounterLong numKeyLookup;
  private @Metric MutableCounterLong numKeyRenames;
  private @Metric MutableCounterLong numKeyDeletes;
  private @Metric MutableCounterLong numBucketLists;
  private @Metric MutableCounterLong numKeyLists;
  private @Metric MutableCounterLong numVolumeLists;
  private @Metric MutableCounterLong numKeyCommits;
  private @Metric MutableCounterLong numAllocateBlockCalls;
  private @Metric MutableCounterLong numGetServiceLists;

  // Failure Metrics
  private @Metric MutableCounterLong numVolumeCreateFails;
  private @Metric MutableCounterLong numVolumeUpdateFails;
  private @Metric MutableCounterLong numVolumeInfoFails;
  private @Metric MutableCounterLong numVolumeDeleteFails;
  private @Metric MutableCounterLong numBucketCreateFails;
  private @Metric MutableCounterLong numVolumeCheckAccessFails;
  private @Metric MutableCounterLong numBucketInfoFails;
  private @Metric MutableCounterLong numBucketUpdateFails;
  private @Metric MutableCounterLong numBucketDeleteFails;
  private @Metric MutableCounterLong numKeyAllocateFails;
  private @Metric MutableCounterLong numKeyLookupFails;
  private @Metric MutableCounterLong numKeyRenameFails;
  private @Metric MutableCounterLong numKeyDeleteFails;
  private @Metric MutableCounterLong numBucketListFails;
  private @Metric MutableCounterLong numKeyListFails;
  private @Metric MutableCounterLong numVolumeListFails;
  private @Metric MutableCounterLong numKeyCommitFails;
  private @Metric MutableCounterLong numBlockAllocateCallFails;
  private @Metric MutableCounterLong numGetServiceListFails;

  public OMMetrics() {
  }

  /**
   * Creates a new metrics source and registers it with the default metrics
   * system under this class's simple name.
   *
   * @return the value returned by the metrics system's {@code register} call
   */
  public static OMMetrics create() {
    MetricsSystem ms = DefaultMetricsSystem.instance();
    return ms.register(SOURCE_NAME,
        "Ozone Manager Metrics",
        new OMMetrics());
  }

  // ---------------------------------------------------------------------
  // Successful-call counters. Each call also bumps the per-request-type
  // total, except for the service-list and block-allocation counters.
  // ---------------------------------------------------------------------

  public void incNumVolumeCreates() {
    numVolumeOps.incr();
    numVolumeCreates.incr();
  }

  public void incNumVolumeUpdates() {
    numVolumeOps.incr();
    numVolumeUpdates.incr();
  }

  public void incNumVolumeInfos() {
    numVolumeOps.incr();
    numVolumeInfos.incr();
  }

  public void incNumVolumeDeletes() {
    numVolumeOps.incr();
    numVolumeDeletes.incr();
  }

  public void incNumVolumeCheckAccesses() {
    numVolumeOps.incr();
    numVolumeCheckAccesses.incr();
  }

  public void incNumBucketCreates() {
    numBucketOps.incr();
    numBucketCreates.incr();
  }

  public void incNumBucketInfos() {
    numBucketOps.incr();
    numBucketInfos.incr();
  }

  public void incNumBucketUpdates() {
    numBucketOps.incr();
    numBucketUpdates.incr();
  }

  public void incNumBucketDeletes() {
    numBucketOps.incr();
    numBucketDeletes.incr();
  }

  public void incNumBucketLists() {
    numBucketOps.incr();
    numBucketLists.incr();
  }

  public void incNumKeyLists() {
    numKeyOps.incr();
    numKeyLists.incr();
  }

  public void incNumVolumeLists() {
    numVolumeOps.incr();
    numVolumeLists.incr();
  }

  public void incNumGetServiceLists() {
    numGetServiceLists.incr();
  }

  // ---------------------------------------------------------------------
  // Failure counters. These only touch the failure counter itself.
  // ---------------------------------------------------------------------

  public void incNumVolumeCreateFails() {
    numVolumeCreateFails.incr();
  }

  public void incNumVolumeUpdateFails() {
    numVolumeUpdateFails.incr();
  }

  public void incNumVolumeInfoFails() {
    numVolumeInfoFails.incr();
  }

  public void incNumVolumeDeleteFails() {
    numVolumeDeleteFails.incr();
  }

  public void incNumVolumeCheckAccessFails() {
    numVolumeCheckAccessFails.incr();
  }

  public void incNumBucketCreateFails() {
    numBucketCreateFails.incr();
  }

  public void incNumBucketInfoFails() {
    numBucketInfoFails.incr();
  }

  public void incNumBucketUpdateFails() {
    numBucketUpdateFails.incr();
  }

  public void incNumBucketDeleteFails() {
    numBucketDeleteFails.incr();
  }

  public void incNumKeyAllocates() {
    numKeyOps.incr();
    numKeyAllocate.incr();
  }

  public void incNumKeyAllocateFails() {
    numKeyAllocateFails.incr();
  }

  public void incNumKeyLookups() {
    numKeyOps.incr();
    numKeyLookup.incr();
  }

  public void incNumKeyLookupFails() {
    numKeyLookupFails.incr();
  }

  public void incNumKeyRenames() {
    numKeyOps.incr();
    numKeyRenames.incr();
  }

  /**
   * Records a failed key rename. Like every other failure counter in this
   * class it does not touch {@code numKeyOps}; that total is bumped by
   * {@link #incNumKeyRenames()}.
   */
  public void incNumKeyRenameFails() {
    numKeyRenameFails.incr();
  }

  public void incNumKeyDeleteFails() {
    numKeyDeleteFails.incr();
  }

  public void incNumKeyDeletes() {
    numKeyOps.incr();
    numKeyDeletes.incr();
  }

  public void incNumKeyCommits() {
    numKeyOps.incr();
    numKeyCommits.incr();
  }

  public void incNumKeyCommitFails() {
    numKeyCommitFails.incr();
  }

  public void incNumBlockAllocateCalls() {
    numAllocateBlockCalls.incr();
  }

  public void incNumBlockAllocateCallFails() {
    numBlockAllocateCallFails.incr();
  }

  public void incNumBucketListFails() {
    numBucketListFails.incr();
  }

  public void incNumKeyListFails() {
    numKeyListFails.incr();
  }

  public void incNumVolumeListFails() {
    numVolumeListFails.incr();
  }

  public void incNumGetServiceListFails() {
    numGetServiceListFails.incr();
  }

  // ---------------------------------------------------------------------
  // Read accessors, exposed for tests. Each returns the current value of
  // the corresponding counter.
  // ---------------------------------------------------------------------

  @VisibleForTesting
  public long getNumVolumeCreates() {
    return numVolumeCreates.value();
  }

  @VisibleForTesting
  public long getNumVolumeUpdates() {
    return numVolumeUpdates.value();
  }

  @VisibleForTesting
  public long getNumVolumeInfos() {
    return numVolumeInfos.value();
  }

  @VisibleForTesting
  public long getNumVolumeDeletes() {
    return numVolumeDeletes.value();
  }

  @VisibleForTesting
  public long getNumVolumeCheckAccesses() {
    return numVolumeCheckAccesses.value();
  }

  @VisibleForTesting
  public long getNumBucketCreates() {
    return numBucketCreates.value();
  }

  @VisibleForTesting
  public long getNumBucketInfos() {
    return numBucketInfos.value();
  }

  @VisibleForTesting
  public long getNumBucketUpdates() {
    return numBucketUpdates.value();
  }

  @VisibleForTesting
  public long getNumBucketDeletes() {
    return numBucketDeletes.value();
  }

  @VisibleForTesting
  public long getNumBucketLists() {
    return numBucketLists.value();
  }

  @VisibleForTesting
  public long getNumVolumeLists() {
    return numVolumeLists.value();
  }

  @VisibleForTesting
  public long getNumKeyLists() {
    return numKeyLists.value();
  }

  @VisibleForTesting
  public long getNumGetServiceLists() {
    return numGetServiceLists.value();
  }

  @VisibleForTesting
  public long getNumVolumeCreateFails() {
    return numVolumeCreateFails.value();
  }

  @VisibleForTesting
  public long getNumVolumeUpdateFails() {
    return numVolumeUpdateFails.value();
  }

  @VisibleForTesting
  public long getNumVolumeInfoFails() {
    return numVolumeInfoFails.value();
  }

  @VisibleForTesting
  public long getNumVolumeDeleteFails() {
    return numVolumeDeleteFails.value();
  }

  @VisibleForTesting
  public long getNumVolumeCheckAccessFails() {
    return numVolumeCheckAccessFails.value();
  }

  @VisibleForTesting
  public long getNumBucketCreateFails() {
    return numBucketCreateFails.value();
  }

  @VisibleForTesting
  public long getNumBucketInfoFails() {
    return numBucketInfoFails.value();
  }

  @VisibleForTesting
  public long getNumBucketUpdateFails() {
    return numBucketUpdateFails.value();
  }

  @VisibleForTesting
  public long getNumBucketDeleteFails() {
    return numBucketDeleteFails.value();
  }

  @VisibleForTesting
  public long getNumKeyAllocates() {
    return numKeyAllocate.value();
  }

  @VisibleForTesting
  public long getNumKeyAllocateFails() {
    return numKeyAllocateFails.value();
  }

  @VisibleForTesting
  public long getNumKeyLookups() {
    return numKeyLookup.value();
  }

  @VisibleForTesting
  public long getNumKeyLookupFails() {
    return numKeyLookupFails.value();
  }

  @VisibleForTesting
  public long getNumKeyRenames() {
    return numKeyRenames.value();
  }

  @VisibleForTesting
  public long getNumKeyRenameFails() {
    return numKeyRenameFails.value();
  }

  @VisibleForTesting
  public long getNumKeyDeletes() {
    return numKeyDeletes.value();
  }

  @VisibleForTesting
  public long getNumKeyDeletesFails() {
    return numKeyDeleteFails.value();
  }

  @VisibleForTesting
  public long getNumBucketListFails() {
    return numBucketListFails.value();
  }

  @VisibleForTesting
  public long getNumKeyListFails() {
    return numKeyListFails.value();
  }

  @VisibleForTesting
  public long getNumVolumeListFails() {
    return numVolumeListFails.value();
  }

  @VisibleForTesting
  public long getNumKeyCommits() {
    return numKeyCommits.value();
  }

  @VisibleForTesting
  public long getNumKeyCommitFails() {
    return numKeyCommitFails.value();
  }

  @VisibleForTesting
  public long getNumBlockAllocates() {
    return numAllocateBlockCalls.value();
  }

  @VisibleForTesting
  public long getNumBlockAllocateFails() {
    return numBlockAllocateCallFails.value();
  }

  @VisibleForTesting
  public long getNumGetServiceListFails() {
    return numGetServiceListFails.value();
  }

  /**
   * Removes this metrics source from the default metrics system, using the
   * same source name it was registered under in {@link #create()}.
   */
  public void unRegister() {
    MetricsSystem ms = DefaultMetricsSystem.instance();
    ms.unregisterSource(SOURCE_NAME);
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java new file mode 100644 index 0000000..3820aed --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import java.io.IOException; +import java.util.Properties; +import java.util.UUID; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.common.Storage; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType; + +import static org.apache.hadoop.ozone.OzoneConsts.SCM_ID; +import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath; + +/** + * OMStorage is responsible for management of the StorageDirectories used by + * the Ozone Manager. 
+ */ +public class OMStorage extends Storage { + + public static final String STORAGE_DIR = "om"; + public static final String OM_ID = "omUuid"; + + /** + * Construct OMStorage. + * @throws IOException if any directories are inaccessible. + */ + public OMStorage(OzoneConfiguration conf) throws IOException { + super(NodeType.OM, getOzoneMetaDirPath(conf), STORAGE_DIR); + } + + public void setScmId(String scmId) throws IOException { + if (getState() == StorageState.INITIALIZED) { + throw new IOException("OM is already initialized."); + } else { + getStorageInfo().setProperty(SCM_ID, scmId); + } + } + + public void setOmId(String omId) throws IOException { + if (getState() == StorageState.INITIALIZED) { + throw new IOException("OM is already initialized."); + } else { + getStorageInfo().setProperty(OM_ID, omId); + } + } + + /** + * Retrieves the SCM ID from the version file. + * @return SCM_ID + */ + public String getScmId() { + return getStorageInfo().getProperty(SCM_ID); + } + + /** + * Retrieves the OM ID from the version file. 
+ * @return OM_ID + */ + public String getOmId() { + return getStorageInfo().getProperty(OM_ID); + } + + @Override + protected Properties getNodeProperties() { + String omId = getOmId(); + if (omId == null) { + omId = UUID.randomUUID().toString(); + } + Properties omProperties = new Properties(); + omProperties.setProperty(OM_ID, omId); + return omProperties; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java new file mode 100644 index 0000000..21d2411 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -0,0 +1,526 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */
package org.apache.hadoop.ozone.om;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;

import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_ID_DELIMINATOR;
import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_PREFIX;
import static org.apache.hadoop.ozone.om.OMConfigKeys
    .OZONE_OM_DB_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys
    .OZONE_OM_DB_CACHE_SIZE_MB;
import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;

/**
 * Implementation of {@link OMMetadataManager} backed by a single
 * {@link MetadataStore}.
 *
 * <p>Besides raw get/put/delete access it builds the DB keys for volumes,
 * users, buckets and keys, lists volumes/buckets/keys, and collects the
 * block groups of keys pending deletion and of expired open keys.
 *
 * <p>The read/write lock handed out by {@link #readLock()} and
 * {@link #writeLock()} is not acquired by any method of this class; callers
 * are expected to take it themselves where they need it.
 */
public class OmMetadataManagerImpl implements OMMetadataManager {

  private final MetadataStore store;
  private final ReadWriteLock lock;
  private final long openKeyExpireThresholdMS;

  /**
   * Opens (or creates) the OM metadata store under the Ozone metadata
   * directory and reads the open-key expiry threshold.
   *
   * @param conf configuration supplying the metadata directory, the DB cache
   *             size (in MB) and the open-key expiry threshold (in seconds)
   * @throws IOException if the store cannot be built
   */
  public OmMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
    File metaDir = getOzoneMetaDirPath(conf);
    final int cacheSize = conf.getInt(OZONE_OM_DB_CACHE_SIZE_MB,
        OZONE_OM_DB_CACHE_SIZE_DEFAULT);
    File omDBFile = new File(metaDir.getPath(), OM_DB_NAME);
    this.store = MetadataStoreBuilder.newBuilder()
        .setConf(conf)
        .setDbFile(omDBFile)
        .setCacheSize(cacheSize * OzoneConsts.MB)
        .build();
    this.lock = new ReentrantReadWriteLock();
    // Multiply as long: "1000 * int" would be evaluated in 32-bit arithmetic
    // and silently overflow for thresholds above ~24.8 days.
    this.openKeyExpireThresholdMS = 1000L * conf.getInt(
        OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS,
        OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT);
  }

  /**
   * Start metadata manager. Currently a no-op.
   */
  @Override
  public void start() {

  }

  /**
   * Stop metadata manager and close the underlying store.
   */
  @Override
  public void stop() throws IOException {
    if (store != null) {
      store.close();
    }
  }

  /**
   * Get metadata store.
   * @return store - metadata store.
   */
  @VisibleForTesting
  @Override
  public MetadataStore getStore() {
    return store;
  }

  /**
   * Given a volume return the corresponding DB key.
   * @param volume - Volume name
   * @return the volume prefix followed by the volume name, as bytes
   */
  public byte[] getVolumeKey(String volume) {
    String dbVolumeName = OzoneConsts.OM_VOLUME_PREFIX + volume;
    return DFSUtil.string2Bytes(dbVolumeName);
  }

  /**
   * Given a user return the corresponding DB key.
   * @param user - User name
   * @return the user prefix followed by the user name, as bytes
   */
  public byte[] getUserKey(String user) {
    String dbUserName = OzoneConsts.OM_USER_PREFIX + user;
    return DFSUtil.string2Bytes(dbUserName);
  }

  /**
   * Given a volume and bucket, return the corresponding DB key.
   * @param volume - Volume name
   * @param bucket - Bucket name
   * @return volume prefix + volume + bucket prefix + bucket, as bytes
   */
  public byte[] getBucketKey(String volume, String bucket) {
    String bucketKeyString = OzoneConsts.OM_VOLUME_PREFIX + volume
        + OzoneConsts.OM_BUCKET_PREFIX + bucket;
    return DFSUtil.string2Bytes(bucketKeyString);
  }

  /**
   * Builds the string form of a bucket DB key; a null or empty bucket yields
   * the common prefix of all buckets in the volume.
   *
   * @param volume - Volume name
   * @param bucket - Bucket name or bucket name prefix, may be null or empty
   * @return volume prefix + volume + bucket prefix [+ bucket]
   */
  private String getBucketWithDBPrefix(String volume, String bucket) {
    // Method-local buffer: no need for StringBuffer's synchronization.
    StringBuilder sb = new StringBuilder();
    sb.append(OzoneConsts.OM_VOLUME_PREFIX)
        .append(volume)
        .append(OzoneConsts.OM_BUCKET_PREFIX);
    if (!Strings.isNullOrEmpty(bucket)) {
      sb.append(bucket);
    }
    return sb.toString();
  }

  /**
   * Builds the string form of a key's DB key; a null or empty key yields the
   * common prefix of all keys in the bucket.
   *
   * @param volume - Volume name
   * @param bucket - Bucket name
   * @param key - Key name or key name prefix, may be null or empty
   */
  @Override
  public String getKeyWithDBPrefix(String volume, String bucket, String key) {
    String keyVB = OzoneConsts.OM_KEY_PREFIX + volume
        + OzoneConsts.OM_KEY_PREFIX + bucket
        + OzoneConsts.OM_KEY_PREFIX;
    return Strings.isNullOrEmpty(key) ? keyVB : keyVB + key;
  }

  /** Byte form of {@link #getKeyWithDBPrefix(String, String, String)}. */
  @Override
  public byte[] getDBKeyBytes(String volume, String bucket, String key) {
    return DFSUtil.string2Bytes(getKeyWithDBPrefix(volume, bucket, key));
  }

  /**
   * Returns the given DB key with the deleting-key prefix prepended.
   * @param keyName - DB key of the key being deleted
   */
  @Override
  public byte[] getDeletedKeyName(byte[] keyName) {
    return DFSUtil.string2Bytes(
        DELETING_KEY_PREFIX + DFSUtil.bytes2String(keyName));
  }

  /**
   * Returns the DB key of an open key: open-key prefix + id + delimiter +
   * key name.
   * @param keyName - key name
   * @param id - open key id
   */
  @Override
  public byte[] getOpenKeyNameBytes(String keyName, int id) {
    return DFSUtil.string2Bytes(OPEN_KEY_PREFIX + id +
        OPEN_KEY_ID_DELIMINATOR + keyName);
  }

  /**
   * Returns the read lock used on Metadata DB.
   * @return readLock
   */
  @Override
  public Lock readLock() {
    return lock.readLock();
  }

  /**
   * Returns the write lock used on Metadata DB.
   * @return writeLock
   */
  @Override
  public Lock writeLock() {
    return lock.writeLock();
  }

  /**
   * Returns the value associated with this key.
   * @param key - key
   * @return value
   */
  @Override
  public byte[] get(byte[] key) throws IOException {
    return store.get(key);
  }

  /**
   * Puts a Key into Metadata DB.
   * @param key   - key
   * @param value - value
   */
  @Override
  public void put(byte[] key, byte[] value) throws IOException {
    store.put(key, value);
  }

  /**
   * Deletes a Key from Metadata DB.
   * @param key   - key
   */
  public void delete(byte[] key) throws IOException {
    store.delete(key);
  }

  /**
   * Applies a batch of operations to the Metadata DB.
   * @param batch - batch to write
   */
  @Override
  public void writeBatch(BatchOperation batch) throws IOException {
    this.store.writeBatch(batch);
  }

  /**
   * Given a volume, check if it is empty, i.e there are no buckets inside it.
   * @param volume - Volume name
   * @return true if the volume is empty
   */
  public boolean isVolumeEmpty(String volume) throws IOException {
    String dbVolumeRootName = OzoneConsts.OM_VOLUME_PREFIX + volume
        + OzoneConsts.OM_BUCKET_PREFIX;
    byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName);
    // NOTE(review): relies on peekAround(0, key) returning the first entry
    // at or after the given key -- confirm against MetadataStore.
    ImmutablePair<byte[], byte[]> volumeRoot =
        store.peekAround(0, dbVolumeRootKey);
    if (volumeRoot != null) {
      return !DFSUtil.bytes2String(volumeRoot.getKey())
          .startsWith(dbVolumeRootName);
    }
    return true;
  }

  /**
   * Given a volume/bucket, check if it is empty,
   * i.e there are no keys inside it.
   * @param volume - Volume name
   * @param bucket - Bucket name
   * @return true if the bucket is empty
   */
  public boolean isBucketEmpty(String volume, String bucket)
      throws IOException {
    String keyRootName = getKeyWithDBPrefix(volume, bucket, null);
    byte[] keyRoot = DFSUtil.string2Bytes(keyRootName);
    // NOTE(review): same peekAround(0, key) assumption as isVolumeEmpty.
    ImmutablePair<byte[], byte[]> firstKey = store.peekAround(0, keyRoot);
    if (firstKey != null) {
      return !DFSUtil.bytes2String(firstKey.getKey())
          .startsWith(keyRootName);
    }
    return true;
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public List<OmBucketInfo> listBuckets(final String volumeName,
      final String startBucket, final String bucketPrefix,
      final int maxNumOfBuckets) throws IOException {
    List<OmBucketInfo> result = new ArrayList<>();
    if (Strings.isNullOrEmpty(volumeName)) {
      throw new OMException("Volume name is required.",
          ResultCodes.FAILED_VOLUME_NOT_FOUND);
    }

    byte[] volumeNameBytes = getVolumeKey(volumeName);
    if (store.get(volumeNameBytes) == null) {
      throw new OMException("Volume " + volumeName + " not found.",
          ResultCodes.FAILED_VOLUME_NOT_FOUND);
    }

    // A bucket starts with /#volume/#bucket_prefix.
    // The prefix is the same for every scanned entry, so build it once
    // instead of once per key inside the filter.
    final String bucketNamePrefix =
        getBucketWithDBPrefix(volumeName, bucketPrefix);
    MetadataKeyFilter filter = (preKey, currentKey, nextKey) ->
        currentKey != null
            && DFSUtil.bytes2String(currentKey).startsWith(bucketNamePrefix);

    List<Map.Entry<byte[], byte[]>> rangeResult;
    if (!Strings.isNullOrEmpty(startBucket)) {
      // Since we are excluding start key from the result,
      // the maxNumOfBuckets is incremented.
      rangeResult = store.getSequentialRangeKVs(
          getBucketKey(volumeName, startBucket),
          maxNumOfBuckets + 1, filter);
      if (!rangeResult.isEmpty()) {
        //Remove start key from result.
        rangeResult.remove(0);
      }
    } else {
      rangeResult = store.getSequentialRangeKVs(null, maxNumOfBuckets, filter);
    }

    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
      OmBucketInfo info = OmBucketInfo.getFromProtobuf(
          BucketInfo.parseFrom(entry.getValue()));
      result.add(info);
    }
    return result;
  }

  /**
   * Lists the keys of a bucket whose names start with {@code keyPrefix},
   * beginning after {@code startKey} when one is given.
   *
   * @param volumeName - Volume name, required
   * @param bucketName - Bucket name, required
   * @param startKey - key after which the listing starts; the start key
   *                 itself is excluded. May be null or empty.
   * @param keyPrefix - key name prefix, may be null or empty
   * @param maxKeys - maximum number of keys to return
   * @return matching key infos
   * @throws IOException if a name is missing, the bucket does not exist, or
   *                     the store cannot be read
   */
  @Override
  public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
      String startKey, String keyPrefix, int maxKeys) throws IOException {
    List<OmKeyInfo> result = new ArrayList<>();
    if (Strings.isNullOrEmpty(volumeName)) {
      throw new OMException("Volume name is required.",
          ResultCodes.FAILED_VOLUME_NOT_FOUND);
    }

    if (Strings.isNullOrEmpty(bucketName)) {
      throw new OMException("Bucket name is required.",
          ResultCodes.FAILED_BUCKET_NOT_FOUND);
    }

    byte[] bucketNameBytes = getBucketKey(volumeName, bucketName);
    if (store.get(bucketNameBytes) == null) {
      throw new OMException("Bucket " + bucketName + " not found.",
          ResultCodes.FAILED_BUCKET_NOT_FOUND);
    }

    MetadataKeyFilter filter = new KeyPrefixFilter()
        .addFilter(getKeyWithDBPrefix(volumeName, bucketName, keyPrefix));

    List<Map.Entry<byte[], byte[]>> rangeResult;
    if (!Strings.isNullOrEmpty(startKey)) {
      // Since we are excluding start key from the result,
      // maxKeys is incremented.
      rangeResult = store.getSequentialRangeKVs(
          getDBKeyBytes(volumeName, bucketName, startKey),
          maxKeys + 1, filter);
      if (!rangeResult.isEmpty()) {
        //Remove start key from result.
        rangeResult.remove(0);
      }
    } else {
      rangeResult = store.getSequentialRangeKVs(null, maxKeys, filter);
    }

    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
      OmKeyInfo info = OmKeyInfo.getFromProtobuf(
          KeyInfo.parseFrom(entry.getValue()));
      result.add(info);
    }
    return result;
  }

  /**
   * Lists volumes, either those of one user or of all users.
   *
   * @param userName - owner to list volumes for; null or empty lists the
   *                 volumes of all users
   * @param prefix - volume name prefix, may be null or empty
   * @param startKey - volume name after which the listing starts; the start
   *                 volume itself is excluded. May be null or empty.
   * @param maxKeys - maximum number of volumes to return
   * @return matching volumes
   * @throws IOException if a listed volume has no volume info in the store
   *                     or the store cannot be read
   */
  @Override
  public List<OmVolumeArgs> listVolumes(String userName,
      String prefix, String startKey, int maxKeys) throws IOException {
    List<OmVolumeArgs> result = Lists.newArrayList();
    VolumeList volumes;
    if (Strings.isNullOrEmpty(userName)) {
      volumes = getAllVolumes();
    } else {
      volumes = getVolumesByUser(userName);
    }

    if (volumes == null || volumes.getVolumeNamesCount() == 0) {
      return result;
    }

    boolean startKeyFound = Strings.isNullOrEmpty(startKey);
    for (String volumeName : volumes.getVolumeNamesList()) {
      if (result.size() >= maxKeys) {
        // Page is full; nothing further can be added.
        break;
      }
      if (!Strings.isNullOrEmpty(prefix)) {
        if (!volumeName.startsWith(prefix)) {
          continue;
        }
      }

      if (!startKeyFound && volumeName.equals(startKey)) {
        startKeyFound = true;
        continue;
      }
      if (startKeyFound) {
        byte[] volumeInfo = store.get(this.getVolumeKey(volumeName));
        if (volumeInfo == null) {
          // Could not get volume info by given volume name,
          // since the volume name is loaded from db,
          // this probably means om db is corrupted or some entries are
          // accidentally removed.
          throw new OMException("Volume info not found for " + volumeName,
              ResultCodes.FAILED_VOLUME_NOT_FOUND);
        }
        VolumeInfo info = VolumeInfo.parseFrom(volumeInfo);
        OmVolumeArgs volumeArgs = OmVolumeArgs.getFromProtobuf(info);
        result.add(volumeArgs);
      }
    }

    return result;
  }

  /**
   * Returns the volume list stored for the given user name.
   */
  private VolumeList getVolumesByUser(String userName)
      throws OMException {
    return getVolumesByUser(getUserKey(userName));
  }

  /**
   * Returns the volume list stored under the given user DB key, or an empty
   * list when the store has no entry for it.
   *
   * @throws OMException if the entry cannot be read or parsed
   */
  private VolumeList getVolumesByUser(byte[] userNameKey)
      throws OMException {
    VolumeList volumes = null;
    try {
      byte[] volumesInBytes = store.get(userNameKey);
      if (volumesInBytes == null) {
        // No volume found for this user, return an empty list
        return VolumeList.newBuilder().build();
      }
      volumes = VolumeList.parseFrom(volumesInBytes);
    } catch (IOException e) {
      throw new OMException("Unable to get volumes info by the given user, "
          + "metadata might be corrupted", e,
          ResultCodes.FAILED_METADATA_ERROR);
    }
    return volumes;
  }

  /**
   * Returns the concatenation of the volume lists of every user entry in
   * the store.
   */
  private VolumeList getAllVolumes() throws IOException {
    // Scan all users in database
    KeyPrefixFilter filter =
        new KeyPrefixFilter().addFilter(OzoneConsts.OM_USER_PREFIX);
    // We are not expecting a huge number of users per cluster,
    // it should be fine to scan all users in db and return us a
    // list of volume names in string per user.
    List<Map.Entry<byte[], byte[]>> rangeKVs = store
        .getSequentialRangeKVs(null, Integer.MAX_VALUE, filter);

    VolumeList.Builder builder = VolumeList.newBuilder();
    for (Map.Entry<byte[], byte[]> entry : rangeKVs) {
      VolumeList volumes = this.getVolumesByUser(entry.getKey());
      builder.addAllVolumeNames(volumes.getVolumeNamesList());
    }

    return builder.build();
  }

  /**
   * Collects up to {@code count} keys marked for deletion together with the
   * block IDs of their latest version.
   *
   * @param count - maximum number of deleting keys to read from the store
   * @return one block group per deleting key, or an empty list as soon as a
   *         key without any location version is encountered
   */
  @Override
  public List<BlockGroup> getPendingDeletionKeys(final int count)
      throws IOException {
    List<BlockGroup> keyBlocksList = Lists.newArrayList();
    List<Map.Entry<byte[], byte[]>> rangeResult =
        store.getRangeKVs(null, count,
            MetadataKeyFilters.getDeletingKeyFilter());
    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
      OmKeyInfo info =
          OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
      // Get block keys as a list.
      OmKeyLocationInfoGroup latest = info.getLatestVersionLocations();
      if (latest == null) {
        // NOTE(review): this discards every block group collected so far,
        // so one key without locations hides all other pending deletions in
        // this batch. Behavior kept as-is; confirm whether skipping just
        // this key was intended.
        return Collections.emptyList();
      }
      List<BlockID> item = latest.getLocationList().stream()
          .map(b->new BlockID(b.getContainerID(), b.getLocalID()))
          .collect(Collectors.toList());
      BlockGroup keyBlocks = BlockGroup.newBuilder()
          .setKeyName(DFSUtil.bytes2String(entry.getKey()))
          .addAllBlockIDs(item)
          .build();
      keyBlocksList.add(keyBlocks);
    }
    return keyBlocksList;
  }

  /**
   * Collects the open keys whose modification time is at least the
   * configured expiry threshold in the past, together with the block IDs of
   * their latest version.
   *
   * @return one block group per expired open key
   */
  @Override
  public List<BlockGroup> getExpiredOpenKeys() throws IOException {
    List<BlockGroup> keyBlocksList = Lists.newArrayList();
    long now = Time.now();
    final MetadataKeyFilter openKeyFilter =
        new KeyPrefixFilter().addFilter(OPEN_KEY_PREFIX);
    List<Map.Entry<byte[], byte[]>> rangeResult =
        store.getSequentialRangeKVs(null, Integer.MAX_VALUE,
            openKeyFilter);
    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
      OmKeyInfo info =
          OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
      long lastModify = info.getModificationTime();
      if (now - lastModify < this.openKeyExpireThresholdMS) {
        // consider as may still be active, not hanging.
        continue;
      }
      // getLatestVersionLocations() can return null (getPendingDeletionKeys
      // checks for it); skip such a key instead of failing the whole scan
      // with a NullPointerException.
      OmKeyLocationInfoGroup latest = info.getLatestVersionLocations();
      if (latest == null) {
        continue;
      }
      // Get block keys as a list.
      List<BlockID> item = latest
          .getBlocksLatestVersionOnly().stream()
          .map(b->new BlockID(b.getContainerID(), b.getLocalID()))
          .collect(Collectors.toList());
      BlockGroup keyBlocks = BlockGroup.newBuilder()
          .setKeyName(DFSUtil.bytes2String(entry.getKey()))
          .addAllBlockIDs(item)
          .build();
      keyBlocksList.add(keyBlocks);
    }
    return keyBlocksList;
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java new file mode 100644 index 0000000..8d94f5a --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om; + +import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.utils.BackgroundService; +import org.apache.hadoop.utils.BackgroundTask; +import org.apache.hadoop.utils.BackgroundTaskQueue; +import org.apache.hadoop.utils.BackgroundTaskResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * This is the background service to delete hanging open keys. + * Scan the metadata of om periodically to get + * the keys with prefix "#open#" and ask scm to + * delete metadata accordingly, if scm returns + * success for keys, then clean up those keys. + */ +public class OpenKeyCleanupService extends BackgroundService { + + private static final Logger LOG = + LoggerFactory.getLogger(OpenKeyCleanupService.class); + + private final static int OPEN_KEY_DELETING_CORE_POOL_SIZE = 2; + + private final KeyManager keyManager; + private final ScmBlockLocationProtocol scmClient; + + public OpenKeyCleanupService(ScmBlockLocationProtocol scmClient, + KeyManager keyManager, int serviceInterval, + long serviceTimeout) { + super("OpenKeyCleanupService", serviceInterval, TimeUnit.SECONDS, + OPEN_KEY_DELETING_CORE_POOL_SIZE, serviceTimeout); + this.keyManager = keyManager; + this.scmClient = scmClient; + } + + @Override + public BackgroundTaskQueue getTasks() { + BackgroundTaskQueue queue = new BackgroundTaskQueue(); + queue.add(new OpenKeyDeletingTask()); + return queue; + } + + private class OpenKeyDeletingTask + implements BackgroundTask<BackgroundTaskResult> { + + @Override + public int getPriority() { + return 0; + } + + @Override + public BackgroundTaskResult call() throws Exception { + try { + List<BlockGroup> keyBlocksList = 
keyManager.getExpiredOpenKeys(); + if (keyBlocksList.size() > 0) { + int toDeleteSize = keyBlocksList.size(); + LOG.debug("Found {} to-delete open keys in OM", toDeleteSize); + List<DeleteBlockGroupResult> results = + scmClient.deleteKeyBlocks(keyBlocksList); + int deletedSize = 0; + for (DeleteBlockGroupResult result : results) { + if (result.isSuccess()) { + try { + keyManager.deleteExpiredOpenKey(result.getObjectKey()); + LOG.debug("Key {} deleted from OM DB", result.getObjectKey()); + deletedSize += 1; + } catch (IOException e) { + LOG.warn("Failed to delete hanging-open key {}", + result.getObjectKey(), e); + } + } else { + LOG.warn("Deleting open Key {} failed because some of the blocks" + + " were failed to delete, failed blocks: {}", + result.getObjectKey(), + StringUtils.join(",", result.getFailedBlocks())); + } + } + LOG.info("Found {} expired open key entries, successfully " + + "cleaned up {} entries", toDeleteSize, deletedSize); + return results::size; + } else { + LOG.debug("No hanging open key found in OM"); + } + } catch (IOException e) { + LOG.error("Unable to get hanging open keys, retry in" + + " next interval", e); + } + return BackgroundTaskResult.EmptyTaskResult.newResult(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java new file mode 100644 index 0000000..71fa921 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -0,0 +1,911 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.protobuf.BlockingService; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; +import org.apache.hadoop.ozone.common.Storage.StorageState; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.om.helpers.OpenKeySession; +import org.apache.hadoop.ozone.om.helpers.ServiceInfo; +import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; +import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; +import 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos + .ServicePort; +import org.apache.hadoop.ozone.protocol.proto + .OzoneManagerProtocolProtos.OzoneAclInfo; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdds.scm.ScmInfo; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; +import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.hdds.scm.protocolPB + .ScmBlockLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; +import org.apache.hadoop.hdds.scm.protocolPB + .StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.StringUtils; + +import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients; +import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients; +import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled; +import static org.apache.hadoop.ozone.OmUtils.getOmAddress; +import static org.apache.hadoop.hdds.server.ServerUtils + .updateRPCListenAddress; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.management.ObjectName; +import java.io.IOException; +import java.io.PrintStream; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED; +import static org.apache.hadoop.ozone.om.OMConfigKeys + .OZONE_OM_ADDRESS_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys + .OZONE_OM_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys + .OZONE_OM_HANDLER_COUNT_KEY; +import static org.apache.hadoop.ozone.protocol.proto + .OzoneManagerProtocolProtos.OzoneManagerService + .newReflectiveBlockingService; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos + .NodeState.HEALTHY; +import static org.apache.hadoop.util.ExitUtil.terminate; + +/** + * Ozone Manager is the metadata manager of ozone. + */ +@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"}) +public final class OzoneManager extends ServiceRuntimeInfoImpl + implements OzoneManagerProtocol, OMMXBean { + private static final Logger LOG = + LoggerFactory.getLogger(OzoneManager.class); + + private static final String USAGE = + "Usage: \n ozone om [genericOptions] " + "[ " + + StartupOption.CREATEOBJECTSTORE.getName() + " ]\n " + "ozone om [ " + + StartupOption.HELP.getName() + " ]\n"; + + /** Startup options. 
*/ + public enum StartupOption { + CREATEOBJECTSTORE("-createObjectStore"), + HELP("-help"), + REGULAR("-regular"); + + private final String name; + + StartupOption(String arg) { + this.name = arg; + } + + public String getName() { + return name; + } + + public static StartupOption parse(String value) { + for (StartupOption option : StartupOption.values()) { + if (option.name.equalsIgnoreCase(value)) { + return option; + } + } + return null; + } + } + + private final OzoneConfiguration configuration; + private final RPC.Server omRpcServer; + private final InetSocketAddress omRpcAddress; + private final OMMetadataManager metadataManager; + private final VolumeManager volumeManager; + private final BucketManager bucketManager; + private final KeyManager keyManager; + private final OMMetrics metrics; + private final OzoneManagerHttpServer httpServer; + private final OMStorage omStorage; + private final ScmBlockLocationProtocol scmBlockClient; + private final StorageContainerLocationProtocol scmContainerClient; + private ObjectName omInfoBeanName; + + private OzoneManager(OzoneConfiguration conf) throws IOException { + Preconditions.checkNotNull(conf); + configuration = conf; + omStorage = new OMStorage(conf); + scmBlockClient = getScmBlockClient(configuration); + scmContainerClient = getScmContainerClient(configuration); + if (omStorage.getState() != StorageState.INITIALIZED) { + throw new OMException("OM not initialized.", + ResultCodes.OM_NOT_INITIALIZED); + } + + // verifies that the SCM info in the OM Version file is correct. 
+ ScmInfo scmInfo = scmBlockClient.getScmInfo(); + if (!(scmInfo.getClusterId().equals(omStorage.getClusterID()) && scmInfo + .getScmId().equals(omStorage.getScmId()))) { + throw new OMException("SCM version info mismatch.", + ResultCodes.SCM_VERSION_MISMATCH_ERROR); + } + final int handlerCount = conf.getInt(OZONE_OM_HANDLER_COUNT_KEY, + OZONE_OM_HANDLER_COUNT_DEFAULT); + + RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class, + ProtobufRpcEngine.class); + + BlockingService omService = newReflectiveBlockingService( + new OzoneManagerProtocolServerSideTranslatorPB(this)); + final InetSocketAddress omNodeRpcAddr = + getOmAddress(configuration); + omRpcServer = startRpcServer(configuration, omNodeRpcAddr, + OzoneManagerProtocolPB.class, omService, + handlerCount); + omRpcAddress = updateRPCListenAddress(configuration, + OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer); + metadataManager = new OmMetadataManagerImpl(configuration); + volumeManager = new VolumeManagerImpl(metadataManager, configuration); + bucketManager = new BucketManagerImpl(metadataManager); + metrics = OMMetrics.create(); + keyManager = + new KeyManagerImpl(scmBlockClient, metadataManager, configuration, + omStorage.getOmId()); + httpServer = new OzoneManagerHttpServer(configuration, this); + } + + /** + * Create a scm block client, used by putKey() and getKey(). 
+ * + * @return {@link ScmBlockLocationProtocol} + * @throws IOException + */ + private static ScmBlockLocationProtocol getScmBlockClient( + OzoneConfiguration conf) throws IOException { + RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class, + ProtobufRpcEngine.class); + long scmVersion = + RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class); + InetSocketAddress scmBlockAddress = + getScmAddressForBlockClients(conf); + ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient = + new ScmBlockLocationProtocolClientSideTranslatorPB( + RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion, + scmBlockAddress, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), + Client.getRpcTimeout(conf))); + return scmBlockLocationClient; + } + + /** + * Returns a scm container client. + * + * @return {@link StorageContainerLocationProtocol} + * @throws IOException + */ + private static StorageContainerLocationProtocol getScmContainerClient( + OzoneConfiguration conf) throws IOException { + RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class, + ProtobufRpcEngine.class); + long scmVersion = + RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class); + InetSocketAddress scmAddr = getScmAddressForClients( + conf); + StorageContainerLocationProtocolClientSideTranslatorPB scmContainerClient = + new StorageContainerLocationProtocolClientSideTranslatorPB( + RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion, + scmAddr, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), + Client.getRpcTimeout(conf))); + return scmContainerClient; + } + + @VisibleForTesting + public KeyManager getKeyManager() { + return keyManager; + } + + @VisibleForTesting + public ScmInfo getScmInfo() throws IOException { + return scmBlockClient.getScmInfo(); + } + + @VisibleForTesting + public OMStorage getOmStorage() { + return omStorage; + } + /** + * Starts an RPC server, if 
configured. + * + * @param conf configuration + * @param addr configured address of RPC server + * @param protocol RPC protocol provided by RPC server + * @param instance RPC protocol implementation instance + * @param handlerCount RPC server handler count + * + * @return RPC server + * @throws IOException if there is an I/O error while creating RPC server + */ + private static RPC.Server startRpcServer(OzoneConfiguration conf, + InetSocketAddress addr, Class<?> protocol, BlockingService instance, + int handlerCount) throws IOException { + RPC.Server rpcServer = new RPC.Builder(conf) + .setProtocol(protocol) + .setInstance(instance) + .setBindAddress(addr.getHostString()) + .setPort(addr.getPort()) + .setNumHandlers(handlerCount) + .setVerbose(false) + .setSecretManager(null) + .build(); + + DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer); + return rpcServer; + } + + /** + * Get metadata manager. + * @return metadata manager. + */ + public OMMetadataManager getMetadataManager() { + return metadataManager; + } + + public OMMetrics getMetrics() { + return metrics; + } + + /** + * Main entry point for starting OzoneManager. 
+ * + * @param argv arguments + * @throws IOException if startup fails due to I/O error + */ + public static void main(String[] argv) throws IOException { + if (DFSUtil.parseHelpArgument(argv, USAGE, System.out, true)) { + System.exit(0); + } + try { + OzoneConfiguration conf = new OzoneConfiguration(); + GenericOptionsParser hParser = new GenericOptionsParser(conf, argv); + if (!hParser.isParseSuccessful()) { + System.err.println("USAGE: " + USAGE + " \n"); + hParser.printGenericCommandUsage(System.err); + System.exit(1); + } + StringUtils.startupShutdownMessage(OzoneManager.class, argv, LOG); + OzoneManager om = createOm(hParser.getRemainingArgs(), conf); + if (om != null) { + om.start(); + om.join(); + } + } catch (Throwable t) { + LOG.error("Failed to start the OzoneManager.", t); + terminate(1, t); + } + } + + private static void printUsage(PrintStream out) { + out.println(USAGE + "\n"); + } + + /** + * Constructs OM instance based on command line arguments. + * @param argv Command line arguments + * @param conf OzoneConfiguration + * @return OM instance + * @throws IOException in case OM instance creation fails. + */ + + public static OzoneManager createOm(String[] argv, + OzoneConfiguration conf) throws IOException { + if (!isHddsEnabled(conf)) { + System.err.println("OM cannot be started in secure mode or when " + + OZONE_ENABLED + " is set to false"); + System.exit(1); + } + StartupOption startOpt = parseArguments(argv); + if (startOpt == null) { + printUsage(System.err); + terminate(1); + return null; + } + switch (startOpt) { + case CREATEOBJECTSTORE: + terminate(omInit(conf) ? 0 : 1); + return null; + case HELP: + printUsage(System.err); + terminate(0); + return null; + default: + return new OzoneManager(conf); + } + } + + /** + * Initializes the OM instance. 
+ * @param conf OzoneConfiguration + * @return true if OM initialization succeeds, false otherwise + * @throws IOException in case ozone metadata directory path is not accessible + */ + + private static boolean omInit(OzoneConfiguration conf) throws IOException { + OMStorage omStorage = new OMStorage(conf); + StorageState state = omStorage.getState(); + if (state != StorageState.INITIALIZED) { + try { + ScmBlockLocationProtocol scmBlockClient = getScmBlockClient(conf); + ScmInfo scmInfo = scmBlockClient.getScmInfo(); + String clusterId = scmInfo.getClusterId(); + String scmId = scmInfo.getScmId(); + if (clusterId == null || clusterId.isEmpty()) { + throw new IOException("Invalid Cluster ID"); + } + if (scmId == null || scmId.isEmpty()) { + throw new IOException("Invalid SCM ID"); + } + omStorage.setClusterId(clusterId); + omStorage.setScmId(scmId); + omStorage.initialize(); + System.out.println( + "OM initialization succeeded.Current cluster id for sd=" + + omStorage.getStorageDir() + ";cid=" + omStorage + .getClusterID()); + return true; + } catch (IOException ioe) { + LOG.error("Could not initialize OM version file", ioe); + return false; + } + } else { + System.out.println( + "OM already initialized.Reusing existing cluster id for sd=" + + omStorage.getStorageDir() + ";cid=" + omStorage + .getClusterID()); + return true; + } + } + + /** + * Parses the command line options for OM initialization. + * @param args command line arguments + * @return StartupOption if options are valid, null otherwise + */ + private static StartupOption parseArguments(String[] args) { + if (args == null || args.length == 0) { + return StartupOption.REGULAR; + } else if (args.length == 1) { + return StartupOption.parse(args[0]); + } + return null; + } + + /** + * Builds a message for logging startup information about an RPC server. 
+ * + * @param description RPC server description + * @param addr RPC server listening address + * @return server startup message + */ + private static String buildRpcServerStartMessage(String description, + InetSocketAddress addr) { + return addr != null ? String.format("%s is listening at %s", + description, addr.toString()) : + String.format("%s not started", description); + } + + /** + * Start service. + */ + public void start() throws IOException { + LOG.info(buildRpcServerStartMessage("OzoneManager RPC server", + omRpcAddress)); + DefaultMetricsSystem.initialize("OzoneManager"); + metadataManager.start(); + keyManager.start(); + omRpcServer.start(); + httpServer.start(); + registerMXBean(); + setStartTime(); + } + + /** + * Stop service. + */ + public void stop() { + try { + metadataManager.stop(); + omRpcServer.stop(); + keyManager.stop(); + httpServer.stop(); + metrics.unRegister(); + unregisterMXBean(); + } catch (Exception e) { + LOG.error("OzoneManager stop failed.", e); + } + } + + /** + * Wait until service has completed shutdown. + */ + public void join() { + try { + omRpcServer.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.info("Interrupted during OzoneManager join.", e); + } + } + + /** + * Creates a volume. + * + * @param args - Arguments to create Volume. + * @throws IOException + */ + @Override + public void createVolume(OmVolumeArgs args) throws IOException { + try { + metrics.incNumVolumeCreates(); + volumeManager.createVolume(args); + } catch (Exception ex) { + metrics.incNumVolumeCreateFails(); + throw ex; + } + } + + /** + * Changes the owner of a volume. + * + * @param volume - Name of the volume. + * @param owner - Name of the owner. 
+ * @throws IOException + */ + @Override + public void setOwner(String volume, String owner) throws IOException { + try { + metrics.incNumVolumeUpdates(); + volumeManager.setOwner(volume, owner); + } catch (Exception ex) { + metrics.incNumVolumeUpdateFails(); + throw ex; + } + } + + /** + * Changes the Quota on a volume. + * + * @param volume - Name of the volume. + * @param quota - Quota in bytes. + * @throws IOException + */ + @Override + public void setQuota(String volume, long quota) throws IOException { + try { + metrics.incNumVolumeUpdates(); + volumeManager.setQuota(volume, quota); + } catch (Exception ex) { + metrics.incNumVolumeUpdateFails(); + throw ex; + } + } + + /** + * Checks if the specified user can access this volume. + * + * @param volume - volume + * @param userAcl - user acls which needs to be checked for access + * @return true if the user has required access for the volume, + * false otherwise + * @throws IOException + */ + @Override + public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl) + throws IOException { + try { + metrics.incNumVolumeCheckAccesses(); + return volumeManager.checkVolumeAccess(volume, userAcl); + } catch (Exception ex) { + metrics.incNumVolumeCheckAccessFails(); + throw ex; + } + } + + /** + * Gets the volume information. + * + * @param volume - Volume name. + * @return VolumeArgs or exception is thrown. + * @throws IOException + */ + @Override + public OmVolumeArgs getVolumeInfo(String volume) throws IOException { + try { + metrics.incNumVolumeInfos(); + return volumeManager.getVolumeInfo(volume); + } catch (Exception ex) { + metrics.incNumVolumeInfoFails(); + throw ex; + } + } + + /** + * Deletes an existing empty volume. + * + * @param volume - Name of the volume. 
+ * @throws IOException + */ + @Override + public void deleteVolume(String volume) throws IOException { + try { + metrics.incNumVolumeDeletes(); + volumeManager.deleteVolume(volume); + } catch (Exception ex) { + metrics.incNumVolumeDeleteFails(); + throw ex; + } + } + + /** + * Lists volume owned by a specific user. + * + * @param userName - user name + * @param prefix - Filter prefix -- Return only entries that match this. + * @param prevKey - Previous key -- List starts from the next from the + * prevkey + * @param maxKeys - Max number of keys to return. + * @return List of Volumes. + * @throws IOException + */ + @Override + public List<OmVolumeArgs> listVolumeByUser(String userName, String prefix, + String prevKey, int maxKeys) throws IOException { + try { + metrics.incNumVolumeLists(); + return volumeManager.listVolumes(userName, prefix, prevKey, maxKeys); + } catch (Exception ex) { + metrics.incNumVolumeListFails(); + throw ex; + } + } + + /** + * Lists volume all volumes in the cluster. + * + * @param prefix - Filter prefix -- Return only entries that match this. + * @param prevKey - Previous key -- List starts from the next from the + * prevkey + * @param maxKeys - Max number of keys to return. + * @return List of Volumes. + * @throws IOException + */ + @Override + public List<OmVolumeArgs> listAllVolumes(String prefix, String prevKey, int + maxKeys) throws IOException { + try { + metrics.incNumVolumeLists(); + return volumeManager.listVolumes(null, prefix, prevKey, maxKeys); + } catch (Exception ex) { + metrics.incNumVolumeListFails(); + throw ex; + } + } + + /** + * Creates a bucket. + * + * @param bucketInfo - BucketInfo to create bucket. 
+ * @throws IOException + */ + @Override + public void createBucket(OmBucketInfo bucketInfo) throws IOException { + try { + metrics.incNumBucketCreates(); + bucketManager.createBucket(bucketInfo); + } catch (Exception ex) { + metrics.incNumBucketCreateFails(); + throw ex; + } + } + + /** + * {@inheritDoc} + */ + @Override + public List<OmBucketInfo> listBuckets(String volumeName, + String startKey, String prefix, int maxNumOfBuckets) + throws IOException { + try { + metrics.incNumBucketLists(); + return bucketManager.listBuckets(volumeName, + startKey, prefix, maxNumOfBuckets); + } catch (IOException ex) { + metrics.incNumBucketListFails(); + throw ex; + } + } + + /** + * Gets the bucket information. + * + * @param volume - Volume name. + * @param bucket - Bucket name. + * @return OmBucketInfo or exception is thrown. + * @throws IOException + */ + @Override + public OmBucketInfo getBucketInfo(String volume, String bucket) + throws IOException { + try { + metrics.incNumBucketInfos(); + return bucketManager.getBucketInfo(volume, bucket); + } catch (Exception ex) { + metrics.incNumBucketInfoFails(); + throw ex; + } + } + + /** + * Allocate a key. + * + * @param args - attributes of the key. + * @return OmKeyInfo - the info about the allocated key. 
+ * @throws IOException + */ + @Override + public OpenKeySession openKey(OmKeyArgs args) throws IOException { + try { + metrics.incNumKeyAllocates(); + return keyManager.openKey(args); + } catch (Exception ex) { + metrics.incNumKeyAllocateFails(); + throw ex; + } + } + + @Override + public void commitKey(OmKeyArgs args, int clientID) + throws IOException { + try { + metrics.incNumKeyCommits(); + keyManager.commitKey(args, clientID); + } catch (Exception ex) { + metrics.incNumKeyCommitFails(); + throw ex; + } + } + + @Override + public OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID) + throws IOException { + try { + metrics.incNumBlockAllocateCalls(); + return keyManager.allocateBlock(args, clientID); + } catch (Exception ex) { + metrics.incNumBlockAllocateCallFails(); + throw ex; + } + } + + /** + * Lookup a key. + * + * @param args - attributes of the key. + * @return OmKeyInfo - the info about the requested key. + * @throws IOException + */ + @Override + public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException { + try { + metrics.incNumKeyLookups(); + return keyManager.lookupKey(args); + } catch (Exception ex) { + metrics.incNumKeyLookupFails(); + throw ex; + } + } + + @Override + public void renameKey(OmKeyArgs args, String toKeyName) throws IOException { + try { + metrics.incNumKeyRenames(); + keyManager.renameKey(args, toKeyName); + } catch (IOException e) { + metrics.incNumKeyRenameFails(); + throw e; + } + } + + /** + * Deletes an existing key. + * + * @param args - attributes of the key. 
+ * @throws IOException + */ + @Override + public void deleteKey(OmKeyArgs args) throws IOException { + try { + metrics.incNumKeyDeletes(); + keyManager.deleteKey(args); + } catch (Exception ex) { + metrics.incNumKeyDeleteFails(); + throw ex; + } + } + + @Override + public List<OmKeyInfo> listKeys(String volumeName, String bucketName, + String startKey, String keyPrefix, int maxKeys) throws IOException { + try { + metrics.incNumKeyLists(); + return keyManager.listKeys(volumeName, bucketName, + startKey, keyPrefix, maxKeys); + } catch (IOException ex) { + metrics.incNumKeyListFails(); + throw ex; + } + } + + /** + * Sets bucket property from args. + * @param args - BucketArgs. + * @throws IOException + */ + @Override + public void setBucketProperty(OmBucketArgs args) + throws IOException { + try { + metrics.incNumBucketUpdates(); + bucketManager.setBucketProperty(args); + } catch (Exception ex) { + metrics.incNumBucketUpdateFails(); + throw ex; + } + } + + + /** + * Deletes an existing empty bucket from volume. + * @param volume - Name of the volume. + * @param bucket - Name of the bucket. 
+ * @throws IOException + */ + public void deleteBucket(String volume, String bucket) throws IOException { + try { + metrics.incNumBucketDeletes(); + bucketManager.deleteBucket(volume, bucket); + } catch (Exception ex) { + metrics.incNumBucketDeleteFails(); + throw ex; + } + } + /** + * Registers this object as an MBean via MBeans.register, using the service + * name "OzoneManager", the bean name "OzoneManagerInfo" and the property + * component=ServerRuntime. The returned name is stored in omInfoBeanName + * so that unregisterMXBean() can remove the registration later. + */ + private void registerMXBean() { + Map<String, String> jmxProperties = new HashMap<String, String>(); + jmxProperties.put("component", "ServerRuntime"); + this.omInfoBeanName = + MBeans.register("OzoneManager", + "OzoneManagerInfo", + jmxProperties, + this); + } + /** + * Unregisters the bean recorded in omInfoBeanName, if there is one, and + * resets the field to null so that a second call does nothing. + */ + private void unregisterMXBean() { + if (this.omInfoBeanName != null) { + MBeans.unregister(this.omInfoBeanName); + this.omInfoBeanName = null; + } + } + /** + * Returns the port of omRpcAddress rendered as a decimal string. + */ + @Override + public String getRpcPort() { + return "" + omRpcAddress.getPort(); + } + /** + * Returns the HTTP server held by this instance; exposed for tests. + */ + @VisibleForTesting + public OzoneManagerHttpServer getHttpServer() { + return httpServer; + } + /** + * Builds the list of services reported by this Ozone Manager. The list + * holds one entry for the OM itself (its RPC port, plus the HTTP and HTTPS + * ports when the HTTP server reports a non-null address), one entry for + * the SCM (RPC port of the address returned by getScmAddressForClients) + * and one entry per node returned by the HEALTHY, cluster-scope node + * query (the node's REST port, reported with port type HTTP). + * + * @return the assembled list of services. + * @throws IOException declared by the signature; presumably raised by the + * SCM node query - TODO confirm. + */ + @Override + public List<ServiceInfo> getServiceList() throws IOException { + // When we implement multi-home this call has to be handled properly. + List<ServiceInfo> services = new ArrayList<>(); + ServiceInfo.Builder omServiceInfoBuilder = ServiceInfo.newBuilder() + .setNodeType(HddsProtos.NodeType.OM) + .setHostname(omRpcAddress.getHostName()) + .addServicePort(ServicePort.newBuilder() + .setType(ServicePort.Type.RPC) + .setValue(omRpcAddress.getPort()) + .build()); + if (httpServer.getHttpAddress() != null) { + omServiceInfoBuilder.addServicePort(ServicePort.newBuilder() + .setType(ServicePort.Type.HTTP) + .setValue(httpServer.getHttpAddress().getPort()) + .build()); + } + if (httpServer.getHttpsAddress() != null) { + omServiceInfoBuilder.addServicePort(ServicePort.newBuilder() + .setType(ServicePort.Type.HTTPS) + .setValue(httpServer.getHttpsAddress().getPort()) + .build()); + } + services.add(omServiceInfoBuilder.build()); + + // For client we have to return SCM with container protocol port, + // not block protocol. 
+ InetSocketAddress scmAddr = getScmAddressForClients( + configuration); + ServiceInfo.Builder scmServiceInfoBuilder = ServiceInfo.newBuilder() + .setNodeType(HddsProtos.NodeType.SCM) + .setHostname(scmAddr.getHostName()) + .addServicePort(ServicePort.newBuilder() + .setType(ServicePort.Type.RPC) + .setValue(scmAddr.getPort()).build()); + services.add(scmServiceInfoBuilder.build()); + + List<HddsProtos.Node> nodes = scmContainerClient.queryNode(HEALTHY, + HddsProtos.QueryScope.CLUSTER, ""); + + for (HddsProtos.Node node : nodes) { + HddsProtos.DatanodeDetailsProto datanode = node.getNodeID(); + + ServiceInfo.Builder dnServiceInfoBuilder = ServiceInfo.newBuilder() + .setNodeType(HddsProtos.NodeType.DATANODE) + .setHostname(datanode.getHostName()); + + dnServiceInfoBuilder.addServicePort(ServicePort.newBuilder() + .setType(ServicePort.Type.HTTP) + .setValue(DatanodeDetails.getFromProtoBuf(datanode) + .getPort(DatanodeDetails.Port.Name.REST).getValue()) + .build()); + + services.add(dnServiceInfoBuilder.build()); + } + + metrics.incNumGetServiceLists(); + // For now there is no exception that can happen in this call, + // so failure metrics is not handled. 
In future if there is any need to + // handle exception in this method, we need to incorporate + // metrics.incNumGetServiceListFails() + // NOTE(review): the success counter above is only reached when nothing + // earlier in this method throws; since the method declares IOException, + // confirm whether the SCM node query can fail, in which case no counter + // is incremented on that path. + return services; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java new file mode 100644 index 0000000..bd6ab69 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.hdds.server.BaseHttpServer; + +import java.io.IOException; + +/** + * HttpServer wrapper for the OzoneManager. 
+ */ +public class OzoneManagerHttpServer extends BaseHttpServer { + + /** + * Sets up the OM web server: registers the service list servlet at + * /serviceList and stores the OzoneManager in the web-app context under + * OzoneConsts.OM_CONTEXT_ATTRIBUTE. + * + * @param conf configuration passed to the BaseHttpServer constructor + * @param om OzoneManager instance published to the web-app context + * @throws IOException propagated from the base class setup + * (NOTE(review): origin is not visible in this file) + */ + public OzoneManagerHttpServer(Configuration conf, OzoneManager om) + throws IOException { + super(conf, "ozoneManager"); + addServlet("serviceList", "/serviceList", ServiceListJSONServlet.class); + getWebAppContext().setAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE, om); + } + + // The overrides below hand the OM-specific constants to BaseHttpServer. + + @Override + protected String getHttpAddressKey() { + return OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY; + } + + @Override + protected String getHttpBindHostKey() { + return OMConfigKeys.OZONE_OM_HTTP_BIND_HOST_KEY; + } + + @Override + protected String getHttpsAddressKey() { + return OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY; + } + + @Override + protected String getHttpsBindHostKey() { + return OMConfigKeys.OZONE_OM_HTTPS_BIND_HOST_KEY; + } + + @Override + protected String getBindHostDefault() { + return OMConfigKeys.OZONE_OM_HTTP_BIND_HOST_DEFAULT; + } + + @Override + protected int getHttpBindPortDefault() { + return OMConfigKeys.OZONE_OM_HTTP_BIND_PORT_DEFAULT; + } + + @Override + protected int getHttpsBindPortDefault() { + return OMConfigKeys.OZONE_OM_HTTPS_BIND_PORT_DEFAULT; + } + + @Override + protected String getKeytabFile() { + return OMConfigKeys.OZONE_OM_KEYTAB_FILE; + } + + // NOTE(review): this is the only override that returns an SCM-named + // constant from OzoneConfigKeys instead of an OMConfigKeys value; + // confirm that sharing the SCM principal key is intended. + @Override + protected String getSpnegoPrincipal() { + return OzoneConfigKeys.OZONE_SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL; + } + + @Override + protected String getEnabledKey() { + return OMConfigKeys.OZONE_OM_HTTP_ENABLED_KEY; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ServiceListJSONServlet.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ServiceListJSONServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ServiceListJSONServlet.java new file mode 100644 index 0000000..47713e2 --- /dev/null +++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ServiceListJSONServlet.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; + +import org.apache.hadoop.ozone.OzoneConsts; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.PrintWriter; + + +/** + * Provides REST access to Ozone Service List. + * <p> + * This servlet generally will be placed under the /serviceList URL of + * OzoneManager HttpServer. + * + * The response is JSON in the form + * <p> + * <code><pre> + * { + * "services" : [ + * { + * "NodeType":"OM", + * "Hostname" : "$hostname", + * "ports" : { + * "$PortType" : "$port", + * ... 
+ * } + * } + * ] + * } + * </pre></code> + * <p> + * + */ +public class ServiceListJSONServlet extends HttpServlet { + + private static final Logger LOG = + LoggerFactory.getLogger(ServiceListJSONServlet.class); + private static final long serialVersionUID = 1L; + + private transient OzoneManager om; + + public void init() throws ServletException { + this.om = (OzoneManager) getServletContext() + .getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE); + } + + /** + * Process a GET request for the specified resource. + * + * @param request + * The servlet request we are processing + * @param response + * The servlet response we are creating + */ + @Override + public void doGet(HttpServletRequest request, HttpServletResponse response) { + try { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.enable(SerializationFeature.INDENT_OUTPUT); + response.setContentType("application/json; charset=utf8"); + PrintWriter writer = response.getWriter(); + try { + writer.write(objectMapper.writeValueAsString(om.getServiceList())); + } finally { + if (writer != null) { + writer.close(); + } + } + } catch (IOException e) { + LOG.error( + "Caught an exception while processing ServiceList request", e); + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java new file mode 100644 index 0000000..8475dd9 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.protocol.proto + .OzoneManagerProtocolProtos.OzoneAclInfo; + +import java.io.IOException; +import java.util.List; + +/** + * OM volume manager interface: creating and deleting volumes, changing + * their owner and quota, looking them up, checking access and listing. + */ +public interface VolumeManager { + + /** + * Creates a new volume. + * + * @param args - Volume args describing the volume to create. + * @throws IOException if the operation fails. + */ + void createVolume(OmVolumeArgs args) throws IOException; + + /** + * Changes the owner of a volume. + * + * @param volume - Name of the volume. + * @param owner - Name of the owner. + * @throws IOException if the operation fails. + */ + void setOwner(String volume, String owner) throws IOException; + + /** + * Changes the Quota on a volume. + * + * @param volume - Name of the volume. + * @param quota - Quota in bytes. + * @throws IOException if the operation fails. + */ + void setQuota(String volume, long quota) throws IOException; + + /** + * Gets the volume information. + * + * @param volume - Volume name. + * @return the {@link OmVolumeArgs} of the volume. + * @throws IOException if the operation fails. + */ + OmVolumeArgs getVolumeInfo(String volume) throws IOException; + + /** + * Deletes an existing empty volume. + * + * @param volume - Name of the volume. 
+ * @throws IOException if the operation fails. + */ + void deleteVolume(String volume) throws IOException; + + /** + * Checks if the specified user with a role can access this volume. + * + * @param volume - Name of the volume. + * @param userAcl - user acl which needs to be checked for access + * @return true if the user has access for the volume, false otherwise + * @throws IOException if the operation fails. + */ + boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl) + throws IOException; + + /** + * Returns a list of volumes owned by a given user; if user is null, + * returns all volumes. + * + * @param userName + * volume owner + * @param prefix + * the volume prefix used to filter the listing result. + * @param startKey + * the volume name to start listing from; this key itself is + * excluded from the result. + * @param maxKeys + * the maximum number of volumes to return. + * @return a list of {@link OmVolumeArgs} + * @throws IOException if the operation fails. + */ + List<OmVolumeArgs> listVolumes(String userName, String prefix, + String startKey, int maxKeys) throws IOException; +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org