HADOOP-13037. Refactor Azure Data Lake Store as an independent FileSystem. 
Contributed by Vishwajeet Dusane


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5c61ad24
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5c61ad24
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5c61ad24

Branch: refs/heads/YARN-4752
Commit: 5c61ad24887f76dfc5a5935b2c5dceb6bfd99417
Parents: a9ad5d6
Author: Chris Douglas <cdoug...@apache.org>
Authored: Thu Nov 10 21:55:55 2016 -0800
Committer: Chris Douglas <cdoug...@apache.org>
Committed: Fri Nov 11 11:15:07 2016 -0800

----------------------------------------------------------------------
 .../src/main/resources/core-default.xml         |   48 +-
 .../dev-support/findbugs-exclude.xml            |   24 -
 hadoop-tools/hadoop-azure-datalake/pom.xml      |   47 +-
 .../main/java/org/apache/hadoop/fs/adl/Adl.java |    4 +
 .../org/apache/hadoop/fs/adl/AdlConfKeys.java   |   92 ++
 .../org/apache/hadoop/fs/adl/AdlFileSystem.java |  892 +++++++++++++-
 .../apache/hadoop/fs/adl/AdlFsInputStream.java  |  149 +++
 .../apache/hadoop/fs/adl/AdlFsOutputStream.java |   82 ++
 .../org/apache/hadoop/fs/adl/AdlPermission.java |   69 ++
 .../hadoop/fs/adl/SdkTokenProviderAdapter.java  |   41 +
 .../apache/hadoop/fs/adl/TokenProviderType.java |   25 +
 .../fs/adl/oauth2/AzureADTokenProvider.java     |   70 ++
 ...hedRefreshTokenBasedAccessTokenProvider.java |  135 ---
 .../hadoop/fs/adl/oauth2/package-info.java      |    2 +-
 .../org/apache/hadoop/fs/adl/package-info.java  |    2 +-
 .../org/apache/hadoop/hdfs/web/ADLConfKeys.java |   61 -
 .../apache/hadoop/hdfs/web/BufferManager.java   |  180 ---
 .../web/PrivateAzureDataLakeFileSystem.java     | 1116 ------------------
 ...ClientCredentialBasedAccesTokenProvider.java |  156 ---
 ...hedRefreshTokenBasedAccessTokenProvider.java |   37 -
 .../hadoop/hdfs/web/oauth2/package-info.java    |   24 -
 .../apache/hadoop/hdfs/web/package-info.java    |   25 -
 .../hadoop/hdfs/web/resources/ADLFlush.java     |   49 -
 .../hdfs/web/resources/ADLGetOpParam.java       |   96 --
 .../hdfs/web/resources/ADLPostOpParam.java      |   97 --
 .../hdfs/web/resources/ADLPutOpParam.java       |   94 --
 .../hdfs/web/resources/ADLVersionInfo.java      |   51 -
 .../web/resources/AppendADLNoRedirectParam.java |   45 -
 .../web/resources/CreateADLNoRedirectParam.java |   44 -
 .../hadoop/hdfs/web/resources/LeaseParam.java   |   53 -
 .../web/resources/ReadADLNoRedirectParam.java   |   44 -
 .../hadoop/hdfs/web/resources/package-info.java |   27 -
 .../META-INF/org.apache.hadoop.fs.FileSystem    |   16 +
 .../src/site/markdown/index.md                  |  126 +-
 .../apache/hadoop/fs/adl/AdlMockWebServer.java  |   99 ++
 .../apache/hadoop/fs/adl/TestACLFeatures.java   |  262 ++++
 .../hadoop/fs/adl/TestADLResponseData.java      |   67 +-
 .../org/apache/hadoop/fs/adl/TestAdlRead.java   |  196 +++
 .../hadoop/fs/adl/TestAzureADTokenProvider.java |  133 +++
 .../adl/TestConcurrentDataReadOperations.java   |  299 +++++
 .../hadoop/fs/adl/TestCustomTokenProvider.java  |  136 +++
 .../apache/hadoop/fs/adl/TestGetFileStatus.java |   33 +-
 .../apache/hadoop/fs/adl/TestListStatus.java    |   34 +-
 .../fs/adl/TestRelativePathFormation.java       |   61 +
 .../fs/adl/TestValidateConfiguration.java       |  103 ++
 .../hadoop/fs/adl/TestableAdlFileSystem.java    |    3 +-
 .../fs/adl/common/CustomMockTokenProvider.java  |   61 +
 .../hadoop/fs/adl/common/ExpectedResponse.java  |   71 ++
 .../hadoop/fs/adl/common/Parallelized.java      |   60 +
 .../hadoop/fs/adl/common/TestDataForRead.java   |  122 ++
 .../fs/adl/live/AdlStorageConfiguration.java    |   50 +-
 .../hadoop/fs/adl/live/AdlStorageContract.java  |   19 +-
 .../live/TestAdlDifferentSizeWritesLive.java    |    2 +-
 .../adl/live/TestAdlFileSystemContractLive.java |    2 +-
 .../hadoop/fs/adl/live/TestAdlReadLive.java     |  342 ------
 ...estAdlWebHdfsFileContextCreateMkdirLive.java |   79 --
 ...AdlWebHdfsFileContextMainOperationsLive.java |  104 --
 ...hedRefreshTokenBasedAccessTokenProvider.java |  149 ---
 .../hadoop/fs/common/AdlMockWebServer.java      |  116 --
 .../hadoop/fs/common/ExpectedResponse.java      |   72 --
 .../hadoop/fs/common/TestDataForRead.java       |  120 --
 .../org/apache/hadoop/hdfs/web/TestAdlRead.java |  205 ----
 .../web/TestConcurrentDataReadOperations.java   |  306 -----
 .../hdfs/web/TestConfigurationSetting.java      |  138 ---
 .../hdfs/web/TestSplitSizeCalculation.java      |  123 --
 .../src/test/resources/adls.xml                 |   11 +-
 .../test/resources/contract-test-options.xml    |   62 +-
 .../src/test/resources/log4j.properties         |   30 +
 68 files changed, 3277 insertions(+), 4416 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 327acfa..05004c1 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -2382,53 +2382,6 @@
 
 
   <!-- Azure Data Lake File System Configurations -->
-
-  <property>
-    <name>adl.feature.override.readahead</name>
-    <value>true</value>
-    <description>
-      Enables read aheads in the ADL client, the feature is used to
-      improve read throughput.
-      This works in conjunction with the value set in
-      adl.feature.override.readahead.max.buffersize.
-      When set to false the read ahead feature is turned off.
-      Default : True if not configured.
-    </description>
-  </property>
-
-  <property>
-    <name>adl.feature.override.readahead.max.buffersize</name>
-    <value>8388608</value>
-    <description>
-      Define maximum buffer size to cache read ahead data, this is
-      allocated per process to
-      cache read ahead data. Applicable only when
-      adl.feature.override.readahead is set to true.
-      Default : 8388608 Byte i.e. 8MB if not configured.
-    </description>
-  </property>
-
-  <property>
-    <name>adl.feature.override.readahead.max.concurrent.connection</name>
-    <value>2</value>
-    <description>
-      Define maximum concurrent connection can be established to
-      read ahead. If the data size is less than 4MB then only 1 read n/w
-      connection
-      is set. If the data size is less than 4MB but less than 8MB then 2 read
-      n/w connection
-      is set. Data greater than 8MB then value set under the property would
-      take
-      effect. Applicable only when adl.feature.override.readahead is set
-      to true and buffer size is greater than 8MB.
-      It is recommended to reset this property if the
-      adl.feature.override.readahead.max.buffersize
-      is less than 8MB to gain performance. Application has to consider
-      throttling limit for the account as well before configuring large
-      buffer size.
-    </description>
-  </property>
-
   <property>
     <name>fs.adl.impl</name>
     <value>org.apache.hadoop.fs.adl.AdlFileSystem</value>
@@ -2438,6 +2391,7 @@
     <name>fs.AbstractFileSystem.adl.impl</name>
     <value>org.apache.hadoop.fs.adl.Adl</value>
   </property>
+  <!-- Azure Data Lake File System Configurations Ends Here-->
 
   <property>
     <name>hadoop.caller.context.enabled</name>
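
Note: a minimal client-side sketch (not part of this patch) of how the two
bindings above are picked up. The account name, the credential values, and
the choice of the ClientCredential provider type are placeholders:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class AdlQuickStart {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // adl:// is already mapped to AdlFileSystem by core-default.xml,
        // so only the OAuth2 credentials need to be supplied.
        conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential");
        conf.set("dfs.adls.oauth2.refresh.url", "<token-endpoint>"); // placeholder
        conf.set("dfs.adls.oauth2.client.id", "<client-id>");        // placeholder
        conf.set("dfs.adls.oauth2.credential", "<client-secret>");   // placeholder
        FileSystem fs = FileSystem.get(
            URI.create("adl://myaccount.azuredatalakestore.net"), conf);
        for (FileStatus st : fs.listStatus(new Path("/"))) {
          System.out.println(st.getPath());
        }
      }
    }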

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml
deleted file mode 100644
index 4fd36ef..0000000
--- a/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml
+++ /dev/null
@@ -1,24 +0,0 @@
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-<FindBugsFilter>
-    <!-- Buffer object is accessed withing trusted code and intentionally assigned instead of array copy -->
-    <Match>
-        <Class name="org.apache.hadoop.hdfs.web.PrivateAzureDataLakeFileSystem$BatchAppendOutputStream$CommitTask"/>
-        <Bug pattern="EI_EXPOSE_REP2"/>
-        <Priority value="2"/>
-    </Match>
-</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/pom.xml b/hadoop-tools/hadoop-azure-datalake/pom.xml
index 9a15b04..eba0979 100644
--- a/hadoop-tools/hadoop-azure-datalake/pom.xml
+++ b/hadoop-tools/hadoop-azure-datalake/pom.xml
@@ -35,22 +35,17 @@
     <file.encoding>UTF-8</file.encoding>
     <downloadSources>true</downloadSources>
   </properties>
-
+  <repositories>
+    <repository>
+      <id>snapshots-repo</id>
+      <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+      <releases><enabled>false</enabled></releases>
+      <snapshots><enabled>true</enabled></snapshots>
+    </repository>
+  </repositories>
   <build>
     <plugins>
       <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>findbugs-maven-plugin</artifactId>
-        <configuration>
-          <findbugsXmlOutput>true</findbugsXmlOutput>
-          <xmlOutput>true</xmlOutput>
-          <excludeFilterFile>
-            ${basedir}/dev-support/findbugs-exclude.xml
-          </excludeFilterFile>
-          <effort>Max</effort>
-        </configuration>
-      </plugin>
-      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-project-info-reports-plugin</artifactId>
 
@@ -130,20 +125,13 @@
   </build>
 
   <dependencies>
+    <!-- SDK Dependency -->
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
-      <exclusions>
-        <exclusion>
-          <artifactId>javax.servlet-api</artifactId>
-          <groupId>javax.servlet</groupId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdfs-client</artifactId>
+      <groupId>com.microsoft.azure</groupId>
+      <artifactId>azure-data-lake-store-sdk</artifactId>
+      <version>2.0.4-SNAPSHOT</version>
     </dependency>
+    <!--  ENDS HERE-->
     <dependency>
     <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
@@ -159,11 +147,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>com.eclipsesource.minimal-json</groupId>
       <artifactId>minimal-json</artifactId>
       <version>0.9.1</version>
@@ -181,9 +164,5 @@
       <version>2.4.0</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java
index 4642d6b..7ec04cf 100644
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.fs.adl;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DelegateToFileSystem;
 
@@ -29,6 +31,8 @@ import java.net.URISyntaxException;
 /**
  * Expose adl:// scheme to access ADL file system.
  */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
 public class Adl extends DelegateToFileSystem {
 
   Adl(URI theUri, Configuration conf) throws IOException, URISyntaxException {
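
Note: Adl delegates to AdlFileSystem via DelegateToFileSystem, which is what
the fs.AbstractFileSystem.adl.impl binding above enables for the FileContext
API. A small sketch, assuming the same OAuth2 configuration as in the
FileSystem example earlier; the account name and path are placeholders:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class AdlFileContextExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // credentials set as above
        FileContext fc = FileContext.getFileContext(
            URI.create("adl://myaccount.azuredatalakestore.net"), conf);
        // Creates the directory with default permissions, including parents.
        fc.mkdir(new Path("/tmp/example"), FsPermission.getDirDefault(), true);
      }
    }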

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlConfKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlConfKeys.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlConfKeys.java
new file mode 100644
index 0000000..21120df
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlConfKeys.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.adl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Constants.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class AdlConfKeys {
+  // OAuth2 Common Configuration
+  public static final String AZURE_AD_REFRESH_URL_KEY = "dfs.adls.oauth2"
+      + ".refresh.url";
+
+  // Optional when the provider type is RefreshToken or ClientCredential.
+  public static final String AZURE_AD_TOKEN_PROVIDER_CLASS_KEY =
+      "dfs.adls.oauth2.access.token.provider";
+  public static final String AZURE_AD_CLIENT_ID_KEY =
+      "dfs.adls.oauth2.client.id";
+  public static final String AZURE_AD_TOKEN_PROVIDER_TYPE_KEY =
+      "dfs.adls.oauth2.access.token.provider.type";
+
+  // OAuth Refresh Token Configuration
+  public static final String AZURE_AD_REFRESH_TOKEN_KEY =
+      "dfs.adls.oauth2.refresh.token";
+
+  public static final String TOKEN_PROVIDER_TYPE_REFRESH_TOKEN = "RefreshToken";
+  // OAuth Client Cred Token Configuration
+  public static final String AZURE_AD_CLIENT_SECRET_KEY =
+      "dfs.adls.oauth2.credential";
+  public static final String TOKEN_PROVIDER_TYPE_CLIENT_CRED =
+      "ClientCredential";
+
+  public static final String READ_AHEAD_BUFFER_SIZE_KEY =
+      "adl.feature.client.cache.readahead";
+
+  public static final String WRITE_BUFFER_SIZE_KEY =
+      "adl.feature.client.cache.drop.behind.writes";
+  static final String SECURE_TRANSPORT_SCHEME = "https";
+  static final String INSECURE_TRANSPORT_SCHEME = "http";
+  static final String ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER =
+      "adl.debug.override.localuserasfileowner";
+
+  static final boolean ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT = false;
+  static final long ADL_BLOCK_SIZE = 256 * 1024 * 1024;
+  static final int ADL_REPLICATION_FACTOR = 1;
+  static final String ADL_HADOOP_CLIENT_NAME = "hadoop-azure-datalake-";
+  static final String ADL_HADOOP_CLIENT_VERSION =
+      "2.0.0-SNAPSHOT";
+  static final String ADL_EVENTS_TRACKING_SOURCE = "adl.events.tracking.source";
+  static final String ADL_EVENTS_TRACKING_CLUSTERNAME =
+      "adl.events.tracking.clustername";
+
+  static final String ADL_EVENTS_TRACKING_CLUSTERTYPE =
+      "adl.events.tracking.clustertype";
+  static final int DEFAULT_READ_AHEAD_BUFFER_SIZE = 4 * 1024 * 1024;
+  static final int DEFAULT_WRITE_AHEAD_BUFFER_SIZE = 4 * 1024 * 1024;
+
+  static final String LATENCY_TRACKER_KEY =
+      "adl.dfs.enable.client.latency.tracker";
+  static final boolean LATENCY_TRACKER_DEFAULT = true;
+
+  static final String ADL_EXPERIMENT_POSITIONAL_READ_KEY =
+      "adl.feature.experiment.positional.read.enable";
+  static final boolean ADL_EXPERIMENT_POSITIONAL_READ_DEFAULT = true;
+
+  static final String ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION =
+      "adl.feature.support.acl.bit";
+  static final boolean ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION_DEFAULT = true;
+
+  private AdlConfKeys() {
+  }
+}
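
Note: a sketch of how these keys map onto the two built-in token provider
types; all credential values are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import static org.apache.hadoop.fs.adl.AdlConfKeys.*;

    public class AdlAuthConfigExample {
      public static Configuration refreshTokenConf() {
        Configuration conf = new Configuration();
        // RefreshToken flow: client id plus a previously obtained refresh token.
        conf.set(AZURE_AD_TOKEN_PROVIDER_TYPE_KEY, TOKEN_PROVIDER_TYPE_REFRESH_TOKEN);
        conf.set(AZURE_AD_CLIENT_ID_KEY, "<client-id>");         // placeholder
        conf.set(AZURE_AD_REFRESH_TOKEN_KEY, "<refresh-token>"); // placeholder
        return conf;
      }

      public static Configuration clientCredentialConf() {
        Configuration conf = new Configuration();
        // ClientCredential flow: token endpoint, client id and client secret.
        conf.set(AZURE_AD_TOKEN_PROVIDER_TYPE_KEY, TOKEN_PROVIDER_TYPE_CLIENT_CRED);
        conf.set(AZURE_AD_REFRESH_URL_KEY, "<token-endpoint>");  // placeholder
        conf.set(AZURE_AD_CLIENT_ID_KEY, "<client-id>");         // placeholder
        conf.set(AZURE_AD_CLIENT_SECRET_KEY, "<client-secret>"); // placeholder
        return conf;
      }
    }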

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java
index 11e1e0b..9083afc 100644
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java
@@ -19,23 +19,905 @@
 
 package org.apache.hadoop.fs.adl;
 
-import org.apache.hadoop.hdfs.web.PrivateAzureDataLakeFileSystem;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.microsoft.azure.datalake.store.ADLStoreClient;
+import com.microsoft.azure.datalake.store.ADLStoreOptions;
+import com.microsoft.azure.datalake.store.DirectoryEntry;
+import com.microsoft.azure.datalake.store.DirectoryEntryType;
+import com.microsoft.azure.datalake.store.IfExists;
+import com.microsoft.azure.datalake.store.LatencyTracker;
+import com.microsoft.azure.datalake.store.oauth2.AccessTokenProvider;
+import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider;
+import com.microsoft.azure.datalake.store.oauth2.RefreshTokenBasedTokenProvider;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.ContentSummary.Builder;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.adl.oauth2.AzureADTokenProvider;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.VersionInfo;
+import static org.apache.hadoop.fs.adl.AdlConfKeys.*;
 
 /**
- * Expose adl:// scheme to access ADL file system.
+ * A FileSystem to access Azure Data Lake Store.
  */
-public class AdlFileSystem extends PrivateAzureDataLakeFileSystem {
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class AdlFileSystem extends FileSystem {
+  static final String SCHEME = "adl";
+  static final int DEFAULT_PORT = 443;
+  private URI uri;
+  private String userName;
+  private boolean overrideOwner;
+  private ADLStoreClient adlClient;
+  private Path workingDirectory;
+  private boolean aclBitStatus;
 
-  public static final String SCHEME = "adl";
-  public static final int DEFAULT_PORT = 443;
+  // retained for tests
+  private AccessTokenProvider tokenProvider;
+  private AzureADTokenProvider azureTokenProvider;
 
   @Override
   public String getScheme() {
     return SCHEME;
   }
 
+  public URI getUri() {
+    return uri;
+  }
+
   @Override
   public int getDefaultPort() {
     return DEFAULT_PORT;
   }
+
+  @Override
+  public boolean supportsSymlinks() {
+    return false;
+  }
+
+  /**
+   * Called after a new FileSystem instance is constructed.
+   *
+   * @param storeUri a uri whose authority section names the host, port, etc.
+   *                 for this FileSystem
+   * @param conf     the configuration
+   */
+  @Override
+  public void initialize(URI storeUri, Configuration conf) throws IOException {
+    super.initialize(storeUri, conf);
+    this.setConf(conf);
+    this.uri = URI
+        .create(storeUri.getScheme() + "://" + storeUri.getAuthority());
+
+    try {
+      userName = UserGroupInformation.getCurrentUser().getShortUserName();
+    } catch (IOException e) {
+      userName = "hadoop";
+    }
+
+    this.setWorkingDirectory(getHomeDirectory());
+
+    overrideOwner = getConf().getBoolean(ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER,
+        ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT);
+
+    aclBitStatus = conf.getBoolean(ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION,
+        ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION_DEFAULT);
+
+    String accountFQDN = null;
+    String mountPoint = null;
+    String hostname = storeUri.getHost();
+    if (!hostname.contains(".") && !hostname.equalsIgnoreCase(
+        "localhost")) {  // this is a symbolic name. Resolve it.
+      String hostNameProperty = "dfs.adls." + hostname + ".hostname";
+      String mountPointProperty = "dfs.adls." + hostname + ".mountpoint";
+      accountFQDN = getNonEmptyVal(conf, hostNameProperty);
+      mountPoint = getNonEmptyVal(conf, mountPointProperty);
+    } else {
+      accountFQDN = hostname;
+    }
+
+    if (storeUri.getPort() > 0) {
+      accountFQDN = accountFQDN + ":" + storeUri.getPort();
+    }
+
+    adlClient = ADLStoreClient
+        .createClient(accountFQDN, getAccessTokenProvider(conf));
+
+    ADLStoreOptions options = new ADLStoreOptions();
+    options.enableThrowingRemoteExceptions();
+
+    if (getTransportScheme().equalsIgnoreCase(INSECURE_TRANSPORT_SCHEME)) {
+      options.setInsecureTransport();
+    }
+
+    if (mountPoint != null) {
+      options.setFilePathPrefix(mountPoint);
+    }
+
+    String clusterName = conf.get(ADL_EVENTS_TRACKING_CLUSTERNAME, "UNKNOWN");
+    String clusterType = conf.get(ADL_EVENTS_TRACKING_CLUSTERTYPE, "UNKNOWN");
+
+    String clientVersion = ADL_HADOOP_CLIENT_NAME + (StringUtils
+        .isEmpty(VersionInfo.getVersion().trim()) ?
+        ADL_HADOOP_CLIENT_VERSION.trim() :
+        VersionInfo.getVersion().trim());
+    options.setUserAgentSuffix(clientVersion + "/" +
+        VersionInfo.getVersion().trim() + "/" + clusterName + "/"
+        + clusterType);
+
+    adlClient.setOptions(options);
+
+    boolean trackLatency = conf
+        .getBoolean(LATENCY_TRACKER_KEY, LATENCY_TRACKER_DEFAULT);
+    if (!trackLatency) {
+      LatencyTracker.disable();
+    }
+  }
+
+  /**
+   * This method is provided as a convenience so that derived classes can
+   * supply a custom {@link AzureADTokenProvider} instance.
+   *
+   * Simply loading an {@link AzureADTokenProvider} is not sufficient to
+   * ensure a secure Hadoop infrastructure and the user context for which
+   * the respective {@link AdlFileSystem} instance is initialized.
+   *
+   * The loading order is to first invoke
+   * {@link #getCustomAccessTokenProvider(Configuration)}. If that method
+   * returns null, meaning the derived class provides no implementation,
+   * the token configuration is then read from the configuration object as
+   * specified in the documentation.
+   *
+   * Custom token management takes precedence during initialization.
+   *
+   * @param conf Configuration object
+   * @return null if no custom {@link AzureADTokenProvider} token management
+   * is specified.
+   * @throws IOException if the token provider fails to initialize.
+   */
+  protected synchronized AzureADTokenProvider getCustomAccessTokenProvider(
+      Configuration conf) throws IOException {
+    String className = getNonEmptyVal(conf, AZURE_AD_TOKEN_PROVIDER_CLASS_KEY);
+
+    Class<? extends AzureADTokenProvider> azureADTokenProviderClass =
+        conf.getClass(AZURE_AD_TOKEN_PROVIDER_CLASS_KEY, null,
+            AzureADTokenProvider.class);
+    if (azureADTokenProviderClass == null) {
+      throw new IllegalArgumentException(
+          "Configuration  " + className + " " + "not defined/accessible.");
+    }
+
+    azureTokenProvider = ReflectionUtils
+        .newInstance(azureADTokenProviderClass, conf);
+    if (azureTokenProvider == null) {
+      throw new IllegalArgumentException("Failed to initialize " + className);
+    }
+
+    azureTokenProvider.initialize(conf);
+    return azureTokenProvider;
+  }
+
+  private AccessTokenProvider getAccessTokenProvider(Configuration conf)
+      throws IOException {
+    TokenProviderType type = conf.getEnum(
+        AdlConfKeys.AZURE_AD_TOKEN_PROVIDER_TYPE_KEY, TokenProviderType.Custom);
+
+    switch (type) {
+    case RefreshToken:
+      tokenProvider = getConfRefreshTokenBasedTokenProvider(conf);
+      break;
+    case ClientCredential:
+      tokenProvider = getConfCredentialBasedTokenProvider(conf);
+      break;
+    case Custom:
+    default:
+      AzureADTokenProvider azureADTokenProvider = getCustomAccessTokenProvider(
+          conf);
+      tokenProvider = new SdkTokenProviderAdapter(azureADTokenProvider);
+      break;
+    }
+
+    return tokenProvider;
+  }
+
+  private AccessTokenProvider getConfCredentialBasedTokenProvider(
+      Configuration conf) {
+    String clientId = getNonEmptyVal(conf, AZURE_AD_CLIENT_ID_KEY);
+    String refreshUrl = getNonEmptyVal(conf, AZURE_AD_REFRESH_URL_KEY);
+    String clientSecret = getNonEmptyVal(conf, AZURE_AD_CLIENT_SECRET_KEY);
+    return new ClientCredsTokenProvider(refreshUrl, clientId, clientSecret);
+  }
+
+  private AccessTokenProvider getConfRefreshTokenBasedTokenProvider(
+      Configuration conf) {
+    String clientId = getNonEmptyVal(conf, AZURE_AD_CLIENT_ID_KEY);
+    String refreshToken = getNonEmptyVal(conf, AZURE_AD_REFRESH_TOKEN_KEY);
+    return new RefreshTokenBasedTokenProvider(clientId, refreshToken);
+  }
+
+  @VisibleForTesting
+  AccessTokenProvider getTokenProvider() {
+    return tokenProvider;
+  }
+
+  @VisibleForTesting
+  AzureADTokenProvider getAzureTokenProvider() {
+    return azureTokenProvider;
+  }
+
+  /**
+   * Constructing the home directory locally is fine as long as the
+   * relationship between the Hadoop local user name and the ADL user name
+   * is not yet fully defined.
+   *
+   * @return Hadoop local user home directory.
+   */
+  @Override
+  public Path getHomeDirectory() {
+    return makeQualified(new Path("/user/" + userName));
+  }
+
+  /**
+   * The create call is handled differently for ADL: create semantics are
+   * translated to create/append semantics.
+   * 1. No dedicated connection to the server is held open.
+   * 2. Buffering is done locally; once the buffer is full or the caller
+   * invokes flush, all pending data is pushed to ADL as an APPEND operation.
+   * 3. On close, an additional call is sent to the server to close the
+   * stream and release the lock held on the stream.
+   *
+   * Create/append semantics are necessary because
+   * 1. The ADL backend server does not allow connections to stay idle for
+   * long. With slow writers, connection timeouts and connection resets were
+   * observed, causing occasional job failures.
+   * 2. Slow-writer jobs get a performance boost by avoiding network latency.
+   * 3. ADL performs equally well when appends are issued in multiples of
+   * 4 MB chunks.
+   *
+   * @param f           File path
+   * @param permission  Access permission for the newly created file
+   * @param overwrite   Remove the existing file and recreate it if true,
+   *                    otherwise throw an error if the file exists
+   * @param bufferSize  Buffer size, not honoured by the ADL backend
+   * @param replication Replication count, not honoured by the ADL backend
+   * @param blockSize   Block size, not honoured by the ADL backend
+   * @param progress    Progress indicator
+   * @return FSDataOutputStream OutputStream to which the application can
+   * push a stream of bytes
+   * @throws IOException on a system error, internal server error or user error
+   */
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission,
+      boolean overwrite, int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException {
+    statistics.incrementWriteOps(1);
+    IfExists overwriteRule = overwrite ? IfExists.OVERWRITE : IfExists.FAIL;
+    return new FSDataOutputStream(new AdlFsOutputStream(adlClient
+        .createFile(toRelativeFilePath(f), overwriteRule,
+            Integer.toOctalString(applyUMask(permission).toShort()), true),
+        getConf()), this.statistics);
+  }
+
+  /**
+   * Opens an FSDataOutputStream at the indicated Path with write-progress
+   * reporting. Same as create(), except fails if parent directory doesn't
+   * already exist.
+   *
+   * @param f           the file name to open
+   * @param permission  Access permission for the newly created file
+   * @param flags       {@link CreateFlag}s to use for this stream.
+   * @param bufferSize  the size of the buffer to be used; not honoured by
+   *                    the ADL backend
+   * @param replication required block replication for the file; not
+   *                    honoured by the ADL backend
+   * @param blockSize   Block size, not honoured by the ADL backend
+   * @param progress    Progress indicator
+   * @throws IOException on a system error, internal server error or user error
+   * @see #setPermission(Path, FsPermission)
+   * @deprecated API only for 0.20-append
+   */
+  @Deprecated
+  @Override
+  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+      EnumSet<CreateFlag> flags, int bufferSize, short replication,
+      long blockSize, Progressable progress) throws IOException {
+    statistics.incrementWriteOps(1);
+    IfExists overwriteRule = IfExists.FAIL;
+    for (CreateFlag flag : flags) {
+      if (flag == CreateFlag.OVERWRITE) {
+        overwriteRule = IfExists.OVERWRITE;
+        break;
+      }
+    }
+
+    return new FSDataOutputStream(new AdlFsOutputStream(adlClient
+        .createFile(toRelativeFilePath(f), overwriteRule,
+            Integer.toOctalString(applyUMask(permission).toShort()), false),
+        getConf()), this.statistics);
+  }
+
+  /**
+   * Append to an existing file (optional operation).
+   *
+   * @param f          the existing file to be appended.
+   * @param bufferSize the size of the buffer to be used; not honoured by
+   *                   the ADL backend
+   * @param progress   Progress indicator
+   * @throws IOException on a system error, internal server error or user error
+   */
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize,
+      Progressable progress) throws IOException {
+    statistics.incrementWriteOps(1);
+    return new FSDataOutputStream(
+        new AdlFsOutputStream(adlClient.getAppendStream(toRelativeFilePath(f)),
+            getConf()), this.statistics);
+  }
+
+  /**
+   * Azure Data Lake does not support user-configurable data replication,
+   * so there is no reason to query the store for it.
+   *
+   * Stub implementation.
+   *
+   * @param p           Not honoured
+   * @param replication Not honoured
+   * @return true, hard coded, since the ADL file system does not support
+   * replication configuration
+   * @throws IOException never thrown in this case; declared only to align
+   *                     with the parent API definition.
+   */
+  @Override
+  public boolean setReplication(final Path p, final short replication)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    return true;
+  }
+
+  /**
+   * The open call is handled differently for ADL: instead of returning a
+   * raw network stream to the user, an overridden FSInputStream is
+   * returned.
+   *
+   * @param f          File path
+   * @param buffersize Buffer size, not honoured
+   * @return FSDataInputStream InputStream from which the application can
+   * read a stream of bytes
+   * @throws IOException on a system error, internal server error or user error
+   */
+  @Override
+  public FSDataInputStream open(final Path f, final int buffersize)
+      throws IOException {
+    statistics.incrementReadOps(1);
+    return new FSDataInputStream(
+        new AdlFsInputStream(adlClient.getReadStream(toRelativeFilePath(f)),
+            statistics, getConf()));
+  }
+
+  /**
+   * Return a file status object that represents the path.
+   *
+   * @param f The path we want information from
+   * @return a FileStatus object
+   * @throws IOException when the path does not exist or on any other
+   *                     error; see the specific implementation
+   */
+  @Override
+  public FileStatus getFileStatus(final Path f) throws IOException {
+    statistics.incrementReadOps(1);
+    DirectoryEntry entry = adlClient.getDirectoryEntry(toRelativeFilePath(f));
+    return toFileStatus(entry, f);
+  }
+
+  /**
+   * List the statuses of the files/directories in the given path if the
+   * path is a directory.
+   *
+   * @param f given path
+   * @return the statuses of the files/directories in the given path
+   * @throws IOException when the path does not exist or on any other
+   *                     error; see the specific implementation
+   */
+  @Override
+  public FileStatus[] listStatus(final Path f) throws IOException {
+    statistics.incrementReadOps(1);
+    List<DirectoryEntry> entries =
+        adlClient.enumerateDirectory(toRelativeFilePath(f));
+    return toFileStatuses(entries, f);
+  }
+
+  /**
+   * Renames Path src to Path dst.  Can take place on local fs
+   * or remote DFS.
+   *
+   * ADLS supports the POSIX standard for the rename operation.
+   *
+   * @param src path to be renamed
+   * @param dst new path after rename
+   * @return true if rename is successful
+   * @throws IOException on failure
+   */
+  @Override
+  public boolean rename(final Path src, final Path dst) throws IOException {
+    statistics.incrementWriteOps(1);
+    return adlClient.rename(toRelativeFilePath(src), toRelativeFilePath(dst));
+  }
+
+  @Override
+  @Deprecated
+  public void rename(final Path src, final Path dst,
+      final Options.Rename... options) throws IOException {
+    statistics.incrementWriteOps(1);
+    boolean overwrite = false;
+    for (Rename renameOption : options) {
+      if (renameOption == Rename.OVERWRITE) {
+        overwrite = true;
+        break;
+      }
+    }
+    adlClient
+        .rename(toRelativeFilePath(src), toRelativeFilePath(dst), overwrite);
+  }
+
+  /**
+   * Concat existing files together.
+   *
+   * @param trg  the path to the target destination.
+   * @param srcs the paths to the sources to use for the concatenation.
+   * @throws IOException on a system error, internal server error or user error
+   */
+  @Override
+  public void concat(final Path trg, final Path[] srcs) throws IOException {
+    statistics.incrementWriteOps(1);
+    List<String> sourcesList = new ArrayList<String>();
+    for (Path entry : srcs) {
+      sourcesList.add(toRelativeFilePath(entry));
+    }
+    adlClient.concatenateFiles(toRelativeFilePath(trg), sourcesList);
+  }
+
+  /**
+   * Delete a file.
+   *
+   * @param path      the path to delete.
+   * @param recursive if path is a directory and set to
+   *                  true, the directory is deleted else throws an exception.
+   *                  In case of a file the recursive can be set to either
+   *                  true or false.
+   * @return true if delete is successful else false.
+   * @throws IOException on a system error, internal server error or user error
+   */
+  @Override
+  public boolean delete(final Path path, final boolean recursive)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    return recursive ?
+        adlClient.deleteRecursive(toRelativeFilePath(path)) :
+        adlClient.delete(toRelativeFilePath(path));
+  }
+
+  /**
+   * Make the given file and all non-existent parents into
+   * directories. Has the semantics of Unix 'mkdir -p'.
+   * Existence of the directory hierarchy is not an error.
+   *
+   * @param path       path to create
+   * @param permission to apply to path
+   */
+  @Override
+  public boolean mkdirs(final Path path, final FsPermission permission)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    return adlClient.createDirectory(toRelativeFilePath(path),
+        Integer.toOctalString(applyUMask(permission).toShort()));
+  }
+
+  private FileStatus[] toFileStatuses(final List<DirectoryEntry> entries,
+      final Path parent) {
+    FileStatus[] fileStatuses = new FileStatus[entries.size()];
+    int index = 0;
+    for (DirectoryEntry entry : entries) {
+      FileStatus status = toFileStatus(entry, parent);
+      if (!(entry.name == null || entry.name.isEmpty())) {
+        status.setPath(
+            new Path(parent.makeQualified(uri, workingDirectory), entry.name));
+      }
+
+      fileStatuses[index++] = status;
+    }
+
+    return fileStatuses;
+  }
+
+  private FsPermission applyUMask(FsPermission permission) {
+    if (permission == null) {
+      permission = FsPermission.getDefault();
+    }
+    return permission.applyUMask(FsPermission.getUMask(getConf()));
+  }
+
+  private FileStatus toFileStatus(final DirectoryEntry entry, final Path f) {
+    boolean isDirectory = entry.type == DirectoryEntryType.DIRECTORY;
+    long lastModificationData = entry.lastModifiedTime.getTime();
+    long lastAccessTime = entry.lastAccessTime.getTime();
+    FsPermission permission = new AdlPermission(aclBitStatus,
+        Short.valueOf(entry.permission, 8));
+    String user = entry.user;
+    String group = entry.group;
+
+    FileStatus status;
+    if (overrideOwner) {
+      status = new FileStatus(entry.length, isDirectory, ADL_REPLICATION_FACTOR,
+          ADL_BLOCK_SIZE, lastModificationData, lastAccessTime, permission,
+          userName, "hdfs", this.makeQualified(f));
+    } else {
+      status = new FileStatus(entry.length, isDirectory, ADL_REPLICATION_FACTOR,
+          ADL_BLOCK_SIZE, lastModificationData, lastAccessTime, permission,
+          user, group, this.makeQualified(f));
+    }
+
+    return status;
+  }
+
+  /**
+   * Set owner of a path (i.e. a file or a directory).
+   * The parameters owner and group cannot both be null.
+   *
+   * @param path  The path
+   * @param owner If it is null, the original username remains unchanged.
+   * @param group If it is null, the original groupname remains unchanged.
+   */
+  @Override
+  public void setOwner(final Path path, final String owner, final String group)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    adlClient.setOwner(toRelativeFilePath(path), owner, group);
+  }
+
+  /**
+   * Set permission of a path.
+   *
+   * @param path       The path
+   * @param permission Access permission
+   */
+  @Override
+  public void setPermission(final Path path, final FsPermission permission)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    adlClient.setPermission(toRelativeFilePath(path),
+        Integer.toOctalString(permission.toShort()));
+  }
+
+  /**
+   * Modifies ACL entries of files and directories.  This method can add new ACL
+   * entries or modify the permissions on existing ACL entries.  All existing
+   * ACL entries that are not specified in this call are retained without
+   * changes.  (Modifications are merged into the current ACL.)
+   *
+   * @param path    Path to modify
+   * @param aclSpec List of AclEntry describing modifications
+   * @throws IOException if an ACL could not be modified
+   */
+  @Override
+  public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new
+        ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>();
+    for (AclEntry aclEntry : aclSpec) {
+      msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry
+          .parseAclEntry(aclEntry.toString()));
+    }
+    adlClient.modifyAclEntries(toRelativeFilePath(path), msAclEntries);
+  }
+
+  /**
+   * Removes ACL entries from files and directories.  Other ACL entries are
+   * retained.
+   *
+   * @param path    Path to modify
+   * @param aclSpec List of AclEntry describing entries to remove
+   * @throws IOException if an ACL could not be modified
+   */
+  @Override
+  public void removeAclEntries(final Path path, final List<AclEntry> aclSpec)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new
+        ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>();
+    for (AclEntry aclEntry : aclSpec) {
+      msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry
+          .parseAclEntry(aclEntry.toString(), true));
+    }
+    adlClient.removeAclEntries(toRelativeFilePath(path), msAclEntries);
+  }
+
+  /**
+   * Removes all default ACL entries from files and directories.
+   *
+   * @param path Path to modify
+   * @throws IOException if an ACL could not be modified
+   */
+  @Override
+  public void removeDefaultAcl(final Path path) throws IOException {
+    statistics.incrementWriteOps(1);
+    adlClient.removeDefaultAcls(toRelativeFilePath(path));
+  }
+
+  /**
+   * Removes all but the base ACL entries of files and directories.  The entries
+   * for user, group, and others are retained for compatibility with permission
+   * bits.
+   *
+   * @param path Path to modify
+   * @throws IOException if an ACL could not be removed
+   */
+  @Override
+  public void removeAcl(final Path path) throws IOException {
+    statistics.incrementWriteOps(1);
+    adlClient.removeAllAcls(toRelativeFilePath(path));
+  }
+
+  /**
+   * Fully replaces ACL of files and directories, discarding all existing
+   * entries.
+   *
+   * @param path    Path to modify
+   * @param aclSpec List of AclEntry describing modifications, must include
+   *                entries for user, group, and others for compatibility with
+   *                permission bits.
+   * @throws IOException if an ACL could not be modified
+   */
+  @Override
+  public void setAcl(final Path path, final List<AclEntry> aclSpec)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new
+        ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>();
+    for (AclEntry aclEntry : aclSpec) {
+      msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry
+          .parseAclEntry(aclEntry.toString()));
+    }
+
+    adlClient.setAcl(toRelativeFilePath(path), msAclEntries);
+  }
+
+  /**
+   * Gets the ACL of a file or directory.
+   *
+   * @param path Path to get
+   * @return AclStatus describing the ACL of the file or directory
+   * @throws IOException if an ACL could not be read
+   */
+  @Override
+  public AclStatus getAclStatus(final Path path) throws IOException {
+    statistics.incrementReadOps(1);
+    com.microsoft.azure.datalake.store.acl.AclStatus adlStatus = adlClient
+        .getAclStatus(toRelativeFilePath(path));
+    AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
+    aclStatusBuilder.owner(adlStatus.owner);
+    aclStatusBuilder.group(adlStatus.group);
+    aclStatusBuilder.setPermission(
+        new FsPermission(Short.valueOf(adlStatus.octalPermissions, 8)));
+    aclStatusBuilder.stickyBit(adlStatus.stickyBit);
+    String aclListString = com.microsoft.azure.datalake.store.acl.AclEntry
+        .aclListToString(adlStatus.aclSpec);
+    List<AclEntry> aclEntries = AclEntry.parseAclSpec(aclListString, true);
+    aclStatusBuilder.addEntries(aclEntries);
+    return aclStatusBuilder.build();
+  }
+
+  /**
+   * Checks if the user can access a path.  The mode specifies which access
+   * checks to perform.  If the requested permissions are granted, then the
+   * method returns normally.  If access is denied, then the method throws an
+   * {@link AccessControlException}.
+   *
+   * @param path Path to check
+   * @param mode type of access to check
+   * @throws AccessControlException        if access is denied
+   * @throws java.io.FileNotFoundException if the path does not exist
+   * @throws IOException                   see specific implementation
+   */
+  @Override
+  public void access(final Path path, FsAction mode) throws IOException {
+    statistics.incrementReadOps(1);
+    if (!adlClient.checkAccess(toRelativeFilePath(path), mode.SYMBOL)) {
+      throw new AccessControlException("Access Denied : " + path.toString());
+    }
+  }
+
+  /**
+   * Return the {@link ContentSummary} of a given {@link Path}.
+   *
+   * @param f path to use
+   */
+  @Override
+  public ContentSummary getContentSummary(Path f) throws IOException {
+    statistics.incrementReadOps(1);
+    com.microsoft.azure.datalake.store.ContentSummary msSummary = adlClient
+        .getContentSummary(toRelativeFilePath(f));
+    return new Builder().length(msSummary.length)
+        .directoryCount(msSummary.directoryCount).fileCount(msSummary.fileCount)
+        .spaceConsumed(msSummary.spaceConsumed).build();
+  }
+
+  @VisibleForTesting
+  protected String getTransportScheme() {
+    return SECURE_TRANSPORT_SCHEME;
+  }
+
+  @VisibleForTesting
+  String toRelativeFilePath(Path path) {
+    return path.makeQualified(uri, workingDirectory).toUri().getPath();
+  }
+
+  /**
+   * Get the current working directory for the given file system.
+   *
+   * @return the directory pathname
+   */
+  @Override
+  public Path getWorkingDirectory() {
+    return workingDirectory;
+  }
+
+  /**
+   * Set the current working directory for the given file system. All relative
+   * paths will be resolved relative to it.
+   *
+   * @param dir Working directory path.
+   */
+  @Override
+  public void setWorkingDirectory(final Path dir) {
+    if (dir == null) {
+      throw new InvalidPathException("Working directory cannot be set to NULL");
+    }
+
+    /**
+     * Do not validate the scheme and URI of the passed parameter. When ADLS
+     * runs as an additional file system, the working directory that is set
+     * has the default file system's scheme and URI.
+     *
+     * A problem was found during Pig execution in
+     * https://github.com/apache/pig/blob/branch-0
+     * .15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer
+     * /PigInputFormat.java#L235
+     * A similar problem could be present in other applications, so the
+     * working directory is built using the relative path only.
+     */
+    this.workingDirectory = this.makeAbsolute(dir);
+  }
+
+  /**
+   * Return the number of bytes that large input files should optimally
+   * be split into to minimize I/O time.
+   *
+   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
+   */
+  @Deprecated
+  public long getDefaultBlockSize() {
+    return ADL_BLOCK_SIZE;
+  }
+
+  /**
+   * Return the number of bytes that large input files should optimally
+   * be split into to minimize I/O time.  The given path will be used to
+   * locate the actual filesystem.  The full path does not have to exist.
+   *
+   * @param f path of file
+   * @return the default block size for the path's filesystem
+   */
+  public long getDefaultBlockSize(Path f) {
+    return getDefaultBlockSize();
+  }
+
+  /**
+   * Get the block size.
+   * @param f the filename
+   * @return the number of bytes in a block
+   * @deprecated Use getFileStatus() instead
+   */
+  @Deprecated
+  public long getBlockSize(Path f) throws IOException {
+    return ADL_BLOCK_SIZE;
+  }
+
+  @Override
+  public BlockLocation[] getFileBlockLocations(final FileStatus status,
+      final long offset, final long length) throws IOException {
+    if (status == null) {
+      return null;
+    }
+
+    if ((offset < 0) || (length < 0)) {
+      throw new IllegalArgumentException("Invalid start or len parameter");
+    }
+
+    if (status.getLen() < offset) {
+      return new BlockLocation[0];
+    }
+
+    final String[] name = {"localhost"};
+    final String[] host = {"localhost"};
+    long blockSize = ADL_BLOCK_SIZE;
+    int numberOfLocations =
+        (int) (length / blockSize) + ((length % blockSize == 0) ? 0 : 1);
+    BlockLocation[] locations = new BlockLocation[numberOfLocations];
+    for (int i = 0; i < locations.length; i++) {
+      long currentOffset = offset + (i * blockSize);
+      long currentLength = Math.min(blockSize, offset + length - 
currentOffset);
+      locations[i] = new BlockLocation(name, host, currentOffset,
+          currentLength);
+    }
+
+    return locations;
+  }
+
+  @Override
+  public BlockLocation[] getFileBlockLocations(final Path p, final long offset,
+      final long length) throws IOException {
+    // read ops incremented in getFileStatus
+    FileStatus fileStatus = getFileStatus(p);
+    return getFileBlockLocations(fileStatus, offset, length);
+  }
+
+  /**
+   * Get replication.
+   *
+   * @param src file name
+   * @return file replication
+   * @deprecated Use getFileStatus() instead
+   */
+  @Deprecated
+  public short getReplication(Path src) {
+    return ADL_REPLICATION_FACTOR;
+  }
+
+  private Path makeAbsolute(Path path) {
+    return path.isAbsolute() ? path : new Path(this.workingDirectory, path);
+  }
+
+  private static String getNonEmptyVal(Configuration conf, String key) {
+    String value = conf.get(key);
+    if (StringUtils.isEmpty(value)) {
+      throw new IllegalArgumentException(
+          "No value for " + key + " found in conf file.");
+    }
+    return value;
+  }
+
 }
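
Note: a sketch of the custom token provider path described in
getCustomAccessTokenProvider() above. It assumes the AzureADTokenProvider
contract added by this patch (initialize(Configuration), getAccessToken(),
getExpiryTime()); the class name and configuration key below are
hypothetical:

    import java.io.IOException;
    import java.util.Date;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.adl.oauth2.AzureADTokenProvider;

    // Hypothetical provider that hands back a pre-fetched token.
    public class StaticTokenProvider extends AzureADTokenProvider {
      private String token;

      @Override
      public void initialize(Configuration conf) throws IOException {
        // "example.adl.static.token" is a made-up key for this sketch.
        token = conf.get("example.adl.static.token");
      }

      @Override
      public String getAccessToken() throws IOException {
        return token;
      }

      @Override
      public Date getExpiryTime() {
        // Pretend the token stays valid for another hour.
        return new Date(System.currentTimeMillis() + 60L * 60 * 1000);
      }
    }

With dfs.adls.oauth2.access.token.provider set to this class and the provider
type left at its Custom default, initialize() loads the class through
ReflectionUtils and wraps it in SdkTokenProviderAdapter.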

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsInputStream.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsInputStream.java
new file mode 100644
index 0000000..5248cbf
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsInputStream.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.adl;
+
+import com.microsoft.azure.datalake.store.ADLFileInputStream;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.adl.AdlConfKeys
+    .ADL_EXPERIMENT_POSITIONAL_READ_DEFAULT;
+import static org.apache.hadoop.fs.adl.AdlConfKeys
+    .ADL_EXPERIMENT_POSITIONAL_READ_KEY;
+import static org.apache.hadoop.fs.adl.AdlConfKeys
+    .DEFAULT_READ_AHEAD_BUFFER_SIZE;
+import static org.apache.hadoop.fs.adl.AdlConfKeys.READ_AHEAD_BUFFER_SIZE_KEY;
+
+/**
+ * Wraps {@link ADLFileInputStream} implementation.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class AdlFsInputStream extends FSInputStream {
+
+  private final ADLFileInputStream in;
+  private final Statistics stat;
+  private final boolean enablePositionalReadExperiment;
+
+  public AdlFsInputStream(ADLFileInputStream inputStream, Statistics statistics,
+      Configuration conf) throws IOException {
+    this.in = inputStream;
+    this.in.setBufferSize(conf.getInt(READ_AHEAD_BUFFER_SIZE_KEY,
+        DEFAULT_READ_AHEAD_BUFFER_SIZE));
+    enablePositionalReadExperiment = conf
+        .getBoolean(ADL_EXPERIMENT_POSITIONAL_READ_KEY,
+            ADL_EXPERIMENT_POSITIONAL_READ_DEFAULT);
+    stat = statistics;
+  }
+
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    in.seek(pos);
+  }
+
+  /**
+   * Return the current offset from the start of the file.
+   */
+  @Override
+  public synchronized long getPos() throws IOException {
+    return in.getPos();
+  }
+
+  @Override
+  public boolean seekToNewSource(long l) throws IOException {
+    return false;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    int ch = in.read();
+    if (stat != null && ch != -1) {
+      stat.incrementBytesRead(1);
+    }
+    return ch;
+  }
+
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    int numberOfByteRead = 0;
+    if (enablePositionalReadExperiment) {
+      numberOfByteRead = in.read(position, buffer, offset, length);
+    } else {
+      numberOfByteRead = super.read(position, buffer, offset, length);
+    }
+
+    if (stat != null && numberOfByteRead > 0) {
+      stat.incrementBytesRead(numberOfByteRead);
+    }
+    return numberOfByteRead;
+  }
+
+  @Override
+  public synchronized int read(byte[] buffer, int offset, int length)
+      throws IOException {
+    int numberOfByteRead = in.read(buffer, offset, length);
+    if (stat != null && numberOfByteRead > 0) {
+      stat.incrementBytesRead(numberOfByteRead);
+    }
+    return numberOfByteRead;
+  }
+
+  /**
+   * This method returns the remaining bytes in the stream, rather than the
+   * expected Java interpretation of {@link java.io.InputStream#available()},
+   * which expects the number of remaining bytes in the local buffer.
+   * Moreover, it caps the value returned to a maximum of Integer.MAX_VALUE.
+   * These changed behaviors ensure compatibility with the expectations of
+   * the HBase WAL reader, which depends on available() returning the number
+   * of bytes in the stream.
+   *
+   * Given that all other FileSystems in the Hadoop ecosystem (especially
+   * HDFS) do this, applications other than HBase may also have picked up
+   * this expectation from the HDFS implementation. This quirky behavior is
+   * therefore kept here to ensure compatibility.
+   *
+   * @return remaining bytes in the stream, capped at Integer.MAX_VALUE.
+   * @throws IOException if the position or file length cannot be read from
+   *                     the SDK.
+   */
+  @Override
+  public synchronized int available() throws IOException {
+    return (int) Math.min(in.length() - in.getPos(), Integer.MAX_VALUE);
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    in.close();
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    return in.skip(n);
+  }
+
+}
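
For context, a minimal sketch of how a client might tune the read-ahead
buffer consumed by this stream. The key string and the account URI below are
illustrative assumptions; the authoritative name is the
AdlConfKeys.READ_AHEAD_BUFFER_SIZE_KEY constant imported above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class ReadAheadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumed key string; see AdlConfKeys.READ_AHEAD_BUFFER_SIZE_KEY.
    conf.setInt("adl.feature.client.cache.readahead", 8 * 1024 * 1024);
    FileSystem fs = FileSystem.get(
        URI.create("adl://myaccount.azuredatalakestore.net/"), conf);
    try (FSDataInputStream in = fs.open(new Path("/data/sample.bin"))) {
      byte[] buf = new byte[4096];
      int n = in.read(buf); // served through the stream's read-ahead buffer
      System.out.println("read " + n + " bytes");
    }
  }
}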

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java
new file mode 100644
index 0000000..2b89fb0
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFsOutputStream.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.adl;
+
+import com.microsoft.azure.datalake.store.ADLFileOutputStream;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Syncable;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import static org.apache.hadoop.fs.adl.AdlConfKeys
+    .DEFAULT_WRITE_AHEAD_BUFFER_SIZE;
+import static org.apache.hadoop.fs.adl.AdlConfKeys.WRITE_BUFFER_SIZE_KEY;
+
+/**
+ * Wraps the {@link com.microsoft.azure.datalake.store.ADLFileOutputStream}
+ * implementation.
+ *
+ * Flush semantics: flush() is a no-op, since some parts of the Hadoop
+ * ecosystem call flush() expecting it to have no performance impact. In
+ * Hadoop filesystems, flush() itself guarantees no durability; durability is
+ * achieved by calling hflush() or hsync().
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class AdlFsOutputStream extends OutputStream implements Syncable {
+  private final ADLFileOutputStream out;
+
+  public AdlFsOutputStream(ADLFileOutputStream out,
+      Configuration configuration) throws IOException {
+    this.out = out;
+    out.setBufferSize(configuration
+        .getInt(WRITE_BUFFER_SIZE_KEY, DEFAULT_WRITE_AHEAD_BUFFER_SIZE));
+  }
+
+  @Override
+  public synchronized void write(int b) throws IOException {
+    out.write(b);
+  }
+
+  @Override
+  public synchronized void write(byte[] b, int off, int len)
+      throws IOException {
+    out.write(b, off, len);
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    out.close();
+  }
+
+  public synchronized void sync() throws IOException {
+    out.flush();
+  }
+
+  @Override
+  public synchronized void hflush() throws IOException {
+    out.flush();
+  }
+
+  @Override
+  public synchronized void hsync() throws IOException {
+    out.flush();
+  }
+}
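
Since flush() is deliberately left as a no-op (see the class javadoc), a
caller needing durability must use the Syncable calls. A minimal sketch with
an assumed account URI and path; FSDataOutputStream forwards hflush() to this
class when the wrapped stream implements Syncable.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class DurableWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(
        URI.create("adl://myaccount.azuredatalakestore.net/"), conf);
    try (FSDataOutputStream out = fs.create(new Path("/tmp/example.log"))) {
      out.writeBytes("record-1\n");
      out.flush();  // no durability guarantee in Hadoop filesystems
      out.hflush(); // pushes buffered bytes to the store via the SDK flush
    }
  }
}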

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlPermission.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlPermission.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlPermission.java
new file mode 100644
index 0000000..af3342a
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlPermission.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.fs.adl;
+
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/**
+ * The Hadoop shell command -getfacl does not invoke getAclStatus unless the
+ * FsPermission returned by getFileStatus has its ACL bit set to true; by
+ * default getAclBit returns false.
+ *
+ * Forcing an additional getAclStatus call would be redundant when ADLS runs
+ * as a secondary FileSystem. To avoid that redundancy, a configuration
+ * setting controls the value returned by getAclBit.
+ */
+class AdlPermission extends FsPermission {
+  private final boolean aclBit;
+
+  AdlPermission(boolean aclBitStatus, Short aShort) {
+    super(aShort);
+    this.aclBit = aclBitStatus;
+  }
+
+  /**
+   * Returns the value of the "adl.feature.support.acl.bit" configuration
+   * setting.
+   *
+   * @return the configured value; true if the setting is absent.
+   */
+  public boolean getAclBit() {
+    return aclBit;
+  }
+
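+  // aclBit is not part of equality, so an AdlPermission compares equal to a
+  // plain FsPermission carrying the same permission bits and sticky bit.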
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof FsPermission) {
+      FsPermission that = (FsPermission) obj;
+      return this.getUserAction() == that.getUserAction()
+          && this.getGroupAction() == that.getGroupAction()
+          && this.getOtherAction() == that.getOtherAction()
+          && this.getStickyBit() == that.getStickyBit();
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return toShort();
+  }
+}
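
A short sketch of the toggle described in the class javadoc. The key string
"adl.feature.support.acl.bit" is taken from the getAclBit javadoc; per that
contract it defaults to true when unset.

import org.apache.hadoop.conf.Configuration;

public class AclBitConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // With the default of true, `hadoop fs -getfacl` follows up with a
    // getAclStatus call; setting false suppresses the ACL bit and avoids
    // that extra round trip when ADLS runs as a secondary FileSystem.
    conf.setBoolean("adl.feature.support.acl.bit", false);
    System.out.println(conf.getBoolean("adl.feature.support.acl.bit", true));
  }
}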

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/SdkTokenProviderAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/SdkTokenProviderAdapter.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/SdkTokenProviderAdapter.java
new file mode 100644
index 0000000..7b107ae
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/SdkTokenProviderAdapter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.adl;
+
+import com.microsoft.azure.datalake.store.oauth2.AccessTokenProvider;
+import com.microsoft.azure.datalake.store.oauth2.AzureADToken;
+import org.apache.hadoop.fs.adl.oauth2.AzureADTokenProvider;
+
+import java.io.IOException;
+
+/**
+ * Adapts the public {@link AzureADTokenProvider} extension point to the
+ * {@link AccessTokenProvider} interface expected by the Azure Data Lake
+ * Store SDK.
+ */
+final class SdkTokenProviderAdapter extends AccessTokenProvider {
+
+  private final AzureADTokenProvider tokenProvider;
+
+  SdkTokenProviderAdapter(AzureADTokenProvider tp) {
+    this.tokenProvider = tp;
+  }
+
+  @Override
+  protected AzureADToken refreshToken() throws IOException {
+    AzureADToken azureADToken = new AzureADToken();
+    azureADToken.accessToken = tokenProvider.getAccessToken();
+    azureADToken.expiry = tokenProvider.getExpiryTime();
+    return azureADToken;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/TokenProviderType.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/TokenProviderType.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/TokenProviderType.java
new file mode 100644
index 0000000..9fd4f4f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/TokenProviderType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.adl;
+
+enum TokenProviderType {
+  RefreshToken,
+  ClientCredential,
+  Custom
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/AzureADTokenProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/AzureADTokenProvider.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/AzureADTokenProvider.java
new file mode 100644
index 0000000..a0b3922
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/AzureADTokenProvider.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.adl.oauth2;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.Date;
+
+/**
+ * Provides an Azure Active Directory OAuth2 access token used to
+ * authenticate REST calls against the Azure Data Lake file system
+ * {@link org.apache.hadoop.fs.adl.AdlFileSystem}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class AzureADTokenProvider {
+
+  /**
+   * Initialize the provider with the supplied configuration. Invoked when
+   * {@link org.apache.hadoop.fs.adl.AdlFileSystem} is initialized with a
+   * URI and Configuration.
+   *
+   * @param configuration Configuration object
+   * @throws IOException if the instance cannot be configured.
+   */
+  public abstract void initialize(Configuration configuration)
+      throws IOException;
+
+  /**
+   * Obtain the access token to be added to the HTTPS connection's header.
+   * Invocation depends on the expiry time returned by
+   * {@link #getExpiryTime()}, so implementations should be performant.
+   * Implementations are responsible for any refreshing of the token.
+   *
+   * @return String containing the access token
+   * @throws IOException if there is an error fetching the token
+   */
+  public abstract String getAccessToken() throws IOException;
+
+  /**
+   * Obtain the expiry time of the token. If the implementation is performant
+   * enough to handle a {@link #getAccessToken()} call on every connection,
+   * it is safe to return the current or a past time; however, it is
+   * recommended to return the token expiry time received from Azure Active
+   * Directory.
+   *
+   * @return Date at which the access token retrieved from AAD expires.
+   */
+  public abstract Date getExpiryTime();
+}
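
As this is the public extension point for custom authentication, a minimal
sketch of an implementation follows. The class, package, and configuration
key below are hypothetical; only the three overridden methods come from the
abstract class above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.adl.oauth2.AzureADTokenProvider;

import java.io.IOException;
import java.util.Date;

public class StaticTokenProvider extends AzureADTokenProvider {
  private String token;
  private Date expiry;

  @Override
  public void initialize(Configuration conf) throws IOException {
    // Hypothetical key: a pre-issued token supplied via configuration.
    token = conf.get("example.adl.static.token");
    if (token == null) {
      throw new IOException("example.adl.static.token is not set");
    }
    // Pretend the token is valid for one hour from now.
    expiry = new Date(System.currentTimeMillis() + 3600 * 1000L);
  }

  @Override
  public String getAccessToken() throws IOException {
    return token;
  }

  @Override
  public Date getExpiryTime() {
    return expiry;
  }
}

In the wiring above, the Custom entry of TokenProviderType selects a
provider like this; the configuration key that names the implementing class
lives in AdlConfKeys (not shown in this hunk).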

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java
deleted file mode 100644
index b7f3b00..0000000
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.fs.adl.oauth2;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.LinkedHashMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.web.oauth2.AccessTokenProvider;
-import org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider;
-import org.apache.hadoop.hdfs.web.oauth2.PrivateCachedRefreshTokenBasedAccessTokenProvider;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
-import static org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider.OAUTH_REFRESH_TOKEN_KEY;
-
-/**
- * Share refresh tokens across all ADLS instances with a common client ID. The
- * {@link AccessTokenProvider} can be shared across multiple instances,
- * amortizing the cost of refreshing tokens.
- */
-public class CachedRefreshTokenBasedAccessTokenProvider
-    extends PrivateCachedRefreshTokenBasedAccessTokenProvider {
-
-  public static final String FORCE_REFRESH = "adl.force.token.refresh";
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(CachedRefreshTokenBasedAccessTokenProvider.class);
-
-  /** Limit size of provider cache. */
-  static final int MAX_PROVIDERS = 10;
-  @SuppressWarnings("serial")
-  private static final Map<String, AccessTokenProvider> CACHE =
-      new LinkedHashMap<String, AccessTokenProvider>() {
-        @Override
-        public boolean removeEldestEntry(
-            Map.Entry<String, AccessTokenProvider> e) {
-          return size() > MAX_PROVIDERS;
-        }
-      };
-
-  private AccessTokenProvider instance = null;
-
-  /**
-   * Create handle for cached instance.
-   */
-  public CachedRefreshTokenBasedAccessTokenProvider() {
-  }
-
-  /**
-   * Gets the access token from internally cached
-   * ConfRefreshTokenBasedAccessTokenProvider instance.
-   *
-   * @return Valid OAuth2 access token for the user.
-   * @throws IOException when system error, internal server error or user error
-   */
-  @Override
-  public synchronized String getAccessToken() throws IOException {
-    return instance.getAccessToken();
-  }
-
-  /**
-   * @return A cached Configuration consistent with the parameters of this
-   * instance.
-   */
-  @Override
-  public synchronized Configuration getConf() {
-    return instance.getConf();
-  }
-
-  /**
-   * Configure cached instance. Note that the Configuration instance returned
-   * from subsequent calls to {@link #getConf() getConf} may be from a
-   * previous, cached entry.
-   * @param conf Configuration instance
-   */
-  @Override
-  public synchronized void setConf(Configuration conf) {
-    String id = conf.get(OAUTH_CLIENT_ID_KEY);
-    if (null == id) {
-      throw new IllegalArgumentException("Missing client ID");
-    }
-    synchronized (CACHE) {
-      instance = CACHE.get(id);
-      if (null == instance
-          || conf.getBoolean(FORCE_REFRESH, false)
-          || replace(instance, conf)) {
-        instance = newInstance();
-        // clone configuration
-        instance.setConf(new Configuration(conf));
-        CACHE.put(id, instance);
-        LOG.debug("Created new client {}", id);
-      }
-    }
-  }
-
-  AccessTokenProvider newInstance() {
-    return new ConfRefreshTokenBasedAccessTokenProvider();
-  }
-
-  private static boolean replace(AccessTokenProvider cached, Configuration c2) {
-    // ConfRefreshTokenBasedAccessTokenProvider::setConf asserts !null
-    final Configuration c1 = cached.getConf();
-    for (String key : new String[] {
-        OAUTH_REFRESH_TOKEN_KEY, OAUTH_REFRESH_URL_KEY }) {
-      if (!c1.get(key).equals(c2.get(key))) {
-        // replace cached instance for this clientID
-        return true;
-      }
-    }
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java
index b444984..1613941 100644
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java
@@ -20,4 +20,4 @@
 /**
  * public interface to expose OAuth2 authentication related features.
  */
-package org.apache.hadoop.fs.adl.oauth2;
+package org.apache.hadoop.fs.adl.oauth2;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java
index 98e6a77..456eebc 100644
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java
@@ -20,4 +20,4 @@
 /**
  * Supporting classes for metrics instrumentation.
  */
-package org.apache.hadoop.fs.adl;
+package org.apache.hadoop.fs.adl;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
deleted file mode 100644
index a7f932f..0000000
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.hdfs.web;
-
-/**
- * Constants.
- */
-public final class ADLConfKeys {
-  public static final String
-      ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN =
-      "adl.feature.override.readahead.max.concurrent.connection";
-  public static final int
-      ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN_DEFAULT = 2;
-  public static final String ADL_WEBSDK_VERSION_KEY = "ADLFeatureSet";
-  static final String ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER =
-      "adl.debug.override.localuserasfileowner";
-  static final boolean ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT = false;
-  static final String ADL_FEATURE_REDIRECT_OFF =
-      "adl.feature.override.redirection.off";
-  static final boolean ADL_FEATURE_REDIRECT_OFF_DEFAULT = true;
-  static final String ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED =
-      "adl.feature.override.getblocklocation.locally.bundled";
-  static final boolean ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED_DEFAULT
-      = true;
-  static final String ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD =
-      "adl.feature.override.readahead";
-  static final boolean ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_DEFAULT =
-      true;
-  static final String ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE =
-      "adl.feature.override.readahead.max.buffersize";
-
-  static final int KB = 1024;
-  static final int MB = KB * KB;
-  static final int DEFAULT_BLOCK_SIZE = 4 * MB;
-  static final int DEFAULT_EXTENT_SIZE = 256 * MB;
-  static final int DEFAULT_TIMEOUT_IN_SECONDS = 120;
-  static final int
-      ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE_DEFAULT =
-      8 * MB;
-
-  private ADLConfKeys() {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c61ad24/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
deleted file mode 100644
index 350c6e7..0000000
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.hdfs.web;
-
-/**
- * Responsible for holding buffered data in the process. Hold only 1 and only
- * 1 buffer block in the memory. Buffer block
- * information is for the given file and the offset from the which the block
- * is fetched. Across the webhdfs instances if
- * same buffer block has been used then backend trip is avoided. Buffer block
- * is certainly important since ADL fetches
- * large amount of data (Default is 4MB however can be configured through
- * core-site.xml) from the backend.
- * Observation is in case of ORC/Avro kind of compressed file, buffer block
- * does not avoid few backend calls across
- * webhdfs
- * instances.
- */
-final class BufferManager {
-  private static final BufferManager BUFFER_MANAGER_INSTANCE = new
-      BufferManager();
-  private static Object lock = new Object();
-  private Buffer buffer = null;
-  private String fileName;
-
-  /**
-   * Constructor.
-   */
-  private BufferManager() {
-  }
-
-  public static Object getLock() {
-    return lock;
-  }
-
-  public static BufferManager getInstance() {
-    return BUFFER_MANAGER_INSTANCE;
-  }
-
-  /**
-   * Validate if the current buffer block is of given stream.
-   *
-   * @param path   ADL stream path
-   * @param offset Stream offset that caller is interested in
-   * @return True if the buffer block is available otherwise false
-   */
-  boolean hasValidDataForOffset(String path, long offset) {
-    if (this.fileName == null) {
-      return false;
-    }
-
-    if (!this.fileName.equals(path)) {
-      return false;
-    }
-
-    if (buffer == null) {
-      return false;
-    }
-
-    if ((offset < buffer.offset) || (offset >= (buffer.offset
-        + buffer.data.length))) {
-      return false;
-    }
-
-    return true;
-  }
-
-  /**
-   * Clean buffer block.
-   */
-  void clear() {
-    buffer = null;
-  }
-
-  /**
-   * Validate if the current buffer block is of given stream. For now partial
-   * data available is not supported.
-   * Data must be available exactly or within the range of offset and size
-   * passed as parameter.
-   *
-   * @param path   Stream path
-   * @param offset Offset of the stream
-   * @param size   Size of the data from the offset of the stream caller
-   *               interested in
-   * @return True if the data is available from the given offset and of the
-   * size caller is interested in.
-   */
-  boolean hasData(String path, long offset, int size) {
-
-    if (!hasValidDataForOffset(path, offset)) {
-      return false;
-    }
-
-    if ((size + offset) > (buffer.data.length + buffer.offset)) {
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Return the buffer block from the requested offset. It is caller
-   * responsibility to check if the buffer block is
-   * of there interest and offset is valid.
-   *
-   * @param data   Byte array to be filed from the buffer block
-   * @param offset Data to be fetched from the offset.
-   */
-  void get(byte[] data, long offset) {
-    System.arraycopy(buffer.data, (int) (offset - buffer.offset), data, 0,
-        data.length);
-  }
-
-  /**
-   * Create new empty buffer block of the given size.
-   *
-   * @param len Size of the buffer block.
-   * @return Empty byte array.
-   */
-  byte[] getEmpty(int len) {
-    return new byte[len];
-  }
-
-  /**
-   * This function allows caller to specify new buffer block for the stream
-   * which is pulled from the backend.
-   *
-   * @param data   Buffer
-   * @param path   Stream path to which buffer belongs to
-   * @param offset Stream offset where buffer start with
-   */
-  void add(byte[] data, String path, long offset) {
-    if (data == null) {
-      return;
-    }
-
-    buffer = new Buffer();
-    buffer.data = data;
-    buffer.offset = offset;
-    this.fileName = path;
-  }
-
-  /**
-   * @return Size of the buffer.
-   */
-  int getBufferSize() {
-    return buffer.data.length;
-  }
-
-  /**
-   * @return Stream offset where buffer start with
-   */
-  long getBufferOffset() {
-    return buffer.offset;
-  }
-
-  /**
-   * Buffer container.
-   */
-  static class Buffer {
-    private byte[] data;
-    private long offset;
-  }
-}

