HADOOP-13037. Refactor Azure Data Lake Store as an independent FileSystem. Contributed by Vishwajeet Dusane
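Configuration overview (an illustrative sketch, not part of the patch): the refactored connector is set up entirely through Hadoop configuration. A minimal core-site.xml for the ClientCredential provider type, combining fs.adl.impl with the dfs.adls.oauth2.* keys defined in AdlConfKeys below; the refresh URL, client id, and credential values are placeholders:

  <property>
    <name>fs.adl.impl</name>
    <value>org.apache.hadoop.fs.adl.AdlFileSystem</value>
  </property>
  <property>
    <name>dfs.adls.oauth2.access.token.provider.type</name>
    <value>ClientCredential</value>
  </property>
  <property>
    <name>dfs.adls.oauth2.refresh.url</name>
    <!-- placeholder OAuth2 token endpoint -->
    <value>https://login.example.com/TENANT-ID/oauth2/token</value>
  </property>
  <property>
    <name>dfs.adls.oauth2.client.id</name>
    <value>CLIENT-ID</value> <!-- placeholder -->
  </property>
  <property>
    <name>dfs.adls.oauth2.credential</name>
    <value>CLIENT-SECRET</value> <!-- placeholder -->
  </property>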
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5c61ad24 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5c61ad24 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5c61ad24 Branch: refs/heads/YARN-4752 Commit: 5c61ad24887f76dfc5a5935b2c5dceb6bfd99417 Parents: a9ad5d6 Author: Chris Douglas <cdoug...@apache.org> Authored: Thu Nov 10 21:55:55 2016 -0800 Committer: Chris Douglas <cdoug...@apache.org> Committed: Fri Nov 11 11:15:07 2016 -0800 ---------------------------------------------------------------------- .../src/main/resources/core-default.xml | 48 +- .../dev-support/findbugs-exclude.xml | 24 - hadoop-tools/hadoop-azure-datalake/pom.xml | 47 +- .../main/java/org/apache/hadoop/fs/adl/Adl.java | 4 + .../org/apache/hadoop/fs/adl/AdlConfKeys.java | 92 ++ .../org/apache/hadoop/fs/adl/AdlFileSystem.java | 892 +++++++++++++- .../apache/hadoop/fs/adl/AdlFsInputStream.java | 149 +++ .../apache/hadoop/fs/adl/AdlFsOutputStream.java | 82 ++ .../org/apache/hadoop/fs/adl/AdlPermission.java | 69 ++ .../hadoop/fs/adl/SdkTokenProviderAdapter.java | 41 + .../apache/hadoop/fs/adl/TokenProviderType.java | 25 + .../fs/adl/oauth2/AzureADTokenProvider.java | 70 ++ ...hedRefreshTokenBasedAccessTokenProvider.java | 135 --- .../hadoop/fs/adl/oauth2/package-info.java | 2 +- .../org/apache/hadoop/fs/adl/package-info.java | 2 +- .../org/apache/hadoop/hdfs/web/ADLConfKeys.java | 61 - .../apache/hadoop/hdfs/web/BufferManager.java | 180 --- .../web/PrivateAzureDataLakeFileSystem.java | 1116 ------------------ ...ClientCredentialBasedAccesTokenProvider.java | 156 --- ...hedRefreshTokenBasedAccessTokenProvider.java | 37 - .../hadoop/hdfs/web/oauth2/package-info.java | 24 - .../apache/hadoop/hdfs/web/package-info.java | 25 - .../hadoop/hdfs/web/resources/ADLFlush.java | 49 - .../hdfs/web/resources/ADLGetOpParam.java | 96 -- .../hdfs/web/resources/ADLPostOpParam.java | 97 -- .../hdfs/web/resources/ADLPutOpParam.java | 94 -- .../hdfs/web/resources/ADLVersionInfo.java | 51 - .../web/resources/AppendADLNoRedirectParam.java | 45 - .../web/resources/CreateADLNoRedirectParam.java | 44 - .../hadoop/hdfs/web/resources/LeaseParam.java | 53 - .../web/resources/ReadADLNoRedirectParam.java | 44 - .../hadoop/hdfs/web/resources/package-info.java | 27 - .../META-INF/org.apache.hadoop.fs.FileSystem | 16 + .../src/site/markdown/index.md | 126 +- .../apache/hadoop/fs/adl/AdlMockWebServer.java | 99 ++ .../apache/hadoop/fs/adl/TestACLFeatures.java | 262 ++++ .../hadoop/fs/adl/TestADLResponseData.java | 67 +- .../org/apache/hadoop/fs/adl/TestAdlRead.java | 196 +++ .../hadoop/fs/adl/TestAzureADTokenProvider.java | 133 +++ .../adl/TestConcurrentDataReadOperations.java | 299 +++++ .../hadoop/fs/adl/TestCustomTokenProvider.java | 136 +++ .../apache/hadoop/fs/adl/TestGetFileStatus.java | 33 +- .../apache/hadoop/fs/adl/TestListStatus.java | 34 +- .../fs/adl/TestRelativePathFormation.java | 61 + .../fs/adl/TestValidateConfiguration.java | 103 ++ .../hadoop/fs/adl/TestableAdlFileSystem.java | 3 +- .../fs/adl/common/CustomMockTokenProvider.java | 61 + .../hadoop/fs/adl/common/ExpectedResponse.java | 71 ++ .../hadoop/fs/adl/common/Parallelized.java | 60 + .../hadoop/fs/adl/common/TestDataForRead.java | 122 ++ .../fs/adl/live/AdlStorageConfiguration.java | 50 +- .../hadoop/fs/adl/live/AdlStorageContract.java | 19 +- .../live/TestAdlDifferentSizeWritesLive.java | 2 +- .../adl/live/TestAdlFileSystemContractLive.java | 2 +- .../hadoop/fs/adl/live/TestAdlReadLive.java | 
342 ------ ...estAdlWebHdfsFileContextCreateMkdirLive.java | 79 -- ...AdlWebHdfsFileContextMainOperationsLive.java | 104 -- ...hedRefreshTokenBasedAccessTokenProvider.java | 149 --- .../hadoop/fs/common/AdlMockWebServer.java | 116 -- .../hadoop/fs/common/ExpectedResponse.java | 72 -- .../hadoop/fs/common/TestDataForRead.java | 120 -- .../org/apache/hadoop/hdfs/web/TestAdlRead.java | 205 ---- .../web/TestConcurrentDataReadOperations.java | 306 ----- .../hdfs/web/TestConfigurationSetting.java | 138 --- .../hdfs/web/TestSplitSizeCalculation.java | 123 -- .../src/test/resources/adls.xml | 11 +- .../test/resources/contract-test-options.xml | 62 +- .../src/test/resources/log4j.properties | 30 + 68 files changed, 3277 insertions(+), 4416 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 327acfa..05004c1 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -2382,53 +2382,6 @@ <!-- Azure Data Lake File System Configurations --> - - <property> - <name>adl.feature.override.readahead</name> - <value>true</value> - <description> - Enables read aheads in the ADL client, the feature is used to - improve read throughput. - This works in conjunction with the value set in - adl.feature.override.readahead.max.buffersize. - When set to false the read ahead feature is turned off. - Default : True if not configured. - </description> - </property> - - <property> - <name>adl.feature.override.readahead.max.buffersize</name> - <value>8388608</value> - <description> - Define maximum buffer size to cache read ahead data, this is - allocated per process to - cache read ahead data. Applicable only when - adl.feature.override.readahead is set to true. - Default : 8388608 Byte i.e. 8MB if not configured. - </description> - </property> - - <property> - <name>adl.feature.override.readahead.max.concurrent.connection</name> - <value>2</value> - <description> - Define maximum concurrent connection can be established to - read ahead. If the data size is less than 4MB then only 1 read n/w - connection - is set. If the data size is less than 4MB but less than 8MB then 2 read - n/w connection - is set. Data greater than 8MB then value set under the property would - take - effect. Applicable only when adl.feature.override.readahead is set - to true and buffer size is greater than 8MB. - It is recommended to reset this property if the - adl.feature.override.readahead.max.buffersize - is less than 8MB to gain performance. Application has to consider - throttling limit for the account as well before configuring large - buffer size. 
- </description> - </property> - <property> <name>fs.adl.impl</name> <value>org.apache.hadoop.fs.adl.AdlFileSystem</value> @@ -2438,6 +2391,7 @@ <name>fs.AbstractFileSystem.adl.impl</name> <value>org.apache.hadoop.fs.adl.Adl</value> </property> + <!-- Azure Data Lake File System Configurations Ends Here--> <property> <name>hadoop.caller.context.enabled</name> http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml deleted file mode 100644 index 4fd36ef..0000000 --- a/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml +++ /dev/null @@ -1,24 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<FindBugsFilter> - <!-- Buffer object is accessed withing trusted code and intentionally assigned instead of array copy --> - <Match> - <Class name="org.apache.hadoop.hdfs.web.PrivateAzureDataLakeFileSystem$BatchAppendOutputStream$CommitTask"/> - <Bug pattern="EI_EXPOSE_REP2"/> - <Priority value="2"/> - </Match> -</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/pom.xml b/hadoop-tools/hadoop-azure-datalake/pom.xml index 9a15b04..eba0979 100644 --- a/hadoop-tools/hadoop-azure-datalake/pom.xml +++ b/hadoop-tools/hadoop-azure-datalake/pom.xml @@ -35,22 +35,17 @@ <file.encoding>UTF-8</file.encoding> <downloadSources>true</downloadSources> </properties> - + <repositories> + <repository> + <id>snapshots-repo</id> + <url>https://oss.sonatype.org/content/repositories/snapshots</url> + <releases><enabled>false</enabled></releases> + <snapshots><enabled>true</enabled></snapshots> + </repository> + </repositories> <build> <plugins> <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>findbugs-maven-plugin</artifactId> - <configuration> - <findbugsXmlOutput>true</findbugsXmlOutput> - <xmlOutput>true</xmlOutput> - <excludeFilterFile> - ${basedir}/dev-support/findbugs-exclude.xml - </excludeFilterFile> - <effort>Max</effort> - </configuration> - </plugin> - <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-project-info-reports-plugin</artifactId> @@ -130,20 +125,13 @@ </build> <dependencies> + <!-- SDK Dependency --> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <exclusions> - <exclusion> - <artifactId>javax.servlet-api</artifactId> - <groupId>javax.servlet</groupId> - </exclusion> - </exclusions> - </dependency> - 
<dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs-client</artifactId> + <groupId>com.microsoft.azure</groupId> + <artifactId>azure-data-lake-store-sdk</artifactId> + <version>2.0.4-SNAPSHOT</version> </dependency> + <!-- ENDS HERE--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> @@ -159,11 +147,6 @@ <scope>test</scope> </dependency> <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> - <scope>test</scope> - </dependency> - <dependency> <groupId>com.eclipsesource.minimal-json</groupId> <artifactId>minimal-json</artifactId> <version>0.9.1</version> @@ -181,9 +164,5 @@ <version>2.4.0</version> <scope>test</scope> </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java index 4642d6b..7ec04cf 100644 --- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java @@ -19,6 +19,8 @@ package org.apache.hadoop.fs.adl; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.DelegateToFileSystem; @@ -29,6 +31,8 @@ import java.net.URISyntaxException; /** * Expose adl:// scheme to access ADL file system. */ +@InterfaceAudience.Public +@InterfaceStability.Evolving public class Adl extends DelegateToFileSystem { Adl(URI theUri, Configuration conf) throws IOException, URISyntaxException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlConfKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlConfKeys.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlConfKeys.java new file mode 100644 index 0000000..21120df --- /dev/null +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlConfKeys.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.adl; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Constants. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public final class AdlConfKeys { + // OAuth2 Common Configuration + public static final String AZURE_AD_REFRESH_URL_KEY = "dfs.adls.oauth2" + + ".refresh.url"; + + // optional when provider type is refresh or client id. + public static final String AZURE_AD_TOKEN_PROVIDER_CLASS_KEY = + "dfs.adls.oauth2.access.token.provider"; + public static final String AZURE_AD_CLIENT_ID_KEY = + "dfs.adls.oauth2.client.id"; + public static final String AZURE_AD_TOKEN_PROVIDER_TYPE_KEY = + "dfs.adls.oauth2.access.token.provider.type"; + + // OAuth Refresh Token Configuration + public static final String AZURE_AD_REFRESH_TOKEN_KEY = + "dfs.adls.oauth2.refresh.token"; + + public static final String TOKEN_PROVIDER_TYPE_REFRESH_TOKEN = "RefreshToken"; + // OAuth Client Cred Token Configuration + public static final String AZURE_AD_CLIENT_SECRET_KEY = + "dfs.adls.oauth2.credential"; + public static final String TOKEN_PROVIDER_TYPE_CLIENT_CRED = + "ClientCredential"; + + public static final String READ_AHEAD_BUFFER_SIZE_KEY = + "adl.feature.client.cache.readahead"; + + public static final String WRITE_BUFFER_SIZE_KEY = + "adl.feature.client.cache.drop.behind.writes"; + static final String SECURE_TRANSPORT_SCHEME = "https"; + static final String INSECURE_TRANSPORT_SCHEME = "http"; + static final String ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER = + "adl.debug.override.localuserasfileowner"; + + static final boolean ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT = false; + static final long ADL_BLOCK_SIZE = 256 * 1024 * 1024; + static final int ADL_REPLICATION_FACTOR = 1; + static final String ADL_HADOOP_CLIENT_NAME = "hadoop-azure-datalake-"; + static final String ADL_HADOOP_CLIENT_VERSION = + "2.0.0-SNAPSHOT"; + static final String ADL_EVENTS_TRACKING_SOURCE = "adl.events.tracking.source"; + static final String ADL_EVENTS_TRACKING_CLUSTERNAME = + "adl.events.tracking.clustername"; + + static final String ADL_EVENTS_TRACKING_CLUSTERTYPE = + "adl.events.tracking.clustertype"; + static final int DEFAULT_READ_AHEAD_BUFFER_SIZE = 4 * 1024 * 1024; + static final int DEFAULT_WRITE_AHEAD_BUFFER_SIZE = 4 * 1024 * 1024; + + static final String LATENCY_TRACKER_KEY = + "adl.dfs.enable.client.latency.tracker"; + static final boolean LATENCY_TRACKER_DEFAULT = true; + + static final String ADL_EXPERIMENT_POSITIONAL_READ_KEY = + "adl.feature.experiment.positional.read.enable"; + static final boolean ADL_EXPERIMENT_POSITIONAL_READ_DEFAULT = true; + + static final String ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION = + "adl.feature.support.acl.bit"; + static final boolean ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION_DEFAULT = true; + + private AdlConfKeys() { + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java index 11e1e0b..9083afc 100644 --- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java @@ 
-19,23 +19,905 @@ package org.apache.hadoop.fs.adl; -import org.apache.hadoop.hdfs.web.PrivateAzureDataLakeFileSystem; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import com.microsoft.azure.datalake.store.ADLStoreClient; +import com.microsoft.azure.datalake.store.ADLStoreOptions; +import com.microsoft.azure.datalake.store.DirectoryEntry; +import com.microsoft.azure.datalake.store.DirectoryEntryType; +import com.microsoft.azure.datalake.store.IfExists; +import com.microsoft.azure.datalake.store.LatencyTracker; +import com.microsoft.azure.datalake.store.oauth2.AccessTokenProvider; +import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider; +import com.microsoft.azure.datalake.store.oauth2.RefreshTokenBasedTokenProvider; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.ContentSummary.Builder; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.InvalidPathException; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Options.Rename; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.adl.oauth2.AzureADTokenProvider; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.VersionInfo; +import static org.apache.hadoop.fs.adl.AdlConfKeys.*; /** - * Expose adl:// scheme to access ADL file system. + * A FileSystem to access Azure Data Lake Store. */ -public class AdlFileSystem extends PrivateAzureDataLakeFileSystem { +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class AdlFileSystem extends FileSystem { + static final String SCHEME = "adl"; + static final int DEFAULT_PORT = 443; + private URI uri; + private String userName; + private boolean overrideOwner; + private ADLStoreClient adlClient; + private Path workingDirectory; + private boolean aclBitStatus; - public static final String SCHEME = "adl"; - public static final int DEFAULT_PORT = 443; + // retained for tests + private AccessTokenProvider tokenProvider; + private AzureADTokenProvider azureTokenProvider; @Override public String getScheme() { return SCHEME; } + public URI getUri() { + return uri; + } + @Override public int getDefaultPort() { return DEFAULT_PORT; } + + @Override + public boolean supportsSymlinks() { + return false; + } + + /** + * Called after a new FileSystem instance is constructed. + * + * @param storeUri a uri whose authority section names the host, port, etc. 
+ * for this FileSystem + * @param conf the configuration + */ + @Override + public void initialize(URI storeUri, Configuration conf) throws IOException { + super.initialize(storeUri, conf); + this.setConf(conf); + this.uri = URI + .create(storeUri.getScheme() + "://" + storeUri.getAuthority()); + + try { + userName = UserGroupInformation.getCurrentUser().getShortUserName(); + } catch (IOException e) { + userName = "hadoop"; + } + + this.setWorkingDirectory(getHomeDirectory()); + + overrideOwner = getConf().getBoolean(ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER, + ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT); + + aclBitStatus = conf.getBoolean(ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION, + ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION_DEFAULT); + + String accountFQDN = null; + String mountPoint = null; + String hostname = storeUri.getHost(); + if (!hostname.contains(".") && !hostname.equalsIgnoreCase( + "localhost")) { // this is a symbolic name. Resolve it. + String hostNameProperty = "dfs.adls." + hostname + ".hostname"; + String mountPointProperty = "dfs.adls." + hostname + ".mountpoint"; + accountFQDN = getNonEmptyVal(conf, hostNameProperty); + mountPoint = getNonEmptyVal(conf, mountPointProperty); + } else { + accountFQDN = hostname; + } + + if (storeUri.getPort() > 0) { + accountFQDN = accountFQDN + ":" + storeUri.getPort(); + } + + adlClient = ADLStoreClient + .createClient(accountFQDN, getAccessTokenProvider(conf)); + + ADLStoreOptions options = new ADLStoreOptions(); + options.enableThrowingRemoteExceptions(); + + if (getTransportScheme().equalsIgnoreCase(INSECURE_TRANSPORT_SCHEME)) { + options.setInsecureTransport(); + } + + if (mountPoint != null) { + options.setFilePathPrefix(mountPoint); + } + + String clusterName = conf.get(ADL_EVENTS_TRACKING_CLUSTERNAME, "UNKNOWN"); + String clusterType = conf.get(ADL_EVENTS_TRACKING_CLUSTERTYPE, "UNKNOWN"); + + String clientVersion = ADL_HADOOP_CLIENT_NAME + (StringUtils + .isEmpty(VersionInfo.getVersion().trim()) ? + ADL_HADOOP_CLIENT_VERSION.trim() : + VersionInfo.getVersion().trim()); + options.setUserAgentSuffix(clientVersion + "/" + + VersionInfo.getVersion().trim() + "/" + clusterName + "/" + + clusterType); + + adlClient.setOptions(options); + + boolean trackLatency = conf + .getBoolean(LATENCY_TRACKER_KEY, LATENCY_TRACKER_DEFAULT); + if (!trackLatency) { + LatencyTracker.disable(); + } + } + + /** + * This method is provided as a convenience for derived classes to supply a + * custom {@link AzureADTokenProvider} instance. + * + * Loading an {@link AzureADTokenProvider} alone is not sufficient to + * preserve the secure Hadoop infrastructure and the user context for which + * the respective {@link AdlFileSystem} instance is initialized. + * + * The loading order is to first invoke + * {@link #getCustomAccessTokenProvider(Configuration)}; if that method + * returns null, meaning no implementation was provided by a derived class, + * then the configuration object is consulted for the token settings + * described in the documentation. + * + * Custom token management takes precedence during initialization. + * + * @param conf Configuration object + * @return null if no custom {@link AzureADTokenProvider} token management + * is specified. + * @throws IOException if the token provider fails to initialize. + */ + protected synchronized AzureADTokenProvider getCustomAccessTokenProvider( + Configuration conf) throws IOException { + String className = getNonEmptyVal(conf, AZURE_AD_TOKEN_PROVIDER_CLASS_KEY); + + Class<?
extends AzureADTokenProvider> azureADTokenProviderClass = + conf.getClass(AZURE_AD_TOKEN_PROVIDER_CLASS_KEY, null, + AzureADTokenProvider.class); + if (azureADTokenProviderClass == null) { + throw new IllegalArgumentException( + "Configuration " + className + " " + "not defined/accessible."); + } + + azureTokenProvider = ReflectionUtils + .newInstance(azureADTokenProviderClass, conf); + if (azureTokenProvider == null) { + throw new IllegalArgumentException("Failed to initialize " + className); + } + + azureTokenProvider.initialize(conf); + return azureTokenProvider; + } + + private AccessTokenProvider getAccessTokenProvider(Configuration conf) + throws IOException { + TokenProviderType type = conf.getEnum( + AdlConfKeys.AZURE_AD_TOKEN_PROVIDER_TYPE_KEY, TokenProviderType.Custom); + + switch (type) { + case RefreshToken: + tokenProvider = getConfRefreshTokenBasedTokenProvider(conf); + break; + case ClientCredential: + tokenProvider = getConfCredentialBasedTokenProvider(conf); + break; + case Custom: + default: + AzureADTokenProvider azureADTokenProvider = getCustomAccessTokenProvider( + conf); + tokenProvider = new SdkTokenProviderAdapter(azureADTokenProvider); + break; + } + + return tokenProvider; + } + + private AccessTokenProvider getConfCredentialBasedTokenProvider( + Configuration conf) { + String clientId = getNonEmptyVal(conf, AZURE_AD_CLIENT_ID_KEY); + String refreshUrl = getNonEmptyVal(conf, AZURE_AD_REFRESH_URL_KEY); + String clientSecret = getNonEmptyVal(conf, AZURE_AD_CLIENT_SECRET_KEY); + return new ClientCredsTokenProvider(refreshUrl, clientId, clientSecret); + } + + private AccessTokenProvider getConfRefreshTokenBasedTokenProvider( + Configuration conf) { + String clientId = getNonEmptyVal(conf, AZURE_AD_CLIENT_ID_KEY); + String refreshToken = getNonEmptyVal(conf, AZURE_AD_REFRESH_TOKEN_KEY); + return new RefreshTokenBasedTokenProvider(clientId, refreshToken); + } + + @VisibleForTesting + AccessTokenProvider getTokenProvider() { + return tokenProvider; + } + + @VisibleForTesting + AzureADTokenProvider getAzureTokenProvider() { + return azureTokenProvider; + } + + /** + * Constructing the home directory locally is fine as long as the mapping + * between the Hadoop local user name and the ADL user name is not fully + * settled. + * + * @return Hadoop local user home directory. + */ + @Override + public Path getHomeDirectory() { + return makeQualified(new Path("/user/" + userName)); + } + + /** + * Create is handled differently for ADL: create semantics are translated + * into create/append semantics. + * 1. No dedicated connection to the server. + * 2. Buffering is done locally; once the buffer is full or flush is invoked + * by the caller, all pending data is pushed to ADL as an APPEND operation. + * 3. On close, an additional call is sent to the server to close the stream + * and release the lock on it. + * + * Create/append semantics are necessary because: + * 1. The ADL backend does not allow connections to stay idle for long. With + * slow writers, connection timeouts/resets were observed, causing + * occasional job failures. + * 2. It boosts performance for slow-writer jobs by avoiding network latency. + * 3. ADL performs equally well when appends arrive in multiples of 4 MB + * chunks.
+ * + * @param f File path + * @param permission Access permission for the newly created file + * @param overwrite If true, remove any existing file and create a new one; + * otherwise fail if the file exists + * @param bufferSize Buffer size; not honoured by the ADL backend + * @param replication Replication count; not honoured by the ADL backend + * @param blockSize Block size; not honoured by the ADL backend + * @param progress Progress indicator + * @return FSDataOutputStream OutputStream on which the application can push + * a stream of bytes + * @throws IOException when system error, internal server error or user error + */ + @Override + public FSDataOutputStream create(Path f, FsPermission permission, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + statistics.incrementWriteOps(1); + IfExists overwriteRule = overwrite ? IfExists.OVERWRITE : IfExists.FAIL; + return new FSDataOutputStream(new AdlFsOutputStream(adlClient + .createFile(toRelativeFilePath(f), overwriteRule, + Integer.toOctalString(applyUMask(permission).toShort()), true), + getConf()), this.statistics); + } + + /** + * Opens an FSDataOutputStream at the indicated Path with write-progress + * reporting. Same as create(), except fails if the parent directory doesn't + * already exist. + * + * @param f the file name to open + * @param permission Access permission for the newly created file + * @param flags {@link CreateFlag}s to use for this stream. + * @param bufferSize the size of the buffer to be used; not honoured by the + * ADL backend + * @param replication required block replication for the file; not honoured + * by the ADL backend + * @param blockSize Block size; not honoured by the ADL backend + * @param progress Progress indicator + * @throws IOException when system error, internal server error or user error + * @see #setPermission(Path, FsPermission) + * @deprecated API only for 0.20-append + */ + @Deprecated + @Override + public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, + EnumSet<CreateFlag> flags, int bufferSize, short replication, + long blockSize, Progressable progress) throws IOException { + statistics.incrementWriteOps(1); + IfExists overwriteRule = IfExists.FAIL; + for (CreateFlag flag : flags) { + if (flag == CreateFlag.OVERWRITE) { + overwriteRule = IfExists.OVERWRITE; + break; + } + } + + return new FSDataOutputStream(new AdlFsOutputStream(adlClient + .createFile(toRelativeFilePath(f), overwriteRule, + Integer.toOctalString(applyUMask(permission).toShort()), false), + getConf()), this.statistics); + } + + /** + * Append to an existing file (optional operation). + * + * @param f the existing file to be appended. + * @param bufferSize the size of the buffer to be used; not honoured by the + * ADL backend + * @param progress Progress indicator + * @throws IOException when system error, internal server error or user error + */ + @Override + public FSDataOutputStream append(Path f, int bufferSize, + Progressable progress) throws IOException { + statistics.incrementWriteOps(1); + return new FSDataOutputStream( + new AdlFsOutputStream(adlClient.getAppendStream(toRelativeFilePath(f)), + getConf()), this.statistics); + } + + /** + * Azure Data Lake does not support user-configured data replication, so the + * backend is not queried for it.
+ * + * Stub implementation. + * + * @param p Not honoured + * @param replication Not honoured + * @return true, hard-coded, since the ADL file system does not support + * replication configuration + * @throws IOException never thrown here; declared only to align with the + * parent API definition. + */ + @Override + public boolean setReplication(final Path p, final short replication) + throws IOException { + statistics.incrementWriteOps(1); + return true; + } + + /** + * Open is handled differently for ADL: instead of returning the raw network + * stream to the user, an overridden FSInputStream is returned. + * + * @param f File path + * @param buffersize Buffer size; not honoured + * @return FSDataInputStream InputStream from which the application can read + * a stream of bytes + * @throws IOException when system error, internal server error or user error + */ + @Override + public FSDataInputStream open(final Path f, final int buffersize) + throws IOException { + statistics.incrementReadOps(1); + return new FSDataInputStream( + new AdlFsInputStream(adlClient.getReadStream(toRelativeFilePath(f)), + statistics, getConf())); + } + + /** + * Return a file status object that represents the path. + * + * @param f The path we want information from + * @return a FileStatus object + * @throws IOException when the path does not exist or any other error; + * IOException see specific implementation + */ + @Override + public FileStatus getFileStatus(final Path f) throws IOException { + statistics.incrementReadOps(1); + DirectoryEntry entry = adlClient.getDirectoryEntry(toRelativeFilePath(f)); + return toFileStatus(entry, f); + } + + /** + * List the statuses of the files/directories in the given path if the path is + * a directory. + * + * @param f given path + * @return the statuses of the files/directories in the given path + * @throws IOException when the path does not exist or any other error; + * IOException see specific implementation + */ + @Override + public FileStatus[] listStatus(final Path f) throws IOException { + statistics.incrementReadOps(1); + List<DirectoryEntry> entries = + adlClient.enumerateDirectory(toRelativeFilePath(f)); + return toFileStatuses(entries, f); + } + + /** + * Renames Path src to Path dst. Can take place on local fs + * or remote DFS. + * + * ADLS supports the POSIX standard for the rename operation. + * + * @param src path to be renamed + * @param dst new path after rename + * @return true if rename is successful + * @throws IOException on failure + */ + @Override + public boolean rename(final Path src, final Path dst) throws IOException { + statistics.incrementWriteOps(1); + return adlClient.rename(toRelativeFilePath(src), toRelativeFilePath(dst)); + } + + @Override + @Deprecated + public void rename(final Path src, final Path dst, + final Options.Rename... options) throws IOException { + statistics.incrementWriteOps(1); + boolean overwrite = false; + for (Rename renameOption : options) { + if (renameOption == Rename.OVERWRITE) { + overwrite = true; + break; + } + } + adlClient + .rename(toRelativeFilePath(src), toRelativeFilePath(dst), overwrite); + } + + /** + * Concat existing files together. + * + * @param trg the path to the target destination. + * @param srcs the paths to the sources to use for the concatenation.
+ * @throws IOException when system error, internal server error or user error + */ + @Override + public void concat(final Path trg, final Path[] srcs) throws IOException { + statistics.incrementWriteOps(1); + List<String> sourcesList = new ArrayList<String>(); + for (Path entry : srcs) { + sourcesList.add(toRelativeFilePath(entry)); + } + adlClient.concatenateFiles(toRelativeFilePath(trg), sourcesList); + } + + /** + * Delete a file. + * + * @param path the path to delete. + * @param recursive if path is a directory and set to + * true, the directory is deleted else throws an exception. + * In case of a file the recursive can be set to either + * true or false. + * @return true if delete is successful else false. + * @throws IOException when system error, internal server error or user error + */ + @Override + public boolean delete(final Path path, final boolean recursive) + throws IOException { + statistics.incrementWriteOps(1); + return recursive ? + adlClient.deleteRecursive(toRelativeFilePath(path)) : + adlClient.delete(toRelativeFilePath(path)); + } + + /** + * Make the given file and all non-existent parents into + * directories. Has the semantics of Unix 'mkdir -p'. + * Existence of the directory hierarchy is not an error. + * + * @param path path to create + * @param permission to apply to path + */ + @Override + public boolean mkdirs(final Path path, final FsPermission permission) + throws IOException { + statistics.incrementWriteOps(1); + return adlClient.createDirectory(toRelativeFilePath(path), + Integer.toOctalString(applyUMask(permission).toShort())); + } + + private FileStatus[] toFileStatuses(final List<DirectoryEntry> entries, + final Path parent) { + FileStatus[] fileStatuses = new FileStatus[entries.size()]; + int index = 0; + for (DirectoryEntry entry : entries) { + FileStatus status = toFileStatus(entry, parent); + if (!(entry.name == null || entry.name == "")) { + status.setPath( + new Path(parent.makeQualified(uri, workingDirectory), entry.name)); + } + + fileStatuses[index++] = status; + } + + return fileStatuses; + } + + private FsPermission applyUMask(FsPermission permission) { + if (permission == null) { + permission = FsPermission.getDefault(); + } + return permission.applyUMask(FsPermission.getUMask(getConf())); + } + + private FileStatus toFileStatus(final DirectoryEntry entry, final Path f) { + boolean isDirectory = entry.type == DirectoryEntryType.DIRECTORY; + long lastModificationData = entry.lastModifiedTime.getTime(); + long lastAccessTime = entry.lastAccessTime.getTime(); + FsPermission permission = new AdlPermission(aclBitStatus, + Short.valueOf(entry.permission, 8)); + String user = entry.user; + String group = entry.group; + + FileStatus status; + if (overrideOwner) { + status = new FileStatus(entry.length, isDirectory, ADL_REPLICATION_FACTOR, + ADL_BLOCK_SIZE, lastModificationData, lastAccessTime, permission, + userName, "hdfs", this.makeQualified(f)); + } else { + status = new FileStatus(entry.length, isDirectory, ADL_REPLICATION_FACTOR, + ADL_BLOCK_SIZE, lastModificationData, lastAccessTime, permission, + user, group, this.makeQualified(f)); + } + + return status; + } + + /** + * Set owner of a path (i.e. a file or a directory). + * The parameters owner and group cannot both be null. + * + * @param path The path + * @param owner If it is null, the original username remains unchanged. + * @param group If it is null, the original groupname remains unchanged. 
+ */ + @Override + public void setOwner(final Path path, final String owner, final String group) + throws IOException { + statistics.incrementWriteOps(1); + adlClient.setOwner(toRelativeFilePath(path), owner, group); + } + + /** + * Set permission of a path. + * + * @param path The path + * @param permission Access permission + */ + @Override + public void setPermission(final Path path, final FsPermission permission) + throws IOException { + statistics.incrementWriteOps(1); + adlClient.setPermission(toRelativeFilePath(path), + Integer.toOctalString(permission.toShort())); + } + + /** + * Modifies ACL entries of files and directories. This method can add new ACL + * entries or modify the permissions on existing ACL entries. All existing + * ACL entries that are not specified in this call are retained without + * changes. (Modifications are merged into the current ACL.) + * + * @param path Path to modify + * @param aclSpec List of AclEntry describing modifications + * @throws IOException if an ACL could not be modified + */ + @Override + public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec) + throws IOException { + statistics.incrementWriteOps(1); + List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new + ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>(); + for (AclEntry aclEntry : aclSpec) { + msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry + .parseAclEntry(aclEntry.toString())); + } + adlClient.modifyAclEntries(toRelativeFilePath(path), msAclEntries); + } + + /** + * Removes ACL entries from files and directories. Other ACL entries are + * retained. + * + * @param path Path to modify + * @param aclSpec List of AclEntry describing entries to remove + * @throws IOException if an ACL could not be modified + */ + @Override + public void removeAclEntries(final Path path, final List<AclEntry> aclSpec) + throws IOException { + statistics.incrementWriteOps(1); + List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new + ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>(); + for (AclEntry aclEntry : aclSpec) { + msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry + .parseAclEntry(aclEntry.toString(), true)); + } + adlClient.removeAclEntries(toRelativeFilePath(path), msAclEntries); + } + + /** + * Removes all default ACL entries from files and directories. + * + * @param path Path to modify + * @throws IOException if an ACL could not be modified + */ + @Override + public void removeDefaultAcl(final Path path) throws IOException { + statistics.incrementWriteOps(1); + adlClient.removeDefaultAcls(toRelativeFilePath(path)); + } + + /** + * Removes all but the base ACL entries of files and directories. The entries + * for user, group, and others are retained for compatibility with permission + * bits. + * + * @param path Path to modify + * @throws IOException if an ACL could not be removed + */ + @Override + public void removeAcl(final Path path) throws IOException { + statistics.incrementWriteOps(1); + adlClient.removeAllAcls(toRelativeFilePath(path)); + } + + /** + * Fully replaces ACL of files and directories, discarding all existing + * entries. + * + * @param path Path to modify + * @param aclSpec List of AclEntry describing modifications, must include + * entries for user, group, and others for compatibility with + * permission bits. 
+ * @throws IOException if an ACL could not be modified + */ + @Override + public void setAcl(final Path path, final List<AclEntry> aclSpec) + throws IOException { + statistics.incrementWriteOps(1); + List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new + ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>(); + for (AclEntry aclEntry : aclSpec) { + msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry + .parseAclEntry(aclEntry.toString())); + } + + adlClient.setAcl(toRelativeFilePath(path), msAclEntries); + } + + /** + * Gets the ACL of a file or directory. + * + * @param path Path to get + * @return AclStatus describing the ACL of the file or directory + * @throws IOException if an ACL could not be read + */ + @Override + public AclStatus getAclStatus(final Path path) throws IOException { + statistics.incrementReadOps(1); + com.microsoft.azure.datalake.store.acl.AclStatus adlStatus = adlClient + .getAclStatus(toRelativeFilePath(path)); + AclStatus.Builder aclStatusBuilder = new AclStatus.Builder(); + aclStatusBuilder.owner(adlStatus.owner); + aclStatusBuilder.group(adlStatus.group); + aclStatusBuilder.setPermission( + new FsPermission(Short.valueOf(adlStatus.octalPermissions, 8))); + aclStatusBuilder.stickyBit(adlStatus.stickyBit); + String aclListString = com.microsoft.azure.datalake.store.acl.AclEntry + .aclListToString(adlStatus.aclSpec); + List<AclEntry> aclEntries = AclEntry.parseAclSpec(aclListString, true); + aclStatusBuilder.addEntries(aclEntries); + return aclStatusBuilder.build(); + } + + /** + * Checks if the user can access a path. The mode specifies which access + * checks to perform. If the requested permissions are granted, then the + * method returns normally. If access is denied, then the method throws an + * {@link AccessControlException}. + * + * @param path Path to check + * @param mode type of access to check + * @throws AccessControlException if access is denied + * @throws java.io.FileNotFoundException if the path does not exist + * @throws IOException see specific implementation + */ + @Override + public void access(final Path path, FsAction mode) throws IOException { + statistics.incrementReadOps(1); + if (!adlClient.checkAccess(toRelativeFilePath(path), mode.SYMBOL)) { + throw new AccessControlException("Access Denied : " + path.toString()); + } + } + + /** + * Return the {@link ContentSummary} of a given {@link Path}. + * + * @param f path to use + */ + @Override + public ContentSummary getContentSummary(Path f) throws IOException { + statistics.incrementReadOps(1); + com.microsoft.azure.datalake.store.ContentSummary msSummary = adlClient + .getContentSummary(toRelativeFilePath(f)); + return new Builder().length(msSummary.length) + .directoryCount(msSummary.directoryCount).fileCount(msSummary.fileCount) + .spaceConsumed(msSummary.spaceConsumed).build(); + } + + @VisibleForTesting + protected String getTransportScheme() { + return SECURE_TRANSPORT_SCHEME; + } + + @VisibleForTesting + String toRelativeFilePath(Path path) { + return path.makeQualified(uri, workingDirectory).toUri().getPath(); + } + + /** + * Get the current working directory for the given file system. + * + * @return the directory pathname + */ + @Override + public Path getWorkingDirectory() { + return workingDirectory; + } + + /** + * Set the current working directory for the given file system. All relative + * paths will be resolved relative to it. + * + * @param dir Working directory path. 
+ */ + @Override + public void setWorkingDirectory(final Path dir) { + if (dir == null) { + throw new InvalidPathException("Working directory cannot be set to NULL"); + } + + /** + * Do not validate the scheme and URI of the passed parameter. When ADLS + * runs as an additional file system, the working directory that is set + * carries the default file system's scheme and URI. + * + * A problem was found during Pig execution in + * https://github.com/apache/pig/blob/branch-0 + * .15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer + * /PigInputFormat.java#L235 + * A similar problem could be present in other applications as well, so + * default to building the working directory from the relative path only. + */ + this.workingDirectory = this.makeAbsolute(dir); + } + + /** + * Return the number of bytes that large input files should optimally + * be split into to minimize I/O time. + * + * @deprecated use {@link #getDefaultBlockSize(Path)} instead + */ + @Deprecated + public long getDefaultBlockSize() { + return ADL_BLOCK_SIZE; + } + + /** + * Return the number of bytes that large input files should optimally + * be split into to minimize I/O time. The given path will be used to + * locate the actual filesystem. The full path does not have to exist. + * + * @param f path of file + * @return the default block size for the path's filesystem + */ + public long getDefaultBlockSize(Path f) { + return getDefaultBlockSize(); + } + + /** + * Get the block size. + * + * @param f the filename + * @return the number of bytes in a block + * @deprecated Use getFileStatus() instead + */ + @Deprecated + public long getBlockSize(Path f) throws IOException { + return ADL_BLOCK_SIZE; + } + + @Override + public BlockLocation[] getFileBlockLocations(final FileStatus status, + final long offset, final long length) throws IOException { + if (status == null) { + return null; + } + + if ((offset < 0) || (length < 0)) { + throw new IllegalArgumentException("Invalid start or len parameter"); + } + + if (status.getLen() < offset) { + return new BlockLocation[0]; + } + + final String[] name = {"localhost"}; + final String[] host = {"localhost"}; + long blockSize = ADL_BLOCK_SIZE; + int numberOfLocations = + (int) (length / blockSize) + ((length % blockSize == 0) ? 0 : 1); + BlockLocation[] locations = new BlockLocation[numberOfLocations]; + for (int i = 0; i < locations.length; i++) { + long currentOffset = offset + (i * blockSize); + long currentLength = Math.min(blockSize, offset + length - currentOffset); + locations[i] = new BlockLocation(name, host, currentOffset, + currentLength); + } + + return locations; + } + + @Override + public BlockLocation[] getFileBlockLocations(final Path p, final long offset, + final long length) throws IOException { + // read ops incremented in getFileStatus + FileStatus fileStatus = getFileStatus(p); + return getFileBlockLocations(fileStatus, offset, length); + } + + /** + * Get replication. + * + * @param src file name + * @return file replication + * @deprecated Use getFileStatus() instead + */ + @Deprecated + public short getReplication(Path src) { + return ADL_REPLICATION_FACTOR; + } + + private Path makeAbsolute(Path path) { + return path.isAbsolute() ?
path : new Path(this.workingDirectory, path); + } + + private static String getNonEmptyVal(Configuration conf, String key) { + String value = conf.get(key); + if (StringUtils.isEmpty(value)) { + throw new IllegalArgumentException( + "No value for " + key + " found in conf file."); + } + return value; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsInputStream.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsInputStream.java new file mode 100644 index 0000000..5248cbf --- /dev/null +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsInputStream.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.adl; + +import com.microsoft.azure.datalake.store.ADLFileInputStream; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileSystem.Statistics; + +import java.io.IOException; + +import static org.apache.hadoop.fs.adl.AdlConfKeys + .ADL_EXPERIMENT_POSITIONAL_READ_DEFAULT; +import static org.apache.hadoop.fs.adl.AdlConfKeys + .ADL_EXPERIMENT_POSITIONAL_READ_KEY; +import static org.apache.hadoop.fs.adl.AdlConfKeys + .DEFAULT_READ_AHEAD_BUFFER_SIZE; +import static org.apache.hadoop.fs.adl.AdlConfKeys.READ_AHEAD_BUFFER_SIZE_KEY; + +/** + * Wraps {@link ADLFileInputStream} implementation. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class AdlFsInputStream extends FSInputStream { + + private final ADLFileInputStream in; + private final Statistics stat; + private final boolean enablePositionalReadExperiment; + + public AdlFsInputStream(ADLFileInputStream inputStream, Statistics statistics, + Configuration conf) throws IOException { + this.in = inputStream; + this.in.setBufferSize(conf.getInt(READ_AHEAD_BUFFER_SIZE_KEY, + DEFAULT_READ_AHEAD_BUFFER_SIZE)); + enablePositionalReadExperiment = conf + .getBoolean(ADL_EXPERIMENT_POSITIONAL_READ_KEY, + ADL_EXPERIMENT_POSITIONAL_READ_DEFAULT); + stat = statistics; + } + + @Override + public synchronized void seek(long pos) throws IOException { + in.seek(pos); + } + + /** + * Return the current offset from the start of the file. 
+ */ + @Override + public synchronized long getPos() throws IOException { + return in.getPos(); + } + + @Override + public boolean seekToNewSource(long l) throws IOException { + return false; + } + + @Override + public synchronized int read() throws IOException { + int ch = in.read(); + if (stat != null && ch != -1) { + stat.incrementBytesRead(1); + } + return ch; + } + + @Override + public int read(long position, byte[] buffer, int offset, int length) + throws IOException { + int numberOfByteRead = 0; + if (enablePositionalReadExperiment) { + numberOfByteRead = in.read(position, buffer, offset, length); + } else { + numberOfByteRead = super.read(position, buffer, offset, length); + } + + if (stat != null && numberOfByteRead > 0) { + stat.incrementBytesRead(numberOfByteRead); + } + return numberOfByteRead; + } + + @Override + public synchronized int read(byte[] buffer, int offset, int length) + throws IOException { + int numberOfByteRead = in.read(buffer, offset, length); + if (stat != null && numberOfByteRead > 0) { + stat.incrementBytesRead(numberOfByteRead); + } + return numberOfByteRead; + } + + /** + * This method returns the number of bytes remaining in the stream, rather + * than the usual Java interpretation of + * {@link java.io.InputStream#available()}, which is the number of bytes + * remaining in the local buffer. Moreover, it caps the value returned at + * Integer.MAX_VALUE. + * These changed behaviors ensure compatibility with the HBase WAL reader, + * which depends on available() returning the number of bytes left in the + * stream. + * + * Given that all other FileSystems in the Hadoop ecosystem (especially + * HDFS) do this, applications other than HBase may also have picked up + * this expectation from the HDFS implementation. The quirky behavior is + * therefore kept here for compatibility. + * + * @return remaining bytes in the stream, with maximum of Integer.MAX_VALUE. + * @throws IOException If fails to get the position or file length from SDK. + */ + @Override + public synchronized int available() throws IOException { + return (int) Math.min(in.length() - in.getPos(), Integer.MAX_VALUE); + } + + @Override + public synchronized void close() throws IOException { + in.close(); + } + + @Override + public synchronized long skip(long pos) throws IOException { + return in.skip(pos); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java new file mode 100644 index 0000000..2b89fb0 --- /dev/null +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.adl; + +import com.microsoft.azure.datalake.store.ADLFileOutputStream; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Syncable; + +import java.io.IOException; +import java.io.OutputStream; + +import static org.apache.hadoop.fs.adl.AdlConfKeys + .DEFAULT_WRITE_AHEAD_BUFFER_SIZE; +import static org.apache.hadoop.fs.adl.AdlConfKeys.WRITE_BUFFER_SIZE_KEY; + +/** + * Wraps {@link com.microsoft.azure.datalake.store.ADLFileOutputStream} + * implementation. + * + * Flush semantics. + * no-op, since some parts of hadoop ecosystem call flush(), expecting it to + * have no perf impact. In hadoop filesystems, flush() itself guarantees no + * durability: that is achieved by calling hflush() or hsync() + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class AdlFsOutputStream extends OutputStream implements Syncable { + private final ADLFileOutputStream out; + + public AdlFsOutputStream(ADLFileOutputStream out, Configuration configuration) + throws IOException { + this.out = out; + out.setBufferSize(configuration + .getInt(WRITE_BUFFER_SIZE_KEY, DEFAULT_WRITE_AHEAD_BUFFER_SIZE)); + } + + @Override + public synchronized void write(int b) throws IOException { + out.write(b); + } + + @Override + public synchronized void write(byte[] b, int off, int len) + throws IOException { + out.write(b, off, len); + } + + @Override + public synchronized void close() throws IOException { + out.close(); + } + + public synchronized void sync() throws IOException { + out.flush(); + } + + public synchronized void hflush() throws IOException { + out.flush(); + } + + public synchronized void hsync() throws IOException { + out.flush(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlPermission.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlPermission.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlPermission.java new file mode 100644 index 0000000..af3342a --- /dev/null +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlPermission.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlPermission.java ----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlPermission.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlPermission.java new file mode 100644 index 0000000..af3342a --- /dev/null +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlPermission.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.fs.adl; + +import org.apache.hadoop.fs.permission.FsPermission; + +/** + * The Hadoop shell command -getfacl does not invoke getAclStatus unless the + * FsPermission returned by getFileStatus has its ACL bit set, and getAclBit + * returns false by default. + * + * The additional getAclStatus call would be redundant when ADLS runs as an + * additional FileSystem, so a configuration option controls the value that + * getAclBit returns. + */ +class AdlPermission extends FsPermission { + private final boolean aclBit; + + AdlPermission(boolean aclBitStatus, Short permission) { + super(permission); + this.aclBit = aclBitStatus; + } + + /** + * Returns true if the "adl.feature.support.acl.bit" configuration is set + * to true; when the configuration is not set, the default is true. + * + * @return the configured ACL bit value, or true if the configuration is + * not set. + */ + public boolean getAclBit() { + return aclBit; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof FsPermission) { + FsPermission that = (FsPermission) obj; + return this.getUserAction() == that.getUserAction() + && this.getGroupAction() == that.getGroupAction() + && this.getOtherAction() == that.getOtherAction() + && this.getStickyBit() == that.getStickyBit(); + } + return false; + } + + @Override + public int hashCode() { + return toShort(); + } +}
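A short sketch (not part of this patch) of the configuration-driven ACL bit described above. AdlPermission itself is package-private, so this demo reads the key directly; the key string and its default of true follow the javadoc, while the demo class name is invented:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class AclBitDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // true advertises an ACL bit, so -getfacl proceeds to getAclStatus.
        boolean aclBit = conf.getBoolean("adl.feature.support.acl.bit", true);
        FsPermission plain = new FsPermission((short) 0644);
        System.out.println("plain FsPermission ACL bit: " + plain.getAclBit()); // false
        System.out.println("configured ADL ACL bit:     " + aclBit);
      }
    }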
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/SdkTokenProviderAdapter.java ----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/SdkTokenProviderAdapter.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/SdkTokenProviderAdapter.java new file mode 100644 index 0000000..7b107ae --- /dev/null +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/SdkTokenProviderAdapter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.adl; + +import com.microsoft.azure.datalake.store.oauth2.AccessTokenProvider; +import com.microsoft.azure.datalake.store.oauth2.AzureADToken; +import org.apache.hadoop.fs.adl.oauth2.AzureADTokenProvider; + +import java.io.IOException; + +/** + * Adapts a Hadoop {@link AzureADTokenProvider} to the ADLS SDK + * {@link AccessTokenProvider} contract. + */ +final class SdkTokenProviderAdapter extends AccessTokenProvider { + + private final AzureADTokenProvider tokenProvider; + + SdkTokenProviderAdapter(AzureADTokenProvider tp) { + this.tokenProvider = tp; + } + + @Override + protected AzureADToken refreshToken() throws IOException { + AzureADToken azureADToken = new AzureADToken(); + azureADToken.accessToken = tokenProvider.getAccessToken(); + azureADToken.expiry = tokenProvider.getExpiryTime(); + return azureADToken; + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/TokenProviderType.java ----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/TokenProviderType.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/TokenProviderType.java new file mode 100644 index 0000000..9fd4f4f --- /dev/null +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/TokenProviderType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.adl; + +/** + * Token provider types supported by {@link AdlFileSystem}. + */ +enum TokenProviderType { + RefreshToken, + ClientCredential, + Custom +}
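The Custom type above selects a user-supplied implementation of the AzureADTokenProvider extension point defined in the next hunk. A hedged sketch of such a provider, with the class name and configuration key invented for illustration:

    import java.io.IOException;
    import java.util.Date;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.adl.oauth2.AzureADTokenProvider;

    public class StaticTokenProvider extends AzureADTokenProvider {
      private String token;

      @Override
      public void initialize(Configuration conf) throws IOException {
        // Hypothetical key holding a pre-fetched token; real providers
        // would refresh against Azure Active Directory instead.
        token = conf.get("demo.adl.static.access.token");
        if (token == null) {
          throw new IOException("demo.adl.static.access.token is not set");
        }
      }

      @Override
      public String getAccessToken() {
        return token;
      }

      @Override
      public Date getExpiryTime() {
        // A static token is never refreshed; report a far-future expiry.
        return new Date(Long.MAX_VALUE);
      }
    }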
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/AzureADTokenProvider.java ----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/AzureADTokenProvider.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/AzureADTokenProvider.java new file mode 100644 index 0000000..a0b3922 --- /dev/null +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/AzureADTokenProvider.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.adl.oauth2; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.util.Date; + +/** + * Provides an Azure Active Directory OAuth2 access token used to + * authenticate REST calls against the Azure Data Lake file system + * {@link org.apache.hadoop.fs.adl.AdlFileSystem}. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public abstract class AzureADTokenProvider { + + /** + * Initialize with the supplied configuration. Invoked when the + * {@link org.apache.hadoop.fs.adl.AdlFileSystem#initialize(URI, + * Configuration)} method is invoked. + * + * @param configuration Configuration object + * @throws IOException if the instance cannot be configured. + */ + public abstract void initialize(Configuration configuration) + throws IOException; + + /** + * Obtain the access token to add to the HTTPS connection's request header. + * Invocations depend on the expiry reported by {@link #getExpiryTime()}, + * so implementations should be performant. Implementations are responsible + * for any refreshing of the token. + * + * @return String containing the access token + * @throws IOException if there is an error fetching the token + */ + public abstract String getAccessToken() throws IOException; + + /** + * Obtain the expiry time of the token. If the implementation is performant + * enough to maintain the token's validity itself and expects a + * {@link #getAccessToken()} call for every connection, it is safe to + * return the current or a past time. + * + * However, using the token expiry time received from Azure Active + * Directory is recommended. + * + * @return Date at which the access token retrieved from AAD expires. + */ + public abstract Date getExpiryTime(); +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java ----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java deleted file mode 100644 index b7f3b00..0000000 --- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.hadoop.fs.adl.oauth2; - -import java.io.IOException; -import java.util.Map; -import java.util.LinkedHashMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.web.oauth2.AccessTokenProvider; -import org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider; -import org.apache.hadoop.hdfs.web.oauth2.PrivateCachedRefreshTokenBasedAccessTokenProvider; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY; -import static org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider.OAUTH_REFRESH_TOKEN_KEY; - -/** - * Share refresh tokens across all ADLS instances with a common client ID. The - * {@link AccessTokenProvider} can be shared across multiple instances, - * amortizing the cost of refreshing tokens. - */ -public class CachedRefreshTokenBasedAccessTokenProvider - extends PrivateCachedRefreshTokenBasedAccessTokenProvider { - - public static final String FORCE_REFRESH = "adl.force.token.refresh"; - - private static final Logger LOG = - LoggerFactory.getLogger(CachedRefreshTokenBasedAccessTokenProvider.class); - - /** Limit size of provider cache. */ - static final int MAX_PROVIDERS = 10; - @SuppressWarnings("serial") - private static final Map<String, AccessTokenProvider> CACHE = - new LinkedHashMap<String, AccessTokenProvider>() { - @Override - public boolean removeEldestEntry( - Map.Entry<String, AccessTokenProvider> e) { - return size() > MAX_PROVIDERS; - } - }; - - private AccessTokenProvider instance = null; - - /** - * Create handle for cached instance. - */ - public CachedRefreshTokenBasedAccessTokenProvider() { - } - - /** - * Gets the access token from internally cached - * ConfRefreshTokenBasedAccessTokenProvider instance. - * - * @return Valid OAuth2 access token for the user. - * @throws IOException when system error, internal server error or user error - */ - @Override - public synchronized String getAccessToken() throws IOException { - return instance.getAccessToken(); - } - - /** - * @return A cached Configuration consistent with the parameters of this - * instance. - */ - @Override - public synchronized Configuration getConf() { - return instance.getConf(); - } - - /** - * Configure cached instance. Note that the Configuration instance returned - * from subsequent calls to {@link #getConf() getConf} may be from a - * previous, cached entry. 
- * @param conf Configuration instance - */ - @Override - public synchronized void setConf(Configuration conf) { - String id = conf.get(OAUTH_CLIENT_ID_KEY); - if (null == id) { - throw new IllegalArgumentException("Missing client ID"); - } - synchronized (CACHE) { - instance = CACHE.get(id); - if (null == instance - || conf.getBoolean(FORCE_REFRESH, false) - || replace(instance, conf)) { - instance = newInstance(); - // clone configuration - instance.setConf(new Configuration(conf)); - CACHE.put(id, instance); - LOG.debug("Created new client {}", id); - } - } - } - - AccessTokenProvider newInstance() { - return new ConfRefreshTokenBasedAccessTokenProvider(); - } - - private static boolean replace(AccessTokenProvider cached, Configuration c2) { - // ConfRefreshTokenBasedAccessTokenProvider::setConf asserts !null - final Configuration c1 = cached.getConf(); - for (String key : new String[] { - OAUTH_REFRESH_TOKEN_KEY, OAUTH_REFRESH_URL_KEY }) { - if (!c1.get(key).equals(c2.get(key))) { - // replace cached instance for this clientID - return true; - } - } - return false; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java index b444984..1613941 100644 --- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java @@ -20,4 +20,4 @@ /** * public interface to expose OAuth2 authentication related features. */ -package org.apache.hadoop.fs.adl.oauth2; +package org.apache.hadoop.fs.adl.oauth2; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java index 98e6a77..456eebc 100644 --- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java @@ -20,4 +20,4 @@ /** * Supporting classes for metrics instrumentation. */ -package org.apache.hadoop.fs.adl; +package org.apache.hadoop.fs.adl; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java deleted file mode 100644 index a7f932f..0000000 --- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.hadoop.hdfs.web; - -/** - * Constants. - */ -public final class ADLConfKeys { - public static final String - ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN = - "adl.feature.override.readahead.max.concurrent.connection"; - public static final int - ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN_DEFAULT = 2; - public static final String ADL_WEBSDK_VERSION_KEY = "ADLFeatureSet"; - static final String ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER = - "adl.debug.override.localuserasfileowner"; - static final boolean ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT = false; - static final String ADL_FEATURE_REDIRECT_OFF = - "adl.feature.override.redirection.off"; - static final boolean ADL_FEATURE_REDIRECT_OFF_DEFAULT = true; - static final String ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED = - "adl.feature.override.getblocklocation.locally.bundled"; - static final boolean ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED_DEFAULT - = true; - static final String ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD = - "adl.feature.override.readahead"; - static final boolean ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_DEFAULT = - true; - static final String ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE = - "adl.feature.override.readahead.max.buffersize"; - - static final int KB = 1024; - static final int MB = KB * KB; - static final int DEFAULT_BLOCK_SIZE = 4 * MB; - static final int DEFAULT_EXTENT_SIZE = 256 * MB; - static final int DEFAULT_TIMEOUT_IN_SECONDS = 120; - static final int - ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE_DEFAULT = - 8 * MB; - - private ADLConfKeys() { - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java deleted file mode 100644 index 350c6e7..0000000 --- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.hadoop.hdfs.web; - -/** - * Responsible for holding buffered data in the process. Hold only 1 and only - * 1 buffer block in the memory. Buffer block - * information is for the given file and the offset from the which the block - * is fetched. Across the webhdfs instances if - * same buffer block has been used then backend trip is avoided. Buffer block - * is certainly important since ADL fetches - * large amount of data (Default is 4MB however can be configured through - * core-site.xml) from the backend. - * Observation is in case of ORC/Avro kind of compressed file, buffer block - * does not avoid few backend calls across - * webhdfs - * instances. - */ -final class BufferManager { - private static final BufferManager BUFFER_MANAGER_INSTANCE = new - BufferManager(); - private static Object lock = new Object(); - private Buffer buffer = null; - private String fileName; - - /** - * Constructor. - */ - private BufferManager() { - } - - public static Object getLock() { - return lock; - } - - public static BufferManager getInstance() { - return BUFFER_MANAGER_INSTANCE; - } - - /** - * Validate if the current buffer block is of given stream. - * - * @param path ADL stream path - * @param offset Stream offset that caller is interested in - * @return True if the buffer block is available otherwise false - */ - boolean hasValidDataForOffset(String path, long offset) { - if (this.fileName == null) { - return false; - } - - if (!this.fileName.equals(path)) { - return false; - } - - if (buffer == null) { - return false; - } - - if ((offset < buffer.offset) || (offset >= (buffer.offset - + buffer.data.length))) { - return false; - } - - return true; - } - - /** - * Clean buffer block. - */ - void clear() { - buffer = null; - } - - /** - * Validate if the current buffer block is of given stream. For now partial - * data available is not supported. - * Data must be available exactly or within the range of offset and size - * passed as parameter. - * - * @param path Stream path - * @param offset Offset of the stream - * @param size Size of the data from the offset of the stream caller - * interested in - * @return True if the data is available from the given offset and of the - * size caller is interested in. - */ - boolean hasData(String path, long offset, int size) { - - if (!hasValidDataForOffset(path, offset)) { - return false; - } - - if ((size + offset) > (buffer.data.length + buffer.offset)) { - return false; - } - return true; - } - - /** - * Return the buffer block from the requested offset. It is caller - * responsibility to check if the buffer block is - * of there interest and offset is valid. - * - * @param data Byte array to be filed from the buffer block - * @param offset Data to be fetched from the offset. - */ - void get(byte[] data, long offset) { - System.arraycopy(buffer.data, (int) (offset - buffer.offset), data, 0, - data.length); - } - - /** - * Create new empty buffer block of the given size. - * - * @param len Size of the buffer block. - * @return Empty byte array. 
- */ - byte[] getEmpty(int len) { - return new byte[len]; - } - - /** - * This function allows caller to specify new buffer block for the stream - * which is pulled from the backend. - * - * @param data Buffer - * @param path Stream path to which buffer belongs to - * @param offset Stream offset where buffer start with - */ - void add(byte[] data, String path, long offset) { - if (data == null) { - return; - } - - buffer = new Buffer(); - buffer.data = data; - buffer.offset = offset; - this.fileName = path; - } - - /** - * @return Size of the buffer. - */ - int getBufferSize() { - return buffer.data.length; - } - - /** - * @return Stream offset where buffer start with - */ - long getBufferOffset() { - return buffer.offset; - } - - /** - * Buffer container. - */ - static class Buffer { - private byte[] data; - private long offset; - } -}